fvsource.cpp 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jliball.hpp"
  15. #include "eclrtl.hpp"
  16. #include "rtlds_imp.hpp"
  17. #include "fvresultset.ipp"
  18. #include "fileview.hpp"
  19. #include "fvsource.ipp"
  20. #include "fverror.hpp"
  21. #include "hqlerror.hpp"
  22. #include "eclhelper.hpp"
  23. #include "hqlattr.hpp"
  24. #include "hqlutil.hpp"
  25. #include "fvdatasource.hpp"
  26. #define MAX_RECORD_SIZE 4096
  27. inline void appendNextXpathName(StringBuffer &s, const char *&xpath)
  28. {
  29. while (*xpath && !strchr("*[/", *xpath))
  30. s.append(*xpath++);
  31. xpath = strchr(xpath, '/');
  32. }
  33. void splitXmlTagNamesFromXPath(const char *xpath, StringAttr &inner, StringAttr *outer=NULL)
  34. {
  35. if (!xpath || !xpath)
  36. return;
  37. StringBuffer s1;
  38. StringBuffer s2;
  39. appendNextXpathName(s1, xpath);
  40. if (outer && xpath)
  41. appendNextXpathName(s2, ++xpath);
  42. if (xpath) //xpath too deep
  43. return;
  44. if (!s2.length())
  45. inner.set(s1.str());
  46. else
  47. {
  48. inner.set(s2.str());
  49. outer->set(s1.str());
  50. }
  51. if (!inner.get())
  52. inner.set("");
  53. if (outer && !outer->get())
  54. outer->set("");
  55. }
  56. DataSourceMetaItem::DataSourceMetaItem(unsigned _flags, const char * _name, const char * _xpath, ITypeInfo * _type)
  57. {
  58. flags = _flags;
  59. name.set(_name);
  60. type.set(_type);
  61. xpath.set(_xpath);
  62. splitXmlTagNamesFromXPath(_xpath, tagname);
  63. }
  64. DataSourceMetaItem::DataSourceMetaItem(unsigned _flags, MemoryBuffer & in)
  65. {
  66. flags = _flags;
  67. in.read(name);
  68. in.read(xpath);
  69. type.setown(deserializeType(in));
  70. splitXmlTagNamesFromXPath(xpath.get(), tagname);
  71. }
  72. void DataSourceMetaItem::serialize(MemoryBuffer & out) const
  73. {
  74. out.append(flags);
  75. out.append(name);
  76. out.append(xpath);
  77. type->serialize(out);
  78. }
  79. //---------------------------------------------------------------------------
  80. DataSourceDatasetItem::DataSourceDatasetItem(const char * _name, const char * _xpath, IHqlExpression * expr) : DataSourceMetaItem(FVFFdataset, NULL, NULL, NULL), record(expr->queryRecord(), 0, true, false, false)
  81. {
  82. type.setown(makeTableType(NULL, NULL, NULL, NULL));
  83. name.set(_name);
  84. xpath.set(_xpath);
  85. splitXmlTagNamesFromXPath(_xpath, record.tagname, &tagname);
  86. if (!record.tagname.length())
  87. record.tagname.set("Row");
  88. }
  89. DataSourceDatasetItem::DataSourceDatasetItem(unsigned flags, MemoryBuffer & in) : DataSourceMetaItem(FVFFdataset, NULL, NULL, NULL), record(in)
  90. {
  91. type.setown(makeTableType(NULL, NULL, NULL, NULL));
  92. in.read(name);
  93. }
  94. void DataSourceDatasetItem::serialize(MemoryBuffer & out) const
  95. {
  96. out.append(flags);
  97. record.serialize(out);
  98. out.append(name);
  99. }
  100. //---------------------------------------------------------------------------
  101. DataSourceSetItem::DataSourceSetItem(const char * _name, const char * _xpath, ITypeInfo * _type) : DataSourceMetaItem(FVFFset, _name, NULL, _type)
  102. {
  103. createChild();
  104. xpath.set(_xpath);
  105. StringBuffer attr;
  106. splitXmlTagNamesFromXPath(_xpath, record.tagname, &tagname);
  107. if (!record.tagname.length())
  108. record.tagname.set("Item");
  109. }
  110. DataSourceSetItem::DataSourceSetItem(unsigned flags, MemoryBuffer & in) : DataSourceMetaItem(flags, in)
  111. {
  112. createChild();
  113. splitXmlTagNamesFromXPath(xpath.get(), record.tagname, &tagname);
  114. if (!record.tagname.length())
  115. record.tagname.set("Item");
  116. }
  117. void DataSourceSetItem::createChild()
  118. {
  119. ITypeInfo * childType = type->queryChildType()->queryPromotedType();
  120. record.addSimpleField("Item", NULL, childType);
  121. }
  122. void DataSourceSetItem::serialize(MemoryBuffer & out) const
  123. {
  124. out.append(flags);
  125. record.serialize(out);
  126. out.append(name);
  127. }
  128. //---------------------------------------------------------------------------
  129. DataSourceMetaData::DataSourceMetaData(IHqlExpression * _record, byte _numFieldsToIgnore, bool _randomIsOk, bool _isGrouped, unsigned _keyedSize)
  130. {
  131. init();
  132. numFieldsToIgnore = _numFieldsToIgnore;
  133. randomIsOk = _randomIsOk;
  134. isStoredFixedWidth = true;
  135. //MORE: Blobs aren't handled correctly in indexes....
  136. maxRecordSize = ::getMaxRecordSize(_record, MAX_RECORD_SIZE);
  137. keyedSize = _keyedSize;
  138. gatherFields(_record, false);
  139. if (_isGrouped)
  140. {
  141. Owned<ITypeInfo> type = makeBoolType();
  142. addSimpleField("__groupfollows__", NULL, type);
  143. maxRecordSize++;
  144. }
  145. gatherAttributes();
  146. if (isStoredFixedWidth)
  147. assertex(minRecordSize == maxRecordSize);
  148. }
  149. DataSourceMetaData::DataSourceMetaData()
  150. {
  151. init();
  152. randomIsOk = true;
  153. isStoredFixedWidth = true;
  154. }
  155. DataSourceMetaData::DataSourceMetaData(type_t typeCode)
  156. {
  157. init();
  158. OwnedITypeInfo type;
  159. if (typeCode == type_unicode)
  160. type.setown(makeUnicodeType(UNKNOWN_LENGTH, 0));
  161. else
  162. type.setown(makeStringType(UNKNOWN_LENGTH, NULL, NULL));
  163. fields.append(*new DataSourceMetaItem(FVFFnone, "line", NULL, type));
  164. }
  165. void DataSourceMetaData::init()
  166. {
  167. keyedSize = 0;
  168. minRecordSize = 0;
  169. maxRecordSize = 0;
  170. bitsRemaining = 0;
  171. numVirtualFields = 0;
  172. isStoredFixedWidth = false;
  173. randomIsOk = false;
  174. numFieldsToIgnore = 0;
  175. }
  176. DataSourceMetaData::DataSourceMetaData(MemoryBuffer & buffer)
  177. {
  178. numVirtualFields = 0;
  179. buffer.read(numFieldsToIgnore);
  180. buffer.read(randomIsOk);
  181. buffer.read(isStoredFixedWidth);
  182. buffer.read(minRecordSize);
  183. buffer.read(keyedSize);
  184. buffer.read(maxRecordSize);
  185. unsigned numFields;
  186. buffer.read(numFields);
  187. for (unsigned idx=0; idx < numFields; idx++)
  188. {
  189. byte flags;
  190. buffer.read(flags);
  191. if (flags == FVFFdataset)
  192. fields.append(*new DataSourceDatasetItem(flags, buffer));
  193. else if (flags == FVFFset)
  194. fields.append(*new DataSourceSetItem(flags, buffer));
  195. else
  196. fields.append(*new DataSourceMetaItem(flags, buffer));
  197. if (flags == FVFFvirtual)
  198. ++numVirtualFields;
  199. }
  200. gatherAttributes();
  201. }
  202. void DataSourceMetaData::addFileposition()
  203. {
  204. addVirtualField("__fileposition__", NULL, makeIntType(8, false));
  205. }
  206. void DataSourceMetaData::addSimpleField(const char * name, const char * xpath, ITypeInfo * type)
  207. {
  208. ITypeInfo * promoted = type->queryPromotedType();
  209. unsigned size = promoted->getSize();
  210. unsigned thisBits = 0;
  211. if (size == UNKNOWN_LENGTH)
  212. {
  213. isStoredFixedWidth = false;
  214. switch (type->getTypeCode())
  215. {
  216. case type_set:
  217. minRecordSize += sizeof(bool) + sizeof(size32_t);
  218. break;
  219. case type_varstring:
  220. minRecordSize += 1;
  221. break;
  222. case type_packedint:
  223. minRecordSize += 1;
  224. break;
  225. case type_varunicode:
  226. minRecordSize += sizeof(UChar);
  227. break;
  228. default:
  229. minRecordSize += sizeof(size32_t);
  230. break;
  231. }
  232. }
  233. else if (type->getTypeCode() == type_bitfield)
  234. {
  235. thisBits = type->getBitSize();
  236. if (thisBits > bitsRemaining)
  237. {
  238. size = type->queryChildType()->getSize();
  239. minRecordSize += size;
  240. bitsRemaining = size * 8;
  241. }
  242. bitsRemaining -= thisBits;
  243. }
  244. else
  245. minRecordSize += size;
  246. if (thisBits == 0)
  247. bitsRemaining = 0;
  248. fields.append(*new DataSourceMetaItem(FVFFnone, name, xpath, type));
  249. }
  250. void DataSourceMetaData::addVirtualField(const char * name, const char * xpath, ITypeInfo * type)
  251. {
  252. fields.append(*new DataSourceMetaItem(FVFFvirtual, name, xpath, type));
  253. ++numVirtualFields;
  254. }
  255. void DataSourceMetaData::extractKeyedInfo(UnsignedArray & offsets, TypeInfoArray & types)
  256. {
  257. unsigned curOffset = 0;
  258. ForEachItemIn(i, fields)
  259. {
  260. if (curOffset >= keyedSize)
  261. break;
  262. DataSourceMetaItem & cur = fields.item(i);
  263. switch (cur.flags)
  264. {
  265. case FVFFnone:
  266. {
  267. offsets.append(curOffset);
  268. types.append(*LINK(cur.type));
  269. unsigned size = cur.type->getSize();
  270. assertex(size != UNKNOWN_LENGTH);
  271. curOffset += size;
  272. break;
  273. }
  274. case FVFFbeginrecord:
  275. case FVFFendrecord:
  276. break;
  277. default:
  278. throwUnexpected();
  279. }
  280. }
  281. offsets.append(curOffset);
  282. assertex(curOffset == keyedSize);
  283. }
  284. //MORE: Really this should create no_selects for the sub records, but pass on that for the moment.
  285. void DataSourceMetaData::gatherFields(IHqlExpression * expr, bool isConditional)
  286. {
  287. switch (expr->getOperator())
  288. {
  289. case no_record:
  290. gatherChildFields(expr, isConditional);
  291. break;
  292. case no_ifblock:
  293. {
  294. OwnedITypeInfo boolType = makeBoolType();
  295. OwnedITypeInfo voidType = makeVoidType();
  296. isStoredFixedWidth = false;
  297. fields.append(*new DataSourceMetaItem(FVFFbeginif, NULL, NULL, boolType));
  298. gatherChildFields(expr->queryChild(1), true);
  299. fields.append(*new DataSourceMetaItem(FVFFendif, NULL, NULL, voidType));
  300. break;
  301. }
  302. case no_field:
  303. {
  304. if (expr->hasAttribute(__ifblockAtom))
  305. break;
  306. Linked<ITypeInfo> type = expr->queryType();
  307. IAtom * name = expr->queryName();
  308. IHqlExpression * nameAttr = expr->queryAttribute(namedAtom);
  309. StringBuffer outname;
  310. if (nameAttr && nameAttr->queryChild(0)->queryValue())
  311. nameAttr->queryChild(0)->queryValue()->getStringValue(outname);
  312. else
  313. outname.append(name).toLowerCase();
  314. StringBuffer xpathtext;
  315. const char * xpath = NULL;
  316. IHqlExpression * xpathAttr = expr->queryAttribute(xpathAtom);
  317. if (xpathAttr && xpathAttr->queryChild(0)->queryValue())
  318. xpath = xpathAttr->queryChild(0)->queryValue()->getStringValue(xpathtext);
  319. if (isKey() && expr->hasAttribute(blobAtom))
  320. type.setown(makeIntType(8, false));
  321. type_t tc = type->getTypeCode();
  322. if (tc == type_row)
  323. {
  324. OwnedITypeInfo voidType = makeVoidType();
  325. fields.append(*new DataSourceMetaItem(FVFFbeginrecord, outname, xpath, voidType));
  326. gatherChildFields(expr->queryRecord(), isConditional);
  327. fields.append(*new DataSourceMetaItem(FVFFendrecord, outname, xpath, voidType));
  328. }
  329. else if ((tc == type_dictionary) || (tc == type_table) || (tc == type_groupedtable))
  330. {
  331. isStoredFixedWidth = false;
  332. fields.append(*new DataSourceDatasetItem(outname, xpath, expr));
  333. }
  334. else if (tc == type_set)
  335. {
  336. isStoredFixedWidth = false;
  337. fields.append(*new DataSourceSetItem(outname, xpath, type));
  338. }
  339. else
  340. {
  341. if (type->getTypeCode() == type_alien)
  342. {
  343. IHqlAlienTypeInfo * alien = queryAlienType(type);
  344. type.set(alien->queryPhysicalType());
  345. }
  346. addSimpleField(outname, xpath, type);
  347. }
  348. break;
  349. }
  350. }
  351. }
  352. void DataSourceMetaData::gatherChildFields(IHqlExpression * expr, bool isConditional)
  353. {
  354. bitsRemaining = 0;
  355. ForEachChild(idx, expr)
  356. gatherFields(expr->queryChild(idx), isConditional);
  357. }
  358. unsigned DataSourceMetaData::numKeyedColumns() const
  359. {
  360. unsigned count = 0;
  361. unsigned curOffset = 0;
  362. ForEachItemIn(i, fields)
  363. {
  364. if (curOffset >= keyedSize)
  365. break;
  366. DataSourceMetaItem & cur = fields.item(i);
  367. switch (cur.flags)
  368. {
  369. case FVFFnone:
  370. {
  371. unsigned size = cur.type->getSize();
  372. assertex(size != UNKNOWN_LENGTH);
  373. curOffset += size;
  374. count++;
  375. break;
  376. }
  377. default:
  378. throwUnexpected();
  379. }
  380. }
  381. return count;
  382. assertex(curOffset == keyedSize);
  383. }
  384. unsigned DataSourceMetaData::numColumns() const
  385. {
  386. return fields.ordinality() - numFieldsToIgnore;
  387. }
  388. ITypeInfo * DataSourceMetaData::queryType(unsigned column) const
  389. {
  390. return fields.item(column).type;
  391. }
  392. unsigned DataSourceMetaData::queryFieldFlags(unsigned column) const
  393. {
  394. return fields.item(column).flags;
  395. }
  396. const char * DataSourceMetaData::queryName(unsigned column) const
  397. {
  398. return fields.item(column).name;
  399. }
  400. const char * DataSourceMetaData::queryXPath(unsigned column) const
  401. {
  402. return fields.item(column).xpath;
  403. }
  404. const char * DataSourceMetaData::queryXmlTag(unsigned column) const
  405. {
  406. DataSourceMetaItem &item = fields.item(column);
  407. return (item.tagname.get()) ? item.tagname.get() : item.name.get();
  408. }
  409. const char *DataSourceMetaData::queryXmlTag() const
  410. {
  411. return (tagname.get()) ? tagname.get() : "Row";
  412. }
  413. void DataSourceMetaData::gatherNestedAttributes(DataSourceMetaItem &rec, aindex_t &idx)
  414. {
  415. aindex_t numItems = fields.ordinality();
  416. while (++idx < numItems)
  417. {
  418. DataSourceMetaItem &item = fields.item(idx);
  419. if (item.flags==FVFFendrecord)
  420. return;
  421. else if (item.flags==FVFFbeginrecord)
  422. gatherNestedAttributes((!item.tagname.get() || *item.tagname.get()) ? item : rec, idx);
  423. else if (item.isXmlAttribute())
  424. rec.nestedAttributes.append(idx);
  425. }
  426. }
  427. void DataSourceMetaData::gatherAttributes()
  428. {
  429. aindex_t numItems = fields.ordinality();
  430. for (aindex_t idx = 0; idx < numItems; ++idx)
  431. {
  432. DataSourceMetaItem &item = fields.item(idx);
  433. if (item.flags==FVFFbeginrecord)
  434. {
  435. if (!item.tagname.get() || *item.tagname.get())
  436. gatherNestedAttributes(item, idx);
  437. }
  438. else if (item.isXmlAttribute())
  439. attributes.append(idx);
  440. }
  441. }
  442. const IntArray &DataSourceMetaData::queryAttrList(unsigned column) const
  443. {
  444. return fields.item(column).nestedAttributes;
  445. }
  446. const IntArray &DataSourceMetaData::queryAttrList() const
  447. {
  448. return attributes;
  449. }
  450. IFvDataSourceMetaData * DataSourceMetaData::queryChildMeta(unsigned column) const
  451. {
  452. return fields.item(column).queryChildMeta();
  453. }
  454. IFvDataSource * DataSourceMetaData::createChildDataSource(unsigned column, unsigned len, const void * data)
  455. {
  456. DataSourceMetaData * childMeta = fields.item(column).queryChildMeta();
  457. if (childMeta)
  458. return new NestedDataSource(*childMeta, len, data);
  459. return NULL;
  460. }
  461. bool DataSourceMetaData::supportsRandomSeek() const
  462. {
  463. return randomIsOk;
  464. }
  465. void DataSourceMetaData::serialize(MemoryBuffer & buffer) const
  466. {
  467. //NB: Update NullDataSourceMeta if this changes....
  468. buffer.append(numFieldsToIgnore);
  469. buffer.append(randomIsOk);
  470. buffer.append(isStoredFixedWidth);
  471. buffer.append(minRecordSize);
  472. buffer.append(keyedSize);
  473. buffer.append(maxRecordSize);
  474. unsigned numFields = fields.ordinality();
  475. buffer.append(numFields);
  476. for (unsigned idx=0; idx < numFields; idx++)
  477. {
  478. fields.item(idx).serialize(buffer);
  479. }
  480. }
  481. size32_t DataSourceMetaData::getRecordSize(const void *rec)
  482. {
  483. if (isStoredFixedWidth)
  484. return minRecordSize;
  485. if (!rec)
  486. return maxRecordSize;
  487. const byte * data = (const byte *)rec;
  488. unsigned curOffset = 0;
  489. unsigned bitsRemaining = 0;
  490. unsigned max = fields.ordinality() - numVirtualFields;
  491. for (unsigned idx=0; idx < max; idx++)
  492. {
  493. ITypeInfo & type = *fields.item(idx).type;
  494. unsigned size = type.getSize();
  495. if (size == UNKNOWN_LENGTH)
  496. {
  497. const byte * cur = data + curOffset;
  498. switch (type.getTypeCode())
  499. {
  500. case type_data:
  501. case type_string:
  502. case type_table:
  503. case type_groupedtable:
  504. size = *((unsigned *)cur) + sizeof(unsigned);
  505. break;
  506. case type_set:
  507. size = *((unsigned *)(cur + sizeof(bool))) + sizeof(unsigned) + sizeof(bool);
  508. break;
  509. case type_qstring:
  510. size = rtlQStrSize(*((unsigned *)cur)) + sizeof(unsigned);
  511. break;
  512. case type_unicode:
  513. size = *((unsigned *)cur)*2 + sizeof(unsigned);
  514. break;
  515. case type_utf8:
  516. size = sizeof(unsigned) + rtlUtf8Size(*(unsigned *)cur, cur+sizeof(unsigned));
  517. break;
  518. case type_varstring:
  519. size = strlen((char *)cur)+1;
  520. break;
  521. case type_varunicode:
  522. size = (rtlUnicodeStrlen((UChar *)cur)+1)*2;
  523. break;
  524. case type_packedint:
  525. size = rtlGetPackedSize(cur);
  526. break;
  527. default:
  528. UNIMPLEMENTED;
  529. }
  530. }
  531. if (type.getTypeCode() == type_bitfield)
  532. {
  533. unsigned thisBits = type.getBitSize();
  534. if (thisBits > bitsRemaining)
  535. {
  536. size = type.queryChildType()->getSize();
  537. bitsRemaining = size * 8;
  538. }
  539. else
  540. size = 0;
  541. bitsRemaining -= thisBits;
  542. }
  543. else
  544. bitsRemaining = 0;
  545. curOffset += size;
  546. }
  547. return curOffset;
  548. }
  549. size32_t DataSourceMetaData::getFixedSize() const
  550. {
  551. if (isStoredFixedWidth)
  552. return minRecordSize;
  553. return 0;
  554. }
  555. size32_t DataSourceMetaData::getMinRecordSize() const
  556. {
  557. return minRecordSize;
  558. }
  559. IFvDataSourceMetaData * deserializeDataSourceMeta(MemoryBuffer & in)
  560. {
  561. return new DataSourceMetaData(in);
  562. }
  563. //---------------------------------------------------------------------------
  564. RowBlock::RowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset)
  565. {
  566. buffer.swapWith(_buffer);
  567. start = _start;
  568. startOffset = _startOffset;
  569. numRows = 0;
  570. }
  571. RowBlock::RowBlock(__int64 _start, __int64 _startOffset)
  572. {
  573. start = _start;
  574. startOffset = _startOffset;
  575. numRows = 0;
  576. }
  577. void RowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  578. {
  579. row = getNextRow();
  580. offset = startOffset + buffer.length();
  581. }
  582. FixedRowBlock::FixedRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, size32_t _fixedRecordSize) : RowBlock(_buffer, _start, _startOffset)
  583. {
  584. if (_fixedRecordSize == 0) _fixedRecordSize = 1;
  585. fixedRecordSize = _fixedRecordSize;
  586. numRows = buffer.length() / fixedRecordSize;
  587. }
  588. const void * FixedRowBlock::fetchRow(__int64 offset, size32_t & len)
  589. {
  590. __int64 maxOffset = startOffset + buffer.length();
  591. if (offset < startOffset || offset >= maxOffset)
  592. return NULL;
  593. len = fixedRecordSize;
  594. return buffer.toByteArray() + (offset - startOffset);
  595. }
  596. const void * FixedRowBlock::getRow(__int64 row, size32_t & len, unsigned __int64 & rowOffset)
  597. {
  598. if (row < start || row >= start + numRows)
  599. return NULL;
  600. unsigned index = (unsigned)(row - start);
  601. unsigned blockOffset = index * fixedRecordSize;
  602. len = fixedRecordSize;
  603. rowOffset = startOffset + blockOffset;
  604. return buffer.toByteArray() + blockOffset;
  605. }
  606. VariableRowBlock::VariableRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, IRecordSizeEx * recordSize, bool isLast) : RowBlock(_buffer, _start, _startOffset)
  607. {
  608. const char * buff = buffer.toByteArray();
  609. unsigned cur = 0;
  610. unsigned max = buffer.length();
  611. unsigned maxCur = max;
  612. if (!isLast)
  613. maxCur -= recordSize->getRecordSize(NULL);
  614. while (cur < maxCur)
  615. {
  616. rowIndex.append(cur);
  617. cur += recordSize->getRecordSize(max-cur, buff + cur);
  618. if (cur > max)
  619. throwError(FVERR_RowTooLarge);
  620. }
  621. buffer.setLength(cur);
  622. rowIndex.append(cur);
  623. numRows = rowIndex.ordinality()-1;
  624. }
  625. VariableRowBlock::VariableRowBlock(MemoryBuffer & inBuffer, __int64 _start) : RowBlock(_start, 0)
  626. {
  627. inBuffer.read(numRows);
  628. for (unsigned row = 0; row < numRows; row++)
  629. {
  630. unsigned thisLength;
  631. rowIndex.append(buffer.length());
  632. inBuffer.read(thisLength);
  633. buffer.append(thisLength, inBuffer.readDirect(thisLength));
  634. }
  635. rowIndex.append(buffer.length());
  636. }
  637. const void * VariableRowBlock::fetchRow(__int64 offset, size32_t & len)
  638. {
  639. __int64 maxOffset = startOffset + buffer.length();
  640. if (offset < startOffset || offset >= maxOffset)
  641. return NULL;
  642. size32_t rowOffset = (size32_t)(offset - startOffset);
  643. unsigned pos = rowIndex.find(rowOffset);
  644. if (pos == NotFound)
  645. return NULL;
  646. len = rowIndex.item(pos+1)-rowOffset;
  647. return buffer.toByteArray() + rowOffset;
  648. }
  649. const void * VariableRowBlock::getRow(__int64 row, size32_t & len, unsigned __int64 & rowOffset)
  650. {
  651. if (row < start || row >= start + numRows)
  652. return NULL;
  653. unsigned index = (unsigned)(row - start);
  654. unsigned blockOffset = rowIndex.item(index);
  655. len = rowIndex.item(index+1) - blockOffset;
  656. rowOffset = startOffset + blockOffset;
  657. return buffer.toByteArray() + blockOffset;
  658. }
  659. //---------------------------------------------------------------------------
  660. offset_t calculateNextOffset(const char * data, unsigned len)
  661. {
  662. offset_t offset = *(offset_t *)(data + len - sizeof(offset_t) - sizeof(unsigned short));
  663. return offset + *(unsigned short*)(data + len - sizeof(unsigned short));
  664. }
  665. void FilePosFixedRowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  666. {
  667. row = start + numRows;
  668. offset = calculateNextOffset(buffer.toByteArray(), buffer.length());
  669. }
  670. void FilePosVariableRowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  671. {
  672. row = start + numRows;
  673. offset = calculateNextOffset(buffer.toByteArray(), buffer.length());
  674. }
  675. //---------------------------------------------------------------------------
  676. static int rowCacheId;
  677. void RowCache::addRowsOwn(RowBlock * rows)
  678. {
  679. if (allRows.ordinality() == MaxBlocksCached)
  680. makeRoom();
  681. unsigned newPos = getInsertPosition(rows->getStartRow());
  682. allRows.add(*rows, newPos);
  683. ages.add(++rowCacheId, newPos);
  684. }
  685. bool RowCache::getCacheRow(__int64 row, RowLocation & location)
  686. {
  687. unsigned numRows = allRows.ordinality();
  688. if (numRows == 0)
  689. return false;
  690. const RowBlock & first = allRows.item(0);
  691. if (row < first.getStartRow())
  692. {
  693. location.bestRow = 0;
  694. location.bestOffset = 0;
  695. return false;
  696. }
  697. unsigned best = getBestRow(row);
  698. ages.replace(++rowCacheId, best);
  699. RowBlock & cur = allRows.item(best);
  700. location.matchRow = cur.getRow(row, location.matchLength, location.bestOffset);
  701. if (location.matchRow)
  702. return true;
  703. if (location.bestRow < cur.getNextRow())
  704. {
  705. cur.getNextStoredOffset(location.bestRow, location.bestOffset);
  706. }
  707. return false;
  708. }
  709. //Find the rowBlock that contains the expected row.
  710. //Return the *previous* RowBlock if no match is found.
  711. unsigned RowCache::getBestRow(__int64 row)
  712. {
  713. unsigned max = allRows.ordinality();
  714. int start = 0;
  715. int end = max;
  716. while (end - start > 1)
  717. {
  718. int mid = (start + end) >> 1;
  719. RowBlock & cur = allRows.item(mid);
  720. if (row >= cur.getStartRow())
  721. start = mid;
  722. else
  723. end = mid;
  724. }
  725. assertex(row >= allRows.item(start).getStartRow());
  726. assertex(start != max);
  727. return start;
  728. }
  729. unsigned RowCache::getInsertPosition(__int64 row)
  730. {
  731. unsigned max = allRows.ordinality();
  732. int start = 0;
  733. int end = max;
  734. while (end - start > 1)
  735. {
  736. int mid = (start + end) >> 1;
  737. RowBlock & cur = allRows.item(mid);
  738. if (row >= cur.getStartRow())
  739. start = mid;
  740. else
  741. end = mid;
  742. }
  743. if (start != max)
  744. {
  745. if (row >= allRows.item(start).getStartRow())
  746. start++;
  747. }
  748. assertex(start == max || (row < allRows.item(start).getStartRow()));
  749. return start;
  750. }
  751. void RowCache::makeRoom()
  752. {
  753. #if 0
  754. //For testing the caching by throwing away random blocks
  755. while (allRows.ordinality() > MinBlocksCached)
  756. {
  757. unsigned index = getRandom() % allRows.ordinality();
  758. allRows.remove(index);
  759. ages.remove(index);
  760. }
  761. return;
  762. #endif
  763. unsigned numToFree = allRows.ordinality() - MinBlocksCached;
  764. RowBlock * * oldestRow = new RowBlock * [numToFree];
  765. __int64 * oldestAge = new __int64[numToFree];
  766. unsigned numGot = 0;
  767. ForEachItemIn(idx, ages)
  768. {
  769. __int64 curAge = ages.item(idx);
  770. unsigned compare = numGot;
  771. while (compare != 0)
  772. {
  773. if (curAge >= oldestAge[compare-1])
  774. break;
  775. compare--;
  776. }
  777. if (compare < numToFree)
  778. {
  779. unsigned copySize = numGot - compare;
  780. if (numGot == numToFree)
  781. copySize--; //NB: Cannot go negative because compare < numToFree, numToFree == numGot => compare < numGot
  782. if (copySize)
  783. {
  784. memmove(oldestAge + compare + 1, oldestAge + compare, copySize * sizeof(*oldestAge));
  785. memmove(oldestRow + compare + 1, oldestRow + compare, copySize * sizeof(*oldestRow));
  786. }
  787. oldestAge[compare] = curAge;
  788. oldestRow[compare] = &allRows.item(idx);
  789. if (numGot != numToFree)
  790. numGot++;
  791. }
  792. }
  793. unsigned max = allRows.ordinality();
  794. unsigned i;
  795. for (i = 0; i < max; )
  796. {
  797. for (unsigned j = 0; j < numGot; j++)
  798. {
  799. if (oldestRow[j] == &allRows.item(i))
  800. {
  801. allRows.remove(i);
  802. ages.remove(i);
  803. max--;
  804. goto testNext;
  805. }
  806. }
  807. i++;
  808. testNext: ;
  809. }
  810. delete [] oldestRow;
  811. delete [] oldestAge;
  812. }
  813. //---------------------------------------------------------------------------
  814. FVDataSource::FVDataSource()
  815. {
  816. transformer = NULL;
  817. extraFieldsSize = 0;
  818. openCount = 0;
  819. appendFileposition = false;
  820. }
  821. FVDataSource::~FVDataSource()
  822. {
  823. //ensure all refs into the dll are cleared before it is unloaded
  824. returnedRecordSize.clear();
  825. }
  826. void FVDataSource::addFileposition()
  827. {
  828. appendFileposition = true;
  829. transformedMeta->addFileposition(); // This may modify the other metas as well!
  830. }
  831. void FVDataSource::copyRow(MemoryBuffer & out, const void * src, size32_t length)
  832. {
  833. if (transformer)
  834. {
  835. unsigned curLen = out.length();
  836. size32_t maxSize = transformedMeta->getMaxRecordSize();
  837. void * target = out.reserve(maxSize);
  838. //MORE: won't cope with dynamic maxlengths
  839. RtlStaticRowBuilder rowBuilder(target, maxSize);
  840. unsigned copied = transformer(rowBuilder, (const byte *)src);
  841. out.rewrite(curLen + copied);
  842. }
  843. else
  844. out.append(length, src);
  845. }
  846. bool FVDataSource::fetchRow(MemoryBuffer & out, __int64 offset)
  847. {
  848. if (!transformer)
  849. return fetchRawRow(out, offset);
  850. MemoryBuffer temp;
  851. if (fetchRowData(temp, offset))
  852. {
  853. copyRow(out, temp.toByteArray(), temp.length());
  854. if (appendFileposition)
  855. out.append(offset);
  856. return true;
  857. }
  858. return false;
  859. }
  860. bool FVDataSource::fetchRawRow(MemoryBuffer & out, __int64 offset)
  861. {
  862. if (fetchRowData(out, offset))
  863. {
  864. if (appendFileposition)
  865. out.append(offset);
  866. return true;
  867. }
  868. return false;
  869. }
  870. bool FVDataSource::getRow(MemoryBuffer & out, __int64 row)
  871. {
  872. size32_t length;
  873. const void * data;
  874. unsigned __int64 offset = 0;
  875. if (getRowData(row, length, data, offset))
  876. {
  877. copyRow(out, data, length);
  878. if (appendFileposition)
  879. out.append(offset);
  880. return true;
  881. }
  882. return false;
  883. }
  884. bool FVDataSource::getRawRow(MemoryBuffer & out, __int64 row)
  885. {
  886. size32_t length;
  887. const void * data;
  888. unsigned __int64 offset = 0;
  889. if (getRowData(row, length, data, offset))
  890. {
  891. out.append(length-extraFieldsSize, data);
  892. if (appendFileposition)
  893. out.append(offset);
  894. return true;
  895. }
  896. return false;
  897. }
  898. void FVDataSource::loadDll(const char * wuid)
  899. {
  900. //MORE: This code should be commoned up and available in the work unit code or something...
  901. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  902. Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid, false);
  903. //Plugins should already be loaded when they were registered with the ViewTransformerRegistry
  904. //Something like the following code could be used to check the plugin version...
  905. #if 0
  906. Owned<IConstWUPluginIterator> plugins = &wu->getPlugins();
  907. SafePluginMap * map = NULL;
  908. ForEach(*plugins)
  909. {
  910. IConstWUPlugin &thisplugin = plugins->query();
  911. SCMStringBuffer name, version;
  912. thisplugin.getPluginName(name);
  913. thisplugin.getPluginVersion(version);
  914. Owned<ILoadedDllEntry> loadedPlugin = map->getPluginDll(name.str(), version.str(), true);
  915. if (!loadedPlugin)
  916. break;
  917. }
  918. #endif
  919. Owned<IConstWUQuery> q = wu->getQuery();
  920. SCMStringBuffer dllname;
  921. q->getQueryDllName(dllname);
  922. // MORE - leaks....
  923. loadedDll.setown(queryDllServer().loadDll(dllname.str(), DllLocationAnywhere));
  924. }
  925. IFvDataSourceMetaData * FVDataSource::queryMetaData()
  926. {
  927. return transformedMeta;
  928. }
  929. bool FVDataSource::setReturnedInfoFromResult()
  930. {
  931. SCMStringBuffer s;
  932. wuResult->getResultEclSchema(s);
  933. returnedRecord.setown(parseQuery(s.str()));
  934. if (!returnedRecord)
  935. throw MakeStringException(ERR_FILEVIEW_FIRST+4, "Could not process result schema [%s]", s.str());
  936. bool isKey = false;
  937. bool isGrouped = false; // this isn't strictly true...it could be true for an internal result, but no current flag to test
  938. returnedMeta.setown(new DataSourceMetaData(returnedRecord, 0, true, isGrouped, 0));
  939. transformedRecord.setown(getFileViewerRecord(returnedRecord, isKey));
  940. if (!transformedRecord)
  941. {
  942. //No transformations needed, so don't need to loaded the dll containing the transform functions etc.
  943. transformedRecord.set(returnedRecord);
  944. returnedRecordSize.set(returnedMeta);
  945. }
  946. else
  947. {
  948. loadDll(wuid);
  949. s.clear();
  950. wuResult->getResultRecordSizeEntry(s);
  951. if (s.length())
  952. {
  953. typedef IRecordSize * (* recSizeFunc)();
  954. recSizeFunc func = (recSizeFunc)loadedDll->getEntry(s.str());
  955. Owned<IRecordSize> createdRecordSize = func();
  956. returnedRecordSize.setown(new RecordSizeToEx(createdRecordSize));
  957. }
  958. s.clear();
  959. wuResult->getResultTransformerEntry(s);
  960. if (s.length())
  961. transformer = (rowTransformFunction)loadedDll->getEntry(s.str());
  962. }
  963. transformedMeta.setown(new DataSourceMetaData(transformedRecord, 0, true, isGrouped, 0));
  964. return (returnedRecordSize != NULL);
  965. }
  966. //---------------------------------------------------------------------------
  967. __int64 PagedDataSource::numRows(bool force)
  968. {
  969. if (force && (totalRows == UNKNOWN_NUM_ROWS))
  970. {
  971. //MORE: Need to go and work it out - however painful...
  972. }
  973. return totalRows;
  974. }
  975. bool PagedDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  976. {
  977. if ((row < 0) || ((unsigned __int64)row > totalRows))
  978. return false;
  979. RowLocation location;
  980. loop
  981. {
  982. if (cache.getCacheRow(row, location))
  983. {
  984. length = location.matchLength;
  985. data = location.matchRow;
  986. offset = location.bestOffset;
  987. return true;
  988. }
  989. improveLocation(row, location);
  990. if (!loadBlock(location.bestRow, location.bestOffset))
  991. return false;
  992. }
  993. }
  994. void PagedDataSource::improveLocation(__int64 row, RowLocation & location)
  995. {
  996. }
  997. //---------------------------------------------------------------------------
  998. NestedDataSource::NestedDataSource(DataSourceMetaData & meta, unsigned len, const void * data)
  999. {
  1000. returnedMeta.set(&meta);
  1001. returnedRecordSize.set(returnedMeta);
  1002. transformedMeta.set(returnedMeta);
  1003. totalSize = len;
  1004. MemoryBuffer temp;
  1005. temp.append(len, data);
  1006. if (returnedMeta->isFixedSize())
  1007. rows.setown(new FixedRowBlock(temp, 0, 0, returnedMeta->fixedSize()));
  1008. else
  1009. rows.setown(new VariableRowBlock(temp, 0, 0, returnedRecordSize, true));
  1010. }
  1011. bool NestedDataSource::init()
  1012. {
  1013. return true;
  1014. }
  1015. __int64 NestedDataSource::numRows(bool force)
  1016. {
  1017. return rows->getNextRow() - rows->getStartRow();
  1018. }
  1019. bool NestedDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  1020. {
  1021. data = rows->getRow(row, length, offset);
  1022. return (data != NULL);
  1023. }
  1024. //---------------------------------------------------------------------------
  1025. NullDataSource::NullDataSource(IHqlExpression * _record, bool _isGrouped, unsigned _keyedSize)
  1026. : meta(_record, 0, true, _isGrouped, _keyedSize)
  1027. {
  1028. }
  1029. FailureDataSource::FailureDataSource(IHqlExpression * _record, IException * _error, bool _isGrouped, unsigned _keyedSize)
  1030. : NullDataSource(_record, _isGrouped, _keyedSize), error(_error)
  1031. {
  1032. }
  1033. //---------------------------------------------------------------------------
  1034. IHqlExpression * parseQuery(const char * text)
  1035. {
  1036. MultiErrorReceiver errs;
  1037. OwnedHqlExpr ret = ::parseQuery(text, &errs);
  1038. if (errs.errCount() == 0)
  1039. return ret.getClear();
  1040. for (unsigned i=0; i < errs.errCount(); i++)
  1041. {
  1042. StringBuffer msg;
  1043. PrintLog("%d %s", errs.item(i)->getLine(), errs.item(i)->errorMessage(msg).str());
  1044. }
  1045. return NULL;
  1046. }
  1047. IFvDataSourceMetaData * createMetaData(IConstWUResult * wuResult)
  1048. {
  1049. SCMStringBuffer s;
  1050. wuResult->getResultEclSchema(s);
  1051. OwnedHqlExpr record = parseQuery(s.str());
  1052. if (!record)
  1053. throw MakeStringException(ERR_FILEVIEW_FIRST+4, "Could not process result schema [%s]", s.str());
  1054. OwnedHqlExpr simplifiedRecord = getFileViewerRecord(record, false);
  1055. bool isGrouped = false; // more not sure this is strictly true...
  1056. if (!simplifiedRecord)
  1057. return new DataSourceMetaData(record, 0, true, isGrouped, 0);
  1058. return new DataSourceMetaData(simplifiedRecord, 0, true, isGrouped, 0);
  1059. }