瀏覽代碼

Merge pull request #3606 from richardkchapman/roxie-copy

HPCC-8180 Roxie issues reloading changed files
Richard Chapman 12 年之前
父節點
當前提交
a8b8a93090

+ 3 - 1
roxie/ccd/ccdactivities.cpp

@@ -353,7 +353,9 @@ protected:
         if (variableFileName) // note - in keyed join with dependent index case, kj itself won't have variableFileName but indexread might
         {
             CDateTime cacheDate(serializedCreate);
-            varFileInfo.setown(querySlaveDynamicFileCache()->lookupDynamicFile(logctx, queryDynamicFileName(), cacheDate, &packet->queryHeader(), isOpt, true));
+            unsigned checksum;
+            serializedCreate.read(checksum);
+            varFileInfo.setown(querySlaveDynamicFileCache()->lookupDynamicFile(logctx, queryDynamicFileName(), cacheDate, checksum, &packet->queryHeader(), isOpt, true));
             setVariableFileInfo();
         }
     }

+ 54 - 49
roxie/ccd/ccdfile.cpp

@@ -87,8 +87,6 @@ protected:
     bool remote;
     offset_t fileSize;
     CDateTime fileDate;
-    bool fileIsMemFile;
-    bool copyInForeground;
     unsigned crc;
     Owned<ILazyFileIO> patchFile;
     StringBuffer baseIndexFileName;
@@ -117,8 +115,6 @@ public:
         readCount = 0;
 #endif
         memFileRequested = _memFileRequested;
-        fileIsMemFile = false;
-        copyInForeground = false;
         lastAccess = msTick();
         copying = false;
         cached = NULL;
@@ -397,13 +393,6 @@ public:
         return patchFile;
     }
 
-    // the following calls are always made from inside of a critical block...
-    virtual void setFileIsMemFile(bool val) { fileIsMemFile = val; }
-    virtual bool getFileIsMemFile() { return fileIsMemFile; }
-    virtual void setCopyInForeground(bool val) { copyInForeground = val; }
-    virtual bool getCopyInForeground() { return copyInForeground; }
-
-
     virtual size32_t write(offset_t pos, size32_t len, const void * data) { throwUnexpected(); }
     virtual void setSize(offset_t size) { throwUnexpected(); }
     virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { throwUnexpected(); return 0; }
@@ -1156,29 +1145,43 @@ public:
                 if ((size != -1 && size != f->getSize()) ||
                     (!modified.isNull() && !modified.equals(*f->queryDateTime(), false)))
                 {
-                    StringBuffer modifiedDt;
-                    if (!modified.isNull())
-                        modified.getString(modifiedDt);
-                    StringBuffer fileDt;
-                    f->queryDateTime()->getString(fileDt);
-                    if (fileErrorList.find(id) == 0)
+                    if (!f->IsShared())
                     {
-                        switch (fileType)
+                        // kill it
+                        files.remove(localLocation);
+                        ForEachItemInRev(idx, todo)
                         {
-                            case ROXIE_KEY:
-                                fileErrorList.setValue(id, "Key");
-                                break;
-                        
-                            case ROXIE_FILE:
-                                fileErrorList.setValue(id, "File");
-                                break;
+                            if (f == &todo.item(idx))
+                            {
+                                todo.remove(idx);
+                            }
                         }
                     }
-        
-                    throw MakeStringException(ROXIE_MISMATCH, "Different version of %s already loaded: sizes = %"I64F"d %"I64F"d  Date = %s  %s", id, size, f->getSize(), modifiedDt.str(), fileDt.str());
-                }
+                    else
+                    {
+                        StringBuffer modifiedDt;
+                        if (!modified.isNull())
+                            modified.getString(modifiedDt);
+                        StringBuffer fileDt;
+                        f->queryDateTime()->getString(fileDt);
+                        if (fileErrorList.find(id) == 0)
+                        {
+                            switch (fileType)
+                            {
+                                case ROXIE_KEY:
+                                    fileErrorList.setValue(id, "Key");
+                                    break;
 
-                return LINK(f);
+                                case ROXIE_FILE:
+                                    fileErrorList.setValue(id, "File");
+                                    break;
+                            }
+                        }
+                        throw MakeStringException(ROXIE_MISMATCH, "Different version of %s already loaded: sizes = %"I64F"d %"I64F"d  Date = %s  %s", id, size, f->getSize(), modifiedDt.str(), fileDt.str());
+                    }
+                }
+                else
+                    return LINK(f);
             }
 
             ret.setown(openFile(id, partNo, fileType, localLocation, peerRoxieCopiedLocationInfo, deployedLocationInfo, size, modified, memFile, crc, isCompressed));  // for now don't check crcs
