Ver código fonte

HPCC-13932 Add version check when connecting to Cassandra

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 anos atrás
pai
commit
4c13242970
2 arquivos alterados com 91 adições e 3 exclusões
  1. 1 0
      common/workunit/wuerror.hpp
  2. 90 3
      plugins/cassandra/cassandrawu.cpp

+ 1 - 0
common/workunit/wuerror.hpp

@@ -50,4 +50,5 @@
 #define WUERR_WorkunitPublished                 5025
 #define WUERR_GraphProgressWriteUnsupported     5026
 #define WUERR_WorkunitPluginError               5027
+#define WUERR_WorkunitVersionMismatch           5028
 #endif

+ 90 - 3
plugins/cassandra/cassandrawu.cpp

@@ -839,6 +839,11 @@ struct CassandraTableInfo
     const CassandraXmlMapping *mappings;
 };
 
+static const int majorVersion = 1;  // If this does not match the value in the repository, you cannot proceed - a conversion tool is needed
+static const int minorVersion = 1;  // If this is less than the value in the repository, we should be fine (but there may be columns we don't know about and thus don't read - and will write as NULL in new rows)
+                                    // If this is greater than the value in the repository, we need to update the repository (using add column) and its version before proceeding
+                                    // Make sure to increment this if any column is ever added below
+
 static const CassandraXmlMapping workunitsMappings [] =
 {
     {"partition", "int", NULL, hashRootNameColumnMapper},
@@ -938,6 +943,19 @@ static const CassandraXmlMapping filesReadSearchMappings [] =
     { NULL, "filesReadSearchValues", "((name), wuid)|CLUSTERING ORDER BY (wuid DESC)", stringColumnMapper}
 };
 
