|
@@ -190,11 +190,10 @@ public:
|
|
{
|
|
{
|
|
try
|
|
try
|
|
{
|
|
{
|
|
- if (current.get()!=&failure)
|
|
|
|
- {
|
|
|
|
- atomic_dec(&numFilesOpen[remote]);
|
|
|
|
- mergeStats(fileStats, current);
|
|
|
|
- }
|
|
|
|
|
|
+ if (current.get()==&failure)
|
|
|
|
+ return;
|
|
|
|
+ atomic_dec(&numFilesOpen[remote]);
|
|
|
|
+ mergeStats(fileStats, current);
|
|
current.set(&failure);
|
|
current.set(&failure);
|
|
}
|
|
}
|
|
catch (IException *E)
|
|
catch (IException *E)
|
|
@@ -214,6 +213,14 @@ public:
|
|
_checkOpen();
|
|
_checkOpen();
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ IFileIO *getCheckOpen(unsigned &activeIdx)
|
|
|
|
+ {
|
|
|
|
+ CriticalBlock b(crit);
|
|
|
|
+ _checkOpen();
|
|
|
|
+ activeIdx = currentIdx;
|
|
|
|
+ return LINK(current);
|
|
|
|
+ }
|
|
|
|
+
|
|
void _checkOpen()
|
|
void _checkOpen()
|
|
{
|
|
{
|
|
if (current.get() == &failure)
|
|
if (current.get() == &failure)
|
|
@@ -312,13 +319,14 @@ public:
|
|
|
|
|
|
virtual size32_t read(offset_t pos, size32_t len, void * data)
|
|
virtual size32_t read(offset_t pos, size32_t len, void * data)
|
|
{
|
|
{
|
|
- CriticalBlock b(crit);
|
|
|
|
|
|
+ unsigned activeIdx;
|
|
|
|
+ Owned<IFileIO> active = getCheckOpen(activeIdx);
|
|
unsigned tries = 0;
|
|
unsigned tries = 0;
|
|
for (;;)
|
|
for (;;)
|
|
{
|
|
{
|
|
try
|
|
try
|
|
{
|
|
{
|
|
- size32_t ret = current->read(pos, len, data);
|
|
|
|
|
|
+ size32_t ret = active->read(pos, len, data);
|
|
lastAccess = msTick();
|
|
lastAccess = msTick();
|
|
return ret;
|
|
return ret;
|
|
|
|
|
|
@@ -331,37 +339,47 @@ public:
|
|
{
|
|
{
|
|
EXCLOG(MCoperatorError, E, "Read error");
|
|
EXCLOG(MCoperatorError, E, "Read error");
|
|
E->Release();
|
|
E->Release();
|
|
- DBGLOG("Failed to read length %d offset %" I64F "x file %s", len, pos, sources.item(currentIdx).queryFilename());
|
|
|
|
- currentIdx++;
|
|
|
|
- setFailure();
|
|
|
|
|
|
+ DBGLOG("Failed to read length %d offset %" I64F "x file %s", len, pos, sources.item(activeIdx).queryFilename());
|
|
|
|
+ {
|
|
|
|
+ CriticalBlock b(crit);
|
|
|
|
+ if (currentIdx == activeIdx)
|
|
|
|
+ {
|
|
|
|
+ currentIdx = activeIdx+1;
|
|
|
|
+ setFailure();
|
|
|
|
+ }
|
|
|
|
+ }
|
|
}
|
|
}
|
|
- _checkOpen();
|
|
|
|
|
|
+ active.setown(getCheckOpen(activeIdx));
|
|
tries++;
|
|
tries++;
|
|
if (tries == MAX_READ_RETRIES)
|
|
if (tries == MAX_READ_RETRIES)
|
|
- throw MakeStringException(ROXIE_FILE_ERROR, "Failed to read length %d offset %" I64F "x file %s after %d attempts", len, pos, sources.item(currentIdx).queryFilename(), tries);
|
|
|
|
|
|
+ throw MakeStringException(ROXIE_FILE_ERROR, "Failed to read length %d offset %" I64F "x file %s after %d attempts", len, pos, sources.item(activeIdx).queryFilename(), tries);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
virtual void flush()
|
|
virtual void flush()
|
|
{
|
|
{
|
|
- CriticalBlock b(crit);
|
|
|
|
- if (current.get() != &failure)
|
|
|
|
- current->flush();
|
|
|
|
|
|
+ Linked<IFileIO> active;
|
|
|
|
+ {
|
|
|
|
+ CriticalBlock b(crit);
|
|
|
|
+ active.set(current);
|
|
|
|
+ }
|
|
|
|
+ if (active.get() != &failure)
|
|
|
|
+ active->flush();
|
|
}
|
|
}
|
|
|
|
|
|
virtual offset_t size()
|
|
virtual offset_t size()
|
|
{
|
|
{
|
|
- CriticalBlock b(crit);
|
|
|
|
- _checkOpen();
|
|
|
|
|
|
+ unsigned activeIdx;
|
|
|
|
+ Owned<IFileIO> active = getCheckOpen(activeIdx);
|
|
lastAccess = msTick();
|
|
lastAccess = msTick();
|
|
- return current->size();
|
|
|
|
|
|
+ return active->size();
|
|
}
|
|
}
|
|
|
|
|
|
virtual unsigned __int64 getStatistic(StatisticKind kind)
|
|
virtual unsigned __int64 getStatistic(StatisticKind kind)
|
|
{
|
|
{
|
|
- CriticalBlock b(crit);
|
|
|
|
- unsigned __int64 openValue = current->getStatistic(kind);
|
|
|
|
- return openValue + fileStats.getStatisticValue(kind);
|
|
|
|
|
|
+ unsigned __int64 v = fileStats.getStatisticValue(kind);
|
|
|
|
+ CriticalBlock b(crit); // don't bother with linking current and performing getStatistic outside of crit, because getStatistic is very quick
|
|
|
|
+ return v + current->getStatistic(kind);
|
|
}
|
|
}
|
|
|
|
|
|
virtual size32_t write(offset_t pos, size32_t len, const void * data) { throwUnexpected(); }
|
|
virtual size32_t write(offset_t pos, size32_t len, const void * data) { throwUnexpected(); }
|