瀏覽代碼

HPCC-16140 Roxie fails with assert(factory) error if workunit submitted multiple times

Reduce (massively) the chances of unexpected hash collisions between
cloned workunits submitted close together.

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 8 年之前
父節點
當前提交
011310c7c8
共有 2 個文件被更改,包括 13 次插入7 次删除
  1. 9 5
      roxie/ccd/ccdquery.cpp
  2. 4 2
      roxie/ccd/ccdqueue.cpp

+ 9 - 5
roxie/ccd/ccdquery.cpp

@@ -1070,18 +1070,20 @@ public:
 
     virtual void beforeDispose()
     {
-        SpinBlock b(queriesCrit);
         // NOTE: it's theoretically possible for the final release to happen after a replacement has been inserted into hash table. 
         // So only remove from hash table if what we find there matches the item that is being deleted.
-        CQueryFactory *goer = queryMap.getValue(hashValue+channelNo);
+        hash64_t hv = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
+        SpinBlock b(queriesCrit);
+        CQueryFactory *goer = queryMap.getValue(hv);
         if (goer == this)
-            queryMap.remove(hashValue+channelNo);
+            queryMap.remove(hv);
     }
 
     static IQueryFactory *getQueryFactory(hash64_t hashValue, unsigned channelNo)
     {
+        hash64_t hv = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
         SpinBlock b(queriesCrit);
-        CQueryFactory *factory = LINK(queryMap.getValue(hashValue+channelNo));
+        CQueryFactory *factory = LINK(queryMap.getValue(hv));
         if (factory && factory->isAlive())
             return factory;
         else
@@ -1157,6 +1159,7 @@ public:
         }
         if (id)
             hashValue = rtlHash64VStr(id, hashValue);
+        hashValue = rtlHash64VStr("Roxie", hashValue);  // Adds some noise into the hash - otherwise adjacent wuids tend to hash very close together
         if (traceLevel > 8)
             DBGLOG("getQueryHash: %s %" I64F "u from id", id, hashValue);
         if (stateInfo)
@@ -1215,8 +1218,9 @@ public:
                 }
             }
         }
+        hash64_t hv = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
         SpinBlock b(queriesCrit);
-        queryMap.setValue(hashValue+channelNo, this);
+        queryMap.setValue(hv, this);
     }
 
     virtual unsigned queryChannel() const

+ 4 - 2
roxie/ccd/ccdqueue.cpp

@@ -673,14 +673,16 @@ void doUnload(IRoxieQueryPacket *packet, const IRoxieContextLogger &logctx)
     unsigned channelNo = header.channel;
     logctx.CTXLOG("Unload received for channel %d", channelNo);
     hash64_t hashValue = header.queryHash;
+    hashValue = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
     SpinBlock b(onDemandQueriesCrit);
-    onDemandQueryCache.remove(hashValue+channelNo);
+    onDemandQueryCache.remove(hashValue);
 }
 
 void cacheOnDemandQuery(hash64_t hashValue, unsigned channelNo, IQueryFactory *query)
 {
+    hashValue = rtlHash64Data(sizeof(channelNo), &channelNo, hashValue);
     SpinBlock b(onDemandQueriesCrit);
-    onDemandQueryCache.setValue(hashValue+channelNo, query);
+    onDemandQueryCache.setValue(hashValue, query);
 }
 
 //=================================================================================