Explorar o código

Merge pull request #8608 from jakesmith/hpcc-14416

HPCC-14416 Create compressed empty files when writing larger file

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday hai 9 anos
pai
achega
b5e7d4fc76

+ 1 - 1
thorlcr/activities/keydiff/thkeydiff.cpp

@@ -174,7 +174,7 @@ public:
         container.queryTempHandler()->registerFile(outputName, container.queryOwner().queryGraphId(), 0, false, WUFileStandard, &clusters);
         Owned<IDistributedFile> patchFile;
         // set part sizes etc
-        queryThorFileManager().publish(container.queryJob(), outputName, *patchDesc, &patchFile, 0, false);
+        queryThorFileManager().publish(container.queryJob(), outputName, *patchDesc, &patchFile);
         try { // set file size
             if (patchFile) {
                 __int64 fs = patchFile->getFileSize(true,false);

+ 82 - 1
thorlcr/activities/thdiskbase.cpp

@@ -235,7 +235,88 @@ void CWriteMasterBase::publish()
     }
     container.queryTempHandler()->registerFile(fileName, container.queryOwner().queryGraphId(), diskHelperBase->getTempUsageCount(), TDXtemporary & diskHelperBase->getFlags(), getDiskOutputKind(diskHelperBase->getFlags()), &clusters);
     if (!dlfn.isExternal())
-        queryThorFileManager().publish(container.queryJob(), fileName, *fileDesc, NULL, targetOffset);
+    {
+        bool temporary = 0 != (diskHelperBase->getFlags()&TDXtemporary);
+        if (!temporary && (queryJob().querySlaves() < fileDesc->numParts()))
+        {
+            // create empty parts for a fileDesc being published that is larger than this cluster
+            size32_t recordSize = 0;
+            IOutputMetaData *diskRowMeta = diskHelperBase->queryDiskRecordSize()->querySerializedDiskMeta();
+            if (diskRowMeta->isFixedSize() && (TAKdiskwrite == container.getKind()))
+            {
+                recordSize = diskRowMeta->getMinRecordSize();
+                if (0 != (diskHelperBase->getFlags() & TDXgrouped))
+                    recordSize += 1;
+            }
+            unsigned compMethod = COMPRESS_METHOD_LZW;
+            // rowdiff used if recordSize > 0, else fallback to compMethod
+            if (getOptBool(THOROPT_COMP_FORCELZW, false))
+            {
+                recordSize = 0; // by default if fixed length (recordSize set), row diff compression is used. This forces compMethod.
+                compMethod = COMPRESS_METHOD_LZW;
+            }
+            else if (getOptBool(THOROPT_COMP_FORCEFLZ, false))
+                compMethod = COMPRESS_METHOD_FASTLZ;
+            else if (getOptBool(THOROPT_COMP_FORCELZ4, false))
+                compMethod = COMPRESS_METHOD_LZ4;
+            bool blockCompressed;
+            bool compressed = fileDesc->isCompressed(&blockCompressed);
+            for (unsigned clusterIdx=0; clusterIdx<fileDesc->numClusters(); clusterIdx++)
+            {
+                StringBuffer clusterName;
+                fileDesc->getClusterGroupName(clusterIdx, clusterName, &queryNamedGroupStore());
+                PROGLOG("Creating blank parts for file '%s', cluster '%s'", fileName.get(), clusterName.str());
+                unsigned p=0;
+                while (p<fileDesc->numParts())
+                {
+                    if (p == targetOffset)
+                        p += queryJob().querySlaves();
+                    IPartDescriptor *partDesc = fileDesc->queryPart(p);
+                    CDateTime createTime, modifiedTime;
+                    for (unsigned c=0; c<partDesc->numCopies(); c++)
+                    {
+                        RemoteFilename rfn;
+                        partDesc->getFilename(c, rfn);
+                        StringBuffer path;
+                        rfn.getPath(path);
+                        try
+                        {
+                            ensureDirectoryForFile(path.str());
+                            OwnedIFile iFile = createIFile(path.str());
+                            Owned<IFileIO> iFileIO;
+                            if (compressed)
+                                iFileIO.setown(createCompressedFileWriter(iFile, recordSize, false, true, NULL, compMethod));
+                            else
+                                iFileIO.setown(iFile->open(IFOcreate));
+                            dbgassertex(iFileIO.get());
+                            iFileIO.clear();
+                            // ensure copies have matching datestamps, as they would do normally (backupnode expects it)
+                            if (partDesc->numCopies() > 1)
+                            {
+                                if (0 == c)
+                                    iFile->getTime(&createTime, &modifiedTime, NULL);
+                                else
+                                    iFile->setTime(&createTime, &modifiedTime, NULL);
+                            }
+                        }
+                        catch (IException *e)
+                        {
+                            if (0 == c)
+                                throw;
+                            Owned<IThorException> e2 = MakeThorException(e);
+                            e->Release();
+                            e2->setAction(tea_warning);
+                            queryJob().fireException(e2);
+                        }
+                    }
+                    partDesc->queryProperties().setPropInt64("@size", 0);
+                    p++;
+                }
+                clusterIdx++;
+            }
+        }
+        queryThorFileManager().publish(container.queryJob(), fileName, *fileDesc, NULL);
+    }
 }
 
 CWriteMasterBase::CWriteMasterBase(CMasterGraphElement *info) : CMasterActivity(info), diskStats(info->queryJob(), diskWriteRemoteStatistics)

