""" @package gmodeler.model @brief wxGUI Graphical Modeler (base classes & read/write) Classes: - model::Model - model::ModelObject - model::ModelAction - model::ModelData - model::ModelRelation - model::ModelItem - model::ModelLoop - model::ModelCondition - model::ModelComment - model::ProcessModelFile - model::WriteModelFile - model::WritePythonFile - model::ModelParamDialog (C) 2010-2018 by the GRASS Development Team This program is free software under the GNU General Public License (>=v2). Read the file COPYING that comes with GRASS for details. @author Martin Landa @author Python parameterization Ondrej Pesek """ import os import getpass import copy import re import mimetypes import time import six try: import xml.etree.ElementTree as etree except ImportError: import elementtree.ElementTree as etree # Python <= 2.4 import xml.sax.saxutils as saxutils import wx from wx.lib import ogl from core import globalvar from core import utils from core.gcmd import ( GMessage, GException, GError, RunCommand, GWarning, GetDefaultEncoding, ) from core.settings import UserSettings from gui_core.forms import GUI, CmdPanel from gui_core.widgets import GNotebook from gui_core.wrap import Button from gmodeler.giface import GraphicalModelerGrassInterface from grass.script import task as gtask class Model(object): """Class representing the model""" def __init__(self, canvas=None): self.items = list() # list of ordered items (action/loop/condition) # model properties self.properties = { "name": _("model"), "description": _("Script generated by wxGUI Graphical Modeler."), "author": getpass.getuser(), } # model variables self.variables = dict() self.variablesParams = dict() self.canvas = canvas def GetCanvas(self): """Get canvas or None""" return self.canvas def GetItems(self, objType=None): """Get list of model items :param objType: Object type to filter model objects """ if not objType: return self.items result = list() for item in self.items: if isinstance(item, objType): result.append(item) return result def GetItem(self, aId, objType=None): """Get item of given id :param aId: item id :return: Model* instance :return: None if no item found """ ilist = self.GetItems(objType) for item in ilist: if item.GetId() == aId: return item return None def GetItemIndex(self, item): """Return list index of given item""" return self.items.index(item) def GetNumItems(self, actionOnly=False): """Get number of items""" if actionOnly: return len(self.GetItems(objType=ModelAction)) return len(self.GetItems()) def ReorderItems(self, idxList): items = list() for oldIdx, newIdx in six.iteritems(idxList): item = self.items.pop(oldIdx) items.append(item) self.items.insert(newIdx, item) # try: # nextItem = self.items[newIdx+1] # except IndexError: # continue # newIdx is the last item in the list # items.append(nextItem) # x = item.GetX() # y = item.GetY() # item.SetX(nextItem.GetX()) # item.SetY(nextItem.GetY()) # nextItem.SetX(x) # nextItem.SetY(y) dc = wx.ClientDC(self.canvas) for item in items: item.MoveLinks(dc) for mo in item.GetBlock(): if isinstance(mo, ModelLoop): self.canvas.parent.DefineLoop(mo) elif isinstance(mo, ModelCondition): self.canvas.parent.DefineCondition(mo) def Normalize(self): # check for inconsistecies for idx in range(1, len(self.items)): if not self.items[idx].GetBlock() and isinstance( self.items[idx - 1], ModelLoop ): # swap action not-in-block with previously defined # loop itemPrev = self.items[idx - 1] self.items[idx - 1] = self.items[idx] self.items[idx] = itemPrev # update ids iId = 1 for item in self.items: item.SetId(iId) item.SetLabel() iId += 1 def GetNextId(self): """Get next id (data ignored) :return: next id to be used (default: 1) """ if len(self.items) < 1: return 1 currId = self.items[-1].GetId() if currId > 0: return currId + 1 return 1 def GetProperties(self): """Get model properties""" return self.properties def GetVariables(self, params=False): """Get model variables""" if params: return self.variablesParams return self.variables def SetVariables(self, data): """Set model variables""" self.variables = data def Reset(self): """Reset model""" self.items = list() def RemoveItem(self, item, reference=None): """Remove item from model :item: item to be removed :reference: reference item valid for deletion :return: list of related items to remove/update """ relList = list() upList = list() doRemove = True if isinstance(item, ModelAction): for rel in item.GetRelations(): relList.append(rel) data = rel.GetData() if len(data.GetRelations()) < 2: relList.append(data) else: upList.append(data) elif isinstance(item, ModelData): for rel in item.GetRelations(): otherItem = rel.GetTo() if rel.GetFrom() == item else rel.GetFrom() if reference and reference != otherItem: doRemove = False continue # avoid recursive deletion relList.append(rel) if reference and reference != otherItem: relList.append(otherItem) if not doRemove: upList.append(item) elif isinstance(item, ModelLoop): for rel in item.GetRelations(): relList.append(rel) for action in self.GetItems(): action.UnSetBlock(item) if doRemove and item in self.items: self.items.remove(item) return relList, upList def FindAction(self, aId): """Find action by id""" alist = self.GetItems(objType=ModelAction) for action in alist: if action.GetId() == aId: return action return None def GetMaps(self, prompt): """Get list of maps of selected type :param prompt: to filter maps""" maps = list() for data in self.GetData(): if prompt == data.GetPrompt(): mapName = data.GetValue() if not mapName or mapName[0] == "%": continue # skip variables maps.append(mapName) return maps def GetData(self): """Get list of data items""" result = list() dataItems = self.GetItems(objType=ModelData) for action in self.GetItems(objType=ModelAction): for rel in action.GetRelations(): dataItem = rel.GetData() if dataItem not in result: result.append(dataItem) if dataItem in dataItems: dataItems.remove(dataItem) # standalone data if dataItems: result += dataItems return result def FindData(self, value, prompt): """Find data item in the model :param value: value :param prompt: prompt :return: ModelData instance :return: None if not found """ for data in self.GetData(): if data.GetValue() == value and data.GetPrompt() == prompt: return data return None def LoadModel(self, filename): """Load model definition stored in GRASS Model XML file (gxm) .. todo:: Validate against DTD Raise exception on error. """ dtdFilename = os.path.join(globalvar.WXGUIDIR, "xml", "grass-gxm.dtd") # parse workspace file try: gxmXml = ProcessModelFile(etree.parse(filename)) except Exception as e: raise GException("{}".format(e)) if self.canvas: win = self.canvas.parent if gxmXml.pos: win.SetPosition(gxmXml.pos) if gxmXml.size: win.SetSize(gxmXml.size) # load properties self.properties = gxmXml.properties self.variables = gxmXml.variables # load actions for action in gxmXml.actions: actionItem = ModelAction( parent=self, x=action["pos"][0], y=action["pos"][1], width=action["size"][0], height=action["size"][1], task=action["task"], id=action["id"], label=action["label"], comment=action["comment"], ) if action["disabled"]: actionItem.Enable(False) self.AddItem(actionItem, pos=actionItem.GetId() - 1) actionItem.SetValid(actionItem.GetTask().get_options()) actionItem.GetLog() # substitute variables (-> valid/invalid) # load data & relations for data in gxmXml.data: dataItem = ModelData( parent=self, x=data["pos"][0], y=data["pos"][1], width=data["size"][0], height=data["size"][1], prompt=data["prompt"], value=data["value"], ) dataItem.SetIntermediate(data["intermediate"]) dataItem.SetHasDisplay(data["display"]) for rel in data["rels"]: actionItem = self.FindAction(rel["id"]) if rel["dir"] == "from": relation = ModelRelation( parent=self, fromShape=dataItem, toShape=actionItem, param=rel["name"], ) else: relation = ModelRelation( parent=self, fromShape=actionItem, toShape=dataItem, param=rel["name"], ) relation.SetControlPoints(rel["points"]) actionItem.AddRelation(relation) dataItem.AddRelation(relation) if self.canvas: dataItem.Update() # load loops for loop in gxmXml.loops: loopItem = ModelLoop( parent=self, x=loop["pos"][0], y=loop["pos"][1], width=loop["size"][0], height=loop["size"][1], label=loop["text"], id=loop["id"], ) self.AddItem(loopItem, pos=loopItem.GetId() - 1) # load conditions for condition in gxmXml.conditions: conditionItem = ModelCondition( parent=self, x=condition["pos"][0], y=condition["pos"][1], width=condition["size"][0], height=condition["size"][1], label=condition["text"], id=condition["id"], ) self.AddItem(conditionItem, pos=conditionItem.GetId() - 1) # define loops & if/else items for loop in gxmXml.loops: loopItem = self.GetItem(loop["id"], objType=ModelLoop) loopItem.SetItems(loop["items"]) for idx in loop["items"]: action = self.GetItem(idx, objType=ModelAction) action.SetBlock(loopItem) for condition in gxmXml.conditions: conditionItem = self.GetItem(condition["id"]) for b in condition["items"].keys(): alist = list() for aId in condition["items"][b]: action = self.GetItem(aId) alist.append(action) conditionItem.SetItems(alist, branch=b) items = conditionItem.GetItems() for b in items.keys(): for action in items[b]: action.SetBlock(conditionItem) # load comments for comment in gxmXml.comments: commentItem = ModelComment( parent=self, x=comment["pos"][0], y=comment["pos"][1], width=comment["size"][0], height=comment["size"][1], id=comment["id"], label=comment["text"], ) self.AddItem(commentItem, pos=commentItem.GetId() - 1) def AddItem(self, newItem, pos=-1): """Add item to the list""" if pos != -1: self.items.insert(pos, newItem) else: self.items.append(newItem) # i = 1 # for item in self.items: # item.SetId(i) # i += 1 def IsValid(self): """Return True if model is valid""" if self.Validate(): return False return True def Validate(self): """Validate model, return None if model is valid otherwise error string""" errList = list() variables = self.GetVariables().keys() pattern = re.compile(r"(.*)(%.+\s?)(.*)") for action in self.GetItems(objType=ModelAction): cmd = action.GetLog(string=False) task = GUI(show=None).ParseCommand(cmd=cmd) errList += map(lambda x: cmd[0] + ": " + x, task.get_cmd_error()) # check also variables for opt in cmd[1:]: if "=" not in opt: continue key, value = opt.split("=", 1) sval = pattern.search(value) if sval: var = sval.group(2).strip()[1:] # strip '%' from beginning found = False for v in variables: if var.startswith(v): found = True break if not found: report = True for item in filter( lambda x: isinstance(x, ModelLoop), action.GetBlock() ): if var in item.GetLabel(): report = False break if report: errList.append( cmd[0] + ": " + _("undefined variable '%s'") % var ) # TODO: check variables in file only optionally # errList += self._substituteFile(action, checkOnly = True) return errList def _substituteFile(self, item, params=None, checkOnly=False): """Subsitute variables in command file inputs :param bool checkOnly: tuble - True to check variable, don't touch files :return: list of undefined variables """ errList = list() self.fileInput = dict() # collect ascii inputs for p in item.GetParams()["params"]: if ( p.get("element", "") == "file" and p.get("prompt", "") == "input" and p.get("age", "") == "old" ): filename = p.get("value", p.get("default", "")) if filename and mimetypes.guess_type(filename)[0] == "text/plain": self.fileInput[filename] = None for finput in self.fileInput: # read lines fd = open(finput, "r") try: data = self.fileInput[finput] = fd.read() finally: fd.close() # substitute variables write = False variables = self.GetVariables() for variable in variables: pattern = re.compile("%" + variable) value = "" if params and "variables" in params: for p in params["variables"]["params"]: if variable == p.get("name", ""): if p.get("type", "string") == "string": value = p.get("value", "") else: value = str(p.get("value", "")) break if not value: value = variables[variable].get("value", "") data = pattern.sub(value, data) if not checkOnly: write = True pattern = re.compile(r"(.*)(%.+\s?)(.*)") sval = pattern.search(data) if sval: var = sval.group(2).strip()[1:] # ignore '%' cmd = item.GetLog(string=False)[0] errList.append(cmd + ": " + _("undefined variable '%s'") % var) if not checkOnly: if write: fd = open(finput, "w") try: fd.write(data) finally: fd.close() else: self.fileInput[finput] = None return errList def OnPrepare(self, item, params): self._substituteFile(item, params, checkOnly=False) def RunAction(self, item, params, log, onDone=None, onPrepare=None, statusbar=None): """Run given action :param item: action item :param params: parameters dict :param log: logging window :param onDone: on-done method :param onPrepare: on-prepare method :param statusbar: wx.StatusBar instance or None """ name = "({0}) {1}".format(item.GetId(), item.GetLabel()) if name in params: paramsOrig = item.GetParams(dcopy=True) item.MergeParams(params[name]) if statusbar: statusbar.SetStatusText(_("Running model..."), 0) data = {"item": item, "params": copy.deepcopy(params)} log.RunCmd( command=item.GetLog(string=False, substitute=params), onDone=onDone, onPrepare=self.OnPrepare, userData=data, ) if name in params: item.SetParams(paramsOrig) def Run(self, log, onDone, parent=None): """Run model :param log: logging window (see gconsole.GConsole) :param onDone: on-done method :param parent: window for messages or None """ if self.GetNumItems() < 1: GMessage(parent=parent, message=_("Model is empty. Nothing to run.")) return statusbar = None if isinstance(parent, wx.Frame): statusbar = parent.GetStatusBar() # validation if statusbar: statusbar.SetStatusText(_("Validating model..."), 0) errList = self.Validate() if statusbar: statusbar.SetStatusText("", 0) if errList: dlg = wx.MessageDialog( parent=parent, message=_( "Model is not valid. Do you want to " "run the model anyway?\n\n%s" ) % "\n".join(errList), caption=_("Run model?"), style=wx.YES_NO | wx.NO_DEFAULT | wx.ICON_QUESTION | wx.CENTRE, ) ret = dlg.ShowModal() dlg.Destroy() if ret != wx.ID_YES: return # parametrization params = self.Parameterize() delInterData = False if params: dlg = ModelParamDialog(parent=parent, model=self, params=params) dlg.CenterOnParent() ret = dlg.ShowModal() if ret != wx.ID_OK: dlg.Destroy() return err = dlg.GetErrors() delInterData = dlg.DeleteIntermediateData() dlg.Destroy() if err: GError(parent=parent, message="\n".join(err)) return err = list() for key, item in six.iteritems(params): for p in item["params"]: if p.get("value", "") == "": err.append((key, p.get("name", ""), p.get("description", ""))) if err: GError( parent=parent, message=_("Variables below not defined:") + "\n\n" + "\n".join(map(lambda x: "%s: %s (%s)" % (x[0], x[1], x[2]), err)), ) return log.cmdThread.SetId(-1) for item in self.GetItems(): if not item.IsEnabled(): continue if isinstance(item, ModelAction): if item.GetBlockId(): continue self.RunAction(item, params, log) elif isinstance(item, ModelLoop): cond = item.GetLabel() # substitute variables in condition variables = self.GetVariables() for variable in variables: pattern = re.compile("%" + variable) if pattern.search(cond): value = "" if params and "variables" in params: for p in params["variables"]["params"]: if variable == p.get("name", ""): value = p.get("value", "") break if not value: value = variables[variable].get("value", "") if not value: continue vtype = variables[variable].get("type", "string") if vtype == "string": value = '"' + value + '"' cond = pattern.sub(value, cond) # split condition # TODO: this part needs some better solution condVar, condText = map( lambda x: x.strip(), re.split("\s* in \s*", cond) ) pattern = re.compile("%" + condVar) # for vars()[condVar] in eval(condText): ? vlist = list() if condText[0] == "`" and condText[-1] == "`": # run command cmd, dcmd = gtask.cmdlist_to_tuple(condText[1:-1].split(" ")) ret = RunCommand(cmd, read=True, **dcmd) if ret: vlist = ret.splitlines() else: vlist = eval(condText) if "variables" not in params: params["variables"] = {"params": []} varDict = {"name": condVar, "value": ""} params["variables"]["params"].append(varDict) for var in vlist: for action in item.GetItems(self.GetItems()): if not action.IsEnabled(): continue varDict["value"] = var self.RunAction(item=action, params=params, log=log) params["variables"]["params"].remove(varDict) if delInterData: self.DeleteIntermediateData(log) # discard values if params: for item in six.itervalues(params): for p in item["params"]: p["value"] = "" def DeleteIntermediateData(self, log): """Detele intermediate data""" rast, vect, rast3d, msg = self.GetIntermediateData() if rast: log.RunCmd(["g.remove", "-f", "type=raster", "name=%s" % ",".join(rast)]) if rast3d: log.RunCmd( ["g.remove", "-f", "type=raster_3d", "name=%s" % ",".join(rast3d)] ) if vect: log.RunCmd(["g.remove", "-f", "type=vector", "name=%s" % ",".join(vect)]) def GetIntermediateData(self): """Get info about intermediate data""" rast = list() rast3d = list() vect = list() for data in self.GetData(): if not data.IsIntermediate(): continue name = data.GetValue() prompt = data.GetPrompt() if prompt == "raster": rast.append(name) elif prompt == "vector": vect.append(name) elif prompt == "raster_3d": rast3d.append(name) msg = "" if rast: msg += "\n\n%s: " % _("Raster maps") msg += ", ".join(rast) if rast3d: msg += "\n\n%s: " % _("3D raster maps") msg += ", ".join(rast3d) if vect: msg += "\n\n%s: " % _("Vector maps") msg += ", ".join(vect) return rast, vect, rast3d, msg def Update(self): """Update model""" for item in self.items: item.Update() def IsParameterized(self): """Return True if model is parameterized""" if self.Parameterize(): return True return False def Parameterize(self): """Return parameterized options""" result = dict() idx = 0 if self.variables: params = list() result["variables"] = {"flags": list(), "params": params, "idx": idx} for name, values in six.iteritems(self.variables): gtype = values.get("type", "string") if gtype in ("raster", "vector", "mapset", "file", "region", "dir"): gisprompt = True prompt = gtype if gtype == "raster": element = "cell" else: element = gtype ptype = "string" else: gisprompt = False prompt = None element = None ptype = gtype params.append( { "gisprompt": gisprompt, "multiple": False, "description": values.get("description", ""), "guidependency": "", "default": "", "age": None, "required": True, "value": values.get("value", ""), "label": "", "guisection": "", "key_desc": "", "values": list(), "parameterized": False, "values_desc": list(), "prompt": prompt, "element": element, "type": ptype, "name": name, } ) idx += 1 for action in self.GetItems(objType=ModelAction): if not action.IsEnabled(): continue name = "({0}) {1}".format(action.GetId(), action.GetLabel()) params = action.GetParams() increment = False for f in params["flags"]: if f.get("parameterized", False): if name not in result: increment = True result[name] = {"flags": list(), "params": list(), "idx": idx} result[name]["flags"].append(f) for p in params["params"]: if p.get("parameterized", False): if name not in result: increment = True result[name] = {"flags": list(), "params": list(), "idx": idx} result[name]["params"].append(p) if increment: idx += 1 self.variablesParams = result # record parameters return result class ModelObject(object): def __init__(self, id=-1, label=""): self.id = id # internal id, should be not changed self.label = "" self.rels = list() # list of ModelRelations self.isEnabled = True self.inBlock = list() # list of related loops/conditions def __del__(self): pass def GetLabel(self): """Get label""" return self.label def SetLabel(self, label=""): """Set label""" self.label = label def GetId(self): """Get id""" return self.id def SetId(self, newId): """Set id""" if self.inBlock: for loop in self.inBlock: # update block item loop.UpdateItem(self.id, newId) self.id = newId def AddRelation(self, rel): """Record new relation""" self.rels.append(rel) def GetRelations(self, fdir=None): """Get list of relations :param bool fdir: True for 'from' """ if fdir is None: return self.rels result = list() for rel in self.rels: if fdir == "from": if rel.GetFrom() == self: result.append(rel) else: if rel.GetTo() == self: result.append(rel) return result def IsEnabled(self): """Get True if action is enabled, otherwise False""" return self.isEnabled def Enable(self, enabled=True): """Enable/disable action""" self.isEnabled = enabled self.Update() def Update(self): pass def SetBlock(self, item): """Add object to the block (loop/condition) :param item: reference to ModelLoop or ModelCondition which defines loops/condition """ if item not in self.inBlock: self.inBlock.append(item) def UnSetBlock(self, item): """Remove object from the block (loop/consition) :param item: reference to ModelLoop or ModelCondition which defines loops/codition """ if item in self.inBlock: self.inBlock.remove(item) def GetBlock(self): """Get list of related ModelObject(s) which defines block (loop/condition) :return: list of ModelObjects """ return self.inBlock def GetBlockId(self): """Get list of related ids which defines block :return: list of ids """ ret = list() for mo in self.inBlock: ret.append(mo.GetId()) return ret class ModelAction(ModelObject, ogl.DividedShape): """Action class (GRASS module)""" def __init__( self, parent, x, y, id=-1, cmd=None, task=None, width=None, height=None, label=None, comment="", ): ModelObject.__init__(self, id, label) self.parent = parent self.task = task self.comment = comment if not width: width = UserSettings.Get( group="modeler", key="action", subkey=("size", "width") ) if not height: height = UserSettings.Get( group="modeler", key="action", subkey=("size", "height") ) if cmd: self.task = GUI(show=None).ParseCommand(cmd=cmd) else: if task: self.task = task else: self.task = None self.propWin = None self.data = list() # list of connected data items self.isValid = False self.isParameterized = False if self.parent.GetCanvas(): ogl.DividedShape.__init__(self, width, height) self.regionLabel = ogl.ShapeRegion() self.regionLabel.SetFormatMode( ogl.FORMAT_CENTRE_HORIZ | ogl.FORMAT_CENTRE_VERT ) self.AddRegion(self.regionLabel) self.regionComment = None self.SetCanvas(self.parent) self.SetX(x) self.SetY(y) self._setPen() self._setBrush() self.SetLabel(label) if comment: self.SetComment(comment) self.SetRegionSizes() self.ReformatRegions() if self.task: self.SetValid(self.task.get_options()) def _setBrush(self, running=False): """Set brush""" if running: color = UserSettings.Get( group="modeler", key="action", subkey=("color", "running") ) elif not self.isEnabled: color = UserSettings.Get(group="modeler", key="disabled", subkey="color") elif self.isValid: color = UserSettings.Get( group="modeler", key="action", subkey=("color", "valid") ) else: color = UserSettings.Get( group="modeler", key="action", subkey=("color", "invalid") ) wxColor = wx.Colour(color[0], color[1], color[2]) self.SetBrush(wx.Brush(wxColor)) def _setPen(self): """Set pen""" if self.isParameterized: width = int( UserSettings.Get( group="modeler", key="action", subkey=("width", "parameterized") ) ) else: width = int( UserSettings.Get( group="modeler", key="action", subkey=("width", "default") ) ) if self.isEnabled: style = wx.SOLID else: style = wx.DOT pen = wx.Pen(wx.BLACK, width, style) self.SetPen(pen) def ReformatRegions(self): rnum = 0 canvas = self.parent.GetCanvas() dc = wx.ClientDC(canvas) # used for measuring for region in self.GetRegions(): text = region.GetText() self.FormatText(dc, text, rnum) rnum += 1 def OnSizingEndDragLeft(self, pt, x, y, keys, attch): ogl.DividedShape.OnSizingEndDragLeft(self, pt, x, y, keys, attch) self.SetRegionSizes() self.ReformatRegions() self.GetCanvas().Refresh() def SetLabel(self, label=None): """Set label :param label: if None use command string instead """ if label: self.label = label elif self.label: label = self.label else: try: label = self.task.get_cmd(ignoreErrors=True)[0] except: label = _("unknown") idx = self.GetId() self.regionLabel.SetText("(%d) %s" % (idx, label)) self.SetRegionSizes() self.ReformatRegions() def SetComment(self, comment): """Set comment""" self.comment = comment if self.regionComment is None: self.regionComment = ogl.ShapeRegion() self.regionComment.SetFormatMode(ogl.FORMAT_CENTRE_HORIZ) font = wx.SystemSettings.GetFont(wx.SYS_DEFAULT_GUI_FONT) font.SetStyle(wx.ITALIC) self.regionComment.SetFont(font) # clear doesn't work # self.regionComment.ClearText() self.regionComment.SetText(comment) self.ClearRegions() self.AddRegion(self.regionLabel) self.regionLabel.SetProportions(0.0, 1.0) if self.comment: self.AddRegion(self.regionComment) self.regionLabel.SetProportions(0.0, 0.4) self.SetRegionSizes() self.ReformatRegions() def GetComment(self): """Get comment""" return self.comment def SetProperties(self, params, propwin): """Record properties dialog""" self.task.params = params["params"] self.task.flags = params["flags"] self.propWin = propwin def GetPropDialog(self): """Get properties dialog""" return self.propWin def GetLog(self, string=True, substitute=None): """Get logging info :param string: True to get cmd as a string otherwise a list :param substitute: dictionary of parameter to substitute or None """ cmd = self.task.get_cmd( ignoreErrors=True, ignoreRequired=True, ignoreDefault=False ) # substitute variables if substitute: variables = [] if "variables" in substitute: for p in substitute["variables"]["params"]: variables.append(p.get("name", "")) else: variables = self.parent.GetVariables() # order variables by length for variable in sorted(variables, key=len, reverse=True): pattern = re.compile("%" + variable) value = "" if substitute and "variables" in substitute: for p in substitute["variables"]["params"]: if variable == p.get("name", ""): if p.get("type", "string") == "string": value = p.get("value", "") else: value = str(p.get("value", "")) break if not value: value = variables[variable].get("value", "") if not value: continue for idx in range(len(cmd)): if pattern.search(cmd[idx]): cmd[idx] = pattern.sub(value, cmd[idx]) idx += 1 if string: if cmd is None: return "" else: return " ".join(cmd) return cmd def GetLabel(self): """Get name""" if self.label: return self.label cmd = self.task.get_cmd(ignoreErrors=True) if cmd and len(cmd) > 0: return cmd[0] return _("unknown") def GetParams(self, dcopy=False): """Get dictionary of parameters""" if dcopy: return copy.deepcopy(self.task.get_options()) return self.task.get_options() def GetTask(self): """Get grassTask instance""" return self.task def SetParams(self, params): """Set dictionary of parameters""" self.task.params = params["params"] self.task.flags = params["flags"] def MergeParams(self, params): """Merge dictionary of parameters""" if "flags" in params: for f in params["flags"]: self.task.set_flag(f["name"], f.get("value", False)) if "params" in params: for p in params["params"]: self.task.set_param(p["name"], p.get("value", "")) def SetValid(self, options): """Set validity for action :param options: dictionary with flags and params (gtask) """ self.isValid = True for f in options["flags"]: if f.get("parameterized", False): self.isParameterized = True break for p in options["params"]: if ( self.isValid and p.get("required", False) and p.get("value", "") == "" and p.get("default", "") == "" ): self.isValid = False if not self.isParameterized and p.get("parameterized", False): self.isParameterized = True if self.parent.GetCanvas(): self._setBrush() self._setPen() def IsValid(self): """Check validity (all required parameters set)""" return self.isValid def IsParameterized(self): """Check if action is parameterized""" return self.isParameterized def GetParameterizedParams(self): """Return parameterized flags and options""" param = {"flags": [], "params": []} options = self.GetParams() for f in options["flags"]: if f.get("parameterized", False): param["flags"].append(f) for p in options["params"]: if p.get("parameterized", False): param["params"].append(p) return param def FindData(self, name): """Find data item by name""" for rel in self.GetRelations(): data = rel.GetData() if name == rel.GetLabel() and name in data.GetLabel(): return data return None def Update(self, running=False): """Update action""" if running: self._setBrush(running=True) else: self._setBrush() self._setPen() def OnDraw(self, dc): """Draw action in canvas""" self._setBrush() self._setPen() ogl.RectangleShape.Recentre(self, dc) # re-center text ogl.RectangleShape.OnDraw(self, dc) class ModelData(ModelObject, ogl.EllipseShape): def __init__(self, parent, x, y, value="", prompt="", width=None, height=None): """Data item class :param parent: window parent :param x, y: position of the shape :param fname, tname: list of parameter names from / to :param value: value :param prompt: type of GIS element :param width, height: dimension of the shape """ ModelObject.__init__(self) self.parent = parent self.value = value self.prompt = prompt self.intermediate = False self.display = False self.propWin = None if not width: width = UserSettings.Get( group="modeler", key="data", subkey=("size", "width") ) if not height: height = UserSettings.Get( group="modeler", key="data", subkey=("size", "height") ) if self.parent.GetCanvas(): ogl.EllipseShape.__init__(self, width, height) self.SetCanvas(self.parent) self.SetX(x) self.SetY(y) self._setPen() self._setBrush() self.SetLabel() def IsIntermediate(self): """Checks if data item is intermediate""" return self.intermediate def SetIntermediate(self, im): """Set intermediate flag""" self.intermediate = im def HasDisplay(self): """Checks if data item is marked to be displayed""" return self.display def SetHasDisplay(self, tbd): """Set to-be-displayed flag""" self.display = tbd def OnDraw(self, dc): self._setPen() ogl.EllipseShape.OnDraw(self, dc) def GetLog(self, string=True): """Get logging info""" name = list() for rel in self.GetRelations(): name.append(rel.GetLabel()) if name: return "/".join(name) + "=" + self.value + " (" + self.prompt + ")" else: return self.value + " (" + self.prompt + ")" def GetLabel(self): """Get list of names""" name = list() for rel in self.GetRelations(): name.append(rel.GetLabel()) return name def GetPrompt(self): """Get prompt""" return self.prompt def SetPrompt(self, prompt): """Set prompt :param prompt: """ self.prompt = prompt def GetValue(self): """Get value""" return self.value def SetValue(self, value): """Set value :param value: """ self.value = value self.SetLabel() for direction in ("from", "to"): for rel in self.GetRelations(direction): if direction == "from": action = rel.GetTo() else: action = rel.GetFrom() task = GUI(show=None).ParseCommand(cmd=action.GetLog(string=False)) task.set_param(rel.GetLabel(), self.value) action.MergeParams(task.get_options()) def GetPropDialog(self): """Get properties dialog""" return self.propWin def SetPropDialog(self, win): """Get properties dialog""" self.propWin = win def _setBrush(self): """Set brush""" if self.prompt == "raster": color = UserSettings.Get( group="modeler", key="data", subkey=("color", "raster") ) elif self.prompt == "raster_3d": color = UserSettings.Get( group="modeler", key="data", subkey=("color", "raster3d") ) elif self.prompt == "vector": color = UserSettings.Get( group="modeler", key="data", subkey=("color", "vector") ) elif self.prompt == "dbtable": color = UserSettings.Get( group="modeler", key="data", subkey=("color", "dbtable") ) else: color = UserSettings.Get( group="modeler", key="action", subkey=("color", "invalid") ) wxColor = wx.Colour(color[0], color[1], color[2]) self.SetBrush(wx.Brush(wxColor)) def _setPen(self): """Set pen""" isParameterized = False for rel in self.GetRelations("from"): if rel.GetTo().IsParameterized(): isParameterized = True break if not isParameterized: for rel in self.GetRelations("to"): if rel.GetFrom().IsParameterized(): isParameterized = True break if isParameterized: width = int( UserSettings.Get( group="modeler", key="action", subkey=("width", "parameterized") ) ) else: width = int( UserSettings.Get( group="modeler", key="action", subkey=("width", "default") ) ) if self.intermediate: style = wx.DOT else: style = wx.SOLID pen = wx.Pen(wx.BLACK, width, style) self.SetPen(pen) def SetLabel(self): """Update text""" self.ClearText() name = [] for rel in self.GetRelations(): name.append(rel.GetLabel()) self.AddText("/".join(name)) if self.value: self.AddText(self.value) else: self.AddText(_("")) def Update(self): """Update action""" self._setBrush() self._setPen() self.SetLabel() def GetDisplayCmd(self): """Get display command as list""" cmd = [] if self.prompt == "raster": cmd.append("d.rast") elif self.prompt == "vector": cmd.append("d.vect") else: raise GException("Unsupported display prompt: {}".format(self.prompt)) cmd.append("map=" + self.value) return cmd class ModelRelation(ogl.LineShape): """Data - action relation""" def __init__(self, parent, fromShape, toShape, param=""): self.fromShape = fromShape self.toShape = toShape self.param = param self.parent = parent self._points = None if self.parent.GetCanvas(): ogl.LineShape.__init__(self) def __del__(self): if self in self.fromShape.rels: self.fromShape.rels.remove(self) if self in self.toShape.rels: self.toShape.rels.remove(self) def GetFrom(self): """Get id of 'from' shape""" return self.fromShape def GetTo(self): """Get id of 'to' shape""" return self.toShape def GetData(self): """Get related ModelData instance :return: ModelData instance :return: None if not found """ if isinstance(self.fromShape, ModelData): return self.fromShape elif isinstance(self.toShape, ModelData): return self.toShape return None def GetLabel(self): """Get parameter name""" return self.param def ResetShapes(self): """Reset related objects""" self.fromShape.ResetControlPoints() self.toShape.ResetControlPoints() self.ResetControlPoints() def SetControlPoints(self, points): """Set control points""" self._points = points def GetControlPoints(self): """Get list of control points""" return self._points def _setPen(self): """Set pen""" pen = wx.Pen(wx.BLACK, 1, wx.SOLID) self.SetPen(pen) def OnDraw(self, dc): """Draw relation""" self._setPen() ogl.LineShape.OnDraw(self, dc) def SetName(self, param): self.param = param class ModelItem(ModelObject): def __init__( self, parent, x, y, id=-1, width=None, height=None, label="", items=[] ): """Abstract class for loops and conditions""" ModelObject.__init__(self, id, label) self.parent = parent def _setPen(self): """Set pen""" if self.isEnabled: style = wx.SOLID else: style = wx.DOT pen = wx.Pen(wx.BLACK, 1, style) self.SetPen(pen) def SetId(self, id): """Set loop id""" self.id = id def SetLabel(self, label=""): """Set loop text (condition)""" if label: self.label = label self.ClearText() self.AddText("(" + str(self.id) + ") " + self.label) def GetLog(self): """Get log info""" if self.label: return _("Condition: ") + self.label else: return _("Condition: not defined") def AddRelation(self, rel): """Record relation""" self.rels.append(rel) def Clear(self): """Clear object, remove rels""" self.rels = list() class ModelLoop(ModelItem, ogl.RectangleShape): def __init__( self, parent, x, y, id=-1, idx=-1, width=None, height=None, label="", items=[] ): """Defines a loop""" ModelItem.__init__(self, parent, x, y, id, width, height, label, items) self.itemIds = list() # unordered if not width: width = UserSettings.Get( group="modeler", key="loop", subkey=("size", "width") ) if not height: height = UserSettings.Get( group="modeler", key="loop", subkey=("size", "height") ) if self.parent.GetCanvas(): ogl.RectangleShape.__init__(self, width, height) self.SetCanvas(self.parent) self.SetX(x) self.SetY(y) self._setPen() self._setBrush() self.SetCornerRadius(100) self.SetLabel(label) def _setBrush(self): """Set brush""" if not self.isEnabled: color = UserSettings.Get(group="modeler", key="disabled", subkey="color") else: color = UserSettings.Get( group="modeler", key="loop", subkey=("color", "valid") ) wxColor = wx.Colour(color[0], color[1], color[2]) self.SetBrush(wx.Brush(wxColor)) def Enable(self, enabled=True): """Enable/disable action""" for idx in self.itemIds: item = self.parent.FindAction(idx) if item: item.Enable(enabled) ModelObject.Enable(self, enabled) self.Update() def Update(self): self._setPen() self._setBrush() def GetItems(self, items): """Get sorted items by id""" result = list() for item in items: if item.GetId() in self.itemIds: result.append(item) return result def SetItems(self, items): """Set items (id)""" self.itemIds = items def UpdateItem(self, oldId, newId): """Update item in the list""" idx = self.itemIds.index(oldId) if idx != -1: self.itemIds[idx] = newId def OnDraw(self, dc): """Draw loop in canvas""" self._setBrush() ogl.RectangleShape.Recentre(self, dc) # re-center text ogl.RectangleShape.OnDraw(self, dc) class ModelCondition(ModelItem, ogl.PolygonShape): def __init__( self, parent, x, y, id=-1, width=None, height=None, label="", items={"if": [], "else": []}, ): """Defines a if-else condition""" ModelItem.__init__(self, parent, x, y, id, width, height, label, items) self.itemIds = {"if": [], "else": []} if not width: self.width = UserSettings.Get( group="modeler", key="if-else", subkey=("size", "width") ) else: self.width = width if not height: self.height = UserSettings.Get( group="modeler", key="if-else", subkey=("size", "height") ) else: self.height = height if self.parent.GetCanvas(): ogl.PolygonShape.__init__(self) points = [ (0, -self.height / 2), (self.width / 2, 0), (0, self.height / 2), (-self.width / 2, 0), ] self.Create(points) self.SetCanvas(self.parent) self.SetX(x) self.SetY(y) self.SetPen(wx.BLACK_PEN) if label: self.AddText("(" + str(self.id) + ") " + label) else: self.AddText("(" + str(self.id) + ")") def GetLabel(self): """Get name""" return _("if-else") def GetWidth(self): """Get object width""" return self.width def GetHeight(self): """Get object height""" return self.height def GetItems(self, items): """Get sorted items by id""" result = {"if": [], "else": []} for item in items: if item.GetId() in self.itemIds["if"]: result["if"].append(item) elif item.GetId() in self.itemIds["else"]: result["else"].append(item) return result def SetItems(self, items, branch="if"): """Set items (id) :param items: list of items :param branch: 'if' / 'else' """ if branch in ["if", "else"]: self.itemIds[branch] = items class ModelComment(ModelObject, ogl.RectangleShape): def __init__(self, parent, x, y, id=-1, width=None, height=None, label=""): """Defines a model comment""" ModelObject.__init__(self, id, label) if not width: width = UserSettings.Get( group="modeler", key="comment", subkey=("size", "width") ) if not height: height = UserSettings.Get( group="modeler", key="comment", subkey=("size", "height") ) if parent.GetCanvas(): ogl.RectangleShape.__init__(self, width, height) self.SetCanvas(parent) self.SetX(x) self.SetY(y) font = wx.SystemSettings.GetFont(wx.SYS_DEFAULT_GUI_FONT) font.SetStyle(wx.ITALIC) self.SetFont(font) self._setPen() self._setBrush() self.SetLabel(label) def _setBrush(self, running=False): """Set brush""" color = UserSettings.Get(group="modeler", key="comment", subkey="color") wxColor = wx.Colour(color[0], color[1], color[2]) self.SetBrush(wx.Brush(wxColor)) def _setPen(self): """Set pen""" pen = wx.Pen(wx.BLACK, 1, wx.DOT) self.SetPen(pen) def SetLabel(self, label=None): """Set label :param label: if None use command string instead """ if label: self.label = label elif self.label: label = self.label else: label = "" idx = self.GetId() self.ClearText() self.AddText("(%d) %s" % (idx, label)) def GetComment(self): return self.GetLabel() def SetComment(self, comment): self.SetLabel(comment) self.GetCanvas().Refresh() class ProcessModelFile: """Process GRASS model file (gxm)""" def __init__(self, tree): """A ElementTree handler for the GXM XML file, as defined in grass-gxm.dtd. """ self.tree = tree self.root = self.tree.getroot() # check if input is a valid GXM file if self.root is None or self.root.tag != "gxm": if self.root is not None: tagName = self.root.tag else: tabName = _("empty") raise GException(_("Details: unsupported tag name '{0}'.").format(tagName)) # list of actions, data self.properties = dict() self.variables = dict() self.actions = list() self.data = list() self.loops = list() self.conditions = list() self.comments = list() self._processWindow() self._processProperties() self._processVariables() self._processItems() self._processData() def _filterValue(self, value): """Filter value :param value: """ value = value.replace("<", "<") value = value.replace(">", ">") return value def _getNodeText(self, node, tag, default=""): """Get node text""" p = node.find(tag) if p is not None: if p.text: return utils.normalize_whitespace(p.text) else: return "" return default def _processWindow(self): """Process window properties""" node = self.root.find("window") if node is None: self.pos = self.size = None return self.pos, self.size = self._getDim(node) def _processProperties(self): """Process model properties""" node = self.root.find("properties") if node is None: return for key in ("name", "description", "author"): self._processProperty(node, key) for f in node.findall("flag"): name = f.get("name", "") if name == "overwrite": self.properties["overwrite"] = True def _processProperty(self, pnode, name): """Process given property""" node = pnode.find(name) if node is not None: self.properties[name] = node.text else: self.properties[name] = "" def _processVariables(self): """Process model variables""" vnode = self.root.find("variables") if vnode is None: return for node in vnode.findall("variable"): name = node.get("name", "") if not name: continue # should not happen self.variables[name] = {"type": node.get("type", "string")} for key in ("description", "value"): self._processVariable(node, name, key) def _processVariable(self, pnode, name, key): """Process given variable""" node = pnode.find(key) if node is not None: if node.text: self.variables[name][key] = node.text def _processItems(self): """Process model items (actions, loops, conditions)""" self._processActions() self._processLoops() self._processConditions() self._processComments() def _processActions(self): """Process model file""" for action in self.root.findall("action"): pos, size = self._getDim(action) disabled = False task = action.find("task") if task is not None: if task.find("disabled") is not None: disabled = True task = self._processTask(task) else: task = None aId = int(action.get("id", -1)) label = action.get("name") comment = action.find("comment") if comment is not None: commentString = comment.text else: commentString = "" self.actions.append( { "pos": pos, "size": size, "task": task, "id": aId, "disabled": disabled, "label": label, "comment": commentString, } ) def _getDim(self, node): """Get position and size of shape""" pos = size = None posAttr = node.get("pos", None) if posAttr: posVal = list(map(int, posAttr.split(","))) try: pos = (posVal[0], posVal[1]) except: pos = None sizeAttr = node.get("size", None) if sizeAttr: sizeVal = list(map(int, sizeAttr.split(","))) try: size = (sizeVal[0], sizeVal[1]) except: size = None return pos, size def _processData(self): """Process model file""" for data in self.root.findall("data"): pos, size = self._getDim(data) param = data.find("data-parameter") prompt = value = None if param is not None: prompt = param.get("prompt", None) value = self._filterValue(self._getNodeText(param, "value")) intermediate = False if data.find("intermediate") is None else True display = False if data.find("display") is None else True rels = list() for rel in data.findall("relation"): defrel = { "id": int(rel.get("id", -1)), "dir": rel.get("dir", "to"), "name": rel.get("name", ""), } points = list() for point in rel.findall("point"): x = self._filterValue(self._getNodeText(point, "x")) y = self._filterValue(self._getNodeText(point, "y")) points.append((float(x), float(y))) defrel["points"] = points rels.append(defrel) self.data.append( { "pos": pos, "size": size, "prompt": prompt, "value": value, "intermediate": intermediate, "display": display, "rels": rels, } ) def _processTask(self, node): """Process task :return: grassTask instance :return: None on error """ cmd = list() parameterized = list() name = node.get("name", None) if not name: return None cmd.append(name) # flags for f in node.findall("flag"): flag = f.get("name", "") if f.get("parameterized", "0") == "1": parameterized.append(("flag", flag)) if f.get("value", "1") == "0": continue if len(flag) > 1: cmd.append("--" + flag) else: cmd.append("-" + flag) # parameters for p in node.findall("parameter"): name = p.get("name", "") if p.find("parameterized") is not None: parameterized.append(("param", name)) cmd.append( "%s=%s" % (name, self._filterValue(self._getNodeText(p, "value"))) ) task, err = GUI(show=None, checkError=True).ParseCommand(cmd=cmd) if err: GWarning(os.linesep.join(err)) for opt, name in parameterized: if opt == "flag": task.set_flag(name, True, element="parameterized") else: task.set_param(name, True, element="parameterized") return task def _processLoops(self): """Process model loops""" for node in self.root.findall("loop"): pos, size = self._getDim(node) text = self._filterValue(self._getNodeText(node, "condition")).strip() aid = list() for anode in node.findall("item"): try: aid.append(int(anode.text)) except ValueError: pass self.loops.append( { "pos": pos, "size": size, "text": text, "id": int(node.get("id", -1)), "items": aid, } ) def _processConditions(self): """Process model conditions""" for node in self.root.findall("if-else"): pos, size = self._getDim(node) text = self._filterValue(self._getNodeText(node, "condition")).strip() aid = {"if": list(), "else": list()} for b in aid.keys(): bnode = node.find(b) if bnode is None: continue for anode in bnode.findall("item"): try: aid[b].append(int(anode.text)) except ValueError: pass self.conditions.append( { "pos": pos, "size": size, "text": text, "id": int(node.get("id", -1)), "items": aid, } ) def _processComments(self): """Process model comments""" for node in self.root.findall("comment"): pos, size = self._getDim(node) text = self._filterValue(node.text) self.comments.append( { "pos": pos, "size": size, "text": text, "id": int(node.get("id", -1)), "text": text, } ) class WriteModelFile: """Generic class for writing model file""" def __init__(self, fd, model): self.fd = fd self.model = model self.properties = model.GetProperties() self.variables = model.GetVariables() self.items = model.GetItems() self.indent = 0 self._header() self._window() self._properties() self._variables() self._items() dataList = list() for action in model.GetItems(objType=ModelAction): for rel in action.GetRelations(): dataItem = rel.GetData() if dataItem not in dataList: dataList.append(dataItem) self._data(dataList) self._footer() def _filterValue(self, value): """Escapes value to be stored in XML. :param value: string to be escaped as XML :return: a XML-valid string """ value = saxutils.escape(value) return value def _header(self): """Write header""" self.fd.write( '\n' % GetDefaultEncoding(forceUTF8=True) ) self.fd.write('\n') self.fd.write("%s\n" % (" " * self.indent)) self.indent += 4 def _footer(self): """Write footer""" self.indent -= 4 self.fd.write("%s\n" % (" " * self.indent)) def _window(self): """Write window properties""" canvas = self.model.GetCanvas() if canvas is None: return win = canvas.parent pos = win.GetPosition() size = win.GetSize() self.fd.write( '%s\n' % (" " * self.indent, pos[0], pos[1], size[0], size[1]) ) def _properties(self): """Write model properties""" self.fd.write("%s\n" % (" " * self.indent)) self.indent += 4 if self.properties["name"]: self.fd.write( "%s%s\n" % (" " * self.indent, self.properties["name"]) ) if self.properties["description"]: self.fd.write( "%s%s\n" % (" " * self.indent, self.properties["description"]) ) if self.properties["author"]: self.fd.write( "%s%s\n" % (" " * self.indent, self.properties["author"]) ) if "overwrite" in self.properties and self.properties["overwrite"]: self.fd.write('%s\n' % (" " * self.indent)) self.indent -= 4 self.fd.write("%s\n" % (" " * self.indent)) def _variables(self): """Write model variables""" if not self.variables: return self.fd.write("%s\n" % (" " * self.indent)) self.indent += 4 for name, values in six.iteritems(self.variables): self.fd.write( '%s\n' % (" " * self.indent, name, values["type"]) ) self.indent += 4 if "value" in values: self.fd.write( "%s%s\n" % (" " * self.indent, values["value"]) ) if "description" in values: self.fd.write( "%s%s\n" % (" " * self.indent, values["description"]) ) self.indent -= 4 self.fd.write("%s\n" % (" " * self.indent)) self.indent -= 4 self.fd.write("%s\n" % (" " * self.indent)) def _items(self): """Write actions/loops/conditions""" for item in self.items: if isinstance(item, ModelAction): self._action(item) elif isinstance(item, ModelLoop): self._loop(item) elif isinstance(item, ModelCondition): self._condition(item) elif isinstance(item, ModelComment): self._comment(item) def _action(self, action): """Write actions""" self.fd.write( '%s\n' % ( " " * self.indent, action.GetId(), action.GetLabel(), action.GetX(), action.GetY(), action.GetWidth(), action.GetHeight(), ) ) self.indent += 4 comment = action.GetComment() if comment: self.fd.write("%s%s\n" % (" " * self.indent, comment)) self.fd.write( '%s\n' % (" " * self.indent, action.GetLog(string=False)[0]) ) self.indent += 4 if not action.IsEnabled(): self.fd.write("%s\n" % (" " * self.indent)) for key, val in six.iteritems(action.GetParams()): if key == "flags": for f in val: if f.get("value", False) or f.get("parameterized", False): if f.get("parameterized", False): if f.get("value", False) is False: self.fd.write( '%s\n' % (" " * self.indent, f.get("name", "")) ) else: self.fd.write( '%s\n' % (" " * self.indent, f.get("name", "")) ) else: self.fd.write( '%s\n' % (" " * self.indent, f.get("name", "")) ) else: # parameter for p in val: if not p.get("value", "") and not p.get("parameterized", False): continue self.fd.write( '%s\n' % (" " * self.indent, p.get("name", "")) ) self.indent += 4 if p.get("parameterized", False): self.fd.write("%s\n" % (" " * self.indent)) self.fd.write( "%s%s\n" % (" " * self.indent, self._filterValue(p.get("value", ""))) ) self.indent -= 4 self.fd.write("%s\n" % (" " * self.indent)) self.indent -= 4 self.fd.write("%s\n" % (" " * self.indent)) self.indent -= 4 self.fd.write("%s\n" % (" " * self.indent)) def _data(self, dataList): """Write data""" for data in dataList: self.fd.write( '%s\n' % ( " " * self.indent, data.GetX(), data.GetY(), data.GetWidth(), data.GetHeight(), ) ) self.indent += 4 self.fd.write( '%s\n' % (" " * self.indent, data.GetPrompt()) ) self.indent += 4 self.fd.write( "%s%s\n" % (" " * self.indent, self._filterValue(data.GetValue())) ) self.indent -= 4 self.fd.write("%s\n" % (" " * self.indent)) if data.IsIntermediate(): self.fd.write("%s\n" % (" " * self.indent)) if data.HasDisplay(): self.fd.write("%s\n" % (" " * self.indent)) # relations for ft in ("from", "to"): for rel in data.GetRelations(ft): if ft == "from": aid = rel.GetTo().GetId() else: aid = rel.GetFrom().GetId() self.fd.write( '%s\n' % (" " * self.indent, ft, aid, rel.GetLabel()) ) self.indent += 4 for point in rel.GetLineControlPoints()[1:-1]: self.fd.write("%s\n" % (" " * self.indent)) self.indent += 4 x, y = point.Get() self.fd.write("%s%d\n" % (" " * self.indent, int(x))) self.fd.write("%s%d\n" % (" " * self.indent, int(y))) self.indent -= 4 self.fd.write("%s\n" % (" " * self.indent)) self.indent -= 4 self.fd.write("%s\n" % (" " * self.indent)) self.indent -= 4 self.fd.write("%s\n" % (" " * self.indent)) def _loop(self, loop): """Write loops""" self.fd.write( '%s\n' % ( " " * self.indent, loop.GetId(), loop.GetX(), loop.GetY(), loop.GetWidth(), loop.GetHeight(), ) ) self.indent += 4 cond = loop.GetLabel() if cond: self.fd.write( "%s%s\n" % (" " * self.indent, self._filterValue(cond)) ) for item in loop.GetItems(self.model.GetItems(objType=ModelAction)): self.fd.write("%s%d\n" % (" " * self.indent, item.GetId())) self.indent -= 4 self.fd.write("%s\n" % (" " * self.indent)) def _condition(self, condition): """Write conditions""" bbox = condition.GetBoundingBoxMin() self.fd.write( '%s\n' % ( " " * self.indent, condition.GetId(), condition.GetX(), condition.GetY(), bbox[0], bbox[1], ) ) text = condition.GetLabel() self.indent += 4 if text: self.fd.write( "%s%s\n" % (" " * self.indent, self._filterValue(text)) ) items = condition.GetItems() for b in items.keys(): if len(items[b]) < 1: continue self.fd.write("%s<%s>\n" % (" " * self.indent, b)) self.indent += 4 for item in items[b]: self.fd.write("%s%d\n" % (" " * self.indent, item.GetId())) self.indent -= 4 self.fd.write("%s\n" % (" " * self.indent, b)) self.indent -= 4 self.fd.write("%s\n" % (" " * self.indent)) def _comment(self, comment): """Write comment""" self.fd.write( '%s%s\n' % ( " " * self.indent, comment.GetId(), comment.GetX(), comment.GetY(), comment.GetWidth(), comment.GetHeight(), comment.GetLabel(), ) ) class WritePythonFile: def __init__(self, fd, model): """Class for exporting model to Python script :param fd: file descriptor """ self.fd = fd self.model = model self.indent = 4 self._writePython() def _getStandardizedOption(self, string): if string == "raster": return "G_OPT_R_MAP" elif string == "vector": return "G_OPT_V_MAP" elif string == "mapset": return "G_OPT_M_MAPSET" elif string == "file": return "G_OPT_F_INPUT" elif string == "dir": return "G_OPT_M_DIR" elif string == "region": return "G_OPT_M_REGION" return "" def _writePython(self): """Write model to file""" properties = self.model.GetProperties() # header self.fd.write( r"""#!/usr/bin/env python3 # #{header_begin} # # MODULE: {module_name} # # AUTHOR(S): {author} # # PURPOSE: {purpose} # # DATE: {date} # #{header_end} """.format( header_begin="#" * 77, module_name=properties["name"], author=properties["author"], purpose="\n# ".join(properties["description"].splitlines()), date=time.asctime(), header_end="#" * 77, ) ) # UI self.fd.write( r""" # %module # % description: {description} # %end """.format( description=" ".join(properties["description"].splitlines()) ) ) modelItems = self.model.GetItems() for item in modelItems: for flag in item.GetParameterizedParams()["flags"]: if flag["label"]: desc = flag["label"] else: desc = flag["description"] self.fd.write( r"""# %option # % key: {flag_name} # % description: {description} # % required: yes # % type: string # % options: True, False # % guisection: Flags """.format( flag_name=self._getParamName(flag["name"], item), description=desc, ) ) if flag["value"]: self.fd.write("# % answer: {}\n".format(flag["value"])) else: self.fd.write("# % answer: False\n") self.fd.write("# %end\n") for param in item.GetParameterizedParams()["params"]: if param["label"]: desc = param["label"] else: desc = param["description"] self.fd.write( r"""# %option # % key: {param_name} # % description: {description} # % required: yes """.format( param_name=self._getParamName(param["name"], item), description=desc, ) ) if param["type"] != "float": self.fd.write("# % type: {}\n".format(param["type"])) else: self.fd.write("# % type: double\n") if param["key_desc"]: self.fd.write("# % key_desc: ") self.fd.write(", ".join(param["key_desc"])) self.fd.write("\n") if param["value"]: self.fd.write("# % answer: {}\n".format(param["value"])) self.fd.write("# %end\n") # import modules self.fd.write( r""" import sys import os import atexit from grass.script import parser, run_command """ ) # cleanup() rast, vect, rast3d, msg = self.model.GetIntermediateData() self.fd.write( r""" def cleanup(): """ ) if rast: self.fd.write( r""" run_command('g.remove', flags='f', type='raster', name=%s) """ % ",".join(map(lambda x: "'" + x + "'", rast)) ) if vect: self.fd.write( r""" run_command('g.remove', flags='f', type='vector', name=%s) """ % ",".join(map(lambda x: "'" + x + "'", vect)) ) if rast3d: self.fd.write( r""" run_command('g.remove', flags='f', type='raster_3d', name=%s) """ % ",".join(map(lambda x: "'" + x + "'", rast3d)) ) if not rast and not vect and not rast3d: self.fd.write(" pass\n") self.fd.write("\ndef main(options, flags):\n") for item in self.model.GetItems(): self._writePythonItem(item, variables=item.GetParameterizedParams()) self.fd.write(" return 0\n") for item in modelItems: if item.GetParameterizedParams()["flags"]: self.fd.write( r""" def getParameterizedFlags(paramFlags, itemFlags): fl = '' """ ) self.fd.write( """ for i in [key for key, value in paramFlags.items() if value == 'True']: if i in itemFlags: fl += i[-1] return fl """ ) break self.fd.write( r""" if __name__ == "__main__": options, flags = parser() atexit.register(cleanup) """ ) if properties.get("overwrite"): self.fd.write(' os.environ["GRASS_OVERWRITE"] = "1"\n') self.fd.write(" sys.exit(main(options, flags))\n") def _writePythonItem(self, item, ignoreBlock=True, variables={}): """Write model object to Python file""" if isinstance(item, ModelAction): if ignoreBlock and item.GetBlockId(): # ignore items in loops of conditions return self._writePythonAction(item, variables=variables) elif isinstance(item, ModelLoop) or isinstance(item, ModelCondition): # substitute condition cond = item.GetLabel() for variable in self.model.GetVariables(): pattern = re.compile("%" + variable) if pattern.search(cond): value = variables[variable].get("value", "") if variables[variable].get("type", "string") == "string": value = '"' + value + '"' cond = pattern.sub(value, cond) if isinstance(item, ModelLoop): condVar, condText = map( lambda x: x.strip(), re.split("\s* in \s*", cond) ) cond = "%sfor %s in " % (" " * self.indent, condVar) if condText[0] == "`" and condText[-1] == "`": task = GUI(show=None).ParseCommand(cmd=utils.split(condText[1:-1])) cond += "grass.read_command(" cond += ( self._getPythonActionCmd(task, len(cond), variables=[condVar]) + ".splitlines()" ) else: cond += condText self.fd.write("%s:\n" % cond) self.indent += 4 variablesLoop = variables.copy() variablesLoop[condVar] = None for action in item.GetItems(self.model.GetItems(objType=ModelAction)): self._writePythonItem( action, ignoreBlock=False, variables=variablesLoop ) self.indent -= 4 if isinstance(item, ModelCondition): self.fd.write("%sif %s:\n" % (" " * self.indent, cond)) self.indent += 4 condItems = item.GetItems() for action in condItems["if"]: self._writePythonItem(action, ignoreBlock=False) if condItems["else"]: self.indent -= 4 self.fd.write("%selse:\n" % (" " * self.indent)) self.indent += 4 for action in condItems["else"]: self._writePythonItem(action, ignoreBlock=False) self.indent += 4 self.fd.write("\n") if isinstance(item, ModelComment): self._writePythonComment(item) def _writePythonAction(self, item, variables={}): """Write model action to Python file""" task = GUI(show=None).ParseCommand(cmd=item.GetLog(string=False)) strcmd = "%srun_command(" % (" " * self.indent) self.fd.write( strcmd + self._getPythonActionCmd(item, task, len(strcmd), variables) + "\n" ) def _getPythonActionCmd(self, item, task, cmdIndent, variables={}): opts = task.get_options() ret = "" flags = "" params = list() itemParameterizedFlags = list() parameterizedParams = [v["name"] for v in variables["params"]] parameterizedFlags = [v["name"] for v in variables["flags"]] for f in opts["flags"]: if f.get("name") in parameterizedFlags and len(f.get("name")) == 1: itemParameterizedFlags.append( '"{}"'.format(self._getParamName(f.get("name"), item)) ) if f.get("value", False): name = f.get("name", "") if len(name) > 1: params.append("%s = True" % name) else: flags += name itemParameterizedFlags = ", ".join(itemParameterizedFlags) for p in opts["params"]: name = p.get("name", None) value = p.get("value", None) if (name and value) or (name in parameterizedParams): ptype = p.get("type", "string") foundVar = False if name in parameterizedParams: foundVar = True value = 'options["{}"]'.format(self._getParamName(name, item)) if foundVar or ptype != "string": params.append("{}={}".format(name, value)) else: params.append('{}="{}"'.format(name, value)) ret += '"%s"' % task.get_name() if flags: ret += ",\n{indent}flags='{fl}'".format(indent=" " * cmdIndent, fl=flags) if itemParameterizedFlags: ret += " + getParameterizedFlags(options, [{}])".format( itemParameterizedFlags ) elif itemParameterizedFlags: ret += ",\n{}flags=getParameterizedFlags(options, [{}])".format( " " * cmdIndent, itemParameterizedFlags ) if len(params) > 0: ret += ",\n" for opt in params[:-1]: ret += "%s%s,\n" % (" " * cmdIndent, opt) ret += "%s%s)" % (" " * cmdIndent, params[-1]) else: ret += ")" return ret def _writePythonComment(self, item): """Write model comment to Python file""" for line in item.GetLabel().splitlines(): self.fd.write("#" + line + "\n") def _substituteVariable(self, string, variable, data): """Substitute variable in the string :param string: string to be modified :param variable: variable to be substituted :param data: data related to the variable :return: modified string """ result = "" ss = re.split("\w*(%" + variable + ")w*", string) if not ss[0] and not ss[-1]: if data: return "options['%s']" % variable else: return variable for s in ss: if not s or s == '"': continue if s == "%" + variable: if data: result += "+options['%s']+" % variable else: result += "+%s+" % variable else: result += '"' + s if not s.endswith("]"): # options result += '"' return result.strip("+") def _getParamName(self, parameter_name, item): return "{module_name}{module_id}_{param_name}".format( module_name=re.sub("[^a-zA-Z]+", "", item.GetLabel()), module_id=item.GetId(), param_name=parameter_name, ) class ModelParamDialog(wx.Dialog): def __init__( self, parent, model, params, id=wx.ID_ANY, title=_("Model parameters"), style=wx.DEFAULT_DIALOG_STYLE | wx.RESIZE_BORDER, **kwargs, ): """Model parameters dialog""" self.parent = parent self._model = model self.params = params self.tasks = list() # list of tasks/pages wx.Dialog.__init__( self, parent=parent, id=id, title=title, style=style, **kwargs ) self.notebook = GNotebook(parent=self, style=globalvar.FNPageDStyle) panel = self._createPages() wx.CallAfter(self.notebook.SetSelection, 0) # intermediate data? self.interData = wx.CheckBox( parent=self, label=_("Delete intermediate data when finish") ) self.interData.SetValue(True) rast, vect, rast3d, msg = self._model.GetIntermediateData() if not rast and not vect and not rast3d: self.interData.Hide() self.btnCancel = Button(parent=self, id=wx.ID_CANCEL) self.btnRun = Button(parent=self, id=wx.ID_OK, label=_("&Run")) self.btnRun.SetDefault() self._layout() size = self.GetBestSize() self.SetMinSize(size) self.SetSize( (size.width, size.height + panel.constrained_size[1] - panel.panelMinHeight) ) def _layout(self): btnSizer = wx.StdDialogButtonSizer() btnSizer.AddButton(self.btnCancel) btnSizer.AddButton(self.btnRun) btnSizer.Realize() mainSizer = wx.BoxSizer(wx.VERTICAL) mainSizer.Add(self.notebook, proportion=1, flag=wx.EXPAND) if self.interData.IsShown(): mainSizer.Add( self.interData, proportion=0, flag=wx.EXPAND | wx.ALL | wx.ALIGN_CENTER, border=5, ) mainSizer.Add( wx.StaticLine(parent=self, id=wx.ID_ANY, style=wx.LI_HORIZONTAL), proportion=0, flag=wx.EXPAND | wx.LEFT | wx.RIGHT, border=5, ) mainSizer.Add(btnSizer, proportion=0, flag=wx.EXPAND | wx.ALL, border=5) self.SetSizer(mainSizer) mainSizer.Fit(self) def _createPages(self): """Create for each parameterized module its own page""" nameOrdered = [""] * len(self.params.keys()) for name, params in six.iteritems(self.params): nameOrdered[params["idx"]] = name for name in nameOrdered: params = self.params[name] panel = self._createPage(name, params) if name == "variables": name = _("Variables") self.notebook.AddPage(page=panel, text=name) return panel def _createPage(self, name, params): """Define notebook page""" if name in globalvar.grassCmd: task = gtask.grassTask(name) else: task = gtask.grassTask() task.flags = params["flags"] task.params = params["params"] panel = CmdPanel( parent=self, id=wx.ID_ANY, task=task, giface=GraphicalModelerGrassInterface(self._model), ) self.tasks.append(task) return panel def GetErrors(self): """Check for errors, get list of messages""" errList = list() for task in self.tasks: errList += task.get_cmd_error() return errList def DeleteIntermediateData(self): """Check if to detele intermediate data""" if self.interData.IsShown() and self.interData.IsChecked(): return True return False