瀏覽代碼

Merge branch 'candidate-5.4.0'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 年之前
父節點
當前提交
6c0e526e62

+ 47 - 8
common/remote/sockfile.cpp

@@ -3251,18 +3251,49 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer, imple
         {
         {
             if (selecthandled) 
             if (selecthandled) 
                 processCommand(); // buffer already filled
                 processCommand(); // buffer already filled
-            else {
-                while (parent->threadRunningCount()<=TARGET_ACTIVE_THREADS) { // if too many threads add to select handler
-                    int w = socket->wait_read(1000);
+            else
+            {
+                while (parent->threadRunningCount()<=TARGET_ACTIVE_THREADS) // if too many threads add to select handler
+                {
+                    int w;
+                    try
+                    {
+                        w = socket->wait_read(1000);
+                    }
+                    catch (IException *e)
+                    {
+                        EXCLOG(e, "CRemoteClientHandler::main wait_read error");
+                        e->Release();
+                        parent->onCloseSocket(this,1);
+                        return;
+                    }
                     if (w==0)
                     if (w==0)
                         break;
                         break;
-                    if ((w<0)||!immediateCommand()) {
+                    if ((w<0)||!immediateCommand())
+                    {
                         if (w<0) 
                         if (w<0) 
                             WARNLOG("CRemoteClientHandler::main wait_read error");
                             WARNLOG("CRemoteClientHandler::main wait_read error");
                         parent->onCloseSocket(this,1);
                         parent->onCloseSocket(this,1);
                         return;
                         return;
                     }
                     }
                 }
                 }
+
+                /* This is a bit confusing..
+                 * The addClient below, adds this request to a selecthandler handled by another thread
+                 * and passes ownership of 'this' (CRemoteClientHandler)
+                 *
+                 * When notified, the selecthandler will launch a new pool thread to handle the request
+                 * If the pool thread limit is hit, the selecthandler will be blocked [ see comment in CRemoteFileServer::notify() ]
+                 *
+                 * Either way, a thread pool slot is occupied when processing a request.
+                 * Blocked threads, will be blocked for up to 1 minute (as defined by createThreadPool call)
+                 * IOW, if there are lots of incoming clients that can't be serviced by the CThrottle limit,
+                 * a large number of pool threads will build up after a while.
+                 *
+                 * The CThrottler mechanism, imposes a further hard limit on how many concurrent request threads can be active.
+                 * If the thread pool had an absolute limit (instead of just introducing a delay), then I don't see the point
+                 * in this additional layer of throttling..
+                 */
                 selecthandled = true;
                 selecthandled = true;
                 parent->addClient(this);    // add to select handler
                 parent->addClient(this);    // add to select handler
             }
             }
@@ -3356,6 +3387,7 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer, imple
             catch (IException *e) {
             catch (IException *e) {
                 // suppress some more errors clearing client
                 // suppress some more errors clearing client
                 EXCLOG(e,"cCommandProcessor::main(2)");
                 EXCLOG(e,"cCommandProcessor::main(2)");
+                e->Release();
             }
             }
         }
         }
         bool stop()
         bool stop()
@@ -3416,6 +3448,7 @@ public:
 #endif
 #endif
 
 
         INFINITE,TARGET_MIN_THREADS));
         INFINITE,TARGET_MIN_THREADS));
+        threads->setStartDelayTracing(60); // trace amount delayed every minute.
         stopping = false;
         stopping = false;
         clientcounttick = msTick();
         clientcounttick = msTick();
         closedclients = 0;
         closedclients = 0;
@@ -4557,7 +4590,6 @@ public:
     {
     {
         if (listenep.isNull())
         if (listenep.isNull())
             acceptsock.setown(ISocket::create(listenep.port));
             acceptsock.setown(ISocket::create(listenep.port));
-
         else {
         else {
             StringBuffer ips;
             StringBuffer ips;
             listenep.getIpText(ips);
             listenep.getIpText(ips);
@@ -4660,13 +4692,15 @@ public:
                     }
                     }
                     if (!sock||stopping)
                     if (!sock||stopping)
                         break;
                         break;
+                    runClient(sock.getClear());
                 }
                 }
                 catch (IException *e) {
                 catch (IException *e) {
                     EXCLOG(e,"CRemoteFileServer");
                     EXCLOG(e,"CRemoteFileServer");
                     e->Release();
                     e->Release();
-                    break;
+                    sock.clear();
+                    if (!QUERYINTERFACE(e, IJSOCK_Exception))
+                        break;
                 }
                 }
-                runClient(sock.getClear());
             }
             }
             else
             else
                 checkTimeout();
                 checkTimeout();
@@ -4780,6 +4814,7 @@ public:
             params.client->Link();
             params.client->Link();
             clients.append(*params.client);
             clients.append(*params.client);
         }
         }
+        // NB: This could be blocked, by thread pool limit
         threads->start(&params);
         threads->start(&params);
     }
     }
 
 
@@ -4805,11 +4840,15 @@ public:
         if (client->buf.length()) {
         if (client->buf.length()) {
             cCommandProcessor::cCommandProcessorParams params;
             cCommandProcessor::cCommandProcessorParams params;
             params.client = client.getClear();
             params.client = client.getClear();
+
+            /* This can block because the thread pool is full and therefore block the selecthandler
+             * This is akin to the main server blocking post accept() for the same reason.
+             */
             threads->start(&params);
             threads->start(&params);
         }
         }
         else 
         else 
             onCloseSocket(client,3);    // removes owned handles
             onCloseSocket(client,3);    // removes owned handles
-        
+
         return false;
         return false;
     }
     }
 
 

+ 255 - 2
docs/ECLLanguageReference/ECLR_mods/BltInFunc-OUTPUT.xml

