|
@@ -60,6 +60,120 @@ from flag import Flag
|
|
from typedict import TypeDict
|
|
from typedict import TypeDict
|
|
from read import GETFROMTAG, DOC
|
|
from read import GETFROMTAG, DOC
|
|
|
|
|
|
|
|
+class ParallelModuleQueue(object):
|
|
|
|
+ """!This class is designed to run an arbitrary number of pygrass Module
|
|
|
|
+ processes in parallel.
|
|
|
|
+
|
|
|
|
+ Objects of type grass.pygrass.modules.Module can be put into the
|
|
|
|
+ queue using put() method. When the queue is full with the maximum
|
|
|
|
+ number of parallel processes it will wait for all processes to finish,
|
|
|
|
+ sets the stdout and stderr of the Module object and removes it
|
|
|
|
+ from the queue when its finished.
|
|
|
|
+
|
|
|
|
+ This class will raise a GrassError in case a Module process exits
|
|
|
|
+ with a return code other than 0.
|
|
|
|
+
|
|
|
|
+ Usage:
|
|
|
|
+
|
|
|
|
+ @code
|
|
|
|
+
|
|
|
|
+ >>> import copy
|
|
|
|
+ >>> import grass.pygrass.modules as pymod
|
|
|
|
+ >>> mapcalc_list = []
|
|
|
|
+ >>> mapcalc = pymod.Module("r.mapcalc",
|
|
|
|
+ ... overwrite=True,
|
|
|
|
+ ... run_=False,
|
|
|
|
+ ... finish_=False)
|
|
|
|
+ >>> queue = pymod.ParallelModuleQueue(max_num_procs=3)
|
|
|
|
+ >>> for i in xrange(5):
|
|
|
|
+ ... new_mapcalc = copy.deepcopy(mapcalc)
|
|
|
|
+ ... mapcalc_list.append(new_mapcalc)
|
|
|
|
+ ... new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
|
|
|
|
+ ... queue.put(new_mapcalc)
|
|
|
|
+ >>> queue.wait()
|
|
|
|
+ >>> for mapcalc in mapcalc_list:
|
|
|
|
+ ... print(mapcalc.popen.returncode)
|
|
|
|
+ 0
|
|
|
|
+ 0
|
|
|
|
+ 0
|
|
|
|
+ 0
|
|
|
|
+ 0
|
|
|
|
+
|
|
|
|
+ @endcode
|
|
|
|
+
|
|
|
|
+ """
|
|
|
|
+ def __init__(self, max_num_procs=1):
|
|
|
|
+ """!Constructor
|
|
|
|
+
|
|
|
|
+ @param max_num_procs The maximum number of Module processes that
|
|
|
|
+ can be run in parallel
|
|
|
|
+ """
|
|
|
|
+ self._num_procs = int(max_num_procs)
|
|
|
|
+ self._list = int(max_num_procs) * [None]
|
|
|
|
+ self._proc_count = 0
|
|
|
|
+
|
|
|
|
+ def put(self, module):
|
|
|
|
+ """!Put the next Module object in the queue
|
|
|
|
+
|
|
|
|
+ To run the Module objects in parallel the run_ and finish_ options
|
|
|
|
+ of the Module must be set to False.
|
|
|
|
+
|
|
|
|
+ @param module A preconfigured Module object with run_ and finish_
|
|
|
|
+ set to False
|
|
|
|
+ """
|
|
|
|
+ self._list[self._proc_count] = module
|
|
|
|
+ self._list[self._proc_count].run()
|
|
|
|
+ self._proc_count += 1
|
|
|
|
+
|
|
|
|
+ if self._proc_count == self._num_procs:
|
|
|
|
+ self.wait()
|
|
|
|
+
|
|
|
|
+ def get(self, num):
|
|
|
|
+ """!Get a Module object from the queue
|
|
|
|
+
|
|
|
|
+ @param num The number of the object in queue
|
|
|
|
+ @return The Module object or None if num is not in the queue
|
|
|
|
+ """
|
|
|
|
+ if num < self._num_procs:
|
|
|
|
+ return self._list[num]
|
|
|
|
+ return None
|
|
|
|
+
|
|
|
|
+ def get_num_run_procs(self):
|
|
|
|
+ """!Get the number of Module processes that are in the queue running
|
|
|
|
+ or finished
|
|
|
|
+
|
|
|
|
+ @return The maximum number fo Module processes running/finished in
|
|
|
|
+ the queue
|
|
|
|
+ """
|
|
|
|
+ return len(self._list)
|
|
|
|
+
|
|
|
|
+ def get_max_num_procs(self):
|
|
|
|
+ """!Return the maximum number of parallel Module processes
|
|
|
|
+ """
|
|
|
|
+ return self._num_procs
|
|
|
|
+
|
|
|
|
+ def set_max_num_procs(self, max_num_procs):
|
|
|
|
+ """!Set the maximum number of Module processes that should run
|
|
|
|
+ in parallel
|
|
|
|
+ """
|
|
|
|
+ self._num_procs = int(max_num_procs)
|
|
|
|
+ self.wait()
|
|
|
|
+
|
|
|
|
+ def wait(self):
|
|
|
|
+ """!Wait for all Module processes that are in the list to finish
|
|
|
|
+ and set the modules stdout and stderr output options
|
|
|
|
+ """
|
|
|
|
+ for proc in self._list:
|
|
|
|
+ if proc:
|
|
|
|
+ stdout, stderr = proc.popen.communicate(input=proc.stdin)
|
|
|
|
+ proc.outputs['stdout'].value = stdout if stdout else ''
|
|
|
|
+ proc.outputs['stderr'].value = stderr if stderr else ''
|
|
|
|
+
|
|
|
|
+ if proc.popen.returncode != 0:
|
|
|
|
+ GrassError(("Error running module %s")%(proc.name))
|
|
|
|
+
|
|
|
|
+ self._list = self._num_procs * [None]
|
|
|
|
+ self._proc_count = 0
|
|
|
|
|
|
class Module(object):
|
|
class Module(object):
|
|
"""
|
|
"""
|
|
@@ -308,7 +422,7 @@ class Module(object):
|
|
return dic
|
|
return dic
|
|
|
|
|
|
def make_cmd(self):
|
|
def make_cmd(self):
|
|
- """!Create the commdn string that can be exceuted in a shell
|
|
|
|
|
|
+ """!Create the command string that can be executed in a shell
|
|
|
|
|
|
@return The command string
|
|
@return The command string
|
|
"""
|
|
"""
|