Ver código fonte

HPCC-14417 Implement JSON Fetch

Also fix JSON BuildIndex virtual filepositions.
Also fix Roxie XML build index filepositions.
Also fix hthor XML fetch (not clearing lastMatch).

Signed-off-by: Anthony Fishbeck <anthony.fishbeck@lexisnexis.com>
Anthony Fishbeck 9 anos atrás
pai
commit
0f226bc81f

+ 1 - 0
common/thorhelper/commonext.cpp

@@ -95,6 +95,7 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
     kindArray[TAKtopn] = "topn";
     kindArray[TAKmerge] = "merge";
     kindArray[TAKxmlfetch] = "xmlfetch";
+    kindArray[TAKjsonfetch] = "jsonfetch";
     kindArray[TAKxmlparse] = "xmlparse";
     kindArray[TAKkeyeddistribute] = "keyeddistribute";
     kindArray[TAKjoinlight] = "joinlight";

+ 1 - 0
common/thorhelper/thorcommon.cpp

@@ -665,6 +665,7 @@ extern const char * getActivityText(ThorActivityKind kind)
     case TAKtopn:                   return "Top N";
     case TAKmerge:                  return "Merge";
     case TAKxmlfetch:               return "Xml Fetch";
+    case TAKjsonfetch:              return "Json Fetch";
     case TAKxmlparse:               return "Parse Xml";
     case TAKkeyeddistribute:        return "Keyed Distribute";
     case TAKjoinlight:              return "Lightweight Join";

+ 15 - 9
dali/ft/daftformat.ipp

@@ -359,10 +359,11 @@ protected:
 
 class DALIFT_API JsonSplitter : public CInterface, implements IPTreeNotifyEvent
 {
+    friend class CJsonInputPartitioner;
 public:
     IMPLEMENT_IINTERFACE;
 
-    JsonSplitter(const FileFormat & format, IFileIOStream &stream) : headerLength(0), pathPos(0), tangent(0), rowDepth(0), rowStart(0), rowEnd(0), footerLength(0), newRowSet(true), hasRootArray(false)
+    JsonSplitter(const FileFormat & format, IFileIOStream &stream) : headerLength(0), pathPos(0), tangent(0), rowDepth(0), rowStart((offset_t)-1), rowEnd(0), footerLength((offset_t)-1), newRowSet(true), hasRootArray(false)
     {
         LOG(MCdebugProgressDetail, unknownJob, "JsonSplitter::JsonSplitter(format.type :'%s', rowPath:'%s')", format.getFileFormatTypeString(), format.rowTag.get());
 
@@ -473,27 +474,32 @@ public:
         }
         return true;
     }
