fvdisksource.cpp 21 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "eclrtl.hpp"
  15. #include "hqlexpr.hpp"
  16. #include "hqlthql.hpp"
  17. #include "fvresultset.ipp"
  18. #include "fileview.hpp"
  19. #include "fvdisksource.ipp"
  20. #include "fvwugen.hpp"
  21. #include "fverror.hpp"
  22. #include "dasess.hpp"
  23. #define DEFAULT_MAX_CSV_SIZE 0x1100
  24. PhysicalFileInfo::PhysicalFileInfo()
  25. {
  26. cachedPart = (unsigned)-1;
  27. totalSize = 0;
  28. }
  29. offset_t getPartSize(IDistributedFilePart & part, unsigned copy)
  30. {
  31. try
  32. {
  33. RemoteFilename rfn;
  34. Owned<IFile> in = createIFile(part.getFilename(rfn,copy));
  35. return in->size();
  36. }
  37. catch (IException * e)
  38. {
  39. e->Release();
  40. }
  41. return (offset_t) -1;
  42. }
  43. void PhysicalFileInfo::init(IDistributedFile * _df)
  44. {
  45. df.set(_df);
  46. totalSize = 0;
  47. Owned<IDistributedFilePartIterator> iter = df->getIterator();
  48. ForEach(*iter)
  49. {
  50. IDistributedFilePart & cur = iter->query();
  51. offset_t partSize = cur.getFileSize(true, false);
  52. if (partSize == -1)
  53. partSize = getPartSize(cur, 0);
  54. if (partSize == -1)
  55. partSize = getPartSize(cur, 1);
  56. if (partSize == -1)
  57. partSize = 0x100000; // force an error when the part is opened.
  58. partSizes.append(partSize);
  59. totalSize += partSize;
  60. }
  61. }
  62. offset_t PhysicalFileInfo::getOptimizedOffset(offset_t offset, unsigned copyLength)
  63. {
  64. offset_t newOffset = 0;
  65. ForEachItemIn(idx, partSizes)
  66. {
  67. offset_t curSize = partSizes.item(idx);
  68. if (offset < curSize)
  69. return newOffset + ((offset) / copyLength) * copyLength;
  70. newOffset += curSize;
  71. offset -= curSize;
  72. }
  73. return newOffset;
  74. }
  75. bool PhysicalFileInfo::readData(MemoryBuffer & out, __int64 startOffset, size32_t copyLength)
  76. {
  77. CriticalBlock procedure(cs);
  78. offset_t chunkOffset = startOffset;
  79. unsigned numParts = partSizes.ordinality();
  80. unsigned part;
  81. offset_t curPartLength;
  82. if (isLocalFpos(startOffset))
  83. {
  84. part = getLocalFposPart(startOffset);
  85. chunkOffset = getLocalFposOffset(startOffset);
  86. if (part >= numParts)
  87. return false;
  88. curPartLength = partSizes.item(part);
  89. }
  90. else
  91. {
  92. for (part = 0; part < numParts; part++)
  93. {
  94. curPartLength = partSizes.item(part);
  95. if (chunkOffset < curPartLength)
  96. break;
  97. chunkOffset -= curPartLength;
  98. }
  99. }
  100. if (part == numParts)
  101. return false;
  102. bool isLast = false;
  103. if (chunkOffset + copyLength >= curPartLength)
  104. {
  105. copyLength = (size32_t)(curPartLength - chunkOffset);
  106. isLast = true;
  107. }
  108. if (part != cachedPart)
  109. {
  110. cachedPart = (unsigned)-1;
  111. cachedFile.clear();
  112. cachedIO.clear();
  113. Owned<IDistributedFilePart> dfp = df->getPart(part);
  114. try
  115. {
  116. RemoteFilename rfn;
  117. cachedFile.setown(createIFile(dfp->getFilename(rfn)));
  118. cachedIO.setown(cachedFile->open(IFOread));
  119. }
  120. catch (IException * e)
  121. {
  122. e->Release();
  123. }
  124. if (!cachedIO)
  125. {
  126. RemoteFilename rfn;
  127. cachedFile.setown(createIFile(dfp->getFilename(rfn,1)));
  128. cachedIO.setown(cachedFile->open(IFOread));
  129. if (!cachedIO)
  130. {
  131. StringBuffer str;
  132. throwError1(FVERR_FailedOpenFile, dfp->getPartName(str).str());
  133. return false;
  134. }
  135. }
  136. if (df->isCompressed())
  137. {
  138. cachedIO.setown(createCompressedFileReader(cachedIO));
  139. if (!cachedIO)
  140. {
  141. StringBuffer str;
  142. throwError1(FVERR_FailedOpenCompressedFile, dfp->getPartName(str).str());
  143. return false;
  144. }
  145. }
  146. cachedPart = part;
  147. }
  148. char * data = (char *)out.clear().reserve(copyLength);
  149. unsigned numGot = cachedIO->read(chunkOffset, copyLength, data);
  150. out.setLength(numGot);
  151. return isLast;
  152. }
  153. void PhysicalFileInfo::close()
  154. {
  155. cachedPart = (unsigned)-1;
  156. cachedFile.clear();
  157. cachedIO.clear();
  158. }
  159. //---------------------------------------------------------------------------
  160. DiskDataSource::DiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char* _username, const char* _password)
  161. {
  162. logicalName.set(_logicalName);
  163. diskRecord.set(_diskRecord);
  164. Owned<IUserDescriptor> udesc;
  165. if(_username != NULL && *_username != '\0')
  166. {
  167. udesc.setown(createUserDescriptor());
  168. udesc->set(_username, _password);
  169. }
  170. df.setown(queryDistributedFileDirectory().lookup(logicalName, udesc.get()));
  171. }
  172. //---------------------------------------------------------------------------
  173. DirectDiskDataSource::DirectDiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char* _username, const char* _password) : DiskDataSource(_logicalName, _diskRecord, _username, _password)
  174. {
  175. }
  176. bool DirectDiskDataSource::init()
  177. {
  178. if (!df)
  179. return false;
  180. IPropertyTree & properties = df->queryAttributes();
  181. const char * kind = properties.queryProp("@kind");
  182. bool isGrouped =properties.getPropBool("@grouped");
  183. if (kind && (stricmp(kind, "key") == 0))
  184. throwError1(FVERR_CannotViewKey, logicalName.get());
  185. //Need to assign the transformed record to meta
  186. diskMeta.setown(new DataSourceMetaData(diskRecord, 0, true, isGrouped, 0));
  187. if (!returnedMeta)
  188. {
  189. returnedMeta.set(diskMeta);
  190. returnedRecordSize.set(returnedMeta);
  191. }
  192. if (!transformedMeta)
  193. transformedMeta.set(returnedMeta);
  194. addFileposition();
  195. physical.init(df);
  196. if (diskMeta->isFixedSize())
  197. {
  198. if (diskMeta->fixedSize() == 0)
  199. throwError1(FVERR_ZeroSizeRecord, logicalName.get());
  200. totalRows = physical.totalSize / diskMeta->fixedSize();
  201. }
  202. else if (properties.hasProp("@recordCount"))
  203. totalRows = properties.getPropInt64("@recordCount");
  204. else
  205. totalRows = UNKNOWN_NUM_ROWS;
  206. readBlockSize = 4 * diskMeta->getRecordSize(NULL);
  207. if (readBlockSize < DISK_BLOCK_SIZE) readBlockSize = DISK_BLOCK_SIZE;
  208. return true;
  209. }
  210. bool DirectDiskDataSource::fetchRowData(MemoryBuffer & out, __int64 offset)
  211. {
  212. physical.readData(out, offset, returnedMeta->getMaxRecordSize());
  213. if (out.length() == 0)
  214. return false;
  215. size32_t actualLength = returnedMeta->getRecordSize(out.toByteArray());
  216. if (actualLength > readBlockSize)
  217. throwError(FVERR_RowTooLarge);
  218. out.setLength(actualLength);
  219. return true;
  220. }
  221. size32_t DirectDiskDataSource::getCopyLength()
  222. {
  223. size32_t copyLength = readBlockSize;
  224. if (returnedMeta->isFixedSize())
  225. {
  226. unsigned fixedSize = returnedMeta->fixedSize();
  227. copyLength = (copyLength / fixedSize) * fixedSize;
  228. }
  229. return copyLength;
  230. }
  231. void DirectDiskDataSource::improveLocation(__int64 row, RowLocation & location)
  232. {
  233. if (!returnedMeta->isFixedSize())
  234. return;
  235. //Align the row so the chunks don't overlap....
  236. unsigned fixedSize = returnedMeta->fixedSize();
  237. size32_t copyLength = getCopyLength();
  238. location.bestOffset = physical.getOptimizedOffset(row * fixedSize, copyLength);
  239. location.bestRow = location.bestOffset / fixedSize;
  240. }
  241. bool DirectDiskDataSource::loadBlock(__int64 startRow, offset_t startOffset)
  242. {
  243. size32_t copyLength = getCopyLength();
  244. MemoryBuffer temp;
  245. bool isLast = physical.readData(temp, startOffset, copyLength);
  246. if (temp.length() == 0)
  247. return false;
  248. RowBlock * rows;
  249. if (returnedMeta->isFixedSize())
  250. rows = new FixedRowBlock(temp, startRow, startOffset, returnedMeta->fixedSize());
  251. else
  252. rows = new VariableRowBlock(temp, startRow, startOffset, returnedRecordSize, isLast);
  253. cache.addRowsOwn(rows);
  254. return true;
  255. }
  256. void DirectDiskDataSource::onClose()
  257. {
  258. DiskDataSource::onClose();
  259. if (openCount == 0)
  260. physical.close();
  261. }
  262. //---------------------------------------------------------------------------
  263. UtfReader::UtfFormat getFormat(const char * format)
  264. {
  265. if (memicmp(format, "utf", 3) == 0)
  266. {
  267. const char * tail = format + 3;
  268. if (*tail == '-')
  269. tail++;
  270. if (stricmp(tail, "8N")==0)
  271. return UtfReader::Utf8;
  272. else if (stricmp(tail, "16BE")==0)
  273. return UtfReader::Utf16be;
  274. else if (stricmp(tail, "16LE")==0)
  275. return UtfReader::Utf16le;
  276. else if (stricmp(tail, "32BE")==0)
  277. return UtfReader::Utf32be;
  278. else if (stricmp(tail, "32LE")==0)
  279. return UtfReader::Utf32le;
  280. else
  281. throwError1(FVERR_UnknownUTFFormat, format);
  282. }
  283. return UtfReader::Utf8;
  284. }
  // Action codes registered with, and returned by, the record terminator matcher.
  enum
  {
      NONE = 0,
      TERMINATOR = 1
  };
  286. void CsvRecordSize::init(IDistributedFile * df)
  287. {
  288. IPropertyTree * props = &df->queryAttributes();
  289. UtfReader::UtfFormat utfType = getFormat(props->queryProp("@format"));
  290. switch (utfType)
  291. {
  292. case UtfReader::Utf16be:
  293. case UtfReader::Utf16le:
  294. unitSize = 2;
  295. break;
  296. case UtfReader::Utf32be:
  297. case UtfReader::Utf32le:
  298. unitSize = 4;
  299. break;
  300. default:
  301. unitSize = 1;
  302. }
  303. maxRecordSize = props->getPropInt("@maxRecordSize", DEFAULT_MAX_CSV_SIZE);
  304. const char * terminate = props->queryProp("@csvTerminate");
  305. addUtfActionList(matcher, terminate ? terminate : "\\n,\\r\\n", TERMINATOR, NULL, utfType);
  306. }
  307. size32_t CsvRecordSize::getRecordLength(size32_t maxLength, const void * start, bool includeTerminator)
  308. {
  309. //If we need more complicated processing...
  310. const byte * cur = (const byte *)start;
  311. const byte * end = (const byte *)start + maxLength;
  312. while (cur != end)
  313. {
  314. unsigned matchLen;
  315. unsigned match = matcher.getMatch(end-cur, (const char *)cur, matchLen);
  316. switch (match & 255)
  317. {
  318. case NONE:
  319. cur += unitSize; // matchLen == 0;
  320. break;
  321. case TERMINATOR:
  322. if (includeTerminator)
  323. return cur + matchLen - (const byte *)start;
  324. return cur - (const byte *)start;
  325. }
  326. cur += matchLen;
  327. }
  328. return end - (const byte *)start;
  329. }
  330. size32_t CsvRecordSize::getRecordSize(const void * start)
  331. {
  332. if (!start) return maxRecordSize;
  333. return getRecordLength(maxRecordSize, start, true);
  334. }
  335. size32_t CsvRecordSize::getRecordSize(unsigned maxLength, const void * start)
  336. {
  337. if (!start) return maxRecordSize;
  338. return getRecordLength(maxLength, start, true);
  339. }
  340. size32_t CsvRecordSize::getFixedSize() const
  341. {
  342. return 0; // is variable
  343. }
  344. DirectCsvDiskDataSource::DirectCsvDiskDataSource(IDistributedFile * _df, const char * _format)
  345. {
  346. df.set(_df);
  347. isUnicode = (memicmp(_format, "utf", 3) == 0);
  348. utfFormat = getFormat(_format);
  349. returnedMeta.setown(new DataSourceMetaData(isUnicode ? type_unicode : type_string));
  350. returnedRecordSize.set(&recordSizer);
  351. transformedMeta.set(returnedMeta);
  352. addFileposition();
  353. IPropertyTree & properties = df->queryAttributes();
  354. if (properties.hasProp("@recordCount"))
  355. totalRows = properties.getPropInt64("@recordCount");
  356. }
  357. bool DirectCsvDiskDataSource::init()
  358. {
  359. physical.init(df);
  360. recordSizer.init(df);
  361. readBlockSize = 4 * recordSizer.getRecordSize(NULL);
  362. if (readBlockSize < DISK_BLOCK_SIZE) readBlockSize = DISK_BLOCK_SIZE;
  363. return true;
  364. }
  365. void DirectCsvDiskDataSource::copyRow(MemoryBuffer & out, size32_t length, const void * data)
  366. {
  367. if (isUnicode)
  368. {
  369. unsigned offsetOfLength = out.length();
  370. out.append(length);
  371. convertUtf(out, UtfReader::Utf16le, length, data, utfFormat);
  372. unsigned savedLength = out.length();
  373. out.setWritePos(offsetOfLength);
  374. out.append((unsigned) (savedLength - offsetOfLength - sizeof(unsigned))/2);
  375. out.setWritePos(savedLength);
  376. }
  377. else
  378. {
  379. out.append(length);
  380. out.append(length, data);
  381. }
  382. }
  383. bool DirectCsvDiskDataSource::fetchRowData(MemoryBuffer & out, __int64 offset)
  384. {
  385. MemoryBuffer temp;
  386. physical.readData(temp, offset, recordSizer.getRecordSize(NULL));
  387. if (temp.length() == 0)
  388. return false;
  389. unsigned realLength = recordSizer.getRecordSize(temp.length(), temp.toByteArray());
  390. copyRow(out, realLength, temp.toByteArray());
  391. return true;
  392. }
  393. bool DirectCsvDiskDataSource::loadBlock(__int64 startRow, offset_t startOffset)
  394. {
  395. size32_t copyLength = readBlockSize;
  396. MemoryBuffer temp;
  397. bool isLast = physical.readData(temp, startOffset, copyLength);
  398. if (temp.length() == 0)
  399. return false;
  400. RowBlock * rows = new VariableRowBlock(temp, startRow, startOffset, &recordSizer, isLast);
  401. cache.addRowsOwn(rows);
  402. return true;
  403. }
  404. bool DirectCsvDiskDataSource::getRow(MemoryBuffer & out, __int64 row)
  405. {
  406. size32_t length;
  407. const void * data;
  408. unsigned __int64 offset = 0;
  409. if (getRowData(row, length, data, offset))
  410. {
  411. //strip the end of line terminator from the length...
  412. length = recordSizer.getRecordLength(length, data, false);
  413. copyRow(out, length, data);
  414. out.append(offset);
  415. return true;
  416. }
  417. return false;
  418. }
  419. //---------------------------------------------------------------------------
  420. WorkunitDiskDataSource::WorkunitDiskDataSource(const char * _logicalName, IConstWUResult * _wuResult, const char * _wuid, const char * _username, const char * _password) : DirectDiskDataSource(_logicalName, NULL, _username, _password)
  421. {
  422. wuid.set(_wuid);
  423. wuResult.set(_wuResult);
  424. }
  425. bool WorkunitDiskDataSource::init()
  426. {
  427. if (!setReturnedInfoFromResult())
  428. return false;
  429. diskRecord.set(returnedRecord);
  430. return DirectDiskDataSource::init();
  431. }
  432. //---------------------------------------------------------------------------
  433. TranslatedDiskDataSource::TranslatedDiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char * _cluster, const char * _username, const char * _password)
  434. {
  435. logicalName.set(_logicalName);
  436. diskRecord.set(_diskRecord);
  437. cluster.set(_cluster);
  438. username.set(_username);
  439. password.set(_password);
  440. openCount = 0;
  441. }
  442. TranslatedDiskDataSource::~TranslatedDiskDataSource()
  443. {
  444. if (helperWuid)
  445. {
  446. directSource.clear();
  447. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  448. factory->deleteWorkUnit(helperWuid);
  449. }
  450. }
  451. bool TranslatedDiskDataSource::createHelperWU()
  452. {
  453. OwnedHqlExpr browseWUcode = buildDiskOutputEcl(logicalName, diskRecord);
  454. if (!browseWUcode)
  455. return false;
  456. // MORE: Where should we get these parameters from ????
  457. StringAttr application("fileViewer");
  458. StringAttr customerid("viewer");
  459. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  460. Owned<IWorkUnit> workunit = factory->createWorkUnit(NULL, application, username);
  461. workunit->setUser(username);
  462. workunit->setClusterName(cluster);
  463. workunit->setCustomerId(customerid);
  464. workunit->setAction(WUActionCompile);
  465. StringBuffer jobName;
  466. jobName.append("FileView_for_").append(logicalName);
  467. workunit->setJobName(jobName.str());
  468. StringBuffer eclText;
  469. toECL(browseWUcode, eclText, true);
  470. Owned<IWUQuery> query = workunit->updateQuery();
  471. query->setQueryText(eclText.str());
  472. query->setQueryName(jobName.str());
  473. workunit->setCompareMode(CompareModeOff);
  474. StringAttrAdaptor xxx(helperWuid); workunit->getWuid(xxx);
  475. return true;
  476. }
  477. bool TranslatedDiskDataSource::init()
  478. {
  479. if (!createHelperWU() || !compileHelperWU())
  480. return false;
  481. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  482. Owned<IConstWorkUnit> wu = factory->openWorkUnit(helperWuid, false);
  483. Owned<IConstWUResult> dataResult = wu->getResultBySequence(0);
  484. directSource.setown(new WorkunitDiskDataSource(logicalName, dataResult, helperWuid, username.get(), password.get()));
  485. return directSource->init();
  486. }
  487. bool TranslatedDiskDataSource::compileHelperWU()
  488. {
  489. submitWorkUnit(helperWuid, username, password);
  490. return waitForWorkUnitToCompile(helperWuid);
  491. }
  492. //---------------------------------------------------------------------------
  493. IndirectDiskDataSource::IndirectDiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char * _cluster, const char * _username, const char * _password) : DiskDataSource(_logicalName, _diskRecord, _username, _password)
  494. {
  495. cluster.set(_cluster);
  496. username.set(_username);
  497. password.set(_password);
  498. extraFieldsSize = sizeof(offset_t) + sizeof(unsigned short);
  499. totalSize = 0;
  500. }
  501. IndirectDiskDataSource::~IndirectDiskDataSource()
  502. {
  503. if (browseWuid)
  504. {
  505. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  506. factory->deleteWorkUnit(browseWuid);
  507. }
  508. }
  509. bool IndirectDiskDataSource::createBrowseWU()
  510. {
  511. OwnedHqlExpr browseWUcode = buildDiskFileViewerEcl(logicalName, diskRecord);
  512. if (!browseWUcode)
  513. return false;
  514. returnedRecord.set(browseWUcode->queryChild(0)->queryRecord());
  515. // MORE: Where should we get these parameters from ????
  516. StringAttr application("fileViewer");
  517. StringAttr owner("fileViewer");
  518. StringAttr customerid("viewer");
  519. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  520. Owned<IWorkUnit> workunit = factory->createWorkUnit(NULL, application, owner);
  521. workunit->setUser(owner);
  522. workunit->setClusterName(cluster);
  523. workunit->setCustomerId(customerid);
  524. StringBuffer jobName;
  525. jobName.append("FileView_for_").append(logicalName);
  526. workunit->setJobName(jobName.str());
  527. StringBuffer eclText;
  528. toECL(browseWUcode, eclText, true);
  529. Owned<IWUQuery> query = workunit->updateQuery();
  530. query->setQueryText(eclText.str());
  531. query->setQueryName(jobName.str());
  532. workunit->setCompareMode(CompareModeOff);
  533. StringAttrAdaptor xxx(browseWuid); workunit->getWuid(xxx);
  534. return true;
  535. }
  536. bool IndirectDiskDataSource::init()
  537. {
  538. if (!df)
  539. return false;
  540. if (!createBrowseWU())
  541. return false;
  542. //Need to assign the transformed record to meta
  543. bool isGrouped = false; // more not sure this is strictly true...
  544. returnedMeta.setown(new DataSourceMetaData(returnedRecord, 2, true, isGrouped, 0));
  545. transformedMeta.set(returnedMeta);
  546. diskMeta.setown(new DataSourceMetaData(diskRecord, 0, true, isGrouped, 0));
  547. totalSize = df->getFileSize(true,false);
  548. if (diskMeta->isFixedSize())
  549. totalRows = totalSize / diskMeta->fixedSize();
  550. else
  551. totalRows = UNKNOWN_NUM_ROWS;
  552. return true;
  553. }
  554. bool IndirectDiskDataSource::loadBlock(__int64 startRow, offset_t startOffset)
  555. {
  556. MemoryBuffer temp;
  557. //enter scope....>
  558. {
  559. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  560. Owned<IWorkUnit> wu = factory->updateWorkUnit(browseWuid);
  561. Owned<IWUResult> lower = wu->updateVariableByName(LOWER_LIMIT_ID);
  562. lower->setResultInt(startOffset);
  563. lower->setResultStatus(ResultStatusSupplied);
  564. Owned<IWUResult> dataResult = wu->updateResultBySequence(0);
  565. dataResult->setResultRaw(0, NULL, ResultFormatRaw);
  566. dataResult->setResultStatus(ResultStatusUndefined);
  567. wu->clearExceptions();
  568. if (wu->getState() != WUStateUnknown)
  569. wu->setState(WUStateCompiled);
  570. //Owned<IWUResult> count = wu->updateVariableByName(RECORD_LIMIT_ID);
  571. //count->setResultInt64(fetchSize);
  572. }
  573. //Resubmit the query...
  574. submitWorkUnit(browseWuid, username, password);
  575. WUState finalState = waitForWorkUnitToComplete(browseWuid, -1, true);
  576. if(!((finalState == WUStateCompleted) || (finalState == WUStateWait)))
  577. return false;
  578. //Now extract the results...
  579. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  580. Owned<IConstWorkUnit> wu = factory->openWorkUnit(browseWuid, false);
  581. Owned<IConstWUResult> dataResult = wu->getResultBySequence(0);
  582. MemoryBuffer2IDataVal xxx(temp); dataResult->getResultRaw(xxx, NULL, NULL);
  583. if (temp.length() == 0)
  584. return false;
  585. RowBlock * rows;
  586. if (returnedMeta->isFixedSize())
  587. rows = new FilePosFixedRowBlock(temp, startRow, startOffset, returnedMeta->fixedSize());
  588. else
  589. rows = new FilePosVariableRowBlock(temp, startRow, startOffset, returnedMeta, true);
  590. cache.addRowsOwn(rows);
  591. return true;
  592. }