فهرست منبع

HPCC-14133 Critical sections

Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.co.uk>
Shamser Ahmed 9 سال پیش
والد
کامیت
acc820e3cf

+ 9 - 0
common/workunit/workflow.cpp

@@ -45,6 +45,7 @@ mapEnums wfmodes[] =
     { WFModeBeginWait, "bwait" },
     { WFModeWait, "wait" },
     { WFModeOnce, "once" },
+    { WFModeCritical, "critical" },
     { WFModeSize, NULL}
 };
 
@@ -162,6 +163,7 @@ public:
     virtual unsigned     queryPersistWfid() const { return tree->getPropInt("@persistWfid", 0); }
     virtual int          queryPersistCopies() const { return tree->getPropInt("@persistCopies", 0); }
     virtual bool         queryPersistRefresh() const { return tree->getPropBool("@persistRefresh", true); }
+    virtual IStringVal & getCriticalName(IStringVal & val) const { val.set(tree->queryProp("@criticalName")); return val; }
     virtual IStringVal & queryCluster(IStringVal & val) const { val.set(tree->queryProp("@cluster")); return val; }
     virtual void         setScheduledNow() { tree->setPropTree("Schedule", createPTree()); setEnum(tree, "@state", WFStateReqd, wfstates); }
     virtual void         setScheduledOn(char const * name, char const * text) { IPropertyTree * stree = createPTree(); stree->setProp("@name", name); stree->setProp("@text", text); tree->setPropTree("Schedule", createPTree())->setPropTree("Event", stree); setEnum(tree, "@state", WFStateWait, wfstates); }
@@ -176,6 +178,7 @@ public:
             tree->setPropInt("@persistCopies", (int)numPersistInstances);
         tree->setPropBool("@persistRefresh", refresh);
     }
+    virtual void         setCriticalInfo(char const * name) { tree->setProp("@criticalName", name);}
     virtual void         setCluster(const char * cluster) { tree->setProp("@cluster", cluster); }
     //info set at run time
     virtual unsigned     queryScheduleCountRemaining() const { assertex(tree->hasProp("Schedule")); return tree->getPropInt("Schedule/@countRemaining"); }
@@ -352,6 +355,7 @@ private:
     unsigned persistWfid;
     int persistCopies;
     bool persistRefresh;
+    SCMStringBuffer criticalName;
     StringAttr eventName;
     StringAttr eventExtra;
 
@@ -387,6 +391,7 @@ public:
         scheduledWfid = other->queryScheduledWfid();
         persistCopies = other->queryPersistCopies();
         persistRefresh = other->queryPersistRefresh();
+        other->getCriticalName(criticalName);
         other->queryCluster(clusterName);
     }
     //info set at compile time
@@ -409,6 +414,7 @@ public:
     virtual unsigned     queryPersistWfid() const { return persistWfid; }
     virtual int          queryPersistCopies() const { return persistCopies; }
     virtual bool         queryPersistRefresh() const { return persistRefresh; }
+    virtual IStringVal & getCriticalName(IStringVal & val) const { val.set(criticalName.str()); return val; }
     virtual IStringVal & queryCluster(IStringVal & val) const { val.set(clusterName.str()); return val; }
     //info set at run time
     virtual unsigned     queryScheduleCountRemaining() const { return schedule ? schedule->queryCountRemaining() : 0; }
@@ -721,6 +727,9 @@ bool WorkflowMachine::executeItem(unsigned wfid, unsigned scheduledWfid)
     case WFModePersist:
         doExecutePersistItem(item);
         break;
+    case WFModeCritical:
+        doExecuteCriticalItem(item);
+        break;
     case WFModeBeginWait:
         doExecuteBeginWaitItem(item, scheduledWfid);
         item.setState(WFStateDone);

+ 1 - 0
common/workunit/workflow.hpp

@@ -91,6 +91,7 @@ protected:
     virtual void checkForAbort(unsigned wfid, IException * handling) = 0;
     // Persistence styles varies from machine to machine
     virtual void doExecutePersistItem(IRuntimeWorkflowItem & item) = 0;
+    virtual void doExecuteCriticalItem(IRuntimeWorkflowItem & item) = 0;
     virtual bool getPersistTime(time_t & when, IRuntimeWorkflowItem & item) = 0;
 
     // Check conditions, item type and call operations below based on type

