|
@@ -3,6 +3,7 @@ from __future__ import (nested_scopes, generators, division, absolute_import,
|
|
|
with_statement, print_function, unicode_literals)
|
|
|
import sys
|
|
|
from multiprocessing import cpu_count
|
|
|
+import threading
|
|
|
import time
|
|
|
from xml.etree.ElementTree import fromstring
|
|
|
|
|
@@ -94,6 +95,32 @@ class ParallelModuleQueue(object):
|
|
|
0
|
|
|
0
|
|
|
|
|
|
+ Check MultiModule approach with three by two processes
|
|
|
+ >>> gregion = Module("g.region", flags="p", run_=False)
|
|
|
+ >>> queue = ParallelModuleQueue(nprocs=3)
|
|
|
+ >>> proc_list = []
|
|
|
+ >>> for i in xrange(3):
|
|
|
+ ... new_gregion = copy.deepcopy(gregion)
|
|
|
+ ... proc_list.append(new_gregion)
|
|
|
+ ... new_mapcalc = copy.deepcopy(mapcalc)
|
|
|
+ ... m = new_mapcalc(expression="test_pygrass_%i = %i"%(i, i))
|
|
|
+ ... proc_list.append(new_mapcalc)
|
|
|
+ ... mm = MultiModule(module_list=[new_gregion, new_mapcalc], finish=False)
|
|
|
+ ... queue.put(mm)
|
|
|
+ >>> queue.wait()
|
|
|
+ >>> queue.get_num_run_procs()
|
|
|
+ 0
|
|
|
+ >>> queue.get_max_num_procs()
|
|
|
+ 3
|
|
|
+ >>> for proc in proc_list:
|
|
|
+ ... print(proc.popen.returncode)
|
|
|
+ 0
|
|
|
+ 0
|
|
|
+ 0
|
|
|
+ 0
|
|
|
+ 0
|
|
|
+ 0
|
|
|
+
|
|
|
Check with a queue size of 8 and 4 processes
|
|
|
|
|
|
>>> queue = ParallelModuleQueue(nprocs=8)
|
|
@@ -194,9 +221,9 @@ class ParallelModuleQueue(object):
|
|
|
To run the Module objects in parallel the run\_ and finish\_ options
|
|
|
of the Module must be set to False.
|
|
|
|
|
|
- :param module: a preconfigured Module object with run\_ and finish\_
|
|
|
- set to False
|
|
|
- :type module: Module object
|
|
|
+ :param module: a preconfigured Module object or a list of Module objects
|
|
|
+ with run\_ and finish\_ set to False,
|
|
|
+ :type module: Module object or list of Module objects
|
|
|
"""
|
|
|
self._list[self._proc_count] = module
|
|
|
# Force that finish is False, otherwise the execution
|
|
@@ -209,11 +236,11 @@ class ParallelModuleQueue(object):
|
|
|
self.wait()
|
|
|
|
|
|
def get(self, num):
|
|
|
- """Get a Module object from the queue
|
|
|
+ """Get a Module object or list of Module objects from the queue
|
|
|
|
|
|
:param num: the number of the object in queue
|
|
|
:type num: int
|
|
|
- :returns: the Module object or None if num is not in the queue
|
|
|
+ :returns: the Module object or list of Module objects or None if num is not in the queue
|
|
|
"""
|
|
|
if num < self._num_procs:
|
|
|
return self._list[num]
|
|
@@ -251,12 +278,7 @@ class ParallelModuleQueue(object):
|
|
|
"""
|
|
|
for proc in self._list:
|
|
|
if proc:
|
|
|
- stdout, stderr = proc.popen.communicate(input=proc.stdin)
|
|
|
- proc.outputs['stdout'].value = stdout if stdout else ''
|
|
|
- proc.outputs['stderr'].value = stderr if stderr else ''
|
|
|
-
|
|
|
- if proc.popen.returncode != 0:
|
|
|
- GrassError(("Error running module %s") % (proc.name))
|
|
|
+ proc.wait()
|
|
|
|
|
|
self._list = self._num_procs * [None]
|
|
|
self._proc_count = 0
|
|
@@ -322,11 +344,25 @@ class Module(object):
|
|
|
>>> mapcalc.popen.returncode
|
|
|
0
|
|
|
|
|
|
+ >>> mapcalc = Module("r.mapcalc", expression="test_a = 1",
|
|
|
+ ... overwrite=True, run_=False, finish_=False)
|
|
|
+ >>> mapcalc.run()
|
|
|
+ Module('r.mapcalc')
|
|
|
+ >>> mapcalc.wait()
|
|
|
+ >>> mapcalc.popen.returncode
|
|
|
+ 0
|
|
|
+ >>> mapcalc.run()
|
|
|
+ Module('r.mapcalc')
|
|
|
+ >>> mapcalc.wait()
|
|
|
+ >>> mapcalc.popen.returncode
|
|
|
+ 0
|
|
|
+
|
|
|
>>> colors = Module("r.colors", map="test_a", rules="-",
|
|
|
... run_=False, stdout_=PIPE,
|
|
|
... stderr_=PIPE, stdin_="1 red")
|
|
|
>>> colors.run()
|
|
|
Module('r.colors')
|
|
|
+ >>> mapcalc.wait()
|
|
|
>>> colors.popen.returncode
|
|
|
0
|
|
|
>>> colors.inputs["stdin"].value
|
|
@@ -522,6 +558,8 @@ class Module(object):
|
|
|
self.outputs['stderr'] = Parameter(diz=diz)
|
|
|
self.popen = None
|
|
|
self.time = None
|
|
|
+ self.start_time = None # This variable will be set in the run() function
|
|
|
+ self._finished = False # This variable is set True if wait() was successfully called
|
|
|
|
|
|
if args or kargs:
|
|
|
self.__call__(*args, **kargs)
|
|
@@ -680,39 +718,161 @@ class Module(object):
|
|
|
|
|
|
def run(self):
|
|
|
"""Run the module
|
|
|
-
|
|
|
- :param node:
|
|
|
- :type node:
|
|
|
-
|
|
|
This function will wait for the process to terminate in case
|
|
|
finish_==True and sets up stdout and stderr. If finish_==False this
|
|
|
- function will return after starting the process. Use
|
|
|
- self.popen.communicate() of self.popen.wait() to wait for the process
|
|
|
- termination. The handling of stdout and stderr must then be done
|
|
|
- outside of this function.
|
|
|
+ function will return after starting the process. Use wait() to wait for
|
|
|
+ the started process
|
|
|
"""
|
|
|
G_debug(1, self.get_bash())
|
|
|
+ self._finished = False
|
|
|
if self.inputs['stdin'].value:
|
|
|
self.stdin = self.inputs['stdin'].value
|
|
|
self.stdin_ = PIPE
|
|
|
|
|
|
cmd = self.make_cmd()
|
|
|
- start = time.time()
|
|
|
+ self.start_time = time.time()
|
|
|
self.popen = Popen(cmd,
|
|
|
stdin=self.stdin_,
|
|
|
stdout=self.stdout_,
|
|
|
stderr=self.stderr_,
|
|
|
env=self.env_)
|
|
|
- if self.finish_:
|
|
|
+
|
|
|
+ if self.finish_ is True:
|
|
|
+ self.wait()
|
|
|
+
|
|
|
+ return self
|
|
|
+
|
|
|
+ def wait(self):
|
|
|
+ """Wait for the module to finish. Call this method if
|
|
|
+ the run() call was performed with self.false_ = False.
|
|
|
+ """
|
|
|
+ if self._finished is False:
|
|
|
stdout, stderr = self.popen.communicate(input=self.stdin)
|
|
|
self.outputs['stdout'].value = stdout if stdout else ''
|
|
|
self.outputs['stderr'].value = stderr if stderr else ''
|
|
|
- self.time = time.time() - start
|
|
|
+ self.time = time.time() - self.start_time
|
|
|
+
|
|
|
+ self._finished = True
|
|
|
+
|
|
|
if self.popen.poll():
|
|
|
raise CalledModuleError(returncode=self.popen.returncode,
|
|
|
code=self.get_bash(),
|
|
|
module=self.name, errors=stderr)
|
|
|
- return self
|
|
|
+
|
|
|
+
|
|
|
+class MultiModule(object):
|
|
|
+ """This class is designed to run modules in serial in the provided order.
|
|
|
+ Module can be run in serial synchronously or asynchronously.
|
|
|
+
|
|
|
+ - Synchronously: When calling run() all modules will run in serial order
|
|
|
+ until they are finished and then return
|
|
|
+ - Asynchronously: When calling run() all modules will run in serial order in a background thread,
|
|
|
+ run() will return after starting the modules without waiting for them to finish.
|
|
|
+ The user must call the wait() method to wait for the modules to finish.
|
|
|
+
|
|
|
+ >>> from grass.pygrass.modules import Module
|
|
|
+ >>> from grass.pygrass.modules import MultiModule
|
|
|
+ >>> import threading
|
|
|
+ >>> import copy
|
|
|
+
|
|
|
+ Synchronous module run
|
|
|
+ >>> region_1 = Module("g.region", run_=False)
|
|
|
+ >>> region_1.flags.p = True
|
|
|
+ >>> region_2 = copy.deepcopy(region_1)
|
|
|
+ >>> region_2.flags.p = True
|
|
|
+ >>> mm = MultiModule(module_list=[region_1, region_2])
|
|
|
+ >>> mm.run()
|
|
|
+ >>> m_list = mm.get_modules()
|
|
|
+ >>> m_list[0].popen.returncode
|
|
|
+ 0
|
|
|
+ >>> m_list[1].popen.returncode
|
|
|
+ 0
|
|
|
+
|
|
|
+ Asynchronous module run, setting finish = False
|
|
|
+ >>> region_1 = Module("g.region", run_=False)
|
|
|
+ >>> region_1.flags.p = True
|
|
|
+ >>> region_2 = copy.deepcopy(region_1)
|
|
|
+ >>> region_2.flags.p = True
|
|
|
+ >>> region_3 = copy.deepcopy(region_1)
|
|
|
+ >>> region_3.flags.p = True
|
|
|
+ >>> region_4 = copy.deepcopy(region_1)
|
|
|
+ >>> region_4.flags.p = True
|
|
|
+ >>> region_5 = copy.deepcopy(region_1)
|
|
|
+ >>> region_5.flags.p = True
|
|
|
+ >>> mm = MultiModule(module_list=[region_1, region_2, region_3, region_4, region_5],
|
|
|
+ ... finish=False)
|
|
|
+ >>> t = mm.run()
|
|
|
+ >>> isinstance(t, threading.Thread)
|
|
|
+ True
|
|
|
+ >>> mm.wait()
|
|
|
+ >>> m_list = mm.get_modules()
|
|
|
+ >>> m_list[0].popen.returncode
|
|
|
+ 0
|
|
|
+ >>> m_list[1].popen.returncode
|
|
|
+ 0
|
|
|
+ >>> m_list[2].popen.returncode
|
|
|
+ 0
|
|
|
+ >>> m_list[3].popen.returncode
|
|
|
+ 0
|
|
|
+ >>> m_list[4].popen.returncode
|
|
|
+ 0
|
|
|
+
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self, module_list=[], finish=True):
|
|
|
+ """Konstruktor of the multi module runner
|
|
|
+
|
|
|
+ :param module_list: A list of preconfigured modules that should be run by this module
|
|
|
+ :param finish: If set True the run() method will wait for all processes to finish,
|
|
|
+ If set False, the run() method will return after starting the processes.
|
|
|
+ The wait() method must be called to finish the modules.
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ self.module_list = module_list
|
|
|
+ self.finish_ = finish
|
|
|
+ self.t = None
|
|
|
+
|
|
|
+ def get_modules(self):
|
|
|
+ """Return the list of modules
|
|
|
+ :return: The list of modules
|
|
|
+ """
|
|
|
+ return self.module_list
|
|
|
+
|
|
|
+ def run(self):
|
|
|
+ """Start the modules in the list. if self.finished_ is set True
|
|
|
+ this method will return after all processes finished.
|
|
|
+
|
|
|
+ If self.finish_ is set False, this method will return
|
|
|
+ after the process list was started for execution.
|
|
|
+ In a background thread, the processes in the list will
|
|
|
+ be run one after the another.
|
|
|
+
|
|
|
+ :return: None in case of self.finish_ is True, Thread that runs the module otherwise
|
|
|
+ """
|
|
|
+
|
|
|
+ if self.finish_ is True:
|
|
|
+ for module in self.module_list:
|
|
|
+ module.finish_ = True
|
|
|
+ module.run()
|
|
|
+ return None
|
|
|
+ else:
|
|
|
+ self.t = threading.Thread(target=run_modules_in_serial,
|
|
|
+ args=[self.module_list])
|
|
|
+ self.t.start()
|
|
|
+
|
|
|
+ return self.t
|
|
|
+
|
|
|
+ def wait(self):
|
|
|
+ """Wait for all processes to finish. Call this method if finished was set False
|
|
|
+ """
|
|
|
+ if self.t:
|
|
|
+ self.t.join()
|
|
|
+
|
|
|
+def run_modules_in_serial(module_list):
|
|
|
+ for proc in module_list:
|
|
|
+ proc.run()
|
|
|
+ proc.wait()
|
|
|
+ return
|
|
|
|
|
|
###############################################################################
|
|
|
|