/*##############################################################################

    Copyright (C) 2011 HPCC Systems.

    All rights reserved. This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as
    published by the Free Software Foundation, either version 3 of the
    License, or (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
############################################################################## */

#include "jliball.hpp"
#include "eclrtl.hpp"

#include "hqlexpr.hpp"
#include "hqlthql.hpp"

#include "fvresultset.ipp"
#include "fileview.hpp"
#include "fvdisksource.ipp"
#include "fvwugen.hpp"
#include "fverror.hpp"
#include "dasess.hpp"

// Maximum csv record size assumed when the file has no "@maxRecordSize" property.
#define DEFAULT_MAX_CSV_SIZE    0x1100

// Starts with no cached part (marked by (unsigned)-1) and a total size of zero.
PhysicalFileInfo::PhysicalFileInfo()
{
    cachedPart = (unsigned)-1;
    totalSize = 0;
}


// Returns the size reported by the physical file behind the given copy of a part,
// or (offset_t)-1 if an IException is raised while locating or sizing it.
offset_t getPartSize(IDistributedFilePart & part, unsigned copy)
{
    try
    {
        RemoteFilename rfn;
        Owned<IFile> in = createIFile(part.getFilename(rfn,copy));
        return in->size();
    }
    catch (IException * e)
    {
        //Deliberately swallowed - the caller treats -1 as "size unknown" and tries elsewhere.
        e->Release();
    }
    return (offset_t) -1;
}

// Records the distributed file and gathers the size of each of its parts.
// For each part the size comes from the part's own getFileSize(); if that is -1
// the physical file of copy 0, then copy 1, is asked instead.
// NOTE: sizes are appended to partSizes, which is not cleared here.
void PhysicalFileInfo::init(IDistributedFile * _df)
{
    df.set(_df);
    totalSize = 0;
    Owned<IDistributedFilePartIterator> iter = df->getIterator();
    ForEach(*iter)
    {
        IDistributedFilePart & cur = iter->query();
        offset_t partSize = cur.getFileSize(true, false);
        if (partSize == -1)
            partSize = getPartSize(cur, 0);
        if (partSize == -1)
            partSize = getPartSize(cur, 1);
        if (partSize == -1)
            partSize = 0x100000; // force an error when the part is opened.
        partSizes.append(partSize);
        totalSize += partSize;
    }
}


// Maps a logical offset onto the start of the copyLength-sized chunk that contains it.
// Chunks are aligned relative to the start of the part holding the offset, so the
// result is (sum of the preceding part sizes) + (offset within part rounded down to
// a multiple of copyLength).  An offset beyond the last part yields the sum of all
// the part sizes.  copyLength must be non-zero.
offset_t PhysicalFileInfo::getOptimizedOffset(offset_t offset, unsigned copyLength)
{
    offset_t newOffset = 0;
    ForEachItemIn(idx, partSizes)
    {
        offset_t curSize = partSizes.item(idx);
        if (offset < curSize)
            return newOffset + ((offset) / copyLength) * copyLength;
        newOffset += curSize;
        offset -= curSize;
    }
    return newOffset;
}


