// eclhelper_dyn.cpp
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "jptree.hpp"
  15. #include "eclrtl.hpp"
  16. #include "eclhelper.hpp"
  17. #include "rtlds_imp.hpp"
  18. #include "eclhelper_base.hpp"
  19. #include "eclhelper_dyn.hpp"
  20. #include "rtlfield.hpp"
  21. #include "rtlrecord.hpp"
  22. #include "rtldynfield.hpp"
  23. #include "rtlkey.hpp"
  24. //---------------------------------------------------------------------------
  25. class ECLRTL_API CDynamicDiskReadArg : public CThorDiskReadArg
  26. {
  27. public:
  28. CDynamicDiskReadArg(const char *_fileName, IOutputMetaData *_in, IOutputMetaData *_out, unsigned __int64 _chooseN, unsigned __int64 _skipN, unsigned __int64 _rowLimit)
  29. : fileName(_fileName), in(_in), out(_out), chooseN(_chooseN), skipN(_skipN), rowLimit(_rowLimit)
  30. {
  31. inrec = &in->queryRecordAccessor(true);
  32. numOffsets = inrec->getNumVarFields() + 1;
  33. translator.setown(createRecordTranslator(queryOutputMeta()->queryRecordAccessor(true), *inrec));
  34. }
  35. virtual bool needTransform() override
  36. {
  37. return true;
  38. //return translator->needsTranslate(); might be more appropriate?
  39. }
  40. virtual unsigned getFlags() override
  41. {
  42. return flags;
  43. }
  44. virtual void createSegmentMonitors(IIndexReadContext *irc) override
  45. {
  46. ForEachItemIn(idx, filters)
  47. {
  48. IStringSet &filter = filters.item(idx);
  49. irc->append(createKeySegmentMonitor(false, LINK(&filter), filterOffsets.item(idx), filter.getSize()));
  50. }
  51. }
  52. virtual IOutputMetaData * queryOutputMeta() override
  53. {
  54. return out;
  55. }
  56. virtual const char * getFileName() override final
  57. {
  58. return fileName;
  59. }
  60. virtual IOutputMetaData * queryDiskRecordSize() override final
  61. {
  62. return in;
  63. }
  64. virtual unsigned getFormatCrc() override
  65. {
  66. return 0; // engines should treat 0 as 'ignore'
  67. }
  68. virtual size32_t transform(ARowBuilder & rowBuilder, const void * src) override
  69. {
  70. return translator->translate(rowBuilder, (const byte *) src);
  71. }
  72. virtual unsigned __int64 getChooseNLimit() { return chooseN; }
  73. virtual unsigned __int64 getRowLimit() { return rowLimit; }
  74. void addFilter(const char *filter)
  75. {
  76. // Format of a filter is:
  77. // field[..n]: valuestring
  78. // value string format specifies ranges using a comma-separated list of ranges.
  79. // Each range is specified as paren lower, upper paren, where the paren is either ( or [ depending
  80. // on whether the specified bound is inclusive or exclusive.
  81. // If only one bound is specified then it is used for both upper and lower bound (only meaningful with [] )
  82. //
  83. // ( A means values > A - exclusive
  84. // [ means values >= A - inclusive
  85. // A ) means values < A - exclusive
  86. // A ] means values <= A - inclusive
  87. // For example:
  88. // [A] matches just A
  89. // (,A),(A,) matches all but A
  90. // (A] of [A) are both empty ranges
  91. // [A,B) means A*
  92. // Values use the ECL syntax for constants. String constants are always utf8. Binary use d'xx' format (hexpairs)
  93. // Note that binary serialization format is different
  94. assertex(filter);
  95. const char *epos = strchr(filter,'=');
  96. assertex(epos);
  97. StringBuffer fieldName(epos-filter, filter);
  98. unsigned fieldNum = inrec->getFieldNum(fieldName);
  99. assertex(fieldNum != (unsigned) -1);
  100. size_t * variableOffsets = (size_t *)alloca(numOffsets * sizeof(size_t));
  101. RtlRow offsetCalculator(*inrec, nullptr, numOffsets, variableOffsets);
  102. unsigned fieldOffset = offsetCalculator.getOffset(fieldNum);
  103. unsigned fieldSize = offsetCalculator.getSize(fieldNum);
  104. const RtlTypeInfo *fieldType = inrec->queryType(fieldNum);
  105. filter = epos+1;
  106. if (*filter=='~')
  107. {
  108. UNIMPLEMENTED; // use a regex?
  109. }
  110. else
  111. {
  112. MemoryBuffer lobuffer;
  113. MemoryBuffer hibuffer;
  114. Owned<IStringSet> filterSet = createStringSet(fieldSize);
  115. deserializeSet(*filterSet, inrec->getMinRecordSize(), fieldType, filter);
  116. filters.append(*filterSet.getClear());
  117. filterOffsets.append(fieldOffset);
  118. flags |= TDRkeyed;
  119. }
  120. }
  121. private:
  122. StringAttr fileName;
  123. unsigned numOffsets = 0;
  124. unsigned flags = 0;
  125. Owned<IRtlFieldTypeDeserializer> indeserializer; // Owns the resulting ITypeInfo structures, so needs to be kept around
  126. Owned<IRtlFieldTypeDeserializer> outdeserializer; // Owns the resulting ITypeInfo structures, so needs to be kept around
  127. Owned<IOutputMetaData> in;
  128. Owned<IOutputMetaData> out;
  129. IArrayOf<IStringSet> filters;
  130. UnsignedArray filterOffsets;
  131. const RtlRecord *inrec = nullptr;
  132. Owned<const IDynamicTransform> translator;
  133. unsigned __int64 chooseN = I64C(0x7fffffffffffffff); // constant(s) should be commoned up somewhere
  134. unsigned __int64 skipN = 0;
  135. unsigned __int64 rowLimit = (unsigned __int64) -1;
  136. };
  137. class ECLRTL_API CDynamicWorkUnitWriteArg : public CThorWorkUnitWriteArg
  138. {
  139. public:
  140. CDynamicWorkUnitWriteArg(IOutputMetaData *_in) : in(_in)
  141. {
  142. }
  143. virtual int getSequence() override final { return 0; }
  144. virtual IOutputMetaData * queryOutputMeta() override final { return in; }
  145. private:
  146. Owned<IOutputMetaData> in;
  147. };
  148. static IOutputMetaData *loadTypeInfo(IPropertyTree &xgmml, const char *key)
  149. {
  150. StringBuffer xpath;
  151. MemoryBuffer binInfo;
  152. xgmml.getPropBin(xpath.setf("att[@name='%s_binary']/value", key), binInfo);
  153. assertex(binInfo.length());
  154. return new CDeserializedOutputMetaData(binInfo);
  155. }
  156. extern ECLRTL_API IHThorDiskReadArg *createDiskReadArg(IPropertyTree &xgmml)
  157. {
  158. Owned <IOutputMetaData> in = loadTypeInfo(xgmml, "input");
  159. Owned <IOutputMetaData> out = loadTypeInfo(xgmml, "output");
  160. const char *fileName = xgmml.queryProp("att[@name=\"_fileName\"]/@value");
  161. unsigned __int64 chooseN = xgmml.getPropInt64("att[@name=\"chooseN\"]/@value", -1);
  162. unsigned __int64 skipN = xgmml.getPropInt64("att[@name=\"skipN\"]/@value", -1);
  163. unsigned __int64 rowLimit = xgmml.getPropInt64("att[@name=\"rowLimit\"]/@value", -1);
  164. Owned<CDynamicDiskReadArg> ret = new CDynamicDiskReadArg(fileName, in.getClear(), out.getClear(), chooseN, skipN, rowLimit);
  165. Owned<IPropertyTreeIterator> filters = xgmml.getElements("att[@name=\"keyfilter\"]");
  166. ForEach(*filters)
  167. ret->addFilter(filters->query().queryProp("@value"));
  168. return ret.getClear();
  169. }
  170. extern ECLRTL_API IHThorDiskReadArg *createDiskReadArg(const char *fileName, IOutputMetaData *in, IOutputMetaData *out, unsigned __int64 chooseN, unsigned __int64 skipN, unsigned __int64 rowLimit)
  171. {
  172. return new CDynamicDiskReadArg(fileName, in, out, chooseN, skipN, rowLimit);
  173. }
  174. extern ECLRTL_API IHThorArg *createWorkunitWriteArg(IPropertyTree &xgmml)
  175. {
  176. Owned <IOutputMetaData> in = loadTypeInfo(xgmml, "input");
  177. return new CDynamicWorkUnitWriteArg(in.getClear());
  178. }
  179. struct ECLRTL_API DynamicEclProcess : public EclProcess {
  180. virtual unsigned getActivityVersion() const override { return ACTIVITY_INTERFACE_VERSION; }
  181. virtual int perform(IGlobalCodeContext * gctx, unsigned wfid) override {
  182. ICodeContext * ctx;
  183. ctx = gctx->queryCodeContext();
  184. ctx->executeGraph("graph1",false,0,NULL);
  185. return 1U;
  186. }
  187. };
  188. extern ECLRTL_API IEclProcess* createDynamicEclProcess()
  189. {
  190. return new DynamicEclProcess;
  191. }