Bladeren bron

Merge branch 'candidate-5.4.0'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 jaren geleden
bovenliggende
commit
4544e650dc

+ 2 - 2
common/remote/rmtssh.cpp

@@ -99,7 +99,7 @@ class CFRunSSH: public CInterface, implements IFRunSSH
                     break;
                     break;
                 case 's': { // ssh params
                 case 's': { // ssh params
                         bool usepssh = !password.isEmpty();
                         bool usepssh = !password.isEmpty();
-                        cmdbuf.appendf("%s -o LogLevel=ERROR -o StrictHostKeyChecking=%s -o BatchMode=yes ",usepssh?"pssh":"ssh",strict?"yes":"no");
+                        cmdbuf.appendf("%s -o LogLevel=QUIET -o StrictHostKeyChecking=%s -o BatchMode=yes ",usepssh?"pssh":"ssh",strict?"yes":"no");
                         if (!identityfile.isEmpty())
                         if (!identityfile.isEmpty())
                             cmdbuf.appendf("-i %s ",identityfile.get());
                             cmdbuf.appendf("-i %s ",identityfile.get());
                         if (background)
                         if (background)
@@ -400,7 +400,7 @@ public:
             }
             }
             if (cmdline.length()==0) {
             if (cmdline.length()==0) {
                 // ssh
                 // ssh
-                cmdline.appendf("%s -n -o LogLevel=ERROR -o StrictHostKeyChecking=%s ",usepssh?"pssh":"ssh",strict?"yes":"no");
+                cmdline.appendf("%s -n -o LogLevel=QUIET -o StrictHostKeyChecking=%s ",usepssh?"pssh":"ssh",strict?"yes":"no");
                 if (!usepssh)
                 if (!usepssh)
                     cmdline.append("-o BatchMode=yes ");
                     cmdline.append("-o BatchMode=yes ");
                 if (!identityfile.isEmpty())
                 if (!identityfile.isEmpty())

+ 10 - 5
common/remote/sockfile.cpp

@@ -3098,6 +3098,7 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer
         Owned<ISocket> socket;
         Owned<ISocket> socket;
         StringAttr peerName;
         StringAttr peerName;
         Owned<IAuthenticatedUser> user;
         Owned<IAuthenticatedUser> user;
+        MemoryBuffer msg;
         bool selecthandled;
         bool selecthandled;
         size32_t left;
         size32_t left;
         IArrayOf<IFileIO>   openfiles;      // kept in sync with handles
         IArrayOf<IFileIO>   openfiles;      // kept in sync with handles
@@ -3142,6 +3143,7 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer
             }
             }
             parent = _parent;
             parent = _parent;
             left = 0;
             left = 0;
+            msg.setEndian(__BIG_ENDIAN);
             selecthandled = false;
             selecthandled = false;
             touch();
             touch();
         }
         }
@@ -3172,8 +3174,6 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer
             size32_t avail = (size32_t)socket->avail_read();
             size32_t avail = (size32_t)socket->avail_read();
             if (avail)
             if (avail)
                 touch();
                 touch();
-            MemoryBuffer msg;
-            msg.setEndian(__BIG_ENDIAN);
             if (left==0)
             if (left==0)
             {
             {
                 try
                 try
@@ -3232,19 +3232,22 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer
                     EXCLOG(e,"notifySelected(3)");
                     EXCLOG(e,"notifySelected(3)");
                     e->Release();
                     e->Release();
                     toread = left;
                     toread = left;
+                    msg.clear();
                 }
                 }
             }
             }
             if (TF_TRACE_FULL)
             if (TF_TRACE_FULL)
                 PROGLOG("notifySelected %d,%d",toread,left);
                 PROGLOG("notifySelected %d,%d",toread,left);
-            if ((left!=0)&&(avail==0)) {
+            if ((left!=0)&&(avail==0))
+            {
                 WARNLOG("notifySelected: Closing mid packet, %d remaining", left);
                 WARNLOG("notifySelected: Closing mid packet, %d remaining", left);
                 toread = left;
                 toread = left;
+                msg.clear();
             }
             }
             left -= toread;
             left -= toread;
             if (left==0)
             if (left==0)
             {
             {
                 // DEBUG
                 // DEBUG
-                parent->notify(this, msg);
+                parent->notify(this, msg); // consumes msg
             }
             }
             return false;
             return false;
         }
         }
@@ -3761,6 +3764,7 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer
 
 
         struct cCommandProcessorParams
         struct cCommandProcessorParams
         {
         {
+            cCommandProcessorParams() { msg.setEndian(__BIG_ENDIAN); }
             CRemoteClientHandler *client;
             CRemoteClientHandler *client;
             MemoryBuffer msg;
             MemoryBuffer msg;
         };
         };
@@ -5177,7 +5181,8 @@ public:
             PROGLOG("notify %d", msg.length());
             PROGLOG("notify %d", msg.length());
         if (msg.length())
         if (msg.length())
         {
         {
-            PROGLOG("notify CRemoteClientHandler(%p), msg length=%u", _client, msg.length());
+            if (TF_TRACE_FULL)
+                PROGLOG("notify CRemoteClientHandler(%p), msg length=%u", _client, msg.length());
             cCommandProcessor::cCommandProcessorParams params;
             cCommandProcessor::cCommandProcessorParams params;
             params.client = client.getClear();
             params.client = client.getClear();
             params.msg.swapWith(msg);
             params.msg.swapWith(msg);

+ 4 - 0
dali/dafilesrv/CMakeLists.txt

@@ -20,3 +20,7 @@ include (dafilesrv.cmake)
 if ( PLATFORM )
 if ( PLATFORM )
     include (dafscontrol.cmake)
     include (dafscontrol.cmake)
 endif()
 endif()
+
+include_directories ( 
+         ${CMAKE_BINARY_DIR}
+    )

+ 2 - 0
dali/dafilesrv/dafilesrv.cpp

@@ -18,6 +18,7 @@
 #include "platform.h"
 #include "platform.h"
 #include "portlist.h"
 #include "portlist.h"
 
 
+#include "build-config.h"
 #include "jlib.hpp"
 #include "jlib.hpp"
 #include "jiface.hpp"
 #include "jiface.hpp"
 #include "jutil.hpp"
 #include "jutil.hpp"
@@ -657,6 +658,7 @@ int main(int argc,char **argv)
         lf->beginLogging();
         lf->beginLogging();
     }
     }
 
 
+    PROGLOG("Dafilesrv starting - Build %s", BUILD_TAG);
     PROGLOG("Parallel request limit = %d, throttleDelayMs = %d, throttleCPULimit = %d", parallelRequestLimit, throttleDelayMs, throttleCPULimit);
     PROGLOG("Parallel request limit = %d, throttleDelayMs = %d, throttleCPULimit = %d", parallelRequestLimit, throttleDelayMs, throttleCPULimit);
 
 
     const char * verstring = remoteServerVersionString();
     const char * verstring = remoteServerVersionString();

+ 10 - 4
dali/ft/daftformat.cpp

@@ -670,12 +670,18 @@ CCsvPartitioner::CCsvPartitioner(const FileFormat & _format) : CInputBasePartiti
     if (separator && *separator)
     if (separator && *separator)
         addActionList(matcher, separator, SEPARATOR, &maxElementLength);
         addActionList(matcher, separator, SEPARATOR, &maxElementLength);
 
 
-    addActionList(matcher, format.quote.get() ? format.quote.get() : "\"", QUOTE, &maxElementLength);
+    const char * quote = format.quote.get();
+    addActionList(matcher, quote ? quote : "\"", QUOTE, &maxElementLength);
     addActionList(matcher, format.terminate.get() ? format.terminate.get() : "\\n,\\r\\n", TERMINATOR, &maxElementLength);
     addActionList(matcher, format.terminate.get() ? format.terminate.get() : "\\n,\\r\\n", TERMINATOR, &maxElementLength);
+
     const char * escape = format.escape.get();
     const char * escape = format.escape.get();
     if (escape && *escape)
     if (escape && *escape)
-        addActionList(matcher,  escape, ESCAPE, &maxElementLength);
-
+    {
+        if (quote && (*escape == *quote))
+            LOG(MCdebugProgressDetail, unknownJob, "The quote ('%s') and the escape ('%s') are same, ignore escape.", quote, escape);
+        else
+            addActionList(matcher,  escape, ESCAPE, &maxElementLength);
+    }
     matcher.queryAddEntry(1, " ", WHITESPACE);
     matcher.queryAddEntry(1, " ", WHITESPACE);
     matcher.queryAddEntry(1, "\t", WHITESPACE);
     matcher.queryAddEntry(1, "\t", WHITESPACE);
     recordStructure.append("RECORD\n");
     recordStructure.append("RECORD\n");
@@ -1060,7 +1066,7 @@ void CUtfPartitioner::storeFieldName(const char * start, unsigned len)
     }
     }
     else
     else
     {
     {
-        fieldName.append("field").append(fieldCount);
+        fieldName.set("field").append(fieldCount);
     }
     }
 
 
     // Check discovered field name uniqueness
     // Check discovered field name uniqueness

+ 1 - 1
esp/src/eclwatch/HPCCPlatformWidget.js

