module.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601
  1. # -*- coding: utf-8 -*-
  2. from __future__ import (nested_scopes, generators, division, absolute_import,
  3. with_statement, print_function, unicode_literals)
  4. import sys
  5. from multiprocessing import cpu_count
  6. if sys.version_info[0] == 2:
  7. from itertools import izip_longest as zip_longest
  8. else:
  9. from itertools import zip_longest
  10. from xml.etree.ElementTree import fromstring
  11. import time
  12. from grass.exceptions import CalledModuleError
  13. from grass.script.core import Popen, PIPE
  14. from grass.pygrass.errors import GrassError, ParameterError
  15. from grass.pygrass.functions import docstring_property
  16. from grass.pygrass.modules.interface.parameter import Parameter
  17. from grass.pygrass.modules.interface.flag import Flag
  18. from grass.pygrass.modules.interface.typedict import TypeDict
  19. from grass.pygrass.modules.interface.read import GETFROMTAG, DOC
  20. from grass.pygrass.messages import Messenger
  21. class ParallelModuleQueue(object):
  22. """This class is designed to run an arbitrary number of pygrass Module
  23. processes in parallel.
  24. Objects of type grass.pygrass.modules.Module can be put into the
  25. queue using put() method. When the queue is full with the maximum
  26. number of parallel processes it will wait for all processes to finish,
  27. sets the stdout and stderr of the Module object and removes it
  28. from the queue when its finished.
  29. This class will raise a GrassError in case a Module process exits
  30. with a return code other than 0.
  31. Usage:
  32. >>> import copy
  33. >>> from grass.pygrass.modules import Module, ParallelModuleQueue
  34. >>> mapcalc_list = []
  35. >>> mapcalc = Module("r.mapcalc", overwrite=True, run_=False)
  36. >>> queue = ParallelModuleQueue(nprocs=3)
  37. >>> for i in xrange(5):
  38. ... new_mapcalc = copy.deepcopy(mapcalc)
  39. ... mapcalc_list.append(new_mapcalc)
  40. ... new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
  41. ... queue.put(new_mapcalc)
  42. Module('r.mapcalc')
  43. Module('r.mapcalc')
  44. Module('r.mapcalc')
  45. Module('r.mapcalc')
  46. Module('r.mapcalc')
  47. >>> queue.wait()
  48. >>> for mapcalc in mapcalc_list:
  49. ... print(mapcalc.popen.returncode)
  50. 0
  51. 0
  52. 0
  53. 0
  54. 0
  55. """
  56. def __init__(self, nprocs=1):
  57. """Constructor
  58. :param nprocs: The maximum number of Module processes that
  59. can be run in parallel, defualt is 1, if None
  60. then use all the available CPUs.
  61. :type nprocs: int
  62. """
  63. nprocs = int(nprocs) if nprocs else cpu_count()
  64. self._num_procs = nprocs
  65. self._list = nprocs * [None]
  66. self._proc_count = 0
  67. def put(self, module):
  68. """Put the next Module object in the queue
  69. To run the Module objects in parallel the run\_ and finish\_ options
  70. of the Module must be set to False.
  71. :param module: a preconfigured Module object with run\_ and finish\_
  72. set to False
  73. :type module: Module object
  74. """
  75. self._list[self._proc_count] = module
  76. # Force that finish is False, otherwise the execution
  77. # will not be parallel
  78. self._list[self._proc_count].finish_ = False
  79. self._list[self._proc_count].run()
  80. self._proc_count += 1
  81. if self._proc_count == self._num_procs:
  82. self.wait()
  83. def get(self, num):
  84. """Get a Module object from the queue
  85. :param num: the number of the object in queue
  86. :type num: int
  87. :returns: the Module object or None if num is not in the queue
  88. """
  89. if num < self._num_procs:
  90. return self._list[num]
  91. return None
  92. def get_num_run_procs(self):
  93. """Get the number of Module processes that are in the queue running
  94. or finished
  95. :returns: the maximum number fo Module processes running/finished in
  96. the queue
  97. """
  98. return len(self._list)
  99. def get_max_num_procs(self):
  100. """Return the maximum number of parallel Module processes
  101. """
  102. return self._num_procs
  103. def set_max_num_procs(self, nprocs):
  104. """Set the maximum number of Module processes that should run
  105. in parallel
  106. :param nprocs: The maximum number of Module processes that can be
  107. run in parallel
  108. :type nprocs: int
  109. """
  110. self._num_procs = int(nprocs)
  111. self.wait()
  112. def wait(self):
  113. """Wait for all Module processes that are in the list to finish
  114. and set the modules stdout and stderr output options
  115. """
  116. for proc in self._list:
  117. if proc:
  118. stdout, stderr = proc.popen.communicate(input=proc.stdin)
  119. proc.outputs['stdout'].value = stdout if stdout else ''
  120. proc.outputs['stderr'].value = stderr if stderr else ''
  121. if proc.popen.returncode != 0:
  122. GrassError(("Error running module %s") % (proc.name))
  123. self._list = self._num_procs * [None]
  124. self._proc_count = 0
  125. class Module(object):
  126. """This class is design to wrap/run/interact with the GRASS modules.
  127. The class during the init phase read the XML description generate using
  128. the ``--interface-description`` in order to understand which parameters
  129. are required which optionals. ::
  130. >>> from grass.pygrass.modules import Module
  131. >>> from subprocess import PIPE
  132. >>> import copy
  133. >>> region = Module("g.region")
  134. >>> region.flags.p = True # set flags
  135. >>> region.flags.u = True
  136. >>> region.flags["3"].value = True # set numeric flags
  137. >>> region.get_bash()
  138. u'g.region -p -3 -u'
  139. >>> new_region = copy.deepcopy(region)
  140. >>> new_region.inputs.res = "10"
  141. >>> new_region.get_bash()
  142. u'g.region res=10 -p -3 -u'
  143. >>> neighbors = Module("r.neighbors")
  144. >>> neighbors.inputs.input = "mapA"
  145. >>> neighbors.outputs.output = "mapB"
  146. >>> neighbors.inputs.size = 5
  147. >>> neighbors.inputs.quantile = 0.5
  148. >>> neighbors.get_bash()
  149. u'r.neighbors input=mapA method=average size=5 quantile=0.5 output=mapB'
  150. >>> new_neighbors1 = copy.deepcopy(neighbors)
  151. >>> new_neighbors1.inputs.input = "mapD"
  152. >>> new_neighbors1.inputs.size = 3
  153. >>> new_neighbors1.inputs.quantile = 0.5
  154. >>> new_neighbors1.get_bash()
  155. u'r.neighbors input=mapD method=average size=3 quantile=0.5 output=mapB'
  156. >>> new_neighbors2 = copy.deepcopy(neighbors)
  157. >>> new_neighbors2(input="mapD", size=3, run_=False)
  158. Module('r.neighbors')
  159. >>> new_neighbors2.get_bash()
  160. u'r.neighbors input=mapD method=average size=3 quantile=0.5 output=mapB'
  161. >>> neighbors = Module("r.neighbors")
  162. >>> neighbors.get_bash()
  163. u'r.neighbors method=average size=3'
  164. >>> new_neighbors3 = copy.deepcopy(neighbors)
  165. >>> new_neighbors3(input="mapA", size=3, output="mapB", run_=False)
  166. Module('r.neighbors')
  167. >>> new_neighbors3.get_bash()
  168. u'r.neighbors input=mapA method=average size=3 output=mapB'
  169. >>> mapcalc = Module("r.mapcalc", expression="test_a = 1",
  170. ... overwrite=True, run_=False)
  171. >>> mapcalc.run()
  172. Module('r.mapcalc')
  173. >>> mapcalc.popen.returncode
  174. 0
  175. >>> colors = Module("r.colors", map="test_a", rules="-",
  176. ... run_=False, stdout_=PIPE,
  177. ... stderr_=PIPE, stdin_="1 red")
  178. >>> colors.run()
  179. Module('r.colors')
  180. >>> colors.popen.returncode
  181. 0
  182. >>> colors.inputs["stdin"].value
  183. u'1 red'
  184. >>> colors.outputs["stdout"].value
  185. u''
  186. >>> colors.outputs["stderr"].value.strip()
  187. "Color table for raster map <test_a> set to 'rules'"
  188. >>> colors = Module("r.colors", map="test_a", rules="-",
  189. ... run_=False, finish_=False, stdin_=PIPE)
  190. >>> colors.run()
  191. Module('r.colors')
  192. >>> stdout, stderr = colors.popen.communicate(input="1 red")
  193. >>> colors.popen.returncode
  194. 0
  195. >>> stdout
  196. >>> stderr
  197. >>> colors = Module("r.colors", map="test_a", rules="-",
  198. ... run_=False, finish_=False,
  199. ... stdin_=PIPE, stderr_=PIPE)
  200. >>> colors.run()
  201. Module('r.colors')
  202. >>> stdout, stderr = colors.popen.communicate(input="1 red")
  203. >>> colors.popen.returncode
  204. 0
  205. >>> stdout
  206. >>> stderr.strip()
  207. "Color table for raster map <test_a> set to 'rules'"
  208. Run a second time
  209. >>> colors.run()
  210. Module('r.colors')
  211. >>> stdout, stderr = colors.popen.communicate(input="1 blue")
  212. >>> colors.popen.returncode
  213. 0
  214. >>> stdout
  215. >>> stderr.strip()
  216. "Color table for raster map <test_a> set to 'rules'"
  217. Multiple run test
  218. >>> colors = Module("r.colors", map="test_a",
  219. ... color="ryb", run_=False)
  220. >>> colors.run()
  221. Module('r.colors')
  222. >>> colors(color="gyr")
  223. Module('r.colors')
  224. >>> colors.run()
  225. Module('r.colors')
  226. >>> colors(color="ryg")
  227. Module('r.colors')
  228. >>> colors(stderr_=PIPE)
  229. Module('r.colors')
  230. >>> colors.run()
  231. Module('r.colors')
  232. >>> print(colors.outputs["stderr"].value.strip())
  233. Color table for raster map <test_a> set to 'ryg'
  234. >>> colors(color="byg")
  235. Module('r.colors')
  236. >>> colors(stdout_=PIPE)
  237. Module('r.colors')
  238. >>> colors.run()
  239. Module('r.colors')
  240. >>> print(colors.outputs["stderr"].value.strip())
  241. Color table for raster map <test_a> set to 'byg'
  242. Often in the Module class you can find ``*args`` and ``kwargs`` annotation
  243. in methods, like in the __call__ method.
  244. Python allow developers to not specify all the arguments and
  245. keyword arguments of a method or function. ::
  246. def f(*args):
  247. for arg in args:
  248. print arg
  249. therefore if we call the function like:
  250. >>> f('grass', 'gis', 'modules') # doctest: +SKIP
  251. grass
  252. gis
  253. modules
  254. or we can define a new list:
  255. >>> words = ['grass', 'gis', 'modules'] # doctest: +SKIP
  256. >>> f(*words) # doctest: +SKIP
  257. grass
  258. gis
  259. modules
  260. we can do the same with keyword arguments, rewrite the above function: ::
  261. def f(*args, **kargs):
  262. for arg in args:
  263. print arg
  264. for key, value in kargs.items():
  265. print "%s = %r" % (key, value)
  266. now we can use the new function, with:
  267. >>> f('grass', 'gis', 'modules', os = 'linux', language = 'python')
  268. ... # doctest: +SKIP
  269. grass
  270. gis
  271. modules
  272. os = 'linux'
  273. language = 'python'
  274. or, as before we can, define a dictionary and give the dictionary to
  275. the function, like:
  276. >>> keywords = {'os' : 'linux', 'language' : 'python'} # doctest: +SKIP
  277. >>> f(*words, **keywords) # doctest: +SKIP
  278. grass
  279. gis
  280. modules
  281. os = 'linux'
  282. language = 'python'
  283. In the Module class we heavily use this language feature to pass arguments
  284. and keyword arguments to the grass module.
  285. """
  286. def __init__(self, cmd, *args, **kargs):
  287. self._msgr = Messenger()
  288. if isinstance(cmd, unicode):
  289. self.name = str(cmd)
  290. elif isinstance(cmd, str):
  291. self.name = cmd
  292. else:
  293. raise GrassError("Problem initializing the module {s}".format(s=cmd))
  294. try:
  295. # call the command with --interface-description
  296. get_cmd_xml = Popen([cmd, "--interface-description"], stdout=PIPE)
  297. except OSError as e:
  298. print("OSError error({0}): {1}".format(e.errno, e.strerror))
  299. str_err = "Error running: `%s --interface-description`."
  300. raise GrassError(str_err % self.name)
  301. # get the xml of the module
  302. self.xml = get_cmd_xml.communicate()[0]
  303. # transform and parse the xml into an Element class:
  304. # http://docs.python.org/library/xml.etree.elementtree.html
  305. tree = fromstring(self.xml)
  306. for e in tree:
  307. if e.tag not in ('parameter', 'flag'):
  308. self.__setattr__(e.tag, GETFROMTAG[e.tag](e))
  309. #
  310. # extract parameters from the xml
  311. #
  312. self.params_list = [Parameter(p) for p in tree.findall("parameter")]
  313. self.inputs = TypeDict(Parameter)
  314. self.outputs = TypeDict(Parameter)
  315. self.required = []
  316. # Insert parameters into input/output and required
  317. for par in self.params_list:
  318. if par.input:
  319. self.inputs[par.name] = par
  320. else:
  321. self.outputs[par.name] = par
  322. if par.required:
  323. self.required.append(par.name)
  324. #
  325. # extract flags from the xml
  326. #
  327. flags_list = [Flag(f) for f in tree.findall("flag")]
  328. self.flags = TypeDict(Flag)
  329. for flag in flags_list:
  330. self.flags[flag.name] = flag
  331. #
  332. # Add new attributes to the class
  333. #
  334. self.run_ = True
  335. self.finish_ = True
  336. self.env_ = None
  337. self.stdin_ = None
  338. self.stdin = None
  339. self.stdout_ = None
  340. self.stderr_ = None
  341. diz = {'name': 'stdin', 'required': False,
  342. 'multiple': False, 'type': 'all',
  343. 'value': None}
  344. self.inputs['stdin'] = Parameter(diz=diz)
  345. diz['name'] = 'stdout'
  346. self.outputs['stdout'] = Parameter(diz=diz)
  347. diz['name'] = 'stderr'
  348. self.outputs['stderr'] = Parameter(diz=diz)
  349. self.popen = None
  350. self.time = None
  351. if args or kargs:
  352. self.__call__(*args, **kargs)
  353. self.__call__.__func__.__doc__ = self.__doc__
  354. def __call__(self, *args, **kargs):
  355. """Set module paramters to the class and, if run_ is True execute the
  356. module, therefore valid parameters are all the module parameters
  357. plus some extra parameters that are: run_, stdin_, stdout_, stderr_,
  358. env_ and finish_.
  359. """
  360. if not args and not kargs:
  361. self.run()
  362. return self
  363. #
  364. # check for extra kargs, set attribute and remove from dictionary
  365. #
  366. if 'flags' in kargs:
  367. for flg in kargs['flags']:
  368. self.flags[flg].value = True
  369. del(kargs['flags'])
  370. # set attributs
  371. for key in ('run_', 'env_', 'finish_', 'stdout_', 'stderr_'):
  372. if key in kargs:
  373. setattr(self, key, kargs.pop(key))
  374. # set inputs
  375. for key in ('stdin_', ):
  376. if key in kargs:
  377. self.inputs[key[:-1]].value = kargs.pop(key)
  378. #
  379. # check args
  380. #
  381. for param, arg in zip(self.params_list, args):
  382. param.value = arg
  383. for key, val in kargs.items():
  384. if key in self.inputs:
  385. self.inputs[key].value = val
  386. elif key in self.outputs:
  387. self.outputs[key].value = val
  388. elif key in self.flags:
  389. # we need to add this, because some parameters (overwrite,
  390. # verbose and quiet) work like parameters
  391. self.flags[key].value = val
  392. else:
  393. raise ParameterError('%s is not a valid parameter.' % key)
  394. #
  395. # print debug message
  396. #
  397. self._msgr.debug(0, "Module.__call__(): %s" % (self.get_bash()))
  398. #
  399. # check if execute
  400. #
  401. if self.run_:
  402. #
  403. # check reqire parameters
  404. #
  405. for k in self.required:
  406. if ((k in self.inputs and self.inputs[k].value is None) or
  407. (k in self.outputs and self.outputs[k].value is None)):
  408. msg = "Required parameter <%s> not set."
  409. raise ParameterError(msg % k)
  410. return self.run()
  411. return self
  412. def get_bash(self):
  413. """Return a BASH rapresentation of the Module."""
  414. return ' '.join(self.make_cmd())
  415. def get_python(self):
  416. """Return a Python rapresentation of the Module."""
  417. prefix = self.name.split('.')[0]
  418. name = '_'.join(self.name.split('.')[1:])
  419. params = ', '.join([par.get_python() for par in self.params_list
  420. if par.get_python() != ''])
  421. flags = ''.join([flg.get_python()
  422. for flg in self.flags.values()
  423. if not flg.special and flg.get_python() != ''])
  424. special = ', '.join([flg.get_python()
  425. for flg in self.flags.values()
  426. if flg.special and flg.get_python() != ''])
  427. # pre name par flg special
  428. if flags and special:
  429. return "%s.%s(%s, flags=%r, %s)" % (prefix, name, params,
  430. flags, special)
  431. elif flags:
  432. return "%s.%s(%s, flags=%r)" % (prefix, name, params, flags)
  433. elif special:
  434. return "%s.%s(%s, %s)" % (prefix, name, params, special)
  435. else:
  436. return "%s.%s(%s)" % (prefix, name, params)
  437. def __str__(self):
  438. """Return the command string that can be executed in a shell"""
  439. return ' '.join(self.make_cmd())
  440. def __repr__(self):
  441. return "Module(%r)" % self.name
  442. @docstring_property(__doc__)
  443. def __doc__(self):
  444. """{cmd_name}({cmd_params})
  445. """
  446. head = DOC['head'].format(cmd_name=self.name,
  447. cmd_params=('\n' + # go to a new line
  448. # give space under the function name
  449. (' ' * (len(self.name) + 1))).join([', '.join(
  450. # transform each parameter in string
  451. [str(param) for param in line if param is not None])
  452. # make a list of parameters with only 3 param per line
  453. for line in zip_longest(*[iter(self.params_list)] * 3)]),)
  454. params = '\n'.join([par.__doc__ for par in self.params_list])
  455. flags = self.flags.__doc__
  456. return '\n'.join([head, params, DOC['flag_head'], flags, DOC['foot']])
  457. def get_dict(self):
  458. """Return a dictionary that includes the name, all valid
  459. inputs, outputs and flags
  460. """
  461. dic = {}
  462. dic['name'] = self.name
  463. dic['inputs'] = [(k, v.value) for k, v in self.inputs.items()
  464. if v.value]
  465. dic['outputs'] = [(k, v.value) for k, v in self.outputs.items()
  466. if v.value]
  467. dic['flags'] = [flg for flg in self.flags if self.flags[flg].value]
  468. return dic
  469. def make_cmd(self):
  470. """Create the command string that can be executed in a shell
  471. :returns: the command string
  472. """
  473. skip = ['stdin', 'stdout', 'stderr']
  474. args = [self.name, ]
  475. for key in self.inputs:
  476. if key not in skip and self.inputs[key].value:
  477. args.append(self.inputs[key].get_bash())
  478. for key in self.outputs:
  479. if key not in skip and self.outputs[key].value:
  480. args.append(self.outputs[key].get_bash())
  481. for flg in self.flags:
  482. if self.flags[flg].value:
  483. args.append(str(self.flags[flg]))
  484. return args
  485. def run(self, node=None):
  486. """Run the module
  487. :param node:
  488. :type node:
  489. This function will wait for the process to terminate in case
  490. finish_==True and sets up stdout and stderr. If finish_==False this
  491. function will return after starting the process. Use
  492. self.popen.communicate() of self.popen.wait() to wait for the process
  493. termination. The handling of stdout and stderr must then be done
  494. outside of this function.
  495. """
  496. if self.inputs['stdin'].value:
  497. self.stdin = self.inputs['stdin'].value
  498. self.stdin_ = PIPE
  499. cmd = self.make_cmd()
  500. start = time.time()
  501. self.popen = Popen(cmd,
  502. stdin=self.stdin_,
  503. stdout=self.stdout_,
  504. stderr=self.stderr_,
  505. env=self.env_)
  506. if self.finish_:
  507. stdout, stderr = self.popen.communicate(input=self.stdin)
  508. self.outputs['stdout'].value = stdout if stdout else ''
  509. self.outputs['stderr'].value = stderr if stderr else ''
  510. self.time = time.time() - start
  511. if self.popen.poll():
  512. raise CalledModuleError(returncode=self.popen.returncode,
  513. code=self.get_bash(),
  514. module=self.name, errors=stderr)
  515. return self
  516. ###############################################################################
  517. if __name__ == "__main__":
  518. import doctest
  519. doctest.testmod()