ws_sqlService.cpp 74 KB


  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2014 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "ws_sqlService.hpp"
  14. #include "exception_util.hpp"
  15. void CwssqlEx::init(IPropertyTree *_cfg, const char *_process, const char *_service)
  16. {
  17. cfg = _cfg;
  18. try
  19. {
  20. ECLFunctions::init();
  21. }
  22. catch (...)
  23. {
  24. throw MakeStringException(-1, "ws_sqlEx: Problem initiating ECLFunctions structure");
  25. }
  26. refreshValidClusters();
  27. setWsSqlBuildVersion(BUILD_TAG);
  28. }
  29. bool CwssqlEx::onEcho(IEspContext &context, IEspEchoRequest &req, IEspEchoResponse &resp)
  30. {
  31. resp.setResponse(req.getRequest());
  32. return true;
  33. }
  34. bool CwssqlEx::onGetDBMetaData(IEspContext &context, IEspGetDBMetaDataRequest &req, IEspGetDBMetaDataResponse &resp)
  35. {
  36. context.ensureFeatureAccess(WSSQLACCESS, SecAccess_Read, -1, "WsSQL::GetDBMetaData: Permission denied.");
  37. bool success = false;
  38. StringBuffer username;
  39. context.getUserID(username);
  40. const char* passwd = context.queryPassword();
  41. bool includetables = req.getIncludeTables();
  42. if (includetables)
  43. {
  44. Owned<HPCCFileCache> tmpHPCCFileCache = HPCCFileCache::createFileCache(username.str(), passwd);
  45. tmpHPCCFileCache->populateTablesResponse(resp, req.getTableFilter());
  46. resp.setTableCount(resp.getTables().length());
  47. }
  48. bool includeStoredProcs = req.getIncludeStoredProcedures();
  49. if (includeStoredProcs)
  50. {
  51. const char * querysetfilter = req.getQuerySet();
  52. Owned<IStringIterator> targets = getTargetClusters(NULL, NULL);
  53. IArrayOf<IEspHPCCQuerySet> pquerysets;
  54. SCMStringBuffer target;
  55. ForEach(*targets)
  56. {
  57. const char *setname = targets->str(target).str();
  58. if ( querysetfilter && *querysetfilter && stricmp(setname, querysetfilter)!=0)
  59. continue;
  60. Owned<IEspHPCCQuerySet> pqset = createHPCCQuerySet();
  61. pqset->setName(setname);
  62. pquerysets.append(*pqset.getLink());
  63. Owned<IPropertyTree> settree = getQueryRegistry(setname, true);
  64. if (settree == NULL)
  65. continue;
  66. IArrayOf<IEspPublishedQuery> queries;
  67. Owned<IPropertyTreeIterator> iter = settree->getElements("Query");
  68. ForEach(*iter)
  69. {
  70. const char * id = iter->query().queryProp("@id");
  71. const char * qname = iter->query().queryProp("@name");
  72. const char * wuid = iter->query().queryProp("@wuid");
  73. if (qname && *qname && wuid && *wuid)
  74. {
  75. StringBuffer resp;
  76. Owned<IEspPublishedQuery> pubQuery = createPublishedQuery();
  77. pubQuery->setName(qname);
  78. pubQuery->setId(id);
  79. pubQuery->setWuid(wuid);
  80. pubQuery->setSuspended(iter->query().getPropBool("@suspended"));
  81. Owned<IEspQuerySignature> querysignature = createQuerySignature();
  82. IArrayOf<IEspHPCCColumn> inparams;
  83. IArrayOf<IEspOutputDataset> resultsets;
  84. WsEclWuInfo wsinfo(wuid, setname, qname, username, passwd);
  85. Owned<IResultSetFactory> resultSetFactory(getResultSetFactory(username, passwd));
  86. //Each published query can have multiple results (datasets)
  87. IConstWUResultIterator &results = wsinfo.ensureWorkUnit()->getResults();
  88. ForEach(results)
  89. {
  90. Owned<IEspOutputDataset> outputdataset = createOutputDataset();
  91. IArrayOf<IEspHPCCColumn> outparams;
  92. IConstWUResult &result = results.query();
  93. SCMStringBuffer resultName;
  94. result.getResultName(resultName);
  95. outputdataset->setName(resultName.s.str());
  96. Owned<IResultSetMetaData> meta = resultSetFactory->createResultSetMeta(&result);
  97. //Each result dataset can have multiple result columns
  98. int columncount = meta->getColumnCount();
  99. for (int i = 0; i < columncount; i++)
  100. {
  101. Owned<IEspHPCCColumn> col = createHPCCColumn();
  102. SCMStringBuffer columnLabel;
  103. meta->getColumnLabel(columnLabel,i);
  104. col->setName(columnLabel.str());
  105. SCMStringBuffer eclType;
  106. meta->getColumnEclType(eclType, i);
  107. col->setType(eclType.str());
  108. outparams.append(*col.getLink());
  109. }
  110. outputdataset->setOutParams(outparams);
  111. resultsets.append(*outputdataset.getLink());
  112. }
  113. //Each query can have multiple input parameters
  114. IConstWUResultIterator &vars = wsinfo.ensureWorkUnit()->getVariables();
  115. ForEach(vars)
  116. {
  117. Owned<IEspHPCCColumn> col = createHPCCColumn();
  118. IConstWUResult &var = vars.query();
  119. SCMStringBuffer varname;
  120. var.getResultName(varname);
  121. col->setName(varname.str());
  122. Owned<IResultSetMetaData> meta = resultSetFactory->createResultSetMeta(&var);
  123. SCMStringBuffer eclType;
  124. meta->getColumnEclType(eclType, 0);
  125. col->setType(eclType.str());
  126. inparams.append(*col.getLink());
  127. }
  128. querysignature->setInParams(inparams);
  129. querysignature->setResultSets(resultsets);
  130. pubQuery->setSignature(*querysignature.getLink());
  131. queries.append(*pubQuery.getLink());
  132. }
  133. }
  134. pqset->setQuerySetQueries(queries);
  135. IArrayOf<IEspQuerySetAliasMap> aliases;
  136. Owned<IPropertyTreeIterator> aliasiter = settree->getElements("Alias");
  137. ForEach(*aliasiter)
  138. {
  139. Owned<IEspQuerySetAliasMap> alias = createQuerySetAliasMap();
  140. const char * qname;
  141. const char * id;
  142. id = aliasiter->query().queryProp("@id");
  143. qname = aliasiter->query().queryProp("@name");
  144. alias->setId(id);
  145. alias->setName(qname);
  146. aliases.append(*alias.getLink());
  147. }
  148. pqset->setQuerySetAliases(aliases);
  149. }
  150. resp.setQuerySets(pquerysets);
  151. }
  152. bool includeTargetClusters = req.getIncludeTargetClusters();
  153. if (includeTargetClusters)
  154. {
  155. try
  156. {
  157. CTpWrapper topologyWrapper;
  158. IArrayOf<IEspTpLogicalCluster> clusters;
  159. topologyWrapper.getTargetClusterList(clusters, req.getClusterType(), NULL);
  160. StringArray dfuclusters;
  161. ForEachItemIn(k, clusters)
  162. {
  163. IEspTpLogicalCluster& cluster = clusters.item(k);
  164. dfuclusters.append(cluster.getName());
  165. }
  166. resp.setClusterNames(dfuclusters);
  167. }
  168. catch(IException* e)
  169. {
  170. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  171. }
  172. }
  173. return success;
  174. }
  175. bool CwssqlEx::onGetDBSystemInfo(IEspContext &context, IEspGetDBSystemInfoRequest &req, IEspGetDBSystemInfoResponse &resp)
  176. {
  177. bool success = false;
  178. resp.setName("HPCC Systems");
  179. context.ensureFeatureAccess(WSSQLACCESS, SecAccess_Access, -1, "WsSQL::GetDBSystemInfo: Permission denied.");
  180. try
  181. {
  182. const char* build_ver = getBuildVersion();
  183. if (build_ver && *build_ver)
  184. {
  185. StringBuffer project;
  186. StringBuffer major;
  187. StringBuffer minor;
  188. StringBuffer point;
  189. StringBuffer maturity;
  190. //community_4.1.0-trunk1-Debug[heads/wssql-0-gb9e351-dirty
  191. const char * tail = build_ver;
  192. while (tail && *tail != '_')
  193. project.append(*tail++);
  194. tail++;
  195. while (tail && *tail != '.')
  196. major.append(*tail++);
  197. resp.setMajor(major.str());
  198. tail++;
  199. while (tail && *tail != '.')
  200. minor.append(*tail++);
  201. resp.setMinor(minor.str());
  202. tail++;
  203. while (tail && *tail != '-')
  204. point.append(*tail++);
  205. resp.setPoint(point.str());
  206. if (req.getIncludeAll())
  207. {
  208. resp.setFullVersion(build_ver);
  209. resp.setProject(project.str());
  210. tail++;
  211. while (tail && *tail != '-' && *tail != '[')
  212. maturity.append(*tail++);
  213. resp.setMaturity(maturity.str());
  214. }
  215. }
  216. const char* wssqlbuild_ver = getWsSqlBuildVersion();
  217. if (wssqlbuild_ver && *wssqlbuild_ver)
  218. {
  219. StringBuffer major;
  220. StringBuffer minor;
  221. StringBuffer point;
  222. StringBuffer maturity;
  223. //5.4.0-trunk1-Debug[heads/wssql-0-gb9e351-dirty
  224. const char * tail = wssqlbuild_ver;
  225. while (tail && *tail != '.')
  226. major.append(*tail++);
  227. resp.setWsSQLMajor(major.str());
  228. tail++;
  229. while (tail && *tail != '.')
  230. minor.append(*tail++);
  231. resp.setWsSQLMinor(minor.str());
  232. tail++;
  233. while (tail && *tail != '-')
  234. point.append(*tail++);
  235. resp.setWsSQLPoint(point.str());
  236. if (req.getIncludeAll())
  237. {
  238. resp.setWsSQLFullVersion(wssqlbuild_ver);
  239. tail++;
  240. while (tail && *tail != '-' && *tail != '[')
  241. maturity.append(*tail++);
  242. resp.setWsSQLMaturity(maturity.str());
  243. }
  244. success = true;
  245. }
  246. }
  247. catch (...)
  248. {
  249. ERRLOG("Error Parsing HPCC and/or WsSQL Version string.");
  250. }
  251. return success;
  252. }
  253. void printTree(pANTLR3_BASE_TREE t, int indent)
  254. {
  255. pANTLR3_BASE_TREE child = NULL;
  256. int children = 0;
  257. char * tokenText = NULL;
  258. string ind = "";
  259. int i = 0;
  260. if ( t != NULL )
  261. {
  262. children = t->getChildCount(t);
  263. for ( i = 0; i < indent; i++ )
  264. ind += " ";
  265. for ( i = 0; i < children; i++ )
  266. {
  267. pANTLR3_BASE_TREE child = (pANTLR3_BASE_TREE)(t->getChild(t, i));
  268. ANTLR3_UINT32 tokenType = child->getType(child);
  269. tokenText = (char *)child->toString(child)->chars;
  270. fprintf(stderr, "%s%s\n", ind.c_str(), tokenText);
  271. if (tokenType == ANTLR3_TOKEN_EOF)
  272. break;
  273. printTree(child, indent+1);
  274. }
  275. }
  276. }
  277. void myDisplayRecognitionError (pANTLR3_BASE_RECOGNIZER recognizer,pANTLR3_UINT8 * tokenNames)
  278. {
  279. StringBuffer errorMessage;
  280. pANTLR3_PARSER parser = NULL;
  281. pANTLR3_TREE_PARSER tparser = NULL;
  282. pANTLR3_INT_STREAM is;
  283. pANTLR3_STRING ttext;
  284. pANTLR3_EXCEPTION ex;
  285. pANTLR3_COMMON_TOKEN theToken;
  286. pANTLR3_BASE_TREE theBaseTree;
  287. pANTLR3_COMMON_TREE theCommonTree;
  288. ex = recognizer->state->exception;
  289. ttext = nullptr;
  290. errorMessage.append("Error while parsing");
  291. if (ex)
  292. {
  293. errorMessage.appendf(": ANTLR Error %d : %s", ex->type, (pANTLR3_UINT8)(ex->message));
  294. switch (recognizer->type)
  295. {
  296. case ANTLR3_TYPE_PARSER:
  297. {
  298. parser = (pANTLR3_PARSER) (recognizer->super);
  299. is = parser->tstream->istream;
  300. theToken = (pANTLR3_COMMON_TOKEN)(ex->token);
  301. if (theToken)
  302. {
  303. ttext = theToken->toString(theToken);
  304. if (theToken->type == ANTLR3_TOKEN_EOF)
  305. errorMessage.append(", at <EOF>");
  306. else
  307. errorMessage.appendf("\n Near %s\n ", ttext == nullptr ? (pANTLR3_UINT8)"<no text for the token>" : ttext->chars);
  308. }
  309. break;
  310. }
  311. case ANTLR3_TYPE_TREE_PARSER:
  312. {
  313. tparser = (pANTLR3_TREE_PARSER) (recognizer->super);
  314. is = tparser->ctnstream->tnstream->istream;
  315. theBaseTree = (pANTLR3_BASE_TREE)(ex->token);
  316. if (theBaseTree)
  317. {
  318. ttext = theBaseTree->toStringTree(theBaseTree);
  319. theCommonTree = (pANTLR3_COMMON_TREE) theBaseTree->super;
  320. if (theCommonTree != nullptr)
  321. theToken = (pANTLR3_COMMON_TOKEN) theBaseTree->getToken(theBaseTree);
  322. errorMessage.appendf( ", at offset %d", theBaseTree->getCharPositionInLine(theBaseTree));
  323. errorMessage.appendf( ", near %s", ttext->chars);
  324. }
  325. break;
  326. }
  327. default:
  328. //errorMessage.appendf("Base recognizer function displayRecognitionError called by unknown parser type - provide override for this function\n");
  329. return;
  330. break;
  331. }
  332. switch (ex->type)
  333. {
  334. case ANTLR3_UNWANTED_TOKEN_EXCEPTION:
  335. {
  336. // Indicates that the recognizer was fed a token which seesm to be
  337. // spurious input. We can detect this when the token that follows
  338. // this unwanted token would normally be part of the syntactically
  339. // correct stream. Then we can see that the token we are looking at
  340. // is just something that should not be there and throw this exception.
  341. //
  342. if (tokenNames == nullptr)
  343. {
  344. errorMessage.appendf( " : Extraneous input...");
  345. }
  346. else
  347. {
  348. if (ex->expecting == ANTLR3_TOKEN_EOF)
  349. errorMessage.appendf(" : Extraneous input - expected <EOF>\n");
  350. else
  351. errorMessage.appendf(" : Extraneous input - expected %s ...\n", tokenNames[ex->expecting]);
  352. }
  353. break;
  354. }
  355. case ANTLR3_MISSING_TOKEN_EXCEPTION:
  356. {
  357. // Indicates that the recognizer detected that the token we just
  358. // hit would be valid syntactically if preceeded by a particular
  359. // token. Perhaps a missing ';' at line end or a missing ',' in an
  360. // expression list, and such like.
  361. //
  362. if (tokenNames == nullptr)
  363. {
  364. errorMessage.appendf( " : Missing token (%d)...\n", ex->expecting);
  365. }
  366. else
  367. {
  368. if (ex->expecting == ANTLR3_TOKEN_EOF)
  369. errorMessage.appendf( " : Missing <EOF>\n");
  370. else
  371. errorMessage.appendf( " : Missing %s \n", tokenNames[ex->expecting]);
  372. }
  373. break;
  374. }
  375. case ANTLR3_RECOGNITION_EXCEPTION:
  376. {
  377. // Indicates that the recognizer received a token
  378. // in the input that was not predicted. This is the basic exception type
  379. // from which all others are derived. So we assume it was a syntax error.
  380. // You may get this if there are not more tokens and more are needed
  381. // to complete a parse for instance.
  382. //
  383. errorMessage.appendf( " : syntax error...\n");
  384. break;
  385. }
  386. case ANTLR3_MISMATCHED_TOKEN_EXCEPTION:
  387. {
  388. // We were expecting to see one thing and got another. This is the
  389. // most common error if we coudl not detect a missing or unwanted token.
  390. // Here you can spend your efforts to
  391. // derive more useful error messages based on the expected
  392. // token set and the last token and so on. The error following
  393. // bitmaps do a good job of reducing the set that we were looking
  394. // for down to something small. Knowing what you are parsing may be
  395. // able to allow you to be even more specific about an error.
  396. //
  397. if (tokenNames == NULL)
  398. {
  399. errorMessage.appendf(" : syntax error...\n");
  400. }
  401. else
  402. {
  403. if (ex->expecting == ANTLR3_TOKEN_EOF)
  404. errorMessage.appendf(" : expected <EOF>\n");
  405. else
  406. errorMessage.appendf(" : expected %s ...\n", tokenNames[ex->expecting]);
  407. }
  408. break;
  409. }
  410. case ANTLR3_NO_VIABLE_ALT_EXCEPTION:
  411. {
  412. // We could not pick any alt decision from the input given
  413. // so god knows what happened - however when you examine your grammar,
  414. // you should. It means that at the point where the current token occurred
  415. // that the DFA indicates nowhere to go from here.
  416. //
  417. errorMessage.appendf(" : cannot match to any predicted input...\n");
  418. break;
  419. }
  420. case ANTLR3_MISMATCHED_SET_EXCEPTION:
  421. {
  422. ANTLR3_UINT32 count;
  423. ANTLR3_UINT32 bit;
  424. ANTLR3_UINT32 size;
  425. ANTLR3_UINT32 numbits;
  426. pANTLR3_BITSET errBits;
  427. // This means we were able to deal with one of a set of
  428. // possible tokens at this point, but we did not see any
  429. // member of that set.
  430. errorMessage.appendf( " : unexpected input...\n expected one of : ");
  431. // What tokens could we have accepted at this point in the parse?
  432. count = 0;
  433. errBits = antlr3BitsetLoad (ex->expectingSet);
  434. numbits = errBits->numBits (errBits);
  435. size = errBits->size (errBits);
  436. if (size > 0)
  437. {
  438. // However many tokens we could have dealt with here, it is usually
  439. // not useful to print ALL of the set here. I arbitrarily chose 8
  440. // here, but you should do whatever makes sense for you of course.
  441. // No token number 0, so look for bit 1 and on.
  442. for (bit = 1; bit < numbits && count < 8 && count < size; bit++)
  443. {
  444. if (tokenNames[bit])
  445. {
  446. errorMessage.appendf( "%s%s", count > 0 ? ", " : "", tokenNames[bit]);
  447. count++;
  448. }
  449. }
  450. errorMessage.appendf( "\n");
  451. }
  452. else
  453. {
  454. errorMessage.appendf( "Unknown parsing error.\n");
  455. }
  456. break;
  457. }
  458. case ANTLR3_EARLY_EXIT_EXCEPTION:
  459. {
  460. // We entered a loop requiring a number of token sequences
  461. // but found a token that ended that sequence earlier than
  462. // we should have done.
  463. errorMessage.appendf( " : missing elements...\n");
  464. break;
  465. }
  466. default:
  467. {
  468. // We don't handle any other exceptions here, but you can
  469. // if you wish. If we get an exception that hits this point
  470. // then we are just going to report what we know about the
  471. // token.
  472. //
  473. errorMessage.appendf( " : unrecognized syntax...\n");
  474. break;
  475. }
  476. }
  477. }
  478. throw MakeStringException(-1, "%s", errorMessage.str());
  479. }
  480. HPCCSQLTreeWalker * CwssqlEx::parseSQL(IEspContext &context, StringBuffer & sqltext, bool attemptParameterization)
  481. {
  482. int limit = -1;
  483. pHPCCSQLLexer hpccSqlLexer = NULL;
  484. pANTLR3_COMMON_TOKEN_STREAM sqlTokens = NULL;
  485. pHPCCSQLParser hpccSqlParser = NULL;
  486. pANTLR3_BASE_TREE sqlAST = NULL;
  487. pANTLR3_INPUT_STREAM sqlInputStream = NULL;
  488. Owned<HPCCSQLTreeWalker> hpccSqlTreeWalker;
  489. try
  490. {
  491. if (sqltext.length() <= 0)
  492. throw MakeStringException(-1, "Empty SQL String detected.");
  493. pANTLR3_UINT8 input_string = (pANTLR3_UINT8)sqltext.str();
  494. pANTLR3_INPUT_STREAM sqlinputstream = antlr3StringStreamNew(input_string,
  495. ANTLR3_ENC_8BIT,
  496. sqltext.length(),
  497. (pANTLR3_UINT8)"SQL INPUT");
  498. pHPCCSQLLexer hpccsqllexer = HPCCSQLLexerNew(sqlinputstream);
  499. //hpccSqlLexer->pLexer->rec->displayRecognitionError = myDisplayRecognitionError;
  500. //ANTLR3_UINT32 lexerrors = hpccsqllexer->pLexer->rec->getNumberOfSyntaxErrors(hpccsqllexer->pLexer->rec);
  501. //if (lexerrors > 0)
  502. // throw MakeStringException(-1, "HPCCSQL Lexer reported %d error(s), request aborted.", lexerrors);
  503. pANTLR3_COMMON_TOKEN_STREAM sqltokens = antlr3CommonTokenStreamSourceNew(ANTLR3_SIZE_HINT, TOKENSOURCE(hpccsqllexer));
  504. if (sqltokens == NULL)
  505. {
  506. throw MakeStringException(-1, "Out of memory trying to allocate ANTLR HPCCSQLParser token stream.");
  507. }
  508. pHPCCSQLParser hpccsqlparser = HPCCSQLParserNew(sqltokens);
  509. //#if not defined(_DEBUG)
  510. hpccsqlparser->pParser->rec->displayRecognitionError = myDisplayRecognitionError;
  511. //#endif
  512. pANTLR3_BASE_TREE sqlAST = (hpccsqlparser->root_statement(hpccsqlparser)).tree;
  513. ANTLR3_UINT32 parserrors = hpccsqlparser->pParser->rec->getNumberOfSyntaxErrors(hpccsqlparser->pParser->rec);
  514. if (parserrors > 0)
  515. throw MakeStringException(-1, "HPCCSQL Parser reported %d error(s), request aborted.", parserrors);
  516. #if defined(_DEBUG)
  517. printTree(sqlAST, 0);
  518. #endif
  519. hpccSqlTreeWalker.setown(new HPCCSQLTreeWalker(sqlAST, context, attemptParameterization));
  520. hpccsqlparser->free(hpccsqlparser);
  521. sqltokens->free(sqltokens);
  522. hpccsqllexer->free(hpccsqllexer);
  523. sqlinputstream->free(sqlinputstream);
  524. }
  525. catch(IException* e)
  526. {
  527. try
  528. {
  529. if (hpccSqlParser)
  530. hpccSqlParser->free(hpccSqlParser);
  531. if (sqlTokens)
  532. sqlTokens->free(sqlTokens);
  533. if (hpccSqlLexer)
  534. hpccSqlLexer->free(hpccSqlLexer);
  535. if (sqlInputStream)
  536. sqlInputStream->free(sqlInputStream);
  537. hpccSqlTreeWalker.clear();
  538. }
  539. catch (...)
  540. {
  541. ERRLOG("!!! Unable to free HPCCSQL parser/lexer objects.");
  542. }
  543. //All IExceptions get bubbled up
  544. throw e;
  545. }
  546. catch(...)
  547. {
  548. try
  549. {
  550. if (hpccSqlParser)
  551. hpccSqlParser->free(hpccSqlParser);
  552. if (sqlTokens)
  553. sqlTokens->free(sqlTokens);
  554. if (hpccSqlLexer)
  555. hpccSqlLexer->free(hpccSqlLexer);
  556. if (sqlInputStream)
  557. sqlInputStream->free(sqlInputStream);
  558. hpccSqlTreeWalker.clear();
  559. }
  560. catch (...)
  561. {
  562. ERRLOG("!!! Unable to free HPCCSQL parser/lexer objects.");
  563. }
  564. //All other unexpected exceptions are reported as generic ecl generation error.
  565. throw MakeStringException(-1, "Error generating ECL code.");
  566. }
  567. return hpccSqlTreeWalker.getLink();
  568. }
  569. bool CwssqlEx::getWUResult(IEspContext &context, const char * wuid, StringBuffer &result, unsigned start, unsigned count, int sequence, const char * dsname, const char * schemaname)
  570. {
  571. context.addTraceSummaryTimeStamp(LogMin, "StrtgetReslts");
  572. if (wuid && *wuid)
  573. {
  574. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  575. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid, false);
  576. if (!cw)
  577. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", wuid);
  578. SCMStringBuffer stateDesc;
  579. switch (cw->getState())
  580. {
  581. case WUStateCompleted:
  582. case WUStateFailed:
  583. case WUStateUnknown:
  584. case WUStateCompiled:
  585. {
  586. StringBufferAdaptor resultXML(result);
  587. Owned<IResultSetFactory> factory = getResultSetFactory(context.queryUserId(), context.queryPassword());
  588. Owned<INewResultSet> nr = factory->createNewResultSet(wuid, sequence, NULL);
  589. if (nr.get())
  590. {
  591. context.addTraceSummaryTimeStamp(LogMax, "strtgetXMLRslts");
  592. getResultXml(resultXML, nr.get(), dsname, start, count, schemaname);
  593. context.addTraceSummaryTimeStamp(LogMax, "endgetXMLRslts");
  594. }
  595. else
  596. return false;
  597. break;
  598. }
  599. default:
  600. break;
  601. }
  602. context.addTraceSummaryTimeStamp(LogMin, "ExitgetRslts");
  603. return true;
  604. }
  605. context.addTraceSummaryTimeStamp(LogMin, "ExitgetRslts");
  606. return false;
  607. }
  608. bool CwssqlEx::onSetRelatedIndexes(IEspContext &context, IEspSetRelatedIndexesRequest &req, IEspSetRelatedIndexesResponse &resp)
  609. {
  610. context.ensureFeatureAccess(WSSQLACCESS, SecAccess_Write, -1, "WsSQL::SetRelatedIndexes: Permission denied.");
  611. StringBuffer username;
  612. context.getUserID(username);
  613. const char* passwd = context.queryPassword();
  614. IArrayOf<IConstRelatedIndexSet>& relatedindexSets = req.getRelatedIndexSets();
  615. if (relatedindexSets.length() == 0)
  616. throw MakeStringException(-1, "WsSQL::SetRelatedIndexes empty request detected.");
  617. ForEachItemIn(relatedindexsetindex, relatedindexSets)
  618. {
  619. IConstRelatedIndexSet &relatedIndexSet = relatedindexSets.item(relatedindexsetindex);
  620. const char * fileName = relatedIndexSet.getFileName();
  621. if (!fileName || !*fileName)
  622. throw MakeStringException(-1, "WsSQL::SetRelatedIndexes error: Empty file name detected.");
  623. StringArray& indexHints = relatedIndexSet.getIndexes();
  624. int indexHintsCount = indexHints.length();
  625. if (indexHintsCount > 0)
  626. {
  627. Owned<HPCCFile> file = HPCCFileCache::fetchHpccFileByName(fileName,username.str(), passwd, false, false);
  628. if (!file)
  629. throw MakeStringException(-1, "WsSQL::SetRelatedIndexes error: could not find file: %s.", fileName);
  630. StringBuffer description;
  631. StringBuffer currentIndexes;
  632. description = file->getDescription();
  633. HPCCFile::parseOutRelatedIndexes(description, currentIndexes);
  634. description.append("\nXDBC:RelIndexes=[");
  635. for(int indexHintIndex = 0; indexHintIndex < indexHintsCount; indexHintIndex++)
  636. {
  637. description.appendf("%s%c", indexHints.item(indexHintIndex), (indexHintIndex < indexHintsCount-1 ? ';' : ' '));
  638. }
  639. description.append("]\n");
  640. HPCCFileCache::updateHpccFileDescription(fileName, username, passwd, description.str());
  641. file->setDescription(description.str());
  642. }
  643. }
  644. resp.setRelatedIndexSets(relatedindexSets);
  645. return true;
  646. }
  647. bool CwssqlEx::onGetRelatedIndexes(IEspContext &context, IEspGetRelatedIndexesRequest &req, IEspGetRelatedIndexesResponse &resp)
  648. {
  649. try
  650. {
  651. context.ensureFeatureAccess(WSSQLACCESS, SecAccess_Read, -1, "WsSQL::GetRelatedIndexes: Permission denied.");
  652. StringArray& filenames = req.getFileNames();
  653. if (filenames.length() == 0)
  654. throw MakeStringException(-1, "WsSQL::GetRelatedIndexes error: No filenames detected");
  655. StringBuffer username;
  656. context.getUserID(username);
  657. const char* passwd = context.queryPassword();
  658. IArrayOf<IEspRelatedIndexSet> relatedindexSets;
  659. ForEachItemIn(filenameindex, filenames)
  660. {
  661. const char * fileName = filenames.item(filenameindex);
  662. Owned<HPCCFile> file = HPCCFileCache::fetchHpccFileByName(fileName,username.str(), passwd, false, false);
  663. if (file)
  664. {
  665. StringArray indexHints;
  666. file->getRelatedIndexes(indexHints);
  667. Owned<IEspRelatedIndexSet> relatedIndexSet = createRelatedIndexSet("", "");
  668. relatedIndexSet->setFileName(fileName);
  669. relatedIndexSet->setIndexes(indexHints);
  670. relatedindexSets.append(*relatedIndexSet.getLink());
  671. }
  672. }
  673. resp.setRelatedIndexSets(relatedindexSets);
  674. }
  675. catch(IException* e)
  676. {
  677. FORWARDEXCEPTION(context, e, -1);
  678. }
  679. return true;
  680. }
  681. void CwssqlEx::processMultipleClusterOption(StringArray & clusters, const char * targetcluster, StringBuffer & hashoptions)
  682. {
  683. int clusterscount = clusters.length();
  684. if (clusterscount > 0)
  685. {
  686. hashoptions.appendf("\n#OPTION('AllowedClusters', '%s", targetcluster);
  687. ForEachItemIn(i,clusters)
  688. {
  689. if (!isValidCluster(clusters.item(i)))
  690. throw MakeStringException(-1, "Invalid alternate cluster name: %s", clusters.item(i));
  691. hashoptions.appendf(",%s", clusters.item(i));
  692. }
  693. hashoptions.append("');\n#OPTION('AllowAutoQueueSwitch', TRUE);\n\n");
  694. }
  695. }
  696. bool CwssqlEx::onExecuteSQL(IEspContext &context, IEspExecuteSQLRequest &req, IEspExecuteSQLResponse &resp)
  697. {
  698. try
  699. {
  700. context.addTraceSummaryTimeStamp(LogMin, "StrtOnExecuteSQL");
  701. context.ensureFeatureAccess(WSSQLACCESS, SecAccess_Write, -1, "WsSQL::ExecuteSQL: Permission denied.");
  702. double version = context.getClientVersion();
  703. StringBuffer sqltext;
  704. StringBuffer ecltext;
  705. StringBuffer username;
  706. context.getUserID(username);
  707. const char* passwd = context.queryPassword();
  708. sqltext.set(req.getSqlText());
  709. if (sqltext.length() <= 0)
  710. throw MakeStringException(1,"Empty SQL request.");
  711. const char * cluster = req.getTargetCluster();
  712. StringBuffer hashoptions;
  713. if (version > 3.03)
  714. {
  715. StringArray & alternates = req.getAlternateClusters();
  716. if (alternates.length() > 0)
  717. processMultipleClusterOption(alternates, cluster, hashoptions);
  718. }
  719. SCMStringBuffer compiledwuid;
  720. int resultLimit = req.getResultLimit();
  721. __int64 resultWindowStart = req.getResultWindowStart();
  722. __int64 resultWindowCount = req.getResultWindowCount();
  723. if (resultWindowStart < 0 || resultWindowCount <0 )
  724. throw MakeStringException(-1,"Invalid result window value");
  725. bool clonable = false;
  726. bool cacheeligible = (version > 3.04 ) ? !req.getIgnoreCache() : true;
  727. Owned<HPCCSQLTreeWalker> parsedSQL;
  728. ESPLOG(LogNormal, "WsSQL: Parsing sql query...");
  729. parsedSQL.setown(parseSQL(context, sqltext));
  730. ESPLOG(LogNormal, "WsSQL: Finished parsing sql query...");
  731. SQLQueryType querytype = parsedSQL->getSqlType();
  732. if (querytype == SQLTypeCall)
  733. {
  734. if (strlen(parsedSQL->getQuerySetName())==0)
  735. {
  736. if (strlen(req.getTargetQuerySet())==0)
  737. throw MakeStringException(-1,"Missing Target QuerySet.");
  738. else
  739. parsedSQL->setQuerySetName(req.getTargetQuerySet());
  740. }
  741. ESPLOG(LogMax, "WsSQL: Processing call query...");
  742. WsEclWuInfo wsinfo("", parsedSQL->getQuerySetName(), parsedSQL->getStoredProcName(), username.str(), passwd);
  743. compiledwuid.set(wsinfo.ensureWuid());
  744. clonable = true;
  745. }
  746. else if (querytype == SQLTypeCreateAndLoad)
  747. {
  748. cacheeligible = false;
  749. }
  750. StringBuffer xmlparams;
  751. StringBuffer normalizedSQL(parsedSQL->getNormalizedSQL());
  752. normalizedSQL.append(" | --TC=").append(cluster);
  753. if (username.length() > 0)
  754. normalizedSQL.append("--USER=").append(username.str());
  755. if (resultLimit > 0)
  756. normalizedSQL.append("--HARDLIMIT=").append(resultLimit);
  757. const char * wuusername = req.getUserName();
  758. if (wuusername && *wuusername)
  759. normalizedSQL.append("--WUOWN=").append(wuusername);
  760. if (hashoptions.length()>0)
  761. normalizedSQL.append("--HO=").append(hashoptions.str());
  762. if (compiledwuid.length() != 0)
  763. normalizedSQL.append("--PWUID=").append(compiledwuid.str());
  764. ESPLOG(LogMax, "WsSQL: getWorkUnitFactory...");
  765. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  766. ESPLOG(LogMax, "WsSQL: checking query cache...");
  767. if(cacheeligible && getCachedQuery(normalizedSQL.str(), compiledwuid.s))
  768. {
  769. ESPLOG(LogMax, "WsSQL: cache hit opening wuid %s...", compiledwuid.str());
  770. Owned<IConstWorkUnit> cw = factory->openWorkUnit(compiledwuid.str(), false);
  771. if (!cw)//cache hit but unavailable WU
  772. {
  773. ESPLOG(LogMax, "WsSQL: cache hit but unavailable WU...");
  774. removeQueryFromCache(normalizedSQL.str());
  775. compiledwuid.clear();
  776. }
  777. else
  778. clonable = true;
  779. }
  780. if (compiledwuid.length()==0)
  781. {
  782. {
  783. if (isEmpty(cluster))
  784. throw MakeStringException(-1,"Target cluster not set.");
  785. if (!isValidCluster(cluster))
  786. throw MakeStringException(-1, "Invalid cluster name: %s", cluster);
  787. if (querytype == SQLTypeCreateAndLoad)
  788. clonable = false;
  789. context.addTraceSummaryTimeStamp(LogNormal, "StartECLGenerate");
  790. ECLEngine::generateECL(parsedSQL, ecltext);
  791. if (hashoptions.length() > 0)
  792. ecltext.insert(0, hashoptions.str());
  793. context.addTraceSummaryTimeStamp(LogNormal, "EndECLGenerate");
  794. if (isEmpty(ecltext))
  795. throw MakeStringException(1,"Could not generate ECL from SQL.");
  796. ecltext.appendf(EMBEDDEDSQLQUERYCOMMENT, sqltext.str(), normalizedSQL.str());
  797. #if defined _DEBUG
  798. fprintf(stderr, "GENERATED ECL:\n%s\n", ecltext.str());
  799. #endif
  800. ESPLOG(LogMax, "WsSQL: creating new WU...");
  801. NewWsWorkunit wu(context);
  802. compiledwuid.set(wu->queryWuid());
  803. wu->setJobName("WsSQL Job");
  804. wu.setQueryText(ecltext.str());
  805. wu->setClusterName(cluster);
  806. if (clonable)
  807. wu->setCloneable(true);
  808. wu->setAction(WUActionCompile);
  809. if (resultLimit)
  810. wu->setResultLimit(resultLimit);
  811. if (wuusername && *wuusername)
  812. wu->setUser(wuusername);
  813. wu->commit();
  814. wu.clear();
  815. context.addTraceSummaryTimeStamp(LogNormal, "strtWUCompile");
  816. WsWuHelpers::submitWsWorkunit(context, compiledwuid.str(), cluster, NULL, 0, true, false, false, NULL, NULL, NULL);
  817. waitForWorkUnitToCompile(compiledwuid.str(), req.getWait());
  818. context.addTraceSummaryTimeStamp(LogNormal, "endWUCompile");
  819. }
  820. }
  821. ESPLOG(LogMax, "WsSQL: opening WU...");
  822. Owned<IConstWorkUnit> cw = factory->openWorkUnit(compiledwuid.str(), false);
  823. if (!cw)
  824. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", compiledwuid.str());
  825. WsWUExceptions errors(*cw);
  826. if (errors.ErrCount()>0)
  827. {
  828. WsWuInfo winfo(context, compiledwuid.str());
  829. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  830. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  831. }
  832. else
  833. {
  834. if (querytype == SQLTypeCall)
  835. createWUXMLParams(xmlparams, parsedSQL, NULL, cw);
  836. else if (querytype == SQLTypeSelect)
  837. {
  838. if (notEmpty(cluster) && !isValidCluster(cluster))
  839. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", cluster);
  840. createWUXMLParams(xmlparams, parsedSQL->getParamList());
  841. }
  842. StringBuffer runningwuid;
  843. if (clonable)
  844. {
  845. context.addTraceSummaryTimeStamp(LogNormal, "StartWUCloneExe");
  846. cloneAndExecuteWU(context, compiledwuid.str(), runningwuid, xmlparams.str(), NULL, NULL, cluster);
  847. context.addTraceSummaryTimeStamp(LogNormal, "EndWUCloneExe");
  848. if(cacheeligible && !isQueryCached(normalizedSQL.str()))
  849. addQueryToCache(normalizedSQL.str(), compiledwuid.str());
  850. }
  851. else
  852. {
  853. context.addTraceSummaryTimeStamp(LogNormal, "StartWUSubmit");
  854. WsWuHelpers::submitWsWorkunit(context, compiledwuid.str(), cluster, NULL, 0, false, true, true, NULL, NULL, NULL);
  855. context.addTraceSummaryTimeStamp(LogNormal, "EndWUSubmit");
  856. runningwuid.set(compiledwuid.str());
  857. if (cacheeligible)
  858. addQueryToCache(normalizedSQL.str(), runningwuid.str());
  859. }
  860. int timeToWait = req.getWait();
  861. if (timeToWait != 0)
  862. {
  863. context.addTraceSummaryTimeStamp(LogNormal, "StartWUProcessWait");
  864. waitForWorkUnitToComplete(runningwuid.str(), timeToWait);
  865. context.addTraceSummaryTimeStamp(LogNormal, "EndWUProcessWait");
  866. }
  867. if (strcmp(runningwuid.str(), compiledwuid.str())!=0)
  868. resp.setParentWuId(compiledwuid.str());
  869. resp.setResultLimit(resultLimit);
  870. resp.setResultWindowCount( (unsigned)resultWindowCount);
  871. resp.setResultWindowStart( (unsigned)resultWindowStart);
  872. if (!req.getSuppressResults())
  873. {
  874. StringBuffer result;
  875. if (getWUResult(context, runningwuid.str(), result, (unsigned)resultWindowStart, (unsigned)resultWindowCount, 0, WSSQLRESULT, req.getSuppressXmlSchema() ? NULL : WSSQLRESULTSCHEMA))
  876. {
  877. StringBuffer count;
  878. if (getWUResult( context, runningwuid.str(), count , 0, 1, 1, WSSQLCOUNT, NULL))
  879. result.append(count.str());
  880. resp.setResult(result.str());
  881. }
  882. }
  883. WsWuInfo winfo(context, runningwuid);
  884. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  885. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  886. }
  887. AuditSystemAccess(context.queryUserId(), true, "Updated %s", compiledwuid.str());
  888. }
  889. catch(IException* e)
  890. {
  891. FORWARDEXCEPTION(context, e, -1);
  892. }
  893. //catch (...)
  894. //{
  895. // me->append(*MakeStringException(0,"Unknown exception submitting %s",wuid.str()));
  896. //}
  897. context.addTraceSummaryTimeStamp(LogMin, "EndOnExecuteSQL");
  898. return true;
  899. }
  900. void CwssqlEx::createWUXMLParams(StringBuffer & xmlparams, const IArrayOf <ISQLExpression> * parameterlist)
  901. {
  902. xmlparams.append("<root>");
  903. for (int expindex = 0; expindex < parameterlist->length(); expindex++)
  904. {
  905. ISQLExpression * exp = &parameterlist->item(expindex);
  906. if (exp->getExpType() == Value_ExpressionType)
  907. {
  908. SQLValueExpression * currentvalplaceholder = static_cast<SQLValueExpression *>(exp);
  909. currentvalplaceholder->trimTextQuotes();
  910. xmlparams.appendf("<%s>", currentvalplaceholder->getPlaceHolderName());
  911. encodeXML(currentvalplaceholder->getValue(), xmlparams);
  912. xmlparams.appendf("</%s>", currentvalplaceholder->getPlaceHolderName());
  913. }
  914. else
  915. ESPLOG(LogNormal, "WsSQL: attempted to create XML params from unexpected expression type.");
  916. }
  917. xmlparams.append("</root>");
  918. DBGLOG("XML PARAMS: %s", xmlparams.str());
  919. }
  920. //Integrates all "variables" into "param" based xml
  921. void CwssqlEx::createWUXMLParams(StringBuffer & xmlparams, HPCCSQLTreeWalker* parsedSQL, IArrayOf<IConstNamedValue> *variables, IConstWorkUnit * cw)
  922. {
  923. IArrayOf<IConstWUResult> expectedparams;
  924. if (cw)
  925. {
  926. IConstWUResultIterator &vars = cw->getVariables();
  927. ForEach(vars)
  928. {
  929. IConstWUResult &cur = vars.query();
  930. expectedparams.append(cur);
  931. }
  932. }
  933. if (expectedparams.length() > 0)
  934. {
  935. int totalvars = 0;
  936. if (variables)
  937. totalvars = variables->length();
  938. if (parsedSQL && parsedSQL->getSqlType() == SQLTypeCall)
  939. {
  940. IArrayOf<ISQLExpression> * embeddedparams = NULL;
  941. if (parsedSQL)
  942. embeddedparams = parsedSQL->getStoredProcParamList();
  943. int parametersidx = 0;
  944. int varsidx = 0;
  945. SCMStringBuffer varname;
  946. if (embeddedparams && embeddedparams->length()>0)
  947. {
  948. xmlparams.append("<root>");
  949. for(int i=0; i < embeddedparams->length() && i < expectedparams.length(); i++)
  950. {
  951. expectedparams.item(i).getResultName(varname);
  952. xmlparams.append("<").append(varname.s.str()).append(">");
  953. ISQLExpression* paramcol = &embeddedparams->item(i);
  954. if (paramcol->getExpType() == ParameterPlaceHolder_ExpressionType)
  955. {
  956. if (varsidx < totalvars)
  957. {
  958. IConstNamedValue &item = variables->item(varsidx++);
  959. const char *value = item.getValue();
  960. if(value && *value)
  961. encodeXML(value, xmlparams);
  962. // else ??
  963. }
  964. }
  965. else
  966. {
  967. paramcol->toString(xmlparams,false);
  968. }
  969. xmlparams.append("</").append(varname.s.str()).append(">");
  970. }
  971. xmlparams.append("</root>");
  972. }
  973. }
  974. else
  975. {
  976. int parametersidx = 0;
  977. int varsidx = 0;
  978. SCMStringBuffer varname;
  979. xmlparams.append("<root>");
  980. for(int i=0; i < expectedparams.length() && i < totalvars; i++)
  981. {
  982. expectedparams.item(i).getResultName(varname);
  983. xmlparams.append("<").append(varname.s.str()).append(">");
  984. IConstNamedValue &item = variables->item(i);
  985. char * value = ((char *)item.getValue());
  986. if (value && *value)
  987. {
  988. while(value && isspace(*value)) //fast trim left
  989. value++;
  990. int len = strlen(value);
  991. while(len && isspace(value[len-1]))
  992. len--;
  993. value[len] = '\0';//fast trim right, even if len didn't change
  994. if (len >= 2)
  995. {
  996. //WU cloning mechanism doesn't handle quoted strings very well...
  997. //We're forced to blindly remove them here...
  998. if (value[0] == '\'' && value[len-1] == '\'')
  999. {
  1000. value[len-1] = '\0'; //clip rightmost quote
  1001. value++; //clip leftmost quote
  1002. }
  1003. }
  1004. if(len)
  1005. encodeXML(value, xmlparams);
  1006. // else ??
  1007. }
  1008. xmlparams.append("</").append(varname.s.str()).append(">");
  1009. }
  1010. xmlparams.append("</root>");
  1011. }
  1012. }
  1013. }
  1014. bool CwssqlEx::onExecutePreparedSQL(IEspContext &context, IEspExecutePreparedSQLRequest &req, IEspExecutePreparedSQLResponse &resp)
  1015. {
  1016. try
  1017. {
  1018. context.ensureFeatureAccess(WSSQLACCESS, SecAccess_Write, -1, "WsSQL::ExecutePreparedSQL: Permission denied.");
  1019. const char *cluster = req.getTargetCluster();
  1020. if (notEmpty(cluster) && !isValidCluster(cluster))
  1021. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", cluster);
  1022. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1023. StringBuffer runningWuId;
  1024. const char* parentWuId = req.getWuId();
  1025. Owned<IConstWorkUnit> cw = factory->openWorkUnit(parentWuId, false);
  1026. if (!cw)
  1027. throw MakeStringException(-1,"Cannot open workunit %s.", parentWuId);
  1028. WsWUExceptions errors(*cw);
  1029. if (errors.ErrCount()>0)
  1030. {
  1031. WsWuInfo winfo(context, parentWuId);
  1032. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  1033. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  1034. }
  1035. else
  1036. {
  1037. StringBuffer xmlparams;
  1038. createWUXMLParams(xmlparams, NULL, &req.getVariables(),cw);
  1039. if (parentWuId && *parentWuId)
  1040. {
  1041. cloneAndExecuteWU(context, parentWuId, runningWuId, xmlparams, NULL, NULL, cluster);
  1042. }
  1043. else
  1044. throw MakeStringException(ECLWATCH_MISSING_PARAMS,"Missing WuId");
  1045. int timeToWait = req.getWait();
  1046. if (timeToWait != 0)
  1047. waitForWorkUnitToComplete(runningWuId.str(), timeToWait);
  1048. Owned<IConstWorkUnit> cw = factory->openWorkUnit(runningWuId.str(), false);
  1049. if (!cw)
  1050. throw MakeStringException(-1,"Cannot open workunit %s.", runningWuId.str());
  1051. resp.setParentWuId(parentWuId);
  1052. __int64 resultWindowStart = req.getResultWindowStart();
  1053. __int64 resultWindowCount = req.getResultWindowCount();
  1054. if (resultWindowStart < 0 || resultWindowCount <0 )
  1055. throw MakeStringException(-1,"Invalid result window value");
  1056. if (!req.getSuppressResults())
  1057. {
  1058. StringBuffer result;
  1059. if (getWUResult(context, runningWuId.str(), result, (unsigned)resultWindowStart, (unsigned)resultWindowCount, 0, WSSQLRESULT, req.getSuppressXmlSchema() ? NULL : WSSQLRESULTSCHEMA))
  1060. resp.setResult(result.str());
  1061. }
  1062. WsWuInfo winfo(context, runningWuId);
  1063. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  1064. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  1065. winfo.getVariables(resp.updateWorkunit(), WUINFO_All);
  1066. }
  1067. }
  1068. catch(IException* e)
  1069. {
  1070. FORWARDEXCEPTION(context, e, ECLWATCH_INTERNAL_ERROR);
  1071. }
  1072. return true;
  1073. }
  1074. bool CwssqlEx::isQueryCached(const char * sqlQuery)
  1075. {
  1076. CriticalBlock block(critCache);
  1077. return (sqlQuery && cachedSQLQueries.find(sqlQuery) != cachedSQLQueries.end());
  1078. }
  1079. bool CwssqlEx::getCachedQuery(const char * sqlQuery, StringBuffer & wuid)
  1080. {
  1081. CriticalBlock block(critCache);
  1082. if(sqlQuery && cachedSQLQueries.find(sqlQuery) != cachedSQLQueries.end())
  1083. {
  1084. wuid.set(cachedSQLQueries.find(sqlQuery)->second.c_str());
  1085. return true;
  1086. }
  1087. return false;
  1088. }
  1089. void CwssqlEx::removeQueryFromCache(const char * sqlQuery)
  1090. {
  1091. CriticalBlock block(critCache);
  1092. cachedSQLQueries.erase(sqlQuery);
  1093. }
  1094. bool CwssqlEx::addQueryToCache(const char * sqlQuery, const char * wuid)
  1095. {
  1096. if (sqlQuery && *sqlQuery && wuid && *wuid)
  1097. {
  1098. CriticalBlock block(critCache);
  1099. if (isCacheExpired())
  1100. {
  1101. ESPLOG(LogNormal, "WsSQL: Query Cache has expired and is being flushed.");
  1102. //Flushing cache logic could have been in dedicated function, but
  1103. //putting it here makes this action more atomic, less synchronization concerns
  1104. cachedSQLQueries.clear();
  1105. setNewCacheFlushTime();
  1106. }
  1107. cachedSQLQueries.insert(std::pair<std::string,std::string>(sqlQuery, wuid));
  1108. }
  1109. return false;
  1110. }
  1111. bool CwssqlEx::onPrepareSQL(IEspContext &context, IEspPrepareSQLRequest &req, IEspPrepareSQLResponse &resp)
  1112. {
  1113. bool success = false;
  1114. StringBuffer sqltext;
  1115. StringBuffer ecltext;
  1116. bool clonable = false;
  1117. try
  1118. {
  1119. context.ensureFeatureAccess(WSSQLACCESS, SecAccess_Write, -1, "WsSQL::PrepareSQL: Permission denied.");
  1120. double version = context.getClientVersion();
  1121. StringBuffer username;
  1122. context.getUserID(username);
  1123. const char* passwd = context.queryPassword();
  1124. sqltext.set(req.getSqlText());
  1125. if (sqltext.length() <= 0)
  1126. throw MakeStringException(1,"Empty SQL request.");
  1127. Owned<HPCCSQLTreeWalker> parsedSQL;
  1128. parsedSQL.setown(parseSQL(context, sqltext, false));
  1129. if (parsedSQL->getSqlType() == SQLTypeCall)
  1130. {
  1131. if (strlen(parsedSQL->getQuerySetName())==0)
  1132. {
  1133. if (strlen(req.getTargetQuerySet())==0)
  1134. throw MakeStringException(-1,"Missing Target QuerySet.");
  1135. else
  1136. parsedSQL->setQuerySetName(req.getTargetQuerySet());
  1137. }
  1138. }
  1139. const char * cluster = req.getTargetCluster();
  1140. StringBuffer hashoptions;
  1141. if (version > 3.03)
  1142. {
  1143. StringArray & alternates = req.getAlternateClusters();
  1144. if (alternates.length() > 0)
  1145. processMultipleClusterOption(alternates, cluster, hashoptions);
  1146. }
  1147. StringBuffer xmlparams;
  1148. StringBuffer normalizedSQL(parsedSQL->getNormalizedSQL());
  1149. normalizedSQL.append(" | --TC=").append(cluster);
  1150. if (username.length() > 0)
  1151. normalizedSQL.append("--USER=").append(username.str());
  1152. if (hashoptions.length()>0)
  1153. normalizedSQL.append("--HO=").append(hashoptions.str());
  1154. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1155. SCMStringBuffer wuid;
  1156. if(getCachedQuery(normalizedSQL.str(), wuid.s))
  1157. {
  1158. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid.str(), false);
  1159. if (!cw)//cache hit but unavailable WU
  1160. {
  1161. removeQueryFromCache(normalizedSQL.str());
  1162. wuid.clear();
  1163. }
  1164. }
  1165. if(wuid.length()==0)
  1166. {
  1167. if (parsedSQL->getSqlType() == SQLTypeCall)
  1168. {
  1169. WsEclWuInfo wsinfo("", parsedSQL->getQuerySetName(), parsedSQL->getStoredProcName(), username.str(), passwd);
  1170. wuid.set(wsinfo.ensureWuid());
  1171. //if call somePublishedQuery(1,2,3);
  1172. // or
  1173. // someotherPublishedQuery(1,?)
  1174. //must clone published query wuid and set params
  1175. //else just return published query WUID
  1176. if (parsedSQL->getStoredProcParamListCount() > 0 && !parsedSQL->isParameterizedCall() )
  1177. throw MakeStringException(-1, "Prepared Call query must be fully parameterized");
  1178. //KEEP THIS AROUND IF WE WANT TO SUPPORT CLONING WU with embedded params
  1179. // if (parsedSQL->isParameterizedCall())
  1180. // {
  1181. // createXMLParams(xmlparams, parsedSQL, NULL, wsinfo.wu);
  1182. //
  1183. // SCMStringBuffer newwuid;
  1184. // NewWsWorkunit wu(context);
  1185. // wu->getWuid(newwuid);
  1186. // if (xmlparams && *xmlparams)
  1187. // wu->setXmlParams(xmlparams);
  1188. //
  1189. // WsWuProcess::copyWsWorkunit(context, *wu, wuid.s.str());
  1190. //
  1191. // //StringBuffer params;
  1192. // //toXML(wu->getXmlParams(), params, true, true);
  1193. // wu->setCloneable(true);
  1194. // wu->setAction(WUActionCompile);
  1195. //
  1196. // wu->commit();
  1197. // wu.clear();
  1198. //
  1199. // WsWuProcess::submitWsWorkunit(context, newwuid.str(), req.getTargetCluster(), NULL, 0, true, false, false, xmlparams.str(), NULL, NULL);
  1200. // waitForWorkUnitToCompile(newwuid.str(), req.getWait());
  1201. //
  1202. // wuid.s.set(newwuid.str());
  1203. // }
  1204. }
  1205. else
  1206. {
  1207. if (isEmpty(cluster))
  1208. throw MakeStringException(1,"Target cluster not set.");
  1209. if (!isValidCluster(cluster))
  1210. throw MakeStringException(-1/*ECLWATCH_INVALID_CLUSTER_NAME*/, "Invalid cluster name: %s", cluster);
  1211. ECLEngine::generateECL(parsedSQL, ecltext);
  1212. if (hashoptions.length() > 0)
  1213. ecltext.insert(0, hashoptions.str());
  1214. #if defined _DEBUG
  1215. fprintf(stderr, "GENERATED ECL:\n%s\n", ecltext.str());
  1216. #endif
  1217. if (isEmpty(ecltext))
  1218. throw MakeStringException(1,"Could not generate ECL from SQL.");
  1219. //ecltext.appendf("\n\n/****************************************************\nOriginal SQL: \"%s\"\nNormalized SQL: \"%s\"\n****************************************************/\n", sqltext.str(), normalizedSQL.str());
  1220. ecltext.appendf(EMBEDDEDSQLQUERYCOMMENT, sqltext.str(), normalizedSQL.str());
  1221. NewWsWorkunit wu(context);
  1222. wuid.set(wu->queryWuid());
  1223. wu->setClusterName(cluster);
  1224. wu->setCloneable(true);
  1225. wu->setAction(WUActionCompile);
  1226. wu.setQueryText(ecltext.str());
  1227. wu->setJobName("WsSQL PreparedQuery Job");
  1228. StringBuffer xmlparams;
  1229. createWUXMLParams(xmlparams, parsedSQL, NULL, NULL);
  1230. wu->commit();
  1231. wu.clear();
  1232. WsWuHelpers::submitWsWorkunit(context, wuid.str(), cluster, NULL, 0, true, false, false, xmlparams.str(), NULL, NULL);
  1233. success = waitForWorkUnitToCompile(wuid.str(), req.getWait());
  1234. }
  1235. if (success)
  1236. addQueryToCache(normalizedSQL.str(), wuid.s.str());
  1237. }
  1238. WsWuInfo winfo(context, wuid.str());
  1239. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  1240. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  1241. StringBuffer result;
  1242. getWUResult(context, wuid.str(), result, 0, 0, 0, WSSQLRESULT, WSSQLRESULTSCHEMA);
  1243. resp.setResult(result);
  1244. AuditSystemAccess(context.queryUserId(), true, "Updated %s", wuid.str());
  1245. }
  1246. catch(IException* e)
  1247. {
  1248. FORWARDEXCEPTION(context, e, -1);
  1249. }
  1250. return true;
  1251. }
  1252. bool CwssqlEx::executePublishedQueryByName(IEspContext &context, const char * queryset, const char * queryname, StringBuffer &clonedwuid, const char *paramXml, IArrayOf<IConstNamedValue> *variables, const char * targetcluster, int start, int count)
  1253. {
  1254. bool success = true;
  1255. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1256. if (factory.get())
  1257. {
  1258. StringBuffer wuid;
  1259. StringBuffer username;
  1260. context.getUserID(username);
  1261. const char* passwd = context.queryPassword();
  1262. WsEclWuInfo wsinfo(wuid, queryset, queryname, username.str(), passwd);
  1263. success = executePublishedQueryByWuId(context, wsinfo.ensureWuid(), clonedwuid, paramXml, variables, targetcluster, start, count);
  1264. }
  1265. else
  1266. success = false;
  1267. return success;
  1268. }
  1269. bool CwssqlEx::executePublishedQueryByWuId(IEspContext &context, const char * targetwuid, StringBuffer &clonedwuid, const char *paramXml, IArrayOf<IConstNamedValue> *variables, const char * targetcluster, int start, int count)
  1270. {
  1271. bool success = true;
  1272. if (targetwuid && *targetwuid)
  1273. {
  1274. success = cloneAndExecuteWU(context, targetwuid, clonedwuid, paramXml, variables, NULL, targetcluster);
  1275. /*
  1276. if (waittime != 0)
  1277. waitForWorkUnitToComplete(clonedwui.str(), waittime);
  1278. Owned<IConstWorkUnit> cw = factory->openWorkUnit(clonedwui.str(), false);
  1279. if (!cw)
  1280. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", clonedwui.str());
  1281. getWUResult(context, clonedwui.str(), resp, start, count);
  1282. */
  1283. }
  1284. else
  1285. success = false;
  1286. return success;
  1287. }
  1288. bool CwssqlEx::executePublishedQuery(IEspContext &context, const char * queryset, const char * queryname, StringBuffer &resp, int start, int count, int waittime)
  1289. {
  1290. bool success = true;
  1291. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1292. if (factory.get())
  1293. {
  1294. StringBuffer wuid;
  1295. StringBuffer username;
  1296. context.getUserID(username);
  1297. const char* passwd = context.queryPassword();
  1298. WsEclWuInfo wsinfo(wuid, queryset, queryname, username.str(), passwd);
  1299. StringBuffer clonedwui;
  1300. cloneAndExecuteWU(context, wsinfo.ensureWuid(), clonedwui, NULL, NULL, NULL, "");
  1301. if (waittime != 0)
  1302. waitForWorkUnitToComplete(clonedwui.str(), waittime);
  1303. Owned<IConstWorkUnit> cw = factory->openWorkUnit(clonedwui.str(), false);
  1304. if (!cw)
  1305. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", clonedwui.str());
  1306. getWUResult(context, clonedwui.str(), resp, start, count, 0, WSSQLRESULT, WSSQLRESULTSCHEMA);
  1307. }
  1308. else
  1309. success = false;
  1310. return success;
  1311. }
  1312. bool CwssqlEx::executePublishedQuery(IEspContext &context, const char * wuid, StringBuffer &resp, int start, int count, int waittime)
  1313. {
  1314. bool success = true;
  1315. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1316. if (factory.get())
  1317. {
  1318. StringBuffer username;
  1319. context.getUserID(username);
  1320. const char* passwd = context.queryPassword();
  1321. WsEclWuInfo wsinfo(wuid, "", "", username.str(), passwd);
  1322. StringBuffer clonedwui;
  1323. cloneAndExecuteWU(context, wsinfo.ensureWuid(), clonedwui, NULL, NULL, NULL, "");
  1324. if (waittime != 0)
  1325. waitForWorkUnitToComplete(clonedwui.str(), waittime);
  1326. Owned<IConstWorkUnit> cw = factory->openWorkUnit(clonedwui.str(), false);
  1327. if (!cw)
  1328. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open workunit %s.", clonedwui.str());
  1329. getWUResult(context, clonedwui.str(), resp, start, count, 0, WSSQLRESULT, WSSQLRESULTSCHEMA);
  1330. }
  1331. else
  1332. success = false;
  1333. return success;
  1334. }
  1335. bool CwssqlEx::cloneAndExecuteWU(IEspContext &context, const char * originalwuid, StringBuffer &clonedwuid, const char *paramXml, IArrayOf<IConstNamedValue> *variables, IArrayOf<IConstNamedValue> *debugs, const char * targetcluster)
  1336. {
  1337. bool success = true;
  1338. try
  1339. {
  1340. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1341. if (originalwuid && *originalwuid)
  1342. {
  1343. if (!looksLikeAWuid(originalwuid, 'W'))
  1344. throw MakeStringException(ECLWATCH_INVALID_INPUT, "Invalid Workunit ID: %s", originalwuid);
  1345. Owned<IConstWorkUnit> pwu = factory->openWorkUnit(originalwuid, false);
  1346. if (!pwu)
  1347. throw MakeStringException(-1,"Cannot open workunit %s.", originalwuid);
  1348. if (pwu->getExceptionCount()>0)
  1349. {
  1350. WsWUExceptions errors(*pwu);
  1351. if (errors.ErrCount()>0)
  1352. throw MakeStringException(-1,"Original query contains errors %s.", originalwuid);
  1353. }
  1354. StringBufferAdaptor isvWuid(clonedwuid);
  1355. WsWuHelpers::runWsWorkunit(
  1356. context,
  1357. clonedwuid,
  1358. originalwuid,
  1359. targetcluster,
  1360. paramXml,
  1361. variables,
  1362. debugs);
  1363. }
  1364. else
  1365. throw MakeStringException(ECLWATCH_MISSING_PARAMS,"Missing WuId");
  1366. }
  1367. catch(IException* e)
  1368. {
  1369. FORWARDEXCEPTION(context, e, -1);
  1370. }
  1371. return success;
  1372. }
  1373. bool CwssqlEx::onCreateTableAndLoad(IEspContext &context, IEspCreateTableAndLoadRequest &req, IEspCreateTableAndLoadResponse &resp)
  1374. {
  1375. context.ensureFeatureAccess(WSSQLACCESS, SecAccess_Write, -1, "WsSQL::CreateTableAndLoad: Permission denied.");
  1376. bool success = true;
  1377. const char * targetTableName = req.getTableName();
  1378. if (!targetTableName || !*targetTableName)
  1379. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: TableName cannot be empty.");
  1380. if (!HPCCFile::validateFileName(targetTableName))
  1381. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: Target TableName is invalid: %s.", targetTableName);
  1382. const char * cluster = req.getTargetCluster();
  1383. if (notEmpty(cluster) && !isValidCluster(cluster))
  1384. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "WsSQL::CreateTableAndLoad: Invalid cluster name: %s", cluster);
  1385. IConstDataSourceInfo & datasource = req.getDataSource();
  1386. StringBuffer sourceDataFileName;
  1387. sourceDataFileName.set(datasource.getSprayedFileName()).trim();
  1388. if (sourceDataFileName.length() == 0)
  1389. {
  1390. sourceDataFileName.set(datasource.getLandingZoneFileName());
  1391. if (sourceDataFileName.length() == 0)
  1392. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: Data Source File Name cannot be empty, provide either sprayed file name, or landing zone file name.");
  1393. const char * lzIP = datasource.getLandingZoneIP();
  1394. if (!lzIP || !*lzIP)
  1395. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: LandingZone IP cannot be empty if targeting a landing zone file.");
  1396. StringBuffer lzPath(datasource.getLandingZonePath());
  1397. if (!lzPath.length())
  1398. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: Landingzone path cannot be empty.");
  1399. addPathSepChar(lzPath);
  1400. RemoteFilename rfn;
  1401. SocketEndpoint ep(lzIP);
  1402. rfn.setPath(ep, lzPath.append(sourceDataFileName.str()).str());
  1403. CDfsLogicalFileName dlfn;
  1404. dlfn.setExternal(rfn);
  1405. dlfn.get(sourceDataFileName.clear(), false, false);
  1406. }
  1407. IConstDataType & format = req.getDataSourceType();
  1408. const char * formatname = "";
  1409. CHPCCFileType formattype = format.getType();
  1410. switch (formattype)
  1411. {
  1412. case CHPCCFileType_FLAT:
  1413. formatname = "FLAT";
  1414. break;
  1415. case CHPCCFileType_CSV:
  1416. formatname = "CSV";
  1417. break;
  1418. case CHPCCFileType_JSON:
  1419. formatname = "JSON";
  1420. break;
  1421. case CHPCCFileType_XML:
  1422. formatname = "XML";
  1423. break;
  1424. default:
  1425. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: Invalid file format detected.");
  1426. }
  1427. StringBuffer ecl;
  1428. StringBuffer recDef;
  1429. ecl.set("import std;\n");
  1430. {
  1431. IArrayOf<IConstEclFieldDeclaration>& eclFields = req.getEclFields();
  1432. if (eclFields.length() == 0)
  1433. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: Empty record definition detected.");
  1434. recDef.set("TABLERECORDDEF := RECORD\n");
  1435. ForEachItemIn(fieldindex, eclFields)
  1436. {
  1437. IConstEclFieldDeclaration &eclfield = eclFields.item(fieldindex);
  1438. IConstEclFieldType &ecltype = eclfield.getEclFieldType();
  1439. const char * name = "";
  1440. CHPCCFieldType format = ecltype.getType();
  1441. switch (format)
  1442. {
  1443. case CHPCCFieldType_BOOLEAN:
  1444. name = "BOOLEAN";
  1445. break;
  1446. case CHPCCFieldType_INTEGER:
  1447. name = "INTEGER";
  1448. break;
  1449. case CHPCCFieldType_xUNSIGNED:
  1450. name = "UNSIGNED";
  1451. break;
  1452. case CHPCCFieldType_REAL:
  1453. name = "REAL";
  1454. break;
  1455. case CHPCCFieldType_DECIMAL:
  1456. name = "DECIMAL";
  1457. break;
  1458. case CHPCCFieldType_xSTRING:
  1459. name = "STRING";
  1460. break;
  1461. case CHPCCFieldType_QSTRING:
  1462. name = "QSTRING";
  1463. break;
  1464. case CHPCCFieldType_UNICODE:
  1465. name = "UNICODE";
  1466. break;
  1467. case CHPCCFieldType_DATA:
  1468. name = "DATA";
  1469. break;
  1470. case CHPCCFieldType_VARSTRING:
  1471. name = "VARSTRING";
  1472. break;
  1473. case CHPCCFieldType_VARUNICODE:
  1474. name = "VARUNICODE";
  1475. break;
  1476. default:
  1477. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: Unrecognized field type detected.");
  1478. }
  1479. int len = ecltype.getLength();
  1480. const char * locale = ecltype.getLocale();
  1481. int precision = ecltype.getPrecision();
  1482. recDef.appendf("\t%s", name);
  1483. if (len > 0)
  1484. {
  1485. if(isdigit(recDef.charAt(recDef.length() - 1)))
  1486. recDef.append("_");
  1487. recDef.append(len);
  1488. }
  1489. if (locale && *locale)
  1490. recDef.append(locale);
  1491. if (precision > 0)
  1492. recDef.appendf("_%d", precision);
  1493. recDef.appendf(" %s;\n", eclfield.getFieldName());
  1494. }
  1495. recDef.append("END;\n");
  1496. }
  1497. ecl.append(recDef.str());
  1498. bool overwrite = req.getOverwrite();
  1499. StringBuffer formatnamefull(formatname);
  1500. IArrayOf<IConstDataTypeParam> & formatparams = format.getParams();
  1501. int formatparamscount = formatparams.length();
  1502. if (formatparamscount > 0 )
  1503. {
  1504. formatnamefull.append("(");
  1505. for (int paramindex = 0; paramindex < formatparamscount; paramindex++)
  1506. {
  1507. IConstDataTypeParam &paramitem = formatparams.item(paramindex);
  1508. const char * paramname = paramitem.getName();
  1509. if (!paramname || !*paramname)
  1510. throw MakeStringException(-1, "WsSQL::CreateTableAndLoad: Error: Format type '%s' appears to have unnamed parameter(s).", formatname);
  1511. StringArray & paramvalues = paramitem.getValues();
  1512. int paramvalueslen = paramvalues.length();
  1513. formatnamefull.appendf("%s(", paramname);
  1514. if (paramvalueslen > 1)
  1515. formatnamefull.append("[");
  1516. for (int paramvaluesindex = 0; paramvaluesindex < paramvalueslen; paramvaluesindex++)
  1517. {
  1518. formatnamefull.appendf("'%s'%s", paramvalues.item(paramvaluesindex), paramvaluesindex < paramvalueslen-1 ? "," : "");
  1519. }
  1520. if (paramvalueslen > 1)
  1521. formatnamefull.append("]");
  1522. formatnamefull.append(")");
  1523. if (paramindex < formatparamscount-1)
  1524. formatnamefull.append(",");
  1525. }
  1526. formatnamefull.append(")");
  1527. }
  1528. ecl.appendf("\nFILEDATASET := DATASET('~%s', TABLERECORDDEF, %s);\n",sourceDataFileName.str(), formatnamefull.str());
  1529. ecl.appendf("OUTPUT(FILEDATASET, ,'~%s'%s);", targetTableName, overwrite ? ", OVERWRITE" : "");
  1530. const char * description = req.getTableDescription();
  1531. if (description && * description)
  1532. ecl.appendf("\nStd.file.setfiledescription('~%s','%s')", targetTableName, description);
  1533. ESPLOG(LogMax, "WsSQL: creating new WU...");
  1534. NewWsWorkunit wu(context);
  1535. SCMStringBuffer compiledwuid;
  1536. compiledwuid.set(wu->queryWuid());
  1537. wu->setJobName("WsSQL Create table");
  1538. wu.setQueryText(ecl.str());
  1539. wu->setClusterName(cluster);
  1540. wu->setAction(WUActionCompile);
  1541. const char * wuusername = req.getOwner();
  1542. if (wuusername && *wuusername)
  1543. wu->setUser(wuusername);
  1544. wu->commit();
  1545. wu.clear();
  1546. ESPLOG(LogMax, "WsSQL: compiling WU...");
  1547. WsWuHelpers::submitWsWorkunit(context, compiledwuid.str(), cluster, NULL, 0, true, false, false, NULL, NULL, NULL);
  1548. waitForWorkUnitToCompile(compiledwuid.str(), req.getWait());
  1549. ESPLOG(LogMax, "WsSQL: finish compiling WU...");
  1550. ESPLOG(LogMax, "WsSQL: opening WU...");
  1551. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1552. Owned<IConstWorkUnit> cw = factory->openWorkUnit(compiledwuid.str(), false);
  1553. if (!cw)
  1554. throw MakeStringException(ECLWATCH_CANNOT_UPDATE_WORKUNIT,"Cannot open WorkUnit %s.", compiledwuid.str());
  1555. WsWUExceptions errors(*cw);
  1556. if (errors.ErrCount()>0)
  1557. {
  1558. WsWuInfo winfo(context, compiledwuid.str());
  1559. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  1560. success = false;
  1561. }
  1562. else
  1563. {
  1564. ESPLOG(LogMax, "WsSQL: executing WU(%s)...", compiledwuid.str());
  1565. WsWuHelpers::submitWsWorkunit(context, compiledwuid.str(), cluster, NULL, 0, false, true, true, NULL, NULL, NULL);
  1566. ESPLOG(LogMax, "WsSQL: waiting on WU(%s)...", compiledwuid.str());
  1567. waitForWorkUnitToComplete(compiledwuid.str(), req.getWait());
  1568. ESPLOG(LogMax, "WsSQL: finished waiting on WU(%s)...", compiledwuid.str());
  1569. Owned<IConstWorkUnit> rw = factory->openWorkUnit(compiledwuid.str(), false);
  1570. if (!rw)
  1571. throw MakeStringException(-1,"WsSQL: Cannot verify create and load request success.");
  1572. WsWuInfo winfo(context, compiledwuid.str());
  1573. WsWUExceptions errors(*rw);
  1574. if (errors.ErrCount() > 0 )
  1575. {
  1576. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  1577. success = false;
  1578. }
  1579. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  1580. resp.setSuccess(success);
  1581. resp.setEclRecordDefinition(recDef.str());
  1582. resp.setTableName(targetTableName);
  1583. }
  1584. return success;
  1585. }
  1586. bool CwssqlEx::onGetResults(IEspContext &context, IEspGetResultsRequest &req, IEspGetResultsResponse &resp)
  1587. {
  1588. context.ensureFeatureAccess(WSSQLACCESS, SecAccess_Read, -1, "WsSQL::GetResults: Permission denied.");
  1589. bool success = true;
  1590. const char* parentWuId = req.getWuId();
  1591. if (parentWuId && *parentWuId)
  1592. {
  1593. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1594. if (factory.get())
  1595. {
  1596. Owned<IConstWorkUnit> cw = factory->openWorkUnit(parentWuId, false);
  1597. if (!cw)
  1598. throw MakeStringException(-1,"Cannot open workunit %s.", parentWuId);
  1599. __int64 resultWindowStart = req.getResultWindowStart();
  1600. __int64 resultWindowCount = req.getResultWindowCount();
  1601. if (resultWindowStart < 0 || resultWindowCount <0 )
  1602. throw MakeStringException(-1,"Invalid result window value");
  1603. //resp.setResultLimit(resultLimit);
  1604. resp.setResultWindowCount((unsigned)resultWindowCount);
  1605. resp.setResultWindowStart((unsigned)resultWindowStart);
  1606. StringBuffer result;
  1607. if (getWUResult(context, parentWuId, result, (unsigned)resultWindowStart, (unsigned)resultWindowCount, 0, WSSQLRESULT, req.getSuppressXmlSchema() ? NULL : WSSQLRESULTSCHEMA))
  1608. resp.setResult(result.str());
  1609. WsWuInfo winfo(context, parentWuId);
  1610. winfo.getCommon(resp.updateWorkunit(), WUINFO_All);
  1611. winfo.getExceptions(resp.updateWorkunit(), WUINFO_All);
  1612. }
  1613. else
  1614. throw MakeStringException(-1,"Could not create WU factory object");
  1615. }
  1616. else
  1617. throw MakeStringException(-1,"Missing WuId");
  1618. return success;
  1619. }
  1620. void CwssqlEx::refreshValidClusters()
  1621. {
  1622. validClusters.kill();
  1623. Owned<IStringIterator> it = getTargetClusters(NULL, NULL);
  1624. ForEach(*it)
  1625. {
  1626. SCMStringBuffer s;
  1627. IStringVal &val = it->str(s);
  1628. if (!validClusters.getValue(val.str()))
  1629. validClusters.setValue(val.str(), true);
  1630. }
  1631. }
  1632. bool CwssqlEx::isValidCluster(const char *cluster)
  1633. {
  1634. if (!cluster || !*cluster)
  1635. return false;
  1636. CriticalBlock block(crit);
  1637. if (validClusters.getValue(cluster))
  1638. return true;
  1639. if (validateTargetClusterName(cluster))
  1640. {
  1641. refreshValidClusters();
  1642. return true;
  1643. }
  1644. return false;
  1645. }
  1646. bool CwssqlEx::publishWorkunit(IEspContext &context, const char * queryname, const char * wuid, const char * targetcluster)
  1647. {
  1648. Owned<IWorkUnitFactory> factory = getWorkUnitFactory(context.querySecManager(), context.queryUser());
  1649. Owned<IConstWorkUnit> cw = factory->openWorkUnit(wuid, false);
  1650. if (!cw)
  1651. throw MakeStringException(ECLWATCH_CANNOT_OPEN_WORKUNIT,"Cannot find the workunit %s", wuid);
  1652. SCMStringBuffer queryName;
  1653. if (notEmpty(queryname))
  1654. queryName.set(queryname);
  1655. else
  1656. queryName.set(cw->queryJobName());
  1657. if (!queryName.length())
  1658. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Query/Job name not defined for publishing workunit %s", wuid);
  1659. SCMStringBuffer target;
  1660. if (notEmpty(targetcluster))
  1661. target.set(targetcluster);
  1662. else
  1663. target.set(cw->queryClusterName());
  1664. if (!target.length())
  1665. throw MakeStringException(ECLWATCH_MISSING_PARAMS, "Cluster name not defined for publishing workunit %s", wuid);
  1666. if (!isValidCluster(target.str()))
  1667. throw MakeStringException(ECLWATCH_INVALID_CLUSTER_NAME, "Invalid cluster name: %s", target.str());
  1668. //RODRIGO this is needed:
  1669. //copyQueryFilesToCluster(context, cw, "", target.str(), queryName.str(), false);
  1670. WorkunitUpdate wu(&cw->lock());
  1671. wu->setJobName(queryName.str());
  1672. StringBuffer queryId;
  1673. addQueryToQuerySet(wu, target.str(), queryName.str(), MAKE_ACTIVATE, queryId, context.queryUserId());
  1674. wu->commit();
  1675. wu.clear();
  1676. return true;
  1677. }