Source code for aiida.orm.implementation.django.workflow

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
import importlib
from collections import Mapping

from django.core.exceptions import ObjectDoesNotExist
from django.db.models import Q

from aiida.backends.djsite.utils import get_automatic_user
from aiida.common import aiidalogger
from aiida.common.datastructures import wf_states, wf_exit_call, calc_states
from aiida.common.exceptions import (InternalError, ModificationNotAllowed,
                                     NotExistent, ValidationError,
                                     AiidaException)
from aiida.common.folders import RepositoryFolder, SandboxFolder
from aiida.common.utils import md5_file, str_timedelta
from aiida.orm.implementation.django.calculation.job import JobCalculation
from aiida.orm.implementation.general.workflow import AbstractWorkflow
from aiida.utils import timezone


logger = aiidalogger.getChild('Workflow')


class Workflow(AbstractWorkflow):
    def __init__(self, **kwargs):
        """
        Initializes the Workflow superclass, stores the instance in the DB and,
        if given, stores the starting parameters.

        If initialized with a uuid, the Workflow is loaded from the DB; otherwise a new
        workflow is generated and added to the DB following the stack frames. This
        means that only modules inside aiida.workflows are allowed to implement the
        workflow super calls and be stored. The caller names, modules and files are
        retrieved from the stack.

        :param uuid: a string with the uuid of the object to be loaded.
        :param params: a dictionary of storable objects to initialize the specific workflow
        :raise: NotExistent: if there is no entry of the desired workflow kind with
                             the given uuid.
        """
        from aiida.backends.djsite.db.models import DbWorkflow

        self._to_be_stored = True

        self._logger = logger.getChild(self.__class__.__name__)

        uuid = kwargs.pop('uuid', None)

        if uuid is not None:
            self._to_be_stored = False
            if kwargs:
                raise ValueError("If you pass a UUID, you cannot pass any further parameter")

            try:
                self._dbworkflowinstance = DbWorkflow.objects.get(uuid=uuid)

                # self.logger.info("Workflow found in the database, now retrieved")
                self._repo_folder = RepositoryFolder(section=self._section_name, uuid=self.uuid)

            except ObjectDoesNotExist:
                raise NotExistent("No entry with the UUID {} found".format(uuid))

        else:
            # ATTENTION: Do not move this code outside or encapsulate it in a function
            import inspect

            stack = inspect.stack()

            # cur_fr  = inspect.currentframe()
            # call_fr = inspect.getouterframes(cur_fr, 2)

            # Get all the caller data
            caller_frame = stack[1][0]
            caller_file = stack[1][1]
            caller_funct = stack[1][3]

            caller_module = inspect.getmodule(caller_frame)
            caller_module_class = caller_frame.f_locals.get('self', None).__class__

            if caller_funct != "__init__":
                raise SystemError("A workflow must implement the __init__ class explicitly")

            # Test if the launcher is another workflow

            # print "caller_module", caller_module
            # print "caller_module_class", caller_module_class
            # print "caller_file", caller_file
            # print "caller_funct", caller_funct

            # Accept only the aiida.workflows packages
            if caller_module is None or not caller_module.__name__.startswith("aiida.workflows"):
                raise SystemError("The superclass can't be called directly")

            self.caller_module = caller_module.__name__
            self.caller_module_class = caller_module_class.__name__
            self.caller_file = caller_file
            self.caller_funct = caller_funct

            self._temp_folder = SandboxFolder()
            self.current_folder.insert_path(self.caller_file, self.caller_module_class)
            # self.store()

            # Test if there are parameters as input
            params = kwargs.pop('params', None)

            if params is not None:
                if isinstance(params, Mapping):
                    self.set_params(params)

            # This stores the MD5 as well, to test in case the workflow has
            # been modified after the launch
            self._dbworkflowinstance = DbWorkflow(user=get_automatic_user(),
                                                  module=self.caller_module,
                                                  module_class=self.caller_module_class,
                                                  script_path=self.caller_file,
                                                  script_md5=md5_file(self.caller_file))

        self.attach_calc_lazy_storage = {}
        self.attach_subwf_lazy_storage = {}

    @property
    def dbworkflowinstance(self):
        """
        Get the DbWorkflow object stored in the super class.

        :return: DbWorkflow object from the database
        """
        from aiida.backends.djsite.db.models import DbWorkflow

        if self._dbworkflowinstance.pk is None:
            return self._dbworkflowinstance
        else:
            self._dbworkflowinstance = DbWorkflow.objects.get(pk=self._dbworkflowinstance.pk)
            return self._dbworkflowinstance

    def _get_dbworkflowinstance(self):
        return self.dbworkflowinstance

    @property
    def label(self):
        """
        Get the label of the workflow.

        :return: a string
        """
        return self.dbworkflowinstance.label

    @label.setter
    def label(self, label):
        self._update_db_label_field(label)

    @property
    def ctime(self):
        return self.dbworkflowinstance.ctime

    def _update_db_label_field(self, field_value):
        """
        Safety method to store the label of the workflow in the DB.
        """
        from django.db import transaction

        self.dbworkflowinstance.label = field_value
        if not self._to_be_stored:
            with transaction.atomic():
                self._dbworkflowinstance.save()
                self._increment_version_number_db()

    @property
    def description(self):
        """
        Get the description of the workflow.

        :return: a string
        """
        return self.dbworkflowinstance.description

    @description.setter
    def description(self, desc):
        self._update_db_description_field(desc)

    def _update_db_description_field(self, field_value):
        """
        Safety method to store the description of the workflow in the DB.
        """
        from django.db import transaction

        self.dbworkflowinstance.description = field_value
        if not self._to_be_stored:
            with transaction.atomic():
                self._dbworkflowinstance.save()
                self._increment_version_number_db()

    def _increment_version_number_db(self):
        """
        This function increments the version number in the DB.
        This should be called every time you need to increment the version
        (e.g. on adding an extra or attribute).
        """
        from django.db.models import F
        from aiida.backends.djsite.db.models import DbWorkflow

        # I increment the node number using a filter (this should be the right way of doing it;
        # dbnode.nodeversion = F('nodeversion') + 1
        # will do weird stuff, returning Django Objects instead of numbers, and incrementing at
        # every save; moreover in this way I should do the right thing for concurrent writings
        # I use self._dbnode because this will not do a query to update the node; here I only
        # need to get its pk
        DbWorkflow.objects.filter(pk=self.pk).update(nodeversion=F('nodeversion') + 1)

        # This reloads internally the node of self._dbworkflowinstance
        _ = self.dbworkflowinstance
        # Note: I have to reload the object. I don't do it here because it is done at every call
        # to self.dbnode
        # self._dbnode = DbNode.objects.get(pk=self._dbnode.pk)

    @classmethod
    def query(cls, *args, **kwargs):
        """
        Map to the aiidaobjects manager of the DbWorkflow, that returns
        Workflow objects instead of DbWorkflow entities.
        """
        from aiida.backends.djsite.db.models import DbWorkflow
        return DbWorkflow.aiidaobjects.filter(*args, **kwargs)
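
    # Illustrative usage sketch (not part of the original module): since the
    # aiidaobjects manager returns Workflow instances, standard Django filter
    # keywords can be passed directly, e.g. (assuming a configured profile):
    #
    #     running_wfs = Workflow.query(state=wf_states.RUNNING,
    #                                  user=get_automatic_user())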

    # @property
    # def logger(self):
    #     """
    #     Get the logger of the Workflow object.
    #
    #     :return: Logger object
    #     """
    #     return self._logger

    @property
    def logger(self):
        """
        Get the logger of the Workflow object, so that it also logs to the DB.

        :return: LoggerAdapter object, that works like a logger, but also has
            the 'extra' embedded
        """
        import logging
        from aiida.common.log import get_dblogger_extra

        return logging.LoggerAdapter(logger=self._logger, extra=get_dblogger_extra(self))

    def store(self):
        """
        Stores the DbWorkflow object data in the database
        """
        if self._to_be_stored:
            self._dbworkflowinstance.save()

            if hasattr(self, '_params'):
                self.dbworkflowinstance.add_parameters(self._params, force=False)

            self._repo_folder = RepositoryFolder(section=self._section_name, uuid=self.uuid)
            self.repo_folder.replace_with_folder(self.get_temp_folder().abspath, move=True, overwrite=True)

            self._temp_folder = None
            self._to_be_stored = False

        # Important to allow to do w = WorkflowSubClass().store()
        return self

    @property
    def uuid(self):
        """
        Returns the DbWorkflow uuid
        """
        return self.dbworkflowinstance.uuid

    @property
    def pk(self):
        """
        Returns the DbWorkflow pk
        """
        return self.dbworkflowinstance.pk

    def info(self):
        """
        Returns an array with all the information about the module, file and class
        needed to locate the workflow source code, plus its creation time and state.
        """
        return [self.dbworkflowinstance.module,
                self.dbworkflowinstance.module_class,
                self.dbworkflowinstance.script_path,
                self.dbworkflowinstance.script_md5,
                self.dbworkflowinstance.ctime,
                self.dbworkflowinstance.state]

    def set_params(self, params, force=False):
        """
        Adds parameters to the Workflow that are both stored and used every time
        the workflow engine re-initializes the specific workflow to launch the new methods.
        """

        def par_validate(params):
            the_params = {}
            for k, v in params.iteritems():
                if any([isinstance(v, int), isinstance(v, bool),
                        isinstance(v, float), isinstance(v, str)]):
                    the_params[k] = v
                else:
                    raise ValidationError("Cannot store in the DB a parameter "
                                          "which is not of type int, bool, float or str.")
            return the_params

        if self._to_be_stored:
            self._params = params
        else:
            the_params = par_validate(params)
            self.dbworkflowinstance.add_parameters(the_params, force=force)
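
    # Illustrative usage sketch (not part of the original module); MyWorkflow is
    # a hypothetical subclass defined under aiida.workflows:
    #
    #     wf = MyWorkflow(params={'nsteps': 10, 'label': 'test'})
    #     wf.store()               # parameters are persisted together with the workflow
    #     wf.get_parameters()      # -> {'nsteps': 10, 'label': 'test'}
    #
    # Note that par_validate() above only accepts int, bool, float and str values
    # when parameters are set on an already-stored workflow.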

    def get_parameters(self):
        """
        Get the Workflow parameters

        :return: a dictionary of storable objects
        """
        if self._to_be_stored:
            return self._params
        else:
            return self.dbworkflowinstance.get_parameters()

    def get_parameter(self, _name):
        """
        Get one Workflow parameter

        :param _name: a string with the name of the parameter to retrieve
        :return: the value of the requested parameter
        """
        if self._to_be_stored:
            return self._params[_name]
        else:
            return self.dbworkflowinstance.get_parameter(_name)

    def get_attributes(self):
        """
        Get the Workflow attributes

        :return: a dictionary of storable objects
        """
        return self.dbworkflowinstance.get_attributes()

    def get_attribute(self, _name):
        """
        Get one Workflow attribute

        :param _name: a string with the name of the attribute to retrieve
        :return: the value of the requested attribute
        """
        return self.dbworkflowinstance.get_attribute(_name)

    def add_attributes(self, _params):
        """
        Add a set of attributes to the Workflow. If an attribute is already present
        with the same name it will be overwritten.

        :param _params: a dictionary mapping attribute names to storable values
        """
        if self._to_be_stored:
            raise ModificationNotAllowed("You cannot add attributes before storing")
        self.dbworkflowinstance.add_attributes(_params)

    def add_attribute(self, _name, _value):
        """
        Add one attribute to the Workflow. If an attribute is already present
        with the same name it will be overwritten.

        :param _name: a string with the attribute name to store
        :param _value: a storable object to store
        """
        if self._to_be_stored:
            raise ModificationNotAllowed("You cannot add attributes before storing")
        self.dbworkflowinstance.add_attribute(_name, _value)

    def get_results(self):
        """
        Get the Workflow results

        :return: a dictionary of storable objects
        """
        return self.dbworkflowinstance.get_results()

    def get_result(self, _name):
        """
        Get one Workflow result

        :param _name: a string with the name of the result to retrieve
        :return: the value of the requested result
        """
        return self.dbworkflowinstance.get_result(_name)

    def add_results(self, _params):
        """
        Add a set of results to the Workflow. If a result is already present
        with the same name it will be overwritten.

        :param _params: a dictionary mapping result names to storable values
        """
        self.dbworkflowinstance.add_results(_params)

    def add_result(self, _name, _value):
        """
        Add one result to the Workflow. If a result is already present
        with the same name it will be overwritten.

        :param _name: a string with the result name to store
        :param _value: a storable object to store
        """
        self.dbworkflowinstance.add_result(_name, _value)

    def get_state(self):
        """
        Get the Workflow's state

        :return: a state from wf_states in aiida.common.datastructures
        """
        return self.dbworkflowinstance.state

    def set_state(self, state):
        """
        Set the Workflow's state

        :param state: a state from wf_states in aiida.common.datastructures
        """
        self.dbworkflowinstance.set_state(state)

    def is_new(self):
        """
        Returns True if the Workflow's state is CREATED
        """
        return self.dbworkflowinstance.state == wf_states.CREATED

    def is_running(self):
        """
        Returns True if the Workflow's state is RUNNING
        """
        return self.dbworkflowinstance.state == wf_states.RUNNING

    def has_finished_ok(self):
        """
        Returns True if the Workflow's state is FINISHED or SLEEP
        """
        return self.dbworkflowinstance.state in [wf_states.FINISHED, wf_states.SLEEP]

    def has_failed(self):
        """
        Returns True if the Workflow's state is ERROR
        """
        return self.dbworkflowinstance.state == wf_states.ERROR

    def is_subworkflow(self):
        """
        Return True if this is a subworkflow (i.e., if it has a parent), False otherwise.
        """
        return self.dbworkflowinstance.is_subworkflow()

    def get_step(self, step_method):
        """
        Retrieves by name a step from the Workflow.

        :param step_method: a string with the name of the step to retrieve, or a
            method decorated as a Workflow step
        :raise: InternalError: if the reserved exit-step name is queried.
        :return: a DbWorkflowStep object, or None if no step with that name exists.
        """
        if isinstance(step_method, basestring):
            step_method_name = step_method
        else:
            if not getattr(step_method, "is_wf_step", False):
                raise AiidaException("Cannot get step calculations from a method not decorated as Workflow method")

            step_method_name = step_method.wf_step_name

        if step_method_name == wf_exit_call:
            raise InternalError("Cannot query a step with name {0}, reserved string".format(step_method_name))

        try:
            step = self.dbworkflowinstance.steps.get(name=step_method_name, user=get_automatic_user())
            return step
        except ObjectDoesNotExist:
            return None

    def get_steps(self, state=None):
        """
        Retrieves all the steps of this Workflow, with the possibility to limit the
        list to the steps in a specific state.

        :param state: a state from wf_states in aiida.common.datastructures
        :return: a list of DbWorkflowStep objects.
        """
        if state is None:
            return self.dbworkflowinstance.steps.all().order_by('time')  # .values_list('name', flat=True)
        else:
            return self.dbworkflowinstance.steps.filter(state=state).order_by('time')

    def get_report(self):
        """
        Return the Workflow report.

        :note: in the past, if the workflow was a subworkflow of another Workflow,
            this method called the parent ``get_report`` method; this is not the
            case anymore.
        :return: a list of strings
        """
        return self.dbworkflowinstance.report.splitlines()

    def clear_report(self):
        """
        Wipe the Workflow report. If the workflow is a subworkflow of another
        Workflow, this method calls the parent ``clear_report`` method.
        """
        if len(self.dbworkflowinstance.parent_workflow_step.all()) == 0:
            self.dbworkflowinstance.clear_report()
        else:
            Workflow(uuid=self.dbworkflowinstance.parent_workflow_step.get().parent.uuid).clear_report()

    def append_to_report(self, text):
        """
        Adds text to the Workflow report.

        :note: in the past, if the workflow was a subworkflow of another Workflow,
            this method called the parent ``append_to_report`` method; this is not
            the case anymore.
        """
        self.dbworkflowinstance.append_to_report(text)

    @classmethod
    def get_subclass_from_dbnode(cls, wf_db):
        module = wf_db.module
        module_class = wf_db.module_class
        try:
            wf_mod = importlib.import_module(module)
        except ImportError:
            raise InternalError("Unable to load the workflow module {}".format(module))

        for elem_name, elem in wf_mod.__dict__.iteritems():
            if module_class == elem_name:  # and issubclass(elem, Workflow):
                return getattr(wf_mod, elem_name)(uuid=wf_db.uuid)

    @classmethod
    def get_subclass_from_pk(cls, pk):
        from aiida.backends.djsite.db.models import DbWorkflow
        try:
            dbworkflowinstance = DbWorkflow.objects.get(pk=pk)
            return cls.get_subclass_from_dbnode(dbworkflowinstance)
        except ObjectDoesNotExist:
            raise NotExistent("No entry with pk={} found".format(pk))

    @classmethod
    def get_subclass_from_uuid(cls, uuid):
        from aiida.backends.djsite.db.models import DbWorkflow
        try:
            dbworkflowinstance = DbWorkflow.objects.get(uuid=uuid)
            return cls.get_subclass_from_dbnode(dbworkflowinstance)
        except ObjectDoesNotExist:
            raise NotExistent("No entry with the UUID {} found".format(uuid))
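

# Illustrative sketch, not part of the original module: reload a stored
# workflow by pk (a hypothetical value supplied by the caller) and print a
# short summary, assuming a configured AiiDA profile with the Django backend.
def _example_inspect_workflow(pk):
    wf = Workflow.get_subclass_from_pk(pk)
    print "state:", wf.get_state()
    for step in wf.get_steps():
        # each entry is a DbWorkflowStep, ordered by time
        print "  step {} [->{}] is {}".format(step.name, step.nextcall, step.state)
    for line in wf.get_report():
        print "  report:", line
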

def kill_all():
    """
    Kill all the workflows of the current user that are not FINISHED.
    """
    from aiida.backends.djsite.db.models import DbWorkflow

    q_object = Q(user=get_automatic_user())
    q_object.add(~Q(state=wf_states.FINISHED), Q.AND)
    w_list = DbWorkflow.objects.filter(q_object)

    for w in w_list:
        Workflow.get_subclass_from_uuid(w.uuid).kill()

def get_all_running_steps():
    """
    Return all DbWorkflowStep objects that are in the RUNNING state.
    """
    from aiida.backends.djsite.db.models import DbWorkflowStep
    return DbWorkflowStep.objects.filter(state=wf_states.RUNNING)
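

# Illustrative sketch, not part of the original module: report which workflow
# steps are currently running, using get_all_running_steps() defined above
# (the .parent foreign key to DbWorkflow is also used by clear_report above).
def _example_report_running_steps():
    for step in get_all_running_steps():
        print "step '{}' of workflow pk={} is RUNNING".format(step.name, step.parent.pk)
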

def get_workflow_info(w, tab_size=2, short=False, pre_string="", depth=16):
    """
    Return a list of lines with all the information regarding the given workflow
    and all its calculations and subworkflows.
    This is a recursive function (to print all subworkflows info as well).

    :param w: a DbWorkflow instance
    :param tab_size: number of spaces to use for the indentation
    :param short: if True, provide a shorter output (only total number of
        calculations, rather than the state of each calculation)
    :param pre_string: string appended at the beginning of each line
    :param depth: the maximum depth level the recursion on sub-workflows will
        try to reach (0 means we stay at the step level and don't go
        into sub-workflows, 1 means we go down to one step level of
        the sub-workflows, etc.)

    :return lines: list of lines to be output
    """
    # Note: pre_string becomes larger at each call of get_workflow_info on the
    # subworkflows: pre_string -> pre_string + "|" + " "*(tab_size-1))
    from aiida.backends.djsite.db.models import DbWorkflow

    if tab_size < 2:
        raise ValueError("tab_size must be at least 2")

    now = timezone.now()

    lines = []

    if w.label:
        wf_labelstring = "'{}', ".format(w.label)
    else:
        wf_labelstring = ""

    lines.append(pre_string)  # put an empty line before any workflow
    lines.append(pre_string + "+ Workflow {} ({}pk: {}) is {} [{}]".format(
        w.module_class, wf_labelstring, w.pk, w.state,
        str_timedelta(now - w.ctime, negative_to_zero=True)))

    # print information on the steps only if depth is higher than 0
    if depth > 0:

        # order all steps by time and get all the needed values
        steps_and_subwf_pks = w.steps.all().order_by(
            'time', 'sub_workflows__ctime', 'calculations__ctime').values_list(
            'pk', 'sub_workflows__pk', 'calculations', 'name', 'nextcall', 'state')
        # get the list of step pks (distinct), preserving the order
        steps_pk = []
        for item in steps_and_subwf_pks:
            if item[0] not in steps_pk:
                steps_pk.append(item[0])

        # build a dictionary with all the infos for each step pk
        subwfs_of_steps = {}
        for step_pk, subwf_pk, calc_pk, name, nextcall, state in steps_and_subwf_pks:
            if step_pk not in subwfs_of_steps.keys():
                subwfs_of_steps[step_pk] = {'name': name,
                                            'nextcall': nextcall,
                                            'state': state,
                                            'subwf_pks': [],
                                            'calc_pks': [],
                                            }
            if subwf_pk:
                subwfs_of_steps[step_pk]['subwf_pks'].append(subwf_pk)
            if calc_pk:
                subwfs_of_steps[step_pk]['calc_pks'].append(calc_pk)

        # get all subworkflows for all steps
        wflows = DbWorkflow.objects.filter(parent_workflow_step__in=steps_pk)  # .order_by('ctime')
        # dictionary mapping pks into workflows
        workflow_mapping = {_.pk: _ for _ in wflows}

        # get all calculations for all steps
        calcs = JobCalculation.query(workflow_step__in=steps_pk)  # .order_by('ctime')
        # dictionary mapping pks into calculations
        calc_mapping = {_.pk: _ for _ in calcs}

        for step_pk in steps_pk:
            lines.append(pre_string + "|" + '-' * (tab_size - 1) +
                         "* Step: {0} [->{1}] is {2}".format(
                             subwfs_of_steps[step_pk]['name'],
                             subwfs_of_steps[step_pk]['nextcall'],
                             subwfs_of_steps[step_pk]['state']))

            calc_pks = subwfs_of_steps[step_pk]['calc_pks']

            # print calculations only if it is not short
            if short:
                lines.append(pre_string + "|" + " " * (tab_size - 1) +
                             "| [{0} calculations]".format(len(calc_pks)))
            else:
                for calc_pk in calc_pks:
                    c = calc_mapping[calc_pk]
                    calc_state = c.get_state()
                    if c.label:
                        labelstring = "'{}', ".format(c.label)
                    else:
                        labelstring = ""

                    if calc_state == calc_states.WITHSCHEDULER:
                        sched_state = c.get_scheduler_state()
                        if sched_state is None:
                            remote_state = "(remote state still unknown)"
                        else:
                            last_check = c._get_scheduler_lastchecktime()
                            if last_check is not None:
                                when_string = " {}".format(
                                    str_timedelta(now - last_check, short=True,
                                                  negative_to_zero=True))
                                verb_string = "was "
                            else:
                                when_string = ""
                                verb_string = ""
                            remote_state = " ({}{}{})".format(verb_string, sched_state,
                                                              when_string)
                    else:
                        remote_state = ""

                    lines.append(pre_string + "|" + " " * (tab_size - 1) +
                                 "| Calculation ({}pk: {}) is {}{}".format(
                                     labelstring, calc_pk, calc_state, remote_state))

            ## SubWorkflows
            for subwf_pk in subwfs_of_steps[step_pk]['subwf_pks']:
                subwf = workflow_mapping[subwf_pk]
                lines.extend(get_workflow_info(subwf, short=short, tab_size=tab_size,
                                               pre_string=pre_string + "|" + " " * (tab_size - 1),
                                               depth=depth - 1))

            lines.append(pre_string + "|")

    return lines
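

# Illustrative sketch, not part of the original module: render the recursive
# get_workflow_info() output for a workflow given its pk (hypothetical value),
# assuming a configured AiiDA profile with the Django backend.
def _example_print_workflow_info(pk, short=False):
    from aiida.backends.djsite.db.models import DbWorkflow
    w = DbWorkflow.objects.get(pk=pk)
    for line in get_workflow_info(w, short=short):
        print line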