/*##############################################################################
Copyright (C) 2011 HPCC Systems.
All rights reserved. This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as
published by the Free Software Foundation, either version 3 of the
License, or (at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
############################################################################## */
#include "platform.h"
#include "jliball.hpp"
#include "eclrtl.hpp"
#include "rtlds_imp.hpp"
#include "fvresultset.ipp"
#include "fileview.hpp"
#include "fvsource.ipp"
#include "hqlerror.hpp"
#include "eclhelper.hpp"
#include "hqlattr.hpp"
#include "hqlutil.hpp"
#include "fvdatasource.hpp"
#define MAX_RECORD_SIZE 4096
// Appends the leading element name of `xpath` to `s`, stopping at the first
// '*', '[' or '/' (or at the end of the string). On return `next` points at
// the first '/' found at or after the stop position, or is NULL if none.
inline void appendNextXpathName(StringBuffer &s, const char *xpath, const char *&next)
{
    const char *cursor = xpath;
    for (; *cursor; ++cursor)
    {
        if (strchr("*[/", *cursor))
            break;
        s.append(*cursor);
    }
    // `next` is only written once the scan is complete, so callers may pass
    // an expression based on `next` as `xpath`.
    next = strchr(cursor, '/');
}
void splitXmlTagNamesFromXPath(const char *xpath, StringBuffer &inner, StringBuffer *outer=NULL)
{
if (!xpath || !xpath)
return;
StringBuffer s1;
StringBuffer s2;
const char *next=xpath;
appendNextXpathName(s1, xpath, next);
if (outer && next)
appendNextXpathName(s2, ++next, next);
if (next) //xpath too deep
return;
if (!s2.length())
inner.swapWith(s1);
else
{
inner.swapWith(s2);
outer->swapWith(s1);
}
if (!inner.length())
inner.set("/");
if (outer && !outer->length())
outer->set("/");
}
// Builds a column description from the supplied flags, name, xpath and type,
// and derives the XML tag name from the xpath.
DataSourceMetaItem::DataSourceMetaItem(unsigned _flags, const char * _name, const char * _xpath, ITypeInfo * _type)
{
    flags = _flags;
    type.set(_type);
    name.set(_name);
    xpath.set(_xpath);
    // tagname is only filled in when _xpath is non-NULL and a single level deep.
    splitXmlTagNamesFromXPath(_xpath, tagname);
}
// Deserializing constructor. The flags have already been read by the caller;
// name, xpath and type are read from `in` in that order.
DataSourceMetaItem::DataSourceMetaItem(unsigned _flags, MemoryBuffer & in)
{
    flags = _flags;

    in.read(name);
    in.read(xpath);
    type.setown(deserializeType(in));

    // The tag name is not stored in the stream - rebuild it from the xpath.
    const char * storedXPath = xpath.get();
    splitXmlTagNamesFromXPath(storedXPath, tagname);
}
// Writes flags, name, xpath and type to `out`, in that order. Everything after
// the flags is read back by the MemoryBuffer constructor of this class.
void DataSourceMetaItem::serialize(MemoryBuffer & out) const
{
    out.append(flags);      // read separately by whoever reconstructs the item

    out.append(name);
    out.append(xpath);
    type->serialize(out);
}
//---------------------------------------------------------------------------
// Describes a child dataset column. The nested record metadata is built from
// expr->queryRecord(); the xpath supplies the enclosing tag (tagname) and
// the per-row tag (record.tagname), which defaults to "Row".
DataSourceDatasetItem::DataSourceDatasetItem(const char * _name, const char * _xpath, IHqlExpression * expr)
    : DataSourceMetaItem(FVFFdataset, NULL, NULL, NULL),
      record(expr->queryRecord(), 0, true, false, false)
{
    name.set(_name);
    xpath.set(_xpath);
    type.setown(makeTableType(NULL, NULL, NULL, NULL));