// Reads up to copyLength bytes starting at startOffset into out (which is cleared first).
//
// startOffset is either a "local" file position (as decided by isLocalFpos), from which
// the part number and the offset within that part are extracted, or a logical offset
// which is resolved by walking the accumulated part sizes.
//
// A read never crosses a part boundary: the length is trimmed to the end of the part.
// The IO handle of the most recently used part is cached between calls, and the whole
// function runs inside the critical section cs.
//
// Returns true if the request reached (or went past) the end of the part, false
// otherwise - including when startOffset does not correspond to any part.  Callers
// detect "nothing read" by checking out.length().
// Raises FVERR_FailedOpenFile / FVERR_FailedOpenCompressedFile if the part cannot be opened.
bool PhysicalFileInfo::readData(MemoryBuffer & out, __int64 startOffset, size32_t copyLength)
{
    CriticalBlock procedure(cs);

    offset_t chunkOffset = startOffset;
    unsigned numParts = partSizes.ordinality();
    unsigned part;
    offset_t curPartLength;
    if (isLocalFpos(startOffset))
    {
        part = getLocalFposPart(startOffset);
        chunkOffset = getLocalFposOffset(startOffset);
        if (part >= numParts)
            return false;
        curPartLength = partSizes.item(part);
    }
    else
    {
        for (part = 0; part < numParts; part++)
        {
            curPartLength = partSizes.item(part);
            if (chunkOffset < curPartLength)
                break;
            chunkOffset -= curPartLength;
        }
    }

    if (part == numParts)
        return false;

    bool isLast = false;
    if (chunkOffset + copyLength >= curPartLength)
    {
        //A local file position may point beyond the end of its part; the unsigned
        //subtraction would then wrap, so clamp the length to zero instead.
        if (chunkOffset < curPartLength)
            copyLength = (size32_t)(curPartLength - chunkOffset);
        else
            copyLength = 0;
        isLast = true;
    }

    if (part != cachedPart)
    {
        //Drop the old handle first so a failure below cannot leave a stale cache entry.
        cachedPart = (unsigned)-1;
        cachedFile.clear();
        cachedIO.clear();

        Owned<IDistributedFilePart> dfp = df->getPart(part);
        try
        {
            RemoteFilename rfn;
            cachedFile.setown(createIFile(dfp->getFilename(rfn)));
            cachedIO.setown(cachedFile->open(IFOread));
        }
        catch (IException * e)
        {
            //Ignore the failure - copy 1 of the part is tried next.
            e->Release();
        }

        if (!cachedIO)
        {
            RemoteFilename rfn;
            cachedFile.setown(createIFile(dfp->getFilename(rfn,1)));
            cachedIO.setown(cachedFile->open(IFOread));
            if (!cachedIO)
            {
                StringBuffer str;
                throwError1(FVERR_FailedOpenFile, dfp->getPartName(str).str());
                return false;
            }
        }
        if (df->isCompressed())
        {
            cachedIO.setown(createCompressedFileReader(cachedIO));
            if (!cachedIO)
            {
                StringBuffer str;
                throwError1(FVERR_FailedOpenCompressedFile, dfp->getPartName(str).str());
                return false;
            }
        }

        cachedPart = part;
    }

    char * data = (char *)out.clear().reserve(copyLength);
    unsigned numGot = cachedIO->read(chunkOffset, copyLength, data);
    out.setLength(numGot);
    return isLast;
}

// Releases the cached part file and IO handle.
void PhysicalFileInfo::close()
{
    cachedPart = (unsigned)-1;
    cachedFile.clear();
    cachedIO.clear();
}

//---------------------------------------------------------------------------

// Looks up the logical file in the distributed file directory.  A user descriptor is
// only supplied to the lookup when a non-empty username is given.
// df holds whatever the lookup returned; the init() functions in this file return
// false when it is null.
DiskDataSource::DiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char* _username, const char* _password)
{
    logicalName.set(_logicalName);
    diskRecord.set(_diskRecord);

    Owned<IUserDescriptor> udesc;
    if(_username != NULL && *_username != '\0')
    {
        udesc.setown(createUserDescriptor());
        udesc->set(_username, _password);
    }

    df.setown(queryDistributedFileDirectory().lookup(logicalName, udesc.get()));
}


//---------------------------------------------------------------------------

// All the work is done by the DiskDataSource constructor.
DirectDiskDataSource::DirectDiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char* _username, const char* _password) : DiskDataSource(_logicalName, _diskRecord, _username, _password)
{
}


