Pārlūkot izejas kodu

HPCC-13545 Expose lock info to clients.

Previously locking information was available only en-bulk via
a single output processed by Dali and returned as a formatted
string table.
There was a couple of places (in daliadmin and dalidiag) that
worked hard to parse this formatted string, to perform
filtering.

This change allows path and ip filtering of locks to be
handled at the server, for the result to be serialized to the
requesting client and then deserialized into interrogatable
interfaces.
This will also allow clients (e.g. Eclwatch) to request and
format the locking information as they see fit.

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 10 gadi atpakaļ
vecāks
revīzija
848d3ecedd

+ 26 - 7
dali/base/dacsds.cpp

@@ -31,6 +31,7 @@
 #include "dasess.hpp"
 #include "daclient.hpp"
 #include "dadfs.hpp"
+#include "dautils.hpp"
 
 #include "dasds.ipp" // common header for client/server sds
 #include "dacsds.ipp"
@@ -1862,14 +1863,12 @@ StringBuffer &CClientSDSManager::getInfo(SdsDiagCommand cmd, StringBuffer &out)
                 case DIAG_CMD_STATS:
                     formatUsageStats(mb, out);
                     break;
-                case DIAG_CMD_LOCKINFO:
-                    size32_t sz;
-                    mb.read(sz);
-                    out.append(sz, (const char *)mb.readDirect(sz));
-                    break;
                 case DIAG_CMD_CONNECTIONS:
                     formatConnections(mb, out);
                     break;
+                case DIAG_CMD_SUBSCRIBERS:
+                    formatSubscribers(mb, out);
+                    break;
             }
             break;
         }
@@ -1882,9 +1881,29 @@ StringBuffer &CClientSDSManager::getInfo(SdsDiagCommand cmd, StringBuffer &out)
     return out;
 }
 
-StringBuffer &CClientSDSManager::getLocks(StringBuffer &out)
+ILockInfoCollection *CClientSDSManager::getLocks(const char *ipPattern, const char *xpathPattern)
 {
-    return getInfo(DIAG_CMD_LOCKINFO, out);
+    CMessageBuffer msg;
+    msg.append((int)DAMP_SDSCMD_DIAGNOSTIC);
+    msg.append((int)DIAG_CMD_LOCKINFO);
+    msg.append(ipPattern?ipPattern:"");
+    msg.append(xpathPattern?xpathPattern:"");
+
+    if (!queryCoven().sendRecv(msg, RANK_RANDOM, MPTAG_DALI_SDS_REQUEST))
+        throw MakeSDSException(SDSExcpt_FailedToCommunicateWithServer, "getLocks");
+
+    SdsReply replyMsg;
+    msg.read((int &)replyMsg);
+    switch (replyMsg)
+    {
+        case DAMP_SDSREPLY_OK:
+            break;
+        case DAMP_SDSREPLY_ERROR:
+            throwMbException("SDS Reply Error ", msg);
+        default:
+            assertex(false);
+    }
+    return deserializeLockInfoCollection(msg);
 }
 
 StringBuffer &CClientSDSManager::getUsageStats(StringBuffer &out)

+ 1 - 1
dali/base/dacsds.ipp

@@ -405,7 +405,7 @@ public:
     virtual SubscriptionId subscribeExact(const char *xpath, ISDSNodeSubscription &notify, bool sendValue=false);
     virtual void unsubscribe(SubscriptionId id);
     virtual void unsubscribeExact(SubscriptionId id);
-    virtual StringBuffer &getLocks(StringBuffer &out);
+    virtual ILockInfoCollection *getLocks(const char *ipPattern, const char *xpathPattern);
     virtual StringBuffer &getUsageStats(StringBuffer &out);
     virtual StringBuffer &getConnections(StringBuffer &out);
     virtual StringBuffer &getSubscribers(StringBuffer &out);

+ 7 - 5
dali/base/dadiags.cpp

@@ -27,6 +27,7 @@
 #include "daserver.hpp"
 #include "dasds.hpp"
 #include "dasubs.ipp"
+#include "dautils.hpp"
 #include "dadiags.hpp"
 
 #ifdef _MSC_VER
@@ -121,16 +122,17 @@ public:
                 else if (0 == stricmp(id, "mpqueue")) {
                     mb.append(getReceiveQueueDetails(buf).str());
                 }
