rtlds.cpp 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "platform.h"
  15. #include "jliball.hpp"
  16. #include "eclrtl.hpp"
  17. #include "eclhelper.hpp"
  18. #include "rtlds_imp.hpp"
  19. #include "rtlread_imp.hpp"
  20. #define FIRST_CHUNK_SIZE 0x100
  21. #define DOUBLE_LIMIT 0x10000 // must be a power of 2
  22. unsigned getNextSize(unsigned max, unsigned required)
  23. {
  24. if (required > DOUBLE_LIMIT)
  25. {
  26. max = (required + DOUBLE_LIMIT) & ~(DOUBLE_LIMIT-1);
  27. if (required >= max)
  28. throw MakeStringException(-1, "getNextSize: Request for %d bytes oldMax = %d", required, max);
  29. }
  30. else
  31. {
  32. if (max == 0)
  33. max = FIRST_CHUNK_SIZE;
  34. while (required >= max)
  35. max += max;
  36. }
  37. return max;
  38. }
  39. //---------------------------------------------------------------------------
  40. RtlDatasetBuilder::RtlDatasetBuilder()
  41. {
  42. maxSize = 0;
  43. buffer = NULL;
  44. totalSize = 0;
  45. }
  46. RtlDatasetBuilder::~RtlDatasetBuilder()
  47. {
  48. free(buffer);
  49. }
  50. void RtlDatasetBuilder::ensure(size32_t required)
  51. {
  52. if (required > maxSize)
  53. {
  54. maxSize = getNextSize(maxSize, required);
  55. buffer = (byte *)realloc(buffer, maxSize);
  56. if (!buffer)
  57. throw MakeStringException(-1, "Failed to allocate temporary dataset (requesting %d bytes)", maxSize);
  58. }
  59. }
  60. byte * RtlDatasetBuilder::ensureCapacity(size32_t required, const char * fieldName)
  61. {
  62. ensure(totalSize + required);
  63. return buffer + totalSize;
  64. }
  65. void RtlDatasetBuilder::flushDataset()
  66. {
  67. }
  68. void RtlDatasetBuilder::getData(size32_t & len, void * & data)
  69. {
  70. flushDataset();
  71. len = totalSize;
  72. data = malloc(totalSize);
  73. memcpy(data, buffer, totalSize);
  74. }
  75. size32_t RtlDatasetBuilder::getSize()
  76. {
  77. flushDataset();
  78. return totalSize;
  79. }
  80. byte * RtlDatasetBuilder::queryData()
  81. {
  82. flushDataset();
  83. return buffer;
  84. }
  85. void RtlDatasetBuilder::reportMissingRow() const
  86. {
  87. throw MakeStringException(MSGAUD_user, 1000, "RtlDatasetBuilder::row() is NULL");
  88. }
  89. //---------------------------------------------------------------------------
  90. RtlFixedDatasetBuilder::RtlFixedDatasetBuilder(unsigned _recordSize, unsigned maxRows)
  91. {
  92. recordSize = _recordSize;
  93. if ((int)maxRows > 0)
  94. ensure(recordSize * maxRows);
  95. }
  96. byte * RtlFixedDatasetBuilder::createSelf()
  97. {
  98. self = ensureCapacity(recordSize, NULL);
  99. return self;
  100. }
  101. //---------------------------------------------------------------------------
  102. RtlLimitedFixedDatasetBuilder::RtlLimitedFixedDatasetBuilder(unsigned _recordSize, unsigned _maxRows, DefaultRowCreator _rowCreator, IResourceContext *_ctx) : RtlFixedDatasetBuilder(_recordSize, _maxRows)
  103. {
  104. maxRows = _maxRows;
  105. if ((int)maxRows < 0) maxRows = 0;
  106. rowCreator = _rowCreator;
  107. ctx = _ctx;
  108. }
  109. byte * RtlLimitedFixedDatasetBuilder::createRow()
  110. {
  111. if (totalSize >= maxRows * recordSize)
  112. return NULL;
  113. return RtlFixedDatasetBuilder::createRow();
  114. }
  115. void RtlLimitedFixedDatasetBuilder::flushDataset()
  116. {
  117. if (rowCreator)
  118. {
  119. while (totalSize < maxRows * recordSize)
  120. {
  121. createRow();
  122. size32_t size = rowCreator(rowBuilder(), ctx);
  123. finalizeRow(size);
  124. }
  125. }
  126. RtlFixedDatasetBuilder::flushDataset();
  127. }
  128. //---------------------------------------------------------------------------
  129. RtlVariableDatasetBuilder::RtlVariableDatasetBuilder(IRecordSize & _recordSize)
  130. {
  131. recordSize = &_recordSize;
  132. maxRowSize = recordSize->getRecordSize(NULL); // initial size
  133. }
  134. byte * RtlVariableDatasetBuilder::createSelf()
  135. {
  136. self = ensureCapacity(maxRowSize, NULL);
  137. return self;
  138. }
  139. void RtlVariableDatasetBuilder::deserializeRow(IOutputRowDeserializer & deserializer, IRowDeserializerSource & in)
  140. {
  141. createRow();
  142. size32_t rowSize = deserializer.deserialize(rowBuilder(), in);
  143. finalizeRow(rowSize);
  144. }
  145. //---------------------------------------------------------------------------
  146. RtlLimitedVariableDatasetBuilder::RtlLimitedVariableDatasetBuilder(IRecordSize & _recordSize, unsigned _maxRows, DefaultRowCreator _rowCreator, IResourceContext *_ctx) : RtlVariableDatasetBuilder(_recordSize)
  147. {
  148. numRows = 0;
  149. maxRows = _maxRows;
  150. rowCreator = _rowCreator;
  151. ctx = _ctx;
  152. }
  153. byte * RtlLimitedVariableDatasetBuilder::createRow()
  154. {
  155. if (numRows >= maxRows)
  156. return NULL;
  157. numRows++;
  158. return RtlVariableDatasetBuilder::createRow();
  159. }
  160. void RtlLimitedVariableDatasetBuilder::flushDataset()
  161. {
  162. if (rowCreator)
  163. {
  164. while (numRows < maxRows)
  165. {
  166. createRow();
  167. size32_t thisSize = rowCreator(rowBuilder(), ctx);
  168. finalizeRow(thisSize);
  169. }
  170. }
  171. RtlVariableDatasetBuilder::flushDataset();
  172. }
  173. //---------------------------------------------------------------------------
  174. byte * * rtlRowsAttr::linkrows() const
  175. {
  176. if (rows)
  177. rtlLinkRowset(rows);
  178. return rows;
  179. }
  180. void rtlRowsAttr::set(size32_t _count, byte * * _rows)
  181. {
  182. setown(_count, rtlLinkRowset(_rows));
  183. }
  184. void rtlRowsAttr::setRow(IEngineRowAllocator * rowAllocator, const byte * _row)
  185. {
  186. setown(1, rowAllocator->appendRowOwn(NULL, 1, rowAllocator->linkRow(_row)));
  187. }
  188. void rtlRowsAttr::setown(size32_t _count, byte * * _rows)
  189. {
  190. dispose();
  191. count = _count;
  192. rows = _rows;
  193. }
  194. void rtlRowsAttr::dispose()
  195. {
  196. if (rows)
  197. rtlReleaseRowset(count, rows);
  198. }
  199. //---------------------------------------------------------------------------
  200. void rtlReportFieldOverflow(unsigned size, unsigned max, const RtlFieldInfo * field)
  201. {
  202. if (field)
  203. rtlReportFieldOverflow(size, max, field->name->str());
  204. else
  205. rtlReportRowOverflow(size, max);
  206. }
  207. void RtlRowBuilderBase::reportMissingRow() const
  208. {
  209. throw MakeStringException(MSGAUD_user, 1000, "RtlRowBuilderBase::row() is NULL");
  210. }
  211. byte * RtlDynamicRowBuilder::ensureCapacity(size32_t required, const char * fieldName)
  212. {
  213. if (required > maxLength)
  214. {
  215. if (!self)
  216. create();
  217. if (required > maxLength)
  218. {
  219. void * next = rowAllocator->resizeRow(required, self, maxLength);
  220. if (!next)
  221. {
  222. rtlReportFieldOverflow(required, maxLength, fieldName);
  223. return NULL;
  224. }
  225. self = static_cast<byte *>(next);
  226. }
  227. }
  228. return self;
  229. }
  230. void RtlDynamicRowBuilder::swapWith(RtlDynamicRowBuilder & other)
  231. {
  232. size32_t savedMaxLength = maxLength;
  233. void * savedSelf = getUnfinalizedClear();
  234. setown(other.getMaxLength(), other.getUnfinalizedClear());
  235. other.setown(savedMaxLength, savedSelf);
  236. }
  237. //---------------------------------------------------------------------------
  238. byte * RtlStaticRowBuilder::ensureCapacity(size32_t required, const char * fieldName)
  239. {
  240. if (required <= maxLength)
  241. return static_cast<byte *>(self);
  242. rtlReportFieldOverflow(required, maxLength, fieldName);
  243. return NULL;
  244. }
  245. byte * RtlStaticRowBuilder::createSelf()
  246. {
  247. throwUnexpected();
  248. }
  249. //---------------------------------------------------------------------------
  250. RtlLinkedDatasetBuilder::RtlLinkedDatasetBuilder(IEngineRowAllocator * _rowAllocator, int _choosenLimit) : builder(_rowAllocator, false)
  251. {
  252. rowAllocator = LINK(_rowAllocator);
  253. rowset = NULL;
  254. count = 0;
  255. max = 0;
  256. choosenLimit = (unsigned)_choosenLimit;
  257. }
  258. RtlLinkedDatasetBuilder::~RtlLinkedDatasetBuilder()
  259. {
  260. builder.clear();
  261. if (rowset)
  262. rowAllocator->releaseRowset(count, rowset);
  263. ::Release(rowAllocator);
  264. }
  265. void RtlLinkedDatasetBuilder::append(const void * source)
  266. {
  267. if (count < choosenLimit)
  268. {
  269. ensure(count+1);
  270. rowset[count] = (byte *)rowAllocator->linkRow(source);
  271. count++;
  272. }
  273. }
  274. void RtlLinkedDatasetBuilder::appendRows(size32_t num, byte * * rows)
  275. {
  276. if (num && (count < choosenLimit))
  277. {
  278. unsigned numToAdd = (count + num < choosenLimit) ? num : choosenLimit - count;
  279. ensure(count+numToAdd);
  280. for (unsigned i=0; i < numToAdd; i++)
  281. rowset[count+i] = (byte *)rowAllocator->linkRow(rows[i]);
  282. count += numToAdd;
  283. }
  284. }
  285. void RtlLinkedDatasetBuilder::appendOwn(const void * row)
  286. {
  287. assertex(!builder.exists());
  288. if (count < choosenLimit)
  289. {
  290. ensure(count+1);
  291. rowset[count] = (byte *)row;
  292. count++;
  293. }
  294. else
  295. rowAllocator->releaseRow(row);
  296. }
  297. byte * RtlLinkedDatasetBuilder::createRow()
  298. {
  299. if (count >= choosenLimit)
  300. return NULL;
  301. return builder.getSelf();
  302. }
  303. //cloned from thorcommon.cpp
  304. class RtlChildRowLinkerWalker : implements IIndirectMemberVisitor
  305. {
  306. public:
  307. virtual void visitRowset(size32_t count, byte * * rows)
  308. {
  309. rtlLinkRowset(rows);
  310. }
  311. virtual void visitRow(const byte * row)
  312. {
  313. rtlLinkRow(row);
  314. }
  315. };
  316. void RtlLinkedDatasetBuilder::cloneRow(size32_t len, const void * row)
  317. {
  318. if (count >= choosenLimit)
  319. return;
  320. byte * self = builder.ensureCapacity(len, NULL);
  321. memcpy(self, row, len);
  322. IOutputMetaData * meta = rowAllocator->queryOutputMeta();
  323. if (meta->getMetaFlags() & MDFneedserialize)
  324. {
  325. RtlChildRowLinkerWalker walker;
  326. meta->walkIndirectMembers(self, walker);
  327. }
  328. finalizeRow(len);
  329. }
  330. void RtlLinkedDatasetBuilder::deserializeRow(IOutputRowDeserializer & deserializer, IRowDeserializerSource & in)
  331. {
  332. builder.ensureRow();
  333. size32_t rowSize = deserializer.deserialize(builder, in);
  334. finalizeRow(rowSize);
  335. }
  336. inline void doDeserializeRowset(RtlLinkedDatasetBuilder & builder, IOutputRowDeserializer & deserializer, IRowDeserializerSource & in, offset_t marker, bool isGrouped)
  337. {
  338. byte eogPending = false;
  339. while (!in.finishedNested(marker))
  340. {
  341. if (isGrouped && eogPending)
  342. builder.appendEOG();
  343. builder.deserializeRow(deserializer, in);
  344. if (isGrouped)
  345. in.read(1, &eogPending);
  346. }
  347. }
  348. inline void doSerializeRowset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows, bool isGrouped)
  349. {
  350. for (unsigned i=0; i < count; i++)
  351. {
  352. byte *row = rows[i];
  353. if (row) // When serializing a dictionary, there may be nulls in the rowset. These can be skipped (we rehash on deserialize)
  354. {
  355. serializer->serialize(out, rows[i]);
  356. if (isGrouped)
  357. {
  358. byte eogPending = (i+1 < count) && (rows[i+1] == NULL);
  359. out.put(1, &eogPending);
  360. }
  361. }
  362. }
  363. }
  364. void RtlLinkedDatasetBuilder::deserialize(IOutputRowDeserializer & deserializer, IRowDeserializerSource & in, bool isGrouped)
  365. {
  366. offset_t marker = in.beginNested();
  367. doDeserializeRowset(*this, deserializer, in, marker, isGrouped);
  368. }
  369. void RtlLinkedDatasetBuilder::finalizeRows()
  370. {
  371. if (count != max)
  372. resize(count);
  373. }
  374. void RtlLinkedDatasetBuilder::finalizeRow(size32_t rowSize)
  375. {
  376. assertex(builder.exists());
  377. const void * next = builder.finalizeRowClear(rowSize);
  378. appendOwn(next);
  379. }
  380. byte * * RtlLinkedDatasetBuilder::linkrows()
  381. {
  382. finalizeRows();
  383. return rtlLinkRowset(rowset);
  384. }
  385. void RtlLinkedDatasetBuilder::expand(size32_t required)
  386. {
  387. assertex(required <= choosenLimit);
  388. //MORE: Next factoring change this so it passes this logic over to the row allocator
  389. size32_t newMax = max ? max : 4;
  390. while (newMax < required)
  391. {
  392. newMax += newMax;
  393. assertex(newMax);
  394. }
  395. if (newMax > choosenLimit)
  396. newMax = choosenLimit;
  397. resize(newMax);
  398. }
  399. void RtlLinkedDatasetBuilder::resize(size32_t required)
  400. {
  401. rowset = rowAllocator->reallocRows(rowset, max, required);
  402. max = required;
  403. }
  404. void appendRowsToRowset(size32_t & targetCount, byte * * & targetRowset, IEngineRowAllocator * rowAllocator, size32_t extraCount, byte * * extraRows)
  405. {
  406. if (extraCount)
  407. {
  408. size32_t prevCount = targetCount;
  409. byte * * expandedRowset = rowAllocator->reallocRows(targetRowset, prevCount, prevCount+extraCount);
  410. for (unsigned i=0; i < extraCount; i++)
  411. expandedRowset[prevCount+i] = (byte *)rowAllocator->linkRow(extraRows[i]);
  412. targetCount = prevCount + extraCount;
  413. targetRowset = expandedRowset;
  414. }
  415. }
  416. inline void doDeserializeRowset(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, IRowDeserializerSource & in, bool isGrouped)
  417. {
  418. RtlLinkedDatasetBuilder builder(_rowAllocator);
  419. builder.deserialize(*deserializer, in, isGrouped);
  420. count = builder.getcount();
  421. rowset = builder.linkrows();
  422. }
  423. extern ECLRTL_API void rtlDeserializeRowset(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, IRowDeserializerSource & in)
  424. {
  425. doDeserializeRowset(count, rowset, _rowAllocator, deserializer, in, false);
  426. }
  427. extern ECLRTL_API void rtlDeserializeGroupedRowset(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, IRowDeserializerSource & in)
  428. {
  429. doDeserializeRowset(count, rowset, _rowAllocator, deserializer, in, true);
  430. }
  431. void rtlSerializeRowset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  432. {
  433. size32_t marker = out.beginNested();
  434. doSerializeRowset(out, serializer, count, rows, false);
  435. out.endNested(marker);
  436. }
  437. void rtlSerializeGroupedRowset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  438. {
  439. size32_t marker = out.beginNested();
  440. doSerializeRowset(out, serializer, count, rows, true);
  441. out.endNested(marker);
  442. }
  443. //---------------------------------------------------------------------------
  444. RtlLinkedDictionaryBuilder::RtlLinkedDictionaryBuilder(IEngineRowAllocator * _rowAllocator, IHThorHashLookupInfo *_hashInfo, unsigned _initialSize)
  445. : builder(_rowAllocator, false)
  446. {
  447. init(_rowAllocator, _hashInfo, _initialSize);
  448. }
  449. RtlLinkedDictionaryBuilder::RtlLinkedDictionaryBuilder(IEngineRowAllocator * _rowAllocator, IHThorHashLookupInfo *_hashInfo)
  450. : builder(_rowAllocator, false)
  451. {
  452. init(_rowAllocator, _hashInfo, 8);
  453. }
  454. void RtlLinkedDictionaryBuilder::init(IEngineRowAllocator * _rowAllocator, IHThorHashLookupInfo *_hashInfo, unsigned _initialSize)
  455. {
  456. hash = _hashInfo->queryHash();
  457. compare = _hashInfo->queryCompare();
  458. initialSize = _initialSize;
  459. rowAllocator = LINK(_rowAllocator);
  460. table = NULL;
  461. usedCount = 0;
  462. usedLimit = 0;
  463. tableSize = 0;
  464. }
  465. RtlLinkedDictionaryBuilder::~RtlLinkedDictionaryBuilder()
  466. {
  467. // builder.clear();
  468. if (table)
  469. rowAllocator->releaseRowset(tableSize, table);
  470. ::Release(rowAllocator);
  471. }
  472. void RtlLinkedDictionaryBuilder::append(const void * source)
  473. {
  474. if (source)
  475. {
  476. appendOwn(rowAllocator->linkRow(source));
  477. }
  478. }
  479. void RtlLinkedDictionaryBuilder::appendOwn(const void * source)
  480. {
  481. if (source)
  482. {
  483. checkSpace();
  484. unsigned rowidx = hash->hash(source) % tableSize;
  485. loop
  486. {
  487. const void *entry = table[rowidx];
  488. if (entry && compare->docompare(source, entry)==0)
  489. {
  490. rowAllocator->releaseRow(entry);
  491. usedCount--;
  492. entry = NULL;
  493. }
  494. if (!entry)
  495. {
  496. table[rowidx] = (byte *) source;
  497. usedCount++;
  498. break;
  499. }
  500. rowidx++;
  501. if (rowidx==tableSize)
  502. rowidx = 0;
  503. }
  504. }
  505. }
  506. void RtlLinkedDictionaryBuilder::checkSpace()
  507. {
  508. if (!table)
  509. {
  510. table = rowAllocator->createRowset(initialSize);
  511. tableSize = initialSize;
  512. memset(table, 0, tableSize*sizeof(byte *));
  513. usedLimit = (tableSize * 3) / 4;
  514. usedCount = 0;
  515. }
  516. else if (usedCount > usedLimit)
  517. {
  518. // Rehash
  519. byte * * oldTable = table;
  520. unsigned oldSize = tableSize;
  521. table = rowAllocator->createRowset(tableSize*2);
  522. tableSize = tableSize*2; // Don't update until we have successfully allocated, so that we remain consistent if createRowset throws an exception.
  523. memset(table, 0, tableSize * sizeof(byte *));
  524. usedLimit = (tableSize * 3) / 4;
  525. usedCount = 0;
  526. unsigned i;
  527. for (i = 0; i < oldSize; i++)
  528. {
  529. append(oldTable[i]); // we link the rows here...
  530. }
  531. rowAllocator->releaseRowset(oldSize, oldTable); // ... because this will release them
  532. }
  533. }
  534. void RtlLinkedDictionaryBuilder::appendRows(size32_t num, byte * * rows)
  535. {
  536. // MORE - if we know that the source is already a hashtable, we can optimize the add to an empty table...
  537. for (unsigned i=0; i < num; i++)
  538. append(rows[i]);
  539. }
  540. void RtlLinkedDictionaryBuilder::finalizeRow(size32_t rowSize)
  541. {
  542. assertex(builder.exists());
  543. const void * next = builder.finalizeRowClear(rowSize);
  544. appendOwn(next);
  545. }
  546. extern ECLRTL_API byte *rtlDictionaryLookup(IHThorHashLookupInfo &hashInfo, size32_t tableSize, byte **table, const byte *source, byte *defaultRow)
  547. {
  548. IHash *hash = hashInfo.queryHash();
  549. ICompare *compare = hashInfo.queryCompare();
  550. unsigned rowidx = hash->hash(source) % tableSize;
  551. loop
  552. {
  553. const void *entry = table[rowidx];
  554. if (!entry)
  555. return (byte *) rtlLinkRow(defaultRow);
  556. if (compare->docompare(source, entry)==0)
  557. return (byte *) rtlLinkRow(entry);
  558. rowidx++;
  559. if (rowidx==tableSize)
  560. rowidx = 0;
  561. }
  562. }
  563. //---------------------------------------------------------------------------
  564. //These definitions should be shared with thorcommon, but to do that
  565. //they would need to be moved to an rtlds.ipp header, which thorcommon then included.
  566. class ECLRTL_API CThorRtlRowSerializer : implements IRowSerializerTarget
  567. {
  568. public:
  569. CThorRtlRowSerializer(MemoryBuffer & _buffer) : buffer(_buffer)
  570. {
  571. }
  572. virtual void put(size32_t len, const void * ptr)
  573. {
  574. buffer.append(len, ptr);
  575. }
  576. virtual size32_t beginNested()
  577. {
  578. unsigned pos = buffer.length();
  579. buffer.append((size32_t)0);
  580. return pos;
  581. }
  582. virtual void endNested(size32_t sizePos)
  583. {
  584. unsigned pos = buffer.length();
  585. buffer.rewrite(sizePos);
  586. buffer.append((size32_t)(pos - (sizePos + sizeof(size32_t))));
  587. buffer.rewrite(pos);
  588. }
  589. protected:
  590. MemoryBuffer & buffer;
  591. };
  592. inline void doDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src, bool isGrouped)
  593. {
  594. RtlLinkedDatasetBuilder builder(rowAllocator);
  595. Owned<ISerialStream> stream = createMemorySerialStream(src, lenSrc);
  596. CThorStreamDeserializerSource source(stream);
  597. doDeserializeRowset(builder, *deserializer, source, lenSrc, isGrouped);
  598. count = builder.getcount();
  599. rowset = builder.linkrows();
  600. }
  601. inline void doRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows, bool isGrouped)
  602. {
  603. MemoryBuffer buffer;
  604. CThorRtlRowSerializer out(buffer);
  605. doSerializeRowset(out, serializer, count, rows, isGrouped);
  606. rtlFree(tgt);
  607. tlen = buffer.length();
  608. tgt = buffer.detach(); // not strictly speaking correct - it should have been allocated with rtlMalloc();
  609. }
  610. extern ECLRTL_API void rtlDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src, bool isGrouped)
  611. {
  612. doDataset2RowsetX(count, rowset, rowAllocator, deserializer, lenSrc, src, isGrouped);
  613. }
  614. extern ECLRTL_API void rtlDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src)
  615. {
  616. doDataset2RowsetX(count, rowset, rowAllocator, deserializer, lenSrc, src, false);
  617. }
  618. extern ECLRTL_API void rtlGroupedDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src)
  619. {
  620. doDataset2RowsetX(count, rowset, rowAllocator, deserializer, lenSrc, src, true);
  621. }
  622. extern ECLRTL_API void rtlRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows, bool isGrouped)
  623. {
  624. doRowset2DatasetX(tlen, tgt, serializer, count, rows, isGrouped);
  625. }
  626. extern ECLRTL_API void rtlRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  627. {
  628. doRowset2DatasetX(tlen, tgt, serializer, count, rows, false);
  629. }
  630. extern ECLRTL_API void rtlGroupedRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  631. {
  632. doRowset2DatasetX(tlen, tgt, serializer, count, rows, true);
  633. }
  634. void deserializeRowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, MemoryBuffer &in)
  635. {
  636. Owned<ISerialStream> stream = createMemoryBufferSerialStream(in);
  637. CThorStreamDeserializerSource rowSource(stream);
  638. doDeserializeRowset(count, rowset, _rowAllocator, deserializer, rowSource, false);
  639. }
  640. void deserializeGroupedRowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, MemoryBuffer &in)
  641. {
  642. Owned<ISerialStream> stream = createMemoryBufferSerialStream(in);
  643. CThorStreamDeserializerSource rowSource(stream);
  644. doDeserializeRowset(count, rowset, _rowAllocator, deserializer, rowSource, true);
  645. }
  646. void serializeRowsetX(size32_t count, byte * * rows, IOutputRowSerializer * serializer, MemoryBuffer & buffer)
  647. {
  648. CThorRtlRowSerializer out(buffer);
  649. rtlSerializeRowset(out, serializer, count, rows);
  650. }
  651. void serializeGroupedRowsetX(size32_t count, byte * * rows, IOutputRowSerializer * serializer, MemoryBuffer & buffer)
  652. {
  653. CThorRtlRowSerializer out(buffer);
  654. rtlSerializeGroupedRowset(out, serializer, count, rows);
  655. }
  656. void serializeRow(const void * row, IOutputRowSerializer * serializer, MemoryBuffer & buffer)
  657. {
  658. CThorRtlRowSerializer out(buffer);
  659. serializer->serialize(out, static_cast<const byte *>(row));
  660. }
  661. extern ECLRTL_API byte * rtlDeserializeBufferRow(IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, MemoryBuffer & buffer)
  662. {
  663. Owned<ISerialStream> stream = createMemoryBufferSerialStream(buffer);
  664. CThorStreamDeserializerSource source(stream);
  665. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  666. size32_t rowSize = deserializer->deserialize(rowBuilder, source);
  667. return static_cast<byte *>(const_cast<void *>(rowBuilder.finalizeRowClear(rowSize)));
  668. }
  669. extern ECLRTL_API byte * rtlDeserializeRow(IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, const void * src)
  670. {
  671. Owned<ISerialStream> stream = createMemorySerialStream(src, 0x7fffffff);
  672. CThorStreamDeserializerSource source(stream);
  673. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  674. size32_t rowSize = deserializer->deserialize(rowBuilder, source);
  675. return static_cast<byte *>(const_cast<void *>(rowBuilder.finalizeRowClear(rowSize)));
  676. }
  677. extern ECLRTL_API size32_t rtlSerializeRow(size32_t lenOut, void * out, IOutputRowSerializer * serializer, const void * src)
  678. {
  679. MemoryBuffer buffer;
  680. CThorRtlRowSerializer result(buffer);
  681. buffer.setBuffer(lenOut, out, false);
  682. buffer.setWritePos(0);
  683. serializer->serialize(result, (const byte *)src);
  684. return buffer.length();
  685. }
  686. class ECLRTL_API CThorRtlBuilderSerializer : implements IRowSerializerTarget
  687. {
  688. public:
  689. CThorRtlBuilderSerializer(ARowBuilder & _builder) : builder(_builder)
  690. {
  691. offset = 0;
  692. }
  693. virtual void put(size32_t len, const void * ptr)
  694. {
  695. byte * data = builder.ensureCapacity(offset + len, "");
  696. memcpy(data+offset, ptr, len);
  697. offset += len;
  698. }
  699. virtual size32_t beginNested()
  700. {
  701. unsigned pos = offset;
  702. offset += sizeof(size32_t);
  703. return pos;
  704. }
  705. virtual void endNested(size32_t sizePos)
  706. {
  707. byte * self = builder.getSelf();
  708. *(size32_t *)(self + sizePos) = offset - (sizePos + sizeof(size32_t));
  709. }
  710. inline size32_t length() const { return offset; }
  711. protected:
  712. ARowBuilder & builder;
  713. size32_t offset;
  714. };
  715. extern ECLRTL_API size32_t rtlDeserializeToBuilder(ARowBuilder & builder, IOutputRowDeserializer * deserializer, const void * src)
  716. {
  717. Owned<ISerialStream> stream = createMemorySerialStream(src, 0x7fffffff);
  718. CThorStreamDeserializerSource source(stream);
  719. return deserializer->deserialize(builder, source);
  720. }
  721. extern ECLRTL_API size32_t rtlSerializeToBuilder(ARowBuilder & builder, IOutputRowSerializer * serializer, const void * src)
  722. {
  723. CThorRtlBuilderSerializer target(builder);
  724. serializer->serialize(target, (const byte *)src);
  725. return target.length();
  726. }
  727. //---------------------------------------------------------------------------
  728. RtlDatasetCursor::RtlDatasetCursor(size32_t _len, const void * _data)
  729. {
  730. setDataset(_len, _data);
  731. }
  732. bool RtlDatasetCursor::exists()
  733. {
  734. return (end != buffer);
  735. }
  736. const byte * RtlDatasetCursor::first()
  737. {
  738. if (buffer != end)
  739. cur = buffer;
  740. return cur;
  741. }
  742. const byte * RtlDatasetCursor::get()
  743. {
  744. return cur;
  745. }
  746. void RtlDatasetCursor::setDataset(size32_t _len, const void * _data)
  747. {
  748. buffer = (const byte *)_data;
  749. end = buffer + _len;
  750. cur = NULL;
  751. }
  752. bool RtlDatasetCursor::isValid()
  753. {
  754. return (cur != NULL);
  755. }
  756. /*
  757. const byte * RtlDatasetCursor::next()
  758. {
  759. if (cur)
  760. {
  761. cur += getRowSize();
  762. if (cur >= end)
  763. cur = NULL;
  764. }
  765. return cur;
  766. }
  767. */
  768. //---------------------------------------------------------------------------
  769. RtlFixedDatasetCursor::RtlFixedDatasetCursor(size32_t _len, const void * _data, unsigned _recordSize) : RtlDatasetCursor(_len, _data)
  770. {
  771. recordSize = _recordSize;
  772. }
  773. RtlFixedDatasetCursor::RtlFixedDatasetCursor() : RtlDatasetCursor(0, NULL)
  774. {
  775. recordSize = 1;
  776. }
  777. size32_t RtlFixedDatasetCursor::count()
  778. {
  779. return (size32_t)((end - buffer) / recordSize);
  780. }
  781. size32_t RtlFixedDatasetCursor::getSize()
  782. {
  783. return recordSize;
  784. }
  785. void RtlFixedDatasetCursor::init(size32_t _len, const void * _data, unsigned _recordSize)
  786. {
  787. recordSize = _recordSize;
  788. setDataset(_len, _data);
  789. }
  790. const byte * RtlFixedDatasetCursor::next()
  791. {
  792. if (cur)
  793. {
  794. cur += recordSize;
  795. if (cur >= end)
  796. cur = NULL;
  797. }
  798. return cur;
  799. }
  800. const byte * RtlFixedDatasetCursor::select(unsigned idx)
  801. {
  802. cur = buffer + idx * recordSize;
  803. if (cur >= end)
  804. cur = NULL;
  805. return cur;
  806. }
  807. //---------------------------------------------------------------------------
  808. RtlVariableDatasetCursor::RtlVariableDatasetCursor(size32_t _len, const void * _data, IRecordSize & _recordSize) : RtlDatasetCursor(_len, _data)
  809. {
  810. recordSize = &_recordSize;
  811. }
  812. RtlVariableDatasetCursor::RtlVariableDatasetCursor() : RtlDatasetCursor(0, NULL)
  813. {
  814. recordSize = NULL;
  815. }
  816. void RtlVariableDatasetCursor::init(size32_t _len, const void * _data, IRecordSize & _recordSize)
  817. {
  818. recordSize = &_recordSize;
  819. setDataset(_len, _data);
  820. }
  821. size32_t RtlVariableDatasetCursor::count()
  822. {
  823. const byte * finger = buffer;
  824. unsigned c = 0;
  825. while (finger < end)
  826. {
  827. finger += recordSize->getRecordSize(finger);
  828. c++;
  829. }
  830. assertex(finger == end);
  831. return c;
  832. }
  833. size32_t RtlVariableDatasetCursor::getSize()
  834. {
  835. return recordSize->getRecordSize(cur);
  836. }
  837. const byte * RtlVariableDatasetCursor::next()
  838. {
  839. if (cur)
  840. {
  841. cur += recordSize->getRecordSize(cur);
  842. if (cur >= end)
  843. cur = NULL;
  844. }
  845. return cur;
  846. }
  847. const byte * RtlVariableDatasetCursor::select(unsigned idx)
  848. {
  849. const byte * finger = buffer;
  850. unsigned c = 0;
  851. while (finger < end)
  852. {
  853. if (c == idx)
  854. {
  855. cur = finger;
  856. return cur;
  857. }
  858. finger += recordSize->getRecordSize(finger);
  859. c++;
  860. }
  861. assertex(finger == end);
  862. cur = NULL;
  863. return NULL;
  864. }
  865. //---------------------------------------------------------------------------
  866. RtlLinkedDatasetCursor::RtlLinkedDatasetCursor(unsigned _numRows, byte * * _rows) : numRows(_numRows), rows(_rows)
  867. {
  868. cur = (unsigned)-1;
  869. }
  870. RtlLinkedDatasetCursor::RtlLinkedDatasetCursor()
  871. {
  872. numRows = 0;
  873. rows = NULL;
  874. cur = (unsigned)-1;
  875. }
  876. void RtlLinkedDatasetCursor::init(unsigned _numRows, byte * * _rows)
  877. {
  878. numRows = _numRows;
  879. rows = _rows;
  880. cur = (unsigned)-1;
  881. }
  882. const byte * RtlLinkedDatasetCursor::first()
  883. {
  884. cur = 0;
  885. return cur < numRows ? rows[cur] : NULL;
  886. }
  887. const byte * RtlLinkedDatasetCursor::get()
  888. {
  889. return cur < numRows ? rows[cur] : NULL;
  890. }
  891. bool RtlLinkedDatasetCursor::isValid()
  892. {
  893. return (cur < numRows);
  894. }
  895. const byte * RtlLinkedDatasetCursor::next()
  896. {
  897. if (cur < numRows)
  898. cur++;
  899. return cur < numRows ? rows[cur] : NULL;
  900. }
  901. const byte * RtlLinkedDatasetCursor::select(unsigned idx)
  902. {
  903. cur = idx;
  904. return cur < numRows ? rows[cur] : NULL;
  905. }
  906. //---------------------------------------------------------------------------
  907. bool rtlCheckInList(const void * lhs, IRtlDatasetCursor * cursor, ICompare * compare)
  908. {
  909. const byte * cur;
  910. for (cur = cursor->first(); cur; cur = cursor->next())
  911. {
  912. if (compare->docompare(lhs, cur) == 0)
  913. return true;
  914. }
  915. return false;
  916. }
  /**
   * Copy a set into freshly malloc'ed storage.
   *
   * @param outIsAll  Receives inIsAll.
   * @param outLen    Receives inLen.
   * @param outData   Receives a malloc(inLen) block holding a copy of the input bytes.
   * @param inIsAll   "All" flag of the input set.
   * @param inLen     Size of the input data in bytes.
   * @param inData    Input data; may be NULL when inLen is 0.
   */
  void rtlSetToSetX(bool & outIsAll, size32_t & outLen, void * & outData, bool inIsAll, size32_t inLen, void * inData)
  {
      outIsAll = inIsAll;
      outLen = inLen;
      outData = malloc(inLen);
      // memcpy is undefined for null pointers even when the length is zero, and an
      // empty set may arrive with inData == NULL (and malloc(0) may return NULL).
      if (inLen != 0)
          memcpy(outData, inData, inLen);
  }
  /**
   * Concatenate two sets into freshly malloc'ed storage.
   *
   * If either input is "all", the result is "all" with no data (outLen 0, outData NULL).
   * Otherwise outData receives a malloc'ed block containing the left bytes followed
   * by the right bytes, and outLen is leftLen + rightLen.
   *
   * @param outIsAll    Receives true if either input set is "all".
   * @param outLen      Receives the size of the result data in bytes.
   * @param outData     Receives the result data (NULL for an "all" result).
   * @param leftIsAll   "All" flag of the left set.
   * @param leftLen     Size of the left data in bytes.
   * @param leftData    Left data; may be NULL when leftLen is 0.
   * @param rightIsAll  "All" flag of the right set.
   * @param rightLen    Size of the right data in bytes.
   * @param rightData   Right data; may be NULL when rightLen is 0.
   */
  void rtlAppendSetX(bool & outIsAll, size32_t & outLen, void * & outData, bool leftIsAll, size32_t leftLen, void * leftData, bool rightIsAll, size32_t rightLen, void * rightData)
  {
      outIsAll = leftIsAll || rightIsAll;
      if (outIsAll)
      {
          outLen = 0;
          outData = NULL;
          return;
      }

      outLen = leftLen+rightLen;
      outData = malloc(outLen);
      // memcpy is undefined for null pointers even when the length is zero, so
      // skip the copy for an empty side (whose data pointer may legitimately be NULL).
      if (leftLen != 0)
          memcpy(outData, leftData, leftLen);
      if (rightLen != 0)
          memcpy(static_cast<char *>(outData)+leftLen, rightData, rightLen);
  }
  940. //------------------------------------------------------------------------------
  941. RtlCompoundIterator::RtlCompoundIterator()
  942. {
  943. ok = false;
  944. numLevels = 0;
  945. iters = NULL;
  946. cursors = NULL;
  947. }
  948. RtlCompoundIterator::~RtlCompoundIterator()
  949. {
  950. delete [] iters;
  951. delete [] cursors;
  952. }
  953. void RtlCompoundIterator::addIter(unsigned idx, IRtlDatasetSimpleCursor * iter, byte * * cursor)
  954. {
  955. assertex(idx < numLevels);
  956. iters[idx] = iter;
  957. cursors[idx] = cursor;
  958. }
  959. void RtlCompoundIterator::init(unsigned _numLevels)
  960. {
  961. numLevels = _numLevels;
  962. iters = new IRtlDatasetSimpleCursor * [numLevels];
  963. cursors = new byte * * [numLevels];
  964. }
  965. //Could either duplicate this function, N times, or have it as a helper function that accesses pre-defined virtuals.
  966. bool RtlCompoundIterator::first(unsigned level)
  967. {
  968. IRtlDatasetSimpleCursor * curIter = iters[level];
  969. if (level == 0)
  970. {
  971. const byte * cur = curIter->first();
  972. setCursor(level, cur);
  973. return (cur != NULL);
  974. }
  975. if (!first(level-1))
  976. return false;
  977. loop
  978. {
  979. const byte * cur = curIter->first();
  980. if (cur)
  981. {
  982. setCursor(level, cur);
  983. return true;
  984. }
  985. if (!next(level-1))
  986. return false;
  987. }
  988. }
  989. bool RtlCompoundIterator::next(unsigned level)
  990. {
  991. IRtlDatasetSimpleCursor * curIter = iters[level];
  992. const byte * cur = curIter->next();
  993. if (cur)
  994. {
  995. setCursor(level, cur);
  996. return true;
  997. }
  998. if (level == 0)
  999. return false;
  1000. loop
  1001. {
  1002. if (!next(level-1))
  1003. return false;
  1004. const byte * cur = curIter->first();
  1005. if (cur)
  1006. {
  1007. setCursor(level, cur);
  1008. return true;
  1009. }
  1010. }
  1011. }
  1012. //------------------------------------------------------------------------------
  1013. void RtlSimpleIterator::addIter(unsigned idx, IRtlDatasetSimpleCursor * _iter, byte * * _cursor)
  1014. {
  1015. assertex(idx == 0);
  1016. iter = _iter;
  1017. cursor = _cursor;
  1018. *cursor = NULL;
  1019. }
  1020. bool RtlSimpleIterator::first()
  1021. {
  1022. const byte * cur = iter->first();
  1023. *cursor = (byte *)cur;
  1024. return (cur != NULL);
  1025. }
  1026. bool RtlSimpleIterator::next()
  1027. {
  1028. const byte * cur = iter->next();
  1029. *cursor = (byte *)cur;
  1030. return (cur != NULL);
  1031. }