rtlds.cpp 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "platform.h"
  15. #include "jliball.hpp"
  16. #include "eclrtl.hpp"
  17. #include "eclhelper.hpp"
  18. #include "rtlds_imp.hpp"
  19. #include "rtlread_imp.hpp"
  20. #define FIRST_CHUNK_SIZE 0x100
  21. #define DOUBLE_LIMIT 0x10000 // must be a power of 2
  22. unsigned getNextSize(unsigned max, unsigned required)
  23. {
  24. if (required > DOUBLE_LIMIT)
  25. {
  26. max = (required + DOUBLE_LIMIT) & ~(DOUBLE_LIMIT-1);
  27. if (required >= max)
  28. throw MakeStringException(-1, "getNextSize: Request for %d bytes oldMax = %d", required, max);
  29. }
  30. else
  31. {
  32. if (max == 0)
  33. max = FIRST_CHUNK_SIZE;
  34. while (required >= max)
  35. max += max;
  36. }
  37. return max;
  38. }
  39. //---------------------------------------------------------------------------
  40. RtlDatasetBuilder::RtlDatasetBuilder()
  41. {
  42. maxSize = 0;
  43. buffer = NULL;
  44. totalSize = 0;
  45. }
  46. RtlDatasetBuilder::~RtlDatasetBuilder()
  47. {
  48. free(buffer);
  49. }
  50. void RtlDatasetBuilder::ensure(size32_t required)
  51. {
  52. if (required > maxSize)
  53. {
  54. maxSize = getNextSize(maxSize, required);
  55. buffer = (byte *)realloc(buffer, maxSize);
  56. if (!buffer)
  57. throw MakeStringException(-1, "Failed to allocate temporary dataset (requesting %d bytes)", maxSize);
  58. }
  59. }
  60. byte * RtlDatasetBuilder::ensureCapacity(size32_t required, const char * fieldName)
  61. {
  62. ensure(totalSize + required);
  63. return buffer + totalSize;
  64. }
  65. void RtlDatasetBuilder::flushDataset()
  66. {
  67. }
  68. void RtlDatasetBuilder::getData(size32_t & len, void * & data)
  69. {
  70. flushDataset();
  71. len = totalSize;
  72. data = malloc(totalSize);
  73. memcpy(data, buffer, totalSize);
  74. }
  75. size32_t RtlDatasetBuilder::getSize()
  76. {
  77. flushDataset();
  78. return totalSize;
  79. }
  80. byte * RtlDatasetBuilder::queryData()
  81. {
  82. flushDataset();
  83. return buffer;
  84. }
  85. void RtlDatasetBuilder::reportMissingRow() const
  86. {
  87. throw MakeStringException(MSGAUD_user, 1000, "RtlDatasetBuilder::row() is NULL");
  88. }
  89. //---------------------------------------------------------------------------
  90. RtlFixedDatasetBuilder::RtlFixedDatasetBuilder(unsigned _recordSize, unsigned maxRows)
  91. {
  92. recordSize = _recordSize;
  93. if ((int)maxRows > 0)
  94. ensure(recordSize * maxRows);
  95. }
  96. byte * RtlFixedDatasetBuilder::createSelf()
  97. {
  98. self = ensureCapacity(recordSize, NULL);
  99. return self;
  100. }
  101. //---------------------------------------------------------------------------
  102. RtlLimitedFixedDatasetBuilder::RtlLimitedFixedDatasetBuilder(unsigned _recordSize, unsigned _maxRows, DefaultRowCreator _rowCreator, IResourceContext *_ctx) : RtlFixedDatasetBuilder(_recordSize, _maxRows)
  103. {
  104. maxRows = _maxRows;
  105. if ((int)maxRows < 0) maxRows = 0;
  106. rowCreator = _rowCreator;
  107. ctx = _ctx;
  108. }
  109. byte * RtlLimitedFixedDatasetBuilder::createRow()
  110. {
  111. if (totalSize >= maxRows * recordSize)
  112. return NULL;
  113. return RtlFixedDatasetBuilder::createRow();
  114. }
  115. void RtlLimitedFixedDatasetBuilder::flushDataset()
  116. {
  117. if (rowCreator)
  118. {
  119. while (totalSize < maxRows * recordSize)
  120. {
  121. createRow();
  122. size32_t size = rowCreator(rowBuilder(), ctx);
  123. finalizeRow(size);
  124. }
  125. }
  126. RtlFixedDatasetBuilder::flushDataset();
  127. }
  128. //---------------------------------------------------------------------------
  129. RtlVariableDatasetBuilder::RtlVariableDatasetBuilder(IRecordSize & _recordSize)
  130. {
  131. recordSize = &_recordSize;
  132. maxRowSize = recordSize->getRecordSize(NULL); // initial size
  133. }
  134. byte * RtlVariableDatasetBuilder::createSelf()
  135. {
  136. self = ensureCapacity(maxRowSize, NULL);
  137. return self;
  138. }
  139. void RtlVariableDatasetBuilder::deserializeRow(IOutputRowDeserializer & deserializer, IRowDeserializerSource & in)
  140. {
  141. createRow();
  142. size32_t rowSize = deserializer.deserialize(rowBuilder(), in);
  143. finalizeRow(rowSize);
  144. }
  145. //---------------------------------------------------------------------------
  146. RtlLimitedVariableDatasetBuilder::RtlLimitedVariableDatasetBuilder(IRecordSize & _recordSize, unsigned _maxRows, DefaultRowCreator _rowCreator, IResourceContext *_ctx) : RtlVariableDatasetBuilder(_recordSize)
  147. {
  148. numRows = 0;
  149. maxRows = _maxRows;
  150. rowCreator = _rowCreator;
  151. ctx = _ctx;
  152. }
  153. byte * RtlLimitedVariableDatasetBuilder::createRow()
  154. {
  155. if (numRows >= maxRows)
  156. return NULL;
  157. numRows++;
  158. return RtlVariableDatasetBuilder::createRow();
  159. }
  160. void RtlLimitedVariableDatasetBuilder::flushDataset()
  161. {
  162. if (rowCreator)
  163. {
  164. while (numRows < maxRows)
  165. {
  166. createRow();
  167. size32_t thisSize = rowCreator(rowBuilder(), ctx);
  168. finalizeRow(thisSize);
  169. }
  170. }
  171. RtlVariableDatasetBuilder::flushDataset();
  172. }
  173. //---------------------------------------------------------------------------
  174. byte * * rtlRowsAttr::linkrows() const
  175. {
  176. if (rows)
  177. rtlLinkRowset(rows);
  178. return rows;
  179. }
  180. void rtlRowsAttr::set(size32_t _count, byte * * _rows)
  181. {
  182. setown(_count, rtlLinkRowset(_rows));
  183. }
  184. void rtlRowsAttr::setRow(IEngineRowAllocator * rowAllocator, const byte * _row)
  185. {
  186. setown(1, rowAllocator->appendRowOwn(NULL, 1, rowAllocator->linkRow(_row)));
  187. }
  188. void rtlRowsAttr::setown(size32_t _count, byte * * _rows)
  189. {
  190. dispose();
  191. count = _count;
  192. rows = _rows;
  193. }
  194. void rtlRowsAttr::dispose()
  195. {
  196. if (rows)
  197. rtlReleaseRowset(count, rows);
  198. }
  199. //---------------------------------------------------------------------------
  200. void rtlReportFieldOverflow(unsigned size, unsigned max, const RtlFieldInfo * field)
  201. {
  202. if (field)
  203. rtlReportFieldOverflow(size, max, field->name->str());
  204. else
  205. rtlReportRowOverflow(size, max);
  206. }
  207. void RtlRowBuilderBase::reportMissingRow() const
  208. {
  209. throw MakeStringException(MSGAUD_user, 1000, "RtlRowBuilderBase::row() is NULL");
  210. }
  211. byte * RtlDynamicRowBuilder::ensureCapacity(size32_t required, const char * fieldName)
  212. {
  213. if (required > maxLength)
  214. {
  215. if (!self)
  216. create();
  217. if (required > maxLength)
  218. {
  219. void * next = rowAllocator->resizeRow(required, self, maxLength);
  220. if (!next)
  221. {
  222. rtlReportFieldOverflow(required, maxLength, fieldName);
  223. return NULL;
  224. }
  225. self = static_cast<byte *>(next);
  226. }
  227. }
  228. return self;
  229. }
  230. void RtlDynamicRowBuilder::swapWith(RtlDynamicRowBuilder & other)
  231. {
  232. size32_t savedMaxLength = maxLength;
  233. void * savedSelf = getUnfinalizedClear();
  234. setown(other.getMaxLength(), other.getUnfinalizedClear());
  235. other.setown(savedMaxLength, savedSelf);
  236. }
  237. //---------------------------------------------------------------------------
  238. byte * RtlStaticRowBuilder::ensureCapacity(size32_t required, const char * fieldName)
  239. {
  240. if (required <= maxLength)
  241. return static_cast<byte *>(self);
  242. rtlReportFieldOverflow(required, maxLength, fieldName);
  243. return NULL;
  244. }
  245. byte * RtlStaticRowBuilder::createSelf()
  246. {
  247. throwUnexpected();
  248. }
  249. //---------------------------------------------------------------------------
  250. RtlLinkedDatasetBuilder::RtlLinkedDatasetBuilder(IEngineRowAllocator * _rowAllocator, int _choosenLimit) : builder(_rowAllocator, false)
  251. {
  252. rowAllocator = LINK(_rowAllocator);
  253. rowset = NULL;
  254. count = 0;
  255. max = 0;
  256. choosenLimit = (unsigned)_choosenLimit;
  257. }
  258. RtlLinkedDatasetBuilder::~RtlLinkedDatasetBuilder()
  259. {
  260. builder.clear();
  261. if (rowset)
  262. rowAllocator->releaseRowset(count, rowset);
  263. ::Release(rowAllocator);
  264. }
  265. void RtlLinkedDatasetBuilder::append(const void * source)
  266. {
  267. if (count < choosenLimit)
  268. {
  269. ensure(count+1);
  270. rowset[count] = (byte *)rowAllocator->linkRow(source);
  271. count++;
  272. }
  273. }
  274. void RtlLinkedDatasetBuilder::appendRows(size32_t num, byte * * rows)
  275. {
  276. if (num && (count < choosenLimit))
  277. {
  278. unsigned numToAdd = (count + num < choosenLimit) ? num : choosenLimit - count;
  279. ensure(count+numToAdd);
  280. for (unsigned i=0; i < numToAdd; i++)
  281. rowset[count+i] = (byte *)rowAllocator->linkRow(rows[i]);
  282. count += numToAdd;
  283. }
  284. }
  285. void RtlLinkedDatasetBuilder::appendOwn(const void * row)
  286. {
  287. assertex(!builder.exists());
  288. if (count < choosenLimit)
  289. {
  290. ensure(count+1);
  291. rowset[count] = (byte *)row;
  292. count++;
  293. }
  294. else
  295. rowAllocator->releaseRow(row);
  296. }
  297. byte * RtlLinkedDatasetBuilder::createRow()
  298. {
  299. if (count >= choosenLimit)
  300. return NULL;
  301. return builder.getSelf();
  302. }
  303. //cloned from thorcommon.cpp
  304. class RtlChildRowLinkerWalker : implements IIndirectMemberVisitor
  305. {
  306. public:
  307. virtual void visitRowset(size32_t count, byte * * rows)
  308. {
  309. rtlLinkRowset(rows);
  310. }
  311. virtual void visitRow(const byte * row)
  312. {
  313. rtlLinkRow(row);
  314. }
  315. };
  316. void RtlLinkedDatasetBuilder::cloneRow(size32_t len, const void * row)
  317. {
  318. if (count >= choosenLimit)
  319. return;
  320. byte * self = builder.ensureCapacity(len, NULL);
  321. memcpy(self, row, len);
  322. IOutputMetaData * meta = rowAllocator->queryOutputMeta();
  323. if (meta->getMetaFlags() & MDFneedserialize)
  324. {
  325. RtlChildRowLinkerWalker walker;
  326. meta->walkIndirectMembers(self, walker);
  327. }
  328. finalizeRow(len);
  329. }
  330. void RtlLinkedDatasetBuilder::deserializeRow(IOutputRowDeserializer & deserializer, IRowDeserializerSource & in)
  331. {
  332. builder.ensureRow();
  333. size32_t rowSize = deserializer.deserialize(builder, in);
  334. finalizeRow(rowSize);
  335. }
  336. inline void doDeserializeRowset(RtlLinkedDatasetBuilder & builder, IOutputRowDeserializer & deserializer, IRowDeserializerSource & in, offset_t marker, bool isGrouped)
  337. {
  338. byte eogPending = false;
  339. while (!in.finishedNested(marker))
  340. {
  341. if (isGrouped && eogPending)
  342. builder.appendEOG();
  343. builder.deserializeRow(deserializer, in);
  344. if (isGrouped)
  345. in.read(1, &eogPending);
  346. }
  347. }
  348. inline void doSerializeRowset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows, bool isGrouped)
  349. {
  350. for (unsigned i=0; i < count; i++)
  351. {
  352. byte *row = rows[i];
  353. if (row) // When serializing a dictionary, there may be nulls in the rowset. These can be skipped (we rehash on deserialize)
  354. {
  355. serializer->serialize(out, rows[i]);
  356. if (isGrouped)
  357. {
  358. byte eogPending = (i+1 < count) && (rows[i+1] == NULL);
  359. out.put(1, &eogPending);
  360. }
  361. }
  362. }
  363. }
  364. void RtlLinkedDatasetBuilder::deserialize(IOutputRowDeserializer & deserializer, IRowDeserializerSource & in, bool isGrouped)
  365. {
  366. offset_t marker = in.beginNested();
  367. doDeserializeRowset(*this, deserializer, in, marker, isGrouped);
  368. }
  369. void RtlLinkedDatasetBuilder::finalizeRows()
  370. {
  371. if (count != max)
  372. resize(count);
  373. }
  374. void RtlLinkedDatasetBuilder::finalizeRow(size32_t rowSize)
  375. {
  376. assertex(builder.exists());
  377. const void * next = builder.finalizeRowClear(rowSize);
  378. appendOwn(next);
  379. }
  380. byte * * RtlLinkedDatasetBuilder::linkrows()
  381. {
  382. finalizeRows();
  383. return rtlLinkRowset(rowset);
  384. }
  385. void RtlLinkedDatasetBuilder::expand(size32_t required)
  386. {
  387. assertex(required <= choosenLimit);
  388. //MORE: Next factoring change this so it passes this logic over to the row allocator
  389. size32_t newMax = max ? max : 4;
  390. while (newMax < required)
  391. {
  392. newMax += newMax;
  393. assertex(newMax);
  394. }
  395. if (newMax > choosenLimit)
  396. newMax = choosenLimit;
  397. resize(newMax);
  398. }
  399. void RtlLinkedDatasetBuilder::resize(size32_t required)
  400. {
  401. rowset = rowAllocator->reallocRows(rowset, max, required);
  402. max = required;
  403. }
  404. void appendRowsToRowset(size32_t & targetCount, byte * * & targetRowset, IEngineRowAllocator * rowAllocator, size32_t extraCount, byte * * extraRows)
  405. {
  406. if (extraCount)
  407. {
  408. size32_t prevCount = targetCount;
  409. byte * * expandedRowset = rowAllocator->reallocRows(targetRowset, prevCount, prevCount+extraCount);
  410. for (unsigned i=0; i < extraCount; i++)
  411. expandedRowset[prevCount+i] = (byte *)rowAllocator->linkRow(extraRows[i]);
  412. targetCount = prevCount + extraCount;
  413. targetRowset = expandedRowset;
  414. }
  415. }
  416. inline void doDeserializeRowset(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, IRowDeserializerSource & in, bool isGrouped)
  417. {
  418. RtlLinkedDatasetBuilder builder(_rowAllocator);
  419. builder.deserialize(*deserializer, in, isGrouped);
  420. count = builder.getcount();
  421. rowset = builder.linkrows();
  422. }
  423. extern ECLRTL_API void rtlDeserializeRowset(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, IRowDeserializerSource & in)
  424. {
  425. doDeserializeRowset(count, rowset, _rowAllocator, deserializer, in, false);
  426. }
  427. extern ECLRTL_API void rtlDeserializeGroupedRowset(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, IRowDeserializerSource & in)
  428. {
  429. doDeserializeRowset(count, rowset, _rowAllocator, deserializer, in, true);
  430. }
  431. void rtlSerializeRowset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  432. {
  433. size32_t marker = out.beginNested();
  434. doSerializeRowset(out, serializer, count, rows, false);
  435. out.endNested(marker);
  436. }
  437. void rtlSerializeGroupedRowset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  438. {
  439. size32_t marker = out.beginNested();
  440. doSerializeRowset(out, serializer, count, rows, true);
  441. out.endNested(marker);
  442. }
  443. //---------------------------------------------------------------------------
  444. RtlLinkedDictionaryBuilder::RtlLinkedDictionaryBuilder(IEngineRowAllocator * _rowAllocator, IHThorHashLookupInfo *_hashInfo, unsigned _initialSize)
  445. : builder(_rowAllocator, false)
  446. {
  447. init(_rowAllocator, _hashInfo, _initialSize);
  448. }
  449. RtlLinkedDictionaryBuilder::RtlLinkedDictionaryBuilder(IEngineRowAllocator * _rowAllocator, IHThorHashLookupInfo *_hashInfo)
  450. : builder(_rowAllocator, false)
  451. {
  452. init(_rowAllocator, _hashInfo, 8);
  453. }
  454. void RtlLinkedDictionaryBuilder::init(IEngineRowAllocator * _rowAllocator, IHThorHashLookupInfo *_hashInfo, unsigned _initialSize)
  455. {
  456. hash = _hashInfo->queryHash();
  457. compare = _hashInfo->queryCompare();
  458. initialSize = _initialSize;
  459. rowAllocator = LINK(_rowAllocator);
  460. table = NULL;
  461. usedCount = 0;
  462. usedLimit = 0;
  463. tableSize = 0;
  464. }
  465. RtlLinkedDictionaryBuilder::~RtlLinkedDictionaryBuilder()
  466. {
  467. // builder.clear();
  468. if (table)
  469. rowAllocator->releaseRowset(tableSize, table);
  470. ::Release(rowAllocator);
  471. }
  472. void RtlLinkedDictionaryBuilder::append(const void * source)
  473. {
  474. if (source)
  475. {
  476. appendOwn(rowAllocator->linkRow(source));
  477. }
  478. }
  479. void RtlLinkedDictionaryBuilder::appendOwn(const void * source)
  480. {
  481. if (source)
  482. {
  483. checkSpace();
  484. unsigned rowidx = hash->hash(source) % tableSize;
  485. loop
  486. {
  487. const void *entry = table[rowidx];
  488. if (entry && compare->docompare(source, entry)==0)
  489. {
  490. rowAllocator->releaseRow(entry);
  491. usedCount--;
  492. entry = NULL;
  493. }
  494. if (!entry)
  495. {
  496. table[rowidx] = (byte *) source;
  497. usedCount++;
  498. break;
  499. }
  500. rowidx++;
  501. if (rowidx==tableSize)
  502. rowidx = 0;
  503. }
  504. }
  505. }
  506. void RtlLinkedDictionaryBuilder::checkSpace()
  507. {
  508. if (!table)
  509. {
  510. table = rowAllocator->createRowset(initialSize);
  511. tableSize = initialSize;
  512. memset(table, 0, tableSize*sizeof(byte *));
  513. usedLimit = (tableSize * 3) / 4;
  514. usedCount = 0;
  515. }
  516. else if (usedCount > usedLimit)
  517. {
  518. // Rehash
  519. byte * * oldTable = table;
  520. unsigned oldSize = tableSize;
  521. tableSize = tableSize*2;
  522. table = rowAllocator->createRowset(tableSize);
  523. memset(table, 0, tableSize * sizeof(byte *));
  524. usedLimit = (tableSize * 3) / 4;
  525. usedCount = 0;
  526. unsigned i;
  527. for (i = 0; i < oldSize; i++)
  528. {
  529. append(oldTable[i]); // we link the rows here...
  530. }
  531. rowAllocator->releaseRowset(oldSize, oldTable); // ... because this will release them
  532. }
  533. }
  534. void RtlLinkedDictionaryBuilder::appendRows(size32_t num, byte * * rows)
  535. {
  536. // MORE - if we know that the source is already a hashtable, we can optimize the add to an empty table...
  537. for (unsigned i=0; i < num; i++)
  538. append(rows[i]);
  539. }
  540. void RtlLinkedDictionaryBuilder::finalizeRow(size32_t rowSize)
  541. {
  542. assertex(builder.exists());
  543. const void * next = builder.finalizeRowClear(rowSize);
  544. appendOwn(next);
  545. }
  546. //---------------------------------------------------------------------------
  547. //These definitions should be shared with thorcommon, but to do that
  548. //they would need to be moved to an rtlds.ipp header, which thorcommon then included.
  549. class ECLRTL_API CThorRtlRowSerializer : implements IRowSerializerTarget
  550. {
  551. public:
  552. CThorRtlRowSerializer(MemoryBuffer & _buffer) : buffer(_buffer)
  553. {
  554. }
  555. virtual void put(size32_t len, const void * ptr)
  556. {
  557. buffer.append(len, ptr);
  558. }
  559. virtual size32_t beginNested()
  560. {
  561. unsigned pos = buffer.length();
  562. buffer.append((size32_t)0);
  563. return pos;
  564. }
  565. virtual void endNested(size32_t sizePos)
  566. {
  567. unsigned pos = buffer.length();
  568. buffer.rewrite(sizePos);
  569. buffer.append((size32_t)(pos - (sizePos + sizeof(size32_t))));
  570. buffer.rewrite(pos);
  571. }
  572. protected:
  573. MemoryBuffer & buffer;
  574. };
  575. inline void doDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src, bool isGrouped)
  576. {
  577. RtlLinkedDatasetBuilder builder(rowAllocator);
  578. Owned<ISerialStream> stream = createMemorySerialStream(src, lenSrc);
  579. CThorStreamDeserializerSource source(stream);
  580. doDeserializeRowset(builder, *deserializer, source, lenSrc, isGrouped);
  581. count = builder.getcount();
  582. rowset = builder.linkrows();
  583. }
  584. inline void doRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows, bool isGrouped)
  585. {
  586. MemoryBuffer buffer;
  587. CThorRtlRowSerializer out(buffer);
  588. doSerializeRowset(out, serializer, count, rows, isGrouped);
  589. rtlFree(tgt);
  590. tlen = buffer.length();
  591. tgt = buffer.detach(); // not strictly speaking correct - it should have been allocated with rtlMalloc();
  592. }
  593. extern ECLRTL_API void rtlDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src, bool isGrouped)
  594. {
  595. doDataset2RowsetX(count, rowset, rowAllocator, deserializer, lenSrc, src, isGrouped);
  596. }
  597. extern ECLRTL_API void rtlDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src)
  598. {
  599. doDataset2RowsetX(count, rowset, rowAllocator, deserializer, lenSrc, src, false);
  600. }
  601. extern ECLRTL_API void rtlGroupedDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src)
  602. {
  603. doDataset2RowsetX(count, rowset, rowAllocator, deserializer, lenSrc, src, true);
  604. }
  605. extern ECLRTL_API void rtlRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows, bool isGrouped)
  606. {
  607. doRowset2DatasetX(tlen, tgt, serializer, count, rows, isGrouped);
  608. }
  609. extern ECLRTL_API void rtlRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  610. {
  611. doRowset2DatasetX(tlen, tgt, serializer, count, rows, false);
  612. }
  613. extern ECLRTL_API void rtlGroupedRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  614. {
  615. doRowset2DatasetX(tlen, tgt, serializer, count, rows, true);
  616. }
  617. void deserializeRowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, MemoryBuffer &in)
  618. {
  619. Owned<ISerialStream> stream = createMemoryBufferSerialStream(in);
  620. CThorStreamDeserializerSource rowSource(stream);
  621. doDeserializeRowset(count, rowset, _rowAllocator, deserializer, rowSource, false);
  622. }
  623. void deserializeGroupedRowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, MemoryBuffer &in)
  624. {
  625. Owned<ISerialStream> stream = createMemoryBufferSerialStream(in);
  626. CThorStreamDeserializerSource rowSource(stream);
  627. doDeserializeRowset(count, rowset, _rowAllocator, deserializer, rowSource, true);
  628. }
  629. void serializeRowsetX(size32_t count, byte * * rows, IOutputRowSerializer * serializer, MemoryBuffer & buffer)
  630. {
  631. CThorRtlRowSerializer out(buffer);
  632. rtlSerializeRowset(out, serializer, count, rows);
  633. }
  634. void serializeGroupedRowsetX(size32_t count, byte * * rows, IOutputRowSerializer * serializer, MemoryBuffer & buffer)
  635. {
  636. CThorRtlRowSerializer out(buffer);
  637. rtlSerializeGroupedRowset(out, serializer, count, rows);
  638. }
  639. void serializeRow(const void * row, IOutputRowSerializer * serializer, MemoryBuffer & buffer)
  640. {
  641. CThorRtlRowSerializer out(buffer);
  642. serializer->serialize(out, static_cast<const byte *>(row));
  643. }
  644. extern ECLRTL_API byte * rtlDeserializeBufferRow(IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, MemoryBuffer & buffer)
  645. {
  646. Owned<ISerialStream> stream = createMemoryBufferSerialStream(buffer);
  647. CThorStreamDeserializerSource source(stream);
  648. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  649. size32_t rowSize = deserializer->deserialize(rowBuilder, source);
  650. return static_cast<byte *>(const_cast<void *>(rowBuilder.finalizeRowClear(rowSize)));
  651. }
  652. extern ECLRTL_API byte * rtlDeserializeRow(IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, const void * src)
  653. {
  654. Owned<ISerialStream> stream = createMemorySerialStream(src, 0x7fffffff);
  655. CThorStreamDeserializerSource source(stream);
  656. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  657. size32_t rowSize = deserializer->deserialize(rowBuilder, source);
  658. return static_cast<byte *>(const_cast<void *>(rowBuilder.finalizeRowClear(rowSize)));
  659. }
  660. extern ECLRTL_API size32_t rtlSerializeRow(size32_t lenOut, void * out, IOutputRowSerializer * serializer, const void * src)
  661. {
  662. MemoryBuffer buffer;
  663. CThorRtlRowSerializer result(buffer);
  664. buffer.setBuffer(lenOut, out, false);
  665. buffer.setWritePos(0);
  666. serializer->serialize(result, (const byte *)src);
  667. return buffer.length();
  668. }
  669. class ECLRTL_API CThorRtlBuilderSerializer : implements IRowSerializerTarget
  670. {
  671. public:
  672. CThorRtlBuilderSerializer(ARowBuilder & _builder) : builder(_builder)
  673. {
  674. offset = 0;
  675. }
  676. virtual void put(size32_t len, const void * ptr)
  677. {
  678. byte * data = builder.ensureCapacity(offset + len, "");
  679. memcpy(data+offset, ptr, len);
  680. offset += len;
  681. }
  682. virtual size32_t beginNested()
  683. {
  684. unsigned pos = offset;
  685. offset += sizeof(size32_t);
  686. return pos;
  687. }
  688. virtual void endNested(size32_t sizePos)
  689. {
  690. byte * self = builder.getSelf();
  691. *(size32_t *)(self + sizePos) = offset - (sizePos + sizeof(size32_t));
  692. }
  693. inline size32_t length() const { return offset; }
  694. protected:
  695. ARowBuilder & builder;
  696. size32_t offset;
  697. };
  698. extern ECLRTL_API size32_t rtlDeserializeToBuilder(ARowBuilder & builder, IOutputRowDeserializer * deserializer, const void * src)
  699. {
  700. Owned<ISerialStream> stream = createMemorySerialStream(src, 0x7fffffff);
  701. CThorStreamDeserializerSource source(stream);
  702. return deserializer->deserialize(builder, source);
  703. }
  704. extern ECLRTL_API size32_t rtlSerializeToBuilder(ARowBuilder & builder, IOutputRowSerializer * serializer, const void * src)
  705. {
  706. CThorRtlBuilderSerializer target(builder);
  707. serializer->serialize(target, (const byte *)src);
  708. return target.length();
  709. }
  710. //---------------------------------------------------------------------------
  711. RtlDatasetCursor::RtlDatasetCursor(size32_t _len, const void * _data)
  712. {
  713. setDataset(_len, _data);
  714. }
  715. bool RtlDatasetCursor::exists()
  716. {
  717. return (end != buffer);
  718. }
  719. const byte * RtlDatasetCursor::first()
  720. {
  721. if (buffer != end)
  722. cur = buffer;
  723. return cur;
  724. }
  725. const byte * RtlDatasetCursor::get()
  726. {
  727. return cur;
  728. }
  729. void RtlDatasetCursor::setDataset(size32_t _len, const void * _data)
  730. {
  731. buffer = (const byte *)_data;
  732. end = buffer + _len;
  733. cur = NULL;
  734. }
  735. bool RtlDatasetCursor::isValid()
  736. {
  737. return (cur != NULL);
  738. }
  739. /*
  740. const byte * RtlDatasetCursor::next()
  741. {
  742. if (cur)
  743. {
  744. cur += getRowSize();
  745. if (cur >= end)
  746. cur = NULL;
  747. }
  748. return cur;
  749. }
  750. */
  751. //---------------------------------------------------------------------------
  752. RtlFixedDatasetCursor::RtlFixedDatasetCursor(size32_t _len, const void * _data, unsigned _recordSize) : RtlDatasetCursor(_len, _data)
  753. {
  754. recordSize = _recordSize;
  755. }
  756. RtlFixedDatasetCursor::RtlFixedDatasetCursor() : RtlDatasetCursor(0, NULL)
  757. {
  758. recordSize = 1;
  759. }
  760. size32_t RtlFixedDatasetCursor::count()
  761. {
  762. return (size32_t)((end - buffer) / recordSize);
  763. }
  764. size32_t RtlFixedDatasetCursor::getSize()
  765. {
  766. return recordSize;
  767. }
  768. void RtlFixedDatasetCursor::init(size32_t _len, const void * _data, unsigned _recordSize)
  769. {
  770. recordSize = _recordSize;
  771. setDataset(_len, _data);
  772. }
  773. const byte * RtlFixedDatasetCursor::next()
  774. {
  775. if (cur)
  776. {
  777. cur += recordSize;
  778. if (cur >= end)
  779. cur = NULL;
  780. }
  781. return cur;
  782. }
  783. const byte * RtlFixedDatasetCursor::select(unsigned idx)
  784. {
  785. cur = buffer + idx * recordSize;
  786. if (cur >= end)
  787. cur = NULL;
  788. return cur;
  789. }
  790. //---------------------------------------------------------------------------
  791. RtlVariableDatasetCursor::RtlVariableDatasetCursor(size32_t _len, const void * _data, IRecordSize & _recordSize) : RtlDatasetCursor(_len, _data)
  792. {
  793. recordSize = &_recordSize;
  794. }
  795. RtlVariableDatasetCursor::RtlVariableDatasetCursor() : RtlDatasetCursor(0, NULL)
  796. {
  797. recordSize = NULL;
  798. }
  799. void RtlVariableDatasetCursor::init(size32_t _len, const void * _data, IRecordSize & _recordSize)
  800. {
  801. recordSize = &_recordSize;
  802. setDataset(_len, _data);
  803. }
  804. size32_t RtlVariableDatasetCursor::count()
  805. {
  806. const byte * finger = buffer;
  807. unsigned c = 0;
  808. while (finger < end)
  809. {
  810. finger += recordSize->getRecordSize(finger);
  811. c++;
  812. }
  813. assertex(finger == end);
  814. return c;
  815. }
  816. size32_t RtlVariableDatasetCursor::getSize()
  817. {
  818. return recordSize->getRecordSize(cur);
  819. }
  820. const byte * RtlVariableDatasetCursor::next()
  821. {
  822. if (cur)
  823. {
  824. cur += recordSize->getRecordSize(cur);
  825. if (cur >= end)
  826. cur = NULL;
  827. }
  828. return cur;
  829. }
  830. const byte * RtlVariableDatasetCursor::select(unsigned idx)
  831. {
  832. const byte * finger = buffer;
  833. unsigned c = 0;
  834. while (finger < end)
  835. {
  836. if (c == idx)
  837. {
  838. cur = finger;
  839. return cur;
  840. }
  841. finger += recordSize->getRecordSize(finger);
  842. c++;
  843. }
  844. assertex(finger == end);
  845. cur = NULL;
  846. return NULL;
  847. }
  848. //---------------------------------------------------------------------------
  849. RtlLinkedDatasetCursor::RtlLinkedDatasetCursor(unsigned _numRows, byte * * _rows) : numRows(_numRows), rows(_rows)
  850. {
  851. cur = (unsigned)-1;
  852. }
  853. RtlLinkedDatasetCursor::RtlLinkedDatasetCursor()
  854. {
  855. numRows = 0;
  856. rows = NULL;
  857. cur = (unsigned)-1;
  858. }
  859. void RtlLinkedDatasetCursor::init(unsigned _numRows, byte * * _rows)
  860. {
  861. numRows = _numRows;
  862. rows = _rows;
  863. cur = (unsigned)-1;
  864. }
  865. const byte * RtlLinkedDatasetCursor::first()
  866. {
  867. cur = 0;
  868. return cur < numRows ? rows[cur] : NULL;
  869. }
  870. const byte * RtlLinkedDatasetCursor::get()
  871. {
  872. return cur < numRows ? rows[cur] : NULL;
  873. }
  874. bool RtlLinkedDatasetCursor::isValid()
  875. {
  876. return (cur < numRows);
  877. }
  878. const byte * RtlLinkedDatasetCursor::next()
  879. {
  880. if (cur < numRows)
  881. cur++;
  882. return cur < numRows ? rows[cur] : NULL;
  883. }
  884. const byte * RtlLinkedDatasetCursor::select(unsigned idx)
  885. {
  886. cur = idx;
  887. return cur < numRows ? rows[cur] : NULL;
  888. }
  889. //---------------------------------------------------------------------------
  890. bool rtlCheckInList(const void * lhs, IRtlDatasetCursor * cursor, ICompare * compare)
  891. {
  892. const byte * cur;
  893. for (cur = cursor->first(); cur; cur = cursor->next())
  894. {
  895. if (compare->docompare(lhs, cur) == 0)
  896. return true;
  897. }
  898. return false;
  899. }
  900. void rtlSetToSetX(bool & outIsAll, size32_t & outLen, void * & outData, bool inIsAll, size32_t inLen, void * inData)
  901. {
  902. outIsAll = inIsAll;
  903. outLen = inLen;
  904. outData = malloc(inLen);
  905. memcpy(outData, inData, inLen);
  906. }
  907. void rtlAppendSetX(bool & outIsAll, size32_t & outLen, void * & outData, bool leftIsAll, size32_t leftLen, void * leftData, bool rightIsAll, size32_t rightLen, void * rightData)
  908. {
  909. outIsAll = leftIsAll | rightIsAll;
  910. if (outIsAll)
  911. {
  912. outLen = 0;
  913. outData = NULL;
  914. }
  915. else
  916. {
  917. outLen = leftLen+rightLen;
  918. outData = malloc(outLen);
  919. memcpy(outData, leftData, leftLen);
  920. memcpy((byte*)outData+leftLen, rightData, rightLen);
  921. }
  922. }
  923. //------------------------------------------------------------------------------
  924. RtlCompoundIterator::RtlCompoundIterator()
  925. {
  926. ok = false;
  927. numLevels = 0;
  928. iters = NULL;
  929. cursors = NULL;
  930. }
  931. RtlCompoundIterator::~RtlCompoundIterator()
  932. {
  933. delete [] iters;
  934. delete [] cursors;
  935. }
  936. void RtlCompoundIterator::addIter(unsigned idx, IRtlDatasetSimpleCursor * iter, byte * * cursor)
  937. {
  938. assertex(idx < numLevels);
  939. iters[idx] = iter;
  940. cursors[idx] = cursor;
  941. }
  942. void RtlCompoundIterator::init(unsigned _numLevels)
  943. {
  944. numLevels = _numLevels;
  945. iters = new IRtlDatasetSimpleCursor * [numLevels];
  946. cursors = new byte * * [numLevels];
  947. }
  948. //Could either duplicate this function, N times, or have it as a helper function that accesses pre-defined virtuals.
  949. bool RtlCompoundIterator::first(unsigned level)
  950. {
  951. IRtlDatasetSimpleCursor * curIter = iters[level];
  952. if (level == 0)
  953. {
  954. const byte * cur = curIter->first();
  955. setCursor(level, cur);
  956. return (cur != NULL);
  957. }
  958. if (!first(level-1))
  959. return false;
  960. loop
  961. {
  962. const byte * cur = curIter->first();
  963. if (cur)
  964. {
  965. setCursor(level, cur);
  966. return true;
  967. }
  968. if (!next(level-1))
  969. return false;
  970. }
  971. }
  972. bool RtlCompoundIterator::next(unsigned level)
  973. {
  974. IRtlDatasetSimpleCursor * curIter = iters[level];
  975. const byte * cur = curIter->next();
  976. if (cur)
  977. {
  978. setCursor(level, cur);
  979. return true;
  980. }
  981. if (level == 0)
  982. return false;
  983. loop
  984. {
  985. if (!next(level-1))
  986. return false;
  987. const byte * cur = curIter->first();
  988. if (cur)
  989. {
  990. setCursor(level, cur);
  991. return true;
  992. }
  993. }
  994. }
  995. //------------------------------------------------------------------------------
  996. void RtlSimpleIterator::addIter(unsigned idx, IRtlDatasetSimpleCursor * _iter, byte * * _cursor)
  997. {
  998. assertex(idx == 0);
  999. iter = _iter;
  1000. cursor = _cursor;
  1001. *cursor = NULL;
  1002. }
  1003. bool RtlSimpleIterator::first()
  1004. {
  1005. const byte * cur = iter->first();
  1006. *cursor = (byte *)cur;
  1007. return (cur != NULL);
  1008. }
  1009. bool RtlSimpleIterator::next()
  1010. {
  1011. const byte * cur = iter->next();
  1012. *cursor = (byte *)cur;
  1013. return (cur != NULL);
  1014. }