ws_sqlService.cpp 75 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "ws_sqlService.hpp"
  14. #include "exception_util.hpp"
  15. void CwssqlEx::init(IPropertyTree *_cfg, const char *_process, const char *_service)
  16. {
  17. cfg = _cfg;
  18. try
  19. {
  20. ECLFunctions::init();
  21. }
  22. catch (...)
  23. {
  24. throw MakeStringException(-1, "ws_sqlEx: Problem initiating ECLFunctions structure");
  25. }
  26. refreshValidClusters();
  27. setWsSqlBuildVersion(BUILD_TAG);
  28. }
  29. bool CwssqlEx::onEcho(IEspContext &context, IEspEchoRequest &req, IEspEchoResponse &resp)
  30. {
  31. resp.setResponse(req.getRequest());
  32. return true;
  33. }
  34. bool CwssqlEx::onGetDBMetaData(IEspContext &context, IEspGetDBMetaDataRequest &req, IEspGetDBMetaDataResponse &resp)
  35. {
  36. if (!context.validateFeatureAccess(WSSQLACCESS, SecAccess_Read, false))
  37. throw MakeStringException(-1, "Failed to fetch HPCC information. Permission denied.");
  38. bool success = false;
  39. StringBuffer username;
  40. context.getUserID(username);
  41. const char* passwd = context.queryPassword();
  42. bool includetables = req.getIncludeTables();
  43. if (includetables)
  44. {
  45. Owned<HPCCFileCache> tmpHPCCFileCache = HPCCFileCache::createFileCache(username.str(), passwd);
  46. tmpHPCCFileCache->populateTablesResponse(resp, req.getTableFilter());
  47. resp.setTableCount(resp.getTables().length());
  48. }
  49. bool includeStoredProcs = req.getIncludeStoredProcedures();
  50. if (includeStoredProcs)
  51. {
  52. const char * querysetfilter = req.getQuerySet();
  53. Owned<IStringIterator> targets = getTargetClusters(NULL, NULL);
  54. IArrayOf<IEspHPCCQuerySet> pquerysets;
  55. SCMStringBuffer target;
  56. ForEach(*targets)
  57. {
  58. const char *setname = targets->str(target).str();
  59. if ( querysetfilter && *querysetfilter && stricmp(setname, querysetfilter)!=0)
  60. continue;
  61. Owned<IEspHPCCQuerySet> pqset = createHPCCQuerySet();
  62. pqset->setName(setname);
  63. pquerysets.append(*pqset.getLink());
  64. Owned<IPropertyTree> settree = getQueryRegistry(setname, true);
  65. if (settree == NULL)
  66. continue;
  67. IArrayOf<IEspPublishedQuery> queries;
  68. Owned<IPropertyTreeIterator> iter = settree->getElements("Query");
  69. ForEach(*iter)
  70. {
  71. const char * id = iter->query().queryProp("@id");
  72. const char * qname = iter->query().queryProp("@name");
  73. const char * wuid = iter->query().queryProp("@wuid");
  74. if (qname && *qname && wuid && *wuid)
  75. {
  76. StringBuffer resp;
  77. Owned<IEspPublishedQuery> pubQuery = createPublishedQuery();
  78. pubQuery->setName(qname);
  79. pubQuery->setId(id);
  80. pubQuery->setWuid(wuid);
  81. pubQuery->setSuspended(iter->query().getPropBool("@suspended"));
  82. Owned<IEspQuerySignature> querysignature = createQuerySignature();
  83. IArrayOf<IEspHPCCColumn> inparams;
  84. IArrayOf<IEspOutputDataset> resultsets;
  85. WsEclWuInfo wsinfo(wuid, setname, qname, username, passwd);
  86. Owned<IResultSetFactory> resultSetFactory(getResultSetFactory(username, passwd));
  87. //Each published query can have multiple results (datasets)
  88. IConstWUResultIterator &results = wsinfo.ensureWorkUnit()->getResults();
  89. ForEach(results)
  90. {
  91. Owned<IEspOutputDataset> outputdataset = createOutputDataset();
  92. IArrayOf<IEspHPCCColumn> outparams;
  93. IConstWUResult &result = results.query();
  94. SCMStringBuffer resultName;
  95. result.getResultName(resultName);
  96. outputdataset->setName(resultName.s.str());
  97. Owned<IResultSetMetaData> meta = resultSetFactory->createResultSetMeta(&result);
  98. //Each result dataset can have multiple result columns
  99. int columncount = meta->getColumnCount();
  100. for (int i = 0; i < columncount; i++)
  101. {
  102. Owned<IEspHPCCColumn> col = createHPCCColumn();
  103. SCMStringBuffer columnLabel;
  104. meta->getColumnLabel(columnLabel,i);
  105. col->setName(columnLabel.str());
  106. SCMStringBuffer eclType;
  107. meta->getColumnEclType(eclType, i);
  108. col->setType(eclType.str());
  109. outparams.append(*col.getLink());
  110. }
  111. outputdataset->setOutParams(outparams);
  112. resultsets.append(*outputdataset.getLink());
  113. }
  114. //Each query can have multiple input parameters
  115. IConstWUResultIterator &vars = wsinfo.ensureWorkUnit()->getVariables();
  116. ForEach(vars)
  117. {
  118. Owned<IEspHPCCColumn> col = createHPCCColumn();
  119. IConstWUResult &var = vars.query();
  120. SCMStringBuffer varname;
  121. var.getResultName(varname);
  122. col->setName(varname.str());
  123. Owned<IResultSetMetaData> meta = resultSetFactory->createResultSetMeta(&var);
  124. SCMStringBuffer eclType;
  125. meta->getColumnEclType(eclType, 0);
  126. col->setType(eclType.str());
  127. inparams.append(*col.getLink());
  128. }
  129. querysignature->setInParams(inparams);
  130. querysignature->setResultSets(resultsets);
  131. pubQuery->setSignature(*querysignature.getLink());
  132. queries.append(*pubQuery.getLink());
  133. }
  134. }
  135. pqset->setQuerySetQueries(queries);
  136. IArrayOf<IEspQuerySetAliasMap> aliases;
  137. Owned<IPropertyTreeIterator> aliasiter = settree->getElements("Alias");
  138. ForEach(*aliasiter)
  139. {
  140. Owned<IEspQuerySetAliasMap> alias = createQuerySetAliasMap();
  141. const char * qname;
  142. const char * id;
  143. id = aliasiter->query().queryProp("@id");
  144. qname = aliasiter->query().queryProp("@name");
  145. alias->setId(id);
  146. alias->setName(qname);
  147. aliases.append(*alias.getLink());
  148. }
  149. pqset->setQuerySetAliases(aliases);
  150. }
  151. resp.setQuerySets(pquerysets);
  152. }
  153. bool includeTargetClusters = req.getIncludeTargetClusters();
  154. if (includeTargetClusters)
  155. {
  156. try
  157. {
  158. CTpWrapper topologyWrapper;
  159. IArrayOf<IEspTpLogicalCluster> clusters;
  160. topologyWrapper.getTargetClusterList(clusters, req.getClusterType(), NULL);
  161. StringArray dfuclusters;
  162. ForEachItemIn(k, clusters)
  163. {
  164. IEspTpLogicalCluster& cluster = clusters.item(k);
  165. dfuclusters.append(cluster.getName());
  166. }
  167. resp.setClusterNames(dfuclusters);
  168. }
  169. catch(IException* e)
  170. {
  171. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  172. }
  173. }
  174. return success;
  175. }
  176. bool CwssqlEx::onGetDBSystemInfo(IEspContext &context, IEspGetDBSystemInfoRequest &req, IEspGetDBSystemInfoResponse &resp)
  177. {
  178. bool success = false;
  179. resp.setName("HPCC Systems");
  180. if (!context.validateFeatureAccess(WSSQLACCESS, SecAccess_Access, false))
  181. throw MakeStringException(-1, "Failed to fetch HPCC information. Permission denied.");
  182. try
  183. {
  184. const char* build_ver = getBuildVersion();
  185. if (build_ver && *build_ver)
  186. {
  187. StringBuffer project;
  188. StringBuffer major;
  189. StringBuffer minor;
  190. StringBuffer point;
  191. StringBuffer maturity;
  192. //community_4.1.0-trunk1-Debug[heads/wssql-0-gb9e351-dirty
  193. const char * tail = build_ver;
  194. while (tail && *tail != '_')
  195. project.append(*tail++);
  196. tail++;
  197. while (tail && *tail != '.')
  198. major.append(*tail++);
  199. resp.setMajor(major.str());
  200. tail++;
  201. while (tail && *tail != '.')
  202. minor.append(*tail++);
  203. resp.setMinor(minor.str());
  204. tail++;
  205. while (tail && *tail != '-')
  206. point.append(*tail++);
  207. resp.setPoint(point.str());
  208. if (req.getIncludeAll())
  209. {
  210. resp.setFullVersion(build_ver);
  211. resp.setProject(project.str());
  212. tail++;
  213. while (tail && *tail != '-' && *tail != '[')
  214. maturity.append(*tail++);
  215. resp.setMaturity(maturity.str());
  216. }
  217. }
  218. const char* wssqlbuild_ver = getWsSqlBuildVersion();
  219. if (wssqlbuild_ver && *wssqlbuild_ver)
  220. {
  221. StringBuffer major;
  222. StringBuffer minor;
  223. StringBuffer point;
  224. StringBuffer maturity;
  225. //5.4.0-trunk1-Debug[heads/wssql-0-gb9e351-dirty
  226. const char * tail = wssqlbuild_ver;
  227. while (tail && *tail != '.')
  228. major.append(*tail++);
  229. resp.setWsSQLMajor(major.str());
  230. tail++;
  231. while (tail && *tail != '.')
  232. minor.append(*tail++);
  233. resp.setWsSQLMinor(minor.str());
  234. tail++;
  235. while (tail && *tail != '-')
  236. point.append(*tail++);
  237. resp.setWsSQLPoint(point.str());
  238. if (req.getIncludeAll())
  239. {
  240. resp.setWsSQLFullVersion(wssqlbuild_ver);
  241. tail++;
  242. while (tail && *tail != '-' && *tail != '[')
  243. maturity.append(*tail++);
  244. resp.setWsSQLMaturity(maturity.str());
  245. }
  246. success = true;
  247. }
  248. }
  249. catch (...)
  250. {
  251. ERRLOG("Error Parsing HPCC and/or WsSQL Version string.");
  252. }
  253. return success;
  254. }
  255. void printTree(pANTLR3_BASE_TREE t, int indent)
  256. {
  257. pANTLR3_BASE_TREE child = NULL;
  258. int children = 0;
  259. char * tokenText = NULL;
  260. string ind = "";
  261. int i = 0;
  262. if ( t != NULL )
  263. {
  264. children = t->getChildCount(t);
  265. for ( i = 0; i < indent; i++ )
  266. ind += " ";
  267. for ( i = 0; i < children; i++ )
  268. {
  269. pANTLR3_BASE_TREE child = (pANTLR3_BASE_TREE)(t->getChild(t, i));
  270. ANTLR3_UINT32 tokenType = child->getType(child);
  271. tokenText = (char *)child->toString(child)->chars;
  272. fprintf(stderr, "%s%s\n", ind.c_str(), tokenText);
  273. if (tokenType == ANTLR3_TOKEN_EOF)
  274. break;
  275. printTree(child, indent+1);
  276. }
  277. }
  278. }
  279. void myDisplayRecognitionError (pANTLR3_BASE_RECOGNIZER recognizer,pANTLR3_UINT8 * tokenNames)
  280. {
  281. StringBuffer errorMessage;
  282. pANTLR3_PARSER parser = NULL;
  283. pANTLR3_TREE_PARSER tparser = NULL;
  284. pANTLR3_INT_STREAM is;
  285. pANTLR3_STRING ttext;
  286. pANTLR3_EXCEPTION ex;
  287. pANTLR3_COMMON_TOKEN theToken;
  288. pANTLR3_BASE_TREE theBaseTree;
  289. pANTLR3_COMMON_TREE theCommonTree;
  290. ex = recognizer->state->exception;
  291. ttext = nullptr;
  292. errorMessage.append("Error while parsing");
  293. if (ex)
  294. {
  295. errorMessage.appendf(": ANTLR Error %d : %s", ex->type, (pANTLR3_UINT8)(ex->message));
  296. switch (recognizer->type)
  297. {
  298. case ANTLR3_TYPE_PARSER:
  299. {
  300. parser = (pANTLR3_PARSER) (recognizer->super);
  301. is = parser->tstream->istream;
  302. theToken = (pANTLR3_COMMON_TOKEN)(ex->token);
  303. if (theToken)
  304. {
  305. ttext = theToken->toString(theToken);
  306. if (theToken->type == ANTLR3_TOKEN_EOF)
  307. errorMessage.append(", at <EOF>");
  308. else
  309. errorMessage.appendf("\n Near %s\n ", ttext == nullptr ? (pANTLR3_UINT8)"<no text for the token>" : ttext->chars);
  310. }
  311. break;
  312. }
  313. case ANTLR3_TYPE_TREE_PARSER:
  314. {
  315. tparser = (pANTLR3_TREE_PARSER) (recognizer->super);
  316. is = tparser->ctnstream->tnstream->istream;
  317. theBaseTree = (pANTLR3_BASE_TREE)(ex->token);
  318. if (theBaseTree)
  319. {
  320. ttext = theBaseTree->toStringTree(theBaseTree);
  321. theCommonTree = (pANTLR3_COMMON_TREE) theBaseTree->super;
  322. if (theCommonTree != nullptr)
  323. theToken = (pANTLR3_COMMON_TOKEN) theBaseTree->getToken(theBaseTree);
  324. errorMessage.appendf( ", at offset %d", theBaseTree->getCharPositionInLine(theBaseTree));
  325. errorMessage.appendf( ", near %s", ttext->chars);
  326. }
  327. break;
  328. }
  329. default:
  330. //errorMessage.appendf("Base recognizer function displayRecognitionError called by unknown parser type - provide override for this function\n");
  331. return;
  332. break;
  333. }
  334. switch (ex->type)
  335. {
  336. case ANTLR3_UNWANTED_TOKEN_EXCEPTION:
  337. {
  338. // Indicates that the recognizer was fed a token which seesm to be
  339. // spurious input. We can detect this when the token that follows
  340. // this unwanted token would normally be part of the syntactically
  341. // correct stream. Then we can see that the token we are looking at
  342. // is just something that should not be there and throw this exception.
  343. //
  344. if (tokenNames == nullptr)
  345. {
  346. errorMessage.appendf( " : Extraneous input...");
  347. }
  348. else
  349. {
  350. if (ex->expecting == ANTLR3_TOKEN_EOF)
  351. errorMessage.appendf(" : Extraneous input - expected <EOF>\n");
  352. else
  353. errorMessage.appendf(" : Extraneous input - expected %s ...\n", tokenNames[ex->expecting]);
  354. }
  355. break;
  356. }
  357. case ANTLR3_MISSING_TOKEN_EXCEPTION:
  358. {
  359. // Indicates that the recognizer detected that the token we just
  360. // hit would be valid syntactically if preceeded by a particular
  361. // token. Perhaps a missing ';' at line end or a missing ',' in an
  362. // expression list, and such like.
  363. //
  364. if (tokenNames == nullptr)
  365. {
  366. errorMessage.appendf( " : Missing token (%d)...\n", ex->expecting);
  367. }
  368. else
  369. {
  370. if (ex->expecting == ANTLR3_TOKEN_EOF)
  371. errorMessage.appendf( " : Missing <EOF>\n");
  372. else
  373. errorMessage.appendf( " : Missing %s \n", tokenNames[ex->expecting]);
  374. }
  375. break;
  376. }
  377. case ANTLR3_RECOGNITION_EXCEPTION:
  378. {
  379. // Indicates that the recognizer received a token
  380. // in the input that was not predicted. This is the basic exception type
  381. // from which all others are derived. So we assume it was a syntax error.
  382. // You may get this if there are not more tokens and more are needed
  383. // to complete a parse for instance.
  384. //
  385. errorMessage.appendf( " : syntax error...\n");
  386. break;
  387. }
  388. case ANTLR3_MISMATCHED_TOKEN_EXCEPTION:
  389. {
  390. // We were expecting to see one thing and got another. This is the
  391. // most common error if we coudl not detect a missing or unwanted token.
  392. // Here you can spend your efforts to
  393. // derive more useful error messages based on the expected
  394. // token set and the last token and so on. The error following
  395. // bitmaps do a good job of reducing the set that we were looking
  396. // for down to something small. Knowing what you are parsing may be
  397. // able to allow you to be even more specific about an error.
  398. //
  399. if (tokenNames == NULL)
  400. {
  401. errorMessage.appendf(" : syntax error...\n");
  402. }
  403. else
  404. {
  405. if (ex->expecting == ANTLR3_TOKEN_EOF)
  406. errorMessage.appendf(" : expected <EOF>\n");
  407. else
  408. errorMessage.appendf(" : expected %s ...\n", tokenNames[ex->expecting]);
  409. }
  410. break;
  411. }
  412. case ANTLR3_NO_VIABLE_ALT_EXCEPTION:
  413. {
  414. // We could not pick any alt decision from the input given
  415. // so god knows what happened - however when you examine your grammar,
  416. // you should. It means that at the point where the current token occurred
  417. // that the DFA indicates nowhere to go from here.
  418. //
  419. errorMessage.appendf(" : cannot match to any predicted input...\n");
  420. break;
  421. }
  422. case ANTLR3_MISMATCHED_SET_EXCEPTION:
  423. {
  424. ANTLR3_UINT32 count;
  425. ANTLR3_UINT32 bit;
  426. ANTLR3_UINT32 size;
  427. ANTLR3_UINT32 numbits;
  428. pANTLR3_BITSET errBits;
  429. // This means we were able to deal with one of a set of
  430. // possible tokens at this point, but we did not see any
  431. // member of that set.
  432. errorMessage.appendf( " : unexpected input...\n expected one of : ");
  433. // What tokens could we have accepted at this point in the parse?
  434. count = 0;
  435. errBits = antlr3BitsetLoad (ex->expectingSet);
  436. numbits = errBits->numBits (errBits);
  437. size = errBits->size (errBits);
  438. if (size > 0)
  439. {
  440. // However many tokens we could have dealt with here, it is usually
  441. // not useful to print ALL of the set here. I arbitrarily chose 8
  442. // here, but you should do whatever makes sense for you of course.
  443. // No token number 0, so look for bit 1 and on.
  444. for (bit = 1; bit < numbits && count < 8 && count < size; bit++)
  445. {
  446. if (tokenNames[bit])
  447. {
  448. errorMessage.appendf( "%s%s", count > 0 ? ", " : "", tokenNames[bit]);
  449. count++;
  450. }
  451. }
  452. errorMessage.appendf( "\n");
  453. }
  454. else
  455. {
  456. errorMessage.appendf( "Unknown parsing error.\n");
  457. }
  458. break;
  459. }
  460. case ANTLR3_EARLY_EXIT_EXCEPTION:
  461. {
  462. // We entered a loop requiring a number of token sequences
  463. // but found a token that ended that sequence earlier than
  464. // we should have done.
  465. errorMessage.appendf( " : missing elements...\n");
  466. break;
  467. }
  468. default:
  469. {
  470. // We don't handle any other exceptions here, but you can
  471. // if you wish. If we get an exception that hits this point
  472. // then we are just going to report what we know about the
  473. // token.
  474. //
  475. errorMessage.appendf( " : unrecognized syntax...\n");
  476. break;
  477. }
  478. }
  479. }
  480. throw MakeStringException(-1, "%s", errorMessage.str());
  481. }
  482. HPCCSQLTreeWalker * CwssqlEx::parseSQL(IEspContext &context, StringBuffer & sqltext, bool attemptParameterization)
  483. {
  484. int limit = -1;
  485. pHPCCSQLLexer hpccSqlLexer = NULL;
  486. pANTLR3_COMMON_TOKEN_STREAM sqlTokens = NULL;
  487. pHPCCSQLParser hpccSqlParser = NULL;
  488. pANTLR3_BASE_TREE sqlAST = NULL;
  489. pANTLR3_INPUT_STREAM sqlInputStream = NULL;
  490. Owned<HPCCSQLTreeWalker> hpccSqlTreeWalker;
  491. try
  492. {
  493. if (sqltext.length() <= 0)
  494. throw MakeStringException(-1, "Empty SQL String detected.");
  495. pANTLR3_UINT8 input_string = (pANTLR3_UINT8)sqltext.str();
  496. pANTLR3_INPUT_STREAM sqlinputstream = antlr3StringStreamNew(input_string,
  497. ANTLR3_ENC_8BIT,
  498. sqltext.length(),
  499. (pANTLR3_UINT8)"SQL INPUT");
  500. pHPCCSQLLexer hpccsqllexer = HPCCSQLLexerNew(sqlinputstream);
  501. //hpccSqlLexer->pLexer->rec->displayRecognitionError = myDisplayRecognitionError;
  502. //ANTLR3_UINT32 lexerrors = hpccsqllexer->pLexer->rec->getNumberOfSyntaxErrors(hpccsqllexer->pLexer->rec);
  503. //if (lexerrors > 0)
  504. // throw MakeStringException(-1, "HPCCSQL Lexer reported %d error(s), request aborted.", lexerrors);
  505. pANTLR3_COMMON_TOKEN_STREAM sqltokens = antlr3CommonTokenStreamSourceNew(ANTLR3_SIZE_HINT, TOKENSOURCE(hpccsqllexer));
  506. if (sqltokens == NULL)
  507. {
  508. throw MakeStringException(-1, "Out of memory trying to allocate ANTLR HPCCSQLParser token stream.");
  509. }
  510. pHPCCSQLParser hpccsqlparser = HPCCSQLParserNew(sqltokens);
  511. //#if not defined(_DEBUG)
  512. hpccsqlparser->pParser->rec->displayRecognitionError = myDisplayRecognitionError;
  513. //#endif
  514. pANTLR3_BASE_TREE sqlAST = (hpccsqlparser->root_statement(hpccsqlparser)).tree;
  515. ANTLR3_UINT32 parserrors = hpccsqlparser->pParser->rec->getNumberOfSyntaxErrors(hpccsqlparser->pParser->rec);
  516. if (parserrors > 0)
  517. throw MakeStringException(-1, "HPCCSQL Parser reported %d error(s), request aborted.", parserrors);
  518. #if defined(_DEBUG)
  519. printTree(sqlAST, 0);
  520. #endif
  521. hpccSqlTreeWalker.setown(new HPCCSQLTreeWalker(sqlAST, context, attemptParameterization));
  522. hpccsqlparser->free(hpccsqlparser);
  523. sqltokens->free(sqltokens);
  524. hpccsqllexer->free(hpccsqllexer);
  525. sqlinputstream->free(sqlinputstream);
  526. }
  527. catch(IException* e)
  528. {
  529. try
  530. {
  531. if (hpccSqlParser)
  532. hpccSqlParser->free(hpccSqlParser);
  533. if (sqlTokens)
  534. sqlTokens->free(sqlTokens);
  535. if (hpccSqlLexer)
  536. hpccSqlLexer->free(hpccSqlLexer);
  537. if (sqlInputStream)
  538. sqlInputStream->free(sqlInputStream);
  539. hpccSqlTreeWalker.clear();
  540. }
  541. catch (...)
  542. {
  543. ERRLOG("!!! Unable to free HPCCSQL parser/lexer objects.");
  544. }
  545. //All IExceptions get bubbled up
  546. throw e;
  547. }
  548. catch(...)
  549. {
  550. try
  551. {
  552. if (hpccSqlParser)
  553. hpccSqlParser->free(hpccSqlParser);
  554. if (sqlTokens)
  555. sqlTokens->free(sqlTokens);
  556. if (hpccSqlLexer)
  557. hpccSqlLexer->free(hpccSqlLexer);
  558. if (sqlInputStream)
  559. sqlInputStream->free(sqlInputStream);
  560. hpccSqlTreeWalker.clear();
  561. }
  562. catch (...)
  563. {
  564. ERRLOG("!!! Unable to free HPCCSQL parser/lexer objects.");
  565. }
  566. //All other unexpected exceptions are reported as generic ecl generation error.
  567. throw MakeStringException(-1, "Error generating ECL code.");
  568. }
  569. return hpccSqlTreeWalker.getLink();
  570. }
  571. bool CwssqlEx::getWUResult(IEspContext &context, const char * wuid, StringBuffer &result, unsigned start, unsigned count, int sequence, const char * dsname, const char * schemaname)
  572. {
  573. context.addTraceSummaryTimeStamp(LogMin, "StrtgetReslts");
  574. if (wuid && *wuid)
  575. {
  576. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  577. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid, false);
  578. if (!cw)
  579. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", wuid);
  580. SCMStringBuffer stateDesc;
  581. switch (cw->getState())
  582. {
  583. case WUStateCompleted:
  584. case WUStateFailed:
  585. case WUStateUnknown:
  586. case WUStateCompiled:
  587. {
  588. StringBufferAdaptor resultXML(result);
  589. Owned<IResultSetFactory> factory = getResultSetFactory(context.queryUserId(), context.queryPassword());
  590. Owned<INewResultSet> nr = factory->createNewResultSet(wuid, sequence, NULL);
  591. if (nr.get())
  592. {
  593. context.addTraceSummaryTimeStamp(LogMax, "strtgetXMLRslts");
  594. getResultXml(resultXML, nr.get(), dsname, start, count, schemaname);
  595. context.addTraceSummaryTimeStamp(LogMax, "endgetXMLRslts");
  596. }
  597. else
  598. return false;
  599. break;
  600. }
  601. default:
  602. break;
  603. }
  604. context.addTraceSummaryTimeStamp(LogMin, "ExitgetRslts");
  605. return true;
  606. }
  607. context.addTraceSummaryTimeStamp(LogMin, "ExitgetRslts");
  608. return false;
  609. }
  610. bool CwssqlEx::onSetRelatedIndexes(IEspContext &context, IEspSetRelatedIndexesRequest &req, IEspSetRelatedIndexesResponse &resp)
  611. {
  612. if (!context.validateFeatureAccess(WSSQLACCESS, SecAccess_Write, false))
  613. throw MakeStringException(-1, "WsSQL::SetRelatedIndexes failed to execute SQL. Permission denied.");
  614. StringBuffer username;
  615. context.getUserID(username);
  616. const char* passwd = context.queryPassword();
  617. IArrayOf<IConstRelatedIndexSet>& relatedindexSets = req.getRelatedIndexSets();
  618. if (relatedindexSets.length() == 0)
  619. throw MakeStringException(-1, "WsSQL::SetRelatedIndexes empty request detected.");
  620. ForEachItemIn(relatedindexsetindex, relatedindexSets)
  621. {
  622. IConstRelatedIndexSet &relatedIndexSet = relatedindexSets.item(relatedindexsetindex);
  623. const char * fileName = relatedIndexSet.getFileName();
  624. if (!fileName || !*fileName)
  625. throw MakeStringException(-1, "WsSQL::SetRelatedIndexes error: Empty file name detected.");
  626. StringArray& indexHints = relatedIndexSet.getIndexes();
  627. int indexHintsCount = indexHints.length();
  628. if (indexHintsCount > 0)
  629. {
  630. Owned<HPCCFile> file = HPCCFileCache::fetchHpccFileByName(fileName,username.str(), passwd, false, false);
  631. if (!file)
  632. throw MakeStringException(-1, "WsSQL::SetRelatedIndexes error: could not find file: %s.", fileName);
  633. StringBuffer description;
  634. StringBuffer currentIndexes;
  635. description = file->getDescription();
  636. HPCCFile::parseOutRelatedIndexes(description, currentIndexes);
  637. description.append("\nXDBC:RelIndexes=[");
  638. for(int indexHintIndex = 0; indexHintIndex < indexHintsCount; indexHintIndex++)
  639. {
  640. description.appendf("%s%c", indexHints.item(indexHintIndex), (indexHintIndex < indexHintsCount-1 ? ';' : ' '));
  641. }
  642. description.append("]\n");
  643. HPCCFileCache::updateHpccFileDescription(fileName, username, passwd, description.str());
  644. file->setDescription(description.str());
  645. }
  646. }
  647. resp.setRelatedIndexSets(relatedindexSets);
  648. return true;
  649. }
  650. bool CwssqlEx::onGetRelatedIndexes(IEspContext &context, IEspGetRelatedIndexesRequest &req, IEspGetRelatedIndexesResponse &resp)
  651. {
  652. try
  653. {
  654. if (!context.validateFeatureAccess(WSSQLACCESS, SecAccess_Read, false))
  655. throw MakeStringException(-1, "Failed to execute SQL. Permission denied.");
  656. StringArray& filenames = req.getFileNames();
  657. if (filenames.length() == 0)
  658. throw MakeStringException(-1, "WsSQL::GetRelatedIndexes error: No filenames detected");
  659. StringBuffer username;
  660. context.getUserID(username);
  661. const char* passwd = context.queryPassword();
  662. IArrayOf<IEspRelatedIndexSet> relatedindexSets;
  663. ForEachItemIn(filenameindex, filenames)
  664. {
  665. const char * fileName = filenames.item(filenameindex);
  666. Owned<HPCCFile> file = HPCCFileCache::fetchHpccFileByName(fileName,username.str(), passwd, false, false);
  667. if (file)
  668. {
  669. StringArray indexHints;
  670. file->getRelatedIndexes(indexHints);
  671. Owned<IEspRelatedIndexSet> relatedIndexSet = createRelatedIndexSet("", "");
  672. relatedIndexSet->setFileName(fileName);
  673. relatedIndexSet->setIndexes(indexHints);
  674. relatedindexSets.append(*relatedIndexSet.getLink());
  675. }
  676. }
  677. resp.setRelatedIndexSets(relatedindexSets);
  678. }
  679. catch(IException* e)
  680. {
  681. FORWARDEXCEPTION(context, e, -1);
  682. }
  683. return true;
  684. }
  685. void CwssqlEx::processMultipleClusterOption(StringArray & clusters, const char * targetcluster, StringBuffer & hashoptions)
  686. {
  687. int clusterscount = clusters.length();
  688. if (clusterscount > 0)
  689. {
  690. hashoptions.appendf("\n#OPTION('AllowedClusters', '%s", targetcluster);
  691. ForEachItemIn(i,clusters)
  692. {
  693. if (!isValidCluster(clusters.item(i)))
  694. throw MakeStringException(-1, "Invalid alternate cluster name: %s", clusters.item(i));
  695. hashoptions.appendf(",%s", clusters.item(i));
  696. }
  697. hashoptions.append("');\n#OPTION('AllowAutoQueueSwitch', TRUE);\n\n");
  698. }
  699. }
  // Handles the ExecuteSQL request.
  //
  // Visible flow:
  //   1. Check write access to WSSQLACCESS and read the request options.
  //   2. Parse the SQL text and build a "normalized SQL" string, extended with the target
  //      cluster, user, result limit, WU owner, hash options and (for CALL) the published
  //      WUID. That string is the key used with the query cache helpers.
  //   3. Obtain a compiled workunit: the published query for CALL statements, a cached
  //      workunit, or a new workunit holding generated ECL that is submitted for compile.
  //   4. If the compiled workunit has errors, return its info and exceptions. Otherwise
  //      either clone-and-execute it (clonable) or submit it directly, optionally wait,
  //      and fill the response with the result window and workunit info.
  //
  // IException errors are passed to FORWARDEXCEPTION; otherwise returns true.
  700. bool CwssqlEx::onExecuteSQL(IEspContext &context, IEspExecuteSQLRequest &req, IEspExecuteSQLResponse &resp)
  701. {
  702. try
  703. {
  704. context.addTraceSummaryTimeStamp(LogMin, "StrtOnExecuteSQL");
  705. if (!context.validateFeatureAccess(WSSQLACCESS, SecAccess_Write, false))
  706. throw MakeStringException(-1, "Failed to execute SQL. Permission denied.");
  707. double version = context.getClientVersion();
  708. StringBuffer sqltext;
  709. StringBuffer ecltext;
  710. StringBuffer username;
  711. context.getUserID(username);
  712. const char* passwd = context.queryPassword();
  713. sqltext.set(req.getSqlText());
  714. if (sqltext.length() <= 0)
  715. throw MakeStringException(1,"Empty SQL request.");
  716. const char * cluster = req.getTargetCluster();
  717. StringBuffer hashoptions;
  // Alternate clusters are only read for client versions above 3.03.
  718. if (version > 3.03)
  719. {
  720. StringArray & alternates = req.getAlternateClusters();
  721. if (alternates.length() > 0)
  722. processMultipleClusterOption(alternates, cluster, hashoptions);
  723. }
  724. SCMStringBuffer compiledwuid;
  725. int resultLimit = req.getResultLimit();
  726. __int64 resultWindowStart = req.getResultWindowStart();
  727. __int64 resultWindowCount = req.getResultWindowCount();
  728. if (resultWindowStart < 0 || resultWindowCount <0 )
  729. throw MakeStringException(-1,"Invalid result window value");
  730. bool clonable = false;
  // The IgnoreCache request flag is only read for client versions above 3.04;
  // older clients are always cache eligible.
  731. bool cacheeligible = (version > 3.04 ) ? !req.getIgnoreCache() : true;
  732. Owned<HPCCSQLTreeWalker> parsedSQL;
  733. ESPLOG(LogNormal, "WsSQL: Parsing sql query...");
  734. parsedSQL.setown(parseSQL(context, sqltext));
  735. ESPLOG(LogNormal, "WsSQL: Finished parsing sql query...");
  736. SQLQueryType querytype = parsedSQL->getSqlType();
  // CALL statements run a published query: the query set comes from the SQL text or,
  // failing that, from the request, and the published workunit becomes the compiled WU.
  // NOTE(review): strlen() is applied directly to getQuerySetName()/getTargetQuerySet();
  // this assumes neither returns NULL -- confirm.
  737. if (querytype == SQLTypeCall)
  738. {
  739. if (strlen(parsedSQL->getQuerySetName())==0)
  740. {
  741. if (strlen(req.getTargetQuerySet())==0)
  742. throw MakeStringException(-1,"Missing Target QuerySet.");
  743. else
  744. parsedSQL->setQuerySetName(req.getTargetQuerySet());
  745. }
  746. ESPLOG(LogMax, "WsSQL: Processing call query...");
  747. WsEclWuInfo wsinfo("", parsedSQL->getQuerySetName(), parsedSQL->getStoredProcName(), username.str(), passwd);
  748. compiledwuid.set(wsinfo.ensureWuid());
  749. clonable = true;
  750. }
  751. else if (querytype == SQLTypeCreateAndLoad)
  752. {
  753. cacheeligible = false;
  754. }
  755. StringBuffer xmlparams;
  // Build the cache key: normalized SQL plus every option that affects the compiled WU.
  756. StringBuffer normalizedSQL = parsedSQL->getNormalizedSQL();
  757. normalizedSQL.append(" | --TC=").append(cluster);
  758. if (username.length() > 0)
  759. normalizedSQL.append("--USER=").append(username.str());
  760. if (resultLimit > 0)
  761. normalizedSQL.append("--HARDLIMIT=").append(resultLimit);
  762. const char * wuusername = req.getUserName();
  763. if (wuusername && *wuusername)
  764. normalizedSQL.append("--WUOWN=").append(wuusername);
  765. if (hashoptions.length()>0)
  766. normalizedSQL.append("--HO=").append(hashoptions.str());
  767. if (compiledwuid.length() != 0)
  768. normalizedSQL.append("--PWUID=").append(compiledwuid.str());
  769. ESPLOG(LogMax, "WsSQL: getWorkUnitFactory...");
  770. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  771. ESPLOG(LogMax, "WsSQL: checking query cache...");
  // A cache hit overwrites compiledwuid; if that workunit can no longer be opened the
  // entry is evicted and compiledwuid is cleared so a new workunit is generated below.
  772. if(cacheeligible && getCachedQuery(normalizedSQL.str(), compiledwuid.s))
  773. {
  774. ESPLOG(LogMax, "WsSQL: cache hit opening wuid %s...", compiledwuid.str());
  775. Owned<IConstWorkUnit> cw = factory->openWorkUnit(compiledwuid.str(), false);
  776. if (!cw)//cache hit but unavailable WU
  777. {
  778. ESPLOG(LogMax, "WsSQL: cache hit but unavailable WU...");
  779. removeQueryFromCache(normalizedSQL.str());
  780. compiledwuid.clear();
  781. }
  782. else
  783. clonable = true;
  784. }
  // No compiled workunit yet: generate ECL, create a workunit for it and submit it
  // with WUActionCompile, then wait for the compile.
  785. if (compiledwuid.length()==0)
  786. {
  787. {
  788. if (isEmpty(cluster))
  789. throw MakeStringException(-1,"Target cluster not set.");
  790. if (!isValidCluster(cluster))
  791. throw MakeStringException(-1, "Invalid cluster name: %s", cluster);
  792. if (querytype == SQLTypeCreateAndLoad)
  793. clonable = false;
  794. context.addTraceSummaryTimeStamp(LogNormal, "StartECLGenerate");
  795. ECLEngine::generateECL(parsedSQL, ecltext);
  // Hash options go in front of the generated ECL.
  796. if (hashoptions.length() > 0)
  797. ecltext.insert(0, hashoptions.str());
  798. context.addTraceSummaryTimeStamp(LogNormal, "EndECLGenerate");
  799. if (isEmpty(ecltext))
  800. throw MakeStringException(1,"Could not generate ECL from SQL.");
  // Embed the original and normalized SQL in the ECL via the EMBEDDEDSQLQUERYCOMMENT format.
  801. ecltext.appendf(EMBEDDEDSQLQUERYCOMMENT, sqltext.str(), normalizedSQL.str());
  802. #if defined _DEBUG
  803. fprintf(stderr, "GENERATED ECL:\n%s\n", ecltext.str());
  804. #endif
  805. ESPLOG(LogMax, "WsSQL: creating new WU...");
  806. NewWsWorkunit wu(context);
  807. compiledwuid.set(wu->queryWuid());
  808. wu->setJobName("WsSQL Job");
  809. wu.setQueryText(ecltext.str());
  810. wu->setClusterName(cluster);
  811. if (clonable)
  812. wu->setCloneable(true);
  813. wu->setAction(WUActionCompile);
  814. if (resultLimit)
  815. wu->setResultLimit(resultLimit);
  816. if (wuusername && *wuusername)
  817. wu->setUser(wuusername);
  818. wu->commit();
  819. wu.clear();
  820. context.addTraceSummaryTimeStamp(LogNormal, "strtWUCompile");
  821. WsWuHelpers::submitWsWorkunit(context, compiledwuid.str(), cluster, NULL, 0, true, false, false, NULL, NULL, NULL);
  822. waitForWorkUnitToCompile(compiledwuid.str(), req.getWait());
  823. context.addTraceSummaryTimeStamp(LogNormal, "endWUCompile");
  824. }
  825. }
  826. ESPLOG(LogMax, "WsSQL: opening WU...");
  827. Owned<IConstWorkUnit> cw = factory->openWorkUnit(compiledwuid.str(), false);
  828. if (!cw)
  829. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", compiledwuid.str());
  // A compiled workunit with errors is reported back without being executed.
  830. WsWUExceptions errors(*cw);
  831. if (errors.ErrCount()>0)
  832. {
  833. WsWuInfo winfo(context, compiledwuid.str());
  834. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  835. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  836. }
  837. else
  838. {
  // Build the XML parameters passed to the clone: CALL statements map their embedded
  // parameters onto the workunit's variables, SELECT statements use the parsed
  // parameter list.
  839. if (querytype == SQLTypeCall)
  840. createWUXMLParams(xmlparams, parsedSQL, NULL, cw);
  841. else if (querytype == SQLTypeSelect)
  842. {
  843. if (notEmpty(cluster) && !isValidCluster(cluster))
  844. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", cluster);
  845. createWUXMLParams(xmlparams, parsedSQL->getParamList());
  846. }
  847. StringBuffer runningwuid;
  // Clonable: execute a clone and cache the compiled (parent) workunit id.
  // Otherwise: submit the compiled workunit itself and cache its own id.
  848. if (clonable)
  849. {
  850. context.addTraceSummaryTimeStamp(LogNormal, "StartWUCloneExe");
  851. cloneAndExecuteWU(context, compiledwuid.str(), runningwuid, xmlparams.str(), NULL, NULL, cluster);
  852. context.addTraceSummaryTimeStamp(LogNormal, "EndWUCloneExe");
  853. if(cacheeligible && !isQueryCached(normalizedSQL.str()))
  854. addQueryToCache(normalizedSQL.str(), compiledwuid.str());
  855. }
  856. else
  857. {
  858. context.addTraceSummaryTimeStamp(LogNormal, "StartWUSubmit");
  859. WsWuHelpers::submitWsWorkunit(context, compiledwuid.str(), cluster, NULL, 0, false, true, true, NULL, NULL, NULL);
  860. context.addTraceSummaryTimeStamp(LogNormal, "EndWUSubmit");
  861. runningwuid.set(compiledwuid.str());
  862. if (cacheeligible)
  863. addQueryToCache(normalizedSQL.str(), runningwuid.str());
  864. }
  // A wait value of 0 skips waiting for completion.
  865. int timeToWait = req.getWait();
  866. if (timeToWait != 0)
  867. {
  868. context.addTraceSummaryTimeStamp(LogNormal, "StartWUProcessWait");
  869. waitForWorkUnitToComplete(runningwuid.str(), timeToWait);
  870. context.addTraceSummaryTimeStamp(LogNormal, "EndWUProcessWait");
  871. }
  // The parent WUID is only reported when a clone (a different WUID) was executed.
  872. if (strcmp(runningwuid.str(), compiledwuid.str())!=0)
  873. resp.setParentWuId(compiledwuid.str());
  874. resp.setResultLimit(resultLimit);
  875. resp.setResultWindowCount( (unsigned)resultWindowCount);
  876. resp.setResultWindowStart( (unsigned)resultWindowStart);
  877. if (!req.getSuppressResults())
  878. {
  879. StringBuffer result;
  // Fetch the requested window of WSSQLRESULT, then append the WSSQLCOUNT result to
  // the same buffer when it is available.
  880. if (getWUResult(context, runningwuid.str(), result, (unsigned)resultWindowStart, (unsigned)resultWindowCount, 0, WSSQLRESULT, req.getSuppressXmlSchema() ? NULL : WSSQLRESULTSCHEMA))
  881. {
  882. StringBuffer count;
  883. if (getWUResult( context, runningwuid.str(), count , 0, 1, 1, WSSQLCOUNT, NULL))
  884. result.append(count.str());
  885. resp.setResult(result.str());
  886. }
  887. }
  888. WsWuInfo winfo(context, runningwuid);
  889. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  890. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  891. }
  892. AuditSystemAccess(context.queryUserId(), true, "Updated %s", compiledwuid.str());
  893. }
  894. catch(IException* e)
  895. {
  896. FORWARDEXCEPTION(context, e, -1);
  897. }
  898. //catch (...)
  899. //{
  900. // me->append(*MakeStringException(0,"Unknown exception submitting %s",wuid.str()));
  901. //}
  902. context.addTraceSummaryTimeStamp(LogMin, "EndOnExecuteSQL");
  903. return true;
  904. }
  905. void CwssqlEx::createWUXMLParams(StringBuffer & xmlparams, const IArrayOf <ISQLExpression> * parameterlist)
  906. {
  907. xmlparams.append("<root>");
  908. for (int expindex = 0; expindex < parameterlist->length(); expindex++)
  909. {
  910. ISQLExpression * exp = &parameterlist->item(expindex);
  911. if (exp->getExpType() == Value_ExpressionType)
  912. {
  913. SQLValueExpression * currentvalplaceholder = static_cast<SQLValueExpression *>(exp);
  914. currentvalplaceholder->trimTextQuotes();
  915. xmlparams.appendf("<%s>", currentvalplaceholder->getPlaceHolderName());
  916. encodeXML(currentvalplaceholder->getValue(), xmlparams);
  917. xmlparams.appendf("</%s>", currentvalplaceholder->getPlaceHolderName());
  918. }
  919. else
  920. ESPLOG(LogNormal, "WsSQL: attempted to create XML params from unexpected expression type.");
  921. }
  922. xmlparams.append("</root>");
  923. DBGLOG("XML PARAMS: %s", xmlparams.str());
  924. }
  925. //Integrates all "variables" into "param" based xml
  926. void CwssqlEx::createWUXMLParams(StringBuffer & xmlparams, HPCCSQLTreeWalker* parsedSQL, IArrayOf<IConstNamedValue> *variables, IConstWorkUnit * cw)
  927. {
  928. IArrayOf<IConstWUResult> expectedparams;
  929. if (cw)
  930. {
  931. IConstWUResultIterator &vars = cw->getVariables();
  932. ForEach(vars)
  933. {
  934. IConstWUResult &cur = vars.query();
  935. expectedparams.append(cur);
  936. }
  937. }
  938. if (expectedparams.length() > 0)
  939. {
  940. int totalvars = 0;
  941. if (variables)
  942. totalvars = variables->length();
  943. if (parsedSQL && parsedSQL->getSqlType() == SQLTypeCall)
  944. {
  945. IArrayOf<ISQLExpression> * embeddedparams = NULL;
  946. if (parsedSQL)
  947. embeddedparams = parsedSQL->getStoredProcParamList();
  948. int parametersidx = 0;
  949. int varsidx = 0;
  950. SCMStringBuffer varname;
  951. if (embeddedparams && embeddedparams->length()>0)
  952. {
  953. xmlparams.append("<root>");
  954. for(int i=0; i < embeddedparams->length() && i < expectedparams.length(); i++)
  955. {
  956. expectedparams.item(i).getResultName(varname);
  957. xmlparams.append("<").append(varname.s.str()).append(">");
  958. ISQLExpression* paramcol = &embeddedparams->item(i);
  959. if (paramcol->getExpType() == ParameterPlaceHolder_ExpressionType)
  960. {
  961. if (varsidx < totalvars)
  962. {
  963. IConstNamedValue &item = variables->item(varsidx++);
  964. const char *value = item.getValue();
  965. if(value && *value)
  966. encodeXML(value, xmlparams);
  967. // else ??
  968. }
  969. }
  970. else
  971. {
  972. paramcol->toString(xmlparams,false);
  973. }
  974. xmlparams.append("</").append(varname.s.str()).append(">");
  975. }
  976. xmlparams.append("</root>");
  977. }
  978. }
  979. else
  980. {
  981. int parametersidx = 0;
  982. int varsidx = 0;
  983. SCMStringBuffer varname;
  984. xmlparams.append("<root>");
  985. for(int i=0; i < expectedparams.length() && i < totalvars; i++)
  986. {
  987. expectedparams.item(i).getResultName(varname);
  988. xmlparams.append("<").append(varname.s.str()).append(">");
  989. IConstNamedValue &item = variables->item(i);
  990. char * value = ((char *)item.getValue());
  991. if (value && *value)
  992. {
  993. while(value && isspace(*value)) //fast trim left
  994. value++;
  995. int len = strlen(value);
  996. while(len && isspace(value[len-1]))
  997. len--;
  998. value[len] = '\0';//fast trim right, even if len didn't change
  999. if (len >= 2)
  1000. {
  1001. //WU cloning mechanism doesn't handle quoted strings very well...
  1002. //We're forced to blindly remove them here...
  1003. if (value[0] == '\'' && value[len-1] == '\'')
  1004. {
  1005. value[len-1] = '\0'; //clip rightmost quote
  1006. value++; //clip leftmost quote
  1007. }
  1008. }
  1009. if(len)
  1010. encodeXML(value, xmlparams);
  1011. // else ??
  1012. }
  1013. xmlparams.append("</").append(varname.s.str()).append(">");
  1014. }
  1015. xmlparams.append("</root>");
  1016. }
  1017. }
  1018. }
  1019. bool CwssqlEx::onExecutePreparedSQL(IEspContext &context, IEspExecutePreparedSQLRequest &req, IEspExecutePreparedSQLResponse &resp)
  1020. {
  1021. try
  1022. {
  1023. if (!context.validateFeatureAccess(WSSQLACCESS, SecAccess_Write, false))
  1024. throw MakeStringException(-1, "Failed to execute SQL. Permission denied.");
  1025. const char *cluster = req.getTargetCluster();
  1026. if (notEmpty(cluster) && !isValidCluster(cluster))
  1027. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", cluster);
  1028. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1029. StringBuffer runningWuId;
  1030. const char* parentWuId = req.getWuId();
  1031. Owned<IConstWorkUnit> cw = factory->openWorkUnit(parentWuId, false);
  1032. if (!cw)
  1033. throw MakeStringException(-1,"Cannot open workunit %s.", parentWuId);
  1034. WsWUExceptions errors(*cw);
  1035. if (errors.ErrCount()>0)
  1036. {
  1037. WsWuInfo winfo(context, parentWuId);
  1038. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  1039. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  1040. }
  1041. else
  1042. {
  1043. StringBuffer xmlparams;
  1044. createWUXMLParams(xmlparams, NULL, &req.getVariables(),cw);
  1045. if (parentWuId && *parentWuId)
  1046. {
  1047. cloneAndExecuteWU(context, parentWuId, runningWuId, xmlparams, NULL, NULL, cluster);
  1048. }
  1049. else
  1050. throw MakeStringException(ECLWATCH_MISSING_PARAMS,"Missing WuId");
  1051. int timeToWait = req.getWait();
  1052. if (timeToWait != 0)
  1053. waitForWorkUnitToComplete(runningWuId.str(), timeToWait);
  1054. Owned<IConstWorkUnit> cw = factory->openWorkUnit(runningWuId.str(), false);
  1055. if (!cw)
  1056. throw MakeStringException(-1,"Cannot open workunit %s.", runningWuId.str());
  1057. resp.setParentWuId(parentWuId);
  1058. __int64 resultWindowStart = req.getResultWindowStart();
  1059. __int64 resultWindowCount = req.getResultWindowCount();
  1060. if (resultWindowStart < 0 || resultWindowCount <0 )
  1061. throw MakeStringException(-1,"Invalid result window value");
  1062. if (!req.getSuppressResults())
  1063. {
  1064. StringBuffer result;
  1065. if (getWUResult(context, runningWuId.str(), result, (unsigned)resultWindowStart, (unsigned)resultWindowCount, 0, WSSQLRESULT, req.getSuppressXmlSchema() ? NULL : WSSQLRESULTSCHEMA))
  1066. resp.setResult(result.str());
  1067. }
  1068. WsWuInfo winfo(context, runningWuId);
  1069. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  1070. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  1071. winfo.getVariables(resp.updateWorkunit(), WUINFO_All);
  1072. }
  1073. }
  1074. catch(IException* e)
  1075. {
  1076. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1077. }
  1078. return true;
  1079. }
  1080. bool CwssqlEx::isQueryCached(const char * sqlQuery)
  1081. {
  1082. CriticalBlock block(critCache);
  1083. return (sqlQuery && cachedSQLQueries.find(sqlQuery) != cachedSQLQueries.end());
  1084. }
  1085. bool CwssqlEx::getCachedQuery(const char * sqlQuery, StringBuffer & wuid)
  1086. {
  1087. CriticalBlock block(critCache);
  1088. if(sqlQuery && cachedSQLQueries.find(sqlQuery) != cachedSQLQueries.end())
  1089. {
  1090. wuid.set(cachedSQLQueries.find(sqlQuery)->second.c_str());
  1091. return true;
  1092. }
  1093. return false;
  1094. }
  1095. void CwssqlEx::removeQueryFromCache(const char * sqlQuery)
  1096. {
  1097. CriticalBlock block(critCache);
  1098. cachedSQLQueries.erase(sqlQuery);
  1099. }
  1100. bool CwssqlEx::addQueryToCache(const char * sqlQuery, const char * wuid)
  1101. {
  1102. if (sqlQuery && *sqlQuery && wuid && *wuid)
  1103. {
  1104. CriticalBlock block(critCache);
  1105. if (isCacheExpired())
  1106. {
  1107. ESPLOG(LogNormal, "WsSQL: Query Cache has expired and is being flushed.");
  1108. //Flushing cache logic could have been in dedicated function, but
  1109. //putting it here makes this action more atomic, less synchronization concerns
  1110. cachedSQLQueries.clear();
  1111. setNewCacheFlushTime();
  1112. }
  1113. cachedSQLQueries.insert(std::pair<std::string,std::string>(sqlQuery, wuid));
  1114. }
  1115. return false;
  1116. }
  // Handles the PrepareSQL request: parses the SQL text and makes a compiled workunit
  // available for later execution.
  //
  // Visible flow:
  //   1. Check write access to WSSQLACCESS, read and parse the SQL text.
  //   2. Build the normalized SQL cache key (target cluster, user, hash options) and look
  //      it up in the query cache; an entry whose workunit cannot be opened is evicted.
  //   3. On a miss: CALL statements resolve the published query's workunit id; any other
  //      statement has ECL generated, stored in a new cloneable workunit, and submitted
  //      for compile. The workunit is cached only when `success` is true.
  //   4. The response receives the workunit's info, exceptions and result.
  //
  // IException errors are passed to FORWARDEXCEPTION; otherwise returns true.
  1117. bool CwssqlEx::onPrepareSQL(IEspContext &context, IEspPrepareSQLRequest &req, IEspPrepareSQLResponse &resp)
  1118. {
  1119. bool success = false;
  1120. StringBuffer sqltext;
  1121. StringBuffer ecltext;
  // NOTE(review): clonable is never read in this function.
  1122. bool clonable = false;
  1123. try
  1124. {
  1125. if (!context.validateFeatureAccess(WSSQLACCESS, SecAccess_Write, false))
  1126. throw MakeStringException(-1, "Failed to Prepare SQL. Permission denied.");
  1127. double version = context.getClientVersion();
  1128. StringBuffer username;
  1129. context.getUserID(username);
  1130. const char* passwd = context.queryPassword();
  1131. sqltext.set(req.getSqlText());
  1132. if (sqltext.length() <= 0)
  1133. throw MakeStringException(1,"Empty SQL request.");
  1134. Owned<HPCCSQLTreeWalker> parsedSQL;
  1135. parsedSQL.setown(parseSQL(context, sqltext, false));
  // CALL statements need a query set, taken from the SQL text or else from the request.
  // NOTE(review): strlen() is applied directly to getQuerySetName()/getTargetQuerySet();
  // this assumes neither returns NULL -- confirm.
  1136. if (parsedSQL->getSqlType() == SQLTypeCall)
  1137. {
  1138. if (strlen(parsedSQL->getQuerySetName())==0)
  1139. {
  1140. if (strlen(req.getTargetQuerySet())==0)
  1141. throw MakeStringException(-1,"Missing Target QuerySet.");
  1142. else
  1143. parsedSQL->setQuerySetName(req.getTargetQuerySet());
  1144. }
  1145. }
  1146. const char * cluster = req.getTargetCluster();
  1147. StringBuffer hashoptions;
  // Alternate clusters are only read for client versions above 3.03.
  1148. if (version > 3.03)
  1149. {
  1150. StringArray & alternates = req.getAlternateClusters();
  1151. if (alternates.length() > 0)
  1152. processMultipleClusterOption(alternates, cluster, hashoptions);
  1153. }
  // NOTE(review): this outer xmlparams is never used; the non-CALL branch below
  // declares its own.
  1154. StringBuffer xmlparams;
  // Cache key: normalized SQL plus target cluster, user and hash options.
  1155. StringBuffer normalizedSQL = parsedSQL->getNormalizedSQL();
  1156. normalizedSQL.append(" | --TC=").append(cluster);
  1157. if (username.length() > 0)
  1158. normalizedSQL.append("--USER=").append(username.str());
  1159. if (hashoptions.length()>0)
  1160. normalizedSQL.append("--HO=").append(hashoptions.str());
  1161. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1162. SCMStringBuffer wuid;
  1163. if(getCachedQuery(normalizedSQL.str(), wuid.s))
  1164. {
  1165. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  1166. if (!cw)//cache hit but unavailable WU
  1167. {
  1168. removeQueryFromCache(normalizedSQL.str());
  1169. wuid.clear();
  1170. }
  1171. }
  // Cache miss (or evicted entry): resolve or build the workunit.
  1172. if(wuid.length()==0)
  1173. {
  1174. if (parsedSQL->getSqlType() == SQLTypeCall)
  1175. {
  1176. WsEclWuInfo wsinfo("", parsedSQL->getQuerySetName(), parsedSQL->getStoredProcName(), username.str(), passwd);
  1177. wuid.set(wsinfo.ensureWuid());
  1178. //if call somePublishedQuery(1,2,3);
  1179. // or
  1180. // someotherPublishedQuery(1,?)
  1181. //must clone published query wuid and set params
  1182. //else just return published query WUID
  1183. if (parsedSQL->getStoredProcParamListCount() > 0 && !parsedSQL->isParameterizedCall() )
  1184. throw MakeStringException(-1, "Prepared Call query must be fully parameterized");
  1185. //KEEP THIS AROUND IF WE WANT TO SUPPORT CLONING WU with embedded params
  1186. // if (parsedSQL->isParameterizedCall())
  1187. // {
  1188. // createXMLParams(xmlparams, parsedSQL, NULL, wsinfo.wu);
  1189. //
  1190. // SCMStringBuffer newwuid;
  1191. // NewWsWorkunit wu(context);
  1192. // wu->getWuid(newwuid);
  1193. // if (xmlparams && *xmlparams)
  1194. // wu->setXmlParams(xmlparams);
  1195. //
  1196. // WsWuProcess::copyWsWorkunit(context, *wu, wuid.s.str());
  1197. //
  1198. // //StringBuffer params;
  1199. // //toXML(wu->getXmlParams(), params, true, true);
  1200. // wu->setCloneable(true);
  1201. // wu->setAction(WUActionCompile);
  1202. //
  1203. // wu->commit();
  1204. // wu.clear();
  1205. //
  1206. // WsWuProcess::submitWsWorkunit(context, newwuid.str(), req.getTargetCluster(), NULL, 0, true, false, false, xmlparams.str(), NULL, NULL);
  1207. // waitForWorkUnitToCompile(newwuid.str(), req.getWait());
  1208. //
  1209. // wuid.s.set(newwuid.str());
  1210. // }
  // NOTE(review): success stays false on this CALL path, so the published query's
  // workunit is never added to the cache below -- confirm this is intended.
  1211. }
  1212. else
  1213. {
  1214. if (isEmpty(cluster))
  1215. throw MakeStringException(1,"Target cluster not set.");
  1216. if (!isValidCluster(cluster))
  1217. throw MakeStringException(-1/*ECLWATCH_INVALID_CLUSTER_NAME*/, "Invalid cluster name: %s", cluster);
  1218. ECLEngine::generateECL(parsedSQL, ecltext);
  // Hash options go in front of the generated ECL.
  1219. if (hashoptions.length() > 0)
  1220. ecltext.insert(0, hashoptions.str());
  1221. #if defined _DEBUG
  1222. fprintf(stderr, "GENERATED ECL:\n%s\n", ecltext.str());
  1223. #endif
  1224. if (isEmpty(ecltext))
  1225. throw MakeStringException(1,"Could not generate ECL from SQL.");
  1226. //ecltext.appendf("\n\n/****************************************************\nOriginal SQL: \"%s\"\nNormalized SQL: \"%s\"\n****************************************************/\n", sqltext.str(), normalizedSQL.str());
  1227. ecltext.appendf(EMBEDDEDSQLQUERYCOMMENT, sqltext.str(), normalizedSQL.str());
  // Create a cloneable workunit holding the generated ECL and submit it for compile.
  1228. NewWsWorkunit wu(context);
  1229. wuid.set(wu->queryWuid());
  1230. wu->setClusterName(cluster);
  1231. wu->setCloneable(true);
  1232. wu->setAction(WUActionCompile);
  1233. wu.setQueryText(ecltext.str());
  1234. wu->setJobName("WsSQL PreparedQuery Job");
  // With a NULL workunit argument createWUXMLParams appends nothing, so this
  // xmlparams stays empty.
  1235. StringBuffer xmlparams;
  1236. createWUXMLParams(xmlparams, parsedSQL, NULL, NULL);
  1237. wu->commit();
  1238. wu.clear();
  1239. WsWuHelpers::submitWsWorkunit(context, wuid.str(), cluster, NULL, 0, true, false, false, xmlparams.str(), NULL, NULL);
  1240. success = waitForWorkUnitToCompile(wuid.str(), req.getWait());
  1241. }
  // Only a workunit whose compile wait reported success is cached.
  1242. if (success)
  1243. addQueryToCache(normalizedSQL.str(), wuid.s.str());
  1244. }
  1245. WsWuInfo winfo(context, wuid.str());
  1246. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  1247. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  1248. StringBuffer result;
  1249. getWUResult(context, wuid.str(), result, 0, 0, 0, WSSQLRESULT, WSSQLRESULTSCHEMA);
  1250. resp.setResult(result);
  1251. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str());
  1252. }
  1253. catch(IException* e)
  1254. {
  1255. FORWARDEXCEPTION(context, e, -1);
  1256. }
  1257. return true;
  1258. }
  1259. bool CwssqlEx::executePublishedQueryByName(IEspContext &context, const char * queryset, const char * queryname, StringBuffer &clonedwuid, const char *paramXml, IArrayOf<IConstNamedValue> *variables, const char * targetcluster, int start, int count)
  1260. {
  1261. bool success = true;
  1262. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1263. if (factory.get())
  1264. {
  1265. StringBuffer wuid;
  1266. StringBuffer username;
  1267. context.getUserID(username);
  1268. const char* passwd = context.queryPassword();
  1269. WsEclWuInfo wsinfo(wuid, queryset, queryname, username.str(), passwd);
  1270. success = executePublishedQueryByWuId(context, wsinfo.ensureWuid(), clonedwuid, paramXml, variables, targetcluster, start, count);
  1271. }
  1272. else
  1273. success = false;
  1274. return success;
  1275. }
  1276. bool CwssqlEx::executePublishedQueryByWuId(IEspContext &context, const char * targetwuid, StringBuffer &clonedwuid, const char *paramXml, IArrayOf<IConstNamedValue> *variables, const char * targetcluster, int start, int count)
  1277. {
  1278. bool success = true;
  1279. if (targetwuid && *targetwuid)
  1280. {
  1281. success = cloneAndExecuteWU(context, targetwuid, clonedwuid, paramXml, variables, NULL, targetcluster);
  1282. /*
  1283. if (waittime != 0)
  1284. waitForWorkUnitToComplete(clonedwui.str(), waittime);
  1285. Owned<IConstWorkUnit> cw = factory->openWorkUnit(clonedwui.str(), false);
  1286. if (!cw)
  1287. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", clonedwui.str());
  1288. getWUResult(context, clonedwui.str(), resp, start, count);
  1289. */
  1290. }
  1291. else
  1292. success = false;
  1293. return success;
  1294. }
  1295. bool CwssqlEx::executePublishedQuery(IEspContext &context, const char * queryset, const char * queryname, StringBuffer &resp, int start, int count, int waittime)
  1296. {
  1297. bool success = true;
  1298. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1299. if (factory.get())
  1300. {
  1301. StringBuffer wuid;
  1302. StringBuffer username;
  1303. context.getUserID(username);
  1304. const char* passwd = context.queryPassword();
  1305. WsEclWuInfo wsinfo(wuid, queryset, queryname, username.str(), passwd);
  1306. StringBuffer clonedwui;
  1307. cloneAndExecuteWU(context, wsinfo.ensureWuid(), clonedwui, NULL, NULL, NULL, "");
  1308. if (waittime != 0)
  1309. waitForWorkUnitToComplete(clonedwui.str(), waittime);
  1310. Owned<IConstWorkUnit> cw = factory->openWorkUnit(clonedwui.str(), false);
  1311. if (!cw)
  1312. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", clonedwui.str());
  1313. getWUResult(context, clonedwui.str(), resp, start, count, 0, WSSQLRESULT, WSSQLRESULTSCHEMA);
  1314. }
  1315. else
  1316. success = false;
  1317. return success;
  1318. }
  1319. bool CwssqlEx::executePublishedQuery(IEspContext &context, const char * wuid, StringBuffer &resp, int start, int count, int waittime)
  1320. {
  1321. bool success = true;
  1322. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1323. if (factory.get())
  1324. {
  1325. StringBuffer username;
  1326. context.getUserID(username);
  1327. const char* passwd = context.queryPassword();
  1328. WsEclWuInfo wsinfo(wuid, "", "", username.str(), passwd);
  1329. StringBuffer clonedwui;
  1330. cloneAndExecuteWU(context, wsinfo.ensureWuid(), clonedwui, NULL, NULL, NULL, "");
  1331. if (waittime != 0)
  1332. waitForWorkUnitToComplete(clonedwui.str(), waittime);
  1333. Owned<IConstWorkUnit> cw = factory->openWorkUnit(clonedwui.str(), false);
  1334. if (!cw)
  1335. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", clonedwui.str());
  1336. getWUResult(context, clonedwui.str(), resp, start, count, 0, WSSQLRESULT, WSSQLRESULTSCHEMA);
  1337. }
  1338. else
  1339. success = false;
  1340. return success;
  1341. }
  1342. bool CwssqlEx::cloneAndExecuteWU(IEspContext &context, const char * originalwuid, StringBuffer &clonedwuid, const char *paramXml, IArrayOf<IConstNamedValue> *variables, IArrayOf<IConstNamedValue> *debugs, const char * targetcluster)
  1343. {
  1344. bool success = true;
  1345. try
  1346. {
  1347. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1348. if (originalwuid && *originalwuid)
  1349. {
  1350. if (!looksLikeAWuid(originalwuid, 'W'))
  1351. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Workunit ID: %s", originalwuid);
  1352. Owned<IConstWorkUnit> pwu = factory->openWorkUnit(originalwuid, false);
  1353. if (!pwu)
  1354. throw MakeStringException(-1,"Cannot open workunit %s.", originalwuid);
  1355. if (pwu->getExceptionCount()>0)
  1356. {
  1357. WsWUExceptions errors(*pwu);
  1358. if (errors.ErrCount()>0)
  1359. throw MakeStringException(-1,"Original query contains errors %s.", originalwuid);
  1360. }
  1361. StringBufferAdaptor isvWuid(clonedwuid);
  1362. WsWuHelpers::runWsWorkunit(
  1363. context,
  1364. clonedwuid,
  1365. originalwuid,
  1366. targetcluster,
  1367. paramXml,
  1368. variables,
  1369. debugs);
  1370. }
  1371. else
  1372. throw MakeStringException(ECLWATCH_MISSING_PARAMS,"Missing WuId");
  1373. }
  1374. catch(IException* e)
  1375. {
  1376. FORWARDEXCEPTION(context, e, -1);
  1377. }
  1378. return success;
  1379. }
  1380. bool CwssqlEx::onCreateTableAndLoad(IEspContext &context, IEspCreateTableAndLoadRequest &req, IEspCreateTableAndLoadResponse &resp)
  1381. {
  1382. if (!context.validateFeatureAccess(WSSQLACCESS, SecAccess_Write, false))
  1383. throw MakeStringException(-1, "Failed to fetch results (open workunit). Permission denied.");
  1384. bool success = true;
  1385. const char * targetTableName = req.getTableName();
  1386. if (!targetTableName || !*targetTableName)
  1387. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: TableName cannot be empty.");
  1388. if (!HPCCFile::validateFileName(targetTableName))
  1389. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: Target TableName is invalid: %s.", targetTableName);
  1390. const char * cluster = req.getTargetCluster();
  1391. if (notEmpty(cluster) && !isValidCluster(cluster))
  1392. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "WsSQL::CreateTableAndLoad: Invalid cluster name: %s", cluster);
  1393. IConstDataSourceInfo & datasource = req.getDataSource();
  1394. StringBuffer sourceDataFileName;
  1395. sourceDataFileName.set(datasource.getSprayedFileName()).trim();
  1396. if (sourceDataFileName.length() == 0)
  1397. {
  1398. sourceDataFileName.set(datasource.getLandingZoneFileName());
  1399. if (sourceDataFileName.length() == 0)
  1400. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: Data Source File Name cannot be empty, provide either sprayed file name, or landing zone file name.");
  1401. const char * lzIP = datasource.getLandingZoneIP();
  1402. if (!lzIP || !*lzIP)
  1403. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: LandingZone IP cannot be empty if targeting a landing zone file.");
  1404. StringBuffer lzPath = datasource.getLandingZonePath();
  1405. if (!lzPath.length())
  1406. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: Landingzone path cannot be empty.");
  1407. addPathSepChar(lzPath);
  1408. RemoteFilename rfn;
  1409. SocketEndpoint ep(lzIP);
  1410. rfn.setPath(ep, lzPath.append(sourceDataFileName.str()).str());
  1411. CDfsLogicalFileName dlfn;
  1412. dlfn.setExternal(rfn);
  1413. dlfn.get(sourceDataFileName.clear(), false, false);
  1414. }
  1415. IConstDataType & format = req.getDataSourceType();
  1416. const char * formatname = "";
  1417. CHPCCFileType formattype = format.getType();
  1418. switch (formattype)
  1419. {
  1420. case CHPCCFileType_FLAT:
  1421. formatname = "FLAT";
  1422. break;
  1423. case CHPCCFileType_CSV:
  1424. formatname = "CSV";
  1425. break;
  1426. case CHPCCFileType_JSON:
  1427. formatname = "JSON";
  1428. break;
  1429. case CHPCCFileType_XML:
  1430. formatname = "XML";
  1431. break;
  1432. default:
  1433. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: Invalid file format detected.");
  1434. }
  1435. StringBuffer ecl;
  1436. StringBuffer recDef;
  1437. ecl.set("import std;\n");
  1438. {
  1439. IArrayOf<IConstEclFieldDeclaration>& eclFields = req.getEclFields();
  1440. if (eclFields.length() == 0)
  1441. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: Empty record definition detected.");
  1442. recDef.set("TABLERECORDDEF := RECORD\n");
  1443. ForEachItemIn(fieldindex, eclFields)
  1444. {
  1445. IConstEclFieldDeclaration &eclfield = eclFields.item(fieldindex);
  1446. IConstEclFieldType &ecltype = eclfield.getEclFieldType();
  1447. const char * name = "";
  1448. CHPCCFieldType format = ecltype.getType();
  1449. switch (format)
  1450. {
  1451. case CHPCCFieldType_BOOLEAN:
  1452. name = "BOOLEAN";
  1453. break;
  1454. case CHPCCFieldType_INTEGER:
  1455. name = "INTEGER";
  1456. break;
  1457. case CHPCCFieldType_xUNSIGNED:
  1458. name = "UNSIGNED";
  1459. break;
  1460. case CHPCCFieldType_REAL:
  1461. name = "REAL";
  1462. break;
  1463. case CHPCCFieldType_DECIMAL:
  1464. name = "DECIMAL";
  1465. break;
  1466. case CHPCCFieldType_xSTRING:
  1467. name = "STRING";
  1468. break;
  1469. case CHPCCFieldType_QSTRING:
  1470. name = "QSTRING";
  1471. break;
  1472. case CHPCCFieldType_UNICODE:
  1473. name = "UNICODE";
  1474. break;
  1475. case CHPCCFieldType_DATA:
  1476. name = "DATA";
  1477. break;
  1478. case CHPCCFieldType_VARSTRING:
  1479. name = "VARSTRING";
  1480. break;
  1481. case CHPCCFieldType_VARUNICODE:
  1482. name = "VARUNICODE";
  1483. break;
  1484. default:
  1485. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: Unrecognized field type detected.");
  1486. }
  1487. int len = ecltype.getLength();
  1488. const char * locale = ecltype.getLocale();
  1489. int precision = ecltype.getPrecision();
  1490. recDef.appendf("\t%s", name);
  1491. if (len > 0)
  1492. {
  1493. if(isdigit(recDef.charAt(recDef.length() - 1)))
  1494. recDef.append("_");
  1495. recDef.append(len);
  1496. }
  1497. if (locale && *locale)
  1498. recDef.append(locale);
  1499. if (precision > 0)
  1500. recDef.appendf("_%d", precision);
  1501. recDef.appendf(" %s;\n", eclfield.getFieldName());
  1502. }
  1503. recDef.append("END;\n");
  1504. }
  1505. ecl.append(recDef.str());
  1506. bool overwrite = req.getOverwrite();
  1507. StringBuffer formatnamefull = formatname;
  1508. IArrayOf<IConstDataTypeParam> & formatparams = format.getParams();
  1509. int formatparamscount = formatparams.length();
  1510. if (formatparamscount > 0 )
  1511. {
  1512. formatnamefull.append("(");
  1513. for (int paramindex = 0; paramindex < formatparamscount; paramindex++)
  1514. {
  1515. IConstDataTypeParam &paramitem = formatparams.item(paramindex);
  1516. const char * paramname = paramitem.getName();
  1517. if (!paramname || !*paramname)
  1518. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: Format type '%s' appears to have unnamed parameter(s).", formatname);
  1519. StringArray & paramvalues = paramitem.getValues();
  1520. int paramvalueslen = paramvalues.length();
  1521. formatnamefull.appendf("%s(", paramname);
  1522. if (paramvalueslen > 1)
  1523. formatnamefull.append("[");
  1524. for (int paramvaluesindex = 0; paramvaluesindex < paramvalueslen; paramvaluesindex++)
  1525. {
  1526. formatnamefull.appendf("'%s'%s", paramvalues.item(paramvaluesindex), paramvaluesindex < paramvalueslen-1 ? "," : "");
  1527. }
  1528. if (paramvalueslen > 1)
  1529. formatnamefull.append("]");
  1530. formatnamefull.append(")");
  1531. if (paramindex < formatparamscount-1)
  1532. formatnamefull.append(",");
  1533. }
  1534. formatnamefull.append(")");
  1535. }
  1536. ecl.appendf("\nFILEDATASET := DATASET('~%s', TABLERECORDDEF, %s);\n",sourceDataFileName.str(), formatnamefull.str());
  1537. ecl.appendf("OUTPUT(FILEDATASET, ,'~%s'%s);", targetTableName, overwrite ? ", OVERWRITE" : "");
  1538. const char * description = req.getTableDescription();
  1539. if (description && * description)
  1540. ecl.appendf("\nStd.file.setfiledescription('~%s','%s')", targetTableName, description);
  1541. ESPLOG(LogMax, "WsSQL: creating new WU...");
  1542. NewWsWorkunit wu(context);
  1543. SCMStringBuffer compiledwuid;
  1544. compiledwuid.set(wu->queryWuid());
  1545. wu->setJobName("WsSQL Create table");
  1546. wu.setQueryText(ecl.str());
  1547. wu->setClusterName(cluster);
  1548. wu->setAction(WUActionCompile);
  1549. const char * wuusername = req.getOwner();
  1550. if (wuusername && *wuusername)
  1551. wu->setUser(wuusername);
  1552. wu->commit();
  1553. wu.clear();
  1554. ESPLOG(LogMax, "WsSQL: compiling WU...");
  1555. WsWuHelpers::submitWsWorkunit(context, compiledwuid.str(), cluster, NULL, 0, true, false, false, NULL, NULL, NULL);
  1556. waitForWorkUnitToCompile(compiledwuid.str(), req.getWait());
  1557. ESPLOG(LogMax, "WsSQL: finish compiling WU...");
  1558. ESPLOG(LogMax, "WsSQL: opening WU...");
  1559. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1560. Owned<IConstWorkUnit> cw = factory->openWorkUnit(compiledwuid.str(), false);
  1561. if (!cw)
  1562. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open WorkUnit %s.", compiledwuid.str());
  1563. WsWUExceptions errors(*cw);
  1564. if (errors.ErrCount()>0)
  1565. {
  1566. WsWuInfo winfo(context, compiledwuid.str());
  1567. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  1568. success = false;
  1569. }
  1570. else
  1571. {
  1572. ESPLOG(LogMax, "WsSQL: executing WU(%s)...", compiledwuid.str());
  1573. WsWuHelpers::submitWsWorkunit(context, compiledwuid.str(), cluster, NULL, 0, false, true, true, NULL, NULL, NULL);
  1574. ESPLOG(LogMax, "WsSQL: waiting on WU(%s)...", compiledwuid.str());
  1575. waitForWorkUnitToComplete(compiledwuid.str(), req.getWait());
  1576. ESPLOG(LogMax, "WsSQL: finished waiting on WU(%s)...", compiledwuid.str());
  1577. Owned<IConstWorkUnit> rw = factory->openWorkUnit(compiledwuid.str(), false);
  1578. if (!rw)
  1579. throw MakeStringException(-1,"WsSQL: Cannot verify create and load request success.");
  1580. WsWuInfo winfo(context, compiledwuid.str());
  1581. WsWUExceptions errors(*rw);
  1582. if (errors.ErrCount() > 0 )
  1583. {
  1584. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  1585. success = false;
  1586. }
  1587. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  1588. resp.setSuccess(success);
  1589. resp.setEclRecordDefinition(recDef.str());
  1590. resp.setTableName(targetTableName);
  1591. }
  1592. return success;
  1593. }
  1594. bool CwssqlEx::onGetResults(IEspContext &context, IEspGetResultsRequest &req, IEspGetResultsResponse &resp)
  1595. {
  1596. if (!context.validateFeatureAccess(WSSQLACCESS, SecAccess_Read, false))
  1597. throw MakeStringException(-1, "Failed to fetch results (open workunit). Permission denied.");
  1598. bool success = true;
  1599. const char* parentWuId = req.getWuId();
  1600. if (parentWuId && *parentWuId)
  1601. {
  1602. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1603. if (factory.get())
  1604. {
  1605. Owned<IConstWorkUnit> cw = factory->openWorkUnit(parentWuId, false);
  1606. if (!cw)
  1607. throw MakeStringException(-1,"Cannot open workunit %s.", parentWuId);
  1608. __int64 resultWindowStart = req.getResultWindowStart();
  1609. __int64 resultWindowCount = req.getResultWindowCount();
  1610. if (resultWindowStart < 0 || resultWindowCount <0 )
  1611. throw MakeStringException(-1,"Invalid result window value");
  1612. //resp.setResultLimit(resultLimit);
  1613. resp.setResultWindowCount((unsigned)resultWindowCount);
  1614. resp.setResultWindowStart((unsigned)resultWindowStart);
  1615. StringBuffer result;
  1616. if (getWUResult(context, parentWuId, result, (unsigned)resultWindowStart, (unsigned)resultWindowCount, 0, WSSQLRESULT, req.getSuppressXmlSchema() ? NULL : WSSQLRESULTSCHEMA))
  1617. resp.setResult(result.str());
  1618. WsWuInfo winfo(context, parentWuId);
  1619. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  1620. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  1621. }
  1622. else
  1623. throw MakeStringException(-1,"Could not create WU factory object");
  1624. }
  1625. else
  1626. throw MakeStringException(-1,"Missing WuId");
  1627. return success;
  1628. }
  1629. void CwssqlEx::refreshValidClusters()
  1630. {
  1631. validClusters.kill();
  1632. Owned<IStringIterator> it = getTargetClusters(NULL, NULL);
  1633. ForEach(*it)
  1634. {
  1635. SCMStringBuffer s;
  1636. IStringVal &val = it->str(s);
  1637. if (!validClusters.getValue(val.str()))
  1638. validClusters.setValue(val.str(), true);
  1639. }
  1640. }
  1641. bool CwssqlEx::isValidCluster(const char *cluster)
  1642. {
  1643. if (!cluster || !*cluster)
  1644. return false;
  1645. CriticalBlock block(crit);
  1646. if (validClusters.getValue(cluster))
  1647. return true;
  1648. if (validateTargetClusterName(cluster))
  1649. {
  1650. refreshValidClusters();
  1651. return true;
  1652. }
  1653. return false;
  1654. }
  1655. bool CwssqlEx::publishWorkunit(IEspContext &context, const char * queryname, const char * wuid, const char * targetcluster)
  1656. {
  1657. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1658. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid, false);
  1659. if (!cw)
  1660. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot find the workunit %s", wuid);
  1661. SCMStringBuffer queryName;
  1662. if (notEmpty(queryname))
  1663. queryName.set(queryname);
  1664. else
  1665. queryName.set(cw->queryJobName());
  1666. if (!queryName.length())
  1667. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query/Job name not defined for publishing workunit %s", wuid);
  1668. SCMStringBuffer target;
  1669. if (notEmpty(targetcluster))
  1670. target.set(targetcluster);
  1671. else
  1672. target.set(cw->queryClusterName());
  1673. if (!target.length())
  1674. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Cluster name not defined for publishing workunit %s", wuid);
  1675. if (!isValidCluster(target.str()))
  1676. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", target.str());
  1677. //RODRIGO this is needed:
  1678. //copyQueryFilesToCluster(context, cw, "", target.str(), queryName.str(), false);
  1679. WorkunitUpdate wu(&cw->lock());
  1680. wu->setJobName(queryName.str());
  1681. StringBuffer queryId;
  1682. addQueryToQuerySet(wu, target.str(), queryName.str(), MAKE_ACTIVATE, queryId, context.queryUserId());
  1683. wu->commit();
  1684. wu.clear();
  1685. return true;
  1686. }