@@ -232,7 +232,7 @@ define([
 
 
         checkMonitoring: function (status) {
         checkMonitoring: function (status) {
             if (status) {
             if (status) {
-                domClass.remove("MonitorStatus", status);
+                domClass.remove("MonitorStatus");
                 domClass.add("MonitorStatus", status);
                 domClass.add("MonitorStatus", status);
             }
             }
         },
         },

+ 7 - 1
esp/src/eclwatch/nls/hu/hpcc.js

@@ -207,6 +207,7 @@ define(
     IP: "IP",
     IP: "IP",
     IPAddress: "IP cím",
     IPAddress: "IP cím",
     IsLibrary: "Könyvtár?",
     IsLibrary: "Könyvtár?",
+    IsReplicated: "Készült másodpéldány?",
     JobName: "Munka azonosító",
     JobName: "Munka azonosító",
     Jobname: "Munka azonosító",
     Jobname: "Munka azonosító",
     jsmi: "jkis*",
     jsmi: "jkis*",
@@ -268,6 +269,7 @@ define(
     ModifiedUTCGMT: "Módosítva (UTC/GMT)",
     ModifiedUTCGMT: "Módosítva (UTC/GMT)",
     Modify: "Módosít",
     Modify: "Módosít",
     MonitorEventName: "Monitor esemény név",
     MonitorEventName: "Monitor esemény név",
+    Monitoring: "Felügyelet",
     MonitorShotLimit: "Monitor találatok limitje",
     MonitorShotLimit: "Monitor találatok limitje",
     MonitorSub: "Almonitor",
     MonitorSub: "Almonitor",
     Month: "Hónap",
     Month: "Hónap",
@@ -346,7 +348,9 @@ define(
     Port: "Port",
     Port: "Port",
     Prefix: "Előtag",
     Prefix: "Előtag",
     PrefixPlaceholder: "fájlnév{:hossz}, fájlméret{:[B|L][1-8]}",
     PrefixPlaceholder: "fájlnév{:hossz}, fájlméret{:[B|L][1-8]}",
+    PreserveCompression: "Tartalom tömörtés megőrzése",
     Preview: "Előnézet",
     Preview: "Előnézet",
+    PrimaryMonitoring: "Elsődleges felügyelet",
     Priority: "Prioritás",
     Priority: "Prioritás",
     Process: "Feldolgozás",
     Process: "Feldolgozás",
     ProcessFilter: "Process&nbsp;Filter",
     ProcessFilter: "Process&nbsp;Filter",
@@ -482,6 +486,7 @@ define(
     TargetClusters: "Cél klaszterek",
     TargetClusters: "Cél klaszterek",
     TargetName: "Cél neve",
     TargetName: "Cél neve",
     TargetNamePlaceholder: "valamilyen::logikai::fájlnév",
     TargetNamePlaceholder: "valamilyen::logikai::fájlnév",
+    TargetScope: "Cél hatókör",
     TargetWuid: "Cél/WUID",
     TargetWuid: "Cél/WUID",
     Terminators: "Rekord lezáró jelek",
     Terminators: "Rekord lezáró jelek",
     TestPages: "Tesztlapok",
     TestPages: "Tesztlapok",
@@ -605,7 +610,8 @@ define(
     ZeroLogicalFilesCheckFilter: "Nincs a megadott feltételnek megfelelő adat! (A jó a szűrőfeltétel?)",
     ZeroLogicalFilesCheckFilter: "Nincs a megadott feltételnek megfelelő adat! (A jó a szűrőfeltétel?)",
     Zip: "Zip",
     Zip: "Zip",
     ZippedAnalysisPackage: "Tömörített elemzési csomag",
     ZippedAnalysisPackage: "Tömörített elemzési csomag",
-    Zoom100Pct: "Nagyitás 100%-ra",
+    Zoom: "Nagyítás",
+    Zoom100Pct: "Nagyítás 100%-ra",
     ZoomAll: "Teljes gráf ablakba méretezése",
     ZoomAll: "Teljes gráf ablakba méretezése",
     ZoomWidth: "Gráf nagyítása az ablak szélességére"
     ZoomWidth: "Gráf nagyítása az ablak szélességére"
 })
 })

+ 28 - 12
plugins/pyembed/pyembed.cpp

@@ -454,6 +454,17 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
     return true;
     return true;
 }
 }
 
 
+static void checkThreadContext()
+{
+    if (!threadContext)
+    {
+        if (!globalState.isInitialized())
+            rtlFail(0, "Python not initialized");
+        threadContext = new PythonThreadContext;
+        threadHookChain = addThreadTermFunc(releaseContext);
+    }
+}
+
 PyObject *PythonThreadContext::getNamedTupleType(const RtlTypeInfo *type)
 PyObject *PythonThreadContext::getNamedTupleType(const RtlTypeInfo *type)
 {
 {
     if (!lru || (type!=lrutype))
     if (!lru || (type!=lrutype))
@@ -1178,8 +1189,8 @@ private:
 class PythonRowStream : public CInterfaceOf<IRowStream>
 class PythonRowStream : public CInterfaceOf<IRowStream>
 {
 {
 public:
 public:
-    PythonRowStream(PythonThreadContext *_sharedCtx, PyObject *result, IEngineRowAllocator *_resultAllocator)
-    : sharedCtx(_sharedCtx), resultIterator(NULL)
+    PythonRowStream(PyObject *result, IEngineRowAllocator *_resultAllocator)
+    : resultIterator(NULL)
     {
     {
         // NOTE - the caller should already have the GIL lock before creating me
         // NOTE - the caller should already have the GIL lock before creating me
         if (!result || result == Py_None)
         if (!result || result == Py_None)
@@ -1188,9 +1199,19 @@ public:
         checkPythonError();
         checkPythonError();
         resultAllocator.set(_resultAllocator);
         resultAllocator.set(_resultAllocator);
     }
     }
+    ~PythonRowStream()
+    {
+        if (resultIterator)
+        {
+            checkThreadContext();
+            GILBlock b(threadContext->threadState);
+            resultIterator.clear();
+        }
+    }
     virtual const void *nextRow()
     virtual const void *nextRow()
     {
     {
-        GILBlock b(sharedCtx->threadState);
+        checkThreadContext();
+        GILBlock b(threadContext->threadState);
         if (!resultIterator)
         if (!resultIterator)
             return NULL;
             return NULL;
         OwnedPyObject row = PyIter_Next(resultIterator);
         OwnedPyObject row = PyIter_Next(resultIterator);
@@ -1202,12 +1223,13 @@ public:
     }
     }
     virtual void stop()
     virtual void stop()
     {
     {
+        checkThreadContext();
+        GILBlock b(threadContext->threadState);
         resultAllocator.clear();
         resultAllocator.clear();
         resultIterator.clear();
         resultIterator.clear();
     }
     }
 
 
 protected:
 protected:
-    PythonThreadContext *sharedCtx;
     Linked<IEngineRowAllocator> resultAllocator;
     Linked<IEngineRowAllocator> resultAllocator;
     OwnedPyObject resultIterator;
     OwnedPyObject resultIterator;
 };
 };
@@ -1275,7 +1297,7 @@ public:
     }
     }
     virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
     virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
     {
     {
-        return new PythonRowStream(sharedCtx, result, _resultAllocator);
+        return new PythonRowStream(result, _resultAllocator);
     }
     }
     virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
     virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
     {
     {
@@ -1544,13 +1566,7 @@ public:
     }
     }
     virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
     virtual IEmbedFunctionContext *createFunctionContextEx(ICodeContext * ctx, unsigned flags, const char *options)
     {
     {
-        if (!threadContext)
-        {
-            if (!globalState.isInitialized())
-                rtlFail(0, "Python not initialized");
-            threadContext = new PythonThreadContext;
-            threadHookChain = addThreadTermFunc(releaseContext);
-        }
+        checkThreadContext();
         if (flags & EFimport)
         if (flags & EFimport)
             return new Python27EmbedImportContext(threadContext, options);
             return new Python27EmbedImportContext(threadContext, options);
         else
         else

+ 10 - 10
plugins/redis/README.md

@@ -17,7 +17,7 @@ The redis server and client software can be obtained via either - [binaries](htt
 sudo apt-get install redis-server
 sudo apt-get install redis-server
 ```
 ```
 
 
-*Note:* redis-server 2.6.12 or greater is required to use this plugin as intended. For efficiency, such version requirments are not checked as this is a runtime check only. The use of a
+*Note:* redis-server 2.6.12 or greater is required to use this plugin as intended. For efficiency, such version requirements are not checked as this is a runtime check only. The use of a
 lesser version will result in an exception, normally indicating that either a given command does not exist or that the wrong number of arguments was passed to it. The Set<type>
 lesser version will result in an exception, normally indicating that either a given command does not exist or that the wrong number of arguments was passed to it. The Set<type>
 plugin functions will not work when setting with an expiration for a version less than 2.6.12. In addition, whilst it is possible to use `Expire` with a version less than
 plugin functions will not work when setting with an expiration for a version less than 2.6.12. In addition, whilst it is possible to use `Expire` with a version less than
 2.1.3 it is not advised due to [the change in its semantics](http://redis.io/commands/expire).
 2.1.3 it is not advised due to [the change in its semantics](http://redis.io/commands/expire).
@@ -26,7 +26,7 @@ plugin functions will not work when setting with an expiration for a version les
 Getting started
 Getting started
 ---------------
 ---------------
 
 
-The server can be started by typing `redis-server` within a terminal. To run with with a non-default configuration run as `redis-server redis.conf`, where
+The server can be started by typing `redis-server` within a terminal. To run with a non-default configuration run as `redis-server redis.conf`, where
 redis.conf is the configuration file supplied with the redis-server package.
 redis.conf is the configuration file supplied with the redis-server package.
 
 
 For example, to require the server to **password authenticate**, locate and copy redis.conf to a desired dir. Then locate and alter the 'requirepass' variable within the file.
 For example, to require the server to **password authenticate**, locate and copy redis.conf to a desired dir. Then locate and alter the 'requirepass' variable within the file.
@@ -93,7 +93,7 @@ The core points to note here are:
    127.0.0.1:6379 is used. *Note:* 6379 is the default port for **redis-server**.
    127.0.0.1:6379 is used. *Note:* 6379 is the default port for **redis-server**.
    * `UNSIGNED timeout` has units *ms* and has a default value of 1 second. This is not a timeout duration for an entire plugin call but rather that set for each
    * `UNSIGNED timeout` has units *ms* and has a default value of 1 second. This is not a timeout duration for an entire plugin call but rather that set for each
    communication transaction with the redis server. *c.f.* 'Behaviour and Implementation Details' below.
    communication transaction with the redis server. *c.f.* 'Behaviour and Implementation Details' below.
-   * `UNSIGNED expire` has units seconds and a default of **0**, i.e. *forever*.
+   * `UNSIGNED expire` has units *ms* and a default of **0**, i.e. *forever*.
 
 
 ###The redisServer MODULE
 ###The redisServer MODULE
 To avoid the cumbersome and unnecessary need to constantly pass `options` and `password` with each function call, the module `redisServer` can be imported to effectively 
 To avoid the cumbersome and unnecessary need to constantly pass `options` and `password` with each function call, the module `redisServer` can be imported to effectively 
@@ -111,8 +111,8 @@ The notion of a *database* within a redis cache is a that of an UNSIGNED INTEGER
 myRedis.SetString('myKey', 'foo', 0);
 myRedis.SetString('myKey', 'foo', 0);
 myRedis.SetString('myKey', 'bar', 1);
 myRedis.SetString('myKey', 'bar', 1);
 
 
-myRedis.GetString('myKey', 'foo', 0);//returns 'foo'
-myRedis.GetString('myKey', 'bar', 1);//returns 'bar'
+myRedis.GetString('myKey', 0);//returns 'foo'
+myRedis.GetString('myKey', 1);//returns 'bar'
 ```
 ```
 *Note:* that the default database is 0.
 *Note:* that the default database is 0.
 
 
@@ -147,7 +147,7 @@ SEQUENTIAL(
 
 
     //If the key does not exist it will 'lock' the key and retrun an empty STRING.
     //If the key does not exist it will 'lock' the key and retrun an empty STRING.
     STRING value := myRedis.GetOrLockString('supercali- what?');
     STRING value := myRedis.GetOrLockString('supercali- what?');
-    //All locking.Set<type>() return the value passed in as the 2nd parameter.
+    //All SetAndPublish<type>() return the value passed in as the 2nd parameter.
     IF (LENGTH(value) == 0, myRedis.SetAndPublishString('supercali- what?', myFunc('poppins', 3)), value);
     IF (LENGTH(value) == 0, myRedis.SetAndPublishString('supercali- what?', myFunc('poppins', 3)), value);
     );
     );
 ```
 ```
@@ -157,16 +157,16 @@ Behaviour and Implementation Details
 A few notes to point out here:
 A few notes to point out here:
    * PUB-SUB channels are not disconnected from the keyspace as they are in their native redis usage. The key itself is used as the lock with its value being set as the channel to later
    * PUB-SUB channels are not disconnected from the keyspace as they are in their native redis usage. The key itself is used as the lock with its value being set as the channel to later
    PUBLISH on or SUBSCRIBE to. This channel is a string, unique by only the *key* and *database*, prefixed with **'redis_ecl_lock'**.
    PUBLISH on or SUBSCRIBE to. This channel is a string, unique by only the *key* and *database*, prefixed with **'redis_ecl_lock'**.
-   * The lock itself is set to expire with a duration equal to the `timeout` value passed to the `locking.Exists(<key>` function (default 1s).
-   * It is possible to manually 'unlock' this lock (`DELETE` the key) via the `locking.Unlock(<key>)` function. *Note:* this function will fail on any communication or reply error however,
+   * The lock itself is set to expire with a duration equal to the `timeout` value passed to the `GetOrLock<type>` function (default 1s).
+   * It is possible to manually 'unlock' this lock (`DELETE` the key) via the `Unlock(<key>)` function. *Note:* this function will fail on any communication or reply error however,
    it will **silently fail**, leaving the lock to expire, if the server observes any change to the key during the function call duration.
    it will **silently fail**, leaving the lock to expire, if the server observes any change to the key during the function call duration.
    * When the *race-winner* publishes, it actually publishes the value itself and that any subscriber will then obtain the key-value in this fashion. Therefore, not requiring an
    * When the *race-winner* publishes, it actually publishes the value itself and that any subscriber will then obtain the key-value in this fashion. Therefore, not requiring an
     additional `GET` and possible further race conditions in doing so. *Note:* This does however, mean that it is possible for the actual redis `SET` to fail on one client/process,
     additional `GET` and possible further race conditions in doing so. *Note:* This does however, mean that it is possible for the actual redis `SET` to fail on one client/process,
     have the key-value received on another, and yet, the key-value still does not exist on the cache.
     have the key-value received on another, and yet, the key-value still does not exist on the cache.
-   * At present the 'lock' is not as such an actual lock, as only the `locking.Get<type>` functions acknowledge it. By current implementation it is better thought as a flag for
+   * At present the 'lock' is not as such an actual lock, as only the locking functions acknowledge it. By current implementation it is better thought as a flag for
    `GET` to wait and subscribe. I.e. the locked key can be deleted and re-set just as any other key can be.
    `GET` to wait and subscribe. I.e. the locked key can be deleted and re-set just as any other key can be.
    * Since the timeout duration is not for an individual plugin call but instead that waiting for each reply from the server, the actual possible maximum timeout duration differs from
    * Since the timeout duration is not for an individual plugin call but instead that waiting for each reply from the server, the actual possible maximum timeout duration differs from
-     various functions within this plugin, i.e. some functions do more than others. Below is a table for each of the plugin functions (or catagories of) including the maximum possible and
+     various functions within this plugin, i.e. some functions do more than others. Below is a table for each of the plugin functions (or categories of) including the maximum possible and
      nominal expected, where the latter is due to using a cached connection, i.e. neither the server IP, port, nor password have changed from the function called prior to the one in
      nominal expected, where the latter is due to using a cached connection, i.e. neither the server IP, port, nor password have changed from the function called prior to the one in
      question. The values given are multiples of the given timeout.
      question. The values given are multiples of the given timeout.
 
 

+ 6 - 6
plugins/redis/lib_redis.ecllib

@@ -47,9 +47,9 @@ EXPORT redis := SERVICE : plugin('redis'), namespace('RedisPlugin')
   UNICODE  SetAndPublishUnicode( CONST VARSTRING key, CONST UNICODE value, CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000) : cpp,once,context,entrypoint='SyncLockRSetUChar';
   UNICODE  SetAndPublishUnicode( CONST VARSTRING key, CONST UNICODE value, CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000) : cpp,once,context,entrypoint='SyncLockRSetUChar';
   UTF8     SetAndPublishUtf8(    CONST VARSTRING key, CONST UTF8 value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000) : cpp,once,context,entrypoint='SyncLockRSetUtf8';
   UTF8     SetAndPublishUtf8(    CONST VARSTRING key, CONST UTF8 value,    CONST VARSTRING options, UNSIGNED database = 0, UNSIGNED4 expire = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000) : cpp,once,context,entrypoint='SyncLockRSetUtf8';
 
 
-  STRING      GetOrLockString(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000) : cpp,once,context,entrypoint='SyncLockRGetStr';
-  UNICODE    GetOrLockUnicode(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000) : cpp,once,context,entrypoint='SyncLockRGetUChar';
-  UTF8          GetOrLockUtf8(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000) : cpp,once,context,entrypoint='SyncLockRGetUtf8';
+  STRING      GetOrLockString(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000, UNSIGNED4 expire = 1000) : cpp,once,context,entrypoint='SyncLockRGetStr';
+  UNICODE    GetOrLockUnicode(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000, UNSIGNED4 expire = 1000) : cpp,once,context,entrypoint='SyncLockRGetUChar';
+  UTF8          GetOrLockUtf8(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000, UNSIGNED4 expire = 1000) : cpp,once,context,entrypoint='SyncLockRGetUtf8';
 
 
   Unlock(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000) : cpp,action,context,entrypoint='SyncLockRUnlock';
   Unlock(CONST VARSTRING key, CONST VARSTRING options, UNSIGNED database = 0, CONST VARSTRING password = '', UNSIGNED timeout = 1000) : cpp,action,context,entrypoint='SyncLockRUnlock';
 END;
 END;
@@ -85,9 +85,9 @@ EXPORT RedisServer(VARSTRING options, VARSTRING password = '', UNSIGNED timeout
   EXPORT   SetAndPublishString(VARSTRING key, STRING value,   UNSIGNED database = 0, UNSIGNED4 expire = 0) := redis.SetAndPublishString  (key, value, options, database, expire, password, timeout);
   EXPORT   SetAndPublishString(VARSTRING key, STRING value,   UNSIGNED database = 0, UNSIGNED4 expire = 0) := redis.SetAndPublishString  (key, value, options, database, expire, password, timeout);
   EXPORT     SetAndPublishUtf8(VARSTRING key, UTF8 value,     UNSIGNED database = 0, UNSIGNED4 expire = 0) := redis.SetAndPublishUtf8    (key, value, options, database, expire, password, timeout);
   EXPORT     SetAndPublishUtf8(VARSTRING key, UTF8 value,     UNSIGNED database = 0, UNSIGNED4 expire = 0) := redis.SetAndPublishUtf8    (key, value, options, database, expire, password, timeout);
 
 
-  EXPORT  GetOrLockUnicode(VARSTRING key, UNSIGNED database = 0) :=  redis.GetOrLockUnicode(key, options, database, password, timeout);
-  EXPORT   GetOrLockString(VARSTRING key, UNSIGNED database = 0) :=   redis.GetOrLockString(key, options, database, password, timeout);
-  EXPORT     GetOrLockUtf8(VARSTRING key, UNSIGNED database = 0) :=     redis.GetOrLockUtf8(key, options, database, password, timeout);
+  EXPORT  GetOrLockUnicode(VARSTRING key, UNSIGNED database = 0, UNSIGNED4 expire = 1000) :=  redis.GetOrLockUnicode(key, options, database, password, timeout, expire);
+  EXPORT   GetOrLockString(VARSTRING key, UNSIGNED database = 0, UNSIGNED4 expire = 1000) :=   redis.GetOrLockString(key, options, database, password, timeout, expire);
+  EXPORT     GetOrLockUtf8(VARSTRING key, UNSIGNED database = 0, UNSIGNED4 expire = 1000) :=     redis.GetOrLockUtf8(key, options, database, password, timeout, expire);
 
 
   EXPORT Unlock(VARSTRING key, UNSIGNED database = 0) := redis.Unlock(key, options, database, password, timeout);
   EXPORT Unlock(VARSTRING key, UNSIGNED database = 0) := redis.Unlock(key, options, database, password, timeout);
 END;
 END;

+ 25 - 25
plugins/redis/redis.cpp

@@ -52,7 +52,7 @@ static void * allocateAndCopy(const void * src, size_t size)
 static StringBuffer & appendExpire(StringBuffer & buffer, unsigned expire)
 static StringBuffer & appendExpire(StringBuffer & buffer, unsigned expire)
 {
 {
     if (expire > 0)
     if (expire > 0)
-        buffer.append(" EX ").append(expire/1000);
+        buffer.append(" PX ").append(expire);
     return buffer;
     return buffer;
 }
 }
 class Reply : public CInterface
 class Reply : public CInterface
@@ -102,7 +102,7 @@ public :
 
 
     //-------------------------------LOCKING------------------------------------------------
     //-------------------------------LOCKING------------------------------------------------
     void lockSet(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, unsigned expire);
     void lockSet(ICodeContext * ctx, const char * key, size32_t valueSize, const char * value, unsigned expire);
-    void lockGet(ICodeContext * ctx, const char * key, size_t & valueSize, char * & value, const char * password);
+    void lockGet(ICodeContext * ctx, const char * key, size_t & valueSize, char * & value, const char * password, unsigned expire);
     void unlock(ICodeContext * ctx, const char * key);
     void unlock(ICodeContext * ctx, const char * key);
     //--------------------------------------------------------------------------------------
     //--------------------------------------------------------------------------------------
 
 
@@ -134,10 +134,10 @@ protected :
 
 
     //-------------------------------LOCKING------------------------------------------------
     //-------------------------------LOCKING------------------------------------------------
     void handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire);
     void handleLockOnSet(ICodeContext * ctx, const char * key, const char * value, size_t size, unsigned expire);
-    void handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password);
+    void handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password, unsigned expire);
     void encodeChannel(StringBuffer & channel, const char * key) const;
     void encodeChannel(StringBuffer & channel, const char * key) const;
     bool noScript(const redisReply * reply) const;
     bool noScript(const redisReply * reply) const;
-    bool lock(ICodeContext * ctx, const char * key, const char * channel);
+    bool lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire);
     //--------------------------------------------------------------------------------------
     //--------------------------------------------------------------------------------------
 
 
 protected :
 protected :
@@ -455,7 +455,7 @@ void Connection::persist(ICodeContext * ctx, const char * key)
 }
 }
 void Connection::expire(ICodeContext * ctx, const char * key, unsigned _expire)
 void Connection::expire(ICodeContext * ctx, const char * key, unsigned _expire)
 {
 {
-    OwnedReply reply = Reply::createReply(redisCommand(context, "EXPIRE %b %u", key, strlen(key), _expire/1000));
+    OwnedReply reply = Reply::createReply(redisCommand(context, "PEXPIRE %b %u", key, strlen(key), _expire));
     assertOnCommandErrorWithKey(reply->query(), "Expire", key);
     assertOnCommandErrorWithKey(reply->query(), "Expire", key);
 }
 }
 bool Connection::exists(ICodeContext * ctx, const char * key)
 bool Connection::exists(ICodeContext * ctx, const char * key)
