Source code for aiida_vasp.workchains.v2.vasp

"""
This file contains the VaspWorkChain class definition which uses the BaseRestartWorkChain.

Below is a copy of the error handler logic from aiida-core:

If the process is excepted or killed, the work chain will abort. Otherwise any attached handlers will be called
in order of their specified priority. If the process was failed and no handler returns a report indicating that
the error was handled, it is considered an unhandled process failure and the process is relaunched. If this
happens twice in a row, the work chain is aborted. In the case that at least one handler returned a report the
following matrix determines the logic that is followed::

    Process  Handler    Handler     Action
    result   report?    exit code

    -----------------------------------------

    Success      yes        == 0     Restart
    Success      yes        != 0     Abort
    Failed       yes        == 0     Restart
    Failed       yes        != 0     Abort

If no handler returned a report and the process finished successfully, the work chain's work is considered done
and it will move on to the next step that directly follows the `while` conditional, if there is one defined in
the outline.

This means that for a handler:

- No error found - just return None
- No action taken

  - the error is not recoverable - return with a non-zero error code with do break
  - the error is not recoverable, but other handler may/maynot save it - return with a non-zero code without do
    break
  - the error is not recoverable, and the workchain should be aborted immediately - non-zero code + do break

- Action taken

  - the error is fixed in full - return with a zero error code with `do_break=True`
  - the error is not fixed in full - return with a report with `do_break=False` but has a `exit_code`.
    this mean other handlers (with lower priority) must handle it and return a zero error_code.

"""

from __future__ import annotations

import math
import warnings
from types import TracebackType
from typing import Any, List, Optional, Tuple

import numpy as np
from aiida import orm
from aiida.common.exceptions import InputValidationError, NotExistent

# from aiida.engine.job_processes import override
from aiida.common.extendeddicts import AttributeDict
from aiida.common.lang import override
from aiida.engine import ExitCode, ProcessSpec, if_, while_
from aiida.engine.processes.workchains.restart import (
    BaseRestartWorkChain,
    ProcessHandlerReport,
    WorkChain,
    process_handler,
)
from aiida.orm import CalcJobNode, Dict, KpointsData
from aiida.orm.nodes.data.base import to_aiida_type
from aiida.plugins import CalculationFactory

from aiida_vasp.assistant.parameters import (
    ParametersMassage,
)
from aiida_vasp.calcs.vasp import VaspCalculation
from aiida_vasp.common import parameters_validator, warn_deprecated_options
from aiida_vasp.common.dryrun import get_jobscheme
from aiida_vasp.data.potcar import PotcarData
from aiida_vasp.protocols import ProtocolMixin, recursive_merge
from aiida_vasp.utils.ldau import get_ldau_keys
from aiida_vasp.utils.workchains import compose_exit_code, prepare_process_inputs, site_magnetization_to_magmom

from .mixins import WithBuilderUpdater

# pylint: disable=no-member


