瀏覽代碼

Merge pull request #7918 from afishbeck/jsonIndex

HPCC-14417 Implement JSON Fetch

Reviewed-By: Gavin Halliday <gavin.halliday@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 年之前
父節點
當前提交
1aac76de24

+ 1 - 0
common/thorhelper/commonext.cpp

@@ -95,6 +95,7 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
     kindArray[TAKtopn] = "topn";
     kindArray[TAKmerge] = "merge";
     kindArray[TAKxmlfetch] = "xmlfetch";
+    kindArray[TAKjsonfetch] = "jsonfetch";
     kindArray[TAKxmlparse] = "xmlparse";
     kindArray[TAKkeyeddistribute] = "keyeddistribute";
     kindArray[TAKjoinlight] = "joinlight";

+ 1 - 0
common/thorhelper/thorcommon.cpp

@@ -666,6 +666,7 @@ extern const char * getActivityText(ThorActivityKind kind)
     case TAKtopn:                   return "Top N";
     case TAKmerge:                  return "Merge";
     case TAKxmlfetch:               return "Xml Fetch";
+    case TAKjsonfetch:              return "Json Fetch";
     case TAKxmlparse:               return "Parse Xml";
     case TAKkeyeddistribute:        return "Keyed Distribute";
     case TAKjoinlight:              return "Lightweight Join";

+ 15 - 9
dali/ft/daftformat.ipp

@@ -359,10 +359,11 @@ protected:
 
 class DALIFT_API JsonSplitter : public CInterface, implements IPTreeNotifyEvent
 {
+    friend class CJsonInputPartitioner;
 public:
     IMPLEMENT_IINTERFACE;
 
-    JsonSplitter(const FileFormat & format, IFileIOStream &stream) : headerLength(0), pathPos(0), tangent(0), rowDepth(0), rowStart(0), rowEnd(0), footerLength(0), newRowSet(true), hasRootArray(false)
+    JsonSplitter(const FileFormat & format, IFileIOStream &stream) : headerLength(0), pathPos(0), tangent(0), rowDepth(0), rowStart((offset_t)-1), rowEnd(0), footerLength((offset_t)-1), newRowSet(true), hasRootArray(false)
     {
         LOG(MCdebugProgressDetail, unknownJob, "JsonSplitter::JsonSplitter(format.type :'%s', rowPath:'%s')", format.getFileFormatTypeString(), format.rowTag.get());
 
@@ -473,27 +474,32 @@ public:
         }
         return true;
     }
