Source code for aiida_vasp.workchains.v2.relax

"""
Relaxation workchain for VASP

Unfortunately, VASP does not check the convergence criteria properly:

- it only check *either* force or energy convergence between the last two iterations
- relaxation is performed with *constant basis set*, so a final singlepoint calculation is necessary if cell is to be
  relaxed
- no check for the convergence of the atomic positions
- no check about the convergence of the cell volume

Hence we have to control it externally to make sure the structures are properly relaxed,
and perform additional singlepoint calculation where necessary.

In addition, using this workchain, there is no need to set the IBRION, NSW, ISIF and ALGO tags explicitly,
the action can be controlled using the `relax_setting` input port. This will be merged with the `parameters` and
passed downstream to the `VaspBaseWorkChain` which will work out the correct combinations for the three.

CHANGELOG

0.2.1 - added `hybrid_calc_bootstrap` options
0.3.0 - make singpoint calculation reuse the `restart_folder`
0.3.1 - called processes now have link labels. Added customisable `settings` and `options` for the final relaxation.


"""

# pylint: disable=attribute-defined-outside-init

from __future__ import annotations

from copy import deepcopy

import numpy as np
from aiida import orm
from aiida.common.exceptions import InputValidationError
from aiida.common.extendeddicts import AttributeDict
from aiida.common.utils import classproperty
from aiida.engine import ExitCode, ProcessSpec, ToContext, WorkChain, append_, if_, while_
from aiida.orm.nodes.data.base import to_aiida_type
from aiida.plugins import WorkflowFactory

from aiida_vasp.common import OVERRIDE_NAMESPACE, site_magnetization_to_magmom
from aiida_vasp.protocols import ProtocolMixin, recursive_merge
from aiida_vasp.utils.extended_dicts import update_nested_dict, update_nested_dict_node
from aiida_vasp.utils.opthold import RelaxOptions
from aiida_vasp.utils.workchains import compose_exit_code

from .mixins import WithBuilderUpdater

__version__ = '0.5.0'

# Change log
# 0.4.0 update such `vasp` namespace in `parameters` is renamed to `incar`
# 0.5.0 update the logic of convergence checking. Cell comparsion is always done using the input/output structures.