// Prepares the source for reading the file parts directly.
// Returns false if the logical file was not found.
// Raises FVERR_CannotViewKey if the file's "@kind" is "key", and FVERR_ZeroSizeRecord
// if the disk record is fixed size with a size of zero.
bool DirectDiskDataSource::init()
{
    if (!df)
        return false;

    IPropertyTree & properties = df->queryProperties();
    const char * kind = properties.queryProp("@kind");
    bool isGrouped =properties.getPropBool("@grouped");
    if (kind && (stricmp(kind, "key") == 0))
        throwError1(FVERR_CannotViewKey, logicalName.get());

    //Need to assign the transformed record to meta
    diskMeta.setown(new DataSourceMetaData(diskRecord, 0, true, isGrouped, 0));
    if (!returnedMeta)
    {
        returnedMeta.set(diskMeta);
        returnedRecordSize.set(returnedMeta);
    }
    if (!transformedMeta)
        transformedMeta.set(returnedMeta);

    addFileposition();
    physical.init(df);
    if (diskMeta->isFixedSize())
    {
        if (diskMeta->fixedSize() == 0)
            throwError1(FVERR_ZeroSizeRecord, logicalName.get());
        totalRows = physical.totalSize / diskMeta->fixedSize();
    }
    else if (properties.hasProp("@recordCount"))
        totalRows = properties.getPropInt64("@recordCount");
    else
        totalRows = UNKNOWN_NUM_ROWS;

    //Read four records' worth at a time, but never less than DISK_BLOCK_SIZE.
    readBlockSize = 4 * diskMeta->getRecordSize(NULL);
    if (readBlockSize < DISK_BLOCK_SIZE)
        readBlockSize = DISK_BLOCK_SIZE;
    return true;
}


// Reads the single row at the given offset into out.
// Up to the maximum record size is read, then out is resized to the size
// returnedMeta reports for the row.  Returns false if nothing could be read.
bool DirectDiskDataSource::fetchRowData(MemoryBuffer & out, __int64 offset)
{
    physical.readData(out, offset, returnedMeta->getMaxRecordSize());
    if (out.length() == 0)
        return false;
    out.setLength(returnedMeta->getRecordSize(out.toByteArray()));
    return true;
}


// Returns the number of bytes to request per block: readBlockSize, rounded down to a
// whole number of records when the returned record is fixed size.
size32_t DirectDiskDataSource::getCopyLength()
{
    size32_t copyLength = readBlockSize;
    if (returnedMeta->isFixedSize())
    {
        unsigned fixedSize = returnedMeta->fixedSize();
        copyLength = (copyLength / fixedSize) * fixedSize;
    }
    return copyLength;
}


// For fixed size records, replaces the location with the start of the aligned block
// containing the row.  Variable size records leave location untouched.
void DirectDiskDataSource::improveLocation(__int64 row, RowLocation & location)
{
    if (!returnedMeta->isFixedSize())
        return;

    //Align the row so the chunks don't overlap....
    unsigned fixedSize = returnedMeta->fixedSize();
    size32_t copyLength = getCopyLength();
    location.bestOffset = physical.getOptimizedOffset(row * fixedSize, copyLength);
    location.bestRow = location.bestOffset / fixedSize;
}


// Reads one block starting at startOffset and adds it to the row cache as either a
// fixed or a variable row block.  Returns false if no data could be read.
bool DirectDiskDataSource::loadBlock(__int64 startRow, offset_t startOffset)
{
    size32_t copyLength = getCopyLength();
    MemoryBuffer temp;
    bool isLast = physical.readData(temp, startOffset, copyLength);
    if (temp.length() == 0)
        return false;

    RowBlock * rows;
    if (returnedMeta->isFixedSize())
        rows = new FixedRowBlock(temp, startRow, startOffset, returnedMeta->fixedSize());
    else
        rows = new VariableRowBlock(temp, startRow, startOffset, returnedRecordSize, isLast);
    cache.addRowsOwn(rows);
    return true;
}


// Closes the cached physical file once openCount has dropped to zero.
void DirectDiskDataSource::onClose()
{
    DiskDataSource::onClose();
    if (openCount == 0)
        physical.close();
}


//---------------------------------------------------------------------------

// Converts a format name into a UtfReader format.
// Names starting "utf" (case insensitive), optionally followed by '-', must end in one
// of 8N, 16BE, 16LE, 32BE or 32LE - anything else raises FVERR_UnknownUTFFormat.
// Any other name, and a NULL name, gives Utf8.
UtfReader::UtfFormat getFormat(const char * format)
{
    //The format comes from an optional file property, so it may be missing entirely.
    if (!format)
        return UtfReader::Utf8;

    if (memicmp(format, "utf", 3) == 0)
    {
        const char * tail = format + 3;
        if (*tail == '-')
            tail++;
        if (stricmp(tail, "8N")==0)
            return UtfReader::Utf8;
        else if (stricmp(tail, "16BE")==0)
            return UtfReader::Utf16be;
        else if (stricmp(tail, "16LE")==0)
            return UtfReader::Utf16le;
        else if (stricmp(tail, "32BE")==0)
            return UtfReader::Utf32be;
        else if (stricmp(tail, "32LE")==0)
            return UtfReader::Utf32le;
        else
            throwError1(FVERR_UnknownUTFFormat, format);
    }
    return UtfReader::Utf8;
}

