瀏覽代碼

HPCC-22231 Analyse workunit stats for distribute output row skews

Signed-off-by: Shamser Ahmed <shamser.ahmed@lexisnexis.co.uk>
Shamser Ahmed 6 年之前
父節點
當前提交
275e36f9a3

+ 1 - 0
common/CMakeLists.txt

@@ -26,4 +26,5 @@ HPCC_ADD_SUBDIRECTORY (remote)
 HPCC_ADD_SUBDIRECTORY (roxiecommlib)
 HPCC_ADD_SUBDIRECTORY (thorhelper)
 HPCC_ADD_SUBDIRECTORY (workunit)
+HPCC_ADD_SUBDIRECTORY (wuanalysis)
 HPCC_ADD_SUBDIRECTORY (wuwebview "PLATFORM")

+ 68 - 0
common/wuanalysis/CMakeLists.txt

@@ -0,0 +1,68 @@
+################################################################################
+#    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License");
+#    you may not use this file except in compliance with the License.
+#    You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS,
+#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#    See the License for the specific language governing permissions and
+#    limitations under the License.
+################################################################################
+
+
+# File      : CMakeLists.txt
+# Component: wuanalysis
+#####################################################
+# Description:
+# ------------
+#    Cmake Input File for wuanalysis
+#####################################################
+
+# CMP0011 NEW: scripts pulled in through include()/find_package() get their own
+# policy scope, so policy changes made inside them do not leak into this file.
+cmake_policy( SET CMP0011 NEW )
+
+project( wuanalysis )
+
+# Sources and headers compiled into the wuanalysis library.
+set (    SRCS
+         anacommon.cpp
+         anarule.cpp
+         anawu.cpp
+
+         anacommon.hpp
+         anarule.hpp
+         anawu.hpp
+    )
+
+# Header search paths (directory scoped, so they apply to every target defined
+# in this directory).
+include_directories (
+         ${CMAKE_BINARY_DIR}
+         ${CMAKE_BINARY_DIR}/oss
+         ${HPCC_SOURCE_DIR}/system/mp
+         ${HPCC_SOURCE_DIR}/system/include
+         ${HPCC_SOURCE_DIR}/system/security/shared
+         ${HPCC_SOURCE_DIR}/dali/base
+         ${HPCC_SOURCE_DIR}/system/jlib
+         ${HPCC_SOURCE_DIR}/rtl/include
+         ${HPCC_SOURCE_DIR}/rtl/eclrtl
+         ${HPCC_SOURCE_DIR}/common/thorhelper
+         ${HPCC_SOURCE_DIR}/common/workunit
+    )
+
+# HPCC_ADD_LIBRARY is a project macro defined outside this file; presumably a
+# wrapper around add_library() - TODO confirm.
+HPCC_ADD_LIBRARY( wuanalysis SHARED ${SRCS} )
+
+# DEFINE_SYMBOL makes CMake define WUANALYSIS_EXPORTS only while compiling this
+# shared library's own sources; -D_USRDLL is added to the compile flags as well.
+set_target_properties(wuanalysis PROPERTIES
+    COMPILE_FLAGS -D_USRDLL
+    DEFINE_SYMBOL WUANALYSIS_EXPORTS )
+
+# Runtime artifacts go to EXEC_DIR, library artifacts to LIB_DIR.
+install ( TARGETS wuanalysis RUNTIME DESTINATION ${EXEC_DIR} LIBRARY DESTINATION ${LIB_DIR} )
+
+# Libraries wuanalysis links against.
+target_link_libraries ( wuanalysis
+         jlib
+         mp
+         hrpc
+         workunit
+         thorhelper
+    )

+ 47 - 0
common/wuanalysis/anacommon.cpp

@@ -0,0 +1,47 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+#include "anacommon.hpp"
+
+// Sort callback for arrays of PerformanceIssue: unwraps the two array slots and
+// defers to PerformanceIssue::compareCost().
+int compareIssuesCostOrder(CInterface * const * _l, CInterface * const * _r)
+{
+    const PerformanceIssue & left = *static_cast<const PerformanceIssue *>(*_l);
+    const PerformanceIssue & right = *static_cast<const PerformanceIssue *>(*_r);
+    return left.compareCost(right);
+}
+
+// Three-way comparison on cost, inverted so that the more expensive issue
+// compares as "less" (i.e. sorts first).
+int PerformanceIssue::compareCost(const PerformanceIssue & other) const
+{
+    if (cost > other.cost)
+        return -1;
+    if (cost < other.cost)
+        return +1;
+    return 0;
+}
+
+// Write the issue to stdout as "[<cost in ms>ms] <scope>: <comment>".
+void PerformanceIssue::print() const
+{
+    const stat_type costMs = statUnits2msecs(cost);
+    printf("[%" I64F "dms] %s: %s\n", costMs, scope.str(), comment.str());
+}
+
+// Store the cost and append the printf-style formatted message to the comment.
+// Note that the text is appended: any comment already present is kept.
+void PerformanceIssue::set(stat_type _cost, const char * msg, ...)
+{
+    va_list formatArgs;
+    va_start(formatArgs, msg);
+    comment.valist_appendf(msg, formatArgs);
+    va_end(formatArgs);
+
+    cost = _cost;
+}
+

