123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241 |
- /*##############################################################################
- HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems®.
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- ############################################################################## */
- #include "jliball.hpp"
- #include "jqueue.tpp"
- #include "jisem.hpp"
- #include "rtlformat.hpp"
- #include "thorxmlread.hpp"
- #include "thorxmlwrite.hpp"
- #include "thorcommon.ipp"
- #include "thorsoapcall.hpp"
- #include "securesocket.hpp"
- #include "eclrtl.hpp"
- #include "roxiemem.hpp"
- #include "zcrypt.hpp"
- using roxiemem::OwnedRoxieString;
- #ifndef _WIN32
- #include <stdexcept>
- #endif
- #include <new>
- #define CONTENT_LENGTH "Content-Length: "
- #define CONTENT_ENCODING "Content-Encoding"
- #define ACCEPT_ENCODING "Accept-Encoding"
- unsigned soapTraceLevel = 1;
- #define WSCBUFFERSIZE 0x10000
- #define MAXWSCTHREADS 50 //Max Web Service Call Threads
- void multiLog(const IContextLogger &logger, const char *format, ...) __attribute__((format(printf, 2, 3)));
- void multiLog(const IContextLogger &logger, const char *format, ...)
- {
- va_list args;
- va_start(args, format);
- StringBuffer log;
- log.limited_valist_appendf(1024*1024, format, args);
- va_end(args);
- StringArray loglines;
- log.replace('\r', ' ');
- loglines.appendList(log, "\n", false);
- ForEachItemIn(idx, loglines)
- {
- logger.CTXLOG("%s", loglines.item(idx));
- }
- }
- interface DECL_EXCEPTION IReceivedRoxieException : extends IException
- {
- public:
- virtual const void *errorRow() = 0;
- };
- #define EXCEPTION_PREFIX "ReceivedRoxieException:"
- class DECL_EXCEPTION ReceivedRoxieException: public IReceivedRoxieException, public CInterface
- {
- public:
- IMPLEMENT_IINTERFACE;
- ReceivedRoxieException(int code, const char *_msg, const void *_row=NULL) : errcode(code), msg(_msg), row(_row) { };
- int errorCode() const { return (errcode); };
- StringBuffer & errorMessage(StringBuffer &str) const
- {
- if (strncmp(str.str(), EXCEPTION_PREFIX, strlen(EXCEPTION_PREFIX)) == 0)
- str.append(msg);
- else
- str.append(EXCEPTION_PREFIX).append(" (").append(msg).append(")");
- return str;
- };
- MessageAudience errorAudience() const { return (MSGAUD_user); };
- const void *errorRow() { return (row); };
- private:
- int errcode;
- StringAttr msg;
- const void *row;
- };
- //=================================================================================================
- class Url : public CInterface, implements IInterface
- {
- public:
- IMPLEMENT_IINTERFACE;
- StringAttr method;
- StringAttr host;
- unsigned port;
- StringBuffer path;
- StringBuffer userPasswordPair;
- StringBuffer &getUrlString(StringBuffer &url) const
- {
- return url.append(method).append("://").append(host).append(":").append(port).append(path);
- }
- IException *getUrlException(IException *e) const
- {
- StringBuffer url;
- StringBuffer text;
- e->errorMessage(text);
- rtlAddExceptionTag(text, "url", getUrlString(url).str());
- if (text.length() <= 1024)
- return MakeStringException(e->errorCode(), "%s", text.str());
- else
- return MakeStringExceptionDirect(e->errorCode(), text.str());
- }
- Url() : port(0)
- {
- }
- private:
- char translateHex(char hex) {
- if(hex >= 'A')
- return (hex & 0xdf) - 'A' + 10;
- else
- return hex - '0';
- }
- void userPassword_decode(const char* userpass, StringBuffer& result)
- {
- if(!userpass || !*userpass)
- return;
- const char *finger = userpass;
- while (*finger)
- {
- char c = *finger++;
- if (c == '%')
- {
- if(*finger != '\0')
- {
- c = translateHex(*finger);
- finger++;
- }
- if(*finger != '\0')
- {
- c = (char)(c*16 + translateHex(*finger));
- finger++;
- }
- }
- result.append(c);
- }
- return;
- }
- void cleanPath(const char *src, StringBuffer &output)
- {
- const char *srcPtr = src;
- char *dst = output.reserveTruncate(strlen(src));
- char *dstPtr = dst;
- while (*srcPtr)
- {
- *dstPtr++ = *srcPtr;
- if ('/' == *srcPtr++)
- {
- while ('/' == *srcPtr)
- ++srcPtr;
- }
- }
- *dstPtr = '\0';
- output.setLength(dstPtr-dst);
- }
- public:
- Url(char *urltext)
- {
- char *p;
- if ((p = strstr(urltext, "://")) != NULL)
- {
- *p = 0;
- p += 3; // skip past the colon-slash-slash
- method.set(urltext);
- urltext = p;
- }
- else
- throw MakeStringException(-1, "Malformed URL");
- if ((p = strchr(urltext, '@')) != NULL)
- {
- // extract username & password
- *p = 0;
- p++;
- userPassword_decode(urltext, userPasswordPair);
- urltext = p;
- }
- if ((p = strchr(urltext, ':')) != NULL)
- {
- // extract the port
- *p = 0;
- p++;
- port = atoi(p);
- host.set(urltext);
- if ((p = strchr(p, '/')) != NULL)
- cleanPath(p, path);
- else
- path.append("/");
- }
- else
- {
- // no port - look at method for port
- if (stricmp(method.str(), "https") == 0)
- port = 443;
- else if (stricmp(method.str(), "http") == 0)
- port = 80;
- else
- throw MakeStringException(-1, "Unsupported access method");
- if ((p = strchr(urltext, '/')) != NULL)
- {
- *p = 0;
- host.set(urltext);
- *p = '/';
- cleanPath(p, path);
- }
- else
- {
- host.set(urltext);
- path.append("/");
- }
- }
- // While the code below would make some sense, there is code that relies on being able to use invalid IP addresses and catching the
- // errors that result via ONFAIL.
- #if 0
- IpAddress ipaddr(host);
- if ( ipaddr.isNull())
- throw MakeStringException(-1, "Invalid IP address %s", host.str());
- #endif
- }
- };
- typedef IArrayOf<Url> UrlArray;
- //=================================================================================================
- //MORE: This whole class should really be replaced with a single function, avoiding at least one string clone.
- class UrlListParser
- {
- public:
- UrlListParser(const char * text)
- {
- fullText = text ? strdup(text) : NULL;
- }
- ~UrlListParser()
- {
- free(fullText);
- }
- unsigned getUrls(UrlArray &array)
- {
- if (fullText)
- {
- char *copyFullText = strdup(fullText);
- char *saveptr;
- char *url = strtok_r(copyFullText, "|", &saveptr);
- while (url != NULL)
- {
- array.append(*new Url(url));
- url = strtok_r(NULL, "|", &saveptr);
- }
- free(copyFullText);
- }
- return array.ordinality();
- }
- private:
- char *fullText;
- };
- //=================================================================================================
- #define BLACKLIST_RETRIES 10
- #define ROXIE_ABORT_EVENT 1407
- #define TIMELIMIT_EXCEEDED 1408
- class BlackLister : public CInterface, implements IThreadFactory
- {
- SocketEndpointArray list;
- Owned<IThreadPool> pool;
- CriticalSection crit;
- private:
- inline void checkRoxieAbortMonitor(IRoxieAbortMonitor * roxieAbortMonitor)
- {
- if (roxieAbortMonitor)
- {
- try
- {
- roxieAbortMonitor->checkForAbort();//throws
- }
- catch (IException *e)
- {
- StringBuffer s;
- e->errorMessage(s);
- e->Release();
- throw MakeStringException(ROXIE_ABORT_EVENT, "%s", s.str());
- }
- }
- }
- public:
- bool lookup(SocketEndpoint &ep, const IContextLogger &logctx)
- {
- CriticalBlock b(crit);
- if (list.find(ep)!=NotFound)
- {
- if (soapTraceLevel > 3)
- {
- StringBuffer s;
- logctx.CTXLOG("socket %s is blacklisted", ep.getUrlStr(s).str());
- }
- return true;
- }
- return false;
- }
- void blacklist(SocketEndpoint &ep, const IContextLogger &logctx)
- {
- CriticalBlock b(crit);
- if (list.find(ep)==NotFound)
- {
- if (soapTraceLevel > 0)
- {
- StringBuffer s;
- logctx.CTXLOG("Blacklisting socket %s", ep.getUrlStr(s).str());
- }
- list.append(ep);
- pool->start(&ep);
- }
- }
- void deblacklist(SocketEndpoint &ep)
- {
- CriticalBlock b(crit);
- unsigned idx = list.find(ep);
- if (idx!=NotFound)
- {
- if (soapTraceLevel > 0)
- {
- StringBuffer s;
- DBGLOG("De-blacklisting socket %s", ep.getUrlStr(s).str());
- }
- list.remove(idx);
- }
- }
- public:
- IMPLEMENT_IINTERFACE;
- BlackLister()
- {
- pool.setown(createThreadPool("SocketBlacklistPool", this, NULL, 0, 0));
- }
- ISocket* connect(SocketEndpoint &ep,
- const IContextLogger &logctx,
- unsigned retries,
- unsigned timeoutMS,
- IRoxieAbortMonitor * roxieAbortMonitor)
- {
- if (lookup(ep, logctx))
- {
- StringBuffer s;
- ep.getUrlStr(s);
- throw MakeStringException(-1, "blacklisted socket %s", s.str());
- }
- Owned<IException> exc;
- try
- {
- checkRoxieAbortMonitor(roxieAbortMonitor);
- Owned<ISocket> sock;
- Owned<ISocketConnectWait> scw = nonBlockingConnect(ep, timeoutMS == WAIT_FOREVER ? 60000 : timeoutMS*(retries+1));
- for (;;)
- {
- sock.setown(scw->wait(1000));//throws if connect fails or timeoutMS
- checkRoxieAbortMonitor(roxieAbortMonitor);
- if (sock)
- return sock.getLink();
- }
- }
- catch (IJSOCK_Exception *e)
- {
- EXCLOG(e,"BlackLister::connect");
- if (exc)
- e->Release();
- else
- exc.setown(e);
- }
- blacklist(ep, logctx);
- if (exc->errorCode()==JSOCKERR_connection_failed) {
- StringBuffer s;
- ep.getUrlStr(s);
- throw MakeStringException(JSOCKERR_connection_failed, "connection failed %s", s.str());
- }
- throw exc.getClear();
- return NULL;
- }
- bool blacklisted (unsigned short port, char const* host)
- {
- SocketEndpoint ep(host, port);
- return (list.find(ep)!=NotFound);
- }
- ISocket* connect(unsigned short port,
- char const* host,
- const IContextLogger &logctx,
- unsigned retries,
- unsigned timeoutMS,
- IRoxieAbortMonitor * roxieAbortMonitor )
- {
- SocketEndpoint ep(host, port);
- return connect(ep, logctx, retries, timeoutMS, roxieAbortMonitor);
- }
- virtual IPooledThread *createNew()
- {
- class SocketDeblacklister : public CInterface, implements IPooledThread
- {
- SocketEndpoint ep;
- BlackLister &parent;
- Semaphore stopped;
- public:
- IMPLEMENT_IINTERFACE;
- SocketDeblacklister(BlackLister &_parent): parent(_parent)
- {
- }
- ~SocketDeblacklister()
- {
- }
- virtual void init(void *param) override
- {
- ep.set(*(SocketEndpoint *) param);
- }
- virtual void threadmain() override
- {
- unsigned delay = 5000;
- for (unsigned i = 0; i < BLACKLIST_RETRIES; i++)
- {
- try
- {
- Owned<ISocket> s = ISocket::connect_timeout(ep, 10000);
- s->close();
- break;
- }
- catch (IJSOCK_Exception *E)
- {
- // EXCLOG(E, "While updating socket blacklist"); // MORE - may need to downgrade if this fires traps
- E->Release();
- if (stopped.wait(delay))
- return;
- delay += delay;
- }
- }
- parent.deblacklist(ep);
- }
- virtual bool stop() override
- {
- stopped.signal();
- return true;
- }
- virtual bool canReuse() const override { return true; }
- };
- return new SocketDeblacklister(*this);
- }
- void stop()
- {
- pool->stopAll();
- }
- } *blacklist;
- MODULE_INIT(INIT_PRIORITY_STANDARD)
- {
- blacklist = new BlackLister;
- return true;
- }
- MODULE_EXIT()
- {
- blacklist->stop();
- delete blacklist;
- }
- //=================================================================================================
- class ColumnProvider : public IColumnProvider, public CInterface
- {
- public:
- ColumnProvider(unsigned _callLatencyMs) : callLatencyMs(_callLatencyMs), base(NULL) {}
- IMPLEMENT_IINTERFACE;
- virtual bool getBool(const char * path) { return base->getBool(path); }
- virtual void getData(size32_t len, void * text, const char * path) { base->getData(len, text, path); }
- virtual void getDataX(size32_t & len, void * & text, const char * path) { base->getDataX(len, text, path); }
- virtual __int64 getInt(const char * path)
- {
- __int64 ret = base->getInt(path);
- if((ret==0) && path && *path=='_')
- {
- if (stricmp(path, "_call_latency_ms")==0)
- ret = callLatencyMs;
- else if (stricmp(path, "_call_latency")==0)
- ret = (callLatencyMs + 500) / 1000;
- }
- return ret;
- }
- virtual __uint64 getUInt(const char * path)
- {
- __uint64 ret = base->getUInt(path);
- if((ret==0) && path && *path=='_')
- {
- if (stricmp(path, "_call_latency_ms")==0)
- ret = callLatencyMs;
- else if (stricmp(path, "_call_latency")==0)
- ret = (callLatencyMs + 500) / 1000;
- }
- return ret;
- }
- virtual void getQString(size32_t len, char * text, const char * path) { base->getQString(len, text, path); }
- virtual void getString(size32_t len, char * text, const char * path) { base->getString(len, text, path); }
- virtual void getStringX(size32_t & len, char * & text, const char * path) { base->getStringX(len, text, path); }
- virtual void getUnicodeX(size32_t & len, UChar * & text, const char * path) { base->getUnicodeX(len, text, path); }
- virtual bool getIsSetAll(const char * path) { return base->getIsSetAll(path); }
- virtual IColumnProviderIterator * getChildIterator(const char * path) { return base->getChildIterator(path); }
- virtual void getUtf8X(size32_t & len, char * & text, const char * path) { base->getUtf8X(len, text, path); }
- virtual bool readBool(const char * path, bool _default) { return base->readBool(path, _default); }
- virtual void readData(size32_t len, void * text, const char * path, size32_t _lenDefault, const void * _default) { base->readData(len, text, path, _lenDefault, _default); }
- virtual void readDataX(size32_t & len, void * & text, const char * path, size32_t _lenDefault, const void * _default) { base->readDataX(len, text, path, _lenDefault, _default); }
- virtual __int64 readInt(const char * path, __int64 _default)
- {
- if(path && *path=='_')
- {
- if (stricmp(path, "_call_latency_ms")==0)
- return callLatencyMs;
- else if (stricmp(path, "_call_latency")==0)
- return (callLatencyMs + 500) / 1000;
- }
- return base->readInt(path, _default);
- }
- virtual __uint64 readUInt(const char * path, __uint64 _default)
- {
- if(path && *path=='_')
- {
- if (stricmp(path, "_call_latency_ms")==0)
- return callLatencyMs;
- else if (stricmp(path, "_call_latency")==0)
- return (callLatencyMs + 500) / 1000;
- }
- return base->readUInt(path, _default);
- }
- virtual void readQString(size32_t len, char * text, const char * path, size32_t _lenDefault, const char * _default) { base->readQString(len, text, path, _lenDefault, _default); }
- virtual void readString(size32_t len, char * text, const char * path, size32_t _lenDefault, const char * _default) { base->readString(len, text, path, _lenDefault, _default); }
- virtual void readStringX(size32_t & len, char * & text, const char * path, size32_t _lenDefault, const char * _default) { base->readStringX(len, text, path, _lenDefault, _default); }
- virtual void readUnicodeX(size32_t & len, UChar * & text, const char * path, size32_t _lenDefault, const UChar * _default) { base->readUnicodeX(len, text, path, _lenDefault, _default); }
- virtual bool readIsSetAll(const char * path, bool _default) { return base->readIsSetAll(path, _default); }
- virtual void readUtf8X(size32_t & len, char * & text, const char * path, size32_t _lenDefault, const char * _default) { base->readUtf8X(len, text, path, _lenDefault, _default); }
- void setBase(IColumnProvider * _base) { base = _base; }
- virtual void getDataRaw(size32_t len, void * text, const char * path) { base->getDataRaw(len, text, path); }
- virtual void getDataRawX(size32_t & len, void * & text, const char * path) { base->getDataRawX(len, text, path); }
- virtual void readDataRaw(size32_t len, void * text, const char * path, size32_t _lenDefault, const void * _default) { base->readDataRaw(len, text, path, _lenDefault, _default); }
- virtual void readDataRawX(size32_t & len, void * & text, const char * path, size32_t _lenDefault, const void * _default) { base->readDataRawX(len, text, path, _lenDefault, _default); }
- protected:
- IColumnProvider * base;
- private:
- unsigned callLatencyMs;
- };
- //=================================================================================================
- //Same as ColumnProvider, except returns data as hex64Binary instead of 16 bit hex
- class ColumnProviderData64 : public ColumnProvider
- {
- public:
- ColumnProviderData64(unsigned _callLatencyMs) : ColumnProvider(_callLatencyMs) {}
- virtual void getData(size32_t len, void * text, const char * path) { ColumnProvider::getDataRaw(len, text, path); }
- virtual void getDataX(size32_t & len, void * & text, const char * path) { ColumnProvider::getDataRawX(len, text, path); }
- virtual void readData(size32_t len, void * text, const char * path, size32_t _lenDefault, const void * _default) { ColumnProvider::readDataRaw(len, text, path, _lenDefault, _default); }
- virtual void readDataX(size32_t len, void * text, const char * path, size32_t _lenDefault, const void * _default) { ColumnProvider::readDataRawX(len, text, path, _lenDefault, _default); }
- };
- //=================================================================================================
- IColumnProvider * CreateColumnProvider(unsigned _callLatencyMs, bool _encoding)
- {
- if (!_encoding)
- return new ColumnProvider(_callLatencyMs);
- else
- return new ColumnProviderData64(_callLatencyMs);
- }
- //=================================================================================================
- enum WSCType{STsoap, SThttp} ; //web service call type
- //Web Services Call Asynchronous For
- interface IWSCAsyncFor: public IInterface
- {
- virtual void processException(const Url &url, const void *row, IException *e) = 0;
- virtual void processException(const Url &url, ConstPointerArray &inputRows, IException *e) = 0;
- virtual void checkTimeLimitExceeded(unsigned * _remainingMS) = 0;
- virtual void createHttpRequest(Url &url, StringBuffer &request) = 0;
- virtual int readHttpResponse(StringBuffer &response, ISocket *socket) = 0;
- virtual void processResponse(Url &url, StringBuffer &response, ColumnProvider * meta) = 0;
- virtual const char *getResponsePath() = 0;
- virtual ConstPointerArray & getInputRows() = 0;
- virtual IWSCHelper * getMaster() = 0;
- virtual IEngineRowAllocator * getOutputAllocator() = 0;
-
- virtual void For(unsigned num,unsigned maxatonce,bool abortFollowingException=false,bool shuffled=false) = 0;
- virtual void Do(unsigned idx=0)=0;
- };
- class CWSCHelper;
- IWSCAsyncFor * createWSCAsyncFor(CWSCHelper * _master, IXmlWriterExt &_xmlWriter, ConstPointerArray &_inputRows, PTreeReaderOptions _options);
- //=================================================================================================
- #define CBExceptionPathExc 0x0001
- #define CBExceptionPathExcsExc 0x0002
- #define CBExceptionPathExcs 0x0004
- class CMatchCB : implements IXMLSelect, public CInterface
- {
- IWSCAsyncFor &parent;
- const Url &url;
- StringAttr tail;
- ColumnProvider * meta;
- StringAttr excPath;
- unsigned excFlags = 0;
- public:
- IMPLEMENT_IINTERFACE;
- CMatchCB(IWSCAsyncFor &_parent, const Url &_url, const char *_tail, ColumnProvider * _meta, const char *_excPath, unsigned _excFlags) : parent(_parent), url(_url), tail(_tail), meta(_meta), excFlags(_excFlags), excPath(_excPath)
- {
- }
- bool checkGetExceptionEntry(bool check, Owned<IColumnProviderIterator> &excIter, IColumnProvider &parent, IColumnProvider *&excEntry, const char *path)
- {
- if (!check)
- return false;
- excIter.setown(parent.getChildIterator(path));
- excEntry = excIter->first();
- return excEntry != nullptr;
- }
- IColumnProvider *getExceptionEntry(Owned<IColumnProviderIterator> &excIter, IColumnProvider &parent)
- {
- IColumnProvider *excEntry = nullptr;
- if (excPath.length())
- {
- checkGetExceptionEntry(true, excIter, parent, excEntry, excPath.str()); //set by user so don't try others
- return excEntry;
- }
- if (checkGetExceptionEntry((excFlags & CBExceptionPathExc)!=0, excIter, parent, excEntry, "Exception"))
- return excEntry;
- if (checkGetExceptionEntry((excFlags & CBExceptionPathExcsExc)!=0, excIter, parent, excEntry, "Exceptions/Exception")) //ESP xml array
- return excEntry;
- checkGetExceptionEntry((excFlags & CBExceptionPathExcs)!=0, excIter, parent, excEntry, "Exceptions"); //json array
- return excEntry;
- }
- virtual void match(IColumnProvider &entry, offset_t startOffset, offset_t endOffset)
- {
- Owned<IException> e;
- if (tail.length()||excPath.length())
- {
- StringBuffer path(parent.getResponsePath());
- unsigned idx = (unsigned)entry.getInt(path.append("/@sequence").str());
- Owned<IColumnProviderIterator> excIter;
- IColumnProvider *excptEntry = getExceptionEntry(excIter, entry);
- if (excptEntry)
- {
- int code = (int)excptEntry->getInt("Code");
- size32_t len;
- char *message;
- excptEntry->getStringX(len, message, "Message");
- StringBuffer mstr(len, message);
- rtlFree(message);
- DBGLOG("Roxie exception: %s", mstr.str());
- e.setown(new ReceivedRoxieException(code, mstr.str()));
- parent.processException(url, parent.getInputRows().item(idx), e.getLink());
- }
- }
- if (parent.getMaster()->getRowTransformer() && !e)
- {
- IEngineRowAllocator * outputAllocator = parent.getOutputAllocator();
- Owned<IColumnProviderIterator> iter = entry.getChildIterator(tail);
- IColumnProvider *rowEntry = iter->first();
- while (rowEntry)
- {
- RtlDynamicRowBuilder rowBuilder(outputAllocator);
- size32_t sizeGot;
- if(meta)
- {
- meta->setBase(rowEntry);
- sizeGot = parent.getMaster()->transformRow(rowBuilder, meta);
- }
- else
- {
- sizeGot = parent.getMaster()->transformRow(rowBuilder, rowEntry);
- }
- if (sizeGot > 0)
- parent.getMaster()->putRow(rowBuilder.finalizeRowClear(sizeGot));
- rowEntry = iter->next();
- }
- }
- }
- };
- //=================================================================================================
- bool isValidHttpValue(char const * in)
- {
- for (const byte * ptr = (byte*)in; *ptr; ++ptr)
- if (((*ptr <= '\x1F') && (*ptr != '\x09')) || (*ptr == '\x7F'))
- return false;
- return true;
- }
- //=================================================================================================
- //Web Service Call helper thread
- class CWSCHelperThread : public Thread
- {
- private:
- CWSCHelper * master;
- virtual void outputRows(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows, const char *itemtag=NULL, bool encode_off=false, char const * itemns = NULL);
- virtual void createESPQuery(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows);
- virtual void createSOAPliteralOrEncodedQuery(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows);
- virtual void createXmlSoapQuery(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows);
- virtual void createHttpPostQuery(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows, bool appendRequestToName, bool appendEncodeFlag);
- virtual void processQuery(ConstPointerArray &inputRows);
-
- //Thread
- virtual int run();
- public:
- CWSCHelperThread(CWSCHelper * _master) : Thread("CWSCHelperThread")
- {
- master = _master;
- }
- ~CWSCHelperThread()
- {
- }
- };
- //=================================================================================================
- class CWSCHelper : implements IWSCHelper, public CInterface
- {
- private:
- SimpleInterThreadQueueOf<const void, true> outputQ;
- SpinLock outputQLock;
- CriticalSection toXmlCrit, transformCrit, onfailCrit, timeoutCrit;
- unsigned done;
- Owned<IPropertyTree> xpathHints;
- Linked<ClientCertificate> clientCert;
- static CriticalSection secureContextCrit;
- static Owned<ISecureSocketContext> secureContext;
- CTimeMon timeLimitMon;
- bool complete, timeLimitExceeded;
- IRoxieAbortMonitor * roxieAbortMonitor;
- protected:
- CIArrayOf<CWSCHelperThread> threads;
- WSCType wscType;
- public:
- IMPLEMENT_IINTERFACE;
- CWSCHelper(IWSCRowProvider *_rowProvider, IEngineRowAllocator * _outputAllocator, const char *_authToken, WSCMode _wscMode, ClientCertificate *_clientCert, const IContextLogger &_logctx, IRoxieAbortMonitor *_roxieAbortMonitor, WSCType _wscType)
- : logctx(_logctx), outputAllocator(_outputAllocator), clientCert(_clientCert), roxieAbortMonitor(_roxieAbortMonitor)
- {
- wscMode = _wscMode;
- wscType = _wscType;
- done = 0;
- complete = aborted = timeLimitExceeded = false;
- rowProvider = _rowProvider;
- helper = rowProvider->queryActionHelper();
- callHelper = rowProvider->queryCallHelper();
- flags = helper->getFlags();
- OwnedRoxieString s;
- authToken.append(_authToken);
- if (helper->numRetries() < 0)
- maxRetries = 3; // default to 3 retries
- else
- maxRetries = helper->numRetries();
- //Allow all of these options to be specified separately. Possibly useful, and the code is cleaner.
- logMin = (flags & SOAPFlogmin) != 0;
- logXML = (flags & SOAPFlog) != 0;
- logUserMsg = (flags & SOAPFlogusermsg) != 0;
- double dval = helper->getTimeout(); // In seconds, but may include fractions of a second...
- if (dval < 0.0) //not provided, or out of range
- timeoutMS = 300*1000; // 300 second default
- else if (dval == 0)
- timeoutMS = WAIT_FOREVER;
- else
- timeoutMS = (unsigned)(dval * 1000);
- dval = helper->getTimeLimit();
- if (dval <= 0.0)
- timeLimitMS = WAIT_FOREVER;
- else
- timeLimitMS = (unsigned)(dval * 1000);
- if (flags & SOAPFhttpheaders)
- httpHeaders.set(s.setown(helper->getHttpHeaders()));
- if (flags & SOAPFxpathhints)
- {
- s.setown(helper->getXpathHintsXml());
- xpathHints.setown(createPTreeFromXMLString(s.get()));
- }
- StringAttr proxyAddress;
- proxyAddress.set(s.setown(helper->getProxyAddress()));
- if (!proxyAddress.isEmpty())
- {
- UrlListParser proxyUrlListParser(proxyAddress);
- if (0 == proxyUrlListParser.getUrls(proxyUrlArray))
- throw MakeStringException(0, "SOAPCALL PROXYADDRESS specified no URLs");
- }
- if (wscType == STsoap)
- {
- soapaction.set(s.setown(helper->getSoapAction()));
- if(soapaction.get() && !isValidHttpValue(soapaction.get()))
- throw MakeStringException(-1, "SOAPAction value contained illegal characters: %s", soapaction.get());
- httpHeaderName.set(s.setown(helper->getHttpHeaderName()));
- if(httpHeaderName.get() && !isValidHttpValue(httpHeaderName.get()))
- throw MakeStringException(-1, "HTTPHEADER name contained illegal characters: %s", httpHeaderName.get());
- httpHeaderValue.set(s.setown(helper->getHttpHeaderValue()));
- if(httpHeaderValue.get() && !isValidHttpValue(httpHeaderValue.get()))
- throw MakeStringException(-1, "HTTPHEADER value contained illegal characters: %s", httpHeaderValue.get());
- if ((flags & SOAPFliteral) && (flags & SOAPFencoding))
- throw MakeStringException(0, "SOAPCALL 'LITERAL' and 'ENCODING' options are mutually exclusive");
- rowHeader.set(s.setown(helper->getHeader()));
- rowFooter.set(s.setown(helper->getFooter()));
- if (flags & SOAPFmarkupinfo)
- {
- rootHeader.set(s.setown(helper->getRequestHeader()));
- rootFooter.set(s.setown(helper->getRequestFooter()));
- }
- if(flags & SOAPFnamespace)
- {
- OwnedRoxieString ns = helper->getNamespaceName();
- if(ns && *ns)
- xmlnamespace.set(ns);
- }
- }
- if (wscType == SThttp)
- {
- //Check for unsupported flags
- if ((flags & SOAPFliteral) || (flags & SOAPFencoding))
- throw MakeStringException(0, "HTTPCALL 'LITERAL' and 'ENCODINGD' options not supported");
- }
- if (callHelper)
- {
- OwnedRoxieString iteratorPath(callHelper->getInputIteratorPath());
- char const * ipath = iteratorPath;
- if(ipath && (*ipath == '/'))
- ++ipath;
- inputpath.set(ipath);
- }
-
- service.set(s.setown(helper->getService()));
- service.trim();
- if (wscType == SThttp)
- {
- service.toUpperCase(); //GET/PUT/POST
- if (strcmp(service.str(), "GET") != 0)
- throw MakeStringException(0, "HTTPCALL Only 'GET' http method currently supported");
- OwnedRoxieString acceptTypeSupplied(helper->getAcceptType()); // text/html, text/xml, etc
- acceptType.set(acceptTypeSupplied);
- acceptType.trim();
- acceptType.toLowerCase();
- }
- if (callHelper)
- rowTransformer = callHelper->queryInputTransformer();
- else
- rowTransformer = NULL;
- OwnedRoxieString hosts(helper->getHosts());
- UrlListParser urlListParser(hosts);
- if ((numUrls = urlListParser.getUrls(urlArray)) > 0)
- {
- if (wscMode == SCrow)
- {
- numRowThreads = 1;
- numUrlThreads = helper->numParallelThreads();
- if (numUrlThreads == 0)
- numUrlThreads = 1;
- else if (numUrlThreads > MAXWSCTHREADS)
- numUrlThreads = MAXWSCTHREADS;
- numRecordsPerBatch = 1;
- }
- else
- {
- unsigned totThreads = helper->numParallelThreads();
- if (totThreads < 1)
- totThreads = 2; // default to 2 threads
- else if (totThreads > MAXWSCTHREADS)
- totThreads = MAXWSCTHREADS;
- numUrlThreads = (numUrls < totThreads)? numUrls: totThreads;
- numRowThreads = totThreads / numUrlThreads;
- if (numRowThreads < 1)
- numRowThreads = 1;
- else if (numRowThreads > MAXWSCTHREADS)
- numRowThreads = MAXWSCTHREADS;
- numRecordsPerBatch = helper->numRecordsPerBatch();
- if (numRecordsPerBatch < 1)
- numRecordsPerBatch = 1;
- }
- }
- else
- throw MakeStringException(0, "%sCALL specified no URLs",wscType == STsoap ? "SOAP" : "HTTP");
- for (unsigned i=0; i<numRowThreads; i++)
- threads.append(*new CWSCHelperThread(this));
- }
- ~CWSCHelper()
- {
- complete = true;
- waitUntilDone();
- threads.kill();
- }
- void waitUntilDone()
- {
- ForEachItemIn(i,threads)
- threads.item(i).join();
- for (;;)
- {
- const void *row = outputQ.dequeueNow();
- if (!row)
- break;
- outputAllocator->releaseRow(row);
- }
- outputQ.reset();
- }
- void start()
- {
- if (timeLimitMS != WAIT_FOREVER)
- timeLimitMon.reset(timeLimitMS);
- done = 0;
- complete = aborted = timeLimitExceeded = false;
- ForEachItemIn(i,threads)
- threads.item(i).start();
- }
- void abort()
- {
- aborted = true;
- complete = true;
- outputQ.stop();
- }
- const void * getRow()
- {
- if (complete)
- return NULL;
- for (;;)
- {
- const void *row = outputQ.dequeue();
- if (aborted)
- break;
- if (row)
- return row;
- // should only be here if setDone() triggered
- complete = true;
- Owned<IException> e = getError();
- if (e)
- throw e.getClear();
- break;
- }
- return NULL;
- }
- IException * getError()
- {
- SpinBlock sb(outputQLock);
- return error.getLink();
- }
- inline IEngineRowAllocator * queryOutputAllocator() const { return outputAllocator; }
- #ifdef _USE_OPENSSL
- ISecureSocket *createSecureSocket(ISocket *sock)
- {
- {
- CriticalBlock b(secureContextCrit);
- if (!secureContext)
- {
- if (clientCert != NULL)
- secureContext.setown(createSecureSocketContextEx(clientCert->certificate, clientCert->privateKey, clientCert->passphrase, ClientSocket));
- else
- secureContext.setown(createSecureSocketContext(ClientSocket));
- }
- }
- return secureContext->createSecureSocket(sock);
- }
- #endif
- bool isTimeLimitExceeded(unsigned *_remainingMS)
- {
- if (timeLimitMS != WAIT_FOREVER)
- {
- CriticalBlock block(timeoutCrit);
- if (timeLimitExceeded || timeLimitMon.timedout(_remainingMS))
- {
- timeLimitExceeded = true;
- return true;
- }
- }
- else
- *_remainingMS = (unsigned)-1;
- return false;
- }
- void addUserLogMsg(const byte * row)
- {
- if (logUserMsg)
- {
- size32_t lenText;
- rtlDataAttr text;
- helper->getLogText(lenText, text.refstr(), row);
- logctx.CTXLOG("%s: %.*s", wscCallTypeText(), lenText, text.getstr());
- }
- }
- inline IXmlToRowTransformer * getRowTransformer() { return rowTransformer; }
- inline const char * wscCallTypeText() const { return wscType == STsoap ? "SOAPCALL" : "HTTPCALL"; }
- protected:
- friend class CWSCHelperThread;
- friend class CWSCAsyncFor;
- void putRow(const void * row)
- {
- assertex(row);
- outputQ.enqueue(row);
- }
- void setDone()
- {
- bool doStop;
- {
- SpinBlock sb(outputQLock);
- done++;
- doStop = (done == numRowThreads);
- }
- if (doStop)
- {
- // Note - Don't stop the queue - that effectively discards what's already on there,
- // which is not what we want.
- // Instead, push a NULL to indicate the end of the output.
- outputQ.enqueue(NULL);
- }
- }
- void setErrorOwn(IException * e)
- {
- SpinBlock sb(outputQLock);
- if (error)
- ::Release(e);
- else
- error.setown(e);
- }
- void toXML(const byte * self, IXmlWriterExt & out) { CriticalBlock block(toXmlCrit); helper->toXML(self, out); }
- size32_t transformRow(ARowBuilder & rowBuilder, IColumnProvider * row)
- {
- CriticalBlock block(transformCrit);
- NullDiskCallback callback;
- return rowTransformer->transform(rowBuilder, row, &callback);
- }
- unsigned onFailTransform(ARowBuilder & rowBuilder, const void * left, IException * e) { CriticalBlock block(onfailCrit); return callHelper->onFailTransform(rowBuilder, left, e); }
- StringBuffer authToken;
- WSCMode wscMode;
- IWSCRowProvider * rowProvider;
- IHThorWebServiceCallActionArg * helper;
- IHThorWebServiceCallArg * callHelper;
- Linked<IEngineRowAllocator> outputAllocator;
- Owned<IException> error;
- UrlArray urlArray;
- UrlArray proxyUrlArray;
- unsigned numRecordsPerBatch;
- unsigned numUrls;
- unsigned numRowThreads;
- unsigned numUrlThreads;
- unsigned maxRetries;
- unsigned timeoutMS;
- unsigned timeLimitMS;
- bool logXML;
- bool logMin;
- bool logUserMsg;
- bool aborted;
- const IContextLogger &logctx;
- unsigned flags;
- StringAttr soapaction;
- StringAttr httpHeaderName;
- StringAttr httpHeaderValue;
- StringAttr httpHeaders;
- StringAttr inputpath;
- StringBuffer service;
- StringBuffer acceptType;//for httpcall, text/plain, text/html, text/xml, etc
- StringAttr rowHeader;
- StringAttr rowFooter;
- StringAttr rootHeader;
- StringAttr rootFooter;
- StringAttr xmlnamespace;
- IXmlToRowTransformer * rowTransformer;
- };
- CriticalSection CWSCHelper::secureContextCrit;
- Owned<ISecureSocketContext> CWSCHelper::secureContext; // created on first use
- //=================================================================================================
- void CWSCHelperThread::outputRows(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows, const char *itemtag, bool encode_off, char const * itemns)
- {
- ForEachItemIn(idx, inputRows)
- {
- if (idx!=0)
- xmlWriter.checkDelimiter();
- if (itemtag && *itemtag) //TAG
- {
- xmlWriter.outputBeginNested(itemtag, true);
- if(itemns)
- xmlWriter.outputXmlns("xmlns", itemns);
- }
- if (master->rowHeader.get()) //OPTIONAL HEADER (specified by "HEADING" option)
- xmlWriter.outputInline(master->rowHeader.get());
- //XML ROW CONTENT
- master->toXML((const byte *)inputRows.item(idx), xmlWriter);
- if (master->rowFooter.get()) //OPTION FOOTER
- xmlWriter.outputInline(master->rowFooter.get());
- if (encode_off) //ENCODING
- xmlWriter.outputInt(0, 1, "encode_");
- if (itemtag && *itemtag) //TAG
- xmlWriter.outputEndNested(itemtag);
- master->addUserLogMsg((const byte *)inputRows.item(idx));
- }
- }
- void CWSCHelperThread::createHttpPostQuery(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows, bool appendRequestToName, bool appendEncodeFlag)
- {
- StringBuffer method_tag;
- method_tag.append(master->service);
- if (method_tag.length() && appendRequestToName)
- method_tag.append("Request");
- StringBuffer array_tag;
- StringAttr method_ns;
- if (master->rootHeader.get()) //OPTIONAL ROOT REQUEST HEADER
- xmlWriter.outputInline(master->rootHeader.get());
- if (inputRows.ordinality() > 1)
- {
- if (!(master->flags & SOAPFnoroot))
- {
- if (method_tag.length())
- {
- array_tag.append(method_tag).append("Array");
- xmlWriter.outputBeginNested(array_tag, true);
- if (master->xmlnamespace.get())
- xmlWriter.outputXmlns("xmlns", master->xmlnamespace);
- }
- }
- xmlWriter.outputBeginArray(method_tag);
- }
- else
- {
- if(master->xmlnamespace.get())
- method_ns.set(master->xmlnamespace.get());
- }
- outputRows(xmlWriter, inputRows, method_tag.str(), appendEncodeFlag ? (inputRows.ordinality() == 1) : false, method_ns.get());
- if (inputRows.ordinality() > 1)
- {
- xmlWriter.outputEndArray(method_tag);
- if (appendEncodeFlag)
- xmlWriter.outputInt(0, 1, "encode_");
- if (!(master->flags & SOAPFnoroot))
- {
- if (method_tag.length())
- xmlWriter.outputEndNested(array_tag);
- }
- }
- if (master->rootFooter.get()) //OPTIONAL ROOT REQUEST FOOTER
- xmlWriter.outputInline(master->rootFooter.get());
- }
- void CWSCHelperThread::createESPQuery(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows)
- {
- createHttpPostQuery(xmlWriter, inputRows, true, true);
- }
- //Create servce xml request body, with binding usage of either Literal or Encoded
- //Note that Encoded usage requires type encoding for data fields
- void CWSCHelperThread::createSOAPliteralOrEncodedQuery(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows)
- {
- xmlWriter.outputBeginNested(master->service, true);
- if (master->flags & SOAPFencoding)
- xmlWriter.outputCString("http://schemas.xmlsoap.org/soap/encoding/", "@soapenv:encodingStyle");
- if (master->xmlnamespace.get())
- xmlWriter.outputXmlns("xmlns", master->xmlnamespace.get());
- outputRows(xmlWriter, inputRows);
- xmlWriter.outputEndNested(master->service);
- }
- //Create SOAP body of http request
- void CWSCHelperThread::createXmlSoapQuery(IXmlWriterExt &xmlWriter, ConstPointerArray &inputRows)
- {
- xmlWriter.outputQuoted("<?xml version=\"1.0\" encoding=\"UTF-8\"?>");
- xmlWriter.outputBeginNested("soap:Envelope", true);
- xmlWriter.outputXmlns("soap", "http://schemas.xmlsoap.org/soap/envelope/");
- if (master->flags & SOAPFencoding)
- { //SOAP RPC/encoded. 'Encoded' usage includes type encoding
- xmlWriter.outputXmlns("xsd", "http://www.w3.org/2001/XMLSchema");
- xmlWriter.outputXmlns("xsi", "http://www.w3.org/2001/XMLSchema-instance");
- }
- xmlWriter.outputBeginNested("soap:Body", true);
- if (master->flags & SOAPFliteral || master->flags & SOAPFencoding)
- createSOAPliteralOrEncodedQuery(xmlWriter, inputRows);
- else
- createESPQuery(xmlWriter, inputRows);
- xmlWriter.outputEndNested("soap:Body");
- xmlWriter.outputEndNested("soap:Envelope");
- }
- void CWSCHelperThread::processQuery(ConstPointerArray &inputRows)
- {
- unsigned xmlWriteFlags = 0;
- unsigned xmlReadFlags = ptr_ignoreNameSpaces;
- if (master->flags & SOAPFtrim)
- xmlWriteFlags |= XWFtrim;
- if ((master->flags & SOAPFpreserveSpace) == 0)
- xmlReadFlags |= ptr_ignoreWhiteSpace;
- bool useMarkup = (master->flags & SOAPFmarkupinfo);
- XMLWriterType xmlType = WTStandard;
- if (useMarkup && (master->flags & SOAPFjson))
- xmlType = (master->flags & SOAPFnoroot) ? WTJSONRootless : WTJSONObject;
- else if (master->flags & SOAPFencoding)
- xmlType = WTEncodingData64;
- Owned<IXmlWriterExt> xmlWriter = createIXmlWriterExt(xmlWriteFlags, 0, nullptr, xmlType);
- if (useMarkup)
- createHttpPostQuery(*xmlWriter, inputRows, false, false);
- else if (master->wscType == STsoap )
- createXmlSoapQuery(*xmlWriter, inputRows);
- xmlWriter->finalize();
- Owned<IWSCAsyncFor> casyncfor = createWSCAsyncFor(master, *xmlWriter, inputRows, (PTreeReaderOptions) xmlReadFlags);
- casyncfor->For(master->numUrls, master->numUrlThreads,false,true); // shuffle URLS for poormans load balance
- }
- int CWSCHelperThread::run()
- {
- ConstPointerArray inputRows;
- if (master->wscMode == SCrow)
- {
- inputRows.append(NULL);
- try
- {
- processQuery(inputRows);
- }
- catch (IException *e)
- {
- master->setErrorOwn(e);
- }
- inputRows.pop();
- }
- else
- {
- // following a bit odd but preserving previous semantics (except fixing abort leak)
- for (;;)
- {
- try
- {
- while (!master->complete && !master->error.get())
- {
- if (master->aborted) {
- while (inputRows.ordinality() > 0)
- master->rowProvider->releaseRow(inputRows.popGet());
- return 0;
- }
- const void *r = master->rowProvider->getNextRow();
- if (!r)
- break;
- inputRows.append(r);
- if (inputRows.ordinality() >= master->numRecordsPerBatch)
- break;
- }
- if (inputRows.ordinality() == 0)
- break;
- processQuery(inputRows);
- }
- catch (IException *e)
- {
- master->setErrorOwn(e); // going to exit next time round
- }
- while (inputRows.ordinality() > 0)
- master->rowProvider->releaseRow(inputRows.popGet());
- }
- }
- master->setDone();
- return 0;
- }
- //=================================================================================================
- IWSCHelper * createSoapCallHelper(IWSCRowProvider *r, IEngineRowAllocator * outputAllocator, const char *authToken, WSCMode wscMode, ClientCertificate *clientCert, const IContextLogger &logctx, IRoxieAbortMonitor * roxieAbortMonitor)
- {
- return new CWSCHelper(r, outputAllocator, authToken, wscMode, clientCert, logctx, roxieAbortMonitor, STsoap);
- }
- IWSCHelper * createHttpCallHelper(IWSCRowProvider *r, IEngineRowAllocator * outputAllocator, const char *authToken, WSCMode wscMode, ClientCertificate *clientCert, const IContextLogger &logctx, IRoxieAbortMonitor * roxieAbortMonitor)
- {
- return new CWSCHelper(r, outputAllocator, authToken, wscMode, clientCert, logctx, roxieAbortMonitor, SThttp);
- }
- //=================================================================================================
- bool httpHeaderBlockContainsHeader(const char *httpheaders, const char *header)
- {
- if (!httpheaders || !*httpheaders)
- return false;
- VStringBuffer match("\n%s:", header);
- const char *matchStart = match.str()+1;
- if (!strncmp(httpheaders, matchStart, strlen(matchStart)))
- return true;
- if (strstr(httpheaders, match))
- return true;
- return false;
- }
- bool getHTTPHeader(const char *httpheaders, const char *header, StringBuffer& value)
- {
- if (!httpheaders || !*httpheaders || !header || !*header)
- return false;
- const char* pHeader = strstr(httpheaders, header);
- if (!pHeader)
- return false;
- pHeader += strlen(header);
- if (*pHeader != ':')
- return getHTTPHeader(pHeader, header, value);
- pHeader++;
- const char* ppHeader = strchr(pHeader, '\n');
- if (!ppHeader)
- value.append(pHeader);
- else
- value.append(pHeader, 0, ppHeader - pHeader);
- value.trim();
- return true;
- }
- class CWSCAsyncFor : implements IWSCAsyncFor, public CInterface, public CAsyncFor
- {
- class CSocketDataProvider : public CInterface
- {
- const char * buffer;
- size32_t currLen;
- size32_t curPosn;
- ISocket * socket;
- unsigned timeoutMS;
- public:
- CSocketDataProvider(const char * _buffer, size32_t _curPosn, size32_t _currLen, ISocket * _sock, unsigned _timeout )
- : buffer(_buffer), currLen(_currLen), curPosn(_curPosn), socket(_sock), timeoutMS(_timeout)
- {
- }
- size32_t getBytes(char * buf, size32_t len)
- {
- size32_t count;
- if ( len <= (currLen-curPosn))
- { //its already in the buffer
- memcpy(buf, (buffer + curPosn), len);
- curPosn += len;
- count = len;
- }
- else if (curPosn >= currLen)
- { //nothing in buffer, read from socket
- size32_t bytesRead=0;
- count = 0;
- do
- {
- socket->readtms(buf + count, 0, len - count, bytesRead, timeoutMS);
- count += bytesRead;
- } while (count != len);
- currLen = curPosn = 0;
- }
- else
- { //only some is in buffer, read rest from socket
- size32_t bytesRead=0;
- size32_t avail = currLen - curPosn;
- memcpy(buf, (buffer + curPosn), avail);
- count = avail;
- do
- {
- size32_t read;
- socket->readtms(buf+avail+bytesRead, 0, len-avail-bytesRead, read, timeoutMS);
- bytesRead += read;
- } while (len != (bytesRead + avail));
- count += bytesRead;
- currLen = curPosn = 0;
- }
- return count;
- }
- };
- private:
- CWSCHelper * master;
- ConstPointerArray &inputRows;
- IXmlWriterExt &xmlWriter;
- IEngineRowAllocator * outputAllocator;
- CriticalSection processExceptionCrit;
- StringBuffer responsePath;
- Owned<CSocketDataProvider> dataProvider;
- PTreeReaderOptions options;
- unsigned remainingMS;
- inline void checkRoxieAbortMonitor(IRoxieAbortMonitor * roxieAbortMonitor)
- {
- if (roxieAbortMonitor)
- {
- try
- {
- roxieAbortMonitor->checkForAbort();//throws
- }
- catch (IException *e)
- {
- StringBuffer s;
- throw MakeStringException(ROXIE_ABORT_EVENT, "%s", e->errorMessage(s).str());
- }
- }
- }
- bool checkContentEncodingSupported(const char* encoding)
- {
- if (strieq(encoding, "gzip"))
- return true;
- if (strieq(encoding, "deflate"))
- return true;
- if (strieq(encoding, "x-deflate"))
- return true;
- return false;
- }
- bool checkContentDecoding(const StringBuffer& headers, StringBuffer& content, StringBuffer& contentEncoding)
- {
- if ((headers.length() == 0) || (content.length() == 0))
- return false;
- getHTTPHeader(headers.str(), CONTENT_ENCODING, contentEncoding);
- if (contentEncoding.isEmpty())
- return false;
- if (!checkContentEncodingSupported(contentEncoding.str()))
- throw MakeStringException(-1, "Content-Encoding:%s not supported", contentEncoding.str());
- return true;
- }
- void decodeContent(const char* contentEncodingType, StringBuffer& content)
- {
- #ifdef _USE_ZLIB
- unsigned contentLength = content.length();
- StringBuffer contentDecoded;
- httpInflate((const byte*)content.str(), contentLength, contentDecoded, strieq(contentEncodingType, "gzip"));
- PROGLOG("Content decoded from %d bytes to %d bytes", contentLength, contentDecoded.length());
- content = contentDecoded;
- if (soapTraceLevel > 6 || master->logXML)
- multiLog(master->logctx, "Content decoded. Original %s %d", CONTENT_LENGTH, contentLength);
- #else
- throw MakeStringException(-1, "_USE_ZLIB is required for Content-Encoding:%s", contentEncodingType);
- #endif
- }
- bool checkContentEncoding(const char* httpheaders, StringBuffer& contentEncodingType)
- {
- if (xmlWriter.length() == 0)
- return false;
- getHTTPHeader(httpheaders, CONTENT_ENCODING, contentEncodingType);
- if (contentEncodingType.isEmpty())
- return false;
- if (!checkContentEncodingSupported(contentEncodingType.str()))
- throw MakeStringException(-1, "Content-Encoding:%s not supported", contentEncodingType.str());
- return true;
- }
- ZlibCompressionType getEncodeFormat(const char *name)
- {
- if (strieq(name, "gzip"))
- return ZlibCompressionType::GZIP;
- if (strieq(name, "x-deflate"))
- return ZlibCompressionType::ZLIB_DEFLATE;
- if (strieq(name, "deflate"))
- return ZlibCompressionType::DEFLATE;
- return ZlibCompressionType::GZIP; //already checked above, shouldn't be here
- }
- void encodeContent(const char* contentEncodingType, MemoryBuffer& mb)
- {
- #ifdef _USE_ZLIB
- zlib_deflate(mb, xmlWriter.str(), xmlWriter.length(), GZ_BEST_SPEED, getEncodeFormat(contentEncodingType));
- PROGLOG("Content encoded from %d bytes to %d bytes", xmlWriter.length(), mb.length());
- #else
- throw MakeStringException(-1, "_USE_ZLIB is required for Content-Encoding:%s", contentEncodingType);
- #endif
- }
- void logRequest(bool contentEncoded, StringBuffer& request)
- {
- if (soapTraceLevel > 6 || master->logXML)
- {
- if (!contentEncoded)
- multiLog(master->logctx, "%s: request(%s)", master->wscCallTypeText(), request.str());
- else
- multiLog(master->logctx, "%s: request(%s), content encoded.", master->wscCallTypeText(), request.str());
- }
- }
- void createHttpRequest(Url &url, StringBuffer &request)
- {
- // Create the HTTP POST request
- if (master->wscType == STsoap)
- request.clear().append("POST ").append(url.path).append(" HTTP/1.1\r\n");
- else
- request.clear().append(master->service).append(" ").append(url.path).append(" HTTP/1.1\r\n");
- const char *httpheaders = master->httpHeaders.get();
- if (httpheaders && *httpheaders)
- {
- if (soapTraceLevel > 6 || master->logXML)
- multiLog(master->logctx, "%s: Adding HTTP Headers(%s)", master->wscCallTypeText(), httpheaders);
- request.append(httpheaders);
- }
- if (!httpHeaderBlockContainsHeader(httpheaders, "Authorization"))
- {
- if (url.userPasswordPair.length() > 0)
- {
- StringBuffer authToken;
- JBASE64_Encode(url.userPasswordPair.str(), url.userPasswordPair.length(), authToken, false);
- request.append("Authorization: Basic ").append(authToken).append("\r\n");
- }
- else if (master->authToken.length() > 0)
- {
- request.append("Authorization: Basic ").append(master->authToken).append("\r\n");
- }
- }
- #ifdef _USE_ZLIB
- if (!httpHeaderBlockContainsHeader(httpheaders, ACCEPT_ENCODING))
- request.appendf("%s: gzip, deflate\r\n", ACCEPT_ENCODING);
- #endif
- if (!isEmptyString(master->logctx.queryGlobalId()))
- {
- request.append(master->logctx.queryGlobalIdHttpHeader()).append(": ").append(master->logctx.queryGlobalId()).append("\r\n");
- if (!isEmptyString(master->logctx.queryLocalId()))
- request.append(master->logctx.queryCallerIdHttpHeader()).append(": ").append(master->logctx.queryLocalId()).append("\r\n"); //our localId is reciever's callerId
- }
- if (master->wscType == STsoap)
- {
- if (master->soapaction.get())
- request.append("SOAPAction: ").append(master->soapaction.get()).append("\r\n");
- if (master->httpHeaders.isEmpty() && master->httpHeaderName.get() && master->httpHeaderValue.get())
- {
- //backward compatibility
- StringBuffer hdr(master->httpHeaderName.get());
- hdr.append(": ").append(master->httpHeaderValue);
- if (soapTraceLevel > 6 || master->logXML)
- multiLog(master->logctx, "SOAPCALL: Adding HTTP Header(%s)", hdr.str());
- request.append(hdr.append("\r\n"));
- }
- if (!httpHeaderBlockContainsHeader(httpheaders, "Content-Type"))
- {
- bool isJson = ((master->flags & SOAPFmarkupinfo) && (master->flags & SOAPFjson));
- if (isJson)
- request.append("Content-Type: application/json\r\n");
- else
- request.append("Content-Type: text/xml\r\n");
- }
- }
- else if(master->wscType == SThttp)
- request.append("Accept: ").append(master->acceptType).append("\r\n");
- else
- assertex(false);
- if (master->wscType == STsoap)
- {
- request.append("Host: ").append(url.host).append(":").append(url.port).append("\r\n");//http 1.1
- StringBuffer contentEncodingType;
- if (!checkContentEncoding(httpheaders, contentEncodingType))
- {
- request.append(CONTENT_LENGTH).append(xmlWriter.length()).append("\r\n\r\n");
- request.append(xmlWriter.str());//add SOAP xml content
- logRequest(false, request);
- }
- else
- {
- logRequest(true, request);
- MemoryBuffer encodedContentBuf;
- encodeContent(contentEncodingType.str(), encodedContentBuf);
- request.append(CONTENT_LENGTH).append(encodedContentBuf.length()).append("\r\n\r\n");
- request.append(encodedContentBuf.length(), encodedContentBuf.toByteArray());
- }
- }
- else
- {
- request.append("Host: ").append(url.host);//http 1.1
- if (url.port != 80) //default port?
- request.append(":").append(url.port);
- request.append("\r\n");//http 1.1
- request.append("\r\n");//httpcall
- logRequest(false, request);
- }
- if (master->logMin)
- master->logctx.CTXLOG("%s: request(%s:%u)", master->wscCallTypeText(), url.host.str(), url.port);
- }
- int readHttpResponse(StringBuffer &response, ISocket *socket)
- {
- // Read the POST reply
- // not doesn't *assume* is valid HTTP post format but if it is takes advantage of
- response.clear();
- unsigned bytesRead;
- MemoryAttr buf;
- char *buffer=(char *)buf.allocate(WSCBUFFERSIZE+1);
- int rval = 200;
- // first read header
- size32_t payloadofs = 0;
- size32_t payloadsize = 0;
- StringBuffer dbgheader, contentEncoding;
- bool chunked = false;
- size32_t read = 0;
- do {
- checkTimeLimitExceeded(&remainingMS);
- checkRoxieAbortMonitor(master->roxieAbortMonitor);
- socket->readtms(buffer+read, 0, WSCBUFFERSIZE-read, bytesRead, MIN(master->timeoutMS,remainingMS));
- checkTimeLimitExceeded(&remainingMS);
- checkRoxieAbortMonitor(master->roxieAbortMonitor);
- read += bytesRead;
- buffer[read] = 0;
- if (strncmp(buffer, "HTTP", 4) == 0) {
- const char *s = strstr(buffer,"\r\n\r\n");
- if (s) {
- payloadofs = (size32_t)(s-buffer+4);
- dbgheader.append(payloadofs,buffer);
- s = strstr(buffer, " ");
- if (s)
- rval = atoi(s+1);
- if (!strstr(buffer,"Transfer-Encoding: chunked"))
- {
- chunked = false;
- s = strstr(buffer,CONTENT_LENGTH);
- if (s) {
- s += strlen(CONTENT_LENGTH);
- if ((size32_t)(s-buffer) < payloadofs)
- payloadsize = atoi(s);
- }
- }
- else
- {
- chunked = true;
- size32_t chunkSize = 0;
- size32_t dataLen = 0;
- char ch;
- /*
- //from http://www.w3.org/Protocols/rfc2616/rfc2616-sec19.html#sec19.4.4
- "19.4.6 Introduction of Transfer-Encoding"
- length := 0
- read chunk-size, chunk-extension (if any) and CRLF
- while (chunk-size > 0)
- {
- read chunk-data and CRLF
- append chunk-data to entity-body
- length := length + chunk-size
- read chunk-size and CRLF
- }
- */
- checkTimeLimitExceeded(&remainingMS);
- dataProvider.setown(new CSocketDataProvider(buffer, payloadofs, read, socket, MIN(master->timeoutMS,remainingMS)));
- dataProvider->getBytes(&ch, 1);
- while (isalpha(ch) || isdigit(ch))
- { //get chunk-size
- if (isdigit(ch))
- chunkSize = (chunkSize*16) + (ch - '0');
- else
- chunkSize = (chunkSize*16) + 10 + (toupper(ch) - 'A');
- dataProvider->getBytes(&ch, 1);
- }
- while (chunkSize && ch != '\n')//consume chunk-extension and CRLF
- dataProvider->getBytes(&ch, 1);
- while (chunkSize)
- {
- if (chunkSize > WSCBUFFERSIZE)
- DBGLOG("SOAPCALL chunk size %d", chunkSize);
- //read chunk data directly into response
- size32_t count = dataProvider->getBytes(response.reserve(dataLen + chunkSize), chunkSize);
- assertex(count == chunkSize);
- dataLen += count;
- response.setLength(dataLen);
- dataProvider->getBytes(&ch, 1);//consume CRLF at end of chunk
- while (ch != '\n')
- dataProvider->getBytes(&ch, 1);
- chunkSize = 0;
- dataProvider->getBytes(&ch, 1);
- while (isalpha(ch) || isdigit(ch))
- { //get next chunk size
- if (isdigit(ch))
- chunkSize = (chunkSize*16) + (ch - '0');
- else
- chunkSize = (chunkSize*16) + 10 + (toupper(ch) - 'A');
- dataProvider->getBytes(&ch, 1);
- }
- while(chunkSize && ch != '\n')//consume chunk-extension and CRLF
- dataProvider->getBytes(&ch, 1);
- }
- }
- break;
- }
- }
- if (bytesRead == 0) // socket closed
- break;
- } while (bytesRead&&(read<WSCBUFFERSIZE));
- if (!chunked)
- {
- if (payloadsize)
- response.ensureCapacity(payloadsize);
- char *payload = buffer;
- if (payloadofs) {
- read -= payloadofs;
- payload += payloadofs;
- if (payloadsize&&(read>payloadsize))
- read = payloadsize;
- }
- if (read)
- response.append(read,payload);
- if (payloadsize) { // read directly into response
- while (read<payloadsize) {
- checkTimeLimitExceeded(&remainingMS);
- checkRoxieAbortMonitor(master->roxieAbortMonitor);
- socket->readtms(response.reserve(payloadsize-read), 0, payloadsize-read, bytesRead, MIN(master->timeoutMS,remainingMS));
- checkTimeLimitExceeded(&remainingMS);
- checkRoxieAbortMonitor(master->roxieAbortMonitor);
- read += bytesRead;
- response.setLength(read);
- if (bytesRead==0) {
- master->logctx.CTXLOG("%sCALL: Warning %sHTTP response terminated prematurely",master->wscType == STsoap ? "SOAP" : "HTTP",chunked?"CHUNKED ":"");
- break; // oops looks likesocket closed early
- }
- }
- }
- else {
- for (;;) {
- checkTimeLimitExceeded(&remainingMS);
- checkRoxieAbortMonitor(master->roxieAbortMonitor);
- socket->readtms(buffer, 0, WSCBUFFERSIZE, bytesRead, MIN(master->timeoutMS,remainingMS));
- checkTimeLimitExceeded(&remainingMS);
- checkRoxieAbortMonitor(master->roxieAbortMonitor);
- if (bytesRead==0)
- break; // rely on socket closing to terminate message
- response.append(bytesRead,buffer);
- }
- }
- }
- if (checkContentDecoding(dbgheader, response, contentEncoding))
- decodeContent(contentEncoding.str(), response);
- if (soapTraceLevel > 6 || master->logXML)
- multiLog(master->logctx, "%sCALL: LEN=%d %sresponse(%s%s)", master->wscType == STsoap ? "SOAP" : "HTTP",response.length(),chunked?"CHUNKED ":"", dbgheader.str(), response.str());
- else if (soapTraceLevel > 8)
- multiLog(master->logctx, "%sCALL: LEN=%d %sresponse(%s)", master->wscType == STsoap ? "SOAP" : "HTTP",response.length(),chunked?"CHUNKED ":"", response.str()); // not sure this is that useful but...
- return rval;
- }
- inline const char *queryXpathHint(const char *name)
- {
- if (!master->xpathHints)
- return nullptr;
- return master->xpathHints->queryProp(name);
- }
- void processEspResponse(Url &url, StringBuffer &response, ColumnProvider * meta)
- {
- StringBuffer path(responsePath);
- path.append("/Results/Result/");
- const char *tail = nullptr;
- const char *excPath = nullptr;
- if (master->rowTransformer && master->inputpath.get())
- {
- StringBuffer ipath;
- ipath.append("/Envelope/Body/").append(master->inputpath.get());
- tail = queryXpathHint("rowpath");
- if(!tail && (ipath.length() >= path.length()) && (0 == memcmp(ipath.str(), path.str(), path.length())))
- tail = ipath.str() + path.length();
- else
- path.clear().append(ipath);
- excPath = queryXpathHint("excpath");
- }
- else
- tail = "Dataset/Row";
- CMatchCB matchCB(*this, url, tail, meta, excPath, CBExceptionPathExc);
- Owned<IXMLParse> xmlParser = createXMLParse((const void *)response.str(), (unsigned)response.length(), path.str(), matchCB, options, (master->flags&SOAPFusescontents)!=0);
- while (xmlParser->next());
- }
- void processLiteralResponse(Url &url, StringBuffer &response, ColumnProvider * meta)
- {
- StringBuffer path("/Envelope/Body/");
- const char *tail = nullptr;
- const char *excPath = nullptr;
- if(master->rowTransformer && master->inputpath.get())
- {
- path.append(master->inputpath.get());
- tail = queryXpathHint("rowpath");
- excPath = queryXpathHint("excpath");
- }
- CMatchCB matchCB(*this, url, tail, meta, excPath, CBExceptionPathExc);
- Owned<IXMLParse> xmlParser = createXMLParse((const void *)response.str(), (unsigned)response.length(), path.str(), matchCB, options, (master->flags&SOAPFusescontents)!=0);
- while (xmlParser->next());
- }
- void processHttpResponse(Url &url, StringBuffer &response, ColumnProvider * meta)
- {
- const char *path = nullptr;
- const char *tail = nullptr;
- const char *excPath = nullptr;
- if(master->rowTransformer && master->inputpath.get())
- {
- path = master->inputpath.get();
- tail = queryXpathHint("rowpath");
- excPath = queryXpathHint("excpath");
- }
- CMatchCB matchCB(*this, url, tail, meta, excPath, CBExceptionPathExc | CBExceptionPathExcs | CBExceptionPathExcsExc);
- Owned<IXMLParse> xmlParser;
- if (strieq(master->acceptType.str(), "application/json") || (master->flags & SOAPFjson))
- xmlParser.setown(createJSONParse((const void *)response.str(), (unsigned)response.length(), path, matchCB, options, (master->flags&SOAPFusescontents)!=0, true));
- else
- xmlParser.setown(createXMLParse((const void *)response.str(), (unsigned)response.length(), path, matchCB, options, (master->flags&SOAPFusescontents)!=0));
- while (xmlParser->next());
- }
- void processResponse(Url &url, StringBuffer &response, ColumnProvider * meta)
- {
- if (master->wscType == SThttp || master->flags & SOAPFmarkupinfo)
- processHttpResponse(url, response, meta);
- else if (master->flags & SOAPFliteral)
- processLiteralResponse(url, response, meta);
- else if (master->flags & SOAPFencoding)
- processLiteralResponse(url, response, meta);
- else
- processEspResponse(url, response, meta);
- }
- void processException(const Url &url, const void *row, IException *e)
- {
- CriticalBlock block(processExceptionCrit);
- Owned<IException> ne = url.getUrlException(e);
- e->Release();
- if ((master->flags & SOAPFonfail) && master->callHelper)
- {
- try
- {
- RtlDynamicRowBuilder rowBuilder(outputAllocator);
- size32_t sizeGot = master->onFailTransform(rowBuilder, row, ne);
- if (sizeGot > 0)
- master->putRow(rowBuilder.finalizeRowClear(sizeGot));
- }
- catch (IException *te)
- {
- master->setErrorOwn(te);
- }
- }
- else
- master->setErrorOwn(ne.getClear());
- }
- void processException(const Url &url, ConstPointerArray &inputRows, IException *e)
- {
- Owned<IException> ne = url.getUrlException(e);
- e->Release();
- if ((master->flags & SOAPFonfail) && master->callHelper)
- {
- ForEachItemIn(idx, inputRows)
- {
- try
- {
- RtlDynamicRowBuilder rowBuilder(outputAllocator);
- size32_t sizeGot = master->onFailTransform(rowBuilder, inputRows.item(idx), ne);
- if (sizeGot > 0)
- master->putRow(rowBuilder.finalizeRowClear(sizeGot));
- }
- catch (IException *te)
- {
- master->setErrorOwn(te);
- break;
- }
- }
- }
- else
- master->setErrorOwn(ne.getClear());
- }
- inline void checkTimeLimitExceeded(unsigned * remainingMS)
- {
- if (master->isTimeLimitExceeded(remainingMS))
- throw MakeStringException(TIMELIMIT_EXCEEDED, "%sCALL TIMELIMIT(%ums) exceeded", master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimitMS);
- }
- public:
- CWSCAsyncFor(CWSCHelper * _master, IXmlWriterExt &_xmlWriter, ConstPointerArray &_inputRows, PTreeReaderOptions _options): xmlWriter(_xmlWriter), inputRows(_inputRows), options(_options)
- {
- master = _master;
- outputAllocator = master->queryOutputAllocator();
- responsePath.append("/Envelope/Body/");
- if (inputRows.ordinality() > 1)
- {
- // can we receive a roxie exceptions for the whole RequestArray?
- // if so, we need to handle them here
- responsePath.append(master->service).append("ResponseArray/");
- }
- responsePath.append(master->service).append("Response");
- }
- ~CWSCAsyncFor()
- {
- }
- IMPLEMENT_IINTERFACE;
- void For(unsigned num,unsigned maxatonce,bool abortFollowingException, bool shuffled)
- {
- CAsyncFor::For(num, maxatonce, abortFollowingException, shuffled);
- }
- void Do(unsigned idx)
- {
- StringBuffer request;
- StringBuffer response;
- unsigned numRetries = 0;
- unsigned retryInterval = 0;
- Url &url = master->urlArray.item(idx);
- createHttpRequest(url, request);
- unsigned startidx = idx;
- while (!master->aborted)
- {
- Owned<ISocket> socket;
- cycle_t startCycles, endCycles;
- startCycles = get_cycles_now();
- for (;;)
- {
- try
- {
- checkTimeLimitExceeded(&remainingMS);
- Url &connUrl = master->proxyUrlArray.empty() ? url : master->proxyUrlArray.item(0);
- socket.setown(blacklist->connect(connUrl.port, connUrl.host, master->logctx, (unsigned)master->maxRetries, master->timeoutMS, master->roxieAbortMonitor));
- if (stricmp(url.method, "https") == 0)
- {
- #ifdef _USE_OPENSSL
- Owned<ISecureSocket> ssock = master->createSecureSocket(socket.getClear());
- if (ssock)
- {
- checkTimeLimitExceeded(&remainingMS);
- int status = ssock->secure_connect();
- if (status < 0)
- {
- StringBuffer err;
- err.append("Failure to establish secure connection to ");
- connUrl.getUrlString(err);
- err.append(": returned ").append(status);
- throw MakeStringExceptionDirect(0, err.str());
- }
- socket.setown(ssock.getLink());
- }
- #else
- StringBuffer err;
- err.append("Failure to establish secure connection to ");
- connUrl.getUrlString(err);
- err.append(": OpenSSL disabled in build");
- throw MakeStringExceptionDirect(0, err.str());
- #endif
- }
- break;
- }
- catch (IException *e)
- {
- if (master->timeLimitExceeded)
- {
- master->logctx.CTXLOG("%sCALL exiting: time limit (%ums) exceeded",master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimitMS);
- processException(url, inputRows, e);
- return;
- }
- if (e->errorCode() == ROXIE_ABORT_EVENT)
- {
- StringBuffer s;
- master->logctx.CTXLOG("%sCALL exiting: Roxie Abort : %s",master->wscType == STsoap ? "SOAP" : "HTTP",e->errorMessage(s).str());
- throw;
- }
- do
- {
- idx++; // try next socket not blacklisted
- if (idx==master->urlArray.ordinality())
- idx = 0;
- if (idx==startidx)
- {
- StringBuffer s;
- master->logctx.CTXLOG("Exception %s", e->errorMessage(s).str());
- processException(url, inputRows, e);
- return;
- }
- } while (blacklist->blacklisted(url.port, url.host));
- }
- }
- try
- {
- checkTimeLimitExceeded(&remainingMS);
- checkRoxieAbortMonitor(master->roxieAbortMonitor);
- socket->write(request.str(), request.length());
- if (soapTraceLevel > 4)
- multiLog(master->logctx, "%sCALL: sent request (%s) to %s:%d", master->wscType == STsoap ? "SOAP" : "HTTP",master->service.str(), url.host.str(), url.port);
- checkTimeLimitExceeded(&remainingMS);
- checkRoxieAbortMonitor(master->roxieAbortMonitor);
- int rval = readHttpResponse(response, socket);
- if (soapTraceLevel > 4)
- multiLog(master->logctx, "%sCALL: received response (%s) from %s:%d", master->wscType == STsoap ? "SOAP" : "HTTP",master->service.str(), url.host.str(), url.port);
- if (rval != 200)
- {
- if (rval == 503)
- throw new ReceivedRoxieException(1001, "Server Too Busy");
- StringBuffer text;
- text.appendf("HTTP error (%d) in processQuery",rval);
- rtlAddExceptionTag(text, "soapresponse", response.str());
- throw MakeStringExceptionDirect(-1, text.str());
- }
- if (response.length() == 0)
- {
- throw MakeStringException(-1, "Zero length response in processQuery");
- }
- endCycles = get_cycles_now();
- __int64 elapsedNs = cycle_to_nanosec(endCycles-startCycles);
- master->logctx.noteStatistic(StTimeSoapcall, elapsedNs);
- checkTimeLimitExceeded(&remainingMS);
- ColumnProvider * meta = (ColumnProvider*)CreateColumnProvider((unsigned)nanoToMilli(elapsedNs), master->flags&SOAPFencoding?true:false);
- processResponse(url, response, meta);
- delete meta;
- break;
- }
- catch (IReceivedRoxieException *e)
- {
- // server busy ... Sleep and retry
- if (e->errorCode() == 1001)
- {
- if (retryInterval)
- {
- int sleepTime = retryInterval + getRandom() % retryInterval;
- master->logctx.CTXLOG("Server busy (1001), sleeping for %d milliseconds before retry", sleepTime);
- Sleep(sleepTime);
- retryInterval = (retryInterval*2 >= 10000)? 10000: retryInterval*2;
- }
- else
- {
- master->logctx.CTXLOG("Server busy (1001), retrying");
- retryInterval = 10;
- }
- e->Release();
- }
- else
- {
- // other roxie exception ...
- master->logctx.CTXLOG("Exiting: received Roxie exception");
- if (e->errorRow())
- processException(url, e->errorRow(), e);
- else
- processException(url, inputRows, e);
- break;
- }
- }
- catch (IException *e)
- {
- if (master->timeLimitExceeded)
- {
- processException(url, inputRows, e);
- master->logctx.CTXLOG("%sCALL exiting: time limit (%ums) exceeded", master->wscType == STsoap ? "SOAP" : "HTTP", master->timeLimitMS);
- break;
- }
- if (e->errorCode() == ROXIE_ABORT_EVENT)
- {
- StringBuffer s;
- master->logctx.CTXLOG("%sCALL exiting: Roxie Abort : %s",master->wscType == STsoap ? "SOAP" : "HTTP",e->errorMessage(s).str());
- throw;
- }
- // other IException ... retry up to maxRetries
- StringBuffer s;
- master->logctx.CTXLOG("Exception %s", e->errorMessage(s).str());
- if (numRetries >= master->maxRetries)
- {
- // error affects all inputRows
- master->logctx.CTXLOG("Exiting: maxRetries %d exceeded", master->maxRetries);
- processException(url, inputRows, e);
- break;
- }
- numRetries++;
- master->logctx.CTXLOG("Retrying: attempt %d of %d", numRetries, master->maxRetries);
- e->Release();
- }
- catch (std::exception & es)
- {
- if(dynamic_cast<std::bad_alloc *>(&es))
- throw MakeStringException(-1, "std::exception: out of memory (std::bad_alloc) in CWSCAsyncFor processQuery");
- throw MakeStringException(-1, "std::exception: standard library exception (%s) in CWSCAsyncFor processQuery",es.what());
- }
- catch (...)
- {
- throw MakeStringException(-1, "Unknown exception in processQuery");
- }
- }
- }
- inline virtual const char *getResponsePath() { return responsePath; }
- inline virtual ConstPointerArray & getInputRows() { return inputRows; }
- inline virtual CWSCHelper * getMaster() { return master; }
- inline virtual IEngineRowAllocator * getOutputAllocator() { return outputAllocator; }
- };
- IWSCAsyncFor * createWSCAsyncFor(CWSCHelper * _master, IXmlWriterExt &_xmlWriter, ConstPointerArray &_inputRows, PTreeReaderOptions _options)
- {
- return new CWSCAsyncFor(_master, _xmlWriter, _inputRows, _options);
- }
|