// Match codes used with the terminator matcher.
enum { NONE, TERMINATOR };

// Configures the record sizer from the file's properties:
//  "@format"        selects the code unit size (2 for utf16, 4 for utf32, otherwise 1)
//  "@maxRecordSize" gives the maximum record size (default DEFAULT_MAX_CSV_SIZE)
//  "@csvTerminate"  gives the record terminators (default "\n" and "\r\n")
void CsvRecordSize::init(IDistributedFile * df)
{
    IPropertyTree * props = &df->queryProperties();
    UtfReader::UtfFormat utfType = getFormat(props->queryProp("@format"));
    switch (utfType)
    {
    case UtfReader::Utf16be:
    case UtfReader::Utf16le:
        unitSize = 2;
        break;
    case UtfReader::Utf32be:
    case UtfReader::Utf32le:
        unitSize = 4;
        break;
    default:
        unitSize = 1;
    }

    maxRecordSize = props->getPropInt("@maxRecordSize", DEFAULT_MAX_CSV_SIZE);
    const char * terminate = props->queryProp("@csvTerminate");
    addUtfActionList(matcher, terminate ? terminate : "\\n,\\r\\n", TERMINATOR, NULL, utfType);
}

// Scans at most maxLength bytes from start for the first record terminator.
// Returns the number of bytes up to the terminator - including the terminator itself
// if includeTerminator is set - or maxLength if no terminator is found.
size32_t CsvRecordSize::getRecordLength(size32_t maxLength, const void * start, bool includeTerminator)
{
    //If we need more complicated processing...
    const byte * cur = (const byte *)start;
    const byte * end = (const byte *)start + maxLength;

    //cur advances a code unit at a time, so when maxLength is not a multiple of
    //unitSize an equality test against end would never succeed - compare with <.
    while (cur < end)
    {
        unsigned matchLen;
        unsigned match = matcher.getMatch(end-cur, (const char *)cur, matchLen);
        switch (match & 255)
        {
        case NONE:
            {
                //Step over one code unit, but never beyond the end of the buffer.
                size32_t remaining = (size32_t)(end - cur);
                size32_t step = unitSize;
                if (step > remaining)
                    step = remaining;
                cur += step;            // matchLen == 0;
                break;
            }
        case TERMINATOR:
            if (includeTerminator)
                return cur + matchLen - (const byte *)start;
            return cur - (const byte *)start;
        }
        cur += matchLen;
    }
    return end - (const byte *)start;
}

// Size of the record at start (terminator included), scanning at most maxRecordSize
// bytes.  A NULL start returns maxRecordSize.
size32_t CsvRecordSize::getRecordSize(const void * start)
{
    if (!start) return maxRecordSize;
    return getRecordLength(maxRecordSize, start, true);
}

// Size of the record at start (terminator included), scanning at most maxLength
// bytes.  A NULL start returns maxRecordSize.
size32_t CsvRecordSize::getRecordSize(unsigned maxLength, const void * start)
{
    if (!start) return maxRecordSize;
    return getRecordLength(maxLength, start, true);
}

size32_t CsvRecordSize::getFixedSize() const
{
    return 0; // is variable
}


// Creates a source that returns each csv line as a single string (or unicode) field.
// The rows are unicode when _format starts with "utf" (case insensitive); a NULL
// _format is treated as non-unicode.
// totalRows is only assigned here when the file has a "@recordCount" property.
DirectCsvDiskDataSource::DirectCsvDiskDataSource(IDistributedFile * _df, const char * _format)
{
    df.set(_df);
    //Guard against a missing format before looking at its prefix.
    isUnicode = (_format != NULL) && (memicmp(_format, "utf", 3) == 0);
    utfFormat = getFormat(_format);
    returnedMeta.setown(new DataSourceMetaData(isUnicode ? type_unicode : type_string));
    returnedRecordSize.set(&recordSizer);
    transformedMeta.set(returnedMeta);
    addFileposition();

    IPropertyTree & properties = df->queryProperties();
    if (properties.hasProp("@recordCount"))
        totalRows = properties.getPropInt64("@recordCount");
}


