fvdisksource.cpp 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "eclrtl.hpp"
  15. #include "hqlexpr.hpp"
  16. #include "hqlthql.hpp"
  17. #include "fvresultset.ipp"
  18. #include "fileview.hpp"
  19. #include "fvdisksource.ipp"
  20. #include "fvwugen.hpp"
  21. #include "fverror.hpp"
  22. #include "dasess.hpp"
  // Value used for a CSV file's maximum record size when the file has no "@maxRecordSize" attribute (64k).
  #define DEFAULT_MAX_CSV_SIZE 0x10000
  24. PhysicalFileInfo::PhysicalFileInfo()
  25. {
  26. cachedPart = (unsigned)-1;
  27. totalSize = 0;
  28. }
  29. offset_t getPartSize(IDistributedFilePart & part, unsigned copy)
  30. {
  31. try
  32. {
  33. RemoteFilename rfn;
  34. Owned<IFile> in = createIFile(part.getFilename(rfn,copy));
  35. return in->size();
  36. }
  37. catch (IException * e)
  38. {
  39. e->Release();
  40. }
  41. return (offset_t) -1;
  42. }
  43. void PhysicalFileInfo::init(IDistributedFile * _df)
  44. {
  45. df.set(_df);
  46. totalSize = 0;
  47. Owned<IDistributedFilePartIterator> iter = df->getIterator();
  48. ForEach(*iter)
  49. {
  50. IDistributedFilePart & cur = iter->query();
  51. offset_t partSize = cur.getFileSize(true, false);
  52. if (partSize == -1)
  53. partSize = getPartSize(cur, 0);
  54. if (partSize == -1)
  55. partSize = getPartSize(cur, 1);
  56. if (partSize == -1)
  57. partSize = 0x100000; // force an error when the part is opened.
  58. partSizes.append(partSize);
  59. totalSize += partSize;
  60. }
  61. }
  62. offset_t PhysicalFileInfo::getOptimizedOffset(offset_t offset, unsigned copyLength)
  63. {
  64. offset_t newOffset = 0;
  65. ForEachItemIn(idx, partSizes)
  66. {
  67. offset_t curSize = partSizes.item(idx);
  68. if (offset < curSize)
  69. return newOffset + ((offset) / copyLength) * copyLength;
  70. newOffset += curSize;
  71. offset -= curSize;
  72. }
  73. return newOffset;
  74. }
  75. bool PhysicalFileInfo::readData(MemoryBuffer & out, __int64 startOffset, size32_t copyLength)
  76. {
  77. CriticalBlock procedure(cs);
  78. offset_t chunkOffset = startOffset;
  79. unsigned numParts = partSizes.ordinality();
  80. unsigned part;
  81. offset_t curPartLength;
  82. if (isLocalFpos(startOffset))
  83. {
  84. part = getLocalFposPart(startOffset);
  85. chunkOffset = getLocalFposOffset(startOffset);
  86. if (part >= numParts)
  87. return false;
  88. curPartLength = partSizes.item(part);
  89. }
  90. else
  91. {
  92. for (part = 0; part < numParts; part++)
  93. {
  94. curPartLength = partSizes.item(part);
  95. if (chunkOffset < curPartLength)
  96. break;
  97. chunkOffset -= curPartLength;
  98. }
  99. }
  100. if (part == numParts)
  101. return false;
  102. bool isLast = false;
  103. if (chunkOffset + copyLength >= curPartLength)
  104. {
  105. copyLength = (size32_t)(curPartLength - chunkOffset);
  106. isLast = true;
  107. }
  108. if (part != cachedPart)
  109. {
  110. cachedPart = (unsigned)-1;
  111. cachedFile.clear();
  112. cachedIO.clear();
  113. Owned<IDistributedFilePart> dfp = df->getPart(part);
  114. for (unsigned copy=0; copy<2; copy++)
  115. {
  116. try
  117. {
  118. RemoteFilename rfn;
  119. cachedFile.setown(createIFile(dfp->getFilename(rfn, copy)));
  120. cachedIO.setown(cachedFile->open(IFOread));
  121. if (cachedIO) break;
  122. }
  123. catch (IException * e)
  124. {
  125. e->Release();
  126. }
  127. }
  128. if (!cachedIO)
  129. {
  130. StringBuffer str;
  131. throwError1(FVERR_FailedOpenFile, dfp->getPartName(str).str());
  132. return false;
  133. }
  134. if (df->isCompressed())
  135. {
  136. cachedIO.setown(createCompressedFileReader(cachedIO));
  137. if (!cachedIO)
  138. {
  139. StringBuffer str;
  140. throwError1(FVERR_FailedOpenCompressedFile, dfp->getPartName(str).str());
  141. return false;
  142. }
  143. }
  144. cachedPart = part;
  145. }
  146. char * data = (char *)out.clear().reserve(copyLength);
  147. unsigned numGot = cachedIO->read(chunkOffset, copyLength, data);
  148. out.setLength(numGot);
  149. return isLast;
  150. }
  151. void PhysicalFileInfo::close()
  152. {
  153. cachedPart = (unsigned)-1;
  154. cachedFile.clear();
  155. cachedIO.clear();
  156. }
  157. //---------------------------------------------------------------------------
  158. DiskDataSource::DiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char* _username, const char* _password)
  159. {
  160. logicalName.set(_logicalName);
  161. diskRecord.set(_diskRecord);
  162. Owned<IUserDescriptor> udesc;
  163. if(_username != NULL && *_username != '\0')
  164. {
  165. udesc.setown(createUserDescriptor());
  166. udesc->set(_username, _password);
  167. }
  168. df.setown(queryDistributedFileDirectory().lookup(logicalName, udesc.get(),false,false,false,nullptr,defaultPrivilegedUser));
  169. }
  170. //---------------------------------------------------------------------------
  171. DirectDiskDataSource::DirectDiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char* _username, const char* _password) : DiskDataSource(_logicalName, _diskRecord, _username, _password)
  172. {
  173. }
  174. bool DirectDiskDataSource::init()
  175. {
  176. if (!df)
  177. return false;
  178. IPropertyTree & properties = df->queryAttributes();
  179. const char * kind = properties.queryProp("@kind");
  180. bool isGrouped =properties.getPropBool("@grouped");
  181. if (kind && (stricmp(kind, "key") == 0))
  182. throwError1(FVERR_CannotViewKey, logicalName.get());
  183. //Need to assign the transformed record to meta
  184. diskMeta.setown(new DataSourceMetaData(diskRecord, 0, true, isGrouped, 0));
  185. if (!returnedMeta)
  186. {
  187. returnedMeta.set(diskMeta);
  188. returnedRecordSize.set(returnedMeta);
  189. }
  190. if (!transformedMeta)
  191. transformedMeta.set(returnedMeta);
  192. if (!isWorkunitResult())
  193. addFileposition();
  194. physical.init(df);
  195. if (diskMeta->isFixedSize())
  196. {
  197. if (diskMeta->fixedSize() == 0)
  198. throwError1(FVERR_ZeroSizeRecord, logicalName.get());
  199. totalRows = physical.totalSize / diskMeta->fixedSize();
  200. }
  201. else if (properties.hasProp("@recordCount"))
  202. totalRows = properties.getPropInt64("@recordCount");
  203. else
  204. totalRows = UNKNOWN_NUM_ROWS;
  205. readBlockSize = 4 * diskMeta->getRecordSize(NULL);
  206. if (readBlockSize < DISK_BLOCK_SIZE) readBlockSize = DISK_BLOCK_SIZE;
  207. return true;
  208. }
  209. bool DirectDiskDataSource::fetchRowData(MemoryBuffer & out, __int64 offset)
  210. {
  211. physical.readData(out, offset, returnedMeta->getMaxRecordSize());
  212. if (out.length() == 0)
  213. return false;
  214. size32_t actualLength = returnedMeta->getRecordSize(out.toByteArray());
  215. if (actualLength > readBlockSize)
  216. throwError(FVERR_RowTooLarge);
  217. out.setLength(actualLength);
  218. return true;
  219. }
  220. size32_t DirectDiskDataSource::getCopyLength()
  221. {
  222. size32_t copyLength = readBlockSize;
  223. if (returnedMeta->isFixedSize())
  224. {
  225. unsigned fixedSize = returnedMeta->fixedSize();
  226. copyLength = (copyLength / fixedSize) * fixedSize;
  227. }
  228. return copyLength;
  229. }
  230. void DirectDiskDataSource::improveLocation(__int64 row, RowLocation & location)
  231. {
  232. if (!returnedMeta->isFixedSize())
  233. return;
  234. //Align the row so the chunks don't overlap....
  235. unsigned fixedSize = returnedMeta->fixedSize();
  236. size32_t copyLength = getCopyLength();
  237. location.bestOffset = physical.getOptimizedOffset(row * fixedSize, copyLength);
  238. location.bestRow = location.bestOffset / fixedSize;
  239. }
  240. bool DirectDiskDataSource::loadBlock(__int64 startRow, offset_t startOffset)
  241. {
  242. size32_t copyLength = getCopyLength();
  243. MemoryBuffer temp;
  244. bool isLast = physical.readData(temp, startOffset, copyLength);
  245. if (temp.length() == 0)
  246. return false;
  247. RowBlock * rows;
  248. if (returnedMeta->isFixedSize())
  249. rows = new FixedRowBlock(temp, startRow, startOffset, returnedMeta->fixedSize());
  250. else
  251. rows = new VariableRowBlock(temp, startRow, startOffset, returnedRecordSize, isLast);
  252. cache.addRowsOwn(rows);
  253. return true;
  254. }
  255. void DirectDiskDataSource::onClose()
  256. {
  257. DiskDataSource::onClose();
  258. if (openCount == 0)
  259. physical.close();
  260. }
  261. //---------------------------------------------------------------------------
  262. UtfReader::UtfFormat getFormat(const char * format)
  263. {
  264. if (memicmp(format, "utf", 3) == 0)
  265. {
  266. const char * tail = format + 3;
  267. if (*tail == '-')
  268. tail++;
  269. if (stricmp(tail, "8N")==0)
  270. return UtfReader::Utf8;
  271. else if (stricmp(tail, "16BE")==0)
  272. return UtfReader::Utf16be;
  273. else if (stricmp(tail, "16LE")==0)
  274. return UtfReader::Utf16le;
  275. else if (stricmp(tail, "32BE")==0)
  276. return UtfReader::Utf32be;
  277. else if (stricmp(tail, "32LE")==0)
  278. return UtfReader::Utf32le;
  279. else
  280. throwError1(FVERR_UnknownUTFFormat, format);
  281. }
  282. return UtfReader::Utf8;
  283. }
  //Action codes registered with the CSV matcher; NONE is what is seen when nothing matched.
  enum
  {
      NONE = 0,
      SEPARATOR = 1,      // field separator
      TERMINATOR = 2,     // record terminator
      WHITESPACE = 3,     // space or tab
      QUOTE = 4,          // quote character
      ESCAPE = 5          // escape sequence
  };
  285. void CsvRecordSize::init(IDistributedFile * df)
  286. {
  287. IPropertyTree * props = &df->queryAttributes();
  288. UtfReader::UtfFormat utfType = getFormat(props->queryProp("@format"));
  289. switch (utfType)
  290. {
  291. case UtfReader::Utf16be:
  292. case UtfReader::Utf16le:
  293. unitSize = 2;
  294. break;
  295. case UtfReader::Utf32be:
  296. case UtfReader::Utf32le:
  297. unitSize = 4;
  298. break;
  299. default:
  300. unitSize = 1;
  301. }
  302. maxRecordSize = props->getPropInt("@maxRecordSize", DEFAULT_MAX_CSV_SIZE);
  303. const char * terminate = props->queryProp("@csvTerminate");
  304. addUtfActionList(matcher, terminate ? terminate : "\\n,\\r\\n", TERMINATOR, NULL, utfType);
  305. const char * separate = props->queryProp("@csvSeparate");
  306. addUtfActionList(matcher, separate ? separate : "\\,", SEPARATOR, NULL, utfType);
  307. const char * quote = props->queryProp("@csvQuote");
  308. addUtfActionList(matcher, quote ? quote : "\"", QUOTE, NULL, utfType);
  309. const char * escape = props->queryProp("@csvEscape");
  310. addUtfActionList(matcher, escape, ESCAPE, NULL, utfType);
  311. addUtfActionList(matcher, " ", WHITESPACE, NULL, utfType);
  312. addUtfActionList(matcher, "\t", WHITESPACE, NULL, utfType);
  313. }
  314. size32_t CsvRecordSize::getRecordLength(size32_t maxLength, const void * start, bool includeTerminator)
  315. {
  316. //If we need more complicated processing...
  317. unsigned quote = 0;
  318. unsigned quoteToStrip = 0;
  319. const byte * cur = (const byte *)start;
  320. const byte * end = (const byte *)start + maxLength;
  321. const byte * firstGood = cur;
  322. const byte * lastGood = cur;
  323. bool lastEscape = false;
  324. while (cur != end)
  325. {
  326. unsigned matchLen;
  327. unsigned match = matcher.getMatch(end-cur, (const char *)cur, matchLen);
  328. switch (match & 255)
  329. {
  330. case NONE:
  331. cur += unitSize; // matchLen == 0;
  332. lastGood = cur;
  333. break;
  334. case WHITESPACE:
  335. //Skip leading whitespace
  336. if (quote)
  337. lastGood = cur+matchLen;
  338. else if (cur == firstGood)
  339. {
  340. firstGood = cur+matchLen;
  341. lastGood = cur+matchLen;
  342. }
  343. break;
  344. case SEPARATOR:
  345. // Quoted separator
  346. if (quote == 0)
  347. {
  348. lastEscape = false;
  349. quoteToStrip = 0;
  350. firstGood = cur + matchLen;
  351. }
  352. lastGood = cur+matchLen;
  353. break;
  354. case TERMINATOR:
  355. if (quote == 0) // Is this a good idea? Means a mismatched quote is not fixed by EOL
  356. {
  357. if (includeTerminator)
  358. return cur + matchLen - (const byte *)start;
  359. return cur - (const byte *)start;
  360. }
  361. lastGood = cur+matchLen;
  362. break;
  363. case QUOTE:
  364. // Quoted quote
  365. if (quote == 0)
  366. {
  367. if (cur == firstGood)
  368. {
  369. quote = match;
  370. firstGood = cur+matchLen;
  371. }
  372. lastGood = cur+matchLen;
  373. }
  374. else
  375. {
  376. if (quote == match)
  377. {
  378. const byte * next = cur + matchLen;
  379. //Check for double quotes
  380. if ((next != end))
  381. {
  382. unsigned nextMatchLen;
  383. unsigned nextMatch = matcher.getMatch((size32_t)(end-next), (const char *)next, nextMatchLen);
  384. if (nextMatch == quote)
  385. {
  386. quoteToStrip = quote;
  387. matchLen += nextMatchLen;
  388. lastGood = cur+matchLen;
  389. }
  390. else
  391. quote = 0;
  392. }
  393. else
  394. quote = 0;
  395. }
  396. else
  397. lastGood = cur+matchLen;
  398. }
  399. break;
  400. case ESCAPE:
  401. lastEscape = true;
  402. lastGood = cur+matchLen;
  403. // If this escape is at the end, proceed to field range
  404. if (lastGood == end)
  405. break;
  406. // Skip escape and ignore the next match
  407. cur += matchLen;
  408. match = matcher.getMatch((size32_t)(end-cur), (const char *)cur, matchLen);
  409. if ((match & 255) == NONE)
  410. matchLen = unitSize;
  411. lastGood += matchLen;
  412. break;
  413. }
  414. cur += matchLen;
  415. }
  416. return end - (const byte *)start;
  417. }
  418. size32_t CsvRecordSize::getRecordSize(const void * start)
  419. {
  420. if (!start) return maxRecordSize;
  421. return getRecordLength(maxRecordSize, start, true);
  422. }
  423. size32_t CsvRecordSize::getRecordSize(unsigned maxLength, const void * start)
  424. {
  425. if (!start) return maxRecordSize;
  426. return getRecordLength(maxLength, start, true);
  427. }
  428. size32_t CsvRecordSize::getFixedSize() const
  429. {
  430. return 0; // is variable
  431. }
  432. size32_t CsvRecordSize::getMinRecordSize() const
  433. {
  434. return unitSize;
  435. }
  436. DirectCsvDiskDataSource::DirectCsvDiskDataSource(IDistributedFile * _df, const char * _format)
  437. {
  438. df.set(_df);
  439. isUnicode = (memicmp(_format, "utf", 3) == 0);
  440. utfFormat = getFormat(_format);
  441. returnedMeta.setown(new DataSourceMetaData(isUnicode ? type_unicode : type_string));
  442. returnedRecordSize.set(&recordSizer);
  443. transformedMeta.set(returnedMeta);
  444. addFileposition();
  445. IPropertyTree & properties = df->queryAttributes();
  446. if (properties.hasProp("@recordCount"))
  447. totalRows = properties.getPropInt64("@recordCount");
  448. }
  449. bool DirectCsvDiskDataSource::init()
  450. {
  451. physical.init(df);
  452. recordSizer.init(df);
  453. readBlockSize = 4 * recordSizer.getRecordSize(NULL);
  454. if (readBlockSize < DISK_BLOCK_SIZE) readBlockSize = DISK_BLOCK_SIZE;
  455. return true;
  456. }
  457. void DirectCsvDiskDataSource::copyRow(MemoryBuffer & out, size32_t length, const void * data)
  458. {
  459. if (isUnicode)
  460. {
  461. unsigned offsetOfLength = out.length();
  462. out.append(length);
  463. convertUtf(out, UtfReader::Utf16le, length, data, utfFormat);
  464. unsigned savedLength = out.length();
  465. out.setWritePos(offsetOfLength);
  466. out.append((unsigned) (savedLength - offsetOfLength - sizeof(unsigned))/2);
  467. out.setWritePos(savedLength);
  468. }
  469. else
  470. {
  471. out.append(length);
  472. out.append(length, data);
  473. }
  474. }
  475. bool DirectCsvDiskDataSource::fetchRowData(MemoryBuffer & out, __int64 offset)
  476. {
  477. MemoryBuffer temp;
  478. physical.readData(temp, offset, recordSizer.getRecordSize(NULL));
  479. if (temp.length() == 0)
  480. return false;
  481. unsigned realLength = recordSizer.getRecordSize(temp.length(), temp.toByteArray());
  482. copyRow(out, realLength, temp.toByteArray());
  483. return true;
  484. }
  485. bool DirectCsvDiskDataSource::loadBlock(__int64 startRow, offset_t startOffset)
  486. {
  487. size32_t copyLength = readBlockSize;
  488. MemoryBuffer temp;
  489. bool isLast = physical.readData(temp, startOffset, copyLength);
  490. if (temp.length() == 0)
  491. return false;
  492. RowBlock * rows = new VariableRowBlock(temp, startRow, startOffset, &recordSizer, isLast);
  493. cache.addRowsOwn(rows);
  494. return true;
  495. }
  496. bool DirectCsvDiskDataSource::getRow(MemoryBuffer & out, __int64 row)
  497. {
  498. size32_t length;
  499. const void * data;
  500. unsigned __int64 offset = 0;
  501. if (getRowData(row, length, data, offset))
  502. {
  503. //strip the end of line terminator from the length...
  504. length = recordSizer.getRecordLength(length, data, false);
  505. copyRow(out, length, data);
  506. out.append(offset);
  507. return true;
  508. }
  509. return false;
  510. }
  511. //---------------------------------------------------------------------------
  512. WorkunitDiskDataSource::WorkunitDiskDataSource(const char * _logicalName, IConstWUResult * _wuResult, const char * _wuid, const char * _username, const char * _password) : DirectDiskDataSource(_logicalName, NULL, _username, _password)
  513. {
  514. wuid.set(_wuid);
  515. wuResult.set(_wuResult);
  516. }
  517. bool WorkunitDiskDataSource::init()
  518. {
  519. if (!setReturnedInfoFromResult())
  520. return false;
  521. diskRecord.set(returnedRecord);
  522. return DirectDiskDataSource::init();
  523. }
  524. //---------------------------------------------------------------------------
  525. TranslatedDiskDataSource::TranslatedDiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char * _cluster, const char * _username, const char * _password)
  526. {
  527. logicalName.set(_logicalName);
  528. diskRecord.set(_diskRecord);
  529. cluster.set(_cluster);
  530. username.set(_username);
  531. password.set(_password);
  532. openCount = 0;
  533. }
  534. TranslatedDiskDataSource::~TranslatedDiskDataSource()
  535. {
  536. if (helperWuid)
  537. {
  538. directSource.clear();
  539. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  540. factory->deleteWorkUnit(helperWuid);
  541. }
  542. }
  543. bool TranslatedDiskDataSource::createHelperWU()
  544. {
  545. OwnedHqlExpr browseWUcode = buildDiskOutputEcl(logicalName, diskRecord);
  546. if (!browseWUcode)
  547. return false;
  548. // MORE: Where should we get these parameters from ????
  549. StringAttr application("fileViewer");
  550. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  551. Owned<IWorkUnit> workunit = factory->createWorkUnit(application, username);
  552. workunit->setUser(username);
  553. workunit->setClusterName(cluster);
  554. workunit->setAction(WUActionCompile);
  555. StringBuffer jobName;
  556. jobName.append("FileView_for_").append(logicalName);
  557. workunit->setJobName(jobName.str());
  558. StringBuffer eclText;
  559. toECL(browseWUcode, eclText, true);
  560. Owned<IWUQuery> query = workunit->updateQuery();
  561. query->setQueryText(eclText.str());
  562. query->setQueryName(jobName.str());
  563. helperWuid.set(workunit->queryWuid());
  564. return true;
  565. }
  566. bool TranslatedDiskDataSource::init()
  567. {
  568. if (!createHelperWU() || !compileHelperWU())
  569. return false;
  570. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  571. Owned<IConstWorkUnit> wu = factory->openWorkUnit(helperWuid);
  572. Owned<IConstWUResult> dataResult = wu->getResultBySequence(0);
  573. directSource.setown(new WorkunitDiskDataSource(logicalName, dataResult, helperWuid, username.get(), password.get()));
  574. return directSource->init();
  575. }
  576. bool TranslatedDiskDataSource::compileHelperWU()
  577. {
  578. submitWorkUnit(helperWuid, username, password);
  579. return waitForWorkUnitToCompile(helperWuid);
  580. }
  581. //---------------------------------------------------------------------------
  582. IndirectDiskDataSource::IndirectDiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char * _cluster, const char * _username, const char * _password) : DiskDataSource(_logicalName, _diskRecord, _username, _password)
  583. {
  584. cluster.set(_cluster);
  585. username.set(_username);
  586. password.set(_password);
  587. extraFieldsSize = sizeof(offset_t) + sizeof(unsigned short);
  588. totalSize = 0;
  589. }
  590. IndirectDiskDataSource::~IndirectDiskDataSource()
  591. {
  592. if (browseWuid)
  593. {
  594. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  595. factory->deleteWorkUnit(browseWuid);
  596. }
  597. }
  598. bool IndirectDiskDataSource::createBrowseWU()
  599. {
  600. OwnedHqlExpr browseWUcode = buildDiskFileViewerEcl(logicalName, diskRecord);
  601. if (!browseWUcode)
  602. return false;
  603. returnedRecord.set(browseWUcode->queryChild(0)->queryRecord());
  604. // MORE: Where should we get these parameters from ????
  605. StringAttr application("fileViewer");
  606. StringAttr owner("fileViewer");
  607. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  608. Owned<IWorkUnit> workunit = factory->createWorkUnit(application, owner);
  609. workunit->setUser(owner);
  610. workunit->setClusterName(cluster);
  611. StringBuffer jobName;
  612. jobName.append("FileView_for_").append(logicalName);
  613. workunit->setJobName(jobName.str());
  614. StringBuffer eclText;
  615. toECL(browseWUcode, eclText, true);
  616. Owned<IWUQuery> query = workunit->updateQuery();
  617. query->setQueryText(eclText.str());
  618. query->setQueryName(jobName.str());
  619. browseWuid.set(workunit->queryWuid());
  620. return true;
  621. }
  622. bool IndirectDiskDataSource::init()
  623. {
  624. if (!df)
  625. return false;
  626. if (!createBrowseWU())
  627. return false;
  628. //Need to assign the transformed record to meta
  629. bool isGrouped = false; // more not sure this is strictly true...
  630. returnedMeta.setown(new DataSourceMetaData(returnedRecord, 2, true, isGrouped, 0));
  631. transformedMeta.set(returnedMeta);
  632. diskMeta.setown(new DataSourceMetaData(diskRecord, 0, true, isGrouped, 0));
  633. totalSize = df->getFileSize(true,false);
  634. if (diskMeta->isFixedSize())
  635. totalRows = totalSize / diskMeta->fixedSize();
  636. else
  637. totalRows = UNKNOWN_NUM_ROWS;
  638. return true;
  639. }
  640. bool IndirectDiskDataSource::loadBlock(__int64 startRow, offset_t startOffset)
  641. {
  642. MemoryBuffer temp;
  643. //enter scope....>
  644. {
  645. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  646. Owned<IWorkUnit> wu = factory->updateWorkUnit(browseWuid);
  647. Owned<IWUResult> lower = wu->updateVariableByName(LOWER_LIMIT_ID);
  648. lower->setResultInt(startOffset);
  649. lower->setResultStatus(ResultStatusSupplied);
  650. Owned<IWUResult> dataResult = wu->updateResultBySequence(0);
  651. dataResult->setResultRaw(0, NULL, ResultFormatRaw);
  652. dataResult->setResultStatus(ResultStatusUndefined);
  653. wu->clearExceptions();
  654. if (wu->getState() != WUStateUnknown)
  655. wu->setState(WUStateCompiled);
  656. //Owned<IWUResult> count = wu->updateVariableByName(RECORD_LIMIT_ID);
  657. //count->setResultInt64(fetchSize);
  658. }
  659. //Resubmit the query...
  660. submitWorkUnit(browseWuid, username, password);
  661. WUState finalState = waitForWorkUnitToComplete(browseWuid, -1, true);
  662. if(!((finalState == WUStateCompleted) || (finalState == WUStateWait)))
  663. return false;
  664. //Now extract the results...
  665. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  666. Owned<IConstWorkUnit> wu = factory->openWorkUnit(browseWuid);
  667. Owned<IConstWUResult> dataResult = wu->getResultBySequence(0);
  668. MemoryBuffer2IDataVal xxx(temp); dataResult->getResultRaw(xxx, NULL, NULL);
  669. if (temp.length() == 0)
  670. return false;
  671. RowBlock * rows;
  672. if (returnedMeta->isFixedSize())
  673. rows = new FilePosFixedRowBlock(temp, startRow, startOffset, returnedMeta->fixedSize());
  674. else
  675. rows = new FilePosVariableRowBlock(temp, startRow, startOffset, returnedMeta, true);
  676. cache.addRowsOwn(rows);
  677. return true;
  678. }