@@ -1215,15 +1218,6 @@ public:
 //                  toCopy.signal();
                 }
             }
-            else
-            {
-                ret->setFileIsMemFile(memFile);
-                ret->setCopyInForeground(doForegroundCopy);
-                                    
-                todo.append(*ret);
-                atomic_inc(&numFilesToProcess);  // must increment counter for SNMP accuracy
-                toCopy.signal();
-            }
 
             if (!lazyOpen || fileType == ROXIE_PATCH)  // patch file MUST be open at this point - make sure we open it
                 ret->checkOpen();
@@ -1592,7 +1586,7 @@ inline void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations
     }
 }
 
-ILazyFileIO *createDynamicFile(const char *id, IPartDescriptor *pdesc, IPartDescriptor *remotePDesc, RoxieFileType fileType, int numParts)
+ILazyFileIO *createDynamicFile(const char *id, IPartDescriptor *pdesc, IPartDescriptor *remotePDesc, RoxieFileType fileType, int numParts, bool startCopy)
 {
     IPropertyTree &partProps = pdesc->queryProperties();
     offset_t dfsSize = partProps.getPropInt64("@size");
@@ -1619,13 +1613,14 @@ ILazyFileIO *createDynamicFile(const char *id, IPartDescriptor *pdesc, IPartDesc
 
     const char *logicalname = dlfn.get();
 
-    makePhysicalPartName(logicalname, partNo, numParts, localFileName, false, DFD_OSdefault, baseDataDirectory);  // MORE - if we get the dataDirectory we can pass it in and possibly` reuse an existing file
+    makePhysicalPartName(logicalname, partNo, numParts, localFileName, false, DFD_OSdefault, baseDataDirectory);  // MORE - if we get the dataDirectory we can pass it in and possibly reuse an existing file
 
     appendRemoteLocations(pdesc, remoteLocations, true);
     if (remotePDesc)
         appendRemoteLocations(remotePDesc, remoteLocations, false);
 
-    return queryFileCache().lookupFile(id, partNo, fileType, localFileName, NULL, NULL, localLocations, remoteLocations, dfsSize, fileDate, false, true, false, false, crcResources ? crc : 0, pdesc->queryOwner().isCompressed(), NULL);
+    bool foregroundCopy = numParts==1 || (partNo==numParts && fileType==ROXIE_KEY);
+    return queryFileCache().lookupFile(id, partNo, fileType, localFileName, NULL, NULL, localLocations, remoteLocations, dfsSize, fileDate, false, true, startCopy, foregroundCopy, crcResources ? crc : 0, pdesc->queryOwner().isCompressed(), NULL);
 }
 
 //====================================================================================================
@@ -1879,7 +1874,7 @@ public:
 
 template <class X> class PerChannelCacheOf
 {
-    PointerArrayOf<X> cache;
+    PointerIArrayOf<X> cache;
     IntArray channels;
 public:
     void set(X *value, unsigned channel)
@@ -1910,6 +1905,7 @@ protected:
     CDateTime fileTimeStamp;
     RoxieFileType fileType;
     offset_t fileSize;
+    unsigned fileCheckSum;
 
     StringArray subNames;
     PointerIArrayOf<IFileDescriptor> subFiles; // note - on slaves, the file descriptors may have incomplete info. On originating server is always complete
@@ -1955,6 +1951,7 @@ public:
     {
         cached = NULL;
         fileSize = 0;
+        fileCheckSum = 0;
         if (dFile)
         {
             if (traceLevel > 5)
@@ -1972,6 +1969,7 @@ public:
             else // normal file, not superkey
                 addFile(dFile->queryLogicalName(), dFile->getFileDescriptor());
             bool tsSet = dFile->getModificationTime(fileTimeStamp);
+            bool csSet = dFile->getFileCheckSum(fileCheckSum);
             assertex(tsSet); // per Nigel, is always set
             properties.set(&dFile->queryAttributes());
         }
@@ -2039,6 +2037,7 @@ public:
         byte type = (byte) fileType;
         mb.append(type);
         fileTimeStamp.serialize(mb);
+        mb.append(fileCheckSum);
         mb.append(fileSize);
         unsigned numSubFiles = subFiles.length();
         mb.append(numSubFiles);
@@ -2098,9 +2097,9 @@ public:
                         IPartDescriptor *pdesc = fdesc->queryPart(i-1);
                         assertex(pdesc);
                         IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, i-1);
-                        Owned<ILazyFileIO> file = createDynamicFile(subNames.item(0), pdesc, remotePDesc, ROXIE_FILE, numParts);
+                        Owned<ILazyFileIO> file = createDynamicFile(subNames.item(0), pdesc, remotePDesc, ROXIE_FILE, numParts, cached != NULL);
                         IPropertyTree &partProps = pdesc->queryProperties();
-                        f->addFile(LINK(file), partProps.getPropInt64("@offset"));
+                        f->addFile(file.getClear(), partProps.getPropInt64("@offset"));
                     }
                     catch (IException *E)
                     {
@@ -2175,7 +2174,7 @@ public:
                             IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, partNo-1);
                             if (pdesc)
                             {
-                                part.setown(createDynamicFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, fdesc->numParts()));
+                                part.setown(createDynamicFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, fdesc->numParts(), cached != NULL));
                                 pdesc->getCrc(crc);
                             }
                         }
