testsocket.cpp 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include <platform.h>
  15. #include <stdio.h>
  16. #include "jmisc.hpp"
  17. #include "jlib.hpp"
  18. #include "jsocket.hpp"
  19. #include "jstream.ipp"
  20. #include "portlist.h"
  21. #include "jdebug.hpp"
  22. #include "jthread.hpp"
  23. #include "jfile.hpp"
  24. bool abortEarly = false;
  25. bool forceHTTP = false;
  26. bool abortAfterFirst = false;
  27. bool echoResults = false;
  28. bool saveResults = true;
  29. bool showTiming = false;
  30. bool showStatus = true;
  31. bool sendToSocket = false;
  32. bool parallelBlocked = false;
  33. bool justResults = false;
  34. bool multiThread = false;
  35. bool manyResults = false;
  36. bool sendFileAfterQuery = false;
  37. bool doLock = false;
  38. bool roxieLogMode = false;
  39. StringBuffer sendFileName;
  40. StringAttr queryNameOverride;
  41. unsigned delay = 0;
  42. unsigned runningQueries;
  43. unsigned multiThreadMax;
  44. unsigned maxLineSize = 10000000;
  45. ISocket *persistSocket;
  46. bool persistConnections;
  47. int repeats = 0;
  48. StringBuffer queryPrefix;
  49. Semaphore okToSend;
  50. Semaphore done;
  51. Semaphore finishedReading;
  52. FILE * trace;
  53. CriticalSection traceCrit;
  54. //---------------------------------------------------------------------------
  55. void SplitIpPort(StringAttr & ip, unsigned & port, const char * address)
  56. {
  57. const char * colon = strchr(address, ':');
  58. if (colon)
  59. {
  60. ip.set(address,colon-address);
  61. port = atoi(colon+1);
  62. }
  63. else
  64. ip.set(address);
  65. }
  66. void showMessage(const char * text)
  67. {
  68. if (!justResults)
  69. {
  70. if (echoResults)
  71. fwrite(text, strlen(text), 1, stdout);
  72. if (saveResults)
  73. fwrite(text, strlen(text), 1, trace);
  74. }
  75. }
  76. void sendFile(const char * filename, ISocket * socket)
  77. {
  78. FILE *in = fopen(filename, "rb");
  79. unsigned size = 0;
  80. void * buff = NULL;
  81. if (in)
  82. {
  83. fseek(in, 0, SEEK_END);
  84. size = ftell(in);
  85. fseek(in, 0, SEEK_SET);
  86. buff = malloc(size);
  87. size_t numRead = fread(buff, 1, size, in);
  88. fclose(in);
  89. if (numRead != size)
  90. {
  91. printf("read from file %s failed (%u/%u)\n", filename, (unsigned)numRead, size);
  92. size = 0;
  93. }
  94. }
  95. else
  96. printf("read from file %s failed\n", filename);
  97. unsigned dllLen = size;
  98. _WINREV(dllLen);
  99. socket->write(&dllLen, sizeof(dllLen));
  100. socket->write(buff, size);
  101. free(buff);
  102. }
  103. #define CHUNK_SIZE 152*2000
  104. void sendFileChunk(const char * filename, offset_t offset, ISocket * socket)
  105. {
  106. FILE *in = fopen(filename, "rb");
  107. unsigned size = 0;
  108. void * buff = NULL;
  109. if (in)
  110. {
  111. fseek(in, 0, SEEK_END);
  112. size = ftell(in);
  113. fseek(in, offset, SEEK_SET);
  114. if (size < offset)
  115. size = 0;
  116. else
  117. size -= offset;
  118. if (size > CHUNK_SIZE)
  119. size = CHUNK_SIZE;
  120. buff = malloc(size);
  121. size_t numRead = fread(buff, 1, size, in);
  122. fclose(in);
  123. if (numRead != size)
  124. {
  125. printf("read from file %s failed (%u/%u)\n", filename, (unsigned)numRead, size);
  126. size = 0;
  127. }
  128. }
  129. else
  130. printf("read from file %s failed\n", filename);
  131. if (size > 0)
  132. {
  133. MemoryBuffer sendBuffer;
  134. unsigned rev = size + strlen(filename) + 10;
  135. rev |= 0x80000000;
  136. _WINREV(rev);
  137. sendBuffer.append(rev);
  138. sendBuffer.append('R');
  139. rev = 0; // should put the sequence number here
  140. _WINREV(rev);
  141. sendBuffer.append(rev);
  142. rev = 0; // should put the # of recs in msg here
  143. _WINREV(rev);
  144. sendBuffer.append(rev);
  145. sendBuffer.append(strlen(filename)+1, filename);
  146. sendBuffer.append(size, buff);
  147. socket->write(sendBuffer.toByteArray(), sendBuffer.length());
  148. }
  149. else
  150. {
  151. unsigned zeroLen = 0;
  152. socket->write(&zeroLen, sizeof(zeroLen));
  153. }
  154. free(buff);
  155. }
  156. int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &result)
  157. {
  158. if (readBlocked)
  159. socket->set_block_mode(BF_SYNC_TRANSFER_PULL,0,60*1000);
  160. unsigned len;
  161. bool is_status;
  162. bool isBlockedResult;
  163. for (;;)
  164. {
  165. if (delay)
  166. MilliSleep(delay);
  167. is_status = false;
  168. isBlockedResult = false;
  169. try
  170. {
  171. if (useHTTP)
  172. len = 0x10000;
  173. else if (readBlocked)
  174. len = socket->receive_block_size();
  175. else
  176. {
  177. socket->read(&len, sizeof(len));
  178. _WINREV(len);
  179. }
  180. }
  181. catch(IException * e)
  182. {
  183. if (manyResults)
  184. showMessage("End of result multiple set\n");
  185. else
  186. pexception("failed to read len data", e);
  187. e->Release();
  188. return 1;
  189. }
  190. if (len == 0)
  191. {
  192. if (manyResults)
  193. {
  194. showMessage("----End of result set----\n");
  195. continue;
  196. }
  197. break;
  198. }
  199. bool isSpecial = false;
  200. bool pluginRequest = false;
  201. bool dataBlockRequest = false;
  202. if (len & 0x80000000)
  203. {
  204. unsigned char flag;
  205. isSpecial = true;
  206. socket->read(&flag, sizeof(flag));
  207. switch (flag)
  208. {
  209. case '-':
  210. if (echoResults)
  211. fputs("Error:", stdout);
  212. if (saveResults)
  213. fputs("Error:", trace);
  214. break;
  215. case 'D':
  216. showMessage("request for datablock\n");
  217. dataBlockRequest = true;
  218. break;
  219. case 'P':
  220. showMessage("request for plugin\n");
  221. pluginRequest = true;
  222. break;
  223. case 'S':
  224. if (showStatus)
  225. showMessage("Status:");
  226. is_status=true;
  227. break;
  228. case 'T':
  229. showMessage("Timing:\n");
  230. break;
  231. case 'X':
  232. showMessage("---Compound query finished---\n");
  233. return 1;
  234. case 'R':
  235. isBlockedResult = true;
  236. break;
  237. }
  238. len &= 0x7FFFFFFF;
  239. len--; // flag already read
  240. }
  241. char * mem = (char*) malloc(len+1);
  242. char * t = mem;
  243. unsigned sendlen = len;
  244. t[len]=0;
  245. try
  246. {
  247. if (useHTTP)
  248. {
  249. try
  250. {
  251. socket->read(t, 0, len, sendlen);
  252. }
  253. catch (IException *E)
  254. {
  255. if (E->errorCode()!= JSOCKERR_graceful_close)
  256. throw;
  257. E->Release();
  258. break;
  259. }
  260. if (!sendlen)
  261. break;
  262. }
  263. else if (readBlocked)
  264. socket->receive_block(t, len);
  265. else
  266. socket->read(t, len);
  267. }
  268. catch(IException * e)
  269. {
  270. pexception("failed to read data", e);
  271. return 1;
  272. }
  273. if (pluginRequest)
  274. {
  275. //Not very robust! A poor man's implementation for testing...
  276. StringBuffer dllname, libname;
  277. const char * dot = strchr(t, '.');
  278. dllname.append("\\edata\\bin\\debug\\").append(t);
  279. libname.append("\\edata\\bin\\debug\\").append(dot-t,t).append(".lib");
  280. sendFile(dllname.str(), socket);
  281. sendFile(libname.str(), socket);
  282. }
  283. else if (dataBlockRequest)
  284. {
  285. //Not very robust! A poor man's implementation for testing...
  286. offset_t offset;
  287. memcpy(&offset, t, sizeof(offset));
  288. _WINREV(offset);
  289. sendFileChunk(t+sizeof(offset), offset, socket);
  290. }
  291. else
  292. {
  293. if (isBlockedResult)
  294. {
  295. t += 8;
  296. t += strlen(t)+1;
  297. sendlen -= (t - mem);
  298. }
  299. if (echoResults && (!is_status || showStatus))
  300. {
  301. fwrite(t, sendlen, 1, stdout);
  302. fflush(stdout);
  303. }
  304. if (!is_status)
  305. result.append(sendlen, t);
  306. }
  307. free(mem);
  308. if (abortAfterFirst)
  309. return 0;
  310. }
  311. return 0;
  312. }
  313. class ReceiveThread : public Thread
  314. {
  315. public:
  316. virtual int run();
  317. };
  318. int ReceiveThread::run()
  319. {
  320. ISocket * socket = ISocket::create(3456);
  321. ISocket * client = socket->accept();
  322. StringBuffer result;
  323. readResults(client, parallelBlocked, false, result);
  324. client->Release();
  325. socket->Release();
  326. finishedReading.signal();
  327. return 0;
  328. }
  329. //------------------------------------------------------------------------
  330. /**
  331. * Return: 0 - success
  332. * nonzero - error
  333. */
  334. int doSendQuery(const char * ip, unsigned port, const char * base)
  335. {
  336. ISocket * socket;
  337. __int64 starttime, endtime;
  338. StringBuffer ipstr;
  339. try
  340. {
  341. if (strcmp(ip, ".")==0)
  342. ip = GetCachedHostName();
  343. else
  344. {
  345. const char *dash = strchr(ip, '-');
  346. if (dash && isdigit(dash[1]) && dash>ip && isdigit(dash[-1]))
  347. {
  348. if (persistConnections)
  349. UNIMPLEMENTED;
  350. const char *startrange = dash-1;
  351. while (isdigit(startrange[-1]))
  352. startrange--;
  353. char *endptr;
  354. unsigned firstnum = atoi(startrange);
  355. unsigned lastnum = strtol(dash+1, &endptr, 10);
  356. if (lastnum > firstnum)
  357. {
  358. static unsigned counter;
  359. static CriticalSection counterCrit;
  360. CriticalBlock b(counterCrit);
  361. ipstr.append(startrange - ip, ip).append((counter++ % (lastnum+1-firstnum)) + firstnum).append(endptr);
  362. ip = ipstr.str();
  363. printf("Sending to %s\n", ip);
  364. }
  365. }
  366. }
  367. starttime= get_cycles_now();
  368. if (persistConnections)
  369. {
  370. if (!persistSocket) {
  371. SocketEndpoint ep(ip,port);
  372. persistSocket = ISocket::connect_timeout(ep, 1000);
  373. }
  374. socket = persistSocket;
  375. }
  376. else {
  377. SocketEndpoint ep(ip,port);
  378. socket = ISocket::connect_timeout(ep,1000);
  379. }
  380. }
  381. catch(IException * e)
  382. {
  383. pexception("failed to connect to server", e);
  384. return 1;
  385. }
  386. StringBuffer fullQuery;
  387. bool useHTTP = forceHTTP || strstr(base, "<soap:Envelope") != NULL;
  388. if (useHTTP)
  389. {
  390. StringBuffer newQuery;
  391. Owned<IPTree> p = createPTreeFromXMLString(base, false, false);
  392. const char *queryName = p->queryName();
  393. if ((stricmp(queryName, "envelope") != 0) && (stricmp(queryName, "envelope") != 0))
  394. {
  395. if (queryNameOverride.length())
  396. queryName = queryNameOverride;
  397. newQuery.appendf("<Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\"><Body><%sRequest>", queryName);
  398. Owned<IPTreeIterator> elements = p->getElements("./*");
  399. ForEach(*elements)
  400. {
  401. IPTree &elem = elements->query();
  402. toXML(&elem, newQuery, 0, XML_SingleQuoteAttributeValues);
  403. }
  404. newQuery.appendf("</%sRequest></Body></Envelope>", queryName);
  405. base = newQuery.str();
  406. }
  407. // note - don't support queryname override unless original query is xml
  408. fullQuery.appendf("POST /doc HTTP/1.0\r\nContent-Type: application/x-www-form-urlencoded\r\nContent-Length: %d\r\n\r\n", (int) strlen(base)).append(base);
  409. }
  410. else
  411. {
  412. if (sendToSocket)
  413. {
  414. Thread * receive = new ReceiveThread();
  415. receive->start();
  416. receive->Release();
  417. }
  418. if (doLock)
  419. {
  420. const char *lock = "<control:lock/>";
  421. unsigned locklen = strlen(lock);
  422. _WINREV(locklen);
  423. socket->write(&locklen, sizeof(locklen));
  424. socket->write(lock, strlen(lock));
  425. StringBuffer lockResult;
  426. readResults(socket, false, false, lockResult);
  427. }
  428. if (queryNameOverride.length())
  429. {
  430. try
  431. {
  432. Owned<IPTree> p = createPTreeFromXMLString(base, false, false);
  433. p->renameProp("/", queryNameOverride);
  434. toXML(p, fullQuery.clear());
  435. }
  436. catch (IException *E)
  437. {
  438. StringBuffer s;
  439. printf("Error: %s", E->errorMessage(s).str());
  440. E->Release();
  441. return 1;
  442. }
  443. }
  444. else
  445. fullQuery.append(base);
  446. }
  447. const char * query = fullQuery.toCharArray();
  448. int len=strlen(query);
  449. int sendlen = len;
  450. if (persistConnections)
  451. sendlen |= 0x80000000;
  452. _WINREV(sendlen);
  453. try
  454. {
  455. if (!useHTTP)
  456. socket->write(&sendlen, sizeof(sendlen));
  457. socket->write(query, len);
  458. if (sendFileAfterQuery)
  459. {
  460. FILE *in = fopen(sendFileName.str(), "rb");
  461. if (in)
  462. {
  463. char buffer[1024];
  464. for (;;)
  465. {
  466. len = fread(buffer, 1, sizeof(buffer), in);
  467. sendlen = len;
  468. _WINREV(sendlen);
  469. socket->write(&sendlen, sizeof(sendlen));
  470. if (!len)
  471. break;
  472. socket->write(buffer, len);
  473. }
  474. fclose(in);
  475. }
  476. else
  477. printf("File %s could not be opened\n", sendFileName.str());
  478. }
  479. }
  480. catch(IException * e)
  481. {
  482. pexception("failed to write data", e);
  483. return 1;
  484. }
  485. if (abortEarly)
  486. return 0;
  487. // back-end does some processing.....
  488. StringBuffer result;
  489. int ret = readResults(socket, false, useHTTP, result);
  490. if ((ret == 0) && !justResults)
  491. {
  492. endtime = get_cycles_now();
  493. CriticalBlock b(traceCrit);
  494. fprintf(trace, "query: %s\n", query);
  495. if (saveResults)
  496. fprintf(trace, "result: %s\n", result.str());
  497. if (showTiming)
  498. fprintf(trace, "Time taken = %.3f msecs\n", (double)(cycle_to_nanosec(endtime - starttime)/1000000));
  499. fputs("----------------------------------------------------------------------------\n", trace);
  500. }
  501. if (!persistConnections)
  502. {
  503. socket->close();
  504. socket->Release();
  505. }
  506. return 0;
  507. }
  508. class QueryThread : public Thread
  509. {
  510. public:
  511. QueryThread(const char * _ip, unsigned _port, const char * _base) : ip(_ip),port(_port),base(_base) {}
  512. virtual int run() { doSendQuery(ip, port, base); done.signal(); okToSend.signal(); return 0; }
  513. protected:
  514. StringAttr ip;
  515. unsigned port;
  516. StringAttr base;
  517. };
  518. int sendQuery(const char * ip, unsigned port, const char * base)
  519. {
  520. if (!multiThread)
  521. return doSendQuery(ip, port, base);
  522. if (multiThreadMax)
  523. okToSend.wait();
  524. runningQueries++;
  525. Thread * thread = new QueryThread(ip, port, base);
  526. thread->start();
  527. thread->Release();
  528. return 0;
  529. }
  /*
   * Prints the command-line help and terminates the process.
   *
   * exitCode  value passed to exit(); this function does not return.
   *
   * NOTE(review): -s, -td, -tf and -cascade are listed here but the option
   * parser in main() does not handle them - confirm whether they should stay.
   */
  void usage(int exitCode)
  {
      printf("testsocket ip<:port> [flags] [query | -f[f] file.sql]\n");
      printf(" -a abort before input received\n");
      printf(" -a1 abort after first packet received\n");
      printf(" -c test sending response to a socket\n");
      printf(" -cb test sending response to a block mode socket\n");
      printf(" -d force delay after each packet\n");
      printf(" -f take query from file\n");
      printf(" -ff take multiple queries from file, one per line\n");
      printf(" -tff take multiple queries from file, one per line, preceded by the time at which it should be submitted (relative to time on first line)\n");
      printf(" -http force the query to be sent as an HTTP POST\n");
      printf(" -k don't save the results to result.txt\n");
      printf(" -m only save results to result.txt\n");
      printf(" -maxLineSize <n> set maximum query line length\n");
      printf(" -n multiple results - keep going until socket closes\n");
      printf(" -o set output filename\n");
      printf(" -persist use persistent connection\n");
      printf(" -pr <text> add a prefix to the query\n");
      printf(" -q quiet - don't echo query\n");
      printf(" -qname xx Use xx as queryname in place of the xml root element name\n");
      printf(" -r <n> repeat the query several times\n");
      printf(" -rl roxie logfile mode\n");
      printf(" -s add stars to indicate transfer packets\n");
      printf(" -ss suppress XML Status messages to screen (always suppressed from tracefile)\n");
      printf(" -td add debug timing statistics to trace\n");
      printf(" -tf add full timing statistics to trace\n");
      printf(" -time add timing to trace\n");
      printf(" -u<max> run queries on separate threads\n");
      printf(" -z <file> send the contents of <file> after the query\n");
      printf(" -cascade cascade query (to all roxie nodes)\n");
      printf(" -lock locked cascade query (to all roxie nodes)\n");
      exit(exitCode);
  }
  562. int main(int argc, char **argv)
  563. {
  564. #if 0
  565. {
  566. SocketListCreator c;
  567. SocketEndpoint ep;
  568. ep.ip.ip[0] = 192;
  569. ep.ip.ip[1] = 168;
  570. ep.ip.ip[2] = 6;
  571. ep.ip.ip[3] = 186;
  572. ep.port = 3004;
  573. c.addSocket(ep);
  574. ep.ip.ip[3] = 180;
  575. c.addSocket(ep);
  576. c.addSocket("192.168.6.187", 3004);
  577. c.addSocket("192.168.6.188", 3004);
  578. c.addSocket("192.168.6.189", 3004);
  579. c.addSocket("192.168.6.190", 3004);
  580. c.addSocket("192.168.6.191", 3004);
  581. c.addSocket("192.168.6.192", 3004);
  582. c.addSocket("192.168.6.194", 3004);
  583. c.addSocket("192.168.6.195", 3004);
  584. c.addSocket("192.168.6.196", 3004);
  585. c.addSocket("192.168.6.197", 3004);
  586. c.addSocket("192.168.6.198", 3004);
  587. c.addSocket("192.168.6.199", 3004);
  588. c.addSocket("192.168.7.115", 3004);
  589. printf("%s\n",c.getText());
  590. SocketListParser p(c.getText());
  591. p.first(0);
  592. StringAttr ip;
  593. unsigned port;
  594. while (p.next(ip, port))
  595. printf("%s:%d\n",ip.get(),port);
  596. return 0;
  597. }
  598. #endif
  599. #ifndef _WIN32
  600. InitModuleObjects();
  601. #endif
  602. StringAttr outputName("result.txt");
  603. bool fromFile = false;
  604. bool fromMultiFile = false;
  605. bool timedReplay = false;
  606. if (argc < 2 && !(argc==2 && strstr(argv[1], "::")))
  607. usage(1);
  608. int arg = 2;
  609. bool echoSingle = true;
  610. while (arg < argc && *argv[arg]=='-')
  611. {
  612. if (stricmp(argv[arg], "-time") == 0)
  613. {
  614. showTiming = true;
  615. ++arg;
  616. }
  617. else if (stricmp(argv[arg], "-a") == 0)
  618. {
  619. abortEarly = true;
  620. ++arg;
  621. }
  622. else if (stricmp(argv[arg], "-a1") == 0)
  623. {
  624. abortAfterFirst = true;
  625. ++arg;
  626. }
  627. else if (stricmp(argv[arg], "-c") == 0)
  628. {
  629. sendToSocket = true;
  630. ++arg;
  631. }
  632. else if (stricmp(argv[arg], "-cb") == 0)
  633. {
  634. sendToSocket = true;
  635. parallelBlocked = true;
  636. ++arg;
  637. }
  638. else if (stricmp(argv[arg], "-d") == 0)
  639. {
  640. delay = 300;
  641. ++arg;
  642. }
  643. else if (stricmp(argv[arg], "-http") == 0)
  644. {
  645. forceHTTP = true;
  646. ++arg;
  647. }
  648. else if (stricmp(argv[arg], "-f") == 0)
  649. {
  650. fromFile = true;
  651. ++arg;
  652. }
  653. else if (stricmp(argv[arg], "-ff") == 0)
  654. {
  655. fromMultiFile = true;
  656. ++arg;
  657. }
  658. else if (stricmp(argv[arg], "-tff") == 0)
  659. {
  660. fromMultiFile = true;
  661. timedReplay = true;
  662. ++arg;
  663. }
  664. else if (stricmp(argv[arg], "-k") == 0)
  665. {
  666. saveResults = false;
  667. ++arg;
  668. }
  669. else if (stricmp(argv[arg], "-m") == 0)
  670. {
  671. justResults = true;
  672. ++arg;
  673. }
  674. else if (stricmp(argv[arg], "-maxlinesize") == 0)
  675. {
  676. ++arg;
  677. if (arg>=argc)
  678. usage(1);
  679. maxLineSize = atoi(argv[arg]);
  680. ++arg;
  681. }
  682. else if (stricmp(argv[arg], "-n") == 0)
  683. {
  684. manyResults = true;
  685. ++arg;
  686. }
  687. else if (stricmp(argv[arg], "-o") == 0)
  688. {
  689. outputName.set(argv[arg+1]);
  690. arg+=2;
  691. }
  692. else if (stricmp(argv[arg], "-persist") == 0)
  693. {
  694. persistConnections = true;
  695. ++arg;
  696. }
  697. else if (stricmp(argv[arg], "-pr") == 0)
  698. {
  699. queryPrefix.append(argv[arg+1]);
  700. arg+=2;
  701. }
  702. else if (stricmp(argv[arg], "-q") == 0)
  703. {
  704. echoSingle = false;
  705. ++arg;
  706. }
  707. else if (stricmp(argv[arg], "-qname") == 0)
  708. {
  709. queryNameOverride.set(argv[arg+1]);
  710. arg+=2;
  711. }
  712. else if (stricmp(argv[arg], "-r") == 0)
  713. {
  714. ++arg;
  715. if (arg>=argc)
  716. usage(1);
  717. repeats = atoi(argv[arg]);
  718. ++arg;
  719. }
  720. else if (stricmp(argv[arg], "-rl") == 0)
  721. {
  722. roxieLogMode = true;
  723. fromMultiFile = true;
  724. ++arg;
  725. }
  726. else if (stricmp(argv[arg], "-ss") == 0)
  727. {
  728. showStatus = false;
  729. ++arg;
  730. }
  731. else if (memicmp(argv[arg], "-u", 2) == 0)
  732. {
  733. multiThread = true;
  734. multiThreadMax = atoi(argv[arg]+2);
  735. if (multiThreadMax)
  736. okToSend.signal(multiThreadMax);
  737. ++arg;
  738. }
  739. else if (stricmp(argv[arg], "-lock") == 0)
  740. {
  741. doLock = true;
  742. ++arg;
  743. }
  744. else if (memicmp(argv[arg], "-z", 2) == 0)
  745. {
  746. sendFileAfterQuery = true;
  747. sendFileName.append(argv[arg+1]);
  748. OwnedIFile f = createIFile(sendFileName.str());
  749. if (!f->exists() || !f->isFile())
  750. {
  751. printf("file %s does not exist\n", sendFileName.str());
  752. exit (EXIT_FAILURE);
  753. }
  754. arg+=2;
  755. }
  756. else
  757. {
  758. printf("Unknown argument %s, ignored\n", argv[arg]);
  759. ++arg;
  760. }
  761. }
  762. if (persistConnections && multiThread)
  763. {
  764. printf("Multi-thread (-u) not available with -persist - ignored\n");
  765. multiThread = false;
  766. }
  767. StringAttr ip;
  768. unsigned socketPort = ROXIE_SERVER_PORT;
  769. SplitIpPort(ip, socketPort, argv[1]);
  770. int ret = 0;
  771. trace = fopen(outputName, "w");
  772. __int64 starttime,endtime;
  773. starttime = get_cycles_now();
  774. if (arg < argc)
  775. {
  776. echoResults = echoSingle;
  777. do
  778. {
  779. const char * query = argv[arg];
  780. if (fromMultiFile)
  781. {
  782. FILE *in = fopen(query, "rt");
  783. if (in)
  784. {
  785. CDateTime firstTime;
  786. CDateTime startTime;
  787. bool firstLine = true;
  788. char *buffer = new char[maxLineSize];
  789. for (;;)
  790. {
  791. if (fgets(buffer, maxLineSize, in)==NULL) // buffer overflow possible - do I care?
  792. break;
  793. if (timedReplay)
  794. {
  795. if (firstLine)
  796. {
  797. firstTime.setNow();
  798. firstTime.setTimeString(buffer, &query);
  799. startTime.setNow();
  800. }
  801. else
  802. {
  803. CDateTime queryTime, nowTime;
  804. queryTime.setNow();
  805. queryTime.setTimeString(buffer, &query);
  806. nowTime.setNow();
  807. int sleeptime = (queryTime.getSimple()-firstTime.getSimple()) - (nowTime.getSimple()-startTime.getSimple());
  808. if (sleeptime < 0)
  809. DBGLOG("Running behind %d seconds", -sleeptime);
  810. else if (sleeptime)
  811. {
  812. DBGLOG("Sleeping %d seconds", sleeptime);
  813. Sleep(sleeptime*1000);
  814. }
  815. StringBuffer targetTime;
  816. queryTime.getTimeString(targetTime);
  817. DBGLOG("Virtual time is %s", targetTime.str());
  818. }
  819. }
  820. else
  821. {
  822. query = buffer;
  823. while (isspace(*query)) query++;
  824. if (roxieLogMode)
  825. {
  826. char *start = (char *) strchr(query, '<');
  827. if (start)
  828. {
  829. char *end = (char *) strchr(start, '"');
  830. if (end && end[1]=='\n')
  831. {
  832. query = start;
  833. *end = 0;
  834. }
  835. }
  836. }
  837. }
  838. if (query)
  839. {
  840. ret = sendQuery(ip, socketPort, query);
  841. firstLine = false;
  842. }
  843. }
  844. delete [] buffer;
  845. fclose(in);
  846. }
  847. else
  848. printf("File %s could not be opened\n", query);
  849. }
  850. else if (fromFile)
  851. {
  852. FILE *in = fopen(query, "rt");
  853. if (in)
  854. {
  855. StringBuffer fileContents;
  856. char buffer[1024];
  857. int bytes;
  858. for (;;)
  859. {
  860. bytes = fread(buffer, 1, sizeof(buffer), in);
  861. if (!bytes)
  862. break;
  863. fileContents.append(buffer, 0, bytes);
  864. }
  865. fclose(in);
  866. ret = sendQuery(ip, socketPort, fileContents.toCharArray());
  867. }
  868. else
  869. printf("File %s could not be opened\n", query);
  870. }
  871. else
  872. {
  873. ret = sendQuery(ip, socketPort, query);
  874. if (sendToSocket)
  875. finishedReading.wait();
  876. }
  877. } while (--repeats > 0);
  878. }
  879. while (runningQueries--)
  880. done.wait();
  881. if (persistConnections && persistSocket)
  882. {
  883. int sendlen=0;
  884. persistSocket->write(&sendlen, sizeof(sendlen));
  885. persistSocket->close();
  886. persistSocket->Release();
  887. }
  888. endtime = get_cycles_now();
  889. if (!justResults)
  890. {
  891. fprintf(trace, "Total Time taken = %.3f msecs\n", (double)(cycle_to_nanosec(endtime - starttime)/1000000));
  892. fputs("----------------------------------------------------------------------------\n", trace);
  893. }
  894. fclose(trace);
  895. #ifdef _DEBUG
  896. releaseAtoms();
  897. #endif
  898. return ret;
  899. }