Source code for aiida.backends.sqlalchemy.models.workflow

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################

import json

from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship, backref
from sqlalchemy.schema import Column, UniqueConstraint, Table
from sqlalchemy.types import Integer, String, DateTime, Text

from sqlalchemy.dialects.postgresql import UUID

from sqlalchemy_utils.types.choice import ChoiceType

from aiida.backends.sqlalchemy.models.base import Base, _QueryProperty, _AiidaQuery
from aiida.backends.sqlalchemy.models.utils import uuid_func
from aiida.common.datastructures import (wf_states, wf_data_types,
                                         wf_data_value_types, wf_default_call)
from aiida.utils import timezone



class DbWorkflow(Base):
    """
    SQLAlchemy model for a workflow, stored in the ``db_dbworkflow`` table.

    Besides the plain columns, a workflow owns a list of ``DbWorkflowData``
    rows (``data``) holding its parameters, results and attributes, and a
    list of ``DbWorkflowStep`` rows (``steps``, a backref declared on
    ``DbWorkflowStep.parent``).
    """
    __tablename__ = "db_dbworkflow"

    aiida_query = _QueryProperty(_AiidaQuery)

    id = Column(Integer, primary_key=True)
    uuid = Column(UUID(as_uuid=True), default=uuid_func)

    ctime = Column(DateTime(timezone=True), default=timezone.now)
    mtime = Column(DateTime(timezone=True), default=timezone.now)

    user_id = Column(Integer, ForeignKey('db_dbuser.id'))
    user = relationship('DbUser')

    label = Column(String(255), index=True)
    description = Column(Text)

    nodeversion = Column(Integer)
    lastsyncedversion = Column(Integer)

    state = Column(ChoiceType((_, _) for _ in wf_states),
                   default=wf_states.INITIALIZED)

    report = Column(Text)

    data = relationship("DbWorkflowData", backref='parent')

    # XXX the next three attributes have "blank=False", but can be null. It may
    # be needed to add some validation for this, but only at commit time.
    # To do so: see https://stackoverflow.com/questions/28228766/running-cleaning-validation-code-before-committing-in-sqlalchemy
    module = Column(Text)
    module_class = Column(Text)
    script_path = Column(Text)

    # XXX restrict the size of this column, MD5 have a fixed size
    script_md5 = Column(String(255))  # Blank = False.

    def __init__(self, *args, **kwargs):
        """
        Build the row and reset the version counters.

        ``nodeversion`` and ``lastsyncedversion`` are always set to 1 and 0
        respectively, overriding any value passed through ``kwargs``.
        """
        super(DbWorkflow, self).__init__(*args, **kwargs)
        self.nodeversion = 1
        self.lastsyncedversion = 0

    @property
    def pk(self):
        """Alias for the primary key column ``id``."""
        return self.id

    def get_aiida_class(self):
        """
        Return the corresponding aiida instance of class aiida.workflow
        """
        from aiida.orm.workflow import Workflow

        return Workflow.get_subclass_from_uuid(self.uuid)

    def set_state(self, state):
        """Set the workflow state and call ``save()``."""
        self.state = state
        self.save()

    def set_script_md5(self, md5):
        """Set the MD5 of the workflow script and call ``save()``."""
        self.script_md5 = md5
        self.save()

    def add_data(self, dict, d_type):
        """
        Store every key/value pair of ``dict`` as a data entry of this workflow.

        An existing entry with the same name and data type is updated,
        otherwise a new one is created.

        :param dict: mapping from entry name to the value to store
        :param d_type: the data type (one of ``wf_data_types``) of the entries
        """
        # NOTE: the parameter shadows the builtin ``dict``; the name is kept
        # because it is part of the public signature.
        for name, value in dict.items():
            entry, _ = self._get_or_create_data(name=name, data_type=d_type)
            entry.set_value(value)

    def _get_or_create_data(self, name, data_type):
        """
        Return the data entry with the given name and type, creating it if
        missing.

        :return: a tuple ``(entry, created)`` where ``created`` is True if a
            new ``DbWorkflowData`` was appended to ``self.data``
        """
        matches = [entry for entry in self.data
                   if entry.name == name and entry.data_type == data_type]
        if matches:
            # already existing case; if several match, the last one wins
            return matches[-1], False

        # create case
        dbdata = DbWorkflowData(parent_id=self.id, name=name,
                                data_type=data_type)
        self.data.append(dbdata)
        return dbdata, True

    def _get_or_create_step(self, name, user):
        """
        Return the step with the given name and user, creating it if missing.

        :return: a tuple ``(step, created)`` where ``created`` is True if a
            new ``DbWorkflowStep`` was appended to ``self.steps``
        """
        matches = [step for step in self.steps
                   if step.name == name and step.user == user]
        if matches:
            # already existing case
            return matches[0], False

        # create case
        dbstep = DbWorkflowStep(parent_id=self.id, name=name,
                                user_id=user.id)
        self.steps.append(dbstep)
        return dbstep, True

    def get_data(self, d_type):
        """
        Return a dictionary ``{name: value}`` with all the data entries of
        this workflow whose data type is ``d_type``.
        """
        return {entry.name: entry.get_value()
                for entry in self.data if entry.data_type == d_type}

    def add_parameters(self, _dict, force=False):
        """
        Store the given mapping as workflow parameters.

        :param _dict: mapping from parameter name to value
        :param force: if True, skip the check on the workflow state
        :raise ValueError: if the workflow is not in the INITIALIZED state
            and ``force`` is False
        """
        # ``not a == b`` is kept on purpose: it only relies on ``__eq__``
        # of the stored state value.
        if not self.state == wf_states.INITIALIZED and not force:
            raise ValueError("Cannot add initial parameters to an already initialized workflow")

        self.add_data(_dict, wf_data_types.PARAMETER)

    def add_parameter(self, name, value):
        """Store a single parameter (see :meth:`add_parameters`)."""
        self.add_parameters({name: value})

    def get_parameters(self):
        """Return a dictionary with all the parameters of this workflow."""
        return self.get_data(wf_data_types.PARAMETER)

    def get_parameter(self, name):
        """
        Return the value of the parameter called ``name``.

        :raise ValueError: if no parameter with that name exists
        """
        res = self.get_parameters()
        if name in res:
            return res[name]
        raise ValueError("Error retrieving parameter: {0}".format(name))

    def add_results(self, _dict):
        """Store the given mapping as workflow results."""
        self.add_data(_dict, wf_data_types.RESULT)

    def add_result(self, name, value):
        """Store a single result (see :meth:`add_results`)."""
        self.add_results({name: value})

    def get_results(self):
        """Return a dictionary with all the results of this workflow."""
        return self.get_data(wf_data_types.RESULT)

    def get_result(self, name):
        """
        Return the value of the result called ``name``.

        :raise ValueError: if no result with that name exists
        """
        res = self.get_results()
        if name in res:
            return res[name]
        raise ValueError("Error retrieving results: {0}".format(name))

    def add_attributes(self, _dict):
        """Store the given mapping as workflow attributes."""
        self.add_data(_dict, wf_data_types.ATTRIBUTE)

    def add_attribute(self, name, value):
        """Store a single attribute (see :meth:`add_attributes`)."""
        self.add_attributes({name: value})

    def get_attributes(self):
        """Return a dictionary with all the attributes of this workflow."""
        return self.get_data(wf_data_types.ATTRIBUTE)

    def get_attribute(self, name):
        """
        Return the value of the attribute called ``name``.

        :raise ValueError: if no attribute with that name exists
        """
        res = self.get_attributes()
        if name in res:
            return res[name]
        raise ValueError("Error retrieving attribute: {0}".format(name))

    def clear_report(self):
        """Reset the report to an empty string and call ``save()``."""
        self.report = ''
        self.save()

    def append_to_report(self, _text):
        """
        Append a timestamped line containing ``_text`` to the report and
        call ``save()``.
        """
        if self.report is None:
            self.report = ''
        # The line format (timestamp followed by "] ") is kept unchanged,
        # since existing reports already use it.
        self.report += str(timezone.now()) + "] " + _text + "\n"
        self.save()

    def get_calculations(self):
        """
        Query the JobCalculations attached to the steps of this workflow.

        The result is whatever ``JobCalculation.query`` returns for the
        ``workflow_step`` filter.
        """
        from aiida.orm import JobCalculation

        return JobCalculation.query(workflow_step=self.steps)

    def get_sub_workflows(self):
        """
        Return the list of ``DbWorkflow`` rows attached as sub-workflows to
        any step of this workflow, without duplicates.
        """
        # ``steps`` and ``sub_workflows`` are plain list-like relationships,
        # so they are walked in Python rather than with a Django-style
        # ``objects.filter(...)`` call, which does not exist on this model.
        sub_workflows = []
        for step in self.steps:
            for sub_wf in step.sub_workflows:
                if sub_wf not in sub_workflows:
                    sub_workflows.append(sub_wf)
        return sub_workflows

    def is_subworkflow(self):
        """
        Return True if this is a subworkflow, False if it is a root workflow,
        launched by the user.
        """
        return bool(self.parent_workflow_step)

    def finish(self):
        """
        Mark the workflow as FINISHED.

        Unlike :meth:`set_state`, this does not call ``save()``.
        """
        self.state = wf_states.FINISHED

    def __str__(self):
        """Return ``"<module_class> workflow [<pk>]"``, plus the label if set."""
        simplename = self.module_class
        # node pk + type
        if self.label:
            return "{} workflow [{}]: {}".format(simplename, self.pk, self.label)
        return "{} workflow [{}]".format(simplename, self.pk)
