123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #ifndef THORCOMMON_HPP
- #define THORCOMMON_HPP
- #include "jiface.hpp"
- #include "jcrc.hpp"
- #include "jsort.hpp"
- #include "eclhelper.hpp"
- #include "thorhelper.hpp"
- #include "thorxmlwrite.hpp"
- #define DALI_RESULT_OUTPUTMAX 2000 // MB
- class THORHELPER_API CSizingSerializer : implements IRowSerializerTarget
- {
- size32_t totalsize;
- public:
- inline CSizingSerializer() { reset(); }
- inline void reset() { totalsize = 0; }
- inline size32_t size() { return totalsize; }
- void put(size32_t len, const void * ptr);
- size32_t beginNested();
- void endNested(size32_t position);
- };
- class THORHELPER_API CMemoryRowSerializer: implements IRowSerializerTarget
- {
- MemoryBuffer & buffer;
- unsigned nesting;
- public:
- inline CMemoryRowSerializer(MemoryBuffer & _buffer)
- : buffer(_buffer)
- {
- nesting = 0;
- }
- void put(size32_t len, const void * ptr);
- size32_t beginNested();
- void endNested(size32_t sizePos);
- };
- // useful package
- interface IRowInterfaces: extends IInterface
- {
- virtual IEngineRowAllocator * queryRowAllocator()=0;
- virtual IOutputRowSerializer * queryRowSerializer()=0;
- virtual IOutputRowDeserializer * queryRowDeserializer()=0;
- virtual IOutputMetaData *queryRowMetaData()=0;
- virtual unsigned queryActivityId()=0;
- virtual ICodeContext *queryCodeContext()=0;
- };
- extern THORHELPER_API void useMemoryMappedRead(bool on);
- extern THORHELPER_API IRowInterfaces *createRowInterfaces(IOutputMetaData *meta, unsigned actid, ICodeContext *context);
// Bit flags accepted by the row stream / row writer factories declared below.
// Each enumerator occupies its own bit so values can be OR-ed together.
enum RowReaderWriterFlags
{
    rw_grouped        = 1 << 0,
    rw_crc            = 1 << 1,
    rw_extend         = 1 << 2,
    rw_compress       = 1 << 3,
    rw_compressblkcrc = 1 << 4,   // block compression, this sets/checks crc's at block level
    rw_fastlz         = 1 << 5,   // if rw_compress
    rw_autoflush      = 1 << 6,
    rw_buffered       = 1 << 7
};

// Flag combination used as the default argument of the factories.
#define DEFAULT_RWFLAGS (rw_buffered | rw_autoflush | rw_compressblkcrc)

