Browse Source

Merge pull request #2048 from rengolin/dfs-daregress

Daregress refactoring/cleanup

Reviewed-By: Jake Smith <jake.smith@lexisnexis.com>
Reviewed-By: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 13 years ago
parent
commit
0882bfa1b5
2 changed files with 372 additions and 282 deletions
  1. 19 21
      dali/base/dadfs.cpp
  2. 353 261
      dali/regress/daregress.cpp

+ 19 - 21
dali/base/dadfs.cpp

@@ -3903,8 +3903,7 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
             addFileLock(sub);
             if (lock())
                 return true;
-            else
-                unlock();
+            unlock();
             return false;
         }
         void run()
@@ -3964,8 +3963,7 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
                 addFileLock(sub);
             if (lock())
                 return true;
-            else
-                unlock();
+            unlock();
             return false;
         }
         void run()
@@ -4010,8 +4008,7 @@ class CDistributedSuperFile: public CDistributedFileBase<IDistributedSuperFile>
                 addFileLock(&file->querySubFile(i));
             if (lock())
                 return true;
-            else
-                unlock();
+            unlock();
             return false;
         }
         void run()
@@ -6543,7 +6540,7 @@ public:
             super.setown(new CDistributedSuperFile(parent, root, logicalname, user));
             created = true;
         }
-        transaction->addFile(super);
+        addFileLock(super);
     }
     virtual ~cCreateSuperFileAction() {}
     bool prepare()
@@ -6551,9 +6548,10 @@ public:
         // Attach the file to DFS, if wasn't there already
         if (created)
             parent->addEntry(logicalname,root,true,false);
