test_table.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226
  1. """
  2. Created on Wed Jun 25 11:08:22 2014
  3. @author: pietro
  4. """
  5. import os
  6. import sqlite3
  7. import sys
  8. import tempfile as tmp
  9. from string import ascii_letters, digits
  10. from random import choice
  11. import numpy as np
  12. from grass.gunittest.case import TestCase
  13. from grass.gunittest.main import test
  14. from grass.pygrass.vector.table import Table, get_path
  15. if sys.version_info.major >= 3:
  16. long = int
  17. # dictionary that generate random data
  18. COL2VALS = {
  19. "INT": lambda n: np.random.randint(9, size=n),
  20. "INTEGER": lambda n: np.random.randint(9, size=n),
  21. "INTEGER PRIMARY KEY": lambda n: np.arange(1, n + 1, dtype=long),
  22. "REAL": lambda n: np.random.rand(n),
  23. "TEXT": lambda n: np.array([randstr() for _ in range(n)]),
  24. }
  25. def randstr(prefix="", suffix="", size=6, chars=ascii_letters + digits):
  26. """Return a random string of characters.
  27. :param prefix: string prefix, default: ''
  28. :type prefix: str
  29. :param suffix: string suffix, default: ''
  30. :type suffix: str
  31. :param size: number of random characters
  32. :type size: int
  33. :param chars: string containing the characters that will be used
  34. :type chars: str
  35. :returns: string
  36. """
  37. return prefix + "".join(choice(chars) for _ in range(size)) + suffix
  38. def get_table_random_values(nrows, columns):
  39. """Generate a random recarray respecting the columns definition.
  40. :param nrows: number of rows of the generated array
  41. :type nrows: int
  42. :param columns: list of tuple containing column name and type.
  43. :type columns: list of tuple
  44. :returns: numpy recarray
  45. """
  46. vals, dtype = [], []
  47. for cname, ctype in columns:
  48. if ctype not in COL2VALS:
  49. raise TypeError("Unknown column type %s for: %s" % (ctype, cname))
  50. vals.append(COL2VALS[ctype](nrows))
  51. dtype.append((cname, vals[-1].dtype.str))
  52. return np.array([v for v in zip(*vals)], dtype=dtype)
  53. class DBconnection(object):
  54. """Define a class to share common methods between TestCase."""
  55. path = os.path.join(tmp.gettempdir(), randstr(prefix="temp", suffix=".db"))
  56. connection = sqlite3.connect(get_path(path))
  57. for t in (
  58. np.int8,
  59. np.int16,
  60. np.int32,
  61. np.int64,
  62. np.uint8,
  63. np.uint16,
  64. np.uint32,
  65. np.uint64,
  66. ):
  67. sqlite3.register_adapter(t, int)
  68. columns = [
  69. ("cat", "INTEGER PRIMARY KEY"),
  70. ("cint", "INT"),
  71. ("creal", "REAL"),
  72. ("ctxt", "TEXT"),
  73. ]
  74. def create_table_instance(self, **kw):
  75. """Return a Table class instance
  76. :param **kw: keyword arguments of Table class
  77. without name and connection.
  78. :type **kw: key-word arguments
  79. :returns: Table instance
  80. """
  81. self.tname = randstr(prefix="temp")
  82. return Table(name=self.tname, connection=self.connection, **kw)
  83. def create_empty_table(self, columns=None, **kw):
  84. """Create an empty table in the database and return Table class
  85. instance.
  86. :param columns: list of tuple containing the column names and types.
  87. :type columns: list of tuple
  88. :param **kw: keyword arguments of Table class
  89. without name and connection.
  90. :type **kw: key-word arguments
  91. :returns: Table instance
  92. """
  93. columns = self.columns if columns is None else columns
  94. table = self.create_table_instance(**kw)
  95. table.create(columns)
  96. return table
  97. def create_not_empty_table(self, nrows=None, values=None, columns=None, **kw):
  98. """Create a not empty table in the database and return Table class
  99. instance.
  100. :param nrows: number of rows.
  101. :type nrows: list of tuple
  102. :param values: list of tuple containing the values for each row.
  103. :type values: list of tuple
  104. :param columns: list of tuple containing the column names and types.
  105. :type columns: list of tuple
  106. :param **kw: keyword arguments of Table class
  107. without name and connection.
  108. :type **kw: key-word arguments
  109. :returns: Table instance
  110. """
  111. if nrows is None and values is None:
  112. msg = "Both parameters ``nrows`` ``values`` are empty"
  113. raise RuntimeError(msg)
  114. columns = self.columns if columns is None else columns
  115. values = get_table_random_values(nrows, columns) if values is None else values
  116. table = self.create_empty_table(columns=columns, **kw)
  117. table.insert(values, many=True)
  118. return table
  119. def setUp(self):
  120. """Create a not empty table instance"""
  121. self.table = self.create_not_empty_table(10)
  122. self.cols = self.table.columns
  123. def tearDown(self):
  124. """Remove the generated vector map, if exist"""
  125. self.table.drop(force=True)
  126. self.table = None
  127. self.cols = None
  128. class ColumnsTestCase(DBconnection, TestCase):
  129. def test_check_insert_update_str(self):
  130. """Check insert_str and update_str attribute of Columns are correct"""
  131. insert = "INSERT INTO %s VALUES (?,?,?,?)"
  132. self.assertEqual(self.cols.insert_str, insert % self.tname)
  133. update = "UPDATE %s SET cint=?,creal=?,ctxt=? WHERE cat=?;"
  134. self.assertEqual(self.cols.update_str, update % self.tname)
  135. class TableInsertTestCase(DBconnection, TestCase):
  136. def setUp(self):
  137. """Create a not empty table instance"""
  138. self.table = self.create_empty_table()
  139. self.cols = self.table.columns
  140. def tearDown(self):
  141. """Remove the generated vector map, if exist"""
  142. self.table.drop(force=True)
  143. self.table = None
  144. self.cols = None
  145. def test_insert(self):
  146. """Test Table.insert method"""
  147. cat = 1
  148. vals = (cat, 1111, 0.1111, "test")
  149. cur = self.connection.cursor()
  150. self.table.insert(vals, cursor=cur)
  151. sqlquery = "SELECT cat, cint, creal, ctxt FROM %s WHERE cat=%d"
  152. cur.execute(sqlquery % (self.tname, cat))
  153. self.assertTupleEqual(vals, cur.fetchone())
  154. def test_insert_many(self):
  155. """Test Table.insert method using many==True"""
  156. vals = [
  157. (1, 1111, 0.1111, "test1"),
  158. (2, 2222, 0.2222, "test2"),
  159. (3, 3333, 0.3333, "test3"),
  160. ]
  161. cur = self.connection.cursor()
  162. self.table.insert(vals, cursor=cur, many=True)
  163. sqlquery = "SELECT cat, cint, creal, ctxt FROM %s"
  164. cur.execute(sqlquery % self.tname)
  165. self.assertListEqual(vals, cur.fetchall())
  166. class TableUpdateTestCase(DBconnection, TestCase):
  167. def test_update(self):
  168. """Test Table.update method"""
  169. vals = (1122, 0.1122, "test")
  170. cat = 1
  171. cur = self.connection.cursor()
  172. self.table.update(cat, list(vals), cursor=cur)
  173. self.connection.commit()
  174. sqlquery = "SELECT cint, creal, ctxt FROM %s WHERE cat=%d"
  175. cur.execute(sqlquery % (self.tname, cat))
  176. self.assertTupleEqual(vals, cur.fetchone())
  177. if __name__ == "__main__":
  178. test()