""" @package gmodeler.model @brief wxGUI Graphical Modeler (base classes & read/write) Classes: - model::Model - model::ModelObject - model::ModelAction - model::ModelData - model::ModelRelation - model::ModelItem - model::ModelLoop - model::ModelCondition - model::ModelComment - model::ProcessModelFile - model::WriteModelFile - model::WritePythonFile - model::ModelParamDialog (C) 2010-2018 by the GRASS Development Team This program is free software under the GNU General Public License (>=v2). Read the file COPYING that comes with GRASS for details. @author Martin Landa @author Python parameterization Ondrej Pesek """ import os import getpass import copy import re import mimetypes import time import six try: import xml.etree.ElementTree as etree except ImportError: import elementtree.ElementTree as etree # Python <= 2.4 import xml.sax.saxutils as saxutils import wx from wx.lib import ogl from core import globalvar from core import utils from core.gcmd import GMessage, GException, GError, RunCommand, GWarning, GetDefaultEncoding from core.settings import UserSettings from gui_core.forms import GUI, CmdPanel from gui_core.widgets import GNotebook from gui_core.wrap import Button from gmodeler.giface import GraphicalModelerGrassInterface from grass.script import core as grass from grass.script import task as gtask class Model(object): """Class representing the model""" def __init__(self, canvas=None): self.items = list() # list of ordered items (action/loop/condition) # model properties self.properties = { 'name': _("model"), 'description': _("Script generated by wxGUI Graphical Modeler."), 'author': getpass.getuser()} # model variables self.variables = dict() self.variablesParams = dict() self.canvas = canvas def GetCanvas(self): """Get canvas or None""" return self.canvas def GetItems(self, objType=None): """Get list of model items :param objType: Object type to filter model objects """ if not objType: return self.items result = list() for item in self.items: if isinstance(item, objType): result.append(item) return result def GetItem(self, aId, objType=None): """Get item of given id :param aId: item id :return: Model* instance :return: None if no item found """ ilist = self.GetItems(objType) for item in ilist: if item.GetId() == aId: return item return None def GetItemIndex(self, item): """Return list index of given item""" return self.items.index(item) def GetNumItems(self, actionOnly=False): """Get number of items""" if actionOnly: return len(self.GetItems(objType=ModelAction)) return len(self.GetItems()) def ReorderItems(self, idxList): items = list() for oldIdx, newIdx in six.iteritems(idxList): item = self.items.pop(oldIdx) items.append(item) self.items.insert(newIdx, item) # try: # nextItem = self.items[newIdx+1] # except IndexError: # continue # newIdx is the last item in the list # items.append(nextItem) # x = item.GetX() # y = item.GetY() # item.SetX(nextItem.GetX()) # item.SetY(nextItem.GetY()) # nextItem.SetX(x) # nextItem.SetY(y) dc = wx.ClientDC(self.canvas) for item in items: item.MoveLinks(dc) for mo in item.GetBlock(): if isinstance(mo, ModelLoop): self.canvas.parent.DefineLoop(mo) elif isinstance(mo, ModelCondition): self.canvas.parent.DefineCondition(mo) def Normalize(self): # check for inconsistecies for idx in range(1, len(self.items)): if not self.items[idx].GetBlock() and \ isinstance(self.items[idx - 1], ModelLoop): # swap action not-in-block with previously defined # loop itemPrev = self.items[idx - 1] self.items[idx - 1] = self.items[idx] self.items[idx] = itemPrev # update ids iId = 1 for item in self.items: item.SetId(iId) item.SetLabel() iId += 1 def GetNextId(self): """Get next id (data ignored) :return: next id to be used (default: 1) """ if len(self.items) < 1: return 1 currId = self.items[-1].GetId() if currId > 0: return currId + 1 return 1 def GetProperties(self): """Get model properties""" return self.properties def GetVariables(self, params=False): """Get model variables""" if params: return self.variablesParams return self.variables def SetVariables(self, data): """Set model variables""" self.variables = data def Reset(self): """Reset model""" self.items = list() def RemoveItem(self, item, reference=None): """Remove item from model :item: item to be removed :reference: reference item valid for deletion :return: list of related items to remove/update """ relList = list() upList = list() doRemove = True if isinstance(item, ModelAction): for rel in item.GetRelations(): relList.append(rel) data = rel.GetData() if len(data.GetRelations()) < 2: relList.append(data) else: upList.append(data) elif isinstance(item, ModelData): for rel in item.GetRelations(): otherItem = rel.GetTo() if rel.GetFrom() == item else rel.GetFrom() if reference and reference != otherItem: doRemove = False continue # avoid recursive deletion relList.append(rel) if reference and reference != otherItem: relList.append(otherItem) if not doRemove: upList.append(item) elif isinstance(item, ModelLoop): for rel in item.GetRelations(): relList.append(rel) for action in self.GetItems(): action.UnSetBlock(item) if doRemove and item in self.items: self.items.remove(item) return relList, upList def FindAction(self, aId): """Find action by id""" alist = self.GetItems(objType=ModelAction) for action in alist: if action.GetId() == aId: return action return None def GetMaps(self, prompt): """Get list of maps of selected type :param prompt: to filter maps""" maps = list() for data in self.GetData(): if prompt == data.GetPrompt(): mapName = data.GetValue() if not mapName or mapName[0] is '%': continue # skip variables maps.append(mapName) return maps def GetData(self): """Get list of data items""" result = list() dataItems = self.GetItems(objType=ModelData) for action in self.GetItems(objType=ModelAction): for rel in action.GetRelations(): dataItem = rel.GetData() if dataItem not in result: result.append(dataItem) if dataItem in dataItems: dataItems.remove(dataItem) # standalone data if dataItems: result += dataItems return result def FindData(self, value, prompt): """Find data item in the model :param value: value :param prompt: prompt :return: ModelData instance :return: None if not found """ for data in self.GetData(): if data.GetValue() == value and \ data.GetPrompt() == prompt: return data return None def LoadModel(self, filename): """Load model definition stored in GRASS Model XML file (gxm) .. todo:: Validate against DTD Raise exception on error. """ dtdFilename = os.path.join(globalvar.WXGUIDIR, "xml", "grass-gxm.dtd") # parse workspace file try: gxmXml = ProcessModelFile(etree.parse(filename)) except Exception as e: raise GException(unicode(e)) if self.canvas: win = self.canvas.parent if gxmXml.pos: win.SetPosition(gxmXml.pos) if gxmXml.size: win.SetSize(gxmXml.size) # load properties self.properties = gxmXml.properties self.variables = gxmXml.variables # load actions for action in gxmXml.actions: actionItem = ModelAction(parent=self, x=action['pos'][0], y=action['pos'][1], width=action['size'][0], height=action['size'][1], task=action['task'], id=action['id'], label=action['label'], comment=action['comment']) if action['disabled']: actionItem.Enable(False) self.AddItem(actionItem, pos=actionItem.GetId() - 1) actionItem.SetValid(actionItem.GetTask().get_options()) actionItem.GetLog() # substitute variables (-> valid/invalid) # load data & relations for data in gxmXml.data: dataItem = ModelData(parent=self, x=data['pos'][0], y=data['pos'][1], width=data['size'][0], height=data['size'][1], prompt=data['prompt'], value=data['value']) dataItem.SetIntermediate(data['intermediate']) dataItem.SetHasDisplay(data['display']) for rel in data['rels']: actionItem = self.FindAction(rel['id']) if rel['dir'] == 'from': relation = ModelRelation( parent=self, fromShape=dataItem, toShape=actionItem, param=rel['name']) else: relation = ModelRelation( parent=self, fromShape=actionItem, toShape=dataItem, param=rel['name']) relation.SetControlPoints(rel['points']) actionItem.AddRelation(relation) dataItem.AddRelation(relation) if self.canvas: dataItem.Update() # load loops for loop in gxmXml.loops: loopItem = ModelLoop(parent=self, x=loop['pos'][0], y=loop['pos'][1], width=loop['size'][0], height=loop['size'][1], label=loop['text'], id=loop['id']) self.AddItem(loopItem, pos=loopItem.GetId() - 1) # load conditions for condition in gxmXml.conditions: conditionItem = ModelCondition(parent=self, x=condition['pos'][0], y=condition['pos'][1], width=condition['size'][0], height=condition['size'][1], label=condition['text'], id=condition['id']) self.AddItem(conditionItem, pos=conditionItem.GetId() - 1) # define loops & if/else items for loop in gxmXml.loops: loopItem = self.GetItem(loop['id'], objType=ModelLoop) loopItem.SetItems(loop['items']) for idx in loop['items']: action = self.GetItem(idx, objType=ModelAction) action.SetBlock(loopItem) for condition in gxmXml.conditions: conditionItem = self.GetItem(condition['id']) for b in condition['items'].keys(): alist = list() for aId in condition['items'][b]: action = self.GetItem(aId) alist.append(action) conditionItem.SetItems(alist, branch=b) items = conditionItem.GetItems() for b in items.keys(): for action in items[b]: action.SetBlock(conditionItem) # load comments for comment in gxmXml.comments: commentItem = ModelComment(parent=self, x=comment['pos'][0], y=comment['pos'][1], width=comment['size'][0], height=comment['size'][1], id=comment['id'], label=comment['text']) self.AddItem(commentItem, pos=commentItem.GetId() - 1) def AddItem(self, newItem, pos=-1): """Add item to the list""" if pos != -1: self.items.insert(pos, newItem) else: self.items.append(newItem) # i = 1 # for item in self.items: # item.SetId(i) # i += 1 def IsValid(self): """Return True if model is valid""" if self.Validate(): return False return True def Validate(self): """Validate model, return None if model is valid otherwise error string""" errList = list() variables = self.GetVariables().keys() pattern = re.compile(r'(.*)(%.+\s?)(.*)') for action in self.GetItems(objType=ModelAction): cmd = action.GetLog(string=False) task = GUI(show=None).ParseCommand(cmd=cmd) errList += map(lambda x: cmd[0] + ': ' + x, task.get_cmd_error()) # check also variables for opt in cmd[1:]: if '=' not in opt: continue key, value = opt.split('=', 1) sval = pattern.search(value) if sval: var = sval.group(2).strip()[1:] # strip '%' from beginning found = False for v in variables: if var.startswith(v): found = True break if not found: report = True for item in filter( lambda x: isinstance(x, ModelLoop), action.GetBlock()): if var in item.GetLabel(): report = False break if report: errList.append( cmd[0] + ": " + _("undefined variable '%s'") % var) # TODO: check variables in file only optionally ### errList += self._substituteFile(action, checkOnly = True) return errList def _substituteFile(self, item, params=None, checkOnly=False): """Subsitute variables in command file inputs :param bool checkOnly: tuble - True to check variable, don't touch files :return: list of undefined variables """ errList = list() self.fileInput = dict() # collect ascii inputs for p in item.GetParams()['params']: if p.get('element', '') == 'file' and \ p.get('prompt', '') == 'input' and \ p.get('age', '') == 'old': filename = p.get('value', p.get('default', '')) if filename and \ mimetypes.guess_type(filename)[0] == 'text/plain': self.fileInput[filename] = None for finput in self.fileInput: # read lines fd = open(finput, "r") try: data = self.fileInput[finput] = fd.read() finally: fd.close() # substitute variables write = False variables = self.GetVariables() for variable in variables: pattern = re.compile('%' + variable) value = '' if params and 'variables' in params: for p in params['variables']['params']: if variable == p.get('name', ''): if p.get('type', 'string') == 'string': value = p.get('value', '') else: value = str(p.get('value', '')) break if not value: value = variables[variable].get('value', '') data = pattern.sub(value, data) if not checkOnly: write = True pattern = re.compile(r'(.*)(%.+\s?)(.*)') sval = pattern.search(data) if sval: var = sval.group(2).strip()[1:] # ignore '%' cmd = item.GetLog(string=False)[0] errList.append(cmd + ": " + _("undefined variable '%s'") % var) if not checkOnly: if write: fd = open(finput, "w") try: fd.write(data) finally: fd.close() else: self.fileInput[finput] = None return errList def OnPrepare(self, item, params): self._substituteFile(item, params, checkOnly=False) def RunAction(self, item, params, log, onDone=None, onPrepare=None, statusbar=None): """Run given action :param item: action item :param params: parameters dict :param log: logging window :param onDone: on-done method :param onPrepare: on-prepare method :param statusbar: wx.StatusBar instance or None """ name = '({0}) {1}'.format(item.GetId(), item.GetLabel()) if name in params: paramsOrig = item.GetParams(dcopy=True) item.MergeParams(params[name]) if statusbar: statusbar.SetStatusText(_('Running model...'), 0) data = {'item': item, 'params': copy.deepcopy(params)} log.RunCmd(command=item.GetLog(string=False, substitute=params), onDone=onDone, onPrepare=self.OnPrepare, userData=data) if name in params: item.SetParams(paramsOrig) def Run(self, log, onDone, parent=None): """Run model :param log: logging window (see gconsole.GConsole) :param onDone: on-done method :param parent: window for messages or None """ if self.GetNumItems() < 1: GMessage(parent=parent, message=_('Model is empty. Nothing to run.')) return statusbar = None if isinstance(parent, wx.Frame): statusbar = parent.GetStatusBar() # validation if statusbar: statusbar.SetStatusText(_('Validating model...'), 0) errList = self.Validate() if statusbar: statusbar.SetStatusText('', 0) if errList: dlg = wx.MessageDialog( parent=parent, message=_( 'Model is not valid. Do you want to ' 'run the model anyway?\n\n%s') % '\n'.join(errList), caption=_("Run model?"), style=wx.YES_NO | wx.NO_DEFAULT | wx.ICON_QUESTION | wx.CENTRE) ret = dlg.ShowModal() dlg.Destroy() if ret != wx.ID_YES: return # parametrization params = self.Parameterize() delInterData = False if params: dlg = ModelParamDialog(parent=parent, model=self, params=params) dlg.CenterOnParent() ret = dlg.ShowModal() if ret != wx.ID_OK: dlg.Destroy() return err = dlg.GetErrors() delInterData = dlg.DeleteIntermediateData() dlg.Destroy() if err: GError(parent=parent, message=unicode('\n'.join(err))) return err = list() for key, item in six.iteritems(params): for p in item['params']: if p.get('value', '') == '': err.append( (key, p.get( 'name', ''), p.get( 'description', ''))) if err: GError( parent=parent, message=_("Variables below not defined:") + "\n\n" + unicode( '\n'.join( map( lambda x: "%s: %s (%s)" % (x[0], x[1], x[2]), err)))) return log.cmdThread.SetId(-1) for item in self.GetItems(): if not item.IsEnabled(): continue if isinstance(item, ModelAction): if item.GetBlockId(): continue self.RunAction(item, params, log) elif isinstance(item, ModelLoop): cond = item.GetLabel() # substitute variables in condition variables = self.GetVariables() for variable in variables: pattern = re.compile('%' + variable) if pattern.search(cond): value = '' if params and 'variables' in params: for p in params['variables']['params']: if variable == p.get('name', ''): value = p.get('value', '') break if not value: value = variables[variable].get('value', '') if not value: continue vtype = variables[variable].get('type', 'string') if vtype == 'string': value = '"' + value + '"' cond = pattern.sub(value, cond) # split condition # TODO: this part needs some better solution condVar, condText = map( lambda x: x.strip(), re.split('\s* in \s*', cond)) pattern = re.compile('%' + condVar) # for vars()[condVar] in eval(condText): ? vlist = list() if condText[0] == '`' and condText[-1] == '`': # run command cmd, dcmd = gtask.cmdlist_to_tuple( condText[1: -1].split(' ')) ret = RunCommand(cmd, read=True, **dcmd) if ret: vlist = ret.splitlines() else: vlist = eval(condText) if 'variables' not in params: params['variables'] = {'params': []} varDict = {'name': condVar, 'value': ''} params['variables']['params'].append(varDict) for var in vlist: for action in item.GetItems(self.GetItems()): if not action.IsEnabled(): continue varDict['value'] = var self.RunAction(item=action, params=params, log=log) params['variables']['params'].remove(varDict) if delInterData: self.DeleteIntermediateData(log) # discard values if params: for item in six.itervalues(params): for p in item['params']: p['value'] = '' def DeleteIntermediateData(self, log): """Detele intermediate data""" rast, vect, rast3d, msg = self.GetIntermediateData() if rast: log.RunCmd(['g.remove', '-f', 'type=raster', 'name=%s' % ','.join(rast)]) if rast3d: log.RunCmd(['g.remove', '-f', 'type=raster_3d', 'name=%s' % ','.join(rast3d)]) if vect: log.RunCmd(['g.remove', '-f', 'type=vector', 'name=%s' % ','.join(vect)]) def GetIntermediateData(self): """Get info about intermediate data""" rast = list() rast3d = list() vect = list() for data in self.GetData(): if not data.IsIntermediate(): continue name = data.GetValue() prompt = data.GetPrompt() if prompt == 'raster': rast.append(name) elif prompt == 'vector': vect.append(name) elif prompt == 'raster_3d': rast3d.append(name) msg = '' if rast: msg += '\n\n%s: ' % _('Raster maps') msg += ', '.join(rast) if rast3d: msg += '\n\n%s: ' % _('3D raster maps') msg += ', '.join(rast3d) if vect: msg += '\n\n%s: ' % _('Vector maps') msg += ', '.join(vect) return rast, vect, rast3d, msg def Update(self): """Update model""" for item in self.items: item.Update() def IsParameterized(self): """Return True if model is parameterized""" if self.Parameterize(): return True return False def Parameterize(self): """Return parameterized options""" result = dict() idx = 0 if self.variables: params = list() result["variables"] = {'flags': list(), 'params': params, 'idx': idx} for name, values in six.iteritems(self.variables): gtype = values.get('type', 'string') if gtype in ('raster', 'vector', 'mapset', 'file', 'region', 'dir'): gisprompt = True prompt = gtype if gtype == 'raster': element = 'cell' else: element = gtype ptype = 'string' else: gisprompt = False prompt = None element = None ptype = gtype params.append({'gisprompt': gisprompt, 'multiple': False, 'description': values.get('description', ''), 'guidependency': '', 'default': '', 'age': None, 'required': True, 'value': values.get('value', ''), 'label': '', 'guisection': '', 'key_desc': '', 'values': list(), 'parameterized': False, 'values_desc': list(), 'prompt': prompt, 'element': element, 'type': ptype, 'name': name}) idx += 1 for action in self.GetItems(objType=ModelAction): if not action.IsEnabled(): continue name = '({0}) {1}'.format(action.GetId(), action.GetLabel()) params = action.GetParams() increment = False for f in params['flags']: if f.get('parameterized', False): if name not in result: increment = True result[name] = {'flags': list(), 'params': list(), 'idx': idx} result[name]['flags'].append(f) for p in params['params']: if p.get('parameterized', False): if name not in result: increment = True result[name] = {'flags': list(), 'params': list(), 'idx': idx} result[name]['params'].append(p) if increment: idx += 1 self.variablesParams = result # record parameters return result class ModelObject(object): def __init__(self, id=-1, label=''): self.id = id # internal id, should be not changed self.label = '' self.rels = list() # list of ModelRelations self.isEnabled = True self.inBlock = list() # list of related loops/conditions def __del__(self): pass def GetLabel(self): """Get label""" return self.label def SetLabel(self, label=''): """Set label""" self.label = label def GetId(self): """Get id""" return self.id def SetId(self, newId): """Set id""" if self.inBlock: for loop in self.inBlock: # update block item loop.UpdateItem(self.id, newId) self.id = newId def AddRelation(self, rel): """Record new relation """ self.rels.append(rel) def GetRelations(self, fdir=None): """Get list of relations :param bool fdir: True for 'from' """ if fdir is None: return self.rels result = list() for rel in self.rels: if fdir == 'from': if rel.GetFrom() == self: result.append(rel) else: if rel.GetTo() == self: result.append(rel) return result def IsEnabled(self): """Get True if action is enabled, otherwise False""" return self.isEnabled def Enable(self, enabled=True): """Enable/disable action""" self.isEnabled = enabled self.Update() def Update(self): pass def SetBlock(self, item): """Add object to the block (loop/condition) :param item: reference to ModelLoop or ModelCondition which defines loops/condition """ if item not in self.inBlock: self.inBlock.append(item) def UnSetBlock(self, item): """Remove object from the block (loop/consition) :param item: reference to ModelLoop or ModelCondition which defines loops/codition """ if item in self.inBlock: self.inBlock.remove(item) def GetBlock(self): """Get list of related ModelObject(s) which defines block (loop/condition) :return: list of ModelObjects """ return self.inBlock def GetBlockId(self): """Get list of related ids which defines block :return: list of ids """ ret = list() for mo in self.inBlock: ret.append(mo.GetId()) return ret class ModelAction(ModelObject, ogl.DividedShape): """Action class (GRASS module)""" def __init__(self, parent, x, y, id=-1, cmd=None, task=None, width=None, height=None, label=None, comment=''): ModelObject.__init__(self, id, label) self.parent = parent self.task = task self.comment = comment if not width: width = UserSettings.Get( group='modeler', key='action', subkey=( 'size', 'width')) if not height: height = UserSettings.Get( group='modeler', key='action', subkey=( 'size', 'height')) if cmd: self.task = GUI(show=None).ParseCommand(cmd=cmd) else: if task: self.task = task else: self.task = None self.propWin = None self.data = list() # list of connected data items self.isValid = False self.isParameterized = False if self.parent.GetCanvas(): ogl.DividedShape.__init__(self, width, height) self.regionLabel = ogl.ShapeRegion() self.regionLabel.SetFormatMode( ogl.FORMAT_CENTRE_HORIZ | ogl.FORMAT_CENTRE_VERT) self.AddRegion(self.regionLabel) self.regionComment = None self.SetCanvas(self.parent) self.SetX(x) self.SetY(y) self._setPen() self._setBrush() self.SetLabel(label) if comment: self.SetComment(comment) self.SetRegionSizes() self.ReformatRegions() if self.task: self.SetValid(self.task.get_options()) def _setBrush(self, running=False): """Set brush""" if running: color = UserSettings.Get(group='modeler', key='action', subkey=('color', 'running')) elif not self.isEnabled: color = UserSettings.Get(group='modeler', key='disabled', subkey='color') elif self.isValid: color = UserSettings.Get(group='modeler', key='action', subkey=('color', 'valid')) else: color = UserSettings.Get(group='modeler', key='action', subkey=('color', 'invalid')) wxColor = wx.Colour(color[0], color[1], color[2]) self.SetBrush(wx.Brush(wxColor)) def _setPen(self): """Set pen""" if self.isParameterized: width = int(UserSettings.Get(group='modeler', key='action', subkey=('width', 'parameterized'))) else: width = int(UserSettings.Get(group='modeler', key='action', subkey=('width', 'default'))) if self.isEnabled: style = wx.SOLID else: style = wx.DOT pen = wx.Pen(wx.BLACK, width, style) self.SetPen(pen) def ReformatRegions(self): rnum = 0 canvas = self.parent.GetCanvas() dc = wx.ClientDC(canvas) # used for measuring for region in self.GetRegions(): text = region.GetText() self.FormatText(dc, text, rnum) rnum += 1 def OnSizingEndDragLeft(self, pt, x, y, keys, attch): ogl.DividedShape.OnSizingEndDragLeft(self, pt, x, y, keys, attch) self.SetRegionSizes() self.ReformatRegions() self.GetCanvas().Refresh() def SetLabel(self, label=None): """Set label :param label: if None use command string instead """ if label: self.label = label elif self.label: label = self.label else: try: label = self.task.get_cmd(ignoreErrors=True)[0] except: label = _("unknown") idx = self.GetId() self.regionLabel.SetText('(%d) %s' % (idx, label)) self.SetRegionSizes() self.ReformatRegions() def SetComment(self, comment): """Set comment""" self.comment = comment if self.regionComment is None: self.regionComment = ogl.ShapeRegion() self.regionComment.SetFormatMode(ogl.FORMAT_CENTRE_HORIZ) font = wx.SystemSettings.GetFont(wx.SYS_DEFAULT_GUI_FONT) font.SetStyle(wx.ITALIC) self.regionComment.SetFont(font) # clear doesn't work # self.regionComment.ClearText() self.regionComment.SetText(comment) self.ClearRegions() self.AddRegion(self.regionLabel) self.regionLabel.SetProportions(0.0, 1.0) if self.comment: self.AddRegion(self.regionComment) self.regionLabel.SetProportions(0.0, 0.4) self.SetRegionSizes() self.ReformatRegions() def GetComment(self): """Get comment""" return self.comment def SetProperties(self, params, propwin): """Record properties dialog""" self.task.params = params['params'] self.task.flags = params['flags'] self.propWin = propwin def GetPropDialog(self): """Get properties dialog""" return self.propWin def GetLog(self, string=True, substitute=None): """Get logging info :param string: True to get cmd as a string otherwise a list :param substitute: dictionary of parameter to substitute or None """ cmd = self.task.get_cmd(ignoreErrors=True, ignoreRequired=True, ignoreDefault=False) # substitute variables if substitute: variables = [] if 'variables' in substitute: for p in substitute['variables']['params']: variables.append(p.get('name', '')) else: variables = self.parent.GetVariables() # order variables by length for variable in sorted(variables, key=len, reverse=True): pattern = re.compile('%' + variable) value = '' if substitute and 'variables' in substitute: for p in substitute['variables']['params']: if variable == p.get('name', ''): if p.get('type', 'string') == 'string': value = p.get('value', '') else: value = str(p.get('value', '')) break if not value: value = variables[variable].get('value', '') if not value: continue for idx in range(len(cmd)): if pattern.search(cmd[idx]): cmd[idx] = pattern.sub(value, cmd[idx]) idx += 1 if string: if cmd is None: return '' else: return ' '.join(cmd) return cmd def GetLabel(self): """Get name""" if self.label: return self.label cmd = self.task.get_cmd(ignoreErrors=True) if cmd and len(cmd) > 0: return cmd[0] return _('unknown') def GetParams(self, dcopy=False): """Get dictionary of parameters""" if dcopy: return copy.deepcopy(self.task.get_options()) return self.task.get_options() def GetTask(self): """Get grassTask instance""" return self.task def SetParams(self, params): """Set dictionary of parameters""" self.task.params = params['params'] self.task.flags = params['flags'] def MergeParams(self, params): """Merge dictionary of parameters""" if 'flags' in params: for f in params['flags']: self.task.set_flag(f['name'], f.get('value', False)) if 'params' in params: for p in params['params']: self.task.set_param(p['name'], p.get('value', '')) def SetValid(self, options): """Set validity for action :param options: dictionary with flags and params (gtask) """ self.isValid = True for f in options['flags']: if f.get('parameterized', False): self.isParameterized = True break for p in options['params']: if self.isValid and p.get('required', False) and \ p.get('value', '') == '' and \ p.get('default', '') == '': self.isValid = False if not self.isParameterized and p.get('parameterized', False): self.isParameterized = True if self.parent.GetCanvas(): self._setBrush() self._setPen() def IsValid(self): """Check validity (all required parameters set)""" return self.isValid def IsParameterized(self): """Check if action is parameterized""" return self.isParameterized def GetParameterizedParams(self): """Return parameterized flags and options""" param = {'flags': [], 'params': []} options = self.GetParams() for f in options['flags']: if f.get('parameterized', False): param['flags'].append(f) for p in options['params']: if p.get('parameterized', False): param['params'].append(p) return param def FindData(self, name): """Find data item by name""" for rel in self.GetRelations(): data = rel.GetData() if name == rel.GetLabel() and name in data.GetLabel(): return data return None def Update(self, running=False): """Update action""" if running: self._setBrush(running=True) else: self._setBrush() self._setPen() def OnDraw(self, dc): """Draw action in canvas""" self._setBrush() self._setPen() ogl.RectangleShape.Recentre(self, dc) # re-center text ogl.RectangleShape.OnDraw(self, dc) class ModelData(ModelObject, ogl.EllipseShape): def __init__(self, parent, x, y, value='', prompt='', width=None, height=None): """Data item class :param parent: window parent :param x, y: position of the shape :param fname, tname: list of parameter names from / to :param value: value :param prompt: type of GIS element :param width, height: dimension of the shape """ ModelObject.__init__(self) self.parent = parent self.value = value self.prompt = prompt self.intermediate = False self.display = False self.propWin = None if not width: width = UserSettings.Get( group='modeler', key='data', subkey=( 'size', 'width')) if not height: height = UserSettings.Get( group='modeler', key='data', subkey=( 'size', 'height')) if self.parent.GetCanvas(): ogl.EllipseShape.__init__(self, width, height) self.SetCanvas(self.parent) self.SetX(x) self.SetY(y) self._setPen() self._setBrush() self.SetLabel() def IsIntermediate(self): """Checks if data item is intermediate""" return self.intermediate def SetIntermediate(self, im): """Set intermediate flag""" self.intermediate = im def HasDisplay(self): """Checks if data item is marked to be displayed""" return self.display def SetHasDisplay(self, tbd): """Set to-be-displayed flag""" self.display = tbd def OnDraw(self, dc): self._setPen() ogl.EllipseShape.OnDraw(self, dc) def GetLog(self, string=True): """Get logging info""" name = list() for rel in self.GetRelations(): name.append(rel.GetLabel()) if name: return '/'.join(name) + '=' + self.value + ' (' + self.prompt + ')' else: return self.value + ' (' + self.prompt + ')' def GetLabel(self): """Get list of names""" name = list() for rel in self.GetRelations(): name.append(rel.GetLabel()) return name def GetPrompt(self): """Get prompt""" return self.prompt def SetPrompt(self, prompt): """Set prompt :param prompt: """ self.prompt = prompt def GetValue(self): """Get value""" return self.value def SetValue(self, value): """Set value :param value: """ self.value = value self.SetLabel() for direction in ('from', 'to'): for rel in self.GetRelations(direction): if direction == 'from': action = rel.GetTo() else: action = rel.GetFrom() task = GUI( show=None).ParseCommand( cmd=action.GetLog( string=False)) task.set_param(rel.GetLabel(), self.value) action.MergeParams(task.get_options()) def GetPropDialog(self): """Get properties dialog""" return self.propWin def SetPropDialog(self, win): """Get properties dialog""" self.propWin = win def _setBrush(self): """Set brush""" if self.prompt == 'raster': color = UserSettings.Get(group='modeler', key='data', subkey=('color', 'raster')) elif self.prompt == 'raster_3d': color = UserSettings.Get(group='modeler', key='data', subkey=('color', 'raster3d')) elif self.prompt == 'vector': color = UserSettings.Get(group='modeler', key='data', subkey=('color', 'vector')) elif self.prompt == 'dbtable': color = UserSettings.Get(group='modeler', key='data', subkey=('color', 'dbtable')) else: color = UserSettings.Get(group='modeler', key='action', subkey=('color', 'invalid')) wxColor = wx.Colour(color[0], color[1], color[2]) self.SetBrush(wx.Brush(wxColor)) def _setPen(self): """Set pen""" isParameterized = False for rel in self.GetRelations('from'): if rel.GetTo().IsParameterized(): isParameterized = True break if not isParameterized: for rel in self.GetRelations('to'): if rel.GetFrom().IsParameterized(): isParameterized = True break if isParameterized: width = int(UserSettings.Get(group='modeler', key='action', subkey=('width', 'parameterized'))) else: width = int(UserSettings.Get(group='modeler', key='action', subkey=('width', 'default'))) if self.intermediate: style = wx.DOT else: style = wx.SOLID pen = wx.Pen(wx.BLACK, width, style) self.SetPen(pen) def SetLabel(self): """Update text""" self.ClearText() name = [] for rel in self.GetRelations(): name.append(rel.GetLabel()) self.AddText('/'.join(name)) if self.value: self.AddText(self.value) else: self.AddText(_('')) def Update(self): """Update action""" self._setBrush() self._setPen() self.SetLabel() def GetDisplayCmd(self): """Get display command as list""" cmd = [] if self.prompt == 'raster': cmd.append('d.rast') elif self.prompt == 'vector': cmd.append('d.vect') else: raise GException("Unsupported display prompt: {}".format( self.prompt)) cmd.append('map=' + self.value) return cmd class ModelRelation(ogl.LineShape): """Data - action relation""" def __init__(self, parent, fromShape, toShape, param=''): self.fromShape = fromShape self.toShape = toShape self.param = param self.parent = parent self._points = None if self.parent.GetCanvas(): ogl.LineShape.__init__(self) def __del__(self): if self in self.fromShape.rels: self.fromShape.rels.remove(self) if self in self.toShape.rels: self.toShape.rels.remove(self) def GetFrom(self): """Get id of 'from' shape""" return self.fromShape def GetTo(self): """Get id of 'to' shape""" return self.toShape def GetData(self): """Get related ModelData instance :return: ModelData instance :return: None if not found """ if isinstance(self.fromShape, ModelData): return self.fromShape elif isinstance(self.toShape, ModelData): return self.toShape return None def GetLabel(self): """Get parameter name""" return self.param def ResetShapes(self): """Reset related objects""" self.fromShape.ResetControlPoints() self.toShape.ResetControlPoints() self.ResetControlPoints() def SetControlPoints(self, points): """Set control points""" self._points = points def GetControlPoints(self): """Get list of control points""" return self._points def _setPen(self): """Set pen""" pen = wx.Pen(wx.BLACK, 1, wx.SOLID) self.SetPen(pen) def OnDraw(self, dc): """Draw relation""" self._setPen() ogl.LineShape.OnDraw(self, dc) def SetName(self, param): self.param = param class ModelItem(ModelObject): def __init__(self, parent, x, y, id=-1, width=None, height=None, label='', items=[]): """Abstract class for loops and conditions""" ModelObject.__init__(self, id, label) self.parent = parent def _setPen(self): """Set pen""" if self.isEnabled: style = wx.SOLID else: style = wx.DOT pen = wx.Pen(wx.BLACK, 1, style) self.SetPen(pen) def SetId(self, id): """Set loop id""" self.id = id def SetLabel(self, label=''): """Set loop text (condition)""" if label: self.label = label self.ClearText() self.AddText('(' + str(self.id) + ') ' + self.label) def GetLog(self): """Get log info""" if self.label: return _("Condition: ") + self.label else: return _("Condition: not defined") def AddRelation(self, rel): """Record relation""" self.rels.append(rel) def Clear(self): """Clear object, remove rels""" self.rels = list() class ModelLoop(ModelItem, ogl.RectangleShape): def __init__(self, parent, x, y, id=-1, idx=-1, width=None, height=None, label='', items=[]): """Defines a loop""" ModelItem.__init__(self, parent, x, y, id, width, height, label, items) self.itemIds = list() # unordered if not width: width = UserSettings.Get( group='modeler', key='loop', subkey=( 'size', 'width')) if not height: height = UserSettings.Get( group='modeler', key='loop', subkey=( 'size', 'height')) if self.parent.GetCanvas(): ogl.RectangleShape.__init__(self, width, height) self.SetCanvas(self.parent) self.SetX(x) self.SetY(y) self._setPen() self._setBrush() self.SetCornerRadius(100) self.SetLabel(label) def _setBrush(self): """Set brush""" if not self.isEnabled: color = UserSettings.Get(group='modeler', key='disabled', subkey='color') else: color = UserSettings.Get(group='modeler', key='loop', subkey=('color', 'valid')) wxColor = wx.Colour(color[0], color[1], color[2]) self.SetBrush(wx.Brush(wxColor)) def Enable(self, enabled=True): """Enable/disable action""" for idx in self.itemIds: item = self.parent.FindAction(idx) if item: item.Enable(enabled) ModelObject.Enable(self, enabled) self.Update() def Update(self): self._setPen() self._setBrush() def GetItems(self, items): """Get sorted items by id""" result = list() for item in items: if item.GetId() in self.itemIds: result.append(item) return result def SetItems(self, items): """Set items (id)""" self.itemIds = items def UpdateItem(self, oldId, newId): """Update item in the list""" idx = self.itemIds.index(oldId) if idx != -1: self.itemIds[idx] = newId def OnDraw(self, dc): """Draw loop in canvas""" self._setBrush() ogl.RectangleShape.Recentre(self, dc) # re-center text ogl.RectangleShape.OnDraw(self, dc) class ModelCondition(ModelItem, ogl.PolygonShape): def __init__(self, parent, x, y, id=-1, width=None, height=None, label='', items={'if': [], 'else': []}): """Defines a if-else condition""" ModelItem.__init__(self, parent, x, y, id, width, height, label, items) self.itemIds = {'if': [], 'else': []} if not width: self.width = UserSettings.Get( group='modeler', key='if-else', subkey=( 'size', 'width')) else: self.width = width if not height: self.height = UserSettings.Get( group='modeler', key='if-else', subkey=( 'size', 'height')) else: self.height = height if self.parent.GetCanvas(): ogl.PolygonShape.__init__(self) points = [(0, - self.height / 2), (self.width / 2, 0), (0, self.height / 2), (- self.width / 2, 0)] self.Create(points) self.SetCanvas(self.parent) self.SetX(x) self.SetY(y) self.SetPen(wx.BLACK_PEN) if label: self.AddText('(' + str(self.id) + ') ' + label) else: self.AddText('(' + str(self.id) + ')') def GetLabel(self): """Get name""" return _("if-else") def GetWidth(self): """Get object width""" return self.width def GetHeight(self): """Get object height""" return self.height def GetItems(self, items): """Get sorted items by id""" result = {'if': [], 'else': []} for item in items: if item.GetId() in self.itemIds['if']: result['if'].append(item) elif item.GetId() in self.itemIds['else']: result['else'].append(item) return result def SetItems(self, items, branch='if'): """Set items (id) :param items: list of items :param branch: 'if' / 'else' """ if branch in ['if', 'else']: self.itemIds[branch] = items class ModelComment(ModelObject, ogl.RectangleShape): def __init__(self, parent, x, y, id=-1, width=None, height=None, label=''): """Defines a model comment""" ModelObject.__init__(self, id, label) if not width: width = UserSettings.Get( group='modeler', key='comment', subkey=( 'size', 'width')) if not height: height = UserSettings.Get( group='modeler', key='comment', subkey=( 'size', 'height')) if parent.GetCanvas(): ogl.RectangleShape.__init__(self, width, height) self.SetCanvas(parent) self.SetX(x) self.SetY(y) font = wx.SystemSettings.GetFont(wx.SYS_DEFAULT_GUI_FONT) font.SetStyle(wx.ITALIC) self.SetFont(font) self._setPen() self._setBrush() self.SetLabel(label) def _setBrush(self, running=False): """Set brush""" color = UserSettings.Get(group='modeler', key='comment', subkey='color') wxColor = wx.Colour(color[0], color[1], color[2]) self.SetBrush(wx.Brush(wxColor)) def _setPen(self): """Set pen""" pen = wx.Pen(wx.BLACK, 1, wx.DOT) self.SetPen(pen) def SetLabel(self, label=None): """Set label :param label: if None use command string instead """ if label: self.label = label elif self.label: label = self.label else: label = '' idx = self.GetId() self.ClearText() self.AddText('(%d) %s' % (idx, label)) def GetComment(self): return self.GetLabel() def SetComment(self, comment): self.SetLabel(comment) self.GetCanvas().Refresh() class ProcessModelFile: """Process GRASS model file (gxm)""" def __init__(self, tree): """A ElementTree handler for the GXM XML file, as defined in grass-gxm.dtd. """ self.tree = tree self.root = self.tree.getroot() # check if input is a valid GXM file if self.root is None or self.root.tag != 'gxm': if self.root is not None: tagName = self.root.tag else: tabName = _("empty") raise GException( _("Details: unsupported tag name '{0}'.").format(tagName)) # list of actions, data self.properties = dict() self.variables = dict() self.actions = list() self.data = list() self.loops = list() self.conditions = list() self.comments = list() self._processWindow() self._processProperties() self._processVariables() self._processItems() self._processData() def _filterValue(self, value): """Filter value :param value: """ value = value.replace('<', '<') value = value.replace('>', '>') return value def _getNodeText(self, node, tag, default=''): """Get node text""" p = node.find(tag) if p is not None: if p.text: return utils.normalize_whitespace(p.text) else: return '' return default def _processWindow(self): """Process window properties""" node = self.root.find('window') if node is None: self.pos = self.size = None return self.pos, self.size = self._getDim(node) def _processProperties(self): """Process model properties""" node = self.root.find('properties') if node is None: return for key in ('name', 'description', 'author'): self._processProperty(node, key) for f in node.findall('flag'): name = f.get('name', '') if name == 'overwrite': self.properties['overwrite'] = True def _processProperty(self, pnode, name): """Process given property""" node = pnode.find(name) if node is not None: self.properties[name] = node.text else: self.properties[name] = '' def _processVariables(self): """Process model variables""" vnode = self.root.find('variables') if vnode is None: return for node in vnode.findall('variable'): name = node.get('name', '') if not name: continue # should not happen self.variables[name] = {'type': node.get('type', 'string')} for key in ('description', 'value'): self._processVariable(node, name, key) def _processVariable(self, pnode, name, key): """Process given variable""" node = pnode.find(key) if node is not None: if node.text: self.variables[name][key] = node.text def _processItems(self): """Process model items (actions, loops, conditions)""" self._processActions() self._processLoops() self._processConditions() self._processComments() def _processActions(self): """Process model file""" for action in self.root.findall('action'): pos, size = self._getDim(action) disabled = False task = action.find('task') if task is not None: if task.find('disabled') is not None: disabled = True task = self._processTask(task) else: task = None aId = int(action.get('id', -1)) label = action.get('name') comment = action.find('comment') if comment is not None: commentString = comment.text else: commentString = '' self.actions.append({'pos': pos, 'size': size, 'task': task, 'id': aId, 'disabled': disabled, 'label': label, 'comment': commentString}) def _getDim(self, node): """Get position and size of shape""" pos = size = None posAttr = node.get('pos', None) if posAttr: posVal = list(map(int, posAttr.split(','))) try: pos = (posVal[0], posVal[1]) except: pos = None sizeAttr = node.get('size', None) if sizeAttr: sizeVal = list(map(int, sizeAttr.split(','))) try: size = (sizeVal[0], sizeVal[1]) except: size = None return pos, size def _processData(self): """Process model file""" for data in self.root.findall('data'): pos, size = self._getDim(data) param = data.find('data-parameter') prompt = value = None if param is not None: prompt = param.get('prompt', None) value = self._filterValue(self._getNodeText(param, 'value')) intermediate = False if data.find('intermediate') is None else True display = False if data.find('display') is None else True rels = list() for rel in data.findall('relation'): defrel = {'id': int(rel.get('id', -1)), 'dir': rel.get('dir', 'to'), 'name': rel.get('name', '')} points = list() for point in rel.findall('point'): x = self._filterValue(self._getNodeText(point, 'x')) y = self._filterValue(self._getNodeText(point, 'y')) points.append((float(x), float(y))) defrel['points'] = points rels.append(defrel) self.data.append({'pos': pos, 'size': size, 'prompt': prompt, 'value': value, 'intermediate': intermediate, 'display': display, 'rels': rels}) def _processTask(self, node): """Process task :return: grassTask instance :return: None on error """ cmd = list() parameterized = list() name = node.get('name', None) if not name: return None cmd.append(name) # flags for f in node.findall('flag'): flag = f.get('name', '') if f.get('parameterized', '0') == '1': parameterized.append(('flag', flag)) if f.get('value', '1') == '0': continue if len(flag) > 1: cmd.append('--' + flag) else: cmd.append('-' + flag) # parameters for p in node.findall('parameter'): name = p.get('name', '') if p.find('parameterized') is not None: parameterized.append(('param', name)) cmd.append( '%s=%s' % (name, self._filterValue( self._getNodeText( p, 'value')))) task, err = GUI(show=None, checkError=True).ParseCommand(cmd=cmd) if err: GWarning(os.linesep.join(err)) for opt, name in parameterized: if opt == 'flag': task.set_flag(name, True, element='parameterized') else: task.set_param(name, True, element='parameterized') return task def _processLoops(self): """Process model loops""" for node in self.root.findall('loop'): pos, size = self._getDim(node) text = self._filterValue( self._getNodeText( node, 'condition')).strip() aid = list() for anode in node.findall('item'): try: aid.append(int(anode.text)) except ValueError: pass self.loops.append({'pos': pos, 'size': size, 'text': text, 'id': int(node.get('id', -1)), 'items': aid}) def _processConditions(self): """Process model conditions""" for node in self.root.findall('if-else'): pos, size = self._getDim(node) text = self._filterValue( self._getNodeText( node, 'condition')).strip() aid = {'if': list(), 'else': list()} for b in aid.keys(): bnode = node.find(b) if bnode is None: continue for anode in bnode.findall('item'): try: aid[b].append(int(anode.text)) except ValueError: pass self.conditions.append({'pos': pos, 'size': size, 'text': text, 'id': int(node.get('id', -1)), 'items': aid}) def _processComments(self): """Process model comments""" for node in self.root.findall('comment'): pos, size = self._getDim(node) text = self._filterValue(node.text) self.comments.append({'pos': pos, 'size': size, 'text': text, 'id': int(node.get('id', -1)), 'text': text}) class WriteModelFile: """Generic class for writing model file""" def __init__(self, fd, model): self.fd = fd self.model = model self.properties = model.GetProperties() self.variables = model.GetVariables() self.items = model.GetItems() self.indent = 0 self._header() self._window() self._properties() self._variables() self._items() dataList = list() for action in model.GetItems(objType=ModelAction): for rel in action.GetRelations(): dataItem = rel.GetData() if dataItem not in dataList: dataList.append(dataItem) self._data(dataList) self._footer() def _filterValue(self, value): """Escapes value to be stored in XML. :param value: string to be escaped as XML :return: a XML-valid string """ value = saxutils.escape(value) return value def _header(self): """Write header""" self.fd.write( '\n' % GetDefaultEncoding( forceUTF8=True)) self.fd.write('\n') self.fd.write('%s\n' % (' ' * self.indent)) self.indent += 4 def _footer(self): """Write footer""" self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) def _window(self): """Write window properties""" canvas = self.model.GetCanvas() if canvas is None: return win = canvas.parent pos = win.GetPosition() size = win.GetSize() self.fd.write('%s\n' % (' ' * self.indent, pos[0], pos[1], size[0], size[1])) def _properties(self): """Write model properties""" self.fd.write('%s\n' % (' ' * self.indent)) self.indent += 4 if self.properties['name']: self.fd.write( '%s%s\n' % (' ' * self.indent, self.properties['name'])) if self.properties['description']: self.fd.write( '%s%s\n' % (' ' * self.indent, self.properties['description'])) if self.properties['author']: self.fd.write( '%s%s\n' % (' ' * self.indent, self.properties['author'])) if 'overwrite' in self.properties and \ self.properties['overwrite']: self.fd.write( '%s\n' % (' ' * self.indent)) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) def _variables(self): """Write model variables""" if not self.variables: return self.fd.write('%s\n' % (' ' * self.indent)) self.indent += 4 for name, values in six.iteritems(self.variables): self.fd.write( '%s\n' % (' ' * self.indent, name, values['type'])) self.indent += 4 if 'value' in values: self.fd.write('%s%s\n' % (' ' * self.indent, values['value'])) if 'description' in values: self.fd.write( '%s%s\n' % (' ' * self.indent, values['description'])) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) def _items(self): """Write actions/loops/conditions""" for item in self.items: if isinstance(item, ModelAction): self._action(item) elif isinstance(item, ModelLoop): self._loop(item) elif isinstance(item, ModelCondition): self._condition(item) elif isinstance(item, ModelComment): self._comment(item) def _action(self, action): """Write actions""" self.fd.write( '%s\n' % (' ' * self.indent, action.GetId(), action.GetLabel(), action.GetX(), action.GetY(), action.GetWidth(), action.GetHeight())) self.indent += 4 comment = action.GetComment() if comment: self.fd.write( '%s%s\n' % (' ' * self.indent, comment)) self.fd.write('%s\n' % (' ' * self.indent, action.GetLog(string=False)[0])) self.indent += 4 if not action.IsEnabled(): self.fd.write('%s\n' % (' ' * self.indent)) for key, val in six.iteritems(action.GetParams()): if key == 'flags': for f in val: if f.get('value', False) or f.get('parameterized', False): if f.get('parameterized', False): if f.get('value', False) == False: self.fd.write( '%s\n' % (' ' * self.indent, f.get( 'name', ''))) else: self.fd.write( '%s\n' % (' ' * self.indent, f.get( 'name', ''))) else: self.fd.write( '%s\n' % (' ' * self.indent, f.get('name', ''))) else: # parameter for p in val: if not p.get( 'value', '') and not p.get( 'parameterized', False): continue self.fd.write('%s\n' % (' ' * self.indent, p.get('name', ''))) self.indent += 4 if p.get('parameterized', False): self.fd.write( '%s\n' % (' ' * self.indent)) self.fd.write( '%s%s\n' % (' ' * self.indent, self._filterValue( p.get( 'value', '')))) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) def _data(self, dataList): """Write data""" for data in dataList: self.fd.write('%s\n' % (' ' * self.indent, data.GetX(), data.GetY(), data.GetWidth(), data.GetHeight())) self.indent += 4 self.fd.write('%s\n' % (' ' * self.indent, data.GetPrompt())) self.indent += 4 self.fd.write( '%s%s\n' % (' ' * self.indent, self._filterValue( data.GetValue()))) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) if data.IsIntermediate(): self.fd.write('%s\n' % (' ' * self.indent)) if data.HasDisplay(): self.fd.write('%s\n' % (' ' * self.indent)) # relations for ft in ('from', 'to'): for rel in data.GetRelations(ft): if ft == 'from': aid = rel.GetTo().GetId() else: aid = rel.GetFrom().GetId() self.fd.write('%s\n' % (' ' * self.indent, ft, aid, rel.GetLabel())) self.indent += 4 for point in rel.GetLineControlPoints()[1:-1]: self.fd.write('%s\n' % (' ' * self.indent)) self.indent += 4 x, y = point.Get() self.fd.write( '%s%d\n' % (' ' * self.indent, int(x))) self.fd.write( '%s%d\n' % (' ' * self.indent, int(y))) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) def _loop(self, loop): """Write loops""" self.fd.write( '%s\n' % (' ' * self.indent, loop.GetId(), loop.GetX(), loop.GetY(), loop.GetWidth(), loop.GetHeight())) self.indent += 4 cond = loop.GetLabel() if cond: self.fd.write('%s%s\n' % (' ' * self.indent, self._filterValue(cond))) for item in loop.GetItems(self.model.GetItems(objType=ModelAction)): self.fd.write('%s%d\n' % (' ' * self.indent, item.GetId())) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) def _condition(self, condition): """Write conditions""" bbox = condition.GetBoundingBoxMin() self.fd.write( '%s\n' % (' ' * self.indent, condition.GetId(), condition.GetX(), condition.GetY(), bbox[0], bbox[1])) text = condition.GetLabel() self.indent += 4 if text: self.fd.write('%s%s\n' % (' ' * self.indent, self._filterValue(text))) items = condition.GetItems() for b in items.keys(): if len(items[b]) < 1: continue self.fd.write('%s<%s>\n' % (' ' * self.indent, b)) self.indent += 4 for item in items[b]: self.fd.write('%s%d\n' % (' ' * self.indent, item.GetId())) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent, b)) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) def _comment(self, comment): """Write comment""" self.fd.write( '%s%s\n' % (' ' * self.indent, comment.GetId(), comment.GetX(), comment.GetY(), comment.GetWidth(), comment.GetHeight(), comment.GetLabel())) class WritePythonFile: def __init__(self, fd, model): """Class for exporting model to Python script :param fd: file descriptor """ self.fd = fd self.model = model self.indent = 4 self._writePython() def _getStandardizedOption(self, string): if string == 'raster': return 'G_OPT_R_MAP' elif string == 'vector': return 'G_OPT_V_MAP' elif string == 'mapset': return 'G_OPT_M_MAPSET' elif string == 'file': return 'G_OPT_F_INPUT' elif string == 'dir': return 'G_OPT_M_DIR' elif string == 'region': return 'G_OPT_M_REGION' return '' def _writePython(self): """Write model to file""" properties = self.model.GetProperties() # header self.fd.write( r"""#!/usr/bin/env python3 # #{header_begin} # # MODULE: {module_name} # # AUTHOR(S): {author} # # PURPOSE: {purpose} # # DATE: {date} # #{header_end} """.format(header_begin='#' * 77, module_name=properties['name'], author=properties['author'], purpose='\n# '.join(properties['description'].splitlines()), date=time.asctime(), header_end='#' * 77)) # UI self.fd.write( r""" #%module #% description: {description} #%end """.format(description=' '.join(properties['description'].splitlines()))) modelItems = self.model.GetItems() for item in modelItems: for flag in item.GetParameterizedParams()['flags']: if flag['label']: desc = flag['label'] else: desc = flag['description'] self.fd.write( r"""#%option #% key: {flag_name} #% description: {description} #% required: yes #% type: string #% options: True, False #% guisection: Flags """.format(flag_name=self._getParamName(flag['name'], item), description=desc)) if flag['value']: self.fd.write("#% answer: {}\n".format(flag['value'])) else: self.fd.write("#% answer: False\n") self.fd.write("#%end\n") for param in item.GetParameterizedParams()['params']: if param['label']: desc = param['label'] else: desc = param['description'] self.fd.write( r"""#%option #% key: {param_name} #% description: {description} #% required: yes """.format(param_name=self._getParamName(param['name'], item), description=desc)) if param['type'] != 'float': self.fd.write('#% type: {}\n'.format(param['type'])) else: self.fd.write('#% type: double\n') if param['key_desc']: self.fd.write("#% key_desc: ") self.fd.write(', '.join(param['key_desc'])) self.fd.write("\n") if param['value']: self.fd.write("#% answer: {}\n".format(param['value'])) self.fd.write("#%end\n") # import modules self.fd.write( r""" import sys import os import atexit from grass.script import parser, run_command """) # cleanup() rast, vect, rast3d, msg = self.model.GetIntermediateData() self.fd.write( r""" def cleanup(): """) if rast: self.fd.write( r""" run_command('g.remove', flags='f', type='raster', name=%s) """ % ','.join(map(lambda x: "'" + x + "'", rast))) if vect: self.fd.write( r""" run_command('g.remove', flags='f', type='vector', name=%s) """ % ','.join(map(lambda x: "'" + x + "'", vect))) if rast3d: self.fd.write( r""" run_command('g.remove', flags='f', type='raster_3d', name=%s) """ % ','.join(map(lambda x: "'" + x + "'", rast3d))) if not rast and not vect and not rast3d: self.fd.write(' pass\n') self.fd.write("\ndef main(options, flags):\n") for item in self.model.GetItems(): self._writePythonItem(item, variables=item.GetParameterizedParams()) self.fd.write(" return 0\n") for item in modelItems: if item.GetParameterizedParams()['flags']: self.fd.write(r""" def getParameterizedFlags(paramFlags, itemFlags): fl = '' """) self.fd.write(""" for i in [key for key, value in paramFlags.iteritems() if value == 'True']: if i in itemFlags: fl += i[-1] return fl """) break self.fd.write( r""" if __name__ == "__main__": options, flags = parser() atexit.register(cleanup) """) if properties.get('overwrite'): self.fd.write(' os.environ["GRASS_OVERWRITE"] = "1"\n') self.fd.write(' sys.exit(main())\n') def _writePythonItem(self, item, ignoreBlock=True, variables={}): """Write model object to Python file""" if isinstance(item, ModelAction): if ignoreBlock and item.GetBlockId(): # ignore items in loops of conditions return self._writePythonAction(item, variables=variables) elif isinstance(item, ModelLoop) or isinstance(item, ModelCondition): # substitute condition cond = item.GetLabel() for variable in self.model.GetVariables(): pattern = re.compile('%' + variable) if pattern.search(cond): value = variables[variable].get('value', '') if variables[variable].get('type', 'string') == 'string': value = '"' + value + '"' cond = pattern.sub(value, cond) if isinstance(item, ModelLoop): condVar, condText = map( lambda x: x.strip(), re.split('\s* in \s*', cond)) cond = "%sfor %s in " % (' ' * self.indent, condVar) if condText[0] == '`' and condText[-1] == '`': task = GUI( show=None).ParseCommand( cmd=utils.split( condText[ 1:- 1])) cond += "grass.read_command(" cond += self._getPythonActionCmd( task, len(cond), variables=[condVar]) + ".splitlines()" else: cond += condText self.fd.write('%s:\n' % cond) self.indent += 4 variablesLoop = variables.copy() variablesLoop[condVar] = None for action in item.GetItems( self.model.GetItems(objType=ModelAction)): self._writePythonItem( action, ignoreBlock=False, variables=variablesLoop) self.indent -= 4 if isinstance(item, ModelCondition): self.fd.write('%sif %s:\n' % (' ' * self.indent, cond)) self.indent += 4 condItems = item.GetItems() for action in condItems['if']: self._writePythonItem(action, ignoreBlock=False) if condItems['else']: self.indent -= 4 self.fd.write('%selse:\n' % (' ' * self.indent)) self.indent += 4 for action in condItems['else']: self._writePythonItem(action, ignoreBlock=False) self.indent += 4 self.fd.write('\n') if isinstance(item, ModelComment): self._writePythonComment(item) def _writePythonAction(self, item, variables={}): """Write model action to Python file""" task = GUI(show=None).ParseCommand(cmd=item.GetLog(string=False)) strcmd = "%srun_command(" % (' ' * self.indent) self.fd.write( strcmd + self._getPythonActionCmd( item, task, len(strcmd), variables) + '\n') def _getPythonActionCmd(self, item, task, cmdIndent, variables={}): opts = task.get_options() ret = '' flags = '' params = list() itemParameterizedFlags = list() parameterizedParams = [v['name'] for v in variables['params']] parameterizedFlags = [v['name'] for v in variables['flags']] for f in opts['flags']: if f.get('name') in parameterizedFlags and len(f.get('name')) == 1: itemParameterizedFlags.append( '"{}"'.format(self._getParamName(f.get('name'), item))) if f.get('value', False): name = f.get('name', '') if len(name) > 1: params.append('%s = True' % name) else: flags += name itemParameterizedFlags = ', '.join(itemParameterizedFlags) for p in opts['params']: name = p.get('name', None) value = p.get('value', None) if (name and value) or (name in parameterizedParams): ptype = p.get('type', 'string') foundVar = False if name in parameterizedParams: foundVar = True value = 'options["{}"]'.format(self._getParamName(name, item)) if foundVar or ptype != 'string': params.append("{}={}".format(name, value)) else: params.append('{}="{}"'.format(name, value)) ret += '"%s"' % task.get_name() if flags: ret += ",\n{indent}flags='{fl}'".format(indent=' ' * cmdIndent, fl=flags) if itemParameterizedFlags: ret += ' + getParameterizedFlags(options, [{}])'.format( itemParameterizedFlags) elif itemParameterizedFlags: ret += ',\n{}flags=getParameterizedFlags(options, [{}])'.format( ' ' * cmdIndent, itemParameterizedFlags) if len(params) > 0: ret += ",\n" for opt in params[:-1]: ret += "%s%s,\n" % (' ' * cmdIndent, opt) ret += "%s%s)" % (' ' * cmdIndent, params[-1]) else: ret += ")" return ret def _writePythonComment(self, item): """Write model comment to Python file""" for line in item.GetLabel().splitlines(): self.fd.write('#' + line + '\n') def _substituteVariable(self, string, variable, data): """Substitute variable in the string :param string: string to be modified :param variable: variable to be substituted :param data: data related to the variable :return: modified string """ result = '' ss = re.split("\w*(%" + variable + ")w*", string) if not ss[0] and not ss[-1]: if data: return "options['%s']" % variable else: return variable for s in ss: if not s or s == '"': continue if s == '%' + variable: if data: result += "+options['%s']+" % variable else: result += '+%s+' % variable else: result += '"' + s if not s.endswith(']'): # options result += '"' return result.strip('+') def _getParamName(self, parameter_name, item): return '{module_name}{module_id}_{param_name}'.format( module_name=re.sub('[^a-zA-Z]+', '', item.GetLabel()), module_id=item.GetId(), param_name=parameter_name) class ModelParamDialog(wx.Dialog): def __init__( self, parent, model, params, id=wx.ID_ANY, title=_("Model parameters"), style=wx.DEFAULT_DIALOG_STYLE | wx.RESIZE_BORDER, **kwargs): """Model parameters dialog """ self.parent = parent self._model = model self.params = params self.tasks = list() # list of tasks/pages wx.Dialog.__init__( self, parent=parent, id=id, title=title, style=style, **kwargs) self.notebook = GNotebook(parent=self, style=globalvar.FNPageDStyle) panel = self._createPages() wx.CallAfter(self.notebook.SetSelection, 0) # intermediate data? self.interData = wx.CheckBox(parent=self, label=_( "Delete intermediate data when finish")) self.interData.SetValue(True) rast, vect, rast3d, msg = self._model.GetIntermediateData() if not rast and not vect and not rast3d: self.interData.Hide() self.btnCancel = Button(parent=self, id=wx.ID_CANCEL) self.btnRun = Button(parent=self, id=wx.ID_OK, label=_("&Run")) self.btnRun.SetDefault() self._layout() size = self.GetBestSize() self.SetMinSize(size) self.SetSize((size.width, size.height + panel.constrained_size[1] - panel.panelMinHeight)) def _layout(self): btnSizer = wx.StdDialogButtonSizer() btnSizer.AddButton(self.btnCancel) btnSizer.AddButton(self.btnRun) btnSizer.Realize() mainSizer = wx.BoxSizer(wx.VERTICAL) mainSizer.Add(self.notebook, proportion=1, flag=wx.EXPAND) if self.interData.IsShown(): mainSizer.Add(self.interData, proportion=0, flag=wx.EXPAND | wx.ALL | wx.ALIGN_CENTER, border=5) mainSizer.Add(wx.StaticLine(parent=self, id=wx.ID_ANY, style=wx.LI_HORIZONTAL), proportion=0, flag=wx.EXPAND | wx.LEFT | wx.RIGHT, border=5) mainSizer.Add(btnSizer, proportion=0, flag=wx.EXPAND | wx.ALL | wx.ALIGN_CENTER, border=5) self.SetSizer(mainSizer) mainSizer.Fit(self) def _createPages(self): """Create for each parameterized module its own page""" nameOrdered = [''] * len(self.params.keys()) for name, params in six.iteritems(self.params): nameOrdered[params['idx']] = name for name in nameOrdered: params = self.params[name] panel = self._createPage(name, params) if name == 'variables': name = _('Variables') self.notebook.AddPage(page=panel, text=name) return panel def _createPage(self, name, params): """Define notebook page""" if name in globalvar.grassCmd: task = gtask.grassTask(name) else: task = gtask.grassTask() task.flags = params['flags'] task.params = params['params'] panel = CmdPanel(parent=self, id=wx.ID_ANY, task=task, giface=GraphicalModelerGrassInterface(self._model)) self.tasks.append(task) return panel def GetErrors(self): """Check for errors, get list of messages""" errList = list() for task in self.tasks: errList += task.get_cmd_error() return errList def DeleteIntermediateData(self): """Check if to detele intermediate data""" if self.interData.IsShown() and self.interData.IsChecked(): return True return False