""" @package frame @brief Timeline Tool Classes: - frame::DataCursor - frame::TimelineFrame - frame::LookUp (C) 2012-2020 by the GRASS Development Team This program is free software under the GNU General Public License (>=v2). Read the file COPYING that comes with GRASS for details. @author Anna Kratochvilova """ import os import signal import six from math import ceil from itertools import cycle import numpy as np import wx from functools import reduce try: import matplotlib # The recommended way to use wx with mpl is with the WXAgg # backend. matplotlib.use('WXAgg') from matplotlib.figure import Figure from matplotlib.backends.backend_wxagg import \ FigureCanvasWxAgg as FigCanvas, \ NavigationToolbar2WxAgg as NavigationToolbar import matplotlib.dates as mdates except ImportError as e: raise ImportError(_('The Timeline Tool needs the "matplotlib" ' '(python-matplotlib and on some systems also python-matplotlib-wx) package(s) to be installed. {0}').format(e)) import grass.script as grass import grass.temporal as tgis from core.gcmd import GError, GException, RunCommand from gui_core import gselect from gui_core.wrap import Button, StaticText from core import globalvar ALPHA = 1 COLORS = ['b', 'g', 'r', 'c', 'm', 'y', 'k'] def check_version(*version): """Checks if given version or newer is installed""" versionInstalled = [] for i in matplotlib.__version__.split('.'): try: v = int(i) versionInstalled.append(v) except ValueError: versionInstalled.append(0) if versionInstalled < list(version): return False else: return True class TimelineFrame(wx.Frame): """The main frame of the application""" def __init__(self, parent): wx.Frame.__init__( self, parent, id=wx.ID_ANY, title=_("GRASS GIS Timeline Tool")) tgis.init(True) self.datasets = [] self.timeData = {} self._layout() self.temporalType = None self.unit = None # We create a database interface here to speedup the GUI self.dbif = tgis.SQLDatabaseInterfaceConnection() self.dbif.connect() self.Bind(wx.EVT_CLOSE, self.OnClose) def OnClose(self, event): """Close the database interface and stop the messenger and C-interface subprocesses. """ if self.dbif.connected is True: self.dbif.close() tgis.stop_subprocesses() self.Destroy() def _layout(self): """Creates the main panel with all the controls on it: * mpl canvas * mpl navigation toolbar * Control panel for interaction """ self.panel = wx.Panel(self) # Create the mpl Figure and FigCanvas objects. # 5x4 inches, 100 dots-per-inch # # color = wx.SystemSettings.GetColour(wx.SYS_COLOUR_BACKGROUND) self.fig = Figure((5.0, 4.0), facecolor=(1, 1, 1)) self.canvas = FigCanvas(self.panel, wx.ID_ANY, self.fig) # axes are initialized later self.axes2d = None self.axes3d = None # Create the navigation toolbar, tied to the canvas # self.toolbar = NavigationToolbar(self.canvas) # # Layout # self.vbox = wx.BoxSizer(wx.VERTICAL) self.vbox.Add(self.canvas, 1, wx.LEFT | wx.TOP | wx.EXPAND) self.vbox.Add(self.toolbar, 0, wx.EXPAND) self.vbox.AddSpacer(10) gridSizer = wx.GridBagSizer(hgap=5, vgap=5) self.datasetSelect = gselect.Select(parent=self.panel, id=wx.ID_ANY, size=globalvar.DIALOG_GSELECT_SIZE, type='stds', multiple=True) self.drawButton = Button(self.panel, id=wx.ID_ANY, label=_("Draw")) self.drawButton.Bind(wx.EVT_BUTTON, self.OnRedraw) self.helpButton = Button(self.panel, id=wx.ID_ANY, label=_("Help")) self.helpButton.Bind(wx.EVT_BUTTON, self.OnHelp) self.view3dCheck = wx.CheckBox( self.panel, id=wx.ID_ANY, label=_("3D plot of spatio-temporal extents")) self.view3dCheck.Bind(wx.EVT_CHECKBOX, self.OnRedraw) if not check_version(1, 0, 0): self.view3dCheck.SetLabel(_("3D plot of spatio-temporal extents " "(matplotlib >= 1.0.0)")) self.view3dCheck.Disable() gridSizer.Add(StaticText(self.panel, id=wx.ID_ANY, label=_("Select space time dataset(s):")), pos=(0, 0), flag=wx.EXPAND | wx.ALIGN_CENTER_VERTICAL) gridSizer.Add(self.datasetSelect, pos=(1, 0), flag=wx.EXPAND) gridSizer.Add(self.drawButton, pos=(1, 1), flag=wx.EXPAND) gridSizer.Add(self.helpButton, pos=(1, 2), flag=wx.EXPAND) gridSizer.Add( self.view3dCheck, pos=(2, 0), flag=wx.EXPAND | wx.ALIGN_CENTER_VERTICAL) self.vbox.Add( gridSizer, proportion=0, flag=wx.EXPAND | wx.ALL, border=10) self.panel.SetSizer(self.vbox) self.vbox.Fit(self) def _getData(self, timeseries): """Load data and read properties""" self.timeData = {} mode = None unit = None for series in timeseries: name = series[0] + '@' + series[1] etype = series[2] sp = tgis.dataset_factory(etype, name) if not sp.is_in_db(dbif=self.dbif): GError( self, message=_("Dataset <%s> not found in temporal database") % (name)) return sp.select(dbif=self.dbif) self.timeData[name] = {} self.timeData[name]['elementType'] = series[2] self.timeData[name][ 'temporalType'] = sp.get_temporal_type() # abs/rel if mode is None: mode = self.timeData[name]['temporalType'] elif self.timeData[name]['temporalType'] != mode: GError( parent=self, message=_( "Datasets have different temporal type " "(absolute x relative), which is not allowed.")) return # check topology maps = sp.get_registered_maps_as_objects(dbif=self.dbif) self.timeData[name]['validTopology'] = sp.check_temporal_topology( maps=maps, dbif=self.dbif) self.timeData[name][ 'temporalMapType'] = sp.get_map_time() # point/interval self.timeData[name]['unit'] = None # only with relative if self.timeData[name]['temporalType'] == 'relative': start, end, self.timeData[name][ 'unit'] = sp.get_relative_time() if unit is None: unit = self.timeData[name]['unit'] elif self.timeData[name]['unit'] != unit: GError( self, _("Datasets have different time unit which is not allowed.")) return self.timeData[name]['start_datetime'] = [] # self.timeData[name]['start_plot'] = [] self.timeData[name]['end_datetime'] = [] # self.timeData[name]['end_plot'] = [] self.timeData[name]['names'] = [] self.timeData[name]['north'] = [] self.timeData[name]['south'] = [] self.timeData[name]['west'] = [] self.timeData[name]['east'] = [] columns = ','.join(['name', 'start_time', 'end_time', 'north', 'south', 'west', 'east']) rows = sp.get_registered_maps(columns=columns, where=None, order='start_time', dbif=self.dbif) if not rows: GError( parent=self, message=_("Dataset <{name}> is empty").format( name=series[0] + '@' + series[1])) return for row in rows: mapName, start, end, north, south, west, east = row self.timeData[name]['start_datetime'].append(start) self.timeData[name]['end_datetime'].append(end) self.timeData[name]['names'].append(mapName) self.timeData[name]['north'].append(north) self.timeData[name]['south'].append(south) self.timeData[name]['west'].append(west) self.timeData[name]['east'].append(east) self.temporalType = mode self.unit = unit def _draw3dFigure(self): """Draws 3d view (spatio-temporal extents). Only for matplotlib versions >= 1.0.0. Earlier versions cannot draw time ticks and alpha and it has a slightly different API. """ self.axes3d.clear() self.axes3d.grid(False) # self.axes3d.grid(True) if self.temporalType == 'absolute': convert = mdates.date2num else: convert = lambda x: x colors = cycle(COLORS) plots = [] for name in self.datasets: name = name[0] + '@' + name[1] startZ = convert(self.timeData[name]['start_datetime']) mapType = self.timeData[name]['temporalMapType'] if mapType == 'interval': dZ = convert(self.timeData[name]['end_datetime']) - startZ else: dZ = [0] * len(startZ) startX = self.timeData[name]['west'] dX = self.timeData[name]['east'] - np.array(startX) startY = self.timeData[name]['south'] dY = self.timeData[name]['north'] - np.array(startY) color = next(colors) plots.append(self.axes3d.bar3d(startX, startY, startZ, dX, dY, dZ, color=color, alpha=ALPHA)) params = grass.read_command('g.proj', flags='g') params = grass.parse_key_val(params) if 'unit' in params: self.axes3d.set_xlabel(_("X [%s]") % params['unit']) self.axes3d.set_ylabel(_("Y [%s]") % params['unit']) else: self.axes3d.set_xlabel(_("X")) self.axes3d.set_ylabel(_("Y")) if self.temporalType == 'absolute': if check_version(1, 1, 0): self.axes3d.zaxis_date() self.axes3d.set_zlabel(_('Time')) self.axes3d.mouse_init() self.canvas.draw() def _draw2dFigure(self): """Draws 2D plot (temporal extents)""" self.axes2d.clear() self.axes2d.grid(True) if self.temporalType == 'absolute': convert = mdates.date2num else: convert = lambda x: x colors = cycle(COLORS) yticksNames = [] yticksPos = [] plots = [] lookUp = LookUp(self.timeData) for i, name in enumerate(self.datasets): # just name; with mapset it would be long yticksNames.append(name[0]) name = name[0] + '@' + name[1] yticksPos.append(i) barData = [] pointData = [] mapType = self.timeData[name]['temporalMapType'] start = convert(self.timeData[name]['start_datetime']) # TODO: mixed if mapType == 'interval': end = convert(self.timeData[name]['end_datetime']) lookUpData = list(zip(start, end)) duration = end - np.array(start) barData = list(zip(start, duration)) lookUp.AddDataset(type_='bar', yrange=(i - 0.1, i + 0.1), xranges=lookUpData, datasetName=name) else: # self.timeData[name]['end_plot'] = None pointData = start lookUp.AddDataset( type_='point', yrange=i, xranges=pointData, datasetName=name) color = next(colors) if mapType == 'interval': plots.append( self.axes2d.broken_barh( xranges=barData, yrange=( i - 0.1, 0.2), facecolors=color, edgecolor='black', alpha=ALPHA)) else: plots.append( self.axes2d.plot( pointData, [i] * len(pointData), marker='o', linestyle='None', color=color)[0]) if self.temporalType == 'absolute': self.axes2d.xaxis_date() self.fig.autofmt_xdate() # self.axes2d.set_xlabel(_("Time")) else: self.axes2d.set_xlabel(_("Time [%s]") % self.unit) self.axes2d.set_yticks(yticksPos) self.axes2d.set_yticklabels(yticksNames) self.axes2d.set_ylim(min(yticksPos) - 1, max(yticksPos) + 1) # adjust xlim xlim = self.axes2d.get_xlim() padding = ceil((xlim[1] - xlim[0]) / 20.) self.axes2d.set_xlim(xlim[0] - padding, xlim[1] + padding) self.axes2d.set_axisbelow(True) self.canvas.draw() DataCursor(plots, lookUp, InfoFormat) def OnRedraw(self, event): """Required redrawing.""" datasets = self.datasetSelect.GetValue().strip() if not datasets: return datasets = datasets.split(',') try: datasets = self._checkDatasets(datasets) if not datasets: return except GException as error: GError(parent=self, message=str(error), showTraceback=False) return self.datasets = datasets self._redraw() def _redraw(self): """Readraw data. Decides if to draw also 3D and adjusts layout if needed. """ self._getData(self.datasets) # axes3d are physically removed if not self.axes2d: self.axes2d = self.fig.add_subplot(1, 1, 1) self._draw2dFigure() if check_version(1, 0, 0): if self.view3dCheck.IsChecked(): self.axes2d.change_geometry(2, 1, 1) if not self.axes3d: # do not remove this import - unused but it is required for # 3D from mpl_toolkits.mplot3d import Axes3D # pylint: disable=W0611 self.axes3d = self.fig.add_subplot( 2, 1, 2, projection='3d') self.axes3d.set_visible(True) self._draw3dFigure() else: if self.axes3d: self.fig.delaxes(self.axes3d) self.axes3d = None self.axes2d.change_geometry(1, 1, 1) self.canvas.draw() def _checkDatasets(self, datasets): """Checks and validates datasets. Reports also type of dataset (e.g. 'strds'). :return: (mapName, mapset, type) """ validated = [] tDict = tgis.tlist_grouped('stds', group_type=True, dbif=self.dbif) # nested list with '(map, mapset, etype)' items allDatasets = [[[(map, mapset, etype) for map in maps] for etype, maps in six.iteritems(etypesDict)] for mapset, etypesDict in six.iteritems(tDict)] # flatten this list if allDatasets: allDatasets = reduce( lambda x, y: x + y, reduce( lambda x, y: x + y, allDatasets)) mapsets = tgis.get_tgis_c_library_interface().available_mapsets() allDatasets = [ i for i in sorted( allDatasets, key=lambda l: mapsets.index(l[1]))] for dataset in datasets: errorMsg = _("Space time dataset <%s> not found.") % dataset if dataset.find("@") >= 0: nameShort, mapset = dataset.split('@', 1) indices = [n for n, (mapName, mapsetName, etype) in enumerate( allDatasets) if nameShort == mapName and mapsetName == mapset] else: indices = [n for n, (mapName, mapset, etype) in enumerate( allDatasets) if dataset == mapName] if len(indices) == 0: raise GException(errorMsg) elif len(indices) >= 2: dlg = wx.SingleChoiceDialog( self, message=_( "Please specify the space time dataset <%s>." % dataset), caption=_("Ambiguous dataset name"), choices=[("%(map)s@%(mapset)s: %(etype)s" % {'map': allDatasets[i][0], 'mapset': allDatasets[i][1], 'etype': allDatasets[i][2]}) for i in indices], style=wx.CHOICEDLG_STYLE | wx.OK) if dlg.ShowModal() == wx.ID_OK: index = dlg.GetSelection() validated.append(allDatasets[indices[index]]) else: continue else: validated.append(allDatasets[indices[0]]) return validated def OnHelp(self, event): RunCommand('g.manual', quiet=True, entry='g.gui.timeline') # interface def SetDatasets(self, datasets): """Set data""" if not datasets: return try: datasets = self._checkDatasets(datasets) if not datasets: return except GException as error: GError(parent=self, message=str(error), showTraceback=False) return self.datasets = datasets self.datasetSelect.SetValue( ','.join(map(lambda x: x[0] + '@' + x[1], datasets))) self._redraw() def Show3D(self, show): """Show also 3D if possible""" if check_version(1, 0, 0): self.view3dCheck.SetValue(show) class LookUp: """Helper class for searching info by coordinates""" def __init__(self, timeData): self.data = {} self.timeData = timeData def AddDataset(self, type_, yrange, xranges, datasetName): if type_ == 'bar': self.data[yrange] = {'name': datasetName} for i, (start, end) in enumerate(xranges): self.data[yrange][(start, end)] = i elif type_ == 'point': self.data[(yrange, yrange)] = {'name': datasetName} for i, start in enumerate(xranges): self.data[(yrange, yrange)][(start, start)] = i def GetInformation(self, x, y): keys = None for keyY in self.data.keys(): if keyY[0] <= y <= keyY[1]: for keyX in self.data[keyY].keys(): if keyX != 'name' and keyX[0] <= x <= keyX[1]: keys = keyY, keyX break if keys: break if not keys: return None datasetName = self.data[keys[0]]['name'] mapIndex = self.data[keys[0]][keys[1]] return self.timeData, datasetName, mapIndex def InfoFormat(timeData, datasetName, mapIndex): """Formats information about dataset""" text = [] etype = timeData[datasetName]['elementType'] name, mapset = datasetName.split('@') if etype == 'strds': text.append(_("Space time raster dataset: %s") % name) elif etype == 'stvds': text.append(_("Space time vector dataset: %s") % name) elif etype == 'str3ds': text.append(_("Space time 3D raster dataset: %s") % name) text.append(_("Mapset: %s") % mapset) text.append(_("Map name: %s") % timeData[datasetName]['names'][mapIndex]) text.append( _("Start time: %s") % timeData[datasetName]['start_datetime'][mapIndex]) text.append( _("End time: %s") % timeData[datasetName]['end_datetime'][mapIndex]) if not timeData[datasetName]['validTopology']: text.append(_("WARNING: invalid topology")) text.append(_("\nPress Del to dismiss.")) return '\n'.join(text) class DataCursor(object): """A simple data cursor widget that displays the x,y location of a matplotlib artist when it is selected. Source: http://stackoverflow.com/questions/4652439/ is-there-a-matplotlib-equivalent-of-matlabs-datacursormode/4674445 """ def __init__( self, artists, lookUp, formatFunction, tolerance=5, offsets=(-30, 30), display_all=False): """Create the data cursor and connect it to the relevant figure. "artists" is the matplotlib artist or sequence of artists that will be selected. "tolerance" is the radius (in points) that the mouse click must be within to select the artist. "offsets" is a tuple of (x,y) offsets in points from the selected point to the displayed annotation box "display_all" controls whether more than one annotation box will be shown if there are multiple axes. Only one will be shown per-axis, regardless. """ self.lookUp = lookUp self.formatFunction = formatFunction self.offsets = offsets self.display_all = display_all if not np.iterable(artists): artists = [artists] self.artists = artists self.axes = tuple(set(art.axes for art in self.artists)) self.figures = tuple(set(ax.figure for ax in self.axes)) self.annotations = {} for ax in self.axes: self.annotations[ax] = self.annotate(ax) for artist in self.artists: artist.set_pickradius(tolerance) for fig in self.figures: fig.canvas.mpl_connect('pick_event', self) fig.canvas.mpl_connect('key_press_event', self.keyPressed) def keyPressed(self, event): """Key pressed - hide annotation if Delete was pressed""" if event.key != 'delete': return for ax in self.axes: self.annotations[ax].set_visible(False) event.canvas.draw() def annotate(self, ax): """Draws and hides the annotation box for the given axis "ax".""" annotation = ax.annotate( self.formatFunction, xy=(0, 0), ha='center', xytext=self.offsets, textcoords='offset points', va='bottom', bbox=dict( boxstyle='round,pad=0.5', fc='yellow', alpha=0.7), arrowprops=dict( arrowstyle='->', connectionstyle='arc3,rad=0'), annotation_clip=False, multialignment='left') annotation.set_visible(False) return annotation def __call__(self, event): """Intended to be called through "mpl_connect".""" # Rather than trying to interpolate, just display the clicked coords # This will only be called if it's within "tolerance", anyway. x, y = event.mouseevent.xdata, event.mouseevent.ydata annotation = self.annotations[event.artist.axes] if x is not None: if not self.display_all: # Hide any other annotation boxes... for ann in self.annotations.values(): ann.set_visible(False) # Update the annotation in the current axis.. annotation.xy = x, y if 'Line2D' in str(type(event.artist)): y = event.artist.get_ydata()[0] xData = event.artist.get_xdata() x = xData[np.argmin(abs(xData - x))] info = self.lookUp.GetInformation(x, y) if not info: return text = self.formatFunction(*info) annotation.set_text(text) annotation.set_visible(True) event.canvas.draw() def run(parent=None, datasets=None): frame = TimelineFrame(parent) if datasets: frame.SetDatasets(datasets) frame.Show() if __name__ == '__main__': run()