ftbase.cpp 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "platform.h"
  15. #include "jlib.hpp"
  16. #include "jio.hpp"
  17. #include "jmutex.hpp"
  18. #include "jfile.hpp"
  19. #include "jsocket.hpp"
  20. #include "jptree.hpp"
  21. #include "fterror.hpp"
  22. #include "dadfs.hpp"
  23. #include "ftbase.ipp"
  24. #include "daftmc.hpp"
  25. #include "dasds.hpp"
  26. #include "daftcfg.hpp"
  27. #include "environment.hpp"
  28. #include "dalienv.hpp"
  29. #include "rmtspawn.hpp"
  30. //----------------------------------------------------------------------------
  31. void getDfuTempName(RemoteFilename & temp, const RemoteFilename & src)
  32. {
  33. StringBuffer ext;
  34. src.split(NULL, NULL, NULL, &ext);
  35. ext.append(".tmp");
  36. temp.set(src);
  37. temp.setExtension(ext);
  38. }
  39. void renameDfuTempToFinal(const RemoteFilename & realname)
  40. {
  41. RemoteFilename tempFilename;
  42. StringBuffer newTailname;
  43. getDfuTempName(tempFilename, realname);
  44. realname.getTail(newTailname);
  45. OwnedIFile output = createIFile(tempFilename);
  46. try
  47. {
  48. output->rename(newTailname);
  49. }
  50. catch (IException * e)
  51. {
  52. EXCLOG(e, "Failed to rename target file");
  53. StringBuffer oldName;
  54. realname.getPath(oldName);
  55. LOG(MCdebugInfoDetail, unknownJob, "Error: Rename %s->%s failed - tring to delete target and rename again", oldName.str(), newTailname.str());
  56. e->Release();
  57. OwnedIFile old = createIFile(realname);
  58. old->remove();
  59. output->rename(newTailname);
  60. }
  61. }
  62. //----------------------------------------------------------------------------
  63. PartitionPoint::PartitionPoint()
  64. {
  65. clear();
  66. }
  67. PartitionPoint::PartitionPoint(unsigned _whichInput, unsigned _whichOutput, offset_t _inputOffset, offset_t _inputLength, offset_t _outputLength)
  68. {
  69. clear();
  70. whichInput = _whichInput;
  71. whichOutput = _whichOutput;
  72. inputOffset = _inputOffset;
  73. inputLength = _inputLength;
  74. outputLength = _outputLength;
  75. #ifdef DEBUG
  76. display();
  77. #endif
  78. }
  79. void PartitionPoint::clear()
  80. {
  81. whichInput = 0;
  82. whichOutput = 0;
  83. inputOffset = 0;
  84. inputLength = 0;
  85. outputLength = 0;
  86. outputOffset = 0;
  87. whichSlave = (unsigned)-1;
  88. }
  89. void PartitionPoint::deserialize(MemoryBuffer & in)
  90. {
  91. inputName.deserialize(in);
  92. outputName.deserialize(in);
  93. in.read(whichInput);
  94. in.read(inputOffset);
  95. in.read(inputLength);
  96. in.read(whichOutput);
  97. in.read(outputOffset);
  98. in.read(outputLength);
  99. in.read(whichSlave);
  100. modifiedTime.deserialize(in);
  101. ::deserialize(in, fixedText);
  102. }
  103. void PartitionPoint::display()
  104. {
  105. StringBuffer fulli, fullo;
  106. LOG(MCdebugInfoDetail, unknownJob,
  107. "Partition %s{%d}[%" I64F "d size %" I64F "d]->%s{%d}[%" I64F "d size %" I64F "d]",
  108. inputName.getPath(fulli).str(), whichInput, inputOffset, inputLength,
  109. outputName.getPath(fullo).str(), whichOutput, outputOffset, outputLength);
  110. }
  111. void PartitionPoint::restore(IPropertyTree * tree)
  112. {
  113. StringBuffer fullname;
  114. whichInput = tree->getPropInt(ANinput);
  115. inputOffset = tree->getPropInt64(ANinputOffset);
  116. inputLength = tree->getPropInt64(ANinputLength);
  117. whichOutput = tree->getPropInt(ANoutput);
  118. outputOffset = tree->getPropInt64(ANoutputOffset);
  119. outputLength = tree->getPropInt64(ANoutputLength);
  120. setCanAccessDirectly(inputName,tree->getPropInt(ANinputDirect) != 0);
  121. setCanAccessDirectly(outputName,tree->getPropInt(ANoutputDirect) != 0);
  122. }
  123. void PartitionPoint::serialize(MemoryBuffer & out)
  124. {
  125. inputName.serialize(out);
  126. outputName.serialize(out);
  127. out.append(whichInput);
  128. out.append(inputOffset);
  129. out.append(inputLength);
  130. out.append(whichOutput);
  131. out.append(outputOffset);
  132. out.append(outputLength);
  133. out.append(whichSlave);
  134. modifiedTime.serialize(out);
  135. ::serialize(out, fixedText);
  136. }
  137. void PartitionPoint::save(IPropertyTree * tree)
  138. {
  139. tree->setPropInt(ANinput, whichInput);
  140. tree->setPropInt64(ANinputOffset, inputOffset);
  141. tree->setPropInt64(ANinputLength, inputLength);
  142. tree->setPropInt(ANoutput, whichOutput);
  143. tree->setPropInt64(ANoutputOffset, outputOffset);
  144. tree->setPropInt64(ANoutputLength, outputLength);
  145. tree->setPropInt(ANinputDirect, canAccessDirectly(inputName));
  146. tree->setPropInt(ANoutputDirect, canAccessDirectly(outputName));
  147. }
  148. //---------------------------------------------------------------------------
  149. const char * FFTtext[FFTlast] = {
  150. "unknown",
  151. "fixed", "variable", "blocked",
  152. "csv",
  153. "utf",
  154. "utf-8", "utf-8n",
  155. "utf-16", "utf-16be", "utf-16le",
  156. "utf-32", "utf-32be", "utf-32le",
  157. "recfm-vb", "recfm-v", "variablebigendian"
  158. };
  159. void FileFormat::deserialize(MemoryBuffer & in)
  160. {
  161. byte tempType;
  162. in.read(tempType); type = (FileFormatType)tempType;
  163. switch (type)
  164. {
  165. case FFTfixed:
  166. case FFTblocked:
  167. in.read(recordSize);
  168. break;
  169. case FFTcsv:
  170. case FFTutf:
  171. case FFTutf8: case FFTutf8n:
  172. case FFTutf16: case FFTutf16be: case FFTutf16le:
  173. case FFTutf32: case FFTutf32be: case FFTutf32le:
  174. in.read(maxRecordSize);
  175. ::deserialize(in, separate);
  176. ::deserialize(in, quote);
  177. ::deserialize(in, terminate);
  178. ::deserialize(in, rowTag);
  179. updateMarkupType(rowTag, NULL); //neither kind nor markup currently serialized. may add later
  180. break;
  181. }
  182. }
  183. void FileFormat::deserializeExtra(MemoryBuffer & in, unsigned version)
  184. {
  185. switch (type)
  186. {
  187. case FFTcsv:
  188. case FFTutf:
  189. case FFTutf8: case FFTutf8n:
  190. case FFTutf16: case FFTutf16be: case FFTutf16le:
  191. case FFTutf32: case FFTutf32be: case FFTutf32le:
  192. if (version == 1)
  193. ::deserialize(in, escape);
  194. break;
  195. }
  196. }
  197. unsigned FileFormat::getUnitSize() const
  198. {
  199. switch (type)
  200. {
  201. case FFTfixed:
  202. return recordSize;
  203. case FFTblocked:
  204. return EFX_BLOCK_SIZE;
  205. case FFTvariable:
  206. case FFTvariablebigendian:
  207. case FFTcsv:
  208. case FFTutf:
  209. case FFTutf8: case FFTutf8n:
  210. return 1;
  211. case FFTutf16: case FFTutf16be: case FFTutf16le:
  212. return 2;
  213. case FFTutf32: case FFTutf32be: case FFTutf32le:
  214. return 4;
  215. }
  216. return 1;
  217. }
  218. bool rowLocationIsPath(const char *rowLocator)
  219. {
  220. if (rowLocator && *rowLocator == '/')
  221. return true;
  222. return false;
  223. }
  224. void FileFormat::updateMarkupType(const char *rowLocator, const char *kind)
  225. {
  226. if (kind)
  227. {
  228. if (strieq(kind, "xml"))
  229. markup = FMTxml;
  230. else if (strieq(kind, "json"))
  231. markup = FMTjson;
  232. else
  233. markup = FMTunknown;
  234. }
  235. else if (rowLocator)
  236. {
  237. if (rowLocationIsPath(rowLocator))
  238. markup = FMTjson;
  239. else
  240. markup = FMTxml;
  241. }
  242. else
  243. markup = FMTunknown;
  244. }
  245. bool FileFormat::restore(IPropertyTree * props)
  246. {
  247. StringBuffer formatText;
  248. props->getProp(FPformat, formatText);
  249. const char * format = formatText.str();
  250. if (stricmp(format, "blocked")==0)
  251. type = FFTblocked;
  252. else if (stricmp(format, "variable")==0)
  253. type = FFTvariable;
  254. else if (stricmp(format, "variablebigendian")==0)
  255. type = FFTvariablebigendian;
  256. else if (stricmp(format, "csv")==0)
  257. {
  258. type = FFTcsv;
  259. maxRecordSize = props->getPropInt(FPmaxRecordSize, DEFAULT_MAX_CSV_SIZE);
  260. separate.set(props->queryProp(FPcsvSeparate));
  261. quote.set(props->queryProp(FPcsvQuote));
  262. terminate.set(props->queryProp(FPcsvTerminate));
  263. if (props->hasProp(FPcsvEscape))
  264. escape.set(props->queryProp(FPcsvEscape));
  265. if (maxRecordSize == 0)
  266. throwError(DFTERR_MaxRecordSizeZero);
  267. }
  268. else if (memicmp(format, "utf", 3) == 0)
  269. {
  270. type = FFTutf;
  271. const char * tail = format + 3;
  272. if (*tail == '-')
  273. tail++;
  274. if (stricmp(tail, "8")==0)
  275. type = FFTutf8;
  276. else if (stricmp(tail, "8N")==0)
  277. type = FFTutf8n;
  278. else if (stricmp(tail, "16")==0)
  279. type = FFTutf16;
  280. else if (stricmp(tail, "16BE")==0)
  281. type = FFTutf16be;
  282. else if (stricmp(tail, "16LE")==0)
  283. type = FFTutf16le;
  284. else if (stricmp(tail, "32")==0)
  285. type = FFTutf32;
  286. else if (stricmp(tail, "32BE")==0)
  287. type = FFTutf32be;
  288. else if (stricmp(tail, "32LE")==0)
  289. type = FFTutf32le;
  290. else if (*tail)
  291. throwError1(DFTERR_UnknownUTFFormat, format);
  292. maxRecordSize = props->getPropInt(FPmaxRecordSize, DEFAULT_MAX_CSV_SIZE);
  293. separate.set(props->queryProp(FPcsvSeparate));
  294. quote.set(props->queryProp(FPcsvQuote));
  295. terminate.set(props->queryProp(FPcsvTerminate));
  296. if (props->hasProp(FPcsvEscape))
  297. escape.set(props->queryProp(FPcsvEscape));
  298. rowTag.set(props->queryProp(FProwTag));
  299. if (maxRecordSize == 0)
  300. throwError(DFTERR_MaxRecordSizeZero);
  301. updateMarkupType(rowTag, props->queryProp(FPkind));
  302. headerLength = (unsigned)props->getPropInt(FPheaderLength, -1);
  303. footerLength = (unsigned)props->getPropInt(FPfooterLength, -1);
  304. }
  305. else if ((stricmp(format, "recfmvb")==0)||(stricmp(format, "recfm-vb")==0))
  306. type = FFTrecfmvb;
  307. else if ((stricmp(format, "recfmv")==0)||(stricmp(format, "recfm-v")==0))
  308. type = FFTrecfmv;
  309. else if (props->hasProp(FPrecordSize))
  310. {
  311. type = FFTfixed;
  312. recordSize = props->getPropInt(FPrecordSize);
  313. }
  314. else
  315. return false;
  316. return true;
  317. }
  318. void FileFormat::save(IPropertyTree * props)
  319. {
  320. switch (type)
  321. {
  322. case FFTfixed:
  323. props->setPropInt(FPrecordSize, recordSize);
  324. props->setProp(FPkind, FFTtext[type]);
  325. break;
  326. case FFTblocked:
  327. props->setProp(FPformat, "blocked");
  328. props->setProp(FPkind, FFTtext[type]);
  329. break;
  330. case FFTvariable:
  331. props->setProp(FPformat, "variable");
  332. props->setProp(FPkind, FFTtext[type]);
  333. break;
  334. case FFTvariablebigendian:
  335. props->setProp(FPformat, "variablebigendian");
  336. props->setProp(FPkind, FFTtext[FFTvariable]);
  337. break;
  338. case FFTcsv:
  339. case FFTutf:
  340. case FFTutf8: case FFTutf8n:
  341. case FFTutf16: case FFTutf16be: case FFTutf16le:
  342. case FFTutf32: case FFTutf32be: case FFTutf32le:
  343. props->setProp(FPformat, FFTtext[type]);
  344. if (maxRecordSize) props->setPropInt(FPmaxRecordSize, maxRecordSize);
  345. if (separate) props->setProp(FPcsvSeparate, separate);
  346. if (quote) props->setProp(FPcsvQuote, quote);
  347. if (terminate) props->setProp(FPcsvTerminate, terminate);
  348. if (escape) props->setProp(FPcsvEscape, escape);
  349. if (rowTag) props->setProp(FProwTag, rowTag);
  350. if (markup != FMTunknown)
  351. props->setProp(FPkind, (markup==FMTjson) ? "json" : "xml");
  352. else
  353. props->setProp(FPkind, FFTtext[FFTcsv]);
  354. if (headerLength!=(unsigned)-1)
  355. props->setPropInt(FPheaderLength, headerLength);
  356. if (footerLength!=(unsigned)-1)
  357. props->setPropInt(FPfooterLength, footerLength);
  358. break;
  359. case FFTrecfmvb:
  360. case FFTrecfmv:
  361. props->setProp(FPformat, FFTtext[type]);
  362. props->setProp(FPkind, FFTtext[FFTrecfmv]);
  363. break;
  364. default:
  365. PROGLOG("unknown type %d",(int)type);
  366. throwError(DFTERR_UnknownFormatType);
  367. }
  368. }
  369. void FileFormat::serialize(MemoryBuffer & out) const
  370. {
  371. out.append((byte)type);
  372. switch (type)
  373. {
  374. case FFTfixed:
  375. case FFTblocked:
  376. out.append(recordSize);
  377. break;
  378. case FFTcsv:
  379. case FFTutf:
  380. case FFTutf8: case FFTutf8n:
  381. case FFTutf16: case FFTutf16be: case FFTutf16le:
  382. case FFTutf32: case FFTutf32be: case FFTutf32le:
  383. out.append(maxRecordSize);
  384. ::serialize(out, separate);
  385. ::serialize(out, quote);
  386. ::serialize(out, terminate);
  387. ::serialize(out, rowTag);
  388. break;
  389. }
  390. }
  391. void FileFormat::serializeExtra(MemoryBuffer & out, unsigned version) const
  392. {
  393. switch (type)
  394. {
  395. case FFTcsv:
  396. case FFTutf:
  397. case FFTutf8: case FFTutf8n:
  398. case FFTutf16: case FFTutf16be: case FFTutf16le:
  399. case FFTutf32: case FFTutf32be: case FFTutf32le:
  400. if (version == 1)
  401. ::serialize(out, escape);
  402. break;
  403. }
  404. }
  405. void FileFormat::set(const FileFormat & src)
  406. {
  407. type = src.type;
  408. recordSize = src.recordSize;
  409. maxRecordSize = src.maxRecordSize;
  410. separate.set(src.separate);
  411. quote.set(src.quote);
  412. terminate.set(src.terminate);
  413. escape.set(src.escape);
  414. rowTag.set(src.rowTag);
  415. quotedTerminator = src.quotedTerminator;
  416. markup = src.markup;
  417. headerLength = src.headerLength;
  418. footerLength = src.footerLength;
  419. }
  420. UtfReader::UtfFormat getUtfFormatType(FileFormatType type)
  421. {
  422. switch (type)
  423. {
  424. case FFTutf: return UtfReader::Utf8;
  425. case FFTutf8: return UtfReader::Utf8;
  426. case FFTutf8n: return UtfReader::Utf8;
  427. case FFTutf16: return UtfReader::Utf16be;
  428. case FFTutf16be:return UtfReader::Utf16be;
  429. case FFTutf16le:return UtfReader::Utf16le;
  430. case FFTutf32: return UtfReader::Utf32be;
  431. case FFTutf32be:return UtfReader::Utf32be;
  432. case FFTutf32le:return UtfReader::Utf32le;
  433. }
  434. return UtfReader::Utf8;
  435. }
  436. bool sameEncoding(const FileFormat & src, const FileFormat & tgt)
  437. {
  438. if (src.equals(tgt))
  439. return true;
  440. switch (src.type)
  441. {
  442. case FFTutf8n:
  443. return (tgt.type == FFTutf8);
  444. case FFTutf16be:
  445. return (tgt.type == FFTutf16);
  446. case FFTutf32be:
  447. return (tgt.type == FFTutf32);
  448. }
  449. return false;
  450. }
  451. //----------------------------------------------------------------------------
  452. const char * getHeaderText(FileFormatType type)
  453. {
  454. switch (type)
  455. {
  456. case FFTutf:
  457. case FFTutf8:
  458. return "\xEF\xBB\xBF";
  459. case FFTutf16:
  460. return "\xFE\xFF";
  461. case FFTutf32:
  462. return "\x00\x00\xFE\xFF";
  463. }
  464. return NULL;
  465. }
  466. unsigned getHeaderSize(FileFormatType type)
  467. {
  468. const char * headerText = getHeaderText(type);
  469. return headerText ? strlen(headerText) : 0;
  470. }
  471. //---------------------------------------------------------------------------
  472. OutputProgress::OutputProgress()
  473. {
  474. whichPartition = (unsigned)-1;
  475. hasInputCRC = false;
  476. reset();
  477. }
  478. void OutputProgress::reset()
  479. {
  480. status = StatusBegin;
  481. inputCRC = 0;
  482. inputLength = 0;
  483. outputCRC = 0;
  484. outputLength = 0;
  485. hasCompressed = false;
  486. compressedPartSize = 0;
  487. }
  488. MemoryBuffer & OutputProgress::deserializeCore(MemoryBuffer & in)
  489. {
  490. unsigned _inputCRC, _outputCRC;
  491. bool hasTime;
  492. in.read(status).read(whichPartition).read(hasInputCRC).read(_inputCRC).read(inputLength).read(_outputCRC).read(outputLength).read(hasTime);
  493. inputCRC = _inputCRC;
  494. outputCRC = _outputCRC;
  495. if (hasTime)
  496. resultTime.deserialize(in);
  497. else
  498. resultTime.clear();
  499. return in;
  500. }
  501. MemoryBuffer & OutputProgress::deserializeExtra(MemoryBuffer & in, unsigned version)
  502. {
  503. if (in.remaining())
  504. {
  505. switch (version)
  506. {
  507. case 1:
  508. in.read(hasCompressed);
  509. if (hasCompressed)
  510. in.read(compressedPartSize);
  511. break;
  512. }
  513. }
  514. return in;
  515. }
  516. static const char * const statusText[] = {"Init","Active","Copied","Renamed"};
  517. void OutputProgress::trace()
  518. {
  519. LOG(MCdebugInfoDetail, unknownJob, "Chunk %d status: %s input length: %" I64F "d[CRC:%x] -> output length:%" I64F "d[CRC:%x]", whichPartition, statusText[status], inputLength, inputCRC, outputLength, outputCRC);
  520. }
  521. MemoryBuffer & OutputProgress::serializeCore(MemoryBuffer & out)
  522. {
  523. bool hasTime = !resultTime.isNull();
  524. unsigned _inputCRC = inputCRC;
  525. unsigned _outputCRC = outputCRC;
  526. out.append(status).append(whichPartition).append(hasInputCRC).append(_inputCRC).append(inputLength).append(_outputCRC).append(outputLength).append(hasTime);
  527. if (hasTime)
  528. resultTime.serialize(out);
  529. return out;
  530. }
  531. MemoryBuffer & OutputProgress::serializeExtra(MemoryBuffer & out, unsigned version)
  532. {
  533. switch (version)
  534. {
  535. case 1:
  536. out.append(hasCompressed);
  537. if (hasCompressed )
  538. out.append(compressedPartSize);
  539. break;
  540. }
  541. return out;
  542. }
  543. void OutputProgress::set(const OutputProgress & other)
  544. {
  545. whichPartition = other.whichPartition;
  546. hasInputCRC = other.hasInputCRC;
  547. inputCRC = other.inputCRC;
  548. inputLength = other.inputLength;
  549. outputCRC = other.outputCRC;
  550. outputLength = other.outputLength;
  551. status = other.status;
  552. resultTime = other.resultTime;
  553. hasCompressed = other.hasCompressed;
  554. compressedPartSize = other.compressedPartSize;
  555. }
  556. void OutputProgress::restore(IPropertyTree * tree)
  557. {
  558. status = tree->getPropInt("@status");
  559. whichPartition = tree->getPropInt("@partition");
  560. hasInputCRC = tree->hasProp("@inputCRC");
  561. inputCRC = tree->getPropInt("@inputCRC", 0);
  562. inputLength = tree->getPropInt64("@inputLength");
  563. outputCRC = tree->getPropInt("@outputCRC");
  564. outputLength = tree->getPropInt64("@outputLength");
  565. resultTime.setString(tree->queryProp("@modified"));
  566. hasCompressed = tree->getPropBool("@compressed");
  567. compressedPartSize = tree->getPropInt64("@compressedPartSize");
  568. }
  569. void OutputProgress::save(IPropertyTree * tree)
  570. {
  571. tree->setPropInt("@status", status);
  572. tree->setPropInt("@partition", whichPartition);
  573. if (hasInputCRC)
  574. tree->setPropInt("@inputCRC", inputCRC);
  575. tree->setPropInt64("@inputLength", inputLength);
  576. tree->setPropInt("@outputCRC", outputCRC);
  577. tree->setPropInt64("@outputLength", outputLength);
  578. if (!resultTime.isNull())
  579. {
  580. StringBuffer timestr;
  581. tree->setProp("@modified", resultTime.getString(timestr));
  582. }
  583. tree->setPropInt("@compressed", hasCompressed);
  584. tree->setPropInt64("@compressedPartSize", compressedPartSize);
  585. }
  586. void displayProgress(OutputProgressArray & progress)
  587. {
  588. LOG(MCdebugInfoDetail, unknownJob, "Progress:");
  589. ForEachItemIn(idx, progress)
  590. progress.item(idx).trace();
  591. }
  592. //---------------------------------------------------------------------------
  593. void displayPartition(PartitionPointArray & partition)
  594. {
  595. LOG(MCdebugInfoDetail, unknownJob, "Partition:");
  596. ForEachItemIn(idx, partition)
  597. partition.item(idx).display();
  598. }
  599. void deserialize(PartitionPointArray & partition, MemoryBuffer & in)
  600. {
  601. unsigned count;
  602. in.read(count);
  603. for (unsigned idx = 0; idx < count; idx++)
  604. {
  605. PartitionPoint & next = * new PartitionPoint;
  606. next.deserialize(in);
  607. partition.append(next);
  608. }
  609. }
  610. void serialize(PartitionPointArray & partition, MemoryBuffer & out)
  611. {
  612. out.append(partition.ordinality());
  613. ForEachItemIn(idx, partition)
  614. partition.item(idx).serialize(out);
  615. }
  616. //---------------------------------------------------------------------------
  617. CrcIOStream::CrcIOStream(IFileIOStream * _stream, unsigned startCRC)
  618. {
  619. stream.set(_stream);
  620. crc = startCRC;
  621. }
  622. void CrcIOStream::flush()
  623. {
  624. }
  625. size32_t CrcIOStream::read(size32_t len, void * data)
  626. {
  627. size32_t got = stream->read(len, data);
  628. crc = crc32((const char *)data, got, crc);
  629. return got;
  630. }
  631. void CrcIOStream::seek(offset_t pos, IFSmode origin)
  632. {
  633. stream->seek(pos, origin);
  634. //MORE - no sensible thing to do on a seek....
  635. }
  636. offset_t CrcIOStream::size()
  637. {
  638. return stream->size();
  639. }
  640. offset_t CrcIOStream::tell()
  641. {
  642. return stream->tell();
  643. }
  644. size32_t CrcIOStream::write(size32_t len, const void * data)
  645. {
  646. size32_t written = stream->write(len, data);
  647. crc = crc32((const char *)data, written, crc);
  648. return written;
  649. }
  650. //---------------------------------------------------------------------------
  651. static int breakCount;
  652. bool daftAbortHandler()
  653. {
  654. LOG(MCprogress, unknownJob, "Aborting...");
  655. // hit ^C 3 times to really stop it...
  656. if (breakCount++ >= 2)
  657. {
  658. closeEnvironment();
  659. return true;
  660. }
  661. return false;
  662. }
  663. //---------------------------------------------------------------------------
  664. const char * queryFtSlaveExecutable(const IpAddress &ip, StringBuffer &ret)
  665. {
  666. StringBuffer dir; // not currently used
  667. return querySlaveExecutable("FTSlaveProcess", "ftslave", NULL, ip, ret, dir);
  668. }
  669. static StringAttr ftslavelogdir;
  670. const char * queryFtSlaveLogDir()
  671. {
  672. return ftslavelogdir.get();
  673. }
  674. void setFtSlaveLogDir(const char *dir)
  675. {
  676. PROGLOG("ftslave log dir set to %s",dir);
  677. ftslavelogdir.set(dir);
  678. }