Ver código fonte

HPCC-14417 Further JSON parsing and fileposition fixes

Make JSON pull parser interface getNext() compatible with XML pull parser.

Fix roxie filepostion support on XML record handler.

Signed-off-by: Anthony Fishbeck <anthony.fishbeck@lexisnexis.com>
Anthony Fishbeck 9 anos atrás
pai
commit
e21349dd55

+ 1 - 1
ecl/hthor/hthorkey.cpp

@@ -2598,7 +2598,7 @@ public:
                 part->getFilename(rfn).getPath(fname);
                 part->getFilename(rfn).getPath(fname);
                 throw owner.makeWrappedException(e, fname.str());
                 throw owner.makeWrappedException(e, fname.str());
             }
             }
-            if(!gotNext && !lastMatch) //unfortunately json parser next() has slightly different behavior, tricky, may fix later
+            if(!gotNext)
             {
             {
                 StringBuffer fname;
                 StringBuffer fname;
                 RemoteFilename rfn;
                 RemoteFilename rfn;

+ 21 - 5
roxie/ccd/ccdactivities.cpp

@@ -1608,19 +1608,34 @@ public:
 
 
 // RecordProcessor used by XML read activity. We don't try to index these or optimize fixed size cases...
 // RecordProcessor used by XML read activity. We don't try to index these or optimize fixed size cases...
 
 
-class XmlRecordProcessor : public RecordProcessor, implements IXMLSelect
+class XmlRecordProcessor : public RecordProcessor, implements IXMLSelect, implements IThorDiskCallback
 {
 {
 public:
 public:
     IMPLEMENT_IINTERFACE;
     IMPLEMENT_IINTERFACE;
     XmlRecordProcessor(CRoxieXmlReadActivity &_owner, IDirectReader *_reader)
     XmlRecordProcessor(CRoxieXmlReadActivity &_owner, IDirectReader *_reader)
-        : RecordProcessor(NULL), owner(_owner), reader(_reader)
+        : RecordProcessor(NULL), owner(_owner), reader(_reader), fileposition(0)
     {
     {
         helper = _owner.helper;
         helper = _owner.helper;
-        helper->setCallback(reader->queryThorDiskCallback());
+        helper->setCallback(this);
+    }
+
+    //interface IThorDiskCallback
+    virtual unsigned __int64 getFilePosition(const void * row)
+    {
+        return fileposition;
+    }
+    virtual unsigned __int64 getLocalFilePosition(const void * row)
+    {
+        return reader->makeFilePositionLocal(fileposition);
+    }
+    virtual const char * queryLogicalFilename(const void * row)
+    {
+        return reader->queryThorDiskCallback()->queryLogicalFilename(row);
     }
     }
 
 
     virtual void match(IColumnProvider &entry, offset_t startOffset, offset_t endOffset)
     virtual void match(IColumnProvider &entry, offset_t startOffset, offset_t endOffset)
     {
     {
+        fileposition = startOffset;
         lastMatch.set(&entry);
         lastMatch.set(&entry);
     }
     }
 
 
@@ -1646,7 +1661,7 @@ public:
                 break;
                 break;
             else if (lastMatch)
             else if (lastMatch)
             {
             {
-                unsigned transformedSize = owner.doTransform(output, rowTransformer, lastMatch, reader->queryThorDiskCallback());
+                unsigned transformedSize = owner.doTransform(output, rowTransformer, lastMatch, this);
                 lastMatch.clear();
                 lastMatch.clear();
                 if (transformedSize)
                 if (transformedSize)
                 {
                 {
@@ -1687,6 +1702,7 @@ protected:
 
 
     Owned<IColumnProvider> lastMatch;
     Owned<IColumnProvider> lastMatch;
     Owned<IDirectReader> reader;
     Owned<IDirectReader> reader;
+    unsigned __int64 fileposition;
 };
 };
 
 
 IInMemoryFileProcessor *createCsvRecordProcessor(CRoxieCsvReadActivity &owner, IDirectReader *_reader, bool _skipHeader, const IResolvedFile *datafile, size32_t maxRowSize)
 IInMemoryFileProcessor *createCsvRecordProcessor(CRoxieCsvReadActivity &owner, IDirectReader *_reader, bool _skipHeader, const IResolvedFile *datafile, size32_t maxRowSize)
@@ -4499,7 +4515,7 @@ public:
         try
         try
         {
         {
             while(!lastMatch)
             while(!lastMatch)
-            if(!parser->next() && !lastMatch) //unfortunately json parser next() has slightly different behavior, tricky, may fix later
+            if(!parser->next())
                 throw MakeStringException(ROXIE_RECORD_FETCH_ERROR, "XML parse error at position %" I64F "d", pos);
                 throw MakeStringException(ROXIE_RECORD_FETCH_ERROR, "XML parse error at position %" I64F "d", pos);
             IHThorXmlFetchArg *h = (IHThorXmlFetchArg *) helper;
             IHThorXmlFetchArg *h = (IHThorXmlFetchArg *) helper;
             unsigned thisSize = h->transform(rowBuilder, lastMatch, inputData, rawpos);
             unsigned thisSize = h->transform(rowBuilder, lastMatch, inputData, rawpos);

+ 37 - 0
roxie/ccd/ccdkey.cpp

@@ -71,6 +71,26 @@ class PtrToOffsetMapper
         return fragments[a-1];
         return fragments[a-1];
     }
     }
 
 
+    FragmentInfo &findBase(offset_t pos) const
+    {
+        // MORE - could cache last hit
+        unsigned int a = 0;
+        int b = numBases;
+        int rc;
+        while ((int)a<b)
+        {
+            int i = (a+b+1)/2;
+            rc = pos - fragments[i-1].baseOffset;
+            if (rc>=0)
+                a = i;
+            else
+                b = i-1;
+        }
+        assertex(a > 0);    // or incoming value was not part of ANY fragment
+        // MORE - could also check it's in the range for this fragment
+        return fragments[a-1];
+    }
+
     PtrToOffsetMapper(const PtrToOffsetMapper &);  // Not implemented
     PtrToOffsetMapper(const PtrToOffsetMapper &);  // Not implemented
 
 
 public:
 public:
@@ -132,6 +152,12 @@ public:
         return makeLocalFposOffset(frag.partNo, ptr - frag.base);
         return makeLocalFposOffset(frag.partNo, ptr - frag.base);
     }
     }
 
 
+    offset_t makeFilePositionLocal(offset_t pos)
+    {
+        FragmentInfo &frag = findBase(pos);
+        return makeLocalFposOffset(frag.partNo, pos - frag.baseOffset);
+    }
+
     void splitPos(offset_t &offset, offset_t &size, unsigned partNo, unsigned numParts)
     void splitPos(offset_t &offset, offset_t &size, unsigned partNo, unsigned numParts)
     {
     {
         assert(numParts > 0);
         assert(numParts > 0);
@@ -638,6 +664,11 @@ public:
         return baseMap.ptrToLocalFilePosition(_ptr);
         return baseMap.ptrToLocalFilePosition(_ptr);
     }
     }
 
 
+    virtual unsigned __int64 makeFilePositionLocal(offset_t pos)
+    {
+        return baseMap.makeFilePositionLocal(pos);
+    }
+
     virtual const char * queryLogicalFilename(const void * row) 
     virtual const char * queryLogicalFilename(const void * row) 
     { 
     { 
         UNIMPLEMENTED;
         UNIMPLEMENTED;
@@ -817,6 +848,12 @@ public:
         return makeLocalFposOffset(thisPartIdx-1, curStream->tell() + (const char *)_ptr - (const char *)curStream->peek(1, dummy));
         return makeLocalFposOffset(thisPartIdx-1, curStream->tell() + (const char *)_ptr - (const char *)curStream->peek(1, dummy));
     }
     }
 
 
+    virtual unsigned __int64 makeFilePositionLocal(offset_t pos)
+    {
+        assertex(pos >= thisFileStartPos);
+        return makeLocalFposOffset(thisPartIdx-1, pos - thisFileStartPos);
+    }
+
     virtual const char * queryLogicalFilename(const void * row) 
     virtual const char * queryLogicalFilename(const void * row) 
     { 
     { 
         return f->queryLogicalFilename(thisPartIdx);
         return f->queryLogicalFilename(thisPartIdx);

+ 1 - 0
roxie/ccd/ccdkey.hpp

@@ -29,6 +29,7 @@ interface IDirectReader : public ISerialStream
     virtual IThorDiskCallback *queryThorDiskCallback() = 0;
     virtual IThorDiskCallback *queryThorDiskCallback() = 0;
     virtual ISimpleReadStream *querySimpleStream() = 0;
     virtual ISimpleReadStream *querySimpleStream() = 0;
     virtual unsigned queryFilePart() const = 0;
     virtual unsigned queryFilePart() const = 0;
+    virtual unsigned __int64 makeFilePositionLocal(offset_t pos) = 0;
 };
 };
 
 
 interface IInMemoryIndexCursor : public IThorDiskCallback, public IIndexReadContext
 interface IInMemoryIndexCursor : public IThorDiskCallback, public IIndexReadContext

+ 10 - 10
roxie/ccd/ccdserver.cpp

@@ -20552,7 +20552,7 @@ class CRoxieServerXmlReadActivity : public CRoxieServerDiskReadBaseActivity, imp
     Owned<IXmlToRowTransformer> rowTransformer;
     Owned<IXmlToRowTransformer> rowTransformer;
     Owned<IXMLParse> xmlParser;
     Owned<IXMLParse> xmlParser;
     Owned<IColumnProvider> lastMatch;
     Owned<IColumnProvider> lastMatch;
-    unsigned __int64 localOffset;
+    unsigned __int64 fileoffset;
 public:
 public:
     IMPLEMENT_IINTERFACE;
     IMPLEMENT_IINTERFACE;
     CRoxieServerXmlReadActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, const RemoteActivityId &_remoteId, unsigned _numParts, bool _isLocal, bool _sorted, bool _maySkip, IInMemoryIndexManager *_manager)
     CRoxieServerXmlReadActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, const RemoteActivityId &_remoteId, unsigned _numParts, bool _isLocal, bool _sorted, bool _maySkip, IInMemoryIndexManager *_manager)
@@ -20560,7 +20560,7 @@ public:
     {
     {
         compoundHelper = NULL;
         compoundHelper = NULL;
         readHelper = (IHThorXmlReadArg *)&helper;
         readHelper = (IHThorXmlReadArg *)&helper;
-        localOffset = 0;
+        fileoffset = 0;
     }
     }
 
 
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
@@ -20590,22 +20590,22 @@ public:
 
 
     virtual void match(IColumnProvider &entry, offset_t startOffset, offset_t endOffset)
     virtual void match(IColumnProvider &entry, offset_t startOffset, offset_t endOffset)
     {
     {
-        localOffset = startOffset;
+        fileoffset = startOffset;
         lastMatch.set(&entry);
         lastMatch.set(&entry);
     }
     }
 
 
     //interface IThorDiskCallback
     //interface IThorDiskCallback
     virtual unsigned __int64 getFilePosition(const void * row)
     virtual unsigned __int64 getFilePosition(const void * row)
     {
     {
-        return localOffset;
+        return fileoffset;
     }
     }
     virtual unsigned __int64 getLocalFilePosition(const void * row)
     virtual unsigned __int64 getLocalFilePosition(const void * row)
     {
     {
-        return localOffset;
+        return reader->makeFilePositionLocal(fileoffset);
     }
     }
     virtual const char * queryLogicalFilename(const void * row)
     virtual const char * queryLogicalFilename(const void * row)
     {
     {
-        return varFileInfo ? varFileInfo->queryFileName() : NULL;
+        return reader->queryThorDiskCallback()->queryLogicalFilename(row);
     }
     }
 
 
     virtual const void *nextRow()
     virtual const void *nextRow()
@@ -20622,12 +20622,14 @@ public:
                 //call to next() will callback on the IXmlSelect interface
                 //call to next() will callback on the IXmlSelect interface
                 bool gotNext = false;
                 bool gotNext = false;
                 gotNext = xmlParser->next();
                 gotNext = xmlParser->next();
-                if (lastMatch)
+                if(!gotNext)
+                    eof = true;
+                else if (lastMatch)
                 {
                 {
                     RtlDynamicRowBuilder rowBuilder(rowAllocator);
                     RtlDynamicRowBuilder rowBuilder(rowAllocator);
                     unsigned sizeGot = rowTransformer->transform(rowBuilder, lastMatch, this);
                     unsigned sizeGot = rowTransformer->transform(rowBuilder, lastMatch, this);
                     lastMatch.clear();
                     lastMatch.clear();
-                    localOffset = 0;
+                    fileoffset = 0;
                     if (sizeGot)
                     if (sizeGot)
                     {
                     {
                         OwnedConstRoxieRow ret = rowBuilder.finalizeRowClear(sizeGot); 
                         OwnedConstRoxieRow ret = rowBuilder.finalizeRowClear(sizeGot); 
@@ -20647,8 +20649,6 @@ public:
                         return ret.getClear();
                         return ret.getClear();
                     }
                     }
                 }
                 }
-                if(!gotNext) //currently slightly different behavior for json
-                    eof = true;
             }
             }
             return NULL;
             return NULL;
         }
         }

