Forráskód Böngészése

Merge pull request #7574 from jakesmith/hpcc-13545

HPCC-13545 Expose lock info to clients.

Reviewed-By: Kevin Wang <kevin.wang@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 éve
szülő
commit
9c2316c867

+ 26 - 7
dali/base/dacsds.cpp

@@ -31,6 +31,7 @@
 #include "dasess.hpp"
 #include "daclient.hpp"
 #include "dadfs.hpp"
+#include "dautils.hpp"
 
 #include "dasds.ipp" // common header for client/server sds
 #include "dacsds.ipp"
@@ -1862,14 +1863,12 @@ StringBuffer &CClientSDSManager::getInfo(SdsDiagCommand cmd, StringBuffer &out)
                 case DIAG_CMD_STATS:
                     formatUsageStats(mb, out);
                     break;
-                case DIAG_CMD_LOCKINFO:
-                    size32_t sz;
-                    mb.read(sz);
-                    out.append(sz, (const char *)mb.readDirect(sz));
-                    break;
                 case DIAG_CMD_CONNECTIONS:
                     formatConnections(mb, out);
                     break;
+                case DIAG_CMD_SUBSCRIBERS:
+                    formatSubscribers(mb, out);
+                    break;
             }
             break;
         }
@@ -1882,9 +1881,29 @@ StringBuffer &CClientSDSManager::getInfo(SdsDiagCommand cmd, StringBuffer &out)
     return out;
 }
 
-StringBuffer &CClientSDSManager::getLocks(StringBuffer &out)
+ILockInfoCollection *CClientSDSManager::getLocks(const char *ipPattern, const char *xpathPattern)
 {
-    return getInfo(DIAG_CMD_LOCKINFO, out);
+    CMessageBuffer msg;
+    msg.append((int)DAMP_SDSCMD_DIAGNOSTIC);
+    msg.append((int)DIAG_CMD_LOCKINFO);
+    msg.append(ipPattern?ipPattern:"");
+    msg.append(xpathPattern?xpathPattern:"");
+
+    if (!queryCoven().sendRecv(msg, RANK_RANDOM, MPTAG_DALI_SDS_REQUEST))
+        throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "getLocks");
+
+    SdsReply replyMsg;
+    msg.read((int &)replyMsg);
+    switch (replyMsg)
+    {
+        case DAMP_SDSREPLY_OK:
+            break;
+        case DAMP_SDSREPLY_ERROR:
+            throwMbException("SDS Reply Error ", msg);
+        default:
+            assertex(false);
+    }
+    return deserializeLockInfoCollection(msg);
 }
 
 StringBuffer &CClientSDSManager::getUsageStats(StringBuffer &out)

+ 1 - 1
dali/base/dacsds.ipp

@@ -405,7 +405,7 @@ public:
     virtual SubscriptionId subscribeExact(const char *xpath, ISDSNodeSubscription &notify, bool sendValue=false);
     virtual void unsubscribe(SubscriptionId id);
     virtual void unsubscribeExact(SubscriptionId id);
-    virtual StringBuffer &getLocks(StringBuffer &out);
+    virtual ILockInfoCollection *getLocks(const char *ipPattern, const char *xpathPattern);
     virtual StringBuffer &getUsageStats(StringBuffer &out);
     virtual StringBuffer &getConnections(StringBuffer &out);
     virtual StringBuffer &getSubscribers(StringBuffer &out);

+ 7 - 5
dali/base/dadiags.cpp

@@ -27,6 +27,7 @@
 #include "daserver.hpp"
 #include "dasds.hpp"
 #include "dasubs.ipp"
+#include "dautils.hpp"
 #include "dadiags.hpp"
 
 #ifdef _MSC_VER
@@ -121,16 +122,17 @@ public:
                 else if (0 == stricmp(id, "mpqueue")) {
                     mb.append(getReceiveQueueDetails(buf).str());
                 }