// Gathers the part sizes, configures the record sizer, and sets the block size to
// four maximum-sized records (but at least DISK_BLOCK_SIZE).  Always returns true.
bool DirectCsvDiskDataSource::init()
{
    physical.init(df);
    recordSizer.init(df);

    readBlockSize = 4 * recordSizer.getRecordSize(NULL);
    if (readBlockSize < DISK_BLOCK_SIZE)
        readBlockSize = DISK_BLOCK_SIZE;
    return true;
}


// Appends a row to out as a length prefix followed by the data.
// Non-unicode: the prefix is length and the data is copied as is.
// Unicode: the data is converted with convertUtf (target Utf16le) and the prefix is
// then overwritten with half the number of bytes the conversion appended
// (presumably the number of utf16 code units - confirm against convertUtf).
void DirectCsvDiskDataSource::copyRow(MemoryBuffer & out, size32_t length, const void * data)
{
    if (isUnicode)
    {
        unsigned offsetOfLength = out.length();
        //Placeholder - the real length is only known after the conversion.
        out.append(length);
        convertUtf(out, UtfReader::Utf16le, length, data, utfFormat);
        unsigned savedLength = out.length();
        out.setWritePos(offsetOfLength);
        out.append((unsigned) (savedLength - offsetOfLength - sizeof(unsigned))/2);
        out.setWritePos(savedLength);
    }
    else
    {
        out.append(length);
        out.append(length, data);
    }
}


// Reads the single row at the given offset and appends it to out using copyRow.
// The row length here includes the terminator.  Returns false if nothing was read.
bool DirectCsvDiskDataSource::fetchRowData(MemoryBuffer & out, __int64 offset)
{
    MemoryBuffer temp;
    physical.readData(temp, offset, recordSizer.getRecordSize(NULL));
    if (temp.length() == 0)
        return false;
    unsigned realLength = recordSizer.getRecordSize(temp.length(), temp.toByteArray());
    copyRow(out, realLength, temp.toByteArray());
    return true;
}


// Reads readBlockSize bytes starting at startOffset and adds them to the row cache as
// a variable row block.  Returns false if no data could be read.
bool DirectCsvDiskDataSource::loadBlock(__int64 startRow, offset_t startOffset)
{
    size32_t copyLength = readBlockSize;
    MemoryBuffer temp;
    bool isLast = physical.readData(temp, startOffset, copyLength);
    if (temp.length() == 0)
        return false;

    RowBlock * rows = new VariableRowBlock(temp, startRow, startOffset, &recordSizer, isLast);
    cache.addRowsOwn(rows);
    return true;
}


// Appends the given row (without its line terminator) followed by its offset to out.
// Returns false if the row is not available.
bool DirectCsvDiskDataSource::getRow(MemoryBuffer & out, __int64 row)
{
    size32_t length;
    const void * data;
    unsigned __int64 offset = 0;
    if (getRowData(row, length, data, offset))
    {
        //strip the end of line terminator from the length...
        length = recordSizer.getRecordLength(length, data, false);
        copyRow(out, length, data);
        out.append(offset);
        return true;
    }
    return false;
}


//---------------------------------------------------------------------------

// A direct disk source whose record is taken from a workunit result rather than being
// supplied up front - so no disk record is passed to the base class.
WorkunitDiskDataSource::WorkunitDiskDataSource(const char * _logicalName, IConstWUResult * _wuResult, const char * _wuid, const char * _username, const char * _password) : DirectDiskDataSource(_logicalName, NULL, _username, _password)
{
    wuid.set(_wuid);
    wuResult.set(_wuResult);
}


// Takes the returned record from the workunit result, uses it as the disk record,
// then initialises as a direct disk source.
bool WorkunitDiskDataSource::init()
{
    if (!setReturnedInfoFromResult())
        return false;

    diskRecord.set(returnedRecord);
    return DirectDiskDataSource::init();
}


//---------------------------------------------------------------------------

