瀏覽代碼

HPCC-16476 Refactored Keyed Join

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 8 年之前
父節點
當前提交
9b84e2d4f5
共有 35 個文件被更改,包括 7274 次插入和 2237 次刪除
  1. 1 1
      rtl/eclrtl/rtldynfield.hpp
  2. 1 1
      system/jlib/jiface.hpp
  3. 32 0
      system/jlib/jmutex.hpp
  4. 7 0
      testing/regress/ecl/fullkeyed.ecl
  5. 9 0
      testing/regress/ecl/indexread_keyed.ecl
  6. 57 0
      testing/regress/ecl/key/keyed_join5.xml
  7. 8 0
      testing/regress/ecl/keyed_denormalize.ecl
  8. 7 0
      testing/regress/ecl/keyed_fetch.ecl
  9. 8 0
      testing/regress/ecl/keyed_join.ecl
  10. 7 0
      testing/regress/ecl/keyed_join2.ecl
  11. 11 0
      testing/regress/ecl/keyed_join3.ecl
  12. 106 0
      testing/regress/ecl/keyed_join5.ecl
  13. 1 0
      thorlcr/activities/activitymasters_lcr.cmake
  14. 1 0
      thorlcr/activities/activityslaves_lcr.cmake
  15. 350 0
      thorlcr/activities/keyedjoin/thkeyedjoin-legacy.cpp
  16. 405 219
      thorlcr/activities/keyedjoin/thkeyedjoin.cpp
  17. 4 0
      thorlcr/activities/keyedjoin/thkeyedjoin.ipp
  18. 52 0
      thorlcr/activities/keyedjoin/thkeyedjoincommon.hpp
  19. 2446 0
      thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp
  20. 2191 1946
      thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp
  21. 4 0
      thorlcr/activities/keyedjoin/thkeyedjoinslave.ipp
  22. 14 6
      thorlcr/graph/thgraph.cpp
  23. 26 8
      thorlcr/graph/thgraph.hpp
  24. 0 3
      thorlcr/graph/thgraphmaster.cpp
  25. 8 2
      thorlcr/graph/thgraphslave.cpp
  26. 4 2
      thorlcr/graph/thgraphslave.hpp
  27. 10 30
      thorlcr/master/thactivitymaster.cpp
  28. 1 6
      thorlcr/master/thactivitymaster.hpp
  29. 6 2
      thorlcr/master/thmastermain.cpp
  30. 1434 8
      thorlcr/slave/slavmain.cpp
  31. 1 0
      thorlcr/slave/thslavemain.cpp
  32. 38 1
      thorlcr/thorutil/thmem.cpp
  33. 2 0
      thorlcr/thorutil/thmem.hpp
  34. 1 0
      thorlcr/thorutil/thormisc.cpp
  35. 21 2
      thorlcr/thorutil/thormisc.hpp

+ 1 - 1
rtl/eclrtl/rtldynfield.hpp

@@ -103,7 +103,7 @@ interface IRtlFieldTypeDeserializer : public IInterface
 
 };
 
-enum class RecordTranslationMode { None = 0, All = 1, Payload = 2, AlwaysDisk = 3, AlwaysECL = 4 };  // Latter 2 are for testing purposes only
+enum class RecordTranslationMode:byte { None = 0, All = 1, Payload = 2, AlwaysDisk = 3, AlwaysECL = 4 };  // Latter 2 are for testing purposes only
 
 extern ECLRTL_API RecordTranslationMode getTranslationMode(const char *modeStr);
 extern ECLRTL_API const char *getTranslationModeText(RecordTranslationMode val);

+ 1 - 1
system/jlib/jiface.hpp

@@ -102,7 +102,7 @@ template class CSimpleInterfaceOf<CEmptyClass>;
 class jlib_decl CSimpleInterface : public CSimpleInterfaceOf<CEmptyClass>
 {
 public:
-    bool Release() const;   // Prevent Release() being inlined everwhere it is called
+    bool Release() const;   // Prevent Release() being inlined everywhere it is called
 };
 #ifdef _WIN32
 #pragma warning(pop)

+ 32 - 0
system/jlib/jmutex.hpp

@@ -355,6 +355,38 @@ public:
     inline ~CriticalUnblock()                                   { crit.enter(); }
 };
 
+class CLeavableCriticalBlock
+{
+    CriticalSection &crit;
+    bool locked = false;
+public:
+    inline CLeavableCriticalBlock(CriticalSection &_crit) : crit(_crit)
+    {
+        enter();
+    }
+    inline ~CLeavableCriticalBlock()
+    {
+        if (locked)
+            crit.leave();
+    }
+    inline void enter()
+    {
+        if (locked)
+            return;
+        crit.enter();
+        locked = true;
+    }
+    inline void leave()
+    {
+        if (locked)
+        {
+            locked = false;
+            crit.leave();
+        }
+    }
+};
+
+
 #ifdef SPINLOCK_USE_MUTEX // for testing
 
 class  SpinLock

+ 7 - 0
testing/regress/ecl/fullkeyed.ecl

@@ -19,14 +19,21 @@
 //class=index
 //version multiPart=false
 //version multiPart=true
+//version multiPart=false,forceRemoteKeyedLookup=true,forceRemoteKeyedFetch=true
+//version multiPart=true,forceRemoteKeyedLookup=true,forceRemoteKeyedFetch=true
 
 import ^ as root;
 multiPart := #IFDEFINED(root.multiPart, true);
 useLocal := #IFDEFINED(root.useLocal, false);
 useTranslation := #IFDEFINED(root.useTranslation, false);
+forceRemoteKeyedLookup := #IFDEFINED(root.forceRemoteKeyedLookup, false);
+forceRemoteKeyedFetch := #IFDEFINED(root.forceRemoteKeyedFetch, false);
+
 
 //--- end of version configuration ---
 
+#option('forceRemoteKeyedLookup', forceRemoteKeyedLookup);
+#option('forceRemoteKeyedFetch', forceRemoteKeyedFetch);
 #option ('layoutTranslationEnabled', useTranslation);
 #onwarning (4522, ignore);
 

+ 9 - 0
testing/regress/ecl/indexread_keyed.ecl

@@ -19,14 +19,23 @@
 //version multiPart=false
 //version multiPart=true
 //version multiPart=true,useLocal=true
+//version multiPart=false,useLocal=true,forceRemoteKeyedLookup=true,forceRemoteKeyedFetch=true
+//version multiPart=true,useLocal=true,forceRemoteKeyedLookup=true,forceRemoteKeyedFetch=true
 
 import ^ as root;
 multiPart := #IFDEFINED(root.multiPart, true);
 useLocal := #IFDEFINED(root.useLocal, false);
 useTranslation := false;    // keyed limits do not produce the same results.
+forceRemoteKeyedLookup := #IFDEFINED(root.forceRemoteKeyedLookup, false);
+forceRemoteKeyedFetch := #IFDEFINED(root.forceRemoteKeyedFetch, false);
+
 
 //--- end of version configuration ---
 
+#option('forceRemoteKeyedLookup', forceRemoteKeyedLookup);
+#option('forceRemoteKeyedFetch', forceRemoteKeyedFetch);
+
+
 #onwarning (5402, ignore);
 
 import $.setup;

+ 57 - 0
testing/regress/ecl/key/keyed_join5.xml

@@ -0,0 +1,57 @@
+<Dataset name='Result 1'>
+</Dataset>
+<Dataset name='Result 2'>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><someid>0</someid><lhskey>2</lhskey><key>2</key><f1>a2        </f1><filepos>96</filepos></Row>
+ <Row><someid>1</someid><lhskey>3</lhskey><key>3</key><f1>e         </f1><filepos>120</filepos></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><someid>0</someid><lhskey>2</lhskey><key>2</key><f1>a2        </f1><f2>d         </f2></Row>
+ <Row><someid>1</someid><lhskey>3</lhskey><key>3</key><f1>e         </f1><f2>f         </f2></Row>
+</Dataset>
+<Dataset name='Result 5'>
+ <Row><someid>0</someid><lhskey>1</lhskey><key>1</key><f1>a         </f1><filepos>0</filepos></Row>
+ <Row><someid>0</someid><lhskey>1</lhskey><key>1</key><f1>a3        </f1><filepos>48</filepos></Row>
+ <Row><someid>0</someid><lhskey>1</lhskey><key>1</key><f1>a         </f1><filepos>0</filepos></Row>
+ <Row><someid>0</someid><lhskey>1</lhskey><key>1</key><f1>a3        </f1><filepos>48</filepos></Row>
+ <Row><someid>1</someid><lhskey>1</lhskey><key>1</key><f1>a         </f1><filepos>0</filepos></Row>
+ <Row><someid>1</someid><lhskey>1</lhskey><key>1</key><f1>a3        </f1><filepos>48</filepos></Row>
+ <Row><someid>1</someid><lhskey>3</lhskey><key>3</key><f1>e         </f1><filepos>120</filepos></Row>
+</Dataset>
+<Dataset name='Result 6'>
+ <Row><someid>0</someid><lhskey>1</lhskey><key>1</key><f1>a         </f1><f2>b         </f2></Row>
+ <Row><someid>0</someid><lhskey>1</lhskey><key>1</key><f1>a3        </f1><f2>b3        </f2></Row>
+ <Row><someid>0</someid><lhskey>1</lhskey><key>1</key><f1>a         </f1><f2>b         </f2></Row>
+ <Row><someid>0</someid><lhskey>1</lhskey><key>1</key><f1>a3        </f1><f2>b3        </f2></Row>
+ <Row><someid>1</someid><lhskey>1</lhskey><key>1</key><f1>a         </f1><f2>b         </f2></Row>
+ <Row><someid>1</someid><lhskey>1</lhskey><key>1</key><f1>a3        </f1><f2>b3        </f2></Row>
+ <Row><someid>1</someid><lhskey>3</lhskey><key>3</key><f1>e         </f1><f2>f         </f2></Row>
+</Dataset>
+<Dataset name='Result 7'>
+ <Row><someid>0</someid><lhskey>1</lhskey><key>1</key><f1>a         </f1><f2>b         </f2></Row>
+ <Row><someid>0</someid><lhskey>1</lhskey><key>1</key><f1>a3        </f1><f2>b3        </f2></Row>
+ <Row><someid>0</someid><lhskey>1</lhskey><key>1</key><f1>a         </f1><f2>b         </f2></Row>
+ <Row><someid>0</someid><lhskey>1</lhskey><key>1</key><f1>a3        </f1><f2>b3        </f2></Row>
+ <Row><someid>0</someid><lhskey>2</lhskey><key>2</key><f1>a2        </f1><f2>d         </f2></Row>
+ <Row><someid>1</someid><lhskey>1</lhskey><key>1</key><f1>a         </f1><f2>b         </f2></Row>
+ <Row><someid>1</someid><lhskey>1</lhskey><key>1</key><f1>a3        </f1><f2>b3        </f2></Row>
+ <Row><someid>1</someid><lhskey>3</lhskey><key>3</key><f1>e         </f1><f2>f         </f2></Row>
+</Dataset>
+<Dataset name='Result 8'>
+ <Row><key>1</key><f1>a2        </f1><f2>b2        </f2></Row>
+ <Row><key>1</key><f1>a3        </f1><f2>b3        </f2></Row>
+ <Row><key>1</key><f1>a2        </f1><f2>b2        </f2></Row>
+ <Row><key>1</key><f1>a3        </f1><f2>b3        </f2></Row>
+ <Row><key>2</key><f1>a2        </f1><f2>d         </f2></Row>
+ <Row><key>1</key><f1>a2        </f1><f2>b2        </f2></Row>
+ <Row><key>1</key><f1>a3        </f1><f2>b3        </f2></Row>
+ <Row><key>3</key><f1>e         </f1><f2>f         </f2></Row>
+</Dataset>
+<Dataset name='Result 9'>
+ <Row><lhskey>1</lhskey><_unnamed_cnt_2>5</_unnamed_cnt_2></Row>
+ <Row><lhskey>1</lhskey><_unnamed_cnt_2>3</_unnamed_cnt_2></Row>
+</Dataset>
+<Dataset name='Result 10'>
+ <Row><key>0</key><f1>limit hit </f1><f2>          </f2></Row>
+</Dataset>

+ 8 - 0
testing/regress/ecl/keyed_denormalize.ecl

@@ -21,14 +21,22 @@
 //version multiPart=true
 //version multiPart=true,useLocal=true
 //version multiPart=true,useTranslation=true
