|
@@ -1297,7 +1297,7 @@ public:
|
|
loop {
|
|
loop {
|
|
CRollingCacheElem * r = cache->mid(0);
|
|
CRollingCacheElem * r = cache->mid(0);
|
|
if (!r)
|
|
if (!r)
|
|
- break; // kit eos
|
|
|
|
|
|
+ break; // hit eos
|
|
int c = cmp->docompare(left,r->row);
|
|
int c = cmp->docompare(left,r->row);
|
|
if (c==0) {
|
|
if (c==0) {
|
|
r->cmp = limitedcmp->docompare(left,r->row);
|
|
r->cmp = limitedcmp->docompare(left,r->row);
|
|
@@ -1408,7 +1408,7 @@ public:
|
|
ThorActivityKind kind;
|
|
ThorActivityKind kind;
|
|
Owned<IException> exc;
|
|
Owned<IException> exc;
|
|
CriticalSection sect;
|
|
CriticalSection sect;
|
|
- bool eos;
|
|
|
|
|
|
+ bool eos, selfJoin;
|
|
|
|
|
|
|
|
|
|
void setException(IException *e,const char *title)
|
|
void setException(IException *e,const char *title)
|
|
@@ -1479,7 +1479,7 @@ public:
|
|
void doMatch(cWorkItem &work,SimpleInterThreadQueueOf<cOutItem,false> &outqueue)
|
|
void doMatch(cWorkItem &work,SimpleInterThreadQueueOf<cOutItem,false> &outqueue)
|
|
{
|
|
{
|
|
MemoryBuffer rmatchedbuf;
|
|
MemoryBuffer rmatchedbuf;
|
|
- CThorExpandingRowArray &rgroup = (kind==TAKselfjoin||kind==TAKselfjoinlight)?work.lgroup:work.rgroup;
|
|
|
|
|
|
+ CThorExpandingRowArray &rgroup = selfJoin?work.lgroup:work.rgroup;
|
|
bool *rmatched;
|
|
bool *rmatched;
|
|
if (rightouter) {
|
|
if (rightouter) {
|
|
rmatched = (bool *)rmatchedbuf.clear().reserve(rgroup.ordinality());
|
|
rmatched = (bool *)rmatchedbuf.clear().reserve(rgroup.ordinality());
|
|
@@ -1519,7 +1519,7 @@ public:
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
- CMultiCoreJoinHelperBase(CActivityBase &_activity, unsigned numthreads, IJoinHelper *_jhelper, IHThorJoinArg *_helper, IEngineRowAllocator *_allocator)
|
|
|
|
|
|
+ CMultiCoreJoinHelperBase(CActivityBase &_activity, unsigned numthreads, bool _selfJoin, IJoinHelper *_jhelper, IHThorJoinArg *_helper, IEngineRowAllocator *_allocator)
|
|
: activity(_activity), allocator(_allocator)
|
|
: activity(_activity), allocator(_allocator)
|
|
{
|
|
{
|
|
kind = activity.queryContainer().getKind();
|
|
kind = activity.queryContainer().getKind();
|
|
@@ -1529,6 +1529,7 @@ public:
|
|
leftouter = (flags & JFleftouter) != 0;
|
|
leftouter = (flags & JFleftouter) != 0;
|
|
rightouter = (flags & JFrightouter) != 0;
|
|
rightouter = (flags & JFrightouter) != 0;
|
|
exclude = (flags & JFexclude) != 0;
|
|
exclude = (flags & JFexclude) != 0;
|
|
|
|
+ selfJoin = _selfJoin;
|
|
numworkers = numthreads;
|
|
numworkers = numthreads;
|
|
eos = false;
|
|
eos = false;
|
|
}
|
|
}
|
|
@@ -1628,7 +1629,6 @@ class CMultiCoreJoinHelper: public CMultiCoreJoinHelperBase
|
|
{
|
|
{
|
|
PROGLOG("CMultiCoreJoinHelper::cWorker started");
|
|
PROGLOG("CMultiCoreJoinHelper::cWorker started");
|
|
MemoryBuffer rmatchedbuf;
|
|
MemoryBuffer rmatchedbuf;
|
|
- bool selfjoin = (parent->kind==TAKselfjoin||parent->kind==TAKselfjoinlight);
|
|
|
|
loop {
|
|
loop {
|
|
work.clear();
|
|
work.clear();
|
|
workready.signal();
|
|
workready.signal();
|
|
@@ -1661,8 +1661,8 @@ class CMultiCoreJoinHelper: public CMultiCoreJoinHelperBase
|
|
public:
|
|
public:
|
|
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
|
|
|
|
- CMultiCoreJoinHelper(CActivityBase &activity, unsigned numthreads, IJoinHelper *_jhelper, IHThorJoinArg *_helper, IEngineRowAllocator *_allocator)
|
|
|
|
- : CMultiCoreJoinHelperBase(activity, numthreads, _jhelper, _helper, _allocator)
|
|
|
|
|
|
+ CMultiCoreJoinHelper(CActivityBase &activity, unsigned numthreads, bool selfJoin, IJoinHelper *_jhelper, IHThorJoinArg *_helper, IEngineRowAllocator *_allocator)
|
|
|
|
+ : CMultiCoreJoinHelperBase(activity, numthreads, selfJoin, _jhelper, _helper, _allocator)
|
|
{
|
|
{
|
|
reader.parent = this;
|
|
reader.parent = this;
|
|
workers = new cWorker *[numthreads];
|
|
workers = new cWorker *[numthreads];
|
|
@@ -1802,7 +1802,6 @@ class CMultiCoreUnorderedJoinHelper: public CMultiCoreJoinHelperBase
|
|
{
|
|
{
|
|
CMultiCoreUnorderedJoinHelper *parent;
|
|
CMultiCoreUnorderedJoinHelper *parent;
|
|
public:
|
|
public:
|
|
- SimpleInterThreadQueueOf<cOutItem,false> outqueue; // used in ordered
|
|
|
|
cWorker(CMultiCoreUnorderedJoinHelper *_parent)
|
|
cWorker(CMultiCoreUnorderedJoinHelper *_parent)
|
|
: Thread("CMulticoreUnorderedJoinHelper::cWorker"), parent(_parent)
|
|
: Thread("CMulticoreUnorderedJoinHelper::cWorker"), parent(_parent)
|
|
{
|
|
{
|
|
@@ -1838,8 +1837,8 @@ class CMultiCoreUnorderedJoinHelper: public CMultiCoreJoinHelperBase
|
|
public:
|
|
public:
|
|
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
|
|
|
|
- CMultiCoreUnorderedJoinHelper(CActivityBase &activity, unsigned numthreads, IJoinHelper *_jhelper, IHThorJoinArg *_helper, IEngineRowAllocator *_allocator)
|
|
|
|
- : CMultiCoreJoinHelperBase(activity, numthreads, _jhelper, _helper, _allocator)
|
|
|
|
|
|
+ CMultiCoreUnorderedJoinHelper(CActivityBase &activity, unsigned numthreads, bool selfJoin, IJoinHelper *_jhelper, IHThorJoinArg *_helper, IEngineRowAllocator *_allocator)
|
|
|
|
+ : CMultiCoreJoinHelperBase(activity, numthreads, selfJoin, _jhelper, _helper, _allocator)
|
|
{
|
|
{
|
|
reader.parent = this;
|
|
reader.parent = this;
|
|
stoppedworkers = 0;
|
|
stoppedworkers = 0;
|
|
@@ -1944,8 +1943,8 @@ IJoinHelper *createJoinHelper(CActivityBase &activity, IHThorJoinArg *helper, IE
|
|
return jhelper;
|
|
return jhelper;
|
|
unsigned numthreads = getAffinityCpus();
|
|
unsigned numthreads = getAffinityCpus();
|
|
if (unsortedoutput)
|
|
if (unsortedoutput)
|
|
- return new CMultiCoreUnorderedJoinHelper(activity, numthreads, jhelper, helper, allocator);
|
|
|
|
- return new CMultiCoreJoinHelper(activity, numthreads, jhelper, helper, allocator);
|
|
|
|
|
|
+ return new CMultiCoreUnorderedJoinHelper(activity, numthreads, false, jhelper, helper, allocator);
|
|
|
|
+ return new CMultiCoreJoinHelper(activity, numthreads, false, jhelper, helper, allocator);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -1962,8 +1961,8 @@ IJoinHelper *createSelfJoinHelper(CActivityBase &activity, IHThorJoinArg *helper
|
|
return jhelper;
|
|
return jhelper;
|
|
unsigned numthreads = getAffinityCpus();
|
|
unsigned numthreads = getAffinityCpus();
|
|
if (unsortedoutput)
|
|
if (unsortedoutput)
|
|
- return new CMultiCoreUnorderedJoinHelper(activity, numthreads, jhelper, helper, allocator);
|
|
|
|
- return new CMultiCoreJoinHelper(activity, numthreads, jhelper, helper, allocator);
|
|
|
|
|
|
+ return new CMultiCoreUnorderedJoinHelper(activity, numthreads, true, jhelper, helper, allocator);
|
|
|
|
+ return new CMultiCoreJoinHelper(activity, numthreads, true, jhelper, helper, allocator);
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|