thfetch.cpp 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "eclhelper.hpp"
  15. #include "thbufdef.hpp"
  16. #include "mptag.hpp"
  17. #include "dadfs.hpp"
  18. #include "jstats.h"
  19. #include "thexception.hpp"
  20. #include "../hashdistrib/thhashdistrib.ipp"
  21. #include "thfetch.ipp"
  22. class CFetchActivityMaster : public CMasterActivity
  23. {
  24. Owned<CSlavePartMapping> mapping;
  25. MemoryBuffer offsetMapMb;
  26. SocketEndpoint *endpoints;
  27. std::vector<OwnedPtr<CThorStatsCollection>> subFileStats;
  28. protected:
  29. IHThorFetchArg *helper;
  30. public:
  31. CFetchActivityMaster(CMasterGraphElement *info) : CMasterActivity(info, diskReadActivityStatistics)
  32. {
  33. endpoints = NULL;
  34. if (!container.queryLocalOrGrouped())
  35. mpTag = container.queryJob().allocateMPTag();
  36. helper = (IHThorFetchArg *)queryHelper();
  37. reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename));
  38. }
  39. ~CFetchActivityMaster()
  40. {
  41. if (endpoints) free(endpoints);
  42. }
  43. virtual void init()
  44. {
  45. CMasterActivity::init();
  46. OwnedRoxieString fname(helper->getFileName());
  47. Owned<IDistributedFile> fetchFile = queryThorFileManager().lookup(container.queryJob(), fname, false, 0 != (helper->getFetchFlags() & FFdatafileoptional), false, container.activityIsCodeSigned());
  48. if (fetchFile)
  49. {
  50. if (isFileKey(fetchFile))
  51. throw MakeActivityException(this, TE_FileTypeMismatch, "Attempting to read index as a flat file: %s", fname.get());
  52. Owned<IFileDescriptor> fileDesc = getConfiguredFileDescriptor(*fetchFile);
  53. void *ekey;
  54. size32_t ekeylen;
  55. helper->getFileEncryptKey(ekeylen,ekey);
  56. bool encrypted = fileDesc->queryProperties().getPropBool("@encrypted");
  57. if (0 != ekeylen)
  58. {
  59. memset(ekey,0,ekeylen);
  60. free(ekey);
  61. if (!encrypted)
  62. {
  63. Owned<IException> e = MakeActivityWarning(&container, TE_EncryptionMismatch, "Ignoring encryption key provided as file '%s' was not published as encrypted", fetchFile->queryLogicalName());
  64. queryJobChannel().fireException(e);
  65. }
  66. }
  67. else if (encrypted)
  68. throw MakeActivityException(this, 0, "File '%s' was published as encrypted but no encryption key provided", fetchFile->queryLogicalName());
  69. IDistributedSuperFile *super = fetchFile->querySuperFile();
  70. unsigned numsubs = super?super->numSubFiles(true):0;
  71. for (unsigned i=0; i<numsubs; i++)
  72. subFileStats.push_back(new CThorStatsCollection(diskReadActivityStatistics));
  73. mapping.setown(getFileSlaveMaps(fetchFile->queryLogicalName(), *fileDesc, container.queryJob().queryUserDescriptor(), container.queryJob().querySlaveGroup(), container.queryLocalOrGrouped(), false, NULL, super));
  74. mapping->serializeFileOffsetMap(offsetMapMb);
  75. addReadFile(fetchFile);
  76. }
  77. }
  78. virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
  79. {
  80. if (mapping)
  81. {
  82. mapping->serializeMap(slave, dst);
  83. dst.append(offsetMapMb);
  84. }
  85. else
  86. {
  87. CSlavePartMapping::serializeNullMap(dst);
  88. CSlavePartMapping::serializeNullOffsetMap(dst);
  89. }
  90. if (!container.queryLocalOrGrouped())
  91. dst.append((int)mpTag);
  92. }
  93. virtual void deserializeStats(unsigned node, MemoryBuffer &mb) override
  94. {
  95. CMasterActivity::deserializeStats(node, mb);
  96. for (auto &stats: subFileStats)
  97. stats->deserialize(node, mb);
  98. }
  99. virtual void done() override
  100. {
  101. if (!subFileStats.empty())
  102. {
  103. unsigned numSubFiles = subFileStats.size();
  104. for (unsigned i=0; i<numSubFiles; i++)
  105. {
  106. IDistributedFile *file = queryReadFile(i);
  107. if (file)
  108. file->addAttrValue("@numDiskReads", subFileStats[i]->getStatisticSum(StNumDiskReads));
  109. }
  110. }
  111. else
  112. {
  113. IDistributedFile *file = queryReadFile(0);
  114. if (file)
  115. file->addAttrValue("@numDiskReads", statsCollection.getStatisticSum(StNumDiskReads));
  116. }
  117. CMasterActivity::done();
  118. }
  119. };
  120. class CCsvFetchActivityMaster : public CFetchActivityMaster
  121. {
  122. public:
  123. CCsvFetchActivityMaster(CMasterGraphElement *info) : CFetchActivityMaster(info) { }
  124. virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
  125. {
  126. CFetchActivityMaster::serializeSlaveData(dst, slave);
  127. IDistributedFile *fetchFile = queryReadFile(0);
  128. if (fetchFile)
  129. fetchFile->queryAttributes().serialize(dst);
  130. }
  131. };
  132. class CXmlFetchActivityMaster : public CFetchActivityMaster
  133. {
  134. public:
  135. CXmlFetchActivityMaster(CMasterGraphElement *info) : CFetchActivityMaster(info)
  136. {
  137. }
  138. };
  139. CActivityBase *createFetchActivityMaster(CMasterGraphElement *container)
  140. {
  141. return new CFetchActivityMaster(container);
  142. }
  143. CActivityBase *createCsvFetchActivityMaster(CMasterGraphElement *container)
  144. {
  145. return new CCsvFetchActivityMaster(container);
  146. }
  147. CActivityBase *createXmlFetchActivityMaster(CMasterGraphElement *container)
  148. {
  149. return new CXmlFetchActivityMaster(container);
  150. }