Преглед изворни кода

Merge branch 'candidate-6.4.16'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman пре 7 година
родитељ
комит
250598c1b4

+ 30 - 3
common/remote/sockfile.cpp

@@ -5381,7 +5381,7 @@ public:
                 CDateTime dt;
                 iFile->getTime(nullptr, &dt, nullptr);
                 dt.serialize(reply);
-                reply.append(iFile->queryFilename());
+                reply.append(mask);
                 b = 0;
                 reply.append(b);
             }
@@ -6883,6 +6883,27 @@ protected:
         iFile2->remove();
         iFile->move(subDirFilePath);
 
+        // open sub-directory file2 explicitly
+        RemoteFilename rfn;
+        rfn.setRemotePath(subDirPath.str());
+        Owned<IFile> dir = createIFile(rfn);
+        Owned<IDirectoryIterator> diriter = dir->directoryFiles("file2");
+        if (!diriter->first())
+        {
+            CPPUNIT_ASSERT_MESSAGE("Error, file2 diriter->first() is null", 0);
+        }
+
+        Linked<IFile> iFile3 = &diriter->query();
+        diriter.clear();
+        dir.clear();
+
+        OwnedIFileIO iFile3IO = iFile3->openShared(IFOread, IFSHfull);
+        if (!iFile3IO)
+        {
+            CPPUNIT_ASSERT_MESSAGE("Error, file2 openShared() failed", 0);
+        }
+        iFile3IO->close();
+
         // count sub-directory files with a wildcard
         unsigned count=0;
         Owned<IDirectoryIterator> iter = subDirIFile->directoryFiles("*2");
@@ -6907,9 +6928,15 @@ protected:
         CPPUNIT_ASSERT(subDirIFile->getTime(&createTime, &modifiedTime, &accessedTime));
         CPPUNIT_ASSERT(modifiedTime == newModifiedTime);
 
-
         // test set file permissions
