module.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577
# -*- coding: utf-8 -*-
from __future__ import (nested_scopes, generators, division, absolute_import,
                        with_statement, print_function, unicode_literals)
import numbers
import sys
import time
from xml.etree.ElementTree import fromstring

if sys.version_info[0] == 2:
    from itertools import izip_longest as zip_longest
else:
    from itertools import zip_longest

from grass.exceptions import CalledModuleError
from grass.script.core import Popen, PIPE
from grass.pygrass.errors import GrassError, ParameterError
from grass.pygrass.functions import docstring_property
from grass.pygrass.modules.interface.parameter import Parameter
from grass.pygrass.modules.interface.flag import Flag
from grass.pygrass.modules.interface.typedict import TypeDict
from grass.pygrass.modules.interface.read import GETFROMTAG, DOC
  19. class ParallelModuleQueue(object):
  20. """This class is designed to run an arbitrary number of pygrass Module
  21. processes in parallel.
  22. Objects of type grass.pygrass.modules.Module can be put into the
  23. queue using put() method. When the queue is full with the maximum
  24. number of parallel processes it will wait for all processes to finish,
  25. sets the stdout and stderr of the Module object and removes it
  26. from the queue when its finished.
  27. This class will raise a GrassError in case a Module process exits
  28. with a return code other than 0.
  29. Usage:
  30. >>> import copy
  31. >>> from grass.pygrass.modules import Module, ParallelModuleQueue
  32. >>> mapcalc_list = []
  33. >>> mapcalc = Module("r.mapcalc", overwrite=True, run_=False)
  34. >>> queue = ParallelModuleQueue(max_num_procs=3)
  35. >>> for i in xrange(5):
  36. ... new_mapcalc = copy.deepcopy(mapcalc)
  37. ... mapcalc_list.append(new_mapcalc)
  38. ... new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
  39. ... queue.put(new_mapcalc)
  40. >>> queue.wait()
  41. >>> for mapcalc in mapcalc_list:
  42. ... print(mapcalc.popen.returncode)
  43. 0
  44. 0
  45. 0
  46. 0
  47. 0
  48. """
  49. def __init__(self, max_num_procs=1):
  50. """Constructor
  51. :param max_num_procs: The maximum number of Module processes that
  52. can be run in parallel
  53. :type max_num_procs: int
  54. """
  55. self._num_procs = int(max_num_procs)
  56. self._list = int(max_num_procs) * [None]
  57. self._proc_count = 0
  58. def put(self, module):
  59. """Put the next Module object in the queue
  60. To run the Module objects in parallel the run_ and finish_ options
  61. of the Module must be set to False.
  62. :param module: a preconfigured Module object with run_ and finish_
  63. set to False
  64. :type module: Module object
  65. """
  66. self._list[self._proc_count] = module
  67. # Force that finish is False, otherwise the execution
  68. # will not be parallel
  69. self._list[self._proc_count].finish_ = False
  70. self._list[self._proc_count].run()
  71. self._proc_count += 1
  72. if self._proc_count == self._num_procs:
  73. self.wait()
  74. def get(self, num):
  75. """Get a Module object from the queue
  76. :param num: the number of the object in queue
  77. :type num: int
  78. :returns: the Module object or None if num is not in the queue
  79. """
  80. if num < self._num_procs:
  81. return self._list[num]
  82. return None
  83. def get_num_run_procs(self):
  84. """Get the number of Module processes that are in the queue running
  85. or finished
  86. :returns: the maximum number fo Module processes running/finished in
  87. the queue
  88. """
  89. return len(self._list)
  90. def get_max_num_procs(self):
  91. """Return the maximum number of parallel Module processes
  92. """
  93. return self._num_procs
  94. def set_max_num_procs(self, max_num_procs):
  95. """Set the maximum number of Module processes that should run
  96. in parallel
  97. :param max_num_procs: The maximum number of Module processes that
  98. can be run in parallel
  99. :type max_num_procs: int
  100. """
  101. self._num_procs = int(max_num_procs)
  102. self.wait()
  103. def wait(self):
  104. """Wait for all Module processes that are in the list to finish
  105. and set the modules stdout and stderr output options
  106. """
  107. for proc in self._list:
  108. if proc:
  109. stdout, stderr = proc.popen.communicate(input=proc.stdin)
  110. proc.outputs['stdout'].value = stdout if stdout else ''
  111. proc.outputs['stderr'].value = stderr if stderr else ''
  112. if proc.popen.returncode != 0:
  113. GrassError(("Error running module %s") % (proc.name))
  114. self._list = self._num_procs * [None]
  115. self._proc_count = 0
  116. class Module(object):
  117. """This class is design to wrap/run/interact with the GRASS modules.
  118. The class during the init phase read the XML description generate using
  119. the ``--interface-description`` in order to understand which parameters
  120. are required which optionals. ::
  121. >>> from grass.pygrass.modules import Module
  122. >>> from subprocess import PIPE
  123. >>> import copy
  124. >>> region = Module("g.region")
  125. >>> region.flags.p = True # set flags
  126. >>> region.flags.u = True
  127. >>> region.flags["3"].value = True # set numeric flags
  128. >>> region.get_bash()
  129. u'g.region -p -3 -u'
  130. >>> new_region = copy.deepcopy(region)
  131. >>> new_region.inputs.res = "10"
  132. >>> new_region.get_bash()
  133. u'g.region res=10 -p -3 -u'
  134. >>> neighbors = Module("r.neighbors")
  135. >>> neighbors.inputs.input = "mapA"
  136. >>> neighbors.outputs.output = "mapB"
  137. >>> neighbors.inputs.size = 5
  138. >>> neighbors.inputs.quantile = 0.5
  139. >>> neighbors.get_bash()
  140. u'r.neighbors input=mapA method=average size=5 quantile=0.5 output=mapB'
  141. >>> new_neighbors1 = copy.deepcopy(neighbors)
  142. >>> new_neighbors1.inputs.input = "mapD"
  143. >>> new_neighbors1.inputs.size = 3
  144. >>> new_neighbors1.inputs.quantile = 0.5
  145. >>> new_neighbors1.get_bash()
  146. u'r.neighbors input=mapD method=average size=3 quantile=0.5 output=mapB'
  147. >>> new_neighbors2 = copy.deepcopy(neighbors)
  148. >>> new_neighbors2(input="mapD", size=3, run_=False)
  149. >>> new_neighbors2.get_bash()
  150. u'r.neighbors input=mapD method=average size=3 quantile=0.5 output=mapB'
  151. >>> neighbors = Module("r.neighbors")
  152. >>> neighbors.get_bash()
  153. u'r.neighbors method=average size=3'
  154. >>> new_neighbors3 = copy.deepcopy(neighbors)
  155. >>> new_neighbors3(input="mapA", size=3, output="mapB", run_=False)
  156. >>> new_neighbors3.get_bash()
  157. u'r.neighbors input=mapA method=average size=3 output=mapB'
  158. >>> mapcalc = Module("r.mapcalc", expression="test_a = 1",
  159. ... overwrite=True, run_=False)
  160. >>> mapcalc.run()
  161. Module('r.mapcalc')
  162. >>> mapcalc.popen.returncode
  163. 0
  164. >>> colors = Module("r.colors", map="test_a", rules="-",
  165. ... run_=False, stdout_=PIPE,
  166. ... stderr_=PIPE, stdin_="1 red")
  167. >>> colors.run()
  168. Module('r.colors')
  169. >>> colors.popen.returncode
  170. 0
  171. >>> colors.inputs["stdin"].value
  172. u'1 red'
  173. >>> colors.outputs["stdout"].value
  174. u''
  175. >>> colors.outputs["stderr"].value.strip()
  176. "Color table for raster map <test_a> set to 'rules'"
  177. >>> colors = Module("r.colors", map="test_a", rules="-",
  178. ... run_=False, finish_=False, stdin_=PIPE)
  179. >>> colors.run()
  180. Module('r.colors')
  181. >>> stdout, stderr = colors.popen.communicate(input="1 red")
  182. >>> colors.popen.returncode
  183. 0
  184. >>> stdout
  185. >>> stderr
  186. >>> colors = Module("r.colors", map="test_a", rules="-",
  187. ... run_=False, finish_=False,
  188. ... stdin_=PIPE, stderr_=PIPE)
  189. >>> colors.run()
  190. Module('r.colors')
  191. >>> stdout, stderr = colors.popen.communicate(input="1 red")
  192. >>> colors.popen.returncode
  193. 0
  194. >>> stdout
  195. >>> stderr.strip()
  196. "Color table for raster map <test_a> set to 'rules'"
  197. Run a second time
  198. >>> colors.run()
  199. Module('r.colors')
  200. >>> stdout, stderr = colors.popen.communicate(input="1 blue")
  201. >>> colors.popen.returncode
  202. 0
  203. >>> stdout
  204. >>> stderr.strip()
  205. "Color table for raster map <test_a> set to 'rules'"
  206. Multiple run test
  207. >>> colors = Module("r.colors", map="test_a",
  208. ... color="ryb", run_=False)
  209. >>> colors.run()
  210. Module('r.colors')
  211. >>> colors(color="gyr")
  212. >>> colors.run()
  213. Module('r.colors')
  214. >>> colors(color="ryg")
  215. >>> colors(stderr_=PIPE)
  216. >>> colors.run()
  217. Module('r.colors')
  218. >>> print(colors.outputs["stderr"].value.strip())
  219. Color table for raster map <test_a> set to 'ryg'
  220. >>> colors(color="byg")
  221. >>> colors(stdout_=PIPE)
  222. >>> colors.run()
  223. Module('r.colors')
  224. >>> print(colors.outputs["stderr"].value.strip())
  225. Color table for raster map <test_a> set to 'byg'
  226. Often in the Module class you can find ``*args`` and ``kwargs`` annotation
  227. in methods, like in the __call__ method.
  228. Python allow developers to not specify all the arguments and
  229. keyword arguments of a method or function. ::
  230. def f(*args):
  231. for arg in args:
  232. print arg
  233. therefore if we call the function like: ::
  234. >>> f('grass', 'gis', 'modules') # doctest: +SKIP
  235. grass
  236. gis
  237. modules
  238. or we can define a new list: ::
  239. >>> words = ['grass', 'gis', 'modules'] # doctest: +SKIP
  240. >>> f(*words) # doctest: +SKIP
  241. grass
  242. gis
  243. modules
  244. we can do the same with keyword arguments, rewrite the above function: ::
  245. def f(*args, **kargs):
  246. for arg in args:
  247. print arg
  248. for key, value in kargs.items():
  249. print "%s = %r" % (key, value)
  250. now we can use the new function, with: ::
  251. >>> f('grass', 'gis', 'modules', os = 'linux', language = 'python')
  252. ... # doctest: +SKIP
  253. grass
  254. gis
  255. modules
  256. os = 'linux'
  257. language = 'python'
  258. or, as before we can, define a dictionary and give the dictionary to
  259. the function, like: ::
  260. >>> keywords = {'os' : 'linux', 'language' : 'python'}
  261. ... # doctest: +SKIP
  262. >>> f(*words, **keywords) # doctest: +SKIP
  263. grass
  264. gis
  265. modules
  266. os = 'linux'
  267. language = 'python'
  268. In the Module class we heavily use this language feature to pass arguments
  269. and keyword arguments to the grass module.
  270. """
  271. def __init__(self, cmd, *args, **kargs):
  272. if isinstance(cmd, unicode):
  273. self.name = str(cmd)
  274. elif isinstance(cmd, str):
  275. self.name = cmd
  276. else:
  277. raise GrassError("Problem initializing the module {s}".format(s=cmd))
  278. try:
  279. # call the command with --interface-description
  280. get_cmd_xml = Popen([cmd, "--interface-description"], stdout=PIPE)
  281. except OSError as e:
  282. print("OSError error({0}): {1}".format(e.errno, e.strerror))
  283. str_err = "Error running: `%s --interface-description`."
  284. raise GrassError(str_err % self.name)
  285. # get the xml of the module
  286. self.xml = get_cmd_xml.communicate()[0]
  287. # transform and parse the xml into an Element class:
  288. # http://docs.python.org/library/xml.etree.elementtree.html
  289. tree = fromstring(self.xml)
  290. for e in tree:
  291. if e.tag not in ('parameter', 'flag'):
  292. self.__setattr__(e.tag, GETFROMTAG[e.tag](e))
  293. #
  294. # extract parameters from the xml
  295. #
  296. self.params_list = [Parameter(p) for p in tree.findall("parameter")]
  297. self.inputs = TypeDict(Parameter)
  298. self.outputs = TypeDict(Parameter)
  299. self.required = []
  300. # Insert parameters into input/output and required
  301. for par in self.params_list:
  302. if par.input:
  303. self.inputs[par.name] = par
  304. else:
  305. self.outputs[par.name] = par
  306. if par.required:
  307. self.required.append(par.name)
  308. #
  309. # extract flags from the xml
  310. #
  311. flags_list = [Flag(f) for f in tree.findall("flag")]
  312. self.flags = TypeDict(Flag)
  313. for flag in flags_list:
  314. self.flags[flag.name] = flag
  315. #
  316. # Add new attributes to the class
  317. #
  318. self.run_ = True
  319. self.finish_ = True
  320. self.env_ = None
  321. self.stdin_ = None
  322. self.stdin = None
  323. self.stdout_ = None
  324. self.stderr_ = None
  325. diz = {'name': 'stdin', 'required': False,
  326. 'multiple': False, 'type': 'all',
  327. 'value': None}
  328. self.inputs['stdin'] = Parameter(diz=diz)
  329. diz['name'] = 'stdout'
  330. self.outputs['stdout'] = Parameter(diz=diz)
  331. diz['name'] = 'stderr'
  332. self.outputs['stderr'] = Parameter(diz=diz)
  333. self.popen = None
  334. self.time = None
  335. if args or kargs:
  336. self.__call__(*args, **kargs)
  337. self.__call__.__func__.__doc__ = self.__doc__
  338. def __call__(self, *args, **kargs):
  339. """Set module paramters to the class and, if run_ is True execute the
  340. module, therefore valid parameters are all the module parameters
  341. plus some extra parameters that are: run_, stdin_, stdout_, stderr_,
  342. env_ and finish_.
  343. """
  344. if not args and not kargs:
  345. self.run()
  346. return
  347. #
  348. # check for extra kargs, set attribute and remove from dictionary
  349. #
  350. if 'flags' in kargs:
  351. for flg in kargs['flags']:
  352. self.flags[flg].value = True
  353. del(kargs['flags'])
  354. # set attributs
  355. for key in ('run_', 'env_', 'finish_', 'stdout_', 'stderr_'):
  356. if key in kargs:
  357. setattr(self, key, kargs.pop(key))
  358. # set inputs
  359. for key in ('stdin_', ):
  360. if key in kargs:
  361. self.inputs[key[:-1]].value = kargs.pop(key)
  362. #
  363. # check args
  364. #
  365. for param, arg in zip(self.params_list, args):
  366. param.value = arg
  367. for key, val in kargs.items():
  368. if key in self.inputs:
  369. self.inputs[key].value = val
  370. elif key in self.outputs:
  371. self.outputs[key].value = val
  372. elif key in self.flags:
  373. # we need to add this, because some parameters (overwrite,
  374. # verbose and quiet) work like parameters
  375. self.flags[key].value = val
  376. else:
  377. raise ParameterError('%s is not a valid parameter.' % key)
  378. #
  379. # check if execute
  380. #
  381. if self.run_:
  382. #
  383. # check reqire parameters
  384. #
  385. for k in self.required:
  386. if ((k in self.inputs and self.inputs[k].value is None) or
  387. (k in self.outputs and self.outputs[k].value is None)):
  388. msg = "Required parameter <%s> not set."
  389. raise ParameterError(msg % k)
  390. return self.run()
  391. def get_bash(self):
  392. """Return a BASH rapresentation of the Module."""
  393. return ' '.join(self.make_cmd())
  394. def get_python(self):
  395. """Return a Python rapresentation of the Module."""
  396. prefix = self.name.split('.')[0]
  397. name = '_'.join(self.name.split('.')[1:])
  398. params = ', '.join([par.get_python() for par in self.params_list
  399. if par.get_python() != ''])
  400. flags = ''.join([flg.get_python()
  401. for flg in self.flags.values()
  402. if not flg.special and flg.get_python() != ''])
  403. special = ', '.join([flg.get_python()
  404. for flg in self.flags.values()
  405. if flg.special and flg.get_python() != ''])
  406. # pre name par flg special
  407. if flags and special:
  408. return "%s.%s(%s, flags=%r, %s)" % (prefix, name, params,
  409. flags, special)
  410. elif flags:
  411. return "%s.%s(%s, flags=%r)" % (prefix, name, params, flags)
  412. elif special:
  413. return "%s.%s(%s, %s)" % (prefix, name, params, special)
  414. else:
  415. return "%s.%s(%s)" % (prefix, name, params)
  416. def __str__(self):
  417. """Return the command string that can be executed in a shell"""
  418. return ' '.join(self.make_cmd())
  419. def __repr__(self):
  420. return "Module(%r)" % self.name
  421. @docstring_property(__doc__)
  422. def __doc__(self):
  423. """{cmd_name}({cmd_params})
  424. """
  425. head = DOC['head'].format(cmd_name=self.name,
  426. cmd_params=('\n' + # go to a new line
  427. # give space under the function name
  428. (' ' * (len(self.name) + 1))).join([', '.join(
  429. # transform each parameter in string
  430. [str(param) for param in line if param is not None])
  431. # make a list of parameters with only 3 param per line
  432. for line in zip_longest(*[iter(self.params_list)] * 3)]),)
  433. params = '\n'.join([par.__doc__ for par in self.params_list])
  434. flags = self.flags.__doc__
  435. return '\n'.join([head, params, DOC['flag_head'], flags, DOC['foot']])
  436. def get_dict(self):
  437. """Return a dictionary that includes the name, all valid
  438. inputs, outputs and flags
  439. """
  440. dic = {}
  441. dic['name'] = self.name
  442. dic['inputs'] = [(k, v.value) for k, v in self.inputs.items()
  443. if v.value]
  444. dic['outputs'] = [(k, v.value) for k, v in self.outputs.items()
  445. if v.value]
  446. dic['flags'] = [flg for flg in self.flags if self.flags[flg].value]
  447. return dic
  448. def make_cmd(self):
  449. """Create the command string that can be executed in a shell
  450. :returns: the command string
  451. """
  452. skip = ['stdin', 'stdout', 'stderr']
  453. args = [self.name, ]
  454. for key in self.inputs:
  455. if key not in skip and self.inputs[key].value:
  456. args.append(self.inputs[key].get_bash())
  457. for key in self.outputs:
  458. if key not in skip and self.outputs[key].value:
  459. args.append(self.outputs[key].get_bash())
  460. for flg in self.flags:
  461. if self.flags[flg].value:
  462. args.append(str(self.flags[flg]))
  463. return args
  464. def run(self, node=None):
  465. """Run the module
  466. :param node:
  467. :type node:
  468. This function will wait for the process to terminate in case
  469. finish_==True and sets up stdout and stderr. If finish_==False this
  470. function will return after starting the process. Use
  471. self.popen.communicate() of self.popen.wait() to wait for the process
  472. termination. The handling of stdout and stderr must then be done
  473. outside of this function.
  474. """
  475. if self.inputs['stdin'].value:
  476. self.stdin = self.inputs['stdin'].value
  477. self.stdin_ = PIPE
  478. cmd = self.make_cmd()
  479. start = time.time()
  480. self.popen = Popen(cmd,
  481. stdin=self.stdin_,
  482. stdout=self.stdout_,
  483. stderr=self.stderr_,
  484. env=self.env_)
  485. if self.finish_:
  486. stdout, stderr = self.popen.communicate(input=self.stdin)
  487. self.outputs['stdout'].value = stdout if stdout else ''
  488. self.outputs['stderr'].value = stderr if stderr else ''
  489. self.time = time.time() - start
  490. if self.popen.poll():
  491. raise CalledModuleError(returncode=self.popen.returncode,
  492. code=self.get_bash(),
  493. module=self.name, errors=stderr)
  494. return self
  495. ###############################################################################
  496. if __name__ == "__main__":
  497. import doctest
  498. doctest.testmod()