reader.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. import os
  2. import sys
  3. import json
  4. import glob
  5. import re
  6. from collections import OrderedDict
  7. import grass.script as gs
  8. class BandReferenceReaderError(Exception):
  9. pass
  10. class BandReferenceReader:
  11. """Band references reader"""
  12. def __init__(self):
  13. self._json_files = glob.glob(
  14. os.path.join(os.environ['GISBASE'], 'etc', 'g.bands', '*.json')
  15. )
  16. if not self._json_files:
  17. raise BandReferenceReaderError("No band definitions found")
  18. self._read_config()
  19. def _read_config(self):
  20. """Read configuration"""
  21. self.config = dict()
  22. for json_file in self._json_files:
  23. try:
  24. with open(json_file) as fd:
  25. config = json.load(
  26. fd,
  27. object_pairs_hook=OrderedDict
  28. )
  29. except json.decoder.JSONDecodeError as e:
  30. raise BandReferenceReaderError(
  31. "Unable to parse '{}': {}".format(
  32. json_file, e
  33. ))
  34. # check if configuration is valid
  35. self._check_config(config)
  36. self.config[os.path.basename(json_file)] = config
  37. @staticmethod
  38. def _check_config(config):
  39. """Check if config is valid
  40. :todo: check shortcut uniqueness
  41. :param dict config: configuration to be validated
  42. """
  43. for items in config.values():
  44. for item in ('shortcut', 'bands'):
  45. if item not in items.keys():
  46. raise BandReferenceReaderError(
  47. "Invalid band definition: <{}> is missing".format(item
  48. ))
  49. if len(items['bands']) < 1:
  50. raise BandReferenceReaderError(
  51. "Invalid band definition: no bands defined"
  52. )
  53. @staticmethod
  54. def _print_band_extended(band, item):
  55. """Print band-specific metadata
  56. :param str band: band identifier
  57. :param str item: items to be printed out
  58. """
  59. def print_kv(k, v, indent):
  60. if isinstance(v, OrderedDict):
  61. print ('{}{}:'.format(' ' * indent * 2, k))
  62. for ki, vi in v.items():
  63. print_kv(ki, vi, indent * 2)
  64. else:
  65. print ('{}{}: {}'.format(' ' * indent * 2, k, v))
  66. indent = 4
  67. print ('{}band: {}'.format(
  68. ' ' * indent, band
  69. ))
  70. for k, v in item[band].items():
  71. print_kv(k, v, indent)
  72. def _print_band(self, shortcut, band, tag=None):
  73. sys.stdout.write(self._band_identifier(shortcut, band))
  74. if tag:
  75. sys.stdout.write(' {}'.format(tag))
  76. sys.stdout.write(os.linesep)
  77. def print_info(self, shortcut=None, band=None, extended=False):
  78. """Prints band reference information to stdout.
  79. Can be filtered by shortcut or band identifier.
  80. :param str shortcut: shortcut to filter (eg. S2) or None
  81. :param str band: band (eg. 1) or None
  82. :param bool extended: print also extended metadata
  83. """
  84. found = False
  85. for root in self.config.values():
  86. for item in root.values():
  87. try:
  88. if shortcut and re.match(shortcut, item['shortcut']) is None:
  89. continue
  90. except re.error as e:
  91. raise BandReferenceReaderError(
  92. "Invalid pattern: {}".format(e)
  93. )
  94. found = True
  95. if band and band not in item['bands']:
  96. raise BandReferenceReaderError(
  97. "Band <{}> not found in <{}>".format(
  98. band, shortcut
  99. ))
  100. # print generic information
  101. if extended:
  102. for subitem in item.keys():
  103. if subitem == 'bands':
  104. # bands item is processed bellow
  105. continue
  106. print ('{}: {}'.format(
  107. subitem, item[subitem]
  108. ))
  109. # print detailed band information
  110. if band:
  111. self._print_band_extended(band, item['bands'])
  112. else:
  113. for iband in item['bands']:
  114. self._print_band_extended(iband, item['bands'])
  115. else:
  116. # basic information only
  117. if band:
  118. self._print_band(
  119. item['shortcut'], band,
  120. item['bands'][band].get('tag')
  121. )
  122. else:
  123. for iband in item['bands']:
  124. self._print_band(
  125. item['shortcut'], iband,
  126. item['bands'][iband].get('tag')
  127. )
  128. # raise error when defined shortcut not found
  129. if shortcut and not found:
  130. raise BandReferenceReaderError(
  131. "Band reference <{}> not found".format(shortcut)
  132. )
  133. def find_file(self, band_reference):
  134. """Find file by band reference.
  135. Match is case-insensitive.
  136. :param str band_reference: band reference identifier to search for (eg. S2_1)
  137. :return str: file basename if found or None
  138. """
  139. try:
  140. shortcut, band = band_reference.split('_')
  141. except ValueError:
  142. raise BandReferenceReaderError("Invalid band identifier <{}>".format(
  143. band_reference
  144. ))
  145. for filename, config in self.config.items():
  146. for root in config.keys():
  147. if config[root]['shortcut'].upper() == shortcut.upper() and \
  148. band.upper() in map(lambda x: x.upper(), config[root]['bands'].keys()):
  149. return filename
  150. return None
  151. def get_bands(self):
  152. """Get list of band identifiers.
  153. :return list: list of valid band identifiers
  154. """
  155. bands = []
  156. for root in self.config.values():
  157. for item in root.values():
  158. for band in item['bands']:
  159. bands.append(
  160. self._band_identifier(item['shortcut'], band)
  161. )
  162. return bands
  163. @staticmethod
  164. def _band_identifier(shortcut, band):
  165. return '{}_{}'.format(shortcut, band)