-        // FIXME: This will introduce a window (until commit) that
-        // the file is accessible. Use super->attach/detach instead.
-        return true;
+        if (lock())
+            return true;
+        unlock();
+        return false;
     }
     void run()
     {
@@ -6581,7 +6579,16 @@ IDistributedSuperFile *CDistributedFileDirectory::createSuperFile(const char *_l
     logicalname.set(_logicalname);
     checkLogicalName(logicalname,user,true,true,false,"have a superfile with");
 
-    IDistributedSuperFile *sfile = lookupSuperFile(logicalname.get(), user, transaction, false, defaultTimeout);
+    // Create a local transaction that will be destroyed (but never touch the external transaction)
+    Linked<IDistributedFileTransaction> localtrans;
+    if (transaction) {
+        localtrans.set(transaction);
+    } else {
+        // TODO: Make it explicit in the API that a transaction is required
+        localtrans.setown(new CDistributedFileTransaction(user));
+    }
+
+    IDistributedSuperFile *sfile = localtrans->lookupSuperFile(logicalname.get());
     if (sfile) {
         if (ifdoesnotexist) {
             // Cache, since we're going to use it
@@ -6592,22 +6599,13 @@ IDistributedSuperFile *CDistributedFileDirectory::createSuperFile(const char *_l
             throw MakeStringException(-1,"createSuperFile: SuperFile %s already exists",logicalname.get());
     }
 
-    // Create a local transaction that will be destroyed (but never touch the external transaction)
-    Linked<IDistributedFileTransaction> localtrans;
-    if (transaction) {
-        localtrans.set(transaction);
-    } else {
-        // TODO: Make it explicit in the API that a transaction is required
-        localtrans.setown(new CDistributedFileTransaction(user));
-    }
-
     // action is owned by transaction (acquired on CDFAction's c-tor) so don't unlink or delete!
     cCreateSuperFileAction *action = new cCreateSuperFileAction(localtrans,this,user,_logicalname,_interleaved);
 
     localtrans->autoCommit();
 
     // Should have been persisted to the DFS by now
-    return lookupSuperFile(logicalname.get(), user, localtrans, false, defaultTimeout);
+    return localtrans->lookupSuperFile(logicalname.get());
 }
 
 static bool checkProtectAttr(const char *logicalname,IPropertyTree *froot,StringBuffer &reason)

+ 353 - 261
dali/regress/daregress.cpp

@@ -16,6 +16,12 @@
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 ############################################################################## */
 
+/*
+ * Dali Quick Regression Suite: Tests Dali functionality on a programmatic way.
+ *
+ * Add as much as possible here to avoid having to run the Hthor/Thor regressions
+ * all the time for Dali tests, since most of it can be tested quickly from here.
+ */
 
 #include "platform.h"
 #include "jlib.hpp"
@@ -38,6 +44,7 @@ static int errorcount;
 #define ERROR3(s,p1,p2,p3) printf("ERROR(%d): %s[%d]: " s "\n", ++errorcount, __FILE__, __LINE__, p1,p2,p3)
 #define ERROR4(s,p1,p2,p3,p4) printf("ERROR(%d): %s[%d]: " s "\n", ++errorcount, __FILE__, __LINE__, p1,p2,p3,p4)
 
+// ======================================================================= Support Functions / Classes
 
 static IRemoteConnection *Rconn;
 
@@ -73,7 +80,7 @@ static unsigned fn(unsigned n, unsigned m, unsigned seed, unsigned depth, IPrope
     }
     name[i] = 0;
     IPropertyTree *child = parent->queryPropTree(name);
-    if (!child) 
+    if (!child)
         child = parent->addPropTree(name, createPTree(name));
     return fn(fn(n,seed,seed*17+11,depth+1,child),fn(seed,m,seed*11+17,depth+1,child),seed*19+7,depth+1,child);
 }
@@ -130,17 +137,169 @@ static unsigned fn2(unsigned n, unsigned m, unsigned seed, unsigned depth, Strin
         parentname.append('/');
     parentname.append(name);
     IPropertyTree *child = parent->queryPropTree(name);
-    if (!child) 
+    if (!child)
         child = parent->addPropTree(name, createPTree(name));
     unsigned ret = fn2(fn2(n,seed,seed*17+11,depth+1,parentname),fn2(seed,m,seed*11+17,depth+1,parentname),seed*19+7,depth+1,parentname);
     parentname.setLength(l);
     return ret;
 }
 
+static __int64 subchangetotal;
+static unsigned subchangenum;
+static CriticalSection subchangesect;
 
-static void test1()
+class CCSub : public CInterface, implements ISDSConnectionSubscription, implements ISDSSubscription
+{
+    unsigned n;
+    unsigned &count;
+public:
+    IMPLEMENT_IINTERFACE;
+
+    CCSub(unsigned _n,unsigned &_count)
+        : count(_count)
+    {
+        n = _n;
+    }
+    virtual void notify()
+    {
+        CriticalBlock block(subchangesect);
+        subchangetotal += n;
+        subchangenum++;
+        count++;
+    }
+    virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
+    {
+        CriticalBlock block(subchangesect);
+        subchangetotal += n;
+        subchangenum++;
+        subchangetotal += (unsigned)flags;
+        subchangetotal += crc32(xpath,strlen(xpath),0);
+        if (valueLen) {
+            subchangetotal += crc32((const char *)valueData,valueLen,0);
+        }
+        count++;
+
+    }
+
+};
+
+class CChange : public Thread
+{
+    Owned<IRemoteConnection> conn;
+    Owned<CCSub> sub;
+    StringAttr path;
+    SubscriptionId id[10];
+    unsigned n;
+    unsigned count;
+
+public:
+    Semaphore stopsem;
+    CChange(unsigned _n)
+    {
+        n = _n;
+        StringBuffer s("/DAREGRESS/CONSUB");
+        s.append(n+1);
+        path.set(s.str());
+        conn.setown(querySDS().connect(path, myProcessSession(), RTM_CREATE|RTM_DELETE_ON_DISCONNECT, 1000000));
+        unsigned i;
+        for (i=0;i<5;i++)
+            id[i] = conn->subscribe(*new CCSub(n*1000+i,count));
+        s.append("/testprop");
+        for (;i<10;i++)
+            id[i] = querySDS().subscribe(s.str(),*new CCSub(n*1000+i,count),false,true);
+        count = 0;
+        start();
+    }
+
+    virtual int run()
+    {
+        unsigned i;
+        for (i = 0;i<10; i++) {
+            conn->queryRoot()->setPropInt("testprop", (i*17+n*21)%100);
+            conn->commit();
+            for (unsigned j=0;j<1000;j++) {
+                {
+                    CriticalBlock block(subchangesect);
+                    if (count>=(i+1)*10)
+                        break;
+                }
+                Sleep(10);
+            }
+        }
+        stopsem.wait();
+        for (i=0;i<10;i++)
+            conn->unsubscribe(id[i]);
+        return 0;
+    }
+};
+
+extern void dispFDesc(IFileDescriptor *fdesc);
+
+extern void runDafsTest();
+
+struct RunOptions {
+    char *match;
+    char *dali;
+    bool full;
+};
+
+static void usage(const char *error=NULL)
+{
+    if (error)
+        printf("ERROR: %s\n", error);
+    printf("usage: DAREGRESS <dali-host> [ test-name-match ] [ full ]\n");
+    exit(-1);
+}
+
+static RunOptions read_cmdline(int argc, char **argv)
+{
+    StringBuffer cmd;
+    splitFilename(argv[0], NULL, NULL, &cmd, NULL);
+    StringBuffer lf;
+    openLogFile(lf, cmd.toLowerCase().append(".log").str());
+
+    RunOptions opt;
+    opt.dali = argv[1];
+    opt.full = false;
+    opt.match = 0;
+    for (unsigned i=2;i<argc;i++) {
+        if (stricmp(argv[i],"full")==0)
+            opt.full = 1;
+        else
+            if (!opt.match)
+                opt.match = argv[i];
+            else
+                usage("Argument not recognised, too many matches");
+    }
+    return opt;
+}
+
+struct ReleaseAtomBlock { ~ReleaseAtomBlock() { releaseAtoms(); } };
+
+static IFileDescriptor *createFileDescriptor(const char* dir, const char* name, unsigned parts, unsigned recSize, unsigned index=0)
+{
+    Owned<IPropertyTree> pp = createPTree("Part");
+    Owned<IFileDescriptor>fdesc = createFileDescriptor();
+    fdesc->setDefaultDir(dir);
+    StringBuffer s;
+    for (unsigned k=0;k<parts;k++) {
+        s.clear().append("192.168.1.10");
+        Owned<INode> node = createINode(s.str());
+        pp->setPropInt64("@size",recSize);
+        s.clear().append(name);
+        if (index)
+            s.append(index);
+        s.append("._").append(k+1).append("_of_").append(parts);
+        fdesc->setPart(k,node,s.str(),pp);
+    }
+    fdesc->queryProperties().setPropInt("@recordSize",recSize);
+    return fdesc.getClear();
+}
+
+// ======================================================================= Test cases
+
+static void testSDSRW()
 {
-    printf("Test SDS read/write\n");
     Owned<IPropertyTree> ref = createPTree("DAREGRESS");
     fn(1,2,3,0,ref);
     StringBuffer refstr;
@@ -185,17 +344,16 @@ static void test1()
         ERROR("RTM_DELETE_ON_DISCONNECT failed");
 }
 
-
-static void test2() 
+static void testDFS()
 {
+    IDistributedFileDirectory &dir = queryDistributedFileDirectory();
     const size32_t recsize = 17;
-    printf("Test DFS\n");
     StringBuffer s;
     unsigned i;
     unsigned n;
     unsigned t;
     queryNamedGroupStore().remove("daregress_group");
-    queryDistributedFileDirectory().removeEntry("daregress::superfile1");
+    dir.removeEntry("daregress::superfile1");
     SocketEndpointArray epa;
     for (n=0;n<400;n++) {
         s.clear().append("192.168.").append(n/256).append('.').append(n%256);
@@ -215,7 +373,7 @@ static void test2()
     for (i=0;i<100;i++) {
         Owned<IPropertyTree> pp = createPTree("Part");
         Owned<IFileDescriptor>fdesc = createFileDescriptor();
-        fdesc->setDefaultDir("c:\\thordata\\regress");
+        fdesc->setDefaultDir("thordata/regress");
         n = 9;
         for (unsigned k=0;k<400;k++) {
             s.clear().append("192.168.").append(n/256).append('.').append(n%256);
@@ -227,9 +385,9 @@ static void test2()
         }
         fdesc->queryProperties().setPropInt("@recordSize",17);
         s.clear().append("daregress::test").append(i);
-        queryDistributedFileDirectory().removeEntry(s.str());
+        dir.removeEntry(s.str());
         StringBuffer cname;
-        Owned<IDistributedFile> dfile = queryDistributedFileDirectory().createNew(fdesc);
+        Owned<IDistributedFile> dfile = dir.createNew(fdesc);
         if (stricmp(dfile->getClusterName(0,cname),"daregress_group")!=0)
             ERROR1("Cluster name wrong %d",i);
         s.clear().append("daregress::test").append(i);
@@ -240,9 +398,9 @@ static void test2()
     t = 33;
     for (i=0;i<100;i++) {
         s.clear().append("daregress::test").append(t);
-        if (!queryDistributedFileDirectory().exists(s.str())) 
+        if (!dir.exists(s.str()))
             ERROR1("Could not find %s",s.str());
-        Owned<IDistributedFile> dfile = queryDistributedFileDirectory().lookup(s.str());
+        Owned<IDistributedFile> dfile = dir.lookup(s.str());
         if (!dfile) {
             ERROR1("Could not find %s",s.str());
             continue;
@@ -287,7 +445,7 @@ static void test2()
     __int64 crctot = 0;
     unsigned np = 0;
     unsigned totrows = 0;
-    Owned<IDistributedFileIterator> fiter = queryDistributedFileDirectory().getIterator("daregress::*",false); 
+    Owned<IDistributedFileIterator> fiter = dir.getIterator("daregress::*",false);
     Owned<IDistributedFilePartIterator> piter;
     ForEach(*fiter) {
         piter.setown(fiter->query().getIterator()); 
@@ -307,13 +465,13 @@ static void test2()
     fiter.clear();
     printf("DFile iterate done     - %d parts, %d rows, CRC sum %"I64F"d\n",np,totrows,crctot);
     Owned<IDistributedSuperFile> sfile;
-    sfile.setown(queryDistributedFileDirectory().createSuperFile("daregress::superfile1",true));
+    sfile.setown(dir.createSuperFile("daregress::superfile1",true));
     for (i = 0;i<100;i++) {
         s.clear().append("daregress::test").append(i);
         sfile->addSubFile(s.str());
     }
     sfile.clear();
-    sfile.setown(queryDistributedFileDirectory().lookupSuperFile("daregress::superfile1"));
+    sfile.setown(dir.lookupSuperFile("daregress::superfile1"));
     if (!sfile) {
         ERROR("Could not find added superfile");
         return;
@@ -346,28 +504,28 @@ static void test2()
         ERROR1("Superfile size does not match part sum %d",tr);
     sfile->detach();
     sfile.clear();
-    sfile.setown(queryDistributedFileDirectory().lookupSuperFile("daregress::superfile1"));
+    sfile.setown(dir.lookupSuperFile("daregress::superfile1"));
     if (sfile)
         ERROR("Superfile deletion failed");
     t = 37;
     for (i=0;i<100;i++) {
         s.clear().append("daregress::test").append(t);
         if (i%1) {
-            Owned<IDistributedFile> dfile = queryDistributedFileDirectory().lookup(s.str());
+            Owned<IDistributedFile> dfile = dir.lookup(s.str());
             if (!dfile)
                 ERROR1("Could not find %s",s.str());
             dfile->detach();
         }
         else 
-            queryDistributedFileDirectory().removeEntry(s.str());
+            dir.removeEntry(s.str());
         t = (t+37)%100; 
     }
     printf("DFile removal complete\n");
     t = 39;
     for (i=0;i<100;i++) {
-        if (queryDistributedFileDirectory().exists(s.str()))
+        if (dir.exists(s.str()))
             ERROR1("Found %s after deletion",s.str());
-        Owned<IDistributedFile> dfile = queryDistributedFileDirectory().lookup(s.str());
+        Owned<IDistributedFile> dfile = dir.lookup(s.str());
         if (dfile)
             ERROR1("Found %s after deletion",s.str());
         t = (t+39)%100; 
@@ -378,186 +536,120 @@ static void test2()
         ERROR("Named group not removed");
 }
 
-void testSubscription(bool subscriber, int subs, int comms)
+static void testDFSTrans()
 {
-    class TestSubscription : public CInterface, implements ISDSSubscription
-    {
-    public:
-        IMPLEMENT_IINTERFACE;
-        virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
-        {
-            PrintLog("Notification(%"I64F"x) of %s - flags = %d",(__int64) id, xpath, flags);
-            if (valueData)
-            {
-                StringBuffer data;
-                appendURL(&data, (const char *)valueData, valueLen, 0);
-                PrintLog("ValueData = %s", data.str());
-            }
-        }
-    };
+    IDistributedFileDirectory &dir = queryDistributedFileDirectory();
+    Owned<IDistributedFileTransaction> transaction = createDistributedFileTransaction();
 
-
-    unsigned subscriptions = subs==-1?400:subs;
-    unsigned commits = comms==-1?400:comms;
-    unsigned i;
-    if (subscriber) {
-            TestSubscription **subs = (TestSubscription **) alloca(sizeof(TestSubscription *)*subscriptions);
-            SubscriptionId *ids = (SubscriptionId *) alloca(sizeof(SubscriptionId)*subscriptions);
-            for (i=0; i<subscriptions; i++){
-                    subs[i] = new TestSubscription;
-                    PrintLog("Subscribe %d",i);
-                    StringBuffer key;
-                    key.append("/TESTS/TEST").append(i);
-                    ids[i] = querySDS().subscribe(key.str(), *subs[i], true);
-            }
-            PrintLog("paused 1");
-            getchar();
-            for (i=0; i<subscriptions; i++)  {
-                    querySDS().unsubscribe(ids[i]);
-                    subs[i]->Release();
-            }
-            PrintLog("paused");
-            getchar();
+    // Prepare - MORE - Change this when create/remove file is part of transactions
+    printf("Cleaning up 'regress::trans' scope\n");
+    if (dir.exists("regress::trans::super1",false,true) && !dir.removeEntry("regress::trans::super1")) {
+        ERROR("Can't remove super1");
+        return;
     }
-    else {
-            Owned<IRemoteConnection> conn = querySDS().connect("/DAREGRESS",myProcessSession(), RTM_CREATE_QUERY, 1000000);
-            IPropertyTree *root = conn->queryRoot();
-            unsigned i, _i;
-
-            for (_i=0; _i<commits; _i++) {
-                i = _i%subscriptions;
-                    StringBuffer key;
-                    key.append("TEST").append(i);
-                    root->setPropTree(key.str(), createPTree());
-
-            }
-            conn->commit();
-
-            PrintLog("paused 1");
-
-            getchar();
-            for (_i=0; _i<commits; _i++) {
-                i = _i%subscriptions;
-                    StringBuffer key;
-                    key.append("TEST").append(i).append("/index");
-                    root->setPropInt(key.str(), i);
-                    PrintLog("Commit %d", i);
-                    conn->commit();
-            }
-            PrintLog("paused 2");
-            getchar();
-            for (_i=0; _i<commits; _i++) {
-                i = _i%subscriptions;
-                    StringBuffer key;
-                    key.append("TEST").append(i).append("/index");
-                    root->setPropInt(key.str(), subscriptions-i);            
-                    conn->commit();
-            }
-            PrintLog("paused 3");
-            getchar();
-            for (_i=0; _i<commits; _i++) {
-                i = _i%subscriptions;
-                    StringBuffer key;
-                    key.append("TEST").append(i).append("/index");
-                    root->setPropInt(key.str(), i);         
-                    conn->commit();
-            }
+    if (dir.exists("regress::trans::super2",false,true) && !dir.removeEntry("regress::trans::super2")) {
+        ERROR("Can't remove super2");
+        return;
+    }
+    if (dir.exists("regress::trans::super3",false,true) && !dir.removeEntry("regress::trans::super3")) {
+        ERROR("Can't remove super3");
+        return;
     }
-}
-
-static __int64 subchangetotal;
-static unsigned subchangenum;
-static CriticalSection subchangesect;
-
-class CCSub : public CInterface, implements ISDSConnectionSubscription, implements ISDSSubscription
-{
-    unsigned n;
-    unsigned &count;
-public:
-    IMPLEMENT_IINTERFACE;
 
-    CCSub(unsigned _n,unsigned &_count) 
-        : count(_count)
-    { 
-        n = _n;
+    if (dir.exists("regress::trans::sub1",true,false) && !dir.removeEntry("regress::trans::sub1")) {
+        ERROR("Can't remove sub1");
+        return;
     }
-    virtual void notify()
-    {
-        CriticalBlock block(subchangesect);
-        subchangetotal += n;
-        subchangenum++;
-        count++;
+    printf("Creating 'regress::trans' subfiles(1,4)\n");
+    Owned<IFileDescriptor> sub1 = createFileDescriptor("regress::trans", "sub1", 3, 17);
+    Owned<IDistributedFile> dsub1 = dir.createNew(sub1);
+    dsub1->attach("regress::trans::sub1");
+    dsub1.clear();
+
+    if (dir.exists("regress::trans::sub1",true,false) && !dir.removeEntry("regress::trans::sub2")) {
+        ERROR("Can't remove sub2");
+        return;
     }
-    virtual void notify(SubscriptionId id, const char *xpath, SDSNotifyFlags flags, unsigned valueLen, const void *valueData)
-    {
-        CriticalBlock block(subchangesect);
-        subchangetotal += n;
-        subchangenum++;
-        subchangetotal += (unsigned)flags;
-        subchangetotal += crc32(xpath,strlen(xpath),0);
-        if (valueLen) {
-            subchangetotal += crc32((const char *)valueData,valueLen,0);
-        }
-        count++;
+    Owned<IFileDescriptor> sub2 = createFileDescriptor("regress::trans", "sub2", 3, 17);
+    Owned<IDistributedFile> dsub2 = dir.createNew(sub2);
+    dsub2->attach("regress::trans::sub2");
+    dsub2.clear();
 
+    if (dir.exists("regress::trans::sub1",true,false) && !dir.removeEntry("regress::trans::sub3")) {
+        ERROR("Can't remove sub3");
+        return;
     }
+    Owned<IFileDescriptor> sub3 = createFileDescriptor("regress::trans", "sub3", 3, 17);
+    Owned<IDistributedFile> dsub3 = dir.createNew(sub3);
+    dsub3->attach("regress::trans::sub3");
+    dsub3.clear();
 
-};
-
-class CChange : public Thread
-{
-    Owned<IRemoteConnection> conn;
-    Owned<CCSub> sub;
-    StringAttr path;
-    SubscriptionId id[10];
-    unsigned n;
-    unsigned count;
-
-public:
-    Semaphore stopsem;
-    CChange(unsigned _n) 
-    {
-        n = _n;
-        StringBuffer s("/DAREGRESS/CONSUB");
-        s.append(n+1);
-        path.set(s.str());
-        conn.setown(querySDS().connect(path, myProcessSession(), RTM_CREATE|RTM_DELETE_ON_DISCONNECT, 1000000));
-        unsigned i;
-        for (i=0;i<5;i++)  
-            id[i] = conn->subscribe(*new CCSub(n*1000+i,count));
-        s.append("/testprop");
-        for (;i<10;i++) 
-            id[i] = querySDS().subscribe(s.str(),*new CCSub(n*1000+i,count),false,true);
-        count = 0;
-        start();
+    if (dir.exists("regress::trans::sub1",true,false) && !dir.removeEntry("regress::trans::sub4")) {
+        ERROR("Can't remove sub4");
+        return;
     }
-
-    virtual int run()
-    {
-        unsigned i;
-        for (i = 0;i<10; i++) {
-            conn->queryRoot()->setPropInt("testprop", (i*17+n*21)%100);
-            conn->commit();
-            for (unsigned j=0;j<1000;j++) {
-                {   
-                    CriticalBlock block(subchangesect);
-                    if (count>=(i+1)*10) 
-                        break;
-                }
-                Sleep(10);
-            }
-        }
-        stopsem.wait();
-        for (i=0;i<10;i++) 
-            conn->unsubscribe(id[i]);
-        return 0;
+    Owned<IFileDescriptor> sub4 = createFileDescriptor("regress::trans", "sub4", 3, 17);
+    Owned<IDistributedFile> dsub4 = dir.createNew(sub4);
+    dsub4->attach("regress::trans::sub4");
+    dsub4.clear();
+
+    // Auto-commit
+    printf("Auto-commit test (inactive transaction)\n");
+    Owned<IDistributedSuperFile> sfile1 = dir.createSuperFile("regress::trans::super1", false, false, NULL, transaction);
+    sfile1->addSubFile("regress::trans::sub1", false, NULL, false, transaction);
+    sfile1->addSubFile("regress::trans::sub2", false, NULL, false, transaction);
+    sfile1.clear();
+    sfile1.setown(dir.lookupSuperFile("regress::trans::super1", NULL, transaction));
+    if (!sfile1.get())
+        ERROR("non-transactional add super1 failed");
+    if (sfile1->numSubFiles() != 2)
+        ERROR("auto-commit add sub failed, not all subs were added");
+    else {
+        if (strcmp(sfile1->querySubFile(0).queryLogicalName(), "regress::trans::sub1") != 0)
+            ERROR("auto-commit add sub failed, wrong name for sub1");
+        if (strcmp(sfile1->querySubFile(1).queryLogicalName(), "regress::trans::sub2") != 0)
+            ERROR("auto-commit add sub failed, wrong name for sub2");
     }
-};
-
+    sfile1.clear();
+
+    // Rollback
+    printf("Rollback test (active transaction)\n");
+    transaction->start();
+    Owned<IDistributedSuperFile> sfile2 = dir.createSuperFile("regress::trans::super2", false, false, NULL, transaction);
+    sfile2->addSubFile("regress::trans::sub3", false, NULL, false, transaction);
+    sfile2->addSubFile("regress::trans::sub4", false, NULL, false, transaction);
+    transaction->rollback();
+    if (sfile2->numSubFiles() != 0)
+        ERROR("transactional rollback failed, some subs were added");
+    sfile2.clear();
+    sfile2.setown(dir.lookupSuperFile("regress::trans::super2", NULL, transaction));
+    if (sfile2.get())
+        ERROR("transactional rollback super2 failed, it exists!");
+
+    // Commit - FIXME - adding the superfile inside the transaction exposed a flaw in the DFS that only happens in this case
+    printf("Commit test (active transaction)\n");
+    Owned<IDistributedSuperFile> sfile3 = dir.createSuperFile("regress::trans::super3", false, false, NULL, transaction);
+    transaction->start();
+    sfile3->addSubFile("regress::trans::sub3", false, NULL, false, transaction);
+    sfile3->addSubFile("regress::trans::sub4", false, NULL, false, transaction);
+    transaction->commit();
+    sfile3.clear();
+    sfile3.setown(dir.lookupSuperFile("regress::trans::super3", NULL, transaction));
+    if (!sfile3.get())
+        ERROR("transactional add super3 failed");
+    if (sfile3->numSubFiles() != 2)
+        ERROR("transactional add sub failed, not all subs were added");
+    else {
+        if (strcmp(sfile3->querySubFile(0).queryLogicalName(), "regress::trans::sub3") != 0)
+            ERROR("transactional add sub failed, wrong name for sub3");
+        if (strcmp(sfile3->querySubFile(1).queryLogicalName(), "regress::trans::sub4") != 0)
+            ERROR("transactional add sub failed, wrong name for sub4");
+    }
+    sfile3.clear();
+}
 
-void test3()
+void testSDSSubs()
 {
-    printf("Test SDS subscriptions\n");
     subchangenum = 0;
     subchangetotal = 0;
     IArrayOf<CChange> a;
@@ -620,64 +712,85 @@ void testReadAllSDS()
     printf("Test SDS connecting to every branch\n");
     testReadBranch("/");
     printf("Connected to every branch\n");
-
 }
 
+// ======================================================================= Test Engine
+
+struct TestArray {
+    char *name;
+    void(*func)();
+    TestArray *next;
+} Tests = { NULL, NULL, NULL };
+TestArray *currentTest = &Tests;
 
-static bool runTest(unsigned i)
+static void registerTest(const char* name, void(*fp)())
 {
-    try {
-        switch (i) {
-        case 1: test1(); break;
-        case 2: test2(); break;
-        case 3: test3(); break;
-        default:
-            return false;
-        }
-        printf("Test #%d complete\n",i);
-    }
-    catch (IException *e) {
-        StringBuffer s;
-        s.append("DAREGRESS Test #").append(i).append(" Exception");
-        pexception(s.str(),e);
+    assert(name && fp && "Test must have names and a function to run");
+    if (currentTest->func) {
+        TestArray *t = new TestArray();
+        t->name = (char*)name;
+        t->func = fp;
+        t->next = 0;
+        currentTest->next = t;
+        currentTest = t;
+    } else {
+        currentTest->name = (char*)name;
+        currentTest->func = fp;
+        currentTest->next = 0;
     }
-    return true; 
 }
 
-extern void runDafsTest();
-
-
-static void usage(const char *error=NULL)
+static int runTest(const char* match)
 {
-    if (error) 
-        printf("ERROR: %s\n", error);
-    printf("usage: DAREGRESS <dali-ip>"); 
-    printf("or:    DAREGRESS <dali-ip> { test-number> } [ full ]"); 
+    int n=0;
+    currentTest = &Tests;
+    while (currentTest) {
+        if (match && strstr(currentTest->name, match) == 0) {
+            currentTest = currentTest->next;
+            continue;
+        }
+        printf("\n ++ Running test '%s'\n",currentTest->name);
+        try {
+            currentTest->func();
+        } catch (IException *e) {
+            StringBuffer s;
+            s.append(" ++ Test Exception");
+            pexception(s.str(),e);
+            errorcount++;
+        }
+        n++;
+        printf(" ++ Test '%s' complete\n",currentTest->name);
+        currentTest = currentTest->next;
+    }
+    return n;
 }
 
-struct ReleaseAtomBlock { ~ReleaseAtomBlock() { releaseAtoms(); } };
-
+void initTests() {
+    registerTest("SDS read/write", testSDSRW);
+    registerTest("DFS basics", testDFS);
+    registerTest("DFS transaction", testDFSTrans);
+    registerTest("SDS subscriptions", testSDSSubs);
+}
 
 int main(int argc, char* argv[])
 {   
+    if (argc<2) {
+        usage();
+        return 1;
+    }
     ReleaseAtomBlock rABlock;
     InitModuleObjects();
-
     EnableSEHtoExceptionMapping();
+    RunOptions opt = read_cmdline(argc, argv);
+    initTests();
+    unsigned nt = 0;
+    printf("Dali Regression Tests\n");
+    printf("=====================\n");
 
     try {
-        StringBuffer cmd;
-        splitFilename(argv[0], NULL, NULL, &cmd, NULL);
-        StringBuffer lf;
-        openLogFile(lf, cmd.toLowerCase().append(".log").str());
-        if (argc<2) {
-            usage();
-            return 1;
-        }
-        printf("Dali Regression Tests\n");
-        printf("=====================\n");
+        printf(" ++ Connecting to Dali server at %s\n", opt.dali);
         SocketEndpoint ep;
-        ep.set(argv[1],DALI_SERVER_PORT);
+        ep.set(opt.dali,DALI_SERVER_PORT);
         if (ep.isNull()) {
             usage("could not resolve dali server IP");
             return 1;
@@ -686,47 +799,26 @@ int main(int argc, char* argv[])
         epa.append(ep);
         Owned<IGroup> group = createIGroup(epa); 
         initClientProcess(group, DCR_Other);
+
+        // Test
         errorcount = 0;
-        unsigned nt = 0;
-        bool done=false;
-        bool full=false;
-        loop {
-            for (unsigned i=2;i<argc;i++) {
-                if (stricmp(argv[i],"test")==0) {
-                    runDafsTest();
-                    done = true;
-                    break;
-                }
-                else if (stricmp(argv[i],"full")==0)
-                    full = true;
-                else {
-                    unsigned n = atoi(argv[i]);
-                    if (n) {
-                        done = true;
-                        runTest(n);
-                    }
-                }
-            }
-            if (!done) {
-                for (unsigned i=1;;i++) {
-                    if (!runTest(i)) 
-                        break;
-                    nt++;
-                }
-            }
-            if (full) {
-                testReadAllSDS(); 
-            }
-            printf("%d Test%s completed, %d error%s reported\n",nt,(nt!=1)?"s":"",errorcount,(errorcount!=1)?"s":"");
-            break;
+        if (opt.full) {
+            testReadAllSDS();
+            nt++;
         }
+        nt += runTest(opt.match);
+
+        // Cleanup
         closedownClientProcess();
         setNodeCaching(false);
     }
     catch (IException *e) {
-        pexception("DAREGRESS Exception",e);
+        pexception(" ++ DAREGRESS Exception",e);
+        errorcount++;
     }
-    
-    return 0;
+    printf("\nDali Regression Tests Report\n");
+    printf(" ++ Tests completed: %3d\n", nt);
+    printf(" ++ Errors reported: %3d\n", errorcount);
+    return errorcount;
 }