+ 6 - 1
common/workunit/workunit.hpp

@@ -551,7 +551,8 @@ enum WFMode
     WFModeBeginWait = 5,
     WFModeWait = 6,
     WFModeOnce = 7,
-    WFModeSize = 8
+    WFModeSize = 8,
+    WFModeCritical = 9
 };
 
 enum WFState
@@ -603,6 +604,7 @@ interface IConstWorkflowItem : extends IInterface
     virtual unsigned queryPersistWfid() const = 0;
     virtual int queryPersistCopies() const = 0;  // 0 - unmangled name,  < 0 - use default, > 0 - max number
     virtual bool queryPersistRefresh() const = 0;
+    virtual IStringVal &getCriticalName(IStringVal & val) const = 0;
     virtual unsigned queryScheduleCountRemaining() const = 0;
     virtual WFState queryState() const = 0;
     virtual unsigned queryRetriesRemaining() const = 0;
@@ -614,6 +616,8 @@ interface IConstWorkflowItem : extends IInterface
     virtual IStringVal & queryCluster(IStringVal & val) const = 0;
 };
 inline bool isPersist(const IConstWorkflowItem & item) { return item.queryMode() == WFModePersist; }
+inline bool isCritical(const IConstWorkflowItem & item) { return item.queryMode() == WFModeCritical; }
+
 
 interface IRuntimeWorkflowItem : extends IConstWorkflowItem
 {
@@ -635,6 +639,7 @@ interface IWorkflowItem : extends IRuntimeWorkflowItem
     virtual void setScheduleCount(unsigned count) = 0;
     virtual void addDependency(unsigned wfid) = 0;
     virtual void setPersistInfo(const char * name, unsigned wfid, int maxCopies, bool refresh) = 0;
+    virtual void setCriticalInfo(char const * name) = 0;
     virtual void syncRuntimeData(const IConstWorkflowItem & other) = 0;
     virtual void setScheduledWfid(unsigned wfid) = 0;
     virtual void setCluster(const char * cluster) = 0;

+ 33 - 0
ecl/eclagent/eclagent.cpp

@@ -2197,6 +2197,20 @@ void EclAgentWorkflowMachine::releaseRunlock()
     runlock.clear();
 }
 
+IRemoteConnection *EclAgentWorkflowMachine::obtainCriticalLock(const char *name)
+{
+    StringBuffer xpath;
+    xpath.append("/WorkUnitCriticalLocks/").append(name);
+    return querySDS().connect(xpath.str(), myProcessSession(), RTM_CREATE | RTM_LOCK_WRITE | RTM_DELETE_ON_DISCONNECT, INFINITE);
+}
+
+void EclAgentWorkflowMachine::releaseCriticalLock(IRemoteConnection *criticalLock)
+{
+    LOG(MCrunlock, unknownJob, "Releasing critical lock");
+    if(criticalLock && queryDaliServerVersion().compare("1.3") < 0)
+        criticalLock->close(true);
+}
+
 void EclAgentWorkflowMachine::syncWorkflow()
 {
 #ifdef TRACE_WORKFLOW
@@ -2307,6 +2321,25 @@ void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
     agent.finishPersist(logicalName, persistLock.getClear());
 }
 
+void EclAgentWorkflowMachine::doExecuteCriticalItem(IRuntimeWorkflowItem & item)
+{
+    if (agent.isStandAloneExe)
+        throw MakeStringException(0, "CRITICAL not supported when running standalone");
+
+    SCMStringBuffer name;
+    const char *criticalName = item.getCriticalName(name).str();
+
+    unsigned wfid = item.queryWfid();
+
+    Owned<IRemoteConnection> rlock = obtainCriticalLock(criticalName);
+    if (!rlock.get())
+        throw MakeStringException(0, "Cannot obtain Critical section lock");
+
+    doExecuteItemDependencies(item, wfid);
+    doExecuteItem(item, wfid);
+
+    releaseCriticalLock(rlock);
+}
 //----------------------------------------------------------------
 
 void EclAgent::doNotify(char const * name, char const * text)

+ 3 - 0
ecl/eclagent/eclagent.ipp

@@ -273,6 +273,7 @@ protected:
     virtual void reportContingencyFailure(char const * type, IException * e);
     virtual void checkForAbort(unsigned wfid, IException * handling);
     virtual void doExecutePersistItem(IRuntimeWorkflowItem & item);
