fvsource.cpp 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "platform.h"
  15. #include "jliball.hpp"
  16. #include "eclrtl.hpp"
  17. #include "rtlds_imp.hpp"
  18. #include "fvresultset.ipp"
  19. #include "fileview.hpp"
  20. #include "fvsource.ipp"
  21. #include "hqlerror.hpp"
  22. #include "eclhelper.hpp"
  23. #include "hqlattr.hpp"
  24. #include "hqlutil.hpp"
  25. #include "fvdatasource.hpp"
  // Second argument to ::getMaxRecordSize() in the
  // DataSourceMetaData(IHqlExpression *, ...) constructor; presumably the limit
  // used when a record does not declare its own maximum size (confirm in hqlutil).
  #define MAX_RECORD_SIZE (4096)
  27. inline void appendNextXpathName(StringBuffer &s, const char *&xpath)
  28. {
  29. while (*xpath && !strchr("*[/", *xpath))
  30. s.append(*xpath++);
  31. xpath = strchr(xpath, '/');
  32. }
  33. void splitXmlTagNamesFromXPath(const char *xpath, StringAttr &inner, StringAttr *outer=NULL)
  34. {
  35. if (!xpath || !xpath)
  36. return;
  37. StringBuffer s1;
  38. StringBuffer s2;
  39. appendNextXpathName(s1, xpath);
  40. if (outer && xpath)
  41. appendNextXpathName(s2, ++xpath);
  42. if (xpath) //xpath too deep
  43. return;
  44. if (!s2.length())
  45. inner.set(s1.str());
  46. else
  47. {
  48. inner.set(s2.str());
  49. outer->set(s1.str());
  50. }
  51. if (!inner.get())
  52. inner.set("");
  53. if (outer && !outer->get())
  54. outer->set("");
  55. }
  56. DataSourceMetaItem::DataSourceMetaItem(unsigned _flags, const char * _name, const char * _xpath, ITypeInfo * _type)
  57. {
  58. flags = _flags;
  59. name.set(_name);
  60. type.set(_type);
  61. xpath.set(_xpath);
  62. splitXmlTagNamesFromXPath(_xpath, tagname);
  63. }
  64. DataSourceMetaItem::DataSourceMetaItem(unsigned _flags, MemoryBuffer & in)
  65. {
  66. flags = _flags;
  67. in.read(name);
  68. in.read(xpath);
  69. type.setown(deserializeType(in));
  70. splitXmlTagNamesFromXPath(xpath.get(), tagname);
  71. }
  72. void DataSourceMetaItem::serialize(MemoryBuffer & out) const
  73. {
  74. out.append(flags);
  75. out.append(name);
  76. out.append(xpath);
  77. type->serialize(out);
  78. }
  79. //---------------------------------------------------------------------------
  80. DataSourceDatasetItem::DataSourceDatasetItem(const char * _name, const char * _xpath, IHqlExpression * expr) : DataSourceMetaItem(FVFFdataset, NULL, NULL, NULL), record(expr->queryRecord(), 0, true, false, false)
  81. {
  82. type.setown(makeTableType(NULL, NULL, NULL, NULL));
  83. name.set(_name);
  84. xpath.set(_xpath);
  85. splitXmlTagNamesFromXPath(_xpath, record.tagname, &tagname);
  86. if (!record.tagname.length())
  87. record.tagname.set("Row");
  88. }
  89. DataSourceDatasetItem::DataSourceDatasetItem(unsigned flags, MemoryBuffer & in) : DataSourceMetaItem(FVFFdataset, NULL, NULL, NULL), record(in)
  90. {
  91. type.setown(makeTableType(NULL, NULL, NULL, NULL));
  92. in.read(name);
  93. }
  94. void DataSourceDatasetItem::serialize(MemoryBuffer & out) const
  95. {
  96. out.append(flags);
  97. record.serialize(out);
  98. out.append(name);
  99. }
  100. //---------------------------------------------------------------------------
  101. DataSourceSetItem::DataSourceSetItem(const char * _name, const char * _xpath, ITypeInfo * _type) : DataSourceMetaItem(FVFFset, _name, NULL, _type)
  102. {
  103. createChild();
  104. xpath.set(_xpath);
  105. StringBuffer attr;
  106. splitXmlTagNamesFromXPath(_xpath, record.tagname, &tagname);
  107. if (!record.tagname.length())
  108. record.tagname.set("Item");
  109. }
  110. DataSourceSetItem::DataSourceSetItem(unsigned flags, MemoryBuffer & in) : DataSourceMetaItem(flags, in)
  111. {
  112. createChild();
  113. splitXmlTagNamesFromXPath(xpath.get(), record.tagname, &tagname);
  114. if (!record.tagname.length())
  115. record.tagname.set("Item");
  116. }
  117. void DataSourceSetItem::createChild()
  118. {
  119. ITypeInfo * childType = type->queryChildType()->queryPromotedType();
  120. record.addSimpleField("Item", NULL, childType);
  121. }
  122. void DataSourceSetItem::serialize(MemoryBuffer & out) const
  123. {
  124. out.append(flags);
  125. record.serialize(out);
  126. out.append(name);
  127. }
  128. //---------------------------------------------------------------------------
  129. DataSourceMetaData::DataSourceMetaData(IHqlExpression * _record, byte _numFieldsToIgnore, bool _randomIsOk, bool _isGrouped, unsigned _keyedSize)
  130. {
  131. init();
  132. numFieldsToIgnore = _numFieldsToIgnore;
  133. randomIsOk = _randomIsOk;
  134. isStoredFixedWidth = true;
  135. //MORE: Blobs aren't handled correctly in indexes....
  136. maxRecordSize = ::getMaxRecordSize(_record, MAX_RECORD_SIZE);
  137. keyedSize = _keyedSize;
  138. gatherFields(_record, false);
  139. if (_isGrouped)
  140. {
  141. Owned<ITypeInfo> type = makeBoolType();
  142. addSimpleField("__groupfollows__", NULL, type);
  143. maxRecordSize++;
  144. }
  145. gatherAttributes();
  146. if (isStoredFixedWidth)
  147. assertex(storedFixedSize == maxRecordSize);
  148. }
  149. DataSourceMetaData::DataSourceMetaData()
  150. {
  151. init();
  152. randomIsOk = true;
  153. isStoredFixedWidth = true;
  154. }
  155. DataSourceMetaData::DataSourceMetaData(type_t typeCode)
  156. {
  157. init();
  158. OwnedITypeInfo type;
  159. if (typeCode == type_unicode)
  160. type.setown(makeUnicodeType(UNKNOWN_LENGTH, 0));
  161. else
  162. type.setown(makeStringType(UNKNOWN_LENGTH, NULL, NULL));
  163. fields.append(*new DataSourceMetaItem(FVFFnone, "line", NULL, type));
  164. }
  165. void DataSourceMetaData::init()
  166. {
  167. keyedSize = 0;
  168. storedFixedSize = 0;
  169. maxRecordSize = 0;
  170. bitsRemaining = 0;
  171. numVirtualFields = 0;
  172. isStoredFixedWidth = false;
  173. randomIsOk = false;
  174. numFieldsToIgnore = 0;
  175. }
  176. DataSourceMetaData::DataSourceMetaData(MemoryBuffer & buffer)
  177. {
  178. numVirtualFields = 0;
  179. buffer.read(numFieldsToIgnore);
  180. buffer.read(randomIsOk);
  181. buffer.read(isStoredFixedWidth);
  182. buffer.read(storedFixedSize);
  183. buffer.read(keyedSize);
  184. buffer.read(maxRecordSize);
  185. unsigned numFields;
  186. buffer.read(numFields);
  187. for (unsigned idx=0; idx < numFields; idx++)
  188. {
  189. byte flags;
  190. buffer.read(flags);
  191. if (flags == FVFFdataset)
  192. fields.append(*new DataSourceDatasetItem(flags, buffer));
  193. else if (flags == FVFFset)
  194. fields.append(*new DataSourceSetItem(flags, buffer));
  195. else
  196. fields.append(*new DataSourceMetaItem(flags, buffer));
  197. if (flags == FVFFvirtual)
  198. ++numVirtualFields;
  199. }
  200. gatherAttributes();
  201. }
  202. void DataSourceMetaData::addFileposition()
  203. {
  204. addVirtualField("__fileposition__", NULL, makeIntType(8, false));
  205. }
  206. void DataSourceMetaData::addSimpleField(const char * name, const char * xpath, ITypeInfo * type)
  207. {
  208. ITypeInfo * promoted = type->queryPromotedType();
  209. unsigned size = promoted->getSize();
  210. unsigned thisBits = 0;
  211. if (size == UNKNOWN_LENGTH)
  212. isStoredFixedWidth = false;
  213. else if (type->getTypeCode() == type_bitfield)
  214. {
  215. thisBits = type->getBitSize();
  216. if (thisBits > bitsRemaining)
  217. {
  218. size = type->queryChildType()->getSize();
  219. storedFixedSize += size;
  220. bitsRemaining = size * 8;
  221. }
  222. bitsRemaining -= thisBits;
  223. }
  224. else
  225. storedFixedSize += size;
  226. if (thisBits == 0)
  227. bitsRemaining = 0;
  228. fields.append(*new DataSourceMetaItem(FVFFnone, name, xpath, type));
  229. }
  230. void DataSourceMetaData::addVirtualField(const char * name, const char * xpath, ITypeInfo * type)
  231. {
  232. fields.append(*new DataSourceMetaItem(FVFFvirtual, name, xpath, type));
  233. ++numVirtualFields;
  234. }
  235. void DataSourceMetaData::extractKeyedInfo(UnsignedArray & offsets, TypeInfoArray & types)
  236. {
  237. unsigned curOffset = 0;
  238. ForEachItemIn(i, fields)
  239. {
  240. if (curOffset >= keyedSize)
  241. break;
  242. DataSourceMetaItem & cur = fields.item(i);
  243. switch (cur.flags)
  244. {
  245. case FVFFnone:
  246. {
  247. offsets.append(curOffset);
  248. types.append(*LINK(cur.type));
  249. unsigned size = cur.type->getSize();
  250. assertex(size != UNKNOWN_LENGTH);
  251. curOffset += size;
  252. break;
  253. }
  254. case FVFFbeginrecord:
  255. case FVFFendrecord:
  256. break;
  257. default:
  258. throwUnexpected();
  259. }
  260. }
  261. offsets.append(curOffset);
  262. assertex(curOffset == keyedSize);
  263. }
  264. //MORE: Really this should create no_selects for the sub records, but pass on that for the moment.
  265. void DataSourceMetaData::gatherFields(IHqlExpression * expr, bool isConditional)
  266. {
  267. switch (expr->getOperator())
  268. {
  269. case no_record:
  270. gatherChildFields(expr, isConditional);
  271. break;
  272. case no_ifblock:
  273. {
  274. OwnedITypeInfo boolType = makeBoolType();
  275. OwnedITypeInfo voidType = makeVoidType();
  276. isStoredFixedWidth = false;
  277. fields.append(*new DataSourceMetaItem(FVFFbeginif, NULL, NULL, boolType));
  278. gatherChildFields(expr->queryChild(1), true);
  279. fields.append(*new DataSourceMetaItem(FVFFendif, NULL, NULL, voidType));
  280. break;
  281. }
  282. case no_field:
  283. {
  284. if (expr->hasProperty(__ifblockAtom))
  285. break;
  286. Linked<ITypeInfo> type = expr->queryType();
  287. IAtom * name = expr->queryName();
  288. IHqlExpression * nameAttr = expr->queryProperty(namedAtom);
  289. StringBuffer outname;
  290. if (nameAttr && nameAttr->queryChild(0)->queryValue())
  291. nameAttr->queryChild(0)->queryValue()->getStringValue(outname);
  292. else
  293. outname.append(name).toLowerCase();
  294. StringBuffer xpathtext;
  295. const char * xpath = NULL;
  296. IHqlExpression * xpathAttr = expr->queryProperty(xpathAtom);
  297. if (xpathAttr && xpathAttr->queryChild(0)->queryValue())
  298. xpath = xpathAttr->queryChild(0)->queryValue()->getStringValue(xpathtext);
  299. if (isKey() && expr->hasProperty(blobAtom))
  300. type.setown(makeIntType(8, false));
  301. type_t tc = type->getTypeCode();
  302. if (tc == type_row)
  303. {
  304. OwnedITypeInfo voidType = makeVoidType();
  305. fields.append(*new DataSourceMetaItem(FVFFbeginrecord, outname, xpath, voidType));
  306. gatherChildFields(expr->queryRecord(), isConditional);
  307. fields.append(*new DataSourceMetaItem(FVFFendrecord, outname, xpath, voidType));
  308. }
  309. else if ((tc == type_dictionary))
  310. {
  311. UNIMPLEMENTED;
  312. }
  313. else if ((tc == type_table) || (tc == type_groupedtable))
  314. {
  315. isStoredFixedWidth = false;
  316. fields.append(*new DataSourceDatasetItem(outname, xpath, expr));
  317. }
  318. else if (tc == type_set)
  319. {
  320. isStoredFixedWidth = false;
  321. fields.append(*new DataSourceSetItem(outname, xpath, type));
  322. }
  323. else
  324. {
  325. if (type->getTypeCode() == type_alien)
  326. {
  327. IHqlAlienTypeInfo * alien = queryAlienType(type);
  328. type.set(alien->queryPhysicalType());
  329. }
  330. addSimpleField(outname, xpath, type);
  331. }
  332. break;
  333. }
  334. }
  335. }
  336. void DataSourceMetaData::gatherChildFields(IHqlExpression * expr, bool isConditional)
  337. {
  338. bitsRemaining = 0;
  339. ForEachChild(idx, expr)
  340. gatherFields(expr->queryChild(idx), isConditional);
  341. }
  342. unsigned DataSourceMetaData::numKeyedColumns() const
  343. {
  344. unsigned count = 0;
  345. unsigned curOffset = 0;
  346. ForEachItemIn(i, fields)
  347. {
  348. if (curOffset >= keyedSize)
  349. break;
  350. DataSourceMetaItem & cur = fields.item(i);
  351. switch (cur.flags)
  352. {
  353. case FVFFnone:
  354. {
  355. unsigned size = cur.type->getSize();
  356. assertex(size != UNKNOWN_LENGTH);
  357. curOffset += size;
  358. count++;
  359. break;
  360. }
  361. default:
  362. throwUnexpected();
  363. }
  364. }
  365. return count;
  366. assertex(curOffset == keyedSize);
  367. }
  368. unsigned DataSourceMetaData::numColumns() const
  369. {
  370. return fields.ordinality() - numFieldsToIgnore;
  371. }
  372. ITypeInfo * DataSourceMetaData::queryType(unsigned column) const
  373. {
  374. return fields.item(column).type;
  375. }
  376. unsigned DataSourceMetaData::queryFieldFlags(unsigned column) const
  377. {
  378. return fields.item(column).flags;
  379. }
  380. const char * DataSourceMetaData::queryName(unsigned column) const
  381. {
  382. return fields.item(column).name;
  383. }
  384. const char * DataSourceMetaData::queryXPath(unsigned column) const
  385. {
  386. return fields.item(column).xpath;
  387. }
  388. const char * DataSourceMetaData::queryXmlTag(unsigned column) const
  389. {
  390. DataSourceMetaItem &item = fields.item(column);
  391. return (item.tagname.get()) ? item.tagname.get() : item.name.get();
  392. }
  393. const char *DataSourceMetaData::queryXmlTag() const
  394. {
  395. return (tagname.get()) ? tagname.get() : "Row";
  396. }
  397. void DataSourceMetaData::gatherNestedAttributes(DataSourceMetaItem &rec, aindex_t &idx)
  398. {
  399. aindex_t numItems = fields.ordinality();
  400. while (++idx < numItems)
  401. {
  402. DataSourceMetaItem &item = fields.item(idx);
  403. if (item.flags==FVFFendrecord)
  404. return;
  405. else if (item.flags==FVFFbeginrecord)
  406. gatherNestedAttributes((!item.tagname.get() || *item.tagname.get()) ? item : rec, idx);
  407. else if (item.isXmlAttribute())
  408. rec.nestedAttributes.append(idx);
  409. }
  410. }
  411. void DataSourceMetaData::gatherAttributes()
  412. {
  413. aindex_t numItems = fields.ordinality();
  414. for (aindex_t idx = 0; idx < numItems; ++idx)
  415. {
  416. DataSourceMetaItem &item = fields.item(idx);
  417. if (item.flags==FVFFbeginrecord)
  418. {
  419. if (!item.tagname.get() || *item.tagname.get())
  420. gatherNestedAttributes(item, idx);
  421. }
  422. else if (item.isXmlAttribute())
  423. attributes.append(idx);
  424. }
  425. }
  426. const IntArray &DataSourceMetaData::queryAttrList(unsigned column) const
  427. {
  428. return fields.item(column).nestedAttributes;
  429. }
  430. const IntArray &DataSourceMetaData::queryAttrList() const
  431. {
  432. return attributes;
  433. }
  434. IFvDataSourceMetaData * DataSourceMetaData::queryChildMeta(unsigned column) const
  435. {
  436. return fields.item(column).queryChildMeta();
  437. }
  438. IFvDataSource * DataSourceMetaData::createChildDataSource(unsigned column, unsigned len, const void * data)
  439. {
  440. DataSourceMetaData * childMeta = fields.item(column).queryChildMeta();
  441. if (childMeta)
  442. return new NestedDataSource(*childMeta, len, data);
  443. return NULL;
  444. }
  445. bool DataSourceMetaData::supportsRandomSeek() const
  446. {
  447. return randomIsOk;
  448. }
  449. void DataSourceMetaData::serialize(MemoryBuffer & buffer) const
  450. {
  451. //NB: Update NullDataSourceMeta if this changes....
  452. buffer.append(numFieldsToIgnore);
  453. buffer.append(randomIsOk);
  454. buffer.append(isStoredFixedWidth);
  455. buffer.append(storedFixedSize);
  456. buffer.append(keyedSize);
  457. buffer.append(maxRecordSize);
  458. unsigned numFields = fields.ordinality();
  459. buffer.append(numFields);
  460. for (unsigned idx=0; idx < numFields; idx++)
  461. {
  462. fields.item(idx).serialize(buffer);
  463. }
  464. }
  465. size32_t DataSourceMetaData::getRecordSize(const void *rec)
  466. {
  467. if (isStoredFixedWidth)
  468. return storedFixedSize;
  469. if (!rec)
  470. return maxRecordSize;
  471. const byte * data = (const byte *)rec;
  472. unsigned curOffset = 0;
  473. unsigned bitsRemaining = 0;
  474. unsigned max = fields.ordinality() - numVirtualFields;
  475. for (unsigned idx=0; idx < max; idx++)
  476. {
  477. ITypeInfo & type = *fields.item(idx).type;
  478. unsigned size = type.getSize();
  479. if (size == UNKNOWN_LENGTH)
  480. {
  481. const byte * cur = data + curOffset;
  482. switch (type.getTypeCode())
  483. {
  484. case type_data:
  485. case type_string:
  486. case type_table:
  487. case type_groupedtable:
  488. size = *((unsigned *)cur) + sizeof(unsigned);
  489. break;
  490. case type_set:
  491. size = *((unsigned *)(cur + sizeof(bool))) + sizeof(unsigned) + sizeof(bool);
  492. break;
  493. case type_qstring:
  494. size = rtlQStrSize(*((unsigned *)cur)) + sizeof(unsigned);
  495. break;
  496. case type_unicode:
  497. size = *((unsigned *)cur)*2 + sizeof(unsigned);
  498. break;
  499. case type_utf8:
  500. size = sizeof(unsigned) + rtlUtf8Size(*(unsigned *)cur, cur+sizeof(unsigned));
  501. break;
  502. case type_varstring:
  503. size = strlen((char *)cur)+1;
  504. break;
  505. case type_varunicode:
  506. size = (rtlUnicodeStrlen((UChar *)cur)+1)*2;
  507. break;
  508. case type_packedint:
  509. size = rtlGetPackedSize(cur);
  510. break;
  511. default:
  512. UNIMPLEMENTED;
  513. }
  514. }
  515. if (type.getTypeCode() == type_bitfield)
  516. {
  517. unsigned thisBits = type.getBitSize();
  518. if (thisBits > bitsRemaining)
  519. {
  520. size = type.queryChildType()->getSize();
  521. bitsRemaining = size * 8;
  522. }
  523. else
  524. size = 0;
  525. bitsRemaining -= thisBits;
  526. }
  527. else
  528. bitsRemaining = 0;
  529. curOffset += size;
  530. }
  531. return curOffset;
  532. }
  533. size32_t DataSourceMetaData::getFixedSize() const
  534. {
  535. if (isStoredFixedWidth)
  536. return storedFixedSize;
  537. return 0;
  538. }
  539. IFvDataSourceMetaData * deserializeDataSourceMeta(MemoryBuffer & in)
  540. {
  541. return new DataSourceMetaData(in);
  542. }
  543. //---------------------------------------------------------------------------
  544. RowBlock::RowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset)
  545. {
  546. buffer.swapWith(_buffer);
  547. start = _start;
  548. startOffset = _startOffset;
  549. numRows = 0;
  550. }
  551. RowBlock::RowBlock(__int64 _start, __int64 _startOffset)
  552. {
  553. start = _start;
  554. startOffset = _startOffset;
  555. numRows = 0;
  556. }
  557. void RowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  558. {
  559. row = getNextRow();
  560. offset = startOffset + buffer.length();
  561. }
  562. FixedRowBlock::FixedRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, size32_t _fixedRecordSize) : RowBlock(_buffer, _start, _startOffset)
  563. {
  564. if (_fixedRecordSize == 0) _fixedRecordSize = 1;
  565. fixedRecordSize = _fixedRecordSize;
  566. numRows = buffer.length() / fixedRecordSize;
  567. }
  568. const void * FixedRowBlock::fetchRow(__int64 offset, size32_t & len)
  569. {
  570. __int64 maxOffset = startOffset + buffer.length();
  571. if (offset < startOffset || offset >= maxOffset)
  572. return NULL;
  573. len = fixedRecordSize;
  574. return buffer.toByteArray() + (offset - startOffset);
  575. }
  576. const void * FixedRowBlock::getRow(__int64 row, size32_t & len, unsigned __int64 & rowOffset)
  577. {
  578. if (row < start || row >= start + numRows)
  579. return NULL;
  580. unsigned index = (unsigned)(row - start);
  581. unsigned blockOffset = index * fixedRecordSize;
  582. len = fixedRecordSize;
  583. rowOffset = startOffset + blockOffset;
  584. return buffer.toByteArray() + blockOffset;
  585. }
  586. VariableRowBlock::VariableRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, IRecordSizeEx * recordSize, bool isLast) : RowBlock(_buffer, _start, _startOffset)
  587. {
  588. const char * buff = buffer.toByteArray();
  589. unsigned cur = 0;
  590. unsigned max = buffer.length();
  591. unsigned maxCur = max;
  592. if (!isLast)
  593. maxCur -= recordSize->getRecordSize(NULL);
  594. while (cur < maxCur)
  595. {
  596. rowIndex.append(cur);
  597. cur += recordSize->getRecordSize(max-cur, buff + cur);
  598. }
  599. buffer.setLength(cur);
  600. rowIndex.append(cur);
  601. numRows = rowIndex.ordinality()-1;
  602. }
  603. VariableRowBlock::VariableRowBlock(MemoryBuffer & inBuffer, __int64 _start) : RowBlock(_start, 0)
  604. {
  605. inBuffer.read(numRows);
  606. for (unsigned row = 0; row < numRows; row++)
  607. {
  608. unsigned thisLength;
  609. rowIndex.append(buffer.length());
  610. inBuffer.read(thisLength);
  611. buffer.append(thisLength, inBuffer.readDirect(thisLength));
  612. }
  613. rowIndex.append(buffer.length());
  614. }
  615. const void * VariableRowBlock::fetchRow(__int64 offset, size32_t & len)
  616. {
  617. __int64 maxOffset = startOffset + buffer.length();
  618. if (offset < startOffset || offset >= maxOffset)
  619. return NULL;
  620. size32_t rowOffset = (size32_t)(offset - startOffset);
  621. unsigned pos = rowIndex.find(rowOffset);
  622. if (pos == NotFound)
  623. return NULL;
  624. len = rowIndex.item(pos+1)-rowOffset;
  625. return buffer.toByteArray() + rowOffset;
  626. }
  627. const void * VariableRowBlock::getRow(__int64 row, size32_t & len, unsigned __int64 & rowOffset)
  628. {
  629. if (row < start || row >= start + numRows)
  630. return NULL;
  631. unsigned index = (unsigned)(row - start);
  632. unsigned blockOffset = rowIndex.item(index);
  633. len = rowIndex.item(index+1) - blockOffset;
  634. rowOffset = startOffset + blockOffset;
  635. return buffer.toByteArray() + blockOffset;
  636. }
  637. //---------------------------------------------------------------------------
  638. offset_t calculateNextOffset(const char * data, unsigned len)
  639. {
  640. offset_t offset = *(offset_t *)(data + len - sizeof(offset_t) - sizeof(unsigned short));
  641. return offset + *(unsigned short*)(data + len - sizeof(unsigned short));
  642. }
  643. void FilePosFixedRowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  644. {
  645. row = start + numRows;
  646. offset = calculateNextOffset(buffer.toByteArray(), buffer.length());
  647. }
  648. void FilePosVariableRowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  649. {
  650. row = start + numRows;
  651. offset = calculateNextOffset(buffer.toByteArray(), buffer.length());
  652. }
  653. //---------------------------------------------------------------------------
  654. static int rowCacheId;
  655. void RowCache::addRowsOwn(RowBlock * rows)
  656. {
  657. if (allRows.ordinality() == MaxBlocksCached)
  658. makeRoom();
  659. unsigned newPos = getInsertPosition(rows->getStartRow());
  660. allRows.add(*rows, newPos);
  661. ages.add(++rowCacheId, newPos);
  662. }
  663. bool RowCache::getCacheRow(__int64 row, RowLocation & location)
  664. {
  665. unsigned numRows = allRows.ordinality();
  666. if (numRows == 0)
  667. return false;
  668. const RowBlock & first = allRows.item(0);
  669. if (row < first.getStartRow())
  670. {
  671. location.bestRow = 0;
  672. location.bestOffset = 0;
  673. return false;
  674. }
  675. unsigned best = getBestRow(row);
  676. ages.replace(++rowCacheId, best);
  677. RowBlock & cur = allRows.item(best);
  678. location.matchRow = cur.getRow(row, location.matchLength, location.bestOffset);
  679. if (location.matchRow)
  680. return true;
  681. if (location.bestRow < cur.getNextRow())
  682. {
  683. cur.getNextStoredOffset(location.bestRow, location.bestOffset);
  684. }
  685. return false;
  686. }
  687. //Find the rowBlock that contains the expected row.
  688. //Return the *previous* RowBlock if no match is found.
  689. unsigned RowCache::getBestRow(__int64 row)
  690. {
  691. unsigned max = allRows.ordinality();
  692. int start = 0;
  693. int end = max;
  694. while (end - start > 1)
  695. {
  696. int mid = (start + end) >> 1;
  697. RowBlock & cur = allRows.item(mid);
  698. if (row >= cur.getStartRow())
  699. start = mid;
  700. else
  701. end = mid;
  702. }
  703. assertex(row >= allRows.item(start).getStartRow());
  704. assertex(start != max);
  705. return start;
  706. }
  707. unsigned RowCache::getInsertPosition(__int64 row)
  708. {
  709. unsigned max = allRows.ordinality();
  710. int start = 0;
  711. int end = max;
  712. while (end - start > 1)
  713. {
  714. int mid = (start + end) >> 1;
  715. RowBlock & cur = allRows.item(mid);
  716. if (row >= cur.getStartRow())
  717. start = mid;
  718. else
  719. end = mid;
  720. }
  721. if (start != max)
  722. {
  723. if (row >= allRows.item(start).getStartRow())
  724. start++;
  725. }
  726. assertex(start == max || (row < allRows.item(start).getStartRow()));
  727. return start;
  728. }
  729. void RowCache::makeRoom()
  730. {
  731. #if 0
  732. //For testing the caching by throwing away random blocks
  733. while (allRows.ordinality() > MinBlocksCached)
  734. {
  735. unsigned index = getRandom() % allRows.ordinality();
  736. allRows.remove(index);
  737. ages.remove(index);
  738. }
  739. return;
  740. #endif
  741. unsigned numToFree = allRows.ordinality() - MinBlocksCached;
  742. RowBlock * * oldestRow = new RowBlock * [numToFree];
  743. __int64 * oldestAge = new __int64[numToFree];
  744. unsigned numGot = 0;
  745. ForEachItemIn(idx, ages)
  746. {
  747. __int64 curAge = ages.item(idx);
  748. unsigned compare = numGot;
  749. while (compare != 0)
  750. {
  751. if (curAge >= oldestAge[compare-1])
  752. break;
  753. compare--;
  754. }
  755. if (compare < numToFree)
  756. {
  757. unsigned copySize = numGot - compare;
  758. if (numGot == numToFree)
  759. copySize--; //NB: Cannot go negative because compare < numToFree, numToFree == numGot => compare < numGot
  760. if (copySize)
  761. {
  762. memmove(oldestAge + compare + 1, oldestAge + compare, copySize * sizeof(*oldestAge));
  763. memmove(oldestRow + compare + 1, oldestRow + compare, copySize * sizeof(*oldestRow));
  764. }
  765. oldestAge[compare] = curAge;
  766. oldestRow[compare] = &allRows.item(idx);
  767. if (numGot != numToFree)
  768. numGot++;
  769. }
  770. }
  771. unsigned max = allRows.ordinality();
  772. unsigned i;
  773. for (i = 0; i < max; )
  774. {
  775. for (unsigned j = 0; j < numGot; j++)
  776. {
  777. if (oldestRow[j] == &allRows.item(i))
  778. {
  779. allRows.remove(i);
  780. ages.remove(i);
  781. max--;
  782. goto testNext;
  783. }
  784. }
  785. i++;
  786. testNext: ;
  787. }
  788. delete [] oldestRow;
  789. delete [] oldestAge;
  790. }
  791. //---------------------------------------------------------------------------
  792. FVDataSource::FVDataSource()
  793. {
  794. transformer = NULL;
  795. extraFieldsSize = 0;
  796. openCount = 0;
  797. appendFileposition = false;
  798. }
  799. FVDataSource::~FVDataSource()
  800. {
  801. //ensure all refs into the dll are cleared before it is unloaded
  802. returnedRecordSize.clear();
  803. }
  804. void FVDataSource::addFileposition()
  805. {
  806. appendFileposition = true;
  807. transformedMeta->addFileposition(); // This may modify the other metas as well!
  808. }
  809. void FVDataSource::copyRow(MemoryBuffer & out, const void * src, size32_t length)
  810. {
  811. if (transformer)
  812. {
  813. unsigned curLen = out.length();
  814. size32_t maxSize = transformedMeta->getMaxRecordSize();
  815. void * target = out.reserve(maxSize);
  816. //MORE: won't cope with dynamic maxlengths
  817. RtlStaticRowBuilder rowBuilder(target, maxSize);
  818. unsigned copied = transformer(rowBuilder, (const byte *)src);
  819. out.rewrite(curLen + copied);
  820. }
  821. else
  822. out.append(length, src);
  823. }
  824. bool FVDataSource::fetchRow(MemoryBuffer & out, __int64 offset)
  825. {
  826. if (!transformer)
  827. return fetchRawRow(out, offset);
  828. MemoryBuffer temp;
  829. if (fetchRowData(temp, offset))
  830. {
  831. copyRow(out, temp.toByteArray(), temp.length());
  832. if (appendFileposition)
  833. out.append(offset);
  834. return true;
  835. }
  836. return false;
  837. }
  838. bool FVDataSource::fetchRawRow(MemoryBuffer & out, __int64 offset)
  839. {
  840. if (fetchRowData(out, offset))
  841. {
  842. if (appendFileposition)
  843. out.append(offset);
  844. return true;
  845. }
  846. return false;
  847. }
  848. bool FVDataSource::getRow(MemoryBuffer & out, __int64 row)
  849. {
  850. size32_t length;
  851. const void * data;
  852. unsigned __int64 offset = 0;
  853. if (getRowData(row, length, data, offset))
  854. {
  855. copyRow(out, data, length);
  856. if (appendFileposition)
  857. out.append(offset);
  858. return true;
  859. }
  860. return false;
  861. }
  862. bool FVDataSource::getRawRow(MemoryBuffer & out, __int64 row)
  863. {
  864. size32_t length;
  865. const void * data;
  866. unsigned __int64 offset = 0;
  867. if (getRowData(row, length, data, offset))
  868. {
  869. out.append(length-extraFieldsSize, data);
  870. if (appendFileposition)
  871. out.append(offset);
  872. return true;
  873. }
  874. return false;
  875. }
  876. void FVDataSource::loadDll(const char * wuid)
  877. {
  878. //MORE: This code should be commoned up and available in the work unit code or something...
  879. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  880. Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid, false);
  881. //Plugins should already be loaded when they were registered with the ViewTransformerRegistry
  882. //Something like the following code could be used to check the plugin version...
  883. #if 0
  884. Owned<IConstWUPluginIterator> plugins = &wu->getPlugins();
  885. SafePluginMap * map = NULL;
  886. ForEach(*plugins)
  887. {
  888. IConstWUPlugin &thisplugin = plugins->query();
  889. SCMStringBuffer name, version;
  890. thisplugin.getPluginName(name);
  891. thisplugin.getPluginVersion(version);
  892. Owned<ILoadedDllEntry> loadedPlugin = map->getPluginDll(name.str(), version.str(), true);
  893. if (!loadedPlugin)
  894. break;
  895. }
  896. #endif
  897. Owned<IConstWUQuery> q = wu->getQuery();
  898. SCMStringBuffer dllname;
  899. q->getQueryDllName(dllname);
  900. // MORE - leaks....
  901. loadedDll.setown(queryDllServer().loadDll(dllname.str(), DllLocationAnywhere));
  902. }
  903. IFvDataSourceMetaData * FVDataSource::queryMetaData()
  904. {
  905. return transformedMeta;
  906. }
  907. bool FVDataSource::setReturnedInfoFromResult()
  908. {
  909. SCMStringBuffer s;
  910. wuResult->getResultEclSchema(s);
  911. returnedRecord.setown(parseQuery(s.str()));
  912. if (!returnedRecord)
  913. throw MakeStringException(ERR_FILEVIEW_FIRST+4, "Could not process result schema [%s]", s.str());
  914. bool isKey = false;
  915. bool isGrouped = false; // this isn't strictly true...it could be true for an internal result, but no current flag to test
  916. returnedMeta.setown(new DataSourceMetaData(returnedRecord, 0, true, isGrouped, 0));
  917. transformedRecord.setown(getSimplifiedRecord(returnedRecord, isKey));
  918. if (!transformedRecord)
  919. {
  920. //No transformations needed, so don't need to loaded the dll containing the transform functions etc.
  921. transformedRecord.set(returnedRecord);
  922. returnedRecordSize.set(returnedMeta);
  923. }
  924. else
  925. {
  926. loadDll(wuid);
  927. s.clear();
  928. wuResult->getResultRecordSizeEntry(s);
  929. if (s.length())
  930. {
  931. typedef IRecordSize * (* recSizeFunc)();
  932. recSizeFunc func = (recSizeFunc)loadedDll->getEntry(s.str());
  933. Owned<IRecordSize> createdRecordSize = func();
  934. returnedRecordSize.setown(new RecordSizeToEx(createdRecordSize));
  935. }
  936. s.clear();
  937. wuResult->getResultTransformerEntry(s);
  938. if (s.length())
  939. transformer = (rowTransformFunction)loadedDll->getEntry(s.str());
  940. }
  941. transformedMeta.setown(new DataSourceMetaData(transformedRecord, 0, true, isGrouped, 0));
  942. return (returnedRecordSize != NULL);
  943. }
  944. //---------------------------------------------------------------------------
  945. __int64 PagedDataSource::numRows(bool force)
  946. {
  947. if (force && (totalRows == UNKNOWN_NUM_ROWS))
  948. {
  949. //MORE: Need to go and work it out - however painful...
  950. }
  951. return totalRows;
  952. }
  953. bool PagedDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  954. {
  955. if ((row < 0) || ((unsigned __int64)row > totalRows))
  956. return false;
  957. RowLocation location;
  958. loop
  959. {
  960. if (cache.getCacheRow(row, location))
  961. {
  962. length = location.matchLength;
  963. data = location.matchRow;
  964. offset = location.bestOffset;
  965. return true;
  966. }
  967. improveLocation(row, location);
  968. if (!loadBlock(location.bestRow, location.bestOffset))
  969. return false;
  970. }
  971. }
  972. void PagedDataSource::improveLocation(__int64 row, RowLocation & location)
  973. {
  974. }
  975. //---------------------------------------------------------------------------
  976. NestedDataSource::NestedDataSource(DataSourceMetaData & meta, unsigned len, const void * data)
  977. {
  978. returnedMeta.set(&meta);
  979. returnedRecordSize.set(returnedMeta);
  980. transformedMeta.set(returnedMeta);
  981. totalSize = len;
  982. MemoryBuffer temp;
  983. temp.append(len, data);
  984. if (returnedMeta->isFixedSize())
  985. rows.setown(new FixedRowBlock(temp, 0, 0, returnedMeta->fixedSize()));
  986. else
  987. rows.setown(new VariableRowBlock(temp, 0, 0, returnedRecordSize, true));
  988. }
  989. bool NestedDataSource::init()
  990. {
  991. return true;
  992. }
  993. __int64 NestedDataSource::numRows(bool force)
  994. {
  995. return rows->getNextRow() - rows->getStartRow();
  996. }
  997. bool NestedDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  998. {
  999. data = rows->getRow(row, length, offset);
  1000. return (data != NULL);
  1001. }
  1002. //---------------------------------------------------------------------------
  1003. NullDataSource::NullDataSource(IHqlExpression * _record, bool _isGrouped, unsigned _keyedSize)
  1004. : meta(_record, 0, true, _isGrouped, _keyedSize)
  1005. {
  1006. }
  1007. FailureDataSource::FailureDataSource(IHqlExpression * _record, IException * _error, bool _isGrouped, unsigned _keyedSize)
  1008. : NullDataSource(_record, _isGrouped, _keyedSize), error(_error)
  1009. {
  1010. }
  1011. //---------------------------------------------------------------------------
  1012. IHqlExpression * parseQuery(const char * text)
  1013. {
  1014. MultiErrorReceiver errs;
  1015. OwnedHqlExpr ret = ::parseQuery(text, &errs);
  1016. if (errs.errCount() == 0)
  1017. return ret.getClear();
  1018. for (unsigned i=0; i < errs.errCount(); i++)
  1019. {
  1020. StringBuffer msg;
  1021. PrintLog("%d %s", errs.item(i)->getLine(), errs.item(i)->errorMessage(msg).str());
  1022. }
  1023. return NULL;
  1024. }
  1025. IFvDataSourceMetaData * createMetaData(IConstWUResult * wuResult)
  1026. {
  1027. SCMStringBuffer s;
  1028. wuResult->getResultEclSchema(s);
  1029. OwnedHqlExpr record = parseQuery(s.str());
  1030. if (!record)
  1031. throw MakeStringException(ERR_FILEVIEW_FIRST+4, "Could not process result schema [%s]", s.str());
  1032. OwnedHqlExpr simplifiedRecord = getSimplifiedRecord(record, false);
  1033. bool isGrouped = false; // more not sure this is strictly true...
  1034. if (!simplifiedRecord)
  1035. return new DataSourceMetaData(record, 0, true, isGrouped, 0);
  1036. return new DataSourceMetaData(simplifiedRecord, 0, true, isGrouped, 0);
  1037. }