浏览代码

EclAgent through pipe behaving erratically

Fixes gh-269. Due to a race condition between the reading and writing threads
at either side of the through PIPE activity, erratic results could be returned.
These would include spurious reports of pipe program failing with error code
254, or incomplete results.

This patch abandons the original unreliable EclAgent implementation of the PIPE
activity family and uses instead the implementation copied from Roxie, adapted
for the eclagent execution environment. An error in the Roxie implementation
(which also would have affected the original hthor code) concerning the handling
of REPEAT,GROUP pipes is also corrected. The pipe8.ecl test from the regression
suite is corrected to run properly on linux. Finally, a flag is added to allow the
return code from the pipe program to be ignored (for easier testing of the cases
where empty results are returned by allowing us to use grep).

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 14 年之前
父节点
当前提交
18bf597073
共有 9 个文件被更改,包括 446 次插入483 次删除
  1. 1 0
      ecl/hql/hqlgram.y
  2. 4 0
      ecl/hqlcpp/hqlhtcpp.cpp
  3. 3 1
      ecl/hqlcpp/hqlsource.cpp
  4. 372 312
      ecl/hthor/hthor.cpp
  5. 0 144
      ecl/hthor/hthor.ipp
  6. 41 16
      roxie/ccd/ccdserver.cpp
  7. 1 0
      rtl/include/eclhelper.hpp
  8. 15 1
      system/jlib/jthread.cpp
  9. 9 9
      testing/ecl/pipe8.ecl

+ 1 - 0
ecl/hql/hqlgram.y

@@ -9270,6 +9270,7 @@ pipeOption
                             $$.setExpr(createExprAttribute(outputAtom, $3.getExpr()), $1);
                         }
     | GROUP             {   $$.setExpr(createAttribute(groupAtom)); }
+    | OPT               {   $$.setExpr(createAttribute(optAtom)); }
     ;
     
 pipeFormatOption

+ 4 - 0
ecl/hqlcpp/hqlhtcpp.cpp

@@ -10169,6 +10169,8 @@ ABoundActivity * HqlCppTranslator::doBuildActivityOutput(BuildCtx & ctx, IHqlExp
             StringBuffer flags;
             if (expr->hasProperty(repeatAtom))
                 flags.append("|TPFrecreateeachrow");
+            if (expr->hasProperty(optAtom))
+                flags.append("|TPFnofail");
             if (csvAttr)
                 flags.append("|TPFwritecsvtopipe");
             if (xmlAttr)
@@ -10973,6 +10975,8 @@ ABoundActivity * HqlCppTranslator::doBuildActivityPipeThrough(BuildCtx & ctx, IH
         flags.append("|TPFrecreateeachrow");
     if (expr->hasProperty(groupAtom))
         flags.append("|TPFgroupeachrow");
+    if (expr->hasProperty(optAtom))
+        flags.append("|TPFnofail");
 
     if (csvToPipe)
         flags.append("|TPFwritecsvtopipe");

+ 3 - 1
ecl/hqlcpp/hqlsource.cpp

@@ -2721,8 +2721,10 @@ void DiskReadBuilder::buildMembers(IHqlExpression * expr)
         }
         
         StringBuffer flags;
-        if (tableExpr->hasProperty(groupAtom))      // not supported in parser
+        if (tableExpr->hasProperty(groupAtom))      // not supported in parser?
             flags.append("|TPFgroupeachrow");
+        if (tableExpr->hasProperty(optAtom))        // not supported in parser?
+            flags.append("|TPFnofail");
 
         if (csvFromPipe)
             flags.append("|TPFreadcsvfrompipe");

+ 372 - 312
ecl/hthor/hthor.cpp

@@ -24,6 +24,7 @@
 #include "jprop.hpp"
 #include "jdebug.hpp"
 #include "jlzw.hpp"
+#include "jisem.hpp"
 #include "roxiedebug.hpp"
 #include "eclhelper.hpp"
 #include "workunit.hpp"
@@ -966,7 +967,7 @@ void throwPipeProcessError(unsigned err, char const * preposition, char const *
             char error[512];
             size32_t sz = pipe->readError(sizeof(error), error);
             if(sz && sz!=(size32_t)-1)
-            msg.append(", stderr: '").append(sz, error).append("'");
+                msg.append(", stderr: '").append(sz, error).append("'");
         }
         catch (IException *e)
         {
@@ -977,72 +978,6 @@ void throwPipeProcessError(unsigned err, char const * preposition, char const *
     throw MakeStringException(2, "%s", msg.str());
 }
 
-CHThorPipeWriteActivity::CHThorPipeWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorPipeWriteArg &_arg, ThorActivityKind _kind) : CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg)
-{
-}
-
-void CHThorPipeWriteActivity::ready()
-{
-    CHThorActivityBase::ready();
-    pipe.setown(createPipeProcess(agent.queryAllowedPipePrograms()));
-    writer.setown(new PipeWriter(this, pipe, false, false, helper.recreateEachRow()));
-    inputMeta.set(input->queryOutputMeta());
-    rowSerializer.setown(inputMeta.createRowSerializer(agent.queryCodeContext(), activityId));
-    xformHelper.setown(createPipeWriteXformHelper(helper.getPipeFlags(), helper.queryXmlOutput(), helper.queryCsvOutput(), rowSerializer));
-    xformHelper->ready();
-    writer->ready();
-}
-
-void CHThorPipeWriteActivity::execute()
-{
-    writer->run();
-    if (helper.getSequence() >= 0)
-    {
-        WorkunitUpdate wu = agent.updateWorkUnit();
-        Owned<IWUResult> result = wu->updateResultBySequence(helper.getSequence());
-        if (result)
-        {
-            result->setResultTotalRowCount(writer->queryRecCount()); 
-            result->setResultStatus(ResultStatusCalculated);
-        }
-    }
-}
-
-void CHThorPipeWriteActivity::openPipe(const void * row, bool displayTitle)
-{
-    if(row)
-        pipeCommand.setown(helper.getNameFromRow(row));
-    else
-        pipeCommand.setown(helper.getPipeProgram());
-    if(!pipe->run(displayTitle ? "PipeWrite" : NULL, pipeCommand.get(), ".", true, false, true, 0x10000)) // 64K error buffer
-    {
-        StringBuffer msg;
-        msg.append("Could not pipe to process (").append(pipeCommand.get()).append(")");
-        agent.fail(2, msg.str());
-    }
-    xformHelper->writeHeader(pipe);
-}
-
-const void * CHThorPipeWriteActivity::nextInput(size32_t & inputSize)
-{
-    const void * ret = input->nextInGroup();
-    if(ret == NULL)
-        ret = input->nextInGroup();
-    if(ret == NULL)
-        return NULL;
-    inputSize = inputMeta.getRecordSize(ret);
-    return ret;
-}
-
-void CHThorPipeWriteActivity::closePipe()
-{
-    xformHelper->writeFooter(pipe);
-    pipe->closeInput();
-    unsigned err = pipe->wait();
-    if(err)
-        throwPipeProcessError(err, "to", pipeCommand.get(), pipe);
-}
-
 //=====================================================================================================
 
 CHThorIndexWriteActivity::CHThorIndexWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexWriteArg &_arg, ThorActivityKind _kind) : CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg)
