123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857 |
- /*##############################################################################
- Copyright (C) 2011 HPCC Systems.
- All rights reserved. This program is free software: you can redistribute it and/or modify
- it under the terms of the GNU Affero General Public License as
- published by the Free Software Foundation, either version 3 of the
- License, or (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU Affero General Public License for more details.
- You should have received a copy of the GNU Affero General Public License
- along with this program. If not, see <http://www.gnu.org/licenses/>.
- ############################################################################## */
- #include "jio.hpp"
- #include "jsort.hpp"
- #include "jfile.hpp"
- #include "jlzw.hpp"
- #include "jset.hpp"
- #include "commonext.hpp"
- #include "dadfs.hpp"
- #include "thactivityutil.ipp"
- #include "backup.hpp"
- #include "slave.ipp"
- #include "thbuf.hpp"
- #include "thbufdef.hpp"
- #include "thcrc.hpp"
- #include "thexception.hpp"
- #include "thmfilemanager.hpp"
- #include "thormisc.hpp"
- #include "thorport.hpp"
- //#define TRACE_STARTSTOP_EXCEPTIONS
- #ifdef _DEBUG
- //#define _FULL_TRACE
- #endif
- #define MAX_ROW_ARRAY_SIZE (0x100000*64) // 64MB
- #define TRANSFER_TIMEOUT (60*60*1000)
- #define JOIN_TIMEOUT (10*60*1000)
- #ifdef _MSC_VER
- #pragma warning(push)
- #pragma warning( disable : 4355 )
- #endif
- class ThorLookaheadCache: public IThorDataLink, public CSimpleInterface
- {
- rowcount_t count;
- Linked<IThorDataLink> in;
- Owned<ISmartRowBuffer> smartbuf;
- size32_t bufsize;
- CActivityBase &activity;
- bool allowspill, preserveLhsGrouping;
- ISmartBufferNotify *notify;
- bool running;
- bool stopped;
- rowcount_t required;
- Semaphore startsem;
- bool started;
- Owned<IException> startexception;
- Owned<IException> getexception;
- bool asyncstart;
- class Cthread: public Thread
- {
- ThorLookaheadCache &parent;
- public:
- Cthread(ThorLookaheadCache &_parent)
- : Thread("ThorLookaheadCache"), parent(_parent)
- {
- }
- int run()
- {
- return parent.run();
- }
- } thread;
- public:
-
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- int run()
- {
- if (!started) {
- try {
- in->start();
- started = true;
- }
- catch(IException * e)
- {
- ActPrintLog(&activity, e, "ThorLookaheadCache starting input");
- startexception.setown(e);
- if (asyncstart)
- notify->onInputStarted(startexception);
- running = false;
- stopped = true;
- startsem.signal();
- return 0;
- }
- }
- try {
- StringBuffer temp;
- if (allowspill)
- GetTempName(temp,"lookahd",true);
- assertex(bufsize);
- if (allowspill)
- smartbuf.setown(createSmartBuffer(&activity, temp.toCharArray(), bufsize, queryRowInterfaces(in)));
- else
- smartbuf.setown(createSmartInMemoryBuffer(&activity, queryRowInterfaces(in), bufsize));
- if (notify)
- notify->onInputStarted(NULL);
- startsem.signal();
- Linked<IRowWriter> writer = smartbuf->queryWriter();
- if (preserveLhsGrouping)
- {
- while (required&&running)
- {
- OwnedConstThorRow row = in->nextRow();
- if (!row)
- {
- row.setown(in->nextRow());
- if (!row)
- break;
- else
- writer->putRow(NULL); // eog
- }
- ++count;
- writer->putRow(row.getClear());
- if (required!=RCUNBOUND)
- required--;
- }
- }
- else
- {
- while (required&&running)
- {
- OwnedConstThorRow row = in->ungroupedNextRow();
- if (!row)
- break;
- ++count;
- writer->putRow(row.getClear());
- if (required!=RCUNBOUND)
- required--;
- }
- }
- }
- catch(IException * e)
- {
- ActPrintLog(&activity, e, "ThorLookaheadCache get exception");
- getexception.setown(e);
- }
- if (notify)
- notify->onInputFinished(count);
- if (smartbuf)
- smartbuf->queryWriter()->flush();
- running = false;
- try {
- if (in)
- in->stop();
- }
- catch(IException * e)
- {
- ActPrintLog(&activity, e, "ThorLookaheadCache stop exception");
- if (!getexception.get())
- getexception.setown(e);
- }
- return 0;
- }
-
- ThorLookaheadCache(CActivityBase &_activity, IThorDataLink *_in,size32_t _bufsize,bool _allowspill,bool _preserveLhsGrouping, rowcount_t _required,ISmartBufferNotify *_notify, bool _instarted, IDiskUsage *_iDiskUsage)
- : thread(*this), activity(_activity), in(_in)
- {
- #ifdef _FULL_TRACE
- ActPrintLog(&activity, "ThorLookaheadCache create %x",(unsigned)(memsize_t)this);
- #endif
- asyncstart = false;
- allowspill = _allowspill;
- preserveLhsGrouping = _preserveLhsGrouping;
- assertex((unsigned)-1 != _bufsize); // no longer supported
- bufsize = _bufsize?_bufsize:(0x40000*3); // use .75 MB buffer if bufsize omitted
- notify = _notify;
- running = true;
- required = _required;
- count = 0;
- stopped = true;
- started = _instarted;
- }
- ~ThorLookaheadCache()
- {
- if (!thread.join(1000*60))
- ActPrintLogEx(&activity.queryContainer(), thorlog_all, MCuserWarning, "ThorLookaheadCache join timedout");
- }
- void start()
- {
- #ifdef _FULL_TRACE
- ActPrintLog(&activity, "ThorLookaheadCache start %x",(unsigned)(memsize_t)this);
- #endif
- stopped = false;
- asyncstart = notify&¬ify->startAsync();
- thread.start();
- if (!asyncstart) {
- startsem.wait();
- if (startexception)
- throw startexception.getClear();
- }
- }
- void stop()
- {
- #ifdef _FULL_TRACE
- ActPrintLog(&activity, "ThorLookaheadCache stop %x",(unsigned)(memsize_t)this);
- #endif
- if (!stopped) {
- running = false;
- if (smartbuf)
- smartbuf->stop(); // just in case blocked
- thread.join();
- stopped = true;
- if (getexception)
- throw getexception.getClear();
- }
- }
- const void *nextRow()
- {
- OwnedConstThorRow row = smartbuf->nextRow();
- if (getexception)
- throw getexception.getClear();
- if (!row) {
- #ifdef _FULL_TRACE
- ActPrintLog(&activity, "ThorLookaheadCache eos %x",(unsigned)(memsize_t)this);
- #endif
- }
- return row.getClear();
- }
-
-
- bool isGrouped() { return false; }
-
- void getMetaInfo(ThorDataLinkMetaInfo &info)
- {
- memset(&info,0,sizeof(info));
- in->getMetaInfo(info);
- // more TBD
- }
- CActivityBase *queryFromActivity()
- {
- return in->queryFromActivity();
- }
- void dataLinkSerialize(MemoryBuffer &mb)
- {
- // no serialization information (yet)
- }
- unsigned __int64 queryTotalCycles() const { return in->queryTotalCycles(); }
- };
- #ifdef _MSC_VER
- #pragma warning(pop)
- #endif
- IThorDataLink *createDataLinkSmartBuffer(CActivityBase *activity, IThorDataLink *in, size32_t bufsize, bool allowspill, bool preserveLhsGrouping, rowcount_t maxcount, ISmartBufferNotify *notify, bool instarted, IDiskUsage *iDiskUsage)
- {
- return new ThorLookaheadCache(*activity, in,bufsize,allowspill,preserveLhsGrouping,maxcount,notify,instarted,iDiskUsage);
- }
- void CThorDataLink::initMetaInfo(ThorDataLinkMetaInfo &info)
- {
- memset(&info,0,sizeof(info));
- //info.rowsdone = xx;
- info.totalRowsMin = 0;
- info.totalRowsMax = -1; // rely on inputs to set
- info.spilled = (offset_t)-1;
- info.byteTotal = (offset_t)-1;
- info.rowsOutput = getDataLinkCount();
- // more
- }
- void CThorDataLink::calcMetaInfoSize(ThorDataLinkMetaInfo &info,IThorDataLink *link)
- {
- if (!info.unknownRowsOutput&&link&&((info.totalRowsMin<=0)||(info.totalRowsMax<0))) {
- ThorDataLinkMetaInfo prev;
- link->getMetaInfo(prev);
- if (info.totalRowsMin<=0) {
- if (!info.canReduceNumRows)
- info.totalRowsMin = prev.totalRowsMin;
- else
- info.totalRowsMin = 0;
- }
- if (info.totalRowsMax<0) {
- if (!info.canIncreaseNumRows) {
- info.totalRowsMax = prev.totalRowsMax;
- if (info.totalRowsMin>info.totalRowsMax)
- info.totalRowsMax = -1;
- }
- }
- if (((offset_t)-1 != prev.byteTotal) && info.totalRowsMin == info.totalRowsMax)
- info.byteTotal = prev.byteTotal;
- }
- else if (info.totalRowsMin<0)
- info.totalRowsMin = 0; // a good bet
- }
- void CThorDataLink::calcMetaInfoSize(ThorDataLinkMetaInfo &info,IThorDataLink **link,unsigned ninputs)
- {
- if (!link||(ninputs<=1)) {
- calcMetaInfoSize(info,link&&(ninputs==1)?link[0]:NULL);
- return ;
- }
- if (!info.unknownRowsOutput) {
- __int64 min=0;
- __int64 max=0;
- for (unsigned i=0;i<ninputs;i++ ) {
- if (link[i]) {
- ThorDataLinkMetaInfo prev;
- link[i]->getMetaInfo(prev);
- if (min>=0) {
- if (prev.totalRowsMin>=0)
- min += prev.totalRowsMin;
- else
- min = -1;
- }
- if (max>=0) {
- if (prev.totalRowsMax>=0)
- max += prev.totalRowsMax;
- else
- max = -1;
- }
- }
- }
- if (info.totalRowsMin<=0) {
- if (!info.canReduceNumRows)
- info.totalRowsMin = min;
- else
- info.totalRowsMin = 0;
- }
- if (info.totalRowsMax<0) {
- if (!info.canIncreaseNumRows) {
- info.totalRowsMax = max;
- if (info.totalRowsMin>info.totalRowsMax)
- info.totalRowsMax = -1;
- }
- }
- }
- else if (info.totalRowsMin<0)
- info.totalRowsMin = 0; // a good bet
- }
- void CThorDataLink::calcMetaInfoSize(ThorDataLinkMetaInfo &info, ThorDataLinkMetaInfo *infos,unsigned num)
- {
- if (!infos||(num<=1)) {
- if (1 == num)
- info = infos[0];
- return;
- }
- if (!info.unknownRowsOutput) {
- __int64 min=0;
- __int64 max=0;
- for (unsigned i=0;i<num;i++ ) {
- ThorDataLinkMetaInfo &prev = infos[i];
- if (min>=0) {
- if (prev.totalRowsMin>=0)
- min += prev.totalRowsMin;
- else
- min = -1;
- }
- if (max>=0) {
- if (prev.totalRowsMax>=0)
- max += prev.totalRowsMax;
- else
- max = -1;
- }
- }
- if (info.totalRowsMin<=0) {
- if (!info.canReduceNumRows)
- info.totalRowsMin = min;
- else
- info.totalRowsMin = 0;
- }
- if (info.totalRowsMax<0) {
- if (!info.canIncreaseNumRows) {
- info.totalRowsMax = max;
- if (info.totalRowsMin>info.totalRowsMax)
- info.totalRowsMax = -1;
- }
- }
- }
- else if (info.totalRowsMin<0)
- info.totalRowsMin = 0; // a good bet
- }
- static bool canStall(CActivityBase *act)
- {
- if (!act)
- return false;
- unsigned i=0;
- IThorDataLink *inp;
- while ((inp=((CSlaveActivity *)act)->queryInput(i++))!=NULL) {
- ThorDataLinkMetaInfo info;
- inp->getMetaInfo(info);
- if (info.canStall)
- return true;
- if (!info.isSource&&!info.buffersInput&&!info.canBufferInput)
- if (canStall((CSlaveActivity *)inp->queryFromActivity()))
- return true;
- }
- return false;
- }
- bool isSmartBufferSpillNeeded(CActivityBase *act)
- {
- // two part - forward and reverse checking
- // first reverse looking for stalling activities
- if (!canStall((CSlaveActivity *)act))
- return false;
- // now check
- return true;
- }
- bool checkSavedFileCRC(IFile * ifile, bool & timesDiffer, unsigned & storedCrc)
- {
- StringBuffer s(ifile->queryFilename());
- s.append(".crc");
- Owned<IFile> crcFile = createIFile(s.str());
- size32_t crcSz = (size32_t)crcFile->size();
- Owned<IFileIO> crcIO = crcFile->open(IFOread);
- bool performCrc = false;
- timesDiffer = false;
- if (crcIO)
- {
- Owned<IFileIOStream> crcStream = createIOStream(crcIO);
- if (sizeof(storedCrc) == crcSz) // backward compat. if = in size to just crc (no date stamps)
- {
- verifyex(crcSz == crcStream->read(crcSz, &storedCrc));
- performCrc = true;
- }
- else
- {
- size32_t sz;
- verifyex(sizeof(sz) == crcStream->read(sizeof(sz), &sz));
- void *mem = malloc(sz);
- MemoryBuffer mb;
- mb.setBuffer(sz, mem, true);
- verifyex(sz == crcStream->read(sz, mem));
- CDateTime storedCreateTime(mb);
- CDateTime storedModifiedTime(mb);
- CDateTime createTime, modifiedTime, accessedTime;
- ifile->getTime(&createTime, &modifiedTime, &accessedTime);
- if (!storedCreateTime.equals(createTime) || !storedModifiedTime.equals(modifiedTime))
- timesDiffer = true;
- else
- {
- mb.read(storedCrc);
- performCrc = true;
- }
- }
- }
- return performCrc;
- }
- static void _doReplicate(CActivityBase *activity, IPartDescriptor &partDesc, ICopyFileProgress *iProgress)
- {
- StringBuffer primaryName;
- getPartFilename(partDesc, 0, primaryName);;
- RemoteFilename rfn;
- IFileDescriptor &fileDesc = partDesc.queryOwner();
- unsigned copies = partDesc.numCopies();
- unsigned c=1;
- for (; c<copies; c++)
- {
- unsigned replicateCopy;
- unsigned clusterNum = partDesc.copyClusterNum(c, &replicateCopy);
- rfn.clear();
- partDesc.getFilename(c, rfn);
- StringBuffer dstName;
- rfn.getPath(dstName);
- assertex(dstName.length());
- if (replicateCopy>0 )
- {
- try
- {
- queryThor().queryBackup().backup(dstName.str(), primaryName.str());
- }
- catch (IException *e)
- {
- Owned<IThorException> re = MakeActivityWarning(activity, e, "Failed to create replicate file '%s'", dstName.str());
- e->Release();
- activity->fireException(re);
- }
- }
- else // another primary
- {
- ActPrintLog(activity, "Copying to primary %s", dstName.str());
- StringBuffer tmpName(dstName.str());
- tmpName.append(".tmp");
- OwnedIFile tmpIFile = createIFile(tmpName.str());
- OwnedIFile srcFile = createIFile(primaryName.str());
- CFIPScope fipScope(tmpName.str());
- try
- {
- try
- {
- ensureDirectoryForFile(dstName.str());
- ::copyFile(tmpIFile, srcFile, 0x100000, iProgress);
- }
- catch (IException *e)
- {
- IThorException *re = MakeActivityException(activity, e, "Failed to copy to tmp file '%s' from source file '%s'", tmpIFile->queryFilename(), srcFile->queryFilename());
- e->Release();
- throw re;
- }
- try
- {
- OwnedIFile dstIFile = createIFile(dstName.str());
- dstIFile->remove();
- tmpIFile->rename(pathTail(dstName.str()));
- }
- catch (IException *e)
- {
- IThorException *re = ThorWrapException(e, "Failed to rename '%s' to '%s'", tmpName.str(), dstName.str());
- e->Release();
- throw re;
- }
- }
- catch (IException *)
- {
- try { tmpIFile->remove(); }
- catch (IException *e) { ActPrintLog(&activity->queryContainer(), e, NULL); e->Release(); }
- throw;
- }
- }
- }
- }
- void cancelReplicates(CActivityBase *activity, IPartDescriptor &partDesc)
- {
- RemoteFilename rfn;
- IFileDescriptor &fileDesc = partDesc.queryOwner();
- unsigned copies = partDesc.numCopies();
- unsigned c=1;
- for (; c<copies; c++)
- {
- unsigned replicateCopy;
- unsigned clusterNum = partDesc.copyClusterNum(c, &replicateCopy);
- rfn.clear();
- partDesc.getFilename(c, rfn);
- StringBuffer dstName;
- rfn.getPath(dstName);
- assertex(dstName.length());
- if (replicateCopy>0)
- {
- try
- {
- queryThor().queryBackup().cancel(dstName.str());
- }
- catch (IException *e)
- {
- Owned<IThorException> re = MakeActivityException(activity, e, "Error cancelling backup '%s'", dstName.str());
- ActPrintLog(&activity->queryContainer(), e, NULL);
- e->Release();
- }
- }
- }
- }
- void doReplicate(CActivityBase *activity, IPartDescriptor &partDesc, ICopyFileProgress *iProgress)
- {
- try
- {
- _doReplicate(activity, partDesc, iProgress);
- }
- catch (IException *e)
- {
- Owned<IThorException> e2 = MakeActivityWarning(activity, e, "doReplicate");
- e->Release();
- activity->fireException(e2);
- }
- }
- class CWriteHandler : public CSimpleInterface, implements IFileIO
- {
- Linked<IFileIO> primaryio;
- Linked<IFile> primary;
- StringBuffer primaryName;
- ICopyFileProgress *iProgress;
- bool *aborted;
- CActivityBase &activity;
- IPartDescriptor &partDesc;
- bool remote, direct, renameToPrimary;
- CFIPScope fipScope;
- public:
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- CWriteHandler(CActivityBase &_activity, IPartDescriptor &_partDesc, IFile *_primary, IFileIO *_primaryio, ICopyFileProgress *_iProgress, bool _direct, bool _renameToPrimary, bool *_aborted)
- : activity(_activity), partDesc(_partDesc), primary(_primary), primaryio(_primaryio), iProgress(_iProgress), direct(_direct), renameToPrimary(_renameToPrimary), aborted(_aborted), fipScope(primary->queryFilename())
- {
- RemoteFilename rfn;
- partDesc.getFilename(0, rfn);
- remote = !rfn.isLocal();
- rfn.getPath(primaryName);
- if (globals->getPropBool("@replicateAsync", true))
- cancelReplicates(&activity, partDesc);
- }
- ~CWriteHandler()
- {
- primaryio.clear(); // should close
- if (aborted && *aborted) return;
- if (renameToPrimary)
- {
- OwnedIFile tmpIFile;
- CFIPScope fipScope;
- if (remote)
- {
- StringBuffer tmpName(primaryName.str());
- tmpName.append(".tmp");
- tmpIFile.setown(createIFile(tmpName.str()));
- fipScope.set(tmpName.str());
- try
- {
- try
- {
- ensureDirectoryForFile(primaryName.str());
- ::copyFile(tmpIFile, primary, 0x100000, iProgress);
- }
- catch (IException *e)
- {
- IThorException *re = ThorWrapException(e, "Failed to copy local temp file '%s' to remote temp location '%s'", primary->queryFilename(), tmpIFile->queryFilename());
- e->Release();
- throw re;
- }
- }
- catch (IException *)
- {
- try { tmpIFile->remove(); }
- catch (IException *e) { ActPrintLog(&activity.queryContainer(), e, NULL); e->Release(); }
- }
- }
- else
- tmpIFile.setown(createIFile(primary->queryFilename()));
- try
- {
- try
- {
- OwnedIFile dstIFile = createIFile(primaryName.str());
- dstIFile->remove();
- tmpIFile->rename(pathTail(primaryName.str()));
- }
- catch (IException *e)
- {
- IThorException *re = ThorWrapException(e, "Failed to rename '%s' to '%s'", tmpIFile->queryFilename(), primaryName.str());
- e->Release();
- throw re;
- }
- }
- catch (IException *)
- {
- try { primary->remove(); }
- catch (IException *e) { ActPrintLog(&activity.queryContainer(), e, NULL); e->Release(); }
- throw;
- }
- primary->remove();
- fipScope.clear();
- }
- if (partDesc.numCopies()>1)
- _doReplicate(&activity, partDesc, iProgress);
- }
- // IFileIO impl.
- virtual size32_t read(offset_t pos, size32_t len, void * data) { return primaryio->read(pos, len, data); }
- virtual offset_t size() { return primaryio->size(); }
- virtual size32_t write(offset_t pos, size32_t len, const void * data) { return primaryio->write(pos, len, data); }
- virtual offset_t appendFile(IFile *file,offset_t pos=0,offset_t len=-1) { return primaryio->appendFile(file, pos, len); }
- virtual void setSize(offset_t size) { primaryio->setSize(size); }
- virtual void flush() { primaryio->flush(); }
- virtual void close() { primaryio->close(); }
- };
- IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc, unsigned recordSize, bool &compress, bool extend, ICompressor *ecomp, ICopyFileProgress *iProgress, bool direct, bool renameToPrimary, bool *aborted, StringBuffer *_outLocationName)
- {
- StringBuffer outLocationNameI;
- StringBuffer &outLocationName = _outLocationName?*_outLocationName:outLocationNameI;
- RemoteFilename rfn;
- partDesc.getFilename(0, rfn);
- StringBuffer primaryName;
- rfn.getPath(primaryName);
- if (direct)
- {
- if (0 == outLocationName.length())
- outLocationName.append(primaryName.str());
- }
- else
- {
- // use temp name
- GetTempName(outLocationName, "partial");
- if (rfn.isLocal())
- { // ensure local tmp in same directory as target
- StringBuffer dir;
- splitDirTail(primaryName, dir);
- addPathSepChar(dir);
- dir.append(pathTail(outLocationName));
- outLocationName.swapWith(dir);
- }
- assertex(outLocationName.length());
- ensureDirectoryForFile(outLocationName.str());
- }
- OwnedIFile file = createIFile(outLocationName.str());
- Owned<IFileIO> fileio;
- if (compress)
- {
- fileio.setown(createCompressedFileWriter(file, recordSize, extend, true, ecomp));
- if (!fileio)
- {
- compress = false;
- Owned<IThorException> e = MakeActivityWarning(activity, TE_LargeBufferWarning, "Could not write file '%s' compressed", outLocationName.str());
- activity->fireException(e);
- fileio.setown(file->open(extend&&file->exists()?IFOwrite:IFOcreate));
- }
- }
- else
- fileio.setown(file->open(extend&&file->exists()?IFOwrite:IFOcreate));
- if (!fileio)
- throw MakeActivityException(activity, TE_FileCreationFailed, "Failed to create file for write (%s) error = %d", outLocationName.str(), GetLastError());
- ActPrintLog(activity, "Writing to file: %s", file->queryFilename());
- return new CWriteHandler(*activity, partDesc, file, fileio, iProgress, direct, renameToPrimary, aborted);
- }
- StringBuffer &locateFilePartPath(CActivityBase *activity, const char *logicalFilename, IPartDescriptor &partDesc, StringBuffer &filePath)
- {
- unsigned location;
- OwnedIFile ifile;
- if (globals->getPropBool("@autoCopyBackup", true)?ensurePrimary(activity, partDesc, ifile, location, filePath):getBestFilePart(activity, partDesc, ifile, location, filePath, activity))
- ActPrintLog(activity, "reading physical file '%s' (logical file = %s)", filePath.str(), logicalFilename);
- else
- {
- StringBuffer locations;
- IException *e = MakeActivityException(activity, TE_FileNotFound, "No physical file part for logical file %s, found at given locations: %s (Error = %d)", logicalFilename, getFilePartLocations(partDesc, locations).str(), GetLastError());
- ActPrintLog(&activity->queryContainer(), e, NULL);
- throw e;
- }
- return filePath;
- }
- IRowStream *createSequentialPartHandler(CPartHandler *partHandler, IArrayOf<IPartDescriptor> &partDescs, bool grouped)
- {
- class CSeqPartHandler : public CSimpleInterface, implements IRowStream
- {
- IArrayOf<IPartDescriptor> &partDescs;
- int part, parts;
- bool eof, grouped, someInGroup;
- Linked<CPartHandler> partHandler;
- IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
- public:
- CSeqPartHandler(CPartHandler *_partHandler, IArrayOf<IPartDescriptor> &_partDescs, bool _grouped)
- : partDescs(_partDescs), partHandler(_partHandler), grouped(_grouped)
- {
- part = 0;
- parts = partDescs.ordinality();
- someInGroup = false;
- if (0==parts)
- {
- eof = true;
- }
- else
- {
- eof = false;
- partHandler->setPart(&partDescs.item(0), 0);
- }
- }
- virtual void stop()
- {
- if (partHandler)
- {
- partHandler->stop();
- partHandler.clear();
- }
- }
- const void *nextRow()
- {
- if (eof)
- {
- return NULL;
- }
- loop
- {
- OwnedConstThorRow row = partHandler->nextRow();
- if (row)
- {
- someInGroup = true;
- return row.getClear();
- }
- if (grouped && someInGroup)
- {
- someInGroup = false;
- return NULL;
- }
- partHandler->stop();
- ++part;
- if (part >= parts)
- {
- partHandler.clear();
- eof = true;
- return NULL;
- }
- partHandler->setPart(&partDescs.item(part), part);
- }
- }
- };
- return new CSeqPartHandler(partHandler, partDescs, grouped);
- }
- // CThorRowAggregator impl.
- AggregateRowBuilder &CThorRowAggregator::addRow(const void * row)
- {
- return RowAggregator::addRow(row);
- }
- void CThorRowAggregator::mergeElement(const void * otherElement)
- {
- RowAggregator::mergeElement(otherElement);
- }
|