浏览代码

Merge pull request #11040 from richardkchapman/partition-ky

HPCC-19482 Support PARTITION keys in Roxie keyed join

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 7 年之前
父节点
当前提交
1e423648d2
共有 1 个文件被更改,包括 8 次插入6 次删除
  1. 8 6
      roxie/ccd/ccdserver.cpp

+ 8 - 6
roxie/ccd/ccdserver.cpp

@@ -25371,7 +25371,7 @@ public:
                                 bool locallySorted = !thisKey->isFullySorted();
                                 while (locallySorted || tlk->lookup(false)) 
                                 {
-                                    unsigned slavePart = locallySorted ? 0 : (unsigned)extractFpos(tlk);
+                                    unsigned slavePart = locallySorted ? tlk->getPartition() : (unsigned)extractFpos(tlk);
                                     if (locallySorted || slavePart)
                                     {
                                         cvp *outputBuffer = (cvp *) remote.getMem(slavePart, fileNo, indexReadSize + sizeof(cvp) + (indexReadInputRecordVariable ? sizeof(unsigned) : 0));
@@ -25383,12 +25383,13 @@ public:
                                         }
                                         jg->notePending();
                                         memcpy(outputBuffer, extracted, indexReadSize);
-                                        if (locallySorted)
+                                        if (!slavePart)
                                         {
                                             for (unsigned i = 1; i < numChannels; i++)
                                                 jg->notePending();
-                                            break;
                                         }
+                                        if (locallySorted)
+                                            break;
                                     }
                                 }
                             }
@@ -26218,7 +26219,7 @@ public:
                                 bool locallySorted = (!thisKey->isFullySorted());
                                 while (locallySorted || tlk->lookup(false))
                                 {
-                                    unsigned slavePart = locallySorted ? 0 : (unsigned)extractFpos(tlk);
+                                    unsigned slavePart = locallySorted ? tlk->getPartition() : (unsigned)extractFpos(tlk);
                                     if (locallySorted || slavePart)
                                     {
                                         cvp *outputBuffer = (cvp *) remote.getMem(slavePart, fileNo, indexReadRecordSize + sizeof(cvp) + (indexReadInputRecordVariable ? sizeof(unsigned) : 0));
@@ -26230,12 +26231,13 @@ public:
                                         }
                                         jg->notePending();
                                         memcpy(outputBuffer, extracted, indexReadRecordSize);
-                                        if (locallySorted)
+                                        if (!slavePart)
                                         {
                                             for (unsigned i = 1; i < numChannels; i++)
                                                 jg->notePending();
-                                            break;
                                         }
+                                        if (locallySorted)
+                                            break;
                                     }
                                 }
                             }