slave.hpp 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #ifndef SLAVE_HPP
  14. #define SLAVE_HPP
  15. #ifdef ACTIVITYSLAVES_EXPORTS
  16. #define activityslaves_decl DECL_EXPORT
  17. #else
  18. #define activityslaves_decl DECL_IMPORT
  19. #endif
  20. #include "jio.hpp"
  21. #include "jsocket.hpp"
  22. #include "slavmain.hpp"
  23. #include "thor.hpp"
  24. #include "eclhelper.hpp" // for IRecordSize
  25. #include "thgraph.hpp"
  26. #include "thorstep.hpp"
  27. #include "roxiestream.hpp"
  28. struct ThorDataLinkMetaInfo
  29. {
  30. __int64 totalRowsMin = 0; // set to 0 if not known
  31. __int64 totalRowsMax = -1; // set to -1 if not known
  32. offset_t spilled = (offset_t)-1; // amount "spilled" to disk (approx) (offset_t)-1 for not known
  33. bool isSource = false;
  34. bool isSequential = false;
  35. bool canStall = false;
  36. bool fastThrough = false;
  37. bool buffersInput = false;
  38. bool canBufferInput = false;
  39. bool singleRowOutput = false;
  40. bool canIncreaseNumRows = false;
  41. bool canReduceNumRows = false;
  42. bool unknownRowsOutput = false; // cannot use input to deduce total
  43. offset_t byteTotal = (offset_t)-1; // total (uncompressed) byte count of all rows
  44. };
  45. #ifdef _MSC_VER
  46. #pragma warning (push)
  47. #pragma warning( disable : 4275 )
  48. #endif
  49. #define MAX_SENSIBLE_STRANDS 1024 // Architecture dependent...
  50. class CThorStrandOptions
  51. {
  52. // Typically set from hints, common to many stranded activities
  53. public:
  54. explicit CThorStrandOptions(CGraphElementBase &container)
  55. {
  56. //PARALLEL(1) can be used to explicitly disable parallel processing.
  57. numStrands = container.queryXGMML().getPropInt("att[@name='parallel']/@value", 0);
  58. if ((numStrands == NotFound) || (numStrands > MAX_SENSIBLE_STRANDS))
  59. {
  60. unsigned channels = container.queryJob().queryJobChannels();
  61. unsigned cores = container.queryMaxCores();
  62. numStrands = (cores + (channels-1)) / channels; // i.e. round up
  63. }
  64. if (0 == numStrands)
  65. numStrands = container.queryJob().getOptInt("forceNumStrands");
  66. blockSize = container.queryJob().getOptInt("strandBlockSize");
  67. }
  68. public:
  69. unsigned numStrands = 0; // if 1 it forces single-stranded operations. (Useful for testing.)
  70. unsigned blockSize = 0;
  71. };
  72. interface IStrandJunction;
  73. interface IOrderedCallbackCollection;
  74. class CSlaveActivity;
  75. interface IThorDataLink : extends IInterface
  76. {
  77. virtual void start() = 0; // prepares input
  78. virtual CSlaveActivity *queryFromActivity() = 0; // activity that has this as an output
  79. virtual void getMetaInfo(ThorDataLinkMetaInfo &info) const = 0;
  80. virtual bool isGrouped() const { return false; }
  81. virtual IOutputMetaData * queryOutputMeta() const = 0;
  82. virtual bool isInputOrdered(bool consumerOrdered) const = 0;
  83. virtual IStrandJunction *getOutputStreams(CActivityBase &_ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) = 0;
  84. virtual void setOutputStream(unsigned index, IEngineRowStream *stream) = 0;
  85. // progress methods
  86. virtual void dataLinkSerialize(MemoryBuffer &mb) const = 0;
  87. virtual rowcount_t getProgressCount() const = 0;
  88. // timing methods
  89. virtual unsigned __int64 queryTotalCycles() const = 0;
  90. virtual unsigned __int64 queryEndCycles() const = 0;
  91. // debugging methods
  92. virtual void debugRequest(MemoryBuffer &mb) = 0;
  93. // Stepping methods
  94. virtual IInputSteppingMeta *querySteppingMeta() { return NULL; }
  95. virtual bool gatherConjunctions(ISteppedConjunctionCollector & collector) { return false; }
  96. };
  97. inline void serializeNullItdl(MemoryBuffer &mb) { mb.append((rowcount_t)0); }
  98. class CThorInput;
  99. interface IThorSlaveActivity
  100. {
  101. virtual void init(MemoryBuffer &data, MemoryBuffer &slaveData) = 0;
  102. virtual void setInputStream(unsigned index, CThorInput &input, bool consumerOrdered) = 0;
  103. virtual void processDone(MemoryBuffer &mb) = 0;
  104. virtual void reset() = 0;
  105. };
  106. #ifdef _MSC_VER
  107. #pragma warning (pop)
  108. #endif
  109. // utility redirects
  110. extern activityslaves_decl IThorRowInterfaces * queryRowInterfaces(IThorDataLink *link);
  111. extern activityslaves_decl IEngineRowAllocator * queryRowAllocator(IThorDataLink *link);
  112. extern activityslaves_decl IOutputRowSerializer * queryRowSerializer(IThorDataLink *link);
  113. extern activityslaves_decl IOutputRowDeserializer * queryRowDeserializer(IThorDataLink *link);
  114. extern activityslaves_decl IOutputMetaData *queryRowMetaData(IThorDataLink *link);
  115. extern activityslaves_decl unsigned queryActivityId(IThorDataLink *link);
  116. extern activityslaves_decl ICodeContext *queryCodeContext(IThorDataLink *link);
  117. extern activityslaves_decl void dummyProc();
  118. #endif