fastprep.cc 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681
  1. /* -*- Mode: C++ -*- */
  2. /*
  3. * Copyright 2016 Google Inc. All Rights Reserved.
  4. *
  5. * Licensed under the Apache License, Version 2.0 (the "License");
  6. * you may not use this file except in compliance with the License.
  7. * You may obtain a copy of the License at
  8. *
  9. * http://www.apache.org/licenses/LICENSE-2.0
  10. *
  11. * Unless required by applicable law or agreed to in writing, software
  12. * distributed under the License is distributed on an "AS IS" BASIS,
  13. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. * See the License for the specific language governing permissions and
  15. * limitations under the License.
  16. */
  17. /*
  18. * This program starts with a text file (and optionally a vocabulary file) and
  19. * computes co-occurrence statistics. It emits output in a format that can be
  20. * consumed by the "swivel" program. It's functionally equivalent to "prep.py",
  21. * but works much more quickly.
  22. */
  #include <assert.h>
  #include <fcntl.h>
  #include <pthread.h>
  #include <stdio.h>
  #include <sys/mman.h>
  #include <sys/stat.h>
  #include <unistd.h>

  #include <algorithm>
  #include <cctype>
  #include <cerrno>
  #include <cstdlib>
  #include <fstream>
  #include <iomanip>
  #include <iostream>
  #include <map>
  #include <memory>
  #include <string>
  #include <tuple>
  #include <unordered_map>
  #include <vector>

  #include "google/protobuf/io/zero_copy_stream_impl.h"
  #include "tensorflow/core/example/example.pb.h"
  #include "tensorflow/core/example/feature.pb.h"
  // Help text printed by main() when --help is given. Every flag that main()
  // parses is listed here, including --num_threads.
  static const char usage[] = R"(
