"""
Class to describe a group of jobs sharing common settings, which together become one condor submit file.
"""
import logging
import os
import re
from subprocess import check_call
from htcondenser.common import cp_hdfs, check_certificate, check_dir_create, check_good_filename
from collections import OrderedDict
import htcondenser as ht
log = logging.getLogger(__name__)
class JobSet(object):
"""Manages a set of Jobs, all sharing a common submission file, log
locations, resource request, and setup procedure.
Parameters
----------
exe : str
Name of executable for this set of jobs. Note that path must be specified,
e.g. './myexe'
copy_exe : bool, optional
If `True`, copies the executable to HDFS. Set `False` for builtins e.g. awk
setup_script : str, optional
Shell script to execute on worker node to setup necessary programs, libs, etc.
filename : str, optional
Filename for HTCondor job description file.
out_dir : str, optional
Directory for STDOUT output. Will be automatically created if it does not
already exist. Raises an OSError if already exists but is not a directory.
out_file : str, optional
Filename for STDOUT output.
err_dir : str, optional
Directory for STDERR output. Will be automatically created if it does not
already exist. Raises an OSError if already exists but is not a directory.
err_file : str, optional
Filename for STDERR output.
log_dir : str, optional
Directory for log output. Will be automatically created if it does not
already exist. Raises an OSError if already exists but is not a directory.
log_file : str, optional
Filename for log output.
cpus : int, optional
Number of CPU cores for each job.
memory : str, optional
RAM to request for each job.
disk : str, optional
Disk space to request for each job.
certificate : bool, optional
Whether the JobSet requires the user's grid certificate.
transfer_hdfs_input : bool, optional
If True, transfers input files on HDFS to worker node first.
Auto-updates program arguments to take this into account.
Otherwise files are read directly from HDFS.
Note that this does not affect input files **not** on HDFS - they will
be transferred across regardless.
share_exe_setup : bool, optional
If True, then all jobs will use the same exe and setup files on HDFS.
If False, each job will have its own copy of the exe and setup script
in its individual job folder.
common_input_files : list[str], optional
List of common input files for each job. Unlike Job input files, only
one copy of each of these files is made on HDFS. Not sure if this
will break anything...
hdfs_store : str, optional
If any local files (on `/user`) need to be transferred to the job, they
must first be stored on `/hdfs`. This argument specifies the directory
where those files are stored. Each job will have its own copy of all
input files, in a subdirectory with the Job name. If this directory does
not exist, it will be created.
other_args : dict, optional
Dictionary of other job options to write to HTCondor submit file.
These will be added in **before** any arguments or jobs.
Raises
------
OSError
If any of `out_file`, `err_file`, or `log_file` are blank or '.'.
OSError
If any of `out_dir`, `err_dir`, `log_dir`, `hdfs_store` cannot be created.
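Examples
--------
A minimal sketch - the executable, setup script, and HDFS path below are
purely illustrative values, not required ones:
>>> import htcondenser as ht
>>> job_set = ht.JobSet(exe='simulate.sh',
...                     copy_exe=True,
...                     setup_script='setup.sh',
...                     filename='simulation.condor',
...                     out_dir='logs', err_dir='logs', log_dir='logs',
...                     memory='2GB', disk='1GB',
...                     hdfs_store='/hdfs/user/someone/simulation')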
"""
def __init__(self,
exe,
copy_exe=True,
setup_script=None,
filename='jobs.condor',
out_dir='logs', out_file='$(cluster).$(process).out',
err_dir='logs', err_file='$(cluster).$(process).err',
log_dir='logs', log_file='$(cluster).$(process).log',
cpus=1, memory='100MB', disk='100MB',
certificate=False,
transfer_hdfs_input=True,
share_exe_setup=True,
common_input_files=None,
hdfs_store=None,
dag_mode=False,
other_args=None):
super(JobSet, self).__init__()
self.exe = exe
self.copy_exe = copy_exe
self.setup_script = setup_script
# Check output filenames are not rubbish
for f in [filename, out_file, err_file, log_file]:
check_good_filename(f)
self.filename = os.path.abspath(filename)
self.out_dir = os.path.realpath(str(out_dir))
self.out_file = str(out_file)
self.err_dir = os.path.realpath(str(err_dir))
self.err_file = str(err_file)
self.log_dir = os.path.realpath(str(log_dir))
self.log_file = str(log_file)
self.cpus = int(cpus) if int(cpus) >= 1 else 1
self.memory = str(memory)
self.disk = str(disk)
self.certificate = certificate
self.transfer_hdfs_input = transfer_hdfs_input
self.share_exe_setup = share_exe_setup
# can't use the X[:] or [] default idiom as [:] is evaluated first (so breaks on None)
if not common_input_files:
common_input_files = []
self.common_input_files = common_input_files[:]
self.common_input_file_mirrors = [] # To hold FileMirror obj
if hdfs_store is None:
raise IOError('Need to specify hdfs_store')
self.hdfs_store = hdfs_store
# self.dag_mode = dag_mode
self.job_template = os.path.join(os.path.dirname(__file__), 'templates/job.condor')
self.other_job_args = other_args
# Holds all Job objects this JobSet manages; key is the Job name.
self.jobs = OrderedDict()
# Setup directories
# ---------------------------------------------------------------------
for d in [self.out_dir, self.err_dir, self.log_dir, self.hdfs_store]:
if d:
check_dir_create(d)
self.setup_common_input_file_mirrors(self.hdfs_store)
def __eq__(self, other):
return self.filename == other.filename
def __getitem__(self, i):
if isinstance(i, int):
if i >= len(self):
raise IndexError()
return self.jobs.values()[i]
elif isinstance(i, slice):
return self.jobs.values()[i]
else:
raise TypeError('Invalid argument type - must be int or slice')
def __len__(self):
return len(self.jobs)
def add_job(self, job):
"""Add a Job to the collection of jobs managed by this JobSet.
Parameters
----------
job : Job
Job object to be added.
Raises
------
TypeError
If `job` argument isn't of type Job (or derived type).
KeyError
If a job with that name is already governed by this JobSet object.
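Examples
--------
Sketch only - the Job constructor arguments shown here are assumed for
illustration, not checked against the Job class itself:
>>> job = ht.Job(name='job1', args=['--input', 'data.txt'])
>>> job_set.add_job(job)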
"""
if not isinstance(job, ht.Job):
raise TypeError('Added job must be of type Job')
if job.name in self.jobs:
raise KeyError('Job %s already exists in JobSet' % job.name)
self.jobs[job.name] = job
job.manager = self
def write(self, dag_mode):
"""Write jobs to HTCondor job file."""
self.setup_common_input_file_mirrors(self.hdfs_store)
with open(self.job_template) as tfile:
template = tfile.read()
file_contents = self.generate_file_contents(template, dag_mode)
log.info('Writing HTCondor job file to %s', self.filename)
check_dir_create(os.path.dirname(os.path.realpath(self.filename)))
with open(self.filename, 'w') as jfile:
jfile.write(file_contents)
def generate_file_contents(self, template, dag_mode=False):
"""Create a job file contents from a template, replacing necessary fields
and adding in all jobs with necessary arguments.
Can either be used for normal jobs, in which case all jobs added, or
for use in a DAG, where a placeholder for any job(s) is used.
Parameters
----------
template : str
Job template as a single string, including tokens to be replaced.
dag_mode : bool, optional
If True, then submit file will only contain placeholder for job args.
This is so it can be used in a DAG. Otherwise, the submit file will
specify each Job attached to this JobSet.
Returns
-------
str
Completed job template.
Raises
------
IndexError
If the JobSet has no Jobs attached.
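Examples
--------
Illustrative only - the real template lives in ``templates/job.condor``;
the token names correspond to the ``replacement_dict`` built below:
>>> snippet = 'request_memory = {MEMORY}'
>>> contents = job_set.generate_file_contents(snippet)  # doctest: +SKIP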
"""
if len(self.jobs) == 0:
raise IndexError('You have not added any jobs to this JobSet.')
worker_script = os.path.join(os.path.dirname(__file__),
'templates/condor_worker.py')
# Update other_job_args if dag
if dag_mode:
if not self.other_job_args:
self.other_job_args = dict()
self.other_job_args['accounting_group'] = 'group_physics.hep'
self.other_job_args['accounting_group_user'] = '$ENV(LOGNAME)'
# Update other_job_args if certificate
if self.certificate:
check_certificate()
if not self.other_job_args:
self.other_job_args = dict()
self.other_job_args['use_x509userproxy'] = 'True'
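# Each (key, value) pair in other_job_args becomes a "key = value" line in
# the submit file, e.g. {'requirements': 'TARGET.OpSys == "LINUX"'}
# (illustrative) produces the line: requirements = TARGET.OpSys == "LINUX"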
if self.other_job_args:
other_args_str = '\n'.join('%s = %s' % (str(k), str(v))
for k, v in self.other_job_args.iteritems())
else:
other_args_str = None
# Make replacements in template
replacement_dict = {
'EXE_WRAPPER': worker_script,
'STDOUT': os.path.join(self.out_dir, self.out_file),
'STDERR': os.path.join(self.err_dir, self.err_file),
'STDLOG': os.path.join(self.log_dir, self.log_file),
'CPUS': str(self.cpus),
'MEMORY': self.memory,
'DISK': self.disk,
'OTHER_ARGS': other_args_str
}
for pattern, replacement in replacement_dict.iteritems():
if replacement:
template = template.replace("{%s}" % pattern, replacement)
# Add jobs
if dag_mode:
# actual arguments are in the DAG file, only placeholders here
template += 'arguments=$(%s)\n' % ht.DAGMan.JOB_VAR_NAME
template += 'queue\n'
else:
# specify each job in the submit file
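# (illustrative) for a Job named 'job1' with quantity 1, the lines appended are:
#   # job1
#   arguments="<job1 argument string>"
#   queue 1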
for name, job in self.jobs.iteritems():
template += '\n# %s\n' % name
template += 'arguments="%s"\n' % job.generate_job_arg_str()
template += '\nqueue %d\n' % job.quantity
# Check we haven't left any unused tokens in the template.
# If we have, then remove them.
leftover_tokens = re.findall(r'{\w*}', template)
if leftover_tokens:
log.debug('Leftover tokens in job file:')
for tok in leftover_tokens:
log.debug('%s', tok)
template = template.replace(tok, '')
return template
def transfer_to_hdfs(self):
"""Copy any necessary input files to HDFS.
This transfers both common exe/setup (if self.share_exe_setup == True),
and the individual files required by each Job.
"""
# Do copying of exe/setup script here instead of through Jobs if only
# 1 instance required on HDFS.
if self.share_exe_setup:
if self.copy_exe:
log.info('Copying %s -->> %s', self.exe, self.hdfs_store)
cp_hdfs(self.exe, self.hdfs_store)
if self.setup_script:
log.info('Copying %s -->> %s', self.setup_script, self.hdfs_store)
cp_hdfs(self.setup_script, self.hdfs_store)
# Transfer common input files
for ifile in self.common_input_file_mirrors:
log.info('Copying %s -->> %s', ifile.original, ifile.hdfs)
cp_hdfs(ifile.original, ifile.hdfs)
# Get each job to transfer their necessary files
for job in self.jobs.itervalues():
job.transfer_to_hdfs()
def submit(self, force=False):
"""Write HTCondor job file, copy necessary files to HDFS, and submit.
Also prints out info for user.
Parameters
----------
force : bool, optional
If True, pass the `-f` flag to condor_submit to force submission.
Raises
------
CalledProcessError
If condor_submit returns non-zero exit code.
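Examples
--------
Sketch only, once Jobs have been added to this JobSet:
>>> job_set.submit()  # doctest: +SKIP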
"""
self.write(dag_mode=False)
self.transfer_to_hdfs()
cmds = ['condor_submit', self.filename]
if force:
cmds.insert(1, '-f')
check_call(cmds)
if self.log_dir == self.out_dir == self.err_dir:
log.info('Output/error/htcondor logs written to %s', self.out_dir)
else:
for t, d in {'STDOUT': self.out_dir,
'STDERR': self.err_dir,
'HTCondor log': self.log_dir}.iteritems():
log.info('%s written to %s', t, d)