    splitXmlTagNamesFromXPath(_xpath, record.tagname, &tagname);
    if (record.tagname.length() == 0)
        record.tagname.set("Row");
}
// Deserializing constructor: the nested record metadata is read first,
// followed by the column name. The `flags` argument is not used; the item is
// always created with FVFFdataset.
DataSourceDatasetItem::DataSourceDatasetItem(unsigned flags, MemoryBuffer & in)
    : DataSourceMetaItem(FVFFdataset, NULL, NULL, NULL),
      record(in)
{
    in.read(name);
    type.setown(makeTableType(NULL, NULL, NULL, NULL));
}
// Writes the flags, then the nested record metadata, then the column name.
// The record and name are read back in that order by the MemoryBuffer
// constructor of this class.
void DataSourceDatasetItem::serialize(MemoryBuffer & out) const
{
    out.append(flags);

    record.serialize(out);
    out.append(name);
}
//---------------------------------------------------------------------------
// Describes a SET OF <type> column. A single-field child record is created
// for the elements; the xpath supplies the enclosing tag (tagname) and the
// per-element tag (record.tagname), which defaults to "Item".
DataSourceSetItem::DataSourceSetItem(const char * _name, const char * _xpath, ITypeInfo * _type) : DataSourceMetaItem(FVFFset, _name, NULL, _type)
{
    createChild();
    // The base class was given a NULL xpath, so the xpath is stored and split here.
    xpath.set(_xpath);
    splitXmlTagNamesFromXPath(_xpath, record.tagname, &tagname);
    if (!record.tagname.length())
        record.tagname.set("Item");
}
// Deserializing constructor: the base class reads name, xpath and type from
// `in`; the child record and the tag names are then rebuilt from them.
DataSourceSetItem::DataSourceSetItem(unsigned flags, MemoryBuffer & in) : DataSourceMetaItem(flags, in)
{
    createChild();
    // The base constructor already derived a single-level tagname; clear it
    // and split again so that the outer/inner pair is extracted.
    splitXmlTagNamesFromXPath(xpath.get(), record.tagname, &tagname.clear());
    if (!record.tagname.length())
        record.tagname.set("Item");
}
void DataSourceSetItem::createChild()
{
ITypeInfo * childType = type->queryChildType()->queryPromotedType();
record.addSimpleField("Item", NULL, childType);
}
// Serializes a set column.
//
// The deserializing constructor DataSourceSetItem(flags, MemoryBuffer &)
// delegates to DataSourceMetaItem(flags, MemoryBuffer &), which reads name,
// xpath and type, and then rebuilds the child record via createChild().
// The stream must therefore use the base class layout. Writing the child
// record followed by the name (as was done previously) cannot be read back
// by that constructor.
void DataSourceSetItem::serialize(MemoryBuffer & out) const
{
    DataSourceMetaItem::serialize(out);
}
//---------------------------------------------------------------------------
// Builds the metadata for a record definition.
//   _record             record expression whose fields are gathered
//   _numFieldsToIgnore  number of trailing fields excluded from numColumns()
//   _randomIsOk         value reported by supportsRandomSeek()
//   _isGrouped          if set, a boolean "__groupfollows__" field is appended
//   _keyedSize          size in bytes of the keyed portion of the record
DataSourceMetaData::DataSourceMetaData(IHqlExpression * _record, byte _numFieldsToIgnore, bool _randomIsOk, bool _isGrouped, unsigned _keyedSize)
{
    init();
    numFieldsToIgnore = _numFieldsToIgnore;
    randomIsOk = _randomIsOk;
    // Assume fixed width until gatherFields()/addSimpleField() finds otherwise.
    isStoredFixedWidth = true;
    //MORE: Blobs aren't handled correctly in indexes....
    maxRecordSize = ::getMaxRecordSize(_record, MAX_RECORD_SIZE);
    keyedSize = _keyedSize;
    gatherFields(_record, false);
    if (_isGrouped)
    {
        // Hold the new type in an owning pointer, as elsewhere in this file;
        // addSimpleField() takes its own link.
        OwnedITypeInfo type = makeBoolType();
        addSimpleField("__groupfollows__", NULL, type);
        maxRecordSize++;
    }
    // For a fixed width record the accumulated size must match the maximum size.
    if (isStoredFixedWidth)
        assertex(storedFixedSize == maxRecordSize);
}
// Creates empty metadata: no fields, fixed width (size 0), random access allowed.
DataSourceMetaData::DataSourceMetaData()
{
    init();
    isStoredFixedWidth = true;
    randomIsOk = true;
}
// Creates metadata with a single unknown-length column called "line".
// The column is unicode when typeCode is type_unicode, otherwise a string.
DataSourceMetaData::DataSourceMetaData(type_t typeCode)
{
    init();

    OwnedITypeInfo lineType;
    if (typeCode != type_unicode)
        lineType.setown(makeStringType(UNKNOWN_LENGTH, NULL, NULL));
    else
        lineType.setown(makeUnicodeType(UNKNOWN_LENGTH, 0));

    fields.append(*new DataSourceMetaItem(FVFFnone, "line", NULL, lineType));
}
// Resets every scalar member to its zero/false default.
void DataSourceMetaData::init()
{
    // sizes and counters
    storedFixedSize = 0;
    maxRecordSize = 0;
    keyedSize = 0;
    bitsRemaining = 0;
    numVirtualFields = 0;
    numFieldsToIgnore = 0;

    // flags
    isStoredFixedWidth = false;
    randomIsOk = false;
    attrset = false;
}
// Deserializing constructor - the inverse of DataSourceMetaData::serialize().
// Reads the scalar settings, then the field count, then each field. Each
// field starts with its flags byte, which selects the item class to create.
DataSourceMetaData::DataSourceMetaData(MemoryBuffer & buffer)
{
    // Start from the defaults so that members which are not part of the
    // stream (bitsRemaining, attrset, numVirtualFields) are initialised.
    init();
    buffer.read(numFieldsToIgnore);
    buffer.read(randomIsOk);
    buffer.read(isStoredFixedWidth);
    buffer.read(storedFixedSize);
    buffer.read(keyedSize);
    buffer.read(maxRecordSize);
    unsigned numFields;
    buffer.read(numFields);
    for (unsigned idx=0; idx < numFields; idx++)
    {
        byte flags;
        buffer.read(flags);
        if (flags == FVFFdataset)
            fields.append(*new DataSourceDatasetItem(flags, buffer));
        else if (flags == FVFFset)
            fields.append(*new DataSourceSetItem(flags, buffer));
        else
            fields.append(*new DataSourceMetaItem(flags, buffer));
        // The virtual field count is not stored - recompute it.
        if (flags == FVFFvirtual)
            ++numVirtualFields;
    }
}
void DataSourceMetaData::addFileposition()
{
addVirtualField("__fileposition__", NULL, makeIntType(8, false));
}
// Appends a plain (FVFFnone) column and updates the fixed-size bookkeeping:
//  - an unknown-length type clears isStoredFixedWidth;
//  - a bitfield takes bits from the current container, starting a new
//    container (and adding its size) when not enough bits remain;
//  - any other type adds the size of its promoted type.
void DataSourceMetaData::addSimpleField(const char * name, const char * xpath, ITypeInfo * type)
{
    const unsigned promotedSize = type->queryPromotedType()->getSize();
    unsigned bitsUsed = 0;

    if (promotedSize == UNKNOWN_LENGTH)
    {
        isStoredFixedWidth = false;
    }
    else if (type->getTypeCode() != type_bitfield)
    {
        storedFixedSize += promotedSize;
    }
    else
    {
        bitsUsed = type->getBitSize();
        if (bitsUsed > bitsRemaining)
        {
            const unsigned containerSize = type->queryChildType()->getSize();
            storedFixedSize += containerSize;
            bitsRemaining = containerSize * 8;
        }
        bitsRemaining -= bitsUsed;
    }

    // A field that consumed no bits ends any run of bitfields.
    if (bitsUsed == 0)
        bitsRemaining = 0;

    fields.append(*new DataSourceMetaItem(FVFFnone, name, xpath, type));
}
void DataSourceMetaData::addVirtualField(const char * name, const char * xpath, ITypeInfo * type)
{
fields.append(*new DataSourceMetaItem(FVFFvirtual, name, xpath, type));
++numVirtualFields;
}
// Collects the offset and type of every keyed field.
// `offsets` receives one entry per keyed field plus a final entry holding the
// total keyed size; `types` receives a linked type per keyed field.
// Asserts that the fields exactly cover keyedSize and that every keyed field
// has a known size; throws on any item kind other than a plain field or a
// record begin/end marker.
void DataSourceMetaData::extractKeyedInfo(UnsignedArray & offsets, TypeInfoArray & types)
{
    unsigned keyedOffset = 0;
    ForEachItemIn(i, fields)
    {
        if (keyedOffset >= keyedSize)
            break;

        DataSourceMetaItem & field = fields.item(i);
        if (field.flags == FVFFnone)
        {
            offsets.append(keyedOffset);
            types.append(*LINK(field.type));
            unsigned fieldSize = field.type->getSize();
            assertex(fieldSize != UNKNOWN_LENGTH);
            keyedOffset += fieldSize;
        }
        else if ((field.flags != FVFFbeginrecord) && (field.flags != FVFFendrecord))
        {
            throwUnexpected();
        }
        // record begin/end markers occupy no space
    }
    offsets.append(keyedOffset);
    assertex(keyedOffset == keyedSize);
}
//MORE: Really this should create no_selects for the sub records, but pass on that for the moment.
// Adds the column(s) described by `expr` to `fields`.
//  - no_record:  recurses into each child.
//  - no_ifblock: emits FVFFbeginif/FVFFendif markers around the fields of
//                child 1 and marks the record as variable width.
//  - no_field:   emits a nested record (with begin/end markers), a dataset
//                item, a set item or a simple field, depending on the type.
// Other operators are ignored. `isConditional` is passed down unchanged,
// except that the contents of an ifblock are always gathered as conditional.
void DataSourceMetaData::gatherFields(IHqlExpression * expr, bool isConditional)
{
    switch (expr->getOperator())
    {
    case no_record:
        gatherChildFields(expr, isConditional);
        break;
    case no_ifblock:
        {
            OwnedITypeInfo boolType = makeBoolType();
            OwnedITypeInfo voidType = makeVoidType();
            isStoredFixedWidth = false;
            fields.append(*new DataSourceMetaItem(FVFFbeginif, NULL, NULL, boolType));
            gatherChildFields(expr->queryChild(1), true);
            fields.append(*new DataSourceMetaItem(FVFFendif, NULL, NULL, voidType));
            break;
        }
    case no_field:
        {
            if (expr->hasProperty(__ifblockAtom))
                break;
            // Linked (not Owned): queryType() does not hand over a reference,
            // but the type may be replaced below via setown()/set().
            Linked<ITypeInfo> type = expr->queryType();
            IAtom * name = expr->queryName();
            // A NAMED(...) attribute with a constant value overrides the
            // lower-cased field name.
            IHqlExpression * nameAttr = expr->queryProperty(namedAtom);
            StringBuffer outname;
            if (nameAttr && nameAttr->queryChild(0)->queryValue())
                nameAttr->queryChild(0)->queryValue()->getStringValue(outname);
            else
                outname.append(name).toLowerCase();
            StringBuffer xpathtext;
            const char * xpath = NULL;
            IHqlExpression * xpathAttr = expr->queryProperty(xpathAtom);
            if (xpathAttr && xpathAttr->queryChild(0)->queryValue())
                xpath = xpathAttr->queryChild(0)->queryValue()->getStringValue(xpathtext);
            // Blob fields in a key are treated as 8 byte unsigned integers.
            if (isKey() && expr->hasProperty(blobAtom))
                type.setown(makeIntType(8, false));
            type_t tc = type->getTypeCode();
            if (tc == type_row)
            {
                OwnedITypeInfo voidType = makeVoidType();
                fields.append(*new DataSourceMetaItem(FVFFbeginrecord, outname, xpath, voidType));
                gatherChildFields(expr->queryRecord(), isConditional);
                fields.append(*new DataSourceMetaItem(FVFFendrecord, outname, xpath, voidType));
            }
            else if ((tc == type_table) || (tc == type_groupedtable))
            {
                isStoredFixedWidth = false;
                fields.append(*new DataSourceDatasetItem(outname, xpath, expr));
            }
            else if (tc == type_set)
            {
                isStoredFixedWidth = false;
                fields.append(*new DataSourceSetItem(outname, xpath, type));
            }
            else
            {
                // Alien types are described by their physical type.
                if (tc == type_alien)
                {
                    IHqlAlienTypeInfo * alien = queryAlienType(type);
                    type.set(alien->queryPhysicalType());
                }
                addSimpleField(outname, xpath, type);
            }
            break;
        }
    }
}
// Gathers the fields of every child of `expr`. Bitfield packing never
// continues across a record boundary, so the remaining bit count is reset first.
void DataSourceMetaData::gatherChildFields(IHqlExpression * expr, bool isConditional)
{
    bitsRemaining = 0;
    ForEachChild(idx, expr)
    {
        IHqlExpression * child = expr->queryChild(idx);
        gatherFields(child, isConditional);
    }
}
// Returns the number of leading columns that make up the keyed portion of
// the record (the first keyedSize bytes).
// Asserts that every keyed column has a known size and that the columns
// exactly cover keyedSize; throws if a keyed column is not a plain field.
unsigned DataSourceMetaData::numKeyedColumns() const
{
    unsigned count = 0;
    unsigned curOffset = 0;
    ForEachItemIn(i, fields)
    {
        if (curOffset >= keyedSize)
            break;
        DataSourceMetaItem & cur = fields.item(i);
        switch (cur.flags)
        {
        case FVFFnone:
            {
                unsigned size = cur.type->getSize();
                assertex(size != UNKNOWN_LENGTH);
                curOffset += size;
                count++;
                break;
            }
        default:
            throwUnexpected();
        }
    }
    // This check used to sit after the return statement and so never ran.
    // extractKeyedInfo() enforces the same invariant.
    assertex(curOffset == keyedSize);
    return count;
}
// Number of visible columns: all fields except the ignored ones.
unsigned DataSourceMetaData::numColumns() const
{
    const unsigned total = fields.ordinality();
    return total - numFieldsToIgnore;
}
// Returns the type of the given column (no bounds check is made here).
ITypeInfo * DataSourceMetaData::queryType(unsigned column) const
{
    DataSourceMetaItem & item = fields.item(column);
    return item.type;
}
unsigned DataSourceMetaData::queryFieldFlags(unsigned column) const
{
return fields.item(column).flags;
}
const char * DataSourceMetaData::queryName(unsigned column) const
{
return fields.item(column).name;
}
const char * DataSourceMetaData::queryXPath(unsigned column) const
{
return fields.item(column).xpath;
}
// Returns the XML tag for a column.
//  - no tag name derived from an xpath: the column name;
//  - tag name starting with '/' (the "empty name" marker): NULL;
//  - otherwise the tag name.
const char * DataSourceMetaData::queryXmlTag(unsigned column) const
{
    DataSourceMetaItem &item = fields.item(column);
    if (!item.tagname.length())
        return item.name.get();

    const char * tag = item.tagname.str();
    if (*tag == '/')
        return NULL;
    return tag;
}
// Returns the XML tag used for each row of this record: "Row" when no tag
// name has been set, NULL when the tag name starts with '/' (the "empty name"
// marker), otherwise the tag name.
const char *DataSourceMetaData::queryXmlTag() const
{
    if (!tagname.length())
        return "Row";

    const char * tag = tagname.str();
    if (*tag == '/')
        return NULL;
    return tag;
}
// Returns the indexes of all columns whose tag name begins with '@'.
// The list is built on the first call and reused afterwards.
const IntArray &DataSourceMetaData::queryAttrList()
{
    if (attrset)
        return attributes;

    ForEachItemIn(idx, fields)
    {
        const char * tag = fields.item(idx).tagname.str();
        if (*tag == '@')
            attributes.append(idx);
    }
    attrset = true;
    return attributes;
}
// Returns whatever the column's item reports as its child metadata.
IFvDataSourceMetaData * DataSourceMetaData::queryChildMeta(unsigned column) const
{
    DataSourceMetaItem & item = fields.item(column);
    return item.queryChildMeta();
}
// Creates a data source over `len` bytes of child data for the given column.
// Returns NULL if the column has no child metadata.
IFvDataSource * DataSourceMetaData::createChildDataSource(unsigned column, unsigned len, const void * data)
{
    DataSourceMetaData * nestedMeta = fields.item(column).queryChildMeta();
    if (!nestedMeta)
        return NULL;
    return new NestedDataSource(*nestedMeta, len, data);
}
// Reports the randomIsOk setting given at construction or deserialization.
bool DataSourceMetaData::supportsRandomSeek() const
{
    const bool canSeek = randomIsOk;
    return canSeek;
}
// Writes the metadata to `buffer`: the scalar settings, the field count, then
// every field in order. The MemoryBuffer constructor reads the same layout.
void DataSourceMetaData::serialize(MemoryBuffer & buffer) const
{
    //NB: Update NullDataSourceMeta if this changes....
    buffer.append(numFieldsToIgnore);
    buffer.append(randomIsOk);
    buffer.append(isStoredFixedWidth);
    buffer.append(storedFixedSize);
    buffer.append(keyedSize);
    buffer.append(maxRecordSize);

    // The count is written as an unsigned to match the reader.
    unsigned numFields = fields.ordinality();
    buffer.append(numFields);
    ForEachItemIn(idx, fields)
        fields.item(idx).serialize(buffer);
}
// Returns the size in bytes of the row at `rec`.
//  - Fixed width records: storedFixedSize, whatever `rec` is.
//  - rec == NULL: maxRecordSize.
//  - Otherwise the row is walked field by field (virtual fields excluded),
//    adding each field's size; variable length fields are sized from the data.
// An unknown-length type not handled in the switch hits UNIMPLEMENTED.
size32_t DataSourceMetaData::getRecordSize(const void *rec)
{
if (isStoredFixedWidth)
return storedFixedSize;
if (!rec)
return maxRecordSize;
const byte * data = (const byte *)rec;
unsigned curOffset = 0;
// Local bit counter - deliberately separate from the member of the same
// name, which tracks state while fields are being added.
unsigned bitsRemaining = 0;
// NOTE(review): assumes the virtual fields are the last entries in `fields` - confirm.
unsigned max = fields.ordinality() - numVirtualFields;
for (unsigned idx=0; idx < max; idx++)
{
ITypeInfo & type = *fields.item(idx).type;
unsigned size = type.getSize();
if (size == UNKNOWN_LENGTH)
{
// Variable length field: work out the size from the data at the current offset.
const byte * cur = data + curOffset;
switch (type.getTypeCode())
{
case type_data:
case type_string:
case type_table:
case type_groupedtable:
// unsigned byte count followed by the data
size = *((unsigned *)cur) + sizeof(unsigned);
break;
case type_set:
// bool, then an unsigned byte count, then the data
size = *((unsigned *)(cur + sizeof(bool))) + sizeof(unsigned) + sizeof(bool);
break;
case type_qstring:
// unsigned length, converted to a byte count by rtlQStrSize
size = rtlQStrSize(*((unsigned *)cur)) + sizeof(unsigned);
break;
case type_unicode:
// unsigned length, two bytes per unit
size = *((unsigned *)cur)*2 + sizeof(unsigned);
break;
case type_utf8:
// unsigned length; the byte count is derived from the data by rtlUtf8Size
size = sizeof(unsigned) + rtlUtf8Size(*(unsigned *)cur, cur+sizeof(unsigned));
break;
case type_varstring:
// null terminated - include the terminator
size = strlen((char *)cur)+1;
break;
case type_varunicode:
// null terminated, two bytes per unit - include the terminator
size = (rtlUnicodeStrlen((UChar *)cur)+1)*2;
break;
case type_packedint:
size = rtlGetPackedSize(cur);
break;
default:
UNIMPLEMENTED;
}
}
if (type.getTypeCode() == type_bitfield)
{
// Consecutive bitfields share a container: only the bitfield that starts
// a new container contributes its size, the rest contribute nothing.
unsigned thisBits = type.getBitSize();
if (thisBits > bitsRemaining)
{
size = type.queryChildType()->getSize();
bitsRemaining = size * 8;
}
else
size = 0;
bitsRemaining -= thisBits;
}
else
bitsRemaining = 0;
curOffset += size;
}
return curOffset;
}
// Returns the record size for fixed width records, or 0 for variable width.
size32_t DataSourceMetaData::getFixedSize() const
{
    if (!isStoredFixedWidth)
        return 0;
    return storedFixedSize;
}
// Recreates metadata previously written by DataSourceMetaData::serialize().
// The caller owns the returned object.
IFvDataSourceMetaData * deserializeDataSourceMeta(MemoryBuffer & in)
{
    DataSourceMetaData * meta = new DataSourceMetaData(in);
    return meta;
}
//---------------------------------------------------------------------------
// Creates a block starting at row `_start` / offset `_startOffset`, taking
// over the contents of `_buffer` (the buffers are swapped). The row count is
// left at zero for the derived class to fill in.
RowBlock::RowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset)
{
    start = _start;
    startOffset = _startOffset;
    numRows = 0;
    buffer.swapWith(_buffer);
}
// Creates an empty block positioned at row `_start` / offset `_startOffset`.
RowBlock::RowBlock(__int64 _start, __int64 _startOffset)
{
    numRows = 0;
    start = _start;
    startOffset = _startOffset;
}
// Reports where the data following this block begins: the row after the last
// one held, and the offset just past the end of the buffer.
void RowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
{
    offset = startOffset + buffer.length();
    row = getNextRow();
}
// Block of fixed size rows. The row count is the number of whole records in
// the buffer. A record size of 0 is treated as 1 to avoid dividing by zero.
FixedRowBlock::FixedRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, size32_t _fixedRecordSize) : RowBlock(_buffer, _start, _startOffset)
{
    fixedRecordSize = (_fixedRecordSize == 0) ? 1 : _fixedRecordSize;
    numRows = buffer.length() / fixedRecordSize;
}
// Returns the row stored at the given offset, or NULL if the offset lies
// outside this block. `len` is only set on success.
const void * FixedRowBlock::fetchRow(__int64 offset, size32_t & len)
{
    __int64 endOffset = startOffset + buffer.length();
    bool inBlock = (offset >= startOffset) && (offset < endOffset);
    if (!inBlock)
        return NULL;

    len = fixedRecordSize;
    return buffer.toByteArray() + (offset - startOffset);
}
// Returns row number `row`, or NULL if this block does not hold it.
// On success `len` is the record size and `rowOffset` the row's offset.
const void * FixedRowBlock::getRow(__int64 row, size32_t & len, unsigned __int64 & rowOffset)
{
    if (row < start || row >= start + numRows)
        return NULL;

    unsigned rowInBlock = (unsigned)(row - start);
    unsigned byteOffset = rowInBlock * fixedRecordSize;
    rowOffset = startOffset + byteOffset;
    len = fixedRecordSize;
    return buffer.toByteArray() + byteOffset;
}
// Block of variable size rows. The buffer is scanned once to record the start
// of each row in rowIndex; a final entry marks the end of the last row.
// Unless this is the last block, rows are only accepted while they start more
// than recordSize->getRecordSize(NULL) bytes before the end of the buffer.
// The buffer is truncated at the end of the last accepted row.
VariableRowBlock::VariableRowBlock(MemoryBuffer & _buffer, __int64 _start, __int64 _startOffset, IRecordSizeEx * recordSize, bool isLast) : RowBlock(_buffer, _start, _startOffset)
{
    const char * buff = buffer.toByteArray();
    unsigned cur = 0;
    unsigned max = buffer.length();
    unsigned maxCur = max;
    if (!isLast)
    {
        // Guard the subtraction: if the buffer is shorter than the reserved
        // size the unsigned result would wrap and the scan would run off the
        // end of the buffer. In that case no rows are accepted.
        unsigned reserved = recordSize->getRecordSize(NULL);
        maxCur = (max > reserved) ? (max - reserved) : 0;
    }
    while (cur < maxCur)
    {
        rowIndex.append(cur);
        cur += recordSize->getRecordSize(max-cur, buff + cur);
    }
    buffer.setLength(cur);
    rowIndex.append(cur);
    numRows = rowIndex.ordinality()-1;
}
// Builds a block from a serialized stream: a row count followed, for each
// row, by its length and data. Only the row data is copied into the buffer;
// rowIndex records where each row starts, plus a final end marker.
VariableRowBlock::VariableRowBlock(MemoryBuffer & inBuffer, __int64 _start) : RowBlock(_start, 0)
{
    inBuffer.read(numRows);
    unsigned remaining = numRows;
    while (remaining--)
    {
        rowIndex.append(buffer.length());
        unsigned rowLength;
        inBuffer.read(rowLength);
        buffer.append(rowLength, inBuffer.readDirect(rowLength));
    }
    rowIndex.append(buffer.length());
}
// Returns the row that starts exactly at the given offset, or NULL if the
// offset is outside this block or is not the start of a row.
// `len` is only set on success.
const void * VariableRowBlock::fetchRow(__int64 offset, size32_t & len)
{
    __int64 endOffset = startOffset + buffer.length();
    bool inBlock = (offset >= startOffset) && (offset < endOffset);
    if (!inBlock)
        return NULL;

    size32_t offsetInBlock = (size32_t)(offset - startOffset);
    unsigned indexPos = rowIndex.find(offsetInBlock);
    if (pos_not_found(indexPos))
        return NULL;

    // The next index entry is the end of this row.
    len = rowIndex.item(indexPos+1) - offsetInBlock;
    return buffer.toByteArray() + offsetInBlock;
}
// Returns row number `row`, or NULL if this block does not hold it.
// On success `len` is the row length and `rowOffset` the row's offset.
const void * VariableRowBlock::getRow(__int64 row, size32_t & len, unsigned __int64 & rowOffset)
{
    if (row < start || row >= start + numRows)
        return NULL;

    unsigned rowInBlock = (unsigned)(row - start);
    unsigned rowBegin = rowIndex.item(rowInBlock);
    unsigned rowEnd = rowIndex.item(rowInBlock+1);
    len = rowEnd - rowBegin;
    rowOffset = startOffset + rowBegin;
    return buffer.toByteArray() + rowBegin;
}
//---------------------------------------------------------------------------
// Reads the trailer of a buffer - an offset_t immediately followed by an
// unsigned short at the very end - and returns their sum.
// NOTE(review): presumably the offset of the last row plus its size, i.e.
// the offset of the row that follows - confirm against the writer.
// `len` must be at least sizeof(offset_t) + sizeof(unsigned short).
offset_t calculateNextOffset(const char * data, unsigned len)
{
    // The trailer can sit at any alignment, so copy the values out rather
    // than dereferencing cast pointers.
    offset_t offset;
    unsigned short size;
    const char * sizePos = data + len - sizeof(size);
    memcpy(&size, sizePos, sizeof(size));
    memcpy(&offset, sizePos - sizeof(offset), sizeof(offset));
    return offset + size;
}
// The next row number follows the rows held here; the next offset is taken
// from the trailer stored at the end of the buffer.
void FilePosFixedRowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
{
    const char * blockData = buffer.toByteArray();
    offset = calculateNextOffset(blockData, buffer.length());
    row = start + numRows;
}
// The next row number follows the rows held here; the next offset is taken
// from the trailer stored at the end of the buffer.
void FilePosVariableRowBlock::getNextStoredOffset(__int64 & row, offset_t & offset)
{
    const char * blockData = buffer.toByteArray();
    offset = calculateNextOffset(blockData, buffer.length());
    row = start + numRows;
}
//---------------------------------------------------------------------------
// Counter used to stamp cache blocks; a larger value means more recently used.
static int rowCacheId;

