소스 검색

Merge branch 'candidate-7.6.x' into candidate-7.8.0

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 5 년 전
부모
커밋
bcdfea3226

+ 40 - 28
roxie/udplib/udpsha.cpp

@@ -97,14 +97,6 @@ queue_t::~queue_t()
     delete [] elements; 
 }
 
-bool queue_t::empty() 
-{
-    c_region.enter();
-    bool res = (active_buffers == 0);
-    c_region.leave();
-    return res;
-}
-
 int queue_t::free_slots() 
 {
     int res=0;
@@ -148,9 +140,10 @@ void queue_t::pushOwn(DataBuffer *buf)
     data_avail.signal();
 }
 
-DataBuffer *queue_t::pop()
+DataBuffer *queue_t::pop(bool block)
 {
-    data_avail.wait();
+    if (!data_avail.wait(block ? INFINITE : 0))
+        return nullptr;
     DataBuffer *ret = NULL; 
     bool must_signal;
     {
@@ -171,30 +164,49 @@ DataBuffer *queue_t::pop()
 }
 
 
-bool queue_t::removeData(const void *key, PKT_CMP_FUN pkCmpFn)
+unsigned queue_t::removeData(const void *key, PKT_CMP_FUN pkCmpFn)
 {
-    bool ret = false;
-    CriticalBlock b(c_region);
-    if (active_buffers) 
+    unsigned removed = 0;
+    unsigned signalFree = 0;
+    unsigned signalFreeSlots = 0;
     {
-        unsigned ix = first;
-        for (;;)
+        CriticalBlock b(c_region);
+        if (active_buffers)
         {
-            if (elements[ix].data && 
-                ((key == NULL) || (pkCmpFn == NULL) || pkCmpFn((const void*) elements[ix].data, key)))
+            unsigned destix = first;
+            unsigned ix = first;
+            for (;;)
             {
-                ::Release(elements[ix].data);
-                elements[ix].data = NULL;  // safer than trying to remove it and close up queue - race conditions with code elsewhere
-                ret = true;
+                if (elements[ix].data && (!key || !pkCmpFn || pkCmpFn((const void*) elements[ix].data, key)))
+                {
+                    ::Release(elements[ix].data);
+                    signalFree++;
+                    active_buffers--;
+                    removed++;
+                }
+                else
+                    elements[destix++] = elements[ix];
+                ix++;
+                if (ix==element_count)
+                    ix = 0;
+                if (destix==element_count)
+                    destix = 0;
+                if (ix == last)
+                    break;
             }
-            ix++;
-            if (ix==element_count)
-                ix = 0;
-            if (ix == last)
-                break;
-        }           
+            if (signalFree && signal_free_sl)
+            {
+                signal_free_sl--;
+                signalFreeSlots++;
+            }
+            last = destix;
+        }
     }
-    return ret;
+    if (signalFree)
+        free_space.signal(signalFree);
+    if (signalFreeSlots)
+        free_sl.signal(signalFreeSlots);
+    return removed;
 }
 
 

+ 2 - 3
roxie/udplib/udpsha.hpp

@@ -75,10 +75,9 @@ class queue_t
 public: 
     void interrupt();
     void pushOwn(roxiemem::DataBuffer *buffer);
-    roxiemem::DataBuffer *pop();
-    bool empty() ;
+    roxiemem::DataBuffer *pop(bool block);
     bool dataQueued(const void *key, PKT_CMP_FUN pkCmpFn);
-    bool removeData(const void *key, PKT_CMP_FUN pkCmpFn);
+    unsigned removeData(const void *key, PKT_CMP_FUN pkCmpFn);
     int  free_slots(); //block if no free slots
     void set_queue_size(unsigned int queue_size); //must be called immediately after constructor if default constructor is used
     queue_t(unsigned int queue_size);

+ 2 - 2
roxie/udplib/udptrr.cpp

