Browse Source

HPCC-19557 Spray partition calculation can't be aborted.

Add abort handling to partitioner code

Tested manually with spray/despray regression cases and ECLWatch

Signed-off-by: Attila Vamos <attila.vamos@gmail.com>
Attila Vamos 7 years ago
parent
commit
9721eb5fe5
5 changed files with 41 additions and 11 deletions
  1. 13 0
      dali/ft/daftformat.cpp
  2. 2 1
      dali/ft/daftformat.hpp
  3. 14 9
      dali/ft/daftformat.ipp
  4. 2 1
      dali/ft/filecopy.cpp
  5. 10 0
      dali/ft/filecopy.ipp

+ 13 - 0
dali/ft/daftformat.cpp

@@ -58,6 +58,16 @@ CPartitioner::CPartitioner()
     partitioning = false;
 }
 
+void CPartitioner::setAbort(IAbortRequestCallback * _abort)
+{
+    abortChecker = _abort;
+}
+
+bool CPartitioner::isAborting()
+{
+    return abortChecker && abortChecker->abortRequested();
+}
+
 void CPartitioner::calcPartitions(Semaphore * sem)
 {
     commonCalcPartitions();
@@ -133,6 +143,9 @@ void CPartitioner::commonCalcPartitions()
             startInputOffset = inputOffset;
             startOutputOffset = cursor.outputOffset;
         }
+
+        if (isAborting())
+            throwAbortException();
     }
 
     assertex(startInputOffset != endOffset || splitAfterPoint());

+ 2 - 1
dali/ft/daftformat.hpp

@@ -34,7 +34,7 @@ struct PartitionCursor
 {
 public:
     PartitionCursor(offset_t _inputOffset)  { inputOffset = nextInputOffset = _inputOffset; outputOffset = 0; trimLength = 0; }
-    
+
     offset_t        inputOffset;
     offset_t        nextInputOffset;
     offset_t        outputOffset;
@@ -61,6 +61,7 @@ public:
     virtual void setTarget(IOutputProcessor * _target) = 0;
     virtual void setRecordStructurePresent(bool _recordStructurePresent) = 0;
     virtual void getRecordStructure(StringBuffer & _recordStructure) = 0;
+    virtual void setAbort(IAbortRequestCallback * _abort) = 0;
 };
 
 interface IFormatProcessor : public IFormatPartitioner

+ 14 - 9
dali/ft/daftformat.ipp

@@ -40,15 +40,16 @@ public:
     virtual void setTarget(IOutputProcessor * _target);
     virtual void setRecordStructurePresent(bool _recordStructurePresent);
     virtual void getRecordStructure(StringBuffer & _recordStructure);
+    virtual void setAbort(IAbortRequestCallback * _abort);
 
 protected:
     virtual void findSplitPoint(offset_t curOffset, PartitionCursor & cursor) = 0;
     virtual bool splitAfterPoint() { return false; }
     virtual void killBuffer() = 0;
-    
 
     void commonCalcPartitions();
 
+    virtual bool isAborting();
 
 protected:
     PartitionPointArray         results;
@@ -64,6 +65,7 @@ protected:
     unsigned                    thisHeaderSize;
     unsigned                    numParts;
     bool                        partitioning;
+    IAbortRequestCallback *     abortChecker = nullptr;
 };
 
 //---------------------------------------------------------------------------
@@ -125,7 +127,7 @@ protected:
 
 
 //---------------------------------------------------------------------------
-// More complex processors that need to read the source file - e.g. because 
+// More complex processors that need to read the source file - e.g. because
 // output offset being calculated.
 
 
@@ -151,13 +153,13 @@ protected:
     void seekInput(offset_t offset);
     offset_t tellInput();
 