+ 47 - 0
common/wuanalysis/anacommon.hpp

@@ -0,0 +1,47 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef ANACOMMON_HPP
+#define ANACOMMON_HPP
+
+#include "jliball.hpp"
+
+#ifdef WUANALYSIS_EXPORTS
+    #define WUANALYSIS_API DECL_EXPORT
+#else
+    #define WUANALYSIS_API DECL_IMPORT
+#endif
+
+// A single performance problem reported by an analysis rule: the scope it
+// relates to, an estimated cost and a human readable description.
+class PerformanceIssue : public CInterface
+{
+public:
+    // Three-way comparison on cost; an issue with a higher cost compares as
+    // "less" so that sorting puts the most expensive issues first.
+    int compareCost(const PerformanceIssue & other) const;
+
+    // Print the issue (cost in ms, scope and comment) to stdout.
+    void print() const;
+    // Record the cost and append a printf-style formatted message to the comment.
+    void set(stat_type _cost, const char * msg, ...) __attribute__((format(printf, 3, 4)));
+    // Record the name of the scope the issue relates to.
+    // (Previously assigned the member to itself, so the scope was never stored.)
+    void setScope(const char *_scope) { scope.set(_scope); }
+    stat_type getCost() const          { return cost; }
+
+private:
+    StringAttr scope;
+    stat_type cost = 0;      // number of nanoseconds lost as a result.
+    StringBuffer comment;
+};
+
+extern int compareIssuesCostOrder(CInterface * const * _l, CInterface * const * _r);
+
+#endif

+ 81 - 0
common/wuanalysis/anarule.cpp

@@ -0,0 +1,81 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "jliball.hpp"
+
+#include "workunit.hpp"
+#include "anarule.hpp"
+#include "commonext.hpp"
+
+// Base for rules that apply to exactly one kind of activity: an activity is a
+// candidate when its WaKind attribute equals the kind given at construction.
+class ActivityKindRule : public AActivityRule
+{
+public:
+    ActivityKindRule(ThorActivityKind _kind)
+        : kind(_kind)
+    {
+    }
+
+    virtual bool isCandidate(IWuActivity & activity) const override
+    {
+        const unsigned activityKind = activity.getAttr(WaKind);
+        return activityKind == kind;
+    }
+
+protected:
+    ThorActivityKind kind;
+};
+
+//--------------------------------------------------------------------------------------------------------------------
+
+// Rule for hash distribute activities: reports an issue when the rows on the
+// first output edge are skewed by more than options.skewThreshold.
+class DistributeSkewRule : public ActivityKindRule
+{
+public:
+    DistributeSkewRule() : ActivityKindRule(TAKhashdistribute) {}
+
+    // Returns true (and fills in result) when significant output skew is found.
+    // Returns false when there is no first output edge, when the average row
+    // count is below rowsThreshold, or when the skew is within the threshold.
+    virtual bool check(PerformanceIssue & result, IWuActivity & activity, const WuAnalyseOptions & options) override
+    {
+        IWuEdge * outputEdge = activity.queryOutput(0);
+        if (!outputEdge)
+            return false;
+        stat_type rowsAvg = outputEdge->getStatRaw(StNumRowsProcessed, StAvgX);
+        if (rowsAvg < rowsThreshold)
+            return false;
+        stat_type rowsMaxSkew = outputEdge->getStatRaw(StNumRowsProcessed, StSkewMax);
+        if (rowsMaxSkew > options.skewThreshold)
+        {
+            // Use downstream activity time to calculate approximate cost
+            IWuActivity * targetActivity = outputEdge->queryTarget();
+            assertex(targetActivity);
+            stat_type timeMaxLocalExecute = targetActivity->getStatRaw(StTimeLocalExecute, StMaxX);
+            stat_type timeAvgLocalExecute = targetActivity->getStatRaw(StTimeLocalExecute, StAvgX);
+            // Consider ways to improve this cost calculation further.
+            // stat_type is unsigned: clamp to 0 rather than wrapping to a huge
+            // bogus cost if the max statistic is missing or below the average.
+            stat_type cost = (timeMaxLocalExecute > timeAvgLocalExecute) ? (timeMaxLocalExecute - timeAvgLocalExecute) : 0;
+
+            // Distinguish skew introduced by the distribute from skew that was
+            // already at least as bad on the way in.
+            IWuEdge * inputEdge = activity.queryInput(0);
+            if (inputEdge && (inputEdge->getStatRaw(StNumRowsProcessed, StSkewMax) < rowsMaxSkew))
+                result.set(cost, "DISTRIBUTE output skew is worse than input skew");
+            else
+                result.set(cost, "Significant skew in DISTRIBUTE output");
+            return true;
+        }
+        return false;
+    }
+
+protected:
+    static const stat_type rowsThreshold = 100;                // avg rows per node.
+};
+
+// Append one instance of every available analysis rule to rules.
+// The array takes over the reference created by new.
+void gatherRules(CIArrayOf<AActivityRule> & rules)
+{
+    DistributeSkewRule * skewRule = new DistributeSkewRule();
+    rules.append(*skewRule);
+}