class DbWorkflowData(Base):
    """
    SQLAlchemy model for a named piece of data attached to a workflow,
    stored in the ``db_dbworkflowdata`` table.

    The value is either JSON-serialized in ``json_value`` or a reference to
    a ``DbNode`` in ``aiida_obj``; ``value_type`` records which one is used.
    The triple (``parent_id``, ``name``, ``data_type``) is unique.
    """
    __tablename__ = "db_dbworkflowdata"

    id = Column(Integer, primary_key=True)

    parent_id = Column(Integer, ForeignKey('db_dbworkflow.id'), index=True)
    name = Column(String(255))  # Blank = false
    time = Column(DateTime(timezone=True), default=timezone.now)
    data_type = Column(String(255), default=wf_data_types.PARAMETER)  # blank = false
    value_type = Column(String(255), default=wf_data_value_types.NONE)  # blank = false
    json_value = Column(Text)

    aiida_obj_id = Column(Integer, ForeignKey('db_dbnode.id'), nullable=True,
                          index=True)
    aiida_obj = relationship("DbNode")

    __table_args__ = (
        UniqueConstraint("parent_id", "name", "data_type"),
    )

    def get_or_create(self, **kwargs):
        """
        Emulate the Django ``get_or_create`` method.

        Look for a row of this model matching ``kwargs``; if none is found,
        create a new instance from ``kwargs`` (ignoring SQL clause values)
        and add it to the session.

        :return: a tuple ``(instance, created)``
        """
        from sqlalchemy.sql.expression import ClauseElement
        from aiida.backends.sqlalchemy import get_scoped_session

        model = type(self)
        session = get_scoped_session()

        instance = session.query(model).filter_by(**kwargs).first()
        if instance is not None:
            return instance, False

        # SQL expressions cannot be used as constructor values
        params = {key: value for key, value in kwargs.items()
                  if not isinstance(value, ClauseElement)}
        instance = model(**params)
        session.add(instance)
        return instance, True

    def set_value(self, arg):
        """
        Store ``arg`` as the value of this entry and call ``save()``.

        A stored AiiDA ``Node`` is saved as a reference to its DbNode; any
        other value is serialized with ``json.dumps``.

        :raise ValueError: if ``arg`` is an unstored node, or if anything
            else fails while setting the value
        """
        from aiida.orm import Node
        from aiida.backends.sqlalchemy import get_scoped_session

        try:
            if isinstance(arg, Node):
                if arg.pk is None:
                    raise ValueError("Cannot add an unstored node as an "
                                     "attribute of a Workflow!")
                sess = get_scoped_session()
                self.aiida_obj = sess.merge(arg._dbnode, load=True)
                self.value_type = wf_data_value_types.AIIDA
            else:
                self.json_value = json.dumps(arg)
                self.value_type = wf_data_value_types.JSON
            self.save()
        except Exception as ex:
            # str(ex) instead of ex.message: the latter is not available
            # on every exception
            raise ValueError("Cannot set the parameter {}\n".format(self.name)
                             + str(ex))

    def get_value(self):
        """
        Return the stored value, according to ``value_type``: the decoded
        JSON, the AiiDA class of the referenced node, or None.

        :raise ValueError: if ``value_type`` is not a known value type
        """
        if self.value_type == wf_data_value_types.JSON:
            return json.loads(self.json_value)
        elif self.value_type == wf_data_value_types.AIIDA:
            return self.aiida_obj.get_aiida_class()
        elif self.value_type == wf_data_value_types.NONE:
            return None
        else:
            raise ValueError("Cannot rebuild the parameter {}".format(self.name))

    def __str__(self):
        """Return a short description with the parent workflow and the name."""
        return "Data for workflow {} [{}]: {}".format(
            self.parent.module_class, self.parent.id, self.name)
