Source code for aiida.cmdline.commands.data

# -*- coding: utf-8 -*-
import sys

from aiida.cmdline.baseclass import (
    VerdiCommandRouter, VerdiCommandWithSubcommands)
from aiida import load_dbenv
from aiida.common.exceptions import MultipleObjectsError
from aiida.cmdline.commands.node import _Label, _Description
from aiida.orm import load_node

__copyright__ = u"Copyright (c), 2015, ECOLE POLYTECHNIQUE FEDERALE DE LAUSANNE (Theory and Simulation of Materials (THEOS) and National Centre for Computational Design and Discovery of Novel Materials (NCCR MARVEL)), Switzerland and ROBERT BOSCH LLC, USA. All rights reserved."
__license__ = "MIT license, see LICENSE.txt file"
__version__ = "0.5.0"
__contributors__ = "Andrea Cepellotti, Andrius Merkys, Giovanni Pizzi, Leonid Kahle, Marco Gibertini, Martin Uhrin, Nicolas Mounet"


class Data(VerdiCommandRouter):
    """
    Setup and manage data specific types

    There is a list of subcommands for managing specific types of data.
    For instance, 'data upf' manages pseudopotentials in the UPF format.
    """
    def __init__(self):
        """
        A dictionary with valid commands and functions to be called.
        """
        ## Add here the classes to be supported.
        self.routed_subcommands = {
            'upf': _Upf,
            'structure': _Structure,
            'bands': _Bands,
            'cif': _Cif,
            'trajectory': _Trajectory,
            'parameter': _Parameter,
            'array': _Array,
            'label': _Label,
            'description': _Description,
        }
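
# Illustrative sketch (not part of the original module): how an additional
# data type could be routed through 'verdi data'. The '_MyType' class and the
# 'mytype' key are hypothetical; a real subcommand class would follow the same
# pattern as _Parameter or _Array below.
#
#     class _MyType(VerdiCommandWithSubcommands, Visualizable):
#         def __init__(self):
#             from aiida.orm.data.parameter import ParameterData
#             self.dataclass = ParameterData  # stand-in data class
#             self._default_show_format = 'json_date'
#             self.valid_subcommands = {
#                 'show': (self.show, self.complete_none),
#             }
#
#     # ... and in Data.__init__: self.routed_subcommands['mytype'] = _MyType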
class Listable(object):
    """
    Provides shell completion for listable data nodes.

    .. note:: classes inheriting Listable MUST define the ``dataclass``
        attribute (preferably in ``__init__``), pointing to the correct
        \*Data class.
    """
    def list(self, *args):
        """
        List all instances of given data class.

        :param args: a list of command line arguments.
        """
        import argparse

        parser = argparse.ArgumentParser(
            prog=self.get_full_command_name(),
            description='List {} objects.'.format(self.dataclass.__name__))

        self.append_list_cmdline_arguments(parser)

        parser.add_argument('--vseparator', default="\t",
                            help="specify vertical separator for fields. "
                                 "Default '\\t'.",
                            type=str, action='store')
        parser.add_argument('--header', default=True,
                            help="print a header with column names. "
                                 "Default option.",
                            dest="header", action='store_true')
        parser.add_argument('--no-header', '-H',
                            help="do not print a header with column names.",
                            dest="header", action='store_false')

        args = list(args)
        parsed_args = parser.parse_args(args)

        entry_list = self.query(parsed_args)

        vsep = parsed_args.vseparator
        if entry_list:
            to_print = ""
            if parsed_args.header:
                to_print += vsep.join(self.get_column_names()) + "\n"
            for entry in sorted(entry_list, key=lambda x: int(x[0])):
                to_print += vsep.join(entry) + "\n"
            sys.stdout.write(to_print)
    def query(self, args):
        """
        Perform the query and return information for the list.

        :param args: a namespace with parsed command line parameters.
        :return: table (list of lists) with information, describing nodes.
            Each row describes a single hit.
        """
        load_dbenv()
        from aiida.orm import DataFactory
        from django.db.models import Q
        from aiida.djsite.utils import get_automatic_user

        q_object = None
        if args.all_users is False:
            q_object = Q(user=get_automatic_user())
        else:
            q_object = Q()

        self.query_past_days(q_object, args)
        self.query_group(q_object, args)

        object_list = self.dataclass.query(q_object).distinct().order_by('ctime')

        entry_list = []
        for obj in object_list:
            entry_list.append([str(obj.pk)])
        return entry_list
    def query_past_days(self, q_object, args):
        """
        Subselect to filter data nodes by their age.

        :param q_object: a query object
        :param args: a namespace with parsed command line parameters.
        """
        from django.utils import timezone
        from django.db.models import Q
        import datetime

        if args.past_days is not None:
            now = timezone.now()
            n_days_ago = now - datetime.timedelta(days=args.past_days)
            q_object.add(Q(ctime__gte=n_days_ago), Q.AND)
    def query_group(self, q_object, args):
        """
        Subselect to filter data nodes by their group.

        :param q_object: a query object
        :param args: a namespace with parsed command line parameters.
        """
        from django.db.models import Q

        if args.group_name is not None:
            q_object.add(Q(dbgroups__name__in=args.group_name), Q.AND)
        if args.group_pk is not None:
            q_object.add(Q(dbgroups__pk__in=args.group_pk), Q.AND)
    def append_list_cmdline_arguments(self, parser):
        """
        Append additional command line parameters that are later parsed and
        used in the query construction.

        :param parser: instance of argparse.ArgumentParser
        """
        parser.add_argument('-p', '--past-days', metavar='N',
                            help="add a filter to show only objects created "
                                 "in the past N days",
                            type=int, action='store')
        parser.add_argument('-g', '--group-name', metavar='N', nargs="+",
                            default=None,
                            help="add a filter to show only objects belonging "
                                 "to groups with the given names",
                            type=str, action='store')
        parser.add_argument('-G', '--group-pk', metavar='N', nargs="+",
                            default=None,
                            help="add a filter to show only objects belonging "
                                 "to groups with the given PKs",
                            type=int, action='store')
        parser.add_argument('-A', '--all-users', action='store_true',
                            default=False,
                            help="show objects for all users, rather than "
                                 "only for the current user")
    def get_column_names(self):
        """
        Return the list with column names.

        .. note:: neither the number nor correspondence of column names and
            actual columns in the output from :py:meth:`query` are checked.
        """
        return ["ID"]
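
# Illustrative sketch (an assumption, not part of the original module) of a
# minimal Listable subclass: it sets ``dataclass``, reuses the base query()
# and appends the node label as an extra column. '_LabelledList' is a
# hypothetical name; get_column_names() must be kept in sync by hand, since
# the correspondence with the query() output is not checked.
#
#     class _LabelledList(VerdiCommandWithSubcommands, Listable):
#         def __init__(self):
#             from aiida.orm.data.parameter import ParameterData
#             self.dataclass = ParameterData
#             self.valid_subcommands = {
#                 'list': (self.list, self.complete_none),
#             }
#
#         def query(self, args):
#             entry_list = Listable.query(self, args)
#             for entry in entry_list:
#                 entry.append(load_node(int(entry[0])).label)
#             return entry_list
#
#         def get_column_names(self):
#             return ["ID", "label"]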
class Visualizable(object):
    """
    Provides shell completion for visualizable data nodes.

    .. note:: classes inheriting Visualizable MUST NOT contain attributes
        starting with ``_show_`` that are not plugins for visualization.

    In order to specify a default visualization format, one has to override
    the ``_default_show_format`` attribute (preferably in ``__init__``),
    setting it to the name of the default visualization tool.
    """
    show_prefix = '_show_'
    show_parameters_postfix = '_parameters'
    def get_show_plugins(self):
        """
        Get the list of all implemented plugins for visualizing the structure.
        """
        method_names = dir(self)  # get list of class methods names
        valid_formats = [i[len(self.show_prefix):] for i in method_names
                         if i.startswith(self.show_prefix) and
                         not i.endswith(self.show_parameters_postfix)]  # filter

        return {k: getattr(self, self.show_prefix + k) for k in valid_formats}
    def show(self, *args):
        """
        Show the data node with a visualization program.
        """
        # DEVELOPER NOTE: to add a new plugin, just add a _show_xxx() method.
        import argparse
        import os

        parser = argparse.ArgumentParser(
            prog=self.get_full_command_name(),
            description='Visualize data object.')
        parser.add_argument('data_id', type=int, default=None, nargs="+",
                            help="ID of the data object to be visualized.")

        default_format = None
        try:
            default_format = self._default_show_format
        except AttributeError:
            if len(self.get_show_plugins().keys()) == 1:
                default_format = self.get_show_plugins().keys()[0]
            else:
                default_format = None

        parser.add_argument('--format', '-f', type=str, default=default_format,
                            help="Type of the visualization format/tool.",
                            choices=self.get_show_plugins().keys())

        # Augmenting the command line parameters with ones that are used by
        # individual plugins
        for cmd in dir(self):
            if not cmd.startswith(self.show_prefix) or \
                    not cmd.endswith(self.show_parameters_postfix):
                continue
            getattr(self, cmd)(parser)

        args = list(args)
        parsed_args = vars(parser.parse_args(args))

        data_id = parsed_args.pop('data_id')
        format = parsed_args.pop('format')

        # Removing the keys whose values are None
        for key in parsed_args.keys():
            if parsed_args[key] is None:
                parsed_args.pop(key)

        if format is None:
            print "Default format is not defined, please specify.\n" + \
                  "Valid formats are:"
            for i in self.get_show_plugins().keys():
                print "  {}".format(i)
            sys.exit(0)

        # I can give in input the whole path to the executable
        code_name = os.path.split(format)[-1]

        try:
            func = self.get_show_plugins()[code_name]
        except KeyError:
            print "Not implemented; implemented plugins are:"
            print "{}.".format(",".join(self.get_show_plugins()))
            sys.exit(1)

        load_dbenv()

        n_list = [load_node(id) for id in data_id]
        for n in n_list:
            try:
                if not isinstance(n, self.dataclass):
                    print("Node {} is of class {} instead "
                          "of {}".format(n, type(n), self.dataclass))
                    sys.exit(1)
            except AttributeError:
                pass

        try:
            func(format, n_list, **parsed_args)
        except MultipleObjectsError:
            print("Visualization of multiple objects is not implemented "
                  "for '{}'".format(format))
            sys.exit(1)
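
# Illustrative sketch (an assumption) of a visualization plugin, following
# the DEVELOPER NOTE in show() above: a _show_<format> method receives the
# executable name and the node list, and an optional
# _show_<format>_parameters method registers extra command line flags.
# 'myviewer' and '--my-option' are hypothetical.
#
#     def _show_myviewer(self, exec_name, node_list, **kwargs):
#         import subprocess
#         # hand each node's PK to the external viewer
#         subprocess.check_output([exec_name] + [str(n.pk) for n in node_list])
#
#     def _show_myviewer_parameters(self, parser):
#         parser.add_argument('--my-option', type=str, default=None,
#                             help="hypothetical plugin-specific option")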
class Exportable(object):
    """
    Provides shell completion for exportable data nodes.

    .. note:: classes inheriting Exportable MUST NOT contain attributes
        starting with ``_export_`` that are not plugins for exporting.
    """
    export_prefix = '_export_'
    export_parameters_postfix = '_parameters'
    def get_export_plugins(self):
        """
        Get the list of all implemented exporters for data class.
        """
        method_names = dir(self)  # get list of class methods names
        valid_formats = [i[len(self.export_prefix):] for i in method_names
                         if i.startswith(self.export_prefix) and
                         not i.endswith(self.export_parameters_postfix)]  # filter

        return {k: getattr(self, self.export_prefix + k)
                for k in valid_formats}
    def export(self, *args):
        """
        Export the data node to a given format.
        """
        # DEVELOPER NOTE: to add a new plugin, just add a _export_xxx() method.
        import argparse

        parser = argparse.ArgumentParser(
            prog=self.get_full_command_name(),
            description='Export data object.')
        parser.add_argument('data_id', type=int, default=None,
                            help="ID of the data object to be exported.")

        default_format = None
        try:
            default_format = self._default_export_format
        except AttributeError:
            if len(self.get_export_plugins().keys()) == 1:
                default_format = self.get_export_plugins().keys()[0]
            else:
                default_format = None

        parser.add_argument('--format', '-f', type=str, default=default_format,
                            help="Type of the exported file.",
                            choices=self.get_export_plugins().keys())

        # Augmenting the command line parameters with ones that are used by
        # individual plugins
        for cmd in dir(self):
            if not cmd.startswith(self.export_prefix) or \
                    not cmd.endswith(self.export_parameters_postfix):
                continue
            getattr(self, cmd)(parser)

        args = list(args)
        parsed_args = vars(parser.parse_args(args))

        format = parsed_args.pop('format')
        data_id = parsed_args.pop('data_id')

        # Removing the keys whose values are None
        for key in parsed_args.keys():
            if parsed_args[key] is None:
                parsed_args.pop(key)

        if format is None:
            print "Default format is not defined, please specify.\n" + \
                  "Valid formats are:"
            for i in self.get_export_plugins().keys():
                print "  {}".format(i)
            sys.exit(0)

        try:
            func = self.get_export_plugins()[format]
        except KeyError:
            print "Not implemented; implemented plugins are:"
            print "{}.".format(",".join(self.get_export_plugins()))
            sys.exit(1)

        load_dbenv()

        n = load_node(data_id)
        try:
            if not isinstance(n, self.dataclass):
                print("Node {} is of class {} instead "
                      "of {}".format(n, type(n), self.dataclass))
                sys.exit(1)
        except AttributeError:
            pass

        func(n, **parsed_args)
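
# Illustrative sketch (an assumption) of an export plugin: a _export_<format>
# method receives the loaded node and prints the exported text. 'myformat' is
# hypothetical and would require a matching exporter in the node's
# _exportstring() implementation.
#
#     def _export_myformat(self, node, **kwargs):
#         print node._exportstring('myformat')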
class Importable(object):
    """
    Provides shell completion for importable data nodes.

    .. note:: classes inheriting Importable MUST NOT contain attributes
        starting with ``_import_`` that are not plugins for importing.
    """
    import_prefix = '_import_'
    import_parameters_postfix = '_parameters'
    def get_import_plugins(self):
        """
        Get the list of all implemented importers for data class.
        """
        method_names = dir(self)  # get list of class methods names
        valid_formats = [i[len(self.import_prefix):] for i in method_names
                         if i.startswith(self.import_prefix) and
                         not i.endswith(self.import_parameters_postfix)]  # filter

        return {k: getattr(self, self.import_prefix + k)
                for k in valid_formats}
    def importfile(self, *args):
        """
        Import a data node from a file.
        """
        import argparse

        parser = argparse.ArgumentParser(
            prog=self.get_full_command_name(),
            description='Import data object.')
        parser.add_argument('--file', type=str, default=None,
                            help="Path of the imported file. Reads from "
                                 "standard input if not specified.")

        default_format = None
        try:
            default_format = self._default_import_format
        except AttributeError:
            if len(self.get_import_plugins().keys()) == 1:
                default_format = self.get_import_plugins().keys()[0]
            else:
                default_format = None

        parser.add_argument('--format', '-f', type=str, default=default_format,
                            help="Type of the imported file.",
                            choices=self.get_import_plugins().keys())

        # Augmenting the command line parameters with ones that are used by
        # individual plugins
        for cmd in dir(self):
            if not cmd.startswith(self.import_prefix) or \
                    not cmd.endswith(self.import_parameters_postfix):
                continue
            getattr(self, cmd)(parser)

        args = list(args)
        parsed_args = vars(parser.parse_args(args))

        format = parsed_args.pop('format')
        filename = parsed_args.pop('file')

        if format is None:
            print "Default format is not defined, please specify.\n" + \
                  "Valid formats are:"
            for i in self.get_import_plugins().keys():
                print "  {}".format(i)
            sys.exit(0)

        if not filename:
            filename = "/dev/stdin"

        try:
            func = self.get_import_plugins()[format]
        except KeyError:
            print "Not implemented; implemented plugins are:"
            print "{}.".format(",".join(self.get_import_plugins()))
            sys.exit(1)

        load_dbenv()
        func(filename, **parsed_args)
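
# Illustrative sketch (an assumption) of an import plugin: a _import_<format>
# method receives the file name and any plugin-specific keyword arguments.
# 'myformat' and _parse_myformat() are hypothetical; compare the concrete
# _import_upf() and _import_xyz() implementations below.
#
#     def _import_myformat(self, filename, **kwargs):
#         with open(filename) as f:
#             content = f.read()
#         node = self.dataclass()
#         node._parse_myformat(content)  # hypothetical parser on the class
#         node.store()
#         print node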
class Depositable(object):
    """
    Provides shell completion for depositable data nodes.

    .. note:: classes inheriting Depositable MUST NOT contain attributes
        starting with ``_deposit_`` that are not plugins for depositing.
    """
    deposit_prefix = '_deposit_'
    deposit_parameters_postfix = '_parameters'
    def get_deposit_plugins(self):
        """
        Get the list of all implemented deposition methods for data class.
        """
        method_names = dir(self)  # get list of class methods names
        valid_formats = [i[len(self.deposit_prefix):] for i in method_names
                         if i.startswith(self.deposit_prefix) and
                         not i.endswith(self.deposit_parameters_postfix)]  # filter

        return {k: getattr(self, self.deposit_prefix + k)
                for k in valid_formats}
    def deposit(self, *args):
        """
        Deposit the data node to a given database.

        :param args: a namespace with parsed command line parameters.
        """
        # DEVELOPER NOTE: to add a new plugin, just add a _deposit_xxx() method.
        import argparse

        parser = argparse.ArgumentParser(
            prog=self.get_full_command_name(),
            description='Deposit data object.')
        parser.add_argument('data_id', type=int, default=None,
                            help="ID of the data object to be deposited.")

        default_database = None
        try:
            default_database = self._default_deposition_database
        except AttributeError:
            if len(self.get_deposit_plugins().keys()) == 1:
                default_database = self.get_deposit_plugins().keys()[0]
            else:
                default_database = None

        parser.add_argument('--database', '-d', type=str,
                            default=default_database,
                            help="Label of the database for deposition.",
                            choices=self.get_deposit_plugins().keys())

        # Augmenting the command line parameters with ones that are used by
        # individual plugins
        for cmd in dir(self):
            if not cmd.startswith(self.deposit_prefix) or \
                    not cmd.endswith(self.deposit_parameters_postfix):
                continue
            getattr(self, cmd)(parser)

        args = list(args)
        parsed_args = vars(parser.parse_args(args))

        database = parsed_args.pop('database')
        data_id = parsed_args.pop('data_id')

        # Removing the keys whose values are None
        for key in parsed_args.keys():
            if parsed_args[key] is None:
                parsed_args.pop(key)

        if database is None:
            print "Default database is not defined, please specify.\n" + \
                  "Valid databases are:"
            for i in self.get_deposit_plugins().keys():
                print "  {}".format(i)
            sys.exit(0)

        try:
            func = self.get_deposit_plugins()[database]
        except KeyError:
            print "Not implemented; implemented plugins are:"
            print "{}.".format(",".join(self.get_deposit_plugins()))
            sys.exit(1)

        load_dbenv()

        n = load_node(data_id)
        try:
            if not isinstance(n, self.dataclass):
                print("Node {} is of class {} instead "
                      "of {}".format(n, type(n), self.dataclass))
                sys.exit(1)
        except AttributeError:
            pass

        calc = func(n, **parsed_args)
        print calc
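
# Illustrative sketch (an assumption) of a deposition plugin: a
# _deposit_<database> method receives the loaded node and returns the
# calculation that performs the deposition. 'mydb' and its deposit() helper
# are hypothetical; see _deposit_tcod() below for the real example.
#
#     def _deposit_mydb(self, node, **kwargs):
#         from mydb.dbexporter import deposit  # hypothetical module
#         return deposit(node, **kwargs)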
# Note: this class should not be exposed directly in the main module,
# otherwise it becomes a command of 'verdi'. Instead, we want it to be a
# subcommand of verdi data.

class _Upf(VerdiCommandWithSubcommands, Importable):
    """
    Setup and manage UPF pseudopotentials to be used.

    This command allows one to list and configure UPF pseudopotentials.
    """

    def __init__(self):
        """
        A dictionary with valid commands and functions to be called.
        """
        from aiida.orm.data.upf import UpfData

        self.dataclass = UpfData
        self.valid_subcommands = {
            'uploadfamily': (self.uploadfamily, self.complete_auto),
            'listfamilies': (self.listfamilies, self.complete_none),
            'import': (self.importfile, self.complete_none),
            'exportfamily': (self.exportfamily, self.complete_auto)
        }

    def uploadfamily(self, *args):
        """
        Upload a new pseudopotential family.

        Returns the numbers of files found and the number of nodes uploaded.

        Call without parameters to get some help.
        """
        import os.path

        if not len(args) == 3 and not len(args) == 4:
            print >> sys.stderr, ("After 'upf uploadfamily' there should be "
                                  "three arguments:")
            print >> sys.stderr, ("folder, group_name, group_description "
                                  "[OPTIONAL: --stop-if-existing]\n")
            sys.exit(1)

        folder = os.path.abspath(args[0])
        group_name = args[1]
        group_description = args[2]
        stop_if_existing = False

        if len(args) == 4:
            if args[3] == "--stop-if-existing":
                stop_if_existing = True
            else:
                print >> sys.stderr, 'Unknown directive: ' + args[3]
                sys.exit(1)

        if not os.path.isdir(folder):
            print >> sys.stderr, 'Cannot find directory: ' + folder
            sys.exit(1)

        load_dbenv()
        import aiida.orm.data.upf as upf

        files_found, files_uploaded = upf.upload_upf_family(
            folder, group_name, group_description, stop_if_existing)

        print "UPF files found: {}. New files uploaded: {}".format(
            files_found, files_uploaded)

    def listfamilies(self, *args):
        """
        Print on screen the list of UPF families installed.
        """
        # note that the following command requires that the upfdata has a
        # key called element. As such, it is not well separated.
        import argparse
        from aiida.orm.data.upf import UPFGROUP_TYPE

        parser = argparse.ArgumentParser(
            prog=self.get_full_command_name(),
            description='List AiiDA upf families.')
        parser.add_argument('-e', '--element', nargs='+', type=str,
                            default=None,
                            help="Filter the families only to those containing "
                                 "a pseudo for each of the specified elements")
        parser.add_argument('-d', '--with-description',
                            dest='with_description', action='store_true',
                            help="Show also the description for the UPF family")
        parser.set_defaults(with_description=False)

        args = list(args)
        parsed_args = parser.parse_args(args)

        load_dbenv()
        from aiida.orm import DataFactory

        UpfData = DataFactory('upf')
        groups = UpfData.get_upf_groups(filter_elements=parsed_args.element)

        if groups:
            for g in groups:
                pseudos = UpfData.query(dbgroups=g.dbgroup).distinct()
                num_pseudos = pseudos.count()

                pseudos_list = pseudos.filter(
                    dbattributes__key="element").values_list(
                    'dbattributes__tval', flat=True)

                if parsed_args.with_description:
                    description_string = ": {}".format(g.description)
                else:
                    description_string = ""

                if num_pseudos != len(set(pseudos_list)):
                    print ("x {} [INVALID: {} pseudos, for {} elements]{}"
                           .format(g.name, num_pseudos,
                                   len(set(pseudos_list)),
                                   description_string))
                    print ("  Maybe the pseudopotential family wasn't "
                           "set up with the uploadfamily function?")
                else:
                    print "* {} [{} pseudos]{}".format(g.name, num_pseudos,
                                                       description_string)
        else:
            print "No valid UPF pseudopotential family found."

    def exportfamily(self, *args):
        """
        Export a pseudopotential family into a folder.

        Call without parameters to get some help.
        """
        import os
        from aiida.common.exceptions import NotExistent
        from aiida.orm import DataFactory

        load_dbenv()

        if not len(args) == 2:
            print >> sys.stderr, ("After 'upf export' there should be two "
                                  "arguments:")
            print >> sys.stderr, ("folder, upf_family_name\n")
            sys.exit(1)

        folder = os.path.abspath(args[0])
        group_name = args[1]

        UpfData = DataFactory('upf')
        try:
            group = UpfData.get_upf_group(group_name)
        except NotExistent:
            print >> sys.stderr, "upf family {} not found".format(group_name)
            sys.exit(1)

        for u in group.nodes:
            dest_path = os.path.join(folder, u.filename)
            if not os.path.isfile(dest_path):
                with open(dest_path, 'w') as dest:
                    with u._get_folder_pathsubfolder.open(u.filename) as source:
                        dest.write(source.read())
            else:
                print >> sys.stdout, ("File {} is already present in the "
                                      "destination folder".format(u.filename))

    def _import_upf(self, filename, **kwargs):
        """
        Importer from UPF.
        """
        try:
            node, _ = self.dataclass.get_or_create(filename)
            print node
        except ValueError as e:
            print e


class _Bands(VerdiCommandWithSubcommands, Listable, Visualizable, Exportable):
    """
    Manipulation on the bands
    """

    def __init__(self):
        """
        A dictionary with valid commands and functions to be called.
        """
        from aiida.orm.data.array.bands import BandsData

        self.dataclass = BandsData
        self.valid_subcommands = {
            'show': (self.show, self.complete_none),
            'list': (self.list, self.complete_none),
            'export': (self.export, self.complete_none),
        }

    def query(self, args):
        """
        Perform the query and return information for the list.

        :param args: a namespace with parsed command line parameters.
        :return: table (list of lists) with information, describing nodes.
            Each row describes a single hit.
        """
        load_dbenv()
        from collections import defaultdict
        from aiida.orm import DataFactory
        from django.db.models import Q
        from aiida.djsite.utils import get_automatic_user
        from aiida.common.utils import grouper
        from aiida.orm.data.structure import (get_formula, get_symbols_string,
                                              has_vacancies)
        from aiida.djsite.db import models
        from aiida.orm import Node
        from aiida.djsite.db.models import DbPath

        query_group_size = 100  # we group the attribute query in chunks of this size

        StructureData = DataFactory('structure')
        BandsData = DataFactory('array.bands')

        # First, I run a query to get all BandsData of the past N days
        q_object = None
        if args.all_users is False:
            q_object = Q(user=get_automatic_user())
        else:
            q_object = Q()

        self.query_past_days(q_object, args)
        self.query_group(q_object, args)

        bands_list = BandsData.query(q_object).distinct().order_by('ctime')
        bands_list_data = bands_list.values_list('pk', 'label', 'ctime')

        # split data in chunks
        grouped_bands_list_data = grouper(query_group_size,
                                          [(_[0], _[1], _[2])
                                           for _ in bands_list_data])

        entry_list = []
        for this_chunk in grouped_bands_list_data:
            # gather all banddata pks
            pks = [_[0] for _ in this_chunk]

            # get all the StructureData that are parents of the selected bandsdatas
            q_object = Q(child__in=pks)
            if args.all_users is False:
                q_object.add(Q(child__user=get_automatic_user()), Q.AND)
            q_object.add(Q(parent__type='data.structure.StructureData.'), Q.AND)
            structure_list = DbPath.objects.filter(q_object).distinct()
            structure_list_data = structure_list.values_list('parent_id',
                                                             'child_id',
                                                             'depth')

            # select the pks of all structures involved;
            # the right structure is chosen as the closest structure in the graph
            struc_pks = []
            for band_pk in pks:
                try:
                    struc_pks.append(min([_ for _ in structure_list_data
                                          if _[1] == band_pk],
                                         key=lambda x: x[-1])[0])
                except ValueError:  # no structure in input
                    struc_pks.append(None)

            # query for the attributes needed for the structure formula
            attr_query = Q(key__startswith='kinds') | Q(key__startswith='sites')
            attrs = models.DbAttribute.objects.filter(
                attr_query, dbnode__in=struc_pks).values_list(
                'dbnode__pk', 'key', 'datatype', 'tval', 'fval',
                'ival', 'bval', 'dval')

            results = defaultdict(dict)
            for attr in attrs:
                results[attr[0]][attr[1]] = {"datatype": attr[2],
                                             "tval": attr[3],
                                             "fval": attr[4],
                                             "ival": attr[5],
                                             "bval": attr[6],
                                             "dval": attr[7]}

            # organize all of it in a dictionary
            deser_data = {}
            for k in results:
                deser_data[k] = models.deserialize_attributes(
                    results[k], sep=models.DbAttribute._sep)

            # prepare the printout
            for ((band_pk, label, date), struc_pk) in zip(this_chunk, struc_pks):
                if struc_pk is not None:
                    # Exclude structures by the elements
                    if args.element is not None:
                        all_kinds = [k['symbols'] for k
                                     in deser_data[struc_pk]['kinds']]
                        all_symbols = [item for sublist in all_kinds
                                       for item in sublist]
                        if not any([s in args.element for s in all_symbols]):
                            continue
                    if args.element_only is not None:
                        all_kinds = [k['symbols'] for k
                                     in deser_data[struc_pk]['kinds']]
                        all_symbols = [item for sublist in all_kinds
                                       for item in sublist]
                        if not all([s in all_symbols
                                    for s in args.element_only]):
                            continue

                    # build the formula
                    symbol_dict = {k['name']: get_symbols_string(k['symbols'],
                                                                 k['weights'])
                                   for k in deser_data[struc_pk]['kinds']}
                    try:
                        symbol_list = [symbol_dict[s['kind_name']]
                                       for s in deser_data[struc_pk]['sites']]
                        formula = get_formula(symbol_list,
                                              mode=args.formulamode)
                    # If for some reason there is no kind with the name
                    # referenced by the site
                    except KeyError:
                        formula = "<<UNKNOWN>>"
                        # cycle if we imposed the filter on elements
                        if args.element is not None or \
                                args.element_only is not None:
                            continue
                else:
                    formula = "<<UNKNOWN>>"
                    # cycle if we imposed the filter on elements
                    if args.element is not None or \
                            args.element_only is not None:
                        continue

                entry_list.append([str(band_pk), str(formula),
                                   date.strftime('%d %b %Y'), label])

        return entry_list

    def append_list_cmdline_arguments(self, parser):
        """
        Append additional command line parameters that are later parsed and
        used in the query construction.

        :param parser: instance of argparse.ArgumentParser
        """
        parser.add_argument('-e', '--element', nargs='+', type=str,
                            default=None,
                            help="Print all bandsdatas from structures "
                                 "containing desired elements")
        parser.add_argument('-eo', '--element-only', nargs='+', type=str,
                            default=None,
                            help="Print all bandsdatas from structures "
                                 "containing only the selected elements")
        parser.add_argument('-f', '--formulamode', metavar='FORMULA_MODE',
                            type=str, default='hill',
                            help="Formula printing mode (hill, hill_compact, "
                                 "reduce, group, count, or count_compact) "
                                 "(if None, does not print the formula)",
                            action='store')
        parser.add_argument('-p', '--past-days', metavar='N',
                            help="Add a filter to show only bandsdatas "
                                 "created in the past N days",
                            type=int, action='store')
        parser.add_argument('-g', '--group-name', metavar='N', nargs="+",
                            default=None,
                            help="add a filter to show only objects belonging "
                                 "to groups with the given names",
                            type=str, action='store')
        parser.add_argument('-G', '--group-pk', metavar='N', nargs="+",
                            default=None,
                            help="add a filter to show only objects belonging "
                                 "to groups with the given PKs",
                            type=int, action='store')
        parser.add_argument('-A', '--all-users', action='store_true',
                            default=False,
                            help="show objects for all users, rather than "
                                 "only for the current user")

    def get_column_names(self):
        """
        Return the list with column names.

        :note: neither the number nor correspondence of column names and
            actual columns in the output from query() are checked.
        """
        return ["ID", "formula", "ctime", "label"]

    def _export_xmgrace(self, node):
        """
        Export a .agr file, to be visualized with the XMGrace plotting
        software.
        """
        agrtext = node._exportstring('agr')
        print agrtext

    def _export_dat_multicolumn(self, node):
        """
        Export a .dat file with one line per kpoint, with multiple energy
        values on the same line separated by spaces.
        """
        agrtext = node._exportstring('dat_1')
        print agrtext

    def _export_dat_blocks(self, node):
        """
        Export a .dat file with one line per datapoint (kpt, energy), with
        multiple bands separated in stanzas (i.e. having at least an empty
        newline in between).
        """
        agrtext = node._exportstring('dat_2')
        print agrtext

    def _show_xmgrace(self, exec_name, list_bands):
        """
        Plugin to show the bands with the XMGrace plotting software.
        """
        import tempfile
        import subprocess
        import numpy
        from aiida.orm.data.array.bands import max_num_agr_colors

        list_files = []
        current_band_number = 0
        for iband, bands in enumerate(list_bands):
            # extract number of bands
            nbnds = bands.get_bands().shape[1]
            text = bands._exportstring('agr',
                                       setnumber_offset=current_band_number,
                                       color_number=numpy.mod(
                                           iband + 1, max_num_agr_colors))
            # write a tempfile
            f = tempfile.NamedTemporaryFile(suffix='.agr')
            f.write(text)
            f.flush()
            list_files.append(f)
            # update the number of bands already plotted
            current_band_number += nbnds

        try:
            subprocess.check_output([exec_name] + [f.name for f in list_files])
            _ = [f.close() for f in list_files]
        except subprocess.CalledProcessError:
            # The program died: just print a message
            print "Note: the call to {} ended with an error.".format(exec_name)
            _ = [f.close() for f in list_files]
        except OSError as e:
            _ = [f.close() for f in list_files]
            if e.errno == 2:
                print ("No executable '{}' found. Add to the path, "
                       "or try with an absolute path.".format(exec_name))
                sys.exit(1)
            else:
                raise


class _Structure(VerdiCommandWithSubcommands, Listable, Visualizable,
                 Exportable, Importable, Depositable):
    """
    Visualize AiiDA structures
    """

    def __init__(self):
        """
        A dictionary with valid commands and functions to be called.
        """
""" from aiida.orm.data.structure import StructureData self.dataclass = StructureData self.valid_subcommands = { 'show': (self.show, self.complete_none), 'list': (self.list, self.complete_none), 'export': (self.export, self.complete_none), 'deposit': (self.deposit, self.complete_none), 'import': (self.importfile, self.complete_none), } def query(self, args): """ Perform the query """ load_dbenv() from collections import defaultdict from aiida.orm import DataFactory from django.db.models import Q from aiida.djsite.utils import get_automatic_user from aiida.common.utils import grouper from aiida.orm.data.structure import (get_formula, get_symbols_string, has_vacancies) from aiida.djsite.db import models query_group_size = 100 # we group the attribute query in chunks of this size StructureData = DataFactory('structure') q_object = None if args.all_users is False: q_object = Q(user=get_automatic_user()) else: q_object = Q() self.query_past_days(q_object, args) self.query_group(q_object, args) if args.element is not None: q1 = models.DbAttribute.objects.filter(key__startswith='kinds.', key__contains='.symbols.', tval=args.element[0]) struc_list = StructureData.query(q_object, dbattributes__in=q1).distinct().order_by('ctime') if args.elementonly: print "Not implemented elementonly search" sys.exit(1) else: struc_list = StructureData.query(q_object).distinct().order_by('ctime') struc_list_data = struc_list.values_list('pk', 'label') # Used later for efficiency reasons struc_list_data_dict = dict(struc_list_data) entry_list = [] if struc_list: struc_list_pks_grouped = grouper(query_group_size, [_[0] for _ in struc_list_data]) for struc_list_pks_part in struc_list_pks_grouped: to_print = [] # get attributes needed for formula from another query attr_query = Q(key__startswith='kinds') | Q(key__startswith='sites') # key__endswith='kind_name') attrs = models.DbAttribute.objects.filter(attr_query, dbnode__in=struc_list_pks_part).values_list( 'dbnode__pk', 'key', 'datatype', 'tval', 'fval', 'ival', 'bval', 'dval') results = defaultdict(dict) for attr in attrs: results[attr[0]][attr[1]] = { "datatype": attr[2], "tval": attr[3], "fval": attr[4], "ival": attr[5], "bval": attr[6], "dval": attr[7]} deser_data = {} for k in results: deser_data[k] = models.deserialize_attributes(results[k], sep=models.DbAttribute._sep) for s_pk in struc_list_pks_part: symbol_dict = {} for k in deser_data[s_pk]['kinds']: symbols = k['symbols'] weights = k['weights'] symbol_dict[k['name']] = get_symbols_string(symbols, weights) try: symbol_list = [] for s in deser_data[s_pk]['sites']: symbol_list.append(symbol_dict[s['kind_name']]) formula = get_formula(symbol_list, mode=args.formulamode) # If for some reason there is no kind with the name # referenced by the site except KeyError: formula = "<<UNKNOWN>>" entry_list.append([str(s_pk), str(formula), struc_list_data_dict[s_pk]]) return entry_list def append_list_cmdline_arguments(self, parser): parser.add_argument('-e', '--element', nargs='+', type=str, default=None, help="Print all structures containing desired elements") parser.add_argument('-eo', '--elementonly', action='store_true', help="If set, structures do not contain different " "elements (to be used with -e option)") parser.add_argument('-f', '--formulamode', metavar='FORMULA_MODE', type=str, default='hill', help="Formula printing mode (hill, hill_compact," " reduce, group, count, or count_compact)" " (if None, does not print the formula)", action='store') parser.add_argument('-p', '--past-days', metavar='N', help="Add a 
filter to show only structures created in the past N days", type=int, action='store') parser.add_argument('-g', '--group-name', metavar='N', nargs="+", default=None, help="add a filter to show only objects belonging to groups", type=str, action='store') parser.add_argument('-G', '--group-pk', metavar='N', nargs="+", default=None, help="add a filter to show only objects belonging to groups", type=int, action='store') parser.add_argument('-A', '--all-users', action='store_true', default=False, help="show groups for all users, rather than only for the" "current user") def get_column_names(self): return ["ID", "formula", "label"] def _show_xcrysden(self, exec_name, structure_list): """ Plugin for xcrysden """ import tempfile, subprocess if len(structure_list) > 1: raise MultipleObjectsError("Visualization of multiple objects " "is not implemented") structure = structure_list[0] with tempfile.NamedTemporaryFile(suffix='.xsf') as f: f.write(structure._exportstring('xsf')) f.flush() try: subprocess.check_output([exec_name, '--xsf', f.name]) except subprocess.CalledProcessError: # The program died: just print a message print "Note: the call to {} ended with an error.".format( exec_name) except OSError as e: if e.errno == 2: print ("No executable '{}' found. Add to the path, " "or try with an absolute path.".format( exec_name)) sys.exit(1) else: raise def _show_ase(self,exec_name,structure_list): """ Plugin to show the structure with the ASE visualizer """ try: from ase.visualize import view for structure in structure_list: view(structure.get_ase()) except ImportError: raise def _show_vmd(self, exec_name, structure_list): """ Plugin for vmd """ import tempfile, subprocess if len(structure_list) > 1: raise MultipleObjectsError("Visualization of multiple objects " "is not implemented") structure = structure_list[0] with tempfile.NamedTemporaryFile(suffix='.xsf') as f: f.write(structure._exportstring('xsf')) f.flush() try: subprocess.check_output([exec_name, f.name]) except subprocess.CalledProcessError: # The program died: just print a message print "Note: the call to {} ended with an error.".format( exec_name) except OSError as e: if e.errno == 2: print ("No executable '{}' found. Add to the path, " "or try with an absolute path.".format( exec_name)) sys.exit(1) else: raise def _show_jmol(self, exec_name, structure_list): """ Plugin for jmol """ import tempfile, subprocess with tempfile.NamedTemporaryFile() as f: for structure in structure_list: f.write(structure._exportstring('cif')) f.flush() try: subprocess.check_output([exec_name, f.name]) except subprocess.CalledProcessError: # The program died: just print a message print "Note: the call to {} ended with an error.".format( exec_name) except OSError as e: if e.errno == 2: print ("No executable '{}' found. Add to the path, " "or try with an absolute path.".format( exec_name)) sys.exit(1) else: raise def _export_tcod(self, node, parameter_data=None, **kwargs): """ Plugin for TCOD """ parameters = None if parameter_data is not None: from aiida.orm import DataFactory ParameterData = DataFactory('parameter') parameters = load_node(parameter_data, parent_class=ParameterData) print node._exportstring('tcod',parameters=parameters,**kwargs) def _export_tcod_parameters(self, parser, **kwargs): """ Command line parameters for TCOD """ from aiida.tools.dbexporters.tcod import extend_with_cmdline_parameters extend_with_cmdline_parameters(parser,self.dataclass.__name__) def _export_xsf(self, node, **kwargs): """ Exporter to XSF. 
""" print node._exportstring('xsf') def _export_cif(self, node, **kwargs): """ Exporter to CIF. """ print node._exportstring('cif') def _export_xyz(self, node): """ Exporter to XYZ. """ print node._exportstring('xyz') def _import_xyz_parameters(self, parser): """ Adding some functionality to the parser to deal with importing files """ # In order to deal with structures that do not have a cell defined: # We can increase the size of the cell from the minimal cell # The minimal cell is the cell the just accomodates the structure given, # defined by the minimum and maximum of position in each dimension parser.add_argument('--vacuum-factor', type=float, default=1.0, help = 'The factor by which the cell accomodating the structure should be increased, default: 1.0') #To that increased cell, we can also add a "safety margin" parser.add_argument('--vacuum-addition', type=float, default=10.0, help = 'The distance to add to the unit cell after vacuum-factor was applied to expand in each dimension, default: 10.0') parser.add_argument('--pbc', type=int, nargs = 3, default= [0,0,0], help = """ Set periodic boundary conditions for each lattice direction, 0 for no periodicity, any other integer for periodicity""") parser.add_argument('--view', action='store_true', default = False, help= 'View resulting structure using ASE') parser.add_argument('--dont-store', action='store_true', default = False, help= 'Do not store the structure') def _import_xyz(self, filename, **kwargs): """ Imports an XYZ-file. """ from os.path import abspath vacuum_addition = kwargs.pop('vacuum_addition') vacuum_factor = kwargs.pop('vacuum_factor') pbc = [bool(i) for i in kwargs.pop('pbc')] dont_store = kwargs.pop('dont_store') view_in_ase = kwargs.pop('view') print 'importing XYZ-structure from: \n {}'.format(abspath(filename)) filepath = abspath(filename) with open(filepath) as f: xyz_txt = f.read() new_structure = self.dataclass() try: new_structure._parse_xyz(xyz_txt) new_structure._adjust_default_cell( vacuum_addition = vacuum_addition, vacuum_factor = vacuum_factor, pbc = pbc) if not dont_store: new_structure.store() if view_in_ase: from ase.visualize import view view(new_structure.get_ase()) print ( ' Succesfully imported structure {}, ' '(PK = {})'.format(new_structure.get_formula(), new_structure.pk) ) except ValueError as e: print e def _deposit_tcod(self, node, parameter_data=None, **kwargs): """ Deposition plugin for TCOD. """ from aiida.tools.dbexporters.tcod import deposit parameters = None if parameter_data is not None: from aiida.orm import DataFactory ParameterData = DataFactory('parameter') parameters = load_node(parameter_data, parent_class=ParameterData) return deposit(node,parameters=parameters,**kwargs) def _deposit_tcod_parameters(self,parser,**kwargs): """ Command line parameters deposition plugin for TCOD. """ from aiida.tools.dbexporters.tcod import (deposition_cmdline_parameters, extend_with_cmdline_parameters) deposition_cmdline_parameters(parser,self.dataclass.__name__) extend_with_cmdline_parameters(parser,self.dataclass.__name__) class _Cif(VerdiCommandWithSubcommands, Listable, Visualizable, Exportable, Importable, Depositable): """ Visualize CIF structures """ def __init__(self): """ A dictionary with valid commands and functions to be called. 
""" from aiida.orm.data.cif import CifData self.dataclass = CifData self.valid_subcommands = { 'show': (self.show, self.complete_none), 'list': (self.list, self.complete_none), 'export': (self.export, self.complete_none), 'import': (self.importfile, self.complete_none), 'deposit': (self.deposit, self.complete_none), } def _show_jmol(self, exec_name, structure_list): """ Plugin for jmol """ import tempfile, subprocess with tempfile.NamedTemporaryFile() as f: for structure in structure_list: f.write(structure._exportstring('cif')) f.flush() try: subprocess.check_output([exec_name, f.name]) except subprocess.CalledProcessError: # The program died: just print a message print "Note: the call to {} ended with an error.".format( exec_name) except OSError as e: if e.errno == 2: print ("No executable '{}' found. Add to the path, " "or try with an absolute path.".format( exec_name)) sys.exit(1) else: raise def query(self, args): """ Perform the query and return information for the list. :param args: a namespace with parsed command line parameters. :return: table (list of lists) with information, describing nodes. Each row describes a single hit. """ load_dbenv() from aiida.orm import DataFactory from django.db.models import Q from aiida.djsite.utils import get_automatic_user q_object = None if args.all_users is False: q_object = Q(user=get_automatic_user()) else: q_object = Q() self.query_past_days(q_object, args) self.query_group(q_object, args) object_list = self.dataclass.query(q_object).distinct().order_by('ctime') entry_list = [] for obj in object_list: formulae = '?' try: formulae = ",".join(obj.get_attr('formulae')) except AttributeError: pass except TypeError: pass source_uri = '?' try: source_uri = obj.get_attr('source')['uri'] except AttributeError: pass except KeyError: pass entry_list.append([str(obj.pk), formulae, source_uri]) return entry_list def get_column_names(self): """ Return the list with column names. :note: neither the number nor correspondence of column names and actual columns in the output from the query() are checked. """ return ["ID", "formulae", "source_uri"] def _export_cif(self, node, **kwargs): """ Exporter to CIF. """ print node._exportstring('cif') def _export_tcod(self, node, parameter_data=None, **kwargs): """ Plugin for TCOD """ parameters = None if parameter_data is not None: from aiida.orm import DataFactory ParameterData = DataFactory('parameter') parameters = load_node(parameter_data, parent_class=ParameterData) print node._exportstring('tcod',parameters=parameters,**kwargs) def _export_tcod_parameters(self,parser,**kwargs): """ Command line parameters for TCOD """ from aiida.tools.dbexporters.tcod import extend_with_cmdline_parameters extend_with_cmdline_parameters(parser,self.dataclass.__name__) def _import_cif(self, filename, **kwargs): """ Importer from CIF. """ import os.path try: node, _ = self.dataclass.get_or_create(os.path.abspath(filename)) print node except ValueError as e: print e def _deposit_tcod(self, node, parameter_data=None, **kwargs): """ Deposition plugin for TCOD. """ from aiida.tools.dbexporters.tcod import deposit parameters = None if parameter_data is not None: from aiida.orm import DataFactory ParameterData = DataFactory('parameter') parameters = load_node(parameter_data, parent_class=ParameterData) return deposit(node,parameters=parameters,**kwargs) def _deposit_tcod_parameters(self, parser, **kwargs): """ Command line parameters deposition plugin for TCOD. 
""" from aiida.tools.dbexporters.tcod import (deposition_cmdline_parameters, extend_with_cmdline_parameters) deposition_cmdline_parameters(parser,self.dataclass.__name__) extend_with_cmdline_parameters(parser,self.dataclass.__name__) class _Trajectory(VerdiCommandWithSubcommands, Listable, Visualizable, Exportable, Depositable): """ View and manipulate TrajectoryData instances. """ def __init__(self): """ A dictionary with valid commands and functions to be called. """ from aiida.orm.data.array.trajectory import TrajectoryData self.dataclass = TrajectoryData self.valid_subcommands = { 'show': (self.show, self.complete_none), 'list': (self.list, self.complete_none), 'export': (self.export, self.complete_none), 'deposit': (self.deposit, self.complete_none), } def _show_jmol(self, exec_name, trajectory_list, **kwargs): """ Plugin for jmol """ import tempfile, subprocess with tempfile.NamedTemporaryFile() as f: for trajectory in trajectory_list: f.write(trajectory._exportstring('cif', **kwargs)) f.flush() try: subprocess.check_output([exec_name, f.name]) except subprocess.CalledProcessError: # The program died: just print a message print "Note: the call to {} ended with an error.".format( exec_name) except OSError as e: if e.errno == 2: print ("No executable '{}' found. Add to the path, " "or try with an absolute path.".format( exec_name)) sys.exit(1) else: raise def _show_jmol_parameters(self, parser): """ Describe command line parameters. """ parser.add_argument('--step', help="ID of the trajectory step. If none is " "supplied, all steps are exported.", type=int, action='store') def _show_xcrysden(self, exec_name, trajectory_list, **kwargs): """ Plugin for xcrysden """ import tempfile, subprocess if len(trajectory_list) > 1: raise MultipleObjectsError("Visualization of multiple trajectories " "is not implemented") trajectory = trajectory_list[0] with tempfile.NamedTemporaryFile(suffix='.xsf') as f: f.write(trajectory._exportstring('xsf', **kwargs)) f.flush() try: subprocess.check_output([exec_name, '--xsf',f.name]) except subprocess.CalledProcessError: # The program died: just print a message print "Note: the call to {} ended with an error.".format( exec_name) except OSError as e: if e.errno == 2: print ("No executable '{}' found. Add to the path, " "or try with an absolute path.".format( exec_name)) sys.exit(1) else: raise def _export_xsf(self, node, **kwargs): """ Exporter to XSF. """ print node._exportstring('xsf', **kwargs) def _export_tcod(self, node, parameter_data=None, **kwargs): """ Plugin for TCOD """ parameters = None if parameter_data is not None: from aiida.orm import DataFactory ParameterData = DataFactory('parameter') parameters = load_node(parameter_data, parent_class=ParameterData) print node._exportstring('tcod', parameters=parameters, **kwargs) def _export_tcod_parameters(self, parser, **kwargs): """ Command line parameters for TCOD """ from aiida.tools.dbexporters.tcod import extend_with_cmdline_parameters extend_with_cmdline_parameters(parser,self.dataclass.__name__) def _export_cif(self, node, **kwargs): """ Exporter to CIF. """ print node._exportstring('cif', **kwargs) def _export_cif_parameters(self, parser, **kwargs): """ Describe command line parameters. """ parser.add_argument('--step', dest='trajectory_index', help="ID of the trajectory step. If none is " "supplied, all steps are exported.", type=int, action='store') def _deposit_tcod(self, node, parameter_data=None, **kwargs): """ Deposition plugin for TCOD. 
""" from aiida.tools.dbexporters.tcod import deposit parameters = None if parameter_data is not None: from aiida.orm import DataFactory ParameterData = DataFactory('parameter') parameters = load_node(parameter_data, parent_class=ParameterData) return deposit(node,parameters=parameters,**kwargs) def _deposit_tcod_parameters(self, parser, **kwargs): """ Command line parameters deposition plugin for TCOD. """ from aiida.tools.dbexporters.tcod import (deposition_cmdline_parameters, extend_with_cmdline_parameters) deposition_cmdline_parameters(parser,self.dataclass.__name__) extend_with_cmdline_parameters(parser,self.dataclass.__name__) self._export_cif_parameters(parser) class _Parameter(VerdiCommandWithSubcommands, Visualizable): """ View and manipulate Parameter data classes. """ def __init__(self): """ A dictionary with valid commands and functions to be called. """ from aiida.orm.data.parameter import ParameterData self.dataclass = ParameterData self._default_show_format = 'json_date' self.valid_subcommands = { 'show': (self.show, self.complete_none), } def _show_json_date(self, exec_name, node_list): """ Show contents of ParameterData nodes. """ from aiida.cmdline import print_dictionary for node in node_list: the_dict = node.get_dict() print_dictionary(the_dict, 'json+date') class _Array(VerdiCommandWithSubcommands, Visualizable): """ View and manipulate Array data classes. """ def __init__(self): """ A dictionary with valid commands and functions to be called. """ from aiida.orm.data.array import ArrayData self.dataclass = ArrayData self._default_show_format = 'json_date' self.valid_subcommands = { 'show': (self.show, self.complete_none), } def _show_json_date(self, exec_name, node_list): """ Show contents of ArrayData nodes. """ from aiida.cmdline import print_dictionary for node in node_list: the_dict = {} for arrayname in node.arraynames(): the_dict[arrayname] = node.get_array(arrayname).tolist() print_dictionary(the_dict, 'json+date')