tsorts1.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include <limits.h>
  15. #include "jlib.hpp"
  16. #include "mpbase.hpp"
  17. #include "mpcomm.hpp"
  18. #include "thorport.hpp"
  19. #include "tsorts.hpp"
  20. #include "thmem.hpp"
  21. #include "securesocket.hpp"
  22. #ifdef _DEBUG
  23. //#define TRACE_UNIQUE
  24. #define _FULL_TRACE
  25. //#define _FULLMPTRACE
  26. //#define TRACE_PARTITION
  27. //#define TRACE_PARTITION_OVERFLOW
  28. #endif
  29. // This contains the original global merge method
  30. class CSortMerge;
  31. typedef CopyReferenceArrayOf<CSortMerge> CSortMergeArray;
  32. class CMergeReadStream : public CSimpleInterface, public IRowStream
  33. {
  34. protected:
  35. IRowStream *stream;
  36. SocketEndpoint endpoint;
  37. void eos()
  38. {
  39. if (stream) {
  40. stream->Release();
  41. stream = NULL;
  42. }
  43. }
  44. public:
  45. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  46. CMergeReadStream(IThorRowInterfaces *rowif, unsigned streamno,SocketEndpoint &targetep, rowcount_t startrec, rowcount_t numrecs, unsigned sortTraceLevel=0, ISecureSocketContext *secureContextClient=nullptr)
  47. {
  48. endpoint = targetep;
  49. char url[100];
  50. targetep.getUrlStr(url,sizeof(url));
  51. LOG(MCthorDetailedDebugInfo, thorJob, "SORT Merge READ: Stream(%u) %s, pos=%" RCPF "d len=%" RCPF "u",streamno,url,startrec,numrecs);
  52. SocketEndpoint mergeep = targetep;
  53. mergeep.port+=SOCKETSERVERINC;
  54. Owned<ISocket> socket = ISocket::connect_wait(mergeep,CONNECTTIMEOUT*1000);
  55. #if defined(_USE_OPENSSL)
  56. if (secureContextClient)
  57. {
  58. Owned<ISecureSocket> ssock = secureContextClient->createSecureSocket(socket.getClear());
  59. int tlsTraceLevel = SSLogMin;
  60. if (sortTraceLevel >= ExtraneousMsgThreshold)
  61. tlsTraceLevel = SSLogMax;
  62. int status = ssock->secure_connect(tlsTraceLevel);
  63. if (status < 0)
  64. {
  65. ssock->close();
  66. VStringBuffer errmsg("Secure connect failed: %d", status);
  67. throw createJSocketException(JSOCKERR_connection_failed, errmsg);
  68. }
  69. socket.setown(ssock.getClear());
  70. }
  71. #endif // OPENSSL
  72. stream = ConnectMergeRead(streamno,rowif,mergeep,startrec,numrecs,socket);
  73. LOG(MCthorDetailedDebugInfo, thorJob, "SORT Merge READ: Stream(%u) connected to %s",streamno,url);
  74. }
  75. virtual ~CMergeReadStream()
  76. {
  77. if (stream) {
  78. char url[100];
  79. endpoint.getUrlStr(url,sizeof(url));
  80. DBGLOG("SORT Merge READ: EOS via destructor for %s",url);
  81. stream->stop();
  82. }
  83. eos();
  84. }
  85. const void *nextRow()
  86. {
  87. if (stream) {
  88. OwnedConstThorRow row = stream->nextRow();
  89. if (row)
  90. return row.getClear();
  91. #ifdef _FULL_TRACE
  92. char url[100];
  93. endpoint.getUrlStr(url,sizeof(url));
  94. LOG(MCthorDetailedDebugInfo, thorJob, "SORT Merge READ: EOS for %s",url);
  95. #endif
  96. eos();
  97. }
  98. return NULL;
  99. }
  100. virtual void stop()
  101. {
  102. if (stream) {
  103. #ifdef _FULL_TRACE
  104. char url[100];
  105. endpoint.getUrlStr(url,sizeof(url));
  106. LOG(MCthorDetailedDebugInfo, thorJob, "SORT Merge READ: stop for %s",url);
  107. #endif
  108. stream->stop();
  109. eos();
  110. }
  111. }
  112. };
  113. class CSortTransferServerThread;
  114. class CSortMerge: public CSimpleInterface, implements ISocketSelectNotify
  115. {
  116. Owned<IRowStream> iseq;
  117. StringBuffer url;
  118. ISortSlaveBase &src;
  119. Owned<ISocketRowWriter> out;
  120. rowcount_t poscount;
  121. rowcount_t numrecs;
  122. // unsigned pos;
  123. // unsigned endpos;
  124. unsigned ndone;
  125. bool started;
  126. CSortTransferServerThread *parent;
  127. bool done;
  128. bool closing;
  129. Semaphore donesem;
  130. Owned<IException> exception;
  131. ISocketSelectHandler *selecthandler;
  132. protected:
  133. Owned<ISocket> socket;
  134. CriticalSection crit;
  135. public:
  136. IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
  137. CSortMerge(CSortTransferServerThread *_parent,ISocket* _socket,ISocketRowWriter *_out,rowcount_t _poscount,rowcount_t _numrecs,ISocketSelectHandler *_selecthandler);
  138. ~CSortMerge()
  139. {
  140. #ifdef _FULL_TRACE
  141. LOG(MCthorDetailedDebugInfo, thorJob, "~CSortMerge in");
  142. #endif
  143. if (started)
  144. closedown();
  145. #ifdef _FULL_TRACE
  146. LOG(MCthorDetailedDebugInfo, thorJob, "~CSortMerge out");
  147. #endif
  148. }
  149. void init()
  150. {
  151. CriticalBlock block(crit);
  152. started = true;
  153. char name[64];
  154. int port = socket->peer_name(name,sizeof(name));
  155. url.append(name).append(':').append(port);
  156. LOG(MCthorDetailedDebugInfo, thorJob, "SORT Merge WRITE: start %s, pos=%" RCPF "d, len=%" RCPF "d",url.str(),poscount,numrecs);
  157. rowcount_t pos=poscount;
  158. try
  159. {
  160. iseq.setown(src.createMergeInputStream(pos,numrecs));
  161. }
  162. catch (IException *e)
  163. {
  164. PrintExceptionLog(e,"**Exception(4a)");
  165. throw;
  166. }
  167. }
  168. void closedown();
  169. bool processRows()
  170. {
  171. // NB sends a single buffer
  172. CriticalBlock block(crit);
  173. bool sent = false;
  174. try {
  175. if (!socket)
  176. return false;
  177. if (!started)
  178. init();
  179. for (;;) {
  180. OwnedConstThorRow row = iseq->nextRow();
  181. if (!row) {
  182. if (sent)
  183. out->flush();
  184. break;
  185. }
  186. out->putRow(row.getClear());
  187. ndone ++;
  188. sent = true;
  189. if (out->bufferSent())
  190. return true;
  191. }
  192. }
  193. catch (IException *e) {
  194. PrintExceptionLog(e,"CSortMergeBase processRows");
  195. throw;
  196. }
  197. return sent;
  198. }
  199. void waitdone()
  200. {
  201. char peer[16];
  202. if (socket) {
  203. socket->peer_name(peer,sizeof(peer)-1);
  204. LOG(MCthorDetailedDebugInfo, thorJob, "waitdone %s",peer);
  205. }
  206. else
  207. peer[0] = 0;
  208. while (!done)
  209. donesem.wait();
  210. if (exception)
  211. throw exception.getClear();
  212. if (peer[0])
  213. LOG(MCthorDetailedDebugInfo, thorJob, "waitdone exit");
  214. }
  215. bool notifySelected(ISocket *sock,unsigned selected)
  216. {
  217. while (!done) {
  218. try {
  219. if (closing) {
  220. closing = false;
  221. #ifdef _FULL_TRACE
  222. LOG(MCthorDetailedDebugInfo, thorJob, "notifySelected calling closedown");
  223. #endif
  224. closedown();
  225. #ifdef _FULL_TRACE
  226. LOG(MCthorDetailedDebugInfo, thorJob, "notifySelected called closedown");
  227. #endif
  228. done = true;
  229. donesem.signal();
  230. return false;
  231. }
  232. }
  233. catch (IException *e) {
  234. EXCLOG(e,"CSortMerge notifySelected.1");
  235. exception.setown(e);
  236. done = true;
  237. donesem.signal();
  238. return false;
  239. }
  240. try {
  241. if (processRows())
  242. return false; // false correct here
  243. }
  244. catch (IException *e) {
  245. EXCLOG(e,"CSortMerge notifySelected.2");
  246. exception.setown(e);
  247. }
  248. closing = true;
  249. CriticalBlock block(crit);
  250. if (!sock||!socket)
  251. break;
  252. if (sock->avail_read()==0)
  253. break;
  254. }
  255. return false; // false correct here
  256. }
  257. };
  258. class CSortTransferServerThread: protected Thread, implements IMergeTransferServer
  259. {
  260. protected: friend class CSortMerge;
  261. ISortSlaveBase &slave;
  262. bool term;
  263. Owned<ISocket> server;
  264. CriticalSection childsect;
  265. CSortMergeArray children;
  266. Owned<ISocketSelectHandler> selecthandler;
  267. Linked<IThorRowInterfaces> rowif;
  268. CriticalSection rowifsect;
  269. Semaphore rowifsem;
  270. Owned<ISecureSocketContext> secureContextServer;
  271. Owned<ISecureSocketContext> secureContextClients;
  272. public:
  273. IMPLEMENT_IINTERFACE_USING(Thread)
  274. void start()
  275. {
  276. Thread::start();
  277. }
  278. CSortTransferServerThread(ISortSlaveBase &in)
  279. : slave(in), Thread("SortTransferServer")
  280. {
  281. unsigned port = in.getTransferPort();
  282. server.setown(ISocket::create(port));
  283. term = false;
  284. #if defined(_USE_OPENSSL)
  285. if (slave.queryTLS())
  286. {
  287. secureContextServer.setown(createSecureSocketContextSecretSrv("local"));
  288. secureContextClients.setown(createSecureSocketContextSecret("local", ClientSocket));
  289. }
  290. #endif
  291. }
  292. void setRowIF(IThorRowInterfaces *_rowif)
  293. {
  294. // bit of a kludge
  295. CriticalBlock block(rowifsect);
  296. rowif.set(_rowif);
  297. rowifsem.signal();
  298. }
  299. void waitRowIF()
  300. {
  301. // bit of a kludge
  302. CriticalBlock block(rowifsect);
  303. while (!rowif&&!term) {
  304. PROGLOG("CSortTransferServerThread waiting for row interface");
  305. CriticalUnblock unblock(rowifsect);
  306. rowifsem.wait(60*1000);
  307. }
  308. }
  309. void stop()
  310. {
  311. DBGLOG("CSortTransferServerThread::stop");
  312. term = true;
  313. try {
  314. server->cancel_accept();
  315. }
  316. catch (IJSOCK_Exception *e) {
  317. PrintExceptionLog(e,"CSortTransferServerThread:stop");
  318. }
  319. verifyex(join(10*60*1000));
  320. DBGLOG("CSortTransferServerThread::stopped");
  321. }
  322. int run()
  323. {
  324. DBGLOG("CSortTransferServerThread started port %d",slave.getTransferPort());
  325. unsigned numretries = 10;
  326. try {
  327. while (!term)
  328. {
  329. Owned<ISocket> socket = server->accept(true);
  330. if (!socket)
  331. break;
  332. #if defined(_USE_OPENSSL)
  333. if (slave.queryTLS())
  334. {
  335. Owned<ISecureSocket> ssock = secureContextServer->createSecureSocket(socket.getClear());
  336. int tlsTraceLevel = SSLogMin;
  337. unsigned sortTraceLevel = slave.queryTraceLevel();
  338. if (sortTraceLevel >= ExtraneousMsgThreshold)
  339. tlsTraceLevel = SSLogMax;
  340. int status = ssock->secure_accept(tlsTraceLevel);
  341. if (status < 0)
  342. {
  343. ssock->close();
  344. VStringBuffer errmsg("Secure accept failed: %d", status);
  345. throw createJSocketException(JSOCKERR_connection_failed, errmsg);
  346. }
  347. socket.setown(ssock.getClear());
  348. }
  349. #endif // OPENSSL
  350. rowcount_t poscount=0;
  351. rowcount_t numrecs=0;
  352. ISocketRowWriter *strm=NULL;
  353. try
  354. {
  355. waitRowIF();
  356. strm = ConnectMergeWrite(rowif,socket,0x100000,poscount,numrecs);
  357. }
  358. catch (IJSOCK_Exception *e) // retry if failed
  359. {
  360. PrintExceptionLog(e, "WARNING: Exception(ConnectMergeWrite)");
  361. if (--numretries==0)
  362. throw;
  363. e->Release();
  364. continue;
  365. }
  366. catch (IException *e) // only retry if serialization check failed, indicating possible foreign client connect
  367. {
  368. PrintExceptionLog(e, "WARNING: Exception(ConnectMergeWrite)");
  369. if (TE_InvalidSortConnect != e->errorCode() || (--numretries==0))
  370. throw;
  371. e->Release();
  372. continue;
  373. }
  374. CriticalBlock block(childsect);
  375. add(strm, socket.getClear(), poscount, numrecs);
  376. }
  377. }
  378. catch (IJSOCK_Exception *e)
  379. {
  380. if (e->errorCode()!=JSOCKERR_cancel_accept) {
  381. PrintExceptionLog(e,"CSortTransferServerThread Exception");
  382. // Ignore for now
  383. }
  384. e->Release();
  385. }
  386. shutdownAndCloseNoThrow(server);
  387. subjoin();
  388. DBGLOG("CSortTransferServerThread finished");
  389. return 0;
  390. }
  391. void subjoin()
  392. {
  393. CriticalBlock proc(childsect);
  394. ForEachItemIn(i,children)
  395. {
  396. CSortMerge &c = children.item(i);
  397. CriticalUnblock unblock(childsect);
  398. c.waitdone();
  399. }
  400. if (selecthandler) {
  401. selecthandler->stop(true);
  402. selecthandler.clear();
  403. }
  404. ForEachItemIn(i2,children)
  405. {
  406. CSortMerge &c = children.item(i2);
  407. c.Release();
  408. }
  409. children.kill();
  410. rowif.clear();
  411. }
  412. void add(ISocketRowWriter *strm,ISocket *socket,rowcount_t poscount,rowcount_t numrecs) // takes ownership of sock
  413. {
  414. CriticalBlock proc(childsect);
  415. if (!selecthandler) {
  416. selecthandler.setown(createSocketSelectHandler("SORT"));
  417. selecthandler->start();
  418. }
  419. CSortMerge *sub = new CSortMerge(this,socket,strm,poscount,numrecs,selecthandler); // NB: takes ownership of 'socket'
  420. children.append(*sub);
  421. selecthandler->add(socket,SELECTMODE_READ,sub);
  422. }
  423. void remove(ISocket *socket)
  424. {
  425. CriticalBlock proc(childsect);
  426. if (selecthandler)
  427. selecthandler->remove(socket);
  428. }
  429. rowcount_t merge(unsigned mapsize,rowcount_t *map,rowcount_t *mapupper,
  430. unsigned numnodes,SocketEndpoint* endpoints,
  431. unsigned partno)
  432. {
  433. // map format is an array numnodes*numnodes
  434. // with columns corresponding to split pos (final column is size)
  435. // and rows indicating node number
  436. waitRowIF();
  437. #define vMAPL(node,col) (((int)col>=0)?map[node*numnodes+col]:0)
  438. #define vMAPU(node,col) (mapupper?(((int)col>=0)?mapupper[node*numnodes+col]:0):vMAPL(node,col))
  439. assertex(mapsize==numnodes*numnodes);
  440. unsigned i;
  441. unsigned j;
  442. #ifdef TRACE_PARTITION
  443. unsigned k = 0;
  444. for (i=0;i<numnodes;i++) {
  445. char url[100];
  446. endpoints[i].getUrlStr(url,sizeof(url));
  447. DBGLOG(" %s",url);
  448. for (j=0;j<numnodes;j++) {
  449. DBGLOG(" %u,",map[k]);
  450. k++;
  451. }
  452. }
  453. #endif
  454. rowcount_t resnum=0;
  455. for (i=0;i<numnodes;i++)
  456. resnum += vMAPU(i,partno)-vMAPL(i,partno-1);
  457. // calculate start position
  458. unsigned __int64 respos=0;
  459. for(i=0;i<partno;i++)
  460. for (j=0;j<numnodes;j++)
  461. respos += vMAPL(j,i)-vMAPL(j,i-1); // note we are adding up all of the lower as we want start
  462. rowcount_t totalrows = resnum;
  463. LOG(MCthorDetailedDebugInfo, thorJob, "Output start = %" RCPF "d, num = %" RCPF "u",respos,resnum);
  464. IArrayOf<IRowStream> readers;
  465. IException *exc = NULL;
  466. try
  467. {
  468. for (j=0;j<numnodes;j++)
  469. {
  470. unsigned i=j;
  471. rowcount_t sstart=vMAPL(i,partno-1);
  472. rowcount_t snum=vMAPU(i,partno)-sstart;
  473. if (snum>0)
  474. {
  475. if (i==partno)
  476. {
  477. LOG(MCthorDetailedDebugInfo, thorJob, "SORT Merge READ: Stream(%u) local, pos=%" RCPF "u len=%" RCPF "u",i,sstart,snum);
  478. readers.append(*slave.createMergeInputStream(sstart,snum));
  479. }
  480. else
  481. readers.append(*new CMergeReadStream(rowif,i,endpoints[i], sstart, snum, slave.queryTraceLevel(), secureContextClients));
  482. }
  483. }
  484. }
  485. catch (IException *e)
  486. {
  487. PrintExceptionLog(e,"**MultiMerge");
  488. exc = e;
  489. }
  490. if (!exc) {
  491. try {
  492. slave.startMerging(readers, totalrows);
  493. }
  494. catch (IException *e)
  495. {
  496. PrintExceptionLog(e,"**MultiMerge.2");
  497. exc = e;
  498. }
  499. }
  500. if (exc)
  501. throw exc;
  502. return totalrows;
  503. }
  504. };
  505. CSortMerge::CSortMerge(CSortTransferServerThread *_parent,ISocket* _socket,ISocketRowWriter *_out,rowcount_t _poscount,rowcount_t _numrecs,ISocketSelectHandler *_selecthandler)
  506. : src(_parent->slave),socket(_socket),out(_out)
  507. {
  508. parent = _parent;
  509. poscount = _poscount;
  510. numrecs = _numrecs;
  511. ndone = 0;
  512. started = false;
  513. selecthandler = _selecthandler;
  514. done = false;
  515. closing = false;
  516. }
  517. void CSortMerge::closedown()
  518. {
  519. CriticalBlock block(crit);
  520. #ifdef _FULL_TRACE
  521. LOG(MCthorDetailedDebugInfo, thorJob, "SORT Merge: closing %s",url.str());
  522. #endif
  523. if (!socket)
  524. return;
  525. try {
  526. iseq.clear();
  527. }
  528. catch (IException *e) {
  529. PrintExceptionLog(e,"**Exception(4b)");
  530. throw;
  531. }
  532. try {
  533. if (out)
  534. out->stop();
  535. }
  536. catch (IException *e) {
  537. PrintExceptionLog(e,"**Exception(4c)");
  538. throw;
  539. }
  540. try {
  541. out.clear();
  542. }
  543. catch (IException *e) {
  544. PrintExceptionLog(e,"**Exception(5)");
  545. throw;
  546. }
  547. parent->remove(socket);
  548. try {
  549. socket.clear();
  550. }
  551. catch (IException *e) {
  552. PrintExceptionLog(e,"**Exception(5b)");
  553. throw;
  554. }
  555. started = false;
  556. LOG(MCthorDetailedDebugInfo, thorJob, "SORT Merge: finished %s, %d rows merged",url.str(),ndone);
  557. }
  558. IMergeTransferServer *createMergeTransferServer(ISortSlaveBase *parent)
  559. {
  560. return new CSortTransferServerThread(*parent);
  561. }