/*############################################################################## Copyright (C) 2011 HPCC Systems. All rights reserved. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . ############################################################################## */ #ifndef FVSOURCE_IPP #define FVSOURCE_IPP #include "fvdatasource.hpp" #include "dllserver.hpp" #include "hqlexpr.hpp" #include "eclhelper.hpp" //Following constants configure different sizes etc. #define DISK_BLOCK_SIZE 8096 // Size of chunks read directly from file. #define PAGED_WU_LIMIT 0x20000 // Page load work unit results >= this size. #define WU_BLOCK_SIZE 0x4000 // Size of chunks read from Work unit #define DISKREAD_PAGE_SIZE 200 // Number of rows to read in each chunk from file. interface IRecordSizeEx : public IRecordSize { IRecordSize::getRecordSize; virtual size32_t getRecordSize(unsigned maxLength, const void *rec) = 0; }; class RecordSizeToEx : public CInterface, implements IRecordSizeEx { public: RecordSizeToEx(IRecordSize * _recordSize) : recordSize(_recordSize) {} IMPLEMENT_IINTERFACE virtual size32_t getRecordSize(const void *rec) { return recordSize->getRecordSize(rec); } virtual size32_t getRecordSize(unsigned maxLength, const void *rec) { return recordSize->getRecordSize(rec); } virtual size32_t getFixedSize() const { return recordSize->getFixedSize(); } private: Linked recordSize; }; //NB: In the following the following convention is used: // storedX - size/structure in WU/on disk // returnedX - size/structure of the data actually sent to the program // transformedX - size/structure of data after applying transformation. // for workunit storedX == returnedX for disk returnedX==transformedX class DataSourceMetaData; class DataSourceMetaItem : public CInterface { public: DataSourceMetaItem(unsigned _flags, const char * _name, const char * _xpath, ITypeInfo * _type); DataSourceMetaItem(unsigned flags, MemoryBuffer & in); virtual void serialize(MemoryBuffer & out) const; virtual DataSourceMetaData * queryChildMeta() { return NULL; } public: StringAttr name; StringAttr xpath; OwnedITypeInfo type; byte flags; }; class DataSourceMetaData : public CInterface, implements IFvDataSourceMetaData, public IRecordSizeEx { friend class DataSourceSetItem; public: DataSourceMetaData(IHqlExpression * _record, byte _numFieldsToIgnore, bool _randomIsOk, bool _isGrouped, unsigned _keyedSize); DataSourceMetaData(); // for NULL implementation DataSourceMetaData(type_t type); DataSourceMetaData(MemoryBuffer & in); IMPLEMENT_IINTERFACE virtual unsigned numColumns() const; virtual ITypeInfo * queryType(unsigned column) const; virtual const char * queryName(unsigned column) const; virtual const char * queryXPath(unsigned column) const; virtual bool supportsRandomSeek() const; virtual void serialize(MemoryBuffer & out) const; virtual unsigned queryFieldFlags(unsigned column) const; virtual IFvDataSourceMetaData * queryChildMeta(unsigned column) const; virtual IFvDataSource * createChildDataSource(unsigned column, unsigned len, const void * data); virtual unsigned numKeyedColumns() const; void addFileposition(); void addGrouping(); void addVirtualField(const char * name, const char * xpath, ITypeInfo * type); void extractKeyedInfo(UnsignedArray & offsets, TypeInfoArray & types); unsigned fixedSize() { return storedFixedSize; } bool isFixedSize() { return isStoredFixedWidth; } bool isSingleSet() { return ((fields.ordinality() == 1) && (fields.item(0).type->getTypeCode() == type_set)); } inline unsigned getMaxRecordSize() { return maxRecordSize; } inline bool isKey() { return keyedSize != 0; } //IRecordSizeEx.... virtual size32_t getRecordSize(const void *rec); virtual size32_t getFixedSize() const; virtual size32_t getRecordSize(unsigned maxLength, const void *rec) { return getRecordSize(rec); } protected: void addSimpleField(const char * name, const char * xpath, ITypeInfo * type); void gatherFields(IHqlExpression * expr, bool isConditional); void gatherChildFields(IHqlExpression * expr, bool isConditional); void init(); protected: CIArrayOf fields; unsigned keyedSize; unsigned storedFixedSize; unsigned maxRecordSize; unsigned bitsRemaining; unsigned numVirtualFields; bool isStoredFixedWidth; bool randomIsOk; byte numFieldsToIgnore; }; class DataSourceDatasetItem : public DataSourceMetaItem { public: DataSourceDatasetItem(const char * _name, const char * _xpath, IHqlExpression * expr); DataSourceDatasetItem(unsigned flags, MemoryBuffer & in); virtual DataSourceMetaData * queryChildMeta() { return &record; } virtual void serialize(MemoryBuffer & out) const; protected: DataSourceMetaData record; }; class DataSourceSetItem : public DataSourceMetaItem { public: DataSourceSetItem(const char * _name, const char * _xpath, ITypeInfo * _type); DataSourceSetItem(unsigned flags, MemoryBuffer & in); virtual DataSourceMetaData * queryChildMeta() { return &record; } virtual void serialize(MemoryBuffer & out) const; protected: void createChild(); protected: DataSourceMetaData record; }; //--------------------------------------------------------------------------- class RowBlock : public CInterface { public: RowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset); RowBlock(__int64 _start, __int64 _startOffset); virtual const void * fetchRow(__int64 offset, size32_t & len) = 0; virtual const void * getRow(__int64 search, size32_t & len, unsigned __int64 & rowOffset) = 0; __int64 getStartRow() const { return start; } __int64 getNextRow() const { return start + numRows; } virtual void getNextStoredOffset(__int64 & row, offset_t & offset); protected: MemoryBuffer buffer; __int64 start; __int64 startOffset; unsigned numRows; }; class FixedRowBlock : public RowBlock { public: FixedRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, size32_t _fixedRecordSize); virtual const void * fetchRow(__int64 offset, size32_t & len); virtual const void * getRow(__int64 search, size32_t & len, unsigned __int64 & rowOffset); protected: size32_t fixedRecordSize; }; class VariableRowBlock : public RowBlock { public: VariableRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, IRecordSizeEx * recordSize, bool isLast); VariableRowBlock(MemoryBuffer & inBuffer, __int64 _start); // used by remote virtual const void * fetchRow(__int64 offset, size32_t & len); virtual const void * getRow(__int64 search, size32_t & len, unsigned __int64 & rowOffset); protected: UnsignedArray rowIndex; }; //--------------------------------------------------------------------------- class FilePosFixedRowBlock : public FixedRowBlock { public: FilePosFixedRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, size32_t _fixedRecordSize) : FixedRowBlock(_buffer, _start, _startOffset, _fixedRecordSize) {} virtual void getNextStoredOffset(__int64 & row, offset_t & offset); }; class FilePosVariableRowBlock : public VariableRowBlock { public: FilePosVariableRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, IRecordSizeEx * recordSize, bool isLast) : VariableRowBlock(_buffer, _start, _startOffset, recordSize, isLast) {} virtual void getNextStoredOffset(__int64 & row, offset_t & offset); }; //--------------------------------------------------------------------------- struct RowLocation { RowLocation() { matchRow = 0; matchLength = 0; bestRow = 0; bestOffset = 0; } const void * matchRow; size32_t matchLength; __int64 bestRow; offset_t bestOffset; }; class RowCache { enum { MaxBlocksCached = 20, MinBlocksCached = 10 }; public: void addRowsOwn(RowBlock * rows); bool getCacheRow(__int64 row, RowLocation & location); protected: void makeRoom(); unsigned getBestRow(__int64 row); unsigned getInsertPosition(__int64 row); protected: CIArrayOf allRows; Int64Array ages; }; //--------------------------------------------------------------------------- class FVDataSource : public ADataSource { public: FVDataSource(); ~FVDataSource(); virtual IFvDataSourceMetaData * queryMetaData(); virtual bool fetchRow(MemoryBuffer & out, __int64 offset); virtual bool fetchRawRow(MemoryBuffer & out, __int64 offset); virtual bool getRow(MemoryBuffer & out, __int64 row); virtual bool getRawRow(MemoryBuffer & out, __int64 row); virtual void onClose() { openCount--; } virtual void onOpen() { openCount++; } protected: virtual bool fetchRowData(MemoryBuffer & out, __int64 offset) = 0; virtual bool getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset) = 0; protected: void addFileposition(); void copyRow(MemoryBuffer & out, const void * src, size32_t length); void loadDll(const char * wuid); bool setReturnedInfoFromResult(); protected: StringAttr wuid; Owned wuResult; HqlExprAttr returnedRecord; Owned returnedMeta; Owned returnedRecordSize; Owned transformedMeta; HqlExprAttr transformedRecord; Owned loadedDll; Array pluginDlls; rowTransformFunction transformer; unsigned extraFieldsSize; unsigned openCount; bool appendFileposition; }; class PagedDataSource : public FVDataSource { public: PagedDataSource() { totalRows = UNKNOWN_NUM_ROWS; } virtual __int64 numRows(bool force = false); virtual bool getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset); protected: virtual bool loadBlock(__int64 startRow, offset_t startOffset) = 0; virtual void improveLocation(__int64 row, RowLocation & location); protected: unsigned __int64 totalRows; RowCache cache; }; class NullDataSource : public ADataSource { public: NullDataSource() {} NullDataSource(IHqlExpression * _record, bool _isGrouped, unsigned _keyedSize); virtual bool init() { return true; } virtual IFvDataSourceMetaData * queryMetaData() { return &meta; } virtual __int64 numRows(bool force = false) { return 0; } virtual bool fetchRow(MemoryBuffer & out, __int64 offset) { return false; } virtual bool fetchRawRow(MemoryBuffer & out, __int64 offset) { return false; } virtual bool getRow(MemoryBuffer & out, __int64 row){ return false; } virtual bool getRawRow(MemoryBuffer & out, __int64 row){ return false; } virtual bool isIndex() { return false; } virtual bool optimizeFilter(unsigned offset, unsigned len, const void * data) { return true; } // empty anyway... virtual void onClose() { } virtual void onOpen() { } protected: DataSourceMetaData meta; }; class NestedDataSource : public FVDataSource { public: NestedDataSource(DataSourceMetaData & _meta, unsigned len, const void * data); //interface IFvDataSource virtual bool fetchRowData(MemoryBuffer & out, __int64 offset) { return false; } virtual bool getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset); virtual bool init(); virtual bool isIndex() { return false; } virtual __int64 numRows(bool force = false); virtual bool optimizeFilter(unsigned offset, unsigned len, const void * data) { return false; } protected: unsigned __int64 totalSize; Owned rows; }; class FailureDataSource : public NullDataSource { public: FailureDataSource(IHqlExpression * _record, IException * _error, bool _isGrouped, unsigned _keyedSize); virtual void onOpen() { throw LINK(error); } protected: Linked error; }; #define FullStringMatch ((unsigned)-1) extern IHqlExpression * parseQuery(const char * text); #endif