@@ -1327,315 +1262,440 @@ void CHThorIndexWriteActivity::buildLayoutMetadata(Owned<IPropertyTree> & metada
 
 //=====================================================================================================
 
-void PipeWriter::run()
+class CHThorPipeReadActivity : public CHThorSimpleActivityBase
 {
-    if(syncsAtStart)
-        readyForWrite.wait();
-    if(aborted)
+    IHThorPipeReadArg &helper;
+    Owned<IPipeProcess> pipe;
+    StringAttr pipeCommand;
+    Owned<IOutputRowDeserializer> rowDeserializer;
+    Owned<IReadRowStream> readTransformer;
+    bool groupSignalled;
+public:
+    CHThorPipeReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorPipeReadArg &_arg, ThorActivityKind _kind)
+        : CHThorSimpleActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg)
     {
-        finished = true;
-        return;
+        groupSignalled = true;
     }
-    try
-    {
-        if(!recreate)
-        {
-            owner->openPipe(NULL,true);
-            started = true;
-            if(syncsAtStart)
-                readyForRead.signal();
-        }
-        while(!aborted)
-        {
-            size32_t inputSize;
-            OwnedConstHThorRow next(owner->nextInput(inputSize));
-            if(!next)
-                break;
-            if(recreate)
-                owner->openPipe(next,reccount==0);
-            if(syncs)
-                readyForRead.signal();
-            owner->writeTranslatedText(next);
-            reccount++;
 
-            if(recreate)
-                owner->closePipe();
-            if(syncs)
-                readyForWrite.wait();
-        }
-        finished = true;
-        if(syncs && !aborted)
-            readyForRead.signal();
-        if(!recreate)
-            owner->closePipe();
-    }
-    catch (IException *)
+    virtual bool needsAllocator() const { return true; }
+
+    virtual void ready()
     {
-        finished = true;
-        if(syncs || (syncsAtStart && !started))
-            readyForRead.signal();
-        throw;
+        groupSignalled = true; // i.e. don't start with a NULL row
+        CHThorSimpleActivityBase::ready();
+        rowDeserializer.setown(rowAllocator->createRowDeserializer(agent.queryCodeContext()));
+        readTransformer.setown(createReadRowStream(rowAllocator, rowDeserializer, helper.queryXmlTransformer(), helper.queryCsvTransformer(), helper.queryXmlIteratorPath(), helper.getPipeFlags()));
+        openPipe(helper.getPipeProgram());
     }
-}
 
-void PipeWriter::abort()
-{
-    aborted = true;
-    if(!finished && (syncs || (syncsAtStart && !started)))
-        readyForWrite.signal();
-}
-
-//=====================================================================================================
+    virtual void done()
+    {
+        //Need to close the output (or read it in its entirety), otherwise we might wait forever for the
+        //program to finish
+        if (pipe)
+            pipe->closeOutput();
+        pipe.clear();
+        readTransformer->setStream(NULL);
+        CHThorSimpleActivityBase::done();
+    }
 
-PipeReader::PipeReader(IPipeProcess * _pipe, bool _syncsAtStart, bool _syncs, PipeWriter * _writer, IReadRowStream * _xformer, bool _grouped)
-    : pipe(_pipe), syncsAtStart(_syncsAtStart), syncs(_syncs), writer(_writer), xformer(_xformer), grouped(_grouped)
-{
-    started = false;
-}
+    virtual const void *nextInGroup()
+    {
+        while (!waitForPipe())
+        {
+            if (!pipe)
+                return NULL;
+            if (helper.getPipeFlags() & TPFgroupeachrow)
+            {
+                if (!groupSignalled)
+                {
+                    groupSignalled = true;
+                    return NULL;
+                }
+            }
+        }
+        const void *ret = readTransformer->next();
+        assertex(ret != NULL); // if ret can ever be NULL then we need to recode this logic
+        processed++;
+        groupSignalled = false;
+        return ret;
+    }
 
