Browse Source

HPCC-13836 Thor write to external lost rows if slaves on >1 node

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 10 years ago
parent
commit
c33e368da3

+ 1 - 1
thorlcr/activities/indexwrite/thindexwriteslave.cpp

@@ -188,7 +188,7 @@ public:
         StringBuffer partFname;
         getPartFilename(partDesc, 0, partFname);
         bool compress=false;
-        OwnedIFileIO iFileIO = createMultipleWrite(this, partDesc, 0, compress, false, NULL, this, false, true, &abortSoon);
+        OwnedIFileIO iFileIO = createMultipleWrite(this, partDesc, 0, TW_RenameToPrimary, compress, NULL, this, &abortSoon);
         Owned<IFileIOStream> out = createBufferedIOStream(iFileIO);
         ActPrintLog("INDEXWRITE: created fixed output stream %s", partFname.str());
         unsigned flags = COL_PREFIX;

+ 14 - 13
thorlcr/activities/thactivityutil.cpp

@@ -656,14 +656,15 @@ class CWriteHandler : public CSimpleInterface, implements IFileIO
     bool *aborted;
     CActivityBase &activity;
     IPartDescriptor &partDesc;
-    bool remote, direct, renameToPrimary;
+    bool remote;
     CFIPScope fipScope;
+    unsigned twFlags;
 
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
-    CWriteHandler(CActivityBase &_activity, IPartDescriptor &_partDesc, IFile *_primary, IFileIO *_primaryio, ICopyFileProgress *_iProgress, bool _direct, bool _renameToPrimary, bool *_aborted) 
-        : activity(_activity), partDesc(_partDesc), primary(_primary), primaryio(_primaryio), iProgress(_iProgress), direct(_direct), renameToPrimary(_renameToPrimary), aborted(_aborted), fipScope(primary->queryFilename())
+    CWriteHandler(CActivityBase &_activity, IPartDescriptor &_partDesc, IFile *_primary, IFileIO *_primaryio, ICopyFileProgress *_iProgress, unsigned _twFlags, bool *_aborted)
+        : activity(_activity), partDesc(_partDesc), primary(_primary), primaryio(_primaryio), iProgress(_iProgress), twFlags(_twFlags), aborted(_aborted), fipScope(primary->queryFilename())
     {
         RemoteFilename rfn;
         partDesc.getFilename(0, rfn);
@@ -680,11 +681,11 @@ public:
             primary->remove(); // i.e. never completed, so remove partial (temp) primary
             return;
         }
