Browse Source

Merge pull request #320 from richardkchapman/assert-single

Fix gh-204 - Roxie fails ASSERT_SINGLE_THREADED test
Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 14 years ago
parent
commit
fe538b5d92
1 changed files with 15 additions and 39 deletions
  1. 15 39
      roxie/udplib/udpmsgpk.cpp

+ 15 - 39
roxie/udplib/udpmsgpk.cpp

@@ -44,31 +44,6 @@ using roxiemem::IRowManager;
 
 bool streamingSupported = false;
 
-#ifdef _DEBUG
-#ifndef __APPLE__
-#define ASSERT_SINGLE_THREADED \
-{   \
-    static SpinLock paranoid;   \
-    SpinBlock pb(paranoid);     \
-    static unsigned lastTID = 0;    \
-    if (GetCurrentThreadId() != lastTID)    \
-    {   \
-        if (lastTID)    \
-        {   \
-            DBGLOG("Unexpected multithreading %u detected at line %u", lastTID, __LINE__);  \
-            assert(false);  \
-        }   \
-        lastTID = GetCurrentThreadId(); \
-    }   \
-}   
-#else
-#define ASSERT_SINGLE_THREADED {}
-#endif
-#else
-#define ASSERT_SINGLE_THREADED {}
-#endif
-
-
 atomic_t unwantedDiscarded;
 atomic_t packetsRetried;
 atomic_t packetsAbandoned;
@@ -484,11 +459,12 @@ PUID GETPUID(DataBuffer *dataBuff)
 
 class CMessageCollator : public CInterface, implements IMessageCollator
 {
-    seq_map_que         que;
+    seq_map_que         queue;
     msg_map             mapping;
     bool                activity;
     bool                memLimitExceeded;
-    CriticalSection     cr;
+    CriticalSection     queueCrit;
+    CriticalSection     mapCrit;
     InterruptableSemaphore sem;
     Linked<IRowManager> rowMgr;
     ruid_t ruid;
@@ -510,10 +486,10 @@ public:
     {
         if (checkTraceLevel(TRACE_MSGPACK, 3))
             DBGLOG("UdpCollator: CMessageCollator::~CMessageCollator ruid="RUIDF", this=%p", ruid, this);
-        while (!que.empty())
+        while (!queue.empty())
         {
-            PackageSequencer *pkSqncr = que.front();
-            que.pop();
+            PackageSequencer *pkSqncr = queue.front();
+            queue.pop();
             pkSqncr->Release();
         }
     }
@@ -530,7 +506,6 @@ public:
 
     virtual bool add_package(DataBuffer *dataBuff) 
     {
-        ASSERT_SINGLE_THREADED;         // This is called by only one thread "CPacketCollator::run()"
         UdpPacketHeader *pktHdr = (UdpPacketHeader*) dataBuff->data;
         if (checkTraceLevel(TRACE_MSGPACK, 4))
         {
@@ -553,6 +528,7 @@ public:
         totalBytesReceived += pktHdr->length;
         PUID puid = GETPUID(dataBuff);
         // MORE - I think we leak a PackageSequencer for messages that we only receive parts of - maybe only an issue for "catchall" case
+        CriticalBlock b(mapCrit);
         PackageSequencer *pkSqncr = mapping.getValue(puid);
         bool isNew = false;
         bool isComplete = false;
@@ -566,14 +542,14 @@ public:
         isComplete = pkSqncr->insert(dataBuff);
         if (streamingSupported ? isNew : isComplete)
         {
-            cr.enter();
+            queueCrit.enter();
             pkSqncr->Link();
-            que.push(pkSqncr);
+            queue.push(pkSqncr);
             sem.signal();
-            cr.leave();
+            queueCrit.leave();
         }
         if (isComplete)
-            mapping.remove(puid); // doesn't need protecting as mapping only accessed on this thread.
+            mapping.remove(puid);
         return(true);
     }
 
@@ -594,10 +570,10 @@ public:
         }
         if (sem.wait(time_out)) 
         {
-            cr.enter();
-            PackageSequencer *pkSqncr = que.front();
-            que.pop();
-            cr.leave();
+            queueCrit.enter();
+            PackageSequencer *pkSqncr = queue.front();
+            queue.pop();
+            queueCrit.leave();
             anyActivity = true;
             activity = false;
             return new CMessageResult(pkSqncr);