eclhelper_dyn.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jptree.hpp"
  15. #include "eclrtl.hpp"
  16. #include "eclhelper.hpp"
  17. #include "rtlds_imp.hpp"
  18. #include "eclhelper_base.hpp"
  19. #include "eclhelper_dyn.hpp"
  20. #include "rtlfield.hpp"
  21. #include "rtlrecord.hpp"
  22. #include "rtldynfield.hpp"
  23. #include "rtlkey.hpp"
  24. //---------------------------------------------------------------------------------------------------------------------
  25. /**
  26. * class CDeserializedOutputMetaData
  27. *
  28. * An implementation of IOutputMetaData for use with serialized rtlTypeInfo information
  29. *
  30. */
  31. class CDeserializedOutputMetaData : public COutputMetaData
  32. {
  33. public:
  34. CDeserializedOutputMetaData(MemoryBuffer &binInfo, bool isGrouped, IThorIndexCallback *callback);
  35. CDeserializedOutputMetaData(IPropertyTree &jsonInfo, bool isGrouped, IThorIndexCallback *callback);
  36. CDeserializedOutputMetaData(const char *json, bool isGrouped, IThorIndexCallback *callback);
  37. virtual const RtlTypeInfo * queryTypeInfo() const override { return typeInfo; }
  38. virtual unsigned getMetaFlags() override { return flags; }
  39. protected:
  40. Owned<IRtlFieldTypeDeserializer> deserializer;
  41. const RtlTypeInfo *typeInfo = nullptr;
  42. unsigned flags = MDFhasserialize|MDFhasxml;
  43. };
  44. CDeserializedOutputMetaData::CDeserializedOutputMetaData(MemoryBuffer &binInfo, bool isGrouped, IThorIndexCallback *callback)
  45. {
  46. deserializer.setown(createRtlFieldTypeDeserializer(callback));
  47. typeInfo = deserializer->deserialize(binInfo);
  48. if (isGrouped)
  49. flags |= MDFgrouped;
  50. }
  51. CDeserializedOutputMetaData::CDeserializedOutputMetaData(IPropertyTree &jsonInfo, bool isGrouped, IThorIndexCallback *callback)
  52. {
  53. deserializer.setown(createRtlFieldTypeDeserializer(callback));
  54. typeInfo = deserializer->deserialize(jsonInfo);
  55. if (isGrouped)
  56. flags |= MDFgrouped;
  57. }
  58. CDeserializedOutputMetaData::CDeserializedOutputMetaData(const char *json, bool isGrouped, IThorIndexCallback *callback)
  59. {
  60. deserializer.setown(createRtlFieldTypeDeserializer(callback));
  61. typeInfo = deserializer->deserialize(json);
  62. if (isGrouped)
  63. flags |= MDFgrouped;
  64. }
  65. extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(MemoryBuffer &binInfo, bool isGrouped, IThorIndexCallback *callback)
  66. {
  67. return new CDeserializedOutputMetaData(binInfo, isGrouped, callback);
  68. }
  69. extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(IPropertyTree &jsonInfo, bool isGrouped, IThorIndexCallback *callback)
  70. {
  71. return new CDeserializedOutputMetaData(jsonInfo, isGrouped, callback);
  72. }
  73. extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(const char *json, bool isGrouped, IThorIndexCallback *callback)
  74. {
  75. return new CDeserializedOutputMetaData(json, isGrouped, callback);
  76. }
  77. //---------------------------------------------------------------------------------------------------------------------
  78. static int compareOffsets(const unsigned *a, const unsigned *b)
  79. {
  80. if (*a < *b)
  81. return -1;
  82. else if (*a==*b)
  83. return 0;
  84. else
  85. return 1;
  86. }
  87. class ECLRTL_API CDynamicDiskReadArg : public CThorDiskReadArg
  88. {
  89. public:
  90. CDynamicDiskReadArg(const char *_fileName, IOutputMetaData *_in, IOutputMetaData *_out, unsigned __int64 _chooseN, unsigned __int64 _skipN, unsigned __int64 _rowLimit)
  91. : fileName(_fileName), in(_in), out(_out), chooseN(_chooseN), skipN(_skipN), rowLimit(_rowLimit)
  92. {
  93. translator.setown(createRecordTranslator(out->queryRecordAccessor(true), in->queryRecordAccessor(true)));
  94. }
  95. virtual bool needTransform() override
  96. {
  97. return true;
  98. }
  99. virtual unsigned getFlags() override
  100. {
  101. return flags;
  102. }
  103. virtual void createSegmentMonitors(IIndexReadContext *irc) override
  104. {
  105. filters.createSegmentMonitors(irc);
  106. }
  107. virtual IOutputMetaData * queryOutputMeta() override
  108. {
  109. return out;
  110. }
  111. virtual const char * getFileName() override final
  112. {
  113. return fileName;
  114. }
  115. virtual IOutputMetaData * queryDiskRecordSize() override final
  116. {
  117. return in;
  118. }
  119. virtual IOutputMetaData * queryProjectedDiskRecordSize() override final
  120. {
  121. return in;
  122. }
  123. virtual unsigned getFormatCrc() override
  124. {
  125. return 0; // engines should treat 0 as 'ignore'
  126. }
  127. virtual size32_t transform(ARowBuilder & rowBuilder, const void * src) override
  128. {
  129. return translator->translate(rowBuilder, (const byte *) src);
  130. }
  131. virtual unsigned __int64 getChooseNLimit() { return chooseN; }
  132. virtual unsigned __int64 getRowLimit() { return rowLimit; }
  133. void addFilter(const char *filter)
  134. {
  135. filters.addFilter(in->queryRecordAccessor(true), filter);
  136. flags |= TDRkeyed;
  137. }
  138. private:
  139. StringAttr fileName;
  140. unsigned flags = 0;
  141. Owned<IOutputMetaData> in;
  142. Owned<IOutputMetaData> out;
  143. Owned<const IDynamicTransform> translator;
  144. RowFilter filters;
  145. unsigned __int64 chooseN = I64C(0x7fffffffffffffff); // constant(s) should be commoned up somewhere
  146. unsigned __int64 skipN = 0;
  147. unsigned __int64 rowLimit = (unsigned __int64) -1;
  148. };
  149. class LegacyFilterSet
  150. {
  151. public:
  152. LegacyFilterSet(const RtlRecord &_inrec) : inrec(_inrec)
  153. {
  154. }
  155. void addFilter(const char *filter)
  156. {
  157. // Format of a filter is:
  158. // field[..n]: valuestring
  159. // value string format specifies ranges using a comma-separated list of ranges.
  160. // Each range is specified as paren lower, upper paren, where the paren is either ( or [ depending
  161. // on whether the specified bound is inclusive or exclusive.
  162. // If only one bound is specified then it is used for both upper and lower bound (only meaningful with [] )
  163. //
  164. // ( A means values > A - exclusive
  165. // [ means values >= A - inclusive
  166. // A ) means values < A - exclusive
  167. // A ] means values <= A - inclusive
  168. // For example:
  169. // [A] matches just A
  170. // (,A),(A,) matches all but A
  171. // (A] of [A) are both empty ranges
  172. // [A,B) means A*
  173. // Values use the ECL syntax for constants. String constants are always utf8. Binary use d'xx' format (hexpairs)
  174. // Note that binary serialization format is different
  175. assertex(filter);
  176. const char *epos = strpbrk(filter,"=~");
  177. if (!epos)
  178. throw MakeStringException(0, "Invalid filter string: expected = or ~ after fieldname");
  179. StringBuffer fieldName(epos-filter, filter);
  180. unsigned fieldNum;
  181. if (isdigit(fieldName[0]))
  182. fieldNum = atoi(fieldName);
  183. else
  184. fieldNum = inrec.getFieldNum(fieldName);
  185. if (fieldNum == (unsigned) -1)
  186. throw MakeStringException(0, "Invalid filter string: field '%s' not recognized", fieldName.str());
  187. unsigned numOffsets = inrec.getNumVarFields() + 1;
  188. unsigned fieldOffset = inrec.getFixedOffset(fieldNum);
  189. unsigned fieldSize = inrec.getFixedOffset(fieldNum+1) - fieldOffset;
  190. const RtlTypeInfo *fieldType = inrec.queryType(fieldNum);
  191. filter = epos+1;
  192. if (*filter=='~')
  193. {
  194. UNIMPLEMENTED; // use a regex?
  195. }
  196. else
  197. {
  198. MemoryBuffer lobuffer;
  199. MemoryBuffer hibuffer;
  200. Owned<IStringSet> filterSet = createStringSet(fieldSize);
  201. deserializeSet(*filterSet, inrec.getMinRecordSize(), fieldType, filter);
  202. while (filters.length()<=fieldNum)
  203. {
  204. filters.append(nullptr);
  205. unsigned dummyOffset = inrec.getFixedOffset(filters.length());
  206. filterOffsets.append(dummyOffset);
  207. filterSizes.append(inrec.getFixedOffset(filters.length()+1) - dummyOffset);
  208. }
  209. IStringSet *prev = filters.item(fieldNum);
  210. if (prev)
  211. filterSet.setown(prev->unionSet(filterSet)); // Debatable - would intersect be more appropriate?
  212. filters.replace(filterSet.getClear(), fieldNum);
  213. filterOffsets.replace(fieldOffset, fieldNum); // MORE - probably refactor this in a bit
  214. filterSizes.replace(fieldSize, fieldNum); // MORE - probably refactor this in a bit
  215. }
  216. }
  217. void createSegmentMonitors(IIndexReadContext *irc)
  218. {
  219. ForEachItemIn(idx, filters)
  220. {
  221. IStringSet *filter = filters.item(idx);
  222. if (filter)
  223. irc->append(createKeySegmentMonitor(false, LINK(filter), idx, filterOffsets.item(idx), filter->getSize()));
  224. }
  225. }
  226. void createSegmentMonitorsWithWild(IIndexReadContext *irc, unsigned keySize)
  227. {
  228. unsigned lastOffset = 0;
  229. ForEachItemIn(idx, filters)
  230. {
  231. IStringSet *filter = filters.item(idx);
  232. unsigned offset = filterOffsets.item(idx);
  233. unsigned size = filterSizes.item(idx);
  234. if (filter)
  235. {
  236. assertex(size == filter->getSize());
  237. irc->append(createKeySegmentMonitor(false, LINK(filter), idx, offset, size));
  238. }
  239. else
  240. irc->append(createWildKeySegmentMonitor(idx, offset, size)); // MORE - move this logic to irc::append ?
  241. }
  242. // MORE - trailing wild needs adding
  243. /*
  244. if (keySize > lastOffset)
  245. irc->append(createWildKeySegmentMonitor(lastOffset, keySize-lastOffset));
  246. */
  247. }
  248. protected:
  249. IPointerArrayOf<IStringSet> filters;
  250. UnsignedArray filterOffsets;
  251. UnsignedArray filterSizes;
  252. const RtlRecord &inrec;
  253. };
  254. class ECLRTL_API CDynamicIndexReadArg : public CThorIndexReadArg, implements IDynamicIndexReadArg
  255. {
  256. public:
  257. CDynamicIndexReadArg(const char *_fileName, IOutputMetaData *_in, IOutputMetaData *_out, unsigned __int64 _chooseN, unsigned __int64 _skipN, unsigned __int64 _rowLimit)
  258. : fileName(_fileName), in(_in), out(_out), chooseN(_chooseN), skipN(_skipN), rowLimit(_rowLimit), filters(in->queryRecordAccessor(true))
  259. {
  260. translator.setown(createRecordTranslator(out->queryRecordAccessor(true), in->queryRecordAccessor(true)));
  261. if (!translator->canTranslate())
  262. {
  263. translator->describe();
  264. throw makeStringException(0, "Translation not possible");
  265. }
  266. }
  267. virtual bool needTransform() override
  268. {
  269. return true;
  270. }
  271. virtual unsigned getFlags() override
  272. {
  273. return flags;
  274. }
  275. virtual void createSegmentMonitors(IIndexReadContext *irc) override
  276. {
  277. filters.createSegmentMonitorsWithWild(irc, 0); // Should be the total keyed size, but that's not available. And probably should not really be needed. Why should I create trailing wildsegs?
  278. }
  279. virtual IOutputMetaData * queryOutputMeta() override
  280. {
  281. return out;
  282. }
  283. virtual const char * getFileName() override final
  284. {
  285. return fileName;
  286. }
  287. virtual IOutputMetaData * queryDiskRecordSize() override final
  288. {
  289. return in;
  290. }
  291. virtual IOutputMetaData * queryProjectedDiskRecordSize() override final
  292. {
  293. return in;
  294. }
  295. virtual unsigned getFormatCrc() override
  296. {
  297. return 0; // engines should treat 0 as 'ignore'
  298. }
  299. virtual size32_t transform(ARowBuilder & rowBuilder, const void * src) override
  300. {
  301. return translator->translate(rowBuilder, (const byte *) src);
  302. }
  303. virtual unsigned __int64 getChooseNLimit() { return chooseN; }
  304. virtual unsigned __int64 getRowLimit() { return rowLimit; }
  305. virtual void addFilter(const char *filter) override
  306. {
  307. filters.addFilter(filter);
  308. }
  309. private:
  310. StringAttr fileName;
  311. unsigned flags = 0;
  312. Owned<IOutputMetaData> in;
  313. Owned<IOutputMetaData> out;
  314. Owned<const IDynamicTransform> translator;
  315. LegacyFilterSet filters;
  316. unsigned __int64 chooseN = I64C(0x7fffffffffffffff); // constant(s) should be commoned up somewhere
  317. unsigned __int64 skipN = 0;
  318. unsigned __int64 rowLimit = (unsigned __int64) -1;
  319. };
  320. class ECLRTL_API CDynamicWorkUnitWriteArg : public CThorWorkUnitWriteArg
  321. {
  322. public:
  323. CDynamicWorkUnitWriteArg(IOutputMetaData *_in) : in(_in)
  324. {
  325. }
  326. virtual int getSequence() override final { return 0; }
  327. virtual IOutputMetaData * queryOutputMeta() override final { return in; }
  328. private:
  329. Owned<IOutputMetaData> in;
  330. };
  331. static IOutputMetaData *loadTypeInfo(IPropertyTree &xgmml, const char *key)
  332. {
  333. StringBuffer xpath;
  334. MemoryBuffer binInfo;
  335. xgmml.getPropBin(xpath.setf("att[@name='%s_binary']/value", key), binInfo);
  336. assertex(binInfo.length());
  337. bool grouped = xgmml.getPropBool(xpath.setf("att[@name='%s_binary']/value", key), false);
  338. return new CDeserializedOutputMetaData(binInfo, grouped, nullptr);
  339. }
  340. extern ECLRTL_API IHThorDiskReadArg *createDiskReadArg(IPropertyTree &xgmml)
  341. {
  342. Owned <IOutputMetaData> in = loadTypeInfo(xgmml, "input");
  343. Owned <IOutputMetaData> out = loadTypeInfo(xgmml, "output");
  344. const char *fileName = xgmml.queryProp("att[@name=\"_fileName\"]/@value");
  345. unsigned __int64 chooseN = xgmml.getPropInt64("att[@name=\"chooseN\"]/@value", -1);
  346. unsigned __int64 skipN = xgmml.getPropInt64("att[@name=\"skipN\"]/@value", -1);
  347. unsigned __int64 rowLimit = xgmml.getPropInt64("att[@name=\"rowLimit\"]/@value", -1);
  348. Owned<CDynamicDiskReadArg> ret = new CDynamicDiskReadArg(fileName, in.getClear(), out.getClear(), chooseN, skipN, rowLimit);
  349. Owned<IPropertyTreeIterator> filters = xgmml.getElements("att[@name=\"keyfilter\"]");
  350. ForEach(*filters)
  351. ret->addFilter(filters->query().queryProp("@value"));
  352. return ret.getClear();
  353. }
  354. extern ECLRTL_API IHThorDiskReadArg *createDiskReadArg(const char *fileName, IOutputMetaData *in, IOutputMetaData *out, unsigned __int64 chooseN, unsigned __int64 skipN, unsigned __int64 rowLimit)
  355. {
  356. return new CDynamicDiskReadArg(fileName, in, out, chooseN, skipN, rowLimit);
  357. }
  358. extern ECLRTL_API IHThorIndexReadArg *createIndexReadArg(const char *fileName, IOutputMetaData *in, IOutputMetaData *out, unsigned __int64 chooseN, unsigned __int64 skipN, unsigned __int64 rowLimit)
  359. {
  360. return new CDynamicIndexReadArg(fileName, in, out, chooseN, skipN, rowLimit);
  361. }
  362. extern ECLRTL_API IHThorArg *createWorkunitWriteArg(IPropertyTree &xgmml)
  363. {
  364. Owned <IOutputMetaData> in = loadTypeInfo(xgmml, "input");
  365. return new CDynamicWorkUnitWriteArg(in.getClear());
  366. }
  367. struct ECLRTL_API DynamicEclProcess : public EclProcess {
  368. virtual unsigned getActivityVersion() const override { return ACTIVITY_INTERFACE_VERSION; }
  369. virtual int perform(IGlobalCodeContext * gctx, unsigned wfid) override {
  370. ICodeContext * ctx;
  371. ctx = gctx->queryCodeContext();
  372. ctx->executeGraph("graph1",false,0,NULL);
  373. return 1U;
  374. }
  375. };
  376. extern ECLRTL_API IEclProcess* createDynamicEclProcess()
  377. {
  378. return new DynamicEclProcess;
  379. }