Bläddra i källkod

HPCC-17396 Roxie grouped disk read not working when using remote adaptor

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 8 år sedan
förälder
incheckning
7a86f9b444
2 ändrade filer med 52 tillägg och 13 borttagningar
  1. 43 11
      roxie/ccd/ccdactivities.cpp
  2. 9 2
      roxie/ccd/ccdserver.cpp

+ 43 - 11
roxie/ccd/ccdactivities.cpp

@@ -840,6 +840,7 @@ protected:
     unsigned numParallel;
 
     bool isKeyed;
+    bool isGrouped = false;
     bool forceUnkeyed;
 
     offset_t readPos;
@@ -865,7 +866,12 @@ public:
         isOpt = (helper->getFlags() & TDRoptional) != 0;
         Linked<IOutputMetaData> diskMeta(helper->queryDiskRecordSize()->querySerializedDiskMeta());
         if (diskMeta->isGrouped())
+        {
             diskMeta.setown(new CSuffixedOutputMeta(+1, diskMeta.getClear()));
+            isGrouped = true;
+        }
+        else
+            isGrouped = false;
         diskSize.set(diskMeta);
         processed = 0;
         readPos = 0;
@@ -892,7 +898,7 @@ public:
         CRoxieSlaveActivity::onCreate();
         helper->createSegmentMonitors(this);
         if (!resent)
-            isKeyed = (cursor && !forceUnkeyed) ? cursor->selectKey() : false;
+            isKeyed = (cursor && !forceUnkeyed && !isGrouped) ? cursor->selectKey() : false;
     }
 
     virtual const char *queryDynamicFileName() const
@@ -905,13 +911,14 @@ public:
         const IPropertyTree *options =  varFileInfo->queryProperties();
         if (options)
         {
-            bool isGrouped = options->getPropBool("@grouped");
-            if (isGrouped && !diskSize.isGrouped())
+            bool isFileGrouped = options->getPropBool("@grouped");
+            if (isFileGrouped && !isGrouped)
             {
                 // We are prepared to read contents of a grouped persist ungrouped... But not vice versa
                 WARNLOG("Published group information for file %s does not match coded information - assuming grouped", queryDynamicFileName());
                 Owned<IOutputMetaData> diskMeta(new CSuffixedOutputMeta(+1, LINK(diskSize.queryOriginal())));
                 diskSize.set(diskMeta);
+                isGrouped = true;
             }
             size32_t dfsSize = options->getPropInt("@recordSize");
             if (dfsSize && diskSize.isFixedSize() && dfsSize != diskSize.getFixedSize())
@@ -1055,7 +1062,7 @@ class CRoxieDiskReadActivity;
 class CRoxieCsvReadActivity;
 class CRoxieXmlReadActivity;
 IInMemoryFileProcessor *createKeyedRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskReadActivity &owner, bool resent);
-IInMemoryFileProcessor *createUnkeyedRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskReadActivity &owner, bool variableDisk, IDirectReader *reader);
+IInMemoryFileProcessor *createUnkeyedRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskReadActivity &owner, bool variableDisk, bool isGrouped, IDirectReader *reader);
 IInMemoryFileProcessor *createCsvRecordProcessor(CRoxieCsvReadActivity &owner, IDirectReader *reader, bool _skipHeader, const IResolvedFile *datafile, size32_t maxRowSize);
 IInMemoryFileProcessor *createXmlRecordProcessor(CRoxieXmlReadActivity &owner, IDirectReader *reader);
 
@@ -1085,7 +1092,7 @@ public:
         {
             CriticalBlock p(pcrit);
             processor.setown(isKeyed ? createKeyedRecordProcessor(cursor, *this, resent) : 
-                                       createUnkeyedRecordProcessor(cursor, *this, diskSize.isVariableSize(), manager->createReader(readPos, parallelPartNo, numParallel))); 
+                                       createUnkeyedRecordProcessor(cursor, *this, diskSize.isVariableSize(), isGrouped, manager->createReader(readPos, parallelPartNo, numParallel)));
         }
         unsigned __int64 rowLimit = helper->getRowLimit();
         unsigned __int64 stopAfter = helper->getChooseNLimit();