-                else if (0 == stricmp(id, "locks")) {
-                    mb.append(querySDS().getLocks(buf).str());
+                else if (0 == stricmp(id, "locks")) { // Legacy - newer diag clients should use querySDS().getLocks() directly
+                    Owned<ILockInfoCollection> lockInfoCollection = querySDS().getLocks();
+                    mb.append(lockInfoCollection->toString(buf).str());
                 }
-                else if (0 == stricmp(id, "sdsstats")) {
+                else if (0 == stricmp(id, "sdsstats")) { // Legacy - newer diag clients should use querySDS().getUsageStats() directly
                     mb.append(querySDS().getUsageStats(buf).str());
                 }
-                else if (0 == stricmp(id, "connections")) {
+                else if (0 == stricmp(id, "connections")) { // Legacy - newer diag clients should use querySDS().getConnections() directly
                     mb.append(querySDS().getConnections(buf).str());
                 }
-                else if (0 == stricmp(id, "sdssubscribers")) {
+                else if (0 == stricmp(id, "sdssubscribers")) { // Legacy - newer diag clients should use querySDS().getSubscribers() directly
                     mb.append(querySDS().getSubscribers(buf).str());
                 }
                 else if (0 == stricmp(id, "clients")) {

+ 0 - 1
dali/base/dadiags.hpp

@@ -25,7 +25,6 @@
 extern da_decl StringBuffer & getDaliDiagnosticValue(const char *name,StringBuffer &ret);
 extern da_decl MemoryBuffer & getDaliDiagnosticValue(MemoryBuffer &m);
 
-
 // for server use
 interface IDaliServer;
 extern da_decl IDaliServer *createDaliDiagnosticsServer(); // called for coven members

+ 135 - 175
dali/base/dasds.cpp

@@ -20,6 +20,7 @@
 #include "jhash.hpp"
 #include "jlib.hpp"
 #include "jfile.hpp"
+#include "jregexp.hpp"
 #include "jthread.hpp"
 #include "javahash.hpp"
 #include "javahash.tpp"
@@ -896,13 +897,6 @@ struct DebugInfo
 };
 #endif
 
-struct LockData
-{
-    unsigned mode;
-    SessionId sessId;
-    unsigned timeLockObtained;
-};
-
 class BoolSetBlock
 {
 public:
@@ -1871,8 +1865,8 @@ private:
 
 //////////
 
-class CLockInfo;
-typedef ThreadSafeOwningSimpleHashTableOf<CLockInfo, __int64> CLockInfoTable;
+class CLock;
+typedef ThreadSafeOwningSimpleHashTableOf<CLock, __int64> CLockTable;
 
 
 //////////
@@ -1939,7 +1933,7 @@ public:
     void changeLockMode(CServerConnection &connection, unsigned newMode, unsigned timeout);
     void clearSDSLocks();
     void lock(CServerRemoteTree &tree, const char *xpath, ConnectionId connectionId, SessionId sessionId, unsigned mode, unsigned timeout, IUnlockCallback &callback);
-    CLockInfo *queryLockInfo(__int64 id) { return lockTable.find(&id); }
+    CLock *queryLock(__int64 id) { return lockTable.find(&id); }
     CSubscriberTable &querySubscriberTable() { return subscribers; }
     IExternalHandler *queryExternalHandler(const char *handler) { if (!handler) return NULL; CExternalHandlerMapping *mapping = externalHandlers.find(handler); return mapping ? &mapping->query() : NULL; }
     void handleNotify(CSubscriberContainerBase &subscriber, MemoryBuffer &notifyData);
@@ -1997,7 +1991,7 @@ public:
     virtual SubscriptionId subscribeExact(const char *xpath, ISDSNodeSubscription &notify, bool sendValue=false);
     virtual void unsubscribe(SubscriptionId id);
     virtual void unsubscribeExact(SubscriptionId id);
-    virtual StringBuffer &getLocks(StringBuffer &out);
+    virtual ILockInfoCollection *getLocks(const char *ipPattern, const char *xpathPattern);
     virtual StringBuffer &getUsageStats(StringBuffer &out);
     virtual StringBuffer &getConnections(StringBuffer &out);
     virtual StringBuffer &getSubscribers(StringBuffer &out);
@@ -2052,7 +2046,7 @@ public: // data
     Owned<IPropertyTree> properties;
 private:
     void validateBackup();
-    LockStatus establishLock(CLockInfo &lockInfo, __int64 treeId, ConnectionId connectionId, SessionId sessionId, unsigned mode, unsigned timeout, IUnlockCallback &lockCallback);
+    LockStatus establishLock(CLock &lock, __int64 treeId, ConnectionId connectionId, SessionId sessionId, unsigned mode, unsigned timeout, IUnlockCallback &lockCallback);
     void _getChildren(CRemoteTreeBase &parent, CServerRemoteTree &serverParent, CRemoteConnection &connection, unsigned levels);
     void matchServerTree(CClientRemoteTree *local, IPropertyTree &matchTree, bool allTail);
 
@@ -2066,7 +2060,7 @@ private:
     Owned<ICoalesce> coalesce;
     unsigned __int64 nextExternal;
     unsigned externalSizeThreshold;
-    CLockInfoTable lockTable;
+    CLockTable lockTable;
     CNotifyHandlerTable nodeNotifyHandlers;
     Owned<IThreadPool> scanNotifyPool, notifyPool;
     CExternalHandlerTable externalHandlers;
@@ -3165,24 +3159,24 @@ PDState CServerRemoteTree::processData(IPropertyTree &changeTree, Owned<CBranchC
 
 class CPendingLockBlock
 {
-    CLockInfo &lockInfo;
+    CLock &lock;
 public:
-    CPendingLockBlock(CLockInfo &_lockInfo);
+    CPendingLockBlock(CLock &_lock);
     ~CPendingLockBlock();
 };
 
 typedef Int64Array IdPath;
-typedef MapBetween<ConnectionId, ConnectionId, LockData, LockData> ConnectionInfoMap;
 #define LOCKSESSCHECK (1000*60*5)
-class CLockInfo : public CInterface, implements IInterface
+
+class CLock : public CInterface, implements IInterface
 {
     DECL_NAMEDCOUNT;
 
-    CLockInfoTable &table;
+    CLockTable &table;
     unsigned sub, readLocks, holdLocks, pending, waiting;
     IdPath idPath;
     ConnectionInfoMap connectionInfo;
-    CheckedCriticalSection crit;
+    mutable CheckedCriticalSection crit;
     Semaphore sem;
     StringAttr xpath;
     __int64 treeId;
@@ -3290,7 +3284,7 @@ class CLockInfo : public CInterface, implements IInterface
                         {
                             waiting--;
                             StringBuffer s("Infinite timeout lock still waiting: ");
-                            getLockInfo(s);
+                            toString(s);
                             PROGLOG("%s", s.str());
                         }
                         {
@@ -3454,10 +3448,7 @@ class CLockInfo : public CInterface, implements IInterface
         {
             if (RTM_LOCK_SUB & mode)
                 sub++;
-            LockData ld;
-            ld.mode = mode;
-            ld.sessId = sessId;
-            ld.timeLockObtained = msTick();
+            LockData ld(mode, sessId, msTick());
             connectionInfo.setValue(id, ld);
         }
         return LockSucceeded;
@@ -3475,7 +3466,7 @@ class CLockInfo : public CInterface, implements IInterface
 public:
     IMPLEMENT_IINTERFACE;
 
-    CLockInfo(CLockInfoTable &_table, __int64 _treeId, IdPath &_idPath, const char *_xpath, unsigned mode, ConnectionId id, SessionId sessId)
+    CLock(CLockTable &_table, __int64 _treeId, IdPath &_idPath, const char *_xpath, unsigned mode, ConnectionId id, SessionId sessId)
         : table(_table), treeId(_treeId), xpath(_xpath), exclusive(false), sub(0), readLocks(0), holdLocks(0), waiting(0), pending(0)
     {
         INIT_NAMEDCOUNT;
@@ -3484,7 +3475,7 @@ public:
             idPath.append(_idPath.item(i));
     }
 
-    ~CLockInfo()
+    ~CLock()
     {
         if (parent)
             clearLastRef();
@@ -3585,7 +3576,7 @@ public:
 
     inline void removePending()
     {
-        Linked<CLockInfo> destroy;
+        Linked<CLock> destroy;
         {
             CHECKEDCRITICALBLOCK(crit, fakeCritTimeout);
             pending--;
@@ -3621,9 +3612,9 @@ public:
             case LockFailed:
                 throw MakeSDSException(SDSExcpt_ConnectionAbsent, "Lost connection performing changeMode on connection to : %s", xpath.get());
             case LockTimedOut:
-                throw MakeSDSException(SDSExcpt_LockTimeout, "Lock timeout performing changeMode on connection to : %s, existing lock info: %s", xpath.get(), getLockInfo(s).str());
+                throw MakeSDSException(SDSExcpt_LockTimeout, "Lock timeout performing changeMode on connection to : %s, existing lock info: %s", xpath.get(), toString(s).str());
             case LockHeld:
-                throw MakeSDSException(SDSExcpt_LockHeld, "Lock is held performing changeMode on connection to : %s, existing lock info: %s", xpath.get(), getLockInfo(s).str());
+                throw MakeSDSException(SDSExcpt_LockHeld, "Lock is held performing changeMode on connection to : %s, existing lock info: %s", xpath.get(), toString(s).str());
             }
         }
     }
@@ -3640,58 +3631,12 @@ public:
         return NULL!=connectionInfo.getValue(connectionId);
     }
 
-    const char *queryXPath() const
-    {
-        return xpath;
-    }
+    const char *queryXPath() const { return xpath; }
 
-    StringBuffer &getLockInfo(StringBuffer &out)
+    ILockInfo *getLockInfo() const
     {
-        unsigned nlocks=0;
-        MemoryBuffer locks;
-        UInt64Array keys;
-        {
-            CHECKEDCRITICALBLOCK(crit, fakeCritTimeout);
-            HashIterator iter(connectionInfo);
-            ForEach(iter)
-            {
-                IMapping &imap = iter.query();
-                LockData *lD = connectionInfo.mapToValue(&imap);
-                keys.append(* ((ConnectionId *) imap.getKey()));
-                locks.append(sizeof(LockData), lD);
-                ++nlocks;
-            }
-        }
-
-        unsigned msNow = msTick();
-        out.append("Locks on path: /").append(xpath).newline();
-        out.append("Endpoint            |SessionId       |ConnectionId    |mode    |time(duration)]").newline().newline();
-        unsigned l = 0;
-        if (nlocks)
-        {
-            loop
-            {
-                LockData lD;
-                memcpy(&lD, ((const byte *)locks.toByteArray())+l*sizeof(LockData), sizeof(LockData));
-                ConnectionId connId = keys.item(l);
-
-                StringBuffer sessEpStr;
-                unsigned lockedFor = msNow-lD.timeLockObtained;
-                CDateTime time;
-                time.setNow();
-                time_t tt = time.getSimple() - (lockedFor/1000);
-                time.set(tt);
-                StringBuffer timeStr;
-                time.getString(timeStr);
-                out.appendf("%-20s|%-16" I64F "x|%-16" I64F "x|%-8x|%s(%d ms)", querySessionManager().getClientProcessEndpoint(lD.sessId, sessEpStr).str(), lD.sessId, connId, lD.mode, timeStr.str(), lockedFor);
-                ++l;
-                if (l>=nlocks)
-                    break;
-                out.newline();
-            }
-        }
-        out.newline();
-        return out;
+        CHECKEDCRITICALBLOCK(crit, fakeCritTimeout);
+        return createLockInfo(xpath, connectionInfo); // NB: doesn't resolve sessionId to Endpoint string at this point
     }
 
     void setDROLR(CServerRemoteTree *_parent, CServerRemoteTree *_child)
@@ -3706,25 +3651,26 @@ public:
         parent.set(_parent);
         child.set(_child);
     }
+    StringBuffer &toString(StringBuffer &out) const
+    {
+        Owned<ILockInfo> lockInfo = getLockInfo();
+        return lockInfo->toString(out, 0, true);
+    }
 };
 
-CPendingLockBlock::CPendingLockBlock(CLockInfo &_lockInfo) : lockInfo(_lockInfo)
+CPendingLockBlock::CPendingLockBlock(CLock &_lock) : lock(_lock)
 {
-    lockInfo.addPending();
+    lock.addPending();
 }
 
 CPendingLockBlock::~CPendingLockBlock()
 {
-    lockInfo.removePending();
+    lock.removePending();
 }
 
-typedef ICopyArrayOf<CLockInfo> CLockInfoArray;
-
-///////////
-
-template <> void CLockInfoTable::onRemove(void *et)
+template <> void CLockTable::onRemove(void *et)
 {
-    ((CLockInfo*)et)->Release();
+    ((CLock*)et)->Release();
 }
 
 
@@ -3820,12 +3766,11 @@ int CSDSTransactionServer::run()
                             {
                                 case DIAG_CMD_LOCKINFO:
                                 {
-                                    StringBuffer out;
-                                    SDSManager->getLocks(out);
+                                    StringAttr ipPattern, xpathPattern;
+                                    mb.read(ipPattern).read(xpathPattern);
                                     mb.clear().append(DAMP_SDSREPLY_OK);
-                                    mb.append(out.length());
-                                    mb.append(out.length(), out.str());
-
+                                    Owned<ILockInfoCollection> lockInfoCollection = SDSManager->getLocks(ipPattern, xpathPattern);
+                                    lockInfoCollection->serialize(mb);
                                     break;
                                 }
                                 case DIAG_CMD_STATS:
@@ -4675,7 +4620,7 @@ IPropertyTree *CServerConnection::queryRoot()
 }
 ////////////////
 
-void CLockInfo::clearLastRef()
+void CLock::clearLastRef()
 {
     if (parent)
     {
@@ -7332,8 +7277,8 @@ CServerConnection *CCovenSDSManager::createConnectionInstance(CRemoteTreeBase *r
 void CCovenSDSManager::clearSDSLocks()
 {
     CHECKEDCRITICALBLOCK(lockCrit, fakeCritTimeout);
-    SuperHashIteratorOf<CLockInfo> iter(lockTable.queryBaseTable());
-    ICopyArrayOf<CLockInfo> locks;
+    SuperHashIteratorOf<CLock> iter(lockTable.queryBaseTable());
+    ICopyArrayOf<CLock> locks;
     ForEach(iter)
         locks.append(iter.query());
     ForEachItemIn(l, locks)
@@ -7350,10 +7295,10 @@ void CCovenSDSManager::changeLockMode(CServerConnection &connection, unsigned ne
     CUnlockCallback callback(connection.queryXPath(), connectionId, *tree);
     {
         CHECKEDCRITICALBLOCK(lockCrit, fakeCritTimeout);
-        CLockInfo *lockInfo = queryLockInfo(treeId);
-        if (lockInfo)
+        CLock *lock = queryLock(treeId);
+        if (lock)
         {
-            lockInfo->changeMode(connectionId, connection.querySessionId(), newMode, timeout, callback);
+            lock->changeMode(connectionId, connection.querySessionId(), newMode, timeout, callback);
             connection.setMode(newMode);
             return;
         }
@@ -7382,9 +7327,9 @@ bool CCovenSDSManager::unlock(__int64 connectionId, bool close, StringBuffer &co
         PROGLOG("forcing unlock for connection : %s", connectionInfo.str());
         __int64 nodeId = ((CRemoteTreeBase *)connection->queryRoot())->queryServerId();
         CHECKEDCRITICALBLOCK(lockCrit, fakeCritTimeout);
-        CLockInfo *lockInfo = queryLockInfo(nodeId);
-        if (lockInfo)
-            lockInfo->unlock(connectionId);
+        CLock *lock = queryLock(nodeId);
+        if (lock)
+            lock->unlock(connectionId);
     }
     return true;
 }
@@ -7392,65 +7337,65 @@ bool CCovenSDSManager::unlock(__int64 connectionId, bool close, StringBuffer &co
 bool CCovenSDSManager::unlock(__int64 treeId, ConnectionId connectionId, bool delayDelete)
 {
     CHECKEDCRITICALBLOCK(lockCrit, fakeCritTimeout);
-    CLockInfo *lockInfo = queryLockInfo(treeId);
-    if (lockInfo)
-        return lockInfo->unlock(connectionId, delayDelete);
+    CLock *lock = queryLock(treeId);
+    if (lock)
+        return lock->unlock(connectionId, delayDelete);
     return false;
 }
 
 void CCovenSDSManager::unlockAll(__int64 treeId)
 {
     CHECKEDCRITICALBLOCK(lockCrit, fakeCritTimeout);
-    CLockInfo *lockInfo = queryLockInfo(treeId);
-    if (lockInfo)
-        lockInfo->unlockAll();
+    CLock *lock = queryLock(treeId);
+    if (lock)
+        lock->unlockAll();
 }
 
-LockStatus CCovenSDSManager::establishLock(CLockInfo &lockInfo, __int64 treeId, ConnectionId connectionId, SessionId sessionId, unsigned mode, unsigned timeout, IUnlockCallback &lockCallback)
+LockStatus CCovenSDSManager::establishLock(CLock &lock, __int64 treeId, ConnectionId connectionId, SessionId sessionId, unsigned mode, unsigned timeout, IUnlockCallback &lockCallback)
 {
-    LockStatus res = lockInfo.lock(mode, timeout, connectionId, sessionId, lockCallback);
+    LockStatus res = lock.lock(mode, timeout, connectionId, sessionId, lockCallback);
     if (res == LockSucceeded && server.queryStopped())
     {
-        lockInfo.unlock(connectionId);
+        lock.unlock(connectionId);
         throw MakeSDSException(SDSExcpt_ServerStoppedLockAborted);
     }
     return res;
 }
 
-void CCovenSDSManager::lock(CServerRemoteTree &tree, const char *__xpath, ConnectionId connectionId, SessionId sessionId, unsigned mode, unsigned timeout, IUnlockCallback &callback)
+void CCovenSDSManager::lock(CServerRemoteTree &tree, const char *xpath, ConnectionId connectionId, SessionId sessionId, unsigned mode, unsigned timeout, IUnlockCallback &callback)
 {
-    if (0 == ((RTM_LOCK_READ | RTM_LOCK_WRITE) & mode)) // no point in creating lockInfo.
+    if (0 == ((RTM_LOCK_READ | RTM_LOCK_WRITE) & mode)) // no point in creating lock.
         return;
-    CLockInfo *lockInfo = NULL;
-    StringAttr sxpath;
-    char *_xpath = (char *) (('/' == *__xpath) ? __xpath+1 : __xpath);
-    char *xpath;
-    if ('/' == _xpath[strlen(_xpath)-1])
-        xpath = (char *)_xpath;
-    else
+    CLock *lock = NULL;
+    StringBuffer sxpath;
+    if ('/' != *xpath)
     {
-        unsigned l = strlen(_xpath);
-        xpath = (char *)malloc(l+2);
-        memcpy(xpath, _xpath, l);
-        xpath[l] = '/';
-        xpath[l+1] = '\0';
-        sxpath.setown(xpath);
+        sxpath.append('/').append(xpath);
+        if ('/' != sxpath.charAt(sxpath.length()-1))
+            sxpath.append('/');
+        xpath = sxpath.str();
+    }
+    else if ('/' != xpath[strlen(xpath)-1])
+    {
+        sxpath.append(xpath);
+        sxpath.append('/');
+        xpath = sxpath.str();
     }
 
     __int64 treeId = tree.queryServerId();
     CHECKEDCRITICALBLOCK(lockCrit, fakeCritTimeout);
-    lockInfo = lockTable.find(&treeId);
+    lock = lockTable.find(&treeId);
     
-    if (!lockInfo)
+    if (!lock)
     {
         IdPath idPath;
-        lockInfo = new CLockInfo(lockTable, treeId, idPath, xpath, mode, connectionId, sessionId);
-        lockTable.replace(*lockInfo);
+        lock = new CLock(lockTable, treeId, idPath, xpath, mode, connectionId, sessionId);
+        lockTable.replace(*lock);
     }
     else
     {
-        Linked<CLockInfo> tmp = lockInfo; // keep it alive could be destroyed whilst blocked in call below.
-        LockStatus result = establishLock(*lockInfo, treeId, connectionId, sessionId, mode, timeout, callback);
+        Linked<CLock> tmp = lock; // keep it alive could be destroyed whilst blocked in call below.
+        LockStatus result = establishLock(*lock, treeId, connectionId, sessionId, mode, timeout, callback);
         if (result != LockSucceeded)
         {
             if (!queryConnection(connectionId)) return; // connection aborted.
@@ -7460,9 +7405,9 @@ void CCovenSDSManager::lock(CServerRemoteTree &tree, const char *__xpath, Connec
             case LockFailed:
                 throw MakeSDSException(SDSExcpt_ConnectionAbsent, "Lost connection trying to establish lock on connection to : %s", xpath);
             case LockTimedOut:
-                throw MakeSDSException(SDSExcpt_LockTimeout, "Lock timeout trying to establish lock to %s, existing lock info: %s", xpath, lockInfo->getLockInfo(s).str());
+                throw MakeSDSException(SDSExcpt_LockTimeout, "Lock timeout trying to establish lock to %s, existing lock info: %s", xpath, lock->toString(s).str());
             case LockHeld:
-                throw MakeSDSException(SDSExcpt_LockHeld, "Lock is held trying to establish lock to %s, existing lock info: %s", xpath, lockInfo->getLockInfo(s).str());
+                throw MakeSDSException(SDSExcpt_LockHeld, "Lock is held trying to establish lock to %s, existing lock info: %s", xpath, lock->toString(s).str());
             }
         }
     }
@@ -7582,24 +7527,24 @@ void CCovenSDSManager::createConnection(SessionId sessionId, unsigned mode, unsi
                                     {
                                         if (tm.timedout(&remaining))
                                         {
-                                            Linked<CLockInfo> lockInfo;
+                                            Linked<CLock> lock;
                                             VStringBuffer timeoutMsg("Failed to establish lock to %s, timeout whilst retrying connection to orphaned connection path", xpath);
                                             ForEachItemIn(f, freeExistingLocks.existingLockTrees)
                                             {
                                                 CServerRemoteTree &e = freeExistingLocks.existingLockTrees.item(f);
                                                 {
                                                     CHECKEDCRITICALBLOCK(lockCrit, fakeCritTimeout);
-                                                    CLockInfo *_lockInfo = queryLockInfo(e.queryServerId());
-                                                    if (_lockInfo)
+                                                    CLock *_lock = queryLock(e.queryServerId());
+                                                    if (_lock)
                                                     {
-                                                        if (!lockInfo)
+                                                        if (!lock)
                                                             timeoutMsg.append(", existing lock info: ");
                                                         timeoutMsg.newline();
-                                                        lockInfo.set(_lockInfo);
+                                                        lock.set(_lock);
                                                     }
                                                 }
-                                                if (lockInfo)
-                                                    lockInfo->getLockInfo(timeoutMsg);
+                                                if (lock)
+                                                    lock->toString(timeoutMsg);
                                             }
                                             throw MakeSDSException(SDSExcpt_LockTimeout, "%s", timeoutMsg.str());
                                         }
@@ -7785,11 +7730,11 @@ void CCovenSDSManager::disconnect(ConnectionId id, bool deleteRoot, Owned<CLCLoc
         if (deleteRoot || RTM_MODE(connection->queryMode(), RTM_DELETE_ON_DISCONNECT))
         {
             CHECKEDCRITICALBLOCK(lockCrit, fakeCritTimeout);
-            CLockInfo *lockInfo = queryLockInfo(tree->queryServerId());
-            if (lockInfo)
+            CLock *lock = queryLock(tree->queryServerId());
+            if (lock)
             {
                 deleteRoot = false;
-                lockInfo->setDROLR((CServerRemoteTree *)connection->queryParent(), tree);
+                lock->setDROLR((CServerRemoteTree *)connection->queryParent(), tree);
             }
             else
                 deleteRoot = true;
@@ -7849,23 +7794,6 @@ void CCovenSDSManager::disconnect(ConnectionId id, bool deleteRoot, Owned<CLCLoc
     connection->unsubscribeSession();
 }
 
-StringBuffer &CCovenSDSManager::getLocks(StringBuffer &out)
-{
-    CHECKEDCRITICALBLOCK(lockCrit, fakeCritTimeout);
-    SuperHashIteratorOf<CLockInfo> iter(lockTable.queryBaseTable());    
-    iter.first();
-    while (iter.isValid())
-    {
-        CLockInfo &lockInfo = iter.query();
-        if (lockInfo.lockCount())
-            lockInfo.getLockInfo(out);
-        if (!iter.next())
-            break;
-        if (out.length()) out.newline();
-    }
-    return out.length() ? out : out.append("No current locks");
-}
-
 StringBuffer &formatUsageStats(MemoryBuffer &src, StringBuffer &out)
 {
     unsigned c;
@@ -7969,14 +7897,47 @@ unsigned CCovenSDSManager::countActiveLocks()
 {
     unsigned activeLocks = 0;
     CHECKEDCRITICALBLOCK(lockCrit, fakeCritTimeout);
-    SuperHashIteratorOf<CLockInfo> iter(lockTable.queryBaseTable());    
+    SuperHashIteratorOf<CLock> iter(lockTable.queryBaseTable());
     ForEach(iter) {
-        CLockInfo &lockInfo = iter.query();
-        if (lockInfo.lockCount()) activeLocks++;
+        CLock &lock = iter.query();
+        if (lock.lockCount()) activeLocks++;
     }
     return activeLocks;
 }
 
+ILockInfoCollection *CCovenSDSManager::getLocks(const char *ipPattern, const char *xpathPattern)
+{
+    Owned<ILockInfoCollection> lockInfoCollection = createLockInfoCollection();
+
+    bool filteredConnections = !isEmptyString(ipPattern);
+    bool filteredXPaths = !isEmptyString(xpathPattern);
+    CLockInfoArray locks;
+    {
+        CHECKEDCRITICALBLOCK(lockCrit, fakeCritTimeout);
+        SuperHashIteratorOf<CLock> iter(lockTable.queryBaseTable());
+        ForEach(iter)
+        {
+            CLock &lock = iter.query();
+            if (lock.lockCount())
+            {
+                if (!filteredXPaths || WildMatch(lock.queryXPath(), xpathPattern))
+                    locks.append(* lock.getLockInfo());
+            }
+        }
+    }
+    if (filteredConnections)
+    {
+        ForEachItemIn(c, locks)
+        {
+            ILockInfo &lockInfo = locks.item(c);
+            lockInfo.prune(ipPattern);
+        }
+    }
+    ForEachItemIn(l, locks)
+        lockInfoCollection->add(* LINK(&locks.item(l)));
+    return lockInfoCollection.getClear();
+}
+
 MemoryBuffer &CCovenSDSManager::collectUsageStats(MemoryBuffer &out)
 {
     { CHECKEDCRITICALBLOCK(cTableCrit, fakeCritTimeout);
@@ -7984,11 +7945,11 @@ MemoryBuffer &CCovenSDSManager::collectUsageStats(MemoryBuffer &out)
     }
     unsigned activeLocks = 0;
     { CHECKEDCRITICALBLOCK(lockCrit, fakeCritTimeout);
-        SuperHashIteratorOf<CLockInfo> iter(lockTable.queryBaseTable());    
+        SuperHashIteratorOf<CLock> iter(lockTable.queryBaseTable());
         ForEach(iter)
         {
-            CLockInfo &lockInfo = iter.query();
-            if (lockInfo.lockCount()) activeLocks++;
+            CLock &lock = iter.query();
+            if (lock.lockCount()) activeLocks++;
         }
     }
     out.append(activeLocks);
@@ -8028,13 +7989,6 @@ void CCovenSDSManager::blockingSave(unsigned *writeTransactions)
     SDSManager->saveStore();
 }
 
-StringBuffer &CCovenSDSManager::getUsageStats(StringBuffer &out)
-{
-    MemoryBuffer mb;
-    formatUsageStats(collectUsageStats(mb), out);
-    return out;
-}
-
 bool CCovenSDSManager::updateEnvironment(IPropertyTree *newEnv, bool forceGroupUpdate, StringBuffer &response)
 {
     Owned<IRemoteConnection> conn = querySDS().connect("/",myProcessSession(),0, INFINITE);
@@ -8068,7 +8022,6 @@ StringBuffer &CCovenSDSManager::getExternalReport(StringBuffer &out)
     return out;
 }
 
-
 StringBuffer &CCovenSDSManager::getConnections(StringBuffer &out)
 {
     MemoryBuffer mb;
@@ -8083,6 +8036,13 @@ StringBuffer &CCovenSDSManager::getSubscribers(StringBuffer &out)
     return out;
 }
 
+StringBuffer &CCovenSDSManager::getUsageStats(StringBuffer &out)
+{
+    MemoryBuffer mb;
+    formatUsageStats(collectUsageStats(mb), out);
+    return out;
+}
+
 void CCovenSDSManager::handleNodeNotify(notifications n, CServerRemoteTree &tree)
 {
     const char *handlerKey = tree.queryProp(NOTIFY_ATTR);

+ 2 - 1
dali/base/dasds.hpp

@@ -98,6 +98,7 @@ interface IRemoteConnections : extends IInterface
     virtual unsigned queryConnections() = 0;
 };
 
+interface ILockInfoCollection;
 interface ISDSManager
 {
     virtual ~ISDSManager() { }
@@ -107,7 +108,7 @@ interface ISDSManager
     virtual SubscriptionId subscribeExact(const char *xpath, ISDSNodeSubscription &notify, bool sendValue=false) = 0;
     virtual void unsubscribe(SubscriptionId id) = 0;
     virtual void unsubscribeExact(SubscriptionId id) = 0;
-    virtual StringBuffer &getLocks(StringBuffer &out) = 0;
+    virtual ILockInfoCollection *getLocks(const char *ipPattern=NULL, const char *xpathPattern=NULL) = 0;
     virtual StringBuffer &getUsageStats(StringBuffer &out) = 0;
     virtual StringBuffer &getConnections(StringBuffer &out) = 0;
     virtual StringBuffer &getSubscribers(StringBuffer &out) = 0;

+ 209 - 0
dali/base/dautils.cpp

@@ -577,6 +577,40 @@ void CDfsLogicalFileName::setForeign(const SocketEndpoint &daliep,bool checkloca
     set(str);
 }
 
+static bool begins(const char *&ln,const char *pat)
+{
+    size32_t sz = strlen(pat);
+    if (memicmp(ln,pat,sz)==0) {
+        ln += sz;
+        return true;
+    }
+    return false;
+}
+
+bool CDfsLogicalFileName::setFromXPath(const char *xpath)
+{
+    StringBuffer lName;
+    const char *s = xpath;
+    if (!begins(s, "/Files"))
+        return false;
+
+    while (*s && (begins(s, "/Scope[@name=\"") || begins(s, "/File[@name=\"") || begins(s, "/SuperFile[@name=\"")))
+    {
+        if (lName.length())
+            lName.append("::");
+        while (*s && (*s != '"'))
+            lName.append(*(s++));
+        if (*s == '"')
+            s++;
+        if (*s == ']')
+            s++;
+    }
+    if (0 == lName.length())
+        return false;
+    return setValidate(lName);
+}
+
+
 void CDfsLogicalFileName::clearForeign()
 {
     StringBuffer str;
@@ -3080,3 +3114,178 @@ bool traceAllTransactions(bool on)
     return ret;
 }
 
+class CLockInfo : public CSimpleInterfaceOf<ILockInfo>
+{
+    StringAttr xpath;
+    SafePointerArrayOf<CLockMetaData> ldInfo;
+public:
+    CLockInfo(MemoryBuffer &mb)
+    {
+        mb.read(xpath);
+        unsigned count;
+        mb.read(count);
+        if (count)
+        {
+            ldInfo.ensure(count);
+            for (unsigned c=0; c<count; c++)
+                ldInfo.append(new CLockMetaData(mb));
+        }
+    }
+    CLockInfo(const char *_xpath, const ConnectionInfoMap &map) : xpath(_xpath)
+    {
+        HashIterator iter(map);
+        ForEach(iter)
+        {
+            IMapping &imap = iter.query();
+            LockData *lD = map.mapToValue(&imap);
+            ConnectionId connId = * ((ConnectionId *) imap.getKey());
+            ldInfo.append(new CLockMetaData(*lD, connId));
+        }
+    }
+// ILockInfo impl.
+    virtual const char *queryXPath() const { return xpath; }
+    virtual unsigned queryConnections() const { return ldInfo.ordinality(); }
+    virtual CLockMetaData &queryLockData(unsigned lock) const
+    {
+        return *ldInfo.item(lock);
+    }
+    virtual void prune(const char *ipPattern)
+    {
+        StringBuffer ipStr;
+        ForEachItemInRev(c, ldInfo)
+        {
+            CLockMetaData &lD = *ldInfo.item(c);
+            SocketEndpoint ep(lD.queryEp());
+            ep.getIpText(ipStr.clear());
+            if (!WildMatch(ipStr, ipPattern))
+                ldInfo.zap(&lD);
+        }
+    }
+    virtual void serialize(MemoryBuffer &mb) const
+    {
+        mb.append(xpath);
+        mb.append(ldInfo.ordinality());
+        ForEachItemIn(c, ldInfo)
+            ldInfo.item(c)->serialize(mb);
+    }
+    virtual StringBuffer &toString(StringBuffer &out, unsigned format, bool header, const char *altText) const
+    {
+        if (ldInfo.ordinality())
+        {
+            unsigned msNow = msTick();
+            CDateTime time;
+            time.setNow();
+            time_t timeSimple = time.getSimple();
+
+            if (header)
+            {
+                switch (format)
+                {
+                    case 0: // internal
+                    {
+                        out.append("Locks on path: ").append(altText ? altText : xpath.get()).newline();
+                        out.append("Endpoint            |SessionId       |ConnectionId    |mode    |time(duration)]").newline();
+                        break;
+                    }
+                    case 1: // daliadmin
+                    {
+                        out.appendf("Server IP           |Session         |Mode    |Time                |Duration    |Lock").newline();
+                        out.appendf("====================|================|========|====================|============|======").newline();
+                        break;
+                    }
+                    default:
+                        throwUnexpected();
+                }
+            }
+
+            CDateTime timeLocked;
+            StringBuffer timeStr;
+            unsigned c = 0;
+            loop
+            {
+                CLockMetaData &lD = *ldInfo.item(c);
+                unsigned lockedFor = msNow-lD.timeLockObtained;
+                time_t tt = timeSimple - (lockedFor/1000);
+                timeLocked.set(tt);
+                timeLocked.getString(timeStr.clear());
+
+                switch (format)
+                {
+                    case 0: // internal
+                        out.appendf("%-20s|%-16" I64F "x|%-16" I64F "x|%-8x|%s(%d ms)", lD.queryEp(), lD.sessId, lD.connectionId, lD.mode, timeStr.str(), lockedFor);
+                        break;
+                    case 1: // daliadmin
+                        out.appendf("%-20s|%-16" I64F "x|%-8x|%-20s|%-12d|%s", lD.queryEp(), lD.sessId, lD.mode, timeStr.str(), lockedFor, altText ? altText : xpath.get());
+                        break;
+                    default:
+                        throwUnexpected();
+                }
+                ++c;
+                if (c>=ldInfo.ordinality())
+                    break;
+                out.newline();
+            }
+
+        }
+        return out;
+    }
+};
+
+ILockInfo *createLockInfo(const char *xpath, const ConnectionInfoMap &map)
+{
+    return new CLockInfo(xpath, map);
+}
+
+ILockInfo *deserializeLockInfo(MemoryBuffer &mb)
+{
+    return new CLockInfo(mb);
+}
+
+class CLockInfoCollection : public CSimpleInterfaceOf<ILockInfoCollection>
+{
+    CLockInfoArray locks;
+public:
+    CLockInfoCollection() { }
+    CLockInfoCollection(MemoryBuffer &mb)
+    {
+        unsigned lockCount;
+        mb.read(lockCount);
+        for (unsigned l=0; l<lockCount; l++)
+        {
+            Owned<ILockInfo> lockInfo = deserializeLockInfo(mb);
+            locks.append(* lockInfo.getClear());
+        }
+    }
+// ILockInfoCollection impl.
+    virtual unsigned queryLocks() const { return locks.ordinality(); }
+    virtual ILockInfo &queryLock(unsigned lock) const { return locks.item(lock); }
+    virtual void serialize(MemoryBuffer &mb) const
+    {
+        mb.append(locks.ordinality());
+        ForEachItemIn(l, locks)
+            locks.item(l).serialize(mb);
+    }
+    virtual StringBuffer &toString(StringBuffer &out) const
+    {
+        if (0 == locks.ordinality())
+            out.append("No current locks").newline();
+        else
+        {
+            ForEachItemIn(l, locks)
+                locks.item(l).toString(out, 0, true).newline();
+        }
+        return out;
+    }
+    virtual void add(ILockInfo &lock) { locks.append(lock); }
+};
+
+ILockInfoCollection *createLockInfoCollection()
+{
+    return new CLockInfoCollection();
+}
+
+ILockInfoCollection *deserializeLockInfoCollection(MemoryBuffer &mb)
+{
+    return new CLockInfoCollection(mb);
+}
+

+ 91 - 0
dali/base/dautils.hpp

@@ -71,6 +71,7 @@ public:
     void set(const CDfsLogicalFileName &lfn);
     void set(const char *scopes,const char *tail);
     bool setFromMask(const char *partmask,const char *rootdir=NULL);
+    bool setFromXPath(const char *xpath);
     void clear();
     bool isSet() const;
     /*
@@ -420,4 +421,94 @@ interface ILocalOrDistributedFile: extends IInterface
 
 extern da_decl ILocalOrDistributedFile* createLocalOrDistributedFile(const char *fname,IUserDescriptor *user,bool onlylocal,bool onlydfs,bool iswrite=false);
 
+typedef __int64 ConnectionId;
+
+struct LockData
+{
+    unsigned mode;
+    SessionId sessId;
+    unsigned timeLockObtained;
+
+    LockData(unsigned _mode, SessionId _sessId, unsigned _timeLockObtained) : mode(_mode), sessId(_sessId), timeLockObtained(_timeLockObtained)
+    {
+    }
+    LockData(const LockData &other)
+    {
+        mode = other.mode;
+        sessId = other.sessId;
+        timeLockObtained = other.timeLockObtained;
+    }
+    LockData(MemoryBuffer &mb)
+    {
+        mb.read(mode).read(sessId).read(timeLockObtained);
+    }
+    void serialize(MemoryBuffer &mb) const
+    {
+        mb.append(mode).append(sessId).append(timeLockObtained);
+    }
+};
+
+typedef MapBetween<ConnectionId, ConnectionId, LockData, LockData> ConnectionInfoMap;
+
+class CLockMetaData : public LockData
+{
+    mutable StringAttr ep;
+    mutable bool epResolved;
+public:
+    ConnectionId connectionId;
+
+    CLockMetaData(LockData &lD, ConnectionId _connectionId) : LockData(lD), connectionId(_connectionId)
+    {
+        epResolved = false;
+    }
+    CLockMetaData(MemoryBuffer &mb) : LockData(mb)
+    {
+        mb.read(connectionId).read(ep);
+        epResolved = true;
+    }
+    void serialize(MemoryBuffer &mb) const
+    {
+        LockData::serialize(mb);
+        mb.append(connectionId).append(queryEp());
+    }
+    const char *queryEp() const
+    {
+        if (!epResolved)
+        {
+            epResolved = true;
+            StringBuffer sessionEpStr;
+            querySessionManager().getClientProcessEndpoint(sessId, sessionEpStr);
+            ep.set(sessionEpStr);
+        }
+        return ep;
+    }
+};
+
+interface ILockInfo : extends IInterface
+{
+    virtual const char *queryXPath() const = 0;
+    virtual unsigned queryConnections() const = 0;
+    virtual CLockMetaData &queryLockData(unsigned lock) const = 0;
+    virtual void prune(const char *ipPattern) = 0;
+    virtual void serialize(MemoryBuffer &mb) const = 0;
+    virtual StringBuffer &toString(StringBuffer &out, unsigned format, bool header, const char *altText=NULL) const = 0;
+};
+
+extern da_decl ILockInfo *createLockInfo(const char *xpath, const ConnectionInfoMap &map);
+extern da_decl ILockInfo *deserializeLockInfo(MemoryBuffer &mb);
+
+typedef IArrayOf<ILockInfo> CLockInfoArray;
+typedef IIteratorOf<ILockInfo> ILockInfoIterator;
+
+interface ILockInfoCollection : extends IInterface
+{
+    virtual unsigned queryLocks() const = 0;
+    virtual ILockInfo &queryLock(unsigned lock) const = 0;
+    virtual void serialize(MemoryBuffer &mb) const = 0;
+    virtual StringBuffer &toString(StringBuffer &out) const = 0;
+    virtual void add(ILockInfo &lock) = 0;
+};
+extern da_decl ILockInfoCollection *createLockInfoCollection();
+extern da_decl ILockInfoCollection *deserializeLockInfoCollection(MemoryBuffer &mb);
+
 #endif

+ 68 - 135
dali/daliadmin/daliadmin.cpp

@@ -115,7 +115,7 @@ void usage(const char *exe)
   printf("  daliping [ <num> ]              -- time dali server connect\n");
   printf("  getxref <destxmlfile>           -- get all XREF information\n");
   printf("  dalilocks [ <ip-pattern> ] [ files ] -- get all locked files/xpaths\n");
-  printf("  unlock <xpath or logicalfile>   --  unlocks either matching xpath(s) or matching logical file(s), can contain wildcards\n");
+  printf("  unlock <xpath or logicalfile> <[path|file]> --  unlocks either matching xpath(s) or matching logical file(s), can contain wildcards\n");
   printf("  validatestore [fix=<true|false>]\n"
          "                [verbose=<true|false>]\n"
          "                [deletefiles=<true|false>]-- perform some checks on dali meta data an optionally fix or remove redundant info \n");
@@ -2271,144 +2271,71 @@ static bool begins(const char *&ln,const char *pat)
     return false;
 }
 
-static void dodalilocks(const char *pattern,const char *obj,Int64Array *conn,bool filesonly)
-{
-    StringBuffer buf;
-    getDaliDiagnosticValue("locks",buf);
-    for (int pass=(filesonly?1:0);pass<2;pass++) {
-        bool headerdone = false;
-        StringBuffer line;
-        StringBuffer curfile;
-        StringBuffer curxpath;
-        StringBuffer ips;
-        StringBuffer times;
-        StringBuffer sessid;
-        StringBuffer connid;
-        const char *s = buf.str();
-        loop {
-            line.clear();
-            while (*s&&(*s!='\n')) 
-                line.append(*(s++));
-            if (line.length()) {
-                const char *ln = line.str();
-                if (begins(ln,"Locks on path: ")) {
-                    curfile.clear();
-                    curxpath.clear();
-                    const char *x = ln;
-                    while (*x)
-                        curxpath.append(*(x++));
-                    if (begins(ln,"/Files")) {
-                        while (*ln&&(begins(ln,"/Scope[@name=\"")||begins(ln,"/File[@name=\"")||begins(ln,"/SuperFile[@name=\""))) {
-                            if (curfile.length())
-                                curfile.append("::");
-                            while (*ln&&(*ln!='"'))
-                                curfile.append(*(ln++));
-                            if (*ln=='"')
-                                ln++;
-                            if (*ln==']')
-                                ln++;
-                        }
-                    }
-                }
-                else if (isdigit(*ln)) {
-                    if (obj) 
-                        if (!curxpath.length()||!WildMatch(curxpath.str(),obj)) 
-                            if (!curfile.length()||!WildMatch(curfile.str(),obj)) 
-                                continue;
-                    if ((curfile.length()!=0)==(pass==1)) {
-                        ips.clear();
-                        while (*ln&&(*ln!=':'))
-                            ips.append(*(ln++));
-                        if (!pattern||WildMatch(ips.str(),pattern)) {
-                            ips.append(*ln++);
-                            while (isdigit(*ln))
-                                ips.append(*ln++);
-                            while (*ln!='|')    
-                                ln++;
-                            ln++; // sessid start
-                            sessid.clear();
-                            while (*ln!='|') {
-                                sessid.append(*ln);
-                                ln++;
-                            }
-                            sessid.clip();
-                            ln++; // connectid start
-                            connid.clear();
-                            while (*ln!='|') {
-                                connid.append(*ln);
-                                ln++;
-                            }
-                            connid.clip();
-                            ln++; // mode start
-                            unsigned mode = 0;
-                            while (isdigit(*ln))
-                                mode = mode*10+(*(ln++)-'0');
-                            while (*ln!='|')
-                                ln++;
-                            ln++; // duration start
-                            times.clear();
-                            while (*ln&&(*ln!='('))
-                                times.append(*(ln++));
-                            ln++;
-                            unsigned duration = 0;
-                            while (isdigit(*ln))
-                                duration = duration*10+(*(ln++)-'0');
-                            if (conn) {
-                                bool err;
-                                __int64 c = (__int64)hextoll(connid.str(),err);
-                                if (!err) {
-                                    bool found = false;
-                                    ForEachItemIn(i,*conn) 
-                                        if (c==conn->item(i))
-                                            found = true;
-                                    if (!found)
-                                        conn->append(c);
-                                }
-                            }
-                            else {
-                                if (!headerdone) {
-                                    OUTLOG( "\nServer IP         , session   ,mode, time              ,duration ,%s",pass?"File":"XPath");
-                                    OUTLOG(   "==================,===========,===,====================,=========,=====");
-                                    headerdone = true;
-                                }
-                                OUTLOG("%s, %s, %d, %s, %d, %s",ips.str(),sessid.str(),mode,times.str(),duration,pass?curfile.str():curxpath.str());
-                            }
-                        }
-                    }
-                }
-            }
-            if (!*s)
-                break;
-            s++;
+static void dalilocks(const char *ipPattern, bool files)
+{
+    Owned<ILockInfoCollection> lockInfoCollection = querySDS().getLocks(ipPattern, files ? "/Files/*" : NULL);
+    bool headers = true;
+    CDfsLogicalFileName dlfn;
+    for (unsigned l=0; l<lockInfoCollection->queryLocks(); l++)
+    {
+        ILockInfo &lockInfo = lockInfoCollection->queryLock(l);
+        if (files)
+        {
+            if (!dlfn.setFromXPath(lockInfo.queryXPath()))
+                continue;
         }
+        if (0 == lockInfo.queryConnections())
+            continue;
+        StringBuffer lockFormat;
+        lockInfo.toString(lockFormat, 1, headers, files ? dlfn.get() : NULL);
+        headers = false;
+        PROGLOG("%s", lockFormat.str());
+    }
+    if (headers) // if still true, no locks matched
+    {
+        printf("No lock(s) found\n");
+        return;
     }
-}
-
-static void dalilocks(const char *pattern,bool fileonly)
-{
-    dodalilocks(pattern,NULL,NULL,fileonly);
 }
 
 //=============================================================================
 
-static void unlock(const char *pattern)
-{
-    Int64Array conn;
-    dodalilocks(NULL,pattern,&conn,false);
-    ForEachItemIn(i,conn) {
-        MemoryBuffer mb;
-        __int64 connectionId = conn.item(i);
-        bool success;
-        bool disconnect = false;        // TBD?
-        mb.append("unlock").append(connectionId).append(disconnect);
-        getDaliDiagnosticValue(mb);
-        mb.read(success);
-        StringBuffer connectionInfo;
-        if (!success)
-            PROGLOG("Lock %" I64F "x not found",connectionId);
-        else {
-            mb.read(connectionInfo);
-            PROGLOG("Lock %" I64F "x successfully removed: %s", connectionId, connectionInfo.str());
+static void unlock(const char *pattern, bool files)
+{
+    Owned<ILockInfoCollection> lockInfoCollection = querySDS().getLocks(NULL, files ? "/Files/*" : pattern);
+    for (unsigned l=0; l<lockInfoCollection->queryLocks(); l++)
+    {
+        ILockInfo &lockInfo = lockInfoCollection->queryLock(l);
+        bool match = false;
+        if (files)
+        {
+            CDfsLogicalFileName dlfn;
+            dlfn.setAllowWild(true);
+            if (dlfn.setFromXPath(lockInfo.queryXPath()))
+                match = WildMatch(dlfn.get(), pattern);
+        }
+        else
+            match = WildMatch(lockInfo.queryXPath(), pattern);
+        if (match)
+        {
+            for (unsigned c=0; c<lockInfo.queryConnections(); c++)
+            {
+                ConnectionId connectionId = lockInfo.queryLockData(c).connectionId;
+                bool disconnect = false;        // TBD?
+                MemoryBuffer mb;
+                mb.append("unlock").append(connectionId).append(disconnect);
+                getDaliDiagnosticValue(mb);
+                bool success;
+                mb.read(success);
+                if (!success)
+                    PROGLOG("Lock %" I64F "x not found",connectionId);
+                else
+                {
+                    StringBuffer connectionInfo;
+                    mb.read(connectionInfo);
+                    PROGLOG("Lock %" I64F "x successfully removed: %s", connectionId, connectionInfo.str());
+                }
+            }
         }
     }
 }
@@ -2978,8 +2905,14 @@ int main(int argc, char* argv[])
                         dalilocks(np>0?params.item(1):NULL,filesonly);
                     }
                     else if (stricmp(cmd,"unlock")==0) {
-                        CHECKPARAMS(1,1);
-                        unlock(params.item(1));
+                        CHECKPARAMS(2,2);
+                        const char *fileOrPath = params.item(2);
+                        if (0 == stricmp("file", fileOrPath))
+                            unlock(params.item(1), true);
+                        else if (0 == stricmp("path", fileOrPath))
+                            unlock(params.item(1), false);
+                        else
+                            throw MakeStringException(0, "unknown type [ %s ], must be 'file' or 'path'", fileOrPath);
                     }
                     else if (stricmp(cmd,"validateStore")==0) {
                         CHECKPARAMS(0,2);

+ 22 - 71
dali/dalidiag/dalidiag.cpp

@@ -26,6 +26,7 @@
 #include "danqs.hpp"
 #include "dasds.hpp"
 #include "dadfs.hpp"
+#include "dautils.hpp"
 #include "jptree.hpp"
 #include "jlzw.hpp"
 
@@ -457,73 +458,6 @@ static bool begins(const char *&ln,const char *pat)
     return false;
 }
 
-void fileLocks(const char *ip)
-{
-    StringBuffer buf;
-    getDaliDiagnosticValue("locks",buf);
-    StringBuffer line;
-    StringBuffer curfile;
-    StringBuffer ips;
-    const char *s = buf.str();
-    loop {
-        line.clear();
-        while (*s&&(*s!='\n')) 
-            line.append(*(s++));
-        if (line.length()) {
-            const char *ln = line.str();
-            if (begins(ln,"Locks on path: ")) {
-                curfile.clear();
-                if (begins(ln,"/Files")) {
-                    while (*ln&&(begins(ln,"/Scope[@name=\"")||begins(ln,"/File[@name=\""))) {
-                        if (curfile.length())
-                            curfile.append("::");
-                        while (*ln&&(*ln!='"'))
-                            curfile.append(*(ln++));
-                        if (*ln=='"')
-                            ln++;
-                        if (*ln==']')
-                            ln++;
-                    }
-                }
-            }
-            else if (isdigit(*ln)) {
-                if (curfile.length()) {
-                    ips.clear();
-                    while (*ln&&(*ln!=':'))
-                        ips.append(*(ln++));
-                    if (!ip||(strcmp(ips.str(),ip)==0)) {
-                        ips.append(*ln++);
-                        while (isdigit(*ln))
-                            ips.append(*ln++);
-                        while (*ln!='|')    
-                            ln++;
-                        ln++; // sessid start
-                        while (*ln!='|')
-                            ln++;
-                        ln++; // connectid start
-                        while (*ln!='|')
-                            ln++;
-                        ln++; // mode start
-                        unsigned mode = 0;
-                        while (isdigit(*ln))
-                            mode = mode*10+(*(ln++)-'0');
-                        while (*ln!='|')
-                            ln++;
-                        ln++; // duration start
-                        unsigned duration = 0;
-                        while (isdigit(*ln))
-                            duration = duration*10+(*(ln++)-'0');
-                        printf("%s, %d, %d, %s\n",ips.str(),mode,duration,curfile.str());
-                    }
-                }
-            }
-        }
-        if (!*s)
-            break;
-        s++;
-    }
-}
-
 // NB: there's strtoll under Linux
 static unsigned __int64 hextoll(const char *str, bool *error=NULL)
 {
@@ -671,10 +605,6 @@ int main(int _argc, char* argv[])
                     nqPingPong(argv[i+1],i+2<argc?argv[i+2]:NULL);
                     break;
                 }
-                if ((i<argc)&&(stricmp(arg,"filelocks")==0)) {
-                    fileLocks(i+1<argc?argv[i+1]:NULL);
-                    break;
-                }
                 if ((i+1<argc)&&(stricmp(arg,"unlock")==0)) {
                     MemoryBuffer mb;
                     __int64 connectionId;
@@ -743,6 +673,27 @@ int main(int _argc, char* argv[])
                     PROGLOG("SDS store saved");
                     break;
                 }
+                if (0 == stricmp(arg, "locks")) {
+                    Owned<ILockInfoCollection> lockInfoCollection = querySDS().getLocks();
+                    lockInfoCollection->toString(buf);
+                    printf("\n%s:\n%s",arg,buf.str());
+                    break;
+                }
+                if (0 == stricmp(arg,"sdsstats")) {
+                    querySDS().getUsageStats(buf);
+                    printf("\n%s:\n%s",arg,buf.str());
+                    break;
+                }
+                if (0 == stricmp(arg, "connections")) {
+                    querySDS().getConnections(buf);
+                    printf("\n%s:\n%s",arg,buf.str());
+                    break;
+                }
+                if (0 == stricmp(arg, "sdssubscribers")) {
+                    querySDS().getSubscribers(buf);
+                    printf("\n%s:\n%s",arg,buf.str());
+                    break;
+                }
                 else {
                     loop {
                         getDaliDiagnosticValue(arg,buf.clear());

+ 1 - 0
system/jlib/jstring.hpp

@@ -558,6 +558,7 @@ extern jlib_decl bool endsWithIgnoreCase(const char* src, const char* dst);
 inline bool strieq(const char* s, const char* t) { return stricmp(s,t)==0; }
 inline bool streq(const char* s, const char* t) { return strcmp(s,t)==0; }
 inline bool strsame(const char* s, const char* t) { return (s == t) || (s && t && strcmp(s,t)==0); }  // also allow nulls
+inline bool isEmptyString(const char *text) { return !text||!*text; }
 
 extern jlib_decl char *j_strtok_r(char *str, const char *delim, char **saveptr);
 extern jlib_decl int j_memicmp (const void *s1, const void *s2, size32_t len);