Przeglądaj źródła

Merge remote-tracking branch 'origin/closedown-4.0.x'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 lat temu
rodzic
commit
508eb6e897

+ 18 - 4
dali/base/dadfs.cpp

@@ -8182,13 +8182,27 @@ class CInitGroups
         Owned<IPropertyTreeIterator> nodes = cluster.getElements(processName);
         ForEach(*nodes) {
             IPropertyTree &node = nodes->query();
+            SocketEndpoint ep;
             const char *computer = node.queryProp("@computer");
-            CMachineEntryPtr *m = machinemap.getValue(computer);
-            if (!m) {
-                ERRLOG("Cannot construct %s, computer name %s not found\n",cluster.queryProp("@name"),computer);
+            const char *netAddress = node.queryProp("@netAddress");
+            if (computer && *computer)
+            {
+                CMachineEntryPtr *m = machinemap.getValue(computer);
+                if (!m) {
+                    ERRLOG("Cannot construct %s, computer name %s not found\n", cluster.queryProp("@name"), computer);
+                    return NULL;
+                }
+                ep.set((*m)->ep);
+            }
+            else if (netAddress && *netAddress)
+            {
+                ep.set(netAddress, 0);
+            }
+            else
+            {
+                ERRLOG("Cannot construct %s, missing computer spec on node\n", cluster.queryProp("@name"));
                 return NULL;
             }
-            SocketEndpoint ep = (*m)->ep;
             switch (groupType) {
                 case grp_roxie:
                 // Redundant copies are located via the flags.

+ 1 - 1
ecl/hql/hqlgram.hpp

@@ -785,7 +785,7 @@ protected:
     void definePatternSymbolProduction(attribute & nameattr, const attribute & assignAttr, attribute & valueAttr, attribute & workflowAttr, const attribute & semiattr);
     void cloneInheritedAttributes(IHqlScope * scope, const attribute & errpos);
 
-    IHqlExpression * createEvaluateOutputModule(const attribute & errpos, IHqlExpression * scopeExpr, IHqlExpression * ifaceExpr, node_operator outputOp);
+    IHqlExpression * createEvaluateOutputModule(const attribute & errpos, IHqlExpression * scopeExpr, IHqlExpression * ifaceExpr, node_operator outputOp, IIdAtom *matchId);
     IHqlExpression * createStoredModule(const attribute & errpos, IHqlExpression * scopeExpr);
     void processForwardModuleDefinition(const attribute & errpos);
     void checkNonGlobalModule(const attribute & errpos, IHqlExpression * scopeExpr);

+ 9 - 3
ecl/hql/hqlgram.y

@@ -2585,7 +2585,13 @@ actionStmt
                         {
                             OwnedHqlExpr abstract = $3.getExpr();
                             OwnedHqlExpr concrete = parser->checkConcreteModule($3, abstract);
-                            $$.setExpr(parser->createEvaluateOutputModule($3, concrete, concrete, no_evaluate_stmt), $1);
+                            $$.setExpr(parser->createEvaluateOutputModule($3, concrete, concrete, no_evaluate_stmt, NULL), $1);
+                        }
+    | EVALUATE '(' abstractModule ',' knownOrUnknownId ')'
+                        {
+                            OwnedHqlExpr abstract = $3.getExpr();
+                            OwnedHqlExpr concrete = parser->checkConcreteModule($3, abstract);
+                            $$.setExpr(parser->createEvaluateOutputModule($3, concrete, concrete, no_evaluate_stmt, $5.getId()), $1);
                         }
     | DISTRIBUTION '(' startTopFilter beginList optDistributionFlags ignoreDummyList ')' endTopFilter
                         {
@@ -2621,7 +2627,7 @@ actionStmt
                         {
                             OwnedHqlExpr abstract = $3.getExpr();
                             OwnedHqlExpr concrete = parser->checkConcreteModule($3, abstract);
-                            $$.setExpr(parser->createEvaluateOutputModule($3, concrete, concrete, no_output));
+                            $$.setExpr(parser->createEvaluateOutputModule($3, concrete, concrete, no_output, NULL));
                             $$.setPosition($1);
                         }
     | OUTPUT '(' abstractModule ',' abstractModule ')'
@@ -2629,7 +2635,7 @@ actionStmt
                             OwnedHqlExpr abstract = $3.getExpr();
                             OwnedHqlExpr concrete = parser->checkConcreteModule($3, abstract);
                             OwnedHqlExpr iface = $5.getExpr();
-                            $$.setExpr(parser->createEvaluateOutputModule($3, concrete, iface, no_output));
+                            $$.setExpr(parser->createEvaluateOutputModule($3, concrete, iface, no_output, NULL));
                             $$.setPosition($1);
                         }
     | ALLNODES '(' beginList actionlist ')'

+ 2 - 2
ecl/hql/hqlgram2.cpp

@@ -9629,11 +9629,11 @@ IHqlExpression * HqlGram::createLibraryInstance(const attribute & errpos, IHqlEx
 
 //==========================================================================================================
 
-IHqlExpression * HqlGram::createEvaluateOutputModule(const attribute & errpos, IHqlExpression * scopeExpr, IHqlExpression * ifaceExpr, node_operator outputOp)
+IHqlExpression * HqlGram::createEvaluateOutputModule(const attribute & errpos, IHqlExpression * scopeExpr, IHqlExpression * ifaceExpr, node_operator outputOp, IIdAtom *matchId)
 {
     if (!ifaceExpr->queryType()->assignableFrom(scopeExpr->queryType()))
         reportError(ERR_NOT_BASE_MODULE, errpos, "Module doesn't implement the interface supplied");
-    return ::createEvaluateOutputModule(lookupCtx, scopeExpr, ifaceExpr, lookupCtx.queryExpandCallsWhenBound(), outputOp);
+    return ::createEvaluateOutputModule(lookupCtx, scopeExpr, ifaceExpr, lookupCtx.queryExpandCallsWhenBound(), outputOp, matchId);
 }
 
 IHqlExpression * HqlGram::createStoredModule(const attribute & errpos, IHqlExpression * scopeExpr)

+ 15 - 10
ecl/hql/hqlutil.cpp

@@ -3921,7 +3921,7 @@ public:
     ModuleExpander(HqlLookupContext & _ctx, bool _expandCallsWhenBound, node_operator _outputOp) 
         : ctx(_ctx), expandCallsWhenBound(_expandCallsWhenBound), outputOp(_outputOp) {}
 
-    IHqlExpression * createExpanded(IHqlExpression * scopeExpr, IHqlExpression * ifaceExpr, const char * prefix);
+    IHqlExpression * createExpanded(IHqlExpression * scopeExpr, IHqlExpression * ifaceExpr, const char * prefix, IIdAtom *matchId);
 
 protected:
     HqlLookupContext ctx;
@@ -3929,7 +3929,7 @@ protected:
     node_operator outputOp;
 };
 
-IHqlExpression * ModuleExpander::createExpanded(IHqlExpression * scopeExpr, IHqlExpression * ifaceExpr, const char * prefix)
+IHqlExpression * ModuleExpander::createExpanded(IHqlExpression * scopeExpr, IHqlExpression * ifaceExpr, const char * prefix, IIdAtom *matchId)
 {
     IHqlScope * scope = scopeExpr->queryScope();
 
@@ -4009,17 +4009,22 @@ IHqlExpression * ModuleExpander::createExpanded(IHqlExpression * scopeExpr, IHql
                 break;
             }
 
-            if (op != no_none)
-                outputs.append(*createValue(op, makeVoidType(), LINK(value), createAttribute(namedAtom, createConstant(lowername))));
-            else if (value->isAction())
-                outputs.append(*LINK(value));
-            else if (value->isScope())
+            if (value->isScope())
             {
                 lowername.append(".");
-                OwnedHqlExpr child = createExpanded(value, value, lowername.str());
+                OwnedHqlExpr child = createExpanded(value, value, lowername.str(), matchId);
                 if (child->getOperator() != no_null)
                     outputs.append(*child.getClear());
             }
+            else if (!matchId || name->lower()==matchId->lower())
+            {
+                if (op != no_none)
+                {
+                    outputs.append(*createValue(op, makeVoidType(), LINK(value), createAttribute(namedAtom, createConstant(lowername))));
+                }
+                else if (value->isAction())
+                    outputs.append(*LINK(value));
+            }
         }
     }
 
@@ -4027,10 +4032,10 @@ IHqlExpression * ModuleExpander::createExpanded(IHqlExpression * scopeExpr, IHql
 }
 
 
-IHqlExpression * createEvaluateOutputModule(HqlLookupContext & ctx, IHqlExpression * scopeExpr, IHqlExpression * ifaceExpr, bool expandCallsWhenBound, node_operator outputOp)
+IHqlExpression * createEvaluateOutputModule(HqlLookupContext & ctx, IHqlExpression * scopeExpr, IHqlExpression * ifaceExpr, bool expandCallsWhenBound, node_operator outputOp, IIdAtom *id)
 {
     ModuleExpander expander(ctx, expandCallsWhenBound, outputOp);
-    return expander.createExpanded(scopeExpr, ifaceExpr, NULL);
+    return expander.createExpanded(scopeExpr, ifaceExpr, NULL, id);
 }
 
 

+ 1 - 1
ecl/hql/hqlutil.hpp

@@ -165,7 +165,7 @@ extern HQL_API bool hasOperand(IHqlExpression * expr, IHqlExpression * child);
 
 extern HQL_API unsigned numRealChildren(IHqlExpression * expr);
 
-extern HQL_API IHqlExpression * createEvaluateOutputModule(HqlLookupContext & ctx, IHqlExpression * scopeExpr, IHqlExpression * ifaceExpr, bool expandCallsWhenBound, node_operator outputOp);
+extern HQL_API IHqlExpression * createEvaluateOutputModule(HqlLookupContext & ctx, IHqlExpression * scopeExpr, IHqlExpression * ifaceExpr, bool expandCallsWhenBound, node_operator outputOp, IIdAtom *matchId);
 extern HQL_API IHqlExpression * createStoredModule(IHqlExpression * scopeExpr);
 extern HQL_API IHqlExpression * convertScalarAggregateToDataset(IHqlExpression * expr);
 

+ 2 - 0
initfiles/sbin/add_conf_settings.sh.in

@@ -40,5 +40,7 @@ ${USER_NAME}    soft    core      unlimited
 ${USER_NAME}    hard    core      unlimited
 ${USER_NAME}    soft    nproc     4096
 ${USER_NAME}    hard    nproc     8192
+${USER_NAME}    soft    rtprio    0
+${USER_NAME}    hard    rtprio    4
 %EOF
 

+ 19 - 14
roxie/ccd/ccdmain.cpp

@@ -838,10 +838,26 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         if (myNodeIndex == -1)
             throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - current node is not in server list");
 
+        // Set multicast base addresses - must be done before generating slave channels
+
+        if (roxieMulticastEnabled && !localSlave)
+        {
+            if (topology->queryProp("@multicastBase"))
+                multicastBase.ipset(topology->queryProp("@multicastBase"));
+            else
+                throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - multicastBase not set");
+            if (topology->queryProp("@multicastLast"))
+                multicastLast.ipset(topology->queryProp("@multicastLast"));
+            else
+                throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - multicastLast not set");
+        }
+
         // Generate the slave channels
         unsigned numDataCopies = topology->getPropInt("@numDataCopies", 1);
         unsigned numNodes = getNumNodes();
         const char *slaveConfig = topology->queryProp("@slaveConfig");
+        if (!slaveConfig)
+            slaveConfig = "simple";
         if (strnicmp(slaveConfig, "cyclic", 6) == 0)
         {
             if (numChannels != numNodes)
@@ -852,10 +868,10 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
                 int channel = i+1;
                 for (int copy=0; copy<numDataCopies; copy++)
                 {
-                    channel = channel + cyclicOffset;
                     if (channel > numNodes)
                         channel = channel - numNodes;
                     addChannel(i, channel, copy);
+                    channel = channel + cyclicOffset;
                 }
             }
         }
@@ -887,21 +903,10 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
                     channel = 1;
             }
         }
