wsdfuaccess.cpp 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2018 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <vector>
  14. #include "jliball.hpp"
  15. #include "jflz.hpp"
  16. #include "daclient.hpp"
  17. #include "dautils.hpp"
  18. #include "seclib.hpp"
  19. #include "environment.hpp"
  20. #include "ws_dfu.hpp"
  21. #include "dafsstream.hpp"
  22. #include "dafdesc.hpp"
  23. #include "dadfs.hpp"
  24. #include "dasess.hpp"
  25. #include "thorcommon.hpp"
  26. #include "digisign.hpp"
  27. #include "rmtclient.hpp"
  28. #include "eclwatch_errorlist.hpp" // only for ECLWATCH_FILE_NOT_EXIST
  29. #include "soapmessage.hpp"
  30. #include "wsdfuaccess.hpp"
  31. using namespace dafsstream;
  32. using namespace cryptohelper;
  33. namespace wsdfuaccess
  34. {
  35. //#define TEST_RETURNTEXTRESPONSE
  36. static std::vector<std::string> dfuServiceUrls;
  37. static CriticalSection dfuServiceUrlCrit;
  38. static std::atomic<unsigned> currentDfuServiceUrl{0};
  39. static std::atomic<bool> dfuServiceUrlsDiscovered{false};
  40. void ensureAccessibleDfuServiceURLList()
  41. {
  42. bool expected = false;
  43. if (dfuServiceUrlsDiscovered.compare_exchange_strong(expected, true))
  44. {
  45. getAccessibleServiceURLList("WsSMC", dfuServiceUrls);
  46. if (0 == dfuServiceUrls.size())
  47. throw MakeStringException(-1, "Could not find any DFU services in the target HPCC configuration.");
  48. for (auto &s: dfuServiceUrls)
  49. s = s + "/WsDfu/";
  50. }
  51. }
  52. static unsigned getNumDfuServiceURL()
  53. {
  54. ensureAccessibleDfuServiceURLList();
  55. return dfuServiceUrls.size();
  56. }
  57. /* advances to next DFU service URL, wraps if necessary.
  58. * If concurrent threads are trying to advance, only 1 will succeed, but this call will update current.
  59. */
  60. static const char *advanceToNextAvailableDFUServiceURL(unsigned &currentURL)
  61. {
  62. ensureAccessibleDfuServiceURLList();
  63. // 1st check if need to rollover
  64. unsigned expected = dfuServiceUrls.size()-1;
  65. if (currentDfuServiceUrl.compare_exchange_strong(expected, 0))
  66. currentURL = 0;
  67. else
  68. {
  69. // try to advance by 1.
  70. if (currentDfuServiceUrl.compare_exchange_strong(currentURL, currentURL+1))
  71. currentURL++;
  72. else // someone else already has, get current dfu service url
  73. currentURL = currentDfuServiceUrl;
  74. }
  75. return dfuServiceUrls[currentURL].c_str();
  76. }
  77. static IClientDFUFileAccessResponse *doLookupDFUFileDeprecated(IClientWsDfu *dfuClient, const char *logicalName, const char *requestId, unsigned expirySecs)
  78. {
  79. Owned<IClientDFUFileAccessRequest> dfuReq = dfuClient->createDFUFileAccessRequest();
  80. CDfsLogicalFileName lfn;
  81. lfn.set(logicalName);
  82. StringBuffer cluster, lfnName;
  83. lfn.getCluster(cluster);
  84. lfn.get(lfnName); // remove cluster if present
  85. IEspDFUFileAccessRequestBase &requestBase = dfuReq->updateRequestBase();
  86. requestBase.setName(lfnName);
  87. requestBase.setCluster(cluster);
  88. requestBase.setExpirySeconds(expirySecs);
  89. requestBase.setJobId(requestId);
  90. requestBase.setAccessRole(CFileAccessRole_External);
  91. requestBase.setAccessType(CSecAccessType_Read);
  92. requestBase.setReturnBinTypeInfo(true);
  93. return dfuClient->DFUFileAccess(dfuReq);
  94. }
  95. static IClientDFUFileAccessResponse *doLookupDFUFile(IClientWsDfu *dfuClient, const char *logicalName, const char *requestId, unsigned expirySecs)
  96. {
  97. Owned<IClientDFUFileAccessV2Request> dfuReq = dfuClient->createDFUFileAccessV2Request();
  98. CDfsLogicalFileName lfn;
  99. lfn.set(logicalName);
  100. StringBuffer cluster, lfnName;
  101. lfn.getCluster(cluster);
  102. lfn.get(lfnName); // remove cluster if present
  103. dfuReq->setName(lfnName);
  104. dfuReq->setCluster(cluster);
  105. dfuReq->setExpirySeconds(expirySecs);
  106. dfuReq->setRequestId(requestId);
  107. #ifdef TEST_RETURNTEXTRESPONSE
  108. dfuReq->setReturnTextResponse(true);
  109. #endif
  110. return dfuClient->DFUFileAccessV2(dfuReq);
  111. }
  112. static IDFUFileAccess *doLookupDFUFileHandleLegacy(const char *serviceUrl, const char *logicalName, const char *requestId, unsigned expirySecs, const char *user, const char *password)
  113. {
  114. Owned<IClientWsDfu> dfuClient = createWsDfuClient();
  115. dfuClient->addServiceUrl(serviceUrl);
  116. dfuClient->setUsernameToken(user, password, "");
  117. Owned<IClientDFUFileAccessResponse> dfuResp;
  118. try
  119. {
  120. dfuResp.setown(doLookupDFUFile(dfuClient, logicalName, requestId, expirySecs));
  121. }
  122. catch (IException *e)
  123. {
  124. /* NB: there should really be a different IException class and a specific error code
  125. * The server knows it's an unsupported method.
  126. */
  127. if (SOAP_SERVER_ERROR != e->errorCode())
  128. throw;
  129. // fall through and try deprecated method
  130. e->Release();
  131. }
  132. if (!dfuResp)
  133. dfuResp.setown(doLookupDFUFileDeprecated(dfuClient, logicalName, requestId, expirySecs));
  134. const IMultiException *excep = &dfuResp->getExceptions(); // NB: warning despite getXX name, this does not Link
  135. if (excep->ordinality() > 0)
  136. throw LINK((IMultiException *)excep); // NB - const IException.. not caught in general..
  137. Owned<IDFUFileAccess> ret = createDFUFileAccess(dfuResp->getAccessInfo().getMetaInfoBlob());
  138. if (!ret->queryEngineInterface()->queryMeta()) // as a result of legacy WsDFU version
  139. {
  140. const MemoryBuffer &binLayout = dfuResp->getAccessInfo().getRecordTypeInfoBin();
  141. if (0 == binLayout.length())
  142. throw makeStringExceptionV(0, "lookupDFUFile(%s) - layout missing", logicalName);
  143. ret->queryEngineInterface()->setLayoutBin(binLayout.length(), binLayout.bytes());
  144. }
  145. return ret.getClear();
  146. }
  147. static IClientDFUFileCreateResponse *doCreateDFUFileDeprecated(IClientWsDfu *dfuClient, const char *logicalName, const char *cluster, DFUFileType type, const char *recDef, const char *requestId, unsigned expirySecs)
  148. {
  149. Owned<IClientDFUFileCreateRequest> dfuReq = dfuClient->createDFUFileCreateRequest();
  150. dfuReq->setECLRecordDefinition(recDef);
  151. IEspDFUFileAccessRequestBase &requestBase = dfuReq->updateRequestBase();
  152. requestBase.setName(logicalName);
  153. requestBase.setCluster(cluster);
  154. requestBase.setExpirySeconds(expirySecs);
  155. requestBase.setJobId(requestId);
  156. requestBase.setAccessRole(CFileAccessRole_External);
  157. requestBase.setAccessType(CSecAccessType_Write);
  158. requestBase.setReturnBinTypeInfo(true);
  159. return dfuClient->DFUFileCreate(dfuReq);
  160. }
  161. static IClientDFUFileCreateResponse *doCreateDFUFile(IClientWsDfu *dfuClient, const char *logicalName, const char *cluster, DFUFileType type, const char *recDef, const char *requestId, unsigned expirySecs, bool compressed)
  162. {
  163. Owned<IClientDFUFileCreateV2Request> dfuReq = dfuClient->createDFUFileCreateV2Request();
  164. dfuReq->setECLRecordDefinition(recDef);
  165. dfuReq->setName(logicalName);
  166. dfuReq->setCluster(cluster);
  167. dfuReq->setExpirySeconds(expirySecs);
  168. dfuReq->setRequestId(requestId);
  169. dfuReq->setCompressed(compressed);
  170. #ifdef TEST_RETURNTEXTRESPONSE
  171. dfuReq->setReturnTextResponse(true);
  172. #endif
  173. CDFUFileType serviceType;
  174. switch (type)
  175. {
  176. case dft_flat:
  177. serviceType = CDFUFileType_Flat;
  178. break;
  179. case dft_index:
  180. serviceType = CDFUFileType_Index;
  181. break;
  182. default:
  183. throwStringExceptionV(0, "Invalid DFU file type: %u", (unsigned)type);
  184. }
  185. dfuReq->setType(serviceType);
  186. return dfuClient->DFUFileCreateV2(dfuReq);
  187. }
  188. static IDFUFileAccess *doCreateDFUFileHandleLegacy(const char *serviceUrl, const char *logicalName, const char *cluster, DFUFileType type, const char *recDef, const char *requestId, unsigned expirySecs, bool compressed, const char *user, const char *password)
  189. {
  190. Owned<IClientWsDfu> dfuClient = createWsDfuClient();
  191. dfuClient->addServiceUrl(serviceUrl);
  192. dfuClient->setUsernameToken(user, password, "");
  193. Owned<IClientDFUFileCreateResponse> dfuResp;
  194. try
  195. {
  196. dfuResp.setown(doCreateDFUFile(dfuClient, logicalName, cluster, type, recDef, requestId, expirySecs, compressed));
  197. }
  198. catch (IException *e)
  199. {
  200. /* NB: there should really be a different IException class and a specific error code
  201. * The server knows it's an unsupported method.
  202. */
  203. if (SOAP_SERVER_ERROR != e->errorCode())
  204. throw;
  205. // fall through and try deprecated method
  206. e->Release();
  207. }
  208. if (!dfuResp)
  209. {
  210. if (compressed)
  211. WARNLOG("createDFUFile(%s), legacy esp server does not support creating compressed files", logicalName);
  212. dfuResp.setown(doCreateDFUFileDeprecated(dfuClient, logicalName, cluster, type, recDef, requestId, expirySecs));
  213. }
  214. const IMultiException *excep = &dfuResp->getExceptions(); // NB: warning despite getXX name, this does not Link
  215. if (excep->ordinality() > 0)
  216. throw LINK((IMultiException *)excep); // NB: - const IException.. not caught in general..
  217. Owned<IDFUFileAccess> ret = createDFUFileAccess(dfuResp->getAccessInfo().getMetaInfoBlob(), dfuResp->getFileId());
  218. // NB: patch up record definition if server didn't return it (because legacy WsDFU version)
  219. if (!ret->queryEngineInterface()->queryProperties().hasProp("ECL"))
  220. ret->queryEngineInterface()->queryProperties().setProp("ECL", recDef);
  221. if (!ret->queryEngineInterface()->queryMeta()) // as a result of legacy WsDFU version
  222. {
  223. const MemoryBuffer &binLayout = dfuResp->getAccessInfo().getRecordTypeInfoBin();
  224. if (0 == binLayout.length())
  225. throw makeStringExceptionV(0, "createDFUFile(%s) - layout missing", logicalName);
  226. ret->queryEngineInterface()->setLayoutBin(binLayout.length(), binLayout.bytes());
  227. }
  228. return ret.getClear();
  229. }
  230. static void doPublishDFUFile(const char *serviceUrl, IDFUFileAccess *dfuFile, bool overwrite, const char *user, const char *password)
  231. {
  232. Owned<IClientWsDfu> dfuClient = createWsDfuClient();
  233. dfuClient->addServiceUrl(serviceUrl);
  234. dfuClient->setUsernameToken(user, password, "");
  235. Owned<IClientDFUFilePublishRequest> dfuReq = dfuClient->createDFUFilePublishRequest();
  236. dfuReq->setFileId(dfuFile->queryFileId());
  237. dfuReq->setOverwrite(overwrite); // NB: WsDfu min_ver 1.50
  238. IFileDescriptor &fileDesc = dfuFile->queryEngineInterface()->queryFileDescriptor();
  239. MemoryBuffer mb;
  240. fileDesc.serialize(mb);
  241. dfuReq->setFileDescriptorBlob(mb); // NB: WsDfu min_ver 1.50
  242. const char *eclRecDef = fileDesc.queryProperties().queryProp("ECL");
  243. dfuReq->setECLRecordDefinition(eclRecDef); // NB: WsDfu depv_ver < 1.50
  244. Owned<IClientDFUFilePublishResponse> dfuResp = dfuClient->DFUFilePublish(dfuReq);
  245. const IMultiException *excep = &dfuResp->getExceptions(); // NB: warning despite getXX name, this does not Link
  246. if (excep->ordinality() > 0)
  247. throw LINK((IMultiException *)excep); // NB: - const IException.. not caught in general..
  248. }
  249. // wrapper to the doLookupDFUFile, that discovers and tries DFUService URL's
  250. IDFUFileAccess *lookupDFUFile(const char *logicalName, const char *requestId, unsigned expirySecs, const char *user, const char *password)
  251. {
  252. unsigned currentUrl;
  253. const char *espServiceUrl = advanceToNextAvailableDFUServiceURL(currentUrl);
  254. unsigned c = getNumDfuServiceURL(); // max attempts
  255. while (c)
  256. {
  257. try
  258. {
  259. /* JCSMORE - where would locking fit in?
  260. * *IF* Esp established lock, then there'd be no association with this client (no state), and if Esp restarted lock would be lost,
  261. * if this client died, the lock would remain (on Esp).
  262. *
  263. * Idea:
  264. * 1) Esp establishes lock on behalf of this client.
  265. * 2) This client sends keep-alive packets every N seconds (To Esp).
  266. * 3) Esp ensures lock remains alive somehow (something (Esp?) could keep persistent [written] state of active locks?)
  267. * 4) If no keep-alive for a lock, Esp closes it.
  268. *
  269. * Would require the ability (in Dali) to create locks without session association.
  270. * As long as Dali is the lock manager, Would probably be best if the keep-alive packets were
  271. * forwarded to Dali, and it managed the live/stale locks.
  272. */
  273. return doLookupDFUFileHandleLegacy(espServiceUrl, logicalName, requestId, expirySecs, user, password);
  274. }
  275. catch (IJSOCK_Exception *e)
  276. {
  277. EXCLOG(e, nullptr);
  278. e->Release();
  279. }
  280. catch (IException *e)
  281. {
  282. if (ECLWATCH_FILE_NOT_EXIST == e->errorCode())
  283. {
  284. e->Release();
  285. return nullptr; // not found
  286. }
  287. throw;
  288. }
  289. espServiceUrl = advanceToNextAvailableDFUServiceURL(currentUrl);
  290. --c;
  291. }
  292. StringBuffer msg("Failed to contact DFU service: { ");
  293. for (auto &url: dfuServiceUrls)
  294. msg.append(url.c_str());
  295. msg.append("}");
  296. throw makeStringException(0, msg.str());
  297. }
  298. IDFUFileAccess *lookupDFUFile(const char *logicalName, const char *requestId, unsigned expirySecs, IUserDescriptor *userDesc)
  299. {
  300. assertex(userDesc);
  301. StringBuffer user, password;
  302. userDesc->getUserName(user);
  303. userDesc->getPassword(password);
  304. IDFUFileAccess *ret = lookupDFUFile(logicalName, requestId, expirySecs, user, password);
  305. if (ret)
  306. ret->setFileOption(dfo_compressedRemoteStreams);
  307. return ret;
  308. }
  309. // wrapper to the doCreateDFUFile, that discovers and tries DFUService URL's
  310. IDFUFileAccess *createDFUFile(const char *logicalName, const char *cluster, DFUFileType type, const char *recDef, const char *requestId, unsigned expirySecs, bool compressed, const char *user, const char *password)
  311. {
  312. unsigned currentUrl;
  313. const char *espServiceUrl = advanceToNextAvailableDFUServiceURL(currentUrl);
  314. unsigned c = getNumDfuServiceURL(); // max attempts
  315. while (c)
  316. {
  317. try
  318. {
  319. return doCreateDFUFileHandleLegacy(espServiceUrl, logicalName, cluster, type, recDef, requestId, expirySecs, compressed, user, password);
  320. }
  321. catch (IJSOCK_Exception *e)
  322. {
  323. EXCLOG(e, nullptr);
  324. e->Release();
  325. }
  326. advanceToNextAvailableDFUServiceURL(currentUrl);
  327. --c;
  328. }
  329. StringBuffer msg("Failed to contact DFU service: { ");
  330. for (auto &url: dfuServiceUrls)
  331. msg.append(url.c_str());
  332. msg.append("}");
  333. throw makeStringException(0, msg.str());
  334. }
  335. // NB: no way to create grouped flat file output at the moment, but not sure would ever want to support that.
  336. IDFUFileAccess *createDFUFile(const char *logicalName, const char *cluster, DFUFileType type, const char *recDef, const char *requestId, unsigned expirySecs, bool compressed, IUserDescriptor *userDesc)
  337. {
  338. assertex(userDesc);
  339. StringBuffer user, password;
  340. userDesc->getUserName(user);
  341. userDesc->getPassword(password);
  342. return createDFUFile(logicalName, cluster, type, recDef, requestId, expirySecs, compressed, user, password);
  343. }
  344. // wrapper to the doPublishDFUFile, that discovers and tries DFUService URL's
  345. void publishDFUFile(IDFUFileAccess *dfuFile, bool overwrite, const char *user, const char *password)
  346. {
  347. unsigned currentUrl;
  348. const char *espServiceUrl = advanceToNextAvailableDFUServiceURL(currentUrl);
  349. unsigned c = getNumDfuServiceURL(); // max attempts
  350. while (c)
  351. {
  352. try
  353. {
  354. doPublishDFUFile(espServiceUrl, dfuFile, overwrite, user, password);
  355. return;
  356. }
  357. catch (IJSOCK_Exception *e)
  358. {
  359. EXCLOG(e, nullptr);
  360. e->Release();
  361. }
  362. advanceToNextAvailableDFUServiceURL(currentUrl);
  363. --c;
  364. }
  365. StringBuffer msg("Failed to contact DFU service: { ");
  366. for (auto &url: dfuServiceUrls)
  367. msg.append(url.c_str());
  368. msg.append("}");
  369. throw makeStringException(0, msg.str());
  370. }
  371. void publishDFUFile(IDFUFileAccess *dfuFile, bool overwrite, IUserDescriptor *userDesc)
  372. {
  373. assertex(userDesc);
  374. StringBuffer user, password;
  375. userDesc->getUserName(user);
  376. userDesc->getPassword(password);
  377. publishDFUFile(dfuFile, overwrite, user, password);
  378. }
  /*
   * createDFUFileMetaInfo() and encodeDFUFileMeta() will normally be called by the DFU service
   * via a DFS file request, so that the meta info blob can be returned to the client of the service.
   * However, for testing purposes it's also useful to create these blobs elsewhere, directly from IFileDescriptor's.
   */
  384. IPropertyTree *createDFUFileMetaInfo(const char *fileName, IFileDescriptor *fileDesc, const char *requestId, const char *accessType, unsigned expirySecs,
  385. IUserDescriptor *userDesc, const char *keyPairName, unsigned port, bool secure, unsigned maxFileAccessExpirySeconds)
  386. {
  387. /*
  388. * version
  389. * fileName
  390. * requestId [optional]
  391. * accessType [const "READ" for this method]
  392. * user
  393. * port (int) // port # of dafilesrv srvice to connect to
  394. * secure (bool) // if true = SSL connection
  395. * keyPairName // name of key pair to use
  396. * expiryTime // (seconds) timeout for validity of this request
  397. * jsonTypeInfo // JSON representation of the file's record definition
  398. */
  399. Owned<IPropertyTree> metaInfo = createPTree();
  400. metaInfo->setProp("logicalFilename", fileName);
  401. if (!isEmptyString(requestId))
  402. metaInfo->setProp("requestId", requestId);
  403. metaInfo->setProp("accessType", accessType);
  404. StringBuffer userStr;
  405. if (userDesc)
  406. metaInfo->setProp("user", userDesc->getUserName(userStr).str());
  407. // key, port, secure
  408. metaInfo->setPropInt("port", port);
  409. metaInfo->setPropBool("secure", secure);
  410. if (!isEmptyString(keyPairName))
  411. metaInfo->setProp("keyPairName", keyPairName);
  412. // expiry time
  413. if (expirySecs > maxFileAccessExpirySeconds)
  414. expirySecs = maxFileAccessExpirySeconds;
  415. time_t now;
  416. time(&now);
  417. CDateTime expiryDt;
  418. expiryDt.set(now + expirySecs);
  419. StringBuffer expiryTime;
  420. expiryDt.getString(expiryTime);
  421. metaInfo->setProp("expiryTime", expiryTime);
  422. // layout
  423. MemoryBuffer binLayout;
  424. if (getDaliLayoutInfo(binLayout, fileDesc->queryProperties()))
  425. metaInfo->setPropBin("binLayout", binLayout.length(), binLayout.toByteArray());
  426. // file meta info
  427. INode *node1 = fileDesc->queryNode(0);
  428. SocketEndpoint ep = node1->endpoint();
  429. unsigned dafilesrvVersion = getCachedRemoteVersion(node1->endpoint(), secure);
  430. if (dafilesrvVersion < DAFILESRV_STREAMGENERAL_MINVERSION)
  431. {
  432. metaInfo->setPropInt("version", 1); // legacy format
  433. extractFilePartInfo(*metaInfo, *fileDesc);
  434. }
  435. else
  436. {
  437. metaInfo->setPropInt("version", DAFILESRV_METAINFOVERSION);
  438. IPropertyTree *fileInfoTree = metaInfo->setPropTree("FileInfo");
  439. fileDesc->serializeTree(*fileInfoTree);
  440. }
  441. return metaInfo.getClear();
  442. }
  443. StringBuffer &encodeDFUFileMeta(StringBuffer &metaInfoBlob, IPropertyTree *metaInfo, IConstEnvironment *environment)
  444. {
  445. MemoryBuffer metaInfoMb;
  446. /* NB: If file access security is disabled in the environment, or on a per cluster basis
  447. * keyPairName will be blank. In that case the meta data is returned in plain format.
  448. * NB2: Dafilesrv's would also require file access security to be disabled in that case,
  449. * otherwise they will be denied access.
  450. * Should be part of the same configuration setup.
  451. */
  452. #ifdef _USE_OPENSSL
  453. if (metaInfo->hasProp("keyPairName") && environment) // without it, meta data is not encrypted
  454. {
  455. MemoryBuffer metaInfoBlob;
  456. metaInfo->serialize(metaInfoBlob);
  457. const char *keyPairName = metaInfo->queryProp("keyPairName");
  458. const char *privateKeyFName = environment->getPrivateKeyPath(keyPairName);
  459. Owned<CLoadedKey> privateKey = loadPrivateKeyFromFile(privateKeyFName, nullptr);
  460. StringBuffer metaInfoSignature;
  461. digiSign(metaInfoSignature, metaInfoBlob.length(), metaInfoBlob.bytes(), *privateKey);
  462. Owned<IPropertyTree> metaInfoEnvelope = createPTree();
  463. metaInfoEnvelope->setProp("signature", metaInfoSignature);
  464. metaInfoEnvelope->setPropBin("metaInfoBlob", metaInfoBlob.length(), metaInfoBlob.bytes());
  465. metaInfoEnvelope->serialize(metaInfoMb.clear());
  466. }
  467. else
  468. #endif
  469. metaInfo->serialize(metaInfoMb);
  470. MemoryBuffer compressedMetaInfoMb;
  471. fastLZCompressToBuffer(compressedMetaInfoMb, metaInfoMb.length(), metaInfoMb.bytes());
  472. JBASE64_Encode(compressedMetaInfoMb.bytes(), compressedMetaInfoMb.length(), metaInfoBlob, false);
  473. return metaInfoBlob;
  474. }
  475. } // namespace wsdfuaccess
  476. #ifdef _USE_CPPUNIT
  477. #include "unittests.hpp"
  478. #include "dafsserver.hpp"
  479. #include "rmtfile.hpp"
  480. #include "dafscommon.hpp"
  481. #include "portlist.h"
  482. using namespace wsdfuaccess;
  483. class DFUAccessTests : public CppUnit::TestFixture
  484. {
  485. CPPUNIT_TEST_SUITE(DFUAccessTests);
  486. CPPUNIT_TEST(testStartServer);
  487. CPPUNIT_TEST(testDaFsStreamingStd);
  488. CPPUNIT_TEST(testDaFsStreamingCompressed);
  489. CPPUNIT_TEST(testDaFsStreamingGrouped);
  490. CPPUNIT_TEST(testDaFsStreamingCompressedAndGrouped);
  491. CPPUNIT_TEST(testFinish);
  492. CPPUNIT_TEST_SUITE_END();
  493. static unsigned serverPort;
  494. StringBuffer basePath;
  495. Owned<CSimpleInterface> serverThread;
  496. Owned<IFileDescriptor> fileDesc;
  497. protected:
  498. void testStartServer()
  499. {
  500. Owned<ISocket> socket;
  501. unsigned endPort = MP_END_PORT;
  502. while (1)
  503. {
  504. try
  505. {
  506. socket.setown(ISocket::create(serverPort));
  507. break;
  508. }
  509. catch (IJSOCK_Exception *e)
  510. {
  511. if (e->errorCode() != JSOCKERR_port_in_use)
  512. {
  513. StringBuffer eStr;
  514. e->errorMessage(eStr);
  515. e->Release();
  516. CPPUNIT_ASSERT_MESSAGE(eStr.str(), 0);
  517. }
  518. else if (serverPort == endPort)
  519. {
  520. e->Release();
  521. CPPUNIT_ASSERT_MESSAGE("Could not find a free port to use for remote file server", 0);
  522. }
  523. }
  524. ++serverPort;
  525. }
  526. basePath.append("//");
  527. SocketEndpoint ep(serverPort);
  528. ep.getUrlStr(basePath);
  529. char cpath[_MAX_DIR];
  530. if (!GetCurrentDirectory(_MAX_DIR, cpath))
  531. CPPUNIT_ASSERT_MESSAGE("Current directory path too big", 0);
  532. else
  533. basePath.append(cpath);
  534. addPathSepChar(basePath);
  535. class CServerThread : public CSimpleInterface, implements IThreaded
  536. {
  537. CThreaded threaded;
  538. Owned<IRemoteFileServer> server;
  539. Linked<ISocket> socket;
  540. public:
  541. CServerThread(IRemoteFileServer *_server, ISocket *_socket) : server(_server), socket(_socket), threaded("CServerThread")
  542. {
  543. threaded.init(this);
  544. }
  545. ~CServerThread()
  546. {
  547. threaded.join();
  548. }
  549. // IThreaded
  550. virtual void threadmain() override
  551. {
  552. DAFSConnectCfg sslCfg = SSLNone;
  553. server->run(sslCfg, socket, nullptr, nullptr);
  554. }
  555. };
  556. Owned<IRemoteFileServer> server = createRemoteFileServer();
  557. serverThread.setown(new CServerThread(QUERYINTERFACE(server.getClear(), IRemoteFileServer), socket.getClear()));
  558. }
  559. void testDaFsStreaming(bool compressed, bool grouped)
  560. {
  561. const char *thorInstance = "mythor";
  562. const char *groupName = thorInstance;
  563. const char *fname = ".::dfuaccess::testfname1";
  564. IUserDescriptor *userDesc = nullptr;
  565. const char *keyPairName = nullptr;
  566. unsigned port = 0;
  567. bool secure = false;
  568. unsigned expiryTime = 60;
  569. unsigned maxFileAccessExpirySeconds = 300;
  570. unsigned numRecsInTest = 1000;
  571. const char *eclRecDef = "{ string5 f1; string10 f2; };";
  572. size32_t fixedRecSize = 15;
  573. fileDesc.setown(createFileDescriptor());
  574. GroupType groupType;
  575. StringBuffer basedir;
  576. SocketEndpointArray eps;
  577. SocketEndpoint ep(".", serverPort);
  578. eps.append(ep);
  579. Owned<IGroup> group = createIGroup(eps);
  580. fileDesc.setown(createFileDescriptor(fname, "thor", "mythor", group));
  581. fileDesc->queryProperties().setProp("ECL", eclRecDef);
  582. if (grouped)
  583. fileDesc->queryProperties().setPropBool("@grouped", true);
  584. Owned<IPropertyTree> metaInfo = createDFUFileMetaInfo(fname, fileDesc, "cppunit-test1", "WRITE", 30,
  585. userDesc, keyPairName, port, secure, maxFileAccessExpirySeconds);
  586. StringBuffer metaInfoBlob;
  587. encodeDFUFileMeta(metaInfoBlob, metaInfo, nullptr);
  588. Owned<IDFUFileAccess> newFile = createDFUFileAccess(metaInfoBlob);
  589. newFile->setStreamReplyLimitK(1); // set a low limit to force testing of continuation etc.
  590. if (compressed)
  591. newFile->setFileOption(dfo_compressedRemoteStreams);
  592. else
  593. newFile->clearFileOption(dfo_compressedRemoteStreams);
  594. CRC32 writeCrc32;
  595. // write
  596. unsigned n = newFile->queryNumParts();
  597. for (unsigned p=0; p<n; p++)
  598. {
  599. Owned<IDFUFilePartWriter> writer = newFile->createFilePartWriter(p);
  600. writer->start();
  601. for (unsigned r=0; r<numRecsInTest; r++)
  602. {
  603. VStringBuffer rowData("%5u%10u", r, r);
  604. writer->write(fixedRecSize, rowData.str());
  605. writeCrc32.tally(fixedRecSize, rowData.str());
  606. if (grouped)
  607. writer->write(0, nullptr); // eog
  608. }
  609. }
  610. newFile->setFilePropertyInt("@recordCount", numRecsInTest);
  611. // publish would normally happen here, but this unittest is self-contained (no esp etc.)
  612. CRC32 readCrc32;
  613. // read back
  614. for (unsigned p=0; p<n; p++)
  615. {
  616. Owned<IDFUFilePartReader> reader = newFile->createFilePartReader(p, 0, nullptr, true);
  617. reader->start();
  618. for (unsigned r=0; r<numRecsInTest; r++)
  619. {
  620. size32_t sz;
  621. const void *row = reader->nextRow(sz);
  622. if (!row)
  623. {
  624. row = reader->nextRow(sz);
  625. assertex(row);
  626. }
  627. readCrc32.tally(sz, row);
  628. #ifdef _DEBUG
  629. printf("%.*s%.*s\n", 5, (const char *)row, 10, ((const char *)row)+5);
  630. #endif
  631. }
  632. }
  633. if (writeCrc32.get() != readCrc32.get())
  634. {
  635. VStringBuffer errMsg("DFU write/read test: crc's don't match. Write crc=%x, read crc=%x", writeCrc32.get(), readCrc32.get());
  636. CPPUNIT_ASSERT_MESSAGE(errMsg.str(), 0);
  637. }
  638. }
  639. void testDaFsStreamingStd()
  640. {
  641. testDaFsStreaming(false, false);
  642. }
  643. void testDaFsStreamingCompressed()
  644. {
  645. testDaFsStreaming(true, false);
  646. }
  647. void testDaFsStreamingGrouped()
  648. {
  649. testDaFsStreaming(false, true);
  650. }
  651. void testDaFsStreamingCompressedAndGrouped()
  652. {
  653. testDaFsStreaming(true, true);
  654. }
  655. void testFinish()
  656. {
  657. // clearup
  658. if (fileDesc)
  659. {
  660. RemoteFilename rfn;
  661. fileDesc->getFilename(0, 0, rfn);
  662. StringBuffer path;
  663. rfn.getPath(path);
  664. Owned<IFile> iFile = createIFile(path);
  665. iFile->remove();
  666. }
  667. SocketEndpoint ep(serverPort);
  668. Owned<ISocket> sock = ISocket::connect_timeout(ep, 60 * 1000);
  669. CPPUNIT_ASSERT(RFEnoerror == stopRemoteServer(sock));
  670. serverThread.clear();
  671. }
  672. };
  673. /* MP_START_PORT -> MP_END_PORT is the MP reserved dynamic port range, and is used here for convenience.
  674. * MP_START_PORT is used as starting point to find an available port for the temporary dafilesrv service in these unittests.
  675. * All (MP) components using this range always check and find an unused port.
  676. */
  677. unsigned DFUAccessTests::serverPort = MP_START_PORT;
  678. CPPUNIT_TEST_SUITE_REGISTRATION( DFUAccessTests );
  679. CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( DFUAccessTests, "DFUAccessTests" );
  680. #endif // _USE_CPPUNIT