-                else if (0 == stricmp(id, "locks")) {
-                    mb.append(querySDS().getLocks(buf).str());
+                else if (0 == stricmp(id, "locks")) { // Legacy - newer diag clients should use querySDS().getLocks() directly
+                    Owned<ILockInfoCollection> lockInfoCollection = querySDS().getLocks();
+                    mb.append(lockInfoCollection->toString(buf).str());
                 }
-                else if (0 == stricmp(id, "sdsstats")) {
+                else if (0 == stricmp(id, "sdsstats")) { // Legacy - newer diag clients should use querySDS().getUsageStats() directly
                     mb.append(querySDS().getUsageStats(buf).str());
                 }
-                else if (0 == stricmp(id, "connections")) {
+                else if (0 == stricmp(id, "connections")) { // Legacy - newer diag clients should use querySDS().getConnections() directly
                     mb.append(querySDS().getConnections(buf).str());
                 }
-                else if (0 == stricmp(id, "sdssubscribers")) {
+                else if (0 == stricmp(id, "sdssubscribers")) { // Legacy - newer diag clients should use querySDS().getSubscribers() directly
                     mb.append(querySDS().getSubscribers(buf).str());
                 }
                 else if (0 == stricmp(id, "clients")) {

+ 0 - 1
dali/base/dadiags.hpp

@@ -25,7 +25,6 @@
 extern da_decl StringBuffer & getDaliDiagnosticValue(const char *name,StringBuffer &ret);
 extern da_decl MemoryBuffer & getDaliDiagnosticValue(MemoryBuffer &m);
 
-
 // for server use
 interface IDaliServer;
 extern da_decl IDaliServer *createDaliDiagnosticsServer(); // called for coven members

+ 77 - 115
dali/base/dasds.cpp

@@ -20,6 +20,7 @@
 #include "jhash.hpp"
 #include "jlib.hpp"
 #include "jfile.hpp"
+#include "jregexp.hpp"
 #include "jthread.hpp"
 #include "javahash.hpp"
 #include "javahash.tpp"
@@ -896,13 +897,6 @@ struct DebugInfo
 };
 #endif
 
-struct LockData
-{
-    unsigned mode;
-    SessionId sessId;
-    unsigned timeLockObtained;
-};
-
 class BoolSetBlock
 {
 public:
@@ -1997,7 +1991,7 @@ public:
     virtual SubscriptionId subscribeExact(const char *xpath, ISDSNodeSubscription &notify, bool sendValue=false);
     virtual void unsubscribe(SubscriptionId id);
     virtual void unsubscribeExact(SubscriptionId id);
-    virtual StringBuffer &getLocks(StringBuffer &out);
+    virtual ILockInfoCollection *getLocks(const char *ipPattern, const char *xpathPattern);
     virtual StringBuffer &getUsageStats(StringBuffer &out);
     virtual StringBuffer &getConnections(StringBuffer &out);
     virtual StringBuffer &getSubscribers(StringBuffer &out);
@@ -3172,8 +3166,8 @@ public:
 };
 
 typedef Int64Array IdPath;
-typedef MapBetween<ConnectionId, ConnectionId, LockData, LockData> ConnectionInfoMap;
 #define LOCKSESSCHECK (1000*60*5)
+
 class CLock : public CInterface, implements IInterface
 {
     DECL_NAMEDCOUNT;
@@ -3182,7 +3176,7 @@ class CLock : public CInterface, implements IInterface
     unsigned sub, readLocks, holdLocks, pending, waiting;
     IdPath idPath;
     ConnectionInfoMap connectionInfo;
-    CheckedCriticalSection crit;
+    mutable CheckedCriticalSection crit;
     Semaphore sem;
     StringAttr xpath;
     __int64 treeId;
@@ -3290,7 +3284,7 @@ class CLock : public CInterface, implements IInterface
                         {
                             waiting--;
                             StringBuffer s("Infinite timeout lock still waiting: ");
-                            getLockInfo(s);
+                            toString(s);
                             PROGLOG("%s", s.str());
                         }
                         {
@@ -3454,10 +3448,7 @@ class CLock : public CInterface, implements IInterface
         {
             if (RTM_LOCK_SUB & mode)
                 sub++;
-            LockData ld;
-            ld.mode = mode;
-            ld.sessId = sessId;
-            ld.timeLockObtained = msTick();
+            LockData ld(mode, sessId, msTick());
             connectionInfo.setValue(id, ld);
         }
         return LockSucceeded;
@@ -3621,9 +3612,9 @@ public:
             case LockFailed:
                 throw MakeSDSException(SDSExcpt_ConnectionAbsent, "Lost connection performing changeMode on connection to : %s", xpath.get());
             case LockTimedOut:
-                throw MakeSDSException(SDSExcpt_LockTimeout, "Lock timeout performing changeMode on connection to : %s, existing lock info: %s", xpath.get(), getLockInfo(s).str());
+                throw MakeSDSException(SDSExcpt_LockTimeout, "Lock timeout performing changeMode on connection to : %s, existing lock info: %s", xpath.get(), toString(s).str());
             case LockHeld:
-                throw MakeSDSException(SDSExcpt_LockHeld, "Lock is held performing changeMode on connection to : %s, existing lock info: %s", xpath.get(), getLockInfo(s).str());
+                throw MakeSDSException(SDSExcpt_LockHeld, "Lock is held performing changeMode on connection to : %s, existing lock info: %s", xpath.get(), toString(s).str());
             }
         }
     }
@@ -3640,58 +3631,12 @@ public:
         return NULL!=connectionInfo.getValue(connectionId);
     }
 
-    const char *queryXPath() const
-    {
-        return xpath;
-    }
+    const char *queryXPath() const { return xpath; }
 
-    StringBuffer &getLockInfo(StringBuffer &out)
+    ILockInfo *getLockInfo() const
     {
-        unsigned nlocks=0;
-        MemoryBuffer locks;
-        UInt64Array keys;
-        {
-            CHECKEDCRITICALBLOCK(crit, fakeCritTimeout);
-            HashIterator iter(connectionInfo);
-            ForEach(iter)
-            {
-                IMapping &imap = iter.query();
-                LockData *lD = connectionInfo.mapToValue(&imap);
-                keys.append(* ((ConnectionId *) imap.getKey()));
-                locks.append(sizeof(LockData), lD);
-                ++nlocks;
-            }
-        }
-
-        unsigned msNow = msTick();
-        out.append("Locks on path: /").append(xpath).newline();
-        out.append("Endpoint            |SessionId       |ConnectionId    |mode    |time(duration)]").newline().newline();
-        unsigned l = 0;
-        if (nlocks)
-        {
-            loop
-            {
-                LockData lD;
-                memcpy(&lD, ((const byte *)locks.toByteArray())+l*sizeof(LockData), sizeof(LockData));
-                ConnectionId connId = keys.item(l);
-
-                StringBuffer sessEpStr;
-                unsigned lockedFor = msNow-lD.timeLockObtained;
-                CDateTime time;
-                time.setNow();
-                time_t tt = time.getSimple() - (lockedFor/1000);
-                time.set(tt);
-                StringBuffer timeStr;
-                time.getString(timeStr);
-                out.appendf("%-20s|%-16" I64F "x|%-16" I64F "x|%-8x|%s(%d ms)", querySessionManager().getClientProcessEndpoint(lD.sessId, sessEpStr).str(), lD.sessId, connId, lD.mode, timeStr.str(), lockedFor);
-                ++l;
-                if (l>=nlocks)
-                    break;
-                out.newline();
-            }
-        }
-        out.newline();
-        return out;
+        CHECKEDCRITICALBLOCK(crit, fakeCritTimeout);
+        return createLockInfo(xpath, connectionInfo); // NB: doesn't resolve sessionId to Endpoint string at this point
     }
 
     void setDROLR(CServerRemoteTree *_parent, CServerRemoteTree *_child)
@@ -3706,6 +3651,11 @@ public:
         parent.set(_parent);
         child.set(_child);
     }
+    StringBuffer &toString(StringBuffer &out) const
+    {
+        Owned<ILockInfo> lockInfo = getLockInfo();
+        return lockInfo->toString(out, 0, true);
+    }
 };
 
 CPendingLockBlock::CPendingLockBlock(CLock &_lock) : lock(_lock)
