浏览代码

Merge branch 'candidate-5.2.4' into candidate-5.4.0

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 年之前
父节点
当前提交
ea635ee0cd

+ 47 - 8
common/remote/sockfile.cpp

@@ -3251,18 +3251,49 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer, imple
         {
             if (selecthandled) 
                 processCommand(); // buffer already filled
-            else {
-                while (parent->threadRunningCount()<=TARGET_ACTIVE_THREADS) { // if too many threads add to select handler
-                    int w = socket->wait_read(1000);
+            else
+            {
+                while (parent->threadRunningCount()<=TARGET_ACTIVE_THREADS) // if too many threads add to select handler
+                {
+                    int w;
+                    try
+                    {
+                        w = socket->wait_read(1000);
+                    }
+                    catch (IException *e)
+                    {
+                        EXCLOG(e, "CRemoteClientHandler::main wait_read error");
+                        e->Release();
+                        parent->onCloseSocket(this,1);
+                        return;
+                    }
                     if (w==0)
                         break;
-                    if ((w<0)||!immediateCommand()) {
+                    if ((w<0)||!immediateCommand())
+                    {
                         if (w<0) 
                             WARNLOG("CRemoteClientHandler::main wait_read error");
                         parent->onCloseSocket(this,1);
                         return;
                     }
                 }
+
+                /* This is a bit confusing..
+                 * The addClient below, adds this request to a selecthandler handled by another thread
+                 * and passes ownership of 'this' (CRemoteClientHandler)
+                 *
+                 * When notified, the selecthandler will launch a new pool thread to handle the request
+                 * If the pool thread limit is hit, the selecthandler will be blocked [ see comment in CRemoteFileServer::notify() ]
+                 *
+                 * Either way, a thread pool slot is occupied when processing a request.
+                 * Blocked threads, will be blocked for up to 1 minute (as defined by createThreadPool call)
+                 * IOW, if there are lots of incoming clients that can't be serviced by the CThrottle limit,
+                 * a large number of pool threads will build up after a while.
+                 *
+                 * The CThrottler mechanism, imposes a further hard limit on how many concurrent request threads can be active.
+                 * If the thread pool had an absolute limit (instead of just introducing a delay), then I don't see the point
+                 * in this additional layer of throttling..
+                 */
                 selecthandled = true;
                 parent->addClient(this);    // add to select handler
             }
@@ -3356,6 +3387,7 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer, imple
             catch (IException *e) {
                 // suppress some more errors clearing client
                 EXCLOG(e,"cCommandProcessor::main(2)");
+                e->Release();
             }
         }
         bool stop()
@@ -3416,6 +3448,7 @@ public:
 #endif
 
         INFINITE,TARGET_MIN_THREADS));
+        threads->setStartDelayTracing(60); // trace amount delayed every minute.
         stopping = false;
         clientcounttick = msTick();
         closedclients = 0;
@@ -4557,7 +4590,6 @@ public:
     {
         if (listenep.isNull())
             acceptsock.setown(ISocket::create(listenep.port));
-
         else {
             StringBuffer ips;
             listenep.getIpText(ips);
@@ -4660,13 +4692,15 @@ public:
                     }
                     if (!sock||stopping)
                         break;
+                    runClient(sock.getClear());
                 }
                 catch (IException *e) {
                     EXCLOG(e,"CRemoteFileServer");
                     e->Release();
-                    break;
+                    sock.clear();
+                    if (!QUERYINTERFACE(e, IJSOCK_Exception))
+                        break;
                 }
-                runClient(sock.getClear());
             }
             else
                 checkTimeout();
@@ -4780,6 +4814,7 @@ public:
             params.client->Link();
             clients.append(*params.client);
         }
+        // NB: This could be blocked, by thread pool limit
         threads->start(&params);
     }
 
@@ -4805,11 +4840,15 @@ public:
         if (client->buf.length()) {
             cCommandProcessor::cCommandProcessorParams params;
             params.client = client.getClear();
+
+            /* This can block because the thread pool is full and therefore block the selecthandler
+             * This is akin to the main server blocking post accept() for the same reason.
+             */
             threads->start(&params);
         }
         else 
             onCloseSocket(client,3);    // removes owned handles
