thkeyedjoin.cpp 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <array>
  14. #include <vector>
  15. #include <algorithm>
  16. #include "dasess.hpp"
  17. #include "dadfs.hpp"
  18. #include "thexception.hpp"
  19. #include "../fetch/thfetchcommon.hpp"
  20. #include "../hashdistrib/thhashdistrib.ipp"
  21. #include "thkeyedjoin.ipp"
  22. #include "jhtree.hpp"
  23. static const std::array<StatisticKind, 8> progressKinds{ StNumIndexSeeks, StNumIndexScans, StNumIndexAccepted, StNumPostFiltered, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected };
  24. class CKeyedJoinMaster : public CMasterActivity
  25. {
  26. IHThorKeyedJoinArg *helper = nullptr;
  27. Owned<IFileDescriptor> dataFileDesc, indexFileDesc;
  28. MemoryBuffer initMb;
  29. unsigned numTags = 0;
  30. std::vector<mptag_t> tags;
  31. ProgressInfoArray progressInfoArr;
  32. bool local = false;
  33. bool remoteKeyedLookup = false;
  34. bool remoteKeyedFetch = false;
  35. unsigned totalIndexParts = 0;
  36. // CMap contains mappings and lists of parts for each slave
  37. class CMap
  38. {
  39. static const unsigned partMask = 0x00ffffff;
  40. public:
  41. std::vector<unsigned> allParts;
  42. std::vector<std::vector<unsigned>> slavePartMap; // vector of slave parts (IPartDescriptor's slavePartMap[<slave>] serialized to each slave)
  43. std::vector<unsigned> partToSlave; // vector mapping part index to slave (sent to all slaves)
  44. void setup(unsigned slaves, unsigned parts)
  45. {
  46. clear();
  47. slavePartMap.resize(slaves);
  48. partToSlave.resize(parts);
  49. }
  50. void clear()
  51. {
  52. allParts.clear();
  53. slavePartMap.clear();
  54. partToSlave.clear();
  55. }
  56. unsigned count() const { return partToSlave.size(); }
  57. void serializePartMap(MemoryBuffer &mb) const
  58. {
  59. mb.append(partToSlave.size() * sizeof(unsigned), &partToSlave[0]);
  60. }
  61. unsigned querySlave(unsigned part) const { return partToSlave[part]; }
  62. std::vector<unsigned> &querySlaveParts(unsigned slave) { return slavePartMap[slave]; }
  63. std::vector<unsigned> &queryAllParts() { return allParts; }
  64. /* maps input file into lists of parts for slaves and a mapping for slaves to find other parts
  65. * If 'allLocal' option is true, it will also map replicate copies and use them directly if local to slave.
  66. */
  67. void map(CKeyedJoinMaster &activity, IDistributedFile *file, bool isIndexWithTlk, bool allLocal)
  68. {
  69. Owned<IFileDescriptor> fileDesc = file->getFileDescriptor();
  70. assertex(fileDesc);
  71. IDistributedSuperFile *super = file->querySuperFile();
  72. ISuperFileDescriptor *superFileDesc = fileDesc->querySuperFileDescriptor();
  73. unsigned totalParts = file->numParts();
  74. if (isIndexWithTlk)
  75. totalParts -= super ? super->numSubFiles(true) : 1;
  76. IGroup &dfsGroup = queryDfsGroup();
  77. setup(dfsGroup.ordinality(), totalParts);
  78. unsigned numSuperIndexSubs = 0;
  79. unsigned superWidth = 0;
  80. if (super)
  81. {
  82. if (super->numSubFiles(true))
  83. {
  84. if (!super->isInterleaved())
  85. numSuperIndexSubs = super->numSubFiles(true);
  86. IDistributedFile &sub = super->querySubFile(0, true);
  87. superWidth = sub.numParts();
  88. if (isIndexWithTlk)
  89. --superWidth;
  90. }
  91. }
  92. unsigned groupSize = dfsGroup.ordinality();
  93. std::vector<unsigned> partsByPartIdx;
  94. Owned<IBitSet> partsOnSlaves = createBitSet();
  95. unsigned numParts = fileDesc->numParts();
  96. unsigned nextGroupStartPos = 0;
  97. for (unsigned p=0; p<numParts; p++)
  98. {
  99. IPartDescriptor *part = fileDesc->queryPart(p);
  100. const char *kind = isIndexWithTlk ? part->queryProperties().queryProp("@kind") : nullptr;
  101. if (!kind || !strsame("topLevelKey", kind))
  102. {
  103. unsigned partIdx = part->queryPartIndex();
  104. unsigned subfile = NotFound;
  105. unsigned subPartIdx = partIdx;
  106. if (superFileDesc)
  107. {
  108. superFileDesc->mapSubPart(partIdx, subfile, subPartIdx);
  109. partIdx = superWidth*subfile+subPartIdx;
  110. }
  111. if (activity.local)
  112. {
  113. if (activity.queryContainer().queryLocalData())
  114. {
  115. if (subPartIdx < dfsGroup.ordinality())
  116. {
  117. std::vector<unsigned> &slaveParts = querySlaveParts(subPartIdx);
  118. slaveParts.push_back(p);
  119. }
  120. }
  121. else
  122. {
  123. for (auto &slaveParts : slavePartMap)
  124. slaveParts.push_back(p);
  125. }
  126. partsByPartIdx.push_back(partIdx);
  127. }
  128. else
  129. {
  130. /* see if any of the part copies are local to any of the cluster nodes
  131. * Add them to local parts list if found.
  132. */
  133. unsigned mappedPos = NotFound;
  134. for (unsigned c=0; c<part->numCopies(); c++)
  135. {
  136. INode *partNode = part->queryNode(c);
  137. unsigned partCopy = p | (c << 24);
  138. unsigned start=nextGroupStartPos;
  139. unsigned gn=start;
  140. do
  141. {
  142. INode &groupNode = dfsGroup.queryNode(gn);
  143. if ((partNode->equals(&groupNode)))
  144. {
  145. /* NB: If there's >1 slave per node (e.g. slavesPerNode>1) then there are multiple matching node's in the dfsGroup
  146. * Which means a copy of a part may already be assigned to a cluster slave map. This check avoid handling it again if it has.
  147. */
  148. if (!partsOnSlaves->testSet(groupSize*p+gn))
  149. {
  150. std::vector<unsigned> &slaveParts = querySlaveParts(gn);
  151. if (NotFound == mappedPos)
  152. {
  153. /* NB: to avoid all parts being mapped to same remote slave process (significant if slavesPerNode>1)
  154. * or (conditionally) all accessible locals being added to all slaves (which may have detrimental effect on key node caching)
  155. * inc. group start pos for beginning of next search.
  156. */
  157. slaveParts.push_back(partCopy);
  158. if (activity.queryContainer().queryJob().queryChannelsPerSlave()>1)
  159. mappedPos = gn % queryNodeClusterWidth();
  160. else
  161. mappedPos = gn;
  162. nextGroupStartPos = gn+1;
  163. if (nextGroupStartPos == groupSize)
  164. nextGroupStartPos = 0;
  165. }
  166. else if (allLocal) // all slaves get all locally accessible parts
  167. slaveParts.push_back(partCopy);
  168. }
  169. }
  170. gn++;
  171. if (gn == groupSize)
  172. gn = 0;
  173. }
  174. while (gn != start);
  175. }
  176. if (NotFound == mappedPos)
  177. {
  178. // part not within the cluster, add it to all slave maps, meaning these part meta will be serialized to all slaves so they handle the lookups directly.
  179. for (auto &slaveParts : slavePartMap)
  180. slaveParts.push_back(p);
  181. }
  182. if (superFileDesc)
  183. partIdx = superWidth*subfile+subPartIdx;
  184. partsByPartIdx.push_back(partIdx);
  185. assertex(partIdx < totalParts);
  186. partToSlave[partIdx] = mappedPos;
  187. }
  188. }
  189. }
  190. if (!activity.local)
  191. {
  192. if (0 == numSuperIndexSubs)
  193. {
  194. for (unsigned p=0; p<totalParts; p++)
  195. allParts.push_back(p);
  196. }
  197. else // non-interleaved superindex
  198. {
  199. unsigned p=0;
  200. for (unsigned i=0; i<numSuperIndexSubs; i++)
  201. {
  202. for (unsigned kp=0; kp<superWidth; kp++)
  203. allParts.push_back(p++);
  204. if (isIndexWithTlk)
  205. p++; // TLK's serialized separately.
  206. }
  207. }
  208. // ensure sorted by partIdx, so that consistent order for partHandlers/lookup
  209. std::sort(allParts.begin(), allParts.end(), [partsByPartIdx](unsigned a, unsigned b) { return partsByPartIdx[a] < partsByPartIdx[b]; });
  210. }
  211. // ensure sorted by partIdx, so that consistent order for partHandlers/lookup
  212. for (auto &slaveParts : slavePartMap)
  213. std::sort(slaveParts.begin(), slaveParts.end(), [partsByPartIdx](unsigned a, unsigned b) { return partsByPartIdx[a & partMask] < partsByPartIdx[b & partMask]; });
  214. }
  215. };
  216. CMap indexMap, dataMap;
  217. public:
  218. CKeyedJoinMaster(CMasterGraphElement *info) : CMasterActivity(info)
  219. {
  220. helper = (IHThorKeyedJoinArg *) queryHelper();
  221. unsigned numStats = helper->diskAccessRequired() ? 8 : 5; // see progressKinds array
  222. for (unsigned s=0; s<numStats; s++)
  223. progressInfoArr.append(*new ProgressInfo(queryJob()));
  224. reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename);
  225. // NB: force options are there to force all parts to be remote, even if local to slave (handled on slave)
  226. remoteKeyedLookup = getOptBool(THOROPT_REMOTE_KEYED_LOOKUP, true);
  227. if (getOptBool(THOROPT_FORCE_REMOTE_KEYED_LOOKUP))
  228. remoteKeyedLookup = true;
  229. remoteKeyedFetch = getOptBool(THOROPT_REMOTE_KEYED_FETCH, true);
  230. if (getOptBool(THOROPT_FORCE_REMOTE_KEYED_FETCH))
  231. remoteKeyedFetch = true;
  232. if (helper->diskAccessRequired())
  233. numTags += 2;
  234. for (unsigned t=0; t<numTags; t++)
  235. {
  236. mptag_t tag = container.queryJob().allocateMPTag();
  237. tags.push_back(tag);
  238. }
  239. }
  240. ~CKeyedJoinMaster()
  241. {
  242. for (const mptag_t &tag : tags)
  243. container.queryJob().freeMPTag(tag);
  244. }
  245. virtual void init()
  246. {
  247. CMasterActivity::init();
  248. OwnedRoxieString indexFileName(helper->getIndexFileName());
  249. initMb.clear();
  250. initMb.append(indexFileName.get());
  251. bool keyHasTlk = false;
  252. totalIndexParts = 0;
  253. Owned<IDistributedFile> dataFile;
  254. Owned<IDistributedFile> indexFile = queryThorFileManager().lookup(container.queryJob(), indexFileName, false, 0 != (helper->getJoinFlags() & JFindexoptional), true, container.activityIsCodeSigned());
  255. if (indexFile)
  256. {
  257. if (!isFileKey(indexFile))
  258. throw MakeActivityException(this, TE_FileTypeMismatch, "Attempting to read flat file as an index: %s", indexFileName.get());
  259. IDistributedSuperFile *superIndex = indexFile->querySuperFile();
  260. if (helper->diskAccessRequired())
  261. {
  262. OwnedRoxieString fetchFilename(helper->getFileName());
  263. if (fetchFilename)
  264. {
  265. dataFile.setown(queryThorFileManager().lookup(container.queryJob(), fetchFilename, false, 0 != (helper->getFetchFlags() & FFdatafileoptional), true, container.activityIsCodeSigned()));
  266. if (dataFile)
  267. {
  268. if (isFileKey(dataFile))
  269. throw MakeActivityException(this, TE_FileTypeMismatch, "Attempting to read index as a flat file: %s", fetchFilename.get());
  270. if (superIndex)
  271. throw MakeActivityException(this, 0, "Superkeys and full keyed joins are not supported");
  272. dataFileDesc.setown(getConfiguredFileDescriptor(*dataFile));
  273. void *ekey;
  274. size32_t ekeylen;
  275. helper->getFileEncryptKey(ekeylen,ekey);
  276. bool encrypted = dataFileDesc->queryProperties().getPropBool("@encrypted");
  277. if (0 != ekeylen)
  278. {
  279. memset(ekey,0,ekeylen);
  280. free(ekey);
  281. if (!encrypted)
  282. {
  283. Owned<IException> e = MakeActivityWarning(&container, TE_EncryptionMismatch, "Ignoring encryption key provided as file '%s' was not published as encrypted", dataFile->queryLogicalName());
  284. queryJobChannel().fireException(e);
  285. }
  286. }
  287. else if (encrypted)
  288. throw MakeActivityException(this, 0, "File '%s' was published as encrypted but no encryption key provided", dataFile->queryLogicalName());
  289. /* If fetch file is local to cluster, fetches are sent to the slave the parts are local to.
  290. * If fetch file is off cluster, fetches are performed by requesting node directly on fetch part, therefore each nodes
  291. * needs all part descriptors.
  292. */
  293. if (remoteKeyedFetch)
  294. {
  295. RemoteFilename rfn;
  296. dataFileDesc->queryPart(0)->getFilename(0, rfn);
  297. if (!rfn.queryIP().ipequals(container.queryJob().querySlaveGroup().queryNode(0).endpoint()))
  298. remoteKeyedFetch = false;
  299. }
  300. dataMap.map(*this, dataFile, false, getOptBool("allLocalFetchParts"));
  301. }
  302. }
  303. }
  304. if (!helper->diskAccessRequired() || dataFileDesc)
  305. {
  306. bool localKey = indexFile->queryAttributes().getPropBool("@local");
  307. bool partitionKey = indexFile->queryAttributes().hasProp("@partitionFieldMask");
  308. local = (localKey && !partitionKey) || container.queryLocalData();
  309. if (local)
  310. {
  311. remoteKeyedLookup = false;
  312. remoteKeyedFetch = false;
  313. }
  314. //MORE: Change to getIndexProjectedFormatCrc once we support projected rows for indexes?
  315. checkFormatCrc(this, indexFile, helper->getIndexFormatCrc(), helper->queryIndexRecordSize(), helper->getProjectedIndexFormatCrc(), helper->queryProjectedIndexRecordSize(), true);
  316. indexFileDesc.setown(indexFile->getFileDescriptor());
  317. unsigned superIndexWidth = 0;
  318. unsigned numSuperIndexSubs = 0;
  319. if (superIndex)
  320. {
  321. numSuperIndexSubs = superIndex->numSubFiles(true);
  322. bool first=true;
  323. // consistency check
  324. Owned<IDistributedFileIterator> iter = superIndex->getSubFileIterator(true);
  325. ForEach(*iter)
  326. {
  327. IDistributedFile &f = iter->query();
  328. unsigned np = f.numParts()-1;
  329. IDistributedFilePart &part = f.queryPart(np);
  330. const char *kind = part.queryAttributes().queryProp("@kind");
  331. bool hasTlk = NULL != kind && 0 == stricmp("topLevelKey", kind); // if last part not tlk, then deemed local (might be singlePartKey)
  332. if (first)
  333. {
  334. first = false;
  335. keyHasTlk = hasTlk;
  336. superIndexWidth = f.numParts();
  337. if (keyHasTlk)
  338. --superIndexWidth;
  339. }
  340. else
  341. {
  342. if (hasTlk != keyHasTlk)
  343. throw MakeActivityException(this, 0, "Local/Single part keys cannot be mixed with distributed(tlk) keys in keyedjoin");
  344. if (keyHasTlk && superIndexWidth != f.numParts()-1)
  345. throw MakeActivityException(this, 0, "Super sub keys of different width cannot be mixed with distributed(tlk) keys in keyedjoin");
  346. if (localKey && superIndexWidth != queryClusterWidth())
  347. throw MakeActivityException(this, 0, "Super keys of local index must be same width as target cluster");
  348. }
  349. }
  350. if (keyHasTlk)
  351. totalIndexParts = superIndexWidth * numSuperIndexSubs;
  352. else
  353. totalIndexParts = superIndex->numParts();
  354. }
  355. else
  356. {
  357. totalIndexParts = indexFile->numParts();
  358. if (totalIndexParts)
  359. {
  360. const char *kind = indexFile->queryPart(indexFile->numParts()-1).queryAttributes().queryProp("@kind");
  361. keyHasTlk = NULL != kind && 0 == stricmp("topLevelKey", kind);
  362. if (keyHasTlk)
  363. --totalIndexParts;
  364. }
  365. }
  366. // serialize common (to all slaves) info
  367. initMb.append(totalIndexParts);
  368. if (totalIndexParts)
  369. {
  370. indexMap.map(*this, indexFile, keyHasTlk, getOptBool("allLocalIndexParts"));
  371. initMb.append(numTags);
  372. for (auto &tag: tags)
  373. initMb.append(tag);
  374. initMb.append(remoteKeyedLookup);
  375. initMb.append(remoteKeyedFetch);
  376. initMb.append(superIndexWidth); // 0 if not superIndex
  377. if (localKey && !partitionKey)
  378. keyHasTlk = false; // JCSMORE, not used at least for now
  379. initMb.append(keyHasTlk);
  380. if (keyHasTlk)
  381. {
  382. if (numSuperIndexSubs)
  383. initMb.append(numSuperIndexSubs);
  384. else
  385. initMb.append((unsigned)1);
  386. Owned<IDistributedFileIterator> iter;
  387. IDistributedFile *f;
  388. if (superIndex)
  389. {
  390. iter.setown(superIndex->getSubFileIterator(true));
  391. f = &iter->query();
  392. }
  393. else
  394. f = indexFile;
  395. for (;;)
  396. {
  397. unsigned location;
  398. OwnedIFile iFile;
  399. StringBuffer filePath;
  400. Owned<IFileDescriptor> fileDesc = f->getFileDescriptor();
  401. Owned<IPartDescriptor> tlkDesc = fileDesc->getPart(fileDesc->numParts()-1);
  402. if (!getBestFilePart(this, *tlkDesc, iFile, location, filePath, this))
  403. throw MakeThorException(TE_FileNotFound, "Top level key part does not exist, for key: %s", f->queryLogicalName());
  404. OwnedIFileIO iFileIO = iFile->open(IFOread);
  405. assertex(iFileIO);
  406. size32_t tlkSz = (size32_t)iFileIO->size();
  407. initMb.append(tlkSz);
  408. ::read(iFileIO, 0, tlkSz, initMb);
  409. if (!iter || !iter->next())
  410. break;
  411. f = &iter->query();
  412. }
  413. }
  414. }
  415. else
  416. {
  417. indexFile.clear();
  418. indexFileDesc.clear();
  419. dataFile.clear();
  420. dataFileDesc.clear();
  421. }
  422. }
  423. }
  424. else
  425. initMb.append(totalIndexParts); // 0
  426. if (indexFile)
  427. {
  428. addReadFile(indexFile);
  429. if (dataFile)
  430. addReadFile(dataFile);
  431. }
  432. }
  433. virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
  434. {
  435. dst.append(initMb);
  436. if (totalIndexParts)
  437. {
  438. std::vector<unsigned> &allParts = local ? indexMap.querySlaveParts(slave) : indexMap.queryAllParts();
  439. unsigned numParts = allParts.size();
  440. dst.append(numParts);
  441. if (numParts)
  442. {
  443. indexFileDesc->serializeParts(dst, &allParts[0], numParts);
  444. std::vector<unsigned> &parts = remoteKeyedLookup ? indexMap.querySlaveParts(slave) : allParts;
  445. unsigned numSlaveParts = parts.size();
  446. dst.append(numSlaveParts);
  447. if (numSlaveParts)
  448. dst.append(sizeof(unsigned)*numSlaveParts, &parts[0]);
  449. }
  450. if (remoteKeyedLookup)
  451. indexMap.serializePartMap(dst);
  452. unsigned totalDataParts = dataMap.count();
  453. dst.append(totalDataParts);
  454. if (totalDataParts)
  455. {
  456. std::vector<unsigned> &allParts = dataMap.queryAllParts();
  457. unsigned numParts = allParts.size();
  458. dst.append(numParts);
  459. if (numParts)
  460. {
  461. dataFileDesc->serializeParts(dst, &allParts[0], numParts);
  462. std::vector<unsigned> &parts = remoteKeyedFetch ? dataMap.querySlaveParts(slave) : allParts;
  463. unsigned numSlaveParts = parts.size();
  464. dst.append(numSlaveParts);
  465. if (numSlaveParts)
  466. dst.append(sizeof(unsigned)*numSlaveParts, &parts[0]);
  467. }
  468. if (remoteKeyedFetch)
  469. dataMap.serializePartMap(dst);
  470. }
  471. }
  472. }
  473. virtual void deserializeStats(unsigned node, MemoryBuffer &mb)
  474. {
  475. CMasterActivity::deserializeStats(node, mb);
  476. ForEachItemIn(p, progressInfoArr)
  477. {
  478. unsigned __int64 st;
  479. mb.read(st);
  480. progressInfoArr.item(p).set(node, st);
  481. }
  482. }
  483. virtual void getEdgeStats(IStatisticGatherer & stats, unsigned idx)
  484. {
  485. //This should be an activity stats
  486. CMasterActivity::getEdgeStats(stats, idx);
  487. assertex(0 == idx);
  488. ForEachItemIn(p, progressInfoArr)
  489. {
  490. ProgressInfo &progress = progressInfoArr.item(p);
  491. progress.processInfo();
  492. stats.addStatistic(progressKinds[p], progress.queryTotal());
  493. }
  494. }
  495. };
  496. CActivityBase *createKeyedJoinActivityMaster(CMasterGraphElement *info)
  497. {
  498. if (info->getOptBool("legacykj"))
  499. return LegacyKJ::createKeyedJoinActivityMaster(info);
  500. return new CKeyedJoinMaster(info);
  501. }