-        if (renameToPrimary)
+        if (twFlags & TW_RenameToPrimary)
         {
             OwnedIFile tmpIFile;
             CFIPScope fipScope;
-            if (remote)
+            if (remote && !(twFlags & TW_External))
             {
                 StringBuffer tmpName(primaryName.str());
                 tmpName.append(".tmp");
@@ -749,7 +750,7 @@ public:
     virtual void close() { primaryio->close(); }
 };
 
-IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc, unsigned recordSize, bool &compress, bool extend, ICompressor *ecomp, ICopyFileProgress *iProgress, bool direct, bool renameToPrimary, bool *aborted, StringBuffer *_outLocationName)
+IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc, unsigned recordSize, unsigned twFlags, bool &compress, ICompressor *ecomp, ICopyFileProgress *iProgress, bool *aborted, StringBuffer *_outLocationName)
 {
     StringBuffer outLocationNameI;
     StringBuffer &outLocationName = _outLocationName?*_outLocationName:outLocationNameI;
@@ -757,8 +758,8 @@ IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc,
     RemoteFilename rfn;
     partDesc.getFilename(0, rfn);
     StringBuffer primaryName;
-    rfn.getPath(primaryName);   
-    if (direct)
+    rfn.getPath(primaryName);
+    if (twFlags & TW_Direct)
     {
         if (0 == outLocationName.length())
             outLocationName.append(primaryName.str());
@@ -767,7 +768,7 @@ IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc,
     {
         // use temp name
         GetTempName(outLocationName, "partial");
-        if (rfn.isLocal())
+        if (rfn.isLocal() || (twFlags & TW_External))
         { // ensure local tmp in same directory as target
             StringBuffer dir;
             splitDirTail(primaryName, dir);
@@ -784,21 +785,21 @@ IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc,
     {
         if (activity->getOptBool(THOROPT_COMP_FORCELZW, false))
             recordSize = 0; // by default if fixed length (recordSize set), row diff compression is used. This forces LZW
-        fileio.setown(createCompressedFileWriter(file, recordSize, extend, true, ecomp));
+        fileio.setown(createCompressedFileWriter(file, recordSize, 0 != (twFlags & TW_Extend), true, ecomp));
         if (!fileio)
         {
             compress = false;
             Owned<IThorException> e = MakeActivityWarning(activity, TE_LargeBufferWarning, "Could not write file '%s' compressed", outLocationName.str());
             activity->fireException(e);
-            fileio.setown(file->open(extend&&file->exists()?IFOwrite:IFOcreate)); 
+            fileio.setown(file->open((twFlags & TW_Extend)&&file->exists()?IFOwrite:IFOcreate));
         }
     }
     else
-        fileio.setown(file->open(extend&&file->exists()?IFOwrite:IFOcreate)); 
+        fileio.setown(file->open((twFlags & TW_Extend)&&file->exists()?IFOwrite:IFOcreate));
     if (!fileio)
         throw MakeActivityException(activity, TE_FileCreationFailed, "Failed to create file for write (%s) error = %d", outLocationName.str(), GetLastError());
     ActPrintLog(activity, "Writing to file: %s, compress=%s, rdiff=%s", file->queryFilename(), compress ? "true" : "false", (compress && recordSize) ? "true" : "false");
-    return new CWriteHandler(*activity, partDesc, file, fileio, iProgress, direct, renameToPrimary, aborted);
+    return new CWriteHandler(*activity, partDesc, file, fileio, iProgress, twFlags, aborted);
 }
 
 StringBuffer &locateFilePartPath(CActivityBase *activity, const char *logicalFilename, IPartDescriptor &partDesc, StringBuffer &filePath)

+ 5 - 1
thorlcr/activities/thactivityutil.ipp

@@ -190,8 +190,12 @@ StringBuffer &locateFilePartPath(CActivityBase *activity, const char *logicalFil
 void doReplicate(CActivityBase *activity, IPartDescriptor &partDesc, ICopyFileProgress *iProgress=NULL);
 void cancelReplicates(CActivityBase *activity, IPartDescriptor &partDesc);
 
+#define TW_Extend 0x01
+#define TW_Direct 0x02
+#define TW_External 0x04
+#define TW_RenameToPrimary 0x08
 interface IPartDescriptor;
-IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc, unsigned recordSize, bool &compress, bool extend, ICompressor *ecomp, ICopyFileProgress *iProgress, bool direct, bool renameToPrimary, bool *aborted, StringBuffer *_locationName=NULL);
+IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc, unsigned recordSize, unsigned twFlags, bool &compress, ICompressor *ecomp, ICopyFileProgress *iProgress, bool *aborted, StringBuffer *_locationName=NULL);
 
 
 #endif

+ 9 - 3
thorlcr/activities/thdiskbaseslave.cpp

@@ -334,9 +334,15 @@ void CDiskWriteSlaveActivityBase::open()
     if (query && compress)
         UNIMPLEMENTED;
 
-    bool direct = query || (external && !firstNode());
-    bool rename = !external || (!query && lastNode());
-    Owned<IFileIO> iFileIO = createMultipleWrite(this, *partDesc, diskRowMinSz, compress, extend||(external&&!query), ecomp, this, direct, rename, &abortSoon, (external&&!query) ? &tempExternalName : NULL);
+    unsigned twFlags = external ? TW_External : 0;
+    if (query || (external && !firstNode()))
+        twFlags |= TW_Direct;
+    if (!external || (!query && lastNode()))
+        twFlags |= TW_RenameToPrimary;
+    if (extend||(external&&!query))
+        twFlags |= TW_Extend;
+
+    Owned<IFileIO> iFileIO = createMultipleWrite(this, *partDesc, diskRowMinSz, twFlags, compress, ecomp, this, &abortSoon, (external&&!query) ? &tempExternalName : NULL);
 
     if (compress)
     {