+//version multiPart=false,forceRemoteKeyedLookup=true,forceRemoteKeyedFetch=true
+//version multiPart=true,forceRemoteKeyedLookup=true,forceRemoteKeyedFetch=true
+//version multiPart=true,useTranslation=true,forceRemoteKeyedLookup=true,forceRemoteKeyedFetch=true
 
 import ^ as root;
 multiPart := #IFDEFINED(root.multiPart, true);
 useLocal := #IFDEFINED(root.useLocal, false);
 useTranslation := #IFDEFINED(root.useTranslation, false);
+forceRemoteKeyedLookup := #IFDEFINED(root.forceRemoteKeyedLookup, false);
+forceRemoteKeyedFetch := #IFDEFINED(root.forceRemoteKeyedFetch, false);
+
 
 //--- end of version configuration ---
 
+#option('forceRemoteKeyedLookup', forceRemoteKeyedLookup);
+#option('forceRemoteKeyedFetch', forceRemoteKeyedFetch);
 #option ('layoutTranslationEnabled', useTranslation);
 #onwarning (4522, ignore);
 #onwarning (4523, ignore);

+ 7 - 0
testing/regress/ecl/keyed_fetch.ecl

@@ -20,14 +20,21 @@
 //version multiPart=false
 //version multiPart=true
 //version multiPart=true,useLocal=true
+//version multiPart=false,useLocal=true,forceRemoteKeyedLookup=true,forceRemoteKeyedFetch=true
+//version multiPart=true,useLocal=true,forceRemoteKeyedLookup=true,forceRemoteKeyedFetch=true
 
 import ^ as root;
 multiPart := #IFDEFINED(root.multiPart, true);
 useLocal := #IFDEFINED(root.useLocal, false);
 useTranslation := #IFDEFINED(root.useTranslation, false);
+forceRemoteKeyedLookup := #IFDEFINED(root.forceRemoteKeyedLookup, false);
+forceRemoteKeyedFetch := #IFDEFINED(root.forceRemoteKeyedFetch, false);
+
 
 //--- end of version configuration ---
 
+#option('forceRemoteKeyedLookup', forceRemoteKeyedLookup);
+#option('forceRemoteKeyedFetch', forceRemoteKeyedFetch);
 #option ('layoutTranslationEnabled', useTranslation);
 #onwarning (4515, ignore);
 #onwarning (4523, ignore);

+ 8 - 0
testing/regress/ecl/keyed_join.ecl

@@ -21,11 +21,19 @@
 //version multiPart=true
 //version multiPart=true,useLocal=true
 //version multiPart=true,useTranslation=true
+//version multiPart=false,forceRemoteKeyedLookup=true,forceRemoteKeyedFetch=true
+//version multiPart=true,forceRemoteKeyedLookup=true,forceRemoteKeyedFetch=true
+//version multiPart=true,useTranslation=true,forceRemoteKeyedLookup=true,forceRemoteKeyedFetch=true
 
 import ^ as root;
 multiPart := #IFDEFINED(root.multiPart, true);
 useLocal := #IFDEFINED(root.useLocal, false);
 useTranslation := #IFDEFINED(root.useTranslation, false);
+forceRemoteKeyedLookup := #IFDEFINED(root.forceRemoteKeyedLookup, false);
+forceRemoteKeyedFetch := #IFDEFINED(root.forceRemoteKeyedFetch, false);
+
+#option('forceRemoteKeyedLookup', forceRemoteKeyedLookup);
+#option('forceRemoteKeyedFetch', forceRemoteKeyedFetch);
 
 //--- end of version configuration ---
 

+ 7 - 0
testing/regress/ecl/keyed_join2.ecl

@@ -21,14 +21,21 @@
 //version multiPart=true
 //version multiPart=true,useLocal=true
 //version multiPart=true,useTranslation=true
+//version multiPart=false,useLocal=true,forceRemoteKeyedLookup=true,forceRemoteKeyedFetch=true
+//version multiPart=true,useLocal=true,forceRemoteKeyedLookup=true,forceRemoteKeyedFetch=true
 
 import ^ as root;
 multiPart := #IFDEFINED(root.multiPart, true);
 useLocal := #IFDEFINED(root.useLocal, false);
 useTranslation := #IFDEFINED(root.useTranslation, false);
+forceRemoteKeyedLookup := #IFDEFINED(root.forceRemoteKeyedLookup, false);
+forceRemoteKeyedFetch := #IFDEFINED(root.forceRemoteKeyedFetch, false);
+
 
 //--- end of version configuration ---
 
+#option('forceRemoteKeyedLookup', forceRemoteKeyedLookup);
+#option('forceRemoteKeyedFetch', forceRemoteKeyedFetch);
 #option ('layoutTranslationEnabled', useTranslation);
 #onwarning (4522, ignore);
 #onwarning (5402, ignore);

+ 11 - 0
testing/regress/ecl/keyed_join3.ecl

@@ -15,11 +15,22 @@
     limitations under the License.
 ############################################################################## */
 
+
+//class=file
+//class=index
 //version multiPart=false
+//version multiPart=false,forceRemoteKeyedLookup=true,forceRemoteKeyedFetch=true
 
 import ^ as root;
+forceRemoteKeyedLookup := #IFDEFINED(root.forceRemoteKeyedLookup, false);
+forceRemoteKeyedFetch := #IFDEFINED(root.forceRemoteKeyedFetch, false);
 multiPart := #IFDEFINED(root.multiPart, false);
 
+#option('forceRemoteKeyedLookup', forceRemoteKeyedLookup);
+#option('forceRemoteKeyedFetch', forceRemoteKeyedFetch);
+
+
+
 //--- end of version configuration ---
 
 #onwarning(4522, ignore);

+ 106 - 0
testing/regress/ecl/keyed_join5.ecl

@@ -0,0 +1,106 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+//class=file
+//class=index
+//version forceRemoteKeyedLookup=false,forceRemoteKeyedFetch=false
+//version forceRemoteKeyedLookup=true,forceRemoteKeyedFetch=true
+
+import ^ as root;
+forceRemoteKeyedLookup := #IFDEFINED(root.forceRemoteKeyedLookup, false);
+forceRemoteKeyedFetch := #IFDEFINED(root.forceRemoteKeyedFetch, false);
+
+#option('forceRemoteKeyedLookup', forceRemoteKeyedLookup);
+#option('forceRemoteKeyedFetch', forceRemoteKeyedFetch);
+
+#onwarning(4522, ignore);
+
+rhsRec := RECORD
+ unsigned4 key;
+ string10 f1;
+ string10 f2;
+END;
+
+rhsRecFP := RECORD(rhsRec)
+ unsigned8 filePos{virtual(fileposition)};
+END;
+
+
+rhs := DATASET([{1, 'a', 'b'}, {1, 'a2', 'b2'}, {1, 'a3', 'b3'}, {1, 'a4', 'b4'}, {2, 'a2', 'd'}, {3, 'e', 'f'}], rhsRec);
+
+lhsRec := RECORD
+ unsigned4 someid;
+ unsigned4 lhsKey;
+END;
+
+lhs := DISTRIBUTE(DATASET([{0, 1}, {0, 1}, {0,2}, {1, 1}, {1, 3}], lhsRec), someid);
+
+rhsDs := DATASET('~REGRESS::'+WORKUNIT+'::rhsDs', rhsRecFP, FLAT);
+
+i := INDEX(rhsDs, {key}, {f1, filePos}, '~REGRESS::'+WORKUNIT+'::rhsDs.idx');
+
+rhsRec doHKJoinTrans(lhsRec l, RECORDOF(i) r) := TRANSFORM
+ SELF.key := IF(l.lhsKey=1 AND r.f1='a', SKIP, l.lhsKey);
+ SELF.f2 := 'blank';
+ SELF := r;
+END;
+rhsRec doFKJoinTrans(lhsRec l, rhsRecFP r) := TRANSFORM
+ SELF.key := IF(l.lhsKey=1 AND r.f1='a', SKIP, l.lhsKey);
+ SELF := r;
+END;
+
+
+// a test with an expression on an lhs field - tests helper->leftCanMatch() handling
+j1hk := JOIN(lhs, i, LEFT.lhsKey>1 AND LEFT.lhsKey=RIGHT.key);
+j1fk := JOIN(lhs, rhsDs, LEFT.lhsKey>1 AND LEFT.lhsKey=RIGHT.key, TRANSFORM({lhsRec, rhsRec}, SELF := LEFT; SELF := RIGHT), KEYED(i));
+
+
+// All the tests below with KEEP(2) have a filter with some kind to remove one of the '1' set matches.
+
+// a test with an expression on an rhs index field - tests helper->indexReadMatch() handling
+j2hk := JOIN(lhs, i, LEFT.lhsKey=RIGHT.key AND RIGHT.f1 != 'a2', KEEP(2));
+j2fk := JOIN(lhs, rhsDs, LEFT.lhsKey=RIGHT.key AND RIGHT.f1 != 'a2', TRANSFORM({lhsRec, rhsRec}, SELF := LEFT; SELF := RIGHT), KEYED(i), KEEP(2));
+
+// a test with an expression on an rhs fetch field - tests helper->fetchMatch() handling
+j3fk := JOIN(lhs, rhsDs, LEFT.lhsKey=RIGHT.key AND RIGHT.f2 != 'b2', TRANSFORM({lhsRec, rhsRec}, SELF := LEFT; SELF := RIGHT), KEYED(i), KEEP(2));
+
+// a test with a transform that skips
+j4hk := JOIN(lhs, i, LEFT.lhsKey=RIGHT.key, doHKJoinTrans(LEFT, RIGHT), KEEP(2));
+j4fk := JOIN(lhs, rhsDs, LEFT.lhsKey=RIGHT.key, doFKJoinTrans(LEFT, RIGHT), KEYED(i), KEEP(2));
+
+
+// test lhs group preserved and group counts
+j5 := TABLE(JOIN(GROUP(lhs, someid), i, LEFT.lhsKey=RIGHT.key, KEEP(2)), { lhsKey, COUNT(GROUP) }, FEW);
+
+// test helper->getRowLimit, generated inside KJ by enclosing within LIMIT()
+j6 := LIMIT(JOIN(lhs, i, LEFT.lhsKey=RIGHT.key, doHKJoinTrans(LEFT, RIGHT), KEEP(3)), 2, onFail(TRANSFORM(rhsRec, SELF.f1 := 'limit hit'; SELF := [])));
+
+SEQUENTIAL(
+ OUTPUT(rhs, , '~REGRESS::'+WORKUNIT+'::rhsDs', OVERWRITE);
+ BUILD(i, OVERWRITE);
+ PARALLEL(
+  OUTPUT(j1hk);
+  OUTPUT(j1fk);
+  OUTPUT(j2hk);
+  OUTPUT(j2fk);
+  OUTPUT(j3fk);
+  OUTPUT(j4fk);
+
+  OUTPUT(j5);
+  OUTPUT(j6);
+ );
+);

+ 1 - 0
thorlcr/activities/activitymasters_lcr.cmake

