Browse Source

Merge remote-tracking branch 'origin/candidate-5.0.0' into closedown-5.0.x

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 11 years ago
parent
commit
27e1b1cc39

+ 13 - 1
common/workunit/workflow.cpp

@@ -673,6 +673,17 @@ bool WorkflowMachine::executeItem(unsigned wfid, unsigned scheduledWfid)
     switch(item.queryState())
     switch(item.queryState())
     {
     {
     case WFStateDone:
     case WFStateDone:
+        if (item.queryMode() == WFModePersist)
+        {
+#ifdef TRACE_WORKFLOW
+            LOG(MCworkflow, "Recheck persist %u", wfid);
+#endif
+            break;
+        }
+#ifdef TRACE_WORKFLOW
+        LOG(MCworkflow, "Nothing to be done for workflow item %u", wfid);
+#endif
+        return true;
     case WFStateSkip:
     case WFStateSkip:
 #ifdef TRACE_WORKFLOW
 #ifdef TRACE_WORKFLOW
         LOG(MCworkflow, "Nothing to be done for workflow item %u", wfid);
         LOG(MCworkflow, "Nothing to be done for workflow item %u", wfid);
@@ -683,7 +694,8 @@ bool WorkflowMachine::executeItem(unsigned wfid, unsigned scheduledWfid)
     case WFStateBlocked:
     case WFStateBlocked:
         throw new WorkflowException(0, "INTERNAL ERROR: attempting to execute workflow item in blocked state", wfid, WorkflowException::SYSTEM, MSGAUD_user);
         throw new WorkflowException(0, "INTERNAL ERROR: attempting to execute workflow item in blocked state", wfid, WorkflowException::SYSTEM, MSGAUD_user);
     case WFStateFail:
     case WFStateFail:
-        item.reset(); //fall through
+        item.reset();
+        break;
     }
     }
 
 
     switch(item.queryMode())
     switch(item.queryMode())

+ 24 - 11
dali/base/dautils.cpp

@@ -901,20 +901,33 @@ StringBuffer &CDfsLogicalFileName::makeXPathLName(StringBuffer &lfnNodeName) con
     loop
     loop
     {
     {
         const char *e=strstr(s,"::");
         const char *e=strstr(s,"::");
-        if (!e)
-        {
-            if (!streq(".", s))
-                lfnNodeName.append(s);
-            break;
-        }
-        if (0 != strncmp(".", s, e-s))
+        if ((e && 0 != strncmp(".", s, e-s)) || (!e && !streq(".", s))) // skip '.' scopes
         {
         {
-            if (!first)
-                lfnNodeName.append('_');
-            else
+            if (first)
                 first = false;
                 first = false;
-            lfnNodeName.append(e-s,s);
+            else
+                lfnNodeName.append('_');
+            while (s != e)
+            {
+                char c = *s;
+                switch (c)
+                {
+                case '\0':
+                    return lfnNodeName; // done
+                case '^':
+                    ++s;
+                    if ('\0' == *s)
+                        return lfnNodeName; // probably an error really to end in '^'
+                    c = toupper(*s);
+                    // fall through
+                default:
+                    lfnNodeName.append(c);
+                }
+                ++s;
+            }
         }
         }
+        if (!e)
+            break;
         s = e+2;
         s = e+2;
     }
     }
     return lfnNodeName;
     return lfnNodeName;

+ 45 - 0
docs/ECLStandardLibraryReference/SLR-Mods/FromHexPairs.xml

