fvidxsource.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "jliball.hpp"
  15. #include "eclrtl_imp.hpp"
  16. #include "hqlexpr.hpp"
  17. #include "fileview.hpp"
  18. #include "fvresultset.ipp"
  19. #include "fvidxsource.ipp"
  20. #include "fverror.hpp"
  21. #include "dasess.hpp"
  22. //cloned from hthor - a candidate for commoning up.
  23. static IKeyIndex *openKeyFile(IDistributedFilePart *keyFile)
  24. {
  25. unsigned numCopies = keyFile->numCopies();
  26. assertex(numCopies);
  27. for (unsigned copy=0; copy < numCopies; copy++)
  28. {
  29. RemoteFilename rfn;
  30. try
  31. {
  32. OwnedIFile ifile = createIFile(keyFile->getFilename(rfn,copy));
  33. unsigned __int64 thissize = ifile->size();
  34. if (thissize != -1)
  35. {
  36. StringBuffer remotePath;
  37. rfn.getRemotePath(remotePath);
  38. unsigned crc;
  39. keyFile->getCrc(crc);
  40. return createKeyIndex(remotePath.str(), crc, false, false);
  41. }
  42. }
  43. catch (IException *E)
  44. {
  45. EXCLOG(E, "While opening index file");
  46. E->Release();
  47. }
  48. }
  49. RemoteFilename rfn;
  50. StringBuffer url;
  51. keyFile->getFilename(rfn).getRemotePath(url);
  52. throw MakeStringException(1001, "Could not open key file at %s%s", url.str(), (numCopies > 1) ? " or any alternate location." : ".");
  53. }
  54. //---------------------------------------------------------------------------