@@ -706,7 +706,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
         }
         else if (i >= free)
         {
-            if (udpTraceLevel)
+            if ((i > free) && (udpTraceLevel))
                 DBGLOG("UdpReceiver: ERROR: more packets in flight (%d) than slots free (%d)", i, free);  // Should never happen
             inflight = i = free-1;
         }
@@ -769,7 +769,7 @@ class CReceiveManager : implements IReceiveManager, public CInterface
     {
         while(running) 
         {
-            DataBuffer *dataBuff = input_queue->pop();
+            DataBuffer *dataBuff = input_queue->pop(true);
             collatePacket(dataBuff);
         }
     }

+ 40 - 44
roxie/udplib/udptrs.cpp

@@ -171,16 +171,15 @@ public:
         while (toSend.size() < maxPackets && packetsQueued.load(std::memory_order_relaxed))
         {
             DataBuffer *buffer = popQueuedData();
-            if (buffer) // Aborted slave queries leave NULL records on queue
-            {
-                UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
-                toSend.push_back(buffer);
-                totalSent += header->length;
+            if (!buffer)
+                break;  // Suggests data was aborted before we got to pop it
+            UdpPacketHeader *header = (UdpPacketHeader*) buffer->data;
+            toSend.push_back(buffer);
+            totalSent += header->length;
 #if defined(__linux__) || defined(__APPLE__)
-                if (isLocal && (totalSent> 100000))  // Avoids sending too fast to local node, for reasons lost in the mists of time
-                    break;
+            if (isLocal && (totalSent> 100000))  // Avoids sending too fast to local node, for reasons lost in the mists of time
+                break;
 #endif
-            }
         }
         for (DataBuffer *buffer: toSend)
         {
@@ -229,17 +228,16 @@ public:
     bool removeData(void *key, PKT_CMP_FUN pkCmpFn) 
     {
         // Used after receiving an abort, to avoid sending data that is no longer required
-        bool anyRemoved = false;
+        unsigned removed = 0;
         if (packetsQueued.load(std::memory_order_relaxed))
         {
-            // NOTE - removeData replaces entries by null (so value of packetsQueued is not affected)
             for (unsigned i = 0; i < numQueues; i++)
             {
-                if (output_queue[i].removeData(key, pkCmpFn))
-                    anyRemoved = true;
+                removed += output_queue[i].removeData(key, pkCmpFn);
             }
+            packetsQueued -= removed;
         }
-        return anyRemoved;
+        return removed > 0;
     }
 
     void abort()
@@ -266,49 +264,47 @@ public:
     DataBuffer *popQueuedData() 
     {
         DataBuffer *buffer;
-        while (1) 
+        for (unsigned i = 0; i < numQueues; i++)
         {
-            for (unsigned i = 0; i < numQueues; i++) 
+            if (udpOutQsPriority)
             {
-                if (udpOutQsPriority) 
+                buffer = output_queue[current_q].pop(false);
+                if (!buffer)
                 {
-                    if (output_queue[current_q].empty()) 
+                    if (udpTraceLevel >= 5)
+                        DBGLOG("UdpSender: ---------- Empty Q %d", current_q);
+                    currentQNumPkts = 0;
+                    current_q = (current_q + 1) % numQueues;
+                }
+                else
+                {
+                    currentQNumPkts++;
+                    if (udpTraceLevel >= 5)
+                        DBGLOG("UdpSender: ---------- Packet from Q %d", current_q);
+                    if (currentQNumPkts >= maxPktsPerQ[current_q])
                     {
-                        if (udpTraceLevel >= 5)
-                            DBGLOG("UdpSender: ---------- Empty Q %d", current_q);
                         currentQNumPkts = 0;
                         current_q = (current_q + 1) % numQueues;
                     }
-                    else 
-                    {
-                        buffer = output_queue[current_q].pop();
-                        currentQNumPkts++;
-                        if (udpTraceLevel >= 5) 
-                            DBGLOG("UdpSender: ---------- Packet from Q %d", current_q);
-                        if (currentQNumPkts >= maxPktsPerQ[current_q]) 
-                        {
-                            currentQNumPkts = 0;
-                            current_q = (current_q + 1) % numQueues;
-                        }
-                        packetsQueued--;
-                        return buffer;
-                    }
+                    packetsQueued--;
+                    return buffer;
                 }
-                else 
+            }
+            else
+            {
+                current_q = (current_q + 1) % numQueues;
+                buffer = output_queue[current_q].pop(false);
+                if (buffer)
                 {
-                    current_q = (current_q + 1) % numQueues;
-                    if (!output_queue[current_q].empty()) 
-                    {
-                        packetsQueued--;
-                        return output_queue[current_q].pop();
-                    }
+                    packetsQueued--;
+                    return buffer;
                 }
             }
-            // If we get here, it suggests we were told to get a buffer but no queue has one
-            // Should never happen
-            MilliSleep(10);
-            DBGLOG("UdpSender: ------------- this code should never execute --------------- ");
         }
+        // If we get here, it suggests we were told to get a buffer but no queue has one.
+        // This should be rare but possible if data gets removed following an abort, as
+        // there is a window in abort() between the remove and the decrement of packetsQueued.
+        return nullptr;
     }
 
     UdpReceiverEntry(const IpAddress &_ip, const IpAddress &_sourceIP, unsigned _numQueues, unsigned _queueSize, unsigned _sendFlowPort, unsigned _dataPort)

+ 3 - 0
system/jlib/jsem.cpp

@@ -80,6 +80,9 @@ bool Semaphore::wait(unsigned timeout)
     if (sem_trywait(&sem) == 0)
         return true;
 
+    if (timeout==0)
+        return false;
+
     timespec abs;
     getEndTime(abs, timeout);
     int ret = sem_timedwait(&sem, &abs);

+ 5 - 1
thorlcr/graph/thgraphmaster.cpp

@@ -1837,7 +1837,6 @@ bool CJobMaster::go()
         EXCLOG(e, NULL); 
         jobDoneException.setown(e);
     }
