Browse Source

Implemented parallel r.mapcalc and r3.mapcalc calls in temporal raster algebra.

git-svn-id: https://svn.osgeo.org/grass/grass/trunk@59012 15284696-431f-4ddb-bdfa-cd5b030d7da7
Soeren Gebbert 11 years ago
parent
commit
67e3753ad7

+ 6 - 1
lib/python/temporal/abstract_space_time_dataset.py

@@ -204,8 +204,13 @@ class AbstractSpaceTimeDataset(AbstractDataset):
                 first = token.split("=")[0]
                 second = ""
 
+                flag = 0
                 for t in token.split("=")[1:]:
-                    second += t
+                    if flag == 0:
+                        second += t
+                        flag = 1
+                    else:
+                        second += "=" + t
 
                 token = "%s=\"%s\"" % (first, second)
 

+ 5 - 2
lib/python/temporal/temporal_algebra.py

@@ -2358,8 +2358,11 @@ class TemporalAlgebraParser(object):
 
     # Handle errors.
     def p_error(self, t):
-        raise SyntaxError("syntax error on line %d near '%s' expression '%s'" %
-            (t.lineno, t.value, self.expression))
+        if t:
+            raise SyntaxError("syntax error on line %d, token %s near '%s' expression '%s'" %
+                             (t.lineno, t.type, t.value, self.expression))
+        else:
+            raise SyntaxError("Unexpected syntax error")
 
 ###############################################################################
 

+ 3 - 3
lib/python/temporal/temporal_raster3d_algebra.py

@@ -19,10 +19,10 @@ from temporal_raster_base_algebra import *
 class TemporalRaster3DAlgebraParser(TemporalRasterBaseAlgebraParser):
     """The temporal raster algebra class"""
 
-    def __init__(self, pid=None, run=False, debug=True, spatial = False):
-        TemporalRasterBaseAlgebraParser.__init__(self, pid, run, debug, spatial)
+    def __init__(self, pid=None, run=False, debug=True, spatial = False, nprocs = 1):
+        TemporalRasterBaseAlgebraParser.__init__(self, pid, run, debug, spatial, nprocs)
 
-        self.m_mapcalc = pygrass.Module('r3.mapcalc')
+        self.m_mapcalc = pymod.Module('r3.mapcalc')
 
     def parse(self, expression, basename = None, overwrite=False):
         self.lexer = TemporalRasterAlgebraLexer()

+ 3 - 4
lib/python/temporal/temporal_raster_algebra.py

@@ -53,7 +53,6 @@ for details.
 @endcode
 """
 
-import grass.pygrass.modules as pygrass
 from temporal_raster_base_algebra import *
 
 ###############################################################################
@@ -61,10 +60,10 @@ from temporal_raster_base_algebra import *
 class TemporalRasterAlgebraParser(TemporalRasterBaseAlgebraParser):
     """The temporal raster algebra class"""
 
-    def __init__(self, pid=None, run=False, debug=True, spatial = False):
-        TemporalRasterBaseAlgebraParser.__init__(self, pid, run, debug, spatial)
+    def __init__(self, pid=None, run=False, debug=True, spatial = False, nprocs = 1):
+        TemporalRasterBaseAlgebraParser.__init__(self, pid, run, debug, spatial, nprocs)
 
-        self.m_mapcalc = pygrass.Module('r.mapcalc')
+        self.m_mapcalc = pymod.Module('r.mapcalc')
 
     def parse(self, expression, basename = None, overwrite=False):
         self.lexer = TemporalRasterAlgebraLexer()

+ 36 - 29
lib/python/temporal/temporal_raster_base_algebra.py

@@ -96,6 +96,7 @@ for details.
 @endcode
 """
 
+import grass.pygrass.modules as pymod
 from temporal_raster_operator import *
 from temporal_algebra import *
 
@@ -186,8 +187,9 @@ class TemporalRasterBaseAlgebraParser(TemporalAlgebraParser):
          'T_ARITH1_OPERATOR'))
 
     def __init__(self, pid=None, run = True, debug = False, spatial = False, \
-                  null = False):
+                  nprocs = 1, null = False):
         TemporalAlgebraParser.__init__(self, pid, run, debug, spatial)
+        self.nprocs = nprocs
 
     ######################### Temporal functions ##############################
 
