Browse Source

HPCC-12677 Allow max CSV row size to be configured

The max CSV row size (default 10MB) prevents unbound CSV lines
causing problems (e.g. consuming all of memory).
10MB should be sufficient in 99% of cases, but sometimes a higher
limit is required, so allow the hard-limit to be configured via
a query #option('maxCsvRowSize', <MB>)

Signed-off-by: Jake Smith <jake.smith@lexisnexis.com>
Jake Smith 10 years ago
parent
commit
86c337e190

+ 2 - 1
ecl/hthor/hthor.cpp

@@ -52,6 +52,7 @@ static unsigned const hthorReadBufferSize = 0x10000;
 static offset_t const defaultHThorDiskWriteSizeLimit = I64C(10*1024*1024*1024); //10 GB, per Nigel
 static size32_t const spillStreamBufferSize = 0x10000;
 static unsigned const hthorPipeWaitTimeout = 100; //100ms - fairly arbitrary choice
+static unsigned const defaultMaxCsvRowSize = 10; // MB
 
 using roxiemem::IRowManager;
 using roxiemem::OwnedRoxieRow;
@@ -8727,6 +8728,7 @@ const void *CHThorDiskGroupAggregateActivity::nextInGroup()
 
 CHThorCsvReadActivity::CHThorCsvReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorCsvReadArg &_arg, ThorActivityKind _kind) : CHThorDiskReadBaseActivity(_agent, _activityId, _subgraphId, _arg, _kind), helper(_arg)
 {
+    maxRowSize = agent.queryWorkUnit()->getDebugValueInt("maxCsvRowSize", defaultMaxCsvRowSize);
 }
 
 CHThorCsvReadActivity::~CHThorCsvReadActivity()
@@ -8788,7 +8790,6 @@ const void *CHThorCsvReadActivity::nextInGroup()
         if (!inputstream->eos())
         {
             size32_t rowSize = 4096; // MORE - make configurable
-            size32_t maxRowSize = 10*1024*1024; // MORE - make configurable
             size32_t thisLineLength;
             loop
             {

+ 1 - 0
ecl/hthor/hthor.ipp

@@ -2299,6 +2299,7 @@ protected:
     CSVSplitter         csvSplitter;    
     unsigned __int64 limit;
     unsigned __int64 stopAfter;
+    size32_t maxRowSize;
 };
 
 class CHThorXmlReadActivity : public CHThorDiskReadBaseActivity, implements IXMLSelect

+ 4 - 1
thorlcr/activities/csvread/thcsvrslave.cpp

@@ -33,6 +33,8 @@
 #include "csvsplitter.hpp"
 #include "thdiskbaseslave.ipp"
 
+static unsigned const defaultMaxCsvRowSize = 10; // MB
+
 class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase, public CThorDataLink
 {
     IHThorCsvReadArg *helper;
@@ -55,13 +57,13 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase, public CThorDat
         CRC32 inputCRC;
         bool readFinished;
         offset_t localOffset;
+        size32_t maxRowSize;
 
         unsigned splitLine()
         {
             if (inputStream->eos())
                 return 0;
             size32_t minRequired = 4096; // MORE - make configurable
-            size32_t maxRowSize = 10*1024*1024; // MORE - make configurable
             size32_t thisLineLength;
             loop
             {
@@ -86,6 +88,7 @@ class CCsvReadSlaveActivity : public CDiskReadSlaveActivityBase, public CThorDat
             //Initialise information...
             ICsvParameters * csvInfo = activity.helper->queryCsvParameters();
             csvSplitter.init(activity.helper->getMaxColumns(), csvInfo, activity.csvQuote, activity.csvSeparate, activity.csvTerminate, activity.csvEscape);
+            maxRowSize = activity.getOptInt(THOROPT_MAXCSVROWSIZE, defaultMaxCsvRowSize) * 1024 * 1024;
         }
         virtual void setPart(IPartDescriptor *partDesc, unsigned partNoSerialized)
         {

+ 1 - 0
thorlcr/thorutil/thormisc.hpp

@@ -69,6 +69,7 @@
 #define THOROPT_LKJOIN_LOCALFAILOVER  "lkjoin_localfailover"    // Force SMART to failover to distributed local lookup join (for testing only)   (default = false)
 #define THOROPT_LKJOIN_HASHJOINFAILOVER "lkjoin_hashjoinfailover" // Force SMART to failover to hash join (for testing only)                     (default = false)
 #define THOROPT_MAX_KERNLOG           "max_kern_level"          // Max kernel logging level, to push to workunit, -1 to disable                  (default = 3)
+#define THOROPT_MAXCSVROWSIZE         "maxCsvRowSize"           // Upper limit on csv read line size                                             (default = 10 [MB])
 
 #define INITIAL_SELFJOIN_MATCH_WARNING_LEVEL 20000  // max of row matches before selfjoin emits warning