@@ -2206,7 +2205,7 @@ public:
                     assertex(numParts > 0);
                     IPartDescriptor *pdesc = fdesc->queryPart(numParts - 1);
                     IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, numParts - 1);
-                    Owned<ILazyFileIO> keyFile = createDynamicFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, numParts);
+                    Owned<ILazyFileIO> keyFile = createDynamicFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, numParts, cached != NULL);
                     unsigned crc = 0;
                     pdesc->getCrc(crc);
                     StringBuffer pname;
@@ -2251,6 +2250,11 @@ public:
         return fileTimeStamp;
     }
 
+    virtual unsigned queryCheckSum() const
+    {
+        return fileCheckSum;
+    }
+
     virtual offset_t getFileSize() const
     {
         return fileSize;
@@ -2371,6 +2375,7 @@ public:
                 serverData.read(type);
                 fileType = (RoxieFileType) type;
                 fileTimeStamp.deserialize(serverData);
+                serverData.read(fileCheckSum);
                 serverData.read(fileSize);
                 unsigned numSubFiles;
                 serverData.read(numSubFiles);
@@ -2437,14 +2442,14 @@ public:
     IMPLEMENT_IINTERFACE;
     CSlaveDynamicFileCache(unsigned _limit) : tableSize(_limit) {}
 
-    virtual IResolvedFile *lookupDynamicFile(const IRoxieContextLogger &logctx, const char *lfn, CDateTime &cacheDate, RoxiePacketHeader *header, bool isOpt, bool isLocal)
+    virtual IResolvedFile *lookupDynamicFile(const IRoxieContextLogger &logctx, const char *lfn, CDateTime &cacheDate, unsigned checksum, RoxiePacketHeader *header, bool isOpt, bool isLocal)
     {
         if (logctx.queryTraceLevel() > 5)
         {
             StringBuffer s;
             logctx.CTXLOG("lookupDynamicFile %s for packet %s", lfn, header->toString(s).str());
         }
-        // we use a fixed-size array with linear lookup for ease of initial coding - but unless we start making heavy use of the feaure this may be adequate.
+        // We use a fixed-size array with linear lookup for ease of initial coding; unless we start making heavy use of the feature, this may be adequate.
         CriticalBlock b(crit);
         if (!cacheDate.isNull())
         {
@@ -2454,7 +2459,7 @@ public:
                 CSlaveDynamicFile &f = files.item(idx);
                 if (f.channel==header->channel && f.serverIdx==header->serverIdx && stricmp(f.queryLFN(), lfn)==0)
                 {
-                    if (!cacheDate.equals(f.queryTimeStamp()))
+                    if (!cacheDate.equals(f.queryTimeStamp()) || checksum != f.queryCheckSum())
                     {
                         if (f.isKey())
                             clearKeyStoreCacheEntry(f.queryLFN());

+ 1 - 4
roxie/ccd/ccdfile.hpp

@@ -41,10 +41,6 @@ interface ILazyFileIO : extends IFileIO
     virtual IFile *queryTarget() = 0;
     virtual void copyComplete() = 0;
     virtual int getLinkCount() const = 0;
-    virtual void setFileIsMemFile(bool val) = 0;
-    virtual bool getFileIsMemFile() = 0;
-    virtual void setCopyInForeground(bool val) = 0;
-    virtual bool getCopyInForeground() = 0;
     virtual bool createHardFileLink() = 0;
 
     virtual void setBaseIndexFileName(const char *val) =0;
@@ -112,6 +108,7 @@ interface IResolvedFile : extends ISimpleSuperFileEnquiry
     virtual offset_t getFileSize() const = 0;
 
     virtual const CDateTime &queryTimeStamp() const = 0;
+    virtual unsigned queryCheckSum() const = 0;
 
     virtual const char *queryFileName() const = 0;
     virtual void setCache(const IRoxiePackage *cache) = 0;

+ 21 - 6
roxie/ccd/ccdserver.cpp

@@ -3421,7 +3421,10 @@ private:
                 activity.serializeCreateStartContext(cachedContext.clear());
                 activity.serializeExtra(cachedContext);
                 if (activity.queryVarFileInfo())
+                {
                     activity.queryVarFileInfo()->queryTimeStamp().serialize(cachedContext);
+                    cachedContext.append(activity.queryVarFileInfo()->queryCheckSum());
+                }
                 contextCached = true;
             }
 
@@ -10633,7 +10636,9 @@ protected:
     CachedOutputMetaData diskmeta;
     Owned<IRoxieWriteHandler> writer;
 
+    bool tallycrc;
     unsigned __int64 uncompressedBytesWritten;
+    CRC32 crc;
 
     void updateWorkUnitResult(unsigned __int64 reccount)
     {
@@ -10715,6 +10720,7 @@ public:
         diskmeta.set(helper.queryDiskRecordSize());
         blockcompressed = (((helper.getFlags() & TDWnewcompress) != 0) || (((helper.getFlags() & TDXcompress) != 0) && (diskmeta.getFixedSize() >= MIN_ROWCOMPRESS_RECSIZE))); //always use new compression
         encrypted = false; // set later
+        tallycrc = true;
         uncompressedBytesWritten = 0;
     }
 
@@ -10745,17 +10751,17 @@ public:
             encrypted = true;
             blockcompressed = true;
         }
-        if(blockcompressed)
+        if (blockcompressed)
             io.setown(createCompressedFileWriter(writer->queryFile(), (diskmeta.isFixedSize() ? diskmeta.getFixedSize() : 0), extend, true, ecomp));
         else
             io.setown(writer->queryFile()->open(extend ? IFOwrite : IFOcreate));
-        if(!io)
+        if (!io)
             throw MakeStringException(errno, "Failed to create%s file %s for writing", (encrypted ? " encrypted" : (blockcompressed ? " compressed" : "")), writer->queryFile()->queryFilename());
         diskout.setown(createBufferedIOStream(io));
-        if(extend)
+        if (extend)
             diskout->seek(0, IFSend);
         rowSerializer.setown(input->queryOutputMeta()->createRowSerializer(ctx->queryCodeContext(), activityId)); 
-        bool tallycrc = !factory->queryQueryFactory().getDebugValueBool("skipFileFormatCrcCheck", false) && !(helper.getFlags() & TDRnocrccheck); 
+        tallycrc = !factory->queryQueryFactory().getDebugValueBool("skipFileFormatCrcCheck", false) && !(helper.getFlags() & TDRnocrccheck) && !blockcompressed;
         outSeq.setown(createRowWriter(diskout, rowSerializer, rowAllocator, grouped, tallycrc, true )); 
     }
 
@@ -10768,7 +10774,7 @@ public:
         }
         else
         {
-            outSeq->flush();
+            outSeq->flush(&crc);
             updateWorkUnitResult(processed);
             uncompressedBytesWritten = outSeq->getPosition();
             writer->finish(true, this);
@@ -10784,6 +10790,7 @@ public:
         outSeq.clear();
         writer.clear();
         uncompressedBytesWritten = 0;
+        crc.reset();
     }
 
     virtual void onExecute()
@@ -10814,6 +10821,8 @@ public:
             fileProps.setPropInt64("@size", uncompressedBytesWritten);
             partProps.setPropInt64("@size", uncompressedBytesWritten);
         }
