123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730 |
- """
- @package frame
- @brief Timeline Tool
- Classes:
- - frame::DataCursor
- - frame::TimelineFrame
- - frame::LookUp
- (C) 2012-2020 by the GRASS Development Team
- This program is free software under the GNU General Public License
- (>=v2). Read the file COPYING that comes with GRASS for details.
- @author Anna Kratochvilova <kratochanna gmail.com>
- """
- import six
- from math import ceil
- from itertools import cycle
- import numpy as np
- import wx
- from functools import reduce
- try:
- import matplotlib
- # The recommended way to use wx with mpl is with the WXAgg
- # backend.
- matplotlib.use("WXAgg")
- from matplotlib.figure import Figure
- from matplotlib.backends.backend_wxagg import (
- FigureCanvasWxAgg as FigCanvas,
- NavigationToolbar2WxAgg as NavigationToolbar,
- )
- import matplotlib.dates as mdates
- except ImportError as e:
- raise ImportError(
- _(
- 'The Timeline Tool needs the "matplotlib" '
- "(python-matplotlib and on some systems also python-matplotlib-wx) "
- "package(s) to be installed. {}"
- ).format(e)
- )
- import grass.script as grass
- import grass.temporal as tgis
- from core.gcmd import GError, GException, RunCommand
- from gui_core import gselect
- from gui_core.wrap import Button, StaticText
- from core import globalvar
- ALPHA = 1
- COLORS = ["b", "g", "r", "c", "m", "y", "k"]
- def check_version(*version):
- """Checks if given version or newer is installed"""
- versionInstalled = []
- for i in matplotlib.__version__.split("."):
- try:
- v = int(i)
- versionInstalled.append(v)
- except ValueError:
- versionInstalled.append(0)
- if versionInstalled < list(version):
- return False
- else:
- return True
- class TimelineFrame(wx.Frame):
- """The main frame of the application"""
- def __init__(self, parent, title=_("Timeline Tool")):
- wx.Frame.__init__(self, parent, id=wx.ID_ANY, title=title)
- tgis.init(True)
- self.datasets = []
- self.timeData = {}
- self._layout()
- self.temporalType = None
- self.unit = None
- # We create a database interface here to speedup the GUI
- self.dbif = tgis.SQLDatabaseInterfaceConnection()
- self.dbif.connect()
- self.Bind(wx.EVT_CLOSE, self.OnClose)
- def OnClose(self, event):
- """Close the database interface and stop the messenger and C-interface
- subprocesses.
- """
- if self.dbif.connected is True:
- self.dbif.close()
- tgis.stop_subprocesses()
- self.Destroy()
- def _layout(self):
- """Creates the main panel with all the controls on it:
- * mpl canvas
- * mpl navigation toolbar
- * Control panel for interaction
- """
- self.panel = wx.Panel(self)
- # Create the mpl Figure and FigCanvas objects.
- # 5x4 inches, 100 dots-per-inch
- #
- # color = wx.SystemSettings.GetColour(wx.SYS_COLOUR_BACKGROUND)
- self.fig = Figure((5.0, 4.0), facecolor=(1, 1, 1))
- self.canvas = FigCanvas(self.panel, wx.ID_ANY, self.fig)
- # axes are initialized later
- self.axes2d = None
- self.axes3d = None
- # Create the navigation toolbar, tied to the canvas
- #
- self.toolbar = NavigationToolbar(self.canvas)
- #
- # Layout
- #
- self.vbox = wx.BoxSizer(wx.VERTICAL)
- self.vbox.Add(self.canvas, 1, wx.LEFT | wx.TOP | wx.EXPAND)
- self.vbox.Add(self.toolbar, 0, wx.EXPAND)
- self.vbox.AddSpacer(10)
- gridSizer = wx.GridBagSizer(hgap=5, vgap=5)
- self.datasetSelect = gselect.Select(
- parent=self.panel,
- id=wx.ID_ANY,
- size=globalvar.DIALOG_GSELECT_SIZE,
- type="stds",
- multiple=True,
- )
- self.drawButton = Button(self.panel, id=wx.ID_ANY, label=_("Draw"))
- self.drawButton.Bind(wx.EVT_BUTTON, self.OnRedraw)
- self.helpButton = Button(self.panel, id=wx.ID_ANY, label=_("Help"))
- self.helpButton.Bind(wx.EVT_BUTTON, self.OnHelp)
- self.view3dCheck = wx.CheckBox(
- self.panel, id=wx.ID_ANY, label=_("3D plot of spatio-temporal extents")
- )
- self.view3dCheck.Bind(wx.EVT_CHECKBOX, self.OnRedraw)
- if not check_version(1, 0, 0):
- self.view3dCheck.SetLabel(
- _("3D plot of spatio-temporal extents " "(matplotlib >= 1.0.0)")
- )
- self.view3dCheck.Disable()
- gridSizer.Add(
- StaticText(
- self.panel, id=wx.ID_ANY, label=_("Select space time dataset(s):")
- ),
- pos=(0, 0),
- flag=wx.EXPAND | wx.ALIGN_CENTER_VERTICAL,
- )
- gridSizer.Add(self.datasetSelect, pos=(1, 0), flag=wx.EXPAND)
- gridSizer.Add(self.drawButton, pos=(1, 1), flag=wx.EXPAND)
- gridSizer.Add(self.helpButton, pos=(1, 2), flag=wx.EXPAND)
- gridSizer.Add(
- self.view3dCheck, pos=(2, 0), flag=wx.EXPAND | wx.ALIGN_CENTER_VERTICAL
- )
- self.vbox.Add(gridSizer, proportion=0, flag=wx.EXPAND | wx.ALL, border=10)
- self.panel.SetSizer(self.vbox)
- self.vbox.Fit(self)
- def _getData(self, timeseries):
- """Load data and read properties"""
- self.timeData = {}
- mode = None
- unit = None
- for series in timeseries:
- name = series[0] + "@" + series[1]
- etype = series[2]
- sp = tgis.dataset_factory(etype, name)
- if not sp.is_in_db(dbif=self.dbif):
- GError(
- self,
- message=_("Dataset <%s> not found in temporal database") % (name),
- )
- return
- sp.select(dbif=self.dbif)
- self.timeData[name] = {}
- self.timeData[name]["elementType"] = series[2]
- self.timeData[name]["temporalType"] = sp.get_temporal_type() # abs/rel
- if mode is None:
- mode = self.timeData[name]["temporalType"]
- elif self.timeData[name]["temporalType"] != mode:
- GError(
- parent=self,
- message=_(
- "Datasets have different temporal type "
- "(absolute x relative), which is not allowed."
- ),
- )
- return
- # check topology
- maps = sp.get_registered_maps_as_objects(dbif=self.dbif)
- self.timeData[name]["validTopology"] = sp.check_temporal_topology(
- maps=maps, dbif=self.dbif
- )
- self.timeData[name]["temporalMapType"] = sp.get_map_time() # point/interval
- self.timeData[name]["unit"] = None # only with relative
- if self.timeData[name]["temporalType"] == "relative":
- start, end, self.timeData[name]["unit"] = sp.get_relative_time()
- if unit is None:
- unit = self.timeData[name]["unit"]
- elif self.timeData[name]["unit"] != unit:
- GError(
- self,
- _("Datasets have different time unit which is not allowed."),
- )
- return
- self.timeData[name]["start_datetime"] = []
- # self.timeData[name]['start_plot'] = []
- self.timeData[name]["end_datetime"] = []
- # self.timeData[name]['end_plot'] = []
- self.timeData[name]["names"] = []
- self.timeData[name]["north"] = []
- self.timeData[name]["south"] = []
- self.timeData[name]["west"] = []
- self.timeData[name]["east"] = []
- columns = ",".join(
- ["name", "start_time", "end_time", "north", "south", "west", "east"]
- )
- rows = sp.get_registered_maps(
- columns=columns, where=None, order="start_time", dbif=self.dbif
- )
- if not rows:
- GError(
- parent=self,
- message=_("Dataset <{name}> is empty").format(
- name=series[0] + "@" + series[1]
- ),
- )
- return
- for row in rows:
- mapName, start, end, north, south, west, east = row
- self.timeData[name]["start_datetime"].append(start)
- self.timeData[name]["end_datetime"].append(end)
- self.timeData[name]["names"].append(mapName)
- self.timeData[name]["north"].append(north)
- self.timeData[name]["south"].append(south)
- self.timeData[name]["west"].append(west)
- self.timeData[name]["east"].append(east)
- self.temporalType = mode
- self.unit = unit
- def _draw3dFigure(self):
- """Draws 3d view (spatio-temporal extents).
- Only for matplotlib versions >= 1.0.0.
- Earlier versions cannot draw time ticks and alpha
- and it has a slightly different API.
- """
- self.axes3d.clear()
- self.axes3d.grid(False)
- # self.axes3d.grid(True)
- if self.temporalType == "absolute":
- convert = mdates.date2num
- else:
- convert = lambda x: x # noqa: E731
- colors = cycle(COLORS)
- plots = []
- for name in self.datasets:
- name = name[0] + "@" + name[1]
- startZ = convert(self.timeData[name]["start_datetime"])
- mapType = self.timeData[name]["temporalMapType"]
- if mapType == "interval":
- dZ = convert(self.timeData[name]["end_datetime"]) - startZ
- else:
- dZ = [0] * len(startZ)
- startX = self.timeData[name]["west"]
- dX = self.timeData[name]["east"] - np.array(startX)
- startY = self.timeData[name]["south"]
- dY = self.timeData[name]["north"] - np.array(startY)
- color = next(colors)
- plots.append(
- self.axes3d.bar3d(
- startX, startY, startZ, dX, dY, dZ, color=color, alpha=ALPHA
- )
- )
- params = grass.read_command("g.proj", flags="g")
- params = grass.parse_key_val(params)
- if "unit" in params:
- self.axes3d.set_xlabel(_("X [%s]") % params["unit"])
- self.axes3d.set_ylabel(_("Y [%s]") % params["unit"])
- else:
- self.axes3d.set_xlabel(_("X"))
- self.axes3d.set_ylabel(_("Y"))
- if self.temporalType == "absolute":
- if check_version(1, 1, 0):
- self.axes3d.zaxis_date()
- self.axes3d.set_zlabel(_("Time"))
- self.axes3d.mouse_init()
- self.canvas.draw()
- def _draw2dFigure(self):
- """Draws 2D plot (temporal extents)"""
- self.axes2d.clear()
- self.axes2d.grid(True)
- if self.temporalType == "absolute":
- convert = mdates.date2num
- else:
- convert = lambda x: x # noqa: E731
- colors = cycle(COLORS)
- yticksNames = []
- yticksPos = []
- plots = []
- lookUp = LookUp(self.timeData)
- for i, name in enumerate(self.datasets):
- # just name; with mapset it would be long
- yticksNames.append(name[0])
- name = name[0] + "@" + name[1]
- yticksPos.append(i)
- barData = []
- pointData = []
- mapType = self.timeData[name]["temporalMapType"]
- start = convert(self.timeData[name]["start_datetime"])
- # TODO: mixed
- if mapType == "interval":
- end = convert(self.timeData[name]["end_datetime"])
- lookUpData = list(zip(start, end))
- duration = end - np.array(start)
- barData = list(zip(start, duration))
- lookUp.AddDataset(
- type_="bar",
- yrange=(i - 0.1, i + 0.1),
- xranges=lookUpData,
- datasetName=name,
- )
- else:
- # self.timeData[name]['end_plot'] = None
- pointData = start
- lookUp.AddDataset(
- type_="point", yrange=i, xranges=pointData, datasetName=name
- )
- color = next(colors)
- if mapType == "interval":
- plots.append(
- self.axes2d.broken_barh(
- xranges=barData,
- yrange=(i - 0.1, 0.2),
- facecolors=color,
- edgecolor="black",
- alpha=ALPHA,
- picker=True,
- )
- )
- else:
- plots.append(
- self.axes2d.plot(
- pointData,
- [i] * len(pointData),
- marker="o",
- linestyle="None",
- color=color,
- picker=True,
- )[0]
- )
- if self.temporalType == "absolute":
- self.axes2d.xaxis_date()
- self.fig.autofmt_xdate()
- # self.axes2d.set_xlabel(_("Time"))
- else:
- self.axes2d.set_xlabel(_("Time [%s]") % self.unit)
- self.axes2d.set_yticks(yticksPos)
- self.axes2d.set_yticklabels(yticksNames)
- self.axes2d.set_ylim(min(yticksPos) - 1, max(yticksPos) + 1)
- # adjust xlim
- xlim = self.axes2d.get_xlim()
- padding = ceil((xlim[1] - xlim[0]) / 20.0)
- self.axes2d.set_xlim(xlim[0] - padding, xlim[1] + padding)
- self.axes2d.set_axisbelow(True)
- self.canvas.draw()
- DataCursor(plots, lookUp, InfoFormat)
- def OnRedraw(self, event):
- """Required redrawing."""
- datasets = self.datasetSelect.GetValue().strip()
- if not datasets:
- return
- datasets = datasets.split(",")
- try:
- datasets = self._checkDatasets(datasets)
- if not datasets:
- return
- except GException as error:
- GError(parent=self, message=str(error), showTraceback=False)
- return
- self.datasets = datasets
- self._redraw()
- def _redraw(self):
- """Readraw data.
- Decides if to draw also 3D and adjusts layout if needed.
- """
- self._getData(self.datasets)
- # axes3d are physically removed
- if not self.axes2d:
- self.axes2d = self.fig.add_subplot(1, 1, 1)
- self._draw2dFigure()
- if check_version(1, 0, 0):
- if self.view3dCheck.IsChecked():
- self.axes2d.change_geometry(2, 1, 1)
- if not self.axes3d:
- # do not remove this import - unused but it is required for
- # 3D
- from mpl_toolkits.mplot3d import Axes3D # noqa: F401
- self.axes3d = self.fig.add_subplot(2, 1, 2, projection="3d")
- self.axes3d.set_visible(True)
- self._draw3dFigure()
- else:
- if self.axes3d:
- self.fig.delaxes(self.axes3d)
- self.axes3d = None
- self.axes2d.change_geometry(1, 1, 1)
- self.canvas.draw()
- def _checkDatasets(self, datasets):
- """Checks and validates datasets.
- Reports also type of dataset (e.g. 'strds').
- :return: (mapName, mapset, type)
- """
- validated = []
- tDict = tgis.tlist_grouped("stds", group_type=True, dbif=self.dbif)
- # nested list with '(map, mapset, etype)' items
- allDatasets = [
- [
- [(map, mapset, etype) for map in maps]
- for etype, maps in six.iteritems(etypesDict)
- ]
- for mapset, etypesDict in six.iteritems(tDict)
- ]
- # flatten this list
- if allDatasets:
- allDatasets = reduce(
- lambda x, y: x + y, reduce(lambda x, y: x + y, allDatasets)
- )
- mapsets = tgis.get_tgis_c_library_interface().available_mapsets()
- allDatasets = [
- i for i in sorted(allDatasets, key=lambda l: mapsets.index(l[1]))
- ]
- for dataset in datasets:
- errorMsg = _("Space time dataset <%s> not found.") % dataset
- if dataset.find("@") >= 0:
- nameShort, mapset = dataset.split("@", 1)
- indices = [
- n
- for n, (mapName, mapsetName, etype) in enumerate(allDatasets)
- if nameShort == mapName and mapsetName == mapset
- ]
- else:
- indices = [
- n
- for n, (mapName, mapset, etype) in enumerate(allDatasets)
- if dataset == mapName
- ]
- if len(indices) == 0:
- raise GException(errorMsg)
- elif len(indices) >= 2:
- dlg = wx.SingleChoiceDialog(
- self,
- message=_("Please specify the space time dataset <%s>." % dataset),
- caption=_("Ambiguous dataset name"),
- choices=[
- (
- "%(map)s@%(mapset)s: %(etype)s"
- % {
- "map": allDatasets[i][0],
- "mapset": allDatasets[i][1],
- "etype": allDatasets[i][2],
- }
- )
- for i in indices
- ],
- style=wx.CHOICEDLG_STYLE | wx.OK,
- )
- if dlg.ShowModal() == wx.ID_OK:
- index = dlg.GetSelection()
- validated.append(allDatasets[indices[index]])
- else:
- continue
- else:
- validated.append(allDatasets[indices[0]])
- return validated
- def OnHelp(self, event):
- RunCommand("g.manual", quiet=True, entry="g.gui.timeline")
- # interface
- def SetDatasets(self, datasets):
- """Set data"""
- if not datasets:
- return
- try:
- datasets = self._checkDatasets(datasets)
- if not datasets:
- return
- except GException as error:
- GError(parent=self, message=str(error), showTraceback=False)
- return
- self.datasets = datasets
- self.datasetSelect.SetValue(
- ",".join(map(lambda x: x[0] + "@" + x[1], datasets))
- )
- self._redraw()
- def Show3D(self, show):
- """Show also 3D if possible"""
- if check_version(1, 0, 0):
- self.view3dCheck.SetValue(show)
- class LookUp:
- """Helper class for searching info by coordinates"""
- def __init__(self, timeData):
- self.data = {}
- self.timeData = timeData
- def AddDataset(self, type_, yrange, xranges, datasetName):
- if type_ == "bar":
- self.data[yrange] = {"name": datasetName}
- for i, (start, end) in enumerate(xranges):
- self.data[yrange][(start, end)] = i
- elif type_ == "point":
- self.data[(yrange, yrange)] = {"name": datasetName}
- for i, start in enumerate(xranges):
- self.data[(yrange, yrange)][(start, start)] = i
- def GetInformation(self, x, y):
- keys = None
- for keyY in self.data.keys():
- if keyY[0] <= y <= keyY[1]:
- for keyX in self.data[keyY].keys():
- if keyX != "name" and keyX[0] <= x <= keyX[1]:
- keys = keyY, keyX
- break
- if keys:
- break
- if not keys:
- return None
- datasetName = self.data[keys[0]]["name"]
- mapIndex = self.data[keys[0]][keys[1]]
- return self.timeData, datasetName, mapIndex
- def InfoFormat(timeData, datasetName, mapIndex):
- """Formats information about dataset"""
- text = []
- etype = timeData[datasetName]["elementType"]
- name, mapset = datasetName.split("@")
- if etype == "strds":
- text.append(_("Space time raster dataset: %s") % name)
- elif etype == "stvds":
- text.append(_("Space time vector dataset: %s") % name)
- elif etype == "str3ds":
- text.append(_("Space time 3D raster dataset: %s") % name)
- text.append(_("Mapset: %s") % mapset)
- text.append(_("Map name: %s") % timeData[datasetName]["names"][mapIndex])
- text.append(_("Start time: %s") % timeData[datasetName]["start_datetime"][mapIndex])
- text.append(_("End time: %s") % timeData[datasetName]["end_datetime"][mapIndex])
- if not timeData[datasetName]["validTopology"]:
- text.append(_("WARNING: invalid topology"))
- text.append(_("\nPress Del to dismiss."))
- return "\n".join(text)
- class DataCursor(object):
- """A simple data cursor widget that displays the x,y location of a
- matplotlib artist when it is selected.
- Source: http://stackoverflow.com/questions/4652439/
- is-there-a-matplotlib-equivalent-of-matlabs-datacursormode/4674445
- """
- def __init__(
- self,
- artists,
- lookUp,
- formatFunction,
- tolerance=5,
- offsets=(-30, 30),
- display_all=False,
- ):
- """Create the data cursor and connect it to the relevant figure.
- "artists" is the matplotlib artist or sequence of artists that will be
- selected.
- "tolerance" is the radius (in points) that the mouse click must be
- within to select the artist.
- "offsets" is a tuple of (x,y) offsets in points from the selected
- point to the displayed annotation box
- "display_all" controls whether more than one annotation box will
- be shown if there are multiple axes. Only one will be shown
- per-axis, regardless.
- """
- self.lookUp = lookUp
- self.formatFunction = formatFunction
- self.offsets = offsets
- self.display_all = display_all
- if not np.iterable(artists):
- artists = [artists]
- self.artists = artists
- self.axes = tuple(set(art.axes for art in self.artists))
- self.figures = tuple(set(ax.figure for ax in self.axes))
- self.annotations = {}
- for ax in self.axes:
- self.annotations[ax] = self.annotate(ax)
- for artist in self.artists:
- artist.set_pickradius(tolerance)
- for fig in self.figures:
- fig.canvas.mpl_connect("pick_event", self)
- fig.canvas.mpl_connect("key_press_event", self.keyPressed)
- def keyPressed(self, event):
- """Key pressed - hide annotation if Delete was pressed"""
- if event.key != "delete":
- return
- for ax in self.axes:
- self.annotations[ax].set_visible(False)
- event.canvas.draw()
- def annotate(self, ax):
- """Draws and hides the annotation box for the given axis "ax"."""
- annotation = ax.annotate(
- self.formatFunction,
- xy=(0, 0),
- ha="center",
- xytext=self.offsets,
- textcoords="offset points",
- va="bottom",
- bbox=dict(boxstyle="round,pad=0.5", fc="yellow", alpha=0.7),
- arrowprops=dict(arrowstyle="->", connectionstyle="arc3,rad=0"),
- annotation_clip=False,
- multialignment="left",
- )
- annotation.set_visible(False)
- return annotation
- def __call__(self, event):
- """Intended to be called through "mpl_connect"."""
- # Rather than trying to interpolate, just display the clicked coords
- # This will only be called if it's within "tolerance", anyway.
- x, y = event.mouseevent.xdata, event.mouseevent.ydata
- annotation = self.annotations[event.artist.axes]
- if x is not None:
- if not self.display_all:
- # Hide any other annotation boxes...
- for ann in self.annotations.values():
- ann.set_visible(False)
- # Update the annotation in the current axis..
- annotation.xy = x, y
- if "Line2D" in str(type(event.artist)):
- y = event.artist.get_ydata()[0]
- xData = event.artist.get_xdata()
- x = xData[np.argmin(abs(xData - x))]
- info = self.lookUp.GetInformation(x, y)
- if not info:
- return
- text = self.formatFunction(*info)
- annotation.set_text(text)
- annotation.set_visible(True)
- event.canvas.draw()
- def run(parent=None, datasets=None):
- frame = TimelineFrame(parent)
- if datasets:
- frame.SetDatasets(datasets)
- frame.Show()
- if __name__ == "__main__":
- run()
|