fvsource.cpp 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "platform.h"
  15. #include "jliball.hpp"
  16. #include "eclrtl.hpp"
  17. #include "rtlds_imp.hpp"
  18. #include "fvresultset.ipp"
  19. #include "fileview.hpp"
  20. #include "fvsource.ipp"
  21. #include "hqlerror.hpp"
  22. #include "eclhelper.hpp"
  23. #include "hqlattr.hpp"
  24. #include "hqlutil.hpp"
  25. #include "fvdatasource.hpp"
  26. #define MAX_RECORD_SIZE 4096
  27. inline void appendNextXpathName(StringBuffer &s, const char *&xpath)
  28. {
  29. while (*xpath && !strchr("*[/", *xpath))
  30. s.append(*xpath++);
  31. xpath = strchr(xpath, '/');
  32. }
  33. void splitXmlTagNamesFromXPath(const char *xpath, StringAttr &inner, StringAttr *outer=NULL)
  34. {
  35. if (!xpath || !xpath)
  36. return;
  37. StringBuffer s1;
  38. StringBuffer s2;
  39. appendNextXpathName(s1, xpath);
  40. if (outer && xpath)
  41. appendNextXpathName(s2, ++xpath);
  42. if (xpath) //xpath too deep
  43. return;
  44. if (!s2.length())
  45. inner.set(s1.str());
  46. else
  47. {
  48. inner.set(s2.str());
  49. outer->set(s1.str());
  50. }
  51. if (!inner.get())
  52. inner.set("");
  53. if (outer && !outer->get())
  54. outer->set("");
  55. }
  56. DataSourceMetaItem::DataSourceMetaItem(unsigned _flags, const char * _name, const char * _xpath, ITypeInfo * _type)
  57. {
  58. flags = _flags;
  59. name.set(_name);
  60. type.set(_type);
  61. xpath.set(_xpath);
  62. splitXmlTagNamesFromXPath(_xpath, tagname);
  63. }
  64. DataSourceMetaItem::DataSourceMetaItem(unsigned _flags, MemoryBuffer & in)
  65. {
  66. flags = _flags;
  67. in.read(name);
  68. in.read(xpath);
  69. type.setown(deserializeType(in));
  70. splitXmlTagNamesFromXPath(xpath.get(), tagname);
  71. }
  72. void DataSourceMetaItem::serialize(MemoryBuffer & out) const
  73. {
  74. out.append(flags);
  75. out.append(name);
  76. out.append(xpath);
  77. type->serialize(out);
  78. }
  79. //---------------------------------------------------------------------------
  80. DataSourceDatasetItem::DataSourceDatasetItem(const char * _name, const char * _xpath, IHqlExpression * expr) : DataSourceMetaItem(FVFFdataset, NULL, NULL, NULL), record(expr->queryRecord(), 0, true, false, false)
  81. {
  82. type.setown(makeTableType(NULL, NULL, NULL, NULL));
  83. name.set(_name);
  84. xpath.set(_xpath);
  85. splitXmlTagNamesFromXPath(_xpath, record.tagname, &tagname);
  86. if (!record.tagname.length())
  87. record.tagname.set("Row");
  88. }
  89. DataSourceDatasetItem::DataSourceDatasetItem(unsigned flags, MemoryBuffer & in) : DataSourceMetaItem(FVFFdataset, NULL, NULL, NULL), record(in)
  90. {
  91. type.setown(makeTableType(NULL, NULL, NULL, NULL));
  92. in.read(name);
  93. }
  94. void DataSourceDatasetItem::serialize(MemoryBuffer & out) const
  95. {
  96. out.append(flags);
  97. record.serialize(out);
  98. out.append(name);
  99. }
  100. //---------------------------------------------------------------------------
  101. DataSourceSetItem::DataSourceSetItem(const char * _name, const char * _xpath, ITypeInfo * _type) : DataSourceMetaItem(FVFFset, _name, NULL, _type)
  102. {
  103. createChild();
  104. xpath.set(_xpath);
  105. StringBuffer attr;
  106. splitXmlTagNamesFromXPath(_xpath, record.tagname, &tagname);
  107. if (!record.tagname.length())
  108. record.tagname.set("Item");
  109. }
  110. DataSourceSetItem::DataSourceSetItem(unsigned flags, MemoryBuffer & in) : DataSourceMetaItem(flags, in)
  111. {
  112. createChild();
  113. splitXmlTagNamesFromXPath(xpath.get(), record.tagname, &tagname);
  114. if (!record.tagname.length())
  115. record.tagname.set("Item");
  116. }
  117. void DataSourceSetItem::createChild()
  118. {
  119. ITypeInfo * childType = type->queryChildType()->queryPromotedType();
  120. record.addSimpleField("Item", NULL, childType);
  121. }
  122. void DataSourceSetItem::serialize(MemoryBuffer & out) const
  123. {
  124. out.append(flags);
  125. record.serialize(out);
  126. out.append(name);
  127. }
  128. //---------------------------------------------------------------------------
  129. DataSourceMetaData::DataSourceMetaData(IHqlExpression * _record, byte _numFieldsToIgnore, bool _randomIsOk, bool _isGrouped, unsigned _keyedSize)
  130. {
  131. init();
  132. numFieldsToIgnore = _numFieldsToIgnore;
  133. randomIsOk = _randomIsOk;
  134. isStoredFixedWidth = true;
  135. //MORE: Blobs aren't handled correctly in indexes....
  136. maxRecordSize = ::getMaxRecordSize(_record, MAX_RECORD_SIZE);
  137. keyedSize = _keyedSize;
  138. gatherFields(_record, false);
  139. if (_isGrouped)
  140. {
  141. Owned<ITypeInfo> type = makeBoolType();
  142. addSimpleField("__groupfollows__", NULL, type);
  143. maxRecordSize++;
  144. }
  145. gatherAttributes();
  146. if (isStoredFixedWidth)
  147. assertex(storedFixedSize == maxRecordSize);
  148. }
  149. DataSourceMetaData::DataSourceMetaData()
  150. {
  151. init();
  152. randomIsOk = true;
  153. isStoredFixedWidth = true;
  154. }
  155. DataSourceMetaData::DataSourceMetaData(type_t typeCode)
  156. {
  157. init();
  158. OwnedITypeInfo type;
  159. if (typeCode == type_unicode)
  160. type.setown(makeUnicodeType(UNKNOWN_LENGTH, 0));
  161. else
  162. type.setown(makeStringType(UNKNOWN_LENGTH, NULL, NULL));
  163. fields.append(*new DataSourceMetaItem(FVFFnone, "line", NULL, type));
  164. }
  165. void DataSourceMetaData::init()
  166. {
  167. keyedSize = 0;
  168. storedFixedSize = 0;
  169. maxRecordSize = 0;
  170. bitsRemaining = 0;
  171. numVirtualFields = 0;
  172. isStoredFixedWidth = false;
  173. randomIsOk = false;
  174. numFieldsToIgnore = 0;
  175. }
  176. DataSourceMetaData::DataSourceMetaData(MemoryBuffer & buffer)
  177. {
  178. numVirtualFields = 0;
  179. buffer.read(numFieldsToIgnore);
  180. buffer.read(randomIsOk);
  181. buffer.read(isStoredFixedWidth);
  182. buffer.read(storedFixedSize);
  183. buffer.read(keyedSize);
  184. buffer.read(maxRecordSize);
  185. unsigned numFields;
  186. buffer.read(numFields);
  187. for (unsigned idx=0; idx < numFields; idx++)
  188. {
  189. byte flags;
  190. buffer.read(flags);
  191. if (flags == FVFFdataset)
  192. fields.append(*new DataSourceDatasetItem(flags, buffer));
  193. else if (flags == FVFFset)
  194. fields.append(*new DataSourceSetItem(flags, buffer));
  195. else
  196. fields.append(*new DataSourceMetaItem(flags, buffer));
  197. if (flags == FVFFvirtual)
  198. ++numVirtualFields;
  199. }
  200. gatherAttributes();
  201. }
  202. void DataSourceMetaData::addFileposition()
  203. {
  204. addVirtualField("__fileposition__", NULL, makeIntType(8, false));
  205. }
  206. void DataSourceMetaData::addSimpleField(const char * name, const char * xpath, ITypeInfo * type)
  207. {
  208. ITypeInfo * promoted = type->queryPromotedType();
  209. unsigned size = promoted->getSize();
  210. unsigned thisBits = 0;
  211. if (size == UNKNOWN_LENGTH)
  212. isStoredFixedWidth = false;
  213. else if (type->getTypeCode() == type_bitfield)
  214. {
  215. thisBits = type->getBitSize();
  216. if (thisBits > bitsRemaining)
  217. {
  218. size = type->queryChildType()->getSize();
  219. storedFixedSize += size;
  220. bitsRemaining = size * 8;
  221. }
  222. bitsRemaining -= thisBits;
  223. }
  224. else
  225. storedFixedSize += size;
  226. if (thisBits == 0)
  227. bitsRemaining = 0;
  228. fields.append(*new DataSourceMetaItem(FVFFnone, name, xpath, type));
  229. }
  230. void DataSourceMetaData::addVirtualField(const char * name, const char * xpath, ITypeInfo * type)
  231. {
  232. fields.append(*new DataSourceMetaItem(FVFFvirtual, name, xpath, type));
  233. ++numVirtualFields;
  234. }
  235. void DataSourceMetaData::extractKeyedInfo(UnsignedArray & offsets, TypeInfoArray & types)
  236. {
  237. unsigned curOffset = 0;
  238. ForEachItemIn(i, fields)
  239. {
  240. if (curOffset >= keyedSize)
  241. break;
  242. DataSourceMetaItem & cur = fields.item(i);
  243. switch (cur.flags)
  244. {
  245. case FVFFnone:
  246. {
  247. offsets.append(curOffset);
  248. types.append(*LINK(cur.type));
  249. unsigned size = cur.type->getSize();
  250. assertex(size != UNKNOWN_LENGTH);
  251. curOffset += size;
  252. break;
  253. }
  254. case FVFFbeginrecord:
  255. case FVFFendrecord:
  256. break;
  257. default:
  258. throwUnexpected();
  259. }
  260. }
  261. offsets.append(curOffset);
  262. assertex(curOffset == keyedSize);
  263. }
  264. //MORE: Really this should create no_selects for the sub records, but pass on that for the moment.
  265. void DataSourceMetaData::gatherFields(IHqlExpression * expr, bool isConditional)
  266. {
  267. switch (expr->getOperator())
  268. {
  269. case no_record:
  270. gatherChildFields(expr, isConditional);
  271. break;
  272. case no_ifblock:
  273. {
  274. OwnedITypeInfo boolType = makeBoolType();
  275. OwnedITypeInfo voidType = makeVoidType();
  276. isStoredFixedWidth = false;
  277. fields.append(*new DataSourceMetaItem(FVFFbeginif, NULL, NULL, boolType));
  278. gatherChildFields(expr->queryChild(1), true);
  279. fields.append(*new DataSourceMetaItem(FVFFendif, NULL, NULL, voidType));
  280. break;
  281. }
  282. case no_field:
  283. {
  284. if (expr->hasProperty(__ifblockAtom))
  285. break;
  286. Linked<ITypeInfo> type = expr->queryType();
  287. IAtom * name = expr->queryName();
  288. IHqlExpression * nameAttr = expr->queryProperty(namedAtom);
  289. StringBuffer outname;
  290. if (nameAttr && nameAttr->queryChild(0)->queryValue())
  291. nameAttr->queryChild(0)->queryValue()->getStringValue(outname);
  292. else
  293. outname.append(name).toLowerCase();
  294. StringBuffer xpathtext;
  295. const char * xpath = NULL;
  296. IHqlExpression * xpathAttr = expr->queryProperty(xpathAtom);
  297. if (xpathAttr && xpathAttr->queryChild(0)->queryValue())
  298. xpath = xpathAttr->queryChild(0)->queryValue()->getStringValue(xpathtext);
  299. if (isKey() && expr->hasProperty(blobAtom))
  300. type.setown(makeIntType(8, false));
  301. type_t tc = type->getTypeCode();
  302. if (tc == type_row)
  303. {
  304. OwnedITypeInfo voidType = makeVoidType();
  305. fields.append(*new DataSourceMetaItem(FVFFbeginrecord, outname, xpath, voidType));
  306. gatherChildFields(expr->queryRecord(), isConditional);
  307. fields.append(*new DataSourceMetaItem(FVFFendrecord, outname, xpath, voidType));
  308. }
  309. else if ((tc == type_table) || (tc == type_groupedtable))
  310. {
  311. isStoredFixedWidth = false;
  312. fields.append(*new DataSourceDatasetItem(outname, xpath, expr));
  313. }
  314. else if (tc == type_set)
  315. {
  316. isStoredFixedWidth = false;
  317. fields.append(*new DataSourceSetItem(outname, xpath, type));
  318. }
  319. else
  320. {
  321. if (type->getTypeCode() == type_alien)
  322. {
  323. IHqlAlienTypeInfo * alien = queryAlienType(type);
  324. type.set(alien->queryPhysicalType());
  325. }
  326. addSimpleField(outname, xpath, type);
  327. }
  328. break;
  329. }
  330. }
  331. }
  332. void DataSourceMetaData::gatherChildFields(IHqlExpression * expr, bool isConditional)
  333. {
  334. bitsRemaining = 0;
  335. ForEachChild(idx, expr)
  336. gatherFields(expr->queryChild(idx), isConditional);
  337. }
  338. unsigned DataSourceMetaData::numKeyedColumns() const
  339. {
  340. unsigned count = 0;
  341. unsigned curOffset = 0;
  342. ForEachItemIn(i, fields)
  343. {
  344. if (curOffset >= keyedSize)
  345. break;
  346. DataSourceMetaItem & cur = fields.item(i);
  347. switch (cur.flags)
  348. {
  349. case FVFFnone:
  350. {
  351. unsigned size = cur.type->getSize();
  352. assertex(size != UNKNOWN_LENGTH);
  353. curOffset += size;
  354. count++;
  355. break;
  356. }
  357. default:
  358. throwUnexpected();
  359. }
  360. }
  361. return count;
  362. assertex(curOffset == keyedSize);
  363. }
  364. unsigned DataSourceMetaData::numColumns() const
  365. {
  366. return fields.ordinality() - numFieldsToIgnore;
  367. }
  368. ITypeInfo * DataSourceMetaData::queryType(unsigned column) const
  369. {
  370. return fields.item(column).type;
  371. }
  372. unsigned DataSourceMetaData::queryFieldFlags(unsigned column) const
  373. {
  374. return fields.item(column).flags;
  375. }
  376. const char * DataSourceMetaData::queryName(unsigned column) const
  377. {
  378. return fields.item(column).name;
  379. }
  380. const char * DataSourceMetaData::queryXPath(unsigned column) const
  381. {
  382. return fields.item(column).xpath;
  383. }
  384. const char * DataSourceMetaData::queryXmlTag(unsigned column) const
  385. {
  386. DataSourceMetaItem &item = fields.item(column);
  387. return (item.tagname.get()) ? item.tagname.get() : item.name.get();
  388. }
  389. const char *DataSourceMetaData::queryXmlTag() const
  390. {
  391. return (tagname.get()) ? tagname.get() : "Row";
  392. }
  393. void DataSourceMetaData::gatherNestedAttributes(DataSourceMetaItem &rec, aindex_t &idx)
  394. {
  395. aindex_t numItems = fields.ordinality();
  396. while (++idx < numItems)
  397. {
  398. DataSourceMetaItem &item = fields.item(idx);
  399. if (item.flags==FVFFendrecord)
  400. return;
  401. else if (item.flags==FVFFbeginrecord)
  402. gatherNestedAttributes((!item.tagname.get() || *item.tagname.get()) ? item : rec, idx);
  403. else if (item.isXmlAttribute())
  404. rec.nestedAttributes.append(idx);
  405. }
  406. }
  407. void DataSourceMetaData::gatherAttributes()
  408. {
  409. aindex_t numItems = fields.ordinality();
  410. for (aindex_t idx = 0; idx < numItems; ++idx)
  411. {
  412. DataSourceMetaItem &item = fields.item(idx);
  413. if (item.flags==FVFFbeginrecord)
  414. {
  415. if (!item.tagname.get() || *item.tagname.get())
  416. gatherNestedAttributes(item, idx);
  417. }
  418. else if (item.isXmlAttribute())
  419. attributes.append(idx);
  420. }
  421. }
  422. const IntArray &DataSourceMetaData::queryAttrList(unsigned column)
  423. {
  424. return fields.item(column).nestedAttributes;
  425. }
  426. const IntArray &DataSourceMetaData::queryAttrList()
  427. {
  428. return attributes;
  429. }
  430. IFvDataSourceMetaData * DataSourceMetaData::queryChildMeta(unsigned column) const
  431. {
  432. return fields.item(column).queryChildMeta();
  433. }
  434. IFvDataSource * DataSourceMetaData::createChildDataSource(unsigned column, unsigned len, const void * data)
  435. {
  436. DataSourceMetaData * childMeta = fields.item(column).queryChildMeta();
  437. if (childMeta)
  438. return new NestedDataSource(*childMeta, len, data);
  439. return NULL;
  440. }
  441. bool DataSourceMetaData::supportsRandomSeek() const
  442. {
  443. return randomIsOk;
  444. }
  445. void DataSourceMetaData::serialize(MemoryBuffer & buffer) const
  446. {
  447. //NB: Update NullDataSourceMeta if this changes....
  448. buffer.append(numFieldsToIgnore);
  449. buffer.append(randomIsOk);
  450. buffer.append(isStoredFixedWidth);
  451. buffer.append(storedFixedSize);
  452. buffer.append(keyedSize);
  453. buffer.append(maxRecordSize);
  454. unsigned numFields = fields.ordinality();
  455. buffer.append(numFields);
  456. for (unsigned idx=0; idx < numFields; idx++)
  457. {
  458. fields.item(idx).serialize(buffer);
  459. }
  460. }
  461. size32_t DataSourceMetaData::getRecordSize(const void *rec)
  462. {
  463. if (isStoredFixedWidth)
  464. return storedFixedSize;
  465. if (!rec)
  466. return maxRecordSize;
  467. const byte * data = (const byte *)rec;
  468. unsigned curOffset = 0;
  469. unsigned bitsRemaining = 0;
  470. unsigned max = fields.ordinality() - numVirtualFields;
  471. for (unsigned idx=0; idx < max; idx++)
  472. {
  473. ITypeInfo & type = *fields.item(idx).type;
  474. unsigned size = type.getSize();
  475. if (size == UNKNOWN_LENGTH)
  476. {
  477. const byte * cur = data + curOffset;
  478. switch (type.getTypeCode())
  479. {
  480. case type_data:
  481. case type_string:
  482. case type_table:
  483. case type_groupedtable:
  484. size = *((unsigned *)cur) + sizeof(unsigned);
  485. break;
  486. case type_set:
  487. size = *((unsigned *)(cur + sizeof(bool))) + sizeof(unsigned) + sizeof(bool);
  488. break;
  489. case type_qstring:
  490. size = rtlQStrSize(*((unsigned *)cur)) + sizeof(unsigned);
  491. break;
  492. case type_unicode:
  493. size = *((unsigned *)cur)*2 + sizeof(unsigned);
  494. break;
  495. case type_utf8:
  496. size = sizeof(unsigned) + rtlUtf8Size(*(unsigned *)cur, cur+sizeof(unsigned));
  497. break;
  498. case type_varstring:
  499. size = strlen((char *)cur)+1;
  500. break;
  501. case type_varunicode:
  502. size = (rtlUnicodeStrlen((UChar *)cur)+1)*2;
  503. break;
  504. case type_packedint:
  505. size = rtlGetPackedSize(cur);
  506. break;
  507. default:
  508. UNIMPLEMENTED;
  509. }
  510. }
  511. if (type.getTypeCode() == type_bitfield)
  512. {
  513. unsigned thisBits = type.getBitSize();
  514. if (thisBits > bitsRemaining)
  515. {
  516. size = type.queryChildType()->getSize();
  517. bitsRemaining = size * 8;
  518. }
  519. else
  520. size = 0;
  521. bitsRemaining -= thisBits;
  522. }
  523. else
  524. bitsRemaining = 0;
  525. curOffset += size;
  526. }
  527. return curOffset;
  528. }
  529. size32_t DataSourceMetaData::getFixedSize() const
  530. {
  531. if (isStoredFixedWidth)
  532. return storedFixedSize;
  533. return 0;
  534. }
  535. IFvDataSourceMetaData * deserializeDataSourceMeta(MemoryBuffer & in)
  536. {
  537. return new DataSourceMetaData(in);
  538. }
  539. //---------------------------------------------------------------------------
  540. RowBlock::RowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset)
  541. {
  542. buffer.swapWith(_buffer);
  543. start = _start;
  544. startOffset = _startOffset;
  545. numRows = 0;
  546. }
  547. RowBlock::RowBlock(__int64 _start, __int64 _startOffset)
  548. {
  549. start = _start;
  550. startOffset = _startOffset;
  551. numRows = 0;
  552. }
  553. void RowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  554. {
  555. row = getNextRow();
  556. offset = startOffset + buffer.length();
  557. }
  558. FixedRowBlock::FixedRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, size32_t _fixedRecordSize) : RowBlock(_buffer, _start, _startOffset)
  559. {
  560. if (_fixedRecordSize == 0) _fixedRecordSize = 1;
  561. fixedRecordSize = _fixedRecordSize;
  562. numRows = buffer.length() / fixedRecordSize;
  563. }
  564. const void * FixedRowBlock::fetchRow(__int64 offset, size32_t & len)
  565. {
  566. __int64 maxOffset = startOffset + buffer.length();
  567. if (offset < startOffset || offset >= maxOffset)
  568. return NULL;
  569. len = fixedRecordSize;
  570. return buffer.toByteArray() + (offset - startOffset);
  571. }
  572. const void * FixedRowBlock::getRow(__int64 row, size32_t & len, unsigned __int64 & rowOffset)
  573. {
  574. if (row < start || row >= start + numRows)
  575. return NULL;
  576. unsigned index = (unsigned)(row - start);
  577. unsigned blockOffset = index * fixedRecordSize;
  578. len = fixedRecordSize;
  579. rowOffset = startOffset + blockOffset;
  580. return buffer.toByteArray() + blockOffset;
  581. }
  582. VariableRowBlock::VariableRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, IRecordSizeEx * recordSize, bool isLast) : RowBlock(_buffer, _start, _startOffset)
  583. {
  584. const char * buff = buffer.toByteArray();
  585. unsigned cur = 0;
  586. unsigned max = buffer.length();
  587. unsigned maxCur = max;
  588. if (!isLast)
  589. maxCur -= recordSize->getRecordSize(NULL);
  590. while (cur < maxCur)
  591. {
  592. rowIndex.append(cur);
  593. cur += recordSize->getRecordSize(max-cur, buff + cur);
  594. }
  595. buffer.setLength(cur);
  596. rowIndex.append(cur);
  597. numRows = rowIndex.ordinality()-1;
  598. }
  599. VariableRowBlock::VariableRowBlock(MemoryBuffer & inBuffer, __int64 _start) : RowBlock(_start, 0)
  600. {
  601. inBuffer.read(numRows);
  602. for (unsigned row = 0; row < numRows; row++)
  603. {
  604. unsigned thisLength;
  605. rowIndex.append(buffer.length());
  606. inBuffer.read(thisLength);
  607. buffer.append(thisLength, inBuffer.readDirect(thisLength));
  608. }
  609. rowIndex.append(buffer.length());
  610. }
  611. const void * VariableRowBlock::fetchRow(__int64 offset, size32_t & len)
  612. {
  613. __int64 maxOffset = startOffset + buffer.length();
  614. if (offset < startOffset || offset >= maxOffset)
  615. return NULL;
  616. size32_t rowOffset = (size32_t)(offset - startOffset);
  617. unsigned pos = rowIndex.find(rowOffset);
  618. if (pos == NotFound)
  619. return NULL;
  620. len = rowIndex.item(pos+1)-rowOffset;
  621. return buffer.toByteArray() + rowOffset;
  622. }
  623. const void * VariableRowBlock::getRow(__int64 row, size32_t & len, unsigned __int64 & rowOffset)
  624. {
  625. if (row < start || row >= start + numRows)
  626. return NULL;
  627. unsigned index = (unsigned)(row - start);
  628. unsigned blockOffset = rowIndex.item(index);
  629. len = rowIndex.item(index+1) - blockOffset;
  630. rowOffset = startOffset + blockOffset;
  631. return buffer.toByteArray() + blockOffset;
  632. }
  633. //---------------------------------------------------------------------------
  634. offset_t calculateNextOffset(const char * data, unsigned len)
  635. {
  636. offset_t offset = *(offset_t *)(data + len - sizeof(offset_t) - sizeof(unsigned short));
  637. return offset + *(unsigned short*)(data + len - sizeof(unsigned short));
  638. }
  639. void FilePosFixedRowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  640. {
  641. row = start + numRows;
  642. offset = calculateNextOffset(buffer.toByteArray(), buffer.length());
  643. }
  644. void FilePosVariableRowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  645. {
  646. row = start + numRows;
  647. offset = calculateNextOffset(buffer.toByteArray(), buffer.length());
  648. }
  649. //---------------------------------------------------------------------------
  650. static int rowCacheId;
  651. void RowCache::addRowsOwn(RowBlock * rows)
  652. {
  653. if (allRows.ordinality() == MaxBlocksCached)
  654. makeRoom();
  655. unsigned newPos = getInsertPosition(rows->getStartRow());
  656. allRows.add(*rows, newPos);
  657. ages.add(++rowCacheId, newPos);
  658. }
  659. bool RowCache::getCacheRow(__int64 row, RowLocation & location)
  660. {
  661. unsigned numRows = allRows.ordinality();
  662. if (numRows == 0)
  663. return false;
  664. const RowBlock & first = allRows.item(0);
  665. if (row < first.getStartRow())
  666. {
  667. location.bestRow = 0;
  668. location.bestOffset = 0;
  669. return false;
  670. }
  671. unsigned best = getBestRow(row);
  672. ages.replace(++rowCacheId, best);
  673. RowBlock & cur = allRows.item(best);
  674. location.matchRow = cur.getRow(row, location.matchLength, location.bestOffset);
  675. if (location.matchRow)
  676. return true;
  677. if (location.bestRow < cur.getNextRow())
  678. {
  679. cur.getNextStoredOffset(location.bestRow, location.bestOffset);
  680. }
  681. return false;
  682. }
  683. //Find the rowBlock that contains the expected row.
  684. //Return the *previous* RowBlock if no match is found.
  685. unsigned RowCache::getBestRow(__int64 row)
  686. {
  687. unsigned max = allRows.ordinality();
  688. int start = 0;
  689. int end = max;
  690. while (end - start > 1)
  691. {
  692. int mid = (start + end) >> 1;
  693. RowBlock & cur = allRows.item(mid);
  694. if (row >= cur.getStartRow())
  695. start = mid;
  696. else
  697. end = mid;
  698. }
  699. assertex(row >= allRows.item(start).getStartRow());
  700. assertex(start != max);
  701. return start;
  702. }
  703. unsigned RowCache::getInsertPosition(__int64 row)
  704. {
  705. unsigned max = allRows.ordinality();
  706. int start = 0;
  707. int end = max;
  708. while (end - start > 1)
  709. {
  710. int mid = (start + end) >> 1;
  711. RowBlock & cur = allRows.item(mid);
  712. if (row >= cur.getStartRow())
  713. start = mid;
  714. else
  715. end = mid;
  716. }
  717. if (start != max)
  718. {
  719. if (row >= allRows.item(start).getStartRow())
  720. start++;
  721. }
  722. assertex(start == max || (row < allRows.item(start).getStartRow()));
  723. return start;
  724. }
  725. void RowCache::makeRoom()
  726. {
  727. #if 0
  728. //For testing the caching by throwing away random blocks
  729. while (allRows.ordinality() > MinBlocksCached)
  730. {
  731. unsigned index = getRandom() % allRows.ordinality();
  732. allRows.remove(index);
  733. ages.remove(index);
  734. }
  735. return;
  736. #endif
  737. unsigned numToFree = allRows.ordinality() - MinBlocksCached;
  738. RowBlock * * oldestRow = new RowBlock * [numToFree];
  739. __int64 * oldestAge = new __int64[numToFree];
  740. unsigned numGot = 0;
  741. ForEachItemIn(idx, ages)
  742. {
  743. __int64 curAge = ages.item(idx);
  744. unsigned compare = numGot;
  745. while (compare != 0)
  746. {
  747. if (curAge >= oldestAge[compare-1])
  748. break;
  749. compare--;
  750. }
  751. if (compare < numToFree)
  752. {
  753. unsigned copySize = numGot - compare;
  754. if (numGot == numToFree)
  755. copySize--; //NB: Cannot go negative because compare < numToFree, numToFree == numGot => compare < numGot
  756. if (copySize)
  757. {
  758. memmove(oldestAge + compare + 1, oldestAge + compare, copySize * sizeof(*oldestAge));
  759. memmove(oldestRow + compare + 1, oldestRow + compare, copySize * sizeof(*oldestRow));
  760. }
  761. oldestAge[compare] = curAge;
  762. oldestRow[compare] = &allRows.item(idx);
  763. if (numGot != numToFree)
  764. numGot++;
  765. }
  766. }
  767. unsigned max = allRows.ordinality();
  768. unsigned i;
  769. for (i = 0; i < max; )
  770. {
  771. for (unsigned j = 0; j < numGot; j++)
  772. {
  773. if (oldestRow[j] == &allRows.item(i))
  774. {
  775. allRows.remove(i);
  776. ages.remove(i);
  777. max--;
  778. goto testNext;
  779. }
  780. }
  781. i++;
  782. testNext: ;
  783. }
  784. delete [] oldestRow;
  785. delete [] oldestAge;
  786. }
  787. //---------------------------------------------------------------------------
  788. FVDataSource::FVDataSource()
  789. {
  790. transformer = NULL;
  791. extraFieldsSize = 0;
  792. openCount = 0;
  793. appendFileposition = false;
  794. }
  795. FVDataSource::~FVDataSource()
  796. {
  797. //ensure all refs into the dll are cleared before it is unloaded
  798. returnedRecordSize.clear();
  799. }
  800. void FVDataSource::addFileposition()
  801. {
  802. appendFileposition = true;
  803. transformedMeta->addFileposition(); // This may modify the other metas as well!
  804. }
  805. void FVDataSource::copyRow(MemoryBuffer & out, const void * src, size32_t length)
  806. {
  807. if (transformer)
  808. {
  809. unsigned curLen = out.length();
  810. size32_t maxSize = transformedMeta->getMaxRecordSize();
  811. void * target = out.reserve(maxSize);
  812. //MORE: won't cope with dynamic maxlengths
  813. RtlStaticRowBuilder rowBuilder(target, maxSize);
  814. unsigned copied = transformer(rowBuilder, (const byte *)src);
  815. out.rewrite(curLen + copied);
  816. }
  817. else
  818. out.append(length, src);
  819. }
  820. bool FVDataSource::fetchRow(MemoryBuffer & out, __int64 offset)
  821. {
  822. if (!transformer)
  823. return fetchRawRow(out, offset);
  824. MemoryBuffer temp;
  825. if (fetchRowData(temp, offset))
  826. {
  827. copyRow(out, temp.toByteArray(), temp.length());
  828. if (appendFileposition)
  829. out.append(offset);
  830. return true;
  831. }
  832. return false;
  833. }
  834. bool FVDataSource::fetchRawRow(MemoryBuffer & out, __int64 offset)
  835. {
  836. if (fetchRowData(out, offset))
  837. {
  838. if (appendFileposition)
  839. out.append(offset);
  840. return true;
  841. }
  842. return false;
  843. }
  844. bool FVDataSource::getRow(MemoryBuffer & out, __int64 row)
  845. {
  846. size32_t length;
  847. const void * data;
  848. unsigned __int64 offset = 0;
  849. if (getRowData(row, length, data, offset))
  850. {
  851. copyRow(out, data, length);
  852. if (appendFileposition)
  853. out.append(offset);
  854. return true;
  855. }
  856. return false;
  857. }
  858. bool FVDataSource::getRawRow(MemoryBuffer & out, __int64 row)
  859. {
  860. size32_t length;
  861. const void * data;
  862. unsigned __int64 offset = 0;
  863. if (getRowData(row, length, data, offset))
  864. {
  865. out.append(length-extraFieldsSize, data);
  866. if (appendFileposition)
  867. out.append(offset);
  868. return true;
  869. }
  870. return false;
  871. }
  872. void FVDataSource::loadDll(const char * wuid)
  873. {
  874. //MORE: This code should be commoned up and available in the work unit code or something...
  875. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  876. Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid, false);
  877. //Plugins should already be loaded when they were registered with the ViewTransformerRegistry
  878. //Something like the following code could be used to check the plugin version...
  879. #if 0
  880. Owned<IConstWUPluginIterator> plugins = &wu->getPlugins();
  881. SafePluginMap * map = NULL;
  882. ForEach(*plugins)
  883. {
  884. IConstWUPlugin &thisplugin = plugins->query();
  885. SCMStringBuffer name, version;
  886. thisplugin.getPluginName(name);
  887. thisplugin.getPluginVersion(version);
  888. Owned<ILoadedDllEntry> loadedPlugin = map->getPluginDll(name.str(), version.str(), true);
  889. if (!loadedPlugin)
  890. break;
  891. }
  892. #endif
  893. Owned<IConstWUQuery> q = wu->getQuery();
  894. SCMStringBuffer dllname;
  895. q->getQueryDllName(dllname);
  896. // MORE - leaks....
  897. loadedDll.setown(queryDllServer().loadDll(dllname.str(), DllLocationAnywhere));
  898. }
  899. IFvDataSourceMetaData * FVDataSource::queryMetaData()
  900. {
  901. return transformedMeta;
  902. }
  903. bool FVDataSource::setReturnedInfoFromResult()
  904. {
  905. SCMStringBuffer s;
  906. wuResult->getResultEclSchema(s);
  907. returnedRecord.setown(parseQuery(s.str()));
  908. if (!returnedRecord)
  909. throw MakeStringException(ERR_FILEVIEW_FIRST+4, "Could not process result schema [%s]", s.str());
  910. bool isKey = false;
  911. bool isGrouped = false; // this isn't strictly true...it could be true for an internal result, but no current flag to test
  912. returnedMeta.setown(new DataSourceMetaData(returnedRecord, 0, true, isGrouped, 0));
  913. transformedRecord.setown(getSimplifiedRecord(returnedRecord, isKey));
  914. if (!transformedRecord)
  915. {
  916. //No transformations needed, so don't need to loaded the dll containing the transform functions etc.
  917. transformedRecord.set(returnedRecord);
  918. returnedRecordSize.set(returnedMeta);
  919. }
  920. else
  921. {
  922. loadDll(wuid);
  923. s.clear();
  924. wuResult->getResultRecordSizeEntry(s);
  925. if (s.length())
  926. {
  927. typedef IRecordSize * (* recSizeFunc)();
  928. recSizeFunc func = (recSizeFunc)loadedDll->getEntry(s.str());
  929. Owned<IRecordSize> createdRecordSize = func();
  930. returnedRecordSize.setown(new RecordSizeToEx(createdRecordSize));
  931. }
  932. s.clear();
  933. wuResult->getResultTransformerEntry(s);
  934. if (s.length())
  935. transformer = (rowTransformFunction)loadedDll->getEntry(s.str());
  936. }
  937. transformedMeta.setown(new DataSourceMetaData(transformedRecord, 0, true, isGrouped, 0));
  938. return (returnedRecordSize != NULL);
  939. }
  940. //---------------------------------------------------------------------------
  941. __int64 PagedDataSource::numRows(bool force)
  942. {
  943. if (force && (totalRows == UNKNOWN_NUM_ROWS))
  944. {
  945. //MORE: Need to go and work it out - however painful...
  946. }
  947. return totalRows;
  948. }
  949. bool PagedDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  950. {
  951. if ((row < 0) || ((unsigned __int64)row > totalRows))
  952. return false;
  953. RowLocation location;
  954. loop
  955. {
  956. if (cache.getCacheRow(row, location))
  957. {
  958. length = location.matchLength;
  959. data = location.matchRow;
  960. offset = location.bestOffset;
  961. return true;
  962. }
  963. improveLocation(row, location);
  964. if (!loadBlock(location.bestRow, location.bestOffset))
  965. return false;
  966. }
  967. }
  968. void PagedDataSource::improveLocation(__int64 row, RowLocation & location)
  969. {
  970. }
  971. //---------------------------------------------------------------------------
  972. NestedDataSource::NestedDataSource(DataSourceMetaData & meta, unsigned len, const void * data)
  973. {
  974. returnedMeta.set(&meta);
  975. returnedRecordSize.set(returnedMeta);
  976. transformedMeta.set(returnedMeta);
  977. totalSize = len;
  978. MemoryBuffer temp;
  979. temp.append(len, data);
  980. if (returnedMeta->isFixedSize())
  981. rows.setown(new FixedRowBlock(temp, 0, 0, returnedMeta->fixedSize()));
  982. else
  983. rows.setown(new VariableRowBlock(temp, 0, 0, returnedRecordSize, true));
  984. }
  985. bool NestedDataSource::init()
  986. {
  987. return true;
  988. }
  989. __int64 NestedDataSource::numRows(bool force)
  990. {
  991. return rows->getNextRow() - rows->getStartRow();
  992. }
  993. bool NestedDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  994. {
  995. data = rows->getRow(row, length, offset);
  996. return (data != NULL);
  997. }
  998. //---------------------------------------------------------------------------
  999. NullDataSource::NullDataSource(IHqlExpression * _record, bool _isGrouped, unsigned _keyedSize)
  1000. : meta(_record, 0, true, _isGrouped, _keyedSize)
  1001. {
  1002. }
  1003. FailureDataSource::FailureDataSource(IHqlExpression * _record, IException * _error, bool _isGrouped, unsigned _keyedSize)
  1004. : NullDataSource(_record, _isGrouped, _keyedSize), error(_error)
  1005. {
  1006. }
  1007. //---------------------------------------------------------------------------
  1008. IHqlExpression * parseQuery(const char * text)
  1009. {
  1010. MultiErrorReceiver errs;
  1011. OwnedHqlExpr ret = ::parseQuery(text, &errs);
  1012. if (errs.errCount() == 0)
  1013. return ret.getClear();
  1014. for (unsigned i=0; i < errs.errCount(); i++)
  1015. {
  1016. StringBuffer msg;
  1017. PrintLog("%d %s", errs.item(i)->getLine(), errs.item(i)->errorMessage(msg).str());
  1018. }
  1019. return NULL;
  1020. }
  1021. IFvDataSourceMetaData * createMetaData(IConstWUResult * wuResult)
  1022. {
  1023. SCMStringBuffer s;
  1024. wuResult->getResultEclSchema(s);
  1025. OwnedHqlExpr record = parseQuery(s.str());
  1026. if (!record)
  1027. throw MakeStringException(ERR_FILEVIEW_FIRST+4, "Could not process result schema [%s]", s.str());
  1028. OwnedHqlExpr simplifiedRecord = getSimplifiedRecord(record, false);
  1029. bool isGrouped = false; // more not sure this is strictly true...
  1030. if (!simplifiedRecord)
  1031. return new DataSourceMetaData(record, 0, true, isGrouped, 0);
  1032. return new DataSourceMetaData(simplifiedRecord, 0, true, isGrouped, 0);
  1033. }