Source code for aiida_vasp.commands.utils

"""
Auxiliary functions for the AiiDA VASP vasp.launch command line tools.
"""

from pathlib import Path

import click
from aiida import orm
from aiida.common.exceptions import NotExistent
from aiida.plugins import DataFactory
from ase.io import read

from aiida_vasp.commands.option_parser import process_dict_option
from aiida_vasp.common.builder_updater import VaspBuilderUpdater
from aiida_vasp.parsers.content_parsers.poscar import PoscarParser


def load_structure(structure_path: str | Path) -> orm.StructureData:
    """Load a structure from a file or, failing that, from a stored node.

    If ``structure_path`` does not exist on disk it is handed to
    ``orm.load_node`` and the resulting node is returned unchanged (no label is
    set and the node type is not checked here). Otherwise the reader is chosen
    from the file name:

    * ``.cif`` - loaded through the ``core.cif`` data class and converted via
      its ASE representation;
    * ``.poscar``, ``.contcar``, ``.vasp`` suffix, or a file named ``POSCAR`` /
      ``CONTCAR`` (case-insensitive) - parsed with ``PoscarParser``;
    * anything else - read with ``ase.io.read``.

    Structures created from a file get the file stem as their label.

    :param structure_path: Path to a structure file, or a node identifier.
    :returns: The loaded structure node.
    :raises click.ClickException: If the path does not exist and no node is
        found for it, or if the ASE fallback fails for any reason. The original
        exception is attached as ``__cause__``.
    """
    structure_path = Path(structure_path)

    if not structure_path.exists():
        # Not a file on disk: fall back to treating the argument as a node identifier.
        try:
            return orm.load_node(structure_path)
        except NotExistent as exc:
            raise click.ClickException(
                f'Structure file not found: {structure_path} nor it is a valid node identifier'
            ) from exc

    # Determine the format from the extension (or the bare file name for POSCAR/CONTCAR)
    extension = structure_path.suffix.lower()

    if extension in ['.cif']:
        # Load the CIF file as a CifData node, then convert to StructureData through ASE
        cif_data = DataFactory('core.cif')
        cif_node, _ = cif_data.get_or_create(str(structure_path.absolute()))
        structure = orm.StructureData(ase=cif_node.get_ase())
    elif extension in ['.poscar', '.contcar', '.vasp'] or structure_path.name.upper() in ['POSCAR', 'CONTCAR']:
        # Load POSCAR/CONTCAR file; the handler stays open while the parsed content is consumed
        with open(structure_path, 'r', encoding='utf8') as handler:
            poscar_parser = PoscarParser(handler=handler)
            poscar_dict = poscar_parser.structure
            structure = orm.StructureData()
            structure.set_cell(poscar_dict['unitcell'])
            for site in poscar_dict['sites']:
                structure.append_atom(position=site['position'], symbols=site['symbol'], name=site['kind_name'])
    else:
        # Try to use ASE for other formats (XYZ, etc.)
        try:
            atoms = read(str(structure_path))
            structure = orm.StructureData(ase=atoms)
        except Exception as exc:
            # Deliberately broad: any reader failure becomes a clean CLI error, with the cause preserved.
            raise click.ClickException(f'Could not load structure from {structure_path}: {exc}') from exc

    # Set a label based on filename
    structure.label = structure_path.stem
    return structure
def setup_calculation_options(options, resources, max_wallclock_seconds, num_machines, tot_num_mpiprocs):
    """Assemble the calculation ``options`` dictionary from the command line inputs.

    Values are applied in order, so later ones override earlier ones:
    ``options`` first, then ``resources``, then ``max_wallclock_seconds``, and
    finally ``num_machines`` / ``tot_num_mpiprocs`` inside the ``resources``
    entry. Any argument that is falsy (``None``, empty, ``0``) is skipped.

    :param options: Raw option value passed through ``process_dict_option``.
    :param resources: Raw resources value passed through ``process_dict_option``.
    :param max_wallclock_seconds: Wall-clock limit stored under the same key.
    :param num_machines: Stored as ``resources['num_machines']``.
    :param tot_num_mpiprocs: Stored as ``resources['tot_num_mpiprocs']``.
    :returns: The assembled dictionary; it always contains a ``resources`` key.
    """
    assembled = {}

    if options:
        assembled.update(process_dict_option(options))
    if resources:
        assembled['resources'] = process_dict_option(resources)
    if max_wallclock_seconds:
        assembled['max_wallclock_seconds'] = max_wallclock_seconds

    # Guarantee a ``resources`` entry so the individual settings below have somewhere to go
    assembled.setdefault('resources', {})

    explicit_resources = (
        ('num_machines', num_machines),
        ('tot_num_mpiprocs', tot_num_mpiprocs),
    )
    for resource_key, resource_value in explicit_resources:
        if resource_value:
            assembled['resources'][resource_key] = resource_value

    return assembled
