checks.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607
  1. """
  2. Checking objects in a GRASS GIS Spatial Database
  3. (C) 2020 by the GRASS Development Team
  4. This program is free software under the GNU General Public
  5. License (>=v2). Read the file COPYING that comes with GRASS
  6. for details.
  7. .. sectionauthor:: Vaclav Petras <wenzeslaus gmail com>
  8. """
  9. import os
  10. import sys
  11. import datetime
  12. from pathlib import Path
  13. from grass.script import gisenv
  14. import grass.script as gs
  15. import glob
  16. import grass.grassdb.config as cfg
  17. def mapset_exists(database, location, mapset):
  18. """Returns True whether mapset path exists."""
  19. location_path = os.path.join(database, location)
  20. mapset_path = os.path.join(location_path, mapset)
  21. if os.path.exists(mapset_path):
  22. return True
  23. return False
  24. def location_exists(database, location):
  25. """Returns True whether location path exists."""
  26. location_path = os.path.join(database, location)
  27. if os.path.exists(location_path):
  28. return True
  29. return False
  30. # TODO: distinguish between valid for getting maps and usable as current
  31. # https://lists.osgeo.org/pipermail/grass-dev/2016-September/082317.html
  32. # interface created according to the current usage
  33. def is_mapset_valid(mapset_path):
  34. """Return True if GRASS Mapset is valid"""
  35. # WIND is created from DEFAULT_WIND by `g.region -d` and functions
  36. # or modules which create a new mapset. Most modules will fail if
  37. # WIND doesn't exist (assuming that neither GRASS_REGION nor
  38. # WIND_OVERRIDE environmental variables are set).
  39. return os.access(os.path.join(mapset_path, "WIND"), os.R_OK)
  40. def is_location_valid(database, location):
  41. """Return True if GRASS Location is valid
  42. :param database: Path to GRASS GIS database directory
  43. :param location: name of a Location
  44. """
  45. # DEFAULT_WIND file should not be required until you do something
  46. # that actually uses them. The check is just a heuristic; a directory
  47. # containing a PERMANENT/DEFAULT_WIND file is probably a GRASS
  48. # location, while a directory lacking it probably isn't.
  49. return os.access(
  50. os.path.join(database, location, "PERMANENT", "DEFAULT_WIND"), os.F_OK
  51. )
  52. def is_mapset_current(database, location, mapset):
  53. genv = gisenv()
  54. if (
  55. database == genv["GISDBASE"]
  56. and location == genv["LOCATION_NAME"]
  57. and mapset == genv["MAPSET"]
  58. ):
  59. return True
  60. return False
  61. def is_location_current(database, location):
  62. genv = gisenv()
  63. if database == genv["GISDBASE"] and location == genv["LOCATION_NAME"]:
  64. return True
  65. return False
  66. def is_current_user_mapset_owner(mapset_path):
  67. """Returns True if mapset owner is the current user.
  68. On Windows it always returns True."""
  69. # Note that this does account for libgis built with SKIP_MAPSET_OWN_CHK
  70. # which disables the ownerships check, i.e., even if it was build with the
  71. # skip, it still needs the env variable.
  72. if os.environ.get("GRASS_SKIP_MAPSET_OWNER_CHECK", None):
  73. # Mapset just needs to be accessible for writing.
  74. return os.access(mapset_path, os.W_OK)
  75. # Mapset needs to be owned by user.
  76. if sys.platform == "win32":
  77. return True
  78. stat_info = os.stat(mapset_path)
  79. mapset_uid = stat_info.st_uid
  80. return mapset_uid == os.getuid()
  81. def is_different_mapset_owner(mapset_path):
  82. """Returns True if mapset owner is different from the current user"""
  83. return not is_current_user_mapset_owner(mapset_path)
  84. def get_mapset_owner(mapset_path):
  85. """Returns mapset owner name or None if owner name unknown.
  86. On Windows it always returns None."""
  87. if sys.platform == "win32":
  88. return None
  89. try:
  90. path = Path(mapset_path)
  91. return path.owner()
  92. except KeyError:
  93. return None
  94. def is_fallback_session():
  95. """Checks if a user encounters a fallback GRASS session.
  96. Returns True if a user encounters a fallback session.
  97. It occurs when a last mapset is not usable and at the same time
  98. a user is in a temporary location.
  99. """
  100. if "LAST_MAPSET_PATH" in gisenv().keys():
  101. return is_mapset_current(
  102. os.environ["TMPDIR"], cfg.temporary_location, cfg.permanent_mapset
  103. )
  104. return False
  105. def is_first_time_user():
  106. """Check if a user is a first-time user.
  107. Returns True if a user is a first-time user.
  108. It occurs when a gisrc file has initial settings either in last used mapset
  109. or in current mapset settings.
  110. """
  111. genv = gisenv()
  112. if "LAST_MAPSET_PATH" in genv.keys():
  113. return genv["LAST_MAPSET_PATH"] == os.path.join(
  114. os.getcwd(), cfg.unknown_location, cfg.unknown_mapset
  115. )
  116. return False
  117. def is_mapset_locked(mapset_path):
  118. """Check if the mapset is locked"""
  119. lock_name = ".gislock"
  120. lockfile = os.path.join(mapset_path, lock_name)
  121. return os.path.exists(lockfile)
  122. def get_lockfile_if_present(database, location, mapset):
  123. """Return path to lock if present, None otherwise
  124. Returns the path as a string or None if nothing was found, so the
  125. return value can be used to test if the lock is present.
  126. """
  127. lock_name = ".gislock"
  128. lockfile = os.path.join(database, location, mapset, lock_name)
  129. if os.path.isfile(lockfile):
  130. return lockfile
  131. return None
  132. def get_mapset_lock_info(mapset_path):
  133. """Get information about .gislock file.
  134. Assumes lock file exists, use is_mapset_locked to find out.
  135. Returns information as a dictionary with keys
  136. 'owner' (None if unknown), 'lockpath', and 'timestamp'.
  137. """
  138. info = {}
  139. lock_name = ".gislock"
  140. info["lockpath"] = os.path.join(mapset_path, lock_name)
  141. try:
  142. info["owner"] = Path(info["lockpath"]).owner()
  143. except KeyError:
  144. info["owner"] = None
  145. info["timestamp"] = (
  146. datetime.datetime.fromtimestamp(os.path.getmtime(info["lockpath"]))
  147. ).replace(microsecond=0)
  148. return info
  149. def can_start_in_mapset(mapset_path, ignore_lock=False):
  150. """Check if a mapset from a gisrc file is usable for new session"""
  151. if not is_mapset_valid(mapset_path):
  152. return False
  153. if not is_current_user_mapset_owner(mapset_path):
  154. return False
  155. if not ignore_lock and is_mapset_locked(mapset_path):
  156. return False
  157. return True
  158. def get_reason_id_mapset_not_usable(mapset_path):
  159. """It finds a reason why mapset is not usable.
  160. Returns a reason id as a string.
  161. If mapset path is None or no reason found, returns None.
  162. """
  163. # Check whether mapset exists
  164. if not os.path.exists(mapset_path):
  165. return "non-existent"
  166. # Check whether mapset is valid
  167. elif not is_mapset_valid(mapset_path):
  168. return "invalid"
  169. # Check whether mapset is owned by current user
  170. elif not is_current_user_mapset_owner(mapset_path):
  171. return "different-owner"
  172. # Check whether mapset is locked
  173. elif is_mapset_locked(mapset_path):
  174. return "locked"
  175. return None
  176. def dir_contains_location(path):
  177. """Return True if directory *path* contains a valid location"""
  178. if not os.path.isdir(path):
  179. return False
  180. for name in os.listdir(path):
  181. if os.path.isdir(os.path.join(path, name)):
  182. if is_location_valid(path, name):
  183. return True
  184. return False
  185. # basically checking location, possibly split into two functions
  186. # (mapset one can call location one)
  187. def get_mapset_invalid_reason(database, location, mapset, none_for_no_reason=False):
  188. """Returns a message describing what is wrong with the Mapset
  189. The goal is to provide the most suitable error message
  190. (rather than to do a quick check).
  191. :param database: Path to GRASS GIS database directory
  192. :param location: name of a Location
  193. :param mapset: name of a Mapset
  194. :returns: translated message
  195. """
  196. # Since we are trying to get the one most likely message, we need all
  197. # those return statements here.
  198. # pylint: disable=too-many-return-statements
  199. location_path = os.path.join(database, location)
  200. mapset_path = os.path.join(location_path, mapset)
  201. # first checking the location validity
  202. # perhaps a special set of checks with different messages mentioning mapset
  203. # will be needed instead of the same set of messages used for location
  204. location_msg = get_location_invalid_reason(
  205. database, location, none_for_no_reason=True
  206. )
  207. if location_msg:
  208. return location_msg
  209. # if location is valid, check mapset
  210. if mapset not in os.listdir(location_path):
  211. # TODO: remove the grass.py specific wording
  212. return _(
  213. "Mapset <{mapset}> doesn't exist in GRASS Location <{location}>"
  214. ).format(mapset=mapset, location=location)
  215. if not os.path.isdir(mapset_path):
  216. return _("<%s> is not a GRASS Mapset because it is not a directory") % mapset
  217. if not os.path.isfile(os.path.join(mapset_path, "WIND")):
  218. return (
  219. _(
  220. "<%s> is not a valid GRASS Mapset"
  221. " because it does not have a WIND file"
  222. )
  223. % mapset
  224. )
  225. # based on the is_mapset_valid() function
  226. if not os.access(os.path.join(mapset_path, "WIND"), os.R_OK):
  227. return (
  228. _(
  229. "<%s> is not a valid GRASS Mapset"
  230. " because its WIND file is not readable"
  231. )
  232. % mapset
  233. )
  234. # no reason for invalidity found (might be valid)
  235. if none_for_no_reason:
  236. return None
  237. return _(
  238. "Mapset <{mapset}> or Location <{location}> is invalid for an unknown reason"
  239. ).format(mapset=mapset, location=location)
  240. def get_location_invalid_reason(database, location, none_for_no_reason=False):
  241. """Returns a message describing what is wrong with the Location
  242. The goal is to provide the most suitable error message
  243. (rather than to do a quick check).
  244. By default, when no reason is found, a message about unknown reason is
  245. returned. This applies also to the case when this function is called on
  246. a valid location (e.g. as a part of larger investigation).
  247. ``none_for_no_reason=True`` allows the function to be used as part of other
  248. diagnostic. When this function fails to find reason for invalidity, other
  249. the caller can continue the investigation in their context.
  250. :param database: Path to GRASS GIS database directory
  251. :param location: name of a Location
  252. :param none_for_no_reason: When True, return None when reason is unknown
  253. :returns: translated message or None
  254. """
  255. location_path = os.path.join(database, location)
  256. permanent_path = os.path.join(location_path, "PERMANENT")
  257. # directory
  258. if not os.path.exists(location_path):
  259. return _("Location <%s> doesn't exist") % location_path
  260. # permament mapset
  261. if "PERMANENT" not in os.listdir(location_path):
  262. return (
  263. _(
  264. "<%s> is not a valid GRASS Location"
  265. " because PERMANENT Mapset is missing"
  266. )
  267. % location_path
  268. )
  269. if not os.path.isdir(permanent_path):
  270. return (
  271. _(
  272. "<%s> is not a valid GRASS Location"
  273. " because PERMANENT is not a directory"
  274. )
  275. % location_path
  276. )
  277. # partially based on the is_location_valid() function
  278. if not os.path.isfile(os.path.join(permanent_path, "DEFAULT_WIND")):
  279. return (
  280. _(
  281. "<%s> is not a valid GRASS Location"
  282. " because PERMANENT Mapset does not have a DEFAULT_WIND file"
  283. " (default computational region)"
  284. )
  285. % location_path
  286. )
  287. # no reason for invalidity found (might be valid)
  288. if none_for_no_reason:
  289. return None
  290. return _("Location <{location_path}> is invalid for an unknown reason").format(
  291. location_path=location_path
  292. )
  293. def get_location_invalid_suggestion(database, location):
  294. """Return suggestion what to do when specified location is not valid
  295. It gives suggestion when:
  296. * A mapset was specified instead of a location.
  297. * A GRASS database was specified instead of a location.
  298. """
  299. location_path = os.path.join(database, location)
  300. # a common error is to use mapset instead of location,
  301. # if that's the case, include that info into the message
  302. if is_mapset_valid(location_path):
  303. return _(
  304. "<{location}> looks like a mapset, not a location."
  305. " Did you mean just <{one_dir_up}>?"
  306. ).format(location=location, one_dir_up=database)
  307. # confusion about what is database and what is location
  308. if dir_contains_location(location_path):
  309. return _(
  310. "It looks like <{location}> contains locations."
  311. " Did you mean to specify one of them?"
  312. ).format(location=location)
  313. return None
  314. def get_mapset_name_invalid_reason(database, location, mapset_name):
  315. """Get reasons why mapset name is not valid.
  316. It gets reasons when:
  317. * Name is not valid.
  318. * Name is reserved for OGR layers.
  319. * Mapset in the same path already exists.
  320. Returns message as string if there was a reason, otherwise None.
  321. """
  322. message = None
  323. mapset_path = os.path.join(database, location, mapset_name)
  324. # Check if mapset name is valid
  325. if not gs.legal_name(mapset_name):
  326. message = _(
  327. "Name '{}' is not a valid name for location or mapset. "
  328. "Please use only ASCII characters excluding characters {} "
  329. "and space."
  330. ).format(mapset_name, "/\"'@,=*~")
  331. # Check reserved mapset name
  332. elif mapset_name.lower() == "ogr":
  333. message = _(
  334. "Name '{}' is reserved for direct "
  335. "read access to OGR layers. Please use "
  336. "another name for your mapset."
  337. ).format(mapset_name)
  338. # Check whether mapset exists
  339. elif mapset_exists(database, location, mapset_name):
  340. message = _(
  341. "Mapset <{mapset}> already exists. Please consider using "
  342. "another name for your mapset."
  343. ).format(mapset=mapset_path)
  344. return message
  345. def get_location_name_invalid_reason(grassdb, location_name):
  346. """Get reasons why location name is not valid.
  347. It gets reasons when:
  348. * Name is not valid.
  349. * Location in the same path already exists.
  350. Returns message as string if there was a reason, otherwise None.
  351. """
  352. message = None
  353. location_path = os.path.join(grassdb, location_name)
  354. # Check if mapset name is valid
  355. if not gs.legal_name(location_name):
  356. message = _(
  357. "Name '{}' is not a valid name for location or mapset. "
  358. "Please use only ASCII characters excluding characters {} "
  359. "and space."
  360. ).format(location_name, "/\"'@,=*~")
  361. # Check whether location exists
  362. elif location_exists(grassdb, location_name):
  363. message = _(
  364. "Location <{location}> already exists. Please consider using "
  365. "another name for your location."
  366. ).format(location=location_path)
  367. return message
  368. def is_mapset_name_valid(database, location, mapset_name):
  369. """Check if mapset name is valid.
  370. Returns True if mapset name is valid, otherwise False.
  371. """
  372. return (
  373. gs.legal_name(mapset_name)
  374. and mapset_name.lower() != "ogr"
  375. and not mapset_exists(database, location, mapset_name)
  376. )
  377. def is_location_name_valid(database, location_name):
  378. """Check if location name is valid.
  379. Returns True if location name is valid, otherwise False.
  380. """
  381. return gs.legal_name(location_name) and not location_exists(database, location_name)
  382. def get_reasons_mapsets_not_removable(mapsets, check_permanent):
  383. """Get reasons why mapsets cannot be removed.
  384. Parameter *mapsets* is a list of tuples (database, location, mapset).
  385. Parameter *check_permanent* is True of False. It depends on whether
  386. we want to check for permanent mapset or not.
  387. Returns messages as list if there were any failed checks, otherwise empty list.
  388. """
  389. messages = []
  390. for grassdb, location, mapset in mapsets:
  391. message = get_reason_mapset_not_removable(
  392. grassdb, location, mapset, check_permanent
  393. )
  394. if message:
  395. messages.append(message)
  396. return messages
  397. def get_reason_mapset_not_removable(grassdb, location, mapset, check_permanent):
  398. """Get reason why one mapset cannot be removed.
  399. Parameter *check_permanent* is True of False. It depends on whether
  400. we want to check for permanent mapset or not.
  401. Returns message as string if there was failed check, otherwise None.
  402. """
  403. message = None
  404. mapset_path = os.path.join(grassdb, location, mapset)
  405. # Check if mapset is permanent
  406. if check_permanent and mapset == "PERMANENT":
  407. message = _("Mapset <{mapset}> is required for a valid location.").format(
  408. mapset=mapset_path
  409. )
  410. # Check if mapset is current
  411. elif is_mapset_current(grassdb, location, mapset):
  412. message = _("Mapset <{mapset}> is the current mapset.").format(
  413. mapset=mapset_path
  414. )
  415. # Check whether mapset is in use
  416. elif is_mapset_locked(mapset_path):
  417. message = _("Mapset <{mapset}> is in use.").format(mapset=mapset_path)
  418. # Check whether mapset is owned by different user
  419. elif is_different_mapset_owner(mapset_path):
  420. message = _("Mapset <{mapset}> is owned by a different user.").format(
  421. mapset=mapset_path
  422. )
  423. return message
  424. def get_reasons_locations_not_removable(locations):
  425. """Get reasons why locations cannot be removed.
  426. Parameter *locations* is a list of tuples (database, location).
  427. Returns messages as list if there were any failed checks, otherwise empty list.
  428. """
  429. messages = []
  430. for grassdb, location in locations:
  431. messages += get_reasons_location_not_removable(grassdb, location)
  432. return messages
  433. def get_reasons_location_not_removable(grassdb, location):
  434. """Get reasons why one location cannot be removed.
  435. Returns messages as list if there were any failed checks, otherwise empty list.
  436. """
  437. messages = []
  438. location_path = os.path.join(grassdb, location)
  439. # Check if location is current
  440. if is_location_current(grassdb, location):
  441. messages.append(
  442. _("Location <{location}> is the current location.").format(
  443. location=location_path
  444. )
  445. )
  446. return messages
  447. # Find mapsets in particular location
  448. tmp_gisrc_file, env = gs.create_environment(grassdb, location, "PERMANENT")
  449. env["GRASS_SKIP_MAPSET_OWNER_CHECK"] = "1"
  450. g_mapsets = (
  451. gs.read_command("g.mapsets", flags="l", separator="comma", quiet=True, env=env)
  452. .strip()
  453. .split(",")
  454. )
  455. # Append to the list of tuples
  456. mapsets = []
  457. for g_mapset in g_mapsets:
  458. mapsets.append((grassdb, location, g_mapset))
  459. # Concentenate both checks
  460. messages += get_reasons_mapsets_not_removable(mapsets, check_permanent=False)
  461. gs.try_remove(tmp_gisrc_file)
  462. return messages
  463. def get_reasons_grassdb_not_removable(grassdb):
  464. """Get reasons why one grassdb cannot be removed.
  465. Returns messages as list if there were any failed checks, otherwise empty list.
  466. """
  467. messages = []
  468. genv = gisenv()
  469. # Check if grassdb is current
  470. if grassdb == genv["GISDBASE"]:
  471. messages.append(
  472. _("GRASS database <{grassdb}> is the current database.").format(
  473. grassdb=grassdb
  474. )
  475. )
  476. return messages
  477. g_locations = get_list_of_locations(grassdb)
  478. # Append to the list of tuples
  479. locations = []
  480. for g_location in g_locations:
  481. locations.append((grassdb, g_location))
  482. messages = get_reasons_locations_not_removable(locations)
  483. return messages
  484. def get_list_of_locations(dbase):
  485. """Get list of GRASS locations in given dbase
  486. :param dbase: GRASS database path
  487. :return: list of locations (sorted)
  488. """
  489. locations = list()
  490. for location in glob.glob(os.path.join(dbase, "*")):
  491. if os.path.join(location, "PERMANENT") in glob.glob(
  492. os.path.join(location, "*")
  493. ):
  494. locations.append(os.path.basename(location))
  495. locations.sort(key=lambda x: x.lower())
  496. return locations