Explorar el Código

Merge remote-tracking branch 'origin/candidate-3.8.x' into candidate-3.10.x

Conflicts:
	thorlcr/thorutil/thmem.cpp

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman hace 12 años
padre
commit
aab37b2555

+ 15 - 0
ecl/hql/hqlattr.cpp

@@ -1670,6 +1670,14 @@ bool isLocalActivity(IHqlExpression * expr)
     }
     }
 }
 }
 
 
+bool isGroupedAggregateActivity(IHqlExpression * expr, IHqlExpression * grouping)
+{
+    if (grouping && !grouping->isAttribute())
+        return false;
+
+    return isGrouped(expr->queryChild(0));
+}
+
 bool isGroupedActivity(IHqlExpression * expr)
 bool isGroupedActivity(IHqlExpression * expr)
 {
 {
     switch (expr->getOperator())
     switch (expr->getOperator())
@@ -1704,6 +1712,13 @@ bool isGroupedActivity(IHqlExpression * expr)
     case no_related:
     case no_related:
     case no_pipe:
     case no_pipe:
         return isGrouped(expr->queryType());
         return isGrouped(expr->queryType());
+    case no_selectfields:
+    case no_usertable:
+        return isGroupedAggregateActivity(expr, expr->queryChild(2));
+    case no_aggregate:
+    case no_newaggregate:
+    case no_newusertable:
+        return isGroupedAggregateActivity(expr, expr->queryChild(3));
     case no_null:
     case no_null:
     case no_anon:
     case no_anon:
     case no_pseudods:
     case no_pseudods:

+ 4 - 0
ecl/regress/aggds4.ecl

@@ -28,3 +28,7 @@ pr2:= table(sqNamesTable2, { surname, forename, aage, unsigned8 seq := (random()
 
 
 //Filtered Aggregate on a projected table.
 //Filtered Aggregate on a projected table.
 output(table(pr2(seq > 10), { surname, ave(group, aage) }, surname, few, keyed));
 output(table(pr2(seq > 10), { surname, ave(group, aage) }, surname, few, keyed));
+
+//Should not generate a grouped Hash Aggregate
+output(sort(table(group(sort(sqNamesTable1, surname),surname), { surname, ave(group, aage) }, surname, few), record));
+

+ 7 - 0
initfiles/componentfiles/configxml/thor.xsd.in

@@ -353,6 +353,13 @@
           </xs:appinfo>
           </xs:appinfo>
         </xs:annotation>
         </xs:annotation>
       </xs:attribute>
       </xs:attribute>
+      <xs:attribute name="memorySpillAt" type="xs:nonNegativeInteger" use="optional">
+        <xs:annotation>
+          <xs:appinfo>
+            <tooltip>Threshold that the memory manager should start requesting memory to be freed (percentage)</tooltip>
+          </xs:appinfo>
+        </xs:annotation>
+      </xs:attribute>
       <xs:attribute name="pluginsPath" type="relativePath" default="${PLUGINS_PATH}/"/>
       <xs:attribute name="pluginsPath" type="relativePath" default="${PLUGINS_PATH}/"/>
       <xs:attribute name="nodeGroup" type="xs:string" use="optional">
       <xs:attribute name="nodeGroup" type="xs:string" use="optional">
         <xs:annotation>
         <xs:annotation>

+ 22 - 0
system/jlib/jdebug.hpp

@@ -87,6 +87,28 @@ struct ITimeReporter : public IInterface
   virtual void serialize(MemoryBuffer &mb) = 0;
   virtual void serialize(MemoryBuffer &mb) = 0;
 };
 };
 
 
+class CCycleTimer
+{
+    cycle_t start_time;
+public:
+    CCycleTimer()
+    {
+        reset();
+    }
+    inline void reset()
+    {
+        start_time = get_cycles_now();
+    }
+    inline cycle_t elapsedCycles()
+    {
+        return get_cycles_now() - start_time;
+    }
+    inline unsigned elapsedMs()
+    {
+        return cycle_to_nanosec(elapsedCycles())/1000000;
+    }
+};
+
 class jlib_decl TimeSection
 class jlib_decl TimeSection
 {
 {
 public:
 public:

+ 15 - 2
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -532,7 +532,9 @@ public:
 
 
     virtual void recvloop()
     virtual void recvloop()
     {
     {
+        CCycleTimer timer;
         MemoryBuffer tempMb;
         MemoryBuffer tempMb;
+        static cycle_t oneSec = nanosec_to_cycle(1000000000);
         try {
         try {
             ActPrintLog(activity, "Read loop start");
             ActPrintLog(activity, "Read loop start");
             CMessageBuffer recvMb;
             CMessageBuffer recvMb;
@@ -562,8 +564,19 @@ public:
                     }
                     }
                     {
                     {
                         CriticalBlock block(putsect);
                         CriticalBlock block(putsect);
-                        while (!rowSource.eos()) 
-                            pipewr->putRow(ptrallocator.deserializeRow(allocator,rowSource));      
+                        while (!rowSource.eos())
+                        {
+                            timer.reset();
+                            const void *row = ptrallocator.deserializeRow(allocator,rowSource);
+                            cycle_t took=timer.elapsedCycles();
+                            if (took>=oneSec)
+                                DBGLOG("RECVLOOP deserializeRow blocked for : %d second(s)", (unsigned)(cycle_to_nanosec(took)/1000000000));
+                            timer.reset();
+                            pipewr->putRow(row);
+                            took=timer.elapsedCycles();
+                            if (took>=oneSec)
+                                DBGLOG("RECVLOOP pipewr->putRow blocked for : %d second(s)", (unsigned)(cycle_to_nanosec(took)/1000000000));
+                        }
                     }
                     }
                 }
                 }
                 else {
                 else {

+ 3 - 2
thorlcr/graph/thgraph.cpp

@@ -2351,13 +2351,14 @@ void CJobBase::init()
 #endif
 #endif
     bool crcChecking = 0 != getWorkUnitValueInt("THOR_ROWCRC", globals->getPropBool("@THOR_ROWCRC", defaultCrcChecking));
     bool crcChecking = 0 != getWorkUnitValueInt("THOR_ROWCRC", globals->getPropBool("@THOR_ROWCRC", defaultCrcChecking));
     bool usePackedAllocator = 0 != getWorkUnitValueInt("THOR_PACKEDALLOCATOR", globals->getPropBool("@THOR_PACKEDALLOCATOR", false));
     bool usePackedAllocator = 0 != getWorkUnitValueInt("THOR_PACKEDALLOCATOR", globals->getPropBool("@THOR_PACKEDALLOCATOR", false));
-    thorAllocator.setown(createThorAllocator(((memsize_t)gmemSize)*0x100000, crcChecking, usePackedAllocator));
+    unsigned memorySpillAt = getWorkUnitValueInt("memorySpillAt", globals->getPropInt("@memorySpillAt", 80));
+    thorAllocator.setown(createThorAllocator(((memsize_t)gmemSize)*0x100000, memorySpillAt, crcChecking, usePackedAllocator));
 
 
     unsigned defaultMemMB = gmemSize*3/4;
     unsigned defaultMemMB = gmemSize*3/4;
     unsigned largeMemSize = getOptInt("@largeMemSize", defaultMemMB);
     unsigned largeMemSize = getOptInt("@largeMemSize", defaultMemMB);
     if (gmemSize && largeMemSize >= gmemSize)
     if (gmemSize && largeMemSize >= gmemSize)
         throw MakeStringException(0, "largeMemSize(%d) can not exceed globalMemorySize(%d)", largeMemSize, gmemSize);
         throw MakeStringException(0, "largeMemSize(%d) can not exceed globalMemorySize(%d)", largeMemSize, gmemSize);
-    PROGLOG("Global memory size = %d MB, large mem size = %d MB", gmemSize, largeMemSize);
+    PROGLOG("Global memory size = %d MB, memory spill at = %d%%, large mem size = %d MB", gmemSize, memorySpillAt, largeMemSize);
     StringBuffer tracing("maxActivityCores = ");
     StringBuffer tracing("maxActivityCores = ");
     if (maxActivityCores)
     if (maxActivityCores)
         tracing.append(maxActivityCores);
         tracing.append(maxActivityCores);

+ 11 - 5
thorlcr/thorutil/thmem.cpp

@@ -1127,7 +1127,12 @@ protected:
 
 
         totalRows += numRows;
         totalRows += numRows;
         if (iCompare)
         if (iCompare)
+        {
+            ActPrintLog(&activity, "Sorting %"RIPF"d rows", spillableRows.numCommitted());
+            CCycleTimer timer;
             spillableRows.sort(*iCompare, maxCores); // sorts committed rows
             spillableRows.sort(*iCompare, maxCores); // sorts committed rows
+            ActPrintLog(&activity, "Sort took: %f", ((float)timer.elapsedMs())/1000);
+        }
 
 
         StringBuffer tempname;
         StringBuffer tempname;
         GetTempName(tempname,"srtspill",true);
         GetTempName(tempname,"srtspill",true);
@@ -1674,9 +1679,10 @@ protected:
 public:
 public:
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
     IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
 
 
-    CThorAllocator(memsize_t memSize, roxiemem::RoxieHeapFlags _flags) : flags(_flags)
+    CThorAllocator(memsize_t memSize, unsigned memorySpillAt, roxiemem::RoxieHeapFlags _flags) : flags(_flags)
     {
     {
         rowManager.setown(roxiemem::createRowManager(memSize, NULL, queryDummyContextLogger(), this, false));
         rowManager.setown(roxiemem::createRowManager(memSize, NULL, queryDummyContextLogger(), this, false));
+        rowManager->setMemoryLimit(memSize, 0==memorySpillAt ? 0 : memSize/100*memorySpillAt);
         rtlSetReleaseRowHook(this);
         rtlSetReleaseRowHook(this);
     }
     }
     ~CThorAllocator()
     ~CThorAllocator()
@@ -1791,7 +1797,7 @@ public:
 class CThorCrcCheckingAllocator : public CThorAllocator
 class CThorCrcCheckingAllocator : public CThorAllocator
 {
 {
 public:
 public:
-    CThorCrcCheckingAllocator(memsize_t memSize, roxiemem::RoxieHeapFlags flags) : CThorAllocator(memSize, flags)
+    CThorCrcCheckingAllocator(memsize_t memSize, unsigned memorySpillAt, roxiemem::RoxieHeapFlags flags) : CThorAllocator(memSize, memorySpillAt, flags)
     {
     {
     }
     }
 // IThorAllocator
 // IThorAllocator
@@ -1808,7 +1814,7 @@ public:
 };
 };
 
 
 
 
-IThorAllocator *createThorAllocator(memsize_t memSize, bool crcChecking, bool usePacked)
+IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, bool crcChecking, bool usePacked)
 {
 {
     PROGLOG("CRC allocator %s", crcChecking?"ON":"OFF");
     PROGLOG("CRC allocator %s", crcChecking?"ON":"OFF");
     PROGLOG("Packed allocator %s", usePacked?"ON":"OFF");
     PROGLOG("Packed allocator %s", usePacked?"ON":"OFF");
@@ -1818,9 +1824,9 @@ IThorAllocator *createThorAllocator(memsize_t memSize, bool crcChecking, bool us
     else
     else
         flags = roxiemem::RHFnone;
         flags = roxiemem::RHFnone;
     if (crcChecking)
     if (crcChecking)
-        return new CThorCrcCheckingAllocator(memSize, flags);
+        return new CThorCrcCheckingAllocator(memSize, memorySpillAt, flags);
     else
     else
-        return new CThorAllocator(memSize, flags);
+        return new CThorAllocator(memSize, memorySpillAt, flags);
 }
 }
 
 
 
 

+ 1 - 1
thorlcr/thorutil/thmem.hpp

@@ -157,7 +157,7 @@ interface IThorAllocator : extends IInterface
     virtual bool queryCrc() const = 0;
     virtual bool queryCrc() const = 0;
 };
 };
 
 
-IThorAllocator *createThorAllocator(memsize_t memSize, bool crcChecking, bool usePacked);
+IThorAllocator *createThorAllocator(memsize_t memSize, unsigned memorySpillAt, bool crcChecking, bool usePacked);
 
 
 extern graph_decl IOutputMetaData *createOutputMetaDataWithExtra(IOutputMetaData *meta, size32_t sz);
 extern graph_decl IOutputMetaData *createOutputMetaDataWithExtra(IOutputMetaData *meta, size32_t sz);
 extern graph_decl IOutputMetaData *createOutputMetaDataWithChildRow(IEngineRowAllocator *childAllocator, size32_t extraSz);
 extern graph_decl IOutputMetaData *createOutputMetaDataWithChildRow(IEngineRowAllocator *childAllocator, size32_t extraSz);