1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498 |
- """
- @package vnet.vnet_data
- @brief Vector network analysis classes for data management.
- Classes:
- - vnet_data::VNETData
- - vnet_data::VNETPointsData
- - vnet_data::VNETAnalysisParameters
- - vnet_data::VNETAnalysesProperties
- - vnet_data::VNETTmpVectMaps
- - vnet_data::VectMap
- - vnet_data::History
- - vnet_data::VNETGlobalTurnsData
- (C) 2013-2014 by the GRASS Development Team
- This program is free software under the GNU General Public License
- (>=v2). Read the file COPYING that comes with GRASS for details.
- @author Stepan Turek <stepan.turek seznam.cz> (GSoC 2012, mentor: Martin Landa)
- @author Lukas Bocan <silent_bob centrum.cz> (turn costs support)
- @author Eliska Kyzlikova <eliska.kyzlikova gmail.com> (turn costs support)
- """
- import os
- import math
- import six
- from copy import deepcopy
- from grass.script.utils import try_remove
- from grass.script import core as grass
- from grass.script.task import cmdlist_to_tuple
- import wx
- from core import utils
- from core.gcmd import RunCommand, GMessage
- from core.settings import UserSettings
- from vnet.vnet_utils import ParseMapStr, SnapToNode
- from gui_core.gselect import VectorDBInfo
- from grass.pydispatch.signal import Signal
- from vnet.vnet_utils import DegreesToRadians
- class VNETData:
- def __init__(self, guiparent, mapWin):
- # setting initialization
- self._initSettings()
- self.guiparent = guiparent
- self.an_props = VNETAnalysesProperties()
- self.an_params = VNETAnalysisParameters(self.an_props)
- self.an_points = VNETPointsData(mapWin, self.an_props, self.an_params)
- self.global_turns = VNETGlobalTurnsData()
- self.pointsChanged = self.an_points.pointsChanged
- self.parametersChanged = self.an_params.parametersChanged
- def CleanUp(self):
- self.an_points.CleanUp()
- def GetAnalyses(self):
- return self.an_props.used_an
- def GetPointsData(self):
- return self.an_points
- def GetGlobalTurnsData(self):
- return self.global_turns
- def GetRelevantParams(self, analysis=None):
- if analysis:
- return self.an_props.GetRelevantParams(analysis)
- else:
- analysis, valid = self.an_params.GetParam("analysis")
- return self.an_props.GetRelevantParams(analysis)
- def GetAnalysisProperties(self, analysis=None):
- if analysis:
- return self.an_props[analysis]
- else:
- analysis, valid = self.an_params.GetParam("analysis")
- return self.an_props[analysis]
- def GetParam(self, param):
- return self.an_params.GetParam(param)
- def GetParams(self):
- return self.an_params.GetParams()
- def SetParams(self, params, flags):
- return self.an_params.SetParams(params, flags)
- def SetSnapping(self, activate):
- self.an_points.SetSnapping(activate)
- def GetSnapping(self):
- return self.an_points.GetSnapping()
- def GetLayerStyle(self):
- """Returns cmd for d.vect, with set style for analysis result"""
- analysis, valid = self.an_params.GetParam("analysis")
- resProps = self.an_props[analysis]["resultProps"]
- width = UserSettings.Get(group="vnet", key="res_style", subkey="line_width")
- layerStyleCmd = ["layer=1", "width=" + str(width)]
- if "catColor" in resProps:
- layerStyleCmd.append("flags=c")
- elif "singleColor" in resProps:
- col = UserSettings.Get(group="vnet", key="res_style", subkey="line_color")
- layerStyleCmd.append(
- "color=" + str(col[0]) + ":" + str(col[1]) + ":" + str(col[2])
- )
- layerStyleVnetColors = []
- if "attrColColor" in resProps:
- colorStyle = UserSettings.Get(
- group="vnet", key="res_style", subkey="color_table"
- )
- invert = UserSettings.Get(
- group="vnet", key="res_style", subkey="invert_colors"
- )
- layerStyleVnetColors = [
- "v.colors",
- "color=" + colorStyle,
- "column=" + resProps["attrColColor"],
- ]
- if invert:
- layerStyleVnetColors.append("-n")
- return layerStyleCmd, layerStyleVnetColors
- def InputsErrorMsgs(
- self, msg, analysis, params, flags, inv_params, relevant_params
- ):
- """Checks input data in Parameters tab and shows messages if some value is not valid
- :param str msg: message added to start of message string
- :return: True if checked inputs are OK
- :return: False if some of checked inputs is not ok
- """
- if flags["t"] and "turn_layer" not in relevant_params:
- GMessage(
- parent=self.guiparent,
- message=_("Module <%s> does not support turns costs." % analysis),
- )
- return False
- errMapStr = ""
- if "input" in inv_params:
- if params["input"]:
- errMapStr = _("Vector map '%s' does not exist.") % (params["input"])
- else:
- errMapStr = _("Vector map was not chosen.")
- if errMapStr:
- GMessage(parent=self.guiparent, message=msg + "\n" + errMapStr)
- return False
- errLayerStr = ""
- vals = {
- "arc_layer": _("arc layer"),
- "node_layer": _("node layer"),
- "turn_layer": _("turntable layer"),
- "turn_cat_layer": _("unique categories layer"),
- }
- for layer, layerLabel in six.iteritems(vals):
- if layer in ["turn_layer", "turn_cat_layer"] and not flags["t"]:
- continue
- if layer in inv_params:
- if params[layer]:
- errLayerStr += _(
- "Chosen %s '%s' does not exist in vector map '%s'.\n"
- ) % (layerLabel, params[layer], params["input"])
- else:
- errLayerStr += _("Choose existing %s.\n") % (layerLabel)
- if errLayerStr:
- GMessage(parent=self.guiparent, message=msg + "\n" + errLayerStr)
- return False
- errColStr = ""
- for col in ["arc_column", "arc_backward_column", "node_column"]:
- if params[col] and col in inv_params and col in relevant_params:
- errColStr += _(
- "Chosen column '%s' does not exist in attribute table of layer '%s' of vector map '%s'.\n"
- ) % (params[col], params[layer], params["input"])
- if errColStr:
- GMessage(parent=self.guiparent, message=msg + "\n" + errColStr)
- return False
- return True
- def _initSettings(self):
- """Initialization of settings (if not already defined)"""
- # initializes default settings
- initSettings = [
- ["res_style", "line_width", 5],
- ["res_style", "line_color", (192, 0, 0)],
- ["res_style", "color_table", "byr"],
- ["res_style", "invert_colors", False],
- ["point_symbol", "point_size", 10],
- ["point_symbol", "point_width", 2],
- ["point_colors", "unused", (131, 139, 139)],
- ["point_colors", "used1cat", (192, 0, 0)],
- ["point_colors", "used2cat", (0, 0, 255)],
- ["point_colors", "selected", (9, 249, 17)],
- ["other", "snap_tresh", 10],
- ["other", "max_hist_steps", 5],
- ]
- for init in initSettings:
- UserSettings.ReadSettingsFile()
- UserSettings.Append(
- dict=UserSettings.userSettings,
- group="vnet",
- key=init[0],
- subkey=init[1],
- value=init[2],
- overwrite=False,
- )
- class VNETPointsData:
- def __init__(self, mapWin, an_data, an_params):
- self.mapWin = mapWin
- self.an_data = an_data
- self.an_params = an_params
- # information, whether mouse event handler is registered in map window
- self.handlerRegistered = False
- self.pointsChanged = Signal("VNETPointsData.pointsChanged")
- self.an_params.parametersChanged.connect(self.ParametersChanged)
- self.snapping = False
- self.data = []
- self.cols = {
- "name": ["use", "type", "topology", "e", "n"],
- "label": [_("use"), _("type"), _("topology"), "e", "n"],
- # TDO
- "type": [None, ["", _("Start point"), _("End Point")], None, float, float],
- "def_vals": [False, 0, "new point", 0, 0],
- }
- # registration graphics for drawing
- self.pointsToDraw = self.mapWin.RegisterGraphicsToDraw(
- graphicsType="point", setStatusFunc=self.SetPointStatus
- )
- self.SetPointDrawSettings()
- self.AddPoint()
- self.AddPoint()
- self.SetPointData(0, {"use": True, "type": 1})
- self.SetPointData(1, {"use": True, "type": 2})
- self.selected = 0
- def __del__(self):
- self.CleanUp()
- def CleanUp(self):
- self.mapWin.UnregisterGraphicsToDraw(self.pointsToDraw)
- if self.handlerRegistered:
- self.mapWin.UnregisterMouseEventHandler(
- wx.EVT_LEFT_DOWN, self.OnMapClickHandler
- )
- def SetSnapping(self, activate):
- self.snapping = activate
- def GetSnapping(self):
- return self.snapping
- def AddPoint(self):
- self.pointsToDraw.AddItem(
- coords=(self.cols["def_vals"][3], self.cols["def_vals"][4])
- )
- self.data.append(self.cols["def_vals"][:])
- self.pointsChanged.emit(method="AddPoint", kwargs={})
- def DeletePoint(self, pt_id):
- item = self.pointsToDraw.GetItem(pt_id)
- if item:
- self.pointsToDraw.DeleteItem(item)
- self.data.pop(pt_id)
- self.pointsChanged.emit(method="DeletePoint", kwargs={"pt_id": pt_id})
- def SetPoints(self, pts_data):
- for item in self.pointsToDraw.GetAllItems():
- self.pointsToDraw.DeleteItem(item)
- self.data = []
- for pt_data in pts_data:
- pt_data_list = self._ptDataToList(pt_data)
- self.data.append(pt_data_list)
- self.pointsToDraw.AddItem(coords=(pt_data_list[3], pt_data_list[4]))
- self.pointsChanged.emit(method="SetPoints", kwargs={"pts_data": pts_data})
- def SetPointData(self, pt_id, data):
- for col, v in six.iteritems(data):
- if col == "use":
- continue
- idx = self.cols["name"].index(col)
- self.data[pt_id][idx] = v
- # if type is changed checked columns must be recalculated by _usePoint
- if "type" in data and "use" not in data:
- data["use"] = self.GetPointData(pt_id)["use"]
- if "use" in data:
- if self._usePoint(pt_id, data["use"]) == -1:
- data["use"] = False
- idx = self.cols["name"].index("use")
- self.data[pt_id][idx] = data["use"]
- self.pointsChanged.emit(
- method="SetPointData", kwargs={"pt_id": pt_id, "data": data}
- )
- def GetPointData(self, pt_id):
- return self._ptListDataToPtData(self.data[pt_id])
- def GetPointsCount(self):
- return len(self.data)
- def SetPointStatus(self, item, itemIndex):
- """Before point is drawn, decides properties of drawing style"""
- analysis, valid = self.an_params.GetParam("analysis")
- cats = self.an_data[analysis]["cmdParams"]["cats"]
- if itemIndex == self.selected:
- wxPen = "selected"
- elif not self.data[itemIndex][0]:
- wxPen = "unused"
- item.hide = False
- elif len(cats) > 1:
- idx = self.data[itemIndex][1]
- if idx == 2: # End/To/Sink point
- wxPen = "used2cat"
- else:
- wxPen = "used1cat"
- else:
- wxPen = "used1cat"
- item.SetPropertyVal("label", str(itemIndex + 1))
- item.SetPropertyVal("penName", wxPen)
- def SetSelected(self, pt_id):
- self.selected = pt_id
- self.pointsChanged.emit(method="SetSelected", kwargs={"pt_id": pt_id})
- def GetSelected(self):
- return self.selected
- def SetPointDrawSettings(self):
- """Set settings for drawing of points"""
- ptSize = int(
- UserSettings.Get(group="vnet", key="point_symbol", subkey="point_size")
- )
- self.pointsToDraw.SetPropertyVal("size", ptSize)
- colors = UserSettings.Get(group="vnet", key="point_colors")
- ptWidth = int(
- UserSettings.Get(group="vnet", key="point_symbol", subkey="point_width")
- )
- textProp = self.pointsToDraw.GetPropertyVal("text")
- textProp["font"].SetPointSize(ptSize + 2)
- for colKey, col in six.iteritems(colors):
- pen = self.pointsToDraw.GetPen(colKey)
- if pen:
- pen.SetColour(wx.Colour(col[0], col[1], col[2], 255))
- pen.SetWidth(ptWidth)
- else:
- self.pointsToDraw.AddPen(
- colKey,
- wx.Pen(
- colour=wx.Colour(col[0], col[1], col[2], 255), width=ptWidth
- ),
- )
- def ParametersChanged(self, method, kwargs):
- if "analysis" in list(kwargs["changed_params"].keys()):
- self._updateTypeCol()
- if self.an_params.GetParam("analysis")[0] == "v.net.path":
- self._vnetPathUpdateUsePoints(None)
- def _updateTypeCol(self):
- """Rename category values when module is changed. Expample: Start point -> Sink point"""
- colValues = [""]
- analysis, valid = self.an_params.GetParam("analysis")
- anParamsCats = self.an_data[analysis]["cmdParams"]["cats"]
- for ptCat in anParamsCats:
- colValues.append(ptCat[1])
- type_idx = self.cols["name"].index("type")
- self.cols["type"][type_idx] = colValues
- def _ptDataToList(self, pt_data):
- pt_list_data = [None] * len(self.cols["name"])
- for k, val in six.iteritems(pt_data):
- pt_list_data[self.cols["name"].index(k)] = val
- return pt_list_data
- def _ptListDataToPtData(self, pt_list_data):
- pt_data = {}
- for i, val in enumerate(pt_list_data):
- pt_data[self.cols["name"][i]] = val
- return pt_data
- def _usePoint(self, pt_id, use):
- """Item is checked/unchecked"""
- analysis, valid = self.an_params.GetParam("analysis")
- cats = self.an_data[analysis]["cmdParams"]["cats"]
- # TODO move
- # if self.updateMap:
- # up_map_evt = gUpdateMap(render = False, renderVector = False)
- # wx.PostEvent(self.dialog.mapWin, up_map_evt)
- if len(cats) <= 1:
- return 0
- use_idx = self.cols["name"].index("use")
- checkedVal = self.data[pt_id][1]
- # point without given type cannot be selected
- if checkedVal == 0:
- self.data[pt_id][use_idx] = False
- self.pointsChanged.emit(
- method="SetPointData", kwargs={"pt_id": pt_id, "data": {"use": False}}
- )
- return -1
- if analysis == "v.net.path" and use:
- self._vnetPathUpdateUsePoints(pt_id)
- def _vnetPathUpdateUsePoints(self, checked_pt_id):
- alreadyChecked = []
- type_idx = self.cols["name"].index("type")
- use_idx = self.cols["name"].index("use")
- if checked_pt_id is not None:
- checkedKey = checked_pt_id
- alreadyChecked.append(self.data[checked_pt_id][type_idx])
- else:
- checkedKey = -1
- for iKey, dt in enumerate(self.data):
- pt_type = dt[type_idx]
- if (
- (pt_type in alreadyChecked and checkedKey != iKey) or pt_type == 0
- ) and self.data[iKey][use_idx]:
- self.data[iKey][use_idx] = False
- self.pointsChanged.emit(
- method="SetPointData",
- kwargs={"pt_id": iKey, "data": {"use": False}},
- )
- elif self.data[iKey][use_idx]:
- alreadyChecked.append(pt_type)
- def EditPointMode(self, activate):
- """Registers/unregisters mouse handler into map window"""
- if activate == self.handlerRegistered:
- return
- if activate:
- self.mapWin.RegisterMouseEventHandler(
- wx.EVT_LEFT_DOWN, self.OnMapClickHandler, "cross"
- )
- self.handlerRegistered = True
- else:
- self.mapWin.UnregisterMouseEventHandler(
- wx.EVT_LEFT_DOWN, self.OnMapClickHandler
- )
- self.handlerRegistered = False
- self.pointsChanged.emit(method="EditMode", kwargs={"activated": activate})
- def IsEditPointModeActive(self):
- return self.handlerRegistered
- def OnMapClickHandler(self, event):
- """Take coordinates from map window"""
- # TODO update snapping after input change
- if event == "unregistered":
- self.handlerRegistered = False
- return
- if not self.data:
- self.AddPoint()
- e, n = self.mapWin.GetLastEN()
- if self.snapping:
- # compute threshold
- snapTreshPix = int(
- UserSettings.Get(group="vnet", key="other", subkey="snap_tresh")
- )
- res = max(self.mapWin.Map.region["nsres"], self.mapWin.Map.region["ewres"])
- snapTreshDist = snapTreshPix * res
- params, err_params, flags = self.an_params.GetParams()
- vectMap = params["input"]
- if "input" in err_params:
- msg = _("new point")
- coords = SnapToNode(e, n, snapTreshDist, vectMap)
- if coords:
- e = coords[0]
- n = coords[1]
- msg = "snapped to node"
- else:
- msg = _("new point")
- else:
- msg = _("new point")
- self.SetPointData(self.selected, {"topology": msg, "e": e, "n": n})
- self.pointsToDraw.GetItem(self.selected).SetCoords([e, n])
- if self.selected == len(self.data) - 1:
- self.SetSelected(0)
- else:
- self.SetSelected(self.GetSelected() + 1)
- def GetColumns(self, only_relevant=True):
- cols_data = deepcopy(self.cols)
- hidden_cols = []
- hidden_cols.append(self.cols["name"].index("e"))
- hidden_cols.append(self.cols["name"].index("n"))
- analysis, valid = self.an_params.GetParam("analysis")
- if only_relevant and len(self.an_data[analysis]["cmdParams"]["cats"]) <= 1:
- hidden_cols.append(self.cols["name"].index("type"))
- i_red = 0
- hidden_cols.sort()
- for idx in hidden_cols:
- for dt in six.itervalues(cols_data):
- dt.pop(idx - i_red)
- i_red += 1
- return cols_data
- class VNETAnalysisParameters:
- def __init__(self, an_props):
- self.an_props = an_props
- self.params = {
- "analysis": self.an_props.used_an[0],
- "input": "",
- "arc_layer": "",
- "node_layer": "",
- "arc_column": "",
- "arc_backward_column": "",
- "node_column": "",
- "turn_layer": "",
- "turn_cat_layer": "",
- "iso_lines": "", # TODO check validity
- "max_dist": 0,
- } # TODO check validity
- self.flags = {"t": False}
- self.parametersChanged = Signal("VNETAnalysisParameters.parametersChanged")
- def SetParams(self, params, flags):
- changed_params = {}
- for p, v in six.iteritems(params):
- if p == "analysis" and v not in self.an_props.used_an:
- continue
- if p == "input":
- mapName, mapSet = ParseMapStr(v)
- v = mapName + "@" + mapSet
- if p in self.params:
- if isinstance(v, str):
- v = v.strip()
- self.params[p] = v
- changed_params[p] = v
- changed_flags = {}
- for p, v in six.iteritems(flags):
- if p in self.flags:
- self.flags[p] = v
- changed_flags[p] = v
- self.parametersChanged.emit(
- method="SetParams",
- kwargs={"changed_params": changed_params, "changed_flags": changed_flags},
- )
- return changed_params, changed_flags
- def GetParam(self, param):
- invParams = []
- if param in [
- "input",
- "arc_layer",
- "node_layer",
- "arc_column",
- "arc_backward_column",
- "node_column",
- "turn_layer",
- "turn_cat_layer",
- ]:
- invParams = self._getInvalidParams(self.params)
- if invParams:
- return self.params[param], False
- return self.params[param], True
- def GetParams(self):
- invParams = self._getInvalidParams(self.params)
- return self.params, invParams, self.flags
- def _getInvalidParams(self, params):
- """Check of analysis input data for invalid values (Parameters tab)"""
- # dict of invalid values {key from self.itemData (comboboxes from
- # Parameters tab) : invalid value}
- invParams = []
- # check vector map
- if params["input"]:
- mapName, mapSet = params["input"].split("@")
- if mapSet in grass.list_grouped("vector"):
- vectMaps = grass.list_grouped("vector")[mapSet]
- if not params["input"] or mapName not in vectMaps:
- invParams = list(params.keys())[:]
- return invParams
- # check arc/node layer
- layers = utils.GetVectorNumberOfLayers(params["input"])
- for layer in ["arc_layer", "node_layer", "turn_layer", "turn_cat_layer"]:
- if not layers or params[layer] not in layers:
- invParams.append(layer)
- dbInfo = VectorDBInfo(params["input"])
- try:
- table = dbInfo.GetTable(int(params["arc_layer"]))
- columnchoices = dbInfo.GetTableDesc(table)
- except (KeyError, ValueError):
- table = None
- # check costs columns
- for col in ["arc_column", "arc_backward_column", "node_column"]:
- if col == "node_column":
- try:
- table = dbInfo.GetTable(int(params["node_layer"]))
- columnchoices = dbInfo.GetTableDesc(table)
- except (KeyError, ValueError):
- table = None
- if not table or not params[col] in list(columnchoices.keys()):
- invParams.append(col)
- continue
- if columnchoices[params[col]]["type"] not in [
- "integer",
- "double precision",
- ]:
- invParams.append(col)
- continue
- return invParams
- class VNETAnalysesProperties:
- def __init__(self):
- """Initializes parameters for different v.net.* modules"""
- # initialization of v.net.* analysis parameters (data which
- # characterizes particular analysis)
- self.attrCols = {
- "arc_column": {
- "label": _("Arc forward/both direction(s) cost column:"),
- "name": _("arc forward/both"),
- },
- "arc_backward_column": {
- "label": _("Arc backward direction cost column:"),
- "name": _("arc backward"),
- },
- "acolumn": {
- "label": _("Arcs' cost column (for both directions):"),
- "name": _("arc"),
- "inputField": "arc_column",
- },
- "node_column": {"label": _("Node cost column:"), "name": _("node")},
- }
- self.vnetProperties = {
- "v.net.path": {
- "label": _("Shortest path %s") % "(v.net.path)",
- "cmdParams": {
- "cats": [["st_pt", _("Start point")], ["end_pt", _("End point")]],
- "cols": ["arc_column", "arc_backward_column", "node_column"],
- },
- "resultProps": {
- "singleColor": None,
- "dbMgr": True, # TODO delete this property, this information can be get from result
- },
- "turns_support": True,
- },
- "v.net.salesman": {
- "label": _("Traveling salesman %s") % "(v.net.salesman)",
- "cmdParams": {
- "cats": [["center_cats", None]],
- "cols": ["arc_column", "arc_backward_column"],
- },
- "resultProps": {"singleColor": None, "dbMgr": False},
- "turns_support": True,
- },
- "v.net.flow": {
- "label": _("Maximum flow %s") % "(v.net.flow)",
- "cmdParams": {
- "cats": [
- ["source_cats", _("Source point")],
- ["sink_cats", _("Sink point")],
- ],
- "cols": ["arc_column", "arc_backward_column", "node_column"],
- },
- "resultProps": {"attrColColor": "flow", "dbMgr": True},
- "turns_support": False,
- },
- "v.net.alloc": {
- "label": _("Subnets for nearest centers %s") % "(v.net.alloc)",
- "cmdParams": {
- "cats": [["center_cats", None]],
- "cols": ["arc_column", "arc_backward_column", "node_column"],
- },
- "resultProps": {"catColor": None, "dbMgr": False},
- "turns_support": True,
- },
- "v.net.steiner": {
- "label": _("Steiner tree for the network and given terminals %s")
- % "(v.net.steiner)",
- "cmdParams": {
- "cats": [["terminal_cats", None]],
- "cols": [
- "acolumn",
- ],
- },
- "resultProps": {"singleColor": None, "dbMgr": False},
- "turns_support": True,
- },
- "v.net.distance": {
- "label": _("Shortest distance via the network %s") % "(v.net.distance)",
- "cmdParams": {
- "cats": [["from_cats", "From point"], ["to_cats", "To point"]],
- "cols": ["arc_column", "arc_backward_column", "node_column"],
- },
- "resultProps": {"catColor": None, "dbMgr": True},
- "turns_support": False,
- },
- "v.net.iso": {
- "label": _("Cost isolines %s") % "(v.net.iso)",
- "cmdParams": {
- "cats": [["center_cats", None]],
- "cols": ["arc_column", "arc_backward_column", "node_column"],
- },
- "resultProps": {"catColor": None, "dbMgr": False},
- "turns_support": True,
- },
- }
- self.used_an = [
- "v.net.path",
- "v.net.salesman",
- "v.net.flow",
- "v.net.alloc",
- "v.net.distance",
- "v.net.iso",
- # "v.net.steiner"
- ]
- for an in list(self.vnetProperties.keys()):
- if an not in self.used_an:
- del self.vnetProperties[an]
- continue
- cols = self.vnetProperties[an]["cmdParams"]["cols"]
- self.vnetProperties[an]["cmdParams"]["cols"] = {}
- for c in cols:
- self.vnetProperties[an]["cmdParams"]["cols"][c] = self.attrCols[c]
- def has_key(self, key):
- return key in self.vnetProperties
- def __getitem__(self, key):
- return self.vnetProperties[key]
- def GetRelevantParams(self, analysis):
- if analysis not in self.vnetProperties:
- return None
- relevant_params = ["input", "arc_layer", "node_layer"]
- if self.vnetProperties[analysis]["turns_support"]:
- relevant_params += ["turn_layer", "turn_cat_layer"]
- cols = self.vnetProperties[analysis]["cmdParams"]["cols"]
- for col, v in six.iteritems(cols):
- if "inputField" in col:
- colInptF = v["inputField"]
- else:
- colInptF = col
- relevant_params.append(colInptF)
- return relevant_params
- class VNETTmpVectMaps:
- """Class which creates, stores and destroys all tmp maps created during analysis"""
- def __init__(self, parent, mapWin):
- self.tmpMaps = [] # temporary maps
- self.parent = parent
- self.mapWin = mapWin
- def AddTmpVectMap(self, mapName, msg):
- """New temporary map
- :return: instance of VectMap representing temporary map
- """
- currMapSet = grass.gisenv()["MAPSET"]
- tmpMap = grass.find_file(name=mapName, element="vector", mapset=currMapSet)
- fullName = tmpMap["fullname"]
- # map already exists
- if fullName:
- # TODO move dialog out of class, AddTmpVectMap(self, mapName,
- # overvrite = False)
- dlg = wx.MessageDialog(
- parent=self.parent,
- message=msg,
- caption=_("Overwrite map layer"),
- style=wx.YES_NO | wx.NO_DEFAULT | wx.ICON_QUESTION | wx.CENTRE,
- )
- ret = dlg.ShowModal()
- dlg.Destroy()
- if ret == wx.ID_NO:
- return None
- else:
- fullName = mapName + "@" + currMapSet
- newVectMap = VectMap(self.mapWin, fullName)
- self.tmpMaps.append(newVectMap)
- return newVectMap
- def HasTmpVectMap(self, vectMapName):
- """
- :param: vectMapName name of vector map
- :return: True if it contains the map
- :return: False if not
- """
- mapValSpl = vectMapName.strip().split("@")
- if len(mapValSpl) > 1:
- mapSet = mapValSpl[1]
- else:
- mapSet = grass.gisenv()["MAPSET"]
- mapName = mapValSpl[0]
- fullName = mapName + "@" + mapSet
- for vectTmpMap in self.tmpMaps:
- if vectTmpMap.GetVectMapName() == fullName:
- return True
- return False
- def GetTmpVectMap(self, vectMapName):
- """Get instance of VectMap with name vectMapName"""
- for vectMap in self.tmpMaps:
- if vectMap.GetVectMapName() == vectMapName.strip():
- return vectMap
- return None
- def RemoveFromTmpMaps(self, vectMap):
- """Temporary map is removed from the class instance however it is not deleted
- :param vectMap: instance of VectMap class to be removed
- :return: True if was removed
- :return: False if does not contain the map
- """
- try:
- self.tmpMaps.remove(vectMap)
- return True
- except ValueError:
- return False
- def DeleteTmpMap(self, vectMap):
- """Temporary map is removed from the class and it is deleted
- :param vectMap: instance of VectMap class to be deleted
- :return: True if was removed
- :return: False if does not contain the map
- """
- if vectMap:
- vectMap.DeleteRenderLayer()
- RunCommand(
- "g.remove", flags="f", type="vector", name=vectMap.GetVectMapName()
- )
- self.RemoveFromTmpMaps(vectMap)
- return True
- return False
- def DeleteAllTmpMaps(self):
- """Delete all temporary maps in the class"""
- update = False
- for tmpMap in self.tmpMaps:
- RunCommand(
- "g.remove", flags="f", type="vector", name=tmpMap.GetVectMapName()
- )
- if tmpMap.DeleteRenderLayer():
- update = True
- return update
- class VectMap:
- """Represents map
- It can check if it was modified or render it
- """
- def __init__(self, mapWin, fullName):
- self.fullName = fullName
- self.mapWin = mapWin
- self.renderLayer = None
- self.modifTime = None # time, for modification check
- def __del__(self):
- self.DeleteRenderLayer()
- def AddRenderLayer(self, cmd=None, colorsCmd=None):
- """Add map from map window layers to render"""
- if not self.mapWin:
- return False
- existsMap = grass.find_file(
- name=self.fullName, element="vector", mapset=grass.gisenv()["MAPSET"]
- )
- if not existsMap["name"]:
- self.DeleteRenderLayer()
- return False
- if not cmd:
- cmd = []
- cmd.insert(0, "d.vect")
- cmd.append("map=%s" % self.fullName)
- if self.renderLayer:
- self.DeleteRenderLayer()
- if colorsCmd:
- colorsCmd.append("map=%s" % self.fullName)
- layerStyleVnetColors = cmdlist_to_tuple(colorsCmd)
- RunCommand(layerStyleVnetColors[0], **layerStyleVnetColors[1])
- self.renderLayer = self.mapWin.Map.AddLayer(
- ltype="vector",
- command=cmd,
- name=self.fullName,
- active=True,
- opacity=1.0,
- render=False,
- pos=-1,
- )
- return True
- def DeleteRenderLayer(self):
- """Remove map from map window layers to render"""
- if not self.mapWin:
- return False
- if self.renderLayer:
- self.mapWin.Map.DeleteLayer(self.renderLayer)
- self.renderLayer = None
- return True
- return False
- def GetRenderLayer(self):
- return self.renderLayer
- def GetVectMapName(self):
- return self.fullName
- def SaveVectMapState(self):
- """Save modification time for vector map"""
- self.modifTime = self.GetLastModified()
- def VectMapState(self):
- """Checks if map was modified
- :return: -1 - if no modification time was saved
- :return: 0 - if map was modified
- :return: 1 - if map was not modified
- """
- if self.modifTime is None:
- return -1
- if self.modifTime != self.GetLastModified():
- return 0
- return 1
- def GetLastModified(self):
- """Get modification time
- :return: MAP DATE time string from vector map head file
- """
- mapValSpl = self.fullName.split("@")
- mapSet = mapValSpl[1]
- mapName = mapValSpl[0]
- headPath = os.path.join(
- grass.gisenv()["GISDBASE"],
- grass.gisenv()["LOCATION_NAME"],
- mapSet,
- "vector",
- mapName,
- "head",
- )
- try:
- head = open(headPath, "r")
- for line in head.readlines():
- i = line.find(
- "MAP DATE:",
- )
- if i == 0:
- head.close()
- return line.split(":", 1)[1].strip()
- head.close()
- return ""
- except IOError:
- return ""
- class History:
- """Class which reads and saves history data (based on gui.core.settings Settings class file save/load)
- .. todo::
- Maybe it could be useful for other GRASS wxGUI tools.
- """
- def __init__(self):
- # max number of steps in history (zero based)
- self.maxHistSteps = 3
- # current history step
- self.currHistStep = 0
- # number of steps saved in history
- self.histStepsNum = 0
- # dict contains data saved in history for current history step
- self.currHistStepData = {}
- # buffer for data to be saved into history
- self.newHistStepData = {}
- self.histFile = grass.tempfile()
- # key/value separator
- self.sep = ";"
- def __del__(self):
- try_remove(self.histFile)
- def GetNext(self):
- """Go one step forward in history"""
- self.currHistStep -= 1
- self.currHistStepData.clear()
- self.currHistStepData = self._getHistStepData(self.currHistStep)
- return self.currHistStepData
- def GetPrev(self):
- """Go one step back in history"""
- self.currHistStep += 1
- self.currHistStepData.clear()
- self.currHistStepData = self._getHistStepData(self.currHistStep)
- return self.currHistStepData
- def GetStepsNum(self):
- """Get number of steps saved in history"""
- return self.histStepsNum
- def GetCurrHistStep(self):
- """Get current history step"""
- return self.currHistStep
- def Add(self, key, subkey, value):
- """Add new data into buffer"""
- if key not in self.newHistStepData:
- self.newHistStepData[key] = {}
- if isinstance(subkey, list):
- if subkey[0] not in self.newHistStepData[key]:
- self.newHistStepData[key][subkey[0]] = {}
- self.newHistStepData[key][subkey[0]][subkey[1]] = value
- else:
- self.newHistStepData[key][subkey] = value
- def SaveHistStep(self):
- """Create new history step with data in buffer"""
- self.maxHistSteps = UserSettings.Get(
- group="vnet", key="other", subkey="max_hist_steps"
- )
- self.currHistStep = 0
- newHistFile = grass.tempfile()
- newHist = open(newHistFile, "w")
- self._saveNewHistStep(newHist)
- oldHist = open(self.histFile)
- removedHistData = self._savePreviousHist(newHist, oldHist)
- oldHist.close()
- newHist.close()
- try_remove(self.histFile)
- self.histFile = newHistFile
- self.newHistStepData.clear()
- return removedHistData
- def _savePreviousHist(self, newHist, oldHist):
- """Save previous history into new file"""
- newHistStep = False
- removedHistData = {}
- newHistStepsNum = self.histStepsNum
- for line in oldHist.readlines():
- if not line.strip():
- newHistStep = True
- newHistStepsNum += 1
- continue
- if newHistStep:
- newHistStep = False
- line = line.split("=")
- line[1] = str(newHistStepsNum)
- line = "=".join(line)
- if newHistStepsNum >= self.maxHistSteps:
- removedHistStep = removedHistData[line] = {}
- continue
- else:
- newHist.write("%s%s%s" % ("\n", line, "\n"))
- self.histStepsNum = newHistStepsNum
- else:
- if newHistStepsNum >= self.maxHistSteps:
- self._parseLine(line, removedHistStep)
- else:
- newHist.write("%s" % line)
- return removedHistData
- def _saveNewHistStep(self, newHist):
- """Save buffer (new step) data into file"""
- newHist.write("%s%s%s" % ("\n", "history step=0", "\n"))
- for key in list(self.newHistStepData.keys()):
- subkeys = list(self.newHistStepData[key].keys())
- newHist.write("%s%s" % (key, self.sep))
- for idx in range(len(subkeys)):
- value = self.newHistStepData[key][subkeys[idx]]
- if isinstance(value, dict):
- if idx > 0:
- newHist.write("%s%s%s" % ("\n", key, self.sep))
- newHist.write("%s%s" % (subkeys[idx], self.sep))
- kvalues = list(self.newHistStepData[key][subkeys[idx]].keys())
- srange = range(len(kvalues))
- for sidx in srange:
- svalue = self._parseValue(
- self.newHistStepData[key][subkeys[idx]][kvalues[sidx]]
- )
- newHist.write("%s%s%s" % (kvalues[sidx], self.sep, svalue))
- if sidx < len(kvalues) - 1:
- newHist.write("%s" % self.sep)
- else:
- if idx > 0 and isinstance(
- self.newHistStepData[key][subkeys[idx - 1]], dict
- ):
- newHist.write("%s%s%s" % ("\n", key, self.sep))
- value = self._parseValue(self.newHistStepData[key][subkeys[idx]])
- newHist.write("%s%s%s" % (subkeys[idx], self.sep, value))
- if idx < len(subkeys) - 1 and not isinstance(
- self.newHistStepData[key][subkeys[idx + 1]], dict
- ):
- newHist.write("%s" % self.sep)
- newHist.write("\n")
- self.histStepsNum = 0
- def _parseValue(self, value, read=False):
- """Parse value"""
- if read: # -> read data (cast values)
- if value:
- if (
- value[0] == "[" and value[-1] == "]"
- ): # TODO, possible wrong interpretation
- value = value[1:-1].split(",")
- value = map(self._castValue, value)
- return value
- if value == "True":
- value = True
- elif value == "False":
- value = False
- elif value == "None":
- value = None
- elif ":" in value: # -> color
- try:
- value = tuple(map(int, value.split(":")))
- except ValueError: # -> string
- pass
- else:
- try:
- value = int(value)
- except ValueError:
- try:
- value = float(value)
- except ValueError:
- pass
- else: # -> write data
- if isinstance(value, type(())): # -> color
- value = str(value[0]) + ":" + str(value[1]) + ":" + str(value[2])
- return value
- def _castValue(self, value):
- """Cast value"""
- try:
- value = int(value)
- except ValueError:
- try:
- value = float(value)
- except ValueError:
- value = value[1:-1]
- return value
- def _getHistStepData(self, histStep):
- """Load data saved in history step"""
- hist = open(self.histFile)
- histStepData = {}
- newHistStep = False
- isSearchedHistStep = False
- for line in hist.readlines():
- if not line.strip() and isSearchedHistStep:
- break
- elif not line.strip():
- newHistStep = True
- continue
- elif isSearchedHistStep:
- self._parseLine(line, histStepData)
- if newHistStep:
- line = line.split("=")
- if int(line[1]) == histStep:
- isSearchedHistStep = True
- newHistStep = False
- hist.close()
- return histStepData
- def _parseLine(self, line, histStepData):
- """Parse line in file with history"""
- line = line.rstrip("%s" % os.linesep).split(self.sep)
- key = line[0]
- kv = line[1:]
- idx = 0
- subkeyMaster = None
- if len(kv) % 2 != 0: # multiple (e.g. nviz)
- subkeyMaster = kv[0]
- del kv[0]
- idx = 0
- while idx < len(kv):
- if subkeyMaster:
- subkey = [subkeyMaster, kv[idx]]
- else:
- subkey = kv[idx]
- value = kv[idx + 1]
- value = self._parseValue(value, read=True)
- if key not in histStepData:
- histStepData[key] = {}
- if isinstance(subkey, list):
- if subkey[0] not in histStepData[key]:
- histStepData[key][subkey[0]] = {}
- histStepData[key][subkey[0]][subkey[1]] = value
- else:
- histStepData[key][subkey] = value
- idx += 2
- def DeleteNewHistStepData(self):
- """Delete buffer data for new history step"""
- self.newHistStepData.clear()
- class VNETGlobalTurnsData:
- """Turn Data"""
- def __init__(self):
- # Definition of four basic directions
- self.turn_data = [
- ["Straight", DegreesToRadians(-30), DegreesToRadians(+30), 0.0],
- ["Right Turn", DegreesToRadians(+30), DegreesToRadians(+150), 0.0],
- ["Reverse", DegreesToRadians(+150), DegreesToRadians(-150), 0.0],
- ["Left Turn", DegreesToRadians(-150), DegreesToRadians(-30), 0.0],
- ]
- def GetData(self):
- data = []
- for ival in self.turn_data:
- data.append(ival[1:])
- return data
- def GetValue(self, line, col):
- return self.turn_data[line][col]
- def GetLinesCount(self):
- return len(self.turn_data)
- def SetValue(self, value, line, col):
- self.DataValidator(line, col, value)
- self.turn_data[line][col] = value
- def SetUTurns(self, value):
- """Checked if checeBox is checed"""
- useUTurns = value
- def AppendRow(self, values):
- self.turn_data.append(values)
- def InsertRow(self, line, values):
- self.turn_data.insert(line, values)
- def PopRow(self, values):
- self.RemoveDataValidator(values)
- self.turn_data.pop(values)
- def DataValidator(self, row, col, value):
- """Angle recalculation due to value changing"""
- if col not in [1, 2]:
- return
- if col == 1:
- new_from_angle = value
- old_from_angle = self.turn_data[row][1]
- new_to_angle = self.turn_data[row][2]
- if self.IsInInterval(old_from_angle, new_to_angle, new_from_angle):
- prev_row = row - 1
- if prev_row == -1:
- prev_row = len(self.turn_data) - 1
- self.turn_data[prev_row][2] = new_from_angle
- return
- if col == 2:
- new_to_angle = value
- old_to_angle = self.turn_data[row][2]
- new_from_angle = self.turn_data[row][1]
- if self.IsInInterval(new_from_angle, old_to_angle, new_to_angle):
- next_row = row + 1
- if len(self.turn_data) == next_row:
- next_row = 0
- self.turn_data[next_row][1] = new_to_angle
- return
- inside_new = []
- overlap_new_from = []
- overlap_new_to = []
- for i in range(self.GetLinesCount()):
- if i == row:
- continue
- from_angle = self.turn_data[i][1]
- is_in_from = self.IsInInterval(new_from_angle, new_to_angle, from_angle)
- to_angle = self.turn_data[i][2]
- is_in_to = self.IsInInterval(new_from_angle, new_to_angle, to_angle)
- if is_in_from and is_in_to:
- inside_new.append(i)
- if is_in_from:
- overlap_new_to.append(i)
- if is_in_to:
- overlap_new_from.append(i)
- for i_row in overlap_new_from:
- self.turn_data[i_row][2] = new_from_angle
- for i_row in overlap_new_to:
- self.turn_data[i_row][1] = new_to_angle
- for i_row in inside_new:
- if col == 1:
- angle = new_from_angle
- else:
- angle = new_to_angle
- self.turn_data[i_row][1] = angle
- self.turn_data[i_row][2] = angle
- def RemoveDataValidator(self, row):
- """Angle recalculation due to direction remove"""
- if row == 0:
- prev_row = self.GetLinesCount() - 1
- else:
- prev_row = row - 1
- remove_to_angle = self.turn_data[row][2]
- self.turn_data[prev_row][2] = remove_to_angle
- def IsInInterval(self, from_angle, to_angle, angle):
- """Test if a direction includes or not includes a value"""
- if to_angle < from_angle:
- to_angle = math.pi * 2 + to_angle
- if angle < from_angle:
- angle = math.pi * 2 + angle
- if angle > from_angle and angle < to_angle:
- return True
- return False
|