fttransform.cpp 32 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "jliball.hpp"
  15. #include "platform.h"
  16. #include "jlib.hpp"
  17. #include "jio.hpp"
  18. #include "jmutex.hpp"
  19. #include "jfile.hpp"
  20. #include "jsocket.hpp"
  21. #include "fterror.hpp"
  22. #include "dadfs.hpp"
  23. #include "daftcfg.hpp"
  24. #include "daftmc.hpp"
  25. #include "rmtspawn.hpp"
  26. #include "fttransform.ipp"
  27. #include "ftbase.ipp"
  // When defined, createTransformer() tries the specialised fast-path transformers
  // before falling back to CGeneralTransformer.
  #define OPTIMIZE_COMMON_TRANSFORMS
  // A couple of options useful for debugging
  // gpfFrequency: when non-zero, progress is reported after every block and (on Windows)
  //               roughly one block in gpfFrequency deliberately crashes the process.
  const unsigned gpfFrequency = 0;
  // blockDelay: time in ms to sleep after each transferred block (0 = no delay).
  //             Written as plain 0 - the former "00000" was an octal literal.
  const unsigned blockDelay = 0;
  32. //----------------------------------------------------------------------------
  33. CTransformerBase::CTransformerBase()
  34. {
  35. startOffset = 0;
  36. maxOffset = 0;
  37. }
  38. void CTransformerBase::beginTransform(IFileIOStream * out)
  39. {
  40. }
  41. void CTransformerBase::endTransform(IFileIOStream * out)
  42. {
  43. }
  44. void CTransformerBase::setInputCRC(crc32_t _inputCRC)
  45. {
  46. }
  47. bool CTransformerBase::setPartition(RemoteFilename & remoteInputName, offset_t _startOffset, offset_t _length)
  48. {
  49. inputFile.setown(createIFile(remoteInputName));
  50. startOffset = _startOffset;
  51. maxOffset = _startOffset + _length;
  52. return true;
  53. }
  54. //----------------------------------------------------------------------------
  55. CTransformer::CTransformer(size32_t _bufferSize)
  56. {
  57. cursor = 0;
  58. bufferSize = _bufferSize;
  59. buffer = new byte[bufferSize];
  60. }
  61. CTransformer::~CTransformer()
  62. {
  63. delete [] buffer;
  64. }
  65. size32_t CTransformer::read(size32_t maxLength, void * buffer)
  66. {
  67. if (cursor + maxLength > maxOffset)
  68. maxLength = (size32_t)(maxOffset - cursor);
  69. size32_t got = input->read(cursor, maxLength, buffer);
  70. cursor += got;
  71. return got;
  72. }
  73. bool CTransformer::setPartition(RemoteFilename & remoteInputName, offset_t _startOffset, offset_t _length, bool compressedInput, const char *decryptKey)
  74. {
  75. CTransformerBase::setPartition(remoteInputName, _startOffset, _length);
  76. input.setown(inputFile->open(IFOread));
  77. if (compressedInput) {
  78. Owned<IExpander> expander;
  79. if (decryptKey&&*decryptKey) {
  80. StringBuffer key;
  81. decrypt(key,decryptKey);
  82. expander.setown(createAESExpander256(key.length(),key.str()));
  83. }
  84. input.setown(createCompressedFileReader(input,expander));
  85. }
  86. cursor = startOffset;
  87. return (input != NULL);
  88. }
  89. offset_t CTransformer::tell()
  90. {
  91. return cursor;
  92. }
  93. size32_t CTransformer::getBlock(IFileIOStream * out)
  94. {
  95. unsigned gotLength = getN(buffer, bufferSize);
  96. if (gotLength)
  97. out->write(gotLength, buffer);
  98. return gotLength;
  99. }
  100. //----------------------------------------------------------------------------
  101. CNullTransformer::CNullTransformer(size32_t buffersize) : CTransformer(buffersize)
  102. {
  103. doInputCRC = false;
  104. }
  105. size32_t CNullTransformer::getN(byte * buffer, size32_t maxLength)
  106. {
  107. unsigned num = read(maxLength, buffer);
  108. if (doInputCRC)
  109. inputCRC = crc32((const char *)buffer, num, inputCRC);
  110. return num;
  111. }
  112. void CNullTransformer::setInputCRC(crc32_t _inputCRC)
  113. {
  114. doInputCRC = true;
  115. inputCRC = _inputCRC;
  116. }
  117. //----------------------------------------------------------------------------
  118. CFixedToVarTransformer::CFixedToVarTransformer(size32_t _recordSize,size32_t buffersize, bool _bigendian)
  119. : CTransformer(buffersize)
  120. {
  121. recordSize = _recordSize;
  122. bigendian = _bigendian;
  123. assertex(!bigendian); // TBD Var BE
  124. }
  125. //Coded slightly strangely, so that we avoid an extra memcpy() - except for the bytes
  126. //that don't quite fit in this block.
  127. size32_t CFixedToVarTransformer::getN(byte * buffer, size32_t maxLength)
  128. {
  129. //Read a block of fixed length records into memory, then add the variable length tags
  130. //by moving the data within the block.
  131. const size32_t targetRecordSize = recordSize + sizeof(varLenType);
  132. size32_t sizeToGet = (maxLength / targetRecordSize) * recordSize;
  133. size32_t sizeGot = read(sizeToGet, buffer);
  134. //Now add the varLenType
  135. unsigned numGot = sizeGot/recordSize;
  136. assertex(numGot*recordSize==sizeGot);
  137. for (unsigned cur=numGot;cur--!=0;)
  138. {
  139. byte * curSource = buffer + recordSize * cur;
  140. byte * curTarget = buffer + targetRecordSize * cur;
  141. memmove(curTarget + sizeof(varLenType), curSource, recordSize);
  142. _WINCPYREV(curTarget, &recordSize, sizeof(varLenType));
  143. }
  144. return numGot * targetRecordSize;
  145. }
  146. offset_t CFixedToVarTransformer::tell()
  147. {
  148. return cursor;
  149. }
  150. //---------------------------------------------------------------------------
  151. CVarToFixedTransformer::CVarToFixedTransformer(unsigned _recordSize,size32_t buffersize, bool _bigendian)
  152. : CTransformer(buffersize)
  153. {
  154. recordSize = _recordSize;
  155. savedSize = 0;
  156. savedBuffer = new byte[minBlockSize];
  157. bigendian = _bigendian;
  158. assertex(!bigendian); // TBD Var BE
  159. }
  160. CVarToFixedTransformer::~CVarToFixedTransformer()
  161. {
  162. delete [] savedBuffer;
  163. }
  164. //Read the variable length records into a temporary buffer, and then
  165. //copy across. Can't avoid the memcpy, or do it in place...
  166. size32_t CVarToFixedTransformer::getN(byte * buffer, size32_t maxLength)
  167. {
  168. //Fill the savedBuffer
  169. savedSize += read(minBlockSize-savedSize, savedBuffer+savedSize);
  170. //Walk through the records copying them to the destination
  171. assertex(sizeof(varLenType) == 4);
  172. byte * good = savedBuffer;
  173. byte * lastGood = savedBuffer + savedSize;
  174. byte * target = buffer;
  175. while (good + sizeof(varLenType) <= lastGood)
  176. {
  177. varLenType nextLen;
  178. _WINCPYREV4(&nextLen, good);
  179. if (good + sizeof(varLenType) + nextLen > lastGood)
  180. break;
  181. if (target + recordSize > buffer + maxLength)
  182. break;
  183. if (nextLen < recordSize)
  184. {
  185. memcpy(target, good + sizeof(varLenType), nextLen);
  186. memset(target+nextLen, 0, recordSize-nextLen);
  187. }
  188. else
  189. memcpy(target, good + sizeof(varLenType), recordSize);
  190. good += sizeof(varLenType) + nextLen;
  191. target += recordSize;
  192. }
  193. //Finally shift the extra records down - if there are any
  194. unsigned numUsed = good - savedBuffer;
  195. memmove(savedBuffer, good, savedSize-numUsed);
  196. savedSize -= numUsed;
  197. return target-buffer;
  198. }
  199. offset_t CVarToFixedTransformer::tell()
  200. {
  201. return cursor - savedSize;
  202. }
  203. //----------------------------------------------------------------------------
  204. CBlockToVarTransformer::CBlockToVarTransformer(bool _bigendian)
  205. : CTransformer(EFX_BLOCK_SIZE)
  206. {
  207. assertex(sizeof(blockLenType) == 4);
  208. bigendian = _bigendian;
  209. assertex(!bigendian); // TBD Var BE
  210. nextBlockSize = 0;
  211. }
  212. //Coded to read the length of the next block with the previous block of data.
  213. //assumes the padding is pretty small in the record, so don't try and skip it.
  214. size32_t CBlockToVarTransformer::getN(byte * buffer, size32_t maxLength)
  215. {
  216. assertex(maxLength >= EFX_BLOCK_SIZE);
  217. size32_t blockSize = nextBlockSize;
  218. if (!blockSize)
  219. {
  220. blockLenType temp;
  221. size32_t hdrLen = read(sizeof(temp), &temp);
  222. if (hdrLen == 0)
  223. return 0;
  224. assertex(hdrLen == sizeof(blockLenType));
  225. _WINCPYREV4(&blockSize, &temp);
  226. }
  227. size32_t sizeGot = read(EFX_BLOCK_SIZE, buffer);
  228. if (sizeGot == EFX_BLOCK_SIZE)
  229. _WINCPYREV4(&nextBlockSize, buffer+EFX_BLOCK_SIZE- sizeof(blockLenType));
  230. else
  231. nextBlockSize = 0;
  232. return blockSize;
  233. }
  234. offset_t CBlockToVarTransformer::tell()
  235. {
  236. if (nextBlockSize)
  237. return cursor - sizeof(nextBlockSize);
  238. return cursor;
  239. }
  240. //----------------------------------------------------------------------------
  241. CVarToBlockTransformer::CVarToBlockTransformer(bool _bigendian)
  242. : CTransformer(EFX_BLOCK_SIZE)
  243. {
  244. savedSize = 0;
  245. bigendian = _bigendian;
  246. assertex(!bigendian); // TBD Var BE
  247. savedBuffer = new byte[EFX_BLOCK_SIZE];
  248. }
  249. CVarToBlockTransformer::~CVarToBlockTransformer()
  250. {
  251. delete [] savedBuffer;
  252. }
  253. //Coded slightly strangely, so that we avoid an extra memcpy() - except for the bytes
  254. //that don't quite fit in this block.
  255. size32_t CVarToBlockTransformer::getN(byte * buffer, size32_t maxLength)
  256. {
  257. size32_t sizeGot = 0;
  258. byte * startData = buffer + sizeof(blockLenType);
  259. const unsigned maxDataLength = maxLength - sizeof(blockLenType);
  260. if (savedSize)
  261. {
  262. assertex(savedSize <= maxDataLength);
  263. size32_t copyLen = savedSize;
  264. memcpy(startData, savedBuffer, copyLen);
  265. sizeGot += copyLen;
  266. savedSize -= copyLen;
  267. }
  268. if (maxDataLength != sizeGot)
  269. sizeGot += read(maxDataLength-sizeGot, startData + sizeGot);
  270. if (sizeGot == 0)
  271. return 0;
  272. //Now work out how many records we've copied.
  273. byte * good = startData;
  274. byte * lastGood = startData + sizeGot;
  275. while (good + sizeof(varLenType) < lastGood)
  276. {
  277. varLenType nextLen;
  278. _WINCPYREV(&nextLen, good, sizeof(nextLen));
  279. if (good + sizeof(varLenType) + nextLen > lastGood)
  280. break;
  281. good += sizeof(varLenType) + nextLen;
  282. }
  283. savedSize = sizeGot - (good - startData);
  284. assertex(savedSize < EFX_BLOCK_SIZE);
  285. memcpy(savedBuffer, good, savedSize);
  286. blockLenType blockSize = sizeGot-savedSize;
  287. _WINCPYREV(buffer, &blockSize, sizeof(blockSize));
  288. memset(good, 0, EFX_BLOCK_SIZE-blockSize-sizeof(blockSize));
  289. return EFX_BLOCK_SIZE;
  290. }
  291. offset_t CVarToBlockTransformer::tell()
  292. {
  293. return cursor - savedSize;
  294. }
  295. //----------------------------------------------------------------------------
  296. CGeneralTransformer::CGeneralTransformer(const FileFormat & srcFormat, const FileFormat & tgtFormat)
  297. {
  298. processor.setown(createFormatProcessor(srcFormat, tgtFormat, true));
  299. target.setown(createOutputProcessor(tgtFormat));
  300. processor->setTarget(target);
  301. }
  302. size32_t CGeneralTransformer::getBlock(IFileIOStream * out)
  303. {
  304. return processor->transformBlock(maxOffset, cursor);
  305. }
  306. bool CGeneralTransformer::getInputCRC(crc32_t & value)
  307. {
  308. value = processor->getInputCRC();
  309. return true;
  310. }
  311. void CGeneralTransformer::setInputCRC(crc32_t _inputCRC)
  312. {
  313. processor->setInputCRC(_inputCRC);
  314. }
  315. bool CGeneralTransformer::setPartition(RemoteFilename & remoteInputName, offset_t _startOffset, offset_t _length, bool compressedInput, const char *decryptKey)
  316. {
  317. CTransformerBase::setPartition(remoteInputName, _startOffset, _length);
  318. processor->setSource(0, remoteInputName, compressedInput, decryptKey);
  319. return inputFile->exists();
  320. }
  321. void CGeneralTransformer::beginTransform(IFileIOStream * out)
  322. {
  323. processor->beginTransform(startOffset, maxOffset-startOffset, cursor);
  324. target->setOutput(out->tell(), out);
  325. }
  326. void CGeneralTransformer::endTransform(IFileIOStream * out)
  327. {
  328. processor->endTransform(cursor);
  329. }
  330. offset_t CGeneralTransformer::tell()
  331. {
  332. return cursor.inputOffset;
  333. }
  334. //----------------------------------------------------------------------------
  335. ITransformer * createTransformer(const FileFormat & srcFormat, const FileFormat & tgtFormat, size32_t buffersize)
  336. {
  337. ITransformer * transformer = NULL;
  338. #ifdef OPTIMIZE_COMMON_TRANSFORMS
  339. if (srcFormat.equals(tgtFormat))
  340. transformer = new CNullTransformer(buffersize);
  341. else
  342. {
  343. switch (srcFormat.type)
  344. {
  345. case FFTfixed:
  346. switch (tgtFormat.type)
  347. {
  348. case FFTvariable:
  349. case FFTvariablebigendian:
  350. transformer = new CFixedToVarTransformer(srcFormat.recordSize,buffersize,(tgtFormat.type==FFTvariablebigendian));
  351. break;
  352. }
  353. break;
  354. case FFTvariable:
  355. case FFTvariablebigendian:
  356. switch (tgtFormat.type)
  357. {
  358. case FFTfixed:
  359. transformer = new CVarToFixedTransformer(tgtFormat.recordSize,buffersize,(srcFormat.type==FFTvariablebigendian));
  360. break;
  361. case FFTblocked:
  362. transformer = new CVarToBlockTransformer((srcFormat.type==FFTvariablebigendian));
  363. break;
  364. }
  365. break;
  366. case FFTblocked:
  367. switch (tgtFormat.type)
  368. {
  369. case FFTvariable:
  370. case FFTvariablebigendian:
  371. transformer = new CBlockToVarTransformer((tgtFormat.type==FFTvariablebigendian));
  372. break;
  373. }
  374. break;
  375. case FFTutf8: case FFTutf8n:
  376. switch (tgtFormat.type)
  377. {
  378. case FFTutf8n:
  379. case FFTutf8:
  380. transformer = new CNullTransformer(buffersize);
  381. break;
  382. case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
  383. break;
  384. default:
  385. throwError(DFTERR_BadSrcTgtCombination);
  386. }
  387. break;
  388. case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
  389. switch (tgtFormat.type)
  390. {
  391. case FFTutf8: case FFTutf8n: case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
  392. break;
  393. default:
  394. throwError(DFTERR_BadSrcTgtCombination);
  395. }
  396. break;
  397. }
  398. }
  399. #endif
  400. if (!transformer)
  401. transformer = new CGeneralTransformer(srcFormat, tgtFormat);
  402. // throwError(DFTERR_BadSrcTgtCombination);
  403. return transformer;
  404. }
  405. //----------------------------------------------------------------------------
  406. TransferServer::TransferServer(ISocket * _masterSocket)
  407. {
  408. masterSocket = _masterSocket;
  409. lastTick = msTick();
  410. updateFrequency = (unsigned int) -1;
  411. throttleNicSpeed = 0;
  412. compressedInput = false;
  413. compressOutput = false;
  414. transferBufferSize = DEFAULT_STD_BUFFER_SIZE;
  415. }
  416. void TransferServer::sendProgress(OutputProgress & curProgress)
  417. {
  418. MemoryBuffer msg;
  419. msg.setEndian(__BIG_ENDIAN);
  420. curProgress.serialize(msg.clear().append(false));
  421. if (!catchWriteBuffer(masterSocket, msg))
  422. throwError(RFSERR_TimeoutWaitMaster);
  423. checkForRemoteAbort(masterSocket);
  424. }
  425. void TransferServer::appendTransformed(unsigned chunkIndex, ITransformer * input)
  426. {
  427. OutputProgress & curProgress = progress.item(chunkIndex);
  428. PartitionPoint & curPartition = partition.item(chunkIndex);
  429. input->beginTransform(out);
  430. const offset_t startInputOffset = curPartition.inputOffset;
  431. const offset_t startOutputOffset = curPartition.outputOffset;
  432. loop
  433. {
  434. unsigned gotLength = input->getBlock(out);
  435. totalLengthRead += gotLength;
  436. if (gpfFrequency || !gotLength || ((unsigned)(msTick() - lastTick)) > updateFrequency)
  437. {
  438. out->flush();
  439. lastTick = msTick();
  440. offset_t outputOffset = out->tell();
  441. offset_t inputOffset = input->tell();
  442. if (totalLengthToRead)
  443. LOG(MCdebugProgress, unknownJob, "Progress: %d%% done.", (unsigned)(totalLengthRead*100/totalLengthToRead));
  444. curProgress.status = (gotLength == 0) ? OutputProgress::StatusCopied : OutputProgress::StatusActive;
  445. curProgress.inputLength = input->tell()-startInputOffset;
  446. curProgress.outputLength = out->tell()-startOutputOffset;
  447. if (crcOut)
  448. curProgress.outputCRC = crcOut->getCRC();
  449. if (calcInputCRC)
  450. curProgress.hasInputCRC = input->getInputCRC(curProgress.inputCRC);
  451. sendProgress(curProgress);
  452. }
  453. if (!gotLength)
  454. break;
  455. if (blockDelay)
  456. MilliSleep(blockDelay);
  457. else if (throttleNicSpeed)
  458. {
  459. unsigned delay = (unsigned)(((unsigned __int64)gotLength*10*1000*(numParallelSlaves-1))/(throttleNicSpeed * I64C(0x100000)));
  460. if (delay)
  461. MilliSleep(delay*(getRandom()%100)/50);
  462. }
  463. #ifdef _WIN32
  464. if (gpfFrequency && ((rand() % gpfFrequency) == 0))
  465. {
  466. LOG(MCdebugInfo, unknownJob, "About to crash....");
  467. *(char *)0 = 0;
  468. }
  469. #endif
  470. }
  471. input->endTransform(out);
  472. }
  473. void TransferServer::deserializeAction(MemoryBuffer & msg, unsigned action)
  474. {
  475. SocketEndpoint ep;
  476. ep.deserialize(msg);
  477. if (!ep.isHost())
  478. assertex(!"Command transferred to the wrong computer!!!");
  479. srcFormat.deserialize(msg);
  480. tgtFormat.deserialize(msg);
  481. msg.read(calcInputCRC);
  482. msg.read(calcOutputCRC);
  483. deserialize(partition, msg);
  484. msg.read(numParallelSlaves);
  485. msg.read(updateFrequency);
  486. msg.read(replicate);
  487. msg.read(mirror);
  488. msg.read(isSafeMode);
  489. srand((unsigned)get_cycles_now());
  490. int adjust = (rand() * rand() * rand()) % updateFrequency - (updateFrequency/2);
  491. lastTick = msTick() + adjust;
  492. StringBuffer localFilename;
  493. if (action == FTactionpull)
  494. {
  495. partition.item(0).outputName.getPath(localFilename);
  496. LOG(MCdebugProgress, unknownJob, "Process Pull Command: %s", localFilename.str());
  497. }
  498. else
  499. {
  500. partition.item(0).inputName.getPath(localFilename);
  501. LOG(MCdebugProgress, unknownJob, "Process Push Command: %s", localFilename.str());
  502. }
  503. LOG(MCdebugProgress, unknownJob, "Num Parallel Slaves=%d Adjust=%d/%d", numParallelSlaves, adjust, updateFrequency);
  504. LOG(MCdebugProgress, unknownJob, "replicate(%d) mirror(%d) safe(%d) incrc(%d) outcrc(%d)", replicate, mirror, isSafeMode, calcInputCRC, calcOutputCRC);
  505. displayPartition(partition);
  506. unsigned numProgress;
  507. msg.read(numProgress);
  508. for (unsigned i = 0; i < numProgress; i++)
  509. {
  510. OutputProgress & next = *new OutputProgress;
  511. next.deserialize(msg);
  512. progress.append(next);
  513. }
  514. if (msg.remaining())
  515. msg.read(throttleNicSpeed);
  516. if (msg.remaining())
  517. msg.read(compressedInput).read(compressOutput);
  518. if (msg.remaining())
  519. msg.read(copyCompressed);
  520. if (msg.remaining())
  521. msg.read(transferBufferSize);
  522. if (msg.remaining())
  523. msg.read(encryptKey).read(decryptKey);
  524. LOG(MCdebugProgress, unknownJob, "throttle(%d), transferBufferSize(%d)", throttleNicSpeed, transferBufferSize);
  525. PROGLOG("compressedInput(%d), compressedOutput(%d), copyCompressed(%d)", compressedInput?1:0, compressOutput?1:0, copyCompressed?1:0);
  526. PROGLOG("encrypt(%d), decrypt(%d)", encryptKey.isEmpty()?0:1, decryptKey.isEmpty()?0:1);
  527. //---Finished deserializing ---
  528. displayProgress(progress);
  529. totalLengthRead = 0;
  530. totalLengthToRead = 0;
  531. ForEachItemIn(idx, partition)
  532. totalLengthToRead += partition.item(idx).inputLength;
  533. }
  534. unsigned TransferServer::queryLastOutput(unsigned outputIndex)
  535. {
  536. ForEachItemInRev(idx, partition)
  537. if (partition.item(idx).whichOutput == outputIndex)
  538. return idx;
  539. return (unsigned int) -1;
  540. }
  541. void TransferServer::transferChunk(unsigned chunkIndex)
  542. {
  543. PartitionPoint & curPartition = partition.item(chunkIndex);
  544. OutputProgress & curProgress = progress.item(chunkIndex);
  545. LOG(MCdebugProgress, unknownJob, "Begin to transfer chunk %d: Start at length %"I64F"d", chunkIndex, curProgress.inputLength);
  546. const unsigned __int64 startOutOffset = out->tell();
  547. if (startOutOffset != curPartition.outputOffset+curProgress.outputLength)
  548. throwError4(DFTERR_OutputOffsetMismatch, out->tell(), curPartition.outputOffset+curProgress.outputLength, "start", chunkIndex);
  549. unsigned fixedTextLength = curPartition.fixedText.length();
  550. if (fixedTextLength || curPartition.inputName.isNull())
  551. {
  552. out->write(fixedTextLength, curPartition.fixedText.get());
  553. curProgress.status = OutputProgress::StatusCopied;
  554. curProgress.inputLength = fixedTextLength;
  555. curProgress.outputLength = fixedTextLength;
  556. if (crcOut)
  557. curProgress.outputCRC = crcOut->getCRC();
  558. sendProgress(curProgress);
  559. }
  560. else
  561. {
  562. Owned<ITransformer> transformer = createTransformer(srcFormat, tgtFormat, transferBufferSize);
  563. if (!transformer->setPartition(curPartition.inputName,
  564. curPartition.inputOffset+curProgress.inputLength,
  565. curPartition.inputLength-curProgress.inputLength,
  566. compressedInput,
  567. decryptKey))
  568. {
  569. StringBuffer temp;
  570. throwError1(DFTERR_CouldNotOpenFile, curPartition.inputName.getRemotePath(temp).str());
  571. }
  572. if (calcInputCRC)
  573. transformer->setInputCRC(curProgress.inputCRC);
  574. appendTransformed(chunkIndex, transformer);
  575. }
  576. assertex(out->tell() == curPartition.outputOffset + curProgress.outputLength);
  577. if (copyCompressed)
  578. {
  579. //Now the copy of this chunk is complete, update the progress with the full expected length.
  580. //Don't do it before otherwise recovery won't work very well.
  581. curProgress.outputLength = curPartition.outputLength;
  582. }
  583. else
  584. {
  585. if (curPartition.outputLength && (curProgress.outputLength != curPartition.outputLength))
  586. throwError4(DFTERR_OutputOffsetMismatch, out->tell(), curPartition.outputOffset+curPartition.outputLength, "end", chunkIndex);
  587. }
  588. }
  589. bool TransferServer::pull()
  590. {
  591. unsigned curOutput = (unsigned)-1;
  592. unsigned start;
  593. unsigned __int64 curOutputOffset = 0;
  594. //loop through all partitions - inner loop does a file at a time.
  595. unsigned numPartitions = partition.ordinality();
  596. for (start = 0; start < numPartitions; )
  597. {
  598. PartitionPoint & startPartition = partition.item(start);
  599. OutputProgress & startProgress = progress.item(start);
  600. if (startProgress.status == OutputProgress::StatusBegin)
  601. break;
  602. assertex(!compressOutput);
  603. RemoteFilename localTempFilename;
  604. getDfuTempName(localTempFilename, startPartition.outputName);
  605. OwnedIFile outFile = createIFile(localTempFilename);
  606. OwnedIFileIO outio = outFile->openShared(IFOwrite,IFSHnone);
  607. unsigned __int64 size = outio ? outio->size() : 0;
  608. curOutput = startPartition.whichOutput;
  609. curOutputOffset = getHeaderSize(tgtFormat.type);
  610. unsigned i;
  611. for (i = start;i < numPartitions; i++)
  612. {
  613. PartitionPoint & curPartition = partition.item(i);
  614. OutputProgress & curProgress = progress.item(i);
  615. if (curOutput != curPartition.whichOutput)
  616. break;
  617. curPartition.outputOffset = curOutputOffset;
  618. unsigned __int64 progressOffset = curOutputOffset + curProgress.outputLength;
  619. if (progressOffset > size)
  620. {
  621. LOG(MCwarning, unknownJob, "Recovery information seems to be invalid (%"I64F"d %"I64F"d) start copying from the beginning",
  622. size, progressOffset);
  623. //reset any remaining partitions...
  624. for (i = start; i < numPartitions; i++)
  625. progress.item(i).reset();
  626. curOutput = (unsigned int) -1;
  627. goto processedProgress; // break out of both loops
  628. }
  629. assertex(curProgress.status != OutputProgress::StatusRenamed);
  630. if (curProgress.status != OutputProgress::StatusCopied)
  631. {
  632. out.setown(createIOStream(outio));
  633. out->seek(progressOffset, IFSbegin);
  634. wrapOutInCRC(curProgress.outputCRC);
  635. StringBuffer localFilename;
  636. localTempFilename.getPath(localFilename);
  637. LOG(MCdebugProgress, unknownJob, "Continue pulling to file: %s from recovery position", localFilename.str());
  638. start = i;
  639. goto processedProgress; // break out of both loops
  640. }
  641. curOutputOffset += curProgress.outputLength;
  642. }
  643. start = i;
  644. }
  645. processedProgress:
  646. //Delete any output files before generating the new ones.
  647. unsigned maxChunk = partition.ordinality();
  648. if (((start == 0) && !isSafeMode))
  649. {
  650. unsigned prevOutput = (unsigned int) -1;
  651. for (unsigned i = 0; i < maxChunk; i++)
  652. {
  653. PartitionPoint & curPartition = partition.item(i);
  654. if (curPartition.whichOutput != prevOutput)
  655. {
  656. OwnedIFile output = createIFile(curPartition.outputName);
  657. output->remove();
  658. prevOutput = curPartition.whichOutput;
  659. }
  660. }
  661. }
  662. for (unsigned idx=start; idx<maxChunk; idx++)
  663. {
  664. PartitionPoint & curPartition = partition.item(idx);
  665. OutputProgress & curProgress = progress.item(idx);
  666. //Either first non-recovery file, or the target file has changed....
  667. if (curOutput != curPartition.whichOutput)
  668. {
  669. curOutput = curPartition.whichOutput;
  670. if (curProgress.status == OutputProgress::StatusRenamed)
  671. {
  672. LOG(MCdebugProgress, unknownJob, "Renamed file found - must be CRC recovery");
  673. idx = queryLastOutput(curOutput);
  674. continue;
  675. }
  676. RemoteFilename localTempFilename;
  677. getDfuTempName(localTempFilename, curPartition.outputName);
  678. StringBuffer localFilename;
  679. localTempFilename.getPath(localFilename);
  680. recursiveCreateDirectoryForFile(localFilename);
  681. OwnedIFile outFile = createIFile(localFilename.str());
  682. OwnedIFileIO outio = outFile->openShared(IFOcreate,IFSHnone);
  683. if (!outio)
  684. throwError1(DFTERR_CouldNotCreateOutput, localFilename.str());
  685. if (compressOutput) {
  686. Owned<ICompressor> compressor;
  687. if (!encryptKey.isEmpty()) {
  688. StringBuffer key;
  689. decrypt(key,encryptKey);
  690. compressor.setown(createAESCompressor256(key.length(),key.str()));
  691. }
  692. outio.setown(createCompressedFileWriter(outio, 0, true, compressor));
  693. }
  694. LOG(MCdebugProgress, unknownJob, "Start pulling to file: %s", localFilename.str());
  695. //Find the last partition entry that refers to the same file.
  696. if (!compressOutput)
  697. {
  698. PartitionPoint & lastChunk = partition.item(queryLastOutput(curOutput));
  699. if (lastChunk.outputLength)
  700. {
  701. char null = 0;
  702. offset_t lastOffset = lastChunk.outputOffset+lastChunk.outputLength;
  703. outio->write(lastOffset-sizeof(null),sizeof(null),&null);
  704. LOG(MCdebugProgress, unknownJob, "Extend length of target file to %"I64F"d", lastOffset);
  705. }
  706. }
  707. out.setown(createIOStream(outio));
  708. out->seek(0, IFSbegin);
  709. wrapOutInCRC(0);
  710. unsigned headerLen = getHeaderSize(tgtFormat.type);
  711. if (headerLen)
  712. out->write(headerLen, getHeaderText(tgtFormat.type));
  713. curOutputOffset = headerLen;
  714. }
  715. else if (crcOut && (idx!=start))
  716. crcOut->setCRC(0);
  717. curPartition.outputOffset = curOutputOffset;
  718. transferChunk(idx);
  719. curOutputOffset += curProgress.outputLength;
  720. }
  721. crcOut.clear();
  722. out.clear();
  723. //Once the transfers have completed, rename the files, and sync file times
  724. //if replicating...
  725. if (!isSafeMode)
  726. {
  727. MemoryBuffer msg;
  728. unsigned prevOutput = (unsigned int) -1;
  729. for (unsigned i = 0; i < maxChunk; i++)
  730. {
  731. PartitionPoint & curPartition = partition.item(i);
  732. OutputProgress & curProgress = progress.item(i);
  733. if (curPartition.whichOutput != prevOutput)
  734. {
  735. if (curProgress.status != OutputProgress::StatusRenamed)
  736. {
  737. //rename the files..
  738. renameDfuTempToFinal(curPartition.outputName);
  739. OwnedIFile output = createIFile(curPartition.outputName);
  740. if (mirror || replicate)
  741. {
  742. OwnedIFile input = createIFile(curPartition.inputName);
  743. CDateTime modifiedTime;
  744. CDateTime createTime;
  745. if (input->getTime(&createTime, &modifiedTime, NULL))
  746. output->setTime(&createTime, &modifiedTime, NULL);
  747. }
  748. else if (!curPartition.modifiedTime.isNull())
  749. {
  750. OwnedIFile output = createIFile(curPartition.outputName);
  751. output->setTime(&curPartition.modifiedTime, &curPartition.modifiedTime, NULL);
  752. }
  753. else
  754. output->getTime(NULL, &curProgress.resultTime, NULL);
  755. //Notify the master that the file has been renamed - and send the modified time.
  756. msg.setEndian(__BIG_ENDIAN);
  757. curProgress.status = OutputProgress::StatusRenamed;
  758. curProgress.serialize(msg.clear().append(false));
  759. if (!catchWriteBuffer(masterSocket, msg))
  760. throwError(RFSERR_TimeoutWaitMaster);
  761. }
  762. prevOutput = curPartition.whichOutput;
  763. }
  764. }
  765. }
  766. return true;
  767. }
  768. bool TransferServer::push()
  769. {
  770. //May be multiple sources files, and may not read all the chunks from the source file so opened each time..
  771. //Slightly inefficent, but not significant because it is likely to be local
  772. unsigned maxChunk = partition.ordinality();
  773. for (unsigned idx=0;idx<maxChunk;idx++)
  774. {
  775. PartitionPoint & curPartition = partition.item(idx);
  776. OutputProgress & curProgress = progress.item(idx);
  777. if (curProgress.status != OutputProgress::StatusCopied)
  778. {
  779. RemoteFilename outFilename;
  780. getDfuTempName(outFilename, curPartition.outputName);
  781. OwnedIFile output = createIFile(outFilename);
  782. OwnedIFileIO outio = output->openShared(compressOutput?IFOreadwrite:IFOwrite,IFSHfull);
  783. if (!outio)
  784. {
  785. StringBuffer outputPath;
  786. outFilename.getRemotePath(outputPath);
  787. throwError1(DFTERR_CouldNotCreateOutput, outputPath.str());
  788. }
  789. if (compressOutput) {
  790. Owned<ICompressor> compressor;
  791. if (!encryptKey.isEmpty()) {
  792. StringBuffer key;
  793. decrypt(key,encryptKey);
  794. compressor.setown(createAESCompressor256(key.length(),key.str()));
  795. }
  796. outio.setown(createCompressedFileWriter(outio, 0, true, compressor));
  797. }
  798. out.setown(createIOStream(outio));
  799. if (!compressOutput)
  800. out->seek(curPartition.outputOffset + curProgress.outputLength, IFSbegin);
  801. wrapOutInCRC(curProgress.outputCRC);
  802. transferChunk(idx);
  803. crcOut.clear();
  804. out.clear();
  805. }
  806. }
  807. return true;
  808. }
  809. void TransferServer::wrapOutInCRC(unsigned startCRC)
  810. {
  811. if (calcOutputCRC)
  812. {
  813. crcOut.setown(new CrcIOStream(out, startCRC));
  814. out.set(crcOut);
  815. }
  816. }