thhashdistrib.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "thhashdistrib.ipp"
  15. #include "eclhelper.hpp"
  16. #include "mptag.hpp"
  17. #include "dasess.hpp"
  18. #include "dadfs.hpp"
  19. #include "jhtree.hpp"
  20. #include "thorport.hpp"
  21. #include "thbufdef.hpp"
  22. #include "thmem.hpp"
  23. #include "thexception.hpp"
  // NOTE(review): not referenced by any code visible in this file; confirm it is still needed.
  #define NUMINPARALLEL 16

  // Which distributing activity a HashDistributeMasterBase instance is serving.
  // DM_join is the only mode for which the base allocates a second MP tag.
  enum DistributeMode
  {
      DM_distrib,
      DM_dedup,
      DM_join,
      DM_groupaggregate,
      DM_index,
      DM_redistribute,
      DM_distribmerge
  };
  26. class HashDistributeMasterBase : public CMasterActivity
  27. {
  28. DistributeMode mode;
  29. mptag_t mptag;
  30. mptag_t mptag2; // for tag 2
  31. bool redistribute;
  32. double skew;
  33. double targetskew;
  34. public:
  35. HashDistributeMasterBase(DistributeMode _mode, CMasterGraphElement *info)
  36. : CMasterActivity(info), mode(_mode)
  37. {
  38. mptag = TAG_NULL;
  39. mptag2 = TAG_NULL;
  40. }
  41. ~HashDistributeMasterBase()
  42. {
  43. if (mptag!=TAG_NULL)
  44. container.queryJob().freeMPTag(mptag);
  45. if (mptag2!=TAG_NULL)
  46. container.queryJob().freeMPTag(mptag2);
  47. }
  48. protected:
  49. virtual void init()
  50. {
  51. mptag = container.queryJob().allocateMPTag();
  52. if (mode==DM_join)
  53. mptag2 = container.queryJob().allocateMPTag();
  54. }
  55. virtual void process()
  56. {
  57. ActPrintLog("HashDistributeActivityMaster::process");
  58. CMasterActivity::process();
  59. ActPrintLog("HashDistributeActivityMaster::process exit");
  60. }
  61. virtual void done()
  62. {
  63. ActPrintLog("HashDistributeActivityMaster::done");
  64. CMasterActivity::done();
  65. ActPrintLog("HashDistributeActivityMaster::done exit");
  66. }
  67. virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
  68. {
  69. dst.append((int)mptag);
  70. if (mode==DM_join)
  71. dst.append((int)mptag2);
  72. }
  73. };
  74. class HashDistributeActivityMaster : public HashDistributeMasterBase
  75. {
  76. public:
  77. HashDistributeActivityMaster(DistributeMode mode, CMasterGraphElement *info) : HashDistributeMasterBase(mode, info) { }
  78. };
  79. class HashJoinDistributeActivityMaster : public HashDistributeActivityMaster
  80. {
  81. Owned<ProgressInfo> lhsProgress, rhsProgress;
  82. public:
  83. HashJoinDistributeActivityMaster(DistributeMode mode, CMasterGraphElement *info) : HashDistributeActivityMaster(mode, info)
  84. {
  85. lhsProgress.setown(new ProgressInfo);
  86. rhsProgress.setown(new ProgressInfo);
  87. }
  88. virtual void deserializeStats(unsigned node, MemoryBuffer &mb)
  89. {
  90. HashDistributeActivityMaster::deserializeStats(node, mb);
  91. rowcount_t lhsProgressCount, rhsProgressCount;
  92. mb.read(lhsProgressCount);
  93. mb.read(rhsProgressCount);
  94. lhsProgress->set(node, lhsProgressCount);
  95. rhsProgress->set(node, rhsProgressCount);
  96. }
  97. virtual void getXGMML(unsigned idx, IPropertyTree *edge)
  98. {
  99. HashDistributeActivityMaster::getXGMML(idx, edge);
  100. assertex(0 == idx);
  101. lhsProgress->processInfo();
  102. rhsProgress->processInfo();
  103. StringBuffer label;
  104. label.append("@progressInput-").append(container.queryInput(0)->queryId());
  105. edge->setPropInt64(label.str(), lhsProgress->queryTotal());
  106. label.clear().append("@progressInput-").append(container.queryInput(1)->queryId());
  107. edge->setPropInt64(label.str(), rhsProgress->queryTotal());
  108. }
  109. };
  110. class IndexDistributeActivityMaster : public HashDistributeMasterBase
  111. {
  112. MemoryBuffer tlkMb;
  113. public:
  114. IndexDistributeActivityMaster(CMasterGraphElement *info) : HashDistributeMasterBase(DM_index, info) { }
  115. virtual void init()
  116. {
  117. HashDistributeMasterBase::init();
  118. // JCSMORE should common up some with indexread
  119. IHThorKeyedDistributeArg *helper = (IHThorKeyedDistributeArg *)queryHelper();
  120. StringBuffer scoped;
  121. queryThorFileManager().addScope(container.queryJob(), helper->getIndexFileName(), scoped);
  122. Owned<IDistributedFile> f = queryThorFileManager().lookup(container.queryJob(), helper->getIndexFileName());
  123. if (!f)
  124. throw MakeActivityException(this, 0, "KeyedDistribute: Failed to find key: %s", scoped.str());
  125. if (0 == f->numParts())
  126. throw MakeActivityException(this, 0, "KeyedDistribute: Can't distribute based on an empty key: %s", scoped.str());
  127. checkFormatCrc(this, f, helper->getFormatCrc(), true);
  128. Owned<IFileDescriptor> fileDesc = f->getFileDescriptor();
  129. Owned<IPartDescriptor> tlkDesc = fileDesc->getPart(fileDesc->numParts()-1);
  130. if (!tlkDesc->queryProperties().hasProp("@kind") || 0 != stricmp("topLevelKey", tlkDesc->queryProperties().queryProp("@kind")))
  131. throw MakeActivityException(this, 0, "Cannot distribute using a non-distributed key: '%s'", scoped.str());
  132. unsigned location;
  133. OwnedIFile iFile;
  134. StringBuffer filePath;
  135. if (!getBestFilePart(this, *tlkDesc, iFile, location, filePath))
  136. throw MakeThorException(TE_FileNotFound, "Top level key part does not exist, for key: %s", f->queryLogicalName());
  137. OwnedIFileIO iFileIO = iFile->open(IFOread);
  138. assertex(iFileIO);
  139. tlkMb.append(iFileIO->size());
  140. ::read(iFileIO, 0, (size32_t)iFileIO->size(), tlkMb);
  141. queryThorFileManager().noteFileRead(container.queryJob(), f);
  142. }
  143. void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
  144. {
  145. HashDistributeMasterBase::serializeSlaveData(dst, slave); // have to chain for standard activity data..
  146. dst.append(tlkMb);
  147. }
  148. };
  149. class ReDistributeActivityMaster : public HashDistributeMasterBase
  150. {
  151. mptag_t statstag;
  152. public:
  153. ReDistributeActivityMaster(CMasterGraphElement *info) : HashDistributeMasterBase(DM_redistribute, info)
  154. {
  155. statstag = container.queryJob().allocateMPTag();
  156. }
  157. ~ReDistributeActivityMaster()
  158. {
  159. container.queryJob().freeMPTag(statstag);
  160. }
  161. void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
  162. {
  163. HashDistributeMasterBase::serializeSlaveData(dst, slave); // have to chain for standard activity data..
  164. dst.append((int)statstag);
  165. }
  166. void process()
  167. {
  168. ActPrintLog("ReDistributeActivityMaster::process");
  169. HashDistributeMasterBase::process();
  170. IHThorHashDistributeArg *helper = (IHThorHashDistributeArg *)queryHelper();
  171. unsigned n = container.queryJob().querySlaves();
  172. MemoryAttr ma;
  173. offset_t *sizes = (offset_t *)ma.allocate(sizeof(offset_t)*n);
  174. unsigned i;
  175. try {
  176. for (i=0;i<n;i++) {
  177. if (abortSoon)
  178. return;
  179. CMessageBuffer mb;
  180. #ifdef _TRACE
  181. ActPrintLog("ReDistribute process, Receiving on tag %d",statstag);
  182. #endif
  183. rank_t sender;
  184. if (!receiveMsg(mb, RANK_ALL, statstag, &sender)||abortSoon)
  185. return;
  186. #ifdef _TRACE
  187. ActPrintLog("ReDistribute process, Received size from %d",sender);
  188. #endif
  189. sender--;
  190. assertex((unsigned)sender<n);
  191. mb.read(sizes[sender]);
  192. }
  193. ActPrintLog("ReDistributeActivityMaster::process sizes got");
  194. for (i=0;i<n;i++) {
  195. CMessageBuffer mb;
  196. mb.append(n*sizeof(offset_t),sizes);
  197. #ifdef _TRACE
  198. ActPrintLog("ReDistribute process, Replying to node %d tag %d",i+1,statstag);
  199. #endif
  200. if (!container.queryJob().queryJobComm().send(mb, (rank_t)i+1, statstag))
  201. return;
  202. }
  203. // check if any max skew broken
  204. double maxskew = helper->getTargetSkew();
  205. if (maxskew>helper->getSkew()) {
  206. offset_t tot = 0;
  207. for (i=0;i<n;i++)
  208. tot += sizes[i];
  209. offset_t avg = tot/n;
  210. for (i=0;i<n;i++) {
  211. double r = ((double)sizes[i]-(double)avg)/(double)avg;
  212. if ((r>=maxskew)||(-r>maxskew)) {
  213. throw MakeActivityException(this, TE_DistributeFailedSkewExceeded, "DISTRIBUTE maximum skew exceeded (node %d has %"I64F"d, average is %"I64F"d)",i+1,sizes[i],avg);
  214. }
  215. }
  216. }
  217. }
  218. catch (IException *e) {
  219. ActPrintLog(e,"ReDistribute");
  220. throw;
  221. }
  222. ActPrintLog("ReDistributeActivityMaster::process exit");
  223. }
  224. void abort()
  225. {
  226. HashDistributeMasterBase::abort();
  227. cancelReceiveMsg(RANK_ALL, statstag);
  228. }
  229. };
  230. CActivityBase *createHashDistributeActivityMaster(CMasterGraphElement *container)
  231. {
  232. if (container&&(((IHThorHashDistributeArg *)container->queryHelper())->queryHash()==NULL))
  233. return new ReDistributeActivityMaster(container);
  234. else
  235. return new HashDistributeActivityMaster(DM_distrib, container);
  236. }
  237. CActivityBase *createDistributeMergeActivityMaster(CMasterGraphElement *container)
  238. {
  239. return new HashDistributeActivityMaster(DM_distribmerge, container);
  240. }
  241. CActivityBase *createHashDedupMergeActivityMaster(CMasterGraphElement *container)
  242. {
  243. if (container->queryLocalOrGrouped())
  244. return new CMasterActivity(container);
  245. else
  246. return new HashDistributeActivityMaster(DM_dedup, container);
  247. }
  248. CActivityBase *createHashJoinActivityMaster(CMasterGraphElement *container)
  249. {
  250. return new HashJoinDistributeActivityMaster(DM_join, container);
  251. }
  252. CActivityBase *createHashAggregateActivityMaster(CMasterGraphElement *container)
  253. {
  254. if (container->queryLocalOrGrouped())
  255. return new CMasterActivity(container);
  256. else
  257. return new HashDistributeActivityMaster(DM_groupaggregate, container);
  258. }
  259. CActivityBase *createKeyedDistributeActivityMaster(CMasterGraphElement *container)
  260. {
  261. return new IndexDistributeActivityMaster(container);
  262. }