import contextlib
import os
import os.path as osp
import re
import subprocess
import sys
import time
import uuid
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from threading import Lock
from typing import Any, Dict, List, Optional, Tuple

import mmengine
import numpy as np
from mmengine.config import ConfigDict
from mmengine.device import is_npu_available
from tqdm import tqdm

from opencompass.registry import RUNNERS, TASKS
from opencompass.utils import get_logger, model_abbr_from_cfg

from .base import BaseRunner


def get_command_template(gpu_ids: List[int]) -> str:
    """Format command template given available gpu ids.

    Args:
        gpu_ids (list[int]): Device ids to expose to the task. An empty
            sequence yields an empty device list in the template.

    Returns:
        str: A shell command template with a ``{task_cmd}`` placeholder,
        prefixed by the device-visibility variable for the platform.
    """
    devices = ','.join(str(i) for i in gpu_ids)
    if is_npu_available():
        tmpl = 'ASCEND_RT_VISIBLE_DEVICES=' + devices + ' {task_cmd}'
    elif sys.platform == 'win32':
        # Windows shell: set the variable, then chain the command with '&'.
        tmpl = 'set CUDA_VISIBLE_DEVICES=' + devices + ' & {task_cmd}'
    else:
        tmpl = 'CUDA_VISIBLE_DEVICES=' + devices + ' {task_cmd}'
    return tmpl


