fvidxsource.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "eclrtl_imp.hpp"
  15. #include "hqlexpr.hpp"
  16. #include "fileview.hpp"
  17. #include "fvresultset.ipp"
  18. #include "fvidxsource.ipp"
  19. #include "fverror.hpp"
  20. #include "dasess.hpp"
  21. //cloned from hthor - a candidate for commoning up.
  22. static IKeyIndex *openKeyFile(IDistributedFilePart *keyFile)
  23. {
  24. unsigned numCopies = keyFile->numCopies();
  25. assertex(numCopies);
  26. for (unsigned copy=0; copy < numCopies; copy++)
  27. {
  28. RemoteFilename rfn;
  29. try
  30. {
  31. OwnedIFile ifile = createIFile(keyFile->getFilename(rfn,copy));
  32. unsigned __int64 thissize = ifile->size();
  33. if (thissize != -1)
  34. {
  35. StringBuffer remotePath;
  36. rfn.getRemotePath(remotePath);
  37. unsigned crc = 0;
  38. keyFile->getCrc(crc);
  39. return createKeyIndex(remotePath.str(), crc, false, false);
  40. }
  41. }
  42. catch (IException *E)
  43. {
  44. EXCLOG(E, "While opening index file");
  45. E->Release();
  46. }
  47. }
  48. RemoteFilename rfn;
  49. StringBuffer url;
  50. keyFile->getFilename(rfn).getRemotePath(url);
  51. throw MakeStringException(1001, "Could not open key file at %s%s", url.str(), (numCopies > 1) ? " or any alternate location." : ".");
  52. }
  53. //---------------------------------------------------------------------------
  54. #define MIN_CACHED_ROWS 150
  55. #define MAX_CACHED_ROWS 200
  56. IndexPageCache::IndexPageCache()
  57. {
  58. firstRow = 0;
  59. offsetDelta = 0;
  60. offsets.append(0);
  61. saved.ensureCapacity(0x10000);
  62. }
  63. void IndexPageCache::addRow(__int64 row, size32_t len, const void * data)
  64. {
  65. if (row != firstRow + numRowsCached())
  66. {
  67. firstRow = row;
  68. offsetDelta = 0;
  69. offsets.kill();
  70. offsets.append(0);
  71. saved.setWritePos(0);
  72. }
  73. else if (numRowsCached() >= MAX_CACHED_ROWS)
  74. {
  75. unsigned numToRemove = numRowsCached() - MIN_CACHED_ROWS;
  76. __int64 newDelta = offsets.item(numToRemove);
  77. size32_t sizeLost = (size32_t)(newDelta-offsetDelta);
  78. //copy the cached rows
  79. byte * base = (byte *)saved.bufferBase();
  80. memmove(base, base+sizeLost, saved.length()-sizeLost);
  81. saved.setWritePos(saved.length()-sizeLost);
  82. offsets.removen(0, numToRemove);
  83. firstRow += numToRemove;
  84. offsetDelta = newDelta;
  85. }
  86. assertex(row == firstRow + numRowsCached());
  87. assertex(offsets.tos() == saved.length() + offsetDelta);
  88. saved.append(len, data);
  89. offsets.append(saved.length() + offsetDelta);
  90. }
  91. bool IndexPageCache::getRow(__int64 row, size32_t & len, const void * & data)
  92. {
  93. if (row < firstRow || row >= firstRow + numRowsCached())
  94. return false;
  95. unsigned __int64 startOffset = offsets.item((unsigned)(row-firstRow));
  96. unsigned __int64 endOffset = offsets.item((unsigned)(row-firstRow+1));
  97. len = (size32_t)(endOffset - startOffset);
  98. data = saved.toByteArray() + (unsigned)(startOffset - offsetDelta);
  99. return true;
  100. }
  101. //---------------------------------------------------------------------------
  102. //Could probably cope with unknown record, just displaying as binary.
  103. IndexDataSource::IndexDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char* _username, const char* _password)
  104. {
  105. logicalName.set(_logicalName);
  106. diskRecord.set(_diskRecord);
  107. Owned<IUserDescriptor> udesc;
  108. if(_username != NULL && *_username != '\0')
  109. {
  110. udesc.setown(createUserDescriptor());
  111. udesc->set(_username, _password);
  112. }
  113. df.setown(queryDistributedFileDirectory().lookup(logicalName, udesc.get()));
  114. filtered = false;
  115. }
  116. IndexDataSource::IndexDataSource(IndexDataSource * _other)
  117. {
  118. logicalName.set(_other->logicalName);
  119. diskRecord.set(_other->diskRecord);
  120. df.set(_other->df);
  121. original.set(_other); // stop any work units etc. being unloaded.
  122. diskMeta.set(_other->diskMeta); // optimization - would be handled by init anyway
  123. filtered = false;
  124. //MORE: What else needs cloning/initializing?
  125. }
  126. IFvDataSource * IndexDataSource::cloneForFilter()
  127. {
  128. Owned<IndexDataSource> ret = new IndexDataSource(this);
  129. if (ret->init())
  130. return ret.getClear();
  131. return NULL;
  132. }
  133. bool IndexDataSource::init()
  134. {
  135. if (!df)
  136. return false;
  137. numParts = df->numParts();
  138. singlePart = (numParts == 1);
  139. ignoreSkippedRows = true; // better if skipping to a particular point
  140. StringBuffer partName;
  141. Owned<IDistributedFilePart> kf = df->getPart(numParts-1);
  142. tlk.setown(openKeyFile(kf));
  143. if (!tlk)
  144. return false;
  145. IPropertyTree & properties = df->queryAttributes();
  146. //Need to assign the transformed record to meta
  147. if (!diskMeta)
  148. diskMeta.setown(new DataSourceMetaData(diskRecord, 0, true, false, tlk->keyedSize()));
  149. if (!returnedMeta)
  150. {
  151. returnedMeta.set(diskMeta);
  152. returnedRecordSize.set(returnedMeta);
  153. }
  154. if (!transformedMeta)
  155. transformedMeta.set(returnedMeta);
  156. if (properties.hasProp("@recordCount"))
  157. totalRows = properties.getPropInt64("@recordCount");
  158. else
  159. totalRows = UNKNOWN_NUM_ROWS; // more: could probably count them
  160. isLocal = properties.hasProp("@local");
  161. diskMeta->extractKeyedInfo(keyedOffsets, keyedTypes);
  162. ForEachItemIn(i, keyedTypes)
  163. {
  164. IStringSet * set = createRtlStringSet(fieldSize(i));
  165. set->addAll();
  166. values.append(*set);
  167. }
  168. fileposFieldType = diskMeta->queryType(diskMeta->numColumns()-1);
  169. assertex(fileposFieldType && fileposFieldType->isInteger());
  170. //Now gather all the
  171. //Default cursor if no filter is applied
  172. applyFilter();
  173. return true;
  174. }
  175. __int64 IndexDataSource::numRows(bool force)
  176. {
  177. if (!filtered)
  178. return totalRows;
  179. //If leading component isn't filtered, then this can take a very long time...
  180. if (!force && values.item(0).isFullSet())
  181. return UNKNOWN_NUM_ROWS;
  182. __int64 total = 0;
  183. ForEachItemIn(i, matchingParts)
  184. {
  185. manager->setKey(NULL);
  186. curPart.clear();
  187. if (singlePart)
  188. curPart.set(tlk);
  189. else
  190. {
  191. Owned<IDistributedFilePart> kf = df->getPart(matchingParts.item(i));
  192. curPart.setown(openKeyFile(kf));
  193. if (!curPart)
  194. {
  195. total = UNKNOWN_NUM_ROWS;
  196. break;
  197. }
  198. }
  199. manager->setKey(curPart);
  200. manager->reset();
  201. total += manager->getCount();
  202. }
  203. manager->setKey(NULL);
  204. curPart.clear();
  205. resetCursor();
  206. return total;
  207. }
  208. bool IndexDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  209. {
  210. if (cache.getRow(row, length, data))
  211. return true;
  212. if (row < 0)
  213. return false;
  214. if ((unsigned __int64)row < nextRowToRead)
  215. resetCursor();
  216. MemoryBuffer temp;
  217. while ((unsigned __int64)row >= nextRowToRead)
  218. {
  219. bool saveRow = !ignoreSkippedRows || (row == nextRowToRead);
  220. if (!getNextRow(temp.clear(), saveRow))
  221. return false;
  222. if (saveRow)
  223. cache.addRow(nextRowToRead, temp.length(), temp.toByteArray());
  224. nextRowToRead++;
  225. }
  226. return cache.getRow(row, length, data);
  227. }
  228. bool IndexDataSource::getNextRow(MemoryBuffer & out, bool extractRow)
  229. {
  230. bool nextPart = !matchingParts.isItem((unsigned)curPartIndex);
  231. for (;;)
  232. {
  233. if (nextPart)
  234. {
  235. if ((curPartIndex != -1) && ((unsigned)curPartIndex >= matchingParts.ordinality()))
  236. return false;
  237. manager->setKey(NULL);
  238. curPart.clear();
  239. if ((unsigned)++curPartIndex >= matchingParts.ordinality())
  240. return false;
  241. if (singlePart)
  242. curPart.set(tlk);
  243. else
  244. {
  245. Owned<IDistributedFilePart> kf = df->getPart(matchingParts.item(curPartIndex));
  246. curPart.setown(openKeyFile(kf));
  247. if (!curPart)
  248. return false;
  249. }
  250. manager->setKey(curPart);
  251. manager->reset();
  252. }
  253. if (manager->lookup(true))
  254. {
  255. if (extractRow)
  256. {
  257. if (false)
  258. {
  259. //MORE: Allow a transformer to cope with blobs etc.
  260. }
  261. else
  262. {
  263. unsigned fileposSize = fileposFieldType->getSize();
  264. // unsigned thisSize = manager->queryRecordSize(); // Should be possible - needs a new function to call cursor->getSize()
  265. offset_t filepos;
  266. const byte * thisRow = manager->queryKeyBuffer(filepos);
  267. unsigned thisSize = diskMeta->getRecordSize(thisRow) - fileposSize;
  268. void * temp = out.reserve(thisSize);
  269. memcpy(temp, thisRow, thisSize);
  270. //Append the fileposition, in the correct size/endianness
  271. assertex(sizeof(filepos) >= 8);
  272. #if __BYTE_ORDER == __LITTLE_ENDIAN
  273. void * data = &filepos;
  274. #else
  275. void * data = (byte *)&filepos + sizeof(filepos) - fileposSize;
  276. #endif
  277. if (fileposFieldType->isSwappedEndian())
  278. out.appendSwap(fileposSize, data);
  279. else
  280. out.append(fileposSize, data);
  281. }
  282. }
  283. return true;
  284. }
  285. nextPart = true;
  286. }
  287. }
  288. void IndexDataSource::resetCursor()
  289. {
  290. curPartIndex = -1;
  291. nextRowToRead = 0;
  292. }
  293. bool IndexDataSource::addFilter(unsigned column, unsigned matchLen, unsigned sizeData, const void * data)
  294. {
  295. if (!values.isItem(column))
  296. return false;
  297. unsigned curSize = fieldSize(column);
  298. IStringSet & set = values.item(column);
  299. if (set.isFullSet())
  300. set.reset();
  301. ITypeInfo & cur = keyedTypes.item(column);
  302. rtlDataAttr tempLow, tempHigh;
  303. unsigned keyedSize = cur.getSize();
  304. byte * temp = (byte *)alloca(keyedSize);
  305. const void * low = data;
  306. const void * high = data;
  307. type_t tc = cur.getTypeCode();
  308. switch (tc)
  309. {
  310. case type_int:
  311. case type_swapint:
  312. {
  313. assertex(sizeData == curSize);
  314. // values are already converted to bigendian and correctly biased
  315. break;
  316. }
  317. case type_varstring:
  318. //should cast from string to varstring
  319. break;
  320. case type_string:
  321. case type_data:
  322. {
  323. const char * inbuff = (const char *)data;
  324. if (matchLen != FullStringMatch)
  325. {
  326. unsigned lenLow, lenHigh;
  327. rtlCreateRangeLow(lenLow, tempLow.refstr(), curSize, matchLen, sizeData, inbuff);
  328. rtlCreateRangeHigh(lenHigh, tempHigh.refstr(), curSize, matchLen, sizeData, inbuff);
  329. low = tempLow.getdata();
  330. high = tempHigh.getdata();
  331. }
  332. else
  333. {
  334. if (tc == type_string)
  335. {
  336. //may need to cast from ascii to ebcidic
  337. rtlStrToStr(curSize, temp, sizeData, data);
  338. }
  339. else
  340. rtlDataToData(curSize, temp, sizeData, data);
  341. low = high = temp;
  342. }
  343. break;
  344. }
  345. case type_qstring:
  346. {
  347. const char * inbuff = (const char *)data;
  348. unsigned lenData = rtlQStrLength(sizeData);
  349. unsigned lenField = cur.getStringLen();
  350. if (matchLen != FullStringMatch)
  351. {
  352. unsigned lenLow, lenHigh;
  353. rtlCreateQStrRangeLow(lenLow, tempLow.refstr(), lenField, matchLen, lenData, inbuff);
  354. rtlCreateQStrRangeHigh(lenHigh, tempHigh.refstr(), lenField, matchLen, lenData, inbuff);
  355. low = tempLow.getdata();
  356. high = tempHigh.getdata();
  357. }
  358. else
  359. {
  360. rtlQStrToQStr(lenField, (char *)temp, lenData, (const char *)data);
  361. low = high = temp;
  362. }
  363. break;
  364. }
  365. case type_unicode:
  366. {
  367. const char * inbuff = (const char *)data;
  368. unsigned lenData = sizeData / sizeof(UChar);
  369. unsigned lenField = cur.getStringLen();
  370. if (matchLen != FullStringMatch)
  371. {
  372. unsigned lenLow, lenHigh;
  373. rtlCreateUnicodeRangeLow(lenLow, tempLow.refustr(), lenField, matchLen, lenData, (const UChar *)inbuff);
  374. rtlCreateUnicodeRangeHigh(lenHigh, tempHigh.refustr(), lenField, matchLen, lenData, (const UChar *)inbuff);
  375. low = tempLow.getdata();
  376. high = tempHigh.getdata();
  377. }
  378. else
  379. {
  380. rtlUnicodeToUnicode(lenField, (UChar *)temp, lenData, (const UChar *)data);
  381. low = high = temp;
  382. }
  383. break;
  384. }
  385. default:
  386. assertex(sizeData == curSize);
  387. break;
  388. }
  389. set.addRange(low, high);
  390. filtered = true;
  391. return true;
  392. }
  393. void IndexDataSource::applyFilter()
  394. {
  395. manager.setown(createLocalKeyManager(tlk, tlk->keySize(), NULL));
  396. ForEachItemIn(i, values)
  397. {
  398. IStringSet & cur = values.item(i);
  399. bool extend = true; // almost certainly better
  400. manager->append(createKeySegmentMonitor(extend, LINK(&cur), keyedOffsets.item(i), fieldSize(i)));
  401. }
  402. manager->finishSegmentMonitors();
  403. //Now work out which parts are affected.
  404. matchingParts.kill();
  405. if (singlePart)
  406. matchingParts.append(0);
  407. else if (isLocal)
  408. {
  409. for (unsigned i = 0; i < numParts; i++)
  410. matchingParts.append(i);
  411. }
  412. else
  413. {
  414. manager->reset();
  415. while (manager->lookup(false))
  416. {
  417. offset_t node = manager->queryFpos();
  418. if (node)
  419. matchingParts.append((unsigned)(node-1));
  420. }
  421. }
  422. resetCursor();
  423. }