+ 42 - 0
common/wuanalysis/anarule.hpp

@@ -0,0 +1,42 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef ANARULE_HPP
+#define ANARULE_HPP
+
+#include "anacommon.hpp"
+#include "anawu.hpp"
+#include "jstatcodes.h"
+#include "wuattr.hpp"
+
+// Thresholds controlling which activities are analysed and which issues are reported.
+struct WuAnalyseOptions
+{
+    stat_type minInterestingTime = msecs2StatUnits(10);// ignore anything under 10 milliseconds
+    stat_type minCost = seconds2StatUnits(1);          // only interested in costs of at least 1s
+    stat_type skewThreshold = statSkewPercent(20);     // minimum interesting skew measurement
+};
+
+// Abstract base class for a rule that inspects one activity for a performance issue.
+class AActivityRule : public CInterface
+{
+public:
+    // Returns true if this rule is applicable to the given activity.
+    virtual bool isCandidate(IWuActivity & activity) const = 0;
+    // Examine the activity; returns true if an issue was found, in which case
+    // results has been filled in with its cost and description.
+    virtual bool check(PerformanceIssue & results, IWuActivity & activity, const WuAnalyseOptions & options) = 0;
+};
+
+void gatherRules(CIArrayOf<AActivityRule> & rules);
+
+#endif

+ 391 - 0
common/wuanalysis/anawu.cpp

