fvsource.cpp 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jliball.hpp"
  15. #include "eclrtl.hpp"
  16. #include "rtlds_imp.hpp"
  17. #include "fvresultset.ipp"
  18. #include "fileview.hpp"
  19. #include "fvsource.ipp"
  20. #include "hqlerror.hpp"
  21. #include "eclhelper.hpp"
  22. #include "hqlattr.hpp"
  23. #include "hqlutil.hpp"
  24. #include "fvdatasource.hpp"
  25. #define MAX_RECORD_SIZE 4096
  26. inline void appendNextXpathName(StringBuffer &s, const char *&xpath)
  27. {
  28. while (*xpath && !strchr("*[/", *xpath))
  29. s.append(*xpath++);
  30. xpath = strchr(xpath, '/');
  31. }
  32. void splitXmlTagNamesFromXPath(const char *xpath, StringAttr &inner, StringAttr *outer=NULL)
  33. {
  34. if (!xpath || !xpath)
  35. return;
  36. StringBuffer s1;
  37. StringBuffer s2;
  38. appendNextXpathName(s1, xpath);
  39. if (outer && xpath)
  40. appendNextXpathName(s2, ++xpath);
  41. if (xpath) //xpath too deep
  42. return;
  43. if (!s2.length())
  44. inner.set(s1.str());
  45. else
  46. {
  47. inner.set(s2.str());
  48. outer->set(s1.str());
  49. }
  50. if (!inner.get())
  51. inner.set("");
  52. if (outer && !outer->get())
  53. outer->set("");
  54. }
  55. DataSourceMetaItem::DataSourceMetaItem(unsigned _flags, const char * _name, const char * _xpath, ITypeInfo * _type)
  56. {
  57. flags = _flags;
  58. name.set(_name);
  59. type.set(_type);
  60. xpath.set(_xpath);
  61. splitXmlTagNamesFromXPath(_xpath, tagname);
  62. }
  63. DataSourceMetaItem::DataSourceMetaItem(unsigned _flags, MemoryBuffer & in)
  64. {
  65. flags = _flags;
  66. in.read(name);
  67. in.read(xpath);
  68. type.setown(deserializeType(in));
  69. splitXmlTagNamesFromXPath(xpath.get(), tagname);
  70. }
  71. void DataSourceMetaItem::serialize(MemoryBuffer & out) const
  72. {
  73. out.append(flags);
  74. out.append(name);
  75. out.append(xpath);
  76. type->serialize(out);
  77. }
  78. //---------------------------------------------------------------------------
  79. DataSourceDatasetItem::DataSourceDatasetItem(const char * _name, const char * _xpath, IHqlExpression * expr) : DataSourceMetaItem(FVFFdataset, NULL, NULL, NULL), record(expr->queryRecord(), 0, true, false, false)
  80. {
  81. type.setown(makeTableType(NULL, NULL, NULL, NULL));
  82. name.set(_name);
  83. xpath.set(_xpath);
  84. splitXmlTagNamesFromXPath(_xpath, record.tagname, &tagname);
  85. if (!record.tagname.length())
  86. record.tagname.set("Row");
  87. }
  88. DataSourceDatasetItem::DataSourceDatasetItem(unsigned flags, MemoryBuffer & in) : DataSourceMetaItem(FVFFdataset, NULL, NULL, NULL), record(in)
  89. {
  90. type.setown(makeTableType(NULL, NULL, NULL, NULL));
  91. in.read(name);
  92. }
  93. void DataSourceDatasetItem::serialize(MemoryBuffer & out) const
  94. {
  95. out.append(flags);
  96. record.serialize(out);
  97. out.append(name);
  98. }
  99. //---------------------------------------------------------------------------
  100. DataSourceSetItem::DataSourceSetItem(const char * _name, const char * _xpath, ITypeInfo * _type) : DataSourceMetaItem(FVFFset, _name, NULL, _type)
  101. {
  102. createChild();
  103. xpath.set(_xpath);
  104. StringBuffer attr;
  105. splitXmlTagNamesFromXPath(_xpath, record.tagname, &tagname);
  106. if (!record.tagname.length())
  107. record.tagname.set("Item");
  108. }
  109. DataSourceSetItem::DataSourceSetItem(unsigned flags, MemoryBuffer & in) : DataSourceMetaItem(flags, in)
  110. {
  111. createChild();
  112. splitXmlTagNamesFromXPath(xpath.get(), record.tagname, &tagname);
  113. if (!record.tagname.length())
  114. record.tagname.set("Item");
  115. }
  116. void DataSourceSetItem::createChild()
  117. {
  118. ITypeInfo * childType = type->queryChildType()->queryPromotedType();
  119. record.addSimpleField("Item", NULL, childType);
  120. }
  121. void DataSourceSetItem::serialize(MemoryBuffer & out) const
  122. {
  123. out.append(flags);
  124. record.serialize(out);
  125. out.append(name);
  126. }
  127. //---------------------------------------------------------------------------
  128. DataSourceMetaData::DataSourceMetaData(IHqlExpression * _record, byte _numFieldsToIgnore, bool _randomIsOk, bool _isGrouped, unsigned _keyedSize)
  129. {
  130. init();
  131. numFieldsToIgnore = _numFieldsToIgnore;
  132. randomIsOk = _randomIsOk;
  133. isStoredFixedWidth = true;
  134. //MORE: Blobs aren't handled correctly in indexes....
  135. maxRecordSize = ::getMaxRecordSize(_record, MAX_RECORD_SIZE);
  136. keyedSize = _keyedSize;
  137. gatherFields(_record, false);
  138. if (_isGrouped)
  139. {
  140. Owned<ITypeInfo> type = makeBoolType();
  141. addSimpleField("__groupfollows__", NULL, type);
  142. maxRecordSize++;
  143. }
  144. gatherAttributes();
  145. if (isStoredFixedWidth)
  146. assertex(storedFixedSize == maxRecordSize);
  147. }
  148. DataSourceMetaData::DataSourceMetaData()
  149. {
  150. init();
  151. randomIsOk = true;
  152. isStoredFixedWidth = true;
  153. }
  154. DataSourceMetaData::DataSourceMetaData(type_t typeCode)
  155. {
  156. init();
  157. OwnedITypeInfo type;
  158. if (typeCode == type_unicode)
  159. type.setown(makeUnicodeType(UNKNOWN_LENGTH, 0));
  160. else
  161. type.setown(makeStringType(UNKNOWN_LENGTH, NULL, NULL));
  162. fields.append(*new DataSourceMetaItem(FVFFnone, "line", NULL, type));
  163. }
  164. void DataSourceMetaData::init()
  165. {
  166. keyedSize = 0;
  167. storedFixedSize = 0;
  168. maxRecordSize = 0;
  169. bitsRemaining = 0;
  170. numVirtualFields = 0;
  171. isStoredFixedWidth = false;
  172. randomIsOk = false;
  173. numFieldsToIgnore = 0;
  174. }
  175. DataSourceMetaData::DataSourceMetaData(MemoryBuffer & buffer)
  176. {
  177. numVirtualFields = 0;
  178. buffer.read(numFieldsToIgnore);
  179. buffer.read(randomIsOk);
  180. buffer.read(isStoredFixedWidth);
  181. buffer.read(storedFixedSize);
  182. buffer.read(keyedSize);
  183. buffer.read(maxRecordSize);
  184. unsigned numFields;
  185. buffer.read(numFields);
  186. for (unsigned idx=0; idx < numFields; idx++)
  187. {
  188. byte flags;
  189. buffer.read(flags);
  190. if (flags == FVFFdataset)
  191. fields.append(*new DataSourceDatasetItem(flags, buffer));
  192. else if (flags == FVFFset)
  193. fields.append(*new DataSourceSetItem(flags, buffer));
  194. else
  195. fields.append(*new DataSourceMetaItem(flags, buffer));
  196. if (flags == FVFFvirtual)
  197. ++numVirtualFields;
  198. }
  199. gatherAttributes();
  200. }
  201. void DataSourceMetaData::addFileposition()
  202. {
  203. addVirtualField("__fileposition__", NULL, makeIntType(8, false));
  204. }
  // Append a scalar field and update the stored-size bookkeeping.
  //  - A field whose promoted type has unknown size makes rows variable width.
  //  - Consecutive bitfields share a container whose size comes from the bitfield's child
  //    type.  A new container is started (and its size added) only when the current one
  //    has too few bits left.
  //  - Any other fixed-size field adds its size and ends the current bitfield container.
  void DataSourceMetaData::addSimpleField(const char * name, const char * xpath, ITypeInfo * type)
  {
      ITypeInfo * promoted = type->queryPromotedType();
      unsigned size = promoted->getSize();
      unsigned thisBits = 0;
      if (size == UNKNOWN_LENGTH)
          isStoredFixedWidth = false;
      else if (type->getTypeCode() == type_bitfield)
      {
          thisBits = type->getBitSize();
          if (thisBits > bitsRemaining)
          {
              // Current container is full (or none is open): open a new one.
              size = type->queryChildType()->getSize();
              storedFixedSize += size;
              bitsRemaining = size * 8;
          }
          bitsRemaining -= thisBits;
      }
      else
          storedFixedSize += size;
      // A non-bitfield closes any open bitfield container.
      if (thisBits == 0)
          bitsRemaining = 0;
      fields.append(*new DataSourceMetaItem(FVFFnone, name, xpath, type));
  }
  229. void DataSourceMetaData::addVirtualField(const char * name, const char * xpath, ITypeInfo * type)
  230. {
  231. fields.append(*new DataSourceMetaItem(FVFFvirtual, name, xpath, type));
  232. ++numVirtualFields;
  233. }
  234. void DataSourceMetaData::extractKeyedInfo(UnsignedArray & offsets, TypeInfoArray & types)
  235. {
  236. unsigned curOffset = 0;
  237. ForEachItemIn(i, fields)
  238. {
  239. if (curOffset >= keyedSize)
  240. break;
  241. DataSourceMetaItem & cur = fields.item(i);
  242. switch (cur.flags)
  243. {
  244. case FVFFnone:
  245. {
  246. offsets.append(curOffset);
  247. types.append(*LINK(cur.type));
  248. unsigned size = cur.type->getSize();
  249. assertex(size != UNKNOWN_LENGTH);
  250. curOffset += size;
  251. break;
  252. }
  253. case FVFFbeginrecord:
  254. case FVFFendrecord:
  255. break;
  256. default:
  257. throwUnexpected();
  258. }
  259. }
  260. offsets.append(curOffset);
  261. assertex(curOffset == keyedSize);
  262. }
  263. //MORE: Really this should create no_selects for the sub records, but pass on that for the moment.
  // Translate one node of an ECL record definition into entries in `fields`.
  //  - no_record:  recurse into each child.
  //  - no_ifblock: bracket the conditional fields with FVFFbeginif/FVFFendif markers.
  //  - no_field:   nested rows are bracketed by FVFFbeginrecord/FVFFendrecord, child
  //                datasets/dictionaries and sets get their own item classes, and
  //                everything else becomes a simple field.
  // IFBLOCKs, child datasets and sets clear isStoredFixedWidth.  Other node kinds are ignored.
  void DataSourceMetaData::gatherFields(IHqlExpression * expr, bool isConditional)
  {
      switch (expr->getOperator())
      {
      case no_record:
          gatherChildFields(expr, isConditional);
          break;
      case no_ifblock:
          {
              OwnedITypeInfo boolType = makeBoolType();
              OwnedITypeInfo voidType = makeVoidType();
              isStoredFixedWidth = false;
              fields.append(*new DataSourceMetaItem(FVFFbeginif, NULL, NULL, boolType));
              // Child 1 supplies the conditional fields (child 0 is presumably the condition).
              gatherChildFields(expr->queryChild(1), true);
              fields.append(*new DataSourceMetaItem(FVFFendif, NULL, NULL, voidType));
              break;
          }
      case no_field:
          {
              // Fields carrying __ifblockAtom are skipped here.
              // NOTE(review): presumably they are covered by the enclosing no_ifblock.
              if (expr->hasProperty(__ifblockAtom))
                  break;
              Linked<ITypeInfo> type = expr->queryType();
              IAtom * name = expr->queryName();
              // Output name: a constant NAMED() value if present, else the lower-cased field name.
              IHqlExpression * nameAttr = expr->queryProperty(namedAtom);
              StringBuffer outname;
              if (nameAttr && nameAttr->queryChild(0)->queryValue())
                  nameAttr->queryChild(0)->queryValue()->getStringValue(outname);
              else
                  outname.append(name).toLowerCase();
              // xpath stays NULL unless there is an XPATH() attribute with a constant value.
              StringBuffer xpathtext;
              const char * xpath = NULL;
              IHqlExpression * xpathAttr = expr->queryProperty(xpathAtom);
              if (xpathAttr && xpathAttr->queryChild(0)->queryValue())
                  xpath = xpathAttr->queryChild(0)->queryValue()->getStringValue(xpathtext);
              // In an index, a BLOB field is described as an unsigned 8-byte integer.
              if (isKey() && expr->hasProperty(blobAtom))
                  type.setown(makeIntType(8, false));
              type_t tc = type->getTypeCode();
              if (tc == type_row)
              {
                  // Nested row: its fields sit between begin/end markers sharing the name/xpath.
                  OwnedITypeInfo voidType = makeVoidType();
                  fields.append(*new DataSourceMetaItem(FVFFbeginrecord, outname, xpath, voidType));
                  gatherChildFields(expr->queryRecord(), isConditional);
                  fields.append(*new DataSourceMetaItem(FVFFendrecord, outname, xpath, voidType));
              }
              else if ((tc == type_dictionary) || (tc == type_table) || (tc == type_groupedtable))
              {
                  isStoredFixedWidth = false;
                  fields.append(*new DataSourceDatasetItem(outname, xpath, expr));
              }
              else if (tc == type_set)
              {
                  isStoredFixedWidth = false;
                  fields.append(*new DataSourceSetItem(outname, xpath, type));
              }
              else
              {
                  // Alien types are described by their physical (stored) type.
                  if (type->getTypeCode() == type_alien)
                  {
                      IHqlAlienTypeInfo * alien = queryAlienType(type);
                      type.set(alien->queryPhysicalType());
                  }
                  addSimpleField(outname, xpath, type);
              }
              break;
          }
      }
  }
  331. void DataSourceMetaData::gatherChildFields(IHqlExpression * expr, bool isConditional)
  332. {
  333. bitsRemaining = 0;
  334. ForEachChild(idx, expr)
  335. gatherFields(expr->queryChild(idx), isConditional);
  336. }
  337. unsigned DataSourceMetaData::numKeyedColumns() const
  338. {
  339. unsigned count = 0;
  340. unsigned curOffset = 0;
  341. ForEachItemIn(i, fields)
  342. {
  343. if (curOffset >= keyedSize)
  344. break;
  345. DataSourceMetaItem & cur = fields.item(i);
  346. switch (cur.flags)
  347. {
  348. case FVFFnone:
  349. {
  350. unsigned size = cur.type->getSize();
  351. assertex(size != UNKNOWN_LENGTH);
  352. curOffset += size;
  353. count++;
  354. break;
  355. }
  356. default:
  357. throwUnexpected();
  358. }
  359. }
  360. return count;
  361. assertex(curOffset == keyedSize);
  362. }
  363. unsigned DataSourceMetaData::numColumns() const
  364. {
  365. return fields.ordinality() - numFieldsToIgnore;
  366. }
  367. ITypeInfo * DataSourceMetaData::queryType(unsigned column) const
  368. {
  369. return fields.item(column).type;
  370. }
  371. unsigned DataSourceMetaData::queryFieldFlags(unsigned column) const
  372. {
  373. return fields.item(column).flags;
  374. }
  375. const char * DataSourceMetaData::queryName(unsigned column) const
  376. {
  377. return fields.item(column).name;
  378. }
  379. const char * DataSourceMetaData::queryXPath(unsigned column) const
  380. {
  381. return fields.item(column).xpath;
  382. }
  383. const char * DataSourceMetaData::queryXmlTag(unsigned column) const
  384. {
  385. DataSourceMetaItem &item = fields.item(column);
  386. return (item.tagname.get()) ? item.tagname.get() : item.name.get();
  387. }
  388. const char *DataSourceMetaData::queryXmlTag() const
  389. {
  390. return (tagname.get()) ? tagname.get() : "Row";
  391. }
  392. void DataSourceMetaData::gatherNestedAttributes(DataSourceMetaItem &rec, aindex_t &idx)
  393. {
  394. aindex_t numItems = fields.ordinality();
  395. while (++idx < numItems)
  396. {
  397. DataSourceMetaItem &item = fields.item(idx);
  398. if (item.flags==FVFFendrecord)
  399. return;
  400. else if (item.flags==FVFFbeginrecord)
  401. gatherNestedAttributes((!item.tagname.get() || *item.tagname.get()) ? item : rec, idx);
  402. else if (item.isXmlAttribute())
  403. rec.nestedAttributes.append(idx);
  404. }
  405. }
  406. void DataSourceMetaData::gatherAttributes()
  407. {
  408. aindex_t numItems = fields.ordinality();
  409. for (aindex_t idx = 0; idx < numItems; ++idx)
  410. {
  411. DataSourceMetaItem &item = fields.item(idx);
  412. if (item.flags==FVFFbeginrecord)
  413. {
  414. if (!item.tagname.get() || *item.tagname.get())
  415. gatherNestedAttributes(item, idx);
  416. }
  417. else if (item.isXmlAttribute())
  418. attributes.append(idx);
  419. }
  420. }
  421. const IntArray &DataSourceMetaData::queryAttrList(unsigned column) const
  422. {
  423. return fields.item(column).nestedAttributes;
  424. }
  425. const IntArray &DataSourceMetaData::queryAttrList() const
  426. {
  427. return attributes;
  428. }
  429. IFvDataSourceMetaData * DataSourceMetaData::queryChildMeta(unsigned column) const
  430. {
  431. return fields.item(column).queryChildMeta();
  432. }
  433. IFvDataSource * DataSourceMetaData::createChildDataSource(unsigned column, unsigned len, const void * data)
  434. {
  435. DataSourceMetaData * childMeta = fields.item(column).queryChildMeta();
  436. if (childMeta)
  437. return new NestedDataSource(*childMeta, len, data);
  438. return NULL;
  439. }
  440. bool DataSourceMetaData::supportsRandomSeek() const
  441. {
  442. return randomIsOk;
  443. }
  444. void DataSourceMetaData::serialize(MemoryBuffer & buffer) const
  445. {
  446. //NB: Update NullDataSourceMeta if this changes....
  447. buffer.append(numFieldsToIgnore);
  448. buffer.append(randomIsOk);
  449. buffer.append(isStoredFixedWidth);
  450. buffer.append(storedFixedSize);
  451. buffer.append(keyedSize);
  452. buffer.append(maxRecordSize);
  453. unsigned numFields = fields.ordinality();
  454. buffer.append(numFields);
  455. for (unsigned idx=0; idx < numFields; idx++)
  456. {
  457. fields.item(idx).serialize(buffer);
  458. }
  459. }
  // Size in bytes of the stored row at `rec`.
  //  - Fixed-width rows: storedFixedSize.
  //  - rec == NULL: maxRecordSize.
  //  - Otherwise: walk the non-virtual fields (the last numVirtualFields entries are
  //    assumed to be the virtual ones) and decode each variable-size field's length from
  //    the row data.  Bitfields share containers as in addSimpleField().
  // NOTE(review): every non-virtual entry is walked, including structural markers
  // (begin/end record, begin/end if), and any fixed size their types report is added.
  // Confirm this matches the stored layout.
  // NOTE(review): the length prefixes are read through cast pointers at arbitrary byte
  // offsets (misaligned access).
  size32_t DataSourceMetaData::getRecordSize(const void *rec)
  {
      if (isStoredFixedWidth)
          return storedFixedSize;
      if (!rec)
          return maxRecordSize;
      const byte * data = (const byte *)rec;
      unsigned curOffset = 0;
      unsigned bitsRemaining = 0; // local: shadows the member used while building metadata
      unsigned max = fields.ordinality() - numVirtualFields;
      for (unsigned idx=0; idx < max; idx++)
      {
          ITypeInfo & type = *fields.item(idx).type;
          unsigned size = type.getSize();
          if (size == UNKNOWN_LENGTH)
          {
              const byte * cur = data + curOffset;
              switch (type.getTypeCode())
              {
              case type_data:
              case type_string:
              case type_table:
              case type_groupedtable:
                  // unsigned byte-length prefix followed by the data
                  size = *((unsigned *)cur) + sizeof(unsigned);
                  break;
              case type_set:
                  // bool, then an unsigned byte-length prefix, then the elements
                  size = *((unsigned *)(cur + sizeof(bool))) + sizeof(unsigned) + sizeof(bool);
                  break;
              case type_qstring:
                  // unsigned character-count prefix; rtlQStrSize gives the packed byte size
                  size = rtlQStrSize(*((unsigned *)cur)) + sizeof(unsigned);
                  break;
              case type_unicode:
                  // unsigned character-count prefix, 2 bytes per character
                  size = *((unsigned *)cur)*2 + sizeof(unsigned);
                  break;
              case type_utf8:
                  // unsigned character-count prefix; rtlUtf8Size measures the encoded bytes
                  size = sizeof(unsigned) + rtlUtf8Size(*(unsigned *)cur, cur+sizeof(unsigned));
                  break;
              case type_varstring:
                  // NUL-terminated
                  size = strlen((char *)cur)+1;
                  break;
              case type_varunicode:
                  // 2-byte-character string with a 2-byte terminator
                  size = (rtlUnicodeStrlen((UChar *)cur)+1)*2;
                  break;
              case type_packedint:
                  size = rtlGetPackedSize(cur);
                  break;
              default:
                  UNIMPLEMENTED;
              }
          }
          if (type.getTypeCode() == type_bitfield)
          {
              // Only the first bitfield of a container contributes the container's size.
              unsigned thisBits = type.getBitSize();
              if (thisBits > bitsRemaining)
              {
                  size = type.queryChildType()->getSize();
                  bitsRemaining = size * 8;
              }
              else
                  size = 0;
              bitsRemaining -= thisBits;
          }
          else
              bitsRemaining = 0;
          curOffset += size;
      }
      return curOffset;
  }
  528. size32_t DataSourceMetaData::getFixedSize() const
  529. {
  530. if (isStoredFixedWidth)
  531. return storedFixedSize;
  532. return 0;
  533. }
  534. IFvDataSourceMetaData * deserializeDataSourceMeta(MemoryBuffer & in)
  535. {
  536. return new DataSourceMetaData(in);
  537. }
  538. //---------------------------------------------------------------------------
  539. RowBlock::RowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset)
  540. {
  541. buffer.swapWith(_buffer);
  542. start = _start;
  543. startOffset = _startOffset;
  544. numRows = 0;
  545. }
  546. RowBlock::RowBlock(__int64 _start, __int64 _startOffset)
  547. {
  548. start = _start;
  549. startOffset = _startOffset;
  550. numRows = 0;
  551. }
  552. void RowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  553. {
  554. row = getNextRow();
  555. offset = startOffset + buffer.length();
  556. }
  557. FixedRowBlock::FixedRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, size32_t _fixedRecordSize) : RowBlock(_buffer, _start, _startOffset)
  558. {
  559. if (_fixedRecordSize == 0) _fixedRecordSize = 1;
  560. fixedRecordSize = _fixedRecordSize;
  561. numRows = buffer.length() / fixedRecordSize;
  562. }
  563. const void * FixedRowBlock::fetchRow(__int64 offset, size32_t & len)
  564. {
  565. __int64 maxOffset = startOffset + buffer.length();
  566. if (offset < startOffset || offset >= maxOffset)
  567. return NULL;
  568. len = fixedRecordSize;
  569. return buffer.toByteArray() + (offset - startOffset);
  570. }
  571. const void * FixedRowBlock::getRow(__int64 row, size32_t & len, unsigned __int64 & rowOffset)
  572. {
  573. if (row < start || row >= start + numRows)
  574. return NULL;
  575. unsigned index = (unsigned)(row - start);
  576. unsigned blockOffset = index * fixedRecordSize;
  577. len = fixedRecordSize;
  578. rowOffset = startOffset + blockOffset;
  579. return buffer.toByteArray() + blockOffset;
  580. }
  581. VariableRowBlock::VariableRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, IRecordSizeEx * recordSize, bool isLast) : RowBlock(_buffer, _start, _startOffset)
  582. {
  583. const char * buff = buffer.toByteArray();
  584. unsigned cur = 0;
  585. unsigned max = buffer.length();
  586. unsigned maxCur = max;
  587. if (!isLast)
  588. maxCur -= recordSize->getRecordSize(NULL);
  589. while (cur < maxCur)
  590. {
  591. rowIndex.append(cur);
  592. cur += recordSize->getRecordSize(max-cur, buff + cur);
  593. }
  594. buffer.setLength(cur);
  595. rowIndex.append(cur);
  596. numRows = rowIndex.ordinality()-1;
  597. }
  598. VariableRowBlock::VariableRowBlock(MemoryBuffer & inBuffer, __int64 _start) : RowBlock(_start, 0)
  599. {
  600. inBuffer.read(numRows);
  601. for (unsigned row = 0; row < numRows; row++)
  602. {
  603. unsigned thisLength;
  604. rowIndex.append(buffer.length());
  605. inBuffer.read(thisLength);
  606. buffer.append(thisLength, inBuffer.readDirect(thisLength));
  607. }
  608. rowIndex.append(buffer.length());
  609. }
  610. const void * VariableRowBlock::fetchRow(__int64 offset, size32_t & len)
  611. {
  612. __int64 maxOffset = startOffset + buffer.length();
  613. if (offset < startOffset || offset >= maxOffset)
  614. return NULL;
  615. size32_t rowOffset = (size32_t)(offset - startOffset);
  616. unsigned pos = rowIndex.find(rowOffset);
  617. if (pos == NotFound)
  618. return NULL;
  619. len = rowIndex.item(pos+1)-rowOffset;
  620. return buffer.toByteArray() + rowOffset;
  621. }
  622. const void * VariableRowBlock::getRow(__int64 row, size32_t & len, unsigned __int64 & rowOffset)
  623. {
  624. if (row < start || row >= start + numRows)
  625. return NULL;
  626. unsigned index = (unsigned)(row - start);
  627. unsigned blockOffset = rowIndex.item(index);
  628. len = rowIndex.item(index+1) - blockOffset;
  629. rowOffset = startOffset + blockOffset;
  630. return buffer.toByteArray() + blockOffset;
  631. }
  632. //---------------------------------------------------------------------------
  633. offset_t calculateNextOffset(const char * data, unsigned len)
  634. {
  635. offset_t offset = *(offset_t *)(data + len - sizeof(offset_t) - sizeof(unsigned short));
  636. return offset + *(unsigned short*)(data + len - sizeof(unsigned short));
  637. }
  638. void FilePosFixedRowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  639. {
  640. row = start + numRows;
  641. offset = calculateNextOffset(buffer.toByteArray(), buffer.length());
  642. }
  643. void FilePosVariableRowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  644. {
  645. row = start + numRows;
  646. offset = calculateNextOffset(buffer.toByteArray(), buffer.length());
  647. }
  648. //---------------------------------------------------------------------------
  649. static int rowCacheId;
  650. void RowCache::addRowsOwn(RowBlock * rows)
  651. {
  652. if (allRows.ordinality() == MaxBlocksCached)
  653. makeRoom();
  654. unsigned newPos = getInsertPosition(rows->getStartRow());
  655. allRows.add(*rows, newPos);
  656. ages.add(++rowCacheId, newPos);
  657. }
  658. bool RowCache::getCacheRow(__int64 row, RowLocation & location)
  659. {
  660. unsigned numRows = allRows.ordinality();
  661. if (numRows == 0)
  662. return false;
  663. const RowBlock & first = allRows.item(0);
  664. if (row < first.getStartRow())
  665. {
  666. location.bestRow = 0;
  667. location.bestOffset = 0;
  668. return false;
  669. }
  670. unsigned best = getBestRow(row);
  671. ages.replace(++rowCacheId, best);
  672. RowBlock & cur = allRows.item(best);
  673. location.matchRow = cur.getRow(row, location.matchLength, location.bestOffset);
  674. if (location.matchRow)
  675. return true;
  676. if (location.bestRow < cur.getNextRow())
  677. {
  678. cur.getNextStoredOffset(location.bestRow, location.bestOffset);
  679. }
  680. return false;
  681. }
  682. //Find the rowBlock that contains the expected row.
  683. //Return the *previous* RowBlock if no match is found.
  684. unsigned RowCache::getBestRow(__int64 row)
  685. {
  686. unsigned max = allRows.ordinality();
  687. int start = 0;
  688. int end = max;
  689. while (end - start > 1)
  690. {
  691. int mid = (start + end) >> 1;
  692. RowBlock & cur = allRows.item(mid);
  693. if (row >= cur.getStartRow())
  694. start = mid;
  695. else
  696. end = mid;
  697. }
  698. assertex(row >= allRows.item(start).getStartRow());
  699. assertex(start != max);
  700. return start;
  701. }
  702. unsigned RowCache::getInsertPosition(__int64 row)
  703. {
  704. unsigned max = allRows.ordinality();
  705. int start = 0;
  706. int end = max;
  707. while (end - start > 1)
  708. {
  709. int mid = (start + end) >> 1;
  710. RowBlock & cur = allRows.item(mid);
  711. if (row >= cur.getStartRow())
  712. start = mid;
  713. else
  714. end = mid;
  715. }
  716. if (start != max)
  717. {
  718. if (row >= allRows.item(start).getStartRow())
  719. start++;
  720. }
  721. assertex(start == max || (row < allRows.item(start).getStartRow()));
  722. return start;
  723. }
  // Evict the least recently used blocks until MinBlocksCached remain.
  // Phase 1: scan `ages` (use stamps; smaller = older), keeping the numToFree oldest
  //          entries in oldestAge/oldestRow sorted by age (insertion into a bounded list).
  // Phase 2: remove every block whose address is in oldestRow, together with its age.
  // NOTE(review): the scratch arrays are raw new[]/delete[] and would leak if anything in
  // between threw.
  void RowCache::makeRoom()
  {
  #if 0
      //For testing the caching by throwing away random blocks
      while (allRows.ordinality() > MinBlocksCached)
      {
          unsigned index = getRandom() % allRows.ordinality();
          allRows.remove(index);
          ages.remove(index);
      }
      return;
  #endif
      unsigned numToFree = allRows.ordinality() - MinBlocksCached;
      RowBlock * * oldestRow = new RowBlock * [numToFree];
      __int64 * oldestAge = new __int64[numToFree];
      unsigned numGot = 0;
      ForEachItemIn(idx, ages)
      {
          __int64 curAge = ages.item(idx);
          // Find the insertion point in the ascending list of oldest ages.
          unsigned compare = numGot;
          while (compare != 0)
          {
              if (curAge >= oldestAge[compare-1])
                  break;
              compare--;
          }
          if (compare < numToFree)
          {
              // Shift younger entries up by one.  When the list is full, the youngest drops off.
              unsigned copySize = numGot - compare;
              if (numGot == numToFree)
                  copySize--; //NB: Cannot go negative because compare < numToFree, numToFree == numGot => compare < numGot
              if (copySize)
              {
                  memmove(oldestAge + compare + 1, oldestAge + compare, copySize * sizeof(*oldestAge));
                  memmove(oldestRow + compare + 1, oldestRow + compare, copySize * sizeof(*oldestRow));
              }
              oldestAge[compare] = curAge;
              oldestRow[compare] = &allRows.item(idx);
              if (numGot != numToFree)
                  numGot++;
          }
      }
      // Remove the selected blocks.  i only advances when nothing was removed at i.
      unsigned max = allRows.ordinality();
      unsigned i;
      for (i = 0; i < max; )
      {
          for (unsigned j = 0; j < numGot; j++)
          {
              if (oldestRow[j] == &allRows.item(i))
              {
                  allRows.remove(i);
                  ages.remove(i);
                  max--;
                  goto testNext;
              }
          }
          i++;
  testNext: ;
      }
      delete [] oldestRow;
      delete [] oldestAge;
  }
  786. //---------------------------------------------------------------------------
  787. FVDataSource::FVDataSource()
  788. {
  789. transformer = NULL;
  790. extraFieldsSize = 0;
  791. openCount = 0;
  792. appendFileposition = false;
  793. }
  794. FVDataSource::~FVDataSource()
  795. {
  796. //ensure all refs into the dll are cleared before it is unloaded
  797. returnedRecordSize.clear();
  798. }
  799. void FVDataSource::addFileposition()
  800. {
  801. appendFileposition = true;
  802. transformedMeta->addFileposition(); // This may modify the other metas as well!
  803. }
  804. void FVDataSource::copyRow(MemoryBuffer & out, const void * src, size32_t length)
  805. {
  806. if (transformer)
  807. {
  808. unsigned curLen = out.length();
  809. size32_t maxSize = transformedMeta->getMaxRecordSize();
  810. void * target = out.reserve(maxSize);
  811. //MORE: won't cope with dynamic maxlengths
  812. RtlStaticRowBuilder rowBuilder(target, maxSize);
  813. unsigned copied = transformer(rowBuilder, (const byte *)src);
  814. out.rewrite(curLen + copied);
  815. }
  816. else
  817. out.append(length, src);
  818. }
  819. bool FVDataSource::fetchRow(MemoryBuffer & out, __int64 offset)
  820. {
  821. if (!transformer)
  822. return fetchRawRow(out, offset);
  823. MemoryBuffer temp;
  824. if (fetchRowData(temp, offset))
  825. {
  826. copyRow(out, temp.toByteArray(), temp.length());
  827. if (appendFileposition)
  828. out.append(offset);
  829. return true;
  830. }
  831. return false;
  832. }
  833. bool FVDataSource::fetchRawRow(MemoryBuffer & out, __int64 offset)
  834. {
  835. if (fetchRowData(out, offset))
  836. {
  837. if (appendFileposition)
  838. out.append(offset);
  839. return true;
  840. }
  841. return false;
  842. }
  843. bool FVDataSource::getRow(MemoryBuffer & out, __int64 row)
  844. {
  845. size32_t length;
  846. const void * data;
  847. unsigned __int64 offset = 0;
  848. if (getRowData(row, length, data, offset))
  849. {
  850. copyRow(out, data, length);
  851. if (appendFileposition)
  852. out.append(offset);
  853. return true;
  854. }
  855. return false;
  856. }
  857. bool FVDataSource::getRawRow(MemoryBuffer & out, __int64 row)
  858. {
  859. size32_t length;
  860. const void * data;
  861. unsigned __int64 offset = 0;
  862. if (getRowData(row, length, data, offset))
  863. {
  864. out.append(length-extraFieldsSize, data);
  865. if (appendFileposition)
  866. out.append(offset);
  867. return true;
  868. }
  869. return false;
  870. }
  871. void FVDataSource::loadDll(const char * wuid)
  872. {
  873. //MORE: This code should be commoned up and available in the work unit code or something...
  874. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  875. Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid, false);
  876. //Plugins should already be loaded when they were registered with the ViewTransformerRegistry
  877. //Something like the following code could be used to check the plugin version...
  878. #if 0
  879. Owned<IConstWUPluginIterator> plugins = &wu->getPlugins();
  880. SafePluginMap * map = NULL;
  881. ForEach(*plugins)
  882. {
  883. IConstWUPlugin &thisplugin = plugins->query();
  884. SCMStringBuffer name, version;
  885. thisplugin.getPluginName(name);
  886. thisplugin.getPluginVersion(version);
  887. Owned<ILoadedDllEntry> loadedPlugin = map->getPluginDll(name.str(), version.str(), true);
  888. if (!loadedPlugin)
  889. break;
  890. }
  891. #endif
  892. Owned<IConstWUQuery> q = wu->getQuery();
  893. SCMStringBuffer dllname;
  894. q->getQueryDllName(dllname);
  895. // MORE - leaks....
  896. loadedDll.setown(queryDllServer().loadDll(dllname.str(), DllLocationAnywhere));
  897. }
  898. IFvDataSourceMetaData * FVDataSource::queryMetaData()
  899. {
  900. return transformedMeta;
  901. }
  902. bool FVDataSource::setReturnedInfoFromResult()
  903. {
  904. SCMStringBuffer s;
  905. wuResult->getResultEclSchema(s);
  906. returnedRecord.setown(parseQuery(s.str()));
  907. if (!returnedRecord)
  908. throw MakeStringException(ERR_FILEVIEW_FIRST+4, "Could not process result schema [%s]", s.str());
  909. bool isKey = false;
  910. bool isGrouped = false; // this isn't strictly true...it could be true for an internal result, but no current flag to test
  911. returnedMeta.setown(new DataSourceMetaData(returnedRecord, 0, true, isGrouped, 0));
  912. transformedRecord.setown(getFileViewerRecord(returnedRecord, isKey));
  913. if (!transformedRecord)
  914. {
  915. //No transformations needed, so don't need to loaded the dll containing the transform functions etc.
  916. transformedRecord.set(returnedRecord);
  917. returnedRecordSize.set(returnedMeta);
  918. }
  919. else
  920. {
  921. loadDll(wuid);
  922. s.clear();
  923. wuResult->getResultRecordSizeEntry(s);
  924. if (s.length())
  925. {
  926. typedef IRecordSize * (* recSizeFunc)();
  927. recSizeFunc func = (recSizeFunc)loadedDll->getEntry(s.str());
  928. Owned<IRecordSize> createdRecordSize = func();
  929. returnedRecordSize.setown(new RecordSizeToEx(createdRecordSize));
  930. }
  931. s.clear();
  932. wuResult->getResultTransformerEntry(s);
  933. if (s.length())
  934. transformer = (rowTransformFunction)loadedDll->getEntry(s.str());
  935. }
  936. transformedMeta.setown(new DataSourceMetaData(transformedRecord, 0, true, isGrouped, 0));
  937. return (returnedRecordSize != NULL);
  938. }
  939. //---------------------------------------------------------------------------
  940. __int64 PagedDataSource::numRows(bool force)
  941. {
  942. if (force && (totalRows == UNKNOWN_NUM_ROWS))
  943. {
  944. //MORE: Need to go and work it out - however painful...
  945. }
  946. return totalRows;
  947. }
  948. bool PagedDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  949. {
  950. if ((row < 0) || ((unsigned __int64)row > totalRows))
  951. return false;
  952. RowLocation location;
  953. loop
  954. {
  955. if (cache.getCacheRow(row, location))
  956. {
  957. length = location.matchLength;
  958. data = location.matchRow;
  959. offset = location.bestOffset;
  960. return true;
  961. }
  962. improveLocation(row, location);
  963. if (!loadBlock(location.bestRow, location.bestOffset))
  964. return false;
  965. }
  966. }
  967. void PagedDataSource::improveLocation(__int64 row, RowLocation & location)
  968. {
  969. }
  970. //---------------------------------------------------------------------------
  971. NestedDataSource::NestedDataSource(DataSourceMetaData & meta, unsigned len, const void * data)
  972. {
  973. returnedMeta.set(&meta);
  974. returnedRecordSize.set(returnedMeta);
  975. transformedMeta.set(returnedMeta);
  976. totalSize = len;
  977. MemoryBuffer temp;
  978. temp.append(len, data);
  979. if (returnedMeta->isFixedSize())
  980. rows.setown(new FixedRowBlock(temp, 0, 0, returnedMeta->fixedSize()));
  981. else
  982. rows.setown(new VariableRowBlock(temp, 0, 0, returnedRecordSize, true));
  983. }
  984. bool NestedDataSource::init()
  985. {
  986. return true;
  987. }
  988. __int64 NestedDataSource::numRows(bool force)
  989. {
  990. return rows->getNextRow() - rows->getStartRow();
  991. }
  992. bool NestedDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  993. {
  994. data = rows->getRow(row, length, offset);
  995. return (data != NULL);
  996. }
  997. //---------------------------------------------------------------------------
  998. NullDataSource::NullDataSource(IHqlExpression * _record, bool _isGrouped, unsigned _keyedSize)
  999. : meta(_record, 0, true, _isGrouped, _keyedSize)
  1000. {
  1001. }
  1002. FailureDataSource::FailureDataSource(IHqlExpression * _record, IException * _error, bool _isGrouped, unsigned _keyedSize)
  1003. : NullDataSource(_record, _isGrouped, _keyedSize), error(_error)
  1004. {
  1005. }
  1006. //---------------------------------------------------------------------------
  1007. IHqlExpression * parseQuery(const char * text)
  1008. {
  1009. MultiErrorReceiver errs;
  1010. OwnedHqlExpr ret = ::parseQuery(text, &errs);
  1011. if (errs.errCount() == 0)
  1012. return ret.getClear();
  1013. for (unsigned i=0; i < errs.errCount(); i++)
  1014. {
  1015. StringBuffer msg;
  1016. PrintLog("%d %s", errs.item(i)->getLine(), errs.item(i)->errorMessage(msg).str());
  1017. }
  1018. return NULL;
  1019. }
  1020. IFvDataSourceMetaData * createMetaData(IConstWUResult * wuResult)
  1021. {
  1022. SCMStringBuffer s;
  1023. wuResult->getResultEclSchema(s);
  1024. OwnedHqlExpr record = parseQuery(s.str());
  1025. if (!record)
  1026. throw MakeStringException(ERR_FILEVIEW_FIRST+4, "Could not process result schema [%s]", s.str());
  1027. OwnedHqlExpr simplifiedRecord = getFileViewerRecord(record, false);
  1028. bool isGrouped = false; // more not sure this is strictly true...
  1029. if (!simplifiedRecord)
  1030. return new DataSourceMetaData(record, 0, true, isGrouped, 0);
  1031. return new DataSourceMetaData(simplifiedRecord, 0, true, isGrouped, 0);
  1032. }