@@ -48,6 +48,7 @@ set (    SRCS
          join/thjoin.cpp 
          keydiff/thkeydiff.cpp 
          keyedjoin/thkeyedjoin.cpp 
+         keyedjoin/thkeyedjoin-legacy.cpp 
          keypatch/thkeypatch.cpp 
          limit/thlimit.cpp 
          lookupjoin/thlookupjoin.cpp 

+ 1 - 0
thorlcr/activities/activityslaves_lcr.cmake

@@ -50,6 +50,7 @@ set (    SRCS
          join/thjoinslave.cpp 
          keydiff/thkeydiffslave.cpp 
          keyedjoin/thkeyedjoinslave.cpp 
+         keyedjoin/thkeyedjoinslave-legacy.cpp 
          keypatch/thkeypatchslave.cpp 
          limit/thlimitslave.cpp 
          lookupjoin/thlookupjoinslave.cpp 

+ 350 - 0
thorlcr/activities/keyedjoin/thkeyedjoin-legacy.cpp

@@ -0,0 +1,350 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#include "dasess.hpp"
+#include "dadfs.hpp"
+#include "thexception.hpp"
+
+#include "../hashdistrib/thhashdistrib.ipp"
+#include "thkeyedjoin.ipp"
+#include "jhtree.hpp"
+
+
+namespace LegacyKJ
+{
+
+class CKeyedJoinMaster : public CMasterActivity
+{
+    IHThorKeyedJoinArg *helper;
+    Owned<IFileDescriptor> dataFileDesc;
+    Owned<CSlavePartMapping> dataFileMapping;
+    MemoryBuffer offsetMapMb, initMb;
+    bool localKey, remoteDataFiles;
+    unsigned numTags;
+    mptag_t tags[4];
+    ProgressInfoArray progressInfoArr;
+    UnsignedArray progressKinds;
+
+
+public:
+    CKeyedJoinMaster(CMasterGraphElement *info) : CMasterActivity(info)
+    {
+        helper = (IHThorKeyedJoinArg *) queryHelper();
+        //GH->JCS a bit wasteful creating this array each time.
+        progressKinds.append(StNumIndexSeeks);
+        progressKinds.append(StNumIndexScans);
+        progressKinds.append(StNumIndexAccepted);
+        progressKinds.append(StNumPostFiltered);
+        progressKinds.append(StNumPreFiltered);
+
+        if (helper->diskAccessRequired())
+        {
+            progressKinds.append(StNumDiskSeeks);
+            progressKinds.append(StNumDiskAccepted);
+            progressKinds.append(StNumDiskRejected);
+        }
+        ForEachItemIn(l, progressKinds)
+            progressInfoArr.append(*new ProgressInfo(queryJob()));
+        localKey = false;
+        numTags = 0;
+        tags[0] = tags[1] = tags[2] = tags[3] = TAG_NULL;
+        reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename);
+        remoteDataFiles = false;
+    }
+    ~CKeyedJoinMaster()
+    {
+        unsigned i;
+        for (i=0; i<4; i++)
+            if (TAG_NULL != tags[i])
+                container.queryJob().freeMPTag(tags[i]);
+    }
+    virtual void init()
+    {
+        CMasterActivity::init();
+        OwnedRoxieString indexFileName(helper->getIndexFileName());
+        Owned<IDistributedFile> dataFile;
+        Owned<IDistributedFile> indexFile = queryThorFileManager().lookup(container.queryJob(), indexFileName, false, 0 != (helper->getJoinFlags() & JFindexoptional), true);
+
+        unsigned keyReadWidth = (unsigned)container.queryJob().getWorkUnitValueInt("KJKRR", 0);
+        if (!keyReadWidth || keyReadWidth>container.queryJob().querySlaves())
+            keyReadWidth = container.queryJob().querySlaves();
+        
+
+        initMb.clear();
+        initMb.append(indexFileName.get());
+        if (helper->diskAccessRequired())
+            numTags += 2;
+        initMb.append(numTags);
+        unsigned t=0;
+        for (; t<numTags; t++)
+        {
+            tags[t] = container.queryJob().allocateMPTag();
+            initMb.append(tags[t]);
+        }
+        bool keyHasTlk = false;
+        if (indexFile)
+        {
+            if (!isFileKey(indexFile))
+                throw MakeActivityException(this, 0, "Attempting to read flat file as an index: %s", indexFileName.get());
+            unsigned numParts = 0;
+            localKey = indexFile->queryAttributes().getPropBool("@local");
+
+            if (container.queryLocalData() && !localKey)
+                throw MakeActivityException(this, 0, "Keyed Join cannot be LOCAL unless supplied index is local");
+
+            checkFormatCrc(this, indexFile, helper->getIndexFormatCrc(), helper->queryProjectedIndexRecordSize(), helper->queryIndexRecordSize(), true);
+            Owned<IFileDescriptor> indexFileDesc = indexFile->getFileDescriptor();
+            IDistributedSuperFile *superIndex = indexFile->querySuperFile();
+            unsigned superIndexWidth = 0;
+            unsigned numSuperIndexSubs = 0;
+            if (superIndex)
+            {
+                numSuperIndexSubs = superIndex->numSubFiles(true);
+                bool first=true;
+                // consistency check
+                Owned<IDistributedFileIterator> iter = superIndex->getSubFileIterator(true);
+                ForEach(*iter)
+                {
+                    IDistributedFile &f = iter->query();
+                    unsigned np = f.numParts()-1;
+                    IDistributedFilePart &part = f.queryPart(np);
+                    const char *kind = part.queryAttributes().queryProp("@kind");
+                    bool hasTlk = NULL != kind && 0 == stricmp("topLevelKey", kind); // if last part not tlk, then deemed local (might be singlePartKey)
+                    if (first)
+                    {
+                        first = false;
+                        keyHasTlk = hasTlk;
+                        superIndexWidth = f.numParts();
+                        if (keyHasTlk)
+                            --superIndexWidth;
+                    }
+                    else
+                    {
+                        if (hasTlk != keyHasTlk)
+                            throw MakeActivityException(this, 0, "Local/Single part keys cannot be mixed with distributed(tlk) keys in keyedjoin");
+                        if (keyHasTlk && superIndexWidth != f.numParts()-1)
+                            throw MakeActivityException(this, 0, "Super sub keys of different width cannot be mixed with distributed(tlk) keys in keyedjoin");
+                    }
+                }
+                if (keyHasTlk)
+                    numParts = superIndexWidth * numSuperIndexSubs;
+                else
+                    numParts = superIndex->numParts();
+            }
+            else
+            {
+                numParts = indexFile->numParts();
+                if (numParts)
+                {
+                    const char *kind = indexFile->queryPart(indexFile->numParts()-1).queryAttributes().queryProp("@kind");
+                    keyHasTlk = NULL != kind && 0 == stricmp("topLevelKey", kind);
+                    if (keyHasTlk)
+                        --numParts;
+                }
+            }
+            if (numParts)
+            {
+                initMb.append(numParts);
+                initMb.append(superIndexWidth); // 0 if not superIndex
+                bool interleaved = superIndex && superIndex->isInterleaved();
+                initMb.append(interleaved ? numSuperIndexSubs : 0);
+                UnsignedArray parts;
+                if (!superIndex || interleaved) // serialize first numParts parts, TLK are at end and are serialized separately.
+                {
+                    for (unsigned p=0; p<numParts; p++)
+                        parts.append(p);
+                }
+                else // non-interleaved superindex
+                {
+                    unsigned p=0;
+                    for (unsigned i=0; i<numSuperIndexSubs; i++)
+                    {
+                        for (unsigned kp=0; kp<superIndexWidth; kp++)
+                            parts.append(p++);
+                        if (keyHasTlk)
+                            p++; // TLK's serialized separately.
+                    }
+                }
+                indexFileDesc->serializeParts(initMb, parts);
+                if (localKey)
+                    keyHasTlk = false; // not used
+                initMb.append(keyHasTlk);
+                if (keyHasTlk)
+                {
+                    if (numSuperIndexSubs)
+                        initMb.append(numSuperIndexSubs);
+                    else
+                        initMb.append((unsigned)1);
+
+                    Owned<IDistributedFileIterator> iter;
+                    IDistributedFile *f;
+                    if (superIndex)
+                    {
+                        iter.setown(superIndex->getSubFileIterator(true));
+                        f = &iter->query();
+                    }
+                    else
+                        f = indexFile;
+                    for (;;)
+                    {
+                        unsigned location;
+                        OwnedIFile iFile;
+                        StringBuffer filePath;
+                        Owned<IFileDescriptor> fileDesc = f->getFileDescriptor();
+                        Owned<IPartDescriptor> tlkDesc = fileDesc->getPart(fileDesc->numParts()-1);
+                        if (!getBestFilePart(this, *tlkDesc, iFile, location, filePath))
+                            throw MakeThorException(TE_FileNotFound, "Top level key part does not exist, for key: %s", f->queryLogicalName());
+                        OwnedIFileIO iFileIO = iFile->open(IFOread);
+                        assertex(iFileIO);
+
+                        size32_t tlkSz = (size32_t)iFileIO->size();
+                        initMb.append(tlkSz);
+                        ::read(iFileIO, 0, tlkSz, initMb);
+
+                        if (!iter || !iter->next())
+                            break;
+                        f = &iter->query();
+                    }
+                }
+                if (helper->diskAccessRequired())
+                {
+                    OwnedRoxieString fetchFilename(helper->getFileName());
+                    if (fetchFilename)
+                    {
+                        dataFile.setown(queryThorFileManager().lookup(container.queryJob(), fetchFilename, false, 0 != (helper->getFetchFlags() & FFdatafileoptional), true));
+                        if (dataFile)
+                        {
+                            if (isFileKey(dataFile))
+                                throw MakeActivityException(this, 0, "Attempting to read index as a flat file: %s", fetchFilename.get());
+                            if (superIndex)
+                                throw MakeActivityException(this, 0, "Superkeys and full keyed joins are not supported");
+                            dataFileDesc.setown(getConfiguredFileDescriptor(*dataFile));
+                            void *ekey;
+                            size32_t ekeylen;
+                            helper->getFileEncryptKey(ekeylen,ekey);
+                            bool encrypted = dataFileDesc->queryProperties().getPropBool("@encrypted");
+                            if (0 != ekeylen)
+                            {
+                                memset(ekey,0,ekeylen);
+                                free(ekey);
+                                if (!encrypted)
+                                {
+                                    Owned<IException> e = MakeActivityWarning(&container, TE_EncryptionMismatch, "Ignoring encryption key provided as file '%s' was not published as encrypted", dataFile->queryLogicalName());
+                                    queryJobChannel().fireException(e);
+                                }
+                            }
+                            else if (encrypted)
+                                throw MakeActivityException(this, 0, "File '%s' was published as encrypted but no encryption key provided", dataFile->queryLogicalName());
+
+                            /* If fetch file is local to cluster, fetches are sent to be processed to local node, each node has info about it's
+                             * local parts only.
+                             * If fetch file is off cluster, fetches are performed by requesting node directly on fetch part, therefore each nodes
+                             * needs all part descriptors.
+                             */
+                            remoteDataFiles = false;
+                            RemoteFilename rfn;
+                            dataFileDesc->queryPart(0)->getFilename(0, rfn);
+                            if (!rfn.queryIP().ipequals(container.queryJob().querySlaveGroup().queryNode(0).endpoint()))
+                                remoteDataFiles = true;
+                            if (!remoteDataFiles) // local to cluster
+                            {
+                                unsigned dataReadWidth = (unsigned)container.queryJob().getWorkUnitValueInt("KJDRR", 0);
+                                if (!dataReadWidth || dataReadWidth>container.queryJob().querySlaves())
+                                    dataReadWidth = container.queryJob().querySlaves();
+                                Owned<IGroup> grp = container.queryJob().querySlaveGroup().subset((unsigned)0, dataReadWidth);
+                                dataFileMapping.setown(getFileSlaveMaps(dataFile->queryLogicalName(), *dataFileDesc, container.queryJob().queryUserDescriptor(), *grp, false, false, NULL));
+                                dataFileMapping->serializeFileOffsetMap(offsetMapMb.clear());
+                            }
+                        }
+                        else
+                            indexFile.clear();
+                    }
+                }
+            }
+            else
+                indexFile.clear();
+        }
+        if (indexFile)
+        {
+            addReadFile(indexFile);
+            if (dataFile)
+                addReadFile(dataFile);
+        }
+        else
+            initMb.append((unsigned)0);
+    }
+    virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
+    {
+        dst.append(initMb);
+        IDistributedFile *indexFile = queryReadFile(0); // 0 == indexFile, 1 == dataFile
+        if (indexFile && helper->diskAccessRequired())
+        {
+            IDistributedFile *dataFile = queryReadFile(1);
+            if (dataFile)
+            {
+                dst.append(remoteDataFiles);
+                if (remoteDataFiles)
+                {
+                    UnsignedArray parts;
+                    parts.append((unsigned)-1); // weird convention meaning all
+                    dst.append(dataFileDesc->numParts());
+                    dataFileDesc->serializeParts(dst, parts);
+                }
+                else
+                {
+                    dataFileMapping->serializeMap(slave, dst);
+                    dst.append(offsetMapMb);
+                }
+            }
+            else
+            {
+                CSlavePartMapping::serializeNullMap(dst);
+                CSlavePartMapping::serializeNullOffsetMap(dst);
+            }
+        }
+    }
+    virtual void deserializeStats(unsigned node, MemoryBuffer &mb)
+    {
+        CMasterActivity::deserializeStats(node, mb);
+        ForEachItemIn(p, progressKinds)
+        {
+            unsigned __int64 st;
+            mb.read(st);
+            progressInfoArr.item(p).set(node, st);
+        }
+    }
+    virtual void getEdgeStats(IStatisticGatherer & stats, unsigned idx)
+    {
+        //This should be an activity stats
+        CMasterActivity::getEdgeStats(stats, idx);
+        assertex(0 == idx);
+        ForEachItemIn(p, progressInfoArr)
+        {
+            ProgressInfo &progress = progressInfoArr.item(p);
+            progress.processInfo();
+            stats.addStatistic((StatisticKind)progressKinds.item(p), progress.queryTotal());
+        }
+    }
+};
+
+
+CActivityBase *createKeyedJoinActivityMaster(CMasterGraphElement *info)
+{
+    return new CKeyedJoinMaster(info);
+}
+
+}

+ 405 - 219
thorlcr/activities/keyedjoin/thkeyedjoin.cpp

@@ -15,308 +15,492 @@
     limitations under the License.
 ############################################################################## */
 
+#include <array>
+#include <vector>
+#include <algorithm>
 #include "dasess.hpp"
 #include "dadfs.hpp"
 #include "thexception.hpp"
 
+#include "../fetch/thfetchcommon.hpp"
 #include "../hashdistrib/thhashdistrib.ipp"
 #include "thkeyedjoin.ipp"
 #include "jhtree.hpp"
 
+static const std::array<StatisticKind, 8> progressKinds{ StNumIndexSeeks, StNumIndexScans, StNumIndexAccepted, StNumPostFiltered, StNumPreFiltered, StNumDiskSeeks, StNumDiskAccepted, StNumDiskRejected };
+
 class CKeyedJoinMaster : public CMasterActivity
 {
-    IHThorKeyedJoinArg *helper;
-    Owned<IFileDescriptor> dataFileDesc;
-    Owned<CSlavePartMapping> dataFileMapping;
-    MemoryBuffer offsetMapMb, initMb;
-    bool localKey, remoteDataFiles;
-    unsigned numTags;
-    mptag_t tags[4];
+    IHThorKeyedJoinArg *helper = nullptr;
+    Owned<IFileDescriptor> dataFileDesc, indexFileDesc;
+    MemoryBuffer initMb;
+    unsigned numTags = 0;
+    std::vector<mptag_t> tags;
     ProgressInfoArray progressInfoArr;
-    UnsignedArray progressKinds;
+
+    bool local = false;
+    bool remoteKeyedLookup = false;
+    bool remoteKeyedFetch = false;
+    unsigned totalIndexParts = 0;
+
+    // CMap contains mappings and lists of parts for each slave
+    class CMap
+    {
+        static const unsigned partMask = 0x00ffffff;
+    public:
+        std::vector<unsigned> allParts;
+        std::vector<std::vector<unsigned>> slavePartMap; // vector of slave parts (IPartDescriptor's slavePartMap[<slave>] serialized to each slave)
+        std::vector<unsigned> partToSlave; // vector mapping part index to slave (sent to all slaves)
+
+        void setup(unsigned slaves, unsigned parts)
+        {
+            clear();
+            slavePartMap.resize(slaves);
+            partToSlave.resize(parts);
+        }
+        void clear()
+        {
+            allParts.clear();
+            slavePartMap.clear();
+            partToSlave.clear();
+        }
+        unsigned count() const { return partToSlave.size(); }
+        void serializePartMap(MemoryBuffer &mb) const
+        {
+            mb.append(partToSlave.size() * sizeof(unsigned), &partToSlave[0]);
+        }
+        unsigned querySlave(unsigned part) const { return partToSlave[part]; }
+        std::vector<unsigned> &querySlaveParts(unsigned slave) { return slavePartMap[slave]; }
+        std::vector<unsigned> &queryAllParts() { return allParts; }
+
+
+        /* maps input file into lists of parts for slaves and a mapping for slaves to find other parts
+         * If 'allLocal' option is true, it will also map replicate copies and use them directly if local to slave.
+         */
+        void map(CKeyedJoinMaster &activity, IDistributedFile *file, bool isIndexWithTlk, bool allLocal)
+        {
+            Owned<IFileDescriptor> fileDesc = file->getFileDescriptor();
+            assertex(fileDesc);
+            IDistributedSuperFile *super = file->querySuperFile();
+            ISuperFileDescriptor *superFileDesc = fileDesc->querySuperFileDescriptor();
+            unsigned totalParts = file->numParts();
+            if (isIndexWithTlk)
+                totalParts -= super ? super->numSubFiles(true) : 1;
+
+            IGroup &dfsGroup = queryDfsGroup();
+            setup(dfsGroup.ordinality(), totalParts);
+
+            unsigned numSuperIndexSubs = 0;
+            unsigned superWidth = 0;
+            if (super)
+            {
+                if (super->numSubFiles(true))
+                {
+                    if (!super->isInterleaved())
+                        numSuperIndexSubs = super->numSubFiles(true);
+
+                    IDistributedFile &sub = super->querySubFile(0, true);
+                    superWidth = sub.numParts();
+                    if (isIndexWithTlk)
+                        --superWidth;
+                }
+            }
+
+            unsigned groupSize = dfsGroup.ordinality();
+            std::vector<unsigned> partsByPartIdx;
+            Owned<IBitSet> partsOnSlaves = createBitSet();
+            unsigned numParts = fileDesc->numParts();
+            unsigned nextGroupStartPos = 0;
+
+            for (unsigned p=0; p<numParts; p++)
+            {
+                IPartDescriptor *part = fileDesc->queryPart(p);
+                const char *kind = isIndexWithTlk ? part->queryProperties().queryProp("@kind") : nullptr;
+                if (!kind || !strsame("topLevelKey", kind))
+                {
+                    unsigned partIdx = part->queryPartIndex();
+                    unsigned subfile = NotFound;
+                    unsigned subPartIdx = partIdx;
+                    if (superFileDesc)
+                    {
+                        superFileDesc->mapSubPart(partIdx, subfile, subPartIdx);
+                        partIdx = superWidth*subfile+subPartIdx;
+                    }
+                    if (activity.local)
+                    {
+                        if (activity.queryContainer().queryLocalData())
+                        {
+                            if (subPartIdx < dfsGroup.ordinality())
+                            {
+                                std::vector<unsigned> &slaveParts = querySlaveParts(subPartIdx);
+                                slaveParts.push_back(p);
+                            }
+                        }
+                        else
+                        {
+                            for (auto &slaveParts : slavePartMap)
+                                slaveParts.push_back(p);
+                        }
+                        partsByPartIdx.push_back(partIdx);
+                    }
+                    else
+                    {
+                        /* see if any of the part copies are local to any of the cluster nodes
+                         * Add them to local parts list if found.
+                         */
+                        unsigned mappedPos = NotFound;
+                        for (unsigned c=0; c<part->numCopies(); c++)
+                        {
+                            INode *partNode = part->queryNode(c);
+                            unsigned partCopy = p | (c << 24);
+                            unsigned start=nextGroupStartPos;
+                            unsigned gn=start;
+                            do
+                            {
+                                INode &groupNode = dfsGroup.queryNode(gn);
+                                if ((partNode->equals(&groupNode)))
+                                {
+                                    /* NB: If there's >1 slave per node (e.g. slavesPerNode>1) then there are multiple matching node's in the dfsGroup
+                                     * Which means a copy of a part may already be assigned to a cluster slave map. This check avoid handling it again if it has.
+                                     */
+                                    if (!partsOnSlaves->testSet(groupSize*p+gn))
+                                    {
+                                        std::vector<unsigned> &slaveParts = querySlaveParts(gn);
+                                        if (NotFound == mappedPos)
+                                        {
+                                            /* NB: to avoid all parts being mapped to same remote slave process (significant if slavesPerNode>1)
+                                             * or (conditionally) all accessible locals being added to all slaves (which may have detrimental effect on key node caching)
+                                             * inc. group start pos for beginning of next search.
+                                             */
+                                            slaveParts.push_back(partCopy);
+                                            if (activity.queryContainer().queryJob().queryChannelsPerSlave()>1)
+                                                mappedPos = gn % queryNodeClusterWidth();
+                                            else
+                                                mappedPos = gn;
+                                            nextGroupStartPos = gn+1;
+                                            if (nextGroupStartPos == groupSize)
+                                                nextGroupStartPos = 0;
+                                        }
+                                        else if (allLocal) // all slaves get all locally accessible parts
+                                            slaveParts.push_back(partCopy);
+                                    }
+                                }
+                                gn++;
+                                if (gn == groupSize)
+                                    gn = 0;
+                            }
+                            while (gn != start);
+                        }
+                        if (NotFound == mappedPos)
+                        {
+                            // part not within the cluster, add it to all slave maps, meaning these part meta will be serialized to all slaves so they handle the lookups directly.
+                            for (auto &slaveParts : slavePartMap)
+                                slaveParts.push_back(p);
+                        }
+                        if (superFileDesc)
+                            partIdx = superWidth*subfile+subPartIdx;
+                        partsByPartIdx.push_back(partIdx);
+                        assertex(partIdx < totalParts);
+                        partToSlave[partIdx] = mappedPos;
+                    }
+                }
+            }
+            if (!activity.local)
+            {
+                if (0 == numSuperIndexSubs)
+                {
+                    for (unsigned p=0; p<totalParts; p++)
+                        allParts.push_back(p);
+                }
+                else // non-interleaved superindex
+                {
+                    unsigned p=0;
+                    for (unsigned i=0; i<numSuperIndexSubs; i++)
+                    {
+                        for (unsigned kp=0; kp<superWidth; kp++)
+                            allParts.push_back(p++);
+                        if (isIndexWithTlk)
+                            p++; // TLK's serialized separately.
+                    }
+                }
+                // ensure sorted by partIdx, so that consistent order for partHandlers/lookup
+                std::sort(allParts.begin(), allParts.end(), [partsByPartIdx](unsigned a, unsigned b) { return partsByPartIdx[a] < partsByPartIdx[b]; });
+            }
+            // ensure sorted by partIdx, so that consistent order for partHandlers/lookup
+            for (auto &slaveParts : slavePartMap)
+                std::sort(slaveParts.begin(), slaveParts.end(), [partsByPartIdx](unsigned a, unsigned b) { return partsByPartIdx[a & partMask] < partsByPartIdx[b & partMask]; });
+        }
+    };
+
+    CMap indexMap, dataMap;
 
 
 public:
     CKeyedJoinMaster(CMasterGraphElement *info) : CMasterActivity(info)
     {
         helper = (IHThorKeyedJoinArg *) queryHelper();
-        //GH->JCS a bit wasteful creating this array each time.
-        progressKinds.append(StNumIndexSeeks);
-        progressKinds.append(StNumIndexScans);
-        progressKinds.append(StNumIndexAccepted);
-        progressKinds.append(StNumPostFiltered);
-        progressKinds.append(StNumPreFiltered);
+        unsigned numStats = helper->diskAccessRequired() ? 8 : 5; // see progressKinds array
+        for (unsigned s=0; s<numStats; s++)
+            progressInfoArr.append(*new ProgressInfo(queryJob()));
+        reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename);
+
+        // NB: force options are there to force all parts to be remote, even if local to slave (handled on slave)
+        remoteKeyedLookup = getOptBool(THOROPT_REMOTE_KEYED_LOOKUP, true);
+        if (getOptBool(THOROPT_FORCE_REMOTE_KEYED_LOOKUP))
+            remoteKeyedLookup = true;
+        remoteKeyedFetch = getOptBool(THOROPT_REMOTE_KEYED_FETCH, true);
+        if (getOptBool(THOROPT_FORCE_REMOTE_KEYED_FETCH))
+            remoteKeyedFetch = true;
 
         if (helper->diskAccessRequired())
+            numTags += 2;
+        for (unsigned t=0; t<numTags; t++)
         {
-            progressKinds.append(StNumDiskSeeks);
-            progressKinds.append(StNumDiskAccepted);
-            progressKinds.append(StNumDiskRejected);
+            mptag_t tag = container.queryJob().allocateMPTag();
+            tags.push_back(tag);
         }
-        ForEachItemIn(l, progressKinds)
-            progressInfoArr.append(*new ProgressInfo(queryJob()));
-        localKey = false;
-        numTags = 0;
-        tags[0] = tags[1] = tags[2] = tags[3] = TAG_NULL;
-        reInit = 0 != (helper->getFetchFlags() & (FFvarfilename|FFdynamicfilename)) || (helper->getJoinFlags() & JFvarindexfilename);
-        remoteDataFiles = false;
     }
     ~CKeyedJoinMaster()
     {
-        unsigned i;
-        for (i=0; i<4; i++)
-            if (TAG_NULL != tags[i])
-                container.queryJob().freeMPTag(tags[i]);
+        for (const mptag_t &tag : tags)
+            container.queryJob().freeMPTag(tag);
     }
     virtual void init()
     {
         CMasterActivity::init();
         OwnedRoxieString indexFileName(helper->getIndexFileName());
-        Owned<IDistributedFile> dataFile;
-        Owned<IDistributedFile> indexFile = queryThorFileManager().lookup(container.queryJob(), indexFileName, false, 0 != (helper->getJoinFlags() & JFindexoptional), true);
-
-        unsigned keyReadWidth = (unsigned)container.queryJob().getWorkUnitValueInt("KJKRR", 0);
-        if (!keyReadWidth || keyReadWidth>container.queryJob().querySlaves())
-            keyReadWidth = container.queryJob().querySlaves();
-        
 
         initMb.clear();
         initMb.append(indexFileName.get());
-        if (helper->diskAccessRequired())
-            numTags += 2;
-        initMb.append(numTags);
-        unsigned t=0;
-        for (; t<numTags; t++)
-        {
-            tags[t] = container.queryJob().allocateMPTag();
-            initMb.append(tags[t]);
-        }
         bool keyHasTlk = false;
+        totalIndexParts = 0;
+
+        Owned<IDistributedFile> dataFile;
+        Owned<IDistributedFile> indexFile = queryThorFileManager().lookup(container.queryJob(), indexFileName, false, 0 != (helper->getJoinFlags() & JFindexoptional), true);
         if (indexFile)
         {
             if (!isFileKey(indexFile))
                 throw MakeActivityException(this, 0, "Attempting to read flat file as an index: %s", indexFileName.get());
-            unsigned numParts = 0;
-            localKey = indexFile->queryAttributes().getPropBool("@local");
-
-            if (container.queryLocalData() && !localKey)
-                throw MakeActivityException(this, 0, "Keyed Join cannot be LOCAL unless supplied index is local");
-
-            checkFormatCrc(this, indexFile, helper->getIndexFormatCrc(), helper->queryProjectedIndexRecordSize(), helper->queryIndexRecordSize(), true);
-            Owned<IFileDescriptor> indexFileDesc = indexFile->getFileDescriptor();
             IDistributedSuperFile *superIndex = indexFile->querySuperFile();
-            unsigned superIndexWidth = 0;
-            unsigned numSuperIndexSubs = 0;
-            if (superIndex)
+            if (helper->diskAccessRequired())
             {
-                numSuperIndexSubs = superIndex->numSubFiles(true);
-                bool first=true;
-                // consistency check
-                Owned<IDistributedFileIterator> iter = superIndex->getSubFileIterator(true);
-                ForEach(*iter)
+                OwnedRoxieString fetchFilename(helper->getFileName());
+                if (fetchFilename)
                 {
-                    IDistributedFile &f = iter->query();
-                    unsigned np = f.numParts()-1;
-                    IDistributedFilePart &part = f.queryPart(np);
-                    const char *kind = part.queryAttributes().queryProp("@kind");
-                    bool hasTlk = NULL != kind && 0 == stricmp("topLevelKey", kind); // if last part not tlk, then deemed local (might be singlePartKey)
-                    if (first)
-                    {
-                        first = false;
-                        keyHasTlk = hasTlk;
-                        superIndexWidth = f.numParts();
-                        if (keyHasTlk)
-                            --superIndexWidth;
-                    }
-                    else
+                    dataFile.setown(queryThorFileManager().lookup(container.queryJob(), fetchFilename, false, 0 != (helper->getFetchFlags() & FFdatafileoptional), true));
+                    if (dataFile)
                     {
-                        if (hasTlk != keyHasTlk)
-                            throw MakeActivityException(this, 0, "Local/Single part keys cannot be mixed with distributed(tlk) keys in keyedjoin");
-                        if (keyHasTlk && superIndexWidth != f.numParts()-1)
-                            throw MakeActivityException(this, 0, "Super sub keys of different width cannot be mixed with distributed(tlk) keys in keyedjoin");
+                        if (isFileKey(dataFile))
+                            throw MakeActivityException(this, 0, "Attempting to read index as a flat file: %s", fetchFilename.get());
+                        if (superIndex)
+                            throw MakeActivityException(this, 0, "Superkeys and full keyed joins are not supported");
+
+                        dataFileDesc.setown(getConfiguredFileDescriptor(*dataFile));
+                        void *ekey;
+                        size32_t ekeylen;
+                        helper->getFileEncryptKey(ekeylen,ekey);
+                        bool encrypted = dataFileDesc->queryProperties().getPropBool("@encrypted");
+                        if (0 != ekeylen)
+                        {
+                            memset(ekey,0,ekeylen);
+                            free(ekey);
+                            if (!encrypted)
+                            {
+                                Owned<IException> e = MakeActivityWarning(&container, TE_EncryptionMismatch, "Ignoring encryption key provided as file '%s' was not published as encrypted", dataFile->queryLogicalName());
+                                queryJobChannel().fireException(e);
+                            }
+                        }
+                        else if (encrypted)
+                            throw MakeActivityException(this, 0, "File '%s' was published as encrypted but no encryption key provided", dataFile->queryLogicalName());
+
+                        /* If fetch file is local to cluster, fetches are sent to the slave the parts are local to.
+                         * If fetch file is off cluster, fetches are performed by requesting node directly on fetch part, therefore each nodes
+                         * needs all part descriptors.
+                         */
+                        if (remoteKeyedFetch)
+                        {
+                            RemoteFilename rfn;
+                            dataFileDesc->queryPart(0)->getFilename(0, rfn);
+                            if (!rfn.queryIP().ipequals(container.queryJob().querySlaveGroup().queryNode(0).endpoint()))
+                                remoteKeyedFetch = false;
+                        }
+                        dataMap.map(*this, dataFile, false, getOptBool("allLocalFetchParts"));
                     }
                 }
-                if (keyHasTlk)
-                    numParts = superIndexWidth * numSuperIndexSubs;
-                else
-                    numParts = superIndex->numParts();
-            }
-            else
-            {
-                numParts = indexFile->numParts();
-                if (numParts)
-                {
-                    const char *kind = indexFile->queryPart(indexFile->numParts()-1).queryAttributes().queryProp("@kind");
-                    keyHasTlk = NULL != kind && 0 == stricmp("topLevelKey", kind);
-                    if (keyHasTlk)
-                        --numParts;
-                }
             }
-            if (numParts)
+            if (!helper->diskAccessRequired() || dataFileDesc)
             {
-                initMb.append(numParts);
-                initMb.append(superIndexWidth); // 0 if not superIndex
-                bool interleaved = superIndex && superIndex->isInterleaved();
-                initMb.append(interleaved ? numSuperIndexSubs : 0);
-                UnsignedArray parts;
-                if (!superIndex || interleaved) // serialize first numParts parts, TLK are at end and are serialized separately.
+                bool localKey = indexFile->queryAttributes().getPropBool("@local");
+                local = localKey || container.queryLocalData();
+                if (local)
                 {
-                    for (unsigned p=0; p<numParts; p++)
-                        parts.append(p);
+                    remoteKeyedLookup = false;
+                    remoteKeyedFetch = false;
                 }
-                else // non-interleaved superindex
+                checkFormatCrc(this, indexFile, helper->getIndexFormatCrc(), helper->queryProjectedIndexRecordSize(), helper->queryIndexRecordSize(), true);
+                indexFileDesc.setown(indexFile->getFileDescriptor());
+
+                unsigned superIndexWidth = 0;
+                unsigned numSuperIndexSubs = 0;
+                if (superIndex)
                 {
-                    unsigned p=0;
-                    for (unsigned i=0; i<numSuperIndexSubs; i++)
+                    numSuperIndexSubs = superIndex->numSubFiles(true);
+                    bool first=true;
+                    // consistency check
+                    Owned<IDistributedFileIterator> iter = superIndex->getSubFileIterator(true);
+                    ForEach(*iter)
                     {
-                        for (unsigned kp=0; kp<superIndexWidth; kp++)
-                            parts.append(p++);
-                        if (keyHasTlk)
-                            p++; // TLK's serialized separately.
+                        IDistributedFile &f = iter->query();
+                        unsigned np = f.numParts()-1;
+                        IDistributedFilePart &part = f.queryPart(np);
+                        const char *kind = part.queryAttributes().queryProp("@kind");
+                        bool hasTlk = NULL != kind && 0 == stricmp("topLevelKey", kind); // if last part not tlk, then deemed local (might be singlePartKey)
+                        if (first)
+                        {
+                            first = false;
+                            keyHasTlk = hasTlk;
+                            superIndexWidth = f.numParts();
+                            if (keyHasTlk)
+                                --superIndexWidth;
+                        }
+                        else
+                        {
+                            if (hasTlk != keyHasTlk)
+                                throw MakeActivityException(this, 0, "Local/Single part keys cannot be mixed with distributed(tlk) keys in keyedjoin");
+                            if (keyHasTlk && superIndexWidth != f.numParts()-1)
+                                throw MakeActivityException(this, 0, "Super sub keys of different width cannot be mixed with distributed(tlk) keys in keyedjoin");
+                            if (localKey && superIndexWidth != queryClusterWidth())
+                                throw MakeActivityException(this, 0, "Super keys of local index must be same width as target cluster");
+                        }
                     }
+                    if (keyHasTlk)
+                        totalIndexParts = superIndexWidth * numSuperIndexSubs;
+                    else
+                        totalIndexParts = superIndex->numParts();
                 }
-                indexFileDesc->serializeParts(initMb, parts);
-                if (localKey)
-                    keyHasTlk = false; // not used
-                initMb.append(keyHasTlk);
-                if (keyHasTlk)
+                else
                 {
-                    if (numSuperIndexSubs)
-                        initMb.append(numSuperIndexSubs);
-                    else
-                        initMb.append((unsigned)1);
-
-                    Owned<IDistributedFileIterator> iter;
-                    IDistributedFile *f;
-                    if (superIndex)
-                    {
-                        iter.setown(superIndex->getSubFileIterator(true));
-                        f = &iter->query();
-                    }
-                    else
-                        f = indexFile;
-                    for (;;)
+                    totalIndexParts = indexFile->numParts();
+                    if (totalIndexParts)
                     {
-                        unsigned location;
-                        OwnedIFile iFile;
-                        StringBuffer filePath;
-                        Owned<IFileDescriptor> fileDesc = f->getFileDescriptor();
-                        Owned<IPartDescriptor> tlkDesc = fileDesc->getPart(fileDesc->numParts()-1);
-                        if (!getBestFilePart(this, *tlkDesc, iFile, location, filePath))
-                            throw MakeThorException(TE_FileNotFound, "Top level key part does not exist, for key: %s", f->queryLogicalName());
-                        OwnedIFileIO iFileIO = iFile->open(IFOread);
-                        assertex(iFileIO);
-
-                        size32_t tlkSz = (size32_t)iFileIO->size();
-                        initMb.append(tlkSz);
-                        ::read(iFileIO, 0, tlkSz, initMb);
-
-                        if (!iter || !iter->next())
-                            break;
-                        f = &iter->query();
+                        const char *kind = indexFile->queryPart(indexFile->numParts()-1).queryAttributes().queryProp("@kind");
+                        keyHasTlk = NULL != kind && 0 == stricmp("topLevelKey", kind);
+                        if (keyHasTlk)
+                            --totalIndexParts;
                     }
                 }
-                if (helper->diskAccessRequired())
+
+                // serialize common (to all slaves) info
+                initMb.append(totalIndexParts);
+                if (totalIndexParts)
                 {
-                    OwnedRoxieString fetchFilename(helper->getFileName());
-                    if (fetchFilename)
+                    indexMap.map(*this, indexFile, keyHasTlk, getOptBool("allLocalIndexParts"));
+                    initMb.append(numTags);
+                    for (auto &tag: tags)
+                        initMb.append(tag);
+                    initMb.append(remoteKeyedLookup);
+                    initMb.append(remoteKeyedFetch);
+                    initMb.append(superIndexWidth); // 0 if not superIndex
+                    if (localKey)
+                        keyHasTlk = false; // JCSMORE, not used at least for now
+                    initMb.append(keyHasTlk);
+                    if (keyHasTlk)
                     {
-                        dataFile.setown(queryThorFileManager().lookup(container.queryJob(), fetchFilename, false, 0 != (helper->getFetchFlags() & FFdatafileoptional), true));
-                        if (dataFile)
+                        if (numSuperIndexSubs)
+                            initMb.append(numSuperIndexSubs);
+                        else
+                            initMb.append((unsigned)1);
+
+                        Owned<IDistributedFileIterator> iter;
+                        IDistributedFile *f;
+                        if (superIndex)
                         {
-                            if (isFileKey(dataFile))
-                                throw MakeActivityException(this, 0, "Attempting to read index as a flat file: %s", fetchFilename.get());
-                            if (superIndex)
-                                throw MakeActivityException(this, 0, "Superkeys and full keyed joins are not supported");
-                            dataFileDesc.setown(getConfiguredFileDescriptor(*dataFile));
-                            void *ekey;
-                            size32_t ekeylen;
-                            helper->getFileEncryptKey(ekeylen,ekey);
-                            bool encrypted = dataFileDesc->queryProperties().getPropBool("@encrypted");
-                            if (0 != ekeylen)
-                            {
-                                memset(ekey,0,ekeylen);
-                                free(ekey);
-                                if (!encrypted)
-                                {
-                                    Owned<IException> e = MakeActivityWarning(&container, TE_EncryptionMismatch, "Ignoring encryption key provided as file '%s' was not published as encrypted", dataFile->queryLogicalName());
-                                    queryJobChannel().fireException(e);
-                                }
-                            }
-                            else if (encrypted)
-                                throw MakeActivityException(this, 0, "File '%s' was published as encrypted but no encryption key provided", dataFile->queryLogicalName());
-
-                            /* If fetch file is local to cluster, fetches are sent to be processed to local node, each node has info about it's
-                             * local parts only.
-                             * If fetch file is off cluster, fetches are performed by requesting node directly on fetch part, therefore each nodes
-                             * needs all part descriptors.
-                             */
-                            remoteDataFiles = false;
-                            RemoteFilename rfn;
-                            dataFileDesc->queryPart(0)->getFilename(0, rfn);
-                            if (!rfn.queryIP().ipequals(container.queryJob().querySlaveGroup().queryNode(0).endpoint()))
-                                remoteDataFiles = true;
-                            if (!remoteDataFiles) // local to cluster
-                            {
-                                unsigned dataReadWidth = (unsigned)container.queryJob().getWorkUnitValueInt("KJDRR", 0);
-                                if (!dataReadWidth || dataReadWidth>container.queryJob().querySlaves())
-                                    dataReadWidth = container.queryJob().querySlaves();
-                                Owned<IGroup> grp = container.queryJob().querySlaveGroup().subset((unsigned)0, dataReadWidth);
-                                dataFileMapping.setown(getFileSlaveMaps(dataFile->queryLogicalName(), *dataFileDesc, container.queryJob().queryUserDescriptor(), *grp, false, false, NULL));
-                                dataFileMapping->serializeFileOffsetMap(offsetMapMb.clear());
-                            }
+                            iter.setown(superIndex->getSubFileIterator(true));
+                            f = &iter->query();
                         }
                         else
-                            indexFile.clear();
+                            f = indexFile;
+                        for (;;)
+                        {
+                            unsigned location;
+                            OwnedIFile iFile;
+                            StringBuffer filePath;
+                            Owned<IFileDescriptor> fileDesc = f->getFileDescriptor();
+                            Owned<IPartDescriptor> tlkDesc = fileDesc->getPart(fileDesc->numParts()-1);
+                            if (!getBestFilePart(this, *tlkDesc, iFile, location, filePath))
+                                throw MakeThorException(TE_FileNotFound, "Top level key part does not exist, for key: %s", f->queryLogicalName());
+                            OwnedIFileIO iFileIO = iFile->open(IFOread);
+                            assertex(iFileIO);
+
+                            size32_t tlkSz = (size32_t)iFileIO->size();
+                            initMb.append(tlkSz);
+                            ::read(iFileIO, 0, tlkSz, initMb);
+
+                            if (!iter || !iter->next())
+                                break;
+                            f = &iter->query();
+                        }
                     }
                 }
+                else
+                {
+                    indexFile.clear();
+                    indexFileDesc.clear();
+                    dataFile.clear();
+                    dataFileDesc.clear();
+                }
             }
-            else
-                indexFile.clear();
         }
+        else
+            initMb.append(totalIndexParts); // 0
         if (indexFile)
         {
             addReadFile(indexFile);
             if (dataFile)
                 addReadFile(dataFile);
         }
-        else
-            initMb.append((unsigned)0);
     }
     virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
     {
         dst.append(initMb);
-        IDistributedFile *indexFile = queryReadFile(0); // 0 == indexFile, 1 == dataFile
-        if (indexFile && helper->diskAccessRequired())
+        if (totalIndexParts)
         {
-            IDistributedFile *dataFile = queryReadFile(1);
-            if (dataFile)
+            std::vector<unsigned> &allParts = local ? indexMap.querySlaveParts(slave) : indexMap.queryAllParts();
+            unsigned numParts = allParts.size();
+            dst.append(numParts);
+            if (numParts)
             {
-                dst.append(remoteDataFiles);
-                if (remoteDataFiles)
-                {
-                    UnsignedArray parts;
-                    parts.append((unsigned)-1); // weird convention meaning all
-                    dst.append(dataFileDesc->numParts());
-                    dataFileDesc->serializeParts(dst, parts);
-                }
-                else
-                {
-                    dataFileMapping->serializeMap(slave, dst);
-                    dst.append(offsetMapMb);
-                }
+                indexFileDesc->serializeParts(dst, &allParts[0], numParts);
+                std::vector<unsigned> &parts = remoteKeyedLookup ? indexMap.querySlaveParts(slave) : allParts;
+                dst.append((unsigned)parts.size());
+                dst.append(sizeof(unsigned)*parts.size(), &parts[0]);
             }
-            else
+            if (remoteKeyedLookup)
+                indexMap.serializePartMap(dst);
+            unsigned totalDataParts = dataMap.count();
+            dst.append(totalDataParts);
+            if (totalDataParts)
             {
-                CSlavePartMapping::serializeNullMap(dst);
-                CSlavePartMapping::serializeNullOffsetMap(dst);
+                std::vector<unsigned> &allParts = dataMap.queryAllParts();
+                unsigned numParts = allParts.size();
+                dst.append(numParts);
+                if (numParts)
+                {
+                    dataFileDesc->serializeParts(dst, &allParts[0], numParts);
+                    std::vector<unsigned> &parts = remoteKeyedFetch ? dataMap.querySlaveParts(slave) : allParts;
+                    unsigned numSlaveParts = parts.size();
+                    dst.append(numSlaveParts);
+                    dst.append(sizeof(unsigned)*numSlaveParts, &parts[0]);
+                }
+                if (remoteKeyedFetch)
+                    dataMap.serializePartMap(dst);
             }
         }
     }
     virtual void deserializeStats(unsigned node, MemoryBuffer &mb)
     {
         CMasterActivity::deserializeStats(node, mb);
-        ForEachItemIn(p, progressKinds)
+        ForEachItemIn(p, progressInfoArr)
         {
             unsigned __int64 st;
             mb.read(st);
@@ -332,7 +516,7 @@ public:
         {
             ProgressInfo &progress = progressInfoArr.item(p);
             progress.processInfo();
-            stats.addStatistic((StatisticKind)progressKinds.item(p), progress.queryTotal());
+            stats.addStatistic(progressKinds[p], progress.queryTotal());
         }
     }
 };
@@ -340,5 +524,7 @@ public:
 
 CActivityBase *createKeyedJoinActivityMaster(CMasterGraphElement *info)
 {
+    if (info->getOptBool("legacykj"))
+        return LegacyKJ::createKeyedJoinActivityMaster(info);
     return new CKeyedJoinMaster(info);
 }

+ 4 - 0
thorlcr/activities/keyedjoin/thkeyedjoin.ipp

@@ -22,5 +22,9 @@
 
 
 CActivityBase *createKeyedJoinActivityMaster(CMasterGraphElement *info);
+namespace LegacyKJ
+{
+    CActivityBase *createKeyedJoinActivityMaster(CMasterGraphElement *info);
+}
 
 #endif

+ 52 - 0
thorlcr/activities/keyedjoin/thkeyedjoincommon.hpp

@@ -0,0 +1,52 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+#ifndef _THKJCOMMON_HPP_
+#define _THKJCOMMON_HPP_
+
+#ifdef ACTIVITYSLAVES_EXPORTS
+ #define activityslaves_decl DECL_EXPORT
+#else
+ #define activityslaves_decl DECL_IMPORT
+#endif
+
+class CJoinGroup; // forward declaration only -- definition not visible in this header
+struct KeyLookupHeader // prefix at the start of a key-lookup row (extracted via getHeaderFromRow)
+{
+    CJoinGroup *jg; // presumably the join group this lookup row belongs to -- TODO confirm in slave impl.
+};
+struct FetchRequestHeader // prefix at the start of a fetch-request row (extracted via getHeaderFromRow)
+{
+    offset_t fpos; // assumed: file position to fetch -- confirm against fetch handler
+    CJoinGroup *jg;
+    unsigned __int64 sequence; // correlates request with FetchReplyHeader::sequence (top bit reserved, see fetchMatchedMask)
+};
+struct FetchReplyHeader
+{
+    static const unsigned __int64 fetchMatchedMask = 0x8000000000000000; // top bit of 'sequence'
+    unsigned __int64 sequence; // fetchMatchedMask used to screen top-bit to denote whether reply fetch matched or not
+};
+// Copies the leading sizeof(HeaderStruct) bytes of 'row' into 'header'.
+template <class HeaderStruct>
+void getHeaderFromRow(const void *row, HeaderStruct &header)
+{
+    memcpy(&header, row, sizeof(HeaderStruct));
+}
+enum GroupFlags:unsigned { gf_null=0x0, gf_limitatmost=0x01, gf_limitabort=0x02, gf_eog=0x04, gf_head=0x08 }; // bitmask of join-group states
+enum KJServiceCmds:byte { kjs_nop, kjs_keyopen, kjs_keyread, kjs_keyclose, kjs_fetchopen, kjs_fetchread, kjs_fetchclose }; // KJ service message commands
+enum KJFetchFlags:byte { kjf_nop=0x0, kjf_compressed=0x1, kjf_encrypted=0x2 }; // bitmask of fetch-file attributes
+
+#endif

文件差異過大導致無法顯示
+ 2446 - 0
thorlcr/activities/keyedjoin/thkeyedjoinslave-legacy.cpp


文件差異過大導致無法顯示
+ 2191 - 1946
thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp


+ 4 - 0
thorlcr/activities/keyedjoin/thkeyedjoinslave.ipp

@@ -22,5 +22,9 @@
 #include "slave.ipp"
 
 activityslaves_decl CActivityBase *createKeyedJoinSlave(CGraphElementBase *container);
+namespace LegacyKJ
+{
+    activityslaves_decl CActivityBase *createKeyedJoinSlave(CGraphElementBase *container);
+}
 
 #endif

+ 14 - 6
thorlcr/graph/thgraph.cpp

@@ -2610,9 +2610,9 @@ CJobBase::CJobBase(ILoadedDllEntry *_querySo, const char *_graphName) : querySo(
     maxDiskUsage = diskUsage = 0;
     dirty = true;
     aborted = false;
-    mpJobTag = TAG_NULL;
     globalMemoryMB = globals->getPropInt("@globalMemorySize"); // in MB
-    numChannels = globals->getPropInt("@channelsPerSlave", 1);
+    channelsPerSlave = globals->getPropInt("@channelsPerSlave", 1);
+    numChannels = channelsPerSlave;
     pluginMap = new SafePluginMap(&pluginCtx, true);
 
 // JCSMORE - Will pass down at job creation time...
@@ -3084,9 +3084,16 @@ void CActivityBase::ActPrintLog(IException *e)
     ActPrintLog(e, "%s", "");
 }
 
-IThorRowInterfaces * CActivityBase::createRowInterfaces(IOutputMetaData * meta)
+IThorRowInterfaces * CActivityBase::createRowInterfaces(IOutputMetaData * meta, byte seq)
 {
-    return createThorRowInterfaces(queryRowManager(), meta, queryId(), queryHeapFlags(), queryCodeContext());
+    activity_id id = createCompoundActSeqId(queryId(), seq);
+    return createThorRowInterfaces(queryRowManager(), meta, id, queryHeapFlags(), queryCodeContext());
+}
+
+IThorRowInterfaces * CActivityBase::createRowInterfaces(IOutputMetaData * meta, roxiemem::RoxieHeapFlags heapFlags, byte seq)
+{
+    activity_id id = createCompoundActSeqId(queryId(), seq);
+    return createThorRowInterfaces(queryRowManager(), meta, id, heapFlags, queryCodeContext());
 }
 
 bool CActivityBase::fireException(IException *e)
@@ -3163,9 +3170,10 @@ IThorRowInterfaces *CActivityBase::getRowInterfaces()
     return createThorRowInterfaces(queryRowManager(), queryRowMetaData(), container.queryId(), queryHeapFlags(), queryCodeContext());
 }
 
