Auto re-generate port number during retry (#24)

* Auto re-generate port number during retry

* Fix slurm command
This commit is contained in:
Ma Zerun 2023-07-07 17:25:56 +08:00 committed by GitHub
parent efdf116f18
commit 805293a9f2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 61 additions and 53 deletions

View File

@ -1,9 +1,9 @@
import inspect
import os import os
import os.path as osp import os.path as osp
import random import random
import subprocess import subprocess
import time import time
from functools import partial
from typing import Any, Dict, List, Tuple from typing import Any, Dict, List, Tuple
import mmengine import mmengine
@ -82,7 +82,6 @@ class DLCRunner(BaseRunner):
task = task_type(task_cfg) task = task_type(task_cfg)
num_gpus = task.num_gpus num_gpus = task.num_gpus
task_name = task.name task_name = task.name
script_path = inspect.getsourcefile(task_type)
# Dump task config to file # Dump task config to file
mmengine.mkdir_or_exist('tmp/') mmengine.mkdir_or_exist('tmp/')
@ -90,28 +89,26 @@ class DLCRunner(BaseRunner):
task_cfg.dump(param_file) task_cfg.dump(param_file)
# Build up DLC command # Build up DLC command
task_cmd_template = task.get_command_template()
task_cmd = task_cmd_template.replace('{SCRIPT_PATH}',
script_path).replace(
'{CFG_PATH}', param_file)
pwd = os.getcwd() pwd = os.getcwd()
shell_cmd = (f'source {self.aliyun_cfg["bashrc_path"]}; ' shell_cmd = (f'source {self.aliyun_cfg["bashrc_path"]}; '
f'conda activate {self.aliyun_cfg["conda_env_name"]}; ' f'conda activate {self.aliyun_cfg["conda_env_name"]}; '
f'cd {pwd}; ' f'cd {pwd}; '
f'{task_cmd}') '{task_cmd}')
cmd = ('dlc create job' tmpl = ('dlc create job'
f" --command '{shell_cmd}'" f" --command '{shell_cmd}'"
f' --name {task_name[:512]}' f' --name {task_name[:512]}'
' --kind BatchJob' ' --kind BatchJob'
f" -c {self.aliyun_cfg['dlc_config_path']}" f" -c {self.aliyun_cfg['dlc_config_path']}"
f" --workspace_id {self.aliyun_cfg['workspace_id']}" f" --workspace_id {self.aliyun_cfg['workspace_id']}"
' --worker_count 1' ' --worker_count 1'
f' --worker_cpu {max(num_gpus * 6, 8)}' f' --worker_cpu {max(num_gpus * 6, 8)}'
f' --worker_gpu {num_gpus}' f' --worker_gpu {num_gpus}'
f' --worker_memory {max(num_gpus * 32, 48)}' f' --worker_memory {max(num_gpus * 32, 48)}'
f" --worker_image {self.aliyun_cfg['worker_image']}" f" --worker_image {self.aliyun_cfg['worker_image']}"
' --interactive') ' --interactive')
get_cmd = partial(task.get_command, cfg_path=param_file, template=tmpl)
cmd = get_cmd()
logger = get_logger() logger = get_logger()
logger.debug(f'Running command: {cmd}') logger.debug(f'Running command: {cmd}')
@ -138,6 +135,8 @@ class DLCRunner(BaseRunner):
retry -= 1 retry -= 1
if random_sleep: if random_sleep:
time.sleep(random.randint(0, 10)) time.sleep(random.randint(0, 10))
# Re-generate command to refresh ports.
cmd = get_cmd()
result = subprocess.run(cmd, result = subprocess.run(cmd,
shell=True, shell=True,
text=True, text=True,

View File

@ -1,9 +1,9 @@
import inspect
import os import os
import os.path as osp import os.path as osp
import subprocess import subprocess
import time import time
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from functools import partial
from threading import Lock from threading import Lock
from typing import Any, Dict, List, Tuple from typing import Any, Dict, List, Tuple
@ -108,7 +108,6 @@ class LocalRunner(BaseRunner):
""" """
task_name = task.name task_name = task.name
script_path = inspect.getsourcefile(type(task))
# Dump task config to file # Dump task config to file
mmengine.mkdir_or_exist('tmp/') mmengine.mkdir_or_exist('tmp/')
@ -116,12 +115,11 @@ class LocalRunner(BaseRunner):
task.cfg.dump(param_file) task.cfg.dump(param_file)
# Build up slurm command # Build up slurm command
task_cmd_template = task.get_command_template() tmpl = 'CUDA_VISIBLE_DEVICES=' + ','.join(str(i) for i in gpu_ids)
task_cmd = task_cmd_template.replace('{SCRIPT_PATH}', tmpl += ' {task_cmd}'
script_path).replace( get_cmd = partial(task.get_command, cfg_path=param_file, template=tmpl)
'{CFG_PATH}', param_file) cmd = get_cmd()
cmd = 'CUDA_VISIBLE_DEVICES=' + ','.join(str(i) for i in gpu_ids) + ' '
cmd += task_cmd
logger = get_logger() logger = get_logger()
logger.debug(f'Running command: {cmd}') logger.debug(f'Running command: {cmd}')

View File

@ -1,9 +1,9 @@
import inspect
import os import os
import os.path as osp import os.path as osp
import random import random
import subprocess import subprocess
import time import time
from functools import partial
from typing import Any, Dict, List, Tuple from typing import Any, Dict, List, Tuple
import mmengine import mmengine
@ -85,7 +85,6 @@ class SlurmRunner(BaseRunner):
task = task_type(task_cfg) task = task_type(task_cfg)
num_gpus = task.num_gpus num_gpus = task.num_gpus
task_name = task.name task_name = task.name
script_path = inspect.getsourcefile(task_type)
# Dump task config to file # Dump task config to file
mmengine.mkdir_or_exist('tmp/') mmengine.mkdir_or_exist('tmp/')
@ -93,18 +92,17 @@ class SlurmRunner(BaseRunner):
task_cfg.dump(param_file) task_cfg.dump(param_file)
# Build up slurm command # Build up slurm command
task_cmd_template = task.get_command_template() tmpl = 'srun'
task_cmd = task_cmd_template.replace('{SCRIPT_PATH}',
script_path).replace(
'{CFG_PATH}', param_file)
cmd = 'srun'
if self.partition: if self.partition:
cmd += f' -p {self.partition}' tmpl += f' -p {self.partition}'
if self.quotatype: if self.quotatype:
cmd += f' --quotatype={self.quotatype}' tmpl += f' --quotatype={self.quotatype}'
if num_gpus > 0: if num_gpus > 0:
cmd += f' --gres=gpu:{num_gpus}' tmpl += f' --gres=gpu:{num_gpus}'
cmd += f" -N1 -J '{task_name[:512]}' {task_cmd}" tmpl += f" -N1 -J '{task_name[:512]}'" + ' {task_cmd}'
get_cmd = partial(task.get_command, cfg_path=param_file, template=tmpl)
cmd = get_cmd()
logger = get_logger() logger = get_logger()
logger.debug(f'Running command: {cmd}') logger.debug(f'Running command: {cmd}')
@ -130,6 +128,8 @@ class SlurmRunner(BaseRunner):
retry -= 1 retry -= 1
if random_sleep: if random_sleep:
time.sleep(random.randint(0, 10)) time.sleep(random.randint(0, 10))
# Re-generate command to refresh ports.
cmd = get_cmd()
result = subprocess.run(cmd, result = subprocess.run(cmd,
shell=True, shell=True,
text=True, text=True,

View File

@ -10,7 +10,7 @@ from opencompass.utils import get_infer_output_path, task_abbr_from_cfg
class BaseTask: class BaseTask:
"""Base class for all tasks. There are two ways to run the task: """Base class for all tasks. There are two ways to run the task:
1. Directly by calling the `run` method. 1. Directly by calling the `run` method.
2. Calling the `get_command_template` method to get the command template, 2. Calling the `get_command` method to get the command,
and then run the command in the shell. and then run the command in the shell.
Args: Args:
@ -35,15 +35,13 @@ class BaseTask:
"""Run the task.""" """Run the task."""
@abstractmethod @abstractmethod
def get_command_template(self) -> str: def get_command(self, cfg_path, template) -> str:
"""Get the command template for the task. """Get the command for the task.
The command template should Args:
contain the following placeholders: cfg_path (str): The path to the config file of the task.
1. ``{SCRIPT_PATH}``: This placeholder will be replaced by the path to template (str): The template which has '{task_cmd}' to format
the script file of the task. the command.
2. ``{CFG_PATH}`` This placeholder will be replaced by the
path to the config file of the task.
""" """
@property @property

View File

@ -31,8 +31,10 @@ class OpenICLEvalTask(BaseTask):
self.num_gpus = 0 self.num_gpus = 0
self.logger = get_logger() self.logger = get_logger()
def get_command_template(self): def get_command(self, cfg_path, template):
return 'python3 {SCRIPT_PATH} {CFG_PATH}' script_path = __file__
command = f'python3 {script_path} {cfg_path}'
return template.format(task_cmd=command)
def run(self): def run(self):
for model_cfg, dataset_cfgs in zip(self.model_cfgs, self.dataset_cfgs): for model_cfg, dataset_cfgs in zip(self.model_cfgs, self.dataset_cfgs):

View File

@ -31,13 +31,24 @@ class OpenICLInferTask(BaseTask):
self.num_gpus = run_cfg.get('num_gpus', 0) self.num_gpus = run_cfg.get('num_gpus', 0)
self.num_procs = run_cfg.get('num_procs', 1) self.num_procs = run_cfg.get('num_procs', 1)
def get_command_template(self): def get_command(self, cfg_path, template):
"""Get the command for the task.
Args:
cfg_path (str): The path to the config file of the task.
template (str): The template which has '{task_cmd}' to format
the command.
"""
script_path = __file__
if self.num_gpus > 0: if self.num_gpus > 0:
return (f'torchrun --master_port={random.randint(12000, 32000)} ' port = random.randint(12000, 32000)
f'--nproc_per_node {self.num_procs} ' command = (f'torchrun --master_port={port} '
'{SCRIPT_PATH} {CFG_PATH}') f'--nproc_per_node {self.num_procs} '
f'{script_path} {cfg_path}')
else: else:
return ('python {SCRIPT_PATH} {CFG_PATH}') command = f'python {script_path} {cfg_path}'
return template.format(task_cmd=command)
def run(self): def run(self):
for model_cfg, dataset_cfgs in zip(self.model_cfgs, self.dataset_cfgs): for model_cfg, dataset_cfgs in zip(self.model_cfgs, self.dataset_cfgs):