Browse Source

Merge pull request #7785 from wangkx/h13537

HPCC-13547 Add WsSMC.LockQuery to report locks

Reviewed-By: Anthony Fishbeck <anthony.fishbeck@lexisnexis.com>
Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 9 years ago
parent
commit
231287c4e4
3 changed files with 222 additions and 0 deletions
  1. 40 0
      esp/scm/ws_smc.ecm
  2. 179 0
      esp/services/ws_smc/ws_smcService.cpp
  3. 3 0
      esp/services/ws_smc/ws_smcService.hpp

+ 40 - 0
esp/scm/ws_smc.ecm

@@ -358,6 +358,45 @@ GetStatusServerInfoResponse
     ESPstruct StatusServerInfo StatusServerInfo;
 };
 
+ESPenum LockModes : string
+{
+    ALL("ALL"),
+    READ("READ"),
+    WRITE("WRITE"),
+    HOLD("HOLD"),
+    SUB("SUB")
+};
+
+ESPStruct [nil_remove] Lock
+{
+    string EPIP;
+    string XPath;
+    string LogicalFile;
+    int64 SessionID;
+    unsigned DurationMS;
+    string TimeLocked;
+    string Modes;
+    ESParray<string> ModeNames;
+};
+
+ESPrequest [nil_remove] LockQueryRequest
+{
+    string EPIP;
+    string XPath;
+    unsigned DurationMSLow;
+    unsigned DurationMSHigh;
+    string TimeLockedLow;
+    string TimeLockedHigh;
+    ESPenum LockModes Mode;
+    bool AllFileLocks(false);
+};
+
+ESPresponse [exceptions_inline] LockQueryResponse
+{
+    ESParray<ESPstruct Lock> Locks;
+    int NumLocks;
+};
+
 ESPservice [noforms, version("1.19"), exceptions_inline("./smc_xslt/exceptions.xslt"), use_method_name] WsSMC
 {
     ESPmethod Index(SMCIndexRequest, SMCIndexResponse);
@@ -384,6 +423,7 @@ ESPservice [noforms, version("1.19"), exceptions_inline("./smc_xslt/exceptions.x
 
     ESPmethod RoxieControlCmd(RoxieControlCmdRequest, RoxieControlCmdResponse);
     ESPmethod GetStatusServerInfo(GetStatusServerInfoRequest, GetStatusServerInfoResponse);
+    ESPmethod LockQuery(LockQueryRequest, LockQueryResponse);
 };
 
 SCMexportdef(WSSMC);

+ 179 - 0
esp/services/ws_smc/ws_smcService.cpp

@@ -26,6 +26,7 @@
 #include "wshelpers.hpp"
 
 #include "dalienv.hpp"
+#include "dasds.hpp"
 #include "WUWrapper.hpp"
 #include "dfuwu.hpp"
 #include "exception_util.hpp"
@@ -2346,3 +2347,181 @@ void CWsSMCEx::setActiveWUs(IEspContext &context, IEspActiveWorkunit& wu, IEspAc
     }
 }
 
+static const char *LockModeNames[] = { "ALL", "READ", "WRITE", "HOLD", "SUB" };
+
+void CWsSMCEx::addLockInfo(CLockMetaData& lD, const char* xPath, const char* lfn, unsigned msNow, time_t ttNow, IArrayOf<IEspLock>& locks)
+{
+    Owned<IEspLock> lock = createLock();
+    if (xPath && *xPath)
+        lock->setXPath(xPath);
+    else if (lfn && *lfn)
+        lock->setLogicalFile(lfn);
+    else
+        return; //Should not happen
+    lock->setEPIP(lD.queryEp());
+    lock->setSessionID(lD.sessId);
+
+    unsigned duration = msNow-lD.timeLockObtained;
+    lock->setDurationMS(duration);
+
+    CDateTime timeLocked;
+    StringBuffer timeStr;
+    time_t ttLocked = ttNow - duration/1000;
+    timeLocked.set(ttLocked);
+    timeLocked.getString(timeStr);
+    lock->setTimeLocked(timeStr.str());
+
+    unsigned mode = lD.mode;
+    VStringBuffer modeStr("%x", mode);
+    lock->setModes(modeStr.str());
+
+    StringArray modes;
+    if (RTM_MODE(mode, RTM_LOCK_READ))
+        modes.append(LockModeNames[CLockModes_READ]);
+    if (RTM_MODE(mode, RTM_LOCK_WRITE))
+        modes.append(LockModeNames[CLockModes_WRITE]);
+    if (RTM_MODE(mode, RTM_LOCK_HOLD)) // long-term lock
+        modes.append(LockModeNames[CLockModes_HOLD]);
+    if (RTM_MODE(mode, RTM_LOCK_SUB)) // locks all descendants as well as self
+        modes.append(LockModeNames[CLockModes_SUB]);
+    lock->setModeNames(modes);
+    locks.append(*lock.getClear());
+}
+
+bool CWsSMCEx::onLockQuery(IEspContext &context, IEspLockQueryRequest &req, IEspLockQueryResponse &resp)
+{
+    class CLockPostFilter
+    {
+        CLockModes mode;
+        time_t ttLTLow, ttLTHigh;
+        bool checkLTLow, checkLTHigh;
+        int durationLow, durationHigh;
+
+        bool checkMode(unsigned lockMode)
+        {
+            unsigned modeReq;
+            switch (mode)
+            {
+            case CLockModes_READ:
+                modeReq = RTM_LOCK_READ;
+                break;
+            case CLockModes_WRITE:
+                modeReq = RTM_LOCK_WRITE;
+                break;
+            case CLockModes_HOLD:
+                modeReq = RTM_LOCK_HOLD;
+                break;
+            case CLockModes_SUB:
+                modeReq = RTM_LOCK_SUB;
+                break;
+            default:
+                return true;
+            }
+            if (lockMode & modeReq)
+                return true;
+
+            return false;
+        }
+    public:
+        CLockPostFilter(IEspLockQueryRequest& req)
+        {
+            mode = req.getMode();
+            if (mode == LockModes_Undefined)
+                throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Lock Mode");
+
+            if (req.getDurationMSLow_isNull())
+                durationLow = -1;
+            else
+                durationLow = req.getDurationMSLow();
+            if (req.getDurationMSHigh_isNull())
+                durationHigh = -1;
+            else
+                durationHigh = req.getDurationMSHigh();
+            const char* timeLow = req.getTimeLockedLow();
+            if (!timeLow || !*timeLow)
+                checkLTLow = false;
+            else
+            {
+                CDateTime dtLow;
+                dtLow.setString(timeLow, NULL, false);
+                ttLTLow = dtLow.getSimple();
+                checkLTLow = true;
+            }
+            const char* timeHigh = req.getTimeLockedHigh();
+            if (!timeHigh || !*timeHigh)
+                checkLTHigh = false;
+            else
+            {
+                CDateTime dtHigh;
+                dtHigh.setString(timeHigh, NULL, false);
+                ttLTHigh = dtHigh.getSimple();
+                checkLTHigh = true;
+            }
+        }
+        bool check(CLockMetaData& lD, unsigned msNow, time_t ttNow)
+        {
+            if (!checkMode(lD.mode))
+                return false;
+
+            int duration = msNow-lD.timeLockObtained;
+            if (durationLow > duration)
+                return false;
+            if ((durationHigh >= 0) && (durationHigh < duration))
+                return false;
+
+            if (checkLTLow && (ttNow - duration/1000 < ttLTLow))
+                return false;
+            if (checkLTHigh && (ttNow - duration/1000 > ttLTHigh))
+                return false;
+
+            return true;
+        }
+    };
+
+    try
+    {
+        CLockPostFilter postFilter(req);
+        StringBuffer xPath;
+        if (req.getAllFileLocks())
+            xPath.appendf("/%s/*", querySdsFilesRoot());
+        else
+            xPath = req.getXPath();
+
+        Owned<ILockInfoCollection> lockInfoCollection = querySDS().getLocks(req.getEPIP(), xPath.str());
+
+        IArrayOf<IEspLock> locks;
+        CDateTime time;
+        time.setNow();
+        time_t ttNow = time.getSimple();
+        unsigned msNow = msTick();
+        for (unsigned l=0; l<lockInfoCollection->queryLocks(); l++)
+        {
+            ILockInfo& lockInfo = lockInfoCollection->queryLock(l);
+
+            CDfsLogicalFileName dlfn;
+            const char* lfn = NULL;
+            const char* xPath = NULL;
+            if (dlfn.setFromXPath(lockInfo.queryXPath()))
+                lfn = dlfn.get();
+            else
+                xPath = lockInfo.queryXPath();
+            for (unsigned i=0; i<lockInfo.queryConnections(); i++)
+            {
+                CLockMetaData& lMD = lockInfo.queryLockData(i);
+                if (postFilter.check(lMD, msNow, ttNow))
+                    addLockInfo(lMD, xPath, lfn, msNow, ttNow, locks);
+            }
+        }
+        unsigned numLocks = locks.length();
+        if (numLocks)
+            resp.setLocks(locks);
+        resp.setNumLocks(numLocks);
+    }
+    catch(IException* e)
+    {
+        FORWARDEXCEPTION(context, e,  ECLWATCH_INTERNAL_ERROR);
+    }
+
+    return true;
+}
+

+ 3 - 0
esp/services/ws_smc/ws_smcService.hpp

@@ -19,6 +19,7 @@
 #define _ESPWIZ_WsSMC_HPP__
 
 #include "ws_smc_esp.ipp"
+#include "dautils.hpp"
 #include "wujobq.hpp"
 #include "TpWrapper.hpp"
 #include "WUXMLInfo.hpp"
@@ -194,6 +195,7 @@ public:
     virtual bool onBrowseResources(IEspContext &context, IEspBrowseResourcesRequest & req, IEspBrowseResourcesResponse & resp);
     virtual bool onRoxieControlCmd(IEspContext &context, IEspRoxieControlCmdRequest &req, IEspRoxieControlCmdResponse &resp);
     virtual bool onGetStatusServerInfo(IEspContext &context, IEspGetStatusServerInfoRequest &req, IEspGetStatusServerInfoResponse &resp);
+    virtual bool onLockQuery(IEspContext &context, IEspLockQueryRequest &req, IEspLockQueryResponse &resp);
 private:
     void addCapabilities( IPropertyTree* pFeatureNode, const char* access, IArrayOf<IEspCapability>& capabilities);
     void addServerJobQueue(IArrayOf<IEspServerJobQueue>& jobQueues, const char* queueName, const char* serverName,
@@ -231,6 +233,7 @@ private:
     void setActiveWUs(IEspContext &context, const char *serverType, const char *clusterName, const char *queueName, const IArrayOf<IEspActiveWorkunit>& aws, IEspStatusServerInfo& statusServerInfo);
     void setActiveWUs(IEspContext &context, const char *serverType, const char *instance, const IArrayOf<IEspActiveWorkunit>& aws, IEspStatusServerInfo& statusServerInfo);
     void setActiveWUs(IEspContext &context, IEspActiveWorkunit& wu, IEspActiveWorkunit* wuToSet);
+    void addLockInfo(CLockMetaData& lD, const char* xPath, const char* lfn, unsigned msNow, time_t ttNow, IArrayOf<IEspLock>& locks);
 };