|
@@ -2933,7 +2933,7 @@ inline void appendErr3(MemoryBuffer &reply, RemoteFileCommandType e, int code, c
|
|
|
|
|
|
|
|
|
|
|
|
-#define MAPCOMMAND(c,p) case c: { ret = this->p(msg, reply) ; break; }
|
|
|
+#define MAPCOMMAND(c,p) case c: { ret = this->p(msg, reply) ; break; }
|
|
|
#define MAPCOMMANDCLIENT(c,p,client) case c: { ret = this->p(msg, reply, client); break; }
|
|
|
#define MAPCOMMANDCLIENTTHROTTLE(c,p,client,throttler) case c: { ret = this->p(msg, reply, client, throttler); break; }
|
|
|
#define MAPCOMMANDSTATS(c,p,stats) case c: { ret = this->p(msg, reply, stats); break; }
|
|
@@ -2946,39 +2946,38 @@ static CriticalSection ClientCountSect;
|
|
|
#define DEFAULT_THROTTLOG_LOG_INTERVAL_SECS 60 // log total throttled delay period
|
|
|
|
|
|
|
|
|
-struct ClientStats
|
|
|
+class CClientStats : public CInterface
|
|
|
{
|
|
|
- ClientStats(const char *_client) : client(_client) { count = 0; bRead = 0; bWritten = 0; }
|
|
|
+ SpinLock spin;
|
|
|
+public:
|
|
|
+ CClientStats(const char *_client) : client(_client) { count = 0; bRead = 0; bWritten = 0; }
|
|
|
const char *queryFindString() const { return client; }
|
|
|
+ inline void addRead(unsigned len)
|
|
|
+ {
|
|
|
+ SpinBlock b(spin); // rare event, but we should change to a atomic<__int64> for >= c++11
|
|
|
+ bRead += len;
|
|
|
+ }
|
|
|
+ inline void addWrite(unsigned len)
|
|
|
+ {
|
|
|
+ SpinBlock b(spin); // rare event, but we should change to a atomic<__int64> for >= c++11
|
|
|
+ bWritten += len;
|
|
|
+ }
|
|
|
|
|
|
StringAttr client;
|
|
|
unsigned __int64 count;
|
|
|
unsigned __int64 bRead;
|
|
|
unsigned __int64 bWritten;
|
|
|
};
|
|
|
-class CClientStatsTable : public StringSuperHashTableOf<ClientStats>
|
|
|
+class CClientStatsTable : public OwningStringSuperHashTableOf<CClientStats>
|
|
|
{
|
|
|
- typedef StringSuperHashTableOf<ClientStats> PARENT;
|
|
|
+ typedef OwningStringSuperHashTableOf<CClientStats> PARENT;
|
|
|
CriticalSection crit;
|
|
|
unsigned cmdStats[RFCmax];
|
|
|
|
|
|
- inline ClientStats *addClientCommon(RemoteFileCommandType cmd, const char *client)
|
|
|
- {
|
|
|
- ClientStats *stats = PARENT::find(client);
|
|
|
- if (!stats)
|
|
|
- {
|
|
|
- stats = new ClientStats(client);
|
|
|
- PARENT::replace(*stats);
|
|
|
- }
|
|
|
- cmdStats[cmd]++;
|
|
|
- ++stats->count;
|
|
|
- return stats;
|
|
|
- }
|
|
|
-
|
|
|
static int compareElement(void* const *ll, void* const *rr)
|
|
|
{
|
|
|
- const ClientStats *l = (const ClientStats *) *ll;
|
|
|
- const ClientStats *r = (const ClientStats *) *rr;
|
|
|
+ const CClientStats *l = (const CClientStats *) *ll;
|
|
|
+ const CClientStats *r = (const CClientStats *) *rr;
|
|
|
if (l->count == r->count)
|
|
|
return 0;
|
|
|
else if (l->count<r->count)
|
|
@@ -2995,30 +2994,19 @@ public:
|
|
|
{
|
|
|
kill();
|
|
|
}
|
|
|
- void kill()
|
|
|
+ CClientStats *getClientReference(RemoteFileCommandType cmd, const char *client)
|
|
|
{
|
|
|
- SuperHashIteratorOf<ClientStats> iter(*this);
|
|
|
- ForEach(iter)
|
|
|
+ CriticalBlock b(crit);
|
|
|
+ CClientStats *stats = PARENT::find(client);
|
|
|
+ if (!stats)
|
|
|
{
|
|
|
- ClientStats *elem = &iter.query();
|
|
|
- delete elem;
|
|
|
+ stats = new CClientStats(client);
|
|
|
+ PARENT::replace(*stats);
|
|
|
}
|
|
|
- PARENT::kill();
|
|
|
- }
|
|
|
- ClientStats *addClientReference(RemoteFileCommandType cmd, const char *client)
|
|
|
- {
|
|
|
- CriticalBlock b(crit);
|
|
|
- return addClientCommon(cmd, client);
|
|
|
- }
|
|
|
- void addRead(ClientStats &stats, unsigned len)
|
|
|
- {
|
|
|
- CriticalBlock b(crit);
|
|
|
- stats.bRead += len;
|
|
|
- }
|
|
|
- void addWrite(ClientStats &stats, unsigned len)
|
|
|
- {
|
|
|
- CriticalBlock b(crit);
|
|
|
- stats.bWritten += len;
|
|
|
+ if (cmd<RFCmax) // i.e. ignore duff command (which will be traced), but still record client connected
|
|
|
+ cmdStats[cmd]++;
|
|
|
+ ++stats->count;
|
|
|
+ return LINK(stats);
|
|
|
}
|
|
|
StringBuffer &getInfo(StringBuffer &info, unsigned level=1)
|
|
|
{
|
|
@@ -3040,11 +3028,11 @@ public:
|
|
|
}
|
|
|
if (totalClients)
|
|
|
{
|
|
|
- SuperHashIteratorOf<ClientStats> iter(*this);
|
|
|
- PointerArrayOf<ClientStats> elements;
|
|
|
+ SuperHashIteratorOf<CClientStats> iter(*this);
|
|
|
+ PointerArrayOf<CClientStats> elements;
|
|
|
ForEach(iter)
|
|
|
{
|
|
|
- ClientStats &elem = iter.query();
|
|
|
+ CClientStats &elem = iter.query();
|
|
|
elements.append(&elem);
|
|
|
}
|
|
|
elements.sort(&compareElement);
|
|
@@ -3057,7 +3045,7 @@ public:
|
|
|
info.append("Top 10 clients:").newline();
|
|
|
for (unsigned e=0; e<max; e++)
|
|
|
{
|
|
|
- const ClientStats &element = *elements.item(e);
|
|
|
+ const CClientStats &element = *elements.item(e);
|
|
|
info.appendf("Client %s - %" I64F "d requests handled, bytes read = %" I64F "d, bytes written = % " I64F "d",
|
|
|
element.client.get(), element.count, element.bRead, element.bWritten).newline();
|
|
|
}
|
|
@@ -3067,7 +3055,7 @@ public:
|
|
|
info.append("All clients:").newline();
|
|
|
ForEachItemIn(e, elements)
|
|
|
{
|
|
|
- const ClientStats &element = *elements.item(e);
|
|
|
+ const CClientStats &element = *elements.item(e);
|
|
|
info.appendf("Client %s - %" I64F "d requests handled, bytes read = %" I64F "d, bytes written = % " I64F "d",
|
|
|
element.client.get(), element.count, element.bRead, element.bWritten).newline();
|
|
|
}
|
|
@@ -3111,25 +3099,32 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer
|
|
|
: socket(_socket), user(_user), globallasttick(_globallasttick)
|
|
|
{
|
|
|
previdx = (unsigned)-1;
|
|
|
- if (socket)
|
|
|
+ StringBuffer peerBuf;
|
|
|
+ char name[256];
|
|
|
+ name[0] = 0;
|
|
|
+ int port = socket->peer_name(name,sizeof(name)-1);
|
|
|
+ if (port>=0)
|
|
|
{
|
|
|
- StringBuffer peerBuf;
|
|
|
- char name[256];
|
|
|
- name[0] = 0;
|
|
|
- int port = socket->peer_name(name,sizeof(name)-1);
|
|
|
- if (port>=0)
|
|
|
- {
|
|
|
- peerBuf.append(name);
|
|
|
- if (port)
|
|
|
- peerBuf.append(':').append(port);
|
|
|
- peerName.set(peerBuf);
|
|
|
- }
|
|
|
+ peerBuf.append(name);
|
|
|
+ if (port)
|
|
|
+ peerBuf.append(':').append(port);
|
|
|
+ peerName.set(peerBuf);
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ /* There's a possibility the socket closed before got here, in which case, peer name is unavailable
|
|
|
+ * May potentially be unavailable for other reasons also.
|
|
|
+ * Must be set, as used in client stats HT.
|
|
|
+ * If socket closed, the handler will start up but notice closed and quit
|
|
|
+ */
|
|
|
+ peerName.set("UNKNOWN PEER NAME");
|
|
|
}
|
|
|
{
|
|
|
CriticalBlock block(ClientCountSect);
|
|
|
if (++ClientCount>MaxClientCount)
|
|
|
MaxClientCount = ClientCount;
|
|
|
- if (TF_TRACE_CLIENT_CONN) {
|
|
|
+ if (TF_TRACE_CLIENT_CONN)
|
|
|
+ {
|
|
|
StringBuffer s;
|
|
|
s.appendf("Connecting(%p) [%d,%d] to ",this,ClientCount,MaxClientCount);
|
|
|
s.append(peerName);
|
|
@@ -3352,6 +3347,7 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer
|
|
|
*/
|
|
|
selecthandled = true;
|
|
|
parent->addClient(this); // add to select handler
|
|
|
+ // NB: this (CRemoteClientHandler) is now linked by the selecthandler and owned by the 'clients' list
|
|
|
}
|
|
|
}
|
|
|
|
|
@@ -3456,7 +3452,7 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer
|
|
|
unsigned queryQueueLimit() const { return queueLimit; }
|
|
|
StringBuffer &getInfoSummary(StringBuffer &info)
|
|
|
{
|
|
|
- info.appendf("Thottler(%s) - limit=%u, delayMs=%u, cpuThreshold=%u, queueLimit=%u", title.get(), limit, delayMs, cpuThreshold, queueLimit).newline();
|
|
|
+ info.appendf("Throttler(%s) - limit=%u, delayMs=%u, cpuThreshold=%u, queueLimit=%u", title.get(), limit, delayMs, cpuThreshold, queueLimit).newline();
|
|
|
unsigned elapsedSecs = totalThrottleDelayTimer.elapsedMs()/1000;
|
|
|
time_t simple;
|
|
|
time(&simple);
|
|
@@ -3593,10 +3589,7 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer
|
|
|
* NB: The overall number of threads is still capped by the thread pool.
|
|
|
*/
|
|
|
unsigned ms = timer.elapsedMs();
|
|
|
- {
|
|
|
- CriticalBlock b(crit);
|
|
|
- totalThrottleDelay += ms;
|
|
|
- }
|
|
|
+ totalThrottleDelay += ms;
|
|
|
PROGLOG("Throttler(%s): transaction delayed [cmd=%s] for : %u milliseconds, proceeding as cpu(%u)<throttleCPULimit(%u)", title.get(), getRFCText(cmd), cpu, ms, cpuThreshold);
|
|
|
hadSem = false;
|
|
|
}
|
|
@@ -3623,7 +3616,7 @@ class CRemoteFileServer : public CInterface, implements IRemoteFileServer
|
|
|
|
|
|
/* Whilst holding on this throttle slot (i.e. before signalling semaphore back), process
|
|
|
* queued items. NB: other threads that are finishing will do also.
|
|
|
- * Queued items are processed 1st, the current request, then anything that was queued when handling current request
|
|
|
+ * Queued items are processed 1st, then the current request, then anything that was queued when handling current request
|
|
|
* Throttle slot (semaphore) is only given back when no more to do.
|
|
|
*/
|
|
|
Linked<CRemoteClientHandler> currentClient;
|
|
@@ -3929,8 +3922,14 @@ public:
|
|
|
StringBuffer s(client->queryPeerName());
|
|
|
PROGLOG("onCloseSocket(%d) %s",which,s.str());
|
|
|
#endif
|
|
|
- if (client->socket) {
|
|
|
- try {
|
|
|
+ if (client->socket)
|
|
|
+ {
|
|
|
+ try
|
|
|
+ {
|
|
|
+ /* JCSMORE - shouldn't this really be dependent on whether selecthandled=true
|
|
|
+ * It has not been added to the selecthandler
|
|
|
+ * Harmless, but wasteful if so.
|
|
|
+ */
|
|
|
selecthandler->remove(client->socket);
|
|
|
}
|
|
|
catch (IException *e) {
|
|
@@ -4018,7 +4017,7 @@ public:
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- bool cmdRead(MemoryBuffer & msg, MemoryBuffer & reply, ClientStats &stats)
|
|
|
+ bool cmdRead(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
|
|
|
{
|
|
|
int handle;
|
|
|
__int64 pos;
|
|
@@ -4052,7 +4051,7 @@ public:
|
|
|
e->Release();
|
|
|
return false;
|
|
|
}
|
|
|
- clientStatsTable.addRead(stats, len);
|
|
|
+ stats.addRead(len);
|
|
|
if (TF_TRACE)
|
|
|
PROGLOG("read file, handle = %d, pos = %" I64F "d, toread = %d, read = %d",handle,pos,len,numRead);
|
|
|
{
|
|
@@ -4092,7 +4091,7 @@ public:
|
|
|
}
|
|
|
|
|
|
|
|
|
- bool cmdWrite(MemoryBuffer & msg, MemoryBuffer & reply, ClientStats &stats)
|
|
|
+ bool cmdWrite(MemoryBuffer & msg, MemoryBuffer & reply, CClientStats &stats)
|
|
|
{
|
|
|
int handle;
|
|
|
__int64 pos;
|
|
@@ -4106,7 +4105,7 @@ public:
|
|
|
if (TF_TRACE_PRE_IO)
|
|
|
PROGLOG("before write file, handle = %d, towrite = %d",handle,len);
|
|
|
size32_t numWritten = fileio->write(pos,len,data);
|
|
|
- clientStatsTable.addWrite(stats, numWritten);
|
|
|
+ stats.addWrite(numWritten);
|
|
|
if (TF_TRACE)
|
|
|
PROGLOG("write file, handle = %d, towrite = %d, written = %d",handle,len,numWritten);
|
|
|
reply.append((unsigned)RFEnoerror).append(numWritten);
|
|
@@ -4198,7 +4197,7 @@ public:
|
|
|
return true;
|
|
|
}
|
|
|
|
|
|
- bool cmdAppend(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client, ClientStats &stats)
|
|
|
+ bool cmdAppend(MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler &client, CClientStats &stats)
|
|
|
{
|
|
|
IMPERSONATE_USER(client);
|
|
|
int handle;
|
|
@@ -4212,7 +4211,7 @@ public:
|
|
|
|
|
|
Owned<IFile> file = createIFile(srcname.get());
|
|
|
__int64 written = fileio->appendFile(file,pos,len);
|
|
|
- clientStatsTable.addWrite(stats, written);
|
|
|
+ stats.addWrite(written);
|
|
|
if (TF_TRACE)
|
|
|
PROGLOG("append file, handle = %d, file=%s, pos = %" I64F "d len = %" I64F "d written = %" I64F "d",handle,srcname.get(),pos,len,written);
|
|
|
reply.append((unsigned)RFEnoerror).append(written);
|
|
@@ -4854,7 +4853,7 @@ public:
|
|
|
|
|
|
bool processCommand(RemoteFileCommandType cmd, MemoryBuffer & msg, MemoryBuffer & reply, CRemoteClientHandler *client, CThrottler *throttler)
|
|
|
{
|
|
|
- ClientStats *stats = clientStatsTable.addClientReference(cmd, client->queryPeerName());
|
|
|
+ Owned<CClientStats> stats = clientStatsTable.getClientReference(cmd, client->queryPeerName());
|
|
|
bool ret = true;
|
|
|
try
|
|
|
{
|