+    bool checkFoundRowStart()
+    {
+        return (rowStart!=(offset_t)-1);
+    }
     offset_t getHeaderLength()
     {
         if (!headerLength)
         {
-            while (!rowStart && reader->next());
-            if (!rowStart)
+            while (!checkFoundRowStart() && reader->next());
+            if (!checkFoundRowStart())
                 throw MakeStringException(DFTERR_CannotFindFirstJsonRecord, "Could not find first json record (check path)");
             else
-                headerLength = rowStart-1;
+                headerLength = rowStart;
         }
         return headerLength;
     }
     offset_t getFooterLength()
     {
-        if (!footerLength)
+        if (footerLength==(offset_t)-1)
         {
+            footerLength = 0;
             while (reader->next());
             if (rowEnd)
             {
                 footerLength = isRootless() ? 0 : 1; //account for parser using ] as offset unless rootless
-                footerLength = footerLength + size - rowEnd;
+                footerLength = footerLength + size - rowEnd - 1;
             }
         }
         return footerLength;
@@ -502,12 +508,12 @@ public:
     {
         if (rowStart <= headerLength)
             return 0;
-        return rowStart - headerLength - 1;
+        return rowStart - headerLength;
     }
 
     bool isRootless(){return (noPath && !hasRootArray);}
 
-public:
+private:
     Owned<IFileIOStream> inStream;
     Owned<IPullPTreeReader> reader;
     StringArray pathNodes;
@@ -540,7 +546,7 @@ protected:
 
         offset_t prevRowEnd;
         json->findRowEnd(splitOffset-thisOffset + thisHeaderSize, prevRowEnd);
-        if (!json->rowStart)
+        if (!json->checkFoundRowStart())
             return;
         if (!json->newRowSet) //get rid of extra delimiter if we haven't closed and reopened in the meantime
         {

+ 1 - 0
ecl/eclagent/eclgraph.cpp

@@ -182,6 +182,7 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI
     case TAKxmlparse:
         return createXmlParseActivity(agent, activityId, subgraphId, (IHThorXmlParseArg &)arg, kind);
     case TAKxmlfetch:
+    case TAKjsonfetch:
         return createXmlFetchActivity(agent, activityId, subgraphId, (IHThorXmlFetchArg &)arg, kind);
     case TAKmerge: 
         return createMergeActivity(agent, activityId, subgraphId, (IHThorMergeArg &)arg, kind);

+ 2 - 2
ecl/hql/hqlgram.y

@@ -8556,7 +8556,7 @@ simpleDataSet
                             node_operator modeOp = no_none;
                             if (left->getOperator() == no_table)
                                 modeOp = left->queryChild(2)->getOperator();
-                            if ((modeOp != no_thor) && (modeOp != no_flat) && (modeOp != no_csv) && (modeOp != no_xml))
+                            if ((modeOp != no_thor) && (modeOp != no_flat) && (modeOp != no_csv) && (modeOp != no_xml) && (modeOp != no_json))
                                 parser->reportError(ERR_FETCH_NON_DATASET, $3, "First parameter of FETCH should be a disk file");
 
                             IHqlExpression *join = createDataset(no_fetch, left, createComma(right, $7.getExpr(), $9.getExpr(), createComma($10.getExpr(), $12.getExpr())));
@@ -8574,7 +8574,7 @@ simpleDataSet
                             node_operator modeOp = no_none;
                             if (left->getOperator() == no_table)
                                 modeOp = left->queryChild(2)->getOperator();
-                            if ((modeOp != no_thor) && (modeOp != no_flat) && (modeOp != no_csv) && (modeOp != no_xml))
+                            if ((modeOp != no_thor) && (modeOp != no_flat) && (modeOp != no_csv) && (modeOp != no_xml)  && (modeOp != no_json))
                                 parser->reportError(ERR_FETCH_NON_DATASET, $3, "First parameter of FETCH should be a disk file");
 
                             IHqlExpression *join = createDataset(no_fetch, left, createComma(right, $7.getExpr(), transform, createComma($8.getExpr(), $10.getExpr())));

+ 4 - 0
ecl/hqlcpp/hqlsource.cpp

@@ -1784,6 +1784,7 @@ inline bool useDescriptiveGraphLabel(ThorActivityKind kind)
     {
     case TAKcsvfetch:
     case TAKxmlfetch:
+    case TAKjsonfetch:
     case TAKfetch:
         return false;
     }
@@ -7278,6 +7279,9 @@ ABoundActivity * HqlCppTranslator::doBuildActivityFetch(BuildCtx & ctx, IHqlExpr
         return info.buildActivity(ctx, expr, TAKcsvfetch, "CsvFetch", childActivity);
     case no_xml:
         return info.buildActivity(ctx, expr, TAKxmlfetch, "XmlFetch", childActivity);
+    case no_json:
+        //Note use of "XmlFetch" because we want the code generator to leverage existing xml classes
+        return info.buildActivity(ctx, expr, TAKjsonfetch, "XmlFetch", childActivity);
     case no_flat:
     case no_thor:
         return info.buildActivity(ctx, expr, TAKfetch, "Fetch", childActivity);

+ 8 - 5
ecl/hthor/hthorkey.cpp

@@ -2571,10 +2571,11 @@ class XmlFetchPartHandler : public SimpleFetchPartHandlerBase, public IXMLSelect
 public:
     IMPLEMENT_IINTERFACE;
 
-    XmlFetchPartHandler(IXmlFetchHandlerCallback & _owner, IDistributedFilePart * _part, offset_t _base, offset_t _size, IThreadedExceptionHandler * _handler, unsigned _streamBufferSize, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta)
+    XmlFetchPartHandler(IXmlFetchHandlerCallback & _owner, IDistributedFilePart * _part, offset_t _base, offset_t _size, IThreadedExceptionHandler * _handler, unsigned _streamBufferSize, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, bool _jsonFormat)
         : SimpleFetchPartHandlerBase(_part, _base, _size, _handler, _threadPool, _blockcompressed, _encryptionkey, _activityId, _outputMeta, NULL, NULL),
           owner(_owner),
-          streamBufferSize(_streamBufferSize)
+          streamBufferSize(_streamBufferSize),
+          jsonFormat(_jsonFormat)
     {
     }
 
@@ -2597,7 +2598,7 @@ public:
                 part->getFilename(rfn).getPath(fname);
                 throw owner.makeWrappedException(e, fname.str());
             }
-            if(!gotNext)
+            if(!gotNext && !lastMatch) //unfortunately json parser next() has slightly different behavior, tricky, may fix later
             {
                 StringBuffer fname;
                 RemoteFilename rfn;
@@ -2606,6 +2607,7 @@ public:
             }
         }
         owner.processFetched(fetch, lastMatch);
+        lastMatch.clear();
         parser->reset();
     }
 
