# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
import os
import sys
from aiida.backends.utils import load_dbenv, is_dbenv_loaded
from aiida.cmdline import delayed_load_node as load_node
from aiida.cmdline.baseclass import VerdiCommandWithSubcommands
[docs]class Calculation(VerdiCommandWithSubcommands):
"""
Query and interact with calculations
Different subcommands allow to list the running calculations, show the
content of the input/output files, see the logs, etc.
"""
[docs] def __init__(self):
"""
A dictionary with valid commands and functions to be called:
list.
"""
from aiida.cmdline.commands.node import _Label, _Description
labeler = _Label('calculation')
descriptioner = _Description('calculation')
self.valid_subcommands = {
'gotocomputer': (self.calculation_gotocomputer, self.complete_none),
'list': (self.calculation_list, self.complete_none),
'logshow': (self.calculation_logshow, self.complete_none),
'kill': (self.calculation_kill, self.complete_none),
'inputls': (self.calculation_inputls, self.complete_none),
'outputls': (self.calculation_outputls, self.complete_none),
'inputcat': (self.calculation_inputcat, self.complete_none),
'outputcat': (self.calculation_outputcat, self.complete_none),
'res': (self.calculation_res, self.complete_none),
'show': (self.calculation_show, self.complete_none),
'plugins': (self.calculation_plugins, self.complete_plugins),
'cleanworkdir': (self.calculation_cleanworkdir, self.complete_none),
'label': (labeler.run, self.complete_none),
'description': (descriptioner.run, self.complete_none),
}
[docs] def complete_plugins(self, subargs_idx, subargs):
"""
Return the list of plugins of the JobCalculation subclass of Calculation
"""
if not is_dbenv_loaded():
load_dbenv()
from aiida.common.pluginloader import all_plugins
plugins = sorted(all_plugins('calculations'))
# Do not return plugins that are already on the command line
other_subargs = subargs[:subargs_idx] + subargs[subargs_idx + 1:]
return_plugins = [_ for _ in plugins if _ not in other_subargs]
# print >> sys.stderr, "*", subargs
return "\n".join(return_plugins)
[docs] def calculation_gotocomputer(self, *args):
"""
Open a shell to the calc folder on the cluster
This command opens a ssh connection to the scratch folder on the remote
computer on which the calculation is being/has been executed.
"""
from aiida.common.exceptions import NotExistent
if not is_dbenv_loaded():
load_dbenv()
try:
calc_id = args[0]
except IndexError:
print >> sys.stderr, "Pass as further argument a calculation ID or UUID."
sys.exit(1)
try:
pk = int(calc_id)
is_pk = True
except ValueError:
uuid = calc_id
is_pk = False
print "Loading environment..."
from aiida.orm import JobCalculation
try:
if is_pk:
calc = load_node(pk)
else:
calc = load_node(uuid)
except NotExistent:
print >> sys.stderr, "No node exists with ID={}.".format(calc_id)
sys.exit(1)
if not isinstance(calc, JobCalculation):
print >> sys.stderr, "Node with ID={} is not a calculation; it is a {}".format(
calc_id, calc.__class__.__name__)
sys.exit(1)
# get the transport
try:
t = calc._get_transport()
except NotExistent as e:
print >> sys.stderr, e.message
sys.exit(1)
# get the remote directory
remotedir = calc._get_remote_workdir()
if not remotedir:
print >> sys.stderr, "No remote work directory is set for this calculation!"
print >> sys.stderr, "(It is possible that the daemon did not submit the calculation yet)"
sys.exit(1)
# get the command to run (does not require to open the connection!)
cmd_to_run = t.gotocomputer_command(remotedir)
# Connect (execute command)
print "Going to the remote folder..."
# print cmd_to_run
os.system(cmd_to_run)
[docs] def calculation_list(self, *args):
"""
Return a list of calculations on screen.
"""
if not is_dbenv_loaded():
load_dbenv()
from aiida.common.datastructures import calc_states
import argparse
from aiida.orm.calculation.job import JobCalculation as C
from aiida.common.setup import get_property
parser = argparse.ArgumentParser(
prog=self.get_full_command_name(),
description='List AiiDA calculations.')
# The default states are those that are shown if no option is given
parser.add_argument('-s', '--states', nargs='+', type=str,
help="show only the AiiDA calculations with given state",
default=[calc_states.WITHSCHEDULER,
calc_states.NEW,
calc_states.TOSUBMIT,
calc_states.SUBMITTING,
calc_states.COMPUTED,
calc_states.RETRIEVING,
calc_states.PARSING,
])
parser.add_argument('-p', '--past-days', metavar='N',
help="add a filter to show only calculations created in the past N days",
action='store', type=int)
parser.add_argument('-g', '--group', '--group-name',
metavar='GROUPNAME',
help="add a filter to show only calculations within a given group",
action='store', type=str)
parser.add_argument('-G', '--group-pk', metavar='GROUPPK',
help="add a filter to show only calculations within a given group",
action='store', type=int)
parser.add_argument('pks', type=int, nargs='*',
help="a list of calculations to show. If empty, all running calculations are shown. If non-empty, ignores the -p and -r options.")
parser.add_argument('-a', '--all-states',
dest='all_states', action='store_true',
help="Overwrite manual set of states if present, and look for calculations in every possible state")
parser.set_defaults(all_states=False)
parser.add_argument('-A', '--all-users',
dest='all_users', action='store_true',
help="Show calculations for all users, rather than only for the current user")
parser.set_defaults(all_users=False)
parser.add_argument('-t', '--absolute-time',
dest='relative_ctime', action='store_false', default=True,
help="Print the absolute creation time, rather than the relative creation time")
parser.add_argument('-l', '--limit',
type=int, default=None,
help='set a limit to the number of rows returned')
parser.add_argument('-o', '--order-by',
choices=['id', 'ctime'],
default='ctime',
help='order the results')
parser.add_argument('--project',
choices=(
'pk', 'state', 'ctime', 'sched', 'computer',
'type', 'description', 'label', 'uuid',
'mtime', 'user'
),
nargs='+',
default=get_property("verdishell.calculation_list"),
#('pk', 'ctime', 'state', 'sched', 'computer', 'type', 'label'),
help="Define the list of properties to show"
)
args = list(args)
parsed_args = parser.parse_args(args)
capital_states = [i.upper() for i in parsed_args.states]
parsed_args.states = capital_states
if parsed_args.all_states:
parsed_args.states = None
C._list_calculations(
states=parsed_args.states,
past_days=parsed_args.past_days,
pks=parsed_args.pks,
all_users=parsed_args.all_users,
group=parsed_args.group,
group_pk=parsed_args.group_pk,
relative_ctime=parsed_args.relative_ctime,
# with_scheduler_state=parsed_args.with_scheduler_state,
order_by=parsed_args.order_by,
limit=parsed_args.limit,
projections=parsed_args.project,
)
[docs] def calculation_res(self, *args):
"""
Print all or somoe data from the "res" output node.
"""
from aiida.common.exceptions import NotExistent
from aiida.cmdline import print_dictionary
import argparse
parser = argparse.ArgumentParser(
prog=self.get_full_command_name(),
description='Show calculation results (from calc.res)')
parser.add_argument('PK', type=int, default=None,
help="PK of the calculation object whose results "
"must be shown.")
parser.add_argument('-f', '--format', type=str, default='json+date',
help="Format for the output.")
parser.add_argument('-k', '--keys', nargs='+', type=str,
help="Show only the selected keys.")
args = list(args)
parsed_args = parser.parse_args(args)
if not is_dbenv_loaded():
load_dbenv()
try:
calc = load_node(int(parsed_args.PK))
except ValueError:
print >> sys.stderr, "*** {}: Not a valid PK".format(parsed_args.PK)
sys.exit(1)
except NotExistent:
print >> sys.stderr, "*** {}: Not a valid calculation".format(
parsed_args.PK)
sys.exit(1)
full_dict = calc.res._get_dict()
if parsed_args.keys:
try:
the_dict = {k: full_dict[k] for k in parsed_args.keys}
except KeyError as e:
print >> sys.stderr, ("The key '{}' was not found in the .res "
"dictionary".format(e.message))
sys.exit(1)
else:
# Return all elements
the_dict = full_dict
print_dictionary(the_dict, format=parsed_args.format)
[docs] def calculation_show(self, *args):
from aiida.common.exceptions import NotExistent
from aiida.cmdline.common import print_node_info
if not is_dbenv_loaded():
load_dbenv()
table_headers = ['Link label', 'PK', 'Type']
for calc_pk in args:
try:
calc = load_node(int(calc_pk))
except ValueError:
print "*** {}: Not a valid PK".format(calc_pk)
continue
except NotExistent:
print "*** {}: Not a valid calculation".format(calc_pk)
continue
print_node_info(calc)
[docs] def calculation_logshow(self, *args):
if not is_dbenv_loaded():
load_dbenv()
from aiida.backends.utils import get_log_messages
from aiida.common.exceptions import NotExistent
from aiida.common.datastructures import calc_states
from aiida.orm.calculation.work import WorkCalculation
for calc_pk in args:
try:
calc = load_node(int(calc_pk))
except ValueError:
print "*** {}: Not a valid PK".format(calc_pk)
continue
except NotExistent:
print "*** {}: Not a valid calculation".format(calc_pk)
continue
if isinstance(calc, WorkCalculation):
print "*** {}: Is a WorkCalculation node. Use 'verdi work report' " \
"instead to show the log messages".format(calc_pk)
continue
log_messages = get_log_messages(calc)
label_string = " [{}]".format(calc.label) if calc.label else ""
state = calc.get_state()
if state == calc_states.WITHSCHEDULER:
sched_state = calc.get_scheduler_state()
if sched_state is None:
sched_state = "(unknown)"
state += ", scheduler state: {}".format(sched_state)
print "*** {}{}: {}".format(calc_pk, label_string, state)
sched_out = calc.get_scheduler_output()
sched_err = calc.get_scheduler_error()
if sched_out is None:
print "*** Scheduler output: N/A"
elif sched_out:
print "*** Scheduler output:"
print sched_out
else:
print "*** (empty scheduler output file)"
if sched_err is None:
print "*** Scheduler errors: N/A"
elif sched_err:
print "*** Scheduler errors:"
print sched_err
else:
print "*** (empty scheduler errors file)"
if log_messages:
print "*** {} LOG MESSAGES:".format(len(log_messages))
else:
print "*** 0 LOG MESSAGES"
for log in log_messages:
print "+-> {} at {}".format(log['levelname'], log['time'])
# Print the message, with a few spaces in front of each line
print "\n".join(["| {}".format(_)
for _ in log['message'].splitlines()])
[docs] def calculation_plugins(self, *args):
if not is_dbenv_loaded():
load_dbenv()
from aiida.orm import CalculationFactory
from aiida.orm.calculation.job import JobCalculation
from aiida.common.pluginloader import all_plugins
from aiida.common.exceptions import MissingPluginError
if args:
for arg in args:
try:
C = CalculationFactory(arg)
print "* {}".format(arg)
docstring = C.__doc__
if docstring is None:
docstring = "(No documentation available)"
docstring = docstring.strip()
print "\n".join([" {}".format(_.strip())
for _ in docstring.splitlines()])
print " Inputs:"
for key, val in C._use_methods.iteritems():
try:
print " {}: {}".format(key,
val['valid_types'].__name__)
except AttributeError:
print " {}: {}".format(key,
", ".join([i.__name__ for i in val['valid_types']]))
print(" Module location: {}".format(C.__module__))
except MissingPluginError:
print "! {}: NOT FOUND!".format(arg)
else:
plugins = sorted(all_plugins('calculations'))
if plugins:
print("## Pass as a further parameter one (or more) "
"plugin names to get more details on a given plugin.")
for plugin in plugins:
print "* {}".format(plugin)
else:
print "## No calculation plugins found"
[docs] def calculation_outputls(self, *args):
"""
Show the list of output files of a calculation node.
It lists the files in the 'path' subdirectory of the output node
of files retrieved by the parser. Therefore, this will not work
before files are retrieved by the daemon.
Use the -h option for more help on the command line options.
"""
import argparse
from aiida.common.exceptions import NotExistent
from aiida.cmdline.commands.node import list_repo_files
parser = argparse.ArgumentParser(
prog=self.get_full_command_name(),
description='List ourput files in the repository folder.')
parser.add_argument('calc', metavar='PK', type=int,
help='The pk of the calculation')
parser.add_argument('-p', '--path', type=str, default='', nargs='?',
help="The relative path of the file you "
"want to show. If not specified, show content"
" of all the 'path' directory")
parser.add_argument('-c', '--color', action='store_true',
help="Color folders with a different color")
args = list(args)
parsed_args = parser.parse_args(args)
if not is_dbenv_loaded():
load_dbenv()
try:
calc = load_node(parsed_args.calc)
except NotExistent as e:
print >> sys.stderr, e.message
sys.exit(1)
try:
parsed_node = calc.out.retrieved
except AttributeError:
print >> sys.stderr, ("No 'retrieved' node found. Have the "
"calculation files already been retrieved?")
sys.exit(1)
try:
list_repo_files(parsed_node,
os.path.join('path', parsed_args.path),
parsed_args.color)
except ValueError as e:
print >> sys.stderr, e.message
sys.exit(1)
[docs] def calculation_outputcat(self, *args):
"""
Show an output file of a calculation node.
It shows the files in the 'path' subdirectory of the output node
of files retrieved by the parser. Therefore, this will not work
before files are retrieved by the daemon.
Use the -h option for more help on the command line options.
"""
from aiida.cmdline.commands.node import cat_repo_files
from aiida.common.exceptions import NotExistent
import argparse
parser = argparse.ArgumentParser(
prog=self.get_full_command_name(),
description='Output the content of a file in the repository folder.')
parser.add_argument('calc', metavar='PK', type=int,
help='The pk of the calculation')
parser.add_argument('-p', '--path', type=str, default=None, nargs='?',
help="The relative path of the file you "
"want to show. Take the default output file if "
"it is not specified")
args = list(args)
parsed_args = parser.parse_args(args)
if not is_dbenv_loaded():
load_dbenv()
from aiida.common.old_pluginloader import get_class_typestring
try:
calc = load_node(parsed_args.calc)
except NotExistent as e:
print >> sys.stderr, e.message
sys.exit(1)
path = parsed_args.path
if path is None:
path = calc._DEFAULT_OUTPUT_FILE
if path is None:
base_class, plugin_string, class_name = get_class_typestring(
calc._plugin_type_string)
print >> sys.stderr, ("Calculation '{}' does not define a "
"default output file. Please specify a path "
"explicitly".format(plugin_string))
sys.exit(1)
try:
parsed_node = calc.out.retrieved
except AttributeError:
print >> sys.stderr, ("No 'retrieved' node found. Have the "
"calculation files already been retrieved?")
sys.exit(1)
try:
cat_repo_files(parsed_node, os.path.join('path', path))
except ValueError as e:
print >> sys.stderr, e.message
sys.exit(1)
except IOError as e:
import errno
# Ignore Broken pipe errors, re-raise everything else
if e.errno == errno.EPIPE:
pass
else:
raise
[docs] def calculation_kill(self, *args):
"""
Kill a calculation.
Pass a list of calculation PKs to kill them.
If you also pass the -f option, no confirmation will be asked.
"""
if not is_dbenv_loaded():
load_dbenv()
from aiida.cmdline import wait_for_confirmation
from aiida.orm.calculation.job import JobCalculation as Calc
from aiida.common.exceptions import NotExistent, InvalidOperation, \
RemoteOperationError
import argparse
parser = argparse.ArgumentParser(
prog=self.get_full_command_name(),
description='Kill AiiDA calculations.')
parser.add_argument('calcs', metavar='PK', type=int, nargs='+',
help='The principal key (PK) of the calculations to kill')
parser.add_argument('-f', '--force',
help='Force the kill of calculations',
action="store_true")
args = list(args)
parsed_args = parser.parse_args(args)
if not parsed_args.force:
sys.stderr.write(
"Are you sure to kill {} calculation{}? [Y/N] ".format(
len(parsed_args.calcs),
"" if len(parsed_args.calcs) == 1 else "s"))
if not wait_for_confirmation():
sys.exit(0)
counter = 0
for calc_pk in parsed_args.calcs:
try:
c = load_node(calc_pk, parent_class=Calc)
c.kill() # Calc.kill(calc_pk)
counter += 1
except NotExistent:
print >> sys.stderr, ("WARNING: calculation {} "
"does not exist.".format(calc_pk))
except (InvalidOperation, RemoteOperationError) as e:
print >> sys.stderr, (e.message)
print >> sys.stderr, "{} calculation{} killed.".format(counter,
"" if counter == 1 else "s")
[docs] def calculation_cleanworkdir(self, *args):
"""
Clean the working directory of calculations by removing all the content of the
associated RemoteFolder node. Calculations can be identified by pk with the -k flag
or by specifying limits on the modification times with -p/-o flags
"""
import argparse
parser = argparse.ArgumentParser(
prog=self.get_full_command_name(),
description="""
Clean all content of all output remote folders of calculations,
passed as a list of pks, or identified by modification time.
If a list of calculation PKs is not passed with the -k option, one or both
of the -p and -o options has to be specified. If both are specified, a logical
AND is done between the two, i.e. the calculations that will be cleaned have been
modified AFTER [-p option] days from now but BEFORE [-o option] days from now.
Passing the -f option will prevent the confirmation dialog from being prompted.
"""
)
parser.add_argument(
'-k', '--pk', metavar='PK', type=int, nargs='+', dest='pk',
help='The principal key (PK) of the calculations of which to clean the work directory'
)
parser.add_argument(
'-f', '--force', action='store_true',
help='Force the cleaning (no prompt)'
)
parser.add_argument(
'-p', '--past-days', metavar='N', type=int, action='store', dest='past_days',
help='Include calculations that have been modified within the last N days',
)
parser.add_argument(
'-o', '--older-than', metavar='N', type=int, action='store', dest='older_than',
help='Include calculations that have been modified more than N days ago',
)
parser.add_argument(
'-c', '--computers', metavar='label', nargs='+', type=str, action='store', dest='computer',
help='Include only calculations that were ran on these computers'
)
if not is_dbenv_loaded():
load_dbenv()
from aiida.backends.utils import get_automatic_user
from aiida.backends.utils import get_authinfo
from aiida.common.utils import query_yes_no
from aiida.orm.computer import Computer as OrmComputer
from aiida.orm.user import User as OrmUser
from aiida.orm.calculation import Calculation as OrmCalculation
from aiida.orm.querybuilder import QueryBuilder
from aiida.utils import timezone
import datetime
parsed_args = parser.parse_args(args)
# If a pk is given then the -o & -p options should not be specified
if parsed_args.pk is not None:
if (parsed_args.past_days is not None or parsed_args.older_than is not None):
print("You cannot specify both a list of calculation pks and the -p or -o options")
return
# If no pk is given then at least one of the -o & -p options should be specified
else:
if (parsed_args.past_days is None and parsed_args.older_than is None):
print("You should specify at least a list of calculations or the -p, -o options")
return
qb_user_filters = dict()
user = OrmUser(dbuser=get_automatic_user())
qb_user_filters["email"] = user.email
qb_computer_filters = dict()
if parsed_args.computer is not None:
qb_computer_filters["name"] = {"in": parsed_args.computer}
qb_calc_filters = dict()
if parsed_args.past_days is not None:
pd_ts = timezone.now() - datetime.timedelta(days=parsed_args.past_days)
qb_calc_filters["mtime"] = {">": pd_ts}
if parsed_args.older_than is not None:
ot_ts = timezone.now() - datetime.timedelta(days=parsed_args.older_than)
qb_calc_filters["mtime"] = {"<": ot_ts}
if parsed_args.pk is not None:
print("parsed_args.pk: ", parsed_args.pk)
qb_calc_filters["id"] = {"in": parsed_args.pk}
qb = QueryBuilder()
qb.append(OrmCalculation, tag="calc",
filters=qb_calc_filters,
project=["id", "uuid", "attributes.remote_workdir"])
qb.append(OrmComputer, computer_of="calc", tag="computer",
project=["*"],
filters=qb_computer_filters)
qb.append(OrmUser, creator_of="calc", tag="user",
project=["*"],
filters=qb_user_filters)
no_of_calcs = qb.count()
if no_of_calcs == 0:
print("No calculations found with the given criteria.")
return
print("Found {} calculations with the given criteria.".format(
no_of_calcs))
if not parsed_args.force:
if not query_yes_no("Are you sure you want to clean the work "
"directory?", "no"):
return
# get the uuids of all calculations matching the filters
calc_list_data = qb.dict()
# get all computers associated to the calc uuids above, and load them
# we group them by uuid to avoid computer duplicates
comp_uuid_to_computers = {_["computer"]["*"].uuid: _["computer"]["*"] for _ in calc_list_data}
# now build a dictionary with the info of folders to delete
remotes = {}
for computer in comp_uuid_to_computers.values():
# initialize a key of info for a given computer
remotes[computer.name] = {'transport': get_authinfo(
computer=computer, aiidauser=user._dbuser).get_transport(),
'computer': computer,
}
# select the calc pks done on this computer
this_calc_pks = [_["calc"]["id"] for _ in calc_list_data
if _["computer"]["*"].id == computer.id]
this_calc_uuids = [unicode(_["calc"]["uuid"])
for _ in calc_list_data
if _["computer"]["*"].id == computer.id]
remote_workdirs = [_["calc"]["attributes.remote_workdir"]
for _ in calc_list_data
if _["calc"]["id"] in this_calc_pks
if _["calc"]["attributes.remote_workdir"]
is not None]
remotes[computer.name]['remotes'] = remote_workdirs
remotes[computer.name]['uuids'] = this_calc_uuids
# now proceed to cleaning
for computer, dic in remotes.iteritems():
print("Cleaning the work directory on computer {}.".format(computer))
counter = 0
t = dic['transport']
with t:
remote_user = remote_user = t.whoami()
aiida_workdir = dic['computer'].get_workdir().format(
username=remote_user)
t.chdir(aiida_workdir)
# Hardcoding the sharding equal to 3 parts!
existing_folders = t.glob('*/*/*')
folders_to_delete = [i for i in existing_folders if
i.replace("/", "") in dic['uuids']]
for folder in folders_to_delete:
t.rmtree(folder)
counter += 1
if counter % 20 == 0 and counter > 0:
print("Deleted work directories: {}".format(counter))
print("{} remote folder(s) cleaned.".format(counter))