/*############################################################################## Copyright (C) 2011 HPCC Systems. All rights reserved. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . ############################################################################## */ #include "platform.h" #include #include "jfile.hpp" #include "jthread.hpp" #include "jio.ipp" #include "jlzw.ipp" #include "jmisc.hpp" #include #include #include "jexcept.hpp" #include "jqueue.tpp" #ifdef _WIN32 #include #endif #define DEFAULTBUFFERSIZE 0x10000 // 64K #define RANDOM_BUFFER_SIZE DEFAULTBUFFERSIZE #define MAX_RANDOM_CACHE_SIZE 0x10000 #define RANDOM_CACHE_DEPTH 10 #define threshold 1024 #define timelimit 100 #define MINCOMPRESSEDROWSIZE 16 #define MAXCOMPRESSEDROWSIZE 0x4000 static unsigned ioRetryCount=0; void setIORetryCount(unsigned _ioRetryCount) // non atomic, expected to be called just once at process start up. { ioRetryCount = _ioRetryCount; PROGLOG("setIORetryCount set to : %d", ioRetryCount); } extern jlib_decl offset_t checked_lseeki64( int handle, offset_t offset, int origin ) { offset_t ret=_lseeki64(handle,offset,origin); if (ret==(offset_t)-1) throw MakeErrnoException("checked_lseeki64"); return ret; } extern jlib_decl size32_t checked_read(int file, void *buffer, size32_t len) { if (0==len) return 0; unsigned attempts = 0; size32_t ret = 0; unsigned __int64 startCycles = get_cycles_now(); loop { ssize_t readNow = _read(file, buffer, len); if (readNow == (ssize_t)-1) { switch (errno) { case EINTR: readNow = 0; break; default: if (attempts < ioRetryCount) { attempts++; StringBuffer callStr("read"); callStr.append("[errno=").append(errno); unsigned __int64 elapsedMs = cycle_to_nanosec(get_cycles_now() - startCycles)/1000000; callStr.append(", took=").append(elapsedMs); callStr.append(", attempt=").append(attempts).append("](handle="); callStr.append(file).append(", len=").append(len).append(")"); PROGLOG("%s", callStr.str()); readNow = 0; break; } throw MakeErrnoException(errno, "checked_read"); } } else if (!readNow) break; ret += readNow; if (readNow == len) break; buffer = ((char *) buffer) + readNow; len -= readNow; } return ret; } #ifdef WIN32 static bool atomicsupported = true; static CriticalSection atomicsection; #endif extern jlib_decl size32_t checked_pread(int file, void *buffer, size32_t len, offset_t pos) { if (0==len) return 0; #ifdef WIN32 if (atomicsupported) { HANDLE hFile = (HANDLE)_get_osfhandle(file); DWORD rread; OVERLAPPED overlapped; memset(&overlapped, 0, sizeof(overlapped)); overlapped.Offset = (DWORD) pos; overlapped.OffsetHigh = (DWORD)(pos>>32); if (ReadFile(hFile, buffer, len, &rread, &overlapped)) return rread; int err = (int)GetLastError(); if (err == ERROR_HANDLE_EOF) return 0; if (err == ERROR_INVALID_PARAMETER) // Win98 etc atomicsupported = false; else throw MakeOsException(GetLastError(), "checked_pread"); } { CriticalBlock blk(atomicsection); checked_lseeki64(file, pos, FILE_BEGIN); return checked_read(file, buffer, len); } #else size32_t ret = 0; unsigned attempts = 0; unsigned __int64 startCycles = get_cycles_now(); loop { ssize_t readNow = ::pread(file, buffer, len, pos); if (readNow == (ssize_t)-1) { switch (errno) { case EINTR: readNow = 0; break; default: if (attempts < ioRetryCount) { attempts++; StringBuffer callStr("pread"); callStr.append("[errno=").append(errno); unsigned __int64 elapsedMs = cycle_to_nanosec(get_cycles_now() - startCycles)/1000000; callStr.append(", took=").append(elapsedMs); callStr.append(", attempt=").append(attempts).append("](handle="); callStr.append(file).append(", pos=").append(pos).append(", len=").append(len).append(")"); PROGLOG("%s", callStr.str()); readNow = 0; break; } throw MakeErrnoException(errno,"checked_pread"); } } else if (!readNow) break; ret += readNow; if (readNow == len) break; pos += readNow; buffer = ((char *) buffer) + readNow; len -= readNow; } return ret; #endif } extern jlib_decl size32_t checked_write( int handle, const void *buffer, size32_t count ) { int ret=_write(handle,buffer,count); if ((size32_t)ret != count) { throw MakeErrnoException((ret==-1)?errno:DISK_FULL_EXCEPTION_CODE, "checked_write"); } return (size32_t)ret; } class CReadSeq : public CInterface, public IReadSeq { int fh; size32_t size; char *buffer; char *ptr; size32_t bufSize; size32_t bytesInBuffer; offset_t startpos; offset_t endpos; offset_t nextbufpos; bool compressed; void *prev; size32_t maxcompsize; bool first; inline unsigned remaining() { return (unsigned)(buffer+bytesInBuffer-ptr); } size32_t getBytes(void *dst, size32_t _size) { size32_t left = remaining(); size32_t read = 0; while (_size>left) { if (left) { memcpy(dst, ptr, left); dst = (char *)dst + left; _size -= left; read += left; ptr+=left; } refill(); left = bytesInBuffer; if (!left) return read; } memcpy(dst, ptr, _size); ptr += _size; read += _size; return read; } void refill() { size32_t left = remaining(); memmove(buffer,ptr,left); size32_t rd=bufSize-left; if (endpos-nextbufpos<(offset_t)rd) rd = (size32_t)(endpos-nextbufpos); if (rd) rd = checked_pread(fh, buffer+left, rd, nextbufpos); nextbufpos += rd; bytesInBuffer = left+rd; ptr = buffer; } public: IMPLEMENT_IINTERFACE; CReadSeq(int _fh, offset_t _offset, unsigned maxrecs, size32_t _size, size32_t _bufsize, bool _compressed) { assertex(_size); fh = _fh; size = _size; bufSize = (_bufsize==(unsigned) -1)?DEFAULTBUFFERSIZE:_bufsize; bytesInBuffer = 0; startpos = _offset; nextbufpos = _offset; compressed = ((size=MINCOMPRESSEDROWSIZE)&&(size<=MAXCOMPRESSEDROWSIZE))?_compressed:false; if (compressed) { maxcompsize = size+size/3+3; // migger than needed buffer = (char *) malloc(bufSize+size); prev = buffer+bufSize; } else buffer = (char *) malloc(bufSize); ptr = buffer; first = true; endpos = (maxrecs!=(unsigned)-1)?(_offset+(offset_t)maxrecs*(offset_t)_size):I64C(0x7ffffffffff); } ~CReadSeq() { free(buffer); } virtual bool get(void *dst) { if (!compressed) return getBytes(dst, size)==size; return (getn(dst,1)==1); } virtual unsigned getn(void *dst, unsigned n) { if (!compressed) return getBytes(dst, size*n)/size; byte *d = (byte *)dst; byte *e = d+(size*n); byte *p = (byte *)prev; unsigned ret = 0; while (d!=e) { if (first) { if (getBytes(d, size)!=size) break; first = false; } else { if (remaining()reset(); // not done itself return seq; } return new CReadSeq(fh, _offset, maxrecs, size, bufsize, compressed); } //================================================================================================ class CWriteSeq : public CInterface, public IWriteSeq { private: int fh; size32_t size; char *buffer; char *ptr; size32_t bufSize; offset_t fpos; bool compressed; size32_t maxcompsize; void *prev; void *aux; bool first; inline size32_t remaining() { return (size32_t)(bufSize - (ptr-buffer)); } void putBytes(const void *src, size32_t _size) { fpos += _size; size32_t left = remaining(); if (_size>left) { if (ptr!=buffer) { // don't buffer if entire block memcpy(ptr, src, left); ptr += left; src = (char *)src + left; _size -= left; flush(); left = bufSize; } while (_size>=bufSize) // write out directly { checked_write(fh, src, bufSize); // stick to writing bufSize blocks src = (char *)src + bufSize; _size -= bufSize; } } memcpy(ptr, src, _size); ptr += _size; } public: IMPLEMENT_IINTERFACE; CWriteSeq(int _fh, size32_t _size, size32_t _bufsize, bool _compressed) { assertex(_fh); assertex(_size); fh = _fh; size = _size; fpos = 0; if (_bufsize == (unsigned) -1) _bufsize = DEFAULTBUFFERSIZE; bufSize = _bufsize; compressed = ((size=MINCOMPRESSEDROWSIZE)&&(size<=MAXCOMPRESSEDROWSIZE))?_compressed:false; if (compressed) { maxcompsize = size+size/3+3; // bigger than needed buffer = (char *) malloc(bufSize+size+maxcompsize); prev = buffer+bufSize; aux = (char *)prev+size; } else buffer = (char *) malloc(bufSize); ptr = buffer; first = true; } ~CWriteSeq() { free(buffer); } void put(const void *src) { if (compressed) { if (first) { first = false; memcpy(prev,src,size); } else if (remaining()>=maxcompsize) { size32_t sz = DiffCompress(src,ptr,prev,size); fpos += sz; ptr += sz; return; } else { putBytes(aux, DiffCompress(src,aux,prev,size)); return; } } putBytes(src, size); } void putn(const void *src, unsigned numRecs) { if (compressed) { while (numRecs) { put(src); src = (byte *)src+size; numRecs--; } } else putBytes(src, size*numRecs); } void flush() { if (ptr != buffer) { checked_write(fh, buffer, (size32_t)(ptr-buffer)); ptr = buffer; } } offset_t getPosition() { return fpos; } virtual size32_t getRecordSize() { return size; } }; IWriteSeq *createWriteSeq(int fh, size32_t size, size32_t bufsize, bool compressed) { // Async TBD if (!bufsize) return new CUnbufferedReadWriteSeq(fh, 0, size); else return new CWriteSeq(fh, size, bufsize,compressed); } IWriteSeq *createTeeWriteSeq(IWriteSeq *f1, IWriteSeq *f2) { return new CTeeWriteSeq(f1, f2); } //=========================================================================================== CUnbufferedReadWriteSeq::CUnbufferedReadWriteSeq(int _fh, offset_t _offset, size32_t _size) { fh = _fh; size = _size; offset = _offset; fpos = _offset; } void CUnbufferedReadWriteSeq::put(const void *src) { checked_write(fh, src, size); fpos += size; } void CUnbufferedReadWriteSeq::putn(const void *src, unsigned n) { checked_write(fh, src, size*n); fpos += size*n; } void CUnbufferedReadWriteSeq::flush() {} offset_t CUnbufferedReadWriteSeq::getPosition() { return fpos; } bool CUnbufferedReadWriteSeq::get(void *dst) { size32_t toread = size; while (toread) { int read = checked_read(fh, dst, toread); if (!read) return false; toread -= read; dst = (char *) dst + read; } return true; } unsigned CUnbufferedReadWriteSeq::getn(void *dst, unsigned n) { size32_t toread = size*n; size32_t totread = 0; while (toread) { int read = checked_read(fh, dst, toread); if (!read) break; toread -= read; totread += read; dst = (char *) dst + read; } return totread/size; } void CUnbufferedReadWriteSeq::reset() { checked_lseeki64(fh, offset, SEEK_SET); fpos = offset; } //=========================================================================================== //=========================================================================================== CTeeWriteSeq::CTeeWriteSeq(IWriteSeq *_f1, IWriteSeq *_f2) { w1 = _f1; w1->Link(); w2 = _f2; w2->Link(); assertex(w1->getRecordSize()==w2->getRecordSize()); } CTeeWriteSeq::~CTeeWriteSeq() { w1->Release(); w2->Release(); } void CTeeWriteSeq::put(const void *src) { w1->put(src); w2->put(src); } void CTeeWriteSeq::putn(const void *src, unsigned n) { w1->putn(src, n); w2->putn(src, n); } void CTeeWriteSeq::flush() { w1->flush(); w2->flush(); } size32_t CTeeWriteSeq::getRecordSize() { return w1->getRecordSize(); } offset_t CTeeWriteSeq::getPosition() { return w1->getPosition(); } //================================================================================================== class CFixedRecordSize: public CInterface, public IRecordSize { protected: size32_t recsize; public: IMPLEMENT_IINTERFACE; CFixedRecordSize(size32_t _recsize) { recsize=_recsize; } virtual size32_t getRecordSize(const void *) { return recsize; } size32_t getFixedSize() const { return recsize; } }; IRecordSize *createFixedRecordSize(size32_t recsize) { return new CFixedRecordSize(recsize); } class CDeltaRecordSize: public CInterface, public IRecordSize { protected: Owned recordSize; int delta; public: CDeltaRecordSize(IRecordSize * _recordSize, int _delta) { recordSize.set(_recordSize); delta = _delta; } IMPLEMENT_IINTERFACE; virtual size32_t getRecordSize(const void * data) { return recordSize->getRecordSize(data) + delta; } size32_t getFixedSize() const { return recordSize->getFixedSize()?recordSize->getFixedSize()+delta:0; } }; extern jlib_decl IRecordSize *createDeltaRecordSize(IRecordSize * size, int delta) { if (delta == 0) return LINK(size); return new CDeltaRecordSize(size, delta); } //================================================================================================== // Elevator scanning #define MAX_PENDING 20000 class ElevatorScanner; class PendingFetch : public CInterface, public IInterface { public: IMPLEMENT_IINTERFACE; static int compare(const void *a, const void *b); offset_t pos; IReceiver *receiver; void *target; IRecordFetchChannel *channel; }; class ElevatorChannel : public CInterface, implements IRecordFetchChannel { private: bool cancelled; bool immediate; ElevatorScanner &scanner; public: IMPLEMENT_IINTERFACE; ElevatorChannel(ElevatorScanner &, bool); ~ElevatorChannel(); //Interface IRecordFetchChannel virtual void fetch(offset_t pos, void *buffer, IReceiver *receiver); virtual void flush(); virtual void abort() { cancelled = true; } virtual bool isAborted() { return cancelled; } virtual bool isImmediate() { return immediate; } }; class ElevatorScanner : public Thread, public IRecordFetcher { private: Monitor scanlist; Monitor isRoom; PendingFetch pending[MAX_PENDING]; unsigned nextSlot; size32_t recordSize; int file; offset_t reads; unsigned scans; bool stopped; unsigned duetime; void scan(); void doFetch(PendingFetch &); void stop(); void resetTimer() { duetime = msTick()+timelimit; } public: IMPLEMENT_IINTERFACE; virtual void beforeDispose(); ElevatorScanner(int file, size32_t recordSize); ~ElevatorScanner(); //Interface IRecordFetcher virtual IRecordFetchChannel *openChannel(bool immediate) { return new ElevatorChannel(*this, immediate); } //Interface Thread virtual int run(); void flush(IRecordFetchChannel *); void fetch(offset_t, void *, IReceiver *, IRecordFetchChannel *); }; int PendingFetch::compare(const void *a, const void *b) { offset_t aa = ((PendingFetch *) a)->pos; offset_t bb = ((PendingFetch *) b)->pos; if (aa > bb) return 1; else if (aa == bb) return 0; else return -1; } ElevatorChannel::ElevatorChannel(ElevatorScanner &_scanner, bool _immediate) : scanner(_scanner) { scanner.Link(); cancelled = false; immediate = _immediate; } ElevatorChannel::~ElevatorChannel() { flush(); scanner.Release(); } void ElevatorChannel::fetch(offset_t fpos, void *buffer, IReceiver *receiver) { scanner.fetch(fpos, buffer, receiver, this); } void ElevatorChannel::flush() { scanner.flush(this); } ElevatorScanner::ElevatorScanner(int _file, size32_t _recordSize) : Thread("ElevatorScanner") { file = _file; recordSize = _recordSize; nextSlot = 0; reads = 0; scans = 0; stopped = false; start(); } ElevatorScanner::~ElevatorScanner() { PrintLog("Elevator scanner statistics: %"I64F"d reads (%"I64F"d bytes), %d scans", reads, reads*recordSize, scans); } void ElevatorScanner::beforeDispose() { stop(); join(); } void ElevatorScanner::fetch(offset_t fpos, void *buffer, IReceiver *receiver, IRecordFetchChannel *channel) { synchronized procedure(scanlist); if (channel->isImmediate()) { // MORE - atomic seek/read would be preferable! checked_lseeki64(file, fpos, SEEK_SET); checked_read(file, buffer, recordSize); reads++; if (!receiver->takeRecord(fpos)) channel->abort(); return; } { synchronized block(isRoom); while (nextSlot >= MAX_PENDING) isRoom.wait(); } if (!channel->isAborted()) { pending[nextSlot].pos = fpos; pending[nextSlot].receiver = receiver; pending[nextSlot].target = buffer; pending[nextSlot].channel = channel; nextSlot++; resetTimer(); scanlist.notify(); } } void ElevatorScanner::doFetch(PendingFetch &next) { if (!next.channel->isAborted()) { // MORE - atomic seek/read would be preferable! checked_lseeki64(file, next.pos, SEEK_SET); checked_read(file, next.target, recordSize); reads++; if (!next.receiver->takeRecord(next.pos)) next.channel->abort(); } } void ElevatorScanner::scan() { PrintLog("Starting elevator scan of %d items", nextSlot); scans++; qsort(pending, nextSlot, sizeof(pending[0]), PendingFetch::compare); for (unsigned i = 0; i < nextSlot; i++) { doFetch(pending[i]); } nextSlot = 0; { synchronized block(isRoom); isRoom.notify(); } PrintLog("Finished elevator scan"); } void ElevatorScanner::flush(IRecordFetchChannel *) { // MORE - I could just flush what was asked for, but I may as well flush the lot. synchronized procedure(scanlist); if (nextSlot) scan(); } int ElevatorScanner::run() { scanlist.lock(); for (;;) { while (nextSlotLink(); num = 0; recsize = 0; pos = 0; } virtual ~CChainedWriteSeq() { ::Release(stream); allocator->Release(); } void flush() { if (stream) stream->flush(); } void put(const void *dst) { putn(dst,1); } void putn(const void *dst, unsigned numrecs) { if (numrecs==0) return; if (stream==NULL) return; // could raise exception instead byte *out=(byte *)dst; while (numrecs>num) { stream->putn(out,num); pos+=num; numrecs-=num; stream->flush(); IWriteSeq *oldstream=stream; stream = allocator->next(num); oldstream->Release(); if (!stream) { return; // could raise exception } } stream->putn(out,numrecs); pos+=numrecs; } virtual size32_t getRecordSize() { if ((recsize==0)&&stream) recsize = stream->getRecordSize(); return recsize; } virtual offset_t getPosition() { return pos; } }; class CChainedReadSeq : public CInterface, public IReadSeq { protected: IReadSeq *stream; IReadSeqAllocator *allocator; unsigned num; size32_t recsize; public: IMPLEMENT_IINTERFACE; CChainedReadSeq(IReadSeqAllocator *_allocator) { allocator = _allocator; allocator->Link(); stream = allocator->next(); num = 0; recsize = 0; } virtual ~CChainedReadSeq() { ::Release(stream); allocator->Release(); } virtual bool get(void *dst) { return (getn(dst,1)==1); } virtual unsigned getn(void *dst, unsigned n) { unsigned done=0; while (stream&&n) { unsigned r = stream->getn(dst,n); if (r==0) { IReadSeq *oldstream=stream; stream = allocator->next(); oldstream->Release(); } else { n-=r; done+=r; } } return done; } virtual size32_t getRecordSize() { return stream->getRecordSize(); } virtual void reset() { stream->reset(); } virtual void stop() { stream->stop(); } }; unsigned copySeq(IReadSeq *from,IWriteSeq *to,size32_t bufsize) { size32_t recsize=from->getRecordSize(); assertex(recsize==to->getRecordSize()); unsigned nbuf = bufsize/recsize; if (nbuf==0) nbuf = 1; MemoryAttr ma; byte *buf=(byte *)ma.allocate(nbuf*recsize); unsigned ret = 0; loop { unsigned n = from->getn(buf,nbuf); if (n==0) break; to->putn(buf,n); ret += n; } return ret; } ///////////////// CBufferedIOStreamBase::CBufferedIOStreamBase(size32_t _bufferSize) { bufferSize = _bufferSize; numInBuffer = 0; curBufferOffset = 0; reading = true; minDirectSize = std::min(bufferSize/4,(size32_t)0x2000); // size where we don't bother copying into the buffer } size32_t CBufferedIOStreamBase::doread(size32_t len, void * data) { if (!reading) { doflush(); reading = true; } size32_t sizeGot = readFromBuffer(len, data); len -= sizeGot; if (len!=0) { data = (char *)data + sizeGot; if (len >= minDirectSize) sizeGot += directRead(len, data); // if direct read specified don't loop else { do { if (!fillBuffer()) break; size32_t numRead = readFromBuffer(len, data); sizeGot += numRead; len -= numRead; data = (char *)data + numRead; } while (len); } } return sizeGot; } size32_t CBufferedIOStreamBase::dowrite(size32_t len, const void * data) { if (reading) { curBufferOffset = 0; numInBuffer = 0; reading = false; } size32_t ret = len; while (len) { // tries to write in buffer size chunks, also flushes as soon as possible size32_t wr; if (numInBuffer != 0) { wr = std::min(len,bufferSize-curBufferOffset); writeToBuffer(wr, data); if (numInBuffer==bufferSize) doflush(); len -= wr; if (len==0) break; data = (char *)data + wr; } if (len >= minDirectSize) return directWrite(len, data)+ret-len; wr = std::min(len,bufferSize); writeToBuffer(wr, data); if (numInBuffer==bufferSize) doflush(); len -= wr; data = (char *)data + wr; } return ret; // there is a bit of an assumption here that flush always works } //////////////////////////// class CBufferedIIOStream : public CBufferedIOStreamBase, implements IIOStream { Linked io; public: IMPLEMENT_IINTERFACE; CBufferedIIOStream(IIOStream *_io, unsigned bufSize) : CBufferedIOStreamBase(bufSize), io(_io) { buffer = new byte[bufSize]; } ~CBufferedIIOStream() { try { flush(); } catch (IException *) { delete [] buffer; throw; } delete [] buffer; } virtual bool fillBuffer() { reading = true; numInBuffer = io->read(bufferSize, buffer); curBufferOffset = 0; return numInBuffer!=0; } virtual size32_t directRead(size32_t len, void * data) { return io->read(len, data); } virtual size32_t directWrite(size32_t len, const void * data) { return io->write(len,data); } virtual void doflush() { if (!reading && numInBuffer) { //Copy numInBuffer out before flush so that destructor doesn't attempt to flush again. size32_t numToWrite = numInBuffer; numInBuffer = 0; io->write(numToWrite, buffer); curBufferOffset = 0; } } // IIOStream impl. virtual size32_t read(size32_t len, void * data) { return CBufferedIOStreamBase::doread(len, data); } virtual size32_t write(size32_t len, const void * data) { return CBufferedIOStreamBase::dowrite(len, data); } virtual void flush() { doflush(); } }; IIOStream *createBufferedIOStream(IIOStream *io, unsigned bufSize) { if (bufSize == (unsigned)-1) bufSize = DEFAULT_BUFFER_SIZE; return new CBufferedIIOStream(io, bufSize); } IRowStream *createNullRowStream() { class cNullStream: public CInterface, implements IRowStream { const void *nextRow() { return NULL; } void stop() {} public: IMPLEMENT_IINTERFACE; cNullStream() {} }; return new cNullStream; } unsigned copyRowStream(IRowStream *in, IRowWriter *out) { unsigned ret=0; loop { const void *row = in->nextRow(); if (!row) break; ret ++; out->putRow(row); } return ret; } unsigned groupedCopyRowStream(IRowStream *in, IRowWriter *out) { unsigned ret=0; loop { const void *row = in->nextRow(); if (!row) { row = in->nextRow(); if (!row) break; out->putRow(NULL); } ret ++; out->putRow(row); } return ret; } unsigned ungroupedCopyRowStream(IRowStream *in, IRowWriter *out) { unsigned ret=0; loop { const void *row = in->ungroupedNextRow(); if (!row) break; ret ++; out->putRow(row); } return ret; } class CConcatRowStream : public CInterface, public IRowStream { IArrayOf oinstreams; public: unsigned num; unsigned idx; IRowStream **in; bool grouped; bool needeog; public: IMPLEMENT_IINTERFACE; CConcatRowStream(unsigned _num, IRowStream **_in,bool _grouped) { // in streams assumed valid throughout (i.e. not linked) num = _num; idx = 0; assertex(num); oinstreams.ensure(num); for (unsigned n = 0;nnextRow(); if (row) { needeog = true; return row; } if (needeog) { needeog = false; return NULL; } } else { row = in[idx]->ungroupedNextRow(); if (row) return row; } idx++; } return NULL; } virtual void stop() { while (idxstop(); } }; extern jlib_decl IWriteSeq *createChainedWriteSeq(IWriteSeqAllocator *iwsa) { return new CChainedWriteSeq(iwsa); } extern jlib_decl IReadSeq *createChainedReadSeq(IReadSeqAllocator *irsa) { return new CChainedReadSeq(irsa); } IRowStream *createConcatRowStream(unsigned numstreams,IRowStream** streams,bool grouped) { return new CConcatRowStream(numstreams,streams,grouped); } #ifdef __x86_64__ void writeStringToStream(IIOStream &out, const char *s) { out.write((size32_t)strlen(s), s); } void writeCharsNToStream(IIOStream &out, char c, unsigned cnt) { while(cnt--) out.write(1, &c); } void writeCharToStream(IIOStream &out, char c) { out.write(1, &c); } #endif