slavmain.cpp 96 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include <type_traits>
  15. #include <unordered_map>
  16. #include "jlib.hpp"
  17. #include "jexcept.hpp"
  18. #include "jthread.hpp"
  19. #include "jprop.hpp"
  20. #include "jiter.ipp"
  21. #include "jlzw.hpp"
  22. #include "jflz.hpp"
  23. #include "jhtree.hpp"
  24. #include "mpcomm.hpp"
  25. #include "portlist.h"
  26. #include "rmtfile.hpp"
  27. #include "daclient.hpp"
  28. #include "dafdesc.hpp"
  29. #include "slwatchdog.hpp"
  30. #include "thbuf.hpp"
  31. #include "thmem.hpp"
  32. #include "thexception.hpp"
  33. #include "backup.hpp"
  34. #include "slave.hpp"
  35. #include "thormisc.hpp"
  36. #include "thorport.hpp"
  37. #include "thgraphslave.hpp"
  38. #include "slave.ipp"
  39. #include "thcompressutil.hpp"
  40. #include "environment.hpp"
  41. #include "eclhelper_dyn.hpp"
  42. #include "rtlcommon.hpp"
  43. #include "../activities/keyedjoin/thkeyedjoincommon.hpp"
  44. //---------------------------------------------------------------------------
  45. //---------------------------------------------------------------------------
  46. #define ISDALICLIENT // JCSMORE plugins *can* access dali - though I think we should probably prohibit somehow.
  47. void enableThorSlaveAsDaliClient()
  48. {
  49. #ifdef ISDALICLIENT
  50. PROGLOG("Slave activated as a Dali client");
  51. const char *daliServers = globals->queryProp("@daliServers");
  52. if (!daliServers)
  53. throw MakeStringException(0, "No Dali server list specified");
  54. Owned<IGroup> serverGroup = createIGroup(daliServers, DALI_SERVER_PORT);
  55. unsigned retry = 0;
  56. for (;;)
  57. {
  58. try
  59. {
  60. LOG(MCdebugProgress, thorJob, "calling initClientProcess");
  61. initClientProcess(serverGroup,DCR_ThorSlave, getFixedPort(TPORT_mp));
  62. break;
  63. }
  64. catch (IJSOCK_Exception *e)
  65. {
  66. if ((e->errorCode()!=JSOCKERR_port_in_use))
  67. throw;
  68. FLLOG(MCexception(e), thorJob, e,"InitClientProcess");
  69. if (retry++>10)
  70. throw;
  71. e->Release();
  72. LOG(MCdebugProgress, thorJob, "Retrying");
  73. Sleep(retry*2000);
  74. }
  75. }
  76. #endif
  77. }
  /*
   * Counterpart of enableThorSlaveAsDaliClient().
   * Calls closeEnvironment() first and then closedownClientProcess(), and logs that the
   * slave is no longer a Dali client. Does nothing when ISDALICLIENT is not defined.
   */
  void disableThorSlaveAsDaliClient()
  {
  #ifdef ISDALICLIENT
      closeEnvironment();       // environment is closed before the client process is shut down
      closedownClientProcess(); // dali client closedown
      PROGLOG("Slave deactivated as a Dali client");
  #endif
  }
  86. class CKJService : public CSimpleInterfaceOf<IKJService>, implements IThreaded, implements IExceptionHandler
  87. {
  88. const unsigned defaultMaxCachedKJManagers = 1000;
  89. const unsigned defaultMaxCachedFetchContexts = 1000;
  90. const unsigned defaultKeyLookupMaxProcessThreads = 16;
  91. unsigned uniqueId = 0;
  92. CThreadedPersistent threaded;
  93. mptag_t keyLookupMpTag = TAG_NULL;
  94. bool aborted = false;
  95. unsigned numKMCached = 0;
  96. unsigned numFCCached = 0;
  97. CJobBase *currentJob = nullptr;
  98. unsigned maxCachedKJManagers = defaultMaxCachedKJManagers;
  99. unsigned maxCachedFetchContexts = defaultMaxCachedFetchContexts;
  100. unsigned keyLookupMaxProcessThreads = defaultKeyLookupMaxProcessThreads;
  101. class CLookupKey
  102. {
  103. unsigned hashv = 0;
  104. public:
  105. activity_id id = 0;
  106. StringAttr fname;
  107. unsigned crc = 0;
  108. CLookupKey(MemoryBuffer &mb)
  109. {
  110. mb.read(id);
  111. mb.read(fname);
  112. mb.read(crc);
  113. unsigned h = hashvalue(id, crc);
  114. hashv = hashc((const unsigned char *)&id, sizeof(unsigned), h);
  115. }
  116. unsigned queryHash() const { return hashv; }
  117. const char *queryFilename() const { return fname; }
  118. bool operator==(CLookupKey const &other) const
  119. {
  120. return (id == other.id) && (crc == other.crc) && strsame(fname, other.fname);
  121. }
  122. const char *getTracing(StringBuffer &tracing) const
  123. {
  124. return tracing.append(fname);
  125. }
  126. };
  127. struct CLookupKeyHasher
  128. {
  129. std::size_t operator() (const CLookupKey &key) const
  130. {
  131. return key.queryHash();
  132. }
  133. };
  134. class CFetchKey
  135. {
  136. unsigned hashv = 0;
  137. public:
  138. activity_id id = 0;
  139. unsigned partNo = 0;
  140. CFetchKey(MemoryBuffer &mb)
  141. {
  142. mb.read(id);
  143. mb.read(partNo);
  144. hashv = hashvalue(id, partNo);
  145. }
  146. unsigned queryHash() const { return hashv; }
  147. bool operator==(CFetchKey const &other) const { return id==other.id && partNo==other.partNo; }
  148. const char *getTracing(StringBuffer &tracing) const
  149. {
  150. return tracing.appendf("actId=%u, partNo=%u", id, partNo);
  151. }
  152. };
  153. struct CFetchKeyHasher
  154. {
  155. std::size_t operator() (const CFetchKey &key) const
  156. {
  157. return key.queryHash();
  158. }
  159. };
  160. class CActivityContext : public CInterface
  161. {
  162. CKJService &service;
  163. activity_id id;
  164. Owned<IHThorKeyedJoinArg> helper;
  165. Owned<IOutputRowDeserializer> lookupInputDeserializer;
  166. Owned<IOutputRowSerializer> joinFieldsSerializer;
  167. Owned<IEngineRowAllocator> lookupInputAllocator, joinFieldsAllocator;
  168. Owned<IEngineRowAllocator> fetchInputAllocator;
  169. Owned<IEngineRowAllocator> fetchOutputAllocator;
  170. Owned<IOutputRowDeserializer> fetchInputDeserializer;
  171. Owned<IOutputRowSerializer> fetchOutputSerializer;
  172. Owned<IEngineRowAllocator> fetchDiskAllocator;
  173. Owned<IOutputRowDeserializer> fetchDiskDeserializer;
  174. ICodeContext *codeCtx;
  175. CriticalSection crit;
  176. StringArray fetchFilenames;
  177. IPointerArrayOf<IFileIO> openFetchFiles;
  178. size32_t fetchInMinSz = 0;
  179. bool encrypted = false;
  180. bool compressed = false;
  181. bool messageCompression = false;
  182. public:
  183. CActivityContext(CKJService &_service, activity_id _id, IHThorKeyedJoinArg *_helper, ICodeContext *_codeCtx)
  184. : service(_service), id(_id), helper(_helper), codeCtx(_codeCtx)
  185. {
  186. Owned<IOutputMetaData> lookupInputMeta = new CPrefixedOutputMeta(sizeof(KeyLookupHeader), helper->queryIndexReadInputRecordSize());
  187. lookupInputDeserializer.setown(lookupInputMeta->createDiskDeserializer(codeCtx, id));
  188. lookupInputAllocator.setown(codeCtx->getRowAllocatorEx(lookupInputMeta,id, (roxiemem::RoxieHeapFlags)roxiemem::RHFpacked|roxiemem::RHFunique));
  189. joinFieldsAllocator.setown(codeCtx->getRowAllocatorEx(helper->queryJoinFieldsRecordSize(), id, roxiemem::RHFnone));
  190. joinFieldsSerializer.setown(helper->queryJoinFieldsRecordSize()->createDiskSerializer(codeCtx, id));
  191. if (helper->diskAccessRequired())
  192. {
  193. Owned<IOutputMetaData> fetchInputMeta = new CPrefixedOutputMeta(sizeof(FetchRequestHeader), helper->queryFetchInputRecordSize());
  194. fetchInputAllocator.setown(codeCtx->getRowAllocatorEx(fetchInputMeta, id, (roxiemem::RoxieHeapFlags)roxiemem::RHFpacked|roxiemem::RHFunique));
  195. fetchInputDeserializer.setown(fetchInputMeta->createDiskDeserializer(codeCtx, id));
  196. Owned<IOutputMetaData> fetchOutputMeta = createOutputMetaDataWithChildRow(joinFieldsAllocator, sizeof(FetchReplyHeader));
  197. fetchOutputAllocator.setown(codeCtx->getRowAllocatorEx(fetchOutputMeta, id, (roxiemem::RoxieHeapFlags)roxiemem::RHFpacked|roxiemem::RHFunique));
  198. fetchOutputSerializer.setown(fetchOutputMeta->createDiskSerializer(codeCtx, id));
  199. fetchDiskAllocator.setown(codeCtx->getRowAllocatorEx(helper->queryDiskRecordSize(), id, (roxiemem::RoxieHeapFlags)roxiemem::RHFpacked|roxiemem::RHFunique));
  200. fetchDiskDeserializer.setown(helper->queryDiskRecordSize()->createDiskDeserializer(codeCtx, id));
  201. fetchInMinSz = helper->queryFetchInputRecordSize()->getMinRecordSize();
  202. }
  203. }
  204. ~CActivityContext()
  205. {
  206. // should already be removed by last Key or Fetch context
  207. service.removeActivityContext(this);
  208. }
  209. activity_id queryId() const { return id; }
  210. IEngineRowAllocator *queryLookupInputAllocator() const { return lookupInputAllocator; }
  211. IOutputRowDeserializer *queryLookupInputDeserializer() const { return lookupInputDeserializer; }
  212. IEngineRowAllocator *queryJoinFieldsAllocator() const { return joinFieldsAllocator; }
  213. IOutputRowSerializer *queryJoinFieldsSerializer() const { return joinFieldsSerializer; }
  214. IEngineRowAllocator *queryFetchInputAllocator() const { return fetchInputAllocator; }
  215. IOutputRowDeserializer *queryFetchInputDeserializer() const { return fetchInputDeserializer; }
  216. IEngineRowAllocator *queryFetchOutputAllocator() const { return fetchOutputAllocator; }
  217. IOutputRowSerializer *queryFetchOutputSerializer() const { return fetchOutputSerializer; }
  218. IEngineRowAllocator *queryFetchDiskAllocator() const { return fetchDiskAllocator; }
  219. IOutputRowDeserializer *queryFetchDiskDeserializer() const { return fetchDiskDeserializer; }
  220. inline IHThorKeyedJoinArg *queryHelper() const { return helper; }
  221. void addFetchFile(byte _flags, unsigned _partNo, const char *_fname)
  222. {
  223. CriticalBlock b(crit);
  224. if (_partNo<fetchFilenames.ordinality() && !isEmptyString(fetchFilenames.item(_partNo)))
  225. return;
  226. while (_partNo>=fetchFilenames.ordinality())
  227. fetchFilenames.append("");
  228. fetchFilenames.replace(_fname, _partNo);
  229. compressed = _flags & kjf_compressed;
  230. encrypted = _flags & kjf_encrypted;
  231. }
  232. void setMessageCompression(bool _messageCompression) { messageCompression = _messageCompression; }
  233. inline bool useMessageCompression() const { return messageCompression; }
  234. IFileIO *getFetchFileIO(unsigned part)
  235. {
  236. CriticalBlock b(crit);
  237. if (part>=openFetchFiles.ordinality())
  238. {
  239. do
  240. {
  241. openFetchFiles.append(nullptr);
  242. }
  243. while (part>=openFetchFiles.ordinality());
  244. }
  245. else
  246. {
  247. IFileIO *fileIO = openFetchFiles.item(part);
  248. if (fileIO)
  249. return LINK(fileIO);
  250. }
  251. const char *fname = fetchFilenames.item(part);
  252. Owned<IFile> iFile = createIFile(fname);
  253. unsigned encryptedKeyLen;
  254. void *encryptedKey;
  255. helper->getFileEncryptKey(encryptedKeyLen,encryptedKey);
  256. Owned<IExpander> eexp;
  257. if (0 != encryptedKeyLen)
  258. {
  259. if (encrypted)
  260. eexp.setown(createAESExpander256(encryptedKeyLen, encryptedKey));
  261. memset(encryptedKey, 0, encryptedKeyLen);
  262. free(encryptedKey);
  263. }
  264. IFileIO *fileIO;
  265. if (nullptr != eexp.get())
  266. fileIO = createCompressedFileReader(iFile, eexp);
  267. else if (compressed)
  268. fileIO = createCompressedFileReader(iFile);
  269. else
  270. fileIO = iFile->open(IFOread);
  271. if (!fileIO)
  272. throw MakeStringException(0, "Failed to open fetch file part %u: %s", part, fname);
  273. openFetchFiles.replace(fileIO, part);
  274. return LINK(fileIO);
  275. }
  276. size32_t queryFetchInMinSize() const { return fetchInMinSz; }
  277. };
  278. class CContext : public CInterface
  279. {
  280. protected:
  281. CKJService &service;
  282. Linked<CActivityContext> activityCtx;
  283. RecordTranslationMode translationMode = RecordTranslationMode::None;
  284. Owned<IOutputMetaData> publishedFormat, projectedFormat, expectedFormat;
  285. unsigned publishedFormatCrc = 0, expectedFormatCrc = 0;
  286. Owned<const IDynamicTransform> translator;
  287. Owned<ISourceRowPrefetcher> prefetcher;
  288. public:
  289. CContext(CKJService &_service, CActivityContext *_activityCtx) : service(_service), activityCtx(_activityCtx)
  290. {
  291. }
  292. virtual void beforeDispose() override
  293. {
  294. service.freeActivityContext(activityCtx.getClear());
  295. }
  296. CActivityContext *queryActivityCtx() const { return activityCtx; }
  297. void setTranslation(RecordTranslationMode _translationMode, IOutputMetaData *_publishedFormat, unsigned _publishedFormatCrc, IOutputMetaData *_projectedFormat)
  298. {
  299. dbgassertex(expectedFormatCrc); // translation mode wouldn't have been set unless available
  300. translationMode = _translationMode;
  301. publishedFormat.set(_publishedFormat);
  302. publishedFormatCrc = _publishedFormatCrc;
  303. projectedFormat.set(_projectedFormat);
  304. }
  305. const IDynamicTransform *queryTranslator(const char *tracing)
  306. {
  307. if (RecordTranslationMode::None == translationMode)
  308. {
  309. //Check if the file requires translation, but translation is disabled
  310. if (publishedFormatCrc && expectedFormatCrc && (publishedFormatCrc != expectedFormatCrc))
  311. throwTranslationError(publishedFormat->queryRecordAccessor(true), expectedFormat->queryRecordAccessor(true), tracing);
  312. return nullptr;
  313. }
  314. else if (!translator)
  315. {
  316. if (RecordTranslationMode::AlwaysDisk == translationMode)
  317. translator.setown(createRecordTranslator(projectedFormat->queryRecordAccessor(true), publishedFormat->queryRecordAccessor(true)));
  318. else if (RecordTranslationMode::AlwaysECL == translationMode)
  319. {
  320. if (publishedFormatCrc && publishedFormatCrc != expectedFormatCrc)
  321. DBGLOG("Overriding stored record layout reading file %s", tracing);
  322. translator.setown(createRecordTranslator(projectedFormat->queryRecordAccessor(true), expectedFormat->queryRecordAccessor(true)));
  323. }
  324. else if (publishedFormatCrc && publishedFormatCrc != expectedFormatCrc)
  325. {
  326. if (!projectedFormat)
  327. throw MakeStringException(0, "Record layout mismatch for: %s", tracing);
  328. translator.setown(createRecordTranslator(projectedFormat->queryRecordAccessor(true), publishedFormat->queryRecordAccessor(true)));
  329. if (!translator->canTranslate())
  330. throw MakeStringException(0, "Untranslatable record layout mismatch detected for: %s", tracing);
  331. }
  332. DBGLOG("Record layout translator created for %s", tracing);
  333. translator->describe();
  334. dbgassertex(translator->canTranslate());
  335. }
  336. return translator;
  337. }
  338. ISourceRowPrefetcher *queryPrefetcher()
  339. {
  340. if (!prefetcher)
  341. {
  342. if (translator)
  343. prefetcher.setown(publishedFormat->createDiskPrefetcher());
  344. else
  345. prefetcher.setown(expectedFormat->createDiskPrefetcher());
  346. }
  347. return prefetcher;
  348. }
  349. RecordTranslationMode queryTranslationMode() const { return translationMode; }
  350. IOutputMetaData *queryPublishedMeta() const { return publishedFormat; }
  351. unsigned queryPublishedMetaCrc() const { return publishedFormatCrc; }
  352. IOutputMetaData *queryProjectedMeta() const { return projectedFormat; }
  353. };
  354. class CKeyLookupContext : public CContext
  355. {
  356. CLookupKey key;
  357. Owned<IKeyIndex> keyIndex;
  358. public:
  359. CKeyLookupContext(CKJService &_service, CActivityContext *_activityCtx, const CLookupKey &_key)
  360. : CContext(_service, _activityCtx), key(_key)
  361. {
  362. keyIndex.setown(createKeyIndex(key.fname, key.crc, false));
  363. expectedFormat.set(activityCtx->queryHelper()->queryIndexRecordSize());
  364. expectedFormatCrc = activityCtx->queryHelper()->getIndexFormatCrc();
  365. }
  366. unsigned queryHash() const { return key.queryHash(); }
  367. const CLookupKey &queryKey() const { return key; }
  368. inline const char *queryFileName() const { return key.fname; }
  369. IEngineRowAllocator *queryLookupInputAllocator() const { return activityCtx->queryLookupInputAllocator(); }
  370. IOutputRowDeserializer *queryLookupInputDeserializer() const { return activityCtx->queryLookupInputDeserializer(); }
  371. IEngineRowAllocator *queryJoinFieldsAllocator() const { return activityCtx->queryJoinFieldsAllocator(); }
  372. IOutputRowSerializer *queryJoinFieldsSerializer() const { return activityCtx->queryJoinFieldsSerializer(); }
  373. IEngineRowAllocator *queryFetchInputAllocator() const { return activityCtx->queryFetchInputAllocator(); }
  374. IOutputRowDeserializer *queryFetchInputDeserializer() const { return activityCtx->queryFetchInputDeserializer(); }
  375. IEngineRowAllocator *queryFetchOutputAllocator() const { return activityCtx->queryFetchOutputAllocator(); }
  376. IOutputRowSerializer *queryFetchOutputSerializer() const { return activityCtx->queryFetchOutputSerializer(); }
  377. IEngineRowAllocator *queryFetchDiskAllocator() const { return activityCtx->queryFetchDiskAllocator(); }
  378. IOutputRowDeserializer *queryFetchDiskDeserializer() const { return activityCtx->queryFetchDiskDeserializer(); }
  379. IKeyManager *createKeyManager()
  380. {
  381. return createLocalKeyManager(queryHelper()->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, nullptr, queryHelper()->hasNewSegmentMonitors(), false);
  382. }
  383. inline IHThorKeyedJoinArg *queryHelper() const { return activityCtx->queryHelper(); }
  384. };
  385. class CFetchContext : public CContext
  386. {
  387. CFetchKey key;
  388. unsigned handle = 0;
  389. Owned<const IDynamicTransform> translator;
  390. Owned<ISourceRowPrefetcher> prefetcher;
  391. Owned<ISerialStream> ioStream;
  392. CThorContiguousRowBuffer prefetchSource;
  393. bool initialized = false;
  394. public:
  395. CFetchContext(CKJService &_service, CActivityContext *_activityCtx, const CFetchKey &_key) : CContext(_service, _activityCtx), key(_key)
  396. {
  397. handle = service.getUniqId();
  398. expectedFormat.set(activityCtx->queryHelper()->queryDiskRecordSize());
  399. expectedFormatCrc = activityCtx->queryHelper()->getDiskFormatCrc();
  400. }
  401. unsigned queryHandle() const { return handle; }
  402. const CFetchKey &queryKey() const { return key; }
  403. CThorContiguousRowBuffer &queryPrefetchSource()
  404. {
  405. if (!initialized)
  406. {
  407. initialized = true;
  408. Owned<IFileIO> iFileIO = activityCtx->getFetchFileIO(key.partNo);
  409. ioStream.setown(createFileSerialStream(iFileIO, 0, (offset_t)-1, 0));
  410. prefetchSource.setStream(ioStream);
  411. }
  412. return prefetchSource;
  413. }
  414. };
  415. class CKMContainer : public CInterface
  416. {
  417. CKJService &service;
  418. Linked<CKeyLookupContext> ctx;
  419. Owned<IKeyManager> keyManager;
  420. unsigned handle = 0;
  421. Owned<IHThorKeyedJoinArg> helper;
  422. public:
  423. CKMContainer(CKJService &_service, CKeyLookupContext *_ctx)
  424. : service(_service), ctx(_ctx)
  425. {
  426. keyManager.setown(ctx->createKeyManager());
  427. StringBuffer tracing;
  428. const IDynamicTransform *translator = ctx->queryTranslator(ctx->queryKey().getTracing(tracing));
  429. if (translator)
  430. keyManager->setLayoutTranslator(translator);
  431. handle = service.getUniqId();
  432. helper.set(ctx->queryHelper());
  433. }
  434. ~CKMContainer()
  435. {
  436. service.freeLookupContext(ctx.getClear());
  437. }
  438. CKeyLookupContext &queryCtx() const { return *ctx; }
  439. IKeyManager *queryKeyManager() const { return keyManager; }
  440. unsigned queryHandle() const { return handle; }
  441. void setContexts(MemoryBuffer &parentCtxMb, MemoryBuffer &startCtxMb, MemoryBuffer &createCtxMb)
  442. {
  443. // Only create a new helper, if either parent or start are present, in which case onStart evaluation may vary.
  444. if (parentCtxMb.length() || startCtxMb.length())
  445. helper.setown(service.createHelper(*service.currentJob, ctx->queryKey().id, createCtxMb));
  446. helper->onStart((const byte *)parentCtxMb.toByteArray(), startCtxMb.length() ? &startCtxMb : nullptr);
  447. }
  448. inline IHThorKeyedJoinArg *queryHelper() const { return helper; }
  449. };
  450. template<class KEY, class ITEM>
  451. class CKeyedCacheEntry : public CInterface
  452. {
  453. KEY key;
  454. CIArrayOf<ITEM> items;
  455. public:
  456. CKeyedCacheEntry(const KEY &_key) : key(_key)
  457. {
  458. }
  459. inline ITEM *pop()
  460. {
  461. return &items.popGet();
  462. }
  463. inline void push(ITEM *kmc)
  464. {
  465. items.append(*kmc);
  466. }
  467. inline unsigned count() { return items.ordinality(); }
  468. bool remove(ITEM *kmc)
  469. {
  470. return items.zap(*kmc);
  471. }
  472. unsigned queryHash() const { return key.queryHash(); }
  473. const KEY &queryKey() const { return key; }
  474. };
  475. typedef CKeyedCacheEntry<CLookupKey, CKMContainer> CKMKeyEntry;
  476. typedef CKeyedCacheEntry<CFetchKey, CFetchContext> CFCKeyEntry;
  477. class CLookupRequest : public CSimpleInterface
  478. {
  479. protected:
  480. Linked<CActivityContext> activityCtx;
  481. std::vector<const void *> rows;
  482. rank_t sender;
  483. mptag_t replyTag;
  484. bool replyAttempt = false;
  485. IEngineRowAllocator *allocator = nullptr;
  486. IOutputRowDeserializer *deserializer = nullptr;
  487. public:
  488. CLookupRequest(CActivityContext *_activityCtx, rank_t _sender, mptag_t _replyTag)
  489. : activityCtx(_activityCtx), sender(_sender), replyTag(_replyTag)
  490. {
  491. }
  492. ~CLookupRequest()
  493. {
  494. for (auto &r : rows)
  495. ReleaseThorRow(r);
  496. }
  497. inline void addRow(const void *row)
  498. {
  499. rows.push_back(row);
  500. }
  501. inline const void *getRowClear(unsigned r)
  502. {
  503. const void *row = rows[r];
  504. rows[r] = nullptr;
  505. return row;
  506. }
  507. inline unsigned getRowCount() const { return rows.size(); }
  508. inline CActivityContext &queryCtx() const { return *activityCtx; }
  509. void deserialize(size32_t sz, const void *_requestData)
  510. {
  511. MemoryBuffer requestData;
  512. if (activityCtx->useMessageCompression())
  513. fastLZDecompressToBuffer(requestData, _requestData);
  514. else
  515. requestData.setBuffer(sz, (void *)_requestData, false);
  516. unsigned count;
  517. requestData.read(count);
  518. size32_t rowDataSz = requestData.remaining();
  519. CThorStreamDeserializerSource d(rowDataSz, requestData.readDirect(rowDataSz));
  520. for (unsigned r=0; r<count; r++)
  521. {
  522. assertex(!d.eos());
  523. RtlDynamicRowBuilder rowBuilder(allocator);
  524. size32_t sz = deserializer->deserialize(rowBuilder, d);
  525. addRow(rowBuilder.finalizeRowClear(sz));
  526. }
  527. }
  528. void reply(CMessageBuffer &msg)
  529. {
  530. replyAttempt = true;
  531. if (!queryNodeComm().send(msg, sender, replyTag, LONGTIMEOUT))
  532. throw MakeStringException(0, "Failed to reply to lookup request");
  533. }
  534. void replyError(IException *e)
  535. {
  536. EXCLOG(e, "CLookupRequest");
  537. if (replyAttempt)
  538. return;
  539. byte errorCode = kjse_exception;
  540. CMessageBuffer msg;
  541. msg.append(errorCode);
  542. serializeException(e, msg);
  543. if (!queryNodeComm().send(msg, sender, replyTag, LONGTIMEOUT))
  544. throw MakeStringException(0, "Failed to reply to lookup request");
  545. }
  /**
   * Process the deserialized request rows and send the reply packet(s) to the requester.
   * Implemented by CKeyLookupRequest and CFetchLookupRequest.
   * @param abortSoon  set to true by CRemoteLookupProcessor::stop(); implementations poll it between rows
   */
  virtual void process(bool &abortSoon) = 0;
  547. };
  548. class CLookupResult
  549. {
  550. protected:
  551. CActivityContext &activityCtx;
  552. std::vector<const void *> rows;
  553. IOutputRowSerializer *serializer = nullptr;
  554. void clearRows()
  555. {
  556. for (auto &r : rows)
  557. ReleaseThorRow(r);
  558. rows.clear();
  559. }
  560. public:
  561. CLookupResult(CActivityContext &_activityCtx) : activityCtx(_activityCtx)
  562. {
  563. }
  564. ~CLookupResult()
  565. {
  566. clearRows();
  567. }
  568. void serializeRows(MemoryBuffer &mb) const
  569. {
  570. if (rows.size()) // will be 0 if fetch needed
  571. {
  572. DelayedSizeMarker sizeMark(mb);
  573. CMemoryRowSerializer s(mb);
  574. for (auto &row : rows)
  575. serializer->serialize(s, (const byte *)row);
  576. sizeMark.write();
  577. }
  578. }
  579. };
  580. class CKeyLookupResult : public CLookupResult
  581. {
  582. typedef CLookupResult PARENT;
  583. std::vector<unsigned __int64> fposs;
  584. GroupFlags groupFlag = gf_null;
  585. public:
  586. CKeyLookupResult(CActivityContext &_activityCtx) : PARENT(_activityCtx)
  587. {
  588. serializer = activityCtx.queryJoinFieldsSerializer();
  589. }
  590. void addRow(const void *row, offset_t fpos)
  591. {
  592. if (row)
  593. rows.push_back(row);
  594. fposs.push_back(fpos);
  595. }
  596. void clear()
  597. {
  598. groupFlag = gf_null;
  599. clearRows();
  600. fposs.clear();
  601. }
  602. inline unsigned getCount() const { return fposs.size(); }
  603. inline GroupFlags queryFlag() const { return groupFlag; }
  604. void setFlag(GroupFlags gf)
  605. {
  606. clear();
  607. groupFlag = gf;
  608. }
  609. void serialize(MemoryBuffer &mb) const
  610. {
  611. mb.append(groupFlag);
  612. if (gf_null != groupFlag)
  613. return;
  614. unsigned candidates = fposs.size();
  615. mb.append(candidates);
  616. if (candidates)
  617. {
  618. serializeRows(mb);
  619. // JCSMORE - even in half-keyed join case, fpos' may be used by transform (would be good to have tip from codegen to say if used or not)
  620. mb.append(candidates * sizeof(unsigned __int64), &fposs[0]);
  621. }
  622. }
  623. };
  624. class CKeyLookupRequest : public CLookupRequest
  625. {
  626. IHThorKeyedJoinArg *helper = nullptr;
  627. Linked<CKMContainer> kmc;
  628. rowcount_t abortLimit = 0;
  629. rowcount_t atMost = 0;
  630. bool fetchRequired = false;
  631. IEngineRowAllocator *joinFieldsAllocator = nullptr;
  632. template <class HeaderStruct>
  633. void getHeaderFromRow(const void *row, HeaderStruct &header)
  634. {
  635. memcpy(&header, row, sizeof(HeaderStruct));
  636. }
  637. void processRow(const void *row, IKeyManager *keyManager, CKeyLookupResult &reply)
  638. {
  639. KeyLookupHeader lookupKeyHeader;
  640. getHeaderFromRow(row, lookupKeyHeader);
  641. const void *keyedFieldsRow = (byte *)row + sizeof(KeyLookupHeader);
  642. helper->createSegmentMonitors(keyManager, keyedFieldsRow);
  643. keyManager->finishSegmentMonitors();
  644. keyManager->reset();
  645. unsigned candidates = 0;
  646. // NB: keepLimit is not on hard matches and can only be applied later, since other filtering (e.g. in transform) may keep below keepLimit
  647. while (keyManager->lookup(true))
  648. {
  649. ++candidates;
  650. if (candidates > abortLimit)
  651. {
  652. reply.setFlag(gf_limitabort);
  653. break;
  654. }
  655. else if (candidates > atMost) // atMost - filter out group if > max hard matches
  656. {
  657. reply.setFlag(gf_limitatmost);
  658. break;
  659. }
  660. KLBlobProviderAdapter adapter(keyManager);
  661. byte const * keyRow = keyManager->queryKeyBuffer();
  662. size_t fposOffset = keyManager->queryRowSize() - sizeof(offset_t);
  663. offset_t fpos = rtlReadBigUInt8(keyRow + fposOffset);
  664. if (helper->indexReadMatch(keyedFieldsRow, keyRow, &adapter))
  665. {
  666. if (fetchRequired)
  667. reply.addRow(nullptr, fpos);
  668. else
  669. {
  670. RtlDynamicRowBuilder joinFieldsRowBuilder(joinFieldsAllocator);
  671. size32_t sz = helper->extractJoinFields(joinFieldsRowBuilder, keyRow, &adapter);
  672. /* NB: Each row lookup could in theory == lots of keyed results. If needed to break into smaller replies
  673. * Would have to create/keep a keyManager per sender, in those circumstances.
  674. * As it stands, each lookup will be processed and all rows (below limits) will be returned, but I think that's okay.
  675. * There are other reasons why might want a keyManager per sender, e.g. for concurrency.
  676. */
  677. reply.addRow(joinFieldsRowBuilder.finalizeRowClear(sz), fpos);
  678. }
  679. }
  680. }
  681. keyManager->releaseSegmentMonitors();
  682. }
  683. const unsigned DEFAULT_KEYLOOKUP_MAXREPLYSZ = 0x100000;
  684. public:
  685. CKeyLookupRequest(CKJService &_service, CKeyLookupContext *_ctx, CKMContainer *_kmc, rank_t _sender, mptag_t _replyTag)
  686. : CLookupRequest(_ctx->queryActivityCtx(), _sender, _replyTag), kmc(_kmc)
  687. {
  688. helper = kmc->queryHelper();
  689. allocator = activityCtx->queryLookupInputAllocator();
  690. deserializer = activityCtx->queryLookupInputDeserializer();
  691. joinFieldsAllocator = activityCtx->queryJoinFieldsAllocator();
  692. atMost = helper->getJoinLimit();
  693. if (atMost == 0)
  694. atMost = (unsigned)-1;
  695. abortLimit = helper->getMatchAbortLimit();
  696. if (abortLimit == 0)
  697. abortLimit = (unsigned)-1;
  698. if (abortLimit < atMost)
  699. atMost = abortLimit;
  700. fetchRequired = helper->diskAccessRequired();
  701. }
  702. virtual void process(bool &abortSoon) override
  703. {
  704. Owned<IException> exception;
  705. try
  706. {
  707. CKeyLookupResult lookupResult(*activityCtx); // reply for 1 request row
  708. byte errorCode = kjse_nop;
  709. CMessageBuffer replyMsg;
  710. replyMsg.append(errorCode);
  711. unsigned startPos = replyMsg.length();
  712. MemoryBuffer tmpMB;
  713. MemoryBuffer &replyMb = activityCtx->useMessageCompression() ? tmpMB : replyMsg;
  714. replyMb.append(kmc->queryHandle()); // NB: not resent if multiple packets, see below
  715. DelayedMarker<unsigned> countMarker(replyMb);
  716. unsigned rowCount = getRowCount();
  717. unsigned rowNum = 0;
  718. unsigned rowStart = 0;
  719. unsigned __int64 startSeeks = kmc->queryKeyManager()->querySeeks();
  720. unsigned __int64 startScans = kmc->queryKeyManager()->queryScans();
  721. unsigned __int64 startWildSeeks = kmc->queryKeyManager()->queryWildSeeks();
  722. while (!abortSoon)
  723. {
  724. OwnedConstThorRow row = getRowClear(rowNum++);
  725. processRow(row, kmc->queryKeyManager(), lookupResult);
  726. lookupResult.serialize(replyMb);
  727. bool last = rowNum == rowCount;
  728. if (last || (replyMb.length() >= DEFAULT_KEYLOOKUP_MAXREPLYSZ))
  729. {
  730. countMarker.write(rowNum-rowStart);
  731. replyMb.append(kmc->queryKeyManager()->querySeeks()-startSeeks);
  732. replyMb.append(kmc->queryKeyManager()->queryScans()-startScans);
  733. replyMb.append(kmc->queryKeyManager()->queryWildSeeks()-startWildSeeks);
  734. if (activityCtx->useMessageCompression())
  735. {
  736. fastLZCompressToBuffer(replyMsg, tmpMB.length(), tmpMB.toByteArray());
  737. tmpMB.clear();
  738. }
  739. reply(replyMsg);
  740. if (last)
  741. break;
  742. replyMsg.setLength(startPos);
  743. countMarker.restart();
  744. // NB: handle not resent, 1st packet was { errorCode, handle, key-row-count, key-row-data.. }, subsequent packets are { errorCode, key-row-count, key-row-data.. }
  745. rowStart = rowNum;
  746. }
  747. lookupResult.clear();
  748. }
  749. }
  750. catch (IException *e)
  751. {
  752. exception.setown(e);
  753. }
  754. if (exception)
  755. replyError(exception);
  756. }
  757. };
  758. class CFetchLookupResult : public CLookupResult
  759. {
  760. typedef CLookupResult PARENT;
  761. unsigned accepted = 0;
  762. unsigned rejected = 0;
  763. public:
  764. CFetchLookupResult(CActivityContext &_activityCtx) : PARENT(_activityCtx)
  765. {
  766. serializer = activityCtx.queryFetchOutputSerializer();
  767. }
  768. inline void incAccepted() { ++accepted; }
  769. inline void incRejected() { ++rejected; }
  770. void addRow(const void *row)
  771. {
  772. rows.push_back(row);
  773. }
  774. void clear()
  775. {
  776. clearRows();
  777. }
  778. void serialize(MemoryBuffer &mb) const
  779. {
  780. unsigned numRows = rows.size();
  781. mb.append(numRows);
  782. if (numRows)
  783. serializeRows(mb);
  784. mb.append(accepted);
  785. mb.append(rejected);
  786. }
  787. };
  788. class CFetchLookupRequest : public CLookupRequest
  789. {
  790. IHThorKeyedJoinArg *helper = nullptr;
  791. Linked<CFetchContext> fetchContext;
  792. const unsigned defaultMaxFetchLookupReplySz = 0x100000;
  793. const IDynamicTransform *translator = nullptr;
  794. ISourceRowPrefetcher *prefetcher = nullptr;
  795. CThorContiguousRowBuffer &prefetchSource;
  796. void processRow(const void *row, CFetchLookupResult &reply)
  797. {
  798. FetchRequestHeader &requestHeader = *(FetchRequestHeader *)row;
  799. const void *fetchKey = nullptr;
  800. if (0 != activityCtx->queryFetchInMinSize())
  801. fetchKey = (const byte *)row + sizeof(FetchRequestHeader);
  802. prefetchSource.reset(requestHeader.fpos);
  803. prefetcher->readAhead(prefetchSource);
  804. const byte *diskFetchRow = prefetchSource.queryRow();
  805. RtlDynamicRowBuilder fetchReplyBuilder(activityCtx->queryFetchOutputAllocator());
  806. FetchReplyHeader &replyHeader = *(FetchReplyHeader *)fetchReplyBuilder.getUnfinalized();
  807. replyHeader.sequence = requestHeader.sequence;
  808. const void * &childRow = *(const void **)((byte *)fetchReplyBuilder.getUnfinalized() + sizeof(FetchReplyHeader));
  809. MemoryBuffer diskFetchRowMb;
  810. if (translator)
  811. {
  812. MemoryBufferBuilder aBuilder(diskFetchRowMb, 0);
  813. LocalVirtualFieldCallback fieldCallback("<MORE>", requestHeader.fpos, 0);
  814. translator->translate(aBuilder, fieldCallback, diskFetchRow);
  815. diskFetchRow = aBuilder.getSelf();
  816. }
  817. size32_t fetchReplySz = sizeof(FetchReplyHeader);
  818. if (helper->fetchMatch(fetchKey, diskFetchRow))
  819. {
  820. replyHeader.sequence |= FetchReplyHeader::fetchMatchedMask;
  821. RtlDynamicRowBuilder joinFieldsRow(activityCtx->queryJoinFieldsAllocator());
  822. size32_t joinFieldsSz = helper->extractJoinFields(joinFieldsRow, diskFetchRow, (IBlobProvider*)nullptr); // JCSMORE is it right that passing NULL IBlobProvider here??
  823. fetchReplySz += joinFieldsSz;
  824. childRow = joinFieldsRow.finalizeRowClear(joinFieldsSz);
  825. reply.incAccepted();
  826. }
  827. else
  828. {
  829. childRow = nullptr;
  830. reply.incRejected();
  831. }
  832. reply.addRow(fetchReplyBuilder.finalizeRowClear(fetchReplySz));
  833. }
  834. public:
  835. CFetchLookupRequest(CKJService &_service, CFetchContext *_fetchContext, rank_t _sender, mptag_t _replyTag)
  836. : CLookupRequest(_fetchContext->queryActivityCtx(), _sender, _replyTag),
  837. fetchContext(_fetchContext), prefetchSource(fetchContext->queryPrefetchSource())
  838. {
  839. allocator = activityCtx->queryFetchInputAllocator();
  840. deserializer = activityCtx->queryFetchInputDeserializer();
  841. StringBuffer tracing;
  842. translator = fetchContext->queryTranslator(fetchContext->queryKey().getTracing(tracing));
  843. prefetcher = fetchContext->queryPrefetcher();
  844. helper = queryCtx().queryHelper();
  845. }
  846. virtual void process(bool &abortSoon) override
  847. {
  848. Owned<IException> exception;
  849. try
  850. {
  851. CFetchLookupResult fetchLookupResult(*activityCtx);
  852. byte errorCode = kjse_nop;
  853. CMessageBuffer replyMsg;
  854. replyMsg.append(errorCode);
  855. unsigned startPos = replyMsg.length();
  856. MemoryBuffer tmpMB;
  857. MemoryBuffer &replyMb = activityCtx->useMessageCompression() ? tmpMB : replyMsg;
  858. replyMb.append(fetchContext->queryHandle()); // NB: not resent if multiple packets, see below
  859. unsigned rowCount = getRowCount();
  860. unsigned rowNum = 0;
  861. // JCSMORE sorting batch of requests by fpos could reduce seeking...
  862. while (!abortSoon)
  863. {
  864. OwnedConstThorRow row = getRowClear(rowNum++);
  865. processRow(row, fetchLookupResult);
  866. bool last = rowNum == rowCount;
  867. if (last || (replyMb.length() >= defaultMaxFetchLookupReplySz))
  868. {
  869. fetchLookupResult.serialize(replyMb);
  870. if (activityCtx->useMessageCompression())
  871. {
  872. fastLZCompressToBuffer(replyMsg, tmpMB.length(), tmpMB.toByteArray());
  873. tmpMB.clear();
  874. }
  875. reply(replyMsg);
  876. if (last)
  877. break;
  878. replyMsg.setLength(startPos);
  879. // NB: handle not resent, 1st packet was { errorCode, handle, fetch-row-count, fetch-row-data.. }, subsequent packets are { errorCode, fetch-row-count, fetch-row-data.. }
  880. fetchLookupResult.clear();
  881. }
  882. }
  883. }
  884. catch (IException *e)
  885. {
  886. exception.setown(e);
  887. }
  888. if (exception)
  889. replyError(exception);
  890. }
  891. };
  892. class CRemoteLookupProcessor : public CSimpleInterfaceOf<IPooledThread>
  893. {
  894. Owned<CLookupRequest> lookupRequest;
  895. bool abortSoon = false;
  896. public:
  897. CRemoteLookupProcessor(CKJService &_service)
  898. {
  899. }
  900. // IPooledThread impl.
  901. virtual void init(void *param) override
  902. {
  903. abortSoon = false;
  904. lookupRequest.set((CLookupRequest *)param);
  905. }
  906. virtual bool stop() override
  907. {
  908. abortSoon = true; return true;
  909. }
  910. virtual bool canReuse() const override { return true; }
  911. virtual void threadmain() override
  912. {
  913. Owned<CLookupRequest> request = lookupRequest.getClear();
  914. request->process(abortSoon);
  915. }
  916. };
  917. class CProcessorFactory : public CSimpleInterfaceOf<IThreadFactory>
  918. {
  919. CKJService &service;
  920. public:
  921. CProcessorFactory(CKJService &_service) : service(_service)
  922. {
  923. }
  924. // IThreadFactory
  925. virtual IPooledThread *createNew() override
  926. {
  927. return service.createProcessor();
  928. }
  929. };
  // activity id -> activity context. Does not link/own (see ensureActivityContext). Guarded by lCCrit.
  std::unordered_map<unsigned, CActivityContext *> activityContextsHT;
  // lookup key -> key lookup context. Does not link/own (see ensureKeyLookupContext). Guarded by lCCrit.
  std::unordered_map<CLookupKey, CKeyLookupContext *, CLookupKeyHasher> keyLookupContextsHT;
  // lookup key -> idle (cached) key manager containers for that key. Guarded by kMCrit.
  std::unordered_map<CLookupKey, Owned<CKMKeyEntry>, CLookupKeyHasher> cachedKMs;
  // fetch key -> idle (cached) fetch contexts for that key. Guarded by lCCrit.
  std::unordered_map<CFetchKey, Owned<CFCKeyEntry>, CFetchKeyHasher> cachedFetchContexts;
  // handle -> in-use key manager container (owned), as sent to clients for kjs_keyread/kjs_keyclose
  std::unordered_map<unsigned, Owned<CKMContainer>> activeKManagersByHandle;
  // handle -> in-use fetch context (owned), as sent to clients for kjs_fetchread/kjs_fetchclose
  std::unordered_map<unsigned, Owned<CFetchContext>> activeFetchContextsByHandle;
  // cached key managers in the order they were cached; item(0) is the oldest and is evicted first
  CICopyArrayOf<CKMContainer> cachedKMsMRU;
  // cached fetch contexts in the order they were cached; item(0) is the oldest and is evicted first
  CICopyArrayOf<CFetchContext> cachedFCsMRU;
  CriticalSection kMCrit, lCCrit;
  // worker threads (CRemoteLookupProcessor) that process lookup/fetch requests
  Owned<IThreadPool> processorPool;
  940. IHThorKeyedJoinArg *createHelper(CJobBase &job, activity_id id, MemoryBuffer &createCtxMb)
  941. {
  942. VStringBuffer helperName("fAc%u", (unsigned)id);
  943. EclHelperFactory helperFactory = (EclHelperFactory) job.queryDllEntry().getEntry(helperName.str());
  944. if (!helperFactory)
  945. throw makeOsExceptionV(GetLastError(), "Failed to load helper factory method: %s (dll handle = %p)", helperName.str(), job.queryDllEntry().getInstance());
  946. ICodeContext &codeCtx = job.queryJobChannel(0).querySharedMemCodeContext();
  947. Owned<IHThorKeyedJoinArg> helper = static_cast<IHThorKeyedJoinArg *>(helperFactory());
  948. helper->onCreate(&codeCtx, nullptr, &createCtxMb); // JCS->GH - will I ever need colocalParent here?
  949. return helper.getClear();
  950. }
  951. CActivityContext *createActivityContext(CJobBase &job, activity_id id, MemoryBuffer &createCtxMb)
  952. {
  953. IHThorKeyedJoinArg *helper = createHelper(job, id, createCtxMb);
  954. ICodeContext &codeCtx = job.queryJobChannel(0).querySharedMemCodeContext();
  955. return new CActivityContext(*this, id, helper, &codeCtx);
  956. }
  957. CActivityContext *ensureActivityContext(CJobBase &job, activity_id id, MemoryBuffer &createCtxMb)
  958. {
  959. CriticalBlock b(lCCrit);
  960. auto it = activityContextsHT.find(id);
  961. if (it != activityContextsHT.end())
  962. return LINK(it->second);
  963. CActivityContext *activityCtx = createActivityContext(job, id, createCtxMb);
  964. activityContextsHT.insert({id, activityCtx}); // NB: does not link/take ownership
  965. return activityCtx;
  966. }
  967. CKeyLookupContext *createLookupContext(CActivityContext *activityCtx, const CLookupKey &key)
  968. {
  969. return new CKeyLookupContext(*this, activityCtx, key);
  970. }
  971. CKeyLookupContext *ensureKeyLookupContext(CJobBase &job, const CLookupKey &key, MemoryBuffer &createCtxMb, bool *created=nullptr)
  972. {
  973. CriticalBlock b(lCCrit);
  974. auto it = keyLookupContextsHT.find(key);
  975. if (it != keyLookupContextsHT.end())
  976. {
  977. if (created)
  978. *created = false;
  979. return LINK(it->second);
  980. }
  981. if (created)
  982. *created = true;
  983. Owned<CActivityContext> activityCtx = ensureActivityContext(job, key.id, createCtxMb);
  984. CKeyLookupContext *keyLookupContext = createLookupContext(activityCtx, key);
  985. keyLookupContextsHT.insert({key, keyLookupContext}); // NB: does not link/take ownership
  986. return keyLookupContext;
  987. }
  988. void removeActivityContext(CActivityContext *activityContext)
  989. {
  990. CriticalBlock b(lCCrit);
  991. activityContextsHT.erase(activityContext->queryId());
  992. }
  993. void freeActivityContext(CActivityContext *_activityContext)
  994. {
  995. Owned<CActivityContext> activityContext = _activityContext;
  996. CriticalBlock b(lCCrit);
  997. if (!activityContext->IsShared())
  998. activityContextsHT.erase(activityContext->queryId());
  999. }
  1000. void freeLookupContext(CKeyLookupContext *_lookupContext)
  1001. {
  1002. Owned<CKeyLookupContext> keyLookupContext = _lookupContext;
  1003. CriticalBlock b(lCCrit);
  1004. if (!keyLookupContext->IsShared())
  1005. keyLookupContextsHT.erase(keyLookupContext->queryKey());
  1006. }
  1007. CFetchContext *getActiveFetchContext(unsigned handle)
  1008. {
  1009. CriticalBlock b(lCCrit);
  1010. auto it = activeFetchContextsByHandle.find(handle);
  1011. if (it == activeFetchContextsByHandle.end())
  1012. return nullptr;
  1013. return it->second.getLink();
  1014. }
  1015. CFetchContext *getCachedFetchContext(const CFetchKey &key)
  1016. {
  1017. CriticalBlock b(lCCrit);
  1018. auto it = cachedFetchContexts.find(key);
  1019. if (it == cachedFetchContexts.end())
  1020. return nullptr;
  1021. CFCKeyEntry *fce = it->second;
  1022. CFetchContext *fetchContext = fce->pop(); // will always be at least 1.
  1023. if (0 == fce->count())
  1024. cachedFetchContexts.erase(it);
  1025. verifyex(cachedFCsMRU.zap(*fetchContext));
  1026. --numFCCached;
  1027. return fetchContext;
  1028. }
  1029. CFetchContext *createActiveFetchContext(CJobBase &job, CFetchKey &key, MemoryBuffer &createCtxMb) // gets a unused (cached) CFetchContext based on context key, or create a new one.
  1030. {
  1031. CFetchContext *fetchContext = getCachedFetchContext(key);
  1032. if (!fetchContext)
  1033. {
  1034. Owned<CActivityContext> activityCtx = ensureActivityContext(job, key.id, createCtxMb);
  1035. fetchContext = new CFetchContext(*this, activityCtx, key);
  1036. }
  1037. activeFetchContextsByHandle.insert({fetchContext->queryHandle(), fetchContext}); // owns
  1038. return LINK(fetchContext);
  1039. }
  1040. bool removeActiveFetchContext(unsigned handle)
  1041. {
  1042. auto it = activeFetchContextsByHandle.find(handle);
  1043. if (it == activeFetchContextsByHandle.end())
  1044. return false;
  1045. Linked<CFetchContext> fetchContext = it->second;
  1046. activeFetchContextsByHandle.erase(it);
  1047. if (maxCachedFetchContexts)
  1048. addToFetchContextCache(fetchContext.getClear());
  1049. return true;
  1050. }
  1051. CKMContainer *getActiveKeyManager(unsigned handle)
  1052. {
  1053. CriticalBlock b(kMCrit);
  1054. auto it = activeKManagersByHandle.find(handle);
  1055. if (it == activeKManagersByHandle.end())
  1056. return nullptr;
  1057. return it->second.getLink();
  1058. }
  1059. CKMContainer *getCachedKeyManager(const CLookupKey &key)
  1060. {
  1061. CriticalBlock b(kMCrit);
  1062. auto it = cachedKMs.find(key);
  1063. if (it == cachedKMs.end())
  1064. return nullptr;
  1065. CKMKeyEntry *kme = it->second;
  1066. CKMContainer *kmc = kme->pop(); // will always be at least 1.
  1067. if (0 == kme->count())
  1068. cachedKMs.erase(it);
  1069. verifyex(cachedKMsMRU.zap(*kmc));
  1070. --numKMCached;
  1071. return kmc;
  1072. }
  1073. CKMContainer *createActiveKeyManager(CKeyLookupContext *keyLookupContext) // gets a unused (cached) CKM based on context key, or create a new one.
  1074. {
  1075. CKMContainer *kmc = getCachedKeyManager(keyLookupContext->queryKey());
  1076. if (!kmc)
  1077. {
  1078. // NB: container links keyLookupContext, and will remove it from keyLookupContextsHT when last reference.
  1079. // The container creates new IKeyManager and unique handle
  1080. kmc = new CKMContainer(*this, keyLookupContext);
  1081. }
  1082. activeKManagersByHandle.insert({kmc->queryHandle(), kmc}); // owns
  1083. return LINK(kmc);
  1084. }
  1085. bool removeActiveKeyManager(unsigned handle)
  1086. {
  1087. auto it = activeKManagersByHandle.find(handle);
  1088. if (it == activeKManagersByHandle.end())
  1089. return false;
  1090. Linked<CKMContainer> kmc = it->second;
  1091. activeKManagersByHandle.erase(it);
  1092. if (maxCachedKJManagers)
  1093. addToKeyManagerCache(kmc.getClear());
  1094. return true;
  1095. }
  1096. unsigned getUniqId()
  1097. {
  1098. ++uniqueId;
  1099. return uniqueId;
  1100. }
  1101. void clearAll()
  1102. {
  1103. if (activeKManagersByHandle.size())
  1104. {
  1105. DBGLOG("KJService: clearing active %u key manager container(s), that were not closed cleanly", (unsigned)activeKManagersByHandle.size());
  1106. activeKManagersByHandle.clear();
  1107. }
  1108. if (activeFetchContextsByHandle.size())
  1109. {
  1110. DBGLOG("KJService: clearing %u fetch context(s), that were not closed cleanly", (unsigned)activeFetchContextsByHandle.size());
  1111. activeFetchContextsByHandle.clear();
  1112. }
  1113. cachedKMs.clear();
  1114. cachedFetchContexts.clear();
  1115. activityContextsHT.clear();
  1116. keyLookupContextsHT.clear();
  1117. cachedKMsMRU.kill();
  1118. cachedFCsMRU.kill();
  1119. currentJob = nullptr;
  1120. numKMCached = 0;
  1121. numFCCached = 0;
  1122. }
  1123. void setupProcessorPool()
  1124. {
  1125. Owned<CProcessorFactory> factory = new CProcessorFactory(*this);
  1126. processorPool.setown(createThreadPool("KJService processor pool", factory, this, keyLookupMaxProcessThreads, 10000));
  1127. processorPool->setStartDelayTracing(60000);
  1128. }
  1129. public:
  1130. IMPLEMENT_IINTERFACE_USING(CSimpleInterfaceOf<IKJService>);
  1131. CKJService(mptag_t _mpTag) : threaded("CKJService", this), keyLookupMpTag(_mpTag)
  1132. {
  1133. setupProcessorPool();
  1134. }
  1135. ~CKJService()
  1136. {
  1137. stop();
  1138. }
  1139. void addToKeyManagerCache(CKMContainer *kmc)
  1140. {
  1141. dbgassertex(maxCachedKJManagers); // should not be called if maxCachedKJManagers is 0
  1142. CriticalBlock b(kMCrit);
  1143. if (numKMCached == maxCachedKJManagers)
  1144. {
  1145. CKMContainer &oldest = cachedKMsMRU.item(0);
  1146. auto it = cachedKMs.find(oldest.queryCtx().queryKey());
  1147. assertex(it != cachedKMs.end());
  1148. CKMKeyEntry *kme = it->second;
  1149. verifyex(kme->remove(&oldest));
  1150. if (0 == kme->count())
  1151. cachedKMs.erase(it);
  1152. cachedKMsMRU.remove(0);
  1153. --numKMCached;
  1154. }
  1155. const CLookupKey &key = kmc->queryCtx().queryKey();
  1156. auto it = cachedKMs.find(key);
  1157. CKMKeyEntry *kme;
  1158. if (it == cachedKMs.end())
  1159. {
  1160. kme = new CKMKeyEntry(key);
  1161. cachedKMs.insert({key, kme});
  1162. }
  1163. else
  1164. kme = it->second;
  1165. kme->push(kmc); // takes ownership. JCSMORE cap. to some max #
  1166. cachedKMsMRU.append(*kmc);
  1167. ++numKMCached;
  1168. }
  1169. void addToFetchContextCache(CFetchContext *fc)
  1170. {
  1171. dbgassertex(maxCachedFetchContexts); // should not be called if maxCachedFetchContexts is 0
  1172. CriticalBlock b(lCCrit);
  1173. if (numFCCached == maxCachedFetchContexts)
  1174. {
  1175. CFetchContext &oldest = cachedFCsMRU.item(0);
  1176. auto it = cachedFetchContexts.find(oldest.queryKey());
  1177. assertex(it != cachedFetchContexts.end());
  1178. CFCKeyEntry *fce = it->second;
  1179. verifyex(fce->remove(&oldest));
  1180. if (0 == fce->count())
  1181. cachedFetchContexts.erase(it);
  1182. cachedFCsMRU.remove(0);
  1183. --numFCCached;
  1184. }
  1185. const CFetchKey &key = fc->queryKey();
  1186. auto it = cachedFetchContexts.find(key);
  1187. CFCKeyEntry *fce;
  1188. if (it == cachedFetchContexts.end())
  1189. {
  1190. fce = new CFCKeyEntry(key);
  1191. cachedFetchContexts.insert({key, fce});
  1192. }
  1193. else
  1194. fce = it->second;
  1195. fce->push(fc); // takes ownership. JCSMORE cap. to some max #
  1196. cachedFCsMRU.append(*fc);
  1197. ++numFCCached;
  1198. }
  1199. void abort()
  1200. {
  1201. if (aborted)
  1202. return;
  1203. aborted = true;
  1204. queryNodeComm().cancel(RANK_ALL, keyLookupMpTag);
  1205. processorPool->stopAll(true);
  1206. processorPool->joinAll(true);
  1207. threaded.join(INFINITE);
  1208. clearAll();
  1209. }
  1210. void processKeyLookupRequest(CMessageBuffer &msg, CKMContainer *kmc, rank_t sender, mptag_t replyTag)
  1211. {
  1212. CKeyLookupContext *keyLookupContext = &kmc->queryCtx();
  1213. Owned<CKeyLookupRequest> lookupRequest = new CKeyLookupRequest(*this, keyLookupContext, kmc, sender, replyTag);
  1214. size32_t requestSz;
  1215. msg.read(requestSz);
  1216. lookupRequest->deserialize(requestSz, msg.readDirect(requestSz));
  1217. msg.clear();
  1218. // NB: kmc is added to cache at end of request handling
  1219. processorPool->start(lookupRequest);
  1220. }
  1221. IPooledThread *createProcessor()
  1222. {
  1223. return new CRemoteLookupProcessor(*this);
  1224. }
  1225. // IThreaded
  1226. virtual void threadmain() override
  1227. {
  1228. while (!aborted)
  1229. {
  1230. rank_t sender = RANK_NULL;
  1231. CMessageBuffer msg;
  1232. mptag_t replyTag = TAG_NULL;
  1233. byte errorCode = kjse_nop;
  1234. bool replyAttempt = false;
  1235. try
  1236. {
  1237. if (!queryNodeComm().recv(msg, RANK_ALL, keyLookupMpTag, &sender))
  1238. break;
  1239. if (!msg.length())
  1240. break;
  1241. assertex(currentJob);
  1242. KJServiceCmds cmd;
  1243. readUnderlyingType<KJServiceCmds>(msg, cmd);
  1244. readUnderlyingType<mptag_t>(msg, replyTag);
  1245. switch (cmd)
  1246. {
  1247. case kjs_keyopen:
  1248. {
  1249. CLookupKey key(msg);
  1250. size32_t createCtxSz;
  1251. msg.read(createCtxSz);
  1252. MemoryBuffer createCtxMb;
  1253. createCtxMb.setBuffer(createCtxSz, (void *)msg.readDirect(createCtxSz)); // NB: read only
  1254. size32_t parentCtxSz;
  1255. msg.read(parentCtxSz);
  1256. MemoryBuffer parentCtxMb;
  1257. parentCtxMb.setBuffer(parentCtxSz, (void *)msg.readDirect(parentCtxSz)); // NB: read only
  1258. size32_t startCtxSz;
  1259. msg.read(startCtxSz);
  1260. MemoryBuffer startCtxMb;
  1261. startCtxMb.setBuffer(startCtxSz, (void *)msg.readDirect(startCtxSz)); // NB: read only
  1262. bool created;
  1263. Owned<CKeyLookupContext> keyLookupContext = ensureKeyLookupContext(*currentJob, key, createCtxMb, &created); // ensure entry in keyLookupContextsHT, will be removed by last CKMContainer
  1264. bool messageCompression;
  1265. msg.read(messageCompression);
  1266. keyLookupContext->queryActivityCtx()->setMessageCompression(messageCompression);
  1267. RecordTranslationMode translationMode;
  1268. readUnderlyingType(msg, translationMode);
  1269. if (RecordTranslationMode::None != translationMode)
  1270. {
  1271. unsigned publishedFormatCrc;
  1272. msg.read(publishedFormatCrc);
  1273. Owned<IOutputMetaData> publishedFormat = createTypeInfoOutputMetaData(msg, false);
  1274. Owned<IOutputMetaData> projectedFormat;
  1275. bool projected;
  1276. msg.read(projected);
  1277. if (projected)
  1278. projectedFormat.setown(createTypeInfoOutputMetaData(msg, false));
  1279. else
  1280. projectedFormat.set(publishedFormat);
  1281. if (created) // translation for the key context will already have been setup and do not want to free existing
  1282. keyLookupContext->setTranslation(translationMode, publishedFormat, publishedFormatCrc, projectedFormat);
  1283. }
  1284. Owned<CKMContainer> kmc = createActiveKeyManager(keyLookupContext); // owns keyLookupContext
  1285. kmc->setContexts(parentCtxMb, startCtxMb, createCtxMb);
  1286. processKeyLookupRequest(msg, kmc, sender, replyTag);
  1287. break;
  1288. }
  1289. case kjs_keyread:
  1290. {
  1291. unsigned handle;
  1292. msg.read(handle);
  1293. dbgassertex(handle);
  1294. Owned<CKMContainer> kmc = getActiveKeyManager(handle);
  1295. if (!kmc) // if closed/not known, alternative is to send just handle and send challenge response if unknown
  1296. {
  1297. msg.clear();
  1298. errorCode = kjse_unknownhandle;
  1299. msg.append(errorCode);
  1300. if (!queryNodeComm().send(msg, sender, replyTag, LONGTIMEOUT))
  1301. throw MakeStringException(0, "Failed to reply to challenge on key read");
  1302. // client will resent with kjs_keyopen + full info.
  1303. continue;
  1304. }
  1305. processKeyLookupRequest(msg, kmc, sender, replyTag);
  1306. break;
  1307. }
  1308. case kjs_keyclose:
  1309. {
  1310. unsigned handle;
  1311. msg.read(handle);
  1312. bool res = removeActiveKeyManager(handle);
  1313. msg.clear();
  1314. msg.append(errorCode);
  1315. msg.append(res);
  1316. replyAttempt = true;
  1317. if (!queryNodeComm().send(msg, sender, replyTag, LONGTIMEOUT))
  1318. throw MakeStringException(0, "kjs_keyclose: Failed to reply to lookup request");
  1319. msg.clear();
  1320. break;
  1321. }
  1322. case kjs_fetchopen:
  1323. {
  1324. CFetchKey key(msg); // key by {actid, partNo}
  1325. size32_t createCtxSz;
  1326. msg.read(createCtxSz);
  1327. MemoryBuffer createCtxMb;
  1328. createCtxMb.setBuffer(createCtxSz, (void *)msg.readDirect(createCtxSz)); // NB: read only
  1329. Owned<CFetchContext> fetchContext = createActiveFetchContext(*currentJob, key, createCtxMb);
  1330. CActivityContext *activityCtx = fetchContext->queryActivityCtx();
  1331. /* NB: clients will send it on their first request, but might already have from others
  1332. * If have it already, ignore/skip it.
  1333. * Alternative is to not send it by default on 1st request and send challenge response.
  1334. */
  1335. byte flags;
  1336. msg.read(flags); // compress/encrypted;
  1337. StringAttr fname;
  1338. msg.read(fname);
  1339. // NB: will be ignored if it already has it
  1340. activityCtx->addFetchFile(flags, key.partNo, fname);
  1341. bool messageCompression;
  1342. msg.read(messageCompression);
  1343. activityCtx->setMessageCompression(messageCompression);
  1344. RecordTranslationMode translationMode;
  1345. readUnderlyingType(msg, translationMode);
  1346. if (RecordTranslationMode::None != translationMode)
  1347. {
  1348. unsigned publishedFormatCrc;
  1349. msg.read(publishedFormatCrc);
  1350. Owned<IOutputMetaData> publishedFormat = createTypeInfoOutputMetaData(msg, false);
  1351. Owned<IOutputMetaData> projectedFormat;
  1352. bool projected;
  1353. msg.read(projected);
  1354. if (projected)
  1355. projectedFormat.setown(createTypeInfoOutputMetaData(msg, false));
  1356. else
  1357. projectedFormat.set(publishedFormat);
  1358. fetchContext->setTranslation(translationMode, publishedFormat, publishedFormatCrc, projectedFormat);
  1359. }
  1360. Owned<CFetchLookupRequest> lookupRequest = new CFetchLookupRequest(*this, fetchContext, sender, replyTag);
  1361. size32_t requestSz;
  1362. msg.read(requestSz);
  1363. lookupRequest->deserialize(requestSz, msg.readDirect(requestSz));
  1364. msg.clear();
  1365. // NB: kmc is added to cache at end of request handling
  1366. processorPool->start(lookupRequest);
  1367. break;
  1368. }
  1369. case kjs_fetchread:
  1370. {
  1371. unsigned handle;
  1372. msg.read(handle);
  1373. dbgassertex(handle);
  1374. Owned<CFetchContext> fetchContext = getActiveFetchContext(handle);
  1375. if (!fetchContext) // if closed/not known, alternative is to send just handle and send challenge response if unknown
  1376. {
  1377. msg.clear();
  1378. errorCode = kjse_unknownhandle;
  1379. msg.append(errorCode);
  1380. if (!queryNodeComm().send(msg, sender, replyTag, LONGTIMEOUT))
  1381. throw MakeStringException(0, "Failed to reply to challenge on fetch read");
  1382. // client will resent with kjs_fetchopen + full info.
  1383. continue;
  1384. }
  1385. Owned<CFetchLookupRequest> lookupRequest = new CFetchLookupRequest(*this, fetchContext, sender, replyTag);
  1386. size32_t requestSz;
  1387. msg.read(requestSz);
  1388. lookupRequest->deserialize(requestSz, msg.readDirect(requestSz));
  1389. msg.clear();
  1390. // NB: kmc is added to cache at end of request handling
  1391. processorPool->start(lookupRequest);
  1392. break;
  1393. }
  1394. case kjs_fetchclose:
  1395. {
  1396. unsigned handle;
  1397. msg.read(handle);
  1398. bool res = removeActiveFetchContext(handle);
  1399. msg.clear();
  1400. msg.append(errorCode);
  1401. msg.append(res);
  1402. replyAttempt = true;
  1403. if (!queryNodeComm().send(msg, sender, replyTag, LONGTIMEOUT))
  1404. throw MakeStringException(0, "kjs_fetchclose: Failed to reply to lookup request");
  1405. msg.clear();
  1406. break;
  1407. }
  1408. default:
  1409. throwUnexpected();
  1410. }
  1411. }
  1412. catch (IMP_Exception *e)
  1413. {
  1414. EXCLOG(e, nullptr);
  1415. e->Release();
  1416. break;
  1417. }
  1418. catch (IJSOCK_Exception *e)
  1419. {
  1420. EXCLOG(e, nullptr);
  1421. e->Release();
  1422. break;
  1423. }
  1424. catch (IException *e)
  1425. {
  1426. if (replyAttempt)
  1427. EXCLOG(e, "CKJService: failed to send reply");
  1428. else if (TAG_NULL == replyTag)
  1429. {
  1430. StringBuffer msg("CKJService: Exception without reply tag. Received from slave: ");
  1431. if (RANK_NULL==sender)
  1432. msg.append("<unknown>");
  1433. else
  1434. msg.append(sender-1);
  1435. EXCLOG(e, msg.str());
  1436. msg.clear();
  1437. }
  1438. else
  1439. {
  1440. msg.clear();
  1441. errorCode = kjse_exception;
  1442. msg.append(errorCode);
  1443. serializeException(e, msg);
  1444. }
  1445. e->Release();
  1446. }
  1447. if (!replyAttempt && msg.length())
  1448. {
  1449. if (!queryNodeComm().send(msg, sender, replyTag, LONGTIMEOUT))
  1450. {
  1451. OERRLOG("CKJService: Failed to send error response");
  1452. break;
  1453. }
  1454. }
  1455. }
  1456. }
  1457. // IKJService
  1458. virtual void setCurrentJob(CJobBase &job)
  1459. {
  1460. /* NB: For now the service contexts are tied to activities/helpers from a particular job,
  1461. * but since there's only 1 job running a time that is okay
  1462. * Once there's a dynamic implementation of the helper this won't be necessary
  1463. */
  1464. currentJob = &job;
  1465. maxCachedKJManagers = job.getOptUInt("keyedJoinMaxKJMs", defaultMaxCachedKJManagers);
  1466. maxCachedFetchContexts = job.getOptUInt("keyedJoinMaxFetchContexts", defaultMaxCachedFetchContexts);
  1467. unsigned newKeyLookupMaxProcessThreads = job.getOptUInt("keyedJoinMaxProcessors", defaultKeyLookupMaxProcessThreads);
  1468. if (newKeyLookupMaxProcessThreads != keyLookupMaxProcessThreads)
  1469. {
  1470. keyLookupMaxProcessThreads = newKeyLookupMaxProcessThreads;
  1471. setupProcessorPool();
  1472. }
  1473. }
  1474. virtual void reset() override
  1475. {
  1476. LOG(MCthorDetailedDebugInfo, thorJob, "KJService reset()");
  1477. processorPool->stopAll(true);
  1478. processorPool->joinAll(false);
  1479. clearAll();
  1480. LOG(MCthorDetailedDebugInfo, thorJob, "KJService reset() done");
  1481. }
  1482. virtual void start() override
  1483. {
  1484. aborted = false;
  1485. threaded.start();
  1486. }
  1487. virtual void stop() override
  1488. {
  1489. if (aborted)
  1490. return;
  1491. LOG(MCthorDetailedDebugInfo, thorJob, "KJService stop()");
  1492. queryNodeComm().cancel(RANK_ALL, keyLookupMpTag);
  1493. processorPool->stopAll(true);
  1494. processorPool->joinAll(true);
  1495. while (!threaded.join(60000, false))
  1496. PROGLOG("Receiver waiting on remote handlers to signal completion");
  1497. if (aborted)
  1498. return;
  1499. aborted = true;
  1500. clearAll();
  1501. }
  1502. // IExceptionHandler impl.
  1503. virtual bool fireException(IException *e) override
  1504. {
  1505. // exceptions should always be handled by processor
  1506. EXCLOG(e, nullptr);
  1507. e->Release();
  1508. return true;
  1509. }
  1510. };
  1511. class CJobListener : public CSimpleInterface
  1512. {
  1513. bool &stopped;
  1514. CriticalSection crit;
  1515. OwningStringSuperHashTableOf<CJobSlave> jobs;
  1516. #ifndef _CONTAINERIZED
  1517. CFifoFileCache querySoCache; // used to mirror master cache
  1518. #endif
  1519. IArrayOf<IMPServer> mpServers;
  1520. unsigned channelsPerSlave;
  1521. class CThreadExceptionCatcher : implements IExceptionHandler
  1522. {
  1523. CJobListener &jobListener;
  1524. public:
  1525. CThreadExceptionCatcher(CJobListener &_jobListener) : jobListener(_jobListener)
  1526. {
  1527. addThreadExceptionHandler(this);
  1528. }
  1529. ~CThreadExceptionCatcher()
  1530. {
  1531. removeThreadExceptionHandler(this);
  1532. }
  1533. virtual bool fireException(IException *e)
  1534. {
  1535. mptag_t mptag;
  1536. {
  1537. CriticalBlock b(jobListener.crit);
  1538. if (0 == jobListener.jobs.count())
  1539. {
  1540. EXCLOG(e, "No job active exception: ");
  1541. return true;
  1542. }
  1543. IThorException *te = QUERYINTERFACE(e, IThorException);
  1544. CJobSlave *job = NULL;
  1545. if (te && te->queryJobId())
  1546. job = jobListener.jobs.find(te->queryJobId());
  1547. if (!job)
  1548. {
  1549. // JCSMORE - exception fallen through to thread exception handler, from unknown job, fire up to 1st job for now.
  1550. job = (CJobSlave *)jobListener.jobs.next(NULL);
  1551. }
  1552. mptag = job->querySlaveMpTag();
  1553. }
  1554. CMessageBuffer msg;
  1555. msg.append(smt_errorMsg);
  1556. msg.append(0); // unknown really
  1557. serializeThorException(e, msg);
  1558. try
  1559. {
  1560. if (!queryNodeComm().sendRecv(msg, 0, mptag, LONGTIMEOUT))
  1561. EXCLOG(e, "Failed to send exception to master");
  1562. }
  1563. catch (IException *e2)
  1564. {
  1565. StringBuffer str("Error whilst sending exception '");
  1566. e->errorMessage(str);
  1567. str.append("' to master");
  1568. EXCLOG(e2, str.str());
  1569. e2->Release();
  1570. }
  1571. return true;
  1572. }
  1573. } excptHandler;
  1574. public:
  1575. CJobListener(bool &_stopped) : stopped(_stopped), excptHandler(*this)
  1576. {
  1577. stopped = true;
  1578. channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
  1579. unsigned localThorPortInc = globals->getPropInt("@localThorPortInc", DEFAULT_SLAVEPORTINC);
  1580. mpServers.append(* getMPServer());
  1581. bool reconnect = globals->getPropBool("@MPChannelReconnect");
  1582. for (unsigned sc=1; sc<channelsPerSlave; sc++)
  1583. {
  1584. unsigned port = getMachinePortBase() + (sc * localThorPortInc);
  1585. IMPServer *mpServer = startNewMPServer(port, true);
  1586. if (reconnect)
  1587. mpServer->setOpt(mpsopt_channelreopen, "true");
  1588. mpServers.append(*mpServer);
  1589. }
  1590. }
  1591. ~CJobListener()
  1592. {
  1593. try
  1594. {
  1595. for (unsigned sc=1; sc<channelsPerSlave; sc++)
  1596. mpServers.item(sc).stop();
  1597. mpServers.kill();
  1598. stop();
  1599. }
  1600. catch (IException *e)
  1601. {
  1602. e->Release();
  1603. }
  1604. // do we ignore other exceptions (...) here ?
  1605. // if so, we may have std::terminate call abort() ...
  1606. }
  1607. void stop()
  1608. {
  1609. queryNodeComm().cancel(0, masterSlaveMpTag);
  1610. }
  1611. void slaveMain(ILogMsgHandler *logHandler)
  1612. {
  1613. rank_t slaveProc = queryNodeGroup().rank()-1;
  1614. unsigned totSlaveProcs = queryNodeClusterWidth();
  1615. StringBuffer slaveStr;
  1616. for (unsigned c=0; c<channelsPerSlave; c++)
  1617. {
  1618. unsigned o = slaveProc + (c * totSlaveProcs);
  1619. if (c)
  1620. slaveStr.append(",");
  1621. slaveStr.append(o+1);
  1622. }
  1623. StringBuffer virtStr;
  1624. if (channelsPerSlave>1)
  1625. virtStr.append("virtual slaves:");
  1626. else
  1627. virtStr.append("slave:");
  1628. PROGLOG("Slave log %u contains %s %s", slaveProc+1, virtStr.str(), slaveStr.str());
  1629. traceMemUsage();
  1630. if (channelsPerSlave>1)
  1631. {
  1632. class CVerifyThread : public CInterface, implements IThreaded
  1633. {
  1634. CThreaded threaded;
  1635. CJobListener &jobListener;
  1636. unsigned channel;
  1637. public:
  1638. CVerifyThread(CJobListener &_jobListener, unsigned _channel)
  1639. : jobListener(_jobListener), channel(_channel), threaded("CVerifyThread", this)
  1640. {
  1641. start();
  1642. }
  1643. ~CVerifyThread() { join(); }
  1644. void start() { threaded.start(); }
  1645. void join() { threaded.join(); }
  1646. virtual void threadmain() override
  1647. {
  1648. Owned<ICommunicator> comm = jobListener.mpServers.item(channel).createCommunicator(&queryClusterGroup());
  1649. PROGLOG("verifying mp connection to rest of slaves (from channel=%d)", channel);
  1650. if (!comm->verifyAll())
  1651. OERRLOG("Failed to connect to rest of slaves");
  1652. else
  1653. PROGLOG("verified mp connection to rest of slaves");
  1654. }
  1655. };
  1656. CIArrayOf<CInterface> verifyThreads;
  1657. for (unsigned c=0; c<channelsPerSlave; c++)
  1658. verifyThreads.append(*new CVerifyThread(*this, c));
  1659. }
  1660. #ifndef _CONTAINERIZED
  1661. StringBuffer soPath;
  1662. globals->getProp("@query_so_dir", soPath);
  1663. StringBuffer soPattern("*.");
  1664. #ifdef _WIN32
  1665. soPattern.append("dll");
  1666. #else
  1667. soPattern.append("so");
  1668. #endif
  1669. if (getExpertOptBool("dllsToSlaves", true))
  1670. querySoCache.init(soPath.str(), DEFAULT_QUERYSO_LIMIT, soPattern);
  1671. #endif
  1672. Owned<ISlaveWatchdog> watchdog;
  1673. if (globals->getPropBool("@watchdogEnabled"))
  1674. watchdog.setown(createProgressHandler(globals->getPropBool("@useUDPWatchdog")));
  1675. CMessageBuffer msg;
  1676. stopped = false;
  1677. bool doReply;
  1678. while (!stopped && queryNodeComm().recv(msg, 0, masterSlaveMpTag))
  1679. {
  1680. doReply = true;
  1681. try
  1682. {
  1683. msgids cmd;
  1684. readUnderlyingType<msgids>(msg, cmd);
  1685. switch (cmd)
  1686. {
  1687. case QueryInit:
  1688. {
  1689. MemoryBuffer mb;
  1690. decompressToBuffer(mb, msg);
  1691. msg.swapWith(mb);
  1692. mptag_t slaveMsgTag;
  1693. deserializeMPtag(msg, slaveMsgTag);
  1694. queryNodeComm().flush(slaveMsgTag);
  1695. StringAttr wuid, graphName;
  1696. StringBuffer soPath;
  1697. msg.read(wuid);
  1698. msg.read(graphName);
  1699. Owned<ILoadedDllEntry> querySo;
  1700. #ifdef _CONTAINERIZED
  1701. StringAttr soName;
  1702. msg.read(soName);
  1703. querySo.setown(createDllEntry(soName.str(), false, NULL, false));
  1704. soPath.append(soName);
  1705. #else
  1706. StringBuffer soPathTail;
  1707. StringAttr remoteSoPath;
  1708. msg.read(remoteSoPath);
  1709. bool sendSo;
  1710. msg.read(sendSo);
  1711. RemoteFilename rfn;
  1712. SocketEndpoint masterEp = queryMyNode()->endpoint();
  1713. masterEp.port = 0;
  1714. rfn.setPath(masterEp, remoteSoPath);
  1715. rfn.getTail(soPathTail);
  1716. if (sendSo)
  1717. {
  1718. size32_t size;
  1719. msg.read(size);
  1720. globals->getProp("@query_so_dir", soPath);
  1721. if (soPath.length())
  1722. addPathSepChar(soPath);
  1723. soPath.append(soPathTail);
  1724. const byte *queryPtr = msg.readDirect(size);
  1725. Owned<IFile> iFile = createIFile(soPath.str());
  1726. try
  1727. {
  1728. iFile->setCreateFlags(S_IRWXU);
  1729. Owned<IFileIO> iFileIO = iFile->open(IFOwrite);
  1730. iFileIO->write(0, size, queryPtr);
  1731. }
  1732. catch (IException *e)
  1733. {
  1734. IException *e2 = ThorWrapException(e, "Failed to save dll: %s", soPath.str());
  1735. e->Release();
  1736. throw e2;
  1737. }
  1738. assertex(getExpertOptBool("dllsToSlaves", true));
  1739. querySoCache.add(soPath.str());
  1740. }
  1741. else
  1742. {
  1743. if (!rfn.isLocal())
  1744. {
  1745. StringBuffer _remoteSoPath;
  1746. rfn.getRemotePath(_remoteSoPath);
  1747. remoteSoPath.set(_remoteSoPath);
  1748. }
  1749. if (getExpertOptBool("dllsToSlaves", true))
  1750. {
  1751. globals->getProp("@query_so_dir", soPath);
  1752. if (soPath.length())
  1753. addPathSepChar(soPath);
  1754. soPath.append(soPathTail);
  1755. OwnedIFile iFile = createIFile(soPath.str());
  1756. if (!iFile->exists())
  1757. {
  1758. IWARNLOG("Slave cached query dll missing: %s, will attempt to fetch from master", soPath.str());
  1759. copyFile(soPath.str(), remoteSoPath);
  1760. }
  1761. querySoCache.add(soPath.str());
  1762. }
  1763. else
  1764. soPath.append(remoteSoPath);
  1765. }
  1766. #ifdef __linux__
  1767. // only relevant if dllsToSlaves=false and query_so_dir was fully qualified remote path (e.g. //<ip>/path/file
  1768. rfn.setRemotePath(soPath.str());
  1769. StringBuffer tempSo;
  1770. if (!rfn.isLocal())
  1771. {
  1772. IWARNLOG("Cannot load shared object directly from remote path, creating temporary local copy: %s", soPath.str());
  1773. GetTempFilePath(tempSo,"so");
  1774. copyFile(tempSo.str(), soPath.str());
  1775. soPath.clear().append(tempSo.str());
  1776. }
  1777. #endif
  1778. querySo.setown(createDllEntry(soPath.str(), false, NULL, false));
  1779. #endif
  1780. Owned<IPropertyTree> workUnitInfo = createPTree(msg);
  1781. StringBuffer user;
  1782. workUnitInfo->getProp("user", user);
  1783. unsigned maxLogDetail = workUnitInfo->getPropInt("Debug/maxlogdetail", DefaultDetail);
  1784. ILogMsgFilter *existingLogHandler = queryLogMsgManager()->queryMonitorFilter(logHandler);
  1785. dbgassertex(existingLogHandler);
  1786. verifyex(queryLogMsgManager()->changeMonitorFilterOwn(logHandler, getCategoryLogMsgFilter(existingLogHandler->queryAudienceMask(), existingLogHandler->queryClassMask(), maxLogDetail)));
  1787. LogMsgJobId thorJobId = queryLogMsgManager()->addJobId(wuid);
  1788. thorJob.setJobID(thorJobId);
  1789. setDefaultJobId(thorJobId);
  1790. PROGLOG("Started wuid=%s, user=%s, graph=%s [log detail level=%u]\n", wuid.get(), user.str(), graphName.get(), maxLogDetail);
  1791. PROGLOG("Using query: %s", soPath.str());
  1792. if (!getExpertOptBool("slaveDaliClient") && workUnitInfo->getPropBool("Debug/slavedaliclient", false))
  1793. {
  1794. PROGLOG("Workunit option 'slaveDaliClient' enabled");
  1795. enableThorSlaveAsDaliClient();
  1796. }
  1797. Owned<IPropertyTree> deps = createPTree(msg);
  1798. Owned<CJobSlave> job = new CJobSlave(watchdog, workUnitInfo, graphName, querySo, slaveMsgTag);
  1799. job->setXGMML(deps);
  1800. for (unsigned sc=0; sc<channelsPerSlave; sc++)
  1801. {
  1802. CJobChannel *channel = job->addChannel(&mpServers.item(sc));
  1803. channel->reservePortKind(TPORT_mp);
  1804. }
  1805. jobs.replace(*job.getLink());
  1806. job->startJob();
  1807. msg.clear();
  1808. msg.append(false);
  1809. break;
  1810. }
  1811. case QueryDone:
  1812. {
  1813. StringAttr key;
  1814. msg.read(key);
  1815. CJobSlave *job = jobs.find(key.get());
  1816. if (!job)
  1817. throw makeStringException(0, "QueryDone: job not found"); // can happen if job failed during initialization on some slaves
  1818. StringAttr wuid = job->queryWuid();
  1819. StringAttr graphName = job->queryGraphName();
  1820. PROGLOG("Finished wuid=%s, graph=%s", wuid.get(), graphName.get());
  1821. if (!getExpertOptBool("slaveDaliClient") && job->getWorkUnitValueBool("slaveDaliClient", false))
  1822. disableThorSlaveAsDaliClient();
  1823. PROGLOG("QueryDone, removing %s from jobs", key.get());
  1824. Owned<IException> exception;
  1825. try
  1826. {
  1827. job->endJob();
  1828. }
  1829. catch (IException *e)
  1830. {
  1831. exception.setown(e);
  1832. }
  1833. jobs.removeExact(job);
  1834. PROGLOG("QueryDone, removed %s from jobs", key.get());
  1835. // reset for next job
  1836. setProcessAborted(false);
  1837. if (exception)
  1838. throw exception.getClear(); // NB: this will cause exception to be part of the reply to master
  1839. msg.clear();
  1840. msg.append(false);
  1841. break;
  1842. }
  1843. case GraphInit:
  1844. {
  1845. StringAttr jobKey;
  1846. msg.read(jobKey);
  1847. CJobSlave *job = jobs.find(jobKey.get());
  1848. if (!job)
  1849. throw MakeStringException(0, "Job not found: %s", jobKey.get());
  1850. mptag_t executeReplyTag = job->deserializeMPTag(msg);
  1851. size32_t len;
  1852. msg.read(len);
  1853. MemoryBuffer createInitData;
  1854. createInitData.append(len, msg.readDirect(len));
  1855. graph_id subGraphId;
  1856. msg.read(subGraphId);
  1857. unsigned graphInitDataPos = msg.getPos();
  1858. VStringBuffer xpath("node[@id='%" GIDPF "u']", subGraphId);
  1859. Owned<IPropertyTree> graphNode = job->queryGraphXGMML()->getPropTree(xpath.str());
  1860. job->addSubGraph(*graphNode);
  1861. /* JCSMORE - should improve, create 1st graph with create context/init data and clone
  1862. * Should perhaps do this initialization in parallel..
  1863. */
  1864. for (unsigned c=0; c<job->queryJobChannels(); c++)
  1865. {
  1866. PROGLOG("GraphInit: %s, graphId=%" GIDPF "d, slaveChannel=%d", jobKey.get(), subGraphId, c);
  1867. CJobChannel &jobChannel = job->queryJobChannel(c);
  1868. Owned<CSlaveGraph> subGraph = (CSlaveGraph *)jobChannel.getGraph(subGraphId);
  1869. subGraph->setExecuteReplyTag(executeReplyTag);
  1870. createInitData.reset(0);
  1871. subGraph->deserializeCreateContexts(createInitData);
  1872. msg.reset(graphInitDataPos);
  1873. subGraph->init(msg);
  1874. jobChannel.addDependencies(job->queryXGMML(), false);
  1875. }
  1876. for (unsigned c=0; c<job->queryJobChannels(); c++)
  1877. {
  1878. CJobChannel &jobChannel = job->queryJobChannel(c);
  1879. Owned<CSlaveGraph> subGraph = (CSlaveGraph *)jobChannel.getGraph(subGraphId);
  1880. jobChannel.startGraph(*subGraph, true, 0, NULL);
  1881. }
  1882. msg.clear();
  1883. msg.append(false);
  1884. break;
  1885. }
  1886. case GraphEnd:
  1887. {
  1888. StringAttr jobKey;
  1889. msg.read(jobKey);
  1890. CJobSlave *job = jobs.find(jobKey.get());
  1891. if (job)
  1892. {
  1893. graph_id gid;
  1894. msg.read(gid);
  1895. msg.clear();
  1896. msg.append(false);
  1897. for (unsigned c=0; c<job->queryJobChannels(); c++)
  1898. {
  1899. CJobChannel &jobChannel = job->queryJobChannel(c);
  1900. Owned<CSlaveGraph> graph = (CSlaveGraph *)jobChannel.getGraph(gid);
  1901. if (graph)
  1902. {
  1903. msg.append(jobChannel.queryMyRank()-1);
  1904. graph->getDone(msg);
  1905. }
  1906. else
  1907. {
  1908. msg.append((rank_t)0); // JCSMORE - not sure why this would ever happen
  1909. }
  1910. }
  1911. job->reportGraphEnd(gid);
  1912. }
  1913. else
  1914. {
  1915. msg.clear();
  1916. msg.append(false);
  1917. }
  1918. break;
  1919. }
  1920. case GraphAbort:
  1921. {
  1922. StringAttr jobKey;
  1923. msg.read(jobKey);
  1924. PROGLOG("GraphAbort: %s", jobKey.get());
  1925. CJobSlave *job = jobs.find(jobKey.get());
  1926. if (job)
  1927. {
  1928. bool dumpInfo;
  1929. msg.read(dumpInfo);
  1930. if (dumpInfo)
  1931. {
  1932. StringBuffer dumpInfoCmd;
  1933. checkAndDumpAbortInfo(job->getOpt("dumpInfoCmd", dumpInfoCmd));
  1934. }
  1935. graph_id gid;
  1936. msg.read(gid);
  1937. for (unsigned c=0; c<job->queryJobChannels(); c++)
  1938. {
  1939. CJobChannel &jobChannel = job->queryJobChannel(c);
  1940. Owned<CGraphBase> graph = jobChannel.getGraph(gid);
  1941. if (graph)
  1942. {
  1943. Owned<IThorException> e = MakeGraphException(graph, 0, "GraphAbort");
  1944. graph->abort(e);
  1945. }
  1946. }
  1947. }
  1948. msg.clear();
  1949. msg.append(false);
  1950. break;
  1951. }
  1952. case Shutdown:
  1953. {
  1954. stopped = true;
  1955. PROGLOG("Shutdown received");
  1956. if (watchdog)
  1957. watchdog->stop();
  1958. mptag_t sdreplyTag;
  1959. deserializeMPtag(msg, sdreplyTag);
  1960. msg.setReplyTag(sdreplyTag);
  1961. msg.clear();
  1962. msg.append(false);
  1963. break;
  1964. }
  1965. case GraphGetResult:
  1966. {
  1967. StringAttr jobKey;
  1968. msg.read(jobKey);
  1969. PROGLOG("GraphGetResult: %s", jobKey.get());
  1970. CJobSlave *job = jobs.find(jobKey.get());
  1971. if (job)
  1972. {
  1973. graph_id gid;
  1974. msg.read(gid);
  1975. activity_id ownerId;
  1976. msg.read(ownerId);
  1977. unsigned resultId;
  1978. msg.read(resultId);
  1979. mptag_t replyTag = job->deserializeMPTag(msg);
  1980. msg.setReplyTag(replyTag);
  1981. msg.clear();
  1982. doReply = false;
  1983. for (unsigned c=0; c<job->queryJobChannels(); c++)
  1984. {
  1985. CJobChannel &jobChannel = job->queryJobChannel(c);
  1986. Owned<IThorResult> result = jobChannel.getOwnedResult(gid, ownerId, resultId);
  1987. Owned<IRowStream> resultStream = result->getRowStream();
  1988. sendInChunks(jobChannel.queryJobComm(), 0, replyTag, resultStream, result->queryRowInterfaces());
  1989. }
  1990. }
  1991. break;
  1992. }
  1993. case DebugRequest:
  1994. {
  1995. StringAttr jobKey;
  1996. msg.read(jobKey);
  1997. CJobSlave *job = jobs.find(jobKey.get());
  1998. if (job)
  1999. {
  2000. mptag_t replyTag = job->deserializeMPTag(msg);
  2001. msg.setReplyTag(replyTag);
  2002. StringAttr rawText;
  2003. msg.read(rawText);
  2004. PROGLOG("DebugRequest: %s %s", jobKey.get(), rawText.get());
  2005. msg.clear();
  2006. job->debugRequest(msg, rawText);
  2007. }
  2008. else
  2009. PROGLOG("DebugRequest: %s - Job not found", jobKey.get());
  2010. break;
  2011. }
  2012. default:
  2013. throwUnexpected();
  2014. }
  2015. }
  2016. catch (IException *e)
  2017. {
  2018. EXCLOG(e, NULL);
  2019. if (doReply && TAG_NULL != msg.getReplyTag())
  2020. {
  2021. doReply = false;
  2022. msg.clear();
  2023. msg.append(true);
  2024. serializeThorException(e, msg);
  2025. queryNodeComm().reply(msg);
  2026. }
  2027. e->Release();
  2028. }
  2029. if (doReply && msg.getReplyTag()!=TAG_NULL)
  2030. queryNodeComm().reply(msg);
  2031. }
  2032. }
  2033. friend class CThreadExceptionCatcher;
  2034. };
  2035. //////////////////////////
  2036. class CStringAttr : public StringAttr, public CSimpleInterface
  2037. {
  2038. public:
  2039. CStringAttr(const char *str) : StringAttr(str) { }
  2040. const char *queryFindString() const { return get(); }
  2041. };
  2042. class CFileInProgressHandler : public CSimpleInterface, implements IFileInProgressHandler
  2043. {
  2044. CriticalSection crit;
  2045. StringSuperHashTableOf<CStringAttr> lookup;
  2046. QueueOf<CStringAttr, false> fipList;
  2047. OwnedIFileIO iFileIO;
  2048. static const char *formatV;
  2049. void write()
  2050. {
  2051. if (0 == fipList.ordinality())
  2052. iFileIO->setSize(0);
  2053. else
  2054. {
  2055. Owned<IFileIOStream> stream = createBufferedIOStream(iFileIO);
  2056. stream->write(3, formatV); // 3 byte format definition, incase of change later
  2057. ForEachItemIn(i, fipList)
  2058. {
  2059. writeStringToStream(*stream, fipList.item(i)->get());
  2060. writeCharToStream(*stream, '\n');
  2061. }
  2062. offset_t pos = stream->tell();
  2063. stream.clear();
  2064. iFileIO->setSize(pos);
  2065. }
  2066. }
  2067. void doDelete(const char *fip)
  2068. {
  2069. OwnedIFile iFile = createIFile(fip);
  2070. try
  2071. {
  2072. iFile->remove();
  2073. }
  2074. catch (IException *e)
  2075. {
  2076. StringBuffer errStr("FileInProgressHandler, failed to remove: ");
  2077. EXCLOG(e, errStr.append(fip).str());
  2078. e->Release();
  2079. }
  2080. }
  2081. void backup(const char *dir, IFile *iFile)
  2082. {
  2083. StringBuffer origName(iFile->queryFilename());
  2084. StringBuffer bakName("fiplist_");
  2085. CDateTime dt;
  2086. dt.setNow();
  2087. bakName.append((unsigned)dt.getSimple()).append("_").append((unsigned)GetCurrentProcessId()).append(".bak");
  2088. iFileIO.clear(); // close old for rename
  2089. iFile->rename(bakName.str());
  2090. DBGLOG("Renamed to %s", bakName.str());
  2091. OwnedIFile newIFile = createIFile(origName);
  2092. iFileIO.setown(newIFile->open(IFOreadwrite)); // reopen
  2093. }
  2094. public:
  2095. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  2096. CFileInProgressHandler()
  2097. {
  2098. init();
  2099. }
  2100. ~CFileInProgressHandler()
  2101. {
  2102. deinit();
  2103. }
  2104. void deinit()
  2105. {
  2106. for (;;)
  2107. {
  2108. CStringAttr *item = fipList.dequeue();
  2109. if (!item) break;
  2110. doDelete(item->get());
  2111. item->Release();
  2112. }
  2113. lookup.kill();
  2114. }
  2115. void init()
  2116. {
  2117. StringBuffer dir;
  2118. globals->getProp("@thorPath", dir);
  2119. StringBuffer path(dir);
  2120. addPathSepChar(path);
  2121. path.append("fiplist_");
  2122. globals->getProp("@name", path);
  2123. path.append("_");
  2124. path.append(queryNodeGroup().rank(queryMyNode()));
  2125. path.append(".lst");
  2126. ensureDirectoryForFile(path.str());
  2127. Owned<IFile> iFile = createIFile(path.str());
  2128. iFileIO.setown(iFile->open(IFOreadwrite));
  2129. if (!iFileIO)
  2130. {
  2131. PROGLOG("Failed to open/create backup file: %s", path.str());
  2132. return;
  2133. }
  2134. MemoryBuffer mb;
  2135. size32_t sz = read(iFileIO, 0, (size32_t)iFileIO->size(), mb);
  2136. const char *mem = mb.toByteArray();
  2137. if (mem)
  2138. {
  2139. if (sz<=3)
  2140. {
  2141. IWARNLOG("Corrupt files-in-progress file detected: %s", path.str());
  2142. backup(dir, iFile);
  2143. }
  2144. else
  2145. {
  2146. const char *endMem = mem+mb.length();
  2147. mem += 3; // formatV header
  2148. do
  2149. {
  2150. const char *eol = strchr(mem, '\n');
  2151. if (!eol)
  2152. {
  2153. IWARNLOG("Corrupt files-in-progress file detected: %s", path.str());
  2154. backup(dir, iFile);
  2155. break;
  2156. }
  2157. StringAttr fip(mem, eol-mem);
  2158. doDelete(fip);
  2159. mem = eol+1;
  2160. }
  2161. while (mem != endMem);
  2162. }
  2163. }
  2164. write();
  2165. }
  2166. // IFileInProgressHandler
  2167. virtual void add(const char *fip)
  2168. {
  2169. CriticalBlock b(crit);
  2170. CStringAttr *item = lookup.find(fip);
  2171. assertex(!item);
  2172. item = new CStringAttr(fip);
  2173. fipList.enqueue(item);
  2174. lookup.add(* item);
  2175. write();
  2176. }
  2177. virtual void remove(const char *fip)
  2178. {
  2179. CriticalBlock b(crit);
  2180. CStringAttr *item = lookup.find(fip);
  2181. if (item)
  2182. {
  2183. lookup.removeExact(item);
  2184. fipList.dequeue(item);
  2185. item->Release();
  2186. write();
  2187. }
  2188. }
  2189. };
  2190. const char *CFileInProgressHandler::formatV = "01\n";
  2191. class CThorResourceSlave : public CThorResourceBase
  2192. {
  2193. Owned<IThorFileCache> fileCache;
  2194. Owned<IBackup> backupHandler;
  2195. Owned<IFileInProgressHandler> fipHandler;
  2196. Owned<IKJService> kjService;
  2197. public:
  2198. CThorResourceSlave()
  2199. {
  2200. backupHandler.setown(createBackupHandler());
  2201. fileCache.setown(createFileCache(globals->getPropInt("@fileCacheLimit", 1800)));
  2202. fipHandler.setown(new CFileInProgressHandler());
  2203. kjService.setown(new CKJService(kjServiceMpTag));
  2204. kjService->start();
  2205. }
  2206. ~CThorResourceSlave()
  2207. {
  2208. kjService.clear();
  2209. fileCache.clear();
  2210. backupHandler.clear();
  2211. fipHandler.clear();
  2212. }
  2213. virtual void beforeDispose() override
  2214. {
  2215. kjService->stop();
  2216. }
  2217. // IThorResource
  2218. virtual IThorFileCache &queryFileCache() override { return *fileCache.get(); }
  2219. virtual IBackup &queryBackup() override { return *backupHandler.get(); }
  2220. virtual IFileInProgressHandler &queryFileInProgressHandler() override { return *fipHandler.get(); }
  2221. virtual IKJService &queryKeyedJoinService() override { return *kjService.get(); }
  2222. };
  2223. void slaveMain(bool &jobListenerStopped, ILogMsgHandler *logHandler)
  2224. {
  2225. unsigned masterMemMB = globals->getPropInt("@masterTotalMem");
  2226. HardwareInfo hdwInfo;
  2227. getHardwareInfo(hdwInfo);
  2228. if (hdwInfo.totalMemory < masterMemMB)
  2229. OWARNLOG("Slave has less memory than master node");
  2230. CThorResourceSlave slaveResource;
  2231. CJobListener jobListener(jobListenerStopped);
  2232. setIThorResource(slaveResource);
  2233. #ifdef __linux__
  2234. bool useMirrorMount = getExpertOptBool("useMirrorMount", false);
  2235. if (useMirrorMount && queryNodeGroup().ordinality() > 2)
  2236. {
  2237. unsigned slaves = queryNodeGroup().ordinality()-1;
  2238. rank_t next = queryNodeGroup().rank()%slaves; // note 0 = master
  2239. const IpAddress &ip = queryNodeGroup().queryNode(next+1).endpoint();
  2240. StringBuffer ipStr;
  2241. ip.getIpText(ipStr);
  2242. PROGLOG("Redirecting local mount to %s", ipStr.str());
  2243. const char *replicateDirectory = queryBaseDirectory(grp_unknown, 1); // default directories configured at start up (see thslavemain.cpp)
  2244. setLocalMountRedirect(ip, replicateDirectory, "/mnt/mirror");
  2245. }
  2246. #endif
  2247. jobListener.slaveMain(logHandler);
  2248. }
  2249. void abortSlave()
  2250. {
  2251. if (clusterInitialized())
  2252. queryNodeComm().cancel(0, masterSlaveMpTag);
  2253. }