-bool PipeReader::moreInputAvailable()
-{
-    return !xformer->eos();
-}
+protected:
+    bool waitForPipe()
+    {
+        if (!pipe)
+            return false;  // done
+        if (!readTransformer->eos())
+            return true;
+        verifyPipe();
+        return false;
+    }
 
-const void *PipeReader::read()
-{
-    if(!started)
+    void openPipe(char const * cmd)
     {
-        if(syncs)
-            finished = !sync();
-        else if(syncsAtStart)
-            finished = !sync();
-        else
-            setStream();
-        started = true;
+        pipeCommand.setown(cmd);
+        pipe.setown(createPipeProcess(agent.queryAllowedPipePrograms()));
+        if(!pipe->run(NULL, cmd, ".", false, true, true, 0x10000))
+            throw MakeStringException(2, "Could not run pipe process %s", cmd);
+        Owned<ISimpleReadStream> pipeReader = pipe->getOutputStream();
+        readTransformer->setStream(pipeReader.get());
     }
-    while(!finished && !moreInputAvailable())
+
+    void verifyPipe()
     {
-        if(syncs)
+        if (pipe)
         {
-            finished = !sync();
-            if (grouped)
-                return NULL;
+            unsigned err = pipe->wait();
+            if(err && !(helper.getPipeFlags() & TPFnofail))
+                throwPipeProcessError(err, "from", pipeCommand.get(), pipe);
+            pipe.clear();
         }
-        else
-            finished = true;
     }
-    if (finished)
-        return NULL;
-    return xformer->next();
-}
+};
 
-void PipeReader::done()
-{
-    xformer->setStream(NULL);
-}
+//=====================================================================================================
 
+// Through pipe code - taken from Roxie implementation
 
-void PipeReader::setStream()
+interface IPipeRecordPullerCallback : extends IExceptionHandler
 {
-    Owned<ISimpleReadStream> pipeReader = pipe->getOutputStream();
-    xformer->setStream(pipeReader);
-}
+    virtual void processRow(const void *row) = 0;
+    virtual void processDone() = 0;
+    virtual const void *nextInput() = 0;
+};
 
-bool PipeReader::sync()
+class CPipeRecordPullerThread : public Thread
 {
-    bool ret = writer->sync();
-    setStream();
-    return ret;
-}
+protected:
+    IPipeRecordPullerCallback *helper;
+    bool eog;
 
-unsigned PipeReader::wait()
-{
-    if(started && !finished)
+public:
+    CPipeRecordPullerThread() : Thread("PipeRecordPullerThread")
+    {
+        helper = NULL;
+        eog = false;
+    }
+
+    void setInput(IPipeRecordPullerCallback *_helper)
+    {
+        helper = _helper;
+    }
+
+    virtual int run()
     {
         try
         {
-            moreInputAvailable();
+            loop
+            {
+                const void * row = helper->nextInput();
+                if (row)
+                {
+                    eog = false;
+                    helper->processRow(row);
+                }
+                else if (!eog)
+                {
+                    eog = true;
+                }
+                else
+                {
+                    break;
+                }
+            }
+            helper->processDone();
+        }
+        catch (IException *e)
+        {
+            helper->fireException(e);
         }
-        catch(IException * e)
+        catch (...)
         {
-            e->Release();
+            helper->fireException(MakeStringException(2, "Unexpected exception caught in PipeRecordPullerThread::run"));
         }
+        return 0;
     }
-    unsigned ret = pipe->wait();
-    //Only return the error code if we've previously finished, otherwise the 
-    if (finished)
-        return ret;
-    return 0;
-}
+};
 
-unsigned PipeReader::tryWait(unsigned timeoutms)
+class CHThorPipeThroughActivity : public CHThorSimpleActivityBase, implements IPipeRecordPullerCallback
 {
-    bool timedout;
-    unsigned err = pipe->wait(timeoutms, timedout);
-    if(timedout)
-        return 0;
-    else
-        return err;
-}
+    IHThorPipeThroughArg &helper;
+    CPipeRecordPullerThread puller;
+    Owned<IPipeProcess> pipe;
+    StringAttr pipeCommand;
+    InterruptableSemaphore pipeVerified;
+    InterruptableSemaphore pipeOpened;
+    CachedOutputMetaData inputMeta;
+    Owned<IOutputRowSerializer> rowSerializer;
+    Owned<IOutputRowDeserializer> rowDeserializer;
+    Owned<IPipeWriteXformHelper> writeTransformer;
+    Owned<IReadRowStream> readTransformer;
+    bool firstRead;
+    bool recreate;
+    bool inputExhausted;
+    bool groupSignalled;
 
+public:
+    CHThorPipeThroughActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorPipeThroughArg &_arg, ThorActivityKind _kind)
+        : CHThorSimpleActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg)
+    {
+        recreate = helper.recreateEachRow();
+        groupSignalled = true;
+        firstRead = false;
+        inputExhausted = false;
+        puller.setInput(this);
+    }
 
