test_table.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. # -*- coding: utf-8 -*-
  2. """
  3. Created on Wed Jun 25 11:08:22 2014
  4. @author: pietro
  5. """
  6. import os
  7. import sqlite3
  8. import tempfile as tmp
  9. from string import ascii_letters, digits
  10. from random import choice
  11. import numpy as np
  12. from grass.gunittest.case import TestCase
  13. from grass.gunittest.main import test
  14. from grass.pygrass.vector.table import Table, get_path
  # Dictionary mapping each supported SQL column type to a factory that takes
  # a row count ``n`` and returns a numpy array with ``n`` generated values.
  COL2VALS = {
      'INT': lambda n: np.random.randint(9, size=n),
      'INTEGER': lambda n: np.random.randint(9, size=n),
      # Primary keys are the sequence 1..n. ``np.int64`` is used instead of
      # ``long``: that builtin only exists on Python 2, so the factory raised
      # NameError when called on Python 3.
      'INTEGER PRIMARY KEY': lambda n: np.arange(1, n + 1, dtype=np.int64),
      'REAL': lambda n: np.random.rand(n),
      # ``randstr`` is resolved when the factory is called, not here.
      'TEXT': lambda n: np.array([randstr() for _ in range(n)]),
  }
  def randstr(prefix='', suffix='', size=6, chars=ascii_letters + digits):
      """Build a random string wrapped between a prefix and a suffix.

      :param prefix: text placed before the random part, default: ''
      :type prefix: str
      :param suffix: text placed after the random part, default: ''
      :type suffix: str
      :param size: how many random characters to draw, default: 6
      :type size: int
      :param chars: pool of characters the random part is drawn from,
                    default: ASCII letters and digits
      :type chars: str
      :returns: ``prefix`` + ``size`` randomly chosen characters + ``suffix``
      """
      # One independent draw from the pool per requested character.
      picked = [choice(chars) for _ in range(size)]
      middle = ''.join(picked)
      return prefix + middle + suffix
  def get_table_random_values(nrows, columns):
      """Generate a random structured array respecting the columns definition.

      For every column the generator registered in ``COL2VALS`` for its type
      is called with ``nrows``; the per-column arrays are then transposed
      into rows.

      :param nrows: number of rows of the generated array
      :type nrows: int
      :param columns: list of tuple containing column name and type.
      :type columns: list of tuple
      :returns: numpy structured array with one field per column
      :raises TypeError: if a column type has no generator in ``COL2VALS``
      """
      vals, dtype = [], []
      for cname, ctype in columns:
          if ctype not in COL2VALS:
              raise TypeError("Unknown column type %s for: %s" % (ctype, cname))
          vals.append(COL2VALS[ctype](nrows))
          # Reuse the dtype numpy chose for the generated column values.
          dtype.append((cname, vals[-1].dtype.str))
      # zip(*vals) turns the list of columns into a sequence of row tuples.
      return np.array(list(zip(*vals)), dtype=dtype)
  class DBconnection(object):
      """Mixin sharing a database connection and table helpers between the
      TestCase classes of this file.

      The class attributes are evaluated once, when the class body runs, so
      every class that inherits from it uses the same ``connection``.
      """
      # Default (name, SQL type) pairs used when a caller passes no columns.
      columns = [('cat', 'INTEGER PRIMARY KEY'),
                 ('cint', 'INT'),
                 ('creal', 'REAL'),
                 ('ctxt', 'TEXT')]
      # Randomly named database file inside the system temporary directory.
      path = os.path.join(tmp.gettempdir(),
                          randstr(prefix='temp', suffix='.db'))
      # The path goes through pygrass ``get_path`` before sqlite3 opens it.
      connection = sqlite3.connect(get_path(path))

      def create_table_instance(self, **kw):
          """Instantiate a Table bound to the shared connection.

          A new random table name is generated and remembered in
          ``self.tname``.

          :param **kw: keyword arguments of Table class
                       without name and connection.
          :type **kw: key-word arguments
          :returns: Table instance
          """
          name = randstr(prefix='temp')
          self.tname = name
          return Table(name=name, connection=self.connection, **kw)

      def create_empty_table(self, columns=None, **kw):
          """Create a table without rows in the database.

          :param columns: list of tuple containing the column names and
                          types, default: the class attribute ``columns``.
          :type columns: list of tuple
          :param **kw: keyword arguments of Table class
                       without name and connection.
          :type **kw: key-word arguments
          :returns: Table instance
          """
          if columns is None:
              columns = self.columns
          new_table = self.create_table_instance(**kw)
          new_table.create(columns)
          return new_table

      def create_not_empty_table(self, nrows=None, values=None,
                                 columns=None, **kw):
          """Create a table in the database and fill it with rows.

          When ``values`` is not given, ``nrows`` random rows are generated
          with ``get_table_random_values``.

          :param nrows: number of random rows, used only without ``values``.
          :type nrows: int
          :param values: list of tuple containing the values for each row.
          :type values: list of tuple
          :param columns: list of tuple containing the column names and
                          types, default: the class attribute ``columns``.
          :type columns: list of tuple
          :param **kw: keyword arguments of Table class
                       without name and connection.
          :type **kw: key-word arguments
          :returns: Table instance
          :raises RuntimeError: if both ``nrows`` and ``values`` are None
          """
          if values is None and nrows is None:
              raise RuntimeError("Both parameters ``nrows`` ``values`` are empty")
          if columns is None:
              columns = self.columns
          if values is None:
              values = get_table_random_values(nrows, columns)
          filled = self.create_empty_table(columns=columns, **kw)
          filled.insert(values, many=True)
          return filled

      def setUp(self):
          """Create a table with 10 random rows and keep its columns."""
          table = self.create_not_empty_table(10)
          self.table = table
          self.cols = table.columns

      def tearDown(self):
          """Drop the table created by setUp and clear the references."""
          self.table.drop(force=True)
          self.table = self.cols = None
  class ColumnsTestCase(DBconnection, TestCase):
      """Tests for the SQL statement strings exposed by the table columns."""

      def test_check_insert_update_str(self):
          """Check insert_str and update_str attribute of Columns are correct"""
          # Both templates are filled with the name of the table from setUp.
          expected_insert = 'INSERT INTO %s VALUES (?,?,?,?)' % self.tname
          expected_update = ('UPDATE %s SET cint=?,creal=?,ctxt=? WHERE cat=?;'
                             % self.tname)
          self.assertEqual(self.cols.insert_str, expected_insert)
          self.assertEqual(self.cols.update_str, expected_update)
  class TableInsertTestCase(DBconnection, TestCase):
      """Tests for Table.insert, each starting from an empty table."""

      def setUp(self):
          """Create an empty table instance"""
          self.table = self.create_empty_table()
          self.cols = self.table.columns

      def tearDown(self):
          """Drop the table created by setUp and clear the references"""
          self.table.drop(force=True)
          self.table = None
          self.cols = None

      def test_insert(self):
          """Test Table.insert method"""
          cat = 1
          vals = (cat, 1111, 0.1111, 'test')
          cur = self.connection.cursor()
          self.table.insert(vals, cursor=cur)
          # Read the row back through the same cursor and compare it.
          sqlquery = "SELECT cat, cint, creal, ctxt FROM %s WHERE cat=%d"
          cur.execute(sqlquery % (self.tname, cat))
          self.assertTupleEqual(vals, cur.fetchone())

      def test_insert_many(self):
          """Test Table.insert method using many==True"""
          vals = [(1, 1111, 0.1111, 'test1'),
                  (2, 2222, 0.2222, 'test2'),
                  (3, 3333, 0.3333, 'test3')]
          cur = self.connection.cursor()
          self.table.insert(vals, cursor=cur, many=True)
          # The comparison below is order sensitive and SQL leaves the row
          # order undefined without ORDER BY, so sort by the primary key.
          sqlquery = "SELECT cat, cint, creal, ctxt FROM %s ORDER BY cat"
          cur.execute(sqlquery % self.tname)
          self.assertListEqual(vals, cur.fetchall())
  class TableUpdateTestCase(DBconnection, TestCase):
      """Tests for Table.update on the table filled by DBconnection.setUp."""

      def test_update(self):
          """Test Table.update method"""
          key = 1
          expected = (1122, 0.1122, 'test')
          cursor = self.connection.cursor()
          # Table.update receives the new values as a list.
          self.table.update(key, list(expected), cursor=cursor)
          self.connection.commit()
          # Read the updated row back and compare it with what was written.
          query = "SELECT cint, creal, ctxt FROM %s WHERE cat=%d" % (self.tname,
                                                                     key)
          cursor.execute(query)
          self.assertTupleEqual(expected, cursor.fetchone())
  # When executed as a script (not imported), call the gunittest ``test``
  # entry point imported at the top of the file.
  if __name__ == "__main__":
      test()