Преглед изворни кода

Merge pull request #7668 from richardkchapman/cassandra-nodestate

HPCC-13899 Add support for graph progress in Cassandra

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman пре 9 година
родитељ
комит
a1e476475d
1 измењених фајлова са 11 додато и 17 уклоњено
  1. 11 17
      common/workunit/workunit.cpp

+ 11 - 17
common/workunit/workunit.cpp

@@ -906,8 +906,7 @@ public:
     }
     IConstWUGraphProgress *getGraphProgress(const char *graphName) const
     {
-        CriticalBlock block(crit);
-        IRemoteConnection *conn = queryProgressConnection();
+        Owned<IRemoteConnection> conn = getProgressConnection();
         if (conn)
         {
             IPTree *progress = conn->queryRoot()->queryPropTree(graphName);
@@ -918,8 +917,7 @@ public:
     }
     virtual WUGraphState queryGraphState(const char *graphName) const
     {
-        CriticalBlock block(crit);
-        IRemoteConnection *conn = queryProgressConnection();
+        Owned<IRemoteConnection> conn = getProgressConnection();
         if (conn)
         {
             IPTree *progress = conn->queryRoot()->queryPropTree(graphName);
@@ -930,8 +928,7 @@ public:
     }
     virtual WUGraphState queryNodeState(const char *graphName, WUGraphIDType nodeId) const
     {
-        CriticalBlock block(crit);
-        IRemoteConnection *conn = queryProgressConnection();
+        Owned<IRemoteConnection> conn = getProgressConnection();
         if (conn)
         {
             IPTree *progress = conn->queryRoot()->queryPropTree(graphName);
@@ -959,8 +956,7 @@ public:
 
     virtual bool getRunningGraph(IStringVal &graphName, WUGraphIDType &subId) const
     {
-        CriticalBlock block(crit);
-        IRemoteConnection *conn = queryProgressConnection();
+        Owned<IRemoteConnection> conn = getProgressConnection();
         if (!conn)
             return false;
         const char *name = conn->queryRoot()->queryProp("Running/@graph");
@@ -978,14 +974,10 @@ public:
     virtual void forceReload()
     {
         synchronized sync(locked); // protect locked workunits (uncommitted writes) from reload
-        StringBuffer wuRoot;
-        getXPath(wuRoot, p->queryName());
-        IRemoteConnection *newconn = querySDS().connect(wuRoot.str(), myProcessSession(), 0, SDS_LOCK_TIMEOUT);
-        if (!newconn)
-            throw MakeStringException(WUERR_ConnectFailed, "Could not connect to workunit %s (deleted?)",p->queryName());
         CriticalBlock block(crit);
         clearCached(true);
-        connection.setown(newconn);
+        connection->reload();
+        progressConnection.clear();
         abortDirty = true;
         p.setown(connection->getRoot());
     }
@@ -1089,7 +1081,7 @@ public:
     }
 
 protected:
-    IRemoteConnection *queryProgressConnection() const
+    IRemoteConnection *getProgressConnection() const
     {
         CriticalBlock block(crit);
         if (!progressConnection)
@@ -1097,16 +1089,18 @@ protected:
             VStringBuffer path("/GraphProgress/%s", queryWuid());
             progressConnection.setown(querySDS().connect(path, myProcessSession(), 0, SDS_LOCK_TIMEOUT)); // Note - we don't lock. The writes are atomic.
         }
-        return progressConnection;
+        return progressConnection.getLink();
     }
     IRemoteConnection *getWritableProgressConnection(const char *graphName) const
     {
+        CriticalBlock block(crit);
+        progressConnection.clear(); // Make sure subsequent reads from this workunit get the changes I am making
         VStringBuffer path("/GraphProgress/%s/%s", queryWuid(), graphName);
         return querySDS().connect(path, myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT);
     }
     IPropertyTree *getGraphProgressTree() const
     {
-        IRemoteConnection *conn = queryProgressConnection();
+        Owned<IRemoteConnection> conn = getProgressConnection();
         if (conn)
         {
             Owned<IPropertyTree> tmp = createPTree("GraphProgress");