/*############################################################################## Copyright (C) 2011 HPCC Systems. All rights reserved. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . ############################################################################## */ #include "jliball.hpp" #include "hql.hpp" #include "platform.h" #include "jlib.hpp" #include "jmisc.hpp" #include "jstream.ipp" #include "jdebug.hpp" #include "hql.hpp" #include "hqlthql.hpp" #include "hqlhtcpp.ipp" #include "hqlttcpp.ipp" #include "hqlutil.hpp" #include "hqlthql.hpp" #include "hqlpmap.hpp" #include "hqlwcpp.hpp" #include "hqlcpputil.hpp" #include "hqltcppc.ipp" #include "hqlopt.hpp" #include "hqlfold.hpp" #include "hqlcerrors.hpp" #include "hqlcatom.hpp" #include "hqlgraph.ipp" #include "thorcommon.hpp" //--------------------------------------------------------------------------- IPropertyTree * addGraphAttribute(IPropertyTree * node, const char * name) { IPropertyTree * att = createPTree(); att->setProp("@name", name); return node->addPropTree("att", att); } void addGraphAttribute(IPropertyTree * node, const char * name, const char * value) { addGraphAttribute(node, name)->setProp("@value", value); } void addGraphAttributeInt(IPropertyTree * node, const char * name, __int64 value) { addGraphAttribute(node, name)->setPropInt64("@value", value); } void addGraphAttributeBool(IPropertyTree * node, const char * name, bool value) { if (value) addGraphAttribute(node, name)->setPropBool("@value", value); } void addSimpleGraphEdge(IPropertyTree * subGraph, unsigned __int64 source, unsigned __int64 target, unsigned outputIndex, unsigned inputIndex, _ATOM kind, const char * label, bool nWay) { IPropertyTree *edge = createPTree(); edge->setPropInt64("@target", target); edge->setPropInt64("@source", source); if (outputIndex != 0) addGraphAttributeInt(edge, "_sourceIndex", outputIndex); if (inputIndex != 0) addGraphAttributeInt(edge, "_targetIndex", inputIndex); if (label) edge->setProp("@label", label); if (kind == dependencyAtom) addGraphAttributeBool(edge, "_dependsOn", true); if (nWay) edge->setPropBool("@nWay", true); StringBuffer s; edge->setProp("@id", s.append(source).append('_').append(outputIndex).str()); subGraph->addPropTree("edge", edge); } void addComplexGraphEdge(IPropertyTree * graph, unsigned __int64 sourceGraph, unsigned __int64 targetGraph, unsigned __int64 sourceActivity, unsigned __int64 targetActivity, unsigned outputIndex, _ATOM kind, const char * label) { StringBuffer idText; IPropertyTree *edge = createPTree(); edge->setProp("@id", idText.clear().append(sourceGraph).append('_').append(targetGraph).append("_").append(outputIndex).str()); edge->setPropInt64("@target", sourceGraph); edge->setPropInt64("@source", targetGraph); if (label) edge->setProp("@label", label); if (outputIndex) addGraphAttributeInt(edge, "_sourceIndex", outputIndex); if (kind == dependencyAtom) addGraphAttributeBool(edge, "_dependsOn", true); addGraphAttributeInt(edge, "_sourceActivity", sourceActivity); addGraphAttributeInt(edge, "_targetActivity", targetActivity); graph->addPropTree("edge", edge); } void removeGraphAttribute(IPropertyTree * node, const char * name) { StringBuffer xpath; xpath.append("att[@name=\"").append(name).append("\"]"); node->removeProp(xpath.str()); } static void queryExpandFilename(StringBuffer & out, IHqlExpression * expr) { if (expr) { OwnedHqlExpr folded = foldHqlExpression(expr); if (folded->queryValue()) folded->queryValue()->generateECL(out.append('\n')); } } //--------------------------------------------------------------------------- static void addDependent(HqlExprArray & dependents, IHqlExpression * expr) { if (dependents.find(*expr) == NotFound) dependents.append(*LINK(expr)); } LogicalGraphCreator::LogicalGraphCreator(IWorkUnit * _wu) { lockTransformMutex(); seq = 0; wu = _wu; expandPersist = wu->getDebugValueBool("logicalGraphExpandPersist", true); expandStored = wu->getDebugValueBool("logicalGraphExpandStored", false); includeNameInText = wu->getDebugValueBool("logicalGraphIncludeName", true); includeModuleInText = wu->getDebugValueBool("logicalGraphIncludeModule", true); displayJavadoc = wu->getDebugValueBool("logicalGraphDisplayJavadoc", true); displayJavadocParameters = wu->getDebugValueBool("logicalGraphDisplayJavadocParameters", false); rootGraphId = 0; subGraphId = 0; } LogicalGraphCreator::~LogicalGraphCreator() { unlockTransformMutex(); } void LogicalGraphCreator::addAttribute(const char * name, const char * value) { addGraphAttribute(activityNode, name, value); } void LogicalGraphCreator::addAttribute(const char * name, _ATOM value) { if (value) addGraphAttribute(activityNode, name, value->str()); } void LogicalGraphCreator::addAttributeInt(const char * name, __int64 value) { addGraphAttributeInt(activityNode, name, value); } void LogicalGraphCreator::addAttributeBool(const char * name, bool value) { addGraphAttributeBool(activityNode, name, value); } void LogicalGraphCreator::beginActivity(const char * label, unique_id_t id) { activityNode.set(curSubGraph()->addPropTree("node", createPTree())); if (label) activityNode->setProp("@label", label); activityNode->setPropInt64("@id", id); } void LogicalGraphCreator::beginSubGraph(const char * label, bool nested) { savedGraphId.append(subGraphId); if (!nested) saveSubGraphs(); if ((subGraphs.ordinality() == 0) && rootSubGraph) { subGraphs.append(*LINK(rootSubGraph)); subGraphId = rootGraphId; return; } subGraphId = ++seq; IPropertyTree * node = createPTree("node"); node = curSubGraph()->addPropTree("node", node); node->setPropInt64("@id", subGraphId); IPropertyTree * graphAttr = node->addPropTree("att", createPTree("att")); IPropertyTree * subGraph = graphAttr->addPropTree("graph", createPTree("graph")); subGraphs.append(*LINK(subGraph)); if (!rootSubGraph) { rootSubGraph.set(subGraph); rootGraphId = subGraphId; } } void LogicalGraphCreator::connectActivities(IHqlExpression * fromExpr, IHqlExpression * toExpr, _ATOM kind, const char * label, bool nWay) { StringBuffer tempLabel; if (fromExpr->getOperator() == no_comma) { fromExpr->queryChild(1)->queryValue()->getStringValue(tempLabel); fromExpr = fromExpr->queryChild(0); label = tempLabel.str(); } LogicalGraphInfo * from = queryExtra(fromExpr); LogicalGraphInfo * to = queryExtra(toExpr); if (kind == dependencyAtom) { LogicalGraphInfo * temp = from; from = to; to = temp; } if (from->subGraphId == to->subGraphId) addSimpleGraphEdge(curSubGraph(), from->id, to->id, from->outputCount++, 0, kind, label, nWay); else if (false) addSimpleGraphEdge(curSubGraph(), from->id, to->subGraphId, from->outputCount++, 0, kind, label, nWay); else addComplexGraphEdge(graph, from->subGraphId, to->subGraphId, from->id, to->id, from->outputCount++, kind, label); } void LogicalGraphCreator::createLogicalGraph(HqlExprArray & exprs) { graph.setown(createPTree("graph")); // beginSubGraph(NULL, false); ForEachItemIn(i, exprs) createRootGraphActivity(&exprs.item(i)); // endSubGraph(); Owned wug = wu->updateGraph("Logical"); wug->setXGMMLTree(graph.getClear()); wug->setType(GraphTypeEcl); } void LogicalGraphCreator::createRootGraphActivity(IHqlExpression * expr) { switch (expr->getOperator()) { case no_comma: case no_compound: case no_sortlist: { ForEachChild(i, expr) createRootGraphActivity(expr->queryChild(i)); return; } default: beginSubGraph(NULL, false); createGraphActivity(expr); endSubGraph(false); } } static void expandUnnamedFunnel(HqlExprArray & inputs, IHqlExpression * expr) { while ((expr->getOperator() == no_addfiles) && (expr->queryBody() == expr)) { expandUnnamedFunnel(inputs, expr->queryChild(0)); expr = expr->queryChild(1); } inputs.append(*LINK(expr)); } void LogicalGraphCreator::createGraphActivity(IHqlExpression * expr) { LogicalGraphInfo * extra = queryExtra(expr); if (extra->globalId && !inSubQuery()) { extra->id = extra->globalId; return; } //First generate children... //MORE: may want to do inputs first and dependents afterwards. _ATOM dependencyKind = dependencyAtom; unsigned first = getFirstActivityArgument(expr); unsigned last = first + getNumActivityArguments(expr); node_operator op = expr->getOperator(); HqlExprArray inputs, dependents; bool defaultInputs = true; switch (op) { case no_setresult: case no_map: last = first; dependencyKind = NULL; break; case no_select: if (!isNewSelector(expr)) { last = first; } break; case no_addfiles: expandUnnamedFunnel(inputs, expr->queryBody()); defaultInputs = false; break; case no_colon: { if (!isWorkflowExpanded(expr)) defaultInputs = false; gatherWorkflowActivities(expr, dependents); inputs.append(*LINK(expr->queryChild(0))); defaultInputs = false; break; } case no_forcelocal: case no_forcenolocal: case no_allnodes: case no_thisnode: { IHqlExpression * child = expr->queryChild(0); createSubGraphActivity(child); addDependent(dependents, child); defaultInputs = false; break; } } if (defaultInputs) { ForEachChild(i, expr) { IHqlExpression * cur = expr->queryChild(i); if ((i >= first && i < last) && !cur->isAttribute()) inputs.append(*LINK(cur)); else if (includeChildInDependents(expr, i)) gatherGraphActivities(cur, dependents); } } ForEachItemIn(i0, inputs) createGraphActivity(&inputs.item(i0)); //Generate the node for this activity unique_id_t id = ++seq; bool isGrouped = isGroupedActivity(expr); bool isLocal = !isGrouped && isLocalActivity(expr) && localChangesActivity(expr); StringBuffer tempText; beginActivity(getActivityText(expr, tempText), id); StringBuffer eclText; toECL(expr->queryBody(), eclText, false, true); addAttribute("ecl", eclText.str()); addAttributeBool("grouped", isGrouped); addAttributeBool("local", isLocal); IHqlExpression * symbol = queryNamedSymbol(expr); if (symbol) { addAttribute("name", symbol->queryName()); addAttribute("module", symbol->queryFullModuleName()); addAttributeInt("line", symbol->getStartLine()); addAttributeInt("column", symbol->getStartColumn()); } endActivity(); if (!inSubQuery()) extra->globalId = id; extra->id = id; extra->subGraphId = subGraphId; ForEachItemIn(i1, inputs) { IHqlExpression * cur = &inputs.item(i1); if (!cur->isAttribute()) { const char * label = NULL; switch (expr->getOperator()) { case no_if: label = (i1 == 0) ? "True" : "False"; break; case no_fetch: break; default: switch (getChildDatasetType(expr)) { case childdataset_leftright: case childdataset_top_left_right: if (!isKeyedJoin(expr)) { label = (i1 == 0) ? "LEFT" : "RIGHT"; } break; } } connectActivities(cur, expr, NULL, label); } } ForEachItemIn(j, dependents) connectActivities(&dependents.item(j), expr, dependencyKind); } void LogicalGraphCreator::createSubGraphActivity(IHqlExpression * expr) { beginSubGraph(NULL, true); createGraphActivity(expr); endSubGraph(true); } IPropertyTree * LogicalGraphCreator::curSubGraph() { if (subGraphs.ordinality()) return &subGraphs.tos(); return graph; } void LogicalGraphCreator::endActivity() { activityNode.clear(); } void LogicalGraphCreator::endSubGraph(bool nested) { subGraphs.pop(); subGraphId = savedGraphId.pop(); if (!nested) restoreSubGraphs(); } static bool exprIsGlobal(IHqlExpression * expr) { HqlExprCopyArray inScope; expr->gatherTablesUsed(NULL, &inScope); return inScope.ordinality() == 0; } void LogicalGraphCreator::gatherWorkflowActivities(IHqlExpression * expr, HqlExprArray & dependents) { HqlExprArray actions; expr->queryChild(1)->unwindList(actions, no_comma); ForEachItemIn(i, actions) { IHqlExpression & cur = actions.item(i); switch (cur.getOperator()) { case no_success: case no_failure: case no_recovery: { IHqlExpression * action = cur.queryChild(0); createRootGraphActivity(action); OwnedHqlExpr text = createConstant(getOpString(cur.getOperator())); OwnedHqlExpr labeled = createComma(LINK(action), LINK(text)); addDependent(dependents, labeled); break; } } } } bool LogicalGraphCreator::gatherGraphActivities(IHqlExpression * expr, HqlExprArray & dependents) { LogicalGraphInfo * extra = queryExtra(expr); if (extra->noDependents) return false; //if no_select with new then create a child graph node_operator op = expr->getOperator(); switch (op) { case no_select: { if (isNewSelector(expr)) { if (exprIsGlobal(expr)) createRootGraphActivity(expr); else createSubGraphActivity(expr); addDependent(dependents, expr); return true; } else return false; } case NO_AGGREGATE: { if (exprIsGlobal(expr)) createRootGraphActivity(expr); else createSubGraphActivity(expr); addDependent(dependents, expr); return true; } case no_colon: { if (!isWorkflowExpanded(expr)) return false; gatherWorkflowActivities(expr, dependents); } break; case no_attr: case no_attr_link: case no_attr_expr: case no_setmeta: return false; } if (extra->globalId && !inSubQuery()) { extra->id = extra->globalId; addDependent(dependents, expr); return true; } switch (op) { case no_newkeyindex: case no_table: return false; } bool hasDependents = false; ForEachChild(i, expr) { if (gatherGraphActivities(expr->queryChild(i), dependents)) hasDependents = true; } extra->noDependents = !hasDependents; return hasDependents; } static void capitaliseOpText(StringBuffer & out, IHqlExpression * expr) { const char * opText = getOpString(expr->getOperator()); //Capitalise the keywords if (opText && *opText) { out.append(*opText); while (*++opText) out.append((char)tolower(*opText)); } } const char * LogicalGraphCreator::getActivityText(IHqlExpression * expr, StringBuffer & temp) { if (expr->queryBody() != expr) { _ATOM module = includeModuleInText ? expr->queryFullModuleName() : NULL; _ATOM name = includeNameInText ? expr->queryName() : NULL; StringBuffer header; if (module) { if (name) { //module and name supplied. module may be the form .. //If so, display . if the name matches the attr, else .:: const char * dot = strrchr(module->str(), '.'); if (dot) { if (stricmp(dot+1, name->str()) == 0) header.append(module); else header.append(module).append("::").append(name); } else header.append(module).append(".").append(name); } else header.append(module); } else if (name) header.append(name); if (header.length()) temp.append(header).append("\n"); } if (displayJavadoc) { Owned doc = expr->getDocumentation(); if (doc) { doc->getProp("", temp); if (displayJavadocParameters && doc->hasProp("param")) { temp.append("\nParameters:"); Owned iter = doc->getElements("param"); ForEach(*iter) { Owned cur = &iter->get(); temp.append("\n"); cur->getProp("", temp); } } return temp.str(); } } switch (expr->getOperator()) { case no_table: { IHqlExpression * mode = expr->queryChild(2); switch (mode->getOperator()) { case no_csv: case no_xml: temp.append(getOpString(expr->getOperator())).append(" "); break; } temp.append("Dataset"); queryExpandFilename(temp, expr->queryChild(0)); break; } break; case no_newkeyindex: { temp.append(getOpString(expr->getOperator())); queryExpandFilename(temp, expr->queryChild(3)); break; } case no_newusertable: { temp.append("Table"); break; } case no_output: { IHqlExpression * filename = queryRealChild(expr, 1); if (filename) { temp.append("Output"); if (expr->hasProperty(xmlAtom)) temp.append(" XML"); else if (expr->hasProperty(csvAtom)) temp.append(" CSV"); queryExpandFilename(temp, filename); } else { IHqlExpression * seq = querySequence(expr); IHqlExpression * name = queryResultName(expr); temp.append(::getActivityText(TAKworkunitwrite)).append("\n"); getStoredDescription(temp, seq, name, true); } break; } case no_temptable: case no_inlinetable: temp.append("Inline Dataset"); break; case no_addfiles: temp.append(::getActivityText(TAKfunnel)); break; case no_colon: { temp.append("Workflow:"); HqlExprArray actions; expr->queryChild(1)->unwindList(actions, no_comma); ForEachItemIn(i, actions) { IHqlExpression * cur = &actions.item(i); if (!cur->isAttribute()) { temp.append(' '); toECL(cur, temp, false, false); } } break; } case no_extractresult: case no_setresult: { IHqlExpression * sequence = queryPropertyChild(expr, sequenceAtom, 0); IHqlExpression * name = queryPropertyChild(expr, namedAtom, 0); temp.append("Store\n"); getStoredDescription(temp, sequence, name, true); break; } case no_join: if (isKeyedJoin(expr)) { temp.append("Keyed Join"); IHqlExpression * index = expr->queryChild(1)->queryNormalizedSelector(); if (index->getOperator() == no_newkeyindex) queryExpandFilename(temp, index->queryChild(3)); } else temp.append("Join"); break; case no_selfjoin: temp.append("Self Join"); break; case no_select: temp.append("Select "); if (expr->isDataset()) temp.append("Dataset"); else if (expr->isDatarow()) temp.append("Row"); else temp.append("Field"); temp.append("\n"); temp.append(expr->queryChild(1)->queryName()); break; case no_group: { IHqlExpression * sortlist = queryRealChild(expr, 1); if (sortlist && sortlist->numChildren() != 0) capitaliseOpText(temp, expr); else temp.append("Degroup"); break; } default: if (expr->isAction() || expr->isDataset() || expr->isAggregate()) capitaliseOpText(temp, expr); else temp.append("Result"); break; } return temp.str(); } bool LogicalGraphCreator::inSubQuery() const { return subGraphs.ordinality() > 1; } bool LogicalGraphCreator::isWorkflowExpanded(IHqlExpression * expr) const { IHqlExpression * actions = expr->queryChild(1); ForEachChild(i, actions) { IHqlExpression * cur = actions->queryChild(i); switch (cur->getOperator()) { case no_persist: if (!expandPersist) return false; break; case no_stored: if (!expandStored) return false; break; default: return true; } } return true; } LogicalGraphInfo * LogicalGraphCreator::queryExtra(IHqlExpression * expr) { LogicalGraphInfo * extra = (LogicalGraphInfo *)expr->queryTransformExtra(); if (!extra) { extra = new LogicalGraphInfo(expr); expr->setTransformExtraOwned(extra); } return extra; } void LogicalGraphCreator::restoreSubGraphs() { subGraphs.kill(); unsigned level = savedLevels.pop(); while (saved.ordinality() != level) subGraphs.append(saved.popGet()); } void LogicalGraphCreator::saveSubGraphs() { savedLevels.append(saved.ordinality()); ForEachItemInRev(i, subGraphs) saved.append(subGraphs.item(i)); subGraphs.kill(true); }