# Source code for olympus.baselines.launch

from argparse import ArgumentParser, REMAINDER
from dataclasses import dataclass, field
import os
import json
import signal
import subprocess
import sys
from typing import List, Dict

from olympus.utils.gpu import GpuMonitor
from olympus.utils import info, warning, parse_args


def get_available_tasks(directory=None):
    """Return the names of the task scripts that can be launched.

    A task is any ``.py`` file located in ``directory``, except the package
    ``__init__.py`` and the launcher (``launch.py``) itself.

    Parameters
    ----------
    directory: str, optional
        Folder to scan; defaults to the folder containing this module.

    Returns
    -------
    List[str]
        Sorted task names (file names without their ``.py`` extension).
    """
    ignored_files = {'__init__.py', 'launch.py'}

    if directory is None:
        directory = os.path.dirname(__file__)

    # Only python files are tasks: the launchers run ``<task>.py`` or import
    # ``olympus.baselines.<task>``. Stripping only the extension (instead of
    # ``str.replace``) keeps names that contain ``.py`` elsewhere intact.
    return sorted(
        os.path.splitext(file_name)[0]
        for file_name in os.listdir(directory)
        if file_name.endswith('.py') and file_name not in ignored_files
    )
def arguments():
    """Build the command line parser of the task launcher.

    Everything after the ``task`` positional argument is captured verbatim
    in ``args`` and forwarded to the task script.

    Returns
    -------
    ArgumentParser
    """
    parser = ArgumentParser(description="Olympus task launcher")
    parser.add_argument('--no-mon', action='store_true', default=False,
                        help='Disable GPU monitoring')

    # When training on multi GPUs you can specify the devices on which to train
    parser.add_argument('--devices', type=str, default=None,
                        help='GPU ids used for the training (comma separated value)')

    # You can have more than one worker
    parser.add_argument('--workers', type=int, default=1,
                        help='Number of workers')

    # --device-sharing / --no-device-sharing write the same destination;
    # sharing is enabled by default.
    parser.add_argument('--device-sharing', action='store_true', default=True,
                        dest='device_sharing',
                        help='Enable device sharing across workers')
    parser.add_argument('--no-device-sharing', action='store_false',
                        dest='device_sharing',
                        help='Disable device sharing across workers')

    parser.add_argument('--cpu', action='store_true', default=False,
                        help='Workers do not need devices')

    # parser.add_argument('--rdv-url', type=str, default=None,
    #                     help='URL used to sync multi node workers for multi node training')

    parser.add_argument('task', choices=get_available_tasks())

    # rest from the training program
    parser.add_argument('args', nargs=REMAINDER)
    return parser
def get_available_port():
    """Ask the OS for a currently unused TCP port.

    A socket is bound to port 0 so the kernel picks a free port, then the
    socket is closed (even if ``bind`` fails).

    NOTE: the port is only known to be free at the time of the call; another
    process may take it before the workers bind to it.

    Returns
    -------
    int
    """
    import socket

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind(('', 0))
        return sock.getsockname()[1]
def local_multigpu_launch(task_name, script_args, job_env, device_id, rank, world_size, port):
    """Launch one rank of a multi-GPU task as a separate python process.

    Parameters
    ----------
    task_name: str
        name of the task; the script ``<task_name>.py`` next to this module is run
    script_args: List[str]
        arguments forwarded verbatim to the task script
    job_env: Mapping[str, str] or None
        environment of the child (``os.environ`` when None); it is copied,
        never modified
    device_id: int
        device exposed to the child through ``CUDA_VISIBLE_DEVICES``
    rank: int
        rank of this process inside the worker
    world_size: int
        number of processes of the worker
    port: int
        port used for the ``nccl:tcp://localhost:<port>`` rendezvous

    Returns
    -------
    subprocess.Popen
    """
    info(f'Launching job on (device: {device_id})')
    # os.path.join handles an empty dirname (module run from the cwd)
    script = os.path.join(os.path.dirname(__file__), f'{task_name}.py')

    # Set the visible device through the environment rather than a shell
    # prefix: without a shell the arguments are not re-split or re-expanded,
    # and signals are delivered to the python process itself.
    env = dict(os.environ if job_env is None else job_env)
    env['CUDA_VISIBLE_DEVICES'] = str(device_id)

    cmd = [
        sys.executable, '-u', script,
        '--rank', str(rank),
        '--world-size', str(world_size),
        '--dist-url', f'nccl:tcp://localhost:{port}',
        *script_args,
    ]
    return subprocess.Popen(cmd, env=env)
def simple_launch(task_name, script_args):
    """Launch the task without creating another python interpreter.

    Imports ``olympus.baselines.<task_name>``, parses ``script_args`` with
    the task's own ``arguments()`` parser and calls the task's ``main`` with
    the parsed values as keyword arguments.

    Returns
    -------
    None
    """
    import importlib

    # import_module returns the submodule itself, which is what the
    # ``__import__(..., fromlist=[''])`` idiom was emulating.
    module = importlib.import_module(f'olympus.baselines.{task_name}')
    parser = module.arguments()
    args = parse_args(parser, script_args)
    module.main(**vars(args))
    return None