-IEngineRowAllocator *CActivityBase::getRowAllocator(IOutputMetaData * meta, roxiemem::RoxieHeapFlags flags) const
+IEngineRowAllocator *CActivityBase::getRowAllocator(IOutputMetaData * meta, roxiemem::RoxieHeapFlags flags, byte seq) const
 {
-    return queryJobChannel().getRowAllocator(meta, queryId(), flags);
+    activity_id actId = createCompoundActSeqId(queryId(), seq);
+    return queryJobChannel().getRowAllocator(meta, actId, flags);
 }
 
 bool CActivityBase::receiveMsg(ICommunicator &comm, CMessageBuffer &mb, const rank_t rank, const mptag_t mpTag, rank_t *sender, unsigned timeout)

+ 26 - 8
thorlcr/graph/thgraph.hpp

@@ -90,11 +90,19 @@ interface IDiskUsage : extends IInterface
 interface IBackup;
 interface IFileInProgressHandler;
 interface IThorFileCache;
+interface IKJService : extends IInterface
+{
+    virtual void setCurrentJob(CJobBase &job) = 0;
+    virtual void reset() = 0;
+    virtual void start() = 0;
+    virtual void stop() = 0;
+};
 interface IThorResource
 {
     virtual IThorFileCache &queryFileCache() = 0;
     virtual IBackup &queryBackup() = 0;
     virtual IFileInProgressHandler &queryFileInProgressHandler() = 0;
+    virtual IKJService &queryKeyedJoinService() = 0;
 };
 
 interface IBarrier : extends IInterface
