فهرست منبع

Merge branch 'candidate-8.4.x'

Signed-off-by: Richard Chapman <rchapman@hpccsystems.com>
Richard Chapman 3 سال پیش
والد
کامیت
0199486291

+ 1 - 1
.github/workflows/build-containers-pr.yml

@@ -41,7 +41,7 @@ jobs:
           # echo ::set-output name=container_registry::ghcr.io
           # echo ::set-output name=cr_user::${{ github.repository_owner }}
           echo ::set-output name=container_registry::docker.io
-          echo ::set-output name=cr_user::hpccsystems
+          echo ::set-output name=cr_user::hpccbuilds
           echo ::set-output name=build_prbase_sha::${{ github.event.pull_request.base.sha }}
           echo ::set-output name=build_prbase_label::${{ github.base_ref }}
           echo ::set-output name=build_pr_label::pr-${{ github.event.number }}-${{ github.sha }}

+ 1 - 1
system/jlib/jmutex.hpp

@@ -31,7 +31,7 @@ extern jlib_decl void spinUntilReady(std::atomic_uint &value);
 
 #ifdef _DEBUG
 //#define SPINLOCK_USE_MUTEX // for testing
-#define SPINLOCK_RR_CHECK     // checks for realtime threads
+//#define SPINLOCK_RR_CHECK     // checks for realtime threads
 #define _ASSERT_LOCK_SUPPORT
 #endif
 

+ 10 - 5
system/metrics/sinks/prometheus/prometheusSink.cpp

@@ -64,16 +64,21 @@ PrometheusMetricSink::PrometheusMetricSink(const char *name, const IPropertyTree
             VStringBuffer respmessage(PROMETHEUS_METRICS_HTTP_ERROR, msg.str(), req.path.c_str(), res.status);
             res.set_content(respmessage.str(), PROMETHEUS_METRICS_SERVICE_RESP_TYPE);
 
-            LOG(MCuserInfo, "TxSummary[status=%d;user=@%s:%d;contLen=%ld;req=%s;]\n", res.status, req.remote_addr.c_str(), req.remote_port, req.content_length, req.method.c_str());
+            LOG(MCuserError, "PrometheusMetricsService Error: %s", msg.str());
+            LOG(MCuserInfo, "TxSummary[status=%d;user=@%s:%d;contLen=%ld;req=%s;path=%s]", res.status, req.remote_addr.c_str(), req.remote_port, req.content_length, req.method.c_str(), req.path.c_str());
         });
 
         m_server.Get(m_metricsServiceName.str(), [&](const Request& req, Response& res)
         {
+            LOG(MCdebugInfo, "GET PrometheusMetricsService%s, from %s:%d", req.path.c_str(), req.remote_addr.c_str(), req.remote_port);
+
             StringBuffer payload;
             toPrometheusMetrics(m_metricsManager->queryMetricsForReport(std::string(m_metricsSinkName.str())), payload, m_verbose);
 
             res.set_content(payload.str(), PROMETHEUS_METRICS_SERVICE_RESP_TYPE);
-            LOG(MCuserInfo, "TxSummary[status=%d;user=@%s:%d;contLen=%ld;req=GET;]\n",res.status, req.remote_addr.c_str(), req.remote_port, req.content_length);
+            res.status = 200;
+            LOG(MCdebugInfo, "PrometheusMetricsService Response: %s\n", payload.str());
+            LOG(MCuserInfo, "TxSummary[status=%d;user=@%s:%d;contLen=%ld;req=GET;path=%s]", res.status, req.remote_addr.c_str(), req.remote_port, req.content_length, req.path.c_str());
         });
     }
 }
@@ -87,7 +92,7 @@ const char * PrometheusMetricSink::mapHPCCMetricTypeToPrometheusStr(MetricType t
     case hpccMetrics::METRICS_GAUGE:
         return "gauge";
     default:
-        LOG(MCdebugInfo, "Encountered unknown metric - cannot map to Prometheus metric!");
+        LOG(MCinternalWarning, "Encountered unknown metric - cannot map to Prometheus metric!");
         return nullptr;
     }
 }
