"""
This file contains the VaspWorkChain class definition which uses the BaseRestartWorkChain.
Below is a copy of the error handler logic from aiida-core:
If the process is excepted or killed, the work chain will abort. Otherwise any attached handlers will be called
in order of their specified priority. If the process was failed and no handler returns a report indicating that
the error was handled, it is considered an unhandled process failure and the process is relaunched. If this
happens twice in a row, the work chain is aborted. In the case that at least one handler returned a report the
following matrix determines the logic that is followed::
Process Handler Handler Action
result report? exit code
-----------------------------------------
Success yes == 0 Restart
Success yes != 0 Abort
Failed yes == 0 Restart
Failed yes != 0 Abort
If no handler returned a report and the process finished successfully, the work chain's work is considered done
and it will move on to the next step that directly follows the `while` conditional, if there is one defined in
the outline.
This means that for a handler:
- No error found - just return None
- No action taken
- the error is not recoverable - return with a non-zero error code with do break
- the error is not recoverable, but other handler may/maynot save it - return with a non-zero code without do
break
- the error is not recoverable, and the workchain should be aborted immediately - non-zero code + do break
- Action taken
- the error is fixed in full - return with a zero error code with `do_break=True`
- the error is not fixed in full - return with a report with `do_break=False` but has a `exit_code`.
this mean other handlers (with lower priority) must handle it and return a zero error_code.
"""
from __future__ import annotations
import math
import warnings
from types import TracebackType
from typing import Any, List, Optional, Tuple
import numpy as np
from aiida import orm
from aiida.common.exceptions import InputValidationError, NotExistent
# from aiida.engine.job_processes import override
from aiida.common.extendeddicts import AttributeDict
from aiida.common.lang import override
from aiida.engine import ExitCode, ProcessSpec, if_, while_
from aiida.engine.processes.workchains.restart import (
BaseRestartWorkChain,
ProcessHandlerReport,
WorkChain,
process_handler,
)
from aiida.orm import CalcJobNode, Dict, KpointsData
from aiida.orm.nodes.data.base import to_aiida_type
from aiida.plugins import CalculationFactory
from aiida_vasp.assistant.parameters import (
ParametersMassage,
)
from aiida_vasp.calcs.vasp import VaspCalculation
from aiida_vasp.common import parameters_validator, warn_deprecated_options
from aiida_vasp.common.dryrun import get_jobscheme
from aiida_vasp.data.potcar import PotcarData
from aiida_vasp.protocols import ProtocolMixin, recursive_merge
from aiida_vasp.utils.ldau import get_ldau_keys
from aiida_vasp.utils.workchains import compose_exit_code, prepare_process_inputs, site_magnetization_to_magmom
from .mixins import WithBuilderUpdater
# pylint: disable=no-member
[docs]
class VaspWorkChain(BaseRestartWorkChain, WithBuilderUpdater, ProtocolMixin):
"""
The VASP workchain.
Error handling enriched wrapper around VaspCalculation.
Deliberately conserves most of the interface (required inputs) of the VaspCalculation class, but
makes it possible for a user to interact with a workchain and not a calculation.
This is intended to be used instead of directly submitting a VaspCalculation,
so that future features like
automatic restarting, error checking etc. can be propagated to higher level workchains
automatically by implementing them here.
Handlers are implemented to try fix common problems and improves the robustness.
Individual handlers can be enabled/disabled by setting the ``handler_overrides`` input port.
Additional settings may be passed under the "settings" input, which is also forwarded to the
calculations. The available options are:
- ``USE_WAVECAR_FOR_RESTART`` wether calculation restarts should use the WAVECAR. The default is ``True``.
Usage::
from aiida.common.extendeddicts import AttributeDict
from aiida.work import submit
basevasp = WorkflowFactory('vasp.vasp')
inputs = basevasp.get_builder()
inputs = AttributeDict()
## ... set inputs
submit(basevasp, **inputs)
To see a working example, including generation of input nodes from scratch, please
refer to ``examples/run_vasp_lean.py``.
"""
_verbose = False
_process_class = CalculationFactory('vasp.vasp')
_algo_switching = {
'normal': ['fast', 'veryfast', 'damped'],
'fast': ['normal', 'veryfast', 'damped'],
'veryfast': ['normal', 'fast', 'damped'],
'damped': ['normal', 'fast', 'veryfast'],
}
_default_unsupported_parameters = {}
_protocol_tag = 'vasp' # Search for vasp.yml in protocol directories
[docs]
@classmethod
def define(cls, spec: ProcessSpec) -> None: # pylint: disable=too-many-statements
super(VaspWorkChain, cls).define(spec)
spec.expose_inputs(cls._process_class, exclude=('metadata',))
spec.expose_inputs(
cls._process_class, namespace='calc', include=('metadata',), namespace_options={'populate_defaults': True}
)
# Use a custom validator for backward compatibility
# This needs to be removed in the next major release/formalized workchain interface
spec.inputs.validator = validate_calc_job_custom
spec.inputs['calc']['metadata']['options']['resources']._required = False
spec.inputs['calc']._required = False
spec.input('kpoints', valid_type=orm.KpointsData, required=False)
spec.input(
'potential_family',
valid_type=orm.Str,
required=True,
serializer=to_aiida_type,
validator=potential_family_validator,
)
spec.input(
'potential_mapping',
valid_type=orm.Dict,
required=True,
serializer=to_aiida_type,
)
spec.input(
'parameters',
valid_type=orm.Dict,
required=True,
validator=parameters_validator,
serializer=to_aiida_type,
)
spec.input(
'options',
valid_type=orm.Dict,
required=False,
serializer=to_aiida_type,
validator=warn_deprecated_options,
help='Deprecated - use `calc.metadata.options` instead.',
)
spec.input(
'max_iterations',
valid_type=orm.Int,
required=False,
default=lambda: orm.Int(5),
serializer=to_aiida_type,
help="""
The maximum number of iterations to perform.
""",
)
spec.input(
'clean_workdir',
valid_type=orm.Bool,
required=False,
serializer=to_aiida_type,
default=lambda: orm.Bool(True),
help="""
If True, clean the work dir upon the completion of a successfull calculation.
""",
)
spec.input(
'keep_last_workdir',
valid_type=orm.Bool,
default=lambda: orm.Bool(False),
serializer=to_aiida_type,
help='If True, prevent the last workdir from being cleaned in case the files are needed for restarts.',
)
spec.input(
'verbose',
valid_type=orm.Bool,
required=False,
serializer=to_aiida_type,
default=lambda: orm.Bool(False),
help="""
If True, enable more detailed output during workchain execution.
""",
)
spec.input(
'ldau_mapping',
valid_type=orm.Dict,
required=False,
serializer=to_aiida_type,
help="""Settings for assign LDA+U related settings according to the input structure,
A nested dictionary containing the following keys:
mapping: a dictionary in the format of {"Mn": [d, 4]...} for U
utype: the type of LDA+U, default to 2, which is the one with only one parameter
jmapping: a dictionary in the format of {"Mn": [d, 4]...} but for J
felec: Wether we are dealing with f electrons, will increase lmaxmix if we are.""",
)
spec.input(
'magmom_mapping',
valid_type=orm.Dict,
required=False,
serializer=to_aiida_type,
help='Mapping for the initial magnetic moments.',
)
spec.input(
'kpoints_spacing',
valid_type=orm.Float,
required=False,
serializer=to_aiida_type,
help='Spacing for the kpoints in units A^-1 * 2pi',
)
spec.input(
'auto_parallel',
valid_type=orm.Dict,
serializer=to_aiida_type,
required=False,
help='Automatic parallelisation settings, keywords passed to `get_jobscheme` function.',
)
spec.outline(
cls.setup,
cls.init_inputs,
if_(cls.run_auto_parallel)(cls.prepare_inputs, cls.perform_autoparallel),
while_(cls.should_run_process)(
cls.prepare_inputs,
cls.run_process,
cls.inspect_process,
),
cls.results,
) # yapf: disable
spec.output('parallel_settings', valid_type=orm.Dict, required=False)
spec.expose_outputs(cls._process_class)
# Copied from the old plugin restart workchain
spec.exit_code(
0,
'NO_ERROR',
message='the sun is shining',
)
spec.exit_code(
300,
'ERROR_MISSING_REQUIRED_OUTPUT',
message='the calculation is missing at least one required output in the restart workchain',
)
spec.exit_code(
400,
'ERROR_ITERATION_RETURNED_NO_CALCULATION',
message='the run_calculation step did not successfully add a calculation node to the context',
)
spec.exit_code(
401,
'ERROR_MAXIMUM_ITERATIONS_EXCEEDED',
message='the maximum number of iterations was exceeded',
)
spec.exit_code(
402,
'ERROR_UNEXPECTED_CALCULATION_STATE',
message='the calculation finished with an unexpected calculation state',
)
spec.exit_code(
403,
'ERROR_UNEXPECTED_CALCULATION_FAILURE',
message='the calculation experienced and unexpected failure',
)
spec.exit_code(
404,
'ERROR_SECOND_CONSECUTIVE_SUBMISSION_FAILURE',
message='the calculation failed to submit, twice in a row',
)
spec.exit_code(
405,
'ERROR_SECOND_CONSECUTIVE_UNHANDLED_FAILURE',
message='the calculation failed for an unknown reason, twice in a row',
)
spec.exit_code(
500,
'ERROR_MISSING_CRITICAL_OUTPUT',
message='Missing critical output for inspecting the status of the calculation.',
)
spec.exit_code(
501,
'ERROR_OTHER_INTERVENTION_NEEDED',
message='Cannot handle the error - inputs are likely need to be revised manually. Message: {message}',
)
spec.exit_code(
502,
'ERROR_CALCULATION_NOT_FINISHED',
message='Cannot handle the error - the last calculation did not reach the end of execution.',
)
spec.exit_code(
503,
'ERROR_ELECTRONIC_STRUCTURE_NOT_CONVERGED',
message='Cannot handle the error - the last calculation did not reach electronic convergence.',
)
spec.exit_code(
504,
'ERROR_IONIC_RELAXATION_NOT_CONVERGED',
message='The ionic relaxation is not converged.',
)
spec.exit_code(
505,
'ERROR_UNCONVERGED_ELECTRONIC_STRUCTURE_IN_RELAX',
message='At least one of the ionic steps during the relaxation has did not have converged '
'electronic structure.',
)
spec.exit_code(
700,
'ERROR_NO_POTENTIAL_FAMILY_NAME',
message='the user did not supply a potential family name',
)
spec.exit_code(
701,
'ERROR_POTENTIAL_VALUE_ERROR',
message='ValueError was returned from get_potcars_from_structure',
)
spec.exit_code(
702,
'ERROR_POTENTIAL_DO_NOT_EXIST',
message='the potential does not exist',
)
spec.exit_code(
703,
'ERROR_IN_PARAMETER_MASSAGER',
message='the exception: {exception} was thrown while massaging the parameters',
)
[docs]
def setup(self) -> None:
super().setup()
self.ctx.restart_calc = None
self.ctx.vasp_did_not_execute = False
self.ctx.last_calc_was_unfinished = False
self.ctx.use_wavecar = True
self.ctx.ignore_transient_nelm_breach = False # Flag for ignoring the NELM breach during the relaxation
self.ctx.verbose = None
self.ctx.last_calc_remote_objects = []
self.ctx.handler = AttributeDict()
self.ctx.handler.nbands_increase_tries = 0
[docs]
@classmethod
def get_builder_from_protocol(
cls,
code: orm.AbstractCode,
structure: orm.StructureData,
protocol=None,
overrides=None,
options=None,
**_,
):
"""Return a builder prepopulated with inputs selected according to the chosen protocol.
:param code: the ``Code`` instance configured for the ``abacus.abacus`` plugin.
:param structure: the ``StructureData`` instance to use.
:param protocol: protocol to use, if not specified, the default will be used.
:param overrides: optional dictionary of inputs to override the defaults of the protocol.
:param options: A dictionary of options that will be recursively set for the ``metadata.options`` input of all
the ``CalcJobs`` that are nested in this work chain.
:return: a process builder instance with all inputs defined ready for launch.
"""
if isinstance(code, str):
code = orm.load_code(code)
has_pmg = True
try:
from aiida_vasp.protocols.pmg import PymatgenInputAdaptor # noqa: PLC0415
except ImportError:
has_pmg = False
if has_pmg and protocol in PymatgenInputAdaptor.KNOWN_SETS:
adaptor = PymatgenInputAdaptor(
protocol,
incar_overrides=overrides.get('incar_overrides', {}),
pmg_kwargs=overrides.get('pmg_kwargs', {}),
)
inputs = adaptor.get_inputs(structure, is_workchain=True, overrides=overrides)
else:
inputs = cls.get_protocol_inputs(protocol, overrides)
meta_parameters = inputs.pop('meta_parameters', {})
natoms = len(structure.sites)
# Update the parameters based on the protocol inputs
parameters = inputs['parameters']
# Update EDIFF if not overriden
if 'ediff' not in parameters['incar']:
parameters['incar']['ediff'] = natoms * float(meta_parameters['ediff_per_atom'])
# Configure the options for the underlying VaspCalculation to be launched
metadata = inputs.get('calc', {}).get('metadata', {})
if options:
metadata['options'] = recursive_merge(metadata.get('options', {}), options)
# Forward to the builders
builder = cls.get_builder()
builder.code = code
# Use explicit potential if given
if len(inputs.get('potentials', {})) > 0:
builder.potentials = inputs['potentials']
else:
builder.potential_family = inputs['potential_family']
builder.potential_mapping = {key: inputs['potential_mapping'][key] for key in structure.get_kind_names()}
# Apply inputs
builder.structure = structure
builder.parameters = parameters
builder.calc.metadata = metadata
if 'settings' in inputs:
builder.settings = inputs.get('settings', {})
if 'clean_workdir' in inputs:
builder.clean_workdir = orm.Bool(inputs['clean_workdir'])
# Configure the kpoints
if 'kpoints' in inputs:
if isinstance(inputs['kpoints'], orm.Data):
builder.kpoints = inputs['kpoints']
# Has mesh been explicitly supplied?
elif 'mesh' in inputs['kpoints']:
kpoints = KpointsData()
kpoints.set_cell_from_structure(structure)
kpoints.set_kpoints_mesh(inputs['kpoints']['mesh'], inputs['kpoints'].get('offset', [0, 0, 0]))
builder.kpoints = kpoints
elif 'spacing' in inputs['kpoints']:
builder.kpoints_spacing = orm.Float(inputs['kpoints']['spacing'])
else:
builder.kpoints_spacing = orm.Float(inputs['kpoints_spacing'])
# Apply maximum iteration
if 'max_iterations' in inputs:
builder.max_iterations = inputs['max_iterations']
# Check if we have any valid ldau_u_mapping defined
if inputs.get('ldau_mapping') is None:
ldau_u_mapping = {}
else:
ldau_u_mapping = {key: inputs['ldau_mapping'].get('mapping').get(key) for key in structure.get_kind_names()}
# Only forward the mapping if there are elements included in the mapping
# Note that only the u-mapping is checked. The assumption is that if one wants to use it
# it should be supplied
if any(ldau_u_mapping.values()):
builder.ldau_mapping = inputs['ldau_mapping']
magmom_mapping = inputs.get('magmom_mapping', {})
if dict(magmom_mapping):
builder.magmom_mapping = magmom_mapping
return builder
[docs]
def verbose_report(self, *args, **kwargs) -> None:
"""Send report if self.ctx.verbose is True"""
if self.ctx.verbose is True:
self.report(*args, **kwargs)
[docs]
def update_magmom(self, node: Optional[CalcJobNode] = None) -> None:
"""
Update magmom from site magnetization information if available
:param node: Calculation node to be used, defaults to the last launched calculation.
"""
if self.is_noncollinear:
self.report('Automatic carrying on magmom for non-collinear magnetism calculation is not implemented.')
return
if node is None:
node = self.ctx.children[-1]
if 'site_magnetization' in node.outputs:
try:
self.ctx.inputs.parameters['magmom'] = site_magnetization_to_magmom(
node.outputs.site_magnetization.get_dict()
)
except ValueError:
pass
[docs]
def setup_potcar(self) -> None:
# Verify and set potentials (potcar)
if not self.inputs.potential_family.value:
self.report('An empty string for the potential family name was detected.') # pylint: disable=not-callable
return self.exit_codes.ERROR_NO_POTENTIAL_FAMILY_NAME # pylint: disable=no-member
try:
self.ctx.inputs.potential = PotcarData.get_potcars_from_structure(
structure=self.inputs.structure,
family_name=self.inputs.potential_family.value,
mapping=self.inputs.potential_mapping.get_dict(),
)
except ValueError as err:
return compose_exit_code(self.exit_codes.ERROR_POTENTIAL_VALUE_ERROR.status, str(err)) # pylint: disable=no-member
except NotExistent as err:
return compose_exit_code(self.exit_codes.ERROR_POTENTIAL_DO_NOT_EXIST.status, str(err)) # pylint: disable=no-member
[docs]
def run_auto_parallel(self) -> bool:
"""Wether we should run auto-parallelisation test"""
return 'auto_parallel' in self.inputs and self.inputs.auto_parallel.value is True
@property
def is_noncollinear(self) -> bool:
"""Check if the calculation is a noncollinear one"""
return self.ctx.inputs.parameters.get('lnoncollinear') or self.ctx.inputs.parameters.get('lsorbit')
[docs]
@override
def on_except(self, exc_info: Tuple[Any, Exception, TracebackType]) -> None:
"""Handle excepted state."""
try:
last_calc = self.ctx.calculations[-1] if self.ctx.calculations else None
if last_calc is not None:
self.report(f'Last calculation: {last_calc!r}') # pylint: disable=not-callable
sched_err = last_calc.outputs.retrieved.get_file_content('_scheduler-stderr.txt')
sched_out = last_calc.outputs.retrieved.get_file_content('_scheduler-stdout.txt')
self.report(f'Scheduler output:\n{sched_out or ""}') # pylint: disable=not-callable
self.report(f'Scheduler stderr:\n{sched_err or ""}') # pylint: disable=not-callable
except AttributeError:
self.report(
'No calculation was found in the context. ' # pylint: disable=not-callable
'Something really awful happened. '
'Please inspect messages and act.'
)
return super().on_except(exc_info)
[docs]
@override
def on_terminated(self) -> None:
"""
Clean the working directories of all child calculation jobs if `clean_workdir=True` in the inputs and
the calculation is finished without problem.
"""
# Directly called the WorkChain method as this method replaces that of the BaseRestartWorkChain
WorkChain.on_terminated(self)
if self.inputs.clean_workdir.value is False: # type: ignore[union-attr]
self.report('remote folders will not be cleaned')
return
if not self.ctx.is_finished: # type: ignore[union-attr]
self.report('remote folders will not be cleaned because the workchain finished with error.')
return
# Find the remote folder of the last calculation which should be kept from cleaning
out_remote_pk = None
if self.inputs.keep_last_workdir.value is True:
out_remote_pk = self.outputs['remote_folder'].pk
cleaned_calcs = []
for called_descendant in self.node.called_descendants:
if isinstance(called_descendant, CalcJobNode):
try:
remote_folder = called_descendant.outputs.remote_folder
if remote_folder.pk != out_remote_pk:
remote_folder._clean() # pylint: disable=protected-access
cleaned_calcs.append(str(called_descendant.pk))
except (IOError, OSError, KeyError):
pass
if cleaned_calcs:
self.report(f'cleaned remote folders of calculations: {" ".join(cleaned_calcs)}')
[docs]
def _get_run_status(self, node: CalcJobNode) -> None:
"""Return the run status of the calculation."""
return node.outputs.misc['run_status']
[docs]
@process_handler(priority=2000, enabled=False)
def handler_always_attach_outputs(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]:
"""
Handle the case where we attach the outputs even if underlying child calculation ends up
with some exit status.
"""
# Only active this error handler at the last iteration
if node.is_finished_ok or self.ctx.iteration < self.inputs.max_iterations.value:
return None
# Attach all outputs from the last workchain
self.report('At the last iteration - attaching outputs from the last workchain.')
self.report('WARNING: The attached outputs may contain incorrect results - proceed with caution.')
# Attach the required outputs defined in the spec
for name, port in self.spec().outputs.items():
try:
output = node.get_outgoing(link_label_filter=name).one().node
except ValueError:
if port.required:
self.report(f"required output '{name}' was not an output of {self.ctx.process_name}<{node.pk}>")
else:
self.out(name, output)
# Try to get some meaningful exit codes using the sanity check handler
# Always generate a handler report with do_break so no more handlers will be run and
# overwrite the error code.
report = self._calculation_sanity_checks(node)
if report:
self.report('Problems during checks of the outputs. The corresponding `exit_code` will be returned.')
return ProcessHandlerReport(exit_code=report.exit_code, do_break=True)
return ProcessHandlerReport(exit_code=self.exit_codes.ERROR_MAXIMUM_ITERATION_EXCEEDED, do_break=True)
[docs]
@process_handler(priority=1100, exit_codes=VaspCalculation.exit_codes.ERROR_VASP_DID_NOT_EXECUTE)
def handler_calculation_did_not_run(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]:
"""Handle the case where the calculation is not performed"""
if self.ctx.vasp_did_not_execute:
self.report(f'{node} did not execute, and this is the second time - aborting.')
return ProcessHandlerReport(
do_break=True,
exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(
message='VASP executable did not run on the remote computer.'
),
)
self.report(f'{node} did not execute - try again')
self.ctx.vasp_did_not_execute = True
return ProcessHandlerReport(do_break=True)
[docs]
@process_handler(priority=1000)
def handler_misc_not_exist(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]:
"""
Handle the case where misc output is not available, in which case we cannot do anything for it.
"""
# Check if the run is converged electronically
if 'misc' not in node.outputs:
self.report('Cannot found `misc` outputs - please check the process reports for issues.')
return ProcessHandlerReport(exit_code=self.exit_codes.ERROR_MISSING_CRITICAL_OUTPUT, do_break=True) # pylint: disable=no-member
return None
[docs]
@process_handler(priority=910, exit_codes=[VaspCalculation.exit_codes.ERROR_DID_NOT_FINISH])
def handler_unfinished_calc_ionic(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]:
"""
Handled the problem such that the calculation is not finished, e.g. did not reach the
end of execution.
If WAVECAR exists, just resubmit the calculation with the restart folder.
If it is a geometry optimization, attempt to restart with output structure + WAVECAR.
"""
# Check it is a geometry optimization
incar = self.ctx.inputs.parameters
if incar.get('nsw', -1) > 0:
if 'structure' not in node.outputs:
self.report('Performing a geometry optimization but the output structure is not found.')
return ProcessHandlerReport(
do_break=True,
exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(
message='No output structure for restart.'
),
) # pylint: disable=no-member
self.report('Continuing geometry optimization using the last geometry.')
self.ctx.inputs.structure = node.outputs.structure
self._setup_restart(node)
self.update_magmom(node)
return ProcessHandlerReport(do_break=True)
return None
[docs]
@process_handler(
priority=799,
enabled=False,
exit_codes=[VaspCalculation.exit_codes.ERROR_DID_NOT_FINISH],
)
def handler_unfinished_calc_ionic_alt(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]:
"""
Handled the problem such that the calculation is not finished, e.g. did not reach the
end of execution.
If WAVECAR exists, just resubmit the calculation with the restart folder.
If it is a geometry optimization, attempt to restart with output structure + WAVECAR.
"""
# Check it is a geometry optimization
incar = self.ctx.inputs.parameters
if incar.get('nsw', -1) > 0:
if 'structure' not in node.outputs:
self.report('Performing a geometry optimization but the output structure is not found.')
return ProcessHandlerReport(
do_break=True,
exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(
message='No output structure for restart.'
),
) # pylint: disable=no-member
self.report('Continuing geometry optimization using the last geometry.')
self.ctx.inputs.structure = node.outputs.structure
self._setup_restart(node)
self.update_magmom(node)
return ProcessHandlerReport(do_break=True)
return None
[docs]
@process_handler(priority=798, enabled=False)
def handler_unfinished_calc_generic_alt(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]:
"""
A generic handler for unfinished calculations, we attempt to restart it once.
"""
# Only act on this specific return code, otherwise we reset the flag
if node.exit_status != VaspCalculation.exit_codes.ERROR_DID_NOT_FINISH.status:
self.ctx.last_calc_was_unfinished = False
return None
if self.ctx.last_calc_was_unfinished:
msg = (
'The last calculation was not completed for the second time, potentially due to insufficient '
'walltime/node failure. Please revise the resources request and/or input parameters.'
)
return ProcessHandlerReport(
do_break=True,
exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(message=msg),
) # pylint: disable=no-member
self.report(
(
'The last calculation was not finished - restart using the same set of inputs. '
'If it was due to transient problem this may fix it, fingers crossed.'
)
)
self.ctx.last_calc_was_unfinished = True
return ProcessHandlerReport(do_break=True)
[docs]
@process_handler(priority=900)
def handler_unfinished_calc_generic(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]:
"""
A generic handler for unfinished calculations, we attempt to restart it once.
"""
# Only act on this specific return code, otherwise we reset the flag
if node.exit_status != VaspCalculation.exit_codes.ERROR_DID_NOT_FINISH.status:
self.ctx.last_calc_was_unfinished = False
return None
if self.ctx.last_calc_was_unfinished:
msg = (
'The last calculation was not completed for the second time, potentially due to insufficient'
'walltime/node failure. Please revise the resources request and/or input parameters.'
)
return ProcessHandlerReport(
do_break=True,
exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(message=msg),
) # pylint: disable=no-member
self.report(
(
'The last calculation was not finished - restart using the same set of inputs. '
'If it was due to transient problem this may fix it, fingers crossed.'
)
)
self.ctx.last_calc_was_unfinished = True
return ProcessHandlerReport(do_break=True)
[docs]
@process_handler(
priority=800,
enabled=False,
exit_codes=[
VaspCalculation.exit_codes.ERROR_ELECTRONIC_NOT_CONVERGED,
VaspCalculation.exit_codes.ERROR_IONIC_NOT_CONVERGED,
VaspCalculation.exit_codes.ERROR_DID_NOT_FINISH,
VaspCalculation.exit_codes.ERROR_VASP_CRITICAL_ERROR,
VaspCalculation.exit_codes.ERROR_OVERFLOW_IN_XML,
],
)
def handler_electronic_conv_alt(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: # pylint: disable=too-many-return-statements,too-many-branches
"""Handle electronic convergence problem"""
incar = node.inputs.parameters.get_dict()
run_status = self._get_run_status(node)
notifications = node.outputs.misc['notifications']
nelm = run_status['nelm']
algo = incar.get('algo', 'normal')
# In case of ionic convergence problem, we also act if electronic convergence problem has been reported.
if node.exit_status in [
VaspCalculation.exit_codes.ERROR_IONIC_NOT_CONVERGED.status,
VaspCalculation.exit_codes.ERROR_DID_NOT_FINISH,
]:
perform_fix = False
if run_status['consistent_nelm_breach']:
self.report(
'The NELM limit has been breached in all ionic steps - proceed to take actions'
'for improving convergence.'
)
perform_fix = True
elif run_status['contains_nelm_breach']:
# Then there are some breaches in the ionic cycles
if self.ctx.ignore_transient_nelm_breach:
self.report(
'WARNING: NELM limit breached in some ionic steps but requested to ignore this -'
'no action taken.'
)
perform_fix = False
else:
self.report(
'The NELM limit has been breached in some ionic steps - proceed to take actions for'
' improving convergence.'
)
perform_fix = True
if not perform_fix:
return None
if node.exit_status == VaspCalculation.exit_codes.ERROR_VASP_CRITICAL_ERROR.status:
# Make sure we only continue in this handler for a selected set of the critical errors.
if not any(item in node.exit_message for item in ['EDDRMM', 'EDDDAV', 'The topmost band is occupied']):
# We have some other critical error not to handle here
return None
# Check if we need to add more bands
for item in notifications:
if item['name'] == 'bandocc' and self.ctx.handler.nbands_increase_tries < 5:
try:
nbands = run_status['nbands']
# Increase nbands with 10%
nbands_new = math.ceil(nbands * 1.1)
self.report(f'Changing NBANDS from {nbands} to {nbands_new}')
self.ctx.handler.nbands_increase_tries += 1
incar['nbands'] = nbands_new
self.ctx.inputs.parameters.update(incar)
return ProcessHandlerReport(do_break=True)
except KeyError:
self.report(
'The topmost band is occupied but did not locate the nbands entry in run_status, '
'so no way to do corrections.'
)
return ProcessHandlerReport(
exit_code=self.exit_codes.ERROR_MISSING_CRITICAL_OUTPUT,
do_break=True,
) # pylint: disable=no-member
if nelm < 300:
# Standard NELM might be a bit low, so increase a bit
incar['nelm'] = 300
# Here we can just continue from previous run
self._setup_restart(node)
self.ctx.inputs.parameters.update(incar)
self.report(f'Changing NELM from {nelm} to 300.')
return ProcessHandlerReport(do_break=True)
# Let us start or continue to switch algorithms and reduce try list
try:
if self.ctx.handler.get('remaining_algos', None) is None:
self.ctx.handler.remaining_algos = self._algo_switching[algo.lower()]
new_algo = self.ctx.handler.remaining_algos.pop(0)
incar['algo'] = new_algo
self.ctx.inputs.parameters.update(incar)
self.report(f'Changing ALGO from {algo.lower()} to {new_algo}')
return ProcessHandlerReport(do_break=True)
except IndexError:
self.report('No more algorithms to try and we still have not reached electronic convergence.')
self.report('No additional fixes can be applied to improve the electronic convergence - aborting.')
return ProcessHandlerReport(
do_break=True,
exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(
message='Cannot apply fix for reaching electronic convergence.'
),
)
[docs]
@process_handler(
priority=800,
exit_codes=[
VaspCalculation.exit_codes.ERROR_ELECTRONIC_NOT_CONVERGED,
VaspCalculation.exit_codes.ERROR_IONIC_NOT_CONVERGED,
VaspCalculation.exit_codes.ERROR_DID_NOT_FINISH,
],
)
def handler_electronic_conv(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]:
"""Handle electronic convergence problem"""
incar = node.inputs.parameters.get_dict()
run_status = self._get_run_status(node)
nelm = run_status['nelm']
algo = incar.get('algo', 'normal')
# In case of ionic convergence problem, we also act if electronic convergence problem has been reported.
if node.exit_status in [
VaspCalculation.exit_codes.ERROR_IONIC_NOT_CONVERGED.status,
VaspCalculation.exit_codes.ERROR_DID_NOT_FINISH,
]:
perform_fix = False
if run_status['consistent_nelm_breach']:
self.report(
'The NELM limit has been breached in all ionic steps - proceed to take'
' actions for improving convergence.'
)
perform_fix = True
elif run_status['contains_nelm_breach']:
# Then there are some breaches in the ionic cycles
if self.ctx.ignore_transient_nelm_breach:
self.report(
'WARNING: NELM limit breached in some ionic steps but requested to ignore this'
' - no action taken.'
)
perform_fix = False
else:
self.report(
'The NELM limit has been breached in some ionic steps - proceed to take actions'
' for improving convergence.'
)
perform_fix = True
if not perform_fix:
return None
if algo.lower() in ('fast', 'veryfast'):
incar['algo'] = 'normal'
self._setup_restart(node)
self.ctx.inputs.parameters.update(incar)
self.report(f'Setting ALGO=normal from ALGO={algo.lower()}')
return ProcessHandlerReport(do_break=True)
# The logic below only works for algo=normal
if algo.lower() == 'normal':
# First try - Increase NELM if we started from a low NELM
if nelm < 100:
incar['nelm'] = 150
self._setup_restart(node)
self.ctx.inputs.parameters.update(incar)
self.report('Setting NELM to 150')
return ProcessHandlerReport(do_break=True)
# Adjust AMIX value if NELM is already high
amix = incar.get('amix', 0.4)
amix_steps = [0.2, 0.1, 0.05]
for amix_target in amix_steps:
if amix > amix_target:
incar['amix'] = amix_target
# Increase NELM in the mean time - smaller amplitude requires more cycles but more stable.
incar['nelm'] = nelm + 20
self._setup_restart(node)
self.ctx.inputs.parameters.update(incar)
self.report(f'Reducing AMIX to {incar["amix"]}')
return ProcessHandlerReport(do_break=True)
# Change to ALGO if options have been exhausted
incar['algo'] = 'all'
self.ctx.inputs.parameters.update(incar)
self._setup_restart(node)
self.report('Switching to ALGO = ALL')
return ProcessHandlerReport(do_break=True)
self.report('No additional fixes can be applied to improve the electronic convergence - aborting.')
return ProcessHandlerReport(
do_break=True,
exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(
message='Cannot apply fix for reaching electronic convergence.'
),
)
[docs]
@process_handler(
priority=510,
exit_codes=[VaspCalculation.exit_codes.ERROR_IONIC_NOT_CONVERGED],
enabled=False,
)
def handler_ionic_conv_enhanced(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: # pylint: disable=too-many-return-statements, too-many-branches
"""
Enhanced handling of ionic relaxation problem beyond simple restarts.
This is only used when the calculation is having difficulties reaching the
convergence. This handler should be applied before the standard handler which
breaks the handling cycle.
"""
if 'structure' not in node.outputs:
self.report('Performing a geometry optimization but the output structure is not found.')
return ProcessHandlerReport(
do_break=True,
exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(
message='No output structure for restarting ionic relaxation.'
),
) # pylint: disable=no-member
# The simplest solution - resubmit the calculation again
child_nodes = self.ctx.children
child_miscs = [node.outputs.misc for node in child_nodes]
self.update_magmom(node)
# Enhanced handler only takes place after 3 trials
if len(child_miscs) < 3:
return None
natom = len(self.ctx.inputs.structure.sites)
# Number of iterations
ionic_iterations = [misc['run_status']['last_iteration_index'][0] for misc in child_miscs]
# Output energies
energies = []
for misc in child_miscs:
energies.append(misc.get('total_energies', {}).get('energy_extrapolated'))
if all([eng is not None for eng in energies[-3:]]): # pylint: disable=use-a-generator
de_per_atom = np.diff(energies) / natom
else:
return None
# First check if dE is very small
if np.all(np.abs(de_per_atom) < 1e-5):
msg = (
'The total energy difference between the last two step is smaller than 1e-5 /atom'
'- please consider to revise the cutoff value of the ionic steps.'
)
self.report(msg)
return ProcessHandlerReport(
do_break=True,
exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(message=msg),
)
# Check if there are very few step performed per launch. Because VASP does not carry over
# the internal parameters of the optimizer, this can make convergence slower.
if ionic_iterations[-1] < 5:
msg = (
'Less than 5 iterations performed in the last launch - '
'please consider submitting the jobs with revised resources request.'
)
self.report(msg)
return ProcessHandlerReport(
do_break=True,
exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(message=msg),
)
# Warn about very unusually large number of steps and switch IBRION if needed.
# Total degrees of freedom
dof = 3 * natom
isif = self.ctx.inputs.parameters.get('isif', 2)
if isif == 3:
dof += 6
if sum(ionic_iterations) > dof + 10:
self.report(f'Unusually large number of iterations performed for the degrees of freedom: {dof}')
ibrion = self.ctx.inputs.parameters.get('ibrion')
# In this case alternate between different relaxation algorithms
if ibrion == 2:
self.ctx.inputs.parameters['ibrion'] = 1
self.ctx.inputs.parameters['potim'] = 0.3
self.ctx.inputs.structure = node.outputs.structure
self.report('Switching to IBRION=1 from IBRION=2 with POTIM = 0.3')
return ProcessHandlerReport(do_break=True)
if ibrion == 1:
self.ctx.inputs.parameters['ibrion'] = 2
self.ctx.inputs.parameters['potim'] = 0.1
self.ctx.inputs.structure = node.outputs.structure
self.report('Switching to IBRION=2 from IBRION=1 with POTIM = 0.1')
return ProcessHandlerReport(do_break=True)
# Check if energies are increasing without significant volume change
vol_changes = []
for child in child_nodes:
inp_vol = child.inputs.structure.get_cell_volume()
out_vol = child.outputs.structure.get_cell_volume()
vol_changes.append(out_vol / inp_vol - 1.0)
vol_changes = np.array(vol_changes)
vol_tol = 0.03
if np.all(de_per_atom > 0.0) and np.all(abs(vol_changes[-2:]) < vol_tol):
msg = 'Energy increasing for the last two iterations - something can be very wrong...'
self.report(msg)
return ProcessHandlerReport(
do_break=True,
exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(message=msg),
)
self.report('No fixes can be applied for ionic convergence.')
return None
[docs]
@process_handler(priority=505, exit_codes=[VaspCalculation.exit_codes.ERROR_IONIC_NOT_CONVERGED])
def handler_ionic_conv(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]:
"""Handle ionic convergence problem"""
if 'structure' not in node.outputs:
self.report('Performing a geometry optimization but the output structure is not found.')
return ProcessHandlerReport(
do_break=True,
exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(
message='No output structure for restarting ionic relaxation.'
),
) # pylint: disable=no-member
# The simplest solution - resubmit the calculation again
self.report('Continuing geometry optimization using the last geometry.')
self.ctx.inputs.structure = node.outputs.structure
self._setup_restart(node)
self.update_magmom(node)
return ProcessHandlerReport(do_break=True)
[docs]
@process_handler(priority=400, exit_codes=[VaspCalculation.exit_codes.ERROR_VASP_CRITICAL_ERROR])
def handler_vasp_critical_error(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]:
"""
Check if the calculation contain any critical error.
"""
notification = node.outputs.misc['notifications']
message = (
f'Critical error detected in the notifications: {", ".join([item.get("name") for item in notification])}'
)
self.report(message + ' - aborting.')
return ProcessHandlerReport(
do_break=True,
exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(message=message),
)
[docs]
@process_handler(priority=5)
def check_misc_output(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]:
"""
Check if misc output exists.
"""
misc = node.outputs.misc.get_dict()
if 'run_status' not in misc:
self.report('`run_status` is not found in misc - cannot verify the integrity of the child calculation.')
return ProcessHandlerReport(exit_code=self.exit_codes.ERROR_MISSING_CRITICAL_OUTPUT, do_break=True)
return None
[docs]
@process_handler(priority=4)
def check_calc_is_finished(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]:
"""
Check if the calculation has reached the end of execution.
"""
run_status = self._get_run_status(node)
if not run_status.get('finished'):
self.report(f'The child calculation {node} did not reach the end of execution.')
return ProcessHandlerReport(exit_code=self.exit_codes.ERROR_CALCULATION_NOT_FINISHED, do_break=True)
return None
[docs]
@process_handler(priority=3)
def check_electronic_converged(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]:
"""
Check if the calculation has converged electronic structure.
"""
run_status = self._get_run_status(node)
# Check that the electronic structure is converged
if not run_status.get('electronic_converged'):
self.report(f'The child calculation {node} does not possess a converged electronic structure.')
return ProcessHandlerReport(
exit_code=self.exit_codes.ERROR_ELECTRONIC_STRUCTURE_NOT_CONVERGED,
do_break=True,
) # pylint: disable=no-member
if run_status.get('contains_nelm_breach'):
if self.ctx.ignore_transient_nelm_breach:
self.report(
'The calculation contains at least one electronic minimization '
'that was truncated. It should thus not be considered converged. '
'Upon request from user, this is ignored.'
)
else:
self.report(
'The calculation contains at least one electronic minimization '
'that is truncated. It should thus not be considered converged. '
'Treating the calculation as failed. Please inspect, maybe it is salvageable.'
)
return ProcessHandlerReport(
exit_code=self.exit_codes.ERROR_UNCONVERGED_ELECTRONIC_STRUCTURE_IN_RELAX,
do_break=True,
) # pylint: disable=no-member
return None
[docs]
@process_handler(priority=2)
def check_ionic_converged(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]:
"""
Check if the calculation has converged ionic structure.
"""
# Check if we have requested to ignore ionic convergence check at calculation level
# If so, then this handler should be by-passed
if 'settings' in node.inputs:
settings = node.inputs.settings.get_dict()
if not settings.get('CHECK_IONIC_CONVERGENCE', True):
return None
run_status = self._get_run_status(node)
# Check that the ionic structure is converged
if run_status.get('ionic_converged') is False:
self.report(f'The child calculation {node} did not have converged ionic structure.')
return ProcessHandlerReport(
exit_code=self.exit_codes.ERROR_IONIC_RELAXATION_NOT_CONVERGED,
do_break=True,
) # pylint: disable=no-member
return None
# In this workchain we default to ignore the NELM breaches in the middle of the calculation
[docs]
@process_handler(priority=850, enabled=True)
def ignore_nelm_breach_relax(self, node: CalcJobNode) -> None:
"""
Not a actual handler but works as a switch to bypass checks for NELM breaches
in the middle of an ionic relaxation.
"""
_ = node
self.ctx.ignore_transient_nelm_breach = True
[docs]
def _calculation_sanity_checks(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: # pylint: disable=unused-argument
"""
Perform additional sanity checks on successfully completed calculation.
This method acts invokes the 'check' handlers to check the calculations and abort the workchain if any
problem is found. This is useful when all of the corresponding error handlers are disabled, and allow
one to avoid the default behaviour of restarting the calculation one more times regardlessly with
unhandled errors.
"""
checks = [
self._check_misc_output,
self._check_calc_is_finished,
self._check_electronic_converged,
self._check_ionic_converged,
]
# Go though the checks one after another, return report if necessary
last_report = None
for check in checks:
report = check(node)
if report:
if report.do_break:
return report
last_report = report
return last_report
[docs]
def _update_last_calc_objects(self, node: CalcJobNode) -> List[str]:
"""
Connect to the remote and find the valid objects in th calculation folder
Only update if the entry is empty in order to avoid too many connections to the remote.
"""
if not self.ctx.last_calc_remote_objects:
self.ctx.last_calc_remote_objects = list_valid_objects_in_remote(node.outputs.remote_folder)
return self.ctx.last_calc_remote_objects
[docs]
def _setup_restart(self, node: CalcJobNode) -> bool:
"""
Check the existence of any restart objects, if any of them eixsts use the last calculation
for restart.
"""
self._update_last_calc_objects(node)
if 'WAVECAR' in self.ctx.last_calc_remote_objects or 'CHGCAR' in self.ctx.last_calc_remote_objects:
self.ctx.restart_calc = node
return True
return False
[docs]
def list_valid_objects_in_remote(remote: orm.RemoteData, path: str = '.', size_threshold: int = 0) -> List[str]:
"""
List non-empty objects in the remote folder
:param remote: The `RemoteFolder` node to be inspected.
:param path: The relative path.
:param size_threshold: The size threshold to treat the object as a valide one.
:returns: A list of valid objects in the directory.
"""
none_empty = []
try:
contents = remote.listdir_withattributes(path)
except OSError:
return []
for obj in contents:
if obj['attributes'].st_size > size_threshold and not obj['isdir']:
none_empty.append(obj['name'])
return none_empty
[docs]
def potential_family_validator(family: orm.Str, _) -> None:
"""
Validate the potential family input.
:param faimly: The potential family to be validated.
:raises ValueError: If the potential family is not valid.
"""
if not family.value:
raise InputValidationError('The potential family cannot be empty.')
group = PotcarData.get_potcar_group(family.value)
if group is None:
raise InputValidationError(
f'The potential family "{family.value}" is not found. '
'Please use aiida-vasp potcar listfamilies tool to verify your settings.'
)
[docs]
def validate_calc_job_custom(inputs: Any, ctx) -> Optional[str]:
"""Validate the entire set of inputs passed to the `CalcJob` constructor.
Reasons that will cause this validation to raise an `InputValidationError`:
* No `Computer` has been specified, neither directly in `metadata.computer` nor indirectly through the `Code` input
* The specified computer is not stored
* The `Computer` specified in `metadata.computer` is not the same as that of the specified `Code`
* No `Code` has been specified and no `remote_folder` input has been specified, i.e. this is no import run
:return: string with error message in case the inputs are invalid
"""
try:
ctx.get_port('code')
ctx.get_port('metadata.computer')
except ValueError:
# If the namespace no longer contains the `code` or `metadata.computer` ports we skip validation
return None
remote_folder = inputs.get('remote_folder', None)
if remote_folder is not None:
# The `remote_folder` input has been specified and so this concerns an import run, which means that neither
# a `Code` nor a `Computer` are required. However, they are allowed to be specified but will not be explicitly
# checked for consistency.
return None
code = inputs.get('code', None)
computer_from_code = code.computer
computer_from_metadata = inputs.get('metadata', {}).get('computer', None)
if not computer_from_code and not computer_from_metadata:
return 'no computer has been specified in `metadata.computer` nor via `code`.'
if computer_from_code and not computer_from_code.is_stored:
return f'the Computer<{computer_from_code}> is not stored'
if computer_from_metadata and not computer_from_metadata.is_stored:
return f'the Computer<{computer_from_metadata}> is not stored'
if computer_from_code and computer_from_metadata and computer_from_code.uuid != computer_from_metadata.uuid:
return (
'Computer<{}> explicitly defined in `metadata.computer` is different from Computer<{}> which is the '
'computer of Code<{}> defined as the `code` input.'.format(computer_from_metadata, computer_from_code, code)
)
try:
resources_port = ctx.get_port('metadata.options.resources')
except ValueError:
return None
# If the resources port exists but is not required, we don't need to validate it against the computer's scheduler
if not resources_port.required:
return None
computer = computer_from_code or computer_from_metadata
scheduler = computer.get_scheduler()
old_workchain_interface = False
try:
resources = inputs['metadata']['options']['resources']
except KeyError:
old_workchain_interface = True
warnings.warn(
'input `metadata.options.resources` is not specified - you are probably using the old options port.'
'Please define options directly in `calc.metadata.options` instead of `options`.'
)
if not old_workchain_interface:
scheduler.preprocess_resources(resources, computer.get_default_mpiprocs_per_machine())
try:
scheduler.validate_resources(**resources)
except ValueError as exception:
return f'input `metadata.options.resources` is not valid for the `{scheduler}` scheduler: {exception}'
else:
resources = inputs['options'].get('resources')
if resources is None:
return (
'`resources` is not specified under `options` nor in `calc.metadata.options.resources` '
'please define it in `calc.metadata.options`'
)
return None