-    inline byte *bufferBase()  
-    { 
-        return (byte *)((bufattr.length()!=bufferSize)?bufattr.allocate(bufferSize):bufattr.bufferBase()); 
+    inline byte *bufferBase()
+    {
+        return (byte *)((bufattr.length()!=bufferSize)?bufattr.allocate(bufferSize):bufattr.bufferBase());
     }
     virtual void killBuffer()  { bufattr.clear(); }
     virtual void clearBufferOverrun() { numOfBufferOverrun = 0; numOfProcessedBytes = 0; }
-protected: 
+protected:
     Owned<IFileIOStream>   inStream;
     MemoryAttr             bufattr;
     size32_t               headerSize;
@@ -252,7 +254,7 @@ protected:
 
 private:
     void storeFieldName(const char * start, unsigned len);
-    
+
 protected:
     enum { NONE=0, SEPARATOR=1, TERMINATOR=2, WHITESPACE=3, QUOTE=4, ESCAPE=5 };
     unsigned        maxElementLength;
@@ -605,7 +607,7 @@ public:
     size32_t getEndOfRecord(const byte * record, unsigned maxToRead);
     offset_t getHeaderLength(BufferedDirectReader & reader);
     offset_t getFooterLength(BufferedDirectReader & reader, offset_t size);
-    
+
     unsigned getMaxElementLength() { return maxElementLength; }
 
 protected:
@@ -667,10 +669,13 @@ public:
     virtual void setTarget(IOutputProcessor * _target) { UNIMPLEMENTED; }
     virtual void setRecordStructurePresent(bool _recordStructurePresent);
     virtual void getRecordStructure(StringBuffer & _recordStructure);
+    virtual void setAbort(IAbortRequestCallback * _abort) { UNIMPLEMENTED; }
 
 protected:
     void callRemote();
 
+    virtual bool isAborting() { UNIMPLEMENTED; };
+
 protected:
     CachedPasswordProvider      passwordProvider;
     SocketEndpoint              ep;
@@ -710,7 +715,7 @@ public:
 
 protected:
     offset_t                outputOffset;
-    OwnedIFileIOStream      out; 
+    OwnedIFileIOStream      out;
 };
 
 class DALIFT_API CFixedOutputProcessor : public COutputProcessor

+ 2 - 1
dali/ft/filecopy.cpp

@@ -565,7 +565,7 @@ int FileSizeThread::run()
 //----------------------------------------------------------------------------
 
 FileSprayer::FileSprayer(IPropertyTree * _options, IPropertyTree * _progress, IRemoteConnection * _recoveryConnection, const char *_wuid)
-  : wuid(_wuid)
+  : wuid(_wuid), fileSprayerAbortChecker(*this)
 {
     totalSize = 0;
     replicate = false;
@@ -1191,6 +1191,7 @@ void FileSprayer::calculateSprayPartition()
     ForEachItemIn(idx, sources)
     {
         IFormatPartitioner * partitioner = createPartitioner(idx, calcOutput, numParts);
+        partitioner->setAbort(&fileSprayerAbortChecker);
         partitioners.append(*partitioner);
     }
 

+ 10 - 0
dali/ft/filecopy.ipp

@@ -272,6 +272,15 @@ protected:
     void examineCsvStructure();
     IFormatPartitioner * createPartitioner(aindex_t index, bool calcOutput, unsigned numParts);
 
+    class CAbortRequestCallback : implements IAbortRequestCallback
+    {
+        FileSprayer &sprayer;
+    public:
+        CAbortRequestCallback(FileSprayer &_sprayer) : sprayer(_sprayer) { }
+        virtual bool abortRequested() { return sprayer.isAborting(); }
+    };
+
+
 private:
     bool calcUsePull();
     // Get and store Remote File Name parts into the History record
@@ -335,6 +344,7 @@ protected:
     int                     fileUmask;
     Owned<IPropertyTree>    srcHistory;
     dfu_operation           operation = dfu_unknown;
+    CAbortRequestCallback   fileSprayerAbortChecker;
 };