123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include <platform.h>
- #include <stdio.h>
- #include "jmisc.hpp"
- #include "jlib.hpp"
- #include "jsocket.hpp"
- #include "jstream.ipp"
- #include "portlist.h"
- #include "jdebug.hpp"
- #include "jthread.hpp"
- #include "jfile.hpp"
- bool abortEarly = false;
- bool forceHTTP = false;
- bool abortAfterFirst = false;
- bool echoResults = false;
- bool saveResults = true;
- bool showTiming = false;
- bool showStatus = true;
- bool sendToSocket = false;
- bool parallelBlocked = false;
- bool justResults = false;
- bool multiThread = false;
- bool manyResults = false;
- bool sendFileAfterQuery = false;
- bool doLock = false;
- bool roxieLogMode = false;
- bool rawOnly = false;
- StringBuffer sendFileName;
- StringAttr queryNameOverride;
- unsigned delay = 0;
- unsigned runningQueries;
- unsigned multiThreadMax;
- unsigned maxLineSize = 10000000;
- ISocket *persistSocket;
- bool persistConnections;
- int repeats = 0;
- StringBuffer queryPrefix;
- Semaphore okToSend;
- Semaphore done;
- Semaphore finishedReading;
- FILE * trace;
- CriticalSection traceCrit;
- //---------------------------------------------------------------------------
- void SplitIpPort(StringAttr & ip, unsigned & port, const char * address)
- {
- const char * colon = strchr(address, ':');
- if (colon)
- {
- ip.set(address,colon-address);
- port = atoi(colon+1);
- }
- else
- ip.set(address);
- }
- void showMessage(const char * text)
- {
- if (!justResults)
- {
- if (echoResults)
- fwrite(text, strlen(text), 1, stdout);
- if (saveResults && trace != NULL)
- fwrite(text, strlen(text), 1, trace);
- }
- }
- void sendFile(const char * filename, ISocket * socket)
- {
- FILE *in = fopen(filename, "rb");
- unsigned size = 0;
- void * buff = NULL;
- if (in)
- {
- fseek(in, 0, SEEK_END);
- size = ftell(in);
- fseek(in, 0, SEEK_SET);
- buff = malloc(size);
- size_t numRead = fread(buff, 1, size, in);
- fclose(in);
- if (numRead != size)
- {
- printf("read from file %s failed (%u/%u)\n", filename, (unsigned)numRead, size);
- size = 0;
- }
- }
- else
- printf("read from file %s failed\n", filename);
- unsigned dllLen = size;
- _WINREV(dllLen);
- socket->write(&dllLen, sizeof(dllLen));
- socket->write(buff, size);
- free(buff);
- }
- #define CHUNK_SIZE 152*2000
- void sendFileChunk(const char * filename, offset_t offset, ISocket * socket)
- {
- FILE *in = fopen(filename, "rb");
- unsigned size = 0;
- void * buff = NULL;
- if (in)
- {
- fseek(in, 0, SEEK_END);
- offset_t endOffset = ftell(in);
- fseek(in, offset, SEEK_SET);
- if (endOffset < offset)
- size = 0;
- else
- size = (unsigned)(endOffset - offset);
- if (size > CHUNK_SIZE)
- size = CHUNK_SIZE;
- buff = malloc(size);
- size_t numRead = fread(buff, 1, size, in);
- fclose(in);
- if (numRead != size)
- {
- printf("read from file %s failed (%u/%u)\n", filename, (unsigned)numRead, size);
- size = 0;
- }
- }
- else
- printf("read from file %s failed\n", filename);
- if (size > 0)
- {
- MemoryBuffer sendBuffer;
- unsigned rev = size + strlen(filename) + 10;
- rev |= 0x80000000;
- _WINREV(rev);
- sendBuffer.append(rev);
- sendBuffer.append('R');
- rev = 0; // should put the sequence number here
- _WINREV(rev);
- sendBuffer.append(rev);
- rev = 0; // should put the # of recs in msg here
- _WINREV(rev);
- sendBuffer.append(rev);
- sendBuffer.append(strlen(filename)+1, filename);
- sendBuffer.append(size, buff);
- socket->write(sendBuffer.toByteArray(), sendBuffer.length());
- }
- else
- {
- unsigned zeroLen = 0;
- socket->write(&zeroLen, sizeof(zeroLen));
- }
- free(buff);
- }
- int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &result)
- {
- if (readBlocked)
- socket->set_block_mode(BF_SYNC_TRANSFER_PULL,0,60*1000);
- unsigned len;
- bool is_status;
- bool isBlockedResult;
- for (;;)
- {
- if (delay)
- MilliSleep(delay);
- is_status = false;
- isBlockedResult = false;
- try
- {
- if (useHTTP)
- len = 0x10000;
- else if (readBlocked)
- len = socket->receive_block_size();
- else
- {
- socket->read(&len, sizeof(len));
- _WINREV(len);
- }
- }
- catch(IException * e)
- {
- if (manyResults)
- showMessage("End of result multiple set\n");
- else
- pexception("failed to read len data", e);
- e->Release();
- return 1;
- }
- if (len == 0)
- {
- if (manyResults)
- {
- showMessage("----End of result set----\n");
- continue;
- }
- break;
- }
- bool isSpecial = false;
- bool pluginRequest = false;
- bool dataBlockRequest = false;
- if (len & 0x80000000)
- {
- unsigned char flag;
- isSpecial = true;
- socket->read(&flag, sizeof(flag));
- switch (flag)
- {
- case '-':
- if (echoResults)
- fputs("Error:", stdout);
- if (saveResults && trace != NULL)
- fputs("Error:", trace);
- break;
- case 'D':
- showMessage("request for datablock\n");
- dataBlockRequest = true;
- break;
- case 'P':
- showMessage("request for plugin\n");
- pluginRequest = true;
- break;
- case 'S':
- if (showStatus)
- showMessage("Status:");
- is_status=true;
- break;
- case 'T':
- showMessage("Timing:\n");
- break;
- case 'X':
- showMessage("---Compound query finished---\n");
- return 1;
- case 'R':
- isBlockedResult = true;
- break;
- }
- len &= 0x7FFFFFFF;
- len--; // flag already read
- }
- char * mem = (char*) malloc(len+1);
- char * t = mem;
- unsigned sendlen = len;
- t[len]=0;
- try
- {
- if (useHTTP)
- {
- try
- {
- socket->read(t, 0, len, sendlen);
- }
- catch (IException *E)
- {
- if (E->errorCode()!= JSOCKERR_graceful_close)
- throw;
- E->Release();
- break;
- }
- if (!sendlen)
- break;
- }
- else if (readBlocked)
- socket->receive_block(t, len);
- else
- socket->read(t, len);
- }
- catch(IException * e)
- {
- pexception("failed to read data", e);
- return 1;
- }
- if (pluginRequest)
- {
- //Not very robust! A poor man's implementation for testing...
- StringBuffer dllname, libname;
- const char * dot = strchr(t, '.');
- dllname.append("\\edata\\bin\\debug\\").append(t);
- libname.append("\\edata\\bin\\debug\\").append(dot-t,t).append(".lib");
- sendFile(dllname.str(), socket);
- sendFile(libname.str(), socket);
- }
- else if (dataBlockRequest)
- {
- //Not very robust! A poor man's implementation for testing...
- offset_t offset;
- memcpy(&offset, t, sizeof(offset));
- _WINREV(offset);
- sendFileChunk(t+sizeof(offset), offset, socket);
- }
- else
- {
- if (isBlockedResult)
- {
- t += 8;
- t += strlen(t)+1;
- sendlen -= (t - mem);
- }
- if (echoResults && (!is_status || showStatus))
- {
- fwrite(t, sendlen, 1, stdout);
- fflush(stdout);
- }
- if (!is_status)
- result.append(sendlen, t);
- }
- free(mem);
- if (abortAfterFirst)
- return 0;
- }
- return 0;
- }
- class ReceiveThread : public Thread
- {
- public:
- virtual int run();
- };
- int ReceiveThread::run()
- {
- ISocket * socket = ISocket::create(3456);
- ISocket * client = socket->accept();
- StringBuffer result;
- readResults(client, parallelBlocked, false, result);
- client->Release();
- socket->Release();
- finishedReading.signal();
- return 0;
- }
- //------------------------------------------------------------------------
- /**
- * Return: 0 - success
- * nonzero - error
- */
- int doSendQuery(const char * ip, unsigned port, const char * base)
- {
- ISocket * socket;
- __int64 starttime, endtime;
- StringBuffer ipstr;
- try
- {
- if (strcmp(ip, ".")==0)
- ip = GetCachedHostName();
- else
- {
- const char *dash = strchr(ip, '-');
- if (dash && isdigit(dash[1]) && dash>ip && isdigit(dash[-1]))
- {
- if (persistConnections)
- UNIMPLEMENTED;
- const char *startrange = dash-1;
- while (isdigit(startrange[-1]))
- startrange--;
- char *endptr;
- unsigned firstnum = atoi(startrange);
- unsigned lastnum = strtol(dash+1, &endptr, 10);
- if (lastnum > firstnum)
- {
- static unsigned counter;
- static CriticalSection counterCrit;
- CriticalBlock b(counterCrit);
- ipstr.append(startrange - ip, ip).append((counter++ % (lastnum+1-firstnum)) + firstnum).append(endptr);
- ip = ipstr.str();
- printf("Sending to %s\n", ip);
- }
- }
- }
- starttime= get_cycles_now();
- if (persistConnections)
- {
- if (!persistSocket) {
- SocketEndpoint ep(ip,port);
- persistSocket = ISocket::connect_timeout(ep, 1000);
- }
- socket = persistSocket;
- }
- else {
- SocketEndpoint ep(ip,port);
- socket = ISocket::connect_timeout(ep,1000);
- }
- }
- catch(IException * e)
- {
- pexception("failed to connect to server", e);
- return 1;
- }
- StringBuffer fullQuery;
- bool useHTTP = forceHTTP || strstr(base, "<soap:Envelope") != NULL;
- if (useHTTP)
- {
- StringBuffer newQuery;
- Owned<IPTree> p = createPTreeFromXMLString(base, ipt_none, ptr_none);
- const char *queryName = p->queryName();
- if ((stricmp(queryName, "envelope") != 0) && (stricmp(queryName, "envelope") != 0))
- {
- if (queryNameOverride.length())
- queryName = queryNameOverride;
- newQuery.appendf("<Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\"><Body><%sRequest>", queryName);
- Owned<IPTreeIterator> elements = p->getElements("./*");
- ForEach(*elements)
- {
- IPTree &elem = elements->query();
- toXML(&elem, newQuery, 0, XML_SingleQuoteAttributeValues);
- }
- newQuery.appendf("</%sRequest></Body></Envelope>", queryName);
- base = newQuery.str();
- }
- // note - don't support queryname override unless original query is xml
- fullQuery.appendf("POST /doc HTTP/1.0\r\nContent-Type: application/x-www-form-urlencoded\r\nContent-Length: %d\r\n\r\n", (int) strlen(base)).append(base);
- }
- else
- {
- if (sendToSocket)
- {
- Thread * receive = new ReceiveThread();
- receive->start();
- receive->Release();
- }
- if (doLock)
- {
- const char *lock = "<control:lock/>";
- unsigned locklen = strlen(lock);
- _WINREV(locklen);
- socket->write(&locklen, sizeof(locklen));
- socket->write(lock, strlen(lock));
- StringBuffer lockResult;
- readResults(socket, false, false, lockResult);
- }
- if (queryNameOverride.length())
- {
- try
- {
- Owned<IPTree> p = createPTreeFromXMLString(base, ipt_none, ptr_none);
- p->renameProp("/", queryNameOverride);
- toXML(p, fullQuery.clear());
- }
- catch (IException *E)
- {
- StringBuffer s;
- printf("Error: %s", E->errorMessage(s).str());
- E->Release();
- return 1;
- }
- }
- else
- fullQuery.append(base);
- }
- const char * query = fullQuery.str();
- int len=strlen(query);
- int sendlen = len;
- if (persistConnections)
- sendlen |= 0x80000000;
- _WINREV(sendlen);
- try
- {
- if (!useHTTP)
- socket->write(&sendlen, sizeof(sendlen));
- socket->write(query, len);
- if (sendFileAfterQuery)
- {
- FILE *in = fopen(sendFileName.str(), "rb");
- if (in)
- {
- char buffer[1024];
- for (;;)
- {
- len = fread(buffer, 1, sizeof(buffer), in);
- sendlen = len;
- _WINREV(sendlen);
- socket->write(&sendlen, sizeof(sendlen));
- if (!len)
- break;
- socket->write(buffer, len);
- }
- fclose(in);
- }
- else
- printf("File %s could not be opened\n", sendFileName.str());
- }
- }
- catch(IException * e)
- {
- pexception("failed to write data", e);
- return 1;
- }
- if (abortEarly)
- return 0;
-
- // back-end does some processing.....
- StringBuffer result;
- int ret = readResults(socket, false, useHTTP, result);
- if ((ret == 0) && !justResults)
- {
- endtime = get_cycles_now();
- CriticalBlock b(traceCrit);
- if (trace != NULL)
- {
- if (rawOnly == false)
- {
- fprintf(trace, "query: %s\n", query);
- if (saveResults)
- fprintf(trace, "result: %s\n", result.str());
- }
- else
- {
- fprintf(trace, "%s", result.str());
- }
- if (showTiming && rawOnly == false)
- {
- fprintf(trace, "Time taken = %.3f msecs\n", (double)(cycle_to_nanosec(endtime - starttime)/1000000));
- fputs("----------------------------------------------------------------------------\n", trace);
- }
- }
- }
- if (!persistConnections)
- {
- socket->close();
- socket->Release();
- }
- return 0;
- }
- class QueryThread : public Thread
- {
- public:
- QueryThread(const char * _ip, unsigned _port, const char * _base) : ip(_ip),port(_port),base(_base) {}
- virtual int run() { doSendQuery(ip, port, base); done.signal(); okToSend.signal(); return 0; }
- protected:
- StringAttr ip;
- unsigned port;
- StringAttr base;
- };
- int sendQuery(const char * ip, unsigned port, const char * base)
- {
- if (!multiThread)
- return doSendQuery(ip, port, base);
- if (multiThreadMax)
- okToSend.wait();
- runningQueries++;
- Thread * thread = new QueryThread(ip, port, base);
- thread->start();
- thread->Release();
- return 0;
- }
- void usage(int exitCode)
- {
- printf("testsocket ip<:port> [flags] [query | -f[f] file.sql | -]\n");
- printf(" - take query from stdin\n");
- printf(" -a abort before input received\n");
- printf(" -a1 abort after first packet receieved\n");
- printf(" -c test sending response to a socket\n");
- printf(" -cb test sending response to a block mode socket\n");
- printf(" -d force delay after each packet\n");
- printf(" -f take query from file\n");
- printf(" -ff take multiple queries from file, one per line\n");
- printf(" -tff take multiple queries from file, one per line, preceded by the time at which it should be submitted (relative to time on first line)\n");
- printf(" -k don't save the results to result.txt\n");
- printf(" -m only save results to result.txt\n");
- printf(" -maxLineSize <n> set maximum query line length\n");
- printf(" -n multiple results - keep going until socket closes\n");
- printf(" -o set output filename\n");
- printf(" -or set output filename for raw output\n");
- printf(" -persist use persistent connection\n");
- printf(" -pr <text>add a prefix to the query\n");
- printf(" -q quiet - don't echo query\n");
- printf(" -qname xx Use xx as queryname in place of the xml root element name\n");
- printf(" -r <n> repeat the query several times\n");
- printf(" -rl roxie logfile mode\n");
- printf(" -s add stars to indicate transfer packets\n");
- printf(" -ss suppress XML Status messages to screen (always suppressed from tracefile)\n");
- printf(" -td add debug timing statistics to trace\n");
- printf(" -tf add full timing statistics to trace\n");
- printf(" -time add timing to trace\n");
- printf(" -u<max> run queries on separate threads\n");
- printf(" -cascade cascade query (to all roxie nodes)\n");
- printf(" -lock locked cascade query (to all roxie nodes)\n");
-
- exit(exitCode);
- }
- int main(int argc, char **argv)
- {
- InitModuleObjects();
- StringAttr outputName("result.txt");
- bool fromFile = false;
- bool fromStdIn = false;
- bool fromMultiFile = false;
- bool timedReplay = false;
- if (argc < 2 && !(argc==2 && strstr(argv[1], "::")))
- usage(1);
- int arg = 2;
- bool echoSingle = true;
- while (arg < argc && *argv[arg]=='-')
- {
- if (stricmp(argv[arg], "-time") == 0)
- {
- showTiming = true;
- ++arg;
- }
- else if (stricmp(argv[arg], "-a") == 0)
- {
- abortEarly = true;
- ++arg;
- }
- else if (stricmp(argv[arg], "-a1") == 0)
- {
- abortAfterFirst = true;
- ++arg;
- }
- else if (stricmp(argv[arg], "-c") == 0)
- {
- sendToSocket = true;
- ++arg;
- }
- else if (stricmp(argv[arg], "-cb") == 0)
- {
- sendToSocket = true;
- parallelBlocked = true;
- ++arg;
- }
- else if (stricmp(argv[arg], "-d") == 0)
- {
- delay = 300;
- ++arg;
- }
- else if (stricmp(argv[arg], "-http") == 0)
- {
- forceHTTP = true;
- ++arg;
- }
- else if (stricmp(argv[arg], "-") == 0)
- {
- fromStdIn = true;
- ++arg;
- }
- else if (stricmp(argv[arg], "-f") == 0)
- {
- fromFile = true;
- ++arg;
- }
- else if (stricmp(argv[arg], "-ff") == 0)
- {
- fromMultiFile = true;
- ++arg;
- }
- else if (stricmp(argv[arg], "-tff") == 0)
- {
- fromMultiFile = true;
- timedReplay = true;
- ++arg;
- }
- else if (stricmp(argv[arg], "-k") == 0)
- {
- saveResults = false;
- ++arg;
- }
- else if (stricmp(argv[arg], "-m") == 0)
- {
- justResults = true;
- ++arg;
- }
- else if (stricmp(argv[arg], "-maxlinesize") == 0)
- {
- ++arg;
- if (arg>=argc)
- usage(1);
- maxLineSize = atoi(argv[arg]);
- ++arg;
- }
- else if (stricmp(argv[arg], "-n") == 0)
- {
- manyResults = true;
- ++arg;
- }
- else if (stricmp(argv[arg], "-o") == 0)
- {
- outputName.set(argv[arg+1]);
- arg+=2;
- }
- else if (stricmp(argv[arg], "-or") == 0)
- {
- rawOnly = true;
- outputName.set(argv[arg+1]);
- arg+=2;
- }
- else if (stricmp(argv[arg], "-persist") == 0)
- {
- persistConnections = true;
- ++arg;
- }
- else if (stricmp(argv[arg], "-pr") == 0)
- {
- queryPrefix.append(argv[arg+1]);
- arg+=2;
- }
- else if (stricmp(argv[arg], "-q") == 0)
- {
- echoSingle = false;
- ++arg;
- }
- else if (stricmp(argv[arg], "-qname") == 0)
- {
- queryNameOverride.set(argv[arg+1]);
- arg+=2;
- }
- else if (stricmp(argv[arg], "-r") == 0)
- {
- ++arg;
- if (arg>=argc)
- usage(1);
- repeats = atoi(argv[arg]);
- ++arg;
- }
- else if (stricmp(argv[arg], "-rl") == 0)
- {
- roxieLogMode = true;
- fromMultiFile = true;
- ++arg;
- }
- else if (stricmp(argv[arg], "-ss") == 0)
- {
- showStatus = false;
- ++arg;
- }
- else if (memicmp(argv[arg], "-u", 2) == 0)
- {
- multiThread = true;
- multiThreadMax = atoi(argv[arg]+2);
- if (multiThreadMax)
- okToSend.signal(multiThreadMax);
- ++arg;
- }
- else if (stricmp(argv[arg], "-lock") == 0)
- {
- doLock = true;
- ++arg;
- }
- else if (memicmp(argv[arg], "-z", 2) == 0)
- {
- sendFileAfterQuery = true;
- sendFileName.append(argv[arg+1]);
- OwnedIFile f = createIFile(sendFileName.str());
- if (!f->exists() || !f->isFile())
- {
- printf("file %s does not exist\n", sendFileName.str());
- exit (EXIT_FAILURE);
- }
- arg+=2;
- }
- else
- {
- printf("Unknown argument %s, ignored\n", argv[arg]);
- ++arg;
- }
- }
- if (persistConnections && multiThread)
- {
- printf("Multi-thread (-u) not available with -persist - ignored\n");
- multiThread = false;
- }
- StringAttr ip;
- unsigned socketPort = ROXIE_SERVER_PORT;
- SplitIpPort(ip, socketPort, argv[1]);
- int ret = 0;
- trace = fopen(outputName, "w");
- if (trace == NULL)
- {
- printf("Can't open %s for writing\n", outputName.str());
- }
- __int64 starttime,endtime;
- starttime = get_cycles_now();
- if (arg < argc || fromStdIn)
- {
- echoResults = echoSingle;
- do
- {
- const char * query = argv[arg];
- if (fromMultiFile)
- {
- FILE *in = fopen(query, "rt");
- if (in)
- {
- CDateTime firstTime;
- CDateTime startTime;
- bool firstLine = true;
- char *buffer = new char[maxLineSize];
- for (;;)
- {
- if (fgets(buffer, maxLineSize, in)==NULL) // buffer overflow possible - do I care?
- break;
- if (timedReplay)
- {
- if (firstLine)
- {
- firstTime.setNow();
- firstTime.setTimeString(buffer, &query);
- startTime.setNow();
- }
- else
- {
- CDateTime queryTime, nowTime;
- queryTime.setNow();
- queryTime.setTimeString(buffer, &query);
- nowTime.setNow();
- int sleeptime = (int)((queryTime.getSimple()-firstTime.getSimple()) - (nowTime.getSimple()-startTime.getSimple()));
- if (sleeptime < 0)
- DBGLOG("Running behind %d seconds", -sleeptime);
- else if (sleeptime)
- {
- DBGLOG("Sleeping %d seconds", sleeptime);
- Sleep(sleeptime*1000);
- }
- StringBuffer targetTime;
- queryTime.getTimeString(targetTime);
- DBGLOG("Virtual time is %s", targetTime.str());
- }
- }
- else
- {
- query = buffer;
- while (isspace(*query)) query++;
- if (roxieLogMode)
- {
- char *start = (char *) strchr(query, '<');
- if (start)
- {
- char *end = (char *) strchr(start, '"');
- if (end && end[1]=='\n')
- {
- query = start;
- *end = 0;
- }
- }
- }
- }
- if (query)
- {
- ret = sendQuery(ip, socketPort, query);
- firstLine = false;
- }
- }
- delete [] buffer;
- fclose(in);
- }
- else
- printf("File %s could not be opened\n", query);
- }
- else if (fromFile || fromStdIn)
- {
- FILE *in = fromStdIn ? stdin : fopen(query, "rt");
- if (in)
- {
- StringBuffer fileContents;
- char buffer[1024];
- int bytes;
- for (;;)
- {
- bytes = fread(buffer, 1, sizeof(buffer), in);
- if (!bytes)
- break;
- fileContents.append(buffer, 0, bytes);
- }
- if (in != stdin)
- fclose(in);
- ret = sendQuery(ip, socketPort, fileContents.str());
- }
- else
- printf("File %s could not be opened\n", query);
- }
- else
- {
- ret = sendQuery(ip, socketPort, query);
- if (sendToSocket)
- finishedReading.wait();
- }
- } while (--repeats > 0);
- }
- else
- usage(2);
- while (runningQueries--)
- done.wait();
- if (persistConnections && persistSocket)
- {
- int sendlen=0;
- persistSocket->write(&sendlen, sizeof(sendlen));
- persistSocket->close();
- persistSocket->Release();
- }
- endtime = get_cycles_now();
- if (!justResults)
- {
- if (rawOnly == false)
- {
- if (trace != NULL)
- {
- fprintf(trace, "Total Time taken = %.3f msecs\n", (double)(cycle_to_nanosec(endtime - starttime)/1000000));
- fputs("----------------------------------------------------------------------------\n", trace);
- }
- }
- }
- if (trace != NULL)
- {
- fclose(trace);
- }
-
- #ifdef _DEBUG
- releaseAtoms();
- #endif
- return ret;
- }
|