Browse Source

Merge pull request #9713 from richardkchapman/hpcc-17097-roxie-record-length

HPCC-17097 Roxie does not report record length mismatches on disk files

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 8 years ago
parent
commit
dd2227b6c1

+ 19 - 1
roxie/ccd/ccdactivities.cpp

@@ -863,7 +863,10 @@ public:
         helper = (IHThorDiskReadBaseArg *) basehelper;
         variableFileName = allFilesDynamic || basefactory->queryQueryFactory().isDynamic() || ((helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) != 0);
         isOpt = (helper->getFlags() & TDRoptional) != 0;
-        diskSize.set(helper->queryDiskRecordSize());
+        Linked<IOutputMetaData> diskMeta(helper->queryDiskRecordSize()->querySerializedDiskMeta());
+        if (diskMeta->isGrouped())
+            diskMeta.setown(new CSuffixedOutputMeta(+1, diskMeta.getClear()));
+        diskSize.set(diskMeta);
         processed = 0;
         readPos = 0;
         isKeyed = false;
@@ -899,6 +902,21 @@ public:
 
     virtual void setVariableFileInfo()
     {
+        const IPropertyTree *options =  varFileInfo->queryProperties();
+        if (options)
+        {
+            bool isGrouped = options->getPropBool("@grouped");
+            if (isGrouped && !diskSize.isGrouped())
+            {
+                // We are prepared to read contents of a grouped persist ungrouped... But not vice versa
+                WARNLOG("Published group information for file %s does not match coded information - assuming grouped", queryDynamicFileName());
+                Owned<IOutputMetaData> diskMeta(new CSuffixedOutputMeta(+1, LINK(diskSize.queryOriginal())));
+                diskSize.set(diskMeta);
+            }
+            size32_t dfsSize = options->getPropInt("@recordSize");
+            if (dfsSize && diskSize.isFixedSize() && dfsSize != diskSize.getFixedSize())
+                throw MakeStringException(ROXIE_LAYOUT_MISMATCH, "Published record size %d for file %s (%s) does not match coded record size %d", dfsSize, queryDynamicFileName(), isGrouped ? "grouped" : "ungrouped", diskSize.getFixedSize());
+        }
         unsigned channel = packet->queryHeader().channel;
         varFiles.setown(varFileInfo->getIFileIOArray(isOpt, channel)); // MORE could combine 
         manager.setown(varFileInfo->getIndexManager(isOpt, channel, varFiles, diskSize, false, 0));

+ 102 - 12
roxie/ccd/ccdserver.cpp

@@ -11366,7 +11366,7 @@ protected:
     bool encrypted;
     bool grouped;
     IHThorDiskWriteArg &helper;
-    CachedOutputMetaData diskmeta;
+    Owned<IRecordSize> diskmeta;
     Owned<IRoxieWriteHandler> writer;
 
     bool tallycrc;
@@ -11448,13 +11448,15 @@ protected:
 
 public:
     CRoxieServerDiskWriteActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
-        : CRoxieServerInternalSinkActivity(_ctx, _factory, _probeManager, 0), helper((IHThorDiskWriteArg &)basehelper)
+        : CRoxieServerInternalSinkActivity(_ctx, _factory, _probeManager, 0), helper((IHThorDiskWriteArg &) basehelper)
     {
         extend = ((helper.getFlags() & TDWextend) != 0);
         overwrite = ((helper.getFlags() & TDWoverwrite) != 0);
-        grouped = false; // don't think we need to support it...
-        diskmeta.set(helper.queryDiskRecordSize());
-        blockcompressed = (((helper.getFlags() & TDWnewcompress) != 0) || (((helper.getFlags() & TDXcompress) != 0) && (diskmeta.getFixedSize() >= MIN_ROWCOMPRESS_RECSIZE))); //always use new compression
+        grouped = (helper.getFlags() & TDXgrouped) != 0;
+        diskmeta.set(helper.queryDiskRecordSize()->querySerializedDiskMeta());
+        if (grouped)
+            diskmeta.setown(createDeltaRecordSize(diskmeta, +1));
+        blockcompressed = (((helper.getFlags() & TDWnewcompress) != 0) || (((helper.getFlags() & TDXcompress) != 0) && (diskmeta->getFixedSize() >= MIN_ROWCOMPRESS_RECSIZE))); //always use new compression
         encrypted = false; // set later
         tallycrc = true;
         uncompressedBytesWritten = 0;
@@ -11488,7 +11490,7 @@ public:
             blockcompressed = true;
         }
         if (blockcompressed)
-            io.setown(createCompressedFileWriter(writer->queryFile(), (diskmeta.isFixedSize() ? diskmeta.getFixedSize() : 0), extend, true, ecomp));
+            io.setown(createCompressedFileWriter(writer->queryFile(), (diskmeta->isFixedSize() ? diskmeta->getFixedSize() : 0), extend, true, ecomp));
         else
             io.setown(writer->queryFile()->open(extend ? IFOwrite : IFOcreate));
         if (!io)
@@ -11546,11 +11548,17 @@ public:
     {
         for (;;)
         {
-            const void *nextrec = inputStream->ungroupedNextRow();
-            if (!nextrec)
-                break;
+            const void *row = inputStream->nextRow();
+            if (!row)
+            {
+                row = inputStream->nextRow();
+                if (!row)
+                    break;
+                if (grouped)
+                    outSeq->putRow(NULL);
+            }
             processed++;
-            outSeq->putRow(nextrec);
+            outSeq->putRow(row);
         }
     }
 
@@ -21303,6 +21311,7 @@ protected:
     bool maySkip;
     bool isLocal;
     bool forceRemote;
+    bool isGrouped = false;
     CachedOutputMetaData diskSize;
     Owned<const IResolvedFile> varFileInfo;
     Owned<IFileIOArray> varFiles;
@@ -21334,7 +21343,15 @@ public:
         rowLimit = (unsigned __int64) -1;
         isKeyed = false;
         stopAfter = I64C(0x7FFFFFFFFFFFFFFF);
-        diskSize.set(helper.queryDiskRecordSize());
+        Linked<IOutputMetaData> diskMeta(helper.queryDiskRecordSize()->querySerializedDiskMeta());
+        if (diskMeta->isGrouped())
+        {
+            diskMeta.setown(new CSuffixedOutputMeta(+1, diskMeta.getClear()));
+            isGrouped = true;
+            if (useRemote())
+                UNIMPLEMENTED;
+        }
+        diskSize.set(diskMeta);
         variableFileName = allFilesDynamic || factory->queryQueryFactory().isDynamic() || ((helper.getFlags() & (TDXvarfilename|TDXdynamicfilename)) != 0);
         isOpt = (helper.getFlags() & TDRoptional) != 0;
     }
