|
@@ -129,6 +129,7 @@ void usage(const char *exe)
|
|
printf(" wuidcompress <wildcard> <type> -- scan workunits that match <wildcard> and compress resources of <type>\n");
|
|
printf(" wuidcompress <wildcard> <type> -- scan workunits that match <wildcard> and compress resources of <type>\n");
|
|
printf(" wuiddecompress <wildcard> <type> -- scan workunits that match <wildcard> and decompress resources of <type>\n");
|
|
printf(" wuiddecompress <wildcard> <type> -- scan workunits that match <wildcard> and decompress resources of <type>\n");
|
|
printf(" xmlsize <filename> [<percentage>] -- analyse size usage in xml file, display individual items above 'percentage' \n");
|
|
printf(" xmlsize <filename> [<percentage>] -- analyse size usage in xml file, display individual items above 'percentage' \n");
|
|
|
|
+ printf(" migratefiles <src-group> <target-group> [<filemask>] [dryrun] [createmaps] [listonly] [verbose]\n");
|
|
printf("\n");
|
|
printf("\n");
|
|
printf("Common options\n");
|
|
printf("Common options\n");
|
|
printf(" server=<dali-server-ip> -- server ip\n");
|
|
printf(" server=<dali-server-ip> -- server ip\n");
|
|
@@ -2808,6 +2809,285 @@ static void validateStore(bool fix, bool deleteFiles, bool verbose)
|
|
|
|
|
|
//=============================================================================
|
|
//=============================================================================
|
|
|
|
|
|
|
|
+static void migrateFiles(const char *srcGroup, const char *tgtGroup, const char *filemask, const char *_options)
|
|
|
|
+{
|
|
|
|
+ if (strieq(srcGroup, tgtGroup))
|
|
|
|
+ throw makeStringExceptionV(0, "source and target cluster groups cannot be the same! cluster = %s", srcGroup);
|
|
|
|
+
|
|
|
|
+ enum class mg_options : unsigned { nop, createmaps=1, listonly=2, dryrun=4, verbose=8};
|
|
|
|
+
|
|
|
|
+ StringArray options;
|
|
|
|
+ options.appendList(_options, ",");
|
|
|
|
+ mg_options opts = mg_options::nop;
|
|
|
|
+ ForEachItemIn(o, options)
|
|
|
|
+ {
|
|
|
|
+ const char *opt = options.item(o);
|
|
|
|
+ if (strieq("CREATEMAPS", opt))
|
|
|
|
+ opts = (mg_options)((unsigned)opts | (unsigned)mg_options::createmaps);
|
|
|
|
+ else if (strieq("LISTONLY", opt))
|
|
|
|
+ opts = (mg_options)((unsigned)opts | (unsigned)mg_options::listonly);
|
|
|
|
+ else if (strieq("DRYRUN", opt))
|
|
|
|
+ opts = (mg_options)((unsigned)opts | (unsigned)mg_options::dryrun);
|
|
|
|
+ else if (strieq("VERBOSE", opt))
|
|
|
|
+ opts = (mg_options)((unsigned)opts | (unsigned)mg_options::verbose);
|
|
|
|
+ else
|
|
|
|
+ WARNLOG("Unknown option: %s", opt);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /*
|
|
|
|
+ * CMatchScanner scans logical files, looking for files that are in the source group
|
|
|
|
+ * and matching against the logical file names against filemask.
|
|
|
|
+ * Then (depending on options) manipulates the meta data to point to new target group
|
|
|
|
+ * and outputs a file per node of the source group, with a list of all matching
|
|
|
|
+ * physical files in the format: srcIP,dstIP,physical file
|
|
|
|
+ */
|
|
|
|
+ class CMatchScanner : public CSDSFileScanner
|
|
|
|
+ {
|
|
|
|
+ StringAttr srcGroup, tgtGroup;
|
|
|
|
+ mg_options options;
|
|
|
|
+ StringBuffer tgtClusterGroupText;
|
|
|
|
+ Owned<IGroup> srcClusterGroup, tgtClusterGroup;
|
|
|
|
+ IPointerArrayOf<IFileIOStream> fileLists;
|
|
|
|
+ unsigned matchingFiles = 0;
|
|
|
|
+ Linked<IRemoteConnection> conn;
|
|
|
|
+ StringAttr filemask;
|
|
|
|
+ bool wild = false;
|
|
|
|
+ unsigned srcClusterSize = 0;
|
|
|
|
+ unsigned tgtClusterSize = 0;
|
|
|
|
+
|
|
|
|
+ bool mgOpt(mg_options o)
|
|
|
|
+ {
|
|
|
|
+ return ((unsigned)o & (unsigned)options);
|
|
|
|
+ }
|
|
|
|
+ IFileIOStream *getFileIOStream(unsigned p)
|
|
|
|
+ {
|
|
|
|
+ while (fileLists.ordinality()<=p)
|
|
|
|
+ fileLists.append(nullptr);
|
|
|
|
+
|
|
|
|
+ Linked<IFileIOStream> stream = fileLists.item(p);
|
|
|
|
+ if (nullptr == stream)
|
|
|
|
+ {
|
|
|
|
+ VStringBuffer filePartList("fileparts%u_%s_%u.lst", GetCurrentProcessId(), srcGroup.get(), p);
|
|
|
|
+ Owned<IFile> iFile = createIFile(filePartList);
|
|
|
|
+ Owned<IFileIO> iFileIO = iFile->open(IFOcreate);
|
|
|
|
+ if (!iFileIO)
|
|
|
|
+ throw makeStringExceptionV(0, "Failed to open: %s", filePartList.str());
|
|
|
|
+ stream.setown(createBufferedIOStream(iFileIO));
|
|
|
|
+ fileLists.replace(stream.getLink(), p);
|
|
|
|
+ }
|
|
|
|
+ return stream.getClear();
|
|
|
|
+ }
|
|
|
|
+ unsigned find(IGroup *group, const IpAddress &ip) const
|
|
|
|
+ {
|
|
|
|
+ unsigned c = group->ordinality();
|
|
|
|
+ for (unsigned i=0; i<c; i++)
|
|
|
|
+ {
|
|
|
|
+ const IpAddress &nodeIP = group->queryNode(i).endpoint();
|
|
|
|
+ if (ip.ipequals(nodeIP))
|
|
|
|
+ return i;
|
|
|
|
+ }
|
|
|
|
+ return NotFound;
|
|
|
|
+ }
|
|
|
|
+ public:
|
|
|
|
+ CMatchScanner(const char *_srcGroup, const char *_tgtGroup, mg_options _options) : srcGroup(_srcGroup), tgtGroup(_tgtGroup), options(_options)
|
|
|
|
+ {
|
|
|
|
+ srcClusterGroup.setown(queryNamedGroupStore().lookup(srcGroup));
|
|
|
|
+ if (!srcClusterGroup)
|
|
|
|
+ throw makeStringExceptionV(0, "Could not find source cluster group: %s", _srcGroup);
|
|
|
|
+ tgtClusterGroup.setown(queryNamedGroupStore().lookup(tgtGroup));
|
|
|
|
+ if (!tgtClusterGroup)
|
|
|
|
+ throw makeStringExceptionV(0, "Could not find target cluster group: %s", _tgtGroup);
|
|
|
|
+
|
|
|
|
+ srcClusterSize = srcClusterGroup->ordinality();
|
|
|
|
+ tgtClusterSize = tgtClusterGroup->ordinality();
|
|
|
|
+ if (tgtClusterSize>srcClusterSize)
|
|
|
|
+ throw makeStringExceptionV(0, "Unsupported - target cluster is wider than source (target size=%u, source size=%u", tgtClusterSize, srcClusterSize);
|
|
|
|
+ if (0 != (srcClusterSize%tgtClusterSize))
|
|
|
|
+ throw makeStringExceptionV(0, "Unsupported - target cluster must be a factor of source cluster size (target size=%u, source size=%u", tgtClusterSize, srcClusterSize);
|
|
|
|
+
|
|
|
|
+ tgtClusterGroup->getText(tgtClusterGroupText);
|
|
|
|
+ }
|
|
|
|
+ virtual bool checkFileOk(IPropertyTree &file, const char *filename) override
|
|
|
|
+ {
|
|
|
|
+ const char *group = file.queryProp("@group");
|
|
|
|
+ if (!group)
|
|
|
|
+ {
|
|
|
|
+ if (mgOpt(mg_options::verbose))
|
|
|
|
+ PROGLOG("No group defined - filename=%s, mask=%s, srcGroup=%s", filename, filemask.get(), srcGroup.get());
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ else if (nullptr == strstr(file.queryProp("@group"), srcGroup)) // crude match, could be rejected in processFile
|
|
|
|
+ {
|
|
|
|
+ if (mgOpt(mg_options::verbose))
|
|
|
|
+ PROGLOG("GROUP-MISMATCH - filename=%s, mask=%s, srcGroup=%s, file group=%s", filename, filemask.get(), srcGroup.get(), group);
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ else if (wild)
|
|
|
|
+ {
|
|
|
|
+ if (WildMatch(filename, filemask, false))
|
|
|
|
+ {
|
|
|
|
+ if (mgOpt(mg_options::verbose))
|
|
|
|
+ PROGLOG("WILD-MISMATCH - filename=%s, mask=%s, srcGroup=%s, file group=%s", filename, filemask.get(), srcGroup.get(), group);
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ else if (strieq(filename, filemask))
|
|
|
|
+ return true;
|
|
|
|
+ if (mgOpt(mg_options::verbose))
|
|
|
|
+ PROGLOG("EXACT-MISMATCH - filename=%s, mask=%s, srcGroup=%s, file group=%s", filename, filemask.get(), srcGroup.get(), group);
|
|
|
|
+ return false;
|
|
|
|
+ }
|
|
|
|
+ virtual bool checkScopeOk(const char *scopename) override
|
|
|
|
+ {
|
|
|
|
+ if (mgOpt(mg_options::verbose))
|
|
|
|
+ PROGLOG("Processing scope %s", scopename);
|
|
|
|
+ return true;
|
|
|
|
+ }
|
|
|
|
+ virtual void processFile(IPropertyTree &root, StringBuffer &name) override
|
|
|
|
+ {
|
|
|
|
+ try
|
|
|
|
+ {
|
|
|
|
+ bool doCommit = false;
|
|
|
|
+ StringBuffer _tgtClusterGroupText = tgtClusterGroupText;
|
|
|
|
+
|
|
|
|
+ Owned<IFileDescriptor> fileDesc = deserializeFileDescriptorTree(&root, &queryNamedGroupStore());
|
|
|
|
+ unsigned numClusters = fileDesc->numClusters();
|
|
|
|
+ for (unsigned clusterNum=0; clusterNum<numClusters; clusterNum++)
|
|
|
|
+ {
|
|
|
|
+ StringBuffer srcFileGroup;
|
|
|
|
+ fileDesc->getClusterGroupName(clusterNum, srcFileGroup);
|
|
|
|
+
|
|
|
|
+ StringBuffer srcFileGroupName, srcFileGroupRange;
|
|
|
|
+ if (!decodeChildGroupName(srcFileGroup, srcFileGroupName, srcFileGroupRange))
|
|
|
|
+ srcFileGroupName.append(srcFileGroup);
|
|
|
|
+ if (streq(srcFileGroupName, srcGroup))
|
|
|
|
+ {
|
|
|
|
+ IGroup *srcFileClusterGroup = fileDesc->queryClusterGroup(clusterNum);
|
|
|
|
+ unsigned srcFileClusterGroupWidth = srcFileClusterGroup->ordinality();
|
|
|
|
+
|
|
|
|
+ StringBuffer _tgtGroup(tgtGroup);
|
|
|
|
+ unsigned groupOffset = NotFound;
|
|
|
|
+ if (srcFileGroupRange.length())
|
|
|
|
+ {
|
|
|
|
+ SocketEndpointArray epas;
|
|
|
|
+ UnsignedArray dstPositions;
|
|
|
|
+ Owned<INodeIterator> nodeIter = srcFileClusterGroup->getIterator();
|
|
|
|
+ ForEach(*nodeIter)
|
|
|
|
+ {
|
|
|
|
+ const IpAddress &ip = nodeIter->query().endpoint();
|
|
|
|
+ unsigned srcRelPos = find(srcClusterGroup, ip);
|
|
|
|
+ if (NotFound == groupOffset)
|
|
|
|
+ groupOffset = srcRelPos;
|
|
|
|
+ unsigned dstRelPos = srcRelPos % tgtClusterSize;
|
|
|
|
+ dstPositions.append(dstRelPos);
|
|
|
|
+ }
|
|
|
|
+ StringBuffer rangeText;
|
|
|
|
+ encodeChildGroupRange(dstPositions, rangeText);
|
|
|
|
+ _tgtGroup.append(rangeText);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ groupOffset = 0;
|
|
|
|
+ unsigned numParts = fileDesc->numParts();
|
|
|
|
+ PROGLOG("Processing file %s (width=%u), cluster group=%s (%u of %u), new group = %s", name.str(), numParts, srcFileGroup.str(), clusterNum+1, numClusters, _tgtGroup.str());
|
|
|
|
+ if (!mgOpt(mg_options::listonly))
|
|
|
|
+ {
|
|
|
|
+ if (!mgOpt(mg_options::dryrun))
|
|
|
|
+ {
|
|
|
|
+ doCommit = true;
|
|
|
|
+ VStringBuffer clusterXPath("Cluster[%u]", clusterNum+1);
|
|
|
|
+ IPropertyTree *cluster = root.queryPropTree(clusterXPath);
|
|
|
|
+ root.setProp("@group", _tgtGroup);
|
|
|
|
+ if (cluster)
|
|
|
|
+ cluster->setProp("@name", _tgtGroup);
|
|
|
|
+ else
|
|
|
|
+ WARNLOG("No Cluster found for file: %s", name.str());
|
|
|
|
+ }
|
|
|
|
+ if (mgOpt(mg_options::createmaps))
|
|
|
|
+ {
|
|
|
|
+ for (unsigned partNum=0; partNum<numParts; partNum++)
|
|
|
|
+ {
|
|
|
|
+ unsigned r = partNum % srcFileClusterGroupWidth;
|
|
|
|
+ const SocketEndpoint &srcEp = srcFileClusterGroup->queryNode(r).endpoint();
|
|
|
|
+ unsigned relPos = find(srcClusterGroup, srcEp);
|
|
|
|
+ unsigned dstPos = (partNum+groupOffset) % tgtClusterSize;
|
|
|
|
+ const SocketEndpoint &tgtEp = tgtClusterGroup->queryNode(dstPos).endpoint();
|
|
|
|
+
|
|
|
|
+ // output srcIP, dstIP, path/file-part-name >> script<N>.lst
|
|
|
|
+
|
|
|
|
+ Owned<IFileIOStream> iFileIOStream = getFileIOStream(relPos+1);
|
|
|
|
+
|
|
|
|
+ StringBuffer outputLine;
|
|
|
|
+ srcEp.getIpText(outputLine);
|
|
|
|
+ outputLine.append(",");
|
|
|
|
+ tgtEp.getIpText(outputLine);
|
|
|
|
+ outputLine.append(",");
|
|
|
|
+
|
|
|
|
+ IPartDescriptor *part = fileDesc->queryPart(partNum);
|
|
|
|
+ StringBuffer filePath;
|
|
|
|
+ part->getPath(filePath);
|
|
|
|
+
|
|
|
|
+ outputLine.append(filePath);
|
|
|
|
+ outputLine.newline();
|
|
|
|
+
|
|
|
|
+ iFileIOStream->write(outputLine.length(), outputLine.str());
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ ++matchingFiles;
|
|
|
|
+ if (doCommit)
|
|
|
|
+ conn->commit(); // NB: the scanner rolls back any changes, mainly to reduce cost/exposure to previously lazy fetched scope branches
|
|
|
|
+ }
|
|
|
|
+ catch (IException *e)
|
|
|
|
+ {
|
|
|
|
+ VStringBuffer errorMsg("Failed to process file : %s", name.str());
|
|
|
|
+ EXCLOG(e, errorMsg.str());
|
|
|
|
+ e->Release();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ unsigned scan(IRemoteConnection *_conn, const char *_filemask, bool includefiles=true, bool includesuper=false)
|
|
|
|
+ {
|
|
|
|
+ filemask.set(_filemask);
|
|
|
|
+ conn.set(_conn);
|
|
|
|
+ wild = containsWildcard(_filemask);
|
|
|
|
+ CSDSFileScanner::scan(_conn, includefiles, includesuper);
|
|
|
|
+ return matchingFiles;
|
|
|
|
+ }
|
|
|
|
+ } scanner(srcGroup, tgtGroup, opts);
|
|
|
|
+
|
|
|
|
+ IUserDescriptor *user = nullptr;
|
|
|
|
+ Owned<IRemoteConnection> conn = querySDS().connect("/Files", myProcessSession(), 0, 100000);
|
|
|
|
+ bool success=false;
|
|
|
|
+ unsigned matchingFiles=0;
|
|
|
|
+ try
|
|
|
|
+ {
|
|
|
|
+ matchingFiles = scanner.scan(conn, filemask, true, false);
|
|
|
|
+ success=true;
|
|
|
|
+ }
|
|
|
|
+ catch (IException *e)
|
|
|
|
+ {
|
|
|
|
+ EXCLOG(e, nullptr);
|
|
|
|
+ e->Release();
|
|
|
|
+ }
|
|
|
|
+ if (!success)
|
|
|
|
+ {
|
|
|
|
+ WARNLOG("Failed to make changes");
|
|
|
|
+ conn->rollback();
|
|
|
|
+ }
|
|
|
|
+ else if ((unsigned)opts & (unsigned)mg_options::dryrun)
|
|
|
|
+ {
|
|
|
|
+ conn->rollback();
|
|
|
|
+ WARNLOG("Dry-run, no changes committed. %u files matched", matchingFiles);
|
|
|
|
+ }
|
|
|
|
+ else
|
|
|
|
+ PROGLOG("Committed changes: %u files changed", matchingFiles);
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+//=============================================================================
|
|
|
|
+
|
|
|
|
+
|
|
|
|
|
|
void testThorRunningWUs()
|
|
void testThorRunningWUs()
|
|
{
|
|
{
|
|
@@ -3215,6 +3495,24 @@ int main(int argc, char* argv[])
|
|
dumpStats(params.item(1), params.item(2), params.item(3), params.item(4), params.item(5), params.item(6), nullptr, csv);
|
|
dumpStats(params.item(1), params.item(2), params.item(3), params.item(4), params.item(5), params.item(6), nullptr, csv);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
+ else if (stricmp(cmd, "migratefiles") == 0)
|
|
|
|
+ {
|
|
|
|
+ CHECKPARAMS(2, 7);
|
|
|
|
+ const char *srcGroup = params.item(1);
|
|
|
|
+ const char *dstGroup = params.item(2);
|
|
|
|
+ const char *filemask = "*";
|
|
|
|
+ StringBuffer options;
|
|
|
|
+ if (params.isItem(3))
|
|
|
|
+ {
|
|
|
|
+ filemask = params.item(3);
|
|
|
|
+ unsigned arg=4;
|
|
|
|
+ StringArray optArray;
|
|
|
|
+ while (arg<params.ordinality())
|
|
|
|
+ optArray.append(params.item(arg++));
|
|
|
|
+ optArray.getString(options, ",");
|
|
|
|
+ }
|
|
|
|
+ migrateFiles(srcGroup, dstGroup, filemask, options);
|
|
|
|
+ }
|
|
else
|
|
else
|
|
ERRLOG("Unknown command %s",cmd);
|
|
ERRLOG("Unknown command %s",cmd);
|
|
}
|
|
}
|