Kaynağa Gözat

temporal library: Using a thread to observe the state of the libgis server process, since the fatal_error SIGABRT signal can not be catched in Python and the Python error handler function can not be registered in the libgis error handling system.

git-svn-id: https://svn.osgeo.org/grass/grass/trunk@64526 15284696-431f-4ddb-bdfa-cd5b030d7da7
Soeren Gebbert 10 yıl önce
ebeveyn
işleme
112ca1d33c

+ 148 - 55
lib/python/temporal/c_libraries_interface.py

@@ -11,6 +11,9 @@ for details.
 :authors: Soeren Gebbert
 """
 
+from grass.exceptions import FatalError
+import time
+import threading
 import sys
 from multiprocessing import Process, Lock, Pipe
 import logging
@@ -723,7 +726,6 @@ def _convert_timestamp_from_grass(ts):
 
 ###############################################################################
 
-
 def _stop(lock, conn, data):
     libgis.G_debug(1, "Stop C-interface server")
     conn.close()
@@ -735,6 +737,8 @@ def _stop(lock, conn, data):
 server_connection = None
 server_lock = None
 
+#def error_handler(data):
+#    server_connection.close()
 
 def c_library_server(lock, conn):
     """The GRASS C-libraries server function designed to be a target for
@@ -743,6 +747,17 @@ def c_library_server(lock, conn):
        :param lock: A multiprocessing.Lock
        :param conn: A multiprocessing.Pipe
     """
+    #global server_connection
+    #server_connection = conn
+ 
+    #CALLBACK = CFUNCTYPE(None, c_void_p)
+    #CALLBACK.restype = None
+    #CALLBACK.argtypes = c_void_p
+
+    #cerror_handler = CALLBACK(error_handler)
+    
+    #libgis.G_add_error_handler(cerror_handler, POINTER(None))
+
     # Crerate the function array
     functions = [0]*15
     functions[RPCDefs.STOP] = _stop
@@ -765,13 +780,12 @@ def c_library_server(lock, conn):
 
     while True:
         # Avoid busy waiting
-        conn.poll(4)
+        conn.poll(None)
         data = conn.recv()
         lock.acquire()
         functions[data[0]](lock, conn, data)
         lock.release()
 
-
 class CLibrariesInterface(object):
     """Fast and exit-safe interface to GRASS C-libraries functions
 
@@ -897,6 +911,30 @@ class CLibrariesInterface(object):
            >>> mapset = ciface.get_mapset()
            >>> location = ciface.get_location()
            >>> gisdbase = ciface.get_gisdbase()
+           
+           >>> ciface.fatal_error()
+           Traceback (most recent call last):
+               raise FatalError(message)
+           FatalError: Fatal error
+           
+           >>> mapset = ciface.get_mapset()
+           >>> location = ciface.get_location()
+           >>> gisdbase = ciface.get_gisdbase()
+           
+           >>> ciface.fatal_error()
+           Traceback (most recent call last):
+               raise FatalError(message)
+           FatalError: Fatal error
+           
+           >>> ciface.fatal_error()
+           Traceback (most recent call last):
+               raise FatalError(message)
+           FatalError: Fatal error
+
+           >>> ciface.fatal_error()
+           Traceback (most recent call last):
+               raise FatalError(message)
+           FatalError: Fatal error
 
            >>> gscript.del_temp_region()
 
@@ -906,7 +944,39 @@ class CLibrariesInterface(object):
         self.server_conn = None
         self.queue = None
         self.server = None
+        self.checkThread = None
+        self.threadLock = threading.Lock()
         self.start_server()
+        self.start_checker_thread()
+        self.stopThread = False
+        
+    def start_checker_thread(self):
+        if self.checkThread is not None and self.checkThread.is_alive():
+            self.stop_checker_thread()
+
+        self.checkThread = threading.Thread(target=self.thread_checker)
+        self.checkThread.daemon = True
+        self.stopThread = False
+        self.checkThread.start()
+    
+    def stop_checker_thread(self):
+        self.threadLock.acquire()
+        self.stopThread = True
+        self.threadLock.release()
+        self.checkThread.join(None)
+        
+    def thread_checker(self):
+        """Check every 200 micro seconds if the server process is alive"""
+        while True:
+            time.sleep(0.2)
+            #sys.stderr.write("Check server process\n")
+            self._check_restart_server()
+            self.threadLock.acquire()
+            if self.stopThread == True:
+                #sys.stderr.write("Stop thread\n")
+                self.threadLock.release()
+                return
+            self.threadLock.release()
 
     def start_server(self):
         self.client_conn, self.server_conn = Pipe(True)
@@ -916,16 +986,25 @@ class CLibrariesInterface(object):
         self.server.daemon = True
         self.server.start()
 