@@ -3718,8 +3668,6 @@ CPendingLockBlock::~CPendingLockBlock()
     lock.removePending();
 }
 
-///////////
-
 template <> void CLockTable::onRemove(void *et)
 {
     ((CLock*)et)->Release();
@@ -3818,12 +3766,11 @@ int CSDSTransactionServer::run()
                             {
                                 case DIAG_CMD_LOCKINFO:
                                 {
-                                    StringBuffer out;
-                                    SDSManager->getLocks(out);
+                                    StringAttr ipPattern, xpathPattern;
+                                    mb.read(ipPattern).read(xpathPattern);
                                     mb.clear().append(DAMP_SDSREPLY_OK);
-                                    mb.append(out.length());
-                                    mb.append(out.length(), out.str());
-
+                                    Owned<ILockInfoCollection> lockInfoCollection = SDSManager->getLocks(ipPattern, xpathPattern);
+                                    lockInfoCollection->serialize(mb);
                                     break;
                                 }
                                 case DIAG_CMD_STATS:
@@ -7415,24 +7362,24 @@ LockStatus CCovenSDSManager::establishLock(CLock &lock, __int64 treeId, Connecti
     return res;
 }
 
-void CCovenSDSManager::lock(CServerRemoteTree &tree, const char *__xpath, ConnectionId connectionId, SessionId sessionId, unsigned mode, unsigned timeout, IUnlockCallback &callback)
+void CCovenSDSManager::lock(CServerRemoteTree &tree, const char *xpath, ConnectionId connectionId, SessionId sessionId, unsigned mode, unsigned timeout, IUnlockCallback &callback)
 {
     if (0 == ((RTM_LOCK_READ | RTM_LOCK_WRITE) & mode)) // no point in creating lock.
         return;
     CLock *lock = NULL;
-    StringAttr sxpath;
-    char *_xpath = (char *) (('/' == *__xpath) ? __xpath+1 : __xpath);
-    char *xpath;
-    if ('/' == _xpath[strlen(_xpath)-1])
-        xpath = (char *)_xpath;
-    else
+    StringBuffer sxpath;
+    if ('/' != *xpath)
     {
-        unsigned l = strlen(_xpath);
-        xpath = (char *)malloc(l+2);
-        memcpy(xpath, _xpath, l);
-        xpath[l] = '/';
-        xpath[l+1] = '\0';
-        sxpath.setown(xpath);
+        sxpath.append('/').append(xpath);
+        if ('/' != sxpath.charAt(sxpath.length()-1))
+            sxpath.append('/');
+        xpath = sxpath.str();
+    }
+    else if ('/' != xpath[strlen(xpath)-1])
+    {
+        sxpath.append(xpath);
+        sxpath.append('/');
+        xpath = sxpath.str();
     }
 
     __int64 treeId = tree.queryServerId();
@@ -7458,9 +7405,9 @@ void CCovenSDSManager::lock(CServerRemoteTree &tree, const char *__xpath, Connec
             case LockFailed:
                 throw MakeSDSException(SDSExcpt_ConnectionAbsent, "Lost connection trying to establish lock on connection to : %s", xpath);
             case LockTimedOut:
-                throw MakeSDSException(SDSExcpt_LockTimeout, "Lock timeout trying to establish lock to %s, existing lock info: %s", xpath, lock->getLockInfo(s).str());
+                throw MakeSDSException(SDSExcpt_LockTimeout, "Lock timeout trying to establish lock to %s, existing lock info: %s", xpath, lock->toString(s).str());
             case LockHeld:
-                throw MakeSDSException(SDSExcpt_LockHeld, "Lock is held trying to establish lock to %s, existing lock info: %s", xpath, lock->getLockInfo(s).str());
+                throw MakeSDSException(SDSExcpt_LockHeld, "Lock is held trying to establish lock to %s, existing lock info: %s", xpath, lock->toString(s).str());
             }
         }
     }
@@ -7597,7 +7544,7 @@ void CCovenSDSManager::createConnection(SessionId sessionId, unsigned mode, unsi
                                                     }
                                                 }
                                                 if (lock)
-                                                    lock->getLockInfo(timeoutMsg);
+                                                    lock->toString(timeoutMsg);
                                             }
                                             throw MakeSDSException(SDSExcpt_LockTimeout, "%s", timeoutMsg.str());
                                         }
@@ -7847,23 +7794,6 @@ void CCovenSDSManager::disconnect(ConnectionId id, bool deleteRoot, Owned<CLCLoc
     connection->unsubscribeSession();
 }
 
-StringBuffer &CCovenSDSManager::getLocks(StringBuffer &out)
-{
-    CHECKEDCRITICALBLOCK(lockCrit, fakeCritTimeout);
-    SuperHashIteratorOf<CLock> iter(lockTable.queryBaseTable());
-    iter.first();
-    while (iter.isValid())
-    {
-        CLock &lock = iter.query();
-        if (lock.lockCount())
-            lock.getLockInfo(out);
-        if (!iter.next())
-            break;
-        if (out.length()) out.newline();
-    }
-    return out.length() ? out : out.append("No current locks");
-}
-
 StringBuffer &formatUsageStats(MemoryBuffer &src, StringBuffer &out)
 {
     unsigned c;
@@ -7975,6 +7905,39 @@ unsigned CCovenSDSManager::countActiveLocks()
     return activeLocks;
 }
 