@@ -154,7 +159,7 @@ void PrometheusMetricSink::startCollection(MetricsManager *_pManager)
 
 void PrometheusMetricSink::stopCollection()
 {
-	LOG(MCoperatorProgress, "PrometheusMetricsService stopping:  port: '%i' uri: '%s' sinkname: '%s'\n", m_port, m_metricsServiceName.str(), m_metricsSinkName.str());
+    LOG(MCoperatorProgress, "PrometheusMetricsService stopping:  port: '%i' uri: '%s' sinkname: '%s'", m_port, m_metricsServiceName.str(), m_metricsSinkName.str());
     m_processing = false;
     m_server.stop();
     m_collectThread.join();
@@ -162,6 +167,6 @@ void PrometheusMetricSink::stopCollection()
 
 void PrometheusMetricSink::startServer()
 {
-    LOG(MCoperatorProgress, "PrometheusMetricsService started:  port: '%i' uri: '%s' sinkname: '%s'\n", m_port, m_metricsServiceName.str(), m_metricsSinkName.str());
+    LOG(MCoperatorProgress, "PrometheusMetricsService started:  port: '%i' uri: '%s' sinkname: '%s'", m_port, m_metricsServiceName.str(), m_metricsSinkName.str());
     m_server.listen(BIND_ALL_LOCAL_NICS, m_port);
 }

+ 13 - 5
thorlcr/activities/csvread/thcsvrslave.cpp

@@ -56,9 +56,10 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase
         OwnedIFileIO iFileIO;
         CSVSplitter csvSplitter;
         CRC32 inputCRC;
-        bool readFinished;
         offset_t localOffset;
+        unsigned __int64 progress = 0;
         size32_t maxRowSize;
+        bool readFinished;
         bool processHeaderLines = false;
 
         unsigned splitLine(ISerialStream *inputStream, size32_t maxRowSize)
@@ -132,6 +133,7 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase
             inputStream.setown(createFileSerialStream(iFileIO));
             if (activity.headerLines)
                 processHeaderLines = true;
+            progress = 0;
         }
         virtual void close(CRC32 &fileCRC)
         {
@@ -140,9 +142,14 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase
                 CriticalBlock block(inputCs);
                 partFileIO.setown(iFileIO.getClear());
             }
-            mergeStats(closedPartFileStats, partFileIO);
-            partFileIO.clear();
-            inputStream.clear();
+            if (partFileIO)
+            {
+                mergeStats(closedPartFileStats, partFileIO);
+                closedPartFileStats.mergeStatistic(StNumDiskRowsRead, progress);
+                progress = 0;
+                partFileIO.clear();
+                inputStream.clear();
+            }
             fileCRC = inputCRC;
         }
         const void *nextRow()
@@ -160,7 +167,7 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase
                 if (res != 0)
                 {
                     localOffset += lineLength;
-                    ++activity.diskProgress;
+                    ++progress;
                     return row.finalizeRowClear(res);
                 }
             }
@@ -171,6 +178,7 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase
             CriticalBlock block(inputCs); // Ensure iFileIO remains valid for the duration of mergeStats()
             CDiskPartHandlerBase::gatherStats(merged);
             mergeStats(merged, iFileIO);
+            merged.mergeStatistic(StNumDiskRowsRead, progress);
         }
     };
 

+ 3 - 10
thorlcr/activities/diskread/thdiskreadslave.cpp

@@ -61,7 +61,6 @@ protected:
     bool hasMatchFilter = false;
     size32_t diskRowMinSz;
     unsigned numSegFieldsUsed = 0;
-    rowcount_t totalProgress = 0;
     rowcount_t stopAfter = 0;
     rowcount_t remoteLimit = 0;
     rowcount_t limit = 0;
@@ -133,13 +132,6 @@ public:
         segMonitors.kill();
         helper->createSegmentMonitors(this);
     }
-
-    virtual void serializeStats(MemoryBuffer &mb) override
-    {
-        if (partHandler)
-            diskProgress = totalProgress + partHandler->queryProgress();
-        CDiskReadSlaveActivityBase::serializeStats(mb);
-    }
 friend class CDiskRecordPartHandler;
 };
 
@@ -190,6 +182,8 @@ public:
         CriticalBlock block(inputCs); // Ensure iFileIO remains valid for the duration of mergeStats()
         CDiskPartHandlerBase::gatherStats(merged);
         mergeStats(merged, in);