-    fatalHandler->clear();
     queryTempHandler()->clearTemps();
     slaveMsgHandler->stop();
     if (jobDoneException.get())
@@ -1978,6 +1977,11 @@ bool CJobMaster::fireException(IException *e)
     return true;
 }
 
+IFatalHandler *CJobMaster::clearFatalHandler()
+{
+    return fatalHandler.getClear();
+}
+
 // CJobMasterChannel
 
 CJobMasterChannel::CJobMasterChannel(CJobBase &job, IMPServer *mpServer, unsigned channel) : CJobChannel(job, mpServer, channel)

+ 2 - 0
thorlcr/graph/thgraphmaster.ipp

@@ -265,6 +265,8 @@ public:
     __int64 queryNodeDiskUsage(unsigned node);
     void setNodeDiskUsage(unsigned node, __int64 sz);
     bool queryCreatedFile(const char *file);
+
+    virtual IFatalHandler *clearFatalHandler();
 };
 
 class graphmaster_decl CJobMasterChannel : public CJobChannel

+ 4 - 0
thorlcr/master/thgraphmanager.cpp

@@ -925,6 +925,7 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
     addJob(*job);
     bool allDone = false;
     Owned<IException> exception;
+    Owned<IFatalHandler> fatalHdlr;
     try
     {
         struct CounterBlock
@@ -972,9 +973,12 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
         setWuid(nullptr);
         throw exception.getClear();
     }
+    fatalHdlr.setown(job->clearFatalHandler());
     job.clear();
     PROGLOG("Finished wuid=%s, graph=%s", wuid.str(), graphName);
 
+    fatalHdlr->clear();
+
     setWuid(NULL);
     return allDone;
 }