@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE sect1 PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN"
+"http://www.oasis-open.org/docbook/xml/4.5/docbookx.dtd">
+<sect1 id="FromHexPairs">
+  <title>FromHexPairs</title>
+
+  <para><emphasis role="bold">STD.Str.FromHexPairs<indexterm>
+      <primary>STD.Str.FromHexPairs</primary>
+    </indexterm><indexterm>
+      <primary>Str.FromHexPairs</primary>
+    </indexterm><indexterm>
+      <primary>FromHexPairs</primary>
+    </indexterm>(</emphasis> <emphasis>source</emphasis> <emphasis
+  role="bold">)</emphasis> <emphasis role="bold"></emphasis></para>
+
+  <informaltable colsep="1" frame="all" rowsep="1">
+    <tgroup cols="2">
+      <colspec colwidth="80.50pt" />
+
+      <colspec />
+
+      <tbody>
+        <row>
+          <entry><emphasis>source</emphasis></entry>
+
+          <entry>The string containing the hex pairs to process.</entry>
+        </row>
+
+        <row>
+          <entry>Return:<emphasis> </emphasis></entry>
+
+          <entry>FromHexPairs returns a data value with each byte created from
+          a pair of hex digits.</entry>
+        </row>
+      </tbody>
+    </tgroup>
+  </informaltable>
+
+  <para>The <emphasis role="bold">FromHexPairs </emphasis>function returns a
+  data value with each byte created from a pair of hex digits.</para>
+
+  <para>Example:</para>
+
+  <programlisting format="linespecific">A := STD.Str.FromHexPairs('0001FF80');</programlisting>
+</sect1>

+ 46 - 0
docs/ECLStandardLibraryReference/SLR-Mods/ToHexPairs.xml

@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE sect1 PUBLIC "-//OASIS//DTD DocBook XML V4.5//EN"
+"http://www.oasis-open.org/docbook/xml/4.5/docbookx.dtd">
+<sect1 id="ToHexPairs">
+  <title>ToHexPairs</title>
+
+  <para><emphasis role="bold">STD.Str.ToHexPairs<indexterm>
+      <primary>STD.Str.ToHexPairs</primary>
+    </indexterm><indexterm>
+      <primary>Str.ToHexPairs</primary>
+    </indexterm><indexterm>
+      <primary>ToHexPairs</primary>
+    </indexterm>(</emphasis> <emphasis>source</emphasis> <emphasis
+  role="bold">)</emphasis> <emphasis role="bold"></emphasis></para>
+
+  <informaltable colsep="1" frame="all" rowsep="1">
+    <tgroup cols="2">
+      <colspec colwidth="80.50pt" />
+
+      <colspec />
+
+      <tbody>
+        <row>
+          <entry><emphasis>source</emphasis></entry>
+
+          <entry>The data value that should be expanded as a sequence of hex
+          pairs.</entry>
+        </row>
+
+        <row>
+          <entry>Return:<emphasis> </emphasis></entry>
+
+          <entry>ToHexPairs returns a string containing a sequence of hex
+          pairs.</entry>
+        </row>
+      </tbody>
+    </tgroup>
+  </informaltable>
+
+  <para>The <emphasis role="bold">ToHexPairs </emphasis>function Converts the
+  data value to a sequence of hex pairs.</para>
+
+  <para>Example:</para>
+
+  <programlisting format="linespecific">A := STD.Str.ToHexPairs(D'\000\001\377\200');</programlisting>
+</sect1>

+ 7 - 1
docs/ECLStandardLibraryReference/SLR-includer.xml

@@ -297,7 +297,10 @@
 
 
     <xi:include href="ECLStandardLibraryReference/SLR-Mods/FindReplace.xml"
     <xi:include href="ECLStandardLibraryReference/SLR-Mods/FindReplace.xml"
                 xmlns:xi="http://www.w3.org/2001/XInclude" />
                 xmlns:xi="http://www.w3.org/2001/XInclude" />
-
+                
+    <xi:include href="ECLStandardLibraryReference/SLR-Mods/FromHexPairs.xml"
+                xmlns:xi="http://www.w3.org/2001/XInclude" />     
+                
     <xi:include href="ECLStandardLibraryReference/SLR-Mods/GetNthWord.xml"
     <xi:include href="ECLStandardLibraryReference/SLR-Mods/GetNthWord.xml"
                 xmlns:xi="http://www.w3.org/2001/XInclude" />
                 xmlns:xi="http://www.w3.org/2001/XInclude" />
 
 
