Browse Source

Merge pull request #11079 from AttilaVamos/HPCC-19557-fix-6.4.16

HPCC-19557 Spray partition calculation can't be aborted.

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 7 years ago
parent
commit
17783be02d
5 changed files with 33 additions and 2 deletions
  1. 13 0
      dali/ft/daftformat.cpp
  2. 2 1
      dali/ft/daftformat.hpp
  3. 6 0
      dali/ft/daftformat.ipp
  4. 2 1
      dali/ft/filecopy.cpp
  5. 10 0
      dali/ft/filecopy.ipp

+ 13 - 0
dali/ft/daftformat.cpp

@@ -58,6 +58,16 @@ CPartitioner::CPartitioner()
     partitioning = false;
 }
 
+void CPartitioner::setAbort(IAbortRequestCallback * _abort)
+{
+    abortChecker = _abort;
+}
+
+bool CPartitioner::isAborting()
+{
+    return abortChecker && abortChecker->abortRequested();
+}
+
 void CPartitioner::calcPartitions(Semaphore * sem)
 {
     commonCalcPartitions();
@@ -138,6 +148,9 @@ void CPartitioner::commonCalcPartitions()
             startInputOffset = inputOffset;
             startOutputOffset = cursor.outputOffset;
         }
+
+        if (isAborting())
+            throwAbortException();
     }
 
     assertex(startInputOffset != endOffset || splitAfterPoint());

+ 2 - 1
dali/ft/daftformat.hpp

@@ -34,7 +34,7 @@ struct PartitionCursor
 {
 public:
     PartitionCursor(offset_t _inputOffset)  { inputOffset = nextInputOffset = _inputOffset; outputOffset = 0; trimLength = 0; }
-    
+
     offset_t        inputOffset;
     offset_t        nextInputOffset;
     offset_t        outputOffset;
@@ -61,6 +61,7 @@ public:
     virtual void setTarget(IOutputProcessor * _target) = 0;
     virtual void setRecordStructurePresent(bool _recordStructurePresent) = 0;
     virtual void getRecordStructure(StringBuffer & _recordStructure) = 0;
+    virtual void setAbort(IAbortRequestCallback * _abort) = 0;
 };
 
 interface IFormatProcessor : public IFormatPartitioner

+ 6 - 0
dali/ft/daftformat.ipp

@@ -53,6 +53,7 @@ public:
     virtual void setTarget(IOutputProcessor * _target);
     virtual void setRecordStructurePresent(bool _recordStructurePresent);
     virtual void getRecordStructure(StringBuffer & _recordStructure);
+    virtual void setAbort(IAbortRequestCallback * _abort);
 
 protected:
     virtual void findSplitPoint(offset_t curOffset, PartitionCursor & cursor) = 0;
@@ -61,6 +62,7 @@ protected:
 
     void commonCalcPartitions();
 
+    virtual bool isAborting();
 
 protected:
     PartitionPointArray         results;
@@ -76,6 +78,7 @@ protected:
     unsigned                    thisHeaderSize;
     unsigned                    numParts;
     bool                        partitioning;
+    IAbortRequestCallback *     abortChecker = nullptr;
 };
 
 //---------------------------------------------------------------------------
@@ -713,10 +716,13 @@ public:
     virtual void setTarget(IOutputProcessor * _target) { UNIMPLEMENTED; }
     virtual void setRecordStructurePresent(bool _recordStructurePresent);
     virtual void getRecordStructure(StringBuffer & _recordStructure);
+    virtual void setAbort(IAbortRequestCallback * _abort) { UNIMPLEMENTED; }
 
 protected:
     void callRemote();
 
+    virtual bool isAborting() { UNIMPLEMENTED; };
+
 protected:
     CachedPasswordProvider      passwordProvider;
     SocketEndpoint              ep;

+ 2 - 1
dali/ft/filecopy.cpp

@@ -565,7 +565,7 @@ int FileSizeThread::run()
 //----------------------------------------------------------------------------
 
 FileSprayer::FileSprayer(IPropertyTree * _options, IPropertyTree * _progress, IRemoteConnection * _recoveryConnection, const char *_wuid)
-  : wuid(_wuid)
+  : wuid(_wuid), fileSprayerAbortChecker(*this)
 {
     totalSize = 0;
     replicate = false;
@@ -1187,6 +1187,7 @@ void FileSprayer::calculateSprayPartition()
     ForEachItemIn(idx, sources)
     {
         IFormatPartitioner * partitioner = createPartitioner(idx, calcOutput, numParts);
+        partitioner->setAbort(&fileSprayerAbortChecker);
         partitioners.append(*partitioner);
     }
 

+ 10 - 0
dali/ft/filecopy.ipp

@@ -272,6 +272,15 @@ protected:
     void examineCsvStructure();
     IFormatPartitioner * createPartitioner(aindex_t index, bool calcOutput, unsigned numParts);
 
+    class CAbortRequestCallback : implements IAbortRequestCallback
+    {
+        FileSprayer &sprayer;
+    public:
+        CAbortRequestCallback(FileSprayer &_sprayer) : sprayer(_sprayer) { }
+        virtual bool abortRequested() { return sprayer.isAborting(); }
+    };
+
+
 private:
     bool calcUsePull();
     // Get and store Remote File Name parts into the History record
@@ -335,6 +344,7 @@ protected:
     int                     fileUmask;
     Owned<IPropertyTree>    srcHistory;
     dfu_operation           operation = dfu_unknown;
+    CAbortRequestCallback   fileSprayerAbortChecker;
 };