fttransform.cpp 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jliball.hpp"
  14. #include "platform.h"
  15. #include "jlib.hpp"
  16. #include "jio.hpp"
  17. #include "jmutex.hpp"
  18. #include "jfile.hpp"
  19. #include "jsocket.hpp"
  20. #include "fterror.hpp"
  21. #include "dadfs.hpp"
  22. #include "daftcfg.hpp"
  23. #include "daftmc.hpp"
  24. #include "rmtspawn.hpp"
  25. #include "fttransform.ipp"
  26. #include "ftbase.ipp"
  // When defined, createTransformer() tries the hand-written transformers
  // before falling back to the general-purpose one.
  #define OPTIMIZE_COMMON_TRANSFORMS

  // A couple of options useful for debugging - both are inactive while zero.
  // gpfFrequency: non-zero forces a progress update after every block (and, on
  //               Windows builds, triggers a deliberate crash at random).
  // blockDelay:   pause inserted after every transferred block, time in ms.
  const unsigned gpfFrequency = 0;
  const unsigned blockDelay = 0;
  31. //----------------------------------------------------------------------------
  32. CTransformerBase::CTransformerBase()
  33. {
  34. startOffset = 0;
  35. maxOffset = 0;
  36. }
  37. void CTransformerBase::beginTransform(IFileIOStream * out)
  38. {
  39. }
  40. void CTransformerBase::endTransform(IFileIOStream * out)
  41. {
  42. }
  43. void CTransformerBase::setInputCRC(crc32_t _inputCRC)
  44. {
  45. }
  46. bool CTransformerBase::setPartition(RemoteFilename & remoteInputName, offset_t _startOffset, offset_t _length)
  47. {
  48. inputFile.setown(createIFile(remoteInputName));
  49. startOffset = _startOffset;
  50. maxOffset = _startOffset + _length;
  51. return true;
  52. }
  53. //----------------------------------------------------------------------------
  54. CTransformer::CTransformer(size32_t _bufferSize)
  55. {
  56. cursor = 0;
  57. bufferSize = _bufferSize;
  58. buffer = new byte[bufferSize];
  59. }
  60. CTransformer::~CTransformer()
  61. {
  62. delete [] buffer;
  63. }
  64. size32_t CTransformer::read(size32_t maxLength, void * buffer)
  65. {
  66. if (cursor + maxLength > maxOffset)
  67. maxLength = (size32_t)(maxOffset - cursor);
  68. size32_t got = input->read(cursor, maxLength, buffer);
  69. cursor += got;
  70. return got;
  71. }
  72. bool CTransformer::setPartition(RemoteFilename & remoteInputName, offset_t _startOffset, offset_t _length, bool compressedInput, const char *decryptKey)
  73. {
  74. CTransformerBase::setPartition(remoteInputName, _startOffset, _length);
  75. // could be cache for local, nocache for mirror
  76. input.setown(inputFile->open(IFOread,IFEnocache));
  77. if (compressedInput) {
  78. Owned<IExpander> expander;
  79. if (decryptKey&&*decryptKey) {
  80. StringBuffer key;
  81. decrypt(key,decryptKey);
  82. expander.setown(createAESExpander256(key.length(),key.str()));
  83. }
  84. input.setown(createCompressedFileReader(input,expander));
  85. }
  86. cursor = startOffset;
  87. return (input != NULL);
  88. }
  89. offset_t CTransformer::tell()
  90. {
  91. return cursor;
  92. }
  93. size32_t CTransformer::getBlock(IFileIOStream * out)
  94. {
  95. unsigned gotLength = getN(buffer, bufferSize);
  96. if (gotLength)
  97. out->write(gotLength, buffer);
  98. return gotLength;
  99. }
  100. //----------------------------------------------------------------------------
  101. CNullTransformer::CNullTransformer(size32_t buffersize) : CTransformer(buffersize)
  102. {
  103. doInputCRC = false;
  104. }
  105. size32_t CNullTransformer::getN(byte * buffer, size32_t maxLength)
  106. {
  107. unsigned num = read(maxLength, buffer);
  108. if (doInputCRC)
  109. inputCRC = crc32((const char *)buffer, num, inputCRC);
  110. return num;
  111. }
  112. void CNullTransformer::setInputCRC(crc32_t _inputCRC)
  113. {
  114. doInputCRC = true;
  115. inputCRC = _inputCRC;
  116. }
  117. //----------------------------------------------------------------------------
  118. CFixedToVarTransformer::CFixedToVarTransformer(size32_t _recordSize,size32_t buffersize, bool _bigendian)
  119. : CTransformer(buffersize)
  120. {
  121. recordSize = _recordSize;
  122. bigendian = _bigendian;
  123. assertex(!bigendian); // TBD Var BE
  124. }
  125. //Coded slightly strangely, so that we avoid an extra memcpy() - except for the bytes
  126. //that don't quite fit in this block.
  127. size32_t CFixedToVarTransformer::getN(byte * buffer, size32_t maxLength)
  128. {
  129. //Read a block of fixed length records into memory, then add the variable length tags
  130. //by moving the data within the block.
  131. const size32_t targetRecordSize = recordSize + sizeof(varLenType);
  132. size32_t sizeToGet = (maxLength / targetRecordSize) * recordSize;
  133. size32_t sizeGot = read(sizeToGet, buffer);
  134. //Now add the varLenType
  135. unsigned numGot = sizeGot/recordSize;
  136. assertex(numGot*recordSize==sizeGot);
  137. for (unsigned cur=numGot;cur--!=0;)
  138. {
  139. byte * curSource = buffer + recordSize * cur;
  140. byte * curTarget = buffer + targetRecordSize * cur;
  141. memmove(curTarget + sizeof(varLenType), curSource, recordSize);
  142. _WINCPYREV(curTarget, &recordSize, sizeof(varLenType));
  143. }
  144. return numGot * targetRecordSize;
  145. }
  146. offset_t CFixedToVarTransformer::tell()
  147. {
  148. return cursor;
  149. }
  150. //---------------------------------------------------------------------------
  151. CVarToFixedTransformer::CVarToFixedTransformer(unsigned _recordSize,size32_t buffersize, bool _bigendian)
  152. : CTransformer(buffersize)
  153. {
  154. recordSize = _recordSize;
  155. savedSize = 0;
  156. savedBuffer = new byte[minBlockSize];
  157. bigendian = _bigendian;
  158. assertex(!bigendian); // TBD Var BE
  159. }
  160. CVarToFixedTransformer::~CVarToFixedTransformer()
  161. {
  162. delete [] savedBuffer;
  163. }
  164. //Read the variable length records into a temporary buffer, and then
  165. //copy across. Can't avoid the memcpy, or do it in place...
  166. size32_t CVarToFixedTransformer::getN(byte * buffer, size32_t maxLength)
  167. {
  168. //Fill the savedBuffer
  169. savedSize += read(minBlockSize-savedSize, savedBuffer+savedSize);
  170. //Walk through the records copying them to the destination
  171. assertex(sizeof(varLenType) == 4);
  172. byte * good = savedBuffer;
  173. byte * lastGood = savedBuffer + savedSize;
  174. byte * target = buffer;
  175. while (good + sizeof(varLenType) <= lastGood)
  176. {
  177. varLenType nextLen;
  178. _WINCPYREV4(&nextLen, good);
  179. if (good + sizeof(varLenType) + nextLen > lastGood)
  180. break;
  181. if (target + recordSize > buffer + maxLength)
  182. break;
  183. if (nextLen < recordSize)
  184. {
  185. memcpy(target, good + sizeof(varLenType), nextLen);
  186. memset(target+nextLen, 0, recordSize-nextLen);
  187. }
  188. else
  189. memcpy(target, good + sizeof(varLenType), recordSize);
  190. good += sizeof(varLenType) + nextLen;
  191. target += recordSize;
  192. }
  193. //Finally shift the extra records down - if there are any
  194. unsigned numUsed = good - savedBuffer;
  195. memmove(savedBuffer, good, savedSize-numUsed);
  196. savedSize -= numUsed;
  197. return target-buffer;
  198. }
  199. offset_t CVarToFixedTransformer::tell()
  200. {
  201. return cursor - savedSize;
  202. }
  203. //----------------------------------------------------------------------------
  204. CBlockToVarTransformer::CBlockToVarTransformer(bool _bigendian)
  205. : CTransformer(EFX_BLOCK_SIZE)
  206. {
  207. assertex(sizeof(blockLenType) == 4);
  208. bigendian = _bigendian;
  209. assertex(!bigendian); // TBD Var BE
  210. nextBlockSize = 0;
  211. }
  212. //Coded to read the length of the next block with the previous block of data.
  213. //assumes the padding is pretty small in the record, so don't try and skip it.
  214. size32_t CBlockToVarTransformer::getN(byte * buffer, size32_t maxLength)
  215. {
  216. assertex(maxLength >= EFX_BLOCK_SIZE);
  217. size32_t blockSize = nextBlockSize;
  218. if (!blockSize)
  219. {
  220. blockLenType temp;
  221. size32_t hdrLen = read(sizeof(temp), &temp);
  222. if (hdrLen == 0)
  223. return 0;
  224. assertex(hdrLen == sizeof(blockLenType));
  225. _WINCPYREV4(&blockSize, &temp);
  226. }
  227. size32_t sizeGot = read(EFX_BLOCK_SIZE, buffer);
  228. if (sizeGot == EFX_BLOCK_SIZE)
  229. _WINCPYREV4(&nextBlockSize, buffer+EFX_BLOCK_SIZE- sizeof(blockLenType));
  230. else
  231. nextBlockSize = 0;
  232. return blockSize;
  233. }
  234. offset_t CBlockToVarTransformer::tell()
  235. {
  236. if (nextBlockSize)
  237. return cursor - sizeof(nextBlockSize);
  238. return cursor;
  239. }
  240. //----------------------------------------------------------------------------
  241. CVarToBlockTransformer::CVarToBlockTransformer(bool _bigendian)
  242. : CTransformer(EFX_BLOCK_SIZE)
  243. {
  244. savedSize = 0;
  245. bigendian = _bigendian;
  246. assertex(!bigendian); // TBD Var BE
  247. savedBuffer = new byte[EFX_BLOCK_SIZE];
  248. }
  249. CVarToBlockTransformer::~CVarToBlockTransformer()
  250. {
  251. delete [] savedBuffer;
  252. }
  253. //Coded slightly strangely, so that we avoid an extra memcpy() - except for the bytes
  254. //that don't quite fit in this block.
  255. size32_t CVarToBlockTransformer::getN(byte * buffer, size32_t maxLength)
  256. {
  257. size32_t sizeGot = 0;
  258. byte * startData = buffer + sizeof(blockLenType);
  259. const unsigned maxDataLength = maxLength - sizeof(blockLenType);
  260. if (savedSize)
  261. {
  262. assertex(savedSize <= maxDataLength);
  263. size32_t copyLen = savedSize;
  264. memcpy(startData, savedBuffer, copyLen);
  265. sizeGot += copyLen;
  266. savedSize -= copyLen;
  267. }
  268. if (maxDataLength != sizeGot)
  269. sizeGot += read(maxDataLength-sizeGot, startData + sizeGot);
  270. if (sizeGot == 0)
  271. return 0;
  272. //Now work out how many records we've copied.
  273. byte * good = startData;
  274. byte * lastGood = startData + sizeGot;
  275. while (good + sizeof(varLenType) < lastGood)
  276. {
  277. varLenType nextLen;
  278. _WINCPYREV(&nextLen, good, sizeof(nextLen));
  279. if (good + sizeof(varLenType) + nextLen > lastGood)
  280. break;
  281. good += sizeof(varLenType) + nextLen;
  282. }
  283. savedSize = sizeGot - (good - startData);
  284. assertex(savedSize < EFX_BLOCK_SIZE);
  285. memcpy(savedBuffer, good, savedSize);
  286. blockLenType blockSize = sizeGot-savedSize;
  287. _WINCPYREV(buffer, &blockSize, sizeof(blockSize));
  288. memset(good, 0, EFX_BLOCK_SIZE-blockSize-sizeof(blockSize));
  289. return EFX_BLOCK_SIZE;
  290. }
  291. offset_t CVarToBlockTransformer::tell()
  292. {
  293. return cursor - savedSize;
  294. }
  295. //----------------------------------------------------------------------------
  296. CGeneralTransformer::CGeneralTransformer(const FileFormat & srcFormat, const FileFormat & tgtFormat)
  297. {
  298. processor.setown(createFormatProcessor(srcFormat, tgtFormat, true));
  299. target.setown(createOutputProcessor(tgtFormat));
  300. processor->setTarget(target);
  301. }
  302. size32_t CGeneralTransformer::getBlock(IFileIOStream * out)
  303. {
  304. return processor->transformBlock(maxOffset, cursor);
  305. }
  306. bool CGeneralTransformer::getInputCRC(crc32_t & value)
  307. {
  308. value = processor->getInputCRC();
  309. return true;
  310. }
  311. void CGeneralTransformer::setInputCRC(crc32_t _inputCRC)
  312. {
  313. processor->setInputCRC(_inputCRC);
  314. }
  315. bool CGeneralTransformer::setPartition(RemoteFilename & remoteInputName, offset_t _startOffset, offset_t _length, bool compressedInput, const char *decryptKey)
  316. {
  317. CTransformerBase::setPartition(remoteInputName, _startOffset, _length);
  318. processor->setSource(0, remoteInputName, compressedInput, decryptKey);
  319. return inputFile->exists();
  320. }
  321. void CGeneralTransformer::beginTransform(IFileIOStream * out)
  322. {
  323. processor->beginTransform(startOffset, maxOffset-startOffset, cursor);
  324. target->setOutput(out->tell(), out);
  325. }
  326. void CGeneralTransformer::endTransform(IFileIOStream * out)
  327. {
  328. processor->endTransform(cursor);
  329. }
  330. offset_t CGeneralTransformer::tell()
  331. {
  332. return cursor.inputOffset;
  333. }
  334. //----------------------------------------------------------------------------
  335. ITransformer * createTransformer(const FileFormat & srcFormat, const FileFormat & tgtFormat, size32_t buffersize)
  336. {
  337. ITransformer * transformer = NULL;
  338. #ifdef OPTIMIZE_COMMON_TRANSFORMS
  339. if (srcFormat.equals(tgtFormat))
  340. transformer = new CNullTransformer(buffersize);
  341. else
  342. {
  343. switch (srcFormat.type)
  344. {
  345. case FFTfixed:
  346. switch (tgtFormat.type)
  347. {
  348. case FFTvariable:
  349. case FFTvariablebigendian:
  350. transformer = new CFixedToVarTransformer(srcFormat.recordSize,buffersize,(tgtFormat.type==FFTvariablebigendian));
  351. break;
  352. }
  353. break;
  354. case FFTvariable:
  355. case FFTvariablebigendian:
  356. switch (tgtFormat.type)
  357. {
  358. case FFTfixed:
  359. transformer = new CVarToFixedTransformer(tgtFormat.recordSize,buffersize,(srcFormat.type==FFTvariablebigendian));
  360. break;
  361. case FFTblocked:
  362. transformer = new CVarToBlockTransformer((srcFormat.type==FFTvariablebigendian));
  363. break;
  364. }
  365. break;
  366. case FFTblocked:
  367. switch (tgtFormat.type)
  368. {
  369. case FFTvariable:
  370. case FFTvariablebigendian:
  371. transformer = new CBlockToVarTransformer((tgtFormat.type==FFTvariablebigendian));
  372. break;
  373. }
  374. break;
  375. case FFTutf8: case FFTutf8n:
  376. switch (tgtFormat.type)
  377. {
  378. case FFTutf8n:
  379. case FFTutf8:
  380. transformer = new CNullTransformer(buffersize);
  381. break;
  382. case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
  383. break;
  384. default:
  385. throwError(DFTERR_BadSrcTgtCombination);
  386. }
  387. break;
  388. case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
  389. switch (tgtFormat.type)
  390. {
  391. case FFTutf8: case FFTutf8n: case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
  392. break;
  393. default:
  394. throwError(DFTERR_BadSrcTgtCombination);
  395. }
  396. break;
  397. }
  398. }
  399. #endif
  400. if (!transformer)
  401. transformer = new CGeneralTransformer(srcFormat, tgtFormat);
  402. // throwError(DFTERR_BadSrcTgtCombination);
  403. return transformer;
  404. }
  405. //----------------------------------------------------------------------------
  406. TransferServer::TransferServer(ISocket * _masterSocket)
  407. {
  408. masterSocket = _masterSocket;
  409. lastTick = msTick();
  410. updateFrequency = (unsigned int) -1;
  411. throttleNicSpeed = 0;
  412. compressedInput = false;
  413. compressOutput = false;
  414. transferBufferSize = DEFAULT_STD_BUFFER_SIZE;
  415. fileUmask = -1;
  416. }
  417. void TransferServer::sendProgress(OutputProgress & curProgress)
  418. {
  419. MemoryBuffer msg;
  420. msg.setEndian(__BIG_ENDIAN);
  421. curProgress.serializeCore(msg.clear().append(false));
  422. curProgress.serializeExtra(msg, 1);
  423. if (!catchWriteBuffer(masterSocket, msg))
  424. throwError(RFSERR_TimeoutWaitMaster);
  425. checkForRemoteAbort(masterSocket);
  426. }
  427. void TransferServer::appendTransformed(unsigned chunkIndex, ITransformer * input)
  428. {
  429. OutputProgress & curProgress = progress.item(chunkIndex);
  430. PartitionPoint & curPartition = partition.item(chunkIndex);
  431. input->beginTransform(out);
  432. const offset_t startInputOffset = curPartition.inputOffset;
  433. const offset_t startOutputOffset = curPartition.outputOffset;
  434. for (;;)
  435. {
  436. unsigned gotLength = input->getBlock(out);
  437. totalLengthRead += gotLength;
  438. if (gpfFrequency || !gotLength || ((unsigned)(msTick() - lastTick)) > updateFrequency)
  439. {
  440. out->flush();
  441. lastTick = msTick();
  442. offset_t outputOffset = out->tell();
  443. offset_t inputOffset = input->tell();
  444. if (totalLengthToRead)
  445. LOG(MCdebugProgress, unknownJob, "Progress: %d%% done. [%" I64F "u]", (unsigned)(totalLengthRead*100/totalLengthToRead), (unsigned __int64)totalLengthRead);
  446. curProgress.status = (gotLength == 0) ? OutputProgress::StatusCopied : OutputProgress::StatusActive;
  447. curProgress.inputLength = input->tell()-startInputOffset;
  448. curProgress.outputLength = out->tell()-startOutputOffset;
  449. if (crcOut)
  450. curProgress.outputCRC = crcOut->getCRC();
  451. if (calcInputCRC)
  452. curProgress.hasInputCRC = input->getInputCRC(curProgress.inputCRC);
  453. sendProgress(curProgress);
  454. }
  455. if (!gotLength)
  456. break;
  457. if (blockDelay)
  458. MilliSleep(blockDelay);
  459. else if (throttleNicSpeed)
  460. {
  461. unsigned delay = (unsigned)(((unsigned __int64)gotLength*10*1000*(numParallelSlaves-1))/(throttleNicSpeed * I64C(0x100000)));
  462. if (delay)
  463. MilliSleep(delay*(getRandom()%100)/50);
  464. }
  465. #ifdef _WIN32
  466. if (gpfFrequency && ((fastRand() % gpfFrequency) == 0))
  467. {
  468. LOG(MCdebugInfo, unknownJob, "About to crash....");
  469. *(char *)0 = 0;
  470. }
  471. #endif
  472. }
  473. input->endTransform(out);
  474. }
  475. void TransferServer::deserializeAction(MemoryBuffer & msg, unsigned action)
  476. {
  477. SocketEndpoint ep;
  478. ep.deserialize(msg);
  479. if (!ep.isLocal())
  480. {
  481. StringBuffer host, expected;
  482. queryHostIP().getIpText(host);
  483. ep.getIpText(expected);
  484. throwError2(DFTERR_WrongComputer, expected.str(), host.str());
  485. }
  486. srcFormat.deserialize(msg);
  487. tgtFormat.deserialize(msg);
  488. msg.read(calcInputCRC);
  489. msg.read(calcOutputCRC);
  490. deserialize(partition, msg);
  491. msg.read(numParallelSlaves);
  492. msg.read(updateFrequency);
  493. msg.read(copySourceTimeStamp);
  494. msg.read(mirror);
  495. msg.read(isSafeMode);
  496. srand((unsigned)get_cycles_now());
  497. int adjust = fastRand() % updateFrequency - (updateFrequency/2);
  498. lastTick = msTick() + adjust;
  499. StringBuffer localFilename;
  500. if (action == FTactionpull)
  501. {
  502. partition.item(0).outputName.getPath(localFilename);
  503. LOG(MCdebugProgress, unknownJob, "Process Pull Command: %s", localFilename.str());
  504. }
  505. else
  506. {
  507. partition.item(0).inputName.getPath(localFilename);
  508. LOG(MCdebugProgress, unknownJob, "Process Push Command: %s", localFilename.str());
  509. }
  510. LOG(MCdebugProgress, unknownJob, "Num Parallel Slaves=%d Adjust=%d/%d", numParallelSlaves, adjust, updateFrequency);
  511. LOG(MCdebugProgress, unknownJob, "copySourceTimeStamp(%d) mirror(%d) safe(%d) incrc(%d) outcrc(%d)", copySourceTimeStamp, mirror, isSafeMode, calcInputCRC, calcOutputCRC);
  512. displayPartition(partition);
  513. unsigned numProgress;
  514. msg.read(numProgress);
  515. for (unsigned i = 0; i < numProgress; i++)
  516. {
  517. OutputProgress & next = *new OutputProgress;
  518. next.deserializeCore(msg);
  519. progress.append(next);
  520. }
  521. if (msg.remaining())
  522. msg.read(throttleNicSpeed);
  523. if (msg.remaining())
  524. msg.read(compressedInput).read(compressOutput);
  525. if (msg.remaining())
  526. msg.read(copyCompressed);
  527. if (msg.remaining())
  528. msg.read(transferBufferSize);
  529. if (msg.remaining())
  530. msg.read(encryptKey).read(decryptKey);
  531. if (msg.remaining())
  532. {
  533. srcFormat.deserializeExtra(msg, 1);
  534. tgtFormat.deserializeExtra(msg, 1);
  535. }
  536. ForEachItemIn(i1, progress)
  537. progress.item(i1).deserializeExtra(msg, 1);
  538. if (msg.remaining())
  539. msg.read(fileUmask);
  540. LOG(MCdebugProgress, unknownJob, "throttle(%d), transferBufferSize(%d)", throttleNicSpeed, transferBufferSize);
  541. PROGLOG("compressedInput(%d), compressedOutput(%d), copyCompressed(%d)", compressedInput?1:0, compressOutput?1:0, copyCompressed?1:0);
  542. PROGLOG("encrypt(%d), decrypt(%d)", encryptKey.isEmpty()?0:1, decryptKey.isEmpty()?0:1);
  543. if (fileUmask != -1)
  544. PROGLOG("umask(0%o)", fileUmask);
  545. else
  546. PROGLOG("umask(default)");
  547. //---Finished deserializing ---
  548. displayProgress(progress);
  549. totalLengthRead = 0;
  550. totalLengthToRead = 0;
  551. ForEachItemIn(idx, partition)
  552. totalLengthToRead += partition.item(idx).inputLength;
  553. }
  554. unsigned TransferServer::queryLastOutput(unsigned outputIndex)
  555. {
  556. ForEachItemInRev(idx, partition)
  557. if (partition.item(idx).whichOutput == outputIndex)
  558. return idx;
  559. return (unsigned int) -1;
  560. }
  561. void TransferServer::transferChunk(unsigned chunkIndex)
  562. {
  563. PartitionPoint & curPartition = partition.item(chunkIndex);
  564. OutputProgress & curProgress = progress.item(chunkIndex);
  565. StringBuffer targetPath;
  566. curPartition.outputName.getPath(targetPath);
  567. LOG(MCdebugProgress, unknownJob, "Begin to transfer chunk %d (offset: %" I64F "d, size: %" I64F "d) to target:'%s' (offset: %" I64F "d, size: %" I64F "d) ",
  568. chunkIndex, curPartition.inputOffset, curPartition.inputLength, targetPath.str(), curPartition.outputOffset, curPartition.outputLength);
  569. const unsigned __int64 startOutOffset = out->tell();
  570. if (startOutOffset != curPartition.outputOffset+curProgress.outputLength)
  571. throwError4(DFTERR_OutputOffsetMismatch, out->tell(), curPartition.outputOffset+curProgress.outputLength, "start", chunkIndex);
  572. size32_t fixedTextLength = (size32_t)curPartition.fixedText.length();
  573. if (fixedTextLength || curPartition.inputName.isNull())
  574. {
  575. out->write(fixedTextLength, curPartition.fixedText.get());
  576. curProgress.status = OutputProgress::StatusCopied;
  577. curProgress.inputLength = fixedTextLength;
  578. curProgress.outputLength = fixedTextLength;
  579. if (crcOut)
  580. curProgress.outputCRC = crcOut->getCRC();
  581. sendProgress(curProgress);
  582. }
  583. else
  584. {
  585. Owned<ITransformer> transformer = createTransformer(srcFormat, tgtFormat, transferBufferSize);
  586. if (!transformer->setPartition(curPartition.inputName,
  587. curPartition.inputOffset+curProgress.inputLength,
  588. curPartition.inputLength-curProgress.inputLength,
  589. compressedInput,
  590. decryptKey))
  591. {
  592. StringBuffer temp;
  593. throwError1(DFTERR_CouldNotOpenFile, curPartition.inputName.getRemotePath(temp).str());
  594. }
  595. if (calcInputCRC)
  596. transformer->setInputCRC(curProgress.inputCRC);
  597. appendTransformed(chunkIndex, transformer);
  598. }
  599. assertex(out->tell() == curPartition.outputOffset + curProgress.outputLength);
  600. if (copyCompressed)
  601. {
  602. //Now the copy of this chunk is complete, update the progress with the full expected length.
  603. //Don't do it before otherwise recovery won't work very well.
  604. curProgress.outputLength = curPartition.outputLength;
  605. }
  606. else
  607. {
  608. if (curPartition.outputLength && (curProgress.outputLength != curPartition.outputLength))
  609. throwError4(DFTERR_OutputOffsetMismatch, out->tell(), curPartition.outputOffset+curPartition.outputLength, "end", chunkIndex);
  610. }
  611. }
  612. bool TransferServer::pull()
  613. {
  614. unsigned curOutput = (unsigned)-1;
  615. unsigned start;
  616. unsigned __int64 curOutputOffset = 0;
  617. //loop through all partitions - inner loop does a file at a time.
  618. unsigned numPartitions = partition.ordinality();
  619. for (start = 0; start < numPartitions; )
  620. {
  621. PartitionPoint & startPartition = partition.item(start);
  622. OutputProgress & startProgress = progress.item(start);
  623. if (startProgress.status == OutputProgress::StatusBegin)
  624. break;
  625. assertex(!compressOutput);
  626. RemoteFilename localTempFilename;
  627. getDfuTempName(localTempFilename, startPartition.outputName);
  628. OwnedIFile outFile = createIFile(localTempFilename);
  629. OwnedIFileIO outio = outFile->openShared(IFOwrite,IFSHnone);
  630. unsigned __int64 size = outio ? outio->size() : 0;
  631. curOutput = startPartition.whichOutput;
  632. curOutputOffset = getHeaderSize(tgtFormat.type);
  633. unsigned i;
  634. for (i = start;i < numPartitions; i++)
  635. {
  636. PartitionPoint & curPartition = partition.item(i);
  637. OutputProgress & curProgress = progress.item(i);
  638. if (curOutput != curPartition.whichOutput)
  639. break;
  640. curPartition.outputOffset = curOutputOffset;
  641. unsigned __int64 progressOffset = curOutputOffset + curProgress.outputLength;
  642. if (progressOffset > size)
  643. {
  644. LOG(MCwarning, unknownJob, "Recovery information seems to be invalid (%" I64F "d %" I64F "d) start copying from the beginning",
  645. size, progressOffset);
  646. //reset any remaining partitions...
  647. for (i = start; i < numPartitions; i++)
  648. progress.item(i).reset();
  649. curOutput = (unsigned int) -1;
  650. goto processedProgress; // break out of both loops
  651. }
  652. assertex(curProgress.status != OutputProgress::StatusRenamed);
  653. if (curProgress.status != OutputProgress::StatusCopied)
  654. {
  655. out.setown(createIOStream(outio));
  656. out->seek(progressOffset, IFSbegin);
  657. wrapOutInCRC(curProgress.outputCRC);
  658. StringBuffer localFilename;
  659. localTempFilename.getPath(localFilename);
  660. LOG(MCdebugProgress, unknownJob, "Continue pulling to file: %s from recovery position", localFilename.str());
  661. start = i;
  662. goto processedProgress; // break out of both loops
  663. }
  664. curOutputOffset += curProgress.outputLength;
  665. }
  666. start = i;
  667. }
  668. processedProgress:
  669. //Delete any output files before generating the new ones.
  670. unsigned maxChunk = partition.ordinality();
  671. if (((start == 0) && !isSafeMode))
  672. {
  673. unsigned prevOutput = (unsigned int) -1;
  674. for (unsigned i = 0; i < maxChunk; i++)
  675. {
  676. PartitionPoint & curPartition = partition.item(i);
  677. if (curPartition.whichOutput != prevOutput)
  678. {
  679. OwnedIFile output = createIFile(curPartition.outputName);
  680. output->remove();
  681. prevOutput = curPartition.whichOutput;
  682. }
  683. }
  684. }
  685. for (unsigned idx=start; idx<maxChunk; idx++)
  686. {
  687. PartitionPoint & curPartition = partition.item(idx);
  688. OutputProgress & curProgress = progress.item(idx);
  689. //Either first non-recovery file, or the target file has changed....
  690. if (curOutput != curPartition.whichOutput)
  691. {
  692. curOutput = curPartition.whichOutput;
  693. if (curProgress.status == OutputProgress::StatusRenamed)
  694. {
  695. LOG(MCdebugProgress, unknownJob, "Renamed file found - must be CRC recovery");
  696. idx = queryLastOutput(curOutput);
  697. continue;
  698. }
  699. RemoteFilename localTempFilename;
  700. getDfuTempName(localTempFilename, curPartition.outputName);
  701. StringBuffer localFilename;
  702. localTempFilename.getPath(localFilename);
  703. if (!recursiveCreateDirectoryForFile(localFilename))
  704. throw makeOsExceptionV(GetLastError(), "Failed to create directory for file: %s", localFilename.str());
  705. OwnedIFile outFile = createIFile(localFilename.str());
  706. // if we want spray to not fill page cache use IFEnocache
  707. OwnedIFileIO outio = outFile->openShared(IFOcreate,IFSHnone,IFEnocache);
  708. if (!outio)
  709. throwError1(DFTERR_CouldNotCreateOutput, localFilename.str());
  710. if (compressOutput) {
  711. Owned<ICompressor> compressor;
  712. if (!encryptKey.isEmpty()) {
  713. StringBuffer key;
  714. decrypt(key,encryptKey);
  715. compressor.setown(createAESCompressor256(key.length(),key.str()));
  716. }
  717. outio.setown(createCompressedFileWriter(outio, false, 0, true, compressor, COMPRESS_METHOD_LZW));
  718. }
  719. LOG(MCdebugProgress, unknownJob, "Start pulling to file: %s", localFilename.str());
  720. //Find the last partition entry that refers to the same file.
  721. if (!compressOutput)
  722. {
  723. PartitionPoint & lastChunk = partition.item(queryLastOutput(curOutput));
  724. if (lastChunk.outputLength)
  725. {
  726. char null = 0;
  727. offset_t lastOffset = lastChunk.outputOffset+lastChunk.outputLength;
  728. outio->write(lastOffset-sizeof(null),sizeof(null),&null);
  729. LOG(MCdebugProgress, unknownJob, "Extend length of target file to %" I64F "d", lastOffset);
  730. }
  731. }
  732. out.setown(createIOStream(outio));
  733. out->seek(0, IFSbegin);
  734. wrapOutInCRC(0);
  735. unsigned headerLen = getHeaderSize(tgtFormat.type);
  736. if (headerLen)
  737. out->write(headerLen, getHeaderText(tgtFormat.type));
  738. curOutputOffset = headerLen;
  739. }
  740. else if (crcOut && (idx!=start))
  741. crcOut->setCRC(0);
  742. curPartition.outputOffset = curOutputOffset;
  743. transferChunk(idx);
  744. curOutputOffset += curProgress.outputLength;
  745. }
  746. crcOut.clear();
  747. out.clear();
  748. //Once the transfers have completed, rename the files, and sync file times
  749. //if replicating...
  750. if (!isSafeMode)
  751. {
  752. MemoryBuffer msg;
  753. unsigned prevOutput = (unsigned int) -1;
  754. for (unsigned i = 0; i < maxChunk; i++)
  755. {
  756. PartitionPoint & curPartition = partition.item(i);
  757. OutputProgress & curProgress = progress.item(i);
  758. if (curPartition.whichOutput != prevOutput)
  759. {
  760. if (curProgress.status != OutputProgress::StatusRenamed)
  761. {
  762. //rename the files..
  763. renameDfuTempToFinal(curPartition.outputName);
  764. OwnedIFile output = createIFile(curPartition.outputName);
  765. if (fileUmask != -1)
  766. output->setFilePermissions(~fileUmask&0666);
  767. if (mirror || copySourceTimeStamp)
  768. {
  769. OwnedIFile input = createIFile(curPartition.inputName);
  770. CDateTime modifiedTime;
  771. CDateTime createTime;
  772. if (input->getTime(&createTime, &modifiedTime, NULL))
  773. output->setTime(&createTime, &modifiedTime, NULL);
  774. }
  775. else if (!curPartition.modifiedTime.isNull())
  776. {
  777. OwnedIFile output = createIFile(curPartition.outputName);
  778. output->setTime(&curPartition.modifiedTime, &curPartition.modifiedTime, NULL);
  779. }
  780. else
  781. output->getTime(NULL, &curProgress.resultTime, NULL);
  782. //Notify the master that the file has been renamed - and send the modified time.
  783. msg.setEndian(__BIG_ENDIAN);
  784. curProgress.status = OutputProgress::StatusRenamed;
  785. if (compressOutput)
  786. {
  787. curProgress.compressedPartSize = output->size();
  788. curProgress.hasCompressed = true;
  789. }
  790. curProgress.serializeCore(msg.clear().append(false));
  791. curProgress.serializeExtra(msg, 1);
  792. if (!catchWriteBuffer(masterSocket, msg))
  793. throwError(RFSERR_TimeoutWaitMaster);
  794. }
  795. prevOutput = curPartition.whichOutput;
  796. }
  797. }
  798. }
  799. return true;
  800. }
  801. bool TransferServer::push()
  802. {
  803. //May be multiple sources files, and may not read all the chunks from the source file so opened each time..
  804. //Slightly inefficent, but not significant because it is likely to be local
  805. unsigned maxChunk = partition.ordinality();
  806. for (unsigned idx=0;idx<maxChunk;idx++)
  807. {
  808. PartitionPoint & curPartition = partition.item(idx);
  809. OutputProgress & curProgress = progress.item(idx);
  810. if (curProgress.status != OutputProgress::StatusCopied)
  811. {
  812. RemoteFilename outFilename;
  813. getDfuTempName(outFilename, curPartition.outputName);
  814. OwnedIFile output = createIFile(outFilename);
  815. OwnedIFileIO outio = output->openShared(compressOutput?IFOreadwrite:IFOwrite,IFSHfull);
  816. if (!outio)
  817. {
  818. StringBuffer outputPath;
  819. outFilename.getRemotePath(outputPath);
  820. throwError1(DFTERR_CouldNotCreateOutput, outputPath.str());
  821. }
  822. if (compressOutput) {
  823. Owned<ICompressor> compressor;
  824. if (!encryptKey.isEmpty()) {
  825. StringBuffer key;
  826. decrypt(key,encryptKey);
  827. compressor.setown(createAESCompressor256(key.length(),key.str()));
  828. }
  829. outio.setown(createCompressedFileWriter(outio, false, 0, true, compressor, COMPRESS_METHOD_LZW));
  830. }
  831. out.setown(createIOStream(outio));
  832. if (!compressOutput)
  833. out->seek(curPartition.outputOffset + curProgress.outputLength, IFSbegin);
  834. wrapOutInCRC(curProgress.outputCRC);
  835. transferChunk(idx);
  836. if (compressOutput)
  837. {
  838. //Notify the master that the file compressed and its new size
  839. curProgress.compressedPartSize = output->size();
  840. curProgress.hasCompressed = true;
  841. sendProgress(curProgress);
  842. }
  843. crcOut.clear();
  844. out.clear();
  845. }
  846. }
  847. return true;
  848. }
  849. void TransferServer::wrapOutInCRC(unsigned startCRC)
  850. {
  851. if (calcOutputCRC)
  852. {
  853. crcOut.setown(new CrcIOStream(out, startCRC));
  854. out.set(crcOut);
  855. }
  856. }