thkeydiff.cpp 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. /*##############################################################################
  2. Copyright (C) 2011 HPCC Systems.
  3. All rights reserved. This program is free software: you can redistribute it and/or modify
  4. it under the terms of the GNU Affero General Public License as
  5. published by the Free Software Foundation, either version 3 of the
  6. License, or (at your option) any later version.
  7. This program is distributed in the hope that it will be useful,
  8. but WITHOUT ANY WARRANTY; without even the implied warranty of
  9. MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  10. GNU Affero General Public License for more details.
  11. You should have received a copy of the GNU Affero General Public License
  12. along with this program. If not, see <http://www.gnu.org/licenses/>.
  13. ############################################################################## */
  14. #include "jiface.hpp"
  15. #include "dadfs.hpp"
  16. #include "thorfile.hpp"
  17. #include "eclhelper.hpp"
  18. #include "thexception.hpp"
  19. #include "thkeydiff.ipp"
  20. class CKeyDiffMaster : public CMasterActivity
  21. {
  22. IHThorKeyDiffArg *helper;
  23. Owned<IDistributedFile> originalIndexFile, newIndexFile;
  24. bool local, copyTlk;
  25. unsigned width;
  26. Owned<IFileDescriptor> patchDesc, originalDesc, newIndexDesc;
  27. StringArray clusters;
  28. public:
  29. CKeyDiffMaster(CMasterGraphElement *info) : CMasterActivity(info)
  30. {
  31. helper = NULL;
  32. local = false;
  33. width = 0;
  34. copyTlk = globals->getPropBool("@diffCopyTlk", true); // because tlk can have meta data and diff/patch does not support
  35. }
  36. ~CKeyDiffMaster()
  37. {
  38. }
  39. void init()
  40. {
  41. helper = (IHThorKeyDiffArg *)queryHelper();
  42. originalIndexFile.setown(queryThorFileManager().lookup(container.queryJob(), helper->queryOriginalName()));
  43. newIndexFile.setown(queryThorFileManager().lookup(container.queryJob(), helper->queryUpdatedName()));
  44. if (originalIndexFile->numParts() != newIndexFile->numParts())
  45. throw MakeActivityException(this, TE_KeyDiffIndexSizeMismatch, "Index %s and %s differ in width", helper->queryOriginalName(), helper->queryUpdatedName());
  46. if (originalIndexFile->querySuperFile() || newIndexFile->querySuperFile())
  47. throw MakeActivityException(this, 0, "Diffing super files not supported");
  48. width = originalIndexFile->numParts();
  49. originalDesc.setown(originalIndexFile->getFileDescriptor());
  50. newIndexDesc.setown(newIndexFile->getFileDescriptor());
  51. Owned<IPartDescriptor> tlkDesc = originalDesc->getPart(originalDesc->numParts()-1);
  52. const char *kind = tlkDesc->queryProperties().queryProp("@kind");
  53. local = NULL == kind || 0 != stricmp("topLevelKey", kind);
  54. if (!local)
  55. width--; // 1 part == No n distributed / Monolithic key
  56. if (width > container.queryJob().querySlaves())
  57. throw MakeActivityException(this, 0, "Unsupported: keydiff(%s, %s) - Cannot diff a key that's wider(%d) than the target cluster size(%d)", originalIndexFile->queryLogicalName(), newIndexFile->queryLogicalName(), width, container.queryJob().querySlaves());
  58. queryThorFileManager().noteFileRead(container.queryJob(), originalIndexFile);
  59. queryThorFileManager().noteFileRead(container.queryJob(), newIndexFile);
  60. IArrayOf<IGroup> groups;
  61. fillClusterArray(container.queryJob(), helper->queryOutputName(), clusters, groups);
  62. patchDesc.setown(queryThorFileManager().create(container.queryJob(), helper->queryOutputName(), clusters, groups, 0 != (KDPoverwrite & helper->getFlags()), 0, !local, width));
  63. }
  64. void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
  65. {
  66. if (slave < width) // if false - due to mismatch width fitting - fill in with a blank entry
  67. {
  68. dst.append(true);
  69. Owned<IPartDescriptor> originalPartDesc = originalDesc->getPart(slave);
  70. originalPartDesc->serialize(dst);
  71. Owned<IPartDescriptor> newIndexPartDesc = newIndexDesc->getPart(slave);
  72. newIndexPartDesc->serialize(dst);
  73. patchDesc->queryPart(slave)->serialize(dst);
  74. if (0 == slave)
  75. {
  76. if (!local)
  77. {
  78. dst.append(true);
  79. Owned<IPartDescriptor> originalTlkPartDesc = originalDesc->getPart(originalDesc->numParts()-1);
  80. originalTlkPartDesc->serialize(dst);
  81. Owned<IPartDescriptor> newIndexTlkPartDesc = newIndexDesc->getPart(newIndexDesc->numParts()-1);
  82. newIndexTlkPartDesc->serialize(dst);
  83. patchDesc->queryPart(patchDesc->numParts()-1)->serialize(dst);
  84. }
  85. else
  86. dst.append(false);
  87. }
  88. }
  89. else
  90. dst.append(false); // no part
  91. }
  92. void slaveDone(size32_t slaveIdx, MemoryBuffer &mb)
  93. {
  94. if (mb.length()) // if 0 implies aborted out from this slave.
  95. {
  96. offset_t size;
  97. mb.read(size);
  98. CDateTime modifiedTime(mb);
  99. IPartDescriptor *partDesc = patchDesc->queryPart(slaveIdx);
  100. IPropertyTree &props = partDesc->queryProperties();
  101. StringBuffer timeStr;
  102. modifiedTime.getString(timeStr);
  103. props.setProp("@modified", timeStr.str());
  104. unsigned crc;
  105. mb.read(crc);
  106. props.setPropInt64("@fileCrc", crc);
  107. if (!local && 0 == slaveIdx)
  108. {
  109. IPartDescriptor *partDesc = patchDesc->queryPart(patchDesc->numParts()-1);
  110. IPropertyTree &props = partDesc->queryProperties();
  111. mb.read(size);
  112. props.setPropInt64("@size", size);
  113. CDateTime modifiedTime(mb);
  114. StringBuffer timeStr;
  115. modifiedTime.getString(timeStr);
  116. props.setProp("@modified", timeStr.str());
  117. if (copyTlk)
  118. {
  119. Owned<IPartDescriptor> tlkDesc = newIndexDesc->getPart(newIndexDesc->numParts()-1);
  120. if (partDesc->getCrc(crc))
  121. props.setPropInt64("@fileCrc", crc);
  122. props.setProp("@diffFormat", "copy");
  123. }
  124. else
  125. {
  126. mb.read(crc);
  127. props.setPropInt64("@fileCrc", crc);
  128. props.setProp("@diffFormat", "diffV1");
  129. }
  130. }
  131. }
  132. }
  133. void done()
  134. {
  135. StringBuffer scopedName;
  136. queryThorFileManager().addScope(container.queryJob(), helper->queryOutputName(), scopedName);
  137. Owned<IWorkUnit> wu = &container.queryJob().queryWorkUnit().lock();
  138. Owned<IWUResult> r = wu->updateResultBySequence(helper->getSequence());
  139. r->setResultStatus(ResultStatusCalculated);
  140. r->setResultLogicalName(scopedName.str());
  141. r.clear();
  142. wu.clear();
  143. IPropertyTree &patchProps = patchDesc->queryProperties();
  144. setExpiryTime(patchProps, helper->getExpiryDays());
  145. IPropertyTree &originalProps = originalDesc->queryProperties();;
  146. if (originalProps.queryProp("ECL"))
  147. patchProps.setProp("ECL", originalProps.queryProp("ECL"));
  148. if (originalProps.getPropBool("@local"))
  149. patchProps.setPropBool("@local", true);
  150. container.queryTempHandler()->registerFile(helper->queryOutputName(), container.queryOwner().queryGraphId(), 0, false, WUFileStandard, &clusters);
  151. Owned<IDistributedFile> patchFile;
  152. // set part sizes etc
  153. queryThorFileManager().publish(container.queryJob(), helper->queryOutputName(), false, *patchDesc, &patchFile, 0, false);
  154. try { // set file size
  155. if (patchFile) {
  156. __int64 fs = patchFile->getFileSize(true,false);
  157. if (fs!=-1)
  158. patchFile->queryProperties().setPropInt64("@size",fs);
  159. }
  160. }
  161. catch (IException *e) {
  162. EXCLOG(e,"keydiff setting file size");
  163. e->Release();
  164. }
  165. // Add a new 'Patch' description to the secondary key.
  166. newIndexFile->lockProperties();
  167. IPropertyTree &fileProps = newIndexFile->queryProperties();
  168. StringBuffer path("Patch[@name=\"");
  169. path.append(scopedName.str()).append("\"]");
  170. IPropertyTree *patch = fileProps.queryPropTree(path.str());
  171. if (!patch) patch = fileProps.addPropTree("Patch", createPTree());
  172. patch->setProp("@name", scopedName.str());
  173. unsigned checkSum;
  174. if (patchFile->getFileCheckSum(checkSum))
  175. patch->setPropInt64("@checkSum", checkSum);
  176. IPropertyTree *index = patch->setPropTree("Index", createPTree());
  177. index->setProp("@name", originalIndexFile->queryLogicalName());
  178. if (originalIndexFile->getFileCheckSum(checkSum))
  179. index->setPropInt64("@checkSum", checkSum);
  180. newIndexFile->unlockProperties();
  181. }
  182. void preStart(size32_t parentExtractSz, const byte *parentExtract)
  183. {
  184. CMasterActivity::preStart(parentExtractSz, parentExtract);
  185. IHThorKeyDiffArg *helper = (IHThorKeyDiffArg *) queryHelper();
  186. if (0==(KDPoverwrite & helper->getFlags()))
  187. {
  188. if (KDPvaroutputname & helper->getFlags()) return;
  189. Owned<IDistributedFile> file = queryThorFileManager().lookup(container.queryJob(), helper->queryOutputName(), false, true);
  190. if (file)
  191. throw MakeActivityException(this, TE_OverwriteNotSpecified, "Cannot write %s, file already exists (missing OVERWRITE attribute?)", file->queryLogicalName());
  192. }
  193. }
  194. void kill()
  195. {
  196. CMasterActivity::kill();
  197. originalIndexFile.clear();
  198. newIndexFile.clear();
  199. }
  200. };
  201. CActivityBase *createKeyDiffActivityMaster(CMasterGraphElement *container)
  202. {
  203. return new CKeyDiffMaster(container);
  204. }