@@ -21371,7 +21388,26 @@ public:
                 varFileInfo.setown(resolveLFN(fileName, isOpt));
                 numParts = 0;
                 if (varFileInfo)
+                {
                     numParts = varFileInfo->getNumParts();
+                    const IPropertyTree *options =  varFileInfo->queryProperties();
+                    if (options)
+                    {
+                        isGrouped = options->getPropBool("@grouped");
+                        if (isGrouped && !diskSize.isGrouped())
+                        {
+                            // We are prepared to read contents of a grouped persist ungrouped... But not vice versa
+                            VStringBuffer msg("Published group information for file %s does not match coded information - assuming grouped", fileName.get());
+                            WARNLOG("%s", msg.str());
+                            ctx->queryCodeContext()->addWuException(msg.str(), ROXIE_LAYOUT_MISMATCH, SeverityError, "roxie");
+                            Owned<IOutputMetaData> diskMeta(new CSuffixedOutputMeta(+1, LINK(diskSize.queryOriginal())));
+                            diskSize.set(diskMeta);
+                        }
+                        size32_t dfsSize = options->getPropInt("@recordSize");
+                        if (dfsSize && diskSize.isFixedSize() && dfsSize != diskSize.getFixedSize())
+                            throw MakeStringException(ROXIE_LAYOUT_MISMATCH, "Published record size %d for file %s (%s) does not match coded record size %d", dfsSize, fileName.get(), isGrouped ? "grouped" : "ungrouped", diskSize.getFixedSize());
+                    }
+                }
             }
             if (!numParts)
             {
@@ -21515,7 +21551,9 @@ class CRoxieServerDiskReadActivity : public CRoxieServerDiskReadBaseActivity
     IHThorCompoundReadExtra * readHelper;
     ConstPointerArray readrows;
     bool readAheadDone;
+    bool eogPending;
     unsigned readIndex;
+    unsigned lastGroupProcessed;
 
 public:
     CRoxieServerDiskReadActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, const RemoteActivityId &_remoteId, unsigned _numParts, bool _isLocal, bool _sorted, bool _maySkip, IInMemoryIndexManager *_manager)
@@ -21525,6 +21563,8 @@ public:
         readHelper = (IHThorDiskReadArg *)&helper;
         readAheadDone = false;
         readIndex = 0;
+        eogPending = false;
+        lastGroupProcessed = processed;
     }
 
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
@@ -21534,6 +21574,8 @@ public:
         CRoxieServerDiskReadBaseActivity::start(parentExtractSize, parentExtract, paused);
         readAheadDone = false;
         readIndex = 0;
