fvdsremote.cpp 16 KB


  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "jliball.hpp"
  15. #include "eclrtl.hpp"
  16. #include "hqlexpr.hpp"
  17. #include "hqlthql.hpp"
  18. #include "fvresultset.ipp"
  19. #include "fileview.hpp"
  20. #include "fvdisksource.ipp"
  21. #include "fvwugen.hpp"
  22. #include "fvdsremote.ipp"
  23. #include "fverror.hpp"
  24. #include "mpcomm.hpp"
  // Timeout handed to sendRecv() for every client request.
  // NOTE(review): presumably milliseconds - confirm against the mpcomm API.
  #define TIMEOUT 60000
  #define REMOTE_DATA_SIZE 8000 // roughly how much is sent back for each request

  // Command codes. The code is the first byte of each request message and
  // selects the handler in RemoteDataSourceServer::run().
  enum
  {
      FVCMDnone,
      FVCMDrow,           // block of translated rows
      FVCMDraw,           // block of raw rows
      FVCMDnumrows,       // row count
      FVCMDcreatewu,      // create a data source from a workunit result
      FVCMDcreatefile,    // create a data source from a logical file
      FVCMDdestroy,       // discard a data source
      FVCMDfetch,         // fetch one translated row by offset
      FVCMDfetchraw,      // fetch one raw row by offset
      FVCMDmax
  };
  28. //---------------------------------------------------------------------------
  29. static void sendReceive(INode * serverNode, CMessageBuffer & msg)
  30. {
  31. if (!queryWorldCommunicator().sendRecv(msg, serverNode, MPTAG_FILEVIEW, TIMEOUT))
  32. throwError(FVERR_TimeoutRemoteFileView);
  33. msg.setEndian(__BIG_ENDIAN);
  34. IException * error = deserializeException(msg);
  35. if (error)
  36. throw error;
  37. }
  38. RemoteDataSource::RemoteDataSource(const SocketEndpoint & _serverEP, unique_id_t _id, IFvDataSourceMetaData * _metaData, __int64 _cachedNumRows, bool _isIndex) : serverEP(_serverEP)
  39. {
  40. id = _id;
  41. metaData.set(_metaData);
  42. serverNode.setown(createINode(serverEP));
  43. cachedNumRows = _cachedNumRows;
  44. index = _isIndex;
  45. openCount = 0;
  46. }
  47. IFvDataSourceMetaData * RemoteDataSource::queryMetaData()
  48. {
  49. return metaData;
  50. }
  51. void RemoteDataSource::beforeDispose()
  52. {
  53. CMessageBuffer msg;
  54. msg.setEndian(__BIG_ENDIAN);
  55. msg.append((byte)FVCMDdestroy);
  56. msg.append(id);
  57. sendReceive(msg);
  58. }
  59. bool RemoteDataSource::getARow(MemoryBuffer & out, RowCache & cache, byte cmd, __int64 row)
  60. {
  61. RowLocation location;
  62. if (cache.getCacheRow(row, location))
  63. {
  64. out.append(location.matchLength, location.matchRow);
  65. return true;
  66. }
  67. CMessageBuffer msg;
  68. msg.setEndian(__BIG_ENDIAN);
  69. msg.append(cmd);
  70. msg.append(id);
  71. msg.append(row);
  72. sendReceive(msg);
  73. bool ok;
  74. msg.read(ok);
  75. if (!ok) return false;
  76. __int64 start;
  77. msg.read(start);
  78. VariableRowBlock * next = new VariableRowBlock(msg, start);
  79. cache.addRowsOwn(next);
  80. if (!cache.getCacheRow(row, location))
  81. assertex(!"Internal Error!");
  82. out.append(location.matchLength, location.matchRow);
  83. return true;
  84. }
  85. bool RemoteDataSource::fetchRow(MemoryBuffer & out, __int64 offset)
  86. {
  87. CMessageBuffer msg;
  88. msg.setEndian(__BIG_ENDIAN);
  89. msg.append(FVCMDfetch);
  90. msg.append(id);
  91. msg.append(offset);
  92. sendReceive(msg);
  93. bool ok;
  94. msg.read(ok);
  95. if (!ok) return false;
  96. size32_t len;
  97. msg.read(len);
  98. out.append(len, msg.readDirect(len));
  99. return true;
  100. }
  101. bool RemoteDataSource::fetchRawRow(MemoryBuffer & out, __int64 offset)
  102. {
  103. CMessageBuffer msg;
  104. msg.setEndian(__BIG_ENDIAN);
  105. msg.append(FVCMDfetchraw);
  106. msg.append(id);
  107. msg.append(offset);
  108. sendReceive(msg);
  109. bool ok;
  110. msg.read(ok);
  111. if (!ok) return false;
  112. size32_t len;
  113. msg.read(len);
  114. out.append(len, msg.readDirect(len));
  115. return true;
  116. }
  117. bool RemoteDataSource::getRow(MemoryBuffer & out, __int64 row)
  118. {
  119. return getARow(out, translatedRows, FVCMDrow, row);
  120. }
  121. bool RemoteDataSource::getRawRow(MemoryBuffer & out, __int64 row)
  122. {
  123. return getARow(out, rawRows, FVCMDraw, row);
  124. }
  125. __int64 RemoteDataSource::numRows(bool force)
  126. {
  127. if (!force)
  128. return cachedNumRows;
  129. CMessageBuffer msg;
  130. msg.setEndian(__BIG_ENDIAN);
  131. msg.append((byte)FVCMDnumrows);
  132. msg.append(id);
  133. sendReceive(msg);
  134. __int64 result;
  135. msg.read(result);
  136. return result;
  137. }
  138. void RemoteDataSource::onClose()
  139. {
  140. if (--openCount == 0)
  141. {
  142. //MORE: Should tell the server...
  143. }
  144. }
  145. void RemoteDataSource::onOpen()
  146. {
  147. //MORE: critical section
  148. if (openCount++ == 0)
  149. {
  150. //MORE - tell the server...
  151. }
  152. }
  153. void RemoteDataSource::sendReceive(CMessageBuffer & msg)
  154. {
  155. ::sendReceive(serverNode, msg);
  156. }
  157. IFvDataSource * createRemoteDataSource(const SocketEndpoint & server, const char * username, const char * password, const char * wuid, unsigned sequence, const char * name)
  158. {
  159. Owned<INode> serverNode = createINode(server);
  160. CMessageBuffer msg;
  161. msg.setEndian(__BIG_ENDIAN);
  162. msg.append((byte)FVCMDcreatewu);
  163. msg.append(myProcessSession());
  164. msg.append(username);
  165. msg.append(password);
  166. msg.append(wuid);
  167. msg.append(sequence);
  168. msg.append(name);
  169. sendReceive(serverNode, msg);
  170. unsigned short version;
  171. unique_id_t id;
  172. __int64 numRows;
  173. bool isIndex;
  174. msg.read(version);
  175. msg.read(id);
  176. msg.read(numRows);
  177. Owned<IFvDataSourceMetaData> meta = deserializeDataSourceMeta(msg);
  178. msg.read(isIndex);
  179. if (id)
  180. return new RemoteDataSource(server, id, meta, numRows, isIndex);
  181. return 0;
  182. }
  183. IFvDataSource * createRemoteFileDataSource(const SocketEndpoint & server, const char * username, const char * password, const char * logicalName)
  184. {
  185. Owned<INode> serverNode = createINode(server);
  186. CMessageBuffer msg;
  187. msg.setEndian(__BIG_ENDIAN);
  188. msg.append((byte)FVCMDcreatefile);
  189. msg.append(myProcessSession());
  190. msg.append(username);
  191. msg.append(password);
  192. msg.append(logicalName);
  193. sendReceive(serverNode, msg);
  194. unsigned short version;
  195. unique_id_t id;
  196. __int64 numRows;
  197. bool isIndex;
  198. msg.read(version);
  199. msg.read(id);
  200. msg.read(numRows);
  201. Owned<IFvDataSourceMetaData> meta = deserializeDataSourceMeta(msg);
  202. msg.read(isIndex);
  203. if (id)
  204. return new RemoteDataSource(server, id, meta, numRows, isIndex);
  205. return 0;
  206. }
  207. //---------------------------------------------------------------------------
  208. static RemoteDataSourceServer * server;
  209. RemoteDataEntry::~RemoteDataEntry()
  210. {
  211. if (subscription)
  212. querySessionManager().unsubscribeSession(subscription);
  213. }
  214. RemoteDataSourceServer::RemoteDataSourceServer(const char * _queue, const char * _cluster) : Thread("Remote File View Server")
  215. {
  216. alive = true;
  217. nextId = 0;
  218. queue.set(_queue);
  219. cluster.set(_cluster);
  220. }
  221. unique_id_t RemoteDataSourceServer::addDataSource(SessionId session, IFvDataSource * ds)
  222. {
  223. RemoteDataEntry * newEntry = new RemoteDataEntry;
  224. newEntry->id = ++nextId;
  225. newEntry->session = session;
  226. newEntry->ds.set(ds);
  227. newEntry->subscription = querySessionManager().subscribeSession(session, this);
  228. //MORE: Register the session so if it dies then we get notified.
  229. CriticalBlock procedure(cs);
  230. entries.append(*newEntry);
  231. return newEntry->id;
  232. }
  233. void RemoteDataSourceServer::doCmdFetch(bool raw, MemoryBuffer & in, MemoryBuffer & out)
  234. {
  235. Owned<IFvDataSource> ds = readDataSource(in);
  236. if (!ds)
  237. {
  238. out.append(false);
  239. return;
  240. }
  241. __int64 requestedOffset;
  242. in.read(requestedOffset);
  243. MemoryBuffer temp;
  244. bool ok = ds->fetchRow(temp, requestedOffset);
  245. out.append(ok); // ok
  246. out.append(temp.length());
  247. out.append(temp.length(), temp.toByteArray());
  248. }
  249. void RemoteDataSourceServer::doCmdFetchRaw(bool raw, MemoryBuffer & in, MemoryBuffer & out)
  250. {
  251. Owned<IFvDataSource> ds = readDataSource(in);
  252. if (!ds)
  253. {
  254. out.append(false);
  255. return;
  256. }
  257. __int64 requestedOffset;
  258. in.read(requestedOffset);
  259. MemoryBuffer temp;
  260. bool ok = ds->fetchRawRow(temp, requestedOffset);
  261. out.append(ok); // ok
  262. out.append(temp.length());
  263. out.append(temp.length(), temp.toByteArray());
  264. }
  265. void RemoteDataSourceServer::doCmdRow(bool raw, MemoryBuffer & in, MemoryBuffer & out)
  266. {
  267. Owned<IFvDataSource> ds = readDataSource(in);
  268. if (!ds)
  269. {
  270. out.append(false);
  271. return;
  272. }
  273. __int64 requestedRow;
  274. in.read(requestedRow);
  275. unsigned startPos = out.length();
  276. unsigned numRows = 0;
  277. out.append(true); // ok
  278. out.append(requestedRow); // start
  279. unsigned numRowsPos = out.length();
  280. out.append(numRows); // total number of rows;
  281. loop
  282. {
  283. unsigned lengthPos = out.length();
  284. out.append((unsigned)0); // size of this row.
  285. unsigned startRow = out.length();
  286. if (raw)
  287. {
  288. if (!ds->getRawRow(out, requestedRow+numRows))
  289. break;
  290. }
  291. else
  292. {
  293. if (!ds->getRow(out, requestedRow+numRows))
  294. break;
  295. }
  296. if ((numRows != 0) && (out.length() > REMOTE_DATA_SIZE))
  297. break;
  298. unsigned endRow = out.length();
  299. out.rewrite(lengthPos);
  300. out.append(endRow-startRow);
  301. out.rewrite(endRow);
  302. numRows++;
  303. }
  304. if (numRows == 0)
  305. {
  306. out.rewrite(startPos);
  307. out.append(false);
  308. return;
  309. }
  310. unsigned totalLength = out.length();
  311. out.rewrite(numRowsPos);
  312. out.append(numRows);
  313. out.rewrite(totalLength);
  314. }
  315. void RemoteDataSourceServer::doCmdNumRows(MemoryBuffer & in, MemoryBuffer & out)
  316. {
  317. Owned<IFvDataSource> ds = readDataSource(in);
  318. __int64 numRows = ds ? ds->numRows(true) : 0;
  319. out.append(numRows);
  320. }
  321. void RemoteDataSourceServer::doCmdCreateWorkunit(MemoryBuffer & in, MemoryBuffer & out)
  322. {
  323. SessionId session;
  324. StringAttr wuid, username, password;
  325. unsigned sequence;
  326. StringAttr name;
  327. in.read(session);
  328. in.read(username).read(password);
  329. in.read(wuid);
  330. in.read(sequence);
  331. in.read(name);
  332. DBGLOG("RemoteFileView:CreateWorkunit('%s',%d,'%s') by[%s:%"I64F"d", wuid.get(), sequence, name ? name.get() : "", username.get(), session);
  333. Owned<IConstWUResult> wuResult = resolveResult(wuid, sequence, name);
  334. Owned<IFvDataSource> ds = createDataSource(wuResult, wuid, username, password);
  335. unique_id_t id = addDataSource(session, ds);
  336. out.append((unsigned short)CurRemoteVersion);
  337. out.append(id);
  338. out.append(ds->numRows(false));
  339. ds->queryMetaData()->serialize(out);
  340. out.append(ds->isIndex());
  341. DBGLOG("RemoteFileView:CreateWorkunit returns %"I64F"d", id);
  342. }
  343. void RemoteDataSourceServer::doCmdCreateFile(MemoryBuffer & in, MemoryBuffer & out)
  344. {
  345. SessionId session;
  346. StringAttr username, password, logicalName;
  347. in.read(session);
  348. in.read(username).read(password);
  349. in.read(logicalName);
  350. DBGLOG("RemoteFileView:CreateFile('%s') by[%s:%"I64F"d", logicalName.get(), username.get(), session);
  351. Owned<IFvDataSource> ds = createFileDataSource(logicalName, cluster, username, password);
  352. unique_id_t id = addDataSource(session, ds);
  353. out.append((unsigned short)CurRemoteVersion);
  354. out.append(id);
  355. out.append(ds->numRows(false));
  356. ds->queryMetaData()->serialize(out);
  357. out.append(ds->isIndex());
  358. DBGLOG("RemoteFileView:CreateFile returns %"I64F"d", id);
  359. }
  360. void RemoteDataSourceServer::doCmdDestroy(MemoryBuffer & in, MemoryBuffer & out)
  361. {
  362. unique_id_t id;
  363. in.read(id);
  364. DBGLOG("RemoteFileView:Destroy(%"I64F"d)", id);
  365. CriticalBlock block(cs);
  366. ForEachItemIn(idx, entries)
  367. {
  368. RemoteDataEntry & cur = entries.item(idx);
  369. if (cur.id == id)
  370. {
  371. entries.remove(idx);
  372. return;
  373. }
  374. }
  375. }
  376. IFvDataSource * RemoteDataSourceServer::getDataSource(unique_id_t id)
  377. {
  378. CriticalBlock block(cs);
  379. ForEachItemIn(idx, entries)
  380. {
  381. RemoteDataEntry & cur = entries.item(idx);
  382. if (cur.id == id)
  383. return LINK(cur.ds);
  384. }
  385. return NULL;
  386. }
  387. void RemoteDataSourceServer::closed(SessionId id)
  388. {
  389. removeSession(id);
  390. }
  391. void RemoteDataSourceServer::aborted(SessionId id)
  392. {
  393. removeSession(id);
  394. }
  395. IFvDataSource * RemoteDataSourceServer::readDataSource(MemoryBuffer & in)
  396. {
  397. unique_id_t id;
  398. in.read(id);
  399. return getDataSource(id);
  400. }
  401. void RemoteDataSourceServer::removeSession(SessionId id)
  402. {
  403. DBGLOG("RemoteFileView:Session Died");
  404. CriticalBlock block(cs);
  405. ForEachItemInRev(idx, entries)
  406. {
  407. RemoteDataEntry & cur = entries.item(idx);
  408. if (cur.session == id)
  409. {
  410. DBGLOG("RemoteFileView:Instance Died %"I64F"d", cur.id);
  411. entries.remove(idx);
  412. }
  413. }
  414. }
  415. //MORE: If this is ever actually used then it should probably have several threads
  416. // processing the commands, especially if the commands can involve lots of processing.
  417. int RemoteDataSourceServer::run()
  418. {
  419. CMessageBuffer msg;
  420. MemoryBuffer result;
  421. INode * sender;
  422. while (alive)
  423. {
  424. msg.clear();
  425. if (queryWorldCommunicator().recv(msg, 0, MPTAG_FILEVIEW, &sender))
  426. {
  427. msg.setEndian(__BIG_ENDIAN);
  428. result.setEndian(__BIG_ENDIAN);
  429. try
  430. {
  431. serializeException(NULL, result.clear());
  432. byte cmd;
  433. msg.read(cmd);
  434. switch (cmd)
  435. {
  436. case FVCMDrow: doCmdRow(false, msg, result); break;
  437. case FVCMDraw: doCmdRow(true, msg, result); break;
  438. case FVCMDnumrows: doCmdNumRows(msg, result); break;
  439. case FVCMDcreatewu: doCmdCreateWorkunit(msg, result); break;
  440. case FVCMDcreatefile: doCmdCreateFile(msg, result); break;
  441. case FVCMDdestroy: doCmdDestroy(msg, result); break;
  442. case FVCMDfetch: doCmdFetch(false, msg, result); break;
  443. case FVCMDfetchraw: doCmdFetchRaw(false, msg, result); break;
  444. default:
  445. throwError(FVERR_UnknownRemoteCommand);
  446. }
  447. msg.clear().append(result);
  448. }
  449. catch (IException * e)
  450. {
  451. serializeException(e, msg.clear());
  452. e->Release();
  453. }
  454. queryWorldCommunicator().reply(msg, MP_ASYNC_SEND);
  455. ::Release(sender);
  456. }
  457. }
  458. server = NULL;
  459. return 0;
  460. }
  461. void RemoteDataSourceServer::stop()
  462. {
  463. alive = false;
  464. queryWorldCommunicator().cancel(0, MPTAG_FILEVIEW);
  465. join();
  466. }
  467. extern FILEVIEW_API void startRemoteDataSourceServer(const char * queue, const char * cluster)
  468. {
  469. //This isn't properly thread safe - it also isn't ever used in practice, so not a problem.
  470. if (!server)
  471. {
  472. server = new RemoteDataSourceServer(queue, cluster);
  473. server->start();
  474. }
  475. }
  476. extern FILEVIEW_API void stopRemoteDataSourceServer()
  477. {
  478. if (server)
  479. server->stop();
  480. }
  481. IConstWUResult * resolveResult(const char * wuid, unsigned sequence, const char * name)
  482. {
  483. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  484. Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid, false);
  485. return getWorkUnitResult(wu, name, sequence);
  486. }
  487. IConstWUResult * secResolveResult(ISecManager &secmgr, ISecUser &secuser, const char * wuid, unsigned sequence, const char * name)
  488. {
  489. Owned<IWorkUnitFactory> factory = getSecWorkUnitFactory(secmgr, secuser);
  490. Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid, false);
  491. return (wu) ? getWorkUnitResult(wu, name, sequence) : NULL;
  492. }