Source code for aiida.backends.tests.export_and_import

# -*- coding: utf-8 -*-
###########################################################################
# Copyright (c), The AiiDA team. All rights reserved.                     #
# This file is part of the AiiDA code.                                    #
#                                                                         #
# The code is hosted on GitHub at https://github.com/aiidateam/aiida_core #
# For further information on the license, see the LICENSE.txt file        #
# For further information please visit http://www.aiida.net               #
###########################################################################
"""
Tests for the export and import routines.
"""

from aiida.backends.testbase import AiidaTestCase
from aiida.orm.importexport import import_data
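
# ---------------------------------------------------------------------------
# The tests in this module generally follow the same round-trip pattern:
# export a set of nodes to an archive, wipe the database, re-import the
# archive and verify that nodes, attributes, links, groups and computers are
# restored. A minimal sketch of that pattern (illustrative only; the file
# path is a placeholder, the calls mirror those used in the tests below):
#
#     node = ParameterData(dict={'a': 1}).store()
#     export([node.dbnode], outfile='/tmp/export.tar.gz', silent=True)
#     # ... clean the database ...
#     import_data('/tmp/export.tar.gz', silent=True)
# ---------------------------------------------------------------------------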


class TestSpecificImport(AiidaTestCase):
    def setUp(self):
        super(TestSpecificImport, self).setUp()
        self.clean_db()
        self.insert_data()
    def test_simple_import(self):
        """
        This is a very simple test which checks that an export file with nodes
        that are not associated to a computer is imported correctly. In Django
        when such nodes are exported, there is an empty set for computers in
        the export file. In SQLA there is such a set only when a computer is
        associated with the exported nodes. When an empty computer set is
        found in the export file (when imported to an SQLA profile), the SQLA
        import code used to crash. This test demonstrates this problem.
        """
        import tempfile
        from aiida.orm.data.parameter import ParameterData
        from aiida.orm.importexport import export, import_data
        from aiida.orm.node import Node
        from aiida.orm.querybuilder import QueryBuilder

        parameters = ParameterData(dict={
            'Pr': {
                'cutoff': 50.0,
                'pseudo_type': 'Wentzcovitch',
                'dual': 8,
                'cutoff_units': 'Ry'
            },
            'Ru': {
                'cutoff': 40.0,
                'pseudo_type': 'SG15',
                'dual': 4,
                'cutoff_units': 'Ry'
            },
        }).store()

        with tempfile.NamedTemporaryFile() as handle:
            nodes = [parameters.dbnode]
            export(nodes, outfile=handle.name, overwrite=True, silent=True)

            # Check that we have the expected number of nodes in the database
            self.assertEquals(QueryBuilder().append(Node).count(), len(nodes))

            # Clean the database and verify there are no nodes left
            self.clean_db()
            self.assertEquals(QueryBuilder().append(Node).count(), 0)

            # After importing we should have the original number of nodes again
            import_data(handle.name, silent=True)
            self.assertEquals(QueryBuilder().append(Node).count(), len(nodes))
    def test_cycle_structure_data(self):
        """
        Create an export with some Calculation and Data nodes and import it
        after having cleaned the database. Verify that the nodes and their
        attributes are restored properly after importing the created export
        archive.
        """
        import tempfile
        from aiida.common.links import LinkType
        from aiida.orm.calculation import Calculation
        from aiida.orm.data.structure import StructureData
        from aiida.orm.data.remote import RemoteData
        from aiida.orm.importexport import export, import_data
        from aiida.orm.node import Node
        from aiida.orm.querybuilder import QueryBuilder

        test_label = 'Test structure'
        test_cell = [
            [8.34, 0.0, 0.0],
            [0.298041701839357, 8.53479766274308, 0.0],
            [0.842650688117053, 0.47118495164127, 10.6965192730702]
        ]
        test_kinds = [
            {
                'symbols': [u'Fe'],
                'weights': [1.0],
                'mass': 55.845,
                'name': u'Fe'
            },
            {
                'symbols': [u'S'],
                'weights': [1.0],
                'mass': 32.065,
                'name': u'S'
            }
        ]

        structure = StructureData(cell=test_cell)
        structure.append_atom(symbols=['Fe'], position=[0, 0, 0])
        structure.append_atom(symbols=['S'], position=[2, 2, 2])
        structure.label = test_label
        structure.store()

        parent_calculation = Calculation()
        parent_calculation._set_attr('key', 'value')
        parent_calculation.store()

        child_calculation = Calculation()
        child_calculation._set_attr('key', 'value')
        child_calculation.store()

        remote_folder = RemoteData(computer=self.computer, remote_path='/').store()

        remote_folder.add_link_from(parent_calculation, link_type=LinkType.CREATE)
        child_calculation.add_link_from(remote_folder, link_type=LinkType.INPUT)
        structure.add_link_from(child_calculation, link_type=LinkType.CREATE)

        with tempfile.NamedTemporaryFile() as handle:
            nodes = [structure.dbnode, child_calculation.dbnode,
                     parent_calculation.dbnode, remote_folder.dbnode]
            export(nodes, outfile=handle.name, overwrite=True, silent=True)

            # Check that we have the expected number of nodes in the database
            self.assertEquals(QueryBuilder().append(Node).count(), len(nodes))

            # Clean the database and verify there are no nodes left
            self.clean_db()
            self.assertEquals(QueryBuilder().append(Node).count(), 0)

            # After importing we should have the original number of nodes again
            import_data(handle.name, silent=True)
            self.assertEquals(QueryBuilder().append(Node).count(), len(nodes))

            # Verify that Calculations have non-empty attribute dictionaries
            qb = QueryBuilder().append(Calculation)
            for [calculation] in qb.iterall():
                self.assertIsInstance(calculation.get_attrs(), dict)
                self.assertNotEquals(len(calculation.get_attrs()), 0)

            # Verify that the structure data maintained its label, cell and kinds
            qb = QueryBuilder().append(StructureData)
            for [structure] in qb.iterall():
                self.assertEquals(structure.label, test_label)
                self.assertEquals(structure.cell, test_cell)

            qb = QueryBuilder().append(StructureData, project=['attributes.kinds'])
            for [kinds] in qb.iterall():
                self.assertEqual(len(kinds), 2)
                for kind in kinds:
                    self.assertIn(kind, test_kinds)

            # Check that there is a StructureData that is an output of a Calculation
            qb = QueryBuilder()
            qb.append(Calculation, project=['uuid'], tag='calculation')
            qb.append(StructureData, output_of='calculation')
            self.assertGreater(len(qb.all()), 0)

            # Check that there is a RemoteData that is a child and parent of a Calculation
            qb = QueryBuilder()
            qb.append(Calculation, tag='parent')
            qb.append(RemoteData, project=['uuid'], output_of='parent', tag='remote')
            qb.append(Calculation, output_of='remote')
            self.assertGreater(len(qb.all()), 0)
class TestSimple(AiidaTestCase):
    def setUp(self):
        self.clean_db()
        self.insert_data()
    def tearDown(self):
        pass
    def test_0(self):
        import os
        import shutil
        import tempfile

        from aiida.orm import DataFactory
        from aiida.orm import load_node
        from aiida.orm.data.base import Str, Int, Float, Bool
        from aiida.orm.calculation.job import JobCalculation
        from aiida.orm.importexport import export

        # Creating a folder for the import/export files
        temp_folder = tempfile.mkdtemp()
        try:
            # producing values for each base type
            values = ("Hello", 6, -1.2399834e12, False)  # , ["Bla", 1, 1e-10])

            filename = os.path.join(temp_folder, "export.tar.gz")

            # producing nodes:
            nodes = [cls(val).store() for val, cls in zip(values, (Str, Int, Float, Bool))]
            # my uuid - list to reload the node:
            uuids = [n.uuid for n in nodes]
            # exporting the nodes:
            export([n.dbnode for n in nodes], outfile=filename, silent=True)
            # cleaning:
            self.clean_db()
            # Importing back the data:
            import_data(filename, silent=True)
            # Checking whether values are preserved:
            for uuid, refval in zip(uuids, values):
                self.assertEquals(load_node(uuid).value, refval)
        finally:
            # Deleting the created temporary folder
            shutil.rmtree(temp_folder, ignore_errors=True)
    def test_1(self):
        import os
        import shutil
        import tempfile

        from aiida.orm import DataFactory
        from aiida.orm import load_node
        from aiida.orm.calculation.job import JobCalculation
        from aiida.orm.importexport import export

        # Creating a folder for the import/export files
        temp_folder = tempfile.mkdtemp()
        try:
            StructureData = DataFactory('structure')
            sd = StructureData()
            sd.store()

            calc = JobCalculation()
            calc.set_computer(self.computer)
            calc.set_resources({"num_machines": 1, "num_mpiprocs_per_machine": 1})
            calc.store()

            calc.add_link_from(sd)

            pks = [sd.pk, calc.pk]

            attrs = {}
            for pk in pks:
                node = load_node(pk)
                attrs[node.uuid] = dict()
                for k in node.attrs():
                    attrs[node.uuid][k] = node.get_attr(k)

            filename = os.path.join(temp_folder, "export.tar.gz")

            export([calc.dbnode], outfile=filename, silent=True)

            self.clean_db()

            # NOTE: it is better to load new nodes by uuid, rather than assuming
            # that they will have the first 3 pks. In fact, a recommended policy in
            # databases is that pk always increment, even if you've deleted elements
            import_data(filename, silent=True)
            for uuid in attrs.keys():
                node = load_node(uuid)
                # for k in node.attrs():
                for k in attrs[uuid].keys():
                    self.assertEquals(attrs[uuid][k], node.get_attr(k))
        finally:
            # Deleting the created temporary folder
            shutil.rmtree(temp_folder, ignore_errors=True)
            # print temp_folder
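
    # Note on the archive layout (as exercised by test_2, test_3 and test_4
    # below): the exported tarball appears to contain a metadata.json file
    # carrying, among other things, the 'export_version', a data.json file
    # with the node and link information (e.g. the 'links_uuid' list), and a
    # nodes/ folder with the node repositories.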
    def test_2(self):
        """
        Test the check for the export format version.
        """
        import json
        import tarfile
        import os
        import shutil
        import tempfile

        from aiida.orm import DataFactory
        from aiida.orm.importexport import export

        # Creating a folder for the import/export files
        export_file_tmp_folder = tempfile.mkdtemp()
        unpack_tmp_folder = tempfile.mkdtemp()
        try:
            StructureData = DataFactory('structure')
            sd = StructureData()
            sd.store()

            filename = os.path.join(export_file_tmp_folder, "export.tar.gz")
            export([sd.dbnode], outfile=filename, silent=True)

            with tarfile.open(filename, "r:gz", format=tarfile.PAX_FORMAT) as tar:
                tar.extractall(unpack_tmp_folder)

            with open(os.path.join(unpack_tmp_folder, 'metadata.json'), 'r') as f:
                metadata = json.load(f)
            metadata['export_version'] = 0.0

            with open(os.path.join(unpack_tmp_folder, 'metadata.json'), 'w') as f:
                json.dump(metadata, f)

            with tarfile.open(filename, "w:gz", format=tarfile.PAX_FORMAT) as tar:
                tar.add(unpack_tmp_folder, arcname="")

            self.tearDownClass()
            self.setUpClass()

            with self.assertRaises(ValueError):
                import_data(filename, silent=True)
        finally:
            # Deleting the created temporary folders
            shutil.rmtree(export_file_tmp_folder, ignore_errors=True)
            shutil.rmtree(unpack_tmp_folder, ignore_errors=True)
    def test_3(self):
        """
        Test importing of nodes that have links to unknown nodes.
        """
        import json
        import tarfile
        import os
        import shutil
        import tempfile

        from aiida.orm.importexport import export
        from aiida.common.folders import SandboxFolder
        from aiida.orm.data.structure import StructureData
        from aiida.orm import load_node

        # Creating a folder for the import/export files
        temp_folder = tempfile.mkdtemp()
        try:
            node_label = "Test structure data"
            sd = StructureData()
            sd.label = str(node_label)
            sd.store()

            filename = os.path.join(temp_folder, "export.tar.gz")
            export([sd.dbnode], outfile=filename, silent=True)

            unpack = SandboxFolder()
            with tarfile.open(filename, "r:gz", format=tarfile.PAX_FORMAT) as tar:
                tar.extractall(unpack.abspath)

            with open(unpack.get_abs_path('data.json'), 'r') as f:
                metadata = json.load(f)
            metadata['links_uuid'].append({
                'output': sd.uuid,
                'input': 'non-existing-uuid',
                'label': 'parent'
            })
            with open(unpack.get_abs_path('data.json'), 'w') as f:
                json.dump(metadata, f)

            with tarfile.open(filename, "w:gz", format=tarfile.PAX_FORMAT) as tar:
                tar.add(unpack.abspath, arcname="")

            self.clean_db()

            with self.assertRaises(ValueError):
                import_data(filename, silent=True)

            import_data(filename, ignore_unknown_nodes=True, silent=True)
            self.assertEquals(load_node(sd.uuid).label, node_label)
        finally:
            # Deleting the created temporary folder
            shutil.rmtree(temp_folder, ignore_errors=True)
    def test_4(self):
        """
        Test control of licenses.
        """
        from aiida.common.exceptions import LicensingException
        from aiida.common.folders import SandboxFolder
        from aiida.orm.importexport import export_tree
        from aiida.orm import DataFactory

        StructureData = DataFactory('structure')
        sd = StructureData()
        sd.source = {'license': 'GPL'}
        sd.store()

        folder = SandboxFolder()
        export_tree([sd.dbnode], folder=folder, silent=True,
                    allowed_licenses=['GPL'])
        # Folder should contain two files of metadata + nodes/
        self.assertEquals(len(folder.get_content_list()), 3)

        folder = SandboxFolder()
        export_tree([sd.dbnode], folder=folder, silent=True,
                    forbidden_licenses=['Academic'])
        # Folder should contain two files of metadata + nodes/
        self.assertEquals(len(folder.get_content_list()), 3)

        folder = SandboxFolder()
        with self.assertRaises(LicensingException):
            export_tree([sd.dbnode], folder=folder, silent=True,
                        allowed_licenses=['CC0'])

        folder = SandboxFolder()
        with self.assertRaises(LicensingException):
            export_tree([sd.dbnode], folder=folder, silent=True,
                        forbidden_licenses=['GPL'])

        def cc_filter(license):
            return license.startswith('CC')

        def gpl_filter(license):
            return license == 'GPL'

        def crashing_filter(license):
            raise NotImplementedError("not implemented yet")

        folder = SandboxFolder()
        with self.assertRaises(LicensingException):
            export_tree([sd.dbnode], folder=folder, silent=True,
                        allowed_licenses=cc_filter)

        folder = SandboxFolder()
        with self.assertRaises(LicensingException):
            export_tree([sd.dbnode], folder=folder, silent=True,
                        forbidden_licenses=gpl_filter)

        folder = SandboxFolder()
        with self.assertRaises(LicensingException):
            export_tree([sd.dbnode], folder=folder, silent=True,
                        allowed_licenses=crashing_filter)

        folder = SandboxFolder()
        with self.assertRaises(LicensingException):
            export_tree([sd.dbnode], folder=folder, silent=True,
                        forbidden_licenses=crashing_filter)
    def test_5(self):
        """
        This test checks that nodes belonging to different users are correctly
        exported & imported.
        """
        import os
        import shutil
        import tempfile

        from aiida.orm import load_node
        from aiida.orm.calculation.job import JobCalculation
        from aiida.orm.data.structure import StructureData
        from aiida.orm.importexport import export
        from aiida.common.datastructures import calc_states
        from aiida.common.links import LinkType
        from aiida.orm.user import User
        from aiida.common.utils import get_configured_user_email

        # Creating a folder for the import/export files
        temp_folder = tempfile.mkdtemp()
        try:
            # Create another user
            new_email = "newuser@new.n"
            user = User(email=new_email)
            user.force_save()

            # Create a structure data node that has a calculation as output
            sd1 = StructureData()
            sd1.dbnode.user = user._dbuser
            sd1.label = 'sd1'
            sd1.store()

            jc1 = JobCalculation()
            jc1.set_computer(self.computer)
            jc1.set_resources({"num_machines": 1, "num_mpiprocs_per_machine": 1})
            jc1.dbnode.user = user._dbuser
            jc1.label = 'jc1'
            jc1.store()
            jc1.add_link_from(sd1)
            jc1._set_state(calc_states.PARSING)

            # Create some nodes from a different user
            sd2 = StructureData()
            sd2.dbnode.user = user._dbuser
            sd2.label = 'sd2'
            sd2.store()
            sd2.add_link_from(jc1, label='l1', link_type=LinkType.CREATE)  # I assume jc1 CREATED sd2

            jc2 = JobCalculation()
            jc2.set_computer(self.computer)
            jc2.set_resources({"num_machines": 1, "num_mpiprocs_per_machine": 1})
            jc2.label = 'jc2'
            jc2.store()
            jc2.add_link_from(sd2, label='l2')
            jc2._set_state(calc_states.PARSING)

            sd3 = StructureData()
            sd3.label = 'sd3'
            sd3.store()
            sd3.add_link_from(jc2, label='l3', link_type=LinkType.CREATE)

            uuids_u1 = [sd1.uuid, jc1.uuid, sd2.uuid]
            uuids_u2 = [jc2.uuid, sd3.uuid]

            filename = os.path.join(temp_folder, "export.tar.gz")

            export([sd3.dbnode], outfile=filename, silent=True)
            self.clean_db()
            import_data(filename, silent=True)

            # Check that the imported nodes are correctly imported and that
            # the user assigned to the nodes is the right one
            for uuid in uuids_u1:
                self.assertEquals(load_node(uuid).get_user().email, new_email)
            for uuid in uuids_u2:
                self.assertEquals(load_node(uuid).get_user().email,
                                  get_configured_user_email())
        finally:
            # Deleting the created temporary folder
            shutil.rmtree(temp_folder, ignore_errors=True)
    def test_6(self):
        """
        This test checks that nodes belonging to user A (which is not the
        default user) can be correctly exported, imported, enriched with nodes
        from the default user, re-exported & re-imported, and that in the end
        all the imported nodes belong to the right users.
        """
        import os
        import shutil
        import tempfile

        from aiida.orm import load_node
        from aiida.orm.calculation.job import JobCalculation
        from aiida.orm.data.structure import StructureData
        from aiida.orm.importexport import export
        from aiida.common.datastructures import calc_states
        from aiida.common.links import LinkType
        from aiida.common.utils import get_configured_user_email
        from aiida.orm.user import User

        # Creating a folder for the import/export files
        temp_folder = tempfile.mkdtemp()
        try:
            # Create another user
            new_email = "newuser@new.n"
            user = User(email=new_email)
            user.force_save()

            # Create a structure data node that has a calculation as output
            sd1 = StructureData()
            sd1.dbnode.user = user._dbuser
            sd1.label = 'sd1'
            sd1.store()

            jc1 = JobCalculation()
            jc1.set_computer(self.computer)
            jc1.set_resources({"num_machines": 1, "num_mpiprocs_per_machine": 1})
            jc1.dbnode.user = user._dbuser
            jc1.label = 'jc1'
            jc1.store()
            jc1.add_link_from(sd1)
            jc1._set_state(calc_states.PARSING)

            # Create some nodes from a different user
            sd2 = StructureData()
            sd2.dbnode.user = user._dbuser
            sd2.label = 'sd2'
            sd2.store()
            sd2.add_link_from(jc1, label='l1', link_type=LinkType.CREATE)

            # Set the jc1 to FINISHED
            jc1._set_state(calc_states.FINISHED)

            # At this point we export the generated data
            filename1 = os.path.join(temp_folder, "export1.tar.gz")
            export([sd2.dbnode], outfile=filename1, silent=True)
            uuids1 = [sd1.uuid, jc1.uuid, sd2.uuid]
            self.clean_db()
            self.insert_data()
            import_data(filename1, silent=True)

            # Check that the imported nodes are correctly imported and that
            # the user assigned to the nodes is the right one
            for uuid in uuids1:
                self.assertEquals(load_node(uuid).get_user().email, new_email)

            # Now we continue to generate more data based on the imported data
            sd2_imp = load_node(sd2.uuid)

            jc2 = JobCalculation()
            jc2.set_computer(self.computer)
            jc2.set_resources({"num_machines": 1, "num_mpiprocs_per_machine": 1})
            jc2.label = 'jc2'
            jc2.store()
            jc2.add_link_from(sd2_imp, label='l2')
            jc2._set_state(calc_states.PARSING)

            sd3 = StructureData()
            sd3.label = 'sd3'
            sd3.store()
            sd3.add_link_from(jc2, label='l3', link_type=LinkType.CREATE)

            # Set the jc2 to FINISHED
            jc2._set_state(calc_states.FINISHED)

            # Store the UUIDs of the nodes that should be checked
            # if they can be imported correctly.
            uuids2 = [jc2.uuid, sd3.uuid]

            filename2 = os.path.join(temp_folder, "export2.tar.gz")
            export([sd3.dbnode], outfile=filename2, silent=True)
            self.clean_db()
            self.insert_data()
            import_data(filename2, silent=True)

            # Check that the imported nodes are correctly imported and that
            # the user assigned to the nodes is the right one
            for uuid in uuids1:
                self.assertEquals(load_node(uuid).get_user().email, new_email)
            for uuid in uuids2:
                self.assertEquals(load_node(uuid).get_user().email,
                                  get_configured_user_email())
        finally:
            # Deleting the created temporary folder
            shutil.rmtree(temp_folder, ignore_errors=True)
    def test_7(self):
        """
        This test checks that nodes that belong to a specific group are
        correctly imported and exported.
        """
        import os
        import shutil
        import tempfile

        from aiida.orm import load_node
        from aiida.orm.calculation.job import JobCalculation
        from aiida.orm.data.structure import StructureData
        from aiida.orm.importexport import export
        from aiida.common.datastructures import calc_states
        from aiida.orm.user import User
        from aiida.orm.node import Node
        from aiida.orm.querybuilder import QueryBuilder

        # Creating a folder for the import/export files
        temp_folder = tempfile.mkdtemp()
        try:
            # Create another user
            new_email = "newuser@new.n"
            user = User(email=new_email)
            user.force_save()

            # Create a structure data node that has a calculation as output
            sd1 = StructureData()
            sd1.dbnode.user = user._dbuser
            sd1.label = 'sd1'
            sd1.store()

            jc1 = JobCalculation()
            jc1.set_computer(self.computer)
            jc1.set_resources({"num_machines": 1, "num_mpiprocs_per_machine": 1})
            jc1.dbnode.user = user._dbuser
            jc1.label = 'jc1'
            jc1.store()
            jc1.add_link_from(sd1)
            jc1._set_state(calc_states.PARSING)

            # Create a group and add the data inside
            from aiida.orm.group import Group
            g1 = Group(name="node_group")
            g1.store()
            g1.add_nodes([sd1, jc1])

            # At this point we export the generated data
            filename1 = os.path.join(temp_folder, "export1.tar.gz")
            export([sd1.dbnode, jc1.dbnode, g1.dbgroup], outfile=filename1,
                   silent=True)
            n_uuids = [sd1.uuid, jc1.uuid]
            self.clean_db()
            self.insert_data()
            import_data(filename1, silent=True)

            # Check that the imported nodes are correctly imported and that
            # the user assigned to the nodes is the right one
            for uuid in n_uuids:
                self.assertEquals(load_node(uuid).get_user().email, new_email)

            # Check that the exported group is imported correctly
            qb = QueryBuilder()
            qb.append(Group, filters={'uuid': {'==': g1.uuid}})
            self.assertEquals(qb.count(), 1, "The group was not found.")
        finally:
            # Deleting the created temporary folder
            shutil.rmtree(temp_folder, ignore_errors=True)
    def test_workfunction_1(self):
        import shutil, os, tempfile

        from aiida.work.workfunction import workfunction
        from aiida.orm.data.base import Float
        from aiida.orm import load_node
        from aiida.orm.importexport import export
        from aiida.common.exceptions import NotExistent

        # Creating a folder for the import/export files
        temp_folder = tempfile.mkdtemp()

        @workfunction
        def add(a, b):
            """Add 2 numbers"""
            return {'res': Float(a + b)}

        def max_(**kwargs):
            """select the max value"""
            max_val = max([(v.value, v) for v in kwargs.values()])
            return {'res': max_val[1]}

        try:
            # I'm creating a bunch of numbers
            a, b, c, d, e = (Float(i) for i in range(5))
            # this adds the maximum number among b, c, d and e to a
            res = add(a=a, b=max_(b=b, c=c, d=d, e=e)['res'])['res']
            # These are the uuids that would be exported as well (as parents)
            # if I wanted the final result
            uuids_values = [(a.uuid, a.value), (e.uuid, e.value), (res.uuid, res.value)]
            # These are the uuids that shouldn't be exported since it's a selection
            not_wanted_uuids = [v.uuid for v in (b, c, d)]
            # At this point we export the generated data
            filename1 = os.path.join(temp_folder, "export1.tar.gz")
            export([res.dbnode], outfile=filename1, silent=True)
            self.clean_db()
            self.insert_data()
            import_data(filename1, silent=True)
            # Check that the imported nodes are correctly imported and that the value is preserved
            for uuid, value in uuids_values:
                self.assertEquals(load_node(uuid).value, value)
            for uuid in not_wanted_uuids:
                with self.assertRaises(NotExistent):
                    load_node(uuid)
        finally:
            # Deleting the created temporary folder
            shutil.rmtree(temp_folder, ignore_errors=True)
    def test_workcalculation_2(self):
        import shutil, os, tempfile

        from aiida.orm.calculation.work import WorkCalculation
        from aiida.orm.data.base import Float, Int
        from aiida.orm import load_node
        from aiida.common.links import LinkType
        from aiida.orm.importexport import export
        from aiida.common.exceptions import NotExistent

        # Creating a folder for the import/export files
        temp_folder = tempfile.mkdtemp()
        try:
            master = WorkCalculation().store()
            slave = WorkCalculation().store()

            input_1 = Int(3).store()
            input_2 = Int(5).store()
            output_1 = Int(2).store()

            master.add_link_from(input_1, 'input_1', link_type=LinkType.INPUT)
            slave.add_link_from(master, 'CALL', link_type=LinkType.CALL)
            slave.add_link_from(input_2, 'input_2', link_type=LinkType.INPUT)
            output_1.add_link_from(master, 'CREATE', link_type=LinkType.CREATE)

            uuids_values = [(v.uuid, v.value) for v in (output_1,)]
            filename1 = os.path.join(temp_folder, "export1.tar.gz")
            export([output_1.dbnode], outfile=filename1, silent=True)
            self.clean_db()
            self.insert_data()
            import_data(filename1, silent=True)

            for uuid, value in uuids_values:
                self.assertEquals(load_node(uuid).value, value)
        finally:
            # Deleting the created temporary folder
            shutil.rmtree(temp_folder, ignore_errors=True)
    def test_reexport(self):
        """
        Export something, import and re-export and check if everything is valid.
        The export is rather easy::

            ___       ___          ___
           |   | INP |   | CREATE |   |
           | p | --> | c | -----> | a |
           |___|     |___|        |___|

        """
        import os, shutil, tempfile, numpy as np, string, random
        from datetime import datetime

        from aiida.orm import Calculation, load_node, Group
        from aiida.orm.data.array import ArrayData
        from aiida.orm.data.parameter import ParameterData
        from aiida.orm.querybuilder import QueryBuilder
        from aiida.orm.importexport import export
        from aiida.common.hashing import make_hash
        from aiida.common.links import LinkType

        def get_hash_from_db_content(groupname):
            qb = QueryBuilder()
            qb.append(ParameterData, tag='p', project='*')
            qb.append(Calculation, tag='c', project='*',
                      edge_tag='p2c', edge_project=('label', 'type'))
            qb.append(ArrayData, tag='a', project='*',
                      edge_tag='c2a', edge_project=('label', 'type'))
            qb.append(Group, filters={'name': groupname}, project='*',
                      tag='g', group_of='a')
            # I want the query to contain something!
            self.assertTrue(qb.count() > 0)
            # The hash is given from the preservable entries in an export-import cycle,
            # uuids, attributes, labels, descriptions, arrays, link-labels, link-types:
            hash_ = make_hash([(
                item['p']['*'].get_attrs(),
                item['p']['*'].uuid,
                item['p']['*'].label,
                item['p']['*'].description,
                item['c']['*'].uuid,
                item['c']['*'].get_attrs(),
                item['a']['*'].get_attrs(),
                [item['a']['*'].get_array(name) for name in item['a']['*'].get_arraynames()],
                item['a']['*'].uuid,
                item['g']['*'].uuid,
                item['g']['*'].name,
                item['p2c']['label'],
                item['p2c']['type'],
                item['c2a']['label'],
                item['c2a']['type'],
                item['g']['*'].name,
            ) for item in qb.dict()])
            return hash_

        # Creating a folder for the import/export files
        temp_folder = tempfile.mkdtemp()
        chars = string.ascii_uppercase + string.digits
        size = 10
        groupname = 'test-group'
        try:
            nparr = np.random.random((4, 3, 2))
            trial_dict = {}
            # give some integers:
            trial_dict.update({str(k): np.random.randint(100) for k in range(10)})
            # give some floats:
            trial_dict.update({str(k): np.random.random() for k in range(10, 20)})
            # give some booleans:
            trial_dict.update({str(k): bool(np.random.randint(1)) for k in range(20, 30)})
            # give some datetime:
            trial_dict.update({str(k): datetime(
                year=2017, month=np.random.randint(1, 12),
                day=np.random.randint(1, 28)) for k in range(30, 40)})
            # give some text:
            trial_dict.update({str(k): ''.join(random.choice(chars) for _ in range(size))
                               for k in range(20, 30)})

            p = ParameterData(dict=trial_dict)
            p.label = str(datetime.now())
            p.description = 'd_' + str(datetime.now())
            p.store()

            c = Calculation()
            # setting also the trial dict as attributes, but randomizing the keys
            for k, v in trial_dict.items():
                c._set_attr(str(int(k) + np.random.randint(10)), v)
            c.store()

            a = ArrayData()
            a.set_array('array', nparr)
            a.store()

            # LINKS
            # the calculation has input the parameters-instance
            c.add_link_from(p, label='input_parameters', link_type=LinkType.INPUT)
            # I want the array to be an output of the calculation
            a.add_link_from(c, label='output_array', link_type=LinkType.CREATE)

            g = Group(name='test-group')
            g.store()
            g.add_nodes(a)

            hash_from_dbcontent = get_hash_from_db_content(groupname)

            # I export and reimport 3 times in a row:
            for i in range(3):
                # Always new filename:
                filename = os.path.join(temp_folder, "export-{}.zip".format(i))
                # Loading the group from the string
                g = Group.get_from_string(groupname)
                # exporting based on all members of the group;
                # this also checks if group memberships are preserved!
                export([g.dbgroup] + [n.dbnode for n in g.nodes],
                       outfile=filename, silent=True)
                # cleaning the DB!
                self.clean_db()
                # reimporting the data from the file
                import_data(filename, silent=True, ignore_unknown_nodes=True)
                # creating the hash from db content
                new_hash = get_hash_from_db_content(groupname)
                # I check for equality against the first hash created, which implies that hashes
                # are equal in all iterations of this process
                self.assertEqual(hash_from_dbcontent, new_hash)
        finally:
            # Deleting the created temporary folder
            shutil.rmtree(temp_folder, ignore_errors=True)
class TestComplex(AiidaTestCase):
    def test_complex_graph_import_export(self):
        """
        This test checks that a small but somewhat complex graph can be
        correctly exported and imported. It will create the graph, store it to
        the database, export it to a file and import it. In the end it will
        check if the initial nodes are present in the imported graph.
        """
        import tempfile
        import shutil
        import os

        from aiida.orm.calculation.job import JobCalculation
        from aiida.orm.data.folder import FolderData
        from aiida.orm.data.parameter import ParameterData
        from aiida.orm.data.remote import RemoteData
        from aiida.common.links import LinkType
        from aiida.orm.importexport import export, import_data
        from aiida.orm.utils import load_node
        from aiida.common.exceptions import NotExistent

        temp_folder = tempfile.mkdtemp()
        try:
            calc1 = JobCalculation()
            calc1.set_computer(self.computer)
            calc1.set_resources({"num_machines": 1, "num_mpiprocs_per_machine": 1})
            calc1.label = "calc1"
            calc1.store()
            calc1._set_state(u'RETRIEVING')

            pd1 = ParameterData()
            pd1.label = "pd1"
            pd1.store()

            pd2 = ParameterData()
            pd2.label = "pd2"
            pd2.store()

            rd1 = RemoteData()
            rd1.label = "rd1"
            rd1.set_remote_path("/x/y.py")
            rd1.set_computer(self.computer)
            rd1.store()
            rd1.add_link_from(calc1, link_type=LinkType.CREATE)

            calc2 = JobCalculation()
            calc2.set_computer(self.computer)
            calc2.set_resources({"num_machines": 1, "num_mpiprocs_per_machine": 1})
            calc2.label = "calc2"
            calc2.store()
            calc2.add_link_from(pd1, link_type=LinkType.INPUT)
            calc2.add_link_from(pd2, link_type=LinkType.INPUT)
            calc2.add_link_from(rd1, link_type=LinkType.INPUT)
            calc2._set_state(u'SUBMITTING')

            fd1 = FolderData()
            fd1.label = "fd1"
            fd1.store()
            fd1.add_link_from(calc2, link_type=LinkType.CREATE)

            node_uuids_labels = {calc1.uuid: calc1.label, pd1.uuid: pd1.label,
                                 pd2.uuid: pd2.label, rd1.uuid: rd1.label,
                                 calc2.uuid: calc2.label, fd1.uuid: fd1.label}

            filename = os.path.join(temp_folder, "export.tar.gz")
            export([fd1.dbnode], outfile=filename, silent=True)

            self.clean_db()

            import_data(filename, silent=True, ignore_unknown_nodes=True)

            for uuid, label in node_uuids_labels.iteritems():
                try:
                    load_node(uuid)
                except NotExistent:
                    self.fail("Node with UUID {} and label {} was not "
                              "found.".format(uuid, label))
        finally:
            # Deleting the created temporary folder
            shutil.rmtree(temp_folder, ignore_errors=True)
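
# ---------------------------------------------------------------------------
# The TestComputer cases below check how computers travel through an
# export/import cycle: re-importing calculations that reference the same
# computer should not create duplicates, re-importing after renaming should
# not overwrite the original name, genuinely different computers that collide
# on name get COMP_DUPL_SUFFIX appended, and computer metadata as well as
# transport parameters should be preserved.
# ---------------------------------------------------------------------------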
class TestComputer(AiidaTestCase):
    def setUp(self):
        self.clean_db()
        self.insert_data()
    def tearDown(self):
        pass
    def test_same_computer_import(self):
        """
        Test that you can import nodes in steps without any problems. In this
        test we will import a first calculation and then a second one. The
        import should work as expected and result in two job calculations in
        the end.

        Each calculation is related to the same computer. In the end we should
        have only one computer.
        """
        import os
        import shutil
        import tempfile

        from aiida.orm.importexport import export
        from aiida.orm.querybuilder import QueryBuilder
        from aiida.orm.computer import Computer
        from aiida.orm.calculation.job import JobCalculation

        # Creating a folder for the import/export files
        export_file_tmp_folder = tempfile.mkdtemp()
        unpack_tmp_folder = tempfile.mkdtemp()
        try:
            # Store two job calculations related to the same computer
            calc1_label = "calc1"
            calc1 = JobCalculation()
            calc1.set_computer(self.computer)
            calc1.set_resources({"num_machines": 1, "num_mpiprocs_per_machine": 1})
            calc1.label = calc1_label
            calc1.store()
            calc1._set_state(u'RETRIEVING')

            calc2_label = "calc2"
            calc2 = JobCalculation()
            calc2.set_computer(self.computer)
            calc2.set_resources({"num_machines": 2, "num_mpiprocs_per_machine": 2})
            calc2.label = calc2_label
            calc2.store()
            calc2._set_state(u'RETRIEVING')

            # Store locally the computer name and uuid
            comp_name = unicode(self.computer.name)
            comp_uuid = unicode(self.computer.uuid)

            # Export the first job calculation
            filename1 = os.path.join(export_file_tmp_folder, "export1.tar.gz")
            export([calc1.dbnode], outfile=filename1, silent=True)

            # Export the second job calculation
            filename2 = os.path.join(export_file_tmp_folder, "export2.tar.gz")
            export([calc2.dbnode], outfile=filename2, silent=True)

            # Clean the local database
            self.clean_db()

            # Check that there are no computers
            qb = QueryBuilder()
            qb.append(Computer, project=['*'])
            self.assertEqual(qb.count(), 0,
                             "There should not be any computers in the database at this point.")

            # Check that there are no calculations
            qb = QueryBuilder()
            qb.append(JobCalculation, project=['*'])
            self.assertEqual(qb.count(), 0,
                             "There should not be any calculations in the database at this point.")

            # Import the first calculation
            import_data(filename1, silent=True)

            # Check that the calculation computer is imported correctly.
            qb = QueryBuilder()
            qb.append(JobCalculation, project=['label'])
            self.assertEqual(qb.count(), 1, "Only one calculation should be found.")
            self.assertEqual(unicode(qb.first()[0]), calc1_label,
                             "The calculation label is not correct.")

            # Check that the referenced computer is imported correctly.
            qb = QueryBuilder()
            qb.append(Computer, project=['name', 'uuid', 'id'])
            self.assertEqual(qb.count(), 1, "Only one computer should be found.")
            self.assertEqual(unicode(qb.first()[0]), comp_name,
                             "The computer name is not correct.")
            self.assertEqual(unicode(qb.first()[1]), comp_uuid,
                             "The computer uuid is not correct.")

            # Store the id of the computer
            comp_id = qb.first()[2]

            # Import the second calculation
            import_data(filename2, silent=True)

            # Check that the number of computers remains the same and its data
            # did not change.
            qb = QueryBuilder()
            qb.append(Computer, project=['name', 'uuid', 'id'])
            self.assertEqual(qb.count(), 1, "Only one computer should be found.")
            self.assertEqual(unicode(qb.first()[0]), comp_name,
                             "The computer name is not correct.")
            self.assertEqual(unicode(qb.first()[1]), comp_uuid,
                             "The computer uuid is not correct.")
            self.assertEqual(qb.first()[2], comp_id,
                             "The computer id is not correct.")

            # Check that now you have two calculations attached to the same
            # computer.
            qb = QueryBuilder()
            qb.append(Computer, tag='comp')
            qb.append(JobCalculation, has_computer='comp', project=['label'])
            self.assertEqual(qb.count(), 2, "Two calculations should be found.")
            ret_labels = set(_ for [_] in qb.all())
            self.assertEqual(ret_labels, set([calc1_label, calc2_label]),
                             "The labels of the calculations are not correct.")
        finally:
            # Deleting the created temporary folders
            shutil.rmtree(export_file_tmp_folder, ignore_errors=True)
            shutil.rmtree(unpack_tmp_folder, ignore_errors=True)
    def test_same_computer_different_name_import(self):
        """
        This test checks that if the computer is re-imported with a different
        name into the same database, the original computer will not be
        renamed. It also checks that the names were correctly imported
        (without any change, since there is no computer name collision).
        """
        import os
        import shutil
        import tempfile

        from aiida.orm.importexport import export
        from aiida.orm.querybuilder import QueryBuilder
        from aiida.orm.computer import Computer
        from aiida.orm.calculation.job import JobCalculation

        # Creating a folder for the import/export files
        export_file_tmp_folder = tempfile.mkdtemp()
        unpack_tmp_folder = tempfile.mkdtemp()
        try:
            # Store a calculation
            calc1_label = "calc1"
            calc1 = JobCalculation()
            calc1.set_computer(self.computer)
            calc1.set_resources({"num_machines": 1, "num_mpiprocs_per_machine": 1})
            calc1.label = calc1_label
            calc1.store()
            calc1._set_state(u'RETRIEVING')

            # Store locally the computer name
            comp1_name = unicode(self.computer.name)

            # Export the first job calculation
            filename1 = os.path.join(export_file_tmp_folder, "export1.tar.gz")
            export([calc1.dbnode], outfile=filename1, silent=True)

            # Rename the computer
            self.computer.set_name(comp1_name + "_updated")

            # Store a second calculation
            calc2_label = "calc2"
            calc2 = JobCalculation()
            calc2.set_computer(self.computer)
            calc2.set_resources({"num_machines": 2, "num_mpiprocs_per_machine": 2})
            calc2.label = calc2_label
            calc2.store()
            calc2._set_state(u'RETRIEVING')

            # Export the second job calculation
            filename2 = os.path.join(export_file_tmp_folder, "export2.tar.gz")
            export([calc2.dbnode], outfile=filename2, silent=True)

            # Clean the local database
            self.clean_db()

            # Check that there are no computers
            qb = QueryBuilder()
            qb.append(Computer, project=['*'])
            self.assertEqual(qb.count(), 0,
                             "There should not be any computers in the database at this point.")

            # Check that there are no calculations
            qb = QueryBuilder()
            qb.append(JobCalculation, project=['*'])
            self.assertEqual(qb.count(), 0,
                             "There should not be any calculations in the database at this point.")

            # Import the first calculation
            import_data(filename1, silent=True)

            # Check that the calculation computer is imported correctly.
            qb = QueryBuilder()
            qb.append(JobCalculation, project=['label'])
            self.assertEqual(qb.count(), 1, "Only one calculation should be found.")
            self.assertEqual(unicode(qb.first()[0]), calc1_label,
                             "The calculation label is not correct.")

            # Check that the referenced computer is imported correctly.
            qb = QueryBuilder()
            qb.append(Computer, project=['name', 'uuid', 'id'])
            self.assertEqual(qb.count(), 1, "Only one computer should be found.")
            self.assertEqual(unicode(qb.first()[0]), comp1_name,
                             "The computer name is not correct.")

            # Import the second calculation
            import_data(filename2, silent=True)

            # Check that the number of computers remains the same and its data
            # did not change.
            qb = QueryBuilder()
            qb.append(Computer, project=['name'])
            self.assertEqual(qb.count(), 1, "Only one computer should be found.")
            self.assertEqual(unicode(qb.first()[0]), comp1_name,
                             "The computer name is not correct.")
        finally:
            # Deleting the created temporary folders
            shutil.rmtree(export_file_tmp_folder, ignore_errors=True)
            shutil.rmtree(unpack_tmp_folder, ignore_errors=True)
    def test_different_computer_same_name_import(self):
        """
        This test checks that if there is a name collision, the imported
        computers are renamed accordingly.
        """
        import os
        import shutil
        import tempfile

        from aiida.orm.importexport import export
        from aiida.orm.querybuilder import QueryBuilder
        from aiida.orm.computer import Computer
        from aiida.orm.calculation.job import JobCalculation
        from aiida.orm.importexport import COMP_DUPL_SUFFIX

        # Creating a folder for the import/export files
        export_file_tmp_folder = tempfile.mkdtemp()
        unpack_tmp_folder = tempfile.mkdtemp()
        try:
            # Set the computer name
            comp1_name = "localhost_1"
            self.computer.set_name(comp1_name)

            # Store a calculation
            calc1_label = "calc1"
            calc1 = JobCalculation()
            calc1.set_computer(self.computer)
            calc1.set_resources({"num_machines": 1, "num_mpiprocs_per_machine": 1})
            calc1.label = calc1_label
            calc1.store()
            calc1._set_state(u'RETRIEVING')

            # Export the first job calculation
            filename1 = os.path.join(export_file_tmp_folder, "export1.tar.gz")
            export([calc1.dbnode], outfile=filename1, silent=True)

            # Reset the database
            self.clean_db()
            self.insert_data()

            # Set the computer name to the same name as before
            self.computer.set_name(comp1_name)

            # Store a second calculation
            calc2_label = "calc2"
            calc2 = JobCalculation()
            calc2.set_computer(self.computer)
            calc2.set_resources({"num_machines": 2, "num_mpiprocs_per_machine": 2})
            calc2.label = calc2_label
            calc2.store()
            calc2._set_state(u'RETRIEVING')

            # Export the second job calculation
            filename2 = os.path.join(export_file_tmp_folder, "export2.tar.gz")
            export([calc2.dbnode], outfile=filename2, silent=True)

            # Reset the database
            self.clean_db()
            self.insert_data()

            # Set the computer name to the same name as before
            self.computer.set_name(comp1_name)

            # Store a third calculation
            calc3_label = "calc3"
            calc3 = JobCalculation()
            calc3.set_computer(self.computer)
            calc3.set_resources({"num_machines": 2, "num_mpiprocs_per_machine": 2})
            calc3.label = calc3_label
            calc3.store()
            calc3._set_state(u'RETRIEVING')

            # Export the third job calculation
            filename3 = os.path.join(export_file_tmp_folder, "export3.tar.gz")
            export([calc3.dbnode], outfile=filename3, silent=True)

            # Clean the local database
            self.clean_db()

            # Check that there are no computers
            qb = QueryBuilder()
            qb.append(Computer, project=['*'])
            self.assertEqual(qb.count(), 0,
                             "There should not be any computers in the database at this point.")

            # Check that there are no calculations
            qb = QueryBuilder()
            qb.append(JobCalculation, project=['*'])
            self.assertEqual(qb.count(), 0,
                             "There should not be any calculations in the database at this point.")

            # Import all the calculations
            import_data(filename1, silent=True)
            import_data(filename2, silent=True)
            import_data(filename3, silent=True)

            # Retrieve the calculation-computer pairs
            qb = QueryBuilder()
            qb.append(JobCalculation, project=['label'], tag='jcalc')
            qb.append(Computer, project=['name'], computer_of='jcalc')
            self.assertEqual(qb.count(), 3, "Three combinations expected.")
            res = qb.all()
            self.assertIn([calc1_label, comp1_name], res,
                          "Calc-Computer combination not found.")
            self.assertIn([calc2_label, comp1_name + COMP_DUPL_SUFFIX.format(0)],
                          res, "Calc-Computer combination not found.")
            self.assertIn([calc3_label, comp1_name + COMP_DUPL_SUFFIX.format(1)],
                          res, "Calc-Computer combination not found.")
        finally:
            # Deleting the created temporary folders
            shutil.rmtree(export_file_tmp_folder, ignore_errors=True)
            shutil.rmtree(unpack_tmp_folder, ignore_errors=True)
    def test_correct_import_of_computer_json_params(self):
        """
        This test checks that the metadata and transport params are exported
        and imported correctly in both backends.
        """
        import os
        import shutil
        import tempfile

        from aiida.orm.importexport import export
        from aiida.orm.querybuilder import QueryBuilder
        from aiida.orm.computer import Computer
        from aiida.orm.calculation.job import JobCalculation

        # Creating a folder for the import/export files
        export_file_tmp_folder = tempfile.mkdtemp()
        unpack_tmp_folder = tempfile.mkdtemp()
        try:
            # Set the computer name
            comp1_name = "localhost_1"
            comp1_metadata = {
                u'workdir': u'/tmp/aiida'
            }
            comp1_transport_params = {
                u'key1': u'value1',
                u'key2': 2
            }
            self.computer.set_name(comp1_name)
            self.computer._set_metadata(comp1_metadata)
            self.computer.set_transport_params(comp1_transport_params)

            # Store a calculation
            calc1_label = "calc1"
            calc1 = JobCalculation()
            calc1.set_computer(self.computer)
            calc1.set_resources({"num_machines": 1, "num_mpiprocs_per_machine": 1})
            calc1.label = calc1_label
            calc1.store()
            calc1._set_state(u'RETRIEVING')

            # Export the first job calculation
            filename1 = os.path.join(export_file_tmp_folder, "export1.tar.gz")
            export([calc1.dbnode], outfile=filename1, silent=True)

            # Clean the local database
            self.clean_db()
            # Import the data
            import_data(filename1, silent=True)

            qb = QueryBuilder()
            qb.append(Computer, project=['transport_params', '_metadata'],
                      tag="comp")
            self.assertEqual(qb.count(), 1, "Expected only one computer")

            res = qb.dict()[0]
            self.assertEqual(res['comp']['transport_params'],
                             comp1_transport_params,
                             "Not the expected transport parameters were found")
            self.assertEqual(res['comp']['_metadata'],
                             comp1_metadata,
                             "Not the expected metadata were found")
        finally:
            # Deleting the created temporary folders
            shutil.rmtree(export_file_tmp_folder, ignore_errors=True)
            shutil.rmtree(unpack_tmp_folder, ignore_errors=True)
    def test_import_of_django_sqla_export_file(self):
        """
        Check that the import correctly handles both the django and the sqla
        export test files (computer metadata and transport parameters included).
        """
        import inspect
        import os

        from aiida.orm.querybuilder import QueryBuilder
        from aiida.orm.computer import Computer

        for filename in ('export_dj_comp_test.aiida', 'export_sqla_comp_test.aiida'):
            curr_path = inspect.getfile(inspect.currentframe())
            folder_path = os.path.dirname(curr_path)
            relative_folder_path = ("export_import_test_files/" + filename)
            test_file_path = os.path.join(folder_path, relative_folder_path)

            # Clean the database
            self.clean_db()

            # Import the needed data
            import_data(test_file_path, silent=True)

            # The expected metadata & transport parameters
            comp1_metadata = {
                u'workdir': u'/tmp/aiida'
            }
            comp1_transport_params = {
                u'key1': u'value1',
                u'key2': 2
            }

            # Check that we got the correct metadata & transport parameters
            qb = QueryBuilder()
            qb.append(Computer, project=['transport_params', '_metadata'],
                      tag="comp")
            self.assertEqual(qb.count(), 1, "Expected only one computer")

            res = qb.dict()[0]
            self.assertEqual(res['comp']['transport_params'],
                             comp1_transport_params,
                             "Not the expected transport parameters were found")
            self.assertEqual(res['comp']['_metadata'],
                             comp1_metadata,
                             "Not the expected metadata were found")