@@ -50,6 +50,20 @@
   <para><emphasis role="bold">[</emphasis><emphasis>attr</emphasis>
   <para><emphasis role="bold">[</emphasis><emphasis>attr</emphasis>
   :=<emphasis role="bold"> ]
   :=<emphasis role="bold"> ]
   OUTPUT(</emphasis><emphasis>recordset</emphasis><emphasis role="bold">,
   OUTPUT(</emphasis><emphasis>recordset</emphasis><emphasis role="bold">,
+  [</emphasis><emphasis> format </emphasis><emphasis
+  role="bold">]</emphasis><emphasis> , file </emphasis><emphasis role="bold">
+  </emphasis><emphasis>,</emphasis><emphasis role="bold"> JSON<indexterm>
+      <primary>JSON</primary>
+    </indexterm> [ (</emphasis><emphasis>jsonoptions</emphasis><emphasis
+  role="bold">) ]</emphasis><emphasis role="bold"> </emphasis><emphasis
+  role="bold">[</emphasis><emphasis>jsonfileoptions </emphasis><emphasis
+  role="bold"> ] </emphasis><emphasis
+  role="bold">[</emphasis><emphasis>,</emphasis><emphasis role="bold"> NOXPATH
+  ] );</emphasis></para>
+
+  <para><emphasis role="bold">[</emphasis><emphasis>attr</emphasis>
+  :=<emphasis role="bold"> ]
+  OUTPUT(</emphasis><emphasis>recordset</emphasis><emphasis role="bold">,
   [</emphasis><emphasis> format </emphasis><emphasis role="bold">]
   [</emphasis><emphasis> format </emphasis><emphasis role="bold">]
   </emphasis><emphasis> ,</emphasis><emphasis role="bold">PIPE<indexterm>
   </emphasis><emphasis> ,</emphasis><emphasis role="bold">PIPE<indexterm>
       <primary>PIPE</primary>
       <primary>PIPE</primary>
@@ -149,8 +163,8 @@
             <emphasis>format</emphasis> or the RECORD structure of the
             <emphasis>format</emphasis> or the RECORD structure of the
             <emphasis>recordset</emphasis> are ignored and field names are
             <emphasis>recordset</emphasis> are ignored and field names are
             used instead. This allows control of whether XPATHs are used for
             used instead. This allows control of whether XPATHs are used for
-            output, so that XPATHs that were meant only for xml input can be
-            ignored for output.</entry>
+            output, so that XPATHs that were meant only for xml or json input
+            can be ignored for output.</entry>
           </row>
           </row>
 
 
           <row>
           <row>
@@ -197,6 +211,28 @@
           </row>
           </row>
 
 
           <row>
           <row>
+            <entry><emphasis role="bold">JSON</emphasis></entry>
+
+            <entry>Specifies the file is output as JSON data with the name of
+            each field in the format becoming the JSON tag for that field's
+            data.</entry>
+          </row>
+
+          <row>
+            <entry><emphasis>jsonoptions</emphasis></entry>
+
+            <entry>Optional. A comma separated list of options that define how
+            the output JSON file is delimited.</entry>
+          </row>
+
+          <row>
+            <entry><emphasis>jsonfileoptions</emphasis></entry>
+
+            <entry>Optional. A comma-delimited list of options valid for an
+            JSON file (see the section below for details).</entry>
+          </row>
+
+          <row>
             <entry><emphasis role="bold">PIPE</emphasis></entry>
             <entry><emphasis role="bold">PIPE</emphasis></entry>
 
 
             <entry>Indicates the specified command executes with the
             <entry>Indicates the specified command executes with the
@@ -923,6 +959,223 @@ OUTPUT(B,,'fred3.xml',XML('MyRow',TRIM,OPT));
 </programlisting>
 </programlisting>
   </sect2>
   </sect2>
 
 
+  <sect2 id="OUTPUT_XML_Files">
+    <title>OUTPUT JSON Files<indexterm>
+        <primary>JSON Files</primary>
+      </indexterm><indexterm>
+        <primary>OUTPUT - JSON Files</primary>
+      </indexterm></title>
+
+    <para><emphasis role="bold">[</emphasis><emphasis>attr</emphasis>
+    :=<emphasis role="bold"> ] OUTPUT<indexterm>
+        <primary>OUTPUT</primary>
+      </indexterm>(</emphasis><emphasis>recordset</emphasis><emphasis
+    role="bold">, [</emphasis><emphasis> format </emphasis><emphasis
+    role="bold">]</emphasis><emphasis> ,file </emphasis><emphasis role="bold">
+    </emphasis><emphasis>,</emphasis><emphasis role="bold">JSON<indexterm>
+        <primary>JSON</primary>
+      </indexterm> [ (</emphasis><emphasis>jsonoptions</emphasis><emphasis
+    role="bold">) ]</emphasis><emphasis role="bold"> [,ENCRYPT<indexterm>
+        <primary>ENCRYPT</primary>
+      </indexterm>(</emphasis><emphasis> key </emphasis><emphasis
+    role="bold">) ] [, CLUSTER<indexterm>
+        <primary>CLUSTER</primary>
+      </indexterm>(</emphasis><emphasis> target </emphasis><emphasis
+    role="bold">) ] [</emphasis><emphasis>,</emphasis><emphasis role="bold">
+    OVERWRITE<indexterm>
+        <primary>OVERWRITE</primary>
+      </indexterm> ]</emphasis><emphasis role="bold"><emphasis role="bold">[,
+    UPDATE<indexterm>
+        <primary>UPDATE</primary>
+      </indexterm>]</emphasis> [</emphasis><emphasis>,</emphasis><emphasis
+    role="bold"> EXPIRE<indexterm>
+        <primary>EXPIRE</primary>
+      </indexterm>( [ </emphasis><emphasis>days </emphasis><emphasis
+    role="bold">] ) ] )</emphasis></para>
+
+    <para><informaltable colsep="1" frame="all" rowsep="1">
+        <tgroup cols="2">
+          <colspec colwidth="93.80pt" />
+
+          <colspec />
+
+          <tbody>
+            <row>
+              <entry><emphasis role="bold">CLUSTER</emphasis></entry>
+
+              <entry>Optional. Specifies writing the file to the specified
+              list of target clusters. If omitted, the file is written to the
+              cluster on which the workunit executes. The number of physical
+              file parts written to disk is always determined by the number of
+              nodes in the cluster on which the workunit executes, regardless
+              of the number of nodes on the target cluster(s).</entry>
+            </row>
+
+            <row>
+              <entry><emphasis>target</emphasis></entry>
+
+              <entry>A comma-delimited list of string constants containing the
+              names of the clusters to write the file to. The names must be
+              listed as they appear on the ECL Watch Activity page or returned
+              by the Std.System.Thorlib.Group() function, optionally with
+              square brackets containing a comma-delimited list of
+              node-numbers (1-based) and/or ranges (specified with a dash, as
+              in n-m) to indicate the specific set of nodes to write
+              to.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis role="bold">ENCRYPT</emphasis></entry>
+
+              <entry>Optional. Specifies writing the file to disk using both
+              256-bit AES encryption and LZW compression.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis>key</emphasis></entry>
+
+              <entry>A string constant containing the encryption key to use to
+              encrypt the data.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis role="bold">OVERWRITE</emphasis></entry>
+
+              <entry>Optional. Specifies overwriting the file if it already
+              exists.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis role="bold">UPDATE</emphasis></entry>
+
+              <entry>Specifies that the file should be rewritten only if the
+              code or input data has changed.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis role="bold">EXPIRE</emphasis></entry>
+
+              <entry>Optional. Specifies the file is a temporary file that may
+              be automatically deleted after the specified number of
+              days.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis>days</emphasis></entry>
+
+              <entry>Optional. The number of days after which the file may be
+              automatically deleted. If omitted, the default is seven
+              (7).</entry>
+            </row>
+          </tbody>
+        </tgroup>
+      </informaltable></para>
+
+    <para>This form writes the <emphasis>recordset</emphasis> to the specified
+    <emphasis>file</emphasis> as JSON data with the name of each field in the
+    specified <emphasis>format</emphasis> becoming the JSON tag for that
+    field's data. The valid set of <emphasis>jsonoptions</emphasis>
+    are:</para>
+
+    <para><emphasis
+    role="bold">‘</emphasis><emphasis>rowtag</emphasis><emphasis
+    role="bold">'</emphasis></para>
+
+    <para><emphasis role="bold">HEADING<indexterm>
+        <primary>HEADING</primary>
+      </indexterm>( </emphasis><emphasis>headertext </emphasis><emphasis
+    role="bold">[</emphasis><emphasis>, footertext </emphasis><emphasis
+    role="bold">] )</emphasis></para>
+
+    <para><emphasis role="bold">TRIM<indexterm>
+        <primary>TRIM</primary>
+      </indexterm></emphasis><emphasis role="bold"> </emphasis></para>
+
+    <para><emphasis role="bold">OPT<indexterm>
+        <primary>OPT</primary>
+      </indexterm><indexterm>
+        <primary>TRIM OPT</primary>
+      </indexterm></emphasis></para>
+
+    <para><informaltable colsep="1" frame="all" rowsep="1">
+        <tgroup cols="2">
+          <colspec colwidth="84.45pt" />
+
+          <colspec />
+
+          <tbody>
+            <row>
+              <entry><emphasis>rowtag</emphasis></entry>
+
+              <entry>The text to place in record delimiting tag.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis role="bold">HEADING</emphasis></entry>
+
+              <entry>Specifies placing header and footer records in the
+              file.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis>headertext</emphasis></entry>
+
+              <entry>The text of the header record to place in the
+              file.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis>footertext</emphasis></entry>
+
+              <entry>The text of the footer record to place in the
+              file.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis role="bold">TRIM</emphasis></entry>
+
+              <entry>Specifies removing trailing blanks from string fields
+              before output.</entry>
+            </row>
+
+            <row>
+              <entry><emphasis role="bold">OPT</emphasis></entry>
+
+              <entry>Specifies omitting tags for any empty string field from
+              the output.</entry>
+            </row>
+          </tbody>
+        </tgroup>
+      </informaltable></para>
+
+    <para>If no <emphasis>jsonoptions</emphasis> are specified, the defaults
+    are:</para>
+
+    <programlisting>         JSON('Row',HEADING('[',']'))</programlisting>
+
+    <para>Example:</para>
+
+    <programlisting>R := {STRING10 fname,STRING12 lname};
+B := DATASET([{'Fred','Bell'},{'George','Blanda'},{'Sam',''}],R);
+
+OUTPUT(B,,'fred1.json', JSON); // writes B to the fred1.json file
+/* the Fred1.json file looks like this:
+{"Row": [
+{"fname": "Fred      ", "lname": "Bell        "},
+{"fname": "George    ", "lname": "Blanda      "},
+{"fname": "Sam       ", "lname": "            "}
+]}
+*/
+OUTPUT(B,,'fred2.json',JSON('MyResult', HEADING('[', ']')));
+/* the Fred2.json file looks like this:
+["MyResult": [
+{"fname": "Fred      ", "lname": "Bell        "},
+{"fname": "George    ", "lname": "Blanda      "},
+{"fname": "Sam       ", "lname": "            "}
+]]
+</programlisting>
+  </sect2>
+
   <sect2 id="OUTPUT_PIPE_Files">
   <sect2 id="OUTPUT_PIPE_Files">
     <title>OUTPUT PIPE Files<indexterm>
     <title>OUTPUT PIPE Files<indexterm>
         <primary>OUTPUT Pipe Files</primary>
         <primary>OUTPUT Pipe Files</primary>

+ 19 - 0
ecl/hql/hqlexpr.cpp

@@ -15675,6 +15675,25 @@ ITypeInfo * createRecordType(IHqlExpression * record)
 }
 }
 
 
 
 
+IHqlExpression * queryFunctionAttribute(IHqlExpression * funcdef, IAtom * name)
+{
+    dbgassertex(funcdef->getOperator() == no_funcdef);
+    IHqlExpression * body = funcdef->queryChild(0);
+    switch (body->getOperator())
+    {
+    case no_external:
+        return body->queryAttribute(name);
+    case no_outofline:
+        {
+            IHqlExpression * embed = body->queryChild(0);
+            if (embed->getOperator() == no_embedbody)
+                return embed->queryAttribute(name);
+        }
+        break;
+    }
+    return NULL;
+}
+
 ITypeInfo * getSumAggType(ITypeInfo * argType)
 ITypeInfo * getSumAggType(ITypeInfo * argType)
 {
 {
     type_t tc = argType->getTypeCode();
     type_t tc = argType->getTypeCode();

+ 1 - 0
ecl/hql/hqlexpr.hpp

@@ -1624,6 +1624,7 @@ extern HQL_API IIdAtom * queryPatternName(IHqlExpression * expr);
 extern HQL_API IHqlExpression * closeAndLink(IHqlExpression * expr);
 extern HQL_API IHqlExpression * closeAndLink(IHqlExpression * expr);
 extern HQL_API IHqlExpression * createAbstractRecord(IHqlExpression * record);
 extern HQL_API IHqlExpression * createAbstractRecord(IHqlExpression * record);
 extern HQL_API IHqlExpression * createSortList(HqlExprArray & elements);
 extern HQL_API IHqlExpression * createSortList(HqlExprArray & elements);
+extern HQL_API IHqlExpression * queryFunctionAttribute(IHqlExpression * funcdef, IAtom * name);
 
 
 // Same as expr->queryChild() except it doesn't return attributes.
 // Same as expr->queryChild() except it doesn't return attributes.
 inline IHqlExpression * queryRealChild(IHqlExpression * expr, unsigned i)
 inline IHqlExpression * queryRealChild(IHqlExpression * expr, unsigned i)

+ 15 - 0
ecl/hql/hqlgram.y

@@ -160,6 +160,7 @@ static void eclsyntaxerror(HqlGram * parser, const char * s, short yystate, int
   COUNTER
   COUNTER
   COVARIANCE
   COVARIANCE
   CPPBODY
   CPPBODY
+  TOK_CPP
   CRC
   CRC
   CRON
   CRON
   CSV
   CSV
@@ -1065,6 +1066,12 @@ embedBody
                             else
                             else
                                 $$.setExpr(parser->processEmbedBody($2, embedText, language, NULL), $1);
                                 $$.setExpr(parser->processEmbedBody($2, embedText, language, NULL), $1);
                         }
                         }
+    | embedCppPrefix CPPBODY
+                        {
+                            OwnedHqlExpr attrs = $1.getExpr();
+                            OwnedHqlExpr embedText = $2.getExpr();
+                            $$.setExpr(parser->processEmbedBody($2, embedText, NULL, attrs), $1);
+                        }
     | EMBED '(' abstractModule ',' expression ')'
     | EMBED '(' abstractModule ',' expression ')'
                         {
                         {
                             parser->normalizeExpression($5, type_stringorunicode, true);
                             parser->normalizeExpression($5, type_stringorunicode, true);
@@ -1091,6 +1098,14 @@ embedPrefix
                         }
                         }
     ;
     ;
 
 
+embedCppPrefix
+    : EMBED '(' TOK_CPP attribs ')'
+                        {
+                            parser->getLexer()->enterEmbeddedMode();
+                            $$.setExpr($4.getExpr(), $1);
+                        }
+    ;
+
 compoundAttribute
 compoundAttribute
     : startCompoundAttribute optDefinitions returnAction ';' END
     : startCompoundAttribute optDefinitions returnAction ';' END
                         {
                         {

+ 1 - 0
ecl/hql/hqlgram2.cpp

@@ -10441,6 +10441,7 @@ static void getTokenText(StringBuffer & msg, int token)
     case COUNTER: msg.append("COUNTER"); break;
     case COUNTER: msg.append("COUNTER"); break;
     case COVARIANCE: msg.append("COVARIANCE"); break;
     case COVARIANCE: msg.append("COVARIANCE"); break;
     case CPPBODY: msg.append("BEGINC++"); break;
     case CPPBODY: msg.append("BEGINC++"); break;
+    case TOK_CPP: msg.append("C++"); break;
     case CRC: msg.append("HASHCRC"); break;
     case CRC: msg.append("HASHCRC"); break;
     case CRON: msg.append("CRON"); break;
     case CRON: msg.append("CRON"); break;
     case CSV: msg.append("CSV"); break;
     case CSV: msg.append("CSV"); break;

+ 1 - 0
ecl/hql/hqllex.l

@@ -632,6 +632,7 @@ BLOB                { RETURNSYM(BLOB); }
 BNOT                { RETURNSYM(BNOT); }
 BNOT                { RETURNSYM(BNOT); }
 BUILD               { RETURNSYM(BUILD); }
 BUILD               { RETURNSYM(BUILD); }
 BUILDINDEX          { RETURNSYM(BUILD); }
 BUILDINDEX          { RETURNSYM(BUILD); }
+"C++"               { RETURNSYM(TOK_CPP); }
 CARDINALITY         { RETURNSYM(CARDINALITY); }
 CARDINALITY         { RETURNSYM(CARDINALITY); }
 CASE                { RETURNSYM(CASE); }
 CASE                { RETURNSYM(CASE); }
 CATCH               { RETURNSYM(TOK_CATCH); }
 CATCH               { RETURNSYM(TOK_CATCH); }

+ 3 - 0
ecl/hqlcpp/hqlcppds.cpp

@@ -4413,6 +4413,9 @@ void HqlCppTranslator::buildRowAssign(BuildCtx & ctx, IReferenceSelector * targe
     case no_null:
     case no_null:
         doBuildRowAssignNullRow(ctx, target, expr);
         doBuildRowAssignNullRow(ctx, target, expr);
         return;
         return;
+    case no_nofold:
+        buildRowAssign(ctx, target, expr->queryChild(0));
+        return;
     case no_serialize:
     case no_serialize:
         {
         {
             IHqlExpression * deserialized = expr->queryChild(0);
             IHqlExpression * deserialized = expr->queryChild(0);

+ 7 - 0
ecl/hqlcpp/hqlcset.cpp

@@ -1895,6 +1895,13 @@ void LinkedDatasetBuilderBase::buildFinish(BuildCtx & ctx, CHqlBoundExpr & bound
     bound.count.setown(createQuoted(s.str(), LINK(unsignedType)));
     bound.count.setown(createQuoted(s.str(), LINK(unsignedType)));
     s.clear().append(instanceName).append(".queryrows()");
     s.clear().append(instanceName).append(".queryrows()");
     bound.expr.setown(createQuoted(s.str(), makeReferenceModifier(dataset->getType())));
     bound.expr.setown(createQuoted(s.str(), makeReferenceModifier(dataset->getType())));
+    if (!ctx.isOuterContext() && ctx.queryMatchExpr(queryConditionalRowMarker()))
+    {
+        //If processing a conditional row, create a dataset object (at the outer-most) level
+        //and assign to that, to ensure the dataset doesn't go out of scope.
+        OwnedHqlExpr translated = bound.getTranslatedExpr();
+        translator.buildTempExpr(ctx, translated, bound);
+    }
 }
 }
 
 
 bool LinkedDatasetBuilderBase::buildLinkRow(BuildCtx & ctx, BoundRow * sourceRow)
 bool LinkedDatasetBuilderBase::buildLinkRow(BuildCtx & ctx, BoundRow * sourceRow)

+ 28 - 3
ecl/hqlcpp/hqlhtcpp.cpp

@@ -13632,15 +13632,31 @@ ABoundActivity * HqlCppTranslator::doBuildActivityAggregate(BuildCtx & ctx, IHql
 
 
 //---------------------------------------------------------------------------
 //---------------------------------------------------------------------------
 
 
+static bool isDistributedFunctionCall(IHqlExpression * expr)
+{
+    IHqlExpression * funcdef = NULL;
+    switch (expr->getOperator())
+    {
+    case no_externalcall:
+        funcdef = expr->queryBody()->queryExternalDefinition();
+        break;
+    case no_call:
+        funcdef = expr->queryBody()->queryFunctionDefinition();
+        break;
+    }
+    return (funcdef && queryFunctionAttribute(funcdef, distributedAtom));
+}
+
 ABoundActivity * HqlCppTranslator::doBuildActivityChildDataset(BuildCtx & ctx, IHqlExpression * expr)
 ABoundActivity * HqlCppTranslator::doBuildActivityChildDataset(BuildCtx & ctx, IHqlExpression * expr)
 {
 {
     if (options.mainRowsAreLinkCounted || isGrouped(expr))
     if (options.mainRowsAreLinkCounted || isGrouped(expr))
         return doBuildActivityLinkedRawChildDataset(ctx, expr);
         return doBuildActivityLinkedRawChildDataset(ctx, expr);
 
 
-
     StringBuffer s;
     StringBuffer s;
 
 
     Owned<ActivityInstance> instance = new ActivityInstance(*this, ctx, TAKchilditerator, expr, "ChildIterator");
     Owned<ActivityInstance> instance = new ActivityInstance(*this, ctx, TAKchilditerator, expr, "ChildIterator");
+    if (isDistributedFunctionCall(expr))
+        instance->setLocal(true);
     buildActivityFramework(instance);
     buildActivityFramework(instance);
 
 
     buildInstancePrefix(instance);
     buildInstancePrefix(instance);
@@ -13694,6 +13710,8 @@ ABoundActivity * HqlCppTranslator::doBuildActivityStreamedCall(BuildCtx & ctx, I
     StringBuffer s;
     StringBuffer s;
 
 
     Owned<ActivityInstance> instance = new ActivityInstance(*this, ctx, TAKstreamediterator, expr, "StreamedIterator");
     Owned<ActivityInstance> instance = new ActivityInstance(*this, ctx, TAKstreamediterator, expr, "StreamedIterator");
+    if (isDistributedFunctionCall(expr))
+        instance->setLocal(true);
     buildActivityFramework(instance);
     buildActivityFramework(instance);
 
 
     buildInstancePrefix(instance);
     buildInstancePrefix(instance);
@@ -13713,6 +13731,8 @@ ABoundActivity * HqlCppTranslator::doBuildActivityLinkedRawChildDataset(BuildCtx
     StringBuffer s;
     StringBuffer s;
 
 
     Owned<ActivityInstance> instance = new ActivityInstance(*this, ctx, TAKlinkedrawiterator, expr, "LinkedRawIterator");
     Owned<ActivityInstance> instance = new ActivityInstance(*this, ctx, TAKlinkedrawiterator, expr, "LinkedRawIterator");
+    if (isDistributedFunctionCall(expr))
+        instance->setLocal(true);
     buildActivityFramework(instance);
     buildActivityFramework(instance);
 
 
     buildInstancePrefix(instance);
     buildInstancePrefix(instance);
@@ -18649,8 +18669,6 @@ static bool needsRealThor(IHqlExpression *expr, unsigned flags)
     case no_nohoist:
     case no_nohoist:
     case no_actionlist:
     case no_actionlist:
     case no_orderedactionlist:
     case no_orderedactionlist:
-    case no_externalcall:
-    case no_call:
     case no_compound_fetch:
     case no_compound_fetch:
     case no_addfiles:
     case no_addfiles:
     case no_nonempty:
     case no_nonempty:
@@ -18705,6 +18723,13 @@ static bool needsRealThor(IHqlExpression *expr, unsigned flags)
     case no_extractresult:
     case no_extractresult:
         return needsRealThor(expr->queryChild(0), flags);
         return needsRealThor(expr->queryChild(0), flags);
 
 
+    case no_call:
+    case no_externalcall:
+        if (isDistributedFunctionCall(expr))
+            return true;
+        //MORE: check for streamed inputs.
+        break;
+
     case no_fetch:
     case no_fetch:
         return needsRealThor(expr->queryChild(1), flags);
         return needsRealThor(expr->queryChild(1), flags);
 
 

+ 25 - 0
ecl/hqlcpp/hqlstmt.cpp

@@ -628,6 +628,31 @@ bool BuildCtx::hasAssociation(HqlExprAssociation & search, bool unconditional)
 }
 }
 
 
 
 
+bool BuildCtx::isOuterContext() const
+{
+    HqlStmts * searchStmts = curStmts;
+    loop
+    {
+        HqlStmt * owner = searchStmts->owner;
+        if (!owner)
+            return true;
+
+        switch (owner->getStmt())
+        {
+        case quote_compound_stmt:
+        case quote_compoundopt_stmt:
+        case indirect_stmt:
+            return true;
+        case group_stmt:
+            break;
+        default:
+            return false;
+        }
+
+        searchStmts = owner->queryContainer();
+    }
+}
+
 HqlExprAssociation * BuildCtx::queryAssociation(IHqlExpression * search, AssocKind kind, HqlExprCopyArray * selectors)
 HqlExprAssociation * BuildCtx::queryAssociation(IHqlExpression * search, AssocKind kind, HqlExprCopyArray * selectors)
 {
 {
     HqlStmts * searchStmts = curStmts;
     HqlStmts * searchStmts = curStmts;

+ 1 - 0
ecl/hqlcpp/hqlstmt.hpp

@@ -136,6 +136,7 @@ public:
     HqlExprAssociation *        queryMatchExpr(IHqlExpression * expr);
     HqlExprAssociation *        queryMatchExpr(IHqlExpression * expr);
     bool                        getMatchExpr(IHqlExpression * expr, CHqlBoundExpr & bound);
     bool                        getMatchExpr(IHqlExpression * expr, CHqlBoundExpr & bound);
     IHqlExpression *            getTempDeclare(ITypeInfo * type, IHqlExpression * value);
     IHqlExpression *            getTempDeclare(ITypeInfo * type, IHqlExpression * value);
+    bool                        isOuterContext() const;
     void                        needFunction(IFunctionInfo & helper);
     void                        needFunction(IFunctionInfo & helper);
     void                        needFunction(IAtom * name);
     void                        needFunction(IAtom * name);
     void                        removeAssociation(HqlExprAssociation * search);
     void                        removeAssociation(HqlExprAssociation * search);

+ 134 - 0
ecl/regress/issue12921.ecl

@@ -0,0 +1,134 @@
+#option ('spanMultipleCpp', false);
+
+outRecord := RECORD
+    STRING2  x;
+    STRING10 name;
+    STRING1  term;
+    STRING2  nl;
+END;
+
+myService := SERVICE
+    streamed dataset(outRecord) testRead(const varstring name) : distributed;
+    testWrite(streamed dataset(outRecord) out);
+    testWrite3(streamed dataset(outRecord) out1, streamed dataset(outRecord) out2, streamed dataset(outRecord) out3);
+END;
+
+streamed dataset(outRecord) doRead(const varstring name) := EMBED(C++ : distributed)
+    #include "platform.h"
+    #include "jiface.hpp"
+    #include "jfile.hpp"
+    #include "jstring.hpp"
+
+    class StreamReader : public CInterfaceOf<IRowStream>
+    {
+    public:
+        FileReader(ICodeContext * _ctx, IEngineRowAllocator * _resultAllocator) : resultAllocator(_resultAllocator)
+        {
+            deserializer.setown(resultAllocator->createDiskDeserializer(_ctx));
+        }
+
+        virtual const void * nextRow()
+        {
+            if (!source || source->isEof())
+                return NULL;
+
+            RtlDynamicRowBuilder builder(resultAllocator);
+            size32_t size = deserializer->deserialize(builder, *source);
+            return builder.finalizeRowClear(size);
+        }
+
+        virtual void stop()
+        {
+        }
+
+    private:
+        Linked<IOutputRowDeserializer> deserializer;
+        Linked<IEngineRowAllocator> resultAllocator;
+        Linked<IRowDeserializerSource> source;
+    };
+
+    class FileReader : public CInterfaceOf<IRowStream>
+    {
+    public:
+        FileReader(ICodeContext * _ctx, IEngineRowAllocator * _resultAllocator, IFileIO * _in) : StreamReader(_ctx, _resultAllocator)
+        {
+            source.setown(createSeralizerSource(in));
+        }
+
+    private:
+        Linked<IFileIO> in;
+    };
+
+    #body
+    unsigned numParts = ctx->getNodes();
+    unsigned whichPart = ctx->getNodeNum();
+    StringBuffer filename;
+    filename.append(name).append(".").append(whichPart).append("_of_").append(numParts);
+    Owned<IFile> out = createIFile(filename);
+    Owned<IFileIO> io = out->open(IFOread);
+    return new FileReader(ctx, _resultAllocator, io);
+ENDEMBED;
+
+
+
+doWrite(streamed dataset ds, const varstring name) := BEGINC++
+    #include "platform.h"
+    #include "jiface.hpp"
+    #include "jfile.hpp"
+    #include "jstring.hpp"
+
+    #body
+    unsigned numParts = ctx->getNodes();
+    unsigned whichPart = ctx->getNodeNum();
+    StringBuffer filename;
+    filename.append(name).append(".").append(whichPart).append("_of_").append(numParts);
+    Owned<IFile> out = createIFile(filename);
+    Owned<IFileIO> io = out->open(IFOcreate);
+    //create a buffered io stream
+    //create a serializer
+    for(;;)
+    {
+        const void * next = ds->nextRow();
+        if (!next)
+        {
+            next = ds->nextRow();
+            if (!next)
+                break;
+        }
+        //serialize row... to buffered stream
+        rtlReleaseRow(next);
+    }
+ENDC++;
+
+doRead('C:\\temp\\simple');
+
+ds := DATASET(20, TRANSFORM(outRecord, SELF.name := (string)HASH32(counter); SELF.x := (string2)COUNTER; SELF.nl := '\r\n'; SELF.term := '!'));
+
+sds := SORT(NOFOLD(ds), name);
+
+doWrite(sds, 'C:\\temp\\simple2');
+
+
+allNodesDs := DATASET(1, TRANSFORM({ unsigned id }, SELF.id := 0), LOCAL);
+streamedDs := NORMALIZE(allNodesDs, doRead('C:\\temp\\simple2'), TRANSFORM(RIGHT));
+output(streamedDs);
+
+/*
+Problems
+- no way to specify a function that returns a dataset with a user supplied format
+  * We could probably use macros to solve the problems for external services.
+- code for streaming output is poor.
+  * Need to introduce a new user-output activity
+- no way to cordinate between instances on different nodes.
+  * Needs more thought.  Might be required if input dataset required partitioning
+- no way that a dataset can be specified as local/executed on all
+  * Probably want a new syntax.  DATASET(function, LOCAL/DISTRIBUTED??).
+- no way to represent a filtered join against an external dataset
+  * A prefetch project almost provides what you need.  We should introduce a new syntax that allows
+    joins against datasources where the filter is pushed to the source.  There are other situtions where
+    this might help - e.g., remote filtering when reading from other thors, filtering on the disk controller etc.
+*/
+
+myService.testWrite(sds);
+myService.testWrite3(sds, sds, sds(name != 'gavin'));
+output(myService.testRead('x'));

+ 21 - 13
esp/src/eclwatch/LFDetailsWidget.js

@@ -98,6 +98,27 @@ define([
             this.dfuWorkunitWidget = registry.byId(this.id + "_DFUWorkunit");
             this.dfuWorkunitWidget = registry.byId(this.id + "_DFUWorkunit");
             this.copyTargetSelect = registry.byId(this.id + "CopyTargetSelect");
             this.copyTargetSelect = registry.byId(this.id + "CopyTargetSelect");
             this.desprayTargetSelect = registry.byId(this.id + "DesprayTargetSelect");
             this.desprayTargetSelect = registry.byId(this.id + "DesprayTargetSelect");
+            this.desprayTooltiopDialog = registry.byId(this.id + "DesprayTooltipDialog");
+            var context = this;
+            var origOnOpen = this.desprayTooltiopDialog.onOpen;
+            this.desprayTooltiopDialog.onOpen = function () {
+                if (!context.desprayTargetSelect.initalized) {
+                    context.desprayTargetSelect.init({
+                        DropZones: true,
+                        callback: function (value, item) {
+                            context.updateInput("DesprayTargetIPAddress", null, item.machine.Netaddress);
+                            context.updateInput("DesprayTargetName", null, context.logicalFile.getLeaf());
+                            if (context.desprayTargetPath) {
+                                context.desprayTargetPath.reset();
+                                context.desprayTargetPath._dropZoneTarget = item;
+                                context.desprayTargetPath.defaultValue = context.desprayTargetPath.get("value");
+                                context.desprayTargetPath.loadDropZoneFolders();
+                            }
+                        }
+                    });
+                }
+                origOnOpen.apply(context.desprayTooltiopDialog, arguments);
+            }
             this.desprayTargetPath = registry.byId(this.id + "DesprayTargetPath");
             this.desprayTargetPath = registry.byId(this.id + "DesprayTargetPath");
             this.fileBelongsToWidget = registry.byId(this.id + "_FileBelongs");
             this.fileBelongsToWidget = registry.byId(this.id + "_FileBelongs");
         },
         },
@@ -192,19 +213,6 @@ define([
             this.copyTargetSelect.init({
             this.copyTargetSelect.init({
                 Groups: true
                 Groups: true
             });
             });
-            this.desprayTargetSelect.init({
-                DropZones: true,
-                callback: function (value, item) {
-                    context.updateInput("DesprayTargetIPAddress", null, item.machine.Netaddress);
-                    context.updateInput("DesprayTargetName", null, context.logicalFile.getLeaf());
-                    if (context.desprayTargetPath) {
-                        context.desprayTargetPath.reset();
-                        context.desprayTargetPath._dropZoneTarget = item;
-                        context.desprayTargetPath.defaultValue = context.desprayTargetPath.get("value");
-                        context.desprayTargetPath.loadDropZoneFolders();
-                    }
-                }
-            });
             this.desprayTargetPath.init({
             this.desprayTargetPath.init({
                 DropZoneFolders: true
                 DropZoneFolders: true
             });
             });

+ 2 - 1
esp/src/eclwatch/TargetSelectWidget.js

@@ -157,7 +157,8 @@ define([
                     Netaddr: Netaddr,
                     Netaddr: Netaddr,
                     Path: Path,
                     Path: Path,
                     OS: OS
                     OS: OS
-                }
+                },
+                suppressExceptionToaster: true
             }).then(function (response) {
             }).then(function (response) {
                 var requests = [];
                 var requests = [];
                 if (lang.exists("FileListResponse.files.PhysicalFileStruct", response)) {
                 if (lang.exists("FileListResponse.files.PhysicalFileStruct", response)) {

+ 1 - 1
esp/src/eclwatch/templates/LFDetailsWidget.html

@@ -60,7 +60,7 @@
                     </div>
                     </div>
                     <div id="${id}DesprayDropDown" data-dojo-type="dijit.form.DropDownButton">
                     <div id="${id}DesprayDropDown" data-dojo-type="dijit.form.DropDownButton">
                         <span>${i18n.Despray}</span>
                         <span>${i18n.Despray}</span>
-                        <div data-dojo-type="dijit.TooltipDialog">
+                        <div id="${id}DesprayTooltipDialog" data-dojo-type="dijit.TooltipDialog">
                             <div id="${id}DesprayForm" style="width: 460px;" onsubmit="return false;" data-dojo-type="dijit.form.Form">
                             <div id="${id}DesprayForm" style="width: 460px;" onsubmit="return false;" data-dojo-type="dijit.form.Form">
                                 <div data-dojo-type="dijit.Fieldset">
                                 <div data-dojo-type="dijit.Fieldset">
                                     <legend>${i18n.Target}</legend>
                                     <legend>${i18n.Target}</legend>

+ 7 - 0
roxie/roxiemem/roxiemem.cpp

@@ -27,6 +27,7 @@
 
 
 #ifdef _DEBUG
 #ifdef _DEBUG
 #define _CLEAR_ALLOCATED_ROW
 #define _CLEAR_ALLOCATED_ROW
+#define _CLEAR_FREED_ROW
 //#define _CLEAR_ALLOCATED_HUGE_ROW
 //#define _CLEAR_ALLOCATED_HUGE_ROW
 #endif
 #endif
 
 
@@ -1324,6 +1325,9 @@ public:
                 allocatorCache->onDestroy(id & MAX_ACTIVITY_ID, ptr + chunkHeaderSize);
                 allocatorCache->onDestroy(id & MAX_ACTIVITY_ID, ptr + chunkHeaderSize);
             }
             }
 
 
+#ifdef _CLEAR_FREED_ROW
+            memset((void *)_ptr, 0xdd, chunkCapacity);
+#endif
             inlineReleasePointer(ptr);
             inlineReleasePointer(ptr);
         }
         }
     }
     }
