fvsource.cpp 33 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "platform.h"
  15. #include "jliball.hpp"
  16. #include "eclrtl.hpp"
  17. #include "rtlds_imp.hpp"
  18. #include "fvresultset.ipp"
  19. #include "fileview.hpp"
  20. #include "fvsource.ipp"
  21. #include "hqlerror.hpp"
  22. #include "eclhelper.hpp"
  23. #include "hqlattr.hpp"
  24. #include "hqlutil.hpp"
  25. #include "fvdatasource.hpp"
  26. #define MAX_RECORD_SIZE 4096
  27. inline void appendNextXpathName(StringBuffer &s, const char *xpath, const char *&next)
  28. {
  29. while (*xpath && !strchr("*[/", *xpath))
  30. s.append(*xpath++);
  31. next = strchr(xpath, '/');
  32. }
  33. void splitXmlTagNamesFromXPath(const char *xpath, StringBuffer &inner, StringBuffer *outer=NULL)
  34. {
  35. if (!xpath || !xpath)
  36. return;
  37. StringBuffer s1;
  38. StringBuffer s2;
  39. const char *next=xpath;
  40. appendNextXpathName(s1, xpath, next);
  41. if (outer && next)
  42. appendNextXpathName(s2, ++next, next);
  43. if (next) //xpath too deep
  44. return;
  45. if (!s2.length())
  46. inner.swapWith(s1);
  47. else
  48. {
  49. inner.swapWith(s2);
  50. outer->swapWith(s1);
  51. }
  52. if (!inner.length())
  53. inner.set("/");
  54. if (outer && !outer->length())
  55. outer->set("/");
  56. }
  57. DataSourceMetaItem::DataSourceMetaItem(unsigned _flags, const char * _name, const char * _xpath, ITypeInfo * _type)
  58. {
  59. flags = _flags;
  60. name.set(_name);
  61. type.set(_type);
  62. xpath.set(_xpath);
  63. splitXmlTagNamesFromXPath(_xpath, tagname);
  64. }
  65. DataSourceMetaItem::DataSourceMetaItem(unsigned _flags, MemoryBuffer & in)
  66. {
  67. flags = _flags;
  68. in.read(name);
  69. in.read(xpath);
  70. type.setown(deserializeType(in));
  71. splitXmlTagNamesFromXPath(xpath.get(), tagname);
  72. }
  73. void DataSourceMetaItem::serialize(MemoryBuffer & out) const
  74. {
  75. out.append(flags);
  76. out.append(name);
  77. out.append(xpath);
  78. type->serialize(out);
  79. }
  80. //---------------------------------------------------------------------------
  81. DataSourceDatasetItem::DataSourceDatasetItem(const char * _name, const char * _xpath, IHqlExpression * expr) : DataSourceMetaItem(FVFFdataset, NULL, NULL, NULL), record(expr->queryRecord(), 0, true, false, false)
  82. {
  83. type.setown(makeTableType(NULL, NULL, NULL, NULL));
  84. name.set(_name);
  85. xpath.set(_xpath);
  86. splitXmlTagNamesFromXPath(_xpath, record.tagname, &tagname);
  87. if (!record.tagname.length())
  88. record.tagname.set("Row");
  89. }
  90. DataSourceDatasetItem::DataSourceDatasetItem(unsigned flags, MemoryBuffer & in) : DataSourceMetaItem(FVFFdataset, NULL, NULL, NULL), record(in)
  91. {
  92. type.setown(makeTableType(NULL, NULL, NULL, NULL));
  93. in.read(name);
  94. }
  95. void DataSourceDatasetItem::serialize(MemoryBuffer & out) const
  96. {
  97. out.append(flags);
  98. record.serialize(out);
  99. out.append(name);
  100. }
  101. //---------------------------------------------------------------------------
  102. DataSourceSetItem::DataSourceSetItem(const char * _name, const char * _xpath, ITypeInfo * _type) : DataSourceMetaItem(FVFFset, _name, NULL, _type)
  103. {
  104. createChild();
  105. xpath.set(_xpath);
  106. StringBuffer attr;
  107. splitXmlTagNamesFromXPath(_xpath, record.tagname, &tagname);
  108. if (!record.tagname.length())
  109. record.tagname.set("Item");
  110. }
  111. DataSourceSetItem::DataSourceSetItem(unsigned flags, MemoryBuffer & in) : DataSourceMetaItem(flags, in)
  112. {
  113. createChild();
  114. StringBuffer attr;
  115. splitXmlTagNamesFromXPath(xpath.get(), record.tagname, &tagname.clear());
  116. if (!record.tagname.length())
  117. record.tagname.set("Item");
  118. }
  119. void DataSourceSetItem::createChild()
  120. {
  121. ITypeInfo * childType = type->queryChildType()->queryPromotedType();
  122. record.addSimpleField("Item", NULL, childType);
  123. }
  124. void DataSourceSetItem::serialize(MemoryBuffer & out) const
  125. {
  126. out.append(flags);
  127. record.serialize(out);
  128. out.append(name);
  129. }
  130. //---------------------------------------------------------------------------
  131. DataSourceMetaData::DataSourceMetaData(IHqlExpression * _record, byte _numFieldsToIgnore, bool _randomIsOk, bool _isGrouped, unsigned _keyedSize)
  132. {
  133. init();
  134. numFieldsToIgnore = _numFieldsToIgnore;
  135. randomIsOk = _randomIsOk;
  136. isStoredFixedWidth = true;
  137. //MORE: Blobs aren't handled correctly in indexes....
  138. maxRecordSize = ::getMaxRecordSize(_record, MAX_RECORD_SIZE);
  139. keyedSize = _keyedSize;
  140. gatherFields(_record, false);
  141. if (_isGrouped)
  142. {
  143. Owned<ITypeInfo> type = makeBoolType();
  144. addSimpleField("__groupfollows__", NULL, type);
  145. maxRecordSize++;
  146. }
  147. if (isStoredFixedWidth)
  148. assertex(storedFixedSize == maxRecordSize);
  149. }
  150. DataSourceMetaData::DataSourceMetaData()
  151. {
  152. init();
  153. randomIsOk = true;
  154. isStoredFixedWidth = true;
  155. }
  156. DataSourceMetaData::DataSourceMetaData(type_t typeCode)
  157. {
  158. init();
  159. OwnedITypeInfo type;
  160. if (typeCode == type_unicode)
  161. type.setown(makeUnicodeType(UNKNOWN_LENGTH, 0));
  162. else
  163. type.setown(makeStringType(UNKNOWN_LENGTH, NULL, NULL));
  164. fields.append(*new DataSourceMetaItem(FVFFnone, "line", NULL, type));
  165. }
  166. void DataSourceMetaData::init()
  167. {
  168. keyedSize = 0;
  169. storedFixedSize = 0;
  170. maxRecordSize = 0;
  171. bitsRemaining = 0;
  172. numVirtualFields = 0;
  173. isStoredFixedWidth = false;
  174. randomIsOk = false;
  175. numFieldsToIgnore = 0;
  176. attrset = false;
  177. }
  178. DataSourceMetaData::DataSourceMetaData(MemoryBuffer & buffer)
  179. {
  180. attrset = false;
  181. numVirtualFields = 0;
  182. buffer.read(numFieldsToIgnore);
  183. buffer.read(randomIsOk);
  184. buffer.read(isStoredFixedWidth);
  185. buffer.read(storedFixedSize);
  186. buffer.read(keyedSize);
  187. buffer.read(maxRecordSize);
  188. unsigned numFields;
  189. buffer.read(numFields);
  190. for (unsigned idx=0; idx < numFields; idx++)
  191. {
  192. byte flags;
  193. buffer.read(flags);
  194. if (flags == FVFFdataset)
  195. fields.append(*new DataSourceDatasetItem(flags, buffer));
  196. else if (flags == FVFFset)
  197. fields.append(*new DataSourceSetItem(flags, buffer));
  198. else
  199. fields.append(*new DataSourceMetaItem(flags, buffer));
  200. if (flags == FVFFvirtual)
  201. ++numVirtualFields;
  202. }
  203. }
  204. void DataSourceMetaData::addFileposition()
  205. {
  206. addVirtualField("__fileposition__", NULL, makeIntType(8, false));
  207. }
  208. void DataSourceMetaData::addSimpleField(const char * name, const char * xpath, ITypeInfo * type)
  209. {
  210. ITypeInfo * promoted = type->queryPromotedType();
  211. unsigned size = promoted->getSize();
  212. unsigned thisBits = 0;
  213. if (size == UNKNOWN_LENGTH)
  214. isStoredFixedWidth = false;
  215. else if (type->getTypeCode() == type_bitfield)
  216. {
  217. thisBits = type->getBitSize();
  218. if (thisBits > bitsRemaining)
  219. {
  220. size = type->queryChildType()->getSize();
  221. storedFixedSize += size;
  222. bitsRemaining = size * 8;
  223. }
  224. bitsRemaining -= thisBits;
  225. }
  226. else
  227. storedFixedSize += size;
  228. if (thisBits == 0)
  229. bitsRemaining = 0;
  230. fields.append(*new DataSourceMetaItem(FVFFnone, name, xpath, type));
  231. }
  232. void DataSourceMetaData::addVirtualField(const char * name, const char * xpath, ITypeInfo * type)
  233. {
  234. fields.append(*new DataSourceMetaItem(FVFFvirtual, name, xpath, type));
  235. ++numVirtualFields;
  236. }
  237. void DataSourceMetaData::extractKeyedInfo(UnsignedArray & offsets, TypeInfoArray & types)
  238. {
  239. unsigned curOffset = 0;
  240. ForEachItemIn(i, fields)
  241. {
  242. if (curOffset >= keyedSize)
  243. break;
  244. DataSourceMetaItem & cur = fields.item(i);
  245. switch (cur.flags)
  246. {
  247. case FVFFnone:
  248. {
  249. offsets.append(curOffset);
  250. types.append(*LINK(cur.type));
  251. unsigned size = cur.type->getSize();
  252. assertex(size != UNKNOWN_LENGTH);
  253. curOffset += size;
  254. break;
  255. }
  256. case FVFFbeginrecord:
  257. case FVFFendrecord:
  258. break;
  259. default:
  260. throwUnexpected();
  261. }
  262. }
  263. offsets.append(curOffset);
  264. assertex(curOffset == keyedSize);
  265. }
  266. //MORE: Really this should create no_selects for the sub records, but pass on that for the moment.
  267. void DataSourceMetaData::gatherFields(IHqlExpression * expr, bool isConditional)
  268. {
  269. switch (expr->getOperator())
  270. {
  271. case no_record:
  272. gatherChildFields(expr, isConditional);
  273. break;
  274. case no_ifblock:
  275. {
  276. OwnedITypeInfo boolType = makeBoolType();
  277. OwnedITypeInfo voidType = makeVoidType();
  278. isStoredFixedWidth = false;
  279. fields.append(*new DataSourceMetaItem(FVFFbeginif, NULL, NULL, boolType));
  280. gatherChildFields(expr->queryChild(1), true);
  281. fields.append(*new DataSourceMetaItem(FVFFendif, NULL, NULL, voidType));
  282. break;
  283. }
  284. case no_field:
  285. {
  286. if (expr->hasProperty(__ifblockAtom))
  287. break;
  288. Linked<ITypeInfo> type = expr->queryType();
  289. IAtom * name = expr->queryName();
  290. IHqlExpression * nameAttr = expr->queryProperty(namedAtom);
  291. StringBuffer outname;
  292. if (nameAttr && nameAttr->queryChild(0)->queryValue())
  293. nameAttr->queryChild(0)->queryValue()->getStringValue(outname);
  294. else
  295. outname.append(name).toLowerCase();
  296. StringBuffer xpathtext;
  297. const char * xpath = NULL;
  298. IHqlExpression * xpathAttr = expr->queryProperty(xpathAtom);
  299. if (xpathAttr && xpathAttr->queryChild(0)->queryValue())
  300. xpath = xpathAttr->queryChild(0)->queryValue()->getStringValue(xpathtext);
  301. if (isKey() && expr->hasProperty(blobAtom))
  302. type.setown(makeIntType(8, false));
  303. type_t tc = type->getTypeCode();
  304. if (tc == type_row)
  305. {
  306. OwnedITypeInfo voidType = makeVoidType();
  307. fields.append(*new DataSourceMetaItem(FVFFbeginrecord, outname, xpath, voidType));
  308. gatherChildFields(expr->queryRecord(), isConditional);
  309. fields.append(*new DataSourceMetaItem(FVFFendrecord, outname, xpath, voidType));
  310. }
  311. else if ((tc == type_table) || (tc == type_groupedtable))
  312. {
  313. isStoredFixedWidth = false;
  314. fields.append(*new DataSourceDatasetItem(outname, xpath, expr));
  315. }
  316. else if (tc == type_set)
  317. {
  318. isStoredFixedWidth = false;
  319. fields.append(*new DataSourceSetItem(outname, xpath, type));
  320. }
  321. else
  322. {
  323. if (type->getTypeCode() == type_alien)
  324. {
  325. IHqlAlienTypeInfo * alien = queryAlienType(type);
  326. type.set(alien->queryPhysicalType());
  327. }
  328. addSimpleField(outname, xpath, type);
  329. }
  330. break;
  331. }
  332. }
  333. }
  334. void DataSourceMetaData::gatherChildFields(IHqlExpression * expr, bool isConditional)
  335. {
  336. bitsRemaining = 0;
  337. ForEachChild(idx, expr)
  338. gatherFields(expr->queryChild(idx), isConditional);
  339. }
  340. unsigned DataSourceMetaData::numKeyedColumns() const
  341. {
  342. unsigned count = 0;
  343. unsigned curOffset = 0;
  344. ForEachItemIn(i, fields)
  345. {
  346. if (curOffset >= keyedSize)
  347. break;
  348. DataSourceMetaItem & cur = fields.item(i);
  349. switch (cur.flags)
  350. {
  351. case FVFFnone:
  352. {
  353. unsigned size = cur.type->getSize();
  354. assertex(size != UNKNOWN_LENGTH);
  355. curOffset += size;
  356. count++;
  357. break;
  358. }
  359. default:
  360. throwUnexpected();
  361. }
  362. }
  363. return count;
  364. assertex(curOffset == keyedSize);
  365. }
  366. unsigned DataSourceMetaData::numColumns() const
  367. {
  368. return fields.ordinality() - numFieldsToIgnore;
  369. }
  370. ITypeInfo * DataSourceMetaData::queryType(unsigned column) const
  371. {
  372. return fields.item(column).type;
  373. }
  374. unsigned DataSourceMetaData::queryFieldFlags(unsigned column) const
  375. {
  376. return fields.item(column).flags;
  377. }
  378. const char * DataSourceMetaData::queryName(unsigned column) const
  379. {
  380. return fields.item(column).name;
  381. }
  382. const char * DataSourceMetaData::queryXPath(unsigned column) const
  383. {
  384. return fields.item(column).xpath;
  385. }
  386. const char * DataSourceMetaData::queryXmlTag(unsigned column) const
  387. {
  388. DataSourceMetaItem &item = fields.item(column);
  389. if (item.tagname.length())
  390. return (*item.tagname.str()!='/') ? item.tagname.str() : NULL;
  391. return item.name.get();
  392. }
  393. const char *DataSourceMetaData::queryXmlTag() const
  394. {
  395. if (tagname.length())
  396. return (*tagname.str()!='/') ? tagname.str() : NULL;
  397. return "Row";
  398. }
  399. const IntArray &DataSourceMetaData::queryAttrList()
  400. {
  401. if (!attrset)
  402. {
  403. ForEachItemIn(idx, fields)
  404. {
  405. DataSourceMetaItem &item = fields.item(idx);
  406. if (*item.tagname.str()=='@')
  407. attributes.append(idx);
  408. }
  409. attrset=true;
  410. }
  411. return attributes;
  412. }
  413. IFvDataSourceMetaData * DataSourceMetaData::queryChildMeta(unsigned column) const
  414. {
  415. return fields.item(column).queryChildMeta();
  416. }
  417. IFvDataSource * DataSourceMetaData::createChildDataSource(unsigned column, unsigned len, const void * data)
  418. {
  419. DataSourceMetaData * childMeta = fields.item(column).queryChildMeta();
  420. if (childMeta)
  421. return new NestedDataSource(*childMeta, len, data);
  422. return NULL;
  423. }
  424. bool DataSourceMetaData::supportsRandomSeek() const
  425. {
  426. return randomIsOk;
  427. }
  428. void DataSourceMetaData::serialize(MemoryBuffer & buffer) const
  429. {
  430. //NB: Update NullDataSourceMeta if this changes....
  431. buffer.append(numFieldsToIgnore);
  432. buffer.append(randomIsOk);
  433. buffer.append(isStoredFixedWidth);
  434. buffer.append(storedFixedSize);
  435. buffer.append(keyedSize);
  436. buffer.append(maxRecordSize);
  437. unsigned numFields = fields.ordinality();
  438. buffer.append(numFields);
  439. for (unsigned idx=0; idx < numFields; idx++)
  440. {
  441. fields.item(idx).serialize(buffer);
  442. }
  443. }
  444. size32_t DataSourceMetaData::getRecordSize(const void *rec)
  445. {
  446. if (isStoredFixedWidth)
  447. return storedFixedSize;
  448. if (!rec)
  449. return maxRecordSize;
  450. const byte * data = (const byte *)rec;
  451. unsigned curOffset = 0;
  452. unsigned bitsRemaining = 0;
  453. unsigned max = fields.ordinality() - numVirtualFields;
  454. for (unsigned idx=0; idx < max; idx++)
  455. {
  456. ITypeInfo & type = *fields.item(idx).type;
  457. unsigned size = type.getSize();
  458. if (size == UNKNOWN_LENGTH)
  459. {
  460. const byte * cur = data + curOffset;
  461. switch (type.getTypeCode())
  462. {
  463. case type_data:
  464. case type_string:
  465. case type_table:
  466. case type_groupedtable:
  467. size = *((unsigned *)cur) + sizeof(unsigned);
  468. break;
  469. case type_set:
  470. size = *((unsigned *)(cur + sizeof(bool))) + sizeof(unsigned) + sizeof(bool);
  471. break;
  472. case type_qstring:
  473. size = rtlQStrSize(*((unsigned *)cur)) + sizeof(unsigned);
  474. break;
  475. case type_unicode:
  476. size = *((unsigned *)cur)*2 + sizeof(unsigned);
  477. break;
  478. case type_utf8:
  479. size = sizeof(unsigned) + rtlUtf8Size(*(unsigned *)cur, cur+sizeof(unsigned));
  480. break;
  481. case type_varstring:
  482. size = strlen((char *)cur)+1;
  483. break;
  484. case type_varunicode:
  485. size = (rtlUnicodeStrlen((UChar *)cur)+1)*2;
  486. break;
  487. case type_packedint:
  488. size = rtlGetPackedSize(cur);
  489. break;
  490. default:
  491. UNIMPLEMENTED;
  492. }
  493. }
  494. if (type.getTypeCode() == type_bitfield)
  495. {
  496. unsigned thisBits = type.getBitSize();
  497. if (thisBits > bitsRemaining)
  498. {
  499. size = type.queryChildType()->getSize();
  500. bitsRemaining = size * 8;
  501. }
  502. else
  503. size = 0;
  504. bitsRemaining -= thisBits;
  505. }
  506. else
  507. bitsRemaining = 0;
  508. curOffset += size;
  509. }
  510. return curOffset;
  511. }
  512. size32_t DataSourceMetaData::getFixedSize() const
  513. {
  514. if (isStoredFixedWidth)
  515. return storedFixedSize;
  516. return 0;
  517. }
  518. IFvDataSourceMetaData * deserializeDataSourceMeta(MemoryBuffer & in)
  519. {
  520. return new DataSourceMetaData(in);
  521. }
  522. //---------------------------------------------------------------------------
  523. RowBlock::RowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset)
  524. {
  525. buffer.swapWith(_buffer);
  526. start = _start;
  527. startOffset = _startOffset;
  528. numRows = 0;
  529. }
  530. RowBlock::RowBlock(__int64 _start, __int64 _startOffset)
  531. {
  532. start = _start;
  533. startOffset = _startOffset;
  534. numRows = 0;
  535. }
  536. void RowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  537. {
  538. row = getNextRow();
  539. offset = startOffset + buffer.length();
  540. }
  541. FixedRowBlock::FixedRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, size32_t _fixedRecordSize) : RowBlock(_buffer, _start, _startOffset)
  542. {
  543. if (_fixedRecordSize == 0) _fixedRecordSize = 1;
  544. fixedRecordSize = _fixedRecordSize;
  545. numRows = buffer.length() / fixedRecordSize;
  546. }
  547. const void * FixedRowBlock::fetchRow(__int64 offset, size32_t & len)
  548. {
  549. __int64 maxOffset = startOffset + buffer.length();
  550. if (offset < startOffset || offset >= maxOffset)
  551. return NULL;
  552. len = fixedRecordSize;
  553. return buffer.toByteArray() + (offset - startOffset);
  554. }
  555. const void * FixedRowBlock::getRow(__int64 row, size32_t & len, unsigned __int64 & rowOffset)
  556. {
  557. if (row < start || row >= start + numRows)
  558. return NULL;
  559. unsigned index = (unsigned)(row - start);
  560. unsigned blockOffset = index * fixedRecordSize;
  561. len = fixedRecordSize;
  562. rowOffset = startOffset + blockOffset;
  563. return buffer.toByteArray() + blockOffset;
  564. }
  565. VariableRowBlock::VariableRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, IRecordSizeEx * recordSize, bool isLast) : RowBlock(_buffer, _start, _startOffset)
  566. {
  567. const char * buff = buffer.toByteArray();
  568. unsigned cur = 0;
  569. unsigned max = buffer.length();
  570. unsigned maxCur = max;
  571. if (!isLast)
  572. maxCur -= recordSize->getRecordSize(NULL);
  573. while (cur < maxCur)
  574. {
  575. rowIndex.append(cur);
  576. cur += recordSize->getRecordSize(max-cur, buff + cur);
  577. }
  578. buffer.setLength(cur);
  579. rowIndex.append(cur);
  580. numRows = rowIndex.ordinality()-1;
  581. }
  582. VariableRowBlock::VariableRowBlock(MemoryBuffer & inBuffer, __int64 _start) : RowBlock(_start, 0)
  583. {
  584. inBuffer.read(numRows);
  585. for (unsigned row = 0; row < numRows; row++)
  586. {
  587. unsigned thisLength;
  588. rowIndex.append(buffer.length());
  589. inBuffer.read(thisLength);
  590. buffer.append(thisLength, inBuffer.readDirect(thisLength));
  591. }
  592. rowIndex.append(buffer.length());
  593. }
  594. const void * VariableRowBlock::fetchRow(__int64 offset, size32_t & len)
  595. {
  596. __int64 maxOffset = startOffset + buffer.length();
  597. if (offset < startOffset || offset >= maxOffset)
  598. return NULL;
  599. size32_t rowOffset = (size32_t)(offset - startOffset);
  600. unsigned pos = rowIndex.find(rowOffset);
  601. if (pos == NotFound)
  602. return NULL;
  603. len = rowIndex.item(pos+1)-rowOffset;
  604. return buffer.toByteArray() + rowOffset;
  605. }
  606. const void * VariableRowBlock::getRow(__int64 row, size32_t & len, unsigned __int64 & rowOffset)
  607. {
  608. if (row < start || row >= start + numRows)
  609. return NULL;
  610. unsigned index = (unsigned)(row - start);
  611. unsigned blockOffset = rowIndex.item(index);
  612. len = rowIndex.item(index+1) - blockOffset;
  613. rowOffset = startOffset + blockOffset;
  614. return buffer.toByteArray() + blockOffset;
  615. }
  616. //---------------------------------------------------------------------------
  617. offset_t calculateNextOffset(const char * data, unsigned len)
  618. {
  619. offset_t offset = *(offset_t *)(data + len - sizeof(offset_t) - sizeof(unsigned short));
  620. return offset + *(unsigned short*)(data + len - sizeof(unsigned short));
  621. }
  622. void FilePosFixedRowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  623. {
  624. row = start + numRows;
  625. offset = calculateNextOffset(buffer.toByteArray(), buffer.length());
  626. }
  627. void FilePosVariableRowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  628. {
  629. row = start + numRows;
  630. offset = calculateNextOffset(buffer.toByteArray(), buffer.length());
  631. }
  632. //---------------------------------------------------------------------------
  633. static int rowCacheId;
  634. void RowCache::addRowsOwn(RowBlock * rows)
  635. {
  636. if (allRows.ordinality() == MaxBlocksCached)
  637. makeRoom();
  638. unsigned newPos = getInsertPosition(rows->getStartRow());
  639. allRows.add(*rows, newPos);
  640. ages.add(++rowCacheId, newPos);
  641. }
  642. bool RowCache::getCacheRow(__int64 row, RowLocation & location)
  643. {
  644. unsigned numRows = allRows.ordinality();
  645. if (numRows == 0)
  646. return false;
  647. const RowBlock & first = allRows.item(0);
  648. if (row < first.getStartRow())
  649. {
  650. location.bestRow = 0;
  651. location.bestOffset = 0;
  652. return false;
  653. }
  654. unsigned best = getBestRow(row);
  655. ages.replace(++rowCacheId, best);
  656. RowBlock & cur = allRows.item(best);
  657. location.matchRow = cur.getRow(row, location.matchLength, location.bestOffset);
  658. if (location.matchRow)
  659. return true;
  660. if (location.bestRow < cur.getNextRow())
  661. {
  662. cur.getNextStoredOffset(location.bestRow, location.bestOffset);
  663. }
  664. return false;
  665. }
  666. //Find the rowBlock that contains the expected row.
  667. //Return the *previous* RowBlock if no match is found.
  668. unsigned RowCache::getBestRow(__int64 row)
  669. {
  670. unsigned max = allRows.ordinality();
  671. int start = 0;
  672. int end = max;
  673. while (end - start > 1)
  674. {
  675. int mid = (start + end) >> 1;
  676. RowBlock & cur = allRows.item(mid);
  677. if (row >= cur.getStartRow())
  678. start = mid;
  679. else
  680. end = mid;
  681. }
  682. assertex(row >= allRows.item(start).getStartRow());
  683. assertex(start != max);
  684. return start;
  685. }
  686. unsigned RowCache::getInsertPosition(__int64 row)
  687. {
  688. unsigned max = allRows.ordinality();
  689. int start = 0;
  690. int end = max;
  691. while (end - start > 1)
  692. {
  693. int mid = (start + end) >> 1;
  694. RowBlock & cur = allRows.item(mid);
  695. if (row >= cur.getStartRow())
  696. start = mid;
  697. else
  698. end = mid;
  699. }
  700. if (start != max)
  701. {
  702. if (row >= allRows.item(start).getStartRow())
  703. start++;
  704. }
  705. assertex(start == max || (row < allRows.item(start).getStartRow()));
  706. return start;
  707. }
  708. void RowCache::makeRoom()
  709. {
  710. #if 0
  711. //For testing the caching by throwing away random blocks
  712. while (allRows.ordinality() > MinBlocksCached)
  713. {
  714. unsigned index = getRandom() % allRows.ordinality();
  715. allRows.remove(index);
  716. ages.remove(index);
  717. }
  718. return;
  719. #endif
  720. unsigned numToFree = allRows.ordinality() - MinBlocksCached;
  721. RowBlock * * oldestRow = new RowBlock * [numToFree];
  722. __int64 * oldestAge = new __int64[numToFree];
  723. unsigned numGot = 0;
  724. ForEachItemIn(idx, ages)
  725. {
  726. __int64 curAge = ages.item(idx);
  727. unsigned compare = numGot;
  728. while (compare != 0)
  729. {
  730. if (curAge >= oldestAge[compare-1])
  731. break;
  732. compare--;
  733. }
  734. if (compare < numToFree)
  735. {
  736. unsigned copySize = numGot - compare;
  737. if (numGot == numToFree)
  738. copySize--; //NB: Cannot go negative because compare < numToFree, numToFree == numGot => compare < numGot
  739. if (copySize)
  740. {
  741. memmove(oldestAge + compare + 1, oldestAge + compare, copySize * sizeof(*oldestAge));
  742. memmove(oldestRow + compare + 1, oldestRow + compare, copySize * sizeof(*oldestRow));
  743. }
  744. oldestAge[compare] = curAge;
  745. oldestRow[compare] = &allRows.item(idx);
  746. if (numGot != numToFree)
  747. numGot++;
  748. }
  749. }
  750. unsigned max = allRows.ordinality();
  751. unsigned i;
  752. for (i = 0; i < max; )
  753. {
  754. for (unsigned j = 0; j < numGot; j++)
  755. {
  756. if (oldestRow[j] == &allRows.item(i))
  757. {
  758. allRows.remove(i);
  759. ages.remove(i);
  760. max--;
  761. goto testNext;
  762. }
  763. }
  764. i++;
  765. testNext: ;
  766. }
  767. delete [] oldestRow;
  768. delete [] oldestAge;
  769. }
  770. //---------------------------------------------------------------------------
  771. FVDataSource::FVDataSource()
  772. {
  773. transformer = NULL;
  774. extraFieldsSize = 0;
  775. openCount = 0;
  776. appendFileposition = false;
  777. }
  778. FVDataSource::~FVDataSource()
  779. {
  780. //ensure all refs into the dll are cleared before it is unloaded
  781. returnedRecordSize.clear();
  782. }
  783. void FVDataSource::addFileposition()
  784. {
  785. appendFileposition = true;
  786. transformedMeta->addFileposition(); // This may modify the other metas as well!
  787. }
  788. void FVDataSource::copyRow(MemoryBuffer & out, const void * src, size32_t length)
  789. {
  790. if (transformer)
  791. {
  792. unsigned curLen = out.length();
  793. size32_t maxSize = transformedMeta->getMaxRecordSize();
  794. void * target = out.reserve(maxSize);
  795. //MORE: won't cope with dynamic maxlengths
  796. RtlStaticRowBuilder rowBuilder(target, maxSize);
  797. unsigned copied = transformer(rowBuilder, (const byte *)src);
  798. out.rewrite(curLen + copied);
  799. }
  800. else
  801. out.append(length, src);
  802. }
  803. bool FVDataSource::fetchRow(MemoryBuffer & out, __int64 offset)
  804. {
  805. if (!transformer)
  806. return fetchRawRow(out, offset);
  807. MemoryBuffer temp;
  808. if (fetchRowData(temp, offset))
  809. {
  810. copyRow(out, temp.toByteArray(), temp.length());
  811. if (appendFileposition)
  812. out.append(offset);
  813. return true;
  814. }
  815. return false;
  816. }
  817. bool FVDataSource::fetchRawRow(MemoryBuffer & out, __int64 offset)
  818. {
  819. if (fetchRowData(out, offset))
  820. {
  821. if (appendFileposition)
  822. out.append(offset);
  823. return true;
  824. }
  825. return false;
  826. }
  827. bool FVDataSource::getRow(MemoryBuffer & out, __int64 row)
  828. {
  829. size32_t length;
  830. const void * data;
  831. unsigned __int64 offset = 0;
  832. if (getRowData(row, length, data, offset))
  833. {
  834. copyRow(out, data, length);
  835. if (appendFileposition)
  836. out.append(offset);
  837. return true;
  838. }
  839. return false;
  840. }
  841. bool FVDataSource::getRawRow(MemoryBuffer & out, __int64 row)
  842. {
  843. size32_t length;
  844. const void * data;
  845. unsigned __int64 offset = 0;
  846. if (getRowData(row, length, data, offset))
  847. {
  848. out.append(length-extraFieldsSize, data);
  849. if (appendFileposition)
  850. out.append(offset);
  851. return true;
  852. }
  853. return false;
  854. }
  855. void FVDataSource::loadDll(const char * wuid)
  856. {
  857. //MORE: This code should be commoned up and available in the work unit code or something...
  858. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  859. Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid, false);
  860. //Plugins should already be loaded when they were registered with the ViewTransformerRegistry
  861. //Something like the following code could be used to check the plugin version...
  862. #if 0
  863. Owned<IConstWUPluginIterator> plugins = &wu->getPlugins();
  864. SafePluginMap * map = NULL;
  865. ForEach(*plugins)
  866. {
  867. IConstWUPlugin &thisplugin = plugins->query();
  868. SCMStringBuffer name, version;
  869. thisplugin.getPluginName(name);
  870. thisplugin.getPluginVersion(version);
  871. Owned<ILoadedDllEntry> loadedPlugin = map->getPluginDll(name.str(), version.str(), true);
  872. if (!loadedPlugin)
  873. break;
  874. }
  875. #endif
  876. Owned<IConstWUQuery> q = wu->getQuery();
  877. SCMStringBuffer dllname;
  878. q->getQueryDllName(dllname);
  879. // MORE - leaks....
  880. loadedDll.setown(queryDllServer().loadDll(dllname.str(), DllLocationAnywhere));
  881. }
  882. IFvDataSourceMetaData * FVDataSource::queryMetaData()
  883. {
  884. return transformedMeta;
  885. }
  886. bool FVDataSource::setReturnedInfoFromResult()
  887. {
  888. SCMStringBuffer s;
  889. wuResult->getResultEclSchema(s);
  890. returnedRecord.setown(parseQuery(s.str()));
  891. if (!returnedRecord)
  892. throw MakeStringException(ERR_FILEVIEW_FIRST+4, "Could not process result schema [%s]", s.str());
  893. bool isKey = false;
  894. bool isGrouped = false; // this isn't strictly true...it could be true for an internal result, but no current flag to test
  895. returnedMeta.setown(new DataSourceMetaData(returnedRecord, 0, true, isGrouped, 0));
  896. transformedRecord.setown(getSimplifiedRecord(returnedRecord, isKey));
  897. if (!transformedRecord)
  898. {
  899. //No transformations needed, so don't need to loaded the dll containing the transform functions etc.
  900. transformedRecord.set(returnedRecord);
  901. returnedRecordSize.set(returnedMeta);
  902. }
  903. else
  904. {
  905. loadDll(wuid);
  906. s.clear();
  907. wuResult->getResultRecordSizeEntry(s);
  908. if (s.length())
  909. {
  910. typedef IRecordSize * (* recSizeFunc)();
  911. recSizeFunc func = (recSizeFunc)loadedDll->getEntry(s.str());
  912. Owned<IRecordSize> createdRecordSize = func();
  913. returnedRecordSize.setown(new RecordSizeToEx(createdRecordSize));
  914. }
  915. s.clear();
  916. wuResult->getResultTransformerEntry(s);
  917. if (s.length())
  918. transformer = (rowTransformFunction)loadedDll->getEntry(s.str());
  919. }
  920. transformedMeta.setown(new DataSourceMetaData(transformedRecord, 0, true, isGrouped, 0));
  921. return (returnedRecordSize != NULL);
  922. }
  923. //---------------------------------------------------------------------------
  924. __int64 PagedDataSource::numRows(bool force)
  925. {
  926. if (force && (totalRows == UNKNOWN_NUM_ROWS))
  927. {
  928. //MORE: Need to go and work it out - however painful...
  929. }
  930. return totalRows;
  931. }
  932. bool PagedDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  933. {
  934. if ((row < 0) || ((unsigned __int64)row > totalRows))
  935. return false;
  936. RowLocation location;
  937. loop
  938. {
  939. if (cache.getCacheRow(row, location))
  940. {
  941. length = location.matchLength;
  942. data = location.matchRow;
  943. offset = location.bestOffset;
  944. return true;
  945. }
  946. improveLocation(row, location);
  947. if (!loadBlock(location.bestRow, location.bestOffset))
  948. return false;
  949. }
  950. }
  951. void PagedDataSource::improveLocation(__int64 row, RowLocation & location)
  952. {
  953. }
  954. //---------------------------------------------------------------------------
  955. NestedDataSource::NestedDataSource(DataSourceMetaData & meta, unsigned len, const void * data)
  956. {
  957. returnedMeta.set(&meta);
  958. returnedRecordSize.set(returnedMeta);
  959. transformedMeta.set(returnedMeta);
  960. totalSize = len;
  961. MemoryBuffer temp;
  962. temp.append(len, data);
  963. if (returnedMeta->isFixedSize())
  964. rows.setown(new FixedRowBlock(temp, 0, 0, returnedMeta->fixedSize()));
  965. else
  966. rows.setown(new VariableRowBlock(temp, 0, 0, returnedRecordSize, true));
  967. }
  968. bool NestedDataSource::init()
  969. {
  970. return true;
  971. }
  972. __int64 NestedDataSource::numRows(bool force)
  973. {
  974. return rows->getNextRow() - rows->getStartRow();
  975. }
  976. bool NestedDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  977. {
  978. data = rows->getRow(row, length, offset);
  979. return (data != NULL);
  980. }
  981. //---------------------------------------------------------------------------
  982. NullDataSource::NullDataSource(IHqlExpression * _record, bool _isGrouped, unsigned _keyedSize)
  983. : meta(_record, 0, true, _isGrouped, _keyedSize)
  984. {
  985. }
  986. FailureDataSource::FailureDataSource(IHqlExpression * _record, IException * _error, bool _isGrouped, unsigned _keyedSize)
  987. : NullDataSource(_record, _isGrouped, _keyedSize), error(_error)
  988. {
  989. }
  990. //---------------------------------------------------------------------------
  991. IHqlExpression * parseQuery(const char * text)
  992. {
  993. MultiErrorReceiver errs;
  994. OwnedHqlExpr ret = ::parseQuery(text, &errs);
  995. if (errs.errCount() == 0)
  996. return ret.getClear();
  997. for (unsigned i=0; i < errs.errCount(); i++)
  998. {
  999. StringBuffer msg;
  1000. PrintLog("%d %s", errs.item(i)->getLine(), errs.item(i)->errorMessage(msg).str());
  1001. }
  1002. return NULL;
  1003. }
  1004. IFvDataSourceMetaData * createMetaData(IConstWUResult * wuResult)
  1005. {
  1006. SCMStringBuffer s;
  1007. wuResult->getResultEclSchema(s);
  1008. OwnedHqlExpr record = parseQuery(s.str());
  1009. if (!record)
  1010. throw MakeStringException(ERR_FILEVIEW_FIRST+4, "Could not process result schema [%s]", s.str());
  1011. OwnedHqlExpr simplifiedRecord = getSimplifiedRecord(record, false);
  1012. bool isGrouped = false; // more not sure this is strictly true...
  1013. if (!simplifiedRecord)
  1014. return new DataSourceMetaData(record, 0, true, isGrouped, 0);
  1015. return new DataSourceMetaData(simplifiedRecord, 0, true, isGrouped, 0);
  1016. }