rtlds.cpp 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jliball.hpp"
  15. #include "eclrtl.hpp"
  16. #include "eclhelper.hpp"
  17. #include "rtlds_imp.hpp"
  18. #include "rtlread_imp.hpp"
  19. #define FIRST_CHUNK_SIZE 0x100
  20. #define DOUBLE_LIMIT 0x10000 // must be a power of 2
  21. unsigned getNextSize(unsigned max, unsigned required)
  22. {
  23. if (required > DOUBLE_LIMIT)
  24. {
  25. max = (required + DOUBLE_LIMIT) & ~(DOUBLE_LIMIT-1);
  26. if (required >= max)
  27. throw MakeStringException(-1, "getNextSize: Request for %d bytes oldMax = %d", required, max);
  28. }
  29. else
  30. {
  31. if (max == 0)
  32. max = FIRST_CHUNK_SIZE;
  33. while (required >= max)
  34. max += max;
  35. }
  36. return max;
  37. }
  38. //---------------------------------------------------------------------------
  39. RtlDatasetBuilder::RtlDatasetBuilder()
  40. {
  41. maxSize = 0;
  42. buffer = NULL;
  43. totalSize = 0;
  44. }
  45. RtlDatasetBuilder::~RtlDatasetBuilder()
  46. {
  47. free(buffer);
  48. }
  49. void RtlDatasetBuilder::ensure(size32_t required)
  50. {
  51. if (required > maxSize)
  52. {
  53. maxSize = getNextSize(maxSize, required);
  54. byte * newbuffer = (byte *)realloc(buffer, maxSize);
  55. if (!newbuffer)
  56. throw MakeStringException(-1, "Failed to allocate temporary dataset (requesting %d bytes)", maxSize);
  57. buffer = newbuffer;
  58. }
  59. self = buffer + totalSize;
  60. }
  61. byte * RtlDatasetBuilder::ensureCapacity(size32_t required, const char * fieldName)
  62. {
  63. ensure(totalSize + required);
  64. return self; // self is updated by ensure()
  65. }
  66. void RtlDatasetBuilder::flushDataset()
  67. {
  68. }
  69. void RtlDatasetBuilder::getData(size32_t & len, void * & data)
  70. {
  71. flushDataset();
  72. len = totalSize;
  73. data = malloc(totalSize);
  74. memcpy(data, buffer, totalSize);
  75. }
  76. size32_t RtlDatasetBuilder::getSize()
  77. {
  78. flushDataset();
  79. return totalSize;
  80. }
  81. byte * RtlDatasetBuilder::queryData()
  82. {
  83. flushDataset();
  84. return buffer;
  85. }
  86. void RtlDatasetBuilder::reportMissingRow() const
  87. {
  88. throw MakeStringException(MSGAUD_user, 1000, "RtlDatasetBuilder::row() is NULL");
  89. }
  90. //---------------------------------------------------------------------------
  91. RtlFixedDatasetBuilder::RtlFixedDatasetBuilder(unsigned _recordSize, unsigned maxRows)
  92. {
  93. recordSize = _recordSize;
  94. if ((int)maxRows > 0)
  95. ensure(recordSize * maxRows);
  96. }
  97. byte * RtlFixedDatasetBuilder::createSelf()
  98. {
  99. self = ensureCapacity(recordSize, NULL);
  100. return self;
  101. }
  102. //---------------------------------------------------------------------------
  103. RtlLimitedFixedDatasetBuilder::RtlLimitedFixedDatasetBuilder(unsigned _recordSize, unsigned _maxRows, DefaultRowCreator _rowCreator, IResourceContext *_ctx) : RtlFixedDatasetBuilder(_recordSize, _maxRows)
  104. {
  105. maxRows = _maxRows;
  106. if ((int)maxRows < 0) maxRows = 0;
  107. rowCreator = _rowCreator;
  108. ctx = _ctx;
  109. }
  110. byte * RtlLimitedFixedDatasetBuilder::createRow()
  111. {
  112. if (totalSize >= maxRows * recordSize)
  113. return NULL;
  114. return RtlFixedDatasetBuilder::createRow();
  115. }
  116. void RtlLimitedFixedDatasetBuilder::flushDataset()
  117. {
  118. if (rowCreator)
  119. {
  120. while (totalSize < maxRows * recordSize)
  121. {
  122. createRow();
  123. size32_t size = rowCreator(rowBuilder(), ctx);
  124. finalizeRow(size);
  125. }
  126. }
  127. RtlFixedDatasetBuilder::flushDataset();
  128. }
  129. //---------------------------------------------------------------------------
  130. RtlVariableDatasetBuilder::RtlVariableDatasetBuilder(IRecordSize & _recordSize)
  131. {
  132. recordSize = &_recordSize;
  133. maxRowSize = recordSize->getRecordSize(NULL); // initial size
  134. }
  135. byte * RtlVariableDatasetBuilder::createSelf()
  136. {
  137. self = ensureCapacity(maxRowSize, NULL);
  138. return self;
  139. }
  140. void RtlVariableDatasetBuilder::deserializeRow(IOutputRowDeserializer & deserializer, IRowDeserializerSource & in)
  141. {
  142. createRow();
  143. size32_t rowSize = deserializer.deserialize(rowBuilder(), in);
  144. finalizeRow(rowSize);
  145. }
  146. //---------------------------------------------------------------------------
  147. RtlLimitedVariableDatasetBuilder::RtlLimitedVariableDatasetBuilder(IRecordSize & _recordSize, unsigned _maxRows, DefaultRowCreator _rowCreator, IResourceContext *_ctx) : RtlVariableDatasetBuilder(_recordSize)
  148. {
  149. numRows = 0;
  150. maxRows = _maxRows;
  151. rowCreator = _rowCreator;
  152. ctx = _ctx;
  153. }
  154. byte * RtlLimitedVariableDatasetBuilder::createRow()
  155. {
  156. if (numRows >= maxRows)
  157. return NULL;
  158. numRows++;
  159. return RtlVariableDatasetBuilder::createRow();
  160. }
  161. void RtlLimitedVariableDatasetBuilder::flushDataset()
  162. {
  163. if (rowCreator)
  164. {
  165. while (numRows < maxRows)
  166. {
  167. createRow();
  168. size32_t thisSize = rowCreator(rowBuilder(), ctx);
  169. finalizeRow(thisSize);
  170. }
  171. }
  172. RtlVariableDatasetBuilder::flushDataset();
  173. }
  174. //---------------------------------------------------------------------------
  175. byte * * rtlRowsAttr::linkrows() const
  176. {
  177. if (rows)
  178. rtlLinkRowset(rows);
  179. return rows;
  180. }
  181. void rtlRowsAttr::set(size32_t _count, byte * * _rows)
  182. {
  183. setown(_count, rtlLinkRowset(_rows));
  184. }
  185. void rtlRowsAttr::setRow(IEngineRowAllocator * rowAllocator, const byte * _row)
  186. {
  187. setown(1, rowAllocator->appendRowOwn(NULL, 1, rowAllocator->linkRow(_row)));
  188. }
  189. void rtlRowsAttr::setown(size32_t _count, byte * * _rows)
  190. {
  191. dispose();
  192. count = _count;
  193. rows = _rows;
  194. }
  195. void rtlRowsAttr::dispose()
  196. {
  197. if (rows)
  198. rtlReleaseRowset(count, rows);
  199. }
  200. //---------------------------------------------------------------------------
  201. void rtlReportFieldOverflow(unsigned size, unsigned max, const RtlFieldInfo * field)
  202. {
  203. if (field)
  204. rtlReportFieldOverflow(size, max, field->name->str());
  205. else
  206. rtlReportRowOverflow(size, max);
  207. }
  208. void RtlRowBuilderBase::reportMissingRow() const
  209. {
  210. throw MakeStringException(MSGAUD_user, 1000, "RtlRowBuilderBase::row() is NULL");
  211. }
  212. byte * RtlDynamicRowBuilder::ensureCapacity(size32_t required, const char * fieldName)
  213. {
  214. if (required > maxLength)
  215. {
  216. if (!self)
  217. create();
  218. if (required > maxLength)
  219. {
  220. void * next = rowAllocator->resizeRow(required, self, maxLength);
  221. if (!next)
  222. {
  223. rtlReportFieldOverflow(required, maxLength, fieldName);
  224. return NULL;
  225. }
  226. self = static_cast<byte *>(next);
  227. }
  228. }
  229. return self;
  230. }
  231. void RtlDynamicRowBuilder::swapWith(RtlDynamicRowBuilder & other)
  232. {
  233. size32_t savedMaxLength = maxLength;
  234. void * savedSelf = getUnfinalizedClear();
  235. setown(other.getMaxLength(), other.getUnfinalizedClear());
  236. other.setown(savedMaxLength, savedSelf);
  237. }
  238. //---------------------------------------------------------------------------
  239. byte * RtlStaticRowBuilder::ensureCapacity(size32_t required, const char * fieldName)
  240. {
  241. if (required <= maxLength)
  242. return static_cast<byte *>(self);
  243. rtlReportFieldOverflow(required, maxLength, fieldName);
  244. return NULL;
  245. }
  246. byte * RtlStaticRowBuilder::createSelf()
  247. {
  248. throwUnexpected();
  249. }
  250. //---------------------------------------------------------------------------
  251. RtlLinkedDatasetBuilder::RtlLinkedDatasetBuilder(IEngineRowAllocator * _rowAllocator, int _choosenLimit) : builder(_rowAllocator, false)
  252. {
  253. rowAllocator = LINK(_rowAllocator);
  254. rowset = NULL;
  255. count = 0;
  256. max = 0;
  257. choosenLimit = (unsigned)_choosenLimit;
  258. }
  259. RtlLinkedDatasetBuilder::~RtlLinkedDatasetBuilder()
  260. {
  261. builder.clear();
  262. if (rowset)
  263. rowAllocator->releaseRowset(count, rowset);
  264. ::Release(rowAllocator);
  265. }
  266. void RtlLinkedDatasetBuilder::clear()
  267. {
  268. builder.clear();
  269. if (rowset)
  270. rowAllocator->releaseRowset(count, rowset);
  271. rowset = NULL;
  272. count = 0;
  273. max = 0;
  274. }
  275. void RtlLinkedDatasetBuilder::append(const void * source)
  276. {
  277. if (count < choosenLimit)
  278. {
  279. ensure(count+1);
  280. rowset[count] = source ? (byte *)rowAllocator->linkRow(source) : NULL;
  281. count++;
  282. }
  283. }
  284. void RtlLinkedDatasetBuilder::appendRows(size32_t num, byte * * rows)
  285. {
  286. if (num && (count < choosenLimit))
  287. {
  288. unsigned maxNumToAdd = (count + num < choosenLimit) ? num : choosenLimit - count;
  289. unsigned numAdded = 0;
  290. ensure(count+maxNumToAdd);
  291. for (unsigned i=0; i < num; i++)
  292. {
  293. byte *row = rows[i];
  294. if (row)
  295. {
  296. rowset[count+numAdded] = (byte *)rowAllocator->linkRow(row);
  297. numAdded++;
  298. if (numAdded == maxNumToAdd)
  299. break;
  300. }
  301. }
  302. count += numAdded;
  303. }
  304. }
  305. void RtlLinkedDatasetBuilder::appendOwn(const void * row)
  306. {
  307. assertex(!builder.exists());
  308. if (count < choosenLimit)
  309. {
  310. ensure(count+1);
  311. rowset[count] = (byte *)row;
  312. count++;
  313. }
  314. else
  315. rowAllocator->releaseRow(row);
  316. }
  317. byte * RtlLinkedDatasetBuilder::createRow()
  318. {
  319. if (count >= choosenLimit)
  320. return NULL;
  321. return builder.getSelf();
  322. }
  323. //cloned from thorcommon.cpp
  324. class RtlChildRowLinkerWalker : implements IIndirectMemberVisitor
  325. {
  326. public:
  327. virtual void visitRowset(size32_t count, byte * * rows)
  328. {
  329. rtlLinkRowset(rows);
  330. }
  331. virtual void visitRow(const byte * row)
  332. {
  333. rtlLinkRow(row);
  334. }
  335. };
  336. void RtlLinkedDatasetBuilder::cloneRow(size32_t len, const void * row)
  337. {
  338. if (count >= choosenLimit)
  339. return;
  340. byte * self = builder.ensureCapacity(len, NULL);
  341. memcpy(self, row, len);
  342. IOutputMetaData * meta = rowAllocator->queryOutputMeta();
  343. if (meta->getMetaFlags() & MDFneeddestruct)
  344. {
  345. RtlChildRowLinkerWalker walker;
  346. meta->walkIndirectMembers(self, walker);
  347. }
  348. finalizeRow(len);
  349. }
  350. void RtlLinkedDatasetBuilder::deserializeRow(IOutputRowDeserializer & deserializer, IRowDeserializerSource & in)
  351. {
  352. builder.ensureRow();
  353. size32_t rowSize = deserializer.deserialize(builder, in);
  354. finalizeRow(rowSize);
  355. }
  356. void RtlLinkedDatasetBuilder::finalizeRows()
  357. {
  358. if (count != max)
  359. resize(count);
  360. }
  361. void RtlLinkedDatasetBuilder::finalizeRow(size32_t rowSize)
  362. {
  363. assertex(builder.exists());
  364. const void * next = builder.finalizeRowClear(rowSize);
  365. appendOwn(next);
  366. }
  367. byte * * RtlLinkedDatasetBuilder::linkrows()
  368. {
  369. finalizeRows();
  370. return rtlLinkRowset(rowset);
  371. }
  372. void RtlLinkedDatasetBuilder::expand(size32_t required)
  373. {
  374. assertex(required <= choosenLimit);
  375. //MORE: Next factoring change this so it passes this logic over to the row allocator
  376. size32_t newMax = max ? max : 4;
  377. while (newMax < required)
  378. {
  379. newMax += newMax;
  380. assertex(newMax);
  381. }
  382. if (newMax > choosenLimit)
  383. newMax = choosenLimit;
  384. resize(newMax);
  385. }
  386. void RtlLinkedDatasetBuilder::resize(size32_t required)
  387. {
  388. rowset = rowAllocator->reallocRows(rowset, max, required);
  389. max = required;
  390. }
  391. void appendRowsToRowset(size32_t & targetCount, byte * * & targetRowset, IEngineRowAllocator * rowAllocator, size32_t extraCount, byte * * extraRows)
  392. {
  393. if (extraCount)
  394. {
  395. size32_t prevCount = targetCount;
  396. byte * * expandedRowset = rowAllocator->reallocRows(targetRowset, prevCount, prevCount+extraCount);
  397. unsigned numAdded = 0;
  398. for (unsigned i=0; i < extraCount; i++)
  399. {
  400. byte *extraRow = extraRows[i];
  401. if (extraRow)
  402. {
  403. expandedRowset[prevCount+numAdded] = (byte *)rowAllocator->linkRow(extraRow);
  404. numAdded++;
  405. }
  406. }
  407. targetCount = prevCount + numAdded;
  408. targetRowset = expandedRowset;
  409. }
  410. }
  411. //---------------------------------------------------------------------------
  412. RtlLinkedDictionaryBuilder::RtlLinkedDictionaryBuilder(IEngineRowAllocator * _rowAllocator, IHThorHashLookupInfo *_hashInfo, unsigned _initialSize)
  413. : builder(_rowAllocator, false)
  414. {
  415. init(_rowAllocator, _hashInfo, _initialSize);
  416. }
  417. RtlLinkedDictionaryBuilder::RtlLinkedDictionaryBuilder(IEngineRowAllocator * _rowAllocator, IHThorHashLookupInfo *_hashInfo)
  418. : builder(_rowAllocator, false)
  419. {
  420. init(_rowAllocator, _hashInfo, 8);
  421. }
  422. void RtlLinkedDictionaryBuilder::init(IEngineRowAllocator * _rowAllocator, IHThorHashLookupInfo *_hashInfo, unsigned _initialSize)
  423. {
  424. hash = _hashInfo->queryHash();
  425. compare = _hashInfo->queryCompare();
  426. initialSize = _initialSize;
  427. rowAllocator = LINK(_rowAllocator);
  428. table = NULL;
  429. usedCount = 0;
  430. usedLimit = 0;
  431. tableSize = 0;
  432. }
  433. RtlLinkedDictionaryBuilder::~RtlLinkedDictionaryBuilder()
  434. {
  435. // builder.clear();
  436. if (table)
  437. rowAllocator->releaseRowset(tableSize, table);
  438. ::Release(rowAllocator);
  439. }
  440. void RtlLinkedDictionaryBuilder::append(const void * source)
  441. {
  442. if (source)
  443. {
  444. appendOwn(rowAllocator->linkRow(source));
  445. }
  446. }
  447. void RtlLinkedDictionaryBuilder::appendOwn(const void * source)
  448. {
  449. if (source)
  450. {
  451. checkSpace();
  452. unsigned rowidx = hash->hash(source) % tableSize;
  453. loop
  454. {
  455. const void *entry = table[rowidx];
  456. if (entry && compare->docompare(source, entry)==0)
  457. {
  458. rowAllocator->releaseRow(entry);
  459. usedCount--;
  460. entry = NULL;
  461. }
  462. if (!entry)
  463. {
  464. table[rowidx] = (byte *) source;
  465. usedCount++;
  466. break;
  467. }
  468. rowidx++;
  469. if (rowidx==tableSize)
  470. rowidx = 0;
  471. }
  472. }
  473. }
  474. void RtlLinkedDictionaryBuilder::checkSpace()
  475. {
  476. if (!table)
  477. {
  478. table = rowAllocator->createRowset(initialSize);
  479. tableSize = initialSize;
  480. memset(table, 0, tableSize*sizeof(byte *));
  481. usedLimit = (tableSize * 3) / 4;
  482. usedCount = 0;
  483. }
  484. else if (usedCount > usedLimit)
  485. {
  486. // Rehash
  487. byte * * oldTable = table;
  488. unsigned oldSize = tableSize;
  489. table = rowAllocator->createRowset(tableSize*2);
  490. tableSize = tableSize*2; // Don't update until we have successfully allocated, so that we remain consistent if createRowset throws an exception.
  491. memset(table, 0, tableSize * sizeof(byte *));
  492. usedLimit = (tableSize * 3) / 4;
  493. usedCount = 0;
  494. unsigned i;
  495. for (i = 0; i < oldSize; i++)
  496. {
  497. append(oldTable[i]); // we link the rows here...
  498. }
  499. rowAllocator->releaseRowset(oldSize, oldTable); // ... because this will release them
  500. }
  501. }
  502. void RtlLinkedDictionaryBuilder::deserializeRow(IOutputRowDeserializer & deserializer, IRowDeserializerSource & in)
  503. {
  504. builder.ensureRow();
  505. size32_t rowSize = deserializer.deserialize(builder, in);
  506. finalizeRow(rowSize);
  507. }
  508. void RtlLinkedDictionaryBuilder::appendRows(size32_t num, byte * * rows)
  509. {
  510. // MORE - if we know that the source is already a hashtable, we can optimize the add to an empty table...
  511. for (unsigned i=0; i < num; i++)
  512. append(rows[i]);
  513. }
  514. void RtlLinkedDictionaryBuilder::finalizeRow(size32_t rowSize)
  515. {
  516. assertex(builder.exists());
  517. const void * next = builder.finalizeRowClear(rowSize);
  518. appendOwn(next);
  519. }
  520. void RtlLinkedDictionaryBuilder::cloneRow(size32_t len, const void * row)
  521. {
  522. byte * self = builder.ensureCapacity(len, NULL);
  523. memcpy(self, row, len);
  524. IOutputMetaData * meta = rowAllocator->queryOutputMeta();
  525. if (meta->getMetaFlags() & MDFneeddestruct)
  526. {
  527. RtlChildRowLinkerWalker walker;
  528. meta->walkIndirectMembers(self, walker);
  529. }
  530. finalizeRow(len);
  531. }
  532. extern ECLRTL_API unsigned __int64 rtlDictionaryCount(size32_t tableSize, byte **table)
  533. {
  534. unsigned __int64 ret = 0;
  535. for (size32_t i = 0; i < tableSize; i++)
  536. if (table[i])
  537. ret++;
  538. return ret;
  539. }
  540. extern ECLRTL_API byte *rtlDictionaryLookup(IHThorHashLookupInfo &hashInfo, size32_t tableSize, byte **table, const byte *source, byte *defaultRow)
  541. {
  542. if (!tableSize)
  543. return (byte *) rtlLinkRow(defaultRow);
  544. IHash *hash = hashInfo.queryHash();
  545. ICompare *compare = hashInfo.queryCompare();
  546. unsigned rowidx = hash->hash(source) % tableSize;
  547. loop
  548. {
  549. const void *entry = table[rowidx];
  550. if (!entry)
  551. return (byte *) rtlLinkRow(defaultRow);
  552. if (compare->docompare(source, entry)==0)
  553. return (byte *) rtlLinkRow(entry);
  554. rowidx++;
  555. if (rowidx==tableSize)
  556. rowidx = 0;
  557. }
  558. }
  559. extern ECLRTL_API bool rtlDictionaryLookupExists(IHThorHashLookupInfo &hashInfo, size32_t tableSize, byte **table, const byte *source)
  560. {
  561. if (!tableSize)
  562. return false;
  563. IHash *hash = hashInfo.queryHash();
  564. ICompare *compare = hashInfo.queryCompare();
  565. unsigned rowidx = hash->hash(source) % tableSize;
  566. loop
  567. {
  568. const void *entry = table[rowidx];
  569. if (!entry)
  570. return false;
  571. if (compare->docompare(source, entry)==0)
  572. return true;
  573. rowidx++;
  574. if (rowidx==tableSize)
  575. rowidx = 0;
  576. }
  577. }
  578. //---------------------------------------------------------------------------------------------------------------------
  579. // Serialization helper classes
  580. //These definitions should be shared with thorcommon, but to do that
  581. //they would need to be moved to an rtlds.ipp header, which thorcommon then included.
  582. class ECLRTL_API CMemoryBufferSerializeTarget : implements IRowSerializerTarget
  583. {
  584. public:
  585. CMemoryBufferSerializeTarget(MemoryBuffer & _buffer) : buffer(_buffer)
  586. {
  587. }
  588. virtual void put(size32_t len, const void * ptr)
  589. {
  590. buffer.append(len, ptr);
  591. }
  592. virtual size32_t beginNested()
  593. {
  594. unsigned pos = buffer.length();
  595. buffer.append((size32_t)0);
  596. return pos;
  597. }
  598. virtual void endNested(size32_t sizePos)
  599. {
  600. unsigned pos = buffer.length();
  601. buffer.rewrite(sizePos);
  602. buffer.append((size32_t)(pos - (sizePos + sizeof(size32_t))));
  603. buffer.rewrite(pos);
  604. }
  605. protected:
  606. MemoryBuffer & buffer;
  607. };
  608. class ECLRTL_API CRowBuilderSerializeTarget : implements IRowSerializerTarget
  609. {
  610. public:
  611. CRowBuilderSerializeTarget(ARowBuilder & _builder) : builder(_builder)
  612. {
  613. offset = 0;
  614. }
  615. virtual void put(size32_t len, const void * ptr)
  616. {
  617. byte * data = builder.ensureCapacity(offset + len, "");
  618. memcpy(data+offset, ptr, len);
  619. offset += len;
  620. }
  621. virtual size32_t beginNested()
  622. {
  623. unsigned pos = offset;
  624. offset += sizeof(size32_t);
  625. return pos;
  626. }
  627. virtual void endNested(size32_t sizePos)
  628. {
  629. byte * self = builder.getSelf();
  630. *(size32_t *)(self + sizePos) = offset - (sizePos + sizeof(size32_t));
  631. }
  632. inline size32_t length() const { return offset; }
  633. protected:
  634. ARowBuilder & builder;
  635. size32_t offset;
  636. };
  637. //---------------------------------------------------------------------------------------------------------------------
  638. // internal serialization helper functions
  639. inline void doDeserializeRowset(RtlLinkedDatasetBuilder & builder, IOutputRowDeserializer & deserializer, IRowDeserializerSource & in, offset_t marker, bool isGrouped)
  640. {
  641. byte eogPending = false;
  642. while (!in.finishedNested(marker))
  643. {
  644. if (isGrouped && eogPending)
  645. builder.appendEOG();
  646. builder.deserializeRow(deserializer, in);
  647. if (isGrouped)
  648. in.read(1, &eogPending);
  649. }
  650. }
  651. inline void doSerializeRowset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows, bool isGrouped)
  652. {
  653. for (unsigned i=0; i < count; i++)
  654. {
  655. byte *row = rows[i];
  656. if (row)
  657. {
  658. serializer->serialize(out, rows[i]);
  659. if (isGrouped)
  660. {
  661. byte eogPending = (i+1 < count) && (rows[i+1] == NULL);
  662. out.put(1, &eogPending);
  663. }
  664. }
  665. else
  666. {
  667. assert(isGrouped); // should not be seeing NULLs otherwise - should not use this function for DICTIONARY
  668. }
  669. }
  670. }
  671. inline void doSerializeRowsetStripNulls(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  672. {
  673. for (unsigned i=0; i < count; i++)
  674. {
  675. byte *row = rows[i];
  676. if (row)
  677. serializer->serialize(out, rows[i]);
  678. }
  679. }
  680. inline void doDeserializeRowset(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, offset_t marker, IRowDeserializerSource & in, bool isGrouped)
  681. {
  682. RtlLinkedDatasetBuilder builder(_rowAllocator);
  683. doDeserializeRowset(builder, *deserializer, in, marker, isGrouped);
  684. count = builder.getcount();
  685. rowset = builder.linkrows();
  686. }
  687. inline void doDeserializeChildRowset(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, IRowDeserializerSource & in, bool isGrouped)
  688. {
  689. offset_t marker = in.beginNested();
  690. doDeserializeRowset(count, rowset, _rowAllocator, deserializer, marker, in, isGrouped);
  691. }
  692. //--------------------------------------------------------------------------------------------------------------------
  693. //Serialize/deserialize functions call for child datasets in the row serializer
  694. extern ECLRTL_API void rtlDeserializeChildRowset(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, IRowDeserializerSource & in)
  695. {
  696. doDeserializeChildRowset(count, rowset, _rowAllocator, deserializer, in, false);
  697. }
  698. extern ECLRTL_API void rtlDeserializeChildGroupedRowset(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, IRowDeserializerSource & in)
  699. {
  700. doDeserializeChildRowset(count, rowset, _rowAllocator, deserializer, in, true);
  701. }
  702. void rtlSerializeChildRowset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  703. {
  704. size32_t marker = out.beginNested();
  705. doSerializeRowset(out, serializer, count, rows, false);
  706. out.endNested(marker);
  707. }
  708. void rtlSerializeChildGroupedRowset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  709. {
  710. size32_t marker = out.beginNested();
  711. doSerializeRowset(out, serializer, count, rows, true);
  712. out.endNested(marker);
  713. }
  714. //--------------------------------------------------------------------------------------------------------------------
  715. //Serialize/deserialize functions used to serialize data from the master to the slave (defined in eclrtl.hpp) to/from a MemoryBuffer
  716. extern void deserializeRowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, MemoryBuffer &in)
  717. {
  718. Owned<ISerialStream> stream = createMemoryBufferSerialStream(in);
  719. CThorStreamDeserializerSource rowSource(stream);
  720. doDeserializeChildRowset(count, rowset, _rowAllocator, deserializer, rowSource, false);
  721. }
  722. extern void deserializeGroupedRowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, MemoryBuffer &in)
  723. {
  724. Owned<ISerialStream> stream = createMemoryBufferSerialStream(in);
  725. CThorStreamDeserializerSource rowSource(stream);
  726. doDeserializeChildRowset(count, rowset, _rowAllocator, deserializer, rowSource, true);
  727. }
  728. extern void serializeRowsetX(size32_t count, byte * * rows, IOutputRowSerializer * serializer, MemoryBuffer & buffer)
  729. {
  730. CMemoryBufferSerializeTarget out(buffer);
  731. rtlSerializeChildRowset(out, serializer, count, rows);
  732. }
  733. extern void serializeGroupedRowsetX(size32_t count, byte * * rows, IOutputRowSerializer * serializer, MemoryBuffer & buffer)
  734. {
  735. CMemoryBufferSerializeTarget out(buffer);
  736. rtlSerializeChildGroupedRowset(out, serializer, count, rows);
  737. }
  738. //--------------------------------------------------------------------------------------------------------------------
  739. // Functions for converting between different representations - where the source/target are complete datasets
  740. inline void doDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src, bool isGrouped)
  741. {
  742. Owned<ISerialStream> stream = createMemorySerialStream(src, lenSrc);
  743. CThorStreamDeserializerSource source(stream);
  744. doDeserializeRowset(count, rowset, rowAllocator, deserializer, lenSrc, source, isGrouped);
  745. }
  746. extern ECLRTL_API void rtlDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src, bool isGrouped)
  747. {
  748. doDataset2RowsetX(count, rowset, rowAllocator, deserializer, lenSrc, src, isGrouped);
  749. }
  750. extern ECLRTL_API void rtlDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src)
  751. {
  752. doDataset2RowsetX(count, rowset, rowAllocator, deserializer, lenSrc, src, false);
  753. }
  754. extern ECLRTL_API void rtlGroupedDataset2RowsetX(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src)
  755. {
  756. doDataset2RowsetX(count, rowset, rowAllocator, deserializer, lenSrc, src, true);
  757. }
  758. inline void doRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows, bool isGrouped)
  759. {
  760. MemoryBuffer buffer;
  761. CMemoryBufferSerializeTarget out(buffer);
  762. doSerializeRowset(out, serializer, count, rows, isGrouped);
  763. rtlFree(tgt);
  764. tlen = buffer.length();
  765. tgt = buffer.detach(); // not strictly speaking correct - it should have been allocated with rtlMalloc();
  766. }
  767. extern ECLRTL_API void rtlRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows, bool isGrouped)
  768. {
  769. doRowset2DatasetX(tlen, tgt, serializer, count, rows, isGrouped);
  770. }
  771. extern ECLRTL_API void rtlRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  772. {
  773. doRowset2DatasetX(tlen, tgt, serializer, count, rows, false);
  774. }
  775. extern ECLRTL_API void rtlGroupedRowset2DatasetX(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  776. {
  777. doRowset2DatasetX(tlen, tgt, serializer, count, rows, true);
  778. }
  779. //--------------------------------------------------------------------------------------------------------------------
  780. // Serialize/deserialize rows to a memory buffer
  781. void serializeRow(const void * row, IOutputRowSerializer * serializer, MemoryBuffer & buffer)
  782. {
  783. CMemoryBufferSerializeTarget out(buffer);
  784. serializer->serialize(out, static_cast<const byte *>(row));
  785. }
  786. extern ECLRTL_API byte * rtlDeserializeBufferRow(IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, MemoryBuffer & buffer)
  787. {
  788. Owned<ISerialStream> stream = createMemoryBufferSerialStream(buffer);
  789. CThorStreamDeserializerSource source(stream);
  790. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  791. size32_t rowSize = deserializer->deserialize(rowBuilder, source);
  792. return static_cast<byte *>(const_cast<void *>(rowBuilder.finalizeRowClear(rowSize)));
  793. }
  794. //--------------------------------------------------------------------------------------------------------------------
  795. // serialize/deserialize a row to a builder or another row
  796. extern ECLRTL_API byte * rtlDeserializeRow(IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, const void * src)
  797. {
  798. const size32_t unknownSourceLength = 0x7fffffff;
  799. Owned<ISerialStream> stream = createMemorySerialStream(src, unknownSourceLength);
  800. CThorStreamDeserializerSource source(stream);
  801. RtlDynamicRowBuilder rowBuilder(rowAllocator);
  802. size32_t rowSize = deserializer->deserialize(rowBuilder, source);
  803. return static_cast<byte *>(const_cast<void *>(rowBuilder.finalizeRowClear(rowSize)));
  804. }
  805. extern ECLRTL_API size32_t rtlDeserializeToBuilder(ARowBuilder & builder, IOutputRowDeserializer * deserializer, const void * src)
  806. {
  807. const size32_t unknownSourceLength = 0x7fffffff;
  808. Owned<ISerialStream> stream = createMemorySerialStream(src, unknownSourceLength);
  809. CThorStreamDeserializerSource source(stream);
  810. return deserializer->deserialize(builder, source);
  811. }
  812. extern ECLRTL_API size32_t rtlSerializeToBuilder(ARowBuilder & builder, IOutputRowSerializer * serializer, const void * src)
  813. {
  814. CRowBuilderSerializeTarget target(builder);
  815. serializer->serialize(target, (const byte *)src);
  816. return target.length();
  817. }
  818. //--------------------------------------------------------------------------------------------------------------------
  819. static void doSerializeDictionary(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  820. {
  821. out.put(sizeof(count), &count);
  822. size32_t idx = 0;
  823. while (idx < count)
  824. {
  825. byte numRows = 0;
  826. while (numRows < 255 && idx+numRows < count && rows[idx+numRows] != NULL)
  827. numRows++;
  828. out.put(1, &numRows);
  829. for (int i = 0; i < numRows; i++)
  830. {
  831. byte *nextrec = rows[idx+i];
  832. assert(nextrec);
  833. serializer->serialize(out, nextrec);
  834. }
  835. idx += numRows;
  836. byte numNulls = 0;
  837. while (numNulls < 255 && idx+numNulls < count && rows[idx+numNulls] == NULL)
  838. numNulls++;
  839. out.put(1, &numNulls);
  840. idx += numNulls;
  841. }
  842. }
  843. static void doDeserializeDictionary(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, offset_t marker, IRowDeserializerSource & in)
  844. {
  845. RtlLinkedDatasetBuilder builder(rowAllocator);
  846. size32_t totalRows;
  847. in.read(sizeof(totalRows), &totalRows);
  848. builder.ensure(totalRows);
  849. byte nullsPending = 0;
  850. byte rowsPending = 0;
  851. while (!in.finishedNested(marker))
  852. {
  853. in.read(1, &rowsPending);
  854. for (int i = 0; i < rowsPending; i++)
  855. builder.deserializeRow(*deserializer, in);
  856. in.read(1, &nullsPending);
  857. for (int i = 0; i < nullsPending; i++)
  858. builder.appendEOG();
  859. }
  860. count = builder.getcount();
  861. assertex(count==totalRows);
  862. rowset = builder.linkrows();
  863. }
  864. static void doDeserializeDictionaryFromDataset(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, IHThorHashLookupInfo & hashInfo, offset_t marker, IRowDeserializerSource & in)
  865. {
  866. RtlLinkedDictionaryBuilder builder(rowAllocator, &hashInfo);
  867. while (!in.finishedNested(marker))
  868. builder.deserializeRow(*deserializer, in);
  869. count = builder.getcount();
  870. rowset = builder.linkrows();
  871. }
  872. extern ECLRTL_API void rtlSerializeDictionary(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  873. {
  874. doSerializeDictionary(out, serializer, count, rows);
  875. }
  876. extern ECLRTL_API void rtlSerializeDictionaryToDataset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  877. {
  878. doSerializeRowsetStripNulls(out, serializer, count, rows);
  879. }
  880. extern ECLRTL_API void rtlDeserializeChildDictionary(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, IRowDeserializerSource & in)
  881. {
  882. offset_t marker = in.beginNested(); // MORE: Would this be better as a count?
  883. doDeserializeDictionary(count, rowset, rowAllocator, deserializer, marker, in);
  884. }
  885. extern ECLRTL_API void rtlDeserializeChildDictionaryFromDataset(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, IHThorHashLookupInfo & hashInfo, IRowDeserializerSource & in)
  886. {
  887. offset_t marker = in.beginNested(); // MORE: Would this be better as a count?
  888. doDeserializeDictionaryFromDataset(count, rowset, rowAllocator, deserializer, hashInfo, marker, in);
  889. }
  890. extern ECLRTL_API void rtlSerializeChildDictionary(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  891. {
  892. size32_t marker = out.beginNested();
  893. doSerializeDictionary(out, serializer, count, rows);
  894. out.endNested(marker);
  895. }
  896. extern ECLRTL_API void rtlSerializeChildDictionaryToDataset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  897. {
  898. size32_t marker = out.beginNested();
  899. doSerializeRowsetStripNulls(out, serializer, count, rows);
  900. out.endNested(marker);
  901. }
  902. extern void deserializeDictionaryX(size32_t & count, byte * * & rowset, IEngineRowAllocator * _rowAllocator, IOutputRowDeserializer * deserializer, MemoryBuffer &in)
  903. {
  904. Owned<ISerialStream> stream = createMemoryBufferSerialStream(in);
  905. CThorStreamDeserializerSource rowSource(stream);
  906. rtlDeserializeChildDictionary(count, rowset, _rowAllocator, deserializer, rowSource);
  907. }
  908. extern void serializeDictionaryX(size32_t count, byte * * rows, IOutputRowSerializer * serializer, MemoryBuffer & buffer)
  909. {
  910. CMemoryBufferSerializeTarget out(buffer);
  911. rtlSerializeChildDictionary(out, serializer, count, rows);
  912. }
  913. extern ECLRTL_API void rtlDeserializeDictionary(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, size32_t lenSrc, const void * src)
  914. {
  915. Owned<ISerialStream> stream = createMemorySerialStream(src, lenSrc);
  916. CThorStreamDeserializerSource in(stream);
  917. doDeserializeDictionary(count, rowset, rowAllocator, deserializer, lenSrc, in);
  918. }
  919. extern ECLRTL_API void rtlDeserializeDictionaryFromDataset(size32_t & count, byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, IHThorHashLookupInfo & hashInfo, size32_t lenSrc, const void * src)
  920. {
  921. Owned<ISerialStream> stream = createMemorySerialStream(src, lenSrc);
  922. CThorStreamDeserializerSource in(stream);
  923. doDeserializeDictionaryFromDataset(count, rowset, rowAllocator, deserializer, hashInfo, lenSrc, in);
  924. }
  925. extern ECLRTL_API void rtlSerializeDictionary(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  926. {
  927. MemoryBuffer buffer;
  928. CMemoryBufferSerializeTarget out(buffer);
  929. doSerializeDictionary(out, serializer, count, rows);
  930. rtlFree(tgt);
  931. tlen = buffer.length();
  932. tgt = buffer.detach(); // not strictly speaking correct - it should have been allocated with rtlMalloc();
  933. }
  934. extern ECLRTL_API void rtlSerializeDictionaryToDataset(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, byte * * rows)
  935. {
  936. MemoryBuffer buffer;
  937. CMemoryBufferSerializeTarget out(buffer);
  938. doSerializeRowsetStripNulls(out, serializer, count, rows);
  939. rtlFree(tgt);
  940. tlen = buffer.length();
  941. tgt = buffer.detach(); // not strictly speaking correct - it should have been allocated with rtlMalloc();
  942. }
  943. //---------------------------------------------------------------------------
  944. RtlDatasetCursor::RtlDatasetCursor(size32_t _len, const void * _data)
  945. {
  946. setDataset(_len, _data);
  947. }
  948. bool RtlDatasetCursor::exists()
  949. {
  950. return (end != buffer);
  951. }
  952. const byte * RtlDatasetCursor::first()
  953. {
  954. if (buffer != end)
  955. cur = buffer;
  956. return cur;
  957. }
  958. const byte * RtlDatasetCursor::get()
  959. {
  960. return cur;
  961. }
  962. void RtlDatasetCursor::setDataset(size32_t _len, const void * _data)
  963. {
  964. buffer = (const byte *)_data;
  965. end = buffer + _len;
  966. cur = NULL;
  967. }
  968. bool RtlDatasetCursor::isValid()
  969. {
  970. return (cur != NULL);
  971. }
  972. /*
  973. const byte * RtlDatasetCursor::next()
  974. {
  975. if (cur)
  976. {
  977. cur += getRowSize();
  978. if (cur >= end)
  979. cur = NULL;
  980. }
  981. return cur;
  982. }
  983. */
  984. //---------------------------------------------------------------------------
  985. RtlFixedDatasetCursor::RtlFixedDatasetCursor(size32_t _len, const void * _data, unsigned _recordSize) : RtlDatasetCursor(_len, _data)
  986. {
  987. recordSize = _recordSize;
  988. }
  989. RtlFixedDatasetCursor::RtlFixedDatasetCursor() : RtlDatasetCursor(0, NULL)
  990. {
  991. recordSize = 1;
  992. }
  993. size32_t RtlFixedDatasetCursor::count()
  994. {
  995. return (size32_t)((end - buffer) / recordSize);
  996. }
  997. size32_t RtlFixedDatasetCursor::getSize()
  998. {
  999. return recordSize;
  1000. }
  1001. void RtlFixedDatasetCursor::init(size32_t _len, const void * _data, unsigned _recordSize)
  1002. {
  1003. recordSize = _recordSize;
  1004. setDataset(_len, _data);
  1005. }
  1006. const byte * RtlFixedDatasetCursor::next()
  1007. {
  1008. if (cur)
  1009. {
  1010. cur += recordSize;
  1011. if (cur >= end)
  1012. cur = NULL;
  1013. }
  1014. return cur;
  1015. }
  1016. const byte * RtlFixedDatasetCursor::select(unsigned idx)
  1017. {
  1018. cur = buffer + idx * recordSize;
  1019. if (cur >= end)
  1020. cur = NULL;
  1021. return cur;
  1022. }
  1023. //---------------------------------------------------------------------------
  1024. RtlVariableDatasetCursor::RtlVariableDatasetCursor(size32_t _len, const void * _data, IRecordSize & _recordSize) : RtlDatasetCursor(_len, _data)
  1025. {
  1026. recordSize = &_recordSize;
  1027. }
  1028. RtlVariableDatasetCursor::RtlVariableDatasetCursor() : RtlDatasetCursor(0, NULL)
  1029. {
  1030. recordSize = NULL;
  1031. }
  1032. void RtlVariableDatasetCursor::init(size32_t _len, const void * _data, IRecordSize & _recordSize)
  1033. {
  1034. recordSize = &_recordSize;
  1035. setDataset(_len, _data);
  1036. }
  1037. size32_t RtlVariableDatasetCursor::count()
  1038. {
  1039. const byte * finger = buffer;
  1040. unsigned c = 0;
  1041. while (finger < end)
  1042. {
  1043. finger += recordSize->getRecordSize(finger);
  1044. c++;
  1045. }
  1046. assertex(finger == end);
  1047. return c;
  1048. }
  1049. size32_t RtlVariableDatasetCursor::getSize()
  1050. {
  1051. return recordSize->getRecordSize(cur);
  1052. }
  1053. const byte * RtlVariableDatasetCursor::next()
  1054. {
  1055. if (cur)
  1056. {
  1057. cur += recordSize->getRecordSize(cur);
  1058. if (cur >= end)
  1059. cur = NULL;
  1060. }
  1061. return cur;
  1062. }
  1063. const byte * RtlVariableDatasetCursor::select(unsigned idx)
  1064. {
  1065. const byte * finger = buffer;
  1066. unsigned c = 0;
  1067. while (finger < end)
  1068. {
  1069. if (c == idx)
  1070. {
  1071. cur = finger;
  1072. return cur;
  1073. }
  1074. finger += recordSize->getRecordSize(finger);
  1075. c++;
  1076. }
  1077. assertex(finger == end);
  1078. cur = NULL;
  1079. return NULL;
  1080. }
  1081. //---------------------------------------------------------------------------
  1082. RtlLinkedDatasetCursor::RtlLinkedDatasetCursor(unsigned _numRows, byte * * _rows) : numRows(_numRows), rows(_rows)
  1083. {
  1084. cur = (unsigned)-1;
  1085. }
  1086. RtlLinkedDatasetCursor::RtlLinkedDatasetCursor()
  1087. {
  1088. numRows = 0;
  1089. rows = NULL;
  1090. cur = (unsigned)-1;
  1091. }
  1092. void RtlLinkedDatasetCursor::init(unsigned _numRows, byte * * _rows)
  1093. {
  1094. numRows = _numRows;
  1095. rows = _rows;
  1096. cur = (unsigned)-1;
  1097. }
  1098. const byte * RtlLinkedDatasetCursor::first()
  1099. {
  1100. cur = 0;
  1101. return cur < numRows ? rows[cur] : NULL;
  1102. }
  1103. const byte * RtlLinkedDatasetCursor::get()
  1104. {
  1105. return cur < numRows ? rows[cur] : NULL;
  1106. }
  1107. bool RtlLinkedDatasetCursor::isValid()
  1108. {
  1109. return (cur < numRows);
  1110. }
  1111. const byte * RtlLinkedDatasetCursor::next()
  1112. {
  1113. if (cur < numRows)
  1114. cur++;
  1115. return cur < numRows ? rows[cur] : NULL;
  1116. }
  1117. const byte * RtlLinkedDatasetCursor::select(unsigned idx)
  1118. {
  1119. cur = idx;
  1120. return cur < numRows ? rows[cur] : NULL;
  1121. }
  1122. //---------------------------------------------------------------------------
  1123. bool rtlCheckInList(const void * lhs, IRtlDatasetCursor * cursor, ICompare * compare)
  1124. {
  1125. const byte * cur;
  1126. for (cur = cursor->first(); cur; cur = cursor->next())
  1127. {
  1128. if (compare->docompare(lhs, cur) == 0)
  1129. return true;
  1130. }
  1131. return false;
  1132. }
  1133. void rtlSetToSetX(bool & outIsAll, size32_t & outLen, void * & outData, bool inIsAll, size32_t inLen, void * inData)
  1134. {
  1135. outIsAll = inIsAll;
  1136. outLen = inLen;
  1137. outData = malloc(inLen);
  1138. memcpy(outData, inData, inLen);
  1139. }
  1140. void rtlAppendSetX(bool & outIsAll, size32_t & outLen, void * & outData, bool leftIsAll, size32_t leftLen, void * leftData, bool rightIsAll, size32_t rightLen, void * rightData)
  1141. {
  1142. outIsAll = leftIsAll | rightIsAll;
  1143. if (outIsAll)
  1144. {
  1145. outLen = 0;
  1146. outData = NULL;
  1147. }
  1148. else
  1149. {
  1150. outLen = leftLen+rightLen;
  1151. outData = malloc(outLen);
  1152. memcpy(outData, leftData, leftLen);
  1153. memcpy((byte*)outData+leftLen, rightData, rightLen);
  1154. }
  1155. }
  1156. //------------------------------------------------------------------------------
  1157. RtlCompoundIterator::RtlCompoundIterator()
  1158. {
  1159. ok = false;
  1160. numLevels = 0;
  1161. iters = NULL;
  1162. cursors = NULL;
  1163. }
  1164. RtlCompoundIterator::~RtlCompoundIterator()
  1165. {
  1166. delete [] iters;
  1167. delete [] cursors;
  1168. }
  1169. void RtlCompoundIterator::addIter(unsigned idx, IRtlDatasetSimpleCursor * iter, byte * * cursor)
  1170. {
  1171. assertex(idx < numLevels);
  1172. iters[idx] = iter;
  1173. cursors[idx] = cursor;
  1174. }
  1175. void RtlCompoundIterator::init(unsigned _numLevels)
  1176. {
  1177. numLevels = _numLevels;
  1178. iters = new IRtlDatasetSimpleCursor * [numLevels];
  1179. cursors = new byte * * [numLevels];
  1180. }
  1181. //Could either duplicate this function, N times, or have it as a helper function that accesses pre-defined virtuals.
  1182. bool RtlCompoundIterator::first(unsigned level)
  1183. {
  1184. IRtlDatasetSimpleCursor * curIter = iters[level];
  1185. if (level == 0)
  1186. {
  1187. const byte * cur = curIter->first();
  1188. setCursor(level, cur);
  1189. return (cur != NULL);
  1190. }
  1191. if (!first(level-1))
  1192. return false;
  1193. loop
  1194. {
  1195. const byte * cur = curIter->first();
  1196. if (cur)
  1197. {
  1198. setCursor(level, cur);
  1199. return true;
  1200. }
  1201. if (!next(level-1))
  1202. return false;
  1203. }
  1204. }
  1205. bool RtlCompoundIterator::next(unsigned level)
  1206. {
  1207. IRtlDatasetSimpleCursor * curIter = iters[level];
  1208. const byte * cur = curIter->next();
  1209. if (cur)
  1210. {
  1211. setCursor(level, cur);
  1212. return true;
  1213. }
  1214. if (level == 0)
  1215. return false;
  1216. loop
  1217. {
  1218. if (!next(level-1))
  1219. return false;
  1220. const byte * cur = curIter->first();
  1221. if (cur)
  1222. {
  1223. setCursor(level, cur);
  1224. return true;
  1225. }
  1226. }
  1227. }
  1228. //------------------------------------------------------------------------------
  1229. void RtlSimpleIterator::addIter(unsigned idx, IRtlDatasetSimpleCursor * _iter, byte * * _cursor)
  1230. {
  1231. assertex(idx == 0);
  1232. iter = _iter;
  1233. cursor = _cursor;
  1234. *cursor = NULL;
  1235. }
  1236. bool RtlSimpleIterator::first()
  1237. {
  1238. const byte * cur = iter->first();
  1239. *cursor = (byte *)cur;
  1240. return (cur != NULL);
  1241. }
  1242. bool RtlSimpleIterator::next()
  1243. {
  1244. const byte * cur = iter->next();
  1245. *cursor = (byte *)cur;
  1246. return (cur != NULL);
  1247. }