@@ -795,11 +803,12 @@ protected:
     StringBuffer wuid, user, scope, token;
     mutable CriticalSection wuDirty;
     mutable bool dirty;
-    mptag_t mpJobTag, slavemptag;
+    mptag_t slavemptag;
     Owned<IGroup> jobGroup, slaveGroup, nodeGroup;
     Owned<IPropertyTree> xgmml;
     Owned<IGraphTempHandler> tmpHandler;
     bool timeActivities;
+    unsigned channelsPerSlave;
     unsigned numChannels;
     unsigned maxActivityCores, globalMemoryMB, sharedMemoryMB;
     unsigned forceLogGraphIdMin, forceLogGraphIdMax;
@@ -829,7 +838,7 @@ protected:
         }
     } pluginCtx;
     SafePluginMap *pluginMap;
-    void endJob();
+    virtual void endJob();
 public:
     IMPLEMENT_IINTERFACE;
 
@@ -840,6 +849,7 @@ public:
     virtual void addChannel(IMPServer *mpServer) = 0;
     CJobChannel &queryJobChannel(unsigned c) const;
     CActivityBase &queryChannelActivity(unsigned c, graph_id gid, activity_id id) const;
+    unsigned queryChannelsPerSlave() const { return channelsPerSlave; }
     unsigned queryJobChannels() const { return jobChannels.ordinality(); }
     inline unsigned queryJobChannelSlaveNum(unsigned channelNum) const { dbgassertex(channelNum<queryJobChannels()); return jobChannelSlaveNumbers[channelNum]; }
     inline unsigned queryJobSlaveChannelNum(unsigned slaveNum) const { dbgassertex(slaveNum && slaveNum<=querySlaves()); return jobSlaveChannelNum[slaveNum-1]; }