def single_gpu_launch(task_name, script_args, job_env, device_id, rank, world_size, port):
    """Launch the task on a single device as a separate python process.

    ``rank``, ``world_size`` and ``port`` are unused; they are accepted so
    this function is interchangeable with ``local_multigpu_launch``.

    Parameters
    ----------
    task_name: str
        name of the task; the script ``<task_name>.py`` next to this module is run
    script_args: List[str]
        arguments forwarded verbatim to the task script
    job_env: Mapping[str, str] or None
        environment of the child (``os.environ`` when None); it is copied,
        never modified
    device_id: int
        device exposed to the child through ``CUDA_VISIBLE_DEVICES``

    Returns
    -------
    subprocess.Popen
    """
    info(f'Launching job on (device: {device_id})')
    # os.path.join handles an empty dirname (module run from the cwd)
    script = os.path.join(os.path.dirname(__file__), f'{task_name}.py')

    # Set the visible device through the environment rather than a shell
    # prefix: without a shell the arguments are not re-split or re-expanded,
    # and signals are delivered to the python process itself.
    env = dict(os.environ if job_env is None else job_env)
    env['CUDA_VISIBLE_DEVICES'] = str(device_id)

    cmd = [sys.executable, '-u', script, *script_args]
    return subprocess.Popen(cmd, env=env)
@dataclass
class Worker:
    """A group of devices running one instance of a task.

    A worker with several devices spawns one process per device, connected
    through ``--rank``/``--world-size``/``--dist-url``; a worker with a
    single device spawns a single plain process.
    """
    worker_id: int
    devices: List[int]
    # Environment handed to the spawned processes. Defaults to a *copy* of the
    # launcher environment so that ``launch`` does not write worker specific
    # variables into ``os.environ`` (which every worker would otherwise share).
    env: Dict = field(default_factory=lambda: dict(os.environ))
    # Processes spawned by the last call to ``launch``
    processes: List = field(default_factory=list)

    def launch(self, task_name, script_args):
        """Spawn one process per device of this worker.

        Parameters
        ----------
        task_name: str
            name of the task to run

        script_args: List[str]
            list of arguments to pass to the task

        Returns
        -------
        List[subprocess.Popen]
            the spawned processes (also stored in ``self.processes``)
        """
        subprocess_factory = single_gpu_launch
        port = None
        world_size = len(self.devices)

        self.env['OLYMPUS_WORKER_ID'] = str(self.worker_id)

        if world_size > 1:
            port = get_available_port()
            subprocess_factory = local_multigpu_launch
            self.env['OLYMPUS_WORKER_PORT'] = str(port)
            self.env['OLYMPUS_WORKER_WORLD'] = str(world_size)

        # Append as we go so processes spawned before a failure stay tracked.
        self.processes = []
        for rank, device_id in enumerate(self.devices):
            proc = subprocess_factory(
                task_name, script_args, self.env, device_id, rank, world_size, port)
            self.processes.append(proc)

        return self.processes
def get_device_count(devices):
    """Return the list of device ids to use.

    Parameters
    ----------
    devices: str or None
        comma separated device ids (e.g. ``"0,1"``); when empty or None,
        every device reported by ``torch.cuda.device_count()`` is used

    Returns
    -------
    List[int]
    """
    if devices:
        # Tolerate spaces and stray commas: "0, 1," -> [0, 1]
        return [int(d) for d in devices.split(',') if d.strip()]

    import torch
    return list(range(torch.cuda.device_count()))
def make_device_groups(worker_count, devices, shared, cpu_mode):
    """Assign devices to each worker.

    Parameters
    ----------
    worker_count: int
        number of workers
    devices: List[int]
        available device ids
    shared: bool
        every worker gets all the devices
    cpu_mode: bool
        workers do not need devices; each worker gets the single id ``0``

    Returns
    -------
    List[List[int]]
        one device list per worker

    Raises
    ------
    RuntimeError
        when devices are not shared and there are fewer devices than workers
    """
    if cpu_mode or not devices:
        if not cpu_mode:
            warning('No devices detected, it will run in CPU mode')
        # A single id so that each worker spawns exactly one process
        devices = [0]

    if shared or cpu_mode:
        # Give each worker its own list so the groups are not aliased
        return [list(devices) for _ in range(worker_count)]

    device_per_worker, remaining_devices = divmod(len(devices), worker_count)
    if device_per_worker == 0:
        raise RuntimeError(
            f'Not enough devices (devices: {len(devices)}) < (workers: {worker_count}). '
            'Use --device-sharing or --cpu to bypass this error'
        )

    groups = [
        devices[device_per_worker * wid: device_per_worker * (wid + 1)]
        for wid in range(worker_count)
    ]

    if remaining_devices > 0:
        unassigned = devices[device_per_worker * worker_count:]
        warning(f'Some devices were not assigned to worker: {unassigned}')

    return groups
