download.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. # MODULE: grass.utils
  2. #
  3. # AUTHOR(S): Vaclav Petras <wenzeslaus gmail com>
  4. #
  5. # PURPOSE: Collection of various helper general (non-GRASS) utilities
  6. #
  7. # COPYRIGHT: (C) 2021 Vaclav Petras, and by the GRASS Development Team
  8. #
  9. # This program is free software under the GNU General Public
  10. # License (>=v2). Read the file COPYING that comes with GRASS
  11. # for details.
  12. """Download and extract various archives"""
  13. import os
  14. import shutil
  15. import tarfile
  16. import tempfile
  17. import zipfile
  18. from pathlib import Path
  19. from urllib.error import HTTPError, URLError
  20. from urllib.parse import urlparse
  21. from urllib.request import urlretrieve
  22. def debug(*args, **kwargs):
  23. """Print a debug message (to be used in this module only)
  24. Using the stanard grass.script debug function is nice, but it may create a circular
  25. dependency if this is used from grass.script, so this is a wrapper which lazy
  26. imports the standard function.
  27. """
  28. # Lazy import to avoding potential circular dependency.
  29. import grass.script as gs # pylint: disable=import-outside-toplevel
  30. gs.debug(*args, **kwargs)
  31. class DownloadError(Exception):
  32. """Error happened during download or when processing the file"""
  33. # modified copy from g.extension
  34. # TODO: Possibly migrate to shutil.unpack_archive.
  35. def extract_tar(name, directory, tmpdir):
  36. """Extract a TAR or a similar file into a directory"""
  37. debug(
  38. f"extract_tar(name={name}, directory={directory}, tmpdir={tmpdir})",
  39. 3,
  40. )
  41. try:
  42. tar = tarfile.open(name)
  43. extract_dir = os.path.join(tmpdir, "extract_dir")
  44. os.mkdir(extract_dir)
  45. tar.extractall(path=extract_dir)
  46. files = os.listdir(extract_dir)
  47. _move_extracted_files(
  48. extract_dir=extract_dir, target_dir=directory, files=files
  49. )
  50. except tarfile.TarError as error:
  51. raise DownloadError(
  52. _("Archive file is unreadable: {0}").format(error)
  53. ) from error
  54. except EOFError as error:
  55. raise DownloadError(
  56. _("Archive file is incomplete: {0}").format(error)
  57. ) from error
  58. extract_tar.supported_formats = ["tar.gz", "gz", "bz2", "tar", "gzip", "targz", "xz"]
  59. # modified copy from g.extension
  60. # TODO: Possibly migrate to shutil.unpack_archive.
  61. def extract_zip(name, directory, tmpdir):
  62. """Extract a ZIP file into a directory"""
  63. debug(
  64. f"extract_zip(name={name}, directory={directory}, tmpdir={tmpdir})",
  65. 3,
  66. )
  67. try:
  68. zip_file = zipfile.ZipFile(name, mode="r")
  69. file_list = zip_file.namelist()
  70. # we suppose we can write to parent of the given dir
  71. # (supposing a tmp dir)
  72. extract_dir = os.path.join(tmpdir, "extract_dir")
  73. os.mkdir(extract_dir)
  74. for subfile in file_list:
  75. # this should be safe in Python 2.7.4
  76. zip_file.extract(subfile, extract_dir)
  77. files = os.listdir(extract_dir)
  78. _move_extracted_files(
  79. extract_dir=extract_dir, target_dir=directory, files=files
  80. )
  81. except zipfile.BadZipfile as error:
  82. raise DownloadError(_("ZIP file is unreadable: {0}").format(error))
  83. # modified copy from g.extension
  84. def _move_extracted_files(extract_dir, target_dir, files):
  85. """Fix state of extracted file by moving them to different directory
  86. When extracting, it is not clear what will be the root directory
  87. or if there will be one at all. So this function moves the files to
  88. a different directory in the way that if there was one directory extracted,
  89. the contained files are moved.
  90. """
  91. debug("_move_extracted_files({})".format(locals()))
  92. if len(files) == 1:
  93. actual_path = os.path.join(extract_dir, files[0])
  94. if os.path.isdir(actual_path):
  95. shutil.copytree(actual_path, target_dir)
  96. else:
  97. shutil.copy(actual_path, target_dir)
  98. else:
  99. if not os.path.exists(target_dir):
  100. os.mkdir(target_dir)
  101. for file_name in files:
  102. actual_file = os.path.join(extract_dir, file_name)
  103. if os.path.isdir(actual_file):
  104. # Choice of copy tree function:
  105. # shutil.copytree() fails when subdirectory exists.
  106. # However, distutils.copy_tree() may fail to create directories before
  107. # copying files into them when copying to a recently deleted directory.
  108. shutil.copytree(actual_file, os.path.join(target_dir, file_name))
  109. else:
  110. shutil.copy(actual_file, os.path.join(target_dir, file_name))
  111. # modified copy from g.extension
  112. # TODO: remove the hardcoded location/extension, use general name
  113. def download_and_extract(source, reporthook=None):
  114. """Download a file (archive) from URL and extract it
  115. Call urllib.request.urlcleanup() to clean up after urlretrieve if you terminate
  116. this function from another thread.
  117. """
  118. source_path = Path(urlparse(source).path)
  119. tmpdir = tempfile.mkdtemp()
  120. debug("Tmpdir: {}".format(tmpdir))
  121. directory = Path(tmpdir) / "extracted"
  122. http_error_message = _("Download file from <{url}>, return status code {code}, ")
  123. url_error_message = _(
  124. "Download file from <{url}>, failed. Check internet connection."
  125. )
  126. if source_path.suffix and source_path.suffix == ".zip":
  127. archive_name = os.path.join(tmpdir, "archive.zip")
  128. try:
  129. filename, headers = urlretrieve(source, archive_name, reporthook)
  130. except HTTPError as err:
  131. raise DownloadError(
  132. http_error_message.format(
  133. url=source,
  134. code=err,
  135. ),
  136. )
  137. except URLError:
  138. raise DownloadError(url_error_message.format(url=source))
  139. if headers.get("content-type", "") != "application/zip":
  140. raise DownloadError(
  141. _(
  142. "Download of <{url}> failed or file <{name}> is not a ZIP file"
  143. ).format(url=source, name=filename)
  144. )
  145. extract_zip(name=archive_name, directory=directory, tmpdir=tmpdir)
  146. elif source_path.suffix and source_path.suffix[1:] in extract_tar.supported_formats:
  147. ext = "".join(source_path.suffixes)
  148. archive_name = os.path.join(tmpdir, "archive" + ext)
  149. try:
  150. urlretrieve(source, archive_name, reporthook)
  151. except HTTPError as err:
  152. raise DownloadError(
  153. http_error_message.format(
  154. url=source,
  155. code=err,
  156. ),
  157. )
  158. except URLError:
  159. raise DownloadError(url_error_message.format(url=source))
  160. extract_tar(name=archive_name, directory=directory, tmpdir=tmpdir)
  161. else:
  162. # probably programmer error
  163. raise DownloadError(_("Unknown format '{}'.").format(source))
  164. return directory
  165. def name_from_url(url):
  166. """Extract name from URL"""
  167. name = os.path.basename(urlparse(url).path)
  168. name = os.path.splitext(name)[0]
  169. if name.endswith(".tar"):
  170. # Special treatment of .tar.gz extension.
  171. return os.path.splitext(name)[0]
  172. return name