@@ -877,7 +887,6 @@ public:
     void setDiskUsage(offset_t _diskUsage) { diskUsage = _diskUsage; }
     const offset_t queryMaxDiskUsage() const { return maxDiskUsage; }
     mptag_t querySlaveMpTag() const { return slavemptag; }
-    mptag_t queryJobMpTag() const { return mpJobTag; }
     unsigned querySlaves() const { return slaveGroup->ordinality(); }
     unsigned queryNodes() const { return nodeGroup->ordinality()-1; }
     IGroup &queryJobGroup() const { return *jobGroup; }
@@ -1010,6 +1019,13 @@ public:
 
 interface IOutputMetaData;
 
+inline activity_id createCompoundActSeqId(activity_id actId, byte seq)
+{
+    if (seq)
+        actId |= seq << 24;
+    return actId;
+}
+
 class graph_decl CActivityBase : implements CInterfaceOf<IThorRowInterfaces>, implements IExceptionHandler
 {
     Owned<IEngineRowAllocator> rowAllocator;
@@ -1059,7 +1075,7 @@ public:
     bool lastNode() { return container.queryJob().querySlaves() == container.queryJobChannel().queryMyRank(); }
     unsigned queryMaxCores() const { return container.queryMaxCores(); }
     IThorRowInterfaces *getRowInterfaces();
-    IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, roxiemem::RoxieHeapFlags flags=roxiemem::RHFnone) const;
+    IEngineRowAllocator *getRowAllocator(IOutputMetaData * meta, roxiemem::RoxieHeapFlags flags=roxiemem::RHFnone, byte seq=0) const;
 
     bool appendRowXml(StringBuffer & target, IOutputMetaData & meta, const void * row) const;
     void logRow(const char * prefix, IOutputMetaData & meta, const void * row);
