Procházet zdrojové kódy

HPCC-11234 Roxie gives UNIMPLEMENTED if try to read result from workunit

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman před 11 roky
rodič
revize
441adec81f
3 změnil soubory, kde provedl 82 přidání a 34 odebrání
  1. 78 28
      roxie/ccd/ccdcontext.cpp
  2. 1 1
      roxie/ccd/ccdcontext.hpp
  3. 3 5
      roxie/ccd/ccdserver.cpp

+ 78 - 28
roxie/ccd/ccdcontext.cpp

@@ -932,6 +932,37 @@ public:
     }
 };
 
+class WuResultDataReader : public RawDataReader
+{
+    Owned<IConstWUResult> result;
+    IXmlToRowTransformer *rowTransformer;
+public:
+    WuResultDataReader(ICodeContext *codeContext, IEngineRowAllocator *_rowAllocator, bool _isGrouped, const IRoxieContextLogger &_logctx, IConstWUResult *_result, IXmlToRowTransformer *_rowTransformer)
+        : RawDataReader(codeContext, _rowAllocator, _isGrouped, _logctx), result(_result), rowTransformer(_rowTransformer)
+    {
+    }
+
+    virtual bool nextBlock(unsigned & tlen, void * & tgt, void * & base)
+    {
+        tgt = NULL;
+        base = NULL;
+        if (result)
+        {
+            Variable2IDataVal r(&tlen, &tgt);
+            Owned<IXmlToRawTransformer> rawXmlTransformer = createXmlRawTransformer(rowTransformer);
+            result->getResultRaw(r, rawXmlTransformer, NULL);
+            base = tgt;
+            result.clear();
+            return tlen != 0;
+        }
+        else
+        {
+            tlen = 0;
+            return false;
+        }
+    }
+};
+
 class InlineXmlDataReader : public WorkUnitRowReaderBase
 {
     Linked<IPropertyTree> xml;
@@ -1690,7 +1721,7 @@ public:
     {
         try
         {
-            Owned<IWorkUnitRowReader> wuReader = getWorkunitRowReader(stepname, sequence, xmlTransformer, _rowAllocator, isGrouped);
+            Owned<IWorkUnitRowReader> wuReader = getWorkunitRowReader(NULL, stepname, sequence, xmlTransformer, _rowAllocator, isGrouped);
             wuReader->getResultRowset(tcount, tgt);
         }
         catch (IException * e)
@@ -1710,7 +1741,7 @@ public:
     {
         try
         {
-            Owned<IWorkUnitRowReader> wuReader = getWorkunitRowReader(stepname, sequence, xmlTransformer, _rowAllocator, false);
+            Owned<IWorkUnitRowReader> wuReader = getWorkunitRowReader(NULL, stepname, sequence, xmlTransformer, _rowAllocator, false);
             wuReader->getResultRowset(tcount, tgt);
         }
         catch (IException * e)
@@ -2050,44 +2081,63 @@ protected:
     {
         throwUnexpected();  // Should only see on server
     }
-    virtual IWorkUnitRowReader *getWorkunitRowReader(const char *stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, IEngineRowAllocator *rowAllocator, bool isGrouped)
+    virtual IWorkUnitRowReader *getWorkunitRowReader(const char *wuid, const char *stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, IEngineRowAllocator *rowAllocator, bool isGrouped)
     {
         try
         {
-            CriticalBlock b(contextCrit);
-            IPropertyTree &ctx = useContext(sequence);
-            IPropertyTree *val = ctx.queryPropTree(stepname);
-            if (val)
+            if (wuid)
             {
-                const char *id = val->queryProp("@id");
-                const char *format = val->queryProp("@format");
-                if (id)
+                Owned<IRoxieDaliHelper> daliHelper = connectToDali();
+                if (daliHelper && daliHelper->connected())
                 {
-                    if (!format || strcmp(format, "raw") == 0)
-                    {
-                        return createStreamedRawRowReader(rowAllocator, isGrouped, id);
-                    }
-                    else if (strcmp(format, "deserialized") == 0)
-                    {
-                        IDeserializedResultStore &resultStore = useResultStore(sequence);
-                        return resultStore.createDeserializedReader(atoi(id));
-                    }
-                    else
-                        throwUnexpected();
+                    Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
+                    Owned<IConstWorkUnit> externalWU = factory->openWorkUnit(wuid, false);
+                    externalWU->remoteCheckAccess(queryUserDescriptor(), false);
+                    Owned<IConstWUResult> wuResult = getWorkUnitResult(externalWU, stepname, sequence);
+                    if (!wuResult)
+                        throw MakeStringException(ROXIE_FILE_ERROR, "Failed to find value %s:%d in workunit %s", stepname ? stepname : "(null)", sequence, wuid);
+                    return new WuResultDataReader(queryCodeContext(), rowAllocator, isGrouped, logctx, wuResult.getClear(), xmlTransformer);
                 }
                 else
+                    throw MakeStringException(ROXIE_DALI_ERROR, "WorkUnit read: no dali connection available");
+            }
+            else
+            {
+                CriticalBlock b(contextCrit);
+                IPropertyTree &ctx = useContext(sequence);
+                IPropertyTree *val = ctx.queryPropTree(stepname);
+                if (val)
                 {
-                    if (!format || strcmp(format, "xml") == 0)
+                    const char *id = val->queryProp("@id");
+                    const char *format = val->queryProp("@format");
+                    if (id)
                     {
-                        if (xmlTransformer)
-                            return new InlineXmlDataReader(*xmlTransformer, val, rowAllocator, isGrouped);
+                        if (!format || strcmp(format, "raw") == 0)
+                        {
+                            return createStreamedRawRowReader(rowAllocator, isGrouped, id);
+                        }
+                        else if (strcmp(format, "deserialized") == 0)
+                        {
+                            IDeserializedResultStore &resultStore = useResultStore(sequence);
+                            return resultStore.createDeserializedReader(atoi(id));
+                        }
+                        else
+                            throwUnexpected();
                     }
-                    else if (strcmp(format, "raw") == 0)
+                    else
                     {
-                        return new InlineRawDataReader(queryCodeContext(), rowAllocator, isGrouped, logctx, val);
+                        if (!format || strcmp(format, "xml") == 0)
+                        {
+                            if (xmlTransformer)
+                                return new InlineXmlDataReader(*xmlTransformer, val, rowAllocator, isGrouped);
+                        }
+                        else if (strcmp(format, "raw") == 0)
+                        {
+                            return new InlineRawDataReader(queryCodeContext(), rowAllocator, isGrouped, logctx, val);
+                        }
+                        else
+                            throwUnexpected();
                     }
-                    else
-                        throwUnexpected();
                 }
             }
         }

+ 1 - 1
roxie/ccd/ccdcontext.hpp

@@ -74,7 +74,7 @@ interface IRoxieSlaveContext : extends IRoxieContextLogger
     virtual IWorkUnit *updateWorkUnit() const = 0;
     virtual IConstWorkUnit *queryWorkUnit() const = 0;
     virtual IRoxieServerContext *queryServerContext() = 0;
-    virtual IWorkUnitRowReader *getWorkunitRowReader(const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, IEngineRowAllocator *rowAllocator, bool isGrouped) = 0;
+    virtual IWorkUnitRowReader *getWorkunitRowReader(const char *wuid, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, IEngineRowAllocator *rowAllocator, bool isGrouped) = 0;
 };
 
 interface IRoxieServerContext : extends IInterface

+ 3 - 5
roxie/ccd/ccdserver.cpp

@@ -395,9 +395,9 @@ public:
     {
         return ctx->queryServerContext();
     }
-    virtual IWorkUnitRowReader *getWorkunitRowReader(const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, IEngineRowAllocator *rowAllocator, bool isGrouped)
+    virtual IWorkUnitRowReader *getWorkunitRowReader(const char *wuid, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, IEngineRowAllocator *rowAllocator, bool isGrouped)
     {
-        return ctx->getWorkunitRowReader(name, sequence, xmlTransformer, rowAllocator, isGrouped);
+        return ctx->getWorkunitRowReader(wuid, name, sequence, xmlTransformer, rowAllocator, isGrouped);
     }
 protected:
     IRoxieSlaveContext * ctx;
@@ -5327,9 +5327,7 @@ public:
         CRoxieServerActivity::start(parentExtractSize, parentExtract, paused);
         IXmlToRowTransformer * xmlTransformer = helper.queryXmlTransformer();
         OwnedRoxieString fromWuid(helper.getWUID());
-        if (fromWuid)
-            UNIMPLEMENTED;
-        wuReader.setown(ctx->getWorkunitRowReader(helper.queryName(), helper.querySequence(), xmlTransformer, rowAllocator, meta.isGrouped()));
+        wuReader.setown(ctx->getWorkunitRowReader(fromWuid, helper.queryName(), helper.querySequence(), xmlTransformer, rowAllocator, meta.isGrouped()));
         // MORE _ should that be in onCreate?
     }