- /* -*- Mode: C++ -*- */
- /*
- * Copyright 2016 Google Inc. All Rights Reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- /*
- * This program starts with a text file (and optionally a vocabulary file) and
- * computes co-occurrence statistics. It emits output in a format that can be
- * consumed by the "swivel" program. It's functionally equivalent to "prep.py",
- * but works much more quickly.
- */
- #include <assert.h>
- #include <fcntl.h>
- #include <stdio.h>
- #include <sys/mman.h>
- #include <sys/stat.h>
- #include <unistd.h>
- #include <algorithm>
- #include <cctype>
- #include <fstream>
- #include <iomanip>
- #include <iostream>
- #include <map>
- #include <mutex>
- #include <string>
- #include <thread>
- #include <tuple>
- #include <unordered_map>
- #include <vector>
- #include "google/protobuf/io/zero_copy_stream_impl.h"
- #include "tensorflow/core/example/example.pb.h"
- #include "tensorflow/core/example/feature.pb.h"
- static const char usage[] = R"(
- Prepares a corpus for processing by Swivel.
- Usage:
- prep --output_dir <output-dir> --input <text-file>
- Options:
- --input <filename>
- The input text.
- --output_dir <directory>
- Specifies the output directory where the various Swivel data
- files should be placed. The directory is created if it does not
- exist.
- --shard_size <int>
- Specifies the shard size; default 4096.
- --min_count <int>
- The minimum number of times a word should appear to be included in the
- generated vocabulary; default 5. (Ignored if --vocab is used.)
- --max_vocab <int>
- The maximum vocabulary size to generate from the input corpus; default
- 102,400. (Ignored if --vocab is used.)
- --vocab <filename>
- Use the specified unigram vocabulary instead of generating
- it from the corpus.
- --window_size <int>
- Specifies the window size for computing co-occurrence stats;
- default 10.
- --num_threads <int>
- The number of workers to calculate the co-occurrence matrix;
- default 4.
- )";
- struct cooc_t {
- int row;
- int col;
- float cnt;
- };
- typedef std::map<long long, float> cooc_counts_t;
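- // Co-occurrence counts are keyed by a single 64-bit value that packs one
- // 32-bit token ID into the high half and the other into the low half.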
- // Retrieves the next word from the input stream, treating words as simply
- // being delimited by whitespace. Returns true if this is the end of a
- // "sentence"; i.e., a newline or the end of the file.
- bool NextWord(std::ifstream &fin, std::string* word) {
- std::string buf;
- // Use int, not char: istream::get() returns an int, so EOF and bytes
- // >= 0x80 are handled correctly (e.g., when passed to isspace).
- int c;
- if (fin.eof()) {
- word->erase();
- return true;
- }
- // Skip leading whitespace.
- do {
- c = fin.get();
- } while (!fin.eof() && std::isspace(c));
- if (fin.eof()) {
- word->erase();
- return true;
- }
- // Read the next word.
- do {
- buf += static_cast<char>(c);
- c = fin.get();
- } while (!fin.eof() && !std::isspace(c));
- *word = buf;
- if (c == '\n' || fin.eof()) return true;
- // Skip trailing whitespace; a newline encountered here also ends the
- // "sentence".
- do {
- c = fin.get();
- if (c == '\n') return true;
- } while (!fin.eof() && std::isspace(c));
- if (fin.eof()) return true;
- fin.unget();
- return false;
- }
- // Creates a vocabulary from the most frequent terms in the input file.
- std::vector<std::string> CreateVocabulary(const std::string input_filename,
- const int shard_size,
- const int min_vocab_count,
- const int max_vocab_size) {
- std::vector<std::string> vocab;
- // Count all the distinct tokens in the file. (XXX this will eventually
- // consume all memory and should be rewritten to periodically trim the
- // data.)
- std::unordered_map<std::string, long long> counts;
- std::ifstream fin(input_filename, std::ifstream::ate);
- if (!fin) {
- std::cerr << "couldn't read input file '" << input_filename << "'"
- << std::endl;
- return vocab;
- }
- const auto input_size = fin.tellg();
- fin.seekg(0);
- long long ntokens = 0;
- while (!fin.eof()) {
- std::string word;
- NextWord(fin, &word);
- // NextWord yields an empty string at end-of-file; don't count it.
- if (!word.empty()) counts[word] += 1;
- if (++ntokens % 1000000 == 0) {
- const float pct = 100.0 * static_cast<float>(fin.tellg()) / input_size;
- fprintf(stdout, "\rComputing vocabulary: %0.1f%% complete...", pct);
- std::flush(std::cout);
- }
- }
- std::cout << std::endl << counts.size() << " distinct tokens" << std::endl;
- // Sort the vocabulary from most frequent to least frequent.
- std::vector<std::pair<std::string, long long>> buf;
- std::copy(counts.begin(), counts.end(), std::back_inserter(buf));
- std::sort(buf.begin(), buf.end(),
- [](const std::pair<std::string, long long> &a,
- const std::pair<std::string, long long> &b) {
- return b.second < a.second;
- });
- // Truncate to the maximum vocabulary size
- if (static_cast<int>(buf.size()) > max_vocab_size) buf.resize(max_vocab_size);
- if (buf.empty()) return vocab;
- // Eliminate rare tokens and truncate so the vocabulary size is a
- // multiple of the shard size.
- int vocab_size = buf.size();
- while (vocab_size > 0 && buf[vocab_size - 1].second < min_vocab_count)
- --vocab_size;
- vocab_size -= vocab_size % shard_size;
- if (static_cast<int>(buf.size()) > vocab_size) buf.resize(vocab_size);
- // Copy out the tokens.
- for (const auto& pair : buf) vocab.push_back(pair.first);
- return vocab;
- }
- // Reads a previously generated vocabulary file: one token per line;
- // anything after a tab (e.g., a count) is ignored.
- std::vector<std::string> ReadVocabulary(const std::string vocab_filename) {
- std::vector<std::string> vocab;
- std::ifstream fin(vocab_filename);
- for (std::string token; std::getline(fin, token);) {
- auto n = token.find('\t');
- if (n != std::string::npos) token = token.substr(0, n);
- vocab.push_back(token);
- }
- return vocab;
- }
- void WriteVocabulary(const std::vector<std::string> &vocab,
- const std::string &output_dirname) {
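- // Swivel uses the same vocabulary for rows and columns, so the same
- // token list is written to both files.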
- for (const std::string filename : {"row_vocab.txt", "col_vocab.txt"}) {
- std::ofstream fout(output_dirname + "/" + filename);
- for (const auto &token : vocab) fout << token << std::endl;
- }
- }
- // Manages accumulation of co-occurrence data into temporary disk buffer files.
- class CoocBuffer {
- public:
- CoocBuffer(const std::string &output_dirname, const int num_shards,
- const int shard_size);
- // Accumulate the co-occurrence counts to the buffer.
- void AccumulateCoocs(const cooc_counts_t &coocs);
- // Read the buffer to produce shard files.
- void WriteShards();
- protected:
- // The output directory. Also used for temporary buffer files.
- const std::string output_dirname_;
- // The number of row/column shards.
- const int num_shards_;
- // The number of elements per shard.
- const int shard_size_;
- // Parallel arrays of temporary file paths and file descriptors.
- std::vector<std::string> paths_;
- std::vector<int> fds_;
- // Ensures that only one buffer file is getting written at a time.
- std::mutex writer_mutex_;
- };
- CoocBuffer::CoocBuffer(const std::string &output_dirname, const int num_shards,
- const int shard_size)
- : output_dirname_(output_dirname),
- num_shards_(num_shards),
- shard_size_(shard_size) {
- for (int row = 0; row < num_shards_; ++row) {
- for (int col = 0; col < num_shards_; ++col) {
- char filename[256];
- snprintf(filename, sizeof(filename), "shard-%03d-%03d.tmp", row, col);
- std::string path = output_dirname + "/" + filename;
- int fd = open(path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666);
- assert(fd >= 0);
- paths_.push_back(path);
- fds_.push_back(fd);
- }
- }
- }
- void CoocBuffer::AccumulateCoocs(const cooc_counts_t &coocs) {
- std::vector<std::vector<cooc_t>> bufs(fds_.size());
- for (const auto &cooc : coocs) {
- const int row_id = cooc.first >> 32;
- const int col_id = cooc.first & 0xffffffff;
- const float cnt = cooc.second;
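- // Token IDs are striped modulo the number of shards: token t belongs to
- // shard t % num_shards, at offset t / num_shards within that shard.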
- const int row_shard = row_id % num_shards_;
- const int row_off = row_id / num_shards_;
- const int col_shard = col_id % num_shards_;
- const int col_off = col_id / num_shards_;
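- // Buffer the count under both (row, col) and its transpose (col, row)
- // so each shard file holds the complete symmetric data; diagonal
- // entries are therefore written twice (see the 0.5 in Count()).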
- const int top_shard_idx = row_shard * num_shards_ + col_shard;
- bufs[top_shard_idx].push_back(cooc_t{row_off, col_off, cnt});
- const int bot_shard_idx = col_shard * num_shards_ + row_shard;
- bufs[bot_shard_idx].push_back(cooc_t{col_off, row_off, cnt});
- }
- for (int i = 0; i < static_cast<int>(fds_.size()); ++i) {
- std::lock_guard<std::mutex> rv(writer_mutex_);
- const int nbytes = bufs[i].size() * sizeof(cooc_t);
- int nwritten = write(fds_[i], bufs[i].data(), nbytes);
- assert(nwritten == nbytes);
- }
- }
- void CoocBuffer::WriteShards() {
- for (int shard = 0; shard < static_cast<int>(fds_.size()); ++shard) {
- const int row_shard = shard / num_shards_;
- const int col_shard = shard % num_shards_;
- std::cout << "\rwriting shard " << (shard + 1) << "/"
- << (num_shards_ * num_shards_);
- std::flush(std::cout);
- // Construct the tf::Example proto. First, we add the global rows and
- // columns that are present in the shard: shard (r, c) holds global row
- // IDs r, r + num_shards, r + 2 * num_shards, ..., and likewise for
- // columns.
- tensorflow::Example example;
- auto &feature = *example.mutable_features()->mutable_feature();
- auto global_row = feature["global_row"].mutable_int64_list();
- auto global_col = feature["global_col"].mutable_int64_list();
- for (int i = 0; i < shard_size_; ++i) {
- global_row->add_value(row_shard + i * num_shards_);
- global_col->add_value(col_shard + i * num_shards_);
- }
- // Next we add co-occurrences as a sparse representation. Map the
- // co-occurrence counts that we've spooled off to disk: these are in
- // arbitrary order and may contain duplicates.
- const off_t nbytes = lseek(fds_[shard], 0, SEEK_END);
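- // Map the buffer file read-write: the std::sort below reorders the
- // records in place, directly in the file's pages.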
- cooc_t *coocs = static_cast<cooc_t *>(
- mmap(0, nbytes, PROT_READ | PROT_WRITE, MAP_SHARED, fds_[shard], 0));
- assert(coocs != reinterpret_cast<cooc_t *>(MAP_FAILED));
- const int ncoocs = nbytes / sizeof(cooc_t);
- cooc_t* cur = coocs;
- cooc_t* end = coocs + ncoocs;
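- // The co-occurrences are emitted as parallel COO-style lists: local
- // row, local column, and the aggregated count.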
- auto sparse_value = feature["sparse_value"].mutable_float_list();
- auto sparse_local_row = feature["sparse_local_row"].mutable_int64_list();
- auto sparse_local_col = feature["sparse_local_col"].mutable_int64_list();
- std::sort(cur, end, [](const cooc_t &a, const cooc_t &b) {
- return a.row < b.row || (a.row == b.row && a.col < b.col);
- });
- // Accumulate the counts into the protocol buffer.
- int last_row = -1, last_col = -1;
- float count = 0;
- for (; cur != end; ++cur) {
- if (cur->row != last_row || cur->col != last_col) {
- if (last_row >= 0 && last_col >= 0) {
- sparse_local_row->add_value(last_row);
- sparse_local_col->add_value(last_col);
- sparse_value->add_value(count);
- }
- last_row = cur->row;
- last_col = cur->col;
- count = 0;
- }
- count += cur->cnt;
- }
- if (last_row >= 0 && last_col >= 0) {
- sparse_local_row->add_value(last_row);
- sparse_local_col->add_value(last_col);
- sparse_value->add_value(count);
- }
- munmap(coocs, nbytes);
- close(fds_[shard]);
- // Rough size estimate: warn when the serialized shard is likely to
- // exceed protobuf's historical 64 MiB message limit.
- if (sparse_local_row->value_size() * 8 >= (64 << 20)) {
- std::cout << "Warning: this shard is large (>= 64MiB) and is likely to "
- "trigger protobuf parsing errors in TF 1.0 and older; see "
- << std::endl
- << "kDefaultTotalBytesLimit in src/google/protobuf/io/coded_stream.h, "
- "changed in protobuf/commit/5a76e633ea9b5adb215e93fdc11e1c0c08b3fc74, "
- << std::endl
- << "and https://github.com/tensorflow/tensorflow/issues/7311"
- << std::endl
- << "Consider increasing the number of shards." << std::endl;
- }
- // Write the protocol buffer as a binary blob to disk.
- char filename[256];
- snprintf(filename, sizeof(filename), "shard-%03d-%03d.pb", row_shard,
- col_shard);
- const std::string path = output_dirname_ + "/" + filename;
- int fd = open(path.c_str(), O_WRONLY | O_TRUNC | O_CREAT, 0666);
- assert(fd != -1);
- google::protobuf::io::FileOutputStream fout(fd);
- example.SerializeToZeroCopyStream(&fout);
- fout.Close();
- // Remove the temporary file.
- unlink(paths_[shard].c_str());
- }
- std::cout << std::endl;
- }
- // Counts the co-occurrences in part of the file.
- class CoocCounter {
- public:
- CoocCounter(const std::string &input_filename, const off_t start,
- const off_t end, const int window_size,
- const std::unordered_map<std::string, int> &token_to_id_map,
- CoocBuffer *coocbuf)
- : fin_(input_filename, std::ifstream::ate),
- start_(start),
- end_(end),
- window_size_(window_size),
- token_to_id_map_(token_to_id_map),
- coocbuf_(coocbuf),
- marginals_(token_to_id_map.size()) {}
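- // Note that each counter owns its own ifstream (fin_), so worker threads
- // can seek and read their assigned byte ranges independently.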
- // Static thread entry point that simply forwards to Count().
- static void* Run(void* param) {
- CoocCounter* self = static_cast<CoocCounter*>(param);
- self->Count();
- return nullptr;
- }
- // Counts the co-occurrences.
- void Count();
- const std::vector<double>& Marginals() const { return marginals_; }
- protected:
- // The input stream.
- std::ifstream fin_;
- // The range of the file to which this counter should attend.
- const off_t start_;
- const off_t end_;
- // The window size for computing co-occurrences.
- const int window_size_;
- // A reference to the mapping from tokens to IDs.
- const std::unordered_map<std::string, int> &token_to_id_map_;
- // The buffer into which counts are to be accumulated.
- CoocBuffer* coocbuf_;
- // The marginal counts accumulated by this counter.
- std::vector<double> marginals_;
- };
- void CoocCounter::Count() {
- // Flush the in-memory counts to the disk buffer once the map grows
- // beyond this many distinct pairs.
- const size_t max_coocs_size = 16 * 1024 * 1024;
- // A buffer of co-occurrence counts that we'll periodically sort into
- // shards.
- cooc_counts_t coocs;
- fin_.seekg(start_);
- int nlines = 0;
- for (off_t filepos = start_; filepos < end_ && !fin_.eof(); filepos = fin_.tellg()) {
- // Buffer a single sentence.
- std::vector<int> sentence;
- bool eos;
- do {
- std::string word;
- eos = NextWord(fin_, &word);
- auto it = token_to_id_map_.find(word);
- if (it != token_to_id_map_.end()) sentence.push_back(it->second);
- } while (!eos);
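- // Out-of-vocabulary tokens were dropped above, so window offsets below
- // measure distance among the retained tokens only.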
- // Generate the co-occurrences for the sentence.
- for (int pos = 0; pos < static_cast<int>(sentence.size()); ++pos) {
- const int left_id = sentence[pos];
- const int window_extent =
- std::min(static_cast<int>(sentence.size()) - pos, 1 + window_size_);
- for (int off = 1; off < window_extent; ++off) {
- const int right_id = sentence[pos + off];
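- // Weight each pair by the harmonic of its distance (1, 1/2, 1/3, ...)
- // and canonicalize the ID order so (a, b) and (b, a) share one key;
- // AccumulateCoocs later emits both orientations.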
- const double count = 1.0 / static_cast<double>(off);
- const long long lo = std::min(left_id, right_id);
- const long long hi = std::max(left_id, right_id);
- const long long key = (hi << 32) | lo;
- coocs[key] += count;
- marginals_[left_id] += count;
- marginals_[right_id] += count;
- }
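- // Count each token as co-occurring with itself. Only 0.5 is added
- // because AccumulateCoocs writes every diagonal entry twice (once per
- // orientation), so the shard files end up with a total of 1.0.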
- marginals_[left_id] += 1.0;
- const long long key = (static_cast<long long>(left_id) << 32) |
- static_cast<long long>(left_id);
- coocs[key] += 0.5;
- }
- // Periodically flush the co-occurrences to disk.
- if (coocs.size() > max_coocs_size) {
- coocbuf_->AccumulateCoocs(coocs);
- coocs.clear();
- }
- if (start_ == 0 && ++nlines % 1000 == 0) {
- const double pct = 100.0 * filepos / end_;
- fprintf(stdout, "\rComputing co-occurrences: %0.1f%% complete...", pct);
- std::flush(std::cout);
- }
- }
- // Accumulate anything we haven't flushed yet.
- coocbuf_->AccumulateCoocs(coocs);
- if (start_ == 0) std::cout << "done." << std::endl;
- }
- void WriteMarginals(const std::vector<double> &marginals,
- const std::string &output_dirname) {
- for (const std::string filename : {"row_sums.txt", "col_sums.txt"}) {
- std::ofstream fout(output_dirname + "/" + filename);
- fout.setf(std::ios::fixed);
- for (double sum : marginals) fout << sum << std::endl;
- }
- }
- int main(int argc, char *argv[]) {
- std::string input_filename;
- std::string vocab_filename;
- std::string output_dirname;
- bool generate_vocab = true;
- int max_vocab_size = 100 * 1024;
- int min_vocab_count = 5;
- int window_size = 10;
- int shard_size = 4096;
- int num_threads = 4;
- for (int i = 1; i < argc; ++i) {
- std::string arg(argv[i]);
- if (arg == "--vocab") {
- if (++i >= argc) goto argmissing;
- generate_vocab = false;
- vocab_filename = argv[i];
- } else if (arg == "--max_vocab") {
- if (++i >= argc) goto argmissing;
- if ((max_vocab_size = atoi(argv[i])) <= 0) goto badarg;
- } else if (arg == "--min_count") {
- if (++i >= argc) goto argmissing;
- if ((min_vocab_count = atoi(argv[i])) <= 0) goto badarg;
- } else if (arg == "--window_size") {
- if (++i >= argc) goto argmissing;
- if ((window_size = atoi(argv[i])) <= 0) goto badarg;
- } else if (arg == "--input") {
- if (++i >= argc) goto argmissing;
- input_filename = argv[i];
- } else if (arg == "--output_dir") {
- if (++i >= argc) goto argmissing;
- output_dirname = argv[i];
- } else if (arg == "--shard_size") {
- if (++i >= argc) goto argmissing;
- shard_size = atoi(argv[i]);
- } else if (arg == "--num_threads") {
- if (++i >= argc) goto argmissing;
- num_threads = atoi(argv[i]);
- } else if (arg == "--help") {
- std::cout << usage << std::endl;
- return 0;
- } else {
- std::cerr << "unknown arg '" << arg << "'; try --help?" << std::endl;
- return 2;
- }
- continue;
- badarg:
- std::cerr << "'" << argv[i] << "' is not a valid value for '" << arg
- << "'; try --help?" << std::endl;
- return 2;
- argmissing:
- std::cerr << arg << " requires an argument; try --help?" << std::endl;
- return 2;
- }
- if (input_filename.empty()) {
- std::cerr << "please specify the input text with '--input'; try --help?"
- << std::endl;
- return 2;
- }
- if (output_dirname.empty()) {
- std::cerr << "please specify the output directory with '--output_dir'"
- << std::endl;
- return 2;
- }
- struct stat sb;
- if (lstat(output_dirname.c_str(), &sb) != 0 || !S_ISDIR(sb.st_mode)) {
- if (mkdir(output_dirname.c_str(), 0755) != 0) {
- std::cerr << "output directory '" << output_dirname
- << "' does not exist and could not be created." << std::endl;
- return 1;
- }
- }
- if (lstat(input_filename.c_str(), &sb) != 0 || !S_ISREG(sb.st_mode)) {
- std::cerr << "input file '" << input_filename
- << "' does not exist or is not a file." << std::endl;
- return 1;
- }
- // The total size of the input.
- const off_t input_size = sb.st_size;
- const std::vector<std::string> vocab =
- generate_vocab ? CreateVocabulary(input_filename, shard_size,
- min_vocab_count, max_vocab_size)
- : ReadVocabulary(vocab_filename);
- if (vocab.empty()) {
- std::cerr << "Empty vocabulary." << std::endl;
- return 1;
- }
- std::cout << "Generating Swivel co-occurrence data into " << output_dirname
- << std::endl;
- std::cout << "Shard size: " << shard_size << "x" << shard_size << std::endl;
- std::cout << "Vocab size: " << vocab.size() << std::endl;
- // Write the vocabulary files into the output directory.
- WriteVocabulary(vocab, output_dirname);
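- // The vocabulary size is a multiple of the shard size: CreateVocabulary
- // guarantees this, and a vocabulary supplied via --vocab is assumed to
- // satisfy it as well.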
- const int num_shards = vocab.size() / shard_size;
- CoocBuffer coocbuf(output_dirname, num_shards, shard_size);
- // Build a mapping from the token to its position in the vocabulary file.
- std::unordered_map<std::string, int> token_to_id_map;
- for (int i = 0; i < static_cast<int>(vocab.size()); ++i)
- token_to_id_map[vocab[i]] = i;
- // Compute the co-occurrences
- std::vector<std::thread> threads;
- threads.reserve(num_threads);
- std::vector<CoocCounter*> counters;
- const off_t nbytes_per_thread = input_size / num_threads;
- std::cout << "Running " << num_threads << " threads, each on "
- << nbytes_per_thread << " bytes" << std::endl;
- for (int i = 0; i < num_threads; ++i) {
- // We could make this smarter and look around for newlines. But
- // realistically that's not going to change things much.
- const off_t start = i * nbytes_per_thread;
- const off_t end =
- i < num_threads - 1 ? (i + 1) * nbytes_per_thread : input_size;
- CoocCounter *counter = new CoocCounter(
- input_filename, start, end, window_size, token_to_id_map, &coocbuf);
- counters.push_back(counter);
- threads.emplace_back(CoocCounter::Run, counter);
- }
- // Wait for threads to finish and collect marginals.
- std::vector<double> marginals(vocab.size());
- for (int i = 0; i < num_threads; ++i) {
- if (i > 0) {
- std::cout << "joining thread #" << (i + 1) << std::endl;
- }
- threads[i].join();
- const std::vector<double>& counter_marginals = counters[i]->Marginals();
- for (int j = 0; j < static_cast<int>(vocab.size()); ++j)
- marginals[j] += counter_marginals[j];
- delete counters[i];
- }
- std::cout << "writing marginals..." << std::endl;
- WriteMarginals(marginals, output_dirname);
- std::cout << "writing shards..." << std::endl;
- coocbuf.WriteShards();
- return 0;
- }