123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #ifndef SLAVE_HPP
- #define SLAVE_HPP
- #ifdef ACTIVITYSLAVES_EXPORTS
- #define activityslaves_decl DECL_EXPORT
- #else
- #define activityslaves_decl DECL_IMPORT
- #endif
- #include "jio.hpp"
- #include "jsocket.hpp"
- #include "slavmain.hpp"
- #include "thor.hpp"
- #include "eclhelper.hpp" // for IRecordSize
- #include "thgraph.hpp"
- #include "thorstep.hpp"
- #include "roxiestream.hpp"
- struct ThorDataLinkMetaInfo
- {
- __int64 totalRowsMin = 0; // set to 0 if not known
- __int64 totalRowsMax = -1; // set to -1 if not known
- offset_t spilled = (offset_t)-1; // amount "spilled" to disk (approx) (offset_t)-1 for not known
- bool isSource = false;
- bool isSequential = false;
- bool canStall = false;
- bool fastThrough = false;
- bool buffersInput = false;
- bool canBufferInput = false;
- bool singleRowOutput = false;
- bool canIncreaseNumRows = false;
- bool canReduceNumRows = false;
- bool unknownRowsOutput = false; // cannot use input to deduce total
- offset_t byteTotal = (offset_t)-1; // total (uncompressed) byte count of all rows
- };
- #ifdef _MSC_VER
- #pragma warning (push)
- #pragma warning( disable : 4275 )
- #endif
- #define MAX_SENSIBLE_STRANDS 1024 // Architecture dependent...
- class CThorStrandOptions
- {
- // Typically set from hints, common to many stranded activities
- public:
- explicit CThorStrandOptions(CGraphElementBase &container)
- {
- //PARALLEL(1) can be used to explicitly disable parallel processing.
- numStrands = container.queryXGMML().getPropInt("att[@name='parallel']/@value", 0);
- if ((numStrands == NotFound) || (numStrands > MAX_SENSIBLE_STRANDS))
- {
- unsigned channels = container.queryJob().queryJobChannels();
- unsigned cores = container.queryMaxCores();
- numStrands = (cores + (channels-1)) / channels; // i.e. round up
- }
- if (0 == numStrands)
- numStrands = container.queryJob().getOptInt("forceNumStrands");
- blockSize = container.queryJob().getOptInt("strandBlockSize");
- }
- public:
- unsigned numStrands = 0; // if 1 it forces single-stranded operations. (Useful for testing.)
- unsigned blockSize = 0;
- };
- interface IStrandJunction;
- interface IOrderedCallbackCollection;
- class CSlaveActivity;
- interface IThorDataLink : extends IInterface
- {
- virtual void start() = 0; // prepares input
- virtual CSlaveActivity *queryFromActivity() = 0; // activity that has this as an output
- virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const = 0;
- virtual bool isGrouped() const { return false; }
- virtual IOutputMetaData * queryOutputMeta() const = 0;
- virtual bool isInputOrdered(bool consumerOrdered) const = 0;
- virtual IStrandJunction *getOutputStreams(CActivityBase &_ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) = 0;
- virtual void setOutputStream(unsigned index, IEngineRowStream *stream) = 0;
- // progress methods
- virtual void dataLinkSerialize(MemoryBuffer &mb) const = 0;
- virtual rowcount_t getProgressCount() const = 0;
- // timing methods
- virtual unsigned __int64 queryTotalCycles() const = 0;
- virtual unsigned __int64 queryEndCycles() const = 0;
- // debugging methods
- virtual void debugRequest(MemoryBuffer &mb) = 0;
- // Stepping methods
- virtual IInputSteppingMeta *querySteppingMeta() { return NULL; }
- virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector) { return false; }
- };
- inline void serializeNullItdl(MemoryBuffer &mb) { mb.append((rowcount_t)0); }
- class CThorInput;
- interface IThorSlaveActivity
- {
- virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) = 0;
- virtual void setInputStream(unsigned index, CThorInput &input, bool consumerOrdered) = 0;
- virtual void processDone(MemoryBuffer &mb) = 0;
- virtual void reset() = 0;
- };
- #ifdef _MSC_VER
- #pragma warning (pop)
- #endif
- // utility redirects
- extern activityslaves_decl IThorRowInterfaces * queryRowInterfaces(IThorDataLink *link);
- extern activityslaves_decl IEngineRowAllocator * queryRowAllocator(IThorDataLink *link);
- extern activityslaves_decl IOutputRowSerializer * queryRowSerializer(IThorDataLink *link);
- extern activityslaves_decl IOutputRowDeserializer * queryRowDeserializer(IThorDataLink *link);
- extern activityslaves_decl IOutputMetaData *queryRowMetaData(IThorDataLink *link);
- extern activityslaves_decl unsigned queryActivityId(IThorDataLink *link);
- extern activityslaves_decl ICodeContext *queryCodeContext(IThorDataLink *link);
- extern activityslaves_decl void dummyProc();
- #endif
|