fvidxsource.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "eclrtl_imp.hpp"
  15. #include "hqlexpr.hpp"
  16. #include "fileview.hpp"
  17. #include "fvresultset.ipp"
  18. #include "fvidxsource.ipp"
  19. #include "fverror.hpp"
  20. #include "dasess.hpp"
  21. //cloned from hthor - a candidate for commoning up.
  22. static IKeyIndex *openKeyFile(IDistributedFilePart *keyFile)
  23. {
  24. unsigned numCopies = keyFile->numCopies();
  25. assertex(numCopies);
  26. for (unsigned copy=0; copy < numCopies; copy++)
  27. {
  28. RemoteFilename rfn;
  29. try
  30. {
  31. OwnedIFile ifile = createIFile(keyFile->getFilename(rfn,copy));
  32. unsigned __int64 thissize = ifile->size();
  33. if (thissize != -1)
  34. {
  35. StringBuffer remotePath;
  36. rfn.getRemotePath(remotePath);
  37. unsigned crc = 0;
  38. keyFile->getCrc(crc);
  39. return createKeyIndex(remotePath.str(), crc, false, false);
  40. }
  41. }
  42. catch (IException *E)
  43. {
  44. EXCLOG(E, "While opening index file");
  45. E->Release();
  46. }
  47. }
  48. RemoteFilename rfn;
  49. StringBuffer url;
  50. keyFile->getFilename(rfn).getRemotePath(url);
  51. throw MakeStringException(1001, "Could not open key file at %s%s", url.str(), (numCopies > 1) ? " or any alternate location." : ".");
  52. }
  //---------------------------------------------------------------------------
  // Bounds on the number of rows held by IndexPageCache.  Once the cache holds
  // MAX_CACHED_ROWS rows, IndexPageCache::addRow() discards the oldest rows so
  // that MIN_CACHED_ROWS remain before the new row is appended.
  #define MIN_CACHED_ROWS 150
  #define MAX_CACHED_ROWS 200
  56. IndexPageCache::IndexPageCache()
  57. {
  58. firstRow = 0;
  59. offsetDelta = 0;
  60. offsets.append(0);
  61. saved.ensureCapacity(0x10000);
  62. }
  63. void IndexPageCache::addRow(__int64 row, size32_t len, const void * data)
  64. {
  65. if (row != firstRow + numRowsCached())
  66. {
  67. firstRow = row;
  68. offsetDelta = 0;
  69. offsets.kill();
  70. offsets.append(0);
  71. saved.setWritePos(0);
  72. }
  73. else if (numRowsCached() >= MAX_CACHED_ROWS)
  74. {
  75. unsigned numToRemove = numRowsCached() - MIN_CACHED_ROWS;
  76. __int64 newDelta = offsets.item(numToRemove);
  77. size32_t sizeLost = (size32_t)(newDelta-offsetDelta);
  78. //copy the cached rows
  79. byte * base = (byte *)saved.bufferBase();
  80. memmove(base, base+sizeLost, saved.length()-sizeLost);
  81. saved.setWritePos(saved.length()-sizeLost);
  82. offsets.removen(0, numToRemove);
  83. firstRow += numToRemove;
  84. offsetDelta = newDelta;
  85. }
  86. assertex(row == firstRow + numRowsCached());
  87. assertex(offsets.tos() == saved.length() + offsetDelta);
  88. saved.append(len, data);
  89. offsets.append(saved.length() + offsetDelta);
  90. }
  91. bool IndexPageCache::getRow(__int64 row, size32_t & len, const void * & data)
  92. {
  93. if (row < firstRow || row >= firstRow + numRowsCached())
  94. return false;
  95. unsigned __int64 startOffset = offsets.item((unsigned)(row-firstRow));
  96. unsigned __int64 endOffset = offsets.item((unsigned)(row-firstRow+1));
  97. len = (size32_t)(endOffset - startOffset);
  98. data = saved.toByteArray() + (unsigned)(startOffset - offsetDelta);
  99. return true;
  100. }
  101. //---------------------------------------------------------------------------
  102. //Could probably cope with unknown record, just displaying as binary.
  103. IndexDataSource::IndexDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char* _username, const char* _password)
  104. {
  105. logicalName.set(_logicalName);
  106. diskRecord.set(_diskRecord);
  107. Owned<IUserDescriptor> udesc;
  108. if(_username != NULL && *_username != '\0')
  109. {
  110. udesc.setown(createUserDescriptor());
  111. udesc->set(_username, _password);
  112. }
  113. df.setown(queryDistributedFileDirectory().lookup(logicalName, udesc.get()));
  114. filtered = false;
  115. }
  116. IndexDataSource::IndexDataSource(IndexDataSource * _other)
  117. {
  118. logicalName.set(_other->logicalName);
  119. diskRecord.set(_other->diskRecord);
  120. df.set(_other->df);
  121. original.set(_other); // stop any work units etc. being unloaded.
  122. diskMeta.set(_other->diskMeta); // optimization - would be handled by init anyway
  123. filtered = false;
  124. //MORE: What else needs cloning/initializing?
  125. }
  126. IFvDataSource * IndexDataSource::cloneForFilter()
  127. {
  128. Owned<IndexDataSource> ret = new IndexDataSource(this);
  129. if (ret->init())
  130. return ret.getClear();
  131. return NULL;
  132. }
  133. bool IndexDataSource::init()
  134. {
  135. if (!df)
  136. return false;
  137. numParts = df->numParts();
  138. singlePart = (numParts == 1);
  139. ignoreSkippedRows = true; // better if skipping to a particular point
  140. StringBuffer partName;
  141. Owned<IDistributedFilePart> kf = df->getPart(numParts-1);
  142. tlk.setown(openKeyFile(kf));
  143. if (!tlk)
  144. return false;
  145. IPropertyTree & properties = df->queryAttributes();
  146. //Need to assign the transformed record to meta
  147. if (!diskMeta)
  148. diskMeta.setown(new DataSourceMetaData(diskRecord, 0, true, false, tlk->keyedSize()));
  149. if (!returnedMeta)
  150. {
  151. returnedMeta.set(diskMeta);
  152. returnedRecordSize.set(returnedMeta);
  153. }
  154. if (!transformedMeta)
  155. transformedMeta.set(returnedMeta);
  156. if (properties.hasProp("@recordCount"))
  157. totalRows = properties.getPropInt64("@recordCount");
  158. else
  159. totalRows = UNKNOWN_NUM_ROWS; // more: could probably count them
  160. isLocal = properties.hasProp("@local");
  161. diskMeta->extractKeyedInfo(keyedOffsets, keyedTypes);
  162. ForEachItemIn(i, keyedTypes)
  163. {
  164. IStringSet * set = createRtlStringSet(fieldSize(i));
  165. set->addAll();
  166. values.append(*set);
  167. }
  168. diskMeta->patchIndexFileposition(); // Now returned as a bigendian field on the end of the row
  169. //Default cursor if no filter is applied
  170. applyFilter();
  171. return true;
  172. }
  173. __int64 IndexDataSource::numRows(bool force)
  174. {
  175. if (!filtered)
  176. return totalRows;
  177. //If leading component isn't filtered, then this can take a very long time...
  178. if (!force && values.item(0).isFullSet())
  179. return UNKNOWN_NUM_ROWS;
  180. __int64 total = 0;
  181. ForEachItemIn(i, matchingParts)
  182. {
  183. manager->setKey(NULL);
  184. curPart.clear();
  185. if (singlePart)
  186. curPart.set(tlk);
  187. else
  188. {
  189. Owned<IDistributedFilePart> kf = df->getPart(matchingParts.item(i));
  190. curPart.setown(openKeyFile(kf));
  191. if (!curPart)
  192. {
  193. total = UNKNOWN_NUM_ROWS;
  194. break;
  195. }
  196. }
  197. manager->setKey(curPart);
  198. manager->reset();
  199. total += manager->getCount();
  200. }
  201. manager->setKey(NULL);
  202. curPart.clear();
  203. resetCursor();
  204. return total;
  205. }
  206. bool IndexDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  207. {
  208. if (cache.getRow(row, length, data))
  209. return true;
  210. if (row < 0)
  211. return false;
  212. if ((unsigned __int64)row < nextRowToRead)
  213. resetCursor();
  214. MemoryBuffer temp;
  215. while ((unsigned __int64)row >= nextRowToRead)
  216. {
  217. bool saveRow = !ignoreSkippedRows || (row == nextRowToRead);
  218. if (!getNextRow(temp.clear(), saveRow))
  219. return false;
  220. if (saveRow)
  221. cache.addRow(nextRowToRead, temp.length(), temp.toByteArray());
  222. nextRowToRead++;
  223. }
  224. return cache.getRow(row, length, data);
  225. }
  226. bool IndexDataSource::getNextRow(MemoryBuffer & out, bool extractRow)
  227. {
  228. bool nextPart = !matchingParts.isItem((unsigned)curPartIndex);
  229. for (;;)
  230. {
  231. if (nextPart)
  232. {
  233. if ((curPartIndex != -1) && ((unsigned)curPartIndex >= matchingParts.ordinality()))
  234. return false;
  235. manager->setKey(NULL);
  236. curPart.clear();
  237. if ((unsigned)++curPartIndex >= matchingParts.ordinality())
  238. return false;
  239. if (singlePart)
  240. curPart.set(tlk);
  241. else
  242. {
  243. Owned<IDistributedFilePart> kf = df->getPart(matchingParts.item(curPartIndex));
  244. curPart.setown(openKeyFile(kf));
  245. if (!curPart)
  246. return false;
  247. }
  248. manager->setKey(curPart);
  249. manager->reset();
  250. }
  251. if (manager->lookup(true))
  252. {
  253. if (extractRow)
  254. {
  255. if (false)
  256. {
  257. //MORE: Allow a transformer to cope with blobs etc.
  258. }
  259. else
  260. {
  261. offset_t filepos;
  262. const byte * thisRow = manager->queryKeyBuffer(filepos);
  263. unsigned thisSize = diskMeta->getRecordSize(thisRow);
  264. void * temp = out.reserve(thisSize);
  265. memcpy(temp, thisRow, thisSize);
  266. }
  267. }
  268. return true;
  269. }
  270. nextPart = true;
  271. }
  272. }
  273. void IndexDataSource::resetCursor()
  274. {
  275. curPartIndex = -1;
  276. nextRowToRead = 0;
  277. }
  278. bool IndexDataSource::addFilter(unsigned column, unsigned matchLen, unsigned sizeData, const void * data)
  279. {
  280. if (!values.isItem(column))
  281. return false;
  282. unsigned curSize = fieldSize(column);
  283. IStringSet & set = values.item(column);
  284. if (set.isFullSet())
  285. set.reset();
  286. ITypeInfo & cur = keyedTypes.item(column);
  287. rtlDataAttr tempLow, tempHigh;
  288. unsigned keyedSize = cur.getSize();
  289. byte * temp = (byte *)alloca(keyedSize);
  290. const void * low = data;
  291. const void * high = data;
  292. type_t tc = cur.getTypeCode();
  293. switch (tc)
  294. {
  295. case type_int:
  296. case type_swapint:
  297. {
  298. assertex(sizeData == curSize);
  299. // values are already converted to bigendian and correctly biased
  300. break;
  301. }
  302. case type_varstring:
  303. //should cast from string to varstring
  304. break;
  305. case type_string:
  306. case type_data:
  307. {
  308. const char * inbuff = (const char *)data;
  309. if (matchLen != FullStringMatch)
  310. {
  311. unsigned lenLow, lenHigh;
  312. rtlCreateRangeLow(lenLow, tempLow.refstr(), curSize, matchLen, sizeData, inbuff);
  313. rtlCreateRangeHigh(lenHigh, tempHigh.refstr(), curSize, matchLen, sizeData, inbuff);
  314. low = tempLow.getdata();
  315. high = tempHigh.getdata();
  316. }
  317. else
  318. {
  319. if (tc == type_string)
  320. {
  321. //may need to cast from ascii to ebcidic
  322. rtlStrToStr(curSize, temp, sizeData, data);
  323. }
  324. else
  325. rtlDataToData(curSize, temp, sizeData, data);
  326. low = high = temp;
  327. }
  328. break;
  329. }
  330. case type_qstring:
  331. {
  332. const char * inbuff = (const char *)data;
  333. unsigned lenData = rtlQStrLength(sizeData);
  334. unsigned lenField = cur.getStringLen();
  335. if (matchLen != FullStringMatch)
  336. {
  337. unsigned lenLow, lenHigh;
  338. rtlCreateQStrRangeLow(lenLow, tempLow.refstr(), lenField, matchLen, lenData, inbuff);
  339. rtlCreateQStrRangeHigh(lenHigh, tempHigh.refstr(), lenField, matchLen, lenData, inbuff);
  340. low = tempLow.getdata();
  341. high = tempHigh.getdata();
  342. }
  343. else
  344. {
  345. rtlQStrToQStr(lenField, (char *)temp, lenData, (const char *)data);
  346. low = high = temp;
  347. }
  348. break;
  349. }
  350. case type_unicode:
  351. {
  352. const char * inbuff = (const char *)data;
  353. unsigned lenData = sizeData / sizeof(UChar);
  354. unsigned lenField = cur.getStringLen();
  355. if (matchLen != FullStringMatch)
  356. {
  357. unsigned lenLow, lenHigh;
  358. rtlCreateUnicodeRangeLow(lenLow, tempLow.refustr(), lenField, matchLen, lenData, (const UChar *)inbuff);
  359. rtlCreateUnicodeRangeHigh(lenHigh, tempHigh.refustr(), lenField, matchLen, lenData, (const UChar *)inbuff);
  360. low = tempLow.getdata();
  361. high = tempHigh.getdata();
  362. }
  363. else
  364. {
  365. rtlUnicodeToUnicode(lenField, (UChar *)temp, lenData, (const UChar *)data);
  366. low = high = temp;
  367. }
  368. break;
  369. }
  370. default:
  371. assertex(sizeData == curSize);
  372. break;
  373. }
  374. set.addRange(low, high);
  375. filtered = true;
  376. return true;
  377. }
  378. void IndexDataSource::applyFilter()
  379. {
  380. manager.setown(createLocalKeyManager(tlk, NULL));
  381. ForEachItemIn(i, values)
  382. {
  383. IStringSet & cur = values.item(i);
  384. bool extend = true; // almost certainly better
  385. manager->append(createKeySegmentMonitor(extend, LINK(&cur), i, keyedOffsets.item(i), fieldSize(i)));
  386. }
  387. manager->finishSegmentMonitors();
  388. //Now work out which parts are affected.
  389. matchingParts.kill();
  390. if (singlePart)
  391. matchingParts.append(0);
  392. else if (isLocal)
  393. {
  394. for (unsigned i = 0; i < numParts; i++)
  395. matchingParts.append(i);
  396. }
  397. else
  398. {
  399. manager->reset();
  400. while (manager->lookup(false))
  401. {
  402. offset_t node = manager->queryFpos();
  403. if (node)
  404. matchingParts.append((unsigned)(node-1));
  405. }
  406. }
  407. resetCursor();
  408. }