base.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. # -*- coding: utf-8 -*-
  2. """
  3. Fast and exit-safe interface to PyGRASS Raster and Vector layer
  4. using multiprocessing
  5. (C) 2015 by the GRASS Development Team
  6. This program is free software under the GNU General Public
  7. License (>=v2). Read the file COPYING that comes with GRASS
  8. for details.
  9. :authors: Soeren Gebbert
  10. """
  11. from grass.exceptions import FatalError
  12. import time
  13. import threading
  14. import sys
  15. from multiprocessing import Process, Lock, Pipe
  16. import logging
  17. ###############################################################################
  18. def dummy_server(lock, conn):
  19. """Dummy server process
  20. :param lock: A multiprocessing.Lock
  21. :param conn: A multiprocessing.Pipe
  22. """
  23. while True:
  24. # Avoid busy waiting
  25. conn.poll(None)
  26. data = conn.recv()
  27. lock.acquire()
  28. if data[0] == 0:
  29. conn.close()
  30. lock.release()
  31. sys.exit()
  32. if data[0] == 1:
  33. raise Exception("Server process intentionally killed by exception")
  34. lock.release()
  35. class RPCServerBase(object):
  36. """This is the base class for send and receive RPC server
  37. It uses a Pipe for IPC.
  38. >>> import grass.script as gscript
  39. >>> from grass.pygrass.rpc.base import RPCServerBase
  40. >>> import time
  41. >>> provider = RPCServerBase()
  42. >>> provider.is_server_alive()
  43. True
  44. >>> provider.is_check_thread_alive()
  45. True
  46. >>> provider.stop()
  47. >>> time.sleep(1)
  48. >>> provider.is_server_alive()
  49. False
  50. >>> provider.is_check_thread_alive()
  51. False
  52. >>> provider = RPCServerBase()
  53. >>> provider.is_server_alive()
  54. True
  55. >>> provider.is_check_thread_alive()
  56. True
  57. Kill the server process with an exception, it should restart
  58. >>> provider.client_conn.send([1])
  59. >>> provider.is_server_alive()
  60. True
  61. >>> provider.is_check_thread_alive()
  62. True
  63. """
  64. def __init__(self):
  65. self.client_conn = None
  66. self.server_conn = None
  67. self.queue = None
  68. self.server = None
  69. self.checkThread = None
  70. self.threadLock = threading.Lock()
  71. self.start_server()
  72. self.start_checker_thread()
  73. self.stopThread = False
  74. self.stopped = True
  75. # logging.basicConfig(level=logging.DEBUG)
  76. def is_server_alive(self):
  77. return self.server.is_alive()
  78. def is_check_thread_alive(self):
  79. return self.checkThread.is_alive()
  80. def start_checker_thread(self):
  81. if self.checkThread is not None and self.checkThread.is_alive():
  82. self.stop_checker_thread()
  83. self.checkThread = threading.Thread(target=self.thread_checker)
  84. self.checkThread.daemon = True
  85. self.stopThread = False
  86. self.checkThread.start()
  87. def stop_checker_thread(self):
  88. self.threadLock.acquire()
  89. self.stopThread = True
  90. self.threadLock.release()
  91. self.checkThread.join(None)
  92. def thread_checker(self):
  93. """Check every 200 micro seconds if the server process is alive"""
  94. while True:
  95. time.sleep(0.2)
  96. self._check_restart_server(caller="Server check thread")
  97. self.threadLock.acquire()
  98. if self.stopThread is True:
  99. self.threadLock.release()
  100. return
  101. self.threadLock.release()
  102. def start_server(self):
  103. """This function must be re-implemented in the subclasses
  104. """
  105. logging.debug("Start the libgis server")
  106. self.client_conn, self.server_conn = Pipe(True)
  107. self.lock = Lock()
  108. self.server = Process(target=dummy_server, args=(self.lock,
  109. self.server_conn))
  110. self.server.daemon = True
  111. self.server.start()
  112. def check_server(self):
  113. self._check_restart_server()
  114. def _check_restart_server(self, caller="main thread"):
  115. """Restart the server if it was terminated
  116. """
  117. logging.debug("Check libgis server restart")
  118. self.threadLock.acquire()
  119. if self.server.is_alive() is True:
  120. self.threadLock.release()
  121. return
  122. self.client_conn.close()
  123. self.server_conn.close()
  124. self.start_server()
  125. if self.stopped is not True:
  126. logging.warning("Needed to restart the libgis server, caller: %s" % (caller))
  127. self.threadLock.release()
  128. self.stopped = False
  129. def safe_receive(self, message):
  130. """Receive the data and throw a FatalError exception in case the server
  131. process was killed and the pipe was closed by the checker thread"""
  132. logging.debug("Receive message: {message}")
  133. try:
  134. ret = self.client_conn.recv()
  135. if isinstance(ret, FatalError):
  136. raise ret
  137. return ret
  138. except (EOFError, IOError, FatalError) as e:
  139. # The pipe was closed by the checker thread because
  140. # the server process was killed
  141. raise FatalError("Exception raised: " + str(e) + " Message: " + message)
  142. def stop(self):
  143. """Stop the check thread, the libgis server and close the pipe
  144. This method should be called at exit using the package atexit
  145. """
  146. logging.debug("Stop libgis server")
  147. self.stop_checker_thread()
  148. if self.server is not None and self.server.is_alive():
  149. self.client_conn.send([0, ])
  150. self.server.terminate()
  151. if self.client_conn is not None:
  152. self.client_conn.close()
  153. self.stopped = True
  154. if __name__ == "__main__":
  155. import doctest
  156. doctest.testmod()