@@ -0,0 +1,391 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "jliball.hpp"
+
+#include "workunit.hpp"
+#include "anacommon.hpp"
+#include "anarule.hpp"
+#include "anawu.hpp"
+
+class WuScope;
+class WorkunitAnalyser;
+
+// Hash table of WuScope objects, looked up by scope name (a const char *).
+class WuScopeHashTable : public SuperHashTableOf<WuScope, const char>
+{
+public:
+    // Empties the table on destruction (the definition of onRemove releases each element).
+    ~WuScopeHashTable() { _releaseAll(); }
+
+    void addNew(WuScope * newWuScope) { SuperHashTableOf::addNew(newWuScope);}
+    // Callbacks used by the hash table implementation.
+    virtual void     onAdd(void *et) {};        // nothing to do when an element is added
+    virtual void     onRemove(void *et);
+    virtual unsigned getHashFromElement(const void *et) const;
+    virtual unsigned getHashFromFindParam(const void *fp) const;
+    virtual const void * getFindParam(const void *et) const;
+    virtual bool matchesFindParam(const void *et, const void *key, unsigned fphash) const;
+    virtual bool matchesElement(const void *et, const void *searchET) const;
+};
+
+// Hash of a null-terminated scope name (seed 0).
+inline unsigned hashScope(const char * name)
+{
+    const byte * bytes = (const byte *)name;
+    return hashc(bytes, strlen(name), 0);
+}
+
+// One node in the tree of workunit scopes.  The same class is used for every
+// scope type, so it implements both the edge and the activity interfaces; the
+// scope's statistics and attributes are held as properties of attrs.
+class WuScope : public CInterface, implements IWuEdge, implements IWuActivity
+{
+public:
+    WuScope(const char * _name, WuScope * _parent) : name(_name), parent(_parent)
+    {
+        attrs.setown(createPTree());
+    }
+
+    // Run the analyser over every activity scope below this scope (recursively).
+    void applyRules(WorkunitAnalyser & analyser);
+    // Link activities to their input/output edges for all scopes below this one.
+    void connectActivities();
+
+    WuScope * select(const char * scope);  // Returns matching wuScope (create if no pre-existing)
+    void setInput(unsigned i, WuScope * scope);  // Record scope as the i'th entry of inputs
+    void setOutput(unsigned i, WuScope * scope); // Record scope as the i'th entry of outputs
+
+    // Accessors that read the "@<name>" property corresponding to a statistic/attribute.
+    virtual stat_type getStatRaw(StatisticKind kind, StatisticKind variant = StKindNone) const;
+    virtual unsigned getAttr(WuAttr kind) const;
+    virtual void getAttr(StringBuffer & result, WuAttr kind) const;
+
+    inline const char * queryName() const { return name; }
+    inline IPropertyTree * queryAttrs() const { return attrs; }
+    virtual WuScope * querySource() override;
+    virtual WuScope * queryTarget() override;
+    // Return the idx'th input/output, or nullptr if idx is out of range (or the slot was never set).
+    virtual IWuEdge * queryInput(unsigned idx)  { return (idx < inputs.size()) ? inputs[idx]:nullptr; }
+    virtual IWuEdge * queryOutput(unsigned idx) { return (idx < outputs.size()) ? outputs[idx]:nullptr; }
+    // The scope type is stored in the "@stype" property.
+    StatisticScopeType queryScopeType() const   { return (StatisticScopeType)attrs->getPropInt("@stype");}
+    const char * queryScopeTypeName() const     { return ::queryScopeTypeName(queryScopeType()); }
+
+    // Debug aid: print this scope and its children to stdout.
+    void trace(unsigned indent=0) const;
+protected:
+    StringAttr name;
+    WuScope * parent = nullptr;
+    Owned<IPropertyTree> attrs;
+    WuScopeHashTable scopes;            // child scopes, keyed by name
+    std::vector<WuScope *> inputs;      // not linked; may contain nullptr for unset slots
+    std::vector<WuScope *> outputs;     // not linked; may contain nullptr for unset slots
+};
+
+//-----------------------------------------------------------------------------------------------------------
+// Builds a tree of scopes from a workunit's statistics and attributes, applies
+// the analysis rules to each activity and prints the issues found.
+class WorkunitAnalyser
+{
+public:
+    // Keeps a reference to _options, so it must outlive the analyser.
+    WorkunitAnalyser(WuAnalyseOptions & _options);
+
+    // Apply all rules to one activity and record the highest cost issue (if any).
+    void check(const char * scope, IWuActivity & activity);
+    // Analyse a workunit and print the resulting issues.
+    void analyse(IConstWorkUnit * wu);
+
+protected:
+    void collateWorkunitStats(IConstWorkUnit * workunit, const WuScopeFilter & filter);
+    void printWarnings();
+    // Resolve a full ':' separated scope name to its WuScope, creating scopes as required.
+    WuScope * selectFullScope(const char * scope);
+
+protected:
+    CIArrayOf<AActivityRule> rules;
+    CIArrayOf<PerformanceIssue> issues;
+    WuScope root;
+    WuAnalyseOptions & options;
+};
+
+//-----------------------------------------------------------------------------------------------------------
+// An element is leaving the table: drop the reference the table was holding.
+void WuScopeHashTable::onRemove(void *et)
+{
+    static_cast<WuScope *>(et)->Release();
+}
+// Elements are hashed on their scope name.
+unsigned WuScopeHashTable::getHashFromElement(const void *et) const
+{
+    const WuScope & entry = *static_cast<const WuScope *>(et);
+    return hashScope(entry.queryName());
+}
+
+// The find parameter is the scope name itself, so hash it directly.
+unsigned WuScopeHashTable::getHashFromFindParam(const void *fp) const
+{
+    return hashScope(static_cast<const char *>(fp));
+}
+
+// The key used to look up an element is its scope name.
+const void * WuScopeHashTable::getFindParam(const void *et) const
+{
+    const WuScope & entry = *static_cast<const WuScope *>(et);
+    return entry.queryName();
+}
+
+// An element matches a search key when its name is identical to the key string.
+// The precomputed hash (fphash) is not needed for the comparison.
+bool WuScopeHashTable::matchesFindParam(const void *et, const void *key, unsigned fphash) const
+{
+    const char * entryName = static_cast<const WuScope *>(et)->queryName();
+    const char * wanted = static_cast<const char *>(key);
+    return streq(entryName, wanted);
+}
+
+// Two elements are considered equal when their scope names are identical.
+bool WuScopeHashTable::matchesElement(const void *et, const void *searchET) const
+{
+    const char * entryName = static_cast<const WuScope *>(et)->queryName();
+    const char * searchName = static_cast<const WuScope *>(searchET)->queryName();
+    return streq(entryName, searchName);
+}
+
+//-----------------------------------------------------------------------------------------------------------
+// Depth-first walk of the child scopes: each activity scope is handed to the
+// analyser, then its own children are visited.
+void WuScope::applyRules(WorkunitAnalyser & analyser)
+{
+    for (auto & child : scopes)
+    {
+        const bool isActivity = (child.queryScopeType() == SSTactivity);
+        if (isActivity)
+            analyser.check(child.queryName(), child);
+        child.applyRules(analyser);
+    }
+}
+
+// For every edge scope below this scope, register the edge as an output of its
+// source activity and as an input of its target activity, then recurse.
+void WuScope::connectActivities()
+{
+    //Yuk - scopes can be added to while they are being iterated, so need to create a list first
+    //(querySource()/queryTarget() go through select(), which creates missing scopes)
+    CICopyArrayOf<WuScope> toWalk;
+    for (auto & cur : scopes)
+        toWalk.append(cur);
+
+    ForEachItemIn(i, toWalk)
+    {
+        WuScope & cur = toWalk.item(i);
+
+        if (cur.queryScopeType() == SSTedge)
+        {
+            WuScope * source = cur.querySource();
+            WuScope * sink = cur.queryTarget();
+            // The edge's source/target index attributes select the slot on the activity.
+            if (source)
+                source->setOutput(cur.getAttr(WaSourceIndex), &cur);
+            if (sink)
+                sink->setInput(cur.getAttr(WaTargetIndex), &cur);
+        }
+        cur.connectActivities();
+    }
+}
+
+// Find the child scope with the given name, creating (and registering) it with
+// this scope as parent if it does not exist yet.
+WuScope * WuScope::select(const char * scope)
+{
+    assertex(scope);
+    WuScope * child = scopes.find(scope);
+    if (!child)
+    {
+        child = new WuScope(scope, this);
+        scopes.addNew(child);
+    }
+    return child;
+}
+
+// Store scope in slot i of inputs, growing the vector with nullptr entries if
+// the slot does not exist yet.
+void WuScope::setInput(unsigned i, WuScope * scope)
+{
+    if (inputs.size() <= i)
+        inputs.resize(static_cast<size_t>(i) + 1, nullptr);
+    inputs[i] = scope;
+}
+
+// Store scope in slot i of outputs, growing the vector with nullptr entries if
+// the slot does not exist yet.
+void WuScope::setOutput(unsigned i, WuScope * scope)
+{
+    if (outputs.size() <= i)
+        outputs.resize(static_cast<size_t>(i) + 1, nullptr);
+    outputs[i] = scope;
+}
+
+// Resolve the scope named by the @IdSource property as a sibling of this scope
+// (it is created if missing).  Returns nullptr when the property is not set.
+WuScope * WuScope::querySource()
+{
+    const char * sourceId = attrs->queryProp("@IdSource");
+    return sourceId ? parent->select(sourceId) : nullptr;
+}
+
+// Resolve the scope named by the @IdTarget property as a sibling of this scope
+// (it is created if missing).  Returns nullptr when the property is not set.
+WuScope * WuScope::queryTarget()
+{
+    const char * targetId = attrs->queryProp("@IdTarget");
+    return targetId ? parent->select(targetId) : nullptr;
+}
+
+// Read a statistic from the "@<statistic name>" property, where the name is
+// that of the kind combined with the requested variant.
+stat_type WuScope::getStatRaw(StatisticKind kind, StatisticKind variant) const
+{
+    const StatisticKind fullKind = kind | variant;
+    StringBuffer propName;
+    propName.append('@');
+    propName.append(queryStatisticName(fullKind));
+    return attrs->getPropInt64(propName);
+}
+
+// Read an attribute as an integer from the "@<attribute name>" property.
+// The 64-bit property value is narrowed to unsigned on return.
+unsigned WuScope::getAttr(WuAttr attr) const
+{
+    StringBuffer propName;
+    propName.append('@');
+    propName.append(queryWuAttributeName(attr));
+    return attrs->getPropInt64(propName);
+}
+
+// Fetch an attribute's text from the "@<attribute name>" property into result.
+void WuScope::getAttr(StringBuffer & result, WuAttr attr) const
+{
+    StringBuffer propName;
+    propName.append('@');
+    propName.append(queryWuAttributeName(attr));
+    attrs->getProp(result, propName);
+}
+
+// Debug aid: print this scope (type, name, inputs -> outputs, child names and
+// properties) to stdout, then recurse into the children with extra indent.
+void WuScope::trace(unsigned indent) const
+{
+    printf("%*s%s: \"%s\" (", indent, " ", queryScopeTypeName(), queryName());
+    // setInput()/setOutput() pad unused slots with nullptr, so guard each entry
+    // instead of dereferencing it blindly.
+    for (auto in : inputs)
+        printf("%s ", in ? in->queryName() : "<none>");
+    printf("->");
+    for (auto out : outputs)
+        printf(" %s",  out ? out->queryName() : "<none>");
+    printf(") [");
+    printf("children: ");
+    for(auto & scope: scopes)
+    {
+        printf("%s ", scope.queryName());
+    }
+    printf("] {\n");
+
+    printXML(queryAttrs(), indent);
+    for (auto & cur : scopes)
+    {
+        cur.trace(indent+4);
+    }
+    printf("%*s}\n",indent, " ");
+}
+
+
+//-----------------------------------------------------------------------------------------------------------
+/* Scope visitor that copies every statistic and attribute it is given into a
+   property tree, each stored as an "@<name>" property.  Hints are not expected. */
+class StatsGatherer : public IWuScopeVisitor
+{
+public:
+    StatsGatherer(IPropertyTree * _scope) : scope(_scope){}
+
+    virtual void noteStatistic(StatisticKind kind, unsigned __int64 value, IConstWUStatistic & cur) override
+    {
+        StringBuffer propName;
+        propName.append('@');
+        propName.append(queryStatisticName(kind));
+        scope->setPropInt64(propName, value);
+    }
+    virtual void noteAttribute(WuAttr attr, const char * value)
+    {
+        StringBuffer propName;
+        propName.append('@');
+        propName.append(queryWuAttributeName(attr));
+        scope->setProp(propName, value);
+    }
+    virtual void noteHint(const char * kind, const char * value)
+    {
+        // Not expected to be called by the analyser.
+        throwUnexpected();
+    }
+    IPropertyTree * scope;
+};
+
+
+//-----------------------------------------------------------------------------------------------------------
+// Creates an analyser with an unnamed, parentless root scope and the full set
+// of rules; options is held by reference.
+WorkunitAnalyser::WorkunitAnalyser(WuAnalyseOptions & _options)
+    : root("", nullptr),
+      options(_options)
+{
+    gatherRules(rules);
+}
+
+// Run every applicable rule against one activity and remember only the issue
+// with the highest cost (provided it reaches options.minCost).
+void WorkunitAnalyser::check(const char * scope, IWuActivity & activity)
+{
+    // Activities whose maximum local execute time is too small are skipped.
+    const stat_type maxLocalExecute = activity.getStatRaw(StTimeLocalExecute, StMaxX);
+    if (maxLocalExecute < options.minInterestingTime)
+        return;
+
+    Owned<PerformanceIssue> worst;
+    ForEachItemIn(idx, rules)
+    {
+        AActivityRule & rule = rules.item(idx);
+        if (!rule.isCandidate(activity))
+            continue;
+
+        Owned<PerformanceIssue> candidate(new PerformanceIssue);
+        if (!rule.check(*candidate, activity, options))
+            continue;
+        if (candidate->getCost() < options.minCost)
+            continue;
+
+        if (!worst || worst->getCost() < candidate->getCost())
+            worst.setown(candidate.getClear());
+    }
+
+    if (!worst)
+        return;
+    worst->setScope(scope);
+    issues.append(*worst.getClear());
+}
+
+// Full analysis of one workunit: gather statistics and attributes into the
+// scope tree, link activities to edges, apply the rules and print the result.
+void WorkunitAnalyser::analyse(IConstWorkUnit * wu)
+{
+    WuScopeFilter scopeFilter;
+    scopeFilter.addOutputProperties(PTstatistics);
+    scopeFilter.addOutputProperties(PTattributes);
+    scopeFilter.finishedFilter();
+
+    collateWorkunitStats(wu, scopeFilter);
+    root.connectActivities();
+    // root.trace();
+    root.applyRules(*this);
+    printWarnings();
+}
+
+// Walk all scopes selected by filter and copy their scope type, statistics and
+// attributes into the matching WuScope.  An exception raised while processing
+// one scope is discarded and the walk continues with the next scope.
+void WorkunitAnalyser::collateWorkunitStats(IConstWorkUnit * workunit, const WuScopeFilter & filter)
+{
+    Owned<IConstWUScopeIterator> scopeIter = &workunit->getScopeIterator(filter);
+    ForEach(*scopeIter)
+    {
+        try
+        {
+            WuScope * target = selectFullScope(scopeIter->queryScope());
+            IPropertyTree * targetAttrs = target->queryAttrs();
+
+            StatsGatherer gatherer(targetAttrs);
+            targetAttrs->setPropInt("@stype", scopeIter->getScopeType());
+            scopeIter->playProperties(gatherer);
+        }
+        catch (IException * e)
+        {
+            e->Release();
+        }
+    }
+}
+
+// Print all recorded issues, ordered by compareIssuesCostOrder.
+void WorkunitAnalyser::printWarnings()
+{
+    issues.sort(compareIssuesCostOrder);
+
+    ForEachItemIn(idx, issues)
+    {
+        const PerformanceIssue & issue = issues.item(idx);
+        issue.print();
+    }
+}
+
+// Resolve a full scope name made of ':' separated components, descending from
+// the root one component at a time (scopes are created as required).
+// An empty name, or a name ending in ':', yields the scope resolved so far.
+WuScope * WorkunitAnalyser::selectFullScope(const char * scope)
+{
+    WuScope * resolved = &root;
+    StringBuffer component;
+    while (*scope)
+    {
+        const char * separator = strchr(scope, ':');
+        if (!separator)
+            return resolved->select(scope);     // last component
+
+        component.clear().append(separator-scope, scope);
+        resolved = resolved->select(component.str());
+        scope = separator+1;
+    }
+    return resolved;
+}
+
+//---------------------------------------------------------------------------------------------------------------------
+
+// Library entry point: analyse a workunit using the default options, except
+// that the skew threshold is set to 10%.
+void analyseWorkunit(IConstWorkUnit * wu)
+{
+    WuAnalyseOptions analyseOptions;
+    analyseOptions.skewThreshold = statSkewPercent(10);
+
+    WorkunitAnalyser wuAnalyser(analyseOptions);
+    wuAnalyser.analyse(wu);
+}

