""" @package gmodeler.model @brief wxGUI Graphical Modeler (base classes & read/write) Classes: - model::Model - model::ModelObject - model::ModelAction - model::ModelData - model::ModelRelation - model::ModelItem - model::ModelLoop - model::ModelCondition - model::ModelComment - model::ProcessModelFile - model::WriteModelFile - model::WritePythonFile - model::ModelParamDialog (C) 2010-2016 by the GRASS Development Team This program is free software under the GNU General Public License (>=v2). Read the file COPYING that comes with GRASS for details. @author Martin Landa """ import os import getpass import copy import re import mimetypes import time try: import xml.etree.ElementTree as etree except ImportError: import elementtree.ElementTree as etree # Python <= 2.4 import xml.sax.saxutils as saxutils import wx from wx.lib import ogl from core import globalvar from core import utils from core.utils import _ from core.gcmd import GMessage, GException, GError, RunCommand, EncodeString, GWarning, GetDefaultEncoding from core.settings import UserSettings from gui_core.forms import GUI, CmdPanel from gui_core.widgets import GNotebook from gmodeler.giface import GraphicalModelerGrassInterface from grass.script import core as grass from grass.script import task as gtask class Model(object): """Class representing the model""" def __init__(self, canvas=None): self.items = list() # list of ordered items (action/loop/condition) # model properties self.properties = { 'name': _("model"), 'description': _("Script generated by wxGUI Graphical Modeler."), 'author': getpass.getuser()} # model variables self.variables = dict() self.variablesParams = dict() self.canvas = canvas def GetCanvas(self): """Get canvas or None""" return self.canvas def GetItems(self, objType=None): """Get list of model items :param objType: Object type to filter model objects """ if not objType: return self.items result = list() for item in self.items: if isinstance(item, objType): result.append(item) return result def GetItem(self, aId, objType=None): """Get item of given id :param aId: item id :return: Model* instance :return: None if no item found """ ilist = self.GetItems(objType) for item in ilist: if item.GetId() == aId: return item return None def GetItemIndex(self, item): """Return list index of given item""" return self.items.index(item) def GetNumItems(self, actionOnly=False): """Get number of items""" if actionOnly: return len(self.GetItems(objType=ModelAction)) return len(self.GetItems()) def ReorderItems(self, idxList): items = list() for oldIdx, newIdx in idxList.iteritems(): item = self.items.pop(oldIdx) items.append(item) self.items.insert(newIdx, item) # try: # nextItem = self.items[newIdx+1] # except IndexError: # continue # newIdx is the last item in the list # items.append(nextItem) # x = item.GetX() # y = item.GetY() # item.SetX(nextItem.GetX()) # item.SetY(nextItem.GetY()) # nextItem.SetX(x) # nextItem.SetY(y) dc = wx.ClientDC(self.canvas) for item in items: item.MoveLinks(dc) for mo in item.GetBlock(): if isinstance(mo, ModelLoop): self.canvas.parent.DefineLoop(mo) elif isinstance(mo, ModelCondition): self.canvas.parent.DefineCondition(mo) def Normalize(self): # check for inconsistecies for idx in range(1, len(self.items)): if not self.items[idx].GetBlock() and \ isinstance(self.items[idx - 1], ModelLoop): # swap action not-in-block with previously defined # loop itemPrev = self.items[idx - 1] self.items[idx - 1] = self.items[idx] self.items[idx] = itemPrev # update ids iId = 1 for item in self.items: item.SetId(iId) item.SetLabel() iId += 1 def GetNextId(self): """Get next id (data ignored) :return: next id to be used (default: 1) """ if len(self.items) < 1: return 1 currId = self.items[-1].GetId() if currId > 0: return currId + 1 return 1 def GetProperties(self): """Get model properties""" return self.properties def GetVariables(self, params=False): """Get model variables""" if params: return self.variablesParams return self.variables def SetVariables(self, data): """Set model variables""" self.variables = data def Reset(self): """Reset model""" self.items = list() def RemoveItem(self, item, reference=None): """Remove item from model :item: item to be removed :reference: reference item valid for deletion :return: list of related items to remove/update """ relList = list() upList = list() doRemove = True if isinstance(item, ModelAction): for rel in item.GetRelations(): relList.append(rel) data = rel.GetData() if len(data.GetRelations()) < 2: relList.append(data) else: upList.append(data) elif isinstance(item, ModelData): for rel in item.GetRelations(): otherItem = rel.GetTo() if rel.GetFrom() == item else rel.GetFrom() if reference and reference != otherItem: doRemove = False continue # avoid recursive deletion relList.append(rel) if reference and reference != otherItem: relList.append(otherItem) if not doRemove: upList.append(item) elif isinstance(item, ModelLoop): for rel in item.GetRelations(): relList.append(rel) for action in self.GetItems(): action.UnSetBlock(item) if doRemove and item in self.items: self.items.remove(item) return relList, upList def FindAction(self, aId): """Find action by id""" alist = self.GetItems(objType=ModelAction) for action in alist: if action.GetId() == aId: return action return None def GetMaps(self, prompt): """Get list of maps of selected type :param prompt: to filter maps""" maps = list() for data in self.GetData(): if prompt == data.GetPrompt(): mapName = data.GetValue() if not mapName or mapName[0] is '%': continue # skip variables maps.append(mapName) return maps def GetData(self): """Get list of data items""" result = list() dataItems = self.GetItems(objType=ModelData) for action in self.GetItems(objType=ModelAction): for rel in action.GetRelations(): dataItem = rel.GetData() if dataItem not in result: result.append(dataItem) if dataItem in dataItems: dataItems.remove(dataItem) # standalone data if dataItems: result += dataItems return result def FindData(self, value, prompt): """Find data item in the model :param value: value :param prompt: prompt :return: ModelData instance :return: None if not found """ for data in self.GetData(): if data.GetValue() == value and \ data.GetPrompt() == prompt: return data return None def LoadModel(self, filename): """Load model definition stored in GRASS Model XML file (gxm) .. todo:: Validate against DTD Raise exception on error. """ dtdFilename = os.path.join(globalvar.WXGUIDIR, "xml", "grass-gxm.dtd") # parse workspace file try: gxmXml = ProcessModelFile(etree.parse(filename)) except Exception as e: raise GException(unicode(e)) if self.canvas: win = self.canvas.parent if gxmXml.pos: win.SetPosition(gxmXml.pos) if gxmXml.size: win.SetSize(gxmXml.size) # load properties self.properties = gxmXml.properties self.variables = gxmXml.variables # load actions for action in gxmXml.actions: actionItem = ModelAction(parent=self, x=action['pos'][0], y=action['pos'][1], width=action['size'][0], height=action['size'][1], task=action['task'], id=action['id'], label=action['label'], comment=action['comment']) if action['disabled']: actionItem.Enable(False) self.AddItem(actionItem, pos=actionItem.GetId() - 1) actionItem.SetValid(actionItem.GetTask().get_options()) actionItem.GetLog() # substitute variables (-> valid/invalid) # load data & relations for data in gxmXml.data: dataItem = ModelData(parent=self, x=data['pos'][0], y=data['pos'][1], width=data['size'][0], height=data['size'][1], prompt=data['prompt'], value=data['value']) dataItem.SetIntermediate(data['intermediate']) for rel in data['rels']: actionItem = self.FindAction(rel['id']) if rel['dir'] == 'from': relation = ModelRelation( parent=self, fromShape=dataItem, toShape=actionItem, param=rel['name']) else: relation = ModelRelation( parent=self, fromShape=actionItem, toShape=dataItem, param=rel['name']) relation.SetControlPoints(rel['points']) actionItem.AddRelation(relation) dataItem.AddRelation(relation) if self.canvas: dataItem.Update() # load loops for loop in gxmXml.loops: loopItem = ModelLoop(parent=self, x=loop['pos'][0], y=loop['pos'][1], width=loop['size'][0], height=loop['size'][1], label=loop['text'], id=loop['id']) self.AddItem(loopItem, pos=loopItem.GetId() - 1) # load conditions for condition in gxmXml.conditions: conditionItem = ModelCondition(parent=self, x=condition['pos'][0], y=condition['pos'][1], width=condition['size'][0], height=condition['size'][1], label=condition['text'], id=condition['id']) self.AddItem(conditionItem, pos=conditionItem.GetId() - 1) # define loops & if/else items for loop in gxmXml.loops: loopItem = self.GetItem(loop['id'], objType=ModelLoop) loopItem.SetItems(loop['items']) for idx in loop['items']: action = self.GetItem(idx, objType=ModelAction) action.SetBlock(loopItem) for condition in gxmXml.conditions: conditionItem = self.GetItem(condition['id']) for b in condition['items'].keys(): alist = list() for aId in condition['items'][b]: action = self.GetItem(aId) alist.append(action) conditionItem.SetItems(alist, branch=b) items = conditionItem.GetItems() for b in items.keys(): for action in items[b]: action.SetBlock(conditionItem) # load comments for comment in gxmXml.comments: commentItem = ModelComment(parent=self, x=comment['pos'][0], y=comment['pos'][1], width=comment['size'][0], height=comment['size'][1], id=comment['id'], label=comment['text']) self.AddItem(commentItem, pos=commentItem.GetId() - 1) def AddItem(self, newItem, pos=-1): """Add item to the list""" if pos != -1: self.items.insert(pos, newItem) else: self.items.append(newItem) # i = 1 # for item in self.items: # item.SetId(i) # i += 1 def IsValid(self): """Return True if model is valid""" if self.Validate(): return False return True def Validate(self): """Validate model, return None if model is valid otherwise error string""" errList = list() variables = self.GetVariables().keys() pattern = re.compile(r'(.*)(%.+\s?)(.*)') for action in self.GetItems(objType=ModelAction): cmd = action.GetLog(string=False) task = GUI(show=None).ParseCommand(cmd=cmd) errList += map(lambda x: cmd[0] + ': ' + x, task.get_cmd_error()) # check also variables for opt in cmd[1:]: if '=' not in opt: continue key, value = opt.split('=', 1) sval = pattern.search(value) if sval: var = sval.group(2).strip()[1:] # strip '%' from beginning found = False for v in variables: if var.startswith(v): found = True break if not found: report = True for item in filter( lambda x: isinstance(x, ModelLoop), action.GetBlock()): if var in item.GetLabel(): report = False break if report: errList.append( cmd[0] + ": " + _("undefined variable '%s'") % var) # TODO: check variables in file only optionally ### errList += self._substituteFile(action, checkOnly = True) return errList def _substituteFile(self, item, params=None, checkOnly=False): """Subsitute variables in command file inputs :param bool checkOnly: tuble - True to check variable, don't touch files :return: list of undefined variables """ errList = list() self.fileInput = dict() # collect ascii inputs for p in item.GetParams()['params']: if p.get('element', '') == 'file' and \ p.get('prompt', '') == 'input' and \ p.get('age', '') == 'old': filename = p.get('value', p.get('default', '')) if filename and \ mimetypes.guess_type(filename)[0] == 'text/plain': self.fileInput[filename] = None for finput in self.fileInput: # read lines fd = open(finput, "r") try: data = self.fileInput[finput] = fd.read() finally: fd.close() # substitute variables write = False variables = self.GetVariables() for variable in variables: pattern = re.compile('%' + variable) value = '' if params and 'variables' in params: for p in params['variables']['params']: if variable == p.get('name', ''): if p.get('type', 'string') == 'string': value = p.get('value', '') else: value = str(p.get('value', '')) break if not value: value = variables[variable].get('value', '') data = pattern.sub(value, data) if not checkOnly: write = True pattern = re.compile(r'(.*)(%.+\s?)(.*)') sval = pattern.search(data) if sval: var = sval.group(2).strip()[1:] # ignore '%' cmd = item.GetLog(string=False)[0] errList.append(cmd + ": " + _("undefined variable '%s'") % var) if not checkOnly: if write: fd = open(finput, "w") try: fd.write(data) finally: fd.close() else: self.fileInput[finput] = None return errList def OnPrepare(self, item, params): self._substituteFile(item, params, checkOnly=False) def RunAction(self, item, params, log, onDone, onPrepare=None, statusbar=None): """Run given action :param item: action item :param params: parameters dict :param log: logging window :param onDone: on-done method :param onPrepare: on-prepare method :param statusbar: wx.StatusBar instance or None """ name = '({0}) {1}'.format(item.GetId(), item.GetLabel()) if name in params: paramsOrig = item.GetParams(dcopy=True) item.MergeParams(params[name]) if statusbar: statusbar.SetStatusText(_('Running model...'), 0) data = {'item': item, 'params': copy.deepcopy(params)} log.RunCmd(command=item.GetLog(string=False, substitute=params), onDone=onDone, onPrepare=self.OnPrepare, userData=data) if name in params: item.SetParams(paramsOrig) def Run(self, log, onDone, parent=None): """Run model :param log: logging window (see gconsole.GConsole) :param onDone: on-done method :param parent: window for messages or None """ if self.GetNumItems() < 1: GMessage(parent=parent, message=_('Model is empty. Nothing to run.')) return statusbar = None if isinstance(parent, wx.Frame): statusbar = parent.GetStatusBar() # validation if statusbar: statusbar.SetStatusText(_('Validating model...'), 0) errList = self.Validate() if statusbar: statusbar.SetStatusText('', 0) if errList: dlg = wx.MessageDialog( parent=parent, message=_( 'Model is not valid. Do you want to ' 'run the model anyway?\n\n%s') % '\n'.join(errList), caption=_("Run model?"), style=wx.YES_NO | wx.NO_DEFAULT | wx.ICON_QUESTION | wx.CENTRE) ret = dlg.ShowModal() dlg.Destroy() if ret != wx.ID_YES: return # parametrization params = self.Parameterize() delInterData = False if params: dlg = ModelParamDialog(parent=parent, model=self, params=params) dlg.CenterOnParent() ret = dlg.ShowModal() if ret != wx.ID_OK: dlg.Destroy() return err = dlg.GetErrors() delInterData = dlg.DeleteIntermediateData() dlg.Destroy() if err: GError(parent=parent, message=unicode('\n'.join(err))) return err = list() for key, item in params.iteritems(): for p in item['params']: if p.get('value', '') == '': err.append( (key, p.get( 'name', ''), p.get( 'description', ''))) if err: GError( parent=parent, message=_("Variables below not defined:") + "\n\n" + unicode( '\n'.join( map( lambda x: "%s: %s (%s)" % (x[0], x[1], x[2]), err)))) return log.cmdThread.SetId(-1) for item in self.GetItems(): if not item.IsEnabled(): continue if isinstance(item, ModelAction): if item.GetBlockId(): continue self.RunAction(item, params, log, onDone) elif isinstance(item, ModelLoop): cond = item.GetLabel() # substitute variables in condition variables = self.GetVariables() for variable in variables: pattern = re.compile('%' + variable) if pattern.search(cond): value = '' if params and 'variables' in params: for p in params['variables']['params']: if variable == p.get('name', ''): value = p.get('value', '') break if not value: value = variables[variable].get('value', '') if not value: continue vtype = variables[variable].get('type', 'string') if vtype == 'string': value = '"' + value + '"' cond = pattern.sub(value, cond) # split condition # TODO: this part needs some better solution condVar, condText = map( lambda x: x.strip(), re.split('\s* in \s*', cond)) pattern = re.compile('%' + condVar) # for vars()[condVar] in eval(condText): ? vlist = list() if condText[0] == '`' and condText[-1] == '`': # run command cmd, dcmd = gtask.cmdlist_to_tuple( condText[1: -1].split(' ')) ret = RunCommand(cmd, read=True, **dcmd) if ret: vlist = ret.splitlines() else: vlist = eval(condText) if 'variables' not in params: params['variables'] = {'params': []} varDict = {'name': condVar, 'value': ''} params['variables']['params'].append(varDict) for var in vlist: for action in item.GetItems(self.GetItems()): if not action.IsEnabled(): continue varDict['value'] = var self.RunAction(item=action, params=params, log=log, onDone=onDone) params['variables']['params'].remove(varDict) if delInterData: self.DeleteIntermediateData(log) # discard values if params: for item in params.itervalues(): for p in item['params']: p['value'] = '' def DeleteIntermediateData(self, log): """Detele intermediate data""" rast, vect, rast3d, msg = self.GetIntermediateData() if rast: log.RunCmd(['g.remove', '-f', 'type=raster', 'name=%s' % ','.join(rast)]) if rast3d: log.RunCmd(['g.remove', '-f', 'type=raster_3d', 'name=%s' % ','.join(rast3d)]) if vect: log.RunCmd(['g.remove', '-f', 'type=vector', 'name=%s' % ','.join(vect)]) def GetIntermediateData(self): """Get info about intermediate data""" rast = list() rast3d = list() vect = list() for data in self.GetData(): if not data.IsIntermediate(): continue name = data.GetValue() prompt = data.GetPrompt() if prompt == 'raster': rast.append(name) elif prompt == 'vector': vect.append(name) elif prompt == 'raster_3d': rast3d.append(name) msg = '' if rast: msg += '\n\n%s: ' % _('Raster maps') msg += ', '.join(rast) if rast3d: msg += '\n\n%s: ' % _('3D raster maps') msg += ', '.join(rast3d) if vect: msg += '\n\n%s: ' % _('Vector maps') msg += ', '.join(vect) return rast, vect, rast3d, msg def Update(self): """Update model""" for item in self.items: item.Update() def IsParameterized(self): """Return True if model is parameterized""" if self.Parameterize(): return True return False def Parameterize(self): """Return parameterized options""" result = dict() idx = 0 if self.variables: params = list() result["variables"] = {'flags': list(), 'params': params, 'idx': idx} for name, values in self.variables.iteritems(): gtype = values.get('type', 'string') if gtype in ('raster', 'vector', 'mapset', 'file', 'region', 'dir'): gisprompt = True prompt = gtype if gtype == 'raster': element = 'cell' else: element = gtype ptype = 'string' else: gisprompt = False prompt = None element = None ptype = gtype params.append({'gisprompt': gisprompt, 'multiple': False, 'description': values.get('description', ''), 'guidependency': '', 'default': '', 'age': None, 'required': True, 'value': values.get('value', ''), 'label': '', 'guisection': '', 'key_desc': '', 'values': list(), 'parameterized': False, 'values_desc': list(), 'prompt': prompt, 'element': element, 'type': ptype, 'name': name}) idx += 1 for action in self.GetItems(objType=ModelAction): if not action.IsEnabled(): continue name = '({0}) {1}'.format(action.GetId(), action.GetLabel()) params = action.GetParams() increment = False for f in params['flags']: if f.get('parameterized', False): if name not in result: increment = True result[name] = {'flags': list(), 'params': list(), 'idx': idx} result[name]['flags'].append(f) for p in params['params']: if p.get('parameterized', False): if name not in result: increment = True result[name] = {'flags': list(), 'params': list(), 'idx': idx} result[name]['params'].append(p) if increment: idx += 1 self.variablesParams = result # record parameters return result class ModelObject(object): def __init__(self, id=-1, label=''): self.id = id # internal id, should be not changed self.label = '' self.rels = list() # list of ModelRelations self.isEnabled = True self.inBlock = list() # list of related loops/conditions def __del__(self): pass def GetLabel(self): """Get label""" return self.label def SetLabel(self, label=''): """Set label""" self.label = label def GetId(self): """Get id""" return self.id def SetId(self, newId): """Set id""" if self.inBlock: for loop in self.inBlock: # update block item loop.UpdateItem(self.id, newId) self.id = newId def AddRelation(self, rel): """Record new relation """ self.rels.append(rel) def GetRelations(self, fdir=None): """Get list of relations :param bool fdir: True for 'from' """ if fdir is None: return self.rels result = list() for rel in self.rels: if fdir == 'from': if rel.GetFrom() == self: result.append(rel) else: if rel.GetTo() == self: result.append(rel) return result def IsEnabled(self): """Get True if action is enabled, otherwise False""" return self.isEnabled def Enable(self, enabled=True): """Enable/disable action""" self.isEnabled = enabled self.Update() def Update(self): pass def SetBlock(self, item): """Add object to the block (loop/condition) :param item: reference to ModelLoop or ModelCondition which defines loops/condition """ if item not in self.inBlock: self.inBlock.append(item) def UnSetBlock(self, item): """Remove object from the block (loop/consition) :param item: reference to ModelLoop or ModelCondition which defines loops/codition """ if item in self.inBlock: self.inBlock.remove(item) def GetBlock(self): """Get list of related ModelObject(s) which defines block (loop/condition) :return: list of ModelObjects """ return self.inBlock def GetBlockId(self): """Get list of related ids which defines block :return: list of ids """ ret = list() for mo in self.inBlock: ret.append(mo.GetId()) return ret class ModelAction(ModelObject, ogl.DividedShape): """Action class (GRASS module)""" def __init__(self, parent, x, y, id=-1, cmd=None, task=None, width=None, height=None, label=None, comment=''): ModelObject.__init__(self, id, label) self.parent = parent self.task = task self.comment = comment if not width: width = UserSettings.Get( group='modeler', key='action', subkey=( 'size', 'width')) if not height: height = UserSettings.Get( group='modeler', key='action', subkey=( 'size', 'height')) if cmd: self.task = GUI(show=None).ParseCommand(cmd=cmd) else: if task: self.task = task else: self.task = None self.propWin = None self.data = list() # list of connected data items self.isValid = False self.isParameterized = False if self.parent.GetCanvas(): ogl.DividedShape.__init__(self, width, height) self.regionLabel = ogl.ShapeRegion() self.regionLabel.SetFormatMode( ogl.FORMAT_CENTRE_HORIZ | ogl.FORMAT_CENTRE_VERT) self.AddRegion(self.regionLabel) self.regionComment = None self.SetCanvas(self.parent) self.SetX(x) self.SetY(y) self._setPen() self._setBrush() self.SetLabel(label) if comment: self.SetComment(comment) self.SetRegionSizes() self.ReformatRegions() if self.task: self.SetValid(self.task.get_options()) def _setBrush(self, running=False): """Set brush""" if running: color = UserSettings.Get(group='modeler', key='action', subkey=('color', 'running')) elif not self.isEnabled: color = UserSettings.Get(group='modeler', key='disabled', subkey='color') elif self.isValid: color = UserSettings.Get(group='modeler', key='action', subkey=('color', 'valid')) else: color = UserSettings.Get(group='modeler', key='action', subkey=('color', 'invalid')) wxColor = wx.Colour(color[0], color[1], color[2]) self.SetBrush(wx.Brush(wxColor)) def _setPen(self): """Set pen""" if self.isParameterized: width = int(UserSettings.Get(group='modeler', key='action', subkey=('width', 'parameterized'))) else: width = int(UserSettings.Get(group='modeler', key='action', subkey=('width', 'default'))) if self.isEnabled: style = wx.SOLID else: style = wx.DOT pen = wx.Pen(wx.BLACK, width, style) self.SetPen(pen) def ReformatRegions(self): rnum = 0 canvas = self.parent.GetCanvas() dc = wx.ClientDC(canvas) # used for measuring for region in self.GetRegions(): text = region.GetText() self.FormatText(dc, text, rnum) rnum += 1 def OnSizingEndDragLeft(self, pt, x, y, keys, attch): ogl.DividedShape.OnSizingEndDragLeft(self, pt, x, y, keys, attch) self.SetRegionSizes() self.ReformatRegions() self.GetCanvas().Refresh() def SetLabel(self, label=None): """Set label :param label: if None use command string instead """ if label: self.label = label elif self.label: label = self.label else: try: label = self.task.get_cmd(ignoreErrors=True)[0] except: label = _("unknown") idx = self.GetId() self.regionLabel.SetText('(%d) %s' % (idx, label)) self.SetRegionSizes() self.ReformatRegions() def SetComment(self, comment): """Set comment""" self.comment = comment if self.regionComment is None: self.regionComment = ogl.ShapeRegion() self.regionComment.SetFormatMode(ogl.FORMAT_CENTRE_HORIZ) font = wx.SystemSettings.GetFont(wx.SYS_DEFAULT_GUI_FONT) font.SetStyle(wx.ITALIC) self.regionComment.SetFont(font) # clear doesn't work # self.regionComment.ClearText() self.regionComment.SetText(comment) self.ClearRegions() self.AddRegion(self.regionLabel) self.regionLabel.SetProportions(0.0, 1.0) if self.comment: self.AddRegion(self.regionComment) self.regionLabel.SetProportions(0.0, 0.4) self.SetRegionSizes() self.ReformatRegions() def GetComment(self): """Get comment""" return self.comment def SetProperties(self, params, propwin): """Record properties dialog""" self.task.params = params['params'] self.task.flags = params['flags'] self.propWin = propwin def GetPropDialog(self): """Get properties dialog""" return self.propWin def GetLog(self, string=True, substitute=None): """Get logging info :param string: True to get cmd as a string otherwise a list :param substitute: dictionary of parameter to substitute or None """ cmd = self.task.get_cmd(ignoreErrors=True, ignoreRequired=True, ignoreDefault=False) # substitute variables if substitute: variables = [] if 'variables' in substitute: for p in substitute['variables']['params']: variables.append(p.get('name', '')) else: variables = self.parent.GetVariables() # order variables by length for variable in sorted(variables, key=len, reverse=True): pattern = re.compile('%' + variable) value = '' if substitute and 'variables' in substitute: for p in substitute['variables']['params']: if variable == p.get('name', ''): if p.get('type', 'string') == 'string': value = p.get('value', '') else: value = str(p.get('value', '')) break if not value: value = variables[variable].get('value', '') if not value: continue for idx in range(len(cmd)): if pattern.search(cmd[idx]): cmd[idx] = pattern.sub(value, cmd[idx]) idx += 1 if string: if cmd is None: return '' else: return ' '.join(cmd) return cmd def GetLabel(self): """Get name""" if self.label: return self.label cmd = self.task.get_cmd(ignoreErrors=True) if cmd and len(cmd) > 0: return cmd[0] return _('unknown') def GetParams(self, dcopy=False): """Get dictionary of parameters""" if dcopy: return copy.deepcopy(self.task.get_options()) return self.task.get_options() def GetTask(self): """Get grassTask instance""" return self.task def SetParams(self, params): """Set dictionary of parameters""" self.task.params = params['params'] self.task.flags = params['flags'] def MergeParams(self, params): """Merge dictionary of parameters""" if 'flags' in params: for f in params['flags']: self.task.set_flag(f['name'], f.get('value', False)) if 'params' in params: for p in params['params']: self.task.set_param(p['name'], p.get('value', '')) def SetValid(self, options): """Set validity for action :param options: dictionary with flags and params (gtask) """ self.isValid = True for f in options['flags']: if f.get('parameterized', False): self.isParameterized = True break for p in options['params']: if self.isValid and p.get('required', False) and \ p.get('value', '') == '' and \ p.get('default', '') == '': self.isValid = False if not self.isParameterized and p.get('parameterized', False): self.isParameterized = True if self.parent.GetCanvas(): self._setBrush() self._setPen() def IsValid(self): """Check validity (all required parameters set)""" return self.isValid def IsParameterized(self): """Check if action is parameterized""" return self.isParameterized def GetParameterizedParams(self): """Return parameterized flags and options""" param = {'flags': [], 'params': []} options = self.GetParams() for f in options['flags']: if f.get('parameterized', False): param['flags'].append(f) for p in options['params']: if p.get('parameterized', False): param['params'].append(p) return param def FindData(self, name): """Find data item by name""" for rel in self.GetRelations(): data = rel.GetData() if name == rel.GetLabel() and name in data.GetLabel(): return data return None def Update(self, running=False): """Update action""" if running: self._setBrush(running=True) else: self._setBrush() self._setPen() def OnDraw(self, dc): """Draw action in canvas""" self._setBrush() self._setPen() ogl.RectangleShape.Recentre(self, dc) # re-center text ogl.RectangleShape.OnDraw(self, dc) class ModelData(ModelObject, ogl.EllipseShape): def __init__(self, parent, x, y, value='', prompt='', width=None, height=None): """Data item class :param parent: window parent :param x, y: position of the shape :param fname, tname: list of parameter names from / to :param value: value :param prompt: type of GIS element :param width, height: dimension of the shape """ ModelObject.__init__(self) self.parent = parent self.value = value self.prompt = prompt self.intermediate = False self.propWin = None if not width: width = UserSettings.Get( group='modeler', key='data', subkey=( 'size', 'width')) if not height: height = UserSettings.Get( group='modeler', key='data', subkey=( 'size', 'height')) if self.parent.GetCanvas(): ogl.EllipseShape.__init__(self, width, height) self.SetCanvas(self.parent) self.SetX(x) self.SetY(y) self._setPen() self._setBrush() self.SetLabel() def IsIntermediate(self): """Checks if data item is intermediate""" return self.intermediate def SetIntermediate(self, im): """Set intermediate flag""" self.intermediate = im def OnDraw(self, dc): self._setPen() ogl.EllipseShape.OnDraw(self, dc) def GetLog(self, string=True): """Get logging info""" name = list() for rel in self.GetRelations(): name.append(rel.GetLabel()) if name: return '/'.join(name) + '=' + self.value + ' (' + self.prompt + ')' else: return self.value + ' (' + self.prompt + ')' def GetLabel(self): """Get list of names""" name = list() for rel in self.GetRelations(): name.append(rel.GetLabel()) return name def GetPrompt(self): """Get prompt""" return self.prompt def SetPrompt(self, prompt): """Set prompt :param prompt: """ self.prompt = prompt def GetValue(self): """Get value""" return self.value def SetValue(self, value): """Set value :param value: """ self.value = value self.SetLabel() for direction in ('from', 'to'): for rel in self.GetRelations(direction): if direction == 'from': action = rel.GetTo() else: action = rel.GetFrom() task = GUI( show=None).ParseCommand( cmd=action.GetLog( string=False)) task.set_param(rel.GetLabel(), self.value) action.MergeParams(task.get_options()) def GetPropDialog(self): """Get properties dialog""" return self.propWin def SetPropDialog(self, win): """Get properties dialog""" self.propWin = win def _setBrush(self): """Set brush""" if self.prompt == 'raster': color = UserSettings.Get(group='modeler', key='data', subkey=('color', 'raster')) elif self.prompt == 'raster_3d': color = UserSettings.Get(group='modeler', key='data', subkey=('color', 'raster3d')) elif self.prompt == 'vector': color = UserSettings.Get(group='modeler', key='data', subkey=('color', 'vector')) elif self.prompt == 'dbtable': color = UserSettings.Get(group='modeler', key='data', subkey=('color', 'dbtable')) else: color = UserSettings.Get(group='modeler', key='action', subkey=('color', 'invalid')) wxColor = wx.Colour(color[0], color[1], color[2]) self.SetBrush(wx.Brush(wxColor)) def _setPen(self): """Set pen""" isParameterized = False for rel in self.GetRelations('from'): if rel.GetTo().IsParameterized(): isParameterized = True break if not isParameterized: for rel in self.GetRelations('to'): if rel.GetFrom().IsParameterized(): isParameterized = True break if isParameterized: width = int(UserSettings.Get(group='modeler', key='action', subkey=('width', 'parameterized'))) else: width = int(UserSettings.Get(group='modeler', key='action', subkey=('width', 'default'))) if self.intermediate: style = wx.DOT else: style = wx.SOLID pen = wx.Pen(wx.BLACK, width, style) self.SetPen(pen) def SetLabel(self): """Update text""" self.ClearText() name = [] for rel in self.GetRelations(): name.append(rel.GetLabel()) self.AddText('/'.join(name)) if self.value: self.AddText(self.value) else: self.AddText(_('')) def Update(self): """Update action""" self._setBrush() self._setPen() self.SetLabel() class ModelRelation(ogl.LineShape): """Data - action relation""" def __init__(self, parent, fromShape, toShape, param=''): self.fromShape = fromShape self.toShape = toShape self.param = param self.parent = parent self._points = None if self.parent.GetCanvas(): ogl.LineShape.__init__(self) def __del__(self): if self in self.fromShape.rels: self.fromShape.rels.remove(self) if self in self.toShape.rels: self.toShape.rels.remove(self) def GetFrom(self): """Get id of 'from' shape""" return self.fromShape def GetTo(self): """Get id of 'to' shape""" return self.toShape def GetData(self): """Get related ModelData instance :return: ModelData instance :return: None if not found """ if isinstance(self.fromShape, ModelData): return self.fromShape elif isinstance(self.toShape, ModelData): return self.toShape return None def GetLabel(self): """Get parameter name""" return self.param def ResetShapes(self): """Reset related objects""" self.fromShape.ResetControlPoints() self.toShape.ResetControlPoints() self.ResetControlPoints() def SetControlPoints(self, points): """Set control points""" self._points = points def GetControlPoints(self): """Get list of control points""" return self._points def _setPen(self): """Set pen""" pen = wx.Pen(wx.BLACK, 1, wx.SOLID) self.SetPen(pen) def OnDraw(self, dc): """Draw relation""" self._setPen() ogl.LineShape.OnDraw(self, dc) def SetName(self, param): self.param = param class ModelItem(ModelObject): def __init__(self, parent, x, y, id=-1, width=None, height=None, label='', items=[]): """Abstract class for loops and conditions""" ModelObject.__init__(self, id, label) self.parent = parent def _setPen(self): """Set pen""" if self.isEnabled: style = wx.SOLID else: style = wx.DOT pen = wx.Pen(wx.BLACK, 1, style) self.SetPen(pen) def SetId(self, id): """Set loop id""" self.id = id def SetLabel(self, label=''): """Set loop text (condition)""" if label: self.label = label self.ClearText() self.AddText('(' + str(self.id) + ') ' + self.label) def GetLog(self): """Get log info""" if self.label: return _("Condition: ") + self.label else: return _("Condition: not defined") def AddRelation(self, rel): """Record relation""" self.rels.append(rel) def Clear(self): """Clear object, remove rels""" self.rels = list() class ModelLoop(ModelItem, ogl.RectangleShape): def __init__(self, parent, x, y, id=-1, idx=-1, width=None, height=None, label='', items=[]): """Defines a loop""" ModelItem.__init__(self, parent, x, y, id, width, height, label, items) self.itemIds = list() # unordered if not width: width = UserSettings.Get( group='modeler', key='loop', subkey=( 'size', 'width')) if not height: height = UserSettings.Get( group='modeler', key='loop', subkey=( 'size', 'height')) if self.parent.GetCanvas(): ogl.RectangleShape.__init__(self, width, height) self.SetCanvas(self.parent) self.SetX(x) self.SetY(y) self._setPen() self._setBrush() self.SetCornerRadius(100) self.SetLabel(label) def _setBrush(self): """Set brush""" if not self.isEnabled: color = UserSettings.Get(group='modeler', key='disabled', subkey='color') else: color = UserSettings.Get(group='modeler', key='loop', subkey=('color', 'valid')) wxColor = wx.Colour(color[0], color[1], color[2]) self.SetBrush(wx.Brush(wxColor)) def Enable(self, enabled=True): """Enable/disable action""" for idx in self.itemIds: item = self.parent.FindAction(idx) if item: item.Enable(enabled) ModelObject.Enable(self, enabled) self.Update() def Update(self): self._setPen() self._setBrush() def GetItems(self, items): """Get sorted items by id""" result = list() for item in items: if item.GetId() in self.itemIds: result.append(item) return result def SetItems(self, items): """Set items (id)""" self.itemIds = items def UpdateItem(self, oldId, newId): """Update item in the list""" idx = self.itemIds.index(oldId) if idx != -1: self.itemIds[idx] = newId def OnDraw(self, dc): """Draw loop in canvas""" self._setBrush() ogl.RectangleShape.Recentre(self, dc) # re-center text ogl.RectangleShape.OnDraw(self, dc) class ModelCondition(ModelItem, ogl.PolygonShape): def __init__(self, parent, x, y, id=-1, width=None, height=None, label='', items={'if': [], 'else': []}): """Defines a if-else condition""" ModelItem.__init__(self, parent, x, y, id, width, height, label, items) self.itemIds = {'if': [], 'else': []} if not width: self.width = UserSettings.Get( group='modeler', key='if-else', subkey=( 'size', 'width')) else: self.width = width if not height: self.height = UserSettings.Get( group='modeler', key='if-else', subkey=( 'size', 'height')) else: self.height = height if self.parent.GetCanvas(): ogl.PolygonShape.__init__(self) points = [(0, - self.height / 2), (self.width / 2, 0), (0, self.height / 2), (- self.width / 2, 0)] self.Create(points) self.SetCanvas(self.parent) self.SetX(x) self.SetY(y) self.SetPen(wx.BLACK_PEN) if label: self.AddText('(' + str(self.id) + ') ' + label) else: self.AddText('(' + str(self.id) + ')') def GetLabel(self): """Get name""" return _("if-else") def GetWidth(self): """Get object width""" return self.width def GetHeight(self): """Get object height""" return self.height def GetItems(self, items): """Get sorted items by id""" result = {'if': [], 'else': []} for item in items: if item.GetId() in self.itemIds['if']: result['if'].append(item) elif item.GetId() in self.itemIds['else']: result['else'].append(item) return result def SetItems(self, items, branch='if'): """Set items (id) :param items: list of items :param branch: 'if' / 'else' """ if branch in ['if', 'else']: self.itemIds[branch] = items class ModelComment(ModelObject, ogl.RectangleShape): def __init__(self, parent, x, y, id=-1, width=None, height=None, label=''): """Defines a model comment""" ModelObject.__init__(self, id, label) if not width: width = UserSettings.Get( group='modeler', key='comment', subkey=( 'size', 'width')) if not height: height = UserSettings.Get( group='modeler', key='comment', subkey=( 'size', 'height')) if parent.GetCanvas(): ogl.RectangleShape.__init__(self, width, height) self.SetCanvas(parent) self.SetX(x) self.SetY(y) font = wx.SystemSettings.GetFont(wx.SYS_DEFAULT_GUI_FONT) font.SetStyle(wx.ITALIC) self.SetFont(font) self._setPen() self._setBrush() self.SetLabel(label) def _setBrush(self, running=False): """Set brush""" color = UserSettings.Get(group='modeler', key='comment', subkey='color') wxColor = wx.Colour(color[0], color[1], color[2]) self.SetBrush(wx.Brush(wxColor)) def _setPen(self): """Set pen""" pen = wx.Pen(wx.BLACK, 1, wx.DOT) self.SetPen(pen) def SetLabel(self, label=None): """Set label :param label: if None use command string instead """ if label: self.label = label elif self.label: label = self.label else: label = '' idx = self.GetId() self.ClearText() self.AddText('(%d) %s' % (idx, label)) def GetComment(self): return self.GetLabel() def SetComment(self, comment): self.SetLabel(comment) self.GetCanvas().Refresh() class ProcessModelFile: """Process GRASS model file (gxm)""" def __init__(self, tree): """A ElementTree handler for the GXM XML file, as defined in grass-gxm.dtd. """ self.tree = tree self.root = self.tree.getroot() # check if input is a valid GXM file if self.root is None or self.root.tag != 'gxm': if self.root is not None: tagName = self.root.tag else: tabName = _("empty") raise GException( _("Details: unsupported tag name '{}'.").format(tagName)) # list of actions, data self.properties = dict() self.variables = dict() self.actions = list() self.data = list() self.loops = list() self.conditions = list() self.comments = list() self._processWindow() self._processProperties() self._processVariables() self._processItems() self._processData() def _filterValue(self, value): """Filter value :param value: """ value = value.replace('<', '<') value = value.replace('>', '>') return value def _getNodeText(self, node, tag, default=''): """Get node text""" p = node.find(tag) if p is not None: if p.text: return utils.normalize_whitespace(p.text) else: return '' return default def _processWindow(self): """Process window properties""" node = self.root.find('window') if node is None: self.pos = self.size = None return self.pos, self.size = self._getDim(node) def _processProperties(self): """Process model properties""" node = self.root.find('properties') if node is None: return for key in ('name', 'description', 'author'): self._processProperty(node, key) for f in node.findall('flag'): name = f.get('name', '') if name == 'overwrite': self.properties['overwrite'] = True def _processProperty(self, pnode, name): """Process given property""" node = pnode.find(name) if node is not None: self.properties[name] = node.text else: self.properties[name] = '' def _processVariables(self): """Process model variables""" vnode = self.root.find('variables') if vnode is None: return for node in vnode.findall('variable'): name = node.get('name', '') if not name: continue # should not happen self.variables[name] = {'type': node.get('type', 'string')} for key in ('description', 'value'): self._processVariable(node, name, key) def _processVariable(self, pnode, name, key): """Process given variable""" node = pnode.find(key) if node is not None: if node.text: self.variables[name][key] = node.text def _processItems(self): """Process model items (actions, loops, conditions)""" self._processActions() self._processLoops() self._processConditions() self._processComments() def _processActions(self): """Process model file""" for action in self.root.findall('action'): pos, size = self._getDim(action) disabled = False task = action.find('task') if task is not None: if task.find('disabled') is not None: disabled = True task = self._processTask(task) else: task = None aId = int(action.get('id', -1)) label = action.get('name') comment = action.find('comment') if comment is not None: commentString = comment.text else: commentString = '' self.actions.append({'pos': pos, 'size': size, 'task': task, 'id': aId, 'disabled': disabled, 'label': label, 'comment': commentString}) def _getDim(self, node): """Get position and size of shape""" pos = size = None posAttr = node.get('pos', None) if posAttr: posVal = map(int, posAttr.split(',')) try: pos = (posVal[0], posVal[1]) except: pos = None sizeAttr = node.get('size', None) if sizeAttr: sizeVal = map(int, sizeAttr.split(',')) try: size = (sizeVal[0], sizeVal[1]) except: size = None return pos, size def _processData(self): """Process model file""" for data in self.root.findall('data'): pos, size = self._getDim(data) param = data.find('data-parameter') prompt = value = None if param is not None: prompt = param.get('prompt', None) value = self._filterValue(self._getNodeText(param, 'value')) if data.find('intermediate') is None: intermediate = False else: intermediate = True rels = list() for rel in data.findall('relation'): defrel = {'id': int(rel.get('id', -1)), 'dir': rel.get('dir', 'to'), 'name': rel.get('name', '')} points = list() for point in rel.findall('point'): x = self._filterValue(self._getNodeText(point, 'x')) y = self._filterValue(self._getNodeText(point, 'y')) points.append((float(x), float(y))) defrel['points'] = points rels.append(defrel) self.data.append({'pos': pos, 'size': size, 'prompt': prompt, 'value': value, 'intermediate': intermediate, 'rels': rels}) def _processTask(self, node): """Process task :return: grassTask instance :return: None on error """ cmd = list() parameterized = list() name = node.get('name', None) if not name: return None cmd.append(name) # flags for f in node.findall('flag'): flag = f.get('name', '') if f.get('parameterized', '0') == '1': parameterized.append(('flag', flag)) if f.get('value', '1') == '0': continue if len(flag) > 1: cmd.append('--' + flag) else: cmd.append('-' + flag) # parameters for p in node.findall('parameter'): name = p.get('name', '') if p.find('parameterized') is not None: parameterized.append(('param', name)) cmd.append( '%s=%s' % (name, self._filterValue( self._getNodeText( p, 'value')))) task, err = GUI(show=None, checkError=True).ParseCommand(cmd=cmd) if err: GWarning(os.linesep.join(err)) for opt, name in parameterized: if opt == 'flag': task.set_flag(name, True, element='parameterized') else: task.set_param(name, True, element='parameterized') return task def _processLoops(self): """Process model loops""" for node in self.root.findall('loop'): pos, size = self._getDim(node) text = self._filterValue( self._getNodeText( node, 'condition')).strip() aid = list() for anode in node.findall('item'): try: aid.append(int(anode.text)) except ValueError: pass self.loops.append({'pos': pos, 'size': size, 'text': text, 'id': int(node.get('id', -1)), 'items': aid}) def _processConditions(self): """Process model conditions""" for node in self.root.findall('if-else'): pos, size = self._getDim(node) text = self._filterValue( self._getNodeText( node, 'condition')).strip() aid = {'if': list(), 'else': list()} for b in aid.keys(): bnode = node.find(b) if bnode is None: continue for anode in bnode.findall('item'): try: aid[b].append(int(anode.text)) except ValueError: pass self.conditions.append({'pos': pos, 'size': size, 'text': text, 'id': int(node.get('id', -1)), 'items': aid}) def _processComments(self): """Process model comments""" for node in self.root.findall('comment'): pos, size = self._getDim(node) text = self._filterValue(node.text) self.comments.append({'pos': pos, 'size': size, 'text': text, 'id': int(node.get('id', -1)), 'text': text}) class WriteModelFile: """Generic class for writing model file""" def __init__(self, fd, model): self.fd = fd self.model = model self.properties = model.GetProperties() self.variables = model.GetVariables() self.items = model.GetItems() self.indent = 0 self._header() self._window() self._properties() self._variables() self._items() dataList = list() for action in model.GetItems(objType=ModelAction): for rel in action.GetRelations(): dataItem = rel.GetData() if dataItem not in dataList: dataList.append(dataItem) self._data(dataList) self._footer() def _filterValue(self, value): """Escapes value to be stored in XML. :param value: string to be escaped as XML :return: a XML-valid string """ value = saxutils.escape(value) return value def _header(self): """Write header""" self.fd.write( '\n' % GetDefaultEncoding( forceUTF8=True)) self.fd.write('\n') self.fd.write('%s\n' % (' ' * self.indent)) self.indent += 4 def _footer(self): """Write footer""" self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) def _window(self): """Write window properties""" canvas = self.model.GetCanvas() if canvas is None: return win = canvas.parent pos = win.GetPosition() size = win.GetSize() self.fd.write('%s\n' % (' ' * self.indent, pos[0], pos[1], size[0], size[1])) def _properties(self): """Write model properties""" self.fd.write('%s\n' % (' ' * self.indent)) self.indent += 4 if self.properties['name']: self.fd.write( '%s%s\n' % (' ' * self.indent, EncodeString( self.properties['name']))) if self.properties['description']: self.fd.write( '%s%s\n' % (' ' * self.indent, EncodeString( self.properties['description']))) if self.properties['author']: self.fd.write( '%s%s\n' % (' ' * self.indent, EncodeString( self.properties['author']))) if 'overwrite' in self.properties and \ self.properties['overwrite']: self.fd.write( '%s\n' % (' ' * self.indent)) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) def _variables(self): """Write model variables""" if not self.variables: return self.fd.write('%s\n' % (' ' * self.indent)) self.indent += 4 for name, values in self.variables.iteritems(): self.fd.write( '%s\n' % (' ' * self.indent, EncodeString(name), values['type'])) self.indent += 4 if 'value' in values: self.fd.write('%s%s\n' % (' ' * self.indent, EncodeString(values['value']))) if 'description' in values: self.fd.write( '%s%s\n' % (' ' * self.indent, EncodeString(values['description']))) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) def _items(self): """Write actions/loops/conditions""" for item in self.items: if isinstance(item, ModelAction): self._action(item) elif isinstance(item, ModelLoop): self._loop(item) elif isinstance(item, ModelCondition): self._condition(item) elif isinstance(item, ModelComment): self._comment(item) def _action(self, action): """Write actions""" self.fd.write( '%s\n' % (' ' * self.indent, action.GetId(), EncodeString( action.GetLabel()), action.GetX(), action.GetY(), action.GetWidth(), action.GetHeight())) self.indent += 4 comment = action.GetComment() if comment: self.fd.write( '%s%s\n' % (' ' * self.indent, EncodeString(comment))) self.fd.write('%s\n' % (' ' * self.indent, action.GetLog(string=False)[0])) self.indent += 4 if not action.IsEnabled(): self.fd.write('%s\n' % (' ' * self.indent)) for key, val in action.GetParams().iteritems(): if key == 'flags': for f in val: if f.get('value', False) or f.get('parameterized', False): if f.get('parameterized', False): if f.get('value', False) == False: self.fd.write( '%s\n' % (' ' * self.indent, f.get( 'name', ''))) else: self.fd.write( '%s\n' % (' ' * self.indent, f.get( 'name', ''))) else: self.fd.write( '%s\n' % (' ' * self.indent, f.get('name', ''))) else: # parameter for p in val: if not p.get( 'value', '') and not p.get( 'parameterized', False): continue self.fd.write('%s\n' % (' ' * self.indent, p.get('name', ''))) self.indent += 4 if p.get('parameterized', False): self.fd.write( '%s\n' % (' ' * self.indent)) self.fd.write( '%s%s\n' % (' ' * self.indent, self._filterValue( p.get( 'value', '')))) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) def _data(self, dataList): """Write data""" for data in dataList: self.fd.write('%s\n' % (' ' * self.indent, data.GetX(), data.GetY(), data.GetWidth(), data.GetHeight())) self.indent += 4 self.fd.write('%s\n' % (' ' * self.indent, data.GetPrompt())) self.indent += 4 self.fd.write( '%s%s\n' % (' ' * self.indent, self._filterValue( data.GetValue()))) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) if data.IsIntermediate(): self.fd.write('%s\n' % (' ' * self.indent)) # relations for ft in ('from', 'to'): for rel in data.GetRelations(ft): if ft == 'from': aid = rel.GetTo().GetId() else: aid = rel.GetFrom().GetId() self.fd.write('%s\n' % (' ' * self.indent, ft, aid, rel.GetLabel())) self.indent += 4 for point in rel.GetLineControlPoints()[1:-1]: self.fd.write('%s\n' % (' ' * self.indent)) self.indent += 4 x, y = point.Get() self.fd.write( '%s%d\n' % (' ' * self.indent, int(x))) self.fd.write( '%s%d\n' % (' ' * self.indent, int(y))) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) def _loop(self, loop): """Write loops""" self.fd.write( '%s\n' % (' ' * self.indent, loop.GetId(), loop.GetX(), loop.GetY(), loop.GetWidth(), loop.GetHeight())) self.indent += 4 cond = loop.GetLabel() if cond: self.fd.write('%s%s\n' % (' ' * self.indent, self._filterValue(cond))) for item in loop.GetItems(self.model.GetItems(objType=ModelAction)): self.fd.write('%s%d\n' % (' ' * self.indent, item.GetId())) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) def _condition(self, condition): """Write conditions""" bbox = condition.GetBoundingBoxMin() self.fd.write( '%s\n' % (' ' * self.indent, condition.GetId(), condition.GetX(), condition.GetY(), bbox[0], bbox[1])) text = condition.GetLabel() self.indent += 4 if text: self.fd.write('%s%s\n' % (' ' * self.indent, self._filterValue(text))) items = condition.GetItems() for b in items.keys(): if len(items[b]) < 1: continue self.fd.write('%s<%s>\n' % (' ' * self.indent, b)) self.indent += 4 for item in items[b]: self.fd.write('%s%d\n' % (' ' * self.indent, item.GetId())) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent, b)) self.indent -= 4 self.fd.write('%s\n' % (' ' * self.indent)) def _comment(self, comment): """Write comment""" self.fd.write( '%s%s\n' % (' ' * self.indent, comment.GetId(), comment.GetX(), comment.GetY(), comment.GetWidth(), comment.GetHeight(), EncodeString( comment.GetLabel()))) class WritePythonFile: def __init__(self, fd, model): """Class for exporting model to Python script :param fd: file descriptor """ self.fd = fd self.model = model self.indent = 4 self._writePython() def _getStandardizedOption(self, string): if string == 'raster': return 'G_OPT_R_MAP' elif string == 'vector': return 'G_OPT_V_MAP' elif string == 'mapset': return 'G_OPT_M_MAPSET' elif string == 'file': return 'G_OPT_F_INPUT' elif string == 'dir': return 'G_OPT_M_DIR' elif string == 'region': return 'G_OPT_M_REGION' return '' def _writePython(self): """Write model to file""" properties = self.model.GetProperties() # header self.fd.write( r"""#!/usr/bin/env python # #%s # # MODULE: %s # # AUTHOR(S): %s # # PURPOSE: %s # # DATE: %s # #%s """ % ('#' * 77, EncodeString( properties['name']), EncodeString( properties['author']), EncodeString( '\n# '.join( properties['description'].splitlines())), time.asctime(), '#' * 77)) # UI self.fd.write( r""" #%%module #%% description: %s #%%end """ % (EncodeString(' '.join(properties['description'].splitlines())))) variables = self.model.GetVariables() for key, data in variables.iteritems(): otype = self._getStandardizedOption(data['type']) self.fd.write( r""" #%%option %s #%% key: %s #%% description: %s #%% required: yes """ % (otype, key, data['description'])) if 'value' in data: self.fd.write("#%% answer: %s\n" % data['value']) self.fd.write("#% end\n") # import modules self.fd.write( r""" import sys import os import atexit from grass.script import parser, run_command """) # cleanup() rast, vect, rast3d, msg = self.model.GetIntermediateData() self.fd.write( r""" def cleanup(): """) if rast: self.fd.write( r""" run_command('g.remove', flags='f', type='raster', name=%s) """ % ','.join(map(lambda x: "'" + x + "'", rast))) if vect: self.fd.write( r""" run_command('g.remove', flags='f', type='vector', name=%s) """ % ','.join(map(lambda x: "'" + x + "'", vect))) if rast3d: self.fd.write( r""" run_command('g.remove', flags='f', type='raster_3d', name=%s) """ % ','.join(map(lambda x: "'" + x + "'", rast3d))) if not rast and not vect and not rast3d: self.fd.write(' pass\n') self.fd.write("\ndef main():\n") for item in self.model.GetItems(): self._writePythonItem(item, variables=self.model.GetVariables()) self.fd.write("\n return 0\n") self.fd.write( r""" if __name__ == "__main__": options, flags = parser() atexit.register(cleanup) sys.exit(main()) """) def _writePythonItem(self, item, ignoreBlock=True, variables={}): """Write model object to Python file""" if isinstance(item, ModelAction): if ignoreBlock and item.GetBlockId(): # ignore items in loops of conditions return self._writePythonAction(item, variables=variables) elif isinstance(item, ModelLoop) or isinstance(item, ModelCondition): # substitute condition cond = item.GetLabel() for variable in self.model.GetVariables(): pattern = re.compile('%' + variable) if pattern.search(cond): value = variables[variable].get('value', '') if variables[variable].get('type', 'string') == 'string': value = '"' + value + '"' cond = pattern.sub(value, cond) if isinstance(item, ModelLoop): condVar, condText = map( lambda x: x.strip(), re.split('\s* in \s*', cond)) cond = "%sfor %s in " % (' ' * self.indent, condVar) if condText[0] == '`' and condText[-1] == '`': task = GUI( show=None).ParseCommand( cmd=utils.split( condText[ 1:- 1])) cond += "grass.read_command(" cond += self._getPythonActionCmd(task, len(cond), variables=[condVar]) + ".splitlines()" else: cond += condText self.fd.write('%s:\n' % cond) self.indent += 4 variablesLoop = variables.copy() variablesLoop[condVar] = None for action in item.GetItems( self.model.GetItems(objType=ModelAction)): self._writePythonItem( action, ignoreBlock=False, variables=variablesLoop) self.indent -= 4 if isinstance(item, ModelCondition): self.fd.write('%sif %s:\n' % (' ' * self.indent, cond)) self.indent += 4 condItems = item.GetItems() for action in condItems['if']: self._writePythonItem(action, ignoreBlock=False) if condItems['else']: self.indent -= 4 self.fd.write('%selse:\n' % (' ' * self.indent)) self.indent += 4 for action in condItems['else']: self._writePythonItem(action, ignoreBlock=False) self.indent += 4 self.fd.write('\n') if isinstance(item, ModelComment): self._writePythonComment(item) def _writePythonAction(self, item, variables={}): """Write model action to Python file""" task = GUI(show=None).ParseCommand(cmd=item.GetLog(string=False)) strcmd = "%srun_command(" % (' ' * self.indent) self.fd.write( strcmd + self._getPythonActionCmd( task, len(strcmd), variables) + '\n') def _getPythonActionCmd(self, task, cmdIndent, variables={}): opts = task.get_options() ret = '' flags = '' params = list() for f in opts['flags']: if f.get('value', False): name = f.get('name', '') if len(name) > 1: params.append('%s = True' % name) else: flags += name for p in opts['params']: name = p.get('name', None) value = p.get('value', None) if name and value: ptype = p.get('type', 'string') foundVar = False for var in sorted(variables, key=len, reverse=True): data = variables[var] if '%' + var in value: value = self._substituteVariable(value, var, data) foundVar = True if foundVar or ptype != 'string': params.append("%s = %s" % (name, value)) else: params.append('%s = "%s"' % (name, value)) ret += '"%s"' % task.get_name() if flags: ret += ",\n%sflags = '%s'" % (' ' * cmdIndent, flags) if len(params) > 0: ret += ",\n" for opt in params[:-1]: ret += "%s%s,\n" % (' ' * cmdIndent, opt) ret += "%s%s)" % (' ' * cmdIndent, params[-1]) else: ret += ")" return ret def _writePythonComment(self, item): """Write model comment to Python file""" for line in item.GetLabel().splitlines(): self.fd.write('#' + line + '\n') def _substituteVariable(self, string, variable, data): """Substitute variable in the string :param string: string to be modified :param variable: variable to be substituted :param data: data related to the variable :return: modified string """ result = '' ss = re.split("\w*(%" + variable + ")w*", string) if not ss[0] and not ss[-1]: if data: return "options['%s']" % variable else: return variable for s in ss: if not s or s == '"': continue if s == '%' + variable: if data: result += "+options['%s']+" % variable else: result += '+%s+' % variable else: result += '"' + s if not s.endswith(']'): # options result += '"' return result.strip('+') class ModelParamDialog(wx.Dialog): def __init__( self, parent, model, params, id=wx.ID_ANY, title=_("Model parameters"), style=wx.DEFAULT_DIALOG_STYLE | wx.RESIZE_BORDER, **kwargs): """Model parameters dialog """ self.parent = parent self._model = model self.params = params self.tasks = list() # list of tasks/pages wx.Dialog.__init__( self, parent=parent, id=id, title=title, style=style, **kwargs) self.notebook = GNotebook(parent=self, style=globalvar.FNPageDStyle) panel = self._createPages() wx.CallAfter(self.notebook.SetSelection, 0) # intermediate data? self.interData = wx.CheckBox(parent=self, label=_( "Delete intermediate data when finish")) self.interData.SetValue(True) rast, vect, rast3d, msg = self._model.GetIntermediateData() if not rast and not vect and not rast3d: self.interData.Hide() self.btnCancel = wx.Button(parent=self, id=wx.ID_CANCEL) self.btnRun = wx.Button(parent=self, id=wx.ID_OK, label=_("&Run")) self.btnRun.SetDefault() self._layout() size = self.GetBestSize() self.SetMinSize(size) self.SetSize((size.width, size.height + panel.constrained_size[1] - panel.panelMinHeight)) def _layout(self): btnSizer = wx.StdDialogButtonSizer() btnSizer.AddButton(self.btnCancel) btnSizer.AddButton(self.btnRun) btnSizer.Realize() mainSizer = wx.BoxSizer(wx.VERTICAL) mainSizer.Add(self.notebook, proportion=1, flag=wx.EXPAND) if self.interData.IsShown(): mainSizer.Add(self.interData, proportion=0, flag=wx.EXPAND | wx.ALL | wx.ALIGN_CENTER, border=5) mainSizer.Add(wx.StaticLine(parent=self, id=wx.ID_ANY, style=wx.LI_HORIZONTAL), proportion=0, flag=wx.EXPAND | wx.LEFT | wx.RIGHT, border=5) mainSizer.Add(btnSizer, proportion=0, flag=wx.EXPAND | wx.ALL | wx.ALIGN_CENTER, border=5) self.SetSizer(mainSizer) mainSizer.Fit(self) def _createPages(self): """Create for each parameterized module its own page""" nameOrdered = [''] * len(self.params.keys()) for name, params in self.params.iteritems(): nameOrdered[params['idx']] = name for name in nameOrdered: params = self.params[name] panel = self._createPage(name, params) if name == 'variables': name = _('Variables') self.notebook.AddPage(page=panel, text=name) return panel def _createPage(self, name, params): """Define notebook page""" if name in globalvar.grassCmd: task = gtask.grassTask(name) else: task = gtask.grassTask() task.flags = params['flags'] task.params = params['params'] panel = CmdPanel(parent=self, id=wx.ID_ANY, task=task, giface=GraphicalModelerGrassInterface(self._model)) self.tasks.append(task) return panel def GetErrors(self): """Check for errors, get list of messages""" errList = list() for task in self.tasks: errList += task.get_cmd_error() return errList def DeleteIntermediateData(self): """Check if to detele intermediate data""" if self.interData.IsShown() and self.interData.IsChecked(): return True return False