base.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. """
  2. Fast and exit-safe interface to PyGRASS Raster and Vector layer
  3. using multiprocessing
  4. (C) 2015 by the GRASS Development Team
  5. This program is free software under the GNU General Public
  6. License (>=v2). Read the file COPYING that comes with GRASS
  7. for details.
  8. :authors: Soeren Gebbert
  9. """
  10. from grass.exceptions import FatalError
  11. import time
  12. import threading
  13. import sys
  14. from multiprocessing import Process, Lock, Pipe
  15. import logging
  16. ###############################################################################
  17. def dummy_server(lock, conn):
  18. """Dummy server process
  19. :param lock: A multiprocessing.Lock
  20. :param conn: A multiprocessing.Pipe
  21. """
  22. while True:
  23. # Avoid busy waiting
  24. conn.poll(None)
  25. data = conn.recv()
  26. lock.acquire()
  27. if data[0] == 0:
  28. conn.close()
  29. lock.release()
  30. sys.exit()
  31. if data[0] == 1:
  32. raise Exception("Server process intentionally killed by exception")
  33. lock.release()
  34. class RPCServerBase(object):
  35. """This is the base class for send and receive RPC server
  36. It uses a Pipe for IPC.
  37. >>> import grass.script as gscript
  38. >>> from grass.pygrass.rpc.base import RPCServerBase
  39. >>> import time
  40. >>> provider = RPCServerBase()
  41. >>> provider.is_server_alive()
  42. True
  43. >>> provider.is_check_thread_alive()
  44. True
  45. >>> provider.stop()
  46. >>> time.sleep(1)
  47. >>> provider.is_server_alive()
  48. False
  49. >>> provider.is_check_thread_alive()
  50. False
  51. >>> provider = RPCServerBase()
  52. >>> provider.is_server_alive()
  53. True
  54. >>> provider.is_check_thread_alive()
  55. True
  56. Kill the server process with an exception, it should restart
  57. >>> provider.client_conn.send([1])
  58. >>> provider.is_server_alive()
  59. True
  60. >>> provider.is_check_thread_alive()
  61. True
  62. """
  63. def __init__(self):
  64. self.client_conn = None
  65. self.server_conn = None
  66. self.queue = None
  67. self.server = None
  68. self.checkThread = None
  69. self.threadLock = threading.Lock()
  70. self.start_server()
  71. self.start_checker_thread()
  72. self.stopThread = False
  73. self.stopped = True
  74. # logging.basicConfig(level=logging.DEBUG)
  75. def is_server_alive(self):
  76. return self.server.is_alive()
  77. def is_check_thread_alive(self):
  78. return self.checkThread.is_alive()
  79. def start_checker_thread(self):
  80. if self.checkThread is not None and self.checkThread.is_alive():
  81. self.stop_checker_thread()
  82. self.checkThread = threading.Thread(target=self.thread_checker)
  83. self.checkThread.daemon = True
  84. self.stopThread = False
  85. self.checkThread.start()
  86. def stop_checker_thread(self):
  87. self.threadLock.acquire()
  88. self.stopThread = True
  89. self.threadLock.release()
  90. self.checkThread.join(None)
  91. def thread_checker(self):
  92. """Check every 200 micro seconds if the server process is alive"""
  93. while True:
  94. time.sleep(0.2)
  95. self._check_restart_server(caller="Server check thread")
  96. self.threadLock.acquire()
  97. if self.stopThread is True:
  98. self.threadLock.release()
  99. return
  100. self.threadLock.release()
  101. def start_server(self):
  102. """This function must be re-implemented in the subclasses"""
  103. logging.debug("Start the libgis server")
  104. self.client_conn, self.server_conn = Pipe(True)
  105. self.lock = Lock()
  106. self.server = Process(target=dummy_server, args=(self.lock, self.server_conn))
  107. self.server.daemon = True
  108. self.server.start()
  109. def check_server(self):
  110. self._check_restart_server()
  111. def _check_restart_server(self, caller="main thread"):
  112. """Restart the server if it was terminated"""
  113. logging.debug("Check libgis server restart")
  114. self.threadLock.acquire()
  115. if self.server.is_alive() is True:
  116. self.threadLock.release()
  117. return
  118. self.client_conn.close()
  119. self.server_conn.close()
  120. self.start_server()
  121. if self.stopped is not True:
  122. logging.warning(
  123. "Needed to restart the libgis server, caller: %s" % (caller)
  124. )
  125. self.threadLock.release()
  126. self.stopped = False
  127. def safe_receive(self, message):
  128. """Receive the data and throw a FatalError exception in case the server
  129. process was killed and the pipe was closed by the checker thread"""
  130. logging.debug("Receive message: {message}")
  131. try:
  132. ret = self.client_conn.recv()
  133. if isinstance(ret, FatalError):
  134. raise ret
  135. return ret
  136. except (EOFError, IOError, FatalError) as e:
  137. # The pipe was closed by the checker thread because
  138. # the server process was killed
  139. raise FatalError("Exception raised: " + str(e) + " Message: " + message)
  140. def stop(self):
  141. """Stop the check thread, the libgis server and close the pipe
  142. This method should be called at exit using the package atexit
  143. """
  144. logging.debug("Stop libgis server")
  145. self.stop_checker_thread()
  146. if self.server is not None and self.server.is_alive():
  147. self.client_conn.send(
  148. [
  149. 0,
  150. ]
  151. )
  152. self.server.terminate()
  153. if self.client_conn is not None:
  154. self.client_conn.close()
  155. self.stopped = True
  156. if __name__ == "__main__":
  157. import doctest
  158. doctest.testmod()