+    virtual void doExecuteCriticalItem(IRuntimeWorkflowItem & item);
     virtual bool getPersistTime(time_t & when, IRuntimeWorkflowItem & item);
 
 private:
@@ -284,6 +285,8 @@ private:
     void obtainRunlock();
     void releaseRunlock();
     void syncWorkflow();
+    IRemoteConnection * obtainCriticalLock(const char *name);
+    void releaseCriticalLock(IRemoteConnection *r);
     Owned<IWorkflowScheduleConnection> wfconn;
     Owned<PersistVersion> persist;
     bool persistsPrelocked;

+ 1 - 1
ecl/hql/hqlattr.cpp

@@ -460,6 +460,7 @@ unsigned getOperatorMetaFlags(node_operator op)
     case no_wait:
     case no_event:
     case no_persist:
+    case no_critical:
     case no_when:
     case no_setconditioncode:
     case no_priority:
@@ -628,7 +629,6 @@ unsigned getOperatorMetaFlags(node_operator op)
 
     case no_unused6:
     case no_unused13: case no_unused14: case no_unused15:
-    case no_unused29:
     case no_unused30: case no_unused31: case no_unused32: case no_unused33: case no_unused34: case no_unused35: case no_unused36: case no_unused37: case no_unused38:
     case no_unused40: case no_unused41: case no_unused42: case no_unused43: case no_unused44: case no_unused45: case no_unused46: case no_unused47: case no_unused48: case no_unused49:
     case no_unused50: case no_unused52:

+ 2 - 1
ecl/hql/hqlexpr.cpp

@@ -1191,6 +1191,7 @@ const char *getOpString(node_operator op)
     case no_notify: return "no_notify";
     case no_event: return "no_event";
     case no_persist: return "PERSIST";
+    case no_critical: return "CRITICAL";
     case no_omitted: return "no_omitted";
     case no_setconditioncode: return "no_setconditioncode";
     case no_selectfields: return "no_selectfields";
@@ -1523,7 +1524,6 @@ const char *getOpString(node_operator op)
 
     case no_unused6:
     case no_unused13: case no_unused14: case no_unused15:
-    case no_unused29:
     case no_unused30: case no_unused31: case no_unused32: case no_unused33: case no_unused34: case no_unused35: case no_unused36: case no_unused37: case no_unused38:
     case no_unused40: case no_unused41: case no_unused42: case no_unused43: case no_unused44: case no_unused45: case no_unused46: case no_unused47: case no_unused48: case no_unused49:
     case no_unused50: case no_unused52:
@@ -1589,6 +1589,7 @@ bool checkConstant(node_operator op)
     case no_select:
     case no_stored:
     case no_persist:
+    case no_critical:
     case no_checkpoint:
     case no_once:
     case no_getresult:

+ 1 - 1
ecl/hql/hqlexpr.hpp

@@ -365,7 +365,7 @@ enum _node_operator {
         no_quantile,
         no_nocombine,
         no_unordered,
-    no_unused29,
+        no_critical,
     no_unused30,
     no_unused31,
     no_unused32,

+ 7 - 0
ecl/hql/hqlfold.cpp

@@ -4234,6 +4234,13 @@ IHqlExpression * NullFolderMixin::foldNullDataset(IHqlExpression * expr)
         if (isRedundantGlobalScope(expr))
             return removeParentNode(expr);
         break;
+    case no_evaluate_stmt:
+        {
+            IHqlExpression * arg = expr->queryChild(0);
+            if (arg->isConstant() || arg->getOperator() == no_table)
+                return createNullExpr(expr);
+            break;
+        }
     }
     return NULL;
 }

+ 10 - 0
ecl/hql/hqlgram.y

@@ -153,6 +153,7 @@ static void eclsyntaxerror(HqlGram * parser, const char * s, short yystate, int
   COMBINE
   COMPRESSED
   __COMPRESSED__
+  CRITICAL
   TOK_CONST
   CORRELATION
   COS
@@ -1637,6 +1638,11 @@ failure
                             parser->normalizeExpression($5, type_string, true);
                             $$.setExpr(createValueF(no_persist, makeVoidType(), $3.getExpr(), $5.getExpr(), $6.getExpr(), NULL), $1);
                         }
