diff --git a/volc_tools.py b/volc_tools.py
deleted file mode 100644
index 846fa6b5..00000000
--- a/volc_tools.py
+++ /dev/null
@@ -1,315 +0,0 @@
-import os
-import subprocess
-import uuid
-import yaml
-import argparse
-from typing import Dict, Optional
-from dataclasses import dataclass, field, asdict
-from loguru import logger
-
-# Configure loguru logger
-logger.remove()  # Remove default handler
-logger.add(
-    "volcano_deploy_{time}.log",
-    rotation="500 MB",
-    level="INFO",
-    format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}"
-)
-logger.add(
-    lambda msg: print(msg, flush=True),  # Also print to console
-    colorize=True,
-    format="{time:HH:mm:ss} | {level: <8} | {message}"
-)
-
-def get_current_conda_env() -> str:
-    """Get the path of the current conda environment."""
-    try:
-        # Get CONDA_PREFIX from environment
-        conda_prefix = os.environ.get('CONDA_PREFIX')
-        if conda_prefix:
-            logger.debug(f"Found conda environment from CONDA_PREFIX: {conda_prefix}")
-            return conda_prefix
-
-        # If CONDA_PREFIX is not set, try to get it from the conda command
-        logger.debug("CONDA_PREFIX not found, trying conda info command")
-        result = subprocess.run(
-            'conda info --envs | grep "*" | awk \'{print $NF}\'',
-            shell=True,
-            capture_output=True,
-            text=True
-        )
-        if result.returncode == 0 and result.stdout.strip():
-            env_path = result.stdout.strip()
-            logger.debug(f"Found conda environment from command: {env_path}")
-            return env_path
-    except Exception as e:
-        logger.warning(f"Failed to detect conda environment: {e}")
-
-    # Return default if detection fails
-    default_env = '/fs-computility/llm/shared/llmeval/share_envs/oc-v034-ld-v061'
-    logger.warning(f"Using default conda environment: {default_env}")
-    return default_env
-
-@dataclass
-class VolcanoConfig:
-    """Configuration for Volcano deployment."""
-    home_path: str = '/fs-computility/llmeval/zhaoyufeng/'
-    bashrc_path: str = f'{home_path}.bashrc'
-    conda_env_name: str = field(default_factory=get_current_conda_env)
-    huggingface_cache: str = '/fs-computility/llm/shared/llmeval/models/opencompass_hf_hub'
-    torch_cache: str = '/fs-computility/llm/shared/llmeval/torch'
-    volc_cfg_file: str = '/fs-computility/llmeval/zhaoyufeng/ocplayground/envs/volc_infer.yaml'
-
-    task_name: str = 'compassjudger-1-32B'
-    queue_name: str = 'llmit'
-    extra_envs: list = field(default_factory=lambda: [
-        'COMPASS_DATA_CACHE=/fs-computility/llm/shared/llmeval/datasets/compass_data_cache',
-        'TORCH_HOME=/fs-computility/llm/shared/llmeval/torch',
-        'TIKTOKEN_CACHE_DIR=/fs-computility/llm/shared/llmeval/share_tiktoken',
-    ])
-    image: str = "vemlp-cn-beijing.cr.volces.com/preset-images/cuda:12.2.2"
-
-class VolcanoDeployment:
-    """Handles deployment of ML tasks to Volcano infrastructure."""
-
-    def __init__(self, config: Optional[Dict] = None):
-        """Initialize deployment with configuration."""
-        self.config = VolcanoConfig(**config) if config else VolcanoConfig()
-        self.pwd = os.getcwd()
-        logger.info("Initialized VolcanoDeployment with configuration:")
-        logger.info(f"Working directory: {self.pwd}")
-        for key, value in asdict(self.config).items():
-            logger.info(f"{key}: {value}")
-
-    def choose_flavor(self, num_gpus: int, num_replicas: int = 1) -> Dict:
-        """Select appropriate machine flavor based on GPU requirements."""
-        # GPU count -> smallest machine flavor that satisfies it
-        flavor_map = {
-            0: 'ml.c1ie.2xlarge',
-            1: 'ml.pni2l.3xlarge',
-            2: 'ml.pni2l.7xlarge',
-            4: 'ml.pni2l.14xlarge',
-            8: 'ml.hpcpni2l.28xlarge'
-        }
-
-        if num_gpus > 8:
-            logger.error(f"Configuration for {num_gpus} GPUs not supported")
supported") - raise NotImplementedError(f"Configuration for {num_gpus} GPUs not supported") - - for max_gpus, flavor in sorted(flavor_map.items()): - if num_gpus <= max_gpus: - selected_flavor = flavor - break - - logger.info(f"Selected flavor {selected_flavor} for {num_gpus} GPUs") - logger.info(f"Number of relicas: {num_replicas}") - - with open(self.config.volc_cfg_file) as fp: - volc_cfg = yaml.safe_load(fp) - - for role_spec in volc_cfg['TaskRoleSpecs']: - if role_spec['RoleName'] == 'worker': - role_spec['Flavor'] = selected_flavor - role_spec['RoleReplicas'] = num_replicas - - return volc_cfg - - def build_shell_command(self, task_cmd: str) -> str: - """Construct shell command with all necessary environment setup.""" - logger.debug("Building shell command") - cmd_parts = [ - f'source {self.config.bashrc_path}', - ] - - # Get CONDA_EXE from enviroment - conda_exe = os.environ.get("CONDA_EXE", None) - assert conda_exe, f"CONDA_EXE is None, please make sure conda exists in your current environment" - conda_activate = conda_exe.replace("bin/conda", "bin/activate") - - # Handle conda environment activation based on whether it's a path or name - if os.path.exists(self.config.conda_env_name): - logger.debug(f"Using conda activate with path: {self.config.conda_env_name}") - cmd_parts.append(f'source {conda_activate} {self.config.conda_env_name}') - else: - logger.debug(f"Using source activate with name: {self.config.conda_env_name}") - cmd_parts.append(f'source {conda_activate} {self.config.conda_env_name}') - - cmd_parts.extend([ - f'export PYTHONPATH={self.pwd}:$PYTHONPATH', - f'export HF_HUB_CACHE={self.config.huggingface_cache}', - f'export HUGGINGFACE_HUB_CACHE={self.config.huggingface_cache}', - f'export TORCH_HOME={self.config.torch_cache}' - ]) - - offline_vars = [ - 'HF_DATASETS_OFFLINE=1', - 'TRANSFORMERS_OFFLINE=1', - 'HF_EVALUATE_OFFLINE=1', - 'HF_HUB_OFFLINE=1' - ] - cmd_parts.extend([f'export {var}' for var in offline_vars]) - - if self.config.extra_envs: - cmd_parts.extend([f'export {env}' for env in self.config.extra_envs]) - - cmd_parts.extend([ - f'cd {self.pwd}', - task_cmd - ]) - - full_cmd = '; '.join(cmd_parts) - logger.debug(f"Generated shell command: {full_cmd}") - return full_cmd - - def deploy(self, task_cmd: str, num_gpus: int = 4, num_replicas: int = 1) -> subprocess.CompletedProcess: - """Deploy the task to Volcano infrastructure.""" - logger.info(f"Starting deployment with {num_gpus} GPUs") - logger.info(f"Task command: {task_cmd}") - - try: - volcano_cfg = self.choose_flavor(num_gpus, num_replicas) - - os.makedirs(f'{self.pwd}/tmp', exist_ok=True) - tmp_cfg_file = f'{self.pwd}/tmp/{uuid.uuid4()}_cfg.yaml' - logger.debug(f"Created temporary config file: {tmp_cfg_file}") - - with open(tmp_cfg_file, 'w') as fp: - yaml.dump(volcano_cfg, fp, sort_keys=False) - - shell_cmd = self.build_shell_command(task_cmd) - - submit_cmd = ( - 'volc ml_task submit' - f" --conf '{tmp_cfg_file}'" - f" --entrypoint '{shell_cmd}'" - f' --task_name {self.config.task_name}' - f' --resource_queue_name {self.config.queue_name}' - f' --image {self.config.image}' - ) - - logger.info("Submitting Volcano task") - logger.debug(f"Submit command: {submit_cmd}") - - result = subprocess.run( - submit_cmd, - shell=True, - text=True, - capture_output=True, - check=True - ) - - logger.info("Task submitted successfully") - return result - - except Exception as e: - logger.error(f"Deployment failed: {str(e)}") - raise - finally: - pass - # if os.path.exists(tmp_cfg_file): - # logger.debug(f"Cleaning 
-            #     os.remove(tmp_cfg_file)
-
-def parse_args():
-    """Parse command line arguments."""
-    parser = argparse.ArgumentParser(description='Deploy ML tasks to Volcano infrastructure')
-
-    # Required arguments
-    parser.add_argument('--task-cmd', required=True, help='The main task command to execute')
-
-    # Optional arguments
-    parser.add_argument('--num-gpus', type=int, default=4, help='Number of GPUs required (default: 4)')
-    parser.add_argument('--num-replicas', type=int, default=1, help='Number of replicas (default: 1)')
-    parser.add_argument('--task-name', help='Override default task name')
-    parser.add_argument('--queue-name', help='Override default queue name')
-    parser.add_argument('--image', help="Override default image")
-    parser.add_argument('--conda-env', help='Conda environment to use (default: current environment)')
-    parser.add_argument('--extra-envs', nargs='+', help='Additional environment variables in format KEY=VALUE')
-    parser.add_argument('--log-level', choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='INFO',
-                        help='Set logging level (default: INFO)')
-
-    return parser.parse_args()
-
-def main():
-    """Main execution function."""
-    args = parse_args()
-
-    # Set log level
-    logger.remove()  # Remove existing handlers
-    logger.add(
-        "volcano_deploy_{time}.log",
-        rotation="500 MB",
-        level=args.log_level,
-        format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}"
-    )
-    logger.add(
-        lambda msg: print(msg, flush=True),
-        colorize=True,
-        level=args.log_level,
-        format="{time:HH:mm:ss} | {level: <8} | {message}"
-    )
-
-    logger.info("Starting Volcano deployment script")
-
-    # Get current conda environment
-    current_env = get_current_conda_env()
-    logger.info(f"Current conda environment: {current_env}")
-
-    # Prepare configuration overrides
-    config_overrides = {}
-    if args.task_name:
-        config_overrides['task_name'] = args.task_name
-    if args.queue_name:
-        config_overrides['queue_name'] = args.queue_name
-    if args.conda_env:
-        config_overrides['conda_env_name'] = args.conda_env
-    if args.image:
-        config_overrides['image'] = args.image
-    if args.extra_envs:
-        default_config = VolcanoConfig()
-        config_overrides['extra_envs'] = default_config.extra_envs + args.extra_envs
-
-    # Initialize deployment
-    deployer = VolcanoDeployment(config_overrides if config_overrides else None)
-
-    # Print deployment configuration
-    logger.info("\nDeployment configuration summary:")
-    logger.info(f"Task command: {args.task_cmd}")
-    logger.info(f"Number of GPUs: {args.num_gpus}")
-    logger.info(f"Conda environment: {deployer.config.conda_env_name}")
-    logger.info(f"Task name: {deployer.config.task_name}")
-    logger.info(f"Queue name: {deployer.config.queue_name}")
-    logger.info(f"Image name: {deployer.config.image}")
-    if args.extra_envs:
-        logger.info(f"Additional environment variables: {args.extra_envs}")
-
-    # Confirm deployment
-    confirm = input("\nProceed with deployment? [y/N]: ")
[y/N]: ") - if confirm.lower() != 'y': - logger.warning("Deployment cancelled by user") - return - - # Execute deployment - try: - result = deployer.deploy(args.task_cmd, num_gpus=args.num_gpus, num_replicas=args.num_replicas) - - # Print deployment result - if result.returncode == 0: - logger.success("Deployment completed successfully") - else: - logger.error("Deployment failed") - - if result.stdout: - logger.info(f"Output: {result.stdout}") - if result.stderr: - logger.warning(f"Errors: {result.stderr}") - - except Exception as e: - logger.exception("Deployment failed with exception") - raise - -if __name__ == "__main__": - main()