Преглед изворни кода

HPCC-14416 Create compressed empty files when writing larger file

When a file is output to a larger cluster, the number of parts is
padded to match the destination cluster.
If the target file is compressed, the empty physical parts need
to be properly constructed so that they have a compressed-format
header. Prior to this fix, a 0-length file part was created,
which was fine if uncompressed, but caused read-back issues if
the file was compressed.

Most of the diff in this commit is moving functionality that
used to be in thmfilemanager.cpp to the diskwrite activity (the
only place it is needed), so that it can read the needed info from
the activity (compression type, record size).

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith пре 9 година
родитељ
комит
4c0b4a07ad

+ 1 - 1
thorlcr/activities/keydiff/thkeydiff.cpp

@@ -174,7 +174,7 @@ public:
         container.queryTempHandler()->registerFile(outputName, container.queryOwner().queryGraphId(), 0, false, WUFileStandard, &clusters);
         Owned<IDistributedFile> patchFile;
         // set part sizes etc
-        queryThorFileManager().publish(container.queryJob(), outputName, *patchDesc, &patchFile, 0, false);
+        queryThorFileManager().publish(container.queryJob(), outputName, *patchDesc, &patchFile);
         try { // set file size
             if (patchFile) {
                 __int64 fs = patchFile->getFileSize(true,false);

+ 82 - 1
thorlcr/activities/thdiskbase.cpp

@@ -235,7 +235,88 @@ void CWriteMasterBase::publish()
     }
     container.queryTempHandler()->registerFile(fileName, container.queryOwner().queryGraphId(), diskHelperBase->getTempUsageCount(), TDXtemporary & diskHelperBase->getFlags(), getDiskOutputKind(diskHelperBase->getFlags()), &clusters);
     if (!dlfn.isExternal())
-        queryThorFileManager().publish(container.queryJob(), fileName, *fileDesc, NULL, targetOffset);
+    {
+        bool temporary = 0 != (diskHelperBase->getFlags()&TDXtemporary);
+        if (!temporary && (queryJob().querySlaves() < fileDesc->numParts()))
+        {
+            // create empty parts for a fileDesc being published that is larger than this clusters
+            size32_t recordSize = 0;
+            IOutputMetaData *diskRowMeta = diskHelperBase->queryDiskRecordSize()->querySerializedDiskMeta();
+            if (diskRowMeta->isFixedSize() && (TAKdiskwrite == container.getKind()))
+            {
+                recordSize = diskRowMeta->getMinRecordSize();
+                if (0 != (diskHelperBase->getFlags() & TDXgrouped))
+                    recordSize += 1;
+            }
+            unsigned compMethod = COMPRESS_METHOD_LZW;
+            // rowdiff used if recordSize > 0, else fallback to compMethod
+            if (getOptBool(THOROPT_COMP_FORCELZW, false))
+            {
+                recordSize = 0; // by default if fixed length (recordSize set), row diff compression is used. This forces compMethod.
+                compMethod = COMPRESS_METHOD_LZW;
+            }
+            else if (getOptBool(THOROPT_COMP_FORCEFLZ, false))
+                compMethod = COMPRESS_METHOD_FASTLZ;
+            else if (getOptBool(THOROPT_COMP_FORCELZ4, false))
+                compMethod = COMPRESS_METHOD_LZ4;
+            bool blockCompressed;
+            bool compressed = fileDesc->isCompressed(&blockCompressed);
+            for (unsigned clusterIdx=0; clusterIdx<fileDesc->numClusters(); clusterIdx++)
+            {
+                StringBuffer clusterName;
+                fileDesc->getClusterGroupName(clusterIdx, clusterName, &queryNamedGroupStore());
+                PROGLOG("Creating blank parts for file '%s', cluster '%s'", fileName.get(), clusterName.str());
+                unsigned p=0;
+                while (p<fileDesc->numParts())
+                {
+                    if (p == targetOffset)
+                        p += queryJob().querySlaves();
+                    IPartDescriptor *partDesc = fileDesc->queryPart(p);
+                    CDateTime createTime, modifiedTime;
+                    for (unsigned c=0; c<partDesc->numCopies(); c++)
+                    {
+                        RemoteFilename rfn;
+                        partDesc->getFilename(c, rfn);
+                        StringBuffer path;
+                        rfn.getPath(path);
+                        try
+                        {
+                            ensureDirectoryForFile(path.str());
+                            OwnedIFile iFile = createIFile(path.str());
+                            Owned<IFileIO> iFileIO;
+                            if (compressed)
+                                iFileIO.setown(createCompressedFileWriter(iFile, recordSize, false, true, NULL, compMethod));
+                            else
+                                iFileIO.setown(iFile->open(IFOcreate));
+                            dbgassertex(iFileIO.get());
+                            iFileIO.clear();
+                            // ensure copies have matching datestamps, as they would do normally (backupnode expects it)
+                            if (partDesc->numCopies() > 1)
+                            {
+                                if (0 == c)
+                                    iFile->getTime(&createTime, &modifiedTime, NULL);
+                                else
+                                    iFile->setTime(&createTime, &modifiedTime, NULL);
+                            }
+                        }
+                        catch (IException *e)
+                        {
+                            if (0 == c)
+                                throw;
+                            Owned<IThorException> e2 = MakeThorException(e);
+                            e->Release();
+                            e2->setAction(tea_warning);
+                            queryJob().fireException(e2);
+                        }
+                    }
+                    partDesc->queryProperties().setPropInt64("@size", 0);
+                    p++;
+                }
+                clusterIdx++;
+            }
+        }
+        queryThorFileManager().publish(container.queryJob(), fileName, *fileDesc, NULL);
+    }
 }
 
 CWriteMasterBase::CWriteMasterBase(CMasterGraphElement *info) : CMasterActivity(info), diskStats(info->queryJob(), diskWriteRemoteStatistics)

