/*############################################################################## Copyright (C) 2011 HPCC Systems. All rights reserved. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . ############################################################################## */ #include #include #include "jmisc.hpp" #include "jlib.hpp" #include "jsocket.hpp" #include "jstream.ipp" #include "portlist.h" #include "jdebug.hpp" #include "jthread.hpp" #include "jfile.hpp" bool abortEarly = false; bool forceHTTP = false; bool abortAfterFirst = false; bool echoResults = false; bool saveResults = true; bool showTiming = false; bool showStatus = true; bool sendToSocket = false; bool parallelBlocked = false; bool justResults = false; bool multiThread = false; bool manyResults = false; bool sendFileAfterQuery = false; bool doLock = false; bool roxieLogMode = false; StringBuffer sendFileName; StringAttr queryNameOverride; unsigned delay = 0; unsigned runningQueries; unsigned multiThreadMax; unsigned maxLineSize = 10000000; ISocket *persistSocket; bool persistConnections; int repeats = 0; StringBuffer queryPrefix; Semaphore okToSend; Semaphore done; Semaphore finishedReading; FILE * trace; CriticalSection traceCrit; //--------------------------------------------------------------------------- void SplitIpPort(StringAttr & ip, unsigned & port, const char * address) { const char * colon = strchr(address, ':'); if (colon) { ip.set(address,colon-address); port = atoi(colon+1); } else ip.set(address); } void showMessage(const char * text) { if (!justResults) { if (echoResults) fwrite(text, strlen(text), 1, stdout); if (saveResults) fwrite(text, strlen(text), 1, trace); } } void sendFile(const char * filename, ISocket * socket) { FILE *in = fopen(filename, "rb"); unsigned size = 0; void * buff = NULL; if (in) { fseek(in, 0, SEEK_END); size = ftell(in); fseek(in, 0, SEEK_SET); buff = malloc(size); size_t numRead = fread(buff, 1, size, in); fclose(in); if (numRead != size) { printf("read from file %s failed (%u/%u)\n", filename, (unsigned)numRead, size); size = 0; } } else printf("read from file %s failed\n", filename); unsigned dllLen = size; _WINREV(dllLen); socket->write(&dllLen, sizeof(dllLen)); socket->write(buff, size); free(buff); } #define CHUNK_SIZE 152*2000 void sendFileChunk(const char * filename, offset_t offset, ISocket * socket) { FILE *in = fopen(filename, "rb"); unsigned size = 0; void * buff = NULL; if (in) { fseek(in, 0, SEEK_END); size = ftell(in); fseek(in, offset, SEEK_SET); if (size < offset) size = 0; else size -= offset; if (size > CHUNK_SIZE) size = CHUNK_SIZE; buff = malloc(size); size_t numRead = fread(buff, 1, size, in); fclose(in); if (numRead != size) { printf("read from file %s failed (%u/%u)\n", filename, (unsigned)numRead, size); size = 0; } } else printf("read from file %s failed\n", filename); if (size > 0) { MemoryBuffer sendBuffer; unsigned rev = size + strlen(filename) + 10; rev |= 0x80000000; _WINREV(rev); sendBuffer.append(rev); sendBuffer.append('R'); rev = 0; // should put the sequence number here _WINREV(rev); sendBuffer.append(rev); rev = 0; // should put the # of recs in msg here _WINREV(rev); sendBuffer.append(rev); sendBuffer.append(strlen(filename)+1, filename); sendBuffer.append(size, buff); socket->write(sendBuffer.toByteArray(), sendBuffer.length()); } else { unsigned zeroLen = 0; socket->write(&zeroLen, sizeof(zeroLen)); } free(buff); } int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &result) { if (readBlocked) socket->set_block_mode(BF_SYNC_TRANSFER_PULL,0,60*1000); unsigned len; bool is_status; bool isBlockedResult; for (;;) { if (delay) MilliSleep(delay); is_status = false; isBlockedResult = false; try { if (useHTTP) len = 0x10000; else if (readBlocked) len = socket->receive_block_size(); else { socket->read(&len, sizeof(len)); _WINREV(len); } } catch(IException * e) { if (manyResults) showMessage("End of result multiple set\n"); else pexception("failed to read len data", e); e->Release(); return 1; } if (len == 0) { if (manyResults) { showMessage("----End of result set----\n"); continue; } break; } bool isSpecial = false; bool pluginRequest = false; bool dataBlockRequest = false; if (len & 0x80000000) { unsigned char flag; isSpecial = true; socket->read(&flag, sizeof(flag)); switch (flag) { case '-': if (echoResults) fputs("Error:", stdout); if (saveResults) fputs("Error:", trace); break; case 'D': showMessage("request for datablock\n"); dataBlockRequest = true; break; case 'P': showMessage("request for plugin\n"); pluginRequest = true; break; case 'S': if (showStatus) showMessage("Status:"); is_status=true; break; case 'T': showMessage("Timing:\n"); break; case 'X': showMessage("---Compound query finished---\n"); return 1; case 'R': isBlockedResult = true; break; } len &= 0x7FFFFFFF; len--; // flag already read } char * mem = (char*) malloc(len+1); char * t = mem; unsigned sendlen = len; t[len]=0; try { if (useHTTP) { try { socket->read(t, 0, len, sendlen); } catch (IException *E) { if (E->errorCode()!= JSOCKERR_graceful_close) throw; E->Release(); break; } if (!sendlen) break; } else if (readBlocked) socket->receive_block(t, len); else socket->read(t, len); } catch(IException * e) { pexception("failed to read data", e); return 1; } if (pluginRequest) { //Not very robust! A poor man's implementation for testing... StringBuffer dllname, libname; const char * dot = strchr(t, '.'); dllname.append("\\edata\\bin\\debug\\").append(t); libname.append("\\edata\\bin\\debug\\").append(dot-t,t).append(".lib"); sendFile(dllname.str(), socket); sendFile(libname.str(), socket); } else if (dataBlockRequest) { //Not very robust! A poor man's implementation for testing... offset_t offset; memcpy(&offset, t, sizeof(offset)); _WINREV(offset); sendFileChunk(t+sizeof(offset), offset, socket); } else { if (isBlockedResult) { t += 8; t += strlen(t)+1; sendlen -= (t - mem); } if (echoResults && (!is_status || showStatus)) { fwrite(t, sendlen, 1, stdout); fflush(stdout); } if (!is_status) result.append(sendlen, t); } free(mem); if (abortAfterFirst) return 0; } return 0; } class ReceiveThread : public Thread { public: virtual int run(); }; int ReceiveThread::run() { ISocket * socket = ISocket::create(3456); ISocket * client = socket->accept(); StringBuffer result; readResults(client, parallelBlocked, false, result); client->Release(); socket->Release(); finishedReading.signal(); return 0; } //------------------------------------------------------------------------ /** * Return: 0 - success * nonzero - error */ int doSendQuery(const char * ip, unsigned port, const char * base) { ISocket * socket; __int64 starttime, endtime; StringBuffer ipstr; try { if (strcmp(ip, ".")==0) ip = GetCachedHostName(); else { const char *dash = strchr(ip, '-'); if (dash && isdigit(dash[1]) && dash>ip && isdigit(dash[-1])) { if (persistConnections) UNIMPLEMENTED; const char *startrange = dash-1; while (isdigit(startrange[-1])) startrange--; char *endptr; unsigned firstnum = atoi(startrange); unsigned lastnum = strtol(dash+1, &endptr, 10); if (lastnum > firstnum) { static unsigned counter; static CriticalSection counterCrit; CriticalBlock b(counterCrit); ipstr.append(startrange - ip, ip).append((counter++ % (lastnum+1-firstnum)) + firstnum).append(endptr); ip = ipstr.str(); printf("Sending to %s\n", ip); } } } starttime= get_cycles_now(); if (persistConnections) { if (!persistSocket) { SocketEndpoint ep(ip,port); persistSocket = ISocket::connect_timeout(ep, 1000); } socket = persistSocket; } else { SocketEndpoint ep(ip,port); socket = ISocket::connect_timeout(ep,1000); } } catch(IException * e) { pexception("failed to connect to server", e); return 1; } StringBuffer fullQuery; bool useHTTP = forceHTTP || strstr(base, " p = createPTreeFromXMLString(base, false, false); const char *queryName = p->queryName(); if ((stricmp(queryName, "envelope") != 0) && (stricmp(queryName, "envelope") != 0)) { if (queryNameOverride.length()) queryName = queryNameOverride; newQuery.appendf("<%sRequest>", queryName); Owned elements = p->getElements("./*"); ForEach(*elements) { IPTree &elem = elements->query(); toXML(&elem, newQuery, 0, XML_SingleQuoteAttributeValues); } newQuery.appendf("", queryName); base = newQuery.str(); } // note - don't support queryname override unless original query is xml fullQuery.appendf("POST /doc HTTP/1.0\r\nContent-Type: application/x-www-form-urlencoded\r\nContent-Length: %d\r\n\r\n", (int) strlen(base)).append(base); } else { if (sendToSocket) { Thread * receive = new ReceiveThread(); receive->start(); receive->Release(); } if (doLock) { const char *lock = ""; unsigned locklen = strlen(lock); _WINREV(locklen); socket->write(&locklen, sizeof(locklen)); socket->write(lock, strlen(lock)); StringBuffer lockResult; readResults(socket, false, false, lockResult); } if (queryNameOverride.length()) { try { Owned p = createPTreeFromXMLString(base, false, false); p->renameProp("/", queryNameOverride); toXML(p, fullQuery.clear()); } catch (IException *E) { StringBuffer s; printf("Error: %s", E->errorMessage(s).str()); E->Release(); return 1; } } else fullQuery.append(base); } const char * query = fullQuery.toCharArray(); int len=strlen(query); int sendlen = len; if (persistConnections) sendlen |= 0x80000000; _WINREV(sendlen); try { if (!useHTTP) socket->write(&sendlen, sizeof(sendlen)); socket->write(query, len); if (sendFileAfterQuery) { FILE *in = fopen(sendFileName.str(), "rb"); if (in) { char buffer[1024]; for (;;) { len = fread(buffer, 1, sizeof(buffer), in); sendlen = len; _WINREV(sendlen); socket->write(&sendlen, sizeof(sendlen)); if (!len) break; socket->write(buffer, len); } fclose(in); } else printf("File %s could not be opened\n", sendFileName.str()); } } catch(IException * e) { pexception("failed to write data", e); return 1; } if (abortEarly) return 0; // back-end does some processing..... StringBuffer result; int ret = readResults(socket, false, useHTTP, result); if ((ret == 0) && !justResults) { endtime = get_cycles_now(); CriticalBlock b(traceCrit); fprintf(trace, "query: %s\n", query); if (saveResults) fprintf(trace, "result: %s\n", result.str()); if (showTiming) fprintf(trace, "Time taken = %.3f msecs\n", (double)(cycle_to_nanosec(endtime - starttime)/1000000)); fputs("----------------------------------------------------------------------------\n", trace); } if (!persistConnections) { socket->close(); socket->Release(); } return 0; } class QueryThread : public Thread { public: QueryThread(const char * _ip, unsigned _port, const char * _base) : ip(_ip),port(_port),base(_base) {} virtual int run() { doSendQuery(ip, port, base); done.signal(); okToSend.signal(); return 0; } protected: StringAttr ip; unsigned port; StringAttr base; }; int sendQuery(const char * ip, unsigned port, const char * base) { if (!multiThread) return doSendQuery(ip, port, base); if (multiThreadMax) okToSend.wait(); runningQueries++; Thread * thread = new QueryThread(ip, port, base); thread->start(); thread->Release(); return 0; } void usage(int exitCode) { printf("testsocket ip<:port> [flags] [query | -f[f] file.sql]\n"); printf(" -a abort before input recieved\n"); printf(" -a1 abort after first packet receieved\n"); printf(" -c test sending response to a socket\n"); printf(" -cb test sending response to a block mode socket\n"); printf(" -d force delay after each packet\n"); printf(" -f take query from file\n"); printf(" -ff take multiple queries from file, one per line\n"); printf(" -tff take multiple queries from file, one per line, preceded by the time at which it should be submitted (relative to time on first line)\n"); printf(" -k don't save the results to result.txt\n"); printf(" -m only save results to result.txt\n"); printf(" -maxLineSize set maximum query line length\n"); printf(" -n multiple results - keep going until socket closes\n"); printf(" -o set output filename\n"); printf(" -persist use persistant connection\n"); printf(" -pr add a prefix to the query\n"); printf(" -q quiet - don't echo query\n"); printf(" -qname xx Use xx as queryname in place of the xml root element name\n"); printf(" -r repeat the query several times\n"); printf(" -rl roxie logfile mode\n"); printf(" -s add stars to indicate transfer packets\n"); printf(" -ss suppress XML Status messages to screen (always suppressed from tracefile)\n"); printf(" -td add debug timing statistics to trace\n"); printf(" -tf add full timing statistics to trace\n"); printf(" -time add timing to trace\n"); printf(" -u run queries on separate threads\n"); printf(" -cascade cascade query (to all roxie nodes)\n"); printf(" -lock locked cascade query (to all roxie nodes)\n"); exit(exitCode); } int main(int argc, char **argv) { #if 0 { SocketListCreator c; SocketEndpoint ep; ep.ip.ip[0] = 192; ep.ip.ip[1] = 168; ep.ip.ip[2] = 6; ep.ip.ip[3] = 186; ep.port = 3004; c.addSocket(ep); ep.ip.ip[3] = 180; c.addSocket(ep); c.addSocket("192.168.6.187", 3004); c.addSocket("192.168.6.188", 3004); c.addSocket("192.168.6.189", 3004); c.addSocket("192.168.6.190", 3004); c.addSocket("192.168.6.191", 3004); c.addSocket("192.168.6.192", 3004); c.addSocket("192.168.6.194", 3004); c.addSocket("192.168.6.195", 3004); c.addSocket("192.168.6.196", 3004); c.addSocket("192.168.6.197", 3004); c.addSocket("192.168.6.198", 3004); c.addSocket("192.168.6.199", 3004); c.addSocket("192.168.7.115", 3004); printf("%s\n",c.getText()); SocketListParser p(c.getText()); p.first(0); StringAttr ip; unsigned port; while (p.next(ip, port)) printf("%s:%d\n",ip.get(),port); return 0; } #endif #ifndef _WIN32 InitModuleObjects(); #endif StringAttr outputName("result.txt"); bool fromFile = false; bool fromMultiFile = false; bool timedReplay = false; if (argc < 2 && !(argc==2 && strstr(argv[1], "::"))) usage(1); int arg = 2; bool echoSingle = true; while (arg < argc && *argv[arg]=='-') { if (stricmp(argv[arg], "-time") == 0) { showTiming = true; ++arg; } else if (stricmp(argv[arg], "-a") == 0) { abortEarly = true; ++arg; } else if (stricmp(argv[arg], "-a1") == 0) { abortAfterFirst = true; ++arg; } else if (stricmp(argv[arg], "-c") == 0) { sendToSocket = true; ++arg; } else if (stricmp(argv[arg], "-cb") == 0) { sendToSocket = true; parallelBlocked = true; ++arg; } else if (stricmp(argv[arg], "-d") == 0) { delay = 300; ++arg; } else if (stricmp(argv[arg], "-http") == 0) { forceHTTP = true; ++arg; } else if (stricmp(argv[arg], "-f") == 0) { fromFile = true; ++arg; } else if (stricmp(argv[arg], "-ff") == 0) { fromMultiFile = true; ++arg; } else if (stricmp(argv[arg], "-tff") == 0) { fromMultiFile = true; timedReplay = true; ++arg; } else if (stricmp(argv[arg], "-k") == 0) { saveResults = false; ++arg; } else if (stricmp(argv[arg], "-m") == 0) { justResults = true; ++arg; } else if (stricmp(argv[arg], "-maxlinesize") == 0) { ++arg; if (arg>=argc) usage(1); maxLineSize = atoi(argv[arg]); ++arg; } else if (stricmp(argv[arg], "-n") == 0) { manyResults = true; ++arg; } else if (stricmp(argv[arg], "-o") == 0) { outputName.set(argv[arg+1]); arg+=2; } else if (stricmp(argv[arg], "-persist") == 0) { persistConnections = true; ++arg; } else if (stricmp(argv[arg], "-pr") == 0) { queryPrefix.append(argv[arg+1]); arg+=2; } else if (stricmp(argv[arg], "-q") == 0) { echoSingle = false; ++arg; } else if (stricmp(argv[arg], "-qname") == 0) { queryNameOverride.set(argv[arg+1]); arg+=2; } else if (stricmp(argv[arg], "-r") == 0) { ++arg; if (arg>=argc) usage(1); repeats = atoi(argv[arg]); ++arg; } else if (stricmp(argv[arg], "-rl") == 0) { roxieLogMode = true; fromMultiFile = true; ++arg; } else if (stricmp(argv[arg], "-ss") == 0) { showStatus = false; ++arg; } else if (memicmp(argv[arg], "-u", 2) == 0) { multiThread = true; multiThreadMax = atoi(argv[arg]+2); if (multiThreadMax) okToSend.signal(multiThreadMax); ++arg; } else if (stricmp(argv[arg], "-lock") == 0) { doLock = true; ++arg; } else if (memicmp(argv[arg], "-z", 2) == 0) { sendFileAfterQuery = true; sendFileName.append(argv[arg+1]); OwnedIFile f = createIFile(sendFileName.str()); if (!f->exists() || !f->isFile()) { printf("file %s does not exist\n", sendFileName.str()); exit (EXIT_FAILURE); } arg+=2; } else { printf("Unknown argument %s, ignored\n", argv[arg]); ++arg; } } if (persistConnections && multiThread) { printf("Multi-thread (-u) not available with -persist - ignored\n"); multiThread = false; } StringAttr ip; unsigned socketPort = ROXIE_SERVER_PORT; SplitIpPort(ip, socketPort, argv[1]); int ret = 0; trace = fopen(outputName, "w"); __int64 starttime,endtime; starttime = get_cycles_now(); if (arg < argc) { echoResults = echoSingle; do { const char * query = argv[arg]; if (fromMultiFile) { FILE *in = fopen(query, "rt"); if (in) { CDateTime firstTime; CDateTime startTime; bool firstLine = true; char *buffer = new char[maxLineSize]; for (;;) { if (fgets(buffer, maxLineSize, in)==NULL) // buffer overflow possible - do I care? break; if (timedReplay) { if (firstLine) { firstTime.setNow(); firstTime.setTimeString(buffer, &query); startTime.setNow(); } else { CDateTime queryTime, nowTime; queryTime.setNow(); queryTime.setTimeString(buffer, &query); nowTime.setNow(); int sleeptime = (queryTime.getSimple()-firstTime.getSimple()) - (nowTime.getSimple()-startTime.getSimple()); if (sleeptime < 0) DBGLOG("Running behind %d seconds", -sleeptime); else if (sleeptime) { DBGLOG("Sleeping %d seconds", sleeptime); Sleep(sleeptime*1000); } StringBuffer targetTime; queryTime.getTimeString(targetTime); DBGLOG("Virtual time is %s", targetTime.str()); } } else { query = buffer; while (isspace(*query)) query++; if (roxieLogMode) { char *start = (char *) strchr(query, '<'); if (start) { char *end = (char *) strchr(start, '"'); if (end && end[1]=='\n') { query = start; *end = 0; } } } } if (query) { ret = sendQuery(ip, socketPort, query); firstLine = false; } } delete [] buffer; fclose(in); } else printf("File %s could not be opened\n", query); } else if (fromFile) { FILE *in = fopen(query, "rt"); if (in) { StringBuffer fileContents; char buffer[1024]; int bytes; for (;;) { bytes = fread(buffer, 1, sizeof(buffer), in); if (!bytes) break; fileContents.append(buffer, 0, bytes); } fclose(in); ret = sendQuery(ip, socketPort, fileContents.toCharArray()); } else printf("File %s could not be opened\n", query); } else { ret = sendQuery(ip, socketPort, query); if (sendToSocket) finishedReading.wait(); } } while (--repeats > 0); } while (runningQueries--) done.wait(); if (persistConnections && persistSocket) { int sendlen=0; persistSocket->write(&sendlen, sizeof(sendlen)); persistSocket->close(); persistSocket->Release(); } endtime = get_cycles_now(); if (!justResults) { fprintf(trace, "Total Time taken = %.3f msecs\n", (double)(cycle_to_nanosec(endtime - starttime)/1000000)); fputs("----------------------------------------------------------------------------\n", trace); } fclose(trace); #ifdef _DEBUG releaseAtoms(); #endif return ret; }