fvidxsource.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "eclrtl_imp.hpp"
  15. #include "hqlexpr.hpp"
  16. #include "fileview.hpp"
  17. #include "fvresultset.ipp"
  18. #include "fvidxsource.ipp"
  19. #include "fverror.hpp"
  20. #include "dasess.hpp"
  21. #include "rtlrecord.hpp"
  22. #include "rtldynfield.hpp"
  23. #define MAX_FILE_READ_FAIL_COUNT 3
  24. //cloned from hthor - a candidate for commoning up.
  25. static IKeyIndex *openKeyFile(IDistributedFilePart *keyFile)
  26. {
  27. unsigned failcount = 0;
  28. unsigned numCopies = keyFile->numCopies();
  29. assertex(numCopies);
  30. for (unsigned copy=0; copy < numCopies && failcount < MAX_FILE_READ_FAIL_COUNT; copy++)
  31. {
  32. RemoteFilename rfn;
  33. try
  34. {
  35. OwnedIFile ifile = createIFile(keyFile->getFilename(rfn,copy));
  36. unsigned __int64 thissize = ifile->size();
  37. if (thissize != -1)
  38. {
  39. StringBuffer remotePath;
  40. rfn.getPath(remotePath);
  41. unsigned crc = 0;
  42. keyFile->getCrc(crc);
  43. return createKeyIndex(remotePath.str(), crc, false);
  44. }
  45. }
  46. catch (IException *E)
  47. {
  48. EXCLOG(E, "While opening index file");
  49. E->Release();
  50. failcount++;
  51. }
  52. }
  53. RemoteFilename rfn;
  54. StringBuffer url;
  55. keyFile->getFilename(rfn).getRemotePath(url);
  56. throw MakeStringException(1001, "Could not open key file at %s%s", url.str(), (numCopies > 1) ? " or any alternate location." : ".");
  57. }
  58. //---------------------------------------------------------------------------
  59. #define MIN_CACHED_ROWS 150
  60. #define MAX_CACHED_ROWS 200
  61. IndexPageCache::IndexPageCache()
  62. {
  63. firstRow = 0;
  64. offsetDelta = 0;
  65. offsets.append(0);
  66. saved.ensureCapacity(0x10000);
  67. }
  68. void IndexPageCache::addRow(__int64 row, size32_t len, const void * data)
  69. {
  70. if (row != firstRow + numRowsCached())
  71. {
  72. firstRow = row;
  73. offsetDelta = 0;
  74. offsets.kill();
  75. offsets.append(0);
  76. saved.setWritePos(0);
  77. }
  78. else if (numRowsCached() >= MAX_CACHED_ROWS)
  79. {
  80. unsigned numToRemove = numRowsCached() - MIN_CACHED_ROWS;
  81. __int64 newDelta = offsets.item(numToRemove);
  82. size32_t sizeLost = (size32_t)(newDelta-offsetDelta);
  83. //copy the cached rows
  84. byte * base = (byte *)saved.bufferBase();
  85. memmove(base, base+sizeLost, saved.length()-sizeLost);
  86. saved.setWritePos(saved.length()-sizeLost);
  87. offsets.removen(0, numToRemove);
  88. firstRow += numToRemove;
  89. offsetDelta = newDelta;
  90. }
  91. assertex(row == firstRow + numRowsCached());
  92. assertex(offsets.tos() == saved.length() + offsetDelta);
  93. saved.append(len, data);
  94. offsets.append(saved.length() + offsetDelta);
  95. }
  96. bool IndexPageCache::getRow(__int64 row, size32_t & len, const void * & data)
  97. {
  98. if (row < firstRow || row >= firstRow + numRowsCached())
  99. return false;
  100. unsigned __int64 startOffset = offsets.item((unsigned)(row-firstRow));
  101. unsigned __int64 endOffset = offsets.item((unsigned)(row-firstRow+1));
  102. len = (size32_t)(endOffset - startOffset);
  103. data = saved.toByteArray() + (unsigned)(startOffset - offsetDelta);
  104. return true;
  105. }
  106. //---------------------------------------------------------------------------
  107. //Could probably cope with unknown record, just displaying as binary.
  108. IndexDataSource::IndexDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char* _username, const char* _password)
  109. {
  110. logicalName.set(_logicalName);
  111. diskRecord.set(_diskRecord);
  112. deserializer.setown(createRtlFieldTypeDeserializer());
  113. diskRecordMeta.setown(new CDynamicOutputMetaData(* static_cast<const RtlRecordTypeInfo *>(queryRtlType(*deserializer.get(), diskRecord))));
  114. Owned<IUserDescriptor> udesc;
  115. if(_username != NULL && *_username != '\0')
  116. {
  117. udesc.setown(createUserDescriptor());
  118. udesc->set(_username, _password);
  119. }
  120. df.setown(queryDistributedFileDirectory().lookup(logicalName, udesc.get(),AccessMode::tbdRead,false,false,nullptr,defaultPrivilegedUser));
  121. filtered = false;
  122. }
  123. IndexDataSource::IndexDataSource(IndexDataSource * _other)
  124. {
  125. logicalName.set(_other->logicalName);
  126. diskRecord.set(_other->diskRecord);
  127. deserializer.set(_other->deserializer);
  128. diskRecordMeta.set(_other->diskRecordMeta);
  129. df.set(_other->df);
  130. original.set(_other); // stop any work units etc. being unloaded.
  131. diskMeta.set(_other->diskMeta); // optimization - would be handled by init anyway
  132. filtered = false;
  133. //MORE: What else needs cloning/initializing?
  134. }
  135. IFvDataSource * IndexDataSource::cloneForFilter()
  136. {
  137. Owned<IndexDataSource> ret = new IndexDataSource(this);
  138. if (ret->init())
  139. return ret.getClear();
  140. return NULL;
  141. }
  142. bool IndexDataSource::init()
  143. {
  144. if (!df)
  145. return false;
  146. numParts = df->numParts();
  147. singlePart = (numParts == 1);
  148. ignoreSkippedRows = true; // better if skipping to a particular point
  149. StringBuffer partName;
  150. Owned<IDistributedFilePart> kf = df->getPart(numParts-1);
  151. tlk.setown(openKeyFile(kf));
  152. if (!tlk)
  153. return false;
  154. IPropertyTree & properties = df->queryAttributes();
  155. //Need to assign the transformed record to meta
  156. if (!diskMeta)
  157. diskMeta.setown(new DataSourceMetaData(diskRecord, 0, true, false, tlk->keyedSize()));
  158. if (!returnedMeta)
  159. {
  160. returnedMeta.set(diskMeta);
  161. returnedRecordSize.set(returnedMeta);
  162. }
  163. if (!transformedMeta)
  164. transformedMeta.set(returnedMeta);
  165. if (properties.hasProp("@recordCount"))
  166. totalRows = properties.getPropInt64("@recordCount");
  167. else
  168. totalRows = UNKNOWN_NUM_ROWS; // more: could probably count them
  169. isLocal = properties.hasProp("@local");
  170. diskMeta->extractKeyedInfo(keyedOffsets, keyedTypes);
  171. ForEachItemIn(i, keyedTypes)
  172. {
  173. IStringSet * set = createRtlStringSet(fieldSize(i));
  174. set->addAll();
  175. values.append(*set);
  176. }
  177. diskMeta->patchIndexFileposition(); // Now returned as a bigendian field on the end of the row
  178. //Default cursor if no filter is applied
  179. applyFilter();
  180. return true;
  181. }
  182. __int64 IndexDataSource::numRows(bool force)
  183. {
  184. if (!filtered)
  185. return totalRows;
  186. //If leading component isn't filtered, then this can take a very long time...
  187. if (!force && values.item(0).isFullSet())
  188. return UNKNOWN_NUM_ROWS;
  189. __int64 total = 0;
  190. ForEachItemIn(i, matchingParts)
  191. {
  192. manager->setKey(NULL);
  193. curPart.clear();
  194. if (singlePart)
  195. curPart.set(tlk);
  196. else
  197. {
  198. Owned<IDistributedFilePart> kf = df->getPart(matchingParts.item(i));
  199. curPart.setown(openKeyFile(kf));
  200. if (!curPart)
  201. {
  202. total = UNKNOWN_NUM_ROWS;
  203. break;
  204. }
  205. }
  206. manager->setKey(curPart);
  207. manager->reset();
  208. total += manager->getCount();
  209. }
  210. manager->setKey(NULL);
  211. curPart.clear();
  212. resetCursor();
  213. return total;
  214. }
  215. bool IndexDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  216. {
  217. if (cache.getRow(row, length, data))
  218. return true;
  219. if (row < 0)
  220. return false;
  221. if ((unsigned __int64)row < nextRowToRead)
  222. resetCursor();
  223. MemoryBuffer temp;
  224. while ((unsigned __int64)row >= nextRowToRead)
  225. {
  226. bool saveRow = !ignoreSkippedRows || (row == nextRowToRead);
  227. if (!getNextRow(temp.clear(), saveRow))
  228. return false;
  229. if (saveRow)
  230. cache.addRow(nextRowToRead, temp.length(), temp.toByteArray());
  231. nextRowToRead++;
  232. }
  233. return cache.getRow(row, length, data);
  234. }
  235. bool IndexDataSource::getNextRow(MemoryBuffer & out, bool extractRow)
  236. {
  237. bool nextPart = !matchingParts.isItem((unsigned)curPartIndex);
  238. for (;;)
  239. {
  240. if (nextPart)
  241. {
  242. if ((curPartIndex != -1) && ((unsigned)curPartIndex >= matchingParts.ordinality()))
  243. return false;
  244. manager->setKey(NULL);
  245. curPart.clear();
  246. if ((unsigned)++curPartIndex >= matchingParts.ordinality())
  247. return false;
  248. if (singlePart)
  249. curPart.set(tlk);
  250. else
  251. {
  252. Owned<IDistributedFilePart> kf = df->getPart(matchingParts.item(curPartIndex));
  253. curPart.setown(openKeyFile(kf));
  254. if (!curPart)
  255. return false;
  256. }
  257. manager->setKey(curPart);
  258. manager->reset();
  259. }
  260. if (manager->lookup(true))
  261. {
  262. if (extractRow)
  263. {
  264. if (false)
  265. {
  266. //MORE: Allow a transformer to cope with blobs etc.
  267. }
  268. else
  269. {
  270. const byte * thisRow = manager->queryKeyBuffer();
  271. unsigned thisSize = diskMeta->getRecordSize(thisRow);
  272. void * temp = out.reserve(thisSize);
  273. memcpy(temp, thisRow, thisSize);
  274. }
  275. }
  276. return true;
  277. }
  278. nextPart = true;
  279. }
  280. }
  281. void IndexDataSource::resetCursor()
  282. {
  283. curPartIndex = -1;
  284. nextRowToRead = 0;
  285. }
  286. bool IndexDataSource::addFilter(unsigned column, unsigned matchLen, unsigned sizeData, const void * data)
  287. {
  288. if (!values.isItem(column))
  289. return false;
  290. unsigned curSize = fieldSize(column);
  291. IStringSet & set = values.item(column);
  292. if (set.isFullSet())
  293. set.reset();
  294. ITypeInfo & cur = keyedTypes.item(column);
  295. rtlDataAttr tempLow, tempHigh;
  296. unsigned keyedSize = cur.getSize();
  297. byte * temp = (byte *)alloca(keyedSize);
  298. const void * low = data;
  299. const void * high = data;
  300. type_t tc = cur.getTypeCode();
  301. switch (tc)
  302. {
  303. case type_int:
  304. case type_swapint:
  305. {
  306. assertex(sizeData == curSize);
  307. // values are already converted to bigendian and correctly biased
  308. break;
  309. }
  310. case type_varstring:
  311. //should cast from string to varstring
  312. break;
  313. case type_string:
  314. case type_data:
  315. {
  316. const char * inbuff = (const char *)data;
  317. if (matchLen != FullStringMatch)
  318. {
  319. unsigned lenLow, lenHigh;
  320. rtlCreateRangeLow(lenLow, tempLow.refstr(), curSize, matchLen, sizeData, inbuff);
  321. rtlCreateRangeHigh(lenHigh, tempHigh.refstr(), curSize, matchLen, sizeData, inbuff);
  322. low = tempLow.getdata();
  323. high = tempHigh.getdata();
  324. }
  325. else
  326. {
  327. if (tc == type_string)
  328. {
  329. //may need to cast from ascii to ebcidic
  330. rtlStrToStr(curSize, temp, sizeData, data);
  331. }
  332. else
  333. rtlDataToData(curSize, temp, sizeData, data);
  334. low = high = temp;
  335. }
  336. break;
  337. }
  338. case type_qstring:
  339. {
  340. const char * inbuff = (const char *)data;
  341. unsigned lenData = rtlQStrLength(sizeData);
  342. unsigned lenField = cur.getStringLen();
  343. if (matchLen != FullStringMatch)
  344. {
  345. unsigned lenLow, lenHigh;
  346. rtlCreateQStrRangeLow(lenLow, tempLow.refstr(), lenField, matchLen, lenData, inbuff);
  347. rtlCreateQStrRangeHigh(lenHigh, tempHigh.refstr(), lenField, matchLen, lenData, inbuff);
  348. low = tempLow.getdata();
  349. high = tempHigh.getdata();
  350. }
  351. else
  352. {
  353. rtlQStrToQStr(lenField, (char *)temp, lenData, (const char *)data);
  354. low = high = temp;
  355. }
  356. break;
  357. }
  358. case type_unicode:
  359. {
  360. const char * inbuff = (const char *)data;
  361. unsigned lenData = sizeData / sizeof(UChar);
  362. unsigned lenField = cur.getStringLen();
  363. if (matchLen != FullStringMatch)
  364. {
  365. unsigned lenLow, lenHigh;
  366. rtlCreateUnicodeRangeLow(lenLow, tempLow.refustr(), lenField, matchLen, lenData, (const UChar *)inbuff);
  367. rtlCreateUnicodeRangeHigh(lenHigh, tempHigh.refustr(), lenField, matchLen, lenData, (const UChar *)inbuff);
  368. low = tempLow.getdata();
  369. high = tempHigh.getdata();
  370. }
  371. else
  372. {
  373. rtlUnicodeToUnicode(lenField, (UChar *)temp, lenData, (const UChar *)data);
  374. low = high = temp;
  375. }
  376. break;
  377. }
  378. default:
  379. assertex(sizeData == curSize);
  380. break;
  381. }
  382. set.addRange(low, high);
  383. filtered = true;
  384. return true;
  385. }
  386. void IndexDataSource::applyFilter()
  387. {
  388. manager.setown(createLocalKeyManager(diskRecordMeta->queryRecordAccessor(true), tlk, NULL, false, false));
  389. ForEachItemIn(i, values)
  390. {
  391. IStringSet & cur = values.item(i);
  392. bool extend = true; // almost certainly better
  393. manager->append(createKeySegmentMonitor(extend, LINK(&cur), i, keyedOffsets.item(i), fieldSize(i)));
  394. }
  395. manager->finishSegmentMonitors();
  396. //Now work out which parts are affected.
  397. matchingParts.kill();
  398. if (singlePart)
  399. matchingParts.append(0);
  400. else if (isLocal)
  401. {
  402. for (unsigned i = 0; i < numParts; i++)
  403. matchingParts.append(i);
  404. }
  405. else
  406. {
  407. manager->reset();
  408. while (manager->lookup(false))
  409. {
  410. offset_t node = extractFpos(manager);
  411. if (node)
  412. matchingParts.append((unsigned)(node-1));
  413. }
  414. }
  415. resetCursor();
  416. }