# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved. #
# This file is part of the AiiDA code. #
# #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file #
# For further information please visit http://www.aiida.net #
###########################################################################
from aiida.backends.testbase import AiidaTestCase
from concurrent.futures import ThreadPoolExecutor
from plum.engine.ticking import TickingEngine
from aiida.orm.data.base import get_true_node, Int
from aiida.work.process import Process
import aiida.work.util as util
[docs]class DummyProcess(Process):
[docs] @classmethod
def define(cls, spec):
super(DummyProcess, cls).define(spec)
spec.dynamic_input()
spec.dynamic_output()
[docs] def _run(self, a):
self.out("ran", get_true_node())
[docs]class TestTickingEngine(AiidaTestCase):
[docs] def setUp(self):
super(TestTickingEngine, self).setUp()
self.assertEquals(len(util.ProcessStack.stack()), 0)
self.ticking_engine = TickingEngine()
self.executor = ThreadPoolExecutor(max_workers=1)
[docs] def tearDown(self):
super(TestTickingEngine, self).tearDown()
self.ticking_engine.shutdown()
self.assertEquals(len(util.ProcessStack.stack()), 0)
[docs] def test_get_process(self):
# Test cancelling a future before the process runs
future = self.ticking_engine.submit(DummyProcess, inputs={'a': Int(5)})
[docs] def test_submit(self):
fut = self.ticking_engine.submit(DummyProcess, inputs={'a': Int(5)})
self._tick_till_finished()
res = fut.result()
self.assertTrue(res['ran'].value)
[docs] def test_cancel(self):
# Test cancelling a future before the process runs
future = self.ticking_engine.submit(DummyProcess, inputs={'a': Int(5)})
self.assertTrue(future.running())
future.cancel()
self.assertTrue(future.cancelled())
[docs] def _tick_till_finished(self):
self.executor.submit(self._keep_ticking())
[docs] def _keep_ticking(self):
while self.ticking_engine.tick():
pass