|
@@ -656,14 +656,15 @@ class CWriteHandler : public CSimpleInterface, implements IFileIO
|
|
|
bool *aborted;
|
|
|
CActivityBase &activity;
|
|
|
IPartDescriptor &partDesc;
|
|
|
- bool remote, direct, renameToPrimary;
|
|
|
+ bool remote;
|
|
|
CFIPScope fipScope;
|
|
|
+ unsigned twFlags;
|
|
|
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE_USING(CSimpleInterface);
|
|
|
|
|
|
- CWriteHandler(CActivityBase &_activity, IPartDescriptor &_partDesc, IFile *_primary, IFileIO *_primaryio, ICopyFileProgress *_iProgress, bool _direct, bool _renameToPrimary, bool *_aborted)
|
|
|
- : activity(_activity), partDesc(_partDesc), primary(_primary), primaryio(_primaryio), iProgress(_iProgress), direct(_direct), renameToPrimary(_renameToPrimary), aborted(_aborted), fipScope(primary->queryFilename())
|
|
|
+ CWriteHandler(CActivityBase &_activity, IPartDescriptor &_partDesc, IFile *_primary, IFileIO *_primaryio, ICopyFileProgress *_iProgress, unsigned _twFlags, bool *_aborted)
|
|
|
+ : activity(_activity), partDesc(_partDesc), primary(_primary), primaryio(_primaryio), iProgress(_iProgress), twFlags(_twFlags), aborted(_aborted), fipScope(primary->queryFilename())
|
|
|
{
|
|
|
RemoteFilename rfn;
|
|
|
partDesc.getFilename(0, rfn);
|
|
@@ -680,11 +681,11 @@ public:
|
|
|
primary->remove(); // i.e. never completed, so remove partial (temp) primary
|
|
|
return;
|
|
|
}
|
|
|
- if (renameToPrimary)
|
|
|
+ if (twFlags & TW_RenameToPrimary)
|
|
|
{
|
|
|
OwnedIFile tmpIFile;
|
|
|
CFIPScope fipScope;
|
|
|
- if (remote)
|
|
|
+ if (remote && !(twFlags & TW_External))
|
|
|
{
|
|
|
StringBuffer tmpName(primaryName.str());
|
|
|
tmpName.append(".tmp");
|
|
@@ -749,7 +750,7 @@ public:
|
|
|
virtual void close() { primaryio->close(); }
|
|
|
};
|
|
|
|
|
|
-IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc, unsigned recordSize, bool &compress, bool extend, ICompressor *ecomp, ICopyFileProgress *iProgress, bool direct, bool renameToPrimary, bool *aborted, StringBuffer *_outLocationName)
|
|
|
+IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc, unsigned recordSize, unsigned twFlags, bool &compress, ICompressor *ecomp, ICopyFileProgress *iProgress, bool *aborted, StringBuffer *_outLocationName)
|
|
|
{
|
|
|
StringBuffer outLocationNameI;
|
|
|
StringBuffer &outLocationName = _outLocationName?*_outLocationName:outLocationNameI;
|
|
@@ -757,8 +758,8 @@ IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc,
|
|
|
RemoteFilename rfn;
|
|
|
partDesc.getFilename(0, rfn);
|
|
|
StringBuffer primaryName;
|
|
|
- rfn.getPath(primaryName);
|
|
|
- if (direct)
|
|
|
+ rfn.getPath(primaryName);
|
|
|
+ if (twFlags & TW_Direct)
|
|
|
{
|
|
|
if (0 == outLocationName.length())
|
|
|
outLocationName.append(primaryName.str());
|
|
@@ -767,7 +768,7 @@ IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc,
|
|
|
{
|
|
|
// use temp name
|
|
|
GetTempName(outLocationName, "partial");
|
|
|
- if (rfn.isLocal())
|
|
|
+ if (rfn.isLocal() || (twFlags & TW_External))
|
|
|
{ // ensure local tmp in same directory as target
|
|
|
StringBuffer dir;
|
|
|
splitDirTail(primaryName, dir);
|
|
@@ -784,21 +785,21 @@ IFileIO *createMultipleWrite(CActivityBase *activity, IPartDescriptor &partDesc,
|
|
|
{
|
|
|
if (activity->getOptBool(THOROPT_COMP_FORCELZW, false))
|
|
|
recordSize = 0; // by default if fixed length (recordSize set), row diff compression is used. This forces LZW
|
|
|
- fileio.setown(createCompressedFileWriter(file, recordSize, extend, true, ecomp));
|
|
|
+ fileio.setown(createCompressedFileWriter(file, recordSize, 0 != (twFlags & TW_Extend), true, ecomp));
|
|
|
if (!fileio)
|
|
|
{
|
|
|
compress = false;
|
|
|
Owned<IThorException> e = MakeActivityWarning(activity, TE_LargeBufferWarning, "Could not write file '%s' compressed", outLocationName.str());
|
|
|
activity->fireException(e);
|
|
|
- fileio.setown(file->open(extend&&file->exists()?IFOwrite:IFOcreate));
|
|
|
+ fileio.setown(file->open((twFlags & TW_Extend)&&file->exists()?IFOwrite:IFOcreate));
|
|
|
}
|
|
|
}
|
|
|
else
|
|
|
- fileio.setown(file->open(extend&&file->exists()?IFOwrite:IFOcreate));
|
|
|
+ fileio.setown(file->open((twFlags & TW_Extend)&&file->exists()?IFOwrite:IFOcreate));
|
|
|
if (!fileio)
|
|
|
throw MakeActivityException(activity, TE_FileCreationFailed, "Failed to create file for write (%s) error = %d", outLocationName.str(), GetLastError());
|
|
|
ActPrintLog(activity, "Writing to file: %s, compress=%s, rdiff=%s", file->queryFilename(), compress ? "true" : "false", (compress && recordSize) ? "true" : "false");
|
|
|
- return new CWriteHandler(*activity, partDesc, file, fileio, iProgress, direct, renameToPrimary, aborted);
|
|
|
+ return new CWriteHandler(*activity, partDesc, file, fileio, iProgress, twFlags, aborted);
|
|
|
}
|
|
|
|
|
|
StringBuffer &locateFilePartPath(CActivityBase *activity, const char *logicalFilename, IPartDescriptor &partDesc, StringBuffer &filePath)
|