|
@@ -67,35 +67,143 @@ class ParallelModuleQueue(object):
|
|
|
number of parallel processes it will wait for all processes to finish,
|
|
|
sets the stdout and stderr of the Module object and removes it
|
|
|
from the queue when its finished.
|
|
|
+
|
|
|
+ To finish the queue before the maximum number of parallel
|
|
|
+ processes was reached call wait() .
|
|
|
|
|
|
This class will raise a GrassError in case a Module process exits
|
|
|
with a return code other than 0.
|
|
|
|
|
|
Usage:
|
|
|
|
|
|
+ Check with a queue size of 3 and 5 processes
|
|
|
+
|
|
|
>>> import copy
|
|
|
>>> from grass.pygrass.modules import Module, ParallelModuleQueue
|
|
|
>>> mapcalc_list = []
|
|
|
+
|
|
|
+ Setting run_ to False is important, otherwise a parallel processing is not possible
|
|
|
+
|
|
|
>>> mapcalc = Module("r.mapcalc", overwrite=True, run_=False)
|
|
|
>>> queue = ParallelModuleQueue(nprocs=3)
|
|
|
>>> for i in xrange(5):
|
|
|
... new_mapcalc = copy.deepcopy(mapcalc)
|
|
|
... mapcalc_list.append(new_mapcalc)
|
|
|
- ... new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
|
|
|
- ... queue.put(new_mapcalc)
|
|
|
- Module('r.mapcalc')
|
|
|
- Module('r.mapcalc')
|
|
|
- Module('r.mapcalc')
|
|
|
- Module('r.mapcalc')
|
|
|
- Module('r.mapcalc')
|
|
|
+ ... m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
|
|
|
+ ... queue.put(m)
|
|
|
+ >>> queue.wait()
|
|
|
+ >>> queue.get_num_run_procs()
|
|
|
+ 0
|
|
|
+ >>> queue.get_max_num_procs()
|
|
|
+ 3
|
|
|
+ >>> for mapcalc in mapcalc_list:
|
|
|
+ ... print(mapcalc.popen.returncode)
|
|
|
+ 0
|
|
|
+ 0
|
|
|
+ 0
|
|
|
+ 0
|
|
|
+ 0
|
|
|
+
|
|
|
+ Check with a queue size of 8 and 5 processes
|
|
|
+
|
|
|
+ >>> queue = ParallelModuleQueue(nprocs=8)
|
|
|
+ >>> mapcalc_list = []
|
|
|
+ >>> for i in xrange(5):
|
|
|
+ ... new_mapcalc = copy.deepcopy(mapcalc)
|
|
|
+ ... mapcalc_list.append(new_mapcalc)
|
|
|
+ ... m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
|
|
|
+ ... queue.put(m)
|
|
|
+ >>> queue.wait()
|
|
|
+ >>> queue.get_num_run_procs()
|
|
|
+ 0
|
|
|
+ >>> queue.get_max_num_procs()
|
|
|
+ 8
|
|
|
+ >>> for mapcalc in mapcalc_list:
|
|
|
+ ... print(mapcalc.popen.returncode)
|
|
|
+ 0
|
|
|
+ 0
|
|
|
+ 0
|
|
|
+ 0
|
|
|
+ 0
|
|
|
|
|
|
+ Check with a queue size of 8 and 4 processes
|
|
|
+
|
|
|
+ >>> queue = ParallelModuleQueue(nprocs=8)
|
|
|
+ >>> mapcalc_list = []
|
|
|
+ >>> new_mapcalc = copy.deepcopy(mapcalc)
|
|
|
+ >>> mapcalc_list.append(new_mapcalc)
|
|
|
+ >>> m = new_mapcalc(expression="test_pygrass_1 =1")
|
|
|
+ >>> queue.put(m)
|
|
|
+ >>> queue.get_num_run_procs()
|
|
|
+ 1
|
|
|
+ >>> new_mapcalc = copy.deepcopy(mapcalc)
|
|
|
+ >>> mapcalc_list.append(new_mapcalc)
|
|
|
+ >>> m = new_mapcalc(expression="test_pygrass_2 =2")
|
|
|
+ >>> queue.put(m)
|
|
|
+ >>> queue.get_num_run_procs()
|
|
|
+ 2
|
|
|
+ >>> new_mapcalc = copy.deepcopy(mapcalc)
|
|
|
+ >>> mapcalc_list.append(new_mapcalc)
|
|
|
+ >>> m = new_mapcalc(expression="test_pygrass_3 =3")
|
|
|
+ >>> queue.put(m)
|
|
|
+ >>> queue.get_num_run_procs()
|
|
|
+ 3
|
|
|
+ >>> new_mapcalc = copy.deepcopy(mapcalc)
|
|
|
+ >>> mapcalc_list.append(new_mapcalc)
|
|
|
+ >>> m = new_mapcalc(expression="test_pygrass_4 =4")
|
|
|
+ >>> queue.put(m)
|
|
|
+ >>> queue.get_num_run_procs()
|
|
|
+ 4
|
|
|
>>> queue.wait()
|
|
|
+ >>> queue.get_num_run_procs()
|
|
|
+ 0
|
|
|
+ >>> queue.get_max_num_procs()
|
|
|
+ 8
|
|
|
>>> for mapcalc in mapcalc_list:
|
|
|
... print(mapcalc.popen.returncode)
|
|
|
0
|
|
|
0
|
|
|
0
|
|
|
0
|
|
|
+
|
|
|
+ Check with a queue size of 3 and 4 processes
|
|
|
+
|
|
|
+ >>> queue = ParallelModuleQueue(nprocs=3)
|
|
|
+ >>> mapcalc_list = []
|
|
|
+ >>> new_mapcalc = copy.deepcopy(mapcalc)
|
|
|
+ >>> mapcalc_list.append(new_mapcalc)
|
|
|
+ >>> m = new_mapcalc(expression="test_pygrass_1 =1")
|
|
|
+ >>> queue.put(m)
|
|
|
+ >>> queue.get_num_run_procs()
|
|
|
+ 1
|
|
|
+ >>> new_mapcalc = copy.deepcopy(mapcalc)
|
|
|
+ >>> mapcalc_list.append(new_mapcalc)
|
|
|
+ >>> m = new_mapcalc(expression="test_pygrass_2 =2")
|
|
|
+ >>> queue.put(m)
|
|
|
+ >>> queue.get_num_run_procs()
|
|
|
+ 2
|
|
|
+ >>> new_mapcalc = copy.deepcopy(mapcalc)
|
|
|
+ >>> mapcalc_list.append(new_mapcalc)
|
|
|
+ >>> m = new_mapcalc(expression="test_pygrass_3 =3")
|
|
|
+ >>> queue.put(m) # Now it will wait until all procs finish and set the counter back to 0
|
|
|
+ >>> queue.get_num_run_procs()
|
|
|
+ 0
|
|
|
+ >>> new_mapcalc = copy.deepcopy(mapcalc)
|
|
|
+ >>> mapcalc_list.append(new_mapcalc)
|
|
|
+ >>> m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
|
|
|
+ >>> queue.put(m)
|
|
|
+ >>> queue.get_num_run_procs()
|
|
|
+ 1
|
|
|
+ >>> queue.wait()
|
|
|
+ >>> queue.get_num_run_procs()
|
|
|
+ 0
|
|
|
+ >>> queue.get_max_num_procs()
|
|
|
+ 3
|
|
|
+ >>> for mapcalc in mapcalc_list:
|
|
|
+ ... print(mapcalc.popen.returncode)
|
|
|
+ 0
|
|
|
+ 0
|
|
|
+ 0
|
|
|
0
|
|
|
|
|
|
"""
|
|
@@ -147,13 +255,14 @@ class ParallelModuleQueue(object):
|
|
|
"""Get the number of Module processes that are in the queue running
|
|
|
or finished
|
|
|
|
|
|
- :returns: the maximum number fo Module processes running/finished in
|
|
|
- the queue
|
|
|
+ :returns: the number fo Module processes running/finished in the queue
|
|
|
"""
|
|
|
- return len(self._list)
|
|
|
+ return self._proc_count
|
|
|
|
|
|
def get_max_num_procs(self):
|
|
|
"""Return the maximum number of parallel Module processes
|
|
|
+
|
|
|
+ :returns: the maximum number of parallel Module processes
|
|
|
"""
|
|
|
return self._num_procs
|
|
|
|