浏览代码

Merge pull request #6906 from jakesmith/hpcc-9395

HPCC-9395 Expand hashjoin to hashdistributes+local join.

Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 10 年之前
父节点
当前提交
cc49936f9e

+ 2 - 2
ecl/hqlcpp/hqlcpp.cpp

@@ -1629,7 +1629,7 @@ void HqlCppTranslator::cacheOptions()
         DebugOption(options.resourceMaxSockets, "resourceMaxSockets", 0),
         DebugOption(options.resourceMaxActivities, "resourceMaxActivities", 0),
         DebugOption(options.resourceMaxHeavy, "resourceMaxHeavy", 1),
-        DebugOption(options.resourceMaxDistribute, "resourceMaxDistribute", 2),
+        DebugOption(options.resourceMaxDistribute, "resourceMaxDistribute", 8),
         DebugOption(options.unlimitedResources,"unlimitedResources", false),
         DebugOption(options.filteredReadSpillThreshold, "filteredReadSpillThreshold", 999),
         DebugOption(options.allowThroughSpill,"allowThroughSpill", true),
@@ -1718,7 +1718,7 @@ void HqlCppTranslator::cacheOptions()
         DebugOption(options.canLinkConstantRows,"canLinkConstantRows",true),
         DebugOption(options.checkAmbiguousRollupCondition,"checkAmbiguousRollupCondition",true),
         DebugOption(options.matchExistingDistributionForJoin,"matchExistingDistributionForJoin",true),
-        DebugOption(options.expandHashJoin,"expandHashJoin",false),
+        DebugOption(options.expandHashJoin,"expandHashJoin",true),
         DebugOption(options.traceIR,"traceIR",false),
         DebugOption(options.preserveCaseExternalParameter,"preserveCaseExternalParameter",true),
         DebugOption(options.optimizeParentAccess,"optimizeParentAccess",false),

+ 2 - 3
ecl/hqlcpp/hqlresource.cpp

@@ -40,9 +40,8 @@
 //#define VERIFY_RESOURCING
 //#define SPOT_UNCONDITIONAL_CONDITIONS
 
-#define DEFAULT_LARGEMEM_BUFFER_SIZE (0x58000000) // ~ 1.4GB
 #define DEFAULT_MAX_SOCKETS 2000 // configurable by setting max_sockets in .ini
-#define DEFAULT_TOTAL_MEMORY ((1024*1024*1800)-DEFAULT_LARGEMEM_BUFFER_SIZE)
+#define DEFAULT_TOTAL_MEMORY ((1024*1024*1800))
 #define FIXED_CLUSTER_SIZE 400
 #define MEM_Const_Minimal (1*1024*1024)
 #define DEFAULT_MAX_ACTIVITIES  100
@@ -64,7 +63,7 @@ MODULE_EXIT()
 
 // Records the resources required by a hash distribute in `resources`:
 // RESslavememory is set to the memory estimate and REShashdist is set to 1.
 // After this change the estimate is MEM_Const_Minimal + DISTRIBUTE_RESMEM(clusterSize),
 // where thbufdef.hpp defines DISTRIBUTE_RESMEM(N) as
 // (DISTRIBUTE_DEFAULT_OUT_BUFFER_SIZE * N) + DISTRIBUTE_DEFAULT_IN_BUFFER_SIZE,
 // i.e. a 1MB output buffer per node plus one 32MB input buffer.
 // `expr` and `options` are not referenced in the body shown here.
 // NOTE(review): the estimate is held in an `unsigned`; confirm clusterSize is always
 // small enough that 1MB * clusterSize cannot wrap.
 static void setHashResources(IHqlExpression * expr, CResources & resources, const CResourceOptions & options)
 {
-    unsigned memneeded = MEM_Const_Minimal+resources.clusterSize*4*DISTRIBUTE_SINGLE_BUFFER_SIZE+DISTRIBUTE_PULL_BUFFER_SIZE;
+    unsigned memneeded = MEM_Const_Minimal+DISTRIBUTE_RESMEM(resources.clusterSize);
     resources.set(RESslavememory, memneeded).set(REShashdist, 1);
 }
 

