fvquerysource.cpp 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "eclrtl.hpp"
  15. #include "hqlexpr.hpp"
  16. #include "hqlthql.hpp"
  17. #include "fvresultset.ipp"
  18. #include "fileview.hpp"
  19. #include "fvquerysource.ipp"
  20. #include "fvwugen.hpp"
  21. QueryDataSource::QueryDataSource(IConstWUResult * _wuResult, const char * _wuid, const char * _username, const char * _password)
  22. {
  23. wuResult.set(_wuResult);
  24. wuid.set(_wuid);
  25. username.set(_username);
  26. password.set(_password);
  27. }
  28. QueryDataSource::~QueryDataSource()
  29. {
  30. if (browseWuid)
  31. {
  32. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  33. factory->deleteWorkUnit(browseWuid);
  34. }
  35. }
  36. bool QueryDataSource::createBrowseWU()
  37. {
  38. StringAttr dataset, datasetDefs;
  39. StringAttrAdaptor a1(dataset), a2(datasetDefs);
  40. wuResult->getResultDataset(a1, a2);
  41. if (!dataset || !datasetDefs)
  42. return false;
  43. StringBuffer fullText;
  44. fullText.append(datasetDefs).append(dataset);
  45. OwnedHqlExpr parsed = parseQuery(fullText.str());
  46. if (!parsed)
  47. return false;
  48. HqlExprAttr selectFields = parsed.getLink();
  49. if (selectFields->getOperator() == no_output)
  50. selectFields.set(selectFields->queryChild(0));
  51. OwnedHqlExpr browseWUcode = buildQueryViewerEcl(selectFields);
  52. if (!browseWUcode)
  53. return false;
  54. returnedRecord.set(browseWUcode->queryChild(0)->queryRecord());
  55. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  56. Owned<IConstWorkUnit> parent = factory->openWorkUnit(wuid);
  57. const char *user = parent->queryUser();
  58. Owned<IWorkUnit> workunit = factory->createWorkUnit("fileViewer", user);
  59. workunit->setUser(user);
  60. workunit->setClusterName(parent->queryClusterName());
  61. browseWuid.set(workunit->queryWuid());
  62. workunit->setDebugValueInt("importImplicitModules", false, true);
  63. workunit->setDebugValueInt("importAllModules", false, true);
  64. workunit->setDebugValueInt("forceFakeThor", 1, true);
  65. StringBuffer jobName;
  66. jobName.append("FileView for ").append(wuid).append(":").append("x");
  67. workunit->setJobName(jobName.str());
  68. StringBuffer eclText;
  69. toECL(browseWUcode, eclText, true);
  70. Owned<IWUQuery> query = workunit->updateQuery();
  71. query->setQueryText(eclText.str());
  72. query->setQueryName(jobName.str());
  73. return true;
  74. }
  75. bool QueryDataSource::init()
  76. {
  77. return createBrowseWU();
  78. }
  79. void QueryDataSource::improveLocation(__int64 row, RowLocation & location)
  80. {
  81. #if 0
  82. if (!diskMeta->isFixedSize())
  83. return;
  84. if (location.bestRow <= row && location.bestRow + DISKREAD_PAGE_SIZE > row)
  85. return;
  86. assertex(row >= 0);
  87. //Align the row so the chunks don't overlap....
  88. location.bestRow = (row / DISKREAD_PAGE_SIZE) * DISKREAD_PAGE_SIZE;
  89. location.bestOffset = location.bestRow * diskMeta->fixedSize();
  90. #endif
  91. }
  92. bool QueryDataSource::loadBlock(__int64 startRow, offset_t startOffset)
  93. {
  94. MemoryBuffer temp;
  95. //enter scope....>
  96. {
  97. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  98. Owned<IWorkUnit> wu = factory->updateWorkUnit(browseWuid);
  99. Owned<IWUResult> lower = wu->updateVariableByName(LOWER_LIMIT_ID);
  100. lower->setResultInt(startOffset);
  101. lower->setResultStatus(ResultStatusSupplied);
  102. Owned<IWUResult> dataResult = wu->updateResultBySequence(0);
  103. dataResult->setResultRaw(0, NULL, ResultFormatRaw);
  104. dataResult->setResultStatus(ResultStatusUndefined);
  105. wu->clearExceptions();
  106. if (wu->getState() != WUStateUnknown)
  107. wu->setState(WUStateCompiled);
  108. //Owned<IWUResult> count = wu->updateVariableByName(RECORD_LIMIT_ID);
  109. //count->setResultInt64(fetchSize);
  110. }
  111. //Resubmit the query...
  112. submitWorkUnit(browseWuid, username, password);
  113. WUState finalState = waitForWorkUnitToComplete(browseWuid, -1, true);
  114. if(!((finalState == WUStateCompleted) || (finalState == WUStateWait)))
  115. return false;
  116. //Now extract the results...
  117. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  118. Owned<IConstWorkUnit> wu = factory->openWorkUnit(browseWuid);
  119. Owned<IConstWUResult> dataResult = wu->getResultBySequence(0);
  120. MemoryBuffer2IDataVal xxx(temp); dataResult->getResultRaw(xxx, NULL, NULL);
  121. if (temp.length() == 0)
  122. return false;
  123. RowBlock * rows;
  124. if (returnedMeta->isFixedSize())
  125. rows = new FilePosFixedRowBlock(temp, startRow, startOffset, returnedMeta->fixedSize());
  126. else
  127. rows = new FilePosVariableRowBlock(temp, startRow, startOffset, returnedMeta, true);
  128. cache.addRowsOwn(rows);
  129. return true;
  130. }