Переглянути джерело

Merge branch 'candidate-5.2.8' into candidate-5.4.0

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 роки тому
батько
коміт
aeb3c435d1

+ 1 - 1
dali/ft/daftformat.cpp

@@ -1064,7 +1064,7 @@ void CUtfPartitioner::storeFieldName(const char * start, unsigned len)
     }
     }
     else
     else
     {
     {
-        fieldName.append("field").append(fieldCount);
+        fieldName.set("field").append(fieldCount);
     }
     }
 
 
     // Check discovered field name uniqueness
     // Check discovered field name uniqueness

+ 28 - 12
plugins/pyembed/pyembed.cpp

@@ -454,6 +454,17 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
     return true;
     return true;
 }
 }
 
 
+static void checkThreadContext()
+{
+    if (!threadContext)
+    {
+        if (!globalState.isInitialized())
+            rtlFail(0, "Python not initialized");
+        threadContext = new PythonThreadContext;
+        threadHookChain = addThreadTermFunc(releaseContext);
+    }
+}
+
 PyObject *PythonThreadContext::getNamedTupleType(const RtlTypeInfo *type)
 PyObject *PythonThreadContext::getNamedTupleType(const RtlTypeInfo *type)
 {
 {
     if (!lru || (type!=lrutype))
     if (!lru || (type!=lrutype))
@@ -1178,8 +1189,8 @@ private:
 class PythonRowStream : public CInterfaceOf<IRowStream>
 class PythonRowStream : public CInterfaceOf<IRowStream>
 {
 {
 public:
 public:
-    PythonRowStream(PythonThreadContext *_sharedCtx, PyObject *result, IEngineRowAllocator *_resultAllocator)
-    : sharedCtx(_sharedCtx), resultIterator(NULL)
+    PythonRowStream(PyObject *result, IEngineRowAllocator *_resultAllocator)
+    : resultIterator(NULL)
     {
     {
         // NOTE - the caller should already have the GIL lock before creating me
         // NOTE - the caller should already have the GIL lock before creating me
         if (!result || result == Py_None)
         if (!result || result == Py_None)
@@ -1188,9 +1199,19 @@ public:
         checkPythonError();
         checkPythonError();
         resultAllocator.set(_resultAllocator);
         resultAllocator.set(_resultAllocator);
     }
     }
+    ~PythonRowStream()
+    {
+        if (resultIterator)
+        {
+            checkThreadContext();
+            GILBlock b(threadContext->threadState);
+            resultIterator.clear();
+        }
+    }
     virtual const void *nextRow()
     virtual const void *nextRow()
     {
     {
-        GILBlock b(sharedCtx->threadState);
+        checkThreadContext();
+        GILBlock b(threadContext->threadState);
         if (!resultIterator)
         if (!resultIterator)
             return NULL;
             return NULL;
         OwnedPyObject row = PyIter_Next(resultIterator);
         OwnedPyObject row = PyIter_Next(resultIterator);
@@ -1202,12 +1223,13 @@ public:
     }
     }
     virtual void stop()
     virtual void stop()
     {
     {
+        checkThreadContext();
+        GILBlock b(threadContext->threadState);
         resultAllocator.clear();
         resultAllocator.clear();
         resultIterator.clear();
         resultIterator.clear();
     }
     }
 
 
 protected:
 protected:
-    PythonThreadContext *sharedCtx;
     Linked<IEngineRowAllocator> resultAllocator;
     Linked<IEngineRowAllocator> resultAllocator;
     OwnedPyObject resultIterator;
     OwnedPyObject resultIterator;
 };
 };
@@ -1275,7 +1297,7 @@ public:
     }
     }
     virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
     virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
     {
     {
-        return new PythonRowStream(sharedCtx, result, _resultAllocator);
+        return new PythonRowStream(result, _resultAllocator);
     }
     }
     virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
     virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
     {
     {
@@ -1544,13 +1566,7 @@ public:
     }
     }
     virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
     virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
     {
     {
-        if (!threadContext)
-        {
-            if (!globalState.isInitialized())
-                rtlFail(0, "Python not initialized");
-            threadContext = new PythonThreadContext;
-            threadHookChain = addThreadTermFunc(releaseContext);
-        }
+        checkThreadContext();
         if (flags & EFimport)
         if (flags & EFimport)
             return new Python27EmbedImportContext(threadContext, options);
             return new Python27EmbedImportContext(threadContext, options);
         else
         else

+ 9 - 9
plugins/redis/README.md

@@ -26,7 +26,7 @@ plugin functions will not work when setting with an expiration for a version les
 Getting started
 Getting started
 ---------------
 ---------------
 
 
-The server can be started by typing `redis-server` within a terminal. To run with with a non-default configuration run as `redis-server redis.conf`, where
+The server can be started by typing `redis-server` within a terminal. To run with a non-default configuration run as `redis-server redis.conf`, where
 redis.conf is the configuration file supplied with the redis-server package.
 redis.conf is the configuration file supplied with the redis-server package.
 
 
 For example, to require the server to **password authenticate**, locate and copy redis.conf to a desired dir. Then locate and alter the 'requirepass' variable within the file.
 For example, to require the server to **password authenticate**, locate and copy redis.conf to a desired dir. Then locate and alter the 'requirepass' variable within the file.
@@ -93,7 +93,7 @@ The core points to note here are:
    127.0.0.1:6379 is used. *Note:* 6379 is the default port for **redis-server**.
    127.0.0.1:6379 is used. *Note:* 6379 is the default port for **redis-server**.
    * `UNSIGNED timeout` has units *ms* and has a default value of 1 second. This is not a timeout duration for an entire plugin call but rather that set for each
    * `UNSIGNED timeout` has units *ms* and has a default value of 1 second. This is not a timeout duration for an entire plugin call but rather that set for each
    communication transaction with the redis server. *c.f.* 'Behaviour and Implementation Details' below.
    communication transaction with the redis server. *c.f.* 'Behaviour and Implementation Details' below.
-   * `UNSIGNED expire` has units seconds and a default of **0**, i.e. *forever*.
+   * `UNSIGNED expire` has units *ms* and a default of **0**, i.e. *forever*.
 
 
 ###The redisServer MODULE
 ###The redisServer MODULE
 To avoid the cumbersome and unnecessary need to constantly pass `options` and `password` with each function call, the module `redisServer` can be imported to effectively 
 To avoid the cumbersome and unnecessary need to constantly pass `options` and `password` with each function call, the module `redisServer` can be imported to effectively 
@@ -111,8 +111,8 @@ The notion of a *database* within a redis cache is a that of an UNSIGNED INTEGER
 myRedis.SetString('myKey', 'foo', 0);
 myRedis.SetString('myKey', 'foo', 0);
 myRedis.SetString('myKey', 'bar', 1);
 myRedis.SetString('myKey', 'bar', 1);
 
 
-myRedis.GetString('myKey', 'foo', 0);//returns 'foo'
-myRedis.GetString('myKey', 'bar', 1);//returns 'bar'
+myRedis.GetString('myKey', 0);//returns 'foo'
+myRedis.GetString('myKey', 1);//returns 'bar'
 ```
 ```
 *Note:* that the default database is 0.
 *Note:* that the default database is 0.
 
 
@@ -147,7 +147,7 @@ SEQUENTIAL(
 
 
     //If the key does not exist it will 'lock' the key and retrun an empty STRING.
     //If the key does not exist it will 'lock' the key and retrun an empty STRING.
     STRING value := myRedis.GetOrLockString('supercali- what?');
     STRING value := myRedis.GetOrLockString('supercali- what?');
-    //All locking.Set<type>() return the value passed in as the 2nd parameter.
+    //All SetAndPublish<type>() return the value passed in as the 2nd parameter.
     IF (LENGTH(value) == 0, myRedis.SetAndPublishString('supercali- what?', myFunc('poppins', 3)), value);
     IF (LENGTH(value) == 0, myRedis.SetAndPublishString('supercali- what?', myFunc('poppins', 3)), value);
     );
     );
 ```
 ```
@@ -157,16 +157,16 @@ Behaviour and Implementation Details
 A few notes to point out here:
 A few notes to point out here:
    * PUB-SUB channels are not disconnected from the keyspace as they are in their native redis usage. The key itself is used as the lock with its value being set as the channel to later
    * PUB-SUB channels are not disconnected from the keyspace as they are in their native redis usage. The key itself is used as the lock with its value being set as the channel to later
    PUBLISH on or SUBSCRIBE to. This channel is a string, unique by only the *key* and *database*, prefixed with **'redis_ecl_lock'**.
    PUBLISH on or SUBSCRIBE to. This channel is a string, unique by only the *key* and *database*, prefixed with **'redis_ecl_lock'**.
-   * The lock itself is set to expire with a duration equal to the `timeout` value passed to the `locking.Exists(<key>` function (default 1s).
-   * It is possible to manually 'unlock' this lock (`DELETE` the key) via the `locking.Unlock(<key>)` function. *Note:* this function will fail on any communication or reply error however,
+   * The lock itself is set to expire with a duration equal to the `timeout` value passed to the `GetOrLock<type>` function (default 1s).
+   * It is possible to manually 'unlock' this lock (`DELETE` the key) via the `Unlock(<key>)` function. *Note:* this function will fail on any communication or reply error however,
    it will **silently fail**, leaving the lock to expire, if the server observes any change to the key during the function call duration.
    it will **silently fail**, leaving the lock to expire, if the server observes any change to the key during the function call duration.
    * When the *race-winner* publishes, it actually publishes the value itself and that any subscriber will then obtain the key-value in this fashion. Therefore, not requiring an
    * When the *race-winner* publishes, it actually publishes the value itself and that any subscriber will then obtain the key-value in this fashion. Therefore, not requiring an
     additional `GET` and possible further race conditions in doing so. *Note:* This does however, mean that it is possible for the actual redis `SET` to fail on one client/process,
     additional `GET` and possible further race conditions in doing so. *Note:* This does however, mean that it is possible for the actual redis `SET` to fail on one client/process,
     have the key-value received on another, and yet, the key-value still does not exist on the cache.
     have the key-value received on another, and yet, the key-value still does not exist on the cache.
-   * At present the 'lock' is not as such an actual lock, as only the `locking.Get<type>` functions acknowledge it. By current implementation it is better thought as a flag for
+   * At present the 'lock' is not as such an actual lock, as only the locking functions acknowledge it. By current implementation it is better thought as a flag for
    `GET` to wait and subscribe. I.e. the locked key can be deleted and re-set just as any other key can be.
    `GET` to wait and subscribe. I.e. the locked key can be deleted and re-set just as any other key can be.
    * Since the timeout duration is not for an individual plugin call but instead that waiting for each reply from the server, the actual possible maximum timeout duration differs from
    * Since the timeout duration is not for an individual plugin call but instead that waiting for each reply from the server, the actual possible maximum timeout duration differs from
-     various functions within this plugin, i.e. some functions do more than others. Below is a table for each of the plugin functions (or catagories of) including the maximum possible and
+     various functions within this plugin, i.e. some functions do more than others. Below is a table for each of the plugin functions (or categories of) including the maximum possible and
      nominal expected, where the latter is due to using a cached connection, i.e. neither the server IP, port, nor password have changed from the function called prior to the one in
      nominal expected, where the latter is due to using a cached connection, i.e. neither the server IP, port, nor password have changed from the function called prior to the one in
      question. The values given are multiples of the given timeout.
      question. The values given are multiples of the given timeout.
 
 

+ 8 - 8
plugins/redis/redis.cpp

@@ -52,7 +52,7 @@ static void * allocateAndCopy(const void * src, size_t size)
 static StringBuffer & appendExpire(StringBuffer & buffer, unsigned expire)
 static StringBuffer & appendExpire(StringBuffer & buffer, unsigned expire)
 {
 {
     if (expire > 0)
     if (expire > 0)
-        buffer.append(" EX ").append(expire/1000);
+        buffer.append(" PX ").append(expire);
     return buffer;
     return buffer;
 }
 }
 class Reply : public CInterface
 class Reply : public CInterface
@@ -455,7 +455,7 @@ void Connection::persist(ICodeContext * ctx, const char * key)
 }
 }
 void Connection::expire(ICodeContext * ctx, const char * key, unsigned _expire)
 void Connection::expire(ICodeContext * ctx, const char * key, unsigned _expire)
 {
 {
-    OwnedReply reply = Reply::createReply(redisCommand(context, "EXPIRE %b %u", key, strlen(key), _expire/1000));
+    OwnedReply reply = Reply::createReply(redisCommand(context, "PEXPIRE %b %u", key, strlen(key), _expire));
     assertOnCommandErrorWithKey(reply->query(), "Expire", key);
     assertOnCommandErrorWithKey(reply->query(), "Expire", key);
 }
 }
 bool Connection::exists(ICodeContext * ctx, const char * key)
 bool Connection::exists(ICodeContext * ctx, const char * key)