@@ -2615,7 +2617,7 @@ public:
             return;
         FetchPartHandlerBase::openPart();
         rawStream.setown(createBufferedIOStream(rawFile, streamBufferSize));
-        parser.setown(createXMLParse(*rawStream, "/", *this));
+        parser.setown(jsonFormat ? createJSONParse(*rawStream, "/", *this) : createXMLParse(*rawStream, "/", *this));
     }
 
     //iface IXMLSelect
@@ -2630,6 +2632,7 @@ protected:
     Owned<IXMLParse> parser;
     Owned<IColumnProvider> lastMatch;
     unsigned streamBufferSize;
+    bool jsonFormat;
 };
 
 class CHThorXmlFetchActivity : public CHThorFetchActivityBase, public IXmlFetchHandlerCallback
@@ -2687,7 +2690,7 @@ public:
 
     virtual SimpleFetchPartHandlerBase * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, IOutputRowDeserializer * rowDeserializer, IEngineRowAllocator *rowAllocator)
     {
-        return new XmlFetchPartHandler(*this, part, base, size, handler, 4096, threadPool, blockcompressed, encryptionkey, activityId, outputMeta); //MORE: need to put correct stream buffer size here, when Gavin provides it
+        return new XmlFetchPartHandler(*this, part, base, size, handler, 4096, threadPool, blockcompressed, encryptionkey, activityId, outputMeta, kind==TAKjsonfetch); //MORE: need to put correct stream buffer size here, when Gavin provides it
     }
 
 protected:

+ 7 - 3
roxie/ccd/ccdactivities.cpp

@@ -1632,7 +1632,11 @@ public:
 #endif
         Linked<IXmlToRowTransformer> rowTransformer = helper->queryTransformer();
         OwnedRoxieString xmlIterator(helper->getXmlIteratorPath());
