|
@@ -47,6 +47,7 @@
|
|
|
//#define REMOVE_NAMED_SCALARS
|
|
|
//#define OPTIMIZE_IMPLICIT_CAST
|
|
|
|
|
|
+#define PERSIST_VERSION 1 // Increment when implementation is incompatible.
|
|
|
#define REMOVE_GLOBAL_ANNOTATION // This should improve cse. It currently does for some, but not others...
|
|
|
|
|
|
#define DEFAULT_FOLD_OPTIONS HFOfoldfilterproject
|
|
@@ -131,15 +132,23 @@ struct GlobalAttributeInfo
|
|
|
{
|
|
|
public:
|
|
|
GlobalAttributeInfo(const char * _filePrefix, const char * _storedPrefix, IHqlExpression * _value) : value(_value)
|
|
|
- { setOp = no_none; persistOp = no_none; few = false; filePrefix = _filePrefix; storedPrefix = _storedPrefix; }
|
|
|
+ {
|
|
|
+ setOp = no_none;
|
|
|
+ persistOp = no_none;
|
|
|
+ few = false;
|
|
|
+ filePrefix = _filePrefix;
|
|
|
+ storedPrefix = _storedPrefix;
|
|
|
+ numPersistInstances = 0;
|
|
|
+ }
|
|
|
|
|
|
void extractGlobal(IHqlExpression * global, ClusterType platform);
|
|
|
- void extractStoredInfo(IHqlExpression * expr, IHqlExpression * originalValue, bool isRoxie);
|
|
|
+ void extractStoredInfo(IHqlExpression * expr, IHqlExpression * codehash, bool isRoxie, bool multiplePersistInstances);
|
|
|
void checkFew(HqlCppTranslator & translator);
|
|
|
void splitGlobalDefinition(ITypeInfo * type, IHqlExpression * value, IConstWorkUnit * wu, SharedHqlExpr & setOutput, OwnedHqlExpr * getOutput, bool isRoxie);
|
|
|
IHqlExpression * getStoredKey();
|
|
|
void preventDiskSpill() { few = true; }
|
|
|
IHqlExpression * queryCluster() const { return cluster; }
|
|
|
+ int queryMaxPersistCopies() const { return numPersistInstances; }
|
|
|
|
|
|
protected:
|
|
|
void doSplitGlobalDefinition(ITypeInfo * type, IHqlExpression * value, IConstWorkUnit * wu, SharedHqlExpr & setOutput, OwnedHqlExpr * getOutput, bool isRoxie);
|
|
@@ -162,8 +171,10 @@ protected:
|
|
|
OwnedHqlExpr cluster;
|
|
|
OwnedHqlExpr extraSetAttr;
|
|
|
OwnedHqlExpr extraOutputAttr;
|
|
|
+ OwnedHqlExpr codehash;
|
|
|
const char * filePrefix;
|
|
|
const char * storedPrefix;
|
|
|
+ int numPersistInstances;
|
|
|
bool few;
|
|
|
};
|
|
|
|
|
@@ -4825,7 +4836,7 @@ void GlobalAttributeInfo::extractGlobal(IHqlExpression * global, ClusterType pla
|
|
|
persistOp = no_global;
|
|
|
}
|
|
|
|
|
|
-void GlobalAttributeInfo::extractStoredInfo(IHqlExpression * expr, IHqlExpression * originalValue, bool isRoxie)
|
|
|
+void GlobalAttributeInfo::extractStoredInfo(IHqlExpression * expr, IHqlExpression * _codehash, bool isRoxie, bool multiplePersistInstances)
|
|
|
{
|
|
|
node_operator op = expr->getOperator();
|
|
|
few = expr->hasAttribute(fewAtom) || (isRoxie) || (value->isDictionary() && !expr->hasAttribute(manyAtom));
|
|
@@ -4844,13 +4855,29 @@ void GlobalAttributeInfo::extractStoredInfo(IHqlExpression * expr, IHqlExpressio
|
|
|
extraSetAttr.setown(createAttribute(checkpointAtom));
|
|
|
break;
|
|
|
case no_persist:
|
|
|
+ assertex(_codehash);
|
|
|
+ codehash.set(_codehash);
|
|
|
setOp = no_ensureresult;
|
|
|
storedName.set(expr->queryChild(0));
|
|
|
sequence.setown(getGlobalSequenceNumber());
|
|
|
- extraSetAttr.setown(createAttribute(_workflowPersist_Atom, LINK(originalValue)));
|
|
|
+ extraSetAttr.setown(createAttribute(_workflowPersist_Atom, LINK(codehash)));
|
|
|
setCluster(queryRealChild(expr, 1));
|
|
|
few = expr->hasAttribute(fewAtom); // PERSISTs need a consistent format.
|
|
|
extraOutputAttr.setown(createComma(LINK(expr->queryAttribute(expireAtom)), LINK(expr->queryAttribute(clusterAtom))));
|
|
|
+ numPersistInstances = multiplePersistInstances ? -1 : 0;
|
|
|
+ if (expr->hasAttribute(multipleAtom))
|
|
|
+ numPersistInstances = getIntValue(queryAttributeChild(expr, multipleAtom, 0), -1);
|
|
|
+ else if (expr->hasAttribute(singleAtom))
|
|
|
+ numPersistInstances = 0;
|
|
|
+
|
|
|
+ if (numPersistInstances != 0)
|
|
|
+ {
|
|
|
+ StringBuffer s;
|
|
|
+ getStringValue(s, storedName);
|
|
|
+ s.append("__");
|
|
|
+ getStringValue(s, codehash);
|
|
|
+ storedName.setown(createConstant(s.str()));
|
|
|
+ }
|
|
|
break;
|
|
|
case no_global:
|
|
|
throwUnexpected();
|
|
@@ -5256,6 +5283,7 @@ WorkflowTransformer::WorkflowTransformer(IWorkUnit * _wu, HqlCppTranslator & _tr
|
|
|
combineAllStored = translator.queryOptions().combineAllStored;
|
|
|
combineTrivialStored = translator.queryOptions().combineTrivialStored;
|
|
|
expandPersistInputDependencies = translator.queryOptions().expandPersistInputDependencies;
|
|
|
+ multiplePersistInstances = translator.queryOptions().multiplePersistInstances;
|
|
|
isRootAction = true;
|
|
|
isRoxie = (translator.getTargetClusterType() == RoxieCluster);
|
|
|
workflowOut = NULL;
|
|
@@ -5302,9 +5330,9 @@ void WorkflowTransformer::setWorkflowSchedule(IWorkflowItem * wf, const Schedule
|
|
|
wf->setSchedulePriority(priority);
|
|
|
}
|
|
|
|
|
|
-void WorkflowTransformer::setWorkflowPersist(IWorkflowItem * wf, char const * persistName, unsigned persistWfid)
|
|
|
+void WorkflowTransformer::setWorkflowPersist(IWorkflowItem * wf, char const * persistName, unsigned persistWfid, int numPersistInstances)
|
|
|
{
|
|
|
- wf->setPersistInfo(persistName, persistWfid);
|
|
|
+ wf->setPersistInfo(persistName, persistWfid, numPersistInstances);
|
|
|
}
|
|
|
|
|
|
WorkflowItem * WorkflowTransformer::createWorkflowItem(IHqlExpression * expr, unsigned wfid, node_operator workflowOp)
|
|
@@ -5441,8 +5469,13 @@ IHqlExpression * WorkflowTransformer::extractWorkflow(IHqlExpression * untransfo
|
|
|
HqlExprArray actions;
|
|
|
unwindChildren(actions, expr, 1);
|
|
|
|
|
|
- IHqlExpression * original = queryAttribute(_original_Atom, actions);
|
|
|
- if (original) original = original->queryChild(0);
|
|
|
+ IHqlExpression * originalAttr = queryAttribute(_original_Atom, actions);
|
|
|
+ OwnedHqlExpr codehash;
|
|
|
+ if (originalAttr)
|
|
|
+ {
|
|
|
+ unsigned crc = getExpressionCRC(originalAttr->queryChild(0)) + PERSIST_VERSION;
|
|
|
+ codehash.setown(getSizetConstant(crc));
|
|
|
+ }
|
|
|
|
|
|
//First check for duplicate expressions, and cope with the weird case where they are identical except for the annotations.
|
|
|
//Do it before wfid is allocated to make life simpler
|
|
@@ -5455,7 +5488,7 @@ IHqlExpression * WorkflowTransformer::extractWorkflow(IHqlExpression * untransfo
|
|
|
case no_persist:
|
|
|
case no_checkpoint:
|
|
|
case no_stored:
|
|
|
- info.extractStoredInfo(&cur, original, isRoxie);
|
|
|
+ info.extractStoredInfo(&cur, codehash, isRoxie, multiplePersistInstances);
|
|
|
|
|
|
OwnedHqlExpr id = info.getStoredKey();
|
|
|
unsigned match = alreadyProcessed.find(*id);
|
|
@@ -5521,7 +5554,7 @@ IHqlExpression * WorkflowTransformer::extractWorkflow(IHqlExpression * untransfo
|
|
|
case no_checkpoint:
|
|
|
case no_stored:
|
|
|
{
|
|
|
- info.extractStoredInfo(&cur, original, isRoxie);
|
|
|
+ info.extractStoredInfo(&cur, codehash, isRoxie, multiplePersistInstances);
|
|
|
|
|
|
OwnedHqlExpr id = info.getStoredKey();
|
|
|
alreadyProcessed.append(*id.getClear());
|
|
@@ -5531,14 +5564,14 @@ IHqlExpression * WorkflowTransformer::extractWorkflow(IHqlExpression * untransfo
|
|
|
break;
|
|
|
case no_independent:
|
|
|
case no_once:
|
|
|
- info.extractStoredInfo(&cur, original, isRoxie);
|
|
|
+ info.extractStoredInfo(&cur, codehash, isRoxie, multiplePersistInstances);
|
|
|
break;
|
|
|
case no_success:
|
|
|
{
|
|
|
OwnedHqlExpr successExpr = transformSequentialEtc(cur.queryChild(0));
|
|
|
conts.success = splitValue(successExpr);
|
|
|
Owned<IWorkflowItem> wf = addWorkflowContingencyToWorkunit(conts.success, WFTypeSuccess, WFModeNormal, queryDirectDependencies(successExpr), NULL, wfid);
|
|
|
- info.extractStoredInfo(&cur, original, isRoxie);
|
|
|
+ info.extractStoredInfo(&cur, codehash, isRoxie, multiplePersistInstances);
|
|
|
break;
|
|
|
}
|
|
|
case no_failure:
|
|
@@ -5546,7 +5579,7 @@ IHqlExpression * WorkflowTransformer::extractWorkflow(IHqlExpression * untransfo
|
|
|
OwnedHqlExpr failureExpr = transformSequentialEtc(cur.queryChild(0));
|
|
|
conts.failure = splitValue(failureExpr);
|
|
|
Owned<IWorkflowItem> wf = addWorkflowContingencyToWorkunit(conts.failure, WFTypeFailure, WFModeNormal, queryDirectDependencies(failureExpr), NULL, wfid);
|
|
|
- info.extractStoredInfo(&cur, original, isRoxie);
|
|
|
+ info.extractStoredInfo(&cur, codehash, isRoxie, multiplePersistInstances);
|
|
|
break;
|
|
|
}
|
|
|
case no_recovery:
|
|
@@ -5554,7 +5587,7 @@ IHqlExpression * WorkflowTransformer::extractWorkflow(IHqlExpression * untransfo
|
|
|
conts.recovery = splitValue(cur.queryChild(0));
|
|
|
conts.retries = (unsigned)getIntValue(cur.queryChild(1), 0);
|
|
|
Owned<IWorkflowItem> wf = addWorkflowContingencyToWorkunit(conts.recovery, WFTypeRecovery, WFModeNormal, queryDirectDependencies(cur.queryChild(0)), NULL, wfid);
|
|
|
- info.extractStoredInfo(&cur, original, isRoxie);
|
|
|
+ info.extractStoredInfo(&cur, codehash, isRoxie, multiplePersistInstances);
|
|
|
break;
|
|
|
}
|
|
|
case no_attr:
|
|
@@ -5685,7 +5718,7 @@ IHqlExpression * WorkflowTransformer::extractWorkflow(IHqlExpression * untransfo
|
|
|
info.storedName->queryValue()->getStringValue(persistName);
|
|
|
unsigned persistWfid = ++wfidCount;
|
|
|
Owned<IWorkflowItem> wf = addWorkflowToWorkunit(wfid, WFTypeNormal, WFModePersist, queryDirectDependencies(setValue), conts, info.queryCluster());
|
|
|
- setWorkflowPersist(wf, persistName.str(), persistWfid);
|
|
|
+ setWorkflowPersist(wf, persistName.str(), persistWfid, info.queryMaxPersistCopies());
|
|
|
|
|
|
Owned<IWorkflowItem> wfPersist = addWorkflowToWorkunit(persistWfid, WFTypeNormal, WFModeNormal, NULL);
|
|
|
DependenciesUsed dependencies(false);
|
|
@@ -5698,7 +5731,7 @@ IHqlExpression * WorkflowTransformer::extractWorkflow(IHqlExpression * untransfo
|
|
|
checkArgs.append(*createExprAttribute(_files_Atom, dependencies.tablesRead));
|
|
|
if (dependencies.resultsRead.ordinality())
|
|
|
checkArgs.append(*createExprAttribute(_results_Atom, dependencies.resultsRead));
|
|
|
- checkArgs.append(*createAttribute(_original_Atom, LINK(original)));
|
|
|
+ checkArgs.append(*createAttribute(_codehash_Atom, LINK(codehash)));
|
|
|
checkArgs.append(*createAttribute(namedAtom, LINK(info.storedName)));
|
|
|
if (expr->isDataset())
|
|
|
checkArgs.append(*createAttribute(fileAtom));
|