// Only records the parameters - the helper workunit is created by init().
TranslatedDiskDataSource::TranslatedDiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char * _cluster, const char * _username, const char * _password)
{
    logicalName.set(_logicalName);
    diskRecord.set(_diskRecord);
    cluster.set(_cluster);
    username.set(_username);
    password.set(_password);
    openCount = 0;
}


// Deletes the helper workunit (if one was created), releasing the nested source first.
TranslatedDiskDataSource::~TranslatedDiskDataSource()
{
    if (helperWuid)
    {
        directSource.clear();
        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
        factory->deleteWorkUnit(helperWuid);
    }
}


// Creates a compile-only workunit containing the ECL generated by buildDiskOutputEcl
// for this file, and stores its wuid in helperWuid.
// Returns false if no ECL could be generated.
bool TranslatedDiskDataSource::createHelperWU()
{
    OwnedHqlExpr browseWUcode = buildDiskOutputEcl(logicalName, diskRecord);
    if (!browseWUcode)
        return false;

    // MORE: Where should we get these parameters from ????
    StringAttr application("fileViewer");
    StringAttr customerid("viewer");

    Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
    Owned<IWorkUnit> workunit = factory->createWorkUnit(NULL, application, username);
    workunit->setUser(username);
    workunit->setClusterName(cluster);
    workunit->setCustomerId(customerid);
    workunit->setAction(WUActionCompile);

    StringBuffer jobName;
    jobName.append("FileView_for_").append(logicalName);
    workunit->setJobName(jobName.str());

    StringBuffer eclText;
    toECL(browseWUcode, eclText, true);
    Owned<IWUQuery> query = workunit->updateQuery();
    query->setQueryText(eclText.str());
    query->setQueryName(jobName.str());

    workunit->setCompareMode(CompareModeOff);
    StringAttrAdaptor xxx(helperWuid); workunit->getWuid(xxx);
    return true;
}


// Creates and compiles the helper workunit, then reads the file through a
// WorkunitDiskDataSource built from the workunit's first result (sequence 0).
// Returns false if any of those steps fail.
bool TranslatedDiskDataSource::init()
{
    if (!createHelperWU() || !compileHelperWU())
        return false;

    Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
    Owned<IConstWorkUnit> wu = factory->openWorkUnit(helperWuid, false);
    Owned<IConstWUResult> dataResult = wu->getResultBySequence(0);
    directSource.setown(new WorkunitDiskDataSource(logicalName, dataResult, helperWuid, username.get(), password.get()));
    return directSource->init();
}


// Submits the helper workunit and returns the result of waiting for it to compile.
bool TranslatedDiskDataSource::compileHelperWU()
{
    submitWorkUnit(helperWuid, username, password);
    return waitForWorkUnitToCompile(helperWuid);
}


//---------------------------------------------------------------------------

// Records the parameters; the logical file itself is looked up by the DiskDataSource
// constructor.  Each returned row carries an extra offset_t and an unsigned short.
IndirectDiskDataSource::IndirectDiskDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char * _cluster, const char * _username, const char * _password) : DiskDataSource(_logicalName, _diskRecord, _username, _password)
{
    cluster.set(_cluster);
    username.set(_username);
    password.set(_password);
    extraFieldsSize = sizeof(offset_t) + sizeof(unsigned short);
    totalSize = 0;
}


// Deletes the browse workunit if one was created.
IndirectDiskDataSource::~IndirectDiskDataSource()
{
    if (browseWuid)
    {
        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
        factory->deleteWorkUnit(browseWuid);
    }
}


// Creates the workunit that is rerun to fetch each block of rows, using the ECL
// generated by buildDiskFileViewerEcl, and stores its wuid in browseWuid.
// The returned record is taken from the first child of the generated expression.
// Returns false if no ECL could be generated.
bool IndirectDiskDataSource::createBrowseWU()
{
    OwnedHqlExpr browseWUcode = buildDiskFileViewerEcl(logicalName, diskRecord);
    if (!browseWUcode)
        return false;
    returnedRecord.set(browseWUcode->queryChild(0)->queryRecord());

    // MORE: Where should we get these parameters from ????
    StringAttr application("fileViewer");
    StringAttr owner("fileViewer");
    StringAttr customerid("viewer");

    Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
    Owned<IWorkUnit> workunit = factory->createWorkUnit(NULL, application, owner);
    workunit->setUser(owner);
    workunit->setClusterName(cluster);
    workunit->setCustomerId(customerid);

    StringBuffer jobName;
    jobName.append("FileView_for_").append(logicalName);
    workunit->setJobName(jobName.str());

    StringBuffer eclText;
    toECL(browseWUcode, eclText, true);
    Owned<IWUQuery> query = workunit->updateQuery();
    query->setQueryText(eclText.str());
    query->setQueryName(jobName.str());

    workunit->setCompareMode(CompareModeOff);
    StringAttrAdaptor xxx(browseWuid); workunit->getWuid(xxx);
    return true;
}


