fvsource.cpp 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jliball.hpp"
  15. #include "eclrtl.hpp"
  16. #include "rtlds_imp.hpp"
  17. #include "fvresultset.ipp"
  18. #include "fileview.hpp"
  19. #include "fvsource.ipp"
  20. #include "fverror.hpp"
  21. #include "hqlerror.hpp"
  22. #include "eclhelper.hpp"
  23. #include "hqlattr.hpp"
  24. #include "hqlutil.hpp"
  25. #include "fvdatasource.hpp"
  26. #define MAX_RECORD_SIZE 4096
  27. inline void appendNextXpathName(StringBuffer &s, const char *&xpath)
  28. {
  29. while (*xpath && !strchr("*[/", *xpath))
  30. s.append(*xpath++);
  31. xpath = strchr(xpath, '/');
  32. }
  33. void splitXmlTagNamesFromXPath(const char *xpath, StringAttr &inner, StringAttr *outer=NULL)
  34. {
  35. if (!xpath)
  36. return;
  37. StringBuffer s1;
  38. StringBuffer s2;
  39. appendNextXpathName(s1, xpath);
  40. if (outer && xpath)
  41. appendNextXpathName(s2, ++xpath);
  42. if (xpath) //xpath too deep
  43. return;
  44. if (!s2.length())
  45. inner.set(s1.str());
  46. else
  47. {
  48. inner.set(s2.str());
  49. outer->set(s1.str());
  50. }
  51. if (!inner.get())
  52. inner.set("");
  53. if (outer && !outer->get())
  54. outer->set("");
  55. }
  56. DataSourceMetaItem::DataSourceMetaItem(unsigned _flags, const char * _name, const char * _xpath, ITypeInfo * _type)
  57. {
  58. flags = _flags;
  59. name.set(_name);
  60. type.set(_type);
  61. xpath.set(_xpath);
  62. splitXmlTagNamesFromXPath(_xpath, tagname);
  63. hasMixedContent = false;
  64. }
  65. DataSourceMetaItem::DataSourceMetaItem(unsigned _flags, MemoryBuffer & in)
  66. {
  67. flags = _flags;
  68. in.read(hasMixedContent);
  69. in.read(name);
  70. in.read(xpath);
  71. type.setown(deserializeType(in));
  72. splitXmlTagNamesFromXPath(xpath.get(), tagname);
  73. }
  74. void DataSourceMetaItem::serialize(MemoryBuffer & out) const
  75. {
  76. out.append(flags);
  77. out.append(hasMixedContent);
  78. out.append(name);
  79. out.append(xpath);
  80. type->serialize(out);
  81. }
  82. //---------------------------------------------------------------------------
  83. DataSourceDatasetItem::DataSourceDatasetItem(const char * _name, const char * _xpath, IHqlExpression * expr) : DataSourceMetaItem(FVFFdataset, NULL, NULL, NULL), record(expr->queryRecord(), 0, true, false, false)
  84. {
  85. type.setown(makeTableType(NULL));
  86. name.set(_name);
  87. xpath.set(_xpath);
  88. splitXmlTagNamesFromXPath(_xpath, record.tagname, &tagname);
  89. if (!record.tagname.length())
  90. record.tagname.set("Row");
  91. }
  92. DataSourceDatasetItem::DataSourceDatasetItem(unsigned flags, MemoryBuffer & in) : DataSourceMetaItem(FVFFdataset, NULL, NULL, NULL), record(in)
  93. {
  94. type.setown(makeTableType(NULL));
  95. in.read(name);
  96. }
  97. void DataSourceDatasetItem::serialize(MemoryBuffer & out) const
  98. {
  99. out.append(flags);
  100. record.serialize(out);
  101. out.append(name);
  102. }
  103. //---------------------------------------------------------------------------
  104. DataSourceSetItem::DataSourceSetItem(const char * _name, const char * _xpath, ITypeInfo * _type) : DataSourceMetaItem(FVFFset, _name, NULL, _type)
  105. {
  106. createChild();
  107. xpath.set(_xpath);
  108. StringBuffer attr;
  109. splitXmlTagNamesFromXPath(_xpath, record.tagname, &tagname);
  110. if (!_xpath)
  111. record.tagname.set("Item");
  112. }
  113. DataSourceSetItem::DataSourceSetItem(unsigned flags, MemoryBuffer & in) : DataSourceMetaItem(flags, in)
  114. {
  115. createChild();
  116. splitXmlTagNamesFromXPath(xpath.get(), record.tagname, &tagname);
  117. if (!record.tagname.get())
  118. record.tagname.set("Item");
  119. }
  120. void DataSourceSetItem::createChild()
  121. {
  122. ITypeInfo * childType = type->queryChildType()->queryPromotedType();
  123. record.addSimpleField("Item", NULL, childType);
  124. }
  125. void DataSourceSetItem::serialize(MemoryBuffer & out) const
  126. {
  127. out.append(flags);
  128. record.serialize(out);
  129. out.append(name);
  130. }
  131. //---------------------------------------------------------------------------
  132. DataSourceMetaData::DataSourceMetaData(IHqlExpression * _record, byte _numFieldsToIgnore, bool _randomIsOk, bool _isGrouped, unsigned _keyedSize)
  133. {
  134. init();
  135. numFieldsToIgnore = _numFieldsToIgnore;
  136. randomIsOk = _randomIsOk;
  137. isStoredFixedWidth = true;
  138. //MORE: Blobs aren't handled correctly in indexes....
  139. maxRecordSize = ::getMaxRecordSize(_record, MAX_RECORD_SIZE);
  140. keyedSize = _keyedSize;
  141. gatherFields(_record, false, &hasMixedContent);
  142. if (_isGrouped)
  143. {
  144. Owned<ITypeInfo> type = makeBoolType();
  145. addSimpleField("__groupfollows__", NULL, type);
  146. maxRecordSize++;
  147. }
  148. gatherAttributes();
  149. if (isStoredFixedWidth)
  150. assertex(minRecordSize == maxRecordSize);
  151. }
  152. DataSourceMetaData::DataSourceMetaData()
  153. {
  154. init();
  155. randomIsOk = true;
  156. isStoredFixedWidth = true;
  157. }
  158. DataSourceMetaData::DataSourceMetaData(type_t typeCode)
  159. {
  160. init();
  161. OwnedITypeInfo type;
  162. if (typeCode == type_unicode)
  163. type.setown(makeUnicodeType(UNKNOWN_LENGTH, 0));
  164. else
  165. type.setown(makeStringType(UNKNOWN_LENGTH, NULL, NULL));
  166. fields.append(*new DataSourceMetaItem(FVFFnone, "line", NULL, type));
  167. }
  168. void DataSourceMetaData::init()
  169. {
  170. keyedSize = 0;
  171. minRecordSize = 0;
  172. maxRecordSize = 0;
  173. bitsRemaining = 0;
  174. numVirtualFields = 0;
  175. isStoredFixedWidth = false;
  176. randomIsOk = false;
  177. numFieldsToIgnore = 0;
  178. hasMixedContent = false;
  179. }
  180. DataSourceMetaData::DataSourceMetaData(MemoryBuffer & buffer)
  181. {
  182. numVirtualFields = 0;
  183. buffer.read(numFieldsToIgnore);
  184. buffer.read(randomIsOk);
  185. buffer.read(hasMixedContent);
  186. buffer.read(isStoredFixedWidth);
  187. buffer.read(minRecordSize);
  188. buffer.read(keyedSize);
  189. buffer.read(maxRecordSize);
  190. unsigned numFields;
  191. buffer.read(numFields);
  192. for (unsigned idx=0; idx < numFields; idx++)
  193. {
  194. byte flags;
  195. buffer.read(flags);
  196. if (flags == FVFFdataset)
  197. fields.append(*new DataSourceDatasetItem(flags, buffer));
  198. else if (flags == FVFFset)
  199. fields.append(*new DataSourceSetItem(flags, buffer));
  200. else
  201. fields.append(*new DataSourceMetaItem(flags, buffer));
  202. if (flags == FVFFvirtual)
  203. ++numVirtualFields;
  204. }
  205. gatherAttributes();
  206. }
  207. void DataSourceMetaData::addFileposition()
  208. {
  209. addVirtualField("__fileposition__", NULL, makeIntType(8, false));
  210. }
  211. void DataSourceMetaData::addSimpleField(const char * name, const char * xpath, ITypeInfo * type, unsigned flag)
  212. {
  213. ITypeInfo * promoted = type->queryPromotedType();
  214. unsigned size = promoted->getSize();
  215. unsigned thisBits = 0;
  216. if (size == UNKNOWN_LENGTH)
  217. {
  218. isStoredFixedWidth = false;
  219. switch (type->getTypeCode())
  220. {
  221. case type_set:
  222. minRecordSize += sizeof(bool) + sizeof(size32_t);
  223. break;
  224. case type_varstring:
  225. minRecordSize += 1;
  226. break;
  227. case type_packedint:
  228. minRecordSize += 1;
  229. break;
  230. case type_varunicode:
  231. minRecordSize += sizeof(UChar);
  232. break;
  233. default:
  234. minRecordSize += sizeof(size32_t);
  235. break;
  236. }
  237. }
  238. else if (type->getTypeCode() == type_bitfield)
  239. {
  240. thisBits = type->getBitSize();
  241. if (thisBits > bitsRemaining)
  242. {
  243. size = type->queryChildType()->getSize();
  244. minRecordSize += size;
  245. bitsRemaining = size * 8;
  246. }
  247. bitsRemaining -= thisBits;
  248. }
  249. else
  250. minRecordSize += size;
  251. if (thisBits == 0)
  252. bitsRemaining = 0;
  253. fields.append(*new DataSourceMetaItem(flag, name, xpath, type));
  254. }
  255. void DataSourceMetaData::addVirtualField(const char * name, const char * xpath, ITypeInfo * type)
  256. {
  257. fields.append(*new DataSourceMetaItem(FVFFvirtual, name, xpath, type));
  258. ++numVirtualFields;
  259. }
  260. void DataSourceMetaData::extractKeyedInfo(UnsignedArray & offsets, TypeInfoArray & types)
  261. {
  262. unsigned curOffset = 0;
  263. ForEachItemIn(i, fields)
  264. {
  265. if (curOffset >= keyedSize)
  266. break;
  267. DataSourceMetaItem & cur = fields.item(i);
  268. switch (cur.flags)
  269. {
  270. case FVFFnone:
  271. {
  272. offsets.append(curOffset);
  273. types.append(*LINK(cur.type));
  274. unsigned size = cur.type->getSize();
  275. assertex(size != UNKNOWN_LENGTH);
  276. curOffset += size;
  277. break;
  278. }
  279. case FVFFbeginrecord:
  280. case FVFFendrecord:
  281. break;
  282. default:
  283. throwUnexpected();
  284. }
  285. }
  286. offsets.append(curOffset);
  287. assertex(curOffset == keyedSize);
  288. }
  289. //MORE: Really this should create no_selects for the sub records, but pass on that for the moment.
  290. void DataSourceMetaData::gatherFields(IHqlExpression * expr, bool isConditional, bool *pMixedContent)
  291. {
  292. switch (expr->getOperator())
  293. {
  294. case no_record:
  295. gatherChildFields(expr, isConditional, pMixedContent);
  296. break;
  297. case no_ifblock:
  298. {
  299. OwnedITypeInfo boolType = makeBoolType();
  300. OwnedITypeInfo voidType = makeVoidType();
  301. isStoredFixedWidth = false;
  302. fields.append(*new DataSourceMetaItem(FVFFbeginif, NULL, NULL, boolType));
  303. gatherChildFields(expr->queryChild(1), true, pMixedContent);
  304. fields.append(*new DataSourceMetaItem(FVFFendif, NULL, NULL, voidType));
  305. break;
  306. }
  307. case no_field:
  308. {
  309. if (expr->hasAttribute(__ifblockAtom))
  310. break;
  311. Linked<ITypeInfo> type = expr->queryType();
  312. IAtom * name = expr->queryName();
  313. IHqlExpression * nameAttr = expr->queryAttribute(namedAtom);
  314. StringBuffer outname;
  315. if (nameAttr && nameAttr->queryChild(0)->queryValue())
  316. nameAttr->queryChild(0)->queryValue()->getStringValue(outname);
  317. else
  318. outname.append(name).toLowerCase();
  319. StringBuffer xpathtext;
  320. const char * xpath = NULL;
  321. IHqlExpression * xpathAttr = expr->queryAttribute(xpathAtom);
  322. if (xpathAttr && xpathAttr->queryChild(0)->queryValue())
  323. xpath = xpathAttr->queryChild(0)->queryValue()->getStringValue(xpathtext);
  324. unsigned flag = FVFFnone;
  325. if (isKey() && expr->hasAttribute(blobAtom))
  326. {
  327. type.setown(makeIntType(8, false));
  328. flag = FVFFblob;
  329. }
  330. type_t tc = type->getTypeCode();
  331. if (tc == type_row)
  332. {
  333. OwnedITypeInfo voidType = makeVoidType();
  334. Owned<DataSourceMetaItem> begin = new DataSourceMetaItem(FVFFbeginrecord, outname, xpath, voidType);
  335. //inherit mixed content from child row with xpath('')
  336. bool *pItemMixedContent = (pMixedContent && xpath && !*xpath) ? pMixedContent : &(begin->hasMixedContent);
  337. fields.append(*begin.getClear());
  338. gatherChildFields(expr->queryRecord(), isConditional, pItemMixedContent);
  339. fields.append(*new DataSourceMetaItem(FVFFendrecord, outname, xpath, voidType));
  340. }
  341. else if ((tc == type_dictionary) || (tc == type_table) || (tc == type_groupedtable))
  342. {
  343. isStoredFixedWidth = false;
  344. Owned<DataSourceDatasetItem> ds = new DataSourceDatasetItem(outname, xpath, expr);
  345. if (pMixedContent && xpath && !*xpath)
  346. *pMixedContent = ds->queryChildMeta()->hasMixedContent;
  347. fields.append(*ds.getClear());
  348. }
  349. else if (tc == type_set)
  350. {
  351. isStoredFixedWidth = false;
  352. if (pMixedContent && xpath && !*xpath)
  353. *pMixedContent = true;
  354. fields.append(*new DataSourceSetItem(outname, xpath, type));
  355. }
  356. else
  357. {
  358. if (type->getTypeCode() == type_alien)
  359. {
  360. IHqlAlienTypeInfo * alien = queryAlienType(type);
  361. type.set(alien->queryPhysicalType());
  362. }
  363. if (pMixedContent && xpath && !*xpath)
  364. *pMixedContent = true;
  365. addSimpleField(outname, xpath, type, flag);
  366. }
  367. break;
  368. }
  369. }
  370. }
  371. void DataSourceMetaData::gatherChildFields(IHqlExpression * expr, bool isConditional, bool *pMixedContent)
  372. {
  373. bitsRemaining = 0;
  374. ForEachChild(idx, expr)
  375. gatherFields(expr->queryChild(idx), isConditional, pMixedContent);
  376. }
  377. unsigned DataSourceMetaData::numKeyedColumns() const
  378. {
  379. unsigned count = 0;
  380. unsigned curOffset = 0;
  381. ForEachItemIn(i, fields)
  382. {
  383. if (curOffset >= keyedSize)
  384. break;
  385. DataSourceMetaItem & cur = fields.item(i);
  386. switch (cur.flags)
  387. {
  388. case FVFFnone:
  389. {
  390. unsigned size = cur.type->getSize();
  391. assertex(size != UNKNOWN_LENGTH);
  392. curOffset += size;
  393. count++;
  394. break;
  395. }
  396. default:
  397. throwUnexpected();
  398. }
  399. }
  400. return count;
  401. assertex(curOffset == keyedSize);
  402. }
  403. unsigned DataSourceMetaData::numColumns() const
  404. {
  405. return fields.ordinality() - numFieldsToIgnore;
  406. }
  407. ITypeInfo * DataSourceMetaData::queryType(unsigned column) const
  408. {
  409. return fields.item(column).type;
  410. }
  411. unsigned DataSourceMetaData::queryFieldFlags(unsigned column) const
  412. {
  413. return fields.item(column).flags;
  414. }
  415. bool DataSourceMetaData::mixedContent(unsigned column) const
  416. {
  417. return fields.item(column).hasMixedContent;
  418. }
  419. bool DataSourceMetaData::mixedContent() const
  420. {
  421. return hasMixedContent;
  422. }
  423. const char * DataSourceMetaData::queryName(unsigned column) const
  424. {
  425. return fields.item(column).name;
  426. }
  427. const char * DataSourceMetaData::queryXPath(unsigned column) const
  428. {
  429. return fields.item(column).xpath;
  430. }
  431. const char * DataSourceMetaData::queryXmlTag(unsigned column) const
  432. {
  433. DataSourceMetaItem &item = fields.item(column);
  434. return (item.tagname.get()) ? item.tagname.get() : item.name.get();
  435. }
  436. const char *DataSourceMetaData::queryXmlTag() const
  437. {
  438. return (tagname.get()) ? tagname.get() : "Row";
  439. }
  440. void DataSourceMetaData::gatherNestedAttributes(DataSourceMetaItem &rec, aindex_t &idx)
  441. {
  442. aindex_t numItems = fields.ordinality();
  443. while (++idx < numItems)
  444. {
  445. DataSourceMetaItem &item = fields.item(idx);
  446. if (item.flags==FVFFendrecord)
  447. return;
  448. else if (item.flags==FVFFbeginrecord)
  449. gatherNestedAttributes((!item.tagname.get() || *item.tagname.get()) ? item : rec, idx);
  450. else if (item.isXmlAttribute())
  451. rec.nestedAttributes.append(idx);
  452. }
  453. }
  454. void DataSourceMetaData::gatherAttributes()
  455. {
  456. aindex_t numItems = fields.ordinality();
  457. for (aindex_t idx = 0; idx < numItems; ++idx)
  458. {
  459. DataSourceMetaItem &item = fields.item(idx);
  460. if (item.flags==FVFFbeginrecord)
  461. {
  462. if (!item.tagname.get() || *item.tagname.get())
  463. gatherNestedAttributes(item, idx);
  464. }
  465. else if (item.isXmlAttribute())
  466. attributes.append(idx);
  467. }
  468. }
  469. const IntArray &DataSourceMetaData::queryAttrList(unsigned column) const
  470. {
  471. return fields.item(column).nestedAttributes;
  472. }
  473. const IntArray &DataSourceMetaData::queryAttrList() const
  474. {
  475. return attributes;
  476. }
  477. IFvDataSourceMetaData * DataSourceMetaData::queryChildMeta(unsigned column) const
  478. {
  479. return fields.item(column).queryChildMeta();
  480. }
  481. IFvDataSource * DataSourceMetaData::createChildDataSource(unsigned column, unsigned len, const void * data)
  482. {
  483. DataSourceMetaData * childMeta = fields.item(column).queryChildMeta();
  484. if (childMeta)
  485. return new NestedDataSource(*childMeta, len, data);
  486. return NULL;
  487. }
  488. bool DataSourceMetaData::supportsRandomSeek() const
  489. {
  490. return randomIsOk;
  491. }
  492. void DataSourceMetaData::serialize(MemoryBuffer & buffer) const
  493. {
  494. //NB: Update NullDataSourceMeta if this changes....
  495. buffer.append(numFieldsToIgnore);
  496. buffer.append(randomIsOk);
  497. buffer.append(hasMixedContent);
  498. buffer.append(isStoredFixedWidth);
  499. buffer.append(minRecordSize);
  500. buffer.append(keyedSize);
  501. buffer.append(maxRecordSize);
  502. unsigned numFields = fields.ordinality();
  503. buffer.append(numFields);
  504. for (unsigned idx=0; idx < numFields; idx++)
  505. {
  506. fields.item(idx).serialize(buffer);
  507. }
  508. }
  509. size32_t DataSourceMetaData::getRecordSize(const void *rec)
  510. {
  511. if (isStoredFixedWidth)
  512. return minRecordSize;
  513. if (!rec)
  514. return maxRecordSize;
  515. const byte * data = (const byte *)rec;
  516. unsigned curOffset = 0;
  517. unsigned bitsRemaining = 0;
  518. unsigned max = fields.ordinality() - numVirtualFields;
  519. for (unsigned idx=0; idx < max; idx++)
  520. {
  521. ITypeInfo & type = *fields.item(idx).type;
  522. unsigned size = type.getSize();
  523. if (size == UNKNOWN_LENGTH)
  524. {
  525. const byte * cur = data + curOffset;
  526. switch (type.getTypeCode())
  527. {
  528. case type_data:
  529. case type_string:
  530. case type_table:
  531. case type_groupedtable:
  532. size = *((unsigned *)cur) + sizeof(unsigned);
  533. break;
  534. case type_set:
  535. size = *((unsigned *)(cur + sizeof(bool))) + sizeof(unsigned) + sizeof(bool);
  536. break;
  537. case type_qstring:
  538. size = rtlQStrSize(*((unsigned *)cur)) + sizeof(unsigned);
  539. break;
  540. case type_unicode:
  541. size = *((unsigned *)cur)*2 + sizeof(unsigned);
  542. break;
  543. case type_utf8:
  544. size = sizeof(unsigned) + rtlUtf8Size(*(unsigned *)cur, cur+sizeof(unsigned));
  545. break;
  546. case type_varstring:
  547. size = strlen((char *)cur)+1;
  548. break;
  549. case type_varunicode:
  550. size = (rtlUnicodeStrlen((UChar *)cur)+1)*2;
  551. break;
  552. case type_packedint:
  553. size = rtlGetPackedSize(cur);
  554. break;
  555. default:
  556. UNIMPLEMENTED;
  557. }
  558. }
  559. if (type.getTypeCode() == type_bitfield)
  560. {
  561. unsigned thisBits = type.getBitSize();
  562. if (thisBits > bitsRemaining)
  563. {
  564. size = type.queryChildType()->getSize();
  565. bitsRemaining = size * 8;
  566. }
  567. else
  568. size = 0;
  569. bitsRemaining -= thisBits;
  570. }
  571. else
  572. bitsRemaining = 0;
  573. curOffset += size;
  574. }
  575. return curOffset;
  576. }
  577. size32_t DataSourceMetaData::getFixedSize() const
  578. {
  579. if (isStoredFixedWidth)
  580. return minRecordSize;
  581. return 0;
  582. }
  583. size32_t DataSourceMetaData::getMinRecordSize() const
  584. {
  585. return minRecordSize;
  586. }
  587. IFvDataSourceMetaData * deserializeDataSourceMeta(MemoryBuffer & in)
  588. {
  589. return new DataSourceMetaData(in);
  590. }
  591. //---------------------------------------------------------------------------
  592. RowBlock::RowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset)
  593. {
  594. buffer.swapWith(_buffer);
  595. start = _start;
  596. startOffset = _startOffset;
  597. numRows = 0;
  598. }
  599. RowBlock::RowBlock(__int64 _start, __int64 _startOffset)
  600. {
  601. start = _start;
  602. startOffset = _startOffset;
  603. numRows = 0;
  604. }
  605. void RowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  606. {
  607. row = getNextRow();
  608. offset = startOffset + buffer.length();
  609. }
  610. FixedRowBlock::FixedRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, size32_t _fixedRecordSize) : RowBlock(_buffer, _start, _startOffset)
  611. {
  612. if (_fixedRecordSize == 0) _fixedRecordSize = 1;
  613. fixedRecordSize = _fixedRecordSize;
  614. numRows = buffer.length() / fixedRecordSize;
  615. }
  616. const void * FixedRowBlock::fetchRow(__int64 offset, size32_t & len)
  617. {
  618. __int64 maxOffset = startOffset + buffer.length();
  619. if (offset < startOffset || offset >= maxOffset)
  620. return NULL;
  621. len = fixedRecordSize;
  622. return buffer.toByteArray() + (offset - startOffset);
  623. }
  624. const void * FixedRowBlock::getRow(__int64 row, size32_t & len, unsigned __int64 & rowOffset)
  625. {
  626. if (row < start || row >= start + numRows)
  627. return NULL;
  628. unsigned index = (unsigned)(row - start);
  629. unsigned blockOffset = index * fixedRecordSize;
  630. len = fixedRecordSize;
  631. rowOffset = startOffset + blockOffset;
  632. return buffer.toByteArray() + blockOffset;
  633. }
  634. VariableRowBlock::VariableRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, IRecordSizeEx * recordSize, bool isLast) : RowBlock(_buffer, _start, _startOffset)
  635. {
  636. const char * buff = buffer.toByteArray();
  637. unsigned cur = 0;
  638. unsigned max = buffer.length();
  639. unsigned maxCur = max;
  640. if (!isLast)
  641. maxCur -= recordSize->getRecordSize(NULL);
  642. while (cur < maxCur)
  643. {
  644. rowIndex.append(cur);
  645. cur += recordSize->getRecordSize(max-cur, buff + cur);
  646. if (cur > max)
  647. throwError(FVERR_RowTooLarge);
  648. }
  649. buffer.setLength(cur);
  650. rowIndex.append(cur);
  651. numRows = rowIndex.ordinality()-1;
  652. }
  653. VariableRowBlock::VariableRowBlock(MemoryBuffer & inBuffer, __int64 _start) : RowBlock(_start, 0)
  654. {
  655. inBuffer.read(numRows);
  656. for (unsigned row = 0; row < numRows; row++)
  657. {
  658. unsigned thisLength;
  659. rowIndex.append(buffer.length());
  660. inBuffer.read(thisLength);
  661. buffer.append(thisLength, inBuffer.readDirect(thisLength));
  662. }
  663. rowIndex.append(buffer.length());
  664. }
  665. const void * VariableRowBlock::fetchRow(__int64 offset, size32_t & len)
  666. {
  667. __int64 maxOffset = startOffset + buffer.length();
  668. if (offset < startOffset || offset >= maxOffset)
  669. return NULL;
  670. size32_t rowOffset = (size32_t)(offset - startOffset);
  671. unsigned pos = rowIndex.find(rowOffset);
  672. if (pos == NotFound)
  673. return NULL;
  674. len = rowIndex.item(pos+1)-rowOffset;
  675. return buffer.toByteArray() + rowOffset;
  676. }
  677. const void * VariableRowBlock::getRow(__int64 row, size32_t & len, unsigned __int64 & rowOffset)
  678. {
  679. if (row < start || row >= start + numRows)
  680. return NULL;
  681. unsigned index = (unsigned)(row - start);
  682. unsigned blockOffset = rowIndex.item(index);
  683. len = rowIndex.item(index+1) - blockOffset;
  684. rowOffset = startOffset + blockOffset;
  685. return buffer.toByteArray() + blockOffset;
  686. }
  687. //---------------------------------------------------------------------------
  /// Computes the offset that follows the last record of a block whose final
  /// bytes are an offset_t immediately followed by an unsigned short: the
  /// result is the sum of those two trailing values.
  ///
  /// @param data  start of the block.
  /// @param len   length of the block; must be at least
  ///              sizeof(offset_t) + sizeof(unsigned short).
  ///
  /// The values are copied out with memcpy rather than read through casted
  /// pointers because the trailing bytes sit at an arbitrary alignment inside
  /// a char buffer.
  offset_t calculateNextOffset(const char * data, unsigned len)
  {
      const char * sizePos = data + len - sizeof(unsigned short);
      const char * offsetPos = sizePos - sizeof(offset_t);

      offset_t offset;
      unsigned short size;
      memcpy(&offset, offsetPos, sizeof(offset));
      memcpy(&size, sizePos, sizeof(size));
      return offset + size;
  }
  693. void FilePosFixedRowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  694. {
  695. row = start + numRows;
  696. offset = calculateNextOffset(buffer.toByteArray(), buffer.length());
  697. }
  698. void FilePosVariableRowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
  699. {
  700. row = start + numRows;
  701. offset = calculateNextOffset(buffer.toByteArray(), buffer.length());
  702. }
  703. //---------------------------------------------------------------------------
  704. static int rowCacheId;
  705. void RowCache::addRowsOwn(RowBlock * rows)
  706. {
  707. if (allRows.ordinality() == MaxBlocksCached)
  708. makeRoom();
  709. unsigned newPos = getInsertPosition(rows->getStartRow());
  710. allRows.add(*rows, newPos);
  711. ages.add(++rowCacheId, newPos);
  712. }
  713. bool RowCache::getCacheRow(__int64 row, RowLocation & location)
  714. {
  715. unsigned numRows = allRows.ordinality();
  716. if (numRows == 0)
  717. return false;
  718. const RowBlock & first = allRows.item(0);
  719. if (row < first.getStartRow())
  720. {
  721. location.bestRow = 0;
  722. location.bestOffset = 0;
  723. return false;
  724. }
  725. unsigned best = getBestRow(row);
  726. ages.replace(++rowCacheId, best);
  727. RowBlock & cur = allRows.item(best);
  728. location.matchRow = cur.getRow(row, location.matchLength, location.bestOffset);
  729. if (location.matchRow)
  730. return true;
  731. if (location.bestRow < cur.getNextRow())
  732. {
  733. cur.getNextStoredOffset(location.bestRow, location.bestOffset);
  734. }
  735. return false;
  736. }
  737. //Find the rowBlock that contains the expected row.
  738. //Return the *previous* RowBlock if no match is found.
  739. unsigned RowCache::getBestRow(__int64 row)
  740. {
  741. unsigned max = allRows.ordinality();
  742. int start = 0;
  743. int end = max;
  744. while (end - start > 1)
  745. {
  746. int mid = (start + end) >> 1;
  747. RowBlock & cur = allRows.item(mid);
  748. if (row >= cur.getStartRow())
  749. start = mid;
  750. else
  751. end = mid;
  752. }
  753. assertex(row >= allRows.item(start).getStartRow());
  754. assertex(start != max);
  755. return start;
  756. }
  757. unsigned RowCache::getInsertPosition(__int64 row)
  758. {
  759. unsigned max = allRows.ordinality();
  760. int start = 0;
  761. int end = max;
  762. while (end - start > 1)
  763. {
  764. int mid = (start + end) >> 1;
  765. RowBlock & cur = allRows.item(mid);
  766. if (row >= cur.getStartRow())
  767. start = mid;
  768. else
  769. end = mid;
  770. }
  771. if (start != max)
  772. {
  773. if (row >= allRows.item(start).getStartRow())
  774. start++;
  775. }
  776. assertex(start == max || (row < allRows.item(start).getStartRow()));
  777. return start;
  778. }
  779. void RowCache::makeRoom()
  780. {
  781. #if 0
  782. //For testing the caching by throwing away random blocks
  783. while (allRows.ordinality() > MinBlocksCached)
  784. {
  785. unsigned index = getRandom() % allRows.ordinality();
  786. allRows.remove(index);
  787. ages.remove(index);
  788. }
  789. return;
  790. #endif
  791. unsigned numToFree = allRows.ordinality() - MinBlocksCached;
  792. RowBlock * * oldestRow = new RowBlock * [numToFree];
  793. __int64 * oldestAge = new __int64[numToFree];
  794. unsigned numGot = 0;
  795. ForEachItemIn(idx, ages)
  796. {
  797. __int64 curAge = ages.item(idx);
  798. unsigned compare = numGot;
  799. while (compare != 0)
  800. {
  801. if (curAge >= oldestAge[compare-1])
  802. break;
  803. compare--;
  804. }
  805. if (compare < numToFree)
  806. {
  807. unsigned copySize = numGot - compare;
  808. if (numGot == numToFree)
  809. copySize--; //NB: Cannot go negative because compare < numToFree, numToFree == numGot => compare < numGot
  810. if (copySize)
  811. {
  812. memmove(oldestAge + compare + 1, oldestAge + compare, copySize * sizeof(*oldestAge));
  813. memmove(oldestRow + compare + 1, oldestRow + compare, copySize * sizeof(*oldestRow));
  814. }
  815. oldestAge[compare] = curAge;
  816. oldestRow[compare] = &allRows.item(idx);
  817. if (numGot != numToFree)
  818. numGot++;
  819. }
  820. }
  821. unsigned max = allRows.ordinality();
  822. unsigned i;
  823. for (i = 0; i < max; )
  824. {
  825. for (unsigned j = 0; j < numGot; j++)
  826. {
  827. if (oldestRow[j] == &allRows.item(i))
  828. {
  829. allRows.remove(i);
  830. ages.remove(i);
  831. max--;
  832. goto testNext;
  833. }
  834. }
  835. i++;
  836. testNext: ;
  837. }
  838. delete [] oldestRow;
  839. delete [] oldestAge;
  840. }
  841. //---------------------------------------------------------------------------
  842. FVDataSource::FVDataSource()
  843. {
  844. transformer = NULL;
  845. extraFieldsSize = 0;
  846. openCount = 0;
  847. appendFileposition = false;
  848. }
  849. FVDataSource::~FVDataSource()
  850. {
  851. //ensure all refs into the dll are cleared before it is unloaded
  852. returnedRecordSize.clear();
  853. }
  854. void FVDataSource::addFileposition()
  855. {
  856. appendFileposition = true;
  857. transformedMeta->addFileposition(); // This may modify the other metas as well!
  858. }
  859. void FVDataSource::copyRow(MemoryBuffer & out, const void * src, size32_t length)
  860. {
  861. if (transformer)
  862. {
  863. unsigned curLen = out.length();
  864. size32_t maxSize = transformedMeta->getMaxRecordSize();
  865. void * target = out.reserve(maxSize);
  866. //MORE: won't cope with dynamic maxlengths
  867. RtlStaticRowBuilder rowBuilder(target, maxSize);
  868. unsigned copied = transformer(rowBuilder, (const byte *)src);
  869. out.rewrite(curLen + copied);
  870. }
  871. else
  872. out.append(length, src);
  873. }
  874. bool FVDataSource::fetchRow(MemoryBuffer & out, __int64 offset)
  875. {
  876. if (!transformer)
  877. return fetchRawRow(out, offset);
  878. MemoryBuffer temp;
  879. if (fetchRowData(temp, offset))
  880. {
  881. copyRow(out, temp.toByteArray(), temp.length());
  882. if (appendFileposition)
  883. out.append(offset);
  884. return true;
  885. }
  886. return false;
  887. }
  888. bool FVDataSource::fetchRawRow(MemoryBuffer & out, __int64 offset)
  889. {
  890. if (fetchRowData(out, offset))
  891. {
  892. if (appendFileposition)
  893. out.append(offset);
  894. return true;
  895. }
  896. return false;
  897. }
  898. bool FVDataSource::getRow(MemoryBuffer & out, __int64 row)
  899. {
  900. size32_t length;
  901. const void * data;
  902. unsigned __int64 offset = 0;
  903. if (getRowData(row, length, data, offset))
  904. {
  905. copyRow(out, data, length);
  906. if (appendFileposition)
  907. out.append(offset);
  908. return true;
  909. }
  910. return false;
  911. }
  912. bool FVDataSource::getRawRow(MemoryBuffer & out, __int64 row)
  913. {
  914. size32_t length;
  915. const void * data;
  916. unsigned __int64 offset = 0;
  917. if (getRowData(row, length, data, offset))
  918. {
  919. out.append(length-extraFieldsSize, data);
  920. if (appendFileposition)
  921. out.append(offset);
  922. return true;
  923. }
  924. return false;
  925. }
  926. void FVDataSource::loadDll(const char * wuid)
  927. {
  928. //MORE: This code should be commoned up and available in the work unit code or something...
  929. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  930. Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid);
  931. //Plugins should already be loaded when they were registered with the ViewTransformerRegistry
  932. //Something like the following code could be used to check the plugin version...
  933. #if 0
  934. Owned<IConstWUPluginIterator> plugins = &wu->getPlugins();
  935. SafePluginMap * map = NULL;
  936. ForEach(*plugins)
  937. {
  938. IConstWUPlugin &thisplugin = plugins->query();
  939. SCMStringBuffer name, version;
  940. thisplugin.getPluginName(name);
  941. thisplugin.getPluginVersion(version);
  942. Owned<ILoadedDllEntry> loadedPlugin = map->getPluginDll(name.str(), version.str(), true);
  943. if (!loadedPlugin)
  944. break;
  945. }
  946. #endif
  947. Owned<IConstWUQuery> q = wu->getQuery();
  948. SCMStringBuffer dllname;
  949. q->getQueryDllName(dllname);
  950. // MORE - leaks....
  951. loadedDll.setown(queryDllServer().loadDll(dllname.str(), DllLocationAnywhere));
  952. }
  953. IFvDataSourceMetaData * FVDataSource::queryMetaData()
  954. {
  955. return transformedMeta;
  956. }
  957. bool FVDataSource::setReturnedInfoFromResult()
  958. {
  959. SCMStringBuffer s;
  960. wuResult->getResultEclSchema(s);
  961. returnedRecord.setown(parseQuery(s.str()));
  962. if (!returnedRecord)
  963. throw MakeStringException(FVERR_CouldNotProcessSchema, "Could not process result schema [%s]", s.str());
  964. bool isKey = false;
  965. bool isGrouped = false; // this isn't strictly true...it could be true for an internal result, but no current flag to test
  966. returnedMeta.setown(new DataSourceMetaData(returnedRecord, 0, true, isGrouped, 0));
  967. transformedRecord.setown(getFileViewerRecord(returnedRecord, isKey));
  968. if (!transformedRecord)
  969. {
  970. //No transformations needed, so don't need to loaded the dll containing the transform functions etc.
  971. transformedRecord.set(returnedRecord);
  972. returnedRecordSize.set(returnedMeta);
  973. }
  974. else
  975. {
  976. loadDll(wuid);
  977. s.clear();
  978. wuResult->getResultRecordSizeEntry(s);
  979. if (s.length())
  980. {
  981. typedef IRecordSize * (* recSizeFunc)();
  982. recSizeFunc func = (recSizeFunc)loadedDll->getEntry(s.str());
  983. Owned<IRecordSize> createdRecordSize = func();
  984. returnedRecordSize.setown(new RecordSizeToEx(createdRecordSize));
  985. }
  986. s.clear();
  987. wuResult->getResultTransformerEntry(s);
  988. if (s.length())
  989. transformer = (rowTransformFunction)loadedDll->getEntry(s.str());
  990. }
  991. transformedMeta.setown(new DataSourceMetaData(transformedRecord, 0, true, isGrouped, 0));
  992. return (returnedRecordSize != NULL);
  993. }
  994. //---------------------------------------------------------------------------
  995. __int64 PagedDataSource::numRows(bool force)
  996. {
  997. if (force && (totalRows == UNKNOWN_NUM_ROWS))
  998. {
  999. //MORE: Need to go and work it out - however painful...
  1000. }
  1001. return totalRows;
  1002. }
  1003. bool PagedDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  1004. {
  1005. if ((row < 0) || ((unsigned __int64)row > totalRows))
  1006. return false;
  1007. RowLocation location;
  1008. for (;;)
  1009. {
  1010. if (cache.getCacheRow(row, location))
  1011. {
  1012. length = location.matchLength;
  1013. data = location.matchRow;
  1014. offset = location.bestOffset;
  1015. return true;
  1016. }
  1017. improveLocation(row, location);
  1018. if (!loadBlock(location.bestRow, location.bestOffset))
  1019. return false;
  1020. }
  1021. }
  1022. void PagedDataSource::improveLocation(__int64 row, RowLocation & location)
  1023. {
  1024. }
  1025. //---------------------------------------------------------------------------
  1026. NestedDataSource::NestedDataSource(DataSourceMetaData & meta, unsigned len, const void * data)
  1027. {
  1028. returnedMeta.set(&meta);
  1029. returnedRecordSize.set(returnedMeta);
  1030. transformedMeta.set(returnedMeta);
  1031. totalSize = len;
  1032. MemoryBuffer temp;
  1033. temp.append(len, data);
  1034. if (returnedMeta->isFixedSize())
  1035. rows.setown(new FixedRowBlock(temp, 0, 0, returnedMeta->fixedSize()));
  1036. else
  1037. rows.setown(new VariableRowBlock(temp, 0, 0, returnedRecordSize, true));
  1038. }
  1039. bool NestedDataSource::init()
  1040. {
  1041. return true;
  1042. }
  1043. __int64 NestedDataSource::numRows(bool force)
  1044. {
  1045. return rows->getNextRow() - rows->getStartRow();
  1046. }
  1047. bool NestedDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  1048. {
  1049. data = rows->getRow(row, length, offset);
  1050. return (data != NULL);
  1051. }
  1052. //---------------------------------------------------------------------------
  1053. NullDataSource::NullDataSource(IHqlExpression * _record, bool _isGrouped, unsigned _keyedSize)
  1054. : meta(_record, 0, true, _isGrouped, _keyedSize)
  1055. {
  1056. }
  1057. FailureDataSource::FailureDataSource(IHqlExpression * _record, IException * _error, bool _isGrouped, unsigned _keyedSize)
  1058. : NullDataSource(_record, _isGrouped, _keyedSize), error(_error)
  1059. {
  1060. }
  1061. //---------------------------------------------------------------------------
  1062. IHqlExpression * parseQuery(const char * text)
  1063. {
  1064. MultiErrorReceiver errs;
  1065. OwnedHqlExpr ret = ::parseQuery(text, &errs);
  1066. if (errs.errCount() == 0)
  1067. return ret.getClear();
  1068. for (unsigned i=0; i < errs.errCount(); i++)
  1069. {
  1070. StringBuffer msg;
  1071. PrintLog("%d %s", errs.item(i)->getLine(), errs.item(i)->errorMessage(msg).str());
  1072. }
  1073. return NULL;
  1074. }
  1075. IFvDataSourceMetaData * createMetaData(IConstWUResult * wuResult)
  1076. {
  1077. SCMStringBuffer s;
  1078. wuResult->getResultEclSchema(s);
  1079. OwnedHqlExpr record = parseQuery(s.str());
  1080. if (!record)
  1081. throw MakeStringException(FVERR_CouldNotProcessSchema, "Could not process result schema [%s]", s.str());
  1082. OwnedHqlExpr simplifiedRecord = getFileViewerRecord(record, false);
  1083. bool isGrouped = false; // more not sure this is strictly true...
  1084. if (!simplifiedRecord)
  1085. return new DataSourceMetaData(record, 0, true, isGrouped, 0);
  1086. return new DataSourceMetaData(simplifiedRecord, 0, true, isGrouped, 0);
  1087. }