@@ -672,16 +672,16 @@ void Connection::lockSet(ICodeContext * ctx, const char * key, size32_t valueSiz
 }
 }
 //-------------------------------------------GET-----------------------------------------
 //-------------------------------------------GET-----------------------------------------
 //--OUTER--
 //--OUTER--
-void SyncLockRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, char * & returnValue, unsigned __int64 database, const char * password, unsigned __int64 _timeout)
+void SyncLockRGet(ICodeContext * ctx, const char * options, const char * key, size_t & returnSize, char * & returnValue, unsigned __int64 database, unsigned expire, const char * password, unsigned __int64 _timeout)
 {
 {
     Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
     Owned<Connection> master = Connection::createConnection(ctx, options, database, password, _timeout);
-    master->lockGet(ctx, key, returnSize, returnValue, password);
+    master->lockGet(ctx, key, returnSize, returnValue, password, expire);
 }
 }
 //--INNER--
 //--INNER--
-void Connection::lockGet(ICodeContext * ctx, const char * key, size_t & returnSize, char * & returnValue, const char * password)
+void Connection::lockGet(ICodeContext * ctx, const char * key, size_t & returnSize, char * & returnValue, const char * password, unsigned expire)
 {
 {
     MemoryAttr retVal;
     MemoryAttr retVal;
-    handleLockOnGet(ctx, key, &retVal, password);
+    handleLockOnGet(ctx, key, &retVal, password, expire);
     returnSize = retVal.length();
     returnSize = retVal.length();
     returnValue = reinterpret_cast<char*>(retVal.detach());
     returnValue = reinterpret_cast<char*>(retVal.detach());
 }
 }
