thkeydiff.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "jiface.hpp"
  14. #include "dadfs.hpp"
  15. #include "thorfile.hpp"
  16. #include "eclhelper.hpp"
  17. #include "thexception.hpp"
  18. #include "thkeydiff.ipp"
  19. class CKeyDiffMaster : public CMasterActivity
  20. {
  21. IHThorKeyDiffArg *helper;
  22. Owned<IDistributedFile> originalIndexFile, newIndexFile;
  23. bool local, copyTlk;
  24. unsigned width;
  25. Owned<IFileDescriptor> patchDesc, originalDesc, newIndexDesc;
  26. StringArray clusters;
  27. StringAttr originalName, updatedName, outputName;
  28. public:
  29. CKeyDiffMaster(CMasterGraphElement *info) : CMasterActivity(info)
  30. {
  31. helper = (IHThorKeyDiffArg *)queryHelper();
  32. local = false;
  33. width = 0;
  34. copyTlk = globals->getPropBool("@diffCopyTlk", true); // because tlk can have meta data and diff/patch does not support
  35. }
  36. virtual void init()
  37. {
  38. CMasterActivity::init();
  39. OwnedRoxieString originalHelperName(helper->getOriginalName());
  40. OwnedRoxieString updatedHelperName(helper->getUpdatedName());
  41. OwnedRoxieString outputHelperName(helper->getOutputName());
  42. StringBuffer expandedFileName;
  43. queryThorFileManager().addScope(container.queryJob(), originalHelperName, expandedFileName, false);
  44. originalName.set(expandedFileName);
  45. queryThorFileManager().addScope(container.queryJob(), updatedHelperName, expandedFileName.clear(), false);
  46. updatedName.set(expandedFileName);
  47. queryThorFileManager().addScope(container.queryJob(), outputHelperName, expandedFileName.clear(), false);
  48. outputName.set(expandedFileName);
  49. originalIndexFile.setown(lookupReadFile(originalHelperName, false, false, false));
  50. newIndexFile.setown(lookupReadFile(updatedHelperName, false, false, false));
  51. if (!isFileKey(originalIndexFile))
  52. throw MakeActivityException(this, TE_FileTypeMismatch, "Attempting to read flat file as an index: %s", originalHelperName.get());
  53. if (!isFileKey(newIndexFile))
  54. throw MakeActivityException(this, TE_FileTypeMismatch, "Attempting to read flat file as an index: %s", updatedHelperName.get());
  55. if (originalIndexFile->numParts() != newIndexFile->numParts())
  56. throw MakeActivityException(this, TE_KeyDiffIndexSizeMismatch, "Index %s and %s differ in width", originalName.get(), updatedName.get());
  57. if (originalIndexFile->querySuperFile() || newIndexFile->querySuperFile())
  58. throw MakeActivityException(this, 0, "Diffing super files not supported");
  59. width = originalIndexFile->numParts();
  60. originalDesc.setown(originalIndexFile->getFileDescriptor());
  61. newIndexDesc.setown(newIndexFile->getFileDescriptor());
  62. Owned<IPartDescriptor> tlkDesc = originalDesc->getPart(originalDesc->numParts()-1);
  63. const char *kind = tlkDesc->queryProperties().queryProp("@kind");
  64. local = NULL == kind || 0 != stricmp("topLevelKey", kind);
  65. if (!local)
  66. width--; // 1 part == No n distributed / Monolithic key
  67. if (width > container.queryJob().querySlaves())
  68. throw MakeActivityException(this, 0, "Unsupported: keydiff(%s, %s) - Cannot diff a key that's wider(%d) than the target cluster size(%d)", originalIndexFile->queryLogicalName(), newIndexFile->queryLogicalName(), width, container.queryJob().querySlaves());
  69. StringBuffer defaultCluster;
  70. if (getDefaultStoragePlane(defaultCluster))
  71. clusters.append(defaultCluster);
  72. IArrayOf<IGroup> groups;
  73. fillClusterArray(container.queryJob(), outputName, clusters, groups);
  74. patchDesc.setown(queryThorFileManager().create(container.queryJob(), outputName, clusters, groups, 0 != (KDPoverwrite & helper->getFlags()), 0, !local, width));
  75. patchDesc->queryProperties().setProp("@kind", "keydiff");
  76. }
  77. virtual void serializeSlaveData(MemoryBuffer &dst, unsigned slave)
  78. {
  79. if (slave < width) // if false - due to mismatch width fitting - fill in with a blank entry
  80. {
  81. dst.append(true);
  82. Owned<IPartDescriptor> originalPartDesc = originalDesc->getPart(slave);
  83. originalPartDesc->serialize(dst);
  84. Owned<IPartDescriptor> newIndexPartDesc = newIndexDesc->getPart(slave);
  85. newIndexPartDesc->serialize(dst);
  86. patchDesc->queryPart(slave)->serialize(dst);
  87. if (0 == slave)
  88. {
  89. if (!local)
  90. {
  91. dst.append(true);
  92. Owned<IPartDescriptor> originalTlkPartDesc = originalDesc->getPart(originalDesc->numParts()-1);
  93. originalTlkPartDesc->serialize(dst);
  94. Owned<IPartDescriptor> newIndexTlkPartDesc = newIndexDesc->getPart(newIndexDesc->numParts()-1);
  95. newIndexTlkPartDesc->serialize(dst);
  96. patchDesc->queryPart(patchDesc->numParts()-1)->serialize(dst);
  97. }
  98. else
  99. dst.append(false);
  100. }
  101. }
  102. else
  103. dst.append(false); // no part
  104. }
  105. virtual void slaveDone(size32_t slaveIdx, MemoryBuffer &mb)
  106. {
  107. if (mb.length()) // if 0 implies aborted out from this slave.
  108. {
  109. offset_t size;
  110. mb.read(size);
  111. CDateTime modifiedTime(mb);
  112. IPartDescriptor *partDesc = patchDesc->queryPart(slaveIdx);
  113. IPropertyTree &props = partDesc->queryProperties();
  114. StringBuffer timeStr;
  115. modifiedTime.getString(timeStr);
  116. props.setProp("@modified", timeStr.str());
  117. unsigned crc;
  118. mb.read(crc);
  119. props.setPropInt64("@fileCrc", crc);
  120. if (!local && 0 == slaveIdx)
  121. {
  122. IPartDescriptor *partDesc = patchDesc->queryPart(patchDesc->numParts()-1);
  123. IPropertyTree &props = partDesc->queryProperties();
  124. mb.read(size);
  125. props.setPropInt64("@size", size);
  126. CDateTime modifiedTime(mb);
  127. StringBuffer timeStr;
  128. modifiedTime.getString(timeStr);
  129. props.setProp("@modified", timeStr.str());
  130. if (copyTlk)
  131. {
  132. Owned<IPartDescriptor> tlkDesc = newIndexDesc->getPart(newIndexDesc->numParts()-1);
  133. if (partDesc->getCrc(crc))
  134. props.setPropInt64("@fileCrc", crc);
  135. props.setProp("@diffFormat", "copy");
  136. }
  137. else
  138. {
  139. mb.read(crc);
  140. props.setPropInt64("@fileCrc", crc);
  141. props.setProp("@diffFormat", "diffV1");
  142. }
  143. }
  144. }
  145. }
  146. virtual void done()
  147. {
  148. Owned<IWorkUnit> wu = &container.queryJob().queryWorkUnit().lock();
  149. Owned<IWUResult> r = wu->updateResultBySequence(helper->getSequence());
  150. //Do not mark the result as calculated - because the patch file isn't a valid result
  151. r->setResultLogicalName(outputName);
  152. r.clear();
  153. wu.clear();
  154. IPropertyTree &patchProps = patchDesc->queryProperties();
  155. if (0 != (helper->getFlags() & KDPexpires))
  156. setExpiryTime(patchProps, helper->getExpiryDays());
  157. IPropertyTree &originalProps = originalDesc->queryProperties();;
  158. if (originalProps.queryProp("ECL"))
  159. patchProps.setProp("ECL", originalProps.queryProp("ECL"));
  160. if (originalProps.getPropBool("@local"))
  161. patchProps.setPropBool("@local", true);
  162. container.queryTempHandler()->registerFile(outputName, container.queryOwner().queryGraphId(), 0, false, WUFileStandard, &clusters);
  163. Owned<IDistributedFile> patchFile;
  164. // set part sizes etc
  165. queryThorFileManager().publish(container.queryJob(), outputName, *patchDesc, &patchFile);
  166. try { // set file size
  167. if (patchFile) {
  168. __int64 fs = patchFile->getFileSize(true,false);
  169. if (fs!=-1)
  170. patchFile->queryAttributes().setPropInt64("@size",fs);
  171. }
  172. }
  173. catch (IException *e) {
  174. EXCLOG(e,"keydiff setting file size");
  175. e->Release();
  176. }
  177. // Add a new 'Patch' description to the secondary key.
  178. DistributedFilePropertyLock lock(newIndexFile);
  179. IPropertyTree &fileProps = lock.queryAttributes();
  180. StringBuffer path("Patch[@name=\"");
  181. path.append(outputName).append("\"]");
  182. IPropertyTree *patch = fileProps.queryPropTree(path.str());
  183. if (!patch) patch = fileProps.addPropTree("Patch", createPTree());
  184. patch->setProp("@name", outputName);
  185. unsigned checkSum;
  186. if (patchFile->getFileCheckSum(checkSum))
  187. patch->setPropInt64("@checkSum", checkSum);
  188. IPropertyTree *index = patch->setPropTree("Index", createPTree());
  189. index->setProp("@name", originalIndexFile->queryLogicalName());
  190. if (originalIndexFile->getFileCheckSum(checkSum))
  191. index->setPropInt64("@checkSum", checkSum);
  192. CMasterActivity::done();
  193. }
  194. virtual void preStart(size32_t parentExtractSz, const byte *parentExtract)
  195. {
  196. CMasterActivity::preStart(parentExtractSz, parentExtract);
  197. IHThorKeyDiffArg *helper = (IHThorKeyDiffArg *) queryHelper();
  198. if (0==(KDPoverwrite & helper->getFlags()))
  199. {
  200. if (KDPvaroutputname & helper->getFlags()) return;
  201. OwnedRoxieString outputHelperName(helper->getOutputName());
  202. Owned<IDistributedFile> file = queryThorFileManager().lookup(container.queryJob(), outputHelperName, false, true, false, container.activityIsCodeSigned());
  203. if (file)
  204. throw MakeActivityException(this, TE_OverwriteNotSpecified, "Cannot write %s, file already exists (missing OVERWRITE attribute?)", file->queryLogicalName());
  205. }
  206. }
  207. virtual void kill()
  208. {
  209. CMasterActivity::kill();
  210. originalIndexFile.clear();
  211. newIndexFile.clear();
  212. }
  213. };
  214. CActivityBase *createKeyDiffActivityMaster(CMasterGraphElement *container)
  215. {
  216. return new CKeyDiffMaster(container);
  217. }