+    | CRITICAL '(' expression ')'
+                        {
+                            parser->normalizeExpression($3, type_string, true);
+                            $$.setExpr(createValueF(no_critical, makeVoidType(), $3.getExpr(), NULL), $1);
+                        }
     | STORED '(' startStoredAttrs expression ',' fewMany optStoredFieldFormat ')'
                         {
                             parser->normalizeStoredNameExpression($4);
@@ -2684,6 +2690,10 @@ actionStmt
                             $$.setExpr(createValue(no_evaluate_stmt, makeVoidType(), $3.getExpr()));
                             $$.setPosition($1);
                         }
+    | EVALUATE '(' dataSet ')'
+                        {
+                            $$.setExpr(createValue(no_evaluate_stmt, makeVoidType(), $3.getExpr()), $1);
+                        }
     | EVALUATE '(' action ')'
                         {
                             $$.inherit($3);

+ 3 - 1
ecl/hql/hqlgram2.cpp

@@ -10524,6 +10524,7 @@ static void getTokenText(StringBuffer & msg, int token)
     case CPPBODY: msg.append("BEGINC++"); break;
     case TOK_CPP: msg.append("C++"); break;
     case CRC: msg.append("HASHCRC"); break;
+    case CRITICAL: msg.append("CRITICAL"); break;
     case CRON: msg.append("CRON"); break;
     case CSV: msg.append("CSV"); break;
     case DATASET: msg.append("DATASET"); break;
@@ -11203,7 +11204,8 @@ void HqlGram::checkWorkflowMultiples(IHqlExpression * previousWorkflow, IHqlExpr
         case no_stored:
         case no_checkpoint:
         case no_once:
-            if((oldOp==no_persist)||(oldOp==no_stored)||(oldOp==no_once)||(oldOp==no_checkpoint))
+        case no_critical:
+            if((oldOp==no_persist)||(oldOp==no_stored)||(oldOp==no_once)||(oldOp==no_checkpoint)||(oldOp==no_critical))
                 reportError(ERR_MULTIPLE_WORKFLOW, errpos, "Multiple scoping controls are not allowed on an action or expression");
             break;
         case no_attr:

+ 1 - 1
ecl/hql/hqlir.cpp

@@ -285,7 +285,7 @@ const char * getOperatorIRText(node_operator op)
     EXPAND_CASE(no,quantile);
     EXPAND_CASE(no,nocombine);
     EXPAND_CASE(no,unordered);
-    EXPAND_CASE(no,unused29);
+    EXPAND_CASE(no,critical);
     EXPAND_CASE(no,unused30);
     EXPAND_CASE(no,unused31);
     EXPAND_CASE(no,unused32);

+ 1 - 0
ecl/hql/hqllex.l

@@ -667,6 +667,7 @@ COS                 { RETURNSYM(COS); }
 COSH                { RETURNSYM(COSH); }
 COUNT               { RETURNSYM(COUNT); }
 COUNTER             { RETURNSYM(COUNTER); }
+CRITICAL            { RETURNSYM(CRITICAL); }
 CRON                { RETURNSYM(CRON); }
 CSV                 { RETURNSYM(CSV); }
 DATASET             { RETURNSYM(DATASET); }

+ 43 - 0
ecl/hqlcpp/hqlttcpp.cpp

@@ -966,6 +966,8 @@ YesNoOption HqlThorBoundaryTransformer::calcNormalizeThor(IHqlExpression * expr)
             type = NULL;        // don't check the return type
             break;
         }
+    case no_evaluate_stmt:
+        return normalizeThor(expr->queryChild(0));
     case no_setworkflow_cond:
     case no_ensureresult:
         return OptionNo;
@@ -5194,6 +5196,13 @@ void GlobalAttributeInfo::extractStoredInfo(IHqlExpression * expr, IHqlExpressio
         }
 	persistRefresh = getBoolValue(queryAttributeChild(expr, refreshAtom, 0), true);
         break;
+    case no_critical:
+        setOp = no_setresult;
+        storedName.set(expr->queryChild(0));
+        originalLabel.set(storedName);
+        sequence.setown(getLocalSequenceNumber());
+        extraSetAttr.setown(createAttribute(_workflow_Atom));
+        break;
     case no_global:
         throwUnexpected();
     case no_independent:
@@ -5651,6 +5660,12 @@ void WorkflowTransformer::setWorkflowPersist(IWorkflowItem * wf, char const * pe
     wf->setPersistInfo(persistName, persistWfid, numPersistInstances, refresh);
 }
 
+void WorkflowTransformer::setWorkflowCritical(IWorkflowItem * wf, char const * criticalName)
+{
+
+    wf->setCriticalInfo(criticalName);
+}
+
 WorkflowItem * WorkflowTransformer::createWorkflowItem(IHqlExpression * expr, unsigned wfid, node_operator workflowOp)
 {
     WorkflowItem * item = new WorkflowItem(wfid, workflowOp);
@@ -5863,6 +5878,7 @@ IHqlExpression * WorkflowTransformer::extractWorkflow(IHqlExpression * untransfo
                 // MORE - Add dynamic attribute to ensure the file is not pre-resolved
             }
             //fall through
+        case no_critical:
         case no_checkpoint:
         case no_stored:
             {
@@ -6031,6 +6047,33 @@ IHqlExpression * WorkflowTransformer::extractWorkflow(IHqlExpression * untransfo
         if ((info.persistOp != no_persist) && expr->isAction())
             setValue.setown(transformSequentialEtc(setValue));
 
+         if(info.persistOp == no_critical)
+         {
+            StringBuffer criticalName;
+            info.storedName->queryValue()->getStringValue(criticalName);
+            unsigned criticalWfid = ++wfidCount;
+            Owned<IWorkflowItem> wf = addWorkflowToWorkunit(wfid, WFTypeNormal, WFModeCritical, queryDirectDependencies(setValue), conts, info.queryCluster());
+            setWorkflowCritical(wf, criticalName.str());
+
+            DependenciesUsed dependencies(false);
+            UnsignedArray visited;
+            extractDependentInputs(visited, dependencies, queryDirectDependencies(setValue));
+            gatherDependencies(setValue, dependencies, GatherAll);
+
+            HqlExprArray checkArgs;
+            checkArgs.append(*createExprAttribute(_files_Atom, dependencies.tablesRead));
+            inheritDependencies(&checkArgs.item(0));
+            if (dependencies.resultsRead.ordinality())
+            {
+                checkArgs.append(*createExprAttribute(_results_Atom, dependencies.resultsRead));
+                inheritDependencies(&checkArgs.item(1));
+            }
+
+            workflowOut->append(*createWorkflowItem(setValue, wfid, no_critical));
+
+            Owned<IWorkflowItem> wfPersist = addWorkflowToWorkunit(criticalWfid, WFTypeNormal, WFModeNormal, queryDirectDependencies(setValue), NULL);
+        }
+         else
         if(info.persistOp == no_persist)
         {
             StringBuffer persistName;

+ 1 - 0
ecl/hqlcpp/hqlttcpp.ipp

@@ -460,6 +460,7 @@ protected:
 
     void setWorkflowPersist(IWorkflowItem * wf, char const * persistName, unsigned persistWfid, int  numPersistInstances, bool refresh);
     void setWorkflowSchedule(IWorkflowItem * wf, ScheduleData const & sched);
+    void setWorkflowCritical(IWorkflowItem * wf, char const * criticalName);
 
     virtual IHqlExpression *  createTransformed(IHqlExpression * expr);
     void                      inheritDependencies(IHqlExpression * expr);

+ 1 - 1
plugins/fileservices/fileservices.cpp

@@ -1081,7 +1081,7 @@ public:
         else
             transaction = NULL;
     }
-    ~CImplicitSuperTransaction()
+    ~CImplicitSuperTransaction() noexcept(false)
     {
         if (transaction)
             transaction->commit();

+ 29 - 1
roxie/ccd/ccdcontext.cpp

@@ -376,6 +376,24 @@ protected:
         when = getResultInt(whenName, ResultSequencePersist);
         return true;
     }
+    virtual void doExecuteCriticalItem(IRuntimeWorkflowItem & item)
+    {
+        if (!workunit)
+            throw MakeStringException(0, "CRITICAL not supported when running predeployed queries");
+
+        unsigned wfid = item.queryWfid();
+
+        SCMStringBuffer name;
+        const char *criticalName = item.getCriticalName(name).str();
+
+        Owned<IRemoteConnection> rlock = obtainCriticalLock(criticalName);
+        if (!rlock.get())
+            throw MakeStringException(0, "Cannot obtain Critical section lock");
+
+        doExecuteItemDependencies(item, wfid);
+        doExecuteItem(item, wfid);
+        releaseCriticalLock(rlock);
+    }
 
 private:
 
@@ -708,7 +726,17 @@ private:
             }
         }
     }
-
+    IRemoteConnection *obtainCriticalLock(const char *name)
+    {
+        StringBuffer xpath;
+        xpath.append("/WorkUnitCriticalLocks/").append(name);
+        return querySDS().connect(xpath.str(), myProcessSession(), RTM_CREATE | RTM_LOCK_WRITE | RTM_DELETE_ON_DISCONNECT, INFINITE);
+    }
+    void releaseCriticalLock(IRemoteConnection *criticalLock)
+    {
+        if(criticalLock && queryDaliServerVersion().compare("1.3") < 0)
+            criticalLock->close(true);
+    }
     IConstWorkUnit *workunit;
     IPropertyTree *workflowInfo;
     Owned<IWorkflowScheduleConnection> wfconn;

+ 74 - 0
testing/regress/ecl/critical1.ecl

@@ -0,0 +1,74 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+###############################################################################*/
+import Std;
+
+//noroxie
+//nohthor
+
+rawRecord := RECORD
+    unsigned key;
+END;
+
+taggedRecord := RECORD(rawRecord)
+    unsigned8 id;
+END;
+
+idFileName := 'test::ids';
+
+// Create superfile and initialize with first id number
+InitializeSuperFile := FUNCTION
+    InitialDS := DATASET([{0,1}],taggedRecord);
+    initialIdFileName := idFilename + '_initial';
+    newDS:=OUTPUT(InitialDS,,initialIdFileName,THOR,OVERWRITE);
+
+   return IF(not Std.File.SuperFileExists(idFileName),
+             SEQUENTIAL(Std.File.CreateSuperFile(idFileName, false, true),newDS, Std.File.AddSuperFile(idFileName, initialIdFileName)) );
+END;
+
+// Create new file with keys not already in current file.
+processInput(dataset(rawRecord) inFile, string outFileName) := FUNCTION
+    existingIds := DATASET(idFileName, taggedRecord, THOR);
+    unmatchedKeys := JOIN(inFile, existingIds, LEFT.key = RIGHT.key, LEFT ONLY, LOOKUP);
+    maxId := MAX(existingIds, id);
+    newIdFileName := idFileName + WORKUNIT;
+    newIds := PROJECT(unmatchedKeys, TRANSFORM(taggedRecord, SELF.id := maxId + COUNTER; SELF := LEFT));
+    tagged := JOIN(inFile, existingIds + newIds, LEFT.key = RIGHT.key, TRANSFORM(taggedRecord, SELF.id := RIGHT.id, SELF := LEFT),lookup);
+
+    updateIds := OUTPUT(newIds,,newIdFileName);
+    extendSuper := Std.File.AddSuperFile(idFileName, newIdFileName);
+
+    result := SEQUENTIAL(InitializeSuperFile, updateIds, extendSuper): CRITICAL('critical_test');
+
+    RETURN result;
+END;
+
+/* Generate sample input data */
+inputDataset := DATASET(1000000, TRANSFORM(rawRecord, SELF.key := RANDOM() % 1000000));
+datasetOutsideCritical := inputDataset : independent;
+
+/* Post process check - all id's should be unique*/
+sortedds := SORT(DATASET(idFileName,taggedRecord,THOR), id);
+dedupds := DEDUP(sortedds, left.id=right.id);
+dedupds_cnt := COUNT(dedupds);
+sortedds_cnt := COUNT(sortedds);
+
+SEQUENTIAL(
+    EVALUATE(datasetOutsideCritical),
+    processInput(datasetOutsideCritical, 'test::tagged'),
+    OUTPUT(sortedds_cnt-dedupds_cnt,NAMED('DuplicateIdCount') ), // This should be zero
+    Std.File.DeleteSuperFile(idFileName)
+);

+ 7 - 0
testing/regress/ecl/key/critical1.xml

@@ -0,0 +1,7 @@
+<Dataset name='Result 1'>
+</Dataset>
+<Dataset name='Result 2'>
+</Dataset>
+<Dataset name='DuplicateIdCount'>
+ <Row><DuplicateIdCount>0</DuplicateIdCount></Row>
+</Dataset>