浏览代码

HPCC-15871 Add a mode for standalone Roxie queries to use a local workunit

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 年之前
父节点
当前提交
ad2150237a
共有 5 个文件被更改,包括 90 次插入32 次删除
  1. 10 2
      common/workunit/workunit.cpp
  2. 37 7
      roxie/ccd/ccdlistener.cpp
  3. 38 20
      roxie/ccd/ccdmain.cpp
  4. 4 2
      roxie/ccd/ccdquery.cpp
  5. 1 1
      roxie/ccd/ccdquery.hpp

+ 10 - 2
common/workunit/workunit.cpp

@@ -6767,7 +6767,15 @@ void CLocalWorkUnit::createGraph(const char * name, const char *label, WUGraphTy
 
 IConstWUGraphProgress *CLocalWorkUnit::getGraphProgress(const char *name) const
 {
-    throwUnexpected();   // Should only be used for persisted workunits
+/*    Owned<IRemoteConnection> conn = getProgressConnection();
+    if (conn)
+    {
+        IPTree *progress = conn->queryRoot()->queryPropTree(graphName);
+        if (progress)
+            return new CConstGraphProgress(p->queryName(), graphName, progress);
+    }
+    */
+    return NULL;
 }
 WUGraphState CLocalWorkUnit::queryGraphState(const char *graphName) const
 {
@@ -6787,7 +6795,7 @@ void CLocalWorkUnit::setNodeState(const char *graphName, WUGraphIDType nodeId, W
 }
 IWUGraphStats *CLocalWorkUnit::updateStats(const char *graphName, StatisticCreatorType creatorType, const char * creator, unsigned subgraph) const
 {
-    throwUnexpected();   // Should only be used for persisted workunits
+    return new CWuGraphStats(LINK(p), creatorType, creator, graphName, subgraph);
 }
 
 void CLocalWUGraph::setName(const char *str)

+ 37 - 7
roxie/ccd/ccdlistener.cpp

@@ -21,6 +21,7 @@
 #include "jregexp.hpp"
 
 #include "wujobq.hpp"
+#include "thorplugin.hpp"
 
 #include "ccd.hpp"
 #include "ccdcontext.hpp"
@@ -960,7 +961,9 @@ public:
 
     virtual void runOnce(const char *query)
     {
-        UNIMPLEMENTED;
+        Owned<IPooledThread> worker = createNew();
+        worker->init((void *) query);
+        worker->main();
     }
 
     virtual void noteQuery(IHpccProtocolMsgContext *msgctx, const char *peer, bool failed, unsigned bytesOut, unsigned elapsed, unsigned memused, unsigned slavesReplyLen, bool continuationNeeded)
@@ -1142,22 +1145,42 @@ public:
 
     virtual void main()
     {
-        Owned <IRoxieDaliHelper> daliHelper = connectToDali();
-        Owned<IConstWorkUnit> wu = daliHelper->attachWorkunit(wuid.get(), NULL);
+        assertex(wuid.length());
+        bool standalone = *wuid.str()=='-';
+        Owned<IRoxieDaliHelper> daliHelper;
+        Owned<IConstWorkUnit> wu;
+        Owned<const IQueryDll> dll;
+        if (standalone)
+        {
+            Owned<ILoadedDllEntry> standAloneDll = createExeDllEntry(wuid.get()+1);
+            StringBuffer wuXML;
+            if (getEmbeddedWorkUnitXML(standAloneDll, wuXML))
+            {
+                wu.setown(createLocalWorkUnit(wuXML));
+                dll.setown(createExeQueryDll(wuid.get()+1));
+            }
+        }
+        else
+        {
+            daliHelper.setown(connectToDali());
+            wu.setown(daliHelper->attachWorkunit(wuid.get(), NULL));
+        }
         Owned<StringContextLogger> logctx = new StringContextLogger(wuid.get());
         Owned<IQueryFactory> queryFactory;
         try
         {
             checkWorkunitVersionConsistency(wu);
-            daliHelper->noteWorkunitRunning(wuid.get(), true);
+            if (daliHelper)
+                daliHelper->noteWorkunitRunning(wuid.get(), true);
             if (!wu)
                 throw MakeStringException(ROXIE_DALI_ERROR, "Failed to open workunit %s", wuid.get());
-            queryFactory.setown(createServerQueryFactoryFromWu(wu));
+            queryFactory.setown(createServerQueryFactoryFromWu(wu, dll));
         }
         catch (IException *E)
         {
             reportException(wu, E, *logctx);
-            daliHelper->noteWorkunitRunning(wuid.get(), false);
+            if (daliHelper)
+                daliHelper->noteWorkunitRunning(wuid.get(), false);
             throw;
         }
 #ifndef _DEBUG
@@ -1171,7 +1194,14 @@ public:
         doMain(wu, queryFactory, *logctx);
         sendUnloadMessage(queryFactory->queryHash(), wuid.get(), *logctx);
         queryFactory.clear();
-        daliHelper->noteWorkunitRunning(wuid.get(), false);
+        if (daliHelper)
+            daliHelper->noteWorkunitRunning(wuid.get(), false);
+        if (standalone && traceLevel)
+        {
+            StringBuffer wuXML;
+            exportWorkUnitToXML(wu, wuXML, true, true, true);
+            DBGLOG("%s", wuXML.str());
+        }
         clearKeyStoreCache(false);   // Bit of a kludge - cache should really be smarter
     }
 

+ 38 - 20
roxie/ccd/ccdmain.cpp

@@ -1028,31 +1028,49 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         Owned<IHpccProtocolPluginContext> protocolCtx = new CHpccProtocolPluginCtx();
         if (runOnce)
         {
-            Owned<IHpccProtocolPlugin> protocolPlugin = loadHpccProtocolPlugin(protocolCtx, NULL);
-            Owned<IHpccProtocolListener> roxieServer = protocolPlugin->createListener("runOnce", createRoxieProtocolMsgSink(getNodeAddress(myNodeIndex), 0, 1, false), 0, 0, NULL);
-            try
+            if (globals->hasProp("-wu"))
             {
-                const char *format = globals->queryProp("format");
-                if (!format)
+                Owned<IHpccProtocolListener> roxieServer = createRoxieWorkUnitListener(1, false);
+                try
                 {
-                    if (globals->hasProp("-xml"))
-                        format = "xml";
-                    else if (globals->hasProp("-csv"))
-                        format = "csv";
-                    else if (globals->hasProp("-raw"))
-                        format = "raw";
-                    else
-                        format = "ascii";
+                    VStringBuffer x("-%s", argv[0]);
+                    roxieServer->runOnce(x);
+                    fflush(stdout);  // in windows if output is redirected results don't appear without flushing
+                }
+                catch (IException *E)
+                {
+                    EXCLOG(E);
+                    E->Release();
                 }
-                StringBuffer query;
-                query.appendf("<roxie format='%s'/>", format);
-                roxieServer->runOnce(query.str()); // MORE - should use the wu listener instead I suspect
-                fflush(stdout);  // in windows if output is redirected results don't appear without flushing
             }
-            catch (IException *E)
+            else
             {
-                EXCLOG(E);
-                E->Release();
+                Owned<IHpccProtocolPlugin> protocolPlugin = loadHpccProtocolPlugin(protocolCtx, NULL);
+                Owned<IHpccProtocolListener> roxieServer = protocolPlugin->createListener("runOnce", createRoxieProtocolMsgSink(getNodeAddress(myNodeIndex), 0, 1, false), 0, 0, NULL);
+                try
+                {
+                    const char *format = globals->queryProp("format");
+                    if (!format)
+                    {
+                        if (globals->hasProp("-xml"))
+                            format = "xml";
+                        else if (globals->hasProp("-csv"))
+                            format = "csv";
+                        else if (globals->hasProp("-raw"))
+                            format = "raw";
+                        else
+                            format = "ascii";
+                    }
+                    StringBuffer query;
+                    query.appendf("<roxie format='%s'/>", format);
+                    roxieServer->runOnce(query.str()); // MORE - should use the wu listener instead I suspect
+                    fflush(stdout);  // in windows if output is redirected results don't appear without flushing
+                }
+                catch (IException *E)
+                {
+                    EXCLOG(E);
+                    E->Release();
+                }
             }
         }
         else

+ 4 - 2
roxie/ccd/ccdquery.cpp

@@ -1679,9 +1679,11 @@ extern IQueryFactory *createServerQueryFactory(const char *id, const IQueryDll *
         return new CRoxieServerQueryFactory(id, NULL, dynamic_cast<const IRoxiePackage&>(package), hashValue, NULL, isDynamic);
 }
 
-extern IQueryFactory *createServerQueryFactoryFromWu(IConstWorkUnit *wu)
+extern IQueryFactory *createServerQueryFactoryFromWu(IConstWorkUnit *wu, const IQueryDll *_dll)
 {
-    Owned<const IQueryDll> dll = createWuQueryDll(wu);
+    Linked<const IQueryDll> dll = _dll;
+    if (!dll)
+        dll.setown(createWuQueryDll(wu));
     if (!dll)
         return NULL;
     return createServerQueryFactory(wu->queryWuid(), dll.getClear(), queryRootRoxiePackage(), NULL, true, false); // MORE - if use a constant for id might cache better?

+ 1 - 1
roxie/ccd/ccdquery.hpp

@@ -304,7 +304,7 @@ extern IRecordLayoutTranslator *createRecordLayoutTranslator(const char *logical
 extern IQueryFactory *createServerQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry);
 extern IQueryFactory *createSlaveQueryFactory(const char *id, const IQueryDll *dll, const IRoxiePackage &package, unsigned _channelNo, const IPropertyTree *stateInfo, bool isDynamic, bool forceRetry);
 extern IQueryFactory *getQueryFactory(hash64_t hashvalue, unsigned channel);
-extern IQueryFactory *createServerQueryFactoryFromWu(IConstWorkUnit *wu);
+extern IQueryFactory *createServerQueryFactoryFromWu(IConstWorkUnit *wu, const IQueryDll *_dll);
 extern IQueryFactory *createSlaveQueryFactoryFromWu(IConstWorkUnit *wu, unsigned channelNo);
 extern unsigned checkWorkunitVersionConsistency(const IConstWorkUnit *wu );