// thdiskbase.cpp
/*##############################################################################

    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.
############################################################################## */
  13. #include "rmtfile.hpp"
  14. #include "dadfs.hpp"
  15. #define NO_BWD_COMPAT_MAXSIZE
  16. #include "thorcommon.ipp"
  17. #include "thmem.hpp"
  18. #include "thmfilemanager.hpp"
  19. #include "eclhelper.hpp"
  20. #include "thexception.hpp"
  21. #include "eclhelper.hpp" // tmp for IHThorArg interface
  22. #include "thdiskbase.ipp"
  23. CDiskReadMasterBase::CDiskReadMasterBase(CMasterGraphElement *info) : CMasterActivity(info), diskStats(info->queryJob(), diskReadRemoteStatistics)
  24. {
  25. hash = NULL;
  26. inputProgress.setown(new ProgressInfo(queryJob()));
  27. }
  28. void CDiskReadMasterBase::init()
  29. {
  30. CMasterActivity::init();
  31. IHThorDiskReadBaseArg *helper = (IHThorDiskReadBaseArg *) queryHelper();
  32. bool mangle = 0 != (helper->getFlags() & (TDXtemporary|TDXjobtemp));
  33. OwnedRoxieString helperFileName = helper->getFileName();
  34. StringBuffer expandedFileName;
  35. queryThorFileManager().addScope(container.queryJob(), helperFileName, expandedFileName, mangle);
  36. fileName.set(expandedFileName);
  37. Owned<IDistributedFile> file = queryThorFileManager().lookup(container.queryJob(), helperFileName, 0 != ((TDXtemporary|TDXjobtemp) & helper->getFlags()), 0 != (TDRoptional & helper->getFlags()), true);
  38. if (file)
  39. {
  40. if (isFileKey(file))
  41. throw MakeActivityException(this, 0, "Attempting to read index as a flat file: %s", helperFileName.get());
  42. if (file->isExternal() && (helper->getFlags() & TDXcompress))
  43. file->queryAttributes().setPropBool("@blockCompressed", true);
  44. if (file->numParts() > 1)
  45. fileDesc.setown(getConfiguredFileDescriptor(*file));
  46. else
  47. fileDesc.setown(file->getFileDescriptor());
  48. reInit = 0 != (helper->getFlags() & (TDXvarfilename|TDXdynamicfilename));
  49. if (container.queryLocal() || helper->canMatchAny()) // if local, assume may match
  50. {
  51. bool temp = 0 != (TDXtemporary & helper->getFlags());
  52. bool local;
  53. if (temp)
  54. local = false;
  55. else
  56. local = container.queryLocal();
  57. mapping.setown(getFileSlaveMaps(file->queryLogicalName(), *fileDesc, container.queryJob().queryUserDescriptor(), container.queryJob().querySlaveGroup(), local, false, hash, file->querySuperFile()));
  58. addReadFile(file, temp);
  59. }
  60. if (0 != (helper->getFlags() & TDRfilenamecallback)) // only get/serialize if using virtual file name fields
  61. {
  62. IDistributedSuperFile *super = file->querySuperFile();
  63. if (super)
  64. {
  65. unsigned numsubs = super->numSubFiles(true);
  66. unsigned s=0;
  67. for (; s<numsubs; s++)
  68. {
  69. IDistributedFile &subfile = super->querySubFile(s, true);
  70. subfileLogicalFilenames.append(subfile.queryLogicalName());
  71. }
  72. }
  73. }
  74. validateFile(file);
  75. void *ekey;
  76. size32_t ekeylen;
  77. helper->getEncryptKey(ekeylen,ekey);
  78. bool encrypted = fileDesc->queryProperties().getPropBool("@encrypted");
  79. if (0 != ekeylen)
  80. {
  81. memset(ekey,0,ekeylen);
  82. free(ekey);
  83. if (!encrypted)
  84. {
  85. Owned<IException> e = MakeActivityWarning(&container, TE_EncryptionMismatch, "Ignoring encryption key provided as file '%s' was not published as encrypted", fileName.get());
  86. queryJobChannel().fireException(e);
  87. }
  88. }
  89. else if (encrypted)
  90. throw MakeActivityException(this, 0, "File '%s' was published as encrypted but no encryption key provided", fileName.get());
  91. }
  92. }
  93. void CDiskReadMasterBase::serializeSlaveData(MemoryBuffer &dst, unsigned slave)
  94. {
  95. IHThorDiskReadBaseArg *helper = (IHThorDiskReadBaseArg *) queryHelper();
  96. dst.append(fileName);
  97. dst.append(subfileLogicalFilenames.ordinality());
  98. if (subfileLogicalFilenames.ordinality())
  99. {
  100. ForEachItemIn(s, subfileLogicalFilenames)
  101. dst.append(subfileLogicalFilenames.item(s));
  102. }
  103. if (mapping)
  104. mapping->serializeMap(slave, dst);
  105. else
  106. CSlavePartMapping::serializeNullMap(dst);
  107. }
  108. void CDiskReadMasterBase::deserializeStats(unsigned node, MemoryBuffer &mb)
  109. {
  110. CMasterActivity::deserializeStats(node, mb);
  111. rowcount_t progress;
  112. mb.read(progress);
  113. inputProgress->set(node, progress);
  114. diskStats.deserializeMerge(node, mb);
  115. }
  116. void CDiskReadMasterBase::getActivityStats(IStatisticGatherer & stats)
  117. {
  118. CMasterActivity::getActivityStats(stats);
  119. diskStats.getStats(stats);
  120. }
  121. void CDiskReadMasterBase::getEdgeStats(IStatisticGatherer & stats, unsigned idx)
  122. {
  123. //This should be an activity stats
  124. CMasterActivity::getEdgeStats(stats, idx);
  125. inputProgress->processInfo();
  126. stats.addStatistic(StNumDiskRowsRead, inputProgress->queryTotal());
  127. }
  128. /////////////////
  129. void CWriteMasterBase::init()
  130. {
  131. published = false;
  132. recordsProcessed = 0;
  133. bool mangle = 0 != (diskHelperBase->getFlags() & (TDXtemporary|TDXjobtemp));
  134. OwnedRoxieString helperFileName = diskHelperBase->getFileName();
  135. StringBuffer expandedFileName;
  136. queryThorFileManager().addScope(container.queryJob(), helperFileName, expandedFileName, mangle);
  137. fileName.set(expandedFileName);
  138. dlfn.set(fileName);
  139. if (diskHelperBase->getFlags() & TDWextend)
  140. {
  141. assertex(0 == (diskHelperBase->getFlags() & (TDXtemporary|TDXjobtemp)));
  142. Owned<IDistributedFile> file = queryThorFileManager().lookup(container.queryJob(), helperFileName, false, true);
  143. if (file.get())
  144. {
  145. fileDesc.setown(file->getFileDescriptor());
  146. queryThorFileManager().noteFileRead(container.queryJob(), file, true);
  147. }
  148. }
  149. if (dlfn.isExternal())
  150. mpTag = container.queryJob().allocateMPTag(); // used
  151. if (NULL == fileDesc.get())
  152. {
  153. bool overwriteok = 0!=(TDWoverwrite & diskHelperBase->getFlags());
  154. unsigned idx=0;
  155. while (true)
  156. {
  157. OwnedRoxieString cluster(diskHelperBase->getCluster(idx));
  158. if(!cluster)
  159. break;
  160. clusters.append(cluster);
  161. idx++;
  162. }
  163. IArrayOf<IGroup> groups;
  164. fillClusterArray(container.queryJob(), fileName, clusters, groups);
  165. fileDesc.setown(queryThorFileManager().create(container.queryJob(), fileName, clusters, groups, overwriteok, diskHelperBase->getFlags()));
  166. if (1 == groups.ordinality())
  167. targetOffset = getGroupOffset(groups.item(0), container.queryJob().querySlaveGroup());
  168. IPropertyTree &props = fileDesc->queryProperties();
  169. if (diskHelperBase->getFlags() & (TDWowned|TDXjobtemp|TDXtemporary))
  170. props.setPropBool("@owned", true);
  171. if (diskHelperBase->getFlags() & TDWresult)
  172. props.setPropBool("@result", true);
  173. const char *rececl= diskHelperBase->queryRecordECL();
  174. if (rececl&&*rececl)
  175. props.setProp("ECL", rececl);
  176. setRtlFormat(props, diskHelperBase->queryDiskRecordSize());
  177. bool blockCompressed=false;
  178. void *ekey;
  179. size32_t ekeylen;
  180. diskHelperBase->getEncryptKey(ekeylen,ekey);
  181. if (ekeylen)
  182. {
  183. memset(ekey,0,ekeylen);
  184. free(ekey);
  185. props.setPropBool("@encrypted", true);
  186. blockCompressed = true;
  187. }
  188. else if (0 != (diskHelperBase->getFlags() & TDWnewcompress) || 0 != (diskHelperBase->getFlags() & TDXcompress))
  189. blockCompressed = true;
  190. if (blockCompressed)
  191. props.setPropBool("@blockCompressed", true);
  192. props.setProp("@kind", "flat");
  193. if (((TAKdiskwrite == container.getKind()) || (TAKspillwrite == container.getKind())) &&
  194. (0 != (diskHelperBase->getFlags() & TDXtemporary)) && container.queryOwner().queryOwner() && (!container.queryOwner().isGlobal())) // I am in a child query
  195. { // do early, because this will be local act. and will not come back to master until end of owning graph.
  196. publish();
  197. }
  198. }
  199. }
  200. void CWriteMasterBase::publish()
  201. {
  202. if (published) return;
  203. published = true;
  204. if (!(diskHelperBase->getFlags() & (TDXtemporary|TDXjobtemp)))
  205. updateActivityResult(container.queryJob().queryWorkUnit(), diskHelperBase->getFlags(), diskHelperBase->getSequence(), fileName, recordsProcessed);
  206. IPropertyTree &props = fileDesc->queryProperties();
  207. props.setPropInt64("@recordCount", recordsProcessed);
  208. if (0 == (diskHelperBase->getFlags() & TDXtemporary) || container.queryJob().queryUseCheckpoints())
  209. {
  210. if (0 != (diskHelperBase->getFlags() & TDWexpires))
  211. setExpiryTime(props, diskHelperBase->getExpiryDays());
  212. if (TDWupdate & diskHelperBase->getFlags())
  213. {
  214. unsigned eclCRC;
  215. unsigned __int64 totalCRC;
  216. diskHelperBase->getUpdateCRCs(eclCRC, totalCRC);
  217. props.setPropInt("@eclCRC", eclCRC);
  218. props.setPropInt64("@totalCRC", totalCRC);
  219. }
  220. }
  221. container.queryTempHandler()->registerFile(fileName, container.queryOwner().queryGraphId(), diskHelperBase->getTempUsageCount(), TDXtemporary & diskHelperBase->getFlags(), getDiskOutputKind(diskHelperBase->getFlags()), &clusters);
  222. if (!dlfn.isExternal())
  223. {
  224. bool temporary = 0 != (diskHelperBase->getFlags()&TDXtemporary);
  225. if (!temporary && (queryJob().querySlaves() < fileDesc->numParts()))
  226. {
  227. // create empty parts for a fileDesc being published that is larger than this clusters
  228. size32_t recordSize = 0;
  229. IOutputMetaData *diskRowMeta = diskHelperBase->queryDiskRecordSize()->querySerializedDiskMeta();
  230. if (diskRowMeta->isFixedSize() && ((TAKdiskwrite == container.getKind()) || (TAKspillwrite == container.getKind())))
  231. {
  232. recordSize = diskRowMeta->getMinRecordSize();
  233. if (0 != (diskHelperBase->getFlags() & TDXgrouped))
  234. recordSize += 1;
  235. }
  236. unsigned compMethod = COMPRESS_METHOD_LZW;
  237. // rowdiff used if recordSize > 0, else fallback to compMethod
  238. if (getOptBool(THOROPT_COMP_FORCELZW, false))
  239. {
  240. recordSize = 0; // by default if fixed length (recordSize set), row diff compression is used. This forces compMethod.
  241. compMethod = COMPRESS_METHOD_LZW;
  242. }
  243. else if (getOptBool(THOROPT_COMP_FORCEFLZ, false))
  244. compMethod = COMPRESS_METHOD_FASTLZ;
  245. else if (getOptBool(THOROPT_COMP_FORCELZ4, false))
  246. compMethod = COMPRESS_METHOD_LZ4;
  247. bool blockCompressed;
  248. bool compressed = fileDesc->isCompressed(&blockCompressed);
  249. for (unsigned clusterIdx=0; clusterIdx<fileDesc->numClusters(); clusterIdx++)
  250. {
  251. StringBuffer clusterName;
  252. fileDesc->getClusterGroupName(clusterIdx, clusterName, &queryNamedGroupStore());
  253. PROGLOG("Creating blank parts for file '%s', cluster '%s'", fileName.get(), clusterName.str());
  254. unsigned p=0;
  255. while (p<fileDesc->numParts())
  256. {
  257. if (p == targetOffset)
  258. p += queryJob().querySlaves();
  259. IPartDescriptor *partDesc = fileDesc->queryPart(p);
  260. CDateTime createTime, modifiedTime;
  261. for (unsigned c=0; c<partDesc->numCopies(); c++)
  262. {
  263. RemoteFilename rfn;
  264. partDesc->getFilename(c, rfn);
  265. StringBuffer path;
  266. rfn.getPath(path);
  267. try
  268. {
  269. ensureDirectoryForFile(path.str());
  270. OwnedIFile iFile = createIFile(path.str());
  271. Owned<IFileIO> iFileIO;
  272. if (compressed)
  273. iFileIO.setown(createCompressedFileWriter(iFile, recordSize, false, true, NULL, compMethod));
  274. else
  275. iFileIO.setown(iFile->open(IFOcreate));
  276. dbgassertex(iFileIO.get());
  277. iFileIO.clear();
  278. // ensure copies have matching datestamps, as they would do normally (backupnode expects it)
  279. if (partDesc->numCopies() > 1)
  280. {
  281. if (0 == c)
  282. iFile->getTime(&createTime, &modifiedTime, NULL);
  283. else
  284. iFile->setTime(&createTime, &modifiedTime, NULL);
  285. }
  286. }
  287. catch (IException *e)
  288. {
  289. if (0 == c)
  290. throw;
  291. Owned<IThorException> e2 = MakeThorException(e);
  292. e->Release();
  293. e2->setAction(tea_warning);
  294. queryJob().fireException(e2);
  295. }
  296. }
  297. partDesc->queryProperties().setPropInt64("@size", 0);
  298. p++;
  299. }
  300. clusterIdx++;
  301. }
  302. }
  303. queryThorFileManager().publish(container.queryJob(), fileName, *fileDesc, NULL);
  304. }
  305. }
  306. CWriteMasterBase::CWriteMasterBase(CMasterGraphElement *info) : CMasterActivity(info), diskStats(info->queryJob(), diskWriteRemoteStatistics)
  307. {
  308. publishReplicatedDone = !globals->getPropBool("@replicateAsync", true);
  309. replicateProgress.setown(new ProgressInfo(queryJob()));
  310. diskHelperBase = (IHThorDiskWriteArg *)queryHelper();
  311. targetOffset = 0;
  312. }
  313. void CWriteMasterBase::deserializeStats(unsigned node, MemoryBuffer &mb)
  314. {
  315. CMasterActivity::deserializeStats(node, mb);
  316. unsigned repPerc;
  317. mb.read(repPerc);
  318. replicateProgress->set(node, repPerc);
  319. diskStats.deserializeMerge(node, mb);
  320. }
  321. void CWriteMasterBase::getActivityStats(IStatisticGatherer & stats)
  322. {
  323. CMasterActivity::getActivityStats(stats);
  324. if (publishReplicatedDone)
  325. {
  326. replicateProgress->processInfo();
  327. stats.addStatistic(StPerReplicated, replicateProgress->queryAverage() * 10000);
  328. }
  329. diskStats.getStats(stats);
  330. }
  331. void CWriteMasterBase::preStart(size32_t parentExtractSz, const byte *parentExtract)
  332. {
  333. CMasterActivity::preStart(parentExtractSz, parentExtract);
  334. if (TAKdiskwrite == container.getKind())
  335. {
  336. if (0 == ((TDXvarfilename|TDXtemporary|TDXjobtemp) & diskHelperBase->getFlags()))
  337. {
  338. OwnedRoxieString fname(diskHelperBase->getFileName());
  339. Owned<IDistributedFile> file = queryThorFileManager().lookup(container.queryJob(), fname, false, true);
  340. if (file)
  341. {
  342. if (0 == ((TDWextend+TDWoverwrite) & diskHelperBase->getFlags()))
  343. throw MakeActivityException(this, TE_OverwriteNotSpecified, "Cannot write %s, file already exists (missing OVERWRITE attribute?)", file->queryLogicalName());
  344. checkSuperFileOwnership(*file);
  345. }
  346. }
  347. }
  348. }
  349. void CWriteMasterBase::serializeSlaveData(MemoryBuffer &dst, unsigned slave)
  350. {
  351. OwnedRoxieString fname(diskHelperBase->getFileName());
  352. dst.append(fileName);
  353. if (diskHelperBase->getFlags() & TDXtemporary)
  354. {
  355. unsigned usageCount = container.queryJob().queryWorkUnit().queryFileUsage(fname);
  356. if (0 == usageCount) usageCount = diskHelperBase->getTempUsageCount();
  357. dst.append(usageCount);
  358. }
  359. if (dlfn.isExternal())
  360. {
  361. fileDesc->queryPart(0)->serialize(dst);
  362. dst.append(mpTag);
  363. }
  364. else
  365. fileDesc->queryPart(targetOffset+slave)->serialize(dst);
  366. }
  367. void CWriteMasterBase::done()
  368. {
  369. CMasterActivity::done();
  370. publish();
  371. if (((TAKdiskwrite == container.getKind()) || (TAKspillwrite == container.getKind())) && (0 != (diskHelperBase->getFlags() & TDXtemporary)) && container.queryOwner().queryOwner()) // I am in a child query
  372. {
  373. published = false;
  374. recordsProcessed = 0;
  375. }
  376. }
  377. void CWriteMasterBase::slaveDone(size32_t slaveIdx, MemoryBuffer &mb)
  378. {
  379. if (mb.length()) // if 0 implies aborted out from this slave.
  380. {
  381. rowcount_t slaveProcessed;
  382. mb.read(slaveProcessed);
  383. recordsProcessed += slaveProcessed;
  384. if (dlfn.isExternal())
  385. return;
  386. offset_t size, physicalSize;
  387. mb.read(size);
  388. mb.read(physicalSize);
  389. unsigned fileCrc;
  390. mb.read(fileCrc);
  391. CDateTime modifiedTime(mb);
  392. IPartDescriptor *partDesc = fileDesc->queryPart(targetOffset+slaveIdx);
  393. IPropertyTree &props = partDesc->queryProperties();
  394. props.setPropInt64("@size", size);
  395. if (fileDesc->isCompressed())
  396. props.setPropInt64("@compressedSize", physicalSize);
  397. props.setPropInt64("@fileCrc", fileCrc);
  398. StringBuffer timeStr;
  399. modifiedTime.getString(timeStr);
  400. props.setProp("@modified", timeStr.str());
  401. }
  402. }
  403. /////////////////
  404. rowcount_t getCount(CActivityBase &activity, unsigned partialResults, rowcount_t limit, mptag_t mpTag)
  405. {
  406. rowcount_t totalCount = 0;
  407. CMessageBuffer msg;
  408. while (partialResults--)
  409. {
  410. rank_t sender;
  411. msg.clear();
  412. if (!activity.receiveMsg(msg, RANK_ALL, mpTag, &sender)) return 0;
  413. if (activity.queryAbortSoon()) return 0;
  414. rowcount_t partialCount;
  415. msg.read(partialCount);
  416. totalCount += (rowcount_t)partialCount;
  417. if (totalCount > limit)
  418. break;
  419. }
  420. return totalCount;
  421. }
  422. const void *getAggregate(CActivityBase &activity, unsigned partialResults, IThorRowInterfaces &rowIf, IHThorCompoundAggregateExtra &aggHelper, mptag_t mpTag)
  423. {
  424. // JCSMORE - pity this isn't common routine with similar one in aggregate, but helper is not common
  425. CThorExpandingRowArray slaveResults(activity, &activity, ers_allow, stableSort_none, true, partialResults);
  426. unsigned _partialResults = partialResults;
  427. while (_partialResults--)
  428. {
  429. CMessageBuffer mb;
  430. rank_t sender;
  431. if (!activity.receiveMsg(mb, RANK_ALL, mpTag, &sender)) return NULL;
  432. if (activity.queryAbortSoon()) return 0;
  433. if (mb.length())
  434. {
  435. CThorStreamDeserializerSource ds(mb.length(), mb.readDirect(mb.length()));
  436. RtlDynamicRowBuilder rowBuilder(rowIf.queryRowAllocator());
  437. size32_t sz = rowIf.queryRowDeserializer()->deserialize(rowBuilder, ds);
  438. slaveResults.setRow(sender-1, rowBuilder.finalizeRowClear(sz));
  439. }
  440. }
  441. RtlDynamicRowBuilder result(rowIf.queryRowAllocator(), false);
  442. size32_t sz = 0;
  443. bool first = true;
  444. _partialResults = 0;
  445. for (;_partialResults<partialResults; _partialResults++)
  446. {
  447. const void *partialResult = slaveResults.query(_partialResults);
  448. if (partialResult)
  449. {
  450. if (first)
  451. {
  452. first = false;
  453. sz = cloneRow(result, partialResult, rowIf.queryRowMetaData());
  454. }
  455. else
  456. sz = aggHelper.mergeAggregate(result, partialResult);
  457. }
  458. }
  459. if (first)
  460. sz = aggHelper.clearAggregate(result);
  461. return result.finalizeRowClear(sz);
  462. }