+ 57 - 0
common/wuanalysis/anawu.hpp

@@ -0,0 +1,57 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef ANAWU_HPP
+#define ANAWU_HPP
+
+#include "anacommon.hpp"
+#include "workunit.hpp"
+#include "eclhelper.hpp"
+
+// Read access to the statistics and attributes of a workunit scope.
+interface IWuScope
+{
+    // Raw value of a statistic; variant selects a variation of the kind (e.g. StAvgX, StMaxX).
+    virtual stat_type getStatRaw(StatisticKind kind, StatisticKind variant = StKindNone) const = 0;
+    // Attribute value as an integer.
+    virtual unsigned getAttr(WuAttr kind) const = 0;
+    // Attribute value as text, returned in result.
+    virtual void getAttr(StringBuffer & result, WuAttr kind) const = 0;
+};
+
+interface IWuActivity;
+// An edge in the workunit graph, linking a source activity to a target activity.
+interface IWuEdge : public IWuScope
+{
+    virtual IWuActivity * querySource() = 0;    // activity the edge comes from
+    virtual IWuActivity * queryTarget() = 0;    // activity the edge leads to
+};
+
+// An activity in the workunit graph, with indexed input and output edges.
+interface IWuActivity : public IWuScope
+{
+    virtual const char * queryName() const = 0;
+    virtual IWuEdge * queryInput(unsigned idx) = 0;
+    virtual IWuEdge * queryOutput(unsigned idx) = 0;
+    // Activity at the far end of the idx'th input edge, or nullptr if there is no such edge.
+    inline IWuActivity * queryInputActivity(unsigned idx)
+    {
+        IWuEdge * edge = queryInput(idx);
+        return edge ? edge->querySource() : nullptr;
+    }
+    // The WaKind attribute interpreted as a ThorActivityKind.
+    inline ThorActivityKind queryThorActivityKind()
+    {
+        return (ThorActivityKind) getAttr(WaKind);
+    }
+};
+
+void WUANALYSIS_API analyseWorkunit(IConstWorkUnit * wu);
+
+#endif

