/*##############################################################################
Copyright (C) 2011 HPCC Systems.
All rights reserved. This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
############################################################################## */
#include "platform.h"
#include
#include "jfile.hpp"
#include "jthread.hpp"
#include "jio.ipp"
#include "jlzw.ipp"
#include "jmisc.hpp"
#include
#include
#include "jexcept.hpp"
#include "jqueue.tpp"
#ifdef _WIN32
#include
#endif
// Buffer size substituted by CReadSeq/CWriteSeq when the caller passes (unsigned)-1.
#define DEFAULTBUFFERSIZE 0x10000 // 64K
// NOTE(review): the RANDOM_* constants and `threshold` are not referenced in the
// visible part of this file - confirm whether they are still needed.
#define RANDOM_BUFFER_SIZE DEFAULTBUFFERSIZE
#define MAX_RANDOM_CACHE_SIZE 0x10000
#define RANDOM_CACHE_DEPTH 10
#define threshold 1024
// Added to msTick() by ElevatorScanner::resetTimer() to compute the next due time.
#define timelimit 100
// Record-size bounds consulted by CReadSeq/CWriteSeq before honouring a compression request.
#define MINCOMPRESSEDROWSIZE 16
#define MAXCOMPRESSEDROWSIZE 0x4000
// Number of times checked_read/checked_pread retry a failing read before throwing.
static unsigned ioRetryCount=0;