// Adds a block to the cache, keeping allRows ordered by start row and `ages`
// parallel to it. Room is made first if the cache is full.
void RowCache::addRowsOwn(RowBlock * rows)
{
    if (allRows.ordinality() == MaxBlocksCached)
        makeRoom();

    const unsigned insertAt = getInsertPosition(rows->getStartRow());
    allRows.add(*rows, insertAt);
    ages.add(++rowCacheId, insertAt);
}
// Looks up `row` in the cache.
// Returns true with location.matchRow/matchLength/bestOffset describing the
// row when it is cached. Otherwise returns false; if the cache is not empty,
// location.bestRow/bestOffset may be updated with a position to read from.
bool RowCache::getCacheRow(__int64 row, RowLocation & location)
{
    if (allRows.ordinality() == 0)
        return false;

    const RowBlock & firstBlock = allRows.item(0);
    if (row < firstBlock.getStartRow())
    {
        // Nothing cached at or before the row - start from the beginning.
        location.bestRow = 0;
        location.bestOffset = 0;
        return false;
    }

    const unsigned blockIndex = getBestRow(row);
    ages.replace(++rowCacheId, blockIndex);     // mark the block as just used
    RowBlock & block = allRows.item(blockIndex);
    location.matchRow = block.getRow(row, location.matchLength, location.bestOffset);
    if (location.matchRow)
        return true;

    // Not in the block - but the end of this block may be a better position
    // than the one already known.
    if (location.bestRow < block.getNextRow())
        block.getNextStoredOffset(location.bestRow, location.bestOffset);
    return false;
}
//Find the rowBlock that contains the expected row.
//Return the *previous* RowBlock if no match is found.
//Binary search for the last block whose start row is <= row; the caller must
//ensure the cache is not empty and that row is not before the first block.
unsigned RowCache::getBestRow(__int64 row)
{
    unsigned max = allRows.ordinality();
    int low = 0;
    int high = max;
    while (high - low > 1)
    {
        int middle = (low + high) / 2;
        if (row < allRows.item(middle).getStartRow())
            high = middle;
        else
            low = middle;
    }
    assertex(row >= allRows.item(low).getStartRow());
    assertex(low != max);
    return low;
}
// Returns the index at which a block starting at `row` should be inserted to
// keep allRows ordered by start row (after any block with the same start row).
unsigned RowCache::getInsertPosition(__int64 row)
{
    unsigned max = allRows.ordinality();
    int low = 0;
    int high = max;
    while (high - low > 1)
    {
        int middle = (low + high) / 2;
        if (row < allRows.item(middle).getStartRow())
            high = middle;
        else
            low = middle;
    }
    // `low` is now the last candidate block; insert after it unless the new
    // block comes before it (only possible for the very first block).
    if ((low != max) && (row >= allRows.item(low).getStartRow()))
        low++;
    assertex(low == max || (row < allRows.item(low).getStartRow()));
    return low;
}
// Shrinks the cache to MinBlocksCached blocks by discarding the blocks with
// the smallest age stamps. Stamps come from the increasing rowCacheId counter
// and are refreshed on each lookup, so the least recently used blocks go first.
void RowCache::makeRoom()
{
#if 0
//For testing the caching by throwing away random blocks
while (allRows.ordinality() > MinBlocksCached)
{
unsigned index = getRandom() % allRows.ordinality();
allRows.remove(index);
ages.remove(index);
}
return;
#endif
// NOTE(review): assumes ordinality() >= MinBlocksCached, otherwise the
// unsigned subtraction wraps - confirm. The only caller visible here calls
// this when the cache holds MaxBlocksCached blocks.
unsigned numToFree = allRows.ordinality() - MinBlocksCached;
// Parallel arrays holding the (up to) numToFree oldest blocks found so far,
// kept sorted by ascending age.
RowBlock * * oldestRow = new RowBlock * [numToFree];
__int64 * oldestAge = new __int64[numToFree];
unsigned numGot = 0;
ForEachItemIn(idx, ages)
{
__int64 curAge = ages.item(idx);
// Search backwards for the position at which this age belongs.
unsigned compare = numGot;
while (compare != 0)
{
if (curAge >= oldestAge[compare-1])
break;
compare--;
}
// Only relevant if it falls within the numToFree oldest.
if (compare < numToFree)
{
// Shift the later entries up by one; when the arrays are already full the
// last entry drops off the end.
unsigned copySize = numGot - compare;
if (numGot == numToFree)
copySize--; //NB: Cannot go negative because compare < numToFree, numToFree == numGot => compare < numGot
if (copySize)
{
memmove(oldestAge + compare + 1, oldestAge + compare, copySize * sizeof(*oldestAge));
memmove(oldestRow + compare + 1, oldestRow + compare, copySize * sizeof(*oldestRow));
}
oldestAge[compare] = curAge;
oldestRow[compare] = &allRows.item(idx);
if (numGot != numToFree)
numGot++;
}
}
// Remove every block that was selected. The index only advances when the
// current block is kept, because removal shifts the later blocks down.
unsigned max = allRows.ordinality();
unsigned i;
for (i = 0; i < max; )
{
for (unsigned j = 0; j < numGot; j++)
{
if (oldestRow[j] == &allRows.item(i))
{
allRows.remove(i);
ages.remove(i);
max--;
goto testNext;
}
}
i++;
testNext: ;
}
delete [] oldestRow;
delete [] oldestAge;
}
//---------------------------------------------------------------------------
// Starts with no transformer, no extra fields, nothing open and no
// fileposition column.
FVDataSource::FVDataSource()
{
    openCount = 0;
    extraFieldsSize = 0;
    appendFileposition = false;
    transformer = NULL;
}
FVDataSource::~FVDataSource()
{
    // returnedRecordSize may have been created by code in the loaded dll, so
    // release it explicitly here -
    //ensure all refs into the dll are cleared before it is unloaded
    returnedRecordSize.clear();
}
// Requests that each returned row is followed by its offset, and adds the
// matching column to the transformed metadata.
void FVDataSource::addFileposition()
{
    transformedMeta->addFileposition();     // This may modify the other metas as well!
    appendFileposition = true;
}
// Appends a row to `out`. Without a transformer the `length` source bytes are
// copied as they are; with one, the row is transformed into space reserved at
// the end of `out`, which is then trimmed to the size actually produced.
void FVDataSource::copyRow(MemoryBuffer & out, const void * src, size32_t length)
{
    if (!transformer)
    {
        out.append(length, src);
        return;
    }

    const unsigned startLength = out.length();
    const size32_t reservedSize = transformedMeta->getMaxRecordSize();
    void * target = out.reserve(reservedSize);
    //MORE: won't cope with dynamic maxlengths
    RtlStaticRowBuilder rowBuilder(target, reservedSize);
    const unsigned written = transformer(rowBuilder, (const byte *)src);
    out.rewrite(startLength + written);
}
// Fetches the row stored at `offset`, transformed if a transformer is set,
// and appends the offset when a fileposition column was requested.
// Returns false if the row could not be read.
bool FVDataSource::fetchRow(MemoryBuffer & out, __int64 offset)
{
    if (!transformer)
        return fetchRawRow(out, offset);

    MemoryBuffer rawRow;
    if (!fetchRowData(rawRow, offset))
        return false;

    copyRow(out, rawRow.toByteArray(), rawRow.length());
    if (appendFileposition)
        out.append(offset);
    return true;
}
// Fetches the untransformed row stored at `offset` and appends the offset when
// a fileposition column was requested. Returns false if the row could not be read.
bool FVDataSource::fetchRawRow(MemoryBuffer & out, __int64 offset)
{
    if (!fetchRowData(out, offset))
        return false;

    if (appendFileposition)
        out.append(offset);
    return true;
}
// Appends row number `row` to `out` (transformed if a transformer is set),
// followed by its offset when a fileposition column was requested.
// Returns false if the row is not available.
bool FVDataSource::getRow(MemoryBuffer & out, __int64 row)
{
    size32_t rowLength;
    const void * rowData;
    unsigned __int64 rowOffset = 0;
    if (!getRowData(row, rowLength, rowData, rowOffset))
        return false;

    copyRow(out, rowData, rowLength);
    if (appendFileposition)
        out.append(rowOffset);
    return true;
}
// Appends the untransformed row number `row` to `out`, leaving off the last
// extraFieldsSize bytes, followed by its offset when a fileposition column was
// requested. Returns false if the row is not available.
bool FVDataSource::getRawRow(MemoryBuffer & out, __int64 row)
{
    size32_t rowLength;
    const void * rowData;
    unsigned __int64 rowOffset = 0;
    if (!getRowData(row, rowLength, rowData, rowOffset))
        return false;

    out.append(rowLength-extraFieldsSize, rowData);
    if (appendFileposition)
        out.append(rowOffset);
    return true;
}
// Loads the query dll of workunit `wuid` into loadedDll: opens the workunit,
// asks its query for the dll name and loads that dll via the dll server.
// NOTE(review): the `Owned` declarations below appear to have lost their
// template arguments (Owned<...>) - confirm against the original source.
void FVDataSource::loadDll(const char * wuid)
{
//MORE: This code should be commoned up and available in the work unit code or something...
Owned factory = getWorkUnitFactory();
Owned wu = factory->openWorkUnit(wuid, false);
//Plugins should already be loaded when they were registered with the ViewTransformerRegistry
//Something like the following code could be used to check the plugin version...
#if 0
Owned plugins = &wu->getPlugins();
SafePluginMap * map = NULL;
ForEach(*plugins)
{
IConstWUPlugin &thisplugin = plugins->query();
SCMStringBuffer name, version;
thisplugin.getPluginName(name);
thisplugin.getPluginVersion(version);
Owned loadedPlugin = map->getPluginDll(name.str(), version.str(), true);
if (!loadedPlugin)
break;
}
#endif
Owned q = wu->getQuery();
SCMStringBuffer dllname;
q->getQueryDllName(dllname);
// MORE - leaks....
// DllLocationAnywhere: no restriction is placed on where the dll is loaded from.
loadedDll.setown(queryDllServer().loadDll(dllname.str(), DllLocationAnywhere));
}
// Returns the metadata describing the rows as handed to the caller (i.e.
// after any transformation). No reference is added.
IFvDataSourceMetaData * FVDataSource::queryMetaData()
{
    IFvDataSourceMetaData * meta = transformedMeta;
    return meta;
}
bool FVDataSource::setReturnedInfoFromResult()
{
SCMStringBuffer s;
wuResult->getResultEclSchema(s);
returnedRecord.setown(parseQuery(s.str()));
if (!returnedRecord)
throw MakeStringException(ERR_FILEVIEW_FIRST+4, "Could not process result schema [%s]", s.str());
bool isKey = false;
bool isGrouped = false; // this isn't strictly true...it could be true for an internal result, but no current flag to test
returnedMeta.setown(new DataSourceMetaData(returnedRecord, 0, true, isGrouped, 0));
transformedRecord.setown(getSimplifiedRecord(returnedRecord, isKey));
if (!transformedRecord)
{
//No transformations needed, so don't need to loaded the dll containing the transform functions etc.
transformedRecord.set(returnedRecord);
returnedRecordSize.set(returnedMeta);
}
else
{
loadDll(wuid);
s.clear();
wuResult->getResultRecordSizeEntry(s);
if (s.length())
{
typedef IRecordSize * (* recSizeFunc)();
recSizeFunc func = (recSizeFunc)loadedDll->getEntry(s.str());
Owned createdRecordSize = func();
returnedRecordSize.setown(new RecordSizeToEx(createdRecordSize));
}
s.clear();
wuResult->getResultTransformerEntry(s);
if (s.length())
transformer = (rowTransformFunction)loadedDll->getEntry(s.str());
}
transformedMeta.setown(new DataSourceMetaData(transformedRecord, 0, true, isGrouped, 0));
return (returnedRecordSize != NULL);
}
//---------------------------------------------------------------------------
// Returns the known row count, which may be UNKNOWN_NUM_ROWS.
__int64 PagedDataSource::numRows(bool force)
{
    //MORE: When force is set and the total is UNKNOWN_NUM_ROWS this needs to
    //go and work it out - however painful... Currently `force` has no effect.
    return totalRows;
}
// Locates row number `row`, loading blocks into the cache until it is found.
// On success `data`/`length` describe the cached row and `offset` is its
// offset. Returns false if the row is out of range or a block cannot be loaded.
bool PagedDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
{
    const bool outOfRange = (row < 0) || ((unsigned __int64)row > totalRows);
    if (outOfRange)
        return false;

    RowLocation location;
    loop
    {
        if (!cache.getCacheRow(row, location))
        {
            // Not cached yet: load the block at the best known position and retry.
            improveLocation(row, location);
            if (!loadBlock(location.bestRow, location.bestOffset))
                return false;
            continue;
        }

        length = location.matchLength;
        data = location.matchRow;
        offset = location.bestOffset;
        return true;
    }
}
// Called before a block is loaded so that the starting position in `location`
// can be refined. This implementation leaves it unchanged.
void PagedDataSource::improveLocation(__int64 row, RowLocation & location)
{
    // nothing to do
}
//---------------------------------------------------------------------------
// Data source over the `len` bytes of a child dataset. The data is copied and
// indexed in a single row block, fixed or variable depending on the metadata.
NestedDataSource::NestedDataSource(DataSourceMetaData & meta, unsigned len, const void * data)
{
    // No transformation is applied: all three views share the same metadata.
    returnedMeta.set(&meta);
    returnedRecordSize.set(returnedMeta);
    transformedMeta.set(returnedMeta);
    totalSize = len;

    MemoryBuffer copy;
    copy.append(len, data);
    if (!returnedMeta->isFixedSize())
        rows.setown(new VariableRowBlock(copy, 0, 0, returnedRecordSize, true));
    else
        rows.setown(new FixedRowBlock(copy, 0, 0, returnedMeta->fixedSize()));
}
// Nothing to initialise - everything is set up by the constructor.
bool NestedDataSource::init()
{
    const bool ok = true;
    return ok;
}
// Number of rows held in the single row block; `force` is not needed.
__int64 NestedDataSource::numRows(bool force)
{
    const __int64 firstRow = rows->getStartRow();
    const __int64 endRow = rows->getNextRow();
    return endRow - firstRow;
}
// Looks the row up directly in the row block; returns false if it is not there.
bool NestedDataSource::getRowData(__int64 row, size32_t & length, const void * & data, unsigned __int64 & offset)
{
    data = rows->getRow(row, length, offset);
    if (data == NULL)
        return false;
    return true;
}
//---------------------------------------------------------------------------
// A data source that only carries metadata for the given record; the
// metadata is built with no ignored fields and random access allowed.
NullDataSource::NullDataSource(IHqlExpression * _record, bool _isGrouped, unsigned _keyedSize)
    : meta(_record, 0, true, _isGrouped, _keyedSize)
{
    // nothing else to set up
}
// A null data source that also remembers the error which prevented the real
// data source from being created.
FailureDataSource::FailureDataSource(IHqlExpression * _record, IException * _error, bool _isGrouped, unsigned _keyedSize)
    : NullDataSource(_record, _isGrouped, _keyedSize),
      error(_error)
{
    // nothing else to set up
}
//---------------------------------------------------------------------------
// Parses `text` and returns the resulting expression (owned by the caller).
// If any errors are reported, each is logged as "<line> <message>" and NULL
// is returned.
IHqlExpression * parseQuery(const char * text)
{
    MultiErrorReceiver errs;
    OwnedHqlExpr parsed = ::parseQuery(text, &errs);
    if (errs.errCount() != 0)
    {
        for (unsigned i=0; i < errs.errCount(); i++)
        {
            StringBuffer msg;
            PrintLog("%d %s", errs.item(i)->getLine(), errs.item(i)->errorMessage(msg).str());
        }
        return NULL;
    }
    return parsed.getClear();
}
// Creates metadata for a workunit result from its ECL schema, using the
// simplified form of the record when there is one.
// Throws if the schema cannot be parsed. The caller owns the returned object.
IFvDataSourceMetaData * createMetaData(IConstWUResult * wuResult)
{
    SCMStringBuffer s;
    wuResult->getResultEclSchema(s);
    OwnedHqlExpr record = parseQuery(s.str());
    if (!record)
        throw MakeStringException(ERR_FILEVIEW_FIRST+4, "Could not process result schema [%s]", s.str());

    OwnedHqlExpr simplifiedRecord = getSimplifiedRecord(record, false);
    bool isGrouped = false; // more not sure this is strictly true...

    // Fall back to the original record when no simplification was produced.
    IHqlExpression * metaRecord = simplifiedRecord;
    if (!metaRecord)
        metaRecord = record;
    return new DataSourceMetaData(metaRecord, 0, true, isGrouped, 0);
}