// fvsource.cpp
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "platform.h"
  15. #include "jliball.hpp"
  16. #include "eclrtl.hpp"
  17. #include "rtlds_imp.hpp"
  18. #include "fvresultset.ipp"
  19. #include "fileview.hpp"
  20. #include "fvsource.ipp"
  21. #include "hqlerror.hpp"
  22. #include "eclhelper.hpp"
  23. #include "hqlattr.hpp"
  24. #include "hqlutil.hpp"
  25. #include "fvdatasource.hpp"
  26. #define MAX_RECORD_SIZE 4096
  27. DataSourceMetaItem::DataSourceMetaItem(unsigned _flags, const char * _name, const char * _xpath, ITypeInfo * _type)
  28. {
  29. flags = _flags;
  30. name.set(_name);
  31. type.set(_type);
  32. xpath.set(_xpath);
  33. }
  34. DataSourceMetaItem::DataSourceMetaItem(unsigned _flags, MemoryBuffer & in)
  35. {
  36. flags = _flags;
  37. in.read(name);
  38. in.read(xpath);
  39. type.setown(deserializeType(in));
  40. }
  41. void DataSourceMetaItem::serialize(MemoryBuffer & out) const
  42. {
  43. out.append(flags);
  44. out.append(name);
  45. out.append(xpath);
  46. type->serialize(out);
  47. }
  48. //---------------------------------------------------------------------------
  49. DataSourceDatasetItem::DataSourceDatasetItem(const char * _name, const char * _xpath, IHqlExpression * expr) : DataSourceMetaItem(FVFFdataset, NULL, NULL, NULL), record(expr->queryRecord(), 0, true, false, false)
  50. {
  51. type.setown(makeTableType(NULL, NULL, NULL, NULL));
  52. name.set(_name);
  53. xpath.set(_xpath);
  54. }
  55. DataSourceDatasetItem::DataSourceDatasetItem(unsigned flags, MemoryBuffer & in) : DataSourceMetaItem(FVFFdataset, NULL, NULL, NULL), record(in)
  56. {
  57. type.setown(makeTableType(NULL, NULL, NULL, NULL));
  58. in.read(name);
  59. }
  60. void DataSourceDatasetItem::serialize(MemoryBuffer & out) const
  61. {
  62. out.append(flags);
  63. record.serialize(out);
  64. out.append(name);
  65. }
  66. //---------------------------------------------------------------------------
  67. DataSourceSetItem::DataSourceSetItem(const char * _name, const char * _xpath, ITypeInfo * _type) : DataSourceMetaItem(FVFFset, _name, _xpath, _type)
  68. {
  69. createChild();
  70. }
  71. DataSourceSetItem::DataSourceSetItem(unsigned flags, MemoryBuffer & in) : DataSourceMetaItem(flags, in)
  72. {
  73. createChild();
  74. }
  75. void DataSourceSetItem::createChild()
  76. {
  77. ITypeInfo * childType = type->queryChildType()->queryPromotedType();
  78. record.addSimpleField("Item", NULL, childType);
  79. }
  80. void DataSourceSetItem::serialize(MemoryBuffer & out) const
  81. {
  82. out.append(flags);
  83. record.serialize(out);
  84. out.append(name);
  85. }
  86. //---------------------------------------------------------------------------
  87. DataSourceMetaData::DataSourceMetaData(IHqlExpression * _record, byte _numFieldsToIgnore, bool _randomIsOk, bool _isGrouped, unsigned _keyedSize)
  88. {
  89. init();
  90. numFieldsToIgnore = _numFieldsToIgnore;
  91. randomIsOk = _randomIsOk;
  92. isStoredFixedWidth = true;
  93. //MORE: Blobs aren't handled correctly in indexes....
  94. maxRecordSize = ::getMaxRecordSize(_record, MAX_RECORD_SIZE);
  95. keyedSize = _keyedSize;
  96. gatherFields(_record, false);
  97. if (_isGrouped)
  98. {
  99. Owned<ITypeInfo> type = makeBoolType();
  100. addSimpleField("__groupfollows__", NULL, type);
  101. maxRecordSize++;
  102. }
  103. if (isStoredFixedWidth)
  104. assertex(storedFixedSize == maxRecordSize);
  105. }
  106. DataSourceMetaData::DataSourceMetaData()
  107. {
  108. init();
  109. randomIsOk = true;
  110. isStoredFixedWidth = true;
  111. }
  112. DataSourceMetaData::DataSourceMetaData(type_t typeCode)
  113. {
  114. init();
  115. OwnedITypeInfo type;
  116. if (typeCode == type_unicode)
  117. type.setown(makeUnicodeType(UNKNOWN_LENGTH, 0));
  118. else
  119. type.setown(makeStringType(UNKNOWN_LENGTH, NULL, NULL));
  120. fields.append(*new DataSourceMetaItem(FVFFnone, "line", NULL, type));
  121. }
  122. void DataSourceMetaData::init()
  123. {
  124. keyedSize = 0;
  125. storedFixedSize = 0;
  126. maxRecordSize = 0;
  127. bitsRemaining = 0;
  128. numVirtualFields = 0;
  129. isStoredFixedWidth = false;
  130. randomIsOk = false;
  131. numFieldsToIgnore = 0;
  132. }
  133. DataSourceMetaData::DataSourceMetaData(MemoryBuffer & buffer)
  134. {
  135. numVirtualFields = 0;
  136. buffer.read(numFieldsToIgnore);
  137. buffer.read(randomIsOk);
  138. buffer.read(isStoredFixedWidth);
  139. buffer.read(storedFixedSize);
  140. buffer.read(keyedSize);
  141. buffer.read(maxRecordSize);
  142. unsigned numFields;
  143. buffer.read(numFields);
  144. for (unsigned idx=0; idx < numFields; idx++)
  145. {
  146. byte flags;
  147. buffer.read(flags);
  148. if (flags == FVFFdataset)
  149. fields.append(*new DataSourceDatasetItem(flags, buffer));
  150. else if (flags == FVFFset)
  151. fields.append(*new DataSourceSetItem(flags, buffer));
  152. else
  153. fields.append(*new DataSourceMetaItem(flags, buffer));
  154. if (flags == FVFFvirtual)
  155. ++numVirtualFields;
  156. }
  157. }
  158. void DataSourceMetaData::addFileposition()
  159. {
  160. addVirtualField("__fileposition__", NULL, makeIntType(8, false));
  161. }
  162. void DataSourceMetaData::addSimpleField(const char * name, const char * xpath, ITypeInfo * type)
  163. {
  164. ITypeInfo * promoted = type->queryPromotedType();
  165. unsigned size = promoted->getSize();
  166. unsigned thisBits = 0;
  167. if (size == UNKNOWN_LENGTH)
  168. isStoredFixedWidth = false;
  169. else if (type->getTypeCode() == type_bitfield)
  170. {
  171. thisBits = type->getBitSize();
  172. if (thisBits > bitsRemaining)
  173. {
  174. size = type->queryChildType()->getSize();
  175. storedFixedSize += size;
  176. bitsRemaining = size * 8;
  177. }
  178. bitsRemaining -= thisBits;
  179. }
  180. else
  181. storedFixedSize += size;
  182. if (thisBits == 0)
  183. bitsRemaining = 0;
  184. fields.append(*new DataSourceMetaItem(FVFFnone, name, xpath, type));
  185. }
  186. void DataSourceMetaData::addVirtualField(const char * name, const char * xpath, ITypeInfo * type)
  187. {
  188. fields.append(*new DataSourceMetaItem(FVFFvirtual, name, xpath, type));
  189. ++numVirtualFields;
  190. }
  191. void DataSourceMetaData::extractKeyedInfo(UnsignedArray & offsets, TypeInfoArray & types)
  192. {
  193. unsigned curOffset = 0;
  194. ForEachItemIn(i, fields)
  195. {
  196. if (curOffset >= keyedSize)
  197. break;
  198. DataSourceMetaItem & cur = fields.item(i);
  199. switch (cur.flags)
  200. {
  201. case FVFFnone:
  202. {
  203. offsets.append(curOffset);
  204. types.append(*LINK(cur.type));
  205. unsigned size = cur.type->getSize();
  206. assertex(size != UNKNOWN_LENGTH);
  207. curOffset += size;
  208. break;
  209. }
  210. case FVFFbeginrecord:
  211. case FVFFendrecord:
  212. break;
  213. default:
  214. throwUnexpected();
  215. }
  216. }
  217. offsets.append(curOffset);
  218. assertex(curOffset == keyedSize);
  219. }
  220. //MORE: Really this should create no_selects for the sub records, but pass on that for the moment.
  221. void DataSourceMetaData::gatherFields(IHqlExpression * expr, bool isConditional)
  222. {
  223. switch (expr->getOperator())
  224. {
  225. case no_record:
  226. gatherChildFields(expr, isConditional);
  227. break;
  228. case no_ifblock:
  229. {
  230. OwnedITypeInfo boolType = makeBoolType();
  231. OwnedITypeInfo voidType = makeVoidType();
  232. isStoredFixedWidth = false;
  233. fields.append(*new DataSourceMetaItem(FVFFbeginif, NULL, NULL, boolType));
  234. gatherChildFields(expr->queryChild(1), true);
  235. fields.append(*new DataSourceMetaItem(FVFFendif, NULL, NULL, voidType));
  236. break;
  237. }
  238. case no_field:
  239. {
  240. if (expr->hasProperty(__ifblockAtom))
  241. break;
  242. Linked<ITypeInfo> type = expr->queryType();
  243. IAtom * name = expr->queryName();
  244. IHqlExpression * nameAttr = expr->queryProperty(namedAtom);
  245. StringBuffer outname;
  246. if (nameAttr && nameAttr->queryChild(0)->queryValue())
  247. nameAttr->queryChild(0)->queryValue()->getStringValue(outname);
  248. else
  249. outname.append(name).toLowerCase();
  250. StringBuffer xpathtext;
  251. const char * xpath = NULL;
  252. IHqlExpression * xpathAttr = expr->queryProperty(xpathAtom);
  253. if (xpathAttr && xpathAttr->queryChild(0)->queryValue())
  254. xpath = xpathAttr->queryChild(0)->queryValue()->getStringValue(xpathtext);
  255. if (isKey() && expr->hasProperty(blobAtom))
  256. type.setown(makeIntType(8, false));
  257. type_t tc = type->getTypeCode();
  258. if (tc == type_row)
  259. {
  260. OwnedITypeInfo voidType = makeVoidType();
  261. fields.append(*new DataSourceMetaItem(FVFFbeginrecord, outname, xpath, voidType));
  262. gatherChildFields(expr->queryRecord(), isConditional);
  263. fields.append(*new DataSourceMetaItem(FVFFendrecord, outname, xpath, voidType));
  264. }
  265. else if ((tc == type_table) || (tc == type_groupedtable))
  266. {
  267. isStoredFixedWidth = false;
  268. fields.append(*new DataSourceDatasetItem(outname, xpath, expr));
  269. }
  270. else if (tc == type_set)
  271. {
  272. isStoredFixedWidth = false;
  273. fields.append(*new DataSourceSetItem(outname, xpath, type));
  274. }
  275. else
  276. {
  277. if (type->getTypeCode() == type_alien)
  278. {
  279. IHqlAlienTypeInfo * alien = queryAlienType(type);
  280. type.set(alien->queryPhysicalType());
  281. }
  282. addSimpleField(outname, xpath, type);
  283. }
  284. break;
  285. }
  286. }
  287. }
  288. void DataSourceMetaData::gatherChildFields(IHqlExpression * expr, bool isConditional)
  289. {
  290. bitsRemaining = 0;
  291. ForEachChild(idx, expr)
  292. gatherFields(expr->queryChild(idx), isConditional);
  293. }
  294. unsigned DataSourceMetaData::numKeyedColumns() const
  295. {
  296. unsigned count = 0;
  297. unsigned curOffset = 0;
  298. ForEachItemIn(i, fields)
  299. {
  300. if (curOffset >= keyedSize)
  301. break;
  302. DataSourceMetaItem & cur = fields.item(i);
  303. switch (cur.flags)
  304. {
  305. case FVFFnone:
  306. {
  307. unsigned size = cur.type->getSize();
  308. assertex(size != UNKNOWN_LENGTH);
  309. curOffset += size;
  310. count++;
  311. break;
  312. }
  313. default:
  314. throwUnexpected();
  315. }
  316. }
  317. return count;
  318. assertex(curOffset == keyedSize);
  319. }
  320. unsigned DataSourceMetaData::numColumns() const
  321. {
  322. return fields.ordinality() - numFieldsToIgnore;
  323. }
  324. ITypeInfo * DataSourceMetaData::queryType(unsigned column) const
  325. {
  326. return fields.item(column).type;
  327. }
  328. unsigned DataSourceMetaData::queryFieldFlags(unsigned column) const
  329. {
  330. return fields.item(column).flags;
  331. }
  332. const char * DataSourceMetaData::queryName(unsigned column) const
  333. {
  334. return fields.item(column).name;
  335. }
  336. const char * DataSourceMetaData::queryXPath(unsigned column) const
  337. {
  338. return fields.item(column).xpath;
  339. }
  340. IFvDataSourceMetaData * DataSourceMetaData::queryChildMeta(unsigned column) const
  341. {
  342. return fields.item(column).queryChildMeta();
  343. }
  344. IFvDataSource * DataSourceMetaData::createChildDataSource(unsigned column, unsigned len, const void * data)
  345. {
  346. DataSourceMetaData * childMeta = fields.item(column).queryChildMeta();
  347. if (childMeta)
  348. return new NestedDataSource(*childMeta, len, data);
  349. return NULL;
  350. }
  351. bool DataSourceMetaData::supportsRandomSeek() const
  352. {
  353. return randomIsOk;
  354. }
  355. void DataSourceMetaData::serialize(MemoryBuffer & buffer) const
  356. {
  357. //NB: Update NullDataSourceMeta if this changes....
  358. buffer.append(numFieldsToIgnore);
  359. buffer.append(randomIsOk);
  360. buffer.append(isStoredFixedWidth);
  361. buffer.append(storedFixedSize);
  362. buffer.append(keyedSize);
  363. buffer.append(maxRecordSize);
  364. unsigned numFields = fields.ordinality();
  365. buffer.append(numFields);
  366. for (unsigned idx=0; idx < numFields; idx++)
  367. {
  368. fields.item(idx).serialize(buffer);
  369. }
  370. }
  371. size32_t DataSourceMetaData::getRecordSize(const void *rec)
  372. {
  373. if (isStoredFixedWidth)
  374. return storedFixedSize;
  375. if (!rec)
  376. return maxRecordSize;
  377. const byte * data = (const byte *)rec;
  378. unsigned curOffset = 0;
  379. unsigned bitsRemaining = 0;
  380. unsigned max = fields.ordinality() - numVirtualFields;
  381. for (unsigned idx=0; idx < max; idx++)
  382. {
  383. ITypeInfo & type = *fields.item(idx).type;
  384. unsigned size = type.getSize();
  385. if (size == UNKNOWN_LENGTH)
  386. {
  387. const byte * cur = data + curOffset;
  388. switch (type.getTypeCode())
  389. {
  390. case type_data:
  391. case type_string:
  392. case type_table:
  393. case type_groupedtable:
  394. size = *((unsigned *)cur) + sizeof(unsigned);
  395. break;
  396. case type_set:
  397. size = *((unsigned *)(cur + sizeof(bool))) + sizeof(unsigned) + sizeof(bool);
  398. break;
  399. case type_qstring:
  400. size = rtlQStrSize(*((unsigned *)cur)) + sizeof(unsigned);
  401. break;
  402. case type_unicode:
  403. size = *((unsigned *)cur)*2 + sizeof(unsigned);
  404. break;
  405. case type_utf8:
  406. size = sizeof(unsigned) + rtlUtf8Size(*(unsigned *)cur, cur+sizeof(unsigned));
  407. break;
  408. case type_varstring:
  409. size = strlen((char *)cur)+1;
  410. break;
  411. case type_varunicode:
  412. size = (rtlUnicodeStrlen((UChar *)cur)+1)*2;
  413. break;
  414. case type_packedint:
  415. size = rtlGetPackedSize(cur);
  416. break;
  417. default:
  418. UNIMPLEMENTED;
  419. }
  420. }
  421. if (type.getTypeCode() == type_bitfield)
  422. {
  423. unsigned thisBits = type.getBitSize();
  424. if (thisBits > bitsRemaining)
  425. {
  426. size = type.queryChildType()->getSize();
  427. bitsRemaining = size * 8;
  428. }
  429. else
  430. size = 0;
  431. bitsRemaining -= thisBits;
  432. }
  433. else
  434. bitsRemaining = 0;
  435. curOffset += size;
  436. }
  437. return curOffset;
  438. }
  439. size32_t DataSourceMetaData::getFixedSize() const
  440. {
  441. if (isStoredFixedWidth)
  442. return storedFixedSize;
  443. return 0;
  444. }
  445. IFvDataSourceMetaData * deserializeDataSourceMeta(MemoryBuffer & in)
  446. {
  447. return new DataSourceMetaData(in);
  448. }
  449. //---------------------------------------------------------------------------
  450. RowBlock::RowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset)
  451. {
  452. buffer.swapWith(_buffer);
  453. start = _start;
  454. startOffset = _startOffset;
  455. numRows = 0;
  456. }
  457. RowBlock::RowBlock(__int64 _start, __int64 _startOffset)
  458. {
  459. start = _start;
  460. startOffset = _startOffset;
  461. numRows = 0;
  462. }
  463. void RowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  464. {
  465. row = getNextRow();
  466. offset = startOffset + buffer.length();
  467. }
  468. FixedRowBlock::FixedRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, size32_t _fixedRecordSize) : RowBlock(_buffer, _start, _startOffset)
  469. {
  470. if (_fixedRecordSize == 0) _fixedRecordSize = 1;
  471. fixedRecordSize = _fixedRecordSize;
  472. numRows = buffer.length() / fixedRecordSize;
  473. }
  474. const void * FixedRowBlock::fetchRow(__int64 offset, size32_t & len)
  475. {
  476. __int64 maxOffset = startOffset + buffer.length();
  477. if (offset < startOffset || offset >= maxOffset)
  478. return NULL;
  479. len = fixedRecordSize;
  480. return buffer.toByteArray() + (offset - startOffset);
  481. }
  482. const void * FixedRowBlock::getRow(__int64 row, size32_t & len, unsigned __int64 & rowOffset)
  483. {
  484. if (row < start || row >= start + numRows)
  485. return NULL;
  486. unsigned index = (unsigned)(row - start);
  487. unsigned blockOffset = index * fixedRecordSize;
  488. len = fixedRecordSize;
  489. rowOffset = startOffset + blockOffset;
  490. return buffer.toByteArray() + blockOffset;
  491. }
  492. VariableRowBlock::VariableRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, IRecordSizeEx * recordSize, bool isLast) : RowBlock(_buffer, _start, _startOffset)
  493. {
  494. const char * buff = buffer.toByteArray();
  495. unsigned cur = 0;
  496. unsigned max = buffer.length();
  497. unsigned maxCur = max;
  498. if (!isLast)
  499. maxCur -= recordSize->getRecordSize(NULL);
  500. while (cur < maxCur)
  501. {
  502. rowIndex.append(cur);
  503. cur += recordSize->getRecordSize(max-cur, buff + cur);
  504. }
  505. buffer.setLength(cur);
  506. rowIndex.append(cur);
  507. numRows = rowIndex.ordinality()-1;
  508. }
  509. VariableRowBlock::VariableRowBlock(MemoryBuffer & inBuffer, __int64 _start) : RowBlock(_start, 0)
  510. {
  511. inBuffer.read(numRows);
  512. for (unsigned row = 0; row < numRows; row++)
  513. {
  514. unsigned thisLength;
  515. rowIndex.append(buffer.length());
  516. inBuffer.read(thisLength);
  517. buffer.append(thisLength, inBuffer.readDirect(thisLength));
  518. }
  519. rowIndex.append(buffer.length());
  520. }
  521. const void * VariableRowBlock::fetchRow(__int64 offset, size32_t & len)
  522. {
  523. __int64 maxOffset = startOffset + buffer.length();
  524. if (offset < startOffset || offset >= maxOffset)
  525. return NULL;
  526. size32_t rowOffset = (size32_t)(offset - startOffset);
  527. unsigned pos = rowIndex.find(rowOffset);
  528. if (pos == NotFound)
  529. return NULL;
  530. len = rowIndex.item(pos+1)-rowOffset;
  531. return buffer.toByteArray() + rowOffset;
  532. }
  533. const void * VariableRowBlock::getRow(__int64 row, size32_t & len, unsigned __int64 & rowOffset)
  534. {
  535. if (row < start || row >= start + numRows)
  536. return NULL;
  537. unsigned index = (unsigned)(row - start);
  538. unsigned blockOffset = rowIndex.item(index);
  539. len = rowIndex.item(index+1) - blockOffset;
  540. rowOffset = startOffset + blockOffset;
  541. return buffer.toByteArray() + blockOffset;
  542. }
  543. //---------------------------------------------------------------------------
  544. offset_t calculateNextOffset(const char * data, unsigned len)
  545. {
  546. offset_t offset = *(offset_t *)(data + len - sizeof(offset_t) - sizeof(unsigned short));
  547. return offset + *(unsigned short*)(data + len - sizeof(unsigned short));
  548. }
  549. void FilePosFixedRowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  550. {
  551. row = start + numRows;
  552. offset = calculateNextOffset(buffer.toByteArray(), buffer.length());
  553. }
  554. void FilePosVariableRowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  555. {
  556. row = start + numRows;
  557. offset = calculateNextOffset(buffer.toByteArray(), buffer.length());
  558. }
  559. //---------------------------------------------------------------------------
  // Counter shared by all RowCache instances in this file; it is incremented
  // each time a block is added or accessed and the new value is stored as the
  // block's age.
  static int rowCacheId = 0;
  561. void RowCache::addRowsOwn(RowBlock * rows)
  562. {
  563. if (allRows.ordinality() == MaxBlocksCached)
  564. makeRoom();
  565. unsigned newPos = getInsertPosition(rows->getStartRow());
  566. allRows.add(*rows, newPos);
  567. ages.add(++rowCacheId, newPos);
  568. }
  569. bool RowCache::getCacheRow(__int64 row, RowLocation & location)
  570. {
  571. unsigned numRows = allRows.ordinality();
  572. if (numRows == 0)
  573. return false;
  574. const RowBlock & first = allRows.item(0);
  575. if (row < first.getStartRow())
  576. {
  577. location.bestRow = 0;
  578. location.bestOffset = 0;
  579. return false;
  580. }
  581. unsigned best = getBestRow(row);
  582. ages.replace(++rowCacheId, best);
  583. RowBlock & cur = allRows.item(best);
  584. location.matchRow = cur.getRow(row, location.matchLength, location.bestOffset);
  585. if (location.matchRow)
  586. return true;
  587. if (location.bestRow < cur.getNextRow())
  588. {
  589. cur.getNextStoredOffset(location.bestRow, location.bestOffset);
  590. }
  591. return false;
  592. }
  593. //Find the rowBlock that contains the expected row.
  594. //Return the *previous* RowBlock if no match is found.
  595. unsigned RowCache::getBestRow(__int64 row)
  596. {
  597. unsigned max = allRows.ordinality();
  598. int start = 0;
  599. int end = max;
  600. while (end - start > 1)
  601. {
  602. int mid = (start + end) >> 1;
  603. RowBlock & cur = allRows.item(mid);
  604. if (row >= cur.getStartRow())
  605. start = mid;
  606. else
  607. end = mid;
  608. }
  609. assertex(row >= allRows.item(start).getStartRow());
  610. assertex(start != max);
  611. return start;
  612. }
  613. unsigned RowCache::getInsertPosition(__int64 row)
  614. {
  615. unsigned max = allRows.ordinality();
  616. int start = 0;
  617. int end = max;
  618. while (end - start > 1)
  619. {
  620. int mid = (start + end) >> 1;
  621. RowBlock & cur = allRows.item(mid);
  622. if (row >= cur.getStartRow())
  623. start = mid;
  624. else
  625. end = mid;
  626. }
  627. if (start != max)
  628. {
  629. if (row >= allRows.item(start).getStartRow())
  630. start++;
  631. }
  632. assertex(start == max || (row < allRows.item(start).getStartRow()));
  633. return start;
  634. }
  635. void RowCache::makeRoom()
  636. {
  637. #if 0
  638. //For testing the caching by throwing away random blocks
  639. while (allRows.ordinality() > MinBlocksCached)
  640. {
  641. unsigned index = getRandom() % allRows.ordinality();
  642. allRows.remove(index);
  643. ages.remove(index);
  644. }
  645. return;
  646. #endif
  647. unsigned numToFree = allRows.ordinality() - MinBlocksCached;
  648. RowBlock * * oldestRow = new RowBlock * [numToFree];
  649. __int64 * oldestAge = new __int64[numToFree];
  650. unsigned numGot = 0;
  651. ForEachItemIn(idx, ages)
  652. {
  653. __int64 curAge = ages.item(idx);
  654. unsigned compare = numGot;
  655. while (compare != 0)
  656. {
  657. if (curAge >= oldestAge[compare-1])
  658. break;
  659. compare--;
  660. }
  661. if (compare < numToFree)
  662. {
  663. unsigned copySize = numGot - compare;
  664. if (numGot == numToFree)
  665. copySize--; //NB: Cannot go negative because compare < numToFree, numToFree == numGot => compare < numGot
  666. if (copySize)
  667. {
  668. memmove(oldestAge + compare + 1, oldestAge + compare, copySize * sizeof(*oldestAge));
  669. memmove(oldestRow + compare + 1, oldestRow + compare, copySize * sizeof(*oldestRow));
  670. }
  671. oldestAge[compare] = curAge;
  672. oldestRow[compare] = &allRows.item(idx);
  673. if (numGot != numToFree)
  674. numGot++;
  675. }
  676. }
  677. unsigned max = allRows.ordinality();
  678. unsigned i;
  679. for (i = 0; i < max; )
  680. {
  681. for (unsigned j = 0; j < numGot; j++)
  682. {
  683. if (oldestRow[j] == &allRows.item(i))
  684. {
  685. allRows.remove(i);
  686. ages.remove(i);
  687. max--;
  688. goto testNext;
  689. }
  690. }
  691. i++;
  692. testNext: ;
  693. }
  694. delete [] oldestRow;
  695. delete [] oldestAge;
  696. }
  697. //---------------------------------------------------------------------------
  698. FVDataSource::FVDataSource()
  699. {
  700. transformer = NULL;
  701. extraFieldsSize = 0;
  702. openCount = 0;
  703. appendFileposition = false;
  704. }
  705. FVDataSource::~FVDataSource()
  706. {
  707. //ensure all refs into the dll are cleared before it is unloaded
  708. returnedRecordSize.clear();
  709. }
  710. void FVDataSource::addFileposition()
  711. {
  712. appendFileposition = true;
  713. transformedMeta->addFileposition(); // This may modify the other metas as well!
  714. }
  715. void FVDataSource::copyRow(MemoryBuffer & out, const void * src, size32_t length)
  716. {
  717. if (transformer)
  718. {
  719. unsigned curLen = out.length();
  720. size32_t maxSize = transformedMeta->getMaxRecordSize();
  721. void * target = out.reserve(maxSize);
  722. //MORE: won't cope with dynamic maxlengths
  723. RtlStaticRowBuilder rowBuilder(target, maxSize);
  724. unsigned copied = transformer(rowBuilder, (const byte *)src);
  725. out.rewrite(curLen + copied);
  726. }
  727. else
  728. out.append(length, src);
  729. }
  730. bool FVDataSource::fetchRow(MemoryBuffer & out, __int64 offset)
  731. {
  732. if (!transformer)
  733. return fetchRawRow(out, offset);
  734. MemoryBuffer temp;
  735. if (fetchRowData(temp, offset))
  736. {
  737. copyRow(out, temp.toByteArray(), temp.length());
  738. if (appendFileposition)
  739. out.append(offset);
  740. return true;
  741. }
  742. return false;
  743. }
  744. bool FVDataSource::fetchRawRow(MemoryBuffer & out, __int64 offset)
  745. {
  746. if (fetchRowData(out, offset))
  747. {
  748. if (appendFileposition)
  749. out.append(offset);
  750. return true;
  751. }
  752. return false;
  753. }
  754. bool FVDataSource::getRow(MemoryBuffer & out, __int64 row)
  755. {
  756. size32_t length;
  757. const void * data;
  758. unsigned __int64 offset = 0;
  759. if (getRowData(row, length, data, offset))
  760. {
  761. copyRow(out, data, length);
  762. if (appendFileposition)
  763. out.append(offset);
  764. return true;
  765. }
  766. return false;
  767. }
  768. bool FVDataSource::getRawRow(MemoryBuffer & out, __int64 row)
  769. {
  770. size32_t length;
  771. const void * data;
  772. unsigned __int64 offset = 0;
  773. if (getRowData(row, length, data, offset))
  774. {
  775. out.append(length-extraFieldsSize, data);
  776. if (appendFileposition)
  777. out.append(offset);
  778. return true;
  779. }
  780. return false;
  781. }
  782. void FVDataSource::loadDll(const char * wuid)
  783. {
  784. //MORE: This code should be commoned up and available in the work unit code or something...
  785. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  786. Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid, false);
  787. //Plugins should already be loaded when they were registered with the ViewTransformerRegistry
  788. //Something like the following code could be used to check the plugin version...
  789. #if 0
  790. Owned<IConstWUPluginIterator> plugins = &wu->getPlugins();
  791. SafePluginMap * map = NULL;
  792. ForEach(*plugins)
  793. {
  794. IConstWUPlugin &thisplugin = plugins->query();
  795. SCMStringBuffer name, version;
  796. thisplugin.getPluginName(name);
  797. thisplugin.getPluginVersion(version);
  798. Owned<ILoadedDllEntry> loadedPlugin = map->getPluginDll(name.str(), version.str(), true);
  799. if (!loadedPlugin)
  800. break;
  801. }
  802. #endif
  803. Owned<IConstWUQuery> q = wu->getQuery();
  804. SCMStringBuffer dllname;
  805. q->getQueryDllName(dllname);
  806. // MORE - leaks....
  807. loadedDll.setown(queryDllServer().loadDll(dllname.str(), DllLocationAnywhere));
  808. }
  809. IFvDataSourceMetaData * FVDataSource::queryMetaData()
  810. {
  811. return transformedMeta;
  812. }
  813. bool FVDataSource::setReturnedInfoFromResult()
  814. {
  815. SCMStringBuffer s;
  816. wuResult->getResultEclSchema(s);
  817. returnedRecord.setown(parseQuery(s.str()));
  818. if (!returnedRecord)
  819. throw MakeStringException(ERR_FILEVIEW_FIRST+4, "Could not process result schema [%s]", s.str());
  820. bool isKey = false;
  821. bool isGrouped = false; // this isn't strictly true...it could be true for an internal result, but no current flag to test
  822. returnedMeta.setown(new DataSourceMetaData(returnedRecord, 0, true, isGrouped, 0));
  823. transformedRecord.setown(getSimplifiedRecord(returnedRecord, isKey));
  824. if (!transformedRecord)
  825. {
  826. //No transformations needed, so don't need to loaded the dll containing the transform functions etc.
  827. transformedRecord.set(returnedRecord);
  828. returnedRecordSize.set(returnedMeta);
  829. }
  830. else
  831. {
  832. loadDll(wuid);
  833. s.clear();
  834. wuResult->getResultRecordSizeEntry(s);
  835. if (s.length())
  836. {
  837. typedef IRecordSize * (* recSizeFunc)();
  838. recSizeFunc func = (recSizeFunc)loadedDll->getEntry(s.str());
  839. Owned<IRecordSize> createdRecordSize = func();
  840. returnedRecordSize.setown(new RecordSizeToEx(createdRecordSize));
  841. }
  842. s.clear();
  843. wuResult->getResultTransformerEntry(s);
  844. if (s.length())
  845. transformer = (rowTransformFunction)loadedDll->getEntry(s.str());
  846. }
  847. transformedMeta.setown(new DataSourceMetaData(transformedRecord, 0, true, isGrouped, 0));
  848. return (returnedRecordSize != NULL);
  849. }
  850. //---------------------------------------------------------------------------
  851. __int64 PagedDataSource::numRows(bool force)
  852. {
  853. if (force && (totalRows == UNKNOWN_NUM_ROWS))
  854. {
  855. //MORE: Need to go and work it out - however painful...
  856. }
  857. return totalRows;
  858. }
  859. bool PagedDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  860. {
  861. if ((row < 0) || ((unsigned __int64)row > totalRows))
  862. return false;
  863. RowLocation location;
  864. loop
  865. {
  866. if (cache.getCacheRow(row, location))
  867. {
  868. length = location.matchLength;
  869. data = location.matchRow;
  870. offset = location.bestOffset;
  871. return true;
  872. }
  873. improveLocation(row, location);
  874. if (!loadBlock(location.bestRow, location.bestOffset))
  875. return false;
  876. }
  877. }
  // Hook called by getRowData() after a cache miss and before the next block is loaded.
  // This implementation does nothing: 'location' is left exactly as the cache filled it in.
  void PagedDataSource::improveLocation(__int64 row, RowLocation & location)
  {
  }
  881. //---------------------------------------------------------------------------
  882. NestedDataSource::NestedDataSource(DataSourceMetaData & meta, unsigned len, const void * data)
  883. {
  884. returnedMeta.set(&meta);
  885. returnedRecordSize.set(returnedMeta);
  886. transformedMeta.set(returnedMeta);
  887. totalSize = len;
  888. MemoryBuffer temp;
  889. temp.append(len, data);
  890. if (returnedMeta->isFixedSize())
  891. rows.setown(new FixedRowBlock(temp, 0, 0, returnedMeta->fixedSize()));
  892. else
  893. rows.setown(new VariableRowBlock(temp, 0, 0, returnedRecordSize, true));
  894. }
  // No work is done here (the constructor has already built the row block); always reports success.
  bool NestedDataSource::init()
  {
  return true;
  }
  899. __int64 NestedDataSource::numRows(bool force)
  900. {
  901. return rows->getNextRow() - rows->getStartRow();
  902. }
  903. bool NestedDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  904. {
  905. data = rows->getRow(row, length, offset);
  906. return (data != NULL);
  907. }
  908. //---------------------------------------------------------------------------
  // Constructs the 'meta' member from the supplied record, grouping flag and keyed size.
  // The constructor body itself does nothing further.
  NullDataSource::NullDataSource(IHqlExpression * _record, bool _isGrouped, unsigned _keyedSize)
  : meta(_record, 0, true, _isGrouped, _keyedSize)
  {
  }
  // Forwards the record, grouping flag and keyed size to NullDataSource and stores _error in 'error'.
  // NOTE(review): whether 'error' links or takes ownership of _error depends on its declared type,
  // which is not visible here - confirm against the class declaration.
  FailureDataSource::FailureDataSource(IHqlExpression * _record, IException * _error, bool _isGrouped, unsigned _keyedSize)
  : NullDataSource(_record, _isGrouped, _keyedSize), error(_error)
  {
  }
  917. //---------------------------------------------------------------------------
  918. IHqlExpression * parseQuery(const char * text)
  919. {
  920. MultiErrorReceiver errs;
  921. OwnedHqlExpr ret = ::parseQuery(text, &errs);
  922. if (errs.errCount() == 0)
  923. return ret.getClear();
  924. for (unsigned i=0; i < errs.errCount(); i++)
  925. {
  926. StringBuffer msg;
  927. PrintLog("%d %s", errs.item(i)->getLine(), errs.item(i)->errorMessage(msg).str());
  928. }
  929. return NULL;
  930. }
  931. IFvDataSourceMetaData * createMetaData(IConstWUResult * wuResult)
  932. {
  933. SCMStringBuffer s;
  934. wuResult->getResultEclSchema(s);
  935. OwnedHqlExpr record = parseQuery(s.str());
  936. if (!record)
  937. throw MakeStringException(ERR_FILEVIEW_FIRST+4, "Could not process result schema [%s]", s.str());
  938. OwnedHqlExpr simplifiedRecord = getSimplifiedRecord(record, false);
  939. bool isGrouped = false; // more not sure this is strictly true...
  940. if (!simplifiedRecord)
  941. return new DataSourceMetaData(record, 0, true, isGrouped, 0);
  942. return new DataSourceMetaData(simplifiedRecord, 0, true, isGrouped, 0);
  943. }