def single_worker_single_gpu(task, args, no_mon):
    """Run ``task`` inside this interpreter while monitoring the GPU.

    The monitor is disabled when ``no_mon`` is set. Once the task returns,
    the monitor report is printed as indented JSON and passed to
    ``show_resource_stats``.
    """
    monitor_enabled = not no_mon
    with GpuMonitor(enabled=monitor_enabled) as monitor:
        simple_launch(task, args)

        report = monitor.to_json()
        print(json.dumps(report, indent=2))
        show_resource_stats(monitor)
def show_resource_stats(monitor, plot=False):
    """Export and plot the samples recorded by a GPU monitor.

    By default this does nothing and returns immediately. With
    ``plot=True`` it writes ``data.csv`` and ``utilization_memory0.png`` to
    the current working directory. The ``pandas`` and ``altair`` imports
    only happen on that path.

    Parameters
    ----------
    monitor: GpuMonitor
        monitor whose ``monitor.ts`` attribute holds the samples; presumably
        a dict of equal-length series keyed like ``utilization.memory0``
        (TODO confirm against GpuMonitor)
    plot: bool
        enable the CSV export and the plot

    Returns
    -------
    None
    """
    if not plot:
        return None

    import pandas as pd
    import altair as alt

    df = pd.DataFrame(monitor.monitor.ts)
    df.to_csv('data.csv')
    df['index'] = range(len(df))

    chart = (alt.Chart(df)
             .mark_line()
             .encode(x='index', y='utilization.memory0'))
    chart.save('utilization_memory0.png')
    return None
def run(workers, all_processes, task, script_args):
    """Launch every worker, then wait for every spawned process.

    Parameters
    ----------
    workers: List[Worker]
        workers to launch
    all_processes: list
        list extended in place with the spawned processes, so that the caller
        can still clean them up if this function is interrupted
    task: str
        name of the task to run
    script_args: List[str]
        arguments forwarded to the task

    Returns
    -------
    List[Tuple[int, Any]]
        ``(return code, command)`` for every process that exited with a
        non-zero status; empty when everything succeeded
    """
    for worker in workers:
        all_processes.extend(worker.launch(task, script_args))

    errors = []
    for process in all_processes:
        return_code = process.wait()
        if return_code != 0:
            errors.append((return_code, process.args))

    for return_code, cmd in errors:
        print(f'Command {cmd} failed with return code {return_code}')

    return errors
def cleanup(all_processes, timeout=5):
    """Interrupt every child process, escalating if it does not exit.

    Still-running processes receive SIGINT. Those still alive ``timeout``
    seconds later (a single deadline shared by all processes) are terminated.
    If they survive another ``timeout`` seconds, they are killed.

    Parameters
    ----------
    all_processes: List[subprocess.Popen]
        processes to stop
    timeout: float
        grace period in seconds before escalating
    """
    import time

    running = [process for process in all_processes if process.poll() is None]

    # propagate the signal to children
    for process in running:
        try:
            process.send_signal(signal.SIGINT)
        except ProcessLookupError:
            # exited between poll() and send_signal(); nothing to interrupt
            pass

    # wait for them to die, all sharing the same deadline
    deadline = time.monotonic() + timeout
    for process in running:
        try:
            process.wait(timeout=max(0.0, deadline - time.monotonic()))
        except subprocess.TimeoutExpired:
            process.terminate()
            try:
                process.wait(timeout=timeout)
            except subprocess.TimeoutExpired:
                process.kill()
                process.wait()
def main(argv=None):
    """Entry point of the launcher.

    Parses ``argv`` (``sys.argv`` when None), resolves the device ids and
    assigns them to workers. A single worker on at most one device runs the
    task in this interpreter. Otherwise every worker spawns its own
    processes under a GPU monitor; Ctrl-C interrupts the children.
    """
    args = arguments().parse_args(argv)
    args.devices = get_device_count(args.devices)
    script_args = args.args

    device_groups = make_device_groups(
        args.workers, args.devices, args.device_sharing, args.cpu)

    # One GPU or no GPU and a single worker: no need for another interpreter
    at_most_one_device = args.devices in ([0], [])
    if at_most_one_device and args.workers <= 1:
        single_worker_single_gpu(args.task, script_args, args.no_mon)
        return

    # Otherwise each worker spawns one interpreter per assigned device
    workers = [
        Worker(wid, devices=group)
        for wid, group in enumerate(device_groups)
    ]

    all_processes = []
    with GpuMonitor(enabled=not args.no_mon) as mon:
        try:
            run(workers, all_processes, args.task, script_args)
            show_resource_stats(mon)
            print(json.dumps(mon.to_json(), indent=2))
        except KeyboardInterrupt:
            cleanup(all_processes)
# Run the launcher when this module is executed as a script
if __name__ == '__main__':
    main()