+ 0 - 1
system/jlib/jstatcodes.h

@@ -250,5 +250,4 @@ enum StatisticKind
     //NOTE: Do not use 0x80000000 since wu attributes use those values, and they should not overlap
 };
 constexpr StatisticKind operator |(StatisticKind l, StatisticKind r) { return (StatisticKind)((unsigned)l | (unsigned)r); }
-
 #endif

+ 9 - 0
system/jlib/jstats.h

@@ -30,6 +30,15 @@ typedef unsigned __int64 stat_type;
 const unsigned __int64 MaxStatisticValue = (unsigned __int64)0-1U;
 const unsigned __int64 AnyStatisticValue = MaxStatisticValue; // Use the maximum value to also represent unknown, since it is unlikely to ever occur.
 
+// Conversions between seconds/milliseconds and statistic time units.
+// One statistic unit is a nanosecond; the reverse conversions truncate.
+inline constexpr stat_type seconds2StatUnits(stat_type secs)
+{
+    return secs * 1000000000;
+}
+inline constexpr stat_type msecs2StatUnits(stat_type ms)
+{
+    return ms * 1000000;
+}
+inline constexpr stat_type statUnits2seconds(stat_type stat)
+{
+    return stat / 1000000000;
+}
+inline constexpr stat_type statUnits2msecs(stat_type stat)
+{
+    return stat / 1000000;
+}
+
+// Convert a percentage into the units used by skew statistics, where a value
+// of 1 represents 0.01% skew (so 1% == 100).
+inline constexpr stat_type statSkewPercent(int value)
+{
+    return static_cast<stat_type>(value) * 100;
+}
+inline constexpr stat_type statSkewPercent(long double value)
+{
+    return static_cast<stat_type>(value * 100);
+}
+inline constexpr stat_type statSkewPercent(stat_type  value)
+{
+    return static_cast<stat_type>(value * 100);
+}
+
 inline StatisticKind queryStatsVariant(StatisticKind kind) { return (StatisticKind)(kind & ~StKindMask); }
 
 //---------------------------------------------------------------------------------------------------------------------

