|
@@ -2,6 +2,7 @@
|
|
|
from __future__ import (nested_scopes, generators, division, absolute_import,
|
|
|
with_statement, print_function, unicode_literals)
|
|
|
import sys
|
|
|
+from multiprocessing import cpu_count
|
|
|
|
|
|
if sys.version_info[0] == 2:
|
|
|
from itertools import izip_longest as zip_longest
|
|
@@ -39,12 +40,18 @@ class ParallelModuleQueue(object):
|
|
|
>>> from grass.pygrass.modules import Module, ParallelModuleQueue
|
|
|
>>> mapcalc_list = []
|
|
|
>>> mapcalc = Module("r.mapcalc", overwrite=True, run_=False)
|
|
|
- >>> queue = ParallelModuleQueue(max_num_procs=3)
|
|
|
+ >>> queue = ParallelModuleQueue(nprocs=3)
|
|
|
>>> for i in xrange(5):
|
|
|
... new_mapcalc = copy.deepcopy(mapcalc)
|
|
|
... mapcalc_list.append(new_mapcalc)
|
|
|
... new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
|
|
|
... queue.put(new_mapcalc)
|
|
|
+ Module('r.mapcalc')
|
|
|
+ Module('r.mapcalc')
|
|
|
+ Module('r.mapcalc')
|
|
|
+ Module('r.mapcalc')
|
|
|
+ Module('r.mapcalc')
|
|
|
+
|
|
|
>>> queue.wait()
|
|
|
>>> for mapcalc in mapcalc_list:
|
|
|
... print(mapcalc.popen.returncode)
|
|
@@ -55,15 +62,17 @@ class ParallelModuleQueue(object):
|
|
|
0
|
|
|
|
|
|
"""
|
|
|
- def __init__(self, max_num_procs=1):
|
|
|
+ def __init__(self, nprocs=1):
|
|
|
"""Constructor
|
|
|
|
|
|
- :param max_num_procs: The maximum number of Module processes that
|
|
|
- can be run in parallel
|
|
|
- :type max_num_procs: int
|
|
|
+ :param nprocs: The maximum number of Module processes that
|
|
|
+ can be run in parallel, defualt is 1, if None
|
|
|
+ then use all the available CPUs.
|
|
|
+ :type nprocs: int
|
|
|
"""
|
|
|
- self._num_procs = int(max_num_procs)
|
|
|
- self._list = int(max_num_procs) * [None]
|
|
|
+ nprocs = int(nprocs) if nprocs else cpu_count()
|
|
|
+ self._num_procs = nprocs
|
|
|
+ self._list = nprocs * [None]
|
|
|
self._proc_count = 0
|
|
|
|
|
|
def put(self, module):
|
|
@@ -111,15 +120,15 @@ class ParallelModuleQueue(object):
|
|
|
"""
|
|
|
return self._num_procs
|
|
|
|
|
|
- def set_max_num_procs(self, max_num_procs):
|
|
|
+ def set_max_num_procs(self, nprocs):
|
|
|
"""Set the maximum number of Module processes that should run
|
|
|
in parallel
|
|
|
|
|
|
- :param max_num_procs: The maximum number of Module processes that
|
|
|
+ :param nprocs: The maximum number of Module processes that
|
|
|
can be run in parallel
|
|
|
- :type max_num_procs: int
|
|
|
+ :type nprocs: int
|
|
|
"""
|
|
|
- self._num_procs = int(max_num_procs)
|
|
|
+ self._num_procs = int(nprocs)
|
|
|
self.wait()
|
|
|
|
|
|
def wait(self):
|
|
@@ -178,6 +187,7 @@ class Module(object):
|
|
|
|
|
|
>>> new_neighbors2 = copy.deepcopy(neighbors)
|
|
|
>>> new_neighbors2(input="mapD", size=3, run_=False)
|
|
|
+ Module('r.neighbors')
|
|
|
>>> new_neighbors2.get_bash()
|
|
|
u'r.neighbors input=mapD method=average size=3 quantile=0.5 output=mapB'
|
|
|
|
|
@@ -187,6 +197,7 @@ class Module(object):
|
|
|
|
|
|
>>> new_neighbors3 = copy.deepcopy(neighbors)
|
|
|
>>> new_neighbors3(input="mapA", size=3, output="mapB", run_=False)
|
|
|
+ Module('r.neighbors')
|
|
|
>>> new_neighbors3.get_bash()
|
|
|
u'r.neighbors input=mapA method=average size=3 output=mapB'
|
|
|
|
|
@@ -249,16 +260,21 @@ class Module(object):
|
|
|
>>> colors.run()
|
|
|
Module('r.colors')
|
|
|
>>> colors(color="gyr")
|
|
|
+ Module('r.colors')
|
|
|
>>> colors.run()
|
|
|
Module('r.colors')
|
|
|
>>> colors(color="ryg")
|
|
|
+ Module('r.colors')
|
|
|
>>> colors(stderr_=PIPE)
|
|
|
+ Module('r.colors')
|
|
|
>>> colors.run()
|
|
|
Module('r.colors')
|
|
|
>>> print(colors.outputs["stderr"].value.strip())
|
|
|
Color table for raster map <test_a> set to 'ryg'
|
|
|
>>> colors(color="byg")
|
|
|
+ Module('r.colors')
|
|
|
>>> colors(stdout_=PIPE)
|
|
|
+ Module('r.colors')
|
|
|
>>> colors.run()
|
|
|
Module('r.colors')
|
|
|
>>> print(colors.outputs["stderr"].value.strip())
|
|
@@ -403,7 +419,7 @@ class Module(object):
|
|
|
"""
|
|
|
if not args and not kargs:
|
|
|
self.run()
|
|
|
- return
|
|
|
+ return self
|
|
|
#
|
|
|
# check for extra kargs, set attribute and remove from dictionary
|
|
|
#
|
|
@@ -452,6 +468,7 @@ class Module(object):
|
|
|
msg = "Required parameter <%s> not set."
|
|
|
raise ParameterError(msg % k)
|
|
|
return self.run()
|
|
|
+ return self
|
|
|
|
|
|
def get_bash(self):
|
|
|
"""Return a BASH rapresentation of the Module."""
|