dafsstream.cpp 49 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include <atomic>
  14. #include <string>
  15. #include <array>
  16. #include <unordered_map>
  17. #include "platform.h"
  18. #include "jfile.hpp"
  19. #include "jflz.hpp"
  20. #include "jlzw.hpp"
  21. #include "jlog.hpp"
  22. #include "jmisc.hpp"
  23. #include "jptree.hpp"
  24. #include "jsocket.hpp"
  25. #include "dadfs.hpp"
  26. #include "dafdesc.hpp"
  27. #include "hqlexpr.hpp"
  28. #include "rtlcommon.hpp"
  29. #include "rtldynfield.hpp"
  30. #include "eclhelper_dyn.hpp"
  31. #include "rmtclient.hpp"
  32. #include "dafscommon.hpp"
  33. #include "dafsstream.hpp"
  34. namespace dafsstream
  35. {
  36. static const DFUFileOption defaultFileOptions = dfo_compressedRemoteStreams;
  37. static StringAttr defaultCompCompression = "LZ4";
  38. static const char *DFUFileIdSeparator = "|";
  39. static const unsigned defaultExpirySecs = 300;
  40. static const char *getReadActivityString(DFUFileType fileType)
  41. {
  42. switch (fileType)
  43. {
  44. case dft_flat:
  45. return "diskread";
  46. case dft_index:
  47. return "indexread";
  48. case dft_csv:
  49. return "csvread";
  50. case dft_xml:
  51. return "xmlread";
  52. case dft_json:
  53. return "jsonread";
  54. }
  55. return "unknown";
  56. }
  57. class CDaFsException : public CSimpleInterfaceOf<IDaFsException>
  58. {
  59. DaFsExceptionCode code;
  60. StringAttr msg;
  61. MessageAudience aud;
  62. public:
  63. CDaFsException(DaFsExceptionCode _code, const char *_msg, MessageAudience _aud) : code(_code), msg(_msg), aud(_aud)
  64. {
  65. }
  66. virtual int errorCode() const { return code; }
  67. virtual StringBuffer & errorMessage(StringBuffer &msg) const { return msg; }
  68. virtual MessageAudience errorAudience() const { return aud; }
  69. };
  70. static IDaFsException *makeDaFsClientException(DaFsExceptionCode code, MessageAudience aud, const char *message)
  71. {
  72. return new CDaFsException(code, message, aud);
  73. }
  74. static IDaFsException *makeDaFsClientExceptionVA(DaFsExceptionCode code, MessageAudience aud, const char *format, va_list args) __attribute__((format(printf,3,0)));
  75. static IDaFsException *makeDaFsClientExceptionVA(DaFsExceptionCode code, MessageAudience aud, const char *format, va_list args)
  76. {
  77. StringBuffer eStr;
  78. eStr.limited_valist_appendf(1024, format, args);
  79. return new CDaFsException(code, eStr.str(), aud);
  80. }
  81. static void throwDsFsClientException(DaFsExceptionCode code, const char *format)
  82. {
  83. throw makeDaFsClientException(code, MSGAUD_programmer, format);
  84. }
  85. static void throwDsFsClientExceptionV(DaFsExceptionCode code, const char *format, ...) __attribute__((format(printf,2,3)));
  86. static void throwDsFsClientExceptionV(DaFsExceptionCode code, const char *format, ...)
  87. {
  88. va_list args;
  89. va_start(args, format);
  90. IDaFsException *ret = makeDaFsClientExceptionVA(code, MSGAUD_programmer, format, args);
  91. va_end(args);
  92. throw ret;
  93. }
  94. class CDFUFile : public CSimpleInterfaceOf<IDFUFileAccess>, implements IDFUFileAccessExt
  95. {
  96. typedef CSimpleInterfaceOf<IDFUFileAccess> PARENT;
  97. StringAttr name;
  98. StringAttr fileId;
  99. SecAccessFlags accessType = SecAccess_None;
  100. unsigned expirySecs = defaultExpirySecs;
  101. Owned<IPropertyTree> metaInfo;
  102. unsigned numParts = 0;
  103. bool grouped = false;
  104. DFUFileType fileType = dft_none;
  105. Owned<IOutputMetaData> actualMeta;
  106. Owned<IFileDescriptor> fileDesc;
  107. unsigned maxCopiesPerPart = 0;
  108. StringBuffer groupName;
  109. mutable MemoryBuffer binLayout;
  110. mutable CriticalSection decodeJsonCrit;
  111. mutable bool gotJsonTypeInfo = false;
  112. mutable StringBuffer jsonLayout;
  113. StringAttr metaInfoBlobB64;
  114. std::vector<std::string> hosts;
  115. DFUFileOption fileOptions = defaultFileOptions;
  116. StringAttr commCompType = defaultCompCompression;
  117. unsigned rowStreamReplyLimitKb = 1024; // 1MB
  118. Owned<IPropertyTree> options;
  119. public:
  120. IMPLEMENT_IINTERFACE_USING(PARENT);
  121. CDFUFile(const char *_metaInfoBlobB64, const char *_fileId) : metaInfoBlobB64(_metaInfoBlobB64), fileId(_fileId)
  122. {
  123. MemoryBuffer compressedMetaInfoMb;
  124. JBASE64_Decode(metaInfoBlobB64, compressedMetaInfoMb);
  125. MemoryBuffer decompressedMetaInfoMb;
  126. fastLZDecompressToBuffer(decompressedMetaInfoMb, compressedMetaInfoMb);
  127. Owned<IPropertyTree> metaInfoEnvelope = createPTree(decompressedMetaInfoMb);
  128. StringBuffer metaInfoSignature;
  129. if (metaInfoEnvelope->getProp("signature", metaInfoSignature))
  130. {
  131. MemoryBuffer metaInfoBlob;
  132. metaInfoEnvelope->getPropBin("metaInfoBlob", metaInfoBlob);
  133. metaInfo.setown(createPTree(metaInfoBlob));
  134. }
  135. else
  136. metaInfo.set(metaInfoEnvelope);
  137. name.set(metaInfo->queryProp("logicalFilename"));
  138. if (name.isEmpty())
  139. throwDsFsClientExceptionV(DaFsClient_InvalidMetaInfo, "logicalFilename missing");
  140. IPropertyTree *fileInfo = metaInfo->queryPropTree("FileInfo");
  141. if (!fileInfo)
  142. throwDsFsClientExceptionV(DaFsClient_InvalidMetaInfo, "FileInfo is missing (logicalFilename=%s", name.get());
  143. unsigned metaInfoVersion = metaInfo->getPropInt("version");
  144. switch (metaInfoVersion)
  145. {
  146. case 0:
  147. // implies unsigned direct request from engines (on unsecure port)
  148. // fall through
  149. case 1:
  150. {
  151. // old metaInfo, reconstruct a IFileDescriptor for ease of compatibility with rest of code
  152. unsigned numParts = fileInfo->getCount("Part");
  153. // calculate part mask
  154. const char *path = fileInfo->queryProp("Part[1]/Copy[1]/@filePath");
  155. if (isEmptyString(path))
  156. throwDsFsClientExceptionV(DaFsClient_InvalidMetaInfo, "filePath not found (logicalFilename=%s", name.get());
  157. StringBuffer dir, fname, ext;
  158. splitFilename(path, &dir, &dir, &fname, &ext);
  159. VStringBuffer partMask("%s._$P$_of_%u", fname.str(), numParts);
  160. // reconstruct group
  161. SocketEndpointArray eps;
  162. bool replicated = false;
  163. Owned<IPropertyTreeIterator> iter = fileInfo->getElements("Part");
  164. ForEach(*iter)
  165. {
  166. IPropertyTree &part = iter->query();
  167. if (part.hasProp("Copy[2]"))
  168. replicated = true;
  169. const char *host = part.queryProp("Copy[1]/@host");
  170. SocketEndpoint ep(host);
  171. eps.append(ep);
  172. }
  173. StringBuffer groupText;
  174. eps.getText(groupText);
  175. Owned<IGroup> group = createIGroup(eps);
  176. ClusterPartDiskMapSpec mspec;
  177. mspec.defaultCopies = replicated?DFD_DefaultCopies:DFD_NoCopies;
  178. fileDesc.setown(createFileDescriptor());
  179. fileDesc->setDefaultDir(dir.str());
  180. fileDesc->setNumParts(numParts);
  181. fileDesc->setPartMask(partMask);
  182. fileDesc->addCluster(group, mspec);
  183. break;
  184. }
  185. case 2: // serialized compact IFileDescriptor
  186. {
  187. fileDesc.setown(deserializeFileDescriptorTree(fileInfo));
  188. break;
  189. }
  190. }
  191. if (isFileKey(fileDesc))
  192. fileType = dft_index;
  193. else
  194. {
  195. const char *kind = fileDesc->queryKind();
  196. if (kind)
  197. {
  198. if (streq("csv", kind))
  199. fileType = dft_csv;
  200. else if (streq("xml", kind))
  201. fileType = dft_xml;
  202. else if (streq("json", kind))
  203. fileType = dft_json;
  204. else
  205. fileType = dft_flat;
  206. }
  207. else
  208. fileType = dft_flat;
  209. }
  210. fileDesc->getClusterGroupName(0, groupName);
  211. grouped = fileDesc->isGrouped();
  212. numParts = fileDesc->numParts();
  213. /* NB: all parts should have same # of copies
  214. * But find max just in case.
  215. */
  216. for (unsigned p=0; p<numParts; p++)
  217. {
  218. unsigned numCopies = fileDesc->numCopies(p);
  219. if (numCopies > maxCopiesPerPart)
  220. maxCopiesPerPart = numCopies;
  221. }
  222. hosts.resize(numParts*maxCopiesPerPart);
  223. for (unsigned p=0; p<numParts; p++)
  224. {
  225. unsigned numCopies = fileDesc->numCopies(p);
  226. for (unsigned c=0; c<numCopies; c++)
  227. {
  228. StringBuffer host;
  229. fileDesc->queryNode(p, c)->endpoint().getUrlStr(host);
  230. unsigned pos = p*maxCopiesPerPart+c;
  231. if (hosts.size() <= pos)
  232. hosts.resize(pos+1); // ensure big enough
  233. hosts.at(pos) = host.str();
  234. }
  235. }
  236. if (metaInfo->getPropBin("binLayout", binLayout))
  237. {
  238. actualMeta.setown(createTypeInfoOutputMetaData(binLayout, grouped));
  239. binLayout.reset(0);
  240. }
  241. if (!fileId) // new esp client, construct a fileId (to be used for e.g. publish)
  242. {
  243. const char *clusterName = metaInfo->queryProp("clusterName");
  244. //create FileId
  245. StringBuffer tmp;
  246. tmp.set(groupName.str()).append(DFUFileIdSeparator).append(clusterName).append(DFUFileIdSeparator).append(name);
  247. if (queryIsCompressed())
  248. tmp.append(DFUFileIdSeparator).append("true");
  249. fileId.set(tmp);
  250. }
  251. }
  252. const MemoryBuffer &queryBinTypeInfo() const
  253. {
  254. return binLayout;
  255. }
  256. const char *queryMetaInfoBlob() const
  257. {
  258. return metaInfoBlobB64;
  259. }
  260. IPropertyTree *queryMetaInfo() const
  261. {
  262. return metaInfo;
  263. }
  264. unsigned queryRowStreamReplyLimitKb() const
  265. {
  266. return rowStreamReplyLimitKb;
  267. }
  268. const IPropertyTree *queryOptions() const
  269. {
  270. return options;
  271. }
  272. // IDFUFileAccessExt
  273. virtual IOutputMetaData *queryMeta() const override
  274. {
  275. return actualMeta;
  276. }
  277. virtual IFileDescriptor &queryFileDescriptor() const
  278. {
  279. return *fileDesc;
  280. }
  281. virtual IPropertyTree &queryProperties() const override
  282. {
  283. return fileDesc->queryProperties();
  284. }
  285. virtual void setLayoutBin(size32_t sz, const void *layoutBin) override
  286. {
  287. binLayout.clear().append(sz, layoutBin);
  288. actualMeta.setown(createTypeInfoOutputMetaData(binLayout, grouped));
  289. binLayout.reset(0);
  290. }
  291. // IDFUFileAccess impl.
  292. virtual const char *queryName() const override
  293. {
  294. return name;
  295. }
  296. virtual const char *queryFileId() const override
  297. {
  298. return fileId;
  299. }
  300. virtual unsigned queryNumParts() const override
  301. {
  302. return numParts;
  303. }
  304. virtual SecAccessFlags queryAccessType() const
  305. {
  306. return accessType;
  307. }
  308. virtual bool queryIsGrouped() const override
  309. {
  310. return grouped;
  311. }
  312. virtual DFUFileType queryType() const override
  313. {
  314. return fileType;
  315. }
  316. virtual bool queryIsCompressed() const override
  317. {
  318. return fileDesc->isCompressed();
  319. }
  320. virtual const char *queryClusterGroupName() const override
  321. {
  322. return groupName;
  323. }
  324. virtual const char *queryPartHost(unsigned part, unsigned copy=0) const override
  325. {
  326. return hosts[part*maxCopiesPerPart+copy].c_str();
  327. }
  328. virtual const char *queryJSONTypeInfo() const override
  329. {
  330. CriticalBlock b(decodeJsonCrit);
  331. if (!gotJsonTypeInfo)
  332. {
  333. Owned<IRtlFieldTypeDeserializer> deserializer = createRtlFieldTypeDeserializer();
  334. const RtlTypeInfo *typeInfo = deserializer->deserialize(binLayout.reset(0));
  335. if (!dumpTypeInfo(jsonLayout, typeInfo))
  336. throwUnexpected();
  337. gotJsonTypeInfo = true;
  338. }
  339. return jsonLayout;
  340. }
  341. virtual const char *queryECLRecordDefinition() const override
  342. {
  343. /* JCSMORE - need a helper method that can dump type info to ECL format
  344. * Could then also be used to normalize a ECL definition.
  345. */
  346. UNIMPLEMENTED_X("queryECLRecordDefinition");
  347. }
  348. virtual const void *queryMetaInfoBlob(size32_t &sz) const override
  349. {
  350. sz = metaInfoBlobB64.length();
  351. return metaInfoBlobB64;
  352. }
  353. virtual const char *queryCommCompressionType() const override
  354. {
  355. return commCompType;
  356. }
  357. virtual void setCommCompressionType(const char *compType) override
  358. {
  359. commCompType.set(compType);
  360. }
  361. virtual DFUFileOption queryFileOptions() const override
  362. {
  363. return fileOptions;
  364. }
  365. virtual bool isFileOptionSet(DFUFileOption opt) const override
  366. {
  367. return (fileOptions & opt);
  368. }
  369. virtual const char *queryFileProperty(const char *name) const override
  370. {
  371. return fileDesc->queryProperties().queryProp(name);
  372. }
  373. virtual __int64 queryFilePropertyInt(const char *name) const override
  374. {
  375. return fileDesc->queryProperties().getPropInt64(name);
  376. }
  377. virtual const char *queryPartProperty(unsigned part, const char *name) const override
  378. {
  379. return fileDesc->queryPart(part)->queryProperties().queryProp(name);
  380. }
  381. virtual __int64 queryPartPropertyInt(unsigned part, const char *name) const override
  382. {
  383. return fileDesc->queryPart(part)->queryProperties().getPropInt64(name);
  384. }
  385. virtual void setFileOption(DFUFileOption opt) override
  386. {
  387. fileOptions = (DFUFileOption)(((unsigned)fileOptions) | (unsigned)opt);
  388. }
  389. virtual void clearFileOption(DFUFileOption opt) override
  390. {
  391. fileOptions = (DFUFileOption)(((unsigned)fileOptions) & ~((unsigned)opt));
  392. }
  393. virtual void setECLRecordDefinition(const char *eclRecDef) override
  394. {
  395. MemoryBuffer layoutBin;
  396. MultiErrorReceiver errs;
  397. Owned<IHqlExpression> expr = parseQuery(eclRecDef, &errs);
  398. if (errs.errCount() > 0)
  399. {
  400. StringBuffer errorMsg;
  401. throwDsFsClientExceptionV(DaFsClient_ECLParseError, "Failed in parsing ECL %s: %s.", eclRecDef, errs.toString(errorMsg).str());
  402. }
  403. if (!expr)
  404. throwDsFsClientExceptionV(DaFsClient_ECLParseError, "Failed in parsing ECL: %s.", eclRecDef);
  405. if (!exportBinaryType(layoutBin, expr, false))
  406. throwDsFsClientException(DaFsClient_ECLParseError, "Failed in exportBinaryType.");
  407. fileDesc->queryProperties().setProp("ECL", eclRecDef);
  408. fileDesc->queryProperties().setPropBin("_rtlType", layoutBin.length(), layoutBin.toByteArray());
  409. }
  410. virtual void setFileProperty(const char *name, const char *value) override
  411. {
  412. fileDesc->queryProperties().setProp(name, value);
  413. }
  414. virtual void setFilePropertyInt(const char *name, __int64 value) override
  415. {
  416. fileDesc->queryProperties().setPropInt64(name, value);
  417. }
  418. virtual void setPartProperty(unsigned part, const char *name, const char *value) override
  419. {
  420. fileDesc->queryPart(part)->queryProperties().setProp(name, value);
  421. }
  422. virtual void setPartPropertyInt(unsigned part, const char *name, __int64 value) override
  423. {
  424. fileDesc->queryPart(part)->queryProperties().setPropInt64(name, value);
  425. }
  426. virtual void setStreamReplyLimitK(unsigned k) override
  427. {
  428. rowStreamReplyLimitKb = k;
  429. }
  430. virtual void setExpirySecs(unsigned secs) override
  431. {
  432. expirySecs = secs;
  433. }
  434. virtual void setOption(const char *key, const char *value) override
  435. {
  436. if (!options)
  437. options.setown(createPTree());
  438. options->setProp(key, value);
  439. }
  440. // NB: the intention is for a IDFUFileAccess to be used to create instances for multiple parts, but not to mix types.
  441. virtual IDFUFilePartReader *createFilePartReader(unsigned p, unsigned copy, IOutputMetaData *outMeta, bool preserveGrouping) override;
  442. virtual IDFUFilePartWriter *createFilePartWriter(unsigned p) override;
  443. virtual IDFUFilePartWriter *createFilePartAppender(unsigned p) override;
  444. virtual IDFUFileAccessExt *queryEngineInterface() override { return this; }
  445. };
  446. IDFUFileAccess *createDFUFileAccess(const char *metaInfoBlobB64, const char *fileId)
  447. {
  448. return new CDFUFile(metaInfoBlobB64, fileId);
  449. }
  450. class CDaFileSrvClientBase : public CInterfaceOf<IDFUFilePartBase>
  451. {
  452. protected:
  453. Linked<CDFUFile> file;
  454. unsigned part = 0;
  455. unsigned copy = 0;
  456. Owned<IPropertyTree> requestTree;
  457. IPropertyTree *requestNode = nullptr;
  458. size32_t jsonRequestStartPos = 0;
  459. size32_t jsonRequestEndPos = 0;
  460. MemoryBuffer sendMb;
  461. Owned<IDaFsConnection> daFsConnection;
  462. int handle = 0;
  463. unsigned serverVersion = 0;
  464. bool started = false;
  465. bool checkAccess(SecAccessFlags accessWanted)
  466. {
  467. return 0 != (file->queryAccessType() & accessWanted);
  468. }
  469. void markJsonStart()
  470. {
  471. sendMb.append((size32_t)0); // placeholder
  472. jsonRequestStartPos = sendMb.length();
  473. }
  474. void markJsonEnd()
  475. {
  476. jsonRequestEndPos = sendMb.length();
  477. size32_t jsonRequestLen = jsonRequestEndPos - jsonRequestStartPos;
  478. sendMb.writeEndianDirect(jsonRequestStartPos-sizeof(size32_t), sizeof(size32_t), &jsonRequestLen);
  479. }
  480. void serializeJsonRequest(IPropertyTree *tree)
  481. {
  482. StringBuffer jsonStr;
  483. #if _DEBUG
  484. toJSON(tree, jsonStr, 2);
  485. #else
  486. toJSON(tree, jsonStr, 0, 0);
  487. #endif
  488. sendMb.append(jsonStr.length(), jsonStr); // NB: if there was a IOStream to MemoryBuffer impl, could use that to avoid encoding to string, and then appending.
  489. }
  490. void addRequest(IPropertyTree *tree, RemoteFileCommandType legacyCmd=RFCunknown)
  491. {
  492. if (serverVersion < DAFILESRV_STREAMGENERAL_MINVERSION)
  493. {
  494. assertex(legacyCmd != RFCunknown);
  495. sendMb.append(legacyCmd);
  496. serializeJsonRequest(tree);
  497. }
  498. else
  499. {
  500. sendMb.append((RemoteFileCommandType)RFCStreamGeneral);
  501. markJsonStart();
  502. serializeJsonRequest(tree);
  503. markJsonEnd();
  504. }
  505. }
  506. unsigned send(MemoryBuffer &reply)
  507. {
  508. daFsConnection->send(sendMb, reply);
  509. unsigned newHandle;
  510. reply.read(newHandle);
  511. return newHandle;
  512. }
  513. void establishServerVersion()
  514. {
  515. if (serverVersion)
  516. return;
  517. serverVersion = getCachedRemoteVersion(*daFsConnection); // NB: may also connect in the process
  518. if (0 == serverVersion)
  519. {
  520. StringBuffer str;
  521. throwDsFsClientExceptionV(DaFsClient_ConnectionFailure, "CDaFileSrvClientBase: Failed to connect to %s", daFsConnection->queryEp().getUrlStr(str).str());
  522. }
  523. if (serverVersion < DAFILESRV_STREAMREAD_MINVERSION)
  524. {
  525. StringBuffer str;
  526. throwDsFsClientExceptionV(DaFsClient_TooOld, "CDaFileSrvClientBase: server ersion(%u), too old connect to %s", serverVersion, daFsConnection->queryEp().getUrlStr(str).str());
  527. }
  528. }
  529. void start()
  530. {
  531. if (serverVersion)
  532. return;
  533. establishServerVersion(); // JCSMORE - ensure cache involved behind the scenes
  534. if (file->isFileOptionSet(dfo_compressedRemoteStreams))
  535. {
  536. const char *compType = file->queryCommCompressionType();
  537. if (!isEmptyString(compType))
  538. {
  539. if (serverVersion < DAFILESRV_STREAMGENERAL_MINVERSION)
  540. requestTree->setProp("outputCompression", file->queryCommCompressionType());
  541. else
  542. requestTree->setProp("commCompression", file->queryCommCompressionType());
  543. }
  544. }
  545. }
  546. void close()
  547. {
  548. daFsConnection->close(handle);
  549. }
  550. public:
  551. CDaFileSrvClientBase(CDFUFile *_file, unsigned _part, unsigned _copy) : file(_file), part(_part), copy(_copy)
  552. {
  553. unsigned port = file->queryMetaInfo()->getPropInt("port");
  554. bool secure = file->queryMetaInfo()->getPropBool("secure");
  555. SocketEndpoint ep(file->queryPartHost(part), port);
  556. DAFSConnectCfg connMethod = secure ? SSLOnly : SSLNone;
  557. daFsConnection.setown(createDaFsConnection(ep, connMethod, file->queryName()));
  558. requestTree.setown(createPTree());
  559. requestTree->setProp("format", "binary");
  560. requestTree->setPropInt("replyLimit", file->queryRowStreamReplyLimitKb());
  561. if (file->isFileOptionSet(dfo_compressedRemoteStreams))
  562. requestTree->setProp("commCompression", file->queryCommCompressionType());
  563. requestNode = requestTree->setPropTree("node");
  564. // NB: these are 1 based
  565. requestNode->setPropInt("filePart", part+1);
  566. requestNode->setPropInt("filePartCopy", copy+1);
  567. const MemoryBuffer &binLayout = file->queryBinTypeInfo();
  568. StringBuffer typeInfoStr;
  569. JBASE64_Encode(binLayout.toByteArray(), binLayout.length(), typeInfoStr, false);
  570. requestNode->setProp("inputBin", typeInfoStr.str()); // on disk meta
  571. requestNode->setProp("metaInfo", file->queryMetaInfoBlob());
  572. const IPropertyTree *options = file->queryOptions();
  573. if (options)
  574. requestNode->addPropTree("ActivityOptions", createPTreeFromIPT(options));
  575. }
  576. virtual void beforeDispose() override
  577. {
  578. try
  579. {
  580. finalize();
  581. }
  582. catch (IException *e)
  583. {
  584. EXCLOG(e, nullptr);
  585. e->Release();
  586. }
  587. }
  588. // IDFUFilePartBase impl.
  589. virtual void finalize() override
  590. {
  591. close();
  592. started = false;
  593. }
  594. };
  595. class CDFUPartReader : public CDaFileSrvClientBase, implements IDFUFilePartReader, implements ISerialStream
  596. {
  597. typedef CDaFileSrvClientBase PARENT;
  598. MemoryBuffer replyMb;
  599. Owned<IExpander> expander;
  600. MemoryBuffer expandMb;
  601. size32_t bufRemaining = 0;
  602. offset_t bufPos = 0;
  603. bool endOfStream = false;
  604. std::unordered_map<std::string, std::string> virtualFields;
  605. Owned<ISourceRowPrefetcher> rowPrefetcher;
  606. CThorContiguousRowBuffer prefetchBuffer;
  607. bool grouped = false;
  608. bool eog = false;
  609. bool eoi = false;
  610. Linked<IOutputMetaData> outMeta;
  611. offset_t currentReadPos = 0;
  612. bool variableContentDirty = false;
  613. bool pendingFinishRow = false;
  614. std::vector<std::string> fieldFilters;
  615. bool preserveGrouping = false;
  616. const size32_t replyHdrSize = sizeof(unsigned) + sizeof(unsigned) + sizeof(unsigned); // errCode+handle+rowDataSz
  617. void ensureAvailable(size32_t oldSz, const void *oldData)
  618. {
  619. replyMb.read(bufRemaining);
  620. endOfStream = (bufRemaining == 0); // NB: if true, a cursorLength of 0 will follow.
  621. unsigned offset = 0;
  622. if (expander && !endOfStream)
  623. {
  624. size32_t expandedSz = expander->init(replyMb.bytes()+replyMb.getPos());
  625. expandMb.clear().reserve(oldSz+expandedSz);
  626. expander->expand(((byte *)expandMb.bufferBase())+oldSz);
  627. expandMb.swapWith(replyMb);
  628. }
  629. if (oldSz)
  630. {
  631. if (!expander && (oldSz < replyHdrSize)) // when no expander pre-served space includes header
  632. offset = replyHdrSize - oldSz;
  633. replyMb.writeDirect(offset, oldSz, oldData); // NB: overwriting header and/or reserved space
  634. bufRemaining += oldSz;
  635. replyMb.reset(offset); // read pos
  636. }
  637. // NB: continuation cursor (with leading length) follows the row data in replyMb, 0 if no more row data
  638. }
  639. void ensureVariableContentAdded()
  640. {
  641. if (!variableContentDirty)
  642. return;
  643. variableContentDirty = false;
  644. IPropertyTree *virtualFieldsTree = requestNode->setPropTree("virtualFields");
  645. for (auto &e : virtualFields)
  646. virtualFieldsTree->setProp(e.first.c_str(), e.second.c_str());
  647. while (requestNode->removeProp("keyFilter")); // remove all
  648. for (auto &field: fieldFilters)
  649. requestNode->addProp("keyFilter", field.c_str());
  650. }
  651. unsigned sendReadStart(MemoryBuffer &tgt)
  652. {
  653. ensureVariableContentAdded();
  654. sendMb.clear();
  655. initSendBuffer(sendMb);
  656. addRequest(requestTree, RFCStreamRead);
  657. return send(tgt);
  658. }
  659. unsigned sendReadContinuation(MemoryBuffer &newReply)
  660. {
  661. sendMb.clear();
  662. initSendBuffer(sendMb);
  663. Owned<IPropertyTree> tree = createPTree();
  664. tree->setPropInt("handle", handle);
  665. tree->setProp("format", "binary");
  666. addRequest(tree, RFCStreamRead);
  667. daFsConnection->send(sendMb, newReply);
  668. unsigned newHandle;
  669. newReply.read(newHandle);
  670. return newHandle;
  671. }
  672. void extendReplyMb(size32_t wanted)
  673. {
  674. if (0 == bufRemaining)
  675. {
  676. refill();
  677. return;
  678. }
  679. /* Used to read remaining from and patch new reply
  680. * NB: oldBufPtr remains intact until out of scope
  681. */
  682. size32_t oldRemaining = bufRemaining;
  683. unsigned oldRemainingPos = replyMb.getPos();
  684. MemoryBuffer newReplyMb;
  685. /* reserve space to patch back existing remaining bytes into the head of new reply buffer.
  686. * NB: reply buffer's have a leading header (sizeof(errorcode[unsigned]) + sizeof(handle) + sizeof(bufRemaining)), which
  687. * can will be read but removed (overwritten) when writing the oldRemaining bytes.
  688. * Not relevant if compression and expanding used.
  689. */
  690. size32_t replyHdrSize = sizeof(unsigned) + sizeof(handle) + sizeof(bufRemaining);
  691. size32_t leadingSpace = bufRemaining;
  692. if (!expander && (bufRemaining > replyHdrSize))
  693. {
  694. leadingSpace -= replyHdrSize;
  695. newReplyMb.reserveTruncate(leadingSpace);
  696. newReplyMb.reset(leadingSpace); // ensure that newHandle is read from new data
  697. }
  698. // ensures gets in one go
  699. if (wanted>(file->queryRowStreamReplyLimitKb()*1024)) // unlikely
  700. requestTree->setPropInt("replyLimit", (wanted+1023)/1024); // found up to nearest # K
  701. unsigned newHandle = sendReadContinuation(newReplyMb);
  702. if (newHandle != handle) // dafilesrv did not recognize handle, send cursor
  703. {
  704. assertex(newHandle == 0);
  705. // resend original request with cursor
  706. size32_t cursorLength;
  707. replyMb.skip(bufRemaining);
  708. replyMb.read(cursorLength);
  709. StringBuffer cursorInfo;
  710. JBASE64_Encode(replyMb.readDirect(cursorLength), cursorLength, cursorInfo, false);
  711. requestTree->setProp("cursorBin", cursorInfo);
  712. newReplyMb.rewrite(leadingSpace);
  713. handle = sendReadStart(newReplyMb); // new handle
  714. requestTree->removeProp("cursorBin");
  715. }
  716. replyMb.swapWith(newReplyMb);
  717. ensureAvailable(oldRemaining, newReplyMb.bytes()+oldRemainingPos); // reads from replyMb, leaves 'oldRemaining' space at front of expanded buffer (if used)
  718. }
  719. void refill()
  720. {
  721. if (!started)
  722. throwDsFsClientException(DaFsClient_NotStarted, "CDFUPartReader - not started");
  723. size32_t cursorLength;
  724. replyMb.read(cursorLength);
  725. if (!cursorLength)
  726. {
  727. endOfStream = true;
  728. return;
  729. }
  730. MemoryBuffer newReply;
  731. unsigned newHandle = sendReadContinuation(newReply);
  732. if (newHandle == handle)
  733. replyMb.swapWith(newReply);
  734. else // dafilesrv did not recognize handle, send cursor
  735. {
  736. assertex(newHandle == 0);
  737. // resend original request with cursor
  738. StringBuffer cursorInfo;
  739. JBASE64_Encode(replyMb.readDirect(cursorLength), cursorLength, cursorInfo, false);
  740. requestTree->setProp("cursorBin", cursorInfo);
  741. handle = sendReadStart(replyMb.clear()); // new handle
  742. requestTree->removeProp("cursorBin");
  743. }
  744. ensureAvailable(0, nullptr); // reads from replyMb
  745. }
  746. // ISerialStream impl.
  747. virtual const void *peek(size32_t wanted, size32_t &got) override
  748. {
  749. if (bufRemaining >= wanted)
  750. got = bufRemaining;
  751. else
  752. extendReplyMb(wanted);
  753. got = bufRemaining;
  754. return replyMb.bytes()+replyMb.getPos();
  755. }
  756. virtual void get(size32_t len, void * ptr) override // exception if no data available
  757. {
  758. while (len)
  759. {
  760. if (0 == bufRemaining)
  761. {
  762. refill();
  763. if (0 == bufRemaining)
  764. throwDsFsClientException(DaFsClient_ReaderEndOfStream, "CDFUPartReader::get(): end of stream");
  765. }
  766. size32_t r = len>bufRemaining ? bufRemaining : len;
  767. memcpy(ptr, replyMb.readDirect(r), r);
  768. len -= r;
  769. bufRemaining -= r;
  770. currentReadPos += r;
  771. }
  772. }
  773. virtual bool eos() override
  774. {
  775. if (!eoi)
  776. {
  777. if (0 == bufRemaining)
  778. {
  779. refill();
  780. if (0 == bufRemaining)
  781. eoi = true;
  782. }
  783. }
  784. return eoi;
  785. }
  786. virtual void skip(size32_t len) override
  787. {
  788. // same as get() without the memcpy
  789. while (len)
  790. {
  791. if (0 == bufRemaining)
  792. {
  793. refill();
  794. if (0 == bufRemaining)
  795. throwDsFsClientException(DaFsClient_ReaderEndOfStream, "CDFUPartReader::skip(): end of stream");
  796. }
  797. size32_t r = len>bufRemaining ? bufRemaining : len;
  798. len -= r;
  799. bufRemaining -= r;
  800. replyMb.skip(r);
  801. currentReadPos += r;
  802. }
  803. }
  804. virtual offset_t tell() const override
  805. {
  806. return currentReadPos;
  807. }
  808. virtual void reset(offset_t _offset,offset_t _flen=(offset_t)-1) override
  809. {
  810. throwUnexpected();
  811. }
  812. public:
  813. IMPLEMENT_IINTERFACE_USING(PARENT);
  814. CDFUPartReader(CDFUFile *file, unsigned part, unsigned copy, IOutputMetaData *_outMeta, bool _preserveGrouping)
  815. : CDaFileSrvClientBase(file, part, copy), outMeta(_outMeta), prefetchBuffer(nullptr), preserveGrouping(_preserveGrouping)
  816. {
  817. checkAccess(SecAccess_Read);
  818. grouped = file->queryIsGrouped(); // inputGrouped. Will be sent to dafilesrv in request.
  819. if (preserveGrouping && !grouped)
  820. preserveGrouping = false;
  821. // NB: setOutputRecordFormat() can override/set outMeta
  822. if (outMeta && (outMeta != file->queryMeta()))
  823. {
  824. MemoryBuffer projectedTypeInfo;
  825. dumpTypeInfo(projectedTypeInfo, outMeta->querySerializedDiskMeta()->queryTypeInfo());
  826. StringBuffer typeInfoStr;
  827. JBASE64_Encode(projectedTypeInfo.toByteArray(), projectedTypeInfo.length(), typeInfoStr, false);
  828. const char *inputBin = requestNode->queryProp("inputBin"); // NB: this is disk meta
  829. if (0 != strsame(typeInfoStr, inputBin)) // double check provided outMeta is not same as inMeta
  830. requestNode->setProp("outputBin", typeInfoStr.str()); // i.e. dafilesrv will project to this meta
  831. }
  832. else
  833. outMeta.set(file->queryMeta());
  834. if (file->queryIsCompressed())
  835. requestNode->setPropBool("compressed", true);
  836. if (grouped)
  837. {
  838. requestNode->setPropBool("inputGrouped", true);
  839. requestNode->setPropBool("outputGrouped", preserveGrouping);
  840. }
  841. switch (file->queryType())
  842. {
  843. case dft_xml:
  844. case dft_json:
  845. case dft_flat:
  846. case dft_csv:
  847. case dft_index:
  848. {
  849. requestNode->setProp("kind", getReadActivityString(file->queryType()));
  850. break;
  851. }
  852. default:
  853. throwUnexpected();
  854. }
  855. if (file->isFileOptionSet(dfo_compressedRemoteStreams))
  856. {
  857. const char *compType = file->queryCommCompressionType();
  858. if (!isEmptyString(compType))
  859. {
  860. expander.setown(getExpander(compType));
  861. if (expander)
  862. expandMb.setEndian(__BIG_ENDIAN);
  863. else
  864. throwDsFsClientExceptionV(DaFsClient_CompressorSetupError, "Failed to created compression decompressor for: %s", file->queryCommCompressionType());
  865. }
  866. }
  867. }
  868. // IDFUFilePartReader impl.
  869. virtual void start() override
  870. {
  871. PARENT::start();
  872. rowPrefetcher.setown(outMeta->createDiskPrefetcher());
  873. assertex(rowPrefetcher);
  874. prefetchBuffer.setStream(this);
  875. eog = false;
  876. eoi = false;
  877. pendingFinishRow = false;
  878. handle = sendReadStart(replyMb.clear());
  879. ensureAvailable(0, nullptr); // reads from replyMb
  880. started = true;
  881. }
  882. virtual void finalize() override
  883. {
  884. PARENT::finalize();
  885. }
  886. virtual IOutputMetaData *queryMeta() const override
  887. {
  888. return outMeta;
  889. }
  890. virtual const void *nextRow(size32_t &sz) override
  891. {
  892. if (pendingFinishRow)
  893. {
  894. pendingFinishRow = false;
  895. prefetchBuffer.finishedRow();
  896. }
  897. if (eog)
  898. eog = false;
  899. else if (!eoi)
  900. {
  901. if (prefetchBuffer.eos())
  902. {
  903. eoi = true;
  904. return nullptr;
  905. }
  906. rowPrefetcher->readAhead(prefetchBuffer);
  907. sz = prefetchBuffer.queryRowSize();
  908. if (preserveGrouping) // if inputGrouped, but !preserveGrouping, dafilesrv will have stripped grouping
  909. prefetchBuffer.skip(sizeof(eog));
  910. const byte * row = prefetchBuffer.queryRow();
  911. if (preserveGrouping)
  912. memcpy(&eog, row+sz, sizeof(eog));
  913. pendingFinishRow = true;
  914. return row;
  915. }
  916. return nullptr;
  917. }
  918. virtual const void *getRows(size32_t min, size32_t &got) override
  919. {
  920. if (pendingFinishRow)
  921. {
  922. pendingFinishRow = false;
  923. prefetchBuffer.finishedRow();
  924. }
  925. if (eoi)
  926. {
  927. got = 0;
  928. return nullptr;
  929. }
  930. while (true)
  931. {
  932. if (prefetchBuffer.eos())
  933. {
  934. eoi = true;
  935. got = prefetchBuffer.queryRowSize();
  936. return got ? prefetchBuffer.queryRow() : nullptr;
  937. }
  938. rowPrefetcher->readAhead(prefetchBuffer);
  939. pendingFinishRow = true;
  940. if (grouped)
  941. prefetchBuffer.read(sizeof(eog), &eog);
  942. got = prefetchBuffer.queryRowSize();
  943. if (got >= min)
  944. return prefetchBuffer.queryRow();
  945. }
  946. got = 0;
  947. return nullptr;
  948. }
  949. // NB: the methods below should be called before start()
  950. virtual void addFieldFilter(const char *textFilter) override
  951. {
  952. // this is purely to validate the textFilter
  953. const RtlRecord *record = &file->queryMeta()->queryRecordAccessor(true);
  954. Owned<IFieldFilter> rtlFilter = deserializeFieldFilter(*record, textFilter);
  955. if (rtlFilter)
  956. {
  957. fieldFilters.push_back(textFilter);
  958. variableContentDirty = true;
  959. }
  960. }
  961. virtual void clearFieldFilters() override
  962. {
  963. fieldFilters.clear();
  964. variableContentDirty = true;
  965. }
  966. virtual void setOutputRecordFormat(const char *eclRecDef) override
  967. {
  968. MultiErrorReceiver errs;
  969. Owned<IHqlExpression> record = parseQuery(eclRecDef, &errs);
  970. if (errs.errCount())
  971. {
  972. StringBuffer errText;
  973. IError *first = errs.firstError();
  974. first->toString(errText);
  975. throwDsFsClientExceptionV(DaFsClient_ECLParseError, "Failed to parse output ecl definition '%s': %s @ %u:%u", eclRecDef, errText.str(), first->getColumn(), first->getLine());
  976. }
  977. if (!record)
  978. throwDsFsClientExceptionV(DaFsClient_ECLParseError, "Failed to parse output ecl definition '%s'", eclRecDef);
  979. MemoryBuffer projectedTypeInfo;
  980. exportBinaryType(projectedTypeInfo, record, dft_index == file->queryType());
  981. outMeta.setown(createTypeInfoOutputMetaData(projectedTypeInfo, false));
  982. StringBuffer typeInfoStr;
  983. JBASE64_Encode(projectedTypeInfo.toByteArray(), projectedTypeInfo.length(), typeInfoStr, false);
  984. requestNode->setProp("outputBin", typeInfoStr.str());
  985. }
  986. virtual void addVirtualFieldMapping(const char *fieldName, const char *fieldValue) override
  987. {
  988. variableContentDirty = true;
  989. virtualFields[fieldName] = fieldValue;
  990. }
  991. };
  992. class CDFUPartWriterBase : public CDaFileSrvClientBase, implements IDFUFilePartWriter
  993. {
  994. typedef CDaFileSrvClientBase PARENT;
  995. MemoryBuffer replyMb;
  996. unsigned startPos = 0;
  997. Owned<ICompressor> compressor;
  998. protected:
  999. bool firstSend = true;
  1000. const unsigned sendThresholdBytes = 0x100000; // 1MB
  1001. void prepNext()
  1002. {
  1003. sendMb.clear();
  1004. // prepare for next continuation
  1005. initSendBuffer(sendMb);
  1006. Owned<IPropertyTree> tree = createPTree();
  1007. tree->setPropInt("handle", handle);
  1008. addRequest(tree);
  1009. startPos = sendMb.length();
  1010. size32_t rowDataSz = 0;
  1011. sendMb.append(rowDataSz); // place holder
  1012. if (compressor)
  1013. {
  1014. void *rowData = sendMb.reserveTruncate(sendThresholdBytes);
  1015. compressor->open(rowData, sendThresholdBytes);
  1016. }
  1017. }
  1018. unsigned send(MemoryBuffer &replyMb)
  1019. {
  1020. size32_t len;
  1021. if (compressor)
  1022. {
  1023. compressor->close();
  1024. len = compressor->buflen();
  1025. sendMb.setLength(startPos + sizeof(len)+len);
  1026. }
  1027. else
  1028. len = sendMb.length()-startPos-sizeof(size32_t);
  1029. sendMb.writeEndianDirect(startPos, sizeof(len), &len);
  1030. return PARENT::send(replyMb);
  1031. }
  1032. void sendWriteFirst()
  1033. {
  1034. unsigned newHandle = send(replyMb.clear());
  1035. if (!newHandle)
  1036. throwStringExceptionV(DAFSERR_cmdstream_generalwritefailure, "Error whilst writing data to file: '%s'", file->queryName());
  1037. else if (handle && (newHandle != handle))
  1038. throwStringExceptionV(DAFSERR_cmdstream_unknownwritehandle, "Unknown write handle whilst remote writing to file: '%s'", file->queryName());
  1039. handle = newHandle;
  1040. }
  1041. unsigned sendWriteContinuation()
  1042. {
  1043. MemoryBuffer newReplyMb;
  1044. unsigned newHandle = send(newReplyMb.clear());
  1045. if (newHandle == handle)
  1046. replyMb.swapWith(newReplyMb);
  1047. else // dafilesrv did not recognize handle, send cursor
  1048. {
  1049. assertex(newHandle == 0);
  1050. // resend original request with cursor
  1051. size32_t cursorLength;
  1052. replyMb.read(cursorLength);
  1053. StringBuffer cursorInfo;
  1054. JBASE64_Encode(replyMb.readDirect(cursorLength), cursorLength, cursorInfo, false);
  1055. requestTree->setProp("cursorBin", cursorInfo);
  1056. initSendBuffer(sendMb);
  1057. addRequest(requestTree);
  1058. sendWriteFirst(); // new handle
  1059. requestTree->removeProp("cursorBin");
  1060. }
  1061. prepNext();
  1062. return newHandle;
  1063. }
  1064. void sendWrite()
  1065. {
  1066. if (firstSend) // for 1st send, want to send even if no record, so that file is at least created
  1067. {
  1068. if (!started)
  1069. throwDsFsClientException(DaFsClient_NotStarted, "CDFUPartWriterBase: start() must be called before write()");
  1070. firstSend = false;
  1071. sendWriteFirst();
  1072. prepNext();
  1073. }
  1074. else if (sendMb.length() > startPos) // if anything to send
  1075. sendWriteContinuation();
  1076. }
  1077. public:
  1078. IMPLEMENT_IINTERFACE_USING(PARENT);
  1079. CDFUPartWriterBase(CDFUFile *file, unsigned part) : CDaFileSrvClientBase(file, part, 0)
  1080. {
  1081. if (file->isFileOptionSet(dfo_compressedRemoteStreams))
  1082. {
  1083. const char *compType = file->queryCommCompressionType();
  1084. if (!isEmptyString(compType))
  1085. {
  1086. compressor.setown(getCompressor(file->queryCommCompressionType()));
  1087. if (!compressor)
  1088. WARNLOG("Failed to created compressor for: %s", file->queryCommCompressionType());
  1089. }
  1090. }
  1091. }
  1092. // IDFUFilePartWriter impl.
  1093. virtual void start() override
  1094. {
  1095. PARENT::start();
  1096. if (serverVersion < DAFILESRV_STREAMGENERAL_MINVERSION)
  1097. throwDsFsClientExceptionV(DaFsClient_NoStreamWriteSupport, "dafilesrv version (%u) too old to support streaming write", serverVersion);
  1098. sendMb.clear();
  1099. initSendBuffer(sendMb);
  1100. addRequest(requestTree);
  1101. startPos = sendMb.length();
  1102. size32_t rowDataSz = 0;
  1103. sendMb.append(rowDataSz); // place holder
  1104. if (compressor)
  1105. {
  1106. void *rowData = sendMb.reserveTruncate(sendThresholdBytes);
  1107. compressor->open(rowData, sendThresholdBytes);
  1108. }
  1109. started = true;
  1110. }
  1111. virtual void finalize() override
  1112. {
  1113. if (!started)
  1114. return;
  1115. sendWrite();
  1116. PARENT::finalize();
  1117. }
  1118. virtual IOutputMetaData *queryMeta() const override
  1119. {
  1120. return file->queryMeta();
  1121. }
  1122. virtual void write(size32_t sz, const void *rowData) override // NB: can be multiple rows
  1123. {
  1124. if (compressor)
  1125. {
  1126. if (compressor->write(rowData, sz) < sz)
  1127. sendWrite();
  1128. }
  1129. else
  1130. {
  1131. sendMb.append(sz, rowData);
  1132. if (sendMb.length() > sendThresholdBytes)
  1133. sendWrite();
  1134. }
  1135. }
  1136. };
  // Value written to the "compressed" request property by the flat part writer when the file is compressed.
  static const char *defaultCompressionType = "LZ4";
  1138. class CDFUPartFlatWriter : public CDFUPartWriterBase
  1139. {
  1140. typedef CDFUPartWriterBase PARENT;
  1141. StringAttr eclRecDef;
  1142. const byte eogMarker = 1;
  1143. bool lastEog = false;
  1144. bool grouped = false;
  1145. bool append = false;
  1146. // only for legacy dafilesrv's that are 'open' and don't support stream writing.
  1147. StringBuffer directFileName;
  1148. Owned<IFileIOStream> directFileIOStream;
  1149. void doWrite(size32_t sz, const void *data)
  1150. {
  1151. if (directFileIOStream)
  1152. directFileIOStream->write(sz, data);
  1153. else
  1154. PARENT::write(sz, data);
  1155. }
  1156. public:
  1157. CDFUPartFlatWriter(CDFUFile *file, unsigned part, bool _append) : CDFUPartWriterBase(file, part), append(_append)
  1158. {
  1159. checkAccess(SecAccess_Write);
  1160. if (dft_flat != file->queryType())
  1161. throwDsFsClientExceptionV(DaFsClient_InvalidFileType, "CDFUPartFlatWriter: invalid file type: %u", file->queryType());
  1162. requestNode->setProp("kind", "diskwrite");
  1163. if (file->queryIsCompressed())
  1164. requestNode->setProp("compressed", defaultCompressionType);
  1165. requestNode->setPropBool("inputGrouped", file->queryIsGrouped());
  1166. grouped = file->queryIsGrouped();
  1167. }
  1168. virtual void start() override
  1169. {
  1170. lastEog = false;
  1171. try
  1172. {
  1173. PARENT::start();
  1174. return;
  1175. }
  1176. catch (IDaFsException *e)
  1177. {
  1178. if (DaFsClient_NoStreamWriteSupport != e->errorCode())
  1179. throw;
  1180. e->Release();
  1181. }
  1182. StringBuffer msg;
  1183. daFsConnection->queryEp().getUrlStr(msg);
  1184. WARNLOG("Stream writing not supported by dafilesrv(%s), attempting unsecured direct connection", msg.str());
  1185. RemoteFilename rfn;
  1186. file->queryFileDescriptor().getFilename(part, 0, rfn);
  1187. rfn.getRemotePath(directFileName);
  1188. if (!recursiveCreateDirectoryForFile(directFileName))
  1189. throwDsFsClientExceptionV(DaFsClient_OpenFailure, "Failed to create dirtory for file: '%s'", directFileName.str());
  1190. Owned<IFile> iFile = createIFile(directFileName);
  1191. Owned<IFileIO> iFileIO = iFile->open(append ? IFOwrite : IFOcreate);
  1192. directFileIOStream.setown(createBufferedIOStream(iFileIO));
  1193. if (append)
  1194. directFileIOStream->seek(0, IFSend);
  1195. started = true; // DaFsClient_NoStreamWriteSupport exception in base would have prevent started being set
  1196. firstSend = false; // NB: this suppresses the base class from sending
  1197. }
  1198. virtual void write(size32_t sz, const void *rowData) override // NB: can be multiple rows
  1199. {
  1200. if (!rowData)
  1201. {
  1202. if (!grouped)
  1203. throwDsFsClientException(DaFsClient_WriteError, "CRemoteDiskWriteActivity::write() - invalid null write() not grouped write operation");
  1204. else if (lastEog)
  1205. throwDsFsClientException(DaFsClient_WriteError, "CRemoteDiskWriteActivity::write() - multiple sequential null's");
  1206. lastEog = true;
  1207. doWrite(1, &eogMarker);
  1208. }
  1209. else
  1210. {
  1211. lastEog = false;
  1212. doWrite(sz, rowData);
  1213. }
  1214. }
  1215. virtual void finalize() override
  1216. {
  1217. if (directFileIOStream)
  1218. directFileIOStream.clear(); // closes the file
  1219. PARENT::finalize();
  1220. }
  1221. };
  1222. class CDFUPartIndexWriter : public CDFUPartWriterBase
  1223. {
  1224. StringAttr eclRecDef;
  1225. public:
  1226. CDFUPartIndexWriter(CDFUFile *file, unsigned part) : CDFUPartWriterBase(file, part)
  1227. {
  1228. checkAccess(SecAccess_Write);
  1229. if (dft_index != file->queryType())
  1230. throwDsFsClientExceptionV(DaFsClient_InvalidFileType, "CDFUPartIndexWriter: invalid file type: %u", file->queryType());
  1231. requestNode->setProp("kind", "indexwrite");
  1232. }
  1233. };
  1234. IDFUFilePartReader *CDFUFile::createFilePartReader(unsigned p, unsigned copy, IOutputMetaData *outMeta, bool preserveGrouping)
  1235. {
  1236. return new CDFUPartReader(this, p, copy, outMeta, preserveGrouping);
  1237. }
  1238. IDFUFilePartWriter *CDFUFile::createFilePartWriter(unsigned p)
  1239. {
  1240. switch (fileType)
  1241. {
  1242. case dft_flat:
  1243. return new CDFUPartFlatWriter(this, p, false);
  1244. case dft_index:
  1245. return new CDFUPartIndexWriter(this, p);
  1246. default:
  1247. throwUnexpected();
  1248. }
  1249. }
  1250. IDFUFilePartWriter *CDFUFile::createFilePartAppender(unsigned p)
  1251. {
  1252. switch (fileType)
  1253. {
  1254. case dft_flat:
  1255. return new CDFUPartFlatWriter(this, p, true);
  1256. case dft_index:
  1257. throwStringExceptionV(0, "Appending to index not supported");
  1258. break;
  1259. default:
  1260. throwUnexpected();
  1261. }
  1262. }
  1263. ////////////
  1264. IRowWriter *createRowWriter(IDFUFilePartWriter *partWriter)
  1265. {
  1266. class CRowWriter : public CSimpleInterfaceOf<IRowWriter>, protected IRowSerializerTarget
  1267. {
  1268. Linked<IDFUFilePartWriter> partWriter;
  1269. IOutputMetaData *meta = nullptr;
  1270. Owned<IOutputRowSerializer> serializer;
  1271. unsigned nesting = 0;
  1272. MemoryBuffer nested;
  1273. // IRowSerializerTarget impl.
  1274. virtual void put(size32_t len, const void * ptr) override
  1275. {
  1276. if (nesting)
  1277. nested.append(len, ptr);
  1278. else
  1279. partWriter->write(len, ptr);
  1280. }
  1281. virtual size32_t beginNested(size32_t count) override
  1282. {
  1283. nesting++;
  1284. unsigned pos = nested.length();
  1285. nested.append((size32_t)0);
  1286. return pos;
  1287. }
  1288. virtual void endNested(size32_t sizePos) override
  1289. {
  1290. size32_t sz = nested.length()-(sizePos + sizeof(size32_t));
  1291. nested.writeDirect(sizePos,sizeof(sz),&sz);
  1292. nesting--;
  1293. if (!nesting)
  1294. {
  1295. partWriter->write(nested.length(), nested.toByteArray());
  1296. nested.clear();
  1297. }
  1298. }
  1299. public:
  1300. CRowWriter(IDFUFilePartWriter *_partWriter) : partWriter(_partWriter)
  1301. {
  1302. meta = partWriter->queryMeta();
  1303. serializer.setown(meta->createDiskSerializer(nullptr, 1));
  1304. }
  1305. virtual void putRow(const void *row) override
  1306. {
  1307. serializer->serialize(*this, (const byte *)row);
  1308. }
  1309. virtual void flush() override
  1310. {
  1311. // flushing internal to partWriter
  1312. }
  1313. };
  1314. return new CRowWriter(partWriter);
  1315. }
  1316. IRowStream *createRowStream(IDFUFilePartReader *partReader)
  1317. {
  1318. class CRowStream : public CSimpleInterfaceOf<IRowStream>
  1319. {
  1320. IDFUFilePartReader *partReader;
  1321. public:
  1322. CRowStream(IDFUFilePartReader *_partReader) : partReader(_partReader)
  1323. {
  1324. }
  1325. virtual const void *nextRow() override
  1326. {
  1327. return nullptr;
  1328. }
  1329. virtual void stop() override
  1330. {
  1331. }
  1332. };
  1333. return new CRowStream(partReader);
  1334. }
  1335. void setDefaultCommCompression(const char *compType)
  1336. {
  1337. defaultCompCompression.set(compType);
  1338. }
  1339. } // namespace dafsstream