瀏覽代碼

HPCC-8803 Directories hpcc-data2 and hpcc-data3 don't get used

Roxie should specify for each slave/channel what replication level it
represents, and then pick up from the standard directory locations
information.

Extend the named group store to retrieve information about the type of a
group.

Use per-channel caches to minimize work of file lookups (as calculating remote
names - even when they are then not needed - can get expensive).

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 年之前
父節點
當前提交
1973a71469

+ 66 - 34
dali/base/dadfs.cpp

@@ -3628,7 +3628,7 @@ public:
         if (newbasedir)
             diroverride = newbasedir;
 
-        const char *myBase = queryBaseDirectory(false,os);
+        const char *myBase = queryBaseDirectory(0, os);
         StringBuffer baseDir, newPath;
         makePhysicalPartName(logicalName.get(), 0, 0, newPath, false, os, diroverride);
         if (!getBase(directory, newPath, baseDir))
@@ -6069,16 +6069,48 @@ public:
 
 #define GROUP_CACHE_INTERVAL (1000*60)
 
+static const char *translateGroupType(GroupType groupType)
+{
+    switch (groupType)
+    {
+        case grp_thor:
+            return "Thor";
+        case grp_roxie:
+            return "Roxie";
+        case grp_roxiefarm:
+            return "RoxieFarm";
+        case grp_hthor:
+            return "hthor";
+        default:
+            return NULL;
+    }
+}
+
+static GroupType translateGroupType(const char *groupType)
+{
+    if (strieq(groupType, "Thor"))
+        return grp_thor;
+    else if (strieq(groupType, "Roxie"))
+        return grp_roxie;
+    else if (strieq(groupType, "RoxieFarm"))
+        return grp_roxiefarm;
+    else if (strieq(groupType, "hthor"))
+        return grp_hthor;
+    else
+        return grp_unknown;
+}
+
 class CNamedGroupCacheEntry: public CInterface
 {
 public:
     Linked<IGroup> group;
     StringAttr name;
-    StringAttr groupdir;
+    StringAttr groupDir;
     unsigned cachedtime;
+    GroupType groupType;
 
-    CNamedGroupCacheEntry(IGroup *_group, const char *_name, const char *_dir)
-    : group(_group), name(_name), groupdir(_dir)
+    CNamedGroupCacheEntry(IGroup *_group, const char *_name, const char *_dir, GroupType _groupType)
+    : group(_group), name(_name), groupDir(_dir), groupType(_groupType)
     {
         cachedtime = msTick();
     }
@@ -6098,7 +6130,7 @@ public:
         defaultTimeout = INFINITE;
     }
 
-    IGroup *dolookup(const char *logicalgroupname,IRemoteConnection *conn, StringBuffer *dirret)
+    IGroup *dolookup(const char *logicalgroupname,IRemoteConnection *conn, StringBuffer *dirret, GroupType &groupType)
     {
         SocketEndpointArray epa;
         StringBuffer gname(logicalgroupname);
@@ -6149,6 +6181,7 @@ public:
             logicalgroupname = gname.str();
         }
         StringAttr groupdir;
+        GroupType type;
         bool cached = false;
         unsigned timeNow = msTick();
         {
@@ -6166,12 +6199,14 @@ public:
                     if (range.length()==0)
                     {
                         if (dirret)
-                            dirret->append(entry.groupdir);
+                            dirret->append(entry.groupDir);
+                        groupType = entry.groupType;
                         return entry.group.getLink();
                     }
                     // there is a range so copy to epa
                     entry.group->getSocketEndpoints(epa);
-                    groupdir.set(entry.groupdir);
+                    groupdir.set(entry.groupDir);
+                    type = entry.groupType;
                     break;
                 }
             }
@@ -6185,7 +6220,7 @@ public:
                 s+=2;
                 if (*s) {
                     Owned<INode> dali = createINode(eps.str());
-                    if (!dali || !getRemoteGroup(dali, s, FOREIGN_DALI_TIMEOUT, groupdir, epa))
+                    if (!dali || !getRemoteGroup(dali, s, FOREIGN_DALI_TIMEOUT, groupdir, type, epa))
                         return NULL;
                 }
             }
@@ -6207,6 +6242,7 @@ public:
             if (!pt)
                 return NULL;
             groupdir.set(pt->queryProp("@dir"));
+            type = translateGroupType(pt->queryProp("@kind"));
             Owned<IPropertyTreeIterator> pe2 = pt->getElements("Node");
             ForEach(*pe2) {
                 SocketEndpoint ep(pe2->query().queryProp("@ip"));
@@ -6217,7 +6253,7 @@ public:
         if (!cached)
         {
             CriticalBlock block(cachesect);
-            cache.append(*new CNamedGroupCacheEntry(ret, gname, groupdir));
+            cache.append(*new CNamedGroupCacheEntry(ret, gname, groupdir, type));
         }
         if (range.length())
         {
@@ -6269,17 +6305,19 @@ public:
         }
         if (dirret)
             dirret->append(groupdir);
+        groupType = type;
         return ret.getClear();
     }
 
     IGroup *lookup(const char *logicalgroupname)
     {
-        return dolookup(logicalgroupname,NULL,NULL);
+        GroupType dummy;
+        return dolookup(logicalgroupname, NULL, NULL, dummy);
     }
 
-    IGroup *lookup(const char *logicalgroupname, StringBuffer &dir)
+    IGroup *lookup(const char *logicalgroupname, StringBuffer &dir, GroupType &groupType)
     {
-        return dolookup(logicalgroupname,NULL,&dir);
+        return dolookup(logicalgroupname, NULL, &dir, groupType);
     }
 
     INamedGroupIterator *getIterator()
@@ -6346,7 +6384,7 @@ public:
         lname.append(name);
     }
     
-    void add(const char *logicalgroupname,IGroup *group,bool cluster,const char *dir)
+    void add(const char *logicalgroupname, IGroup *group, bool cluster, const char *dir, GroupType groupType)
     {
         StringBuffer name(logicalgroupname);
         name.toLowerCase();
@@ -6360,13 +6398,13 @@ public:
             CriticalBlock block(cachesect);
             cache.kill();
             if (group)
-                cache.append(*new CNamedGroupCacheEntry(group, name.str(), dir));
+                cache.append(*new CNamedGroupCacheEntry(group, name.str(), dir, groupType));
         }
     }
 
     void remove(const char *logicalgroupname)
     {
-        add(logicalgroupname,NULL,false,NULL);
+        add(logicalgroupname, NULL, false, NULL, grp_unknown);
     }
 
     bool find(IGroup *grp, StringBuffer &gname, bool add)
@@ -6438,7 +6476,8 @@ public:
     }
 
 private:
