浏览代码

HPCC-22081 Improve PIPE error reporting.

Capture errno of child process and report exception.

Signed-off-by: Jake Smith <jake.smith@lexisnexisrisk.com>
Jake Smith 6 年之前
父节点
当前提交
1cc208d343

+ 86 - 32
ecl/hthor/hthor.cpp

@@ -1015,7 +1015,11 @@ void CHThorXmlWriteActivity::setFormat(IFileDescriptor * desc)
 void throwPipeProcessError(unsigned err, char const * preposition, char const * program, IPipeProcess * pipe)
 {
     StringBuffer msg;
-    msg.append("Error piping ").append(preposition).append(" (").append(program).append("): process failed with code ").append(err);
+    msg.append("Error piping ").append(preposition).append(" (").append(program).append("): ");
+    if (START_FAILURE == err) // PIPE process didn't start at all, START_FAILURE is our own error code
+        msg.append("process failed to start");
+    else
+        msg.append("process failed with code ").append(err);
     if(pipe->hasError())
     {
         try
@@ -1412,7 +1416,10 @@ protected:
         pipeCommand.setown(cmd);
         pipe.setown(createPipeProcess(agent.queryAllowedPipePrograms()));
         if(!pipe->run(NULL, cmd, ".", false, true, true, 0x10000))
-            throw MakeStringException(2, "Could not run pipe process %s", cmd);
+        {
+            // NB: pipe->run can't rely on the child process failing fast enough to return false here, failure picked up later with stderr context.
+            WARNLOG(2, "Could not run pipe process %s", cmd);
+        }
         Owned<ISimpleReadStream> pipeReader = pipe->getOutputStream();
         readTransformer->setStream(pipeReader.get());
     }
@@ -1601,7 +1608,15 @@ public:
         // called from puller thread
         if(recreate)
             openPipe(helper.getNameFromRow(row));
-        writeTransformer->writeTranslatedText(row, pipe);
+        try
+        {
+            writeTransformer->writeTranslatedText(row, pipe);
+        }
+        catch (IException *e)
+        {
+            ReleaseRoxieRow(row);
+            throw;
+        }
         ReleaseRoxieRow(row);
         if(recreate)
         {
@@ -1641,28 +1656,42 @@ public:
 private:
     bool waitForPipe()
     {
-        if (firstRead)
+        Owned<IPipeProcessException> pipeException;
+        try
         {
-            pipeOpened.wait();
-            firstRead = false;
+            if (firstRead)
+            {
+                pipeOpened.wait();
+                firstRead = false;
+            }
+            if (!pipe)
+                return false;  // done
+            if (!readTransformer->eos())
+                return true;
+        }
+        catch (IPipeProcessException *e)
+        {
+            pipeException.setown(e);
         }
-        if (!pipe)
-            return false;  // done
-        if (!readTransformer->eos())
-            return true;
         verifyPipe();
+        if (pipeException) // NB: verifyPipe may throw error based on pipe prog. output 1st.
+            throw pipeException.getClear();
         if (recreate && !inputExhausted)
             pipeOpened.wait();
         return false;
     }
-
     void openPipe(char const * cmd)
     {
         pipeCommand.setown(cmd);
         pipe.setown(createPipeProcess(agent.queryAllowedPipePrograms()));
         if(!pipe->run(NULL, cmd, ".", true, true, true, 0x10000))
-            throw MakeStringException(2, "Could not run pipe process %s", cmd);
-        writeTransformer->writeHeader(pipe);
+        {
+            // NB: pipe->run can't rely on the child process failing fast enough to return false here, failure picked up later with stderr context.
+            WARNLOG(2, "Could not run pipe process %s", cmd);
+        }
+        else
+            writeTransformer->writeHeader(pipe);
+
         Owned<ISimpleReadStream> pipeReader = pipe->getOutputStream();
         readTransformer->setStream(pipeReader.get());
         pipeOpened.signal();
@@ -1730,24 +1759,38 @@ public:
 
     virtual void execute()
     {
-        for (;;)
+        Owned<IPipeProcessException> pipeException;
+        try
         {
-            const void *row = input->nextRow();
-            if (!row)
+            for (;;)
             {
-                row = input->nextRow();
+                OwnedConstRoxieRow row(input->nextRow());
                 if (!row)
-                    break;
+                {
+                    row.setown(input->nextRow());
+                    if (!row)
+                        break;
+                }
+                processed++;
+                if (recreate)
+                    openPipe(helper.getNameFromRow(row));
+                writeTransformer->writeTranslatedText(row, pipe);
+                if (recreate)
+                {
+                    closePipe();
+                    verifyPipe();
+                }
             }
-            processed++;
-            if(recreate)
-                openPipe(helper.getNameFromRow(row));
-            writeTransformer->writeTranslatedText(row, pipe);
-            ReleaseRoxieRow(row);
-            if(recreate)
+            if (!recreate)
                 closePipe();
         }
-        closePipe();
+        catch (IPipeProcessException *e)
+        {
+            pipeException.setown(e);
+        }
+        verifyPipe();
+        if (pipeException) // NB: verifyPipe may throw error based on pipe prog. output 1st.
+            throw pipeException.getClear();
         if (helper.getSequence() >= 0)
         {
             WorkunitUpdate wu = agent.updateWorkUnit();
@@ -1765,19 +1808,30 @@ private:
     {
         pipeCommand.setown(cmd);
         pipe.setown(createPipeProcess(agent.queryAllowedPipePrograms()));
-        if(!pipe->run(NULL, cmd, ".", true, false, true, 0x10000))
-            throw MakeStringException(2, "Could not run pipe process %s", cmd);
-        writeTransformer->writeHeader(pipe);
+        if (!pipe->run(NULL, cmd, ".", true, false, true, 0x10000))
+        {
+            // NB: pipe->run can't rely on the child process failing fast enough to return false here, failure picked up later with stderr context.
+            WARNLOG(2, "Could not run pipe process %s", cmd);
+        }
+        else
+            writeTransformer->writeHeader(pipe);
     }
 
     void closePipe()
     {
         writeTransformer->writeFooter(pipe);
         pipe->closeInput();
-        unsigned err = pipe->wait();
-        if(err && !(helper.getPipeFlags() & TPFnofail))
-            throwPipeProcessError(err, "to", pipeCommand.get(), pipe);
-        pipe.clear();
+    }
+
+    void verifyPipe()
+    {
+        if (pipe)
+        {
+            unsigned err = pipe->wait();
+            if(err && !(helper.getPipeFlags() & TPFnofail))
+                throwPipeProcessError(err, "to", pipeCommand.get(), pipe);
+            pipe.clear();
+        }
     }
 };
 

+ 69 - 27
roxie/ccd/ccdserver.cpp

@@ -9464,7 +9464,10 @@ protected:
         pipeCommand.setown(cmd);
         pipe.setown(createPipeProcess());
         if(!pipe->run(NULL, cmd, ".", false, true, true, 0x10000))
-            throw MakeStringException(ROXIE_PIPE_ERROR, "Could not run pipe process %s", cmd);
+        {
+            // NB: pipe->run can't rely on the child process failing fast enough to return false here, failure picked up later with stderr context.
+            WARNLOG(ROXIE_PIPE_ERROR, "Could not run pipe process %s", cmd);
+        }
         Owned<ISimpleReadStream> pipeReader = pipe->getOutputStream();
         readTransformer->setStream(pipeReader.get());
     }
@@ -9654,16 +9657,30 @@ public:
 private:
     bool waitForPipe()
     {
-        if (firstRead)
+        Owned<IException> pipeException;
+        try
         {
-            pipeOpened.wait();
-            firstRead = false;
+            if (firstRead)
+            {
+                pipeOpened.wait();
+                firstRead = false;
+            }
+            if (!pipe)
+                return false;  // done
+            if (!readTransformer->eos())
+                return true;
+        }
+        catch (IException *e)
+        {
+            // NB: the original exception is probably a IPipeProcessException, but because InterruptableSemaphore rethrows it, we must catch it as an IException
+            if (QUERYINTERFACE(e, IPipeProcessException))
+                pipeException.setown(e);
+            else
+                throw;
         }
-        if (!pipe)
-            return false;  // done
-        if (!readTransformer->eos())
-            return true;
         verifyPipe();
+        if (pipeException) // NB: verifyPipe may throw error based on pipe prog. output 1st.
+            throw pipeException.getClear();
         if (recreate && !inputExhausted)
             pipeOpened.wait();
         return false;
@@ -9674,7 +9691,10 @@ private:
         pipeCommand.setown(cmd);
         pipe.setown(createPipeProcess());
         if(!pipe->run(NULL, cmd, ".", true, true, true, 0x10000))
-            throw MakeStringException(ROXIE_PIPE_ERROR, "Could not run pipe process %s", cmd);
+        {
+            // NB: pipe->run can't rely on the child process failing fast enough to return false here, failure picked up later with stderr context.
+            WARNLOG(ROXIE_PIPE_ERROR, "Could not run pipe process %s", cmd);
+        }
         writeTransformer->writeHeader(pipe);
         Owned<ISimpleReadStream> pipeReader = pipe->getOutputStream();
         readTransformer->setStream(pipeReader.get());
@@ -9700,7 +9720,6 @@ private:
             pipeVerified.signal();
         }
     }
-
 };
 
 class CRoxieServerPipeWriteActivity : public CRoxieServerInternalSinkActivity
@@ -9767,20 +9786,34 @@ public:
 
     virtual void onExecute()
     {
-        for (;;)
+        Owned<IPipeProcessException> pipeException;
+        try
         {
-            const void *row = inputStream->ungroupedNextRow();
-            if (!row)
-                break;
-            processed++;
-            if(recreate)
-                openPipe(helper.getNameFromRow(row));
-            writeTransformer->writeTranslatedText(row, pipe);
-            ReleaseRoxieRow(row);
-            if(recreate)
+            for (;;)
+            {
+                OwnedConstRoxieRow row(inputStream->ungroupedNextRow());
+                if (!row)
+                    break;
+                processed++;
+                if(recreate)
+                    openPipe(helper.getNameFromRow(row));
+                writeTransformer->writeTranslatedText(row, pipe);
+                if (recreate)
+                {
+                    closePipe();
+                    verifyPipe();
+                }
+            }
+            if (!recreate)
                 closePipe();
         }
-        closePipe();
+        catch (IPipeProcessException *e)
+        {
+            pipeException.setown(e);
+        }
+        verifyPipe();
+        if (pipeException) // NB: verifyPipe may throw error based on pipe prog. output 1st.
+            throw pipeException.getClear();
     }
 
 private:
@@ -9789,7 +9822,10 @@ private:
         pipeCommand.setown(cmd);
         pipe.setown(createPipeProcess());
         if(!pipe->run(NULL, cmd, ".", true, false, true, 0x10000))
-            throw MakeStringException(ROXIE_PIPE_ERROR, "Could not run pipe process %s", cmd);
+        {
+            // NB: pipe->run can't rely on the child process failing fast enough to return false here, failure picked up later with stderr context.
+            WARNLOG(ROXIE_PIPE_ERROR, "Could not run pipe process %s", cmd);
+        }
         writeTransformer->writeHeader(pipe);
     }
 
