HPCC-22736 Allow KJ to dynamically spill in transit groups

Because KJ is highly parallel, there can be a lot of pending
(and completed) lhs+rhs match groups in flight.  They can be:
1) Complete but not read
2) Complete but, due to ordering, blocked by an earlier
match group that is still incomplete
3) Incomplete, still waiting for matches from in-progress lookups

If the RHS groups are big and the RHS join fields are sizeable,
this can use significant memory.

This PR tackles this scenario by dynamically spilling groups,
preferring completed groups for simplicity (due to ordering issues),
and by scaling back the number of concurrent groups queued for
matching, so that less memory is used over time.
There are also changes to free rows as soon as possible, e.g.
as each row is read or deserialized, rather than when the join
group is finished with.
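
A minimal sketch of the spill policy described above, assuming a simplified model: `JoinGroup`, `spillToBudget` and `scaleBack` are hypothetical names for illustration and are not taken from thkeyedjoinslave.cpp. Completed groups are chosen before incomplete ones, and the concurrent-group limit is halved after a spill. The halving factor is an assumption; the commit message does not state how the limit is reduced.

```cpp
#include <algorithm>
#include <cstddef>
#include <vector>

// Hypothetical stand-in for a KJ match group; not the HPCC type.
struct JoinGroup
{
    unsigned id;
    bool complete;      // all lookups for this group have returned
    std::size_t bytes;  // memory held by the group's RHS rows
    bool spilled;       // true once the rows have been written out
};

// Spill groups until in-memory usage is <= budget, completed groups first.
// Returns the number of bytes released.
std::size_t spillToBudget(std::vector<JoinGroup> &groups, std::size_t budget)
{
    std::size_t inMemory = 0;
    std::vector<JoinGroup *> candidates;
    for (JoinGroup &g : groups)
    {
        if (!g.spilled)
        {
            inMemory += g.bytes;
            candidates.push_back(&g);
        }
    }
    // Completed before incomplete; within each class, largest first.
    std::stable_sort(candidates.begin(), candidates.end(),
        [](const JoinGroup *a, const JoinGroup *b)
        {
            if (a->complete != b->complete)
                return a->complete;
            return a->bytes > b->bytes;
        });

    std::size_t released = 0;
    for (JoinGroup *g : candidates)
    {
        if (inMemory <= budget)
            break;
        g->spilled = true;  // a real implementation would serialize the rows to disk here
        inMemory -= g->bytes;
        released += g->bytes;
    }
    return released;
}

// Shrink the concurrent-group limit after a spill, never below a floor.
// Halving is an assumption made for this sketch.
unsigned scaleBack(unsigned currentLimit, unsigned floorLimit)
{
    return std::max(floorLimit, currentLimit / 2);
}
```

With groups of 400 (incomplete), 300 (complete) and 500 (complete) bytes and a budget of 600, this sketch spills the two completed groups and leaves the incomplete one in memory.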

There was also a bug that caused the queue of done groups (those
match groups waiting to be read) to exceed the configured limit.
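
The commit message does not say what caused the overrun. As an illustration only, one common way to keep such a queue within its limit is to perform the size check and the insert under the same lock. `BoundedDoneQueue` below is a hypothetical class, not the one used in the activity.

```cpp
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical bounded queue of "done" items; not the HPCC implementation.
template <typename T>
class BoundedDoneQueue
{
    std::mutex m;
    std::deque<T> items;
    const std::size_t limit;
public:
    explicit BoundedDoneQueue(std::size_t _limit) : limit(_limit) {}

    // Check and insert under one lock, so size() can never exceed limit.
    // Returns false if the queue is full; the caller must retry or wait.
    bool tryPush(T item)
    {
        std::lock_guard<std::mutex> lock(m);
        if (items.size() >= limit)
            return false;
        items.push_back(std::move(item));
        return true;
    }

    // Removes the oldest item, if any, freeing one slot.
    bool tryPop(T &out)
    {
        std::lock_guard<std::mutex> lock(m);
        if (items.empty())
            return false;
        out = std::move(items.front());
        items.pop_front();
        return true;
    }

    std::size_t size()
    {
        std::lock_guard<std::mutex> lock(m);
        return items.size();
    }
};
```

A producer that receives `false` from `tryPush` would hold the group back until the reader has popped an item, which is the back-pressure the configured limit is meant to provide.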

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 5 years ago
parent
commit
37f8c6a919
3 changed files with 931 additions and 132 deletions
  1. +927 -132  thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp
  2. +1 -0  thorlcr/thorutil/thmem.hpp
  3. +3 -0  thorlcr/thorutil/thormisc.hpp

File diff suppressed because it is too large
+ 927 - 132
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp


+ 1 - 0
thorlcr/thorutil/thmem.hpp

@@ -224,6 +224,7 @@ graph_decl StringBuffer &getRecordString(const void *key, IOutputRowSerializer *
 #define SPILL_PRIORITY_HASHJOIN SPILL_PRIORITY_HIGH
 #define SPILL_PRIORITY_LARGESORT SPILL_PRIORITY_HIGH
 #define SPILL_PRIORITY_LOOKUPJOIN SPILL_PRIORITY_HIGH
+#define SPILL_PRIORITY_KEYEDJOIN SPILL_PRIORITY_LOW+900
 
 
 enum StableSortFlag { stableSort_none, stableSort_earlyAlloc, stableSort_lateAlloc };
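
A review note on the new macro: it expands to an unparenthesized sum. That is harmless when the macro is passed as a function argument or compared directly, but it can surprise if the macro is ever used inside a larger arithmetic expression. The sketch below uses hypothetical `DEMO_` names and an assumed base value of 100; the real value of `SPILL_PRIORITY_LOW` is defined elsewhere and is not shown in this diff.

```cpp
// Assumed base value, for illustration only.
#define DEMO_PRIORITY_LOW 100

// Same shape as the macro added in this commit: no parentheses.
#define DEMO_KEYEDJOIN_RAW DEMO_PRIORITY_LOW+900

// Parenthesized alternative.
#define DEMO_KEYEDJOIN_SAFE (DEMO_PRIORITY_LOW+900)

// Expands to 2 * 100+900, i.e. (2 * 100) + 900.
constexpr unsigned rawDoubled = 2 * DEMO_KEYEDJOIN_RAW;

// Expands to 2 * (100+900).
constexpr unsigned safeDoubled = 2 * DEMO_KEYEDJOIN_SAFE;
```

Here `rawDoubled` is 1100 while `safeDoubled` is 2000, so wrapping the definition in parentheses would be the defensive choice.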

+ 3 - 0
thorlcr/thorutil/thormisc.hpp

@@ -83,7 +83,10 @@
 #define THOROPT_KEYLOOKUP_MAX_FETCH_THREADS "maxFetchThreads"   // Maximum number of threads performing keyed lookups                            (default = 10)
 #define THOROPT_KEYLOOKUP_MAX_PROCESS_THREADS "keyLookupMaxProcessThreads" // Maximum number of threads performing keyed lookups                 (default = 10)
 #define THOROPT_KEYLOOKUP_MAX_QUEUED  "keyLookupMaxQueued"      // Total maximum number of rows (across all parts/threads) to queue              (default = 10000)
+#define THOROPT_KEYLOOKUP_MIN_MB      "keyLookupMinJoinGroupMB" // Min(MB) for groups (across all parts/threads) to queue                        (default = 50)
 #define THOROPT_KEYLOOKUP_MAX_DONE    "keyLookupMaxDone"        // Maximum number of done items pending to be read by next activity              (default = 10000)
+#define THOROPT_KEYLOOKUP_PROCESS_BATCHLIMIT "keyLookupProcessBatchLimit" // Maximum number of key lookups on queue before passing to a processor (default = 1000)
+#define THOROPT_FETCHLOOKUP_PROCESS_BATCHLIMIT "fetchLookupProcessBatchLimit" // Maximum number of fetch lookups on queue before passing to a processor (default = 10000)
 #define THOROPT_REMOTE_KEYED_LOOKUP   "remoteKeyedLookup"       // Send key request to remote node unless part is local                          (default = true)
 #define THOROPT_REMOTE_KEYED_FETCH    "remoteKeyedFetch"        // Send fetch request to remote node unless part is local                        (default = true)
 #define THOROPT_FORCE_REMOTE_KEYED_LOOKUP "forceRemoteKeyedLookup" // force all keyed lookups, even where part local to be sent as if remote     (default = false)