@@ -1082,7 +1098,8 @@ public:
     void ActPrintLog(IException *e, const char *format, ...) __attribute__((format(printf, 3, 4)));
     void ActPrintLog(IException *e);
 
-    IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta);
+    IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, byte seq=0);
+    IThorRowInterfaces * createRowInterfaces(IOutputMetaData * meta, roxiemem::RoxieHeapFlags heapFlags, byte seq=0);
 
 // IExceptionHandler
     bool fireException(IException *e);
@@ -1149,9 +1166,10 @@ public:
     IMPLEMENT_IINTERFACE;
 
 // IThorResource
-    virtual IThorFileCache &queryFileCache() { UNIMPLEMENTED; return *((IThorFileCache *)NULL); }
-    virtual IBackup &queryBackup() { UNIMPLEMENTED; return *((IBackup *)NULL); }
-    virtual IFileInProgressHandler &queryFileInProgressHandler() { UNIMPLEMENTED; return *((IFileInProgressHandler *)NULL); }
+    virtual IThorFileCache &queryFileCache() override { UNIMPLEMENTED; }
+    virtual IBackup &queryBackup() override  { UNIMPLEMENTED; }
+    virtual IFileInProgressHandler &queryFileInProgressHandler() override  { UNIMPLEMENTED; }
+    virtual IKJService &queryKeyedJoinService() override { UNIMPLEMENTED; }
 };
 
 class graph_decl CThorGraphResults : implements IThorGraphResults, public CInterface

+ 0 - 3
thorlcr/graph/thgraphmaster.cpp

@@ -1337,7 +1337,6 @@ CJobMaster::CJobMaster(IConstWorkUnit &_workunit, const char *graphName, ILoaded
     sharedAllocator.setown(::createThorAllocator(globalMemoryMB, 0, 1, memorySpillAtPercentage, *logctx, crcChecking, usePackedAllocator));
     Owned<IMPServer> mpServer = getMPServer();
     addChannel(mpServer);
-    mpJobTag = allocateMPTag();
     slavemptag = allocateMPTag();
     slaveMsgHandler = new CSlaveMessageHandler(*this, slavemptag);
     tmpHandler.setown(createTempHandler(true));
@@ -1348,7 +1347,6 @@ CJobMaster::~CJobMaster()
 {
     if (slaveMsgHandler)
         delete slaveMsgHandler;
-    freeMPTag(mpJobTag);
     freeMPTag(slavemptag);
     tmpHandler.clear();
 }
@@ -1549,7 +1547,6 @@ void CJobMaster::sendQuery()
     CriticalBlock b(sendQueryCrit);
     if (querySent) return;
     CMessageBuffer tmp;
-    tmp.append(mpJobTag);
     tmp.append(slavemptag);
     tmp.append(queryWuid());
     tmp.append(graphName);

+ 8 - 2
thorlcr/graph/thgraphslave.cpp

@@ -1482,7 +1482,7 @@ public:
 };
 
 #define SLAVEGRAPHPOOLLIMIT 10
-CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, const char *graphName, ILoadedDllEntry *_querySo, mptag_t _mpJobTag, mptag_t _slavemptag) : CJobBase(_querySo, graphName), watchdog(_watchdog)
+CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, const char *graphName, ILoadedDllEntry *_querySo, mptag_t _slavemptag) : CJobBase(_querySo, graphName), watchdog(_watchdog)
 {
     workUnitInfo.set(_workUnitInfo);
     workUnitInfo->getProp("token", token);
@@ -1513,7 +1513,6 @@ CJobSlave::CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *_workUnitInfo, co
     }
 
     oldNodeCacheMem = 0;
-    mpJobTag = _mpJobTag;
     slavemptag = _slavemptag;
 
     IPropertyTree *plugins = workUnitInfo->queryPropTree("plugins");
@@ -1574,6 +1573,13 @@ void CJobSlave::startJob()
             throw MakeThorException(TE_NotEnoughFreeSpace, "Node %s has %u MB(s) of available disk space, specified minimum for this job: %u MB(s)", ep.getUrlStr(s).str(), (unsigned) freeSpace / 0x100000, minFreeSpace);
         }
     }
+    queryThor().queryKeyedJoinService().setCurrentJob(*this);
+}
+
+void CJobSlave::endJob()
+{
+    queryThor().queryKeyedJoinService().reset();
+    PARENT::endJob();
 }
 
 void CJobSlave::reportGraphEnd(graph_id gid)

+ 4 - 2
thorlcr/graph/thgraphslave.hpp

@@ -402,6 +402,7 @@ public:
 interface ISlaveWatchdog;
 class graphslave_decl CJobSlave : public CJobBase
 {
+    typedef CJobBase PARENT;
     ISlaveWatchdog *watchdog;
     Owned<IPropertyTree> workUnitInfo;
     size32_t oldNodeCacheMem;
@@ -410,10 +411,11 @@ class graphslave_decl CJobSlave : public CJobBase
 public:
     IMPLEMENT_IINTERFACE;
 
-    CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *workUnitInfo, const char *graphName, ILoadedDllEntry *querySo, mptag_t _mptag, mptag_t _slavemptag);
+    CJobSlave(ISlaveWatchdog *_watchdog, IPropertyTree *workUnitInfo, const char *graphName, ILoadedDllEntry *querySo, mptag_t _slavemptag);
 
     virtual void addChannel(IMPServer *mpServer);
-    virtual void startJob();
+    virtual void startJob() override;
+    virtual void endJob() override;
     const char *queryFindString() const { return key.get(); } // for string HT
 
     virtual IGraphTempHandler *createTempHandler(bool errorOnMissing);

+ 10 - 30
thorlcr/master/thactivitymaster.cpp

@@ -441,51 +441,31 @@ void CSlavePartMapping::serializeNullOffsetMap(MemoryBuffer &mb)
     mb.append((unsigned)0);
 }
 
-void CSlavePartMapping::serializeMap(unsigned i, MemoryBuffer &mb, IGetSlaveData *extra)
+void CSlavePartMapping::serializeMap(unsigned i, MemoryBuffer &mb, bool countPrefix)
 {
     if (local)
         i = 0;
-    if (i >= maps.ordinality())
+    if (i >= maps.ordinality() || (0 == maps.item(i).ordinality()))
     {
-        mb.append((unsigned)0);
+        if (countPrefix)
+            mb.append((unsigned)0);
         return;
     }
 
     CSlaveMap &map = maps.item(i);
     unsigned nPos = mb.length();
     unsigned n=0;
-    mb.append(n);
+    if (countPrefix)
+        mb.append(n);
     UnsignedArray parts;
     ForEachItemIn(m, map)
         parts.append(map.item(m).queryPartIndex());
-    MemoryBuffer extraMb;
-    if (extra)
+    if (countPrefix)
     {
-        ForEachItemIn(m2, map)
-        {
-            unsigned xtraLen = 0;
-            unsigned xtraPos = extraMb.length();
-            extraMb.append(xtraLen);
-            IPartDescriptor &partDesc = map.item(m2);
-            if (!extra->getData(m2, partDesc.queryPartIndex(), extraMb))
-            {
-                parts.zap(partDesc.queryPartIndex());
-                extraMb.rewrite(xtraPos);
-            }
-            else
-            {
-                xtraLen = (extraMb.length()-xtraPos)-sizeof(xtraLen);
-                extraMb.writeDirect(xtraPos, sizeof(xtraLen), &xtraLen);
-            }
-        }
-    }
-    n = parts.ordinality();
-    mb.writeDirect(nPos, sizeof(n), &n);
-    if (n)
-    {
-        fileDesc->serializeParts(mb, parts);
-        mb.append(extraMb);
+        n = parts.ordinality();
+        mb.writeDirect(nPos, sizeof(n), &n);
     }
+    fileDesc->serializeParts(mb, parts);
 }
 
 CSlavePartMapping::CSlavePartMapping(const char *_logicalName, IFileDescriptor &_fileDesc, IUserDescriptor *_userDesc, IGroup &localGroup, bool _local, bool index, IHash *hash, IDistributedSuperFile *super)

