|
@@ -769,10 +769,6 @@ void CSlaveGraph::init(MemoryBuffer &mb)
|
|
|
|
|
|
void CSlaveGraph::initWithActData(MemoryBuffer &in, MemoryBuffer &out)
|
|
|
{
|
|
|
- CriticalBlock b(progressCrit);
|
|
|
- initialized = true;
|
|
|
- if (0 == in.length())
|
|
|
- return;
|
|
|
activity_id id;
|
|
|
loop
|
|
|
{
|
|
@@ -787,11 +783,9 @@ void CSlaveGraph::initWithActData(MemoryBuffer &in, MemoryBuffer &out)
|
|
|
in.read(sz);
|
|
|
unsigned aread = in.getPos();
|
|
|
CSlaveActivity *activity = (CSlaveActivity *)element->queryActivity();
|
|
|
- if (activity)
|
|
|
- {
|
|
|
- element->sentActInitData->set(0);
|
|
|
- activity->init(in, out);
|
|
|
- }
|
|
|
+ assertex(activity);
|
|
|
+ element->sentActInitData->set(0);
|
|
|
+ activity->init(in, out);
|
|
|
aread = in.getPos()-aread;
|
|
|
if (aread<sz)
|
|
|
{
|
|
@@ -801,6 +795,7 @@ void CSlaveGraph::initWithActData(MemoryBuffer &in, MemoryBuffer &out)
|
|
|
}
|
|
|
else if (aread>sz)
|
|
|
throw MakeActivityException(element, TE_SeriailzationError, "Serialization error - activity read beyond serialized data (%d byte(s))", aread-sz);
|
|
|
+ activity->setInitialized(true);
|
|
|
size32_t dl = out.length() - l;
|
|
|
if (dl)
|
|
|
out.writeDirect(l-sizeof(size32_t), sizeof(size32_t), &dl);
|
|
@@ -814,32 +809,43 @@ bool CSlaveGraph::recvActivityInitData(size32_t parentExtractSz, const byte *par
|
|
|
{
|
|
|
bool ret = true;
|
|
|
unsigned needActInit = 0;
|
|
|
+ unsigned uninitialized = 0;
|
|
|
Owned<IThorActivityIterator> iter = getConnectedIterator();
|
|
|
ForEach(*iter)
|
|
|
{
|
|
|
CGraphElementBase &element = (CGraphElementBase &)iter->query();
|
|
|
CActivityBase *activity = element.queryActivity();
|
|
|
- if (activity && activity->needReInit())
|
|
|
- element.sentActInitData->set(0, false); // force act init to be resent
|
|
|
- if (!element.sentActInitData->test(0))
|
|
|
- ++needActInit;
|
|
|
- }
|
|
|
- if (needActInit)
|
|
|
- {
|
|
|
- mptag_t replyTag = TAG_NULL;
|
|
|
- size32_t len;
|
|
|
- CMessageBuffer actInitRtnData;
|
|
|
- actInitRtnData.append(false);
|
|
|
- CMessageBuffer msg;
|
|
|
-
|
|
|
- if (syncInitData())
|
|
|
+ if (activity)
|
|
|
{
|
|
|
- if (!graphCancelHandler.recv(queryJobChannel().queryJobComm(), msg, 0, mpTag, NULL, LONGTIMEOUT))
|
|
|
- throw MakeStringException(0, "Error receiving actinit data for graph: %" GIDPF "d", graphId);
|
|
|
- replyTag = msg.getReplyTag();
|
|
|
- msg.read(len);
|
|
|
+ if (activity->needReInit())
|
|
|
+ {
|
|
|
+ element.sentActInitData->set(0, false); // force act init to be resent
|
|
|
+ activity->setInitialized(false);
|
|
|
+ }
|
|
|
+ if (!element.sentActInitData->test(0))
|
|
|
+ ++needActInit;
|
|
|
+ if (!activity->queryInitialized())
|
|
|
+ ++uninitialized;
|
|
|
}
|
|
|
- else
|
|
|
+ }
|
|
|
+ if (0 == uninitialized)
|
|
|
+ return true;
|
|
|
+ mptag_t replyTag = TAG_NULL;
|
|
|
+ size32_t len = 0;
|
|
|
+ CMessageBuffer actInitRtnData;
|
|
|
+ actInitRtnData.append(false);
|
|
|
+ CMessageBuffer msg;
|
|
|
+
|
|
|
+ if (syncInitData())
|
|
|
+ {
|
|
|
+ if (!graphCancelHandler.recv(queryJobChannel().queryJobComm(), msg, 0, mpTag, NULL, LONGTIMEOUT))
|
|
|
+ throw MakeStringException(0, "Error receiving actinit data for graph: %" GIDPF "d", graphId);
|
|
|
+ replyTag = msg.getReplyTag();
|
|
|
+ msg.read(len);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ if (needActInit)
|
|
|
{
|
|
|
// initialize any for which no data was sent
|
|
|
msg.append(smt_initActDataReq); // may cause graph to be created at master
|
|
@@ -848,14 +854,20 @@ bool CSlaveGraph::recvActivityInitData(size32_t parentExtractSz, const byte *par
|
|
|
assertex(!parentExtractSz || NULL!=parentExtract);
|
|
|
msg.append(parentExtractSz);
|
|
|
msg.append(parentExtractSz, parentExtract);
|
|
|
- Owned<IThorActivityIterator> iter = getConnectedIterator();
|
|
|
+
|
|
|
+ // NB: will only request activities that need initializaton data (those that override CSlaveActivity::init())
|
|
|
ForEach(*iter)
|
|
|
{
|
|
|
CSlaveGraphElement &element = (CSlaveGraphElement &)iter->query();
|
|
|
- if (!element.sentActInitData->test(0))
|
|
|
+ CActivityBase *activity = element.queryActivity();
|
|
|
+ if (activity)
|
|
|
{
|
|
|
- msg.append(element.queryId());
|
|
|
-// element.serializeStartContext(msg);
|
|
|
+ if (!element.sentActInitData->test(0))
|
|
|
+ {
|
|
|
+ msg.append(element.queryId());
|
|
|
+ // JCSMORE -> GH - do you always generate a start context serializer?
|
|
|
+ element.serializeStartContext(msg);
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
msg.append((activity_id)0);
|
|
@@ -875,33 +887,16 @@ bool CSlaveGraph::recvActivityInitData(size32_t parentExtractSz, const byte *par
|
|
|
}
|
|
|
msg.read(len);
|
|
|
}
|
|
|
+ }
|
|
|
+ if (len)
|
|
|
+ {
|
|
|
try
|
|
|
{
|
|
|
MemoryBuffer actInitData;
|
|
|
- if (len)
|
|
|
- actInitData.append(len, msg.readDirect(len));
|
|
|
+ actInitData.append(len, msg.readDirect(len));
|
|
|
+ CriticalBlock b(progressCrit);
|
|
|
+ initialized = true;
|
|
|
initWithActData(actInitData, actInitRtnData);
|
|
|
-
|
|
|
- if (queryOwner() && !isGlobal())
|
|
|
- {
|
|
|
- // initialize any for which no data was sent
|
|
|
- Owned<IThorActivityIterator> iter = getConnectedIterator();
|
|
|
- ForEach(*iter)
|
|
|
- {
|
|
|
- CSlaveGraphElement &element = (CSlaveGraphElement &)iter->query();
|
|
|
- if (!element.sentActInitData->test(0))
|
|
|
- {
|
|
|
- element.sentActInitData->set(0);
|
|
|
- CSlaveActivity *activity = (CSlaveActivity *)element.queryActivity();
|
|
|
- if (activity)
|
|
|
- {
|
|
|
- MemoryBuffer in, out;
|
|
|
- activity->init(in, out);
|
|
|
- assertex(0 == out.length());
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
}
|
|
|
catch (IException *e)
|
|
|
{
|
|
@@ -911,14 +906,33 @@ bool CSlaveGraph::recvActivityInitData(size32_t parentExtractSz, const byte *par
|
|
|
e->Release();
|
|
|
ret = false;
|
|
|
}
|
|
|
+ }
|
|
|
+ if (syncInitData() || needActInit)
|
|
|
+ {
|
|
|
if (!queryJobChannel().queryJobComm().send(actInitRtnData, 0, replyTag, LONGTIMEOUT))
|
|
|
throw MakeStringException(0, "Timeout sending init data back to master");
|
|
|
}
|
|
|
+ // initialize any for which no data was sent
|
|
|
+ ForEach(*iter)
|
|
|
+ {
|
|
|
+ CSlaveGraphElement &element = (CSlaveGraphElement &)iter->query();
|
|
|
+ CSlaveActivity *activity = (CSlaveActivity *)element.queryActivity();
|
|
|
+ if (activity && !activity->queryInitialized())
|
|
|
+ {
|
|
|
+ activity->setInitialized(true);
|
|
|
+ element.sentActInitData->set(0);
|
|
|
+ MemoryBuffer in, out;
|
|
|
+ activity->init(in, out);
|
|
|
+ assertex(0 == out.length());
|
|
|
+ }
|
|
|
+ }
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
bool CSlaveGraph::preStart(size32_t parentExtractSz, const byte *parentExtract)
|
|
|
{
|
|
|
+ if (!recvActivityInitData(parentExtractSz, parentExtract))
|
|
|
+ throw MakeThorException(0, "preStart failure");
|
|
|
CGraphBase::preStart(parentExtractSz, parentExtract);
|
|
|
if (isGlobal())
|
|
|
{
|
|
@@ -1024,8 +1038,6 @@ void CSlaveGraph::executeSubGraph(size32_t parentExtractSz, const byte *parentEx
|
|
|
}
|
|
|
connect(); // only now do slave acts. have all their outputs prepared.
|
|
|
}
|
|
|
- if (!recvActivityInitData(parentExtractSz, parentExtract))
|
|
|
- throw MakeThorException(0, "preStart failure");
|
|
|
CGraphBase::executeSubGraph(parentExtractSz, parentExtract);
|
|
|
}
|
|
|
catch (IException *e)
|