# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""
Plugin for SGE.
This has been tested on GE 6.2u3.
Plugin originally written by Marco Dorigo.
Email: marco(DOT)dorigo(AT)rub(DOT)de
"""
from __future__ import division
import aiida.scheduler
import xml.parsers.expat
from aiida.common.utils import escape_for_bash
from aiida.scheduler import SchedulerError, SchedulerParsingError
from aiida.scheduler.datastructures import (
JobInfo, job_states, MachineInfo, ParEnvJobResource)
"""
'http://www.loni.ucla.edu/twiki/bin/view/Infrastructure/GridComputing?skin=plain':
Jobs Status:
'qw' - Queued and waiting,
'w' - Job waiting,
's' - Job suspended,
't' - Job transferring and about to start,
'r' - Job running,
'h' - Job hold,
'R' - Job restarted,
'd' - Job has been marked for deletion,
'Eqw' - An error occurred with the job.
'http://confluence.rcs.griffith.edu.au:8080/display/v20zCluster/
Sun+Grid+Engine+SGE+state+letter+symbol+codes+meanings':
Category State SGE Letter Code
Pending: pending qw
Pending: pending, user hold qw
Pending: pending, system hold hqw
Pending: pending, user and system hold hqw
Pending: pending, user hold, re-queue hRwq
Pending: pending, system hold, re-queue hRwq
Pending: pending, user and system hold, re-queue hRwq
Pending: pending, user hold qw
Pending: pending, user hold qw
Running running r
Running transferring t
Running running, re-submit Rr
Running transferring, re-submit Rt
Suspended job suspended s, ts
Suspended queue suspended S, tS
Suspended queue suspended by alarm T, tT
Suspended all suspended with re-submit Rs, Rts, RS, RtS, RT, RtT
Error all pending states with error Eqw, Ehqw, EhRqw
Deleted all running and suspended states with deletion dr, dt, dRr, dRt,
ds, dS, dT, dRs,
dRS, dRT
"""
_map_status_sge = {
'qw' : job_states.QUEUED,
'w' : job_states.QUEUED,
'hqw' : job_states.QUEUED_HELD,
'hRwq' : job_states.QUEUED_HELD,
'r' : job_states.RUNNING,
't' : job_states.RUNNING,
'R' : job_states.RUNNING,
'Rr' : job_states.RUNNING,
'Rt' : job_states.RUNNING,
's' : job_states.SUSPENDED,
'st' : job_states.SUSPENDED,
'Rs' : job_states.SUSPENDED,
'Rts' : job_states.SUSPENDED,
'dr' : job_states.UNDETERMINED,
'dt' : job_states.UNDETERMINED,
'ds' : job_states.UNDETERMINED,
'dRr' : job_states.UNDETERMINED,
'dRt' : job_states.UNDETERMINED,
'dRs' : job_states.UNDETERMINED,
'Eqw' : job_states.UNDETERMINED,
'Ehqw' : job_states.UNDETERMINED,
'EhRqw' : job_states.UNDETERMINED
}
[docs]class SgeJobResource(ParEnvJobResource):
pass
[docs]class SgeScheduler(aiida.scheduler.Scheduler):
"""
Support for the Sun Grid Engine scheduler and its variants/forks (Son of Grid Engine, Oracle Grid Engine, ...)
"""
_logger = aiida.scheduler.Scheduler._logger.getChild('sge')
# For SGE, we can have a good qstat xml output by querying by
# user, but not by job id
_features = {
'can_query_by_user': True,
}
# The class to be used for the job resource.
_job_resource_class = SgeJobResource
[docs] def _get_joblist_command(self,jobs=None,user=None):
"""
The command to report full information on existing jobs.
TODO: in the case of job arrays, decide what to do (i.e., if we want
to pass the -t options to list each subjob).
!!!ALL COPIED FROM PBSPRO!!!
TODO: understand if it is worth escaping the username,
or rather leave it unescaped to allow to pass $USER
"""
from aiida.common.exceptions import FeatureNotAvailable
if jobs:
raise FeatureNotAvailable("Cannot query by jobid in SGE")
command = "qstat -ext -urg -xml "
if user:
command += "-u {}".format(str(user))
else:
# All users if no user is specified
command += "-u '*'"
self.logger.debug("qstat command: {}".format(command))
return command
#raise NotImplementedError
[docs] def _get_detailed_jobinfo_command(self,jobid):
command = "qacct -j {}".format(escape_for_bash(jobid))
return command
[docs] def _get_submit_command(self, submit_script):
"""
Return the string to execute to submit a given script.
Args:
submit_script: the path of the submit script relative to the working
directory.
IMPORTANT: submit_script should be already escaped.
"""
submit_command = 'qsub -terse {}'.format(submit_script)
self.logger.info("submitting with: " + submit_command)
return submit_command
[docs] def _parse_joblist_output(self, retval, stdout, stderr):
import xml.dom.minidom
if retval != 0:
self.logger.error("Error in _parse_joblist_output: retval={}; "
"stdout={}; stderr={}".format(retval, stdout, stderr))
raise SchedulerError("Error during joblist retrieval, retval={}".\
format(retval))
if stderr.strip():
self.logger.warning("in _parse_joblist_output for {}: "
"there was some text in stderr: {}".format(
str(self.transport),stderr))
if stdout:
try:
xmldata = xml.dom.minidom.parseString(stdout)
except xml.parsers.expat.ExpatError:
self.logger.error("in sge._parse_joblist_output: "
"xml parsing of stdout failed:"
"{}".format(stdout))
raise SchedulerParsingError("Error during joblist retrieval,"
"xml parsing of stdout failed")
else:
self.logger.error("Error in sge._parse_joblist_output: retval={}; "
"stdout={}; stderr={}".format(retval, stdout, stderr))
raise SchedulerError("Error during joblist retrieval,"
"no stdout produced")
try:
first_child = xmldata.firstChild
second_childs = first_child.childNodes
tag_names_sec = [elem.tagName for elem in second_childs \
if elem.nodeType == 1]
if not 'queue_info' in tag_names_sec:
self.logger.error("Error in sge._parse_joblist_output: "
"no queue_info: {}".\
format(stdout))
raise SchedulerError
if not 'job_info' in tag_names_sec:
self.logger.error("Error in sge._parse_joblist_output: "
"no job_info: {}".\
format(stdout))
raise SchedulerError
except SchedulerError:
self.logger.error("Error in sge._parse_joblist_output: stdout={}"\
.format(stdout))
raise SchedulerError("Error during xml processing, of stdout:"
"There is no 'job_info' or no 'queue_info'"
"element or there are no jobs!")
#If something weird happens while firstChild, pop, etc:
except Exception:
self.logger.error("Error in sge._parse_joblist_output: stdout={}"\
.format(stdout))
raise SchedulerError("Error during xml processing, of stdout")
jobs = [i for i in first_child.getElementsByTagName('job_list')]
#jobs = [i for i in jobinfo.getElementsByTagName('job_list')]
#print [i[0].childNodes[0].data for i in job_numbers if i]
joblist = []
for job in jobs:
this_job = JobInfo()
#In case the user needs more information the xml-data for
#each job is stored:
this_job.raw_data = job.toxml()
try:
job_element = job.getElementsByTagName('JB_job_number').pop(0)
element_child = job_element.childNodes.pop(0)
this_job.job_id = str(element_child.data).strip()
if not this_job.job_id:
raise SchedulerError
except SchedulerError:
self.logger.error("Error in sge._parse_joblist_output:"
"no job id is given, stdout={}"\
.format(stdout))
raise SchedulerError("Error in sge._parse_joblist_output:"
"no job id is given")
except IndexError:
self.logger.error("No 'job_number' given for job index {} in "
"job list, stdout={}".format(jobs.index(job)\
,stdout))
raise IndexError("Error in sge._parse_joblist_output:"
"no job id is given")
try:
job_element = job.getElementsByTagName('state').pop(0)
element_child = job_element.childNodes.pop(0)
job_state_string = str(element_child.data).strip()
try:
this_job.job_state = _map_status_sge[job_state_string]
except KeyError:
self.logger.warning("Unrecognized job_state '{}' for job "
"id {}".format(job_state_string,
this_job.job_id))
this_job.job_state = job_states.UNDETERMINED
except IndexError:
self.logger.warning("No 'job_state' field for job id {} in"
"stdout={}".format(this_job.job_id,stdout))
this_job.job_state = job_states.UNDETERMINED
try:
job_element = job.getElementsByTagName('JB_owner').pop(0)
element_child = job_element.childNodes.pop(0)
this_job.job_owner = str(element_child.data).strip()
except IndexError:
self.logger.warning("No 'job_owner' field for job "
"id {}".format(this_job.job_id))
try:
job_element = job.getElementsByTagName('JB_name').pop(0)
element_child = job_element.childNodes.pop(0)
this_job.title = str(element_child.data).strip()
except IndexError:
self.logger.warning("No 'title' field for job "
"id {}".format(this_job.job_id))
try:
job_element = job.getElementsByTagName('queue_name').pop(0)
element_child = job_element.childNodes.pop(0)
this_job.queue_name = str(element_child.data).strip()
except IndexError:
if this_job.job_state == job_states.RUNNING:
self.logger.warning("No 'queue_name' field for job "
"id {}".format(this_job.job_id))
try:
job_element = job.getElementsByTagName('JB_submission_time').pop(0)
element_child = job_element.childNodes.pop(0)
time_string = str(element_child.data).strip()
try:
this_job.submission_time = self._parse_time_string(
time_string)
except ValueError:
self.logger.warning("Error parsing 'JB_submission_time' "
"for job id {} ('{}')".format(this_job.job_id,
time_string))
except IndexError:
try:
job_element = job.getElementsByTagName('JAT_start_time').pop(0)
element_child = job_element.childNodes.pop(0)
time_string = str(element_child.data).strip()
try:
this_job.dispatch_time = self._parse_time_string(
time_string)
except ValueError:
self.logger.warning("Error parsing 'JAT_start_time'"
"for job id {} ('{}')".format(this_job.job_id,
time_string))
except IndexError:
self.logger.warning("No 'JB_submission_time' and no "
"'JAT_start_time' field for job "
"id {}".format(this_job.job_id))
#There is also cpu_usage, mem_usage, io_usage information available:
if this_job.job_state == job_states.RUNNING:
try:
job_element = job.getElementsByTagName('slots').pop(0)
element_child = job_element.childNodes.pop(0)
this_job.num_mpiprocs = str(element_child.data).strip()
except IndexError:
self.logger.warning("No 'slots' field for job "
"id {}".format(this_job.job_id))
joblist.append(this_job)
#self.logger.debug("joblist final: {}".format(joblist))
return joblist
[docs] def _parse_submit_output(self, retval, stdout, stderr):
"""
Parse the output of the submit command, as returned by executing the
command returned by _get_submit_command command.
To be implemented by the plugin.
Return a string with the JobID.
"""
import re
if retval != 0:
self.logger.error("Error in _parse_submit_output: retval={}; "
"stdout={}; stderr={}".format(retval, stdout, stderr))
raise SchedulerError("Error during submission, retval={}\n"
"stdout={}\nstderr={}".format(
retval, stdout, stderr))
if stderr.strip():
self.logger.warning("in _parse_submit_output for {}: "
"there was some text in stderr: {}".format(
str(self.transport),stderr))
return stdout.strip()
[docs] def _parse_time_string(self,string,fmt='%Y-%m-%dT%H:%M:%S'):
"""
Parse a time string in the format returned from qstat -xml -ext and
returns a datetime object.
Example format: 2013-06-13T11:53:11
"""
import time, datetime
try:
time_struct = time.strptime(string,fmt)
except Exception as e:
self.logger.debug("Unable to parse time string {}, the message "
"was {}".format(string, e.message))
raise ValueError("Problem parsing the time string.")
# I convert from a time_struct to a datetime object going through
# the seconds since epoch, as suggested on stackoverflow:
# http://stackoverflow.com/questions/1697815
return datetime.datetime.fromtimestamp(time.mktime(time_struct))
[docs] def _get_kill_command(self, jobid):
"""
Return the command to kill the job with specified jobid.
"""
submit_command = 'qdel {}'.format(jobid)
self.logger.info("killing job {}".format(jobid))
return submit_command
[docs] def _parse_kill_output(self, retval, stdout, stderr):
"""
Parse the output of the kill command.
To be implemented by the plugin.
:return: True if everything seems ok, False otherwise.
"""
if retval != 0:
self.logger.error("Error in _parse_kill_output: retval={}; "
"stdout={}; stderr={}".format(retval, stdout, stderr))
return False
if stderr.strip():
self.logger.warning("in _parse_kill_output for {}: "
"there was some text in stderr: {}".format(
str(self.transport), stderr))
if stdout.strip():
self.logger.info("in _parse_kill_output for {}: "
"there was some text in stdout: {}".format(
str(self.transport), stdout))
return True