@@ -321,6 +324,9 @@
 
 
     <xi:include href="ECLStandardLibraryReference/SLR-Mods/StartsWith.xml"
     <xi:include href="ECLStandardLibraryReference/SLR-Mods/StartsWith.xml"
                 xmlns:xi="http://www.w3.org/2001/XInclude" />
                 xmlns:xi="http://www.w3.org/2001/XInclude" />
+                                
+    <xi:include href="ECLStandardLibraryReference/SLR-Mods/ToHexPairs.xml"
+                xmlns:xi="http://www.w3.org/2001/XInclude" />  
 
 
     <xi:include href="ECLStandardLibraryReference/SLR-Mods/ToLowerCase.xml"
     <xi:include href="ECLStandardLibraryReference/SLR-Mods/ToLowerCase.xml"
                 xmlns:xi="http://www.w3.org/2001/XInclude" />
                 xmlns:xi="http://www.w3.org/2001/XInclude" />

+ 16 - 4
ecl/eclagent/eclagent.cpp

@@ -2209,13 +2209,19 @@ void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
     {
     {
         throw MakeStringException(0, "PERSIST not supported when running standalone");
         throw MakeStringException(0, "PERSIST not supported when running standalone");
     }
     }
+
+    SCMStringBuffer name;
+    const char *logicalName = item.getPersistName(name).str();
+    //Persists are re-executed to ensure they are locked when workunits are restarted, and to ensure expired persists
+    //are rebuilt, but check if we have already processed and locked this persist
+    if (agent.alreadyLockedPersist(logicalName))
+        return;
+
     unsigned wfid = item.queryWfid();
     unsigned wfid = item.queryWfid();
     // Old persist model requires dependencies to be executed BEFORE checking if the persist is up to date
     // Old persist model requires dependencies to be executed BEFORE checking if the persist is up to date
     // Defaults to old model, in case executing a WU that is created by earlier eclcc
     // Defaults to old model, in case executing a WU that is created by earlier eclcc
     if (!agent.queryWorkUnit()->getDebugValueBool("expandPersistInputDependencies", false))
     if (!agent.queryWorkUnit()->getDebugValueBool("expandPersistInputDependencies", false))
         doExecuteItemDependencies(item, wfid);
         doExecuteItemDependencies(item, wfid);
-    SCMStringBuffer name;
-    const char *logicalName = item.getPersistName(name).str();
     int maxPersistCopies = item.queryPersistCopies();
     int maxPersistCopies = item.queryPersistCopies();
     if (maxPersistCopies < 0)
     if (maxPersistCopies < 0)
         maxPersistCopies = DEFAULT_PERSIST_COPIES;
         maxPersistCopies = DEFAULT_PERSIST_COPIES;
@@ -2256,7 +2262,7 @@ void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
         doExecuteItem(item, wfid);
         doExecuteItem(item, wfid);
         agent.updatePersist(persistLock, logicalName, thisPersist->eclCRC, thisPersist->allCRC);
         agent.updatePersist(persistLock, logicalName, thisPersist->eclCRC, thisPersist->allCRC);
     }
     }
-    agent.finishPersist(persistLock.getClear());
+    agent.finishPersist(logicalName, persistLock.getClear());
 }
 }
 
 
 //----------------------------------------------------------------
 //----------------------------------------------------------------
@@ -2620,9 +2626,15 @@ IRemoteConnection *EclAgent::startPersist(const char * logicalName)
     return persistLock;
     return persistLock;
 }
 }
 
 
-void EclAgent::finishPersist(IRemoteConnection *persistLock)
+bool EclAgent::alreadyLockedPersist(const char * persistName)
+{
+    return processedPersists.contains(persistName);
+}
+
+void EclAgent::finishPersist(const char * persistName, IRemoteConnection *persistLock)
 {
 {
     LOG(MCrunlock, unknownJob, "Finished persists - add to read lock list");
     LOG(MCrunlock, unknownJob, "Finished persists - add to read lock list");
+    processedPersists.append(persistName);
     persistReadLocks.append(*persistLock);
     persistReadLocks.append(*persistLock);
 }
 }
 
 

+ 3 - 1
ecl/eclagent/eclagent.ipp