@@ -9797,14 +9833,20 @@ private:
     {
         writeTransformer->writeFooter(pipe);
         pipe->closeInput();
-        unsigned err = pipe->wait();
-        if(err && !(helper.getPipeFlags() & TPFnofail))
+    }
+
+    void verifyPipe()
+    {
+        if (pipe)
         {
-            throw createPipeFailureException(pipeCommand.get(), err, pipe);
+            unsigned err = pipe->wait();
+            if(err && !(helper.getPipeFlags() & TPFnofail))
+            {
+                throw createPipeFailureException(pipeCommand.get(), err, pipe);
+            }
+            pipe.clear();
         }
-        pipe.clear();
     }
-
 };
 
 class CRoxieServerPipeReadActivityFactory : public CRoxieServerActivityFactory

+ 45 - 4
system/jlib/jthread.cpp

@@ -1326,6 +1326,40 @@ static void CheckAllowedProgram(const char *prog,const char *allowed)
     throw MakeStringException(-1,"Unauthorized pipe program(%s)",head.str());
 }
 
+class CPipeProcessException : public CSimpleInterfaceOf<IPipeProcessException>
+{
+    int errCode;
+    StringAttr msg;
+    MessageAudience audience;
+public:
+    CPipeProcessException(int _errCode, const char *_msg, MessageAudience _audience = MSGAUD_user) : errCode(_errCode), msg(_msg), audience(_audience)
+    {
+    }
+    virtual int errorCode() const override { return errCode; }
+    virtual StringBuffer & errorMessage(StringBuffer &str) const override
+    {
+        if (msg)
+            str.append(msg).append(", ");
+        return str.append(strerror(errCode));
+    }
+    MessageAudience errorAudience() const { return audience; }
+};
+
+IPipeProcessException *createPipeErrnoException(int code, const char *msg)
+{
+    return new CPipeProcessException(code, msg);
+}
+
+IPipeProcessException *createPipeErrnoExceptionV(int code, const char *msg, ...)
+{
+    StringBuffer eStr;
+    va_list args;
+    va_start(args, msg);
+    eStr.limited_valist_appendf(1024, msg, args);
+    va_end(args);
+    return new CPipeProcessException(code, eStr.str());
+}
+
 
 class CSimplePipeStream: implements ISimpleReadStream, public CInterface
 {
@@ -2022,7 +2056,14 @@ public:
                 ::setenv(envVars.item(idx), envValues.item(idx), 1);
             }
             execvp(argv[0],argv);
