소스 검색

HPCC-11749 Always recheck persists to ensure locked and exist

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday 11 년 전
부모
커밋
7fc86af8e3
7개의 변경된 파일176개의 추가작업 그리고 6개의 파일을 삭제
  1. 13 1
      common/workunit/workflow.cpp
  2. 16 4
      ecl/eclagent/eclagent.cpp
  3. 3 1
      ecl/eclagent/eclagent.ipp
  4. 41 0
      ecl/regress/issue11528a.ecl
  5. 22 0
      ecl/regress/issue11528b.ecl
  6. 38 0
      ecl/regress/issue11528c.ecl
  7. 43 0
      ecl/regress/issue11528d.ecl

+ 13 - 1
common/workunit/workflow.cpp

@@ -673,6 +673,17 @@ bool WorkflowMachine::executeItem(unsigned wfid, unsigned scheduledWfid)
     switch(item.queryState())
     {
     case WFStateDone:
+        if (item.queryMode() == WFModePersist)
+        {
+#ifdef TRACE_WORKFLOW
+            LOG(MCworkflow, "Recheck persist %u", wfid);
+#endif
+            break;
+        }
+#ifdef TRACE_WORKFLOW
+        LOG(MCworkflow, "Nothing to be done for workflow item %u", wfid);
+#endif
+        return true;
     case WFStateSkip:
 #ifdef TRACE_WORKFLOW
         LOG(MCworkflow, "Nothing to be done for workflow item %u", wfid);
@@ -683,7 +694,8 @@ bool WorkflowMachine::executeItem(unsigned wfid, unsigned scheduledWfid)
     case WFStateBlocked:
         throw new WorkflowException(0, "INTERNAL ERROR: attempting to execute workflow item in blocked state", wfid, WorkflowException::SYSTEM, MSGAUD_user);
     case WFStateFail:
-        item.reset(); //fall through
+        item.reset();
+        break;
     }
 
     switch(item.queryMode())

+ 16 - 4
ecl/eclagent/eclagent.cpp

@@ -2209,13 +2209,19 @@ void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
     {
         throw MakeStringException(0, "PERSIST not supported when running standalone");
     }
+
+    SCMStringBuffer name;
+    const char *logicalName = item.getPersistName(name).str();
+    //Persists are re-executed to ensure they are locked when workunits are restarted, and to ensure expired persists
+    //are rebuilt, but check if we have already processed and locked this persist
+    if (agent.alreadyLockedPersist(logicalName))
+        return;
+
     unsigned wfid = item.queryWfid();
     // Old persist model requires dependencies to be executed BEFORE checking if the persist is up to date
     // Defaults to old model, in case executing a WU that is created by earlier eclcc
     if (!agent.queryWorkUnit()->getDebugValueBool("expandPersistInputDependencies", false))
         doExecuteItemDependencies(item, wfid);
-    SCMStringBuffer name;
-    const char *logicalName = item.getPersistName(name).str();
     int maxPersistCopies = item.queryPersistCopies();
     if (maxPersistCopies < 0)
         maxPersistCopies = DEFAULT_PERSIST_COPIES;
@@ -2256,7 +2262,7 @@ void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
         doExecuteItem(item, wfid);
         agent.updatePersist(persistLock, logicalName, thisPersist->eclCRC, thisPersist->allCRC);
     }
-    agent.finishPersist(persistLock.getClear());
+    agent.finishPersist(logicalName, persistLock.getClear());
 }
 
 //----------------------------------------------------------------
@@ -2620,9 +2626,15 @@ IRemoteConnection *EclAgent::startPersist(const char * logicalName)
     return persistLock;
 }
 
-void EclAgent::finishPersist(IRemoteConnection *persistLock)
+bool EclAgent::alreadyLockedPersist(const char * persistName)
+{
+    return processedPersists.contains(persistName);
+}
+
+void EclAgent::finishPersist(const char * persistName, IRemoteConnection *persistLock)
 {
     LOG(MCrunlock, unknownJob, "Finished persists - add to read lock list");
+    processedPersists.append(persistName);
     persistReadLocks.append(*persistLock);
 }
 

+ 3 - 1
ecl/eclagent/eclagent.ipp

@@ -355,6 +355,7 @@ private:
     StringArray tempFiles;
     CriticalSection tfsect;
     Array persistReadLocks;
+    StringArray processedPersists;
 
     Owned<ILoadedDllEntry> dll;
     CIArrayOf<EclAgentQueryLibrary> queryLibraries;
@@ -488,7 +489,8 @@ public:
     virtual void restoreCluster();
 
     IRemoteConnection *startPersist(const char * name);
-    void finishPersist(IRemoteConnection *persistLock);
+    bool alreadyLockedPersist(const char * persistName);
+    void finishPersist(const char * persistName, IRemoteConnection *persistLock);
     void updatePersist(IRemoteConnection *persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC);
     void checkPersistMatches(const char * logicalName, unsigned eclCRC);
     virtual void deleteLRUPersists(const char * logicalName, int keep);

+ 41 - 0
ecl/regress/issue11528a.ecl

@@ -0,0 +1,41 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+d1 := DATASET([1,2,3,4,56,31,1324,134], { unsigned id });
+
+p1 := d1 : persist('~p1', single);
+
+addMessage := output(dataset(['Hi'], { string text }),named('msgs'), EXTEND);
+
+allMessages := DATASET(WORKUNIT('msgs'), { string text });
+
+p2 := p1(id != 10) : persist('~p2', single);
+
+
+import Std.File;
+
+clean := File.DeleteLogicalFile('~p2');
+
+doFail := FAIL('Fail because this is being run for the first time!');
+
+cleanUp := IF(count(allMessages)=1, doFail);
+
+
+x1 := WHEN(p2, addMessage, BEFORE);
+
+output(WHEN(x1, cleanup, BEFORE),named('results'));

+ 22 - 0
ecl/regress/issue11528b.ecl

@@ -0,0 +1,22 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+import Std.File;
+
+clean := File.DeleteLogicalFile('~p2');
+
+clean;

+ 38 - 0
ecl/regress/issue11528c.ecl

@@ -0,0 +1,38 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+d1 := DATASET([1,2,3,4,56,31,1324,134], { unsigned id });
+
+p1 := d1 : persist('~p1', single);
+
+addMessage := output(dataset(['Hi'], { string text }),named('msgs'), EXTEND);
+
+allMessages := DATASET(WORKUNIT('msgs'), { string text });
+
+p2 := p1(id != 10) : persist('pzzz', single, FEW);
+
+
+import Std.File;
+
+doFail := FAIL('Fail because this is being run for the first time!');
+
+cleanUp := IF(count(allMessages)=1, doFail);
+
+x1 := WHEN(p2, addMessage, BEFORE);
+
+output(WHEN(x1, cleanup, BEFORE),named('results'));

+ 43 - 0
ecl/regress/issue11528d.ecl

@@ -0,0 +1,43 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+d1 := DATASET([1,2,3,4,56,31,1324,134], { unsigned id });
+
+p1 := d1 : persist('~p1', single);
+
+addMessage := output(dataset(['Hi'], { string text }),named('msgs'), EXTEND);
+
+allMessages := DATASET(WORKUNIT('msgs'), { string text });
+
+p2 := p1(id != 10) : persist('~p2', single);
+
+p3 := dedup(p1 + p2) : persist('~p3', single);
+
+
+
+import Std.File;
+
+clean(string x) := File.DeleteLogicalFile('~' + x);
+
+cleanUp := [ clean('p1'), clean('p2'), clean('p3') ];
+
+
+sequential(
+    cleanUp;
+    output(p3,named('results'));
+);