123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "platform.h"
- #include "jptree.hpp"
- #include "eclrtl.hpp"
- #include "eclhelper.hpp"
- #include "rtlds_imp.hpp"
- #include "eclhelper_base.hpp"
- #include "eclhelper_dyn.hpp"
- #include "rtlfield.hpp"
- #include "rtlrecord.hpp"
- #include "rtldynfield.hpp"
- #include "rtlkey.hpp"
- //---------------------------------------------------------------------------
- static void readString(StringBuffer &out, const char * &in)
- {
- for (;;)
- {
- char c = *in++;
- if (!c)
- throw MakeStringException(0, "Invalid filter - missing closing '");
- if (c=='\'')
- break;
- if (c=='\\')
- UNIMPLEMENTED;
- out.append(c);
- }
- }
- class ECLRTL_API CDynamicDiskReadArg : public CThorDiskReadArg
- {
- public:
- CDynamicDiskReadArg(const char *_fileName, IOutputMetaData *_in, IOutputMetaData *_out, unsigned __int64 _chooseN, unsigned __int64 _skipN, unsigned __int64 _rowLimit)
- : fileName(_fileName), in(_in), out(_out), chooseN(_chooseN), skipN(_skipN), rowLimit(_rowLimit)
- {
- inrec = &in->queryRecordAccessor(true);
- numOffsets = inrec->getNumVarFields() + 1;
- translator.setown(createRecordTranslator(queryOutputMeta()->queryRecordAccessor(true), *inrec));
- }
- virtual bool needTransform() override
- {
- return true;
- //return translator->needsTranslate(); might be more appropriate?
- }
- virtual unsigned getFlags() override
- {
- return flags;
- }
- virtual void createSegmentMonitors(IIndexReadContext *irc) override
- {
- ForEachItemIn(idx, filters)
- {
- IStringSet &filter = filters.item(idx);
- irc->append(createKeySegmentMonitor(false, LINK(&filter), filterOffsets.item(idx), filter.getSize()));
- }
- }
- virtual IOutputMetaData * queryOutputMeta() override
- {
- return out;
- }
- virtual const char * getFileName() override final
- {
- return fileName;
- }
- virtual IOutputMetaData * queryDiskRecordSize() override final
- {
- return in;
- }
- virtual unsigned getFormatCrc() override
- {
- return 0; // engines should treat 0 as 'ignore'
- }
- virtual size32_t transform(ARowBuilder & rowBuilder, const void * src) override
- {
- return translator->translate(rowBuilder, (const byte *) src);
- }
- virtual unsigned __int64 getChooseNLimit() { return chooseN; }
- virtual unsigned __int64 getRowLimit() { return rowLimit; }
- void addFilter(const char *filter)
- {
- // Format of a filter is:
- // field[..n]: valuestring
- // value string format specifies ranges using a comma-separated list of ranges.
- // Each range is specified as paren lower, upper paren, where the paren is either ( or [ depending
- // on whether the specified bound is inclusive or exclusive.
- // If only one bound is specified then it is used for both upper and lower bound (only meaningful with [] )
- //
- // ( A means values > A - exclusive
- // [ means values >= A - inclusive
- // A ) means values < A - exclusive
- // A ] means values <= A - inclusive
- // For example:
- // [A] matches just A
- // (,A),(A,) matches all but A
- // (A] of [A) are both empty ranges
- // [A,B) means A*
- // Values use the ECL syntax for constants. String constants are always utf8. Binary use d'xx' format (hexpairs)
- // Note that binary serialization format is different
- assertex(filter);
- const char *epos = strchr(filter,'=');
- assertex(epos);
- StringBuffer fieldName(epos-filter, filter);
- unsigned fieldNum = inrec->getFieldNum(fieldName);
- assertex(fieldNum != (unsigned) -1);
- size_t * variableOffsets = (size_t *)alloca(numOffsets * sizeof(size_t));
- RtlRow offsetCalculator(*inrec, nullptr, numOffsets, variableOffsets);
- unsigned fieldOffset = offsetCalculator.getOffset(fieldNum);
- unsigned fieldSize = offsetCalculator.getSize(fieldNum);
- const RtlTypeInfo *fieldType = inrec->queryType(fieldNum);
- filter = epos+1;
- if (*filter=='~')
- {
- UNIMPLEMENTED; // use a regex?
- }
- else
- {
- MemoryBuffer lobuffer;
- MemoryBuffer hibuffer;
- Owned<IStringSet> filterSet = createStringSet(fieldSize);
- while (*filter)
- {
- char startRange = *filter++;
- if (startRange != '(' && startRange != '[')
- throw MakeStringException(0, "Invalid filter string: expected [ or ( at start of range");
- // Now we expect a constant - type depends on type of field. Assume string or int for now
- StringBuffer upperString, lowerString;
- if (*filter=='\'')
- {
- filter++;
- readString(lowerString, filter);
- }
- else
- UNIMPLEMENTED; // lowerInt = readInt(curFilter);
- if (*filter == ',')
- {
- filter++;
- if (*filter=='\'')
- {
- filter++;
- readString(upperString, filter);
- }
- else
- UNIMPLEMENTED; //upperInt = readInt(curFilter);
- }
- else
- upperString.set(lowerString);
- char endRange = *filter++;
- if (endRange != ')' && endRange != ']')
- throw MakeStringException(0, "Invalid filter string: expected ] or ) at end of range");
- if (*filter==',')
- filter++;
- else if (*filter)
- throw MakeStringException(0, "Invalid filter string: expected , between ranges");
- MemoryBufferBuilder lobuilder(lobuffer.clear(), inrec->getMinRecordSize());
- fieldType->buildUtf8(lobuilder, 0, inrec->queryField(fieldNum), lowerString.length(), lowerString.str());
- MemoryBufferBuilder hibuilder(hibuffer.clear(), inrec->getMinRecordSize());
- fieldType->buildUtf8(hibuilder, 0, inrec->queryField(fieldNum), upperString.length(), upperString.str());
- filterSet->addRange(lobuffer.toByteArray(), hibuffer.toByteArray());
- if (startRange=='(')
- filterSet->killRange(lobuffer.toByteArray(), lobuffer.toByteArray());
- if (endRange==')')
- filterSet->killRange(hibuffer.toByteArray(), hibuffer.toByteArray());
- }
- filters.append(*filterSet.getClear());
- filterOffsets.append(fieldOffset);
- flags |= TDRkeyed;
- }
- }
- private:
- StringAttr fileName;
- unsigned numOffsets = 0;
- unsigned flags = 0;
- Owned<IRtlFieldTypeDeserializer> indeserializer; // Owns the resulting ITypeInfo structures, so needs to be kept around
- Owned<IRtlFieldTypeDeserializer> outdeserializer; // Owns the resulting ITypeInfo structures, so needs to be kept around
- Owned<IOutputMetaData> in;
- Owned<IOutputMetaData> out;
- IArrayOf<IStringSet> filters;
- UnsignedArray filterOffsets;
- const RtlRecord *inrec = nullptr;
- Owned<const IDynamicTransform> translator;
- unsigned __int64 chooseN = I64C(0x7fffffffffffffff); // constant(s) should be commoned up somewhere
- unsigned __int64 skipN = 0;
- unsigned __int64 rowLimit = (unsigned __int64) -1;
- };
- class ECLRTL_API CDynamicWorkUnitWriteArg : public CThorWorkUnitWriteArg
- {
- public:
- CDynamicWorkUnitWriteArg(IOutputMetaData *_in) : in(_in)
- {
- }
- virtual int getSequence() override final { return 0; }
- virtual IOutputMetaData * queryOutputMeta() override final { return in; }
- private:
- Owned<IOutputMetaData> in;
- };
- static IOutputMetaData *loadTypeInfo(IPropertyTree &xgmml, const char *key)
- {
- StringBuffer xpath;
- MemoryBuffer binInfo;
- xgmml.getPropBin(xpath.setf("att[@name='%s_binary']/value", key), binInfo);
- assertex(binInfo.length());
- return new CDeserializedOutputMetaData(binInfo);
- }
- extern ECLRTL_API IHThorDiskReadArg *createDiskReadArg(IPropertyTree &xgmml)
- {
- Owned <IOutputMetaData> in = loadTypeInfo(xgmml, "input");
- Owned <IOutputMetaData> out = loadTypeInfo(xgmml, "output");
- const char *fileName = xgmml.queryProp("att[@name=\"_fileName\"]/@value");
- unsigned __int64 chooseN = xgmml.getPropInt64("att[@name=\"chooseN\"]/@value", -1);
- unsigned __int64 skipN = xgmml.getPropInt64("att[@name=\"skipN\"]/@value", -1);
- unsigned __int64 rowLimit = xgmml.getPropInt64("att[@name=\"rowLimit\"]/@value", -1);
- Owned<CDynamicDiskReadArg> ret = new CDynamicDiskReadArg(fileName, in.getClear(), out.getClear(), chooseN, skipN, rowLimit);
- Owned<IPropertyTreeIterator> filters = xgmml.getElements("att[@name=\"keyfilter\"]");
- ForEach(*filters)
- ret->addFilter(filters->query().queryProp("@value"));
- return ret.getClear();
- }
- extern ECLRTL_API IHThorDiskReadArg *createDiskReadArg(const char *fileName, IOutputMetaData *in, IOutputMetaData *out, unsigned __int64 chooseN, unsigned __int64 skipN, unsigned __int64 rowLimit)
- {
- return new CDynamicDiskReadArg(fileName, in, out, chooseN, skipN, rowLimit);
- }
- extern ECLRTL_API IHThorArg *createWorkunitWriteArg(IPropertyTree &xgmml)
- {
- Owned <IOutputMetaData> in = loadTypeInfo(xgmml, "input");
- return new CDynamicWorkUnitWriteArg(in.getClear());
- }
- struct ECLRTL_API DynamicEclProcess : public EclProcess {
- virtual unsigned getActivityVersion() const override { return ACTIVITY_INTERFACE_VERSION; }
- virtual int perform(IGlobalCodeContext * gctx, unsigned wfid) override {
- ICodeContext * ctx;
- ctx = gctx->queryCodeContext();
- ctx->executeGraph("graph1",false,0,NULL);
- return 1U;
- }
- };
- extern ECLRTL_API IEclProcess* createDynamicEclProcess()
- {
- return new DynamicEclProcess;
- }
|