@@ -259,6 +261,8 @@ class TemporalRasterBaseAlgebraParser(TemporalAlgebraParser):
 
         """
         if self.run:
+            # Create the process queue for parallel mapcalc processing
+            process_queue = pymod.ParallelModuleQueue(int(self.nprocs))
             if isinstance(t[3], list):
                 num = len(t[3])
                 count = 0
@@ -271,33 +275,38 @@ class TemporalRasterBaseAlgebraParser(TemporalAlgebraParser):
                     if rastermap.map_exists() and self.overwrite == False:
                         self.msgr.fatal("Error raster maps with basename %s exist. Use --o flag to overwrite existing file" \
                                             %(rastername))
+                map_test_list = []
                 for map_i in t[3]:
                     newident = self.basename + "_" + str(count)
                     if "cmd_list" in dir(map_i):
-                        returncode = 0
                         print(newident + ' = ' + map_i.cmd_list)
                         # Build r.mapcalc module and execute expression.
                         # Change map name to given basename.
                         # Create deepcopy of r.mapcalc module.
+
+                        map_test = map_i.get_new_instance(newident + "@" + self.mapset)
+                        map_test.set_temporal_extent(map_i.get_temporal_extent())
+                        map_test.set_spatial_extent(map_i.get_spatial_extent())
+                        map_test_list.append(map_test)
+
                         m = copy.deepcopy(self.m_mapcalc)
                         m_expression = newident + "=" + map_i.cmd_list
                         m.inputs["expression"].value = m_expression
                         m.flags["overwrite"].value = self.overwrite
-                        m.run()
-                        if m.popen.returncode != 0:
-                            self.msgr.fatal("Error starting %s : \n%s" \
-                                                %(m.get_bash(), \
-                                                m.popen.stderr))
-                        map_test = map_i.get_new_instance(newident + "@" + self.mapset)
-                        if not map_test.map_exists():
-                            returncode = 1
-                            break
-                        if returncode == 0:
-                            map_i.set_id(newident + "@" + self.mapset)
-                            count += 1
-                            register_list.append(map_i)
+                        process_queue.put(m)
+                    else:
+                        map_i.set_id(newident + "@" + self.mapset)
+                        map_test_list.append(map_i)
+                    count  += 1
+
+                process_queue.wait()
+
+                for map_i in map_test_list:
+                    if not map_test.map_exists():
+                        self.msgr.error(_("Error computing map <%s>"%(map_i.get_id()) ))
                     else:
                         register_list.append(map_i)
+
                 # Open connection to temporal database.
                 dbif, connect = init_dbif(self.dbif)
                 # Create result space time dataset.
@@ -306,20 +315,18 @@ class TemporalRasterBaseAlgebraParser(TemporalAlgebraParser):
                                                          'mean', self.dbif, \
                                                          overwrite = self.overwrite)
                 for map_i in register_list:
-                    # Check if modules should be executed from command list.
-                    if "cmd_list" in dir(map_i):
-                        # Get meta data from grass database.
-                        map_i.load()
-                        if map_i.is_in_db(dbif) and self.overwrite:
-                            # Update map in temporal database.
-                            map_i.update_all(dbif)
-                        elif map_i.is_in_db(dbif) and self.overwrite == False:
-                            # Raise error if map exists and no overwrite flag is given.
-                            self.msgr.fatal("Error vector map %s exist in temporal database. Use overwrite flag.  : \n%s" \
-                                                %(map_i.get_map_id(), cmd.popen.stderr))
-                        else:
-                            # Insert map into temporal database.
-                            map_i.insert(dbif)
+                    # Get meta data from grass database.
+                    map_i.load()
+                    if map_i.is_in_db(dbif) and self.overwrite:
+                        # Update map in temporal database.
+                        map_i.update_all(dbif)
+                    elif map_i.is_in_db(dbif) and self.overwrite == False:
+                        # Raise error if map exists and no overwrite flag is given.
+                        self.msgr.fatal("Error vector map %s exist in temporal database. Use overwrite flag.  : \n%s" \
+                                            %(map_i.get_map_id(), cmd.popen.stderr))
+                    else:
+                        # Insert map into temporal database.
+                        map_i.insert(dbif)
                     # Register map in result space time dataset.
                     success = resultstds.register_map(map_i, dbif)
                 resultstds.update_from_registered_maps(dbif)