+    bool checkFoundRowStart() // true when rowStart no longer holds the (offset_t)-1 value the constructor initialises it to
+    {
+        return (rowStart!=(offset_t)-1);
+    }
     offset_t getHeaderLength()
     {
         if (!headerLength)
         {
-            while (!rowStart && reader->next());
-            if (!rowStart)
+            while (!checkFoundRowStart() && reader->next());
+            if (!checkFoundRowStart())
                 throw MakeStringException(DFTERR_CannotFindFirstJsonRecord, "Could not find first json record (check path)");
             else
-                headerLength = rowStart-1;
+                headerLength = rowStart;
         }
         return headerLength;
     }
     offset_t getFooterLength()
     {
-        if (!footerLength)
+        if (footerLength==(offset_t)-1)
         {
+            footerLength = 0;
             while (reader->next());
             if (rowEnd)
             {
                 footerLength = isRootless() ? 0 : 1; //account for parser using ] as offset unless rootless
-                footerLength = footerLength + size - rowEnd;
+                footerLength = footerLength + size - rowEnd - 1;
             }
         }
         return footerLength;
@@ -502,12 +508,12 @@ public:
     {
         if (rowStart <= headerLength)
             return 0;
-        return rowStart - headerLength - 1;
+        return rowStart - headerLength;
     }
 
     bool isRootless(){return (noPath && !hasRootArray);}
 
-public:
+private:
     Owned<IFileIOStream> inStream;
     Owned<IPullPTreeReader> reader;
     StringArray pathNodes;
@@ -540,7 +546,7 @@ protected:
 
         offset_t prevRowEnd;
         json->findRowEnd(splitOffset-thisOffset + thisHeaderSize, prevRowEnd);
-        if (!json->rowStart)
+        if (!json->checkFoundRowStart())
             return;
         if (!json->newRowSet) //get rid of extra delimiter if we haven't closed and reopened in the meantime
         {

+ 1 - 0
ecl/eclagent/eclgraph.cpp

@@ -182,6 +182,7 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI
     case TAKxmlparse:
         return createXmlParseActivity(agent, activityId, subgraphId, (IHThorXmlParseArg &)arg, kind);
     case TAKxmlfetch:
+    case TAKjsonfetch:
         return createXmlFetchActivity(agent, activityId, subgraphId, (IHThorXmlFetchArg &)arg, kind);
     case TAKmerge: 
         return createMergeActivity(agent, activityId, subgraphId, (IHThorMergeArg &)arg, kind);

+ 2 - 2
ecl/hql/hqlgram.y

@@ -8556,7 +8556,7 @@ simpleDataSet
                             node_operator modeOp = no_none;
                             if (left->getOperator() == no_table)
                                 modeOp = left->queryChild(2)->getOperator();
-                            if ((modeOp != no_thor) && (modeOp != no_flat) && (modeOp != no_csv) && (modeOp != no_xml))
+                            if ((modeOp != no_thor) && (modeOp != no_flat) && (modeOp != no_csv) && (modeOp != no_xml) && (modeOp != no_json))
                                 parser->reportError(ERR_FETCH_NON_DATASET, $3, "First parameter of FETCH should be a disk file");
 
                             IHqlExpression *join = createDataset(no_fetch, left, createComma(right, $7.getExpr(), $9.getExpr(), createComma($10.getExpr(), $12.getExpr())));
@@ -8574,7 +8574,7 @@ simpleDataSet
                             node_operator modeOp = no_none;
                             if (left->getOperator() == no_table)
                                 modeOp = left->queryChild(2)->getOperator();
-                            if ((modeOp != no_thor) && (modeOp != no_flat) && (modeOp != no_csv) && (modeOp != no_xml))
+                            if ((modeOp != no_thor) && (modeOp != no_flat) && (modeOp != no_csv) && (modeOp != no_xml)  && (modeOp != no_json))
                                 parser->reportError(ERR_FETCH_NON_DATASET, $3, "First parameter of FETCH should be a disk file");
 
                             IHqlExpression *join = createDataset(no_fetch, left, createComma(right, $7.getExpr(), transform, createComma($8.getExpr(), $10.getExpr())));

+ 4 - 0
ecl/hqlcpp/hqlsource.cpp

@@ -1783,6 +1783,7 @@ inline bool useDescriptiveGraphLabel(ThorActivityKind kind)
     {
     case TAKcsvfetch:
     case TAKxmlfetch:
+    case TAKjsonfetch:
     case TAKfetch:
         return false;
     }
@@ -7277,6 +7278,9 @@ ABoundActivity * HqlCppTranslator::doBuildActivityFetch(BuildCtx & ctx, IHqlExpr
         return info.buildActivity(ctx, expr, TAKcsvfetch, "CsvFetch", childActivity);
     case no_xml:
         return info.buildActivity(ctx, expr, TAKxmlfetch, "XmlFetch", childActivity);
+    case no_json:
+        //Note use of "XmlFetch" because we want the code generator to leverage existing xml classes
+        return info.buildActivity(ctx, expr, TAKjsonfetch, "XmlFetch", childActivity);
     case no_flat:
     case no_thor:
         return info.buildActivity(ctx, expr, TAKfetch, "Fetch", childActivity);

+ 7 - 4
ecl/hthor/hthorkey.cpp

@@ -2571,10 +2571,11 @@ class XmlFetchPartHandler : public SimpleFetchPartHandlerBase, public IXMLSelect
 public:
     IMPLEMENT_IINTERFACE;
 
-    XmlFetchPartHandler(IXmlFetchHandlerCallback & _owner, IDistributedFilePart * _part, offset_t _base, offset_t _size, IThreadedExceptionHandler * _handler, unsigned _streamBufferSize, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta)
+    XmlFetchPartHandler(IXmlFetchHandlerCallback & _owner, IDistributedFilePart * _part, offset_t _base, offset_t _size, IThreadedExceptionHandler * _handler, unsigned _streamBufferSize, IThreadPool * _threadPool, bool _blockcompressed, MemoryAttr &_encryptionkey, unsigned _activityId, CachedOutputMetaData const & _outputMeta, bool _jsonFormat)
         : SimpleFetchPartHandlerBase(_part, _base, _size, _handler, _threadPool, _blockcompressed, _encryptionkey, _activityId, _outputMeta, NULL, NULL),
           owner(_owner),
-          streamBufferSize(_streamBufferSize)
+          streamBufferSize(_streamBufferSize),
+          jsonFormat(_jsonFormat)
     {
     }
 
@@ -2606,6 +2607,7 @@ public:
             }
         }
         owner.processFetched(fetch, lastMatch);
+        lastMatch.clear();
         parser->reset();
     }
 
