###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida-core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
"""`verdi process` command."""
import click
from aiida.cmdline.commands.cmd_verdi import verdi
from aiida.cmdline.params import arguments, options, types
from aiida.cmdline.utils import decorators, echo
from aiida.common.log import LOG_LEVELS, capture_logging
REPAIR_INSTRUCTIONS = """\
If one ore more processes are unreachable, you can run the following commands to try and repair them:
verdi daemon stop
verdi process repair
verdi daemon start
"""
[docs]
def valid_projections():
"""Return list of valid projections for the ``--project`` option of ``verdi process list``.
This indirection is necessary to prevent loading the imported module which slows down tab-completion.
"""
from aiida.tools.query.calculation import CalculationQueryBuilder
return CalculationQueryBuilder.valid_projections
[docs]
def default_projections():
"""Return list of default projections for the ``--project`` option of ``verdi process list``.
This indirection is necessary to prevent loading the imported module which slows down tab-completion.
"""
from aiida.tools.query.calculation import CalculationQueryBuilder
return CalculationQueryBuilder.default_projections
[docs]
def get_most_recent_node():
"""Return the most recent process node.
:returns: The ``ProcessNode`` with the latest ``ctime``.
"""
from aiida.orm import ProcessNode, QueryBuilder
process = QueryBuilder().append(ProcessNode, tag='n').order_by({'n': {'ctime': 'desc'}}).first(flat=True)
echo.echo_info(f'Most recent node matched: `{process}`.')
return process
@verdi.group('process')
def verdi_process():
"""Inspect and manage processes."""
@verdi_process.command('list')
@options.PROJECT(type=types.LazyChoice(valid_projections), default=lambda: default_projections())
@options.ORDER_BY()
@options.ORDER_DIRECTION()
@options.GROUP(help='Only include entries that are a member of this group.')
@options.ALL(help='Show all entries, regardless of their process state.')
@options.PROCESS_STATE()
@options.PROCESS_LABEL()
@options.PAUSED()
@options.EXIT_STATUS()
@options.FAILED()
@options.PAST_DAYS()
@options.LIMIT()
@options.RAW()
@click.pass_context
@decorators.with_dbenv()
def process_list(
ctx,
all_entries,
group,
process_state,
process_label,
paused,
exit_status,
failed,
past_days,
limit,
project,
raw,
order_by,
order_dir,
):
"""Show a list of running or terminated processes.
By default, only those that are still running are shown, but there are options to show also the finished ones.
"""
from tabulate import tabulate
from aiida.cmdline.commands.cmd_daemon import execute_client_command
from aiida.cmdline.utils.common import print_last_process_state_change
from aiida.common.exceptions import ConfigurationError
from aiida.engine.daemon.client import get_daemon_client
from aiida.orm import ProcessNode, QueryBuilder
from aiida.tools.query.calculation import CalculationQueryBuilder
relationships = {}
if group:
relationships['with_node'] = group
builder = CalculationQueryBuilder()
filters = builder.get_filters(all_entries, process_state, process_label, paused, exit_status, failed)
query_set = builder.get_query_set(
relationships=relationships, filters=filters, order_by={order_by: order_dir}, past_days=past_days, limit=limit
)
projected = builder.get_projected(query_set, projections=project)
headers = projected.pop(0)
if raw:
tabulated = tabulate(projected, tablefmt='plain')
echo.echo(tabulated)
return
tabulated = tabulate(projected, headers=headers)
echo.echo(tabulated)
echo.echo(f'\nTotal results: {len(projected)}\n')
if 'cached' in project:
echo.echo_report('\u267b Processes marked with check-mark were not run but taken from the cache.')
echo.echo_report('Add the option `-P pk cached_from` to the command to display cache source.')
print_last_process_state_change()
try:
client = get_daemon_client()
except ConfigurationError:
echo.echo_warning('This profile does not have a broker and so it has no daemon.')
return
if not client.is_daemon_running:
echo.echo_warning('The daemon is not running', bold=True)
return
echo.echo_report('Checking daemon load... ', nl=False)
response = execute_client_command('get_numprocesses')
if not response:
# Daemon could not be reached
return
try:
active_workers = response['numprocesses']
except KeyError:
echo.echo_report('No active daemon workers.')
else:
# Second query to get active process count. Currently this is slow but will be fixed with issue #2770. It is
# placed at the end of the command so that the user can Ctrl+C after getting the process table.
slots_per_worker = ctx.obj.config.get_option('daemon.worker_process_slots', scope=ctx.obj.profile.name)
active_processes = (
QueryBuilder()
.append(ProcessNode, filters={'attributes.process_state': {'in': ('created', 'waiting', 'running')}})
.count()
)
available_slots = active_workers * slots_per_worker
if active_workers == 0:
echo.echo_warning('The daemon has no active workers!')
echo.echo_warning('Increase the number of workers with `verdi daemon incr`.')
return
percent_load = active_processes / available_slots
if percent_load > 0.9: # 90%
echo.echo_warning(f'{percent_load * 100:.0f}% of the available daemon worker slots have been used!')
echo.echo_warning('Increase the number of workers with `verdi daemon incr`.')
else:
echo.echo_report(f'Using {percent_load * 100:.0f}% of the available daemon worker slots.')
@verdi_process.command('show')
@arguments.PROCESSES()
@options.MOST_RECENT_NODE()
@decorators.with_dbenv()
def process_show(processes, most_recent_node):
"""Show details for one or multiple processes."""
from aiida.cmdline.utils.common import get_node_info
if processes and most_recent_node:
raise click.BadOptionUsage(
'most_recent_node',
'cannot specify individual processes and the `-M/--most-recent-node` flag at the same time.',
)
if most_recent_node:
processes = [get_most_recent_node()]
for process in processes:
echo.echo(get_node_info(process))
@verdi_process.command('call-root')
@arguments.PROCESSES()
@decorators.with_dbenv()
def process_call_root(processes):
"""Show root process of the call stack for the given processes."""
for process in processes:
caller = process.caller
if caller is None:
echo.echo(f'No callers found for Process<{process.pk}>')
continue
while True:
next_caller = caller.caller
if next_caller is None:
break
caller = next_caller
echo.echo(f'{caller.pk}')
@verdi_process.command('report')
@arguments.PROCESSES()
@options.MOST_RECENT_NODE()
@click.option('-i', '--indent-size', type=int, default=2, help='Set the number of spaces to indent each level by.')
@click.option(
'-l',
'--levelname',
type=click.Choice(list(LOG_LEVELS)),
default='REPORT',
help='Filter the results by name of the log level.',
)
@click.option(
'-m', '--max-depth', 'max_depth', type=int, default=None, help='Limit the number of levels to be printed.'
)
@decorators.with_dbenv()
def process_report(processes, most_recent_node, levelname, indent_size, max_depth):
"""Show the log report for one or multiple processes."""
from aiida.cmdline.utils.common import get_calcjob_report, get_process_function_report, get_workchain_report
from aiida.orm import CalcFunctionNode, CalcJobNode, WorkChainNode, WorkFunctionNode
if processes and most_recent_node:
raise click.BadOptionUsage(
'most_recent_node',
'cannot specify individual processes and the `-M/--most-recent-node` flag at the same time.',
)
if most_recent_node:
processes = [get_most_recent_node()]
for process in processes:
if isinstance(process, CalcJobNode):
echo.echo(get_calcjob_report(process))
elif isinstance(process, WorkChainNode):
echo.echo(get_workchain_report(process, levelname, indent_size, max_depth))
elif isinstance(process, (CalcFunctionNode, WorkFunctionNode)):
echo.echo(get_process_function_report(process))
else:
echo.echo(f'Nothing to show for node type {process.__class__}')
@verdi_process.command('status')
@options.MOST_RECENT_NODE()
@click.option('-c', '--call-link-label', 'call_link_label', is_flag=True, help='Include the call link label if set.')
@click.option(
'-m', '--max-depth', 'max_depth', type=int, default=None, help='Limit the number of levels to be printed.'
)
@arguments.PROCESSES()
def process_status(call_link_label, most_recent_node, max_depth, processes):
"""Print the status of one or multiple processes."""
from aiida.cmdline.utils.ascii_vis import format_call_graph
if processes and most_recent_node:
raise click.BadOptionUsage(
'most_recent_node',
'cannot specify individual processes and the `-M/--most-recent-node` flag at the same time.',
)
if most_recent_node:
processes = [get_most_recent_node()]
for process in processes:
graph = format_call_graph(process, max_depth=max_depth, call_link_label=call_link_label)
echo.echo(graph)
@verdi_process.command('kill')
@arguments.PROCESSES()
@options.ALL(help='Kill all processes if no specific processes are specified.')
@options.TIMEOUT()
@options.WAIT()
@decorators.with_dbenv()
def process_kill(processes, all_entries, timeout, wait):
"""Kill running processes."""
from aiida.engine.processes import control
if processes and all_entries:
raise click.BadOptionUsage('all', 'cannot specify individual processes and the `--all` flag at the same time.')
if all_entries:
click.confirm('Are you sure you want to kill all processes?', abort=True)
with capture_logging() as stream:
try:
message = 'Killed through `verdi process kill`'
control.kill_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait, message=message)
except control.ProcessTimeoutException as exception:
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
if 'unreachable' in stream.getvalue():
echo.echo_report(REPAIR_INSTRUCTIONS)
@verdi_process.command('pause')
@arguments.PROCESSES()
@options.ALL(help='Pause all active processes if no specific processes are specified.')
@options.TIMEOUT()
@options.WAIT()
@decorators.with_dbenv()
def process_pause(processes, all_entries, timeout, wait):
"""Pause running processes."""
from aiida.engine.processes import control
if processes and all_entries:
raise click.BadOptionUsage('all', 'cannot specify individual processes and the `--all` flag at the same time.')
with capture_logging() as stream:
try:
message = 'Paused through `verdi process pause`'
control.pause_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait, message=message)
except control.ProcessTimeoutException as exception:
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
if 'unreachable' in stream.getvalue():
echo.echo_report(REPAIR_INSTRUCTIONS)
@verdi_process.command('play')
@arguments.PROCESSES()
@options.ALL(help='Play all paused processes if no specific processes are specified.')
@options.TIMEOUT()
@options.WAIT()
@decorators.with_dbenv()
def process_play(processes, all_entries, timeout, wait):
"""Play (unpause) paused processes."""
from aiida.engine.processes import control
if processes and all_entries:
raise click.BadOptionUsage('all', 'cannot specify individual processes and the `--all` flag at the same time.')
with capture_logging() as stream:
try:
control.play_processes(processes, all_entries=all_entries, timeout=timeout, wait=wait)
except control.ProcessTimeoutException as exception:
echo.echo_critical(f'{exception}\n{REPAIR_INSTRUCTIONS}')
if 'unreachable' in stream.getvalue():
echo.echo_report(REPAIR_INSTRUCTIONS)
@verdi_process.command('watch')
@arguments.PROCESSES()
@decorators.with_dbenv()
@decorators.with_broker
@decorators.only_if_daemon_running(echo.echo_warning, 'daemon is not running, so process may not be reachable')
def process_watch(broker, processes):
"""Watch the state transitions for a process."""
from time import sleep
from kiwipy import BroadcastFilter
def _print(communicator, body, sender, subject, correlation_id):
"""Format the incoming broadcast data into a message and echo it to stdout."""
if body is None:
body = 'No message specified'
if correlation_id is None:
correlation_id = '--'
echo.echo(f'Process<{sender}> [{subject}|{correlation_id}]: {body}')
communicator = broker.get_communicator()
echo.echo_report('watching for broadcasted messages, press CTRL+C to stop...')
for process in processes:
if process.is_terminated:
echo.echo_error(f'Process<{process.pk}> is already terminated')
continue
communicator.add_broadcast_subscriber(BroadcastFilter(_print, sender=process.pk))
try:
# Block this thread indefinitely until interrupt
while True:
sleep(2)
except (SystemExit, KeyboardInterrupt):
echo.echo('') # add a new line after the interrupt character
echo.echo_report('received interrupt, exiting...')
try:
communicator.close()
except RuntimeError:
pass
# Reraise to trigger clicks builtin abort sequence
raise
@verdi_process.command('repair')
@options.DRY_RUN()
@decorators.only_if_daemon_not_running()
@decorators.with_manager
@decorators.with_broker
def process_repair(manager, broker, dry_run):
"""Automatically repair all stuck processes.
N.B.: This command requires the daemon to be stopped.
This command queries the database to find all "active" processes, meaning those that haven't yet reached a terminal
state, and cross-references them with the active process tasks in the process queue of RabbitMQ. Any active process
that does not have a corresponding process task can be considered a zombie, as it will never be picked up by a
daemon worker to complete it and will effectively be "stuck". Any process task that does not correspond to an active
process is useless and should be discarded. Finally, duplicate process tasks are also problematic and are discarded.
"""
from aiida.engine.processes.control import get_active_processes, get_process_tasks, iterate_process_tasks
active_processes = get_active_processes(project='id')
process_tasks = get_process_tasks(broker)
set_active_processes = set(active_processes)
set_process_tasks = set(process_tasks)
echo.echo_info(f'Active processes: {active_processes}')
echo.echo_info(f'Process tasks: {process_tasks}')
state_inconsistent = False
if len(process_tasks) != len(set_process_tasks):
state_inconsistent = True
echo.echo_warning('There are duplicates process tasks: ', nl=False)
echo.echo(set(x for x in process_tasks if process_tasks.count(x) > 1))
if set_process_tasks.difference(set_active_processes):
state_inconsistent = True
echo.echo_warning('There are process tasks for terminated processes: ', nl=False)
echo.echo(set_process_tasks.difference(set_active_processes))
if set_active_processes.difference(set_process_tasks):
state_inconsistent = True
echo.echo_warning('There are active processes without process task: ', nl=False)
echo.echo(set_active_processes.difference(set_process_tasks))
if not state_inconsistent:
echo.echo_success('No inconsistencies detected between database and RabbitMQ.')
return
echo.echo_warning('Inconsistencies detected between database and RabbitMQ.')
if dry_run:
echo.echo_critical('This was a dry-run, no changes will be made.')
# At this point we have either exited because of inconsistencies and ``--dry-run`` was passed, or we returned
# because there were no inconsistencies, so all that is left is to address inconsistencies
echo.echo_report('Attempting to fix inconsistencies')
# Eliminate duplicate tasks and tasks that correspond to terminated process
for task in iterate_process_tasks(broker):
pid = task.body.get('args', {}).get('pid', None)
if pid not in set_active_processes:
with task.processing() as outcome:
outcome.set_result(False)
echo.echo_report(f'Acknowledged task `{pid}`')
# Revive zombie processes that no longer have a process task
process_controller = manager.get_process_controller()
for pid in set_active_processes:
if pid not in set_process_tasks:
process_controller.continue_process(pid)
echo.echo_report(f'Revived process `{pid}`')