-        
+
         return false;
     }
 

+ 255 - 2
docs/ECLLanguageReference/ECLR_mods/BltInFunc-OUTPUT.xml

@@ -50,6 +50,20 @@
   <para><emphasis role="bold">[</emphasis><emphasis>attr</emphasis>
   :=<emphasis role="bold"> ]
   OUTPUT(</emphasis><emphasis>recordset</emphasis><emphasis role="bold">,
+  [</emphasis><emphasis> format </emphasis><emphasis
+  role="bold">]</emphasis><emphasis> , file </emphasis><emphasis role="bold">
+  </emphasis><emphasis>,</emphasis><emphasis role="bold"> JSON<indexterm>
+      <primary>JSON</primary>
+    </indexterm> [ (</emphasis><emphasis>jsonoptions</emphasis><emphasis
+  role="bold">) ]</emphasis><emphasis role="bold"> </emphasis><emphasis
+  role="bold">[</emphasis><emphasis>jsonfileoptions </emphasis><emphasis
+  role="bold"> ] </emphasis><emphasis
+  role="bold">[</emphasis><emphasis>,</emphasis><emphasis role="bold"> NOXPATH
+  ] );</emphasis></para>
+
+  <para><emphasis role="bold">[</emphasis><emphasis>attr</emphasis>
+  :=<emphasis role="bold"> ]
+  OUTPUT(</emphasis><emphasis>recordset</emphasis><emphasis role="bold">,
   [</emphasis><emphasis> format </emphasis><emphasis role="bold">]
   </emphasis><emphasis> ,</emphasis><emphasis role="bold">PIPE<indexterm>
       <primary>PIPE</primary>
@@ -149,8 +163,8 @@
             <emphasis>format</emphasis> or the RECORD structure of the
             <emphasis>recordset</emphasis> are ignored and field names are
             used instead. This allows control of whether XPATHs are used for
-            output, so that XPATHs that were meant only for xml input can be
-            ignored for output.</entry>
+            output, so that XPATHs that were meant only for xml or json input
+            can be ignored for output.</entry>
           </row>
 
           <row>
@@ -197,6 +211,28 @@
           </row>
 
           <row>
+            <entry><emphasis role="bold">JSON</emphasis></entry>
+
+            <entry>Specifies the file is output as JSON data with the name of
+            each field in the format becoming the JSON tag for that field's
+            data.</entry>
+          </row>
+
+          <row>
+            <entry><emphasis>jsonoptions</emphasis></entry>
+
+            <entry>Optional. A comma separated list of options that define how
+            the output JSON file is delimited.</entry>
+          </row>
+
+          <row>
+            <entry><emphasis>jsonfileoptions</emphasis></entry>
+
+            <entry>Optional. A comma-delimited list of options valid for an
+            JSON file (see the section below for details).</entry>
+          </row>
+
+          <row>
             <entry><emphasis role="bold">PIPE</emphasis></entry>
 
             <entry>Indicates the specified command executes with the
@@ -923,6 +959,223 @@ OUTPUT(B,,'fred3.xml',XML('MyRow',TRIM,OPT));
 </programlisting>
   </sect2>
 
+  <sect2 id="OUTPUT_XML_Files">
+    <title>OUTPUT JSON Files<indexterm>
+        <primary>JSON Files</primary>
+      </indexterm><indexterm>
+        <primary>OUTPUT - JSON Files</primary>
+      </indexterm></title>
+
+    <para><emphasis role="bold">[</emphasis><emphasis>attr</emphasis>
+    :=<emphasis role="bold"> ] OUTPUT<indexterm>
+        <primary>OUTPUT</primary>
+      </indexterm>(</emphasis><emphasis>recordset</emphasis><emphasis
+    role="bold">, [</emphasis><emphasis> format </emphasis><emphasis
+    role="bold">]</emphasis><emphasis> ,file </emphasis><emphasis role="bold">
+    </emphasis><emphasis>,</emphasis><emphasis role="bold">JSON<indexterm>
+        <primary>JSON</primary>
+      </indexterm> [ (</emphasis><emphasis>jsonoptions</emphasis><emphasis
+    role="bold">) ]</emphasis><emphasis role="bold"> [,ENCRYPT<indexterm>
+        <primary>ENCRYPT</primary>
+      </indexterm>(</emphasis><emphasis> key </emphasis><emphasis
+    role="bold">) ] [, CLUSTER<indexterm>
+        <primary>CLUSTER</primary>
+      </indexterm>(</emphasis><emphasis> target </emphasis><emphasis
+    role="bold">) ] [</emphasis><emphasis>,</emphasis><emphasis role="bold">
+    OVERWRITE<indexterm>
+        <primary>OVERWRITE</primary>
+      </indexterm> ]</emphasis><emphasis role="bold"><emphasis role="bold">[,
+    UPDATE<indexterm>
+        <primary>UPDATE</primary>
+      </indexterm>]</emphasis> [</emphasis><emphasis>,</emphasis><emphasis
+    role="bold"> EXPIRE<indexterm>
+        <primary>EXPIRE</primary>
+      </indexterm>( [ </emphasis><emphasis>days </emphasis><emphasis
+    role="bold">] ) ] )</emphasis></para>
+
+    <para><informaltable colsep="1" frame="all" rowsep="1">
+        <tgroup cols="2">
+          <colspec colwidth="93.80pt" />
+
+          <colspec />
+
+          <tbody>
+            <row>
+              <entry><emphasis role="bold">CLUSTER</emphasis></entry>
+
+              <entry>Optional. Specifies writing the file to the specified
+              list of target clusters. If omitted, the file is written to the
+              cluster on which the workunit executes. The number of physical
+              file parts written to disk is always determined by the number of
+              nodes in the cluster on which the workunit executes, regardless
+              of the number of nodes on the target cluster(s).</entry>
+            </row>
+
+            <row>
+              <entry><emphasis>target</emphasis></entry>
+
+              <entry>A comma-delimited list of string constants containing the
+              names of the clusters to write the file to. The names must be
+              listed as they appear on the ECL Watch Activity page or returned
+              by the Std.System.Thorlib.Group() function, optionally with
+              square brackets containing a comma-delimited list of
+              node-numbers (1-based) and/or ranges (specified with a dash, as
+              in n-m) to indicate the specific set of nodes to write
+              to.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis role="bold">ENCRYPT</emphasis></entry>
+
+              <entry>Optional. Specifies writing the file to disk using both
+              256-bit AES encryption and LZW compression.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis>key</emphasis></entry>
+
+              <entry>A string constant containing the encryption key to use to
+              encrypt the data.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis role="bold">OVERWRITE</emphasis></entry>
+
+              <entry>Optional. Specifies overwriting the file if it already
+              exists.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis role="bold">UPDATE</emphasis></entry>
+
+              <entry>Specifies that the file should be rewritten only if the
+              code or input data has changed.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis role="bold">EXPIRE</emphasis></entry>
+
+              <entry>Optional. Specifies the file is a temporary file that may
+              be automatically deleted after the specified number of
+              days.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis>days</emphasis></entry>
+
+              <entry>Optional. The number of days after which the file may be
+              automatically deleted. If omitted, the default is seven
+              (7).</entry>
+            </row>
+          </tbody>
+        </tgroup>
+      </informaltable></para>
+
+    <para>This form writes the <emphasis>recordset</emphasis> to the specified
+    <emphasis>file</emphasis> as JSON data with the name of each field in the
+    specified <emphasis>format</emphasis> becoming the JSON tag for that
+    field's data. The valid set of <emphasis>jsonoptions</emphasis>
+    are:</para>
+
+    <para><emphasis
+    role="bold">‘</emphasis><emphasis>rowtag</emphasis><emphasis
+    role="bold">'</emphasis></para>
+
+    <para><emphasis role="bold">HEADING<indexterm>
+        <primary>HEADING</primary>
+      </indexterm>( </emphasis><emphasis>headertext </emphasis><emphasis
+    role="bold">[</emphasis><emphasis>, footertext </emphasis><emphasis
+    role="bold">] )</emphasis></para>
+
+    <para><emphasis role="bold">TRIM<indexterm>
+        <primary>TRIM</primary>
+      </indexterm></emphasis><emphasis role="bold"> </emphasis></para>
+
+    <para><emphasis role="bold">OPT<indexterm>
+        <primary>OPT</primary>
+      </indexterm><indexterm>
+        <primary>TRIM OPT</primary>
+      </indexterm></emphasis></para>
+
+    <para><informaltable colsep="1" frame="all" rowsep="1">
+        <tgroup cols="2">
+          <colspec colwidth="84.45pt" />
+
+          <colspec />
+
+          <tbody>
+            <row>
+              <entry><emphasis>rowtag</emphasis></entry>
+
+              <entry>The text to place in record delimiting tag.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis role="bold">HEADING</emphasis></entry>
+
+              <entry>Specifies placing header and footer records in the
+              file.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis>headertext</emphasis></entry>
+
+              <entry>The text of the header record to place in the
+              file.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis>footertext</emphasis></entry>
+
+              <entry>The text of the footer record to place in the
+              file.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis role="bold">TRIM</emphasis></entry>
+
+              <entry>Specifies removing trailing blanks from string fields
+              before output.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis role="bold">OPT</emphasis></entry>
+
+              <entry>Specifies omitting tags for any empty string field from
+              the output.</entry>
+            </row>
+          </tbody>
+        </tgroup>
+      </informaltable></para>
+
+    <para>If no <emphasis>jsonoptions</emphasis> are specified, the defaults
+    are:</para>
+
+    <programlisting>         JSON('Row',HEADING('[',']'))</programlisting>
+
+    <para>Example:</para>
+
+    <programlisting>R := {STRING10 fname,STRING12 lname};
+B := DATASET([{'Fred','Bell'},{'George','Blanda'},{'Sam',''}],R);
+
+OUTPUT(B,,'fred1.json', JSON); // writes B to the fred1.json file
+/* the Fred1.json file looks like this:
+{"Row": [
+{"fname": "Fred      ", "lname": "Bell        "},
+{"fname": "George    ", "lname": "Blanda      "},
+{"fname": "Sam       ", "lname": "            "}
+]}
+*/
+OUTPUT(B,,'fred2.json',JSON('MyResult', HEADING('[', ']')));
+/* the Fred2.json file looks like this:
+["MyResult": [
+{"fname": "Fred      ", "lname": "Bell        "},
+{"fname": "George    ", "lname": "Blanda      "},
+{"fname": "Sam       ", "lname": "            "}
+]]
+</programlisting>
+  </sect2>
+
   <sect2 id="OUTPUT_PIPE_Files">
     <title>OUTPUT PIPE Files<indexterm>
         <primary>OUTPUT Pipe Files</primary>

+ 3 - 0
ecl/hqlcpp/hqlcppds.cpp

@@ -4413,6 +4413,9 @@ void HqlCppTranslator::buildRowAssign(BuildCtx & ctx, IReferenceSelector * targe
     case no_null:
         doBuildRowAssignNullRow(ctx, target, expr);
         return;
+    case no_nofold:
+        buildRowAssign(ctx, target, expr->queryChild(0));
+        return;
     case no_serialize:
         {
             IHqlExpression * deserialized = expr->queryChild(0);

+ 7 - 0
ecl/hqlcpp/hqlcset.cpp

@@ -1895,6 +1895,13 @@ void LinkedDatasetBuilderBase::buildFinish(BuildCtx & ctx, CHqlBoundExpr & bound
     bound.count.setown(createQuoted(s.str(), LINK(unsignedType)));
     s.clear().append(instanceName).append(".queryrows()");
     bound.expr.setown(createQuoted(s.str(), makeReferenceModifier(dataset->getType())));
+    if (!ctx.isOuterContext() && ctx.queryMatchExpr(queryConditionalRowMarker()))
+    {
+        //If processing a conditional row, create a dataset object (at the outer-most) level
+        //and assign to that, to ensure the dataset doesn't go out of scope.
+        OwnedHqlExpr translated = bound.getTranslatedExpr();
+        translator.buildTempExpr(ctx, translated, bound);
+    }
 }
 
 bool LinkedDatasetBuilderBase::buildLinkRow(BuildCtx & ctx, BoundRow * sourceRow)

+ 25 - 0
ecl/hqlcpp/hqlstmt.cpp

@@ -628,6 +628,31 @@ bool BuildCtx::hasAssociation(HqlExprAssociation & search, bool unconditional)
 }
 
 
+bool BuildCtx::isOuterContext() const
+{
+    HqlStmts * searchStmts = curStmts;
+    loop
+    {
+        HqlStmt * owner = searchStmts->owner;
+        if (!owner)
+            return true;
+
+        switch (owner->getStmt())
+        {
+        case quote_compound_stmt:
+        case quote_compoundopt_stmt:
+        case indirect_stmt:
+            return true;
+        case group_stmt:
+            break;
+        default:
+            return false;
+        }
+
+        searchStmts = owner->queryContainer();
+    }
+}
+
 HqlExprAssociation * BuildCtx::queryAssociation(IHqlExpression * search, AssocKind kind, HqlExprCopyArray * selectors)
 {
     HqlStmts * searchStmts = curStmts;

+ 1 - 0
ecl/hqlcpp/hqlstmt.hpp

@@ -136,6 +136,7 @@ public:
     HqlExprAssociation *        queryMatchExpr(IHqlExpression * expr);
     bool                        getMatchExpr(IHqlExpression * expr, CHqlBoundExpr & bound);
     IHqlExpression *            getTempDeclare(ITypeInfo * type, IHqlExpression * value);
+    bool                        isOuterContext() const;
     void                        needFunction(IFunctionInfo & helper);
     void                        needFunction(IAtom * name);
     void                        removeAssociation(HqlExprAssociation * search);

+ 21 - 13
esp/src/eclwatch/LFDetailsWidget.js

@@ -98,6 +98,27 @@ define([
             this.dfuWorkunitWidget = registry.byId(this.id + "_DFUWorkunit");
             this.copyTargetSelect = registry.byId(this.id + "CopyTargetSelect");
             this.desprayTargetSelect = registry.byId(this.id + "DesprayTargetSelect");
+            this.desprayTooltiopDialog = registry.byId(this.id + "DesprayTooltipDialog");
+            var context = this;
+            var origOnOpen = this.desprayTooltiopDialog.onOpen;
+            this.desprayTooltiopDialog.onOpen = function () {
+                if (!context.desprayTargetSelect.initalized) {
+                    context.desprayTargetSelect.init({
+                        DropZones: true,
+                        callback: function (value, item) {
+                            context.updateInput("DesprayTargetIPAddress", null, item.machine.Netaddress);
+                            context.updateInput("DesprayTargetName", null, context.logicalFile.getLeaf());
+                            if (context.desprayTargetPath) {
+                                context.desprayTargetPath.reset();
+                                context.desprayTargetPath._dropZoneTarget = item;
+                                context.desprayTargetPath.defaultValue = context.desprayTargetPath.get("value");
+                                context.desprayTargetPath.loadDropZoneFolders();
+                            }
+                        }
+                    });
+                }
+                origOnOpen.apply(context.desprayTooltiopDialog, arguments);
+            }
             this.desprayTargetPath = registry.byId(this.id + "DesprayTargetPath");
             this.fileBelongsToWidget = registry.byId(this.id + "_FileBelongs");
         },
@@ -192,19 +213,6 @@ define([
             this.copyTargetSelect.init({
                 Groups: true
             });
-            this.desprayTargetSelect.init({
-                DropZones: true,
-                callback: function (value, item) {
-                    context.updateInput("DesprayTargetIPAddress", null, item.machine.Netaddress);
-                    context.updateInput("DesprayTargetName", null, context.logicalFile.getLeaf());
-                    if (context.desprayTargetPath) {
-                        context.desprayTargetPath.reset();
-                        context.desprayTargetPath._dropZoneTarget = item;
-                        context.desprayTargetPath.defaultValue = context.desprayTargetPath.get("value");
-                        context.desprayTargetPath.loadDropZoneFolders();
-                    }
-                }
-            });
             this.desprayTargetPath.init({
                 DropZoneFolders: true
             });

+ 2 - 1
esp/src/eclwatch/TargetSelectWidget.js

@@ -157,7 +157,8 @@ define([
                     Netaddr: Netaddr,
                     Path: Path,
                     OS: OS
-                }
+                },
+                suppressExceptionToaster: true
             }).then(function (response) {
                 var requests = [];
                 if (lang.exists("FileListResponse.files.PhysicalFileStruct", response)) {

+ 1 - 1
esp/src/eclwatch/templates/LFDetailsWidget.html

@@ -60,7 +60,7 @@
                     </div>
                     <div id="${id}DesprayDropDown" data-dojo-type="dijit.form.DropDownButton">
                         <span>${i18n.Despray}</span>
-                        <div data-dojo-type="dijit.TooltipDialog">
+                        <div id="${id}DesprayTooltipDialog" data-dojo-type="dijit.TooltipDialog">
                             <div id="${id}DesprayForm" style="width: 460px;" onsubmit="return false;" data-dojo-type="dijit.form.Form">
                                 <div data-dojo-type="dijit.Fieldset">
                                     <legend>${i18n.Target}</legend>

+ 7 - 0
roxie/roxiemem/roxiemem.cpp

@@ -27,6 +27,7 @@
 
 #ifdef _DEBUG
 #define _CLEAR_ALLOCATED_ROW
+#define _CLEAR_FREED_ROW
 //#define _CLEAR_ALLOCATED_HUGE_ROW
 #endif
 
@@ -1250,6 +1251,9 @@ public:
                 allocatorCache->onDestroy(id & MAX_ACTIVITY_ID, ptr + chunkHeaderSize);
             }
 
+#ifdef _CLEAR_FREED_ROW
+            memset((void *)_ptr, 0xdd, chunkCapacity);
+#endif
             inlineReleasePointer(ptr);
         }
     }
@@ -1482,6 +1486,9 @@ public:
             if (rowCount & ROWCOUNT_DESTRUCTOR_FLAG)
                 allocatorCache->onDestroy(sharedAllocatorId & MAX_ACTIVITY_ID, ptr + chunkHeaderSize);
 
+#ifdef _CLEAR_FREED_ROW
+            memset((void *)_ptr, 0xdd, chunkCapacity);
+#endif
             inlineReleasePointer(ptr);
         }
     }

+ 19 - 12
system/jlib/jsocket.cpp

@@ -1200,6 +1200,7 @@ bool CSocket::connect_timeout( unsigned timeout, bool noexception)
         if ((err == EINPROGRESS)||(err == EWOULDBLOCK)) {
             T_FD_SET fds;
             struct timeval tv;
+            CHECKSOCKRANGE(sock);
             XFD_ZERO(&fds);
             FD_SET((unsigned)sock, &fds);
             T_FD_SET except;
@@ -1207,7 +1208,6 @@ bool CSocket::connect_timeout( unsigned timeout, bool noexception)
             FD_SET((unsigned)sock, &except);
             tv.tv_sec = remaining / 1000;
             tv.tv_usec = (remaining % 1000)*1000;
-            CHECKSOCKRANGE(sock);
             int rc = ::select( sock + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
             if (rc>0) {
                 // select succeeded - return error from socket (0 if connected)
@@ -1306,6 +1306,7 @@ void CSocket::connect_wait(unsigned timems)
             while (!blockselect && ((err == EINPROGRESS)||(err == EWOULDBLOCK))) {
                 T_FD_SET fds;
                 struct timeval tv;
+                CHECKSOCKRANGE(sock);
                 XFD_ZERO(&fds);
                 FD_SET((unsigned)sock, &fds);
                 T_FD_SET except;
@@ -1318,7 +1319,6 @@ void CSocket::connect_wait(unsigned timems)
                 tv.tv_sec = 0;
                 tv.tv_usec = 0;
     #endif
-                CHECKSOCKRANGE(sock);
                 int rc = ::select( sock + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
                 if (rc>0) {
                     // select succeeded - return error from socket (0 if connected)
@@ -1441,9 +1441,9 @@ int CSocket::wait_read(unsigned timeout)
     int ret = 0;
     while (sock!=INVALID_SOCKET) {
         T_FD_SET fds;
+        CHECKSOCKRANGE(sock);
         XFD_ZERO(&fds);
         FD_SET((unsigned)sock, &fds);
-        CHECKSOCKRANGE(sock);
         if (timeout==WAIT_FOREVER) {
             ret = ::select( sock + 1, (fd_set *)&fds, NULL, NULL, NULL );
         }
@@ -1471,9 +1471,9 @@ int CSocket::wait_write(unsigned timeout)
     int ret = 0;
     while (sock!=INVALID_SOCKET) {
         T_FD_SET fds;
+        CHECKSOCKRANGE(sock);
         XFD_ZERO(&fds);
         FD_SET((unsigned)sock, &fds);
-        CHECKSOCKRANGE(sock);
         if (timeout==WAIT_FOREVER) {
             ret = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, NULL );
         }
@@ -3690,12 +3690,12 @@ public:
         }
         T_FD_SET fds;
         struct timeval tv;
+        CHECKSOCKRANGE(sock);
         XFD_ZERO(&fds);
         FD_SET((unsigned)sock, &fds);
         //FD_SET((unsigned)sock, &except);
         tv.tv_sec = 0;
         tv.tv_usec = 0;
-        CHECKSOCKRANGE(sock);
         int rc = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, &tv );
         if (rc<0) {
             StringBuffer sockstr;
@@ -4742,13 +4742,18 @@ public:
 
     void add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
     {
-        CriticalBlock block(sect);
-        epollthread->add(sock,mode,nfy);
+        CriticalBlock block(sect); // JCS->MK - are these blocks necessary? epollthread->add() uses it's own CS.
+
+        /* JCS->MK, the CSocketSelectHandler variety, checks result of thread->add and spins up another handler
+         * Shouldn't epoll version do the same?
+         */
+        if (!epollthread->add(sock,mode,nfy))
+            throw MakeStringException(-1, "CSocketEpollHandler: failed to add socket to epollthread handler: sock # = %d", sock->OShandle());
     }
 
     void remove(ISocket *sock)
     {
-        CriticalBlock block(sect);
+        CriticalBlock block(sect); // JCS->MK - are these blocks necessary? epollthread->add() uses it's own CS.
         epollthread->remove(sock);
     }
 
