eclhelper_dyn.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jptree.hpp"
  15. #include "eclrtl.hpp"
  16. #include "eclhelper.hpp"
  17. #include "rtlds_imp.hpp"
  18. #include "eclhelper_base.hpp"
  19. #include "eclhelper_dyn.hpp"
  20. #include "rtlfield.hpp"
  21. #include "rtlrecord.hpp"
  22. #include "rtldynfield.hpp"
  23. #include "rtlkey.hpp"
  24. //---------------------------------------------------------------------------
  25. static void readString(StringBuffer &out, const char * &in)
  26. {
  27. for (;;)
  28. {
  29. char c = *in++;
  30. if (!c)
  31. throw MakeStringException(0, "Invalid filter - missing closing '");
  32. if (c=='\'')
  33. break;
  34. if (c=='\\')
  35. UNIMPLEMENTED;
  36. out.append(c);
  37. }
  38. }
  39. class ECLRTL_API CDynamicDiskReadArg : public CThorDiskReadArg
  40. {
  41. public:
  42. CDynamicDiskReadArg(const char *_fileName, IOutputMetaData *_in, IOutputMetaData *_out, unsigned __int64 _chooseN, unsigned __int64 _skipN, unsigned __int64 _rowLimit)
  43. : fileName(_fileName), in(_in), out(_out), chooseN(_chooseN), skipN(_skipN), rowLimit(_rowLimit)
  44. {
  45. inrec = &in->queryRecordAccessor(true);
  46. numOffsets = inrec->getNumVarFields() + 1;
  47. translator.setown(createRecordTranslator(queryOutputMeta()->queryRecordAccessor(true), *inrec));
  48. }
  49. virtual bool needTransform() override
  50. {
  51. return true;
  52. //return translator->needsTranslate(); might be more appropriate?
  53. }
  54. virtual unsigned getFlags() override
  55. {
  56. return flags;
  57. }
  58. virtual void createSegmentMonitors(IIndexReadContext *irc) override
  59. {
  60. ForEachItemIn(idx, filters)
  61. {
  62. IStringSet &filter = filters.item(idx);
  63. irc->append(createKeySegmentMonitor(false, LINK(&filter), filterOffsets.item(idx), filter.getSize()));
  64. }
  65. }
  66. virtual IOutputMetaData * queryOutputMeta() override
  67. {
  68. return out;
  69. }
  70. virtual const char * getFileName() override final
  71. {
  72. return fileName;
  73. }
  74. virtual IOutputMetaData * queryDiskRecordSize() override final
  75. {
  76. return in;
  77. }
  78. virtual unsigned getFormatCrc() override
  79. {
  80. return 0; // engines should treat 0 as 'ignore'
  81. }
  82. virtual size32_t transform(ARowBuilder & rowBuilder, const void * src) override
  83. {
  84. return translator->translate(rowBuilder, (const byte *) src);
  85. }
  86. virtual unsigned __int64 getChooseNLimit() { return chooseN; }
  87. virtual unsigned __int64 getRowLimit() { return rowLimit; }
  88. void addFilter(const char *filter)
  89. {
  90. // Format of a filter is:
  91. // field[..n]: valuestring
  92. // value string format specifies ranges using a comma-separated list of ranges.
  93. // Each range is specified as paren lower, upper paren, where the paren is either ( or [ depending
  94. // on whether the specified bound is inclusive or exclusive.
  95. // If only one bound is specified then it is used for both upper and lower bound (only meaningful with [] )
  96. //
  97. // ( A means values > A - exclusive
  98. // [ means values >= A - inclusive
  99. // A ) means values < A - exclusive
  100. // A ] means values <= A - inclusive
  101. // For example:
  102. // [A] matches just A
  103. // (,A),(A,) matches all but A
  104. // (A] of [A) are both empty ranges
  105. // [A,B) means A*
  106. // Values use the ECL syntax for constants. String constants are always utf8. Binary use d'xx' format (hexpairs)
  107. // Note that binary serialization format is different
  108. assertex(filter);
  109. const char *epos = strchr(filter,'=');
  110. assertex(epos);
  111. StringBuffer fieldName(epos-filter, filter);
  112. unsigned fieldNum = inrec->getFieldNum(fieldName);
  113. assertex(fieldNum != (unsigned) -1);
  114. size_t * variableOffsets = (size_t *)alloca(numOffsets * sizeof(size_t));
  115. RtlRow offsetCalculator(*inrec, nullptr, numOffsets, variableOffsets);
  116. unsigned fieldOffset = offsetCalculator.getOffset(fieldNum);
  117. unsigned fieldSize = offsetCalculator.getSize(fieldNum);
  118. const RtlTypeInfo *fieldType = inrec->queryType(fieldNum);
  119. filter = epos+1;
  120. if (*filter=='~')
  121. {
  122. UNIMPLEMENTED; // use a regex?
  123. }
  124. else
  125. {
  126. MemoryBuffer lobuffer;
  127. MemoryBuffer hibuffer;
  128. Owned<IStringSet> filterSet = createStringSet(fieldSize);
  129. while (*filter)
  130. {
  131. char startRange = *filter++;
  132. if (startRange != '(' && startRange != '[')
  133. throw MakeStringException(0, "Invalid filter string: expected [ or ( at start of range");
  134. // Now we expect a constant - type depends on type of field. Assume string or int for now
  135. StringBuffer upperString, lowerString;
  136. if (*filter=='\'')
  137. {
  138. filter++;
  139. readString(lowerString, filter);
  140. }
  141. else
  142. UNIMPLEMENTED; // lowerInt = readInt(curFilter);
  143. if (*filter == ',')
  144. {
  145. filter++;
  146. if (*filter=='\'')
  147. {
  148. filter++;
  149. readString(upperString, filter);
  150. }
  151. else
  152. UNIMPLEMENTED; //upperInt = readInt(curFilter);
  153. }
  154. else
  155. upperString.set(lowerString);
  156. char endRange = *filter++;
  157. if (endRange != ')' && endRange != ']')
  158. throw MakeStringException(0, "Invalid filter string: expected ] or ) at end of range");
  159. if (*filter==',')
  160. filter++;
  161. else if (*filter)
  162. throw MakeStringException(0, "Invalid filter string: expected , between ranges");
  163. MemoryBufferBuilder lobuilder(lobuffer.clear(), inrec->getMinRecordSize());
  164. fieldType->buildUtf8(lobuilder, 0, inrec->queryField(fieldNum), lowerString.length(), lowerString.str());
  165. MemoryBufferBuilder hibuilder(hibuffer.clear(), inrec->getMinRecordSize());
  166. fieldType->buildUtf8(hibuilder, 0, inrec->queryField(fieldNum), upperString.length(), upperString.str());
  167. filterSet->addRange(lobuffer.toByteArray(), hibuffer.toByteArray());
  168. if (startRange=='(')
  169. filterSet->killRange(lobuffer.toByteArray(), lobuffer.toByteArray());
  170. if (endRange==')')
  171. filterSet->killRange(hibuffer.toByteArray(), hibuffer.toByteArray());
  172. }
  173. filters.append(*filterSet.getClear());
  174. filterOffsets.append(fieldOffset);
  175. flags |= TDRkeyed;
  176. }
  177. }
  178. private:
  179. StringAttr fileName;
  180. unsigned numOffsets = 0;
  181. unsigned flags = 0;
  182. Owned<IRtlFieldTypeDeserializer> indeserializer; // Owns the resulting ITypeInfo structures, so needs to be kept around
  183. Owned<IRtlFieldTypeDeserializer> outdeserializer; // Owns the resulting ITypeInfo structures, so needs to be kept around
  184. Owned<IOutputMetaData> in;
  185. Owned<IOutputMetaData> out;
  186. IArrayOf<IStringSet> filters;
  187. UnsignedArray filterOffsets;
  188. const RtlRecord *inrec = nullptr;
  189. Owned<const IDynamicTransform> translator;
  190. unsigned __int64 chooseN = I64C(0x7fffffffffffffff); // constant(s) should be commoned up somewhere
  191. unsigned __int64 skipN = 0;
  192. unsigned __int64 rowLimit = (unsigned __int64) -1;
  193. };
  194. class ECLRTL_API CDynamicWorkUnitWriteArg : public CThorWorkUnitWriteArg
  195. {
  196. public:
  197. CDynamicWorkUnitWriteArg(IOutputMetaData *_in) : in(_in)
  198. {
  199. }
  200. virtual int getSequence() override final { return 0; }
  201. virtual IOutputMetaData * queryOutputMeta() override final { return in; }
  202. private:
  203. Owned<IOutputMetaData> in;
  204. };
  205. static IOutputMetaData *loadTypeInfo(IPropertyTree &xgmml, const char *key)
  206. {
  207. StringBuffer xpath;
  208. MemoryBuffer binInfo;
  209. xgmml.getPropBin(xpath.setf("att[@name='%s_binary']/value", key), binInfo);
  210. assertex(binInfo.length());
  211. return new CDeserializedOutputMetaData(binInfo);
  212. }
  213. extern ECLRTL_API IHThorDiskReadArg *createDiskReadArg(IPropertyTree &xgmml)
  214. {
  215. Owned <IOutputMetaData> in = loadTypeInfo(xgmml, "input");
  216. Owned <IOutputMetaData> out = loadTypeInfo(xgmml, "output");
  217. const char *fileName = xgmml.queryProp("att[@name=\"_fileName\"]/@value");
  218. unsigned __int64 chooseN = xgmml.getPropInt64("att[@name=\"chooseN\"]/@value", -1);
  219. unsigned __int64 skipN = xgmml.getPropInt64("att[@name=\"skipN\"]/@value", -1);
  220. unsigned __int64 rowLimit = xgmml.getPropInt64("att[@name=\"rowLimit\"]/@value", -1);
  221. Owned<CDynamicDiskReadArg> ret = new CDynamicDiskReadArg(fileName, in.getClear(), out.getClear(), chooseN, skipN, rowLimit);
  222. Owned<IPropertyTreeIterator> filters = xgmml.getElements("att[@name=\"keyfilter\"]");
  223. ForEach(*filters)
  224. ret->addFilter(filters->query().queryProp("@value"));
  225. return ret.getClear();
  226. }
  227. extern ECLRTL_API IHThorDiskReadArg *createDiskReadArg(const char *fileName, IOutputMetaData *in, IOutputMetaData *out, unsigned __int64 chooseN, unsigned __int64 skipN, unsigned __int64 rowLimit)
  228. {
  229. return new CDynamicDiskReadArg(fileName, in, out, chooseN, skipN, rowLimit);
  230. }
  231. extern ECLRTL_API IHThorArg *createWorkunitWriteArg(IPropertyTree &xgmml)
  232. {
  233. Owned <IOutputMetaData> in = loadTypeInfo(xgmml, "input");
  234. return new CDynamicWorkUnitWriteArg(in.getClear());
  235. }
  236. struct ECLRTL_API DynamicEclProcess : public EclProcess {
  237. virtual unsigned getActivityVersion() const override { return ACTIVITY_INTERFACE_VERSION; }
  238. virtual int perform(IGlobalCodeContext * gctx, unsigned wfid) override {
  239. ICodeContext * ctx;
  240. ctx = gctx->queryCodeContext();
  241. ctx->executeGraph("graph1",false,0,NULL);
  242. return 1U;
  243. }
  244. };
  245. extern ECLRTL_API IEclProcess* createDynamicEclProcess()
  246. {
  247. return new DynamicEclProcess;
  248. }