Source code for aiida_vasp.parsers.vasp

"""
Parser module for composing output of a VASP calculation.

The simplified parser outputs the following nodes:

1. A `misc` node that stores simple summary information such as total energies,
    total run times, any warnings issued during the calculation and whether the calculation
    finished.
2. An `arrays` node for storing miscellaneous quantities that are arrays by nature and typically
    have a large size.
3. A `trajectory` node for storing the trajectory of geometry optimisation and AIMD.
4. A `bands` node for storing the band structure.
5. A `dos` node for storing the density of states.
6. Other nodes for storing other relevant quantities such as the born effective charges.


Main differences from the previous version:

1. `pydantic` is used to validate the parser settings *at submission* time.
2. When parsing retrieved data, we take a 'parse as much as possible' approach -
   the quantities are always parsed if available, and only excluded during the stage of composing the output nodes.
   This is simpler than the previous 'parse only when needed' approach, where multiple checks had to be done to work
   out which parser to call and which quantities to include.
3. All parser logic is contained in a single class and can be extended by updating the content parsers, modifying the
   default settings and updating/adding the `_compose_xx` methods.
"""

from typing import Any, Dict, List

import numpy as np
from aiida import orm
from aiida.engine import ExitCode
from aiida.parsers.parser import Parser
from pydantic import Field

from aiida_vasp.data.chargedensity import ChargedensityData
from aiida_vasp.data.wavefun import WavefunData
from aiida_vasp.parsers.content_parsers import *
from aiida_vasp.utils.opthold import OptionContainer


class ParserError(RuntimeError):
    """Base class of the errors raised by the VASP parser in this module."""


class QuantityMissingError(ParserError):
    """Raised by the `_compose_xx` methods when the data needed for a node is not available."""


class RequiredQuantityMissingError(ParserError):
    """Signal that a required quantity is missing (not raised by the code in this module itself)."""


class MissingFileError(ParserError):
    """Raised when a required file is not found among the retrieved objects."""
# Quantities dropped from the parsed results unless listed in `include_quantity`
DEFAULT_EXCLUDED_QUANTITIES = (
    'elastic_moduli',
    'symmetries',
    'parameters',  # The parameters used for the calculation
)

# Output nodes that are not composed unless listed in `include_node`
DEFAULT_EXCLUDED_NODE = ('bands', 'dos', 'kpoints', 'trajectory', 'energies', 'wavecar', 'chgcar', 'projectors')

# Default for `required_quantity`: parsing fails if any of these is missing
DEFAULT_REQUIRED_QUANTITIES = ('run_status', 'run_stats')

# Default for `file_mapping`: canonical name -> name looked up in the retrieved folder
DEFAULT_FILE_MAPPING = {
    'vasprun.xml': 'vasprun.xml',
    'vasp_output': 'vasp_output',
    'OUTCAR': 'OUTCAR',
    'CONTCAR': 'CONTCAR',
    'CHGCAR': 'CHGCAR',
    'IBZKPT': 'IBZKPT',
}

# Quantities that are collected into the `misc` output node
MISC_QUANTITIES = (
    'total_energies',
    'notifications',
    'run_status',
    'run_stats',
    'version',
    'forces',
    'stress',
    'site_magnetization',
    'band_properties',
    'elastic_moduli',
    'symmetries',
    'fermi_level',
    'magnetization',
    'parameters',
)

# Quantities kept in `misc` even when their value is an empty container
ALLOW_EMPTY = ('notifications',)