@@ -355,6 +355,7 @@ private:
     StringArray tempFiles;
     StringArray tempFiles;
     CriticalSection tfsect;
     CriticalSection tfsect;
     Array persistReadLocks;
     Array persistReadLocks;
+    StringArray processedPersists;
 
 
     Owned<ILoadedDllEntry> dll;
     Owned<ILoadedDllEntry> dll;
     CIArrayOf<EclAgentQueryLibrary> queryLibraries;
     CIArrayOf<EclAgentQueryLibrary> queryLibraries;
@@ -488,7 +489,8 @@ public:
     virtual void restoreCluster();
     virtual void restoreCluster();
 
 
     IRemoteConnection *startPersist(const char * name);
     IRemoteConnection *startPersist(const char * name);
-    void finishPersist(IRemoteConnection *persistLock);
+    bool alreadyLockedPersist(const char * persistName);
+    void finishPersist(const char * persistName, IRemoteConnection *persistLock);
     void updatePersist(IRemoteConnection *persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC);
     void updatePersist(IRemoteConnection *persistLock, const char * logicalName, unsigned eclCRC, unsigned __int64 allCRC);
     void checkPersistMatches(const char * logicalName, unsigned eclCRC);
     void checkPersistMatches(const char * logicalName, unsigned eclCRC);
     virtual void deleteLRUPersists(const char * logicalName, int keep);
     virtual void deleteLRUPersists(const char * logicalName, int keep);

+ 41 - 0
ecl/regress/issue11528a.ecl

@@ -0,0 +1,41 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+d1 := DATASET([1,2,3,4,56,31,1324,134], { unsigned id });
+
+p1 := d1 : persist('~p1', single);
+
+addMessage := output(dataset(['Hi'], { string text }),named('msgs'), EXTEND);
+
+allMessages := DATASET(WORKUNIT('msgs'), { string text });
+
+p2 := p1(id != 10) : persist('~p2', single);
+
+
+import Std.File;
+
+clean := File.DeleteLogicalFile('~p2');
+
+doFail := FAIL('Fail because this is being run for the first time!');
+
+cleanUp := IF(count(allMessages)=1, doFail);
+
+
+x1 := WHEN(p2, addMessage, BEFORE);
+
+output(WHEN(x1, cleanup, BEFORE),named('results'));

+ 22 - 0
ecl/regress/issue11528b.ecl

@@ -0,0 +1,22 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+import Std.File;
+
+clean := File.DeleteLogicalFile('~p2');
+
+clean;

+ 38 - 0
ecl/regress/issue11528c.ecl

@@ -0,0 +1,38 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+d1 := DATASET([1,2,3,4,56,31,1324,134], { unsigned id });
+
+p1 := d1 : persist('~p1', single);
+
+addMessage := output(dataset(['Hi'], { string text }),named('msgs'), EXTEND);
+
+allMessages := DATASET(WORKUNIT('msgs'), { string text });
+
+p2 := p1(id != 10) : persist('pzzz', single, FEW);
+
+
+import Std.File;
+
+doFail := FAIL('Fail because this is being run for the first time!');
+
+cleanUp := IF(count(allMessages)=1, doFail);
+
+x1 := WHEN(p2, addMessage, BEFORE);
+
+output(WHEN(x1, cleanup, BEFORE),named('results'));

+ 43 - 0
ecl/regress/issue11528d.ecl

@@ -0,0 +1,43 @@
+/*##############################################################################
+
+    HPCC SYSTEMS software Copyright (C) 2013 HPCC Systems.
+
+    Licensed under the Apache License, Version 2.0 (the "License");
+    you may not use this file except in compliance with the License.
+    You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+############################################################################## */
+
+
+d1 := DATASET([1,2,3,4,56,31,1324,134], { unsigned id });
+
+p1 := d1 : persist('~p1', single);
+
+addMessage := output(dataset(['Hi'], { string text }),named('msgs'), EXTEND);
+
+allMessages := DATASET(WORKUNIT('msgs'), { string text });
+
+p2 := p1(id != 10) : persist('~p2', single);
+
+p3 := dedup(p1 + p2) : persist('~p3', single);
+
+
+
+import Std.File;
+
+clean(string x) := File.DeleteLogicalFile('~' + x);
+
+cleanUp := [ clean('p1'), clean('p2'), clean('p3') ];
+
+
+sequential(
+    cleanUp;
+    output(p3,named('results'));
+);