-            _exit(START_FAILURE);    // must be _exit!!     
+            if (haserror)
+            {
+                Owned<IException> e = createPipeErrnoExceptionV(errno, "exec failed: %s", prog.get());
+                StringBuffer eStr;
+                fprintf(stderr, "ERROR: %d: %s", e->errorCode(), e->errorMessage(eStr).str());
+                fflush(stderr);
+            }
+            _exit(START_FAILURE);    // must be _exit!!
         }
         free(argv);
         if (hasinput) 
@@ -2116,7 +2157,7 @@ public:
                 break;
             if (errno!=EINTR) {
                 aborted = true;
-                throw makeErrnoExceptionV(errno,"Pipe: read failed (size %d)", sz);
+                throw createPipeErrnoExceptionV(errno,"Pipe: read failed (size %d)", sz);
             }
         }
         return aborted?((size32_t)-1):((size32_t)sizeRead);
@@ -2147,7 +2188,7 @@ public:
             if (aborted) 
                 break;
             if (errno!=EINTR) {
-                throw makeErrnoExceptionV(errno, "Pipe: write failed (size %d)", sz);
+                throw createPipeErrnoExceptionV(errno, "Pipe: write failed (size %d)", sz);
             }
         }
         return aborted?((size32_t)-1):((size32_t)sizeWritten);