# Quantities that should be stored inside separate nodes
# (quantity name -> key of the parsed file that provides it)
STANDALONE_ARRAY_QUANTITIES = {
    'born_charges': 'vasprun.xml',
    'dielectrics': 'vasprun.xml',
    'dynmat': 'vasprun.xml',
    'hessian': 'vasprun.xml',
    'projectors': 'vasprun.xml',
    'energies': 'vasprun.xml',
}
class ParserSettingsConfig(OptionContainer):
    """
    Settings for the VASP parser.

    The fields are populated from the ``parser_settings`` entry of the calculation's
    ``settings`` input (see ``VaspParser._init_user_settings``); fields that are not
    given take the defaults declared here.
    """

    # Names added on top of / removed from the default exclusion lists
    include_quantity: List[str] = Field(description='Properties to include', default_factory=list)
    exclude_quantity: List[str] = Field(description='Quantities to be excluded', default_factory=list)
    required_quantity: List[str] = Field(
        description='Quantities that must be present', default_factory=lambda: list(DEFAULT_REQUIRED_QUANTITIES)
    )
    include_node: List[str] = Field(description='Output node to include', default_factory=list)
    exclude_node: List[str] = Field(description='Output node to exclude', default_factory=list)
    file_mapping: Dict[str, str] = Field(
        description='Mapping of file names to quantities', default_factory=lambda: dict(DEFAULT_FILE_MAPPING)
    )
    kpoints_from_ibzkpt: bool = False
    check_completeness: bool = True
    electronic_step_energies: bool = False
    energy_type: List[str] = Field(
        description='Energy types to include', default_factory=lambda: ['energy_extrapolated']
    )
    keep_stream_history: bool = Field(
        description='Whether to keep the history of all notifications in the parsed stream (stdout)', default=False
    )
    ignore_notification_errors: bool = Field(
        description='Whether to ignore errors in the notifications parsed from vasp_output', default=False
    )
    # Each name is listed once; 'edwave' and 'edwav' are both kept as found in the original list
    critical_notification_errors: List[str] = Field(
        description='Critical stream errors to raise',
        default_factory=lambda: [
            'brmix',
            'edwave',
            'cnormn',
            'denmp',
            'dentet',
            'edddav_zhegv',
            'eddrmm_zhegv',
            'edwav',
            'fexcp',
            'fock_acc',
            'non_collinear',
            'not_hermitian',
            'pzstein',
            'real_optlay',
            'rhosyg',
            'rspher',
            'set_indpw_full',
            'sgrcon',
            'no_potimm',
            'magmom',
            'bandocc',
        ],
    )
    critical_objects: List[str] = Field(
        description='Critical objects to be present', default_factory=lambda: ['vasprun.xml', 'OUTCAR']
    )
    check_errors: bool = Field(description='Whether to check for errors in calculation', default=True)
    check_ionic_convergence: bool = Field(
        description='Whether to check for convergence during the relaxation based on the INCAR settings',
        default=True,
    )
    omit_structure: bool = Field(
        description='Whether to omit the structure node from the output if no ionic movement', default=True
    )
class VaspParser(Parser):
    """Class for parsing VASP output files and storing the results in AiiDA."""

    def __init__(self, node: orm.CalcJobNode) -> None:
        """
        Initialize the Parser instance

        Only empty containers are created here; they are filled while `parse` runs.
        """
        super().__init__(node)

        # Parser settings, set by `_init_user_settings`
        self.user_config = None

        # Names of the objects found in the retrieved folder
        self.retrieve_object_names: List[str] = []

        # Parsed quantities keyed by the canonical name of the file they came from
        self.quantities_each: Dict[str, Any] = {}

        # Problems collected while parsing
        self.errored_quantities: Dict[str, Any] = {}
        self.errored_parsers: Dict[str, Any] = {}
        self.parser_notifications: Dict[str, List[str]] = {}

        # Exclusion lists resolved by `_get_quantities_to_parse`
        self.quantities_to_exclude: List[str] = []
        self.nodes_to_exclude: List[str] = []
def _init_user_settings(self) -> ParserSettingsConfig:
    """
    Build the parser settings from the calculation inputs.

    The ``parser_settings`` entry of the ``settings`` input node is used when that input exists;
    otherwise every setting takes its default. The result is stored on ``self.user_config``
    and also returned.
    """
    overrides = {}
    if 'settings' in self.node.inputs:
        overrides = self.node.inputs.settings.get_dict().get('parser_settings', {})

    self.user_config = ParserSettingsConfig(**overrides)
    return self.user_config