Prepares a corpus for processing by Swivel.
Usage:
prep --output_dir <output-dir> --input <text-file>
Options:
--input <filename>
The input text.
--output_dir <directory>
Specifies the output directory where the various Swivel data
files should be placed. This directory must exist.
--shard_size <int>
Specifies the shard size; default 4096.
--min_count <int>
The minimum number of times a word should appear to be included in the
generated vocabulary; default 5. (Ignored if --vocab is used.)
--max_vocab <int>
The maximum vocabulary size to generate from the input corpus; default
102,400. (Ignored if --vocab is used.)
--vocab <filename>
Use the specified unigram vocabulary instead of generating
it from the corpus.
--window_size <int>
Specifies the window size for computing co-occurrence stats;
default 10.
--num_threads <int>
Specifies the number of threads used to count co-occurrences;
default 4.
)";
  // One co-occurrence record as it is spooled to a temporary shard file.
  // AccumulateCoocs() fills `row` and `col` with offsets inside a shard and
  // `cnt` with the (possibly fractional) count. The struct must remain a plain
  // aggregate because it is brace-initialized and written to disk verbatim.
  struct cooc_t {
    int row;
    int col;
    float cnt;
  };

  // In-memory co-occurrence counts. The key packs two token ids into 64 bits:
  // one id in the upper 32 bits and the other in the lower 32 bits.
  using cooc_counts_t = std::map<long long, float>;
  // Retrieves the next whitespace-delimited word from the input stream.
  //
  // On return `*word` holds the word, or is empty if no further word could be
  // read. Returns true if the word ends a "sentence": it was followed by a
  // newline (possibly after other trailing whitespace), or the stream ran out
  // of data. Returns false when another word follows on the same line; in that
  // case the stream is left positioned at the first character of that word.
  bool NextWord(std::ifstream &fin, std::string* word) {
    // std::isspace requires a value representable as unsigned char; a plain
    // (possibly negative) char from non-ASCII input is undefined behaviour.
    const auto is_space = [](char ch) {
      return std::isspace(static_cast<unsigned char>(ch)) != 0;
    };

    word->clear();
    char c;

    // Skip leading whitespace. A failed read (EOF or any stream error) ends
    // the scan, so a broken stream can never make this function spin.
    do {
      if (!fin.get(c)) return true;
    } while (is_space(c));

    // Read the word itself; `c` holds its first character on entry.
    do {
      word->push_back(c);
      if (!fin.get(c)) return true;
    } while (!is_space(c));

    // `c` is the whitespace character that terminated the word. Consume the
    // rest of the trailing whitespace, but stop at the first newline so that a
    // line ending in blanks is still reported as the end of a sentence.
    while (c != '\n') {
      if (!fin.get(c)) return true;
      if (!is_space(c)) {
        fin.unget();
        return false;
      }
    }
    return true;
  }
  106. // Creates a vocabulary from the most frequent terms in the input file.
  107. std::vector<std::string> CreateVocabulary(const std::string input_filename,
  108. const int shard_size,
  109. const int min_vocab_count,
  110. const int max_vocab_size) {
  111. std::vector<std::string> vocab;
  112. // Count all the distinct tokens in the file. (XXX this will eventually
  113. // consume all memory and should be re-written to periodically trim the data.)
  114. std::unordered_map<std::string, long long> counts;
  115. std::ifstream fin(input_filename, std::ifstream::ate);
  116. if (!fin) {
  117. std::cerr << "couldn't read input file '" << input_filename << "'"
  118. << std::endl;
  119. return vocab;
  120. }
  121. const auto input_size = fin.tellg();
  122. fin.seekg(0);
  123. long long ntokens = 0;
  124. while (!fin.eof()) {
  125. std::string word;
  126. NextWord(fin, &word);
  127. counts[word] += 1;
  128. if (++ntokens % 1000000 == 0) {
  129. const float pct = 100.0 * static_cast<float>(fin.tellg()) / input_size;
  130. fprintf(stdout, "\rComputing vocabulary: %0.1f%% complete...", pct);
  131. std::flush(std::cout);
  132. }
  133. }
  134. std::cout << counts.size() << " distinct tokens" << std::endl;
  135. // Sort the vocabulary from most frequent to least frequent.
  136. std::vector<std::pair<std::string, long long>> buf;
  137. std::copy(counts.begin(), counts.end(), std::back_inserter(buf));
  138. std::sort(buf.begin(), buf.end(),
  139. [](const std::pair<std::string, long long> &a,
  140. const std::pair<std::string, long long> &b) {
  141. return b.second < a.second;
  142. });
  143. // Truncate to the maximum vocabulary size
  144. if (static_cast<int>(buf.size()) > max_vocab_size) buf.resize(max_vocab_size);
  145. if (buf.empty()) return vocab;
  146. // Eliminate rare tokens and truncate to a size modulo the shard size.
  147. int vocab_size = buf.size();
  148. while (vocab_size > 0 && buf[vocab_size - 1].second < min_vocab_count)
  149. --vocab_size;
  150. vocab_size -= vocab_size % shard_size;
  151. if (static_cast<int>(buf.size()) > vocab_size) buf.resize(vocab_size);
  152. // Copy out the tokens.
  153. for (const auto& pair : buf) vocab.push_back(pair.first);
  154. return vocab;
  155. }
  // Reads a vocabulary file with one token per line. If a line contains a tab,
  // only the text before the first tab is used as the token (so "token<TAB>count"
  // files are accepted). The position of a token in the returned vector is its
  // line index in the file. Returns an empty vector if the file cannot be
  // opened.
  std::vector<std::string> ReadVocabulary(const std::string vocab_filename) {
    std::vector<std::string> vocab;
    std::ifstream fin(vocab_filename);
    if (!fin) {
      std::cerr << "couldn't read vocabulary file '" << vocab_filename << "'"
                << std::endl;
      return vocab;
    }
    for (std::string token; std::getline(fin, token);) {
      // Keep the part before the tab. (Keeping the part from the tab onwards
      // would produce tokens that start with whitespace and so can never match
      // a word read from the corpus.)
      const auto tab = token.find('\t');
      if (tab != std::string::npos) token.erase(tab);
      vocab.push_back(token);
    }
    return vocab;
  }
  // Writes the vocabulary, one token per line, to both "row_vocab.txt" and
  // "col_vocab.txt" inside `output_dirname`. A file that cannot be created or
  // fully written is reported on stderr.
  void WriteVocabulary(const std::vector<std::string> &vocab,
                       const std::string &output_dirname) {
    for (const char *filename : {"row_vocab.txt", "col_vocab.txt"}) {
      const std::string path = output_dirname + "/" + filename;
      std::ofstream fout(path);
      // '\n' rather than std::endl: there is no need to flush after every
      // token.
      for (const auto &token : vocab) fout << token << '\n';
      // close() flushes; afterwards the stream state reflects open, write and
      // flush failures alike.
      fout.close();
      if (!fout) std::cerr << "couldn't write '" << path << "'" << std::endl;
    }
  }
  174. // Manages accumulation of co-occurrence data into temporary disk buffer files.
  175. class CoocBuffer {
  176. public:
  177. CoocBuffer(const std::string &output_dirname, const int num_shards,
  178. const int shard_size);
  179. // Accumulate the co-occurrence counts to the buffer.
  180. void AccumulateCoocs(const cooc_counts_t &coocs);
  181. // Read the buffer to produce shard files.
  182. void WriteShards();
  183. protected:
  184. // The output directory. Also used for temporary buffer files.
  185. const std::string output_dirname_;
  186. // The number of row/column shards.
  187. const int num_shards_;
  188. // The number of elements per shard.
  189. const int shard_size_;
  190. // Parallel arrays of temporary file paths and file descriptors.
  191. std::vector<std::string> paths_;
  192. std::vector<int> fds_;
  193. // Ensures that only one buffer file is getting written at a time.
  194. pthread_mutex_t writer_mutex_;
  195. };
  196. CoocBuffer::CoocBuffer(const std::string &output_dirname, const int num_shards,
  197. const int shard_size)
  198. : output_dirname_(output_dirname),
  199. num_shards_(num_shards),
  200. shard_size_(shard_size),
  201. writer_mutex_(PTHREAD_MUTEX_INITIALIZER) {
  202. for (int row = 0; row < num_shards_; ++row) {
  203. for (int col = 0; col < num_shards_; ++col) {
  204. char filename[256];
  205. sprintf(filename, "shard-%03d-%03d.tmp", row, col);
  206. std::string path = output_dirname + "/" + filename;
  207. int fd = open(path.c_str(), O_RDWR | O_CREAT | O_TRUNC, 0666);
  208. assert(fd > 0);
  209. paths_.push_back(path);
  210. fds_.push_back(fd);
  211. }
  212. }
  213. }
  214. void CoocBuffer::AccumulateCoocs(const cooc_counts_t &coocs) {
  215. std::vector<std::vector<cooc_t>> bufs(fds_.size());
  216. for (const auto &cooc : coocs) {
  217. const int row_id = cooc.first >> 32;
  218. const int col_id = cooc.first & 0xffffffff;
  219. const float cnt = cooc.second;
  220. const int row_shard = row_id % num_shards_;
  221. const int row_off = row_id / num_shards_;
  222. const int col_shard = col_id % num_shards_;
  223. const int col_off = col_id / num_shards_;
  224. const int top_shard_idx = row_shard * num_shards_ + col_shard;
  225. bufs[top_shard_idx].push_back(cooc_t{row_off, col_off, cnt});
  226. const int bot_shard_idx = col_shard * num_shards_ + row_shard;
  227. bufs[bot_shard_idx].push_back(cooc_t{col_off, row_off, cnt});
  228. }
  229. // XXX TODO: lock
  230. for (int i = 0; i < static_cast<int>(fds_.size()); ++i) {
  231. int rv = pthread_mutex_lock(&writer_mutex_);
  232. assert(rv == 0);
  233. const int nbytes = bufs[i].size() * sizeof(cooc_t);
  234. int nwritten = write(fds_[i], bufs[i].data(), nbytes);
  235. assert(nwritten == nbytes);
  236. pthread_mutex_unlock(&writer_mutex_);
  237. }
  238. }
  239. void CoocBuffer::WriteShards() {
  240. for (int shard = 0; shard < static_cast<int>(fds_.size()); ++shard) {
  241. const int row_shard = shard / num_shards_;
  242. const int col_shard = shard % num_shards_;
  243. std::cout << "\rwriting shard " << (shard + 1) << "/"
  244. << (num_shards_ * num_shards_);
  245. std::flush(std::cout);
  246. // Construct the tf::Example proto. First, we add the global rows and
  247. // column that are present in the shard.
  248. tensorflow::Example example;
  249. auto &feature = *example.mutable_features()->mutable_feature();
  250. auto global_row = feature["global_row"].mutable_int64_list();
  251. auto global_col = feature["global_col"].mutable_int64_list();
  252. for (int i = 0; i < shard_size_; ++i) {
  253. global_row->add_value(row_shard + i * num_shards_);
  254. global_col->add_value(col_shard + i * num_shards_);
  255. }
  256. // Next we add co-occurrences as a sparse representation. Map the
  257. // co-occurrence counts that we've spooled off to disk: these are in
  258. // arbitrary order and may contain duplicates.
  259. const off_t nbytes = lseek(fds_[shard], 0, SEEK_END);
  260. cooc_t *coocs = static_cast<cooc_t*>(
  261. mmap(0, nbytes, PROT_READ | PROT_WRITE, MAP_SHARED, fds_[shard], 0));
  262. const int ncoocs = nbytes / sizeof(cooc_t);
  263. cooc_t* cur = coocs;
  264. cooc_t* end = coocs + ncoocs;
  265. auto sparse_value = feature["sparse_value"].mutable_float_list();
  266. auto sparse_local_row = feature["sparse_local_row"].mutable_int64_list();
  267. auto sparse_local_col = feature["sparse_local_col"].mutable_int64_list();
  268. std::sort(cur, end, [](const cooc_t &a, const cooc_t &b) {
  269. return a.row < b.row || (a.row == b.row && a.col < b.col);
  270. });
  271. // Accumulate the counts into the protocol buffer.
  272. int last_row = -1, last_col = -1;
  273. float count = 0;
  274. for (; cur != end; ++cur) {
  275. if (cur->row != last_row || cur->col != last_col) {
  276. if (last_row >= 0 && last_col >= 0) {
  277. sparse_local_row->add_value(last_row);
  278. sparse_local_col->add_value(last_col);
  279. sparse_value->add_value(count);
  280. }
  281. last_row = cur->row;
  282. last_col = cur->col;
  283. count = 0;
  284. }
  285. count += cur->cnt;
  286. }
  287. if (last_row >= 0 && last_col >= 0) {
  288. sparse_local_row->add_value(last_row);
  289. sparse_local_col->add_value(last_col);
  290. sparse_value->add_value(count);
  291. }
  292. munmap(coocs, nbytes);
  293. close(fds_[shard]);
  294. // Write the protocol buffer as a binary blob to disk.
  295. char filename[256];
  296. snprintf(filename, sizeof(filename), "shard-%03d-%03d.pb", row_shard,
  297. col_shard);
  298. const std::string path = output_dirname_ + "/" + filename;
  299. int fd = open(path.c_str(), O_WRONLY | O_TRUNC | O_CREAT, 0666);
  300. assert(fd != -1);
  301. google::protobuf::io::FileOutputStream fout(fd);
  302. example.SerializeToZeroCopyStream(&fout);
  303. fout.Close();
  304. // Remove the temporary file.
  305. unlink(paths_[shard].c_str());
  306. }
  307. std::cout << std::endl;
  308. }
  309. // Counts the co-occurrences in part of the file.
  310. class CoocCounter {
  311. public:
  312. CoocCounter(const std::string &input_filename, const off_t start,
  313. const off_t end, const int window_size,
  314. const std::unordered_map<std::string, int> &token_to_id_map,
  315. CoocBuffer *coocbuf)
  316. : fin_(input_filename, std::ifstream::ate),
  317. start_(start),
  318. end_(end),
  319. window_size_(window_size),
  320. token_to_id_map_(token_to_id_map),
  321. coocbuf_(coocbuf),
  322. marginals_(token_to_id_map.size()) {}
  323. // PTthreads-friendly thunk to Count.
  324. static void* Run(void* param) {
  325. CoocCounter* self = static_cast<CoocCounter*>(param);
  326. self->Count();
  327. return nullptr;
  328. }
  329. // Counts the co-occurrences.
  330. void Count();
  331. const std::vector<double>& Marginals() const { return marginals_; }
  332. protected:
  333. // The input stream.
  334. std::ifstream fin_;
  335. // The range of the file to which this counter should attend.
  336. const off_t start_;
  337. const off_t end_;
  338. // The window size for computing co-occurrences.
  339. const int window_size_;
  340. // A reference to the mapping from tokens to IDs.
  341. const std::unordered_map<std::string, int> &token_to_id_map_;
  342. // The buffer into which counts are to be accumulated.
  343. CoocBuffer* coocbuf_;
  344. // The marginal counts accumulated by this counter.
  345. std::vector<double> marginals_;
  346. };
  347. void CoocCounter::Count() {
  348. const int max_coocs_size = 16 * 1024 * 1024;
  349. // A buffer of co-occurrence counts that we'll periodically sort into
  350. // shards.
  351. cooc_counts_t coocs;
  352. fin_.seekg(start_);
  353. int nlines = 0;
  354. for (off_t filepos = start_; filepos < end_; filepos = fin_.tellg()) {
  355. // Buffer a single sentence.
  356. std::vector<int> sentence;
  357. bool eos;
  358. do {
  359. std::string word;
  360. eos = NextWord(fin_, &word);
  361. auto it = token_to_id_map_.find(word);
  362. if (it != token_to_id_map_.end()) sentence.push_back(it->second);
  363. } while (!eos);
  364. // Generate the co-occurrences for the sentence.
  365. for (int pos = 0; pos < static_cast<int>(sentence.size()); ++pos) {
  366. const int left_id = sentence[pos];
  367. const int window_extent =
  368. std::min(static_cast<int>(sentence.size()) - pos, 1 + window_size_);
  369. for (int off = 1; off < window_extent; ++off) {
  370. const int right_id = sentence[pos + off];
  371. const double count = 1.0 / static_cast<double>(off);
  372. const long long lo = std::min(left_id, right_id);
  373. const long long hi = std::max(left_id, right_id);
  374. const long long key = (hi << 32) | lo;
  375. coocs[key] += count;
  376. marginals_[left_id] += count;
  377. marginals_[right_id] += count;
  378. }
  379. marginals_[left_id] += 1.0;
  380. const long long key = (static_cast<long long>(left_id) << 32) |
  381. static_cast<long long>(left_id);
  382. coocs[key] += 0.5;
  383. }
  384. // Periodically flush the co-occurrences to disk.
  385. if (coocs.size() > max_coocs_size) {
  386. coocbuf_->AccumulateCoocs(coocs);
  387. coocs.clear();
  388. }
  389. if (start_ == 0 && ++nlines % 1000 == 0) {
  390. const double pct = 100.0 * filepos / end_;
  391. fprintf(stdout, "\rComputing co-occurrences: %0.1f%% complete...", pct);
  392. std::flush(std::cout);
  393. }
  394. }
  395. // Accumulate anything we haven't flushed yet.
  396. coocbuf_->AccumulateCoocs(coocs);
  397. if (start_ == 0) std::cout << "done." << std::endl;
  398. }
  // Writes the marginal sums, one fixed-notation value per line, to both
  // "row_sums.txt" and "col_sums.txt" inside `output_dirname`. A file that
  // cannot be created or fully written is reported on stderr.
  void WriteMarginals(const std::vector<double> &marginals,
                      const std::string &output_dirname) {
    for (const char *filename : {"row_sums.txt", "col_sums.txt"}) {
      const std::string path = output_dirname + "/" + filename;
      std::ofstream fout(path);
      fout.setf(std::ios::fixed);
      // '\n' rather than std::endl: there is no need to flush after every
      // value.
      for (const double sum : marginals) fout << sum << '\n';
      // close() flushes; afterwards the stream state reflects open, write and
      // flush failures alike.
      fout.close();
      if (!fout) std::cerr << "couldn't write '" << path << "'" << std::endl;
    }
  }
  407. int main(int argc, char *argv[]) {
  408. std::string input_filename;
  409. std::string vocab_filename;
  410. std::string output_dirname;
  411. bool generate_vocab = true;
  412. int max_vocab_size = 100 * 1024;
  413. int min_vocab_count = 5;
  414. int window_size = 10;
  415. int shard_size = 4096;
  416. int num_threads = 4;
  417. for (int i = 1; i < argc; ++i) {
  418. std::string arg(argv[i]);
  419. if (arg == "--vocab") {
  420. if (++i >= argc) goto argmissing;
  421. generate_vocab = false;
  422. vocab_filename = argv[i];
  423. } else if (arg == "--max_vocab") {
  424. if (++i >= argc) goto argmissing;
  425. if ((max_vocab_size = atoi(argv[i])) <= 0) goto badarg;
  426. } else if (arg == "--min_count") {
  427. if (++i >= argc) goto argmissing;
  428. if ((min_vocab_count = atoi(argv[i])) <= 0) goto badarg;
  429. } else if (arg == "--window_size") {
  430. if (++i >= argc) goto argmissing;
  431. if ((window_size = atoi(argv[i])) <= 0) goto badarg;
  432. } else if (arg == "--input") {
  433. if (++i >= argc) goto argmissing;
  434. input_filename = argv[i];
  435. } else if (arg == "--output_dir") {
  436. if (++i >= argc) goto argmissing;
  437. output_dirname = argv[i];
  438. } else if (arg == "--shard_size") {
  439. if (++i >= argc) goto argmissing;
  440. shard_size = atoi(argv[i]);
  441. } else if (arg == "--num_threads") {
  442. if (++i >= argc) goto argmissing;
  443. num_threads = atoi(argv[i]);
  444. } else if (arg == "--help") {
  445. std::cout << usage << std::endl;
  446. return 0;
  447. } else {
  448. std::cerr << "unknown arg '" << arg << "'; try --help?" << std::endl;
  449. return 2;
  450. }
  451. continue;
  452. badarg:
  453. std::cerr << "'" << argv[i] << "' is not a valid value for '" << arg
  454. << "'; try --help?" << std::endl;
  455. return 2;
  456. argmissing:
  457. std::cerr << arg << " requires an argument; try --help?" << std::endl;
  458. }
  459. if (input_filename.empty()) {
  460. std::cerr << "please specify the input text with '--input'; try --help?"
  461. << std::endl;
  462. return 2;
  463. }
  464. if (output_dirname.empty()) {
  465. std::cerr << "please specify the output directory with '--output_dir'"
  466. << std::endl;
  467. return 2;
  468. }
  469. struct stat sb;
  470. if (lstat(output_dirname.c_str(), &sb) != 0 || !S_ISDIR(sb.st_mode)) {
  471. std::cerr << "output directory '" << output_dirname
  472. << "' does not exist of is not a directory." << std::endl;
  473. return 1;
  474. }
  475. if (lstat(input_filename.c_str(), &sb) != 0 || !S_ISREG(sb.st_mode)) {
  476. std::cerr << "input file '" << input_filename
  477. << "' does not exist or is not a file." << std::endl;
  478. return 1;
  479. }
  480. // The total size of the input.
  481. const off_t input_size = sb.st_size;
  482. const std::vector<std::string> vocab =
  483. generate_vocab ? CreateVocabulary(input_filename, shard_size,
  484. min_vocab_count, max_vocab_size)
  485. : ReadVocabulary(vocab_filename);
  486. if (!vocab.size()) {
  487. std::cerr << "Empty vocabulary." << std::endl;
  488. return 1;
  489. }
  490. std::cout << "Generating Swivel co-occurrence data into " << output_dirname
  491. << std::endl;
  492. std::cout << "Shard size: " << shard_size << "x" << shard_size << std::endl;
  493. std::cout << "Vocab size: " << vocab.size() << std::endl;
  494. // Write the vocabulary files into the output directory.
  495. WriteVocabulary(vocab, output_dirname);
  496. const int num_shards = vocab.size() / shard_size;
  497. CoocBuffer coocbuf(output_dirname, num_shards, shard_size);
  498. // Build a mapping from the token to its position in the vocabulary file.
  499. std::unordered_map<std::string, int> token_to_id_map;
  500. for (int i = 0; i < static_cast<int>(vocab.size()); ++i)
  501. token_to_id_map[vocab[i]] = i;
  502. // Compute the co-occurrences
  503. std::vector<pthread_t> threads;
  504. std::vector<CoocCounter*> counters;
  505. const off_t nbytes_per_thread = input_size / num_threads;
  506. pthread_attr_t attr;
  507. if (pthread_attr_init(&attr) != 0) {
  508. std::cerr << "unable to initalize pthreads" << std::endl;
  509. return 1;
  510. }
  511. for (int i = 0; i < num_threads; ++i) {
  512. // We could make this smarter and look around for newlines. But
  513. // realistically that's not going to change things much.
  514. const off_t start = i * nbytes_per_thread;
  515. const off_t end =
  516. i < num_threads - 1 ? (i + 1) * nbytes_per_thread : input_size;
  517. CoocCounter *counter = new CoocCounter(
  518. input_filename, start, end, window_size, token_to_id_map, &coocbuf);
  519. counters.push_back(counter);
  520. pthread_t thread;
  521. pthread_create(&thread, &attr, CoocCounter::Run, counter);
  522. threads.push_back(thread);
  523. }
  524. // Wait for threads to finish and collect marginals.
  525. std::vector<double> marginals(vocab.size());
  526. for (int i = 0; i < num_threads; ++i) {
  527. pthread_join(threads[i], 0);
  528. const std::vector<double>& counter_marginals = counters[i]->Marginals();
  529. for (int j = 0; j < static_cast<int>(vocab.size()); ++j)
  530. marginals[j] += counter_marginals[j];
  531. delete counters[i];
  532. }
  533. std::cout << "writing marginals..." << std::endl;
  534. WriteMarginals(marginals, output_dirname);
  535. std::cout << "writing shards..." << std::endl;
  536. coocbuf.WriteShards();
  537. return 0;
  538. }