def apply_additional_updates(upd: VaspBuilderUpdater, additional_overrides: dict):
    """Forward additional overrides to the matching ``set_<key>`` methods of the updater.

    For every ``key`` in ``additional_overrides`` the updater is checked for an
    attribute named ``set_<key>``. Keys without such an attribute are skipped
    without any message. A dictionary value is expanded into keyword arguments;
    any other value is passed as a single positional argument.

    :param upd: The builder updater whose ``set_*`` methods are called.
    :param additional_overrides: Mapping of override name to value; nothing
        happens when it is empty or ``None``.
    """
    if not additional_overrides:
        return

    click.echo(f'Loading input overrides from: {additional_overrides}')

    for override_name, override_value in additional_overrides.items():
        setter_name = f'set_{override_name}'
        if not hasattr(upd, setter_name):
            # No matching setter on the updater: leave this key alone
            continue

        setter = getattr(upd, setter_name)
        if isinstance(override_value, dict):
            setter(**override_value)
        else:
            setter(override_value)
        click.echo(f'Applied {override_name} overrides')
def handle_calculation_submission(
    upd: VaspBuilderUpdater, run_directly: bool, group: str, alias: str | None = None
) -> orm.ProcessNode:
    """Launch the calculation and optionally file the resulting node in a group.

    With ``run_directly`` the calculation is run through ``upd.run_get_node()``
    and its outcome (success, or exit status and message) is echoed; otherwise
    it is submitted through ``upd.submit()``. When ``group`` is given, the group
    is fetched or created and the process node is added to it. With ``alias``
    the node is added through ``GroupPathX`` from ``aiida-grouppathx``. Any
    failure while adding to the group is reported as a warning and does not
    stop the function from returning the node.

    :param upd: The builder updater used to submit or run the calculation.
    :param run_directly: Run in the current interpreter instead of submitting.
    :param group: Label of the group to add the node to; skipped when falsy.
    :param alias: Optional alias for the node inside the group.
    :returns: The process node of the launched calculation.
    """
    if run_directly:
        click.echo('Running calculation directly...')
        outcome = upd.run_get_node()
        process_node = outcome.node
        click.echo(f'Calculation completed with PK: {process_node.pk}')
        click.echo(f'UUID: {process_node.uuid}')
        if process_node.is_finished_ok:
            click.echo('Calculation finished successfully!')
        else:
            click.echo(f'Calculation failed with exit status: {process_node.exit_status}')
            if process_node.exit_message:
                click.echo(f'Exit message: {process_node.exit_message}')
    else:
        click.echo('Submitting calculation to daemon...')
        process_node = upd.submit()
        click.echo(f'Submitted calculation with PK: {process_node.pk}')
        click.echo(f'UUID: {process_node.uuid}')

    if not group:
        return process_node

    # Group bookkeeping is best-effort: report problems but still return the node
    try:
        calc_group, created = orm.Group.collection.get_or_create(label=group)
        if alias:
            # An alias requires GroupPathX, which is an optional dependency
            try:
                from aiida_grouppathx.pathx import GroupPathX
            except ImportError:
                raise ImportError('aiida-grouppathx is required for setting an alias. Please install it first.')
            GroupPathX(group).add_node(process_node, alias=alias)
            alias_action = f' with alias "{alias}"'
        else:
            # Plain group membership without an alias
            calc_group.add_nodes([process_node])
            alias_action = ''
        if created:
            action = 'Created'
        else:
            action = 'Added to existing'
        click.echo(f"{action} group '{group}' and added calculation{alias_action}")
    except Exception as e:
        click.echo(f"Warning: Could not add to group '{group}': {e}")

    return process_node