+ 1 - 6
thorlcr/master/thactivitymaster.hpp

@@ -29,11 +29,6 @@ enum masterEvents { ev_unknown, ev_done };
 
 #include "thmfilemanager.hpp"
 
-interface IGetSlaveData : extends IInterface
-{
-    virtual bool getData(unsigned slave, unsigned part, MemoryBuffer &mb) = 0;
-};
-
 struct CSlaveMap : public IArrayOf<IPartDescriptor>, public CInterface
 {
 };
@@ -68,7 +63,7 @@ public:
     }
     void serializeFileOffsetMap(MemoryBuffer &mb);
     void getParts(unsigned i, IArrayOf<IPartDescriptor> &parts);
-    void serializeMap(unsigned map, MemoryBuffer &mb, IGetSlaveData *extra=NULL);
+    void serializeMap(unsigned map, MemoryBuffer &mb, bool countPrefix=true);
     static void serializeNullMap(MemoryBuffer &mb);
     static void serializeNullOffsetMap(MemoryBuffer &mb);
 };

+ 6 - 2
thorlcr/master/thmastermain.cpp

@@ -263,6 +263,7 @@ public:
         queryRawGroup().serialize(msg);
         globals->serialize(msg);
         msg.append(masterSlaveMpTag);
+        msg.append(kjServiceMpTag);
         if (!queryNodeComm().send(msg, RANK_ALL_OTHER, MPTAG_THORREGISTRATION, MP_ASYNC_SEND))
         {
             PROGLOG("Failed to initialize slaves");
@@ -718,7 +719,8 @@ int main( int argc, char *argv[]  )
         SetTempDir(tempDirStr.str(), tempPrefix.str(), true);
 
         char thorPath[1024];
-        if (!GetCurrentDirectory(1024, thorPath)) {
+        if (!GetCurrentDirectory(1024, thorPath))
+        {
             ERRLOG("ThorMaster::main: Current directory path too big, setting it to null");
             thorPath[0] = 0;
         }
@@ -763,7 +765,8 @@ int main( int argc, char *argv[]  )
     getThorQueueNames(_queueNames, thorName);
     queueName.set(_queueNames.str());
 
-    try {
+    try
+    {
         CSDSServerStatus &serverStatus = openThorServerStatus();
 
         Owned<CRegistryServer> registry = new CRegistryServer();
@@ -780,6 +783,7 @@ int main( int argc, char *argv[]  )
 
         addAbortHandler(ControlHandler);
         masterSlaveMpTag = allocateClusterMPTag();
+        kjServiceMpTag = allocateClusterMPTag();
 
         if (registry->connect())
         {

文件差異過大導致無法顯示
+ 1434 - 8
thorlcr/slave/slavmain.cpp


+ 1 - 0
thorlcr/slave/thslavemain.cpp

@@ -159,6 +159,7 @@ static bool RegisterSelf(SocketEndpoint &masterEp)
 #endif
         }
         msg.read((unsigned &)masterSlaveMpTag);
+        msg.read((unsigned &)kjServiceMpTag);
         msg.clear();
         msg.setReplyTag(MPTAG_THORREGISTRATION);
         if (!queryNodeComm().reply(msg))

+ 38 - 1
thorlcr/thorutil/thmem.cpp

@@ -810,6 +810,17 @@ void CThorExpandingRowArray::transferRows(rowidx_t & outNumRows, const void * *
     stableTable = NULL;
 }
 
+void CThorExpandingRowArray::transferRows(rowidx_t start, rowidx_t num, CThorExpandingRowArray &tgt)
+{
+    if (start >= numRows)
+        return;
+    rowidx_t remaining = numRows-start;
+    rowidx_t max = remaining>num ? num : remaining;
+    tgt.appendRows(rows+start, max, true);
+    memmove(rows+start, rows+start+max, (remaining-max) * sizeof(void *));
+    numRows -= max;
+}
+
 void CThorExpandingRowArray::transferRowsCopy(const void **outRows, bool takeOwnership)
 {
     if (0 == numRows)
@@ -860,6 +871,32 @@ void CThorExpandingRowArray::removeRows(rowidx_t start, rowidx_t n)
     }
 }
 
+// Append 'num' row pointers from 'inRows'; returns false only if a needed resize fails.
+// If takeOwnership is false, each appended row is additionally linked.
+bool CThorExpandingRowArray::appendRows(const void **inRows, rowidx_t num, bool takeOwnership)
+{
+    if (0 == num)
+        return true;
+    if (numRows+num >= maxRows)
+    {
+        if (!resize(numRows + num))
+            return false;
+    }
+    const void **newRows = rows+numRows;
+    memcpy(newRows, inRows, num*sizeof(const void *)); // element type is const void *, not void **
+    numRows += num;
+    if (!takeOwnership)
+    {
+        const void **lastNewRow = newRows+num-1; // NB: was newRows+numRows-1, which overran the appended range whenever the array was non-empty
+        for (;;)
+        {
+            LinkThorRow(*newRows);
+            if (newRows == lastNewRow)
+                break;
+            newRows++;
+        }
+    }
+
 bool CThorExpandingRowArray::appendRows(CThorExpandingRowArray &inRows, bool takeOwnership)
 {
     rowidx_t num = inRows.ordinality();
@@ -2536,7 +2573,7 @@ public:
     { 
          // ignoring xml'ing extra
         //GH: I think this is what it should do
-        childMeta->toXML(*(const byte **)(self+extraSz), out); 
+        childMeta->toXML(*(const byte **)(self+extraSz), out);
     }
     virtual unsigned getVersion() const { return OUTPUTMETACHILDROW_VERSION; }
 

+ 2 - 0
thorlcr/thorutil/thmem.hpp

@@ -363,9 +363,11 @@ public:
         swap(from);
     }
     void transferRows(rowidx_t & outNumRows, const void * * & outRows);
+    void transferRows(rowidx_t start, rowidx_t num, CThorExpandingRowArray &tgt);
     void transferFrom(CThorExpandingRowArray &src);
     void transferFrom(CThorSpillableRowArray &src);
     void removeRows(rowidx_t start, rowidx_t n);
+    bool appendRows(const void **inRows, rowidx_t num, bool takeOwnership);
     bool appendRows(CThorExpandingRowArray &inRows, bool takeOwnership);
     bool appendRows(CThorSpillableRowArray &inRows, bool takeOwnership);
     void clearUnused();

+ 1 - 0
thorlcr/thorutil/thormisc.cpp

@@ -65,6 +65,7 @@ static ICommunicator *nodeComm;
 
 
 mptag_t masterSlaveMpTag;
+mptag_t kjServiceMpTag;
 IPropertyTree *globals;
 static IMPtagAllocator *ClusterMPAllocator = NULL;
 

+ 21 - 2
thorlcr/thorutil/thormisc.hpp

@@ -72,10 +72,28 @@
 #define THOROPT_TRACE_LIMIT           "traceLimit"              // Number of rows from TRACE activity                                            (default = 10)
 #define THOROPT_READ_CRC              "crcReadEnabled"          // Enabled CRC validation on disk reads if file CRC are available                (default = true)
 #define THOROPT_WRITE_CRC             "crcWriteEnabled"         // Calculate CRC's for disk outputs and store in file meta data                  (default = true)
-#define THOROPT_READCOMPRESSED_CRC    "crcReadCompressedEnabled"  // Enabled CRC validation on compressed disk reads if file CRC are available   (default = false)
+#define THOROPT_READCOMPRESSED_CRC    "crcReadCompressedEnabled" // Enabled CRC validation on compressed disk reads if file CRC are available   (default = false)
 #define THOROPT_WRITECOMPRESSED_CRC   "crcWriteCompressedEnabled" // Calculate CRC's for compressed disk outputs and store in file meta data     (default = false)
-#define THOROPT_CHILD_GRAPH_INIT_TIMEOUT "childGraphInitTimeout"  // Time to wait for child graphs to respond to initialization                  (default = 5*60 seconds)
+#define THOROPT_CHILD_GRAPH_INIT_TIMEOUT "childGraphInitTimeout" // Time to wait for child graphs to respond to initialization                  (default = 5*60 seconds)
 #define THOROPT_SORT_COMPBLKSZ        "sortCompBlkSz"           // Block size used by compressed spill in a spilling sort                        (default = 0, uses row writer default)
+#define THOROPT_KEYLOOKUP_QUEUED_BATCHSIZE "keyLookupQueuedBatchSize" // Number of rows candidates to gather before performing lookup against part (default = 1000)
+#define THOROPT_KEYLOOKUP_FETCH_QUEUED_BATCHSIZE "fetchLookupQueuedBatchSize" // Number of rows candidates to gather before performing fetch against part (default = 1000)
+#define THOROPT_KEYLOOKUP_MAX_LOOKUP_BATCHSIZE "keyLookupMaxLookupBatchSize"  // Maximum chunk of rows to process per cycle in lookup handler    (default = 1000)
+#define THOROPT_KEYLOOKUP_MAX_THREADS "maxKeyLookupThreads"     // Maximum number of threads performing keyed lookups                            (default = 10)
+#define THOROPT_KEYLOOKUP_MAX_FETCH_THREADS "maxFetchThreads"   // Maximum number of threads performing fetches                                  (default = 10)
+#define THOROPT_KEYLOOKUP_MAX_PROCESS_THREADS "keyLookupMaxProcessThreads" // Maximum number of threads processing key lookup results              (default = 10)
+#define THOROPT_KEYLOOKUP_MAX_QUEUED  "keyLookupMaxQueued"      // Total maximum number of rows (across all parts/threads) to queue              (default = 10000)
+#define THOROPT_KEYLOOKUP_MAX_DONE    "keyLookupMaxDone"        // Maximum number of done items pending to be ready by next activity             (default = 10000)
+#define THOROPT_REMOTE_KEYED_LOOKUP   "remoteKeyedLookup"       // Send key request to remote node unless part is local                          (default = true)
+#define THOROPT_REMOTE_KEYED_FETCH   "remoteKeyedFetch"         // Send fetch request to remote node unless part is local                        (default = true)
+#define THOROPT_FORCE_REMOTE_KEYED_LOOKUP "forceRemoteKeyedLookup" // force all keyed lookups, even where part local to be sent as if remote     (default = false)
+#define THOROPT_FORCE_REMOTE_KEYED_FETCH "forceRemoteKeyedFetch" // force all keyed fetches, even where part local to be sent as if remote       (default = false)
+#define THOROPT_KEYLOOKUP_MAX_LOCAL_HANDLERS "maxLocalHandlers" // maximum number of handlers dealing with local parts                           (default = 10)
+#define THOROPT_KEYLOOKUP_MAX_REMOTE_HANDLERS "maxRemoteHandlers" // maximum number of handlers per remote slave                                 (default = 2)
+#define THOROPT_KEYLOOKUP_MAX_FETCH_LOCAL_HANDLERS "maxLocalFetchHandlers" // maximum number of fetch handlers dealing with local parts          (default = 10)
+#define THOROPT_KEYLOOKUP_MAX_FETCH_REMOTE_HANDLERS "maxRemoteFetchHandlers" // maximum number of fetch handlers per remote slave                (default = 2)
+#define THOROPT_KEYLOOKUP_COMPRESS_MESSAGES "keyedJoinCompressMsgs" // compress key and fetch request messages                                   (default = true)
+
 
 #define INITIAL_SELFJOIN_MATCH_WARNING_LEVEL 20000  // max of row matches before selfjoin emits warning
 
@@ -442,6 +460,7 @@ extern graph_decl void reportExceptionToWorkunit(IConstWorkUnit &workunit,IExcep
 
 extern graph_decl IPropertyTree *globals;
 extern graph_decl mptag_t masterSlaveMpTag;
+extern graph_decl mptag_t kjServiceMpTag;
 enum SlaveMsgTypes { smt_errorMsg=1, smt_initGraphReq, smt_initActDataReq, smt_dataReq, smt_getPhysicalName, smt_getFileOffset, smt_actMsg, smt_getresult };
 // Logging
 extern graph_decl const LogMsgJobInfo thorJob;