[Update] Fix *_params.py name conflicts by using a UUID, and add a keep_tmp_file flag to optionally keep the temp config file. (#1640)

Songyang Zhang, 2024-10-25 16:39:25 +08:00 (committed by GitHub)
parent 2542bc6907
commit 84be90669b
4 changed files with 52 additions and 19 deletions
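
Each runner below gets the same two changes: the dumped task config is written to a uuid4-based filename instead of a PID-based one, and the cleanup in the `finally` block is skipped when `keep_tmp_file` is set. A minimal sketch of the resulting pattern, not the runners' actual `_launch` methods (`cfg` stands in for the mmengine config object the runners dump):

```python
import os
import uuid

import mmengine


def dump_task_cfg(cfg, keep_tmp_file: bool = False):
    """Sketch of the temp-file handling this commit adds to each runner."""
    mmengine.mkdir_or_exist('tmp/')
    # uuid4 instead of os.getpid(), so concurrent tasks never share a filename
    param_file = f'tmp/{uuid.uuid4()}_params.py'
    try:
        cfg.dump(param_file)
        # ... build the task command around param_file and run it ...
    finally:
        if not keep_tmp_file:
            # default behavior: delete the temp config once the task is done
            os.remove(param_file)
```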


@@ -42,14 +42,15 @@ class DLCRunner(BaseRunner):
                  eval_with_gpu: list = ['plugin_eval'],
                  retry: int = 2,
                  debug: bool = False,
-                 lark_bot_url: str = None):
+                 lark_bot_url: str = None,
+                 keep_tmp_file: bool = False):
         super().__init__(task=task, debug=debug, lark_bot_url=lark_bot_url)
         self.aliyun_cfg = aliyun_cfg
         self.max_num_workers = max_num_workers
         self.retry = retry
         self.eval_with_gpu = eval_with_gpu
+        self.keep_tmp_file = keep_tmp_file
         logger = get_logger()
         logger.warning(
             'To ensure the integrity of the log results, the log displayed '
@@ -106,7 +107,10 @@ class DLCRunner(BaseRunner):
         # Dump task config to file
         mmengine.mkdir_or_exist('tmp/')
-        param_file = f'tmp/{os.getpid()}_params.py'
+        # Using uuid to avoid filename conflict
+        import uuid
+        uuid_str = str(uuid.uuid4())
+        param_file = f'tmp/{uuid_str}_params.py'
         pwd = os.getcwd()
         try:
             cfg.dump(param_file)
@@ -305,7 +309,10 @@ class DLCRunner(BaseRunner):
             return_code = _run_within_retry()
         finally:
             # Clean up
-            os.remove(param_file)
+            if not self.keep_tmp_file:
+                os.remove(param_file)
+            else:
+                pass
 
         return task_name, return_code


@@ -56,10 +56,12 @@ class LocalRunner(BaseRunner):
                  debug: bool = False,
                  max_workers_per_gpu: int = 1,
                  lark_bot_url: str = None,
+                 keep_tmp_file: bool = False,
                  **kwargs):
         super().__init__(task=task, debug=debug, lark_bot_url=lark_bot_url)
         self.max_num_workers = max_num_workers
         self.max_workers_per_gpu = max_workers_per_gpu
+        self.keep_tmp_file = keep_tmp_file
         logger = get_logger()
         for k, v in kwargs.items():
             logger.warning(f'Ignored argument in {self.__module__}: {k}={v}')
@@ -100,7 +102,10 @@ class LocalRunner(BaseRunner):
                 assert len(all_gpu_ids) >= num_gpus
                 # get cmd
                 mmengine.mkdir_or_exist('tmp/')
-                param_file = f'tmp/{os.getpid()}_params.py'
+                import uuid
+                uuid_str = str(uuid.uuid4())
+                param_file = f'tmp/{uuid_str}_params.py'
                 try:
                     task.cfg.dump(param_file)
                     # if use torchrun, restrict it behaves the same as non
@@ -140,7 +145,10 @@ class LocalRunner(BaseRunner):
                                     stdout=log_file,
                                     stderr=subprocess.STDOUT)
                 finally:
-                    os.remove(param_file)
+                    if not self.keep_tmp_file:
+                        os.remove(param_file)
+                    else:
+                        pass
                 status.append((task_name, 0))
         else:
             if len(all_gpu_ids) > 0:
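
The conflict the commit title refers to presumably arises when several tasks are launched concurrently from one runner process (for example from a thread pool): `os.getpid()` then produces the identical `tmp/<pid>_params.py` path for every task, while `uuid.uuid4()` produces a distinct path per task. A self-contained illustration of that failure mode, not code from the repository:

```python
import os
import uuid
from concurrent.futures import ThreadPoolExecutor


def pid_name(_):
    return f'tmp/{os.getpid()}_params.py'


def uuid_name(_):
    return f'tmp/{uuid.uuid4()}_params.py'


with ThreadPoolExecutor(max_workers=4) as pool:
    print(len(set(pool.map(pid_name, range(4)))))   # 1 -- all tasks share one file
    print(len(set(pool.map(uuid_name, range(4)))))  # 4 -- one file per task
```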


@@ -24,11 +24,11 @@ class SlurmSequentialRunner(BaseRunner):
     using `srun` command.
 
     This runner launches tasks one by one for execution. A new task will only
-    be launched when and only when max_num_workers is not met, and the previous
-    task has been successfully allocated to a machine. Therefore, unlike the
-    `SlurmRunner`, at most only one task will be in the PENDING status at the
-    same time during a run, making the random_sleep strategy no longer
-    necessary. In addition, this runner also includes a feature to
+    be launched when and only when max_num_workers is not met, and the
+    previous task has been successfully allocated to a machine. Therefore,
+    unlike the `SlurmRunner`, at most only one task will be in the PENDING
+    status at the same time during a run, making the random_sleep strategy
+    no longer necessary. In addition, this runner also includes a feature to
     automatically kill all jobs by the job_id on exit.
 
     The runner will obtain the job_id by reading the srun output similar to
@@ -59,7 +59,8 @@ class SlurmSequentialRunner(BaseRunner):
                  qos: str = None,
                  debug: bool = False,
                  lark_bot_url: str = None,
-                 extra_command: Optional[List[str]] = None):
+                 extra_command: Optional[List[str]] = None,
+                 keep_tmp_file: bool = False):
         super().__init__(task=task, debug=debug, lark_bot_url=lark_bot_url)
         self.max_num_workers = max_num_workers
         self.retry = retry
@@ -67,6 +68,7 @@ class SlurmSequentialRunner(BaseRunner):
         self.quotatype = quotatype
         self.qos = qos
         self.task_prefix = task_prefix
+        self.keep_tmp_file = keep_tmp_file
         if not extra_command:
             extra_command = []
         assert isinstance(extra_command, list)
@@ -171,7 +173,10 @@ class SlurmSequentialRunner(BaseRunner):
         # Dump task config to file
         mmengine.mkdir_or_exist('tmp/')
-        param_file = f'tmp/{os.getpid()}_params.py'
+        # Using uuid to avoid filename conflict
+        import uuid
+        uuid_str = str(uuid.uuid4())
+        param_file = f'tmp/{uuid_str}_params.py'
         process = None
         try:
             cfg.dump(param_file)
@@ -256,7 +261,11 @@ class SlurmSequentialRunner(BaseRunner):
             child_conn.close()
             if process is not None:
                 process.kill()
-            os.remove(param_file)
+            if not self.keep_tmp_file:
+                os.remove(param_file)
+            else:
+                pass
 
         return task_name, process.returncode
 
     def _job_failed(self, return_code: int, output_paths: List[str]) -> bool:


@@ -47,7 +47,8 @@ class VOLCRunner(BaseRunner):
                  max_num_workers: int = 32,
                  retry: int = 2,
                  debug: bool = False,
-                 lark_bot_url: str = None):
+                 lark_bot_url: str = None,
+                 keep_tmp_file: bool = False):
         super().__init__(task=task, debug=debug, lark_bot_url=lark_bot_url)
         self.volcano_cfg = volcano_cfg
         self.max_num_workers = max_num_workers
@@ -55,6 +56,7 @@ class VOLCRunner(BaseRunner):
         self.queue_name = queue_name
         self.preemptible = preemptible
         self.priority = priority
+        self.keep_tmp_file = keep_tmp_file
 
     def launch(self, tasks: List[Dict[str, Any]]) -> List[Tuple[str, int]]:
         """Launch multiple tasks.
@@ -100,9 +102,12 @@ class VOLCRunner(BaseRunner):
         pwd = os.getcwd()
         # Dump task config to file
         mmengine.mkdir_or_exist('tmp/')
-        param_file = f'{pwd}/tmp/{os.getpid()}_params.py'
-        volc_cfg_file = f'{pwd}/tmp/{os.getpid()}_cfg.yaml'
+        # Using uuid to avoid filename conflict
+        import uuid
+        uuid_str = str(uuid.uuid4())
+        param_file = f'{pwd}/tmp/{uuid_str}_params.py'
+        volc_cfg_file = f'{pwd}/tmp/{uuid_str}_cfg.yaml'
         volc_cfg = self._choose_flavor(num_gpus)
         with open(volc_cfg_file, 'w') as fp:
             yaml.dump(volc_cfg, fp, sort_keys=False)
@@ -191,8 +196,12 @@ class VOLCRunner(BaseRunner):
         finally:
             # Clean up
-            os.remove(param_file)
-            os.remove(volc_cfg_file)
+            if not self.keep_tmp_file:
+                os.remove(param_file)
+                os.remove(volc_cfg_file)
+            else:
+                pass
 
         return task_name, returncode
 
     def _run_task(self, cmd, log_path, poll_interval):
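
With the flag plumbed through all four runners, it can be switched on from the `runner` section of an evaluation config, for example to inspect the dumped `tmp/<uuid>_params.py` files after a failed run. A hypothetical config fragment (the surrounding keys follow the usual OpenCompass `infer` layout and are placeholders; only `keep_tmp_file` itself comes from this commit):

```python
from opencompass.partitioners import NaivePartitioner
from opencompass.runners import LocalRunner
from opencompass.tasks import OpenICLInferTask

# Keep the dumped temp config files around for debugging instead of
# removing them after each task finishes.
infer = dict(
    partitioner=dict(type=NaivePartitioner),
    runner=dict(
        type=LocalRunner,
        max_num_workers=8,       # as in the runners above
        keep_tmp_file=True,      # new flag added by this commit
        task=dict(type=OpenICLInferTask),
    ),
)
```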