+// The version table is keyed by a partition value because (a) you need to key by something and (b) we can use it to spread the load of
+// version lookups (pick a partition at random).
+// Note that this table must have the same minimum layout on all versions.
+
+static const CassandraXmlMapping versionMappings [] =
+{
+    {"partition", "int", "@partition", intColumnMapper},
+    {"major", "int", "@major", intColumnMapper},
+    {"minor", "int", "@minor", intColumnMapper},
+    {"attributes", "map<text, text>", "@major@minor@partition@", attributeMapColumnMapper},  // name is the suppression list, note trailing @
+    { NULL, "version", "((partition))", stringColumnMapper}
+};
+
 /*
  * Some thoughts on the secondary tables:
  * 1. To support (trailing) wildcards we will need to split the key into two - the leading N chars and the rest. Exactly what N is will depend on the installation size.
@@ -952,7 +970,7 @@ static const CassandraXmlMapping filesReadSearchMappings [] =
 
 // The following describe child tables - all keyed by wuid
 
-enum ChildTablesEnum { WuQueryChild, WuExceptionsChild, WuStatisticsChild, WuGraphsChild, WuResultsChild, WuVariablesChild, WuTemporariesChild, WuFilesReadChild,ChildTablesSize };
+enum ChildTablesEnum { WuQueryChild, WuExceptionsChild, WuStatisticsChild, WuGraphsChild, WuResultsChild, WuVariablesChild, WuTemporariesChild, WuFilesReadChild, ChildTablesSize };
 
 struct ChildTableInfo
 {
@@ -2046,7 +2064,7 @@ static void lockWuid(Owned<IRemoteConnection> &connection, const char *wuid)
     else
         connection.setown(querySDS().connect(wuRoot.str(), myProcessSession(), RTM_LOCK_WRITE|RTM_CREATE_QUERY, SDS_LOCK_TIMEOUT));
     if (!connection)
-        throw MakeStringException(WUERR_LockFailed, "Failed to get connection for xpath %s", wuRoot.str());
+        throw makeStringExceptionV(WUERR_LockFailed, "Failed to get connection for xpath %s", wuRoot.str());
 }
 
 class CCassandraWorkUnit : public CPersistedWorkUnit
@@ -2778,6 +2796,30 @@ public:
         if (!cluster.queryKeySpace())
             cluster.setKeySpace("hpcc");
         cluster.connect();
+        Owned<IPTree> versionInfo = getVersionInfo();
+        int major = versionInfo->getPropInt("@major", 0);
+        int minor = versionInfo->getPropInt("@minor", 0);
+        if (major && minor)
+        {
+            // Note that if there is no version info at all, we have to assume that the repository is not yet created. We don't fail, otherwise no-one can call createRepository the first time...
+            if (major != majorVersion)
+                throw makeStringExceptionV(WUERR_WorkunitVersionMismatch, "Incompatible workunit repository version (wanted %d.%d, found %d.%d)", majorVersion, minorVersion, major, minor);
+            if (minor != minorVersion)
+            {
+                if (minor < minorVersion)
+                {
+                    DBGLOG("WARNING: repository version %d.%d is older than current version %d.%d - adding required columns", major, minor, majorVersion, minorVersion);
+                    switch (minor)
+                    {
+                    // Add code here to create any columns that we need to get from version "minor" to expected layout
+                    }
+                }
+                else
+                    DBGLOG("WARNING: repository version %d.%d is newer than current version %d.%d - some columns will not be updated", major, minor, majorVersion, minorVersion);
+            }
+        }
+        else
+            DBGLOG("WARNING: repository version could not be retrieved (repository not yet created?)");
         cacheRetirer.start();
     }
 
@@ -3299,10 +3341,11 @@ public:
         CassandraSession s(cass_session_new());
         CassandraFuture future(cass_session_connect(s, cluster.queryCluster()));
         future.wait("connect without keyspace");
-        VStringBuffer create("CREATE KEYSPACE IF NOT EXISTS %s WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' } ;", cluster.queryKeySpace()); // MORE - options from props? Not 100% sure if they are appropriate.
+        VStringBuffer create("CREATE KEYSPACE %s WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1' } ;", cluster.queryKeySpace()); // MORE - options from props? Not 100% sure if they are appropriate.
         executeSimpleCommand(s, create);
         s.set(NULL);
         cluster.connect();
+        createVersionTable();
         ensureTable(querySession(), workunitsMappings);
         ensureTable(querySession(), searchMappings);
         ensureTable(querySession(), uniqueSearchMappings);
@@ -3327,6 +3370,50 @@ public:
         return cluster.prepareStatement(query, traceLevel>=2);
     }
 private:
+    void createVersionTable()  // Creates the version table and writes the compiled-in major/minor version once per partition value
+    {
+        StringBuffer schema;
+        executeSimpleCommand(querySession(), describeTable(versionMappings, schema));  // presumably describeTable yields the table-creation statement for versionMappings - confirm against its definition
+        VStringBuffer versionInfo("<Version major='%d' minor='%d'/>", majorVersion, minorVersion);
+        CassandraBatch versionBatch(cass_batch_new(CASS_BATCH_TYPE_LOGGED));
+        Owned<IPTree> pt = createPTreeFromXMLString(versionInfo);
+        for (int i = 0; i < NUM_PARTITIONS; i++)  // one entry per partition value, so a lookup on any partition can find the version
+        {
+            pt->setPropInt("@partition", i);  // the same tree is reused; only the partition attribute changes between iterations
+            simpleXMLtoCassandra(this, versionBatch, versionMappings, pt, NULL);
+        }
+        CassandraFuture futureBatch(cass_session_execute_batch(querySession(), versionBatch));  // all entries are submitted together as a single batch
+        futureBatch.wait("createVersionTable");
+    }
+    IPTree *getVersionInfo()  // Reads the version row for one partition; returns an empty <Version/> tree when no row is found or the lookup throws
+    {
+        try
+        {
+            StringBuffer names;
+            StringBuffer tableName;
+            getFieldNames(versionMappings, names, tableName);
+            VStringBuffer selectQuery("select %s from %s where partition=?;", names.str()+1, tableName.str());  // +1 skips the first character of names - presumably a leading separator; confirm against getFieldNames
+            CassandraStatement select(prepareStatement(selectQuery));
+            select.bindInt32(0, rand_r(&randState) % NUM_PARTITIONS);  // partition chosen at random, to spread the load of version lookups
+            CassandraFuture future(cass_session_execute(querySession(), select));
+            future.wait("read version");
+            CassandraResult result(cass_future_get_result(future));
+            const CassRow *row = cass_result_first_row(result);
+            if (row)
+                return rowToPTree(NULL, NULL, versionMappings, row);
+        }
+        catch (IException *E)  // logged and released rather than rethrown, so the fallback return below is still reached
+        {
+            EXCLOG(E);
+            E->Release();
+        }
+        catch (...)
+        {
+            DBGLOG("WARNING: Unknown exception caught while trying to retrieve Cassandra repository version information");
+        }
+        return createPTreeFromXMLString("<Version/>");  // fallback: a tree with no @major/@minor attributes
+    }
+
     bool checkWuExists(const char *wuid)
     {
         CassandraStatement statement(prepareStatement("SELECT COUNT(*) FROM workunits where partition=? and wuid=?;"));