|
@@ -0,0 +1,209 @@
|
|
|
+# -*- coding: utf-8 -*-
|
|
|
+"""
|
|
|
+Created on Wed Jun 25 11:08:22 2014
|
|
|
+
|
|
|
+@author: pietro
|
|
|
+"""
|
|
|
+import os
|
|
|
+import unittest
|
|
|
+import sqlite3
|
|
|
+import tempfile as tmp
|
|
|
+from string import ascii_letters, digits
|
|
|
+from random import choice
|
|
|
+
|
|
|
+
|
|
|
+import numpy as np
|
|
|
+
|
|
|
+from grass.pygrass.vector.table import Table, get_path
|
|
|
+
|
|
|
+
|
|
|
+# dictionary that generate random data
|
|
|
+COL2VALS = {'INT': lambda n: np.random.randint(9, size=n),
|
|
|
+ 'INTEGER': lambda n: np.random.randint(9, size=n),
|
|
|
+ 'INTEGER PRIMARY KEY': lambda n: np.arange(1, n+1, dtype=long),
|
|
|
+ 'REAL': lambda n: np.random.rand(n),
|
|
|
+ 'TEXT': lambda n: np.array([randstr() for _ in range(n)])}
|
|
|
+
|
|
|
+
|
|
|
+def randstr(prefix='', suffix='', size=6, chars=ascii_letters + digits):
|
|
|
+ """Return a random string of characters.
|
|
|
+
|
|
|
+ :param prefix: string prefix, default: ''
|
|
|
+ :type prefix: str
|
|
|
+
|
|
|
+ :param suffix: string suffix, default: ''
|
|
|
+ :type suffix: str
|
|
|
+
|
|
|
+ :param size: number of random characters
|
|
|
+ :type size: int
|
|
|
+
|
|
|
+ :param chars: string containing the characters that will be used
|
|
|
+ :type chars: str
|
|
|
+
|
|
|
+ :returns: string
|
|
|
+ """
|
|
|
+ return prefix + ''.join(choice(chars) for _ in range(size)) + suffix
|
|
|
+
|
|
|
+
|
|
|
+def get_table_random_values(nrows, columns):
|
|
|
+ """Generate a random recarray respecting the columns definition.
|
|
|
+
|
|
|
+ :param nrows: number of rows of the generated array
|
|
|
+ :type nrows: int
|
|
|
+
|
|
|
+ :param columns: list of tuple containing column name and type.
|
|
|
+ :type columns: list of tuple
|
|
|
+
|
|
|
+ :returns: numpy recarray
|
|
|
+ """
|
|
|
+ vals, dtype = [], []
|
|
|
+ for cname, ctype in columns:
|
|
|
+ if ctype not in COL2VALS:
|
|
|
+ raise TypeError("Unkown column type %s for: %s" % (ctype, cname))
|
|
|
+ vals.append(COL2VALS[ctype](nrows))
|
|
|
+ dtype.append((cname, vals[-1].dtype.str))
|
|
|
+ return np.array([v for v in zip(*vals)], dtype=dtype)
|
|
|
+
|
|
|
+
|
|
|
+class DBconnection(object):
|
|
|
+ """Define a class to share common methods between TestCase."""
|
|
|
+ path = os.path.join(tmp.gettempdir(), randstr(prefix='temp', suffix='.db'))
|
|
|
+ connection = sqlite3.connect(get_path(path))
|
|
|
+ columns = [('cat', 'INTEGER PRIMARY KEY'),
|
|
|
+ ('cint', 'INT'),
|
|
|
+ ('creal', 'REAL'),
|
|
|
+ ('ctxt', 'TEXT')]
|
|
|
+
|
|
|
+ def create_table_instance(self, **kw):
|
|
|
+ """Return a Table class instance
|
|
|
+
|
|
|
+ :param **kw: keyword arguments of Table class
|
|
|
+ without name and connection.
|
|
|
+ :type **kw: key-word arguments
|
|
|
+
|
|
|
+ :returns: Table instance
|
|
|
+ """
|
|
|
+ self.tname = randstr(prefix='temp')
|
|
|
+ return Table(name=self.tname,
|
|
|
+ connection=self.connection, **kw)
|
|
|
+
|
|
|
+ def create_empty_table(self, columns=None, **kw):
|
|
|
+ """Create an empty table in the database and return Table class
|
|
|
+ instance.
|
|
|
+
|
|
|
+ :param columns: list of tuple containing the column names and types.
|
|
|
+ :type columns: list of tuple
|
|
|
+
|
|
|
+ :param **kw: keyword arguments of Table class
|
|
|
+ without name and connection.
|
|
|
+ :type **kw: key-word arguments
|
|
|
+
|
|
|
+ :returns: Table instance
|
|
|
+ """
|
|
|
+ columns = self.columns if columns is None else columns
|
|
|
+ table = self.create_table_instance(**kw)
|
|
|
+ table.create(columns)
|
|
|
+ return table
|
|
|
+
|
|
|
+ def create_not_empty_table(self, nrows=None, values=None,
|
|
|
+ columns=None, **kw):
|
|
|
+ """Create a not empty table in the database and return Table class
|
|
|
+ instance.
|
|
|
+
|
|
|
+ :param nrows: number of rows.
|
|
|
+ :type nrows: list of tuple
|
|
|
+
|
|
|
+ :param values: list of tuple containing the values for each row.
|
|
|
+ :type values: list of tuple
|
|
|
+
|
|
|
+ :param columns: list of tuple containing the column names and types.
|
|
|
+ :type columns: list of tuple
|
|
|
+
|
|
|
+ :param **kw: keyword arguments of Table class
|
|
|
+ without name and connection.
|
|
|
+ :type **kw: key-word arguments
|
|
|
+
|
|
|
+ :returns: Table instance
|
|
|
+ """
|
|
|
+ if nrows is None and values is None:
|
|
|
+ msg = "Both parameters ``nrows`` ``values`` are empty"
|
|
|
+ raise RuntimeError(msg)
|
|
|
+ columns = self.columns if columns is None else columns
|
|
|
+ values = (get_table_random_values(nrows, columns) if values is None
|
|
|
+ else values)
|
|
|
+ table = self.create_empty_table(columns=columns, **kw)
|
|
|
+ table.insert(values, many=True)
|
|
|
+ return table
|
|
|
+
|
|
|
+ def setUp(self):
|
|
|
+ """Create a not empty table instance"""
|
|
|
+ self.table = self.create_not_empty_table(10)
|
|
|
+ self.cols = self.table.columns
|
|
|
+
|
|
|
+ def tearDown(self):
|
|
|
+ """Remove the generated vector map, if exist"""
|
|
|
+ self.table.drop(force=True)
|
|
|
+ self.table = None
|
|
|
+ self.cols = None
|
|
|
+
|
|
|
+
|
|
|
+class ColumnsTestCase(DBconnection, unittest.TestCase):
|
|
|
+
|
|
|
+ def test_check_insert_update_str(self):
|
|
|
+ """Check insert_str and update_str attribute of Columns are correct"""
|
|
|
+ insert = 'INSERT INTO %s VALUES (?,?,?,?)'
|
|
|
+ self.assertEqual(self.cols.insert_str, insert % self.tname)
|
|
|
+ update = 'UPDATE %s SET cint=?,creal=?,ctxt=? WHERE cat=?;'
|
|
|
+ self.assertEqual(self.cols.update_str, update % self.tname)
|
|
|
+
|
|
|
+
|
|
|
+class TableInsertTestCase(DBconnection, unittest.TestCase):
|
|
|
+
|
|
|
+ def setUp(self):
|
|
|
+ """Create a not empty table instance"""
|
|
|
+ self.table = self.create_empty_table()
|
|
|
+ self.cols = self.table.columns
|
|
|
+
|
|
|
+ def tearDown(self):
|
|
|
+ """Remove the generated vector map, if exist"""
|
|
|
+ self.table.drop(force=True)
|
|
|
+ self.table = None
|
|
|
+ self.cols = None
|
|
|
+
|
|
|
+ def test_insert(self):
|
|
|
+ """Test Table.insert method"""
|
|
|
+ cat = 1
|
|
|
+ vals = (cat, 1111, 0.1111, 'test')
|
|
|
+ cur = self.connection.cursor()
|
|
|
+ self.table.insert(vals, cursor=cur)
|
|
|
+ sqlquery = "SELECT cat, cint, creal, ctxt FROM %s WHERE cat=%d"
|
|
|
+ cur.execute(sqlquery % (self.tname, cat))
|
|
|
+ self.assertTupleEqual(vals, cur.fetchone())
|
|
|
+
|
|
|
+ def test_insert_many(self):
|
|
|
+ """Test Table.insert method using many==True"""
|
|
|
+ vals = [(1, 1111, 0.1111, 'test1'),
|
|
|
+ (2, 2222, 0.2222, 'test2'),
|
|
|
+ (3, 3333, 0.3333, 'test3')]
|
|
|
+ cur = self.connection.cursor()
|
|
|
+ self.table.insert(vals, cursor=cur, many=True)
|
|
|
+ sqlquery = "SELECT cat, cint, creal, ctxt FROM %s"
|
|
|
+ cur.execute(sqlquery % self.tname)
|
|
|
+ self.assertListEqual(vals, cur.fetchall())
|
|
|
+
|
|
|
+
|
|
|
+class TableUpdateTestCase(DBconnection, unittest.TestCase):
|
|
|
+
|
|
|
+ def test_update(self):
|
|
|
+ """Test Table.update method"""
|
|
|
+ vals = (1111, 0.1111, 'test')
|
|
|
+ cat = 1
|
|
|
+ cur = self.connection.cursor()
|
|
|
+ self.table.update(cat, list(vals), cursor=cur)
|
|
|
+ sqlquery = "SELECT cint, creal, ctxt FROM %s WHERE cat=%d"
|
|
|
+ cur.execute(sqlquery % (self.tname, cat))
|
|
|
+ self.assertTupleEqual(vals, cur.fetchone())
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ unittest.main()
|