浏览代码

Merge pull request #352 from jakesmith/xref

Resolve bottleneck on directory operations

Reviewed-by: Gavin Halliday <ghalliday@hpccsystems.com>
Gavin Halliday 13 年之前
父节点
当前提交
479d29d89b
共有 2 个文件被更改,包括 97 次插入31 次删除
  1. 96 31
      common/remote/sockfile.cpp
  2. 1 0
      system/jlib/jsocket.hpp

+ 96 - 31
common/remote/sockfile.cpp

@@ -161,7 +161,7 @@ struct dummyReadWrite
 // backward compatible modes
 typedef enum { compatIFSHnone, compatIFSHread, compatIFSHwrite, compatIFSHexec, compatIFSHall} compatIFSHmode;
 
-static const char *VERSTRING= "DS V1.7e - 6 "       // dont forget FILESRV_VERSION in header
+static const char *VERSTRING= "DS V1.7e - 7 "       // dont forget FILESRV_VERSION in header
 #ifdef _WIN32
 "Windows ";
 #else
@@ -1418,24 +1418,28 @@ public:
         return 0;
     }
 
-    static bool serialize(MemoryBuffer &mb,IDirectoryIterator *iter, size32_t bufsize)
+    static bool serialize(MemoryBuffer &mb,IDirectoryIterator *iter, size32_t bufsize, bool first)
     {
         bool ret = true;
         byte b=1;
         StringBuffer tmp;
-        ForEach(*iter) {
-            mb.append(b);
-            bool isdir = iter->isDir();
-            __int64 sz = isdir?0:iter->getFileSize();
-            CDateTime dt;
-            iter->getModifiedTime(dt);
-            iter->getName(tmp.clear());
-            mb.append(isdir).append(sz);
-            dt.serialize(mb);
-            mb.append(tmp.str());
-            if (bufsize&&(mb.length()>=bufsize-1)) {
-                ret = false;
-                break;
+        if (first && iter->first() || iter->next()) {
+            loop {
+                mb.append(b);
+                bool isdir = iter->isDir();
+                __int64 sz = isdir?0:iter->getFileSize();
+                CDateTime dt;
+                iter->getModifiedTime(dt);
+                iter->getName(tmp.clear());
+                mb.append(isdir).append(sz);
+                dt.serialize(mb);
+                mb.append(tmp.str());
+                if (bufsize&&(mb.length()>=bufsize-1)) {
+                    ret = false;
+                    break;
+                }
+                if (!iter->next())
+                    break;
             }
         }               
         b = 0;
@@ -1453,7 +1457,7 @@ public:
             byte b = 2;
             mb.append(b).append((unsigned)flags.length()).append(flags);
         }
-        serialize(mb,iter,0);
+        serialize(mb,iter,0,true);
     }
 
     void serialize(MemoryBuffer &mb,bool isdiff)
@@ -1463,12 +1467,60 @@ public:
             b = 2;
             mb.append(b).append(numflags).append(numflags,flags);
         }
-        serialize(mb,this,0);
+        serialize(mb,this,0,true);
     }
 
 };
 
-CriticalSection CRemoteDirectoryIterator::crit;
+class CCritTable;
+class CEndpointCS : public CriticalSection, public CInterface
+{
+    CCritTable &table;
+    const SocketEndpoint &ep;
+public:
+    CEndpointCS(CCritTable &_table, const SocketEndpoint &_ep) : table(_table), ep(_ep) { }
+    const void *queryFindParam() const { return &ep; }
+
+    virtual void beforeDispose();
+};
+
+class CCritTable : private SimpleHashTableOf<CEndpointCS, const SocketEndpoint>
+{
+    typedef SimpleHashTableOf<CEndpointCS, const SocketEndpoint> PARENT;
+    CriticalSection crit;
+public:
+    CEndpointCS *getCrit(const SocketEndpoint &ep)
+    {
+        CriticalBlock b(crit);
+        Linked<CEndpointCS> clientCrit = find(ep);
+        if (!clientCrit || !clientCrit->isAlive()) // if !isAlive(), then it is in the process of being destroyed/removed.
+        {
+            clientCrit.setown(new CEndpointCS(*this, ep));
+            replace(*clientCrit); // NB table doesn't own
+        }
+        return clientCrit.getClear();
+    }
+    void removeExact(CEndpointCS *clientCrit)
+    {
+        CriticalBlock b(crit);
+        PARENT::removeExact(clientCrit); // NB may not exist, could have been replaced if detected !isAlive() in getCrit()
+    }
+} *dirCSTable;
+
+MODULE_INIT(INIT_PRIORITY_STANDARD)
+{
+    dirCSTable = new CCritTable;
+    return true;
+}
+MODULE_EXIT()
+{
+    delete dirCSTable;
+}
+
+void CEndpointCS::beforeDispose()
+{
+    table.removeExact(this);
+}
 
 
 class CRemoteFile : public CRemoteBase, implements IFile
@@ -1710,10 +1762,14 @@ public:
         MemoryBuffer replyBuffer;
         bool includedirs = true;
         bool sub=false;
