// thindexwriteslave.cpp
/*##############################################################################

    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */
  13. #include "jio.hpp"
  14. #include "jfile.hpp"
  15. #include "jtime.hpp"
  16. #include "jfile.ipp"
  17. #include "thbuf.hpp"
  18. #include "slave.ipp"
  19. #include "thexception.hpp"
  20. #include "thmfilemanager.hpp"
  21. #include "thactivityutil.ipp"
  22. #include "keybuild.hpp"
  23. #include "thbufdef.hpp"
  24. #include "backup.hpp"
  25. #include "rtldynfield.hpp"
  // Serialized-size threshold (bytes) at which a non-first node stops adding
  // rows to a single reply when feeding a single-part key build.
  static constexpr unsigned SINGLEPART_KEY_TRANSFER_SIZE = 0x10000;

  // Row-count cap, in units of 0x100000 rows, above which a single-part
  // ('FEW') index build raises a warning.
  static constexpr int FEWWARNCAP = 10;
  28. class IndexWriteSlaveActivity : public ProcessSlaveActivity, public ILookAheadStopNotify, implements ICopyFileProgress, implements IBlobCreator
  29. {
  30. typedef ProcessSlaveActivity PARENT;
  31. StringAttr logicalFilename;
  32. Owned<IPartDescriptor> partDesc, tlkDesc;
  33. IHThorIndexWriteArg *helper;
  34. Owned <IKeyBuilder> builder;
  35. Owned<IRowStream> myInputStream;
  36. Owned<IPropertyTree> metadata;
  37. Linked<IEngineRowAllocator> outRowAllocator;
  38. bool buildTlk, active;
  39. bool sizeSignalled;
  40. bool isLocal, singlePartKey, reportOverflow, fewcapwarned, refactor;
  41. unsigned __int64 totalCount;
  42. size32_t maxDiskRecordSize, lastRowSize, firstRowSize;
  43. MemoryBuffer rowBuff;
  44. OwnedConstThorRow lastRow, firstRow;
  45. bool needFirstRow, enableTlkPart0, receivingTag2;
  46. unsigned replicateDone;
  47. Owned<IFile> existingTlkIFile;
  48. unsigned partCrc, tlkCrc;
  49. mptag_t mpTag2;
  50. Owned<IRowServer> rowServer;
  51. void init()
  52. {
  53. sizeSignalled = false;
  54. totalCount = 0;
  55. lastRowSize = firstRowSize = 0;
  56. replicateDone = 0;
  57. fewcapwarned = false;
  58. needFirstRow = true;
  59. receivingTag2 = false;
  60. }
  61. public:
  62. IMPLEMENT_IINTERFACE_USING(PARENT);
  63. IndexWriteSlaveActivity(CGraphElementBase *_container) : ProcessSlaveActivity(_container)
  64. {
  65. helper = static_cast <IHThorIndexWriteArg *> (queryHelper());
  66. init();
  67. maxDiskRecordSize = 0;
  68. active = false;
  69. isLocal = false;
  70. buildTlk = true;
  71. singlePartKey = false;
  72. refactor = false;
  73. enableTlkPart0 = (0 != container.queryJob().getWorkUnitValueInt("enableTlkPart0", globals->getPropBool("@enableTlkPart0", true)));
  74. reInit = (0 != (TIWvarfilename & helper->getFlags()));
  75. }
  76. virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) override
  77. {
  78. isLocal = 0 != (TIWlocal & helper->getFlags());
  79. mpTag = container.queryJobChannel().deserializeMPTag(data);
  80. mpTag2 = container.queryJobChannel().deserializeMPTag(data);
  81. data.read(active);
  82. if (active)
  83. {
  84. data.read(logicalFilename);
  85. partDesc.setown(deserializePartFileDescriptor(data));
  86. }
  87. data.read(singlePartKey);
  88. data.read(refactor);
  89. if (singlePartKey)
  90. buildTlk = false;
  91. else
  92. {
  93. data.read(buildTlk);
  94. if (firstNode())
  95. {
  96. if (buildTlk)
  97. tlkDesc.setown(deserializePartFileDescriptor(data));
  98. else if (!isLocal) // existing tlk then..
  99. {
  100. tlkDesc.setown(deserializePartFileDescriptor(data));
  101. unsigned c;
  102. data.read(c);
  103. while (c--)
  104. {
  105. RemoteFilename rf;
  106. rf.deserialize(data);
  107. if (!existingTlkIFile)
  108. {
  109. Owned<IFile> iFile = createIFile(rf);
  110. if (iFile->exists())
  111. existingTlkIFile.set(iFile);
  112. }
  113. }
  114. if (!existingTlkIFile)
  115. throw MakeActivityException(this, TE_FileNotFound, "Top level key part does not exist, for key");
  116. }
  117. }
  118. }
  119. IOutputMetaData * diskSize = helper->queryDiskRecordSize();
  120. //Need to adjust the size if the last field is used in the special fileposition location.
  121. size32_t fileposSize = hasTrailingFileposition(diskSize->queryTypeInfo()) ? sizeof(offset_t) : 0;
  122. assertex(!(diskSize->getMetaFlags() & MDFneedserializedisk));
  123. if (diskSize->isVariableSize())
  124. {
  125. if (TIWmaxlength & helper->getFlags())
  126. maxDiskRecordSize = helper->getMaxKeySize();
  127. else
  128. maxDiskRecordSize = KEYBUILD_MAXLENGTH; //Current default behaviour, could be improved in the future
  129. }
  130. else
  131. maxDiskRecordSize = diskSize->getFixedSize() - fileposSize;
  132. reportOverflow = false;
  133. }
  134. virtual void setInputStream(unsigned index, CThorInput &_input, bool consumerOrdered) override
  135. {
  136. PARENT::setInputStream(index, _input, consumerOrdered);
  137. // JCSMORE - not sure why you ever want a look ahead on a sink like this?
  138. if (!isFastThrough(input))
  139. setLookAhead(0, createRowStreamLookAhead(this, inputStream, queryRowInterfaces(input), INDEXWRITE_SMART_BUFFER_SIZE, true, false, RCUNBOUND, this, &container.queryJob().queryIDiskUsage()));
  140. }
  141. void open(IPartDescriptor &partDesc, bool isTopLevel, bool isVariable)
  142. {
  143. StringBuffer partFname;
  144. getPartFilename(partDesc, 0, partFname);
  145. bool compress=false;
  146. OwnedIFileIO iFileIO = createMultipleWrite(this, partDesc, 0, TW_RenameToPrimary, compress, NULL, this, &abortSoon);
  147. Owned<IFileIOStream> out = createBufferedIOStream(iFileIO);
  148. ActPrintLog("INDEXWRITE: created fixed output stream %s", partFname.str());
  149. unsigned flags = COL_PREFIX;
  150. if (TIWrowcompress & helper->getFlags())
  151. flags |= HTREE_COMPRESSED_KEY|HTREE_QUICK_COMPRESSED_KEY;
  152. else if (!(TIWnolzwcompress & helper->getFlags()))
  153. flags |= HTREE_COMPRESSED_KEY;
  154. if (!isLocal)
  155. flags |= HTREE_FULLSORT_KEY;
  156. if (isVariable)
  157. flags |= HTREE_VARSIZE;
  158. buildUserMetadata(metadata);
  159. buildLayoutMetadata(metadata);
  160. unsigned nodeSize = metadata ? metadata->getPropInt("_nodeSize", NODESIZE) : NODESIZE;
  161. builder.setown(createKeyBuilder(out, flags, maxDiskRecordSize, nodeSize, helper->getKeyedSize(), isTopLevel ? 0 : totalCount));
  162. }
  163. void buildUserMetadata(Owned<IPropertyTree> & metadata)
  164. {
  165. size32_t nameLen;
  166. char * nameBuff;
  167. size32_t valueLen;
  168. char * valueBuff;
  169. unsigned idx = 0;
  170. while(helper->getIndexMeta(nameLen, nameBuff, valueLen, valueBuff, idx++))
  171. {
  172. StringBuffer name(nameLen, nameBuff);
  173. StringBuffer value(valueLen, valueBuff);
  174. if(*nameBuff == '_' && strcmp(name, "_nodeSize") != 0)
  175. throw MakeActivityException(this, 0, "Invalid name %s in user metadata for index %s (names beginning with underscore are reserved)", name.str(), logicalFilename.get());
  176. if(!validateXMLTag(name.str()))
  177. throw MakeActivityException(this, 0, "Invalid name %s in user metadata for index %s (not legal XML element name)", name.str(), logicalFilename.get());
  178. if(!metadata) metadata.setown(createPTree("metadata"));
  179. metadata->setProp(name.str(), value.str());
  180. }
  181. }
  182. void buildLayoutMetadata(Owned<IPropertyTree> & metadata)
  183. {
  184. if(!metadata) metadata.setown(createPTree("metadata"));
  185. metadata->setProp("_record_ECL", helper->queryRecordECL());
  186. if (helper->queryDiskRecordSize()->queryTypeInfo())
  187. {
  188. MemoryBuffer out;
  189. dumpTypeInfo(out, helper->queryDiskRecordSize()->queryTypeInfo());
  190. metadata->setPropBin("_rtlType", out.length(), out.toByteArray());
  191. }
  192. }
  193. void close(IPartDescriptor &partDesc, unsigned &crc)
  194. {
  195. StringBuffer partFname;
  196. getPartFilename(partDesc, 0, partFname);
  197. Owned<IException> e;
  198. try
  199. {
  200. if (builder)
  201. builder->finish(metadata, &crc);
  202. }
  203. catch (IException *_e)
  204. {
  205. ActPrintLog(_e, "Error closing file: %s", partFname.str());
  206. abortSoon = true;
  207. e.setown(_e);
  208. }
  209. catch (CATCHALL)
  210. {
  211. abortSoon = true;
  212. e.setown(MakeActivityException(this, 0, "INDEXWRITE: Error closing file: %s - unknown exception", partFname.str()));
  213. }
  214. try
  215. {
  216. metadata.clear();
  217. builder.clear();
  218. }
  219. catch (IException *_e)
  220. {
  221. ActPrintLog(_e, "Error closing file: %s", partFname.str());
  222. _e->Release();
  223. }
  224. if (abortSoon)
  225. removeFiles(partDesc);
  226. if (e)
  227. throw LINK(e);
  228. }
  229. void removeFiles(IPartDescriptor &partDesc)
  230. {
  231. StringBuffer partFname;
  232. getPartFilename(partDesc, 0, partFname);
  233. Owned<IFile> primary = createIFile(partFname.str());
  234. try { primary->remove(); }
  235. catch (IException *e) { ActPrintLog(e, "Failed to remove file: %s", partFname.str()); e->Release(); }
  236. catch (CATCHALL) { ActPrintLog("Failed to remove: %s", partFname.str()); }
  237. }
  238. virtual unsigned __int64 createBlob(size32_t size, const void * ptr)
  239. {
  240. return builder->createBlob(size, (const char *) ptr);
  241. }
  242. virtual void process() override
  243. {
  244. ActPrintLog("INDEXWRITE: Start");
  245. init();
  246. IRowStream *stream = inputStream;
  247. ThorDataLinkMetaInfo info;
  248. input->getMetaInfo(info);
  249. outRowAllocator.setown(getRowAllocator(helper->queryDiskRecordSize()));
  250. start();
  251. if (refactor)
  252. {
  253. assertex(isLocal);
  254. if (active)
  255. {
  256. unsigned targetWidth = partDesc->queryOwner().numParts()-(buildTlk?1:0);
  257. assertex(0 == container.queryJob().querySlaves() % targetWidth);
  258. unsigned partsPerNode = container.queryJob().querySlaves() / targetWidth;
  259. unsigned myPart = queryJobChannel().queryMyRank();
  260. IArrayOf<IRowStream> streams;
  261. streams.append(*LINK(stream));
  262. --partsPerNode;
  263. // Should this be merging 1,11,21,31 etc.
  264. unsigned p=0;
  265. unsigned fromPart = targetWidth+1 + (partsPerNode * (myPart-1));
  266. for (; p<partsPerNode; p++)
  267. {
  268. streams.append(*createRowStreamFromNode(*this, fromPart++, queryJobChannel().queryJobComm(), mpTag, abortSoon));
  269. }
  270. ICompare *icompare = helper->queryCompare();
  271. assertex(icompare);
  272. Owned<IRowLinkCounter> linkCounter = new CThorRowLinkCounter;
  273. myInputStream.setown(createRowStreamMerger(streams.ordinality(), streams.getArray(), icompare, false, linkCounter));
  274. stream = myInputStream;
  275. }
  276. else // serve nodes, creating merged parts
  277. rowServer.setown(createRowServer(this, stream, queryJobChannel().queryJobComm(), mpTag));
  278. }
  279. processed = THORDATALINK_STARTED;
  280. // single part key support
  281. // has to serially pull all data fron nodes 2-N
  282. // nodes 2-N, could/should start pushing some data (as it's supposed to be small) to cut down on serial nature.
  283. unsigned node = queryJobChannel().queryMyRank();
  284. if (singlePartKey)
  285. {
  286. if (1 == node)
  287. {
  288. try
  289. {
  290. open(*partDesc, false, helper->queryDiskRecordSize()->isVariableSize());
  291. for (;;)
  292. {
  293. OwnedConstThorRow row = inputStream->ungroupedNextRow();
  294. if (!row)
  295. break;
  296. if (abortSoon) return;
  297. processRow(row);
  298. }
  299. unsigned node = 2;
  300. while (node <= container.queryJob().querySlaves())
  301. {
  302. Linked<IOutputRowDeserializer> deserializer = ::queryRowDeserializer(input);
  303. CMessageBuffer mb;
  304. Owned<ISerialStream> stream = createMemoryBufferSerialStream(mb);
  305. CThorStreamDeserializerSource rowSource;
  306. rowSource.setStream(stream);
  307. bool successSR;
  308. for (;;)
  309. {
  310. {
  311. BooleanOnOff tf(receivingTag2);
  312. successSR = queryJobChannel().queryJobComm().sendRecv(mb, node, mpTag2);
  313. }
  314. if (successSR)
  315. {
  316. if (rowSource.eos())
  317. break;
  318. Linked<IEngineRowAllocator> allocator = ::queryRowAllocator(input);
  319. do
  320. {
  321. RtlDynamicRowBuilder rowBuilder(allocator);
  322. size32_t sz = deserializer->deserialize(rowBuilder, rowSource);
  323. OwnedConstThorRow fRow = rowBuilder.finalizeRowClear(sz);
  324. processRow(fRow);
  325. }
  326. while (!rowSource.eos());
  327. }
  328. }
  329. node++;
  330. }
  331. }
  332. catch (CATCHALL)
  333. {
  334. close(*partDesc, partCrc);
  335. throw;
  336. }
  337. close(*partDesc, partCrc);
  338. stop();
  339. }
  340. else
  341. {
  342. CMessageBuffer mb;
  343. CMemoryRowSerializer mbs(mb);
  344. Linked<IOutputRowSerializer> serializer = ::queryRowSerializer(input);
  345. for (;;)
  346. {
  347. BooleanOnOff tf(receivingTag2);
  348. if (queryJobChannel().queryJobComm().recv(mb, 1, mpTag2)) // node 1 asking for more..
  349. {
  350. if (abortSoon) break;
  351. mb.clear();
  352. do
  353. {
  354. OwnedConstThorRow row = inputStream->ungroupedNextRow();
  355. if (!row) break;
  356. serializer->serialize(mbs, (const byte *)row.get());
  357. } while (mb.length() < SINGLEPART_KEY_TRANSFER_SIZE); // NB: at least one row
  358. if (!queryJobChannel().queryJobComm().reply(mb))
  359. throw MakeThorException(0, "Failed to send index data to node 1, from node %d", node);
  360. if (0 == mb.length())
  361. break;
  362. }
  363. }
  364. }
  365. }
  366. else
  367. {
  368. if (!refactor || active)
  369. {
  370. try
  371. {
  372. StringBuffer partFname;
  373. getPartFilename(*partDesc, 0, partFname);
  374. ActPrintLog("INDEXWRITE: process: handling fname : %s", partFname.str());
  375. open(*partDesc, false, helper->queryDiskRecordSize()->isVariableSize());
  376. ActPrintLog("INDEXWRITE: write");
  377. BooleanOnOff tf(receiving);
  378. if (!refactor || !active)
  379. receiving = false;
  380. do
  381. {
  382. OwnedConstThorRow row = inputStream->ungroupedNextRow();
  383. if (!row)
  384. break;
  385. processRow(row);
  386. } while (!abortSoon);
  387. ActPrintLog("INDEXWRITE: write level 0 complete");
  388. }
  389. catch (CATCHALL)
  390. {
  391. close(*partDesc, partCrc);
  392. throw;
  393. }
  394. close(*partDesc, partCrc);
  395. stop();
  396. ActPrintLog("INDEXWRITE: Wrote %" RCPF "d records", processed & THORDATALINK_COUNT_MASK);
  397. if (buildTlk)
  398. {
  399. ActPrintLog("INDEXWRITE: sending rows");
  400. NodeInfoArray tlkRows;
  401. CMessageBuffer msg;
  402. if (firstNode())
  403. {
  404. if (processed & THORDATALINK_COUNT_MASK)
  405. {
  406. if (enableTlkPart0)
  407. tlkRows.append(* new CNodeInfo(0, firstRow.get(), firstRowSize, totalCount));
  408. tlkRows.append(* new CNodeInfo(1, lastRow.get(), lastRowSize, totalCount));
  409. }
  410. }
  411. else
  412. {
  413. if (processed & THORDATALINK_COUNT_MASK)
  414. {
  415. CNodeInfo row(queryJobChannel().queryMyRank(), lastRow.get(), lastRowSize, totalCount);
  416. row.serialize(msg);
  417. }
  418. queryJobChannel().queryJobComm().send(msg, 1, mpTag);
  419. }
  420. if (firstNode())
  421. {
  422. ActPrintLog("INDEXWRITE: Waiting on tlk to complete");
  423. // JCSMORE if refactor==true, is rowsToReceive here right??
  424. unsigned rowsToReceive = (refactor ? (tlkDesc->queryOwner().numParts()-1) : container.queryJob().querySlaves()) -1; // -1 'cos got my own in array already
  425. ActPrintLog("INDEXWRITE: will wait for info from %d slaves before writing TLK", rowsToReceive);
  426. while (rowsToReceive--)
  427. {
  428. msg.clear();
  429. receiveMsg(msg, RANK_ALL, mpTag); // NH->JCS RANK_ALL_OTHER not supported for recv
  430. if (abortSoon)
  431. return;
  432. if (msg.length())
  433. {
  434. CNodeInfo *ni = new CNodeInfo();
  435. ni->deserialize(msg);
  436. tlkRows.append(*ni);
  437. }
  438. }
  439. tlkRows.sort(CNodeInfo::compare);
  440. StringBuffer path;
  441. getPartFilename(*tlkDesc, 0, path);
  442. ActPrintLog("INDEXWRITE: creating toplevel key file : %s", path.str());
  443. try
  444. {
  445. open(*tlkDesc, true, helper->queryDiskRecordSize()->isVariableSize());
  446. if (tlkRows.length())
  447. {
  448. CNodeInfo &lastNode = tlkRows.item(tlkRows.length()-1);
  449. memset(lastNode.value, 0xff, lastNode.size);
  450. }
  451. ForEachItemIn(idx, tlkRows)
  452. {
  453. CNodeInfo &info = tlkRows.item(idx);
  454. builder->processKeyData((char *)info.value, info.pos, info.size);
  455. }
  456. close(*tlkDesc, tlkCrc);
  457. }
  458. catch (CATCHALL)
  459. {
  460. abortSoon = true;
  461. close(*tlkDesc, tlkCrc);
  462. removeFiles(*partDesc);
  463. throw;
  464. }
  465. }
  466. }
  467. else if (!isLocal && firstNode())
  468. {
  469. // if !buildTlk - then copy provided index's tlk.
  470. unsigned l;
  471. for (l=0; l<tlkDesc->numCopies(); l++)
  472. {
  473. StringBuffer path;
  474. getPartFilename(*tlkDesc, l, path, true);
  475. if (0 == l)
  476. {
  477. OwnedIFile dstIFile = createIFile(path.str());
  478. copyFile(dstIFile, existingTlkIFile);
  479. }
  480. else
  481. doReplicate(this, *tlkDesc, NULL);
  482. }
  483. }
  484. }
  485. ActPrintLog("INDEXWRITE: All done");
  486. }
  487. }
  488. virtual void endProcess() override
  489. {
  490. if (processed & THORDATALINK_STARTED)
  491. {
  492. if (!inputStopped) // probably already stopped in process()
  493. stop();
  494. processed |= THORDATALINK_STOPPED;
  495. }
  496. inputStream = NULL;
  497. }
  498. virtual void abort() override
  499. {
  500. PARENT::abort();
  501. cancelReceiveMsg(RANK_ALL, mpTag);
  502. if (receivingTag2)
  503. queryJobChannel().queryJobComm().cancel(RANK_ALL, mpTag2);
  504. if (rowServer)
  505. rowServer->stop();
  506. }
  507. virtual void kill() override
  508. {
  509. PARENT::kill();
  510. if (abortSoon)
  511. {
  512. if (partDesc)
  513. removeFiles(*partDesc);
  514. if (tlkDesc.get())
  515. removeFiles(*tlkDesc);
  516. }
  517. }
  518. virtual void processDone(MemoryBuffer &mb) override
  519. {
  520. builder.clear();
  521. if (refactor && !active)
  522. return;
  523. rowcount_t _processed = processed & THORDATALINK_COUNT_MASK;
  524. mb.append(_processed);
  525. if (!singlePartKey || firstNode())
  526. {
  527. StringBuffer partFname;
  528. getPartFilename(*partDesc, 0, partFname);
  529. Owned<IFile> ifile = createIFile(partFname.str());
  530. offset_t sz = ifile->size();
  531. mb.append(sz);
  532. if (-1 != sz)
  533. container.queryJob().queryIDiskUsage().increase(sz);
  534. CDateTime createTime, modifiedTime, accessedTime;
  535. ifile->getTime(&createTime, &modifiedTime, &accessedTime);
  536. modifiedTime.serialize(mb);
  537. mb.append(partCrc);
  538. if (!singlePartKey && firstNode() && buildTlk)
  539. {
  540. mb.append(tlkCrc);
  541. StringBuffer path;
  542. getPartFilename(*tlkDesc, 0, path);
  543. ifile.setown(createIFile(path.str()));
  544. sz = ifile->size();
  545. mb.append(sz);
  546. if (-1 != sz)
  547. container.queryJob().queryIDiskUsage().increase(sz);
  548. ifile->getTime(&createTime, &modifiedTime, &accessedTime);
  549. modifiedTime.serialize(mb);
  550. }
  551. }
  552. }
  553. inline void processRow(const void *row)
  554. {
  555. // Extract the file position and insert the sequence number and other rollups...
  556. unsigned __int64 fpos;
  557. RtlDynamicRowBuilder lastRowBuilder(outRowAllocator);
  558. lastRowSize = helper->transform(lastRowBuilder, row, this, fpos);
  559. lastRow.setown(lastRowBuilder.finalizeRowClear(lastRowSize));
  560. // NB: result of transform is serialized
  561. if (enableTlkPart0 && needFirstRow)
  562. {
  563. needFirstRow = false;
  564. firstRow.set(lastRow);
  565. firstRowSize = lastRowSize;
  566. }
  567. if (reportOverflow && totalCount == I64C(0x100000000))
  568. {
  569. Owned<IThorException> e = MakeActivityWarning(this, TE_MoxieIndarOverflow, "Moxie indar sequence number has overflowed");
  570. fireException(e);
  571. reportOverflow = false;
  572. }
  573. builder->processKeyData((const char *)lastRow.get(), fpos, lastRowSize);
  574. processed++;
  575. totalCount++;
  576. if (singlePartKey && !fewcapwarned && totalCount>(FEWWARNCAP*0x100000))
  577. {
  578. fewcapwarned = true;
  579. Owned<IThorException> e = MakeActivityWarning(this, TE_BuildIndexFewExcess, "BUILDINDEX: building single part key because marked as 'FEW' but row count in excess of %dM", FEWWARNCAP);
  580. fireException(e);
  581. }
  582. }
  583. virtual void onInputFinished(rowcount_t finalcount)
  584. {
  585. if (!sizeSignalled)
  586. {
  587. sizeSignalled = true;
  588. ActPrintLog("finished input %" RCPF "d", finalcount);
  589. }
  590. }
  591. virtual void serializeStats(MemoryBuffer &mb)
  592. {
  593. PARENT::serializeStats(mb);
  594. mb.append(replicateDone);
  595. }
  596. // ICopyFileProgress
  597. CFPmode onProgress(unsigned __int64 sizeDone, unsigned __int64 totalSize)
  598. {
  599. replicateDone = sizeDone ? ((unsigned)(sizeDone*100/totalSize)) : 0;
  600. return abortSoon?CFPstop:CFPcontinue;
  601. }
  602. };
  603. CActivityBase *createIndexWriteSlave(CGraphElementBase *container)
  604. {
  605. return new IndexWriteSlaveActivity(container);
  606. }