/*############################################################################## Copyright (C) 2011 HPCC Systems. All rights reserved. This program is free software: you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with this program. If not, see . ############################################################################## */ #include "jliball.hpp" #include "eclrtl_imp.hpp" #include "hqlexpr.hpp" #include "fileview.hpp" #include "fvresultset.ipp" #include "fvidxsource.ipp" #include "fverror.hpp" #include "dasess.hpp" //cloned from hthor - a candidate for commoning up. static IKeyIndex *openKeyFile(IDistributedFilePart *keyFile) { unsigned numCopies = keyFile->numCopies(); assertex(numCopies); for (unsigned copy=0; copy < numCopies; copy++) { RemoteFilename rfn; try { OwnedIFile ifile = createIFile(keyFile->getFilename(rfn,copy)); unsigned __int64 thissize = ifile->size(); if (thissize != -1) { StringBuffer remotePath; rfn.getRemotePath(remotePath); unsigned crc; keyFile->getCrc(crc); return createKeyIndex(remotePath.str(), crc, false, false); } } catch (IException *E) { EXCLOG(E, "While opening index file"); E->Release(); } } RemoteFilename rfn; StringBuffer url; keyFile->getFilename(rfn).getRemotePath(url); throw MakeStringException(1001, "Could not open key file at %s%s", url.str(), (numCopies > 1) ? " or any alternate location." : "."); } //--------------------------------------------------------------------------- #define MIN_CACHED_ROWS 150 #define MAX_CACHED_ROWS 200 IndexPageCache::IndexPageCache() { firstRow = 0; offsetDelta = 0; offsets.append(0); saved.ensureCapacity(0x10000); } void IndexPageCache::addRow(__int64 row, size32_t len, const void * data) { if (row != firstRow + numRowsCached()) { firstRow = row; offsetDelta = 0; offsets.kill(); offsets.append(0); saved.setWritePos(0); } else if (numRowsCached() >= MAX_CACHED_ROWS) { unsigned numToRemove = numRowsCached() - MIN_CACHED_ROWS; __int64 newDelta = offsets.item(numToRemove); size32_t sizeLost = (size32_t)(newDelta-offsetDelta); //copy the cached rows byte * base = (byte *)saved.bufferBase(); memmove(base, base+sizeLost, saved.length()-sizeLost); saved.setWritePos(saved.length()-sizeLost); offsets.removen(0, numToRemove); firstRow += numToRemove; offsetDelta = newDelta; } assertex(row == firstRow + numRowsCached()); assertex(offsets.tos() == saved.length() + offsetDelta); saved.append(len, data); offsets.append(saved.length() + offsetDelta); } bool IndexPageCache::getRow(__int64 row, size32_t & len, const void * & data) { if (row < firstRow || row >= firstRow + numRowsCached()) return false; unsigned __int64 startOffset = offsets.item((unsigned)(row-firstRow)); unsigned __int64 endOffset = offsets.item((unsigned)(row-firstRow+1)); len = (size32_t)(endOffset - startOffset); data = saved.toByteArray() + (unsigned)(startOffset - offsetDelta); return true; } //--------------------------------------------------------------------------- //Could probably cope with unknown record, just displaying as binary. IndexDataSource::IndexDataSource(const char * _logicalName, IHqlExpression * _diskRecord, const char* _username, const char* _password) { logicalName.set(_logicalName); diskRecord.set(_diskRecord); Owned udesc; if(_username != NULL && *_username != '\0') { udesc.setown(createUserDescriptor()); udesc->set(_username, _password); } df.setown(queryDistributedFileDirectory().lookup(logicalName, udesc.get())); filtered = false; } IndexDataSource::IndexDataSource(IndexDataSource * _other) { logicalName.set(_other->logicalName); diskRecord.set(_other->diskRecord); df.set(_other->df); original.set(_other); // stop any work units etc. being unloaded. diskMeta.set(_other->diskMeta); // optimization - would be handled by init anyway filtered = false; //MORE: What else needs cloning/initializing? } IFvDataSource * IndexDataSource::cloneForFilter() { Owned ret = new IndexDataSource(this); if (ret->init()) return ret.getClear(); return NULL; } bool IndexDataSource::init() { if (!df) return false; numParts = df->numParts(); singlePart = (numParts == 1); ignoreSkippedRows = true; // better if skipping to a particular point StringBuffer partName; Owned kf = df->getPart(numParts-1); tlk.setown(openKeyFile(kf)); if (!tlk) return false; IPropertyTree & properties = df->queryAttributes(); //Need to assign the transformed record to meta if (!diskMeta) diskMeta.setown(new DataSourceMetaData(diskRecord, 0, true, false, tlk->keyedSize())); if (!returnedMeta) { returnedMeta.set(diskMeta); returnedRecordSize.set(returnedMeta); } if (!transformedMeta) transformedMeta.set(returnedMeta); if (properties.hasProp("@recordCount")) totalRows = properties.getPropInt64("@recordCount"); else totalRows = UNKNOWN_NUM_ROWS; // more: could probably count them isLocal = properties.hasProp("@local"); diskMeta->extractKeyedInfo(keyedOffsets, keyedTypes); ForEachItemIn(i, keyedTypes) { IStringSet * set = createRtlStringSet(fieldSize(i)); set->addAll(); values.append(*set); } fileposFieldType = diskMeta->queryType(diskMeta->numColumns()-1); assertex(fileposFieldType && fileposFieldType->isInteger()); //Now gather all the //Default cursor if no filter is applied applyFilter(); return true; } __int64 IndexDataSource::numRows(bool force) { if (!filtered) return totalRows; //If leading component isn't filtered, then this can take a very long time... if (!force && values.item(0).isFullSet()) return UNKNOWN_NUM_ROWS; __int64 total = 0; ForEachItemIn(i, matchingParts) { manager->setKey(NULL); curPart.clear(); if (singlePart) curPart.set(tlk); else { Owned kf = df->getPart(matchingParts.item(i)); curPart.setown(openKeyFile(kf)); if (!curPart) { total = UNKNOWN_NUM_ROWS; break; } } manager->setKey(curPart); manager->reset(); total += manager->getCount(); } manager->setKey(NULL); curPart.clear(); resetCursor(); return total; } bool IndexDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset) { if (cache.getRow(row, length, data)) return true; if (row < 0) return false; if ((unsigned __int64)row < nextRowToRead) resetCursor(); MemoryBuffer temp; while ((unsigned __int64)row >= nextRowToRead) { bool saveRow = !ignoreSkippedRows || (row == nextRowToRead); if (!getNextRow(temp.clear(), saveRow)) return false; if (saveRow) cache.addRow(nextRowToRead, temp.length(), temp.toByteArray()); nextRowToRead++; } return cache.getRow(row, length, data); } bool IndexDataSource::getNextRow(MemoryBuffer & out, bool extractRow) { bool nextPart = !matchingParts.isItem((unsigned)curPartIndex); loop { if (nextPart) { if ((curPartIndex != -1) && ((unsigned)curPartIndex >= matchingParts.ordinality())) return false; manager->setKey(NULL); curPart.clear(); if ((unsigned)++curPartIndex >= matchingParts.ordinality()) return false; if (singlePart) curPart.set(tlk); else { Owned kf = df->getPart(matchingParts.item(curPartIndex)); curPart.setown(openKeyFile(kf)); if (!curPart) return false; } manager->setKey(curPart); manager->reset(); } if (manager->lookup(true)) { if (extractRow) { if (false) { //MORE: Allow a transformer to cope with blobs etc. } else { unsigned fileposSize = fileposFieldType->getSize(); // unsigned thisSize = manager->queryRecordSize(); // Should be possible - needs a new function to call cursor->getSize() offset_t filepos; const byte * thisRow = manager->queryKeyBuffer(filepos); unsigned thisSize = diskMeta->getRecordSize(thisRow) - fileposSize; void * temp = out.reserve(thisSize); memcpy(temp, thisRow, thisSize); //Append the fileposition, in the correct size/endianness assertex(sizeof(filepos) >= 8); #if __BYTE_ORDER == __LITTLE_ENDIAN void * data = &filepos; #else void * data = (byte *)&filepos + sizeof(filepos) - fileposSize; #endif if (fileposFieldType->isSwappedEndian()) out.appendSwap(fileposSize, data); else out.append(fileposSize, data); } } return true; } nextPart = true; } } void IndexDataSource::resetCursor() { curPartIndex = -1; nextRowToRead = 0; } bool IndexDataSource::addFilter(unsigned column, unsigned matchLen, unsigned sizeData, const void * data) { if (!values.isItem(column)) return false; unsigned curSize = fieldSize(column); IStringSet & set = values.item(column); if (set.isFullSet()) set.reset(); ITypeInfo & cur = keyedTypes.item(column); rtlDataAttr tempLow, tempHigh; unsigned keyedSize = cur.getSize(); byte * temp = (byte *)alloca(keyedSize); const void * low = data; const void * high = data; type_t tc = cur.getTypeCode(); switch (tc) { case type_int: case type_swapint: { assertex(sizeData == curSize); // values are already converted to bigendian and correctly biased break; } case type_varstring: //should cast from string to varstring break; case type_string: case type_data: { const char * inbuff = (const char *)data; if (matchLen != FullStringMatch) { unsigned lenLow, lenHigh; rtlCreateRangeLow(lenLow, tempLow.refstr(), curSize, matchLen, sizeData, inbuff); rtlCreateRangeHigh(lenHigh, tempHigh.refstr(), curSize, matchLen, sizeData, inbuff); low = tempLow.getdata(); high = tempHigh.getdata(); } else { if (tc == type_string) { //may need to cast from ascii to ebcidic rtlStrToStr(curSize, temp, sizeData, data); } else rtlDataToData(curSize, temp, sizeData, data); low = high = temp; } break; } case type_qstring: { const char * inbuff = (const char *)data; unsigned lenData = rtlQStrLength(sizeData); unsigned lenField = cur.getStringLen(); if (matchLen != FullStringMatch) { unsigned lenLow, lenHigh; rtlCreateQStrRangeLow(lenLow, tempLow.refstr(), lenField, matchLen, lenData, inbuff); rtlCreateQStrRangeHigh(lenHigh, tempHigh.refstr(), lenField, matchLen, lenData, inbuff); low = tempLow.getdata(); high = tempHigh.getdata(); } else { rtlQStrToQStr(lenField, (char *)temp, lenData, (const char *)data); low = high = temp; } break; } default: assertex(sizeData == curSize); break; } set.addRange(low, high); filtered = true; return true; } void IndexDataSource::applyFilter() { manager.setown(createKeyManager(tlk, tlk->keySize(), NULL)); ForEachItemIn(i, values) { IStringSet & cur = values.item(i); bool extend = true; // almost certainly better manager->append(createKeySegmentMonitor(extend, LINK(&cur), keyedOffsets.item(i), fieldSize(i))); } manager->finishSegmentMonitors(); //Now work out which parts are affected. matchingParts.kill(); if (singlePart) matchingParts.append(0); else if (isLocal) { for (unsigned i = 0; i < numParts; i++) matchingParts.append(i); } else { manager->reset(); while (manager->lookup(false)) { offset_t node = manager->queryFpos(); if (node) matchingParts.append((unsigned)(node-1)); } } resetCursor(); }