# Association table between workflow steps and the DbNode rows of their
# calculations; used as ``secondary`` by ``DbWorkflowStep.calculations``.
# Each (step, node) pair can appear only once.
table_workflowstep_calc = Table(
    'db_dbworkflowstep_calculations',
    Base.metadata,
    Column('id', Integer, primary_key=True),
    Column('dbworkflowstep_id', Integer,
           ForeignKey('db_dbworkflowstep.id')),
    Column('dbnode_id', Integer,
           ForeignKey('db_dbnode.id')),
    UniqueConstraint('dbworkflowstep_id', 'dbnode_id')
)

# Association table between workflow steps and their sub-workflows; used as
# ``secondary`` by ``DbWorkflowStep.sub_workflows``.
# Each (step, workflow) pair can appear only once.
table_workflowstep_subworkflow = Table(
    'db_dbworkflowstep_sub_workflows',
    Base.metadata,
    Column('id', Integer, primary_key=True),
    Column('dbworkflowstep_id', Integer,
           ForeignKey('db_dbworkflowstep.id')),
    Column('dbworkflow_id', Integer,
           ForeignKey('db_dbworkflow.id')),
    UniqueConstraint('dbworkflowstep_id', 'dbworkflow_id')
)
class DbWorkflowStep(Base):
    """
    SQLAlchemy model for a step of a workflow, stored in the
    ``db_dbworkflowstep`` table.

    A step belongs to one parent ``DbWorkflow`` and is linked, through two
    association tables, to the DbNode rows of its calculations and to the
    ``DbWorkflow`` rows of its sub-workflows. The pair (``parent_id``,
    ``name``) is unique.
    """
    __tablename__ = "db_dbworkflowstep"

    id = Column(Integer, primary_key=True)

    parent_id = Column(Integer, ForeignKey('db_dbworkflow.id'))
    parent = relationship("DbWorkflow", backref='steps')

    user_id = Column(Integer, ForeignKey('db_dbuser.id'))
    user = relationship('DbUser')

    name = Column(String(255))  # Blank = false
    time = Column(DateTime(timezone=True), default=timezone.now)
    nextcall = Column(String(255), default=wf_default_call)  # Blank = false

    state = Column(ChoiceType((_, _) for _ in wf_states),
                   default=wf_states.CREATED)

    calculations = relationship("DbNode", secondary=table_workflowstep_calc,
                                backref="workflow_step")
    sub_workflows = relationship("DbWorkflow",
                                 secondary=table_workflowstep_subworkflow,
                                 backref="parent_workflow_step")

    __table_args__ = (
        UniqueConstraint('parent_id', 'name'),
    )

    def add_calculation(self, step_calculation):
        """
        Attach a calculation to this step.

        :param step_calculation: a ``JobCalculation`` instance
        :raise ValueError: if the argument is not a ``JobCalculation``, or
            if appending its DbNode to the step fails
        """
        from aiida.orm import JobCalculation

        if not isinstance(step_calculation, JobCalculation):
            raise ValueError("Cannot add a non-Calculation object to a workflow step")

        try:
            self.calculations.append(step_calculation._dbnode)
        except Exception:
            raise ValueError("Error adding calculation to step")

    def get_calculations(self, state=None):
        """
        Return the calculations of this step as AiiDA instances.

        :param state: if given, return only the calculations whose
            ``get_state()`` equals this value
        """
        calcs = [dbnode.get_aiida_class() for dbnode in self.calculations]
        if state is None:
            return calcs
        return [calc for calc in calcs if calc.get_state() == state]

    def remove_calculations(self):
        """
        Detach all the calculations from this step.

        Only the links held by the ``calculations`` relationship are
        removed; the calculation nodes themselves are not deleted.
        """
        # ``calculations`` is a list-like relationship and has no
        # Django-style ``.all().delete()``; assigning an empty list empties it.
        self.calculations = []

    def add_sub_workflow(self, sub_wf):
        """
        Attach a sub-workflow to this step.

        :param sub_wf: a ``Workflow`` instance
        :raise ValueError: if the argument is not a ``Workflow``, or if
            appending its ``dbworkflowinstance`` to the step fails
        """
        from aiida.orm import Workflow

        if not isinstance(sub_wf, Workflow):
            raise ValueError("Cannot add a workflow not of type Workflow")

        try:
            self.sub_workflows.append(sub_wf.dbworkflowinstance)
        except Exception:
            raise ValueError("Error adding sub-workflow to step")

    def get_sub_workflows(self):
        """Return the sub-workflows of this step as AiiDA instances."""
        return [sub_wf.get_aiida_class() for sub_wf in self.sub_workflows]

    def remove_sub_workflows(self):
        """
        Detach all the sub-workflows from this step.

        Only the links held by the ``sub_workflows`` relationship are
        removed; the sub-workflow rows themselves are not deleted.
        """
        # See remove_calculations: same list-like relationship handling.
        self.sub_workflows = []

    def is_finished(self):
        """Return True if the step state is FINISHED."""
        return self.state == wf_states.FINISHED

    def set_nextcall(self, _nextcall):
        """Set the ``nextcall`` value and call ``save()``."""
        self.nextcall = _nextcall
        self.save()

    def set_state(self, _state):
        """Set the step state and call ``save()``."""
        self.state = _state
        self.save()

    def reinitialize(self):
        """Set the step state back to INITIALIZED (and save)."""
        self.set_state(wf_states.INITIALIZED)

    def finish(self):
        """Set the step state to FINISHED (and save)."""
        self.set_state(wf_states.FINISHED)

    def __str__(self):
        """Return a short description with the step name and parent workflow."""
        return "Step {} for workflow {} [{}]".format(
            self.name, self.parent.module_class, self.parent.id)