+    def check_server(self):
+        self._check_restart_server()
+    
     def _check_restart_server(self):
         """Restart the server if it was terminated
         """
+        self.threadLock.acquire()
+        
         if self.server.is_alive() is True:
+            self.threadLock.release()
             return
         self.client_conn.close()
         self.server_conn.close()
         self.start_server()
+
         logging.warning("Needed to restart the libgis server")
 
+        self.threadLock.release()
+
     def raster_map_exists(self, name, mapset):
         """Check if a raster map exists in the spatial database
 
@@ -933,10 +1012,10 @@ class CLibrariesInterface(object):
            :param mapset: The mapset of the map
            :returns: True if exists, False if not
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.MAP_EXISTS, RPCDefs.TYPE_RASTER,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("raster_map_exists")
 
     def read_raster_info(self, name, mapset):
         """Read the raster map info from the file system and store the content
@@ -947,10 +1026,10 @@ class CLibrariesInterface(object):
            :returns: The key value pairs of the map specific metadata,
                      or None in case of an error
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.READ_MAP_INFO, RPCDefs.TYPE_RASTER,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("read_raster_info")
 
     def has_raster_timestamp(self, name, mapset):
         """Check if a file based raster timetamp exists
@@ -959,10 +1038,10 @@ class CLibrariesInterface(object):
            :param mapset: The mapset of the map
            :returns: True if exists, False if not
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.HAS_TIMESTAMP, RPCDefs.TYPE_RASTER,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("has_raster_timestamp")
 
     def remove_raster_timestamp(self, name, mapset):
         """Remove a file based raster timetamp
@@ -974,10 +1053,10 @@ class CLibrariesInterface(object):
            :param mapset: The mapset of the map
            :returns: The return value of G_remove_raster_timestamp
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.REMOVE_TIMESTAMP, RPCDefs.TYPE_RASTER,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("remove_raster_timestamp")
 
     def read_raster_timestamp(self, name, mapset):
         """Read a file based raster timetamp
@@ -996,10 +1075,10 @@ class CLibrariesInterface(object):
            :param mapset: The mapset of the map
            :returns: The return value of G_read_raster_timestamp
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.READ_TIMESTAMP, RPCDefs.TYPE_RASTER,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("read_raster_timestamp")
 
     def write_raster_timestamp(self, name, mapset, timestring):
         """Write a file based raster timetamp
@@ -1015,10 +1094,10 @@ class CLibrariesInterface(object):
            :param timestring: A GRASS datetime C-library compatible string
            :returns: The return value of G_write_raster_timestamp
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.WRITE_TIMESTAMP, RPCDefs.TYPE_RASTER,
                                name, mapset, None, timestring])
-        return self.client_conn.recv()
+        return self.safe_receive("write_raster_timestamp")
 
     def raster3d_map_exists(self, name, mapset):
         """Check if a 3D raster map exists in the spatial database
@@ -1027,10 +1106,10 @@ class CLibrariesInterface(object):
            :param mapset: The mapset of the map
            :returns: True if exists, False if not
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.MAP_EXISTS, RPCDefs.TYPE_RASTER3D,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("raster3d_map_exists")
 
     def read_raster3d_info(self, name, mapset):
         """Read the 3D raster map info from the file system and store the content
@@ -1041,10 +1120,10 @@ class CLibrariesInterface(object):
            :returns: The key value pairs of the map specific metadata,
                      or None in case of an error
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.READ_MAP_INFO, RPCDefs.TYPE_RASTER3D,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("read_raster3d_info")
 
     def has_raster3d_timestamp(self, name, mapset):
         """Check if a file based 3D raster timetamp exists
@@ -1053,10 +1132,10 @@ class CLibrariesInterface(object):
            :param mapset: The mapset of the map
            :returns: True if exists, False if not
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.HAS_TIMESTAMP, RPCDefs.TYPE_RASTER3D,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("has_raster3d_timestamp")
 
     def remove_raster3d_timestamp(self, name, mapset):
         """Remove a file based 3D raster timetamp
@@ -1068,10 +1147,10 @@ class CLibrariesInterface(object):
            :param mapset: The mapset of the map
            :returns: The return value of G_remove_raster3d_timestamp
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.REMOVE_TIMESTAMP, RPCDefs.TYPE_RASTER3D,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("remove_raster3d_timestamp")
 
     def read_raster3d_timestamp(self, name, mapset):
         """Read a file based 3D raster timetamp
@@ -1090,10 +1169,10 @@ class CLibrariesInterface(object):
            :param mapset: The mapset of the map
            :returns: The return value of G_read_raster3d_timestamp
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.READ_TIMESTAMP, RPCDefs.TYPE_RASTER3D,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("read_raster3d_timestamp")
 
     def write_raster3d_timestamp(self, name, mapset, timestring):
         """Write a file based 3D raster timetamp
