base.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
# -*- coding: utf-8 -*-
"""
Fast and exit-safe interface to PyGRASS Raster and Vector layer
using multiprocessing

(C) 2015 by the GRASS Development Team
This program is free software under the GNU General Public
License (>=v2). Read the file COPYING that comes with GRASS
for details.

:authors: Soeren Gebbert
"""
  11. from grass.exceptions import FatalError
  12. import time
  13. import threading
  14. import sys
  15. from multiprocessing import Process, Lock, Pipe
  16. import logging
  17. ###############################################################################
  18. def dummy_server(lock, conn):
  19. """Dummy server process
  20. :param lock: A multiprocessing.Lock
  21. :param conn: A multiprocessing.Pipe
  22. """
  23. while True:
  24. # Avoid busy waiting
  25. conn.poll(None)
  26. data = conn.recv()
  27. lock.acquire()
  28. if data[0] == 0:
  29. conn.close()
  30. lock.release()
  31. sys.exit()
  32. if data[0] == 1:
  33. raise Exception("Server process intentionally killed by exception")
  34. lock.release()
  35. class RPCServerBase(object):
  36. """This is the base class for send and receive RPC server
  37. It uses a Pipe for IPC.
  38. >>> import grass.script as gscript
  39. >>> from grass.pygrass.rpc.base import RPCServerBase
  40. >>> import time
  41. >>> provider = RPCServerBase()
  42. >>> provider.is_server_alive()
  43. True
  44. >>> provider.is_check_thread_alive()
  45. True
  46. >>> provider.stop()
  47. >>> time.sleep(1)
  48. >>> provider.is_server_alive()
  49. False
  50. >>> provider.is_check_thread_alive()
  51. False
  52. >>> provider = RPCServerBase()
  53. >>> provider.is_server_alive()
  54. True
  55. >>> provider.is_check_thread_alive()
  56. True
  57. Kill the server process with an exception, it should restart
  58. >>> provider.client_conn.send([1])
  59. >>> provider.is_server_alive()
  60. True
  61. >>> provider.is_check_thread_alive()
  62. True
  63. """
  64. def __init__(self):
  65. self.client_conn = None
  66. self.server_conn = None
  67. self.queue = None
  68. self.server = None
  69. self.checkThread = None
  70. self.threadLock = threading.Lock()
  71. self.start_server()
  72. self.start_checker_thread()
  73. self.stopThread = False
  74. def is_server_alive(self):
  75. return self.server.is_alive()
  76. def is_check_thread_alive(self):
  77. return self.checkThread.is_alive()
  78. def start_checker_thread(self):
  79. if self.checkThread is not None and self.checkThread.is_alive():
  80. self.stop_checker_thread()
  81. self.checkThread = threading.Thread(target=self.thread_checker)
  82. self.checkThread.daemon = True
  83. self.stopThread = False
  84. self.checkThread.start()
  85. def stop_checker_thread(self):
  86. self.threadLock.acquire()
  87. self.stopThread = True
  88. self.threadLock.release()
  89. self.checkThread.join(None)
  90. def thread_checker(self):
  91. """Check every 200 micro seconds if the server process is alive"""
  92. while True:
  93. time.sleep(0.2)
  94. # sys.stderr.write("Check server process\n")
  95. self._check_restart_server(caller="Server check thread")
  96. self.threadLock.acquire()
  97. if self.stopThread == True:
  98. #sys.stderr.write("Stop thread\n")
  99. self.threadLock.release()
  100. return
  101. self.threadLock.release()
  102. def start_server(self):
  103. """This function must be re-implemented in the subclasses
  104. """
  105. self.client_conn, self.server_conn = Pipe(True)
  106. self.lock = Lock()
  107. self.server = Process(target=dummy_server, args=(self.lock,
  108. self.server_conn))
  109. self.server.daemon = True
  110. self.server.start()
  111. def check_server(self):
  112. self._check_restart_server()
  113. def _check_restart_server(self, caller="main thread"):
  114. """Restart the server if it was terminated
  115. """
  116. self.threadLock.acquire()
  117. if self.server.is_alive() is True:
  118. self.threadLock.release()
  119. return
  120. self.client_conn.close()
  121. self.server_conn.close()
  122. self.start_server()
  123. logging.warning("Needed to restart the libgis server, caller: %s"%(caller))
  124. self.threadLock.release()
  125. def safe_receive(self, message):
  126. """Receive the data and throw a FatalError exception in case the server
  127. process was killed and the pipe was closed by the checker thread"""
  128. try:
  129. ret = self.client_conn.recv()
  130. if isinstance(ret, FatalError):
  131. raise ret
  132. return ret
  133. except (EOFError, IOError, FatalError) as e:
  134. # The pipe was closed by the checker thread because
  135. # the server process was killed
  136. raise FatalError("Exception raised: " + str(e) + " Message: " + message)
  137. def stop(self):
  138. """Stop the check thread, the libgis server and close the pipe
  139. This method should be called at exit using the package atexit
  140. """
  141. #sys.stderr.write("###### Stop was called\n")
  142. self.stop_checker_thread()
  143. if self.server is not None and self.server.is_alive():
  144. self.client_conn.send([0, ])
  145. self.server.terminate()
  146. if self.client_conn is not None:
  147. self.client_conn.close()
  148. if __name__ == "__main__":
  149. import doctest
  150. doctest.testmod()