-        sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(dir).append(tail).append(includedirs).append(sub);
-        sendRemoteCommand(sendBuffer, replyBuffer);
+        {
+            //Could be removed with new dafilesrv change [ (stream != 0) ], since this is not streaming.
+            Owned<CEndpointCS> crit = dirCSTable->getCrit(ep); // NB dirCSTable doesn't own, last reference will remove from table
+            CriticalBlock block(*crit);
+            sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(dir).append(tail).append(includedirs).append(sub);
+            sendRemoteCommand(sendBuffer, replyBuffer);
+        }
         // now should be 0 or 1 files returned
-        CriticalBlock block(CRemoteDirectoryIterator::crit);
         Owned<CRemoteDirectoryIterator> iter = new CRemoteDirectoryIterator(ep, dir.str());
         iter->appendBuf(replyBuffer);
         if (!iter->first())
@@ -1748,9 +1804,12 @@ public:
     {
         if (mask&&!*mask)
             return createDirectoryIterator("",""); // NULL iterator
-        CriticalBlock block(CRemoteDirectoryIterator::crit);
+
         CRemoteDirectoryIterator *ret = new CRemoteDirectoryIterator(ep, filename);
         byte stream=1;
+
+        Owned<CEndpointCS> crit = dirCSTable->getCrit(ep); // NB dirCSTable doesn't own, last reference will remove from table
+        CriticalBlock block(*crit);
         loop {
             MemoryBuffer sendBuffer;
             initSendBuffer(sendBuffer);
@@ -1774,7 +1833,6 @@ public:
                                   Semaphore *abortsem=NULL) // returns NULL if timed out
     {
         // abortsem not yet supported
-        CriticalBlock block(CRemoteDirectoryIterator::crit);
         MemoryBuffer sendBuffer;
         initSendBuffer(sendBuffer);
         MemoryBuffer replyBuffer;
@@ -1785,7 +1843,7 @@ public:
         byte isprev=(prev!=NULL)?1:0;
         sendBuffer.append(isprev);
         if (prev) 
-            CRemoteDirectoryIterator::serialize(sendBuffer,prev,0);
+            CRemoteDirectoryIterator::serialize(sendBuffer,prev,0,true);
         sendRemoteCommand(sendBuffer, replyBuffer);
         byte status;
         replyBuffer.read(status);
@@ -1800,7 +1858,6 @@ public:
     bool getInfo(bool &isdir,offset_t &size,CDateTime &modtime)
     {
         // do this by using dir call (could be improved with new function but this not *too* bad)
-        CriticalBlock block(CRemoteDirectoryIterator::crit);
         StringBuffer dir;
         const char *tail = splitDirTail(filename,dir);
         if (!dir.length())
@@ -1810,8 +1867,13 @@ public:
         MemoryBuffer replyBuffer;
         bool includedirs = true;
         bool sub=false;
-        sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(dir).append(tail).append(includedirs).append(sub);
-        sendRemoteCommand(sendBuffer, replyBuffer);
+        {
+            //Could be removed with new dafilesrv change [ (stream != 0) ], since this is not streaming.
+            Owned<CEndpointCS> crit = dirCSTable->getCrit(ep); // NB dirCSTable doesn't own, last reference will remove from table
+            CriticalBlock block(*crit);
+            sendBuffer.append((RemoteFileCommandType)RFCgetdir).append(dir).append(tail).append(includedirs).append(sub);
+            sendRemoteCommand(sendBuffer, replyBuffer);
+        }
         // now should be 0 or 1 files returned
         Owned<CRemoteDirectoryIterator> iter = new CRemoteDirectoryIterator(ep, dir.str());
         iter->appendBuf(replyBuffer);
@@ -3821,7 +3883,7 @@ public:
         bool sub;
         byte stream = 0;
         msg.read(name).read(mask).read(includedir).read(sub);
-        if (msg.remaining()>sizeof(byte)) {
+        if (msg.remaining()>=sizeof(byte)) {
             msg.read(stream);
             if (stream==1)
                 client.opendir.clear();
@@ -3837,15 +3899,18 @@ public:
                 iter.set(client.opendir);
             else {
                 iter.setown(dir->directoryFiles(mask.length()?mask.get():NULL,sub,includedir));
-                client.opendir.set(iter);
+                if (stream != 0)
+                    client.opendir.set(iter);
             }
             if (!iter) {
                 reply.append((unsigned)RFSERR_GetDirFailed);
                 return false;
             }
             reply.append((unsigned)RFEnoerror);
-            if (CRemoteDirectoryIterator::serialize(reply,iter,stream?0x100000:0)) 
-                client.opendir.clear();
+            if (CRemoteDirectoryIterator::serialize(reply,iter,stream?0x100000:0,stream<2)) {
+                if (stream != 0)
+                    client.opendir.clear();
+            }
             else {
                 bool cont=true;
                 reply.append(cont);

+ 1 - 0
system/jlib/jsocket.hpp

@@ -156,6 +156,7 @@ public:
         ipset(other);
         port = other.port;
     }
+	bool operator == (const SocketEndpoint &other) const { return equals(other); }
 
     unsigned hash(unsigned prev) const;