@@ -2615,7 +2617,7 @@ public:
             return;
         FetchPartHandlerBase::openPart();
         rawStream.setown(createBufferedIOStream(rawFile, streamBufferSize));
-        parser.setown(createXMLParse(*rawStream, "/", *this));
+        parser.setown(jsonFormat ? createJSONParse(*rawStream, "/", *this) : createXMLParse(*rawStream, "/", *this));
     }
 
     //iface IXMLSelect
@@ -2630,6 +2632,7 @@ protected:
     Owned<IXMLParse> parser;
     Owned<IColumnProvider> lastMatch;
     unsigned streamBufferSize;
+    bool jsonFormat;
 };
 
 class CHThorXmlFetchActivity : public CHThorFetchActivityBase, public IXmlFetchHandlerCallback
@@ -2687,7 +2690,7 @@ public:
 
     virtual SimpleFetchPartHandlerBase * createFetchPartHandler(IDistributedFilePart * part, offset_t base, offset_t size, IThreadedExceptionHandler * handler, bool blockcompressed, MemoryAttr &encryptionkey, IOutputRowDeserializer * rowDeserializer, IEngineRowAllocator *rowAllocator)
     {
-        return new XmlFetchPartHandler(*this, part, base, size, handler, 4096, threadPool, blockcompressed, encryptionkey, activityId, outputMeta); //MORE: need to put correct stream buffer size here, when Gavin provides it
+        return new XmlFetchPartHandler(*this, part, base, size, handler, 4096, threadPool, blockcompressed, encryptionkey, activityId, outputMeta, kind==TAKjsonfetch); //MORE: need to put correct stream buffer size here, when Gavin provides it
     }
 
 protected:

+ 26 - 6
roxie/ccd/ccdactivities.cpp

@@ -1609,19 +1609,34 @@ public:
 
 // RecordProcessor used by XML read activity. We don't try to index these or optimize fixed size cases...
 