+ 1 - 59
thorlcr/mfilemanager/thmfilemanager.cpp

@@ -479,70 +479,12 @@ public:
         return LINK(desc);
     }
 
-    void publish(CJobBase &job, const char *logicalName, IFileDescriptor &fileDesc, Owned<IDistributedFile> *publishedFile=NULL, unsigned partOffset=0, bool createMissingParts=true)
+    void publish(CJobBase &job, const char *logicalName, IFileDescriptor &fileDesc, Owned<IDistributedFile> *publishedFile=NULL)
     {
         IPropertyTree &props = fileDesc.queryProperties();
         bool temporary = props.getPropBool("@temporary");
         if (!temporary || job.queryUseCheckpoints())
             queryDistributedFileDirectory().removeEntry(logicalName, job.queryUserDescriptor());
-
-        if (!temporary && createMissingParts && !isFileKey(fileDesc.queryProperties()))
-        {
-            // create empty parts for a fileDesc being published that is larger than this clusters
-            unsigned clusterIdx = 0;
-            for (; clusterIdx<fileDesc.numClusters(); clusterIdx++)
-            {
-                if (job.querySlaves() < fileDesc.numParts())
-                {
-                    StringBuffer clusterName;
-                    fileDesc.getClusterGroupName(clusterIdx, clusterName, &queryNamedGroupStore());
-                    PROGLOG("Creating blank parts for file '%s', cluster '%s'", logicalName, clusterName.str());
-                    unsigned p=0;
-                    while (p<fileDesc.numParts())
-                    {
-                        if (p == partOffset)
-                            p += job.querySlaves();
-                        IPartDescriptor *partDesc = fileDesc.queryPart(p);
-                        CDateTime createTime, modifiedTime;
-                        unsigned c=0;
-                        for (; c<partDesc->numCopies(); c++)
-                        {
-                            RemoteFilename rfn;
-                            partDesc->getFilename(c, rfn);
-                            StringBuffer path;
-                            rfn.getPath(path);
-                            try
-                            {
-                                ensureDirectoryForFile(path.str());
-                                OwnedIFile iFile = createIFile(path.str());
-                                OwnedIFileIO iFileIO = iFile->open(IFOcreate);
-                                iFileIO.clear();
-                                // ensure copies have matching datestamps, as they would do normally (backupnode expects it)
-                                if (partDesc->numCopies() > 1)
-                                {
-                                    if (0 == c)
-                                        iFile->getTime(&createTime, &modifiedTime, NULL);
-                                    else
-                                        iFile->setTime(&createTime, &modifiedTime, NULL);
-                                }
-                            }
-                            catch (IException *e)
-                            {
-                                if (0 == c)
-                                    throw;
-                                Owned<IThorException> e2 = MakeThorException(e);
-                                e->Release();
-                                e2->setAction(tea_warning);
-                                job.fireException(e2);
-                            }
-                        }
-                        partDesc->queryProperties().setPropInt64("@size", 0);
-                        p++;
-                    }
-                }
-                clusterIdx++;
-            }
-        }       
         // thor clusters are backed up so if replicateOutputs set *always* assume a replicate
         if (replicateOutputs && (!temporary || job.queryUseCheckpoints()))
         {

+ 1 - 1
thorlcr/mfilemanager/thmfilemanager.hpp

@@ -47,7 +47,7 @@ interface IThorFileManager : extends IInterface
     virtual void noteFileRead(CJobBase &job, IDistributedFile *file, bool extended=false) = 0;
     virtual IDistributedFile *lookup(CJobBase &job, const char *logicalName, bool temporary=false, bool optional=false, bool reportOptional=false, bool updateAccessed=true) = 0;
     virtual IFileDescriptor *create(CJobBase &job, const char *logicalName, StringArray &groupNames, IArrayOf<IGroup> &groups, bool overwriteok, unsigned helperFlags=0, bool nonLocalIndex=false, unsigned restrictedWidth=0) = 0;
-    virtual void publish(CJobBase &job, const char *logicalName, IFileDescriptor &file, Owned<IDistributedFile> *publishedFile=NULL, unsigned partOffset=0, bool createMissingParts=true) = 0;
+    virtual void publish(CJobBase &job, const char *logicalName, IFileDescriptor &file, Owned<IDistributedFile> *publishedFile=NULL) = 0;
     virtual void clearCacheEntry(const char *name) = 0;
     virtual StringBuffer &mangleLFN(CJobBase &job, const char *lfn, StringBuffer &out) = 0;
     virtual StringBuffer &addScope(CJobBase &job, const char *logicalname, StringBuffer &ret, bool temporary=false, bool paused=false) = 0;