瀏覽代碼

HPCC-14331 Means to import and export workunits from Dali and Cassandra

The existing wutest tool has been cleaned up, its commands rationalized,
and I have renamed it to wutool.

I think it should do all that is needed to get workunits out of dali to
XML and from XML to Cassandra (or vice versa should that be required).

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 年之前
父節點
當前提交
53ceed95e5
共有 6 個文件被更改,包括 160 次插入103 次删除
  1. 11 0
      common/workunit/workunit.cpp
  2. 0 1
      ecl/CMakeLists.txt
  3. 9 0
      plugins/cassandra/cassandraembed.cpp
  4. 1 0
      tools/CMakeLists.txt
  5. 7 15
      ecl/wutest/CMakeLists.txt
  6. 132 87
      ecl/wutest/wutest.cpp

+ 11 - 0
common/workunit/workunit.cpp

@@ -3117,6 +3117,17 @@ extern WORKUNIT_API IWorkUnitFactory * getWorkUnitFactory()
     return factory.getLink();
 }
 
+extern WORKUNIT_API IWorkUnitFactory * getDaliWorkUnitFactory()
+{
+    if (!factory)
+    {
+        CriticalBlock b(factoryCrit);
+        if (!factory)   // NOTE - this "double test" paradigm is not guaranteed threadsafe on modern systems/compilers - I think in this instance that is harmless even in the (extremely) unlikely event that it resulted in the setown being called twice.
+            factory.setown(new CDaliWorkUnitFactory());
+    }
+    return factory.getLink();
+}
+
 // A SecureWorkUnitFactory allows the security params to be supplied once to the factory rather than being supplied to each call.
 // They can still be supplied if you want...
 

+ 0 - 1
ecl/CMakeLists.txt

@@ -27,4 +27,3 @@ HPCC_ADD_SUBDIRECTORY (hqlcpp)
 HPCC_ADD_SUBDIRECTORY (hthor)
 HPCC_ADD_SUBDIRECTORY (scheduleadmin "PLATFORM")
 HPCC_ADD_SUBDIRECTORY (schedulectrl)
-HPCC_ADD_SUBDIRECTORY (wutest "PLATFORM")

+ 9 - 0
plugins/cassandra/cassandraembed.cpp

@@ -106,6 +106,7 @@ void CassandraClusterSession::setOptions(const StringArray &options)
     const char *contact_points = "localhost";
     const char *user = "";
     const char *password = "";