// Prepares the source for reading via the browse workunit.
// Returns false if the logical file was not found or the workunit could not be created.
// Raises FVERR_ZeroSizeRecord if the disk record is fixed size with a size of zero.
bool IndirectDiskDataSource::init()
{
    if (!df)
        return false;

    if (!createBrowseWU())
        return false;

    //Need to assign the transformed record to meta
    bool isGrouped = false;     // more not sure this is strictly true...
    returnedMeta.setown(new DataSourceMetaData(returnedRecord, 2, true, isGrouped, 0));
    transformedMeta.set(returnedMeta);
    diskMeta.setown(new DataSourceMetaData(diskRecord, 0, true, isGrouped, 0));

    totalSize = df->getFileSize(true,false);
    if (diskMeta->isFixedSize())
    {
        //Same check as DirectDiskDataSource::init() - avoid dividing by zero.
        if (diskMeta->fixedSize() == 0)
            throwError1(FVERR_ZeroSizeRecord, logicalName.get());
        totalRows = totalSize / diskMeta->fixedSize();
    }
    else
        totalRows = UNKNOWN_NUM_ROWS;
    return true;
}


// Fetches a block of rows by rerunning the browse workunit with LOWER_LIMIT_ID set to
// startOffset, then adds the raw result to the row cache.
// Returns false if the workunit does not finish in the completed or wait state, or if
// it returns no data.
bool IndirectDiskDataSource::loadBlock(__int64 startRow, offset_t startOffset)
{
    MemoryBuffer temp;

    //enter scope....>
    {
        //The updatable workunit is only held within this scope, so it has been
        //released again before the workunit is resubmitted below.
        Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
        Owned<IWorkUnit> wu = factory->updateWorkUnit(browseWuid);
        Owned<IWUResult> lower = wu->updateVariableByName(LOWER_LIMIT_ID);
        lower->setResultInt(startOffset);
        lower->setResultStatus(ResultStatusSupplied);

        //Discard the previous block of results and any errors from the last run.
        Owned<IWUResult> dataResult = wu->updateResultBySequence(0);
        dataResult->setResultRaw(0, NULL, ResultFormatRaw);
        dataResult->setResultStatus(ResultStatusUndefined);
        wu->clearExceptions();
        if (wu->getState() != WUStateUnknown)
            wu->setState(WUStateCompiled);

        //Owned<IWUResult> count = wu->updateVariableByName(RECORD_LIMIT_ID);
        //count->setResultInt64(fetchSize);
    }

    //Resubmit the query...
    submitWorkUnit(browseWuid, username, password);
    WUState finalState = waitForWorkUnitToComplete(browseWuid, -1, true);
    if(!((finalState == WUStateCompleted) || (finalState == WUStateWait)))
        return false;

    //Now extract the results...
    Owned<IWorkUnitFactory> factory = getWorkUnitFactory();
    Owned<IConstWorkUnit> wu = factory->openWorkUnit(browseWuid, false);
    Owned<IConstWUResult> dataResult = wu->getResultBySequence(0);
    MemoryBuffer2IDataVal xxx(temp); dataResult->getResultRaw(xxx, NULL, NULL);

    if (temp.length() == 0)
        return false;

    RowBlock * rows;
    if (returnedMeta->isFixedSize())
        rows = new FilePosFixedRowBlock(temp, startRow, startOffset, returnedMeta->fixedSize());
    else
        rows = new FilePosVariableRowBlock(temp, startRow, startOffset, returnedMeta, true);
    cache.addRowsOwn(rows);
    return true;
}