|
@@ -2,13 +2,12 @@
|
|
|
from __future__ import (nested_scopes, generators, division, absolute_import,
|
|
|
with_statement, print_function, unicode_literals)
|
|
|
import sys
|
|
|
-from multiprocessing import cpu_count
|
|
|
-import threading
|
|
|
+from multiprocessing import cpu_count, Process, Queue
|
|
|
import time
|
|
|
from xml.etree.ElementTree import fromstring
|
|
|
|
|
|
from grass.exceptions import CalledModuleError, GrassError, ParameterError
|
|
|
-from grass.script.core import Popen, PIPE
|
|
|
+from grass.script.core import Popen, PIPE, use_temp_region, del_temp_region
|
|
|
from .docstring import docstring_property
|
|
|
from .parameter import Parameter
|
|
|
from .flag import Flag
|
|
@@ -28,10 +27,11 @@ def _get_bash(self, *args, **kargs):
|
|
|
|
|
|
|
|
|
class ParallelModuleQueue(object):
|
|
|
- """This class is designed to run an arbitrary number of pygrass Module
|
|
|
+ """This class is designed to run an arbitrary number of pygrass Module or MultiModule
|
|
|
processes in parallel.
|
|
|
|
|
|
- Objects of type grass.pygrass.modules.Module can be put into the
|
|
|
+ Objects of type grass.pygrass.modules.Module or
|
|
|
+ grass.pygrass.modules.MultiModule can be put into the
|
|
|
queue using put() method. When the queue is full with the maximum
|
|
|
number of parallel processes it will wait for all processes to finish,
|
|
|
sets the stdout and stderr of the Module object and removes it
|
|
@@ -43,6 +43,10 @@ class ParallelModuleQueue(object):
|
|
|
This class will raise a GrassError in case a Module process exits
|
|
|
with a return code other than 0.
|
|
|
|
|
|
+ Processes that were run asynchronously with the MultiModule class
|
|
|
+ will not raise a GrassError in case of failure. This must be manually checked
|
|
|
+ by accessing finished modules by calling get_finished_modules().
|
|
|
+
|
|
|
Usage:
|
|
|
|
|
|
Check with a queue size of 3 and 5 processes
|
|
@@ -61,6 +65,7 @@ class ParallelModuleQueue(object):
|
|
|
... m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
|
|
|
... queue.put(m)
|
|
|
>>> queue.wait()
|
|
|
+ >>> mapcalc_list = queue.get_finished_modules()
|
|
|
>>> queue.get_num_run_procs()
|
|
|
0
|
|
|
>>> queue.get_max_num_procs()
|
|
@@ -83,6 +88,7 @@ class ParallelModuleQueue(object):
|
|
|
... m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
|
|
|
... queue.put(m)
|
|
|
>>> queue.wait()
|
|
|
+ >>> mapcalc_list = queue.get_finished_modules()
|
|
|
>>> queue.get_num_run_procs()
|
|
|
0
|
|
|
>>> queue.get_max_num_procs()
|
|
@@ -95,7 +101,7 @@ class ParallelModuleQueue(object):
|
|
|
0
|
|
|
0
|
|
|
|
|
|
- Check MultiModule approach with three by two processes
|
|
|
+ Check MultiModule approach with three by two processes running in a background process
|
|
|
>>> gregion = Module("g.region", flags="p", run_=False)
|
|
|
>>> queue = ParallelModuleQueue(nprocs=3)
|
|
|
>>> proc_list = []
|
|
@@ -105,9 +111,10 @@ class ParallelModuleQueue(object):
|
|
|
... new_mapcalc = copy.deepcopy(mapcalc)
|
|
|
... m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
|
|
|
... proc_list.append(new_mapcalc)
|
|
|
- ... mm = MultiModule(module_list=[new_gregion, new_mapcalc], finish=False)
|
|
|
+ ... mm = MultiModule(module_list=[new_gregion, new_mapcalc], sync=False, set_temp_region=True)
|
|
|
... queue.put(mm)
|
|
|
>>> queue.wait()
|
|
|
+ >>> proc_list = queue.get_finished_modules()
|
|
|
>>> queue.get_num_run_procs()
|
|
|
0
|
|
|
>>> queue.get_max_num_procs()
|
|
@@ -122,7 +129,6 @@ class ParallelModuleQueue(object):
|
|
|
0
|
|
|
|
|
|
Check with a queue size of 8 and 4 processes
|
|
|
-
|
|
|
>>> queue = ParallelModuleQueue(nprocs=8)
|
|
|
>>> mapcalc_list = []
|
|
|
>>> new_mapcalc = copy.deepcopy(mapcalc)
|
|
@@ -150,6 +156,7 @@ class ParallelModuleQueue(object):
|
|
|
>>> queue.get_num_run_procs()
|
|
|
4
|
|
|
>>> queue.wait()
|
|
|
+ >>> mapcalc_list = queue.get_finished_modules()
|
|
|
>>> queue.get_num_run_procs()
|
|
|
0
|
|
|
>>> queue.get_max_num_procs()
|
|
@@ -190,6 +197,7 @@ class ParallelModuleQueue(object):
|
|
|
>>> queue.get_num_run_procs()
|
|
|
1
|
|
|
>>> queue.wait()
|
|
|
+ >>> mapcalc_list = queue.get_finished_modules()
|
|
|
>>> queue.get_num_run_procs()
|
|
|
0
|
|
|
>>> queue.get_max_num_procs()
|
|
@@ -206,7 +214,7 @@ class ParallelModuleQueue(object):
|
|
|
"""Constructor
|
|
|
|
|
|
:param nprocs: The maximum number of Module processes that
|
|
|
- can be run in parallel, defualt is 1, if None
|
|
|
+ can be run in parallel, default is 1, if None
|
|
|
then use all the available CPUs.
|
|
|
:type nprocs: int
|
|
|
"""
|
|
@@ -214,16 +222,17 @@ class ParallelModuleQueue(object):
|
|
|
self._num_procs = nprocs
|
|
|
self._list = nprocs * [None]
|
|
|
self._proc_count = 0
|
|
|
+ self._finished_modules = [] # Store all processed modules in a list
|
|
|
|
|
|
def put(self, module):
|
|
|
- """Put the next Module object in the queue
|
|
|
+ """Put the next Module or MultiModule object in the queue
|
|
|
|
|
|
To run the Module objects in parallel the run\_ and finish\_ options
|
|
|
of the Module must be set to False.
|
|
|
|
|
|
- :param module: a preconfigured Module object or a list of Module objects
|
|
|
+ :param module: a preconfigured Module or MultiModule object that was configured
|
|
|
with run\_ and finish\_ set to False,
|
|
|
- :type module: Module object or list of Module objects
|
|
|
+ :type module: Module or MultiModule object
|
|
|
"""
|
|
|
self._list[self._proc_count] = module
|
|
|
# Force that finish is False, otherwise the execution
|
|
@@ -272,18 +281,31 @@ class ParallelModuleQueue(object):
|
|
|
self._num_procs = int(nprocs)
|
|
|
self.wait()
|
|
|
|
|
|
+ def get_finished_modules(self):
|
|
|
+ """Return all finished processes that were run by this queue
|
|
|
+
|
|
|
+ :return: A list of Module objects
|
|
|
+ """
|
|
|
+ return self._finished_modules
|
|
|
+
|
|
|
def wait(self):
|
|
|
"""Wait for all Module processes that are in the list to finish
|
|
|
and set the modules stdout and stderr output options
|
|
|
+
|
|
|
+ The finished modules are stored in the queue; use get_finished_modules() to retrieve them
|
|
|
"""
|
|
|
for proc in self._list:
|
|
|
if proc:
|
|
|
- proc.wait()
|
|
|
+ if isinstance(proc, Module):
|
|
|
+ self._finished_modules.extend([proc.wait(),])
|
|
|
+ else:
|
|
|
+ self._finished_modules.extend(proc.wait())
|
|
|
|
|
|
self._list = self._num_procs * [None]
|
|
|
self._proc_count = 0
|
|
|
|
|
|
|
|
|
+
|
|
|
class Module(object):
|
|
|
"""This class is design to wrap/run/interact with the GRASS modules.
|
|
|
|
|
@@ -348,13 +370,13 @@ class Module(object):
|
|
|
... overwrite=True, run_=False, finish_=False)
|
|
|
>>> mapcalc.run()
|
|
|
Module('r.mapcalc')
|
|
|
- >>> mapcalc.wait()
|
|
|
- >>> mapcalc.popen.returncode
|
|
|
+ >>> p = mapcalc.wait()
|
|
|
+ >>> p.popen.returncode
|
|
|
0
|
|
|
>>> mapcalc.run()
|
|
|
Module('r.mapcalc')
|
|
|
- >>> mapcalc.wait()
|
|
|
- >>> mapcalc.popen.returncode
|
|
|
+ >>> p = mapcalc.wait()
|
|
|
+ >>> p.popen.returncode
|
|
|
0
|
|
|
|
|
|
>>> colors = Module("r.colors", map="test_a", rules="-",
|
|
@@ -362,8 +384,8 @@ class Module(object):
|
|
|
... stderr_=PIPE, stdin_="1 red")
|
|
|
>>> colors.run()
|
|
|
Module('r.colors')
|
|
|
- >>> mapcalc.wait()
|
|
|
- >>> colors.popen.returncode
|
|
|
+ >>> p = colors.wait()
|
|
|
+ >>> p.popen.returncode
|
|
|
0
|
|
|
>>> colors.inputs["stdin"].value
|
|
|
u'1 red'
|
|
@@ -722,6 +744,8 @@ class Module(object):
|
|
|
finish_==True and sets up stdout and stderr. If finish_==False this
|
|
|
function will return after starting the process. Use wait() to wait for
|
|
|
the started process
|
|
|
+
|
|
|
+ :return: A reference to this object
|
|
|
"""
|
|
|
G_debug(1, self.get_bash())
|
|
|
self._finished = False
|
|
@@ -745,6 +769,8 @@ class Module(object):
|
|
|
def wait(self):
|
|
|
"""Wait for the module to finish. Call this method if
|
|
|
the run() call was performed with self.false_ = False.
|
|
|
+
|
|
|
+ :return: A reference to this object
|
|
|
"""
|
|
|
if self._finished is False:
|
|
|
stdout, stderr = self.popen.communicate(input=self.stdin)
|
|
@@ -759,20 +785,33 @@ class Module(object):
|
|
|
code=self.get_bash(),
|
|
|
module=self.name, errors=stderr)
|
|
|
|
|
|
+ return self
|
|
|
+
|
|
|
|
|
|
class MultiModule(object):
|
|
|
- """This class is designed to run modules in serial in the provided order.
|
|
|
- Module can be run in serial synchronously or asynchronously.
|
|
|
+ """This class is designed to run a list of modules in serial in the provided order.
|
|
|
+
|
|
|
+ Modules can be run in serial synchronously or asynchronously.
|
|
|
|
|
|
- Synchronously: When calling run() all modules will run in serial order
|
|
|
- until they are finished and then return
|
|
|
- - Asynchronously: When calling run() all modules will run in serial order in a background thread,
|
|
|
+ until they are finished and then return. The modules can be accessed by
|
|
|
+ calling get_modules().
|
|
|
+ - Asynchronously: When calling run() all modules will run in serial order in a background process,
|
|
|
run() will return after starting the modules without waiting for them to finish.
|
|
|
The user must call the wait() method to wait for the modules to finish.
|
|
|
+ Asynchronously called modules can optionally be run in a temporary region.
|
|
|
+
|
|
|
+ Note:
|
|
|
+
|
|
|
+ Modules run in asynchronous mode can only be accessed via the wait() method.
|
|
|
+ The wait() method will return all finished modules as a list.
|
|
|
+
|
|
|
+ This class can be passed to the ParallelModuleQueue to run lists of modules in parallel. This
|
|
|
+ is meaningful if region settings must be applied to each parallel module run.
|
|
|
|
|
|
>>> from grass.pygrass.modules import Module
|
|
|
>>> from grass.pygrass.modules import MultiModule
|
|
|
- >>> import threading
|
|
|
+ >>> from multiprocessing import Process
|
|
|
>>> import copy
|
|
|
|
|
|
Synchronous module run
|
|
@@ -800,12 +839,31 @@ class MultiModule(object):
|
|
|
>>> region_5 = copy.deepcopy(region_1)
|
|
|
>>> region_5.flags.p = True
|
|
|
>>> mm = MultiModule(module_list=[region_1, region_2, region_3, region_4, region_5],
|
|
|
- ... finish=False)
|
|
|
+ ... sync=False)
|
|
|
>>> t = mm.run()
|
|
|
- >>> isinstance(t, threading.Thread)
|
|
|
+ >>> isinstance(t, Process)
|
|
|
True
|
|
|
- >>> mm.wait()
|
|
|
- >>> m_list = mm.get_modules()
|
|
|
+ >>> m_list = mm.wait()
|
|
|
+ >>> m_list[0].popen.returncode
|
|
|
+ 0
|
|
|
+ >>> m_list[1].popen.returncode
|
|
|
+ 0
|
|
|
+ >>> m_list[2].popen.returncode
|
|
|
+ 0
|
|
|
+ >>> m_list[3].popen.returncode
|
|
|
+ 0
|
|
|
+ >>> m_list[4].popen.returncode
|
|
|
+ 0
|
|
|
+
|
|
|
+ Asynchronous module run, setting sync = False and using a temporary region
|
|
|
+ >>> mm = MultiModule(module_list=[region_1, region_2, region_3, region_4, region_5],
|
|
|
+ ... sync=False, set_temp_region=True)
|
|
|
+ >>> str(mm)
|
|
|
+ 'g.region -p ; g.region -p ; g.region -p ; g.region -p ; g.region -p'
|
|
|
+ >>> t = mm.run()
|
|
|
+ >>> isinstance(t, Process)
|
|
|
+ True
|
|
|
+ >>> m_list = mm.wait()
|
|
|
>>> m_list[0].popen.returncode
|
|
|
0
|
|
|
>>> m_list[1].popen.returncode
|
|
@@ -819,21 +877,33 @@ class MultiModule(object):
|
|
|
|
|
|
"""
|
|
|
|
|
|
- def __init__(self, module_list=[], finish=True):
|
|
|
- """Konstruktor of the multi module runner
|
|
|
+ def __init__(self, module_list, sync=True, set_temp_region=False):
|
|
|
+ """Constructor of the multi module class
|
|
|
|
|
|
- :param module_list: A list of preconfigured modules that should be run by this module
|
|
|
- :param finish: If set True the run() method will wait for all processes to finish,
|
|
|
- If set False, the run() method will return after starting the processes.
|
|
|
- The wait() method must be called to finish the modules.
|
|
|
+ :param module_list: A list of pre-configured modules that should be run by this module
|
|
|
+ :param sync: If set True the run() method will wait for all processes to finish -> synchronously run.
|
|
|
+ If set False, the run() method will return after starting the processes -> asynchronously run.
|
|
|
+ The wait() method must be called to finish the modules.
|
|
|
+ :param set_temp_region: Set a temporary region in which the modules should be run, hence
|
|
|
+ region settings in the process list will not affect the current
|
|
|
+ computation region.
|
|
|
:return:
|
|
|
"""
|
|
|
self.module_list = module_list
|
|
|
- self.finish_ = finish
|
|
|
- self.t = None
|
|
|
+ self.set_temp_region = set_temp_region
|
|
|
+ self.finish_ = sync # We use the same variable name as Module
|
|
|
+ self.p = None
|
|
|
+ self.q = Queue()
|
|
|
+
|
|
|
+ def __str__(self):
|
|
|
+ """Return the command string that can be executed in a shell"""
|
|
|
+ return ' ; '.join(str(string) for string in self.module_list)
|
|
|
|
|
|
def get_modules(self):
|
|
|
- """Return the list of modules
|
|
|
+ """Return the list of modules that have been run in synchronous mode
|
|
|
+
|
|
|
+ Note: Asynchronously run modules can only be accessed via the wait() method.
|
|
|
+
|
|
|
:return: The list of modules
|
|
|
"""
|
|
|
return self.module_list
|
|
@@ -844,10 +914,10 @@ class MultiModule(object):
|
|
|
|
|
|
If self.finish_ is set False, this method will return
|
|
|
after the process list was started for execution.
|
|
|
- In a background thread, the processes in the list will
|
|
|
+ In a background process, the processes in the list will
|
|
|
be run one after the another.
|
|
|
|
|
|
- :return: None in case of self.finish_ is True, Thread that runs the module otherwise
|
|
|
+ :return: None in case of self.finish_ is True, Process object that runs the module otherwise
|
|
|
"""
|
|
|
|
|
|
if self.finish_ is True:
|
|
@@ -856,23 +926,60 @@ class MultiModule(object):
|
|
|
module.run()
|
|
|
return None
|
|
|
else:
|
|
|
- self.t = threading.Thread(target=run_modules_in_serial,
|
|
|
- args=[self.module_list])
|
|
|
- self.t.start()
|
|
|
+ if self.set_temp_region is True:
|
|
|
+ self.p = Process(target=run_modules_in_temp_region,
|
|
|
+ args=[self.module_list, self.q])
|
|
|
+ else:
|
|
|
+ self.p = Process(target=run_modules,
|
|
|
+ args=[self.module_list, self.q])
|
|
|
+ self.p.start()
|
|
|
|
|
|
- return self.t
|
|
|
+ return self.p
|
|
|
|
|
|
def wait(self):
|
|
|
- """Wait for all processes to finish. Call this method if finished was set False
|
|
|
+ """Wait for all processes to finish. Call this method if sync was set False.
|
|
|
+
|
|
|
+ :return: The process list with finished processes to check their return states
|
|
|
"""
|
|
|
- if self.t:
|
|
|
- self.t.join()
|
|
|
-
|
|
|
-def run_modules_in_serial(module_list):
|
|
|
- for proc in module_list:
|
|
|
- proc.run()
|
|
|
- proc.wait()
|
|
|
- return
|
|
|
+ if self.p:
|
|
|
+ proc_list = self.q.get()
|
|
|
+ self.p.join()
|
|
|
+
|
|
|
+ return proc_list
|
|
|
+
|
|
|
+
|
|
|
+def run_modules_in_temp_region(module_list, q):
|
|
|
+ """Run the modules in a temporary region
|
|
|
+
|
|
|
+ :param module_list: The list of modules to run in serial
|
|
|
+ :param q: The process queue to put the finished process list
|
|
|
+ """
|
|
|
+ use_temp_region()
|
|
|
+ try:
|
|
|
+ for proc in module_list:
|
|
|
+ proc.run()
|
|
|
+ proc.wait()
|
|
|
+ except:
|
|
|
+ raise
|
|
|
+ finally:
|
|
|
+ q.put(module_list)
|
|
|
+ del_temp_region()
|
|
|
+
|
|
|
+
|
|
|
+def run_modules(module_list, q):
|
|
|
+ """Run the modules
|
|
|
+
|
|
|
+ :param module_list: The list of modules to run in serial
|
|
|
+ :param q: The process queue to put the finished process list
|
|
|
+ """
|
|
|
+ try:
|
|
|
+ for proc in module_list:
|
|
|
+ proc.run()
|
|
|
+ proc.wait()
|
|
|
+ except:
|
|
|
+ raise
|
|
|
+ finally:
|
|
|
+ q.put(module_list)
|
|
|
|
|
|
###############################################################################
|
|
|
|