// Returns true when the bit identified by 'flag' is set in 'flags'.
inline bool TestRwFlag(unsigned flags, RowReaderWriterFlags flag)
{
    return (flags & flag) != 0;
}
- interface IExtRowStream: extends IRowStream
- {
- virtual offset_t getOffset() = 0;
- virtual void stop(CRC32 *crcout=NULL) = 0;
- virtual const void *prefetchRow(size32_t *sz=NULL) = 0;
- virtual void prefetchDone() = 0;
- virtual void reinit(offset_t offset,offset_t len,unsigned __int64 maxrows) = 0;
- };
- interface IExtRowWriter: extends IRowWriter
- {
- virtual offset_t getPosition() = 0;
- virtual void flush(CRC32 *crcout=NULL) = 0;
- };
- interface IExpander;
- extern THORHELPER_API IExtRowStream *createRowStream(IFile *file, IRowInterfaces *rowif, unsigned flags=DEFAULT_RWFLAGS, IExpander *eexp=NULL);
- extern THORHELPER_API IExtRowStream *createRowStreamEx(IFile *file, IRowInterfaces *rowif, offset_t offset=0, offset_t len=(offset_t)-1, unsigned __int64 maxrows=(unsigned __int64)-1, unsigned flags=DEFAULT_RWFLAGS, IExpander *eexp=NULL);
- interface ICompressor;
- extern THORHELPER_API IExtRowWriter *createRowWriter(IFile *file, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS, ICompressor *compressor=NULL);
- extern THORHELPER_API IExtRowWriter *createRowWriter(IFileIO *fileIO, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS);
- extern THORHELPER_API IExtRowWriter *createRowWriter(IFileIOStream *strm, IRowInterfaces *rowIf, unsigned flags=DEFAULT_RWFLAGS); // strm should be unbuffered
- interface THORHELPER_API IDiskMerger : extends IInterface
- {
- virtual void put(const void **rows, unsigned numrows) = 0;
- virtual void putIndirect(const void ***rowptrs, unsigned numrows) = 0; // like put only with an additional dereference, i.e. row i is *(rowptrs[i])
- virtual void put(ISortedRowProvider * rows) = 0;
- virtual IRowStream *merge(ICompare *icompare,bool partdedup=false) = 0;
- virtual count_t mergeTo(IRowWriter *dest,ICompare *icompare,bool partdedup=false) = 0; // alternative to merge
- virtual IRowWriter *createWriteBlock() = 0;
- };
- extern THORHELPER_API IDiskMerger *createDiskMerger(IRowInterfaces *rowInterfaces, IRowLinkCounter *linker, const char *tempnamebase);
- extern THORHELPER_API void testDiskSort();
- #define TIME_ACTIVITIES
- interface IActivityTimer : extends IInterface
- {
- virtual unsigned __int64 getCyclesAdjustment() const = 0;
- };
- #ifdef TIME_ACTIVITIES
- #include "jdebug.hpp"
- class ActivityTimer
- {
- unsigned __int64 startCycles;
- unsigned __int64 &accumulator;
- protected:
- const bool &enabled;
- IActivityTimer *iActivityTimer;
- public:
- inline ActivityTimer(unsigned __int64 &_accumulator, const bool &_enabled, IActivityTimer *_iActivityTimer) : accumulator(_accumulator), enabled(_enabled), iActivityTimer(_iActivityTimer)
- {
- if (enabled)
- {
- startCycles = get_cycles_now();
- if (iActivityTimer)
- startCycles -= iActivityTimer->getCyclesAdjustment();
- }
- }
- inline ~ActivityTimer()
- {
- if (enabled)
- {
- unsigned __int64 elapsedCycles = get_cycles_now() - startCycles;
- if (iActivityTimer)
- elapsedCycles -= iActivityTimer->getCyclesAdjustment();
- accumulator += elapsedCycles;
- }
- }
- };
- #else
- struct ActivityTimer
- {
- inline ActivityTimer(unsigned __int64 &_accumulator, const bool &_enabled, IActivityTimer *_iActivityTimer) { }
- };
- #endif
- class THORHELPER_API IndirectCodeContext : implements ICodeContext
- {
- public:
- IndirectCodeContext(ICodeContext * _ctx = NULL) : ctx(_ctx) {}
- void set(ICodeContext * _ctx) { ctx = _ctx; }
- virtual const char *loadResource(unsigned id)
- {
- return ctx->loadResource(id);
- }
- virtual void setResultBool(const char *name, unsigned sequence, bool value)
- {
- ctx->setResultBool(name, sequence, value);
- }
- virtual void setResultData(const char *name, unsigned sequence, int len, const void * data)
- {
- ctx->setResultData(name, sequence, len, data);
- }
- virtual void setResultDecimal(const char * stepname, unsigned sequence, int len, int precision, bool isSigned, const void *val)
- {
- ctx->setResultDecimal(stepname, sequence, len, precision, isSigned, val);
- }
- virtual void setResultInt(const char *name, unsigned sequence, __int64 value)
- {
- ctx->setResultInt(name, sequence, value);
- }
- virtual void setResultRaw(const char *name, unsigned sequence, int len, const void * data)
- {
- ctx->setResultRaw(name, sequence, len, data);
- }
- virtual void setResultReal(const char * stepname, unsigned sequence, double value)
- {
- ctx->setResultReal(stepname, sequence, value);
- }
- virtual void setResultSet(const char *name, unsigned sequence, bool isAll, size32_t len, const void * data, ISetToXmlTransformer * transformer)
- {
- ctx->setResultSet(name, sequence, isAll, len, data, transformer);
- }
- virtual void setResultString(const char *name, unsigned sequence, int len, const char * str)
- {
- ctx->setResultString(name, sequence, len, str);
- }
- virtual void setResultUInt(const char *name, unsigned sequence, unsigned __int64 value)
- {
- ctx->setResultUInt(name, sequence, value);
- }
- virtual void setResultUnicode(const char *name, unsigned sequence, int len, UChar const * str)
- {
- ctx->setResultUnicode(name, sequence, len, str);
- }
- virtual void setResultVarString(const char * name, unsigned sequence, const char * value)
- {
- ctx->setResultVarString(name, sequence, value);
- }
- virtual void setResultVarUnicode(const char * name, unsigned sequence, UChar const * value)
- {
- ctx->setResultVarUnicode(name, sequence, value);
- }
- virtual bool getResultBool(const char * name, unsigned sequence)
- {
- return ctx->getResultBool(name, sequence);
- }
- virtual void getResultData(unsigned & tlen, void * & tgt, const char * name, unsigned sequence)
- {
- ctx->getResultData(tlen, tgt, name, sequence);
- }
- virtual void getResultDecimal(unsigned tlen, int precision, bool isSigned, void * tgt, const char * stepname, unsigned sequence)
- {
- ctx->getResultDecimal(tlen, precision, isSigned, tgt, stepname, sequence);
- }
- virtual void getResultRaw(unsigned & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
- {
- ctx->getResultRaw(tlen, tgt, name, sequence, xmlTransformer, csvTransformer);
- }
- virtual void getResultSet(bool & isAll, size32_t & tlen, void * & tgt, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
- {
- ctx->getResultSet(isAll, tlen, tgt, name, sequence, xmlTransformer, csvTransformer);
- }
- virtual __int64 getResultInt(const char * name, unsigned sequence)
- {
- return ctx->getResultInt(name, sequence);
- }
- virtual double getResultReal(const char * name, unsigned sequence)
- {
- return ctx->getResultReal(name, sequence);
- }
- virtual void getResultString(unsigned & tlen, char * & tgt, const char * name, unsigned sequence)
- {
- ctx->getResultString(tlen, tgt, name, sequence);
- }
- virtual void getResultStringF(unsigned tlen, char * tgt, const char * name, unsigned sequence)
- {
- ctx->getResultStringF(tlen, tgt, name, sequence);
- }
- virtual void getResultUnicode(unsigned & tlen, UChar * & tgt, const char * name, unsigned sequence)
- {
- ctx->getResultUnicode(tlen, tgt, name, sequence);
- }
- virtual char *getResultVarString(const char * name, unsigned sequence)
- {
- return ctx->getResultVarString(name, sequence);
- }
- virtual UChar *getResultVarUnicode(const char * name, unsigned sequence)
- {
- return ctx->getResultVarUnicode(name, sequence);
- }
- virtual unsigned getResultHash(const char * name, unsigned sequence)
- {
- return ctx->getResultHash(name, sequence);
- }
- virtual char *getWuid()
- {
- return ctx->getWuid();
- }
- virtual void getExternalResultRaw(unsigned & tlen, void * & tgt, const char * wuid, const char * stepname, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
- {
- ctx->getExternalResultRaw(tlen, tgt, wuid, stepname, sequence, xmlTransformer, csvTransformer);
- }
- virtual void executeGraph(const char * graphName, bool realThor, size32_t parentExtractSize, const void * parentExtract)
- {
- ctx->executeGraph(graphName, realThor, parentExtractSize, parentExtract);
- }
- virtual char * getExpandLogicalName(const char * logicalName)
- {
- return ctx->getExpandLogicalName(logicalName);
- }
- virtual void addWuException(const char * text, unsigned code, unsigned severity)
- {
- ctx->addWuException(text, code, severity);
- }
- virtual void addWuAssertFailure(unsigned code, const char * text, const char * filename, unsigned lineno, unsigned column, bool isAbort)
- {
- ctx->addWuAssertFailure(code, text, filename, lineno, column, isAbort);
- }
- virtual IUserDescriptor *queryUserDescriptor()
- {
- return ctx->queryUserDescriptor();
- }
- virtual IThorChildGraph * resolveChildQuery(__int64 activityId, IHThorArg * colocal)
- {
- return ctx->resolveChildQuery(activityId, colocal);
- }
- virtual unsigned __int64 getDatasetHash(const char * name, unsigned __int64 hash)
- {
- return ctx->getDatasetHash(name, hash);
- }
- virtual unsigned getNodes()
- {
- return ctx->getNodes();
- }
- virtual unsigned getNodeNum()
- {
- return ctx->getNodeNum();
- }
- virtual char *getFilePart(const char *logicalPart, bool create)
- {
- return ctx->getFilePart(logicalPart, create);
- }
- virtual unsigned __int64 getFileOffset(const char *logicalPart)
- {
- return ctx->getFileOffset(logicalPart);
- }
- virtual IDistributedFileTransaction *querySuperFileTransaction()
- {
- return ctx->querySuperFileTransaction();
- }
- virtual char *getEnv(const char *name, const char *defaultValue) const
- {
- return ctx->getEnv(name, defaultValue);
- }
- virtual char *getJobName()
- {
- return ctx->getJobName();
- }
- virtual char *getJobOwner()
- {
- return ctx->getJobOwner();
- }
- virtual char *getClusterName()
- {
- return ctx->getClusterName();
- }
- virtual char *getGroupName()
- {
- return ctx->getGroupName();
- }
- virtual char * queryIndexMetaData(char const * lfn, char const * xpath)
- {
- return ctx->queryIndexMetaData(lfn, xpath);
- }
- virtual unsigned getPriority() const
- {
- return ctx->getPriority();
- }
- virtual char *getPlatform()
- {
- return ctx->getPlatform();
- }
- virtual char *getOS()
- {
- return ctx->getOS();
- }
- virtual IEclGraphResults * resolveLocalQuery(__int64 activityId)
- {
- return ctx->resolveLocalQuery(activityId);
- }
- virtual char *getEnv(const char *name, const char *defaultValue)
- {
- return ctx->getEnv(name, defaultValue);
- }
- virtual unsigned logString(const char *text) const
- {
- return ctx->logString(text);
- }
- virtual const IContextLogger &queryContextLogger() const
- {
- return ctx->queryContextLogger();
- }
- virtual IDebuggableContext *queryDebugContext() const
- {
- return ctx->queryDebugContext();
- }
- virtual IEngineRowAllocator * getRowAllocator(IOutputMetaData * meta, unsigned activityId) const
- {
- return ctx->getRowAllocator(meta, activityId);
- }
- virtual const char *cloneVString(const char *str) const
- {
- return ctx->cloneVString(str);
- }
- virtual const char *cloneVString(size32_t len, const char *str) const
- {
- return ctx->cloneVString(len, str);
- }
- virtual void getResultRowset(size32_t & tcount, byte * * & tgt, const char * name, unsigned sequence, IEngineRowAllocator * _rowAllocator, bool isGrouped, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer)
- {
- ctx->getResultRowset(tcount, tgt, name, sequence, _rowAllocator, isGrouped, xmlTransformer, csvTransformer);
- }
- virtual void getResultDictionary(size32_t & tcount, byte * * & tgt, IEngineRowAllocator * _rowAllocator, const char * name, unsigned sequence, IXmlToRowTransformer * xmlTransformer, ICsvToRowTransformer * csvTransformer, IHThorHashLookupInfo * hasher)
- {
- ctx->getResultDictionary(tcount, tgt, _rowAllocator, name, sequence, xmlTransformer, csvTransformer, hasher);
- }
- virtual void getRowXML(size32_t & lenResult, char * & result, IOutputMetaData & info, const void * row, unsigned flags)
- {
- convertRowToXML(lenResult, result, info, row, flags);
- }
- virtual const void * fromXml(IEngineRowAllocator * _rowAllocator, size32_t len, const char * utf8, IXmlToRowTransformer * xmlTransformer, bool stripWhitespace)
- {
- return ctx->fromXml(_rowAllocator, len, utf8, xmlTransformer, stripWhitespace);
- }
- virtual IEngineContext *queryEngineContext()
- {
- return ctx->queryEngineContext();
- }
- virtual char *getDaliServers()
- {
- return ctx->getDaliServers();
- }
- protected:
- ICodeContext * ctx;
- };
- extern THORHELPER_API bool isActivitySink(ThorActivityKind kind);
- extern THORHELPER_API bool isActivitySource(ThorActivityKind kind);
- extern THORHELPER_API const char * getActivityText(ThorActivityKind kind);
- #endif // THORCOMMON_HPP
|