@@ -1109,10 +1188,10 @@ class CLibrariesInterface(object):
            :param timestring: A GRASS datetime C-library compatible string
            :returns: The return value of G_write_raster3d_timestamp
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.WRITE_TIMESTAMP, RPCDefs.TYPE_RASTER3D,
                                name, mapset, None, timestring])
-        return self.client_conn.recv()
+        return self.safe_receive("write_raster3d_timestamp")
 
     def vector_map_exists(self, name, mapset):
         """Check if a vector map exists in the spatial database
@@ -1121,10 +1200,10 @@ class CLibrariesInterface(object):
            :param mapset: The mapset of the map
            :returns: True if exists, False if not
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.MAP_EXISTS, RPCDefs.TYPE_VECTOR,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("vector_map_exists")
 
     def read_vector_info(self, name, mapset):
         """Read the vector map info from the file system and store the content
@@ -1135,10 +1214,10 @@ class CLibrariesInterface(object):
            :returns: The key value pairs of the map specific metadata,
                      or None in case of an error
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.READ_MAP_INFO, RPCDefs.TYPE_VECTOR,
                                name, mapset, None])
-        return self.client_conn.recv()
+        return self.safe_receive("read_vector_info")
 
     def has_vector_timestamp(self, name, mapset, layer=None):
         """Check if a file based vector timetamp exists
@@ -1148,10 +1227,10 @@ class CLibrariesInterface(object):
            :param layer: The layer of the vector map
            :returns: True if exists, False if not
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.HAS_TIMESTAMP, RPCDefs.TYPE_VECTOR,
                                name, mapset, layer])
-        return self.client_conn.recv()
+        return self.safe_receive("has_vector_timestamp")
 
     def remove_vector_timestamp(self, name, mapset, layer=None):
         """Remove a file based vector timetamp
@@ -1164,10 +1243,10 @@ class CLibrariesInterface(object):
            :param layer: The layer of the vector map
            :returns: The return value of G_remove_vector_timestamp
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.REMOVE_TIMESTAMP, RPCDefs.TYPE_VECTOR,
                                name, mapset, layer])
-        return self.client_conn.recv()
+        return self.safe_receive("remove_vector_timestamp")
 
     def read_vector_timestamp(self, name, mapset, layer=None):
         """Read a file based vector timetamp
@@ -1187,10 +1266,10 @@ class CLibrariesInterface(object):
            :param layer: The layer of the vector map
            :returns: The return value ofG_read_vector_timestamp and the timestamps
        """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.READ_TIMESTAMP, RPCDefs.TYPE_VECTOR,
                                name, mapset, layer])
-        return self.client_conn.recv()
+        return self.safe_receive("read_vector_timestamp")
 
     def write_vector_timestamp(self, name, mapset, timestring, layer=None):
         """Write a file based vector timestamp
@@ -1207,19 +1286,19 @@ class CLibrariesInterface(object):
            :param layer: The layer of the vector map
            :returns: The return value of G_write_vector_timestamp
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.WRITE_TIMESTAMP, RPCDefs.TYPE_VECTOR,
                                name, mapset, layer, timestring])
-        return self.client_conn.recv()
+        return self.safe_receive("write_vector_timestamp")
 
     def available_mapsets(self):
         """Return all available mapsets the user can access as a list of strings
 
            :returns: Names of available mapsets as list of strings
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.AVAILABLE_MAPSETS, ])
-        return self.client_conn.recv()
+        return self.safe_receive("available_mapsets")
 
     def get_driver_name(self, mapset=None):
         """Return the temporal database driver of a specific mapset
@@ -1228,9 +1307,9 @@ class CLibrariesInterface(object):
 
            :returns: Name of the driver or None if no temporal database present
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.GET_DRIVER_NAME, mapset])
-        return self.client_conn.recv()
+        return self.safe_receive("get_driver_name")
 
     def get_database_name(self, mapset=None):
         """Return the temporal database name of a specific mapset
@@ -1239,36 +1318,36 @@ class CLibrariesInterface(object):
 
            :returns: Name of the database or None if no temporal database present
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.GET_DATABASE_NAME, mapset])
-        return self.client_conn.recv()
+        return self.safe_receive("get_database_name")
 
     def get_mapset(self):
         """Return the current mapset
 
            :returns: Name of the current mapset
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.G_MAPSET, ])
-        return self.client_conn.recv()
+        return self.safe_receive("get_mapsetn")
 
     def get_location(self):
         """Return the location
 
            :returns: Name of the location
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.G_LOCATION, ])
-        return self.client_conn.recv()
+        return self.safe_receive("get_location")
 
     def get_gisdbase(self):
         """Return the gisdatabase
 
            :returns: Name of the gisdatabase
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.G_GISDBASE, ])
-        return self.client_conn.recv()
+        return self.safe_receive("get_gisdbase")
 
     def fatal_error(self, mapset=None):
         """Return the temporal database name of a specific mapset