+        if (in)
+            merged.mergeStatistic(StNumDiskRowsRead, in->queryProgress());
     }
     virtual unsigned __int64 queryProgress() override
     {
@@ -253,8 +247,6 @@ void CDiskRecordPartHandler::open()
         CriticalBlock block(inputCs);
         partStream.swap(in);
     }
-    if (partStream)
-        activity.totalProgress += partStream->queryProgress();
     partStream.clear();
 
     unsigned rwFlags = 0;
@@ -411,6 +403,7 @@ void CDiskRecordPartHandler::close(CRC32 &fileCRC)
     }
     if (partStream)
     {
+        closedPartFileStats.mergeStatistic(StNumDiskRowsRead, partStream->queryProgress());
         activity.mergeSubFileStats(partDesc, partStream);
         partStream->stop(&fileCRC);
     }

+ 1 - 1
thorlcr/activities/fetch/thfetch.cpp

@@ -78,7 +78,7 @@ public:
             IDistributedSuperFile *super = fetchFile->querySuperFile();
             unsigned numsubs = super?super->numSubFiles(true):0;
             for (unsigned i=0; i<numsubs; i++)
-                subFileStats.push_back(new CThorStatsCollection(diskReadActivityStatistics));
+                subFileStats.push_back(new CThorStatsCollection(diskReadPartStatistics));
 
             mapping.setown(getFileSlaveMaps(fetchFile->queryLogicalName(), *fileDesc, container.queryJob().queryUserDescriptor(), container.queryJob().querySlaveGroup(), container.queryLocalOrGrouped(), false, NULL, super));
             mapping->serializeFileOffsetMap(offsetMapMb);

+ 2 - 2
thorlcr/activities/fetch/thfetchslave.cpp

@@ -159,7 +159,7 @@ public:
                 e->base = part.queryProperties().getPropInt64("@offset");
                 e->top = e->base + part.queryProperties().getPropInt64("@size");
                 e->index = f;
-                e->file = queryThor().queryFileCache().lookupIFileIO(owner, logicalFilename, part, nullptr, diskReadActivityStatistics); // NB: freed by FPosTableEntryIFileIO dtor
+                e->file = queryThor().queryFileCache().lookupIFileIO(owner, logicalFilename, part, nullptr, diskReadPartStatistics); // NB: freed by FPosTableEntryIFileIO dtor
             }
         }
     }
@@ -389,7 +389,7 @@ public:
             ISuperFileDescriptor *super = parts.item(0).queryOwner().querySuperFileDescriptor();
             if (super)
                 for (unsigned i=0; i<files; i++)
-                    subFileStats.push_back(new CRuntimeStatisticCollection(diskReadActivityStatistics));
+                    subFileStats.push_back(new CRuntimeStatisticCollection(diskReadPartStatistics));
         }
 
         unsigned encryptedKeyLen;

+ 4 - 6
thorlcr/activities/thdiskbaseslave.cpp