@@ -1453,8 +1460,8 @@ public:
 class UnkeyedVariableRecordProcessor : public UnkeyedRecordProcessor
 {
 public:
-    UnkeyedVariableRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskReadActivity &_owner, IDirectReader *_reader)
-      : UnkeyedRecordProcessor(_cursor, _owner, _reader), deserializeSource(_reader)
+    UnkeyedVariableRecordProcessor(IInMemoryIndexCursor *_cursor, CRoxieDiskReadActivity &_owner, bool _isGrouped, IDirectReader *_reader)
+      : UnkeyedRecordProcessor(_cursor, _owner, _reader), isGrouped(_isGrouped), deserializeSource(_reader)
     {
         prefetcher.setown(owner.diskSize.queryOriginal()->createDiskPrefetcher(owner.queryContext->queryCodeContext(), owner.basefactory->queryId()));
     }
@@ -1463,6 +1470,7 @@ public:
     {
         unsigned totalSizeSent = 0;
         helper->setCallback(reader->queryThorDiskCallback());
+        unsigned lastGroupProcessed = processed;
         while (!aborted && !deserializeSource.eos())
         {
             // This loop is the inner loop for memory diskreads - so keep it efficient!
@@ -1473,6 +1481,14 @@ public:
                 transformedSize = 0;
             else
                 transformedSize = owner.doTransform(output, nextRec);
+            bool eog;
+            if (isGrouped)
+            {
+                size32_t sizeRead = deserializeSource.queryRowSize();
+                eog = nextRec[sizeRead-1];
+            }
+            else
+                eog = false;
             deserializeSource.finishedRow();
             if (transformedSize)
             {
@@ -1485,7 +1501,7 @@ public:
                 if (processed == stopAfter)
                     return;
                 totalSizeSent += transformedSize;
-                if (totalSizeSent > indexReadChunkSize)
+                if (totalSizeSent > indexReadChunkSize && !isGrouped)
                 {
                     MemoryBuffer si;
                     unsigned short siLen = 0;
@@ -1500,18 +1516,34 @@ public:
                     return;
                 }
             }
+            if (eog && (lastGroupProcessed != processed))
+            {
+                // We return grouped data one whole group at a time
+                MemoryBuffer si;
+                unsigned short siLen = 0;
+                si.append(siLen);
+                si.append(processed);
+                si.append(false);  // not using a key
+                offset_t readPos = deserializeSource.tell();
+                si.append(readPos);
+                siLen = si.length() - sizeof(siLen);
+                si.writeDirect(0, sizeof(siLen), &siLen);
+                output->sendMetaInfo(si.toByteArray(), si.length());
+                return;
+            }
         }
     }
 
 protected:
     CThorContiguousRowBuffer deserializeSource;
     Owned<ISourceRowPrefetcher> prefetcher;
+    bool isGrouped;
 };
 
-IInMemoryFileProcessor *createUnkeyedRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskReadActivity &owner, bool variableDisk, IDirectReader *_reader)
+IInMemoryFileProcessor *createUnkeyedRecordProcessor(IInMemoryIndexCursor *cursor, CRoxieDiskReadActivity &owner, bool variableDisk, bool isGrouped, IDirectReader *_reader)
 {
-    if (variableDisk)
-        return new UnkeyedVariableRecordProcessor(cursor, owner, _reader);
+    if (variableDisk || isGrouped)
+        return new UnkeyedVariableRecordProcessor(cursor, owner, isGrouped, _reader);
     else
         return new UnkeyedRecordProcessor(cursor, owner, _reader);
 }

+ 9 - 2
roxie/ccd/ccdserver.cpp

@@ -3903,6 +3903,7 @@ private:
     bool allread;
     bool contextCached;
     bool preserveOrder;
+    bool eogSent = false;
 
     InterruptableSemaphore sentsome;
     Owned <IMessageCollator> mc;
@@ -4497,12 +4498,20 @@ public:
                 got = merger.next(matched, dummySmartStepExtra);
             }
             else if (mu)
+            {
                 got = getRow(mu);
+                if (!got && meta.isGrouped() && !eogSent)
+                {
+                    eogSent = true;
+                    return NULL;
+                }
+            }
             if (got)
             {
                 processRow(got);
                 return got;
             }
+            eogSent = false;
             if (!reload())
                 return NULL;
         }
@@ -21352,8 +21361,6 @@ public:
         {
             diskMeta.setown(new CSuffixedOutputMeta(+1, diskMeta.getClear()));
             isGrouped = true;
-            if (useRemote())
-                UNIMPLEMENTED;
         }
         diskSize.set(diskMeta);
         variableFileName = allFilesDynamic || factory->queryQueryFactory().isDynamic() || ((helper.getFlags() & (TDXvarfilename|TDXdynamicfilename)) != 0);