//Thresholds used by IndexPageCache::addRow(): once MAX_CACHED_ROWS consecutive rows are
//held, the oldest rows are discarded so that MIN_CACHED_ROWS remain before the new row is added.
#define MIN_CACHED_ROWS 150
#define MAX_CACHED_ROWS 200
  57. IndexPageCache::IndexPageCache()
  58. {
  59. firstRow = 0;
  60. offsetDelta = 0;
  61. offsets.append(0);
  62. saved.ensureCapacity(0x10000);
  63. }
  64. void IndexPageCache::addRow(__int64 row, size32_t len, const void * data)
  65. {
  66. if (row != firstRow + numRowsCached())
  67. {
  68. firstRow = row;
  69. offsetDelta = 0;
  70. offsets.kill();
  71. offsets.append(0);
  72. saved.setWritePos(0);
  73. }
  74. else if (numRowsCached() >= MAX_CACHED_ROWS)
  75. {
  76. unsigned numToRemove = numRowsCached() - MIN_CACHED_ROWS;
  77. __int64 newDelta = offsets.item(numToRemove);
  78. size32_t sizeLost = (size32_t)(newDelta-offsetDelta);
  79. //copy the cached rows
  80. byte * base = (byte *)saved.bufferBase();
  81. memmove(base, base+sizeLost, saved.length()-sizeLost);
  82. saved.setWritePos(saved.length()-sizeLost);
  83. offsets.removen(0, numToRemove);
  84. firstRow += numToRemove;
  85. offsetDelta = newDelta;
  86. }
  87. assertex(row == firstRow + numRowsCached());
  88. assertex(offsets.tos() == saved.length() + offsetDelta);
  89. saved.append(len, data);
  90. offsets.append(saved.length() + offsetDelta);
  91. }
  92. bool IndexPageCache::getRow(__int64 row, size32_t & len, const void * & data)
  93. {
  94. if (row < firstRow || row >= firstRow + numRowsCached())
  95. return false;
  96. unsigned __int64 startOffset = offsets.item((unsigned)(row-firstRow));
  97. unsigned __int64 endOffset = offsets.item((unsigned)(row-firstRow+1));
  98. len = (size32_t)(endOffset - startOffset);
  99. data = saved.toByteArray() + (unsigned)(startOffset - offsetDelta);
  100. return true;
  101. }
  102. //---------------------------------------------------------------------------
  103. //Could probably cope with unknown record, just displaying as binary.
  104. IndexDataSource::IndexDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char* _username, const char* _password)
  105. {
  106. logicalName.set(_logicalName);
  107. diskRecord.set(_diskRecord);
  108. Owned<IUserDescriptor> udesc;
  109. if(_username != NULL && *_username != '\0')
  110. {
  111. udesc.setown(createUserDescriptor());
  112. udesc->set(_username, _password);
  113. }
  114. df.setown(queryDistributedFileDirectory().lookup(logicalName, udesc.get()));
  115. filtered = false;
  116. }
  117. IndexDataSource::IndexDataSource(IndexDataSource * _other)
  118. {
  119. logicalName.set(_other->logicalName);
  120. diskRecord.set(_other->diskRecord);
  121. df.set(_other->df);
  122. original.set(_other); // stop any work units etc. being unloaded.
  123. diskMeta.set(_other->diskMeta); // optimization - would be handled by init anyway
  124. filtered = false;
  125. //MORE: What else needs cloning/initializing?
  126. }
  127. IFvDataSource * IndexDataSource::cloneForFilter()
  128. {
  129. Owned<IndexDataSource> ret = new IndexDataSource(this);
  130. if (ret->init())
  131. return ret.getClear();
  132. return NULL;
  133. }
  134. bool IndexDataSource::init()
  135. {
  136. if (!df)
  137. return false;
  138. numParts = df->numParts();
  139. singlePart = (numParts == 1);
  140. ignoreSkippedRows = true; // better if skipping to a particular point
  141. StringBuffer partName;
  142. Owned<IDistributedFilePart> kf = df->getPart(numParts-1);
  143. tlk.setown(openKeyFile(kf));
  144. if (!tlk)
  145. return false;
  146. IPropertyTree & properties = df->queryProperties();
  147. //Need to assign the transformed record to meta
  148. if (!diskMeta)
  149. diskMeta.setown(new DataSourceMetaData(diskRecord, 0, true, false, tlk->keyedSize()));
  150. if (!returnedMeta)
  151. {
  152. returnedMeta.set(diskMeta);
  153. returnedRecordSize.set(returnedMeta);
  154. }
  155. if (!transformedMeta)
  156. transformedMeta.set(returnedMeta);
  157. if (properties.hasProp("@recordCount"))
  158. totalRows = properties.getPropInt64("@recordCount");
  159. else
  160. totalRows = UNKNOWN_NUM_ROWS; // more: could probably count them
  161. isLocal = properties.hasProp("@local");
  162. diskMeta->extractKeyedInfo(keyedOffsets, keyedTypes);
  163. ForEachItemIn(i, keyedTypes)
  164. {
  165. IStringSet * set = createRtlStringSet(fieldSize(i));
  166. set->addAll();
  167. values.append(*set);
  168. }
  169. fileposFieldType = diskMeta->queryType(diskMeta->numColumns()-1);
  170. assertex(fileposFieldType && fileposFieldType->isInteger());
  171. //Now gather all the
  172. //Default cursor if no filter is applied
  173. applyFilter();
  174. return true;
  175. }
  176. __int64 IndexDataSource::numRows(bool force)
  177. {
  178. if (!filtered)
  179. return totalRows;
  180. //If leading component isn't filtered, then this can take a very long time...
  181. if (!force && values.item(0).isFullSet())
  182. return UNKNOWN_NUM_ROWS;
  183. __int64 total = 0;
  184. ForEachItemIn(i, matchingParts)
  185. {
  186. manager->setKey(NULL);
  187. curPart.clear();
  188. if (singlePart)
  189. curPart.set(tlk);
  190. else
  191. {
  192. Owned<IDistributedFilePart> kf = df->getPart(matchingParts.item(i));
  193. curPart.setown(openKeyFile(kf));
  194. if (!curPart)
  195. {
  196. total = UNKNOWN_NUM_ROWS;
  197. break;
  198. }
  199. }
  200. manager->setKey(curPart);
  201. manager->reset();
  202. total += manager->getCount();
  203. }
  204. manager->setKey(NULL);
  205. curPart.clear();
  206. resetCursor();
  207. return total;
  208. }
  209. bool IndexDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
  210. {
  211. if (cache.getRow(row, length, data))
  212. return true;
  213. if (row < 0)
  214. return false;
  215. if ((unsigned __int64)row < nextRowToRead)
  216. resetCursor();
  217. MemoryBuffer temp;
  218. while ((unsigned __int64)row >= nextRowToRead)
  219. {
  220. bool saveRow = !ignoreSkippedRows || (row == nextRowToRead);
  221. if (!getNextRow(temp.clear(), saveRow))
  222. return false;
  223. if (saveRow)
  224. cache.addRow(nextRowToRead, temp.length(), temp.toByteArray());
  225. nextRowToRead++;
  226. }
  227. return cache.getRow(row, length, data);
  228. }
  229. bool IndexDataSource::getNextRow(MemoryBuffer & out, bool extractRow)
  230. {
  231. bool nextPart = !matchingParts.isItem((unsigned)curPartIndex);
  232. loop
  233. {
  234. if (nextPart)
  235. {
  236. if ((curPartIndex != -1) && ((unsigned)curPartIndex >= matchingParts.ordinality()))
  237. return false;
  238. manager->setKey(NULL);
  239. curPart.clear();
  240. if ((unsigned)++curPartIndex >= matchingParts.ordinality())
  241. return false;
  242. if (singlePart)
  243. curPart.set(tlk);
  244. else
  245. {
  246. Owned<IDistributedFilePart> kf = df->getPart(matchingParts.item(curPartIndex));
  247. curPart.setown(openKeyFile(kf));
  248. if (!curPart)
  249. return false;
  250. }
  251. manager->setKey(curPart);
  252. manager->reset();
  253. }
  254. if (manager->lookup(true))
  255. {
  256. if (extractRow)
  257. {
  258. if (false)
  259. {
  260. //MORE: Allow a transformer to cope with blobs etc.
  261. }
  262. else
  263. {
  264. unsigned fileposSize = fileposFieldType->getSize();
  265. // unsigned thisSize = manager->queryRecordSize(); // Should be possible - needs a new function to call cursor->getSize()
  266. offset_t filepos;
  267. const byte * thisRow = manager->queryKeyBuffer(filepos);
  268. unsigned thisSize = diskMeta->getRecordSize(thisRow) - fileposSize;
  269. void * temp = out.reserve(thisSize);
  270. memcpy(temp, thisRow, thisSize);
  271. //Append the fileposition, in the correct size/endianness
  272. assertex(sizeof(filepos) >= 8);
  273. #if __BYTE_ORDER == __LITTLE_ENDIAN
  274. void * data = &filepos;
  275. #else
  276. void * data = (byte *)&filepos + sizeof(filepos) - fileposSize;
  277. #endif
  278. if (fileposFieldType->isSwappedEndian())
  279. out.appendSwap(fileposSize, data);
  280. else
  281. out.append(fileposSize, data);
  282. }
  283. }
  284. return true;
  285. }
  286. nextPart = true;
  287. }
  288. }
  289. void IndexDataSource::resetCursor()
  290. {
  291. curPartIndex = -1;
  292. nextRowToRead = 0;
  293. }
  294. bool IndexDataSource::addFilter(unsigned column, unsigned matchLen, unsigned sizeData, const void * data)
  295. {
  296. if (!values.isItem(column))
  297. return false;
  298. unsigned curSize = fieldSize(column);
  299. IStringSet & set = values.item(column);
  300. if (set.isFullSet())
  301. set.reset();
  302. ITypeInfo & cur = keyedTypes.item(column);
  303. rtlDataAttr tempLow, tempHigh;
  304. unsigned keyedSize = cur.getSize();
  305. byte * temp = (byte *)alloca(keyedSize);
  306. const void * low = data;
  307. const void * high = data;
  308. type_t tc = cur.getTypeCode();
  309. switch (tc)
  310. {
  311. case type_int:
  312. case type_swapint:
  313. {
  314. assertex(sizeData == curSize);
  315. // values are already converted to bigendian and correctly biased
  316. break;
  317. }
  318. case type_varstring:
  319. //should cast from string to varstring
  320. break;
  321. case type_string:
  322. case type_data:
  323. {
  324. const char * inbuff = (const char *)data;
  325. if (matchLen != FullStringMatch)
  326. {
  327. unsigned lenLow, lenHigh;
  328. rtlCreateRangeLow(lenLow, tempLow.refstr(), curSize, matchLen, sizeData, inbuff);
  329. rtlCreateRangeHigh(lenHigh, tempHigh.refstr(), curSize, matchLen, sizeData, inbuff);
  330. low = tempLow.getdata();
  331. high = tempHigh.getdata();
  332. }
  333. else
  334. {
  335. if (tc == type_string)
  336. {
  337. //may need to cast from ascii to ebcidic
  338. rtlStrToStr(curSize, temp, sizeData, data);
  339. }
  340. else
  341. rtlDataToData(curSize, temp, sizeData, data);
  342. low = high = temp;
  343. }
  344. break;
  345. }
  346. case type_qstring:
  347. {
  348. const char * inbuff = (const char *)data;
  349. unsigned lenData = rtlQStrLength(sizeData);
  350. unsigned lenField = cur.getStringLen();
  351. if (matchLen != FullStringMatch)
  352. {
  353. unsigned lenLow, lenHigh;
  354. rtlCreateQStrRangeLow(lenLow, tempLow.refstr(), lenField, matchLen, lenData, inbuff);
  355. rtlCreateQStrRangeHigh(lenHigh, tempHigh.refstr(), lenField, matchLen, lenData, inbuff);
  356. low = tempLow.getdata();
  357. high = tempHigh.getdata();
  358. }
  359. else
  360. {
  361. rtlQStrToQStr(lenField, (char *)temp, lenData, (const char *)data);
  362. low = high = temp;
  363. }
  364. break;
  365. }
  366. default:
  367. assertex(sizeData == curSize);
  368. break;
  369. }
  370. set.addRange(low, high);
  371. filtered = true;
  372. return true;
  373. }
  374. void IndexDataSource::applyFilter()
  375. {
  376. manager.setown(createKeyManager(tlk, tlk->keySize(), NULL));
  377. ForEachItemIn(i, values)
  378. {
  379. IStringSet & cur = values.item(i);
  380. bool extend = true; // almost certainly better
  381. manager->append(createKeySegmentMonitor(extend, LINK(&cur), keyedOffsets.item(i), fieldSize(i)));
  382. }
  383. manager->finishSegmentMonitors();
  384. //Now work out which parts are affected.
  385. matchingParts.kill();
  386. if (singlePart)
  387. matchingParts.append(0);
  388. else if (isLocal)
  389. {
  390. for (unsigned i = 0; i < numParts; i++)
  391. matchingParts.append(i);
  392. }
  393. else
  394. {
  395. manager->reset();
  396. while (manager->lookup(false))
  397. {
  398. offset_t node = manager->queryFpos();
  399. if (node)
  400. matchingParts.append((unsigned)(node-1));
  401. }
  402. }
  403. resetCursor();
  404. }