OpenCompass/opencompass/runners/local.py
Junnan Liu 499302857f
[Fix] Fix Local Runner Params Save Path (#1768)
* update local runner params save dir

* fix remove

* fix directory remove

* Fix *_params.py by uuid4
2024-12-19 16:07:34 +08:00

249 lines
9.1 KiB
Python

import os
import os.path as osp
import re
import subprocess
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from threading import Lock
from typing import Any, Dict, List, Tuple
import mmengine
import numpy as np
from mmengine.config import ConfigDict
from mmengine.device import is_npu_available
from tqdm import tqdm
from opencompass.registry import RUNNERS, TASKS
from opencompass.utils import get_logger, model_abbr_from_cfg
from .base import BaseRunner
def get_command_template(gpu_ids: List[int]) -> str:
"""Format command template given available gpu ids."""
if is_npu_available():
tmpl = 'ASCEND_RT_VISIBLE_DEVICES=' + ','.join(str(i) for i in gpu_ids)
tmpl += ' {task_cmd}'
elif sys.platform == 'win32': # Always return win32 for Windows
# use command in Windows format
tmpl = 'set CUDA_VISIBLE_DEVICES=' + ','.join(str(i) for i in gpu_ids)
tmpl += ' & {task_cmd}'
else:
tmpl = 'CUDA_VISIBLE_DEVICES=' + ','.join(str(i) for i in gpu_ids)
tmpl += ' {task_cmd}'
return tmpl
@RUNNERS.register_module()
class LocalRunner(BaseRunner):
"""Local runner. Start tasks by local python.
Args:
task (ConfigDict): Task type config.
max_num_workers (int): Max number of workers to run in parallel.
Defaults to 16.
max_workers_per_gpu (int): Max number of workers to run for one GPU.
Defaults to 1.
debug (bool): Whether to run in debug mode.
lark_bot_url (str): Lark bot url.
"""
def __init__(self,
task: ConfigDict,
max_num_workers: int = 16,
debug: bool = False,
max_workers_per_gpu: int = 1,
lark_bot_url: str = None,
keep_tmp_file: bool = False,
**kwargs):
super().__init__(task=task, debug=debug, lark_bot_url=lark_bot_url)
self.max_num_workers = max_num_workers
self.max_workers_per_gpu = max_workers_per_gpu
self.keep_tmp_file = keep_tmp_file
logger = get_logger()
for k, v in kwargs.items():
logger.warning(f'Ignored argument in {self.__module__}: {k}={v}')
def launch(self, tasks: List[Dict[str, Any]]) -> List[Tuple[str, int]]:
"""Launch multiple tasks.
Args:
tasks (list[dict]): A list of task configs, usually generated by
Partitioner.
Returns:
list[tuple[str, int]]: A list of (task name, exit code).
"""
status = []
import torch
if is_npu_available():
visible_devices = 'ASCEND_RT_VISIBLE_DEVICES'
device_nums = torch.npu.device_count()
else:
visible_devices = 'CUDA_VISIBLE_DEVICES'
device_nums = torch.cuda.device_count()
if visible_devices in os.environ:
all_gpu_ids = [
int(i)
for i in re.findall(r'(?<!-)\d+', os.getenv(visible_devices))
]
else:
all_gpu_ids = list(range(device_nums))
if self.debug:
for task in tasks:
task = TASKS.build(dict(cfg=task, type=self.task_cfg['type']))
task_name = task.name
num_gpus = task.num_gpus
assert len(all_gpu_ids) >= num_gpus
# get cmd
mmengine.mkdir_or_exist('tmp/')
import uuid
uuid_str = str(uuid.uuid4())
param_file = f'tmp/{uuid_str}_params.py'
try:
task.cfg.dump(param_file)
# if use torchrun, restrict it behaves the same as non
# debug mode, otherwise, the torchrun will use all the
# available resources which might cause inconsistent
# behavior.
if len(all_gpu_ids) > num_gpus and num_gpus > 0:
get_logger().warning(f'Only use {num_gpus} GPUs for '
f'total {len(all_gpu_ids)} '
'available GPUs in debug mode.')
tmpl = get_command_template(all_gpu_ids[:num_gpus])
cmd = task.get_command(cfg_path=param_file, template=tmpl)
# run in subprocess if starts with torchrun etc.
if 'python3 ' in cmd or 'python ' in cmd:
# If it is an infer type task do not reload if
# the current model has already been loaded.
if 'infer' in self.task_cfg.type.lower():
# If a model instance already exists,
# do not reload it.
task.run(cur_model=getattr(self, 'cur_model',
None),
cur_model_abbr=getattr(
self, 'cur_model_abbr', None))
self.cur_model = task.model
self.cur_model_abbr = model_abbr_from_cfg(
task.model_cfg)
else:
task.run()
else:
tmp_logs = f'tmp/{os.getpid()}_debug.log'
get_logger().warning(
f'Debug mode, log will be saved to {tmp_logs}')
with open(tmp_logs, 'a') as log_file:
subprocess.run(cmd,
shell=True,
text=True,
stdout=log_file,
stderr=subprocess.STDOUT)
finally:
if not self.keep_tmp_file:
os.remove(param_file)
else:
pass
status.append((task_name, 0))
else:
if len(all_gpu_ids) > 0:
gpus = np.zeros(max(all_gpu_ids) + 1, dtype=np.uint)
gpus[all_gpu_ids] = self.max_workers_per_gpu
else:
gpus = np.array([], dtype=np.uint)
pbar = tqdm(total=len(tasks))
lock = Lock()
def submit(task, index):
task = TASKS.build(dict(cfg=task, type=self.task_cfg['type']))
num_gpus = task.num_gpus
assert len(gpus) >= num_gpus
while True:
lock.acquire()
if sum(gpus > 0) >= num_gpus:
gpu_ids = np.where(gpus)[0][:num_gpus]
gpus[gpu_ids] -= 1
lock.release()
break
lock.release()
time.sleep(1)
if num_gpus > 0:
tqdm.write(f'launch {task.name} on GPU ' +
','.join(map(str, gpu_ids)))
else:
tqdm.write(f'launch {task.name} on CPU ')
res = self._launch(task, gpu_ids, index)
pbar.update()
with lock:
gpus[gpu_ids] += 1
return res
with ThreadPoolExecutor(
max_workers=self.max_num_workers) as executor:
status = executor.map(submit, tasks, range(len(tasks)))
return status
def _launch(self, task, gpu_ids, index):
"""Launch a single task.
Args:
task (BaseTask): Task to launch.
Returns:
tuple[str, int]: Task name and exit code.
"""
task_name = task.name
pwd = os.getcwd()
# Dump task config to file
mmengine.mkdir_or_exist('tmp/')
# Using uuid to avoid filename conflict
import uuid
uuid_str = str(uuid.uuid4())
param_file = f'{pwd}/tmp/{uuid_str}_params.py'
try:
task.cfg.dump(param_file)
tmpl = get_command_template(gpu_ids)
get_cmd = partial(task.get_command,
cfg_path=param_file,
template=tmpl)
cmd = get_cmd()
logger = get_logger()
logger.debug(f'Running command: {cmd}')
# Run command
out_path = task.get_log_path(file_extension='out')
mmengine.mkdir_or_exist(osp.split(out_path)[0])
stdout = open(out_path, 'w', encoding='utf-8')
result = subprocess.run(cmd,
shell=True,
text=True,
stdout=stdout,
stderr=stdout)
if result.returncode != 0:
logger.error(f'task {task_name} fail, see\n{out_path}')
finally:
# Clean up
if not self.keep_tmp_file:
os.remove(param_file)
else:
pass
return task_name, result.returncode