-//=====================================================================================================
+    virtual void ready()
+    {
+        CHThorSimpleActivityBase::ready();
+        // From the create() in roxie
+
+        inputMeta.set(input->queryOutputMeta());
+        rowSerializer.setown(inputMeta.createRowSerializer(agent.queryCodeContext(), activityId));
+        rowDeserializer.setown(rowAllocator->createRowDeserializer(agent.queryCodeContext()));
+        writeTransformer.setown(createPipeWriteXformHelper(helper.getPipeFlags(), helper.queryXmlOutput(), helper.queryCsvOutput(), rowSerializer));
+
+        // From the start() in roxie
+        firstRead = true;
+        inputExhausted = false;
+        groupSignalled = true; // i.e. don't start with a NULL row
+        pipeVerified.reinit();
+        pipeOpened.reinit();
+        writeTransformer->ready();
+
+        if (!readTransformer)
+            readTransformer.setown(createReadRowStream(rowAllocator, rowDeserializer, helper.queryXmlTransformer(), helper.queryCsvTransformer(), helper.queryXmlIteratorPath(), helper.getPipeFlags()));
+        if(!recreate)
+            openPipe(helper.getPipeProgram());
+        puller.start();
+    }
 
-CHThorPipeReadActivity::CHThorPipeReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorPipeReadArg &_arg, ThorActivityKind _kind)
-    : CHThorSimpleActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg)
-{
-}
+    void done()
+    {
+        //Need to close the output (or read it in its entirety), otherwise we might wait forever for the
+        //program to finish
+        if (pipe)
+            pipe->closeOutput();
+        pipeVerified.interrupt(NULL);
+        pipeOpened.interrupt(NULL);
+        puller.join();
+        CHThorSimpleActivityBase::done();
+        pipe.clear();
+        readTransformer->setStream(NULL);
+    }
 
-void CHThorPipeReadActivity::ready()
-{
-    CHThorSimpleActivityBase::ready();
-    rowDeserializer.setown(rowAllocator->createRowDeserializer(agent.queryCodeContext()));
-    pipe.setown(createPipeProcess(agent.queryAllowedPipePrograms()));
+    virtual bool needsAllocator() const { return true; }
 
-    unsigned pipeFlags = helper.getPipeFlags();
-    Owned<IReadRowStream> readXformer = createReadRowStream(rowAllocator, rowDeserializer, helper.queryXmlTransformer(), helper.queryCsvTransformer(), helper.queryXmlIteratorPath(), pipeFlags);
-    reader.setown(new PipeReader(pipe, false, false, NULL, readXformer, (pipeFlags & TPFgroupeachrow) != 0));
-    
-    if(!pipe->run("PipeRead", helper.getPipeProgram(), ".", false, true, true, 0x10000)) // 64K error buffer
+    virtual const void *nextInGroup()
     {
-        StringBuffer msg;
-        msg.append("Could not pipe from process (").append(helper.getPipeProgram()).append(")");
-        agent.fail(2, msg.str());
+        while (!waitForPipe())
+        {
+            if (!pipe)
+                return NULL;
+            if (helper.getPipeFlags() & TPFgroupeachrow)
+            {
+                if (!groupSignalled)
+                {
+                    groupSignalled = true;
+                    return NULL;
+                }
+            }
+        }
+        const void *ret = readTransformer->next();
+        assertex(ret != NULL); // if ret can ever be NULL then we need to recode this logic
+        processed++;
+        groupSignalled = false;
+        return ret;
     }
-    reader->ready();
-}
 
-void CHThorPipeReadActivity::done()
-{
-    //Need to close the output (or read it in its entirety), otherwise we might wait forever for the
-    //program to finish
-    pipe->closeOutput();
-    unsigned err = reader->wait();
-    if(err)
-        throwPipeProcessError(err, "from", helper.getPipeProgram(), pipe);
-    reader->done();
-    CHThorSimpleActivityBase::done();
-}
+    virtual bool isGrouped()
+    {
+        return outputMeta.isGrouped();
+    }
 
+    virtual void processRow(const void *row)
+    {
+        // called from puller thread
+        if(recreate)
+            openPipe(helper.getNameFromRow(row));
+        writeTransformer->writeTranslatedText(row, pipe);
+        ReleaseRoxieRow(row);
+        if(recreate)
+        {
+            closePipe();
+            pipeVerified.wait();
+        }
+    }
 
-const void * CHThorPipeReadActivity::nextInGroup()
-{
-    const void * row;
-    try
+    virtual void processDone()
     {
-        row = reader->read();
+        // called from puller thread
+        if(recreate)
+        {
+            inputExhausted = true;
+            pipeOpened.signal();
+        }
+        else
+        {
+            closePipe();
+            pipeVerified.wait();
+        }
     }
-    catch(IException * e)
+
+    virtual const void *nextInput()
     {
-        throw makeWrappedException(e);
+        return input->nextInGroup();
     }
-    if(!row)
-        return NULL;
-    processed++;
-    return row;
-}
 
-//=====================================================================================================
+    virtual bool fireException(IException *e)
+    {
+        inputExhausted = true;
+        pipeOpened.interrupt(LINK(e));
+        pipeVerified.interrupt(e);
+        return true;
+    }
 
-CHThorPipeThroughActivity::CHThorPipeThroughActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorPipeThroughArg &_arg, ThorActivityKind _kind) : CHThorSimpleActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg)
-{
-}
+private:
+    bool waitForPipe()
+    {
+        if (firstRead)
+        {
+            pipeOpened.wait();
+            firstRead = false;
+        }
+        if (!pipe)
+            return false;  // done
+        if (!readTransformer->eos())
+            return true;
+        verifyPipe();
+        if (recreate && !inputExhausted)
+            pipeOpened.wait();
+        return false;
+    }
 
