db.test.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. #!/usr/bin/env python
  2. ############################################################################
  3. #
  4. # MODULE: db.test
  5. # AUTHOR(S): Radim Blazek
  6. # Converted to Python by Glynn Clements
  7. # PURPOSE: Test database driver
  8. # COPYRIGHT: (C) 2004-2014 by the GRASS Development Team
  9. #
  10. # This program is free software under the GNU General Public
  11. # License (version 2). Read the file COPYING that comes with GRASS
  12. # for details.
  13. #
  14. #############################################################################
  15. #%module
  16. #% description: Test database driver, database must exist and set by db.connect.
  17. #% keyword: database
  18. #% keyword: attribute table
  19. #%end
  20. #%option
  21. #% key: test
  22. #% type: string
  23. #% description: Test name
  24. #% required: yes
  25. #% options: test1
  26. #%end
  27. import sys
  28. import os
  29. from grass.script import core as gcore
  30. from grass.script import db as grassdb
  31. from grass.exceptions import CalledModuleError
  32. def main():
  33. test_file = options['test']
  34. expected = gcore.tempfile()
  35. result = gcore.tempfile()
  36. dbconn = grassdb.db_connection()
  37. gcore.message(_("Using DB driver: %s") % dbconn['driver'])
  38. infile = os.path.join(os.environ['GISBASE'], 'etc', 'db.test', test_file)
  39. inf = open(infile)
  40. while True:
  41. type = inf.readline()
  42. if not type:
  43. break
  44. type = type.rstrip('\r\n')
  45. sql = inf.readline().rstrip('\r\n')
  46. sys.stdout.write(sql + '\n')
  47. # Copy expected result to temp file
  48. try:
  49. if type == 'X':
  50. gcore.write_command('db.execute', input='-', stdin=sql + '\n')
  51. else:
  52. resf = open(result, 'w')
  53. gcore.write_command('db.select', input='-', flags='c',
  54. stdin=sql + '\n', stdout=resf)
  55. resf.close()
  56. except CalledModuleError:
  57. gcore.error("EXECUTE: ******** ERROR ********")
  58. else:
  59. gcore.message(_("EXECUTE: OK"))
  60. expf = open(expected, 'w')
  61. while True:
  62. res = inf.readline().rstrip('\r\n')
  63. if not res:
  64. break
  65. expf.write(res + '\n')
  66. expf.close()
  67. if type == 'S':
  68. if gcore.call(['diff', result, expected]) != 0:
  69. gcore.error("RESULT: ******** ERROR ********")
  70. else:
  71. gcore.message(_("RESULT: OK"))
  72. if __name__ == "__main__":
  73. options, flags = gcore.parser()
  74. main()