def _get_quantities_to_parse(self) -> ExitCode | None:
    """
    Resolve the exclusion lists and check that the critical files were retrieved.

    Sets ``self.quantities_to_exclude``, ``self.nodes_to_exclude`` and ``self.retrieve_object_names``.

    :returns: ``ERROR_CRITICAL_MISSING_OBJECT`` if any critical object is absent from the
        retrieved folder, otherwise ``None``.
    """
    user_config = self.user_config

    # Defaults are excluded unless explicitly included; explicit exclusions are always added
    self.quantities_to_exclude = [
        key for key in DEFAULT_EXCLUDED_QUANTITIES if key not in user_config.include_quantity
    ] + list(user_config.exclude_quantity)
    self.nodes_to_exclude = [
        key for key in DEFAULT_EXCLUDED_NODE if key not in user_config.include_node
    ] + list(user_config.exclude_node)

    self.retrieve_object_names = self.retrieved.list_object_names()

    # Look for the name the file actually has in the retrieved folder, the same way `parse`
    # resolves it through the file mapping. Names without a mapping entry are used as they are.
    missing = [
        resolved_name
        for resolved_name in (user_config.file_mapping.get(name, name) for name in user_config.critical_objects)
        if resolved_name not in self.retrieve_object_names
    ]
    if missing:
        self.logger.warning(f'Critical objects missing in the retrieved folder: {", ".join(missing)}')
        return self.exit_codes.ERROR_CRITICAL_MISSING_OBJECT
    return None
[docs] def _post_process_quantities(self) -> ExitCode | None: """Post-process the parsed quantities.""" # Warn about errored/missing quantities and parsers if self.errored_quantities: self.logger.warning( f'The following quantities cannot be parsed due to errors: {", ".join(self.errored_quantities)}' ) if self.errored_parsers: self.logger.warning( f'The following parsers cannot be instantiated due to: {", ".join(self.errored_parsers)}' ) # Remove the quantities for name, parsed_quantities in self.quantities_each.items(): for sub_key in list(parsed_quantities.keys()): if sub_key in self.quantities_to_exclude: del parsed_quantities[sub_key] # Check in required quantities are present missing_required = [] for name in self.user_config.required_quantity: exists = False for _, value in self.quantities_each.items(): if value.get(name) is not None: exists = True break if exists is False: missing_required.append(name) if missing_required: return self.exit_codes.ERROR_NOT_ABLE_TO_PARSE_QUANTITY.format(quantity=','.join(missing_required))
def parse(self, **kwargs: Any) -> ExitCode | None:
    """
    Parse outputs, store results in database.

    The settings are initialised, the retrieved files are parsed one by one, the parsed
    quantities are post-processed and finally the output nodes are composed. The first
    exit code produced by any of these stages is returned.
    """
    config = self._init_user_settings()

    early_exit = self._get_quantities_to_parse()
    if early_exit is not None:
        return early_exit

    def parse_and_add(
        name: str,
        parser_cls: Any,
        required: bool = True,
        open_mode: str = 'r',
        content_parser_settings: dict | None = None,
    ) -> None:
        """
        Parse one retrieved file and store its quantities under ``self.quantities_each[name]``.

        ``name`` is the canonical key; the file that is opened is ``file_mapping[name]``.
        A missing file raises ``MissingFileError`` only if it is required and
        ``check_completeness`` is enabled; otherwise it is silently skipped.
        """
        resolved_name = config.file_mapping[name]

        if resolved_name not in self.retrieve_object_names:
            if config.check_completeness is True and required is True:
                raise MissingFileError(f'{resolved_name} is missing in the retrieved folder.')
            return

        with self.retrieved.open(resolved_name, open_mode) as handler:
            # A parser that cannot be constructed is recorded, not propagated
            try:
                parser: BaseFileParser = parser_cls(handler=handler, settings=content_parser_settings)
            except Exception as error:
                self.errored_parsers[name] = error
                return

            if parser.parser_notifications:
                self.parser_notifications.update(parser.parser_notifications)

            parsed, errored = parser.get_all_quantities()
            self.quantities_each[name] = parsed
            self.errored_quantities.update(errored)

    # NOTE(review): 'stream_history' is passed to the vasprun.xml parser although the
    # setting describes the parsed stream (stdout) - confirm this is the intended recipient.
    vasprun_settings = {
        'electronic_step_energies': config.electronic_step_energies,
        'energy_type': config.energy_type,
        'stream_history': config.keep_stream_history,
    }
    parse_and_add(
        'vasprun.xml',
        VasprunParser,
        required=True,
        open_mode='rb',
        content_parser_settings=vasprun_settings,
    )
    for file_key, parser_cls in (
        ('OUTCAR', OutcarParser),
        ('vasp_output', StreamParser),
        ('CONTCAR', PoscarParser),
    ):
        parse_and_add(file_key, parser_cls, required=True)
    if config.kpoints_from_ibzkpt:
        parse_and_add('IBZKPT', KpointsParser, required=True)

    early_exit = self._post_process_quantities()
    if early_exit is not None:
        return early_exit

    return self._create_outputs()
