/*##############################################################################

    Copyright (C) 2011 HPCC Systems.

    All rights reserved. This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU Affero General Public License as
    published by the Free Software Foundation, either version 3 of the
    License, or (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Affero General Public License for more details.

    You should have received a copy of the GNU Affero General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
############################################################################## */
#include "jliball.hpp"
#include "hql.hpp"

#include "platform.h"
#include "jlib.hpp"
#include "jmisc.hpp"
#include "jstream.ipp"
#include "jdebug.hpp"

#include "hql.hpp"
#include "hqlthql.hpp"
#include "hqlhtcpp.ipp"
#include "hqlttcpp.ipp"
#include "hqlutil.hpp"
#include "hqlthql.hpp"
#include "hqlpmap.hpp"

#include "hqlwcpp.hpp"
#include "hqlcpputil.hpp"
#include "hqltcppc.ipp"
#include "hqlopt.hpp"
#include "hqlfold.hpp"
#include "hqlcerrors.hpp"
#include "hqlcatom.hpp"
#include "hqlresource.hpp"
#include "hqlregex.ipp"
#include "hqlsource.ipp"
#include "hqlcse.ipp"

#include "eclhelper.hpp"

//--------------------------------------------------------------------------------------------------

//Returns a constant of the given type whose value has only the most significant bit set,
//i.e. 1 << (size-in-bits - 1).  It is added to/subtracted from signed integer key values below.
IHqlExpression * getHozedBias(ITypeInfo * type)
{
    unsigned __int64 bias = ((unsigned __int64)1 << (type->getSize()*8-1));
    return createConstant(type->castFrom(false, bias));
}

//Does a value of this type need converting before it can be stored in a key field?
//The answer is derived from the promoted type:
//  boolean/data/qstring    - never
//  little endian integers  - if signed, or larger than one byte
//  big endian integers     - if signed
//  string/varstring        - if the character set is not ascii
//  decimal                 - if signed
bool requiresHozedTransform(ITypeInfo * type)
{
    type = type->queryPromotedType();
    switch (type->getTypeCode())
    {
    case type_boolean:
    case type_data:
    case type_qstring:
        return false;
    case type_littleendianint:
        return type->isSigned() || type->getSize() != 1;
    case type_bigendianint:
        return type->isSigned();
    case type_string:
    case type_varstring:
        return (type->queryCharset()->queryName() != asciiAtom);
    case type_decimal:
        return type->isSigned();
    default:
        //anything else is a payload field, don't do any transformations...
        return false;
    }
}

//Overload for an expression being assigned to a key field: a transform is always required if
//the expression's type differs from the key field's type, otherwise it depends on the type itself.
bool requiresHozedTransform(IHqlExpression * value, ITypeInfo * keyFieldType)
{
    ITypeInfo * type = value->queryType();
    if (type != keyFieldType)
        return true;
    return requiresHozedTransform(type);
}

//Can a field of this type be used as a keyed field?
//Scalars (boolean, integers, decimal) always can; string-like types only if they have a known size.
bool isKeyableType(ITypeInfo * type)
{
    switch (type->getTypeCode())
    {
    case type_boolean:
    case type_swapint:
    case type_int:
    case type_decimal:
        return true;
    case type_string:
    case type_varstring:
    case type_qstring:
    case type_data:
    case type_unicode:
    case type_varunicode:
    case type_utf8:
        return (type->getSize() != UNKNOWN_LENGTH);
    default:
        return false;
    }
}

//Convert a logical value into the form that is stored in a key field.
//  - signed integers are cast to unsigned of the same size and have the bias (top bit) added.
//  - little endian integers larger than a byte are converted to the swapped integer type.
//  - non-ascii strings/varstrings are converted to the default string/varstring type of the same length.
//Everything else is only cast to its promoted type.  Returns a new (owned) expression.
IHqlExpression * getHozedKeyValue(IHqlExpression * _value)
{
    HqlExprAttr value = _value;
    //NB: The template argument is required - queryPromotedType() returns an unlinked ITypeInfo *.
    Linked<ITypeInfo> type = _value->queryType()->queryPromotedType();

    type_t tc = type->getTypeCode();
    switch (tc)
    {
    case type_boolean:
    case type_data:
    case type_qstring:
        break;
    case type_int:
    case type_swapint:
        if (type->isSigned())
        {
            //Switch to the unsigned type first so the bias is added with unsigned arithmetic
            type.setown(makeIntType(type->getSize(), false));
            value.setown(ensureExprType(value, type));
            value.setown(createValue(no_add, LINK(type), LINK(value), getHozedBias(type)));
        }
        if ((type->getTypeCode() == type_littleendianint) && (type->getSize() != 1))
            type.setown(makeSwapIntType(type->getSize(), false));
        break;
    case type_string:
        if (type->queryCharset()->queryName() != asciiAtom)
            type.setown(makeStringType(type->getSize(), NULL, NULL));
        break;
    case type_varstring:
        if (type->queryCharset()->queryName() != asciiAtom)
            type.setown(makeVarStringType(type->getStringLen(), NULL, NULL));
        break;
    case type_decimal:
        if (!type->isSigned())
            break;
        //fallthrough
    default:
        //anything else is a payload field, don't do any transformations...
        break;
    }

    return ensureExprType(value, type);
}

//Create the expression that yields the logical value of field "cur" given a selection of the
//corresponding physical field (physicalSelect).
//  - blob fields are wrapped in a no_id2blob dataset/row/value.
//  - otherwise, if allowTranslate is set, signed integers have the bias removed (the inverse of
//    getHozedKeyValue()) and the result is cast to the logical field type.
//  - otherwise the physical selection is returned unchanged (linked).
IHqlExpression * convertIndexPhysical2LogicalValue(IHqlExpression * cur, IHqlExpression * physicalSelect, bool allowTranslate)
{
    if (cur->hasProperty(blobAtom))
    {
        if (cur->isDataset())
            return createDataset(no_id2blob, LINK(physicalSelect), LINK(cur->queryRecord()));
        else if (cur->isDatarow())
            return createRow(no_id2blob, LINK(physicalSelect), LINK(cur->queryRecord()));
        else
            return createValue(no_id2blob, cur->getType(), LINK(physicalSelect));
    }
    else if (allowTranslate)
    {
        LinkedHqlExpr newValue = physicalSelect;

        OwnedHqlExpr target = createSelectExpr(getActiveTableSelector(), LINK(cur));     // select not used, just created to get correct types.
        ITypeInfo * type = target->queryType();
        type_t tc = type->getTypeCode();
        if (tc == type_int || tc == type_swapint)
        {
            if (type->isSigned())
            {
                //NB: The template argument is required - makeIntType() returns an owned ITypeInfo *.
                Owned<ITypeInfo> tempType = makeIntType(type->getSize(), false);
                newValue.setown(ensureExprType(newValue, tempType));
                newValue.setown(createValue(no_sub, newValue->getType(), LINK(newValue), getHozedBias(newValue->queryType())));
            }
        }

        return ensureExprType(newValue, type);
    }
    else
        return LINK(physicalSelect);
}

//--------------------------------------------------------------------------------------------------

//Generate a virtual member function "bool <name>(const void * _left, const void * _right)" that
//returns the result of evaluating match with LEFT bound to left and RIGHT bound to right.
//Nothing is generated if match is NULL.
void HqlCppTranslator::buildJoinMatchFunction(BuildCtx & ctx, const char * name, IHqlExpression * left, IHqlExpression * right, IHqlExpression * match, IHqlExpression * selSeq)
{
    if (match)
    {
        StringBuffer s;
        BuildCtx matchctx(ctx);
        matchctx.addQuotedCompound(s.append("virtual bool ").append(name).append("(const void * _left, const void * _right)"));

        matchctx.addQuoted("const unsigned char * left = (const unsigned char *) _left;");
        matchctx.addQuoted("const unsigned char * right = (const unsigned char *) _right;");

        bindTableCursor(matchctx, left, "left", no_left, selSeq);
        bindTableCursor(matchctx, right, "right", no_right, selSeq);

        OwnedHqlExpr cseMatch = options.spotCSE ? spotScalarCSE(match) : LINK(match);
        buildReturn(matchctx, cseMatch);
    }
}

//--------------------------------------------------------------------------------------------------