-        iFile2->setFilePermissions(0777);
+        try
+        {
+            iFile2->setFilePermissions(0777);
+        }
+        catch (...)
+        {
+            CPPUNIT_ASSERT_MESSAGE("iFile2->setFilePermissions() exception", 0);
+        }
     }
     void testConfiguration()
     {

+ 25 - 34
dali/ft/filecopy.cpp

@@ -1259,7 +1259,7 @@ IFormatPartitioner * FileSprayer::createPartitioner(aindex_t index, bool calcOut
 
 void FileSprayer::examineCsvStructure()
 {
-    if (srcAttr->hasProp("ECL"))
+    if (srcAttr && srcAttr->hasProp("ECL"))
         // Already has, keep it.
         return;
 
@@ -3022,6 +3022,7 @@ bool FileSprayer::isSameSizeHeaderFooter()
 
 void FileSprayer::updateTargetProperties()
 {
+    TimeSection timer("FileSprayer::updateTargetProperties() time");
     Owned<IException> error;
     if (distributedTarget)
     {
@@ -3246,17 +3247,13 @@ void FileSprayer::updateTargetProperties()
                 // add original file name from a single distributed source (like Copy)
                 RemoteFilename remoteFile;
                 distributedSource->queryPart(0).getFilename(remoteFile, 0);
-                splitAndStoreFileInfo(newRecord, remoteFile);
+                splitAndCollectFileInfo(newRecord, remoteFile);
             }
             else
             {
-                // add original file names from multiple sources (like Spray)
-                ForEachItemIn(idx, sources)
-                {
-                    FilePartInfo & curSource = sources.item(idx);
-                    RemoteFilename &remoteFile = curSource.filename;
-                    splitAndStoreFileInfo(newRecord, remoteFile, idx, false);
-                }
+                FilePartInfo & firstSource = sources.item((aindex_t)0);
+                RemoteFilename &remoteFile = firstSource.filename;
+                splitAndCollectFileInfo(newRecord, remoteFile, false);
             }
             curHistory->addPropTree("Origin",newRecord.getClear());
         }
@@ -3271,40 +3268,34 @@ void FileSprayer::updateTargetProperties()
         throw error.getClear();
 }
 
-void FileSprayer::splitAndStoreFileInfo(IPropertyTree * newRecord, RemoteFilename &remoteFileName,
-                                        aindex_t idx, bool isDistributedSource)
+
+void FileSprayer::splitAndCollectFileInfo(IPropertyTree * newRecord, RemoteFilename &remoteFileName,
+                                          bool isDistributedSource)
 {
     StringBuffer drive;
     StringBuffer path;
-    StringBuffer fileName;
+    StringBuffer tail;
     StringBuffer ext;
-    remoteFileName.split(&drive, &path, &fileName, &ext);
-    if (idx == 0)
-    {
-        if (drive.isEmpty())
-        {
-            remoteFileName.queryIP().getIpText(drive.clear());
-            newRecord->setProp("@ip", drive.str());
-        }
-        else
-            newRecord->setProp("@drive", drive.str());
+    remoteFileName.split(&drive, &path, &tail, &ext);
 
-        newRecord->setProp("@path", path.str());
+    if (drive.isEmpty())
+    {
+        remoteFileName.queryIP().getIpText(drive.clear());
+        newRecord->setProp("@ip", drive.str());
     }
+    else
+        newRecord->setProp("@drive", drive.str());
+
+    newRecord->setProp("@path", path.str());
+
     // We don't want to store distributed file parts name extension
     if (!isDistributedSource && ext.length())
-        fileName.append(ext);
+        tail.append(ext);
 
-    // In spray multiple source files case keep all original filenames
-    if (newRecord->hasProp("@name"))
-    {
-        StringBuffer currentName;
-        newRecord->getProp("@name", currentName);
-        currentName.append(",").append(fileName);
-        fileName = currentName;
-    }
-
-    newRecord->setProp("@name", fileName.str());
+    if (sources.ordinality()>1)
+        newRecord->setProp("@name", "[MULTI]");
+    else
+        newRecord->setProp("@name", tail.str());
 }
 
 void FileSprayer::setOperation(dfu_operation op)

+ 3 - 1
dali/ft/filecopy.ipp

@@ -275,7 +275,9 @@ protected:
 private:
     bool calcUsePull();
     // Get and store Remote File Name parts into the History record
-    void splitAndStoreFileInfo(IPropertyTree * newRecord, RemoteFilename &remoteFileName, aindex_t idx = 0, bool isDistributedSource = true);
+    void splitAndCollectFileInfo(IPropertyTree * newRecord, RemoteFilename &remoteFileName,
+                                 bool isDistributedSource = true);
+
 
 protected:
     CIArrayOf<FilePartInfo> sources;

+ 10 - 0
roxie/ccd/ccd.hpp

@@ -27,6 +27,7 @@
 #include "thorxmlwrite.hpp"
 #include "jlog.hpp"
 #include "jstats.h"
+#include "jset.hpp"
 #include "roxie.hpp"
 #include "roxiedebug.ipp"
 #include "eclrtl.hpp"
@@ -92,6 +93,8 @@ extern unsigned myNodeIndex;
 
 #define SUBCHANNEL_MASK 3
 #define SUBCHANNEL_BITS 2    // allows for up to 7-way redundancy in a 16-bit short retries flag, high bits used for indicators/flags
+#define MAX_SUBCHANNEL  7    // (16-2) / SUBCHANNEL_BITS
+
 //#define TIME_PACKETS
 
 #define ROXIE_FASTLANE      0x8000u         // mask in retries indicating slave reply goes on the fast queue
@@ -167,6 +170,13 @@ public:
     void setException();
     unsigned thisChannelRetries();
 
+    unsigned getRespondingSubChannel() const // NOTE - 0 based
+    {
+        dbgassertex(retries);
+        unsigned bitpos = countTrailingUnsetBits((unsigned) retries);
+        return bitpos / SUBCHANNEL_BITS;
+    }
+
     inline unsigned getSequenceId() const
     {
         return (((unsigned) overflowSequence) << 16) | (unsigned) continueSequence;

+ 7 - 8
roxie/ccd/ccdmain.cpp

@@ -970,17 +970,16 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
         {
             numChannels = numNodes;
             unsigned cyclicOffset = topology->getPropInt("@cyclicOffset", 1);
-            for (unsigned i=0; i<numNodes; i++)
+            for (unsigned copy=0; copy<numDataCopies; copy++)
             {
                 // Note this code is a little confusing - easy to get the cyclic offset backwards
                 // cyclic offset means node n+offset has copy 2 for channel n, so node n has copy 2 for channel n-offset
-                int channel = (int)i+1;
-                for (unsigned copy=0; copy<numDataCopies; copy++)
+                for (unsigned i=0; i<numNodes; i++)
                 {
-                    if (channel < 1)
+                    int channel = (int)i+1 - (copy * cyclicOffset);
+                    while (channel < 1)
                         channel = channel + numNodes;
                     addChannel(i, channel, copy);
-                    channel -= cyclicOffset;
                 }
             }
         }
@@ -989,11 +988,11 @@ int STARTQUERY_API start_query(int argc, const char *argv[])
             if (!channelsPerNode)
                 throw MakeStringException(MSGAUD_operator, ROXIE_INVALID_TOPOLOGY, "Invalid topology file - channelsPerNode should be > 0");
             numChannels = numNodes * channelsPerNode;
-            for (unsigned i=0; i<numNodes; i++)
+            for (unsigned copy=0; copy<channelsPerNode; copy++)
             {
-                int channel = (int)(i+1);
-                for (unsigned copy=0; copy<channelsPerNode; copy++)
+                for (unsigned i=0; i<numNodes; i++)
                 {
+                    int channel = (int)(i+1);
                     addChannel(i, channel, copy);
                     channel += numNodes;
                 }

+ 119 - 47
roxie/ccd/ccdqueue.cpp

@@ -34,12 +34,13 @@
 #include <cppunit/extensions/HelperMacros.h>
 #endif
 
-static unsigned channels[MAX_CLUSTER_SIZE];     // list of all channel numbers for this node
-static unsigned channelCount;                   // number of channels this node is doing
-static unsigned subChannels[MAX_CLUSTER_SIZE];  // maps channel numbers to subChannels for this node
-static unsigned numSlaves[MAX_CLUSTER_SIZE];    // number of slaves listening on this channel
-static unsigned replicationLevel[MAX_CLUSTER_SIZE];  // Which copy of the data this channel uses on this slave
-static unsigned liveSubChannels[MAX_CLUSTER_SIZE];   // Which subchannels are believed live for a given channel - technically probably should be atomic, but any races are benign enough that not worth it.
+static unsigned channels[MAX_CLUSTER_SIZE];              // Array [0..channelCount-1] of channels supported by this slave
+static unsigned channelCount;                            // number of channels supported by this slave
+
+//The following are all indexed by the channel number [1..numChannels]
+static unsigned subChannels[MAX_CLUSTER_SIZE];       // The subchannel that this node supports on the specified channel. NOTE - values in this array are 1-based
+static unsigned numSlaves[MAX_CLUSTER_SIZE];         // The total number of slaves in the system that process this channel
+static unsigned replicationLevel[MAX_CLUSTER_SIZE];  // Which copy of the data is held by this node, for a given channel - MORE - is this the same as subChannels[n]-1 ?  I suspect it is.
 
 static std::atomic<bool> suspendedChannels[MAX_CLUSTER_SIZE];
 
@@ -753,51 +754,121 @@ void SlaveContextLogger::flush()
 
 //=================================================================================
 
-bool isSubChannelAlive(unsigned channel, unsigned subChannel)
+/*
+ * IBYTI handling
+ *
+ * IBYTI (I beat you to it) messages are sent by the slave that is going to process a particular request,
+ * to tell the other slaves on the same channel not to bother.
+ *
+ * In order to reduce wasted work, for each request a "primary" subchannel is selected (based on a hash of the
+ * packet's RUID) - this channel will process the request immediately, but others will delay a little while
+ * in order to give the expected IBYTI time to arrive.
+ *
+ * The decision on how long to delay is a little complex - too long, and you end up losing the ability for a
+ * backup slave to step in when primary is under load (or dead); too short and you end up duplicating work.
+ * It's also important for the delay to be adaptive so that if a slave goes offline, the other slaves on the
+ * subchannel don't keep waiting for it to take its turn.
+ *
+ * The adaptiveness is handled by noting any time that we delay waiting for an IBYTI that does not arrive - this
+ * may mean that the slave(s) we expected to get there first are offline, and thus next time we don't wait quite
+ * so long for them. Conversely, any time an IBYTI does arrive from another slave on your channel, you know that
+ * it is online and so can reset the delay to its original value.
+ *
+ * A previous version of this code assumed a single missed IBYTI was enough to assume that a slave was dead and drop the
+ * delay for that slave to zero - this turned out to behave pretty poorly when under load, with much duplicated work.
+ * Thus we take care to adjust the delay more gradually, while still ending up with a zero delay if the buddy does not respond
+ * several times in a row.
+ */
+
+/*
+ * A "subchannel" is a value from 1 to 7 (with current settings) that indicates which "copy" of the data for this channel
+ * is being processed by this slave. A value of 0 would indicate that this slave does not have any data for this channel.
+ * In a typical 100-way roxie with cyclic redundancy, node 1 would be processing channel 1, subchannel 1, and channel 2,
+ * subchannel 2, node 2 would be processing channel 2, subchannel 1 and channel 3, subchannel 2, and so on u to node 100,
+ * which would process channel 100, subchannel 1 and channel 1, subchannel 2. The subChannels array tells me my subchannel
+ * for any given channel.
+ *
+ * To determine which subchannel is the "primary" for a given query packet, a hash value of fields from the packet header
+ * is used, modulo the number of subchannels on this channel. The slave on this subchannel will respond immediately.
+ * Slaves on other channels delay according to the subchannel number - so on a 4-way redundant system, if the primary
+ * subchannel is decided to be 2, the slave on subchannel 3 will delay by 1 ibytiDelay value, the slave on subchannel 4 by
+ * 2 values, and the slave on subchannel 1 by 3 values (this assumes all slaves are responding normally).
+ *
+ * In fact, the calculation is a little more complex, in that the "units" are adjusted per subchannel to take into account
+ * the responsiveness or otherwise of a subchannel. Initially, the delay value for each subchannel is the same, but any time
+ * a slave waits for an IBYTI that does not arrive on time, the delay value for any slave that is "more primary" than me for
+ * this packet is reduced. Any time an IBYTI _does_ arrive on time, the delay is reset to its initial value.
+ */
+
+class ChannelInfo
 {
-    unsigned mask = SUBCHANNEL_MASK << (SUBCHANNEL_BITS * subChannel);
-    return (liveSubChannels[channel] & mask) == mask;
-}
+public:
+    unsigned getIbytiDelay(unsigned primarySubChannel) const  // NOTE - zero-based
+    {
+        unsigned delay = 0;
+        unsigned subChannel = primarySubChannel;
+        while (subChannel != mySubChannel)
+        {
+            delay += currentDelay[subChannel];
+            subChannel++;
+            if (subChannel == numSubChannels)
+                subChannel = 0;
+        }
+        return delay;
+    }
 
-unsigned getIbytiDelay(unsigned channel, unsigned primarySubChannel)
-{
-    // MORE - adjust delay according to whether it's a retry, whether it was a broadcast etc
-    unsigned mySubChannel = subChannels[channel] - 1;
-    unsigned subChannel = primarySubChannel;
-    unsigned delay = 0;
-    while (subChannel != mySubChannel)
-    {
-        if (isSubChannelAlive(channel, subChannel))
-            delay++;
-        subChannel++;
-        if (subChannel >= numSlaves[channel])
-            subChannel = 0;
-    }
-    return delay * initIbytiDelay;
-}
+    void noteChannelsSick(unsigned primarySubChannel)
+    {
+        unsigned subChannel = primarySubChannel;
+        while (subChannel != mySubChannel)
+        {
+            unsigned newDelay = currentDelay[subChannel] / 2;
+            if (newDelay < minIbytiDelay)
+                newDelay = minIbytiDelay;
+            currentDelay[subChannel] = newDelay;
+            subChannel++;
+            if (subChannel == numSubChannels)
+                subChannel = 0;
+        }
+    }
 
-void notePrimarySubChannelsDead(unsigned channel, unsigned primarySubChannel)
-{
-    unsigned mySubChannel = subChannels[channel] - 1;
-    unsigned subChannel = primarySubChannel;
-    while (subChannel != mySubChannel)
+    void noteChannelHealthy(unsigned subChannel)
     {
-        liveSubChannels[channel] &= ~(SUBCHANNEL_MASK << (SUBCHANNEL_BITS * subChannel));
-        subChannel++;
-        if (subChannel >= numSlaves[channel])
-            subChannel = 0;
+        currentDelay[subChannel] = initIbytiDelay;
     }
-}
 
+    void init(unsigned _subChannel, unsigned _numSubChannels)
+    {
+        mySubChannel = _subChannel;
+        numSubChannels = _numSubChannels;
+        for (unsigned i = 0; i < numSubChannels; i++)
+            currentDelay[i] = initIbytiDelay;
+    }
+private:
+    unsigned mySubChannel = 0;     // Which subChannel does this node implement for this channel - zero-based
+    unsigned numSubChannels = 0;   // How many subchannels are there for this channel, across all slaves
+    unsigned currentDelay[MAX_SUBCHANNEL] = {0};  // NOTE - technically should be atomic, but in the event of a race we don't really care who wins
+};
+
+static ChannelInfo channelInfo[MAX_CLUSTER_SIZE];
+
+/*
+ * Determine whether to abort on receipt of an IBYTI for a packet which I have already started processing
+ * As I will also have sent out an IBYTI, I should only abort if the sender of the IBYTI has higher priority
+ * for this packet than I do.
+ *
+ * MORE - move this function into ChannelInfo class.
+ */
 bool otherSlaveHasPriority(const RoxiePacketHeader &h)
 {
     unsigned hash = h.priorityHash();
-    unsigned primarySubChannel = (hash % numSlaves[h.channel]) + 1;
-    unsigned mySubChannel = subChannels[h.channel];
+    unsigned primarySubChannel = (hash % numSlaves[h.channel]);
+    unsigned mySubChannel = subChannels[h.channel] - 1;
+    unsigned otherSlaveSubChannel = h.getRespondingSubChannel();
+    // could be coded smarter! Basically mysub - prim < theirsub - prim using modulo arithmetic, I think
     while (primarySubChannel != mySubChannel)
     {
-        unsigned channelMask = SUBCHANNEL_MASK << (SUBCHANNEL_BITS * primarySubChannel);
-        if ((h.retries & ROXIE_RETRIES_MASK) == channelMask)
+        if (primarySubChannel == otherSlaveSubChannel)
             return true;
         primarySubChannel++;
         if (primarySubChannel >= numSlaves[h.channel])
@@ -1302,10 +1373,10 @@ public:
                 if (subChannels[channel] != 1) 
                     primChannel = false;
                 unsigned hdrHashVal = header.priorityHash();
-                unsigned primarySubChannel = (hdrHashVal % numSlaves[channel]) + 1;
-                if (primarySubChannel != subChannels[channel])
+                unsigned primarySubChannel = (hdrHashVal % numSlaves[channel]);
+                if (primarySubChannel != subChannels[channel]-1)
                 {
-                    unsigned delay = getIbytiDelay(channel, primarySubChannel);
+                    unsigned delay = channelInfo[channel].getIbytiDelay(primarySubChannel);
                     if (logctx.queryTraceLevel() > 6)
                     {
                         StringBuffer x;
@@ -1319,7 +1390,7 @@ public:
                     {
                         ibytiSem.wait(delay);
                         if (!abortJob)
-                            notePrimarySubChannelsDead(channel, primarySubChannel);
+                            channelInfo[channel].noteChannelsSick(primarySubChannel);
                         if (logctx.queryTraceLevel() > 8)
                         {
                             StringBuffer x;
@@ -1626,8 +1697,8 @@ public:
             assertex(subChannels[channel] == 0);
             assertex(subChannel != 0);
             subChannels[channel] = subChannel;
-            liveSubChannels[channel] = 0;  // Until proven otherwise
             channels[channelCount++] = channel;
+            channelInfo[channel].init(subChannel-1, numSlaves[channel]);
         }
     }
 
@@ -2120,7 +2191,8 @@ public:
         }
         else
         {
-            if ((header.retries & ROXIE_RETRIES_MASK) == header.getSubChannelMask(header.channel))
+            unsigned subChannel = header.getRespondingSubChannel();
+            if (subChannel == subChannels[header.channel] - 1)
             {
                 if (traceLevel > 10)
                     DBGLOG("doIBYTI packet was from self");
@@ -2128,7 +2200,7 @@ public:
             }
             else
             {
-                liveSubChannels[header.channel] |= header.retries & ROXIE_RETRIES_MASK;
+                channelInfo[header.channel].noteChannelHealthy(subChannel);
                 bool foundInQ = queue.remove(header);
                 if (foundInQ)
                 {

+ 1 - 1
roxie/ccd/ccdserver.cpp

@@ -4716,7 +4716,7 @@ public:
                         // MORE - ROXIE_ALIVE perhaps should go here too
                         default:
                             if (ctxTraceLevel > 3)
-                                activity.queryLogCtx().CTXLOG("Discarding packet %p - original %p is NULL or has result already", mr.get(), original);
+                                activity.queryLogCtx().CTXLOG("Discarding packet %p %x - original %p is NULL or has result already", mr.get(), header.activityId, original);
                             mr->discard();
                             break;
                     }

+ 11 - 0
testing/regress/ecl/key/spray_replicate_test.xml

@@ -0,0 +1,11 @@
+<Dataset name='Result 1'>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><result>Despray Pass</result></Row>
+</Dataset>
+<Dataset name='Result 3'>
+ <Row><result>Spray Pass</result></Row>
+</Dataset>
+<Dataset name='Result 4'>
+ <Row><result>Replicate Pass</result></Row>
+</Dataset>

+ 146 - 0
testing/regress/ecl/spray_replicate_test.ecl

@@ -0,0 +1,146 @@
+/*##############################################################################
+
+    Copyright (C) 2018 HPCC Systems®.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+//noHThor
+
+//class=spray
+
+import std.system.thorlib;
+import Std.File AS FileServices;
+
+dropzonePath := '/var/lib/HPCCSystems/mydropzone/' : STORED('dropzonePath');
+espIpPort := 'http://127.0.0.1:8010/FileSpray' : STORED('espIpPort');
+engine := thorlib.platform();
+prefix := '~regress::' + engine + '-';
+suffix := '-' + WORKUNIT;
+
+sprayPrepFileName := prefix + 'spray_prep' + suffix;;
+desprayOutFileName := dropzonePath + 'file_for_spray' + suffix;
+sprayOutFileName := prefix + 'spray_wo_replication' + suffix;
+sprayDestGroup := thorlib.group();
+
+unsigned VERBOSE := 0;
+
+Layout_Person := RECORD
+  STRING3  name;
+  UNSIGNED2 age;
+  BOOLEAN good;
+END;
+
+empty := DATASET([], Layout_Person);
+
+dsSetup := DATASET([ {'foo', 10, 1},
+                     {'bar', 12, 0},
+                     {'baz', 32, 1} ]
+            ,Layout_Person);
+
+//  Create a small logical file
+setup := output(dsSetup, , DYNAMIC(sprayPrepFileName), CSV, OVERWRITE);
+
+rec := RECORD
+  string result;
+  string msg;
+end;
+
+
+// Despray it to default drop zone
+rec despray(rec l) := TRANSFORM
+  SELF.msg := FileServices.fDespray(
+                       LOGICALNAME := sprayPrepFileName
+                      ,DESTINATIONIP := '.'
+                      ,DESTINATIONPATH := desprayOutFileName
+                      ,ALLOWOVERWRITE := True
+                      );
+  SELF.result := 'Despray Pass';
+end;
+
+dst1 := NOFOLD(DATASET([{'', ''}], rec));
+p1 := NOTHOR(PROJECT(NOFOLD(dst1), despray(LEFT)));
+c1 := CATCH(NOFOLD(p1), ONFAIL(TRANSFORM(rec,
+                                 SELF.result := 'Despray Fail',
+                                 SELF.msg := FAILMESSAGE
+                                )));
+#if (VERBOSE = 1)
+    desprayOut := output(c1);
+#else
+    desprayOut := output(c1, {result});
+#end
+
+rec spray(rec l) := TRANSFORM
+    SELF.msg := FileServices.fSprayDelimited(
+                        SOURCEIP := '.',
+                        SOURCEPATH := desprayOutFileName,
+                        DESTINATIONGROUP := sprayDestGroup,
+                        DESTINATIONLOGICALNAME := sprayOutFileName,
+                        TIMEOUT := -1,
+                        ESPSERVERIPPORT := espIpPort,
+                        ALLOWOVERWRITE := true,
+                        REPLICATE := false
+                        );
+    self.result := 'Spray Pass';
+end;
+
+
+dst2 := NOFOLD(DATASET([{'', ''}], rec));
+p2 := NOTHOR(PROJECT(NOFOLD(dst2), spray(LEFT)));
+c2 := CATCH(NOFOLD(p2), ONFAIL(TRANSFORM(rec,
+                                 SELF.result := 'Spray Fail',
+                                 SELF.msg := FAILMESSAGE
+                                )));
+#if (VERBOSE = 1)
+    sprayOut := output(c2);
+#else
+    sprayOut := output(c2, {result});
+#end
+
+
+// Replicate
+rec replicate(rec l) := TRANSFORM
+    SELF.msg := FileServices.fReplicate(
+                        LOGICALNAME := sprayOutFileName,
+                        TIMEOUT := -1,
+                        ESPSERVERIPPORT := espIpPort
+                        );
+    self.result := 'Replicate Pass';
+end;
+
+
+dst3 := NOFOLD(DATASET([{'', ''}], rec));
+p3 := NOTHOR(PROJECT(NOFOLD(dst3), replicate(LEFT)));
+c3 := CATCH(NOFOLD(p3), ONFAIL(TRANSFORM(rec,
+                                 SELF.result := 'Replicate Fail',
+                                 SELF.msg := FAILMESSAGE
+                                )));
+#if (VERBOSE = 1)
+    replicateOut := output(c3);
+#else
+    replicateOut := output(c3, {result});
+#end
+
+SEQUENTIAL(
+
+  setup,
+  desprayOut,
+  sprayOut,
+  replicateOut,
+
+  // Clean-up
+  FileServices.DeleteExternalFile('.', desprayOutFileName),
+  FileServices.DeleteLogicalFile(sprayPrepFileName),
+  FileServices.DeleteLogicalFile(sprayOutFileName),
+
+);