[docs] class VaspRelaxWorkChain(WorkChain, WithBuilderUpdater, ProtocolMixin): """Structure relaxation workchain.""" _verbose: bool = True _base_workchain_string: str = 'vasp.v2.vasp' _base_workchain = WorkflowFactory(_base_workchain_string) option_class = RelaxOptions _protocol_tag: str = 'relax'
[docs] @classmethod def define(cls, spec: ProcessSpec) -> None: super().define(spec) spec.expose_inputs(cls._base_workchain, 'vasp', exclude=('structure',)) spec.input('structure', valid_type=(orm.StructureData, orm.CifData)) spec.input( 'static_calc_parameters', valid_type=orm.Dict, required=False, serializer=to_aiida_type, help=""" The parameters (INCAR) to be used in the final static calculation. """, ) spec.input( 'static_calc_settings', valid_type=orm.Dict, required=False, serializer=to_aiida_type, help=""" The full settings Dict to be used in the final static calculation. """, ) spec.input( 'static_calc_options', valid_type=orm.Dict, required=False, serializer=to_aiida_type, help=""" The full options Dict to be used in the final static calculation. """, ) spec.input( 'relax_settings', valid_type=orm.Dict, validator=RelaxOptions.aiida_validate, serializer=RelaxOptions.aiida_serialize, help=RelaxOptions.aiida_description(), ) spec.input( 'verbose', required=False, help='Increased verbosity.', valid_type=orm.Bool, serializer=to_aiida_type, ) spec.exit_code(0, 'NO_ERROR', message='the sun is shining') spec.exit_code( 300, 'ERROR_MISSING_REQUIRED_OUTPUT', message='the called workchain does not contain the necessary relaxed output structure', ) spec.exit_code(420, 'ERROR_NO_CALLED_WORKCHAIN', message='no called workchain detected') spec.exit_code( 500, 'ERROR_UNKNOWN', message='unknown error detected in the relax workchain', ) spec.exit_code( 502, 'ERROR_OVERRIDE_PARAMETERS', message='there was an error overriding the parameters', ) spec.exit_code( 600, 'ERROR_RELAX_NOT_CONVERGED', message='Ionic relaxation was not converged after the maximum number of iterations has been spent', ) spec.exit_code( 601, 'ERROR_FINAL_SCF_HAS_RESIDUAL_FORCE', message=( 'The final singlepoint calculation has increased residual forces. This' ' may be caused by electronic solver converging to a different' ' solution. Care should be taken to investigate the results.' ), ) spec.outline( cls.initialize, if_(cls.perform_relaxation)( while_(cls.run_next_relax)( cls.run_relax, cls.verify_last_relax, cls.analyze_convergence, ), cls.store_relaxed, ), if_(cls.should_run_static_calculation)( cls.run_static_calculation, cls.verify_next_workchain, ), cls.results, cls.finalize, ) # yapf: disable spec.expose_outputs(cls._base_workchain) spec.output('relax.structure', valid_type=orm.StructureData, required=False)
[docs] @classmethod def get_builder_from_protocol( cls, code: orm.AbstractCode, structure: orm.StructureData, protocol=None, overrides=None, options=None, relax_settings=None, **kwargs, ): """ Return a builder prepopulated with inputs selected according to the chosen protocol. :param code: the ``Code`` instance configured for the ``abacus.abacus`` plugin. :param structure: the ``StructureData`` instance to use. :param protocol: protocol to use, if not specified, the default will be used. :param overrides: optional dictionary of inputs to override the defaults of the protocol. :param options: A dictionary of options that will be recursively set for the ``metadata.options`` input of all the ``CalcJobs`` that are nested in this work chain. :return: a process builder instance with all inputs defined ready for launch. """ overrides = overrides or {} if relax_settings: overrides['relax_settings'] = recursive_merge(overrides.get('relax_settings', {}), relax_settings) inputs = cls.get_protocol_inputs(protocol, overrides) base_builder = cls._base_workchain.get_builder_from_protocol( code=code, protocol=inputs.get('vasp', {}).get('protocol', protocol), structure=structure, overrides=inputs.get('vasp', {}), options=options, **kwargs, ) # Structure is defined at the top level base_builder.pop('structure') builder = cls.get_builder() builder.vasp = base_builder builder.structure = structure builder.relax_settings = inputs.get('relax_settings', {}) builder.static_calc_settings = inputs.get('static_calc_settings', {}) builder.static_calc_options = inputs.get('static_calc_options', {}) builder.static_calc_parameters = inputs.get('static_calc_parameters', {}) builder.verbose = inputs.get('verbose', orm.Bool(False)) return builder
[docs] def initialize(self) -> None: """Initialize.""" # Initialise the contexts self.ctx.exit_code = self.exit_codes.ERROR_UNKNOWN # pylint: disable=no-member self.ctx.is_converged = False self.ctx.relax = False self.ctx.iteration = 0 self.ctx.workchains = [] self.ctx.inputs = AttributeDict() # This may not be necessary anymore self.ctx.relax_settings = AttributeDict( self.inputs.relax_settings.get_dict() ) # relax_settings controls the logic of the workchain self.ctx.current_magmom = None self.ctx.is_double_relax = self.ctx.relax_settings.get('double_relax_mode', False) self.ctx.do_residual_forces_check = self.ctx.relax_settings.get('residual_forces_check', True) # Check potential issues in the the input parameters self._check_input_parameters() # Storage space for the inputs self.ctx.relax_input_additions = self._init_relax_input_additions() self.ctx.static_input_additions = AttributeDict() # Set the verbose flag if 'verbose' in self.inputs: self.ctx.verbose = self.inputs.verbose.value else: self.ctx.verbose = self._verbose # Hard-coded default # Take the input structure as the "current" structure self.ctx.current_structure = self.inputs.structure # Make sure we parse the output structure when we want to perform # relaxations (override if contrary entry exists). settings = self.inputs.vasp.get('settings', {}) or {} if isinstance(settings, orm.Data): settings = settings.get_dict() if self.perform_relaxation(): settings = update_nested_dict( settings, {'parser_settings': {'include_node': ['structure', 'trajectory'], 'include_quantity': ['energies']}}, extend_list=True, ) # Update the settings for the relaxation self.ctx.relax_input_additions.settings = settings # Hybrid calculation boot-strapping if self.ctx.relax_settings.get('hybrid_calc_bootstrap'): self.ctx.hybrid_status = 'dft' if not self.ctx.relax_settings.get('reuse'): self.report('Enable `reuse` mode because hybrid calculation bootstrapping has been requested.') self.ctx.relax_settings['reuse'] = True else: self.ctx.hybrid_status = None # If we are reusing existing calculations, make sure the remote folders are not cleaned if self.ctx.relax_settings.get('reuse'): clean_tmp = self.inputs.vasp.get('clean_workdir') if clean_tmp and clean_tmp.value: self.report('Disable clean_workdir for downstream workflows since `reuse` is requested.') self.ctx.relax_input_additions['clean_workdir'] = orm.Bool(False) # Check the input parameters self._check_input_parameters()
[docs] def _check_input_parameters(self) -> None: """Validate the input parameters and detect problems before running the workchain""" exposed = self.exposed_inputs(self._base_workchain, 'vasp') incar = exposed.parameters['incar'] exp_key = ['ibrion', 'nsw', 'isif'] for key in exp_key: if key in incar: self.report( f'{key} explicitly set to {incar[key]} - this overrides the relax_settings input' ' - proceed with caution.' ) isif = incar.get('isif') if isif == 3 and not all(self.ctx.relax_settings.get(key) for key in ['positions', 'volume', 'shape']): raise InputValidationError( 'ISIF = 3 is set explicity for INCAR, which is consistent with the mode of relaxation ' 'supplied to the workchain.' )
[docs] def _init_relax_input_additions(self) -> AttributeDict: """ Initialise the `relax_additions` field inside the context. It is a AttributeDict that contains the inputs that should be updated while performing the relaxation. """ additions = AttributeDict() # Update the "relax" field inside the parameters - this is needed because some of the # settings will be translated into VASP parameters if self.perform_relaxation(): parameters = update_nested_dict_node(self.inputs.vasp.parameters, {'relax': self.ctx.relax_settings}) additions.parameters = parameters return additions
[docs] def run_next_relax(self) -> bool: within_max_iterations = bool(self.ctx.iteration < self.ctx.relax_settings.convergence_max_iterations) return bool(within_max_iterations and not self.ctx.is_converged)
[docs] def run_relax(self) -> ToContext: """Perform the relaxation""" # Iteration starts from 1, not 0 self.ctx.iteration += 1 inputs = self.exposed_inputs(self._base_workchain, 'vasp') inputs.structure = self.ctx.current_structure # Attach previous calculation's folder if requested if self.ctx.relax_settings.get('reuse', False): restart_folder = self.ctx.get('current_restart_folder') # There might not be any yet if restart_folder: if self.ctx.get('verbose'): self.report(f'Using previous remote folder <{restart_folder}> for restart') inputs.restart_folder = restart_folder # Update the input with whatever stored in the relax_input_additions attribute dict inputs.update(self.ctx.relax_input_additions) if 'label' not in inputs.metadata: inputs.metadata.label = self.inputs.metadata.get('label', '') # Check if we need to boot strap hybrid calculation if self.ctx.get('hybrid_status') == 'dft': # Turn off HF and turn off relaxation inputs.parameters = update_nested_dict_node( inputs.parameters, { OVERRIDE_NAMESPACE: { 'lhfcalc': False, 'isym': 2, # Standard DFT needs ISYM=2 }, 'relax': { # This turns off any relaxation 'positions': False, 'volume': False, 'shape': False, }, }, ) # Decrease the iteration number self.ctx.iteration -= 1 # Update the wallclock seconds wallclock = self.ctx.relax_settings.get('hybrid_calc_bootstrap_wallclock') if wallclock: inputs.calc.metadata.options = update_nested_dict( inputs.calc.metadata.options, {'max_wallclock_seconds': wallclock} ) # Update the MAGMOM if self.ctx.current_magmom is not None: inputs.parameters = update_nested_dict_node( inputs.parameters, {OVERRIDE_NAMESPACE: {'magmom': self.ctx.current_magmom}}, ) # Label the calculation and links by iteration number inputs.metadata.label += f' ITER {self.ctx.iteration:02d}' inputs.metadata.call_link_label = f'relax_{self.ctx.iteration:02d}' # Keep the workdir if keep_sp_workdir is set to True but we are not having static calculation at all # This ensure that we have a valid remote_data output which is not cleaned by the called VaspWorkChain if self.inputs.relax_settings.get('keep_sp_workdir') and not self.should_run_static_calculation(): inputs.keep_last_workdir = orm.Bool(True) running = self.submit(self._base_workchain, **inputs) self.report(f'launching {self._base_workchain.__name__}<{running.pk}> iterations #{self.ctx.iteration}') return ToContext(workchains=append_(running))
[docs] def run_static_calculation(self) -> ToContext: """Perform the relaxation""" # For the final static run we do not need to parse the output structure if self.inputs.vasp.get('settings'): self.ctx.static_input_additions.settings = update_nested_dict_node( self.inputs.vasp.settings, { 'parser_settings': { 'include_node': ['structure', 'trajectory'], 'required_node': ['structure', 'trajectory'], } }, extend_list=True, ) # Apply overrides if supplied if self.inputs.get('static_calc_settings'): self.ctx.static_input_additions.settings = self.inputs.static_calc_settings # Override INCARs for the final relaxation if self.inputs.get('static_calc_parameters'): self.ctx.static_input_additions.parameters = update_nested_dict_node( self.inputs.vasp.parameters, self.inputs.static_calc_parameters.get_dict(), ) if self.is_verbose(): if not self.perform_relaxation(): self.report('Skiped structure relaxation and forwarding inputs to the static calculation.') else: self.report('performing a final calculation using the relaxed structure.') self.ctx.iteration += 1 inputs = self.exposed_inputs(self._base_workchain, 'vasp') inputs.structure = self.ctx.current_structure # Attach previous calculation's folder if requested if self.ctx.relax_settings.get('reuse', False): restart_folder = self.ctx.get('current_restart_folder') # There might not be any yet if restart_folder: if self.ctx.get('verbose'): self.report(f'Using previous remote folder <{restart_folder}> for restart') inputs.restart_folder = restart_folder # The workdir is not cleaned by the called VaspWorkChain for the static calculation # if `keep_sp_workdir`` is set to True if self.ctx.relax_settings.get('keep_sp_workdir'): inputs.keep_last_workdir = orm.Bool(True) # Update the MAGMOM if information is present if self.ctx.current_magmom is not None: inputs.parameters = update_nested_dict_node( inputs.parameters, {OVERRIDE_NAMESPACE: {'magmom': self.ctx.current_magmom}}, ) if 'label' not in inputs.metadata: inputs.metadata.label = self.inputs.metadata.get('label', '') + ' SP' # Label the calculation and links by iteration number inputs.metadata.call_link_label = 'singlepoint' # Update the input with whatever stored in the relax_input_additions attribute dict inputs.update(self.ctx.static_input_additions) if self.inputs.get('static_calc_options'): inputs.calc.metadata.options = update_nested_dict( inputs.calc.metadata.options, self.inputs.static_calc_options.get_dict() ) # Make sure NSW is not here for the static calculation incar = inputs.parameters['incar'] if 'nsw' in incar: incar.pop('nsw') new_param = inputs.parameters.get_dict() new_param['incar'] = incar inputs.parameters = orm.Dict(dict=new_param) self.report('Removed explicitly defined NSW value for the static calculation') running = self.submit(self._base_workchain, **inputs) self.report(f'launching {self._base_workchain.__name__}<{running.pk}> iterations #{self.ctx.iteration}') return ToContext(workchains=append_(running))
[docs] def verify_next_workchain(self) -> None | ExitCode: """Verify and inherit exit status from child workchains.""" try: workchain = self.ctx.workchains[-1] except IndexError: self.report(f'There is no {self._base_workchain.__name__} in the called workchain list.') return self.exit_codes.ERROR_NO_CALLED_WORKCHAIN # pylint: disable=no-member # Inherit exit status from last workchain (supposed to be # successfull) next_workchain_exit_status = workchain.exit_status next_workchain_exit_message = workchain.exit_message if not next_workchain_exit_status: self.ctx.exit_code = self.exit_codes.NO_ERROR # pylint: disable=no-member else: self.ctx.exit_code = compose_exit_code(next_workchain_exit_status, next_workchain_exit_message) self.report( 'The called {}<{}> returned a non-zero exit status. The exit status {} is inherited'.format( workchain.__class__.__name__, workchain.pk, self.ctx.exit_code ) ) return self.ctx.exit_code
[docs] def verify_last_relax(self) -> None | ExitCode: """Verify and inherit exit status from the last relaxation""" try: workchain = self.ctx.workchains[-1] except IndexError: self.report(f'There is no {self._base_workchain.__name__} in the called workchain list.') return self.exit_codes.ERROR_NO_CALLED_WORKCHAIN # pylint: disable=no-member # Inherit exit status from last workchain (supposed to be # successfull) next_workchain_exit_status = workchain.exit_status next_workchain_exit_message = workchain.exit_message if not next_workchain_exit_status: self.ctx.exit_code = self.exit_codes.NO_ERROR # pylint: disable=no-member elif 'misc' in workchain.outputs and 'structure' in workchain.outputs and 'forces' in workchain.outputs.misc: self.ctx.exit_code = self.exit_codes.NO_ERROR # pylint: disable=no-member self.report( f'The called {workchain.__class__.__name__}<{workchain.pk}> returned a non-zero exit status.' ' Continue the workflow as "misc" and "structure" outputs are present.' ) else: self.ctx.exit_code = compose_exit_code(next_workchain_exit_status, next_workchain_exit_message) self.report( 'The called {}<{}> returned a non-zero exit status. The exit status {} is inherited'.format( workchain.__class__.__name__, workchain.pk, self.ctx.exit_code ) ) return self.ctx.exit_code
[docs] def analyze_convergence(self) -> None | ExitCode: """ Analyze the convergence of the relaxation. Compare the input and output structures of the most recent relaxation run. If volume, shape and ion positions are all within a given threshold, consider the relaxation converged. """ workchain = self.ctx.workchains[-1] # Double check presence of structure if 'structure' not in workchain.outputs: self.report( f'The {workchain.__class__.__name__}<{workchain.pk}> for the relaxation run did not have an output ' 'structure and most likely failed. However, its exit status was zero.' ) return self.exit_codes.ERROR_MISSING_REQUIRED_OUTPUT # pylint: disable=no-member if self.ctx.hybrid_status == 'dft': self.report( 'Competed initial DFT calculation - skipping convergence checks and process to hybrid calculation.' ) self.ctx.hybrid_status = 'hybrid' self.ctx.current_restart_folder = workchain.outputs.remote_folder return self.exit_codes.NO_ERROR # pylint: disable=no-member # Because the workchain may have been through multiple restarts of the underlying VASP calculation # we have to query and find the exact input structure of the calculation that generated the output # structure and use that for comparison query = orm.QueryBuilder() query.append( orm.StructureData, filters={'id': workchain.outputs.structure.pk}, tag='workchain-out', ) query.append(orm.CalcJobNode, with_outgoing='workchain-out', tag='calcjob') query.append(orm.StructureData, with_outgoing='calcjob') input_structure = query.one()[0] self.ctx.previous_structure = self.ctx.current_structure self.ctx.last_calc_input_structure = input_structure self.ctx.current_structure = workchain.outputs.structure conv_mode = self.ctx.relax_settings.convergence_mode # Assign the two structure used for comparison if conv_mode == 'inout': compare_from = self.ctx.last_calc_input_structure compare_to = self.ctx.current_structure elif conv_mode == 'last': if 'trajectory' not in workchain.outputs: self.report( 'Warning - trajectory output not found but needed for convergence check - reverting to check ' 'input/output structures.' ) compare_from = self.ctx.last_calc_input_structure compare_to = self.ctx.current_structure else: traj = workchain.outputs.trajectory if traj.numsteps > 1: compare_from = get_step_structure( workchain.outputs.trajectory, -2 ) # take take second last structure compare_to = get_step_structure(workchain.outputs.trajectory, -1) # take take second last structure else: self.report( 'Warning - no enough number of steps to compare - using input/output structures instead.' ) compare_from = self.ctx.last_calc_input_structure compare_to = self.ctx.current_structure else: raise RuntimeError(f'Convergence mode {conv_mode} is not a valid option') converged = True relax_settings = self.ctx.relax_settings if relax_settings.convergence_on: if self.is_verbose(): self.report('Checking the convergence of the relaxation.') comparison = compare_structures(compare_from, compare_to) comparison_inout = compare_structures(self.ctx.last_calc_input_structure, self.ctx.current_structure) delta_inout = comparison_inout.absolute if relax_settings.convergence_absolute else comparison.relative if relax_settings.positions: # For positions it only makes sense to check the absolute change converged &= self.check_positions_convergence(comparison.absolute) # For volume and shape, always compare the input/output structure # This is because VASP only does fixed basis relaxations, so restart is needed if significant check in # the cell volume/shape is needed if relax_settings.volume: converged &= self.check_volume_convergence(delta_inout) if relax_settings.shape: converged &= self.check_shape_convergence(delta_inout) # BONAN: Check force - this is because the underlying VASP calculation may not have finished with # fully converge geometry, and the vasp plugin does not check it. force_cut_off = relax_settings.get('force_cutoff') # Set the forces to zero if using selective dynamics forces = np.array(workchain.outputs.misc.get('forces')) if 'dynamics' in workchain.inputs: dof = workchain.inputs.dynamics.get('positions_dof') else: dof = None max_force = get_maximum_force(forces, dof=dof) if force_cut_off is not None and max_force > force_cut_off: self.report( f'Maximum force in the structure {max_force:.4g} excess the cut-off limit {force_cut_off:.4g}' ' - NOT OK' ) converged = False elif self.is_verbose(): self.report(f'Maximum force in the structure {max_force:.4g} - OK') if not converged: if self.ctx.get('verbose', self._verbose): self.report('Relaxation did not converge, restarting the relaxation.') elif self.is_verbose(): self.report('Relaxation is converged, finishing with a final static calculation.') elif self.is_verbose(): self.report('Convergence checking is not enabled - finishing with a final static calculation.') self.ctx.current_restart_folder = workchain.outputs.remote_folder # Update the magmom to be used if 'site_magnetization' in workchain.outputs and self.ctx.relax_settings.get('keep_magnetization', True): try: self.ctx.current_magmom = site_magnetization_to_magmom(workchain.outputs.site_magnetization) # Some times the site magnetisation can be empty - do nothing except ValueError: pass self.ctx.is_converged = converged # Check if we are in 'double_relax' mode and the relaxation is not converged if self.ctx.is_double_relax and not converged: if self.ctx.iteration >= 2: self.report( 'DOUBLE RELAX: The second relaxation is not converged, ' ' but we will finish with a final static calculation.' ) self.ctx.is_converged = True return self.exit_codes.NO_ERROR # pylint: disable=no-member
[docs] def check_shape_convergence(self, delta: AttributeDict) -> bool: """Check the difference in cell shape before / after the last iteratio against a tolerance.""" threshold_angles = self.ctx.relax_settings.convergence_shape_angles threshold_lengths = self.ctx.relax_settings.convergence_shape_lengths if threshold_lengths < 0: self.report('Cell length convergence check bypassed.') lengths_converged = True else: lengths_converged = bool(delta.cell_lengths.max() <= threshold_lengths) if not lengths_converged: self.report( f'cell lengths changed by max {delta.cell_lengths.max():.4g}, tolerance is {threshold_lengths:.4g}' ' - NOT OK' ) elif self.is_verbose(): self.report( f'cell lengths changed by max {delta.cell_lengths.max():.4g}, tolerance is {threshold_lengths:.4g} - OK' ) angles_converged = bool(delta.cell_angles.max() <= threshold_angles) if threshold_angles < 0: self.report('Cell angles check bypassed.') angles_converged = True else: angles_converged = bool(delta.cell_angles.max() <= threshold_angles) if not angles_converged: self.report( f'cell angles changed by max {delta.cell_angles.max():.4g}, tolerance is {threshold_angles:.4g}' ' - NOT OK' ) elif self.is_verbose(): self.report( f'cell angles changed by max {delta.cell_angles.max():.4g}, tolerance is {threshold_angles:.4g} - OK' ) return lengths_converged and angles_converged
[docs] def check_volume_convergence(self, delta: AttributeDict) -> bool: """Check the convergence of the volume, given a cutoff.""" threshold = self.ctx.relax_settings.convergence_volume if threshold < 0: self.report('Volume convergence check bypassed.') return True volume_converged = bool(delta.volume <= threshold) if not volume_converged: self.report(f'cell volume changed by {delta.volume:.4g}, tolerance is {threshold:.4g} - NOT OK') elif self.is_verbose(): self.report(f'cell volume changed by {delta.volume:.4g}, tolerance is {threshold:.4g} - OK') return volume_converged
[docs] def check_positions_convergence(self, delta: AttributeDict) -> bool: """Check the convergence of the atomic positions, given a cutoff.""" threshold = self.ctx.relax_settings.convergence_positions if threshold < 0: self.report('Positions convergence check bypassed.') return True try: positions_converged = bool(np.nanmax(delta.pos_lengths) <= threshold) except RuntimeWarning: # Here we encountered the case of having one atom centered at the origin, so # we do not know if it is converged, so settings it to False # BONAN: this should never happen now - remove it later self.report( 'there is NaN entries in the relative comparison for the positions during relaxation,' ' assuming position is not converged' ) positions_converged = False if not positions_converged: try: self.report( f'max site position change is {np.nanmax(delta.pos_lengths):.4g}, tolerance is {threshold:.4g}' ' - NOT OK' ) except RuntimeWarning: pass elif self.is_verbose(): try: self.report( f'max site position change is {np.nanmax(delta.pos_lengths):.4g}, tolerance is {threshold:.4g} - OK' ) except RuntimeWarning: pass return positions_converged
[docs] def store_relaxed(self) -> None | ExitCode: """Store the relaxed structure.""" workchain = self.ctx.workchains[-1] relaxed_structure = workchain.outputs.structure if self.is_verbose(): self.report( "attaching the node {}<{}> as '{}'".format( relaxed_structure.__class__.__name__, relaxed_structure.pk, 'relax.structure', ) ) self.out('relax.structure', relaxed_structure) if not self.ctx.is_converged: # If the relaxation is not converged, there is no point to # proceed furthure, and the result of last calculation is attached workchain = self.ctx.workchains[-1] self.out_many(self.exposed_outputs(workchain, self._base_workchain)) return self.exit_codes.ERROR_RELAX_NOT_CONVERGED # pylint: disable=no-member
[docs] def results(self) -> None | ExitCode: """ Attach the remaining output results. This can either be the final static calculation or the last relaxation if the former is not needed. As a final check - check if the `maximum_force` is lower than the predefined value. """ workchain = self.ctx.workchains[-1] self.out_many(self.exposed_outputs(workchain, self._base_workchain)) # Check the maximum force only if we are not using selective dynamics if 'dynamics' in workchain.inputs: dof = workchain.inputs.dynamics.get('positions_dof') else: dof = None # Try to get the smearing type, there is no point to perform check if the tetrahedral smearing is used. if not detect_tetrahedral_method(workchain.inputs.parameters.get_dict()): max_force_threshold = self.ctx.relax_settings.get('force_cutoff', 0.03) actual_max_force = get_maximum_force(workchain.outputs.misc['forces'], dof=dof) if ( # TODO: Make this check part of the convergence cycle? actual_max_force > max(max_force_threshold * 1.5, max_force_threshold + 0.001) and self.perform_relaxation() and self.ctx.do_residual_forces_check ): if self.is_verbose(): self.report( f'The force of the final SCF is {actual_max_force} eV/A, ' 'which is significantly higher than the tolerance' f' {max_force_threshold} eV/A.' ) if not self.ctx.is_double_relax: return self.exit_codes.ERROR_FINAL_SCF_HAS_RESIDUAL_FORCE # pylint: disable=no-member else: self.report( 'Unable to perform final check for maximum force, as the tetrahedral method is used for integration.' ) # If we bypassed the relax by setting perform=False, still attached the input structure as the output structure if not RelaxOptions(**self.inputs.relax_settings.get_dict()).perform: self.out('relax.structure', workchain.inputs.structure)
[docs] def finalize(self) -> None: """ Finalize the workchain. Clean the remote working directories of the called calcjobs """ rlx_settings = self.ctx.relax_settings # Fence this section to avoid unnecessary process exceptions. try: if rlx_settings.get('reuse') and rlx_settings.get('clean_reuse', True): self.report('Cleaning remote working directory for the called CalcJobs.') cleaned_calcs = [] qbd = orm.QueryBuilder() qbd.append(orm.WorkChainNode, filters={'id': self.node.pk}) # Options for keeping the single point calculation work directory if rlx_settings.get('keep_sp_workdir', False): # Only gather the relax calculations qbd.append( orm.WorkChainNode, edge_filters={'label': {'like': 'relax_%'}}, filters={'id': {'in': [node.pk for node in self.ctx.workchains]}}, ) else: qbd.append( orm.WorkChainNode, filters={'id': {'in': [node.pk for node in self.ctx.workchains]}}, ) qbd.append(orm.CalcJobNode) # Find the CalcJobs to clean if qbd.count() > 0: calcjobs = [tmp[0] for tmp in qbd.all()] else: self.report('Cannot found called CalcJobNodes to clean.') return # Clean the remote directories one by one for calculation in calcjobs: try: calculation.outputs.remote_folder._clean() # pylint: disable=protected-access cleaned_calcs.append(calculation.pk) except BaseException: # pylint: disable=no-self-argument,no-self-use pass if cleaned_calcs: self.report(f'cleaned remote folders of calculations: {" ".join(map(str, cleaned_calcs))}') except BaseException as exception: # pylint: disable=no-self-argument,no-self-use self.report(f'Exception occurred during the cleaning of the remote contents: {exception.args}')
[docs] def perform_relaxation(self) -> bool: """Check if a relaxation is to be performed.""" return self.ctx.relax_settings.perform
[docs] def should_run_static_calculation(self) -> bool: """Control whether the static calculation should be run""" # If the relaxation itself by passed - run the final static calculation if not self.perform_relaxation(): return True # If the shape or volume is relaxed - we must do the final calculation relax_settings = self.ctx.relax_settings if relax_settings.shape or relax_settings.volume: return True # Otherwise, we skip the final calculation return False
[docs] def is_verbose(self) -> bool: """Are we in the verbose mode?""" return self.ctx.get('verbose', self._verbose)
@classproperty def relax_option_class(cls) -> RelaxOptions: # noqa: N805 """Class for relax options""" return RelaxOptions
[docs] def compare_structures(structure_a: orm.StructureData, structure_b: orm.StructureData) -> AttributeDict: """Compare two StructreData objects A, B and return a delta (A - B) of the relevant properties.""" delta = AttributeDict() delta.absolute = AttributeDict() delta.relative = AttributeDict() volume_a = structure_a.get_cell_volume() volume_b = structure_b.get_cell_volume() delta.absolute.volume = np.absolute(volume_a - volume_b) delta.relative.volume = np.absolute(volume_a - volume_b) / volume_a # Check the change in positions taking account of the pbc atoms_a = structure_a.get_ase() atoms_a.set_pbc(True) atoms_b = structure_b.get_ase() atoms_b.set_pbc(True) n_at = len(atoms_a) atoms_a.extend(atoms_b) pos_change_abs = np.zeros((n_at, 3)) for isite in range(n_at): dist = atoms_a.get_distance(isite, isite + n_at, mic=True, vector=True) pos_change_abs[isite] = dist pos_a = np.array([site.position for site in structure_a.sites]) # pos_b = np.array([site.position for site in structure_b.sites]) delta.absolute.pos = pos_change_abs site_vectors = [delta.absolute.pos[i, :] for i in range(delta.absolute.pos.shape[0])] a_lengths = np.linalg.norm(pos_a, axis=1) delta.absolute.pos_lengths = np.array([np.linalg.norm(vector) for vector in site_vectors]) delta.relative.pos_lengths = np.divide( delta.absolute.pos_lengths, a_lengths, out=np.zeros_like(delta.absolute.pos_lengths), where=a_lengths != 0, ) cell_lengths_a = np.array(structure_a.cell_lengths) delta.absolute.cell_lengths = np.absolute(cell_lengths_a - np.array(structure_b.cell_lengths)) delta.relative.cell_lengths = np.absolute(cell_lengths_a - np.array(structure_b.cell_lengths)) / cell_lengths_a cell_angles_a = np.array(structure_a.cell_angles) delta.absolute.cell_angles = np.absolute(cell_angles_a - np.array(structure_b.cell_angles)) delta.relative.cell_angles = np.absolute(cell_angles_a - np.array(structure_b.cell_angles)) / cell_angles_a return delta
[docs] def get_step_structure(traj: orm.TrajectoryData, step: int) -> orm.StructureData: """Get the step structure, but assume the positions are fractional""" _, _, cell, symbols, positions, _ = traj.get_step_data(step) # Convert to cartesian coorindates positions = positions @ cell structure = orm.StructureData(cell=cell) for _s, _p in zip(symbols, positions): # Automatic species generation structure.append_atom(symbols=_s, position=_p) return structure
[docs] def detect_tetrahedral_method(input_dict: dict) -> bool: """ Check if the tetrahedral method is used for BZ integration. """ incar = input_dict.get('incar', {}) if incar.get('ismear', 1) == -5: return True if input_dict.get('smearing', {}).get('tetra') and not incar.get('ismear'): return True return False
[docs] class VaspMultiStageRelaxWorkChain(WorkChain, WithBuilderUpdater): """ Relxation with multiple stages This workchain allows to run multiple stages of relaxation with different parameters, options and settings. The workchain takes a structure as input and runs a series of VaspRelaxWorkChain calculations, each with updated set of parameters, options and settings as specified in <name>_stages. The output of the final workchain is exposed. Example:: vasp_staged_relax = VaspMultiStageRelaxWorkChain.get_builder() vasp_staged_relax.structure = structure vasp_staged_relax.relax = <usual relax inputs> # Set ismear to 0 for the first stage, -5 for the second stage vasp_staged_relax.parameters_stages = { '0': {'incar': {'ismear': 0}}, '1': {'incar': {'ismear': 1, 'gga': 'pe', 'lhfcalc': True}}, } # Switch to RMM-DIIS for the second stage vasp_staged_relax.relax_settings_stages = { '1': {'algo': 'rd'}} # Include node in the second stage of the relaxation vasp_staged_relax.settings_stages = { '1': {'parser_settings': {'inlude_node': ['dos']}}, } Note that the index starts from 0 - e.g. '0' for the first stage, '1' for the second stage, etc. """ _base_workchain = VaspRelaxWorkChain
[docs] @classmethod def define(cls, spec: ProcessSpec) -> None: super().define(spec) spec.input_namespace('parameters_stages', required=False, dynamic=True, help='Parameters for each stage') spec.input_namespace('settings_stages', required=False, dynamic=True, help='Settings for each stage') spec.input_namespace('options_stages', required=False, dynamic=True, help='Options for each stage') spec.input_namespace( 'relax_settings_stages', required=False, dynamic=True, help='relax_settings for each stage' ) spec.input_namespace( 'others_stages', required=False, dynamic=True, help='Others for each stage, for example, set "1_kpoints" to apply kpoints to the first stage', ) spec.input( 'use_nested_update', valid_type=orm.Bool, default=lambda: orm.Bool(True), help='Use nested update for parameters, options, relax_settings, and settings. ' 'Otherwise full dictionary should be provided', ) spec.input( 'ignored_failed', valid_type=orm.Bool, default=lambda: orm.Bool(False), help='If True, continue to the next stage even when a stage fails, provided a relaxed structure exists.', ) spec.expose_inputs(cls._base_workchain, 'relax', exclude=('structure',)) spec.input('structure', valid_type=(orm.StructureData, orm.CifData)) spec.expose_outputs(cls._base_workchain) # Outline of the workchain spec.outline( cls.setup, while_(cls.should_run_stage)( cls.run_stage, cls.inspect_stage, ), cls.results, ) spec.exit_code( 500, 'ERROR_SUB_PROCESS_FAILED', message='The subprocess has failed.', )
[docs] def setup(self) -> None: """ Initialize context variables """ relax_inputs = self.exposed_inputs(self._base_workchain, 'relax') self.ctx.current_stage = 0 self.ctx.current_structure = self.inputs.structure self.ctx.parameters = relax_inputs.vasp.parameters.get_dict() self.ctx.settings = relax_inputs.vasp.settings.get_dict() self.ctx.options = deepcopy(relax_inputs.vasp.calc.metadata.options) self.ctx.relax_settings = relax_inputs.relax_settings.get_dict() self.ctx.n_stages = len(self.inputs.parameters_stages)
[docs] def should_run_stage(self) -> bool: """ Check if a stage should be run """ return self.ctx.current_stage < self.ctx.n_stages
[docs] def run_stage(self) -> ToContext: """ Run a stage """ self.report(f'Running stage {self.ctx.current_stage}') istage = self.ctx.current_stage # Update the parameters, options and settings - apply the changes to the self.ctx for name in ['parameters', 'settings', 'relax_settings', 'options']: if str(istage) in self.inputs.get(name + '_stages', {}): if self.inputs.use_nested_update.value: self.ctx[name] = update_nested_dict( self.ctx[name], self.inputs[name + '_stages'][str(istage)].get_dict() ) else: self.ctx[name] = self.inputs[name + '_stages'][str(istage)].get_dict() relax_inputs = self.exposed_inputs(self._base_workchain, 'relax') # Apply others changes {'1_relax_settings': {'algo': 'rd'}} for key, value in self.inputs.get('others_stages', {}): stage = key.split('_')[0] name = key.split('_')[1] if stage == istage: if name in ['static_calc_settings', 'static_calc_options', 'static_calc_parameters']: relax_inputs[name] = value else: relax_inputs.vasp[name] = value # Modify the inputs for name in ['parameters', 'settings']: if relax_inputs.vasp[name].get_dict() != self.ctx[name]: relax_inputs.vasp[name] = orm.Dict(dict=self.ctx[name]) # Modify the options port of the underlying VaspWorkChain update_nested_dict(relax_inputs.vasp.calc.metadata.options, self.ctx.options) # Modify the relax_settings if relax_inputs.relax_settings.get_dict() != self.ctx.relax_settings: relax_inputs.relax_settings = orm.Dict(dict=self.ctx.relax_settings) relax_inputs.structure = self.ctx.current_structure running = self.submit(self._base_workchain, **relax_inputs) return ToContext(workchains=append_(running))
[docs] def inspect_stage(self) -> None | ExitCode: """ Inspect a stage """ workchain = self.ctx.workchains[-1] self.report(f'Inspecting stage {self.ctx.current_stage} - {workchain}') if not workchain.is_finished_ok: self.report(f'Stage {self.ctx.current_stage} failed with exit status {workchain.exit_status} - aborting.') if not self.inputs.ignored_failed.value: self.out_many(self.exposed_outputs(workchain, self._base_workchain)) return self.exit_codes.ERROR_SUB_PROCESS_FAILED else: try: self.ctx.current_structure = workchain.outputs.relax.structure except AttributeError: self.report('Cannot get the relaxed structure from the last workchain - aborting.') self.out_many(self.exposed_outputs(workchain, self._base_workchain)) return self.exit_codes.ERROR_SUB_PROCESS_FAILED else: self.ctx.current_structure = workchain.outputs.relax.structure self.ctx.current_stage += 1 return
[docs] def results(self) -> None: """ Attach the remaining output results. This can either be the final static calculation or the last relaxation if the former is not needed. As a final check - check if the `maximum_force` is lower than the predefined value. """ workchain = self.ctx.workchains[-1] self.out_many(self.exposed_outputs(workchain, self._base_workchain))
[docs] def get_maximum_force(forces: np.ndarray, dof=None) -> float: """Return the maximum value of an array of forces with size (N, 3)""" if dof is not None: forces = np.asarray(forces).copy() for i, value in enumerate(dof): # Set forces to zero if there is any False entry # This is because VASP fixes fractional coorindates while forces are in Cartesian # So to void deadlock, we simply ignore the forces on that atom if any([not v for v in value]): forces[i, :] = 0.0 norm = np.linalg.norm(forces, axis=1) return np.amax(norm)