-void CHThorPipeThroughActivity::ready()
-{
-    CHThorSimpleActivityBase::ready();
-    bool recreate = helper.recreateEachRow();
-    unsigned pipeFlags = helper.getPipeFlags();
-    pipe.setown(createPipeProcess(agent.queryAllowedPipePrograms()));
-    writer.setown(new PipeWriter(this, pipe, true, recreate, recreate));
-    writerThread.setown(new WriterThread(writer, this));
+    void openPipe(char const * cmd)
+    {
+        pipeCommand.setown(cmd);
+        pipe.setown(createPipeProcess(agent.queryAllowedPipePrograms()));
+        if(!pipe->run(NULL, cmd, ".", true, true, true, 0x10000))
+            throw MakeStringException(2, "Could not run pipe process %s", cmd);
+        writeTransformer->writeHeader(pipe);
+        Owned<ISimpleReadStream> pipeReader = pipe->getOutputStream();
+        readTransformer->setStream(pipeReader.get());
+        pipeOpened.signal();
+    }
 
-    inputMeta.set(input->queryOutputMeta());
-    rowSerializer.setown(inputMeta.createRowSerializer(agent.queryCodeContext(), activityId));
-    rowDeserializer.setown(rowAllocator->createRowDeserializer(agent.queryCodeContext()));  
-
-    Owned<IReadRowStream> readXformer = createReadRowStream(rowAllocator, rowDeserializer, helper.queryXmlTransformer(), helper.queryCsvTransformer(), helper.queryXmlIteratorPath(), pipeFlags);
-    reader.setown(new PipeReader(pipe, true, recreate, writer, readXformer, (pipeFlags & TPFgroupeachrow) != 0));
-    xformHelper.setown(createPipeWriteXformHelper(helper.getPipeFlags(), helper.queryXmlOutput(), helper.queryCsvOutput(), rowSerializer));
-    xformHelper->ready();
-    writer->ready();
-    reader->ready();
-    writerThread->start();
-}
-
-void CHThorPipeThroughActivity::done()
-{
-    checkThreadException();  // We don't want to report exceptions that result from us closing the reader before consuming all data. Checking for exceptions before closing helps avoid this
-    writer->abort();
-    pipe->closeOutput();
-    unsigned err = reader->wait();
-    if(err)
-        throwPipeProcessError(err, "from", helper.getPipeProgram(), pipe);
-    writerThread->join();
-    reader->done();
-    CHThorSimpleActivityBase::done();
-}
+    void closePipe()
+    {
+        writeTransformer->writeFooter(pipe);
+        pipe->closeInput();
+    }
 
+    void verifyPipe()
+    {
+        if (pipe)
+        {
+            unsigned err = pipe->wait();
+            if(err && !(helper.getPipeFlags() & TPFnofail))
+                throwPipeProcessError(err, "through", pipeCommand.get(), pipe);
+            pipe.clear();
+            pipeVerified.signal();
+        }
+    }
+};
 
-const void * CHThorPipeThroughActivity::nextInGroup()
+class CHThorPipeWriteActivity : public CHThorActivityBase
 {
-    checkThreadException();
-    const void * row = reader->read();
-    if(!row)
-        return NULL;
-    processed++;
-    return row;
-}
+    IHThorPipeWriteArg &helper;
+    Owned<IPipeProcess> pipe;
+    StringAttr pipeCommand;
+    CachedOutputMetaData inputMeta;
+    Owned<IOutputRowSerializer> rowSerializer;
+    Owned<IPipeWriteXformHelper> writeTransformer;
+    bool firstRead;
+    bool recreate;
+    bool inputExhausted;
+public:
+    IMPLEMENT_SINKACTIVITY;
 
-void CHThorPipeThroughActivity::openPipe(const void * row, bool displayTitle)
-{
-    if(row)
-        pipeCommand.setown(helper.getNameFromRow(row));
-    else
-        pipeCommand.setown(helper.getPipeProgram());
-    if(!pipe->run(displayTitle ? "PipeThrough" : NULL, pipeCommand.get(), ".", true, true, true, 0x10000)) // 64K error buffer
+    CHThorPipeWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorPipeWriteArg &_arg, ThorActivityKind _kind)
+        : CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg)
     {
-        StringBuffer msg;
-        msg.append("Could not pipe through process (").append(pipeCommand.get()).append(")");
-        agent.fail(2, msg.str());
+        recreate = helper.recreateEachRow();
+        firstRead = false;
+        inputExhausted = false;
     }
-    xformHelper->writeHeader(pipe);
-}
 
-const void * CHThorPipeThroughActivity::nextInput(size32_t & inputSize)
-{
-    const void * ret = input->nextInGroup();
-    if(ret == NULL)
-        ret = input->nextInGroup();
-    if(ret == NULL)
-        return NULL;
-    inputSize = inputMeta.getRecordSize(ret);
-    return ret;
-}
+    virtual bool needsAllocator() const { return true; }
 
-void CHThorPipeThroughActivity::closePipe()
-{
-    xformHelper->writeFooter(pipe);
-    pipe->closeInput();
-    unsigned err = pipe->wait();
-    if(err)
-        throwPipeProcessError(err, "through", pipeCommand.get(), pipe);
-}
+    virtual void ready()
+    {
+        CHThorActivityBase::ready();
+        inputMeta.set(input->queryOutputMeta());
+        rowSerializer.setown(inputMeta.createRowSerializer(agent.queryCodeContext(), activityId));
+        writeTransformer.setown(createPipeWriteXformHelper(helper.getPipeFlags(), helper.queryXmlOutput(), helper.queryCsvOutput(), rowSerializer));
 
-bool CHThorPipeThroughActivity::isGrouped()
-{ 
-    return outputMeta.isGrouped();
-}
+        firstRead = true;
+        inputExhausted = false;
+        writeTransformer->ready();
+        if(!recreate)
+            openPipe(helper.getPipeProgram());
+    }
 