@@ -2174,7 +2215,7 @@ public:
                 break;
             if (errno!=EINTR) {
                 aborted = true;
-                throw makeErrnoExceptionV(errno, "Pipe: readError failed (size %d)", sz);
+                throw createPipeErrnoExceptionV(errno, "Pipe: readError failed (size %d)", sz);
             }
         }
         return aborted?((size32_t)-1):((size32_t)sizeRead);

+ 7 - 0
system/jlib/jthread.hpp

@@ -263,6 +263,13 @@ interface ISimpleReadStream;
 
 #define START_FAILURE (199) // return code if program cannot be started
 
+interface IPipeProcessException : extends IException
+{
+};
+
+extern jlib_decl IPipeProcessException *createPipeErrnoException(int code, const char *msg);
+extern jlib_decl IPipeProcessException *createPipeErrnoExceptionV(int code, const char *msg, ...);
+
 interface IPipeProcess: extends IInterface
 {
     virtual bool run(const char *title,const char *prog, const char *dir,

+ 6 - 0
testing/regress/ecl/key/pipefail.xml

@@ -0,0 +1,6 @@
+<Dataset name='Result 1'>
+ <Row><f1>ERROR: 2: exec failed: /tmp/fakepipecmd, No such file or directory&apos;</f1></Row>
+</Dataset>
+<Dataset name='Result 2'>
+ <Row><f1>ERROR: 2: exec failed: /tmp/fakepipecmd, No such file or directory&apos;</f1></Row>
+</Dataset>

+ 22 - 0
testing/regress/ecl/pipefail.ecl

@@ -0,0 +1,22 @@
+import Std.Str;
+
+fakecmd := '/tmp/fakepipecmd';
+rec := RECORD
+ string f1;
+END;
+ds := DATASET([{'a'},{'b'}], rec);
+
+rec myFailTrans(string failmsg) := TRANSFORM
+ string tmp := REGEXREPLACE('^.*ERROR:', failmsg, 'ERROR:');
+ SELF.f1 := REGEXREPLACE(' - PIPE.*$', tmp, '');
+END;
+
+pr := CATCH(PIPE(fakecmd, rec), ONFAIL(myFailTrans(FAILMESSAGE)));
+pt := CATCH(PIPE(ds, fakecmd), ONFAIL(myFailTrans(FAILMESSAGE)));
+//Ideally would have test that had CATCH around OUTPUT, but couldn't get to work
+//pw := OUTPUT(ads, , PIPE(fakecmd));
+
+SEQUENTIAL(
+ OUTPUT(pr);
+ OUTPUT(pt);
+);

+ 28 - 5
thorlcr/activities/piperead/thprslave.cpp

@@ -78,9 +78,12 @@ protected:
     {
         pipeCommand.setown(cmd);
         ActPrintLog("open: %s", cmd);
-        if (!pipe->run(pipeTrace, cmd, globals->queryProp("@externalProgDir"), true, true, true, 0x10000)) // 64K error buffer
-            throw MakeActivityException(this, TE_FailedToCreateProcess, "Failed to create process in %s for : %s", globals->queryProp("@externalProgDir"), cmd);
         pipeFinished = false;
+        if (!pipe->run(pipeTrace, cmd, globals->queryProp("@externalProgDir"), true, true, true, 0x10000)) // 64K error buffer
+        {
+            // NB: pipe->run can't rely on the child process failing fast enough to return false here, failure picked up later with stderr context.
+            WARNLOG(TE_FailedToCreateProcess, "Failed to create process in %s for : %s", globals->queryProp("@externalProgDir"), cmd);
+        }
         registerSelfDestructChildProcess(pipe->getProcessHandle());
         pipeStream->setStream(pipe->getOutputStream());
         readTransformer->setStream(pipeStream);
@@ -89,7 +92,7 @@ protected:
     {
         pipe->closeInput();
     }
-    void verifyPipe()
+    virtual void verifyPipe()
     {
         if (!pipeFinished)
         {
@@ -107,7 +110,7 @@ protected:
                         char error[512];
                         size32_t sz = pipe->readError(sizeof(error), error);
                         if (sz && sz!=(size32_t)-1)
-                            stdError.append(", stderr: '").append(sz, error).append("'");
+                            stdError.append(sz, error).append("'");
                     }
                     catch (IException *e)
                     {
@@ -115,7 +118,10 @@ protected:
                         e->Release();
                     }
                 }
-                throw MakeActivityException(this, TE_PipeReturnedFailure, "Process returned %d:%s - PIPE(%s)", retcode, stdError.str(), pipeCommand.get());
+                if (START_FAILURE == retcode) // PIPE process didn't start at all, START_FAILURE is our own error code
+                    throw MakeActivityException(this, TE_PipeReturnedFailure, "Process failed to start: %s - PIPE(%s)", stdError.str(), pipeCommand.get());
+                else
+                    throw MakeActivityException(this, TE_PipeReturnedFailure, "Process returned %d:%s - PIPE(%s)", retcode, stdError.str(), pipeCommand.get());
             }
         }
     }
