HPCC-19743 Allow disk read to directly deserialize a row without calling a transform

Currently only implemented for Thor, since this is the engine that will
gain the main benefit when spill files are dynamically spilled.

Signed-off-by: Gavin Halliday <gavin.halliday@lexisnexis.com>
Gavin Halliday committed 6 years ago
commit d6efee502d
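
The gist of the change, as a minimal standalone sketch (toy row format and stand-in functions, not code from this commit): when the projected disk format already matches the output format, a disk read can deserialize each row straight into the output buffer instead of deserializing into a scratch row and then calling the generated transform() on it.

#include <cstdio>
#include <cstring>

// Toy fixed-size row (a stand-in, not an HPCC type).
struct Row { int id; double score; };

// Old path (sketch): deserialize to a scratch row, then call a per-row
// transform to build the output row.
static Row transform(const Row & scratch) { return scratch; } // identity slimming

static Row readViaTransform(const unsigned char * diskRow)
{
    Row scratch;
    std::memcpy(&scratch, diskRow, sizeof(Row)); // deserialize
    return transform(scratch);                   // extra call + copy per row
}

// New path (sketch): the row is deserialized directly; transform() is never
// called, and its base implementation now fails loudly if it is (see the
// eclhelper_base.cpp hunk below).
static Row readDirect(const unsigned char * diskRow)
{
    Row out;
    std::memcpy(&out, diskRow, sizeof(Row));
    return out;
}

int main()
{
    Row src { 7, 3.5 };
    unsigned char disk[sizeof(Row)];
    std::memcpy(disk, &src, sizeof(Row));
    std::printf("%d %g\n", readViaTransform(disk).id, readDirect(disk).score);
    return 0;
}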

+ 1 - 2
ecl/hql/hqlutil.cpp

@@ -10316,12 +10316,11 @@ const RtlTypeInfo *buildRtlType(IRtlFieldTypeDeserializer &deserializer, ITypeIn
         }
     case type_table:
     case type_groupedtable:
+    case type_dictionary:
         {
             info.childType = buildRtlType(deserializer, ::queryRecordType(type));
             break;
         }
-    case type_dictionary:
-        return nullptr;  // MORE - does this leak?
     case type_blob:
     case type_set:
     case type_keyedint:
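
The effect of this hunk, sketched with toy stand-ins (not HPCC's RtlTypeInfo): a dictionary now gets a type-info node whose childType is its record, the same shape a dataset gets, rather than no type info at all (with the suspected leak flagged in the deleted comment).

// Toy type nodes: generic field-walking code can now recurse into a
// dictionary through childType exactly as it does for a table.
struct ToyType
{
    const char * kind;
    const ToyType * childType;
};

static const ToyType personRecord { "record", nullptr };
static const ToyType personTable  { "table", &personRecord };
static const ToyType personDict   { "dictionary", &personRecord }; // previously absent

int main() { return personDict.childType == personTable.childType ? 0 : 1; }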

+ 1 - 0
ecl/hqlcpp/hqlcpp.cpp

@@ -1813,6 +1813,7 @@ void HqlCppTranslator::cacheOptions()
         DebugOption(options.addDefaultBloom,"addDefaultBloom", true),
         DebugOption(options.newDiskReadMapping, "newDiskReadMapping", true),
         DebugOption(options.transformNestedSequential, "transformNestedSequential", true),
+        DebugOption(options.forceAllProjectedDiskSerialized, "internalForceAllProjectedDiskSerialized", false),  // Delete in 8.0 once new code has been proved in anger
     };
 
     //get options values from workunit

+ 1 - 0
ecl/hqlcpp/hqlcpp.ipp

@@ -798,6 +798,7 @@ struct HqlCppOptions
     bool                addDefaultBloom;
     bool                newDiskReadMapping;
     bool                transformNestedSequential;
+    bool                forceAllProjectedDiskSerialized;
 };
 
 //Any information gathered while processing the query should be moved into here, rather than cluttering up the translator class

+ 15 - 12
ecl/hqlcpp/hqlsource.cpp

@@ -1123,13 +1123,6 @@ void SourceBuilder::buildReadMembers(IHqlExpression * expr)
 {
     buildFilenameMember();
 
-    //Sanity check to ensure that the projected row is only in the in memory format if it from a spill file, or no transform needs to be called.
-    if (needToCallTransform || transformCanFilter)
-    {
-        if (!tableExpr->hasAttribute(_spill_Atom) && recordRequiresSerialization(tableExpr->queryRecord(), diskAtom))
-            throwUnexpectedX("Projected dataset should have been serialized");
-    }
-
     //---- virtual bool needTransform() { return <bool>; } ----
     if (needToCallTransform || transformCanFilter)
         translator.doBuildBoolFunction(instance->classctx, "needTransform", true);
@@ -3064,6 +3057,16 @@ void DiskReadBuilder::buildTransform(IHqlExpression * expr)
         return;
     }
 
+    if (recordRequiresSerialization(tableExpr->queryRecord(), diskAtom))
+    {
+        //Sanity check to ensure that the projected row is only in the in memory format if no transform needs to be called.
+        if (needToCallTransform || transformCanFilter)
+            throwUnexpectedX("Projected dataset should have been serialized");
+
+        //Base implementation for a disk read throws an exception if it is called.
+        return;
+    }
+
     if (modeOp == no_csv)
     {
         translator.buildCsvParameters(instance->nestedctx, mode, NULL, true);
@@ -3113,15 +3116,15 @@ ABoundActivity * HqlCppTranslator::doBuildActivityDiskRead(BuildCtx & ctx, IHqlE
     if (info.newDiskReadMapping && (modeOp != no_csv) && (modeOp != no_xml) && (modeOp != no_pipe))
     {
         //The projected disk information (which is passed to the transform) uses the in memory format IFF
-        // - The source is a spill file (to allow efficient in memory mapping)
 // - The disk read is a trivial slimming transform (so no transform needs calling on the projected disk format).
+        // - It is used for all disk reads since directly transforming is always at least as efficient as going via
+        //   the serialized form.
         // Otherwise the table is converted to the serialized format.
 
-        //MORE: This shouldn't always need to be serialized - but engines crash at the moment if not
-        const bool forceAllProjectedSerialized = true;
+        const bool forceAllProjectedSerialized = options.forceAllProjectedDiskSerialized;
         //Reading from a spill file uses the in-memory format to optimize on-demand spilling.
-        bool optimizeInMemorySpill = !targetHThor() && tableExpr->hasAttribute(_spill_Atom);
-        bool useInMemoryFormat = optimizeInMemorySpill || isSimpleProjectingDiskRead(expr);
+        bool optimizeInMemorySpill = targetThor();
+        bool useInMemoryFormat = optimizeInMemorySpill && isSimpleProjectingDiskRead(expr);
         if (forceAllProjectedSerialized || !useInMemoryFormat)
         {
             //else if the table isn't serialized, then map to a serialized table, and then project to the real format
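
Distilling the new selection logic into a standalone predicate (names mirror the hunk above; a toy recreation, not HPCC source): the in-memory projected format survives only on Thor, only for a simple projecting read, and only while the escape-hatch option stays off.

#include <cassert>

static bool projectedUsesInMemoryFormat(bool targetIsThor,
                                        bool simpleProjectingRead,
                                        bool forceAllProjectedSerialized)
{
    if (forceAllProjectedSerialized)   // internalForceAllProjectedDiskSerialized
        return false;                  // rollback switch: always serialize
    return targetIsThor && simpleProjectingRead;
}

int main()
{
    assert(projectedUsesInMemoryFormat(true, true, false));   // Thor, trivial slimming
    assert(!projectedUsesInMemoryFormat(false, true, false)); // hthor/Roxie: serialize
    assert(!projectedUsesInMemoryFormat(true, false, false)); // real transform: serialize
    assert(!projectedUsesInMemoryFormat(true, true, true));   // escape hatch wins
    return 0;
}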

+ 5 - 0
rtl/eclrtl/eclhelper_base.cpp

@@ -679,6 +679,11 @@ void CThorDiskReadArg::onKeyedLimitExceeded() { }
 ISteppingMeta * CThorDiskReadArg::queryRawSteppingMeta() { return NULL; }
 ISteppingMeta * CThorDiskReadArg::queryProjectedSteppingMeta() { return NULL; }
 void CThorDiskReadArg::mapOutputToInput(ARowBuilder & rowBuilder, const void * projectedRow, unsigned numFields) { }
+size32_t CThorDiskReadArg::transform(ARowBuilder & rowBuilder, const void * src)
+{
+    rtlFail(800, "transform() should not be called, input is deserialized");
+}
+
 size32_t CThorDiskReadArg::unfilteredTransform(ARowBuilder & rowBuilder, const void * src) { return 0; }
 size32_t CThorDiskReadArg::transformOnLimitExceeded(ARowBuilder & rowBuilder) { return 0; }
 size32_t CThorDiskReadArg::transformOnKeyedLimitExceeded(ARowBuilder & rowBuilder) { return 0; }
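
The failing stub only makes sense alongside the generated needTransform() member (see the hqlsource.cpp hunk above): a sketch of the assumed engine-side contract, using toy stand-in types rather than HPCC interfaces.

#include <cstdio>
#include <stdexcept>

struct ToyDiskReadHelper
{
    bool needTransform() const { return false; } // direct-deserialize read
    int transform(const void *)
    {
        throw std::logic_error("transform() should not be called, input is deserialized");
    }
};

static int processRow(ToyDiskReadHelper & helper, const void * row, int deserializedSize)
{
    // The engine checks needTransform() first; when it is false the row is
    // used exactly as deserialized and the failing stub is never reached.
    if (helper.needTransform())
        return helper.transform(row);
    return deserializedSize;
}

int main()
{
    ToyDiskReadHelper helper;
    std::printf("%d\n", processRow(helper, nullptr, 42));
    return 0;
}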

+ 11 - 0
rtl/eclrtl/rtlds.cpp

@@ -1442,6 +1442,17 @@ extern ECLRTL_API void rtlSerializeDictionaryToDataset(unsigned & tlen, void * &
     tgt = buffer.detach();      // not strictly speaking correct - it should have been allocated with rtlMalloc();
 }
 
+
+extern ECLRTL_API void rtlCreateDictionaryFromDataset(size32_t & count, const byte * * & rowset, IEngineRowAllocator * rowAllocator, IHThorHashLookupInfo & hashInfo)
+{
+    RtlLinkedDictionaryBuilder builder(rowAllocator, &hashInfo);
+    builder.appendRows(count, rowset);
+    rowAllocator->releaseRowset(count, rowset);
+    count = builder.getcount();
+    rowset = builder.linkrows();
+}
+
+
 //---------------------------------------------------------------------------
 
 RtlDatasetCursor::RtlDatasetCursor(size32_t _len, const void * _data)
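
What the new helper builds, as a toy illustration (stand-in types and a trivial linear probe; the real work is done by RtlLinkedDictionaryBuilder and the supplied hash helper): a dictionary rowset is a hash table, so rows land at hash-derived slots and unused slots stay null. Those null entries are the "blank rows" that the translator change further down skips when writing a dictionary back out as a dataset.

#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Toy dictionary build: place each row at a hash-derived slot, probing past
// collisions, and leave nullptr gaps in the rowset.
static std::vector<const std::string *> toyCreateDictionaryFromDataset(
        const std::vector<const std::string *> & rows)
{
    std::vector<const std::string *> table(rows.size() * 2, nullptr);
    for (const std::string * row : rows)
    {
        std::size_t slot = std::hash<std::string>{}(*row) % table.size();
        while (table[slot])                 // linear probe on collision
            slot = (slot + 1) % table.size();
        table[slot] = row;
    }
    return table;                           // sparse: expect nullptr "blank rows"
}

int main()
{
    std::string a = "abc", b = "def";
    auto dict = toyCreateDictionaryFromDataset({ &a, &b });
    std::size_t blanks = 0;
    for (const std::string * row : dict)
        if (!row)
            ++blanks;
    return blanks == dict.size() - 2 ? 0 : 1; // two occupied slots, rest blank
}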

+ 2 - 0
rtl/eclrtl/rtlds_imp.hpp

@@ -486,6 +486,7 @@ extern ECLRTL_API void rtlDeserializeDictionary(size32_t & count, const byte * *
 extern ECLRTL_API void rtlDeserializeDictionaryFromDataset(size32_t & count, const byte * * & rowset, IEngineRowAllocator * rowAllocator, IOutputRowDeserializer * deserializer, IHThorHashLookupInfo & hashInfo, size32_t lenSrc, const void * src);
 extern ECLRTL_API void rtlSerializeDictionary(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, const byte * * rows);
 extern ECLRTL_API void rtlSerializeDictionaryToDataset(unsigned & tlen, void * & tgt, IOutputRowSerializer * serializer, size32_t count, const byte * * rows);
+extern ECLRTL_API void rtlCreateDictionaryFromDataset(size32_t & count, const byte * * & rowset, IEngineRowAllocator * rowAllocator, IHThorHashLookupInfo & hashInfo);
 
 extern ECLRTL_API void rtlSerializeDictionary(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, const byte * * rows);
 extern ECLRTL_API void rtlSerializeDictionaryToDataset(IRowSerializerTarget & out, IOutputRowSerializer * serializer, size32_t count, const byte * * rows);
@@ -738,4 +739,5 @@ public:
 protected:
     CHThorDictHelper helper;
 };
+
 #endif

+ 27 - 2
rtl/eclrtl/rtldynfield.cpp

@@ -1265,6 +1265,14 @@ private:
                                     source += sourceType->queryChildType()->size(source, nullptr); // MORE - shame to repeat a calculation that the translate above almost certainly just did
                                 }
                             }
+                            if (type->getType() == type_dictionary)
+                            {
+                                const RtlTypeInfo * childType = type->queryChildType();
+                                assertex(childType && childType->getType() == type_record);
+                                CHThorHashLookupInfo lookupHelper(static_cast<const RtlRecordTypeInfo &>(*childType));
+                                rtlCreateDictionaryFromDataset(numRows, childRows, childAllocator, lookupHelper);
+                            }
+
                             // Go back in and patch the count, remembering it may have moved
                             rtlWriteInt4(builder.getSelf()+offset, numRows);
                             * ( const void * * ) (builder.getSelf()+offset+sizeof(size32_t)) = childRows;
@@ -1285,7 +1293,10 @@ private:
                                 const byte ** sourceRows = *(const byte***) source;
                                 for (size32_t childRow = 0; childRow < childCount; childRow++)
                                 {
-                                    offset = match.subTrans->doTranslate(builder, callback, offset, sourceRows[childRow]);
+                                    const byte * row = sourceRows[childRow];
+                                    //Dictionaries have blank rows - ignore them when serializing (to a dataset)
+                                    if (row)
+                                        offset = match.subTrans->doTranslate(builder, callback, offset, row);
                                 }
                             }
                             else
@@ -1394,6 +1405,19 @@ private:
         }
         return expectedSize;
     }
+    static bool canTranslateNonScalar(const RtlTypeInfo * type, const RtlTypeInfo * sourceType)
+    {
+        auto target = type->getType();
+        auto source = sourceType->getType();
+        if (target == source)
+            return true;
+        if ((target == type_dictionary) && (source == type_table))
+            return true;
+        if ((target == type_table) && (source == type_dictionary))
+            return true;
+        return false;
+    }
+
     void createMatchInfo()
     {
         for (unsigned idx = 0; idx < destRecInfo.getNumFields(); idx++)
@@ -1429,7 +1453,7 @@ private:
                 }
                 if (!type->isScalar() || !sourceType->isScalar())
                 {
-                    if (type->getType() != sourceType->getType())
+                    if (!canTranslateNonScalar(type, sourceType))
                         info.matchType = match_fail;  // No translation between incompatible non-scalar types
                     else
                     {
@@ -1447,6 +1471,7 @@ private:
                         case type_ifblock:
                         case type_record:
                         case type_table:
+                        case type_dictionary:
                         {
                             const RtlRecord *subDest = destRecInfo.queryNested(idx);
                             const RtlRecord *subSrc = sourceRecInfo.queryNested(info.matchIdx);
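
Taken together, these hunks let the translator move between a dictionary and its dataset form: canTranslateNonScalar() admits the pair, dataset-to-dictionary rebuilds the hash table after translating the child rows, and dictionary-to-dataset drops the blank slots. A toy sketch of the write-side skip (stand-ins, not HPCC source):

#include <string>
#include <vector>

// Serializing a dictionary as a dataset keeps only the occupied slots,
// mirroring the doTranslate change above that ignores null child rows.
static std::vector<std::string> toyDictionaryToDataset(
        const std::vector<const std::string *> & dict)
{
    std::vector<std::string> rows;
    for (const std::string * row : dict)
        if (row)                        // skip blank hash-table slots
            rows.push_back(*row);
    return rows;
}

int main()
{
    std::string a = "abc";
    std::vector<const std::string *> dict { nullptr, &a, nullptr };
    return toyDictionaryToDataset(dict).size() == 1 ? 0 : 1;
}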

+ 2 - 1
rtl/eclrtl/rtlrecord.cpp

@@ -258,7 +258,7 @@ RtlRecord::RtlRecord(const RtlFieldInfo * const *_fields, bool expandFields) : f
         const RtlTypeInfo *curType = queryType(i);
         if (!curType->isFixedSize() || (fields[i]->flags & RFTMinifblock))
             numVarFields++;
-        if (curType->getType()==type_table || curType->getType()==type_record)
+        if (curType->getType()==type_table || curType->getType()==type_record || curType->getType()==type_dictionary)
             numTables++;
     }
 
@@ -300,6 +300,7 @@ RtlRecord::RtlRecord(const RtlFieldInfo * const *_fields, bool expandFields) : f
         switch (curType->getType())
         {
         case type_table:
+        case type_dictionary:
             tableIds[curTable] = i;
             nestedTables[curTable++] = new RtlRecord(curType->queryChildType()->queryFields(), expandFields);
             break;

+ 1 - 0
rtl/include/eclhelper_base.hpp

@@ -905,6 +905,7 @@ class ECLRTL_API CThorDiskReadArg : public CThorArgOf<IHThorDiskReadArg>
     virtual ISteppingMeta * queryRawSteppingMeta() override;
     virtual ISteppingMeta * queryProjectedSteppingMeta() override;
     virtual void mapOutputToInput(ARowBuilder & rowBuilder, const void * projectedRow, unsigned numFields) override;
+    virtual size32_t transform(ARowBuilder & rowBuilder, const void * src) override;
     virtual size32_t unfilteredTransform(ARowBuilder & rowBuilder, const void * src) override;
 
     virtual size32_t transformOnLimitExceeded(ARowBuilder & rowBuilder) override;

+ 4 - 0
testing/regress/ecl/aggds3.ecl

@@ -21,16 +21,20 @@
 //version multiPart=true
 //version multiPart=false,useSequential=true
 //version keyedFilters=true
+//version keyedFilters=true,optRemoteRead=true
+//version keyedFilters=true,multiPart=true,useSequential=true,optRemoteRead=true
 
 import ^ as root;
 multiPart := #IFDEFINED(root.multiPart, false);
 useSequential := #IFDEFINED(root.useSequential, false);
 keyedFilters := #IFDEFINED(root.keyedFilters, false);
+optRemoteRead := #IFDEFINED(root.optRemoteRead, false);
 
 //--- end of version configuration ---
 
 #onwarning (2168, ignore);
 #option ('implicitKeyedDiskFilter', keyedFilters);
+#option('forceRemoteRead', optRemoteRead);
 
 import $.setup;
 sq := setup.sq(multiPart);

+ 26 - 7
testing/regress/ecl/sqfilt.ecl

@@ -16,12 +16,21 @@
 ############################################################################## */
 
 //version multiPart=false
+//version multiPart=true
+//version multiPart=false,useSequential=true
+//version multiPart=false,useSequential=true,useNoFold=true
+//version multiPart=false,useSequential=true,useNoFold=true,optRemoteRead=true
 
 import ^ as root;
 multiPart := #IFDEFINED(root.multiPart, false);
+useSequential := #IFDEFINED(root.useSequential, false);
+useNoFold := #IFDEFINED(root.useNoFold, false);
+optRemoteRead := #IFDEFINED(root.optRemoteRead, false);
 
 //--- end of version configuration ---
 
+#option('forceRemoteRead', optRemoteRead);
+
 import $.setup;
 sq := setup.sq(multiPart);
 
@@ -31,7 +40,11 @@ sq := setup.sq(multiPart);
 udecimal8 todaysDate := 20040602D;
 unsigned4 age(udecimal8 dob) := ((todaysDate - dob) / 10000D);
 
-//MORE: books[1] ave(books)
+#if (useNoFold)
+protect(virtual dataset x) := NOFOLD(x);
+#else
+protect(virtual dataset x) := x;
+#end
 
 // Different child operators, all inline.
 house := sq.HousePersonBookDs.persons;
@@ -45,10 +58,16 @@ personsDsDs := sq.PersonDs(houseid = sq.HouseDs.id);
 booksDsDsDs := sq.BookDs(personid = personsDsDs.id);
 
 //Who has a book worth more than their book limit? (nest, nest), (nest, ds) (ds, ds)
-t1 := table(sq.HousePersonBookDs, { addr, max(persons,booklimit), max(persons.books,price), count(persons(exists(books(price>persons.booklimit)))); });
-t2 := table(sq.HousePersonBookDs, { addr, count(persons(exists(booksDs(price>persons.booklimit)))); });
-t3 := table(sq.HouseDs, { addr, count(personsDsDs(exists(booksDsDsDs(price>personsDs.booklimit)))); });
+t1 := table(protect(sq.HousePersonBookDs), { addr, max(persons,booklimit), max(persons.books,price), count(persons(exists(books(price>persons.booklimit)))); });
+t2 := table(protect(sq.HousePersonBookDs), { addr, count(persons(exists(booksDs(price>persons.booklimit)))); });
+t3 := table(protect(sq.HouseDs), { addr, count(personsDsDs(exists(booksDsDsDs(price>personsDs.booklimit)))); });
 
-output(t1,,named('NumPeopleExceedBookLimit'));
-output(t2,,named('NumPeopleExceedBookLimitDs'));
-output(t3,,named('NumPeopleExceedBookLimitDsDs'));
+#if (useSequential)
+SEQUENTIAL(
+#end
+    output(t1,,named('NumPeopleExceedBookLimit'));
+    output(t2,,named('NumPeopleExceedBookLimitDs'));
+    output(t3,,named('NumPeopleExceedBookLimitDsDs'));
+#if (useSequential)
+);
+#end