-    bool getRemoteGroup(const INode *foreigndali, const char *gname, unsigned foreigndalitimeout, StringAttr &groupdir, SocketEndpointArray &epa)
+    bool getRemoteGroup(const INode *foreigndali, const char *gname, unsigned foreigndalitimeout,
+                           StringAttr &groupdir, GroupType &type, SocketEndpointArray &epa)
     {
         StringBuffer lcname(gname);
         gname = lcname.trim().toLowerCase().str();
@@ -6465,6 +6504,7 @@ private:
         Owned<IPropertyTree> pt = createPTree(mb);
         Owned<IPropertyTreeIterator> pe = pt->getElements("Node");
         groupdir.set(pt->queryProp("@dir"));
+        type = translateGroupType(pt->queryProp("@kind"));
         ForEach(*pe) {
             SocketEndpoint ep(pe->query().queryProp("@ip"));
             epa.append(ep);
@@ -6486,7 +6526,8 @@ bool CNamedGroupIterator::match()
             const char *name = pe->query().queryProp("@name");
             if (!name||!*name)
                 return false;
-            Owned<IGroup> lgrp = groupStore->dolookup(name,conn,NULL);
+            GroupType dummy;
+            Owned<IGroup> lgrp = groupStore->dolookup(name, conn, NULL, dummy);
             if (lgrp) {
                 if (exactmatch)
                     return lgrp->equals(matchgroup);
@@ -7740,7 +7781,6 @@ class CInitGroups
         grp->setProp("@name", name);
     }
 
-    enum GroupType { grp_thor, grp_thorspares, grp_roxie, grp_roxiefarm, grp_hthor };
     IGroup *getGroupFromCluster(GroupType groupType, IPropertyTree &cluster)
     {
         SocketEndpointArray eps;
@@ -7787,24 +7827,20 @@ class CInitGroups
                 {
                     Owned<IPropertyTreeIterator> channels;
                     channels.setown(node.getElements("RoxieChannel"));
-                    unsigned j = 0;
-                    unsigned mindrive = (unsigned)-1;
+                    unsigned thisNodePrimaryChannel = 0;
                     ForEach(*channels) {
-                        unsigned k = channels->query().getPropInt("@number");
-                        const char * dir = channels->query().queryProp("@dataDirectory");
-                        unsigned d = dir?getPathDrive(dir):0;
-                        if (d<mindrive) {
-                            j = k;
-                            mindrive = d;
-                        }
+                        unsigned channel = channels->query().getPropInt("@number");
+                        unsigned level = channels->query().getPropInt("@level", 0);
+                        if (level == 0)  // level 0 means primary copy
+                            thisNodePrimaryChannel = channel;
                     }
-                    if (j==0) {
+                    if (thisNodePrimaryChannel==0) {
                         ERRLOG("Cannot construct roxie cluster %s, no channel for node",cluster.queryProp("@name"));
                         return NULL;
                     }
-                    while (eps.ordinality()<j)
+                    while (eps.ordinality()<thisNodePrimaryChannel)
                         eps.append(nullep);
-                    eps.item(j-1) = ep;
+                    eps.item(thisNodePrimaryChannel-1) = ep;
                     break;
                 }
                 case grp_thor:
@@ -7914,13 +7950,9 @@ class CInitGroups
                 realCluster = false;
                 break;
             case grp_roxie:
-                defDir = cluster.queryProp("@slaveDataDir");
-                if (!defDir||!*defDir)
-                    defDir = cluster.queryProp("@baseDataDir");
                 gname.append(cluster.queryProp("@name"));
                 break;
             case grp_roxiefarm:
-                defDir = cluster.queryProp("@dataDirectory");
                 break;
             default:
                 throwUnexpected();

+ 5 - 4
dali/base/dadfs.hpp

@@ -573,6 +573,8 @@ extern da_decl IDistributedFileDirectory &queryDistributedFileDirectory();
 
 // ==GROUP STORE=================================================================================================
 
+enum GroupType { grp_thor, grp_thorspares, grp_roxie, grp_roxiefarm, grp_hthor, grp_unknown };
+
 interface INamedGroupIterator: extends IInterface
 {
     virtual bool first() = 0;
@@ -585,16 +587,15 @@ interface INamedGroupIterator: extends IInterface
 
 interface INamedGroupStore: implements IGroupResolver
 {
-    
     virtual IGroup *lookup(const char *logicalgroupname) = 0;
     virtual INamedGroupIterator *getIterator() = 0;
-    virtual INamedGroupIterator *getIterator(IGroup *match,bool exact=false) = 0;
-    virtual void add(const char *logicalgroupname,IGroup *group,bool cluster=false, const char *dir=NULL) = 0;
+    virtual INamedGroupIterator *getIterator(IGroup *match, bool exact=false) = 0;
+    virtual void add(const char *logicalgroupname,IGroup *group, bool cluster=false, const char *dir=NULL, GroupType groupType = grp_unknown) = 0;
     virtual void remove(const char *logicalgroupname) = 0;
     virtual bool find(IGroup *grp, StringBuffer &lname, bool add=false) = 0;
     virtual void addUnique(IGroup *group,StringBuffer &lname,const char *dir=NULL) = 0;
     virtual void swapNode(const IpAddress &from, const IpAddress &to) = 0;
-    virtual IGroup *lookup(const char *logicalgroupname, StringBuffer &dir) = 0;
+    virtual IGroup *lookup(const char *logicalgroupname, StringBuffer &dir, GroupType &groupType) = 0;
     virtual unsigned setDefaultTimeout(unsigned timems) = 0;                                    // sets default timeout for SDS connections and locking                                                                                         // returns previous value
 
 };

+ 35 - 53
dali/base/dafdesc.cpp

@@ -212,7 +212,7 @@ void ClusterPartDiskMapSpec::fromProp(IPropertyTree *tree)
     // if directory is specified then must match default base to be default replicated
     StringBuffer dir;
     if (tree&&tree->getProp("@directory",dir)) {
-        const char * base = queryBaseDirectory(false,SepCharBaseOs(getPathSepChar(dir.str())));
+        const char * base = queryBaseDirectory(0, SepCharBaseOs(getPathSepChar(dir.str())));
         size32_t l = strlen(base);
         if ((memcmp(base,dir.str(),l)!=0)||((l!=dir.length())&&!isPathSepChar(dir.charAt(l))))
             defrep = 0;
@@ -525,7 +525,7 @@ public:
     void getBaseDir(StringBuffer &basedir,DFD_OS os)
     {
         if (mspec.defaultBaseDir.isEmpty())  // assume thor default
-            basedir.append(queryBaseDirectory(false,os));
+            basedir.append(queryBaseDirectory(0, os));
         else
             basedir.append(mspec.defaultBaseDir);
     }
@@ -533,7 +533,7 @@ public:
     void getReplicateDir(StringBuffer &basedir,DFD_OS os)
     {
         if (mspec.defaultReplicateDir.isEmpty())  // assume thor default
-            basedir.append(queryBaseDirectory(true,os));
+            basedir.append(queryBaseDirectory(1, os));
         else
             basedir.append(mspec.defaultReplicateDir);
     }
@@ -1523,7 +1523,7 @@ public:
             if (!sc)
                 sc = getPathSepChar(dirname);
             StringBuffer tmp;
-            tmp.append(queryBaseDirectory(false,SepCharBaseOs(sc))).append(sc).append(s);
+            tmp.append(queryBaseDirectory(0, SepCharBaseOs(sc))).append(sc).append(s);
             directory.set(tmp.str());
         }
         else
@@ -2179,11 +2179,8 @@ inline bool validFNameChar(char c)
     return (c>=32 && c<127 && !strchr(invalids, c));
 }
 
-static StringAttr winbasedir;
-static StringAttr winreplicatedir;
-static StringAttr unixbasedir;
-static StringAttr unixreplicatedir;
-
+static StringAttr winbasedirs[MAX_REPLICATION_LEVELS];
+static StringAttr unixbasedirs[MAX_REPLICATION_LEVELS];
 
 static StringAttr defaultpartmask("$L$._$P$_of_$N$");
 
@@ -2197,38 +2194,37 @@ void loadDefaultBases()
     ldbDone = true;
     // assumed default first thor
     // first set 
-    if (winreplicatedir.isEmpty())
-        winreplicatedir.set("d:\\thordata");
-    if (winbasedir.isEmpty())
-        winbasedir.set("c:\\thordata");
+    if (winbasedirs[1].isEmpty())
+        winbasedirs[1].set("d:\\thordata");
+    if (winbasedirs[0].isEmpty())
+        winbasedirs[0].set("c:\\thordata");
     SessionId mysessid = myProcessSession();
-    if (mysessid&&(unixreplicatedir.isEmpty()||unixbasedir.isEmpty())) {
+    if (mysessid && (unixbasedirs[1].isEmpty() || unixbasedirs[0].isEmpty())) {
         Owned<IRemoteConnection> conn = querySDS().connect("/Environment/Software/Directories", mysessid, RTM_LOCK_READ, SDS_CONNECT_TIMEOUT);
         if (conn) {
             IPropertyTree* dirs = conn->queryRoot();
             StringBuffer dirout;
-            if (unixreplicatedir.isEmpty())
+            if (unixbasedirs[1].isEmpty())
                 if (getConfigurationDirectory(dirs,"mirror", "thor",
                     "mythor",   // NB this is dummy value should really get 1st thor (but actually hopefully not used anyway)
                     dirout))
-                    unixreplicatedir.set(dirout.str());
-            if (unixbasedir.isEmpty())
+                    unixbasedirs[1].set(dirout.str());
+            if (unixbasedirs[0].isEmpty())
                 if (getConfigurationDirectory(dirs,"data", "thor",
                     "mythor",   // NB this is dummy value should really get 1st thor (but actually hopefully not used anyway)
                     dirout.clear()))
-                    unixbasedir.set(dirout.str());
+                    unixbasedirs[0].set(dirout.str());
 
         }
     }
-    if (unixreplicatedir.isEmpty())
-        unixreplicatedir.set("/d$/thordata");
-    if (unixbasedir.isEmpty())
-        unixbasedir.set("/c$/thordata");
-
+    if (unixbasedirs[0].isEmpty())
+        unixbasedirs[0].set("/var/lib/HPCCSystems/hpcc-data/thor");
+    if (unixbasedirs[1].isEmpty())
+        unixbasedirs[1].set("/var/lib/HPCCSystems/hpcc-mirror/thor");
 }
 
 
-const char *queryBaseDirectory(bool replicate,DFD_OS os)
+const char *queryBaseDirectory(unsigned replicateLevel, DFD_OS os)
 {
     if (os==DFD_OSdefault)
 #ifdef _WIN32
@@ -2236,17 +2232,19 @@ const char *queryBaseDirectory(bool replicate,DFD_OS os)
 #else
         os = DFD_OSunix;
 #endif
+    assertex(replicateLevel < MAX_REPLICATION_LEVELS);
     loadDefaultBases();
-    switch (os) {
+    switch (os)
+    {
     case DFD_OSwindows:
-        return replicate?winreplicatedir.get():winbasedir.get();
+        return winbasedirs[replicateLevel];
     case DFD_OSunix:
-        return replicate?unixreplicatedir.get():unixbasedir.get();
+        return unixbasedirs[replicateLevel];
     }
     return NULL;
 }
 
-void setBaseDirectory(const char * dir,bool replicate,DFD_OS os)
+void setBaseDirectory(const char * dir, unsigned replicateLevel, DFD_OS os)
 {
     // 2 possibilities
     // either its an absolute path
@@ -2257,35 +2255,19 @@ void setBaseDirectory(const char * dir,bool replicate,DFD_OS os)
 #else
         os = DFD_OSunix;
 #endif
+    assertex(replicateLevel < MAX_REPLICATION_LEVELS);
     StringBuffer out;
-    if (!dir||!*dir||!isAbsolutePath(dir)) {
-        if (os==DFD_OSwindows) {
-            if (replicate)
-                out.append("d:\\thordata");
-            else
-                out.append("c:\\thordata");
-        }
-        else if (replicate)
-            out.append("/d$/thordata");
-        else
-            out.append("/c$/thordata");
-        dir = out.str();
-    }
+    if (!dir||!*dir||!isAbsolutePath(dir))
+        throw MakeStringException(-1,"setBaseDirectory(%s) requires an absolute path",dir ? dir : "null");
     size32_t l = strlen(dir);
     if ((l>3)&&(isPathSepChar(dir[l-1])))
         l--;
     switch (os) {
     case DFD_OSwindows:
-        if (replicate)
-            winreplicatedir.set(dir,l);
-        else
-            winbasedir.set(dir,l);
+        winbasedirs[replicateLevel].set(dir,l);
         break;
     case DFD_OSunix:
-        if (replicate)
-            unixreplicatedir.set(dir,l);
-        else
-            unixbasedir.set(dir,l);
+        unixbasedirs[replicateLevel].set(dir,l);
         break;
     }
 }
@@ -2404,7 +2386,7 @@ inline const char *skipRoot(const char *lname)
 
 
 
-StringBuffer &makePhysicalPartName(const char *lname, unsigned partno, unsigned partmax, StringBuffer &result, bool replicate, DFD_OS os,const char *diroverride)
+StringBuffer &makePhysicalPartName(const char *lname, unsigned partno, unsigned partmax, StringBuffer &result, unsigned replicateLevel, DFD_OS os,const char *diroverride)
 {
     assertex(lname);
     if (strstr(lname,"::>")) { // probably query
@@ -2439,7 +2421,7 @@ StringBuffer &makePhysicalPartName(const char *lname, unsigned partno, unsigned
         result.append(diroverride);
     }
     else
-        result.append(queryBaseDirectory(replicate,os));
+        result.append(queryBaseDirectory(replicateLevel, os));
 
     size32_t l = result.length();
     if ((l>3)&&(result.charAt(l-1)!=OsSepChar(os))) {
@@ -2517,7 +2499,7 @@ bool setReplicateDir(const char *dir,StringBuffer &out,bool isrep,const char *ba
     if (!sep)
         return false;
     DFD_OS os = SepCharBaseOs(*sep);
-    const char *d = baseDir?baseDir:queryBaseDirectory(!isrep,os);
+    const char *d = baseDir?baseDir:queryBaseDirectory(isrep ? 0 : 1,os);
     if (!d)
         return false;
     unsigned match = 0;
@@ -2528,7 +2510,7 @@ bool setReplicateDir(const char *dir,StringBuffer &out,bool isrep,const char *ba
             match = i;
             count++;
         }
-    const char *r = repDir?repDir:queryBaseDirectory(isrep,os);
+    const char *r = repDir?repDir:queryBaseDirectory(isrep ? 1 : 0,os);
     if (d[i]==0) {
         if ((dir[i]==0)||isPathSepChar(dir[i])) {
             out.append(r).append(dir+i);

+ 7 - 5
dali/base/dafdesc.hpp

@@ -34,6 +34,8 @@ interface IReplicatedFile;
 
 #define SUPPORTS_MULTI_CLUSTERS  // always now set
 
+#define MAX_REPLICATION_LEVELS 4
+
 enum DFD_OS
 {
     DFD_OSdefault,
@@ -290,13 +292,13 @@ void getClusterInfo(IPropertyTree &pt, IGroupResolver *resolver, unsigned flags,
 
 
 
-// default logical to physcal filename routined
+// default logical to physical filename routines
 extern da_decl StringBuffer &makePhysicalPartName(
                                 const char *lname,                  // logical name
                                 unsigned partno,                    // part number (1..)
                                 unsigned partmax,                   // number of parts (1..)
                                 StringBuffer &result,               // result filename (or directory name if part 0)
-                                bool replicate = false,             // uses replication directory
+                                unsigned replicateLevel = 0,       // uses replication directory
                                 DFD_OS os=DFD_OSdefault,            // os must be specified if no dir specified
                                 const char *diroverride=NULL);      // override default directory
 extern da_decl StringBuffer &makeSinglePhysicalPartName(const char *lname, // single part file
@@ -307,12 +309,12 @@ extern da_decl StringBuffer &makeSinglePhysicalPartName(const char *lname, // si
                                                         );    
 
 // set/get defaults
-extern da_decl const char *queryBaseDirectory(bool replicatedir=false,DFD_OS os=DFD_OSdefault);
-extern da_decl void setBaseDirectory(const char * dir,bool replicatedir=false,DFD_OS os=DFD_OSdefault);
+extern da_decl const char *queryBaseDirectory(unsigned replicateLevel=0, DFD_OS os=DFD_OSdefault);
+extern da_decl void setBaseDirectory(const char * dir, unsigned replicateLevel=0, DFD_OS os=DFD_OSdefault);
 extern da_decl const char *queryPartMask();
 extern da_decl StringBuffer &getPartMask(StringBuffer &ret,const char *lname=NULL,unsigned partmax=0);
 extern da_decl void setPartMask(const char * mask);
-extern da_decl bool setReplicateDir(const char *name,StringBuffer &out, bool isrep=true,const char *baseDir=NULL,const char *repDir=NULL); // changes direcctory of name passed to backup directory
+extern da_decl bool setReplicateDir(const char *name,StringBuffer &out, bool isrep=true,const char *baseDir=NULL,const char *repDir=NULL); // changes directory of name passed to backup directory
 
 extern da_decl IFileDescriptor *createFileDescriptor();
 extern da_decl IFileDescriptor *createFileDescriptor(IPropertyTree *attr);      // ownership of attr tree is taken

+ 1 - 1
dali/base/dautils.cpp

@@ -976,7 +976,7 @@ bool CDfsLogicalFileName::setFromMask(const char *fname,const char *rootdir)
         return false;
     // first remove base dir from fname if present
     DFD_OS os = SepCharBaseOs(getPathSepChar(fname));
-    const char *dir = (rootdir&&*rootdir)?rootdir:queryBaseDirectory(false,os);
+    const char *dir = (rootdir&&*rootdir)?rootdir:queryBaseDirectory(0, os);
     // ignore drive if present
     if (os==DFD_OSwindows) {
         if (dir[1]==':')

+ 2 - 1
dali/daunittest/dautdfs.cpp

@@ -305,7 +305,8 @@ protected:
         mapping.setRepeatedCopies(7,false);
         fdesc->addCluster(grp,mapping);
         StringBuffer dir2;
-        grp.setown(dfsgroup->lookup(DFSUTGROUP "7b", dir2));
+        GroupType groupType;
+        grp.setown(dfsgroup->lookup(DFSUTGROUP "7b", dir2, groupType));
         ClusterPartDiskMapSpec mapping2;
         mapping2.setDefaultBaseDir(dir2);
         mapping2.setRepeatedCopies(7,true);

+ 2 - 1
dali/dfu/dfurun.cpp

@@ -345,7 +345,8 @@ class CDFUengine: public CInterface, implements IDFUengine
         if (!cluster||!*cluster)
             return;
         StringBuffer dir;
-        Owned<IGroup> grp = queryNamedGroupStore().lookup(cluster,dir);
+        GroupType groupType;
+        Owned<IGroup> grp = queryNamedGroupStore().lookup(cluster, dir, groupType);
         if (!grp) {
             throw MakeStringException(-1,"setFileRepeatOptions cluster %s not found",cluster);
             return;

+ 4 - 2
dali/dfu/dfuutil.cpp

@@ -454,6 +454,7 @@ class CFileCloner
         {
             StringBuffer s;
             dstfdesc->queryProperties().setProp("@cloneFrom", srcdali->endpoint().getUrlStr(s).str());
+            dstfdesc->queryProperties().setProp("@cloneFromDir", srcfdesc->queryDefaultDir());
             unsigned numClusters = srcfdesc->numClusters();
             for (unsigned clusterNum = 0; clusterNum < numClusters; clusterNum++)
             {
@@ -538,7 +539,8 @@ public:
             break;
         }
         StringBuffer defdir1;
-        grp1.setown(queryNamedGroupStore().lookup(_cluster1,defdir1));
+        GroupType groupType;
+        grp1.setown(queryNamedGroupStore().lookup(_cluster1, defdir1, groupType));
         if (!grp1)
             throw MakeStringException(-1,"Cannot find cluster %s",_cluster1);
         if (defdir1.length())
@@ -547,7 +549,7 @@ public:
             spec2 = spec1;
             cluster2.set(_cluster2);
             StringBuffer defdir2;
-            grp2.setown(queryNamedGroupStore().lookup(_cluster2,defdir2));
+            grp2.setown(queryNamedGroupStore().lookup(_cluster2, defdir2, groupType));
             if (!grp2)
                 throw MakeStringException(-1,"Cannot find cluster %s",_cluster2);
             spec2.setRepeatedCopies(CPDMSRP_lastRepeated,true); // only TLK on cluster2

+ 2 - 2
dali/dfuXRefLib/dfuxreflib.cpp

@@ -2677,8 +2677,8 @@ IPropertyTree *  runXRef(unsigned nclusters,const char **clusters,IXRefProgressC
     Owned<IGroup> group = queryNamedGroupStore().lookup(clusters[0]);
     if (group)
         islinux = queryOS(group->queryNode(0).endpoint())==MachineOsLinux;
-    dirs[0] = queryBaseDirectory(false,islinux?DFD_OSunix:DFD_OSwindows);
-    dirs[1] = queryBaseDirectory(true,islinux?DFD_OSunix:DFD_OSwindows);
+    dirs[0] = queryBaseDirectory(0,islinux?DFD_OSunix:DFD_OSwindows);
+    dirs[1] = queryBaseDirectory(1,islinux?DFD_OSunix:DFD_OSwindows);
     numdirs = 2;
     IPropertyTree *ret=NULL;
     try {

+ 5 - 4
dali/sasha/saxref.cpp

@@ -747,7 +747,8 @@ public:
         grpstr.toLowerCase();
         StringAttr grpname(grpstr.str());
         StringBuffer basedir;
-        grp.setown(queryNamedGroupStore().lookup(grpstr.str(),basedir));
+        GroupType groupType;
+        grp.setown(queryNamedGroupStore().lookup(grpstr.str(), basedir, groupType));
         if (!grp) {
             ERRLOG(LOGPFX "Cluster %s node group %s not found",clustname.get(),grpstr.str());
             return false;
@@ -804,9 +805,9 @@ public:
             if (getConfigurationDirectory(serverConfig->queryPropTree("Directories"),"mirror","thor",_clustname,repdir))
                 rdir = repdir.str();
             iswin = grp->ordinality()?(getDaliServixOs(grp->queryNode(0).endpoint())==DAFS_OSwindows):false;
-            setBaseDirectory(ddir,false,iswin?DFD_OSwindows:DFD_OSunix);
-            setBaseDirectory(rdir,true,iswin?DFD_OSwindows:DFD_OSunix);
-            rootdir.set(queryBaseDirectory(false,iswin?DFD_OSwindows:DFD_OSunix));
+            setBaseDirectory(ddir,0,iswin?DFD_OSwindows:DFD_OSunix);
+            setBaseDirectory(rdir,1,iswin?DFD_OSwindows:DFD_OSunix);
+            rootdir.set(queryBaseDirectory(0,iswin?DFD_OSwindows:DFD_OSunix));
         }
         else {
             rootdir.set(basedir);

+ 22 - 5
esp/services/ws_fs/ws_fsService.cpp

@@ -440,7 +440,8 @@ static void DeepAssign(IEspContext &context, IConstDFUWorkUnit *src, IEspDFUWork
     }
 }
 
-bool CFileSprayEx::ParseLogicalPath(const char * pLogicalPath, const char* cluster, StringBuffer &folder, StringBuffer &title, StringBuffer &defaultFolder, StringBuffer &defaultReplicateFolder)
+bool CFileSprayEx::ParseLogicalPath(const char * pLogicalPath, const char* cluster,
+                                    StringBuffer &folder, StringBuffer &title, StringBuffer &defaultFolder, StringBuffer &defaultReplicateFolder)
 {
     if(!pLogicalPath || !*pLogicalPath)
         return false;
@@ -454,7 +455,9 @@ bool CFileSprayEx::ParseLogicalPath(const char * pLogicalPath, const char* clust
 
     if(cluster != NULL && *cluster != '\0')
     {
-        Owned<IGroup> group = queryNamedGroupStore().lookup(cluster);
+        StringBuffer basedir;
+        GroupType groupType;
+        Owned<IGroup> group = queryNamedGroupStore().lookup(cluster, basedir, groupType);
         if (group) {
             switch (queryOS(group->queryNode(0).endpoint())) {
             case MachineOsW2K:
@@ -463,9 +466,23 @@ bool CFileSprayEx::ParseLogicalPath(const char * pLogicalPath, const char* clust
             case MachineOsLinux:
                 os = DFD_OSunix; break;
             }
-            if (directories.get()) {
-                getConfigurationDirectory(directories, "data", "thor", cluster, defaultFolder);
-                getConfigurationDirectory(directories, "mirror", "thor", cluster, defaultReplicateFolder);
+            if (directories.get())
+            {
+                switch (groupType)
+                {
+                case grp_roxie:
+                    getConfigurationDirectory(directories, "data", "roxie", cluster, defaultFolder);
+                    getConfigurationDirectory(directories, "data2", "roxie", cluster, defaultReplicateFolder);
+                    // MORE - should extend to systems with higher redundancy
+                    break;
+                case grp_hthor:
+                    getConfigurationDirectory(directories, "data", "hthor", cluster, defaultFolder);
+                    break;
+                case grp_thor:
+                default:
+                    getConfigurationDirectory(directories, "data", "thor", cluster, defaultFolder);
+                    getConfigurationDirectory(directories, "mirror", "thor", cluster, defaultReplicateFolder);
+                }
             }
         }
         else 

+ 10 - 33
initfiles/componentfiles/configxml/RoxieTopology.xsl

@@ -151,47 +151,24 @@
                 <xsl:element name="RoxieServerProcess">
                     <xsl:attribute name="netAddress"><xsl:value-of select="/Environment/Hardware/Computer[@name=$computer]/@netAddress"/></xsl:attribute>
                     <xsl:copy-of select="@port"/>
-                <xsl:if test="string(@dataDirectory)=''">
-                    <xsl:message terminate="yes">Data directory is not specified for Roxie server '<xsl:value-of select="@computer"/>'.</xsl:message>
-                </xsl:if>
-                <xsl:variable name="dataDir">
-                            <xsl:call-template name="makeAbsolutePath">
-                                <xsl:with-param name="path" select="@dataDirectory"/>
-                            </xsl:call-template>                        
-                </xsl:variable>
-                    <xsl:attribute name="baseDataDirectory">
-                        <xsl:value-of select="$dataDir"/>
-                    </xsl:attribute>
-                    <xsl:attribute name="dataDirectory">
-                        <xsl:value-of select="concat($dataDir, $pathSep, $roxieClusterNode/@name)"/>
-                    </xsl:attribute>
-                    <xsl:copy-of select="@*[name()!='netAddress' and name()!='dataDirectory' and name()!='computer' and name()!='name' and name()!='port']"/>
+                    <xsl:copy-of select="@*[name()!='netAddress' and name()!='computer' and name()!='name' and name()!='port']"/>
                 </xsl:element>
             </xsl:for-each>
             <xsl:for-each select="RoxieSlaveProcess">
-                <xsl:sort select="@dataDirectory"/>
+                <xsl:sort select="@level"/>
                 <xsl:sort select="@channel" data-type="number"/>
                 <xsl:element name="RoxieSlaveProcess">
                     <xsl:variable name="computer" select="@computer"/>
                     <xsl:attribute name="netAddress"><xsl:value-of select="/Environment/Hardware/Computer[@name=$computer]/@netAddress"/></xsl:attribute>
                     <xsl:attribute name="channel"><xsl:value-of select="@channel"/></xsl:attribute>
-                <xsl:if test="string(@dataDirectory)=''">
-                    <xsl:message terminate="yes">
-                        <xsl:text>Data directory is not specified for Roxie slave '</xsl:text>
-                        <xsl:value-of select="@computer"/>
-                        <xsl:text>' for channel </xsl:text>
-                        <xsl:value-of select="@channel"/>.</xsl:message>
-                </xsl:if>                           
-
-                                        <xsl:variable name="dataDir">
-                            <xsl:call-template name="makeAbsolutePath">
-                            <xsl:with-param name="path" select="@dataDirectory"/>
-                        </xsl:call-template>                        
-                                        </xsl:variable>
-                    <xsl:attribute name="dataDirectory">
-                        <xsl:value-of select="concat($dataDir, $pathSep, $roxieClusterNode/@name)"/>
-                    </xsl:attribute>
-                                  
+                    <xsl:if test="string(@level)=''">
+                        <xsl:message terminate="yes">
+                            <xsl:text>Replication level is not specified for Roxie slave '</xsl:text>
+                            <xsl:value-of select="@computer"/>
+                            <xsl:text>' for channel </xsl:text>
+                            <xsl:value-of select="@channel"/>.</xsl:message>
+                    </xsl:if>
+                    <xsl:copy-of select="@*[name()!='netAddress' and name()!='computer' and name()!='name' and name()!='channel']"/>
                 </xsl:element>
             </xsl:for-each>
             <xsl:for-each select="RoxieMonitorProcess">

+ 7 - 2
initfiles/componentfiles/configxml/roxie.xsd.in

@@ -49,7 +49,6 @@
                 </xs:appinfo>
               </xs:annotation>
             </xs:attribute>
-            <xs:attribute name="dataDirectory" type="absolutePath" use="required"/>
             <xs:attribute name="numThreads" type="xs:nonNegativeInteger" use="optional" default="30">
               <xs:annotation>
                 <xs:appinfo>
@@ -105,7 +104,13 @@
                 </xs:appinfo>
               </xs:annotation>
             </xs:attribute>
-            <xs:attribute name="dataDirectory" type="absolutePath" use="required"/>
+            <xs:attribute name="level" type="xs:nonNegativeInteger" use="optional" default="0">
+              <xs:annotation>
+                <xs:appinfo>
+                  <tooltip>Replication level of this slave</tooltip>
+                </xs:appinfo>
+              </xs:annotation>
+            </xs:attribute>
           </xs:complexType>
         </xs:element>
         <xs:element name="ACL" maxOccurs="unbounded">

+ 7 - 6
initfiles/etc/DIR_NAME/environment.xml.in

@@ -363,6 +363,7 @@
    <Category dir="${EXEC_PREFIX}/lib/[NAME]/hpcc-data/[COMPONENT]" name="data"/>
    <Category dir="${EXEC_PREFIX}/lib/[NAME]/hpcc-data2/[COMPONENT]" name="data2"/>
    <Category dir="${EXEC_PREFIX}/lib/[NAME]/hpcc-data3/[COMPONENT]" name="data3"/>
+   <Category dir="${EXEC_PREFIX}/lib/[NAME]/hpcc-data4/[COMPONENT]" name="data4"/>
    <Category dir="${EXEC_PREFIX}/lib/[NAME]/hpcc-mirror/[COMPONENT]" name="mirror"/>
    <Category dir="${EXEC_PREFIX}/lib/[NAME]/queries/[INST]" name="query"/>
    <Category dir="${EXEC_PREFIX}/lock/[NAME]/[INST]" name="lock"/>
@@ -894,7 +895,7 @@
                 useRemoteResources="true"
                 useTreeCopy="false">
    <RoxieFarmProcess 
-                     dataDirectory="${RUNTIME_PATH}/hpcc-data/roxie"
+                     level="0"
                      listenQueue="200"
                      name="farm1"
                      numThreads="30"
@@ -903,7 +904,7 @@
     <RoxieServerProcess computer="localhost" name="farm1_s1"/>
    </RoxieFarmProcess>
    <RoxieFarmProcess
-                      dataDirectory="${RUNTIME_PATH}/hpcc-data/roxie"
+                      level="0"
                       listenQueue="200"
                       name="farm2"
                       numThreads="30"
@@ -913,7 +914,7 @@
     </RoxieFarmProcess>
     <RoxieServerProcess
                        computer="localhost"
-                       dataDirectory="${RUNTIME_PATH}/hpcc-data/roxie"
+                       level="0"
                        listenQueue="200"
                        name="farm1_s1"
                        netAddress="."
@@ -922,7 +923,7 @@
                        requestArrayThreads="5"/>
    <RoxieServerProcess 
                        computer="localhost"
-                       dataDirectory="${RUNTIME_PATH}/hpcc-data/roxie"
+                       level="0"
                        listenQueue="200"
                        name="farm2_s1"
                        netAddress="."
@@ -930,11 +931,11 @@
                        port="0"
                        requestArrayThreads="5"/>
    <RoxieSlave computer="localhost" name="s1">
-    <RoxieChannel dataDirectory="${RUNTIME_PATH}/hpcc-data/roxie" number="1"/>
+    <RoxieChannel level="0" number="1"/>
    </RoxieSlave>
    <RoxieSlaveProcess channel="1"
                       computer="localhost"
-                      dataDirectory="${RUNTIME_PATH}/hpcc-data/roxie"
+                      level="0"
                       name="s1"
                       netAddress="."/>
   </RoxieCluster>

+ 1 - 0
roxie/ccd/ccd.hpp

@@ -64,6 +64,7 @@ extern unsigned channelCount;                   // number of channels this node
 extern unsigned subChannels[MAX_CLUSTER_SIZE];  // maps channel numbers to subChannels for this node
 extern bool suspendedChannels[MAX_CLUSTER_SIZE];// indicates suspended channels for this node
 extern unsigned numSlaves[MAX_CLUSTER_SIZE];    // number of slaves listening on this channel
+extern unsigned replicationLevel[MAX_CLUSTER_SIZE];  // Which copy of the data this channel uses on this slave
 
 extern unsigned myNodeIndex;
 #define OUTOFBAND_SEQUENCE    0x8000        // indicates an out-of-band reply

+ 25 - 8
roxie/ccd/ccddali.cpp

@@ -288,11 +288,7 @@ private:
         dstfdesc->setPartMask(dstpartmask.str());
         unsigned np = srcfdesc->numParts();
         dstfdesc->setNumParts(srcfdesc->numParts());
-        DFD_OS os = (getPathSepChar(srcfdesc->queryDefaultDir())=='\\')?DFD_OSwindows:DFD_OSunix;
-        StringBuffer dir;
-        StringBuffer dstdir;
-        makePhysicalPartName(dstlfn.get(),0,0,dstdir,false,os,NULL);
-        dstfdesc->setDefaultDir(dstdir.str());
+        dstfdesc->setDefaultDir(srcfdesc->queryProperties().queryProp("@cloneFromDir"));
 
         for (unsigned pn=0;pn<srcfdesc->numParts();pn++) {
             offset_t sz = srcfdesc->queryPart(pn)->queryProperties().getPropInt64("@size",-1);
@@ -309,18 +305,39 @@ private:
         {
             IPropertyTree &elem = groups->query();
             const char *groupName = elem.queryProp("@groupName");
+            StringBuffer dir;
             StringBuffer foreignGroup("foreign::");
             foreignGroup.append(cloneFrom).append("::").append(groupName);
-            Owned<IGroup> group = queryNamedGroupStore().lookup(foreignGroup);  // NOTE - this is cached by the named group store
+            GroupType groupType;
+            Owned<IGroup> group = queryNamedGroupStore().lookup(foreignGroup, dir, groupType);
             ClusterPartDiskMapSpec dmSpec;
             dmSpec.fromProp(&elem);
+            if (!dmSpec.defaultBaseDir.length())
+            {
+                if (dir.length())
+                {
+                    dmSpec.setDefaultBaseDir(dir);
+                }
+                else
+                {
+                    // Due to the really weird code in dadfs, this MUST be set to match the leading portion of cloneFromDir
+                    // in order to properly handle remote systems with different default directory locations
+                    StringBuffer tail;
+                    DFD_OS os = (getPathSepChar(srcfdesc->queryDefaultDir())=='\\')?DFD_OSwindows:DFD_OSunix;
+                    makePhysicalPartName(dstlfn.get(),0,0,tail,0,os,PATHSEPSTR);  // if lfn is a::b::c, tail will be /a/b/
+                    assertex(tail.length() > 1);
+                    tail.setLength(tail.length()-1);   // strip off the trailing /
+                    StringBuffer head(srcfdesc->queryProperties().queryProp("@cloneFromDir")); // Will end with /a/b
+                    assertex(streq(head.str() + head.length() - tail.length(), tail.str()));
+                    head.setLength(head.length() - tail.length()); // So strip off the end...
+                    dmSpec.setDefaultBaseDir(head.str());
+                }
+            }
             dstfdesc->addCluster(groupName, group, dmSpec);
         }
-
         return dstfdesc.getClear();
     }
 
-
 public:
 
     IMPLEMENT_IINTERFACE;

+ 38 - 15
roxie/ccd/ccdfile.cpp

@@ -619,6 +619,8 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
                     {
                         const char *remoteName = remoteLocationInfo.item(idx);
                         Owned<IFile> remote = createIFile(remoteName);
+                        if (traceLevel > 5)
+                            DBGLOG("checking remote location %s", remoteName);
                         if (fileUpToDate(remote, size, modified, crc, isCompressed)==FileIsValid)
                         {
                             if (miscDebugTraceLevel > 10)
@@ -1042,7 +1044,7 @@ public:
     }
 
     virtual ILazyFileIO *lookupFile(const char *lfn, RoxieFileType fileType,
-                                     IPartDescriptor *pdesc, unsigned numParts,
+                                     IPartDescriptor *pdesc, unsigned numParts, unsigned replicationLevel,
                                      const StringArray &deployedLocationInfo, bool startFileCopy)
     {
         IPropertyTree &partProps = pdesc->queryProperties();
@@ -1067,7 +1069,7 @@ public:
             dlfn.clearForeign();
         const char *logicalname = dlfn.get();
 
-        makePhysicalPartName(logicalname, partNo, numParts, localLocation, false, DFD_OSdefault, baseDataDirectory);  // MORE - if we get the dataDirectory we can pass it in and possibly reuse an existing file
+        makePhysicalPartName(logicalname, partNo, numParts, localLocation, replicationLevel, DFD_OSdefault);
 
         Owned<ILazyFileIO> ret;
         try
@@ -1278,13 +1280,13 @@ public:
     }
 };
 
-ILazyFileIO *createDynamicFile(const char *id, IPartDescriptor *pdesc, IPartDescriptor *remotePDesc, RoxieFileType fileType, int numParts, bool startCopy)
+ILazyFileIO *createPhysicalFile(const char *id, IPartDescriptor *pdesc, IPartDescriptor *remotePDesc, RoxieFileType fileType, int numParts, bool startCopy, unsigned channel)
 {
     StringArray remoteLocations;
     if (remotePDesc)
         appendRemoteLocations(remotePDesc, remoteLocations, false);
 
-    return queryFileCache().lookupFile(id, fileType, pdesc, numParts, remoteLocations, startCopy);
+    return queryFileCache().lookupFile(id, fileType, pdesc, numParts, replicationLevel[channel], remoteLocations, startCopy);
 }
 
 //====================================================================================================
@@ -1611,7 +1613,8 @@ protected:
     mutable CriticalSection lock;
     mutable Owned<IFilePartMap> fileMap;
     mutable PerChannelCacheOf<IInMemoryIndexManager> indexMap;
-//  MORE - cache the others, using per-channel cache support. 
+    mutable PerChannelCacheOf<IFileIOArray> ioArrayMap;
+    mutable PerChannelCacheOf<IKeyArray> keyArrayMap;
 
 public:
     IMPLEMENT_IINTERFACE;
@@ -1775,7 +1778,18 @@ public:
         else
             mb.append(false);
     }
-    virtual IFileIOArray *getIFileIOArray(bool isOpt, unsigned channel) const 
+    virtual IFileIOArray *getIFileIOArray(bool isOpt, unsigned channel) const
+    {
+        CriticalBlock b(lock);
+        IFileIOArray *ret = ioArrayMap.get(channel);
+        if (!ret)
+        {
+            ret = createIFileIOArray(isOpt, channel);
+            ioArrayMap.set(ret, channel);
+        }
+        return LINK(ret);
+    }
+    IFileIOArray *createIFileIOArray(bool isOpt, unsigned channel) const
     {
         Owned<CFileIOArray> f = new CFileIOArray();
         f->addFile(NULL, 0);
@@ -1795,7 +1809,7 @@ public:
                             IPartDescriptor *pdesc = fdesc->queryPart(i-1);
                             assertex(pdesc);
                             IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, i-1);
-                            Owned<ILazyFileIO> file = createDynamicFile(subNames.item(0), pdesc, remotePDesc, ROXIE_FILE, numParts, cached != NULL);
+                            Owned<ILazyFileIO> file = createPhysicalFile(subNames.item(0), pdesc, remotePDesc, ROXIE_FILE, numParts, cached != NULL, channel);
                             IPropertyTree &partProps = pdesc->queryProperties();
                             f->addFile(file.getClear(), partProps.getPropInt64("@offset"));
                         }
@@ -1820,7 +1834,6 @@ public:
     }
     virtual IKeyArray *getKeyArray(IDefRecordMeta *activityMeta, TranslatorArray *translators, bool isOpt, unsigned channel, bool allowFieldTranslation) const
     {
-        Owned<IKeyArray> ret = ::createKeyArray();
         unsigned maxParts = 0;
         ForEachItemIn(subFile, subFiles)
         {
@@ -1851,7 +1864,18 @@ public:
             else
                 translators->append(NULL);
         }
-
+        CriticalBlock b(lock);
+        IKeyArray *ret = keyArrayMap.get(channel);
+        if (!ret)
+        {
+            ret = createKeyArray(isOpt, channel, maxParts);
+            keyArrayMap.set(ret, channel);
+        }
+        return LINK(ret);
+    }
+    IKeyArray *createKeyArray(bool isOpt, unsigned channel, unsigned maxParts) const
+    {
+        Owned<IKeyArray> ret = ::createKeyArray();
         if (channel)
         {
             ret->addKey(NULL);
@@ -1873,7 +1897,7 @@ public:
                             if (pdesc)
                             {
                                 IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, partNo-1);
-                                part.setown(createDynamicFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, fdesc->numParts(), cached != NULL));
+                                part.setown(createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, fdesc->numParts(), cached != NULL, channel));
                                 pdesc->getCrc(crc);
                             }
                         }
@@ -1912,7 +1936,7 @@ public:
                     assertex(numParts > 0);
                     IPartDescriptor *pdesc = fdesc->queryPart(numParts - 1);
                     IPartDescriptor *remotePDesc = queryMatchingRemotePart(pdesc, remoteFDesc, numParts - 1);
-                    Owned<ILazyFileIO> keyFile = createDynamicFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, numParts, cached != NULL);
+                    Owned<ILazyFileIO> keyFile = createPhysicalFile(subNames.item(idx), pdesc, remotePDesc, ROXIE_KEY, numParts, cached != NULL, channel);
                     unsigned crc = 0;
                     pdesc->getCrc(crc);
                     StringBuffer pname;
@@ -1941,17 +1965,16 @@ public:
 
     virtual IInMemoryIndexManager *getIndexManager(bool isOpt, unsigned channel, IFileIOArray *files, IRecordSize *recs, bool preload, int numKeys) const 
     {
-        // MORE - if we want to share this then we need to pass in channel too and cache per channel. Should combine the get() and the load().
-        // MORE - I don't know that it makes sense to pass isOpt in to these calls rather than just when creating the IResolvedFile... think about it.
+        // MORE - I don't know that it makes sense to pass isOpt in to these calls
+        // Failures to resolve will not be cached, only successes.
         // MORE - preload and numkeys are all messed up - can't be specified per query have to be per file
-//      return createInMemoryIndexManager(isOpt, lfn);
 
         CriticalBlock b(lock);
         IInMemoryIndexManager *ret = indexMap.get(channel);
         if (!ret)
         {
             ret = createInMemoryIndexManager(isOpt, lfn);
-            ret->load(files, recs, preload, numKeys);
+            ret->load(files, recs, preload, numKeys);   // note - files (passed in) are channel specific
             indexMap.set(ret, channel);
         }
         return LINK(ret);

+ 2 - 3
roxie/ccd/ccdfile.hpp

@@ -54,11 +54,10 @@ interface ILazyFileIO : extends IFileIO
     virtual void removeCache(const IRoxieFileCache *) = 0;
 };
 
-extern ILazyFileIO *createDynamicFile(const char *id, IPartDescriptor *pdesc, RoxieFileType fileType, int numParts, SocketEndpoint &cloneFrom);
-
 interface IRoxieFileCache : extends IInterface
 {
-    virtual ILazyFileIO *lookupFile(const char *lfn, RoxieFileType fileType, IPartDescriptor *pdesc, unsigned numParts, const StringArray &deployedLocationInfo, bool startFileCopy) = 0;
+    virtual ILazyFileIO *lookupFile(const char *lfn, RoxieFileType fileType, IPartDescriptor *pdesc, unsigned numParts,
+                                      unsigned replicationLevel, const StringArray &deployedLocationInfo, bool startFileCopy) = 0;
     virtual RoxieFileStatus fileUpToDate(IFile *f, offset_t size, const CDateTime &modified, unsigned crc, bool isCompressed) = 0;
     virtual int numFilesToCopy() = 0;
     virtual void closeExpired(bool remote) = 0;

+ 25 - 15
roxie/ccd/ccdmain.cpp

@@ -282,7 +282,7 @@ void getAccessList(const char *aclName, IPropertyTree *topology, IPropertyTree *
     serverInfo->removeProp(xpath);
 }
 
-void addServerChannel(const char *dataDirectory, unsigned port, unsigned threads, const char *access, IPropertyTree *topology)
+void addServerChannel(unsigned port, unsigned threads, const char *access, IPropertyTree *topology)
 {
     if (!ownEP.port)
         ownEP.set(port, queryHostIP());
@@ -290,13 +290,10 @@ void addServerChannel(const char *dataDirectory, unsigned port, unsigned threads
     ForEach(*servers)
     {
         IPropertyTree &f = servers->query();
-        if (strcmp(f.queryProp("@dataDirectory"), dataDirectory) != 0)
-            throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - Roxie server dataDirectory respecified");
         if (f.getPropInt("@port", 0) == port)
             throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - Roxie server port repeated");
     }
     IPropertyTree *ci = createPTree("RoxieServerProcess");
-    ci->setProp("@dataDirectory", dataDirectory);
     ci->setPropInt("@port", port);
     ci->setPropInt("@numThreads", threads);
     if (access && *access)
@@ -311,7 +308,7 @@ bool ipMatch(IpAddress &ip)
     return ip.isLocal();
 }
 
-void addSlaveChannel(unsigned channel, const char *dataDirectory, bool suspended)
+void addSlaveChannel(unsigned channel, unsigned level, bool suspended)
 {
     StringBuffer xpath;
     xpath.appendf("RoxieSlaveProcess[@channel=\"%d\"]", channel);
@@ -322,16 +319,16 @@ void addSlaveChannel(unsigned channel, const char *dataDirectory, bool suspended
     ci->setPropBool("@suspended", suspended);
     ci->setPropInt("@subChannel", numSlaves[channel]);
     suspendedChannels[channel] = suspended;
-    ci->setProp("@dataDirectory", dataDirectory);
+    replicationLevel[channel] = level;
     ccdChannels->addPropTree("RoxieSlaveProcess", ci);
 }
 
-void addChannel(unsigned channel, const char *dataDirectory, bool isMe, bool suspended, IpAddress& slaveIp)
+void addChannel(unsigned channel, unsigned level, bool isMe, bool suspended, IpAddress& slaveIp)
 {
     numSlaves[channel]++;
     if (isMe && channel > 0 && channel <= numChannels)
     {
-        addSlaveChannel(channel, dataDirectory, suspended);
+        addSlaveChannel(channel, level, suspended);
     }
     if (!localSlave)
     {
@@ -519,8 +516,8 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
             }
             topology=createPTreeFromXMLString(
                 "<RoxieTopology numChannels='1' localSlave='1'>"
-                 "<RoxieServerProcess dataDirectory='.' netAddress='.'/>"
-                 "<RoxieSlaveProcess dataDirectory='.' netAddress='.' channel='1'/>"
+                 "<RoxieServerProcess netAddress='.'/>"
+                 "<RoxieSlaveProcess netAddress='.' channel='1'/>"
                 "</RoxieTopology>"
                 );
             int port = globals->getPropInt("--port", 9876);
@@ -558,7 +555,20 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
 
         IPropertyTree *directoryTree = topology->queryPropTree("Directories");
         if (directoryTree)
-            getConfigurationDirectory(directoryTree,"query","roxie", roxieName, queryDirectory);
+        {
+            getConfigurationDirectory(directoryTree, "query", "roxie", roxieName, queryDirectory);
+            for (unsigned replicationLevel = 0; replicationLevel < MAX_REPLICATION_LEVELS; replicationLevel++)
+            {
+                StringBuffer dataDir;
+                StringBuffer dirId("data");
+                if (replicationLevel)
+                    dirId.append(replicationLevel+1);
+                if (getConfigurationDirectory(directoryTree, dirId, "roxie", roxieName, dataDir))
+                    setBaseDirectory(dataDir, replicationLevel, DFD_OSdefault);
+            }
+        }
+        else
+            setBaseDirectory(".", 0, DFD_OSdefault);
 
         //Logging stuff
         if (globals->getPropBool("--stdlog", traceLevel != 0) || topology->getPropBool("@forceStdLog", false))
@@ -856,7 +866,7 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
                         baseDataDirectory.append(roxieServer.queryProp("@baseDataDirectory"));
                     unsigned numThreads = roxieServer.getPropInt("@numThreads", numServerThreads);
                     const char *aclName = roxieServer.queryProp("@aclName");
-                    addServerChannel(roxieServers->query().queryProp("@dataDirectory"), port, numThreads, aclName, topology);
+                    addServerChannel(port, numThreads, aclName, topology);
                     if (!myIPadded || (myHostNumber==-1))
                     {
                         myNodeIndex = nodeIndex;
@@ -923,17 +933,17 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
                 unsigned channel = slave.getPropInt("@channel", 0);
                 if (!channel)
                     channel = slave.getPropInt("@channels", 0); // legacy support
-                const char *dataDirectory = slave.queryProp("@dataDirectory");
+                unsigned replicationLevel = slave.getPropInt("@level", 0);
                 if (channel && channel <= numChannels)
                 {
                     if (isMe)
                         isCCD = true;
-                    if (!numSlaves[channel])
+                    if (!replicationLevel)
                     {
                         primaries[channel] = slaveIp;
                         slaveCount++;
                     }
-                    addChannel(channel, dataDirectory, isMe, suspended, slaveIp);
+                    addChannel(channel, replicationLevel, isMe, suspended, slaveIp);
                     if (isMe)
                         joinMulticastChannel(channel);
                 }

+ 1 - 0
roxie/ccd/ccdqueue.cpp

@@ -40,6 +40,7 @@ unsigned channels[MAX_CLUSTER_SIZE];
 unsigned channelCount;
 unsigned subChannels[MAX_CLUSTER_SIZE];
 unsigned numSlaves[MAX_CLUSTER_SIZE];
+unsigned replicationLevel[MAX_CLUSTER_SIZE];
 unsigned IBYTIDelays[MAX_CLUSTER_SIZE]; // MORE: this will cover only 2 slaves per channel, change to cover all. 
 
 SpinLock suspendCrit;