module.py 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882
  1. # -*- coding: utf-8 -*-
  2. from __future__ import (nested_scopes, generators, division, absolute_import,
  3. with_statement, print_function, unicode_literals)
  4. import sys
  5. from multiprocessing import cpu_count
  6. import threading
  7. import time
  8. from xml.etree.ElementTree import fromstring
  9. from grass.exceptions import CalledModuleError, GrassError, ParameterError
  10. from grass.script.core import Popen, PIPE
  11. from .docstring import docstring_property
  12. from .parameter import Parameter
  13. from .flag import Flag
  14. from .typedict import TypeDict
  15. from .read import GETFROMTAG, DOC
  16. from .env import G_debug
  17. if sys.version_info[0] == 2:
  18. from itertools import izip_longest as zip_longest
  19. else:
  20. from itertools import zip_longest
  21. unicode = str
  def _get_bash(self, *args, **kargs):
      """Return the shell representation of ``self``.

      Any extra positional or keyword arguments are accepted and ignored.

      :param self: an object providing a ``get_bash()`` method
      :returns: whatever ``self.get_bash()`` returns
      """
      bash_repr = self.get_bash()
      return bash_repr
  24. class ParallelModuleQueue(object):
  25. """This class is designed to run an arbitrary number of pygrass Module
  26. processes in parallel.
  27. Objects of type grass.pygrass.modules.Module can be put into the
  28. queue using the put() method. When the queue is full with the maximum
  29. number of parallel processes, it waits for all processes to finish,
  30. sets the stdout and stderr of each Module object and removes it
  31. from the queue when it is finished.
  32. To finish the queue before the maximum number of parallel
  33. processes was reached call wait() .
  34. This class will raise a GrassError in case a Module process exits
  35. with a return code other than 0.
  36. Usage:
  37. Check with a queue size of 3 and 5 processes
  38. >>> import copy
  39. >>> from grass.pygrass.modules import Module, ParallelModuleQueue
  40. >>> mapcalc_list = []
  41. Setting run_ to False is important, otherwise a parallel processing is not possible
  42. >>> mapcalc = Module("r.mapcalc", overwrite=True, run_=False)
  43. >>> queue = ParallelModuleQueue(nprocs=3)
  44. >>> for i in xrange(5):
  45. ... new_mapcalc = copy.deepcopy(mapcalc)
  46. ... mapcalc_list.append(new_mapcalc)
  47. ... m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
  48. ... queue.put(m)
  49. >>> queue.wait()
  50. >>> queue.get_num_run_procs()
  51. 0
  52. >>> queue.get_max_num_procs()
  53. 3
  54. >>> for mapcalc in mapcalc_list:
  55. ... print(mapcalc.popen.returncode)
  56. 0
  57. 0
  58. 0
  59. 0
  60. 0
  61. Check with a queue size of 8 and 5 processes
  62. >>> queue = ParallelModuleQueue(nprocs=8)
  63. >>> mapcalc_list = []
  64. >>> for i in xrange(5):
  65. ... new_mapcalc = copy.deepcopy(mapcalc)
  66. ... mapcalc_list.append(new_mapcalc)
  67. ... m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
  68. ... queue.put(m)
  69. >>> queue.wait()
  70. >>> queue.get_num_run_procs()
  71. 0
  72. >>> queue.get_max_num_procs()
  73. 8
  74. >>> for mapcalc in mapcalc_list:
  75. ... print(mapcalc.popen.returncode)
  76. 0
  77. 0
  78. 0
  79. 0
  80. 0
  81. Check MultiModule approach with three by two processes
  82. >>> gregion = Module("g.region", flags="p", run_=False)
  83. >>> queue = ParallelModuleQueue(nprocs=3)
  84. >>> proc_list = []
  85. >>> for i in xrange(3):
  86. ... new_gregion = copy.deepcopy(gregion)
  87. ... proc_list.append(new_gregion)
  88. ... new_mapcalc = copy.deepcopy(mapcalc)
  89. ... m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
  90. ... proc_list.append(new_mapcalc)
  91. ... mm = MultiModule(module_list=[new_gregion, new_mapcalc], finish=False)
  92. ... queue.put(mm)
  93. >>> queue.wait()
  94. >>> queue.get_num_run_procs()
  95. 0
  96. >>> queue.get_max_num_procs()
  97. 3
  98. >>> for proc in proc_list:
  99. ... print(proc.popen.returncode)
  100. 0
  101. 0
  102. 0
  103. 0
  104. 0
  105. 0
  106. Check with a queue size of 8 and 4 processes
  107. >>> queue = ParallelModuleQueue(nprocs=8)
  108. >>> mapcalc_list = []
  109. >>> new_mapcalc = copy.deepcopy(mapcalc)
  110. >>> mapcalc_list.append(new_mapcalc)
  111. >>> m = new_mapcalc(expression="test_pygrass_1 =1")
  112. >>> queue.put(m)
  113. >>> queue.get_num_run_procs()
  114. 1
  115. >>> new_mapcalc = copy.deepcopy(mapcalc)
  116. >>> mapcalc_list.append(new_mapcalc)
  117. >>> m = new_mapcalc(expression="test_pygrass_2 =2")
  118. >>> queue.put(m)
  119. >>> queue.get_num_run_procs()
  120. 2
  121. >>> new_mapcalc = copy.deepcopy(mapcalc)
  122. >>> mapcalc_list.append(new_mapcalc)
  123. >>> m = new_mapcalc(expression="test_pygrass_3 =3")
  124. >>> queue.put(m)
  125. >>> queue.get_num_run_procs()
  126. 3
  127. >>> new_mapcalc = copy.deepcopy(mapcalc)
  128. >>> mapcalc_list.append(new_mapcalc)
  129. >>> m = new_mapcalc(expression="test_pygrass_4 =4")
  130. >>> queue.put(m)
  131. >>> queue.get_num_run_procs()
  132. 4
  133. >>> queue.wait()
  134. >>> queue.get_num_run_procs()
  135. 0
  136. >>> queue.get_max_num_procs()
  137. 8
  138. >>> for mapcalc in mapcalc_list:
  139. ... print(mapcalc.popen.returncode)
  140. 0
  141. 0
  142. 0
  143. 0
  144. Check with a queue size of 3 and 4 processes
  145. >>> queue = ParallelModuleQueue(nprocs=3)
  146. >>> mapcalc_list = []
  147. >>> new_mapcalc = copy.deepcopy(mapcalc)
  148. >>> mapcalc_list.append(new_mapcalc)
  149. >>> m = new_mapcalc(expression="test_pygrass_1 =1")
  150. >>> queue.put(m)
  151. >>> queue.get_num_run_procs()
  152. 1
  153. >>> new_mapcalc = copy.deepcopy(mapcalc)
  154. >>> mapcalc_list.append(new_mapcalc)
  155. >>> m = new_mapcalc(expression="test_pygrass_2 =2")
  156. >>> queue.put(m)
  157. >>> queue.get_num_run_procs()
  158. 2
  159. >>> new_mapcalc = copy.deepcopy(mapcalc)
  160. >>> mapcalc_list.append(new_mapcalc)
  161. >>> m = new_mapcalc(expression="test_pygrass_3 =3")
  162. >>> queue.put(m) # Now it will wait until all procs finish and set the counter back to 0
  163. >>> queue.get_num_run_procs()
  164. 0
  165. >>> new_mapcalc = copy.deepcopy(mapcalc)
  166. >>> mapcalc_list.append(new_mapcalc)
  167. >>> m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
  168. >>> queue.put(m)
  169. >>> queue.get_num_run_procs()
  170. 1
  171. >>> queue.wait()
  172. >>> queue.get_num_run_procs()
  173. 0
  174. >>> queue.get_max_num_procs()
  175. 3
  176. >>> for mapcalc in mapcalc_list:
  177. ... print(mapcalc.popen.returncode)
  178. 0
  179. 0
  180. 0
  181. 0
  182. """
  183. def __init__(self, nprocs=1):
  184. """Constructor
  185. :param nprocs: The maximum number of Module processes that
  186. can be run in parallel, defualt is 1, if None
  187. then use all the available CPUs.
  188. :type nprocs: int
  189. """
  190. nprocs = int(nprocs) if nprocs else cpu_count()
  191. self._num_procs = nprocs
  192. self._list = nprocs * [None]
  193. self._proc_count = 0
  194. def put(self, module):
  195. """Put the next Module object in the queue
  196. To run the Module objects in parallel the run\_ and finish\_ options
  197. of the Module must be set to False.
  198. :param module: a preconfigured Module object or a list of Module objects
  199. with run\_ and finish\_ set to False,
  200. :type module: Module object or list of Module objects
  201. """
  202. self._list[self._proc_count] = module
  203. # Force that finish is False, otherwise the execution
  204. # will not be parallel
  205. self._list[self._proc_count].finish_ = False
  206. self._list[self._proc_count].run()
  207. self._proc_count += 1
  208. if self._proc_count == self._num_procs:
  209. self.wait()
  210. def get(self, num):
  211. """Get a Module object or list of Module objects from the queue
  212. :param num: the number of the object in queue
  213. :type num: int
  214. :returns: the Module object or list of Module objects or None if num is not in the queue
  215. """
  216. if num < self._num_procs:
  217. return self._list[num]
  218. return None
  219. def get_num_run_procs(self):
  220. """Get the number of Module processes that are in the queue running
  221. or finished
  222. :returns: the number fo Module processes running/finished in the queue
  223. """
  224. return self._proc_count
  225. def get_max_num_procs(self):
  226. """Return the maximum number of parallel Module processes
  227. :returns: the maximum number of parallel Module processes
  228. """
  229. return self._num_procs
  230. def set_max_num_procs(self, nprocs):
  231. """Set the maximum number of Module processes that should run
  232. in parallel
  233. :param nprocs: The maximum number of Module processes that can be
  234. run in parallel
  235. :type nprocs: int
  236. """
  237. self._num_procs = int(nprocs)
  238. self.wait()
  239. def wait(self):
  240. """Wait for all Module processes that are in the list to finish
  241. and set the modules stdout and stderr output options
  242. """
  243. for proc in self._list:
  244. if proc:
  245. proc.wait()
  246. self._list = self._num_procs * [None]
  247. self._proc_count = 0
  248. class Module(object):
  249. """This class is designed to wrap/run/interact with the GRASS modules.
  250. During the init phase the class reads the XML description generated using
  251. the ``--interface-description`` flag in order to understand which parameters
  252. are required and which are optional. ::
  253. >>> from grass.pygrass.modules import Module
  254. >>> from subprocess import PIPE
  255. >>> import copy
  256. >>> region = Module("g.region")
  257. >>> region.flags.p = True # set flags
  258. >>> region.flags.u = True
  259. >>> region.flags["3"].value = True # set numeric flags
  260. >>> region.get_bash()
  261. u'g.region -p -3 -u'
  262. >>> new_region = copy.deepcopy(region)
  263. >>> new_region.inputs.res = "10"
  264. >>> new_region.get_bash()
  265. u'g.region res=10 -p -3 -u'
  266. >>> neighbors = Module("r.neighbors")
  267. >>> neighbors.inputs.input = "mapA"
  268. >>> neighbors.outputs.output = "mapB"
  269. >>> neighbors.inputs.size = 5
  270. >>> neighbors.inputs.quantile = 0.5
  271. >>> neighbors.get_bash()
  272. u'r.neighbors input=mapA method=average size=5 quantile=0.5 output=mapB'
  273. >>> new_neighbors1 = copy.deepcopy(neighbors)
  274. >>> new_neighbors1.inputs.input = "mapD"
  275. >>> new_neighbors1.inputs.size = 3
  276. >>> new_neighbors1.inputs.quantile = 0.5
  277. >>> new_neighbors1.get_bash()
  278. u'r.neighbors input=mapD method=average size=3 quantile=0.5 output=mapB'
  279. >>> new_neighbors2 = copy.deepcopy(neighbors)
  280. >>> new_neighbors2(input="mapD", size=3, run_=False)
  281. Module('r.neighbors')
  282. >>> new_neighbors2.get_bash()
  283. u'r.neighbors input=mapD method=average size=3 quantile=0.5 output=mapB'
  284. >>> neighbors = Module("r.neighbors")
  285. >>> neighbors.get_bash()
  286. u'r.neighbors method=average size=3'
  287. >>> new_neighbors3 = copy.deepcopy(neighbors)
  288. >>> new_neighbors3(input="mapA", size=3, output="mapB", run_=False)
  289. Module('r.neighbors')
  290. >>> new_neighbors3.get_bash()
  291. u'r.neighbors input=mapA method=average size=3 output=mapB'
  292. >>> mapcalc = Module("r.mapcalc", expression="test_a = 1",
  293. ... overwrite=True, run_=False)
  294. >>> mapcalc.run()
  295. Module('r.mapcalc')
  296. >>> mapcalc.popen.returncode
  297. 0
  298. >>> mapcalc = Module("r.mapcalc", expression="test_a = 1",
  299. ... overwrite=True, run_=False, finish_=False)
  300. >>> mapcalc.run()
  301. Module('r.mapcalc')
  302. >>> mapcalc.wait()
  303. >>> mapcalc.popen.returncode
  304. 0
  305. >>> mapcalc.run()
  306. Module('r.mapcalc')
  307. >>> mapcalc.wait()
  308. >>> mapcalc.popen.returncode
  309. 0
  310. >>> colors = Module("r.colors", map="test_a", rules="-",
  311. ... run_=False, stdout_=PIPE,
  312. ... stderr_=PIPE, stdin_="1 red")
  313. >>> colors.run()
  314. Module('r.colors')
  315. >>> mapcalc.wait()
  316. >>> colors.popen.returncode
  317. 0
  318. >>> colors.inputs["stdin"].value
  319. u'1 red'
  320. >>> colors.outputs["stdout"].value
  321. u''
  322. >>> colors.outputs["stderr"].value.strip()
  323. "Color table for raster map <test_a> set to 'rules'"
  324. >>> colors = Module("r.colors", map="test_a", rules="-",
  325. ... run_=False, finish_=False, stdin_=PIPE)
  326. >>> colors.run()
  327. Module('r.colors')
  328. >>> stdout, stderr = colors.popen.communicate(input="1 red")
  329. >>> colors.popen.returncode
  330. 0
  331. >>> stdout
  332. >>> stderr
  333. >>> colors = Module("r.colors", map="test_a", rules="-",
  334. ... run_=False, finish_=False,
  335. ... stdin_=PIPE, stderr_=PIPE)
  336. >>> colors.run()
  337. Module('r.colors')
  338. >>> stdout, stderr = colors.popen.communicate(input="1 red")
  339. >>> colors.popen.returncode
  340. 0
  341. >>> stdout
  342. >>> stderr.strip()
  343. "Color table for raster map <test_a> set to 'rules'"
  344. Run a second time
  345. >>> colors.run()
  346. Module('r.colors')
  347. >>> stdout, stderr = colors.popen.communicate(input="1 blue")
  348. >>> colors.popen.returncode
  349. 0
  350. >>> stdout
  351. >>> stderr.strip()
  352. "Color table for raster map <test_a> set to 'rules'"
  353. Multiple run test
  354. >>> colors = Module("r.colors", map="test_a",
  355. ... color="ryb", run_=False)
  356. >>> colors.get_bash()
  357. u'r.colors map=test_a color=ryb'
  358. >>> colors.run()
  359. Module('r.colors')
  360. >>> colors(color="gyr")
  361. Module('r.colors')
  362. >>> colors.run()
  363. Module('r.colors')
  364. >>> colors(color="ryg")
  365. Module('r.colors')
  366. >>> colors(stderr_=PIPE)
  367. Module('r.colors')
  368. >>> colors.run()
  369. Module('r.colors')
  370. >>> print(colors.outputs["stderr"].value.strip())
  371. Color table for raster map <test_a> set to 'ryg'
  372. >>> colors(color="byg")
  373. Module('r.colors')
  374. >>> colors(stdout_=PIPE)
  375. Module('r.colors')
  376. >>> colors.run()
  377. Module('r.colors')
  378. >>> print(colors.outputs["stderr"].value.strip())
  379. Color table for raster map <test_a> set to 'byg'
  380. Often in the Module class you can find ``*args`` and ``kwargs`` annotation
  381. in methods, like in the __call__ method.
  382. Python allow developers to not specify all the arguments and
  383. keyword arguments of a method or function. ::
  384. def f(*args):
  385. for arg in args:
  386. print arg
  387. therefore if we call the function like:
  388. >>> f('grass', 'gis', 'modules') # doctest: +SKIP
  389. grass
  390. gis
  391. modules
  392. or we can define a new list:
  393. >>> words = ['grass', 'gis', 'modules'] # doctest: +SKIP
  394. >>> f(*words) # doctest: +SKIP
  395. grass
  396. gis
  397. modules
  398. we can do the same with keyword arguments, rewrite the above function: ::
  399. def f(*args, **kargs):
  400. for arg in args:
  401. print arg
  402. for key, value in kargs.items():
  403. print "%s = %r" % (key, value)
  404. now we can use the new function, with:
  405. >>> f('grass', 'gis', 'modules', os = 'linux', language = 'python')
  406. ... # doctest: +SKIP
  407. grass
  408. gis
  409. modules
  410. os = 'linux'
  411. language = 'python'
  412. or, as before we can, define a dictionary and give the dictionary to
  413. the function, like:
  414. >>> keywords = {'os' : 'linux', 'language' : 'python'} # doctest: +SKIP
  415. >>> f(*words, **keywords) # doctest: +SKIP
  416. grass
  417. gis
  418. modules
  419. os = 'linux'
  420. language = 'python'
  421. In the Module class we heavily use this language feature to pass arguments
  422. and keyword arguments to the grass module.
  423. """
  def __init__(self, cmd, *args, **kargs):
      """Build the Module from the interface description of ``cmd``.

      The command is executed with ``--interface-description`` and its
      standard output is parsed as XML. Parameters and flags found in the
      XML are stored in ``inputs``, ``outputs``, ``required`` and
      ``flags``; three extra entries named stdin, stdout and stderr are
      added to the inputs/outputs. If positional or keyword arguments
      are given they are forwarded to ``__call__``.

      :param cmd: the name of the command to wrap
      :type cmd: str
      :param args: positional arguments forwarded to ``__call__``
      :param kargs: keyword arguments forwarded to ``__call__``
      :raises GrassError: if ``cmd`` is not a string or if starting the
                          command fails with an OSError
      """
      if isinstance(cmd, unicode):
          self.name = str(cmd)
      elif isinstance(cmd, str):
          self.name = cmd
      else:
          raise GrassError("Problem initializing the module {s}".format(s=cmd))
      try:
          # call the command with --interface-description
          get_cmd_xml = Popen([cmd, "--interface-description"], stdout=PIPE)
      except OSError as e:
          print("OSError error({0}): {1}".format(e.errno, e.strerror))
          str_err = "Error running: `%s --interface-description`."
          raise GrassError(str_err % self.name)
      # get the xml of the module
      self.xml = get_cmd_xml.communicate()[0]
      # transform and parse the xml into an Element class:
      # http://docs.python.org/library/xml.etree.elementtree.html
      tree = fromstring(self.xml)
      # every child element that is neither a parameter nor a flag is
      # converted by the GETFROMTAG entry for its tag and stored as an
      # attribute named after the tag
      for e in tree:
          if e.tag not in ('parameter', 'flag'):
              self.__setattr__(e.tag, GETFROMTAG[e.tag](e))
      #
      # extract parameters from the xml
      #
      self.params_list = [Parameter(p) for p in tree.findall("parameter")]
      self.inputs = TypeDict(Parameter)
      self.outputs = TypeDict(Parameter)
      self.required = []
      # Insert parameters into input/output and required
      for par in self.params_list:
          if par.input:
              self.inputs[par.name] = par
          else:
              self.outputs[par.name] = par
          if par.required:
              self.required.append(par.name)
      #
      # extract flags from the xml
      #
      flags_list = [Flag(f) for f in tree.findall("flag")]
      self.flags = TypeDict(Flag)
      for flag in flags_list:
          self.flags[flag.name] = flag
      #
      # Add new attributes to the class
      #
      self.run_ = True
      self.finish_ = True
      self.check_ = True
      self.env_ = None
      self.stdin_ = None
      self.stdin = None
      self.stdout_ = None
      self.stderr_ = None
      # the same dictionary is reused for the three stream entries, only
      # its 'name' item is changed between the Parameter constructions
      # NOTE(review): this presumes Parameter does not keep a reference
      # to ``diz`` itself - confirm in the Parameter class
      diz = {'name': 'stdin', 'required': False,
             'multiple': False, 'type': 'all',
             'value': None}
      self.inputs['stdin'] = Parameter(diz=diz)
      diz['name'] = 'stdout'
      self.outputs['stdout'] = Parameter(diz=diz)
      diz['name'] = 'stderr'
      self.outputs['stderr'] = Parameter(diz=diz)
      self.popen = None
      self.time = None
      self.start_time = None  # This variable will be set in the run() function
      self._finished = False  # This variable is set True if wait() was successfully called
      if args or kargs:
          self.__call__(*args, **kargs)
      # replace the docstring of the __call__ function object (shared by
      # all instances) with the documentation of this instance
      self.__call__.__func__.__doc__ = self.__doc__
  494. def __call__(self, *args, **kargs):
  495. """Set module parameters to the class and, if run_ is True execute the
  496. module, therefore valid parameters are all the module parameters
  497. plus some extra parameters that are: run_, stdin_, stdout_, stderr_,
  498. env_ and finish_.
  499. """
  500. if not args and not kargs:
  501. self.run()
  502. return self
  503. #
  504. # check for extra kargs, set attribute and remove from dictionary
  505. #
  506. if 'flags' in kargs:
  507. for flg in kargs['flags']:
  508. self.flags[flg].value = True
  509. del(kargs['flags'])
  510. # set attributs
  511. for key in ('run_', 'env_', 'finish_', 'stdout_', 'stderr_', 'check_'):
  512. if key in kargs:
  513. setattr(self, key, kargs.pop(key))
  514. # set inputs
  515. for key in ('stdin_', ):
  516. if key in kargs:
  517. self.inputs[key[:-1]].value = kargs.pop(key)
  518. #
  519. # set/update args
  520. #
  521. for param, arg in zip(self.params_list, args):
  522. param.value = arg
  523. for key, val in kargs.items():
  524. if key in self.inputs:
  525. self.inputs[key].value = val
  526. elif key in self.outputs:
  527. self.outputs[key].value = val
  528. elif key in self.flags:
  529. # we need to add this, because some parameters (overwrite,
  530. # verbose and quiet) work like parameters
  531. self.flags[key].value = val
  532. else:
  533. raise ParameterError('%s is not a valid parameter.' % key)
  534. #
  535. # check if execute
  536. #
  537. if self.run_:
  538. #
  539. # check reqire parameters
  540. #
  541. if self.check_:
  542. self.check()
  543. return self.run()
  544. return self
  545. def get_bash(self):
  546. """Return a BASH representation of the Module."""
  547. return ' '.join(self.make_cmd())
  548. def get_python(self):
  549. """Return a Python representation of the Module."""
  550. prefix = self.name.split('.')[0]
  551. name = '_'.join(self.name.split('.')[1:])
  552. params = ', '.join([par.get_python() for par in self.params_list
  553. if par.get_python() != ''])
  554. flags = ''.join([flg.get_python()
  555. for flg in self.flags.values()
  556. if not flg.special and flg.get_python() != ''])
  557. special = ', '.join([flg.get_python()
  558. for flg in self.flags.values()
  559. if flg.special and flg.get_python() != ''])
  560. # pre name par flg special
  561. if flags and special:
  562. return "%s.%s(%s, flags=%r, %s)" % (prefix, name, params,
  563. flags, special)
  564. elif flags:
  565. return "%s.%s(%s, flags=%r)" % (prefix, name, params, flags)
  566. elif special:
  567. return "%s.%s(%s, %s)" % (prefix, name, params, special)
  568. else:
  569. return "%s.%s(%s)" % (prefix, name, params)
  570. def __str__(self):
  571. """Return the command string that can be executed in a shell"""
  572. return ' '.join(self.make_cmd())
  573. def __repr__(self):
  574. return "Module(%r)" % self.name
  @docstring_property(__doc__)
  def __doc__(self):
      """{cmd_name}({cmd_params})
      """
      # Build the documentation of this instance: a head with the call
      # signature, the documentation of each parameter and of the flags,
      # followed by a foot.
      #
      # Continuation lines of the signature start under the first
      # parameter, i.e. after the module name plus the opening bracket.
      line_separator = '\n' + ' ' * (len(self.name) + 1)
      # Group the parameters three per line; zip_longest pads the last
      # group with None, the padding is dropped when building the text.
      groups_of_three = zip_longest(*[iter(self.params_list)] * 3)
      signature_lines = []
      for group in groups_of_three:
          shown = [str(param) for param in group if param is not None]
          signature_lines.append(', '.join(shown))
      head = DOC['head'].format(cmd_name=self.name,
                                cmd_params=line_separator.join(signature_lines))
      params = '\n'.join([par.__doc__ for par in self.params_list])
      flags = self.flags.__doc__
      sections = [head, params, DOC['flag_head'], flags, DOC['foot']]
      return '\n'.join(sections)
  590. def check(self):
  591. """Check the correctness of the provide parameters"""
  592. required = True
  593. for flg in self.flags.values():
  594. if flg and flg.suppress_required:
  595. required = False
  596. if required:
  597. for k in self.required:
  598. if ((k in self.inputs and self.inputs[k].value is None) or
  599. (k in self.outputs and self.outputs[k].value is None)):
  600. msg = "Required parameter <%s> not set."
  601. raise ParameterError(msg % k)
  602. def get_dict(self):
  603. """Return a dictionary that includes the name, all valid
  604. inputs, outputs and flags
  605. """
  606. dic = {}
  607. dic['name'] = self.name
  608. dic['inputs'] = [(k, v.value) for k, v in self.inputs.items()
  609. if v.value]
  610. dic['outputs'] = [(k, v.value) for k, v in self.outputs.items()
  611. if v.value]
  612. dic['flags'] = [flg for flg in self.flags if self.flags[flg].value]
  613. return dic
  614. def make_cmd(self):
  615. """Create the command string that can be executed in a shell
  616. :returns: the command string
  617. """
  618. skip = ['stdin', 'stdout', 'stderr']
  619. args = [self.name, ]
  620. for key in self.inputs:
  621. if key not in skip and self.inputs[key].value is not None and self.inputs[key].value != '':
  622. args.append(self.inputs[key].get_bash())
  623. for key in self.outputs:
  624. if key not in skip and self.outputs[key].value is not None and self.outputs[key].value != '':
  625. args.append(self.outputs[key].get_bash())
  626. for flg in self.flags:
  627. if self.flags[flg].value:
  628. args.append(str(self.flags[flg]))
  629. return args
  def run(self):
      """Start the module as a new process

      If finish_ is True this method waits for the process to terminate
      by calling wait(), which also sets up stdout and stderr. Otherwise
      it returns right after starting the process; use wait() to wait
      for the started process.

      :returns: the Module object itself
      """
      G_debug(1, self.get_bash())
      self._finished = False
      stdin_value = self.inputs['stdin'].value
      if stdin_value:
          # the text is handed to the process by wait(), so the process
          # needs a pipe as standard input
          self.stdin = stdin_value
          self.stdin_ = PIPE
      cmd = self.make_cmd()
      self.start_time = time.time()
      streams = dict(stdin=self.stdin_,
                     stdout=self.stdout_,
                     stderr=self.stderr_)
      self.popen = Popen(cmd, env=self.env_, **streams)
      if self.finish_ is True:
          self.wait()
      return self
  652. def wait(self):
  653. """Wait for the module to finish. Call this method if
  654. the run() call was performed with self.false_ = False.
  655. """
  656. if self._finished is False:
  657. stdout, stderr = self.popen.communicate(input=self.stdin)
  658. self.outputs['stdout'].value = stdout if stdout else ''
  659. self.outputs['stderr'].value = stderr if stderr else ''
  660. self.time = time.time() - self.start_time
  661. self._finished = True
  662. if self.popen.poll():
  663. raise CalledModuleError(returncode=self.popen.returncode,
  664. code=self.get_bash(),
  665. module=self.name, errors=stderr)
  666. class MultiModule(object):
  667. """This class is designed to run modules in serial in the provided order.
  668. Module can be run in serial synchronously or asynchronously.
  669. - Synchronously: When calling run() all modules will run in serial order
  670. until they are finished and then return
  671. - Asynchronously: When calling run() all modules will run in serial order in a background thread,
  672. run() will return after starting the modules without waiting for them to finish.
  673. The user must call the wait() method to wait for the modules to finish.
  674. >>> from grass.pygrass.modules import Module
  675. >>> from grass.pygrass.modules import MultiModule
  676. >>> import threading
  677. >>> import copy
  678. Synchronous module run
  679. >>> region_1 = Module("g.region", run_=False)
  680. >>> region_1.flags.p = True
  681. >>> region_2 = copy.deepcopy(region_1)
  682. >>> region_2.flags.p = True
  683. >>> mm = MultiModule(module_list=[region_1, region_2])
  684. >>> mm.run()
  685. >>> m_list = mm.get_modules()
  686. >>> m_list[0].popen.returncode
  687. 0
  688. >>> m_list[1].popen.returncode
  689. 0
  690. Asynchronous module run, setting finish = False
  691. >>> region_1 = Module("g.region", run_=False)
  692. >>> region_1.flags.p = True
  693. >>> region_2 = copy.deepcopy(region_1)
  694. >>> region_2.flags.p = True
  695. >>> region_3 = copy.deepcopy(region_1)
  696. >>> region_3.flags.p = True
  697. >>> region_4 = copy.deepcopy(region_1)
  698. >>> region_4.flags.p = True
  699. >>> region_5 = copy.deepcopy(region_1)
  700. >>> region_5.flags.p = True
  701. >>> mm = MultiModule(module_list=[region_1, region_2, region_3, region_4, region_5],
  702. ... finish=False)
  703. >>> t = mm.run()
  704. >>> isinstance(t, threading.Thread)
  705. True
  706. >>> mm.wait()
  707. >>> m_list = mm.get_modules()
  708. >>> m_list[0].popen.returncode
  709. 0
  710. >>> m_list[1].popen.returncode
  711. 0
  712. >>> m_list[2].popen.returncode
  713. 0
  714. >>> m_list[3].popen.returncode
  715. 0
  716. >>> m_list[4].popen.returncode
  717. 0
  718. """
  719. def __init__(self, module_list=[], finish=True):
  720. """Konstruktor of the multi module runner
  721. :param module_list: A list of preconfigured modules that should be run by this module
  722. :param finish: If set True the run() method will wait for all processes to finish,
  723. If set False, the run() method will return after starting the processes.
  724. The wait() method must be called to finish the modules.
  725. :return:
  726. """
  727. self.module_list = module_list
  728. self.finish_ = finish
  729. self.t = None
  730. def get_modules(self):
  731. """Return the list of modules
  732. :return: The list of modules
  733. """
  734. return self.module_list
  735. def run(self):
  736. """Start the modules in the list. if self.finished_ is set True
  737. this method will return after all processes finished.
  738. If self.finish_ is set False, this method will return
  739. after the process list was started for execution.
  740. In a background thread, the processes in the list will
  741. be run one after the another.
  742. :return: None in case of self.finish_ is True, Thread that runs the module otherwise
  743. """
  744. if self.finish_ is True:
  745. for module in self.module_list:
  746. module.finish_ = True
  747. module.run()
  748. return None
  749. else:
  750. self.t = threading.Thread(target=run_modules_in_serial,
  751. args=[self.module_list])
  752. self.t.start()
  753. return self.t
  754. def wait(self):
  755. """Wait for all processes to finish. Call this method if finished was set False
  756. """
  757. if self.t:
  758. self.t.join()
  def run_modules_in_serial(module_list):
      """Run the given modules one after another.

      For each module ``run()`` is called, immediately followed by
      ``wait()``, before the next module is handled.

      :param module_list: the modules to run, in execution order
      :returns: None
      """
      for module in module_list:
          module.run()
          module.wait()
  764. ###############################################################################
  if __name__ == "__main__":
      # When executed as a script, run the doctests embedded in the
      # docstrings of this module.
      import doctest
      doctest.testmod()