+        else if (tallycrc)
+            partProps.setPropInt64("@fileCrc", crc.get());
 
         if (encrypted)
             fileProps.setPropBool("@encrypted", true);
@@ -23389,7 +23398,10 @@ public:
             MemoryBuffer tmp;
             rootIndex->queryActivity()->serializeCreateStartContext(tmp);
             if (rootIndex->queryActivity()->queryVarFileInfo())
+            {
                 rootIndex->queryActivity()->queryVarFileInfo()->queryTimeStamp().serialize(tmp);
+                tmp.append(rootIndex->queryActivity()->queryVarFileInfo()->queryCheckSum());
+            }
             unsigned ctxlen = tmp.length();
             out.append(ctxlen).append(tmp);
         }
@@ -24173,7 +24185,10 @@ public:
             MemoryBuffer tmp;
             rootIndex->queryActivity()->serializeCreateStartContext(tmp);
             if (rootIndex->queryActivity()->queryVarFileInfo())
+            {
                 rootIndex->queryActivity()->queryVarFileInfo()->queryTimeStamp().serialize(tmp);
+                tmp.append(rootIndex->queryActivity()->queryVarFileInfo()->queryCheckSum());
+            }
             unsigned ctxlen = tmp.length();
             out.append(ctxlen).append(tmp);
         }