+ 2 - 0
esp/services/ws_smc/ws_smcService.cpp

@@ -668,6 +668,8 @@ ActivityInfo* CWsSMCEx::getActivityInfo(IEspContext &context, IEspActivityReques
     if (activityInfoCache && activityInfoCache->isCachedActivityInfoValid(activityInfoCacheSeconds))
     if (activityInfoCache && activityInfoCache->isCachedActivityInfoValid(activityInfoCacheSeconds))
         return activityInfoCache.getLink();
         return activityInfoCache.getLink();
 
 
+    DBGLOG("CWsSMCEx::getActivityInfo - rebuild cached information");
+
     activityInfoCache.setown(createActivityInfo(context, req));
     activityInfoCache.setown(createActivityInfo(context, req));
     return activityInfoCache.getLink();
     return activityInfoCache.getLink();
 }
 }

+ 5 - 2
roxie/ccd/ccdfile.cpp

@@ -1659,6 +1659,7 @@ protected:
     IArrayOf<IResolvedFile> subRFiles;  // To make sure subfiles get locked too
     IArrayOf<IResolvedFile> subRFiles;  // To make sure subfiles get locked too
 
 
     Owned <IPropertyTree> properties;
     Owned <IPropertyTree> properties;
+    Linked<IRoxieDaliHelper> daliHelper;
     Owned<IDaliPackageWatcher> notifier;
     Owned<IDaliPackageWatcher> notifier;
 
 
     void addFile(const char *subName, IFileDescriptor *fdesc, IFileDescriptor *remoteFDesc)
     void addFile(const char *subName, IFileDescriptor *fdesc, IFileDescriptor *remoteFDesc)
@@ -1714,8 +1715,8 @@ protected:
 
 
 public:
 public:
     IMPLEMENT_IINTERFACE;
     IMPLEMENT_IINTERFACE;
-    CResolvedFile(const char *_lfn, const char *_physicalName, IDistributedFile *_dFile, RoxieFileType _fileType, IRoxieDaliHelper* daliHelper, bool isDynamic, bool cacheIt, bool writeAccess, bool _isSuperFile)
-    : lfn(_lfn), physicalName(_physicalName), dFile(_dFile), fileType(_fileType), isSuper(_isSuperFile)
+    CResolvedFile(const char *_lfn, const char *_physicalName, IDistributedFile *_dFile, RoxieFileType _fileType, IRoxieDaliHelper* _daliHelper, bool isDynamic, bool cacheIt, bool writeAccess, bool _isSuperFile)
+    : daliHelper(_daliHelper), lfn(_lfn), physicalName(_physicalName), dFile(_dFile), fileType(_fileType), isSuper(_isSuperFile)
     {
     {
         cached = NULL;
         cached = NULL;
         fileSize = 0;
         fileSize = 0;
@@ -1764,6 +1765,8 @@ public:
     }
     }
     virtual void beforeDispose()
     virtual void beforeDispose()
     {
     {
+        if (notifier)
+            daliHelper->releaseSubscription(notifier);
         notifier.clear();
         notifier.clear();
         if (cached)
         if (cached)
         {
         {

+ 1 - 0
roxie/ccd/ccdstate.cpp

@@ -187,6 +187,7 @@ public:
             }
             }
             if (nextTimeout != INFINITE)
             if (nextTimeout != INFINITE)
                 nextTimeout = nextTimeout * 1000;
                 nextTimeout = nextTimeout * 1000;
+            clearKeyStoreCache(false);   // Allows us to fully release files we no longer need because of unloaded queries
         }
         }
         if (traceLevel)
         if (traceLevel)
             DBGLOG("DelayedReleaserThread %p exiting", this);
             DBGLOG("DelayedReleaserThread %p exiting", this);