[docs] def _create_outputs(self) -> ExitCode | None: """Create the output nodes""" # Create the outputs self._failed_to_compose = {} # Call the _compose_xx methods to create the output nodes for method_name in [item for item in self.__dir__() if item.startswith('_compose_')]: name = method_name.replace('_compose_', '') if name in self.nodes_to_exclude: continue node_or_dict = None try: node_or_dict = getattr(self, '_compose_' + name)(self.quantities_each) except (QuantityMissingError, KeyError, ValueError, TypeError, AttributeError) as error: self._failed_to_compose[name] = error self.logger.warning(f'Failed to compose {name} node: {error}') continue if isinstance(node_or_dict, orm.Data): self.out(name, node_or_dict) elif isinstance(node_or_dict, dict): for key, value in node_or_dict.items(): self.out(key, value) if ( any(name in self.user_config.include_node for name in self._failed_to_compose) and self.user_config.check_completeness is True ): return self.exit_codes.ERROR_NOT_ABLE_TO_CREATE_NODE.format(nodes=', '.join(self._failed_to_compose.keys())) # Check for errors if self.user_config.check_errors is True: error = self._check_vasp_errors(self.parser_notifications) return error
def _compose_misc(self, quantities_each: dict[str, Any]) -> orm.Dict:
    """
    Compose the `misc` output node

    The quantities listed in ``MISC_QUANTITIES`` are collected from the vasprun.xml, OUTCAR
    and stream results, in that order, so a later source overrides an earlier one. Entries
    that contain only empty containers are dropped unless listed in ``ALLOW_EMPTY``.
    """
    out_dict: dict[str, Any] = {}

    # `parse` stores the results under the canonical names (the keys of the file mapping),
    # not under the mapped file names, so the canonical names must be used for the lookup.
    for source in ('vasprun.xml', 'OUTCAR', 'vasp_output'):
        gather_quantities(quantities_each, source, out_dict, MISC_QUANTITIES)

    # Filter field with all empty container
    out_dict = {key: value for key, value in out_dict.items() if key in ALLOW_EMPTY or not is_all_empty(value)}
    return orm.Dict(dict=out_dict)
[docs] def _compose_structure(self, quantities_each: dict[str, Any]) -> orm.StructureData | None: """Compose the `structure` output node""" data = None # Omit output structure if not doing ionic relaxation # Better to inspect the parameters recorded directly inside the vasprun.xml if 'parameters' in self.node.inputs: incar_dict = {key.lower(): value for key, value in self.node.inputs.parameters.get_dict().items()} if ( incar_dict.get('ibrion', -1) < 0 or incar_dict.get('nsw', 0) <= 0 ) and self.user_config.omit_structure is True: self.logger.info('No ionic movement detected, omitting the structure output node.') return None if 'vasprun.xml' in quantities_each: data = quantities_each['vasprun.xml'].get('structure') if data is None: data = quantities_each.get('CONTCAR', {}).get('structure') if data is None: raise QuantityMissingError() return get_structure_node(data)
[docs] def _compose_wavecar(self, quantities_each: dict[str, Any]) -> None: """Compose the `wavecar` output node""" # Check if WAVECAR is present in the retrieved folder if 'WAVECAR' in self.retrieve_object_names: with self.retrieved.base.repository.open('WAVECAR', 'rb') as handler: self.outputs['wavecar'] = WavefunData(file=handler, filename='WAVECAR') else: self.logger.warning('WAVECAR is not present in the retrieved folder.')
[docs] def _compose_chgcar(self, quantities_each: dict[str, Any]) -> None: """Compose the `chgcar` output node""" # Check if WAVECAR is present in the retrieved folder if 'CHGCAR' in self.retrieve_object_names: with self.retrieved.base.repository.open('CHGCAR', 'rb') as handler: self.outputs['chgcar'] = ChargedensityData(file=handler, filename='CHGCAR') else: self.logger.warning('CHGCAR is not present in the retrieved folder.')
def _compose_arrays(self, quantities_each: dict[str, Any]) -> dict[str, orm.ArrayData]:
    """
    Generate the standalone array output nodes

    One node is attempted for every entry of ``STANDALONE_ARRAY_QUANTITIES`` that is not
    excluded; quantities for which no node could be made are left out of the result.

    :returns: mapping of output name to the composed ``ArrayData``.
    """
    candidates = (
        (name, self._make_standalone_array(quantities_each, name, file_name))
        for name, file_name in STANDALONE_ARRAY_QUANTITIES.items()
        if name not in self.nodes_to_exclude
    )
    return {name: array_node for name, array_node in candidates if array_node is not None}