@@ -6003,7 +6008,7 @@ public:
                 if ((err == EINPROGRESS)||(err == EWOULDBLOCK))
                     err = 0; // continue
                 else {
-                    if (err==0) 
+                    if (err==0)
                         connectdone = true; // done immediately
                     else if(!oneshot) //  probably ECONNREFUSED but treat all errors same
                         refused_sleep((waitremaining==remaining)?waittm:connecttm,refuseddelay); // this stops becoming cpu bound
@@ -6011,6 +6016,7 @@ public:
             }
             if (!connectdone&&(err==0)) {
                 SOCKET s = sock->sock;
+                CHECKSOCKRANGE(s);
                 T_FD_SET fds;
                 struct timeval tv;
                 XFD_ZERO(&fds);
@@ -6020,7 +6026,6 @@ public:
                 FD_SET((unsigned)s, &except);
                 tv.tv_sec = remaining / 1000;
                 tv.tv_usec = (remaining % 1000)*1000;
-                CHECKSOCKRANGE(s);
                 int rc = ::select( s + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
                 if (rc==0) 
                     break; // timeout
@@ -6091,8 +6096,10 @@ int wait_multiple(bool isRead,               //IN   true if wait read, false it
 #ifdef _DEBUG
         dbgSB.appendf(" %d",socks.item(idx));
 #endif
-        maxSocket = socks.item(idx) > maxSocket ? socks.item(idx) : maxSocket;
-        FD_SET((unsigned)socks.item(idx), &fds);
+        SOCKET s = socks.item(idx);
+        CHECKSOCKRANGE(s);
+        maxSocket = s > maxSocket ? s : maxSocket;
+        FD_SET((unsigned)s, &fds);
     }
 #ifdef _DEBUG
     DBGLOG("%s",dbgSB.str());

+ 21 - 12
system/jlib/jthread.cpp

@@ -928,23 +928,32 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter
         PooledThreadHandle ret;
         {
             CriticalBlock block(crit);
-            if (timedout&&!availsem.wait(0)) {  // make sure take allocated sem if has become available
-                if (noBlock || timeout > 0)
-                    throw MakeStringException(0, "No threads available in pool %s", poolname.get());
-                WARNLOG("Pool limit exceeded for %s", poolname.get());
+            if (timedout)
+            {
+                if (!availsem.wait(0)) {  // make sure take allocated sem if has become available
+                    if (noBlock || timeout > 0)
+                        throw MakeStringException(0, "No threads available in pool %s", poolname.get());
+                    WARNLOG("Pool limit exceeded for %s", poolname.get());
+                }
+                else
+                    timedout = false;
             }
             if (traceStartDelayPeriod)
             {
                 ++startsInPeriod;
-                startDelayInPeriod += startTimer.elapsedCycles();
-                if (overAllTimer.elapsedCycles() >= queryOneSecCycles()*traceStartDelayPeriod) // check avg. delay per minute
+                if (timedout)
                 {
-                    cycle_t avg = startDelayInPeriod/startsInPeriod;
-                    unsigned avgMs = static_cast<unsigned>(cycle_to_nanosec(avg)/1000000);
-                    PROGLOG("%s: %d threads started in last %d seconds, average delay = %d milliseconds", poolname.get(), startsInPeriod, traceStartDelayPeriod, avgMs);
-                    startsInPeriod = 0;
-                    startDelayInPeriod = 0;
-                    overAllTimer.reset();
+                    startDelayInPeriod += startTimer.elapsedCycles();
+                    if (overAllTimer.elapsedCycles() >= queryOneSecCycles()*traceStartDelayPeriod) // check avg. delay per minute
+                    {
+                        double totalDelayMs = (static_cast<double>(cycle_to_nanosec(startDelayInPeriod)))/1000000;
+                        double avgDelayMs = (static_cast<double>(cycle_to_nanosec(startDelayInPeriod/startsInPeriod)))/1000000;
+                        unsigned totalElapsedSecs = overAllTimer.elapsedMs()/1000;
+                        PROGLOG("%s: %u threads started in last %u seconds, total delay = %0.2f milliseconds, average delay = %0.2f milliseconds, currently running = %u", poolname.get(), startsInPeriod, totalElapsedSecs, totalDelayMs, avgDelayMs, runningCount());
+                        startsInPeriod = 0;
+                        startDelayInPeriod = 0;
+                        overAllTimer.reset();
+                    }
                 }
             }
             CPooledThreadWrapper &t = allocThread();

+ 43 - 0
testing/regress/ecl/issue13590.ecl

@@ -0,0 +1,43 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+idRec := { STRING id; };
+
+one := '1' : stored('one');
+two := '2' : stored('two');
+three := '3' : stored('three');
+
+ids(STRING base) := NOFOLD(DATASET([one+base,two+base], idRec));
+
+rRec := RECORD
+    STRING x;
+    DATASET(idRec) y;
+END;
+
+ds := DATASET(['92','67','56','23'], idRec);
+
+rRec t(idRec l) := TRANSFORM
+    y1 := DATASET([three], idRec);
+    ids := ids(l.id);
+    y2 := NOFOLD(IF(ids[1].id >= '0', ids[1], ids[2]));
+    y3 := NOFOLD(IF(l.id > '60', y2, y1[1]));
+    SELF.x := l.id;
+    SELF.y := y1 & y3;
+END;
+
+OUTPUT(PROJECT(ds, t(LEFT)));

+ 6 - 0
testing/regress/ecl/key/issue13590.xml

@@ -0,0 +1,6 @@
+<Dataset name='Result 1'>
+ <Row><x>92</x><y><Row><id>3</id></Row><Row><id>192</id></Row></y></Row>
+ <Row><x>67</x><y><Row><id>3</id></Row><Row><id>167</id></Row></y></Row>
+ <Row><x>56</x><y><Row><id>3</id></Row><Row><id>3</id></Row></y></Row>
+ <Row><x>23</x><y><Row><id>3</id></Row><Row><id>3</id></Row></y></Row>
+</Dataset>