base.py 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. # -*- coding: utf-8 -*-
  2. """
  3. Fast and exit-safe interface to PyGRASS Raster and Vector layer
  4. using multiprocessing
  5. (C) 2015 by the GRASS Development Team
  6. This program is free software under the GNU General Public
  7. License (>=v2). Read the file COPYING that comes with GRASS
  8. for details.
  9. :authors: Soeren Gebbert
  10. """
  11. from grass.exceptions import FatalError
  12. import time
  13. import threading
  14. import sys
  15. from multiprocessing import Process, Lock, Pipe
  16. import logging
  17. ###############################################################################
  18. def dummy_server(lock, conn):
  19. """Dummy server process
  20. :param lock: A multiprocessing.Lock
  21. :param conn: A multiprocessing.Pipe
  22. """
  23. while True:
  24. # Avoid busy waiting
  25. conn.poll(None)
  26. data = conn.recv()
  27. lock.acquire()
  28. if data[0] == 0:
  29. conn.close()
  30. lock.release()
  31. sys.exit()
  32. if data[0] == 1:
  33. raise Exception("Server process intentionally killed by exception")
  34. lock.release()
  35. class RPCServerBase(object):
  36. """This is the base class for send and receive RPC server
  37. It uses a Pipe for IPC.
  38. >>> import grass.script as gscript
  39. >>> from grass.pygrass.rpc.base import RPCServerBase
  40. >>> provider = RPCServerBase()
  41. >>> provider.is_server_alive()
  42. True
  43. >>> provider.is_check_thread_alive()
  44. True
  45. >>> provider.stop()
  46. >>> provider.is_server_alive()
  47. False
  48. >>> provider.is_check_thread_alive()
  49. False
  50. >>> provider = RPCServerBase()
  51. >>> provider.is_server_alive()
  52. True
  53. >>> provider.is_check_thread_alive()
  54. True
  55. Kill the server process with an exception, it should restart
  56. >>> provider.client_conn.send([1])
  57. >>> provider.is_server_alive()
  58. True
  59. >>> provider.is_check_thread_alive()
  60. True
  61. """
  62. def __init__(self):
  63. self.client_conn = None
  64. self.server_conn = None
  65. self.queue = None
  66. self.server = None
  67. self.checkThread = None
  68. self.threadLock = threading.Lock()
  69. self.start_server()
  70. self.start_checker_thread()
  71. self.stopThread = False
  72. def is_server_alive(self):
  73. return self.server.is_alive()
  74. def is_check_thread_alive(self):
  75. return self.checkThread.is_alive()
  76. def start_checker_thread(self):
  77. if self.checkThread is not None and self.checkThread.is_alive():
  78. self.stop_checker_thread()
  79. self.checkThread = threading.Thread(target=self.thread_checker)
  80. self.checkThread.daemon = True
  81. self.stopThread = False
  82. self.checkThread.start()
  83. def stop_checker_thread(self):
  84. self.threadLock.acquire()
  85. self.stopThread = True
  86. self.threadLock.release()
  87. self.checkThread.join(None)
  88. def thread_checker(self):
  89. """Check every 200 micro seconds if the server process is alive"""
  90. while True:
  91. time.sleep(0.2)
  92. #sys.stderr.write("Check server process\n")
  93. self._check_restart_server()
  94. self.threadLock.acquire()
  95. if self.stopThread == True:
  96. #sys.stderr.write("Stop thread\n")
  97. self.threadLock.release()
  98. return
  99. self.threadLock.release()
  100. def start_server(self):
  101. """This function must be re-implemented in the subclasses
  102. """
  103. self.client_conn, self.server_conn = Pipe(True)
  104. self.lock = Lock()
  105. self.server = Process(target=dummy_server, args=(self.lock,
  106. self.server_conn))
  107. self.server.daemon = True
  108. self.server.start()
  109. def check_server(self):
  110. self._check_restart_server()
  111. def _check_restart_server(self):
  112. """Restart the server if it was terminated
  113. """
  114. self.threadLock.acquire()
  115. if self.server.is_alive() is True:
  116. self.threadLock.release()
  117. return
  118. self.client_conn.close()
  119. self.server_conn.close()
  120. self.start_server()
  121. logging.warning("Needed to restart the libgis server")
  122. self.threadLock.release()
  123. def safe_receive(self, message):
  124. """Receive the data and throw an FatalError exception in case the server
  125. process was killed and the pipe was closed by the checker thread"""
  126. try:
  127. ret = self.client_conn.recv()
  128. if isinstance(ret, FatalError):
  129. raise FatalError()
  130. return ret
  131. except (EOFError, IOError, FatalError):
  132. # The pipe was closed by the checker thread because
  133. # the server process was killed
  134. raise FatalError(message)
  135. def stop(self):
  136. """Stop the check thread, the libgis server and close the pipe
  137. This method should be called at exit using the package atexit
  138. """
  139. #sys.stderr.write("###### Stop was called\n")
  140. self.stop_checker_thread()
  141. if self.server is not None and self.server.is_alive():
  142. self.client_conn.send([0, ])
  143. self.server.join()
  144. if self.client_conn is not None:
  145. self.client_conn.close()
  146. if __name__ == "__main__":
  147. import doctest
  148. doctest.testmod()