-class XmlRecordProcessor : public RecordProcessor, implements IXMLSelect
+class XmlRecordProcessor : public RecordProcessor, implements IXMLSelect, implements IThorDiskCallback
 {
 public:
     IMPLEMENT_IINTERFACE;
     XmlRecordProcessor(CRoxieXmlReadActivity &_owner, IDirectReader *_reader)
-        : RecordProcessor(NULL), owner(_owner), reader(_reader)
+        : RecordProcessor(NULL), owner(_owner), reader(_reader), fileposition(0)
     {
         helper = _owner.helper;
-        helper->setCallback(reader->queryThorDiskCallback());
+        helper->setCallback(this);
+    }
+
+    //interface IThorDiskCallback
+    virtual unsigned __int64 getFilePosition(const void * row) // returns the startOffset stored by the last match() call; row is not used
+    {
+        return fileposition;
+    }
+    virtual unsigned __int64 getLocalFilePosition(const void * row)
+    {
+        return reader->makeFilePositionLocal(fileposition);
+    }
+    virtual const char * queryLogicalFilename(const void * row)
+    {
+        return reader->queryThorDiskCallback()->queryLogicalFilename(row);
     }
 
     virtual void match(IColumnProvider &entry, offset_t startOffset, offset_t endOffset)
     {
+        fileposition = startOffset;
         lastMatch.set(&entry);
     }
 
@@ -1633,7 +1648,11 @@ public:
 #endif
         Linked<IXmlToRowTransformer> rowTransformer = helper->queryTransformer();
         OwnedRoxieString xmlIterator(helper->getXmlIteratorPath());
-        Owned<IXMLParse> xmlParser = createXMLParse(*reader->querySimpleStream(), xmlIterator, *this, (0 != (TDRxmlnoroot & helper->getFlags()))?ptr_noRoot:ptr_none, (helper->getFlags() & TDRusexmlcontents) != 0);
+        Owned<IXMLParse> xmlParser;
+        if (owner.basefactory->getKind() == TAKjsonread)
+            xmlParser.setown(createJSONParse(*reader->querySimpleStream(), xmlIterator, *this, (0 != (TDRxmlnoroot & helper->getFlags()))?ptr_noRoot:ptr_none, (helper->getFlags() & TDRusexmlcontents) != 0));
+        else
+            xmlParser.setown(createXMLParse(*reader->querySimpleStream(), xmlIterator, *this, (0 != (TDRxmlnoroot & helper->getFlags()))?ptr_noRoot:ptr_none, (helper->getFlags() & TDRusexmlcontents) != 0));
         while (!aborted)
         {
             //call to next() will callback on the IXmlSelect interface
@@ -1643,7 +1662,7 @@ public:
                 break;
             else if (lastMatch)
             {
-                unsigned transformedSize = owner.doTransform(output, rowTransformer, lastMatch, reader->queryThorDiskCallback());
+                unsigned transformedSize = owner.doTransform(output, rowTransformer, lastMatch, this);
                 lastMatch.clear();
                 if (transformedSize)
                 {
@@ -1684,6 +1703,7 @@ protected:
 
     Owned<IColumnProvider> lastMatch;
     Owned<IDirectReader> reader;
+    unsigned __int64 fileposition;
 };
 
 IInMemoryFileProcessor *createCsvRecordProcessor(CRoxieCsvReadActivity &owner, IDirectReader *_reader, bool _skipHeader, const IResolvedFile *datafile, size32_t maxRowSize)
@@ -4520,7 +4540,7 @@ public:
     {
         CRoxieFetchActivityBase::setPartNo(filechanged);
         rawStreamX.setown(createBufferedIOStream(rawFile, streamBufferSize));
-        parser.setown(createXMLParse(*rawStreamX, "/", *this));
+        parser.setown((factory->getKind()==TAKjsonfetch) ? createJSONParse(*rawStreamX, "/", *this) : createXMLParse(*rawStreamX, "/", *this));
     }
 };
 

+ 37 - 0
roxie/ccd/ccdkey.cpp

@@ -71,6 +71,26 @@ class PtrToOffsetMapper
         return fragments[a-1];
     }
 
+    FragmentInfo &findBase(offset_t pos) const
+    {
+        // Binary search for the last fragment whose baseOffset is <= pos (presumably fragments[] is
+        // ordered by baseOffset - confirm against the code that fills it). MORE - could cache last hit
+        unsigned int a = 0;
+        int b = numBases;
+        while ((int)a<b)
+        {
+            int i = (a+b+1)/2;
+            // Compare the offsets directly: storing the offset_t difference in an int can flip its sign for large distances
+            if (pos >= fragments[i-1].baseOffset)
+                a = i;
+            else
+                b = i-1;
+        }
+        assertex(a > 0);    // or incoming value was not part of ANY fragment
+        // MORE - could also check it's in the range for this fragment
+        return fragments[a-1];
+    }
+
     PtrToOffsetMapper(const PtrToOffsetMapper &);  // Not implemented
 
 public:
@@ -132,6 +152,12 @@ public:
         return makeLocalFposOffset(frag.partNo, ptr - frag.base);
     }
 
+    offset_t makeFilePositionLocal(offset_t pos) // builds a local fpos from the partNo of the fragment findBase() selects and pos relative to that fragment's baseOffset
+    {
+        FragmentInfo &frag = findBase(pos);
+        return makeLocalFposOffset(frag.partNo, pos - frag.baseOffset);
+    }
+
     void splitPos(offset_t &offset, offset_t &size, unsigned partNo, unsigned numParts)
     {
         assert(numParts > 0);
@@ -638,6 +664,11 @@ public:
         return baseMap.ptrToLocalFilePosition(_ptr);
     }
 
+    virtual unsigned __int64 makeFilePositionLocal(offset_t pos)
+    {
+        return baseMap.makeFilePositionLocal(pos);
+    }
+
     virtual const char * queryLogicalFilename(const void * row) 
     { 
         UNIMPLEMENTED;
@@ -817,6 +848,12 @@ public:
         return makeLocalFposOffset(thisPartIdx-1, curStream->tell() + (const char *)_ptr - (const char *)curStream->peek(1, dummy));
     }
 