-        Owned<IXMLParse> xmlParser = createXMLParse(*reader->querySimpleStream(), xmlIterator, *this, (0 != (TDRxmlnoroot & helper->getFlags()))?ptr_noRoot:ptr_none, (helper->getFlags() & TDRusexmlcontents) != 0);
+        Owned<IXMLParse> xmlParser;
+        if (owner.basefactory->getKind() == TAKjsonread)
+            xmlParser.setown(createJSONParse(*reader->querySimpleStream(), xmlIterator, *this, (0 != (TDRxmlnoroot & helper->getFlags()))?ptr_noRoot:ptr_none, (helper->getFlags() & TDRusexmlcontents) != 0));
+        else
+            xmlParser.setown(createXMLParse(*reader->querySimpleStream(), xmlIterator, *this, (0 != (TDRxmlnoroot & helper->getFlags()))?ptr_noRoot:ptr_none, (helper->getFlags() & TDRusexmlcontents) != 0));
         while (!aborted)
         {
             //call to next() will callback on the IXmlSelect interface
@@ -4495,7 +4499,7 @@ public:
         try
         {
             while(!lastMatch)
-            if(!parser->next())
+            if(!parser->next() && !lastMatch) //unfortunately json parser next() has slightly different behavior, tricky, may fix later
                 throw MakeStringException(ROXIE_RECORD_FETCH_ERROR, "XML parse error at position %" I64F "d", pos);
             IHThorXmlFetchArg *h = (IHThorXmlFetchArg *) helper;
             unsigned thisSize = h->transform(rowBuilder, lastMatch, inputData, rawpos);
@@ -4519,7 +4523,7 @@ public:
     {
         CRoxieFetchActivityBase::setPartNo(filechanged);
         rawStreamX.setown(createBufferedIOStream(rawFile, streamBufferSize));
-        parser.setown(createXMLParse(*rawStreamX, "/", *this));
+        parser.setown((factory->getKind()==TAKjsonfetch) ? createJSONParse(*rawStreamX, "/", *this) : createXMLParse(*rawStreamX, "/", *this));
     }
 };
 

+ 3 - 0
roxie/ccd/ccdquery.cpp

@@ -590,6 +590,7 @@ protected:
         case TAKfetch:
         case TAKcsvfetch:
         case TAKxmlfetch:
+        case TAKjsonfetch:
             {
                 RemoteActivityId remoteId(id, hashValue);
                 return createRoxieServerFetchActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
@@ -1737,6 +1738,7 @@ class CSlaveQueryFactory : public CQueryFactory
         case TAKfetch:
         case TAKcsvfetch:
         case TAKxmlfetch:
+        case TAKjsonfetch:
         case TAKremotegraph:
             break;
         case TAKsubgraph:
@@ -1806,6 +1808,7 @@ class CSlaveQueryFactory : public CQueryFactory
                     newAct = createRoxieCSVFetchActivityFactory(node, subgraphId, *this, helperFactory);
                     break;
                 case TAKxmlfetch:
+                case TAKjsonfetch:
                     newAct = createRoxieXMLFetchActivityFactory(node, subgraphId, *this, helperFactory);
                     break;
                 case TAKkeyedjoin:

+ 20 - 6
roxie/ccd/ccdserver.cpp

@@ -11049,7 +11049,7 @@ public:
                     break;
                 try
                 {
-                    unsigned __int64 fpos;
+                    unsigned __int64 fpos=0;
                     RtlStaticRowBuilder rowBuilder(rowBuffer, maxDiskRecordSize);
                     size32_t thisSize = helper.transform(rowBuilder, nextrec, &bc, fpos);
                     builder->processKeyData(rowBuffer, fpos, thisSize);
@@ -20546,7 +20546,7 @@ public:
     }
 };
 
-class CRoxieServerXmlReadActivity : public CRoxieServerDiskReadBaseActivity, implements IXMLSelect
+class CRoxieServerXmlReadActivity : public CRoxieServerDiskReadBaseActivity, implements IXMLSelect, implements IThorDiskCallback
 {
     IHThorXmlReadArg * readHelper;
     Owned<IXmlToRowTransformer> rowTransformer;
@@ -20594,6 +20594,20 @@ public:
         lastMatch.set(&entry);
     }
 
+    //interface IThorDiskCallback
+    virtual unsigned __int64 getFilePosition(const void * row)
+    {
+        return localOffset;
+    }
+    virtual unsigned __int64 getLocalFilePosition(const void * row)
+    {
+        return localOffset;
+    }
+    virtual const char * queryLogicalFilename(const void * row)
+    {
+        return varFileInfo ? varFileInfo->queryFileName() : NULL;
+    }
+
     virtual const void *nextRow()
     {
         if (eof)
@@ -20608,12 +20622,10 @@ public:
                 //call to next() will callback on the IXmlSelect interface
                 bool gotNext = false;
                 gotNext = xmlParser->next();
-                if(!gotNext)
-                    eof = true;
-                else if (lastMatch)
+                if (lastMatch)
                 {
                     RtlDynamicRowBuilder rowBuilder(rowAllocator);
-                    unsigned sizeGot = rowTransformer->transform(rowBuilder, lastMatch, reader->queryThorDiskCallback());
+                    unsigned sizeGot = rowTransformer->transform(rowBuilder, lastMatch, this);
                     lastMatch.clear();
                     localOffset = 0;
                     if (sizeGot)
@@ -20635,6 +20647,8 @@ public:
                         return ret.getClear();
                     }
                 }
+                if(!gotNext) //currently slightly different behavior for json
+                    eof = true;
             }
             return NULL;
         }

