Source code for comp_chem_utils.cpmd_utils

#!/usr/bin/env python
"""Collection of functions to read and manipulate CPMD output files"""

__author__="Pablo Baudin"
__email__="pablo.baudin@epfl.ch"

import sys
import os
import numpy as np

from comp_chem_utils.molecule_data import read_xyz_table, xyz_file
from comp_chem_utils.utils import get_file_as_list, get_lmax_from_atomic_charge
from comp_chem_utils.conversions import AU_TO_PS
from comp_chem_utils.periodic import element


[docs]def read_standard_file(fn): """Read standard trajectory file and export it. Here a trajectory file is understood as a file arranged as columns in which the first column contains the step indices and the others any type of informations. For example the ENERGIES or SH_STATE.dat files enter in this category. Args: fn (str): Name of the trajectory file. Returns: steps, info steps is as list of step indices (int) as read from the first column of the trajectory file, while info is an array (``np.array()``) of floats containing the rest of the information. """ lines = get_file_as_list(fn) steps = [int(l.split()[0]) for l in lines] ninfo = len(lines[0].split()) - 1 info = np.zeros( (len(steps), ninfo) ) for i, line in enumerate(lines): info[i,:] = [float(line.split()[j+1]) for j in range(ninfo)] return steps, info
[docs]def read_TRAJEC_xyz(fn): """Read TRAJEC.xyz file and export it. An xyz type information is read for each step in the trajectory file, where xyz info is assumed to have the following format:: number of atoms title line = step index atom 1 label and corresponding xyz coordinate atom 2 label and corresponding xyz coordinate : : : atom N label and corresponding xyz coordinate Args: fn (str): Name of the TRAJEC.xyz file. Returns: steps, traj_xyz steps (list): List of step indices (int) as read from the title line of each xyz_data block. traj_xyz (list): List of xyz_data blocks. Each block is itself a list of the lines containing the coordinates in the xyz format. Each line is a 4 item list with first index the atom symbol and the last 3 items are the xyz coodinates. """ lines = get_file_as_list(fn) natoms = int(lines[0]) # 1 xyz_data consist of natoms lines plus the two first lines (natoms + step index) traj_xyz = [] steps = [] for xyz_data in (lines[x:x+natoms+2] for x in range(0,len(lines),natoms+2)): # test that the xyz_data is not empty if (xyz_data[0].strip()): # read_xyz_table return a list of length natoms # each line is a list with the atom's symbol and the 3 coordinates traj_xyz.append( read_xyz_table(xyz_data[2:natoms+2]) ) steps.append( int(xyz_data[1].split()[1]) ) return steps, traj_xyz
[docs]def read_GEOMETRY_xyz(fn): """Like read_TRAJEC_xyz for a single geometry and without the step index.""" lines = get_file_as_list(fn) natoms = int(lines[0]) return read_xyz_table(lines[2:natoms+2])
[docs]def write_TRAJEC_xyz(steps, traj_xyz, output): """Write a TRAJEC.xyz file (CPMD style) to the output file. Args: steps (list): Step indices. See read_TRAJEC_xyz() function. traj_xyz (list): xyz data. See read_TRAJEC_xyz() function. output (str): Name (and path) fo the file in which the information will be written. """ with open(output, 'w') as myf: for s, x in zip(steps, traj_xyz): myf.write('{:12d}\n'.format(len(x))) myf.write(' STEP:{:12d}\n'.format(s)) for line in x: #myf.write('{:>2} {:13.6f} {:13.6f} {:13.6f}\n'.format(*line)) myf.write('{0[0]:<2} {0[1]:13.6f} {0[2]:13.6f} {0[3]:13.6f}\n'.format(line))
[docs]def split_TRAJEC_data(steps, traj_xyz, verbose=True, interactif=True, write_xyz=False, start_i=1, nstep=100, delta=None, dt=5, name=''): """Select equidistant xyz data snapshot from trajectory data. From a starting index, a number of snapshots and a number of steps between each snapshot. A new trajectory data is generated with only a subset of steps. Args: steps (list): Step indices. See read_TRAJEC_xyz() function. traj_xyz (list): xyz data. See read_TRAJEC_xyz() function. verbose (bool, optional): Print more information while running. Default ``is True``. interactif (bool, optional): Let the user chose the parameters interactively. Default ``is True``. write_xyz (bool, optional): Write an xyz file to disk for every step. Default ``is False``. start_i (int, optional): Step index for the first snapshot. Default is 1 nstep (int, optional): Total number of final snapshots. Default is 100. delta (int, optional): Number of steps between two snapshots. Default is ``None``. It will be changed to the maximum possible value depending on other paramters. dt (int, optional): Time step used in MD [in a.u.] (to provide print out the actual time between the snapshots). Default is 5 a.u. name (str, optional): String/Title to be used in xyz filename in case ``write_xyz = True``. Return: new_steps, new_traj_xyz Just a subset of ``steps`` and ``traj_xyz``. """ if verbose: print("Total number of steps : {}".format(len(steps))) print("First and last step index: {} - {}".format(steps[0], steps[-1])) print("Delta between two steps : {}\n".format(steps[1]-steps[0])) if interactif: # ask user to chose the relevant snapshots start_i = int( raw_input("Step index for first snapshot (between 0 and {}): \n".format(len(steps)-1))) nstep = int( raw_input("Total number of snapshot (between 1 and {}): \n".format(len(steps)-start_i))) delta = (len(steps) - start_i)/(nstep-1) - 1 inp = raw_input("Delta between two step indices (max and default = {}): \n".format(delta)) if inp: delta = int(inp) dt = float( raw_input("Time step used in MD [in a.u.] (to provide time between snapshots): \n")) write_xyz = raw_input("Write xyz file ? [y/n]\n").strip().lower() == 'y' if write_xyz: name = raw_input("String/Title to be used in xyz filename:\n") else: # use input or default value if not delta: delta = (len(steps) - start_i)/(nstep-1) - 1 # time between snapshots is given by d_steps * dt [a.u.] # where d_steps is the actual number of steps between two snapshots: d_steps = steps[start_i + delta] - steps[start_i] # stoping index in steps list stop_i = start_i + delta*nstep # sanity check if stop_i>len(steps): stop_i = len(steps) if verbose: print("WARNING new number of steps: {}".format( len(range(start_i, stop_i, delta)) )) if verbose: print("Starting index: {}".format( start_i )) print("Last index: {}".format( stop_i )) print("Number of snapshots: {}\n".format( nstep )) print("# steps between two snapshots: {}".format(d_steps)) print("Time between snapshots [a.u.]: {}".format(d_steps * dt)) print("Time between snapshots [ps]: {}\n".format(d_steps * dt * AU_TO_PS)) # Loop over chosen configurations new_traj_xyz = [] new_steps = [] for i in range(start_i, stop_i, delta): new_steps.append( steps[i] ) new_traj_xyz.append( traj_xyz[i] ) # create xyz file if write_xyz: fname = '{:010d}_{}.xyz'.format(new_steps[-1], name) xyzf = xyz_file() xyzf.read_from_table(new_traj_xyz[-1], title=name) xyzf.out_to_file(fname=fname) return new_steps, new_traj_xyz
[docs]def read_GEOMETRY(fn): """Simplified version of read_FTRAJECTORY for a single structure.""" data = np.loadtxt(fn) xyz = data[:,0:3] vel = data[:,3:] return xyz, vel
[docs]def read_TRAJECTORY(fn, verbose=False): """Just a wrapper to read_FTRAJECTORY.""" return read_FTRAJECTORY(fn, forces=False, verbose=verbose)
[docs]def read_FTRAJECTORY(fn, forces=True, verbose=False): """Read the FTRAJECTORY file from a CPMD run and export it. Three different formats can be read. #. The TRAJECTORY file (with ``forces=False``):: Column 0: step index Column 1-3: xyz coordinates Column 4-6: velocities #. The FTRAJECTORY file (with ``forces=True``):: Column 0: step index Column 1-3: xyz coordinates Column 4-6: velocities Column 7-9: forces #. The FTRAJECTORYMTS file (with ``forces=True``, this file is not produced anymore):: Column 0: step index Column 1-3: low level forces Column 4-6: high level forces Column 7-9: xyz coordinates """ if verbose: print('INFO: entering read_FTRAJECTORY') stp = [] xyz = [] vel = [] if forces: fce = [] lines = get_file_as_list(fn) if verbose: print('INFO: file read') # read total number of steps from last line nstep = int(lines[-1].split()[0]) # get number of atoms as (# lines) / (# steps) nlines = len(lines) natoms = get_natoms(lines) if verbose: print('INFO: number of steps: {}'.format(nstep)) print('INFO: number of atoms: {}'.format(natoms)) blocks = [ lines[x:x+natoms] for x in range(0, nlines, natoms) ] if verbose: print('INFO: blocks read') for blk in blocks: # step index from first column of first line in block stp.append( int(blk[0].split()[0]) ) if verbose: print('INFO: read step: {:10d} of {:10d}'.format(stp[-1], nstep)) myxyz = np.zeros( (natoms, 3) ) myvel = np.zeros( (natoms, 3) ) if forces: myfce = np.zeros( (natoms, 3) ) for i, ml in enumerate(blk): l = ml.split() myxyz[i,:] = [float(l[x]) for x in range(1,4)] myvel[i,:] = [float(l[x]) for x in range(4,7)] if forces: myfce[i,:] = [float(l[x]) for x in range(7,10)] xyz.append( myxyz ) vel.append( myvel ) if forces: fce.append( myfce ) if forces: return stp, xyz, vel, fce else: return stp, xyz, vel
ref_code = { 'steps' :-1, 'E_kel' : 0, 'Temp' : 1, 'E_KS' : 2, 'E_cla' : 3, 'E_ham' : 4, 'RMS' : 5, 'CPU_t' : 6 }
[docs]def read_ENERGIES(fn, code, factor=1, HIGH=True): """Read ENERGIES file from CPMD and return data based on code: Args: fn (str): filename of the ENERGIES file (can include path to the file). code: List of codes written as strings describing which information should be extracted from the file. The available codes are:: 'steps' : Step indices 'E_kel' : Electronic kinetic energy (only for CPMD) 'Temp' : Temperature [K] 'E_KS' : Kohn-Sham energy [a.u.] 'E_cla' : Classical energy, E_KS + E_kin (constant for BOMD) 'E_ham' : 0 for BOMD 'RMS' : Nuclear displacement wrt initial position (?) 'CPU_t' : CPU time factor (int): Integer factor used to skip some information. E.g. for an MTS calculation the high level information can be extracted by setting factor equals to the MTS factor used in the calculation. Default value is 1 (every time step info is extracted). HIGH (bool): define wether to extract the information every ``factor`` step (this is default, `HIGH=True`) or the negative counter part meaning the info is extracted for every step except every factor steps `HIGH=False`. Return: The function returns a dictionary with keys the input codes and with values an array (``np.array``) containing the corresonding information as ``floats``. The exception is the value for ``'steps'`` which is a simple list of integers. For example if the input codes are ``code=['steps','E_cla']``, the output dictionary will have the form:: >>> read_ENERGIES('ENERGIES', ['steps','E_cla']) {'E_cla': array([-546.99550862, -546.99770079, ..., -546.96549996]), 'steps': [1, 2, ..., 10000]} """ steps, info = read_standard_file(fn) myrange = range(factor-1,len(steps),factor) to_return = {} for i in code: if HIGH: # extract information every factor steps if i=='steps': to_return[i] = [steps[j] for j in myrange] else: l = info[:,ref_code[i]] to_return[i] = [l[j] for j in myrange] else: # extract negative info (i.e. all except every factor steps) if i=='steps': to_return[i] = [steps[j] for j in range(len(steps)) if j not in myrange ] else: l = info[:,ref_code[i]] to_return[i] = [l[j] for j in range(len(l)) if j not in myrange] return to_return
[docs]def read_SH_ENERG(fn, nstates=None, factor=1): """Read SH_ENERG.dat file and exctract info in dictionary""" steps, info = read_standard_file(fn) sh_data = {} sh_data['steps'] = [ x*factor for x in steps ] maxstates = len(info[0,:]) - 1 if not nstates: nstates = maxstates if nstates > maxstates: nstates = maxstates for i in range(nstates): sh_data[ 'State {}'.format(i) ] = info[:,i] sh_data[ 'Driving State' ] = info[:,-1] return sh_data
[docs]def read_MTS_EXC_ENERG(fn, nstates, MTS_FACTOR, HIGH): """Read MTS_EXC_ENERG.dat file and exctract info in dictionary. Either the HIGH or the LOW level info will be exctracted. """ steps, info = read_standard_file(fn) myrange = range(1, len(info[:,0]), (MTS_FACTOR+1)) if HIGH: SUB='HIGH' sub_info = np.array([info[x,:] for x in myrange] ) sub_steps = np.array([steps[x] for x in myrange] ) else: SUB='LOW' sub_info = np.array([info[x,:] for x in range(len(info[:,0])) if x not in myrange] ) sub_steps = np.array([steps[x] for x in range(len(info[:,0])) if x not in myrange] ) mts_data = {} mts_data['steps'] = sub_steps maxstates = len(sub_info[0,:]) - 1 if not nstates: nstates = maxstates if nstates > maxstates: nstates = maxstates for i in range(nstates): mts_data[ 'E({}) State {}'.format(SUB, i) ] = sub_info[:,i] mts_data[ 'E Driving' ] = sub_info[:,-1] return mts_data
[docs]def get_time_info(fn): """Read CPMD input file and extract time step info.""" # set defaults TIMESTEP = 5 MAXSTEP = 10000 USE_MTS = False MTS_FACTOR = 1 # read input file lines = get_file_as_list(fn) # parse it for j, line in enumerate(lines): if 'TIMESTEP' in line and 'FACTOR' not in line: TIMESTEP = float(lines[j+1].split()[0]) elif 'MAXSTEP' in line: MAXSTEP = int(lines[j+1].split()[0]) elif 'BOMD_FORCES MTS' in line: USE_MTS = True elif 'TIMESTEP_FACTOR' in line: MTS_FACTOR = int(lines[j+1].split()[0]) return TIMESTEP, MAXSTEP, USE_MTS, MTS_FACTOR
[docs]def get_natoms(lines): """Get the number of atoms from a file like TRAJECTORY.""" istep = lines[0].split()[0] natoms=0 for line in lines: new_i = line.split()[0] if new_i != istep: return natoms else: natoms+=1
[docs]def xyz_to_cpmd_atoms(xyz_data=None, xyz_filename=None, PPs=None): """Return list of strings as in CPMD ATOMS section.""" xyzf = xyz_file() if xyz_data: xyzf.read_from_table(xyz_data) elif xyz_filename: xyzf.read_xyz(xyz_filename) else: sys.exit('Wrong input!!') lines = [] lines.append('&ATOMS') # print atom types for atype in xyzf.atom_types: lines.append('*{}_{}'.format(atype.symb, PPs)) lmax = get_lmax_from_atomic_charge(atype.charge) lines.append(' LMAX={}'.format(lmax) ) lines.append(' {}'.format(atype.natoms) ) for x,y,z in zip(atype.xvals, atype.yvals, atype.zvals): line = " {:20.10f} {:20.10f} {:20.10f}".format( x, y, z ) lines.append( line ) lines.append('&END') return lines
[docs]def qmmm_cpmd_atoms(atom_types, PPs=None): """Return list of strings as in CPMD ATOMS section for QMMM calculation.""" lines = [] lines.append('&ATOMS') # print atom types for atype, idx in atom_types.items(): lines.append('*{}_{}'.format(atype, PPs)) atomic_charge = element(atype).atomic lmax = get_lmax_from_atomic_charge(atomic_charge) lines.append(' LMAX={}'.format(lmax) ) lines.append(' {}'.format( len(idx)) ) idx_str = ' '.join( [str(x) for x in idx ] ) lines.append( ' ' +idx_str ) lines.append('&END') return lines
[docs]def xyz_data_to_np(xyz_data): """Convert xyz_data information to a tuple (atoms, coord) Where atoms is a list of the atomic symbols and coord is a matrix: np.array[natoms, 3]""" atoms = [ l[0] for l in xyz_data] coord = [ l[1:] for l in xyz_data] return (atoms, np.array(coord))
if __name__ == "__main__": # TODO: actually test the result of the functions print("Testing {} module\n".format(os.path.basename(__file__))) fold = './tests/CPMD_files/' fn = '{}SH_ENERG.dat'.format(fold) read_standard_file(fn) print("Test of function: read_standard_file: OK") # read ./test/TRAJEC.xyz file and print it to new file fn = "{}TRAJEC.xyz".format(fold) steps, traj_xyz = read_TRAJEC_xyz(fn) print("Test of function: read_TRAJEC_xyz : OK") output = '{}TRAJEC_1.xyz'.format(fold) write_TRAJEC_xyz(steps, traj_xyz, output) print("Test of function: write_TRAJEC_xyz : OK") #os.rm(output) fn = '{}FTRAJECTORY'.format(fold) read_FTRAJECTORY(fn) print("Test of function: read_FTRAJECTORY : OK") fn = '{}ENERGIES'.format(fold) read_ENERGIES(fn, ref_code) print("Test of function: read_ENERGIES : OK") print("\nAll tests were successfully executed! :D")