[docs] class VaspWorkChain(BaseRestartWorkChain, WithBuilderUpdater, ProtocolMixin): """ The VASP workchain. Error handling enriched wrapper around VaspCalculation. Deliberately conserves most of the interface (required inputs) of the VaspCalculation class, but makes it possible for a user to interact with a workchain and not a calculation. This is intended to be used instead of directly submitting a VaspCalculation, so that future features like automatic restarting, error checking etc. can be propagated to higher level workchains automatically by implementing them here. Handlers are implemented to try fix common problems and improves the robustness. Individual handlers can be enabled/disabled by setting the ``handler_overrides`` input port. Additional settings may be passed under the "settings" input, which is also forwarded to the calculations. The available options are: - ``USE_WAVECAR_FOR_RESTART`` wether calculation restarts should use the WAVECAR. The default is ``True``. Usage:: from aiida.common.extendeddicts import AttributeDict from aiida.work import submit basevasp = WorkflowFactory('vasp.vasp') inputs = basevasp.get_builder() inputs = AttributeDict() ## ... set inputs submit(basevasp, **inputs) To see a working example, including generation of input nodes from scratch, please refer to ``examples/run_vasp_lean.py``. """ _verbose = False _process_class = CalculationFactory('vasp.vasp') _algo_switching = { 'normal': ['fast', 'veryfast', 'damped'], 'fast': ['normal', 'veryfast', 'damped'], 'veryfast': ['normal', 'fast', 'damped'], 'damped': ['normal', 'fast', 'veryfast'], } _default_unsupported_parameters = {} _protocol_tag = 'vasp' # Search for vasp.yml in protocol directories
[docs] @classmethod def define(cls, spec: ProcessSpec) -> None: # pylint: disable=too-many-statements super(VaspWorkChain, cls).define(spec) spec.expose_inputs(cls._process_class, exclude=('metadata',)) spec.expose_inputs( cls._process_class, namespace='calc', include=('metadata',), namespace_options={'populate_defaults': True} ) # Use a custom validator for backward compatibility # This needs to be removed in the next major release/formalized workchain interface spec.inputs.validator = validate_calc_job_custom spec.inputs['calc']['metadata']['options']['resources']._required = False spec.inputs['calc']._required = False spec.input('kpoints', valid_type=orm.KpointsData, required=False) spec.input( 'potential_family', valid_type=orm.Str, required=True, serializer=to_aiida_type, validator=potential_family_validator, ) spec.input( 'potential_mapping', valid_type=orm.Dict, required=True, serializer=to_aiida_type, ) spec.input( 'parameters', valid_type=orm.Dict, required=True, validator=parameters_validator, serializer=to_aiida_type, ) spec.input( 'options', valid_type=orm.Dict, required=False, serializer=to_aiida_type, validator=warn_deprecated_options, help='Deprecated - use `calc.metadata.options` instead.', ) spec.input( 'max_iterations', valid_type=orm.Int, required=False, default=lambda: orm.Int(5), serializer=to_aiida_type, help=""" The maximum number of iterations to perform. """, ) spec.input( 'clean_workdir', valid_type=orm.Bool, required=False, serializer=to_aiida_type, default=lambda: orm.Bool(True), help=""" If True, clean the work dir upon the completion of a successfull calculation. """, ) spec.input( 'keep_last_workdir', valid_type=orm.Bool, default=lambda: orm.Bool(False), serializer=to_aiida_type, help='If True, prevent the last workdir from being cleaned in case the files are needed for restarts.', ) spec.input( 'verbose', valid_type=orm.Bool, required=False, serializer=to_aiida_type, default=lambda: orm.Bool(False), help=""" If True, enable more detailed output during workchain execution. """, ) spec.input( 'ldau_mapping', valid_type=orm.Dict, required=False, serializer=to_aiida_type, help="""Settings for assign LDA+U related settings according to the input structure, A nested dictionary containing the following keys: mapping: a dictionary in the format of {"Mn": [d, 4]...} for U utype: the type of LDA+U, default to 2, which is the one with only one parameter jmapping: a dictionary in the format of {"Mn": [d, 4]...} but for J felec: Wether we are dealing with f electrons, will increase lmaxmix if we are.""", ) spec.input( 'magmom_mapping', valid_type=orm.Dict, required=False, serializer=to_aiida_type, help='Mapping for the initial magnetic moments.', ) spec.input( 'kpoints_spacing', valid_type=orm.Float, required=False, serializer=to_aiida_type, help='Spacing for the kpoints in units A^-1 * 2pi', ) spec.input( 'auto_parallel', valid_type=orm.Dict, serializer=to_aiida_type, required=False, help='Automatic parallelisation settings, keywords passed to `get_jobscheme` function.', ) spec.outline( cls.setup, cls.init_inputs, if_(cls.run_auto_parallel)(cls.prepare_inputs, cls.perform_autoparallel), while_(cls.should_run_process)( cls.prepare_inputs, cls.run_process, cls.inspect_process, ), cls.results, ) # yapf: disable spec.output('parallel_settings', valid_type=orm.Dict, required=False) spec.expose_outputs(cls._process_class) # Copied from the old plugin restart workchain spec.exit_code( 0, 'NO_ERROR', message='the sun is shining', ) spec.exit_code( 300, 'ERROR_MISSING_REQUIRED_OUTPUT', message='the calculation is missing at least one required output in the restart workchain', ) spec.exit_code( 400, 'ERROR_ITERATION_RETURNED_NO_CALCULATION', message='the run_calculation step did not successfully add a calculation node to the context', ) spec.exit_code( 401, 'ERROR_MAXIMUM_ITERATIONS_EXCEEDED', message='the maximum number of iterations was exceeded', ) spec.exit_code( 402, 'ERROR_UNEXPECTED_CALCULATION_STATE', message='the calculation finished with an unexpected calculation state', ) spec.exit_code( 403, 'ERROR_UNEXPECTED_CALCULATION_FAILURE', message='the calculation experienced and unexpected failure', ) spec.exit_code( 404, 'ERROR_SECOND_CONSECUTIVE_SUBMISSION_FAILURE', message='the calculation failed to submit, twice in a row', ) spec.exit_code( 405, 'ERROR_SECOND_CONSECUTIVE_UNHANDLED_FAILURE', message='the calculation failed for an unknown reason, twice in a row', ) spec.exit_code( 500, 'ERROR_MISSING_CRITICAL_OUTPUT', message='Missing critical output for inspecting the status of the calculation.', ) spec.exit_code( 501, 'ERROR_OTHER_INTERVENTION_NEEDED', message='Cannot handle the error - inputs are likely need to be revised manually. Message: {message}', ) spec.exit_code( 502, 'ERROR_CALCULATION_NOT_FINISHED', message='Cannot handle the error - the last calculation did not reach the end of execution.', ) spec.exit_code( 503, 'ERROR_ELECTRONIC_STRUCTURE_NOT_CONVERGED', message='Cannot handle the error - the last calculation did not reach electronic convergence.', ) spec.exit_code( 504, 'ERROR_IONIC_RELAXATION_NOT_CONVERGED', message='The ionic relaxation is not converged.', ) spec.exit_code( 505, 'ERROR_UNCONVERGED_ELECTRONIC_STRUCTURE_IN_RELAX', message='At least one of the ionic steps during the relaxation has did not have converged ' 'electronic structure.', ) spec.exit_code( 700, 'ERROR_NO_POTENTIAL_FAMILY_NAME', message='the user did not supply a potential family name', ) spec.exit_code( 701, 'ERROR_POTENTIAL_VALUE_ERROR', message='ValueError was returned from get_potcars_from_structure', ) spec.exit_code( 702, 'ERROR_POTENTIAL_DO_NOT_EXIST', message='the potential does not exist', ) spec.exit_code( 703, 'ERROR_IN_PARAMETER_MASSAGER', message='the exception: {exception} was thrown while massaging the parameters', )
[docs] def setup(self) -> None: super().setup() self.ctx.restart_calc = None self.ctx.vasp_did_not_execute = False self.ctx.last_calc_was_unfinished = False self.ctx.use_wavecar = True self.ctx.ignore_transient_nelm_breach = False # Flag for ignoring the NELM breach during the relaxation self.ctx.verbose = None self.ctx.last_calc_remote_objects = [] self.ctx.handler = AttributeDict() self.ctx.handler.nbands_increase_tries = 0
[docs] @classmethod def get_builder_from_protocol( cls, code: orm.AbstractCode, structure: orm.StructureData, protocol=None, overrides=None, options=None, **_, ): """Return a builder prepopulated with inputs selected according to the chosen protocol. :param code: the ``Code`` instance configured for the ``abacus.abacus`` plugin. :param structure: the ``StructureData`` instance to use. :param protocol: protocol to use, if not specified, the default will be used. :param overrides: optional dictionary of inputs to override the defaults of the protocol. :param options: A dictionary of options that will be recursively set for the ``metadata.options`` input of all the ``CalcJobs`` that are nested in this work chain. :return: a process builder instance with all inputs defined ready for launch. """ if isinstance(code, str): code = orm.load_code(code) has_pmg = True try: from aiida_vasp.protocols.pmg import PymatgenInputAdaptor # noqa: PLC0415 except ImportError: has_pmg = False if has_pmg and protocol in PymatgenInputAdaptor.KNOWN_SETS: adaptor = PymatgenInputAdaptor( protocol, incar_overrides=overrides.get('incar_overrides', {}), pmg_kwargs=overrides.get('pmg_kwargs', {}), ) inputs = adaptor.get_inputs(structure, is_workchain=True, overrides=overrides) else: inputs = cls.get_protocol_inputs(protocol, overrides) meta_parameters = inputs.pop('meta_parameters', {}) natoms = len(structure.sites) # Update the parameters based on the protocol inputs parameters = inputs['parameters'] # Update EDIFF if not overriden if 'ediff' not in parameters['incar']: parameters['incar']['ediff'] = natoms * float(meta_parameters['ediff_per_atom']) # Configure the options for the underlying VaspCalculation to be launched metadata = inputs.get('calc', {}).get('metadata', {}) if options: metadata['options'] = recursive_merge(metadata.get('options', {}), options) # Forward to the builders builder = cls.get_builder() builder.code = code # Use explicit potential if given if len(inputs.get('potentials', {})) > 0: builder.potentials = inputs['potentials'] else: builder.potential_family = inputs['potential_family'] builder.potential_mapping = {key: inputs['potential_mapping'][key] for key in structure.get_kind_names()} # Apply inputs builder.structure = structure builder.parameters = parameters builder.calc.metadata = metadata if 'settings' in inputs: builder.settings = inputs.get('settings', {}) if 'clean_workdir' in inputs: builder.clean_workdir = orm.Bool(inputs['clean_workdir']) # Configure the kpoints if 'kpoints' in inputs: if isinstance(inputs['kpoints'], orm.Data): builder.kpoints = inputs['kpoints'] # Has mesh been explicitly supplied? elif 'mesh' in inputs['kpoints']: kpoints = KpointsData() kpoints.set_cell_from_structure(structure) kpoints.set_kpoints_mesh(inputs['kpoints']['mesh'], inputs['kpoints'].get('offset', [0, 0, 0])) builder.kpoints = kpoints elif 'spacing' in inputs['kpoints']: builder.kpoints_spacing = orm.Float(inputs['kpoints']['spacing']) else: builder.kpoints_spacing = orm.Float(inputs['kpoints_spacing']) # Apply maximum iteration if 'max_iterations' in inputs: builder.max_iterations = inputs['max_iterations'] # Check if we have any valid ldau_u_mapping defined if inputs.get('ldau_mapping') is None: ldau_u_mapping = {} else: ldau_u_mapping = {key: inputs['ldau_mapping'].get('mapping').get(key) for key in structure.get_kind_names()} # Only forward the mapping if there are elements included in the mapping # Note that only the u-mapping is checked. The assumption is that if one wants to use it # it should be supplied if any(ldau_u_mapping.values()): builder.ldau_mapping = inputs['ldau_mapping'] magmom_mapping = inputs.get('magmom_mapping', {}) if dict(magmom_mapping): builder.magmom_mapping = magmom_mapping return builder
[docs] def verbose_report(self, *args, **kwargs) -> None: """Send report if self.ctx.verbose is True""" if self.ctx.verbose is True: self.report(*args, **kwargs)
[docs] def prepare_inputs(self) -> None: """ Enforce some settings for the restart folder and set parameters tags for a restart. This is called because launching the sub process. NOTE: This method should probably be refactored to give more control on what kind of restart is needed """ # Then check if the workchain wants a restart if self.ctx.restart_calc and isinstance(self.ctx.restart_calc.process_class, self._process_class): self.ctx.inputs.restart_folder = self.ctx.restart_calc.outputs.remote_folder old_parameters = AttributeDict(self.ctx.inputs.parameters).copy() parameters = old_parameters.copy() # Make sure ISTART and ICHARG is set to read the relevant objects - if they exists if 'istart' in parameters and 'WAVECAR' in self.ctx.last_calc_remote_objects: # Correct in case of istart = 0 if parameters.istart == 0 and self.ctx.use_wavecar: parameters.istart = 1 # Not using the WAVECAR - we make sure ISTART is 0 if not self.ctx.use_wavecar: parameters.istart = 0 if 'icharg' in parameters and 'CHGCAR' in self.ctx.last_calc_remote_objects: parameters.icharg = 1 if parameters != old_parameters: self.ctx.inputs.parameters = parameters self.report('Enforced ISTART=1 and ICHARG=1 for restarting the calculation.') # Reset the list of valid remote objects and the restart calculation self.ctx.last_calc_remote_objects = [] self.ctx.restart_calc = None
[docs] def update_magmom(self, node: Optional[CalcJobNode] = None) -> None: """ Update magmom from site magnetization information if available :param node: Calculation node to be used, defaults to the last launched calculation. """ if self.is_noncollinear: self.report('Automatic carrying on magmom for non-collinear magnetism calculation is not implemented.') return if node is None: node = self.ctx.children[-1] if 'site_magnetization' in node.outputs: try: self.ctx.inputs.parameters['magmom'] = site_magnetization_to_magmom( node.outputs.site_magnetization.get_dict() ) except ValueError: pass
[docs] def init_inputs(self) -> Optional[ExitCode]: """Make sure all the required inputs are there and valid, create input dictionary for calculation.""" #### START OF THE COPY FROM VASPWorkChain #### # - the only change is that the section about kpoints is deleted self.ctx.inputs = self.exposed_inputs(self._process_class, namespace='calc', agglomerate=True) # Interface store the parameters as a dict for easy update self.ctx.inputs.parameters = self.ctx.inputs.parameters.get_dict() # Set settings unsupported_parameters = self._default_unsupported_parameters.copy() skip_parameters_validation = False settings_dict = {} if self.inputs.get('settings'): self.ctx.inputs.settings = self.inputs.settings # Also check if the user supplied additional tags that is not in the supported file. settings_dict = self.ctx.inputs.settings.get_dict() unsupported_parameters = settings_dict.get('unsupported_parameters', unsupported_parameters) skip_parameters_validation = settings_dict.get('skip_parameters_validation', skip_parameters_validation) # Perform inputs massage to accommodate generalization in higher lying workchains # and set parameters. try: parameters_massager = ParametersMassage( self.ctx.inputs.parameters, unsupported_parameters, skip_parameters_validation=skip_parameters_validation, ) except Exception as exception: # pylint: disable=broad-except return self.exit_codes.ERROR_IN_PARAMETER_MASSAGER.format(exception=exception) # pylint: disable=no-member try: # Only set if they exists # Set any INCAR tags self.ctx.inputs.parameters = parameters_massager.parameters.incar # Set any dynamics input (currently only for selective dynamics, e.g. custom write to POSCAR) self.ctx.inputs.dynamics = parameters_massager.parameters.dynamics # Here we could set additional override flags, but those are not relevant for this VASP plugin except AttributeError: pass # For back-compatibility only - now options IS stored! # Options is very special, not storable and should be # wrapped in the metadata dictionary, which is also not storable # and should contain an entry for options if 'options' in self.inputs: options = {} options.update(self.inputs.options) self.ctx.inputs.metadata = {'options': options} # Override the parser name if it is supplied by the user. parser_name = self.ctx.inputs.metadata['options'].get('parser_name') if parser_name: self.ctx.inputs.metadata['options']['parser_name'] = parser_name # Set MPI to True, unless the user specifies otherwise withmpi = self.ctx.inputs.metadata['options'].get('withmpi', True) self.ctx.inputs.metadata['options']['withmpi'] = withmpi # Make sure we also bring along any label and description set on the WorkChain to the CalcJob, it if does # not exists, set to empty string. if 'metadata' in self.inputs: label = self.inputs.metadata.get('label', '') description = self.inputs.metadata.get('description', '') if 'metadata' not in self.ctx.inputs: self.ctx.inputs.metadata = {} self.ctx.inputs.metadata['label'] = label self.ctx.inputs.metadata['description'] = description # Carry on site magnetization for initialization if 'site_magnetization' in self.inputs: magmom = site_magnetization_to_magmom(self.inputs.site_magnetization.get_dict()) assert len(magmom) == len(self.inputs.structure.sites) self.ctx.inputs.parameters['magmom'] = magmom exit_code = self.setup_potcar() if exit_code is not None: return exit_code # Store verbose parameter in ctx - otherwise it will not work after deserialization try: self.ctx.verbose = self.inputs.verbose.value except AttributeError: self.ctx.verbose = self._verbose # Set the kpoints (kpoints) if 'kpoints' in self.inputs: self.ctx.inputs.kpoints = self.inputs.kpoints elif 'kpoints_spacing' in self.inputs: kpoints = KpointsData() kpoints.set_cell_from_structure(self.ctx.inputs.structure) kpoints.set_kpoints_mesh_from_density(self.inputs.kpoints_spacing.value * np.pi * 2) self.ctx.inputs.kpoints = kpoints else: raise InputValidationError("Must supply either 'kpoints' or 'kpoints_spacing'") # Setup LDAU keys if 'ldau_mapping' in self.inputs: ldau_settings = self.inputs.ldau_mapping.get_dict() ldau_keys = get_ldau_keys(self.ctx.inputs.structure, **ldau_settings) # Directly update the raw inputs passed to VaspCalculation self.ctx.inputs.parameters.update(ldau_keys) # Apply the magmom mapping if supplied if 'magmom_mapping' in self.inputs: mapping = self.inputs.magmom_mapping.get_dict() default = mapping.pop('default', 1.0) kind_names = set(self.inputs.structure.get_kind_names()) # Take only the relevant keys mapping = {key: mapping[key] for key in mapping if key in kind_names} # Only proceed if mapping is not empty or default is not 1.0 (VASP internal default) if mapping or (default != 1.0): magmom = [] for site in self.inputs.structure.sites: magmom.append(mapping.get(site.kind_name, default)) # If ispin is not set (possibly by mistake), we change it to 2 if 'ispin' not in self.ctx.inputs.parameters: self.ctx.inputs.parameters['ispin'] = 2 # Apply the mapping of the magmoms self.ctx.inputs.parameters['magmom'] = ' '.join(map(str, magmom)) # Attach default monitors if not provided by the user if not self.inputs.get('monitors') and not settings_dict.get('no_default_monitors', False): self.ctx.inputs.monitors = { 'stdout': Dict(dict={'entry_point': 'vasp.stdout'}), 'loop_time': Dict(dict={'entry_point': 'vasp.loop_time', 'minimum_poll_interval': 600}), } return None
[docs] def setup_potcar(self) -> None: # Verify and set potentials (potcar) if not self.inputs.potential_family.value: self.report('An empty string for the potential family name was detected.') # pylint: disable=not-callable return self.exit_codes.ERROR_NO_POTENTIAL_FAMILY_NAME # pylint: disable=no-member try: self.ctx.inputs.potential = PotcarData.get_potcars_from_structure( structure=self.inputs.structure, family_name=self.inputs.potential_family.value, mapping=self.inputs.potential_mapping.get_dict(), ) except ValueError as err: return compose_exit_code(self.exit_codes.ERROR_POTENTIAL_VALUE_ERROR.status, str(err)) # pylint: disable=no-member except NotExistent as err: return compose_exit_code(self.exit_codes.ERROR_POTENTIAL_DO_NOT_EXIST.status, str(err)) # pylint: disable=no-member
[docs] def run_auto_parallel(self) -> bool: """Wether we should run auto-parallelisation test""" return 'auto_parallel' in self.inputs and self.inputs.auto_parallel.value is True
[docs] def perform_autoparallel(self) -> None: """Dry run and obtain the best parallelisation settings""" self.report('Performing local dryrun for auto-parallelisation') # pylint: disable=not-callable ind = prepare_process_inputs(self.ctx.inputs) nprocs = self.ctx.inputs.metadata['options']['resources']['tot_num_mpiprocs'] # Take the settings pass it to the function kwargs = self.inputs.auto_parallel.get_dict() if 'cpus_per_node' not in kwargs: kwargs['cpus_per_node'] = self.inputs.code.computer.get_default_mpiprocs_per_machine() # If the dryrun errored, proceed the workchain try: scheme = get_jobscheme(ind, nprocs, **kwargs) except Exception as error: self.report(f'Dry-run errorred, process with cautions, message: {error.args}') # pylint: disable=not-callable return if (scheme.ncore is None) or (scheme.kpar is None): self.report(f'Error NCORE: {scheme.ncore}, KPAR: {scheme.kpar}') # pylint: disable=not-callable return parallel_opts = {'ncore': scheme.ncore, 'kpar': scheme.kpar} self.report(f'Found optimum KPAR={scheme.kpar}, NCORE={scheme.ncore}') # pylint: disable=not-callable self.ctx.inputs.parameters.update(parallel_opts) self.out( 'parallel_settings', Dict(dict={'ncore': scheme.ncore, 'kpar': scheme.kpar}).store(), )
@property def is_noncollinear(self) -> bool: """Check if the calculation is a noncollinear one""" return self.ctx.inputs.parameters.get('lnoncollinear') or self.ctx.inputs.parameters.get('lsorbit')
[docs] @override def on_except(self, exc_info: Tuple[Any, Exception, TracebackType]) -> None: """Handle excepted state.""" try: last_calc = self.ctx.calculations[-1] if self.ctx.calculations else None if last_calc is not None: self.report(f'Last calculation: {last_calc!r}') # pylint: disable=not-callable sched_err = last_calc.outputs.retrieved.get_file_content('_scheduler-stderr.txt') sched_out = last_calc.outputs.retrieved.get_file_content('_scheduler-stdout.txt') self.report(f'Scheduler output:\n{sched_out or ""}') # pylint: disable=not-callable self.report(f'Scheduler stderr:\n{sched_err or ""}') # pylint: disable=not-callable except AttributeError: self.report( 'No calculation was found in the context. ' # pylint: disable=not-callable 'Something really awful happened. ' 'Please inspect messages and act.' ) return super().on_except(exc_info)
[docs] @override def on_terminated(self) -> None: """ Clean the working directories of all child calculation jobs if `clean_workdir=True` in the inputs and the calculation is finished without problem. """ # Directly called the WorkChain method as this method replaces that of the BaseRestartWorkChain WorkChain.on_terminated(self) if self.inputs.clean_workdir.value is False: # type: ignore[union-attr] self.report('remote folders will not be cleaned') return if not self.ctx.is_finished: # type: ignore[union-attr] self.report('remote folders will not be cleaned because the workchain finished with error.') return # Find the remote folder of the last calculation which should be kept from cleaning out_remote_pk = None if self.inputs.keep_last_workdir.value is True: out_remote_pk = self.outputs['remote_folder'].pk cleaned_calcs = [] for called_descendant in self.node.called_descendants: if isinstance(called_descendant, CalcJobNode): try: remote_folder = called_descendant.outputs.remote_folder if remote_folder.pk != out_remote_pk: remote_folder._clean() # pylint: disable=protected-access cleaned_calcs.append(str(called_descendant.pk)) except (IOError, OSError, KeyError): pass if cleaned_calcs: self.report(f'cleaned remote folders of calculations: {" ".join(cleaned_calcs)}')
[docs] def _get_run_status(self, node: CalcJobNode) -> None: """Return the run status of the calculation.""" return node.outputs.misc['run_status']
[docs] @process_handler(priority=2000, enabled=False) def handler_always_attach_outputs(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: """ Handle the case where we attach the outputs even if underlying child calculation ends up with some exit status. """ # Only active this error handler at the last iteration if node.is_finished_ok or self.ctx.iteration < self.inputs.max_iterations.value: return None # Attach all outputs from the last workchain self.report('At the last iteration - attaching outputs from the last workchain.') self.report('WARNING: The attached outputs may contain incorrect results - proceed with caution.') # Attach the required outputs defined in the spec for name, port in self.spec().outputs.items(): try: output = node.get_outgoing(link_label_filter=name).one().node except ValueError: if port.required: self.report(f"required output '{name}' was not an output of {self.ctx.process_name}<{node.pk}>") else: self.out(name, output) # Try to get some meaningful exit codes using the sanity check handler # Always generate a handler report with do_break so no more handlers will be run and # overwrite the error code. report = self._calculation_sanity_checks(node) if report: self.report('Problems during checks of the outputs. The corresponding `exit_code` will be returned.') return ProcessHandlerReport(exit_code=report.exit_code, do_break=True) return ProcessHandlerReport(exit_code=self.exit_codes.ERROR_MAXIMUM_ITERATION_EXCEEDED, do_break=True)
[docs] @process_handler(priority=1100, exit_codes=VaspCalculation.exit_codes.ERROR_VASP_DID_NOT_EXECUTE) def handler_calculation_did_not_run(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: """Handle the case where the calculation is not performed""" if self.ctx.vasp_did_not_execute: self.report(f'{node} did not execute, and this is the second time - aborting.') return ProcessHandlerReport( do_break=True, exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format( message='VASP executable did not run on the remote computer.' ), ) self.report(f'{node} did not execute - try again') self.ctx.vasp_did_not_execute = True return ProcessHandlerReport(do_break=True)
[docs] @process_handler(priority=1000) def handler_misc_not_exist(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: """ Handle the case where misc output is not available, in which case we cannot do anything for it. """ # Check if the run is converged electronically if 'misc' not in node.outputs: self.report('Cannot found `misc` outputs - please check the process reports for issues.') return ProcessHandlerReport(exit_code=self.exit_codes.ERROR_MISSING_CRITICAL_OUTPUT, do_break=True) # pylint: disable=no-member return None
[docs] @process_handler(priority=910, exit_codes=[VaspCalculation.exit_codes.ERROR_DID_NOT_FINISH]) def handler_unfinished_calc_ionic(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: """ Handled the problem such that the calculation is not finished, e.g. did not reach the end of execution. If WAVECAR exists, just resubmit the calculation with the restart folder. If it is a geometry optimization, attempt to restart with output structure + WAVECAR. """ # Check it is a geometry optimization incar = self.ctx.inputs.parameters if incar.get('nsw', -1) > 0: if 'structure' not in node.outputs: self.report('Performing a geometry optimization but the output structure is not found.') return ProcessHandlerReport( do_break=True, exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format( message='No output structure for restart.' ), ) # pylint: disable=no-member self.report('Continuing geometry optimization using the last geometry.') self.ctx.inputs.structure = node.outputs.structure self._setup_restart(node) self.update_magmom(node) return ProcessHandlerReport(do_break=True) return None
[docs] @process_handler( priority=799, enabled=False, exit_codes=[VaspCalculation.exit_codes.ERROR_DID_NOT_FINISH], ) def handler_unfinished_calc_ionic_alt(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: """ Handled the problem such that the calculation is not finished, e.g. did not reach the end of execution. If WAVECAR exists, just resubmit the calculation with the restart folder. If it is a geometry optimization, attempt to restart with output structure + WAVECAR. """ # Check it is a geometry optimization incar = self.ctx.inputs.parameters if incar.get('nsw', -1) > 0: if 'structure' not in node.outputs: self.report('Performing a geometry optimization but the output structure is not found.') return ProcessHandlerReport( do_break=True, exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format( message='No output structure for restart.' ), ) # pylint: disable=no-member self.report('Continuing geometry optimization using the last geometry.') self.ctx.inputs.structure = node.outputs.structure self._setup_restart(node) self.update_magmom(node) return ProcessHandlerReport(do_break=True) return None
[docs] @process_handler(priority=798, enabled=False) def handler_unfinished_calc_generic_alt(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: """ A generic handler for unfinished calculations, we attempt to restart it once. """ # Only act on this specific return code, otherwise we reset the flag if node.exit_status != VaspCalculation.exit_codes.ERROR_DID_NOT_FINISH.status: self.ctx.last_calc_was_unfinished = False return None if self.ctx.last_calc_was_unfinished: msg = ( 'The last calculation was not completed for the second time, potentially due to insufficient ' 'walltime/node failure. Please revise the resources request and/or input parameters.' ) return ProcessHandlerReport( do_break=True, exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(message=msg), ) # pylint: disable=no-member self.report( ( 'The last calculation was not finished - restart using the same set of inputs. ' 'If it was due to transient problem this may fix it, fingers crossed.' ) ) self.ctx.last_calc_was_unfinished = True return ProcessHandlerReport(do_break=True)
[docs] @process_handler(priority=900) def handler_unfinished_calc_generic(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: """ A generic handler for unfinished calculations, we attempt to restart it once. """ # Only act on this specific return code, otherwise we reset the flag if node.exit_status != VaspCalculation.exit_codes.ERROR_DID_NOT_FINISH.status: self.ctx.last_calc_was_unfinished = False return None if self.ctx.last_calc_was_unfinished: msg = ( 'The last calculation was not completed for the second time, potentially due to insufficient' 'walltime/node failure. Please revise the resources request and/or input parameters.' ) return ProcessHandlerReport( do_break=True, exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(message=msg), ) # pylint: disable=no-member self.report( ( 'The last calculation was not finished - restart using the same set of inputs. ' 'If it was due to transient problem this may fix it, fingers crossed.' ) ) self.ctx.last_calc_was_unfinished = True return ProcessHandlerReport(do_break=True)
[docs] @process_handler( priority=800, enabled=False, exit_codes=[ VaspCalculation.exit_codes.ERROR_ELECTRONIC_NOT_CONVERGED, VaspCalculation.exit_codes.ERROR_IONIC_NOT_CONVERGED, VaspCalculation.exit_codes.ERROR_DID_NOT_FINISH, VaspCalculation.exit_codes.ERROR_VASP_CRITICAL_ERROR, VaspCalculation.exit_codes.ERROR_OVERFLOW_IN_XML, ], ) def handler_electronic_conv_alt(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: # pylint: disable=too-many-return-statements,too-many-branches """Handle electronic convergence problem""" incar = node.inputs.parameters.get_dict() run_status = self._get_run_status(node) notifications = node.outputs.misc['notifications'] nelm = run_status['nelm'] algo = incar.get('algo', 'normal') # In case of ionic convergence problem, we also act if electronic convergence problem has been reported. if node.exit_status in [ VaspCalculation.exit_codes.ERROR_IONIC_NOT_CONVERGED.status, VaspCalculation.exit_codes.ERROR_DID_NOT_FINISH, ]: perform_fix = False if run_status['consistent_nelm_breach']: self.report( 'The NELM limit has been breached in all ionic steps - proceed to take actions' 'for improving convergence.' ) perform_fix = True elif run_status['contains_nelm_breach']: # Then there are some breaches in the ionic cycles if self.ctx.ignore_transient_nelm_breach: self.report( 'WARNING: NELM limit breached in some ionic steps but requested to ignore this -' 'no action taken.' ) perform_fix = False else: self.report( 'The NELM limit has been breached in some ionic steps - proceed to take actions for' ' improving convergence.' ) perform_fix = True if not perform_fix: return None if node.exit_status == VaspCalculation.exit_codes.ERROR_VASP_CRITICAL_ERROR.status: # Make sure we only continue in this handler for a selected set of the critical errors. if not any(item in node.exit_message for item in ['EDDRMM', 'EDDDAV', 'The topmost band is occupied']): # We have some other critical error not to handle here return None # Check if we need to add more bands for item in notifications: if item['name'] == 'bandocc' and self.ctx.handler.nbands_increase_tries < 5: try: nbands = run_status['nbands'] # Increase nbands with 10% nbands_new = math.ceil(nbands * 1.1) self.report(f'Changing NBANDS from {nbands} to {nbands_new}') self.ctx.handler.nbands_increase_tries += 1 incar['nbands'] = nbands_new self.ctx.inputs.parameters.update(incar) return ProcessHandlerReport(do_break=True) except KeyError: self.report( 'The topmost band is occupied but did not locate the nbands entry in run_status, ' 'so no way to do corrections.' ) return ProcessHandlerReport( exit_code=self.exit_codes.ERROR_MISSING_CRITICAL_OUTPUT, do_break=True, ) # pylint: disable=no-member if nelm < 300: # Standard NELM might be a bit low, so increase a bit incar['nelm'] = 300 # Here we can just continue from previous run self._setup_restart(node) self.ctx.inputs.parameters.update(incar) self.report(f'Changing NELM from {nelm} to 300.') return ProcessHandlerReport(do_break=True) # Let us start or continue to switch algorithms and reduce try list try: if self.ctx.handler.get('remaining_algos', None) is None: self.ctx.handler.remaining_algos = self._algo_switching[algo.lower()] new_algo = self.ctx.handler.remaining_algos.pop(0) incar['algo'] = new_algo self.ctx.inputs.parameters.update(incar) self.report(f'Changing ALGO from {algo.lower()} to {new_algo}') return ProcessHandlerReport(do_break=True) except IndexError: self.report('No more algorithms to try and we still have not reached electronic convergence.') self.report('No additional fixes can be applied to improve the electronic convergence - aborting.') return ProcessHandlerReport( do_break=True, exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format( message='Cannot apply fix for reaching electronic convergence.' ), )
[docs] @process_handler( priority=800, exit_codes=[ VaspCalculation.exit_codes.ERROR_ELECTRONIC_NOT_CONVERGED, VaspCalculation.exit_codes.ERROR_IONIC_NOT_CONVERGED, VaspCalculation.exit_codes.ERROR_DID_NOT_FINISH, ], ) def handler_electronic_conv(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: """Handle electronic convergence problem""" incar = node.inputs.parameters.get_dict() run_status = self._get_run_status(node) nelm = run_status['nelm'] algo = incar.get('algo', 'normal') # In case of ionic convergence problem, we also act if electronic convergence problem has been reported. if node.exit_status in [ VaspCalculation.exit_codes.ERROR_IONIC_NOT_CONVERGED.status, VaspCalculation.exit_codes.ERROR_DID_NOT_FINISH, ]: perform_fix = False if run_status['consistent_nelm_breach']: self.report( 'The NELM limit has been breached in all ionic steps - proceed to take' ' actions for improving convergence.' ) perform_fix = True elif run_status['contains_nelm_breach']: # Then there are some breaches in the ionic cycles if self.ctx.ignore_transient_nelm_breach: self.report( 'WARNING: NELM limit breached in some ionic steps but requested to ignore this' ' - no action taken.' ) perform_fix = False else: self.report( 'The NELM limit has been breached in some ionic steps - proceed to take actions' ' for improving convergence.' ) perform_fix = True if not perform_fix: return None if algo.lower() in ('fast', 'veryfast'): incar['algo'] = 'normal' self._setup_restart(node) self.ctx.inputs.parameters.update(incar) self.report(f'Setting ALGO=normal from ALGO={algo.lower()}') return ProcessHandlerReport(do_break=True) # The logic below only works for algo=normal if algo.lower() == 'normal': # First try - Increase NELM if we started from a low NELM if nelm < 100: incar['nelm'] = 150 self._setup_restart(node) self.ctx.inputs.parameters.update(incar) self.report('Setting NELM to 150') return ProcessHandlerReport(do_break=True) # Adjust AMIX value if NELM is already high amix = incar.get('amix', 0.4) amix_steps = [0.2, 0.1, 0.05] for amix_target in amix_steps: if amix > amix_target: incar['amix'] = amix_target # Increase NELM in the mean time - smaller amplitude requires more cycles but more stable. incar['nelm'] = nelm + 20 self._setup_restart(node) self.ctx.inputs.parameters.update(incar) self.report(f'Reducing AMIX to {incar["amix"]}') return ProcessHandlerReport(do_break=True) # Change to ALGO if options have been exhausted incar['algo'] = 'all' self.ctx.inputs.parameters.update(incar) self._setup_restart(node) self.report('Switching to ALGO = ALL') return ProcessHandlerReport(do_break=True) self.report('No additional fixes can be applied to improve the electronic convergence - aborting.') return ProcessHandlerReport( do_break=True, exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format( message='Cannot apply fix for reaching electronic convergence.' ), )
[docs] @process_handler( priority=510, exit_codes=[VaspCalculation.exit_codes.ERROR_IONIC_NOT_CONVERGED], enabled=False, ) def handler_ionic_conv_enhanced(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: # pylint: disable=too-many-return-statements, too-many-branches """ Enhanced handling of ionic relaxation problem beyond simple restarts. This is only used when the calculation is having difficulties reaching the convergence. This handler should be applied before the standard handler which breaks the handling cycle. """ if 'structure' not in node.outputs: self.report('Performing a geometry optimization but the output structure is not found.') return ProcessHandlerReport( do_break=True, exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format( message='No output structure for restarting ionic relaxation.' ), ) # pylint: disable=no-member # The simplest solution - resubmit the calculation again child_nodes = self.ctx.children child_miscs = [node.outputs.misc for node in child_nodes] self.update_magmom(node) # Enhanced handler only takes place after 3 trials if len(child_miscs) < 3: return None natom = len(self.ctx.inputs.structure.sites) # Number of iterations ionic_iterations = [misc['run_status']['last_iteration_index'][0] for misc in child_miscs] # Output energies energies = [] for misc in child_miscs: energies.append(misc.get('total_energies', {}).get('energy_extrapolated')) if all([eng is not None for eng in energies[-3:]]): # pylint: disable=use-a-generator de_per_atom = np.diff(energies) / natom else: return None # First check if dE is very small if np.all(np.abs(de_per_atom) < 1e-5): msg = ( 'The total energy difference between the last two step is smaller than 1e-5 /atom' '- please consider to revise the cutoff value of the ionic steps.' ) self.report(msg) return ProcessHandlerReport( do_break=True, exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(message=msg), ) # Check if there are very few step performed per launch. Because VASP does not carry over # the internal parameters of the optimizer, this can make convergence slower. if ionic_iterations[-1] < 5: msg = ( 'Less than 5 iterations performed in the last launch - ' 'please consider submitting the jobs with revised resources request.' ) self.report(msg) return ProcessHandlerReport( do_break=True, exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(message=msg), ) # Warn about very unusually large number of steps and switch IBRION if needed. # Total degrees of freedom dof = 3 * natom isif = self.ctx.inputs.parameters.get('isif', 2) if isif == 3: dof += 6 if sum(ionic_iterations) > dof + 10: self.report(f'Unusually large number of iterations performed for the degrees of freedom: {dof}') ibrion = self.ctx.inputs.parameters.get('ibrion') # In this case alternate between different relaxation algorithms if ibrion == 2: self.ctx.inputs.parameters['ibrion'] = 1 self.ctx.inputs.parameters['potim'] = 0.3 self.ctx.inputs.structure = node.outputs.structure self.report('Switching to IBRION=1 from IBRION=2 with POTIM = 0.3') return ProcessHandlerReport(do_break=True) if ibrion == 1: self.ctx.inputs.parameters['ibrion'] = 2 self.ctx.inputs.parameters['potim'] = 0.1 self.ctx.inputs.structure = node.outputs.structure self.report('Switching to IBRION=2 from IBRION=1 with POTIM = 0.1') return ProcessHandlerReport(do_break=True) # Check if energies are increasing without significant volume change vol_changes = [] for child in child_nodes: inp_vol = child.inputs.structure.get_cell_volume() out_vol = child.outputs.structure.get_cell_volume() vol_changes.append(out_vol / inp_vol - 1.0) vol_changes = np.array(vol_changes) vol_tol = 0.03 if np.all(de_per_atom > 0.0) and np.all(abs(vol_changes[-2:]) < vol_tol): msg = 'Energy increasing for the last two iterations - something can be very wrong...' self.report(msg) return ProcessHandlerReport( do_break=True, exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(message=msg), ) self.report('No fixes can be applied for ionic convergence.') return None
[docs] @process_handler(priority=505, exit_codes=[VaspCalculation.exit_codes.ERROR_IONIC_NOT_CONVERGED]) def handler_ionic_conv(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: """Handle ionic convergence problem""" if 'structure' not in node.outputs: self.report('Performing a geometry optimization but the output structure is not found.') return ProcessHandlerReport( do_break=True, exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format( message='No output structure for restarting ionic relaxation.' ), ) # pylint: disable=no-member # The simplest solution - resubmit the calculation again self.report('Continuing geometry optimization using the last geometry.') self.ctx.inputs.structure = node.outputs.structure self._setup_restart(node) self.update_magmom(node) return ProcessHandlerReport(do_break=True)
[docs] @process_handler(priority=400, exit_codes=[VaspCalculation.exit_codes.ERROR_VASP_CRITICAL_ERROR]) def handler_vasp_critical_error(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: """ Check if the calculation contain any critical error. """ notification = node.outputs.misc['notifications'] message = ( f'Critical error detected in the notifications: {", ".join([item.get("name") for item in notification])}' ) self.report(message + ' - aborting.') return ProcessHandlerReport( do_break=True, exit_code=self.exit_codes.ERROR_OTHER_INTERVENTION_NEEDED.format(message=message), )
[docs] @process_handler(priority=5) def check_misc_output(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: """ Check if misc output exists. """ misc = node.outputs.misc.get_dict() if 'run_status' not in misc: self.report('`run_status` is not found in misc - cannot verify the integrity of the child calculation.') return ProcessHandlerReport(exit_code=self.exit_codes.ERROR_MISSING_CRITICAL_OUTPUT, do_break=True) return None
[docs] @process_handler(priority=4) def check_calc_is_finished(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: """ Check if the calculation has reached the end of execution. """ run_status = self._get_run_status(node) if not run_status.get('finished'): self.report(f'The child calculation {node} did not reach the end of execution.') return ProcessHandlerReport(exit_code=self.exit_codes.ERROR_CALCULATION_NOT_FINISHED, do_break=True) return None
[docs] @process_handler(priority=3) def check_electronic_converged(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: """ Check if the calculation has converged electronic structure. """ run_status = self._get_run_status(node) # Check that the electronic structure is converged if not run_status.get('electronic_converged'): self.report(f'The child calculation {node} does not possess a converged electronic structure.') return ProcessHandlerReport( exit_code=self.exit_codes.ERROR_ELECTRONIC_STRUCTURE_NOT_CONVERGED, do_break=True, ) # pylint: disable=no-member if run_status.get('contains_nelm_breach'): if self.ctx.ignore_transient_nelm_breach: self.report( 'The calculation contains at least one electronic minimization ' 'that was truncated. It should thus not be considered converged. ' 'Upon request from user, this is ignored.' ) else: self.report( 'The calculation contains at least one electronic minimization ' 'that is truncated. It should thus not be considered converged. ' 'Treating the calculation as failed. Please inspect, maybe it is salvageable.' ) return ProcessHandlerReport( exit_code=self.exit_codes.ERROR_UNCONVERGED_ELECTRONIC_STRUCTURE_IN_RELAX, do_break=True, ) # pylint: disable=no-member return None
[docs] @process_handler(priority=2) def check_ionic_converged(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: """ Check if the calculation has converged ionic structure. """ # Check if we have requested to ignore ionic convergence check at calculation level # If so, then this handler should be by-passed if 'settings' in node.inputs: settings = node.inputs.settings.get_dict() if not settings.get('CHECK_IONIC_CONVERGENCE', True): return None run_status = self._get_run_status(node) # Check that the ionic structure is converged if run_status.get('ionic_converged') is False: self.report(f'The child calculation {node} did not have converged ionic structure.') return ProcessHandlerReport( exit_code=self.exit_codes.ERROR_IONIC_RELAXATION_NOT_CONVERGED, do_break=True, ) # pylint: disable=no-member return None
# In this workchain we default to ignore the NELM breaches in the middle of the calculation
[docs] @process_handler(priority=850, enabled=True) def ignore_nelm_breach_relax(self, node: CalcJobNode) -> None: """ Not a actual handler but works as a switch to bypass checks for NELM breaches in the middle of an ionic relaxation. """ _ = node self.ctx.ignore_transient_nelm_breach = True
[docs] def _calculation_sanity_checks(self, node: CalcJobNode) -> Optional[ProcessHandlerReport]: # pylint: disable=unused-argument """ Perform additional sanity checks on successfully completed calculation. This method acts invokes the 'check' handlers to check the calculations and abort the workchain if any problem is found. This is useful when all of the corresponding error handlers are disabled, and allow one to avoid the default behaviour of restarting the calculation one more times regardlessly with unhandled errors. """ checks = [ self._check_misc_output, self._check_calc_is_finished, self._check_electronic_converged, self._check_ionic_converged, ] # Go though the checks one after another, return report if necessary last_report = None for check in checks: report = check(node) if report: if report.do_break: return report last_report = report return last_report
[docs] def _update_last_calc_objects(self, node: CalcJobNode) -> List[str]: """ Connect to the remote and find the valid objects in th calculation folder Only update if the entry is empty in order to avoid too many connections to the remote. """ if not self.ctx.last_calc_remote_objects: self.ctx.last_calc_remote_objects = list_valid_objects_in_remote(node.outputs.remote_folder) return self.ctx.last_calc_remote_objects
[docs] def _setup_restart(self, node: CalcJobNode) -> bool: """ Check the existence of any restart objects, if any of them eixsts use the last calculation for restart. """ self._update_last_calc_objects(node) if 'WAVECAR' in self.ctx.last_calc_remote_objects or 'CHGCAR' in self.ctx.last_calc_remote_objects: self.ctx.restart_calc = node return True return False
[docs] def list_valid_objects_in_remote(remote: orm.RemoteData, path: str = '.', size_threshold: int = 0) -> List[str]: """ List non-empty objects in the remote folder :param remote: The `RemoteFolder` node to be inspected. :param path: The relative path. :param size_threshold: The size threshold to treat the object as a valide one. :returns: A list of valid objects in the directory. """ none_empty = [] try: contents = remote.listdir_withattributes(path) except OSError: return [] for obj in contents: if obj['attributes'].st_size > size_threshold and not obj['isdir']: none_empty.append(obj['name']) return none_empty
[docs] def potential_family_validator(family: orm.Str, _) -> None: """ Validate the potential family input. :param faimly: The potential family to be validated. :raises ValueError: If the potential family is not valid. """ if not family.value: raise InputValidationError('The potential family cannot be empty.') group = PotcarData.get_potcar_group(family.value) if group is None: raise InputValidationError( f'The potential family "{family.value}" is not found. ' 'Please use aiida-vasp potcar listfamilies tool to verify your settings.' )
[docs] def validate_calc_job_custom(inputs: Any, ctx) -> Optional[str]: """Validate the entire set of inputs passed to the `CalcJob` constructor. Reasons that will cause this validation to raise an `InputValidationError`: * No `Computer` has been specified, neither directly in `metadata.computer` nor indirectly through the `Code` input * The specified computer is not stored * The `Computer` specified in `metadata.computer` is not the same as that of the specified `Code` * No `Code` has been specified and no `remote_folder` input has been specified, i.e. this is no import run :return: string with error message in case the inputs are invalid """ try: ctx.get_port('code') ctx.get_port('metadata.computer') except ValueError: # If the namespace no longer contains the `code` or `metadata.computer` ports we skip validation return None remote_folder = inputs.get('remote_folder', None) if remote_folder is not None: # The `remote_folder` input has been specified and so this concerns an import run, which means that neither # a `Code` nor a `Computer` are required. However, they are allowed to be specified but will not be explicitly # checked for consistency. return None code = inputs.get('code', None) computer_from_code = code.computer computer_from_metadata = inputs.get('metadata', {}).get('computer', None) if not computer_from_code and not computer_from_metadata: return 'no computer has been specified in `metadata.computer` nor via `code`.' if computer_from_code and not computer_from_code.is_stored: return f'the Computer<{computer_from_code}> is not stored' if computer_from_metadata and not computer_from_metadata.is_stored: return f'the Computer<{computer_from_metadata}> is not stored' if computer_from_code and computer_from_metadata and computer_from_code.uuid != computer_from_metadata.uuid: return ( 'Computer<{}> explicitly defined in `metadata.computer` is different from Computer<{}> which is the ' 'computer of Code<{}> defined as the `code` input.'.format(computer_from_metadata, computer_from_code, code) ) try: resources_port = ctx.get_port('metadata.options.resources') except ValueError: return None # If the resources port exists but is not required, we don't need to validate it against the computer's scheduler if not resources_port.required: return None computer = computer_from_code or computer_from_metadata scheduler = computer.get_scheduler() old_workchain_interface = False try: resources = inputs['metadata']['options']['resources'] except KeyError: old_workchain_interface = True warnings.warn( 'input `metadata.options.resources` is not specified - you are probably using the old options port.' 'Please define options directly in `calc.metadata.options` instead of `options`.' ) if not old_workchain_interface: scheduler.preprocess_resources(resources, computer.get_default_mpiprocs_per_machine()) try: scheduler.validate_resources(**resources) except ValueError as exception: return f'input `metadata.options.resources` is not valid for the `{scheduler}` scheduler: {exception}' else: resources = inputs['options'].get('resources') if resources is None: return ( '`resources` is not specified under `options` nor in `calc.metadata.options.resources` ' 'please define it in `calc.metadata.options`' ) return None