Source code for aiida_vasp.utils.export

from __future__ import annotations

import gzip
import os
import re
import shutil
from pathlib import Path
from typing import Any

from aiida import orm
from aiida.common.links import LinkType
from aiida.orm import CalcJobNode, Node, QueryBuilder, WorkChainNode
from aiida.plugins import WorkflowFactory
from aiida.repository import FileType

from aiida_vasp.parsers.content_parsers.poscar import PoscarParser
from aiida_vasp.parsers.content_parsers.potcar import MultiPotcarIo
from aiida_vasp.workchains.v2.relax import VaspRelaxWorkChain

from .aiida_utils import ensure_node_first_arg, ensure_node_kwargs


[docs] def export_vasp( process_node: orm.Node, dst: str | Path, decompress: bool = False, include_potcar: bool = False ) -> None: """Export VASP calculation files to destination.""" # Dispatch export function based on process type if process_node.process_type.endswith('vasp'): _export_calculation(process_node, dst, decompress=decompress, include_potcar=include_potcar) elif process_node.process_type.endswith('neb'): export_neb(process_node, dst, decompress=decompress, include_potcar=include_potcar) elif process_node.process_type.endswith('relax'): _export_workchain(process_node, dst, decompress=decompress, include_potcar=include_potcar) else: raise TypeError(f'Unsupported process type: {process_node.process_type}')
[docs] @ensure_node_kwargs @ensure_node_first_arg def _export_calculation( calc_node: orm.CalcJobNode, dst: Path, decompress: bool = False, include_potcar: bool = False ) -> None: """Export a single calculation.""" folder = Path(dst) folder.mkdir(exist_ok=True) # Inputs retrieved = calc_node.get_outgoing(link_label_filter='retrieved').one().node if isinstance(calc_node, CalcJobNode): calcjob = calc_node elif isinstance(calc_node, WorkChainNode): # In this case the node is an workchain we export the # 'retrieved' output link and trace to its ancestor calcjob = retrieved.base.links.get_incoming(link_label_filter='retrieved', link_type=LinkType.CREATE).one().node else: raise RuntimeError(f'The node {calc_node} is not a valid calculation') info_file = folder / ('aiida_info') info_content = f'Label: {calcjob.label}\nDescription: {calcjob.description}\nUUID: {calcjob.uuid}\n' info_file.write_text(info_content) # export the retrieved outputs and the input files save_all_repository_objects(retrieved, folder, decompress) save_all_repository_objects(calcjob, folder, decompress) if include_potcar: export_pseudos(calcjob, folder)
[docs] @ensure_node_first_arg @ensure_node_kwargs def export_pseudos(calc_job_node: Any, folder: Path) -> None: """Save the pseudopotential file (POTCAR)""" pps = calc_job_node.get_incoming(link_label_filter='potential%').nested()['potential'] multi_potcar = MultiPotcarIo.from_structure(calc_job_node.inputs.structure, pps) dst = str(folder / 'POTCAR') multi_potcar.write(dst)
[docs] @ensure_node_first_arg @ensure_node_kwargs def _export_workchain( work_node: orm.WorkChainNode, dst: Path, decompress: bool = False, include_potcar: bool = False ) -> None: """Export a workchain.""" dst = Path(dst) dst.mkdir(exist_ok=True) if work_node.process_class not in (VaspRelaxWorkChain): raise ValueError( f'Error {work_node} should be `VaspRelaxWorkChain` or `RelaxWorkChain`, but it is {work_node.process_class}' ) q = QueryBuilder() q.append(Node, filters={'id': work_node.pk}) q.append(WorkChainNode, tag='vaspwork', project=['id', '*']) q.order_by({'vaspwork': {'id': 'asc'}}) # Sort by ascending PK for index, (pk, node) in enumerate(q.iterall()): relax_folder = dst / f'relax_calc_{index:03d}' try: _export_calculation(node, relax_folder, decompress=decompress, include_potcar=include_potcar) except (ValueError, AttributeError, KeyError): print(f'Error exporting calculation {pk}') # Write POSCAR file for the input input_structure = work_node.inputs.structure poscar_parser = PoscarParser(data=input_structure, precision=10) poscar_parser.write(str(dst / 'POSCAR')) # Write POSCAR file for the input try: out_structure = work_node.outputs.relax.structure except AttributeError: print( 'Cannot find the output structure - skipping.' ' This usually means that the relaxation did not finish without error.' ) out_structure = None if out_structure: poscar_parser = PoscarParser(data=out_structure, precision=10) poscar_parser.write(str(dst / 'POSCAR_RELAXED')) # Write the info info_file = dst / ('aiida_info') info_content = f'Label: {work_node.label}\nDescription: {work_node.description}\nUUID: {work_node.uuid}\n' info_file.write_text(info_content)
[docs] @ensure_node_first_arg def export_neb( workchain: Any, dst: str | Path, decompress: bool = True, include_potcar: bool = True, energy_type: str = 'energy_extrapolated', ) -> None: """Export the neb calculation""" energies = {key: value[energy_type] for key, value in workchain.outputs.misc['total_energies'].items()} # Query for the energy computed for the end structures q = orm.QueryBuilder() q.append(orm.Node, filters={'id': workchain.inputs.initial_structure.id}, tag='root') q.append(orm.CalcFunctionNode, with_outgoing='root', project=['attributes.function_name']) q.append( orm.StructureData, with_outgoing=orm.CalcFunctionNode, tag='relaxed', project=['label'], # edge_filters={'label': 'init_structure'}, edge_project=['label'], ) q.append( WorkflowFactory('vasp.v2.relax'), with_outgoing='relaxed', project=['label', 'uuid'], tag='relaxation', ) q.append( orm.Dict, with_incoming='relaxation', edge_filters={'label': 'misc'}, project=['attributes.total_energies.energy_extrapolated'], ) q.append(orm.CalcJobNode, with_outgoing=orm.Dict, project=['*']) q.distinct() # First export the original calculation _export_calculation(workchain, dst, decompress=decompress, include_potcar=include_potcar) ends = {} end_id = f'{len(energies) + 1:02d}' for _, _, _, relax_uuid, eng, calcjob, label in q.all(): if label.startswith('init'): if '00' in ends: print( 'Duplicated calculation: {relax_uuid} -> {eng} vs existing {existing}'.format( relax_uuid=relax_uuid, eng=eng, existing=ends['00'] ) ) else: ends['00'] = calcjob elif label.startswith('final'): if end_id in ends: print( 'Duplicated calculation: {relax_uuid} -> {eng} vs existing {existing}'.format( relax_uuid=relax_uuid, eng=eng, existing=ends[end_id] ) ) else: ends[end_id] = calcjob # Export the end point calculation for key, value in ends.items(): _export_calculation(value, Path(dst) / key, decompress=decompress, include_potcar=include_potcar)
[docs] @ensure_node_kwargs def copy_from_aiida(name: str, node: orm.Node, dst: Path, decompress: bool = False, exclude: str | None = None) -> None: """Copy objects from aiida repository. :param name: The full name (including the parent path) of the object. :type name: str :param node: Node object for which the files in the repo to be copied. :type node: orm.Node :param dst: Path of the destination folder. :type dst: Path :param decompress: Whether to decompress files :type decompress: bool, optional :param exclude: Pattern to exclude files :type exclude: str, optional This is a recursive function so directory copying also works. """ # For check the regex the first thing because this function will be called recursively if exclude and re.match(exclude, name): return obj = node.base.repository.get_object(name) # If it is a directory, copy the contents one by one if obj.file_type == FileType.DIRECTORY: for sub_obj in node.base.repository.list_objects(name): copy_from_aiida(os.path.join(name, sub_obj.name), node, dst, exclude=exclude) else: # It is a file with node.base.repository.open(name, mode='rb') as fsource: # Make parent directory if needed frepo_path = dst / name Path(frepo_path.parent).mkdir(exist_ok=True, parents=True) # Write the file if name.endswith('.gz') and decompress: out_path = str(frepo_path)[:-3] out_decompress = True else: out_decompress = False out_path = str(frepo_path) if not out_decompress: with open(out_path, 'wb') as fdst: shutil.copyfileobj(fsource, fdst) else: gobj = gzip.GzipFile(fileobj=fsource, mode='rb') with open(out_path, 'wb') as fdst: shutil.copyfileobj(gobj, fdst)
[docs] @ensure_node_first_arg def save_all_repository_objects( node: orm.Node, target_path: Path, decompress: bool = False, exclude: str | None = None ) -> None: """Copy all objects of a node saved in the repository to the disc""" for name in node.list_object_names(): copy_from_aiida(name, node, target_path, decompress, exclude=exclude)