+        // Now we know all the channels, we can open and subscribe the multicast channels
         if (!localSlave)
-        {
-            if (roxieMulticastEnabled)
-            {
-                if (topology->queryProp("@multicastBase"))
-                    multicastBase.ipset(topology->queryProp("@multicastBase"));
-                else
-                    throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - multicastBase not set");
-                if (topology->queryProp("@multicastLast"))
-                    multicastLast.ipset(topology->queryProp("@multicastLast"));
-                else
-                    throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - multicastLast not set");
-            }
             openMulticastSocket();
-        }
+
         setDaliServixSocketCaching(true);  // enable daliservix caching
         loadPlugins();
         globalPackageSetManager = createRoxiePackageSetManager(standAloneDll.getClear());

+ 1 - 0
roxie/ccd/ccdqueue.cpp

@@ -152,6 +152,7 @@ size32_t channelWrite(unsigned channel, void const* buf, size32_t size)
 {
     size32_t minwrote = 0;
     SocketEndpointArray &eps = slaveEndpoints[channel]; // if multicast is enabled, this will have a single multicast endpoint in it.
+    assertex(eps.ordinality());
     ForEachItemIn(idx, eps)
     {
         size32_t wrote = multicastSocket->udp_write_to(eps.item(idx), buf, size);

+ 13 - 0
testing/ecl/evaluateModule.ecl

@@ -0,0 +1,13 @@
+t := MODULE
+  EXPORT a := OUTPUT('a');
+  EXPORT c := OUTPUT('c');
+  EXPORT b := OUTPUT('b');
+  EXPORT d := MODULE
+    EXPORT a := OUTPUT('a1');
+    EXPORT c := OUTPUT('c1');
+    EXPORT b := OUTPUT('b1');
+  END;
+END;
+
+EVALUATE(t);
+EVALUATE(t, c);

+ 24 - 0
testing/ecl/key/evaluateModule.xml

@@ -0,0 +1,24 @@
+<Dataset name='Result 1'>
+ <Row><Result_1>a</Result_1></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><Result_2>b</Result_2></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><Result_3>c</Result_3></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><Result_4>a1</Result_4></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><Result_5>b1</Result_5></Row>
+</Dataset>
+<Dataset name='Result 6'>
+ <Row><Result_6>c1</Result_6></Row>
+</Dataset>
+<Dataset name='Result 7'>
+ <Row><Result_7>c</Result_7></Row>
+</Dataset>
+<Dataset name='Result 8'>
+ <Row><Result_8>c1</Result_8></Row>
+</Dataset>

+ 30 - 24
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -2300,18 +2300,11 @@ class CBucket : public CSimpleInterface, implements IInterface
     unsigned bucketN;
     CSpill rowSpill, keySpill;
 
+    void doSpillHashTable();
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
     CBucket(HashDedupSlaveActivityBase &_owner, IRowInterfaces *_rowIf, IRowInterfaces *_keyIf, IHash *_iRowHash, IHash *_iKeyHash, ICompare *_iCompare, bool _extractKey, unsigned _bucketN, CHashTableRowTable *_htRows);
-    void setSpilt()
-    {
-        if (spilt)
-            return;
-        rowSpill.init();
-        keySpill.init();
-        spilt = true;
-    }
     bool addKey(const void *key, unsigned hashValue);
     bool addRow(const void *row, unsigned hashValue);
     void clear();
@@ -2802,25 +2795,29 @@ void CBucket::clear()
     }
 }
 