-int CHThorPipeThroughActivity::WriterThread::run()
-{
-    try
+    virtual void execute()
+    {
+        loop
+        {
+            const void *row = input->nextInGroup();
+            if (!row)
+            {
+                row = input->nextInGroup();
+                if (!row)
+                    break;
+            }
+            processed++;
+            if(recreate)
+                openPipe(helper.getNameFromRow(row));
+            writeTransformer->writeTranslatedText(row, pipe);
+            ReleaseRoxieRow(row);
+            if(recreate)
+                closePipe();
+        }
+        closePipe();
+        if (helper.getSequence() >= 0)
+        {
+            WorkunitUpdate wu = agent.updateWorkUnit();
+            Owned<IWUResult> result = wu->updateResultBySequence(helper.getSequence());
+            if (result)
+            {
+                result->setResultTotalRowCount(processed);
+                result->setResultStatus(ResultStatusCalculated);
+            }
+        }
+    }
+
+private:
+    void openPipe(char const * cmd)
     {
-        writer->run();
+        pipeCommand.setown(cmd);
+        pipe.setown(createPipeProcess(agent.queryAllowedPipePrograms()));
+        if(!pipe->run(NULL, cmd, ".", true, false, true, 0x10000))
+            throw MakeStringException(2, "Could not run pipe process %s", cmd);
+        writeTransformer->writeHeader(pipe);
     }
-    catch(IException * e)
+
+    void closePipe()
     {
-        owner->noteThreadException(e);
+        writeTransformer->writeFooter(pipe);
+        pipe->closeInput();
+        unsigned err = pipe->wait();
+        if(err && !(helper.getPipeFlags() & TPFnofail))
+            throwPipeProcessError(err, "to", pipeCommand.get(), pipe);
+        pipe.clear();
     }
-    return 0;
-}
+};
 
 //=====================================================================================================
 

+ 0 - 144
ecl/hthor/hthor.ipp

@@ -527,150 +527,6 @@ public:
     virtual void writeTranslatedText(const void * row) = 0;
 };
 
