Browse Source

Merge remote-tracking branch 'origin/candidate-3.10.0' into candidate-3.10.x

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 12 years ago
parent
commit
59a36f1181

+ 1 - 1
common/thorhelper/thorcommon.cpp

@@ -1614,7 +1614,7 @@ public:
         tempname.append('.').append(tempfiles.ordinality()).append('_').append((__int64)GetCurrentThreadId()).append('_').append((unsigned)GetCurrentProcessId());
         IFile *file = createIFile(tempname.str());
         tempfiles.append(*file);
-        return createRowWriter(file, rowInterfaces, 0); // flushed by close
+        return createRowWriter(file, rowInterfaces);
     }
     void put(const void **rows,unsigned numrows)
     {

+ 22 - 7
dali/daliadmin/daliadmin.cpp

@@ -78,7 +78,7 @@ void usage(const char *exe)
   printf("  dfsfile <logicalname>          -- get meta information for file\n");
   printf("  dfspart <logicalname> <part>   -- get meta information for part num\n");
   printf("  dfscsv <logicalnamemask>       -- get csv info. for files matching mask\n");
-  printf("  dfsgroup <logicalgroupname>    -- get IPs for logical group (aka cluster)\n");
+  printf("  dfsgroup <logicalgroupname> [filename] -- get IPs for logical group (aka cluster). Written to optional filename if provided\n");
   printf("  dfsmap <logicalname>           -- get part files (primary and replicates)\n");
   printf("  dfsexists <logicalname>        -- sets return value to 0 if file exists\n");
   printf("  dfsparents <logicalname>       -- list superfiles containing file\n");
@@ -606,17 +606,32 @@ void dfscsv(const char *dali,IUserDescriptor *udesc)
 
 //=============================================================================
 
-static void dfsgroup(const char *name)
+static void dfsgroup(const char *name, const char *outputFilename)
 {
+    Owned<IFileIOStream> io;
+    if (outputFilename)
+    {
+        OwnedIFile iFile = createIFile(outputFilename);
+        OwnedIFileIO iFileIO = iFile->open(IFOcreate);
+        io.setown(createIOStream(iFileIO));
+    }
     Owned<IGroup> group = queryNamedGroupStore().lookup(name);
-    if (!group) {
+    if (!group)
+    {
         ERRLOG("cannot find group %s",name);
         return;
     }
     StringBuffer eps;
-    for (unsigned i=0;i<group->ordinality();i++) {
+    for (unsigned i=0;i<group->ordinality();i++)
+    {
         group->queryNode(i).endpoint().getUrlStr(eps.clear());
-        OUTLOG("%s",eps.str());
+        if (io)
+        {
+            eps.newline();
+            io->write(eps.length(), eps.str());
+        }
+        else
+            OUTLOG("%s",eps.str());
     }
 }
 
@@ -2249,8 +2264,8 @@ int main(int argc, char* argv[])
                 dfscsv(params.item(1),userDesc);
             }
             else if (stricmp(cmd,"dfsgroup")==0) {
-                CHECKPARAMS(1,1);
-                dfsgroup(params.item(1));
+                CHECKPARAMS(1,2);
+                dfsgroup(params.item(1),(np>1)?params.item(2):NULL);
             }
             else if (stricmp(cmd,"dfsmap")==0) {
                 CHECKPARAMS(1,1);

+ 1 - 1
initfiles/componentfiles/thor/run_thor

@@ -34,7 +34,7 @@ echo $$ > $RUN_THOR_PID_NAME
 
 export SENTINEL="thor.sentinel"
 while [ 1 ]; do
-    daliadmin $DALISERVER dfsgroup ${groupName} > slaves
+    daliadmin $DALISERVER dfsgroup ${groupName} slaves
     errcode=$?
     if [ 0 != ${errcode} ]; then
     echo 'failed to lookup dali group for $groupName'

+ 1 - 0
roxie/roxiemem/roxiemem.hpp

@@ -314,6 +314,7 @@ public:
 #define RoxieRowCapacity(row)  roxiemem::HeapletBase::capacity(row)
 #define RoxieRowHasDestructor(row)  roxiemem::HeapletBase::hasDestructor(row)
 #define RoxieRowAllocatorId(row) roxiemem::HeapletBase::getAllocatorId(row)
+#define RoxieRowIsShared(row)  roxiemem::HeapletBase::isShared(row)
 
 class OwnedRoxieRow;
 class OwnedConstRoxieRow

+ 15 - 0
roxie/roxiemem/roxierow.cpp

@@ -196,6 +196,21 @@ protected:
         if (!rowset)
             return createRowset(newRowCount);
 
+        //Occasionally (in aggregates) we may try and append to a shared rowset.  In this case we need to clone the
+        //target rowset.  It could be that the rowset is unshared immediately, but that is inefficient at worst.
+        if (RoxieRowIsShared(rowset))
+        {
+            byte * * newset = createRowset(newRowCount);
+            for (unsigned i=0; i < oldRowCount; i++)
+            {
+                byte * cur = rowset[i];
+                LinkRoxieRow(cur);
+                newset[i] = cur;
+            }
+            ReleaseRoxieRow(rowset);
+            return newset;
+        }
+
         //This would be more efficient if previous capacity was stored by the caller - or if capacity() is more efficient
         if (newRowCount * sizeof(void *) <= RoxieRowCapacity(rowset))
             return rowset;

+ 9 - 3
thorlcr/activities/merge/thmergeslave.cpp

@@ -285,7 +285,7 @@ public:
         ActivityTimer s(totalCycles, timeActivities, NULL);
         ForEachItemIn(i, inputs) {
             IThorDataLink * input = inputs.item(i);
-            try { 
+            try {
                 startInput(input); 
             }
             catch (CATCHALL) {
@@ -294,7 +294,10 @@ public:
                     streams.item(s).stop();
                 throw;
             }
-            streams.append(*LINK(input));
+            if (input->isGrouped())
+                streams.append(*createUngroupStream(input));
+            else
+                streams.append(*LINK(input));
         }
 #ifndef _STABLE_MERGE
         // shuffle streams otherwise will all be reading in order initially
@@ -444,7 +447,10 @@ public:
                     streams.item(s).stop();
                 throw;
             }
-            streams.append(*LINK(input));
+            if (input->isGrouped())
+                streams.append(*createUngroupStream(input));
+            else
+                streams.append(*LINK(input));
         }
         Owned<IRowLinkCounter> linkcounter = new CThorRowLinkCounter;
         out.setown(createRowStreamMerger(streams.ordinality(), streams.getArray(), helper->queryCompare(), helper->dedup(), linkcounter));