+ 2 - 4
thorlcr/activities/hashdistrib/thhashdistribslave.cpp

@@ -53,8 +53,6 @@
 
 #define NUMSLAVEPORTS       2
 #define DEFAULTCONNECTTIMEOUT 10000
-#define DEFAULT_OUT_BUFFER_SIZE 0x100000        // 1MB
-#define DEFAULT_IN_BUFFER_SIZE  0x100000*32  // 32MB input buffer
 #define DEFAULT_WRITEPOOLSIZE 16
 #define DISK_BUFFER_SIZE 0x10000 // 64K
 #define DEFAULT_TIMEOUT (1000*60*60)
@@ -979,9 +977,9 @@ public:
         iCompare = NULL;
         ihash = NULL;
         fixedEstSize = 0;
-        bucketSendSize = activity->getOptUInt(THOROPT_HDIST_BUCKET_SIZE, DEFAULT_OUT_BUFFER_SIZE);
+        bucketSendSize = activity->getOptUInt(THOROPT_HDIST_BUCKET_SIZE, DISTRIBUTE_DEFAULT_OUT_BUFFER_SIZE);
         istop = _istop;
-        inputBufferSize = activity->getOptUInt(THOROPT_HDIST_BUFFER_SIZE, DEFAULT_IN_BUFFER_SIZE);
+        inputBufferSize = activity->getOptUInt(THOROPT_HDIST_BUFFER_SIZE, DISTRIBUTE_DEFAULT_IN_BUFFER_SIZE);
         pullBufferSize = DISTRIBUTE_PULL_BUFFER_SIZE;
         selfstopped = false;
         pull = false;

+ 4 - 2
thorlcr/thorutil/thbufdef.hpp

@@ -32,7 +32,7 @@
 #define INDEXWRITE_SMART_BUFFER_SIZE            (0x100000*12)           // 12MB
 #define COUNTPROJECT_SMART_BUFFER_SIZE          (0x100000*12)           // 12MB
 #define ENTH_SMART_BUFFER_SIZE                  (0x100000*12)           // 12MB
-#define JOIN_SMART_BUFFER_SIZE                 (0x100000*12)            // 12MB
+#define JOIN_SMART_BUFFER_SIZE                  (0x100000*12)           // 12MB
 #define LOOKUPJOINL_SMART_BUFFER_SIZE           (0x100000*12)           // 12MB
 #define CATCH_BUFFER_SIZE                       (0x100000*12)           // 12MB
 #define SKIPLIMIT_BUFFER_SIZE                   (0x100000*12)           // 12MB
@@ -42,7 +42,8 @@
 #define NSPLITTER_SPILL_BUFFER_SIZE             (0x100000)              // 1MB
 #define DISTRIBUTE_PULL_BUFFER_SIZE             (0x100000*32)           // 32MB
 #define SORT_BUFFER_TOTAL                       (0x100000*20)           // 20MB (estimate)
-#define DISTRIBUTE_SINGLE_BUFFER_SIZE           (0x10000)               // 64K  - NB per node and multiplied by async send
+#define DISTRIBUTE_DEFAULT_OUT_BUFFER_SIZE      (0x100000)              // 1MB (* targets (numnodes), on each slave)
+#define DISTRIBUTE_DEFAULT_IN_BUFFER_SIZE       (0x100000*32)           // 32MB input buffer (on each slave)
 #define FUNNEL_MIN_BUFF_SIZE                    (0x100000*2)            // 2MB
 #define FUNNEL_MAX_BUFF_SIZE                    (0x100000*20)           // 20MB
 #define COMBINE_MAX_BUFF_SIZE                   (0x100000*20)           // 20MB
@@ -54,6 +55,7 @@
 #define LOOP_SMART_BUFFER_SIZE                  (0x100000*12)           // 12MB
 #define LOCALRESULT_BUFFER_SIZE                 (0x100000*10)           // 10MB
 
+#define DISTRIBUTE_RESMEM(N) ((DISTRIBUTE_DEFAULT_OUT_BUFFER_SIZE * (N)) + DISTRIBUTE_DEFAULT_IN_BUFFER_SIZE)
 
 
 #endif