fvidxsource.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "eclrtl_imp.hpp"
  15. #include "hqlexpr.hpp"
  16. #include "fileview.hpp"
  17. #include "fvresultset.ipp"
  18. #include "fvidxsource.ipp"
  19. #include "fverror.hpp"
  20. #include "dasess.hpp"
  21. #include "rtlrecord.hpp"
  22. #include "rtldynfield.hpp"
  23. //cloned from hthor - a candidate for commoning up.
  24. static IKeyIndex *openKeyFile(IDistributedFilePart *keyFile)
  25. {
  26. unsigned numCopies = keyFile->numCopies();
  27. assertex(numCopies);
  28. for (unsigned copy=0; copy < numCopies; copy++)
  29. {
  30. RemoteFilename rfn;
  31. try
  32. {
  33. OwnedIFile ifile = createIFile(keyFile->getFilename(rfn,copy));
  34. unsigned __int64 thissize = ifile->size();
  35. if (thissize != -1)
  36. {
  37. StringBuffer remotePath;
  38. rfn.getRemotePath(remotePath);
  39. unsigned crc = 0;
  40. keyFile->getCrc(crc);
  41. return createKeyIndex(remotePath.str(), crc, false, false);
  42. }
  43. }
  44. catch (IException *E)
  45. {
  46. EXCLOG(E, "While opening index file");
  47. E->Release();
  48. }
  49. }
  50. RemoteFilename rfn;
  51. StringBuffer url;
  52. keyFile->getFilename(rfn).getRemotePath(url);
  53. throw MakeStringException(1001, "Could not open key file at %s%s", url.str(), (numCopies > 1) ? " or any alternate location." : ".");
  54. }
  55. //---------------------------------------------------------------------------
  // Size bounds for IndexPageCache. When a further sequential row is added while
  // MAX_CACHED_ROWS rows are already cached, the oldest rows are evicted so that
  // MIN_CACHED_ROWS remain before the new row is appended.
  static constexpr unsigned MIN_CACHED_ROWS = 150;
  static constexpr unsigned MAX_CACHED_ROWS = 200;
  // Eviction computes numRowsCached() - MIN_CACHED_ROWS, so MIN must be strictly below MAX.
  static_assert(MIN_CACHED_ROWS < MAX_CACHED_ROWS, "IndexPageCache bounds must satisfy MIN < MAX");
  58. IndexPageCache::IndexPageCache()
  59. {
  60. firstRow = 0;
  61. offsetDelta = 0;
  62. offsets.append(0);
  63. saved.ensureCapacity(0x10000);
  64. }
  65. void IndexPageCache::addRow(__int64 row, size32_t len, const void * data)
  66. {
  67. if (row != firstRow + numRowsCached())
  68. {
  69. firstRow = row;
  70. offsetDelta = 0;
  71. offsets.kill();
  72. offsets.append(0);
  73. saved.setWritePos(0);
  74. }
  75. else if (numRowsCached() >= MAX_CACHED_ROWS)
  76. {
  77. unsigned numToRemove = numRowsCached() - MIN_CACHED_ROWS;
  78. __int64 newDelta = offsets.item(numToRemove);
  79. size32_t sizeLost = (size32_t)(newDelta-offsetDelta);
  80. //copy the cached rows
  81. byte * base = (byte *)saved.bufferBase();
  82. memmove(base, base+sizeLost, saved.length()-sizeLost);
  83. saved.setWritePos(saved.length()-sizeLost);
  84. offsets.removen(0, numToRemove);
  85. firstRow += numToRemove;
  86. offsetDelta = newDelta;
  87. }
  88. assertex(row == firstRow + numRowsCached());
  89. assertex(offsets.tos() == saved.length() + offsetDelta);
  90. saved.append(len, data);
  91. offsets.append(saved.length() + offsetDelta);
  92. }
  93. bool IndexPageCache::getRow(__int64 row, size32_t & len, const void * & data)
  94. {
  95. if (row < firstRow || row >= firstRow + numRowsCached())
  96. return false;
  97. unsigned __int64 startOffset = offsets.item((unsigned)(row-firstRow));
  98. unsigned __int64 endOffset = offsets.item((unsigned)(row-firstRow+1));
  99. len = (size32_t)(endOffset - startOffset);
  100. data = saved.toByteArray() + (unsigned)(startOffset - offsetDelta);
  101. return true;
  102. }
  103. //---------------------------------------------------------------------------
  104. //Could probably cope with unknown record, just displaying as binary.
  105. IndexDataSource::IndexDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char* _username, const char* _password)
  106. {
  107. logicalName.set(_logicalName);
  108. diskRecord.set(_diskRecord);
  109. deserializer.setown(createRtlFieldTypeDeserializer(nullptr));
  110. diskRecordMeta.setown(new CDynamicOutputMetaData(* static_cast<const RtlRecordTypeInfo *>(queryRtlType(*deserializer.get(), diskRecord))));
  111. Owned<IUserDescriptor> udesc;
  112. if(_username != NULL && *_username != '\0')
  113. {
  114. udesc.setown(createUserDescriptor());
  115. udesc->set(_username, _password);
  116. }
  117. df.setown(queryDistributedFileDirectory().lookup(logicalName, udesc.get()));
  118. filtered = false;
  119. }
  120. IndexDataSource::IndexDataSource(IndexDataSource * _other)
  121. {
  122. logicalName.set(_other->logicalName);
  123. diskRecord.set(_other->diskRecord);
  124. deserializer.set(_other->deserializer);
  125. diskRecordMeta.set(_other->diskRecordMeta);
  126. df.set(_other->df);
  127. original.set(_other); // stop any work units etc. being unloaded.
  128. diskMeta.set(_other->diskMeta); // optimization - would be handled by init anyway
  129. filtered = false;
  130. //MORE: What else needs cloning/initializing?
  131. }
  132. IFvDataSource * IndexDataSource::cloneForFilter()
  133. {
  134. Owned<IndexDataSource> ret = new IndexDataSource(this);
  135. if (ret->init())
  136. return ret.getClear();
  137. return NULL;
  138. }
  139. bool IndexDataSource::init()
  140. {
  141. if (!df)
  142. return false;
  143. numParts = df->numParts();
  144. singlePart = (numParts == 1);
  145. ignoreSkippedRows = true; // better if skipping to a particular point
  146. StringBuffer partName;
  147. Owned<IDistributedFilePart> kf = df->getPart(numParts-1);
  148. tlk.setown(openKeyFile(kf));
  149. if (!tlk)
  150. return false;
  151. IPropertyTree & properties = df->queryAttributes();
  152. //Need to assign the transformed record to meta
  153. if (!diskMeta)
  154. diskMeta.setown(new DataSourceMetaData(diskRecord, 0, true, false, tlk->keyedSize()));
  155. if (!returnedMeta)
  156. {
  157. returnedMeta.set(diskMeta);
  158. returnedRecordSize.set(returnedMeta);
  159. }
  160. if (!transformedMeta)
  161. transformedMeta.set(returnedMeta);
  162. if (properties.hasProp("@recordCount"))
  163. totalRows = properties.getPropInt64("@recordCount");
  164. else
  165. totalRows = UNKNOWN_NUM_ROWS; // more: could probably count them
  166. isLocal = properties.hasProp("@local");
  167. diskMeta->extractKeyedInfo(keyedOffsets, keyedTypes);
  168. ForEachItemIn(i, keyedTypes)
  169. {
  170. IStringSet * set = createRtlStringSet(fieldSize(i));
  171. set->addAll();
  172. values.append(*set);
  173. }
  174. diskMeta->patchIndexFileposition(); // Now returned as a bigendian field on the end of the row
  175. //Default cursor if no filter is applied
  176. applyFilter();
  177. return true;
  178. }
  179. __int64 IndexDataSource::numRows(bool force)
  180. {
  181. if (!filtered)
  182. return totalRows;
  183. //If leading component isn't filtered, then this can take a very long time...
  184. if (!force && values.item(0).isFullSet())
  185. return UNKNOWN_NUM_ROWS;
  186. __int64 total = 0;
  187. ForEachItemIn(i, matchingParts)
  188. {
  189. manager->setKey(NULL);
  190. curPart.clear();
  191. if (singlePart)
  192. curPart.set(tlk);
  193. else
  194. {
  195. Owned<IDistributedFilePart> kf = df->getPart(matchingParts.item(i));
  196. curPart.setown(openKeyFile(kf));
  197. if (!curPart)
  198. {
  199. total = UNKNOWN_NUM_ROWS;
  200. break;
  201. }
  202. }
  203. manager->setKey(curPart);
  204. manager->reset();
  205. total += manager->getCount();
  206. }
  207. manager->setKey(NULL);
  208. curPart.clear();
  209. resetCursor();
  210. return total;
  211. }
  212. bool IndexDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  213. {
  214. if (cache.getRow(row, length, data))
  215. return true;
  216. if (row < 0)
  217. return false;
  218. if ((unsigned __int64)row < nextRowToRead)
  219. resetCursor();
  220. MemoryBuffer temp;
  221. while ((unsigned __int64)row >= nextRowToRead)
  222. {
  223. bool saveRow = !ignoreSkippedRows || (row == nextRowToRead);
  224. if (!getNextRow(temp.clear(), saveRow))
  225. return false;
  226. if (saveRow)
  227. cache.addRow(nextRowToRead, temp.length(), temp.toByteArray());
  228. nextRowToRead++;
  229. }
  230. return cache.getRow(row, length, data);
  231. }
  232. bool IndexDataSource::getNextRow(MemoryBuffer & out, bool extractRow)
  233. {
  234. bool nextPart = !matchingParts.isItem((unsigned)curPartIndex);
  235. for (;;)
  236. {
  237. if (nextPart)
  238. {
  239. if ((curPartIndex != -1) && ((unsigned)curPartIndex >= matchingParts.ordinality()))
  240. return false;
  241. manager->setKey(NULL);
  242. curPart.clear();
  243. if ((unsigned)++curPartIndex >= matchingParts.ordinality())
  244. return false;
  245. if (singlePart)
  246. curPart.set(tlk);
  247. else
  248. {
  249. Owned<IDistributedFilePart> kf = df->getPart(matchingParts.item(curPartIndex));
  250. curPart.setown(openKeyFile(kf));
  251. if (!curPart)
  252. return false;
  253. }
  254. manager->setKey(curPart);
  255. manager->reset();
  256. }
  257. if (manager->lookup(true))
  258. {
  259. if (extractRow)
  260. {
  261. if (false)
  262. {
  263. //MORE: Allow a transformer to cope with blobs etc.
  264. }
  265. else
  266. {
  267. const byte * thisRow = manager->queryKeyBuffer();
  268. unsigned thisSize = diskMeta->getRecordSize(thisRow);
  269. void * temp = out.reserve(thisSize);
  270. memcpy(temp, thisRow, thisSize);
  271. }
  272. }
  273. return true;
  274. }
  275. nextPart = true;
  276. }
  277. }
  278. void IndexDataSource::resetCursor()
  279. {
  280. curPartIndex = -1;
  281. nextRowToRead = 0;
  282. }
  283. bool IndexDataSource::addFilter(unsigned column, unsigned matchLen, unsigned sizeData, const void * data)
  284. {
  285. if (!values.isItem(column))
  286. return false;
  287. unsigned curSize = fieldSize(column);
  288. IStringSet & set = values.item(column);
  289. if (set.isFullSet())
  290. set.reset();
  291. ITypeInfo & cur = keyedTypes.item(column);
  292. rtlDataAttr tempLow, tempHigh;
  293. unsigned keyedSize = cur.getSize();
  294. byte * temp = (byte *)alloca(keyedSize);
  295. const void * low = data;
  296. const void * high = data;
  297. type_t tc = cur.getTypeCode();
  298. switch (tc)
  299. {
  300. case type_int:
  301. case type_swapint:
  302. {
  303. assertex(sizeData == curSize);
  304. // values are already converted to bigendian and correctly biased
  305. break;
  306. }
  307. case type_varstring:
  308. //should cast from string to varstring
  309. break;
  310. case type_string:
  311. case type_data:
  312. {
  313. const char * inbuff = (const char *)data;
  314. if (matchLen != FullStringMatch)
  315. {
  316. unsigned lenLow, lenHigh;
  317. rtlCreateRangeLow(lenLow, tempLow.refstr(), curSize, matchLen, sizeData, inbuff);
  318. rtlCreateRangeHigh(lenHigh, tempHigh.refstr(), curSize, matchLen, sizeData, inbuff);
  319. low = tempLow.getdata();
  320. high = tempHigh.getdata();
  321. }
  322. else
  323. {
  324. if (tc == type_string)
  325. {
  326. //may need to cast from ascii to ebcidic
  327. rtlStrToStr(curSize, temp, sizeData, data);
  328. }
  329. else
  330. rtlDataToData(curSize, temp, sizeData, data);
  331. low = high = temp;
  332. }
  333. break;
  334. }
  335. case type_qstring:
  336. {
  337. const char * inbuff = (const char *)data;
  338. unsigned lenData = rtlQStrLength(sizeData);
  339. unsigned lenField = cur.getStringLen();
  340. if (matchLen != FullStringMatch)
  341. {
  342. unsigned lenLow, lenHigh;
  343. rtlCreateQStrRangeLow(lenLow, tempLow.refstr(), lenField, matchLen, lenData, inbuff);
  344. rtlCreateQStrRangeHigh(lenHigh, tempHigh.refstr(), lenField, matchLen, lenData, inbuff);
  345. low = tempLow.getdata();
  346. high = tempHigh.getdata();
  347. }
  348. else
  349. {
  350. rtlQStrToQStr(lenField, (char *)temp, lenData, (const char *)data);
  351. low = high = temp;
  352. }
  353. break;
  354. }
  355. case type_unicode:
  356. {
  357. const char * inbuff = (const char *)data;
  358. unsigned lenData = sizeData / sizeof(UChar);
  359. unsigned lenField = cur.getStringLen();
  360. if (matchLen != FullStringMatch)
  361. {
  362. unsigned lenLow, lenHigh;
  363. rtlCreateUnicodeRangeLow(lenLow, tempLow.refustr(), lenField, matchLen, lenData, (const UChar *)inbuff);
  364. rtlCreateUnicodeRangeHigh(lenHigh, tempHigh.refustr(), lenField, matchLen, lenData, (const UChar *)inbuff);
  365. low = tempLow.getdata();
  366. high = tempHigh.getdata();
  367. }
  368. else
  369. {
  370. rtlUnicodeToUnicode(lenField, (UChar *)temp, lenData, (const UChar *)data);
  371. low = high = temp;
  372. }
  373. break;
  374. }
  375. default:
  376. assertex(sizeData == curSize);
  377. break;
  378. }
  379. set.addRange(low, high);
  380. filtered = true;
  381. return true;
  382. }
  383. void IndexDataSource::applyFilter()
  384. {
  385. manager.setown(createLocalKeyManager(diskRecordMeta->queryRecordAccessor(true), tlk, NULL));
  386. ForEachItemIn(i, values)
  387. {
  388. IStringSet & cur = values.item(i);
  389. bool extend = true; // almost certainly better
  390. manager->append(createKeySegmentMonitor(extend, LINK(&cur), i, keyedOffsets.item(i), fieldSize(i)));
  391. }
  392. manager->finishSegmentMonitors();
  393. //Now work out which parts are affected.
  394. matchingParts.kill();
  395. if (singlePart)
  396. matchingParts.append(0);
  397. else if (isLocal)
  398. {
  399. for (unsigned i = 0; i < numParts; i++)
  400. matchingParts.append(i);
  401. }
  402. else
  403. {
  404. manager->reset();
  405. while (manager->lookup(false))
  406. {
  407. offset_t node = extractFpos(manager);
  408. if (node)
  409. matchingParts.append((unsigned)(node-1));
  410. }
  411. }
  412. resetCursor();
  413. }