+        eogPending = false;
+        lastGroupProcessed = processed;
     }
 
     virtual void reset()
@@ -21542,6 +21584,8 @@ public:
         readrows.kill();
         readAheadDone = false;
         readIndex = 0;
+        eogPending = false;
+        lastGroupProcessed = processed;
         CRoxieServerDiskReadBaseActivity::reset();
     }
 
@@ -21627,6 +21671,12 @@ public:
 
     const void *_nextRow()
     {
+        if (eogPending && (lastGroupProcessed != processed))
+        {
+            eogPending = false;
+            lastGroupProcessed = processed;
+            return NULL;
+        }
         RtlDynamicRowBuilder rowBuilder(rowAllocator);
         unsigned transformedSize = 0;
         if (isKeyed)
@@ -21660,9 +21710,29 @@ public:
                     transformedSize = 0;
                 else
                     transformedSize = readHelper->transform(rowBuilder, nextRec);
+                bool eog;
+                if (isGrouped)
+                {
+                    size32_t sizeRead = diskSize.getRecordSize(nextRec);
+                    eog = nextRec[sizeRead-1];
+                }
+                else
+                    eog = false;
                 deserializeSource.finishedRow();
                 if (transformedSize)
+                {
+                    if (isGrouped)
+                        eogPending = eog;
                     break;
+                }
+                else
+                {
+                    if (eog && (lastGroupProcessed != processed))
+                    {
+                        lastGroupProcessed = processed;
+                        return NULL;
+                    }
+                }
             }
         }
         return rowBuilder.finalizeRowClear(transformedSize);
@@ -22378,6 +22448,26 @@ public:
             bool isOpt = (helper->getFlags() & TDRoptional) != 0;
             OwnedRoxieString fileName(helper->getFileName());
             datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
