fvdsremote.cpp 16 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "eclrtl.hpp"
  15. #include "hqlexpr.hpp"
  16. #include "hqlthql.hpp"
  17. #include "fvresultset.ipp"
  18. #include "fileview.hpp"
  19. #include "fvdisksource.ipp"
  20. #include "fvwugen.hpp"
  21. #include "fvdsremote.ipp"
  22. #include "fverror.hpp"
  23. #include "mpcomm.hpp"
  24. #define TIMEOUT 60000
  25. #define REMOTE_DATA_SIZE 8000 // roughly how much is sent back for each request
  26. enum { FVCMDnone, FVCMDrow, FVCMDraw, FVCMDnumrows, FVCMDcreatewu, FVCMDcreatefile, FVCMDdestroy, FVCMDfetch, FVCMDfetchraw, FVCMDmax };
  27. //---------------------------------------------------------------------------
  28. static void sendReceive(INode * serverNode, CMessageBuffer & msg)
  29. {
  30. if (!queryWorldCommunicator().sendRecv(msg, serverNode, MPTAG_FILEVIEW, TIMEOUT))
  31. throwError(FVERR_TimeoutRemoteFileView);
  32. msg.setEndian(__BIG_ENDIAN);
  33. IException * error = deserializeException(msg);
  34. if (error)
  35. throw error;
  36. }
  37. RemoteDataSource::RemoteDataSource(const SocketEndpoint & _serverEP, unique_id_t _id, IFvDataSourceMetaData * _metaData, __int64 _cachedNumRows, bool _isIndex) : serverEP(_serverEP)
  38. {
  39. id = _id;
  40. metaData.set(_metaData);
  41. serverNode.setown(createINode(serverEP));
  42. cachedNumRows = _cachedNumRows;
  43. index = _isIndex;
  44. openCount = 0;
  45. }
  46. IFvDataSourceMetaData * RemoteDataSource::queryMetaData()
  47. {
  48. return metaData;
  49. }
  50. void RemoteDataSource::beforeDispose()
  51. {
  52. CMessageBuffer msg;
  53. msg.setEndian(__BIG_ENDIAN);
  54. msg.append((byte)FVCMDdestroy);
  55. msg.append(id);
  56. sendReceive(msg);
  57. }
  58. bool RemoteDataSource::getARow(MemoryBuffer & out, RowCache & cache, byte cmd, __int64 row)
  59. {
  60. RowLocation location;
  61. if (cache.getCacheRow(row, location))
  62. {
  63. out.append(location.matchLength, location.matchRow);
  64. return true;
  65. }
  66. CMessageBuffer msg;
  67. msg.setEndian(__BIG_ENDIAN);
  68. msg.append(cmd);
  69. msg.append(id);
  70. msg.append(row);
  71. sendReceive(msg);
  72. bool ok;
  73. msg.read(ok);
  74. if (!ok) return false;
  75. __int64 start;
  76. msg.read(start);
  77. VariableRowBlock * next = new VariableRowBlock(msg, start);
  78. cache.addRowsOwn(next);
  79. if (!cache.getCacheRow(row, location))
  80. assertex(!"Internal Error!");
  81. out.append(location.matchLength, location.matchRow);
  82. return true;
  83. }
  84. bool RemoteDataSource::fetchRow(MemoryBuffer & out, __int64 offset)
  85. {
  86. CMessageBuffer msg;
  87. msg.setEndian(__BIG_ENDIAN);
  88. msg.append(FVCMDfetch);
  89. msg.append(id);
  90. msg.append(offset);
  91. sendReceive(msg);
  92. bool ok;
  93. msg.read(ok);
  94. if (!ok) return false;
  95. size32_t len;
  96. msg.read(len);
  97. out.append(len, msg.readDirect(len));
  98. return true;
  99. }
  100. bool RemoteDataSource::fetchRawRow(MemoryBuffer & out, __int64 offset)
  101. {
  102. CMessageBuffer msg;
  103. msg.setEndian(__BIG_ENDIAN);
  104. msg.append(FVCMDfetchraw);
  105. msg.append(id);
  106. msg.append(offset);
  107. sendReceive(msg);
  108. bool ok;
  109. msg.read(ok);
  110. if (!ok) return false;
  111. size32_t len;
  112. msg.read(len);
  113. out.append(len, msg.readDirect(len));
  114. return true;
  115. }
  116. bool RemoteDataSource::getRow(MemoryBuffer & out, __int64 row)
  117. {
  118. return getARow(out, translatedRows, FVCMDrow, row);
  119. }
  120. bool RemoteDataSource::getRawRow(MemoryBuffer & out, __int64 row)
  121. {
  122. return getARow(out, rawRows, FVCMDraw, row);
  123. }
  124. __int64 RemoteDataSource::numRows(bool force)
  125. {
  126. if (!force)
  127. return cachedNumRows;
  128. CMessageBuffer msg;
  129. msg.setEndian(__BIG_ENDIAN);
  130. msg.append((byte)FVCMDnumrows);
  131. msg.append(id);
  132. sendReceive(msg);
  133. __int64 result;
  134. msg.read(result);
  135. return result;
  136. }
  137. void RemoteDataSource::onClose()
  138. {
  139. if (--openCount == 0)
  140. {
  141. //MORE: Should tell the server...
  142. }
  143. }
  144. void RemoteDataSource::onOpen()
  145. {
  146. //MORE: critical section
  147. if (openCount++ == 0)
  148. {
  149. //MORE - tell the server...
  150. }
  151. }
  152. void RemoteDataSource::sendReceive(CMessageBuffer & msg)
  153. {
  154. ::sendReceive(serverNode, msg);
  155. }
  156. IFvDataSource * createRemoteDataSource(const SocketEndpoint & server, const char * username, const char * password, const char * wuid, unsigned sequence, const char * name)
  157. {
  158. Owned<INode> serverNode = createINode(server);
  159. CMessageBuffer msg;
  160. msg.setEndian(__BIG_ENDIAN);
  161. msg.append((byte)FVCMDcreatewu);
  162. msg.append(myProcessSession());
  163. msg.append(username);
  164. msg.append(password);
  165. msg.append(wuid);
  166. msg.append(sequence);
  167. msg.append(name);
  168. sendReceive(serverNode, msg);
  169. unsigned short version;
  170. unique_id_t id;
  171. __int64 numRows;
  172. bool isIndex;
  173. msg.read(version);
  174. msg.read(id);
  175. msg.read(numRows);
  176. Owned<IFvDataSourceMetaData> meta = deserializeDataSourceMeta(msg);
  177. msg.read(isIndex);
  178. if (id)
  179. return new RemoteDataSource(server, id, meta, numRows, isIndex);
  180. return 0;
  181. }
  182. IFvDataSource * createRemoteFileDataSource(const SocketEndpoint & server, const char * username, const char * password, const char * logicalName)
  183. {
  184. Owned<INode> serverNode = createINode(server);
  185. CMessageBuffer msg;
  186. msg.setEndian(__BIG_ENDIAN);
  187. msg.append((byte)FVCMDcreatefile);
  188. msg.append(myProcessSession());
  189. msg.append(username);
  190. msg.append(password);
  191. msg.append(logicalName);
  192. sendReceive(serverNode, msg);
  193. unsigned short version;
  194. unique_id_t id;
  195. __int64 numRows;
  196. bool isIndex;
  197. msg.read(version);
  198. msg.read(id);
  199. msg.read(numRows);
  200. Owned<IFvDataSourceMetaData> meta = deserializeDataSourceMeta(msg);
  201. msg.read(isIndex);
  202. if (id)
  203. return new RemoteDataSource(server, id, meta, numRows, isIndex);
  204. return 0;
  205. }
  206. //---------------------------------------------------------------------------
  207. static RemoteDataSourceServer * server;
  208. RemoteDataEntry::~RemoteDataEntry()
  209. {
  210. if (subscription)
  211. querySessionManager().unsubscribeSession(subscription);
  212. }
  213. RemoteDataSourceServer::RemoteDataSourceServer(const char * _queue, const char * _cluster) : Thread("Remote File View Server")
  214. {
  215. alive = true;
  216. nextId = 0;
  217. queue.set(_queue);
  218. cluster.set(_cluster);
  219. }
  220. unique_id_t RemoteDataSourceServer::addDataSource(SessionId session, IFvDataSource * ds)
  221. {
  222. RemoteDataEntry * newEntry = new RemoteDataEntry;
  223. newEntry->id = ++nextId;
  224. newEntry->session = session;
  225. newEntry->ds.set(ds);
  226. newEntry->subscription = querySessionManager().subscribeSession(session, this);
  227. //MORE: Register the session so if it dies then we get notified.
  228. CriticalBlock procedure(cs);
  229. entries.append(*newEntry);
  230. return newEntry->id;
  231. }
  232. void RemoteDataSourceServer::doCmdFetch(bool raw, MemoryBuffer & in, MemoryBuffer & out)
  233. {
  234. Owned<IFvDataSource> ds = readDataSource(in);
  235. if (!ds)
  236. {
  237. out.append(false);
  238. return;
  239. }
  240. __int64 requestedOffset;
  241. in.read(requestedOffset);
  242. MemoryBuffer temp;
  243. bool ok = ds->fetchRow(temp, requestedOffset);
  244. out.append(ok); // ok
  245. out.append(temp.length());
  246. out.append(temp.length(), temp.toByteArray());
  247. }
  248. void RemoteDataSourceServer::doCmdFetchRaw(bool raw, MemoryBuffer & in, MemoryBuffer & out)
  249. {
  250. Owned<IFvDataSource> ds = readDataSource(in);
  251. if (!ds)
  252. {
  253. out.append(false);
  254. return;
  255. }
  256. __int64 requestedOffset;
  257. in.read(requestedOffset);
  258. MemoryBuffer temp;
  259. bool ok = ds->fetchRawRow(temp, requestedOffset);
  260. out.append(ok); // ok
  261. out.append(temp.length());
  262. out.append(temp.length(), temp.toByteArray());
  263. }
  264. void RemoteDataSourceServer::doCmdRow(bool raw, MemoryBuffer & in, MemoryBuffer & out)
  265. {
  266. Owned<IFvDataSource> ds = readDataSource(in);
  267. if (!ds)
  268. {
  269. out.append(false);
  270. return;
  271. }
  272. __int64 requestedRow;
  273. in.read(requestedRow);
  274. unsigned startPos = out.length();
  275. unsigned numRows = 0;
  276. out.append(true); // ok
  277. out.append(requestedRow); // start
  278. unsigned numRowsPos = out.length();
  279. out.append(numRows); // total number of rows;
  280. loop
  281. {
  282. unsigned lengthPos = out.length();
  283. out.append((unsigned)0); // size of this row.
  284. unsigned startRow = out.length();
  285. if (raw)
  286. {
  287. if (!ds->getRawRow(out, requestedRow+numRows))
  288. break;
  289. }
  290. else
  291. {
  292. if (!ds->getRow(out, requestedRow+numRows))
  293. break;
  294. }
  295. if ((numRows != 0) && (out.length() > REMOTE_DATA_SIZE))
  296. break;
  297. unsigned endRow = out.length();
  298. out.rewrite(lengthPos);
  299. out.append(endRow-startRow);
  300. out.rewrite(endRow);
  301. numRows++;
  302. }
  303. if (numRows == 0)
  304. {
  305. out.rewrite(startPos);
  306. out.append(false);
  307. return;
  308. }
  309. unsigned totalLength = out.length();
  310. out.rewrite(numRowsPos);
  311. out.append(numRows);
  312. out.rewrite(totalLength);
  313. }
  314. void RemoteDataSourceServer::doCmdNumRows(MemoryBuffer & in, MemoryBuffer & out)
  315. {
  316. Owned<IFvDataSource> ds = readDataSource(in);
  317. __int64 numRows = ds ? ds->numRows(true) : 0;
  318. out.append(numRows);
  319. }
  320. void RemoteDataSourceServer::doCmdCreateWorkunit(MemoryBuffer & in, MemoryBuffer & out)
  321. {
  322. SessionId session;
  323. StringAttr wuid, username, password;
  324. unsigned sequence;
  325. StringAttr name;
  326. in.read(session);
  327. in.read(username).read(password);
  328. in.read(wuid);
  329. in.read(sequence);
  330. in.read(name);
  331. DBGLOG("RemoteFileView:CreateWorkunit('%s',%d,'%s') by[%s:%" I64F "d", wuid.get(), sequence, name ? name.get() : "", username.get(), session);
  332. Owned<IConstWUResult> wuResult = resolveResult(wuid, sequence, name);
  333. Owned<IFvDataSource> ds = createDataSource(wuResult, wuid, username, password);
  334. unique_id_t id = addDataSource(session, ds);
  335. out.append((unsigned short)CurRemoteVersion);
  336. out.append(id);
  337. out.append(ds->numRows(false));
  338. ds->queryMetaData()->serialize(out);
  339. out.append(ds->isIndex());
  340. DBGLOG("RemoteFileView:CreateWorkunit returns %" I64F "d", id);
  341. }
  342. void RemoteDataSourceServer::doCmdCreateFile(MemoryBuffer & in, MemoryBuffer & out)
  343. {
  344. SessionId session;
  345. StringAttr username, password, logicalName;
  346. in.read(session);
  347. in.read(username).read(password);
  348. in.read(logicalName);
  349. DBGLOG("RemoteFileView:CreateFile('%s') by[%s:%" I64F "d", logicalName.get(), username.get(), session);
  350. Owned<IFvDataSource> ds = createFileDataSource(logicalName, cluster, username, password);
  351. unique_id_t id = addDataSource(session, ds);
  352. out.append((unsigned short)CurRemoteVersion);
  353. out.append(id);
  354. out.append(ds->numRows(false));
  355. ds->queryMetaData()->serialize(out);
  356. out.append(ds->isIndex());
  357. DBGLOG("RemoteFileView:CreateFile returns %" I64F "d", id);
  358. }
  359. void RemoteDataSourceServer::doCmdDestroy(MemoryBuffer & in, MemoryBuffer & out)
  360. {
  361. unique_id_t id;
  362. in.read(id);
  363. DBGLOG("RemoteFileView:Destroy(%" I64F "d)", id);
  364. CriticalBlock block(cs);
  365. ForEachItemIn(idx, entries)
  366. {
  367. RemoteDataEntry & cur = entries.item(idx);
  368. if (cur.id == id)
  369. {
  370. entries.remove(idx);
  371. return;
  372. }
  373. }
  374. }
  375. IFvDataSource * RemoteDataSourceServer::getDataSource(unique_id_t id)
  376. {
  377. CriticalBlock block(cs);
  378. ForEachItemIn(idx, entries)
  379. {
  380. RemoteDataEntry & cur = entries.item(idx);
  381. if (cur.id == id)
  382. return LINK(cur.ds);
  383. }
  384. return NULL;
  385. }
  386. void RemoteDataSourceServer::closed(SessionId id)
  387. {
  388. removeSession(id);
  389. }
  390. void RemoteDataSourceServer::aborted(SessionId id)
  391. {
  392. removeSession(id);
  393. }
  394. IFvDataSource * RemoteDataSourceServer::readDataSource(MemoryBuffer & in)
  395. {
  396. unique_id_t id;
  397. in.read(id);
  398. return getDataSource(id);
  399. }
  400. void RemoteDataSourceServer::removeSession(SessionId id)
  401. {
  402. DBGLOG("RemoteFileView:Session Died");
  403. CriticalBlock block(cs);
  404. ForEachItemInRev(idx, entries)
  405. {
  406. RemoteDataEntry & cur = entries.item(idx);
  407. if (cur.session == id)
  408. {
  409. DBGLOG("RemoteFileView:Instance Died %" I64F "d", cur.id);
  410. entries.remove(idx);
  411. }
  412. }
  413. }
  414. //MORE: If this is ever actually used then it should probably have several threads
  415. // processing the commands, especially if the commands can involve lots of processing.
  416. int RemoteDataSourceServer::run()
  417. {
  418. CMessageBuffer msg;
  419. MemoryBuffer result;
  420. INode * sender;
  421. while (alive)
  422. {
  423. msg.clear();
  424. if (queryWorldCommunicator().recv(msg, 0, MPTAG_FILEVIEW, &sender))
  425. {
  426. msg.setEndian(__BIG_ENDIAN);
  427. result.setEndian(__BIG_ENDIAN);
  428. try
  429. {
  430. serializeException(NULL, result.clear());
  431. byte cmd;
  432. msg.read(cmd);
  433. switch (cmd)
  434. {
  435. case FVCMDrow: doCmdRow(false, msg, result); break;
  436. case FVCMDraw: doCmdRow(true, msg, result); break;
  437. case FVCMDnumrows: doCmdNumRows(msg, result); break;
  438. case FVCMDcreatewu: doCmdCreateWorkunit(msg, result); break;
  439. case FVCMDcreatefile: doCmdCreateFile(msg, result); break;
  440. case FVCMDdestroy: doCmdDestroy(msg, result); break;
  441. case FVCMDfetch: doCmdFetch(false, msg, result); break;
  442. case FVCMDfetchraw: doCmdFetchRaw(false, msg, result); break;
  443. default:
  444. throwError(FVERR_UnknownRemoteCommand);
  445. }
  446. msg.clear().append(result);
  447. }
  448. catch (IException * e)
  449. {
  450. serializeException(e, msg.clear());
  451. e->Release();
  452. }
  453. queryWorldCommunicator().reply(msg, MP_ASYNC_SEND);
  454. ::Release(sender);
  455. }
  456. }
  457. server = NULL;
  458. return 0;
  459. }
  460. void RemoteDataSourceServer::stop()
  461. {
  462. alive = false;
  463. queryWorldCommunicator().cancel(0, MPTAG_FILEVIEW);
  464. join();
  465. }
  466. extern FILEVIEW_API void startRemoteDataSourceServer(const char * queue, const char * cluster)
  467. {
  468. //This isn't properly thread safe - it also isn't ever used in practice, so not a problem.
  469. if (!server)
  470. {
  471. server = new RemoteDataSourceServer(queue, cluster);
  472. server->start();
  473. }
  474. }
  475. extern FILEVIEW_API void stopRemoteDataSourceServer()
  476. {
  477. if (server)
  478. server->stop();
  479. }
  480. IConstWUResult * resolveResult(const char * wuid, unsigned sequence, const char * name)
  481. {
  482. Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
  483. Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid, false);
  484. return getWorkUnitResult(wu, name, sequence);
  485. }
  486. IConstWUResult * secResolveResult(ISecManager &secmgr, ISecUser &secuser, const char * wuid, unsigned sequence, const char * name)
  487. {
  488. Owned<IWorkUnitFactory> factory = getSecWorkUnitFactory(secmgr, secuser);
  489. Owned<IConstWorkUnit> wu = factory->openWorkUnit(wuid, false);
  490. return (wu) ? getWorkUnitResult(wu, name, sequence) : NULL;
  491. }