+ 34 - 0
testing/wuanalysis/anahashdistrib1.ecl

@@ -0,0 +1,34 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and 
+    limitations under the License.
+############################################################################## */
+
+// Analysis should show: DISTRIBUTE output skew is worse than input skew
+//
+// NOTE: For faster nodes, it may be necessary to increase the size of testfile
+
+// Shared record layout and logical file names.
+IMPORT * From common;
+
+// Input: the first test file.
+visits := DATASET(testfile1, layout_visits, THOR);
+
+// Result layout: one row per url with the number of rows in that group.
+layout_visitCounts := RECORD
+  visits.url;
+  visits_cnt := COUNT(GROUP);
+END;
+
+// Redistribute on a hash of the first three characters of url, then count
+// rows per url locally.
+visitcounts := TABLE(DISTRIBUTE(visits,HASH32(url[1..3])),
+                     layout_visitCounts,url,LOCAL);
+OUTPUT(visitcounts);
+

+ 34 - 0
testing/wuanalysis/anahashdistrib2.ecl

@@ -0,0 +1,34 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and 
+    limitations under the License.
+############################################################################## */
+
+// Analysis should show: Significant skew in DISTRIBUTE output
+//
+// NOTE: For faster nodes, it may be necessary to increase the size of testfile
+
+// Shared record layout and logical file names.
+IMPORT * From common;
+
+// Input: the second test file.
+visits := DATASET(testfile2, layout_visits, THOR);
+
+// Result layout: one row per url with the number of rows in that group.
+layout_visitCounts := RECORD
+  visits.url;
+  visits_cnt := COUNT(GROUP);
+END;
+
+// Redistribute on a hash of timestamp%2 (the hash input takes few distinct
+// values), then count rows per url locally.
+visitcounts := TABLE(DISTRIBUTE(visits,HASH32(timestamp%2)),
+                     layout_visitCounts,url,LOCAL);
+OUTPUT(visitcounts);
+

+ 27 - 0
testing/wuanalysis/common.ecl

