Rembed.cpp 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559
  1. /*##############################################################################
  2. HPCC SYSTEMS software Copyright (C) 2012 HPCC Systems.
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. ############################################################################## */
  13. #include "platform.h"
  14. #include "RInside.h"
  15. #include "jexcept.hpp"
  16. #include "jthread.hpp"
  17. #include "hqlplugins.hpp"
  18. #include "deftype.hpp"
  19. #include "eclrtl.hpp"
  20. #include "eclrtl_imp.hpp"
  21. #ifdef _WIN32
  22. #define EXPORT __declspec(dllexport)
  23. #else
  24. #define EXPORT
  25. #endif
  26. static const char * compatibleVersions[] =
  27. { "R Embed Helper 1.0.0", NULL };
  28. static const char *version = "R Embed Helper 1.0.0";
  29. extern "C" EXPORT bool getECLPluginDefinition(ECLPluginDefinitionBlock *pb)
  30. {
  31. if (pb->size == sizeof(ECLPluginDefinitionBlockEx))
  32. {
  33. ECLPluginDefinitionBlockEx * pbx = (ECLPluginDefinitionBlockEx *) pb;
  34. pbx->compatibleVersions = compatibleVersions;
  35. }
  36. else if (pb->size != sizeof(ECLPluginDefinitionBlock))
  37. return false;
  38. pb->magicVersion = PLUGIN_VERSION;
  39. pb->version = version;
  40. pb->moduleName = "+R+"; // Hack - we don't want to export any ECL, but if we don't export something,
  41. pb->ECL = ""; // Hack - the dll is unloaded at startup when compiling, and the R runtime closes stdin when unloaded
  42. pb->flags = PLUGIN_MULTIPLE_VERSIONS;
  43. pb->description = "R Embed Helper";
  44. return true;
  45. }
  46. #ifdef _WIN32
  47. EXTERN_C IMAGE_DOS_HEADER __ImageBase;
  48. #endif
  49. namespace Rembed
  50. {
  51. // Use a global object to ensure that the R instance is initialized only once
  52. // Because of R's dodgy stack checks, we also have to do so on main thread
  53. static class RGlobalState
  54. {
  55. public:
  56. RGlobalState()
  57. {
  58. const char *args[] = {"R", "--slave" };
  59. R = new RInside(2, args, true, false, true); // Setting interactive mode=true prevents R syntax errors from terminating the process
  60. // The R code for checking stack limits assumes that all calls are on the same thread
  61. // as the original context was created on - this will not always be true in ECL (and hardly
  62. // ever true in Roxie
  63. // Setting the stack limit to -1 disables this check
  64. R_CStackLimit = -1;
  65. // Make sure we are never unloaded (as R does not support it)
  66. // we do this by doing a dynamic load of the Rembed library
  67. #ifdef _WIN32
  68. char path[_MAX_PATH];
  69. ::GetModuleFileName((HINSTANCE)&__ImageBase, path, _MAX_PATH);
  70. if (strstr(path, "Rembed"))
  71. {
  72. HINSTANCE h = LoadSharedObject(path, false, false);
  73. DBGLOG("LoadSharedObject returned %p", h);
  74. }
  75. #else
  76. FILE *diskfp = fopen("/proc/self/maps", "r");
  77. if (diskfp)
  78. {
  79. char ln[_MAX_PATH];
  80. while (fgets(ln, sizeof(ln), diskfp))
  81. {
  82. if (strstr(ln, "libRembed"))
  83. {
  84. const char *fullName = strchr(ln, '/');
  85. if (fullName)
  86. {
  87. char *tail = (char *) strstr(fullName, SharedObjectExtension);
  88. if (tail)
  89. {
  90. tail[strlen(SharedObjectExtension)] = 0;
  91. HINSTANCE h = LoadSharedObject(fullName, false, false);
  92. break;
  93. }
  94. }
  95. }
  96. }
  97. fclose(diskfp);
  98. }
  99. #endif
  100. }
  101. ~RGlobalState()
  102. {
  103. delete R;
  104. }
  105. RInside *R;
  106. }* globalState = NULL;
  107. static CriticalSection RCrit; // R is single threaded - need to own this before making any call to R
  108. static RGlobalState *queryGlobalState()
  109. {
  110. CriticalBlock b(RCrit);
  111. if (!globalState)
  112. globalState = new RGlobalState;
  113. return globalState;
  114. }
  115. extern void unload()
  116. {
  117. CriticalBlock b(RCrit);
  118. if (globalState)
  119. delete globalState;
  120. globalState = NULL;
  121. }
  122. MODULE_INIT(INIT_PRIORITY_STANDARD)
  123. {
  124. queryGlobalState(); // make sure gets loaded by main thread
  125. return true;
  126. }
  127. MODULE_EXIT()
  128. {
  129. // Don't unload, because R seems to have problems with being reloaded, i.e. crashes on next use
  130. // unload();
  131. }
  132. // Each call to a R function will use a new REmbedFunctionContext object
  133. // This takes care of ensuring that the critsec is locked while we are executing R code,
  134. // and released when we are not
  135. class REmbedFunctionContext: public CInterfaceOf<IEmbedFunctionContext>
  136. {
  137. public:
  138. REmbedFunctionContext(RInside &_R, const char *options)
  139. : R(_R), block(RCrit), result(R_NilValue)
  140. {
  141. }
  142. ~REmbedFunctionContext()
  143. {
  144. }
  145. virtual bool getBooleanResult()
  146. {
  147. try
  148. {
  149. return ::Rcpp::as<bool>(result);
  150. }
  151. catch (std::runtime_error &E)
  152. {
  153. rtlFail(0, E.what());
  154. }
  155. }
  156. virtual void getDataResult(size32_t &__len, void * &__result)
  157. {
  158. try
  159. {
  160. std::vector<byte> vval = ::Rcpp::as<std::vector<byte> >(result);
  161. rtlStrToDataX(__len, __result, vval.size(), vval.data());
  162. }
  163. catch (std::runtime_error &E)
  164. {
  165. rtlFail(0, E.what());
  166. }
  167. }
  168. virtual double getRealResult()
  169. {
  170. try
  171. {
  172. return ::Rcpp::as<double>(result);
  173. }
  174. catch (std::runtime_error &E)
  175. {
  176. rtlFail(0, E.what());
  177. }
  178. }
  179. virtual __int64 getSignedResult()
  180. {
  181. try
  182. {
  183. return ::Rcpp::as<long int>(result); // Should really be long long, but RInside does not support that
  184. }
  185. catch (std::runtime_error &E)
  186. {
  187. rtlFail(0, E.what());
  188. }
  189. }
  190. virtual unsigned __int64 getUnsignedResult()
  191. {
  192. try
  193. {
  194. return ::Rcpp::as<unsigned long int>(result); // Should really be long long, but RInside does not support that
  195. }
  196. catch (std::runtime_error &E)
  197. {
  198. rtlFail(0, E.what());
  199. }
  200. }
  201. virtual void getStringResult(size32_t &__len, char * &__result)
  202. {
  203. try
  204. {
  205. std::string str = ::Rcpp::as<std::string>(result);
  206. rtlStrToStrX(__len, __result, str.length(), str.data());
  207. }
  208. catch (std::runtime_error &E)
  209. {
  210. rtlFail(0, E.what());
  211. }
  212. }
  213. virtual void getUTF8Result(size32_t &chars, char * &result)
  214. {
  215. throw MakeStringException(MSGAUD_user, 0, "Rembed: %s: Unicode/UTF8 results not supported", func.c_str());
  216. }
  217. virtual void getUnicodeResult(size32_t &chars, UChar * &result)
  218. {
  219. throw MakeStringException(MSGAUD_user, 0, "Rembed: %s: Unicode/UTF8 results not supported", func.c_str());
  220. }
  221. virtual void getSetResult(bool & __isAllResult, size32_t & __resultBytes, void * & __result, int _elemType, size32_t elemSize)
  222. {
  223. try
  224. {
  225. type_t elemType = (type_t) _elemType;
  226. __isAllResult = false;
  227. switch(elemType)
  228. {
  229. #define FETCH_ARRAY(type) \
  230. { \
  231. std::vector<type> vval = ::Rcpp::as< std::vector<type> >(result); \
  232. rtlStrToDataX(__resultBytes, __result, vval.size()*elemSize, (const void *) vval.data()); \
  233. }
  234. case type_boolean:
  235. {
  236. std::vector<bool> vval = ::Rcpp::as< std::vector<bool> >(result);
  237. size32_t size = vval.size();
  238. // Vector of bool is odd, and can't be retrieved via data()
  239. // Instead we need to iterate, I guess
  240. rtlDataAttr out(size);
  241. bool *outData = (bool *) out.getdata();
  242. for (std::vector<bool>::iterator iter = vval.begin(); iter < vval.end(); iter++)
  243. {
  244. *outData++ = *iter;
  245. }
  246. __resultBytes = size;
  247. __result = out.detachdata();
  248. break;
  249. }
  250. case type_int:
  251. /* if (elemSize == sizeof(signed char)) // rcpp does not seem to support...
  252. FETCH_ARRAY(signed char)
  253. else */ if (elemSize == sizeof(short))
  254. FETCH_ARRAY(short)
  255. else if (elemSize == sizeof(int))
  256. FETCH_ARRAY(int)
  257. else if (elemSize == sizeof(long)) // __int64 / long long does not work...
  258. FETCH_ARRAY(long)
  259. else
  260. rtlFail(0, "Rembed: Unsupported result type");
  261. break;
  262. case type_unsigned:
  263. if (elemSize == sizeof(byte))
  264. FETCH_ARRAY(byte)
  265. else if (elemSize == sizeof(unsigned short))
  266. FETCH_ARRAY(unsigned short)
  267. else if (elemSize == sizeof(unsigned int))
  268. FETCH_ARRAY(unsigned int)
  269. else if (elemSize == sizeof(unsigned long)) // __int64 / long long does not work...
  270. FETCH_ARRAY(unsigned long)
  271. else
  272. rtlFail(0, "Rembed: Unsupported result type");
  273. break;
  274. case type_real:
  275. if (elemSize == sizeof(float))
  276. FETCH_ARRAY(float)
  277. else if (elemSize == sizeof(double))
  278. FETCH_ARRAY(double)
  279. else
  280. rtlFail(0, "Rembed: Unsupported result type");
  281. break;
  282. case type_string:
  283. case type_varstring:
  284. {
  285. std::vector<std::string> vval = ::Rcpp::as< std::vector<std::string> >(result);
  286. size32_t numResults = vval.size();
  287. rtlRowBuilder out;
  288. byte *outData = NULL;
  289. size32_t outBytes = 0;
  290. if (elemSize != UNKNOWN_LENGTH)
  291. {
  292. outBytes = numResults * elemSize; // MORE - check for overflow?
  293. out.ensureAvailable(outBytes);
  294. outData = out.getbytes();
  295. }
  296. for (std::vector<std::string>::iterator iter = vval.begin(); iter < vval.end(); iter++)
  297. {
  298. size32_t lenBytes = (*iter).size();
  299. const char *text = (*iter).data();
  300. if (elemType == type_string)
  301. {
  302. if (elemSize == UNKNOWN_LENGTH)
  303. {
  304. out.ensureAvailable(outBytes + lenBytes + sizeof(size32_t));
  305. outData = out.getbytes() + outBytes;
  306. * (size32_t *) outData = lenBytes;
  307. rtlStrToStr(lenBytes, outData+sizeof(size32_t), lenBytes, text);
  308. outBytes += lenBytes + sizeof(size32_t);
  309. }
  310. else
  311. {
  312. rtlStrToStr(elemSize, outData, lenBytes, text);
  313. outData += elemSize;
  314. }
  315. }
  316. else
  317. {
  318. if (elemSize == UNKNOWN_LENGTH)
  319. {
  320. out.ensureAvailable(outBytes + lenBytes + 1);
  321. outData = out.getbytes() + outBytes;
  322. rtlStrToVStr(0, outData, lenBytes, text);
  323. outBytes += lenBytes + 1;
  324. }
  325. else
  326. {
  327. rtlStrToVStr(elemSize, outData, lenBytes, text); // Fixed size null terminated strings... weird.
  328. outData += elemSize;
  329. }
  330. }
  331. }
  332. __resultBytes = outBytes;
  333. __result = out.detachdata();
  334. break;
  335. }
  336. default:
  337. rtlFail(0, "REmbed: Unsupported result type");
  338. break;
  339. }
  340. }
  341. catch (std::runtime_error &E)
  342. {
  343. rtlFail(0, E.what());
  344. }
  345. }
  346. virtual IRowStream *getDatasetResult(IEngineRowAllocator * _resultAllocator)
  347. {
  348. UNIMPLEMENTED;
  349. }
  350. virtual byte * getRowResult(IEngineRowAllocator * _resultAllocator)
  351. {
  352. UNIMPLEMENTED;
  353. }
  354. virtual size32_t getTransformResult(ARowBuilder & builder)
  355. {
  356. UNIMPLEMENTED;
  357. }
  358. virtual void bindBooleanParam(const char *name, bool val)
  359. {
  360. R[name] = val;
  361. }
  362. virtual void bindDataParam(const char *name, size32_t len, const void *val)
  363. {
  364. std::vector<byte> vval;
  365. const byte *cval = (const byte *) val;
  366. vval.assign(cval, cval+len);
  367. R[name] = vval;
  368. }
  369. virtual void bindRealParam(const char *name, double val)
  370. {
  371. R[name] = val;
  372. }
  373. virtual void bindSignedParam(const char *name, __int64 val)
  374. {
  375. R[name] = (long int) val;
  376. }
  377. virtual void bindUnsignedParam(const char *name, unsigned __int64 val)
  378. {
  379. R[name] = (unsigned long int) val;
  380. }
  381. virtual void bindStringParam(const char *name, size32_t len, const char *val)
  382. {
  383. std::string s(val, len);
  384. R[name] = s;
  385. }
  386. virtual void bindVStringParam(const char *name, const char *val)
  387. {
  388. R[name] = val;
  389. }
  390. virtual void bindUTF8Param(const char *name, size32_t chars, const char *val)
  391. {
  392. rtlFail(0, "Rembed: Unsupported parameter type UTF8");
  393. }
  394. virtual void bindUnicodeParam(const char *name, size32_t chars, const UChar *val)
  395. {
  396. rtlFail(0, "Rembed: Unsupported parameter type UNICODE");
  397. }
  398. virtual void bindSetParam(const char *name, int _elemType, size32_t elemSize, bool isAll, size32_t totalBytes, void *setData)
  399. {
  400. if (isAll)
  401. rtlFail(0, "Rembed: Unsupported parameter type ALL");
  402. type_t elemType = (type_t) _elemType;
  403. int numElems = totalBytes / elemSize;
  404. switch(elemType)
  405. {
  406. #define BIND_ARRAY(type) \
  407. { \
  408. std::vector<type> vval; \
  409. const type *start = (const type *) setData; \
  410. vval.assign(start, start+numElems); \
  411. R[name] = vval; \
  412. }
  413. case type_boolean:
  414. BIND_ARRAY(bool)
  415. break;
  416. case type_int:
  417. /* if (elemSize == sizeof(signed char)) // No binding exists in rcpp
  418. BIND_ARRAY(signed char)
  419. else */ if (elemSize == sizeof(short))
  420. BIND_ARRAY(short)
  421. else if (elemSize == sizeof(int))
  422. BIND_ARRAY(int)
  423. else if (elemSize == sizeof(long)) // __int64 / long long does not work...
  424. BIND_ARRAY(long)
  425. else
  426. rtlFail(0, "Rembed: Unsupported parameter type");
  427. break;
  428. case type_unsigned:
  429. if (elemSize == sizeof(unsigned char))
  430. BIND_ARRAY(unsigned char)
  431. else if (elemSize == sizeof(unsigned short))
  432. BIND_ARRAY(unsigned short)
  433. else if (elemSize == sizeof(unsigned int))
  434. BIND_ARRAY(unsigned int)
  435. else if (elemSize == sizeof(unsigned long)) // __int64 / long long does not work...
  436. BIND_ARRAY(unsigned long)
  437. else
  438. rtlFail(0, "Rembed: Unsupported parameter type");
  439. break;
  440. case type_real:
  441. if (elemSize == sizeof(float))
  442. BIND_ARRAY(float)
  443. else if (elemSize == sizeof(double))
  444. BIND_ARRAY(double)
  445. else
  446. rtlFail(0, "Rembed: Unsupported parameter type");
  447. break;
  448. case type_string:
  449. case type_varstring:
  450. {
  451. std::vector<std::string> vval;
  452. const byte *inData = (const byte *) setData;
  453. const byte *endData = inData + totalBytes;
  454. while (inData < endData)
  455. {
  456. int thisSize;
  457. if (elemSize == UNKNOWN_LENGTH)
  458. {
  459. if (elemType==type_varstring)
  460. thisSize = strlen((const char *) inData) + 1;
  461. else
  462. {
  463. thisSize = * (size32_t *) inData;
  464. inData += sizeof(size32_t);
  465. }
  466. }
  467. else
  468. thisSize = elemSize;
  469. std::string s((const char *) inData, thisSize);
  470. vval.push_back(s);
  471. inData += thisSize;
  472. numElems++;
  473. }
  474. R[name] = vval;
  475. break;
  476. }
  477. default:
  478. rtlFail(0, "REmbed: Unsupported parameter type");
  479. break;
  480. }
  481. }
  482. virtual void importFunction(size32_t lenChars, const char *utf)
  483. {
  484. throwUnexpected();
  485. }
  486. virtual void compileEmbeddedScript(size32_t lenChars, const char *utf)
  487. {
  488. StringBuffer text(rtlUtf8Size(lenChars, utf), utf);
  489. text.stripChar('\r');
  490. func.assign(text.str());
  491. }
  492. virtual void callFunction()
  493. {
  494. try
  495. {
  496. result = R.parseEval(func);
  497. }
  498. catch (std::runtime_error &E)
  499. {
  500. rtlFail(0, E.what());
  501. }
  502. }
  503. private:
  504. RInside &R;
  505. RInside::Proxy result;
  506. std::string func;
  507. CriticalBlock block;
  508. };
  509. class REmbedContext: public CInterfaceOf<IEmbedContext>
  510. {
  511. public:
  512. virtual IEmbedFunctionContext *createFunctionContext(bool isImport, const char *options)
  513. {
  514. return new REmbedFunctionContext(*queryGlobalState()->R, options);
  515. }
  516. };
  517. extern IEmbedContext* getEmbedContext()
  518. {
  519. return new REmbedContext;
  520. }
  // Syntax check hook for embedded R scripts.
  // MORE - no checking is implemented yet: every script is reported as valid.
  extern bool syntaxCheck(const char *script)
  {
      const bool accepted = true;
      return accepted;
  }
  525. } // namespace