[docs] def _make_standalone_array( self, quantities_each: dict[str, Any], name: str, file_name: str = 'vasprun.xml' ) -> orm.ArrayData | None: """Compose the `dielectrics` output node""" # The output can be an array or a dictionary of arrays - both cases should be handled arrays_or_dict = quantities_each.get(file_name, {}).get(name) # Avoid creating empty arrays if isinstance(arrays_or_dict, dict) and len(arrays_or_dict) > 0: arrays_or_dict = {key: value for key, value in arrays_or_dict.items() if value is not None} return orm.ArrayData(arrays_or_dict) elif isinstance(arrays_or_dict, (np.ndarray, list)) and len(arrays_or_dict) > 0: return orm.ArrayData({name: arrays_or_dict}) return None
def _compose_kpoints(self, quantities_each: dict[str, Any]) -> orm.KpointsData:
    """
    Compose the `kpoints` output node

    The k-points come from IBZKPT when ``kpoints_from_ibzkpt`` is set, otherwise from
    vasprun.xml. The cell is always taken from the structure parsed from vasprun.xml.

    :raises QuantityMissingError: if no k-points data is available.
    """
    if self.user_config.kpoints_from_ibzkpt is True:
        kpoints_data = quantities_each['IBZKPT']['kpoints']
    elif 'vasprun.xml' in quantities_each:
        kpoints_data = quantities_each['vasprun.xml'].get('kpoints')
    else:
        kpoints_data = None

    if kpoints_data is None:
        raise QuantityMissingError('No valid kpoints data to use')

    cell = quantities_each['vasprun.xml']['structure']['unitcell']
    return get_kpoints_node(kpoints_data, cell)
