fvdisksource.cpp 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "jliball.hpp"
  15. #include "eclrtl.hpp"
  16. #include "hqlexpr.hpp"
  17. #include "hqlthql.hpp"
  18. #include "fvresultset.ipp"
  19. #include "fileview.hpp"
  20. #include "fvdisksource.ipp"
  21. #include "fvwugen.hpp"
  22. #include "fverror.hpp"
  23. #include "dasess.hpp"
  24. #define DEFAULT_MAX_CSV_SIZE 0x1100
  25. PhysicalFileInfo::PhysicalFileInfo()
  26. {
  27. cachedPart = (unsigned)-1;
  28. totalSize = 0;
  29. }
  30. offset_t getPartSize(IDistributedFilePart & part, unsigned copy)
  31. {
  32. try
  33. {
  34. RemoteFilename rfn;
  35. Owned<IFile> in = createIFile(part.getFilename(rfn,copy));
  36. return in->size();
  37. }
  38. catch (IException * e)
  39. {
  40. e->Release();
  41. }
  42. return (offset_t) -1;
  43. }
  44. void PhysicalFileInfo::init(IDistributedFile * _df)
  45. {
  46. df.set(_df);
  47. totalSize = 0;
  48. Owned<IDistributedFilePartIterator> iter = df->getIterator();
  49. ForEach(*iter)
  50. {
  51. IDistributedFilePart & cur = iter->query();
  52. offset_t partSize = cur.getFileSize(true, false);
  53. if (partSize == -1)
  54. partSize = getPartSize(cur, 0);
  55. if (partSize == -1)
  56. partSize = getPartSize(cur, 1);
  57. if (partSize == -1)
  58. partSize = 0x100000; // force an error when the part is opened.
  59. partSizes.append(partSize);
  60. totalSize += partSize;
  61. }
  62. }
  63. offset_t PhysicalFileInfo::getOptimizedOffset(offset_t offset, unsigned copyLength)
  64. {
  65. offset_t newOffset = 0;
  66. ForEachItemIn(idx, partSizes)
  67. {
  68. offset_t curSize = partSizes.item(idx);
  69. if (offset < curSize)
  70. return newOffset + ((offset) / copyLength) * copyLength;
  71. newOffset += curSize;
  72. offset -= curSize;
  73. }
  74. return newOffset;
  75. }
  76. bool PhysicalFileInfo::readData(MemoryBuffer & out, __int64 startOffset, size32_t copyLength)
  77. {
  78. CriticalBlock procedure(cs);
  79. offset_t chunkOffset = startOffset;
  80. unsigned numParts = partSizes.ordinality();
  81. unsigned part;
  82. offset_t curPartLength;
  83. if (isLocalFpos(startOffset))
  84. {
  85. part = getLocalFposPart(startOffset);
  86. chunkOffset = getLocalFposOffset(startOffset);
  87. if (part >= numParts)
  88. return false;
  89. curPartLength = partSizes.item(part);
  90. }
  91. else
  92. {
  93. for (part = 0; part < numParts; part++)
  94. {
  95. curPartLength = partSizes.item(part);
  96. if (chunkOffset < curPartLength)
  97. break;
  98. chunkOffset -= curPartLength;
  99. }
  100. }
  101. if (part == numParts)
  102. return false;
  103. bool isLast = false;
  104. if (chunkOffset + copyLength >= curPartLength)
  105. {
  106. copyLength = (size32_t)(curPartLength - chunkOffset);
  107. isLast = true;
  108. }
  109. if (part != cachedPart)
  110. {
  111. cachedPart = (unsigned)-1;
  112. cachedFile.clear();
  113. cachedIO.clear();
  114. Owned<IDistributedFilePart> dfp = df->getPart(part);
  115. try
  116. {
  117. RemoteFilename rfn;
  118. cachedFile.setown(createIFile(dfp->getFilename(rfn)));
  119. cachedIO.setown(cachedFile->open(IFOread));
  120. }
  121. catch (IException * e)
  122. {
  123. e->Release();
  124. }
  125. if (!cachedIO)
  126. {
  127. RemoteFilename rfn;
  128. cachedFile.setown(createIFile(dfp->getFilename(rfn,1)));
  129. cachedIO.setown(cachedFile->open(IFOread));
  130. if (!cachedIO)
  131. {
  132. StringBuffer str;
  133. throwError1(FVERR_FailedOpenFile, dfp->getPartName(str).str());
  134. return false;
  135. }
  136. }
  137. if (df->isCompressed())
  138. {
  139. cachedIO.setown(createCompressedFileReader(cachedIO));
  140. if (!cachedIO)
  141. {
  142. StringBuffer str;
  143. throwError1(FVERR_FailedOpenCompressedFile, dfp->getPartName(str).str());
  144. return false;
  145. }
  146. }
  147. cachedPart = part;
  148. }
  149. char * data = (char *)out.clear().reserve(copyLength);
  150. unsigned numGot = cachedIO->read(chunkOffset, copyLength, data);
  151. out.setLength(numGot);
  152. return isLast;
  153. }
  154. void PhysicalFileInfo::close()
  155. {
  156. cachedPart = (unsigned)-1;
  157. cachedFile.clear();
  158. cachedIO.clear();
  159. }
  160. //---------------------------------------------------------------------------
  161. DiskDataSource::DiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char* _username, const char* _password)
  162. {
  163. logicalName.set(_logicalName);
  164. diskRecord.set(_diskRecord);
  165. Owned<IUserDescriptor> udesc;
  166. if(_username != NULL && *_username != '\0')
  167. {
  168. udesc.setown(createUserDescriptor());
  169. udesc->set(_username, _password);
  170. }
  171. df.setown(queryDistributedFileDirectory().lookup(logicalName, udesc.get()));
  172. }
  173. //---------------------------------------------------------------------------
  174. DirectDiskDataSource::DirectDiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char* _username, const char* _password) : DiskDataSource(_logicalName, _diskRecord, _username, _password)
  175. {
  176. }
  177. bool DirectDiskDataSource::init()
  178. {
  179. if (!df)
  180. return false;
  181. IPropertyTree & properties = df->queryAttributes();
  182. const char * kind = properties.queryProp("@kind");
  183. bool isGrouped =properties.getPropBool("@grouped");
  184. if (kind && (stricmp(kind, "key") == 0))
  185. throwError1(FVERR_CannotViewKey, logicalName.get());
  186. //Need to assign the transformed record to meta
  187. diskMeta.setown(new DataSourceMetaData(diskRecord, 0, true, isGrouped, 0));
  188. if (!returnedMeta)
  189. {
  190. returnedMeta.set(diskMeta);
  191. returnedRecordSize.set(returnedMeta);
  192. }
  193. if (!transformedMeta)
  194. transformedMeta.set(returnedMeta);
  195. addFileposition();
  196. physical.init(df);
  197. if (diskMeta->isFixedSize())
  198. {
  199. if (diskMeta->fixedSize() == 0)
  200. throwError1(FVERR_ZeroSizeRecord, logicalName.get());
  201. totalRows = physical.totalSize / diskMeta->fixedSize();
  202. }
  203. else if (properties.hasProp("@recordCount"))
  204. totalRows = properties.getPropInt64("@recordCount");
  205. else
  206. totalRows = UNKNOWN_NUM_ROWS;
  207. readBlockSize = 4 * diskMeta->getRecordSize(NULL);
  208. if (readBlockSize < DISK_BLOCK_SIZE) readBlockSize = DISK_BLOCK_SIZE;
  209. return true;
  210. }
  211. bool DirectDiskDataSource::fetchRowData(MemoryBuffer & out, __int64 offset)
  212. {
  213. physical.readData(out, offset, returnedMeta->getMaxRecordSize());
  214. if (out.length() == 0)
  215. return false;
  216. out.setLength(returnedMeta->getRecordSize(out.toByteArray()));
  217. return true;
  218. }
  219. size32_t DirectDiskDataSource::getCopyLength()
  220. {
  221. size32_t copyLength = readBlockSize;
  222. if (returnedMeta->isFixedSize())
  223. {
  224. unsigned fixedSize = returnedMeta->fixedSize();
  225. copyLength = (copyLength / fixedSize) * fixedSize;
  226. }
  227. return copyLength;
  228. }
  229. void DirectDiskDataSource::improveLocation(__int64 row, RowLocation & location)
  230. {
  231. if (!returnedMeta->isFixedSize())
  232. return;
  233. //Align the row so the chunks don't overlap....
  234. unsigned fixedSize = returnedMeta->fixedSize();
  235. size32_t copyLength = getCopyLength();
  236. location.bestOffset = physical.getOptimizedOffset(row * fixedSize, copyLength);
  237. location.bestRow = location.bestOffset / fixedSize;
  238. }
  239. bool DirectDiskDataSource::loadBlock(__int64 startRow, offset_t startOffset)
  240. {
  241. size32_t copyLength = getCopyLength();
  242. MemoryBuffer temp;
  243. bool isLast = physical.readData(temp, startOffset, copyLength);
  244. if (temp.length() == 0)
  245. return false;
  246. RowBlock * rows;
  247. if (returnedMeta->isFixedSize())
  248. rows = new FixedRowBlock(temp, startRow, startOffset, returnedMeta->fixedSize());
  249. else
  250. rows = new VariableRowBlock(temp, startRow, startOffset, returnedRecordSize, isLast);
  251. cache.addRowsOwn(rows);
  252. return true;
  253. }
  254. void DirectDiskDataSource::onClose()
  255. {
  256. DiskDataSource::onClose();
  257. if (openCount == 0)
  258. physical.close();
  259. }
  260. //---------------------------------------------------------------------------
  261. UtfReader::UtfFormat getFormat(const char * format)
  262. {
  263. if (memicmp(format, "utf", 3) == 0)
  264. {
  265. const char * tail = format + 3;
  266. if (*tail == '-')
  267. tail++;
  268. if (stricmp(tail, "8N")==0)
  269. return UtfReader::Utf8;
  270. else if (stricmp(tail, "16BE")==0)
  271. return UtfReader::Utf16be;
  272. else if (stricmp(tail, "16LE")==0)
  273. return UtfReader::Utf16le;
  274. else if (stricmp(tail, "32BE")==0)
  275. return UtfReader::Utf32be;
  276. else if (stricmp(tail, "32LE")==0)
  277. return UtfReader::Utf32le;
  278. else
  279. throwError1(FVERR_UnknownUTFFormat, format);
  280. }
  281. return UtfReader::Utf8;
  282. }
  283. enum { NONE, TERMINATOR };
  284. void CsvRecordSize::init(IDistributedFile * df)
  285. {
  286. IPropertyTree * props = &df->queryAttributes();
  287. UtfReader::UtfFormat utfType = getFormat(props->queryProp("@format"));
  288. switch (utfType)
  289. {
  290. case UtfReader::Utf16be:
  291. case UtfReader::Utf16le:
  292. unitSize = 2;
  293. break;
  294. case UtfReader::Utf32be:
  295. case UtfReader::Utf32le:
  296. unitSize = 4;
  297. break;
  298. default:
  299. unitSize = 1;
  300. }
  301. maxRecordSize = props->getPropInt("@maxRecordSize", DEFAULT_MAX_CSV_SIZE);
  302. const char * terminate = props->queryProp("@csvTerminate");
  303. addUtfActionList(matcher, terminate ? terminate : "\\n,\\r\\n", TERMINATOR, NULL, utfType);
  304. }
  305. size32_t CsvRecordSize::getRecordLength(size32_t maxLength, const void * start, bool includeTerminator)
  306. {
  307. //If we need more complicated processing...
  308. const byte * cur = (const byte *)start;
  309. const byte * end = (const byte *)start + maxLength;
  310. while (cur != end)
  311. {
  312. unsigned matchLen;
  313. unsigned match = matcher.getMatch(end-cur, (const char *)cur, matchLen);
  314. switch (match & 255)
  315. {
  316. case NONE:
  317. cur += unitSize; // matchLen == 0;
  318. break;
  319. case TERMINATOR:
  320. if (includeTerminator)
  321. return cur + matchLen - (const byte *)start;
  322. return cur - (const byte *)start;
  323. }
  324. cur += matchLen;
  325. }
  326. return end - (const byte *)start;
  327. }
  328. size32_t CsvRecordSize::getRecordSize(const void * start)
  329. {
  330. if (!start) return maxRecordSize;
  331. return getRecordLength(maxRecordSize, start, true);
  332. }
  333. size32_t CsvRecordSize::getRecordSize(unsigned maxLength, const void * start)
  334. {
  335. if (!start) return maxRecordSize;
  336. return getRecordLength(maxLength, start, true);
  337. }
  338. size32_t CsvRecordSize::getFixedSize() const
  339. {
  340. return 0; // is variable
  341. }
  342. DirectCsvDiskDataSource::DirectCsvDiskDataSource(IDistributedFile * _df, const char * _format)
  343. {
  344. df.set(_df);
  345. isUnicode = (memicmp(_format, "utf", 3) == 0);
  346. utfFormat = getFormat(_format);
  347. returnedMeta.setown(new DataSourceMetaData(isUnicode ? type_unicode : type_string));
  348. returnedRecordSize.set(&recordSizer);
  349. transformedMeta.set(returnedMeta);
  350. addFileposition();
  351. IPropertyTree & properties = df->queryAttributes();
  352. if (properties.hasProp("@recordCount"))
  353. totalRows = properties.getPropInt64("@recordCount");
  354. }
  355. bool DirectCsvDiskDataSource::init()
  356. {
  357. physical.init(df);
  358. recordSizer.init(df);
  359. readBlockSize = 4 * recordSizer.getRecordSize(NULL);
  360. if (readBlockSize < DISK_BLOCK_SIZE) readBlockSize = DISK_BLOCK_SIZE;
  361. return true;
  362. }
  363. void DirectCsvDiskDataSource::copyRow(MemoryBuffer & out, size32_t length, const void * data)
  364. {
  365. if (isUnicode)
  366. {
  367. unsigned offsetOfLength = out.length();
  368. out.append(length);
  369. convertUtf(out, UtfReader::Utf16le, length, data, utfFormat);
  370. unsigned savedLength = out.length();
  371. out.setWritePos(offsetOfLength);
  372. out.append((unsigned) (savedLength - offsetOfLength - sizeof(unsigned))/2);
  373. out.setWritePos(savedLength);
  374. }
  375. else
  376. {
  377. out.append(length);
  378. out.append(length, data);
  379. }
  380. }
  381. bool DirectCsvDiskDataSource::fetchRowData(MemoryBuffer & out, __int64 offset)
  382. {
  383. MemoryBuffer temp;
  384. physical.readData(temp, offset, recordSizer.getRecordSize(NULL));
  385. if (temp.length() == 0)
  386. return false;
  387. unsigned realLength = recordSizer.getRecordSize(temp.length(), temp.toByteArray());
  388. copyRow(out, realLength, temp.toByteArray());
  389. return true;
  390. }
  391. bool DirectCsvDiskDataSource::loadBlock(__int64 startRow, offset_t startOffset)
  392. {
  393. size32_t copyLength = readBlockSize;
  394. MemoryBuffer temp;
  395. bool isLast = physical.readData(temp, startOffset, copyLength);
  396. if (temp.length() == 0)
  397. return false;
  398. RowBlock * rows = new VariableRowBlock(temp, startRow, startOffset, &recordSizer, isLast);
  399. cache.addRowsOwn(rows);
  400. return true;
  401. }
  402. bool DirectCsvDiskDataSource::getRow(MemoryBuffer & out, __int64 row)
  403. {
  404. size32_t length;
  405. const void * data;
  406. unsigned __int64 offset = 0;
  407. if (getRowData(row, length, data, offset))
  408. {
  409. //strip the end of line terminator from the length...
  410. length = recordSizer.getRecordLength(length, data, false);
  411. copyRow(out, length, data);
  412. out.append(offset);
  413. return true;
  414. }
  415. return false;
  416. }
  417. //---------------------------------------------------------------------------
  418. WorkunitDiskDataSource::WorkunitDiskDataSource(const char * _logicalName, IConstWUResult * _wuResult, const char * _wuid, const char * _username, const char * _password) : DirectDiskDataSource(_logicalName, NULL, _username, _password)
  419. {
  420. wuid.set(_wuid);
  421. wuResult.set(_wuResult);
  422. }
  423. bool WorkunitDiskDataSource::init()
  424. {
  425. if (!setReturnedInfoFromResult())
  426. return false;
  427. diskRecord.set(returnedRecord);
  428. return DirectDiskDataSource::init();
  429. }
  430. //---------------------------------------------------------------------------
  431. TranslatedDiskDataSource::TranslatedDiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char * _cluster, const char * _username, const char * _password)
  432. {
  433. logicalName.set(_logicalName);
  434. diskRecord.set(_diskRecord);
  435. cluster.set(_cluster);
  436. username.set(_username);
  437. password.set(_password);
  438. openCount = 0;
  439. }
  440. TranslatedDiskDataSource::~TranslatedDiskDataSource()
  441. {
  442. if (helperWuid)
  443. {
  444. directSource.clear();
  445. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  446. factory->deleteWorkUnit(helperWuid);
  447. }
  448. }
  449. bool TranslatedDiskDataSource::createHelperWU()
  450. {
  451. OwnedHqlExpr browseWUcode = buildDiskOutputEcl(logicalName, diskRecord);
  452. if (!browseWUcode)
  453. return false;
  454. // MORE: Where should we get these parameters from ????
  455. StringAttr application("fileViewer");
  456. StringAttr customerid("viewer");
  457. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  458. Owned<IWorkUnit> workunit = factory->createWorkUnit(NULL, application, username);
  459. workunit->setUser(username);
  460. workunit->setClusterName(cluster);
  461. workunit->setCustomerId(customerid);
  462. workunit->setAction(WUActionCompile);
  463. StringBuffer jobName;
  464. jobName.append("FileView_for_").append(logicalName);
  465. workunit->setJobName(jobName.str());
  466. StringBuffer eclText;
  467. toECL(browseWUcode, eclText, true);
  468. Owned<IWUQuery> query = workunit->updateQuery();
  469. query->setQueryText(eclText.str());
  470. query->setQueryName(jobName.str());
  471. workunit->setCompareMode(CompareModeOff);
  472. StringAttrAdaptor xxx(helperWuid); workunit->getWuid(xxx);
  473. return true;
  474. }
  475. bool TranslatedDiskDataSource::init()
  476. {
  477. if (!createHelperWU() || !compileHelperWU())
  478. return false;
  479. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  480. Owned<IConstWorkUnit> wu = factory->openWorkUnit(helperWuid, false);
  481. Owned<IConstWUResult> dataResult = wu->getResultBySequence(0);
  482. directSource.setown(new WorkunitDiskDataSource(logicalName, dataResult, helperWuid, username.get(), password.get()));
  483. return directSource->init();
  484. }
  485. bool TranslatedDiskDataSource::compileHelperWU()
  486. {
  487. submitWorkUnit(helperWuid, username, password);
  488. return waitForWorkUnitToCompile(helperWuid);
  489. }
  490. //---------------------------------------------------------------------------
  491. IndirectDiskDataSource::IndirectDiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char * _cluster, const char * _username, const char * _password) : DiskDataSource(_logicalName, _diskRecord, _username, _password)
  492. {
  493. cluster.set(_cluster);
  494. username.set(_username);
  495. password.set(_password);
  496. extraFieldsSize = sizeof(offset_t) + sizeof(unsigned short);
  497. totalSize = 0;
  498. }
  499. IndirectDiskDataSource::~IndirectDiskDataSource()
  500. {
  501. if (browseWuid)
  502. {
  503. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  504. factory->deleteWorkUnit(browseWuid);
  505. }
  506. }
  507. bool IndirectDiskDataSource::createBrowseWU()
  508. {
  509. OwnedHqlExpr browseWUcode = buildDiskFileViewerEcl(logicalName, diskRecord);
  510. if (!browseWUcode)
  511. return false;
  512. returnedRecord.set(browseWUcode->queryChild(0)->queryRecord());
  513. // MORE: Where should we get these parameters from ????
  514. StringAttr application("fileViewer");
  515. StringAttr owner("fileViewer");
  516. StringAttr customerid("viewer");
  517. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  518. Owned<IWorkUnit> workunit = factory->createWorkUnit(NULL, application, owner);
  519. workunit->setUser(owner);
  520. workunit->setClusterName(cluster);
  521. workunit->setCustomerId(customerid);
  522. StringBuffer jobName;
  523. jobName.append("FileView_for_").append(logicalName);
  524. workunit->setJobName(jobName.str());
  525. StringBuffer eclText;
  526. toECL(browseWUcode, eclText, true);
  527. Owned<IWUQuery> query = workunit->updateQuery();
  528. query->setQueryText(eclText.str());
  529. query->setQueryName(jobName.str());
  530. workunit->setCompareMode(CompareModeOff);
  531. StringAttrAdaptor xxx(browseWuid); workunit->getWuid(xxx);
  532. return true;
  533. }
  534. bool IndirectDiskDataSource::init()
  535. {
  536. if (!df)
  537. return false;
  538. if (!createBrowseWU())
  539. return false;
  540. //Need to assign the transformed record to meta
  541. bool isGrouped = false; // more not sure this is strictly true...
  542. returnedMeta.setown(new DataSourceMetaData(returnedRecord, 2, true, isGrouped, 0));
  543. transformedMeta.set(returnedMeta);
  544. diskMeta.setown(new DataSourceMetaData(diskRecord, 0, true, isGrouped, 0));
  545. totalSize = df->getFileSize(true,false);
  546. if (diskMeta->isFixedSize())
  547. totalRows = totalSize / diskMeta->fixedSize();
  548. else
  549. totalRows = UNKNOWN_NUM_ROWS;
  550. return true;
  551. }
  552. bool IndirectDiskDataSource::loadBlock(__int64 startRow, offset_t startOffset)
  553. {
  554. MemoryBuffer temp;
  555. //enter scope....>
  556. {
  557. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  558. Owned<IWorkUnit> wu = factory->updateWorkUnit(browseWuid);
  559. Owned<IWUResult> lower = wu->updateVariableByName(LOWER_LIMIT_ID);
  560. lower->setResultInt(startOffset);
  561. lower->setResultStatus(ResultStatusSupplied);
  562. Owned<IWUResult> dataResult = wu->updateResultBySequence(0);
  563. dataResult->setResultRaw(0, NULL, ResultFormatRaw);
  564. dataResult->setResultStatus(ResultStatusUndefined);
  565. wu->clearExceptions();
  566. if (wu->getState() != WUStateUnknown)
  567. wu->setState(WUStateCompiled);
  568. //Owned<IWUResult> count = wu->updateVariableByName(RECORD_LIMIT_ID);
  569. //count->setResultInt64(fetchSize);
  570. }
  571. //Resubmit the query...
  572. submitWorkUnit(browseWuid, username, password);
  573. WUState finalState = waitForWorkUnitToComplete(browseWuid, -1, true);
  574. if(!((finalState == WUStateCompleted) || (finalState == WUStateWait)))
  575. return false;
  576. //Now extract the results...
  577. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  578. Owned<IConstWorkUnit> wu = factory->openWorkUnit(browseWuid, false);
  579. Owned<IConstWUResult> dataResult = wu->getResultBySequence(0);
  580. MemoryBuffer2IDataVal xxx(temp); dataResult->getResultRaw(xxx, NULL, NULL);
  581. if (temp.length() == 0)
  582. return false;
  583. RowBlock * rows;
  584. if (returnedMeta->isFixedSize())
  585. rows = new FilePosFixedRowBlock(temp, startRow, startOffset, returnedMeta->fixedSize());
  586. else
  587. rows = new FilePosVariableRowBlock(temp, startRow, startOffset, returnedMeta, true);
  588. cache.addRowsOwn(rows);
  589. return true;
  590. }