# OpenCompass/opencompass/utils/fileio.py
import gzip
import hashlib
import io
import os
import os.path
import shutil
import tarfile
import tempfile
import urllib.error
import urllib.request
import zipfile
from contextlib import contextmanager

import mmengine.fileio as fileio
from mmengine.fileio import LocalBackend, get_file_backend

def patch_func(module, fn_name_to_wrap):
    """Replace ``module.fn_name_to_wrap`` and record the original so it can
    be restored later (see the ``finally`` block of ``patch_fileio``)."""
    backup = getattr(patch_func, '_backup', [])
    fn_to_wrap = getattr(module, fn_name_to_wrap)

    def wrap(fn_new):
        setattr(module, fn_name_to_wrap, fn_new)
        backup.append((module, fn_name_to_wrap, fn_to_wrap))
        setattr(fn_new, '_fallback', fn_to_wrap)
        setattr(patch_func, '_backup', backup)
        return fn_new

    return wrap

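# Illustrative sketch of the decorator (the name `fake_join` is hypothetical,
# not part of this module). The original function stays reachable through the
# `_fallback` attribute set by `patch_func`:
#
#     @patch_func(os.path, 'join')
#     def fake_join(a, *paths):
#         return fake_join._fallback(a, *paths)  # calls the real os.path.join
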
@contextmanager
def patch_fileio(global_vars=None):
    """Temporarily route common file operations through mmengine's file
    backends, falling back to the original functions for local paths."""
    if getattr(patch_fileio, '_patched', False):
        # Only patch once; avoid errors caused by nested patching.
        yield
        return
    import builtins

    @patch_func(builtins, 'open')
    def open(file, mode='r', *args, **kwargs):
        backend = get_file_backend(file)
        if isinstance(backend, LocalBackend):
            return open._fallback(file, mode, *args, **kwargs)
        if 'b' in mode:
            return io.BytesIO(backend.get(file, *args, **kwargs))
        else:
            return io.StringIO(backend.get_text(file, *args, **kwargs))

    if global_vars is not None and 'open' in global_vars:
        bak_open = global_vars['open']
        global_vars['open'] = builtins.open

    import os

    @patch_func(os.path, 'join')
    def join(a, *paths):
        backend = get_file_backend(a)
        if isinstance(backend, LocalBackend):
            return join._fallback(a, *paths)
        paths = [item for item in paths if len(item) > 0]
        return backend.join_path(a, *paths)

    @patch_func(os.path, 'isdir')
    def isdir(path):
        backend = get_file_backend(path)
        if isinstance(backend, LocalBackend):
            return isdir._fallback(path)
        return backend.isdir(path)

    @patch_func(os.path, 'isfile')
    def isfile(path):
        backend = get_file_backend(path)
        if isinstance(backend, LocalBackend):
            return isfile._fallback(path)
        return backend.isfile(path)

    @patch_func(os.path, 'exists')
    def exists(path):
        backend = get_file_backend(path)
        if isinstance(backend, LocalBackend):
            return exists._fallback(path)
        return backend.exists(path)

    @patch_func(os, 'listdir')
    def listdir(path):
        backend = get_file_backend(path)
        if isinstance(backend, LocalBackend):
            return listdir._fallback(path)
        return backend.list_dir_or_file(path)

    import filecmp

    @patch_func(filecmp, 'cmp')
    def cmp(f1, f2, *args, **kwargs):
        with fileio.get_local_path(f1) as f1, fileio.get_local_path(f2) as f2:
            return cmp._fallback(f1, f2, *args, **kwargs)

    import shutil

    @patch_func(shutil, 'copy')
    def copy(src, dst, **kwargs):
        backend = get_file_backend(src)
        if isinstance(backend, LocalBackend):
            return copy._fallback(src, dst, **kwargs)
        return backend.copyfile_to_local(str(src), str(dst))

    import torch

    @patch_func(torch, 'load')
    def load(f, *args, **kwargs):
        if isinstance(f, str):
            f = io.BytesIO(fileio.get(f))
        return load._fallback(f, *args, **kwargs)

    try:
        setattr(patch_fileio, '_patched', True)
        yield
    finally:
        for patched_fn in patch_func._backup:
            (module, fn_name_to_wrap, fn_to_wrap) = patched_fn
            setattr(module, fn_name_to_wrap, fn_to_wrap)
        # Clear the backup so the next patch cycle starts fresh instead of
        # replaying stale entries.
        setattr(patch_func, '_backup', [])
        if global_vars is not None and 'open' in global_vars:
            global_vars['open'] = bak_open
        setattr(patch_fileio, '_patched', False)

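# Typical usage, a minimal sketch: inside the context, `open`, `os.path.*`,
# `os.listdir`, `shutil.copy` and `torch.load` transparently handle paths
# served by non-local mmengine backends. The 's3://bucket/...' path below is
# a placeholder and assumes a matching backend is registered with mmengine:
#
#     with patch_fileio():
#         if os.path.exists('s3://bucket/data/config.json'):
#             with open('s3://bucket/data/config.json') as f:
#                 cfg = f.read()
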
def patch_hf_auto_model(cache_dir=None):
    """Patch transformers' ``from_pretrained`` so that checkpoints can be
    loaded through mmengine file backends."""
    if hasattr(patch_hf_auto_model, '_patched'):
        return

    from transformers.modeling_utils import PreTrainedModel
    from transformers.models.auto.auto_factory import _BaseAutoModelClass

    ori_model_pt = PreTrainedModel.from_pretrained

    @classmethod
    def model_pt(cls, pretrained_model_name_or_path, *args, **kwargs):
        kwargs['cache_dir'] = cache_dir
        if not isinstance(get_file_backend(pretrained_model_name_or_path),
                          LocalBackend):
            kwargs['local_files_only'] = True
        if cache_dir is not None and not isinstance(
                get_file_backend(cache_dir), LocalBackend):
            kwargs['local_files_only'] = True

        with patch_fileio():
            res = ori_model_pt.__func__(cls, pretrained_model_name_or_path,
                                        *args, **kwargs)
        return res

    PreTrainedModel.from_pretrained = model_pt

    # transformers copied `from_pretrained` to all subclasses,
    # so we have to modify all of them
    for auto_class in [
            _BaseAutoModelClass, *_BaseAutoModelClass.__subclasses__()
    ]:
        ori_auto_pt = auto_class.from_pretrained

        # Bind the original method as a default argument; a plain closure
        # would late-bind and leave every subclass calling the last
        # `ori_auto_pt` from the loop.
        @classmethod
        def auto_pt(cls,
                    pretrained_model_name_or_path,
                    *args,
                    ori_auto_pt=ori_auto_pt,
                    **kwargs):
            kwargs['cache_dir'] = cache_dir
            if not isinstance(get_file_backend(pretrained_model_name_or_path),
                              LocalBackend):
                kwargs['local_files_only'] = True
            if cache_dir is not None and not isinstance(
                    get_file_backend(cache_dir), LocalBackend):
                kwargs['local_files_only'] = True

            with patch_fileio():
                res = ori_auto_pt.__func__(cls, pretrained_model_name_or_path,
                                           *args, **kwargs)
            return res

        auto_class.from_pretrained = auto_pt

    patch_hf_auto_model._patched = True

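# Hedged usage sketch: after patching, AutoModel checkpoints may live behind a
# non-local backend. Both paths below are placeholders, and the remote path
# assumes a matching backend is registered with mmengine:
#
#     from transformers import AutoModel
#     patch_hf_auto_model(cache_dir='/tmp/hf_cache')
#     model = AutoModel.from_pretrained('s3://bucket/models/my-model')
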
def calculate_md5(fpath: str, chunk_size: int = 1024 * 1024) -> str:
    md5 = hashlib.md5()
    backend = get_file_backend(fpath, enable_singleton=True)
    if isinstance(backend, LocalBackend):
        # Enable chunked updates for local files.
        with open(fpath, 'rb') as f:
            for chunk in iter(lambda: f.read(chunk_size), b''):
                md5.update(chunk)
    else:
        md5.update(backend.get(fpath))
    return md5.hexdigest()

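# Example, a small sketch (the file name and digest below are illustrative):
#
#     digest = calculate_md5('data.tar.gz')  # e.g. 'd41d8cd98f00b204...'
#     assert check_md5('data.tar.gz', digest)
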
def check_md5(fpath, md5, **kwargs):
    return md5 == calculate_md5(fpath, **kwargs)


def check_integrity(fpath, md5=None):
    if not os.path.isfile(fpath):
        return False
    if md5 is None:
        return True
    return check_md5(fpath, md5)

def download_url_to_file(url, dst, hash_prefix=None, progress=True):
    """Download the object at the given URL to a local path.

    Modified from
    https://pytorch.org/docs/stable/hub.html#torch.hub.download_url_to_file

    Args:
        url (str): URL of the object to download.
        dst (str): Full path where the object will be saved,
            e.g. ``/tmp/temporary_file``.
        hash_prefix (str, optional): If not None, the SHA256 digest of the
            downloaded file should start with ``hash_prefix``.
            Defaults to None.
        progress (bool): Whether to display a progress bar to stderr.
            Defaults to True.
    """
    file_size = None
    req = urllib.request.Request(url)
    u = urllib.request.urlopen(req)
    meta = u.info()
    if hasattr(meta, 'getheaders'):
        content_length = meta.getheaders('Content-Length')
    else:
        content_length = meta.get_all('Content-Length')
    if content_length is not None and len(content_length) > 0:
        file_size = int(content_length[0])

    # We deliberately save to a temp file and move it once the download is
    # complete. This prevents a local file from being overwritten by a broken
    # download.
    dst = os.path.expanduser(dst)
    dst_dir = os.path.dirname(dst)
    f = tempfile.NamedTemporaryFile(delete=False, dir=dst_dir)

    import rich.progress
    columns = [
        rich.progress.DownloadColumn(),
        rich.progress.BarColumn(bar_width=None),
        rich.progress.TimeRemainingColumn(),
    ]
    try:
        if hash_prefix is not None:
            sha256 = hashlib.sha256()
        with rich.progress.Progress(*columns) as pbar:
            task = pbar.add_task('download', total=file_size, visible=progress)
            while True:
                buffer = u.read(8192)
                if len(buffer) == 0:
                    break
                f.write(buffer)
                if hash_prefix is not None:
                    sha256.update(buffer)
                pbar.update(task, advance=len(buffer))

        f.close()
        if hash_prefix is not None:
            digest = sha256.hexdigest()
            if digest[:len(hash_prefix)] != hash_prefix:
                raise RuntimeError(
                    'invalid hash value (expected "{}", got "{}")'.format(
                        hash_prefix, digest))
        shutil.move(f.name, dst)
    finally:
        f.close()
        if os.path.exists(f.name):
            os.remove(f.name)

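# Example, a minimal sketch (the URL and hash prefix are placeholders):
#
#     download_url_to_file('https://example.com/weights.pth',
#                          '/tmp/weights.pth',
#                          hash_prefix='a1b2c3',
#                          progress=True)
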
def download_url(url, root, filename=None, md5=None):
    """Download a file from a URL and place it in ``root``.

    Args:
        url (str): URL to download the file from.
        root (str): Directory to place the downloaded file in.
        filename (str | None): Name to save the file under.
            If None, use the basename of the URL.
        md5 (str | None): MD5 checksum of the download.
            If None, download without an MD5 check.
    """
    root = os.path.expanduser(root)
    if not filename:
        filename = os.path.basename(url)
    fpath = os.path.join(root, filename)

    os.makedirs(root, exist_ok=True)

    if check_integrity(fpath, md5):
        print(f'Using downloaded and verified file: {fpath}')
    else:
        try:
            print(f'Downloading {url} to {fpath}')
            download_url_to_file(url, fpath)
        except (urllib.error.URLError, IOError) as e:
            if url[:5] == 'https':
                url = url.replace('https:', 'http:')
                print('Failed download. Trying https -> http instead.'
                      f' Downloading {url} to {fpath}')
                download_url_to_file(url, fpath)
            else:
                raise e

    # check integrity of the downloaded file
    if not check_integrity(fpath, md5):
        raise RuntimeError('File not found or corrupted.')

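# Example (placeholder URL and checksum); the download is skipped if the file
# already exists in `root` and passes the MD5 check:
#
#     download_url('https://example.com/dataset.zip',
#                  root='/tmp/datasets',
#                  md5='0123456789abcdef0123456789abcdef')
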
def _is_tarxz(filename):
    return filename.endswith('.tar.xz')


def _is_tar(filename):
    return filename.endswith('.tar')


def _is_targz(filename):
    return filename.endswith('.tar.gz')


def _is_tgz(filename):
    return filename.endswith('.tgz')


def _is_gzip(filename):
    return filename.endswith('.gz') and not filename.endswith('.tar.gz')


def _is_zip(filename):
    return filename.endswith('.zip')

def extract_archive(from_path, to_path=None, remove_finished=False):
    if to_path is None:
        to_path = os.path.dirname(from_path)

    if _is_tar(from_path):
        with tarfile.open(from_path, 'r') as tar:
            tar.extractall(path=to_path)
    elif _is_targz(from_path) or _is_tgz(from_path):
        with tarfile.open(from_path, 'r:gz') as tar:
            tar.extractall(path=to_path)
    elif _is_tarxz(from_path):
        with tarfile.open(from_path, 'r:xz') as tar:
            tar.extractall(path=to_path)
    elif _is_gzip(from_path):
        to_path = os.path.join(
            to_path,
            os.path.splitext(os.path.basename(from_path))[0])
        with open(to_path, 'wb') as out_f, gzip.GzipFile(from_path) as zip_f:
            out_f.write(zip_f.read())
    elif _is_zip(from_path):
        with zipfile.ZipFile(from_path, 'r') as z:
            z.extractall(to_path)
    else:
        raise ValueError(f'Extraction of {from_path} not supported')

    if remove_finished:
        os.remove(from_path)

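# Example: extract 'dataset.tar.gz' into a target directory and keep the
# archive afterwards (both paths are placeholders):
#
#     extract_archive('/tmp/dataset.tar.gz', '/tmp/dataset',
#                     remove_finished=False)
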
def download_and_extract_archive(url,
                                 download_root,
                                 extract_root=None,
                                 filename=None,
                                 md5=None,
                                 remove_finished=False):
    download_root = os.path.expanduser(download_root)
    if extract_root is None:
        extract_root = download_root
    if not filename:
        filename = os.path.basename(url)

    download_url(url, download_root, filename, md5)

    archive = os.path.join(download_root, filename)
    print(f'Extracting {archive} to {extract_root}')
    extract_archive(archive, extract_root, remove_finished)

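# End-to-end sketch with placeholder URL and checksum: download into
# `download_root`, verify the MD5, then unpack into `extract_root`:
#
#     download_and_extract_archive('https://example.com/dataset.zip',
#                                  download_root='/tmp/downloads',
#                                  extract_root='/tmp/datasets',
#                                  md5='0123456789abcdef0123456789abcdef')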