@@ -1277,21 +1356,35 @@ class CLibrariesInterface(object):
 
            :returns: Name of the database or None if no temporal database present
         """
-        self._check_restart_server()
+        self.check_server()
         self.client_conn.send([RPCDefs.G_FATAL_ERROR])
+        # The pipe should be closed in the checker thread
+        return self.safe_receive("Fatal error")
+
+    def safe_receive(self, message):
+        """Receive the data and throw an FatalError exception in case the server 
+           process was killed and the pipe was closed by the checker thread"""
+        try:
+            return self.client_conn.recv()
+        except EOFError:
+            # The pipe was closed by the checker thread because
+            # the server process was killed
+            raise FatalError(message)
 
     def stop(self):
-        """Stop the messenger server and close the pipe
+        """Stop the check thread, the libgis server and close the pipe
 
            This method should be called at exit using the package atexit
         """
+        #sys.stderr.write("###### Stop was called\n")
+        self.stop_checker_thread()
         if self.server is not None and self.server.is_alive():
             self.client_conn.send([0, ])
-            self.server.join(5)
-            self.server.terminate()
+            self.server.join()
         if self.client_conn is not None:
             self.client_conn.close()
 
 if __name__ == "__main__":
     import doctest
     doctest.testmod()
+

+ 5 - 3
lib/python/temporal/core.py

@@ -36,6 +36,7 @@ gettext.install('grasslibs', os.path.join(os.getenv("GISBASE"), 'locale'))
 import grass.script as gscript
 from datetime import datetime
 from c_libraries_interface import *
+from grass.pygrass import messages
 # Import all supported database backends
 # Ignore import errors since they are checked later
 try:
@@ -238,8 +239,8 @@ def _init_tgis_message_interface(raise_on_error=False):
                               a fatal error, call sys.exit(1) otherwise
     """
     global message_interface
-    from grass.pygrass import messages
-    message_interface = messages.get_msgr(raise_on_error=raise_on_error)
+    if message_interface is None:
+        message_interface = messages.get_msgr(raise_on_error=raise_on_error)
 
 
 def get_tgis_message_interface():
@@ -266,7 +267,8 @@ def _init_tgis_c_library_interface():
        libraster, libraster3d and libvector functions
     """
     global c_library_interface
-    c_library_interface = CLibrariesInterface()
+    if c_library_interface is None:
+        c_library_interface = CLibrariesInterface()
 
 
 def get_tgis_c_library_interface():

+ 3 - 3
lib/python/temporal/testsuite/test_doctests.py

@@ -22,12 +22,11 @@ def load_tests(loader, tests, ignore):
     tests.addTests(doctest.DocTestSuite(grass.temporal.abstract_map_dataset))
     tests.addTests(doctest.DocTestSuite(grass.temporal.abstract_space_time_dataset))
     tests.addTests(doctest.DocTestSuite(grass.temporal.base))
-    tests.addTests(doctest.DocTestSuite(grass.temporal.c_libraries_interface))
     # Unexpected error here
-    #tests.addTests(doctest.DocTestSuite(grass.temporal.core))
+    ##tests.addTests(doctest.DocTestSuite(grass.temporal.core))
     tests.addTests(doctest.DocTestSuite(grass.temporal.datetime_math))
     # Unexpected error here
-    #tests.addTests(doctest.DocTestSuite(grass.temporal.list_stds))
+    ##tests.addTests(doctest.DocTestSuite(grass.temporal.list_stds))
     tests.addTests(doctest.DocTestSuite(grass.temporal.metadata))
     tests.addTests(doctest.DocTestSuite(grass.temporal.register))
     tests.addTests(doctest.DocTestSuite(grass.temporal.space_time_datasets))
@@ -44,6 +43,7 @@ def load_tests(loader, tests, ignore):
     tests.addTests(doctest.DocTestSuite(grass.temporal.temporal_raster_base_algebra))
     tests.addTests(doctest.DocTestSuite(grass.temporal.temporal_operator))
     tests.addTests(doctest.DocTestSuite(grass.temporal.temporal_vector_algebra))
+    tests.addTests(doctest.DocTestSuite(grass.temporal.c_libraries_interface))
     return tests
 
 if __name__ == '__main__':