@@ -351,6 +357,23 @@ public:
     {
         ::Release(pipeWriter);
     }
+    virtual void verifyPipe() override
+    {
+        if (!pipeFinished)
+        {
+            // If verifyPipe catches exception starting pipe program, clear follow-on errors from pipeWriter thread
+            try
+            {
+                PARENT::verifyPipe();
+            }
+            catch (IException *e)
+            {
+                retcode = 0;
+                ::Release(pipeWriter->checkError());
+                throw;
+            }
+        }
+    }
     virtual void start() override
     {
         ActivityTimer s(totalCycles, timeActivities);

+ 62 - 46
thorlcr/activities/pipewrite/thpwslave.cpp

@@ -63,75 +63,88 @@ public:
         }
         ProcessSlaveActivity::abort();
     }
-    void close()
-    {
-        if (!recreate)
-            closePipeAndVerify();
-    }
     void openPipe(char const * cmd)
     {
         pipeCommand.setown(cmd);
-        if(!pipe->run("PIPEWRITE", cmd, globals->queryProp("@externalProgDir"), true, false, true, 0x10000 )) // 64K error buffer
-            throw MakeActivityException(this, TE_FailedToCreateProcess, "PIPEWRITE: Failed to create process in %s for : %s", globals->queryProp("@externalProgDir"), cmd);
         pipeOpen = true;
-        registerSelfDestructChildProcess(pipe->getProcessHandle());
-        writeTransformer->writeHeader(pipe);
+        if(!pipe->run("PIPEWRITE", cmd, globals->queryProp("@externalProgDir"), true, false, true, 0x10000 )) // 64K error buffer
+        {
+            // NB: pipe->run can't rely on the child process failing fast enough to return false here, failure picked up later with stderr context.
+            WARNLOG(TE_FailedToCreateProcess, "PIPEWRITE: Failed to create process in %s for : %s", globals->queryProp("@externalProgDir"), cmd);
+        }
+        else
+        {
+            registerSelfDestructChildProcess(pipe->getProcessHandle());
+            writeTransformer->writeHeader(pipe);
+        }
     }