-bool CBucket::spillHashTable()
+void CBucket::doSpillHashTable()
 {
-    rowidx_t removeN;
+    if (isSpilt())
+        return;
+    spilt = true;
+    rowSpill.init();
+    keySpill.init();
+    rowidx_t maxRows = htRows->queryMaxRows();
+    for (rowidx_t i=0; i<maxRows; i++)
     {
-        CriticalBlock b(lock);
-        removeN = htRows->queryHtElements();
-        if (0 == removeN || spilt) // NB: if split, will be handled by CBucket on different priority
-            return false;
-        setSpilt();
-        // JCSMORE - could detach row table here and let 'lock' go whilst spilling to disk
-        // would have to careful to ensure that when buck is closed, it waits for pending write
-        rowidx_t maxRows = htRows->queryMaxRows();
-        for (rowidx_t i=0; i<maxRows; i++)
-        {
-            OwnedConstThorRow key = htRows->getRowClear(i);
-            if (key)
-                keySpill.putRow(key.getClear());
-        }
+        OwnedConstThorRow key = htRows->getRowClear(i);
+        if (key)
+            keySpill.putRow(key.getClear());
     }
+}
+
+bool CBucket::spillHashTable()
+{
+    CriticalBlock b(lock);
+    rowidx_t removeN = htRows->queryHtElements();
+    if (0 == removeN || spilt) // NB: if split, will be handled by CBucket on different priority
+        return false; // signal nothing to spill
+    doSpillHashTable();
     ActPrintLog(&owner, "Spilt bucket %d - %d elements of hash table", bucketN, removeN);
     return true;
 }
@@ -2836,7 +2833,11 @@ bool CBucket::addKey(const void *key, unsigned hashValue)
             {
                 // attempt rehash
                 if (!rehash())
+                {
+                    // no room to rehash, ensure spilt
+                    doSpillHashTable(); // NB: may have spilt already when allocating for rehash
                     doAdd = false;
+                }
             }
             if (doAdd)
             {
@@ -2895,6 +2896,11 @@ bool CBucket::addRow(const void *row, unsigned hashValue)
                 {
                     if (rehash()) // even if rehash fails, there may be room to continue (following a flush)
                         htPos = hashValue % htRows->queryMaxRows();
+                    else
+                    {
+                        // no room to rehash, ensure spilt
+                        doSpillHashTable(); // NB: may have spilt already when allocating for rehash
+                    }
                 }
                 if (htRows->hasRoom())
                 {