[docs] def _compose_trajectory(self, quantities_each: dict[str, Any]) -> orm.TrajectoryData | None: """Compose the `trajectory` output""" if 'vasprun.xml' in quantities_each: node = orm.TrajectoryData() traj_data = quantities_each['vasprun.xml'].get('trajectory') # No need to carry on if there are no trajectory data if traj_data is None or len(traj_data) == 0: return None for key, value in traj_data.items(): if key == 'symbols': node.base.attributes.set(key, list(value)) elif value.dtype.hasobject: self.logger.warning(f'Cannot set array {key}: {value} in TrajectoryData as it is not numerical.') else: node.set_array(key, value) for key, value in quantities_each['vasprun.xml']['energies'].items(): node.set_array(key, value) return node return None
[docs] def _compose_bands(self, quantities_each: dict[str, Any]) -> orm.BandsData: """Compose the `band` node""" if 'vasprun.xml' in quantities_each: deigen = quantities_each['vasprun.xml']['eigenvalues'] docc = quantities_each['vasprun.xml']['occupancies'] if 'total' in deigen: eigenvalues = np.array(deigen['total']) occupancies = np.array(docc['total']) else: eigenvalues = np.array([deigen['up'], deigen['down']]) occupancies = np.array([docc['up'], docc['down']]) node = orm.BandsData() kpoints = self._compose_kpoints(quantities_each) node.set_kpointsdata(kpoints) node.set_bands(eigenvalues, occupations=occupancies) # Record the Fermi level if available node.base.attributes.set('fermi_level', quantities_each['vasprun.xml'].get('fermi_level')) node.base.attributes.set('efermi', quantities_each['vasprun.xml'].get('fermi_level')) node.set_cell(quantities_each['vasprun.xml']['structure']['unitcell']) return node
[docs] def _compose_dos(self, quantities_each: dict[str, Any]) -> orm.ArrayData | None: """Compose the `dos` node""" arrays_dict = {} if 'vasprun.xml' in quantities_each: gather_quantities(quantities_each, 'dos', arrays_dict, ['dos'], flatten_dict=True) if arrays_dict: node = orm.ArrayData(arrays_dict['dos']) return node
[docs] def _check_vasp_errors(self, parser_notifications: dict[str, Any]) -> ExitCode | None: """ Detect simple vasp execution problems and returns the exit_codes to be set """ quantities = {} for key, value in self.quantities_each.items(): for key_, value_ in value.items(): quantities[key_] = value_ if 'run_status' not in quantities: return self.exit_codes.ERROR_DIAGNOSIS_OUTPUTS_MISSING run_status = quantities['run_status'] try: # We have an overflow in the XML file which is critical, but not reported by VASP in # the standard output, so checking this here. if parser_notifications.get('vasprun_xml_overflow'): return self.exit_codes.ERROR_OVERFLOW_IN_XML except AttributeError: pass # Return errors related to execution and convergence problems. # Note that the order is important here - if a calculation is not finished, we cannot # comment on wether properties are converged are not. if run_status['finished'] is False: return self.exit_codes.ERROR_DID_NOT_FINISH if run_status['electronic_converged'] is False: return self.exit_codes.ERROR_ELECTRONIC_NOT_CONVERGED # Check the ionic convergence issues if run_status['ionic_converged'] is False: if self.user_config.check_ionic_convergence is True: return self.exit_codes.ERROR_IONIC_NOT_CONVERGED self.logger.warning('The ionic relaxation is not converged, but the calculation is treated as successful.') # Check for the existence of critical warnings if 'notifications' in quantities: notifications = quantities['notifications'] ignore_all = self.user_config.ignore_notification_errors if not ignore_all: composer = NotificationComposer( notifications, quantities['run_status'], self.node.inputs, self.exit_codes, critical_notifications=self.user_config.critical_notification_errors, ) exit_code = composer.compose() if exit_code is not None: return exit_code else: self.logger.warning('WARNING: missing notification output for VASP warnings and errors.') return None
def gather_quantities(
    quantities_each: dict[str, Any], namespace: str, dst: dict[str, Any], fields: list[str], flatten_dict: bool = False
) -> None:
    """
    Gather quantities and put them into the target dictionary

    :param quantities_each: parsed quantities, keyed by namespace.
    :param namespace: the entry of ``quantities_each`` to read; nothing is done if it is absent.
    :param dst: dictionary that is updated in place.
    :param fields: names of the quantities to copy.
    :param flatten_dict: if set, a quantity whose value is a dictionary is expanded into
        ``<quantity>_<key>`` entries instead of being copied as a whole.
    """
    for quantity, content in quantities_each.get(namespace, {}).items():
        if quantity not in fields:
            continue
        if flatten_dict and isinstance(content, dict):
            # flatten the dictionary - prepend the key with the name of the quantity
            dst.update({quantity + '_' + sub_key: sub_value for sub_key, sub_value in content.items()})
        else:
            dst[quantity] = content