+    virtual unsigned __int64 makeFilePositionLocal(offset_t pos) // passes part index thisPartIdx-1 and pos relative to thisFileStartPos to makeLocalFposOffset
+    {
+        assertex(pos >= thisFileStartPos); // guards the subtraction below against a pos that precedes thisFileStartPos
+        return makeLocalFposOffset(thisPartIdx-1, pos - thisFileStartPos);
+    }
+
     virtual const char * queryLogicalFilename(const void * row) 
     { 
         return f->queryLogicalFilename(thisPartIdx);

+ 1 - 0
roxie/ccd/ccdkey.hpp

@@ -29,6 +29,7 @@ interface IDirectReader : public ISerialStream
     virtual IThorDiskCallback *queryThorDiskCallback() = 0;
     virtual ISimpleReadStream *querySimpleStream() = 0;
     virtual unsigned queryFilePart() const = 0;
+    virtual unsigned __int64 makeFilePositionLocal(offset_t pos) = 0;
 };
 
 interface IInMemoryIndexCursor : public IThorDiskCallback, public IIndexReadContext

+ 3 - 0
roxie/ccd/ccdquery.cpp

@@ -598,6 +598,7 @@ protected:
         case TAKfetch:
         case TAKcsvfetch:
         case TAKxmlfetch:
+        case TAKjsonfetch:
             {
                 RemoteActivityId remoteId(id, hashValue);
                 return createRoxieServerFetchActivityFactory(id, subgraphId, *this, helperFactory, kind, remoteId, node);
@@ -1745,6 +1746,7 @@ class CSlaveQueryFactory : public CQueryFactory
         case TAKfetch:
         case TAKcsvfetch:
         case TAKxmlfetch:
+        case TAKjsonfetch:
         case TAKremotegraph:
             break;
         case TAKsubgraph:
@@ -1814,6 +1816,7 @@ class CSlaveQueryFactory : public CQueryFactory
                     newAct = createRoxieCSVFetchActivityFactory(node, subgraphId, *this, helperFactory);
                     break;
                 case TAKxmlfetch:
+                case TAKjsonfetch:
                     newAct = createRoxieXMLFetchActivityFactory(node, subgraphId, *this, helperFactory);
                     break;
                 case TAKkeyedjoin:

+ 21 - 7
roxie/ccd/ccdserver.cpp

@@ -11382,7 +11382,7 @@ public:
                     break;
                 try
                 {
-                    unsigned __int64 fpos;
+                    unsigned __int64 fpos=0;
                     RtlStaticRowBuilder rowBuilder(rowBuffer, maxDiskRecordSize);
                     size32_t thisSize = helper.transform(rowBuilder, nextrec, &bc, fpos);
                     builder->processKeyData(rowBuffer, fpos, thisSize);
@@ -20920,13 +20920,13 @@ public:
     }
 };
 
-class CRoxieServerXmlReadActivity : public CRoxieServerDiskReadBaseActivity, implements IXMLSelect
+class CRoxieServerXmlReadActivity : public CRoxieServerDiskReadBaseActivity, implements IXMLSelect, implements IThorDiskCallback
 {
     IHThorXmlReadArg * readHelper;
     Owned<IXmlToRowTransformer> rowTransformer;
     Owned<IXMLParse> xmlParser;
     Owned<IColumnProvider> lastMatch;
-    unsigned __int64 localOffset;
+    unsigned __int64 fileoffset;
 public:
     IMPLEMENT_IINTERFACE;
     CRoxieServerXmlReadActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, const RemoteActivityId &_remoteId, unsigned _numParts, bool _isLocal, bool _sorted, bool _maySkip, IInMemoryIndexManager *_manager)
@@ -20934,7 +20934,7 @@ public:
     {
         compoundHelper = NULL;
         readHelper = (IHThorXmlReadArg *)&helper;
-        localOffset = 0;
+        fileoffset = 0;
     }
 
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
@@ -20964,10 +20964,24 @@ public:
 
     virtual void match(IColumnProvider &entry, offset_t startOffset, offset_t endOffset)
     {
-        localOffset = startOffset;
+        fileoffset = startOffset;
         lastMatch.set(&entry);
     }
 