+ 5 - 2
system/jlib/jptree.cpp

@@ -6713,6 +6713,7 @@ class CPullJSONReader : public CJSONReaderBase<X>, implements IPullPTreeReader
     enum ParseStates { headerStart, nameStart, valueStart, itemStart, objAttributes, itemContent, itemEnd } state;
     enum ParseStates { headerStart, nameStart, valueStart, itemStart, objAttributes, itemContent, itemEnd } state;
     bool endOfRoot;
     bool endOfRoot;
     bool preReadItemName;
     bool preReadItemName;
+    bool more;
     StringBuffer tag, value;
     StringBuffer tag, value;
 
 
     void init()
     void init()
@@ -6721,6 +6722,7 @@ class CPullJSONReader : public CJSONReaderBase<X>, implements IPullPTreeReader
         stateInfo = NULL;
         stateInfo = NULL;
         endOfRoot = false;
         endOfRoot = false;
         preReadItemName = false;
         preReadItemName = false;
+        more = true;
     }
     }
 
 
     virtual void resetState()
     virtual void resetState()
@@ -6916,7 +6918,6 @@ public:
     }
     }
     bool endNode(offset_t offset, bool notify=true)
     bool endNode(offset_t offset, bool notify=true)
     {
     {
-        bool more = true;
         if (stack.ordinality()<2)
         if (stack.ordinality()<2)
         {
         {
             state = headerStart;
             state = headerStart;
@@ -6932,7 +6933,7 @@ public:
         freeStateInfo.append(*stateInfo);
         freeStateInfo.append(*stateInfo);
         stack.pop();
         stack.pop();
         stateInfo = (stack.ordinality()) ? &stack.tos() : NULL;
         stateInfo = (stack.ordinality()) ? &stack.tos() : NULL;
-        return more;
+        return true;
     }
     }
 
 
     // IPullPTreeReader
     // IPullPTreeReader
@@ -6951,6 +6952,8 @@ public:
 
 
     virtual bool next()
     virtual bool next()
     {
     {
+        if (!more)
+            return false;
         checkStartReadNext();
         checkStartReadNext();
         checkSkipWS();
         checkSkipWS();
         switch (state)
         switch (state)

+ 1 - 1
thorlcr/activities/fetch/thfetchslave.cpp

@@ -647,7 +647,7 @@ public:
         lastMatch = &lastMatches[filePartIndex];
         lastMatch = &lastMatches[filePartIndex];
         while (!lastMatch->get())
         while (!lastMatch->get())
         {
         {
-            if (!parser->next() && !lastMatch)
+            if (!parser->next())
             {
             {
                 StringBuffer tmpStr;
                 StringBuffer tmpStr;
                 throw MakeActivityException(this, 0, "%s", fetchStream->getPartName(filePartIndex, tmpStr).str());
                 throw MakeActivityException(this, 0, "%s", fetchStream->getPartName(filePartIndex, tmpStr).str());