-class PipeWriter : public CInterface
-{
-public:
-    PipeWriter(IPipeWriteOwner * _owner, IPipeProcess * _pipe, bool _syncsAtStart, bool _syncs, bool _recreate) 
-        : owner(_owner), pipe(_pipe), syncsAtStart(_syncsAtStart), syncs(_syncs), recreate(_recreate) {}
-    void ready() { reccount = 0; started = false; finished = false; aborted = false; }
-    void run();
-    void abort();
-    bool sync() { readyForWrite.signal(); readyForRead.wait(); return !finished; }
-    unsigned __int64 queryRecCount() const { return reccount; }
-
-private:
-    IPipeWriteOwner * owner;
-    IPipeProcess * pipe;
-    bool syncsAtStart;
-    bool syncs;
-    bool recreate;
-    Semaphore readyForWrite;
-    Semaphore readyForRead;
-    unsigned reccount;
-    bool started;
-    bool finished;
-    bool aborted;
-};
-
-class PipeReader : public CInterface
-{
-public:
-    PipeReader(IPipeProcess * _pipe, bool _syncsAtStart, bool _syncs, PipeWriter * _writer, IReadRowStream * _xformer, bool _grouped);
-
-    void done();
-    void ready() { started = false; finished = false; }
-    const void * read();
-    unsigned wait();
-    unsigned tryWait(unsigned timeoutms);
-
-protected:
-    bool moreInputAvailable();
-    void setStream();
-    bool sync();
-
-private:
-    IPipeProcess * pipe;
-    CThorStreamDeserializerSource rowSource;
-    StringAttr command;
-    PipeWriter * writer;
-    Linked<IReadRowStream> xformer;
-    bool started;
-    bool finished;
-    bool syncsAtStart;
-    bool syncs;
-    bool grouped;
-};
-
-class CHThorPipeWriteActivity : public CHThorActivityBase, implements IPipeWriteOwner
-{
-    IHThorPipeWriteArg &helper;
-    Owned<IPipeProcess> pipe;
-    Owned<IOutputRowSerializer> rowSerializer;
-    CachedOutputMetaData inputMeta;
-    StringAttr pipeCommand;
-    Owned<PipeWriter> writer;
-    Owned<IPipeWriteXformHelper> xformHelper;
-
-public:
-    IMPLEMENT_SINKACTIVITY;
-
-    CHThorPipeWriteActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorPipeWriteArg &_arg, ThorActivityKind _kind);
-    virtual void ready();
-    virtual void execute();
-
-    //interface IPipeWriteOwner
-    virtual void openPipe(const void * row, bool displayTitle);
-    virtual const void * nextInput(size32_t & inputSize);
-    virtual void closePipe();
-    virtual void writeTranslatedText(const void * row) { xformHelper->writeTranslatedText(row, pipe); }
-};
-
-class CHThorPipeReadActivity : public CHThorSimpleActivityBase
-{
-    IHThorPipeReadArg &helper;
-    Owned<IPipeProcess> pipe;
-    Owned<PipeReader> reader;
-    Owned<IOutputRowDeserializer> rowDeserializer;      
-    Owned<IXmlToRowTransformer> xmlTransformer;
-    Owned<ICsvToRowTransformer> csvTransformer;
-
-public:
-    CHThorPipeReadActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorPipeReadArg &_arg, ThorActivityKind _kind);
-    virtual void done();
-    virtual void ready();
-    virtual bool needsAllocator() const { return true; }    
-
-    //interface IHThorInput
-    virtual const void *nextInGroup();
-};
-
-class CHThorPipeThroughActivity : public CHThorSimpleActivityBase, implements IPipeWriteOwner
-{
-private:
-    class WriterThread : public Thread
-    {
-    public:
-        WriterThread(PipeWriter * _writer, CHThorPipeThroughActivity * _owner) : writer(_writer), owner(_owner) {}
-        int run();
-    private:
-        PipeWriter * writer;
-        CHThorPipeThroughActivity * owner;
-    };
-
-    IHThorPipeThroughArg &helper;
-    Owned<IPipeProcess> pipe;
-    StringAttr pipeCommand;
-    Owned<PipeWriter> writer;
-    Owned<WriterThread> writerThread;
-    Owned<PipeReader> reader;
-    CachedOutputMetaData inputMeta;
-    Owned<IException> threadException;
-    CriticalSection threadExceptionCrit;
-    Owned<IOutputRowSerializer> rowSerializer;
-    Owned<IOutputRowDeserializer> rowDeserializer;
-    Owned<IPipeWriteXformHelper> xformHelper;
-
-    friend class WriterThread;
-    void noteThreadException(IException * e) { CriticalBlock block(threadExceptionCrit); if(!threadException) threadException.setown(e); }
-    void checkThreadException() { CriticalBlock block(threadExceptionCrit); if(threadException) throw threadException.getClear(); }
-
-public:
-    CHThorPipeThroughActivity(IAgentContext &agent, unsigned _activityId, unsigned _subgraphId, IHThorPipeThroughArg &_arg, ThorActivityKind _kind);
-    virtual void done();
-    virtual void ready();
-    virtual bool needsAllocator() const { return true; }    
-
-    //interface IHThorInput
-    virtual const void *nextInGroup();
-    virtual bool isGrouped();
-
-    //interface IPipeWriteOwner
-    virtual void openPipe(const void * row, bool displayTitle);
-    virtual const void * nextInput(size32_t & inputSize);
-    virtual void closePipe();
-    virtual void writeTranslatedText(const void * row) { xformHelper->writeTranslatedText(row, pipe); }
-};
-
 class CHThorIterateActivity : public CHThorSimpleActivityBase
 {
     IHThorIterateArg &helper;

+ 41 - 16
roxie/ccd/ccdserver.cpp

@@ -1537,6 +1537,7 @@ public:
     {
         input = NULL;
         helper = NULL;
+        eof = eog = FALSE;
     }
 
     inline unsigned __int64 queryTotalCycles() const
@@ -1655,7 +1656,7 @@ public:
         {
             const void * row;
             {
-                CriticalBlock c(crit);
+                CriticalBlock c(crit); // See comments in stop for why this is needed
                 row = input->nextInGroup();
             }
             if (row)
@@ -8532,10 +8533,12 @@ class CRoxieServerPipeReadActivity : public CRoxieServerActivity
     StringAttr pipeCommand;
     Owned<IOutputRowDeserializer> rowDeserializer;
     Owned<IReadRowStream> readTransformer;
+    bool groupSignalled;
 public:
     CRoxieServerPipeReadActivity(const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
         : CRoxieServerActivity(_factory, _probeManager), helper((IHThorPipeReadArg &)basehelper)
     {
+        groupSignalled = true;
     }
 
     virtual bool needsAllocator() const { return true; }
@@ -8548,6 +8551,7 @@ public:
 
     virtual void start(unsigned parentExtractSize, const byte *parentExtract, bool paused)
     {
+        groupSignalled = true; // i.e. don't start with a NULL row
         CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
         if (!readTransformer)
             readTransformer.setown(createReadRowStream(rowAllocator, rowDeserializer, helper.queryXmlTransformer(), helper.queryCsvTransformer(), helper.queryXmlIteratorPath(), helper.getPipeFlags()));
@@ -8564,11 +8568,23 @@ public:
     virtual const void *nextInGroup()
     {
         ActivityTimer t(totalCycles, timeActivities, ctx->queryDebugContext());
-        if (!waitForPipe())
-            return NULL;
+        while (!waitForPipe())
+        {
+            if (!pipe)
+                return NULL;
+            if (helper.getPipeFlags() & TPFgroupeachrow)
+            {
+                if (!groupSignalled)
+                {
+                    groupSignalled = true;
+                    return NULL;
+                }
+            }
+        }
         const void *ret = readTransformer->next();
-        if (ret)
-            processed++;
+        assertex(ret != NULL); // if ret can ever be NULL then we need to recode this logic
+        processed++;
+        groupSignalled = false;
         return ret;
     }
 
@@ -8593,17 +8609,12 @@ protected:
         readTransformer->setStream(pipeReader.get());
     }
 
-    void closePipe()
-    {
-        pipe->closeInput();
-    }
-
     void verifyPipe()
     {
         if (pipe)
         {
             unsigned err = pipe->wait();
-            if(err)
+            if(err && !(helper.getPipeFlags() & TPFnofail))
                 throw MakeStringException(ROXIE_PIPE_ERROR, "Pipe process %s returned error %d", pipeCommand.get(), err);
             pipe.clear();
         }
@@ -8626,6 +8637,7 @@ class CRoxieServerPipeThroughActivity : public CRoxieServerActivity, implements
     bool firstRead;
     bool recreate;
     bool inputExhausted;
+    bool groupSignalled;
 
 public:
 
@@ -8633,6 +8645,7 @@ public:
         : CRoxieServerActivity(_factory, _probeManager), helper((IHThorPipeThroughArg &)basehelper), puller(false)
     {
         recreate = helper.recreateEachRow();
+        groupSignalled = true;
         firstRead = false;
         inputExhausted = false;
     }
@@ -8651,6 +8664,7 @@ public:
     {
         firstRead = true;
         inputExhausted = false;
+        groupSignalled = true; // i.e. don't start with a NULL row
         pipeVerified.reinit();
         pipeOpened.reinit();
         writeTransformer->ready();
@@ -8690,11 +8704,22 @@ public:
     {
         ActivityTimer t(totalCycles, timeActivities, ctx->queryDebugContext());
         while (!waitForPipe())
-            if ((helper.getPipeFlags() & TPFgroupeachrow) || !pipe)
+        {
+            if (!pipe)
                 return NULL;
+            if (helper.getPipeFlags() & TPFgroupeachrow)
+            {
+                if (!groupSignalled)
+                {
+                    groupSignalled = true;
+                    return NULL;
+                }
+            }
+        }
         const void *ret = readTransformer->next();
-        if (ret)
-            processed++;
+        assertex(ret != NULL); // if ret can ever be NULL then we need to recode this logic
+        processed++;
+        groupSignalled = false;
         return ret;
     }
 
@@ -8784,7 +8809,7 @@ private:
         if (pipe)
         {
             unsigned err = pipe->wait();
-            if(err)
+            if(err && !(helper.getPipeFlags() & TPFnofail))
                 throw MakeStringException(ROXIE_PIPE_ERROR, "Pipe process %s returned error %d", pipeCommand.get(), err);
             pipe.clear();
             pipeVerified.signal();
@@ -8876,7 +8901,7 @@ private:
         writeTransformer->writeFooter(pipe);
         pipe->closeInput();
         unsigned err = pipe->wait();
-        if(err)
+        if(err && !(helper.getPipeFlags() & TPFnofail))
             throw MakeStringException(ROXIE_PIPE_ERROR, "Pipe process %s returned error %d", pipeCommand.get(), err);
         pipe.clear();
     }

+ 1 - 0
rtl/include/eclhelper.hpp

@@ -1964,6 +1964,7 @@ enum
 
     TPFrecreateeachrow      = 0x0100,
     TPFgroupeachrow         = 0x0200,
+    TPFnofail               = 0x0400,
 };
 
 

+ 15 - 1
system/jlib/jthread.cpp

@@ -1450,7 +1450,21 @@ static unsigned dowaitpid(HANDLE pid, int mode)
         int stat=-1;
         int ret = waitpid(pid, &stat, mode);
         if (ret>0) 
-            return WIFSIGNALED(stat)?254:WEXITSTATUS(stat);
+        {
+            if (WIFEXITED(stat))
+                return WEXITSTATUS(stat);
+            else if (WIFSIGNALED(stat))
+            {
+                ERRLOG("Program was terminated by signal %u", (unsigned) WTERMSIG(stat));
+                if (WTERMSIG(stat)==SIGPIPE)
+                    return 0;
+                return 254;
+            }
+            else
+            {
+                return 254;
+            }
+        }
         if (ret==0)
             break;
         int err = errno;

+ 9 - 9
testing/ecl/pipe8.ecl

@@ -49,9 +49,9 @@ tempname2 := getTempFilename3() : INDEPENDENT;
 tempname3 := getTempFilename4() : INDEPENDENT;
 
 #IF (__OS__ = 'windows')
-catCmd := 'cmd /C cat';
+  STRING catCmd(STRING filename) := 'cmd /C cat > ' + filename;
 #ELSE
-catCmd := 'cat';
+  STRING catCmd(STRING filename) := 'bash -c "cat > ' + filename + '"';
 #END
 
 d1 := DATASET([{1,[{'Gavin'},{'John'}]},{2,[{'Steve'},{'Steve'},{'Steve'}]},{3,[]}], houseRecord);
@@ -62,9 +62,9 @@ d3 := DATASET([' <Dataset><Row>Line3</Row></Dataset>', '  <Dataset><Row>Middle</
 
 d4 := dataset([{ 'Hello there'}, {'what a nice day'}, {'1234'}], { string line}) : stored('nofold');
 
-p1 := PIPE(d1, catCmd, houseRecord);
-p2 := PIPE(d1, catCmd, houseRecord, xml(noroot), output(xml(noroot)));
-p3 := PIPE(d1, catCmd, { STRING line }, csv, output(xml(noroot)));
+p1 := PIPE(d1, 'cat', houseRecord);
+p2 := PIPE(d1, 'cat', houseRecord, xml(noroot), output(xml(noroot)));
+p3 := PIPE(d1, 'cat', { STRING line }, csv, output(xml(noroot)));
 
 OUTPUT(p1);
 OUTPUT(p2);
@@ -74,13 +74,13 @@ csvRec := { STRING lout; };
 
 SEQUENTIAL(
  PARALLEL(
-  OUTPUT(d1,,PIPE(catCmd + '     > '+tempname1)),
-  OUTPUT(d4,,PIPE(catCmd + '     > '+tempname2, csv)),
-  OUTPUT(d1,,PIPE(catCmd + '     > '+tempname3, xml(noroot)))
+  OUTPUT(d1,,PIPE(catCmd(tempname1))),
+  OUTPUT(d4,,PIPE(catCmd(tempname2), csv)),
+  OUTPUT(d1,,PIPE(catCmd(tempname3), xml(noroot)))
  ),
  PARALLEL(
   OUTPUT(PIPE('sort ' + tempname2, csvRec, csv)),
-  OUTPUT(PIPE(catCmd + ' ' + tempname3, houseRecord, xml('Dataset/Row')))
+  OUTPUT(PIPE('cat ' + tempname3, houseRecord, xml('Dataset/Row')))
  )
 );