// Sets the process-wide read retry count and logs the new value.
// Non atomic, expected to be called just once at process start up.
void setIORetryCount(unsigned _ioRetryCount)
{
    ioRetryCount = _ioRetryCount;
    // %u rather than %d: the count is unsigned, so %d misprints values above INT_MAX.
    PROGLOG("setIORetryCount set to : %u", ioRetryCount);
}
// Seeks `handle` with _lseeki64 and returns the resulting position.
// Throws an errno-based exception when the seek reports (offset_t)-1.
extern jlib_decl offset_t checked_lseeki64( int handle, offset_t offset, int origin )
{
    const offset_t newPos = _lseeki64(handle, offset, origin);
    if (newPos != (offset_t)-1)
        return newPos;
    throw MakeErrnoException("checked_lseeki64");
}
// Reads up to `len` bytes from `file` into `buffer`, looping over short reads.
// Returns the number of bytes read; fewer than `len` means _read returned 0 first.
// EINTR is retried silently. Any other error is logged and retried up to
// ioRetryCount times, after which an errno-based exception is thrown.
extern jlib_decl size32_t checked_read(int file, void *buffer, size32_t len)
{
    if (len == 0)
        return 0;
    size32_t total = 0;
    unsigned retries = 0;
    unsigned __int64 began = get_cycles_now();
    char *cursor = (char *) buffer;
    for (;;)
    {
        ssize_t got = _read(file, cursor, len);
        if (got == (ssize_t)-1)
        {
            if (errno != EINTR)
            {
                if (retries >= ioRetryCount)
                    throw MakeErrnoException(errno, "checked_read");
                retries++;
                // Build a one-line trace of the failed attempt before retrying.
                StringBuffer trace("read");
                trace.append("[errno=").append(errno);
                unsigned __int64 tookMs = cycle_to_nanosec(get_cycles_now() - began)/1000000;
                trace.append(", took=").append(tookMs);
                trace.append(", attempt=").append(retries).append("](handle=");
                trace.append(file).append(", len=").append(len).append(")");
                PROGLOG("%s", trace.str());
            }
            got = 0; // nothing transferred on this pass; go round again
        }
        else if (got == 0)
            break;
        total += got;
        if (got == len)
            break;
        cursor += got;
        len -= got;
    }
    return total;
}
#ifdef WIN32
// Cleared the first time a positioned ReadFile fails with ERROR_INVALID_PARAMETER.
static bool atomicsupported = true;
// Serialises the seek-then-read fallback used when positioned reads are unavailable.
static CriticalSection atomicsection;
#endif
// Reads up to `len` bytes from `file` at absolute offset `pos` into `buffer`.
// Returns the number of bytes read (0 for a zero-length request or at end of file).
// WIN32: tries a positioned ReadFile first, otherwise seeks and reads under a lock.
// Elsewhere: loops over ::pread, retrying EINTR silently and other errors up to
// ioRetryCount times before throwing an errno-based exception.
extern jlib_decl size32_t checked_pread(int file, void *buffer, size32_t len, offset_t pos)
{
    if (len == 0)
        return 0;
#ifdef WIN32
    if (atomicsupported)
    {
        HANDLE osHandle = (HANDLE)_get_osfhandle(file);
        DWORD got;
        OVERLAPPED where;
        memset(&where, 0, sizeof(where));
        where.Offset = (DWORD) pos;
        where.OffsetHigh = (DWORD)(pos>>32);
        if (ReadFile(osHandle, buffer, len, &got, &where))
            return got;
        int err = (int)GetLastError();
        switch (err)
        {
        case ERROR_HANDLE_EOF:
            return 0;
        case ERROR_INVALID_PARAMETER: // Win98 etc
            atomicsupported = false;
            break;
        default:
            throw MakeOsException(GetLastError(), "checked_pread");
        }
    }
    CriticalBlock blk(atomicsection);
    checked_lseeki64(file, pos, FILE_BEGIN);
    return checked_read(file, buffer, len);
#else
    size32_t total = 0;
    unsigned retries = 0;
    unsigned __int64 began = get_cycles_now();
    char *cursor = (char *) buffer;
    for (;;)
    {
        ssize_t got = ::pread(file, cursor, len, pos);
        if (got == (ssize_t)-1)
        {
            if (errno != EINTR)
            {
                if (retries >= ioRetryCount)
                    throw MakeErrnoException(errno,"checked_pread");
                retries++;
                // Build a one-line trace of the failed attempt before retrying.
                StringBuffer trace("pread");
                trace.append("[errno=").append(errno);
                unsigned __int64 tookMs = cycle_to_nanosec(get_cycles_now() - began)/1000000;
                trace.append(", took=").append(tookMs);
                trace.append(", attempt=").append(retries).append("](handle=");
                trace.append(file).append(", pos=").append(pos).append(", len=").append(len).append(")");
                PROGLOG("%s", trace.str());
            }
            got = 0; // nothing transferred on this pass; go round again
        }
        else if (got == 0)
            break;
        total += got;
        if (got == len)
            break;
        pos += got;
        cursor += got;
        len -= got;
    }
    return total;
#endif
}
// Writes `count` bytes from `buffer` to `handle` with a single _write call.
// Returns `count` on success. Throws an errno-based exception otherwise: errno when
// _write returned -1, DISK_FULL_EXCEPTION_CODE when the write was short.
extern jlib_decl size32_t checked_write( int handle, const void *buffer, size32_t count )
{
    int written = _write(handle, buffer, count);
    if ((size32_t)written == count)
        return (size32_t)written;
    throw MakeErrnoException((written==-1)?errno:DISK_FULL_EXCEPTION_CODE, "checked_write");
}
class CReadSeq : public CInterface, public IReadSeq
{
int fh;
size32_t size;
char *buffer;
char *ptr;
size32_t bufSize;
size32_t bytesInBuffer;
offset_t startpos;
offset_t endpos;
offset_t nextbufpos;
bool compressed;
void *prev;
size32_t maxcompsize;
bool first;
// Number of buffered bytes between the read cursor and the end of valid data.
inline unsigned remaining()
{
    const char *validEnd = buffer + bytesInBuffer;
    return (unsigned)(validEnd - ptr);
}
// Copies up to `_size` bytes into `dst`, refilling the buffer as it drains.
// Returns the number of bytes copied; a short count means refill() produced no data.
size32_t getBytes(void *dst, size32_t _size)
{
    char *out = (char *)dst;
    size32_t copied = 0;
    size32_t avail = remaining();
    while (_size > avail)
    {
        if (avail != 0)
        {
            // Drain whatever is buffered before asking for more.
            memcpy(out, ptr, avail);
            out += avail;
            _size -= avail;
            copied += avail;
            ptr += avail;
        }
        refill();
        avail = bytesInBuffer;
        if (avail == 0)
            return copied;
    }
    memcpy(out, ptr, _size);
    ptr += _size;
    return copied + _size;
}
void refill()
{
size32_t left = remaining();
memmove(buffer,ptr,left);
size32_t rd=bufSize-left;
if (endpos-nextbufpos<(offset_t)rd)
rd = (size32_t)(endpos-nextbufpos);
if (rd)
rd = checked_pread(fh, buffer+left, rd, nextbufpos);
nextbufpos += rd;
bytesInBuffer = left+rd;
ptr = buffer;
}
public:
IMPLEMENT_IINTERFACE;
// Creates a buffered sequential reader of fixed-size records.
//   _fh         file handle to read from
//   _offset     file offset of the first record
//   maxrecs     maximum number of records to read, or (unsigned)-1 for no limit
//   _size       record size in bytes (must be non-zero)
//   _bufsize    buffer size in bytes, or (unsigned)-1 for DEFAULTBUFFERSIZE
//   _compressed requests compressed reading; only honoured when the record size lies
//               within [MINCOMPRESSEDROWSIZE, MAXCOMPRESSEDROWSIZE]
CReadSeq(int _fh, offset_t _offset, unsigned maxrecs, size32_t _size, size32_t _bufsize, bool _compressed)
{
    assertex(_size);
    fh = _fh;
    size = _size;
    bufSize = (_bufsize==(unsigned) -1)?DEFAULTBUFFERSIZE:_bufsize;
    bytesInBuffer = 0;
    startpos = _offset;
    nextbufpos = _offset;
    // Must be a comparison: `size=MINCOMPRESSEDROWSIZE` overwrote the record size
    // with 16 and made the lower bound always true.
    compressed = ((size>=MINCOMPRESSEDROWSIZE)&&(size<=MAXCOMPRESSEDROWSIZE))?_compressed:false;
    if (compressed) {
        maxcompsize = size+size/3+3; // bigger than needed
        // One extra record after the buffer holds the previous row.
        buffer = (char *) malloc(bufSize+size);
        prev = buffer+bufSize;
    }
    else {
        // Give the compression-only members defined values.
        maxcompsize = 0;
        prev = NULL;
        buffer = (char *) malloc(bufSize);
    }
    ptr = buffer;
    first = true;
    endpos = (maxrecs!=(unsigned)-1)?(_offset+(offset_t)maxrecs*(offset_t)_size):I64C(0x7ffffffffff);
}
// Releases the malloc'd buffer (which also contains the `prev` area when compressed).
~CReadSeq()
{
free(buffer);
}
// Reads one record into `dst`; returns true only if a complete record was obtained.
virtual bool get(void *dst)
{
    if (compressed)
        return getn(dst, 1) == 1;
    return getBytes(dst, size) == size;
}
virtual unsigned getn(void *dst, unsigned n)
{
if (!compressed)
return getBytes(dst, size*n)/size;
byte *d = (byte *)dst;
byte *e = d+(size*n);
byte *p = (byte *)prev;
unsigned ret = 0;
while (d!=e) {
if (first) {
if (getBytes(d, size)!=size)
break;
first = false;
}
else {
if (remaining()reset(); // not done itself
return seq;
}
return new CReadSeq(fh, _offset, maxrecs, size, bufsize, compressed);
}
//================================================================================================
// Buffered sequential writer of fixed-size records to a file handle.
// Records are accumulated in an in-memory buffer and written out with checked_write
// when the buffer fills or flush() is called. When compression is enabled, every
// record after the first is passed through DiffCompress against `prev` before being
// buffered. The destructor does not flush: callers must call flush() themselves.
class CWriteSeq : public CInterface, public IWriteSeq
{
private:
    int fh;                 // file handle written to
    size32_t size;          // record size in bytes
    char *buffer;           // start of the output buffer
    char *ptr;              // next free byte in buffer
    size32_t bufSize;       // usable size of buffer
    offset_t fpos;          // total bytes accepted so far (see getPosition)
    bool compressed;        // true when records are diff-compressed
    size32_t maxcompsize;   // space reserved for one compressed record
    void *prev;             // previous-row area passed to DiffCompress
    void *aux;              // scratch area for a compressed record that may not fit in buffer
    bool first;             // true until the first record has been written

    // Free space left in the buffer.
    inline size32_t remaining()
    {
        return (size32_t)(bufSize - (ptr-buffer));
    }

    // Appends `_size` raw bytes, flushing and/or writing directly as needed.
    void putBytes(const void *src, size32_t _size)
    {
        fpos += _size;
        size32_t left = remaining();
        if (_size>left)
        {
            if (ptr!=buffer) { // don't buffer if entire block
                // Top up the partially filled buffer and write it out.
                memcpy(ptr, src, left);
                ptr += left;
                src = (char *)src + left;
                _size -= left;
                flush();
            }
            while (_size>=bufSize) // write out directly
            {
                checked_write(fh, src, bufSize); // stick to writing bufSize blocks
                src = (char *)src + bufSize;
                _size -= bufSize;
            }
        }
        // Whatever is left now fits in the buffer.
        memcpy(ptr, src, _size);
        ptr += _size;
    }

public:
    IMPLEMENT_IINTERFACE;

    // _fh         file handle to write to (asserted non-zero)
    // _size       record size in bytes (asserted non-zero)
    // _bufsize    buffer size in bytes, or (unsigned)-1 for DEFAULTBUFFERSIZE
    // _compressed requests compression; only honoured when the record size lies
    //             within [MINCOMPRESSEDROWSIZE, MAXCOMPRESSEDROWSIZE]
    CWriteSeq(int _fh, size32_t _size, size32_t _bufsize, bool _compressed)
    {
        assertex(_fh);
        assertex(_size);
        fh = _fh;
        size = _size;
        fpos = 0;
        if (_bufsize == (unsigned) -1)
            _bufsize = DEFAULTBUFFERSIZE;
        bufSize = _bufsize;
        // Must be a comparison: `size=MINCOMPRESSEDROWSIZE` overwrote the record size
        // with 16 and made the lower bound always true.
        compressed = ((size>=MINCOMPRESSEDROWSIZE)&&(size<=MAXCOMPRESSEDROWSIZE))?_compressed:false;
        if (compressed) {
            maxcompsize = size+size/3+3; // bigger than needed
            // Layout: [buffer: bufSize][prev: size][aux: maxcompsize]
            buffer = (char *) malloc(bufSize+size+maxcompsize);
            prev = buffer+bufSize;
            aux = (char *)prev+size;
        }
        else {
            // Give the compression-only members defined values.
            maxcompsize = 0;
            prev = NULL;
            aux = NULL;
            buffer = (char *) malloc(bufSize);
        }
        ptr = buffer;
        first = true;
    }

    ~CWriteSeq()
    {
        free(buffer);
    }

    // Writes one record of `size` bytes from `src`.
    void put(const void *src)
    {
        if (compressed) {
            if (first) {
                // The first record is stored raw and kept in `prev`.
                first = false;
                memcpy(prev,src,size);
            }
            else if (remaining()>=maxcompsize) {
                // Enough room: compress straight into the buffer.
                size32_t sz = DiffCompress(src,ptr,prev,size);
                fpos += sz;
                ptr += sz;
                return;
            }
            else {
                // Might not fit: compress into scratch space, then buffer the result.
                putBytes(aux, DiffCompress(src,aux,prev,size));
                return;
            }
        }
        putBytes(src, size);
    }

    // Writes `numRecs` consecutive records starting at `src`.
    void putn(const void *src, unsigned numRecs)
    {
        if (compressed) {
            while (numRecs) {
                put(src);
                src = (byte *)src+size;
                numRecs--;
            }
        }
        else
            putBytes(src, size*numRecs);
    }

    // Writes any buffered bytes to the file handle.
    void flush()
    {
        if (ptr != buffer)
        {
            checked_write(fh, buffer, (size32_t)(ptr-buffer));
            ptr = buffer;
        }
    }

    // Total number of bytes accepted so far, buffered or written.
    offset_t getPosition()
    {
        return fpos;
    }

    virtual size32_t getRecordSize()
    {
        return size;
    }
};
// Factory for a sequential record writer on `fh`.
// A zero `bufsize` selects the unbuffered implementation (ignoring `compressed`);
// any other value selects the buffered CWriteSeq.
IWriteSeq *createWriteSeq(int fh, size32_t size, size32_t bufsize, bool compressed)
{
    // Async TBD
    if (bufsize != 0)
        return new CWriteSeq(fh, size, bufsize, compressed);
    return new CUnbufferedReadWriteSeq(fh, 0, size);
}
// Factory for a writer that forwards every operation to both f1 and f2.
IWriteSeq *createTeeWriteSeq(IWriteSeq *f1, IWriteSeq *f2)
{
return new CTeeWriteSeq(f1, f2);
}
//===========================================================================================
// Records the handle, record size and starting offset. No I/O or seek is performed here.
CUnbufferedReadWriteSeq::CUnbufferedReadWriteSeq(int _fh, offset_t _offset, size32_t _size)
{
    fh = _fh;
    offset = _offset;
    fpos = _offset;
    size = _size;
}
// Writes one record straight to the file handle and advances the tracked position.
void CUnbufferedReadWriteSeq::put(const void *src)
{
checked_write(fh, src, size);
fpos += size;
}
// Writes n consecutive records in a single checked_write call and advances the tracked position.
void CUnbufferedReadWriteSeq::putn(const void *src, unsigned n)
{
checked_write(fh, src, size*n);
fpos += size*n;
}
// Nothing is buffered by this class, so there is nothing to flush.
void CUnbufferedReadWriteSeq::flush()
{}
// Returns the tracked position: the starting offset plus the bytes written via put/putn.
offset_t CUnbufferedReadWriteSeq::getPosition()
{
return fpos;
}
bool CUnbufferedReadWriteSeq::get(void *dst)
{
size32_t toread = size;
while (toread)
{
int read = checked_read(fh, dst, toread);
if (!read)
return false;
toread -= read;
dst = (char *) dst + read;
}
return true;
}
unsigned CUnbufferedReadWriteSeq::getn(void *dst, unsigned n)
{
size32_t toread = size*n;
size32_t totread = 0;
while (toread)
{
int read = checked_read(fh, dst, toread);
if (!read)
break;
toread -= read;
totread += read;
dst = (char *) dst + read;
}
return totread/size;
}
// Seeks the file handle back to the starting offset and resets the tracked position.
void CUnbufferedReadWriteSeq::reset()
{
checked_lseeki64(fh, offset, SEEK_SET);
fpos = offset;
}
//===========================================================================================
//===========================================================================================
// Takes a link on both writers and asserts that they agree on record size.
CTeeWriteSeq::CTeeWriteSeq(IWriteSeq *_f1, IWriteSeq *_f2)
{
    w1 = _f1;
    w2 = _f2;
    w1->Link();
    w2->Link();
    assertex(w1->getRecordSize()==w2->getRecordSize());
}
// Drops the links taken on both writers in the constructor.
CTeeWriteSeq::~CTeeWriteSeq()
{
w1->Release();
w2->Release();
}
// Writes one record to the first writer, then to the second.
void CTeeWriteSeq::put(const void *src)
{
w1->put(src);
w2->put(src);
}
// Writes n records to the first writer, then to the second.
void CTeeWriteSeq::putn(const void *src, unsigned n)
{
w1->putn(src, n);
w2->putn(src, n);
}
// Flushes both underlying writers, first then second.
void CTeeWriteSeq::flush()
{
w1->flush();
w2->flush();
}
// Reports the first writer's record size (the constructor asserts both writers agree).
size32_t CTeeWriteSeq::getRecordSize()
{
return w1->getRecordSize();
}
// Reports the first writer's position only; the second writer is not consulted.
offset_t CTeeWriteSeq::getPosition()
{
return w1->getPosition();
}
//==================================================================================================
// IRecordSize implementation that reports the same size for every record.
class CFixedRecordSize: public CInterface, public IRecordSize
{
protected:
    size32_t recsize;   // the constant record size in bytes
public:
    IMPLEMENT_IINTERFACE;

    CFixedRecordSize(size32_t _recsize) : recsize(_recsize)
    {
    }

    // The record contents are ignored; the size is always the fixed size.
    virtual size32_t getRecordSize(const void *) { return recsize; }

    size32_t getFixedSize() const { return recsize; }
};
// Factory for an IRecordSize that always reports `recsize`.
IRecordSize *createFixedRecordSize(size32_t recsize)
{
return new CFixedRecordSize(recsize);
}
// IRecordSize wrapper that adjusts another IRecordSize's answers by a constant delta.
class CDeltaRecordSize: public CInterface, public IRecordSize
{
protected:
// NOTE(review): the template argument of Owned appears to have been lost from this
// line - confirm against the original source.
Owned recordSize;
int delta; // signed adjustment added to the wrapped size
public:
CDeltaRecordSize(IRecordSize * _recordSize, int _delta) { recordSize.set(_recordSize); delta = _delta; }
IMPLEMENT_IINTERFACE;
// Size of the given record according to the wrapped object, plus delta.
virtual size32_t getRecordSize(const void * data)
{
return recordSize->getRecordSize(data) + delta;
}
// Wrapped fixed size plus delta, or 0 when the wrapped object reports a fixed size of 0.
size32_t getFixedSize() const
{
return recordSize->getFixedSize()?recordSize->getFixedSize()+delta:0;
}
};
// Returns an IRecordSize reporting `size`'s answers adjusted by `delta`.
// A zero delta needs no wrapper, so the original object is linked and returned.
extern jlib_decl IRecordSize *createDeltaRecordSize(IRecordSize * size, int delta)
{
    if (delta != 0)
        return new CDeltaRecordSize(size, delta);
    return LINK(size);
}
//==================================================================================================
// Elevator scanning
// Capacity of ElevatorScanner's array of queued fetch requests.
#define MAX_PENDING 20000
class ElevatorScanner;
// One queued fetch request held by ElevatorScanner until the next scan.
class PendingFetch : public CInterface, public IInterface
{
public:
IMPLEMENT_IINTERFACE;
// qsort-style comparator ordering requests by ascending pos.
static int compare(const void *a, const void *b);
offset_t pos; // file offset to read the record from
IReceiver *receiver; // told via takeRecord(pos) once the record has been read
void *target; // destination buffer for the record
IRecordFetchChannel *channel; // channel that issued the request
};
// IRecordFetchChannel implementation that forwards fetch and flush requests to an ElevatorScanner.
class ElevatorChannel : public CInterface, implements IRecordFetchChannel
{
private:
bool cancelled; // set by abort(); checked by the scanner before queuing or reading
bool immediate; // when true the scanner reads synchronously instead of queuing
ElevatorScanner &scanner; // linked in the constructor, released in the destructor
public:
IMPLEMENT_IINTERFACE;
ElevatorChannel(ElevatorScanner &, bool);
~ElevatorChannel();
//Interface IRecordFetchChannel
virtual void fetch(offset_t pos, void *buffer, IReceiver *receiver);
virtual void flush();
virtual void abort() { cancelled = true; }
virtual bool isAborted() { return cancelled; }
virtual bool isImmediate() { return immediate; }
};
// Collects record fetch requests and services them in ascending file-offset order
// (see scan()). It is also a Thread; the thread is started from the constructor.
class ElevatorScanner : public Thread, public IRecordFetcher
{
private:
Monitor scanlist; // guards the pending list; notified when a request is queued
Monitor isRoom; // notified by scan() after the pending list has been emptied
PendingFetch pending[MAX_PENDING]; // queued requests, sorted in place by scan()
unsigned nextSlot; // number of entries currently queued in pending
size32_t recordSize; // bytes read per fetch
int file; // file handle read from
offset_t reads; // statistics: records read
unsigned scans; // statistics: scans performed
bool stopped;
unsigned duetime; // msTick() value computed by resetTimer()
void scan();
void doFetch(PendingFetch &);
void stop();
// Pushes the due time to `timelimit` ticks after the current msTick().
void resetTimer()
{
duetime = msTick()+timelimit;
}
public:
IMPLEMENT_IINTERFACE;
virtual void beforeDispose();
ElevatorScanner(int file, size32_t recordSize);
~ElevatorScanner();
//Interface IRecordFetcher
virtual IRecordFetchChannel *openChannel(bool immediate) { return new ElevatorChannel(*this, immediate); }
//Interface Thread
virtual int run();
void flush(IRecordFetchChannel *);
void fetch(offset_t, void *, IReceiver *, IRecordFetchChannel *);
};
int PendingFetch::compare(const void *a, const void *b)
{
offset_t aa = ((PendingFetch *) a)->pos;
offset_t bb = ((PendingFetch *) b)->pos;
if (aa > bb)
return 1;
else if (aa == bb)
return 0;
else
return -1;
}
// Binds the channel to its scanner, taking a link on it for the channel's lifetime.
ElevatorChannel::ElevatorChannel(ElevatorScanner &_scanner, bool _immediate) : scanner(_scanner)
{
    cancelled = false;
    immediate = _immediate;
    scanner.Link();
}
// Flushes outstanding requests, then drops the link on the scanner taken in the constructor.
ElevatorChannel::~ElevatorChannel()
{
flush();
scanner.Release();
}
// Forwards the request to the scanner, identifying this channel as the requester.
void ElevatorChannel::fetch(offset_t fpos, void *buffer, IReceiver *receiver)
{
scanner.fetch(fpos, buffer, receiver, this);
}
// Asks the scanner to service queued requests now.
void ElevatorChannel::flush()
{
scanner.flush(this);
}
// Initialises the counters and state, then starts the scanner thread.
ElevatorScanner::ElevatorScanner(int _file, size32_t _recordSize) : Thread("ElevatorScanner")
{
    file = _file;
    recordSize = _recordSize;
    stopped = false;
    nextSlot = 0;
    scans = 0;
    reads = 0;
    start(); // must come last: the thread may use the members set above
}
// Logs the read and scan counters accumulated over the scanner's lifetime.
ElevatorScanner::~ElevatorScanner()
{
PrintLog("Elevator scanner statistics: %"I64F"d reads (%"I64F"d bytes), %d scans", reads, reads*recordSize, scans);
}
// Stops the scanner and waits for its thread to finish.
// NOTE(review): presumably invoked by the CInterface machinery just before destruction - confirm.
void ElevatorScanner::beforeDispose()
{
stop();
join();
}
// Services or queues a fetch of one record at fpos into buffer on behalf of channel.
// Immediate channels are read synchronously, and the channel is aborted if the
// receiver rejects the record. Other channels have the request appended to the
// pending list, unless the channel has already been aborted.
void ElevatorScanner::fetch(offset_t fpos, void *buffer, IReceiver *receiver, IRecordFetchChannel *channel)
{
// Held for the whole call.
synchronized procedure(scanlist);
if (channel->isImmediate())
{
// MORE - atomic seek/read would be preferable!
checked_lseeki64(file, fpos, SEEK_SET);
// The return value is ignored, so a short read is not detected here.
checked_read(file, buffer, recordSize);
reads++;
if (!receiver->takeRecord(fpos))
channel->abort();
return;
}
{
// Wait until scan() has emptied the pending list.
// NOTE(review): scanlist is still held while waiting on isRoom; confirm that the
// code which drains the list cannot be blocked on scanlist at this point.
synchronized block(isRoom);
while (nextSlot >= MAX_PENDING)
isRoom.wait();
}
if (!channel->isAborted())
{
pending[nextSlot].pos = fpos;
pending[nextSlot].receiver = receiver;
pending[nextSlot].target = buffer;
pending[nextSlot].channel = channel;
nextSlot++;
resetTimer();
scanlist.notify();
}
}
// Performs one queued fetch: seeks, reads a record into the request's target and
// hands it to the receiver. Requests from aborted channels are skipped, and the
// channel is aborted if the receiver rejects the record.
void ElevatorScanner::doFetch(PendingFetch &next)
{
    if (next.channel->isAborted())
        return;
    // MORE - atomic seek/read would be preferable!
    checked_lseeki64(file, next.pos, SEEK_SET);
    checked_read(file, next.target, recordSize);
    reads++;
    bool accepted = next.receiver->takeRecord(next.pos);
    if (!accepted)
        next.channel->abort();
}
void ElevatorScanner::scan()
{
PrintLog("Starting elevator scan of %d items", nextSlot);
scans++;
qsort(pending, nextSlot, sizeof(pending[0]), PendingFetch::compare);
for (unsigned i = 0; i < nextSlot; i++)
{
doFetch(pending[i]);
}
nextSlot = 0;
{
synchronized block(isRoom);
isRoom.notify();
}
PrintLog("Finished elevator scan");
}
// Services every queued request now, regardless of which channel asked.
void ElevatorScanner::flush(IRecordFetchChannel *)
{
    // MORE - I could just flush what was asked for, but I may as well flush the lot.
    synchronized procedure(scanlist);
    if (nextSlot != 0)
    {
        scan();
    }
}
int ElevatorScanner::run()
{
scanlist.lock();
for (;;)
{
while (nextSlotLink();
num = 0;
recsize = 0;
pos = 0;
}
// Releases the current stream via the global ::Release helper, then the allocator.
virtual ~CChainedWriteSeq()
{
::Release(stream);
allocator->Release();
}
// Flushes the current stream, if there is one.
void flush() { if (stream) stream->flush(); }
// Writes a single record; equivalent to putn(dst,1).
void put(const void *dst) { putn(dst,1); }
void putn(const void *dst, unsigned numrecs)
{
if (numrecs==0) return;
if (stream==NULL) return; // could raise exception instead
byte *out=(byte *)dst;
while (numrecs>num) {
stream->putn(out,num);
pos+=num;
numrecs-=num;
stream->flush();
IWriteSeq *oldstream=stream;
stream = allocator->next(num);
oldstream->Release();
if (!stream) {
return; // could raise exception
}
}
stream->putn(out,numrecs);
pos+=numrecs;
}
// Returns the record size, read from the current stream the first time it is needed
// and cached in recsize afterwards.
virtual size32_t getRecordSize()
{
    if (recsize == 0)
    {
        if (stream)
            recsize = stream->getRecordSize();
    }
    return recsize;
}
// Returns pos, which putn advances by the number of records written (not bytes).
virtual offset_t getPosition() { return pos; }
};
class CChainedReadSeq : public CInterface, public IReadSeq
{
protected:
IReadSeq *stream;
IReadSeqAllocator *allocator;
unsigned num;
size32_t recsize;
public:
IMPLEMENT_IINTERFACE;
CChainedReadSeq(IReadSeqAllocator *_allocator)
{
allocator = _allocator;
allocator->Link();
stream = allocator->next();
num = 0;
recsize = 0;
}
virtual ~CChainedReadSeq()
{
::Release(stream);
allocator->Release();
}
virtual bool get(void *dst) { return (getn(dst,1)==1); }
virtual unsigned getn(void *dst, unsigned n)
{
unsigned done=0;
while (stream&&n) {
unsigned r = stream->getn(dst,n);
if (r==0) {
IReadSeq *oldstream=stream;
stream = allocator->next();
oldstream->Release();
}
else {
n-=r;
done+=r;
}
}
return done;
}
virtual size32_t getRecordSize() { return stream->getRecordSize(); }
virtual void reset() { stream->reset(); }
virtual void stop() { stream->stop(); }
};
// Copies every record from `from` to `to` through a temporary buffer of about
// `bufsize` bytes (at least one record). Both must report the same record size.
// Returns the number of records copied.
unsigned copySeq(IReadSeq *from,IWriteSeq *to,size32_t bufsize)
{
    const size32_t recsize = from->getRecordSize();
    assertex(recsize==to->getRecordSize());
    unsigned perBatch = bufsize/recsize;
    if (perBatch == 0)
        perBatch = 1;
    MemoryAttr ma;
    byte *batch = (byte *)ma.allocate(perBatch*recsize);
    unsigned copied = 0;
    for (unsigned got = from->getn(batch,perBatch); got != 0; got = from->getn(batch,perBatch))
    {
        to->putn(batch,got);
        copied += got;
    }
    return copied;
}
/////////////////
// Starts in reading mode with an empty buffer of the given size.
CBufferedIOStreamBase::CBufferedIOStreamBase(size32_t _bufferSize)
{
    bufferSize = _bufferSize;
    reading = true;
    curBufferOffset = 0;
    numInBuffer = 0;
    // size where we don't bother copying into the buffer
    minDirectSize = std::min(bufferSize/4,(size32_t)0x2000);
}
// Reads up to `len` bytes into `data` and returns the number obtained.
// Buffered bytes are used first. A remainder of at least minDirectSize is read with
// one direct read; a smaller remainder is served by refilling the buffer until
// satisfied or fillBuffer() reports no more data.
size32_t CBufferedIOStreamBase::doread(size32_t len, void * data)
{
    if (!reading)
    {
        // Switching from write mode: push out pending output first.
        doflush();
        reading = true;
    }
    size32_t total = readFromBuffer(len, data);
    len -= total;
    if (len == 0)
        return total;
    data = (char *)data + total;
    if (len >= minDirectSize)
        return total + directRead(len, data); // if direct read specified don't loop
    while (fillBuffer())
    {
        size32_t chunk = readFromBuffer(len, data);
        total += chunk;
        len -= chunk;
        data = (char *)data + chunk;
        if (len == 0)
            break;
    }
    return total;
}
// Writes `len` bytes from `data`, returning the number of bytes accepted.
// Data goes through the buffer in bufferSize chunks, flushing whenever the buffer
// fills; once the buffer is empty, a remainder of at least minDirectSize is handed
// to directWrite instead.
size32_t CBufferedIOStreamBase::dowrite(size32_t len, const void * data)
{
if (reading)
{
// Switching from read mode: discard whatever read data is still buffered.
curBufferOffset = 0;
numInBuffer = 0;
reading = false;
}
size32_t ret = len;
while (len) { // tries to write in buffer size chunks, also flushes as soon as possible
size32_t wr;
if (numInBuffer != 0) {
// Top up the partially filled buffer first so output stays in order.
wr = std::min(len,bufferSize-curBufferOffset);
writeToBuffer(wr, data);
if (numInBuffer==bufferSize)
doflush();
len -= wr;
if (len==0)
break;
data = (char *)data + wr;
}
// ret-len is what has been consumed so far; add whatever the direct write reports.
if (len >= minDirectSize)
return directWrite(len, data)+ret-len;
wr = std::min(len,bufferSize);
writeToBuffer(wr, data);
if (numInBuffer==bufferSize)
doflush();
len -= wr;
data = (char *)data + wr;
}
return ret; // there is a bit of an assumption here that flush always works
}
////////////////////////////
// Buffered wrapper around another IIOStream, built on CBufferedIOStreamBase.
class CBufferedIIOStream : public CBufferedIOStreamBase, implements IIOStream
{
// NOTE(review): the template argument of Linked appears to have been lost from this
// line - confirm against the original source.
Linked io;
public:
IMPLEMENT_IINTERFACE;
CBufferedIIOStream(IIOStream *_io, unsigned bufSize) : CBufferedIOStreamBase(bufSize), io(_io)
{
buffer = new byte[bufSize];
}
// Flushes pending output and frees the buffer. An IException from the flush is
// rethrown after the buffer has been freed.
// NOTE(review): this throws out of a destructor - confirm that is intended.
~CBufferedIIOStream()
{
try { flush(); }
catch (IException *)
{
delete [] buffer;
throw;
}
delete [] buffer;
}
// Refills the buffer from the wrapped stream; returns false when nothing was read.
virtual bool fillBuffer()
{
reading = true;
numInBuffer = io->read(bufferSize, buffer);
curBufferOffset = 0;
return numInBuffer!=0;
}
// Reads straight from the wrapped stream, bypassing the buffer.
virtual size32_t directRead(size32_t len, void * data)
{
return io->read(len, data);
}
// Writes straight to the wrapped stream, bypassing the buffer.
virtual size32_t directWrite(size32_t len, const void * data)
{
return io->write(len,data);
}
// Writes buffered output, if in write mode, to the wrapped stream.
virtual void doflush()
{
if (!reading && numInBuffer)
{
//Copy numInBuffer out before flush so that destructor doesn't attempt to flush again.
size32_t numToWrite = numInBuffer;
numInBuffer = 0;
io->write(numToWrite, buffer);
curBufferOffset = 0;
}
}
// IIOStream impl.
virtual size32_t read(size32_t len, void * data) { return CBufferedIOStreamBase::doread(len, data); }
virtual size32_t write(size32_t len, const void * data) { return CBufferedIOStreamBase::dowrite(len, data); }
virtual void flush() { doflush(); }
};
// Wraps `io` in a buffering stream. A bufSize of (unsigned)-1 selects DEFAULT_BUFFER_SIZE.
IIOStream *createBufferedIOStream(IIOStream *io, unsigned bufSize)
{
    const unsigned effectiveSize = (bufSize == (unsigned)-1) ? DEFAULT_BUFFER_SIZE : bufSize;
    return new CBufferedIIOStream(io, effectiveSize);
}
// Returns a row stream that never yields a row: nextRow() always returns NULL and
// stop() does nothing.
IRowStream *createNullRowStream()
{
    class cNullStream: public CInterface, implements IRowStream
    {
    public:
        IMPLEMENT_IINTERFACE;
        cNullStream() {}
    private:
        const void *nextRow()
        {
            return NULL;
        }
        void stop()
        {
        }
    };
    return new cNullStream;
}
// Copies rows from `in` to `out` until nextRow() returns NULL.
// Returns the number of rows copied.
unsigned copyRowStream(IRowStream *in, IRowWriter *out)
{
    unsigned copied = 0;
    for (const void *row = in->nextRow(); row; row = in->nextRow())
    {
        copied++;
        out->putRow(row);
    }
    return copied;
}
// Copies rows from `in` to `out`. A single NULL from `in` is forwarded as a NULL row
// once the following row is known to exist; two consecutive NULLs end the copy and
// are not forwarded. Returns the number of non-NULL rows copied.
unsigned groupedCopyRowStream(IRowStream *in, IRowWriter *out)
{
    unsigned copied = 0;
    for (;;)
    {
        const void *row = in->nextRow();
        if (row == NULL)
        {
            row = in->nextRow();
            if (row == NULL)
                return copied;
            out->putRow(NULL);
        }
        copied++;
        out->putRow(row);
    }
}
// Copies rows obtained with ungroupedNextRow() from `in` to `out` until it returns
// NULL. Returns the number of rows copied.
unsigned ungroupedCopyRowStream(IRowStream *in, IRowWriter *out)
{
    unsigned copied = 0;
    for (const void *row = in->ungroupedNextRow(); row; row = in->ungroupedNextRow())
    {
        copied++;
        out->putRow(row);
    }
    return copied;
}
// Row stream that yields the rows of several input streams one after another.
// NOTE(review): parts of this class (the template argument of IArrayOf, the end of
// the constructor, the start of nextRow and the body of stop) appear to have been
// lost from this copy of the file; the comments below describe only what is visible.
class CConcatRowStream : public CInterface, public IRowStream
{
IArrayOf oinstreams;
public:
unsigned num; // number of input streams
unsigned idx; // index of the input stream currently being read
IRowStream **in;
bool grouped; // selects nextRow() (true) or ungroupedNextRow() (false) on the inputs
bool needeog; // set after a row is returned; one NULL is then returned before moving on
public:
IMPLEMENT_IINTERFACE;
CConcatRowStream(unsigned _num, IRowStream **_in,bool _grouped)
{
// in streams assumed valid throughout (i.e. not linked)
num = _num;
idx = 0;
assertex(num);
oinstreams.ensure(num);
for (unsigned n = 0;nnextRow();
if (row) {
needeog = true;
return row;
}
if (needeog) {
needeog = false;
return NULL;
}
}
else {
row = in[idx]->ungroupedNextRow();
if (row)
return row;
}
// Current input produced nothing more: advance to the next one.
idx++;
}
return NULL;
}
virtual void stop()
{
while (idxstop();
}
};
// Factory for a writer that spreads its output over the streams supplied by `iwsa`.
extern jlib_decl IWriteSeq *createChainedWriteSeq(IWriteSeqAllocator *iwsa)
{
return new CChainedWriteSeq(iwsa);
}
// Factory for a reader that reads through the streams supplied by `irsa` in turn.
extern jlib_decl IReadSeq *createChainedReadSeq(IReadSeqAllocator *irsa)
{
return new CChainedReadSeq(irsa);
}
// Factory for a row stream that concatenates `numstreams` input streams.
IRowStream *createConcatRowStream(unsigned numstreams,IRowStream** streams,bool grouped)
{
return new CConcatRowStream(numstreams,streams,grouped);
}
#ifdef __x86_64__
// Writes the characters of the NUL-terminated string `s` (without the terminator) to `out`.
void writeStringToStream(IIOStream &out, const char *s)
{
    size32_t len = (size32_t)strlen(s);
    out.write(len, s);
}
// Writes the character `c` to `out` `cnt` times, one single-byte write per repetition.
void writeCharsNToStream(IIOStream &out, char c, unsigned cnt)
{
    for (; cnt != 0; cnt--)
        out.write(1, &c);
}
// Writes the single character `c` to `out`.
void writeCharToStream(IIOStream &out, char c) { out.write(1, &c); }
#endif