@@ -0,0 +1,27 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and 
+    limitations under the License.
+############################################################################## */
+
+// Definitions shared by the wuanalysis test queries.
+Export common := Module
+    // Layout of one visit record.
+    export layout_visits := RECORD
+        STRING20 User;
+        STRING30 url;
+        INTEGER8 timestamp;
+    END;
+    // Logical names of the two generated test files.
+    export testfile1 := 'regress::wuanalysis::largedata1';
+    export testfile2 := 'regress::wuanalysis::largedata2';
+End;
+

+ 52 - 0
testing/wuanalysis/setup.ecl

@@ -0,0 +1,52 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and 
+    limitations under the License.
+############################################################################## */
+
+// Shared record layout and logical file names.
+IMPORT * from common;
+
+// Generate Data
+layout_person := RECORD
+   STRING20 user;
+END;
+
+layout_sites := RECORD
+   STRING30 url;
+END;
+
+// Inline seed data: 15 users and 5 site rows ('www.yahoo.com' appears twice).
+persons := DATASET([{'Ned'},{'Robert'}, {'Jaime'}, {'Catelyn'}, {'Cersei'}, {'Daenerys'}, {'Jon'},
+                    {'Sansa'}, {'Arya'}, {'Robb'}, {'Theon'}, {'Bran'}, {'Joffrey'}, {'Hound'}, {'Tyrion'}], layout_person );
+                   
+sites := DATASET([{'www.yahoo.com'}, {'www.amazon.com'}, {'www.cnn.com'}, {'www.yahoo.com'}, {'www.bbc.co.uk'}], layout_sites);
+
+// Combine one person with one site; the timestamp is filled in later.
+layout_visits f(layout_person l, layout_sites r) := TRANSFORM
+  SELF.user := l.user;
+  SELF.url := r.url;
+  SELF.timestamp := 0;
+END;
+
+// Every person paired with every site row.
+v := JOIN(persons, sites, true, f(LEFT,RIGHT), ALL);
+
+// Copy the row and give it a random timestamp.
+layout_visits addTime(layout_visits l, INTEGER c) := TRANSFORM
+  SELF.timestamp := RANDOM();
+  SELF := L;
+END;
+
+// Expand each row 60000 times; visits1 is distributed on timestamp and
+// visits2 on url.
+visits1 := DISTRIBUTE(NORMALIZE(v, 60000, addTime(LEFT, COUNTER)), HASH32(timestamp));
+visits2 := DISTRIBUTE(NORMALIZE(v, 60000, addTime(LEFT, COUNTER)), HASH32(url));
+
+// Write the two test files, replacing any existing copies.
+OUTPUT(visits1, , testfile1, OVERWRITE);
+OUTPUT(visits2, , testfile2, OVERWRITE);
+

+ 5 - 1
tools/wutool/CMakeLists.txt

@@ -36,6 +36,9 @@ include_directories (
          ./../../system/jlib 
          ./../../common/environment 
          ./../../common/workunit 
+         ./../../common/wuanalysis
+         ./../../common/thorhelper
+         ./../../rtl/include
          ./../../testing/unittests
          ./../../system/security/shared
     )
@@ -60,7 +63,8 @@ target_link_libraries ( wutool
          nbcd 
          eclrtl 
          deftype
-         workunit 
+         workunit
+         wuanalysis
          ${CPPUNIT_LIBRARIES}
     )
     

+ 7 - 1
tools/wutool/wutool.cpp

@@ -30,6 +30,7 @@
 #include "dautils.hpp"
 #include "danqs.hpp"
 #include "dalienv.hpp"
+#include "anawu.hpp"
 
 #ifdef _USE_CPPUNIT
 #include "workunitservices.hpp"
@@ -87,6 +88,7 @@ void usage(const char * action = nullptr)
                "   results <workunits> - Dump results from specified workunits\n"
                "   info <workunits> <filter>\n"
                "                       - Display information from a workunit\n"
+               "   analyse <workunit>  - Analyse the workunit to highlight performance issues\n"
                "\n"
                "   archive <workunits> - Archive to xml files [TO=<directory>] [DEL=1] [DELETERESULTS=1] [INCLUDEFILES=1]\n"
                "   restore <filenames> - Restore from xml files [INCLUDEFILES=1]\n"
@@ -195,6 +197,10 @@ static void process(IConstWorkUnit &w, IProperties *globals, const StringArray &
             printf("%s\n", schema.str());
         }
     }
+    else if (stricmp(action, "analyse")==0)
+    {
+        analyseWorkunit(&w);
+    }
     else if (stricmp(action, "dump")==0)
     {
         StringBuffer xml;
@@ -572,7 +578,7 @@ int main(int argc, const char *argv[])
         {
             usage();
         }
-        else if (strieq(action, "list") || strieq(action, "dump") || strieq(action, "results") || strieq(action, "delete") || strieq(action, "archive") || strieq(action, "info"))
+        else if (strieq(action, "list") || strieq(action, "dump") || strieq(action, "results") || strieq(action, "delete") || strieq(action, "archive") || strieq(action, "info") || strieq(action, "analyse"))
         {
             if (strieq(action, "info") && args.empty())
                 args.append("source[all],properties[all]");