-    void closePipeAndVerify()
+    void closePipe()
     {
         if (!pipeOpen)
             return;
         pipeOpen = false;
         writeTransformer->writeFooter(pipe);
-
         pipe->closeInput();
-        HANDLE pipeProcess = pipe->getProcessHandle();
-        unsigned retcode = pipe->wait();
-        unregisterSelfDestructChildProcess(pipeProcess);
-        if (retcode!=0 && !(helper->getPipeFlags() & TPFnofail))
+    }
+    void verifyPipe()
+    {
+        if (pipe)
         {
-            StringBuffer stdError;
-            if (pipe->hasError())
+            HANDLE pipeProcess = pipe->getProcessHandle();
+            unsigned retcode = pipe->wait();
+            unregisterSelfDestructChildProcess(pipeProcess);
+            if (retcode!=0 && !(helper->getPipeFlags() & TPFnofail))
             {
-                try
+                StringBuffer stdError;
+                if (pipe->hasError())
                 {
-                    char error[512];
-                    size32_t sz = pipe->readError(sizeof(error), error);
-                    if (sz && sz!=(size32_t)-1) 
-                        stdError.append(", stderr: '").append(sz, error).append("'");
-                }
-                catch (IException *e)
-                {
-                    ActPrintLog(e, "Error reading pipe stderr");
-                    e->Release();
+                    try
+                    {
+                        char error[512];
+                        size32_t sz = pipe->readError(sizeof(error), error);
+                        if (sz && sz!=(size32_t)-1)
+                            stdError.append(", stderr: '").append(sz, error).append("'");
+                    }
+                    catch (IException *e)
+                    {
+                        ActPrintLog(e, "Error reading pipe stderr");
+                        e->Release();
+                    }
                 }
+                if (START_FAILURE == retcode) // PIPE process didn't start at all, START_FAILURE is our own error code
+                    throw MakeActivityException(this, TE_PipeReturnedFailure, "Process failed to start: %s - PIPE(%s)", stdError.str(), pipeCommand.get());
+                else
+                    throw MakeActivityException(this, TE_PipeReturnedFailure, "Process returned %d:%s - PIPE(%s)", retcode, stdError.str(), pipeCommand.get());
             }
-            throw MakeActivityException(this, TE_PipeReturnedFailure, "Process returned %d:%s - PIPE(%s)", retcode, stdError.str(), pipeCommand.get());    
+            pipe.clear();
         }
     }
     void process()
     {
         start();
-        if (!writeTransformer)
-        {
-            writeTransformer.setown(createPipeWriteXformHelper(helper->getPipeFlags(), helper->queryXmlOutput(), helper->queryCsvOutput(), ::queryRowInterfaces(input)->queryRowSerializer()));
-            writeTransformer->ready();
-        }
-        processed = THORDATALINK_STARTED;
+        Owned<IPipeProcessException> pipeException;
+
+        ActPrintLog("process");
         try
         {
-            ActPrintLog("process");
-
-            open(); 
+            if (!writeTransformer)
+            {
+                writeTransformer.setown(createPipeWriteXformHelper(helper->getPipeFlags(), helper->queryXmlOutput(), helper->queryCsvOutput(), ::queryRowInterfaces(input)->queryRowSerializer()));
+                writeTransformer->ready();
+            }
+            processed = THORDATALINK_STARTED;
+            open();
             write();
+            if (!recreate)
+                closePipe();
         }
-        catch (IException *e)
+        catch (IPipeProcessException *e)
         {
-            try { close(); } catch (CATCHALL) { ActPrintLog("Error closing file"); }
-            ActPrintLog(e, "exception");
-            throw;
+            pipeException.setown(e);
         }
-        close();
+        verifyPipe();
+        if (pipeException) // NB: verifyPipe may throw error based on pipe prog. output 1st.
+            throw pipeException.getClear();
         ActPrintLog("Wrote %" RCPF "d records", processed & THORDATALINK_COUNT_MASK);
     }
     void endProcess()
@@ -150,8 +163,8 @@ public:
     }
     void write()
     {
-        ActPrintLog("write");           
-        while(!abortSoon)
+        ActPrintLog("write");
+        while (!abortSoon)
         {
             OwnedConstThorRow row = inputStream->ungroupedNextRow();
             if (!row) 
@@ -159,8 +172,11 @@ public:
             if (recreate)
                 openPipe(helper->getNameFromRow(row.get()));
             writeTransformer->writeTranslatedText(row, pipe);
-            if(recreate)
-                closePipeAndVerify();
+            if (recreate)
+            {
+                closePipe();
+                verifyPipe();
+            }
             processed++;
         }
     }