@@ -692,7 +692,7 @@ void Connection::encodeChannel(StringBuffer & channel, const char * key) const
 }
 }
 bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire)
 bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire)
 {
 {
-    StringBuffer cmd("SET %b %b NX EX ");
+    StringBuffer cmd("SET %b %b NX PX ");
     cmd.append(expire);
     cmd.append(expire);
 
 
     OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), channel, strlen(channel)));
     OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), channel, strlen(channel)));
@@ -838,18 +838,18 @@ void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const cha
             replyContainer->setClear((redisReply*)redisCommand(context, "EVALSHA %b %d %b %b %b", luaScriptSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
             replyContainer->setClear((redisReply*)redisCommand(context, "EVALSHA %b %d %b %b %b", luaScriptSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
             if (noScript(replyContainer->query()))
             if (noScript(replyContainer->query()))
             {
             {
-                const char * luaScript = "redis.call('SET', KEYS[1], ARGV[2]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";
+                const char * luaScript = "redis.call('SET', KEYS[1], ARGV[2]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";//NOTE: MUST update luaScriptSHA1 if luaScript is updated!
                 replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b", luaScript, strlen(luaScript), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
                 replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b", luaScript, strlen(luaScript), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
             }
             }
         }
         }
         else
         else
         {
         {
-            const char * luaScriptWithExpireSHA1 = "c68d1706d7dc6342d5fc1d651e238931bd75320d"; //NOTE: update this if luaScriptWithExpire is updated!
-            replyContainer->setClear((redisReply*)redisCommand(context, "EVALSHA %b %d %b %b %b %d", luaScriptWithExpireSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire/1000));
+            const char * luaScriptWithExpireSHA1 = "6f6bc88ccea7c6853ccc395eaa7abd8cb91fb2d8"; //NOTE: update this if luaScriptWithExpire is updated!
+            replyContainer->setClear((redisReply*)redisCommand(context, "EVALSHA %b %d %b %b %b %d", luaScriptWithExpireSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
             if (noScript(replyContainer->query()))
             if (noScript(replyContainer->query()))
             {
             {
-                const char * luaScriptWithExpire = "redis.call('SET', KEYS[1], ARGV[2], 'EX', ARGV[3]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";
-                replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b %d", luaScriptWithExpire, strlen(luaScriptWithExpire), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire/1000));
+                const char * luaScriptWithExpire = "redis.call('SET', KEYS[1], ARGV[2], 'PX', ARGV[3]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";//NOTE: MUST update luaScriptWithExpireSHA1 if luaScriptWithExpire is updated!
+                replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b %d", luaScriptWithExpire, strlen(luaScriptWithExpire), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
             }
             }
         }
         }
         assertOnCommandErrorWithKey(replyContainer->query(), "SET", key);
         assertOnCommandErrorWithKey(replyContainer->query(), "SET", key);

+ 9 - 9
roxie/ccd/ccdactivities.cpp

@@ -989,7 +989,7 @@ public:
         {
         {
             bool isOpt = (helper->getFlags() & TDRoptional) != 0;
             bool isOpt = (helper->getFlags() & TDRoptional) != 0;
             OwnedRoxieString fileName(helper->getFileName());
             OwnedRoxieString fileName(helper->getFileName());
-            datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit()));
+            datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
             if (datafile)
             if (datafile)
             {
             {
                 unsigned channel = queryFactory.queryChannel();
                 unsigned channel = queryFactory.queryChannel();
@@ -3075,7 +3075,7 @@ public:
         {
         {
             bool isOpt = (helper->getFlags() & TIRoptional) != 0;
             bool isOpt = (helper->getFlags() & TIRoptional) != 0;
             OwnedRoxieString indexName(helper->getFileName());
             OwnedRoxieString indexName(helper->getFileName());
-            datafile.setown(queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, queryFactory.queryWorkUnit()));
+            datafile.setown(queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, queryFactory.queryWorkUnit(), true));
             if (datafile)
             if (datafile)
                 keyArray.setown(datafile->getKeyArray(activityMeta, layoutTranslators, isOpt, queryFactory.queryChannel(), queryFactory.queryOptions().enableFieldTranslation));
                 keyArray.setown(datafile->getKeyArray(activityMeta, layoutTranslators, isOpt, queryFactory.queryChannel(), queryFactory.queryOptions().enableFieldTranslation));
         }
         }
@@ -4276,7 +4276,7 @@ public:
         {
         {
             bool isOpt = (fetchContext->getFetchFlags() & FFdatafileoptional) != 0;
             bool isOpt = (fetchContext->getFetchFlags() & FFdatafileoptional) != 0;
             OwnedRoxieString fname(fetchContext->getFileName());
             OwnedRoxieString fname(fetchContext->getFileName());
-            datafile.setown(_queryFactory.queryPackage().lookupFileName(fname, isOpt, true, true, _queryFactory.queryWorkUnit()));
+            datafile.setown(_queryFactory.queryPackage().lookupFileName(fname, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
             if (datafile)
             if (datafile)
                 fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
                 fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
         }
         }
@@ -4626,7 +4626,7 @@ public:
         {
         {
             bool isOpt = (helper->getJoinFlags() & JFindexoptional) != 0;
             bool isOpt = (helper->getJoinFlags() & JFindexoptional) != 0;
             OwnedRoxieString indexFileName(helper->getIndexFileName());
             OwnedRoxieString indexFileName(helper->getIndexFileName());
-            datafile.setown(_queryFactory.queryPackage().lookupFileName(indexFileName, isOpt, true, true, _queryFactory.queryWorkUnit()));
+            datafile.setown(_queryFactory.queryPackage().lookupFileName(indexFileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
             if (datafile)
             if (datafile)
                 keyArray.setown(datafile->getKeyArray(activityMeta, layoutTranslators, isOpt, queryFactory.queryChannel(), queryFactory.queryOptions().enableFieldTranslation));
                 keyArray.setown(datafile->getKeyArray(activityMeta, layoutTranslators, isOpt, queryFactory.queryChannel(), queryFactory.queryOptions().enableFieldTranslation));
         }
         }
@@ -4967,7 +4967,7 @@ public:
         {
         {
             bool isOpt = (helper->getFetchFlags() & FFdatafileoptional) != 0;
             bool isOpt = (helper->getFetchFlags() & FFdatafileoptional) != 0;
             OwnedRoxieString fileName(helper->getFileName());
             OwnedRoxieString fileName(helper->getFileName());
-            datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit()));
+            datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
             if (datafile)
             if (datafile)
                 fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
                 fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
         }
         }
@@ -5298,17 +5298,17 @@ public:
             {
             {
                 const char *fileName = queryNodeFileName(_graphNode, kind);
                 const char *fileName = queryNodeFileName(_graphNode, kind);
                 const char *indexName = queryNodeIndexName(_graphNode, kind);
                 const char *indexName = queryNodeIndexName(_graphNode, kind);
-                if (indexName && !allFilesDynamic)
+                if (indexName && !allFilesDynamic && !queryFactory.isDynamic())
                 {
                 {
                     bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isIndexOpt']/@value");
                     bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isIndexOpt']/@value");
-                    indexfile.setown(_queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, _queryFactory.queryWorkUnit()));
+                    indexfile.setown(_queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
                     if (indexfile)
                     if (indexfile)
                         keyArray.setown(indexfile->getKeyArray(NULL, &layoutTranslators, isOpt, queryFactory.queryChannel(), queryFactory.queryOptions().enableFieldTranslation));
                         keyArray.setown(indexfile->getKeyArray(NULL, &layoutTranslators, isOpt, queryFactory.queryChannel(), queryFactory.queryOptions().enableFieldTranslation));
                 }
                 }
-                if (fileName && !allFilesDynamic)
+                if (fileName && !allFilesDynamic && !queryFactory.isDynamic())
                 {
                 {
                     bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isOpt']/@value");
                     bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isOpt']/@value");
-                    datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit()));
+                    datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
                     if (datafile)
                     if (datafile)
                         fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
                         fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
                 }
                 }

+ 2 - 2
roxie/ccd/ccdcontext.cpp

@@ -3497,11 +3497,11 @@ public:
     {
     {
         CriticalBlock b(contextCrit);
         CriticalBlock b(contextCrit);
         StringBuffer expandedName;
         StringBuffer expandedName;
-        expandLogicalFilename(expandedName, fileName, workUnit, false, !workUnit);
+        expandLogicalFilename(expandedName, fileName, workUnit, false, false);
         Linked<const IResolvedFile> ret = fileCache.getValue(expandedName);
         Linked<const IResolvedFile> ret = fileCache.getValue(expandedName);
         if (!ret)
         if (!ret)
         {
         {
-            ret.setown(factory->queryPackage().lookupFileName(fileName, isOpt, false, false, workUnit));
+            ret.setown(factory->queryPackage().lookupFileName(fileName, isOpt, false, false, workUnit, false));
             if (ret)
             if (ret)
             {
             {
                 IResolvedFile *add = const_cast<IResolvedFile *>(ret.get());
                 IResolvedFile *add = const_cast<IResolvedFile *>(ret.get());

+ 2 - 2
roxie/ccd/ccdquery.cpp

@@ -1121,7 +1121,7 @@ public:
                                     if (indexName)
                                     if (indexName)
                                     {
                                     {
                                         bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isIndexOpt']/@value");
                                         bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isIndexOpt']/@value");
-                                        const IResolvedFile *indexFile = package.lookupFileName(indexName, isOpt, true, true, wu);
+                                        const IResolvedFile *indexFile = package.lookupFileName(indexName, isOpt, true, true, wu, true);
                                         if (indexFile)
                                         if (indexFile)
                                         {
                                         {
                                             hashValue = indexFile->addHash64(hashValue);
                                             hashValue = indexFile->addHash64(hashValue);
@@ -1135,7 +1135,7 @@ public:
                                         if (!node.getPropBool("att[@name='_isSpill']/@value") && !node.getPropBool("att[@name='_isSpillGlobal']/@value"))
                                         if (!node.getPropBool("att[@name='_isSpill']/@value") && !node.getPropBool("att[@name='_isSpillGlobal']/@value"))
                                         {
                                         {
                                             bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isOpt']/@value");
                                             bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isOpt']/@value");
-                                            const IResolvedFile *dataFile = package.lookupFileName(fileName, isOpt, true, true, wu);
+                                            const IResolvedFile *dataFile = package.lookupFileName(fileName, isOpt, true, true, wu, true);
                                             if (dataFile)
                                             if (dataFile)
                                             {
                                             {
                                                 hashValue = dataFile->addHash64(hashValue);
                                                 hashValue = dataFile->addHash64(hashValue);

+ 14 - 14
roxie/ccd/ccdserver.cpp

@@ -21466,7 +21466,7 @@ public:
         {
         {
             bool isOpt = (helper->getFlags() & TDRoptional) != 0;
             bool isOpt = (helper->getFlags() & TDRoptional) != 0;
             OwnedRoxieString fileName(helper->getFileName());
             OwnedRoxieString fileName(helper->getFileName());
-            datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit()));
+            datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
             bool isSimple = (datafile && datafile->getNumParts()==1 && !_queryFactory.queryOptions().disableLocalOptimizations);
             bool isSimple = (datafile && datafile->getNumParts()==1 && !_queryFactory.queryOptions().disableLocalOptimizations);
             if (isLocal || isSimple)
             if (isLocal || isSimple)
             {
             {
@@ -21544,7 +21544,7 @@ public:
             if ((helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) == 0)
             if ((helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) == 0)
             {
             {
                 OwnedRoxieString fileName(helper->getFileName());
                 OwnedRoxieString fileName(helper->getFileName());
-                Owned<const IResolvedFile> temp = queryFactory.queryPackage().lookupFileName(fileName, true, true, false, queryFactory.queryWorkUnit());
+                Owned<const IResolvedFile> temp = queryFactory.queryPackage().lookupFileName(fileName, true, true, false, queryFactory.queryWorkUnit(), true);
                 if (temp)
                 if (temp)
                     addXrefFileInfo(reply, temp);
                     addXrefFileInfo(reply, temp);
             }
             }
@@ -22623,7 +22623,7 @@ public:
         {
         {
             bool isOpt = (flags & TIRoptional) != 0;
             bool isOpt = (flags & TIRoptional) != 0;
             OwnedRoxieString indexName(indexHelper->getFileName());
             OwnedRoxieString indexName(indexHelper->getFileName());
-            indexfile.setown(queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, queryFactory.queryWorkUnit()));
+            indexfile.setown(queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, queryFactory.queryWorkUnit(), true));
             if (indexfile)
             if (indexfile)
                 keySet.setown(indexfile->getKeyArray(activityMeta, translatorArray, isOpt, isLocal ? queryFactory.queryChannel() : 0, enableFieldTranslation));
                 keySet.setown(indexfile->getKeyArray(activityMeta, translatorArray, isOpt, isLocal ? queryFactory.queryChannel() : 0, enableFieldTranslation));
         }
         }
@@ -22656,7 +22656,7 @@ public:
             if ((indexHelper->getFlags() & (TIRvarfilename|TIRdynamicfilename)) == 0)
             if ((indexHelper->getFlags() & (TIRvarfilename|TIRdynamicfilename)) == 0)
             {
             {
                 OwnedRoxieString indexName(indexHelper->getFileName());
                 OwnedRoxieString indexName(indexHelper->getFileName());
-                Owned<const IResolvedFile> temp = queryFactory.queryPackage().lookupFileName(indexName, true, true, false, queryFactory.queryWorkUnit());
+                Owned<const IResolvedFile> temp = queryFactory.queryPackage().lookupFileName(indexName, true, true, false, queryFactory.queryWorkUnit(), true);
                 if (temp)
                 if (temp)
                     addXrefFileInfo(reply, temp);
                     addXrefFileInfo(reply, temp);
             }
             }
@@ -23632,7 +23632,7 @@ public:
             datafile.setown(_queryFactory.queryPackage().lookupFileName(fname,
             datafile.setown(_queryFactory.queryPackage().lookupFileName(fname,
                                                                         (fetchContext->getFetchFlags() & FFdatafileoptional) != 0,
                                                                         (fetchContext->getFetchFlags() & FFdatafileoptional) != 0,
                                                                         true, true,
                                                                         true, true,
-                                                                        _queryFactory.queryWorkUnit()));
+                                                                        _queryFactory.queryWorkUnit(), true));
             if (datafile)
             if (datafile)
                 map.setown(datafile->getFileMap());
                 map.setown(datafile->getFileMap());
         }
         }
@@ -23655,7 +23655,7 @@ public:
             if ((fetchContext->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) == 0)
             if ((fetchContext->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) == 0)
             {
             {
                 OwnedRoxieString fileName(fetchContext->getFileName());
                 OwnedRoxieString fileName(fetchContext->getFileName());
-                Owned<const IResolvedFile> temp = queryFactory.queryPackage().lookupFileName(fileName, true, true, false, queryFactory.queryWorkUnit());
+                Owned<const IResolvedFile> temp = queryFactory.queryPackage().lookupFileName(fileName, true, true, false, queryFactory.queryWorkUnit(), true);
                 if (temp)
                 if (temp)
                     addXrefFileInfo(reply, temp);
                     addXrefFileInfo(reply, temp);
             }
             }
@@ -23698,17 +23698,17 @@ public:
             {
             {
                 fileName.set(queryNodeFileName(_graphNode, kind));
                 fileName.set(queryNodeFileName(_graphNode, kind));
                 indexName.set(queryNodeIndexName(_graphNode, kind));
                 indexName.set(queryNodeIndexName(_graphNode, kind));
-                if (indexName && !allFilesDynamic)
+                if (indexName && !allFilesDynamic && !queryFactory.isDynamic())
                 {
                 {
                     bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isIndexOpt']/@value");
                     bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isIndexOpt']/@value");
-                    indexfile.setown(queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, queryFactory.queryWorkUnit()));
+                    indexfile.setown(queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, queryFactory.queryWorkUnit(), true));
                     if (indexfile)
                     if (indexfile)
                         keySet.setown(indexfile->getKeyArray(NULL, &layoutTranslators, isOpt, isLocal ? queryFactory.queryChannel() : 0, false));
                         keySet.setown(indexfile->getKeyArray(NULL, &layoutTranslators, isOpt, isLocal ? queryFactory.queryChannel() : 0, false));
                 }
                 }
-                if (fileName && !allFilesDynamic)
+                if (fileName && !allFilesDynamic && !queryFactory.isDynamic())
                 {
                 {
                     bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isOpt']/@value");
                     bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isOpt']/@value");
-                    datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, queryFactory.queryWorkUnit()));
+                    datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, queryFactory.queryWorkUnit(), true));
                     if (datafile)
                     if (datafile)
                     {
                     {
                         if (isLocal)
                         if (isLocal)
@@ -23742,13 +23742,13 @@ public:
             Owned<const IResolvedFile> temp;
             Owned<const IResolvedFile> temp;
             if (fileName.length())
             if (fileName.length())
             {
             {
-                temp.setown(queryFactory.queryPackage().lookupFileName(fileName, true, true, false, queryFactory.queryWorkUnit()));
+                temp.setown(queryFactory.queryPackage().lookupFileName(fileName, true, true, false, queryFactory.queryWorkUnit(), true));
                 if (temp)
                 if (temp)
                     addXrefFileInfo(reply, temp);
                     addXrefFileInfo(reply, temp);
             }
             }
             if (indexName.length())
             if (indexName.length())
             {
             {
-                temp.setown(queryFactory.queryPackage().lookupFileName(indexName, true, true, false, queryFactory.queryWorkUnit()));
+                temp.setown(queryFactory.queryPackage().lookupFileName(indexName, true, true, false, queryFactory.queryWorkUnit(), true));
                 if (temp)
                 if (temp)
                     addXrefFileInfo(reply, temp);
                     addXrefFileInfo(reply, temp);
             }
             }
@@ -25260,7 +25260,7 @@ public:
         {
         {
             bool isOpt = (joinFlags & JFindexoptional) != 0;
             bool isOpt = (joinFlags & JFindexoptional) != 0;
             OwnedRoxieString indexFileName(helper->getIndexFileName());
             OwnedRoxieString indexFileName(helper->getIndexFileName());
-            indexfile.setown(queryFactory.queryPackage().lookupFileName(indexFileName, isOpt, true, true, queryFactory.queryWorkUnit()));
+            indexfile.setown(queryFactory.queryPackage().lookupFileName(indexFileName, isOpt, true, true, queryFactory.queryWorkUnit(), true));
             if (indexfile)
             if (indexfile)
                 keySet.setown(indexfile->getKeyArray(activityMeta, translatorArray, isOpt, isLocal ? queryFactory.queryChannel() : 0, enableFieldTranslation));
                 keySet.setown(indexfile->getKeyArray(activityMeta, translatorArray, isOpt, isLocal ? queryFactory.queryChannel() : 0, enableFieldTranslation));
         }
         }
@@ -25279,7 +25279,7 @@ public:
         if (!isHalfKeyed && !variableFetchFileName)
         if (!isHalfKeyed && !variableFetchFileName)
         {
         {
             bool isFetchOpt = (helper->getFetchFlags() & FFdatafileoptional) != 0;
             bool isFetchOpt = (helper->getFetchFlags() & FFdatafileoptional) != 0;
-            datafile.setown(_queryFactory.queryPackage().lookupFileName(queryNodeFileName(_graphNode, _kind), isFetchOpt, true, true, _queryFactory.queryWorkUnit()));
+            datafile.setown(_queryFactory.queryPackage().lookupFileName(queryNodeFileName(_graphNode, _kind), isFetchOpt, true, true, _queryFactory.queryWorkUnit(), true));
             if (datafile)
             if (datafile)
             {
             {
                 if (isLocal)
                 if (isLocal)

+ 3 - 3
roxie/ccd/ccdstate.cpp

@@ -614,7 +614,7 @@ protected:
                 const char *name = super.queryProp("@id");
                 const char *name = super.queryProp("@id");
                 if (name)
                 if (name)
                 {
                 {
-                    const IResolvedFile *resolved = lookupFileName(name, false, true, true, NULL);
+                    const IResolvedFile *resolved = lookupFileName(name, false, true, true, NULL, true);
                     if (resolved)
                     if (resolved)
                     {
                     {
                         files.append(*const_cast<IResolvedFile *>(resolved));
                         files.append(*const_cast<IResolvedFile *>(resolved));
@@ -666,10 +666,10 @@ public:
         return lookupElements(xpath.str(), "MemIndex");
         return lookupElements(xpath.str(), "MemIndex");
     }
     }
 
 
-    virtual const IResolvedFile *lookupFileName(const char *_fileName, bool opt, bool useCache, bool cacheResult, IConstWorkUnit *wu) const
+    virtual const IResolvedFile *lookupFileName(const char *_fileName, bool opt, bool useCache, bool cacheResult, IConstWorkUnit *wu, bool ignoreForeignPrefix) const
     {
     {
         StringBuffer fileName;
         StringBuffer fileName;
-        expandLogicalFilename(fileName, _fileName, wu, false, !wu);
+        expandLogicalFilename(fileName, _fileName, wu, false, ignoreForeignPrefix);
         if (traceLevel > 5)
         if (traceLevel > 5)
             DBGLOG("lookupFileName %s", fileName.str());
             DBGLOG("lookupFileName %s", fileName.str());
 
 

+ 1 - 1
roxie/ccd/ccdstate.hpp

@@ -55,7 +55,7 @@ extern const IRoxiePackageMap &queryEmptyRoxiePackageMap();
 interface IRoxiePackage : public IHpccPackage
 interface IRoxiePackage : public IHpccPackage
 {
 {
     // Lookup information in package to resolve existing logical file name
     // Lookup information in package to resolve existing logical file name
-    virtual const IResolvedFile *lookupFileName(const char *fileName, bool opt, bool useCache, bool cacheResults, IConstWorkUnit *wu) const = 0;
+    virtual const IResolvedFile *lookupFileName(const char *fileName, bool opt, bool useCache, bool cacheResults, IConstWorkUnit *wu, bool ignoreForeignPrefix) const = 0;
     // Lookup information in package to create new logical file name
     // Lookup information in package to create new logical file name
     virtual IRoxieWriteHandler *createFileName(const char *fileName, bool overwrite, bool extend, const StringArray &clusters, IConstWorkUnit *wu) const = 0;
     virtual IRoxieWriteHandler *createFileName(const char *fileName, bool overwrite, bool extend, const StringArray &clusters, IConstWorkUnit *wu) const = 0;
     // Lookup information in package about what in-memory indexes should be built for file
     // Lookup information in package about what in-memory indexes should be built for file

+ 7 - 1
testing/regress/ecl/key/streame.xml

@@ -15,5 +15,11 @@
  <Row><name>Generate:</name><value>9</value></Row>
  <Row><name>Generate:</name><value>9</value></Row>
 </Dataset>
 </Dataset>
 <Dataset name='Result 3'>
 <Dataset name='Result 3'>
- <Row><Result_3>Yo</Result_3></Row>
+ <Row><Result_3>500</Result_3></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><Result_4>499</Result_4></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><Result_5>Yo</Result_5></Row>
 </Dataset>
 </Dataset>

+ 2 - 2
testing/regress/ecl/redissynctest.ecl

@@ -112,13 +112,13 @@ sleep(INTEGER duration) := Std.System.Debug.Sleep(duration * 1000);
 
 
 SEQUENTIAL(
 SEQUENTIAL(
     myRedis.Exists('str2'),
     myRedis.Exists('str2'),
-    myRedis.Expire('str2', , 1000),/*\ms*/
+    myRedis.Expire('str2', , 900),/*\ms*/
     sleep(2),
     sleep(2),
     myRedis.Exists('str2'),
     myRedis.Exists('str2'),
 
 
     myRedis.SetString('str3', str),
     myRedis.SetString('str3', str),
     myRedis.Exists('str3'),
     myRedis.Exists('str3'),
-    myRedis.Expire('str3', , 1000),/*\ms*/
+    myRedis.Expire('str3', , 900),/*\ms*/
     myRedis.Persist('str3'),
     myRedis.Persist('str3'),
     sleep(2),
     sleep(2),
     myRedis.Exists('str3')
     myRedis.Exists('str3')

+ 5 - 0
testing/regress/ecl/streame.ecl

@@ -65,6 +65,11 @@ ENDEMBED;
 output(streamedNames(d'AA', u'là'));
 output(streamedNames(d'AA', u'là'));
 output (testGenerator(10));
 output (testGenerator(10));
 
 
+// Test what happens when two threads pull from a generator
+c := testGenerator(1000);
+count(c(value < 500));
+count(c(value > 500));
+
 // Test Python code returning named tuples
 // Test Python code returning named tuples
 childrec tnamed(string s) := EMBED(Python)
 childrec tnamed(string s) := EMBED(Python)
   import collections;
   import collections;

+ 59 - 18
thorlcr/thorutil/thmem.cpp

@@ -167,6 +167,7 @@ protected:
     unsigned spillPriority;
     unsigned spillPriority;
     CThorSpillableRowArray rows;
     CThorSpillableRowArray rows;
     OwnedIFile spillFile;
     OwnedIFile spillFile;
+    bool mmRegistered;
 
 
     bool spillRows()
     bool spillRows()
     {
     {
@@ -185,9 +186,21 @@ protected:
         rows.kill(); // no longer needed, readers will pull from spillFile. NB: ok to kill array as rows is never written to or expanded
         rows.kill(); // no longer needed, readers will pull from spillFile. NB: ok to kill array as rows is never written to or expanded
         return true;
         return true;
     }
     }
-    void clearSpillingCallback()
+    inline void addSpillingCallback()
+    {
+        if (!mmRegistered)
+        {
+            mmRegistered = true;
+            activity.queryJob().queryRowManager()->addRowBuffer(this);
+        }
+    }
+    inline void clearSpillingCallback()
     {
     {
-        activity.queryJob().queryRowManager()->removeRowBuffer(this);
+        if (mmRegistered)
+        {
+            mmRegistered = false;
+            activity.queryJob().queryRowManager()->removeRowBuffer(this);
+        }
     }
     }
 public:
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
@@ -195,9 +208,10 @@ public:
     CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriorirty)
     CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriorirty)
         : activity(_activity), rowIf(_rowIf), rows(_activity, _rowIf, _preserveNulls), preserveNulls(_preserveNulls), spillPriority(_spillPriorirty)
         : activity(_activity), rowIf(_rowIf), rows(_activity, _rowIf, _preserveNulls), preserveNulls(_preserveNulls), spillPriority(_spillPriorirty)
     {
     {
-    	assertex(inRows.isFlushed());
+        assertex(inRows.isFlushed());
         rows.swap(inRows);
         rows.swap(inRows);
         useCompression = false;
         useCompression = false;
+        mmRegistered = false;
     }
     }
     ~CSpillableStreamBase()
     ~CSpillableStreamBase()
     {
     {
@@ -213,9 +227,30 @@ public:
     }
     }
     virtual bool freeBufferedRows(bool critical)
     virtual bool freeBufferedRows(bool critical)
     {
     {
+        if (spillFile) // i.e. if spilt already. NB: this is thread-safe, as 'spillFile' only set by spillRows() call below and can't be called on multiple threads concurrently.
+            return false;
         CThorArrayLockBlock block(rows);
         CThorArrayLockBlock block(rows);
         return spillRows();
         return spillRows();
     }
     }
+friend class CRowsLockBlock;
+};
+
+class CRowsLockBlock
+{
+    CSpillableStreamBase &owner;
+public:
+    inline CRowsLockBlock(CSpillableStreamBase &_owner) : owner(_owner)
+    {
+        owner.rows.lock();
+        clearCB = false;
+    }
+    inline ~CRowsLockBlock()
+    {
+        owner.rows.unlock();
+        if (clearCB)
+            owner.clearSpillingCallback();
+    }
+    bool clearCB;
 };
 };
 
 
 // NB: Shared/spillable, holds all rows in mem until needs to spill.
 // NB: Shared/spillable, holds all rows in mem until needs to spill.
@@ -248,10 +283,10 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
         {
         {
             if (spillStream)
             if (spillStream)
                 return spillStream->nextRow();
                 return spillStream->nextRow();
-            CThorArrayLockBlock block(owner->rows);
+            CRowsLockBlock block(*owner);
             if (owner->spillFile) // i.e. has spilt
             if (owner->spillFile) // i.e. has spilt
             {
             {
-                owner->clearSpillingCallback();
+                block.clearCB = true;
                 assertex(((offset_t)-1) != outputOffset);
                 assertex(((offset_t)-1) != outputOffset);
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 if (owner->preserveNulls)
                 if (owner->preserveNulls)
@@ -262,7 +297,7 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
             }
             }
             else if (pos == owner->rows.numCommitted())
             else if (pos == owner->rows.numCommitted())
             {
             {
-                owner->clearSpillingCallback();
+                block.clearCB = true;
                 return NULL;
                 return NULL;
             }
             }
             return owner->rows.get(pos++);
             return owner->rows.get(pos++);
@@ -286,16 +321,16 @@ public:
     CSharedSpillableRowSet(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriority)
     CSharedSpillableRowSet(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriority)
         : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls, _spillPriority)
         : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls, _spillPriority)
     {
     {
-        activity.queryJob().queryRowManager()->addRowBuffer(this);
+        addSpillingCallback();
     }
     }
     IRowStream *createRowStream()
     IRowStream *createRowStream()
     {
     {
         {
         {
             // already spilled?
             // already spilled?
-            CThorArrayLockBlock block(rows);
+            CRowsLockBlock block(*this);
             if (spillFile)
             if (spillFile)
             {
             {
-                clearSpillingCallback();
+                block.clearCB = true;
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 if (preserveNulls)
                 if (preserveNulls)
                     rwFlags |= rw_grouped;
                     rwFlags |= rw_grouped;
@@ -326,7 +361,7 @@ public:
         // a small amount of rows to read from swappable rows
         // a small amount of rows to read from swappable rows
         roxiemem::IRowManager *rowManager = activity.queryJob().queryRowManager();
         roxiemem::IRowManager *rowManager = activity.queryJob().queryRowManager();
         readRows = static_cast<const void * *>(rowManager->allocate(granularity * sizeof(void*), activity.queryContainer().queryId(), inRows.queryDefaultMaxSpillCost()));
         readRows = static_cast<const void * *>(rowManager->allocate(granularity * sizeof(void*), activity.queryContainer().queryId(), inRows.queryDefaultMaxSpillCost()));
-        activity.queryJob().queryRowManager()->addRowBuffer(this);
+        addSpillingCallback();
     }
     }
     ~CSpillableStream()
     ~CSpillableStream()
     {
     {
@@ -345,10 +380,10 @@ public:
             return spillStream->nextRow();
             return spillStream->nextRow();
         if (pos == numReadRows)
         if (pos == numReadRows)
         {
         {
-            CThorArrayLockBlock block(rows);
+            CRowsLockBlock block(*this);
             if (spillFile)
             if (spillFile)
             {
             {
-                clearSpillingCallback();
+                block.clearCB = true;
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 if (preserveNulls)
                 if (preserveNulls)
                     rwFlags |= rw_grouped;
                     rwFlags |= rw_grouped;
@@ -359,13 +394,16 @@ public:
             }
             }
             rowidx_t available = rows.numCommitted();
             rowidx_t available = rows.numCommitted();
             if (0 == available)
             if (0 == available)
+            {
+                block.clearCB = true;
                 return NULL;
                 return NULL;
+            }
             rowidx_t fetch = (available >= granularity) ? granularity : available;
             rowidx_t fetch = (available >= granularity) ? granularity : available;
             // consume 'fetch' rows
             // consume 'fetch' rows
             rows.readBlock(readRows, fetch);
             rows.readBlock(readRows, fetch);
             if (available == fetch)
             if (available == fetch)
             {
             {
-                clearSpillingCallback();
+                block.clearCB = true;
                 rows.kill();
                 rows.kill();
             }
             }
             numReadRows = fetch;
             numReadRows = fetch;
@@ -376,7 +414,10 @@ public:
         ++pos;
         ++pos;
         return row;
         return row;
     }
     }
-    virtual void stop() { }
+    virtual void stop()
+    {
+        clearSpillingCallback();
+    }
 };
 };
 
 
 //====
 //====
@@ -773,12 +814,12 @@ bool CThorExpandingRowArray::binaryInsert(const void *row, ICompare &compare, bo
     binary_vec_insert_stable(row, rows, numRows, compare); // takes ownership of row
     binary_vec_insert_stable(row, rows, numRows, compare); // takes ownership of row
     if (dropLast)
     if (dropLast)
     {
     {
-    	// last row falls out, i.e. release last row and don't increment numRows
-    	dbgassertex(numRows); // numRows must be >=1 for dropLast
-    	ReleaseThorRow(rows[numRows]);
+        // last row falls out, i.e. release last row and don't increment numRows
+        dbgassertex(numRows); // numRows must be >=1 for dropLast
+        ReleaseThorRow(rows[numRows]);
     }
     }
     else
     else
-    	++numRows;
+        ++numRows;
     return true;
     return true;
 }
 }