@@ -64,7 +64,7 @@ void getPartsMetaInfo(ThorDataLinkMetaInfo &metaInfo, unsigned nparts, IPartDesc
 //////////////////////////////////////////////
 
 CDiskPartHandlerBase::CDiskPartHandlerBase(CDiskReadSlaveActivityBase &_activity) 
-    : activity(_activity), closedPartFileStats(diskReadRemoteStatistics)
+    : activity(_activity), closedPartFileStats(diskReadPartStatistics)
 {
     checkFileCrc = activity.checkFileCrc;
     which = 0;
@@ -273,7 +273,6 @@ void CDiskReadSlaveActivityBase::start()
 {
     PARENT::start();
     markStart = true;
-    diskProgress = 0;
     unsigned encryptedKeyLen;
     void *encryptedKey;
     helper->getEncryptKey(encryptedKeyLen, encryptedKey);
@@ -314,13 +313,12 @@ IThorRowInterfaces * CDiskReadSlaveActivityBase::queryProjectedDiskRowInterfaces
 
 void CDiskReadSlaveActivityBase::serializeStats(MemoryBuffer &mb)
 {
-    CRuntimeStatisticCollection activeStats(diskReadRemoteStatistics);
+    CRuntimeStatisticCollection activePartStats(diskReadPartStatistics);
     if (partHandler)
     {
-        partHandler->gatherStats(activeStats);
-        stats.set(activeStats); // replace disk read stats
+        partHandler->gatherStats(activePartStats);
+        stats.set(activePartStats); // replace disk read stats
     }
-    stats.setStatistic(StNumDiskRowsRead, diskProgress);
     PARENT::serializeStats(mb);
     for (auto &stats: subFileStats)
         stats->serialize(mb);

+ 0 - 1
thorlcr/activities/thdiskbaseslave.ipp

@@ -91,7 +91,6 @@ protected:
     mutable ThorDataLinkMetaInfo cachedMetaInfo;
     Owned<CDiskPartHandlerBase> partHandler;
     Owned<IExpander> eexp;
-    rowcount_t diskProgress = 0;
     std::vector<OwnedPtr<CRuntimeStatisticCollection>> subFileStats;
 
 public:

+ 10 - 2
thorlcr/activities/xmlread/thxmlreadslave.cpp

@@ -52,6 +52,7 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase
         OwnedIFileIO iFileIO;
         Owned<IIOStream> inputIOstream;
         offset_t localOffset;  // not sure what this is for 
+        unsigned __int64 progress = 0;
         Linked<IEngineRowAllocator> allocator;
 
     public:
@@ -96,6 +97,7 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase
                 xmlParser.setown(createJSONParse(*inputIOstream.get(), xmlIterator, *this, (0 != (TDRxmlnoroot & activity.helper->getFlags()))?ptr_noRoot:ptr_none, 0 != (TDRusexmlcontents & activity.helper->getFlags())));
             else
                 xmlParser.setown(createXMLParse(*inputIOstream.get(), xmlIterator, *this, (0 != (TDRxmlnoroot & activity.helper->getFlags()))?ptr_noRoot:ptr_none, 0 != (TDRusexmlcontents & activity.helper->getFlags())));
+            progress = 0;
         }
         virtual void close(CRC32 &fileCRC)
         {
@@ -111,7 +113,12 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase
                 CriticalBlock block(inputCs);
                 partFileIO.setown(iFileIO.getClear());
             }
-            mergeStats(closedPartFileStats, partFileIO);
+            if (partFileIO)
+            {
+                mergeStats(closedPartFileStats, partFileIO);
+                closedPartFileStats.mergeStatistic(StNumDiskRowsRead, progress);
+                progress = 0;
+            }
         }
 
         const void *nextRow()
@@ -131,7 +138,7 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase
                         if (sz)
                         {
                             localOffset = 0;
-                            ++activity.diskProgress;
+                            ++progress;
                             return row.finalizeRowClear(sz);
                         }
                     }
@@ -201,6 +208,7 @@ class CXmlReadSlaveActivity : public CDiskReadSlaveActivityBase
             CriticalBlock block(inputCs);
             CDiskPartHandlerBase::gatherStats(merged);
             mergeStats(merged, iFileIO);
+            merged.mergeStatistic(StNumDiskRowsRead, progress);
         }
     };
 public:

+ 1 - 0
thorlcr/thorutil/thormisc.cpp

@@ -86,6 +86,7 @@ const StatisticsMapping diskReadActivityStatistics({StNumDiskRowsRead}, basicAct
 const StatisticsMapping diskWriteActivityStatistics({StPerReplicated}, basicActivityStatistics, diskWriteRemoteStatistics);
 const StatisticsMapping sortActivityStatistics({}, basicActivityStatistics, spillStatistics);
 const StatisticsMapping graphStatistics({StNumExecutions}, basicActivityStatistics);
+const StatisticsMapping diskReadPartStatistics({StNumDiskRowsRead}, diskReadRemoteStatistics);
 
 
 MODULE_INIT(INIT_PRIORITY_STANDARD)

+ 1 - 0
thorlcr/thorutil/thormisc.hpp

@@ -133,6 +133,7 @@ extern graph_decl const StatisticsMapping keyedJoinActivityStatistics;
 extern graph_decl const StatisticsMapping lookupJoinActivityStatistics;
 extern graph_decl const StatisticsMapping loopActivityStatistics;
 extern graph_decl const StatisticsMapping diskReadActivityStatistics;
+extern graph_decl const StatisticsMapping diskReadPartStatistics;
 extern graph_decl const StatisticsMapping diskWriteActivityStatistics;
 extern graph_decl const StatisticsMapping sortActivityStatistics;