Source code for job_handler

"""
Job handler

Machine-specific job submission and tracking routines. Provides a unified
interface for all job handling. Implements the JobHandler class, which is
initialized, from the provided options, for the machine that the calculations
are running on.

"""

import os
import re
import shlex
import subprocess
import sys
import time
from logging import debug, info, warn, error
from os import path
from subprocess import Popen, PIPE, STDOUT

MAX_RETRY = 5


class JobHandler(object):
    """
    Abstraction of batch scheduler submission. Provides a few helper methods
    for job submission and tracking.

    submit
        run a job; returns a jobid for a queued job, True for a completed job
        and False for a failed job
    postrun
        submit script to run itself after completion

    """

    def __init__(self, options):
        """Initialize for machine specified by options."""
        self.queue = options.get('queue')
        if self.queue == 'wooki':
            self.submit = _wooki_submit
            self.postrun = _wooki_postrun
            self.jobcheck = _wooki_jobcheck
            self.env = _wooki_env
        elif self.queue == 'sharcnet':
            self.submit = _sharcnet_submit
            self.postrun = _mk_sharcnet_postrun(options.get('dedicated_queue'))
            self.jobcheck = _sharcnet_jobcheck
            self.env = _sharcnet_env
        elif self.queue == 'slurm':
            self.submit = _slurm_submit
            self.postrun = _slurm_postrun
            self.jobcheck = _slurm_jobcheck
            self.env = _pass
        elif self.queue == 'serial':
            self.submit = _serial_run
            self.postrun = _pass
            self.jobcheck = _pass
            self.env = _pass
        else:
            print("ERROR unknown queue %s. Using null handler" % self.queue)
            self.submit = _pass
            self.postrun = _pass
            self.jobcheck = _pass
            self.env = _pass
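# Illustrative usage sketch (not part of the original module): the `options`
# object is assumed to come from the rest of faps and to expose the get(),
# getint(), getfloat() and gettuple() accessors used throughout this file.
#
#     handler = JobHandler(options)            # e.g. options.get('queue') == 'slurm'
#     jobid = handler.submit('gulp', options)  # scheduler job id, or True/False
#     while handler.jobcheck(jobid):           # True while queued or running
#         time.sleep(30)
#     handler.postrun(jobid)                   # resubmit this script once jobid finishes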
def _sharcnet_submit(job_type, options, input_file=None, input_args=None):
    """Simple interface to the 'sqsub' submission on sharcnet."""
    # Threaded codes have different behaviour
    openmp_codes = options.gettuple('threaded_codes')

    # Bind some things locally, so we know what's going on
    job_name = options.get('job_name')
    exe = options.get('%s_exe' % job_type)
    try:
        nodes = options.getint('%s_ncpu' % job_type)
    except AttributeError:
        nodes = 1

    sqsub_args = ['sqsub']
    # Always use the dedicated queue; faster
    dedicated_queue = options.get('dedicated_queue')
    if dedicated_queue:
        sqsub_args.extend(['-q', dedicated_queue])
    # job_name
    sqsub_args.extend(['-j', 'faps-%s-%s' % (job_name, job_type)])
    # Is it a multiple CPU job?
    # Memory is mandatory; set depending on job type...
    if nodes > 1:
        # request nodes
        sqsub_args.extend(['-n', '%i' % nodes])
        if job_type in openmp_codes:
            # Some jobs are only openmp
            _check_program(exe)
            sqsub_args.extend(['-f', 'threaded'])
            sqsub_args.extend(['--mpp=%fg' %
                               options.getfloat('threaded_memory')])
        else:
            # Ensure mpi is enabled
            _check_program(exe, mpi=True)
            sqsub_args.extend(['-f', 'mpi'])
            # Pack up to 12 procs on a single node
            if nodes < 12:
                sqsub_args.extend(['--mpp=1.66g'])
                sqsub_args.extend(['--pack'])
            elif nodes % 2 == 0 and nodes/2 < 8:
                sqsub_args.extend(['--mpp=1.66g'])
                sqsub_args.extend(['-N2'])
            elif nodes % 4 == 0 and nodes/4 < 8:
                sqsub_args.extend(['--mpp=1.66g'])
                sqsub_args.extend(['-N4'])
            elif nodes % 6 == 0 and nodes/6 < 8:
                sqsub_args.extend(['--mpp=1.66g'])
                sqsub_args.extend(['-N6'])
            elif nodes % 8 == 0 and nodes/8 < 8:
                sqsub_args.extend(['--mpp=1.66g'])
                sqsub_args.extend(['-N8'])
            elif nodes % 12 == 0 and nodes/12 < 8:
                sqsub_args.extend(['--mpp=1.66g'])
                sqsub_args.extend(['-N12'])
            elif nodes % 24 == 0 and nodes/24 < 8:
                sqsub_args.extend(['--mpp=1.66g'])
                sqsub_args.extend(['-N24'])
            else:
                sqsub_args.extend(['--mpp=2.66g'])
    else:
        _check_program(exe)
        sqsub_args.extend(['--mpp=%fg' % options.getfloat('serial_memory')])
    # run-time estimate is mandatory; 48 hours should be plenty
    sqsub_args.extend(['-r', '48h'])
    # Some codes need the input file name
    if input_file is not None:
        sqsub_args.extend(['-i', '%s' % input_file])
    # Output
    sqsub_args.extend(['-o', 'faps-%s.out' % job_name])
    # Which command?
    sqsub_args.extend([exe])
    if input_args is not None:
        sqsub_args.extend(input_args)

    debug("Submission command: %s" % " ".join(sqsub_args))
    jobid = False  # returned unchanged if submission never succeeds
    submitted = False
    submit_count = 0
    while not submitted and submit_count <= MAX_RETRY:
        submit = Popen(sqsub_args, stdout=PIPE)
        for line in submit.stdout.readlines():
            if 'submitted as' in line:
                jobid = int(line.split()[-1])
                submitted = True
                break
        else:
            submit_count += 1
            error("Job submission attempt %i failed." % submit_count)
            time.sleep(submit_count)

    return jobid


def _sharcnet_postrun(waitid):
    """Dummy to stop pickled jobs failing."""
    _pass()


def _mk_sharcnet_postrun(dedicated_queue=None):
    """Return a postrun function for a particular queue."""

    def _sharcnet_postrun(waitid):
        """
        Resubmit this script for the postrun on job completion. Will accept
        a single jobid or a list, as integers or strings.

        """
        # Magic makes everything into a set of strings
        if hasattr(waitid, '__iter__'):
            waitid = frozenset([("%s" % wid).strip() for wid in waitid])
        else:
            waitid = frozenset([("%s" % waitid).strip()])
        # Check that job appears in sqjobs before submitting next one
        for loop_num in range(10):
            # Pass a single string so that the shell expands $USER
            qstat = Popen('qstat -u $USER', stdout=PIPE, shell=True)
            if waitid.issubset(re.split(r'[\s.]', qstat.stdout.read())):
                # All jobs there
                break
            else:
                # Wait longer each time, in case the system is very slow
                time.sleep(loop_num)
        # Submit here, even if jobs never found in queue
        jobid_str = ('-'.join(sorted(waitid)))[:15]  # this should be plenty
        sqsub_args = [
            'sqsub',
            '-r', '20m',
            '-o', 'faps-post-%s.out' % jobid_str,
            '--mpp=3g',
            '--waitfor=%s' % ','.join(waitid)]
        if dedicated_queue:
            sqsub_args.extend(['-q', dedicated_queue])
        # Add the submitted program, cleaned of interactive options
        sqsub_args.extend(_argstrip(sys.argv))
        # We can just call this as we don't care about the jobid
        debug("Postrun command: %s" % " ".join(sqsub_args))
        subprocess.call(sqsub_args)

    return _sharcnet_postrun


def _sharcnet_jobcheck(jobid):
    """Return true if job is still running or queued, or check fails."""
    # can deal with jobid as an int or a string
    jobid = ("%s" % jobid).strip()
    running_status = ['Q', 'R', 'Z']
    qstat = Popen(['sqjobs', jobid], stdout=PIPE, stderr=STDOUT)
    for line in qstat.stdout.readlines():
        if "ERROR" in line:
            # Job finished and removed
            return False
        elif jobid in line:
            # use of 'in' should be fine as only this job will be shown
            # can't use positional slicing as columns resize
            status = line.split()[3]
            if status in running_status:
                return True
            else:
                return False
    else:
        error("Failed to get job information. Is sqjobs working?")
        # Act as if the job is still running, in case it hasn't finished
        return True


def _sharcnet_env(code, *args, **kwargs):
    """Update the running environment for specific codes."""
    if code == 'siesta':
        # Siesta uses pathscale libraries and is a module;
        # the pathscale module currently appends the 32 bit lib_dir
        # so we have to try and chop that out
        newenv = subprocess.Popen(
            ['/usr/bin/modulecmd', 'python', 'load', 'siesta'],
            stdout=PIPE)
        exec(newenv.stdout.read())
        ld_lib = os.environ['LD_LIBRARY_PATH']
        ld_lib = re.sub('/opt/sharcnet/pathscale/(.*)/lib/(.*)/32',
                        '/opt/sharcnet/pathscale/\\1/lib/\\2', ld_lib)
        os.environ['LD_LIBRARY_PATH'] = ld_lib
    else:
        pass


def _wooki_submit(job_type, options, *args, **kwargs):
    """
    Interface to the submission scripts on wooki. Let them deal with the
    node types, because it is too fiddly to care about.

    """
    submit_scripts = {
        'vasp': ['vasp-submit'],
        'repeat': ['repeat-submit-faps'],
        'siesta': ['siesta-submit'],
        'fastmc': ['fastmc-submit'],
        'gulp': ['gulp-submit-faps'],
        'egulp': ['egulp-submit'],
        'dl_poly': ['dl_poly-submit'],
        'gromacs': ['script-submit', './gromacs_faps'],
        'absl': ['script-submit', './absl_faps']
    }
    job_name = options.get('job_name')
    try:
        nodes = options.getint('%s_ncpu' % job_type)
    except AttributeError:
        nodes = 1

    submit_args = submit_scripts[job_type] + [job_name, "%i" % nodes]

    debug("Submission command: %s" % " ".join(submit_args))
    jobid = False  # returned unchanged if submission never succeeds
    submitted = False
    submit_count = 0
    while not submitted and submit_count <= MAX_RETRY:
        submit = Popen(submit_args, stdout=subprocess.PIPE)
        for line in submit.stdout.readlines():
            # Your job 123 ("vasp.testzif") has been submitted
            if "has been submitted" in line:
                jobid = line.split()[2]
                submitted = True
                break
        else:
            submit_count += 1
            error("Job submission attempt %i failed." % submit_count)
            time.sleep(submit_count)

    return jobid


def _wooki_postrun(waitid):
    """
    Resubmit this script for the postrun on job completion. Will accept
    a single jobid or a list, as integers or strings.

    """
    # Magic makes everything into a set of strings
    if hasattr(waitid, '__iter__'):
        waitid = frozenset([("%s" % wid).strip() for wid in waitid])
    else:
        waitid = frozenset([("%s" % waitid).strip()])

    # No jobcheck here as we assume wooki works
    jobid_str = ('-'.join(sorted(waitid)))[:15]  # this should be plenty
    sge_script = ['#!/bin/bash\n',
                  '#$ -cwd\n',
                  '#$ -V\n',
                  '#$ -j y\n',
                  '#$ -N faps-post-%s\n' % jobid_str,
                  '#$ -o faps-post-%s.out\n' % jobid_str,
                  '#$ -hold_jid %s\n' % ','.join(waitid),
                  'python ',
                  ' '.join(_argstrip(sys.argv))]
    sge_script = ''.join(sge_script)

    submit = Popen("qsub", shell=False, stdin=PIPE)
    submit.communicate(input=sge_script)


def _wooki_jobcheck(jobid):
    """Return true if job is still running or queued, or check fails."""
    # can deal with jobid as an int or a string
    jobid = ("%s" % jobid).strip()
    qstat = Popen(['qstat', '-j', jobid], stdout=PIPE, stderr=STDOUT)
    for line in qstat.stdout.readlines():
        if "Following jobs do not exist" in line:
            # Job finished and removed
            return False
        #TODO(tdaff): any way to get the job information?
        else:
            # Job still exists, still running
            return True


def _wooki_env(code, *args, **kwargs):
    """Hacks to get things working with wooki submission scripts."""
    pass


def _serial_run(job_type, options, input_file=None, input_args=None):
    """Run the exe in a subprocess. input_args, if given, must be a list."""
    # Bind some things locally, so we know what's going on
    job_name = options.get('job_name')
    exe = options.get('%s_exe' % job_type)

    serial_args = shlex.split(exe)
    if input_args is not None:
        serial_args.extend(input_args)
    # Some codes need the input file name
    if input_file is not None:
        input_file = open('%s' % input_file)
    # Output
    out_file = open('faps-%s.out' % job_name, 'wb')

    # run the job in process
    debug("Waiting for command: %s" % " ".join(serial_args))
    submit = Popen(serial_args, stdin=input_file, stdout=out_file)
    submit.wait()
    finished = submit.returncode
    info("%s job finished with return code %s" % (exe, finished))

    # always return True for a finished job
    return True


def _slurm_submit(job_type, options, input_file=None, input_args=None):
    """Simple interface to the slurm resource manager."""
    # Threaded codes have different behaviour
    openmp_codes = options.gettuple('threaded_codes')

    # Bind some things locally, so we know what's going on
    job_name = options.get('job_name')
    exe = options.get('%s_exe' % job_type)
    try:
        nodes = options.getint('%s_ncpu' % job_type)
    except AttributeError:
        nodes = 1

    sbatch_script = ['#!/bin/bash\n', '\n']
    job_command = [exe]

    sbatch_args = ['sbatch']
    # job_name
    sbatch_args.extend(['--job-name', 'faps-%s-%s' % (job_name, job_type)])
    # Is it a multiple CPU job?
    if nodes > 1:
        if job_type in openmp_codes:
            # Some jobs are only openmp
            _check_program(exe)
            # Single task with so many CPUs
            sbatch_args.extend(['--cpus-per-task', '%i' % nodes])
            sbatch_script.append('export OMP_NUM_THREADS=$SLURM_CPUS_PER_TASK\n')
        else:
            # assume MPI
            # Ensure mpi is enabled
            _check_program(exe, mpi=True)
            sbatch_args.extend(['--ntasks', '%i' % nodes])
            job_command.insert(0, 'mpirun ')
    else:
        _check_program(exe)
    # Some codes need the input file name
    if input_file is not None:
        job_command.extend(['<', '%s' % input_file])
    # Output
    sbatch_args.extend(['--output', 'faps-%s.out' % job_name])
    # Which command?
    if input_args is not None:
        job_command.extend(input_args)

    sbatch_script.extend([" ".join(job_command)])
    sbatch_script = ''.join(sbatch_script)

    debug("Submission command: %s" % " ".join(sbatch_args))
    jobid = False  # returned unchanged if submission never succeeds
    submit_count = 0
    while submit_count <= MAX_RETRY:
        submit = Popen(sbatch_args, stdout=PIPE, stdin=PIPE)
        stdoutdata, _stderrdata = submit.communicate(input=sbatch_script)
        if 'Submitted' in stdoutdata:
            jobid = int(stdoutdata.split()[-1])
            break
        else:
            submit_count += 1
            error("Job submission attempt %i failed." % submit_count)
            time.sleep(submit_count)

    return jobid


def _slurm_postrun(waitid):
    """
    Resubmit this script for the postrun on job completion. Will accept
    a single jobid or a list, as integers or strings.

    """
    # Magic makes everything into a set of strings
    if hasattr(waitid, '__iter__'):
        waitid = frozenset([("%s" % wid).strip() for wid in waitid])
    else:
        waitid = frozenset([("%s" % waitid).strip()])

    jobid_str = ('-'.join(sorted(waitid)))[:15]  # this should be plenty
    sbatch_script = ['#!/bin/bash\n',
                     '#SBATCH --dependency afterok:%s\n' % ':'.join(waitid),
                     'python ',
                     ' '.join(_argstrip(sys.argv))]
    sbatch_args = ['sbatch',
                   '--job-name', 'faps-post-%s' % jobid_str,
                   '--output', 'faps-post-%s.out' % jobid_str]
    sbatch_script = ''.join(sbatch_script)

    submit = Popen(sbatch_args, shell=False, stdin=PIPE)
    submit.communicate(input=sbatch_script)


def _slurm_jobcheck(jobid):
    """Return true if job is still running or queued, or check fails."""
    # can deal with jobid as an int or a string
    jobid = ("%s" % jobid).strip()
    running_states = ['CG', 'PD', 'R', 'S', 'CF']
    squeue = Popen(['squeue', '-j', jobid], stdout=PIPE, stderr=STDOUT)
    stdoutdata = squeue.stdout.readlines()
    if len(stdoutdata) == 1:
        if 'JOBID' in stdoutdata[0] or 'Invalid job' in stdoutdata[0]:
            # job cleared from the queue
            return False
    elif stdoutdata[1][48:50].strip() in running_states:
        return True
    else:
        # Job still exists, still running
        return True


def _pass(*args, **kwargs):
    """Sometimes we want to do nothing."""
    pass


def _argstrip(arglist):
    """Some options might be best removed before resubmission."""
    to_remove = ['-i', '--interactive', '-m', '--import']
    newargs = list(arglist)
    for item in to_remove:
        while item in newargs:
            newargs.remove(item)
    return newargs


def _check_program(program, mpi=False):
    """Test to see if the exe is in the path and might work."""
    exe = which(program)
    if exe is None:
        error("Could not find %s in path, job will probably fail." % program)
        return False
    elif mpi:
        try:
            binary = open(exe, 'rb')
            if re.search('mpi_init', binary.read(), re.IGNORECASE):
                return True
            else:
                warn("%s doesn't appear to be an mpi executable." % program)
                return False
        except IOError:
            return False
    else:
        # TODO(tdaff): No easy way to check for OMP in general?
        return True
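# Example of the argument cleaning above (hypothetical argv, shown only to
# illustrate _argstrip): interactive/import flags are dropped so the
# resubmitted job runs non-interactively.
#
#     _argstrip(['faps.py', '-i', 'mof5'])  # -> ['faps.py', 'mof5']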
def is_exe(fpath):
    """Return executability of a file."""
    return path.isfile(fpath) and os.access(fpath, os.X_OK)
def which(program):
    """Return the equivalent of the 'which' command."""
    fpath, _fname = path.split(program)
    if fpath:
        if is_exe(program):
            return program
    else:
        for env_path in os.environ["PATH"].split(os.pathsep):
            exe_file = path.join(env_path, program)
            if is_exe(exe_file):
                return exe_file
    return None
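# Illustrative behaviour of the helpers above (paths are hypothetical):
#
#     which('mpirun')       # -> '/usr/bin/mpirun' if an executable of that name
#                           #    is found on $PATH, otherwise None
#     which('./run_local')  # -> './run_local' only if that file is executable,
#                           #    since the argument already contains a path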