class NotificationComposer:
    """Compose errors codes based on the notifications"""

    def __init__(
        self,
        notifications: list[dict[str, Any]],
        run_status: dict[str, Any],
        inputs: dict[str, Any],
        exit_codes: Any,
        critical_notifications: list[str],
    ) -> None:
        """
        Composed error codes based on the notifications

        Some of the errors need to have additional properties inspected before they can be emitted,
        as they might be triggered in a harmless way.

        To add new checkers, one needs to implement a property with the name of the error for this class and
        contains the code for checking. This property should return the exit_code or return None.
        The property is inspected if its name is in the list critical notifications.

        :param notifications: parsed notifications, each with a ``name`` and a ``message`` entry.
        :param run_status: the parsed run status; ``electronic_converged`` is read by some checkers.
        :param inputs: inputs of the calculation; ``parameters`` is read by the ``brmix`` checker.
        :param exit_codes: provider of the ``ERROR_VASP_CRITICAL_ERROR`` exit code.
        :param critical_notifications: names of the notifications that are treated as errors.
        """
        self.notifications = notifications
        # If a name occurs more than once, the message of the last occurrence is kept
        self.notifications_dict = {item['name']: item['message'] for item in self.notifications}
        self.run_status = run_status
        self.inputs = inputs
        self.exit_codes = exit_codes
        self.critical_notifications = critical_notifications

    def compose(self) -> ExitCode | None:
        """
        Compose the exit codes

        Returns None if no exit code should be emitted, otherwise emit the error code.
        The critical notifications are inspected in the order given; the first one that
        yields an exit code wins.
        """
        for critical in self.critical_notifications:
            # Only properties declared on the class are checkers. A plain `hasattr` test would
            # also match ordinary attributes and methods (e.g. `inputs` or `compose`) and
            # return them as if they were exit codes.
            if isinstance(getattr(type(self), critical, None), property):
                output = getattr(self, critical)
                if output:
                    return output
            # No special handling, just check if it exists
            elif critical in self.notifications_dict:
                return self._critical_error(critical)
        return None

    def _critical_error(self, name: str) -> ExitCode:
        """Return the critical-error exit code carrying the message of notification `name`."""
        return self.exit_codes.ERROR_VASP_CRITICAL_ERROR.format(error_message=self.notifications_dict[name])

    def _unless_electronic_converged(self, name: str) -> ExitCode | None:
        """Return the exit code for `name` if it was notified and the electronic steps did not converge."""
        if name not in self.notifications_dict:
            return None
        if self.run_status['electronic_converged']:
            return None
        return self._critical_error(name)

    @property
    def brmix(self) -> ExitCode | None:
        """Check if BRMIX should be emitted"""
        if 'brmix' not in self.notifications_dict:
            return None

        # If NELECT is set explicitly for the calculation then this is not an critical error.
        # The tag is matched regardless of case, as the parameters may be given in either case.
        if 'parameters' in self.inputs:
            incar_tags = {key.lower() for key in self.inputs['parameters'].get_dict()}
            if 'nelect' in incar_tags:
                return None
        return self._critical_error('brmix')

    @property
    def edddav_zhegv(self) -> ExitCode | None:
        """Check if EDDDAV call to ZHEGV should be emitted. Sometimes it has converged."""
        return self._unless_electronic_converged('edddav_zhegv')

    @property
    def eddrmm_zhegv(self) -> ExitCode | None:
        """Check if EDDRMM call to ZHEGV should be emitted. Sometimes it has converged."""
        return self._unless_electronic_converged('eddrmm_zhegv')
def get_structure_node(structure_dict: dict[str, Any]) -> orm.StructureData:
    """
    Compose a structure node from the dictionary output by the parser

    :param structure_dict: must provide ``unitcell`` and ``sites``; every site provides
        ``position``, ``symbol`` and ``kind_name``.
    :returns: a new ``StructureData`` with the cell set and one atom appended per site.
    """
    structure = orm.StructureData()
    structure.set_cell(structure_dict['unitcell'])

    for site in structure_dict['sites']:
        atom_spec = {
            'position': site['position'],
            'symbols': site['symbol'],
            'name': site['kind_name'],
        }
        structure.append_atom(**atom_spec)
    return structure
def is_all_empty(obj: dict | list) -> bool:
    """
    Check if all elements of a dictionary or list are empty

    A dictionary or list is empty if it has no items or if all of its items (the values, for
    a dictionary) are themselves empty. Any other object, including ``None``, strings and
    tuples, is never considered empty.
    """
    if isinstance(obj, dict):
        children = obj.values()
    elif isinstance(obj, list):
        children = obj
    else:
        return False
    # `all` of nothing is True, which covers the container without items
    return all(is_all_empty(child) for child in children)
def get_kpoints_node(kpoints_data: dict[str, Any], cell: list[list] | np.ndarray) -> orm.KpointsData:
    """
    Get a KpointData node from parsed kpoints data and cell matrix

    :param kpoints_data: parsed k-points; ``mode`` selects between ``explicit`` (uses ``points``,
        ``weights`` and ``cartesian``) and ``automatic`` (uses ``divisions`` and ``shifts``).
    :param cell: the cell for which the k-points are defined.
    :raises ValueError: if ``mode`` is neither ``explicit`` nor ``automatic``.
    """
    node = orm.KpointsData()

    # Record the cell for which the kpoints are defined. This is done first because
    # k-points given in cartesian coordinates need the cell to be defined when they are set.
    node.set_cell(cell)

    mode = kpoints_data['mode']
    if mode == 'explicit':
        node.set_kpoints(kpoints_data['points'], weights=kpoints_data['weights'], cartesian=kpoints_data['cartesian'])
    elif mode == 'automatic':
        node.set_kpoints_mesh(kpoints_data['divisions'], offset=kpoints_data['shifts'])
    else:
        raise ValueError(f'Unknown kpoints mode {kpoints_data["mode"]}')
    return node