# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
import uuid
import shutil
import tempfile
import threading
import plum.process_monitor
from aiida.backends.testbase import AiidaTestCase
from aiida.orm import load_node
from aiida.orm.data.base import Int
from aiida.work.persistence import Persistence
from aiida.work.process import Process, FunctionProcess
from aiida.work.run import run, submit
from aiida.work.workfunction import workfunction
import aiida.work.util as util
from aiida.work.test_utils import DummyProcess, BadOutput
from aiida.common.lang import override
from aiida.orm.data.frozendict import FrozenDict
[docs]class ProcessStackTest(Process):
[docs] @override
def _run(self):
pass
[docs] @override
def on_create(self, pid, inputs, saved_instance_state):
super(ProcessStackTest, self).on_create(
pid, inputs, saved_instance_state)
self._thread_id = threading.current_thread().ident
[docs] @override
def on_stop(self):
# The therad must match the one used in on_create because process
# stack is using thread local storage to keep track of who called who
super(ProcessStackTest, self).on_stop()
assert self._thread_id is threading.current_thread().ident
[docs]class TestProcess(AiidaTestCase):
[docs] def setUp(self):
super(TestProcess, self).setUp()
self.assertEquals(len(util.ProcessStack.stack()), 0)
self.assertEquals(len(plum.process_monitor.MONITOR.get_pids()), 0)
[docs] def tearDown(self):
super(TestProcess, self).tearDown()
self.assertEquals(len(util.ProcessStack.stack()), 0)
self.assertEquals(len(plum.process_monitor.MONITOR.get_pids()), 0)
[docs] def test_process_stack(self):
ProcessStackTest.run()
[docs] def test_pid_is_uuid(self):
p = DummyProcess.new_instance(inputs={'_store_provenance': False})
self.assertEqual(uuid.UUID(p._calc.uuid), p.pid)
p.stop()
p.run_until_complete()
[docs] def test_seal(self):
rinfo = run(DummyProcess, _return_pid=True)[1]
self.assertTrue(load_node(pk=rinfo).is_sealed)
storedir = tempfile.mkdtemp()
storage = Persistence.create_from_basedir(storedir)
rinfo = submit(DummyProcess, _jobs_store=storage)
n = load_node(pk=rinfo.pid)
self.assertFalse(n.is_sealed)
dp = DummyProcess.create_from(storage._load_checkpoint(rinfo.pid))
dp.run_until_complete()
self.assertTrue(n.is_sealed)
shutil.rmtree(storedir)
[docs] def test_description(self):
dp = DummyProcess.new_instance(inputs={'_description': 'My description'})
self.assertEquals(dp.calc.description, 'My description')
dp.run_until_complete()
with self.assertRaises(ValueError):
DummyProcess.new_instance(inputs={'_description': 5})
[docs] def test_label(self):
dp = DummyProcess.new_instance(inputs={'_label': 'My label'})
self.assertEquals(dp.calc.label, 'My label')
dp.run_until_complete()
with self.assertRaises(ValueError):
DummyProcess.new_instance(inputs={'_label': 5})
[docs]class TestFunctionProcess(AiidaTestCase):
[docs] def test_kwargs(self):
def wf_with_kwargs(**kwargs):
return kwargs
def wf_without_kwargs():
return Int(4)
def wf_fixed_args(a):
return {'a': a}
a = Int(4)
inputs = {'a': a}
FP = FunctionProcess.build(wf_with_kwargs)
outs = FP.run(**inputs)
self.assertEqual(outs, inputs)
FP = FunctionProcess.build(wf_without_kwargs)
with self.assertRaises(ValueError):
FP.run(**inputs)
FP = FunctionProcess.build(wf_fixed_args)
outs = FP.run(**inputs)
self.assertEqual(outs, inputs)