+ 1 - 0
rtl/include/eclhelper.hpp

@@ -924,6 +924,7 @@ enum ThorActivityKind
     TAKjsonread,
     TAKtrace,
     TAKquantile,
+    TAKjsonfetch,
 
     TAKlast
 };

+ 16 - 2
system/jlib/jptree.cpp

@@ -3952,6 +3952,12 @@ protected:
         curOffset++;
         return true;
     }
+    inline bool checkStartReadNext()
+    {
+        if (curOffset || nextChar) //not at starting state
+            return true;
+        return readNextToken();
+    }
     inline bool readNextToken();
     inline bool checkSkipWS()
     {
@@ -6290,6 +6296,9 @@ public:
     typedef CommonReaderBase<X> PARENT;
     using PARENT::reset;
     using PARENT::nextChar;
+    using PARENT::readNextToken;
+    using PARENT::checkReadNext;
+    using PARENT::checkStartReadNext;
     using PARENT::readNext;
     using PARENT::expecting;
     using PARENT::match;
@@ -6447,6 +6456,7 @@ class CJSONReader : public CJSONReaderBase<X>, implements IPTreeReader
     using PARENT::readName;
     using PARENT::checkReadNext;
     using PARENT::checkSkipWS;
+    using PARENT::checkStartReadNext;
     using PARENT::expecting;
     using PARENT::error;
     using PARENT::eos;
@@ -6478,6 +6488,7 @@ public:
     }
     void readValueNotify(const char *name, bool skipAttributes)
     {
+        offset_t startOffset = curOffset;
         StringBuffer value;
         if (readValue(value)==elementTypeNull)
             return;
@@ -6489,7 +6500,7 @@ public:
             return;
         }
 
-        iEvent->beginNode(name, curOffset);
+        iEvent->beginNode(name, startOffset);
         iEvent->beginNodeContent(name);
         iEvent->endNode(name, value.length(), value.str(), false, curOffset);
 
@@ -6576,9 +6587,10 @@ public:
         }
         iEvent->endNode(name, 0, "", false, curOffset);
     }