+            Linked<IOutputMetaData> diskMeta(helper->queryDiskRecordSize()->querySerializedDiskMeta());
+            if (diskMeta->isGrouped())
+                diskMeta.setown(new CSuffixedOutputMeta(+1, diskMeta.getClear()));
+            if (datafile)
+            {
+                const IPropertyTree *options =  datafile->queryProperties();
+                if (options)
+                {
+                    bool isGrouped = options->getPropBool("@grouped");
+                    if (isGrouped && !diskMeta->isGrouped())
+                    {
+                        // We are prepared to read contents of a grouped persist ungrouped... But not vice versa
+                        WARNLOG("Published group information for file %s does not match coded information - assuming grouped", fileName.get());
+                        diskMeta.setown(new CSuffixedOutputMeta(+1, diskMeta.getClear()));
+                    }
+                    size32_t dfsSize = options->getPropInt("@recordSize");
+                    if (dfsSize && diskMeta->isFixedSize() && dfsSize != diskMeta->getFixedSize())
+                        throw MakeStringException(ROXIE_LAYOUT_MISMATCH, "Published record size %d for file %s (%s) does not match coded record size %d", dfsSize, fileName.get(), isGrouped ? "grouped" : "ungrouped", diskMeta->getFixedSize());
+                }
+            }
             bool isSimple = (datafile && datafile->getNumParts()==1 && !_queryFactory.queryOptions().disableLocalOptimizations);
             if (isLocal || isSimple)
             {
@@ -22385,7 +22475,7 @@ public:
                 {
                     unsigned channel = isLocal ? queryFactory.queryChannel() : 0;
                     files.setown(datafile->getIFileIOArray(isOpt, channel));
-                    manager.setown(datafile->getIndexManager(isOpt, channel, files, helper->queryDiskRecordSize(), _graphNode.getPropBool("att[@name=\"preload\"]/@value", false), _graphNode.getPropInt("att[@name=\"_preloadSize\"]/@value", 0)));
+                    manager.setown(datafile->getIndexManager(isOpt, channel, files, diskMeta, _graphNode.getPropBool("att[@name=\"preload\"]/@value", false), _graphNode.getPropInt("att[@name=\"_preloadSize\"]/@value", 0)));
                     const IPropertyTree *options = datafile->queryProperties();
                     if (options)
                     {

+ 1 - 0
roxie/roxie/roxie.hpp

@@ -85,6 +85,7 @@
 #define ROXIE_OPT_REPORTING         ROXIE_ERROR_START+63
 #define ROXIE_MISSING_DLL           ROXIE_ERROR_START+64
 #define ROXIE_INVALID_TARGET        ROXIE_ERROR_START+65
+#define ROXIE_LAYOUT_MISMATCH       ROXIE_ERROR_START+66
 
 
 

+ 31 - 0
testing/regress/ecl/groupread.ecl

@@ -0,0 +1,31 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+//class=file
+//version multiPart=false
+//version multiPart=true
+
+import ^ as root;
+multiPart := #IFDEFINED(root.multiPart, true);
+useLocal := #IFDEFINED(root.useLocal, false);
+
+//--- end of version configuration ---
+
+import $.setup;
+Files := setup.Files(multiPart, useLocal, false);
+
+TABLE(Files.DG_FlatFileGrouped, {DG_FirstName, COUNT(GROUP)});

+ 6 - 0
testing/regress/ecl/key/groupread.xml

@@ -0,0 +1,6 @@
+<Dataset name='Result 1'>
+ <Row><dg_firstname>CLAIRE    </dg_firstname><_unnamed_cnt_2>16</_unnamed_cnt_2></Row>
+ <Row><dg_firstname>DAVID     </dg_firstname><_unnamed_cnt_2>16</_unnamed_cnt_2></Row>
+ <Row><dg_firstname>KELLY     </dg_firstname><_unnamed_cnt_2>16</_unnamed_cnt_2></Row>
+ <Row><dg_firstname>KIMBERLY  </dg_firstname><_unnamed_cnt_2>16</_unnamed_cnt_2></Row>
+</Dataset>

+ 2 - 0
testing/regress/ecl/key/setup.xml

@@ -28,3 +28,5 @@
 </Dataset>
 <Dataset name='Result 15'>
 </Dataset>
+<Dataset name='Result 16'>
+</Dataset>

+ 2 - 0
testing/regress/ecl/setup/files.ecl

@@ -52,6 +52,7 @@ EXPORT STRING indexPrefix := IF(multiPart AND useLocal, 'local',
 EXPORT DG_FileOut           := '~REGRESS::' + filePrefix + '::DG_';
 EXPORT DG_IndexOut           := '~REGRESS::' + indexPrefix + '::DG_';
 EXPORT DG_ParentFileOut     := '~REGRESS::' + filePrefix + '::DG_Parent.d00';
+EXPORT DG_ParentFileOutGrouped     := '~REGRESS::' + filePrefix + '::DG_ParentGrouped.d00';
 EXPORT DG_ChildFileOut      := '~REGRESS::' + filePrefix + '::DG_Child.d00';
 EXPORT DG_GrandChildFileOut := '~REGRESS::' + filePrefix + '::DG_GrandChild.d00';
 EXPORT DG_FetchFileName     := '~REGRESS::' + filePrefix + '::C.DG_FetchFile';
@@ -122,6 +123,7 @@ END;
 EXPORT DG_BlankSet := dataset([{0,'','',0}],DG_OutRec);
 
 EXPORT DG_FlatFile      := DATASET(DG_FileOut+'FLAT',{DG_OutRec,UNSIGNED8 filepos{virtual(fileposition)}},FLAT);
+EXPORT DG_FlatFileGrouped := DATASET(DG_FileOut+'GROUPED',{DG_OutRec,UNSIGNED8 filepos{virtual(fileposition)}},FLAT,__GROUPED__);
 EXPORT DG_FlatFileEvens := DATASET(DG_FileOut+'FLAT_EVENS',{DG_OutRec,UNSIGNED8 filepos{virtual(fileposition)}},FLAT);
 
 EXPORT DG_NormalIndexFile      := INDEX(DG_FlatFile, { DG_firstname, DG_lastname }, { DG_Prange, filepos }, DG_IndexOut+'INDEX');

+ 1 - 0
testing/regress/ecl/setup/setup.ecl

@@ -84,6 +84,7 @@ EvensFilter := DG_ParentRecs.DG_firstname in [Files.DG_Fnames[2],Files.DG_Fnames
 
 SEQUENTIAL(
     PARALLEL(output(DG_ParentRecs,,Files.DG_FileOut+'FLAT',overwrite),
+             output(GROUP(SORT(DG_ParentRecs, DG_FirstName),DG_Firstname),,Files.DG_FileOut+'GROUPED',__GROUPED__,overwrite),
              output(DG_ParentRecs(EvensFilter),,Files.DG_FileOut+'FLAT_EVENS',overwrite)),
     PARALLEL(buildindex(Files.DG_NormalIndexFile,overwrite),
              buildindex(Files.DG_NormalIndexFileEvens,overwrite),

+ 2 - 0
testing/regress/ecl/setup/thor/setup.xml

@@ -40,3 +40,5 @@
 </Dataset>
 <Dataset name='Result 21'>
 </Dataset>
+<Dataset name='Result 22'>
+</Dataset>