+ 1 - 59
thorlcr/mfilemanager/thmfilemanager.cpp

@@ -479,70 +479,12 @@ public:
         return LINK(desc);
     }
 
-    void publish(CJobBase &job, const char *logicalName, IFileDescriptor &fileDesc, Owned<IDistributedFile> *publishedFile=NULL, unsigned partOffset=0, bool createMissingParts=true)
+    void publish(CJobBase &job, const char *logicalName, IFileDescriptor &fileDesc, Owned<IDistributedFile> *publishedFile=NULL)
     {
         IPropertyTree &props = fileDesc.queryProperties();
         bool temporary = props.getPropBool("@temporary");
         if (!temporary || job.queryUseCheckpoints())
             queryDistributedFileDirectory().removeEntry(logicalName, job.queryUserDescriptor());
-
-        if (!temporary && createMissingParts && !isFileKey(fileDesc.queryProperties()))
-        {
-            // create empty parts for a fileDesc being published that is larger than this clusters
-            unsigned clusterIdx = 0;
-            for (; clusterIdx<fileDesc.numClusters(); clusterIdx++)
-            {
-                if (job.querySlaves() < fileDesc.numParts())
-                {
-                    StringBuffer clusterName;
-                    fileDesc.getClusterGroupName(clusterIdx, clusterName, &queryNamedGroupStore());
-                    PROGLOG("Creating blank parts for file '%s', cluster '%s'", logicalName, clusterName.str());
-                    unsigned p=0;
-                    while (p<fileDesc.numParts())
-                    {
-                        if (p == partOffset)
-                            p += job.querySlaves();
-                        IPartDescriptor *partDesc = fileDesc.queryPart(p);
-                        CDateTime createTime, modifiedTime;
-                        unsigned c=0;
-                        for (; c<partDesc->numCopies(); c++)
-                        {
-                            RemoteFilename rfn;
-                            partDesc->getFilename(c, rfn);
-                            StringBuffer path;
-                            rfn.getPath(path);
-                            try
-                            {
-                                ensureDirectoryForFile(path.str());
-                                OwnedIFile iFile = createIFile(path.str());
-                                OwnedIFileIO iFileIO = iFile->open(IFOcreate);
-                                iFileIO.clear();
-                                // ensure copies have matching datestamps, as they would do normally (backupnode expects it)
-                                if (partDesc->numCopies() > 1)
-                                {
-                                    if (0 == c)
-                                        iFile->getTime(&createTime, &modifiedTime, NULL);
-                                    else
-                                        iFile->setTime(&createTime, &modifiedTime, NULL);
-                                }
-                            }
-                            catch (IException *e)
-                            {
-                                if (0 == c)
-                                    throw;
-                                Owned<IThorException> e2 = MakeThorException(e);
-                                e->Release();
-                                e2->setAction(tea_warning);
-                                job.fireException(e2);
-                            }
-                        }
-                        partDesc->queryProperties().setPropInt64("@size", 0);
-                        p++;
-                    }
-                }
-                clusterIdx++;
-            }
-        }       
         // thor clusters are backed up so if replicateOutputs set *always* assume a replicate
         if (replicateOutputs && (!temporary || job.queryUseCheckpoints()))
         {

+ 1 - 1
thorlcr/mfilemanager/thmfilemanager.hpp

@@ -47,7 +47,7 @@ interface IThorFileManager : extends IInterface
     virtual void noteFileRead(CJobBase &job, IDistributedFile *file, bool extended=false) = 0;
     virtual IDistributedFile *lookup(CJobBase &job, const char *logicalName, bool temporary=false, bool optional=false, bool reportOptional=false, bool updateAccessed=true) = 0;
     virtual IFileDescriptor *create(CJobBase &job, const char *logicalName, StringArray &groupNames, IArrayOf<IGroup> &groups, bool overwriteok, unsigned helperFlags=0, bool nonLocalIndex=false, unsigned restrictedWidth=0) = 0;
-    virtual void publish(CJobBase &job, const char *logicalName, IFileDescriptor &file, Owned<IDistributedFile> *publishedFile=NULL, unsigned partOffset=0, bool createMissingParts=true) = 0;
+    virtual void publish(CJobBase &job, const char *logicalName, IFileDescriptor &file, Owned<IDistributedFile> *publishedFile=NULL) = 0;
     virtual void clearCacheEntry(const char *name) = 0;
     virtual StringBuffer &mangleLFN(CJobBase &job, const char *lfn, StringBuffer &out) = 0;
     virtual StringBuffer &addScope(CJobBase &job, const char *logicalname, StringBuffer &ret, bool temporary=false, bool paused=false) = 0;