|
@@ -79,7 +79,9 @@
|
|
|
#define MAX_HTTP_HEADERSIZE 8000
|
|
|
#define MIN_PAYLOAD_SIZE 800
|
|
|
|
|
|
+#ifdef _WIN32
|
|
|
#pragma warning(disable : 4355)
|
|
|
+#endif
|
|
|
#define DEFAULT_PARALLEL_LOOP_THREADS 1
|
|
|
|
|
|
#define PROBE
|
|
@@ -711,7 +713,7 @@ public:
|
|
|
{
|
|
|
if (idx != 0)
|
|
|
throw MakeStringException(ROXIE_SET_INPUT, "Internal error: id = %d : setInput() parameter out of bounds idx = %d at %s(%d)", id, idx, sanitizeSourceFile(__FILE__), __LINE__);
|
|
|
- if (input != -1)
|
|
|
+ if (input != (unsigned) -1)
|
|
|
throw MakeStringException(ROXIE_SET_INPUT, "Internal error: id = %d : setInput() called twice for input = %d source = %d inputidx = %d sourceidx = %d at %s(%d)", id, input, source, inputidx, sourceidx, sanitizeSourceFile(__FILE__), __LINE__);
|
|
|
input = source;
|
|
|
inputidx = sourceidx;
|
|
@@ -1353,7 +1355,7 @@ public:
|
|
|
//MORE: Create a filtered list and then use asyncfor
|
|
|
ForEachItemIn(idx, dependencies)
|
|
|
{
|
|
|
- if (dependencyControlIds.item(idx) == controlId)
|
|
|
+ if (dependencyControlIds.item(idx) == (int) controlId)
|
|
|
dependencies.item(idx).execute(parentExtractSize, parentExtract);
|
|
|
}
|
|
|
}
|
|
@@ -1362,7 +1364,7 @@ public:
|
|
|
{
|
|
|
ForEachItemIn(idx, dependencies)
|
|
|
{
|
|
|
- if (dependencyControlIds.item(idx) == controlId)
|
|
|
+ if (dependencyControlIds.item(idx) == (int) controlId)
|
|
|
dependencies.item(idx).stop();
|
|
|
}
|
|
|
}
|
|
@@ -3280,12 +3282,10 @@ public:
|
|
|
class CRowArrayMessageResult : implements IMessageResult, public CInterface
|
|
|
{
|
|
|
ConstPointerArray data;
|
|
|
- IRowManager &rowManager;
|
|
|
- bool variableSize;
|
|
|
|
|
|
public:
|
|
|
IMPLEMENT_IINTERFACE;
|
|
|
- CRowArrayMessageResult(IRowManager &_rowManager, bool _variableSize) : rowManager(_rowManager), variableSize(_variableSize)
|
|
|
+ CRowArrayMessageResult()
|
|
|
{
|
|
|
}
|
|
|
|
|
@@ -6866,8 +6866,6 @@ public:
|
|
|
|
|
|
virtual void onExecute()
|
|
|
{
|
|
|
- unsigned sequence = helper.querySequence();
|
|
|
-
|
|
|
RtlLinkedDictionaryBuilder builder(rowAllocator, helper.queryHashLookupInfo());
|
|
|
for (;;)
|
|
|
{
|
|
@@ -7102,13 +7100,12 @@ IRoxieServerActivityFactory *createRoxieServerGraphLoopResultReadActivityFactory
|
|
|
|
|
|
class CRoxieServerGraphLoopResultWriteActivity : public CRoxieServerInternalSinkActivity
|
|
|
{
|
|
|
- IHThorGraphLoopResultWriteArg &helper;
|
|
|
ILocalGraphEx * graph;
|
|
|
unsigned graphId;
|
|
|
|
|
|
public:
|
|
|
CRoxieServerGraphLoopResultWriteActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _graphId, unsigned _numOutputs)
|
|
|
- : CRoxieServerInternalSinkActivity(_ctx, _factory, _probeManager, _numOutputs), helper((IHThorGraphLoopResultWriteArg &)basehelper), graphId(_graphId)
|
|
|
+ : CRoxieServerInternalSinkActivity(_ctx, _factory, _probeManager, _numOutputs), graphId(_graphId)
|
|
|
{
|
|
|
graph = NULL;
|
|
|
}
|
|
@@ -11339,12 +11336,11 @@ IRoxieServerActivityFactory *createRoxieServerHashAggregateActivityFactory(unsig
|
|
|
|
|
|
class CRoxieServerDegroupActivity : public CRoxieServerActivity
|
|
|
{
|
|
|
- IHThorDegroupArg &helper;
|
|
|
bool eof;
|
|
|
|
|
|
public:
|
|
|
CRoxieServerDegroupActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
|
|
|
- : CRoxieServerActivity(_ctx, _factory, _probeManager), helper((IHThorDegroupArg &)basehelper)
|
|
|
+ : CRoxieServerActivity(_ctx, _factory, _probeManager)
|
|
|
{
|
|
|
eof = false;
|
|
|
}
|
|
@@ -12038,11 +12034,10 @@ public:
|
|
|
|
|
|
class CRoxieServerDiskWriteActivityFactory : public CRoxieServerMultiOutputFactory
|
|
|
{
|
|
|
- bool isRoot;
|
|
|
bool isTemp;
|
|
|
public:
|
|
|
- CRoxieServerDiskWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot)
|
|
|
- : CRoxieServerMultiOutputFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _graphNode), isRoot(_isRoot)
|
|
|
+ CRoxieServerDiskWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode)
|
|
|
+ : CRoxieServerMultiOutputFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _graphNode)
|
|
|
{
|
|
|
Owned<IHThorDiskWriteArg> helper = (IHThorDiskWriteArg *) helperFactory();
|
|
|
isTemp = (helper->getFlags() & TDXtemporary) != 0;
|
|
@@ -12079,9 +12074,9 @@ public:
|
|
|
|
|
|
};
|
|
|
|
|
|
-IRoxieServerActivityFactory *createRoxieServerDiskWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode, bool _isRoot)
|
|
|
+IRoxieServerActivityFactory *createRoxieServerDiskWriteActivityFactory(unsigned _id, unsigned _subgraphId, IQueryFactory &_queryFactory, HelperFactory *_helperFactory, ThorActivityKind _kind, IPropertyTree &_graphNode)
|
|
|
{
|
|
|
- return new CRoxieServerDiskWriteActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _graphNode, _isRoot);
|
|
|
+ return new CRoxieServerDiskWriteActivityFactory(_id, _subgraphId, _queryFactory, _helperFactory, _kind, _graphNode);
|
|
|
}
|
|
|
|
|
|
//=================================================================================
|
|
@@ -12321,8 +12316,6 @@ public:
|
|
|
|
|
|
virtual void setFileProperties(IFileDescriptor *desc) const
|
|
|
{
|
|
|
- IPropertyTree &partProps = desc->queryPart(0)->queryProperties(); //properties of the first file part.
|
|
|
- IPropertyTree &fileProps = desc->queryProperties(); // properties of the logical file
|
|
|
// Now publish to name services
|
|
|
StringBuffer dir,base;
|
|
|
offset_t indexFileSize = writer->queryFile()->size();
|
|
@@ -14062,14 +14055,13 @@ IRoxieServerActivityFactory *createRoxieServerMergeActivityFactory(unsigned _id,
|
|
|
|
|
|
class CRoxieServerRegroupActivity : public CRoxieServerMultiInputActivity
|
|
|
{
|
|
|
- IHThorRegroupArg &helper;
|
|
|
unsigned streamIndex;
|
|
|
bool eof;
|
|
|
unsigned __int64 numProcessedLastGroup;
|
|
|
|
|
|
public:
|
|
|
CRoxieServerRegroupActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, unsigned _numInputs)
|
|
|
- : CRoxieServerMultiInputActivity(_ctx, _factory, _probeManager, _numInputs), helper((IHThorRegroupArg &)basehelper)
|
|
|
+ : CRoxieServerMultiInputActivity(_ctx, _factory, _probeManager, _numInputs)
|
|
|
{
|
|
|
streamIndex = 0;
|
|
|
eof = false;
|
|
@@ -14741,7 +14733,6 @@ IRoxieServerActivityFactory *createRoxieServerProjectActivityFactory(unsigned _i
|
|
|
class CRoxieServerPrefetchProjectActivity : public CRoxieServerActivity, implements IRecordPullerCallback
|
|
|
{
|
|
|
unsigned numProcessedLastGroup;
|
|
|
- bool count;
|
|
|
bool eof;
|
|
|
bool allPulled;
|
|
|
bool isThreaded;
|
|
@@ -14776,9 +14767,8 @@ class CRoxieServerPrefetchProjectActivity : public CRoxieServerActivity, impleme
|
|
|
CriticalSection pulledCrit;
|
|
|
|
|
|
public:
|
|
|
- CRoxieServerPrefetchProjectActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager, bool _count)
|
|
|
+ CRoxieServerPrefetchProjectActivity(IRoxieSlaveContext *_ctx, const IRoxieServerActivityFactory *_factory, IProbeManager *_probeManager)
|
|
|
: CRoxieServerActivity(_ctx, _factory, _probeManager),
|
|
|
- count(_count),
|
|
|
helper((IHThorPrefetchProjectArg &) basehelper),
|
|
|
puller(false)
|
|
|
{
|
|
@@ -14969,7 +14959,7 @@ public:
|
|
|
|
|
|
virtual IRoxieServerActivity *createActivity(IRoxieSlaveContext *_ctx, IProbeManager *_probeManager) const
|
|
|
{
|
|
|
- return new CRoxieServerPrefetchProjectActivity(_ctx, this, _probeManager, count);
|
|
|
+ return new CRoxieServerPrefetchProjectActivity(_ctx, this, _probeManager);
|
|
|
}
|
|
|
};
|
|
|
|
|
@@ -17202,7 +17192,6 @@ public:
|
|
|
virtual void onCreate(IHThorArg *_colocalParent)
|
|
|
{
|
|
|
CRoxieServerNaryActivity::onCreate(_colocalParent);
|
|
|
- ICodeContext * codectx = ctx->queryCodeContext();
|
|
|
inputAllocator.setown(createRowAllocator(helper.queryInputMeta()));
|
|
|
outputAllocator.setown(createRowAllocator(helper.queryOutputMeta()));
|
|
|
}
|
|
@@ -24133,7 +24122,7 @@ public:
|
|
|
|
|
|
if (count)
|
|
|
{
|
|
|
- Owned<CRowArrayMessageResult> result = new CRowArrayMessageResult(ctx->queryRowManager(), false);
|
|
|
+ Owned<CRowArrayMessageResult> result = new CRowArrayMessageResult();
|
|
|
if (count > choosenLimit)
|
|
|
count = choosenLimit;
|
|
|
|
|
@@ -24372,7 +24361,7 @@ public:
|
|
|
{
|
|
|
size32_t size = meta.getRecordSize(rowBuilder.getSelf());
|
|
|
const void * recBuffer = rowBuilder.finalizeRowClear(size);
|
|
|
- Owned<CRowArrayMessageResult> result = new CRowArrayMessageResult(ctx->queryRowManager(), meta.isVariableSize());
|
|
|
+ Owned<CRowArrayMessageResult> result = new CRowArrayMessageResult();
|
|
|
result->append(recBuffer);
|
|
|
remote.injectResult(result.getClear());
|
|
|
}
|
|
@@ -24514,7 +24503,7 @@ public:
|
|
|
|
|
|
virtual bool processSingleKey(IKeyIndex *key, const IDynamicTransform * trans) override
|
|
|
{
|
|
|
- Owned<CRowArrayMessageResult> result = new CRowArrayMessageResult(ctx->queryRowManager(), meta.isVariableSize());
|
|
|
+ Owned<CRowArrayMessageResult> result = new CRowArrayMessageResult();
|
|
|
singleAggregator.start(rowAllocator, ctx->queryCodeContext(), activityId);
|
|
|
ThorActivityKind kind = factory->getKind();
|
|
|
while (tlk->lookup(true))
|
|
@@ -24551,7 +24540,6 @@ public:
|
|
|
Owned<AggregateRowBuilder> next = singleAggregator.nextResult();
|
|
|
if (!next)
|
|
|
break;
|
|
|
- size32_t size = next->querySize();
|
|
|
result->append(next->finalizeRowClear());
|
|
|
}
|
|
|
remote.injectResult(result.getClear());
|
|
@@ -24673,7 +24661,7 @@ public:
|
|
|
|
|
|
if (readHelper.first(tlk->queryKeyBuffer()))
|
|
|
{
|
|
|
- Owned<CRowArrayMessageResult> result = new CRowArrayMessageResult(ctx->queryRowManager(), meta.isVariableSize());
|
|
|
+ Owned<CRowArrayMessageResult> result = new CRowArrayMessageResult();
|
|
|
do
|
|
|
{
|
|
|
if (accepted>=choosenLimit)
|
|
@@ -25679,7 +25667,7 @@ public:
|
|
|
// MORE - this code seems to be duplicated in half keyed
|
|
|
unsigned accepted = 0;
|
|
|
unsigned rejected = 0;
|
|
|
- Owned<CRowArrayMessageResult> result = new CRowArrayMessageResult(ctx->queryRowManager(), true);
|
|
|
+ Owned<CRowArrayMessageResult> result = new CRowArrayMessageResult();
|
|
|
jg->notePending();
|
|
|
unsigned candidateCount = 0;
|
|
|
ScopedAtomic<unsigned> indexRecordsRead(::indexRecordsRead);
|
|
@@ -26524,7 +26512,7 @@ public:
|
|
|
unsigned rejected = 0;
|
|
|
Owned<CRowArrayMessageResult> result;
|
|
|
if (!isSimple)
|
|
|
- result.setown(new CRowArrayMessageResult(ctx->queryRowManager(), true));
|
|
|
+ result.setown(new CRowArrayMessageResult());
|
|
|
// MORE - This code seems to be duplicated in keyedJoinHead
|
|
|
jg->notePending();
|
|
|
unsigned candidateCount = 0;
|
|
@@ -26544,7 +26532,7 @@ public:
|
|
|
CPrefixedRowBuilder pb(KEYEDJOIN_RECORD_SIZE(0), rb);
|
|
|
accepted++;
|
|
|
KLBlobProviderAdapter adapter(tlk);
|
|
|
- size32_t joinFieldsSize = helper.extractJoinFields(pb, indexRow, &adapter);
|
|
|
+ helper.extractJoinFields(pb, indexRow, &adapter);
|
|
|
KeyedJoinHeader *rec = (KeyedJoinHeader *) rb.getUnfinalizedClear(); // lack of finalize ok as unserialized data here.
|
|
|
rec->fpos = fpos;
|
|
|
rec->thisGroup = jg;
|
|
@@ -28360,8 +28348,8 @@ protected:
|
|
|
class casyncfor: public CAsyncFor
|
|
|
{
|
|
|
public:
|
|
|
- casyncfor(CcdServerTest *_parent, IRoxieServerActivity *_activity, ArrayOf<IFinalRoxieInput *> &_outputs, char const * const *_input, char const * const *_output, unsigned _repeat)
|
|
|
- : activity(_activity), outputs(_outputs), input(_input), output(_output), repeat(_repeat), parent(_parent)
|
|
|
+ casyncfor(ArrayOf<IFinalRoxieInput *> &_outputs, char const * const *_output, unsigned _repeat)
|
|
|
+ : outputs(_outputs), output(_output), repeat(_repeat)
|
|
|
{}
|
|
|
void Do(unsigned i)
|
|
|
{
|
|
@@ -28403,13 +28391,10 @@ protected:
|
|
|
outStream->stop();
|
|
|
}
|
|
|
private:
|
|
|
- IRoxieServerActivity *activity;
|
|
|
ArrayOf<IFinalRoxieInput *> &outputs;
|
|
|
- char const * const *input;
|
|
|
char const * const *output;
|
|
|
unsigned repeat;
|
|
|
- CcdServerTest *parent;
|
|
|
- } afor(this, activity, out, input, output, repeat);
|
|
|
+ } afor(out, output, repeat);
|
|
|
afor.For(numOutputs, numOutputs);
|
|
|
|
|
|
ASSERT(in.state == TestInput::STATEstopped);
|