@@ -28063,7 +28078,7 @@ public:
     virtual const IResolvedFile *resolveLFN(const char *filename, bool isOpt)
     {
         CDateTime cacheDate; // Note - this is empty meaning we don't know...
-        return querySlaveDynamicFileCache()->lookupDynamicFile(*this, filename, cacheDate, header, isOpt, false);
+        return querySlaveDynamicFileCache()->lookupDynamicFile(*this, filename, cacheDate, 0, header, isOpt, false);
     }
 
     virtual IRoxieWriteHandler *createLFN(const char *filename, bool overwrite, bool extend, const StringArray &clusters)

+ 1 - 1
roxie/ccd/ccdstate.hpp

@@ -83,7 +83,7 @@ extern IRoxiePackage *createPackage(IPropertyTree *p);
 
 interface ISlaveDynamicFileCache : extends IInterface
 {
-    virtual IResolvedFile *lookupDynamicFile(const IRoxieContextLogger &logctx, const char *lfn, CDateTime &cacheDate, RoxiePacketHeader *header, bool isOpt, bool isLocal) = 0; 
+    virtual IResolvedFile *lookupDynamicFile(const IRoxieContextLogger &logctx, const char *lfn, CDateTime &cacheDate, unsigned checksum, RoxiePacketHeader *header, bool isOpt, bool isLocal) = 0;
 };
 extern ISlaveDynamicFileCache *querySlaveDynamicFileCache();
 extern void releaseSlaveDynamicFileCache();

+ 1 - 1
system/jlib/jcrc.hpp

@@ -37,7 +37,7 @@ class jlib_decl CRC32
 public:
     CRC32(unsigned crc = ~0U) { reset(crc); }
     inline void reset(unsigned _crc = ~0U) { crc = _crc; }
-    inline unsigned get() { return ~crc; }
+    inline unsigned get() const { return ~crc; }
     void skip(offset_t length);
     void tally(unsigned len, const void * buf);
 

+ 15 - 0
testing/ecl/key/rewrite.xml

@@ -0,0 +1,15 @@
+<Dataset name='Result 1'>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 3'>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><a>2</a></Row>
+</Dataset>
+<Dataset name='Result 5'>
+</Dataset>
+<Dataset name='Result 6'>
+ <Row><a>3</a></Row>
+</Dataset>

+ 17 - 0
testing/ecl/rewrite.ecl

@@ -0,0 +1,17 @@
+#option ('allowVariableRoxieFilenames', 1);
+string prefix := '' : stored('prefix');
+
+
+d1 := dataset([{1}], { INTEGER a });
+d2 := dataset([{2}], { INTEGER a });
+d3 := dataset([{3}], { INTEGER a });
+i := dataset('regress::outfile'+prefix, { INTEGER a }, FLAT);
+
+sequential(
+  output(d1,,'regress::outfile'+prefix, OVERWRITE),
+  output(i);
+  output(d2,,'regress::outfile'+prefix, OVERWRITE),
+  output(i);
+  output(d3,,'regress::outfile'+prefix, OVERWRITE),
+  output(i);
+);

+ 15 - 0
testing/ecl/roxie/key/rewrite.xml

@@ -0,0 +1,15 @@
+<Dataset name='Result 1'>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><a>1</a></Row>
+</Dataset>
+<Dataset name='Result 3'>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><a>2</a></Row>
+</Dataset>
+<Dataset name='Result 5'>
+</Dataset>
+<Dataset name='Result 6'>
+ <Row><a>3</a></Row>
+</Dataset>