rfspostgres.cpp 18 KB


  1. // POSTGRES RFS Gateway
  2. #ifdef _WIN32
  3. #define WIN32_LEAN_AND_MEAN
  4. #ifndef _CRT_SECURE_CPP_OVERLOAD_STANDARD_NAMES
  5. #define _CRT_SECURE_CPP_OVERLOAD_STANDARD_NAMES 1
  6. #undef _CRT_SECURE_NO_WARNINGS
  7. #define _CRT_SECURE_NO_WARNINGS 1
  8. #endif
  9. #undef UNICODE
  10. #include <windows.h>
  11. #include <io.h>
  12. #include <sys/utime.h>
  13. #include <sys/types.h>
  14. #include <ctype.h>
  15. #else
  16. #include <sys/types.h>
  17. #include <unistd.h>
  18. #define _strdup strdup
  19. #define _O_RDONLY O_RDONLY
  20. #define _O_WRONLY O_WRONLY
  21. #define _O_RDWR O_RDWR
  22. #define _O_CREAT O_CREAT
  23. #define _O_TRUNC O_TRUNC
  24. #define _O_BINARY (0)
  25. #define _open ::open
  26. #define _read ::read
  27. #define _write ::write
  28. #define _lseek ::lseek
  29. #define _close ::close
  30. #define _unlink unlink
  31. #define _tempnam tempnam
  32. #include <ctype.h>
  33. static int _memicmp (const void *s1, const void *s2, size_t len)
  34. {
  35. const unsigned char *b1 = (const unsigned char *)s1;
  36. const unsigned char *b2 = (const unsigned char *)s2;
  37. int ret = 0;
  38. while (len&&((ret = tolower(*b1)-tolower(*b2)) == 0)) {
  39. b1++;
  40. b2++;
  41. len--;
  42. }
  43. return ret;
  44. }
  45. static char *_itoa(unsigned long n, char *str, int b)
  46. {
  47. char *s = str;
  48. bool sign = false;
  49. if (n<0) {
  50. n = -n;
  51. sign = true;
  52. }
  53. do {
  54. char d = n % b;
  55. *(s++) = d+((d<10)?'0':('a'-10));
  56. }
  57. while ((n /= b) > 0);
  58. if (sign)
  59. *(s++) = '-';
  60. *s = '\0';
  61. // reverse
  62. char *s2 = str;
  63. s--;
  64. while (s2<s) {
  65. char tc = *s2;
  66. *(s2++) = *s;
  67. *(s--) = tc;
  68. }
  69. return str;
  70. }
  71. #endif
  72. #if defined(_M_X64) || defined ( __x86_64) || \
  73. defined(__aarch64__) || __WORDSIZE==64
  74. #define __64BIT__
  75. typedef unsigned long memsize_t;
  76. #else
  77. typedef unsigned memsize_t;
  78. #endif
  79. #include <stdlib.h>
  80. #include <stdio.h>
  81. #include <stdarg.h>
  82. #include <string.h>
  83. #include <errno.h>
  84. #include <assert.h>
  85. #include <time.h>
  86. #include <fcntl.h>
  87. #include <sys/stat.h>
  88. typedef unsigned int u_int;
  89. #ifdef SOCKET
  90. #undef SOCKET
  91. #endif
  92. typedef u_int SOCKET;
  93. #include <libpq-fe.h>
  94. #include "rfs.h"
  95. static char *tempdir=NULL;
  96. static int createTempFile(RFS_ServerBase &base,char *&name)
  97. {
  98. free(name);
  99. #ifdef _WIN32
  100. name = _tempnam(tempdir,"rfspgtmp");
  101. int ret = _open(name, _O_RDWR | _O_CREAT | _O_BINARY, _S_IREAD|_S_IWRITE);
  102. #else
  103. size_t ds = tempdir?strlen(tempdir):0;
  104. name = (char *)malloc(ds+32);
  105. if (ds) {
  106. memcpy(name,tempdir,ds);
  107. if (tempdir[ds-1]!='/')
  108. tempdir[ds++] = '/';
  109. }
  110. else
  111. *name = 0;
  112. strcat(name,"rfspg_XXXXXX");
  113. int ret = mkstemp(name);
  114. #endif
  115. if (ret==-1) {
  116. free(name);
  117. name = NULL;
  118. base.throwError(errno,"Creating temp file");
  119. }
  120. return ret;
  121. }
  122. #define RFSPOSTGRESERR_BASE 8300
  123. static void PostgresError(RFS_ServerBase &base,PGconn *pgconn,int errcode)
  124. {
  125. static char errstr[8192];
  126. char errnum[16];
  127. strcpy(errstr,"Postgres Error: ");
  128. const char *err = PQerrorMessage(pgconn);
  129. if (!err||!*err) {
  130. if (errcode) {
  131. _itoa(errcode,errnum,10);
  132. err = errnum;
  133. }
  134. else
  135. err = "Unspecified";
  136. }
  137. strcat(errstr,err);
  138. base.throwError(RFSPOSTGRESERR_BASE,errstr,false);
  139. }
  140. class CPostgresRFSconn: public RFS_ConnectionBase
  141. {
  142. RFS_ServerBase &base;
  143. char * query;
  144. byte querymode;
  145. bool eos;
  146. PGconn *pgconn;
  147. PGresult *res;
  148. rfs_fpos_t lastpos;
  149. unsigned long *lengths;
  150. unsigned numfields;
  151. unsigned numrows;
  152. unsigned currow;
  153. RFS_CSVwriter csvwriter;
  154. RFS_CSVreader csvreader;
  155. bool needlocalinput;
  156. bool multistatement;
  157. int tempfile;
  158. char * tempfilename;
  159. bool readok;
  160. bool writeok;
  161. rfs_fpos_t savesize;
  162. int localerr;
  163. void postgresError(int err)
  164. {
  165. ::PostgresError(base,pgconn,err);
  166. free(query);
  167. query = NULL;
  168. }
  169. void transformQuery(const char *in, bool forwrite)
  170. {
  171. if (*in=='/') // added by client
  172. in++;
  173. if (*in=='>') // added by client
  174. in++;
  175. free(query);
  176. query = NULL;
  177. needlocalinput = false;
  178. multistatement = false;
  179. bool istable = true;
  180. if (!in)
  181. return;
  182. query = NULL;
  183. RFS_SimpleString out;
  184. // NB only the first INFILE is transformed (deliberately)
  185. bool quoting = false;
  186. while (isspace((unsigned char)*in))
  187. in++;
  188. while (*in) {
  189. char c = *(in++);
  190. if (c=='\\') {
  191. out.appendc(c);
  192. c = *(in++);
  193. }
  194. else if (c=='\'') {
  195. quoting = !quoting;
  196. }
  197. else if (!quoting) {
  198. if (c==';') {
  199. unsigned i = 1;
  200. while (isspace((unsigned char)in[i])||(in[i]==';'))
  201. i++;
  202. if (!in[i])
  203. break; // e.g. trailing ;
  204. multistatement = true;
  205. }
  206. if (c==' ') {
  207. if (!needlocalinput&&(_memicmp(in,"INFILE []",9)==0)) {
  208. out.appends(" INFILE '#'");
  209. in+=9;
  210. needlocalinput = true;
  211. continue;
  212. }
  213. }
  214. }
  215. if (istable&&!isalnum((unsigned char)c)&&(c!='_')&&(c!='$'))
  216. istable = false;
  217. out.appendc(c);
  218. }
  219. out.trim();
  220. if (istable) {
  221. char *tablename = out.dup();
  222. out.clear();
  223. if (forwrite) {
  224. out.appends("COPY ");
  225. out.appends(tablename);
  226. out.appends(" FROM STDIN WITH CSV QUOTE AS '\''");
  227. }
  228. else {
  229. out.appends("SELECT * FROM ");
  230. out.appends(tablename);
  231. }
  232. }
  233. if (multistatement&&needlocalinput)
  234. base.throwError(EACCES,"Multi-statement not allowed here");
  235. query = out.dup();
  236. }
  237. bool getresult()
  238. {
  239. if (!query)
  240. return false;
  241. if (res)
  242. return (numrows!=0);
  243. if (base.debugLevel())
  244. base.log("Query: '%s'",query);
  245. numfields = 0;
  246. numrows = 0;
  247. currow = 0;
  248. res = PQexec(pgconn, query); // NB returns all results (i.e. only the last statement can return non-empty)
  249. ExecStatusType status = PQresultStatus(res);
  250. if (status == PGRES_TUPLES_OK) {
  251. if (res) {
  252. numfields = PQnfields(res);
  253. if (numfields)
  254. numrows = PQntuples(res);
  255. }
  256. status = PGRES_COMMAND_OK;
  257. }
  258. if (res&&!numrows) {
  259. PQclear(res);
  260. res = NULL;
  261. }
  262. if (status != PGRES_COMMAND_OK)
  263. postgresError((int)status);
  264. return numrows!=0;
  265. }
  266. void writeQuery()
  267. {
  268. if (!query||!savesize||(tempfile==-1))
  269. return;
  270. _close(tempfile);
  271. freopen(tempfilename, "rb", stdin);
  272. if (base.debugLevel())
  273. base.log("Query(w): '%s'",query);
  274. res = PQexec(pgconn, query); // NB returns all results (i.e. only the last statement can return non-empty)
  275. ExecStatusType status = PQresultStatus(res);
  276. if (status == PGRES_TUPLES_OK)
  277. status = PGRES_COMMAND_OK; // no return
  278. PQclear(res);
  279. res = NULL;
  280. fclose(stdin); // should reopen?
  281. if (status != PGRES_COMMAND_OK)
  282. postgresError((int)status);
  283. }
  284. public:
  285. CPostgresRFSconn(RFS_ServerBase &_base,PGconn *_pgconn)
  286. : base(_base)
  287. {
  288. pgconn = _pgconn;
  289. res = NULL;
  290. lastpos = 0;
  291. numfields = 0;
  292. numrows = 0;
  293. currow = 0;
  294. query = NULL;
  295. readok = false;
  296. writeok = false;
  297. needlocalinput = false;
  298. multistatement = false;
  299. eos = false;
  300. tempfilename = NULL;
  301. tempfile = -1;
  302. savesize = (rfs_fpos_t)-1;
  303. }
  304. ~CPostgresRFSconn()
  305. {
  306. close(true);
  307. }
  308. bool openRead(const char *_query)
  309. {
  310. transformQuery(_query,false);
  311. close(true);
  312. eos = false;
  313. readok = true;
  314. writeok = false;
  315. savesize = (rfs_fpos_t)-1;
  316. return getresult();
  317. }
  318. bool openWrite(const char *_query)
  319. {
  320. transformQuery(_query,true);
  321. close(true);
  322. eos = false;
  323. readok = false;
  324. writeok = true;
  325. savesize = (rfs_fpos_t)-1;
  326. return true;
  327. }
  328. void read(rfs_fpos_t pos, size_t len, size_t &outlen, void *out)
  329. {
  330. if (!readok)
  331. base.throwError(errno,"invalid mode for read");
  332. outlen = 0;
  333. if (pos==0) {
  334. currow = 0;
  335. csvwriter.rewrite();
  336. eos = false;
  337. lastpos = 0;
  338. }
  339. if (pos!=lastpos)
  340. base.throwError(EACCES,"Out of order read");
  341. if (eos||(csvwriter.length()>=len)) {
  342. if (len>csvwriter.length())
  343. len = csvwriter.length();
  344. if (out)
  345. memcpy(out,csvwriter.base(),len);
  346. csvwriter.consume(len);
  347. outlen = len;
  348. lastpos += len;
  349. return;
  350. }
  351. while (currow<numrows) {
  352. for (unsigned f=0;f<numfields;f++) {
  353. char * val = PQgetvalue(res, currow, f);
  354. if (val)
  355. csvwriter.putField(strlen(val),val);
  356. }
  357. currow++;
  358. csvwriter.putRow();
  359. if (csvwriter.length()>=len) {
  360. if (out)
  361. memcpy(out,csvwriter.base(),len);
  362. csvwriter.consume(len);
  363. outlen = len;
  364. lastpos += len;
  365. return;
  366. }
  367. }
  368. eos = true;
  369. outlen = csvwriter.length();
  370. if (outlen>len)
  371. outlen = len;
  372. if (out)
  373. memcpy(out,csvwriter.base(),outlen);
  374. csvwriter.consume(outlen);
  375. lastpos += outlen;
  376. savesize = lastpos+csvwriter.length();
  377. }
  378. rfs_fpos_t size()
  379. {
  380. if (savesize!=(rfs_fpos_t)-1)
  381. return savesize;
  382. if (lastpos!=0)
  383. base.throwError(EACCES,"Getting size mid-read");
  384. // multi (and not prev saved) then save to a temporary file
  385. // bit of a shame but ...
  386. rfs_fpos_t pos = 0;
  387. while (1) {
  388. size_t rd = 0;
  389. read(pos,0x10000000,rd,NULL);
  390. if (!rd)
  391. break;
  392. pos+=rd;
  393. if (rd<0x10000000)
  394. break;
  395. }
  396. currow = 0;
  397. eos = false;
  398. // we could reopen tempfile for read here
  399. lastpos = 0;
  400. savesize = pos;
  401. return pos;
  402. }
  403. void close(bool closetmp)
  404. {
  405. if(res) {
  406. PQclear(res);
  407. res = NULL;
  408. numfields = 0;
  409. }
  410. if (writeok)
  411. writeQuery();
  412. if (closetmp) {
  413. if (tempfile!=-1)
  414. _close(tempfile);
  415. if (tempfilename)
  416. _unlink(tempfilename);
  417. free(tempfilename);
  418. tempfilename = NULL;
  419. tempfile = -1;
  420. }
  421. }
  422. void close()
  423. {
  424. close(true);
  425. }
  426. void write(rfs_fpos_t pos, size_t len, const void *in)
  427. {
  428. if (!writeok)
  429. base.throwError(EACCES,"invalid mode for write");
  430. if (tempfile==-1)
  431. tempfile = createTempFile(base,tempfilename);
  432. long ret = (long)_lseek(tempfile,(long)pos,SEEK_SET);
  433. if (ret!=pos)
  434. base.throwError(errno,"write.1");
  435. int wr = _write(tempfile,in,len);
  436. if (wr==-1)
  437. base.throwError(errno,"write.2");
  438. if (wr!=len) // disk full
  439. base.throwError(ENOSPC,"write.3");
  440. if (pos+wr>savesize)
  441. savesize = pos+wr;
  442. }
  443. };
  444. class CPostgresRFS: public RFS_ServerBase
  445. {
  446. PGconn *pgconn;
  447. unsigned lastping;
  448. char *conninfo;
  449. public:
  450. CPostgresRFS()
  451. {
  452. tempdir = NULL;
  453. lastping = (unsigned)time(NULL);
  454. pgconn = NULL;
  455. conninfo = NULL;
  456. }
  457. ~CPostgresRFS()
  458. {
  459. if (pgconn)
  460. PQfinish(pgconn);
  461. free(tempdir);
  462. free(conninfo);
  463. }
  464. virtual RFS_ConnectionBase * open(const char *name, byte mode, byte share)
  465. {
  466. if (lastping-(unsigned)time(NULL)>60*30) { // ping every 30m
  467. // ping TBD
  468. lastping = (unsigned)time(NULL);
  469. }
  470. CPostgresRFSconn *conn = new CPostgresRFSconn(*this,pgconn);
  471. if ((*name=='/')||(*name=='\\'))
  472. *name++;
  473. if ((mode&RFS_OPEN_MODE_MASK)==RFS_OPEN_MODE_READ) {
  474. if (conn->openRead(name))
  475. return conn;
  476. }
  477. else if ((mode&RFS_OPEN_MODE_MASK)==RFS_OPEN_MODE_CREATE) {
  478. if (conn->openWrite(name))
  479. return conn;
  480. }
  481. else
  482. throwError(EACCES,"Open mode not supported");
  483. // error TBD?
  484. delete conn;
  485. return NULL;
  486. }
  487. virtual void existFile(const char *filename, bool &existsout)
  488. {
  489. existsout = true; // assume exists (query may fail but that will be an error)
  490. }
  491. virtual void removeFile(const char *filename)
  492. {
  493. // error TBD
  494. }
  495. virtual void renameFile(const char *fromname,const char *toname)
  496. {
  497. // error TBD
  498. }
  499. virtual void getFileTime(const char *filename, time_t &outaccessedtime, time_t &outcreatedtime, time_t &outmodifiedtime)
  500. {
  501. time(&outaccessedtime);
  502. outcreatedtime = outaccessedtime;
  503. outmodifiedtime = outaccessedtime;
  504. // bit odd that changes...
  505. // Alternative would be to keep a past query cache (i.e. when done) but that isn't really ideal.
  506. }
  507. virtual void setFileTime(const char *filename, time_t *outaccessedtime, time_t *outcreatedtime, time_t *outmodifiedtime) // params NULL if not to be set
  508. {
  509. // ignore
  510. }
  511. virtual void isFile(const char *filename, bool &outisfile)
  512. {
  513. outisfile = true; // pretend we are a file
  514. }
  515. virtual void isDir(const char *filename, bool &outisdir)
  516. {
  517. outisdir = true; // we aren't a directory
  518. }
  519. virtual void isReadOnly(const char *filename, bool &outisreadonly)
  520. {
  521. outisreadonly = true; // no update supported currently
  522. }
  523. virtual void setReadOnly(const char *filename, bool readonly)
  524. {
  525. // ignore
  526. }
  527. virtual void createDir(const char *dirname,bool &createdout)
  528. {
  529. // ignore
  530. }
  531. virtual void openDir(const char *dirname, const char *mask, bool recursive, bool includedir, void * &outhandle)
  532. {
  533. // TBD table list
  534. outhandle = NULL;
  535. }
  536. virtual void nextDirEntry(void * handle, size_t maxfilenamesize, char *outfilename, bool &isdir, rfs_fpos_t &filesizeout, time_t &outmodifiedtime) // empty return for filename marks end
  537. {
  538. // TBD return table
  539. outfilename[0] = 0;
  540. }
  541. virtual void closeDir(void * handle)
  542. {
  543. // TBD
  544. }
  545. virtual void getVersion(size_t programnamemax, char *programname, short &version)
  546. {
  547. assert(programnamemax>sizeof("rfspg")+1);
  548. strcpy(programname,"rfspg");
  549. version = 1;
  550. }
  551. void param(RFS_SimpleString &out,const char *name,const char *val)
  552. {
  553. if (val&&*val) {
  554. out.appends(name);
  555. out.appends(" = '");
  556. out.appends(val);
  557. out.appends("' ");
  558. }
  559. }
  560. int run(const char *server,const char *user,const char *password,const char *db, const char *pgport, const char *pgopt, char *_tempdir)
  561. {
  562. if (_tempdir[0])
  563. tempdir = _strdup(_tempdir);
  564. RFS_SimpleString out;
  565. param(out,"hostaddr",server);
  566. param(out,"port",pgport);
  567. param(out,"dbname",db);
  568. param(out,"user",user);
  569. param(out,"password",password);
  570. out.appends(pgopt);
  571. out.trim();
  572. conninfo = out.dup();
  573. out.clear();
  574. pgconn = PQconnectdb(conninfo);
  575. if (PQstatus(pgconn) != CONNECTION_OK) {
  576. fprintf(stderr, "Connection to database failed: %s", PQerrorMessage(pgconn));
  577. return 1;
  578. }
  579. return RFS_ServerBase::run();
  580. }
  581. };
  582. void usage()
  583. {
  584. printf("rfspg --port=<port>\n");
  585. printf(" --pgserver=<postgresep> --pgport=<port>\n");
  586. printf(" --user=<username> --password=<password>\n");
  587. printf(" --db=<database>\n");
  588. printf(" --pgopt=<extra-params> -- for specifying other server options\n");
  589. printf(" --tempdir=<dirname> -- directory for temporary files\n");
  590. }
  591. bool checkparam(const char *param,const char *name,char *out,size_t size)
  592. {
  593. if ((param[0]!='-')||(param[0]!='-'))
  594. return false;
  595. param+=2;
  596. if (strncmp(param,name,strlen(name))==0) {
  597. const char *v = param+strlen(name);
  598. if (*v=='=') {
  599. if (strlen(v+1)>size-1) {
  600. fprintf(stderr,"parameter %s to large (> %d chars)",param,size-1);
  601. exit(1);
  602. }
  603. strcpy(out,v+1);
  604. return true;
  605. }
  606. }
  607. return false;
  608. }
  609. int main(int argc, const char **argv)
  610. {
  611. #ifdef _WIN32
  612. // for windows service must be static (main returns)
  613. static
  614. #endif
  615. CPostgresRFS rfsserver;
  616. if (!rfsserver.init(argc,argv)) {
  617. usage();
  618. return 1;
  619. }
  620. char server[256];
  621. char user[256];
  622. char password[256];
  623. char db[256];
  624. char pgport[32];
  625. char pgopt[1024];
  626. char tempdir[256];
  627. strcpy(server,"localhost");
  628. user[0] = 0;
  629. password[0] = 0;
  630. pgport[0] = 0;
  631. pgopt[0] = 0;
  632. db[0] = 0;
  633. tempdir[0] = 0;
  634. for (int i=1;i<argc;i++) {
  635. if (checkparam(argv[i],"pgserver",server,sizeof(server))) continue;
  636. if (checkparam(argv[i],"user",user,sizeof(user))) continue;
  637. if (checkparam(argv[i],"password",password,sizeof(password))) continue;
  638. if (checkparam(argv[i],"db",db,sizeof(db))) continue;
  639. if (checkparam(argv[i],"pgport",pgport,sizeof(pgport))) continue;
  640. if (checkparam(argv[i],"pgopt",pgport,sizeof(pgopt))) continue;
  641. if (checkparam(argv[i],"tempdir",tempdir,sizeof(tempdir))) continue;
  642. }
  643. if (!db) {
  644. usage();
  645. return 0;
  646. }
  647. return rfsserver.run(server,user,password,db,pgport,pgopt,tempdir);
  648. }