|
@@ -82,15 +82,10 @@ protected:
|
|
|
Owned<IFileIO> current;
|
|
|
Owned<IMemoryMappedFile> mmapped;
|
|
|
mutable CriticalSection crit;
|
|
|
- bool memFileRequested;
|
|
|
- StringAttr id;
|
|
|
bool remote;
|
|
|
offset_t fileSize;
|
|
|
CDateTime fileDate;
|
|
|
unsigned crc;
|
|
|
- Owned<ILazyFileIO> patchFile;
|
|
|
- StringBuffer baseIndexFileName;
|
|
|
- RoxieFileType fileType;
|
|
|
unsigned lastAccess;
|
|
|
bool copying;
|
|
|
bool isCompressed;
|
|
@@ -103,9 +98,8 @@ protected:
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
|
|
|
- CLazyFileIO(const char *_id, RoxieFileType _fileType, IFile *_logical, offset_t size, const CDateTime &_date, bool _memFileRequested, unsigned _crc, bool _isCompressed)
|
|
|
- : id(_id),
|
|
|
- fileType(_fileType), logical(_logical), fileSize(size), crc(_crc), isCompressed(_isCompressed)
|
|
|
+ CLazyFileIO(IFile *_logical, offset_t size, const CDateTime &_date, unsigned _crc, bool _isCompressed)
|
|
|
+ : logical(_logical), fileSize(size), crc(_crc), isCompressed(_isCompressed)
|
|
|
{
|
|
|
fileDate.set(_date);
|
|
|
currentIdx = 0;
|
|
@@ -114,7 +108,6 @@ public:
|
|
|
#ifdef FAIL_20_READ
|
|
|
readCount = 0;
|
|
|
#endif
|
|
|
- memFileRequested = _memFileRequested;
|
|
|
lastAccess = msTick();
|
|
|
copying = false;
|
|
|
cached = NULL;
|
|
@@ -239,39 +232,32 @@ public:
|
|
|
if ((openCount % 5) == 0)
|
|
|
throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Pretending to fail on an open");
|
|
|
#endif
|
|
|
-#if 0
|
|
|
- if (memFileRequested && &sources.item(currentIdx)==logical)
|
|
|
- current.setown(createMemoryFile(logical));
|
|
|
+ IFile *f = &sources.item(currentIdx);
|
|
|
+ if (firstTime)
|
|
|
+ cacheFileConnect(f, dafilesrvLookupTimeout); // set timeout to 10 seconds
|
|
|
else
|
|
|
-#endif
|
|
|
{
|
|
|
- IFile *f = &sources.item(currentIdx);
|
|
|
- if (firstTime)
|
|
|
- cacheFileConnect(f, dafilesrvLookupTimeout); // set timeout to 10 seconds
|
|
|
- else
|
|
|
- {
|
|
|
- if (traceLevel > 10)
|
|
|
- DBGLOG("Looking for file using non-cached file open");
|
|
|
- }
|
|
|
+ if (traceLevel > 10)
|
|
|
+ DBGLOG("Looking for file using non-cached file open");
|
|
|
+ }
|
|
|
|
|
|
- fileStatus = queryFileCache().fileUpToDate(f, fileType, fileSize, fileDate, crc, sourceName, isCompressed);
|
|
|
- if (fileStatus == FileIsValid)
|
|
|
+ fileStatus = queryFileCache().fileUpToDate(f, fileSize, fileDate, crc, isCompressed);
|
|
|
+ if (fileStatus == FileIsValid)
|
|
|
+ {
|
|
|
+ if (isCompressed)
|
|
|
+ current.setown(createCompressedFileReader(f));
|
|
|
+ else
|
|
|
+ current.setown(f->open(IFOread));
|
|
|
+ if (current)
|
|
|
{
|
|
|
- if (isCompressed)
|
|
|
- current.setown(createCompressedFileReader(f));
|
|
|
- else
|
|
|
- current.setown(f->open(IFOread));
|
|
|
- if (current)
|
|
|
- {
|
|
|
- if (traceLevel > 5)
|
|
|
- DBGLOG("Opening %s", sourceName);
|
|
|
- disconnectRemoteIoOnExit(current);
|
|
|
- break;
|
|
|
- }
|
|
|
- // throwUnexpected(); - try another location if this one has the wrong version of the file
|
|
|
+ if (traceLevel > 5)
|
|
|
+ DBGLOG("Opening %s", sourceName);
|
|
|
+ disconnectRemoteIoOnExit(current);
|
|
|
+ break;
|
|
|
}
|
|
|
- disconnectRemoteFile(f);
|
|
|
+ // throwUnexpected(); - try another location if this one has the wrong version of the file
|
|
|
}
|
|
|
+ disconnectRemoteFile(f);
|
|
|
}
|
|
|
catch (IException *E)
|
|
|
{
|
|
@@ -368,32 +354,6 @@ public:
|
|
|
return current->size();
|
|
|
}
|
|
|
|
|
|
- virtual void setBaseIndexFileName(const char *val)
|
|
|
- {
|
|
|
- CriticalBlock b(crit);
|
|
|
- baseIndexFileName.append(val);
|
|
|
- }
|
|
|
-
|
|
|
- virtual const char *queryBaseIndexFileName()
|
|
|
- {
|
|
|
- CriticalBlock b(crit);
|
|
|
- if (baseIndexFileName.length())
|
|
|
- return baseIndexFileName.str();
|
|
|
- return NULL;
|
|
|
- }
|
|
|
-
|
|
|
- virtual void setPatchFile(ILazyFileIO *val)
|
|
|
- {
|
|
|
- CriticalBlock b(crit);
|
|
|
- patchFile.setown(LINK(val));
|
|
|
- }
|
|
|
-
|
|
|
- virtual ILazyFileIO *queryPatchFile()
|
|
|
- {
|
|
|
- CriticalBlock b(crit);
|
|
|
- return patchFile;
|
|
|
- }
|
|
|
-
|
|
|
virtual size32_t write(offset_t pos, size32_t len, const void * data) { throwUnexpected(); }
|
|
|
virtual void setSize(offset_t size) { throwUnexpected(); }
|
|
|
virtual offset_t appendFile(IFile *file,offset_t pos,offset_t len) { throwUnexpected(); return 0; }
|
|
@@ -401,7 +361,6 @@ public:
|
|
|
virtual const char *queryFilename() { return logical->queryFilename(); }
|
|
|
virtual bool isAlive() const { return CInterface::isAlive(); }
|
|
|
virtual int getLinkCount() const { return CInterface::getLinkCount(); }
|
|
|
- virtual RoxieFileType getFileType() { return fileType; }
|
|
|
|
|
|
virtual IMemoryMappedFile *queryMappedFile()
|
|
|
{
|
|
@@ -436,7 +395,7 @@ public:
|
|
|
filesTried.appendf(" %s", sourceName);
|
|
|
try
|
|
|
{
|
|
|
- if (queryFileCache().fileUpToDate(&sources.item(currentIdx), fileType, fileSize, fileDate, crc, sourceName, isCompressed) == FileIsValid)
|
|
|
+ if (queryFileCache().fileUpToDate(&sources.item(currentIdx), fileSize, fileDate, crc, isCompressed) == FileIsValid)
|
|
|
{
|
|
|
StringBuffer source_drive;
|
|
|
splitFilename(sourceName, &source_drive, NULL, NULL, NULL);
|
|
@@ -511,8 +470,59 @@ public:
|
|
|
|
|
|
//----------------------------------------------------------------------------------------------
|
|
|
|
|
|
+static IPartDescriptor *queryMatchingRemotePart(IPartDescriptor *pdesc, IFileDescriptor *remoteFDesc, unsigned int partNum)
|
|
|
+{
|
|
|
+ if (!remoteFDesc)
|
|
|
+ return NULL;
|
|
|
+ IPartDescriptor *remotePDesc = remoteFDesc->queryPart(partNum);
|
|
|
+ if (!remotePDesc)
|
|
|
+ return NULL;
|
|
|
+ unsigned int crc, remoteCrc;
|
|
|
+ if (!pdesc || !pdesc->getCrc(crc)) //local crc not available, never DFS copied?
|
|
|
+ return remotePDesc;
|
|
|
+ if (remotePDesc->getCrc(remoteCrc) && remoteCrc==crc)
|
|
|
+ return remotePDesc;
|
|
|
+ return NULL;
|
|
|
+}
|
|
|
+
|
|
|
+static bool isCopyFromCluster(IPartDescriptor *pdesc, unsigned clusterNo, const char *process)
|
|
|
+{
|
|
|
+ StringBuffer s;
|
|
|
+ return strieq(process, pdesc->queryOwner().getClusterGroupName(clusterNo, s));
|
|
|
+}
|
|
|
+
|
|
|
+static bool checkClusterCount(UnsignedArray &counts, unsigned clusterNo, unsigned max)
|
|
|
+{
|
|
|
+ while (!counts.isItem(clusterNo))
|
|
|
+ counts.append(0);
|
|
|
+ unsigned count = counts.item(clusterNo);
|
|
|
+ if (count>=max)
|
|
|
+ return false;
|
|
|
+ counts.replace(++count, clusterNo);
|
|
|
+ return true;
|
|
|
+}
|
|
|
+
|
|
|
+static void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations, bool checkSelf)
|
|
|
+{
|
|
|
+ UnsignedArray clusterCounts;
|
|
|
+ unsigned numCopies = pdesc->numCopies();
|
|
|
+ for (unsigned copy = 0; copy < numCopies; copy++)
|
|
|
+ {
|
|
|
+ unsigned clusterNo = pdesc->copyClusterNum(copy);
|
|
|
+ if (!checkClusterCount(clusterCounts, clusterNo, 2))
|
|
|
+ continue;
|
|
|
+ if (checkSelf && isCopyFromCluster(pdesc, clusterNo, roxieName.str())) //don't add ourself
|
|
|
+ continue;
|
|
|
+ RemoteFilename r;
|
|
|
+ pdesc->getFilename(copy,r);
|
|
|
+ StringBuffer path;
|
|
|
+ locations.append(r.getRemotePath(path).str());
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+//----------------------------------------------------------------------------------------------
|
|
|
+
|
|
|
typedef StringArray *StringArrayPtr;
|
|
|
-typedef MapStringTo<StringArrayPtr> MapStringToDiffFileUsage;
|
|
|
|
|
|
class CRoxieFileCache : public CInterface, implements ICopyFileProgress, implements IRoxieFileCache
|
|
|
{
|
|
@@ -532,8 +542,7 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
|
|
|
Semaphore bctStarted;
|
|
|
Semaphore hctStarted;
|
|
|
|
|
|
-
|
|
|
- RoxieFileStatus fileUpToDate(IFile *f, RoxieFileType fileType, offset_t size, const CDateTime &modified, unsigned crc, const char* id, bool isCompressed)
|
|
|
+ RoxieFileStatus fileUpToDate(IFile *f, offset_t size, const CDateTime &modified, unsigned crc, bool isCompressed)
|
|
|
{
|
|
|
if (f->exists())
|
|
|
{
|
|
@@ -543,8 +552,8 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
|
|
|
|
|
|
if (crc > 0)
|
|
|
{
|
|
|
- // if a crc is specified lets check it
|
|
|
- unsigned file_crc = getFileCRC(id);
|
|
|
+ // if a crc is specified let's check it
|
|
|
+ unsigned file_crc = f->getCRC();
|
|
|
if (file_crc && crc != file_crc) // for remote files crc_file can fail, even if the file is valid
|
|
|
{
|
|
|
DBGLOG("FAILED CRC Check");
|
|
@@ -558,52 +567,42 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
|
|
|
return FileNotFound;
|
|
|
}
|
|
|
|
|
|
- ILazyFileIO *openFile(const char *id, unsigned partNo, RoxieFileType fileType, const char *localLocation, const StringArray &peerRoxieCopiedLocationInfo, const StringArray &remoteLocationInfo, offset_t size, const CDateTime &modified, bool memFile, unsigned crc, bool isCompressed)
|
|
|
+ ILazyFileIO *openFile(const char *lfn, unsigned partNo, const char *localLocation,
|
|
|
+ IPartDescriptor *pdesc,
|
|
|
+ const StringArray &remoteLocationInfo,
|
|
|
+ offset_t size, const CDateTime &modified, unsigned crc)
|
|
|
{
|
|
|
Owned<IFile> local = createIFile(localLocation);
|
|
|
-
|
|
|
- Owned<CLazyFileIO> ret = new CLazyFileIO(id, fileType, local.getLink(), size, modified, memFile, crc, isCompressed);
|
|
|
- RoxieFileStatus fileStatus = fileUpToDate(local, fileType, size, modified, crc, localLocation, isCompressed);
|
|
|
+ bool isCompressed = pdesc->queryOwner().isCompressed();
|
|
|
+ Owned<CLazyFileIO> ret = new CLazyFileIO(local.getLink(), size, modified, crc, isCompressed);
|
|
|
+ RoxieFileStatus fileStatus = fileUpToDate(local, size, modified, crc, isCompressed);
|
|
|
if (fileStatus == FileIsValid)
|
|
|
{
|
|
|
ret->addSource(local.getLink());
|
|
|
ret->setRemote(false);
|
|
|
}
|
|
|
- else if (copyResources || useRemoteResources)
|
|
|
+ else if (local->exists()) // Implies local dali and local file out of sync
|
|
|
+ throw MakeStringException(ROXIE_FILE_ERROR, "File does not match DFS information");
|
|
|
+ else
|
|
|
{
|
|
|
- if (local->exists())
|
|
|
- {
|
|
|
- StringBuffer errStatus;
|
|
|
- switch (fileStatus)
|
|
|
- {
|
|
|
- case FileSizeMismatch:
|
|
|
- errStatus.append("FileSizeMismatch");
|
|
|
- break;
|
|
|
-
|
|
|
- case FileCRCMismatch:
|
|
|
- errStatus.append("FileCRCMismatch");
|
|
|
- break;
|
|
|
-
|
|
|
- case FileDateMismatch:
|
|
|
- errStatus.append("FileDateMismatch");
|
|
|
- break;
|
|
|
- }
|
|
|
- DBGLOG("Removing local file - %s because %s", localLocation, errStatus.str());
|
|
|
- local->remove();
|
|
|
- }
|
|
|
bool addedOne = false;
|
|
|
|
|
|
// put the peerRoxieLocations next in the list
|
|
|
- ForEachItemIn(roxie_idx, peerRoxieCopiedLocationInfo)
|
|
|
+ StringArray localLocations;
|
|
|
+ appendRemoteLocations(pdesc, localLocations, true);
|
|
|
+ ForEachItemIn(roxie_idx, localLocations)
|
|
|
{
|
|
|
try
|
|
|
{
|
|
|
- const char *remoteName = peerRoxieCopiedLocationInfo.item(roxie_idx);
|
|
|
- if (miscDebugTraceLevel > 10)
|
|
|
- DBGLOG("adding peer roxie location %s", remoteName);
|
|
|
-
|
|
|
- ret->addSource(createIFile(remoteName));
|
|
|
- addedOne = true;
|
|
|
+ const char *remoteName = localLocations.item(roxie_idx);
|
|
|
+ Owned<IFile> remote = createIFile(remoteName);
|
|
|
+ if (fileUpToDate(remote, size, modified, crc, isCompressed)==FileIsValid)
|
|
|
+ {
|
|
|
+ if (miscDebugTraceLevel > 10)
|
|
|
+ DBGLOG("adding peer roxie location %s", remoteName);
|
|
|
+ ret->addSource(remote.getClear());
|
|
|
+ addedOne = true;
|
|
|
+ }
|
|
|
}
|
|
|
catch (IException *E)
|
|
|
{
|
|
@@ -612,110 +611,39 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- ForEachItemIn(idx, remoteLocationInfo)
|
|
|
+ if (!addedOne && (copyResources || useRemoteResources)) // If no peer locations available, go to remote
|
|
|
{
|
|
|
- try
|
|
|
- {
|
|
|
- const char *remoteName = remoteLocationInfo.item(idx);
|
|
|
- if (miscDebugTraceLevel > 10)
|
|
|
- DBGLOG("adding remote location %s", remoteName);
|
|
|
-
|
|
|
- ret->addSource(createIFile(remoteName));
|
|
|
- addedOne = true;
|
|
|
- }
|
|
|
- catch (IException *E)
|
|
|
+ ForEachItemIn(idx, remoteLocationInfo)
|
|
|
{
|
|
|
- EXCLOG(MCoperatorError, E, "While creating remote file reference");
|
|
|
- E->Release();
|
|
|
+ try
|
|
|
+ {
|
|
|
+ const char *remoteName = remoteLocationInfo.item(idx);
|
|
|
+ Owned<IFile> remote = createIFile(remoteName);
|
|
|
+ if (fileUpToDate(remote, size, modified, crc, isCompressed)==FileIsValid)
|
|
|
+ {
|
|
|
+ if (miscDebugTraceLevel > 10)
|
|
|
+ DBGLOG("adding remote location %s", remoteName);
|
|
|
+ ret->addSource(remote.getClear());
|
|
|
+ addedOne = true;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ catch (IException *E)
|
|
|
+ {
|
|
|
+ EXCLOG(MCoperatorError, E, "While creating remote file reference");
|
|
|
+ E->Release();
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- if (!addedOne)
|
|
|
- {
|
|
|
- StringBuffer remoteLocs;
|
|
|
-
|
|
|
-// ForEachItemIn(roxie_idx, peerRoxieCopiedLocationInfo)
|
|
|
-// remoteLocs.appendf("%s ", peerRoxieCopiedLocationInfo.item(roxie_idx)); // all remote locations that were checked
|
|
|
|
|
|
- ForEachItemIn(idx2, remoteLocationInfo)
|
|
|
- remoteLocs.appendf("%s ", remoteLocationInfo.item(idx2)); // all remote locations that were checked
|
|
|
- throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Could not open file %s at any remote location - %s", localLocation, remoteLocs.str());
|
|
|
- }
|
|
|
+ if (!addedOne)
|
|
|
+ throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Could not open file %s", localLocation);
|
|
|
ret->setRemote(true);
|
|
|
}
|
|
|
- else
|
|
|
- throw MakeStringException(ROXIE_FILE_OPEN_FAIL, "Could not open file %s", localLocation);
|
|
|
ret->setCache(this);
|
|
|
files.setValue(localLocation, (ILazyFileIO *)ret);
|
|
|
return ret.getClear();
|
|
|
}
|
|
|
|
|
|
- bool doCreateFromPatch(ILazyFileIO *targetFile, const char *baseIndexFilename, ILazyFileIO *patchFile, const char *targetFilename, const char *destPath)
|
|
|
- {
|
|
|
- if (!enableKeyDiff)
|
|
|
- return false; // feature disabled in roxietopology
|
|
|
-
|
|
|
- bool fileCopied = false;
|
|
|
- IFile *patch_sourceFile;
|
|
|
- try
|
|
|
- {
|
|
|
- // MORE - sort out when to disallow closes and of what
|
|
|
- patch_sourceFile = patchFile->querySource();
|
|
|
- }
|
|
|
- catch (IException *E)
|
|
|
- {
|
|
|
- EXCLOG(MCoperatorError, E, "While trying to open patch file");
|
|
|
- throw;
|
|
|
- }
|
|
|
-
|
|
|
- unsigned __int64 freeDiskSpace = getFreeSpace(destPath);
|
|
|
- if ( (targetFile->size() + minFreeDiskSpace) > freeDiskSpace)
|
|
|
- {
|
|
|
- StringBuffer err;
|
|
|
- err.appendf("Insufficient disk space. File %s needs %"I64F"d bytes, but only %"I64F"d remains, and %"I64F"d is needed as a reserve", targetFilename, targetFile->size(), freeDiskSpace, minFreeDiskSpace );
|
|
|
- IException *E = MakeStringException(ROXIE_DISKSPACE_ERROR, "%s", err.str());
|
|
|
- EXCLOG(MCoperatorError, E);
|
|
|
- E->Release();
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- try
|
|
|
- {
|
|
|
- MTimeSection timing(NULL, "createKeyDiff");
|
|
|
- Owned<IKeyDiffApplicator> patchApplicator;
|
|
|
- const char *patchFilename = patch_sourceFile->queryFilename();
|
|
|
-
|
|
|
- DBGLOG("***** Using KeyDiff to create %s", targetFilename);
|
|
|
- patchApplicator.setown(createKeyDiffApplicator(patchFilename, baseIndexFilename, targetFilename, NULL, true, true));
|
|
|
- patchApplicator->run();
|
|
|
- patchApplicator.clear();
|
|
|
-
|
|
|
- // need to update time stamp in roxiestate file
|
|
|
- Owned<IFile> tmp_file = createIFile(targetFilename);
|
|
|
- IFile* remote_sourceFile = targetFile->querySource();
|
|
|
- CDateTime dt1, dt2, dt3;
|
|
|
- remote_sourceFile->getTime(&dt1, &dt2, &dt3);
|
|
|
- tmp_file->setTime(&dt1, &dt2, &dt3);
|
|
|
- }
|
|
|
- catch(IException *E)
|
|
|
- {
|
|
|
- EXCLOG(E, "Create PatchFile exception");
|
|
|
- E->Release();
|
|
|
- return false;
|
|
|
- //throw; - do not treat as fatal
|
|
|
- }
|
|
|
- if (needToDeleteFile)
|
|
|
- {
|
|
|
- DBGLOG("creating of data file %s stopped since query has been deleted", targetFilename);
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- targetFile->copyComplete();
|
|
|
- fileCopied = true;
|
|
|
- }
|
|
|
- }
|
|
|
- return fileCopied;
|
|
|
- }
|
|
|
-
|
|
|
void deleteTempFiles(const char *targetFilename)
|
|
|
{
|
|
|
try
|
|
@@ -887,13 +815,6 @@ class CRoxieFileCache : public CInterface, implements ICopyFileProgress, impleme
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- const char *baseIndexFilename = f->queryBaseIndexFileName();
|
|
|
- ILazyFileIO* patchFile = f->queryPatchFile();
|
|
|
-
|
|
|
- if (baseIndexFilename && patchFile)
|
|
|
- if (doCreateFromPatch(f, baseIndexFilename, patchFile, targetFilename, destPath.str()))
|
|
|
- return true;
|
|
|
-
|
|
|
tempFile.append(".$$$");
|
|
|
const char *msg = background ? "Background copy" : "Copy";
|
|
|
return doCopyFile(f, tempFile.str(), targetFilename, destPath.str(), msg);
|
|
@@ -1120,8 +1041,34 @@ public:
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- virtual ILazyFileIO *lookupFile(const char *id, unsigned partNo, RoxieFileType fileType, const char *localLocation, const char *baseIndexFileName, ILazyFileIO *patchFile, const StringArray &peerRoxieCopiedLocationInfo, const StringArray &deployedLocationInfo, offset_t size, const CDateTime &modified, bool memFile, bool isRemote, bool startFileCopy, bool doForegroundCopy, unsigned crc, bool isCompressed, const char *lookupDali)
|
|
|
+ virtual ILazyFileIO *lookupFile(const char *lfn, RoxieFileType fileType,
|
|
|
+ IPartDescriptor *pdesc, unsigned numParts,
|
|
|
+ const StringArray &deployedLocationInfo, bool startFileCopy)
|
|
|
{
|
|
|
+ IPropertyTree &partProps = pdesc->queryProperties();
|
|
|
+ offset_t dfsSize = partProps.getPropInt64("@size", -1);
|
|
|
+ unsigned crc;
|
|
|
+ if (!pdesc->getCrc(crc))
|
|
|
+ crc = 0;
|
|
|
+ CDateTime dfsDate;
|
|
|
+ if (checkFileDate)
|
|
|
+ {
|
|
|
+ const char *dateStr = partProps.queryProp("@modified");
|
|
|
+ dfsDate.setString(dateStr);
|
|
|
+ }
|
|
|
+
|
|
|
+ unsigned partNo = pdesc->queryPartIndex() + 1;
|
|
|
+ StringBuffer localLocation;
|
|
|
+
|
|
|
+ // MORE - not at all sure about this. Foreign files should stay foreign ?
|
|
|
+ CDfsLogicalFileName dlfn;
|
|
|
+ dlfn.set(lfn);
|
|
|
+ if (dlfn.isForeign())
|
|
|
+ dlfn.clearForeign();
|
|
|
+ const char *logicalname = dlfn.get();
|
|
|
+
|
|
|
+ makePhysicalPartName(logicalname, partNo, numParts, localLocation, false, DFD_OSdefault, baseDataDirectory); // MORE - if we get the dataDirectory we can pass it in and possibly reuse an existing file
|
|
|
+
|
|
|
Owned<ILazyFileIO> ret;
|
|
|
try
|
|
|
{
|
|
@@ -1129,48 +1076,43 @@ public:
|
|
|
Linked<ILazyFileIO> f = files.getValue(localLocation);
|
|
|
if (f && f->isAlive())
|
|
|
{
|
|
|
- if ((size != -1 && size != f->getSize()) ||
|
|
|
- (!modified.isNull() && !modified.equals(*f->queryDateTime(), false)))
|
|
|
+ if ((dfsSize != (offset_t) -1 && dfsSize != f->getSize()) ||
|
|
|
+ (!dfsDate.isNull() && !dfsDate.equals(*f->queryDateTime(), false)))
|
|
|
{
|
|
|
StringBuffer modifiedDt;
|
|
|
- if (!modified.isNull())
|
|
|
- modified.getString(modifiedDt);
|
|
|
+ if (!dfsDate.isNull())
|
|
|
+ dfsDate.getString(modifiedDt);
|
|
|
StringBuffer fileDt;
|
|
|
f->queryDateTime()->getString(fileDt);
|
|
|
- if (fileErrorList.find(id) == 0)
|
|
|
+ if (fileErrorList.find(lfn) == 0)
|
|
|
{
|
|
|
switch (fileType)
|
|
|
{
|
|
|
case ROXIE_KEY:
|
|
|
- fileErrorList.setValue(id, "Key");
|
|
|
+ fileErrorList.setValue(lfn, "Key");
|
|
|
break;
|
|
|
|
|
|
case ROXIE_FILE:
|
|
|
- fileErrorList.setValue(id, "File");
|
|
|
+ fileErrorList.setValue(lfn, "File");
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
|
- throw MakeStringException(ROXIE_MISMATCH, "Different version of %s already loaded: sizes = %"I64F"d %"I64F"d Date = %s %s", id, size, f->getSize(), modifiedDt.str(), fileDt.str());
|
|
|
+ throw MakeStringException(ROXIE_MISMATCH, "Different version of %s already loaded: sizes = %"I64F"d %"I64F"d Date = %s %s", lfn, dfsSize, f->getSize(), modifiedDt.str(), fileDt.str());
|
|
|
}
|
|
|
else
|
|
|
return f.getClear();
|
|
|
}
|
|
|
|
|
|
- ret.setown(openFile(id, partNo, fileType, localLocation, peerRoxieCopiedLocationInfo, deployedLocationInfo, size, modified, memFile, crc, isCompressed)); // for now don't check crcs
|
|
|
-
|
|
|
- if (baseIndexFileName)
|
|
|
- ret->setBaseIndexFileName(baseIndexFileName);
|
|
|
- if (patchFile)
|
|
|
- ret->setPatchFile(patchFile);
|
|
|
+ ret.setown(openFile(lfn, partNo, localLocation, pdesc, deployedLocationInfo, dfsSize, dfsDate, crc));
|
|
|
|
|
|
if (startFileCopy)
|
|
|
{
|
|
|
if (ret->isRemote())
|
|
|
{
|
|
|
- if (copyResources || memFile)
|
|
|
+ if (copyResources) // MORE - should always copy peer files
|
|
|
{
|
|
|
needToDeleteFile = false;
|
|
|
- if (doForegroundCopy)
|
|
|
+ if (numParts==1 || (partNo==numParts && fileType==ROXIE_KEY))
|
|
|
{
|
|
|
ret->checkOpen();
|
|
|
doCopy(ret, false, false);
|
|
@@ -1183,31 +1125,25 @@ public:
|
|
|
|
|
|
}
|
|
|
}
|
|
|
- else if (isRemote)
|
|
|
- {
|
|
|
-// todo.append(*ret); // don't really need to copy. But do need to send message that copy happened (async)
|
|
|
-// atomic_inc(&numFilesToProcess); // must increment counter for SNMP accuracy
|
|
|
-// toCopy.signal();
|
|
|
- }
|
|
|
}
|
|
|
|
|
|
- if (!lazyOpen || fileType == ROXIE_PATCH) // patch file MUST be open at this point - make sure we open it
|
|
|
+ if (!lazyOpen)
|
|
|
ret->checkOpen();
|
|
|
}
|
|
|
catch(IException *e)
|
|
|
{
|
|
|
if (e->errorCode() == ROXIE_FILE_OPEN_FAIL)
|
|
|
{
|
|
|
- if (fileErrorList.find(id) == 0)
|
|
|
+ if (fileErrorList.find(lfn) == 0)
|
|
|
{
|
|
|
switch (fileType)
|
|
|
{
|
|
|
case ROXIE_KEY:
|
|
|
- fileErrorList.setValue(id, "Key");
|
|
|
+ fileErrorList.setValue(lfn, "Key");
|
|
|
break;
|
|
|
|
|
|
case ROXIE_FILE:
|
|
|
- fileErrorList.setValue(id, "File");
|
|
|
+ fileErrorList.setValue(lfn, "File");
|
|
|
break;
|
|
|
}
|
|
|
}
|
|
@@ -1342,91 +1278,13 @@ public:
|
|
|
}
|
|
|
};
|
|
|
|
|
|
-IPartDescriptor *queryMatchingRemotePart(IPartDescriptor *pdesc, IFileDescriptor *remoteFDesc, unsigned int partNum)
|
|
|
-{
|
|
|
- if (!remoteFDesc)
|
|
|
- return NULL;
|
|
|
- IPartDescriptor *remotePDesc = remoteFDesc->queryPart(partNum);
|
|
|
- if (!remotePDesc)
|
|
|
- return NULL;
|
|
|
- unsigned int crc, remoteCrc;
|
|
|
- if (!pdesc || !pdesc->getCrc(crc)) //local crc not available, never DFS copied?
|
|
|
- return remotePDesc;
|
|
|
- if (remotePDesc->getCrc(remoteCrc) && remoteCrc==crc)
|
|
|
- return remotePDesc;
|
|
|
- return NULL;
|
|
|
-}
|
|
|
-
|
|
|
-inline bool isCopyFromCluster(IPartDescriptor *pdesc, unsigned clusterNo, const char *process)
|
|
|
-{
|
|
|
- StringBuffer s;
|
|
|
- return strieq(process, pdesc->queryOwner().getClusterGroupName(clusterNo, s));
|
|
|
-}
|
|
|
-
|
|
|
-inline bool checkClusterCount(UnsignedArray &counts, unsigned clusterNo, unsigned max)
|
|
|
-{
|
|
|
- while (!counts.isItem(clusterNo))
|
|
|
- counts.append(0);
|
|
|
- unsigned count = counts.item(clusterNo);
|
|
|
- if (count>=max)
|
|
|
- return false;
|
|
|
- counts.replace(++count, clusterNo);
|
|
|
- return true;
|
|
|
-}
|
|
|
-
|
|
|
-inline void appendRemoteLocations(IPartDescriptor *pdesc, StringArray &locations, bool checkSelf)
|
|
|
-{
|
|
|
- UnsignedArray clusterCounts;
|
|
|
- unsigned numCopies = pdesc->numCopies();
|
|
|
- for (unsigned copy = 0; copy < numCopies; copy++)
|
|
|
- {
|
|
|
- unsigned clusterNo = pdesc->copyClusterNum(copy);
|
|
|
- if (!checkClusterCount(clusterCounts, clusterNo, 2))
|
|
|
- continue;
|
|
|
- if (checkSelf && isCopyFromCluster(pdesc, clusterNo, roxieName.str())) //don't add ourself
|
|
|
- continue;
|
|
|
- RemoteFilename r;
|
|
|
- pdesc->getFilename(copy,r);
|
|
|
- StringBuffer path;
|
|
|
- locations.append(r.getRemotePath(path).str());
|
|
|
- }
|
|
|
-}
|
|
|
-
|
|
|
ILazyFileIO *createDynamicFile(const char *id, IPartDescriptor *pdesc, IPartDescriptor *remotePDesc, RoxieFileType fileType, int numParts, bool startCopy)
|
|
|
{
|
|
|
- IPropertyTree &partProps = pdesc->queryProperties();
|
|
|
- offset_t dfsSize = partProps.getPropInt64("@size");
|
|
|
- unsigned crc;
|
|
|
- if (!pdesc->getCrc(crc))
|
|
|
- crc = 0;
|
|
|
- CDateTime fileDate;
|
|
|
- if (checkFileDate)
|
|
|
- {
|
|
|
- const char *dateStr = partProps.queryProp("@modified");
|
|
|
- fileDate.setString(dateStr);
|
|
|
- }
|
|
|
-
|
|
|
- StringArray localLocations;
|
|
|
StringArray remoteLocations;
|
|
|
-
|
|
|
- unsigned partNo = pdesc->queryPartIndex() + 1;
|
|
|
- StringBuffer localFileName;
|
|
|
-
|
|
|
- CDfsLogicalFileName dlfn;
|
|
|
- dlfn.set(id);
|
|
|
- if (dlfn.isForeign())
|
|
|
- dlfn.clearForeign();
|
|
|
-
|
|
|
- const char *logicalname = dlfn.get();
|
|
|
-
|
|
|
- makePhysicalPartName(logicalname, partNo, numParts, localFileName, false, DFD_OSdefault, baseDataDirectory); // MORE - if we get the dataDirectory we can pass it in and possibly reuse an existing file
|
|
|
-
|
|
|
- appendRemoteLocations(pdesc, remoteLocations, true);
|
|
|
if (remotePDesc)
|
|
|
appendRemoteLocations(remotePDesc, remoteLocations, false);
|
|
|
|
|
|
- bool foregroundCopy = numParts==1 || (partNo==numParts && fileType==ROXIE_KEY);
|
|
|
- return queryFileCache().lookupFile(id, partNo, fileType, localFileName, NULL, NULL, localLocations, remoteLocations, dfsSize, fileDate, false, true, startCopy, foregroundCopy, crcResources ? crc : 0, pdesc->queryOwner().isCompressed(), NULL);
|
|
|
+ return queryFileCache().lookupFile(id, fileType, pdesc, numParts, remoteLocations, startCopy);
|
|
|
}
|
|
|
|
|
|
//====================================================================================================
|
|
@@ -1779,7 +1637,7 @@ public:
|
|
|
Owned<IFileDescriptor> fDesc = sub.getFileDescriptor();
|
|
|
Owned<IFileDescriptor> remoteFDesc;
|
|
|
if (daliHelper)
|
|
|
- remoteFDesc.setown(daliHelper->checkClonedFromRemote(sub.queryLogicalName(), fDesc, cacheIt, writeAccess));
|
|
|
+ remoteFDesc.setown(daliHelper->checkClonedFromRemote(sub.queryLogicalName(), fDesc, cacheIt));
|
|
|
addFile(sub.queryLogicalName(), fDesc.getClear(), remoteFDesc.getClear());
|
|
|
}
|
|
|
}
|
|
@@ -1788,7 +1646,7 @@ public:
|
|
|
Owned<IFileDescriptor> fDesc = dFile->getFileDescriptor();
|
|
|
Owned<IFileDescriptor> remoteFDesc;
|
|
|
if (daliHelper)
|
|
|
- remoteFDesc.setown(daliHelper->checkClonedFromRemote(_lfn, fDesc, cacheIt, writeAccess));
|
|
|
+ remoteFDesc.setown(daliHelper->checkClonedFromRemote(_lfn, fDesc, cacheIt));
|
|
|
addFile(dFile->queryLogicalName(), fDesc.getClear(), remoteFDesc.getClear());
|
|
|
}
|
|
|
bool tsSet = dFile->getModificationTime(fileTimeStamp);
|
|
@@ -2400,151 +2258,6 @@ extern void releaseSlaveDynamicFileCache()
|
|
|
slaveDynamicFileCache.clear();
|
|
|
}
|
|
|
|
|
|
-class CDiffFileInfoCache : public CInterface, implements IDiffFileInfoCache
|
|
|
-{
|
|
|
- CriticalSection crit;
|
|
|
- MapStringToDiffFileUsage diffFileInfoMap; // store all diff / patch file location info - even if not used
|
|
|
-
|
|
|
-public:
|
|
|
- IMPLEMENT_IINTERFACE;
|
|
|
-
|
|
|
- CDiffFileInfoCache()
|
|
|
- {
|
|
|
- }
|
|
|
-
|
|
|
- ~CDiffFileInfoCache()
|
|
|
- {
|
|
|
- HashIterator info(diffFileInfoMap);
|
|
|
- for(info.first();info.isValid();info.next())
|
|
|
- {
|
|
|
- StringArray *a = *diffFileInfoMap.mapToValue(&info.query());
|
|
|
- delete a;
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- virtual void saveDiffFileLocationInfo(const char *id, const StringArray &locations)
|
|
|
- {
|
|
|
- CriticalBlock b(crit);
|
|
|
-
|
|
|
- StringArray *diffNames = 0;
|
|
|
- StringArrayPtr *diffs = diffFileInfoMap.getValue(id);
|
|
|
- if (diffs)
|
|
|
- diffNames = *diffs;
|
|
|
- else
|
|
|
- {
|
|
|
- diffNames = new StringArray;
|
|
|
- diffFileInfoMap.setValue(id, diffNames);
|
|
|
- }
|
|
|
-
|
|
|
- ForEachItemIn(idx, locations)
|
|
|
- diffNames->append(locations.item(idx));
|
|
|
- }
|
|
|
-
|
|
|
- virtual void saveDiffFileLocationInfo(const char *id, const char *location)
|
|
|
- {
|
|
|
- CriticalBlock b(crit);
|
|
|
-
|
|
|
- StringArray *diffNames = 0;
|
|
|
- StringArrayPtr *diffs = diffFileInfoMap.getValue(id);
|
|
|
- if (diffs)
|
|
|
- {
|
|
|
- diffNames = *diffs;
|
|
|
- }
|
|
|
- else
|
|
|
- {
|
|
|
- diffNames = new StringArray;
|
|
|
- diffFileInfoMap.setValue(id, diffNames);
|
|
|
- }
|
|
|
-
|
|
|
- diffNames->append(location);
|
|
|
- }
|
|
|
-
|
|
|
- virtual const char *queryDiffFileNames(StringBuffer &names)
|
|
|
- {
|
|
|
- names.append("<DiffFileNames>");
|
|
|
- HashIterator diffs_iter(diffFileInfoMap);
|
|
|
- for(diffs_iter.first();diffs_iter.isValid();diffs_iter.next())
|
|
|
- {
|
|
|
- IMapping &cur = diffs_iter.query();
|
|
|
- const char *name = (const char *) cur.getKey();
|
|
|
- names.appendf("<name>%s</name>", name);
|
|
|
- }
|
|
|
-
|
|
|
- names.append("</DiffFileNames>");
|
|
|
- return names.str();
|
|
|
- }
|
|
|
-
|
|
|
- virtual void deleteDiffFiles(IPropertyTree *tree, IPropertyTree *goers)
|
|
|
- {
|
|
|
- Owned<IPropertyTreeIterator> diffFiles = tree->getElements("Patch");
|
|
|
-
|
|
|
- ForEach(*diffFiles)
|
|
|
- {
|
|
|
- IPropertyTree &item = diffFiles->query();
|
|
|
- StringBuffer id(item.queryProp("@id"));
|
|
|
- StringArray **a = diffFileInfoMap.getValue(id.str());
|
|
|
-
|
|
|
- if (!a)
|
|
|
- {
|
|
|
- if (id[0] == '~')
|
|
|
- id.remove(0,1);
|
|
|
- else
|
|
|
- id.insert(0,'~');
|
|
|
-
|
|
|
- a = diffFileInfoMap.getValue(id.str());
|
|
|
- }
|
|
|
-
|
|
|
- if (a)
|
|
|
- {
|
|
|
- ForEachItemIn(idx, **a)
|
|
|
- {
|
|
|
- const char *name = (*a)->item(idx);
|
|
|
- try
|
|
|
- {
|
|
|
- OwnedIFile unneededFile = createIFile(name);
|
|
|
- unneededFile->remove();
|
|
|
- DBGLOG("deleted key diff file %s", name);
|
|
|
- }
|
|
|
- catch (IException *E)
|
|
|
- {
|
|
|
- // we don't care if there was an error - the file may not exist
|
|
|
- E->Release();
|
|
|
- }
|
|
|
- }
|
|
|
- // add Patch name to delete delta state file info
|
|
|
- IPropertyTree *goer = createPTree("Patch");
|
|
|
- goer->setProp("@id", id);
|
|
|
- goer->setProp("@mode", "delete");
|
|
|
- goers->addPropTree("Patch", goer);
|
|
|
-
|
|
|
- item.setProp("@mode", "delete");
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-};
|
|
|
-
|
|
|
-
|
|
|
-static CriticalSection diffFileInfoCacheCrit;
|
|
|
-static Owned<IDiffFileInfoCache> diffFileInfoCache;
|
|
|
-
|
|
|
-extern IDiffFileInfoCache *queryDiffFileInfoCache()
|
|
|
-{
|
|
|
- if (!diffFileInfoCache)
|
|
|
- {
|
|
|
- CriticalBlock b(diffFileInfoCacheCrit);
|
|
|
- if (!diffFileInfoCache)
|
|
|
- diffFileInfoCache.setown(new CDiffFileInfoCache());
|
|
|
- }
|
|
|
- return diffFileInfoCache;
|
|
|
-}
|
|
|
-
|
|
|
-extern void releaseDiffFileInfoCache()
|
|
|
-{
|
|
|
- CriticalBlock b(diffFileInfoCacheCrit);
|
|
|
- diffFileInfoCache.clear();
|
|
|
-}
|
|
|
-
|
|
|
|
|
|
// Initialization/termination
|
|
|
MODULE_INIT(INIT_PRIORITY_STANDARD)
|
|
@@ -2732,90 +2445,3 @@ extern IRoxieWriteHandler *createRoxieWriteHandler(IRoxieDaliHelper *_daliHelper
|
|
|
{
|
|
|
return new CRoxieWriteHandler(_daliHelper, _dFile, _clusters);
|
|
|
}
|
|
|
-
|
|
|
-#ifdef _USE_CPPUNIT
|
|
|
-#include <cppunit/extensions/HelperMacros.h>
|
|
|
-
|
|
|
-class LazyIOTest: public CppUnit::TestFixture
|
|
|
-{
|
|
|
- CPPUNIT_TEST_SUITE( LazyIOTest );
|
|
|
- CPPUNIT_TEST(testAllCases);
|
|
|
- CPPUNIT_TEST_SUITE_END();
|
|
|
-
|
|
|
- void testIt(bool localPresent, bool localCorrect)
|
|
|
- {
|
|
|
- DBGLOG("Testing localPresent=%d localCorrect=%d", localPresent, localCorrect);
|
|
|
- remove("cppfile_localfile1");
|
|
|
- FILE *x = fopen("cppfile_localfile2", "wb");
|
|
|
- assertex(x);
|
|
|
- fputs("Test", x);
|
|
|
- fclose(x);
|
|
|
- if (localPresent)
|
|
|
- {
|
|
|
- if (localCorrect)
|
|
|
- copyFile("cppfile_localfile1", "cppfile_localfile2");
|
|
|
- else
|
|
|
- {
|
|
|
- FILE *x = fopen("cppfile_localfile1", "wb");
|
|
|
- assertex(x);
|
|
|
- fputs("Pink1", x);
|
|
|
- fclose(x);
|
|
|
- }
|
|
|
- }
|
|
|
- Owned<CRoxieFileCache> cache = new CRoxieFileCache(true);
|
|
|
- StringArray remoteNames;
|
|
|
- StringArray peerNames;
|
|
|
- remoteNames.append("cppfile_localfile2");
|
|
|
- CDateTime nullDT;
|
|
|
- Owned<IFileIO> l1 = cache->lookupFile("cppfile_localfile1", 0, ROXIE_FILE, "cppfile_localfile1", NULL, NULL, peerNames, remoteNames, 4, nullDT, false, false, true, false, 0, false, NULL);
|
|
|
- Owned<IFileIO> l2 = cache->lookupFile("cppfile_localfile1", 0, ROXIE_FILE, "cppfile_localfile1", NULL, NULL, peerNames, remoteNames, 4, nullDT, false, false, true, false, 0, false, NULL);
|
|
|
- CPPUNIT_ASSERT(l1 == l2);
|
|
|
- char buf[4];
|
|
|
- l1->read(0, 4, buf);
|
|
|
- if (memcmp(buf, "Test", 4)!=0)
|
|
|
- DBGLOG("huh");
|
|
|
- CPPUNIT_ASSERT(memcmp(buf, "Test", 4)==0);
|
|
|
- cache->start();
|
|
|
- cache->wait();
|
|
|
- memset(buf, 0, 4);
|
|
|
- l1->read(0, 4, buf);
|
|
|
- CPPUNIT_ASSERT(memcmp(buf, "Test", 4)==0);
|
|
|
- l1.clear();
|
|
|
- l2.clear();
|
|
|
- cache.clear();
|
|
|
- DBGLOG("Tested localPresent=%d localCorrect=%d", localPresent, localCorrect);
|
|
|
- }
|
|
|
-
|
|
|
-protected:
|
|
|
- void testAllCases()
|
|
|
- {
|
|
|
- for (unsigned i1 = 0; i1 < 2; i1++)
|
|
|
- for (unsigned i2 = 0; i2 < 2; i2++)
|
|
|
- for (unsigned i3 = 0; i3 < 2; i3++)
|
|
|
- for (unsigned i4 = 0; i4 < 2; i4++)
|
|
|
- for (unsigned i5 = 0; i5 < 2; i5++)
|
|
|
- {
|
|
|
- useRemoteResources = i1==0;
|
|
|
- copyResources = i2==0;
|
|
|
- lazyOpen = i3==0;
|
|
|
- bool localPresent = i4==0;
|
|
|
- bool localCorrect = i5==0;
|
|
|
- try
|
|
|
- {
|
|
|
- testIt(localPresent, localCorrect);
|
|
|
- }
|
|
|
- catch (IException *E)
|
|
|
- {
|
|
|
- E->Release();
|
|
|
- CPPUNIT_ASSERT(!(localPresent && localCorrect) && !(useRemoteResources || copyResources));
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
-};
|
|
|
-
|
|
|
-CPPUNIT_TEST_SUITE_REGISTRATION( LazyIOTest );
|
|
|
-CPPUNIT_TEST_SUITE_NAMED_REGISTRATION( LazyIOTest, "LazyIOTest" );
|
|
|
-
|
|
|
-#endif
|
|
|
-
|