fvdisksource.cpp 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "eclrtl.hpp"
  15. #include "hqlexpr.hpp"
  16. #include "hqlthql.hpp"
  17. #include "fvresultset.ipp"
  18. #include "fileview.hpp"
  19. #include "fvdisksource.ipp"
  20. #include "fvwugen.hpp"
  21. #include "fverror.hpp"
  22. #include "dasess.hpp"
  //Fallback value for a CSV file's maximum record size, used by CsvRecordSize::init()
  //when the file attributes carry no "@maxRecordSize" entry.
  #define DEFAULT_MAX_CSV_SIZE 0x10000 // 64k
  24. PhysicalFileInfo::PhysicalFileInfo()
  25. {
  26. cachedPart = (unsigned)-1;
  27. totalSize = 0;
  28. }
  29. offset_t getPartSize(IDistributedFilePart & part, unsigned copy)
  30. {
  31. try
  32. {
  33. RemoteFilename rfn;
  34. Owned<IFile> in = createIFile(part.getFilename(rfn,copy));
  35. return in->size();
  36. }
  37. catch (IException * e)
  38. {
  39. e->Release();
  40. }
  41. return (offset_t) -1;
  42. }
  43. void PhysicalFileInfo::init(IDistributedFile * _df)
  44. {
  45. df.set(_df);
  46. totalSize = 0;
  47. Owned<IDistributedFilePartIterator> iter = df->getIterator();
  48. ForEach(*iter)
  49. {
  50. IDistributedFilePart & cur = iter->query();
  51. offset_t partSize = cur.getFileSize(true, false);
  52. if (partSize == -1)
  53. partSize = getPartSize(cur, 0);
  54. if (partSize == -1)
  55. partSize = getPartSize(cur, 1);
  56. if (partSize == -1)
  57. partSize = 0x100000; // force an error when the part is opened.
  58. partSizes.append(partSize);
  59. totalSize += partSize;
  60. }
  61. }
  62. offset_t PhysicalFileInfo::getOptimizedOffset(offset_t offset, unsigned copyLength)
  63. {
  64. offset_t newOffset = 0;
  65. ForEachItemIn(idx, partSizes)
  66. {
  67. offset_t curSize = partSizes.item(idx);
  68. if (offset < curSize)
  69. return newOffset + ((offset) / copyLength) * copyLength;
  70. newOffset += curSize;
  71. offset -= curSize;
  72. }
  73. return newOffset;
  74. }
  75. bool PhysicalFileInfo::readData(MemoryBuffer & out, __int64 startOffset, size32_t copyLength)
  76. {
  77. CriticalBlock procedure(cs);
  78. offset_t chunkOffset = startOffset;
  79. unsigned numParts = partSizes.ordinality();
  80. unsigned part;
  81. offset_t curPartLength;
  82. if (isLocalFpos(startOffset))
  83. {
  84. part = getLocalFposPart(startOffset);
  85. chunkOffset = getLocalFposOffset(startOffset);
  86. if (part >= numParts)
  87. return false;
  88. curPartLength = partSizes.item(part);
  89. }
  90. else
  91. {
  92. for (part = 0; part < numParts; part++)
  93. {
  94. curPartLength = partSizes.item(part);
  95. if (chunkOffset < curPartLength)
  96. break;
  97. chunkOffset -= curPartLength;
  98. }
  99. }
  100. if (part == numParts)
  101. return false;
  102. bool isLast = false;
  103. if (chunkOffset + copyLength >= curPartLength)
  104. {
  105. copyLength = (size32_t)(curPartLength - chunkOffset);
  106. isLast = true;
  107. }
  108. if (part != cachedPart)
  109. {
  110. cachedPart = (unsigned)-1;
  111. cachedFile.clear();
  112. cachedIO.clear();
  113. Owned<IDistributedFilePart> dfp = df->getPart(part);
  114. try
  115. {
  116. RemoteFilename rfn;
  117. cachedFile.setown(createIFile(dfp->getFilename(rfn)));
  118. cachedIO.setown(cachedFile->open(IFOread));
  119. }
  120. catch (IException * e)
  121. {
  122. e->Release();
  123. }
  124. if (!cachedIO)
  125. {
  126. RemoteFilename rfn;
  127. cachedFile.setown(createIFile(dfp->getFilename(rfn,1)));
  128. cachedIO.setown(cachedFile->open(IFOread));
  129. if (!cachedIO)
  130. {
  131. StringBuffer str;
  132. throwError1(FVERR_FailedOpenFile, dfp->getPartName(str).str());
  133. return false;
  134. }
  135. }
  136. if (df->isCompressed())
  137. {
  138. cachedIO.setown(createCompressedFileReader(cachedIO));
  139. if (!cachedIO)
  140. {
  141. StringBuffer str;
  142. throwError1(FVERR_FailedOpenCompressedFile, dfp->getPartName(str).str());
  143. return false;
  144. }
  145. }
  146. cachedPart = part;
  147. }
  148. char * data = (char *)out.clear().reserve(copyLength);
  149. unsigned numGot = cachedIO->read(chunkOffset, copyLength, data);
  150. out.setLength(numGot);
  151. return isLast;
  152. }
  153. void PhysicalFileInfo::close()
  154. {
  155. cachedPart = (unsigned)-1;
  156. cachedFile.clear();
  157. cachedIO.clear();
  158. }
  159. //---------------------------------------------------------------------------
  160. DiskDataSource::DiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char* _username, const char* _password)
  161. {
  162. logicalName.set(_logicalName);
  163. diskRecord.set(_diskRecord);
  164. Owned<IUserDescriptor> udesc;
  165. if(_username != NULL && *_username != '\0')
  166. {
  167. udesc.setown(createUserDescriptor());
  168. udesc->set(_username, _password);
  169. }
  170. df.setown(queryDistributedFileDirectory().lookup(logicalName, udesc.get()));
  171. }
  172. //---------------------------------------------------------------------------
  173. DirectDiskDataSource::DirectDiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char* _username, const char* _password) : DiskDataSource(_logicalName, _diskRecord, _username, _password)
  174. {
  175. }
  176. bool DirectDiskDataSource::init()
  177. {
  178. if (!df)
  179. return false;
  180. IPropertyTree & properties = df->queryAttributes();
  181. const char * kind = properties.queryProp("@kind");
  182. bool isGrouped =properties.getPropBool("@grouped");
  183. if (kind && (stricmp(kind, "key") == 0))
  184. throwError1(FVERR_CannotViewKey, logicalName.get());
  185. //Need to assign the transformed record to meta
  186. diskMeta.setown(new DataSourceMetaData(diskRecord, 0, true, isGrouped, 0));
  187. if (!returnedMeta)
  188. {
  189. returnedMeta.set(diskMeta);
  190. returnedRecordSize.set(returnedMeta);
  191. }
  192. if (!transformedMeta)
  193. transformedMeta.set(returnedMeta);
  194. if (!isWorkunitResult())
  195. addFileposition();
  196. physical.init(df);
  197. if (diskMeta->isFixedSize())
  198. {
  199. if (diskMeta->fixedSize() == 0)
  200. throwError1(FVERR_ZeroSizeRecord, logicalName.get());
  201. totalRows = physical.totalSize / diskMeta->fixedSize();
  202. }
  203. else if (properties.hasProp("@recordCount"))
  204. totalRows = properties.getPropInt64("@recordCount");
  205. else
  206. totalRows = UNKNOWN_NUM_ROWS;
  207. readBlockSize = 4 * diskMeta->getRecordSize(NULL);
  208. if (readBlockSize < DISK_BLOCK_SIZE) readBlockSize = DISK_BLOCK_SIZE;
  209. return true;
  210. }
  211. bool DirectDiskDataSource::fetchRowData(MemoryBuffer & out, __int64 offset)
  212. {
  213. physical.readData(out, offset, returnedMeta->getMaxRecordSize());
  214. if (out.length() == 0)
  215. return false;
  216. size32_t actualLength = returnedMeta->getRecordSize(out.toByteArray());
  217. if (actualLength > readBlockSize)
  218. throwError(FVERR_RowTooLarge);
  219. out.setLength(actualLength);
  220. return true;
  221. }
  222. size32_t DirectDiskDataSource::getCopyLength()
  223. {
  224. size32_t copyLength = readBlockSize;
  225. if (returnedMeta->isFixedSize())
  226. {
  227. unsigned fixedSize = returnedMeta->fixedSize();
  228. copyLength = (copyLength / fixedSize) * fixedSize;
  229. }
  230. return copyLength;
  231. }
  232. void DirectDiskDataSource::improveLocation(__int64 row, RowLocation & location)
  233. {
  234. if (!returnedMeta->isFixedSize())
  235. return;
  236. //Align the row so the chunks don't overlap....
  237. unsigned fixedSize = returnedMeta->fixedSize();
  238. size32_t copyLength = getCopyLength();
  239. location.bestOffset = physical.getOptimizedOffset(row * fixedSize, copyLength);
  240. location.bestRow = location.bestOffset / fixedSize;
  241. }
  242. bool DirectDiskDataSource::loadBlock(__int64 startRow, offset_t startOffset)
  243. {
  244. size32_t copyLength = getCopyLength();
  245. MemoryBuffer temp;
  246. bool isLast = physical.readData(temp, startOffset, copyLength);
  247. if (temp.length() == 0)
  248. return false;
  249. RowBlock * rows;
  250. if (returnedMeta->isFixedSize())
  251. rows = new FixedRowBlock(temp, startRow, startOffset, returnedMeta->fixedSize());
  252. else
  253. rows = new VariableRowBlock(temp, startRow, startOffset, returnedRecordSize, isLast);
  254. cache.addRowsOwn(rows);
  255. return true;
  256. }
  257. void DirectDiskDataSource::onClose()
  258. {
  259. DiskDataSource::onClose();
  260. if (openCount == 0)
  261. physical.close();
  262. }
  263. //---------------------------------------------------------------------------
  264. UtfReader::UtfFormat getFormat(const char * format)
  265. {
  266. if (memicmp(format, "utf", 3) == 0)
  267. {
  268. const char * tail = format + 3;
  269. if (*tail == '-')
  270. tail++;
  271. if (stricmp(tail, "8N")==0)
  272. return UtfReader::Utf8;
  273. else if (stricmp(tail, "16BE")==0)
  274. return UtfReader::Utf16be;
  275. else if (stricmp(tail, "16LE")==0)
  276. return UtfReader::Utf16le;
  277. else if (stricmp(tail, "32BE")==0)
  278. return UtfReader::Utf32be;
  279. else if (stricmp(tail, "32LE")==0)
  280. return UtfReader::Utf32le;
  281. else
  282. throwError1(FVERR_UnknownUTFFormat, format);
  283. }
  284. return UtfReader::Utf8;
  285. }
  //Action codes registered with the CSV matcher by CsvRecordSize::init().
  //CsvRecordSize::getRecordLength() switches on the low byte of each match result;
  //NONE is handled there as "nothing matched at this position".
  enum { NONE=0, SEPARATOR=1, TERMINATOR=2, WHITESPACE=3, QUOTE=4, ESCAPE=5 };
  287. void CsvRecordSize::init(IDistributedFile * df)
  288. {
  289. IPropertyTree * props = &df->queryAttributes();
  290. UtfReader::UtfFormat utfType = getFormat(props->queryProp("@format"));
  291. switch (utfType)
  292. {
  293. case UtfReader::Utf16be:
  294. case UtfReader::Utf16le:
  295. unitSize = 2;
  296. break;
  297. case UtfReader::Utf32be:
  298. case UtfReader::Utf32le:
  299. unitSize = 4;
  300. break;
  301. default:
  302. unitSize = 1;
  303. }
  304. maxRecordSize = props->getPropInt("@maxRecordSize", DEFAULT_MAX_CSV_SIZE);
  305. const char * terminate = props->queryProp("@csvTerminate");
  306. addUtfActionList(matcher, terminate ? terminate : "\\n,\\r\\n", TERMINATOR, NULL, utfType);
  307. const char * separate = props->queryProp("@csvSeparate");
  308. addUtfActionList(matcher, separate ? separate : "\\,", SEPARATOR, NULL, utfType);
  309. const char * quote = props->queryProp("@csvQuote");
  310. addUtfActionList(matcher, quote ? quote : "\"", QUOTE, NULL, utfType);
  311. const char * escape = props->queryProp("@csvEscape");
  312. addUtfActionList(matcher, escape, ESCAPE, NULL, utfType);
  313. addUtfActionList(matcher, " ", WHITESPACE, NULL, utfType);
  314. addUtfActionList(matcher, "\t", WHITESPACE, NULL, utfType);
  315. }
  316. size32_t CsvRecordSize::getRecordLength(size32_t maxLength, const void * start, bool includeTerminator)
  317. {
  318. //If we need more complicated processing...
  319. unsigned quote = 0;
  320. unsigned quoteToStrip = 0;
  321. const byte * cur = (const byte *)start;
  322. const byte * end = (const byte *)start + maxLength;
  323. const byte * firstGood = cur;
  324. const byte * lastGood = cur;
  325. bool lastEscape = false;
  326. while (cur != end)
  327. {
  328. unsigned matchLen;
  329. unsigned match = matcher.getMatch(end-cur, (const char *)cur, matchLen);
  330. switch (match & 255)
  331. {
  332. case NONE:
  333. cur += unitSize; // matchLen == 0;
  334. lastGood = cur;
  335. break;
  336. case WHITESPACE:
  337. //Skip leading whitespace
  338. if (quote)
  339. lastGood = cur+matchLen;
  340. else if (cur == firstGood)
  341. {
  342. firstGood = cur+matchLen;
  343. lastGood = cur+matchLen;
  344. }
  345. break;
  346. case SEPARATOR:
  347. // Quoted separator
  348. if (quote == 0)
  349. {
  350. lastEscape = false;
  351. quoteToStrip = 0;
  352. firstGood = cur + matchLen;
  353. }
  354. lastGood = cur+matchLen;
  355. break;
  356. case TERMINATOR:
  357. if (quote == 0) // Is this a good idea? Means a mismatched quote is not fixed by EOL
  358. {
  359. if (includeTerminator)
  360. return cur + matchLen - (const byte *)start;
  361. return cur - (const byte *)start;
  362. }
  363. lastGood = cur+matchLen;
  364. break;
  365. case QUOTE:
  366. // Quoted quote
  367. if (quote == 0)
  368. {
  369. if (cur == firstGood)
  370. {
  371. quote = match;
  372. firstGood = cur+matchLen;
  373. }
  374. lastGood = cur+matchLen;
  375. }
  376. else
  377. {
  378. if (quote == match)
  379. {
  380. const byte * next = cur + matchLen;
  381. //Check for double quotes
  382. if ((next != end))
  383. {
  384. unsigned nextMatchLen;
  385. unsigned nextMatch = matcher.getMatch((size32_t)(end-next), (const char *)next, nextMatchLen);
  386. if (nextMatch == quote)
  387. {
  388. quoteToStrip = quote;
  389. matchLen += nextMatchLen;
  390. lastGood = cur+matchLen;
  391. }
  392. else
  393. quote = 0;
  394. }
  395. else
  396. quote = 0;
  397. }
  398. else
  399. lastGood = cur+matchLen;
  400. }
  401. break;
  402. case ESCAPE:
  403. lastEscape = true;
  404. lastGood = cur+matchLen;
  405. // If this escape is at the end, proceed to field range
  406. if (lastGood == end)
  407. break;
  408. // Skip escape and ignore the next match
  409. cur += matchLen;
  410. match = matcher.getMatch((size32_t)(end-cur), (const char *)cur, matchLen);
  411. if ((match & 255) == NONE)
  412. matchLen = unitSize;
  413. lastGood += matchLen;
  414. break;
  415. }
  416. cur += matchLen;
  417. }
  418. return end - (const byte *)start;
  419. }
  420. size32_t CsvRecordSize::getRecordSize(const void * start)
  421. {
  422. if (!start) return maxRecordSize;
  423. return getRecordLength(maxRecordSize, start, true);
  424. }
  425. size32_t CsvRecordSize::getRecordSize(unsigned maxLength, const void * start)
  426. {
  427. if (!start) return maxRecordSize;
  428. return getRecordLength(maxLength, start, true);
  429. }
  430. size32_t CsvRecordSize::getFixedSize() const
  431. {
  432. return 0; // is variable
  433. }
  434. size32_t CsvRecordSize::getMinRecordSize() const
  435. {
  436. return unitSize;
  437. }
  438. DirectCsvDiskDataSource::DirectCsvDiskDataSource(IDistributedFile * _df, const char * _format)
  439. {
  440. df.set(_df);
  441. isUnicode = (memicmp(_format, "utf", 3) == 0);
  442. utfFormat = getFormat(_format);
  443. returnedMeta.setown(new DataSourceMetaData(isUnicode ? type_unicode : type_string));
  444. returnedRecordSize.set(&recordSizer);
  445. transformedMeta.set(returnedMeta);
  446. addFileposition();
  447. IPropertyTree & properties = df->queryAttributes();
  448. if (properties.hasProp("@recordCount"))
  449. totalRows = properties.getPropInt64("@recordCount");
  450. }
  451. bool DirectCsvDiskDataSource::init()
  452. {
  453. physical.init(df);
  454. recordSizer.init(df);
  455. readBlockSize = 4 * recordSizer.getRecordSize(NULL);
  456. if (readBlockSize < DISK_BLOCK_SIZE) readBlockSize = DISK_BLOCK_SIZE;
  457. return true;
  458. }
  459. void DirectCsvDiskDataSource::copyRow(MemoryBuffer & out, size32_t length, const void * data)
  460. {
  461. if (isUnicode)
  462. {
  463. unsigned offsetOfLength = out.length();
  464. out.append(length);
  465. convertUtf(out, UtfReader::Utf16le, length, data, utfFormat);
  466. unsigned savedLength = out.length();
  467. out.setWritePos(offsetOfLength);
  468. out.append((unsigned) (savedLength - offsetOfLength - sizeof(unsigned))/2);
  469. out.setWritePos(savedLength);
  470. }
  471. else
  472. {
  473. out.append(length);
  474. out.append(length, data);
  475. }
  476. }
  477. bool DirectCsvDiskDataSource::fetchRowData(MemoryBuffer & out, __int64 offset)
  478. {
  479. MemoryBuffer temp;
  480. physical.readData(temp, offset, recordSizer.getRecordSize(NULL));
  481. if (temp.length() == 0)
  482. return false;
  483. unsigned realLength = recordSizer.getRecordSize(temp.length(), temp.toByteArray());
  484. copyRow(out, realLength, temp.toByteArray());
  485. return true;
  486. }
  487. bool DirectCsvDiskDataSource::loadBlock(__int64 startRow, offset_t startOffset)
  488. {
  489. size32_t copyLength = readBlockSize;
  490. MemoryBuffer temp;
  491. bool isLast = physical.readData(temp, startOffset, copyLength);
  492. if (temp.length() == 0)
  493. return false;
  494. RowBlock * rows = new VariableRowBlock(temp, startRow, startOffset, &recordSizer, isLast);
  495. cache.addRowsOwn(rows);
  496. return true;
  497. }
  498. bool DirectCsvDiskDataSource::getRow(MemoryBuffer & out, __int64 row)
  499. {
  500. size32_t length;
  501. const void * data;
  502. unsigned __int64 offset = 0;
  503. if (getRowData(row, length, data, offset))
  504. {
  505. //strip the end of line terminator from the length...
  506. length = recordSizer.getRecordLength(length, data, false);
  507. copyRow(out, length, data);
  508. out.append(offset);
  509. return true;
  510. }
  511. return false;
  512. }
  513. //---------------------------------------------------------------------------
  514. WorkunitDiskDataSource::WorkunitDiskDataSource(const char * _logicalName, IConstWUResult * _wuResult, const char * _wuid, const char * _username, const char * _password) : DirectDiskDataSource(_logicalName, NULL, _username, _password)
  515. {
  516. wuid.set(_wuid);
  517. wuResult.set(_wuResult);
  518. }
  519. bool WorkunitDiskDataSource::init()
  520. {
  521. if (!setReturnedInfoFromResult())
  522. return false;
  523. diskRecord.set(returnedRecord);
  524. return DirectDiskDataSource::init();
  525. }
  526. //---------------------------------------------------------------------------
  527. TranslatedDiskDataSource::TranslatedDiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char * _cluster, const char * _username, const char * _password)
  528. {
  529. logicalName.set(_logicalName);
  530. diskRecord.set(_diskRecord);
  531. cluster.set(_cluster);
  532. username.set(_username);
  533. password.set(_password);
  534. openCount = 0;
  535. }
  536. TranslatedDiskDataSource::~TranslatedDiskDataSource()
  537. {
  538. if (helperWuid)
  539. {
  540. directSource.clear();
  541. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  542. factory->deleteWorkUnit(helperWuid);
  543. }
  544. }
  545. bool TranslatedDiskDataSource::createHelperWU()
  546. {
  547. OwnedHqlExpr browseWUcode = buildDiskOutputEcl(logicalName, diskRecord);
  548. if (!browseWUcode)
  549. return false;
  550. // MORE: Where should we get these parameters from ????
  551. StringAttr application("fileViewer");
  552. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  553. Owned<IWorkUnit> workunit = factory->createWorkUnit(application, username);
  554. workunit->setUser(username);
  555. workunit->setClusterName(cluster);
  556. workunit->setAction(WUActionCompile);
  557. StringBuffer jobName;
  558. jobName.append("FileView_for_").append(logicalName);
  559. workunit->setJobName(jobName.str());
  560. StringBuffer eclText;
  561. toECL(browseWUcode, eclText, true);
  562. Owned<IWUQuery> query = workunit->updateQuery();
  563. query->setQueryText(eclText.str());
  564. query->setQueryName(jobName.str());
  565. helperWuid.set(workunit->queryWuid());
  566. return true;
  567. }
  568. bool TranslatedDiskDataSource::init()
  569. {
  570. if (!createHelperWU() || !compileHelperWU())
  571. return false;
  572. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  573. Owned<IConstWorkUnit> wu = factory->openWorkUnit(helperWuid, false);
  574. Owned<IConstWUResult> dataResult = wu->getResultBySequence(0);
  575. directSource.setown(new WorkunitDiskDataSource(logicalName, dataResult, helperWuid, username.get(), password.get()));
  576. return directSource->init();
  577. }
  578. bool TranslatedDiskDataSource::compileHelperWU()
  579. {
  580. submitWorkUnit(helperWuid, username, password);
  581. return waitForWorkUnitToCompile(helperWuid);
  582. }
  583. //---------------------------------------------------------------------------
  584. IndirectDiskDataSource::IndirectDiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char * _cluster, const char * _username, const char * _password) : DiskDataSource(_logicalName, _diskRecord, _username, _password)
  585. {
  586. cluster.set(_cluster);
  587. username.set(_username);
  588. password.set(_password);
  589. extraFieldsSize = sizeof(offset_t) + sizeof(unsigned short);
  590. totalSize = 0;
  591. }
  592. IndirectDiskDataSource::~IndirectDiskDataSource()
  593. {
  594. if (browseWuid)
  595. {
  596. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  597. factory->deleteWorkUnit(browseWuid);
  598. }
  599. }
  600. bool IndirectDiskDataSource::createBrowseWU()
  601. {
  602. OwnedHqlExpr browseWUcode = buildDiskFileViewerEcl(logicalName, diskRecord);
  603. if (!browseWUcode)
  604. return false;
  605. returnedRecord.set(browseWUcode->queryChild(0)->queryRecord());
  606. // MORE: Where should we get these parameters from ????
  607. StringAttr application("fileViewer");
  608. StringAttr owner("fileViewer");
  609. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  610. Owned<IWorkUnit> workunit = factory->createWorkUnit(application, owner);
  611. workunit->setUser(owner);
  612. workunit->setClusterName(cluster);
  613. StringBuffer jobName;
  614. jobName.append("FileView_for_").append(logicalName);
  615. workunit->setJobName(jobName.str());
  616. StringBuffer eclText;
  617. toECL(browseWUcode, eclText, true);
  618. Owned<IWUQuery> query = workunit->updateQuery();
  619. query->setQueryText(eclText.str());
  620. query->setQueryName(jobName.str());
  621. browseWuid.set(workunit->queryWuid());
  622. return true;
  623. }
  624. bool IndirectDiskDataSource::init()
  625. {
  626. if (!df)
  627. return false;
  628. if (!createBrowseWU())
  629. return false;
  630. //Need to assign the transformed record to meta
  631. bool isGrouped = false; // more not sure this is strictly true...
  632. returnedMeta.setown(new DataSourceMetaData(returnedRecord, 2, true, isGrouped, 0));
  633. transformedMeta.set(returnedMeta);
  634. diskMeta.setown(new DataSourceMetaData(diskRecord, 0, true, isGrouped, 0));
  635. totalSize = df->getFileSize(true,false);
  636. if (diskMeta->isFixedSize())
  637. totalRows = totalSize / diskMeta->fixedSize();
  638. else
  639. totalRows = UNKNOWN_NUM_ROWS;
  640. return true;
  641. }
  642. bool IndirectDiskDataSource::loadBlock(__int64 startRow, offset_t startOffset)
  643. {
  644. MemoryBuffer temp;
  645. //enter scope....>
  646. {
  647. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  648. Owned<IWorkUnit> wu = factory->updateWorkUnit(browseWuid);
  649. Owned<IWUResult> lower = wu->updateVariableByName(LOWER_LIMIT_ID);
  650. lower->setResultInt(startOffset);
  651. lower->setResultStatus(ResultStatusSupplied);
  652. Owned<IWUResult> dataResult = wu->updateResultBySequence(0);
  653. dataResult->setResultRaw(0, NULL, ResultFormatRaw);
  654. dataResult->setResultStatus(ResultStatusUndefined);
  655. wu->clearExceptions();
  656. if (wu->getState() != WUStateUnknown)
  657. wu->setState(WUStateCompiled);
  658. //Owned<IWUResult> count = wu->updateVariableByName(RECORD_LIMIT_ID);
  659. //count->setResultInt64(fetchSize);
  660. }
  661. //Resubmit the query...
  662. submitWorkUnit(browseWuid, username, password);
  663. WUState finalState = waitForWorkUnitToComplete(browseWuid, -1, true);
  664. if(!((finalState == WUStateCompleted) || (finalState == WUStateWait)))
  665. return false;
  666. //Now extract the results...
  667. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  668. Owned<IConstWorkUnit> wu = factory->openWorkUnit(browseWuid, false);
  669. Owned<IConstWUResult> dataResult = wu->getResultBySequence(0);
  670. MemoryBuffer2IDataVal xxx(temp); dataResult->getResultRaw(xxx, NULL, NULL);
  671. if (temp.length() == 0)
  672. return false;
  673. RowBlock * rows;
  674. if (returnedMeta->isFixedSize())
  675. rows = new FilePosFixedRowBlock(temp, startRow, startOffset, returnedMeta->fixedSize());
  676. else
  677. rows = new FilePosVariableRowBlock(temp, startRow, startOffset, returnedMeta, true);
  678. cache.addRowsOwn(rows);
  679. return true;
  680. }