//Return (linked) the no_newkeyindex that is the physical root table of expr.
//Any other kind of root is reported as unimplemented.
IHqlExpression * createKeyFromComplexKey(IHqlExpression * expr)
{
    IHqlExpression * base = queryPhysicalRootTable(expr);
    if (base && (base->getOperator() == no_newkeyindex))
        return LINK(base);

    //queryPhysicalRootTable() can return NULL (the KeyedJoinInfo constructor checks for that), so
    //avoid dereferencing it - report the operator of the expression that was passed in instead.
    IHqlExpression * unsupported = base ? base : expr;
    UNIMPLEMENTED_XY("Key", getOpString(unsupported->getOperator()));
    return NULL;
}

//Gathers everything that is known about the index (and optional file) used by a keyed join
//expression, and generates the helper member functions that depend on that information.
class KeyedJoinInfo : public CInterface
{
public:
    KeyedJoinInfo(HqlCppTranslator & _translator, IHqlExpression * _expr, bool _canOptimizeTransfer);
    ~KeyedJoinInfo();

    void buildClearRightFunction(BuildCtx & classctx);
    void buildExtractFetchFields(BuildCtx & ctx);
    void buildExtractIndexReadFields(BuildCtx & ctx);
    void buildExtractJoinFields(ActivityInstance & instance);
    void buildFailureTransform(BuildCtx & ctx, IHqlExpression * transform);
    void buildFetchMatch(BuildCtx & ctx);
    void buildIndexReadMatch(BuildCtx & ctx);
    void buildLeftOnly(BuildCtx & ctx);
    void buildMonitors(BuildCtx & ctx);
    void buildTransform(BuildCtx & ctx);

    //Returns a linked filter: the extra (post) filter on the key, or the filter applied to the fetched file row.
    //NB: monitors is only created by processFilter(), so the key filter cannot be requested before then.
    IHqlExpression * getMatchExpr(bool isKeyFilter) { return isKeyFilter ? LINK(monitors->queryExtraFilter()) : LINK(fileFilter); }
    bool isFetchFiltered() { return fileFilter != NULL; }
    //A full join has a file as well as a key, a half join only has a key.
    bool isFullJoin() { return file != NULL; }
    bool isHalfJoin() { return !file; }
    bool processFilter();
    IHqlExpression * queryKey() { return key; }
    IHqlExpression * queryOriginalKey() { return originalKey; }
    //No filename is returned when the key was derived from a more complex index expression
    IHqlExpression * queryKeyFilename() { return hasComplexIndex ? NULL : key->queryChild(3); }
    IHqlExpression * queryFile() { return file; }
    //NB: Dereferences file, so only valid for a full join
    IHqlExpression * queryFileFilename() { return file->queryChild(0); }
    IHqlExpression * queryRawKey() { return rawKey; }
    IHqlExpression * queryRawRhs() { return rawRhs; }
    bool isKeyOpt() { return key->hasProperty(optAtom); }
    bool isFileOpt() { return file && file->hasProperty(optAtom); }
    bool needToExtractJoinFields() const { return extractJoinFieldsTransform != NULL; }
    bool hasPostFilter() const { return monitors->queryExtraFilter() || fileFilter; }
    bool requireActivityForKey() const { return hasComplexIndex; }