+    StringBuffer epText;
     ForEachItemIn(idx, options)
     {
         const char *opt = options.item(idx);
@@ -115,7 +116,15 @@ void CassandraClusterSession::setOptions(const StringArray &options)
             StringBuffer optName(val-opt, opt);
             val++;
             if (stricmp(optName, "contact_points")==0 || stricmp(optName, "server")==0)
+            {
                 contact_points = val;   // Note that lifetime of val is adequate for this to be safe
+                if (contact_points[0]=='.')
+                {
+                    SocketEndpoint ep(contact_points);
+                    ep.getIpText(epText.clear());
+                    contact_points = epText.str();
+                }
+            }
             else if (stricmp(optName, "user")==0)
                 user = val;
             else if (stricmp(optName, "password")==0)

+ 1 - 0
tools/CMakeLists.txt

@@ -31,6 +31,7 @@ HPCC_ADD_SUBDIRECTORY (testsocket "PLATFORM")
 HPCC_ADD_SUBDIRECTORY (swapnode "PLATFORM")
 HPCC_ADD_SUBDIRECTORY (vkey "PLATFORM")
 HPCC_ADD_SUBDIRECTORY (wuget "CLIENTTOOLS")
+HPCC_ADD_SUBDIRECTORY (wutool "PLATFORM")
 HPCC_ADD_SUBDIRECTORY (copyexp "PLATFORM")
 HPCC_ADD_SUBDIRECTORY (genht "PLATFORM")
 

+ 7 - 15
ecl/wutest/CMakeLists.txt

@@ -15,17 +15,17 @@
 ################################################################################
 
 
-# Component: wutest 
+# Component: wutool
 #####################################################
 # Description:
 # ------------
-#    Cmake Input File for wutest
+#    Cmake Input File for wutool
 #####################################################
 
-project( wutest ) 
+project( wutool ) 
 
 set (    SRCS 
-         wutest.cpp 
+         wutool.cpp 
     )
 
 include_directories ( 
@@ -48,13 +48,9 @@ if ( USE_CPPUNIT )
 endif()
 
 ADD_DEFINITIONS( -D_CONSOLE )
-if ( FORCE_WORKUNITS_TO_CASSANDRA )
-  ADD_DEFINITIONS( -DFORCE_WORKUNITS_TO_CASSANDRA )
-endif()
-
 
-HPCC_ADD_EXECUTABLE ( wutest ${SRCS} )
-target_link_libraries ( wutest 
+HPCC_ADD_EXECUTABLE ( wutool ${SRCS} )
+target_link_libraries ( wutool 
          jlib
          remote 
          dalibase 
@@ -67,10 +63,6 @@ target_link_libraries ( wutest
          ${CPPUNIT_LIBRARIES}
     )
     
-if ( FORCE_WORKUNITS_TO_CASSANDRA )
-  target_link_libraries ( wutest cassandraembed )
-endif ()
-
 if ( USE_CPPUNIT )
-  target_link_libraries ( wutest workunitservices )
+  target_link_libraries ( wutool workunitservices )
 endif()

+ 132 - 87
ecl/wutest/wutest.cpp

@@ -19,6 +19,7 @@
 #include "jprop.hpp"
 #include "jptree.hpp"
 #include "jsocket.hpp"
+#include "jutil.hpp"
 #include "workunit.hpp"
 #include "mpbase.hpp"
 #include "dllserver.hpp"
@@ -41,29 +42,33 @@ static unsigned testSize = 1000;
 
 void usage()
 {
-    printf("Usage: WUTEST action [WUID=xxx] [OWNER=xxx]\n\n"
-           "actions supported are:\n"
-           "   list\n"
-           "   dump\n"
-           "   delete\n"
-           "   results\n"
-           "   archive [TO=<directory>] [DEL=1] [KEEPFILERESULTS=1]\n"
-           "   restore [FROM=<directory>]\n"
-           "   pack\n"
-           "   unpack\n"
-           "   validate [fix=1]\n");
+    printf("Usage: wutool action [WUID=xxx] [DALISERVER=ip] [CASSANDRASERVER=ip] [option=value]...\n\n"
+           "Actions supported are:\n"
+           "   list <workunits>    - List workunits\n"
+           "   dump <workunits>    - Dump xml for specified workunits\n"
+           "   delete <workunits>  - Delete workunits\n"
+           "   results <workunits> - Dump results from specified workunits\n"
+           "\n"
+           "   archive <workunits> - Archive to xml files [TO=<directory>] [DEL=1] [KEEPFILERESULTS=1]\n"
+           "   restore <filenames> - Restore from xml files\n"
+            "\n"
+           "   orphans             - Delete orphaned information from store\n"
+           "   cleanup [days=NN]   - Delete workunits older than NN days\n"
+           "   validate [fix=1]    - Check contents of workunit repository for errors\n"
+           "   clear               - Delete entire workunit repository (requires entire=1 repository=1)\n"
+           "   initialize          - Initialize new workunit repository\n"
+            "\n"
+           "<workunits> can be specified on commandline, or can be specified using a filter owner=XXXX. If ommitted,\n"
+           "all workunits will be selected.\n"
+            );
 }
 
-bool dump(IConstWorkUnit &w, IProperties *globals)
+void process(IConstWorkUnit &w, IProperties *globals)
 {
     const char *action = globals->queryProp("#action");
     if (!action || stricmp(action, "list")==0)
     {
-        Owned <IConstWUQuery> query = w.getQuery();
-        SCMStringBuffer queryText;
-        if (query)
-            query->getQueryText(queryText);
-        printf("%-20s %-10s %-10s %-10s %-10s %s\n", w.queryWuid(), w.queryClusterName(), w.queryUser(), w.queryJobName(), w.queryStateDesc(), queryText.str());
+        printf("%-20s %-10s %-10s %-10s %-10s\n", w.queryWuid(), w.queryClusterName(), w.queryUser(), w.queryJobName(), w.queryStateDesc());
     }
     else if (stricmp(action, "results")==0)
     {
@@ -84,18 +89,6 @@ bool dump(IConstWorkUnit &w, IProperties *globals)
         exportWorkUnitToXML(&w, xml, true, false, true);
         printf("%s\n", xml.str());
     }
-    else if (stricmp(action, "temporaries")==0)
-    {
-        Owned<IConstWUResult> r = w.getTemporaryByName("a8QL");
-        printf("%s = %" I64F "d\n", "a8QL", r->getResultInt());
-    }
-    else if (stricmp(action, "get")==0)
-    {
-        Owned<IConstWUQuery> q = w.getQuery();
-        SCMStringBuffer x;
-        q->getQueryText(x);
-        printf("%s\n", x.str());
-    }
     else if (stricmp(action, "delete")==0)
     {
         Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
@@ -119,24 +112,8 @@ bool dump(IConstWorkUnit &w, IProperties *globals)
         else
             printf("archive of %s failed\n", wuid.str());
     }
-    else if (stricmp(action, "getWebServicesInfo")==0)
-    {
-        Owned<IConstWUWebServicesInfo> q = w.getWebServicesInfo();
-        SCMStringBuffer x;
-        q->getInfo(NULL, x);
-        printf("%s\n", x.str());
-    }
-
-    else {
-        usage();
-        return false;
-    }
-    return true;
 }
 
-#ifdef FORCE_WORKUNITS_TO_CASSANDRA
-extern "C" IWorkUnitFactory *createWorkUnitFactory(const IPropertyTree *props);
-#endif
 
 Owned<IProperties> globals;
 
@@ -145,37 +122,71 @@ int main(int argc, const char *argv[])
     int ret = 0;
     InitModuleObjects();
     unsigned count=0;
-    globals.setown(createProperties("WUTEST.INI", true));
+    globals.setown(createProperties("wutool.ini", true));
+    const char *action = NULL;
+    StringArray wuids;
+    StringArray files;
     for (int i = 1; i < argc; i++)
     {
         if (strchr(argv[i],'='))
             globals->loadProp(argv[i]);
+        else if (strchr(argv[i],'.') || strchr(argv[i],PATHSEPCHAR))
+            files.append(argv[i]);
         else if (argv[i][0]=='W' || argv[i][0]=='w')
-            globals->setProp("WUID", argv[i]);
+            wuids.append(argv[i]);
+        else if (action)
+        {
+            usage();
+            _exit(4);
+        }
         else
+        {
+            action = argv[i];
             globals->setProp("#action", argv[i]);
+        }
+    }
+    if (!action)
+    {
+        usage();
+        _exit(4);
     }
-#ifdef FORCE_WORKUNITS_TO_CASSANDRA
     StringBuffer cassandraServer;
+    bool serverSpecified = false;
     if (globals->getProp("CASSANDRASERVER", cassandraServer))
     {
-        // Statically linking to cassandra plugin makes debugging easier (and means can debug simple cassandra workunit interactions without needing dali running)
-        Owned<IPTree> props = createPTreeFromXMLString("<WorkUnitsServer><Option name='server' value='.'/><Option name='randomWuidSuffix' value='4'/><Option name='traceLevel' value='0'/><Option name='keyspace' value='hpcc_test'></Option></WorkUnitsServer>");
-        props->setProp("Option[@name='server']/@value", cassandraServer.str());
-        props->setPropInt("Option[@name='traceLevel']/@value", globals->getPropInt("tracelevel", 0));
-        setWorkUnitFactory(createWorkUnitFactory(props));
+        // If they explicitly specify a cassandra server, use it even if the info in the environment in dali does not indicate there is a cassandra associated
+        // Conversely, if the explicitly specify "none" then don't use cassandra even if there is one specified in the dali environment...
+        if (!strieq(cassandraServer, "none"))
+        {
+            Owned<IPTree> pluginInfo = createPTreeFromXMLString(
+                  "<WorkUnitsServer name='cassandraembed' entrypoint='createWorkUnitFactory'>"
+                    "<Option name='server' value='.'/>"
+                    "<Option name='randomWuidSuffix' value='4'/>"
+                    "<Option name='traceLevel' value='0'/>"
+                    "<Option name='keyspace' value='hpcc'/>"
+                  "</WorkUnitsServer>");
+            pluginInfo->setProp("Option[@name='server']/@value", cassandraServer.str());
+            pluginInfo->setPropInt("Option[@name='traceLevel']/@value", globals->getPropInt("tracelevel", 0));
+            StringBuffer keySpace;
+            if (globals->getProp("CASSANDRA_KEYSPACE", keySpace))
+                pluginInfo->setProp("Option[@name='keyspace']/@value", keySpace.str());
+            setWorkUnitFactory((IWorkUnitFactory *) loadPlugin(pluginInfo));
+            serverSpecified = true;
+        }
     }
-#endif
 
     StringBuffer daliServers;
-    if (!globals->getProp("DALISERVERS", daliServers))
-        daliServers.append(".");
-    if (!strieq(daliServers, "none"))
+    if (globals->getProp("DALISERVER", daliServers))
     {
         Owned<IGroup> serverGroup = createIGroup(daliServers.str(), DALI_SERVER_PORT);
         initClientProcess(serverGroup,DCR_Other);
         setPasswordsFromSDS();
     }
+    else if (!serverSpecified)
+    {
+        printf("No server specified (at least one of daliserver or cassandraserver required)\n");
+        _exit(4);
+    }
     try
     {
         CDateTime cutoff;
@@ -192,14 +203,13 @@ int main(int argc, const char *argv[])
                 cutoff.setDateString(since, NULL);
         }
         Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
-        const char *action = globals->queryProp("#action");
 #ifdef _USE_CPPUNIT
         if (action && (stricmp(action, "-selftest")==0))
         {
             testSize = globals->getPropInt("testSize", 100);
             queryStderrLogMsgHandler()->setMessageFields(MSGFIELD_time | MSGFIELD_milliTime | MSGFIELD_prefix);
             CppUnit::TextUi::TestRunner runner;
-            CppUnit::TestFactoryRegistry &registry = CppUnit::TestFactoryRegistry::getRegistry("WuTest");
+            CppUnit::TestFactoryRegistry &registry = CppUnit::TestFactoryRegistry::getRegistry("WuTool");
             runner.addTest( registry.makeTest() );
             ret = runner.run( "", false );
         }
@@ -344,37 +354,71 @@ int main(int argc, const char *argv[])
             }
 #endif
         }
-        else if (globals->hasProp("WUID"))
+        else if (strieq(action, "restore"))
         {
-            if (action && stricmp(action, "restore")==0)
+            if (wuids.length())
             {
-                StringBuffer from;
-                globals->getProp("FROM", from);
-                if (from.length()==0)
-                    from.append('.');
-                const char *wuid = globals->queryProp("WUID");
-                if (factory->restoreWorkUnit(from.str(),wuid))
-                    printf("restored %s\n", wuid);
-                else
-                    printf("failed to restore %s\n", wuid);
+                usage();
+                exit(4);
             }
-            else {
-                Owned<IConstWorkUnit> w = factory->openWorkUnit(globals->queryProp("WUID"));
-                if (w)
-                    dump(*w, globals);
+            if (!files.length())
+            {
+                printf("One or more workunit files must be specified\n");
+                exit(4);
             }
+            ForEachItemIn(idx, files)
+            {
+                StringBuffer base, wuid, ext;
+                splitFilename(files.item(idx), &base, &base, &wuid, &ext, true);
+                if (streq(ext, "xml"))
+                {
+                    if (base.length()==0)
+                        base.append('.');
+                    if (factory->restoreWorkUnit(base.str(), wuid))
+                        printf("restored %s\n", wuid.str());
+                    else
+                        printf("failed to restore %s\n", wuid.str());
+                }
+                else
+                    printf("Ignoring file %s - extension is not '.xml'\n", files.item(idx));
+            }
+        }
+        else if (strieq(action, "help"))
+        {
+            usage();
         }
-        else 
+        else if (strieq(action, "list") || strieq(action, "dump") || strieq(action, "results") || strieq(action, "delete") || strieq(action, "archive"))
         {
-            Owned<IConstWorkUnitIterator> it = factory->getWorkUnitsByOwner(globals->queryProp("OWNER"));
-            ForEach(*it)
+            if (wuids.length())
             {
-                IConstWorkUnitInfo& wi = it->query();
-                Owned<IConstWorkUnit> w = factory->openWorkUnit(wi.queryWuid());
-                if (!dump(*w, globals))
-                    break;
+                ForEachItemIn(idx, wuids)
+                {
+                    Owned<IConstWorkUnit> w = factory->openWorkUnit(wuids.item(idx));
+                    process(*w, globals);
+                }
             }
-            
+            else
+            {
+                if (strieq(action, "delete") && !globals->getPropBool(("FORCE")))
+                {
+                    printf("Specify FORCE=1 to delete workunits via filter");
+                    _exit(4);
+                }
+                ret = 1;  // Assume there will be no matches
+                Owned<IConstWorkUnitIterator> it = factory->getWorkUnitsByOwner(globals->queryProp("OWNER"));
+                ForEach(*it)
+                {
+                    IConstWorkUnitInfo& wi = it->query();
+                    Owned<IConstWorkUnit> w = factory->openWorkUnit(wi.queryWuid());
+                    process(*w, globals);
+                    ret = 0; // There was at least one match
+                }
+            }
+        }
+        else
+        {
+            usage();
+            _exit(4);
         }
     }
     catch (IException *E)
@@ -382,6 +426,7 @@ int main(int argc, const char *argv[])
         StringBuffer m;
         printf("Error: %s\n", E->errorMessage(m).str());
         E->Release();
+        ret = 2;
     }
     closeDllServer();   
     closeEnvironment(); 
@@ -406,9 +451,9 @@ inline int min(int a, int b)
 {
     return a < b ? a : b;
 }
-class WuTest : public CppUnit::TestFixture
+class WuTool : public CppUnit::TestFixture
 {
-    CPPUNIT_TEST_SUITE(WuTest);
+    CPPUNIT_TEST_SUITE(WuTool);
         CPPUNIT_TEST(testInit);
         CPPUNIT_TEST(testCreate);
         CPPUNIT_TEST(testValidate);
@@ -1774,9 +1819,9 @@ protected:
         rtlFree(result);
     }
 };
-StringArray WuTest::wuids;
+StringArray WuTool::wuids;
 
-CPPUNIT_TEST_SUITE_REGISTRATION( WuTest );
-CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( WuTest, "WuTest" );
+CPPUNIT_TEST_SUITE_REGISTRATION( WuTool );
+CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( WuTool, "WuTool" );
 
 #endif