Introduce a thread pool, to handle the send streams (and compression/deduplication) of the hash distribute outputs Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
@@ -591,10 +591,12 @@ ITimeReporter *timer;
ITimeReporter *createStdTimeReporter() { return new DefaultTimeReporter(); }
ITimeReporter *createStdTimeReporter(MemoryBuffer &mb) { return new DefaultTimeReporter(mb); }
+cycle_t oneSecInCycles;
MODULE_INIT(INIT_PRIORITY_JDEBUG1)
{
// perform v. early in process startup, ideally this would grab process exclusively for the 2 100ths of a sec it performs calc.
calibrate_timing();
+ oneSecInCycles = nanosec_to_cycle(1000000000);
return 1;
}
@@ -87,6 +87,7 @@ struct ITimeReporter : public IInterface
virtual void serialize(MemoryBuffer &mb) = 0;
};
+extern jlib_decl cycle_t oneSecInCycles;
class CCycleTimer
cycle_t start_time;
@@ -108,6 +109,7 @@ public:
return static_cast<unsigned>(cycle_to_nanosec(elapsedCycles())/1000000);
+inline cycle_t queryOneSecCycles() { return oneSecInCycles; }
class jlib_decl TimeSection
@@ -30,10 +30,10 @@ interface IRowStreamWithMetaData: extends IRowStream
interface IHashDistributor: extends IInterface
- virtual IRowStream *connect(IRowStream *in, IHash *_ihash, ICompare *_icompare)=0;
+ virtual IRowStream *connect(IRowStream *in, IHash *ihash, ICompare *icompare)=0;
virtual void disconnect(bool stop)=0;
virtual void join()=0;
- virtual void setBufferSizes(unsigned _sendBufferSize, unsigned _outputBufferSize, unsigned _pullBufferSize) = 0;
+ virtual void setBufferSizes(unsigned sendBufferSize, unsigned outputBufferSize, unsigned pullBufferSize) = 0;
interface IStopInput;
@@ -2754,6 +2754,12 @@ IOutputRowDeserializer * CActivityBase::queryRowDeserializer()
return rowDeserializer;
+IRowInterfaces *CActivityBase::getRowInterfaces()
+{
+ // create an independent instance, to avoid circular link dependency problems
+ return createRowInterfaces(queryRowMetaData(), container.queryId(), queryCodeContext());
+}
+
bool CActivityBase::receiveMsg(CMessageBuffer &mb, const rank_t rank, const mptag_t mpTag, rank_t *sender, unsigned timeout)
BooleanOnOff onOff(receiving);
@@ -960,7 +960,7 @@ public:
bool firstNode() { return 1 == container.queryJob().queryMyRank(); }
bool lastNode() { return container.queryJob().querySlaves() == container.queryJob().queryMyRank(); }
unsigned queryMaxCores() const { return maxCores; }
-
+ IRowInterfaces *getRowInterfaces();
virtual void setInput(unsigned index, CActivityBase *inputActivity, unsigned inputOutIdx) { }
virtual void clearConnections() { }