|
@@ -35,7 +35,6 @@
|
|
|
|
|
|
enum join_t { JT_Undefined, JT_Inner, JT_LeftOuter, JT_RightOuter, JT_LeftOnly, JT_RightOnly, JT_LeftOnlyTransform };
|
|
|
enum joinkind_t { join_lookup, join_all, denormalize_lookup, denormalize_all };
|
|
|
-const char *joinActName[4] = { "LOOKUPJOIN", "ALLJOIN", "LOOKUPDENORMALIZE", "ALLDENORMALIZE" };
|
|
|
|
|
|
|
|
|
#define MAX_SEND_SIZE 0x100000 // 1MB
|
|
@@ -609,7 +608,6 @@ class CLookupJoinActivity : public CSlaveActivity, public CThorDataLink, impleme
|
|
|
}
|
|
|
protected:
|
|
|
joinkind_t joinKind;
|
|
|
- StringAttr joinStr;
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
|
|
|
@@ -647,10 +645,7 @@ public:
|
|
|
// IThorSlaveActivity overloaded methods
|
|
|
virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData)
|
|
|
{
|
|
|
- StringBuffer js(joinActName[(int)joinKind]);
|
|
|
- js.append("(").append(container.queryId()).append(")");
|
|
|
- joinStr.set(js.str());
|
|
|
- ActPrintLog("%s: init",joinStr.get());
|
|
|
+ ActPrintLog("init");
|
|
|
appendOutputLinked(this);
|
|
|
|
|
|
eos = false;
|
|
@@ -736,7 +731,7 @@ public:
|
|
|
while (slaves--)
|
|
|
rhsNodeRows.append(new CThorExpandingRowArray(*this, NULL, true)); // true, nulls not needed?
|
|
|
StringBuffer str;
|
|
|
- ActPrintLog("%s: Join type is %s", joinStr.get(), getJoinTypeStr(str).str());
|
|
|
+ ActPrintLog("Join type is %s", getJoinTypeStr(str).str());
|
|
|
}
|
|
|
virtual void onInputStarted(IException *except)
|
|
|
{
|
|
@@ -749,7 +744,7 @@ public:
|
|
|
}
|
|
|
virtual void onInputFinished(rowcount_t count)
|
|
|
{
|
|
|
- ActPrintLog("%s: LHS input finished, %"RCPF"d rows read", joinStr.get(), count);
|
|
|
+ ActPrintLog("LHS input finished, %"RCPF"d rows read", count);
|
|
|
}
|
|
|
virtual void start()
|
|
|
{
|
|
@@ -768,7 +763,6 @@ public:
|
|
|
leftAllocator.set(::queryRowAllocator(left));
|
|
|
outputMeta.set(left->queryFromActivity()->queryContainer().queryHelper()->queryOutputMeta());
|
|
|
left.setown(createDataLinkSmartBuffer(this,left,LOOKUPJOINL_SMART_BUFFER_SIZE,isSmartBufferSpillNeeded(left->queryFromActivity()),grouped,RCUNBOUND,this,false,&container.queryJob().queryIDiskUsage()));
|
|
|
- StringBuffer str(joinStr);
|
|
|
startInput(left);
|
|
|
right.set(inputs.item(1));
|
|
|
rightAllocator.set(::queryRowAllocator(right));
|
|
@@ -825,7 +819,7 @@ public:
|
|
|
defaultRight.setown(rr.finalizeRowClear(rrsz));
|
|
|
if (rlsz)
|
|
|
defaultLeft.setown(rl.finalizeRowClear(rlsz));
|
|
|
- dataLinkStart(joinStr, container.queryId());
|
|
|
+ dataLinkStart();
|
|
|
}
|
|
|
virtual void abort()
|
|
|
{
|
|
@@ -1328,42 +1322,26 @@ public:
|
|
|
rhs.ensure((rowidx_t)rhsTotalCount);
|
|
|
}
|
|
|
}
|
|
|
- Owned<IException> exception;
|
|
|
- try
|
|
|
+ if (needGlobal)
|
|
|
{
|
|
|
- if (needGlobal)
|
|
|
- {
|
|
|
- rowProcessor.start();
|
|
|
- broadcaster.start(this, mpTag, stopping);
|
|
|
- sendRHS();
|
|
|
- broadcaster.end();
|
|
|
- rowProcessor.wait();
|
|
|
- }
|
|
|
- else if (!stopping)
|
|
|
- {
|
|
|
- while (!abortSoon)
|
|
|
- {
|
|
|
- OwnedConstThorRow row = right->ungroupedNextRow();
|
|
|
- if (!row)
|
|
|
- break;
|
|
|
- addRow(row.getClear());
|
|
|
- }
|
|
|
- }
|
|
|
- if (!stopping)
|
|
|
- prepareRHS();
|
|
|
+ rowProcessor.start();
|
|
|
+ broadcaster.start(this, mpTag, stopping);
|
|
|
+ sendRHS();
|
|
|
+ broadcaster.end();
|
|
|
+ rowProcessor.wait();
|
|
|
}
|
|
|
- catch (IOutOfMemException *e) { exception.setown(e); }
|
|
|
- if (exception.get())
|
|
|
+ else if (!stopping)
|
|
|
{
|
|
|
- StringBuffer errStr(joinStr);
|
|
|
- errStr.append("(").append(container.queryId()).appendf(") right-hand side is too large (%"I64F"u bytes in %"RIPF"d rows) for %s : (",(unsigned __int64) rhs.serializedSize(),rhs.ordinality(),joinStr.get());
|
|
|
- errStr.append(exception->errorCode()).append(", ");
|
|
|
- exception->errorMessage(errStr);
|
|
|
- errStr.append(")");
|
|
|
- IException *e2 = MakeActivityException(this, TE_TooMuchData, "%s", errStr.str());
|
|
|
- ActPrintLog(e2);
|
|
|
- throw e2;
|
|
|
+ while (!abortSoon)
|
|
|
+ {
|
|
|
+ OwnedConstThorRow row = right->ungroupedNextRow();
|
|
|
+ if (!row)
|
|
|
+ break;
|
|
|
+ addRow(row.getClear());
|
|
|
+ }
|
|
|
}
|
|
|
+ if (!stopping)
|
|
|
+ prepareRHS();
|
|
|
}
|
|
|
void prepareRHS()
|
|
|
{
|