123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263 |
- """
- Created on Wed Aug 8 15:29:21 2012
- @author: pietro
- """
- from __future__ import (
- nested_scopes,
- generators,
- division,
- absolute_import,
- with_statement,
- print_function,
- unicode_literals,
- )
- import os
- import sys
- import ctypes
- import numpy as np
- from sqlite3 import OperationalError
- try:
- from collections import OrderedDict
- except ImportError:
- from grass.pygrass.orderdict import OrderedDict
- import grass.lib.vector as libvect
- from grass.pygrass.gis import Mapset
- from grass.exceptions import DBError
- from grass.pygrass.utils import table_exist, decode
- from grass.script.db import db_table_in_vector
- from grass.script.core import warning
- from grass.pygrass.vector import sql
- from grass.lib.ctypes_preamble import ReturnString
- if sys.version_info.major >= 3:
- long = int
- unicode = str
- # For test purposes
- test_vector_name = "table_doctest_map"
- DRIVERS = ("sqlite", "pg")
- UNSUPPORTED_DRIVERS = ("ogr", "dbf")
- def get_path(path, vect_name=None):
- """Return the full path to the database; replacing environment variable
- with real values
- :param path: The path with substitutional parameter
- :param vect_name: The name of the vector map
- >>> from grass.script.core import gisenv
- >>> import os
- >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db'
- >>> new_path = get_path(path)
- >>> new_path2 = os.path.join(gisenv()['GISDBASE'], gisenv()['LOCATION_NAME'],
- ... gisenv()['MAPSET'], 'sqlite', 'sqlite.db')
- >>> new_path.replace("//","/") == new_path2.replace("//","/")
- True
- >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/vector/$MAP/sqlite.db'
- >>> new_path = get_path(path, "test")
- >>> new_path2 = os.path.join(gisenv()['GISDBASE'], gisenv()['LOCATION_NAME'],
- ... gisenv()['MAPSET'], 'vector', 'test', 'sqlite.db')
- >>> new_path.replace("//","/") == new_path2.replace("//","/")
- True
- """
- if "$" not in path:
- return path
- else:
- mapset = Mapset()
- path = path.replace("$GISDBASE", mapset.gisdbase)
- path = path.replace("$LOCATION_NAME", mapset.location)
- path = path.replace("$MAPSET", mapset.name)
- if vect_name is not None:
- path = path.replace("$MAP", vect_name)
- return path
- class Filters(object):
- """Help user to build a simple sql query.
- >>> filter = Filters('table')
- >>> filter.get_sql()
- 'SELECT * FROM table;'
- >>> filter.where("area<10000").get_sql()
- 'SELECT * FROM table WHERE area<10000;'
- >>> filter.select("cat", "area").get_sql()
- 'SELECT cat, area FROM table WHERE area<10000;'
- >>> filter.order_by("area").limit(10).get_sql()
- 'SELECT cat, area FROM table WHERE area<10000 ORDER BY area LIMIT 10;'
- """
- def __init__(self, tname):
- self.tname = tname
- self._select = None
- self._where = None
- self._orderby = None
- self._limit = None
- self._groupby = None
- def __repr__(self):
- return "Filters(%r)" % self.get_sql()
- def select(self, *args):
- """Create the select query"""
- cols = ", ".join(args) if args else "*"
- select = sql.SELECT[:-1]
- self._select = select.format(cols=cols, tname=self.tname)
- return self
- def where(self, condition):
- """Create the where condition
- :param condition: the condition of where statement, for example
- `cat = 1`
- :type condition: str
- """
- self._where = "WHERE {condition}".format(condition=condition)
- return self
- def order_by(self, *orderby):
- """Create the order by condition
- :param orderby: the name of column/s to order the result
- :type orderby: str
- """
- self._orderby = "ORDER BY {orderby}".format(orderby=", ".join(orderby))
- return self
- def limit(self, number):
- """Create the limit condition
- :param number: the number to limit the result
- :type number: int
- """
- if not isinstance(number, int):
- raise ValueError("Must be an integer.")
- else:
- self._limit = "LIMIT {number}".format(number=number)
- return self
- def group_by(self, *groupby):
- """Create the group by condition
- :param groupby: the name of column/s to group the result
- :type groupby: str, list
- """
- self._groupby = "GROUP BY {groupby}".format(groupby=", ".join(groupby))
- return self
- def get_sql(self):
- """Return the SQL query"""
- sql_list = list()
- if self._select is None:
- self.select()
- sql_list.append(self._select)
- if self._where is not None:
- sql_list.append(self._where)
- if self._groupby is not None:
- sql_list.append(self._groupby)
- if self._orderby is not None:
- sql_list.append(self._orderby)
- if self._limit is not None:
- sql_list.append(self._limit)
- return "%s;" % " ".join(sql_list)
- def reset(self):
- """Clean internal variables"""
- self._select = None
- self._where = None
- self._orderby = None
- self._limit = None
- self._groupby = None
- class Columns(object):
- """Object to work with columns table.
- It is possible to instantiate a Columns object given the table name and
- the database connection.
- For a sqlite table:
- >>> import sqlite3
- >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db'
- >>> cols_sqlite = Columns(test_vector_name,
- ... sqlite3.connect(get_path(path)))
- >>> cols_sqlite.tname
- 'table_doctest_map'
- For a postgreSQL table:
- >>> import psycopg2 as pg #doctest: +SKIP
- >>> cols_pg = Columns(test_vector_name,
- ... pg.connect('host=localhost dbname=grassdb')) #doctest: +SKIP
- >>> cols_pg.tname #doctest: +SKIP
- 'table_doctest_map' #doctest: +SKIP
- """
- def __init__(self, tname, connection, key="cat"):
- self.tname = tname
- self.conn = connection
- self.key = key
- self.odict = None
- self.update_odict()
- def __contains__(self, item):
- return item in self.names()
- def __repr__(self):
- return "Columns(%r)" % list(self.items())
- def __getitem__(self, key):
- return self.odict[key]
- def __setitem__(self, name, new_type):
- self.cast(name, new_type)
- self.update_odict(self)
- def __iter__(self):
- return self.odict.__iter__()
- def __len__(self):
- return self.odict.__len__()
- def __eq__(self, obj):
- """Return True if two table have the same columns.
- >>> import sqlite3
- >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db'
- >>> connection = sqlite3.connect(get_path(path))
- >>> cols0 = Columns(test_vector_name, connection)
- >>> cols1 = Columns(test_vector_name, connection)
- >>> cols0 == cols1
- True
- """
- return obj.tname == self.tname and obj.odict == self.odict
- def __ne__(self, other):
- return not self == other
- # Restore Python 2 hashing beaviour on Python 3
- __hash__ = object.__hash__
- def is_pg(self):
- """Return True if is a psycopg connection.
- >>> import sqlite3
- >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db'
- >>> cols_sqlite = Columns(test_vector_name,
- ... sqlite3.connect(get_path(path)))
- >>> cols_sqlite.is_pg()
- False
- >>> import psycopg2 as pg #doctest: +SKIP
- >>> cols_pg = Columns(test_vector_name,
- ... pg.connect('host=localhost dbname=grassdb')) #doctest: +SKIP
- >>> cols_pg.is_pg() #doctest: +SKIP
- True
- """
- return hasattr(self.conn, "xid")
- def update_odict(self):
- """Read columns name and types from table and update the odict
- attribute.
- """
- if self.is_pg():
- # is a postgres connection
- cur = self.conn.cursor()
- cur.execute("SELECT oid,typname FROM pg_type")
- diz = dict(cur.fetchall())
- odict = OrderedDict()
- import psycopg2 as pg
- try:
- cur.execute(sql.SELECT.format(cols="*", tname=self.tname))
- descr = cur.description
- for column in descr:
- name, ctype = column[:2]
- odict[name] = diz[ctype]
- except pg.ProgrammingError:
- pass
- self.odict = odict
- else:
- # is a sqlite connection
- cur = self.conn.cursor()
- cur.execute(sql.PRAGMA.format(tname=self.tname))
- descr = cur.fetchall()
- odict = OrderedDict()
- for column in descr:
- name, ctype = column[1:3]
- odict[name] = ctype
- self.odict = odict
- values = ",".join(
- [
- "?",
- ]
- * self.__len__()
- )
- kv = ",".join(["%s=?" % k for k in self.odict.keys() if k != self.key])
- where = "%s=?" % self.key
- self.insert_str = sql.INSERT.format(tname=self.tname, values=values)
- self.update_str = sql.UPDATE_WHERE.format(
- tname=self.tname, values=kv, condition=where
- )
- def sql_descr(self, remove=None):
- """Return a string with description of columns.
- Remove it is used to remove a columns.
- >>> import sqlite3
- >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db'
- >>> cols_sqlite = Columns(test_vector_name,
- ... sqlite3.connect(get_path(path)))
- >>> cols_sqlite.sql_descr() # doctest: +ELLIPSIS
- 'cat INTEGER, name varchar(50), value double precision'
- >>> import psycopg2 as pg # doctest: +SKIP
- >>> cols_pg = Columns(test_vector_name,
- ... pg.connect('host=localhost dbname=grassdb')) # doctest: +SKIP
- >>> cols_pg.sql_descr() # doctest: +ELLIPSIS +SKIP
- 'cat INTEGER, name varchar(50), value double precision'
- """
- if remove:
- return ", ".join(
- ["%s %s" % (key, val) for key, val in self.items() if key != remove]
- )
- else:
- return ", ".join(["%s %s" % (key, val) for key, val in self.items()])
- def types(self):
- """Return a list with the column types.
- >>> import sqlite3
- >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db'
- >>> cols_sqlite = Columns(test_vector_name,
- ... sqlite3.connect(get_path(path)))
- >>> cols_sqlite.types() # doctest: +ELLIPSIS
- ['INTEGER', 'varchar(50)', 'double precision']
- >>> import psycopg2 as pg # doctest: +SKIP
- >>> cols_pg = Columns(test_vector_name,
- ... pg.connect('host=localhost dbname=grassdb')) # doctest: +SKIP
- >>> cols_pg.types() # doctest: +ELLIPSIS +SKIP
- ['INTEGER', 'varchar(50)', 'double precision']
- """
- return list(self.odict.values())
- def names(self, remove=None, unicod=True):
- """Return a list with the column names.
- Remove it is used to remove a columns.
- >>> import sqlite3
- >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db'
- >>> cols_sqlite = Columns(test_vector_name,
- ... sqlite3.connect(get_path(path)))
- >>> cols_sqlite.names() # doctest: +ELLIPSIS
- ['cat', 'name', 'value']
- >>> import psycopg2 as pg # doctest: +SKIP
- >>> cols_pg = Columns(test_vector_name, # doctest: +SKIP
- ... pg.connect('host=localhost dbname=grassdb'))
- >>> cols_pg.names() # doctest: +ELLIPSIS +SKIP
- ['cat', 'name', 'value']
- """
- if remove:
- nams = list(self.odict.keys())
- nams.remove(remove)
- else:
- nams = list(self.odict.keys())
- if unicod:
- return nams
- else:
- return [str(name) for name in nams]
- def items(self):
- """Return a list of tuple with column name and column type.
- >>> import sqlite3
- >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db'
- >>> cols_sqlite = Columns(test_vector_name,
- ... sqlite3.connect(get_path(path)))
- >>> cols_sqlite.items() # doctest: +ELLIPSIS
- [('cat', 'INTEGER'), ('name', 'varchar(50)'), ('value', 'double precision')]
- >>> import psycopg2 as pg # doctest: +SKIP
- >>> cols_pg = Columns(test_vector_name,
- ... pg.connect('host=localhost dbname=grassdb')) # doctest: +SKIP
- >>> cols_pg.items() # doctest: +ELLIPSIS +SKIP
- [('cat', 'INTEGER'), ('name', 'varchar(50)'), ('value', 'double precision')]
- """
- return list(self.odict.items())
- def add(self, col_name, col_type):
- """Add a new column to the table.
- :param col_name: the name of column to add
- :type col_name: str
- :param col_type: the tipe of column to add
- :type col_type: str
- >>> import sqlite3
- >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db'
- >>> from grass.pygrass.utils import copy, remove
- >>> copy(test_vector_name,'mycensus','vect')
- >>> cols_sqlite = Columns('mycensus',
- ... sqlite3.connect(get_path(path)))
- >>> cols_sqlite.add(['n_pizza'], ['INT'])
- >>> 'n_pizza' in cols_sqlite
- True
- >>> import psycopg2 as pg # doctest: +SKIP
- >>> cols_pg = Columns('boundary_municp_pg',
- ... pg.connect('host=localhost dbname=grassdb')) #doctest: +SKIP
- >>> cols_pg.add('n_pizza', 'INT') # doctest: +SKIP
- >>> 'n_pizza' in cols_pg # doctest: +SKIP
- True
- >>> remove('mycensus', 'vect')
- """
- def check(col_type):
- """Check the column type if it is supported by GRASS
- :param col_type: the type of column
- :type col_type: str
- """
- valid_type = (
- "DOUBLE PRECISION",
- "DOUBLE",
- "INT",
- "INTEGER",
- "DATE",
- "VARCHAR",
- )
- col = col_type.upper()
- valid = [col.startswith(tp) for tp in valid_type]
- if not any(valid):
- str_err = "Type: %r is not supported." "\nSupported types are: %s"
- raise TypeError(str_err % (col_type, ", ".join(valid_type)))
- return col_type
- col_type = (
- [
- check(col_type),
- ]
- if isinstance(col_type, (str, unicode))
- else [check(col) for col in col_type]
- )
- col_name = (
- [
- col_name,
- ]
- if isinstance(col_name, (str, unicode))
- else col_name
- )
- sqlcode = [
- sql.ADD_COL.format(tname=self.tname, cname=cn, ctype=ct)
- for cn, ct in zip(col_name, col_type)
- ]
- cur = self.conn.cursor()
- cur.executescript("\n".join(sqlcode))
- self.conn.commit()
- cur.close()
- self.update_odict()
- def rename(self, old_name, new_name):
- """Rename a column of the table.
- :param old_name: the name of existing column
- :type old_name: str
- :param new_name: the name of new column
- :type new_name: str
- >>> import sqlite3
- >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db'
- >>> from grass.pygrass.utils import copy, remove
- >>> copy(test_vector_name,'mycensus','vect')
- >>> cols_sqlite = Columns('mycensus',
- ... sqlite3.connect(get_path(path)))
- >>> cols_sqlite.add(['n_pizza'], ['INT'])
- >>> 'n_pizza' in cols_sqlite
- True
- >>> cols_sqlite.rename('n_pizza', 'n_pizzas') # doctest: +ELLIPSIS
- >>> 'n_pizza' in cols_sqlite
- False
- >>> 'n_pizzas' in cols_sqlite
- True
- >>> import psycopg2 as pg # doctest: +SKIP
- >>> cols_pg = Columns(test_vector_name,
- ... pg.connect('host=localhost dbname=grassdb')) # doctest: +SKIP
- >>> cols_pg.rename('n_pizza', 'n_pizzas') # doctest: +SKIP
- >>> 'n_pizza' in cols_pg # doctest: +SKIP
- False
- >>> 'n_pizzas' in cols_pg # doctest: +SKIP
- True
- >>> remove('mycensus', 'vect')
- """
- cur = self.conn.cursor()
- if self.is_pg():
- cur.execute(
- sql.RENAME_COL.format(
- tname=self.tname, old_name=old_name, new_name=new_name
- )
- )
- self.conn.commit()
- cur.close()
- self.update_odict()
- else:
- cur.execute(
- sql.ADD_COL.format(
- tname=self.tname, cname=new_name, ctype=str(self.odict[old_name])
- )
- )
- cur.execute(
- sql.UPDATE.format(tname=self.tname, new_col=new_name, old_col=old_name)
- )
- self.conn.commit()
- cur.close()
- self.update_odict()
- self.drop(old_name)
- def cast(self, col_name, new_type):
- """Change the column type.
- :param col_name: the name of column
- :type col_name: str
- :param new_type: the new type of column
- :type new_type: str
- >>> import sqlite3
- >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db'
- >>> from grass.pygrass.utils import copy, remove
- >>> copy(test_vector_name,'mycensus','vect')
- >>> cols_sqlite = Columns('mycensus',
- ... sqlite3.connect(get_path(path)))
- >>> cols_sqlite.add(['n_pizzas'], ['INT'])
- >>> cols_sqlite.cast('n_pizzas', 'float8') # doctest: +ELLIPSIS
- Traceback (most recent call last):
- ...
- grass.exceptions.DBError: SQLite does not support to cast columns.
- >>> import psycopg2 as pg # doctest: +SKIP
- >>> cols_pg = Columns(test_vector_name,
- ... pg.connect('host=localhost dbname=grassdb')) # doctest: +SKIP
- >>> cols_pg.cast('n_pizzas', 'float8') # doctest: +SKIP
- >>> cols_pg['n_pizzas'] # doctest: +SKIP
- 'float8'
- >>> remove('mycensus', 'vect')
- .. warning ::
- It is not possible to cast a column with sqlite
- """
- if self.is_pg():
- cur = self.conn.cursor()
- cur.execute(
- sql.CAST_COL.format(tname=self.tname, col=col_name, ctype=new_type)
- )
- self.conn.commit()
- cur.close()
- self.update_odict()
- else:
- # sqlite does not support rename columns:
- raise DBError("SQLite does not support to cast columns.")
- def drop(self, col_name):
- """Drop a column from the table.
- :param col_name: the name of column to remove
- :type col_name: str
- >>> import sqlite3
- >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db'
- >>> from grass.pygrass.utils import copy, remove
- >>> copy(test_vector_name,'mycensus','vect')
- >>> cols_sqlite = Columns('mycensus',
- ... sqlite3.connect(get_path(path)))
- >>> cols_sqlite.drop('name') # doctest: +ELLIPSIS
- >>> 'name' in cols_sqlite
- False
- >>> import psycopg2 as pg # doctest: +SKIP
- >>> cols_pg = Columns(test_vector_name,
- ... pg.connect('host=localhost dbname=grassdb')) # doctest: +SKIP
- >>> cols_pg.drop('name') # doctest: +SKIP
- >>> 'name' in cols_pg # doctest: +SKIP
- False
- >>> remove('mycensus','vect')
- """
- cur = self.conn.cursor()
- if self.is_pg():
- cur.execute(sql.DROP_COL.format(tname=self.tname, cname=col_name))
- else:
- desc = str(self.sql_descr(remove=col_name))
- names = ", ".join(self.names(remove=col_name, unicod=False))
- queries = sql.DROP_COL_SQLITE.format(
- tname=self.tname, keycol=self.key, coldef=desc, colnames=names
- ).split("\n")
- for query in queries:
- cur.execute(query)
- self.conn.commit()
- cur.close()
- self.update_odict()
- class Link(object):
- """Define a Link between vector map and the attributes table.
- It is possible to define a Link object or given all the information
- (layer, name, table name, key, database, driver):
- >>> link = Link(1, 'link0', test_vector_name, 'cat',
- ... '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db', 'sqlite')
- >>> link.layer
- 1
- >>> link.name
- 'link0'
- >>> link.table_name
- 'table_doctest_map'
- >>> link.key
- 'cat'
- >>> link.database
- '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db'
- >>> link.driver
- 'sqlite'
- >>> link
- Link(1, link0, sqlite)
- It is possible to change parameters with:
- >>> link.driver = 'pg' # doctest: +SKIP
- >>> link.driver # doctest: +SKIP
- 'pg'
- >>> link.driver = 'postgres' # doctest: +ELLIPSIS +SKIP
- Traceback (most recent call last):
- ...
- TypeError: Driver not supported, use: sqlite, pg.
- >>> link.driver # doctest: +SKIP
- 'pg'
- >>> link.number = 0 # doctest: +ELLIPSIS +SKIP
- Traceback (most recent call last):
- ...
- TypeError: Number must be positive and greater than 0.
- Or given a c_fieldinfo object that is a ctypes pointer to the field_info C
- struct. ::
- >>> link = Link(c_fieldinfo = ctypes.pointer(libvect.field_info()))
- """
- def _get_layer(self):
- return self.c_fieldinfo.contents.number
- def _set_layer(self, number):
- if number <= 0:
- raise TypeError("Number must be positive and greater than 0.")
- self.c_fieldinfo.contents.number = number
- layer = property(
- fget=_get_layer, fset=_set_layer, doc="Set and obtain layer number"
- )
- def _get_name(self):
- return decode(self.c_fieldinfo.contents.name)
- def _set_name(self, name):
- self.c_fieldinfo.contents.name = ReturnString(name)
- name = property(fget=_get_name, fset=_set_name, doc="Set and obtain name vale")
- def _get_table(self):
- return decode(self.c_fieldinfo.contents.table)
- def _set_table(self, new_name):
- self.c_fieldinfo.contents.table = ReturnString(new_name)
- table_name = property(
- fget=_get_table, fset=_set_table, doc="Set and obtain table name value"
- )
- def _get_key(self):
- return decode(self.c_fieldinfo.contents.key)
- def _set_key(self, key):
- self.c_fieldinfo.contents.key = ReturnString(key)
- key = property(fget=_get_key, fset=_set_key, doc="Set and obtain cat value")
- def _get_database(self):
- return decode(self.c_fieldinfo.contents.database)
- def _set_database(self, database):
- self.c_fieldinfo.contents.database = ReturnString(database)
- database = property(
- fget=_get_database, fset=_set_database, doc="Set and obtain database value"
- )
- def _get_driver(self):
- return decode(self.c_fieldinfo.contents.driver)
- def _set_driver(self, driver):
- if driver in DRIVERS:
- self.c_fieldinfo.contents.driver = ReturnString(driver)
- elif driver in UNSUPPORTED_DRIVERS:
- raise NotImplementedError(
- "The database driver %s is not supported by PyGRASS, "
- "use: %s." % (driver, ", ".join(DRIVERS))
- )
- else:
- raise ValueError(
- "The database driver %s is not known to PyGRASS, "
- "use: %s." % (driver, ", ".join(DRIVERS))
- )
- driver = property(
- fget=_get_driver,
- fset=_set_driver,
- doc="Set and obtain driver value. The drivers supported \
- by PyGRASS are: SQLite and PostgreSQL",
- )
- def __init__(
- self,
- layer=1,
- name=None,
- table=None,
- key="cat",
- database="$GISDBASE/$LOCATION_NAME/" "$MAPSET/sqlite/sqlite.db",
- driver="sqlite",
- c_fieldinfo=None,
- ):
- if c_fieldinfo is not None:
- self.c_fieldinfo = c_fieldinfo
- else:
- self.c_fieldinfo = ctypes.pointer(libvect.field_info())
- self.layer = layer
- self.name = name
- self.table_name = table
- self.key = key
- self.database = database
- self.driver = driver
- def __repr__(self):
- return "Link(%d, %s, %s)" % (self.layer, self.name, self.driver)
- def __eq__(self, link):
- """Return True if two Link instance have the same parameters.
- >>> l0 = Link(1, 'link0', test_vector_name, 'cat',
- ... '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db', 'sqlite')
- >>> l1 = Link(1, 'link0', test_vector_name, 'cat',
- ... '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db', 'sqlite')
- >>> l2 = Link(2, 'link0', test_vector_name, 'cat',
- ... '$GISDBASE/$LOCATION_NAME/PERMANENT/sqlite/sqlite.db', 'sqlite')
- >>> l0 == l1
- True
- >>> l1 == l2
- False
- """
- attrs = ["layer", "name", "table_name", "key", "driver"]
- for attr in attrs:
- if getattr(self, attr) != getattr(link, attr):
- return False
- return True
- def __ne__(self, other):
- return not self == other
- # Restore Python 2 hashing beaviour on Python 3
- __hash__ = object.__hash__
- def connection(self):
- """Return a connection object.
- >>> link = Link(1, 'link0', test_vector_name, 'cat',
- ... '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db',
- ... 'sqlite')
- >>> conn = link.connection()
- >>> cur = conn.cursor()
- >>> link.table_name
- 'table_doctest_map'
- >>> cur.execute("SELECT cat, name, value from %s" %
- ... link.table_name) # doctest: +ELLIPSIS
- <sqlite3.Cursor object at ...>
- >>> cur.fetchone() #doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
- (1, 'point', 1.0)
- >>> cur.close()
- >>> conn.close()
- """
- driver = self.driver
- if driver == "sqlite":
- import sqlite3
- # Numpy is using some custom integer data types to efficiently
- # pack data into memory. Since these types aren't familiar to
- # sqlite, you'll have to tell it about how to handle them.
- for t in (
- np.int8,
- np.int16,
- np.int32,
- np.int64,
- np.uint8,
- np.uint16,
- np.uint32,
- np.uint64,
- ):
- sqlite3.register_adapter(t, long)
- dbpath = get_path(self.database, self.table_name)
- dbdirpath = os.path.split(dbpath)[0]
- if not os.path.exists(dbdirpath):
- os.mkdir(dbdirpath)
- return sqlite3.connect(dbpath)
- elif driver == "pg":
- try:
- import psycopg2
- psycopg2.paramstyle = "qmark"
- db = " ".join(self.database.split(","))
- return psycopg2.connect(db)
- except ImportError:
- er = "You need to install psycopg2 to connect with this table."
- raise ImportError(er)
- else:
- str_err = "Driver is not supported yet, pleas use: sqlite or pg"
- raise TypeError(str_err)
- def table(self):
- """Return a Table object.
- >>> link = Link(1, 'link0', test_vector_name, 'cat',
- ... '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db',
- ... 'sqlite')
- >>> table = link.table()
- >>> table.filters.select('cat', 'name', 'value')
- Filters('SELECT cat, name, value FROM table_doctest_map;')
- >>> cur = table.execute()
- >>> cur.fetchone()
- (1, 'point', 1.0)
- >>> cur.close()
- """
- return Table(self.table_name, self.connection(), self.key)
- def info(self):
- """Print information of the link.
- >>> link = Link(1, 'link0', test_vector_name, 'cat',
- ... '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db',
- ... 'sqlite')
- >>> link.info()
- layer: 1
- name: link0
- table: table_doctest_map
- key: cat
- database: $GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db
- driver: sqlite
- """
- print("layer: ", self.layer)
- print("name: ", self.name)
- print("table: ", self.table_name)
- print("key: ", self.key)
- print("database: ", self.database)
- print("driver: ", self.driver)
- class DBlinks(object):
- """Interface containing link to the table DB.
- >>> from grass.pygrass.vector import VectorTopo
- >>> cens = VectorTopo(test_vector_name)
- >>> cens.open(mode='r')
- >>> dblinks = DBlinks(cens.c_mapinfo)
- >>> dblinks
- DBlinks([Link(1, table_doctest_map, sqlite)])
- >>> dblinks[0]
- Link(1, table_doctest_map, sqlite)
- >>> dblinks[test_vector_name]
- Link(1, table_doctest_map, sqlite)
- >>> cens.close()
- """
- def __init__(self, c_mapinfo):
- self.c_mapinfo = c_mapinfo
- def __len__(self):
- return self.num_dblinks()
- def __iter__(self):
- return (self.by_index(i) for i in range(self.num_dblinks()))
- def __getitem__(self, item):
- if isinstance(item, int):
- return self.by_index(item)
- else:
- return self.by_name(item)
- def __repr__(self):
- return "DBlinks(%r)" % [link for link in self.__iter__()]
- def by_index(self, indx):
- """Return a Link object by index
- :param indx: the index where add new point
- :type indx: int
- """
- nlinks = self.num_dblinks()
- if nlinks == 0:
- raise IndexError
- if indx < 0:
- indx += nlinks
- if indx > nlinks:
- raise IndexError
- c_fieldinfo = libvect.Vect_get_dblink(self.c_mapinfo, indx)
- return Link(c_fieldinfo=c_fieldinfo) if c_fieldinfo else None
- def by_layer(self, layer):
- """Return the chosen Link using the layer
- :param layer: the number of layer
- :type layer: int
- """
- c_fieldinfo = libvect.Vect_get_field(self.c_mapinfo, layer)
- return Link(c_fieldinfo=c_fieldinfo) if c_fieldinfo else None
- def by_name(self, name):
- """Return the chosen Link using the name
- :param name: the name of Link
- :type name: str
- """
- c_fieldinfo = libvect.Vect_get_field_by_name(self.c_mapinfo, name)
- return Link(c_fieldinfo=c_fieldinfo) if c_fieldinfo else None
- def num_dblinks(self):
- """Return the number of DBlinks"""
- return libvect.Vect_get_num_dblinks(self.c_mapinfo)
- def add(self, link):
- """Add a new link. Need to open vector map in write mode
- :param link: the Link to add to the DBlinks
- :type link: a Link object
- >>> from grass.pygrass.vector import VectorTopo
- >>> test_vect = VectorTopo(test_vector_name)
- >>> test_vect.open(mode='r')
- >>> dblinks = DBlinks(test_vect.c_mapinfo)
- >>> dblinks
- DBlinks([Link(1, table_doctest_map, sqlite)])
- >>> link = Link(2, 'pg_link', test_vector_name, 'cat',
- ... 'host=localhost dbname=grassdb', 'pg') # doctest: +SKIP
- >>> dblinks.add(link) # doctest: +SKIP
- >>> dblinks # doctest: +SKIP
- DBlinks([Link(1, table_doctest_map, sqlite)])
- """
- # TODO: check if open in write mode or not.
- libvect.Vect_map_add_dblink(
- self.c_mapinfo,
- link.layer,
- link.name,
- link.table_name,
- link.key,
- link.database,
- link.driver,
- )
- def remove(self, key, force=False):
- """Remove a link. If force set to true remove also the table
- :param key: the key of Link
- :type key: str
- :param force: if True remove also the table from database otherwise
- only the link between table and vector
- :type force: boole
- >>> from grass.pygrass.vector import VectorTopo
- >>> test_vect = VectorTopo(test_vector_name)
- >>> test_vect.open(mode='r')
- >>> dblinks = DBlinks(test_vect.c_mapinfo)
- >>> dblinks
- DBlinks([Link(1, table_doctest_map, sqlite)])
- >>> dblinks.remove('pg_link') # doctest: +SKIP
- >>> dblinks # need to open vector map in write mode
- DBlinks([Link(1, table_doctest_map, sqlite)])
- """
- if force:
- link = self.by_name(key)
- table = link.table()
- table.drop(force=force)
- if isinstance(key, unicode):
- key = self.from_name_to_num(key)
- libvect.Vect_map_del_dblink(self.c_mapinfo, key)
- def from_name_to_num(self, name):
- """
- Vect_get_field_number
- """
- return libvect.Vect_get_field_number(self.c_mapinfo, name)
- class Table(object):
- """
- >>> import sqlite3
- >>> path = '$GISDBASE/$LOCATION_NAME/PERMANENT/sqlite/sqlite.db'
- >>> tab_sqlite = Table(name=test_vector_name,
- ... connection=sqlite3.connect(get_path(path)))
- >>> tab_sqlite.name
- 'table_doctest_map'
- >>> import psycopg2 # doctest: +SKIP
- >>> tab_pg = Table(test_vector_name,
- ... psycopg2.connect('host=localhost dbname=grassdb',
- ... 'pg')) # doctest: +SKIP
- >>> tab_pg.columns # doctest: +ELLIPSIS +SKIP
- Columns([('cat', 'int4'), ...])
- """
- def _get_name(self):
- """Private method to return the name of table"""
- return self._name
- def _set_name(self, new_name):
- """Private method to set the name of table
- :param new_name: the new name of table
- :type new_name: str
- """
- old_name = self._name
- cur = self.conn.cursor()
- cur.execute(sql.RENAME_TAB.format(old_name=old_name, new_name=new_name))
- self.conn.commit()
- cur.close()
- name = property(fget=_get_name, fset=_set_name, doc="Set and obtain table name")
- def __init__(self, name, connection, key="cat"):
- self._name = name
- self.conn = connection
- self.key = key
- self.columns = Columns(self.name, self.conn, self.key)
- self.filters = Filters(self.name)
- def __repr__(self):
- """
- >>> import sqlite3
- >>> path = '$GISDBASE/$LOCATION_NAME/PERMANENT/sqlite/sqlite.db'
- >>> tab_sqlite = Table(name=test_vector_name,
- ... connection=sqlite3.connect(get_path(path)))
- >>> tab_sqlite
- Table('table_doctest_map')
- """
- return "Table(%r)" % (self.name)
- def __iter__(self):
- cur = self.execute()
- return (cur.fetchone() for _ in range(self.__len__()))
- def __len__(self):
- """Return the number of rows"""
- return self.n_rows()
- def drop(self, cursor=None, force=False):
- """Method to drop table from database
- :param cursor: the cursor to connect, if None it use the cursor
- of connection table object
- :type cursor: Cursor object
- :param force: True to remove the table, by default False to print
- advice
- :type force: bool
- """
- cur = cursor if cursor else self.conn.cursor()
- if self.exist(cursor=cur):
- used = db_table_in_vector(self.name)
- if used is not None and len(used) > 0 and not force:
- print(
- _("Deleting table <%s> which is attached" " to following map(s):")
- % self.name
- )
- for vect in used:
- warning("%s" % vect)
- print(
- _("You must use the force flag to actually" " remove it. Exiting.")
- )
- else:
- cur.execute(sql.DROP_TAB.format(tname=self.name))
- def n_rows(self):
- """Return the number of rows
- >>> import sqlite3
- >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db'
- >>> tab_sqlite = Table(name=test_vector_name,
- ... connection=sqlite3.connect(get_path(path)))
- >>> tab_sqlite.n_rows()
- 3
- """
- cur = self.conn.cursor()
- cur.execute(sql.SELECT.format(cols="Count(*)", tname=self.name))
- number = cur.fetchone()[0]
- cur.close()
- return number
- def execute(self, sql_code=None, cursor=None, many=False, values=None):
- """Execute SQL code from a given string or build with filters and
- return a cursor object.
- :param sql_code: the SQL code to execute, if not pass it use filters
- variable
- :type sql_code: str
- :param cursor: the cursor to connect, if None it use the cursor
- of connection table object
- :type cursor: Cursor object
- :param many: True to run executemany function
- :type many: bool
- :param values: The values to substitute into sql_code string
- :type values: list of tuple
- >>> import sqlite3
- >>> path = '$GISDBASE/$LOCATION_NAME/$MAPSET/sqlite/sqlite.db'
- >>> tab_sqlite = Table(name=test_vector_name,
- ... connection=sqlite3.connect(get_path(path)))
- >>> tab_sqlite.filters.select('cat', 'name').order_by('value')
- Filters('SELECT cat, name FROM table_doctest_map ORDER BY value;')
- >>> cur = tab_sqlite.execute()
- >>> cur.fetchone() #doctest: +ELLIPSIS +NORMALIZE_WHITESPACE
- (1, 'point')
- """
- try:
- sqlc = sql_code if sql_code else self.filters.get_sql()
- cur = cursor if cursor else self.conn.cursor()
- if many and values:
- return cur.executemany(sqlc, values)
- return cur.execute(sqlc, values) if values else cur.execute(sqlc)
- except Exception as exc:
- raise ValueError(
- "The SQL statement is not correct:\n%r,\n"
- "values: %r,\n"
- "SQL error: %s" % (sqlc, values, str(exc))
- )
- def exist(self, cursor=None):
- """Return True if the table already exist in the DB, False otherwise
- :param cursor: the cursor to connect, if None it use the cursor
- of connection table object
- """
- cur = cursor if cursor else self.conn.cursor()
- return table_exist(cur, self.name)
- def insert(self, values, cursor=None, many=False):
- """Insert a new row
- :param values: a tuple of values to insert, it is possible to insert
- more rows using a list of tuple and parameter `many`
- :type values: tuple
- :param cursor: the cursor to connect, if None it use the cursor
- of connection table object
- :type cursor: Cursor object
- :param many: True to run executemany function
- :type many: bool
- """
- cur = cursor if cursor else self.conn.cursor()
- if many:
- return cur.executemany(self.columns.insert_str, values)
- return cur.execute(self.columns.insert_str, values)
- def update(self, key, values, cursor=None):
- """Update a table row
- :param key: the rowid
- :type key: int
- :param values: the values to insert without row id.
- For example if we have a table with four columns:
- cat, c0, c1, c2 the values list should
- containing only c0, c1, c2 values.
- :type values: list
- :param cursor: the cursor to connect, if None it use the cursor
- of connection table object
- :type cursor: Cursor object
- """
- cur = cursor if cursor else self.conn.cursor()
- vals = list(values) + [
- key,
- ]
- return cur.execute(self.columns.update_str, vals)
- def create(self, cols, name=None, overwrite=False, cursor=None):
- """Create a new table
- :param cols:
- :type cols:
- :param name: the name of table to create, None for the name of Table object
- :type name: str
- :param overwrite: overwrite existing table
- :type overwrite: bool
- :param cursor: the cursor to connect, if None it use the cursor
- of connection table object
- :type cursor: Cursor object
- """
- cur = cursor if cursor else self.conn.cursor()
- coldef = ",\n".join(["%s %s" % col for col in cols])
- if name:
- newname = name
- else:
- newname = self.name
- try:
- cur.execute(sql.CREATE_TAB.format(tname=newname, coldef=coldef))
- self.conn.commit()
- except OperationalError: # OperationalError
- if overwrite:
- self.drop(force=True)
- cur.execute(sql.CREATE_TAB.format(tname=newname, coldef=coldef))
- self.conn.commit()
- else:
- print("The table: %s already exist." % self.name)
- cur.close()
- self.columns.update_odict()
- if __name__ == "__main__":
- import doctest
- from grass.pygrass import utils
- utils.create_test_vector_map(test_vector_name)
- doctest.testmod()
- """Remove the generated vector map, if exist"""
- from grass.pygrass.utils import get_mapset_vector
- from grass.script.core import run_command
- mset = get_mapset_vector(test_vector_name, mapset="")
- if mset:
- run_command("g.remove", flags="f", type="vector", name=test_vector_name)
|