+    //interface IThorDiskCallback
+    virtual unsigned __int64 getFilePosition(const void * row) // returns fileoffset, which match() sets to startOffset and nextRow() resets to 0 after each transform; row is not used
+    {
+        return fileoffset;
+    }
+    virtual unsigned __int64 getLocalFilePosition(const void * row)
+    {
+        return reader->makeFilePositionLocal(fileoffset);
+    }
+    virtual const char * queryLogicalFilename(const void * row)
+    {
+        return reader->queryThorDiskCallback()->queryLogicalFilename(row);
+    }
+
     virtual const void *nextRow()
     {
         if (eof)
@@ -20987,9 +21001,9 @@ public:
                 else if (lastMatch)
                 {
                     RtlDynamicRowBuilder rowBuilder(rowAllocator);
-                    unsigned sizeGot = rowTransformer->transform(rowBuilder, lastMatch, reader->queryThorDiskCallback());
+                    unsigned sizeGot = rowTransformer->transform(rowBuilder, lastMatch, this);
                     lastMatch.clear();
-                    localOffset = 0;
+                    fileoffset = 0;
                     if (sizeGot)
                     {
                         OwnedConstRoxieRow ret = rowBuilder.finalizeRowClear(sizeGot); 

+ 1 - 0
rtl/include/eclhelper.hpp

@@ -924,6 +924,7 @@ enum ThorActivityKind
     TAKjsonread,
     TAKtrace,
     TAKquantile,
+    TAKjsonfetch,
 
     TAKlast
 };

+ 21 - 4
system/jlib/jptree.cpp

@@ -3952,6 +3952,12 @@ protected:
         curOffset++;
         return true;
     }
+    inline bool checkStartReadNext() // calls readNextToken() only while curOffset and nextChar are both zero; otherwise returns true without reading
+    {
+        if (curOffset || nextChar) //not at starting state
+            return true;
+        return readNextToken(); // still at the starting state: the result is whatever readNextToken() reports
+    }
     inline bool readNextToken();
     inline bool checkSkipWS()
     {
@@ -6290,6 +6296,9 @@ public:
     typedef CommonReaderBase<X> PARENT;
     using PARENT::reset;
     using PARENT::nextChar;
+    using PARENT::readNextToken;
+    using PARENT::checkReadNext;
+    using PARENT::checkStartReadNext;
     using PARENT::readNext;
     using PARENT::expecting;
     using PARENT::match;
@@ -6447,6 +6456,7 @@ class CJSONReader : public CJSONReaderBase<X>, implements IPTreeReader
     using PARENT::readName;
     using PARENT::checkReadNext;
     using PARENT::checkSkipWS;
+    using PARENT::checkStartReadNext;
     using PARENT::expecting;
     using PARENT::error;
     using PARENT::eos;
@@ -6478,6 +6488,7 @@ public:
     }
     void readValueNotify(const char *name, bool skipAttributes)
     {
+        offset_t startOffset = curOffset;
         StringBuffer value;
         if (readValue(value)==elementTypeNull)
             return;
@@ -6489,7 +6500,7 @@ public:
             return;
         }
 
-        iEvent->beginNode(name, curOffset);
+        iEvent->beginNode(name, startOffset);
         iEvent->beginNodeContent(name);
         iEvent->endNode(name, value.length(), value.str(), false, curOffset);
 
@@ -6576,9 +6587,10 @@ public:
         }
         iEvent->endNode(name, 0, "", false, curOffset);
     }
+
     void loadJSON()
     {
-        if (!checkReadNext())
+        if (!checkStartReadNext())
             return;
         if (checkBOM() && !checkReadNext())
             return;
@@ -6656,6 +6668,7 @@ class CPullJSONReader : public CJSONReaderBase<X>, implements IPullPTreeReader
     using PARENT::readName;
     using PARENT::checkReadNext;
     using PARENT::checkSkipWS;
+    using PARENT::checkStartReadNext;
     using PARENT::expecting;
     using PARENT::error;
     using PARENT::eos;
@@ -6700,6 +6713,7 @@ class CPullJSONReader : public CJSONReaderBase<X>, implements IPullPTreeReader
     enum ParseStates { headerStart, nameStart, valueStart, itemStart, objAttributes, itemContent, itemEnd } state;
     bool endOfRoot;
     bool preReadItemName;
+    bool more;
     StringBuffer tag, value;
 
     void init()
@@ -6708,6 +6722,7 @@ class CPullJSONReader : public CJSONReaderBase<X>, implements IPullPTreeReader
         stateInfo = NULL;
         endOfRoot = false;
         preReadItemName = false;
+        more = true;
     }
 
     virtual void resetState()