@@ -1556,6 +1560,9 @@ public:
             if (rowCount & ROWCOUNT_DESTRUCTOR_FLAG)
             if (rowCount & ROWCOUNT_DESTRUCTOR_FLAG)
                 allocatorCache->onDestroy(sharedAllocatorId & MAX_ACTIVITY_ID, ptr + chunkHeaderSize);
                 allocatorCache->onDestroy(sharedAllocatorId & MAX_ACTIVITY_ID, ptr + chunkHeaderSize);
 
 
+#ifdef _CLEAR_FREED_ROW
+            memset((void *)_ptr, 0xdd, chunkCapacity);
+#endif
             inlineReleasePointer(ptr);
             inlineReleasePointer(ptr);
         }
         }
     }
     }

+ 19 - 12
system/jlib/jsocket.cpp

@@ -1200,6 +1200,7 @@ bool CSocket::connect_timeout( unsigned timeout, bool noexception)
         if ((err == EINPROGRESS)||(err == EWOULDBLOCK)) {
         if ((err == EINPROGRESS)||(err == EWOULDBLOCK)) {
             T_FD_SET fds;
             T_FD_SET fds;
             struct timeval tv;
             struct timeval tv;
+            CHECKSOCKRANGE(sock);
             XFD_ZERO(&fds);
             XFD_ZERO(&fds);
             FD_SET((unsigned)sock, &fds);
             FD_SET((unsigned)sock, &fds);
             T_FD_SET except;
             T_FD_SET except;
@@ -1207,7 +1208,6 @@ bool CSocket::connect_timeout( unsigned timeout, bool noexception)
             FD_SET((unsigned)sock, &except);
             FD_SET((unsigned)sock, &except);
             tv.tv_sec = remaining / 1000;
             tv.tv_sec = remaining / 1000;
             tv.tv_usec = (remaining % 1000)*1000;
             tv.tv_usec = (remaining % 1000)*1000;
-            CHECKSOCKRANGE(sock);
             int rc = ::select( sock + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
             int rc = ::select( sock + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
             if (rc>0) {
             if (rc>0) {
                 // select succeeded - return error from socket (0 if connected)
                 // select succeeded - return error from socket (0 if connected)
@@ -1306,6 +1306,7 @@ void CSocket::connect_wait(unsigned timems)
             while (!blockselect && ((err == EINPROGRESS)||(err == EWOULDBLOCK))) {
             while (!blockselect && ((err == EINPROGRESS)||(err == EWOULDBLOCK))) {
                 T_FD_SET fds;
                 T_FD_SET fds;
                 struct timeval tv;
                 struct timeval tv;
+                CHECKSOCKRANGE(sock);
                 XFD_ZERO(&fds);
                 XFD_ZERO(&fds);
                 FD_SET((unsigned)sock, &fds);
                 FD_SET((unsigned)sock, &fds);
                 T_FD_SET except;
                 T_FD_SET except;
@@ -1318,7 +1319,6 @@ void CSocket::connect_wait(unsigned timems)
                 tv.tv_sec = 0;
                 tv.tv_sec = 0;
                 tv.tv_usec = 0;
                 tv.tv_usec = 0;
     #endif
     #endif
-                CHECKSOCKRANGE(sock);
                 int rc = ::select( sock + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
                 int rc = ::select( sock + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
                 if (rc>0) {
                 if (rc>0) {
                     // select succeeded - return error from socket (0 if connected)
                     // select succeeded - return error from socket (0 if connected)
@@ -1441,9 +1441,9 @@ int CSocket::wait_read(unsigned timeout)
     int ret = 0;
     int ret = 0;
     while (sock!=INVALID_SOCKET) {
     while (sock!=INVALID_SOCKET) {
         T_FD_SET fds;
         T_FD_SET fds;
+        CHECKSOCKRANGE(sock);
         XFD_ZERO(&fds);
         XFD_ZERO(&fds);
         FD_SET((unsigned)sock, &fds);
         FD_SET((unsigned)sock, &fds);
-        CHECKSOCKRANGE(sock);
         if (timeout==WAIT_FOREVER) {
         if (timeout==WAIT_FOREVER) {
             ret = ::select( sock + 1, (fd_set *)&fds, NULL, NULL, NULL );
             ret = ::select( sock + 1, (fd_set *)&fds, NULL, NULL, NULL );
         }
         }
@@ -1471,9 +1471,9 @@ int CSocket::wait_write(unsigned timeout)
     int ret = 0;
     int ret = 0;
     while (sock!=INVALID_SOCKET) {
     while (sock!=INVALID_SOCKET) {
         T_FD_SET fds;
         T_FD_SET fds;
+        CHECKSOCKRANGE(sock);
         XFD_ZERO(&fds);
         XFD_ZERO(&fds);
         FD_SET((unsigned)sock, &fds);
         FD_SET((unsigned)sock, &fds);
-        CHECKSOCKRANGE(sock);
         if (timeout==WAIT_FOREVER) {
         if (timeout==WAIT_FOREVER) {
             ret = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, NULL );
             ret = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, NULL );
         }
         }
@@ -3690,12 +3690,12 @@ public:
         }
         }
         T_FD_SET fds;
         T_FD_SET fds;
         struct timeval tv;
         struct timeval tv;
+        CHECKSOCKRANGE(sock);
         XFD_ZERO(&fds);
         XFD_ZERO(&fds);
         FD_SET((unsigned)sock, &fds);
         FD_SET((unsigned)sock, &fds);
         //FD_SET((unsigned)sock, &except);
         //FD_SET((unsigned)sock, &except);
         tv.tv_sec = 0;
         tv.tv_sec = 0;
         tv.tv_usec = 0;
         tv.tv_usec = 0;
-        CHECKSOCKRANGE(sock);
         int rc = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, &tv );
         int rc = ::select( sock + 1, NULL, (fd_set *)&fds, NULL, &tv );
         if (rc<0) {
         if (rc<0) {
             StringBuffer sockstr;
             StringBuffer sockstr;
@@ -4742,13 +4742,18 @@ public:
 
 
     void add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
     void add(ISocket *sock,unsigned mode,ISocketSelectNotify *nfy)
     {
     {
-        CriticalBlock block(sect);
-        epollthread->add(sock,mode,nfy);
+        CriticalBlock block(sect); // JCS->MK - are these blocks necessary? epollthread->add() uses it's own CS.
+
+        /* JCS->MK, the CSocketSelectHandler variety, checks result of thread->add and spins up another handler
+         * Shouldn't epoll version do the same?
+         */
+        if (!epollthread->add(sock,mode,nfy))
+            throw MakeStringException(-1, "CSocketEpollHandler: failed to add socket to epollthread handler: sock # = %d", sock->OShandle());
     }
     }
 
 
     void remove(ISocket *sock)
     void remove(ISocket *sock)
     {
     {
-        CriticalBlock block(sect);
+        CriticalBlock block(sect); // JCS->MK - are these blocks necessary? epollthread->add() uses it's own CS.
         epollthread->remove(sock);
         epollthread->remove(sock);
     }
     }
 
 
@@ -6003,7 +6008,7 @@ public:
                 if ((err == EINPROGRESS)||(err == EWOULDBLOCK))
                 if ((err == EINPROGRESS)||(err == EWOULDBLOCK))
                     err = 0; // continue
                     err = 0; // continue
                 else {
                 else {
-                    if (err==0) 
+                    if (err==0)
                         connectdone = true; // done immediately
                         connectdone = true; // done immediately
                     else if(!oneshot) //  probably ECONNREFUSED but treat all errors same
                     else if(!oneshot) //  probably ECONNREFUSED but treat all errors same
                         refused_sleep((waitremaining==remaining)?waittm:connecttm,refuseddelay); // this stops becoming cpu bound
                         refused_sleep((waitremaining==remaining)?waittm:connecttm,refuseddelay); // this stops becoming cpu bound
@@ -6011,6 +6016,7 @@ public:
             }
             }
             if (!connectdone&&(err==0)) {
             if (!connectdone&&(err==0)) {
                 SOCKET s = sock->sock;
                 SOCKET s = sock->sock;
+                CHECKSOCKRANGE(s);
                 T_FD_SET fds;
                 T_FD_SET fds;
                 struct timeval tv;
                 struct timeval tv;
                 XFD_ZERO(&fds);
                 XFD_ZERO(&fds);
@@ -6020,7 +6026,6 @@ public:
                 FD_SET((unsigned)s, &except);
                 FD_SET((unsigned)s, &except);
                 tv.tv_sec = remaining / 1000;
                 tv.tv_sec = remaining / 1000;
                 tv.tv_usec = (remaining % 1000)*1000;
                 tv.tv_usec = (remaining % 1000)*1000;
-                CHECKSOCKRANGE(s);
                 int rc = ::select( s + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
                 int rc = ::select( s + 1, NULL, (fd_set *)&fds, (fd_set *)&except, &tv );
                 if (rc==0) 
                 if (rc==0) 
                     break; // timeout
                     break; // timeout
@@ -6091,8 +6096,10 @@ int wait_multiple(bool isRead,               //IN   true if wait read, false it
 #ifdef _DEBUG
 #ifdef _DEBUG
         dbgSB.appendf(" %d",socks.item(idx));
         dbgSB.appendf(" %d",socks.item(idx));
 #endif
 #endif
-        maxSocket = socks.item(idx) > maxSocket ? socks.item(idx) : maxSocket;
-        FD_SET((unsigned)socks.item(idx), &fds);
+        SOCKET s = socks.item(idx);
+        CHECKSOCKRANGE(s);
+        maxSocket = s > maxSocket ? s : maxSocket;
+        FD_SET((unsigned)s, &fds);
     }
     }
 #ifdef _DEBUG
 #ifdef _DEBUG
     DBGLOG("%s",dbgSB.str());
     DBGLOG("%s",dbgSB.str());

+ 21 - 12
system/jlib/jthread.cpp

@@ -928,23 +928,32 @@ class CThreadPool: public CThreadPoolBase, implements IThreadPool, public CInter
         PooledThreadHandle ret;
         PooledThreadHandle ret;
         {
         {
             CriticalBlock block(crit);
             CriticalBlock block(crit);
-            if (timedout&&!availsem.wait(0)) {  // make sure take allocated sem if has become available
-                if (noBlock || timeout > 0)
-                    throw MakeStringException(0, "No threads available in pool %s", poolname.get());
-                WARNLOG("Pool limit exceeded for %s", poolname.get());
+            if (timedout)
+            {
+                if (!availsem.wait(0)) {  // make sure take allocated sem if has become available
+                    if (noBlock || timeout > 0)
+                        throw MakeStringException(0, "No threads available in pool %s", poolname.get());
+                    WARNLOG("Pool limit exceeded for %s", poolname.get());
+                }
+                else
+                    timedout = false;
             }
             }
             if (traceStartDelayPeriod)
             if (traceStartDelayPeriod)
             {
             {
                 ++startsInPeriod;
                 ++startsInPeriod;
-                startDelayInPeriod += startTimer.elapsedCycles();
-                if (overAllTimer.elapsedCycles() >= queryOneSecCycles()*traceStartDelayPeriod) // check avg. delay per minute
+                if (timedout)
                 {
                 {
-                    cycle_t avg = startDelayInPeriod/startsInPeriod;
-                    unsigned avgMs = static_cast<unsigned>(cycle_to_nanosec(avg)/1000000);
-                    PROGLOG("%s: %d threads started in last %d seconds, average delay = %d milliseconds", poolname.get(), startsInPeriod, traceStartDelayPeriod, avgMs);
-                    startsInPeriod = 0;
-                    startDelayInPeriod = 0;
-                    overAllTimer.reset();
+                    startDelayInPeriod += startTimer.elapsedCycles();
+                    if (overAllTimer.elapsedCycles() >= queryOneSecCycles()*traceStartDelayPeriod) // check avg. delay per minute
+                    {
+                        double totalDelayMs = (static_cast<double>(cycle_to_nanosec(startDelayInPeriod)))/1000000;
+                        double avgDelayMs = (static_cast<double>(cycle_to_nanosec(startDelayInPeriod/startsInPeriod)))/1000000;
+                        unsigned totalElapsedSecs = overAllTimer.elapsedMs()/1000;
+                        PROGLOG("%s: %u threads started in last %u seconds, total delay = %0.2f milliseconds, average delay = %0.2f milliseconds, currently running = %u", poolname.get(), startsInPeriod, totalElapsedSecs, totalDelayMs, avgDelayMs, runningCount());
+                        startsInPeriod = 0;
+                        startDelayInPeriod = 0;
+                        overAllTimer.reset();
+                    }
                 }
                 }
             }
             }
             CPooledThreadWrapper &t = allocThread();
             CPooledThreadWrapper &t = allocThread();

+ 43 - 0
testing/regress/ecl/issue13590.ecl

@@ -0,0 +1,43 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2015 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+idRec := { STRING id; };
+
+one := '1' : stored('one');
+two := '2' : stored('two');
+three := '3' : stored('three');
+
+ids(STRING base) := NOFOLD(DATASET([one+base,two+base], idRec));
+
+rRec := RECORD
+    STRING x;
+    DATASET(idRec) y;
+END;
+
+ds := DATASET(['92','67','56','23'], idRec);
+
+rRec t(idRec l) := TRANSFORM
+    y1 := DATASET([three], idRec);
+    ids := ids(l.id);
+    y2 := NOFOLD(IF(ids[1].id >= '0', ids[1], ids[2]));
+    y3 := NOFOLD(IF(l.id > '60', y2, y1[1]));
+    SELF.x := l.id;
+    SELF.y := y1 & y3;
+END;
+
+OUTPUT(PROJECT(ds, t(LEFT)));

+ 6 - 0
testing/regress/ecl/key/issue13590.xml

@@ -0,0 +1,6 @@
+<Dataset name='Result 1'>
+ <Row><x>92</x><y><Row><id>3</id></Row><Row><id>192</id></Row></y></Row>
+ <Row><x>67</x><y><Row><id>3</id></Row><Row><id>167</id></Row></y></Row>
+ <Row><x>56</x><y><Row><id>3</id></Row><Row><id>3</id></Row></y></Row>
+ <Row><x>23</x><y><Row><id>3</id></Row><Row><id>3</id></Row></y></Row>
+</Dataset>

+ 9 - 0
testing/regress/ecl/key/streamread.xml

@@ -0,0 +1,9 @@
+<Dataset name='Result 1'>
+ <Row><Result_1>true</Result_1></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><Result_2>true</Result_2></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><Result_3>true</Result_3></Row>
+</Dataset>

+ 83 - 0
testing/regress/ecl/streamread.ecl

@@ -0,0 +1,83 @@
+outRecord := RECORD
+    STRING10 name;
+    unsigned1  id;
+END;
+
+streamed dataset(outRecord) doRead(const varstring name) := EMBED(C++ : distributed)
+
+    const char * rows[] = {
+        "Gavin     \x01",
+        "Simon     \x02",
+        "Charlotte \x09",
+        "TheEnd    \x00" };
+
+    class StreamCreator : public IRowStream, public RtlCInterface
+    {
+    public:
+        StreamCreator(ICodeContext * _ctx, IEngineRowAllocator * _resultAllocator) : resultAllocator(_resultAllocator)
+        {
+            idx = 0;
+        }
+        RTLIMPLEMENT_IINTERFACE
+
+        virtual const void * nextRow()
+        {
+            if (idx >= sizeof(rows)/sizeof(*rows))
+                return NULL;
+
+            RtlDynamicRowBuilder builder(resultAllocator);
+            memcpy(builder.getSelf(), rows[idx++], 11);
+            return builder.finalizeRowClear(11);
+        }
+
+        virtual void stop()
+        {
+        }
+
+    private:
+        Linked<IEngineRowAllocator> resultAllocator;
+        unsigned idx;
+    };
+
+    #body
+    return new StreamCreator(ctx, _resultAllocator);
+ENDEMBED;
+
+ds := doRead('C:\\temp\\simple');
+
+count(ds) = CLUSTERSIZE * 4;
+
+
+linkcounted dataset(outRecord) doReadRows(const varstring name) := EMBED(C++ : distributed)
+
+    static const char * rows2[] = {
+        "Gavin     \x01",
+        "Simon     \x02",
+        "Charlotte \x09",
+        "TheEnd    \x00" };
+
+    #body
+    //Can return constant allocations as roxie rows
+    __countResult = 4;
+    __result = (byte * *)rows2;
+ENDEMBED;
+
+
+dsRows := doReadRows('C:\\temp\\simple');
+
+count(dsRows) = CLUSTERSIZE * 4;
+
+dataset(outRecord) doReadBlock(const varstring name) := EMBED(C++ : distributed)
+
+    static const char * rows3 = "Gavin     \x01Simon     \002Charlotte \x09TheEnd    \x00";
+
+    #body
+    __lenResult = 44;
+    __result = rtlMalloc(44);
+    memcpy(__result, rows3, 44);
+ENDEMBED;
+
+
+dsBlock := doReadBlock('C:\\temp\\simple');
+
+count(dsBlock) = CLUSTERSIZE * 4;

+ 2 - 3
thorlcr/activities/iterate/thiterateslave.cpp

@@ -401,7 +401,7 @@ class CStreamedIteratorSlaveActivity : public CSlaveActivity, public CThorDataLi
 {
 {
     IHThorStreamedIteratorArg *helper;
     IHThorStreamedIteratorArg *helper;
     Owned<IRowStream> rows;
     Owned<IRowStream> rows;
-    bool eof, isLocal;
+    bool eof;
 
 
 public:
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
@@ -414,11 +414,10 @@ public:
     {
     {
         appendOutputLinked(this);   // adding 'me' to outputs array
         appendOutputLinked(this);   // adding 'me' to outputs array
         helper = static_cast <IHThorStreamedIteratorArg *> (queryHelper());
         helper = static_cast <IHThorStreamedIteratorArg *> (queryHelper());
-        isLocal = false;
     }
     }
     virtual void start()
     virtual void start()
     {
     {
-        isLocal = container.queryOwnerId() && container.queryOwner().isLocalOnly();
+        bool isLocal = container.queryLocalData() || container.queryOwner().isLocalChild();
         eof = isLocal ? false : !firstNode();
         eof = isLocal ? false : !firstNode();
         if (!eof)
         if (!eof)
             rows.setown(helper->createInput());
             rows.setown(helper->createInput());