|
@@ -1194,7 +1194,9 @@ class CRoxieWorker : public CInterface, implements IPooledThread
|
|
|
{
|
|
|
RoxieQueue *queue;
|
|
|
CriticalSection actCrit;
|
|
|
+#ifndef NEW_IBYTI
|
|
|
Semaphore ibytiSem;
|
|
|
+#endif
|
|
|
bool stopped;
|
|
|
bool abortJob;
|
|
|
bool busy;
|
|
@@ -1247,8 +1249,10 @@ public:
|
|
|
if (packet && packet->queryHeader().channel==channel)
|
|
|
{
|
|
|
abortJob = true;
|
|
|
+#ifndef NEW_IBYTI
|
|
|
if (doIbytiDelay)
|
|
|
ibytiSem.signal();
|
|
|
+#endif
|
|
|
if (activity)
|
|
|
activity->abort();
|
|
|
}
|
|
@@ -1261,8 +1265,10 @@ public:
|
|
|
{
|
|
|
queryFound = true;
|
|
|
abortJob = true;
|
|
|
+#ifndef NEW_IBYTI
|
|
|
if (doIbytiDelay)
|
|
|
ibytiSem.signal();
|
|
|
+#endif
|
|
|
if (activity)
|
|
|
{
|
|
|
// Try to stop/abort a job after it starts only if IBYTI comes from a higher priority agent
|
|
@@ -1386,18 +1392,19 @@ public:
|
|
|
try
|
|
|
{
|
|
|
bool debugging = logctx.queryDebuggerActive();
|
|
|
-#ifdef SUBCHANNELS_IN_HEADER
|
|
|
if (debugging)
|
|
|
{
|
|
|
- if (!mySubChannel)
|
|
|
- abortJob = true; // when debugging, we always run on primary only... should really have sent to primary only too...
|
|
|
+ if (mySubChannel)
|
|
|
+ abortJob = true; // when debugging, we always run on primary only...
|
|
|
}
|
|
|
+#ifndef NEW_IBYTI
|
|
|
+#ifdef SUBCHANNELS_IN_HEADER
|
|
|
else if (doIbytiDelay && mySubChannel)
|
|
|
{
|
|
|
unsigned delay = 0;
|
|
|
for (unsigned subChannel = 0; subChannel < mySubChannel; subChannel++)
|
|
|
delay += getIbytiDelay(header.subChannels[subChannel].getIpAddress());
|
|
|
- unsigned start;
|
|
|
+ unsigned start = 0;
|
|
|
if (traceRoxiePackets)
|
|
|
{
|
|
|
StringBuffer x;
|
|
@@ -1418,11 +1425,6 @@ public:
|
|
|
}
|
|
|
}
|
|
|
#else
|
|
|
- if (debugging)
|
|
|
- {
|
|
|
- if (mySubChannel)
|
|
|
- abortJob = true; // when debugging, we always run on primary only...
|
|
|
- }
|
|
|
else if (doIbytiDelay && (numAgents > 1))
|
|
|
{
|
|
|
unsigned hdrHashVal = header.priorityHash();
|
|
@@ -1468,6 +1470,7 @@ public:
|
|
|
}
|
|
|
}
|
|
|
#endif
|
|
|
+#endif
|
|
|
if (abortJob)
|
|
|
{
|
|
|
CriticalBlock b(actCrit);
|
|
@@ -1534,8 +1537,10 @@ public:
|
|
|
maxAgentsActive.store_max(agentsActive);
|
|
|
abortJob = false;
|
|
|
busy = true;
|
|
|
+#ifndef NEW_IBYTI
|
|
|
if (doIbytiDelay)
|
|
|
ibytiSem.reinit(0U); // Make sure sem is is in no-signaled state
|
|
|
+#endif
|
|
|
packet.setown(queue->dequeue());
|
|
|
if (packet)
|
|
|
{
|
|
@@ -1930,6 +1935,293 @@ public:
|
|
|
}
|
|
|
};
|
|
|
|
|
|
+//------------------------------------------------------------------------------------------------------------
|
|
|
+#ifdef NEW_IBYTI
|
|
|
+
|
|
|
+class DelayedPacketQueue
|
|
|
+{
|
|
|
+ // Used to keep a list of all recently-received packets where we are not primary subchannel. There is one queue per subchannel level
|
|
|
+ // It is accessed ONLY from the main reader thread and does not need to be threadsafe (but does need to be fast)
|
|
|
+ // We use a doubly-linked list (not std::list as not quite flexible enough).
|
|
|
+
|
|
|
+ class DelayedPacketEntry
|
|
|
+ {
|
|
|
+ DelayedPacketEntry() = delete;
|
|
|
+ DelayedPacketEntry(const DelayedPacketEntry&) = delete;
|
|
|
+ public:
|
|
|
+ DelayedPacketEntry(IRoxieQueryPacket *_packet, unsigned _waitExpires)
|
|
|
+ : packet(_packet), waitExpires(_waitExpires)
|
|
|
+ {
|
|
|
+ }
|
|
|
+ ~DelayedPacketEntry()
|
|
|
+ {
|
|
|
+ if (prev)
|
|
|
+ prev->next = next;
|
|
|
+ if (next)
|
|
|
+ next->prev = prev;
|
|
|
+ }
|
|
|
+ bool matches(const RoxiePacketHeader &ibyti) const
|
|
|
+ {
|
|
|
+ return packet->queryHeader().matchPacket(ibyti);
|
|
|
+ }
|
|
|
+ IRoxieQueryPacket *getClear()
|
|
|
+ {
|
|
|
+ return packet.getClear();
|
|
|
+ }
|
|
|
+ StringBuffer & describe(StringBuffer &ret) const
|
|
|
+ {
|
|
|
+ return packet->queryHeader().toString(ret);
|
|
|
+ }
|
|
|
+
|
|
|
+ Owned<IRoxieQueryPacket> packet;
|
|
|
+ DelayedPacketEntry *next = nullptr;
|
|
|
+ DelayedPacketEntry *prev = nullptr;
|
|
|
+
|
|
|
+ unsigned waitExpires = 0;
|
|
|
+ };
|
|
|
+
|
|
|
+public:
|
|
|
+ DelayedPacketQueue() = default;
|
|
|
+ DelayedPacketQueue(const DelayedPacketQueue&) = delete;
|
|
|
+ ~DelayedPacketQueue()
|
|
|
+ {
|
|
|
+ while (head)
|
|
|
+ removeEntry(head);
|
|
|
+ }
|
|
|
+ bool doIBYTI(const RoxiePacketHeader &ibyti)
|
|
|
+ {
|
|
|
+ assert(GetCurrentThreadId()==roxiePacketReaderThread);
|
|
|
+ DelayedPacketEntry *finger = head;
|
|
|
+ while (finger)
|
|
|
+ {
|
|
|
+ if (finger->matches(ibyti))
|
|
|
+ {
|
|
|
+ if (traceRoxiePackets)
|
|
|
+ {
|
|
|
+ StringBuffer s;
|
|
|
+ DBGLOG("IBYTI removing delayed packet %s", finger->describe(s).str());
|
|
|
+ }
|
|
|
+ removeEntry(finger);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ finger = finger->next;
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ void append(IRoxieQueryPacket *packet, unsigned expires)
|
|
|
+ {
|
|
|
+ // Goes on the end. But percolate the expiry time backwards
|
|
|
+ assert(GetCurrentThreadId()==roxiePacketReaderThread);
|
|
|
+ DelayedPacketEntry *newEntry = new DelayedPacketEntry(packet, expires);
|
|
|
+ if (traceRoxiePackets)
|
|
|
+ {
|
|
|
+ StringBuffer s;
|
|
|
+ DBGLOG("Adding delayed packet %s", packet->queryHeader().toString(s).str());
|
|
|
+ }
|
|
|
+ newEntry->prev = tail;
|
|
|
+ if (tail)
|
|
|
+ {
|
|
|
+ tail->next = newEntry;
|
|
|
+ for (DelayedPacketEntry *finger = tail; finger != nullptr; finger = finger->prev)
|
|
|
+ {
|
|
|
+ if ((int) (finger->waitExpires - expires) <= 0)
|
|
|
+ break;
|
|
|
+ finger->waitExpires = expires;
|
|
|
+ finger = finger->prev;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ head = newEntry;
|
|
|
+ tail = newEntry;
|
|
|
+ }
|
|
|
+
|
|
|
+ // Move any that we are done waiting for our buddy onto the active queue
|
|
|
+ void checkExpired(
|
|
|
+ unsigned now,
|
|
|
+#ifdef ROXIE_SLA_LOGIC
|
|
|
+ RoxieQueue &slaQueue,
|
|
|
+#endif
|
|
|
+ RoxieQueue &hiQueue, RoxieQueue &loQueue)
|
|
|
+ {
|
|
|
+ assert(GetCurrentThreadId()==roxiePacketReaderThread);
|
|
|
+ DelayedPacketEntry *finger = head;
|
|
|
+ while (finger)
|
|
|
+ {
|
|
|
+ if (((int) (finger->waitExpires - now)) <= 0) // Oddly coded to handle wrapping
|
|
|
+ {
|
|
|
+ IRoxieQueryPacket *packet = finger->getClear();
|
|
|
+ const RoxiePacketHeader &header = packet->queryHeader();
|
|
|
+ if (traceRoxiePackets)
|
|
|
+ {
|
|
|
+ StringBuffer s;
|
|
|
+ DBGLOG("No IBYTI received yet for delayed packet %s", header.toString(s).str());
|
|
|
+ }
|
|
|
+#ifdef ROXIE_SLA_LOGIC
|
|
|
+ if (header.activityId & ROXIE_SLA_PRIORITY)
|
|
|
+ slaQueue.enqueue(packet);
|
|
|
+ else
|
|
|
+#endif
|
|
|
+ if (header.activityId & ROXIE_HIGH_PRIORITY)
|
|
|
+ hiQueue.enqueue(packet);
|
|
|
+ else
|
|
|
+ loQueue.enqueue(packet);
|
|
|
+ for (unsigned subChannel = 0; subChannel < MAX_SUBCHANNEL; subChannel++)
|
|
|
+ {
|
|
|
+ if (header.subChannels[subChannel].isMe() || header.subChannels[subChannel].isNull())
|
|
|
+ break;
|
|
|
+ noteNodeSick(header.subChannels[subChannel].getIpAddress());
|
|
|
+ }
|
|
|
+
|
|
|
+ DelayedPacketEntry *goer = finger;
|
|
|
+ finger = finger->next;
|
|
|
+ removeEntry(goer);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // How long until the next time we want to call checkExpires() ?
|
|
|
+ unsigned timeout(unsigned now) const
|
|
|
+ {
|
|
|
+ assert(GetCurrentThreadId()==roxiePacketReaderThread);
|
|
|
+ if (head)
|
|
|
+ {
|
|
|
+ int delay = (int) (head->waitExpires - now);
|
|
|
+ if (delay <= 0)
|
|
|
+ return 0;
|
|
|
+ else
|
|
|
+ return (unsigned) delay;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ return (unsigned) -1;
|
|
|
+ }
|
|
|
+
|
|
|
+private:
|
|
|
+ void removeEntry(DelayedPacketEntry *goer)
|
|
|
+ {
|
|
|
+ if (goer==head)
|
|
|
+ head = goer->next;
|
|
|
+ if (goer==tail)
|
|
|
+ tail = goer->prev;
|
|
|
+ delete goer;
|
|
|
+ }
|
|
|
+
|
|
|
+ DelayedPacketEntry *head = nullptr;
|
|
|
+ DelayedPacketEntry *tail = nullptr;
|
|
|
+
|
|
|
+};
|
|
|
+
|
|
|
+//------------------------------------------------------------------------------------------------------------
|
|
|
+
|
|
|
+class DelayedPacketQueueChannel : public CInterface
|
|
|
+{
|
|
|
+ // Manages a set of DelayedPacketQueues, one for each supported subchannel level.
|
|
|
+ DelayedPacketQueueChannel() = delete;
|
|
|
+ DelayedPacketQueueChannel(const DelayedPacketQueueChannel&) = delete;
|
|
|
+public:
|
|
|
+ DelayedPacketQueueChannel(unsigned _channel) : channel(_channel)
|
|
|
+ {
|
|
|
+ }
|
|
|
+ inline unsigned queryChannel() const { return channel; }
|
|
|
+ inline DelayedPacketQueue &queryQueue(unsigned subchannel)
|
|
|
+ {
|
|
|
+ assertex(subchannel); // Subchannel 0 means primary and is never delayed
|
|
|
+ subchannel -= 1;
|
|
|
+ if (subchannel > maxSeen)
|
|
|
+ maxSeen = subchannel;
|
|
|
+ return queues[subchannel];
|
|
|
+ }
|
|
|
+ unsigned timeout(unsigned now) const
|
|
|
+ {
|
|
|
+ unsigned min = (unsigned) -1;
|
|
|
+ for (unsigned queue = 0; queue <= maxSeen; queue++)
|
|
|
+ {
|
|
|
+ unsigned t = queues[queue].timeout(now);
|
|
|
+ if (t < min)
|
|
|
+ min = t;
|
|
|
+ }
|
|
|
+ return min;
|
|
|
+ }
|
|
|
+ void checkExpired(
|
|
|
+ unsigned now,
|
|
|
+#ifdef ROXIE_SLA_LOGIC
|
|
|
+ RoxieQueue &slaQueue,
|
|
|
+#endif
|
|
|
+ RoxieQueue &hiQueue, RoxieQueue &loQueue)
|
|
|
+ {
|
|
|
+ for (unsigned queue = 0; queue <= maxSeen; queue++)
|
|
|
+ {
|
|
|
+ queues[queue].checkExpired(
|
|
|
+ now,
|
|
|
+#ifdef ROXIE_SLA_LOGIC
|
|
|
+ slaQueue,
|
|
|
+#endif
|
|
|
+ hiQueue, loQueue);
|
|
|
+ }
|
|
|
+ }
|
|
|
+private:
|
|
|
+ DelayedPacketQueue queues[MAX_SUBCHANNEL-1]; // Note - primary subchannel is not included
|
|
|
+ unsigned channel = 0;
|
|
|
+ unsigned maxSeen = 0;
|
|
|
+};
|
|
|
+
|
|
|
+class DelayedPacketQueueManager
|
|
|
+{
|
|
|
+
|
|
|
+public:
|
|
|
+ DelayedPacketQueueManager() = default;
|
|
|
+ DelayedPacketQueueManager(const DelayedPacketQueueManager&) = delete;
|
|
|
+ inline DelayedPacketQueue &queryQueue(unsigned channel, unsigned subchannel)
|
|
|
+ {
|
|
|
+ // Note - there are normally no more than a couple of channels on a single agent.
|
|
|
+ // If that were to change we could make this a fixed size array
|
|
|
+ assert(GetCurrentThreadId()==roxiePacketReaderThread);
|
|
|
+ ForEachItemIn(idx, channels)
|
|
|
+ {
|
|
|
+ DelayedPacketQueueChannel &i = channels.item(idx);
|
|
|
+ if (i.queryChannel() == channel)
|
|
|
+ return i.queryQueue(subchannel);
|
|
|
+ }
|
|
|
+ channels.append(*new DelayedPacketQueueChannel(channel));
|
|
|
+ return channels.tos().queryQueue(subchannel);
|
|
|
+ }
|
|
|
+ unsigned timeout(unsigned now) const
|
|
|
+ {
|
|
|
+ unsigned ret = (unsigned) -1;
|
|
|
+ ForEachItemIn(idx, channels)
|
|
|
+ {
|
|
|
+ unsigned t = channels.item(idx).timeout(now);
|
|
|
+ if (t < ret)
|
|
|
+ ret = t;
|
|
|
+ }
|
|
|
+ return ret;
|
|
|
+ }
|
|
|
+ void checkExpired(
|
|
|
+ unsigned now,
|
|
|
+#ifdef ROXIE_SLA_LOGIC
|
|
|
+ RoxieQueue &slaQueue,
|
|
|
+#endif
|
|
|
+ RoxieQueue &hiQueue, RoxieQueue &loQueue)
|
|
|
+ {
|
|
|
+ ForEachItemIn(idx, channels)
|
|
|
+ {
|
|
|
+ channels.item(idx).checkExpired(
|
|
|
+ now,
|
|
|
+#ifdef ROXIE_SLA_LOGIC
|
|
|
+ slaQueue,
|
|
|
+#endif
|
|
|
+ hiQueue, loQueue);
|
|
|
+ }
|
|
|
+ }
|
|
|
+private:
|
|
|
+ CIArrayOf<DelayedPacketQueueChannel> channels;
|
|
|
+};
|
|
|
+#endif
|
|
|
+
|
|
|
+//------------------------------------------------------------------------------------------------------------
|
|
|
+
|
|
|
class RoxieSocketQueueManager : public RoxieReceiverBase
|
|
|
{
|
|
|
protected:
|
|
@@ -1939,6 +2231,9 @@ protected:
|
|
|
Owned<TokenBucket> bucket;
|
|
|
unsigned maxPacketSize = 0;
|
|
|
std::atomic<bool> running = { false };
|
|
|
+#ifdef NEW_IBYTI
|
|
|
+ DelayedPacketQueueManager delayed;
|
|
|
+#endif
|
|
|
|
|
|
class ReceiverThread : public Thread
|
|
|
{
|
|
@@ -2098,25 +2393,28 @@ public:
|
|
|
return ret;
|
|
|
}
|
|
|
|
|
|
- void doIbyti(RoxiePacketHeader &header, RoxieQueue &queue
|
|
|
-#ifndef SUBCHANNELS_IN_HEADER
|
|
|
- , const ITopologyServer* topology
|
|
|
-#endif
|
|
|
- )
|
|
|
+ void doIbyti(RoxiePacketHeader &header, RoxieQueue &queue)
|
|
|
{
|
|
|
assert(!localAgent);
|
|
|
bool preActivity = false;
|
|
|
#ifdef SUBCHANNELS_IN_HEADER
|
|
|
unsigned mySubChannel = header.mySubChannel();
|
|
|
#else
|
|
|
+ Owned<const ITopologyServer> topology = getTopology();
|
|
|
const ChannelInfo &channelInfo = topology->queryChannelInfo(header.channel);
|
|
|
unsigned mySubChannel = channelInfo.subChannel();
|
|
|
#endif
|
|
|
|
|
|
if (header.retries == QUERY_ABORTED)
|
|
|
{
|
|
|
- abortRunning(header, queue, false, preActivity);
|
|
|
- queue.remove(header);
|
|
|
+ bool foundInQ = false;
|
|
|
+#ifdef NEW_IBYTI
|
|
|
+ foundInQ = mySubChannel != 0 && delayed.queryQueue(header.channel, mySubChannel).doIBYTI(header);
|
|
|
+#endif
|
|
|
+ if (!foundInQ)
|
|
|
+ foundInQ = queue.remove(header);
|
|
|
+ if (!foundInQ)
|
|
|
+ abortRunning(header, queue, false, preActivity);
|
|
|
|
|
|
if (traceRoxiePackets || traceLevel > 10)
|
|
|
{
|
|
@@ -2141,7 +2439,12 @@ public:
|
|
|
#else
|
|
|
noteNodeHealthy(header.subChannels[subChannel].getIpAddress());
|
|
|
#endif
|
|
|
- bool foundInQ = queue.remove(header);
|
|
|
+ bool foundInQ = false;
|
|
|
+#ifdef NEW_IBYTI
|
|
|
+ foundInQ = mySubChannel != 0 && delayed.queryQueue(header.channel, mySubChannel).doIBYTI(header);
|
|
|
+#endif
|
|
|
+ if (!foundInQ)
|
|
|
+ foundInQ = queue.remove(header);
|
|
|
if (foundInQ)
|
|
|
{
|
|
|
if (traceRoxiePackets || traceLevel > 10)
|
|
@@ -2181,84 +2484,100 @@ public:
|
|
|
{
|
|
|
// NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow
|
|
|
// DO NOT put tracing on this thread except at very high tracelevels!
|
|
|
-#ifdef SUBCHANNELS_IN_HEADER
|
|
|
- unsigned mySubchannel = header.mySubChannel();
|
|
|
-#else
|
|
|
- Owned<const ITopologyServer> topology = getTopology();
|
|
|
- if (!header.channel)
|
|
|
- {
|
|
|
- // Turn broadcast packet (channel 0), as early as possible, into non-0 channel packets.
|
|
|
- // So retries and other communication with Roxie server (which uses non-0 channel numbers) will not cause double work or confusion.
|
|
|
- // Unfortunately this is bad news for dropping packets
|
|
|
- const std::vector<unsigned> channels = topology->queryChannels();
|
|
|
- Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
|
|
|
- for (unsigned i = 1; i < channels.size(); i++)
|
|
|
- queue.enqueue(packet->clonePacket(channels[i]));
|
|
|
- header.channel = channels[0];
|
|
|
- queue.enqueue(packet.getClear());
|
|
|
- return;
|
|
|
- }
|
|
|
- unsigned mySubchannel = topology->queryChannelInfo(header.channel).subChannel();
|
|
|
-#endif
|
|
|
- if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK )
|
|
|
+ if ((header.activityId & ~ROXIE_PRIORITY_MASK) == 0)
|
|
|
+ doIbyti(header, queue);
|
|
|
+ else
|
|
|
{
|
|
|
- Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
|
|
|
- if (traceLevel > 10)
|
|
|
+ if (!header.channel)
|
|
|
{
|
|
|
- StringBuffer s;
|
|
|
- DBGLOG("ROXIE_CALLBACK %s", header.toString(s).str());
|
|
|
+ // Turn broadcast packet (channel 0), as early as possible, into non-0 channel packets.
|
|
|
+ // So retries and other communication with Roxie server (which uses non-0 channel numbers) will not cause double work or confusion.
|
|
|
+ // Unfortunately this is bad news for dropping packets
|
|
|
+ // In SUBCHANNELS_IN_HEADER mode this translation has been done on server before sending, except for some control messages like PING or UNLOAD
|
|
|
+
|
|
|
+ Owned<const ITopologyServer> topology = getTopology();
|
|
|
+ const std::vector<unsigned> channels = topology->queryChannels();
|
|
|
+ Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
|
|
|
+ for (unsigned i = 1; i < channels.size(); i++)
|
|
|
+ queue.enqueue(packet->clonePacket(channels[i]));
|
|
|
+ header.channel = channels[0];
|
|
|
+ queue.enqueue(packet.getClear());
|
|
|
+ return;
|
|
|
}
|
|
|
- doFileCallback(packet);
|
|
|
- }
|
|
|
- else if ((header.activityId & ~ROXIE_PRIORITY_MASK) == 0)
|
|
|
- doIbyti(header, queue
|
|
|
-#ifndef SUBCHANNELS_IN_HEADER
|
|
|
- , topology
|
|
|
+#ifdef SUBCHANNELS_IN_HEADER
|
|
|
+ unsigned mySubchannel = header.mySubChannel();
|
|
|
+#else
|
|
|
+ Owned<const ITopologyServer> topology = getTopology();
|
|
|
+ unsigned mySubchannel = topology->queryChannelInfo(header.channel).subChannel();
|
|
|
#endif
|
|
|
- ); // MORE - check how fast this is!
|
|
|
- else if (IBYTIbufferSize && queue.lookupOrphanIBYTI(header))
|
|
|
- {
|
|
|
- if (traceRoxiePackets || traceLevel > 10)
|
|
|
+ if (header.activityId == ROXIE_FILECALLBACK || header.activityId == ROXIE_DEBUGCALLBACK )
|
|
|
{
|
|
|
- StringBuffer s;
|
|
|
- DBGLOG("doIBYTI packet was too early : %s", header.toString(s).str());
|
|
|
+ Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
|
|
|
+ if (traceLevel > 10)
|
|
|
+ {
|
|
|
+ StringBuffer s;
|
|
|
+ DBGLOG("ROXIE_CALLBACK %s", header.toString(s).str());
|
|
|
+ }
|
|
|
+ doFileCallback(packet);
|
|
|
}
|
|
|
- ibytiPacketsTooLate--;
|
|
|
- ibytiPacketsTooEarly++;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
|
|
|
- AgentContextLogger logctx(packet);
|
|
|
- unsigned retries = header.thisChannelRetries(mySubchannel);
|
|
|
- if (retries)
|
|
|
+ else if (IBYTIbufferSize && queue.lookupOrphanIBYTI(header))
|
|
|
{
|
|
|
- // MORE - is this fast enough? By the time I am seeing retries I may already be under load. Could move onto a separate thread
|
|
|
- assertex(header.channel); // should never see a retry on channel 0
|
|
|
- if (retries >= SUBCHANNEL_MASK)
|
|
|
- return; // someone sent a failure or something - ignore it
|
|
|
-
|
|
|
- // Send back an out-of-band immediately, to let Roxie server know that channel is still active
|
|
|
- if (!(testAgentFailure & 0x800))
|
|
|
+ if (traceRoxiePackets || traceLevel > 10)
|
|
|
{
|
|
|
- RoxiePacketHeader newHeader(header, ROXIE_ALIVE, mySubchannel);
|
|
|
- Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
|
|
|
- output->flush();
|
|
|
+ StringBuffer s;
|
|
|
+ DBGLOG("doIBYTI packet was too early : %s", header.toString(s).str());
|
|
|
}
|
|
|
-
|
|
|
- // If it's a retry, look it up against already running, or output stream, or input queue
|
|
|
- // if found, send an IBYTI and discard retry request
|
|
|
-
|
|
|
- if (!mySubchannel)
|
|
|
- retriesReceivedPrm++;
|
|
|
- else
|
|
|
- retriesReceivedSec++;
|
|
|
- bool alreadyRunning = false;
|
|
|
- Owned<IPooledThreadIterator> wi = queue.running();
|
|
|
- ForEach(*wi)
|
|
|
+ ibytiPacketsTooLate--;
|
|
|
+ ibytiPacketsTooEarly++;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ Owned<IRoxieQueryPacket> packet = createRoxiePacket(mb);
|
|
|
+ AgentContextLogger logctx(packet);
|
|
|
+ unsigned retries = header.thisChannelRetries(mySubchannel);
|
|
|
+ if (retries)
|
|
|
{
|
|
|
- CRoxieWorker &w = (CRoxieWorker &) wi->query();
|
|
|
- if (w.match(header))
|
|
|
+ // MORE - is this fast enough? By the time I am seeing retries I may already be under load. Could move onto a separate thread
|
|
|
+ assertex(header.channel); // should never see a retry on channel 0
|
|
|
+ if (retries >= SUBCHANNEL_MASK)
|
|
|
+ return; // someone sent a failure or something - ignore it
|
|
|
+
|
|
|
+ // Send back an out-of-band immediately, to let Roxie server know that channel is still active
|
|
|
+ if (!(testAgentFailure & 0x800))
|
|
|
+ {
|
|
|
+ RoxiePacketHeader newHeader(header, ROXIE_ALIVE, mySubchannel);
|
|
|
+ Owned<IMessagePacker> output = ROQ->createOutputStream(newHeader, true, logctx);
|
|
|
+ output->flush();
|
|
|
+ }
|
|
|
+
|
|
|
+ // If it's a retry, look it up against already running, or output stream, or input queue
|
|
|
+ // if found, send an IBYTI and discard retry request
|
|
|
+
|
|
|
+ if (!mySubchannel)
|
|
|
+ retriesReceivedPrm++;
|
|
|
+ else
|
|
|
+ retriesReceivedSec++;
|
|
|
+ bool alreadyRunning = false;
|
|
|
+ Owned<IPooledThreadIterator> wi = queue.running();
|
|
|
+ ForEach(*wi)
|
|
|
+ {
|
|
|
+ CRoxieWorker &w = (CRoxieWorker &) wi->query();
|
|
|
+ if (w.match(header))
|
|
|
+ {
|
|
|
+ alreadyRunning = true;
|
|
|
+ if (!mySubchannel)
|
|
|
+ retriesIgnoredPrm++;
|
|
|
+ else
|
|
|
+ retriesIgnoredSec++;
|
|
|
+ ROQ->sendIbyti(header, logctx, mySubchannel);
|
|
|
+ if (logctx.queryTraceLevel() > 10)
|
|
|
+ {
|
|
|
+ StringBuffer xx; logctx.CTXLOG("Ignored retry on subchannel %u for running activity %s", mySubchannel, header.toString(xx).str());
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ if (!alreadyRunning && checkCompleted && ROQ->replyPending(header))
|
|
|
{
|
|
|
alreadyRunning = true;
|
|
|
if (!mySubchannel)
|
|
@@ -2268,35 +2587,33 @@ public:
|
|
|
ROQ->sendIbyti(header, logctx, mySubchannel);
|
|
|
if (logctx.queryTraceLevel() > 10)
|
|
|
{
|
|
|
- StringBuffer xx; logctx.CTXLOG("Ignored retry on subchannel %u for running activity %s", mySubchannel, header.toString(xx).str());
|
|
|
+ StringBuffer xx; logctx.CTXLOG("Ignored retry on subchannel %u for completed activity %s", mySubchannel, header.toString(xx).str());
|
|
|
}
|
|
|
- break;
|
|
|
}
|
|
|
- }
|
|
|
- if (!alreadyRunning && checkCompleted && ROQ->replyPending(header))
|
|
|
- {
|
|
|
- alreadyRunning = true;
|
|
|
- if (!mySubchannel)
|
|
|
- retriesIgnoredPrm++;
|
|
|
- else
|
|
|
- retriesIgnoredSec++;
|
|
|
- ROQ->sendIbyti(header, logctx, mySubchannel);
|
|
|
- if (logctx.queryTraceLevel() > 10)
|
|
|
+ if (!alreadyRunning)
|
|
|
{
|
|
|
- StringBuffer xx; logctx.CTXLOG("Ignored retry on subchannel %u for completed activity %s", mySubchannel, header.toString(xx).str());
|
|
|
+ if (logctx.queryTraceLevel() > 10)
|
|
|
+ {
|
|
|
+ StringBuffer xx; logctx.CTXLOG("Retry %d received on subchannel %u for %s", retries+1, mySubchannel, header.toString(xx).str());
|
|
|
+ }
|
|
|
+ queue.enqueueUnique(packet.getClear(), mySubchannel);
|
|
|
}
|
|
|
}
|
|
|
- if (!alreadyRunning)
|
|
|
+ else // first time (not a retry).
|
|
|
{
|
|
|
- if (logctx.queryTraceLevel() > 10)
|
|
|
+#ifdef NEW_IBYTI
|
|
|
+ if (mySubchannel != 0) // i.e. I am not the primary here
|
|
|
{
|
|
|
- StringBuffer xx; logctx.CTXLOG("Retry %d received on subchannel %u for %s", retries+1, mySubchannel, header.toString(xx).str());
|
|
|
+ unsigned delay = 0;
|
|
|
+ for (unsigned subChannel = 0; subChannel < mySubchannel; subChannel++)
|
|
|
+ delay += getIbytiDelay(header.subChannels[subChannel].getIpAddress());
|
|
|
+ delayed.queryQueue(header.channel, mySubchannel).append(packet.getClear(), msTick()+delay);
|
|
|
}
|
|
|
- queue.enqueueUnique(packet.getClear(), mySubchannel);
|
|
|
+ else
|
|
|
+#endif
|
|
|
+ queue.enqueue(packet.getClear());
|
|
|
}
|
|
|
}
|
|
|
- else // first time (not a retry).
|
|
|
- queue.enqueue(packet.getClear());
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -2313,8 +2630,15 @@ public:
|
|
|
{
|
|
|
// NOTE - this thread needs to do as little as possible - just read packets and queue them up - otherwise we can get packet loss due to buffer overflow
|
|
|
// DO NOT put tracing on this thread except at very high tracelevels!
|
|
|
+#ifdef NEW_IBYTI
|
|
|
+ unsigned timeout = delayed.timeout(msTick());
|
|
|
+ if (timeout>5000)
|
|
|
+ timeout = 5000;
|
|
|
+#else
|
|
|
+ unsigned timeout = 5000;
|
|
|
+#endif
|
|
|
unsigned l;
|
|
|
- multicastSocket->read(mb.reserve(maxPacketSize), sizeof(RoxiePacketHeader), maxPacketSize, l, 5);
|
|
|
+ multicastSocket->readtms(mb.reserve(maxPacketSize), sizeof(RoxiePacketHeader), maxPacketSize, l, timeout);
|
|
|
mb.setLength(l);
|
|
|
packetsReceived++;
|
|
|
RoxiePacketHeader &header = *(RoxiePacketHeader *) mb.toByteArray();
|
|
@@ -2371,6 +2695,14 @@ public:
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
+#ifdef NEW_IBYTI
|
|
|
+ delayed.checkExpired(
|
|
|
+ msTick(),
|
|
|
+#ifdef ROXIE_SLA_LOGIC
|
|
|
+ slaQueue,
|
|
|
+#endif
|
|
|
+ hiQueue, loQueue);
|
|
|
+#endif
|
|
|
}
|
|
|
return 0;
|
|
|
}
|
|
@@ -2763,17 +3095,13 @@ public:
|
|
|
}
|
|
|
else
|
|
|
{
|
|
|
-#ifdef SUBCHANNELS_IN_HEADER
|
|
|
- // In SUBCHANNELS_IN_HEADER mode this translation has been done on server before sending
|
|
|
- throwUnexpected();
|
|
|
-#else
|
|
|
// Turn broadcast packet (channel 0), as early as possible, into non-0 channel packets.
|
|
|
// So retries and other communication with Roxie server (which uses non-0 channel numbers) will not cause double work or confusion.
|
|
|
+ // In SUBCHANNELS_IN_HEADER mode this translation has been done on server before sending, except for some control messages like PING or UNLOAD
|
|
|
for (unsigned i = 0; i < numChannels; i++)
|
|
|
{
|
|
|
targetQueue->enqueue(packet->clonePacket(i+1));
|
|
|
}
|
|
|
-#endif
|
|
|
}
|
|
|
}
|
|
|
}
|