    void reportFailureReason(IHqlExpression * cond) { monitors->reportFailureReason(cond); }

protected:
    void buildClearRecord(BuildCtx & ctx, RecordSelectIterator & rawIter, RecordSelectIterator & keyIter);
    void buildTransformBody(BuildCtx & ctx, IHqlExpression * transform);
    IHqlExpression * expandDatasetReferences(IHqlExpression * expr, IHqlExpression * ds);
    IHqlExpression * optimizeTransfer(HqlExprArray & fields, HqlExprArray & values, IHqlExpression * expr, IHqlExpression * leftSelector);
    void optimizeExtractJoinFields();
    void optimizeTransfer(SharedHqlExpr & targetDataset, SharedHqlExpr & targetTransform, SharedHqlExpr & keyedFilter, OwnedHqlExpr * extraFilter);
    void splitFilter(IHqlExpression * filter, SharedHqlExpr & keyTarget);

protected:
    HqlCppTranslator & translator;
    HqlExprAttr     expr;                   // the join expression being processed
    HqlExprAttr     originalKey;            // even if computed/parameter
    HqlExprAttr     key;
    HqlExprAttr     rawKey;                 // physical root table of expandedKey
    HqlExprAttr     expandedKey;            // result of translator.convertToPhysicalIndex(key)
    HqlExprAttr     file;                   // only set for a full keyed join
    HqlExprAttr     expandedFile;           // result of convertToPhysicalTable() on the file's root table
    HqlExprAttr     keyAccessDataset;
    HqlExprAttr     keyAccessTransform;
    HqlExprAttr     fileAccessDataset;
    HqlExprAttr     fileAccessTransform;
    HqlExprAttr     joinSeq;                // selector sequence of the join
    MonitorExtractor * monitors;            // owned - created in processFilter(), deleted in the destructor
    HqlExprAttr     fileFilter;
    HqlExprAttr     leftOnlyMatch;
    HqlExprAttr     rawRhs;                 // physical file for a full join, otherwise rawKey
    TableProjectMapper keyedMapper;
    OwnedHqlExpr    counter;
    OwnedHqlExpr    extractJoinFieldsRecord;
    OwnedHqlExpr    extractJoinFieldsTransform;
    bool            canOptimizeTransfer;
    bool            hasComplexIndex;
};
KeyedJoinInfo::KeyedJoinInfo(HqlCppTranslator & _translator, IHqlExpression * _expr, bool _canOptimizeTransfer) : translator(_translator) { expr.set(_expr); joinSeq.set(querySelSeq(expr)); hasComplexIndex = false; IHqlExpression * right = expr->queryChild(1); IHqlExpression * keyed = expr->queryProperty(keyedAtom); if (keyed && keyed->queryChild(0)) { key.set(keyed->queryChild(0)); if (right->getOperator() == no_keyed) right = right->queryChild(0); file.set(right); IHqlExpression * rightTable = queryPhysicalRootTable(right); if (!rightTable || rightTable->queryNormalizedSelector() != right->queryNormalizedSelector()) translator.throwError(HQLERR_FullKeyedNeedsFile); expandedFile.setown(convertToPhysicalTable(rightTable, true)); keyedMapper.setDataset(key); } else if (right->getOperator() == no_newkeyindex) { key.set(right); } else { hasComplexIndex = true; originalKey.set(right); key.setown(createKeyFromComplexKey(right)); } if (!originalKey) originalKey.set(key); expandedKey.setown(translator.convertToPhysicalIndex(key)); rawKey.set(queryPhysicalRootTable(expandedKey)); canOptimizeTransfer = _canOptimizeTransfer; monitors = NULL; counter.set(queryPropertyChild(expr, _countProject_Atom, 0)); if (isFullJoin()) rawRhs.set(queryPhysicalRootTable(expandedFile)); else rawRhs.set(rawKey); } KeyedJoinInfo::~KeyedJoinInfo() { delete monitors; } void KeyedJoinInfo::buildClearRecord(BuildCtx & ctx, RecordSelectIterator & rawIter, RecordSelectIterator & keyIter) { keyIter.first(); ForEach(rawIter) { assert(keyIter.isValid()); OwnedHqlExpr rawSelect = rawIter.get(); OwnedHqlExpr keySelect = keyIter.get(); ITypeInfo * keyFieldType = keySelect->queryType(); OwnedHqlExpr null = createNullExpr(keyFieldType); OwnedHqlExpr keyNull = (rawIter.isInsideIfBlock() || (rawIter.isInsideNested() && isInPayload())) ? 
LINK(null) : getHozedKeyValue(null); OwnedHqlExpr folded = foldHqlExpression(keyNull); translator.buildAssign(ctx, rawSelect, folded); keyIter.next(); } } void KeyedJoinInfo::buildClearRightFunction(BuildCtx & classctx) { if (extractJoinFieldsTransform || isFullJoin()) { OwnedHqlExpr ds = createDataset(no_anon, LINK(extractJoinFieldsRecord)); translator.buildClearRecordMember(classctx, "Right", ds); } else { //Need to initialize the record with the zero logical values, not zero key values //which differs for biased integers etc. BuildCtx funcctx(classctx); funcctx.addQuotedCompound("virtual size32_t createDefaultRight(ARowBuilder & crSelf)"); translator.ensureRowAllocated(funcctx, "crSelf"); BoundRow * selfCursor = translator.bindSelf(funcctx, rawKey, "crSelf"); IHqlExpression * rawSelf = selfCursor->querySelector(); RecordSelectIterator rawIter(rawKey->queryRecord(), rawSelf); RecordSelectIterator keyIter(key->queryRecord(), key); buildClearRecord(funcctx, rawIter, keyIter); translator.buildReturnRecordSize(funcctx, selfCursor); } } void KeyedJoinInfo::buildExtractFetchFields(BuildCtx & ctx) { // For the data going to the fetch remote activity: //virtual size32_t extractFetchFields(ARowBuilder & crSelf, const void * _left) = 0; if (fileAccessDataset) { BuildCtx ctx1(ctx); ctx1.addQuotedCompound("virtual size32_t extractFetchFields(ARowBuilder & crSelf, const void * _left)"); translator.ensureRowAllocated(ctx1, "crSelf"); if (fileAccessTransform) { translator.buildTransformBody(ctx1, fileAccessTransform, expr->queryChild(0), NULL, fileAccessDataset, joinSeq); } else { translator.buildRecordSerializeExtract(ctx1, fileAccessDataset->queryRecord()); } } //virtual IOutputMetaData * queryFetchInputRecordSize() = 0; translator.buildMetaMember(ctx, fileAccessDataset, false, "queryFetchInputRecordSize"); } void KeyedJoinInfo::buildExtractIndexReadFields(BuildCtx & ctx) { //virtual size32_t extractIndexReadFields(ARowBuilder & crSelf, const void * _left) = 0; BuildCtx 
ctx1(ctx); ctx1.addQuotedCompound("virtual size32_t extractIndexReadFields(ARowBuilder & crSelf, const void * _left)"); translator.ensureRowAllocated(ctx1, "crSelf"); if (keyAccessTransform) { translator.buildTransformBody(ctx1, keyAccessTransform, expr->queryChild(0), NULL, keyAccessDataset, joinSeq); } else { translator.buildRecordSerializeExtract(ctx1, keyAccessDataset->queryRecord()); } //virtual IOutputMetaData * queryIndexReadInputRecordSize() = 0; translator.buildMetaMember(ctx, keyAccessDataset, isGrouped(keyAccessDataset), "queryIndexReadInputRecordSize"); //->false } void KeyedJoinInfo::buildExtractJoinFields(ActivityInstance & instance) { //virtual size32_t extractJoinFields(void *dest, const void *diskRow, IBlobProvider * blobs) = 0; BuildCtx extractctx(instance.startctx); extractctx.addQuotedCompound("virtual size32_t extractJoinFields(ARowBuilder & crSelf, const void *_left, unsigned __int64 _filepos, IBlobProvider * blobs)"); translator.ensureRowAllocated(extractctx, "crSelf"); if (needToExtractJoinFields()) { OwnedHqlExpr extracted = createDataset(no_anon, LINK(extractJoinFieldsRecord)); OwnedHqlExpr raw = createDataset(no_anon, LINK(rawRhs->queryRecord())); BoundRow * selfCursor = translator.buildTransformCursors(extractctx, extractJoinFieldsTransform, raw, NULL, extracted, joinSeq); if (isHalfJoin()) { OwnedHqlExpr left = createSelector(no_left, raw, joinSeq); translator.associateBlobHelper(extractctx, left, "blobs"); OwnedHqlExpr fileposExpr = getFilepos(left, false); OwnedHqlExpr fileposVar = createVariable("_filepos", fileposExpr->getType()); extractctx.associateExpr(fileposExpr, fileposVar); } translator.doBuildTransformBody(extractctx, extractJoinFieldsTransform, selfCursor); } else { translator.buildRecordSerializeExtract(extractctx, extractJoinFieldsRecord); } //virtual IOutputMetaData * queryJoinFieldsRecordSize() = 0; translator.buildMetaMember(instance.classctx, extractJoinFieldsRecord, false, "queryJoinFieldsRecordSize"); } void 
KeyedJoinInfo::buildFetchMatch(BuildCtx & ctx) { translator.buildJoinMatchFunction(ctx, "fetchMatch", fileAccessDataset, expr->queryChild(1), fileFilter, joinSeq); } void KeyedJoinInfo::buildIndexReadMatch(BuildCtx & ctx) { LinkedHqlExpr matchExpr = monitors->queryExtraFilter(); if (matchExpr) { BuildCtx matchctx(ctx); matchctx.addQuotedCompound("virtual bool indexReadMatch(const void * _left, const void * _right, unsigned __int64 _filepos, IBlobProvider * blobs)"); matchctx.addQuoted("const unsigned char * left = (const unsigned char *) _left;"); matchctx.addQuoted("const unsigned char * right = (const unsigned char *) _right;"); OwnedHqlExpr fileposExpr = getFilepos(rawKey, false); OwnedHqlExpr fileposVar = createVariable("_filepos", fileposExpr->getType()); if (translator.queryOptions().spotCSE) matchExpr.setown(spotScalarCSE(matchExpr)); translator.associateBlobHelper(matchctx, rawKey, "blobs"); translator.bindTableCursor(matchctx, keyAccessDataset, "left", no_left, joinSeq); translator.bindTableCursor(matchctx, rawKey, "right"); matchctx.associateExpr(fileposExpr, fileposVar); translator.buildReturn(matchctx, matchExpr); } } void KeyedJoinInfo::buildLeftOnly(BuildCtx & ctx) { if (leftOnlyMatch) { BuildCtx funcctx(ctx); funcctx.addQuotedCompound("virtual bool leftCanMatch(const void * _left)"); funcctx.addQuoted("const unsigned char * left = (const unsigned char *)_left;"); translator.bindTableCursor(funcctx, expr->queryChild(0), "left", no_left, joinSeq); translator.buildReturn(funcctx, leftOnlyMatch); } } void KeyedJoinInfo::buildMonitors(BuildCtx & ctx) { monitors->optimizeSegments(keyAccessDataset->queryRecord()); //---- virtual void createSegmentMonitors(struct IIndexReadContext *) { ... 
} ---- BuildCtx createSegmentCtx(ctx); createSegmentCtx.addQuotedCompound("virtual void createSegmentMonitors(IIndexReadContext *irc, const void * _left)"); createSegmentCtx.addQuoted("const unsigned char * left = (const unsigned char *) _left;"); translator.bindTableCursor(createSegmentCtx, keyAccessDataset, "left", no_left, joinSeq); monitors->buildSegments(createSegmentCtx, "irc", false); } void KeyedJoinInfo::buildTransform(BuildCtx & ctx) { BuildCtx funcctx(ctx); switch (expr->getOperator()) { case no_join: { funcctx.addQuotedCompound("virtual size32_t transform(ARowBuilder & crSelf, const void * _left, const void * _right, unsigned __int64 _filepos)"); break; } case no_denormalize: { funcctx.addQuotedCompound("virtual size32_t transform(ARowBuilder & crSelf, const void * _left, const void * _right, unsigned __int64 _filepos, unsigned counter)"); translator.associateCounter(funcctx, counter, "counter"); break; } case no_denormalizegroup: { funcctx.addQuotedCompound("virtual size32_t transform(ARowBuilder & crSelf, const void * _left, const void * _right, unsigned numRows, const void * * _rows)"); funcctx.addQuoted("unsigned char * * rows = (unsigned char * *) _rows;"); break; } } translator.ensureRowAllocated(funcctx, "crSelf"); buildTransformBody(funcctx, expr->queryChild(3)); } //expand references to oldDataset using ds. //First expand references like a.b using the table definition //Then need to expand references to the complete table as a usertable projection. 
//e.g., left.x := l.x + right; static IHqlExpression * expandDatasetReferences(IHqlExpression * expr, IHqlExpression * ds, IHqlExpression * oldDataset, IHqlExpression * newDataset) { TableProjectMapper mapper(ds); OwnedHqlExpr expanded = mapper.expandFields(expr, oldDataset, newDataset); OwnedHqlExpr mapParent; IHqlExpression * dsParent = ds->queryChild(0); OwnedHqlExpr seq = createSelectorSequence(); switch (getChildDatasetType(ds)) { case childdataset_dataset: mapParent.set(dsParent->queryNormalizedSelector()); break; case childdataset_left: UNIMPLEMENTED; mapParent.setown(createSelector(no_left, dsParent, querySelSeq(ds))); break; default: UNIMPLEMENTED; break; } OwnedHqlExpr newLeft = createSelector(no_left, dsParent, seq); OwnedHqlExpr newKeyTransform = replaceSelector(queryNewColumnProvider(ds), mapParent, newLeft); HqlExprArray args; unwindChildren(args, newKeyTransform); newKeyTransform.setown(createValue(no_transform, makeTransformType(LINK(ds->queryRecordType())), args)); OwnedHqlExpr rightProject = createRow(no_projectrow, LINK(newDataset), createComma(LINK(newKeyTransform), LINK(seq))); OwnedHqlExpr wrappedProject = createRow(no_newrow, LINK(rightProject)); return replaceSelector(expanded, oldDataset, wrappedProject); } IHqlExpression * KeyedJoinInfo::expandDatasetReferences(IHqlExpression * transform, IHqlExpression * ds) { OwnedHqlExpr oldRight = createSelector(no_right, ds, joinSeq); OwnedHqlExpr newRight = createSelector(no_right, ds->queryChild(0), joinSeq); return ::expandDatasetReferences(transform, ds, oldRight, newRight); } void KeyedJoinInfo::buildTransformBody(BuildCtx & ctx, IHqlExpression * transform) { IHqlExpression * rhs = expr->queryChild(1); IHqlExpression * rhsRecord = rhs->queryRecord(); OwnedHqlExpr serializedRhsRecord = getSerializedForm(rhsRecord); //Map the file position field in the file to the incoming parameter //MORE: This doesn't cope with local/global file position distinctions. 
OwnedHqlExpr fileposVar = createVariable("_filepos", makeIntType(8, false)); OwnedHqlExpr joinDataset = createDataset(no_anon, LINK(extractJoinFieldsRecord)); OwnedHqlExpr newTransform = replaceMemorySelectorWithSerializedSelector(transform, rhsRecord, no_right, joinSeq); OwnedHqlExpr oldRight = createSelector(no_right, serializedRhsRecord, joinSeq); OwnedHqlExpr newRight = createSelector(no_right, joinDataset, joinSeq); OwnedHqlExpr fileposExpr = getFilepos(newRight, false); if (extractJoinFieldsTransform) { IHqlExpression * fileposField = isFullJoin() ? queryVirtualFileposField(file->queryRecord()) : queryLastField(key->queryRecord()); if (fileposField && (expr->getOperator() != no_denormalizegroup)) { HqlMapTransformer fileposMapper; OwnedHqlExpr select = createSelectExpr(LINK(oldRight), LINK(fileposField)); OwnedHqlExpr castFilepos = ensureExprType(fileposExpr, fileposField->queryType()); fileposMapper.setMapping(select, castFilepos); newTransform.setown(fileposMapper.transformRoot(newTransform)); } newTransform.setown(replaceSelector(newTransform, oldRight, newRight)); } else { if (isFullJoin()) { if (expandedFile != queryPhysicalRootTable(file)) newTransform.setown(expandDatasetReferences(newTransform, expandedFile)); } else newTransform.setown(expandDatasetReferences(newTransform, expandedKey)); } newTransform.setown(optimizeHqlExpression(newTransform, HOOfold|HOOcompoundproject)); newTransform.setown(foldHqlExpression(newTransform)); BoundRow * selfCursor = translator.buildTransformCursors(ctx, newTransform, expr->queryChild(0), joinDataset, expr, joinSeq); if (expr->getOperator() == no_denormalizegroup) { //Last parameter is false since implementation of group keyed denormalize in roxie passes in pointers to the serialized slave data bool rowsAreLinkCounted = false; translator.bindRows(ctx, no_right, joinSeq, expr->queryProperty(_rowsid_Atom), joinDataset, "numRows", "rows", rowsAreLinkCounted); } ctx.associateExpr(fileposExpr, fileposVar); 
translator.doBuildTransformBody(ctx, newTransform, selfCursor); } void KeyedJoinInfo::buildFailureTransform(BuildCtx & ctx, IHqlExpression * onFailTransform) { BuildCtx funcctx(ctx); funcctx.addQuotedCompound("virtual size32_t onFailTransform(ARowBuilder & crSelf, const void * _left, const void * _right, unsigned __int64 _filepos, IException * except)"); translator.associateLocalFailure(funcctx, "except"); translator.ensureRowAllocated(funcctx, "crSelf"); buildTransformBody(funcctx, onFailTransform); } IHqlExpression * KeyedJoinInfo::optimizeTransfer(HqlExprArray & fields, HqlExprArray & values, IHqlExpression * filter, IHqlExpression * leftSelector) { switch (filter->getOperator()) { case no_join: return NULL; case no_left: if (filter->queryBody() == leftSelector) return NULL; // Something nasty e.g., evaluate()... break; case no_right: return LINK(filter); case no_select: { //Check for an expression of the form LEFT.x.y.z.a.b.c, but if any of x,y,z, are datasets then process later. IHqlExpression * cur = filter; IHqlExpression * ds; loop { ds = cur->queryChild(0); //if a select from a dataset, then wait until we recurse to here if ((ds->getOperator() != no_select) || ds->isDataset()) break; cur = ds; } //check it was the correct left.. if (ds->queryBody() == leftSelector) { unsigned match = values.find(*filter); if (match == NotFound) { match = fields.ordinality(); LinkedHqlExpr field = filter->queryChild(1); if (fields.find(*field) != NotFound) { //Check same field isn't used in two different nested records. 
StringBuffer name; name.append("__unnamed__").append(fields.ordinality()); field.setown(createField(createIdentifierAtom(name), field->getType(), NULL, NULL)); } fields.append(*LINK(field)); values.append(*LINK(filter)); } IHqlExpression * matchField = &fields.item(match); OwnedHqlExpr serializedField = getSerializedForm(matchField); OwnedHqlExpr result = createSelectExpr(getActiveTableSelector(), LINK(serializedField)); return ensureDeserialized(result, matchField->queryType()); } break; } case no_attr_expr: if (filter->queryName() == _selectors_Atom) return LINK(filter); break; } HqlExprArray children; ForEachChild(i, filter) { IHqlExpression * next = optimizeTransfer(fields, values, filter->queryChild(i), leftSelector); if (!next) return NULL; children.append(*next); } return cloneOrLink(filter, children); } IHqlExpression * reverseOptimizeTransfer(IHqlExpression * left, IHqlExpression * transform, IHqlExpression * filter) { if (!transform) return LINK(filter); HqlMapTransformer mapper; ForEachChild(i, transform) { IHqlExpression * cur = transform->queryChild(i); IHqlExpression * tgt = cur->queryChild(0); IHqlExpression * src = cur->queryChild(1); OwnedHqlExpr selector = createSelectExpr(LINK(left), LINK(tgt->queryChild(1))); mapper.setMapping(selector, src); } return mapper.transformRoot(filter); } void KeyedJoinInfo::optimizeTransfer(SharedHqlExpr & targetDataset, SharedHqlExpr & targetTransform, SharedHqlExpr & filter, OwnedHqlExpr * extraFilter) { IHqlExpression * dataset = expr->queryChild(0); if (canOptimizeTransfer) { if (filter) { IHqlExpression * record = dataset->queryRecord(); HqlExprArray fields; HqlExprArray values; bool hasExtra = (extraFilter && extraFilter->get()); OwnedHqlExpr oldLeft = createSelector(no_left, dataset, joinSeq); OwnedHqlExpr newFilter = optimizeTransfer(fields, values, filter, oldLeft); OwnedHqlExpr newExtraFilter = hasExtra ? 
optimizeTransfer(fields, values, *extraFilter, oldLeft) : NULL; if (newFilter && (newExtraFilter || !hasExtra) && fields.ordinality() < getFieldCount(dataset->queryRecord())) { OwnedHqlExpr extractedRecord = translator.createRecordInheritMaxLength(fields, dataset); OwnedHqlExpr serializedRecord = getSerializedForm(extractedRecord); targetDataset.setown(createDataset(no_anon, LINK(serializedRecord), NULL)); HqlExprArray assigns; OwnedHqlExpr self = getSelf(serializedRecord); ForEachItemIn(i, fields) { IHqlExpression * curField = &fields.item(i); OwnedHqlExpr serializedField = getSerializedForm(curField); OwnedHqlExpr value = ensureSerialized(&values.item(i)); assigns.append(*createAssign(createSelectExpr(LINK(self), LINK(serializedField)), LINK(value))); } targetTransform.setown(createValue(no_newtransform, makeTransformType(serializedRecord->getType()), assigns)); OwnedHqlExpr leftSelect = createSelector(no_left, serializedRecord, joinSeq); filter.setown(replaceSelector(newFilter, queryActiveTableSelector(), leftSelect)); if (hasExtra) extraFilter->setown(replaceSelector(newExtraFilter, queryActiveTableSelector(), leftSelect)); } else if (recordRequiresSerialization(record)) { OwnedHqlExpr serializedRecord = getSerializedForm(record); targetDataset.setown(createDataset(no_anon, LINK(serializedRecord))); targetTransform.setown(createRecordMappingTransform(no_transform, serializedRecord, oldLeft)); filter.setown(replaceMemorySelectorWithSerializedSelector(filter, record, no_left, joinSeq)); if (hasExtra) extraFilter->setown(replaceMemorySelectorWithSerializedSelector(*extraFilter, record, no_left, joinSeq)); } else targetDataset.set(dataset); } else { //any fields will be serialized automatically, and no filter targetDataset.set(dataset); } } else { targetDataset.set(dataset); } } static void expandAllFields(HqlExprArray & fieldsAccessed, IHqlExpression * expr) { switch (expr->getOperator()) { case no_field: if (fieldsAccessed.find(*expr) == NotFound) 
fieldsAccessed.append(*LINK(expr)); break; case no_ifblock: expandAllFields(fieldsAccessed, expr->queryChild(1)); break; case no_record: ForEachChild(i, expr) expandAllFields(fieldsAccessed, expr->queryChild(i)); break; } } static void doGatherFieldsAccessed(RecursionChecker & checker, HqlExprArray & fieldsAccessed, IHqlExpression * expr, IHqlExpression * ds) { if (checker.alreadyVisited(expr)) return; checker.setVisited(expr); switch (expr->getOperator()) { case no_select: if (expr->queryChild(0) == ds) { IHqlExpression * field = expr->queryChild(1); if (fieldsAccessed.find(*field) == NotFound) fieldsAccessed.append(*LINK(field)); return; } break; case no_record: case no_attr: return; case no_left: case no_right: //Never walk children if (expr == ds) expandAllFields(fieldsAccessed, expr->queryRecord()); return; default: if (expr == ds) { expandAllFields(fieldsAccessed, expr->queryRecord()); return; } break; } ForEachChild(i, expr) doGatherFieldsAccessed(checker, fieldsAccessed, expr->queryChild(i), ds); } static void gatherFieldsAccessed(HqlExprArray & fieldsAccessed, IHqlExpression * cond, IHqlExpression * ds) { RecursionChecker checker; doGatherFieldsAccessed(checker, fieldsAccessed, cond, ds); } void KeyedJoinInfo::optimizeExtractJoinFields() { HqlExprArray fieldsAccessed; bool doExtract = false; IHqlExpression * fileposField = NULL; OwnedHqlExpr right = createSelector(no_right, expr->queryChild(1), joinSeq); IHqlExpression * rightRecord = right->queryRecord(); OwnedHqlExpr extractedRecord; if (expr->getOperator() == no_denormalizegroup) { //Version1: Don't remove any fields doExtract = true; OwnedHqlExpr rows = createDataset(no_rows, LINK(right), LINK(expr->queryProperty(_rowsid_Atom))); if (isFullJoin()) { //unwindFields(fieldsAccessed, file->queryRecord()); extractedRecord.set(file->queryRecord()); } else { // unwindFields(fieldsAccessed, key->queryRecord()); extractedRecord.set(key->queryRecord()); fileposField = queryLastField(key->queryRecord()); } } else 
{ gatherFieldsAccessed(fieldsAccessed, expr->queryChild(3), right); if (isFullJoin()) { IHqlExpression * filepos = queryVirtualFileposField(file->queryRecord()); if (filepos) fieldsAccessed.zap(*filepos); if (translator.getTargetClusterType() != HThorCluster) doExtract = (fieldsAccessed.ordinality() < getFlatFieldCount(rawRhs->queryRecord())); } else { IHqlExpression * keyRecord = key->queryRecord(); IHqlExpression * filepos = queryLastField(keyRecord); if (filepos) fieldsAccessed.zap(*filepos); if (translator.getTargetClusterType() != HThorCluster) doExtract = (fieldsAccessed.ordinality() < getFlatFieldCount(keyRecord)-1); if (!doExtract && recordContainsBlobs(keyRecord)) doExtract = true; } if (recordRequiresSerialization(rightRecord)) doExtract = true; } if (doExtract) { HqlExprArray assigns; OwnedHqlExpr left = createSelector(no_left, rawRhs, joinSeq); if (extractedRecord || (fieldsAccessed.ordinality() != 0)) { if (!extractedRecord) extractedRecord.setown(translator.createRecordInheritMaxLength(fieldsAccessed, rawRhs)); extractJoinFieldsRecord.setown(getSerializedForm(extractedRecord)); OwnedHqlExpr self = getSelf(extractJoinFieldsRecord); OwnedHqlExpr memorySelf = getSelf(extractedRecord); if (isFullJoin() && (rawRhs->queryBody() == expandedFile->queryBody())) { assertex(extractedRecord == extractJoinFieldsRecord); ForEachChild(i, extractedRecord) { IHqlExpression * curMemoryField = extractedRecord->queryChild(i); IHqlExpression * curSerializedField = extractJoinFieldsRecord->queryChild(i); if (curMemoryField == fileposField) assigns.append(*createAssign(createSelectExpr(LINK(self), LINK(curSerializedField)), getFilepos(left, false))); else if (!curMemoryField->isAttribute()) assigns.append(*createAssign(createSelectExpr(LINK(self), LINK(curSerializedField)), createSelectExpr(LINK(left), LINK(curMemoryField)))); // no } } else { TableProjectMapper fieldMapper; if (isFullJoin()) fieldMapper.setDataset(expandedFile); else fieldMapper.setDataset(expandedKey); 
ForEachChild(i, extractJoinFieldsRecord) { IHqlExpression * curMemoryField = extractedRecord->queryChild(i); IHqlExpression * curSerializedField = extractJoinFieldsRecord->queryChild(i); if (curMemoryField == fileposField) assigns.append(*createAssign(createSelectExpr(LINK(self), LINK(curSerializedField)), getFilepos(left, false))); else if (!curMemoryField->isAttribute()) { OwnedHqlExpr tgt = createSelectExpr(LINK(self), LINK(curSerializedField)); OwnedHqlExpr src = createSelectExpr(LINK(memorySelf), LINK(curMemoryField)); OwnedHqlExpr mappedSrc = fieldMapper.expandFields(src, memorySelf, left, rawRhs); assigns.append(*createAssign(tgt.getClear(), ensureSerialized(mappedSrc))); } } } } else { //A bit of a hack - Richard can't cope with zero length values being returned, so allocate //a single byte to keep him happy. OwnedHqlExpr nonEmptyAttr = createAttribute(_nonEmpty_Atom); extractJoinFieldsRecord.setown(createRecord(nonEmptyAttr)); } extractJoinFieldsTransform.setown(createValue(no_transform, makeTransformType(extractJoinFieldsRecord->getType()), assigns)); } else { if (isFullJoin()) extractJoinFieldsRecord.set(rawRhs->queryRecord()); else extractJoinFieldsRecord.set(rawKey->queryRecord()); } } bool KeyedJoinInfo::processFilter() { OwnedHqlExpr atmostCond, atmostLimit; IHqlExpression * atmost = expr->queryProperty(atmostAtom); extractAtmostArgs(atmost, atmostCond, atmostLimit); IHqlExpression * cond = expr->queryChild(2); OwnedHqlExpr fuzzy, hard; translator.splitFuzzyCondition(cond, atmostCond, fuzzy, hard); OwnedHqlExpr keyedKeyFilter, fuzzyKeyFilter; splitFilter(hard, keyedKeyFilter); if (!keyedKeyFilter) { if (!cond->queryValue() || cond->queryValue()->getBoolValue()) { StringBuffer s; getExprECL(cond, s); if (isFullJoin()) translator.throwError1(HQLERR_KeyAccessNoKeyField, s.str()); else translator.throwError1(HQLERR_KeyedJoinTooComplex, s.str()); } else leftOnlyMatch.set(cond); } if (atmost && fileFilter) { StringBuffer s; 
translator.throwError1(HQLERR_BadKeyedJoinConditionAtMost,getExprECL(fileFilter, s.append(" (")).append(")").str()); } splitFilter(fuzzy, fuzzyKeyFilter); //Now work out what fields need to be serialized to perform the match optimizeTransfer(keyAccessDataset, keyAccessTransform, keyedKeyFilter, &fuzzyKeyFilter); if (file && fileFilter) optimizeTransfer(fileAccessDataset, fileAccessTransform, fileFilter, NULL); //Now need to transform the index into its real representation so //the hozed transforms take place. unsigned payload = numPayloadFields(key); assertex(payload); // don't use rawindex once payload can be 0 TableProjectMapper mapper(expandedKey); OwnedHqlExpr rightSelect = createSelector(no_right, key, joinSeq); OwnedHqlExpr newFilter = mapper.expandFields(keyedKeyFilter, rightSelect, rawKey, rawKey); //Now extract the filters from it. OwnedHqlExpr extra; monitors = new MonitorExtractor(rawKey, translator, -(int)numPayloadFields(rawKey), false); if (newFilter) monitors->extractFilters(newFilter, extra); if (atmost && extra && (atmostCond || !monitors->isCleanlyKeyedExplicitly())) { StringBuffer s; //map the key references back so the error message refers to RIGHT instead of a weird key expression. bool collapsedAll = false; OwnedHqlExpr collapsed = mapper.collapseFields(extra, rawKey, rightSelect, rawKey, &collapsedAll); translator.throwError1(HQLERR_BadKeyedJoinConditionAtMost,getExprECL(collapsed, s.append(" (")).append(")").str()); } // OwnedHqlExpr oldLeft = createSelector(no_left, expr->queryChild(0), joinSeq); OwnedHqlExpr newLeft = createSelector(no_left, keyAccessDataset, joinSeq); //Finally extend the non-keyed filter with the non-keyed portion. 
OwnedHqlExpr newFuzzyKeyFilter = mapper.expandFields(fuzzyKeyFilter, rightSelect, rawKey, rawKey); // newFuzzyKeyFilter.setown(replaceSelector(newFuzzyKeyFilter, oldLeft, newLeft)); monitors->appendFilter(newFuzzyKeyFilter); //add any key-invariant condition to the leftOnly match IHqlExpression * keyedLeftOnly = monitors->queryGlobalGuard(); if (keyedLeftOnly) extendConditionOwn(leftOnlyMatch, no_and, reverseOptimizeTransfer(newLeft, keyAccessTransform, keyedLeftOnly)); //optimize the fields returned from the rhs to perform the transform if (expr->getOperator() != no_keyeddistribute) optimizeExtractJoinFields(); return monitors->isKeyed(); } void KeyedJoinInfo::splitFilter(IHqlExpression * filter, SharedHqlExpr & keyTarget) { if (!filter) return; if (filter->getOperator() == no_and) { splitFilter(filter->queryChild(0), keyTarget); splitFilter(filter->queryChild(1), keyTarget); } else if (containsOnlyLeft(filter)) extendAndCondition(leftOnlyMatch, filter); else if (filter->queryValue()) { //remove silly "and true" conditions if (!filter->queryValue()->getBoolValue()) extendAndCondition(keyTarget, filter); } else { if (file) { bool doneAll = false; OwnedHqlExpr fileRight = createSelector(no_right, file, joinSeq); OwnedHqlExpr keyRight = createSelector(no_right, key, joinSeq); OwnedHqlExpr mapped = keyedMapper.collapseFields(filter, fileRight, keyRight, &doneAll); if (doneAll) extendAndCondition(keyTarget, mapped); else extendAndCondition(fileFilter, filter); } else extendAndCondition(keyTarget, filter); } } void HqlCppTranslator::buildKeyedJoinExtra(ActivityInstance & instance, IHqlExpression * expr, KeyedJoinInfo * info) { //virtual IOutputMetaData * queryDiskRecordSize() = 0; // Excluding fpos and sequence if (info->isFullJoin()) buildMetaMember(instance.classctx, info->queryRawRhs(), false, "queryDiskRecordSize"); //virtual unsigned __int64 extractPosition(const void * _right) = 0; // Gets file position value from rhs row if (info->isFullJoin()) { IHqlExpression * 
index = info->queryKey(); IHqlExpression * indexRecord = index->queryRecord(); BuildCtx ctx4(instance.startctx); ctx4.addQuotedCompound("virtual unsigned __int64 extractPosition(const void * _right)"); ctx4.addQuoted("const unsigned char * right = (const unsigned char *) _right;"); bindTableCursor(ctx4, index, "right"); OwnedHqlExpr fileposExpr = createSelectExpr(LINK(index), LINK(indexRecord->queryChild(indexRecord->numChildren()-1))); buildReturn(ctx4, fileposExpr); } //virtual const char * getFileName() = 0; // Returns filename of raw file fpos'es refer into if (info->isFullJoin()) buildFilenameFunction(instance, instance.createctx, "getFileName", info->queryFileFilename(), hasDynamicFilename(info->queryFile())); //virtual bool diskAccessRequired() = 0; if (info->isFullJoin()) doBuildBoolFunction(instance.startctx, "diskAccessRequired", true); //virtual size32_t transform(ARowBuilder & crSelf, const void * _left, const void * _right) = 0; info->buildTransform(instance.startctx); IHqlExpression * onFail = expr->queryProperty(onFailAtom); if (onFail) { //virtual size32_t onFailTransform(ARowBuilder & crSelf, const void * _left, const void * _right, unsigned __int64 _filepos, IException * except) info->buildFailureTransform(instance.startctx, onFail->queryChild(0)); } //limit helpers... 
IHqlExpression * limit = expr->queryProperty(limitAtom); if (limit) { if (limit->hasProperty(skipAtom)) { BuildCtx ctx1(instance.startctx); ctx1.addQuotedCompound("virtual unsigned __int64 getSkipLimit()"); buildReturn(ctx1, limit->queryChild(0)); } else buildLimitHelpers(instance.startctx, limit->queryChild(0), limit->queryChild(1), false, info->queryKeyFilename(), instance.activityId); } } void HqlCppTranslator::buildKeyJoinIndexReadHelper(ActivityInstance & instance, IHqlExpression * expr, KeyedJoinInfo * info) { //virtual size32_t extractIndexReadFields(ARowBuilder & crSelf, const void * _input) = 0; //virtual IOutputMetaData * queryIndexReadInputRecordSize() = 0; info->buildExtractIndexReadFields(instance.startctx); //virtual const char * getIndexFileName() = 0; buildFilenameFunction(instance, instance.startctx, "getIndexFileName", info->queryKeyFilename(), hasDynamicFilename(info->queryKey())); //virtual IOutputMetaData * queryIndexRecordSize() = 0; //Excluding fpos and sequence buildMetaMember(instance.classctx, info->queryRawKey(), false, "queryIndexRecordSize"); //virtual void createSegmentMonitors(IIndexReadContext *ctx, const void *lhs) = 0; info->buildMonitors(instance.startctx); //virtual bool indexReadMatch(const void * indexRow, const void * inputRow) = 0; info->buildIndexReadMatch(instance.startctx); } void HqlCppTranslator::buildKeyJoinFetchHelper(ActivityInstance & instance, IHqlExpression * expr, KeyedJoinInfo * info) { //virtual size32_t extractFetchFields(ARowBuilder & crSelf, const void * _input) = 0; //virtual IOutputMetaData * queryFetchInputRecordSize() = 0; info->buildExtractFetchFields(instance.startctx); // Inside the fetch remote activity //virtual bool fetchMatch(const void * diskRow, const void * inputRow) = 0; info->buildFetchMatch(instance.startctx); //virtual size32_t extractJoinFields(void *dest, const void *diskRow, IBlobProvider * blobs) = 0; info->buildExtractJoinFields(instance); } ABoundActivity * 
HqlCppTranslator::doBuildActivityKeyedJoinOrDenormalize(BuildCtx & ctx, IHqlExpression * expr) { KeyedJoinInfo info(*this, expr, !targetHThor()); IHqlExpression * cond = expr->queryChild(2); if (!info.processFilter() && !cond->isConstant()) info.reportFailureReason(cond); if (info.isFullJoin()) { IHqlExpression * table = info.queryFile(); if (table->getOperator() != no_table) throwError(HQLERR_FullJoinNeedDataset); } Owned boundDataset1 = buildCachedActivity(ctx, expr->queryChild(0)); Owned boundIndexActivity; if (options.forceActivityForKeyedJoin || info.requireActivityForKey()) boundIndexActivity.setown(buildCachedActivity(ctx, info.queryOriginalKey())); node_operator op = expr->getOperator(); ThorActivityKind kind; switch (op) { case no_join: kind = TAKkeyedjoin; break; case no_denormalize: kind = TAKkeyeddenormalize; break; case no_denormalizegroup: kind = TAKkeyeddenormalizegroup; break; } Owned instance = new ActivityInstance(*this, ctx, kind, expr, (op == no_join) ? "KeyedJoin" : "KeyedDenormalize"); IHqlExpression * indexName = info.queryKeyFilename(); if (indexName) { OwnedHqlExpr folded = foldHqlExpression(indexName); if (folded->queryValue()) { StringBuffer graphLabel; if (instance->isGrouped) graphLabel.append("Grouped "); else if (instance->isLocal) graphLabel.append("Local "); graphLabel.append(getActivityText(instance->kind)); getStringValue(graphLabel.append("\n'"), folded).append("'"); instance->graphLabel.set(graphLabel.str()); } } buildActivityFramework(instance); IHqlExpression * rowlimit = expr->queryProperty(rowLimitAtom); StringBuffer s; buildInstancePrefix(instance); OwnedHqlExpr atmostCond, atmostLimit; IHqlExpression * atmost = expr->queryProperty(atmostAtom); extractAtmostArgs(atmost, atmostCond, atmostLimit); //virtual unsigned getJoinFlags() StringBuffer flags; bool isLeftOuter = (expr->hasProperty(leftonlyAtom) || expr->hasProperty(leftouterAtom)); if (expr->hasProperty(leftonlyAtom)) flags.append("|JFexclude"); if (isLeftOuter) 
flags.append("|JFleftouter"); if (expr->hasProperty(firstAtom)) flags.append("|JFfirst"); if (expr->hasProperty(firstLeftAtom)) flags.append("|JFfirstleft"); if (transformContainsSkip(expr->queryChild(3))) flags.append("|JFtransformMaySkip"); if (info.isFetchFiltered()) flags.append("|JFfetchMayFilter"); if (rowlimit && rowlimit->hasProperty(skipAtom)) flags.append("|JFmatchAbortLimitSkips"); if (rowlimit && rowlimit->hasProperty(countAtom)) flags.append("|JFcountmatchabortlimit"); if (expr->hasProperty(onFailAtom)) flags.append("|JFonfail"); if (info.isKeyOpt()) flags.append("|JFindexoptional"); if (info.needToExtractJoinFields()) flags.append("|JFextractjoinfields"); if (expr->hasProperty(unorderedAtom)) flags.append("|JFreorderable"); if (transformReturnsSide(expr, no_left, 0)) flags.append("|JFtransformmatchesleft"); if (info.queryKeyFilename() && !info.queryKeyFilename()->isConstant()) flags.append("|JFvarindexfilename"); if (hasDynamicFilename(info.queryKey())) flags.append("|JFdynamicindexfilename"); if (boundIndexActivity) flags.append("|JFindexfromactivity"); if (flags.length()) doBuildUnsignedFunction(instance->classctx, "getJoinFlags", flags.str()+1); //Fetch flags flags.clear(); if (info.isFullJoin()) { if (info.isFileOpt()) flags.append("|FFdatafileoptional"); if (!info.queryFileFilename()->isConstant()) flags.append("|FFvarfilename"); if (hasDynamicFilename(info.queryFile())) flags.append("|FFdynamicfilename"); } if (flags.length()) doBuildUnsignedFunction(instance->classctx, "getFetchFlags", flags.str()+1); //virtual unsigned getJoinLimit() if (!isZero(atmostLimit)) doBuildUnsignedFunction(instance->startctx, "getJoinLimit", atmostLimit); //virtual unsigned getKeepLimit() LinkedHqlExpr keepLimit = queryPropertyChild(expr, keepAtom, 0); if (keepLimit) doBuildUnsignedFunction(instance->startctx, "getKeepLimit", keepLimit); bool implicitLimit = !rowlimit && !atmost && (!keepLimit || info.hasPostFilter()) && !expr->hasProperty(leftonlyAtom); //virtual 
unsigned getKeepLimit() doBuildJoinRowLimitHelper(*instance, rowlimit, info.queryKeyFilename(), implicitLimit); buildFormatCrcFunction(instance->classctx, "getIndexFormatCrc", info.queryRawKey(), info.queryRawKey(), 1); if (info.isFullJoin()) { buildFormatCrcFunction(instance->classctx, "getDiskFormatCrc", info.queryRawRhs(), NULL, 0); buildEncryptHelper(instance->startctx, info.queryFile()->queryProperty(encryptAtom), "getFileEncryptKey"); } IHqlExpression * key = info.queryKey(); buildSerializedLayoutMember(instance->classctx, key->queryRecord(), "getIndexLayout", numKeyedFields(key)); //--function to clear right, used for left outer join if (isLeftOuter || expr->hasProperty(onFailAtom)) info.buildClearRightFunction(instance->createctx); buildKeyedJoinExtra(*instance, expr, &info); buildKeyJoinIndexReadHelper(*instance, expr, &info); buildKeyJoinFetchHelper(*instance, expr, &info); info.buildLeftOnly(instance->startctx); if (targetRoxie()) { instance->addAttributeBool("_diskAccessRequired", info.isFullJoin()); instance->addAttributeBool("_isIndexOpt", info.isKeyOpt()); instance->addAttributeBool("_isOpt", info.isFileOpt()); } buildInstanceSuffix(instance); buildConnectInputOutput(ctx, instance, boundDataset1, 0, 0); if (boundIndexActivity) buildConnectInputOutput(ctx, instance, boundIndexActivity, 0, 1); addFileDependency(info.queryKeyFilename(), instance->queryBoundActivity()); if (info.isFullJoin()) addFileDependency(info.queryFileFilename(), instance->queryBoundActivity()); return instance->getBoundActivity(); } //--------------------------------------------------------------------------- ABoundActivity * HqlCppTranslator::doBuildActivityKeyedDistribute(BuildCtx & ctx, IHqlExpression * expr) { if (!targetThor()) return buildCachedActivity(ctx, expr->queryChild(0)); HqlExprArray leftSorts, rightSorts; IHqlExpression * left = expr->queryChild(0); IHqlExpression * right = expr->queryChild(1); IHqlExpression * indexRecord = right->queryRecord(); IHqlExpression * 
seq = querySelSeq(expr); bool isLimitedSubstringJoin; OwnedHqlExpr match = findJoinSortOrders(expr, leftSorts, rightSorts, isLimitedSubstringJoin, NULL); assertex(leftSorts.ordinality() == rightSorts.ordinality()); if (isLimitedSubstringJoin) throwError(HQLERR_KeyedDistributeNoSubstringJoin); unsigned numUnsortedFields = numPayloadFields(right); unsigned numKeyedFields = getFlatFieldCount(indexRecord)-numUnsortedFields; if (match || (!expr->hasProperty(firstAtom) && (leftSorts.ordinality() != numKeyedFields))) throwError(HQLERR_MustMatchExactly); //Should already be caught in parser KeyedJoinInfo info(*this, expr, false); info.processFilter(); Owned boundDataset1 = buildCachedActivity(ctx, left); Owned instance = new ActivityInstance(*this, ctx, TAKkeyeddistribute, expr, "KeyedDistribute"); buildActivityFramework(instance); StringBuffer s; buildInstancePrefix(instance); IHqlExpression * keyFilename = info.queryKeyFilename(); bool dynamic = hasDynamicFilename(info.queryKey()); //virtual unsigned getFlags() StringBuffer flags; if (!keyFilename->isConstant()) flags.append("|KDFvarindexfilename"); if (dynamic) flags.append("|KDFdynamicindexfilename"); if (flags.length()) doBuildUnsignedFunction(instance->classctx, "getFlags", flags.str()+1); //virtual const char * getIndexFileName() = 0; buildFilenameFunction(*instance, instance->startctx, "getIndexFileName", keyFilename, dynamic); //virtual IOutputMetaData * queryIndexRecordSize() = 0; //Excluding fpos and sequence buildMetaMember(instance->classctx, info.queryRawKey(), false, "queryIndexRecordSize"); //virtual void createSegmentMonitors(IIndexReadContext *ctx, const void *lhs) = 0; info.buildMonitors(instance->startctx); //The comparison is against the raw key entries, so expand out the logical to the physical //virtual ICompare * queryCompareRowKey() = 0; OwnedHqlExpr expandedIndex = convertToPhysicalIndex(right); assertex(expandedIndex->getOperator() == no_newusertable); IHqlExpression * rawIndex = 
expandedIndex->queryChild(0); OwnedHqlExpr oldSelector = createSelector(no_activetable, right, seq); OwnedHqlExpr newSelector = createSelector(no_activetable, rawIndex, seq); TableProjectMapper mapper(expandedIndex); HqlExprArray normalizedRight; ForEachItemIn(i, rightSorts) { IHqlExpression & curRight = rightSorts.item(i); normalizedRight.append(*mapper.expandFields(&curRight, oldSelector, newSelector)); } DatasetReference leftDs(left, no_activetable, seq); DatasetReference rightDs(rawIndex, no_activetable, seq); doCompareLeftRight(instance->nestedctx, "CompareRowKey", leftDs, rightDs, leftSorts, normalizedRight); buildFormatCrcFunction(instance->classctx, "getFormatCrc", info.queryRawKey(), info.queryRawKey(), 1); buildSerializedLayoutMember(instance->classctx, indexRecord, "getIndexLayout", numKeyedFields); OwnedHqlExpr matchExpr = info.getMatchExpr(true); assertex(!matchExpr); buildInstanceSuffix(instance); buildConnectInputOutput(ctx, instance, boundDataset1, 0, 0); Owned whoAmI = instance->getBoundActivity(); addFileDependency(keyFilename, whoAmI); return instance->getBoundActivity(); } //--------------------------------------------------------------------------- ABoundActivity * HqlCppTranslator::doBuildActivityKeyDiff(BuildCtx & ctx, IHqlExpression * expr, bool isRoot) { StringBuffer s; IHqlExpression * original = expr->queryChild(0); IHqlExpression * updated = expr->queryChild(1); IHqlExpression * output = expr->queryChild(2); Owned instance = new ActivityInstance(*this, ctx, TAKkeydiff, expr, "KeyDiff"); buildActivityFramework(instance, isRoot); buildInstancePrefix(instance); //virtual unsigned getFlags() = 0; StringBuffer flags; if (expr->hasProperty(overwriteAtom)) flags.append("|KDPoverwrite"); else if (expr->hasProperty(noOverwriteAtom)) flags.append("|KDPnooverwrite"); if (!output->isConstant()) flags.append("|KDPvaroutputname"); if (flags.length()) doBuildUnsignedFunction(instance->classctx, "getFlags", flags.str()+1); //virtual const char * 
queryOriginalName() = 0; // may be null buildRefFilenameFunction(*instance, instance->startctx, "queryOriginalName", original); //virtual const char * queryPatchName() = 0; buildRefFilenameFunction(*instance, instance->startctx, "queryUpdatedName", updated); //virtual const char * queryOutputName() = 0; buildFilenameFunction(*instance, instance->startctx, "queryOutputName", output, hasDynamicFilename(expr)); //virtual int getSequence() = 0; doBuildSequenceFunc(instance->classctx, querySequence(expr), false); buildExpiryHelper(instance->createctx, expr->queryProperty(expireAtom)); buildInstanceSuffix(instance); return instance->getBoundActivity(); } ABoundActivity * HqlCppTranslator::doBuildActivityKeyPatch(BuildCtx & ctx, IHqlExpression * expr, bool isRoot) { StringBuffer s; IHqlExpression * original = expr->queryChild(0); IHqlExpression * patch = expr->queryChild(1); IHqlExpression * output = expr->queryChild(2); Owned instance = new ActivityInstance(*this, ctx, TAKkeypatch, expr, "KeyPatch"); buildActivityFramework(instance, isRoot); buildInstancePrefix(instance); //virtual unsigned getFlags() = 0; StringBuffer flags; if (expr->hasProperty(overwriteAtom)) flags.append("|KDPoverwrite"); else if (expr->hasProperty(noOverwriteAtom)) flags.append("|KDPnooverwrite"); if (!output->isConstant()) flags.append("|KDPvaroutputname"); if (flags.length()) doBuildUnsignedFunction(instance->classctx, "getFlags", flags.str()+1); //virtual const char * queryOriginalName() = 0; buildRefFilenameFunction(*instance, instance->startctx, "queryOriginalName", original); //virtual const char * queryPatchName() = 0; buildFilenameFunction(*instance, instance->startctx, "queryPatchName", patch, true); //virtual const char * queryOutputName() = 0; buildFilenameFunction(*instance, instance->startctx, "queryOutputName", output, hasDynamicFilename(expr)); //virtual int getSequence() = 0; doBuildSequenceFunc(instance->classctx, querySequence(expr), false); buildExpiryHelper(instance->createctx, 
expr->queryProperty(expireAtom)); buildInstanceSuffix(instance); return instance->getBoundActivity(); } //--------------------------------------------------------------------------- IHqlExpression * querySelectorTable(IHqlExpression * expr) { loop { IHqlExpression * selector = expr->queryChild(0); if (selector->getOperator() != no_select) return selector; switch (selector->queryType()->getTypeCode()) { case type_row: case type_record: break; default: return selector; } } } static IHqlExpression * getBlobAttribute(IHqlExpression * ds) { return createAttribute(blobHelperAtom, LINK(ds->queryNormalizedSelector())); } IHqlExpression * queryBlobHelper(BuildCtx & ctx, IHqlExpression * select) { OwnedHqlExpr search = getBlobAttribute(querySelectorTable(select)); HqlExprAssociation * match = ctx.queryAssociation(search, AssocExpr, NULL); if (!match) return NULL; return match->queryExpr(); } void HqlCppTranslator::associateBlobHelper(BuildCtx & ctx, IHqlExpression * ds, const char * name) { OwnedHqlExpr search = getBlobAttribute(ds); OwnedHqlExpr matched = createVariable(name, ds->getType()); ctx.associateExpr(search, matched); } IHqlExpression * HqlCppTranslator::getBlobRowSelector(BuildCtx & ctx, IHqlExpression * expr) { IHqlExpression * id = expr->queryChild(0); IHqlExpression * helper = queryBlobHelper(ctx, id); //MORE: Need to clone the dataset attributes. Really they should be included in the type somehow: via modifiers? 
//or give an error if blob used on alien with ref/ OwnedHqlExpr field = createField(unnamedAtom, expr->getType(), NULL, NULL); HqlExprArray fields; fields.append(*LINK(field)); OwnedHqlExpr record = createRecord(fields); OwnedHqlExpr row = createRow(no_anon, LINK(record), createAttribute(_internal_Atom, LINK(id))); if (!ctx.queryAssociation(row, AssocRow, NULL)) { Owned rowType = makeReferenceModifier(row->getType()); OwnedHqlExpr boundRow = ctx.getTempDeclare(rowType, NULL); CHqlBoundExpr boundId; buildExpr(ctx, id, boundId); HqlExprArray args; args.append(*LINK(helper)); args.append(*LINK(boundId.expr)); OwnedHqlExpr call = bindTranslatedFunctionCall(lookupBlobAtom, args); ctx.addAssign(boundRow, call); bindRow(ctx, row, boundRow); } return createSelectExpr(LINK(row), LINK(field)); } void HqlCppTranslator::doBuildAssignIdToBlob(BuildCtx & ctx, const CHqlBoundTarget & target, IHqlExpression * expr) { if (!queryBlobHelper(ctx, expr->queryChild(0))) { //Do the assignment by building an expression then assigning doBuildExprAssign(ctx, target, expr); return; } OwnedHqlExpr select = getBlobRowSelector(ctx, expr); buildExprAssign(ctx, target, select); } void HqlCppTranslator::doBuildExprIdToBlob(BuildCtx & ctx, IHqlExpression * expr, CHqlBoundExpr & tgt) { if (!queryBlobHelper(ctx, expr->queryChild(0))) { if (!buildExprInCorrectContext(ctx, expr, tgt, false)) throwError(HQLERR_BlobTranslationContextNotFound); return; } OwnedHqlExpr select = getBlobRowSelector(ctx, expr); buildExpr(ctx, select, tgt); } IReferenceSelector * HqlCppTranslator::doBuildRowIdToBlob(BuildCtx & ctx, IHqlExpression * expr, bool isNew) { if (!queryBlobHelper(ctx, expr->queryChild(0))) throwError(HQLERR_AccessRowBlobInsideChildQuery); OwnedHqlExpr select = getBlobRowSelector(ctx, expr); return buildNewOrActiveRow(ctx, select, isNew); } void HqlCppTranslator::doBuildExprBlobToId(BuildCtx & ctx, IHqlExpression * expr, CHqlBoundExpr & tgt) { IHqlExpression * value = expr->queryChild(0); IHqlExpression 
* helper = queryBlobHelper(ctx, value); assertex(helper); Owned selector = buildReference(ctx, value); CHqlBoundExpr boundSize, boundAddress; selector->getSize(ctx, boundSize); selector->buildAddress(ctx, boundAddress); HqlExprArray args; args.append(*LINK(helper)); args.append(*LINK(boundSize.expr)); args.append(*LINK(boundAddress.expr)); tgt.expr.setown(bindTranslatedFunctionCall(createBlobAtom, args)); }