@RUNNERS.register_module()
class LocalRunner(BaseRunner):
    """Local runner. Start tasks by local python.

    Args:
        task (ConfigDict): Task type config.
        max_num_workers (int): Max number of workers to run in parallel.
            Defaults to 16.
        max_workers_per_gpu (int): Max number of workers to run for one GPU.
            Defaults to 1.
        debug (bool): Whether to run in debug mode.
        lark_bot_url (str): Lark bot url.
        keep_tmp_file (bool): Whether to keep the dumped task config files
            under ``tmp/`` after a task finishes. Defaults to False.
    """

    def __init__(self,
                 task: ConfigDict,
                 max_num_workers: int = 16,
                 debug: bool = False,
                 max_workers_per_gpu: int = 1,
                 lark_bot_url: Optional[str] = None,
                 keep_tmp_file: bool = False,
                 **kwargs):
        super().__init__(task=task, debug=debug, lark_bot_url=lark_bot_url)
        self.max_num_workers = max_num_workers
        self.max_workers_per_gpu = max_workers_per_gpu
        self.keep_tmp_file = keep_tmp_file
        # Unknown keyword arguments are accepted but only reported.
        logger = get_logger()
        for k, v in kwargs.items():
            logger.warning(f'Ignored argument in {self.__module__}: {k}={v}')

    def _remove_param_file(self, param_file: str) -> None:
        """Delete a dumped task config unless ``keep_tmp_file`` is set."""
        if self.keep_tmp_file:
            return
        # The file may not exist if dumping the config itself failed; do not
        # let the cleanup hide the original exception in that case.
        with contextlib.suppress(FileNotFoundError):
            os.remove(param_file)

    def launch(self, tasks: List[Dict[str, Any]]) -> List[Tuple[str, int]]:
        """Launch multiple tasks.

        In debug mode the tasks run one after another; otherwise they are
        dispatched to a thread pool and each worker waits until enough GPU
        slots are free.

        Args:
            tasks (list[dict]): A list of task configs, usually generated by
                Partitioner.

        Returns:
            list[tuple[str, int]]: A list of (task name, exit code).
        """
        status = []
        import torch
        if is_npu_available():
            visible_devices = 'ASCEND_RT_VISIBLE_DEVICES'
            device_nums = torch.npu.device_count()
        else:
            visible_devices = 'CUDA_VISIBLE_DEVICES'
            device_nums = torch.cuda.device_count()
        if visible_devices in os.environ:
            # Digit runs not directly preceded by '-', so that a value such
            # as "-1" does not produce a device id.
            all_gpu_ids = [
                int(i) for i in re.findall(r'(?<!-)\d+',
                                           os.getenv(visible_devices))
            ]
        else:
            all_gpu_ids = list(range(device_nums))

        if self.debug:
            logger = get_logger()
            for task in tasks:
                task = TASKS.build(dict(cfg=task, type=self.task_cfg['type']))
                task_name = task.name
                num_gpus = task.num_gpus
                assert len(all_gpu_ids) >= num_gpus
                returncode = 0

                # get cmd
                mmengine.mkdir_or_exist('tmp/')
                param_file = f'tmp/{uuid.uuid4()}_params.py'
                try:
                    task.cfg.dump(param_file)
                    # if use torchrun, restrict it behaves the same as non
                    # debug mode, otherwise, the torchrun will use all the
                    # available resources which might cause inconsistent
                    # behavior.
                    if len(all_gpu_ids) > num_gpus and num_gpus > 0:
                        logger.warning(f'Only use {num_gpus} GPUs for '
                                       f'total {len(all_gpu_ids)} '
                                       'available GPUs in debug mode.')
                    tmpl = get_command_template(all_gpu_ids[:num_gpus])
                    cmd = task.get_command(cfg_path=param_file, template=tmpl)
                    if 'python3 ' in cmd or 'python ' in cmd:
                        # Plain python command: run the task in this process.
                        if 'infer' in self.task_cfg.type.lower():
                            # If a model instance already exists from a
                            # previous task, hand it over so that it is not
                            # reloaded.
                            task.run(cur_model=getattr(self, 'cur_model',
                                                       None),
                                     cur_model_abbr=getattr(
                                         self, 'cur_model_abbr', None))
                            self.cur_model = task.model
                            self.cur_model_abbr = model_abbr_from_cfg(
                                task.model_cfg)
                        else:
                            task.run()
                    else:
                        # Any other launcher (torchrun etc.) runs in a
                        # subprocess with its output appended to a log file.
                        tmp_logs = f'tmp/{os.getpid()}_debug.log'
                        logger.warning(
                            f'Debug mode, log will be saved to {tmp_logs}')
                        with open(tmp_logs, 'a') as log_file:
                            result = subprocess.run(cmd,
                                                    shell=True,
                                                    text=True,
                                                    stdout=log_file,
                                                    stderr=subprocess.STDOUT)
                        # Report the real exit code instead of always 0.
                        returncode = result.returncode
                        if returncode != 0:
                            logger.error(
                                f'task {task_name} fail, see\n{tmp_logs}')
                finally:
                    self._remove_param_file(param_file)
                status.append((task_name, returncode))
        else:
            # gpus[i] holds the number of free worker slots on device i.
            if len(all_gpu_ids) > 0:
                gpus = np.zeros(max(all_gpu_ids) + 1, dtype=np.uint)
                gpus[all_gpu_ids] = self.max_workers_per_gpu
            else:
                gpus = np.array([], dtype=np.uint)

            pbar = tqdm(total=len(tasks))
            lock = Lock()

            def submit(task, index):
                task = TASKS.build(dict(cfg=task, type=self.task_cfg['type']))
                num_gpus = task.num_gpus
                assert len(gpus) >= num_gpus

                # Poll until enough devices have a free slot, then reserve
                # one slot on each of the first ``num_gpus`` free devices.
                while True:
                    with lock:
                        if sum(gpus > 0) >= num_gpus:
                            gpu_ids = np.where(gpus)[0][:num_gpus]
                            gpus[gpu_ids] -= 1
                            break
                    time.sleep(1)

                try:
                    if num_gpus > 0:
                        tqdm.write(f'launch {task.name} on GPU ' +
                                   ','.join(map(str, gpu_ids)))
                    else:
                        tqdm.write(f'launch {task.name} on CPU ')
                    return self._launch(task, gpu_ids, index)
                finally:
                    # Always give the slots back, even when the task raised,
                    # so that waiting workers are not blocked forever.
                    pbar.update()
                    with lock:
                        gpus[gpu_ids] += 1

            try:
                with ThreadPoolExecutor(
                        max_workers=self.max_num_workers) as executor:
                    # Materialise the results so that a list is returned, as
                    # the signature promises, rather than a lazy iterator.
                    status = list(
                        executor.map(submit, tasks, range(len(tasks))))
            finally:
                pbar.close()

        return status

    def _launch(self, task, gpu_ids, index):
        """Launch a single task.

        Args:
            task (BaseTask): Task to launch.
            gpu_ids: Device ids to expose to the task's command.
            index (int): Position of the task in the submitted list. Not
                used by this runner.

        Returns:
            tuple[str, int]: Task name and exit code.
        """
        task_name = task.name

        pwd = os.getcwd()
        # Dump task config to file
        mmengine.mkdir_or_exist('tmp/')
        # Using uuid to avoid filename conflict
        param_file = f'{pwd}/tmp/{uuid.uuid4()}_params.py'

        try:
            task.cfg.dump(param_file)
            tmpl = get_command_template(gpu_ids)
            get_cmd = partial(task.get_command,
                              cfg_path=param_file,
                              template=tmpl)
            cmd = get_cmd()

            logger = get_logger()
            logger.debug(f'Running command: {cmd}')

            # Run command, sending both stdout and stderr to the task log.
            out_path = task.get_log_path(file_extension='out')
            mmengine.mkdir_or_exist(osp.split(out_path)[0])
            with open(out_path, 'w', encoding='utf-8') as stdout:
                result = subprocess.run(cmd,
                                        shell=True,
                                        text=True,
                                        stdout=stdout,
                                        stderr=stdout)

            if result.returncode != 0:
                logger.error(f'task {task_name} fail, see\n{out_path}')
        finally:
            # Clean up
            self._remove_param_file(param_file)
        return task_name, result.returncode