+
     void loadJSON()
     {
-        if (!checkReadNext())
+        if (!checkStartReadNext())
             return;
         if (checkBOM() && !checkReadNext())
             return;
@@ -6656,6 +6668,7 @@ class CPullJSONReader : public CJSONReaderBase<X>, implements IPullPTreeReader
     using PARENT::readName;
     using PARENT::checkReadNext;
     using PARENT::checkSkipWS;
+    using PARENT::checkStartReadNext;
     using PARENT::expecting;
     using PARENT::error;
     using PARENT::eos;
@@ -6938,6 +6951,7 @@ public:
 
     virtual bool next()
     {
+        checkStartReadNext();
         checkSkipWS();
         switch (state)
         {

+ 74 - 0
testing/regress/ecl/jsonfetch.ecl

@@ -0,0 +1,74 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+phoneRecord :=
+            RECORD
+string5         areaCode{xpath('@areaCode')};
+udecimal12      number{xpath('@number')};
+            END;
+
+contactrecord :=
+            RECORD
+phoneRecord     phone;
+boolean         hasemail{xpath('@hasEmail')};
+                ifblock(self.hasemail)
+string              email;
+                end;
+            END;
+
+bookRec :=
+    RECORD
+string      title;
+string      author;
+    END;
+
+personRecord :=
+            RECORD
+string20        surname;
+string10        forename;
+integer4        category;
+integer8        uid;
+phoneRecord     homePhone;
+boolean         hasMobile;
+                ifblock(self.hasMobile)
+phoneRecord         mobilePhone;
+                end;
+contactRecord   contact;
+dataset(bookRec) books;
+set of string    colours;
+string2         endmarker := '$$';
+            END;
+
+namesTable := dataset([
+        {'Shakespeare','William', 1, 1, '09876',123456,true,'07967',838690, 'n/a','n/a',true,'william@edata.com',[{'To kill a mocking bird','Lee'},{'Zen and the art of motorcycle maintainence','Pirsig'}], ALL},
+        {'Mitchell','Margaret', 1, 1, '09876',123456,true,'07967',838690, 'n/a','n/a',true,'maggy@edata.com',[{'Harry Potter and the Deathly Hallows','Rowling'},{'Where the Sidewalk Ends','Silverstein'}], ['Violet','Orange']},
+        {'Mitchell','David', 1, 1, '09876',123456,true,'07967',838690, 'n/a','n/a',true,'dm@edata.com',[{'Love in the Time of Cholera','Marquez'},{'Where the Wild Things Are','Sendak'}], ALL},
+        {'Dickens','Charles', 1, 2, '09876',654321,false,'','',false,[{'The cat in the hat','Suess'},{'Wolly the sheep',''}], ['Red','Yellow']},
+        {'Rowling','J.K.', 1, 2, '09876',654321,false,'','',false,[{'Animal Farm',''},{'Slaughterhouse-five','Vonnegut'}], ['Blue','Green']}
+        ], personRecord);
+
+output(namesTable,,'REGRESS::TEMP::output_object_namedArray.json',overwrite, json);
+readObjectNamedArray := dataset(DYNAMIC('REGRESS::TEMP::output_object_namedArray.json'), personRecord, json('Row'));
+
+namedArrayWithPos := dataset(DYNAMIC('REGRESS::TEMP::output_object_namedArray.json'), {personRecord, UNSIGNED8 RecPtr{virtual(fileposition)}}, json('Row'));
+BUILD(namedArrayWithPos, {surname, RecPtr}, 'REGRESS::TEMP::namedarray.json.index', OVERWRITE);
+
+namedArrayIndex := INDEX(namedArrayWithPos, {surname, RecPtr}, DYNAMIC('REGRESS::TEMP::namedarray.json.index'));
+
+fetcheddata := LIMIT(FETCH(namedArrayWithPos, namedArrayIndex(surname = 'Mitchell'), RIGHT.RecPtr), 10);
+fetchednopos := project(fetcheddata, personRecord); //don't output positions
+output(fetchednopos, named('fetched'));

Diferenças do arquivo suprimidas por serem muito extensas
+ 8 - 0
testing/regress/ecl/key/jsonfetch.xml


+ 5 - 2
thorlcr/activities/fetch/thfetchslave.cpp

@@ -634,7 +634,10 @@ public:
             streams[f].setown(createBufferedIOStream(fetchStream->queryPartIO(f)));
             // NB: the index is based on path iteration matches, so on lookup the elements start at positioned stream
             // i.e. getXmlIteratorPath not used (or supplied) here.
-            parsers[f].setown(createXMLParse(*streams[f], "/", *xmlSelect, ptr_none, ((IHThorXmlFetchArg *)fetchBaseHelper)->requiresContents()));
+            if (container.getKind()==TAKjsonfetch)
+                parsers[f].setown(createJSONParse(*streams[f], "/", *xmlSelect, ptr_none, ((IHThorXmlFetchArg *)fetchBaseHelper)->requiresContents()));
+            else
+                parsers[f].setown(createXMLParse(*streams[f], "/", *xmlSelect, ptr_none, ((IHThorXmlFetchArg *)fetchBaseHelper)->requiresContents()));
         }
     }
     virtual size32_t fetch(ARowBuilder & rowBuilder, const void *keyRow, unsigned filePartIndex, unsigned __int64 localFpos, unsigned __int64 fpos)
@@ -644,7 +647,7 @@ public:
         lastMatch = &lastMatches[filePartIndex];
         while (!lastMatch->get())
         {
-            if (!parser->next())
+            if (!parser->next() && !lastMatch)
             {
                 StringBuffer tmpStr;
                 throw MakeActivityException(this, 0, "%s", fetchStream->getPartName(filePartIndex, tmpStr).str());

+ 1 - 0
thorlcr/master/thactivitymaster.cpp

@@ -326,6 +326,7 @@ public:
                 ret = createCsvFetchActivityMaster(this);
                 break;
             case TAKxmlfetch:
+            case TAKjsonfetch:
                 ret = createXmlFetchActivityMaster(this);
                 break;
             case TAKworkunitread:

+ 1 - 0
thorlcr/slave/slave.cpp

@@ -631,6 +631,7 @@ public:
                 ret = createCsvFetchSlave(this);
                 break;
             case TAKxmlfetch:
+            case TAKjsonfetch:
                 ret = createXmlFetchSlave(this);
                 break;
             case TAKthroughaggregate: