|
@@ -445,11 +445,10 @@ void CHThorDiskWriteActivity::stop()
|
|
|
outSeq->flush(NULL);
|
|
|
if(blockcompressed)
|
|
|
uncompressedBytesWritten = outSeq->getPosition();
|
|
|
- updateWorkUnitResult(numRecords);
|
|
|
close();
|
|
|
+ updateWorkUnitResult(numRecords);
|
|
|
if((helper.getFlags() & (TDXtemporary | TDXjobtemp) ) == 0 && !agent.queryResolveFilesLocally())
|
|
|
publish();
|
|
|
- io.clear();
|
|
|
incomplete = false;
|
|
|
if(clusterHandler)
|
|
|
clusterHandler->finish(file);
|
|
@@ -645,6 +644,12 @@ void CHThorDiskWriteActivity::close()
|
|
|
{
|
|
|
diskout.clear();
|
|
|
outSeq.clear();
|
|
|
+ if (io)
|
|
|
+ {
|
|
|
+ io->flush();
|
|
|
+ numDiskWrites = io->getStatistic(StNumDiskWrites);
|
|
|
+ io.clear();
|
|
|
+ }
|
|
|
if(clusterHandler)
|
|
|
clusterHandler->copyPhysical(file, agent.queryWorkUnit()->getDebugValueBool("__output_cluster_no_copy_physical", false));
|
|
|
}
|
|
@@ -743,11 +748,7 @@ void CHThorDiskWriteActivity::publish()
|
|
|
if (helper.getFlags() & TDWrestricted)
|
|
|
properties.setPropBool("restricted", true);
|
|
|
|
|
|
- if (io)
|
|
|
- {
|
|
|
- numDiskWrites = io->getStatistic(StNumDiskWrites);
|
|
|
- properties.setPropInt64("@numDiskWrites", numDiskWrites);
|
|
|
- }
|
|
|
+ properties.setPropInt64("@numDiskWrites", numDiskWrites);
|
|
|
StringBuffer lfn;
|
|
|
expandLogicalFilename(lfn, mangledHelperFileName.str(), agent.queryWorkUnit(), agent.queryResolveFilesLocally(), false);
|
|
|
CDfsLogicalFileName logicalName;
|