@@ -6903,7 +6918,6 @@ public:
     }
     bool endNode(offset_t offset, bool notify=true)
     {
-        bool more = true;
         if (stack.ordinality()<2)
         {
             state = headerStart;
@@ -6919,7 +6933,7 @@ public:
         freeStateInfo.append(*stateInfo);
         stack.pop();
         stateInfo = (stack.ordinality()) ? &stack.tos() : NULL;
-        return more;
+        return true;
     }
 
     // IPullPTreeReader
@@ -6938,6 +6952,9 @@ public:
 
     virtual bool next()
     {
+        if (!more)
+            return false;
+        checkStartReadNext();
         checkSkipWS();
         switch (state)
         {

+ 74 - 0
testing/regress/ecl/jsonfetch.ecl

@@ -0,0 +1,74 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+phoneRecord :=
+            RECORD
+string5         areaCode{xpath('@areaCode')};
+udecimal12      number{xpath('@number')};
+            END;
+
+contactrecord :=
+            RECORD
+phoneRecord     phone;
+boolean         hasemail{xpath('@hasEmail')};
+                ifblock(self.hasemail)
+string              email;
+                end;
+            END;
+
+bookRec :=
+    RECORD
+string      title;
+string      author;
+    END;
+
+personRecord :=
+            RECORD
+string20        surname;
+string10        forename;
+integer4        category;
+integer8        uid;
+phoneRecord     homePhone;
+boolean         hasMobile;
+                ifblock(self.hasMobile)
+phoneRecord         mobilePhone;
+                end;
+contactRecord   contact;
+dataset(bookRec) books;
+set of string    colours;
+string2         endmarker := '$$';
+            END;
+
+namesTable := dataset([
+        {'Shakespeare','William', 1, 1, '09876',123456,true,'07967',838690, 'n/a','n/a',true,'william@edata.com',[{'To kill a mocking bird','Lee'},{'Zen and the art of motorcycle maintainence','Pirsig'}], ALL},
+        {'Mitchell','Margaret', 1, 1, '09876',123456,true,'07967',838690, 'n/a','n/a',true,'maggy@edata.com',[{'Harry Potter and the Deathly Hallows','Rowling'},{'Where the Sidewalk Ends','Silverstein'}], ['Violet','Orange']},
+        {'Mitchell','David', 1, 1, '09876',123456,true,'07967',838690, 'n/a','n/a',true,'dm@edata.com',[{'Love in the Time of Cholera','Marquez'},{'Where the Wild Things Are','Sendak'}], ALL},
+        {'Dickens','Charles', 1, 2, '09876',654321,false,'','',false,[{'The cat in the hat','Suess'},{'Wolly the sheep',''}], ['Red','Yellow']},
+        {'Rowling','J.K.', 1, 2, '09876',654321,false,'','',false,[{'Animal Farm',''},{'Slaughterhouse-five','Vonnegut'}], ['Blue','Green']}
+        ], personRecord);
+
+output(namesTable,,'REGRESS::TEMP::output_object_namedArray.json',overwrite, json);
+readObjectNamedArray := dataset(DYNAMIC('REGRESS::TEMP::output_object_namedArray.json'), personRecord, json('Row'));
+
+namedArrayWithPos := dataset(DYNAMIC('REGRESS::TEMP::output_object_namedArray.json'), {personRecord, UNSIGNED8 RecPtr{virtual(fileposition)}}, json('Row'));
+BUILD(namedArrayWithPos, {surname, RecPtr}, 'REGRESS::TEMP::namedarray.json.index', OVERWRITE);
+
+namedArrayIndex := INDEX(namedArrayWithPos, {surname, RecPtr}, DYNAMIC('REGRESS::TEMP::namedarray.json.index'));
+
+fetcheddata := LIMIT(FETCH(namedArrayWithPos, namedArrayIndex(surname = 'Mitchell'), RIGHT.RecPtr), 10);
+fetchednopos := project(fetcheddata, personRecord); //don't output positions
+output(fetchednopos, named('fetched'));

文件差異過大導致無法顯示
+ 8 - 0
testing/regress/ecl/key/jsonfetch.xml


文件差異過大導致無法顯示
+ 8 - 0
testing/regress/ecl/key/xmlfetch2.xml


+ 73 - 0
testing/regress/ecl/xmlfetch2.ecl

@@ -0,0 +1,73 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+phoneRecord :=
+            RECORD
+string5         areaCode{xpath('@areaCode')};
+udecimal12      number{xpath('@number')};
+            END;
+
+contactrecord :=
+            RECORD
+phoneRecord     phone;
+boolean         hasemail{xpath('@hasEmail')};
+                ifblock(self.hasemail)
+string              email;
+                end;
+            END;
+
+bookRec :=
+    RECORD
+string      title;
+string      author;
+    END;
+
+personRecord :=
+            RECORD
+string20        surname;
+string10        forename;
+integer4        category;
+integer8        uid;
+phoneRecord     homePhone;
+boolean         hasMobile;
+                ifblock(self.hasMobile)
+phoneRecord         mobilePhone;
+                end;
+contactRecord   contact;
+dataset(bookRec) books;
+set of string    colours;
+string2         endmarker := '$$';
+            END;
+
+namesTable := dataset([
+        {'Shakespeare','William', 1, 1, '09876',123456,true,'07967',838690, 'n/a','n/a',true,'william@edata.com',[{'To kill a mocking bird','Lee'},{'Zen and the art of motorcycle maintainence','Pirsig'}], ALL},
+        {'Mitchell','Margaret', 1, 1, '09876',123456,true,'07967',838690, 'n/a','n/a',true,'maggy@edata.com',[{'Harry Potter and the Deathly Hallows','Rowling'},{'Where the Sidewalk Ends','Silverstein'}], ['Violet','Orange']},
+        {'Mitchell','David', 1, 1, '09876',123456,true,'07967',838690, 'n/a','n/a',true,'dm@edata.com',[{'Love in the Time of Cholera','Marquez'},{'Where the Wild Things Are','Sendak'}], ALL},
+        {'Dickens','Charles', 1, 2, '09876',654321,false,'','',false,[{'The cat in the hat','Suess'},{'Wolly the sheep',''}], ['Red','Yellow']},
+        {'Rowling','J.K.', 1, 2, '09876',654321,false,'','',false,[{'Animal Farm',''},{'Slaughterhouse-five','Vonnegut'}], ['Blue','Green']}
+        ], personRecord);
+
+output(namesTable,,'REGRESS::TEMP::xmlfetch2.xml',overwrite, XML);
+
+xmlWithPos := dataset(DYNAMIC('REGRESS::TEMP::xmlfetch2.xml'), {personRecord, UNSIGNED8 RecPtr{virtual(fileposition)}}, XML('Dataset/Row'));
+BUILD(xmlWithPos, {surname, RecPtr}, 'REGRESS::TEMP::xmlfetch2.xml.index', OVERWRITE);
+
+xmlIndex := INDEX(xmlWithPos, {surname, RecPtr}, DYNAMIC('REGRESS::TEMP::xmlfetch2.xml.index'));
+
+fetcheddata := LIMIT(FETCH(xmlWithPos, xmlIndex(surname = 'Mitchell'), RIGHT.RecPtr), 10);
+fetchednopos := project(fetcheddata, personRecord); //don't output positions
+output(fetchednopos, named('fetched'));

+ 4 - 1
thorlcr/activities/fetch/thfetchslave.cpp

@@ -634,7 +634,10 @@ public:
             streams[f].setown(createBufferedIOStream(fetchStream->queryPartIO(f)));
             // NB: the index is based on path iteration matches, so on lookup the elements start at positioned stream
             // i.e. getXmlIteratorPath not used (or supplied) here.
-            parsers[f].setown(createXMLParse(*streams[f], "/", *xmlSelect, ptr_none, ((IHThorXmlFetchArg *)fetchBaseHelper)->requiresContents()));
+            if (container.getKind()==TAKjsonfetch)
+                parsers[f].setown(createJSONParse(*streams[f], "/", *xmlSelect, ptr_none, ((IHThorXmlFetchArg *)fetchBaseHelper)->requiresContents()));
+            else
+                parsers[f].setown(createXMLParse(*streams[f], "/", *xmlSelect, ptr_none, ((IHThorXmlFetchArg *)fetchBaseHelper)->requiresContents()));
         }
     }
     virtual size32_t fetch(ARowBuilder & rowBuilder, const void *keyRow, unsigned filePartIndex, unsigned __int64 localFpos, unsigned __int64 fpos)

+ 1 - 0
thorlcr/master/thactivitymaster.cpp

@@ -326,6 +326,7 @@ public:
                 ret = createCsvFetchActivityMaster(this);
                 break;
             case TAKxmlfetch:
+            case TAKjsonfetch:
                 ret = createXmlFetchActivityMaster(this);
                 break;
             case TAKworkunitread:

+ 1 - 0
thorlcr/slave/slave.cpp

@@ -639,6 +639,7 @@ public:
                 ret = createCsvFetchSlave(this);
                 break;
             case TAKxmlfetch:
+            case TAKjsonfetch:
                 ret = createXmlFetchSlave(this);
                 break;
             case TAKthroughaggregate: