# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""
Plugin for SLURM.
This has been tested on SLURM 14.03.7 on the CSCS.ch machines.
"""
from __future__ import division
import re
import aiida.scheduler
from aiida.common.utils import escape_for_bash
from aiida.scheduler import SchedulerError
from aiida.scheduler.datastructures import (
JobInfo, job_states, NodeNumberJobResource)
# This maps SLURM state codes to our own status list
## List of states from the man page of squeue
## CA CANCELLED Job was explicitly cancelled by the user or system
## administrator. The job may or may not have been
## initiated.
## CD COMPLETED Job has terminated all processes on all nodes.
## CF CONFIGURING Job has been allocated resources, but are waiting
## for them to become ready for use (e.g. booting).
## CG COMPLETING Job is in the process of completing. Some processes
## on some nodes may still be active.
## F FAILED Job terminated with non-zero exit code or other
## failure condition.
## NF NODE_FAIL Job terminated due to failure of one or more allo-
## cated nodes.
## PD PENDING Job is awaiting resource allocation.
## PR PREEMPTED Job terminated due to preemption.
## R RUNNING Job currently has an allocation.
## S SUSPENDED Job has an allocation, but execution has been sus-
## pended.
## TO TIMEOUT Job terminated upon reaching its time limit.
# Translation table from the compact SLURM state code (squeue "%t") to the
# scheduler-independent job state. Entries are grouped by target state.
_map_status_slurm = {
    # Waiting for resources, or resources allocated but not yet usable
    'PD': job_states.QUEUED,
    'CF': job_states.QUEUED,
    # Holding an allocation (COMPLETING is still counted as running)
    'R': job_states.RUNNING,
    'CG': job_states.RUNNING,
    # Allocation kept, execution suspended
    'S': job_states.SUSPENDED,
    # Any kind of termination, successful or not
    'CD': job_states.DONE,
    'CA': job_states.DONE,
    'F': job_states.DONE,
    'NF': job_states.DONE,
    'PR': job_states.DONE,
    'TO': job_states.DONE,
}
# Regular expression used to extract the job id from the submission output.
# From the manual, possible lines are:
#   salloc: Granted job allocation 65537
#   sbatch: Submitted batch job 65541
# and in practice, often the part before the colon can be absent (hence the
# optional leading group). The numeric job id is captured in the named
# group 'jobid'.
_slurm_submitted_regexp = re.compile(
r'(.*:\s*)?([Gg]ranted job allocation|[Ss]ubmitted batch job)\s+(?P<jobid>\d+)')
# Regular expression for SLURM time specifications.
# From docs, acceptable time formats include
# "minutes", "minutes:seconds", "hours:minutes:seconds",
# "days-hours", "days-hours:minutes" and "days-hours:minutes:seconds".
# The pattern is compiled in VERBOSE mode, so whitespace and the inline
# '#' comments inside the string are ignored by the regex engine.
# It exposes the named groups 'days', 'hours', 'minutes' and 'seconds'
# (plus the helper groups 'dash' and 'firstcolon'); a group that does not
# take part in the match is None.
# Note that hours, minutes and seconds are each matched as one or two digits.
_time_regexp = re.compile(
r"""
^ # beginning of string
\s* # any number of white spaces
(?=\d) # I check that there is at least a digit
# in the string, without consuming it
((?P<days>\d+)(?P<dash>-) # the number of days, if a dash is present,
# composed by any number of digits;
# may be absent
(?=\d))? # in any case, I check that there is at least
# a digit afterwards, without consuming it
((?P<hours>\d{1,2}) # match an hour (one or two digits)
(?(dash) # check if the dash was found
| # match nothing if the dash was found:
# if the dash was found, we are sure that
# the first number is a hour
(?=:\d{1,2}:\d{1,2})))? # if no dash was found, the first
# element found is an hour only if
# it is followed by two more fields (mm:ss)
(?P<firstcolon>:)? # there (can) possibly be a further colon,
# consume it
((?<!-)(?P<minutes>\d{1,2})
(:(?P<seconds>\d{1,2}))?)? # number of minutes (one or two digits)
# and seconds. A number only means minutes.
# (?<!-) means that the location BEFORE
# the current position does NOT
# match a dash, because the string 1-2
# means 1 day and 2 hours, NOT one day and
# 2 minutes
\s* # any number of whitespaces
$ # end of line
""", re.VERBOSE)
# Separator placed between the fields in the squeue output format string,
# and used again to split each output line back into fields.
_field_separator = "^^^"
class SlurmJobResource(NodeNumberJobResource):
    """
    Job resource for SLURM.

    Extends NodeNumberJobResource by validating, and when possible deriving,
    the ``num_cores_per_mpiproc`` value passed to the scheduler.
    """

    def __init__(self, *args, **kwargs):
        """
        It extends the base class init method and calculates the
        num_cores_per_mpiproc fields to pass to Slurm schedulers.

        Checks that num_cores_per_machine is a multiple of
        num_cores_per_mpiproc and/or num_mpiprocs_per_machine.

        Check sequence:

        1. If num_cores_per_mpiproc and num_cores_per_machine are both
           specified, check that num_cores_per_machine ==
           num_cores_per_mpiproc * num_mpiprocs_per_machine.
        2. If only num_cores_per_machine is specified, calculate
           num_cores_per_mpiproc, which must be an integer value.
        3. If only num_cores_per_mpiproc is specified, use it as is.

        All arguments are forwarded unchanged to the base class initializer.

        :raise ValueError: if the values are inconsistent, if
            num_cores_per_machine is not positive, or if it is not an exact
            multiple of num_mpiprocs_per_machine.
        """
        super(SlurmJobResource, self).__init__(*args, **kwargs)

        value_error = ("num_cores_per_machine must be equal to "
                       "num_cores_per_mpiproc * num_mpiprocs_per_machine, "
                       "and in perticular it should be a multiple of "
                       "num_cores_per_mpiproc and/or num_mpiprocs_per_machine")

        if (self.num_cores_per_machine is not None and
                self.num_cores_per_mpiproc is not None):
            # Both values were specified by the user: they must be
            # consistent with each other.
            if self.num_cores_per_machine != (self.num_cores_per_mpiproc
                                              * self.num_mpiprocs_per_machine):
                raise ValueError(value_error)

        elif self.num_cores_per_machine is not None:
            if self.num_cores_per_machine <= 0:
                raise ValueError("num_cores_per_machine must be >=1")

            # Calculate num_cores_per_mpiproc.
            # In this plugin we never used num_cores_per_machine so if it
            # is not defined it is OK.
            # This module enables true division, so '/' would always give a
            # float; use divmod to get an exact integer quotient and to
            # detect a non-divisible number of cores.
            cores_per_mpiproc, remainder = divmod(
                self.num_cores_per_machine, self.num_mpiprocs_per_machine)
            if remainder != 0:
                raise ValueError(value_error)
            self.num_cores_per_mpiproc = cores_per_mpiproc
class SlurmScheduler(aiida.scheduler.Scheduler):
    """
    Support for the SLURM scheduler (http://slurm.schedmd.com/).
    """
    _logger = aiida.scheduler.Scheduler._logger.getChild('slurm')

    # Query only by list of jobs and not by user
    _features = {
        'can_query_by_user': False,
    }

    # The class to be used for the job resource.
    _job_resource_class = SlurmJobResource

    # Fields to query or to parse, as (squeue format code, internal name).
    # The order here is the order of the fields in each squeue output line.
    # Unavailable fields: substate, cputime
    fields = [
        ("%i", 'job_id'),  # job or job step id
        ("%t", 'state_raw'),  # job state in compact form
        ("%r", 'annotation'),  # reason for the job being in its current state
        ("%B", 'executing_host'),  # Executing (batch) host
        ("%u", 'username'),  # username
        ("%D", 'number_nodes'),  # number of nodes allocated
        ("%C", 'number_cpus'),  # number of allocated cores (if already running)
        ("%R", 'allocated_machines'),  # list of allocated nodes when running,
                                       # otherwise reason within parenthesis
        ("%P", 'partition'),  # partition (queue) of the job
        ("%l", 'time_limit'),  # time limit in days-hours:minutes:seconds
        ("%M", 'time_used'),  # Time used by the job in days-hours:minutes:seconds
        ("%S", 'dispatch_time'),  # actual or expected dispatch time (start time)
        ("%j", 'job_name'),  # job name (title)
        ("%V", 'submission_time')  # This is probably new, it exists in version
                                   # 14.03.7 and later
    ]

    def _get_joblist_command(self, jobs=None, user=None):
        """
        Return the command to report full information on existing jobs.

        The output fields are those listed in ``self.fields``, in that
        order, separated by the ``_field_separator`` string.

        :param jobs: a job id (string), or a list/tuple of job id strings,
            to restrict the query to.
        :param user: a username to restrict the query to.
        :return: the squeue command line, as a single string.
        :raise FeatureNotAvailable: if both ``user`` and ``jobs`` are given.
        :raise TypeError: if ``jobs`` is neither a string nor a list/tuple.
        """
        from aiida.common.exceptions import FeatureNotAvailable

        # I add the environment variable SLURM_TIME_FORMAT in front to be
        # sure to get the times in 'standard' format
        command = ["SLURM_TIME_FORMAT='standard'", "squeue", "--noheader",
                   "-o '{}'".format(_field_separator.join(
                       _[0] for _ in self.fields))]

        if user and jobs:
            raise FeatureNotAvailable("Cannot query by user and job(s) in SLURM")

        if user:
            command.append('-u{}'.format(user))

        if jobs:
            joblist = []
            if isinstance(jobs, basestring):
                joblist.append(jobs)
            else:
                if not isinstance(jobs, (tuple, list)):
                    raise TypeError(
                        "If provided, the 'jobs' variable must be a string or "
                        "a list of strings")
                joblist = jobs
            command.append('--jobs={}'.format(','.join(joblist)))

        comm = ' '.join(command)
        self.logger.debug("squeue command: {}".format(comm))
        return comm

    def _get_detailed_jobinfo_command(self, jobid):
        """
        Return the command to run to get the detailed information on a job,
        even after the job has finished.

        The output text is just retrieved, and returned for logging purposes.
        --parsable split the fields with a pipe (|), adding a pipe also at
        the end.

        :param jobid: the id of the job to query.
        :return: the sacct command line, as a single string.
        """
        return "sacct --format=AllocCPUS,Account,AssocID,AveCPU,AvePages,AveRSS,AveVMSize,Cluster,Comment,CPUTime,CPUTimeRAW,DerivedExitCode,Elapsed,Eligible,End,ExitCode,GID,Group,JobID,JobName,MaxRSS,MaxRSSNode,MaxRSSTask,MaxVMSize,MaxVMSizeNode,MaxVMSizeTask,MinCPU,MinCPUNode,MinCPUTask,NCPUS,NNodes,NodeList,NTasks,Priority,Partition,QOSRAW,ReqCPUS,Reserved,ResvCPU,ResvCPURAW,Start,State,Submit,Suspended,SystemCPU,Timelimit,TotalCPU,UID,User,UserCPU --parsable --jobs={}".format(jobid)

    def _get_submit_command(self, submit_script):
        """
        Return the string to execute to submit a given script.

        :param submit_script: the path of the submit script relative to the
            working directory.
            IMPORTANT: submit_script should be already escaped.
        :return: the sbatch command line, as a single string.
        """
        submit_command = 'sbatch {}'.format(submit_script)

        self.logger.info("submitting with: " + submit_command)

        return submit_command

    def _parse_submit_output(self, retval, stdout, stderr):
        """
        Parse the output of the submit command, as returned by executing the
        command returned by _get_submit_command command.

        :param retval: the return value of the submit command.
        :param stdout: the standard output of the submit command.
        :param stderr: the standard error of the submit command.
        :return: a string with the JobID.
        :raise SchedulerError: if retval is non-zero, or if no line of
            stdout matches the expected submission message.
        """
        if retval != 0:
            self.logger.error("Error in _parse_submit_output: retval={}; "
                              "stdout={}; stderr={}".format(retval, stdout, stderr))
            raise SchedulerError("Error during submission, retval={}\n"
                                 "stdout={}\nstderr={}".format(
                                     retval, stdout, stderr))

        # The transport is only used to make the log messages more
        # informative; if it is not available, it is simply omitted.
        try:
            transport_string = " for {}".format(self.transport)
        except SchedulerError:
            transport_string = ""

        if stderr.strip():
            self.logger.warning("in _parse_submit_output{}: "
                                "there was some text in stderr: {}".format(
                                    transport_string, stderr))

        # I check for a valid string in the output.
        # See comments near the regexp above.
        # I check for the first line that matches.
        for l in stdout.split('\n'):
            match = _slurm_submitted_regexp.match(l.strip())
            if match:
                return match.group('jobid')

        # If I am here, no valid line could be found.
        self.logger.error("in _parse_submit_output{}: "
                          "unable to find the job id: {}".format(
                              transport_string, stdout))
        raise SchedulerError(
            "Error during submission, could not retrieve the jobID from "
            "sbatch output; see log for more info.")

    def _parse_joblist_output(self, retval, stdout, stderr):
        """
        Parse the queue output string, as returned by executing the
        command returned by _get_joblist_command command,
        that is here implemented as a list of lines, one for each
        job, with _field_separator as separator. The order of the fields
        is the one of ``self.fields``.

        Return a list of JobInfo objects, one of each job,
        each relevant parameters implemented.

        Note: depending on the scheduler configuration, finished jobs may
            either appear here, or not.
            This function will only return one element for each job found
            in the squeue output; missing jobs (for whatever reason) simply
            will not appear here.

        :param retval: the return value of the squeue command.
        :param stdout: the standard output of the squeue command.
        :param stderr: the standard error of the squeue command.
        :return: a list of JobInfo objects.
        :raise SchedulerError: if retval is non-zero.
        """
        num_fields = len(self.fields)

        # issue a warning if there is any stderr output and
        # there is no line containing "Invalid job id specified", that happens
        # when I ask for specific calculations, and they are all finished
        if stderr.strip() and "Invalid job id specified" not in stderr:
            self.logger.warning("Warning in _parse_joblist_output, non-empty "
                                "stderr='{}'".format(stderr.strip()))

        # NOTE(review): an earlier comment in this function stated that a
        # non-zero return value should not raise, because squeue may return
        # one when some of the requested jobs are not in the queue anymore.
        # The code does raise, however; confirm which behavior is intended.
        if retval != 0:
            raise SchedulerError(
                "Error during squeue parsing (_parse_joblist_output function)")

        # will contain raw data parsed from output: only lines with the
        # separator, and already split in fields.
        # The number of splits is limited to num_fields; any element beyond
        # the known fields is dropped by the zip() below.
        # This assumes that _field_separator never appears inside a field.
        jobdata_raw = [l.split(_field_separator, num_fields)
                       for l in stdout.splitlines()
                       if _field_separator in l]

        # Create dictionary and parse specific fields
        job_list = []
        for job in jobdata_raw:
            # zip() stops at the shortest sequence, so a short line simply
            # produces a dictionary with fewer keys.
            thisjob_dict = {k[1]: v for k, v in zip(self.fields, job)}

            this_job = JobInfo()
            try:
                this_job.job_id = thisjob_dict['job_id']
                this_job.annotation = thisjob_dict['annotation']
                job_state_raw = thisjob_dict['state_raw']
            except KeyError:
                # I skip this calculation if I couldn't find this basic info
                # (I don't append anything to job_list before continuing)
                self.logger.error("Wrong line length in squeue output! '{}'"
                                  "".format(job))
                continue

            try:
                job_state_string = _map_status_slurm[job_state_raw]
            except KeyError:
                self.logger.warning("Unrecognized job_state '{}' for job "
                                    "id {}".format(job_state_raw,
                                                   this_job.job_id))
                job_state_string = job_states.UNDETERMINED

            # QUEUED_HELD states are not specific states in SLURM;
            # they are instead set with state QUEUED, and then the
            # annotation tells if the job is held.
            # I check for 'Dependency', 'JobHeldUser',
            # 'JobHeldAdmin', 'BeginTime'.
            # Other states should not bring the job in QUEUED_HELD, I believe
            # (the man page of slurm seems to be incomplete, for instance
            # JobHeld* are not reported there; I also checked at the source code
            # of slurm 2.6 on github (https://github.com/SchedMD/slurm),
            # file slurm/src/common/slurm_protocol_defs.c,
            # and these seem all the states to be taken into account for the
            # QUEUED_HELD status).
            # There are actually a few others, like possible
            # failures, or partition-related reasons, but for the moment I
            # leave them in the QUEUED state.
            if (job_state_string == job_states.QUEUED and
                    this_job.annotation in
                    ['Dependency', 'JobHeldUser', 'JobHeldAdmin', 'BeginTime']):
                job_state_string = job_states.QUEUED_HELD

            this_job.job_state = job_state_string

            ####
            # Up to here, I just made sure that there were at least three
            # fields, to set the most important fields for a job.
            # I now check if the length is equal to the number of fields
            if len(job) < num_fields:
                # I store this job only with the information
                # gathered up to now, and continue to the next job
                # Also print a warning (reporting the offending line only)
                self.logger.warning("Wrong line length in squeue output! "
                                    "Skipping optional fields. Line: '{}'"
                                    "".format(job))
                # I append this job before continuing
                job_list.append(this_job)
                continue

            # TODO: store executing_host?

            this_job.job_owner = thisjob_dict['username']

            try:
                this_job.num_machines = int(thisjob_dict['number_nodes'])
            except ValueError:
                self.logger.warning("The number of allocated nodes is not "
                                    "an integer ({}) for job id {}!".format(
                                        thisjob_dict['number_nodes'],
                                        this_job.job_id))

            try:
                this_job.num_mpiprocs = int(thisjob_dict['number_cpus'])
            except ValueError:
                self.logger.warning("The number of allocated cores is not "
                                    "an integer ({}) for job id {}!".format(
                                        thisjob_dict['number_cpus'],
                                        this_job.job_id))

            # ALLOCATED NODES HERE
            # string may be in the format
            # nid00[684-685,722-723,748-749,958-959]
            # therefore it requires some parsing, that is unnecessary now.
            # I just store is as a raw string for the moment, and I leave
            # this_job.allocated_machines undefined
            if this_job.job_state == job_states.RUNNING:
                this_job.allocated_machines_raw = thisjob_dict[
                    'allocated_machines']

            this_job.queue_name = thisjob_dict['partition']

            try:
                this_job.requested_wallclock_time_seconds = (self._convert_time(
                    thisjob_dict['time_limit']))
            except ValueError:
                self.logger.warning("Error parsing the time limit "
                                    "for job id {}".format(this_job.job_id))

            # Only if it is RUNNING; otherwise it is not meaningful,
            # and may be not set (in my test, it is set to zero)
            if this_job.job_state == job_states.RUNNING:
                try:
                    this_job.wallclock_time_seconds = (self._convert_time(
                        thisjob_dict['time_used']))
                except ValueError:
                    self.logger.warning("Error parsing time_used "
                                        "for job id {}".format(this_job.job_id))

                try:
                    this_job.dispatch_time = self._parse_time_string(
                        thisjob_dict['dispatch_time'])
                except ValueError:
                    self.logger.warning("Error parsing dispatch_time for job "
                                        "id {}".format(this_job.job_id))

            try:
                this_job.submission_time = self._parse_time_string(
                    thisjob_dict['submission_time'])
            except ValueError:
                self.logger.warning("Error parsing submission_time for job "
                                    "id {}".format(this_job.job_id))

            this_job.title = thisjob_dict['job_name']

            # Everything goes here anyway for debugging purposes
            this_job.raw_data = job

            # Double check of redundant info
            # Not really useful now, allocated_machines in this
            # version of the plugin is never set
            if (this_job.allocated_machines is not None and
                    this_job.num_machines is not None):
                if len(this_job.allocated_machines) != this_job.num_machines:
                    self.logger.error("The length of the list of allocated "
                                      "nodes ({}) is different from the "
                                      "expected number of nodes ({})!".format(
                                          len(this_job.allocated_machines),
                                          this_job.num_machines))

            # I append to the list of jobs to return
            job_list.append(this_job)

        return job_list

    def _convert_time(self, string):
        """
        Convert a string in the format DD-HH:MM:SS to a number of seconds.

        The string is matched against ``_time_regexp``; fields that are
        absent from the string count as zero.

        :param string: the time string to convert.
        :return: the number of seconds, as an integer.
        :raise ValueError: if the string does not match the expected format.
        """
        groups = _time_regexp.match(string)
        if groups is None:
            self.logger.warning("Unrecognized format for "
                                "time string '{}'".format(string))
            raise ValueError("Unrecognized format for time string.")

        groupdict = groups.groupdict()
        # should not raise a ValueError, they all match digits only
        days = int(groupdict['days'] if groupdict['days'] is not None
                   else 0)
        hours = int(groupdict['hours'] if groupdict['hours'] is not None
                    else 0)
        mins = int(groupdict['minutes'] if groupdict['minutes'] is not None
                   else 0)
        secs = int(groupdict['seconds'] if groupdict['seconds'] is not None
                   else 0)

        return days * 86400 + hours * 3600 + mins * 60 + secs

    def _parse_time_string(self, string, fmt='%Y-%m-%dT%H:%M:%S'):
        """
        Parse a time string in the format given by ``fmt`` and
        return a datetime object.

        :param string: the time string to parse.
        :param fmt: the format passed to ``time.strptime``.
        :return: a ``datetime.datetime`` object.
        :raise ValueError: if the string cannot be parsed.
        """
        import datetime
        import time

        try:
            time_struct = time.strptime(string, fmt)
        except Exception as e:
            # Format the exception itself rather than its 'message'
            # attribute, which is deprecated and not always available.
            self.logger.debug("Unable to parse time string {}, the message "
                              "was {}".format(string, e))
            raise ValueError("Problem parsing the time string.")

        # I convert from a time_struct to a datetime object going through
        # the seconds since epoch, as suggested on stackoverflow:
        # http://stackoverflow.com/questions/1697815
        return datetime.datetime.fromtimestamp(time.mktime(time_struct))

    def _get_kill_command(self, jobid):
        """
        Return the command to kill the job with specified jobid.

        :param jobid: the id of the job to kill.
        :return: the scancel command line, as a single string.
        """
        kill_command = 'scancel {}'.format(jobid)

        self.logger.info("killing job {}".format(jobid))

        return kill_command

    def _parse_kill_output(self, retval, stdout, stderr):
        """
        Parse the output of the kill command.

        :param retval: the return value of the kill command.
        :param stdout: the standard output of the kill command.
        :param stderr: the standard error of the kill command.
        :return: True if everything seems ok, False otherwise.
        """
        if retval != 0:
            self.logger.error("Error in _parse_kill_output: retval={}; "
                              "stdout={}; stderr={}".format(retval, stdout, stderr))
            return False

        # The transport is only used to make the log messages more
        # informative; if it is not available, it is simply omitted.
        try:
            transport_string = " for {}".format(self.transport)
        except SchedulerError:
            transport_string = ""

        # Any output is unexpected, but is not considered a failure.
        if stderr.strip():
            self.logger.warning("in _parse_kill_output{}: "
                                "there was some text in stderr: {}".format(
                                    transport_string, stderr))

        if stdout.strip():
            self.logger.warning("in _parse_kill_output{}: "
                                "there was some text in stdout: {}".format(
                                    transport_string, stdout))

        return True