eclhelper_dyn.cpp 15 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jptree.hpp"
  15. #include "eclrtl.hpp"
  16. #include "eclhelper.hpp"
  17. #include "rtlds_imp.hpp"
  18. #include "eclhelper_base.hpp"
  19. #include "eclhelper_dyn.hpp"
  20. #include "rtlfield.hpp"
  21. #include "rtlrecord.hpp"
  22. #include "rtldynfield.hpp"
  23. #include "rtlkey.hpp"
  24. //---------------------------------------------------------------------------------------------------------------------
  25. /**
  26. * class CDeserializedOutputMetaData
  27. *
  28. * An implementation of IOutputMetaData for use with serialized rtlTypeInfo information
  29. *
  30. */
  31. class CDeserializedOutputMetaData : public COutputMetaData
  32. {
  33. public:
  34. CDeserializedOutputMetaData(MemoryBuffer &binInfo, bool isGrouped, IThorIndexCallback *callback);
  35. CDeserializedOutputMetaData(IPropertyTree &jsonInfo, IThorIndexCallback *callback);
  36. CDeserializedOutputMetaData(const char *json, IThorIndexCallback *callback);
  37. virtual const RtlTypeInfo * queryTypeInfo() const override { return typeInfo; }
  38. virtual unsigned getMetaFlags() override { return flags; }
  39. protected:
  40. Owned<IRtlFieldTypeDeserializer> deserializer;
  41. const RtlTypeInfo *typeInfo = nullptr;
  42. unsigned flags = MDFhasserialize|MDFhasxml;
  43. };
  44. CDeserializedOutputMetaData::CDeserializedOutputMetaData(MemoryBuffer &binInfo, bool isGrouped, IThorIndexCallback *callback)
  45. {
  46. deserializer.setown(createRtlFieldTypeDeserializer(callback));
  47. typeInfo = deserializer->deserialize(binInfo);
  48. if (isGrouped)
  49. flags |= MDFgrouped;
  50. }
  51. CDeserializedOutputMetaData::CDeserializedOutputMetaData(IPropertyTree &jsonInfo, IThorIndexCallback *callback)
  52. {
  53. deserializer.setown(createRtlFieldTypeDeserializer(callback));
  54. typeInfo = deserializer->deserialize(jsonInfo);
  55. }
  56. CDeserializedOutputMetaData::CDeserializedOutputMetaData(const char *json, IThorIndexCallback *callback)
  57. {
  58. deserializer.setown(createRtlFieldTypeDeserializer(callback));
  59. typeInfo = deserializer->deserialize(json);
  60. }
  61. extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(MemoryBuffer &binInfo, bool isGrouped, IThorIndexCallback *callback)
  62. {
  63. return new CDeserializedOutputMetaData(binInfo, isGrouped, callback);
  64. }
  65. extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(IPropertyTree &jsonInfo, IThorIndexCallback *callback)
  66. {
  67. return new CDeserializedOutputMetaData(jsonInfo, callback);
  68. }
  69. extern ECLRTL_API IOutputMetaData *createTypeInfoOutputMetaData(const char *json, IThorIndexCallback *callback)
  70. {
  71. return new CDeserializedOutputMetaData(json, callback);
  72. }
  73. //---------------------------------------------------------------------------------------------------------------------
  74. static int compareOffsets(const unsigned *a, const unsigned *b)
  75. {
  76. if (*a < *b)
  77. return -1;
  78. else if (*a==*b)
  79. return 0;
  80. else
  81. return 1;
  82. }
  83. class FilterSet
  84. {
  85. public:
  86. FilterSet(const RtlRecord &_inrec) : inrec(_inrec)
  87. {
  88. }
  89. void addFilter(const char *filter)
  90. {
  91. // Format of a filter is:
  92. // field[..n]: valuestring
  93. // value string format specifies ranges using a comma-separated list of ranges.
  94. // Each range is specified as paren lower, upper paren, where the paren is either ( or [ depending
  95. // on whether the specified bound is inclusive or exclusive.
  96. // If only one bound is specified then it is used for both upper and lower bound (only meaningful with [] )
  97. //
  98. // ( A means values > A - exclusive
  99. // [ means values >= A - inclusive
  100. // A ) means values < A - exclusive
  101. // A ] means values <= A - inclusive
  102. // For example:
  103. // [A] matches just A
  104. // (,A),(A,) matches all but A
  105. // (A] of [A) are both empty ranges
  106. // [A,B) means A*
  107. // Values use the ECL syntax for constants. String constants are always utf8. Binary use d'xx' format (hexpairs)
  108. // Note that binary serialization format is different
  109. assertex(filter);
  110. const char *epos = strpbrk(filter,"=~");
  111. if (!epos)
  112. throw MakeStringException(0, "Invalid filter string: expected = or ~ after fieldname");
  113. StringBuffer fieldName(epos-filter, filter);
  114. unsigned fieldNum = inrec.getFieldNum(fieldName);
  115. if (fieldNum == (unsigned) -1)
  116. throw MakeStringException(0, "Invalid filter string: field '%s' not recognized", fieldName.str());
  117. unsigned numOffsets = inrec.getNumVarFields() + 1;
  118. size_t * variableOffsets = (size_t *)alloca(numOffsets * sizeof(size_t));
  119. RtlRow offsetCalculator(inrec, nullptr, numOffsets, variableOffsets);
  120. unsigned fieldOffset = offsetCalculator.getOffset(fieldNum);
  121. unsigned fieldSize = offsetCalculator.getSize(fieldNum);
  122. const RtlTypeInfo *fieldType = inrec.queryType(fieldNum);
  123. filter = epos+1;
  124. if (*filter=='~')
  125. {
  126. UNIMPLEMENTED; // use a regex?
  127. }
  128. else
  129. {
  130. MemoryBuffer lobuffer;
  131. MemoryBuffer hibuffer;
  132. Owned<IStringSet> filterSet = createStringSet(fieldSize);
  133. deserializeSet(*filterSet, inrec.getMinRecordSize(), fieldType, filter);
  134. while (filters.length()<=fieldNum)
  135. {
  136. filters.append(nullptr);
  137. filterOffsets.append(offsetCalculator.getOffset(filters.length()));
  138. filterSizes.append(offsetCalculator.getSize(filters.length()));
  139. }
  140. IStringSet *prev = filters.item(fieldNum);
  141. if (prev)
  142. filterSet.setown(prev->unionSet(filterSet)); // Debatable - would intersect be more appropriate?
  143. filters.replace(filterSet.getClear(), fieldNum);
  144. filterOffsets.replace(fieldOffset, fieldNum); // MORE - probably refactor this in a bit
  145. filterSizes.replace(fieldSize, fieldNum); // MORE - probably refactor this in a bit
  146. }
  147. }
  148. void createSegmentMonitors(IIndexReadContext *irc)
  149. {
  150. ForEachItemIn(idx, filters)
  151. {
  152. IStringSet *filter = filters.item(idx);
  153. if (filter)
  154. irc->append(createKeySegmentMonitor(false, LINK(filter), idx, filterOffsets.item(idx), filter->getSize()));
  155. }
  156. }
  157. void createSegmentMonitorsWithWild(IIndexReadContext *irc, unsigned keySize)
  158. {
  159. unsigned lastOffset = 0;
  160. ForEachItemIn(idx, filters)
  161. {
  162. IStringSet *filter = filters.item(idx);
  163. unsigned offset = filterOffsets.item(idx);
  164. unsigned size = filterSizes.item(idx);
  165. if (filter)
  166. {
  167. assertex(size = filter->getSize());
  168. irc->append(createKeySegmentMonitor(false, LINK(filter), idx, offset, size));
  169. }
  170. else
  171. irc->append(createWildKeySegmentMonitor(idx, offset, size)); // MORE - move this logic to irc::append ?
  172. }
  173. // MORE - trailing wild needs adding
  174. /*
  175. if (keySize > lastOffset)
  176. irc->append(createWildKeySegmentMonitor(lastOffset, keySize-lastOffset));
  177. */
  178. }
  179. protected:
  180. IPointerArrayOf<IStringSet> filters;
  181. UnsignedArray filterOffsets;
  182. UnsignedArray filterSizes;
  183. const RtlRecord &inrec;
  184. };
  185. class ECLRTL_API CDynamicDiskReadArg : public CThorDiskReadArg
  186. {
  187. public:
  188. CDynamicDiskReadArg(const char *_fileName, IOutputMetaData *_in, IOutputMetaData *_out, unsigned __int64 _chooseN, unsigned __int64 _skipN, unsigned __int64 _rowLimit)
  189. : fileName(_fileName), in(_in), out(_out), chooseN(_chooseN), skipN(_skipN), rowLimit(_rowLimit), filters(in->queryRecordAccessor(true))
  190. {
  191. translator.setown(createRecordTranslator(out->queryRecordAccessor(true), in->queryRecordAccessor(true)));
  192. }
  193. virtual bool needTransform() override
  194. {
  195. return true;
  196. }
  197. virtual unsigned getFlags() override
  198. {
  199. return flags;
  200. }
  201. virtual void createSegmentMonitors(IIndexReadContext *irc) override
  202. {
  203. filters.createSegmentMonitors(irc);
  204. }
  205. virtual IOutputMetaData * queryOutputMeta() override
  206. {
  207. return out;
  208. }
  209. virtual const char * getFileName() override final
  210. {
  211. return fileName;
  212. }
  213. virtual IOutputMetaData * queryDiskRecordSize() override final
  214. {
  215. return in;
  216. }
  217. virtual IOutputMetaData * queryProjectedDiskRecordSize() override final
  218. {
  219. return in;
  220. }
  221. virtual unsigned getFormatCrc() override
  222. {
  223. return 0; // engines should treat 0 as 'ignore'
  224. }
  225. virtual size32_t transform(ARowBuilder & rowBuilder, const void * src) override
  226. {
  227. return translator->translate(rowBuilder, (const byte *) src);
  228. }
  229. virtual unsigned __int64 getChooseNLimit() { return chooseN; }
  230. virtual unsigned __int64 getRowLimit() { return rowLimit; }
  231. void addFilter(const char *filter)
  232. {
  233. filters.addFilter(filter);
  234. flags |= TDRkeyed;
  235. }
  236. private:
  237. StringAttr fileName;
  238. unsigned flags = 0;
  239. Owned<IOutputMetaData> in;
  240. Owned<IOutputMetaData> out;
  241. Owned<const IDynamicTransform> translator;
  242. FilterSet filters;
  243. unsigned __int64 chooseN = I64C(0x7fffffffffffffff); // constant(s) should be commoned up somewhere
  244. unsigned __int64 skipN = 0;
  245. unsigned __int64 rowLimit = (unsigned __int64) -1;
  246. };
  247. class ECLRTL_API CDynamicIndexReadArg : public CThorIndexReadArg, implements IDynamicIndexReadArg
  248. {
  249. public:
  250. CDynamicIndexReadArg(const char *_fileName, IOutputMetaData *_in, IOutputMetaData *_out, unsigned __int64 _chooseN, unsigned __int64 _skipN, unsigned __int64 _rowLimit)
  251. : fileName(_fileName), in(_in), out(_out), chooseN(_chooseN), skipN(_skipN), rowLimit(_rowLimit), filters(in->queryRecordAccessor(true))
  252. {
  253. translator.setown(createRecordTranslator(out->queryRecordAccessor(true), in->queryRecordAccessor(true)));
  254. if (!translator->canTranslate())
  255. {
  256. translator->describe();
  257. throw makeStringException(0, "Translation not possible");
  258. }
  259. }
  260. virtual bool needTransform() override
  261. {
  262. return true;
  263. }
  264. virtual unsigned getFlags() override
  265. {
  266. return flags;
  267. }
  268. virtual void createSegmentMonitors(IIndexReadContext *irc) override
  269. {
  270. filters.createSegmentMonitorsWithWild(irc, 0); // Should be the total keyed size, but that's not available. And probably should not really be needed. Why should I create trailing wildsegs?
  271. }
  272. virtual IOutputMetaData * queryOutputMeta() override
  273. {
  274. return out;
  275. }
  276. virtual const char * getFileName() override final
  277. {
  278. return fileName;
  279. }
  280. virtual IOutputMetaData * queryDiskRecordSize() override final
  281. {
  282. return in;
  283. }
  284. virtual IOutputMetaData * queryProjectedDiskRecordSize() override final
  285. {
  286. return in;
  287. }
  288. virtual unsigned getFormatCrc() override
  289. {
  290. return 0; // engines should treat 0 as 'ignore'
  291. }
  292. virtual size32_t transform(ARowBuilder & rowBuilder, const void * src) override
  293. {
  294. return translator->translate(rowBuilder, (const byte *) src);
  295. }
  296. virtual unsigned __int64 getChooseNLimit() { return chooseN; }
  297. virtual unsigned __int64 getRowLimit() { return rowLimit; }
  298. virtual void addFilter(const char *filter) override
  299. {
  300. filters.addFilter(filter);
  301. }
  302. private:
  303. StringAttr fileName;
  304. unsigned flags = 0;
  305. Owned<IOutputMetaData> in;
  306. Owned<IOutputMetaData> out;
  307. Owned<const IDynamicTransform> translator;
  308. FilterSet filters;
  309. unsigned __int64 chooseN = I64C(0x7fffffffffffffff); // constant(s) should be commoned up somewhere
  310. unsigned __int64 skipN = 0;
  311. unsigned __int64 rowLimit = (unsigned __int64) -1;
  312. };
  313. class ECLRTL_API CDynamicWorkUnitWriteArg : public CThorWorkUnitWriteArg
  314. {
  315. public:
  316. CDynamicWorkUnitWriteArg(IOutputMetaData *_in) : in(_in)
  317. {
  318. }
  319. virtual int getSequence() override final { return 0; }
  320. virtual IOutputMetaData * queryOutputMeta() override final { return in; }
  321. private:
  322. Owned<IOutputMetaData> in;
  323. };
  324. static IOutputMetaData *loadTypeInfo(IPropertyTree &xgmml, const char *key)
  325. {
  326. StringBuffer xpath;
  327. MemoryBuffer binInfo;
  328. xgmml.getPropBin(xpath.setf("att[@name='%s_binary']/value", key), binInfo);
  329. assertex(binInfo.length());
  330. bool grouped = xgmml.getPropBool(xpath.setf("att[@name='%s_binary']/value", key), false);
  331. return new CDeserializedOutputMetaData(binInfo, grouped, nullptr);
  332. }
  333. extern ECLRTL_API IHThorDiskReadArg *createDiskReadArg(IPropertyTree &xgmml)
  334. {
  335. Owned <IOutputMetaData> in = loadTypeInfo(xgmml, "input");
  336. Owned <IOutputMetaData> out = loadTypeInfo(xgmml, "output");
  337. const char *fileName = xgmml.queryProp("att[@name=\"_fileName\"]/@value");
  338. unsigned __int64 chooseN = xgmml.getPropInt64("att[@name=\"chooseN\"]/@value", -1);
  339. unsigned __int64 skipN = xgmml.getPropInt64("att[@name=\"skipN\"]/@value", -1);
  340. unsigned __int64 rowLimit = xgmml.getPropInt64("att[@name=\"rowLimit\"]/@value", -1);
  341. Owned<CDynamicDiskReadArg> ret = new CDynamicDiskReadArg(fileName, in.getClear(), out.getClear(), chooseN, skipN, rowLimit);
  342. Owned<IPropertyTreeIterator> filters = xgmml.getElements("att[@name=\"keyfilter\"]");
  343. ForEach(*filters)
  344. ret->addFilter(filters->query().queryProp("@value"));
  345. return ret.getClear();
  346. }
  347. extern ECLRTL_API IHThorDiskReadArg *createDiskReadArg(const char *fileName, IOutputMetaData *in, IOutputMetaData *out, unsigned __int64 chooseN, unsigned __int64 skipN, unsigned __int64 rowLimit)
  348. {
  349. return new CDynamicDiskReadArg(fileName, in, out, chooseN, skipN, rowLimit);
  350. }
  351. extern ECLRTL_API IHThorIndexReadArg *createIndexReadArg(const char *fileName, IOutputMetaData *in, IOutputMetaData *out, unsigned __int64 chooseN, unsigned __int64 skipN, unsigned __int64 rowLimit)
  352. {
  353. return new CDynamicIndexReadArg(fileName, in, out, chooseN, skipN, rowLimit);
  354. }
  355. extern ECLRTL_API IHThorArg *createWorkunitWriteArg(IPropertyTree &xgmml)
  356. {
  357. Owned <IOutputMetaData> in = loadTypeInfo(xgmml, "input");
  358. return new CDynamicWorkUnitWriteArg(in.getClear());
  359. }
  360. struct ECLRTL_API DynamicEclProcess : public EclProcess {
  361. virtual unsigned getActivityVersion() const override { return ACTIVITY_INTERFACE_VERSION; }
  362. virtual int perform(IGlobalCodeContext * gctx, unsigned wfid) override {
  363. ICodeContext * ctx;
  364. ctx = gctx->queryCodeContext();
  365. ctx->executeGraph("graph1",false,0,NULL);
  366. return 1U;
  367. }
  368. };
  369. extern ECLRTL_API IEclProcess* createDynamicEclProcess()
  370. {
  371. return new DynamicEclProcess;
  372. }