testsocket.cpp 38 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <platform.h>
  14. #include <stdio.h>
  15. #include "jmisc.hpp"
  16. #include "jlib.hpp"
  17. #include "jsocket.hpp"
  18. #include "jstream.ipp"
  19. #include "portlist.h"
  20. #include "jdebug.hpp"
  21. #include "jthread.hpp"
  22. #include "jfile.hpp"
  23. #include "securesocket.hpp"
  24. #include "rmtclient.hpp"
  25. bool abortEarly = false;
  26. bool forceHTTP = false;
  27. bool useSSL = false;
  28. bool abortAfterFirst = false;
  29. bool echoResults = false;
  30. bool saveResults = true;
  31. bool showTiming = false;
  32. bool showStatus = true;
  33. bool sendToSocket = false;
  34. bool parallelBlocked = false;
  35. bool justResults = false;
  36. bool multiThread = false;
  37. bool manyResults = false;
  38. bool sendFileAfterQuery = false;
  39. bool doLock = false;
  40. bool roxieLogMode = false;
  41. bool rawOnly = false;
  42. bool rawSend = false;
  43. bool remoteStreamQuery = false;
  44. bool remoteStreamForceResend = false;
  45. bool remoteStreamSendCursor = false;
  46. int verboseDbgLevel = 0;
  47. StringBuffer sendFileName;
  48. StringAttr queryNameOverride;
  49. unsigned delay = 0;
  50. unsigned runningQueries;
  51. unsigned multiThreadMax;
  52. unsigned maxLineSize = 10000000;
  53. Owned<ISocket> persistSocket;
  54. bool persistConnections = false;
  55. Owned<ISecureSocketContext> persistSecureContext;
  56. Owned<ISecureSocket> persistSSock;
  57. int repeats = 0;
  58. StringBuffer queryPrefix;
  59. Semaphore okToSend;
  60. Semaphore done;
  61. Semaphore finishedReading;
  62. FILE * trace;
  63. CriticalSection traceCrit;
  64. unsigned queryDelayMS = 0;
  65. unsigned totalQueryCnt = 0;
  66. double totalQueryMS = 0.0;
  67. //---------------------------------------------------------------------------
  68. void SplitIpPort(StringAttr & ip, unsigned & port, const char * address)
  69. {
  70. const char * colon = strchr(address, ':');
  71. if (colon)
  72. {
  73. ip.set(address,colon-address);
  74. port = atoi(colon+1);
  75. }
  76. else
  77. ip.set(address);
  78. }
  79. void showMessage(const char * text)
  80. {
  81. if (!justResults)
  82. {
  83. if (echoResults)
  84. fwrite(text, strlen(text), 1, stdout);
  85. if (saveResults && trace != NULL)
  86. fwrite(text, strlen(text), 1, trace);
  87. }
  88. }
  89. void sendFile(const char * filename, ISocket * socket)
  90. {
  91. FILE *in = fopen(filename, "rb");
  92. unsigned size = 0;
  93. void * buff = NULL;
  94. if (in)
  95. {
  96. fseek(in, 0, SEEK_END);
  97. size = ftell(in);
  98. fseek(in, 0, SEEK_SET);
  99. buff = malloc(size);
  100. size_t numRead = fread(buff, 1, size, in);
  101. fclose(in);
  102. if (numRead != size)
  103. {
  104. printf("read from file %s failed (%u/%u)\n", filename, (unsigned)numRead, size);
  105. size = 0;
  106. }
  107. }
  108. else
  109. printf("read from file %s failed\n", filename);
  110. unsigned dllLen = size;
  111. _WINREV(dllLen);
  112. socket->write(&dllLen, sizeof(dllLen));
  113. socket->write(buff, size);
  114. free(buff);
  115. }
  116. #define CHUNK_SIZE 152*2000
  117. void sendFileChunk(const char * filename, offset_t offset, ISocket * socket)
  118. {
  119. FILE *in = fopen(filename, "rb");
  120. unsigned size = 0;
  121. void * buff = NULL;
  122. if (in)
  123. {
  124. fseek(in, 0, SEEK_END);
  125. offset_t endOffset = ftell(in);
  126. fseek(in, offset, SEEK_SET);
  127. if (endOffset < offset)
  128. size = 0;
  129. else
  130. size = (unsigned)(endOffset - offset);
  131. if (size > CHUNK_SIZE)
  132. size = CHUNK_SIZE;
  133. buff = malloc(size);
  134. size_t numRead = fread(buff, 1, size, in);
  135. fclose(in);
  136. if (numRead != size)
  137. {
  138. printf("read from file %s failed (%u/%u)\n", filename, (unsigned)numRead, size);
  139. size = 0;
  140. }
  141. }
  142. else
  143. printf("read from file %s failed\n", filename);
  144. if (size > 0)
  145. {
  146. MemoryBuffer sendBuffer;
  147. unsigned rev = size + strlen(filename) + 10;
  148. rev |= 0x80000000;
  149. _WINREV(rev);
  150. sendBuffer.append(rev);
  151. sendBuffer.append('R');
  152. rev = 0; // should put the sequence number here
  153. _WINREV(rev);
  154. sendBuffer.append(rev);
  155. rev = 0; // should put the # of recs in msg here
  156. _WINREV(rev);
  157. sendBuffer.append(rev);
  158. sendBuffer.append(strlen(filename)+1, filename);
  159. sendBuffer.append(size, buff);
  160. socket->write(sendBuffer.toByteArray(), sendBuffer.length());
  161. }
  162. else
  163. {
  164. unsigned zeroLen = 0;
  165. socket->write(&zeroLen, sizeof(zeroLen));
  166. }
  167. free(buff);
  168. }
  169. int readResults(ISocket * socket, bool readBlocked, bool useHTTP, StringBuffer &result, const char *query, size32_t queryLen)
  170. {
  171. if (readBlocked)
  172. socket->set_block_mode(BF_SYNC_TRANSFER_PULL,0,60*1000);
  173. StringBuffer remoteReadCursor;
  174. unsigned len;
  175. bool is_status;
  176. bool isBlockedResult;
  177. for (;;)
  178. {
  179. if (delay)
  180. MilliSleep(delay);
  181. is_status = false;
  182. isBlockedResult = false;
  183. try
  184. {
  185. if (useHTTP)
  186. len = 0x10000;
  187. else if (readBlocked)
  188. len = socket->receive_block_size();
  189. else
  190. {
  191. socket->read(&len, sizeof(len));
  192. _WINREV(len);
  193. }
  194. }
  195. catch(IException * e)
  196. {
  197. if (manyResults)
  198. showMessage("End of result multiple set\n");
  199. else
  200. pexception("failed to read len data", e);
  201. e->Release();
  202. return 1;
  203. }
  204. if (len == 0)
  205. {
  206. if (manyResults)
  207. {
  208. showMessage("----End of result set----\n");
  209. continue;
  210. }
  211. break;
  212. }
  213. bool isSpecial = false;
  214. bool pluginRequest = false;
  215. bool dataBlockRequest = false;
  216. bool remoteReadRequest = false;
  217. if (len & 0x80000000)
  218. {
  219. unsigned char flag;
  220. isSpecial = true;
  221. socket->read(&flag, sizeof(flag));
  222. switch (flag)
  223. {
  224. case '-':
  225. if (echoResults)
  226. fputs("Error:", stdout);
  227. if (saveResults && trace != NULL)
  228. fputs("Error:", trace);
  229. break;
  230. case 'D':
  231. showMessage("request for datablock\n");
  232. dataBlockRequest = true;
  233. break;
  234. case 'P':
  235. showMessage("request for plugin\n");
  236. pluginRequest = true;
  237. break;
  238. case 'S':
  239. if (showStatus)
  240. showMessage("Status:");
  241. is_status=true;
  242. break;
  243. case 'T':
  244. showMessage("Timing:\n");
  245. break;
  246. case 'X':
  247. showMessage("---Compound query finished---\n");
  248. return 1;
  249. case 'R':
  250. isBlockedResult = true;
  251. break;
  252. case 'J':
  253. remoteReadRequest = true;
  254. break;
  255. }
  256. len &= 0x7FFFFFFF;
  257. len--; // flag already read
  258. }
  259. MemoryBuffer mb;
  260. mb.setEndian(__BIG_ENDIAN);
  261. char *mem = (char *)mb.reserveTruncate(len+1);
  262. char * t = mem;
  263. size32_t sendlen = len;
  264. t[len]=0;
  265. try
  266. {
  267. if (useHTTP)
  268. {
  269. try
  270. {
  271. socket->read(t, 0, len, sendlen);
  272. }
  273. catch (IException *E)
  274. {
  275. if (E->errorCode()!= JSOCKERR_graceful_close)
  276. throw;
  277. E->Release();
  278. break;
  279. }
  280. if (!sendlen)
  281. break;
  282. }
  283. else if (readBlocked)
  284. socket->receive_block(t, len);
  285. else
  286. socket->read(t, len);
  287. }
  288. catch(IException * e)
  289. {
  290. pexception("failed to read data", e);
  291. e->Release();
  292. return 1;
  293. }
  294. if (pluginRequest)
  295. {
  296. //Not very robust! A poor man's implementation for testing...
  297. StringBuffer dllname, libname;
  298. const char * dot = strchr(t, '.');
  299. dllname.append("\\edata\\bin\\debug\\").append(t);
  300. libname.append("\\edata\\bin\\debug\\").append(dot-t,t).append(".lib");
  301. sendFile(dllname.str(), socket);
  302. sendFile(libname.str(), socket);
  303. }
  304. else if (dataBlockRequest)
  305. {
  306. //Not very robust! A poor man's implementation for testing...
  307. offset_t offset;
  308. mb.read(offset);
  309. sendFileChunk((const char *)mb.readDirect(offset), offset, socket);
  310. }
  311. else if (remoteReadRequest)
  312. {
  313. auto cmd = queryRemoteStreamCmd();
  314. size32_t remoteStreamCmdSz = sizeof(cmd);
  315. size32_t jsonQueryLen = queryLen - remoteStreamCmdSz;
  316. const char *jsonQuery = query + remoteStreamCmdSz;
  317. Owned<IPropertyTree> requestTree = createPTreeFromJSONString(jsonQueryLen, jsonQuery);
  318. Owned<IPropertyTree> responseTree; // used if response is xml or json
  319. const char *outputFmtStr = requestTree->queryProp("format");
  320. const char *response = nullptr;
  321. if (!outputFmtStr || strieq("xml", outputFmtStr))
  322. {
  323. response = (const char *)mb.readDirect(len);
  324. responseTree.setown(createPTreeFromXMLString(len, response));
  325. assertex(responseTree);
  326. }
  327. else if (strieq("json", outputFmtStr))
  328. {
  329. response = (const char *)mb.readDirect(len);
  330. // JCSMORE - json string coming back from IXmlWriterExt is always rootless the moment, so workaround it by supplying ptr_noRoot to reader
  331. // writer should be fixed.
  332. Owned<IPropertyTree> tree = createPTreeFromJSONString(len, response, ipt_none, (PTreeReaderOptions)(ptr_ignoreWhiteSpace|ptr_noRoot));
  333. responseTree.setown(tree->getPropTree("Response"));
  334. assertex(responseTree);
  335. }
  336. else if (!strieq("binary", outputFmtStr))
  337. throw MakeStringException(0, "Unknown output format: %s", outputFmtStr);
  338. unsigned cursorHandle;
  339. if (responseTree)
  340. cursorHandle = responseTree->getPropInt("handle");
  341. else
  342. mb.read(cursorHandle);
  343. bool retrySend = false;
  344. if (cursorHandle)
  345. {
  346. PROGLOG("Got handle back: %u; len=%u", cursorHandle, len);
  347. StringBuffer xml;
  348. if (responseTree)
  349. {
  350. if (echoResults && response)
  351. {
  352. fputs(response, stdout);
  353. fflush(stdout);
  354. }
  355. if (!responseTree->getProp("cursorBin", remoteReadCursor))
  356. break;
  357. }
  358. else
  359. {
  360. size32_t dataLen;
  361. mb.read(dataLen);
  362. if (!dataLen)
  363. break;
  364. const void *rowData = mb.readDirect(dataLen);
  365. // JCSMORE - output binary row data?
  366. // cursor
  367. size32_t cursorLen;
  368. mb.read(cursorLen);
  369. if (!cursorLen)
  370. break;
  371. const void *cursor = mb.readDirect(cursorLen);
  372. JBASE64_Encode(cursor, cursorLen, remoteReadCursor, true);
  373. }
  374. if (remoteStreamForceResend)
  375. cursorHandle = NotFound; // fake that it's a handle dafilesrv doesn't know about
  376. Owned<IPropertyTree> requestTree = createPTree();
  377. requestTree->setPropInt("handle", cursorHandle);
  378. // Only the handle is needed for continuation, but this tests the behaviour of some clients which may send cursor per request (e.g. to refresh)
  379. if (remoteStreamSendCursor)
  380. requestTree->setProp("cursorBin", remoteReadCursor);
  381. requestTree->setProp("format", outputFmtStr);
  382. StringBuffer requestStr;
  383. requestStr.append(queryRemoteStreamCmd());
  384. toJSON(requestTree, requestStr);
  385. if (verboseDbgLevel > 0)
  386. {
  387. fputs("\nNext request:", stdout);
  388. fputs(requestStr.str()+remoteStreamCmdSz, stdout);
  389. fputs("\n", stdout);
  390. fflush(stdout);
  391. }
  392. sendlen = requestStr.length();
  393. _WINREV(sendlen);
  394. try
  395. {
  396. if (!rawSend && !useHTTP)
  397. socket->write(&sendlen, sizeof(sendlen));
  398. socket->write(requestStr.str(), requestStr.length());
  399. }
  400. catch (IJSOCK_Exception *e)
  401. {
  402. retrySend = true;
  403. EXCLOG(e, nullptr);
  404. e->Release();
  405. }
  406. }
  407. else // dafilesrv didn't know who I was, resent query + serialized cursor
  408. retrySend = true;
  409. if (retrySend)
  410. {
  411. PROGLOG("Retry send for handle: %u", cursorHandle);
  412. requestTree->setProp("cursorBin", remoteReadCursor);
  413. StringBuffer requestStr;
  414. requestStr.append(queryRemoteStreamCmd());
  415. toJSON(requestTree, requestStr);
  416. PROGLOG("requestStr = %s", requestStr.str()+remoteStreamCmdSz);
  417. sendlen = requestStr.length();
  418. _WINREV(sendlen);
  419. if (!rawSend && !useHTTP)
  420. socket->write(&sendlen, sizeof(sendlen));
  421. socket->write(requestStr.str(), requestStr.length());
  422. }
  423. }
  424. else
  425. {
  426. if (isBlockedResult)
  427. {
  428. t += 8;
  429. t += strlen(t)+1;
  430. sendlen -= (t - mem);
  431. }
  432. if (echoResults && (!is_status || showStatus))
  433. {
  434. fwrite(t, sendlen, 1, stdout);
  435. fflush(stdout);
  436. }
  437. if (!is_status)
  438. result.append(sendlen, t);
  439. }
  440. if (abortAfterFirst)
  441. return 0;
  442. }
  443. return 0;
  444. }
  445. class ReceiveThread : public Thread
  446. {
  447. public:
  448. virtual int run();
  449. };
  450. int ReceiveThread::run()
  451. {
  452. ISocket * socket = ISocket::create(3456);
  453. ISocket * client = socket->accept();
  454. StringBuffer result;
  455. readResults(client, parallelBlocked, false, result, nullptr, 0);
  456. client->Release();
  457. socket->Release();
  458. finishedReading.signal();
  459. return 0;
  460. }
  461. //------------------------------------------------------------------------
  462. /**
  463. * Return: 0 - success
  464. * nonzero - error
  465. */
  466. int doSendQuery(const char * ip, unsigned port, const char * base)
  467. {
  468. Owned<ISocket> socket;
  469. Owned<ISecureSocketContext> secureContext;
  470. __int64 starttime, endtime;
  471. StringBuffer ipstr;
  472. CTimeMon tm;
  473. if (queryDelayMS)
  474. tm.reset(queryDelayMS);
  475. try
  476. {
  477. if (strcmp(ip, ".")==0)
  478. ip = GetCachedHostName();
  479. else
  480. {
  481. const char *dash = strchr(ip, '-');
  482. if (dash && isdigit(dash[1]) && dash>ip && isdigit(dash[-1]))
  483. {
  484. if (persistConnections)
  485. UNIMPLEMENTED;
  486. const char *startrange = dash-1;
  487. while (isdigit(startrange[-1]))
  488. startrange--;
  489. char *endptr;
  490. unsigned firstnum = atoi(startrange);
  491. unsigned lastnum = strtol(dash+1, &endptr, 10);
  492. if (lastnum > firstnum)
  493. {
  494. static unsigned counter;
  495. static CriticalSection counterCrit;
  496. CriticalBlock b(counterCrit);
  497. ipstr.append(startrange - ip, ip).append((counter++ % (lastnum+1-firstnum)) + firstnum).append(endptr);
  498. ip = ipstr.str();
  499. printf("Sending to %s\n", ip);
  500. }
  501. }
  502. }
  503. starttime= get_cycles_now();
  504. if (persistConnections)
  505. {
  506. if (!persistSocket)
  507. {
  508. SocketEndpoint ep(ip,port);
  509. persistSocket.setown(ISocket::connect_timeout(ep, 1000));
  510. if (useSSL)
  511. {
  512. #ifdef _USE_OPENSSL
  513. if (!persistSecureContext)
  514. persistSecureContext.setown(createSecureSocketContext(ClientSocket));
  515. persistSSock.setown(persistSecureContext->createSecureSocket(persistSocket.getClear()));
  516. int res = persistSSock->secure_connect();
  517. if (res < 0)
  518. throw MakeStringException(-1, "doSendQuery : Failed to establish secure connection");
  519. persistSocket.setown(persistSSock.getClear());
  520. #else
  521. throw MakeStringException(-1, "OpenSSL disabled in build");
  522. #endif
  523. }
  524. }
  525. socket = persistSocket;
  526. }
  527. else
  528. {
  529. SocketEndpoint ep(ip,port);
  530. socket.setown(ISocket::connect_timeout(ep, 100000));
  531. if (useSSL)
  532. {
  533. #ifdef _USE_OPENSSL
  534. secureContext.setown(createSecureSocketContext(ClientSocket));
  535. Owned<ISecureSocket> ssock = secureContext->createSecureSocket(socket.getClear());
  536. int res = ssock->secure_connect();
  537. if (res < 0)
  538. throw MakeStringException(-1, "doSendQuery : Failed to establish secure connection");
  539. socket.setown(ssock.getClear());
  540. #else
  541. throw MakeStringException(1, "OpenSSL disabled in build");
  542. #endif
  543. }
  544. }
  545. }
  546. catch(IException * e)
  547. {
  548. pexception("failed to connect to server", e);
  549. return 1;
  550. }
  551. StringBuffer fullQuery;
  552. bool useHTTP = forceHTTP || strstr(base, "<soap:Envelope") != NULL;
  553. if (useHTTP)
  554. {
  555. StringBuffer newQuery;
  556. Owned<IPTree> p = createPTreeFromXMLString(base, ipt_none, ptr_none);
  557. const char *queryName = p->queryName();
  558. if ((stricmp(queryName, "envelope") != 0) && (stricmp(queryName, "envelope") != 0))
  559. {
  560. if (queryNameOverride.length())
  561. queryName = queryNameOverride;
  562. newQuery.appendf("<Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\"><Body><%sRequest>", queryName);
  563. Owned<IPTreeIterator> elements = p->getElements("./*");
  564. ForEach(*elements)
  565. {
  566. IPTree &elem = elements->query();
  567. toXML(&elem, newQuery, 0, XML_SingleQuoteAttributeValues);
  568. }
  569. newQuery.appendf("</%sRequest></Body></Envelope>", queryName);
  570. base = newQuery.str();
  571. }
  572. // note - don't support queryname override unless original query is xml
  573. fullQuery.appendf("POST /doc HTTP/1.0\r\nContent-Type: application/x-www-form-urlencoded\r\nContent-Length: %d\r\n\r\n", (int) strlen(base)).append(base);
  574. }
  575. else
  576. {
  577. if (sendToSocket)
  578. {
  579. Thread * receive = new ReceiveThread();
  580. receive->start();
  581. receive->Release();
  582. }
  583. if (doLock)
  584. {
  585. const char *lock = "<control:lock/>";
  586. unsigned locklen = strlen(lock);
  587. _WINREV(locklen);
  588. socket->write(&locklen, sizeof(locklen));
  589. socket->write(lock, strlen(lock));
  590. StringBuffer lockResult;
  591. readResults(socket, false, false, lockResult, nullptr, 0);
  592. }
  593. if (queryNameOverride.length())
  594. {
  595. try
  596. {
  597. Owned<IPTree> p = createPTreeFromXMLString(base, ipt_none, ptr_none);
  598. p->renameProp("/", queryNameOverride);
  599. toXML(p, fullQuery.clear());
  600. }
  601. catch (IException *E)
  602. {
  603. StringBuffer s;
  604. printf("Error: %s", E->errorMessage(s).str());
  605. E->Release();
  606. return 1;
  607. }
  608. }
  609. else
  610. {
  611. if (remoteStreamQuery)
  612. fullQuery.append(queryRemoteStreamCmd());
  613. fullQuery.append(base);
  614. }
  615. }
  616. const char * query = fullQuery.str();
  617. size32_t queryLen=(size32_t)strlen(query);
  618. size32_t len = queryLen;
  619. size32_t sendlen = len;
  620. if (persistConnections)
  621. sendlen |= 0x80000000;
  622. _WINREV(sendlen);
  623. try
  624. {
  625. if (!rawSend && !useHTTP)
  626. socket->write(&sendlen, sizeof(sendlen));
  627. if (verboseDbgLevel > 0)
  628. {
  629. fprintf(stdout, "about to write %u <%s>\n", len, query);
  630. fflush(stdout);
  631. }
  632. socket->write(query, len);
  633. if (sendFileAfterQuery)
  634. {
  635. FILE *in = fopen(sendFileName.str(), "rb");
  636. if (in)
  637. {
  638. char buffer[1024];
  639. for (;;)
  640. {
  641. len = fread(buffer, 1, sizeof(buffer), in);
  642. sendlen = len;
  643. _WINREV(sendlen);
  644. socket->write(&sendlen, sizeof(sendlen));
  645. if (!len)
  646. break;
  647. socket->write(buffer, len);
  648. }
  649. fclose(in);
  650. }
  651. else
  652. printf("File %s could not be opened\n", sendFileName.str());
  653. }
  654. }
  655. catch(IException * e)
  656. {
  657. pexception("failed to write data", e);
  658. return 1;
  659. }
  660. if (abortEarly)
  661. return 0;
  662. // back-end does some processing.....
  663. StringBuffer result;
  664. int ret = readResults(socket, false, useHTTP, result, query, queryLen);
  665. if ((ret == 0) && !justResults)
  666. {
  667. endtime = get_cycles_now();
  668. CriticalBlock b(traceCrit);
  669. if (trace != NULL)
  670. {
  671. if (rawOnly == false)
  672. {
  673. fprintf(trace, "query: %s\n", query);
  674. if (saveResults)
  675. fprintf(trace, "result: %s\n", result.str());
  676. }
  677. else
  678. {
  679. fprintf(trace, "%s", result.str());
  680. }
  681. double queryTimeMS = (double)(cycle_to_nanosec(endtime - starttime))/1000000;
  682. totalQueryMS += queryTimeMS;
  683. totalQueryCnt++;
  684. if (showTiming && rawOnly == false)
  685. {
  686. fprintf(trace, "Time taken = %.3f msecs\n", queryTimeMS);
  687. fputs("----------------------------------------------------------------------------\n", trace);
  688. }
  689. }
  690. }
  691. if (!persistConnections)
  692. {
  693. socket->close();
  694. }
  695. if (queryDelayMS)
  696. {
  697. unsigned remaining;
  698. if (!tm.timedout(&remaining))
  699. Sleep(remaining);
  700. }
  701. return 0;
  702. }
  703. class QueryThread : public Thread
  704. {
  705. public:
  706. QueryThread(const char * _ip, unsigned _port, const char * _base) : ip(_ip),port(_port),base(_base) {}
  707. virtual int run() { doSendQuery(ip, port, base); done.signal(); okToSend.signal(); return 0; }
  708. protected:
  709. StringAttr ip;
  710. unsigned port;
  711. StringAttr base;
  712. };
  713. int sendQuery(const char * ip, unsigned port, const char * base)
  714. {
  715. if (!multiThread)
  716. return doSendQuery(ip, port, base);
  717. if (multiThreadMax)
  718. okToSend.wait();
  719. runningQueries++;
  720. Thread * thread = new QueryThread(ip, port, base);
  721. thread->start();
  722. thread->Release();
  723. return 0;
  724. }
  725. void usage(int exitCode)
  726. {
  727. printf("testsocket ip<:port> [flags] [query | -f[f] file.sql | -]\n");
  728. printf(" - take query from stdin\n");
  729. printf(" -a abort before input received\n");
  730. printf(" -a1 abort after first packet receieved\n");
  731. printf(" -c test sending response to a socket\n");
  732. printf(" -cb test sending response to a block mode socket\n");
  733. printf(" -d force delay after each packet\n");
  734. printf(" -f take query from file\n");
  735. printf(" -ff take multiple queries from file, one per line\n");
  736. printf(" -tff take multiple queries from file, one per line, preceded by the time at which it should be submitted (relative to time on first line)\n");
  737. printf(" -k don't save the results to result.txt\n");
  738. printf(" -m only save results to result.txt\n");
  739. printf(" -maxLineSize <n> set maximum query line length\n");
  740. printf(" -n multiple results - keep going until socket closes\n");
  741. printf(" -o set output filename\n");
  742. printf(" -or set output filename for raw output\n");
  743. printf(" -persist use persistent connection\n");
  744. printf(" -pr <text>add a prefix to the query\n");
  745. printf(" -q quiet - don't echo query\n");
  746. printf(" -qname xx Use xx as queryname in place of the xml root element name\n");
  747. printf(" -qd delay (ms) to possibly wait between start of query and start of next query\n");
  748. printf(" -r <n> repeat the query several times\n");
  749. printf(" -rl roxie logfile mode\n");
  750. printf(" -rs remote stream request\n");
  751. printf(" -rsr force remote stream resend per continuation request\n");
  752. printf(" -rssc send cursor per continuation request\n");
  753. printf(" -s add stars to indicate transfer packets\n");
  754. printf(" -ss suppress XML Status messages to screen (always suppressed from tracefile)\n");
  755. printf(" -ssl use ssl\n");
  756. printf(" -td add debug timing statistics to trace\n");
  757. printf(" -tf add full timing statistics to trace\n");
  758. printf(" -time add timing to trace\n");
  759. printf(" -u<max> run queries on separate threads\n");
  760. printf(" -v debug output\n");
  761. printf(" -cascade cascade query (to all roxie nodes)\n");
  762. printf(" -lock locked cascade query (to all roxie nodes)\n");
  763. printf(" -x raw send\n");
  764. exit(exitCode);
  765. }
  766. int main(int argc, char **argv)
  767. {
  768. InitModuleObjects();
  769. StringAttr outputName("result.txt");
  770. bool fromFile = false;
  771. bool fromStdIn = false;
  772. bool fromMultiFile = false;
  773. bool timedReplay = false;
  774. if (argc < 2 && !(argc==2 && strstr(argv[1], "::")))
  775. usage(1);
  776. int arg = 2;
  777. bool echoSingle = true;
  778. while (arg < argc && *argv[arg]=='-')
  779. {
  780. if (stricmp(argv[arg], "-time") == 0)
  781. {
  782. showTiming = true;
  783. ++arg;
  784. }
  785. else if (stricmp(argv[arg], "-a") == 0)
  786. {
  787. abortEarly = true;
  788. ++arg;
  789. }
  790. else if (stricmp(argv[arg], "-a1") == 0)
  791. {
  792. abortAfterFirst = true;
  793. ++arg;
  794. }
  795. else if (stricmp(argv[arg], "-c") == 0)
  796. {
  797. sendToSocket = true;
  798. ++arg;
  799. }
  800. else if (stricmp(argv[arg], "-cb") == 0)
  801. {
  802. sendToSocket = true;
  803. parallelBlocked = true;
  804. ++arg;
  805. }
  806. else if (stricmp(argv[arg], "-d") == 0)
  807. {
  808. delay = 300;
  809. ++arg;
  810. }
  811. else if (stricmp(argv[arg], "-http") == 0)
  812. {
  813. forceHTTP = true;
  814. ++arg;
  815. }
  816. else if (stricmp(argv[arg], "-ssl") == 0)
  817. {
  818. useSSL = true;
  819. ++arg;
  820. }
  821. else if (stricmp(argv[arg], "-") == 0)
  822. {
  823. fromStdIn = true;
  824. ++arg;
  825. }
  826. else if (stricmp(argv[arg], "-f") == 0)
  827. {
  828. fromFile = true;
  829. ++arg;
  830. }
  831. else if (stricmp(argv[arg], "-x") == 0)
  832. {
  833. rawSend = true;
  834. ++arg;
  835. }
  836. else if (stricmp(argv[arg], "-ff") == 0)
  837. {
  838. fromMultiFile = true;
  839. ++arg;
  840. }
  841. else if (stricmp(argv[arg], "-tff") == 0)
  842. {
  843. fromMultiFile = true;
  844. timedReplay = true;
  845. ++arg;
  846. }
  847. else if (stricmp(argv[arg], "-k") == 0)
  848. {
  849. saveResults = false;
  850. ++arg;
  851. }
  852. else if (stricmp(argv[arg], "-m") == 0)
  853. {
  854. justResults = true;
  855. ++arg;
  856. }
  857. else if (stricmp(argv[arg], "-maxlinesize") == 0)
  858. {
  859. ++arg;
  860. if (arg>=argc)
  861. usage(1);
  862. maxLineSize = atoi(argv[arg]);
  863. ++arg;
  864. }
  865. else if (stricmp(argv[arg], "-n") == 0)
  866. {
  867. manyResults = true;
  868. ++arg;
  869. }
  870. else if (stricmp(argv[arg], "-o") == 0)
  871. {
  872. outputName.set(argv[arg+1]);
  873. arg+=2;
  874. }
  875. else if (stricmp(argv[arg], "-or") == 0)
  876. {
  877. rawOnly = true;
  878. outputName.set(argv[arg+1]);
  879. arg+=2;
  880. }
  881. else if (stricmp(argv[arg], "-persist") == 0)
  882. {
  883. persistConnections = true;
  884. ++arg;
  885. }
  886. else if (stricmp(argv[arg], "-pr") == 0)
  887. {
  888. queryPrefix.append(argv[arg+1]);
  889. arg+=2;
  890. }
  891. else if (stricmp(argv[arg], "-q") == 0)
  892. {
  893. echoSingle = false;
  894. ++arg;
  895. }
  896. else if (stricmp(argv[arg], "-qd") == 0)
  897. {
  898. ++arg;
  899. if (arg>=argc)
  900. usage(1);
  901. queryDelayMS = atoi(argv[arg]);
  902. ++arg;
  903. }
  904. else if (stricmp(argv[arg], "-qname") == 0)
  905. {
  906. queryNameOverride.set(argv[arg+1]);
  907. arg+=2;
  908. }
  909. else if (stricmp(argv[arg], "-r") == 0)
  910. {
  911. ++arg;
  912. if (arg>=argc)
  913. usage(1);
  914. repeats = atoi(argv[arg]);
  915. ++arg;
  916. }
  917. else if (stricmp(argv[arg], "-rl") == 0)
  918. {
  919. roxieLogMode = true;
  920. fromMultiFile = true;
  921. ++arg;
  922. }
  923. else if (stricmp(argv[arg], "-ss") == 0)
  924. {
  925. showStatus = false;
  926. ++arg;
  927. }
  928. else if (stricmp(argv[arg], "-v") == 0)
  929. {
  930. verboseDbgLevel++;
  931. ++arg;
  932. }
  933. else if (memicmp(argv[arg], "-u", 2) == 0)
  934. {
  935. multiThread = true;
  936. multiThreadMax = atoi(argv[arg]+2);
  937. if (multiThreadMax)
  938. okToSend.signal(multiThreadMax);
  939. ++arg;
  940. }
  941. else if (stricmp(argv[arg], "-lock") == 0)
  942. {
  943. doLock = true;
  944. ++arg;
  945. }
  946. else if (memicmp(argv[arg], "-z", 2) == 0)
  947. {
  948. sendFileAfterQuery = true;
  949. sendFileName.append(argv[arg+1]);
  950. OwnedIFile f = createIFile(sendFileName.str());
  951. if (!f->exists() || f->isFile()==fileBool::foundNo)
  952. {
  953. printf("file %s does not exist\n", sendFileName.str());
  954. exit (EXIT_FAILURE);
  955. }
  956. arg+=2;
  957. }
  958. else if (strieq(argv[arg], "-rs"))
  959. {
  960. remoteStreamQuery = true;
  961. ++arg;
  962. }
  963. else if (strieq(argv[arg], "-rsr"))
  964. {
  965. remoteStreamForceResend = true;
  966. ++arg;
  967. }
  968. else if (strieq(argv[arg], "-rssc"))
  969. {
  970. remoteStreamSendCursor = true;
  971. ++arg;
  972. }
  973. else
  974. {
  975. printf("Unknown argument %s, ignored\n", argv[arg]);
  976. ++arg;
  977. }
  978. }
  979. if (persistConnections && multiThread)
  980. {
  981. printf("Multi-thread (-u) not available with -persist - ignored\n");
  982. multiThread = false;
  983. }
  984. StringAttr ip;
  985. unsigned socketPort = (useSSL) ? ROXIE_SSL_SERVER_PORT : ROXIE_SERVER_PORT;
  986. SplitIpPort(ip, socketPort, argv[1]);
  987. int ret = 0;
  988. trace = fopen(outputName, "w");
  989. if (trace == NULL)
  990. {
  991. printf("Can't open %s for writing\n", outputName.str());
  992. }
  993. __int64 starttime,endtime;
  994. starttime = get_cycles_now();
  995. if (arg < argc || fromStdIn)
  996. {
  997. echoResults = echoSingle;
  998. do
  999. {
  1000. const char * query = argv[arg];
  1001. if (fromMultiFile)
  1002. {
  1003. FILE *in = fopen(query, "rt");
  1004. if (in)
  1005. {
  1006. CDateTime firstTime;
  1007. CDateTime startTime;
  1008. bool firstLine = true;
  1009. char *buffer = new char[maxLineSize];
  1010. for (;;)
  1011. {
  1012. if (fgets(buffer, maxLineSize, in)==NULL) // buffer overflow possible - do I care?
  1013. break;
  1014. if (timedReplay)
  1015. {
  1016. if (firstLine)
  1017. {
  1018. firstTime.setNow();
  1019. firstTime.setTimeString(buffer, &query);
  1020. startTime.setNow();
  1021. }
  1022. else
  1023. {
  1024. CDateTime queryTime, nowTime;
  1025. queryTime.setNow();
  1026. queryTime.setTimeString(buffer, &query);
  1027. nowTime.setNow();
  1028. int sleeptime = (int)((queryTime.getSimple()-firstTime.getSimple()) - (nowTime.getSimple()-startTime.getSimple()));
  1029. if (sleeptime < 0)
  1030. DBGLOG("Running behind %d seconds", -sleeptime);
  1031. else if (sleeptime)
  1032. {
  1033. DBGLOG("Sleeping %d seconds", sleeptime);
  1034. Sleep(sleeptime*1000);
  1035. }
  1036. StringBuffer targetTime;
  1037. queryTime.getTimeString(targetTime);
  1038. DBGLOG("Virtual time is %s", targetTime.str());
  1039. }
  1040. }
  1041. else
  1042. {
  1043. query = buffer;
  1044. while (isspace(*query)) query++;
  1045. if (roxieLogMode)
  1046. {
  1047. char *start = (char *) strchr(query, '<');
  1048. if (start)
  1049. {
  1050. char *end = (char *) strchr(start, '"');
  1051. if (end && end[1]=='\n')
  1052. {
  1053. query = start;
  1054. *end = 0;
  1055. }
  1056. }
  1057. }
  1058. }
  1059. if (query)
  1060. {
  1061. ret = sendQuery(ip, socketPort, query);
  1062. firstLine = false;
  1063. }
  1064. }
  1065. delete [] buffer;
  1066. fclose(in);
  1067. }
  1068. else
  1069. printf("File %s could not be opened\n", query);
  1070. }
  1071. else if (fromFile || fromStdIn)
  1072. {
  1073. FILE *in = fromStdIn ? stdin : fopen(query, "rt");
  1074. if (in)
  1075. {
  1076. StringBuffer fileContents;
  1077. char buffer[1024];
  1078. int bytes;
  1079. for (;;)
  1080. {
  1081. bytes = fread(buffer, 1, sizeof(buffer), in);
  1082. if (!bytes)
  1083. break;
  1084. fileContents.append(buffer, 0, bytes);
  1085. }
  1086. if (in != stdin)
  1087. fclose(in);
  1088. ret = sendQuery(ip, socketPort, fileContents.str());
  1089. }
  1090. else
  1091. printf("File %s could not be opened\n", query);
  1092. }
  1093. else
  1094. {
  1095. ret = sendQuery(ip, socketPort, query);
  1096. if (sendToSocket)
  1097. finishedReading.wait();
  1098. }
  1099. } while (--repeats > 0);
  1100. }
  1101. else
  1102. usage(2);
  1103. while (runningQueries--)
  1104. done.wait();
  1105. if (persistConnections && persistSocket)
  1106. {
  1107. int sendlen=0;
  1108. persistSocket->write(&sendlen, sizeof(sendlen));
  1109. persistSocket->close();
  1110. }
  1111. endtime = get_cycles_now();
  1112. if (!justResults)
  1113. {
  1114. if (rawOnly == false)
  1115. {
  1116. if (trace != NULL)
  1117. {
  1118. fprintf(trace, "Total Time taken = %.3f msecs\n", (double)(cycle_to_nanosec(endtime - starttime))/1000000);
  1119. if (totalQueryCnt)
  1120. {
  1121. double timePerQueryMS = totalQueryMS / totalQueryCnt;
  1122. fprintf(trace, "Total Queries: %u Avg t/q = %.3f msecs\n", totalQueryCnt, timePerQueryMS);
  1123. }
  1124. fputs("----------------------------------------------------------------------------\n", trace);
  1125. }
  1126. }
  1127. }
  1128. if (trace != NULL)
  1129. {
  1130. fclose(trace);
  1131. }
  1132. #ifdef _DEBUG
  1133. releaseAtoms();
  1134. #endif
  1135. return ret;
  1136. }