+ILockInfoCollection *CCovenSDSManager::getLocks(const char *ipPattern, const char *xpathPattern)
+{
+    Owned<ILockInfoCollection> lockInfoCollection = createLockInfoCollection();
+
+    bool filteredConnections = !isEmptyString(ipPattern);
+    bool filteredXPaths = !isEmptyString(xpathPattern);
+    CLockInfoArray locks;
+    {
+        CHECKEDCRITICALBLOCK(lockCrit, fakeCritTimeout);
+        SuperHashIteratorOf<CLock> iter(lockTable.queryBaseTable());
+        ForEach(iter)
+        {
+            CLock &lock = iter.query();
+            if (lock.lockCount())
+            {
+                if (!filteredXPaths || WildMatch(lock.queryXPath(), xpathPattern))
+                    locks.append(* lock.getLockInfo());
+            }
+        }
+    }
+    if (filteredConnections)
+    {
+        ForEachItemIn(c, locks)
+        {
+            ILockInfo &lockInfo = locks.item(c);
+            lockInfo.prune(ipPattern);
+        }
+    }
+    ForEachItemIn(l, locks)
+        lockInfoCollection->add(* LINK(&locks.item(l)));
+    return lockInfoCollection.getClear();
+}
+
 MemoryBuffer &CCovenSDSManager::collectUsageStats(MemoryBuffer &out)
 {
     { CHECKEDCRITICALBLOCK(cTableCrit, fakeCritTimeout);
@@ -8026,13 +7989,6 @@ void CCovenSDSManager::blockingSave(unsigned *writeTransactions)
     SDSManager->saveStore();
 }
 
-StringBuffer &CCovenSDSManager::getUsageStats(StringBuffer &out)
-{
-    MemoryBuffer mb;
-    formatUsageStats(collectUsageStats(mb), out);
-    return out;
-}
-
 bool CCovenSDSManager::updateEnvironment(IPropertyTree *newEnv, bool forceGroupUpdate, StringBuffer &response)
 {
     Owned<IRemoteConnection> conn = querySDS().connect("/",myProcessSession(),0, INFINITE);
@@ -8066,7 +8022,6 @@ StringBuffer &CCovenSDSManager::getExternalReport(StringBuffer &out)
     return out;
 }
 
-
 StringBuffer &CCovenSDSManager::getConnections(StringBuffer &out)
 {
     MemoryBuffer mb;
@@ -8081,6 +8036,13 @@ StringBuffer &CCovenSDSManager::getSubscribers(StringBuffer &out)
     return out;
 }
 
+StringBuffer &CCovenSDSManager::getUsageStats(StringBuffer &out)
+{
+    MemoryBuffer mb;
+    formatUsageStats(collectUsageStats(mb), out);
+    return out;
+}
+
 void CCovenSDSManager::handleNodeNotify(notifications n, CServerRemoteTree &tree)
 {
     const char *handlerKey = tree.queryProp(NOTIFY_ATTR);

+ 2 - 1
dali/base/dasds.hpp

@@ -98,6 +98,7 @@ interface IRemoteConnections : extends IInterface
     virtual unsigned queryConnections() = 0;
 };
 
+interface ILockInfoCollection;
 interface ISDSManager
 {
     virtual ~ISDSManager() { }
@@ -107,7 +108,7 @@ interface ISDSManager
     virtual SubscriptionId subscribeExact(const char *xpath, ISDSNodeSubscription &notify, bool sendValue=false) = 0;
     virtual void unsubscribe(SubscriptionId id) = 0;
     virtual void unsubscribeExact(SubscriptionId id) = 0;
-    virtual StringBuffer &getLocks(StringBuffer &out) = 0;
+    virtual ILockInfoCollection *getLocks(const char *ipPattern=NULL, const char *xpathPattern=NULL) = 0;
     virtual StringBuffer &getUsageStats(StringBuffer &out) = 0;
     virtual StringBuffer &getConnections(StringBuffer &out) = 0;
     virtual StringBuffer &getSubscribers(StringBuffer &out) = 0;

+ 209 - 0
dali/base/dautils.cpp

@@ -577,6 +577,40 @@ void CDfsLogicalFileName::setForeign(const SocketEndpoint &daliep,bool checkloca
     set(str);
 }
 
+static bool begins(const char *&ln,const char *pat)
+{
+    size32_t sz = strlen(pat);
+    if (memicmp(ln,pat,sz)==0) {
+        ln += sz;
+        return true;
+    }
+    return false;
+}
+
+bool CDfsLogicalFileName::setFromXPath(const char *xpath)
+{
+    StringBuffer lName;
+    const char *s = xpath;
+    if (!begins(s, "/Files"))
+        return false;
+
+    while (*s && (begins(s, "/Scope[@name=\"") || begins(s, "/File[@name=\"") || begins(s, "/SuperFile[@name=\"")))
+    {
+        if (lName.length())
+            lName.append("::");
+        while (*s && (*s != '"'))
+            lName.append(*(s++));
+        if (*s == '"')
+            s++;
+        if (*s == ']')
+            s++;
+    }
+    if (0 == lName.length())
+        return false;
+    return setValidate(lName);
+}
+
+
 void CDfsLogicalFileName::clearForeign()
 {
     StringBuffer str;
@@ -3082,3 +3116,178 @@ bool traceAllTransactions(bool on)
     return ret;
 }
 
+class CLockInfo : public CSimpleInterfaceOf<ILockInfo>
+{
+    StringAttr xpath;
+    SafePointerArrayOf<CLockMetaData> ldInfo;
+public:
+    CLockInfo(MemoryBuffer &mb)
+    {
+        mb.read(xpath);
+        unsigned count;
+        mb.read(count);
+        if (count)
+        {
+            ldInfo.ensure(count);
+            for (unsigned c=0; c<count; c++)
+                ldInfo.append(new CLockMetaData(mb));
+        }
+    }
+    CLockInfo(const char *_xpath, const ConnectionInfoMap &map) : xpath(_xpath)
+    {
+        HashIterator iter(map);
+        ForEach(iter)
+        {
+            IMapping &imap = iter.query();
+            LockData *lD = map.mapToValue(&imap);
+            ConnectionId connId = * ((ConnectionId *) imap.getKey());
+            ldInfo.append(new CLockMetaData(*lD, connId));
+        }
+    }
+// ILockInfo impl.
+    virtual const char *queryXPath() const { return xpath; }
+    virtual unsigned queryConnections() const { return ldInfo.ordinality(); }
+    virtual CLockMetaData &queryLockData(unsigned lock) const
+    {
+        return *ldInfo.item(lock);
+    }
+    virtual void prune(const char *ipPattern)
+    {
+        StringBuffer ipStr;
+        ForEachItemInRev(c, ldInfo)
+        {
+            CLockMetaData &lD = *ldInfo.item(c);
+            SocketEndpoint ep(lD.queryEp());
+            ep.getIpText(ipStr.clear());
+            if (!WildMatch(ipStr, ipPattern))
+                ldInfo.zap(&lD);
+        }
+    }
+    virtual void serialize(MemoryBuffer &mb) const
+    {
+        mb.append(xpath);
+        mb.append(ldInfo.ordinality());
+        ForEachItemIn(c, ldInfo)
+            ldInfo.item(c)->serialize(mb);
+    }
+    virtual StringBuffer &toString(StringBuffer &out, unsigned format, bool header, const char *altText) const
+    {
+        if (ldInfo.ordinality())
+        {
+            unsigned msNow = msTick();
+            CDateTime time;
+            time.setNow();
+            time_t timeSimple = time.getSimple();
+
+            if (header)
+            {
+                switch (format)
+                {
+                    case 0: // internal
+                    {
+                        out.append("Locks on path: ").append(altText ? altText : xpath.get()).newline();
+                        out.append("Endpoint            |SessionId       |ConnectionId    |mode    |time(duration)]").newline();
+                        break;
+                    }
+                    case 1: // daliadmin
+                    {
+                        out.appendf("Server IP           |Session         |Mode    |Time                |Duration    |Lock").newline();
+                        out.appendf("====================|================|========|====================|============|======").newline();
+                        break;
+                    }
+                    default:
+                        throwUnexpected();
+                }
+            }
+
+            CDateTime timeLocked;
+            StringBuffer timeStr;
+            unsigned c = 0;
+            loop
+            {
+                CLockMetaData &lD = *ldInfo.item(c);
+                unsigned lockedFor = msNow-lD.timeLockObtained;
+                time_t tt = timeSimple - (lockedFor/1000);
+                timeLocked.set(tt);
+                timeLocked.getString(timeStr.clear());
+
+                switch (format)
+                {
+                    case 0: // internal
+                        out.appendf("%-20s|%-16" I64F "x|%-16" I64F "x|%-8x|%s(%d ms)", lD.queryEp(), lD.sessId, lD.connectionId, lD.mode, timeStr.str(), lockedFor);
+                        break;
+                    case 1: // daliadmin
+                        out.appendf("%-20s|%-16" I64F "x|%-8x|%-20s|%-12d|%s", lD.queryEp(), lD.sessId, lD.mode, timeStr.str(), lockedFor, altText ? altText : xpath.get());
+                        break;
+                    default:
+                        throwUnexpected();
+                }
+                ++c;
+                if (c>=ldInfo.ordinality())
+                    break;
+                out.newline();
+            }
+
+        }
+        return out;
+    }
+};
+
+ILockInfo *createLockInfo(const char *xpath, const ConnectionInfoMap &map)
+{
+    return new CLockInfo(xpath, map);
+}
+
+ILockInfo *deserializeLockInfo(MemoryBuffer &mb)
+{
+    return new CLockInfo(mb);
+}
+
+class CLockInfoCollection : public CSimpleInterfaceOf<ILockInfoCollection>
+{
+    CLockInfoArray locks;
+public:
+    CLockInfoCollection() { }
+    CLockInfoCollection(MemoryBuffer &mb)
+    {
+        unsigned lockCount;
+        mb.read(lockCount);
+        for (unsigned l=0; l<lockCount; l++)
+        {
+            Owned<ILockInfo> lockInfo = deserializeLockInfo(mb);
+            locks.append(* lockInfo.getClear());
+        }
+    }
+// ILockInfoCollection impl.
+    virtual unsigned queryLocks() const { return locks.ordinality(); }
+    virtual ILockInfo &queryLock(unsigned lock) const { return locks.item(lock); }
+    virtual void serialize(MemoryBuffer &mb) const
+    {
+        mb.append(locks.ordinality());
+        ForEachItemIn(l, locks)
+            locks.item(l).serialize(mb);
+    }
+    virtual StringBuffer &toString(StringBuffer &out) const
+    {
+        if (0 == locks.ordinality())
+            out.append("No current locks").newline();
+        else
+        {
+            ForEachItemIn(l, locks)
+                locks.item(l).toString(out, 0, true).newline();
+        }
+        return out;
+    }
+    virtual void add(ILockInfo &lock) { locks.append(lock); }
+};
+
+ILockInfoCollection *createLockInfoCollection()
+{
+    return new CLockInfoCollection();
+}
+
+ILockInfoCollection *deserializeLockInfoCollection(MemoryBuffer &mb)
+{
+    return new CLockInfoCollection(mb);
+}
+

+ 91 - 0
dali/base/dautils.hpp

@@ -71,6 +71,7 @@ public:
     void set(const CDfsLogicalFileName &lfn);
     void set(const char *scopes,const char *tail);
     bool setFromMask(const char *partmask,const char *rootdir=NULL);
+    bool setFromXPath(const char *xpath);
     void clear();
     bool isSet() const;
     /*
@@ -420,4 +421,94 @@ interface ILocalOrDistributedFile: extends IInterface
 
 extern da_decl ILocalOrDistributedFile* createLocalOrDistributedFile(const char *fname,IUserDescriptor *user,bool onlylocal,bool onlydfs,bool iswrite=false);
 
+typedef __int64 ConnectionId;
+
+struct LockData
+{
+    unsigned mode;
+    SessionId sessId;
+    unsigned timeLockObtained;
+
+    LockData(unsigned _mode, SessionId _sessId, unsigned _timeLockObtained) : mode(_mode), sessId(_sessId), timeLockObtained(_timeLockObtained)
+    {
+    }
+    LockData(const LockData &other)
+    {
+        mode = other.mode;
+        sessId = other.sessId;
+        timeLockObtained = other.timeLockObtained;
+    }
+    LockData(MemoryBuffer &mb)
+    {
+        mb.read(mode).read(sessId).read(timeLockObtained);
+    }
+    void serialize(MemoryBuffer &mb) const
+    {
+        mb.append(mode).append(sessId).append(timeLockObtained);
+    }
+};
+
+typedef MapBetween<ConnectionId, ConnectionId, LockData, LockData> ConnectionInfoMap;
+
+class CLockMetaData : public LockData
+{
+    mutable StringAttr ep;
+    mutable bool epResolved;
+public:
+    ConnectionId connectionId;
+
+    CLockMetaData(LockData &lD, ConnectionId _connectionId) : LockData(lD), connectionId(_connectionId)
+    {
+        epResolved = false;
+    }
+    CLockMetaData(MemoryBuffer &mb) : LockData(mb)
+    {
+        mb.read(connectionId).read(ep);
+        epResolved = true;
+    }
+    void serialize(MemoryBuffer &mb) const
+    {
+        LockData::serialize(mb);
+        mb.append(connectionId).append(queryEp());
+    }
+    const char *queryEp() const
+    {
+        if (!epResolved)
+        {
+            epResolved = true;
+            StringBuffer sessionEpStr;
+            querySessionManager().getClientProcessEndpoint(sessId, sessionEpStr);
+            ep.set(sessionEpStr);
+        }
+        return ep;
+    }
+};
+
+interface ILockInfo : extends IInterface
+{
+    virtual const char *queryXPath() const = 0;
+    virtual unsigned queryConnections() const = 0;
+    virtual CLockMetaData &queryLockData(unsigned lock) const = 0;
+    virtual void prune(const char *ipPattern) = 0;
+    virtual void serialize(MemoryBuffer &mb) const = 0;
+    virtual StringBuffer &toString(StringBuffer &out, unsigned format, bool header, const char *altText=NULL) const = 0;
+};
+
+extern da_decl ILockInfo *createLockInfo(const char *xpath, const ConnectionInfoMap &map);
+extern da_decl ILockInfo *deserializeLockInfo(MemoryBuffer &mb);
+
+typedef IArrayOf<ILockInfo> CLockInfoArray;
+typedef IIteratorOf<ILockInfo> ILockInfoIterator;
+
+interface ILockInfoCollection : extends IInterface
+{
+    virtual unsigned queryLocks() const = 0;
+    virtual ILockInfo &queryLock(unsigned lock) const = 0;
+    virtual void serialize(MemoryBuffer &mb) const = 0;
+    virtual StringBuffer &toString(StringBuffer &out) const = 0;
+    virtual void add(ILockInfo &lock) = 0;
+};
+extern da_decl ILockInfoCollection *createLockInfoCollection();
+extern da_decl ILockInfoCollection *deserializeLockInfoCollection(MemoryBuffer &mb);
+
 #endif

+ 68 - 135
dali/daliadmin/daliadmin.cpp

@@ -115,7 +115,7 @@ void usage(const char *exe)
   printf("  daliping [ <num> ]              -- time dali server connect\n");
   printf("  getxref <destxmlfile>           -- get all XREF information\n");
   printf("  dalilocks [ <ip-pattern> ] [ files ] -- get all locked files/xpaths\n");
-  printf("  unlock <xpath or logicalfile>   --  unlocks either matching xpath(s) or matching logical file(s), can contain wildcards\n");
+  printf("  unlock <xpath or logicalfile> <[path|file]> --  unlocks either matching xpath(s) or matching logical file(s), can contain wildcards\n");
   printf("  validatestore [fix=<true|false>]\n"
          "                [verbose=<true|false>]\n"
          "                [deletefiles=<true|false>]-- perform some checks on dali meta data an optionally fix or remove redundant info \n");
@@ -2271,144 +2271,71 @@ static bool begins(const char *&ln,const char *pat)
     return false;
 }
 
-static void dodalilocks(const char *pattern,const char *obj,Int64Array *conn,bool filesonly)
-{
-    StringBuffer buf;
-    getDaliDiagnosticValue("locks",buf);
-    for (int pass=(filesonly?1:0);pass<2;pass++) {
-        bool headerdone = false;
-        StringBuffer line;
-        StringBuffer curfile;
-        StringBuffer curxpath;
-        StringBuffer ips;
-        StringBuffer times;
-        StringBuffer sessid;
-        StringBuffer connid;
-        const char *s = buf.str();
-        loop {
-            line.clear();
-            while (*s&&(*s!='\n')) 
-                line.append(*(s++));
-            if (line.length()) {
-                const char *ln = line.str();
-                if (begins(ln,"Locks on path: ")) {
-                    curfile.clear();
-                    curxpath.clear();
-                    const char *x = ln;
-                    while (*x)
-                        curxpath.append(*(x++));
-                    if (begins(ln,"/Files")) {
-                        while (*ln&&(begins(ln,"/Scope[@name=\"")||begins(ln,"/File[@name=\"")||begins(ln,"/SuperFile[@name=\""))) {
-                            if (curfile.length())
-                                curfile.append("::");
-                            while (*ln&&(*ln!='"'))
-                                curfile.append(*(ln++));
-                            if (*ln=='"')
-                                ln++;
-                            if (*ln==']')
-                                ln++;
-                        }
-                    }
-                }
-                else if (isdigit(*ln)) {
-                    if (obj) 
-                        if (!curxpath.length()||!WildMatch(curxpath.str(),obj)) 
-                            if (!curfile.length()||!WildMatch(curfile.str(),obj)) 
-                                continue;
-                    if ((curfile.length()!=0)==(pass==1)) {
-                        ips.clear();
-                        while (*ln&&(*ln!=':'))
-                            ips.append(*(ln++));
-                        if (!pattern||WildMatch(ips.str(),pattern)) {
-                            ips.append(*ln++);
-                            while (isdigit(*ln))
-                                ips.append(*ln++);
-                            while (*ln!='|')    
-                                ln++;
-                            ln++; // sessid start
-                            sessid.clear();
-                            while (*ln!='|') {
-                                sessid.append(*ln);
-                                ln++;
-                            }
-                            sessid.clip();
-                            ln++; // connectid start
-                            connid.clear();
-                            while (*ln!='|') {
-                                connid.append(*ln);
-                                ln++;
-                            }
-                            connid.clip();
-                            ln++; // mode start
-                            unsigned mode = 0;
-                            while (isdigit(*ln))
-                                mode = mode*10+(*(ln++)-'0');
-                            while (*ln!='|')
-                                ln++;
-                            ln++; // duration start
-                            times.clear();
-                            while (*ln&&(*ln!='('))
-                                times.append(*(ln++));
-                            ln++;
-                            unsigned duration = 0;
-                            while (isdigit(*ln))
-                                duration = duration*10+(*(ln++)-'0');
-                            if (conn) {
-                                bool err;
-                                __int64 c = (__int64)hextoll(connid.str(),err);
-                                if (!err) {
-                                    bool found = false;
-                                    ForEachItemIn(i,*conn) 
-                                        if (c==conn->item(i))
-                                            found = true;
-                                    if (!found)
-                                        conn->append(c);
-                                }
-                            }
-                            else {
-                                if (!headerdone) {
-                                    OUTLOG( "\nServer IP         , session   ,mode, time              ,duration ,%s",pass?"File":"XPath");
-                                    OUTLOG(   "==================,===========,===,====================,=========,=====");
-                                    headerdone = true;
-                                }
-                                OUTLOG("%s, %s, %d, %s, %d, %s",ips.str(),sessid.str(),mode,times.str(),duration,pass?curfile.str():curxpath.str());
-                            }
-                        }
-                    }
-                }
-            }
-            if (!*s)
-                break;
-            s++;
+static void dalilocks(const char *ipPattern, bool files)
+{
+    Owned<ILockInfoCollection> lockInfoCollection = querySDS().getLocks(ipPattern, files ? "/Files/*" : NULL);
+    bool headers = true;
+    CDfsLogicalFileName dlfn;
+    for (unsigned l=0; l<lockInfoCollection->queryLocks(); l++)
+    {
+        ILockInfo &lockInfo = lockInfoCollection->queryLock(l);
+        if (files)
+        {
+            if (!dlfn.setFromXPath(lockInfo.queryXPath()))
+                continue;
         }
+        if (0 == lockInfo.queryConnections())
+            continue;
+        StringBuffer lockFormat;
+        lockInfo.toString(lockFormat, 1, headers, files ? dlfn.get() : NULL);
+        headers = false;
+        PROGLOG("%s", lockFormat.str());
+    }
+    if (headers) // if still true, no locks matched
+    {
+        printf("No lock(s) found\n");
+        return;
     }
-}
-
-static void dalilocks(const char *pattern,bool fileonly)
-{
-    dodalilocks(pattern,NULL,NULL,fileonly);
 }
 
 //=============================================================================
 
-static void unlock(const char *pattern)
-{
-    Int64Array conn;
-    dodalilocks(NULL,pattern,&conn,false);
-    ForEachItemIn(i,conn) {
-        MemoryBuffer mb;
-        __int64 connectionId = conn.item(i);
-        bool success;
-        bool disconnect = false;        // TBD?
-        mb.append("unlock").append(connectionId).append(disconnect);
-        getDaliDiagnosticValue(mb);
-        mb.read(success);
-        StringBuffer connectionInfo;
-        if (!success)
-            PROGLOG("Lock %" I64F "x not found",connectionId);
-        else {
-            mb.read(connectionInfo);
-            PROGLOG("Lock %" I64F "x successfully removed: %s", connectionId, connectionInfo.str());
+static void unlock(const char *pattern, bool files)
+{
+    Owned<ILockInfoCollection> lockInfoCollection = querySDS().getLocks(NULL, files ? "/Files/*" : pattern);
+    for (unsigned l=0; l<lockInfoCollection->queryLocks(); l++)
+    {
+        ILockInfo &lockInfo = lockInfoCollection->queryLock(l);
+        bool match = false;
+        if (files)
+        {
+            CDfsLogicalFileName dlfn;
+            dlfn.setAllowWild(true);
+            if (dlfn.setFromXPath(lockInfo.queryXPath()))
+                match = WildMatch(dlfn.get(), pattern);
+        }
+        else
+            match = WildMatch(lockInfo.queryXPath(), pattern);
+        if (match)
+        {
+            for (unsigned c=0; c<lockInfo.queryConnections(); c++)
+            {
+                ConnectionId connectionId = lockInfo.queryLockData(c).connectionId;
+                bool disconnect = false;        // TBD?
+                MemoryBuffer mb;
+                mb.append("unlock").append(connectionId).append(disconnect);
+                getDaliDiagnosticValue(mb);
+                bool success;
+                mb.read(success);
+                if (!success)
+                    PROGLOG("Lock %" I64F "x not found",connectionId);
+                else
+                {
+                    StringBuffer connectionInfo;
+                    mb.read(connectionInfo);
+                    PROGLOG("Lock %" I64F "x successfully removed: %s", connectionId, connectionInfo.str());
+                }
+            }
         }
     }
 }
@@ -2978,8 +2905,14 @@ int main(int argc, char* argv[])
                         dalilocks(np>0?params.item(1):NULL,filesonly);
                     }
                     else if (stricmp(cmd,"unlock")==0) {
-                        CHECKPARAMS(1,1);
-                        unlock(params.item(1));
+                        CHECKPARAMS(2,2);
+                        const char *fileOrPath = params.item(2);
+                        if (0 == stricmp("file", fileOrPath))
+                            unlock(params.item(1), true);
+                        else if (0 == stricmp("path", fileOrPath))
+                            unlock(params.item(1), false);
+                        else
+                            throw MakeStringException(0, "unknown type [ %s ], must be 'file' or 'path'", fileOrPath);
                     }
                     else if (stricmp(cmd,"validateStore")==0) {
                         CHECKPARAMS(0,2);

+ 22 - 71
dali/dalidiag/dalidiag.cpp

@@ -26,6 +26,7 @@
 #include "danqs.hpp"
 #include "dasds.hpp"
 #include "dadfs.hpp"
+#include "dautils.hpp"
 #include "jptree.hpp"
 #include "jlzw.hpp"
 
@@ -457,73 +458,6 @@ static bool begins(const char *&ln,const char *pat)
     return false;
 }
 
-void fileLocks(const char *ip)
-{
-    StringBuffer buf;
-    getDaliDiagnosticValue("locks",buf);
-    StringBuffer line;
-    StringBuffer curfile;
-    StringBuffer ips;
-    const char *s = buf.str();
-    loop {
-        line.clear();
-        while (*s&&(*s!='\n')) 
-            line.append(*(s++));
-        if (line.length()) {
-            const char *ln = line.str();
-            if (begins(ln,"Locks on path: ")) {
-                curfile.clear();
-                if (begins(ln,"/Files")) {
-                    while (*ln&&(begins(ln,"/Scope[@name=\"")||begins(ln,"/File[@name=\""))) {
-                        if (curfile.length())
-                            curfile.append("::");
-                        while (*ln&&(*ln!='"'))
-                            curfile.append(*(ln++));
-                        if (*ln=='"')
-                            ln++;
-                        if (*ln==']')
-                            ln++;
-                    }
-                }
-            }
-            else if (isdigit(*ln)) {
-                if (curfile.length()) {
-                    ips.clear();
-                    while (*ln&&(*ln!=':'))
-                        ips.append(*(ln++));
-                    if (!ip||(strcmp(ips.str(),ip)==0)) {
-                        ips.append(*ln++);
-                        while (isdigit(*ln))
-                            ips.append(*ln++);
-                        while (*ln!='|')    
-                            ln++;
-                        ln++; // sessid start
-                        while (*ln!='|')
-                            ln++;
-                        ln++; // connectid start
-                        while (*ln!='|')
-                            ln++;
-                        ln++; // mode start
-                        unsigned mode = 0;
-                        while (isdigit(*ln))
-                            mode = mode*10+(*(ln++)-'0');
-                        while (*ln!='|')
-                            ln++;
-                        ln++; // duration start
-                        unsigned duration = 0;
-                        while (isdigit(*ln))
-                            duration = duration*10+(*(ln++)-'0');
-                        printf("%s, %d, %d, %s\n",ips.str(),mode,duration,curfile.str());
-                    }
-                }
-            }
-        }
-        if (!*s)
-            break;
-        s++;
-    }
-}
-
 // NB: there's strtoll under Linux
 static unsigned __int64 hextoll(const char *str, bool *error=NULL)
 {
@@ -671,10 +605,6 @@ int main(int _argc, char* argv[])
                     nqPingPong(argv[i+1],i+2<argc?argv[i+2]:NULL);
                     break;
                 }
-                if ((i<argc)&&(stricmp(arg,"filelocks")==0)) {
-                    fileLocks(i+1<argc?argv[i+1]:NULL);
-                    break;
-                }
                 if ((i+1<argc)&&(stricmp(arg,"unlock")==0)) {
                     MemoryBuffer mb;
                     __int64 connectionId;
@@ -743,6 +673,27 @@ int main(int _argc, char* argv[])
                     PROGLOG("SDS store saved");
                     break;
                 }
+                if (0 == stricmp(arg, "locks")) {
+                    Owned<ILockInfoCollection> lockInfoCollection = querySDS().getLocks();
+                    lockInfoCollection->toString(buf);
+                    printf("\n%s:\n%s",arg,buf.str());
+                    break;
+                }
+                if (0 == stricmp(arg,"sdsstats")) {
+                    querySDS().getUsageStats(buf);
+                    printf("\n%s:\n%s",arg,buf.str());
+                    break;
+                }
+                if (0 == stricmp(arg, "connections")) {
+                    querySDS().getConnections(buf);
+                    printf("\n%s:\n%s",arg,buf.str());
+                    break;
+                }
+                if (0 == stricmp(arg, "sdssubscribers")) {
+                    querySDS().getSubscribers(buf);
+                    printf("\n%s:\n%s",arg,buf.str());
+                    break;
+                }
                 else {
                     loop {
                         getDaliDiagnosticValue(arg,buf.clear());

+ 1 - 0
system/jlib/jstring.hpp

@@ -558,6 +558,7 @@ extern jlib_decl bool endsWithIgnoreCase(const char* src, const char* dst);
 inline bool strieq(const char* s, const char* t) { return stricmp(s,t)==0; }
 inline bool streq(const char* s, const char* t) { return strcmp(s,t)==0; }
 inline bool strsame(const char* s, const char* t) { return (s == t) || (s && t && strcmp(s,t)==0); }  // also allow nulls
+inline bool isEmptyString(const char *text) { return !text||!*text; }
 
 extern jlib_decl char *j_strtok_r(char *str, const char *delim, char **saveptr);
 extern jlib_decl int j_memicmp (const void *s1, const void *s2, size32_t len);