@@ -690,10 +690,10 @@ void Connection::encodeChannel(StringBuffer & channel, const char * key) const
 {
 {
     channel.append(REDIS_LOCK_PREFIX).append("_").append(key).append("_").append(database);
     channel.append(REDIS_LOCK_PREFIX).append("_").append(key).append("_").append(database);
 }
 }
-bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel)
+bool Connection::lock(ICodeContext * ctx, const char * key, const char * channel, unsigned expire)
 {
 {
-    StringBuffer cmd("SET %b %b NX EX ");
-    cmd.append(timeout/1000);
+    StringBuffer cmd("SET %b %b NX PX ");
+    cmd.append(expire);
 
 
     OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), channel, strlen(channel)));
     OwnedReply reply = Reply::createReply(redisCommand(context, cmd.str(), key, strlen(key), channel, strlen(channel)));
     assertOnError(reply->query(), cmd.append(" of the key '").append(key).append("' failed"));
     assertOnError(reply->query(), cmd.append(" of the key '").append(key).append("' failed"));
@@ -727,7 +727,7 @@ void Connection::unlock(ICodeContext * ctx, const char * key)
     }
     }
     //If the above is aborted, let the lock expire.
     //If the above is aborted, let the lock expire.
 }
 }
-void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password)
+void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAttr * retVal, const char * password, unsigned expire)
 {
 {
     //NOTE: This routine can only return an empty string under one condition, that which indicates to the caller that the key was successfully locked.
     //NOTE: This routine can only return an empty string under one condition, that which indicates to the caller that the key was successfully locked.
 
 
@@ -735,7 +735,7 @@ void Connection::handleLockOnGet(ICodeContext * ctx, const char * key, MemoryAtt
     encodeChannel(channel, key);
     encodeChannel(channel, key);
 
 
     //Query key and set lock if non existent
     //Query key and set lock if non existent
-    if (lock(ctx, key, channel.str()))
+    if (lock(ctx, key, channel.str(), expire))
         return;
         return;
 
 
 #if(0)//Test empty string handling by deleting the lock/value, and thus GET returns REDIS_REPLY_NIL as the reply type and an empty string.
 #if(0)//Test empty string handling by deleting the lock/value, and thus GET returns REDIS_REPLY_NIL as the reply type and an empty string.
@@ -838,18 +838,18 @@ void Connection::handleLockOnSet(ICodeContext * ctx, const char * key, const cha
             replyContainer->setClear((redisReply*)redisCommand(context, "EVALSHA %b %d %b %b %b", luaScriptSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
             replyContainer->setClear((redisReply*)redisCommand(context, "EVALSHA %b %d %b %b %b", luaScriptSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
             if (noScript(replyContainer->query()))
             if (noScript(replyContainer->query()))
             {
             {
-                const char * luaScript = "redis.call('SET', KEYS[1], ARGV[2]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";
+                const char * luaScript = "redis.call('SET', KEYS[1], ARGV[2]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";//NOTE: MUST update luaScriptSHA1 if luaScript is updated!
                 replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b", luaScript, strlen(luaScript), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
                 replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b", luaScript, strlen(luaScript), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size));
             }
             }
         }
         }
         else
         else
         {
         {
-            const char * luaScriptWithExpireSHA1 = "c68d1706d7dc6342d5fc1d651e238931bd75320d"; //NOTE: update this if luaScriptWithExpire is updated!
-            replyContainer->setClear((redisReply*)redisCommand(context, "EVALSHA %b %d %b %b %b %d", luaScriptWithExpireSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire/1000));
+            const char * luaScriptWithExpireSHA1 = "6f6bc88ccea7c6853ccc395eaa7abd8cb91fb2d8"; //NOTE: update this if luaScriptWithExpire is updated!
+            replyContainer->setClear((redisReply*)redisCommand(context, "EVALSHA %b %d %b %b %b %d", luaScriptWithExpireSHA1, (size_t)40, 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
             if (noScript(replyContainer->query()))
             if (noScript(replyContainer->query()))
             {
             {
-                const char * luaScriptWithExpire = "redis.call('SET', KEYS[1], ARGV[2], 'EX', ARGV[3]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";
-                replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b %d", luaScriptWithExpire, strlen(luaScriptWithExpire), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire/1000));
+                const char * luaScriptWithExpire = "redis.call('SET', KEYS[1], ARGV[2], 'PX', ARGV[3]) redis.call('PUBLISH', ARGV[1], ARGV[2]) return";//NOTE: MUST update luaScriptWithExpireSHA1 if luaScriptWithExpire is updated!
+                replyContainer->setClear((redisReply*)redisCommand(context, "EVAL %b %d %b %b %b %d", luaScriptWithExpire, strlen(luaScriptWithExpire), 1, key, strlen(key), channel.str(), (size_t)channel.length(), value, size, expire));
             }
             }
         }
         }
         assertOnCommandErrorWithKey(replyContainer->query(), "SET", key);
         assertOnCommandErrorWithKey(replyContainer->query(), "SET", key);
@@ -916,24 +916,24 @@ ECL_REDIS_API void ECL_REDIS_CALL SyncLockRSetUtf8(ICodeContext * ctx, size32_t
     returnValue = (char*)allocateAndCopy(value, valueSize);
     returnValue = (char*)allocateAndCopy(value, valueSize);
 }
 }
 //-------------------------------------GET----------------------------------------
 //-------------------------------------GET----------------------------------------
-ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetStr(ICodeContext * ctx, size32_t & returnSize, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout, unsigned expire)
 {
 {
     size_t _returnSize;
     size_t _returnSize;
-    SyncLockRGet(ctx, options, key, _returnSize, returnValue, database, password, timeout);
+    SyncLockRGet(ctx, options, key, _returnSize, returnValue, database, expire, password, timeout);
     returnSize = static_cast<size32_t>(_returnSize);
     returnSize = static_cast<size32_t>(_returnSize);
 }
 }
-ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue,  const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUChar(ICodeContext * ctx, size32_t & returnLength, UChar * & returnValue,  const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout, unsigned expire)
 {
 {
     size_t returnSize;
     size_t returnSize;
     char  * _returnValue;
     char  * _returnValue;
-    SyncLockRGet(ctx, options, key, returnSize, _returnValue, database, password, timeout);
+    SyncLockRGet(ctx, options, key, returnSize, _returnValue, database, expire, password, timeout);
     returnValue = (UChar*)_returnValue;
     returnValue = (UChar*)_returnValue;
     returnLength = static_cast<size32_t>(returnSize/sizeof(UChar));
     returnLength = static_cast<size32_t>(returnSize/sizeof(UChar));
 }
 }
-ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
+ECL_REDIS_API void ECL_REDIS_CALL SyncLockRGetUtf8(ICodeContext * ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout, unsigned expire)
 {
 {
     size_t returnSize;
     size_t returnSize;
-    SyncLockRGet(ctx, options, key, returnSize, returnValue, database, password, timeout);
+    SyncLockRGet(ctx, options, key, returnSize, returnValue, database, expire, password, timeout);
     returnLength = static_cast<size32_t>(rtlUtf8Length(returnSize, returnValue));
     returnLength = static_cast<size32_t>(rtlUtf8Length(returnSize, returnValue));
 }
 }
 ECL_REDIS_API void ECL_REDIS_CALL SyncLockRUnlock(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)
 ECL_REDIS_API void ECL_REDIS_CALL SyncLockRUnlock(ICodeContext * ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout)

+ 3 - 3
plugins/redis/redis.hpp

@@ -74,9 +74,9 @@ namespace RedisPlugin {
     ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRSetStr  (ICodeContext * _ctx, size32_t & returnLength, char * & returnValue, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned timeout);
     ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRSetStr  (ICodeContext * _ctx, size32_t & returnLength, char * & returnValue, const char * key, size32_t valueLength, const char * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned timeout);
     ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRSetUChar(ICodeContext * _ctx, size32_t & returnLength, UChar * & returnValue, const char * key, size32_t valueLength, const UChar * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned timeout);
     ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRSetUChar(ICodeContext * _ctx, size32_t & returnLength, UChar * & returnValue, const char * key, size32_t valueLength, const UChar * value, const char * options, unsigned __int64 database, unsigned expire, const char * pswd, unsigned timeout);
     //--------------------------GET----------------------------------------
     //--------------------------GET----------------------------------------
-    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRGetUtf8  (ICodeContext * _ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
-    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRGetStr   (ICodeContext * _ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
-    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRGetUChar (ICodeContext * _ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout);
+    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRGetUtf8  (ICodeContext * _ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout, unsigned expire);
+    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRGetStr   (ICodeContext * _ctx, size32_t & returnLength, char * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout, unsigned expire);
+    ECL_REDIS_API void             ECL_REDIS_CALL SyncLockRGetUChar (ICodeContext * _ctx, size32_t & returnLength, UChar * & returnValue, const char * key, const char * options, unsigned __int64 database, const char * pswd, unsigned timeout, unsigned expire);
 
 
     ECL_REDIS_API bool ECL_REDIS_CALL SyncLockRMissThenLock(ICodeContext * _ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout);
     ECL_REDIS_API bool ECL_REDIS_CALL SyncLockRMissThenLock(ICodeContext * _ctx, const char * key, const char * options, unsigned __int64 database, const char * password, unsigned __int64 timeout);
 }
 }

+ 9 - 9
roxie/ccd/ccdactivities.cpp

@@ -989,7 +989,7 @@ public:
         {
         {
             bool isOpt = (helper->getFlags() & TDRoptional) != 0;
             bool isOpt = (helper->getFlags() & TDRoptional) != 0;
             OwnedRoxieString fileName(helper->getFileName());
             OwnedRoxieString fileName(helper->getFileName());
-            datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit()));
+            datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
             if (datafile)
             if (datafile)
             {
             {
                 unsigned channel = queryFactory.queryChannel();
                 unsigned channel = queryFactory.queryChannel();
@@ -3075,7 +3075,7 @@ public:
         {
         {
             bool isOpt = (helper->getFlags() & TIRoptional) != 0;
             bool isOpt = (helper->getFlags() & TIRoptional) != 0;
             OwnedRoxieString indexName(helper->getFileName());
             OwnedRoxieString indexName(helper->getFileName());
-            datafile.setown(queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, queryFactory.queryWorkUnit()));
+            datafile.setown(queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, queryFactory.queryWorkUnit(), true));
             if (datafile)
             if (datafile)
                 keyArray.setown(datafile->getKeyArray(activityMeta, layoutTranslators, isOpt, queryFactory.queryChannel(), queryFactory.queryOptions().enableFieldTranslation));
                 keyArray.setown(datafile->getKeyArray(activityMeta, layoutTranslators, isOpt, queryFactory.queryChannel(), queryFactory.queryOptions().enableFieldTranslation));
         }
         }
@@ -4276,7 +4276,7 @@ public:
         {
         {
             bool isOpt = (fetchContext->getFetchFlags() & FFdatafileoptional) != 0;
             bool isOpt = (fetchContext->getFetchFlags() & FFdatafileoptional) != 0;
             OwnedRoxieString fname(fetchContext->getFileName());
             OwnedRoxieString fname(fetchContext->getFileName());
-            datafile.setown(_queryFactory.queryPackage().lookupFileName(fname, isOpt, true, true, _queryFactory.queryWorkUnit()));
+            datafile.setown(_queryFactory.queryPackage().lookupFileName(fname, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
             if (datafile)
             if (datafile)
                 fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
                 fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
         }
         }
@@ -4626,7 +4626,7 @@ public:
         {
         {
             bool isOpt = (helper->getJoinFlags() & JFindexoptional) != 0;
             bool isOpt = (helper->getJoinFlags() & JFindexoptional) != 0;
             OwnedRoxieString indexFileName(helper->getIndexFileName());
             OwnedRoxieString indexFileName(helper->getIndexFileName());
-            datafile.setown(_queryFactory.queryPackage().lookupFileName(indexFileName, isOpt, true, true, _queryFactory.queryWorkUnit()));
+            datafile.setown(_queryFactory.queryPackage().lookupFileName(indexFileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
             if (datafile)
             if (datafile)
                 keyArray.setown(datafile->getKeyArray(activityMeta, layoutTranslators, isOpt, queryFactory.queryChannel(), queryFactory.queryOptions().enableFieldTranslation));
                 keyArray.setown(datafile->getKeyArray(activityMeta, layoutTranslators, isOpt, queryFactory.queryChannel(), queryFactory.queryOptions().enableFieldTranslation));
         }
         }
@@ -4967,7 +4967,7 @@ public:
         {
         {
             bool isOpt = (helper->getFetchFlags() & FFdatafileoptional) != 0;
             bool isOpt = (helper->getFetchFlags() & FFdatafileoptional) != 0;
             OwnedRoxieString fileName(helper->getFileName());
             OwnedRoxieString fileName(helper->getFileName());
-            datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit()));
+            datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
             if (datafile)
             if (datafile)
                 fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
                 fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
         }
         }
@@ -5298,17 +5298,17 @@ public:
             {
             {
                 const char *fileName = queryNodeFileName(_graphNode, kind);
                 const char *fileName = queryNodeFileName(_graphNode, kind);
                 const char *indexName = queryNodeIndexName(_graphNode, kind);
                 const char *indexName = queryNodeIndexName(_graphNode, kind);
-                if (indexName && !allFilesDynamic)
+                if (indexName && !allFilesDynamic && !queryFactory.isDynamic())
                 {
                 {
                     bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isIndexOpt']/@value");
                     bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isIndexOpt']/@value");
-                    indexfile.setown(_queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, _queryFactory.queryWorkUnit()));
+                    indexfile.setown(_queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
                     if (indexfile)
                     if (indexfile)
                         keyArray.setown(indexfile->getKeyArray(NULL, &layoutTranslators, isOpt, queryFactory.queryChannel(), queryFactory.queryOptions().enableFieldTranslation));
                         keyArray.setown(indexfile->getKeyArray(NULL, &layoutTranslators, isOpt, queryFactory.queryChannel(), queryFactory.queryOptions().enableFieldTranslation));
                 }
                 }
-                if (fileName && !allFilesDynamic)
+                if (fileName && !allFilesDynamic && !queryFactory.isDynamic())
                 {
                 {
                     bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isOpt']/@value");
                     bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isOpt']/@value");
-                    datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit()));
+                    datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
                     if (datafile)
                     if (datafile)
                         fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
                         fileArray.setown(datafile->getIFileIOArray(isOpt, queryFactory.queryChannel()));
                 }
                 }

+ 2 - 2
roxie/ccd/ccdcontext.cpp

@@ -3428,11 +3428,11 @@ public:
     {
     {
         CriticalBlock b(contextCrit);
         CriticalBlock b(contextCrit);
         StringBuffer expandedName;
         StringBuffer expandedName;
-        expandLogicalFilename(expandedName, fileName, workUnit, false, !workUnit);
+        expandLogicalFilename(expandedName, fileName, workUnit, false, false);
         Linked<const IResolvedFile> ret = fileCache.getValue(expandedName);
         Linked<const IResolvedFile> ret = fileCache.getValue(expandedName);
         if (!ret)
         if (!ret)
         {
         {
-            ret.setown(factory->queryPackage().lookupFileName(fileName, isOpt, false, false, workUnit));
+            ret.setown(factory->queryPackage().lookupFileName(fileName, isOpt, false, false, workUnit, false));
             if (ret)
             if (ret)
             {
             {
                 IResolvedFile *add = const_cast<IResolvedFile *>(ret.get());
                 IResolvedFile *add = const_cast<IResolvedFile *>(ret.get());

+ 2 - 2
roxie/ccd/ccdquery.cpp

@@ -1122,7 +1122,7 @@ public:
                                     if (indexName)
                                     if (indexName)
                                     {
                                     {
                                         bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isIndexOpt']/@value");
                                         bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isIndexOpt']/@value");
-                                        const IResolvedFile *indexFile = package.lookupFileName(indexName, isOpt, true, true, wu);
+                                        const IResolvedFile *indexFile = package.lookupFileName(indexName, isOpt, true, true, wu, true);
                                         if (indexFile)
                                         if (indexFile)
                                         {
                                         {
                                             hashValue = indexFile->addHash64(hashValue);
                                             hashValue = indexFile->addHash64(hashValue);
@@ -1136,7 +1136,7 @@ public:
                                         if (!node.getPropBool("att[@name='_isSpill']/@value") && !node.getPropBool("att[@name='_isSpillGlobal']/@value"))
                                         if (!node.getPropBool("att[@name='_isSpill']/@value") && !node.getPropBool("att[@name='_isSpillGlobal']/@value"))
                                         {
                                         {
                                             bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isOpt']/@value");
                                             bool isOpt = pretendAllOpt || node.getPropBool("att[@name='_isOpt']/@value");
-                                            const IResolvedFile *dataFile = package.lookupFileName(fileName, isOpt, true, true, wu);
+                                            const IResolvedFile *dataFile = package.lookupFileName(fileName, isOpt, true, true, wu, true);
                                             if (dataFile)
                                             if (dataFile)
                                             {
                                             {
                                                 hashValue = dataFile->addHash64(hashValue);
                                                 hashValue = dataFile->addHash64(hashValue);

+ 14 - 14
roxie/ccd/ccdserver.cpp

@@ -20648,7 +20648,7 @@ public:
         {
         {
             bool isOpt = (helper->getFlags() & TDRoptional) != 0;
             bool isOpt = (helper->getFlags() & TDRoptional) != 0;
             OwnedRoxieString fileName(helper->getFileName());
             OwnedRoxieString fileName(helper->getFileName());
-            datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit()));
+            datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, _queryFactory.queryWorkUnit(), true));
             bool isSimple = (datafile && datafile->getNumParts()==1 && !_queryFactory.queryOptions().disableLocalOptimizations);
             bool isSimple = (datafile && datafile->getNumParts()==1 && !_queryFactory.queryOptions().disableLocalOptimizations);
             if (isLocal || isSimple)
             if (isLocal || isSimple)
             {
             {
@@ -20726,7 +20726,7 @@ public:
             if ((helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) == 0)
             if ((helper->getFlags() & (TDXvarfilename|TDXdynamicfilename)) == 0)
             {
             {
                 OwnedRoxieString fileName(helper->getFileName());
                 OwnedRoxieString fileName(helper->getFileName());
-                Owned<const IResolvedFile> temp = queryFactory.queryPackage().lookupFileName(fileName, true, true, false, queryFactory.queryWorkUnit());
+                Owned<const IResolvedFile> temp = queryFactory.queryPackage().lookupFileName(fileName, true, true, false, queryFactory.queryWorkUnit(), true);
                 if (temp)
                 if (temp)
                     addXrefFileInfo(reply, temp);
                     addXrefFileInfo(reply, temp);
             }
             }
@@ -21805,7 +21805,7 @@ public:
         {
         {
             bool isOpt = (flags & TIRoptional) != 0;
             bool isOpt = (flags & TIRoptional) != 0;
             OwnedRoxieString indexName(indexHelper->getFileName());
             OwnedRoxieString indexName(indexHelper->getFileName());
-            indexfile.setown(queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, queryFactory.queryWorkUnit()));
+            indexfile.setown(queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, queryFactory.queryWorkUnit(), true));
             if (indexfile)
             if (indexfile)
                 keySet.setown(indexfile->getKeyArray(activityMeta, translatorArray, isOpt, isLocal ? queryFactory.queryChannel() : 0, enableFieldTranslation));
                 keySet.setown(indexfile->getKeyArray(activityMeta, translatorArray, isOpt, isLocal ? queryFactory.queryChannel() : 0, enableFieldTranslation));
         }
         }
@@ -21838,7 +21838,7 @@ public:
             if ((indexHelper->getFlags() & (TIRvarfilename|TIRdynamicfilename)) == 0)
             if ((indexHelper->getFlags() & (TIRvarfilename|TIRdynamicfilename)) == 0)
             {
             {
                 OwnedRoxieString indexName(indexHelper->getFileName());
                 OwnedRoxieString indexName(indexHelper->getFileName());
-                Owned<const IResolvedFile> temp = queryFactory.queryPackage().lookupFileName(indexName, true, true, false, queryFactory.queryWorkUnit());
+                Owned<const IResolvedFile> temp = queryFactory.queryPackage().lookupFileName(indexName, true, true, false, queryFactory.queryWorkUnit(), true);
                 if (temp)
                 if (temp)
                     addXrefFileInfo(reply, temp);
                     addXrefFileInfo(reply, temp);
             }
             }
@@ -22814,7 +22814,7 @@ public:
             datafile.setown(_queryFactory.queryPackage().lookupFileName(fname,
             datafile.setown(_queryFactory.queryPackage().lookupFileName(fname,
                                                                         (fetchContext->getFetchFlags() & FFdatafileoptional) != 0,
                                                                         (fetchContext->getFetchFlags() & FFdatafileoptional) != 0,
                                                                         true, true,
                                                                         true, true,
-                                                                        _queryFactory.queryWorkUnit()));
+                                                                        _queryFactory.queryWorkUnit(), true));
             if (datafile)
             if (datafile)
                 map.setown(datafile->getFileMap());
                 map.setown(datafile->getFileMap());
         }
         }
@@ -22837,7 +22837,7 @@ public:
             if ((fetchContext->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) == 0)
             if ((fetchContext->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) == 0)
             {
             {
                 OwnedRoxieString fileName(fetchContext->getFileName());
                 OwnedRoxieString fileName(fetchContext->getFileName());
-                Owned<const IResolvedFile> temp = queryFactory.queryPackage().lookupFileName(fileName, true, true, false, queryFactory.queryWorkUnit());
+                Owned<const IResolvedFile> temp = queryFactory.queryPackage().lookupFileName(fileName, true, true, false, queryFactory.queryWorkUnit(), true);
                 if (temp)
                 if (temp)
                     addXrefFileInfo(reply, temp);
                     addXrefFileInfo(reply, temp);
             }
             }
@@ -22880,17 +22880,17 @@ public:
             {
             {
                 fileName.set(queryNodeFileName(_graphNode, kind));
                 fileName.set(queryNodeFileName(_graphNode, kind));
                 indexName.set(queryNodeIndexName(_graphNode, kind));
                 indexName.set(queryNodeIndexName(_graphNode, kind));
-                if (indexName && !allFilesDynamic)
+                if (indexName && !allFilesDynamic && !queryFactory.isDynamic())
                 {
                 {
                     bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isIndexOpt']/@value");
                     bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isIndexOpt']/@value");
-                    indexfile.setown(queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, queryFactory.queryWorkUnit()));
+                    indexfile.setown(queryFactory.queryPackage().lookupFileName(indexName, isOpt, true, true, queryFactory.queryWorkUnit(), true));
                     if (indexfile)
                     if (indexfile)
                         keySet.setown(indexfile->getKeyArray(NULL, &layoutTranslators, isOpt, isLocal ? queryFactory.queryChannel() : 0, false));
                         keySet.setown(indexfile->getKeyArray(NULL, &layoutTranslators, isOpt, isLocal ? queryFactory.queryChannel() : 0, false));
                 }
                 }
-                if (fileName && !allFilesDynamic)
+                if (fileName && !allFilesDynamic && !queryFactory.isDynamic())
                 {
                 {
                     bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isOpt']/@value");
                     bool isOpt = pretendAllOpt || _graphNode.getPropBool("att[@name='_isOpt']/@value");
-                    datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, queryFactory.queryWorkUnit()));
+                    datafile.setown(_queryFactory.queryPackage().lookupFileName(fileName, isOpt, true, true, queryFactory.queryWorkUnit(), true));
                     if (datafile)
                     if (datafile)
                     {
                     {
                         if (isLocal)
                         if (isLocal)
@@ -22924,13 +22924,13 @@ public:
             Owned<const IResolvedFile> temp;
             Owned<const IResolvedFile> temp;
             if (fileName.length())
             if (fileName.length())
             {
             {
-                temp.setown(queryFactory.queryPackage().lookupFileName(fileName, true, true, false, queryFactory.queryWorkUnit()));
+                temp.setown(queryFactory.queryPackage().lookupFileName(fileName, true, true, false, queryFactory.queryWorkUnit(), true));
                 if (temp)
                 if (temp)
                     addXrefFileInfo(reply, temp);
                     addXrefFileInfo(reply, temp);
             }
             }
             if (indexName.length())
             if (indexName.length())
             {
             {
-                temp.setown(queryFactory.queryPackage().lookupFileName(indexName, true, true, false, queryFactory.queryWorkUnit()));
+                temp.setown(queryFactory.queryPackage().lookupFileName(indexName, true, true, false, queryFactory.queryWorkUnit(), true));
                 if (temp)
                 if (temp)
                     addXrefFileInfo(reply, temp);
                     addXrefFileInfo(reply, temp);
             }
             }
@@ -24442,7 +24442,7 @@ public:
         {
         {
             bool isOpt = (joinFlags & JFindexoptional) != 0;
             bool isOpt = (joinFlags & JFindexoptional) != 0;
             OwnedRoxieString indexFileName(helper->getIndexFileName());
             OwnedRoxieString indexFileName(helper->getIndexFileName());
-            indexfile.setown(queryFactory.queryPackage().lookupFileName(indexFileName, isOpt, true, true, queryFactory.queryWorkUnit()));
+            indexfile.setown(queryFactory.queryPackage().lookupFileName(indexFileName, isOpt, true, true, queryFactory.queryWorkUnit(), true));
             if (indexfile)
             if (indexfile)
                 keySet.setown(indexfile->getKeyArray(activityMeta, translatorArray, isOpt, isLocal ? queryFactory.queryChannel() : 0, enableFieldTranslation));
                 keySet.setown(indexfile->getKeyArray(activityMeta, translatorArray, isOpt, isLocal ? queryFactory.queryChannel() : 0, enableFieldTranslation));
         }
         }
@@ -24461,7 +24461,7 @@ public:
         if (!isHalfKeyed && !variableFetchFileName)
         if (!isHalfKeyed && !variableFetchFileName)
         {
         {
             bool isFetchOpt = (helper->getFetchFlags() & FFdatafileoptional) != 0;
             bool isFetchOpt = (helper->getFetchFlags() & FFdatafileoptional) != 0;
-            datafile.setown(_queryFactory.queryPackage().lookupFileName(queryNodeFileName(_graphNode, _kind), isFetchOpt, true, true, _queryFactory.queryWorkUnit()));
+            datafile.setown(_queryFactory.queryPackage().lookupFileName(queryNodeFileName(_graphNode, _kind), isFetchOpt, true, true, _queryFactory.queryWorkUnit(), true));
             if (datafile)
             if (datafile)
             {
             {
                 if (isLocal)
                 if (isLocal)

+ 3 - 3
roxie/ccd/ccdstate.cpp

@@ -614,7 +614,7 @@ protected:
                 const char *name = super.queryProp("@id");
                 const char *name = super.queryProp("@id");
                 if (name)
                 if (name)
                 {
                 {
-                    const IResolvedFile *resolved = lookupFileName(name, false, true, true, NULL);
+                    const IResolvedFile *resolved = lookupFileName(name, false, true, true, NULL, true);
                     if (resolved)
                     if (resolved)
                     {
                     {
                         files.append(*const_cast<IResolvedFile *>(resolved));
                         files.append(*const_cast<IResolvedFile *>(resolved));
@@ -666,10 +666,10 @@ public:
         return lookupElements(xpath.str(), "MemIndex");
         return lookupElements(xpath.str(), "MemIndex");
     }
     }
 
 
-    virtual const IResolvedFile *lookupFileName(const char *_fileName, bool opt, bool useCache, bool cacheResult, IConstWorkUnit *wu) const
+    virtual const IResolvedFile *lookupFileName(const char *_fileName, bool opt, bool useCache, bool cacheResult, IConstWorkUnit *wu, bool ignoreForeignPrefix) const
     {
     {
         StringBuffer fileName;
         StringBuffer fileName;
-        expandLogicalFilename(fileName, _fileName, wu, false, !wu);
+        expandLogicalFilename(fileName, _fileName, wu, false, ignoreForeignPrefix);
         if (traceLevel > 5)
         if (traceLevel > 5)
             DBGLOG("lookupFileName %s", fileName.str());
             DBGLOG("lookupFileName %s", fileName.str());
 
 

+ 1 - 1
roxie/ccd/ccdstate.hpp

@@ -55,7 +55,7 @@ extern const IRoxiePackageMap &queryEmptyRoxiePackageMap();
 interface IRoxiePackage : public IHpccPackage
 interface IRoxiePackage : public IHpccPackage
 {
 {
     // Lookup information in package to resolve existing logical file name
     // Lookup information in package to resolve existing logical file name
-    virtual const IResolvedFile *lookupFileName(const char *fileName, bool opt, bool useCache, bool cacheResults, IConstWorkUnit *wu) const = 0;
+    virtual const IResolvedFile *lookupFileName(const char *fileName, bool opt, bool useCache, bool cacheResults, IConstWorkUnit *wu, bool ignoreForeignPrefix) const = 0;
     // Lookup information in package to create new logical file name
     // Lookup information in package to create new logical file name
     virtual IRoxieWriteHandler *createFileName(const char *fileName, bool overwrite, bool extend, const StringArray &clusters, IConstWorkUnit *wu) const = 0;
     virtual IRoxieWriteHandler *createFileName(const char *fileName, bool overwrite, bool extend, const StringArray &clusters, IConstWorkUnit *wu) const = 0;
     // Lookup information in package about what in-memory indexes should be built for file
     // Lookup information in package about what in-memory indexes should be built for file

+ 1 - 1
system/mp/mpcomm.cpp

@@ -1816,7 +1816,7 @@ CMPConnectThread::CMPConnectThread(CMPServer *_parent, unsigned port)
             WARNLOG("MP: kernel listen queue backlog setting (somaxconn=%d) is lower than environment mpSoMaxConn (%d) setting and should be increased", kernSoMaxConn, mpSoMaxConn);
             WARNLOG("MP: kernel listen queue backlog setting (somaxconn=%d) is lower than environment mpSoMaxConn (%d) setting and should be increased", kernSoMaxConn, mpSoMaxConn);
     }
     }
     if (!mpSoMaxConn)
     if (!mpSoMaxConn)
-        mpSoMaxConn = SOMAXCONN;
+        mpSoMaxConn = DEFAULT_LISTEN_QUEUE_SIZE;
     if (!port)
     if (!port)
     {
     {
         // need to connect early to resolve clash
         // need to connect early to resolve clash

+ 7 - 1
testing/regress/ecl/key/streame.xml

@@ -15,5 +15,11 @@
  <Row><name>Generate:</name><value>9</value></Row>
  <Row><name>Generate:</name><value>9</value></Row>
 </Dataset>
 </Dataset>
 <Dataset name='Result 3'>
 <Dataset name='Result 3'>
- <Row><Result_3>Yo</Result_3></Row>
+ <Row><Result_3>500</Result_3></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><Result_4>499</Result_4></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><Result_5>Yo</Result_5></Row>
 </Dataset>
 </Dataset>

+ 1 - 1
testing/regress/ecl/redislockingtest.ecl

@@ -114,7 +114,7 @@ SEQUENTIAL(
 //Test unlock
 //Test unlock
 SEQUENTIAL(
 SEQUENTIAL(
     myRedis.FlushDB(),
     myRedis.FlushDB(),
-    myRedis.GetOrLockString('testlock'),/*by default lock expires after 1s*/
+    myRedis.GetOrLockString('testlock',, 1000),
     myRedis.Exists('testlock'),
     myRedis.Exists('testlock'),
     Std.System.Debug.Sleep(2000),
     Std.System.Debug.Sleep(2000),
     myRedis.Exists('testlock'),
     myRedis.Exists('testlock'),

+ 2 - 2
testing/regress/ecl/redissynctest.ecl

@@ -115,13 +115,13 @@ sleep(INTEGER duration) := Std.System.Debug.Sleep(duration * 1000);
 
 
 SEQUENTIAL(
 SEQUENTIAL(
     myRedis.Exists('str2'),
     myRedis.Exists('str2'),
-    myRedis.Expire('str2', , 1000),/*\ms*/
+    myRedis.Expire('str2', , 900),/*\ms*/
     sleep(2),
     sleep(2),
     myRedis.Exists('str2'),
     myRedis.Exists('str2'),
 
 
     myRedis.SetString('str3', str),
     myRedis.SetString('str3', str),
     myRedis.Exists('str3'),
     myRedis.Exists('str3'),
-    myRedis.Expire('str3', , 1000),/*\ms*/
+    myRedis.Expire('str3', , 900),/*\ms*/
     myRedis.Persist('str3'),
     myRedis.Persist('str3'),
     sleep(2),
     sleep(2),
     myRedis.Exists('str3')
     myRedis.Exists('str3')

+ 5 - 0
testing/regress/ecl/streame.ecl

@@ -67,6 +67,11 @@ ENDEMBED;
 output(streamedNames(d'AA', u'là'));
 output(streamedNames(d'AA', u'là'));
 output (testGenerator(10));
 output (testGenerator(10));
 
 
+// Test what happens when two threads pull from a generator
+c := testGenerator(1000);
+count(c(value < 500));
+count(c(value > 500));
+
 // Test Python code returning named tuples
 // Test Python code returning named tuples
 childrec tnamed(string s) := EMBED(Python)
 childrec tnamed(string s) := EMBED(Python)
   import collections;
   import collections;

+ 58 - 17
thorlcr/thorutil/thmem.cpp

@@ -167,6 +167,7 @@ protected:
     unsigned spillPriority;
     unsigned spillPriority;
     CThorSpillableRowArray rows;
     CThorSpillableRowArray rows;
     OwnedIFile spillFile;
     OwnedIFile spillFile;
+    bool mmRegistered;
 
 
     bool spillRows()
     bool spillRows()
     {
     {
@@ -185,9 +186,21 @@ protected:
         rows.kill(); // no longer needed, readers will pull from spillFile. NB: ok to kill array as rows is never written to or expanded
         rows.kill(); // no longer needed, readers will pull from spillFile. NB: ok to kill array as rows is never written to or expanded
         return true;
         return true;
     }
     }
-    void clearSpillingCallback()
+    inline void addSpillingCallback()
+    {
+        if (!mmRegistered)
+        {
+            mmRegistered = true;
+            activity.queryJob().queryRowManager()->addRowBuffer(this);
+        }
+    }
+    inline void clearSpillingCallback()
     {
     {
-        activity.queryJob().queryRowManager()->removeRowBuffer(this);
+        if (mmRegistered)
+        {
+            mmRegistered = false;
+            activity.queryJob().queryRowManager()->removeRowBuffer(this);
+        }
     }
     }
 public:
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
@@ -195,9 +208,10 @@ public:
     CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriorirty)
     CSpillableStreamBase(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriorirty)
         : activity(_activity), rowIf(_rowIf), rows(_activity, _rowIf, _preserveNulls), preserveNulls(_preserveNulls), spillPriority(_spillPriorirty)
         : activity(_activity), rowIf(_rowIf), rows(_activity, _rowIf, _preserveNulls), preserveNulls(_preserveNulls), spillPriority(_spillPriorirty)
     {
     {
-    	assertex(inRows.isFlushed());
+        assertex(inRows.isFlushed());
         rows.swap(inRows);
         rows.swap(inRows);
         useCompression = false;
         useCompression = false;
+        mmRegistered = false;
     }
     }
     ~CSpillableStreamBase()
     ~CSpillableStreamBase()
     {
     {
@@ -213,9 +227,30 @@ public:
     }
     }
     virtual bool freeBufferedRows(bool critical)
     virtual bool freeBufferedRows(bool critical)
     {
     {
+        if (spillFile) // i.e. if spilt already. NB: this is thread-safe, as 'spillFile' only set by spillRows() call below and can't be called on multiple threads concurrently.
+            return false;
         CThorArrayLockBlock block(rows);
         CThorArrayLockBlock block(rows);
         return spillRows();
         return spillRows();
     }
     }
+friend class CRowsLockBlock;
+};
+
+class CRowsLockBlock
+{
+    CSpillableStreamBase &owner;
+public:
+    inline CRowsLockBlock(CSpillableStreamBase &_owner) : owner(_owner)
+    {
+        owner.rows.lock();
+        clearCB = false;
+    }
+    inline ~CRowsLockBlock()
+    {
+        owner.rows.unlock();
+        if (clearCB)
+            owner.clearSpillingCallback();
+    }
+    bool clearCB;
 };
 };
 
 
 // NB: Shared/spillable, holds all rows in mem until needs to spill.
 // NB: Shared/spillable, holds all rows in mem until needs to spill.
@@ -248,10 +283,10 @@ class CSharedSpillableRowSet : public CSpillableStreamBase, implements IInterfac
         {
         {
             if (spillStream)
             if (spillStream)
                 return spillStream->nextRow();
                 return spillStream->nextRow();
-            CThorArrayLockBlock block(owner->rows);
+            CRowsLockBlock block(*owner);
             if (owner->spillFile) // i.e. has spilt
             if (owner->spillFile) // i.e. has spilt
             {
             {
-                owner->clearSpillingCallback();
+                block.clearCB = true;
                 assertex(((offset_t)-1) != outputOffset);
                 assertex(((offset_t)-1) != outputOffset);
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 if (owner->preserveNulls)
                 if (owner->preserveNulls)
@@ -289,16 +324,16 @@ public:
     CSharedSpillableRowSet(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriority)
     CSharedSpillableRowSet(CActivityBase &_activity, CThorSpillableRowArray &inRows, IRowInterfaces *_rowIf, bool _preserveNulls, unsigned _spillPriority)
         : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls, _spillPriority)
         : CSpillableStreamBase(_activity, inRows, _rowIf, _preserveNulls, _spillPriority)
     {
     {
-        activity.queryJob().queryRowManager()->addRowBuffer(this);
+        addSpillingCallback();
     }
     }
     IRowStream *createRowStream()
     IRowStream *createRowStream()
     {
     {
         {
         {
             // already spilled?
             // already spilled?
-            CThorArrayLockBlock block(rows);
+            CRowsLockBlock block(*this);
             if (spillFile)
             if (spillFile)
             {
             {
-                clearSpillingCallback();
+                block.clearCB = true;
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 if (preserveNulls)
                 if (preserveNulls)
                     rwFlags |= rw_grouped;
                     rwFlags |= rw_grouped;
@@ -329,7 +364,7 @@ public:
         // a small amount of rows to read from swappable rows
         // a small amount of rows to read from swappable rows
         roxiemem::IRowManager *rowManager = activity.queryJob().queryRowManager();
         roxiemem::IRowManager *rowManager = activity.queryJob().queryRowManager();
         readRows = static_cast<const void * *>(rowManager->allocate(granularity * sizeof(void*), activity.queryContainer().queryId(), inRows.queryDefaultMaxSpillCost()));
         readRows = static_cast<const void * *>(rowManager->allocate(granularity * sizeof(void*), activity.queryContainer().queryId(), inRows.queryDefaultMaxSpillCost()));
-        activity.queryJob().queryRowManager()->addRowBuffer(this);
+        addSpillingCallback();
     }
     }
     ~CSpillableStream()
     ~CSpillableStream()
     {
     {
@@ -348,10 +383,10 @@ public:
             return spillStream->nextRow();
             return spillStream->nextRow();
         if (pos == numReadRows)
         if (pos == numReadRows)
         {
         {
-            CThorArrayLockBlock block(rows);
+            CRowsLockBlock block(*this);
             if (spillFile)
             if (spillFile)
             {
             {
-                clearSpillingCallback();
+                block.clearCB = true;
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 unsigned rwFlags = DEFAULT_RWFLAGS;
                 if (preserveNulls)
                 if (preserveNulls)
                     rwFlags |= rw_grouped;
                     rwFlags |= rw_grouped;
@@ -362,13 +397,16 @@ public:
             }
             }
             rowidx_t available = rows.numCommitted();
             rowidx_t available = rows.numCommitted();
             if (0 == available)
             if (0 == available)
+            {
+                block.clearCB = true;
                 return NULL;
                 return NULL;
+            }
             rowidx_t fetch = (available >= granularity) ? granularity : available;
             rowidx_t fetch = (available >= granularity) ? granularity : available;
             // consume 'fetch' rows
             // consume 'fetch' rows
             rows.readBlock(readRows, fetch);
             rows.readBlock(readRows, fetch);
             if (available == fetch)
             if (available == fetch)
             {
             {
-                clearSpillingCallback();
+                block.clearCB = true;
                 rows.kill();
                 rows.kill();
             }
             }
             numReadRows = fetch;
             numReadRows = fetch;
@@ -379,7 +417,10 @@ public:
         ++pos;
         ++pos;
         return row;
         return row;
     }
     }
-    virtual void stop() { }
+    virtual void stop()
+    {
+        clearSpillingCallback();
+    }
 };
 };
 
 
 //====
 //====
@@ -806,12 +847,12 @@ bool CThorExpandingRowArray::binaryInsert(const void *row, ICompare &compare, bo
     binary_vec_insert_stable(row, rows, numRows, compare); // takes ownership of row
     binary_vec_insert_stable(row, rows, numRows, compare); // takes ownership of row
     if (dropLast)
     if (dropLast)
     {
     {
-    	// last row falls out, i.e. release last row and don't increment numRows
-    	dbgassertex(numRows); // numRows must be >=1 for dropLast
-    	ReleaseThorRow(rows[numRows]);
+        // last row falls out, i.e. release last row and don't increment numRows
+        dbgassertex(numRows); // numRows must be >=1 for dropLast
+        ReleaseThorRow(rows[numRows]);
     }
     }
     else
     else
-    	++numRows;
+        ++numRows;
     return true;
     return true;
 }
 }