#!/usr/bin/env python
"""
faps -- Frontend for Automated Adsorption Analysis of Porous Solids.
aka
shpes -- Sorption analysis with a High throughput Python
frontend to Examine binding in Structures
Strucutre adsorption property analysis for high throughput processing. Run
as a script, faps will automatically run complete analysis on a structure.
Sensible defaults are implemented, but calculations can be easily customised.
Faps also provides classes and methods for adapting the simulation or only
doing select parts.
"""
# Turn on keyword expansion to get revision numbers in version strings
# in .hg/hgrc put
# [extensions]
# keyword =
#
# [keyword]
# faps.py =
#
# [keywordmaps]
# Revision = {rev}
try:
__version_info__ = (1, 5, 0, int("$Revision$".strip("$Revision: ")))
except ValueError:
__version_info__ = (1, 5, 0, 0)
__version__ = "%i.%i.%i.%i" % __version_info__
import bz2
import code
try:
import configparser
except ImportError:
import ConfigParser as configparser
import gzip
import mmap
import os
import pickle
import re
import shlex
import shutil
import subprocess
import sys
import tarfile
import textwrap
import time
from copy import copy
from glob import glob
from itertools import count
from logging import warning, debug, error, info, critical
from math import ceil, log
from os import path
import numpy as np
from numpy import pi, cos, sin, sqrt, arccos, arctan2
from numpy import array, identity, dot, cross
from numpy.linalg import norm
from binding_sites.absl import calculate_binding_sites
from config import Options
from elements import WEIGHT, ATOMIC_NUMBER, UFF, VASP_PSEUDO_PREF
from elements import CCDC_BOND_ORDERS, GULP_BOND_ORDERS, OB_BOND_ORDERS, METALS
from elements import COVALENT_RADII, UFF_FULL, QEQ_PARAMS
from eos import peng_robinson
from job_handler import JobHandler
from logo import LOGO
# Global constants
DEG2RAD = pi / 180.0
BOHR2ANG = 0.52917720859
EV2KCAL = 23.060542301389
NAVOGADRO = 6.02214129E23
INFINITY = float('inf')
KCAL_TO_KJ = 4.1868 # Steam tables from openbabel
FASTMC_DEFAULT_GRID_SPACING = 0.1
# ID values for system state
NOT_RUN = 0
RUNNING = 1
FINISHED = 2
UPDATED = 3
SKIPPED = -1
NOT_SUBMITTED = -2
# Possible folder names; need these so that similar_ones_with_underscores are
# not globbed
FOLDER_SUFFIXES = ['gulp', 'gulp_opt', 'gulp_fit', 'siesta', 'vasp', 'egulp',
'repeat', 'fastmc', 'properties', 'absl', 'gromacs']
[docs]class PyNiss(object):
"""
PyNiss -- Negotiation of Intermediate System States
A single property calculation for one structure. Instance with a set of
options, then run the job_dispatcher() to begin the calculation. The
calculation will pickle itself, or can be pickled at any time, by calling
dump_state().
"""
def __init__(self, options):
"""
Instance an empty structure in the calculation; The dispatcher should
be called to fill it up with data, as needed.
"""
self.options = options
self.structure = Structure(options.get('job_name'))
self.state = {'init': (NOT_RUN, False),
'ff_opt': (NOT_RUN, False),
'dft': (NOT_RUN, False),
'esp': (NOT_RUN, False),
'charges': (NOT_RUN, False),
'properties': (NOT_RUN, False),
'absl': {},
'gcmc': {}}
self.job_handler = JobHandler(options)
[docs] def dump_state(self):
"""Write the .niss file holding the current system state."""
job_name = self.options.get('job_name')
info("Writing state file, %s.niss." % job_name)
os.chdir(self.options.get('job_dir'))
# Don't save the job handler in case it changes
save_handler = self.job_handler
self.job_handler = None
my_niss = open(job_name + ".niss", "wb")
pickle.dump(self, my_niss)
my_niss.close()
# put the job handler back and continue
self.job_handler = save_handler
[docs] def re_init(self, new_options):
"""Re initialize simulation (with updated options)."""
if new_options.getbool('update_opts'):
info("Using new options.")
self.options = new_options
self.structure.name = new_options.get('job_name')
else:
# Just update command line stuff
info("Using old options with new command line arguments.")
self.options.args = new_options.args
self.options.options = new_options.options
self.options.cmdopts = new_options.cmdopts
self.structure.name = new_options.get('job_name')
self.status(initial=True)
[docs] def job_dispatcher(self):
"""
Run parts explicity specified on the command line or do the next step
in an automated run. Drop to interactive mode, if requested.
"""
# In case options have changed, re-intitialize
self.job_handler = JobHandler(self.options)
if 'status' in self.options.args:
self.status(initial=True)
if self.options.getbool('interactive'):
code_locals = locals()
code_locals.update(globals())
console = code.InteractiveConsole(code_locals)
console.push('import rlcompleter, readline')
console.push('readline.parse_and_bind("tab: complete")')
banner = ("((-----------------------------------------------))\n"
"(( Interactive faps mode ))\n"
"(( ===================== ))\n"
"(( ))\n"
"(( WARNING: mode is designed for devs and ))\n"
"(( experts only! ))\n"
"(( Current simulation is accessed as 'self' and ))\n"
"(( associated methods. Type 'dir()' to see the ))\n"
"(( methods in the local namespace and 'help(x)' ))\n"
"(( for help on any object. ))\n"
"(( Use 'self.dump_state()' to save any changes. ))\n"
"((-----------------------------------------------))")
console.interact(banner=banner)
if self.options.getbool('import'):
info("Importing results from a previous simulation")
self.import_old()
self.dump_state()
if self.state['init'][0] == NOT_RUN:
info("Reading in structure")
# No structure, should get one
self.structure.from_file(
self.options.get('job_name'),
self.options.get('initial_structure_format'),
self.options)
if self.options.getbool('order_atom_types'):
info("Forcing atom re-ordering by types")
self.structure.order_by_types()
self.state['init'] = (UPDATED, False)
self.dump_state()
self.step_force_field()
self.step_dft()
self.step_charges()
if self.options.getbool('qeq_fit'):
if not 'qeq_fit' in self.state and self.state['charges'][0] == UPDATED:
info("QEq parameter fit requested")
self.run_qeq_gulp(fitting=True)
self.dump_state()
self.step_properties()
self.step_gcmc()
self.step_absl()
self.send_to_database()
self.post_summary()
[docs] def status(self, initial=False):
"""Print the current status to the terminal."""
valid_states = {NOT_RUN: 'Not run',
RUNNING: 'Running',
FINISHED: 'Finished',
UPDATED: 'Processed',
SKIPPED: 'Skipped',
NOT_SUBMITTED: 'Not submitted'}
if initial:
info("Previous system state reported from .niss file "
"(running jobs may have already finished):")
else:
info("Current system status:")
for step, state in self.state.items():
if step == 'gcmc':
if not state:
info(" * State of GCMC: Not run")
else:
for point, job in state.items():
if job[0] is RUNNING:
info(" * GCMC %s: Running, jobid: %s" %
(point, job[1]))
else:
info(" * GCMC %s: %s" %
(point, valid_states[job[0]]))
elif step == 'absl':
if not state:
info(" * State of ABSL: Not run")
else:
# ABSL used to be multiple jobs, still treat jobid as list
for point, jobs in state.items():
if jobs[0] is RUNNING:
info(" * ABSL %s: Running, jobids: %s" %
(point, ",".join('%s' % x for x in jobs[1])))
else:
info(" * ABSL %s: %s" %
(point, valid_states[jobs[0]]))
elif state[0] is RUNNING:
info(" * State of %s: Running, jobid: %s" % (step, state[1]))
else:
info(" * State of %s: %s" % (step, valid_states[state[0]]))
[docs] def send_to_database(self):
"""If using a database, store the results"""
# we can skip if not using a database
if not 'sql' in self.options.get('initial_structure_format'):
return
# extract the database and structure names
db_params = self.options.get('job_name').split('.')
# import this here so sqlalchemy is not required generally
from backend.sql import AlchemyBackend
database = AlchemyBackend(db_params[0])
info("Storing results in database")
database.store_results(db_params[1], int(db_params[2]), self.structure)
debug("Database finished")
[docs] def post_summary(self):
"""Summarise any results for GCMC, properties..."""
# Also includes excess calculation if void volume calculated
# N = pV/RT
all_csvs = {}
R_GAS = 8.3144621E25 / NAVOGADRO # A^3 bar K-1 molecule
job_name = self.options.get('job_name')
info("Summary of GCMC results")
info("======= ======= ======= ======= =======")
nguests = len(self.structure.guests)
for idx, guest in enumerate(self.structure.guests):
# Determine whether we can calculate the excess for
# any different probes
void_volume = self.structure.sub_property('void_volume')
he_excess, guest_excess = "", ""
if 1.0 in void_volume:
he_excess = 'He-xs-molc/uc,He-xs-mmol/g,He-xs-v/v,He-xs-wt%,'
if hasattr(guest, 'probe_radius'):
if guest.probe_radius != 1.0 and guest.probe_radius in void_volume:
guest_excess = 'xs-molc/uc,xs-mmol/g,xs-v/v,xs-wt%,'
if hasattr(guest, 'c_v') and guest.c_v:
#TODO(tdaff): Make standard in 2.0
# makes sure that c_v is there and not empty
cv_header = "C_v,stdev,"
else:
cv_header = ""
if hasattr(guest, 'fugacities') and guest.fugacities:
fuga_header = "f/bar,"
else:
fuga_header = ""
# Generate headers separately
csv = ["#T/K,p/bar,molc/uc,mmol/g,stdev,",
"v/v,stdev,wt%,stdev,hoa/kcal/mol,stdev,",
guest_excess, he_excess, cv_header, fuga_header,
",".join("p(g%i)" % gidx for gidx in range(nguests)), "\n"]
info(guest.name)
info("---------------------------------------")
info("molc/uc mmol/g vstp/v hoa T_P")
info("======= ======= ======= ======= =======")
for tp_point in sorted(guest.uptake):
# <N>, sd, supercell
uptake = guest.uptake[tp_point]
uptake = [uptake[0]/uptake[2], uptake[1]/uptake[2]]
hoa = guest.hoa[tp_point]
# uptake in mmol/g
muptake = 1000*uptake[0]/self.structure.weight
muptake_stdev = 1000*uptake[1]/self.structure.weight
# volumetric uptake
vuptake = (guest.molar_volume*uptake[0]/
(6.023E-4*self.structure.volume))
vuptake_stdev = (guest.molar_volume*uptake[1]/
(6.023E-4*self.structure.volume))
# weight percent uptake
wtpc = 100*(1 - self.structure.weight/
(self.structure.weight + uptake[0]*guest.weight))
wtpc_stdev = 100*(1 - self.structure.weight/
(self.structure.weight + uptake[1]*guest.weight))
info("%7.2f %7.2f %7.2f %7.2f %s" % (
uptake[0], muptake, vuptake, hoa[0],
("T=%s" % tp_point[0] +
''.join(['P=%s' % x for x in tp_point[1]]))))
csv.append("%f,%f,%f,%f,%f,%f,%f,%f,%f,%f,%f," % (
tp_point[0], tp_point[1][idx], uptake[0],
muptake, muptake_stdev,
vuptake, vuptake_stdev,
wtpc, wtpc_stdev,
hoa[0], hoa[1]))
if guest_excess:
guest_void = void_volume[guest.probe_radius]
n_bulk = (tp_point[1][idx]*guest_void)/(tp_point[0]*R_GAS)
xs_uptake = uptake[0]-n_bulk
# uptake in mmol/g
muptake = 1000*xs_uptake/self.structure.weight
# volumetric uptake
vuptake = (guest.molar_volume*xs_uptake/
(6.023E-4*self.structure.volume))
# weight percent uptake
wtpc = 100*(1 - self.structure.weight/
(self.structure.weight + xs_uptake*guest.weight))
csv.append("%f,%f,%f,%f," % (
xs_uptake, muptake, vuptake, wtpc,))
if he_excess:
guest_void = void_volume[1.0]
n_bulk = (tp_point[1][idx]*guest_void)/(tp_point[0]*R_GAS)
xs_uptake = uptake[0]-n_bulk
# uptake in mmol/g
muptake = 1000*xs_uptake/self.structure.weight
# volumetric uptake
vuptake = (guest.molar_volume*xs_uptake/
(6.023E-4*self.structure.volume))
# weight percent uptake
wtpc = 100*(1 - self.structure.weight/
(self.structure.weight + xs_uptake*guest.weight))
csv.append("%f,%f,%f,%f," % (
xs_uptake, muptake, vuptake, wtpc,))
if cv_header:
csv.append("%f,%f," % (guest.c_v[tp_point]))
if fuga_header:
try:
csv.append("%f," % (guest.fugacities[tp_point]))
except KeyError:
# Assume it was done without fugacity correction
csv.append("%f," % tp_point[1][idx])
# list all the other guest pressures and start a new line
csv.append(",".join("%f" % x for x in tp_point[1]) + "\n")
csv_filename = '%s-%s.csv' % (job_name, guest.ident)
csv_file = open(csv_filename, 'w')
csv_file.writelines(csv)
csv_file.close()
all_csvs[csv_filename] = "".join(csv)
info("======= ======= ======= ======= =======")
info("Structure properties")
# Internally calculated surface area
surf_area_results = self.structure.surface_area()
if surf_area_results:
info("Summary of faps surface areas")
info("========= ========= ========= =========")
info(" radius/A total/A^2 m^2/cm^3 m^2/g")
info("========= ========= ========= =========")
for probe, area in surf_area_results.items():
vol_area = 1E4*area/self.structure.volume
specific_area = NAVOGADRO*area/(1E20*self.structure.weight)
info("%9.3f %9.2f %9.2f %9.2f" %
(probe, area, vol_area, specific_area))
info("========= ========= ========= =========")
# Messy, but check individual properties that might not be there
# and dump them to the screen
info("weight (u): %f" % self.structure.weight)
if hasattr(self.structure, 'pore_diameter'):
info("pores (A): %f %f %f" % self.structure.pore_diameter)
channel_results = self.structure.sub_property('dimensionality')
if channel_results:
for probe, channels in channel_results.items():
info(("channels %.2f probe: " % probe) +
" ".join("%i" % x for x in channels))
# The table is copied from above as it does some calculating
surf_area_results = self.structure.sub_property('zeo_surface_area')
if surf_area_results:
info("Summary of zeo++ surface areas")
info("========= ========= ========= =========")
info(" radius/A total/A^2 m^2/cm^3 m^2/g")
info("========= ========= ========= =========")
for probe, area in surf_area_results.items():
vol_area = 1E4*area/self.structure.volume
specific_area = NAVOGADRO*area/(1E20*self.structure.weight)
info("%9.3f %9.2f %9.2f %9.2f" %
(probe, area, vol_area, specific_area))
info("========= ========= ========= =========")
info("volume (A^3): %f" % self.structure.volume)
void_volume_results = self.structure.sub_property('void_volume')
if surf_area_results:
info("Summary of zeo++ void volumes")
info("========= ========= ========= =========")
info(" radius/A total/A^3 fraction cm^3/g")
info("========= ========= ========= =========")
for probe, void in void_volume_results.items():
void_fraction = void/self.structure.volume
specific_area = NAVOGADRO*void/(1E24*self.structure.weight)
info("%9.3f %9.2f %9.5f %9.4f" %
(probe, void, void_fraction, specific_area))
info("========= ========= ========= =========")
pxrd = self.structure.sub_property('pxrd')
if pxrd:
info("Summary of PXRD; see properties for cpi file")
for probe, pattern in pxrd.items():
info("%s Powder XRD:" % probe)
plot = [['|']*21]
# 1000 points makes this 75 columns wide
averaged = [sum(pattern[x:x+20])/20.0
for x in range(0, 1000, 20)]
# make peaks horizontal first
peak = max(averaged)
for point in averaged:
height = int(round(15*point/peak))
plot.append([' ']*(15-height) + ['|']*height + ['-'])
# transpose for printing
plot = zip(*plot)
for line in plot:
info(''.join(line))
# Email at the end so everything is in the .flog
self.email(all_csvs)
[docs] def email(self, csvs=None):
"""Send an email, if one has not already been sent."""
job_name = self.options.get('job_name')
email_addresses = self.options.gettuple('email')
if email_addresses:
info("Emailing results to %s" % ", ".join(email_addresses))
else:
# nobody to email to, why bother?
return
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
# Construct an email, thanks documentation!
sender = 'Faps Master <noreply@uottawa.ca>'
outer = MIMEMultipart()
outer['Subject'] = 'Results for faps job on %s' % job_name
outer['To'] = ', '.join(email_addresses)
outer['From'] = sender
outer.preamble = 'This is a MIME multipart message\n'
# Just attach all the csv files
if csvs is not None:
for csv in csvs:
msg = MIMEText(csvs[csv], _subtype='csv')
msg.add_header('Content-Disposition', 'attachment',
filename=csv)
outer.attach(msg)
# Include a cif file
msg_cif = MIMEText("".join(self.structure.to_cif()))
msg_cif.add_header('Content-Disposition', 'attachment',
filename="%s.faps.cif" % job_name)
outer.attach(msg_cif)
# And the flog file
try:
flog = open("%s.flog" % job_name)
msg_flog = MIMEText(flog.read())
flog.close()
msg_flog.add_header('Content-Disposition', 'attachment',
filename="%s.flog" % job_name)
outer.attach(msg_flog)
except IOError:
# Error reading the file, don't care
pass
# Send via local SMTP server
s = smtplib.SMTP('localhost')
s.sendmail(sender, email_addresses, outer.as_string())
s.quit()
[docs] def step_force_field(self):
"""Check the force field step of the calculation."""
end_after = False
if 'ff_opt' not in self.state:
self.state['ff_opt'] = (NOT_RUN, False)
if self.state['ff_opt'][0] not in [UPDATED, SKIPPED]:
if self.options.getbool('no_force_field_opt'):
info("Skipping force field optimisation")
self.state['ff_opt'] = (SKIPPED, False)
elif self.state['ff_opt'][0] == RUNNING:
job_state = self.job_handler.jobcheck(self.state['ff_opt'][1])
if not job_state:
info("Queue reports force field optimisation has finished")
self.state['ff_opt'] = (FINISHED, False)
else:
# Still running
info("Force field optimisation still in progress")
end_after = True
if self.state['ff_opt'][0] == NOT_RUN or 'ff_opt' in self.options.args:
jobid = self.run_ff_opt()
sys_argv_strip('ff_opt')
end_after = self.postrun(jobid)
self.dump_state()
if self.state['ff_opt'][0] == FINISHED:
self.structure.update_pos(self.options.get('ff_opt_code'),
options=self.options)
self.state['ff_opt'] = (UPDATED, False)
self.dump_state()
# If postrun is submitted then this script is done!
if end_after:
terminate(0)
[docs] def step_dft(self):
"""Check the DFT step of the calculation."""
end_after = False
if self.state['dft'][0] not in [UPDATED, SKIPPED]:
if self.options.getbool('no_dft'):
info("Skipping DFT step completely")
info("Job might fail later if you need the ESP")
self.state['dft'] = (SKIPPED, False)
elif self.state['dft'][0] == RUNNING:
job_state = self.job_handler.jobcheck(self.state['dft'][1])
if not job_state:
info("Queue reports DFT step has finished")
self.state['dft'] = (FINISHED, False)
else:
# Still running
info("DFT still in progress")
end_after = True
if self.state['dft'][0] == NOT_RUN or 'dft' in self.options.args:
jobid = self.run_dft()
sys_argv_strip('dft')
end_after = self.postrun(jobid)
self.dump_state()
if self.state['dft'][0] == FINISHED:
self.structure.update_pos(self.options.get('dft_code'))
self.state['dft'] = (UPDATED, False)
self.dump_state()
# If postrun is submitted then this script is done!
if end_after:
terminate(0)
[docs] def step_charges(self):
"""Check the charge step of the calculation."""
end_after = False
if self.state['charges'][0] not in [UPDATED, SKIPPED]:
if self.options.getbool('no_charges'):
info("Skipping charge calculation")
self.state['charges'] = (SKIPPED, False)
elif self.state['charges'][0] == RUNNING:
job_state = self.job_handler.jobcheck(self.state['charges'][1])
if not job_state:
info("Queue reports charge calculation has finished")
self.state['charges'] = (FINISHED, False)
else:
info("Charge calculation still running")
end_after = True
if self.state['charges'][0] == NOT_RUN or 'charges' in self.options.args:
jobid = self.run_charges()
sys_argv_strip('charges')
end_after = self.postrun(jobid)
self.dump_state()
if self.state['charges'][0] == FINISHED:
self.structure.update_charges(self.options.get('charge_method'),
self.options)
self.state['charges'] = (UPDATED, False)
self.dump_state()
# If postrun is submitted then this script is done!
if end_after:
terminate(0)
[docs] def step_gcmc(self):
"""Check the GCMC step of the calculation."""
end_after = False
jobids = {}
postrun_ids = []
if self.options.getbool('no_gcmc'):
info("Skipping GCMC simulation")
return
elif not self.state['gcmc'] or 'gcmc' in self.options.args:
# The dictionary is empty before any runs
info("Starting gcmc step")
jobids = self.run_fastmc()
sys_argv_strip('gcmc')
self.dump_state()
for tp_point, jobid in jobids.items():
if jobid is True:
self.state['gcmc'][tp_point] = (FINISHED, False)
elif jobid is False:
self.state['gcmc'][tp_point] = (SKIPPED, False)
else:
info("FastMC job in queue. Jobid: %s" % jobid)
self.state['gcmc'][tp_point] = (RUNNING, jobid)
postrun_ids.append(jobid)
# unfinished GCMCs
end_after = True
else:
# when the loop completes write out the state
self.dump_state()
for tp_point in self.state['gcmc']:
tp_state = self.state['gcmc'][tp_point]
if tp_state[0] == RUNNING:
new_state = self.job_handler.jobcheck(tp_state[1])
if not new_state:
info("Queue reports GCMC %s finished" % (tp_point,))
# need to know we have finished to update below
tp_state = (FINISHED, False)
self.state['gcmc'][tp_point] = tp_state
self.dump_state()
else:
info("GCMC %s still running" % (tp_point,))
# unfinished GCMC so exit later
end_after = True
# any states that need to be updated should have been done by now
if tp_state[0] == FINISHED:
startdir = os.getcwd()
# wooki seems slow to copy output files back
# so we give them a few chances to appear
max_attempts = 6
for attempt_count in range(max_attempts):
time.sleep(attempt_count)
try:
self.structure.update_gcmc(tp_point, self.options)
self.state['gcmc'][tp_point] = (UPDATED, False)
self.dump_state()
break
except IOError:
os.chdir(startdir)
else:
error('OUTPUT file never appeared')
if postrun_ids:
self.postrun(postrun_ids)
if end_after:
info("GCMC run has not finished completely")
terminate(0)
[docs] def step_properties(self):
"""Run the properties calculations if required."""
if self.state['properties'][0] not in [UPDATED, SKIPPED]:
if self.options.getbool('no_properties'):
info("Skipping all properties calculations")
self.state['properties'] = (SKIPPED, False)
if self.state['properties'][0] == NOT_RUN or 'properties' in self.options.args:
self.calculate_properties()
self.state['properties'] = (UPDATED, False)
self.dump_state()
[docs] def step_absl(self):
"""Check the binding site step of the calculation."""
end_after = False
jobids = {}
postrun_ids = []
# check for old simulations with no absl state
# TODO(tdaff): remove eventually
if 'absl' not in self.state:
self.state['absl'] = {}
if self.options.getbool('no_absl'):
info("Skipping ABSL calculation")
return
elif self.options.getbool('no_gcmc'):
info("no_gcmc requested, can't do ABSL, skipping")
return
elif not self.options.getbool('mc_probability_plot'):
info("No probability plot; Skipping ABSL calculation")
return
elif not self.options.getbool('find_maxima'):
info("No TABASCO maxima; Skipping ABSL calculation")
return
elif not self.state['absl'] or 'absl' in self.options.args:
# The dictionary is empty before any runs
info("Starting absl step")
jobids = self.run_absl()
sys_argv_strip('absl')
self.dump_state()
for tp_point, jobid in jobids.items():
if set(jobid) == set([True]):
self.state['absl'][tp_point] = (FINISHED, False)
elif set(jobid) == set([False]):
self.state['absl'][tp_point] = (SKIPPED, False)
else:
info("ABSL job in queue. Jobid: %s" % jobid)
self.state['absl'][tp_point] = (RUNNING, jobid)
postrun_ids.extend(jobid)
# unfinished ABSL calculations
end_after = True
else:
# when the loop completes write out the state
self.dump_state()
for tp_point in self.state['absl']:
tp_state = self.state['absl'][tp_point]
if tp_state[0] == RUNNING:
new_state = set([self.job_handler.jobcheck(job)
for job in tp_state[1]])
if new_state == set([False]):
info("Queue reports ABSL %s finished" % (tp_point,))
# need to know we have finished to update below
tp_state = (FINISHED, False)
self.state['absl'][tp_point] = tp_state
self.dump_state()
else:
info("ABSL %s still running" % (tp_point,))
# unfinished ABSL so exit later
end_after = True
# any states that need to be updated should have been done by now
if tp_state[0] == FINISHED:
startdir = os.getcwd()
# wooki seems slow to copy output files back
# so we give them a few chances to appear
max_attempts = 6
for attempt_count in range(max_attempts):
time.sleep(attempt_count)
try:
self.structure.update_absl(tp_point, self.options)
self.state['absl'][tp_point] = (UPDATED, False)
self.dump_state()
break
except IOError:
os.chdir(startdir)
else:
#TODO(tdaff): does this matter here?
error('ABSL output never appeared')
if postrun_ids:
self.postrun(postrun_ids)
if end_after:
info("ABSL run has not finished completely")
terminate(0)
[docs] def import_old(self):
"""Try and import any data from previous stopped simulation."""
job_name = self.options.get('job_name')
job_dir = self.options.get('job_dir')
try:
self.structure.from_file(
job_name,
self.options.get('initial_structure_format'),
self.options)
warning("Ensure that order_atom_types is on for pre-1.4 data")
if self.options.getbool('order_atom_types'):
info("Forcing atom re-ordering by types")
self.structure.order_by_types()
self.state['init'] = (UPDATED, False)
except IOError:
info("No initial structure found to import")
try:
self.structure.update_pos(self.options.get('ff_opt_code'))
self.state['ff_opt'] = (UPDATED, False)
except IOError:
info("No force field optimised structure found to import")
try:
self.structure.update_pos(self.options.get('dft_code'))
self.state['dft'] = (UPDATED, False)
except IOError:
info("No optimized structure found to import")
try:
self.structure.update_charges(self.options.get('charge_method'),
self.options)
self.state['charges'] = (UPDATED, False)
except IOError:
info("No charges found to import")
# Need to generate supercell here on import so that it is set, and
# is based on the cell from dft, if changed
self.structure.gen_supercell(self.options)
guests = [Guest(x) for x in self.options.gettuple('guests')]
if not same_guests(self.structure.guests, guests):
info("Replacing old guests with %s" % " ".join(guest.ident for
guest in guests))
self.structure.guests = guests
else:
# use existing guests that may have data
debug("Retaining previous guests")
guests = self.structure.guests
temps = self.options.gettuple('mc_temperature', float)
presses = self.options.gettuple('mc_pressure', float)
indivs = self.options.gettuple('mc_state_points', float)
for tp_point in state_points(temps, presses, indivs, len(guests)):
try:
self.structure.update_gcmc(tp_point, self.options)
self.state['gcmc'][tp_point] = (UPDATED, False)
except (IOError, OSError):
info("GCMC point %s not found" % str(tp_point))
# Reset directory at end
os.chdir(job_dir)
[docs] def postrun(self, jobid):
"""Determine if we need the job handler to post submit itself."""
# update the job tracker
if jobid is not False and jobid is not True:
if self.options.getbool('run_all'):
debug('Submitting postrun script')
os.chdir(self.options.get('job_dir'))
self.job_handler.postrun(jobid)
return True
else:
debug('Postrun script not submitted')
return False
else:
return False
[docs] def run_ff_opt(self):
"""Prepare the system and run the selected force field optimisation."""
ff_opt_code = self.options.get('ff_opt_code')
info("Checking connectivity/types")
if self.structure.check_connectivity():
if self.options.getbool('infer_types_from_bonds'):
self.structure.gen_types_from_bonds()
else:
warning("No types; try with infer_types_from_bonds")
else:
info("Bonds and types, provided")
info("Running a %s calculation" % ff_opt_code)
if ff_opt_code == 'gromacs':
jobid = self.run_optimise_gromacs()
elif ff_opt_code == 'gulp':
jobid = self.run_optimise_gulp()
else:
critical("Unknown force field method: %s" % ff_opt_code)
terminate(91)
if jobid is True:
# job run and finished
self.state['ff_opt'] = (FINISHED, False)
else:
info("Running %s job in queue. Jobid: %s" % (ff_opt_code, jobid))
self.state['ff_opt'] = (RUNNING, jobid)
return jobid
[docs] def run_dft(self):
"""Select correct method for running the dft/optim."""
dft_code = self.options.get('dft_code')
info("Running a %s calculation" % dft_code)
if dft_code == 'vasp':
jobid = self.run_vasp()
elif dft_code == 'siesta':
jobid = self.run_siesta()
else:
critical("Unknown dft method: %s" % dft_code)
terminate(92)
# update the job tracker
#if jobid is False:
# self.state['dft'] = (NOT_SUBMITTED, False)
# submission skipped
if jobid is True:
# job run and finished
self.state['dft'] = (FINISHED, False)
else:
info("Running %s job in queue. Jobid: %s" % (dft_code, jobid))
self.state['dft'] = (RUNNING, jobid)
return jobid
[docs] def run_charges(self):
"""Select correct charge processing methods."""
chg_method = self.options.get('charge_method')
info("Calculating charges with %s" % chg_method)
if chg_method == 'repeat':
# Get ESP
self.esp_to_cube()
jobid = self.run_repeat()
elif chg_method == 'gulp':
jobid = self.run_qeq_gulp()
elif chg_method == 'egulp':
jobid = self.run_qeq_egulp()
else:
critical("Unknown charge calculation method: %s" % chg_method)
terminate(93)
# update the job tracker
if jobid is True:
# job run and finished
self.state['charges'] = (FINISHED, False)
else:
info("Running %s job in queue. Jobid: %s" % (chg_method, jobid))
self.state['charges'] = (RUNNING, jobid)
return jobid
## Methods for specific codes start here
[docs] def run_optimise_gromacs(self):
"""Run GROMACS to do a UFF optimisation."""
job_name = self.options.get('job_name')
optim_code = self.options.get('ff_opt_code')
g_verbose = self.options.getbool('gromacs_verbose')
# Run in a subdirectory
optim_dir = path.join(self.options.get('job_dir'),
'faps_%s_%s' % (job_name, optim_code))
mkdirs(optim_dir)
os.chdir(optim_dir)
debug("Running in %s" % optim_dir)
metal_geom = self.options.get('gromacs_metal_geometry')
gro_file, top_file, itp_file = self.structure.to_gromacs(metal_geom)
# We use default names so we don't have to specify
# anything extra on the command line
filetemp = open('conf.gro', 'w')
filetemp.writelines(gro_file)
filetemp.close()
filetemp = open('topol.top', 'w')
filetemp.writelines(top_file)
filetemp.close()
filetemp = open('topol.itp', 'w')
filetemp.writelines(itp_file)
filetemp.close()
filetemp = open('grompp.mdp', 'w')
filetemp.writelines(mk_gromacs_mdp(self.structure.cell, mode='bfgs',
verbose=g_verbose))
filetemp.close()
filetemp = open('pcoupl.mdp', 'w')
filetemp.writelines(mk_gromacs_mdp(self.structure.cell, mode='pcoupl',
verbose=g_verbose))
filetemp.close()
# prepare for simulation!
# Codes we need; comment out the trjconv if being quiet
grompp = self.options.get('grompp_exe')
mdrun = self.options.get('mdrun_exe')
if g_verbose:
trjconv = "echo 0 | %s" % self.options.get('trjconv_exe')
else:
trjconv = "#echo 0 | %s" % self.options.get('trjconv_exe')
# everything runs in a script -- to many steps otherwise
# only make the g96 file at the end so we can tell if it breaks
gromacs_faps = open('gromacs_faps', 'w')
gromacs_faps.writelines([
"#!/bin/bash\n\n",
"export OMP_NUM_THREADS=1\n\n",
"# preprocess first bfgs\n",
"%s -maxwarn 2 &>> g.log\n\n" % grompp,
"# bfgs step\n",
"%s -nt 1 &>> g.log\n\n" % mdrun,
"%s -o traject1.gro -f traj.trr &>> g.log\n" % trjconv,
"# overwrite with pcoupl step\n",
"%s -maxwarn 2 -t traj.trr -f pcoupl.mdp &>> g.log\n\n" % grompp,
"# pcoupl step\n",
"%s -nt 1 &>> g.log\n\n" % mdrun,
"%s -o traject2.gro -f traj.trr &>> g.log\n" % trjconv,
"# overwrite with final bfgs\n",
"%s -maxwarn 2 -t traj.trr &>> g.log\n\n" % grompp,
"# generate final structure\n",
"%s -nt 1 -c confout.g96 &>> g.log\n" % mdrun,
"%s -o traject3.gro -f traj.trr &>> g.log\n" % trjconv])
gromacs_faps.close()
os.chmod('gromacs_faps', 0o755)
# Leave the run to the shell
info("Generated gromcas inputs and run script")
if self.options.getbool('no_submit'):
info("GROMACS input files generated; skipping job submission")
jobid = False
else:
jobid = self.job_handler.submit(optim_code, self.options)
# Tidy up at the end
os.chdir(self.options.get('job_dir'))
return jobid
[docs] def run_optimise_gulp(self):
"""Run GULP to do a UFF optimisation."""
job_name = self.options.get('job_name')
optim_code = 'gulp'
terse = self.options.getbool('gulp_terse')
# put an opt in path to distinguish from the charge calculation
optim_dir = path.join(self.options.get('job_dir'),
'faps_%s_%s_opt' % (job_name, optim_code))
mkdirs(optim_dir)
os.chdir(optim_dir)
debug("Running in %s" % optim_dir)
filetemp = open('%s.gin' % job_name, 'w')
filetemp.writelines(self.structure.to_gulp(optimise=True, terse=terse))
filetemp.close()
if 'GULP_LIB' not in os.environ:
warning("gulp library directory not set; optimisation might fail")
if self.options.getbool('no_submit'):
info("GULP input files generated; skipping job submission")
jobid = False
else:
jobid = self.job_handler.submit(optim_code, self.options,
input_file='%s.gin' % job_name)
# Tidy up at the end
os.chdir(self.options.get('job_dir'))
return jobid
[docs] def run_vasp(self):
"""Make inputs and run vasp job."""
job_name = self.options.get('job_name')
nproc = self.options.getint('vasp_ncpu')
# Keep things tidy in a subdirectory
dft_code = self.options.get('dft_code')
vasp_dir = path.join(self.options.get('job_dir'),
'faps_%s_%s' % (job_name, dft_code))
mkdirs(vasp_dir)
os.chdir(vasp_dir)
debug("Running in %s" % vasp_dir)
info("Running on %i nodes" % nproc)
filetemp = open("POSCAR", "w")
filetemp.writelines(self.structure.to_vasp(self.options))
filetemp.close()
esp_grid = self.esp_grid
#TODO(jlo): self.structure.types gives you each type
# e.g ['C', 'C', 'O'... ]
# self.options.get('...') to get charge or something set a default
# in default.ini
# calcualte nelect
filetemp = open("INCAR", "w")
if self.esp_reduced:
# Let VASP do the grid if we don't need to
filetemp.writelines(mk_incar(self.options, esp_grid=esp_grid))
else:
filetemp.writelines(mk_incar(self.options))
filetemp.close()
filetemp = open("KPOINTS", "w")
filetemp.writelines(mk_kpoints(self.options.gettuple('kpoints', int)))
filetemp.close()
potcar_types = unique(self.structure.types)
filetemp = open("POTCAR", "w")
potcar_dir = self.options.get('potcar_dir')
previous_type = ""
for at_type in self.structure.types:
if at_type == previous_type:
continue
# Try and get the preferred POTCARS
debug("Using %s pseudopotential for %s" %
(VASP_PSEUDO_PREF.get(at_type, at_type), at_type))
potcar_src = path.join(potcar_dir,
VASP_PSEUDO_PREF.get(at_type, at_type),
"POTCAR")
shutil.copyfileobj(open(potcar_src), filetemp)
previous_type = at_type
filetemp.close()
if self.options.getbool('no_submit'):
info("Vasp input files generated; skipping job submission")
# act as if job completed
jobid = False
else:
self.job_handler.env(dft_code, options=self.options)
jobid = self.job_handler.submit(dft_code, self.options)
# Tidy up at the end and pass on job id
os.chdir(self.options.get('job_dir'))
return jobid
[docs] def run_siesta(self):
"""Make siesta input and run job."""
job_name = self.options.get('job_name')
nproc = self.options.getint('siesta_ncpu')
# Keep things tidy in a subdirectory
dft_code = self.options.get('dft_code')
siesta_dir = path.join(self.options.get('job_dir'),
'faps_%s_%s' % (job_name, dft_code))
mkdirs(siesta_dir)
os.chdir(siesta_dir)
debug("Running in %s" % siesta_dir)
info("Running on %i nodes" % nproc)
filetemp = open('%s.fdf' % job_name, 'w')
filetemp.writelines(self.structure.to_siesta(self.options))
filetemp.close()
psf_types = unique(self.structure.types)
psf_dir = self.options.get('psf_dir')
for at_type in psf_types:
psf_atm = '%s.psf' % at_type
psf_src = path.join(psf_dir, psf_atm)
psf_dest = path.join(siesta_dir, psf_atm)
try:
if not path.exists(psf_atm):
os.symlink(psf_src, psf_dest)
# symlinks not available pre 3.2 on windows
except AttributeError:
shutil.copy(psf_src, siesta_dir)
filetemp.close()
if self.options.getbool('no_submit'):
info("Siesta input files generated; skipping job submission")
jobid = False
else:
# sharcnet does weird things for siesta
self.job_handler.env(dft_code, options=self.options)
jobid = self.job_handler.submit(dft_code, self.options,
input_file='%s.fdf' % job_name)
# Tidy up at the end
os.chdir(self.options.get('job_dir'))
return jobid
[docs] def run_qeq_gulp(self, fitting=False):
"""Run GULP to calculate charge equilibration charges."""
job_name = self.options.get('job_name')
qeq_code = 'gulp'
if fitting:
qeq_dir = path.join(self.options.get('job_dir'),
'faps_%s_%s_fit' % (job_name, qeq_code))
else:
qeq_dir = path.join(self.options.get('job_dir'),
'faps_%s_%s' % (job_name, qeq_code))
mkdirs(qeq_dir)
os.chdir(qeq_dir)
debug("Running in %s" % qeq_dir)
qeq_dict = parse_qeq_params(self.options.gettuple('qeq_parameters'))
filetemp = open('%s.gin' % job_name, 'w')
filetemp.writelines(self.structure.to_gulp(qeq_fit=fitting, qeq_dict=qeq_dict))
filetemp.close()
if self.options.getbool('no_submit'):
info("GULP input files generated; skipping job submission")
jobid = False
elif fitting:
jobid = self.job_handler.submit(qeq_code, self.options,
input_file='%s.gin' % job_name)
info("Running GULP fitting job in queue. Jobid: %s" % jobid)
self.state['qeq_fit'] = (RUNNING, jobid)
else:
jobid = self.job_handler.submit(qeq_code, self.options,
input_file='%s.gin' % job_name)
# Tidy up at the end
os.chdir(self.options.get('job_dir'))
return jobid
[docs] def run_qeq_egulp(self):
"""Run EGULP to calculate charge equilibration charges."""
job_name = self.options.get('job_name')
qeq_code = 'egulp'
qeq_dir = path.join(self.options.get('job_dir'),
'faps_%s_%s' % (job_name, qeq_code))
typed_atoms = self.options.getbool('egulp_typed_atoms')
mkdirs(qeq_dir)
os.chdir(qeq_dir)
debug("Running in %s" % qeq_dir)
filetemp = open('%s.geo' % job_name, 'w')
filetemp.writelines(self.structure.to_egulp(typed_atoms))
filetemp.close()
# EGULP defaults to GULP parameters if not specified
egulp_parameters = self.options.gettuple('qeq_parameters')
if 'mepo' in egulp_parameters:
from parameters import mepo_qeq
info("Using MEPO-QEq base parameters")
egulp_parameters = [x for x in egulp_parameters if x != 'mepo']
for element, parameters in mepo_qeq.items():
# Put MEPO parameters at the beginning so they can be
# overridden
plist = [element, parameters[0], parameters[1]]
egulp_parameters = plist + egulp_parameters
if not egulp_parameters:
# parameters are mandatory in new egulp
egulp_parameters = ('H', QEQ_PARAMS['H'][0], QEQ_PARAMS['H'][1])
else:
info("Custom EGULP parameters selected")
filetemp = open('%s.param' % job_name, 'w')
filetemp.writelines(mk_egulp_params(egulp_parameters))
filetemp.close()
filetemp = open('%s.ini' % job_name, 'w')
filetemp.writelines(mk_egulp_ini(self.options))
filetemp.close()
egulp_args = ['%s.geo' % job_name,
'%s.param' % job_name,
'%s.ini' % job_name]
if self.options.getbool('no_submit'):
info("EGULP input files generated; skipping job submission")
jobid = False
else:
jobid = self.job_handler.submit(qeq_code, self.options,
input_args=egulp_args)
# Tidy up at the end
os.chdir(self.options.get('job_dir'))
return jobid
[docs] def esp_to_cube(self):
"""Make the cube for repeat input."""
job_name = self.options.get('job_name')
# No case where the esp source will be different from the dft code
esp_src = self.options.get('dft_code')
repeat_dir = path.join(self.options.get('job_dir'),
'faps_%s_repeat' % job_name)
mkdirs(repeat_dir)
src_dir = path.join(self.options.get('job_dir'),
'faps_%s_%s' % (job_name, esp_src))
os.chdir(src_dir)
if esp_src == 'vasp':
esp_to_cube_args = shlex.split(self.options.get('vasp_to_cube'))
info("Converting vasp esp to cube, this might take a minute...")
try:
fix_vasp_wrapped_types('LOCPOT')
except IOError:
error("Couldn't find the LOCPOT file; did VASP fail?")
submit = subprocess.Popen(esp_to_cube_args)
submit.wait()
# Cube should have job_name, but can get truncated;
# therefore we try to look for it first
cube_file = glob('*.cube')
if len(cube_file) == 1:
cube_file = cube_file[0]
elif len(cube_file) > 1:
cube_file = cube_file[0]
warning("More or than one .cube found; using %s" % cube_file)
else:
error("No cube files found; check vasp_to_cube output")
# Move it to the repeat directory and give a proper name
move_and_overwrite(cube_file,
path.join(repeat_dir, job_name + '.cube'))
unneeded_files = self.options.gettuple('vasp_delete_files')
remove_files(unneeded_files)
keep_files = self.options.gettuple('vasp_compress_files')
compress_files(keep_files)
elif esp_src == 'siesta':
esp_to_cube_args = shlex.split(self.options.get('siesta_to_cube'))
esp_grid = self.esp_grid
info("Generating ESP grid of %ix%ix%i" % esp_grid)
siesta_to_cube_input = [
"%s\n" % job_name,
"%f %f %f\n" % (0.0, 0.0, 0.0),
"%i %i %i\n" % esp_grid]
info("Converting siesta esp to cube, this might take a minute...")
submit = subprocess.Popen(esp_to_cube_args, stdin=subprocess.PIPE)
submit.communicate(input=''.join(siesta_to_cube_input))
move_and_overwrite(job_name + '.cube', repeat_dir)
unneeded_files = self.options.gettuple('siesta_delete_files')
remove_files(unneeded_files)
keep_files = self.options.gettuple('siesta_compress_files')
compress_files(keep_files)
os.chdir(self.options.get('job_dir'))
[docs] def run_repeat(self):
"""Submit the repeat calc to the queue."""
job_name = self.options.get('job_name')
charge_code = self.options.get('charge_method')
repeat_dir = path.join(self.options.get('job_dir'),
'faps_%s_%s' % (job_name, charge_code))
mkdirs(repeat_dir)
os.chdir(repeat_dir)
if self.options.getbool('symmetry'):
mk_repeat(cube_name=job_name + '.cube', symmetry=True)
mk_connectivity_ff(self.structure.symmetry_tree)
else:
mk_repeat(cube_name=job_name + '.cube', symmetry=False)
if self.options.getbool('no_submit'):
info("REPEAT input files generated; skipping job submission")
jobid = False
else:
jobid = self.job_handler.submit(charge_code, self.options)
os.chdir(self.options.get('job_dir'))
return jobid
[docs] def run_fastmc(self):
"""Submit a fastmc job to the queue."""
job_name = self.options.get('job_name')
mc_code = self.options.get('mc_code')
# Set the guests before generating the files
# Load here as options may change in each run
# and before changing directory, or it will not find guests.lib
guests = [Guest(x) for x in self.options.gettuple('guests')]
if not same_guests(self.structure.guests, guests):
info("Replacing old guests with %s" % " ".join(guest.ident for
guest in guests))
self.structure.guests = guests
else:
# use existing guests that may have data
debug("Retaining previous guests")
guests = self.structure.guests
gcmc_dir = path.join(self.options.get('job_dir'),
'faps_%s_%s' % (job_name, mc_code))
mkdirs(gcmc_dir)
os.chdir(gcmc_dir)
config, field = self.structure.to_config_field(self.options, fastmc=True)
filetemp = open("CONFIG", "w")
filetemp.writelines(config)
filetemp.close()
filetemp = open("FIELD", "w")
filetemp.writelines(field)
filetemp.close()
temps = self.options.gettuple('mc_temperature', float)
presses = self.options.gettuple('mc_pressure', float)
indivs = self.options.gettuple('mc_state_points', float)
jobids = {}
for tp_point in state_points(temps, presses, indivs, len(guests)):
temp = tp_point[0]
press = tp_point[1]
info("Running GCMC: T=%.1f " % temp +
" ".join(["P=%.2f" % x for x in press]))
tp_path = format_tp_path(tp_point)
mkdirs(tp_path)
os.chdir(tp_path)
try_symlink(path.join('..', 'CONFIG'), 'CONFIG')
try_symlink(path.join('..', 'FIELD'), 'FIELD')
filetemp = open("CONTROL", "w")
# Calculate fugacities for the input if required
if self.options.get('equation_of_state').lower() == 'peng-robinson':
info("Using Peng-Robinson EOS gas fugacities")
ideal = {}
for guest, pressure in zip(guests, press):
if not hasattr(guest, 'species'):
try:
guest.species = Guest(guest.ident).species
except AttributeError:
error("Unable to use equation of state with guest"
"%s. Failure imminent." % guest.name)
if not hasattr(guest, 'fugacities'):
guest.fugacities = {}
ideal[guest.species] = pressure
# Apply the correction
fugacities = peng_robinson(ideal, temp)
fuga = []
for guest, pressure in zip(guests, press):
info("Fugacity correction for %s: %f bar -> %f bar" %
(guest.ident, pressure, fugacities[guest.species]))
fuga.append(fugacities[guest.species])
guest.fugacities[tp_point] = fugacities[guest.species]
# Expects single guest not in a list
if len(guests) == 1:
fuga = fuga[0]
else:
info("Using ideal gas fugacities")
for guest, pressure in zip(guests, press):
guest.fugacities[tp_point] = pressure
fuga = press
# make control with fugacities
filetemp.writelines(mk_gcmc_control(temp, fuga, self.options,
guests, self.structure.gcmc_supercell))
filetemp.close()
if self.options.getbool('no_submit'):
info("FastMC input files generated; "
"skipping job submission")
jobids[(temp, press)] = False
else:
jobid = self.job_handler.submit(mc_code, self.options)
jobids[(temp, press)] = jobid
os.chdir('..')
os.chdir(self.options.get('job_dir'))
return jobids
[docs] def run_absl(self):
"""Submit absl jobs to the queue."""
job_name = self.options.get('job_name')
guests = self.structure.guests
# fun in the gcmc directories
absl_dir = path.join(self.options.get('job_dir'),
'faps_%s_%s' % (job_name, 'absl'))
mkdirs(absl_dir)
os.chdir(absl_dir)
temps = self.options.gettuple('mc_temperature', float)
presses = self.options.gettuple('mc_pressure', float)
indivs = self.options.gettuple('mc_state_points', float)
jobids = {}
for tp_point in state_points(temps, presses, indivs, len(guests)):
temp = tp_point[0]
press = tp_point[1]
info("Running ABSL: T=%.1f " % temp +
" ".join(["P=%.2f" % x for x in press]))
tp_path = format_tp_path(tp_point)
mkdirs(tp_path)
os.chdir(tp_path)
# make the dummy;
dummy_guest = self.structure.guests[0]
dummy_include = {dummy_guest.ident: [[[x, 0.0, 0.0] for x in
range(dummy_guest.natoms)]]}
with open("CONFIG", "w") as config:
with open("FIELD", "w") as field:
dlp_files = self.structure.to_config_field(
self.options, include_guests=dummy_include, dummy=True)
config.writelines(dlp_files[0])
field.writelines(dlp_files[1])
with open("CONTROL", "w") as control:
control.writelines(mk_dl_poly_control(self.options, dummy=True))
# Keep track of directories so that we can run jobs at once
individual_directories = ['.']
# calculate binding sites here and submit
for guest in self.structure.guests:
binding_sites = calculate_binding_sites(guest, tp_point,
self.structure.cell)
if hasattr(guest, 'binding_sites'):
guest.binding_sites[tp_point] = binding_sites
else:
guest.binding_sites = {tp_point: binding_sites}
for bs_idx, binding_site in enumerate(binding_sites):
bs_directory = "%s_bs_%04d" % (guest.ident, bs_idx)
mkdirs(bs_directory)
os.chdir(bs_directory)
include_guests = {guest.ident: [guest.aligned_to(*binding_site)]}
dlp_files = self.structure.to_config_field(
self.options, include_guests=include_guests)
with open("CONFIG", "w") as config:
config.writelines(dlp_files[0])
if bs_idx > 0:
# symlink on FIELD to save space
zero_directory = "%s_bs_%04d" % (guest.ident, 0)
try_symlink(path.join('..', zero_directory, 'FIELD'),
'FIELD')
try_symlink(path.join('..', zero_directory, 'CONTROL'),
'CONTROL')
else:
# Always put the FIELD and CONTORL in zero to symlink to
with open("FIELD", "w") as field:
field.writelines(dlp_files[1])
with open("CONTROL", "w") as control:
control.writelines(mk_dl_poly_control(self.options))
individual_directories.append(bs_directory)
os.chdir('..')
# Make the script to run all the jobs now, using the individual
# directories
dl_poly_exe = self.options.get('dl_poly_exe')
# Try and delete REVIVE files while running the DL_POLY jobs
# we need to keep a few for processing, like OUTPUT, REVCON, CONFIG
# STATIS and FIELD and CONTROL will hopefully be symlinks, so we
# can't delete them, but REVIVE is never needed
absl_delete_files = self.options.get('absl_delete_files')
if 'REVIVE' in absl_delete_files or '*_bs_*' in absl_delete_files:
rm_line = 'rm REVIVE\n'
else:
rm_line = ''
absl_script = ["#!/bin/bash\n\n", "export FORT_BUFFERED=true\n\n",
"export OMP_NUM_THREADS=1\n\n"]
for directory in individual_directories:
absl_script.extend(["pushd %s > /dev/null\n" % directory,
"%s\n" % dl_poly_exe,
rm_line,
"popd > /dev/null\n"])
absl_faps = open('absl_faps', 'w')
absl_faps.writelines(absl_script)
absl_faps.close()
os.chmod('absl_faps', 0o755)
# Submit this script
if self.options.getbool('no_submit'):
info("ABSL input files generated; skipping job submission")
jobids[(temp, press)] = [False]
else:
jobid = self.job_handler.submit('absl', self.options)
jobids[(temp, press)] = [jobid]
os.chdir('..')
os.chdir(self.options.get('job_dir'))
return jobids
[docs] def calculate_properties(self):
"""Calculate general structural properties."""
job_name = self.options.get('job_name')
job_dir = self.options.get('job_dir')
props_dir = path.join(job_dir, 'faps_%s_properties' % job_name)
mkdirs(props_dir)
os.chdir(props_dir)
# Neighbour list is only used by surface area, uncomment if needed
# for anything else
#self.structure.gen_neighbour_list()
# Since this runs before fastmc, and can run without it, check if the
# guests are initialised here
guests = [Guest(x) for x in self.options.gettuple('guests')]
if not same_guests(self.structure.guests, guests):
info("Replacing old guests with %s" % " ".join(guest.ident for
guest in guests))
self.structure.guests = guests
##
# Surface area calculations
##
surf_probes = self.options.gettuple('surface_area_probe', dtype=float)
for probe in surf_probes:
if self.structure.surface_area(probe) is None:
self.structure.surface_area(probe, value=self.calc_surface_area(probe))
# Neighbour list makes .niss too big; remove them
for atom in self.structure.atoms:
atom.neighbours = None
del atom.neighbours
# Zeoplusplus gives fast access to many properties
if self.options.getbool('zeo++'):
try:
self.calculate_zeo_properties()
except (OSError, IOError):
error("Error running zeo++; skipping")
# PLATON can calculate the PXRD pattern
if self.options.getbool('platon_pxrd'):
try:
self.calculate_pxrd()
except (OSError, IOError):
error("Error running platon; skipping")
os.chdir(job_dir)
[docs] def calculate_zeo_properties(self):
"""Run the zeo++ and update properties with no error trapping."""
job_name = self.options.get('job_name')
zeofiles = self.structure.to_zeoplusplus()
filetemp = open("%s.cssr" % job_name, 'w')
filetemp.writelines(zeofiles[0])
filetemp.close()
filetemp = open("%s.rad" % job_name, 'w')
filetemp.writelines(zeofiles[1])
filetemp.close()
filetemp = open("%s.mass" % job_name, 'w')
filetemp.writelines(zeofiles[2])
filetemp.close()
probes = set([1.0]) # Always have a helium probe
for guest in self.structure.guests:
if hasattr(guest, 'probe_radius'):
probes.add(guest.probe_radius)
zeo_exe = shlex.split(self.options.get('zeo++_exe'))
zeo_exe += ['-mass', '%s.mass' % job_name, '-r', '%s.rad' % job_name]
cssr_file = ['%s.cssr' % job_name]
# incuded sphere, free sphere, included sphere along free path
zeo_command = zeo_exe + ['-res'] + cssr_file
info("Running zeo++ pore diameters")
debug("Running command: '" + " ".join(zeo_command) + "'")
zeo_process = subprocess.Popen(zeo_command, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
zeo_process.wait()
zeo_stderr = " ".join(x.strip() for x in zeo_process.stderr.readlines())
debug(zeo_stderr)
if "Voronoi volume check failed" in zeo_stderr:
warning("Structure is likely bad; zeo++ is unable to complete")
warning(zeo_stderr)
self.structure.bad_structure = True
res_file = open('%s.res' % job_name).read().split()
self.structure.pore_diameter = tuple(float(x) for x in res_file[1:])
atom_samples = '%i' % 2000
volume_samples = '%i' % (20*self.structure.cell.volume)
for probe in probes:
zeo_command = zeo_exe + [
'-chan', '%f' % probe,
'-sa', '%f' % probe, '%f' % probe, atom_samples,
'-vol', '%f' % probe, '%f' % probe, volume_samples] + cssr_file
debug("Running command: '" + " ".join(zeo_command) + "'")
zeo_process = subprocess.Popen(zeo_command, stdout=subprocess.PIPE)
zeo_process.wait()
# channel dimensionality
channels = [int(x) for x in open('%s.chan' % job_name).read().split()[5:]]
self.structure.sub_property('dimensionality', probe, channels)
# surface area
for line in open('%s.sa' % job_name):
if 'A^2' in line:
self.structure.sub_property('zeo_surface_area', probe,
value=float(line.split()[-1]))
# accessible volume
for line in open('%s.vol' % job_name):
if 'A^3' in line:
self.structure.sub_property('void_volume', probe,
value=float(line.split()[-1]))
[docs] def calculate_pxrd(self):
"""Run platon PXRD and update properties with no error trapping."""
job_name = self.options.get('job_name')
out_cif = self.structure.to_cif()
filetemp = open("%s.faps.cif" % job_name, 'w')
filetemp.writelines(out_cif)
filetemp.close()
platon_exe = self.options.get('platon_exe')
platon_cmd = [platon_exe, '-Q', '-o', '%s.faps.cif' % job_name]
info("Running PLATON PXRD")
debug("Running command: '" + " ".join(platon_cmd) + "'")
platon_process = subprocess.Popen(platon_cmd, stdout=subprocess.PIPE)
platon_process.wait()
cpi_file = open('%s.faps.cpi' % job_name).readlines()
probe = cpi_file[4].strip() # source metal, e.g. Cu, Mo
try:
xrd = [int(x) for x in cpi_file[10:]]
self.structure.sub_property('pxrd', probe=probe, value=xrd)
except ValueError:
warning("PXRD gave weird result, check structure")
# These are big and useless?
remove_files(['%s.faps.lis' % job_name, '%s.faps.eld' % job_name,
'%s.faps.ps' % job_name])
@property
def esp_grid(self):
"""Estimate the esp grid based on resolution and memory."""
# If repeat is unsing double precision, use 4 for single
repeat_prec = 8
# User defined resolution, try to use this
resolution = self.options.getfloat('esp_resolution')
repeat_ncpu = self.options.getint('repeat_ncpu')
if repeat_ncpu == 1:
vmem = self.options.getfloat('serial_memory')
else:
vmem = self.options.getfloat('threaded_memory')
# Nice even grids might scale better in parallel repeat
esp_grid = tuple([int(4*np.ceil(x/(4*resolution)))
for x in self.structure.cell.params[:3]])
memory_guess = prod(esp_grid)*self.structure.natoms*repeat_prec/1e9
self._esp_reduced = False
if memory_guess > vmem:
warning("ESP at this resolution might need up to %.1f GB of "
"memory but calculation will only request %.1f" %
(memory_guess, vmem))
resolution = resolution/pow(vmem/memory_guess, 1.0/3)
esp_grid = tuple([int(4*np.ceil(x/(4*resolution)))
for x in self.structure.cell.params[:3]])
warning("Reduced grid to %.2f A resolution to fit" % resolution)
self._esp_reduced = True
elif resolution != 0.1:
# VASP defaults to grids of around 0.1, so check if user has
# requested a reduced grid
info("User requested esp resolution %f" % resolution)
self._esp_reduced = True
self._esp_grid = esp_grid
return esp_grid
@property
def esp_reduced(self):
"""Has the esp been reduced to fit the memory requirements?"""
if not hasattr(self, '_esp_reduced'):
# generate the esp and check memory requirements
self.esp_grid
return self._esp_reduced
[docs] def calc_surface_area(self, rprobe=0.0):
"""Accessible surface area by uniform or Monte Carlo sampling."""
self.structure.gen_neighbour_list()
xyz = []
resolution = self.options.getfloat('surface_area_resolution')
uniform = self.options.getbool('surface_area_uniform_sample')
info("Calculating surface area: %.3f probe, %s points, %.3f res" %
(rprobe, ("random","uniform")[uniform], resolution))
total_area = 0.0
hydrophilic_area = 0.0
# gromacs default of 0.2 seems very constrained
hydrophilic_threshold = 0.3
cell = self.structure.cell.cell
inv_cell = np.linalg.inv(cell.T)
# Pre-calculate and organise the in-cell atoms
atoms = [(atom.ipos(cell, inv_cell).tolist(),
atom.ifpos(inv_cell),
atom.vdw_radius+rprobe,
atom.neighbours,
atom) for atom in self.structure.atoms]
# sigma is the vdw_radius plus distance to center of probe, which
# gives accessible surface area;
all_samples = []
for a1_pos, a1_fpos, a1_sigma, neighbours, atom in atoms:
surface_area = 4*pi*(a1_sigma**2)
nsamples = int(surface_area/resolution)
if not nsamples in all_samples:
debug("Atom type with %i samples" % nsamples)
all_samples.append(nsamples)
ncount = 0
if uniform:
# uniform spiral sample of surface
z_vals = np.linspace(1, -1, nsamples, endpoint=True)
r_vals = sqrt(1-z_vals**2)
t_vals = np.linspace(0, pi*(3-(5**0.5))*nsamples,
nsamples, endpoint=False)
points = array([r_vals*cos(t_vals),
r_vals*sin(t_vals),
z_vals]).transpose()*a1_sigma + a1_pos
else:
# random MC sampling
phi = 2*np.random.random(nsamples)*pi
costheta = np.random.random(nsamples)*2 - 1
theta = arccos(costheta)
points = array([sin(theta)*cos(phi),
sin(theta)*sin(phi),
cos(theta)]).transpose()*a1_sigma + a1_pos
# All points are brought into the cell
points = [dot(inv_cell, x) for x in points]
fpoints = np.mod(points, 1.0)
points = [list(dot(x, cell)) for x in fpoints]
for point, fpoint in zip(points, fpoints):
# Check for overlap
for a2_dist, a2_idx in neighbours:
a2_pos = atoms[a2_idx][0]
a2_fpos = atoms[a2_idx][1]
a2_sigma = atoms[a2_idx][2]
if a2_dist > a1_sigma + a2_sigma:
# No more atoms within the radius, point valid
ncount += 1
xyz.append((atom.type, point, atom.charge))
break
elif vecdist3(point, a2_pos) < a2_sigma:
# Point collision
break
elif min_dist(point, fpoint, a2_pos, a2_fpos, cell) < a2_sigma:
# Periodic collision
break
else:
# Loop over all atoms finished; point valid
ncount += 1
xyz.append((atom.type, point, atom.charge))
# Fraction of the accessible surface area for sphere to real area
if abs(atom.charge) > hydrophilic_threshold:
hydrophilic_area += (surface_area*ncount)/nsamples
total_area += (surface_area*ncount)/nsamples
if self.options.getbool('surface_area_save'):
job_name = self.options.get('job_name')
xyz_out = open('%s-surf-%.2f.xyz' % (job_name, rprobe), 'w')
xyz_out.write('%i\nResolution: %f Area: %f\n' %
(len(xyz), resolution, total_area))
for ppt in xyz:
xyz_out.write(('%-6s' % ppt[0]) +
('%10.6f %10.6f %10.6f' % tuple(ppt[1])) +
('%10.6f\n' % ppt[2]))
try:
hydrophilic_fraction = hydrophilic_area/total_area
except ZeroDivisionError:
hydrophilic_fraction = 0.0
info("Hydrophilic area (A^2) and fraction (probe: %f): %f, %f" %
(rprobe, hydrophilic_area, hydrophilic_fraction))
return total_area
[docs]class Structure(object):
"""
The current state of the structure; update as the calculations proceed.
Structure provides methods to produce input files for and take output from
various computational chemistry packages but needs to be told what to do.
Internal energy units are kcal/mol.
Methods are grouped:
* Initial structure parsers
* Output file parsers to update structure
* Input file generation
* Internal manipulation methods
"""
# TODO: dft energy?
def __init__(self, name):
"""Just instance an empty structure initially."""
self.name = name
self.cell = Cell()
self.atoms = []
self.esp = None
self.dft_energy = 0.0
self.uff_energy = 0.0
self.guests = []
self.properties = {}
self.space_group = None
self.net_charge = None
[docs] def from_file(self, basename, filetype, defaults):
"""Select the correct file parser."""
if filetype in ['sql', 'sqlite']:
# Makeshift selection method to select a mof
# jobname is dbname.type.structure_id
from backend.sql import AlchemyBackend
# [db_name, sym or free, identity]
db_params = self.name.split('.')
reader = AlchemyBackend(db_params[0])
cif_string = reader.start_cif(db_params[1], int(db_params[2]))
self.from_cif(string=cif_string)
elif filetype.lower() in ['pdb']:
self.from_pdb(basename + '.' + filetype)
elif filetype.lower() in ['pqr']:
# Look for a pqr or just a pdb wih charges
listdir = os.listdir('.')
if (basename + '.pqr') in listdir:
self.from_pdb(basename + '.pqr', charges=True)
else:
self.from_pdb(basename + '.pdb', charges=True)
elif filetype.lower() in ['vasp', 'poscar', 'contcar']:
listdir = os.listdir('.')
test_files = [
basename + '.contcar', basename + '.CONTCAR', 'CONTCAR',
basename + '.poscar', basename + '.POSCAR', 'POSCAR']
for filename in test_files:
if filename in listdir:
self.from_vasp(filename)
break
elif filetype.lower() in ['cif']:
self.from_cif(basename + '.' + filetype)
elif filetype.lower() in ['xyz']:
cell = defaults.gettuple('default_cell', float)
self.from_xyz(basename + '.' + filetype, cell=cell)
else:
error("Unknown filetype %s" % filetype)
[docs] def update_pos(self, opt_code, options=None):
"""Select the method for updating atomic positions."""
opt_path = path.join('faps_%s_%s' % (self.name, opt_code))
info("Updating positions from %s" % opt_code)
if opt_code == 'vasp':
self.from_vasp(path.join(opt_path, 'CONTCAR'), update=True)
elif opt_code == 'siesta':
self.from_siesta(path.join(opt_path, '%s.STRUCT_OUT' % self.name))
elif opt_code == 'gromacs':
self.from_gromacs(path.join(opt_path, 'confout.g96'))
unneeded_files = options.gettuple('gromacs_delete_files')
remove_files(unneeded_files, opt_path)
keep_files = options.gettuple('gromacs_compress_files')
compress_files(keep_files, opt_path)
elif opt_code == 'gulp':
opt_path = "%s_opt" % opt_path
self.optimisation_output = validate_gulp_output(
path.join(opt_path, 'faps-%s.out' % self.name))
self.from_gulp_output(path.join(opt_path, '%s.grs' % self.name))
else:
error("Unknown positions to import %s" % opt_code)
[docs] def update_charges(self, charge_method, options=None):
"""Select the method for updating charges."""
charge_path = path.join('faps_%s_%s' % (self.name, charge_method))
if charge_method == 'repeat':
info("Updating charges from repeat")
self.charges_from_repeat(
path.join(charge_path, 'faps-%s.out' % self.name),
options.getbool('symmetry'))
# Cleanup of REPEAT files
unneeded_files = options.gettuple('repeat_delete_files')
remove_files(unneeded_files, charge_path)
keep_files = options.gettuple('repeat_compress_files')
compress_files(keep_files, charge_path)
elif charge_method == 'gulp':
info("Updating charges from GULP QEq")
self.charges_from_gulp(
path.join(charge_path, 'faps-%s.out' % self.name))
elif charge_method == 'egulp':
info("Updating charges from EGULP QEq")
self.charges_from_egulp(path.join(charge_path, 'charges.dat'))
else:
error("Unknown charge method to import %s" % charge_method)
[docs] def update_gcmc(self, tp_point, options):
"""Select the source for GCMC results and import."""
gcmc_code = options.get('mc_code')
gcmc_path = path.join('faps_%s_%s' % (self.name, gcmc_code))
# Runs in subdirectories
tp_path = path.join(gcmc_path, format_tp_path(tp_point))
if gcmc_code == 'fastmc':
info("Importing results from FastGCMC")
self.fastmc_postproc(tp_path, tp_point, options)
else:
error("Unknown gcmc method to import %s" % gcmc_code)
[docs] def update_absl(self, tp_point, options):
"""Select the source for ABSL results and import."""
absl_path = path.join('faps_%s_%s' % (self.name, 'absl'))
# Runs in subdirectories
tp_path = path.join(absl_path, format_tp_path(tp_point))
info("Importing results from ABSL")
self.absl_postproc(tp_path, tp_point, options)
[docs] def from_pdb(self, filename, charges=False):
"""Read an initial structure from a pdb file."""
info("Reading positions from pdb file: %s" % filename)
filetemp = open(filename)
pdb_file = filetemp.readlines()
filetemp.close()
# Build a local list before setting attribute
newatoms = []
for line in pdb_file:
lline = line.lower()
if lline.startswith('cryst1'):
self.cell.from_pdb(line)
self.space_group = line[55:56]
elif lline.startswith('atom') or lline.startswith('hetatm'):
newatom = Atom()
newatom.from_pdb(line, charges=charges)
newatoms.append(newatom)
self.atoms = newatoms
[docs] def from_cif(self, filename=None, string=None):
"""Genereate structure from a .cif file."""
if filename is not None:
info("Reading positions from cif file: %s" % filename)
filetemp = open(filename)
cif_file = filetemp.readlines()
filetemp.close()
elif string is not None:
info("Positions from cif string")
cif_file = string.splitlines()
else:
error("No source for cif file")
cif_file = strip_blanks(cif_file)
params = [None, None, None, None, None, None]
atoms = []
cif_bonds = {}
symmetry = []
loops = []
idx = 0
while idx < len(cif_file):
# line text needs to be manageable; cif guidelines can be
# permissive
# Can no longer just check for _underscores in lines
# as UFF types can have them and mess up parsing
line = cif_file[idx].lower().strip()
if '_cell_length_a' in line:
params[0] = ufloat(line.split()[1])
elif '_cell_length_b' in line:
params[1] = ufloat(line.split()[1])
elif '_cell_length_c' in line:
params[2] = ufloat(line.split()[1])
elif '_cell_angle_alpha' in line:
params[3] = ufloat(line.split()[1])
elif '_cell_angle_beta' in line:
params[4] = ufloat(line.split()[1])
elif '_cell_angle_gamma' in line:
params[5] = ufloat(line.split()[1])
elif '_symmetry_space_group_name_h-m' in line:
self.space_group = line.split()[1]
elif '_chemical_properties_physical' in line:
physical = list(shlex.shlex(line, posix=False))[1].strip("'").lower()
if physical.startswith('net charge is'):
self.net_charge = float(physical.split()[3])
elif 'loop_' in line:
# loops for _atom_site, _symmetry and _geom
heads = []
body = []
while line.startswith('_') or 'loop_' in line:
# must keep the loop_ line as this can still contain headers
heads.extend(line.split())
idx += 1
# don't lower these to keep atomic symbols
line = cif_file[idx].strip()
while idx < len(cif_file) and not line.startswith('_') and not 'loop_' in line:
# shlex keeps 'quoted items' as one
# Some cifs seem to have primed atom symbols
# posix=False should help
# using .shlex instead of .split works with '#' comments too
split_line = shlex.shlex(line, posix=False)
split_line.whitespace_split = True
split_line = list(split_line)
body.extend([x.strip("'").strip('"') for x in split_line])
idx += 1
try:
line = cif_file[idx]
except IndexError:
line = ''
if 'loop_' in heads:
heads.remove('loop_')
loops.append((heads, body))
continue
idx += 1
# cell first
if np.all(params):
self.cell.params = params
else:
error("No cell or incomplete cell found in cif file")
# parse loop contents
for heads, body in loops:
if '_atom_site_fract_x' in heads:
while body:
atoms.append(dict(zip(heads, body)))
body = body[len(heads):]
if '_symmetry_equiv_pos_as_xyz' in heads:
while body:
sym_dict = dict(zip(heads, body))
symmetry.append(
Symmetry(sym_dict['_symmetry_equiv_pos_as_xyz']))
body = body[len(heads):]
if '_ccdc_geom_bond_type' in heads:
while body:
bond_dict = dict(zip(heads, body))
# bond is sorted so there are no duplicates
# and tuple so it can be hashed
bond = (bond_dict['_geom_bond_atom_site_label_1'],
bond_dict['_geom_bond_atom_site_label_2'])
bond = tuple(sorted(bond))
# bond distance and type defualts to None if not specified
distance = bond_dict.get('_geom_bond_distance')
if distance is not None:
distance = float(distance)
bond_type = bond_dict.get('_ccdc_geom_bond_type')
cif_bonds[bond] = (distance, bond_type)
body = body[len(heads):]
if not symmetry:
debug('No symmetry found; assuming identity only')
symmetry = [Symmetry('x,y,z')]
newatoms = []
for site_idx, atom in enumerate(atoms):
for sym_op in symmetry:
newatom = Atom(parent=self)
newatom.from_cif(atom, self.cell.cell, sym_op, site_idx)
newatoms.append(newatom)
self.atoms = newatoms
if len(symmetry) > 1:
# can skip if just identity operation as it's slow for big systems
# Some of pete's symmetrised mofs need a higher tolerence
duplicate_tolerance = 0.2 # Angstroms
self.remove_duplicates(duplicate_tolerance)
bonds = {}
# TODO(tdaff): this works for the one tested MOF; 0.1 was not enough
# only check for bonds that are too long, not too short.
bond_tolerence = 0.25
# Assign bonds by index
for bond, bond_data in cif_bonds.items():
for first_index, first_atom in enumerate(self.atoms):
if first_atom.site == bond[0]:
for second_index, second_atom in enumerate(self.atoms):
if second_atom is first_atom:
continue
elif second_atom.site == bond[1]:
# TODO(tdaff): symmetry implementation for cif bonding
distance = min_distance(first_atom, second_atom)
bond_dist = bond_data[0]
if bond_dist is None:
bond_dist = first_atom.covalent_radius + second_atom.covalent_radius
if distance < (bond_dist + bond_tolerence):
# use the sorted index as bonds between the
# same type are doubly specified
bond_id = tuple(sorted((first_index, second_index)))
bonds[bond_id] = CCDC_BOND_ORDERS[bond_data[1]]
if first_atom.is_metal or second_atom.is_metal:
first_atom.is_fixed = True
second_atom.is_fixed = True
self.bonds = bonds
self.symmetry = symmetry
[docs] def from_vasp(self, filename='CONTCAR', update=False):
"""Read a structure from a vasp [POS,CONT]CAR file."""
info("Reading positions from vasp file: %s" % filename)
fix_vasp_wrapped_types(filename)
filetemp = open(filename)
contcar = filetemp.readlines()
filetemp.close()
atom_list = []
atom_types = []
scale = float(contcar[1])
self.cell.from_lines(contcar[2:5], scale)
if contcar[5].split()[0].isalpha():
# vasp 5 with atom names
atom_types = contcar[5].split()
del contcar[5]
poscar_counts = [int(x) for x in contcar[5].split()]
natoms = sum(poscar_counts)
if contcar[6].strip()[0].lower() in "s":
# 's'elective dynamics line; we don't care
del contcar[6]
# mcell converts frac -> cart if necessary and scales
if contcar[6].strip()[0].lower() in "ck":
mcell = identity(3) * scale
else:
mcell = self.cell.cell
# parsing positions
if update:
for atom, at_line in zip(self.atoms, contcar[7:7+natoms]):
atom.from_vasp(at_line, cell=mcell)
elif not atom_types:
critical("Will not extract structure from older vasp files")
else:
line_idx = 6
for at_type, at_count in zip(atom_types, poscar_counts):
for _atom_idx in range(at_count):
line_idx += 1
this_atom = Atom()
this_atom.from_vasp(contcar[line_idx], at_type, mcell)
atom_list.append(this_atom)
self.atoms = atom_list
[docs] def from_siesta(self, filename):
"""Update the structure from the siesta output."""
info("Updating positions from file: %s" % filename)
filetemp = open(filename)
struct_out = filetemp.readlines()
filetemp.close()
self.cell.from_lines(struct_out[:3])
for atom, line in zip(self.atoms, struct_out[4:]):
atom.from_siesta(line, self.cell.cell)
[docs] def from_gromacs(self, filename):
"""Update the structure from a gromacs optimisation G96 format file."""
info("Updating positions from file: %s" % filename)
g96 = open(filename).readlines()
# Atom positions, just regular
for line, atom in zip(g96[4:], self.atoms):
atom.pos = [10.0*float(x) for x in line[25:].split()] # nm to A
del atom.fractional
# New cell too, possibly, check ordering.
box = [10*float(x) for x in g96[-2].split()]
# Only reports three values for cubic cell
if len(box) == 3:
new_cell = [box[0], 0.0, 0.0,
0.0, box[1], 0.0,
0.0, 0.0, box[2]]
else:
new_cell = [box[0], box[3], box[4],
box[5], box[1], box[6],
box[7], box[8], box[2]]
self.cell.cell = new_cell
# looking for energy too
md_log = open(path.join(path.dirname(filename), 'md.log'))
for line in md_log:
if line.startswith('Potential Energy'):
self.uff_energy = float(line.split()[-1])
info("UFF energy: %f kJ/mol" % self.uff_energy)
elif line.startswith('Maximum force'):
maximum_force = float(line.split()[3])
info("Maximum Force: %f kJ/mol/nm" % maximum_force)
if maximum_force > 10:
error("Calculation is not converged!! Check output!")
# Make sure everything is good from here
if self.check_close_contacts(covalent=1.0):
warning("Structure may have atom overlap, check gromacs output!")
self.bad_structure = True
if self.bond_length_check():
warning("Structure may have strained bonds, check gromacs output!")
self.bad_structure = True
[docs] def from_gulp_output(self, filename):
"""Update the structure from the gulp optimisation output."""
info("Updating positions from file: %s" % filename)
grs_out = open(filename)
for line in grs_out:
if line.strip() == 'cell':
params = tuple(float(x) for x in grs_out.next().split()[:6])
self.cell.params = params
elif line.strip() == 'fractional':
cell = self.cell.cell
for atom, atom_line in zip(self.atoms, grs_out):
atom.pos = dot([gfloat(x) for x in atom_line.split()[2:5]], cell)
# FIXME(tdaff) fractionals need to change automatically
# when the position gets updated (or the cell!)
del atom.fractional
# Make sure everything is good from here
if self.check_close_contacts(covalent=1.0):
warning("Structure might have atom overlap, check gulp output!")
self.bad_structure = True
if self.bond_length_check():
warning("Structure might have strained bonds, check gulp output!")
self.bad_structure = True
[docs] def from_xyz(self, filename, update=False, cell=None):
"""Read a structure from an file."""
info("Reading positions from xyz file: %s" % filename)
filetemp = open(filename)
xyz_file = filetemp.readlines()
filetemp.close()
# Setting the cell
if len(cell) == 6:
self.cell.params = cell
elif len(cell) == 9:
self.cell.cell = array(cell).reshape((3, 3))
elif cell is not None:
error("Invalid cell specification %s" % str(cell))
# Build a local list before setting attribute
newatoms = []
natoms = int(xyz_file[0])
self.properties['header'] = xyz_file[1].strip()
for line in xyz_file[2:2+natoms]:
newatom = Atom()
newatom.from_xyz(line)
newatoms.append(newatom)
if update:
if natoms != self.natoms:
critical("Incorrect number of atoms to update")
terminate(96)
for atom, newatom in zip(self.atoms, newatoms):
if atom.type != newatom.type:
error("Atom order may have changed")
atom.pos = newatom.pos
else:
self.atoms = newatoms
[docs] def charges_from_repeat(self, filename, symmetry=False):
"""Parse charges and update structure."""
info("Getting charges from file: %s" % filename)
charges = []
filetemp = open(filename)
for line in filetemp:
# Stripping line means it can read Fortran or C++ REPEAT output
if line.strip().startswith("Charge"):
line = line.split()
# index, type, charge
charges.append((int(line[1]), int(line[4]), float(line[6])))
if "Error" in line:
if float(line.split()[-1]) > 0.6:
warning("Error in repeat charges is very high - check cube!")
filetemp.close()
if symmetry:
tree = self.symmetry_tree
if len(charges) != len(tree):
error("Incorrect number of charge sets; check REPEAT output")
terminate(97)
for symm, charge in zip(sorted(tree.items()), charges):
for at_idx in symm[1]:
self.atoms[at_idx].charge = charge[2]
else:
if len(charges) != len(self.atoms):
error("Incorrect number of charges; check REPEAT output")
terminate(90)
for atom, charge in zip(self.atoms, charges):
atom.charge = charge[2]
[docs] def charges_from_gulp(self, filename):
"""Parse QEq charges from GULP output."""
info("Getting charges from file: %s" % filename)
filetemp = open(filename)
gout = filetemp.readlines()
filetemp.close()
start_line = None
failures = 0
for line_num, line in enumerate(gout):
if ' Final charges from QEq :' in line:
# Take the last set of charges
start_line = line_num + 7
elif 'Failed to converge' in line:
# This will occur twice in the output for complete failure
failures += 1
if start_line is None:
error("Final charges not found in gulp output")
terminate(184)
elif failures > 1:
warning("Gulp charges may not be converged")
for atom, chg_line in zip(self.atoms, gout[start_line:]):
atom.charge = float(chg_line.split()[2])
[docs] def charges_from_egulp(self, filename):
"""Parse QEq charges from EGULP output."""
info("Getting charges from file: %s" % filename)
filetemp = open(filename)
gout = filetemp.readlines()
filetemp.close()
# charges.dat file only has list of charges in it
for atom, chg_line in zip(self.atoms, gout):
atom.charge = float(chg_line.split()[2])
if atom.charge != atom.charge:
# These are 'nan'; should not continue or fastmc will mess up
error("Egulp charges did not converge, check structure")
terminate(107)
elif abs(atom.charge) == INFINITY:
error("Egulp gave infinite charges, check structure")
terminate(108)
elif abs(atom.charge) > 10:
warning("Very high charge from egulp: %s %f" %
(atom.site, atom.charge))
[docs] def to_vasp(self, options):
"""Return a vasp5 poscar as a list of lines."""
optim_h = options.getbool('optim_h')
optim_all = options.getbool('optim_all')
poscar = ["%s\n" % self.name[:80],
" 1.0\n"]
# Vasp does 16 dp but we get rounding errors (eg cubic) use 14
poscar.extend(self.cell.to_vector_strings(fmt="%23.14f"))
# Bunch up types as much as possible.
types, counts = count_ordered_types(self.atoms)
# Non consecutive similar types are not the same species anymore
sspecies = "".join(" %s" % x for x in types) + "\n"
scounts = "".join(" %i" % x for x in counts) + "\n"
if len(sspecies) > 254 or len(scounts) > 254:
warning("Faps has detected that you have too many atoms for vasp"
"to use in an orderd list and has attempted to re-order"
"them. This might break your calculation and it might be"
"better to run with oreder_atom_types turned on")
self.order_by_types()
types, counts = count_ordered_types(self.atoms)
# Regenerate these strings for new ordering
sspecies = "".join(" %s" % x for x in types) + "\n"
scounts = "".join(" %i" % x for x in counts) + "\n"
poscar.append(sspecies)
poscar.append(scounts)
# We always have the T or F so turn on selective dynamics for
# fixed pos variable cell
poscar.extend(["Selective dynamics\n", "Cartesian\n"])
if optim_all:
info("Optimizing all atom positions")
fix_h = 'T'
fix_all = 'T'
elif optim_h:
info("Optimizing hydrogen positions")
fix_h = 'T'
fix_all = 'F'
else:
info("All atom positions fixed")
fix_h = 'F'
fix_all = 'F'
# assume atoms are ordered
# Atom lines differ from vasp to ensure spaces between numbers
# with <-10 values
for atom in self.atoms:
if atom.type == "H":
poscar.append("%20.15f %19.15f %19.15f" % tuple(atom.pos) +
"%4s%4s%4s\n" % (fix_h, fix_h, fix_h))
else:
poscar.append("%20.15f %19.15f %19.15f" % tuple(atom.pos) +
"%4s%4s%4s\n" % (fix_all, fix_all, fix_all))
return poscar
[docs] def to_siesta(self, options):
"""Return a siesta input file as a list of lines."""
job_name = options.get('job_name')
siesta_accuracy = options.get("siesta_accuracy").lower()
if siesta_accuracy in ['high']:
info("Using 'high' accuracy siesta settings")
basis = ('DZP', 100, 200)
elif siesta_accuracy in ['low']:
info("Using 'low' accuracy siesta settings")
basis = ('SZ', 200, 100)
else:
info("Using default siesta accuracy settings")
basis = ('DZ', 150, 150)
u_atoms = unique(self.atoms, key=lambda x: x.type)
u_types = unique(self.types)
fdf = ([
"SystemName %s\n" % job_name,
"SystemLabel %s\n" % job_name,
"\n",
"NumberOfAtoms %i\n" % len(self.atoms),
"NumberOfSpecies %i\n" % len(u_atoms),
"\n",
"SaveElectrostaticPotential .true.\n",
"WriteMullikenPop 1\n"
"WriteXML F\n",
"\n",
"# Accuracy bits\n",
"\n",
"PAO.BasisSize %s\n" % basis[0],
"PAO.EnergyShift %i meV\n" % basis[1],
"MeshCutoff %i Ry\n" % basis[2],
"\n",
"MaxSCFIterations 100\n",
"XC.Functional GGA\n",
"XC.Authors PBE\n",
"SolutionMethod diagon\n",
"ElectronicTemperature 25 K\n",
"DM.NumberPulay 5\n",
"DM.MixingWeight 0.05\n",
"\n",
"%block ChemicalSpeciesLabel\n"] + [
"%6i %6i %6s\n" % ((idx + 1), atom.atomic_number, atom.type)
for idx, atom in enumerate(u_atoms)] + [
"%endblock ChemicalSpeciesLabel\n",
"\n",
"LatticeConstant 1 Ang\n"
"%block LatticeVectors\n"] +
self.cell.to_vector_strings() + [
"%endblock LatticeVectors\n",
"\n",
"AtomicCoordinatesFormat Ang\n",
"\n",
"%block AtomicCoordinatesAndAtomicSpecies\n"] + [
"%12.8f %12.8f %12.8f" % tuple(atom.pos) + "%6i\n" %
(u_types.index(atom.type) + 1) for atom in self.atoms] + [
"%endblock AtomicCoordinatesAndAtomicSpecies\n"])
if "Br" in u_types:
fdf.extend(["\n%block PS.lmax\n",
" Br 2\n",
"%endblock PS.lmax\n"])
if "In" in u_types:
# semicore states need to be accounted for
# TODO(tdaff): selective polarisation on the basis functions
fdf.extend(["\n%block PAO.Basis\n",
"In 3\n",
" n=5 0 2\n",
" 0.0 0.0\n",
" n=5 1 2 P\n",
" 0.0 0.0\n",
" n=4 2 2\n",
" 0.0 0.0\n",
"%endblock PAO.Basis\n"])
optim_h = options.getbool('optim_h')
optim_all = options.getbool('optim_all')
optim_cell = options.getbool('optim_cell')
constraint = []
if optim_all:
info("Optimizing all atom positions")
fdf.append("\nMD.TypeOfRun CG\n")
fdf.append("MD.NumCGSteps %i\n" % 800)
elif optim_h and "H" in self.types:
info("Optimizing hydrogen positions")
fdf.append("\nMD.TypeOfRun CG\n")
fdf.append("MD.NumCGSteps %i\n" % 300)
constraint = ["%i" % (idx+1)
for idx, species in enumerate(self.types)
if species not in ["H"]]
elif optim_cell:
fdf.append("\nMD.TypeOfRun CG\n")
fdf.append("MD.NumCGSteps %i\n" % 300)
constraint = ["%i" % (idx+1) for idx in range(self.natoms)]
if optim_cell:
info("Cell vectors will be optimized")
fdf.append("MD.VariableCell .true.\n")
if constraint:
constraint = textwrap.fill(" ".join(constraint),
initial_indent='position ',
subsequent_indent='position ')
fdf.extend(["\n%block GeometryConstraints\n",
constraint, "\n",
"%endblock GeometryConstraints\n"])
return fdf
[docs] def to_gulp(self, qeq_fit=False, optimise=False, terse=False, qeq_dict={}):
"""Return a GULP file to use for the QEq charges."""
if qeq_fit:
keywords = "fitting bulk_noopt qeq\n"
elif optimise:
return self.to_gulp_optimise(terse=terse)
else:
keywords = "single conp qeq\n"
gin_file = [
"# \n# Keywords:\n# \n",
keywords,
"# \n# Options:\n# \n",
"# Initial file written by faps\n"
"name %s\n" % self.name,
"vectors\n"] + self.cell.to_vector_strings() + [
"cartesian\n"]
if qeq_fit:
for atom in self.atoms:
gin_file.extend(["%-5s core " % atom.type,
"%14.7f %14.7f %14.7f " % tuple(atom.pos),
"%14.7f\n" % atom.charge])
gin_file.append("\n# \n# Fitting variables:\n# \nobservables\n")
for idx, atom in enumerate(self.atoms):
gin_file.append("monopoleq\n%5s%11.6f\n" % (idx+1, atom.charge))
gin_file.append("end\nqelectronegativity\n")
for at_type in unique(self.types):
gin_file.extend(
["%-5s" % at_type,
"%9.5f %9.5f %9.5f 1 1 0\n" % QEQ_PARAMS[at_type]])
else:
for atom in self.atoms:
gin_file.extend(["%-5s core " % atom.type,
"%14.7f %14.7f %14.7f\n" % tuple(atom.pos)])
if qeq_dict:
unique_types = unique(self.types)
gin_file.append('\nqelectronegativity\n')
for atom_type, params in qeq_dict.items():
if atom_type in unique_types:
gin_file.append('%-4s %f %f\n' % (atom_type, params[0], params[1]))
gin_file.append("\ndump every %s.grs\nprint 1\n" % self.name)
return gin_file
[docs] def to_gulp_optimise(self, terse=False):
"""Return a GULP file to optimise with UFF."""
# all atoms of the same forcefield type must have the same label
# gulp fails if atoms of the same type have different labels!
# this only crops up as an error in the 4.0 versions
# 'decimal_only' stops fractions that cannot be parsed when reading
# updated positions
if terse:
# Don't output the bonds
keywords = "opti noautobond decimal_only conj\n"
else:
keywords = "opti noautobond bond decimal_only conj\n"
gin_file = [
"# \n# Keywords:\n# \n",
keywords,
"# \n# Options:\n# \n",
"# UFF optimisation by gulp\n"
"name %s\n" % self.name,
# using an rfo minimiser with a preconditioned hessian from the
# BFGS minimiser seems to be the most efficient
"switch rfo gnorm 0.3\n",
"vectors\n"] + self.cell.to_vector_strings() + [
" 1 1 1\n 1 1 1\n 1 1 1\n", # constant pressure relaxation
"cartesian\n"]
all_ff_types = {}
for at_idx, atom in enumerate(self.atoms):
ff_type = atom.uff_type
if not ff_type in all_ff_types:
all_ff_types[ff_type] = atom.site
# Sanity check in case types are not given in the input
if ff_type not in UFF_FULL:
error("Atom %s has unknown type %s" % (atom, ff_type))
if atom.is_fixed:
fixed_flags = "0 0 0"
else:
fixed_flags = "1 1 1"
gin_file.extend(["%-5s core " % all_ff_types[ff_type],
"%14.7f %14.7f %14.7f " % tuple(atom.pos),
"%f " % atom.charge,
"%f " % 1.0, # occupancy
fixed_flags, "\n"])
#identify all the individual uff species for the library
gin_file.append("\nspecies\n")
for ff_type, species in all_ff_types.items():
gin_file.append("%-6s core %-6s\n" % (species, ff_type))
gin_file.append("\n")
for bond in sorted(self.bonds):
bond_type = GULP_BOND_ORDERS[self.bonds[bond]]
gin_file.append("connect %6i %6i %s\n" % (bond[0] + 1, bond[1] + 1, bond_type))
gin_file.append("\nlibrary uff\n")
gin_file.append("\nstepmx opt 0.05\n")
# Restart file is for final structure
gin_file.append("\ndump every %s.grs\n" % self.name)
# optimization movie useful for debugging mostly
if terse:
# These tell gulp to be quiet, but we also stop the massive arc
# file being generated
gin_file.append("\nterse inout structure\n"
"terse inout potentials\n"
"terse inout derivatives\n"
"#output movie arc %s\n" % self.name)
else:
gin_file.append("\noutput movie arc %s\n" % self.name)
return gin_file
[docs] def to_egulp(self, typed_atoms=False):
"""Generate input files for Eugene's QEq code."""
# bind cell locally for speed and convenience
cell = self.cell.cell
inv_cell = self.cell.inverse
# format exactly as Eugene original script generates it
geometry_file = ['%s\n' % self.name]
geometry_file.extend(self.cell.to_vector_strings(fmt=' %15.12f'))
geometry_file.append('%i\n' % self.natoms)
atomic_numbers = self.atomic_numbers
if typed_atoms:
# Include custom typing, new types must be found by hand.
# 800 N=O -- removed
# 801 S=O
# 802 S-O-H
for atom_idx, atom in enumerate(self.atoms):
if atom.uff_type == 'S_3+6':
for bond in self.bonds:
if atom_idx in bond:
other_idx = other_bond_index(bond, atom_idx)
if self.atoms[other_idx].uff_type == 'O_2':
atomic_numbers[other_idx] = 801
elif self.atoms[other_idx].uff_type == 'O_3':
atomic_numbers[other_idx] = 802
for another_bond in self.bonds:
if other_idx in another_bond:
another_idx = other_bond_index(another_bond, other_idx)
if self.atoms[another_idx].uff_type == 'H_':
atomic_numbers[another_idx] = 1001
# TODO(tdaff): delete code in future version; removed NO2 typing
# elif atom.uff_type == 'N_R':
# this_bonds = []
# for bond in self.bonds:
# if atom_idx in bond:
# other_idx = other_bond_index(bond, atom_idx)
# if self.atoms[other_idx].uff_type == 'O_R':
# atomic_numbers[other_idx] = 800
# atomic numbers should have been modified with exotic types by now
geometry_file.extend([
('%6d ' % atomic_number) +
('%12.7f %12.7f %12.7f' % tuple(atom.ipos(cell, inv_cell))) +
('%12.7f\n' % atom.charge)
for atom, atomic_number in zip(self.atoms, atomic_numbers)
])
return geometry_file
[docs] def to_config_field(self, options, fastmc=False, include_guests=None,
dummy=False):
"""
Return CONFIG and FIELD files in DL_POLY style
Parameters
----------
fastmc : boolean
When set to True, the FIELD file will contain the positions for MC
guests and will give incorrect results with DL_POLY.
include_guests : dict or None
A dictionary with guests to be included in the config file with
the framework. The guest type is the key and their positions are
nested lists.
dummy : boolean
If dummy is set, the interaction parameters and charges for the
guest are set to zero
Returns
-------
config : list
The config file as a list of newline terminated strings
field : list
The field file as a list of newline terminated strings
"""
# CONFIG
self.gen_supercell(options)
supercell = self.gcmc_supercell
levcfg = 0 # always
imcon = self.cell.imcon
natoms = len(self.atoms) * prod(supercell)
# do included guests first so natoms can be corrected
offset = 1
included_guests_part = []
# count if any guests are included
# use get( ... , 0) to get zero for not included
guest_nummols = {}
if include_guests is not None:
# having only one atom in DL_POLY errors with
# complaints about too few degrees of freedom
# if there is only one atom included, add a ghost
guest_items = include_guests.items()
dof_fix = (len(guest_items) == 1 # one guest type
and len(guest_items[0][1]) == 1 # one guest of type
and len(guest_items[0][1][0]) == 1 # one atom
and not fastmc)
debug("dot_fix %s" % dof_fix)
for guest in self.guests:
if guest.ident in include_guests:
guest_nummols[guest.ident] = len(include_guests[guest.ident])
for positions in include_guests[guest.ident]:
for atom, position in zip(guest.atoms, positions):
included_guests_part.extend(
["%-6s%10i\n" % (atom.type, offset),
"%20.12f%20.12f%20.12f\n" % tuple(position)])
if dof_fix:
# Add a phantom atom; hopefully Non type
# is not used elsewhere
offset += 1
included_guests_part.extend(
["%-6s%10i\n" % ("Non", offset),
"%20.12f%20.12f%20.12f\n" % tuple(position)])
# increment offset as it is used for the framework
offset += 1
# make natoms correct
natoms += offset - 1
else:
# don't need to apply degrees of freedon fix
dof_fix = False
config = ["%s\n" % self.name[:80],
"%10i%10i%10i\n" % (levcfg, imcon, natoms)]
config.extend(self.cell.to_vector_strings(scale=supercell))
# included guests go first to match field
config.extend(included_guests_part)
# put them in
for idx, atom in enumerate(self.supercell(supercell)):
# idx+offset for 1 based indexes in CONFIG
config.extend(["%-6s%10i\n" % (atom.type, idx + offset),
"%20.12f%20.12f%20.12f\n" % tuple(atom.pos)])
# FIELD
ntypes = len(self.guests) + 1
field = ["%s\n" % self.name[:80],
"UNITS kcal\n",
"molecular types %i\n" % ntypes]
# Guests
for guest in self.guests:
natoms = guest.natoms
if dof_fix and guest.ident in include_guests:
natoms += 1
field.extend(["&guest %s: %s\n" % (guest.name, guest.source),
"NUMMOLS %i\n" % guest_nummols.get(guest.ident, 0),
"ATOMS %i\n" % natoms])
for atom in guest.atoms:
# Can't just turn off electrostatics as we need to compare to
# the empty framework so zero guest charges
if dummy:
charge = 0
else:
charge = atom.charge
field.append("%-6s %12.6f %12.6f" %
tuple([atom.type, atom.mass, charge]))
if fastmc:
field.append("%12.6f %12.6f %12.6f\n" % tuple(atom.pos))
else:
# atom positions confuse dl_poly which takes nrept or ifrz
field.append(" 1 0\n")
if dof_fix and guest.ident in include_guests:
field.append("%-6s %12.6f %12.6f 1 0\n" %
tuple(["Non", 0.0, 0.0]))
field.append("rigid 1\n")
field.append("%i " % guest.natoms)
field.append(" ".join("%i" % (x + 1) for x in range(guest.natoms)))
field.append("\nfinish\n")
# Framework
field.extend(["Framework\n",
"NUMMOLS %i\n" % prod(supercell),
"ATOMS %i\n" % len(self.atoms)])
if options.getbool('mc_zero_charges'):
for atom in self.atoms:
field.append("%-6s %12.6f %20.14f %6i %6i\n" %
(atom.type, atom.mass, 0.0, 1, 1))
else:
for atom in self.atoms:
field.append("%-6s %12.6f %20.14f %6i %6i\n" %
(atom.type, atom.mass, atom.charge, 1, 1))
field.append("finish\n")
# VDW potentials
atom_set = [atom.type for atom in self.atoms]
for guest in self.guests:
atom_set.extend(atom.type for atom in guest.atoms)
atom_set = unique(atom_set)
field.append("VDW %i\n" % ((len(atom_set) * (len(atom_set) + 1)) / 2))
# modify local ff to deal with guests
force_field = copy(UFF)
for guest in self.guests:
force_field.update(guest.potentials)
# zero everything if we just want framework
if dummy:
force_field = dict((element, (0.0, 0.0)) for element in force_field)
for idxl in range(len(atom_set)):
for idxr in range(idxl, len(atom_set)):
left = atom_set[idxl]
right = atom_set[idxr]
try:
sigma, epsilon = lorentz_berthelot(force_field[left],
force_field[right])
except KeyError:
# catch this if not in the UFF -> zero
warning("No potential defined for %s %s; defaulting to 0" %
(left, right))
sigma, epsilon = 0.0, 0.0
field.append("%-6s %-6s lj %f %f\n" %
(left, right, epsilon, sigma))
# EOF
field.append("close\n")
return config, field
[docs] def to_gromacs(self, metal_geometry='input'):
"""Generate GROMACS structure and topology.
metal_geometry will affect topology terms with metal atoms. 'input'
will use the structure to generate the topology, and 'fix' will also
apply a stiffer potential.
Return gro, top and itp files as lists of lines.
"""
# Use this to multiply the potential terms and make them rigid
metal_stiffness_factor = 1
if metal_geometry.lower().startswith('fix'):
keep_metal_geometry = True
metal_stiffness_factor = 10
info("Metals fixed at input geometries")
elif metal_geometry.lower().startswith('in'):
keep_metal_geometry = True
info("Using input geometry for metals")
else:
keep_metal_geometry = False
info("Using UFF parameters for metals")
##
# .gro file
##
# TITLE line
# number of atoms
gro_file = ["%s\n" % self.name, " %i\n" % self.natoms]
# Don't use the MOF name so that we can refer to this easily later
residue = (1, "RESI")
for idx, atom in enumerate(self.atoms):
# In the specification the atom position is given as
# %8.3f in nm, but we probably would like more accuracy
# but make sure there are 5 figures to left of decimal point
pos_nm = tuple([x/10.0 for x in atom.pos])
gro_file.append(("%5i%5s" % residue) +
("%5s%5i" % (atom.uff_type, idx + 1)) +
("%15.10f%15.10f%15.10f\n" % pos_nm))
# cell is also in nm
# v1(x) v2(y) v3(z) v1(y) v1(z) v2(x) v2(z) v3(x) v3(y)
box = self.cell.cell/10.0
# gromacs needs these conditions. Might need to rotate cell sometimes?
if not (box[0][1] == box[0][2] == box[1][2] == 0):
error("Gromacs can't handle this cell orientation")
gro_file.append("%f %f %f %f %f %f %f %f %f\n" % (
box[0][0], box[1][1], box[2][2],
box[0][1], box[0][2],
box[1][0], box[1][2],
box[2][0], box[2][1]))
##
# .top file
##
comb_rule = 2 # 1 = LORENTZ_BERTHELOT; 2 = GEOMETRIC
# geometric is recommended in UFF
# if you use Lorentz Berthelot then sigma and epsilon become C6 and C12
top_file = [
"; Topology file for %s\n" % self.name,
"[ defaults ]\n",
"; nbfunc comb-rule gen-pairs fudgeLJ fudgeQQ\n",
"1 %i yes 1.0 1.0\n\n"
% comb_rule]
# define the forcefield for UFF
unique_types = set()
top_file.append("[ atomtypes ]\n")
top_file.append("; name1 name2 mass charge "
" ptype sigma epsilon\n")
for atom in self.atoms:
if atom.uff_type not in unique_types:
uff_type = atom.uff_type
# sigma = x1 * 2^(-1/6)
# CONVERT TO nm !!
sigma = 0.1*UFF_FULL[uff_type][2]*(2**(-1.0/6.0))
# epsilon = D1 in kcal
epsilon = UFF_FULL[uff_type][3] * KCAL_TO_KJ
top_file.append("%-6s %-6s %9.4f %9.4f A %12.7f "
"%12.7f\n" % (uff_type, uff_type, atom.mass,
0.0, sigma, epsilon))
unique_types.add(uff_type)
# for included file, use default name scheme "topol..."
top_file.extend(["\n#include <topol.itp>\n\n",
"[ system ]\n",
"UFF optimisation of %s\n\n" % self.name,
"[ molecules ]\n",
"%s 1\n" % residue[1]])
##
# .itp file
##
# exclude 3 neighbours
itp_file = ["[ moleculetype ]\n",
"; molname nrexcl\n"
"%s 3\n" % residue[1],
"[ atoms ]\n",
"; nr type resnr residue atom "
"cgnr charge mass \n"]
##
# atoms
##
for idx, atom in enumerate(self.atoms):
uff_type = atom.uff_type
# charge group is different for each atom
# as gromacs has max of 32 in a group
itp_file.append("%-6i %-6s %i %-6s %-6s %d %9.4f %9.4f\n" %
(idx+1, uff_type, residue[0], residue[1], uff_type,
idx+1, atom.charge, atom.mass))
##
# bonds
##
itp_file.append("\n[ bonds ]\n")
itp_file.append("; ai aj funct b0 kb\n")
unique_bonds = {}
bonding_table = dict([(x, {}) for x in range(self.natoms)])
#FIXME(tdaff) bonds will have length in v2
for bond, bondorder in self.bonds.items():
idx_a = bond[0]
idx_b = bond[1]
atom_a = self.atoms[idx_a]
atom_b = self.atoms[idx_b]
uff_a = atom_a.uff_type
uff_b = atom_b.uff_type
typed_bond = tuple(sorted([uff_a, uff_b]) + [bondorder])
bonding_table[idx_a][idx_b] = typed_bond
bonding_table[idx_b][idx_a] = typed_bond
# have we already calculated the parameters
if not typed_bond in unique_bonds:
ri = UFF_FULL[uff_a][0]
rj = UFF_FULL[uff_b][0]
chiI = UFF_FULL[atom_a.uff_type][8]
chiJ = UFF_FULL[atom_b.uff_type][8]
rbo = -0.1332*(ri+rj)*log(bondorder)
ren = ri*rj*(((sqrt(chiI) - sqrt(chiJ))**2))/(chiI*ri + chiJ*rj)
r0 = (ri + rj + rbo - ren)
# force constant
# parameters Z1
kb = (UFF_FULL[uff_a][5]*UFF_FULL[uff_b][5])/(r0**3)
kb *= 0.5*KCAL_TO_KJ*664.12
unique_bonds[typed_bond] = (r0, kb) # in nm
if atom_a.is_metal or atom_b.is_metal and keep_metal_geometry:
params = (min_distance(atom_a, atom_b, self.cell.cell),
unique_bonds[typed_bond][1]*metal_stiffness_factor)
else:
params = unique_bonds[typed_bond]
bond_func = 1 # gromacs harmonic
# add 1 to bond as 1 indexed
# All these are converted to gromcas units, ugh...
itp_file.append('%-5i %-5i %1i %11.4f %11.4f ; %-5s %-5s %.2f\n' %
(bond[0]+1, bond[1]+1, bond_func, 0.1*params[0],
200*params[1], typed_bond[0], typed_bond[1],
bondorder))
##
# angles
##
itp_file.append("\n[ angles ]\n")
for idx_a in sorted(bonding_table):
bonded_atoms = bonding_table[idx_a]
for l_idx in bonded_atoms:
for r_idx in bonded_atoms:
if r_idx >= l_idx:
# don't include angles twice
continue
# FIXME(tdaff) special cases for 5 or more coord
# tag central uff and two largest neighbours
central_atom = self.atoms[idx_a]
l_atom = self.atoms[l_idx]
r_atom = self.atoms[r_idx]
# FIXME(tdaff) over/under coordinated atoms?
theta0 = UFF_FULL[central_atom.uff_type][1]
# FIXME(tdaff) switch if coordination is not correct
cosT0 = cos(theta0*DEG2RAD)
sinT0 = sin(theta0*DEG2RAD)
c2 = 1.0 / (4.0 * sinT0 * sinT0)
c1 = -4.0 * c2 * cosT0
c0 = c2*(2.0*cosT0*cosT0 + 1.0)
zi = UFF_FULL[l_atom.uff_type][5]
zk = UFF_FULL[r_atom.uff_type][5]
bond_ab = tuple(sorted((l_idx, idx_a)))
bond_ab = tuple(sorted([l_atom.uff_type, central_atom.uff_type]) + [self.bonds[bond_ab]])
bond_bc = tuple(sorted((r_idx, idx_a)))
bond_bc = tuple(sorted([r_atom.uff_type, central_atom.uff_type]) + [self.bonds[bond_bc]])
rab = unique_bonds[bond_ab][0]
rbc = unique_bonds[bond_bc][0]
rac = sqrt(rab*rab + rbc*rbc - 2.0 * rab*rbc*cosT0)
ka = (644.12 * KCAL_TO_KJ) * (zi * zk / (rac**5.0))
ka *= (3.0*rab*rbc*(1.0 - cosT0*cosT0) - rac*rac*cosT0)
# FIXME(tdaff) change uff_coordination to coordination
if central_atom.uff_coordination == 1:
# linear bonds (e.g. C_1 triple bonds) had 0 force
# constant otherwise
thetamin = 180.0
kappa = ka
elif abs(c2) > 0.001:
thetamin = pi - arccos(c1/(4.0*c2))
thetamin /= DEG2RAD
kappa = ka * (16.0*c2*c2 - c1*c1) / (4.0*c2)
elif central_atom.coordination == 2:
thetamin = 120.0
kappa = 4.0*ka/3.0
elif central_atom.coordination in (4, 6):
thetamin = 90.0
kappa = 2.0*ka
elif central_atom.coordination == 7:
alpha = 2.0*pi/5.0
c7 = sin(alpha)*(cos(alpha) - cos(2*alpha))
thetamin = 72.0
kappa = 2.0 * c7*c7 * ka * c1
else:
thetamin = pi - arccos(c1/(4.0*c2))
thetamin /= DEG2RAD
kappa = ka * (16.0*c2*c2 - c1*c1) / (4.0*c2)
if keep_metal_geometry and (central_atom.is_metal or
l_atom.is_metal or
r_atom.is_metal):
# harmonic potential means it will not
# flip around
potential = "harmonic"
kappa *= metal_stiffness_factor
thetamin = angle_between(l_atom, central_atom, r_atom,
cell=self.cell)
elif central_atom.coordination == 1:
# linear bonds seemed too flexible
potential = "harmonic"
else:
potential = "G96"
# Use the harmonic approximation for linear angles as the
# stiffness is undefined for the G96 form
if potential == "G96" and thetamin != 180.0:
kappa /= sin(thetamin*DEG2RAD)**2
potential_function = 2
else: # harmonic
potential_function = 1
angle_fmt = ("%-6i %-6i %-6i %i %9.4f %9.4f ;"
" %-6s %-6s %-6s\n")
itp_file.append(angle_fmt % (
l_idx + 1, idx_a + 1, r_idx + 1, potential_function,
thetamin, kappa, l_atom.uff_type,
central_atom.uff_type, r_atom.uff_type))
##
# dihedrals
##
itp_file.append("\n[ dihedrals ]\n; proper torsion terms\n")
# Like OB FindTorsions
# Generate all torsions first so we can find equivalents
torsions = []
for idx_b in sorted(bonding_table):
for idx_c in bonding_table[idx_b]:
if idx_b >= idx_c:
continue
for idx_a in bonding_table[idx_b]:
if idx_a == idx_c:
continue
for idx_d in bonding_table[idx_c]:
if idx_d == idx_b or idx_d == idx_a:
continue
else:
torsions.append((idx_a, idx_b, idx_c, idx_d))
overcoordinated = set()
# now loop to generate force field
for torsion in torsions:
idx_a, idx_b, idx_c, idx_d = torsion
atom_a = self.atoms[idx_a]
atom_b = self.atoms[idx_b]
atom_c = self.atoms[idx_c]
atom_d = self.atoms[idx_d]
bond_bc = bonding_table[idx_b][idx_c]
torsiontype = bond_bc[2] # bond order
# correct for over-coordinated things like vanadium
if atom_b.is_metal and atom_b.coordination > atom_b.uff_coordination:
coord_b = atom_b.coordination
overcoordinated.add(atom_b)
else:
coord_b = atom_b.uff_coordination
if atom_c.is_metal and atom_c.coordination > atom_c.uff_coordination:
coord_c = atom_c.coordination
overcoordinated.add(atom_c)
else:
coord_c = atom_c.uff_coordination
coord_bc = (coord_b, coord_c)
V = 0
n = 0
if coord_bc == (3, 3):
# two sp3 centers
phi0 = 60.0
n = 3
vi = UFF_FULL[atom_b.uff_type][6]
vj = UFF_FULL[atom_c.uff_type][6]
# exception for a pair of group 6 sp3 atoms
if atom_b.atomic_number == 8:
vi = 2.0
n = 2
phi0 = 90.0
elif atom_b.atomic_number in (16, 34, 52, 84):
vi = 6.8
n = 2
phi0 = 90.0
if atom_c.atomic_number == 8:
vj = 2.0
n = 2
phi0 = 90.0
elif atom_c.atomic_number in (16, 34, 52, 84):
vj = 6.8
n = 2
phi0 = 90.0
V = 0.5 * KCAL_TO_KJ * (vi * vj)**0.5
elif coord_bc == (2, 2):
# two sp2 centers
ui = UFF_FULL[atom_b.uff_type][7]
uj = UFF_FULL[atom_c.uff_type][7]
phi0 = 180.0
n = 2
V = (ui*uj)**0.5 * (1.0 + 4.18*log(torsiontype))
V *= 0.5 * KCAL_TO_KJ * 5.0
elif coord_bc in [(2, 3), (3, 2)]:
# one sp3, one sp2
phi0 = 0.0
n = 6
V = 0.5 * KCAL_TO_KJ * 1.0
# exception for group 6 sp3
if coord_c == 3:
if atom_c.atomic_number in (8, 16, 34, 52):
n = 2
phi0 = 90.0
if coord_b == 3:
if atom_b.atomic_number in (8, 16, 34, 52):
n = 2
phi0 = 90.0
if abs(V) < 2e-6: # don't bother calculating this torsion
continue
# Dividing by equivalent torsions is a GG addition
equivalent_torsions = 0
for equivalent in torsions:
if equivalent[1:3] == (idx_b, idx_c):
equivalent_torsions += 1
V /= equivalent_torsions
nphi0 = n*phi0
if abs(sin(nphi0*DEG2RAD)) > 1.0e-3:
error("WARNING!!! nphi0 = %r" % nphi0)
if atom_b.is_metal or atom_c.is_metal and keep_metal_geometry:
V *= metal_stiffness_factor
phi_s = dihedral(atom_a, atom_b, atom_c, atom_d, cell=self.cell)
else:
phi_s = nphi0 - 180.0 # phi_s in degrees
tor_fmt = ("%-6i %-6i %-6i %-6i %i %9.4f %9.4f %i ;"
" %-6s %-6s %-6s %-6s\n")
tor_type = 1 # proper dihedrals in GROMACS
itp_file.append(tor_fmt % (
idx_a + 1, idx_b + 1, idx_c + 1, idx_d + 1,
tor_type, phi_s, V, n,
atom_a.uff_type, atom_b.uff_type,
atom_c.uff_type, atom_d.uff_type))
# Don't warn about overcoordination unless user cares.
if overcoordinated:
debug("%i overcoordinated atoms found: %s" % (len(overcoordinated),
" ".join(set(x.uff_type for x in overcoordinated))))
##
# inversions / improper dihedrals
##
itp_file.append("\n[ dihedrals ]\n; inversions (improper dihedrals)\n")
for idx_b in sorted(bonding_table):
atom_b = self.atoms[idx_b]
# Only have parameters for limited elements
# and only want those with 3 neighbours
if not atom_b.atomic_number in (6, 7, 8, 15, 33, 51, 83):
continue
elif len(bonding_table[idx_b]) != 3:
continue
idx_a, idx_c, idx_d = bonding_table[idx_b]
atom_a = self.atoms[idx_a]
atom_c = self.atoms[idx_c]
atom_d = self.atoms[idx_d]
if atom_b.uff_type in ('N_3', 'N_2', 'N_R', 'O_2', 'O_R'):
c0 = 1.0
c1 = -1.0
c2 = 0.0
koop = 6.0*KCAL_TO_KJ
elif atom_b.uff_type in ('P_3+3', 'As3+3', 'Sb3+3', 'Bi3+3'):
if atom_b.uff_type == 'P_3+3':
phi = 84.4339 * DEG2RAD
elif atom_b.uff_type == 'As3+3':
phi = 86.9735 * DEG2RAD
elif atom_b.uff_type == 'Sb3+3':
phi = 87.7047 * DEG2RAD
else:
phi = 90.0 * DEG2RAD
c1 = -4.0 * cos(phi)
c2 = 1.0
c0 = -1.0*c1*cos(phi) + c2*cos(2.0*phi)
koop = 22.0 * KCAL_TO_KJ
elif atom_b.uff_type in ('C_2', 'C_R'):
c0 = 1.0
c1 = -1.0
c2 = 0.0
koop = 6.0*KCAL_TO_KJ
if 'O_2' in (atom_a.uff_type, atom_c.uff_type, atom_d.uff_type):
koop = 50.0 * KCAL_TO_KJ
else:
continue
# three permutations:
koop /= 3
# that was easy...
if abs(c2) < 1.0e-5:
csi0 = 0.0
kcsi = koop
else:
#TODO(tdaff): check if this is multiply or divide
csi0 = arccos(-c1/(4.0*c2))/DEG2RAD # csi_0 in degrees
kcsi = (16.0*c2*c2-c1*c1)/(4.0*c2*c2)
kcsi *= koop # kcsi in kJ/mol/rad^2
# put it thrice, middle atom first
# b, a, c, d
# b, d, c, a
# b, a, d, c
inv_fmt = ("%-6i %-6i %-6i %-6i %i %9.4f %9.4f ;"
" %-6s %-6s %-6s %-6s\n")
inv_type = 2 # improper dihedrals in GROMACS
itp_file.append(inv_fmt % (
idx_b + 1, idx_a + 1, idx_c + 1, idx_d + 1,
inv_type, csi0, kcsi,
atom_b.uff_type, atom_a.uff_type,
atom_c.uff_type, atom_d.uff_type))
itp_file.append(inv_fmt % (
idx_b + 1, idx_d + 1, idx_c + 1, idx_a + 1,
inv_type, csi0, kcsi,
atom_b.uff_type, atom_d.uff_type,
atom_c.uff_type, atom_a.uff_type))
itp_file.append(inv_fmt % (
idx_b + 1, idx_a + 1, idx_d + 1, idx_c + 1,
inv_type, csi0, kcsi,
atom_b.uff_type, atom_a.uff_type,
atom_d.uff_type, atom_c.uff_type))
# done!
return gro_file, top_file, itp_file
[docs] def to_cssr(self, cartesian=False, no_atom_id=False):
"""
Return a Cerius2 cssr file with coordinates and cell as a list of
strings. Set no_atom_id to produce labels to work with Zeo++.
"""
space_group = (1, "P 1")
opt = 1
cell = self.cell
# spacers between all format strings ensures that there will always
# be whitespace between components.
cssr = [" "*38, "%8.3f %7.3f %7.3f\n" % cell.params[:3],
" "*21, "%8.3f %7.3f %7.3f" % cell.params[3:], " "*4,
"SPGR = %3i %-11s" % space_group, "OPT = %i\n" % opt,
"%4i %3i %s\n" % (self.natoms, cartesian, self.name),
"Structure file generated by faps\n"]
for at_idx, atom in enumerate(self.atoms):
# The default of TypeID does not work for Zeo++ as it
# treats each label as a different type
if no_atom_id:
atom_name = "%s" % (atom.type)
else:
atom_name = "%s%i" % (atom.type, at_idx+1)
# Zeo++ needs fractional coordinates
if cartesian:
string_pos = "%9.5f %9.5f %9.5f " % tuple(atom.pos)
else:
string_pos = "%9.5f %9.5f %9.5f " % tuple(atom.ifpos(cell.inverse))
cssr.append("%4i %-4s %s" % (at_idx+1, atom_name, string_pos) +
" 0"*8 + " %7.3f\n" % atom.charge)
return cssr
[docs] def to_zeoplusplus(self):
"""
Return a tuple containing a cssr file, radii file and mass file.
"""
radii = ["%-7s %-f\n" % (atom, UFF[atom][0]/2.0)
for atom in unique(self.types)]
masses = ["%-7s %-f\n" % (atom, WEIGHT[atom])
for atom in unique(self.types)]
return self.to_cssr(no_atom_id=True), radii, masses
[docs] def to_cif(self):
"""Return a CIF file with bonding and atom types."""
name = self.name
atoms = self.atoms
cell = self.cell
if hasattr(self, 'bonds'):
bonds = self.bonds
else:
bonds = {}
inv_cell = cell.inverse
type_count = {}
atom_part = []
for idx, atom in enumerate(atoms):
if atom is None:
# blanks are left in here
continue
if hasattr(atom, 'uff_type') and atom.uff_type is not None:
uff_type = atom.uff_type
else:
uff_type = '?'
if atom.element in type_count:
type_count[atom.element] += 1
else:
type_count[atom.element] = 1
atom.site = "%s%i" % (atom.element, type_count[atom.element])
atom_part.append("%-5s %-5s %-5s " % (atom.site, atom.element, uff_type))
atom_part.append("%f %f %f " % tuple(atom.ifpos(inv_cell)))
atom_part.append("%f\n" % atom.charge)
# Materials studio sorts bonds in the same order as the atoms
# but perceives bonds wrong if bond orders are mixed.
# i.e. it misreads cifs that it writes itself.
bond_properties = []
for bond, order in bonds.items():
bond_type = CCDC_BOND_ORDERS[order]
atom0, atom1 = atoms[bond[0]], atoms[bond[1]]
distance, shift = cif_bond_dist(atom0, atom1, cell)
# assume P1 for now (only one symmetry operation) so "1_???"
if shift == [0, 0, 0]:
site_symm = "."
bond_properties.append((order, bond,
(atom0.site, atom1.site, distance,
site_symm, bond_type)))
else:
site_symm = "1_%i%i%i" % (5+shift[0], 5+shift[1], 5+shift[2])
bond_properties.append((order, bond,
(atom0.site, atom1.site, distance,
site_symm, bond_type)))
# MS also includes the reverse bond over the boundary
site_symm = "1_%i%i%i" % (5-shift[0], 5-shift[1], 5-shift[2])
rbond = (bond[1], bond[0])
bond_properties.append((order, rbond,
(atom1.site, atom0.site, distance,
site_symm, bond_type)))
# Can just sort bond_properties as it will be bond_order, site1, site2
bond_part = ["%-5s %-5s %.3f %-5s %2s\n" % x[2]
for x in sorted(bond_properties)]
cif_file = [
"data_%s\n" % name.replace(' ', '_'),
"%-33s %s\n" % ("_audit_creation_date", time.strftime('%Y-%m-%dT%H:%M:%S%z')),
"%-33s %s\n" % ("_audit_creation_method", "'faps %s'" % __version__),
"%-33s %s\n" % ("_symmetry_space_group_name_H-M", "P1"),
"%-33s %s\n" % ("_symmetry_Int_Tables_number", "1"),
"%-33s %s\n" % ("_space_group_crystal_system", cell.crystal_system),
"loop_\n", "_symmetry_equiv_pos_as_xyz\n", " x,y,z\n",
"%-33s %-.10s\n" % ("_cell_length_a", cell.a),
"%-33s %-.10s\n" % ("_cell_length_b", cell.b),
"%-33s %-.10s\n" % ("_cell_length_c", cell.c),
"%-33s %-.10s\n" % ("_cell_angle_alpha", cell.alpha),
"%-33s %-.10s\n" % ("_cell_angle_beta", cell.beta),
"%-33s %-.10s\n" % ("_cell_angle_gamma", cell.gamma),
"%-33s %s\n" % ("_cell_volume", cell.volume),
# start of atom loops
"\nloop_\n",
"_atom_site_label\n",
"_atom_site_type_symbol\n",
"_atom_site_description\n",
"_atom_site_fract_x\n",
"_atom_site_fract_y\n",
"_atom_site_fract_z\n",
"_atom_type_partial_charge\n"] + atom_part
# Don't put this if there are no bonds!
if bond_part:
cif_file.extend([
# bonding loop
"\nloop_\n",
"_geom_bond_atom_site_label_1\n",
"_geom_bond_atom_site_label_2\n",
"_geom_bond_distance\n",
"_geom_bond_site_symmetry_2\n",
"_ccdc_geom_bond_type\n"] + bond_part)
return cif_file
[docs] def fastmc_postproc(self, filepath, tp_point, options):
"""Update structure properties from gcmc OUTPUT."""
startdir = os.getcwd()
os.chdir(filepath)
# Since Pete changed output strip indentation and blank lines
filetemp = open('OUTPUT')
output = strip_blanks(filetemp.readlines())
filetemp.close()
# Keep track of supercell so we can get unit cell values
supercell_mult = prod(self.gcmc_supercell)
# Still positional as we need multiple values simultaneously
# and very old versions changed wording of heat of adsorption
# and enthalpy of guest
# TODO(tdaff, r2.0): deprecate reading older fastmc files
# and put Cv in the guest definition
for line in output[::-1]:
if "+/-" in line:
# This is version 1.3 of fastmc
debug("NEW OUTPUT")
line_offset = 5
read_cv = True
# In future this should be assumed to exist
for guest in self.guests:
if not hasattr(guest, 'c_v'):
guest.c_v = {}
break
else:
# +/- not found, assume old style output
debug("OLD OUTPUT")
line_offset = 0
read_cv = False
for idx, line in enumerate(output):
# Assume that block will always start like this
if 'final stats' in line:
idx += line_offset
guest_id = int(line.split()[4]) - 1
self.guests[guest_id].uptake[tp_point] = (
float(output[idx + 1].split()[-1]),
float(output[idx + 2].split()[-1]),
supercell_mult)
# This will sometimes be NaN
self.guests[guest_id].hoa[tp_point] = (
float(output[idx + 3].split()[-1]),
float(output[idx + 4].split()[-1]))
if read_cv:
self.guests[guest_id].c_v[tp_point] = (
float(output[idx + 5].split()[-1]),
float(output[idx + 6].split()[-1]))
elif 'total accepted steps' in line:
counted_steps = int(line.split()[-1])
if counted_steps < 10000:
warning("Number of accepted GCMC steps is very low; "
"only %i counted!" % counted_steps)
fold = options.getbool('fold')
find_maxima = options.getbool('find_maxima')
prob_plot = options.getbool('mc_probability_plot')
folded = False
if prob_plot and (fold or find_maxima):
sigma = options.getfloat('absl_sigma')
radius = options.getfloat('absl_radius')
cutoff = options.getfloat('absl_cutoff')
write = options.getbool('absl_write_smooth_cube')
folded = self.fold_and_maxima(fold, find_maxima, tp_point, sigma,
radius, cutoff, write)
if folded and not options.getbool('fastmc_keep_unfolded_cubes'):
debug("Removing unfolded cube files")
cubes = glob("prob_guest??_prob_??.cube")
remove_files(cubes)
unneeded_files = options.gettuple('fastmc_delete_files')
remove_files(unneeded_files)
keep_files = options.gettuple('fastmc_compress_files')
compress_files(keep_files)
os.chdir(startdir)
[docs] def absl_postproc(self, filepath, tp_point, options):
"""Update structure properties from DL_POLY outputs."""
startdir = os.getcwd()
os.chdir(filepath)
# For pretty output
temp = tp_point[0]
press = tp_point[1]
tp_string = " T=%.1f " % temp + " ".join(["P=%.2f" % x for x in press])
statis = compressed_open('STATIS').readlines()
empty_esp = float(statis[3].split()[4])
for guest in self.guests:
info("Postprocessing: %s at %s" % (guest.ident, tp_string))
binding_energies = []
for bs_idx, binding_site in enumerate(guest.binding_sites[tp_point]):
bs_directory = "%s_bs_%04d" % (guest.ident, bs_idx)
output = compressed_open(path.join(bs_directory,
'OUTPUT')).readlines()
if 'error - quaternion integrator failed' in output[-1]:
# bad guest placement
e_vdw = float('nan')
e_esp = float('nan')
# put position of original as final position, nan anyway
try:
shutil.move(path.join(bs_directory, 'CONFIG'),
path.join(bs_directory, 'REVCON'))
except IOError:
# CONFIG already moved, just doing update?
pass
# This is not a 'binding site'
magnitude = 0.0
else:
# energies
statis = compressed_open(path.join(bs_directory,
'STATIS')).readlines()
# Need to do a backwards search for the start of the last
# block since block size varies with number of atoms
# Timestep line will start with more spaces than data line
lidx = 0 # start block of final STATIS data
for ridx, line in enumerate(statis[::-1]):
if line.startswith(' ') and not 'NaN' in line:
lidx = ridx
break
else:
warning('No energies in STATIS')
e_vdw = float(statis[-lidx].split()[3])
e_esp = float(statis[-lidx].split()[4]) - empty_esp
# can just use the peak value
magnitude = binding_site[0][2]
# get position position
revcon = compressed_open(path.join(bs_directory,
'REVCON')).readlines()
# If using MD, skip is 4 due to velocities and forces!
revcon = revcon[6::2]
# Fix molecule if it crosses boundaries
# have to make dummy objects with fractional attribute for
# the minimum_image function
cell = self.cell
# For now, put (atom, dummy) in here
positions = []
for atom, ratom in zip(guest.atoms, revcon):
try:
dummy_pos = [float(x) for x in ratom.split()]
except ValueError:
# DL_POLY prints large numbers wrong, like
# 0.3970159038+105; these are too big anyway, so nan them
dummy_pos = [float('nan'), float('nan'), float('nan')]
# Sometimes atoms get flung out of the cell, but have
# high binding energy anyway, ignore them
if any((abs(x) > 2*cell.minimum_width) for x in dummy_pos):
e_vdw = float('nan')
e_esp = float('nan')
dummy = Atom(parent=self)
# create with the fractional, so position matches
dummy.fractional = [x % 1.0 for x in
dot(cell.inverse, dummy_pos)]
# Put dummy nearest to first atom, if it exists
if positions:
dummy.pos = minimum_image(positions[0][1], dummy,
cell.cell)
positions.append((atom, dummy))
else:
# ensure anchor is within the unit cell
dummy.pos = dot(dummy.fractional, cell.cell)
positions.append((atom, dummy))
# Extract the information we need from the tuples
positions = [(atom.type, dummy.pos)
for atom, dummy in positions]
info("Binding site %i: %f kcal/mol, %f occupancy" %
(bs_idx, (e_vdw+e_esp), magnitude))
binding_energies.append([magnitude, e_vdw, e_esp, positions])
with open('%s_absl.xyz' % guest.ident, 'w') as absl_out:
frame_number = 0
for idx, bind in enumerate(binding_energies):
energy = bind[1] + bind[2]
if energy > 0 or energy != energy:
continue
pc_elec = 100*bind[2]/energy
this_point = [
"%i\n" % len(bind[3]), # number of atoms
# idx, energy, %esp, e_vdw, e_esp, magnitude
" BS: %i, Frame: %i, Ebind= %f, esp= %.2f%%, Evdw= %f, "
"Eesp= %.2f, occ= %f\n" %
(idx, frame_number, energy, pc_elec, bind[1], bind[2],
bind[0])]
for atom in bind[3]:
this_point.append("%-5s " % atom[0])
this_point.append("%12.6f %12.6f %12.6f\n" % tuple(atom[1]))
absl_out.writelines(this_point)
frame_number += 1
if hasattr(guest, 'binding_energies'):
guest.binding_energies[tp_point] = binding_energies
else:
guest.binding_energies = {tp_point: binding_energies}
unneeded_files = options.gettuple('absl_delete_files')
remove_files(unneeded_files)
keep_files = options.gettuple('absl_compress_files')
compress_files(keep_files)
os.chdir(startdir)
[docs] def fold_and_maxima(self, fold=True, find_maxima=True, tp_point=None,
sigma=2.0, radius=0.31, cutoff=0.0, write=False):
"""Determine the positions of maxima and produce an xyz xyz file."""
from cube import Cube
folded = False
if fold:
fold = self.gcmc_supercell
else:
fold = None
for guest_idx, guest in enumerate(self.guests):
guest_locations = {}
for site_idx, sites in enumerate(guest.probability):
guest_cube = Cube("prob_guest%02i_prob_%02i.cube" %
(guest_idx+1, site_idx+1), fold=fold)
if fold is not None:
debug("Folded cube file: %s" % guest_cube.folded_name)
guest_cube.write_cube()
folded = True
if find_maxima:
guest_locations[sites] = guest_cube.maxima(
sigma, radius, cutoff, write)
if guest_locations:
if tp_point:
# We can keep them for later too, must create dict
# since it might not exist in old calculations
if not hasattr(guest, 'guest_locations'):
guest.guest_locations = {tp_point: guest_locations}
else:
guest.guest_locations[tp_point] = guest_locations
maxima = []
for sites in guest_locations:
atom_name = name_from_types(sites, guest)
for atom, magnitude in guest_locations[sites]:
maxima.append((magnitude, "%-6s" % atom_name +
"%10.6f %10.6f %10.6f " % tuple(atom) +
"%10.6f\n" % magnitude))
locations = open("%s-%s.xyz" % (self.name, guest.ident), 'w')
locations.write(" %i\nBinding sites at %r\n" %
(len(maxima), tp_point))
locations.writelines([x[1] for x in sorted(maxima, reverse=True)])
locations.close()
return folded
[docs] def remove_duplicates(self, tolerance=0.02):
"""Find overlapping atoms and remove them."""
uniq_atoms = []
found_atoms = []
for atom in self.atoms:
for uniq_atom in uniq_atoms:
if atom.type != uniq_atom.type:
continue
elif min_distance(atom, uniq_atom) < tolerance:
break
# else excutes when not found here
else:
uniq_atoms.append(atom)
debug("Found %i unique atoms in %i" % (len(uniq_atoms), self.natoms))
self.atoms = uniq_atoms
[docs] def check_connectivity(self):
"""
Carry out pre-optimisation checks checks on the structure to determine
if bonding information is included and if atom types are needed.
No bonding is inferred. Return True if connectivity is bad.
:return: bool
"""
if not self.bonds:
warning("No bonding information found. "
"Typing and optimisation will fail.")
return True
for atom in self.atoms:
if not atom.uff_type or atom.uff_type == '?':
debug("Untyped atom found.")
return True
return False
[docs] def bond_length_check(self, too_long=1.25, too_short=0.7):
"""
Check if all bonds fall within a sensible range of scale factors
of the sum of the covalent radii. Return True if bad bonds are found,
otherwise False.
"""
bad_bonds = False
for bond in self.bonds:
atom = self.atoms[bond[0]]
other = self.atoms[bond[1]]
distance = min_distance(atom, other)
bond_dist = (atom.covalent_radius + other.covalent_radius)
if distance > bond_dist * too_long:
warning("Long bond found: %s(%i) and %s(%i) = %.2f A" %
(atom.site, bond[0], other.site, bond[1], distance))
bad_bonds = True
elif distance < bond_dist * too_short:
warning("Short bond found: %s(%i) and %s(%i) = %.2f A" %
(atom.site, bond[0], other.site, bond[1], distance))
bad_bonds = True
return bad_bonds
[docs] def gen_supercell(self, options):
"""Cacluate the smallest satisfactory supercell and set attribute."""
config_supercell = options.gettuple('mc_supercell', int)
config_cutoff = options.getfloat('mc_cutoff')
if config_cutoff < 12:
warning("Simulation is using a very small cutoff! I hope you "
"know, what you are doing!")
minimum_supercell = self.cell.minimum_supercell(config_cutoff)
supercell = tuple(max(i, j)
for i, j in zip(config_supercell, minimum_supercell))
if self.gcmc_supercell != supercell:
info("%s supercell requested in config" % str(config_supercell))
info("%s minimum supercell for a %.1f cutoff" %
(str(minimum_supercell), config_cutoff))
info("Constructing %s supercell for gcmc." % str(supercell))
self.gcmc_supercell = supercell
[docs] def supercell(self, scale):
"""
Iterate over all the atoms of supercell where scale is an integer
to scale uniformly or triplet with scale factors for each direction.
"""
# Beware supercells larger than 2147483647 are not supported in
# python 2
if isinstance(scale, int):
scale = (scale, scale, scale)
for x_super in range(scale[0]):
for y_super in range(scale[1]):
for z_super in range(scale[2]):
offset = dot((x_super, y_super, z_super), self.cell.cell)
for atom in self.atoms:
newatom = copy(atom)
newatom.translate(offset)
yield newatom
[docs] def order_by_types(self):
"""
Sort the atoms alphabetically and group them as in old versions.
Update bonds to reflect new ordering.
"""
# Append indexes to each and sort on type (legacy sorting)
new_atoms = sorted(enumerate(self.atoms),
key=lambda x: (x[1].type, x[1].site))
self.atoms = [x[1] for x in new_atoms]
if hasattr(self, 'bonds'):
# dict of old_index: new_index
translation_table = dict((j, i) for i, j in
enumerate(x[0] for x in new_atoms))
new_bonds = {}
# Just translate atom indexes, bond data is the same
for bond in self.bonds:
new_bond = tuple(sorted([translation_table[bond[0]],
translation_table[bond[1]]]))
new_bonds[new_bond] = self.bonds[bond]
self.bonds = new_bonds
[docs] def gen_types_from_bonds(self):
"""
Pass the bonding information into openbabel to get the atomic types.
Modifies the atoms in place to set their uff_type attribute.
"""
info("Generating UFF atom types from bonding information")
if not self.bonds:
error('No bonds, cannot generate types!')
return
# import these locally so we can run faps without them
import openbabel as ob
import pybel
# Construct the molecule from atoms and bonds
obmol = ob.OBMol()
obmol.BeginModify()
for atom in self.atoms:
new_atom = obmol.NewAtom()
new_atom.SetAtomicNum(atom.atomic_number)
for bond, bond_order in self.bonds.items():
# Remember openbabel indexes from 1
obmol.AddBond(bond[0]+1, bond[1]+1, OB_BOND_ORDERS[bond_order])
obmol.EndModify()
pybel_mol = pybel.Molecule(obmol)
# need to tell the typing system to ignore all atoms in the setup
# or it will silently crash with memory issues
constraint = ob.OBFFConstraints()
for at_idx in range(pybel_mol.OBMol.NumAtoms()):
constraint.AddIgnore(at_idx)
uff = ob.OBForceField_FindForceField('uff')
uff.Setup(pybel_mol.OBMol, constraint)
uff.GetAtomTypes(pybel_mol.OBMol)
# Dative nitrogen bonds break aromaticity determination from resonant
# structures, so make anything with an aromatic bond be aromatic
for at_idx, atom, ob_atom in zip(count(), self.atoms, pybel_mol):
uff_type = ob_atom.OBAtom.GetData("FFAtomType").GetValue()
if atom.type in ['C', 'N', 'O', 'S']:
for bond, bond_order in self.bonds.items():
if at_idx in bond and bond_order == 1.5:
uff_type = uff_type[0] + '_R'
break
atom.uff_type = uff_type
[docs] def gen_neighbour_list(self, force=False):
"""All atom pair distances."""
# This can be expensive so skip if already calcualted
if not force:
for atom in self.atoms:
if not hasattr(atom, 'neighbours'):
break
else:
# finished loop over all atoms
debug("Neighbour list already calculated")
return
debug("Calculating neighbour list.")
cell = self.cell.cell
inv_cell = self.cell.inverse
cpositions = [atom.ipos(cell, inv_cell) for atom in self.atoms]
fpositions = [atom.ifpos(inv_cell) for atom in self.atoms]
cell = cell.tolist()
# loop over all pairs to find minimum periodic distances
for atom, a_cpos, a_fpos in zip(self.atoms, cpositions, fpositions):
neighbours = []
for o_idx, o_cpos, o_fpos in zip(count(), cpositions, fpositions):
sep = min_dist(a_cpos, a_fpos, o_cpos, o_fpos, cell)
neighbours.append((sep, o_idx))
# First one is self == 0
# save in incresaing distance order
atom.neighbours = sorted(neighbours)[1:]
## Neighbourlist printed in VASP style
#for idx, atom in enumerate(self.atoms):
# print("%4i" % (idx+1) +
# "%7.3f%7.3f%7.3f" % tuple(atom.ifpos(inv_cell)) +
# "-" +
# "".join("%4i%5.2f" % (y+1, x) for x, y in atom.neighbours if x<2.5))
[docs] def surface_area(self, probe=None, value=None, delete=False):
"""
Helper:
Return all {probe:area} if no arguments given
Return the area or None for a given probe
Set area if value given
Delete value if delete is True
Areas in A^2
"""
surface_areas = self.properties.get('surface_area', {})
if value is not None:
surface_areas[probe] = value
self.properties['surface_area'] = surface_areas
elif delete:
# Set it to None to avoid KeyErrors
surface_areas[probe] = None
del surface_areas[probe]
self.properties['surface_area'] = surface_areas
elif probe is not None:
return surface_areas.get(probe, None)
else:
return surface_areas
[docs] def sub_property(self, name, probe=None, value=None, delete=False):
"""
Helper:
Return all {probe:value} if no arguments given
Return the value or None for a given probe
Set area if value given
Delete value if delete is True
Units are based on Angstrom
"""
property_data = self.properties.get(name, {})
if value is not None:
property_data[probe] = value
self.properties[name] = property_data
elif delete:
# Set it to None to avoid KeyErrors
property_data[probe] = None
del property_data[probe]
self.properties[name] = property_data
elif probe is not None:
return property_data.get(probe, None)
else:
return property_data
[docs] def void_volume(self):
"""Estimate the void volume based on VdW radii."""
initial_resolution = 0.2
params = self.cell.params
cell = self.cell.cell
inv_cell = np.linalg.inv(cell.T)
grid_size = [int(ceil(x/initial_resolution)) for x in params[:3]]
print(grid_size)
grid_resolution = [params[0]/grid_size[0],
params[1]/grid_size[1],
params[2]/grid_size[2]]
print(grid_resolution)
grid = np.zeros(grid_size, dtype=bool)
atoms = [(atom.ipos(cell), inv_cell.tolist(),
atom.ifpos(inv_cell),
atom.vdw_radius) for atom in self.atoms]
for x_idx in range(grid_size[0]):
print(x_idx)
for y_idx in range(grid_size[1]):
print(y_idx)
print(grid.sum())
for z_idx in range(grid_size[2]):
grid_pos = [x_idx*grid_resolution[0],
y_idx*grid_resolution[1],
z_idx*grid_resolution[2]]
grid_fpos = [float(x_idx)/grid_size[0],
float(y_idx)/grid_size[1],
float(z_idx)/grid_size[2]]
for pos, fpos, radius in atoms:
dist = min_dist(pos, fpos, grid_pos, grid_fpos, cell)
if dist < radius:
grid[x_idx, y_idx, z_idx] = 1
break
print(grid.sum())
@property
def types(self):
"""Ordered list of atom types."""
return [atom.type for atom in self.atoms]
@property
def atomic_numbers(self):
"""Ordered list of atomic numbers."""
return [atom.atomic_number for atom in self.atoms]
@property
def weight(self):
"""Unit cell weight."""
return sum([atom.mass for atom in self.atoms])
@property
def volume(self):
"""Unit cell volume."""
return self.cell.volume
@property
def natoms(self):
"""Number of atoms in the unit cell."""
return len(self.atoms)
@property
def symmetry_tree(self):
"""Tree of atoms that are symmetrically equivalent."""
tree = {}
for atom_id, atom in enumerate(self.atoms):
if atom.site in tree:
tree[atom.site].append(atom_id)
else:
tree[atom.site] = [atom_id]
if len(tree) == 1 and None in tree:
return dict((i, [i]) for i in range(self.natoms))
else:
return tree
[docs] def get_gcmc_supercell(self):
"""Supercell used for gcmc."""
return self.properties.get('supercell', (1, 1, 1))
[docs] def set_gcmc_supercell(self, value):
"""Set the supercell property for the structure."""
self.properties['supercell'] = value
gcmc_supercell = property(get_gcmc_supercell, set_gcmc_supercell)
# TODO(tdaff): properties: density, surface area, dft_energy, absorbance
[docs]class Cell(object):
"""
Crystollagraphic cell representations and interconversion methods.
Setter methods can be defined for different file types, however
.cell and .params will be self-consistent if set directly.
"""
def __init__(self):
"""Default to a 1A cubic box."""
self._cell = array([[1.0, 0.0, 0.0],
[0.0, 1.0, 0.0],
[0.0, 0.0, 1.0]])
self._params = (1.0, 1.0, 1.0, 90.0, 90.0, 90.0)
self._inverse = None
[docs] def from_pdb(self, line):
"""Extract cell from CRYST1 line in a pdb."""
# Must use fixed widths as -ve numbers do not leave gaps to .split()
self.params = (float(line[6:15]),
float(line[15:24]),
float(line[24:33]),
float(line[33:40]),
float(line[40:47]),
float(line[47:54]))
[docs] def from_lines(self, lines, scale=1.0):
"""Extract cell from a 3-line POSCAR cell representation."""
self.cell = array([[float(x) * scale for x in lines[0].split()],
[float(x) * scale for x in lines[1].split()],
[float(x) * scale for x in lines[2].split()]])
[docs] def to_vector_strings(self, scale=1, bohr=False, fmt="%20.12f"):
"""Generic [Super]cell vectors in Angstrom as a list of strings."""
out_format = 3 * fmt + "\n"
# If the supercell is more than 2147483647 in any direction this
# will fail in python 2, but 'long' removed for py3k forward
# compatibility
if isinstance(scale, int):
scale = [scale, scale, scale]
# else assume an iterable 3-vector
if bohr:
scale = [x / BOHR2ANG for x in scale]
return [out_format % tuple(scale[0] * self.cell[0]),
out_format % tuple(scale[1] * self.cell[1]),
out_format % tuple(scale[2] * self.cell[2])]
[docs] def minimum_supercell(self, cutoff):
"""Calculate the smallest supercell with a half-cell width cutoff."""
a_cross_b = cross(self.cell[0], self.cell[1])
b_cross_c = cross(self.cell[1], self.cell[2])
c_cross_a = cross(self.cell[2], self.cell[0])
volume = dot(self.cell[0], b_cross_c)
widths = [volume / norm(b_cross_c),
volume / norm(c_cross_a),
volume / norm(a_cross_b)]
return tuple(int(ceil(2*cutoff/x)) for x in widths)
@property
def minimum_width(self):
"""The shortest perpendicular distance within the cell."""
a_cross_b = cross(self.cell[0], self.cell[1])
b_cross_c = cross(self.cell[1], self.cell[2])
c_cross_a = cross(self.cell[2], self.cell[0])
volume = dot(self.cell[0], b_cross_c)
return volume / min(norm(b_cross_c), norm(c_cross_a), norm(a_cross_b))
@property
def imcon(self):
"""Guess cell shape and return DL_POLY imcon key."""
keys = {'none': 0,
'cubic': 1,
'orthorhombic': 2,
'parallelepiped': 3}
if np.all(self.cell == 0):
return keys['none']
elif np.all(self.params[3:] == 90):
if self.params[0] == self.params[1] == self.params[2]:
return keys['cubic']
else:
return keys['orthorhombic']
else:
return keys['parallelepiped']
@property
def crystal_system(self):
"""Return the IUCr designation for the crystal system."""
#FIXME(tdaff): must be aligned with x to work
if self.alpha == self.beta == self.gamma == 90:
if self.a == self.b == self.c:
return 'cubic'
elif self.a == self.b or self.a == self.c or self.b == self.c:
return 'tetragonal'
else:
return 'orthorhombic'
elif self.alpha == self.beta == 90:
if self.a == self.b and self.gamma == 120:
return 'hexagonal'
else:
return 'monoclinic'
elif self.alpha == self.gamma == 90:
if self.a == self.c and self.beta == 120:
return 'hexagonal'
else:
return 'monoclinic'
elif self.beta == self.gamma == 90:
if self.b == self.c and self.alpha == 120:
return 'hexagonal'
else:
return 'monoclinic'
elif self.a == self.b == self.c and self.alpha == self.beta == self.gamma:
return 'trigonal'
else:
return 'triclinic'
@property
def volume(self):
"""Calculate cell volume a.bxc."""
b_cross_c = cross(self.cell[1], self.cell[2])
return dot(self.cell[0], b_cross_c)
[docs] def get_cell(self):
"""Get the 3x3 vector cell representation."""
return self._cell
[docs] def set_cell(self, value):
"""Set cell and params from the cell representation."""
# Class internally expects an array
self._cell = array(value).reshape((3,3))
self.__mkparam()
self._inverse = np.linalg.inv(self.cell.T)
# Property so that params are updated when cell is set
cell = property(get_cell, set_cell)
[docs] def get_params(self):
"""Get the six parameter cell representation as a tuple."""
return tuple(self._params)
[docs] def set_params(self, value):
"""Set cell and params from the cell parameters."""
self._params = value
self.__mkcell()
self._inverse = np.linalg.inv(self.cell.T)
# Property so that cell is updated when params are set
params = property(get_params, set_params)
@property
def inverse(self):
"""Inverted cell matrix for converting to fractional coordinates."""
try:
if self._inverse is None:
self._inverse = np.linalg.inv(self.cell.T)
except AttributeError:
self._inverse = np.linalg.inv(self.cell.T)
return self._inverse
@property
def a(self):
"""Magnitude of cell a vector."""
return self.params[0]
@property
def b(self):
"""Magnitude of cell b vector."""
return self.params[1]
@property
def c(self):
"""Magnitude of cell c vector."""
return self.params[2]
@property
def alpha(self):
"""Cell angle alpha."""
return self.params[3]
@property
def beta(self):
"""Cell angle beta."""
return self.params[4]
@property
def gamma(self):
"""Cell angle gamma."""
return self.params[5]
# Implementation details -- directly access the private _{cell|param}
# attributes; please don't break.
def __mkcell(self):
"""Update the cell representation to match the parameters."""
a_mag, b_mag, c_mag = self.params[:3]
alpha, beta, gamma = [x * DEG2RAD for x in self.params[3:]]
a_vec = array([a_mag, 0.0, 0.0])
b_vec = array([b_mag * cos(gamma), b_mag * sin(gamma), 0.0])
c_x = c_mag * cos(beta)
c_y = c_mag * (cos(alpha) - cos(gamma) * cos(beta)) / sin(gamma)
c_vec = array([c_x, c_y, (c_mag**2 - c_x**2 - c_y**2)**0.5])
self._cell = array([a_vec, b_vec, c_vec])
def __mkparam(self):
"""Update the parameters to match the cell."""
cell_a = sqrt(sum(x**2 for x in self.cell[0]))
cell_b = sqrt(sum(x**2 for x in self.cell[1]))
cell_c = sqrt(sum(x**2 for x in self.cell[2]))
alpha = arccos(sum(self.cell[1, :] * self.cell[2, :]) /
(cell_b * cell_c)) * 180 / pi
beta = arccos(sum(self.cell[0, :] * self.cell[2, :]) /
(cell_a * cell_c)) * 180 / pi
gamma = arccos(sum(self.cell[0, :] * self.cell[1, :]) /
(cell_a * cell_b)) * 180 / pi
self._params = (cell_a, cell_b, cell_c, alpha, beta, gamma)
[docs]class Atom(object):
"""Base atom object."""
def __init__(self, at_type=False, pos=False, parent=None, **kwargs):
"""Accept arbritary kwargs as attributes."""
if parent is not None:
self._parent = parent
self.type = at_type
self.pos = pos
self.charge = 0.0
self.idx = False
self.site = None
self.mass = 0.0
self.molecule = None
self.uff_type = None
self.is_fixed = False
# Sets anything else specified as an attribute
for key, val in kwargs.items():
setattr(self, key, val)
def __str__(self):
return "%s %f %f %f" % tuple([self.type] + list(self.pos))
def __repr__(self):
return "Atom(%r,%r)" % (self.type, self.pos)
[docs] def fpos(self, inv_cell):
"""Fractional position within a given cell."""
return dot(inv_cell, self.pos).tolist()
[docs] def ifpos(self, inv_cell):
"""In cell fractional position."""
return [i % 1 for i in self.fpos(inv_cell)]
[docs] def ipos(self, cell, inv_cell):
"""In cell cartesian position."""
return dot(self.ifpos(inv_cell), cell)
[docs] def from_cif(self, at_dict, cell, symmetry=None, idx=None):
"""Extract an atom description from dictionary of cif items."""
self.site = at_dict['_atom_site_label']
# type_symbol takes precedence but need not be specified
self.type = at_dict.get('_atom_site_type_symbol', self.site)
self.mass = WEIGHT[self.type]
frac_pos = [ufloat(at_dict['_atom_site_fract_x']),
ufloat(at_dict['_atom_site_fract_y']),
ufloat(at_dict['_atom_site_fract_z'])]
if symmetry is not None:
frac_pos = symmetry.trans_frac(frac_pos)
self.pos = dot(frac_pos, cell)
if re.match('[0-9]', self.site) and idx is not None:
debug("Site label may not be unique; appending index")
self.site = "%s%i" % (self.site, idx)
if '_atom_site_description' in at_dict:
self.uff_type = at_dict['_atom_site_description']
elif '_atom_type_description' in at_dict:
self.uff_type = at_dict['_atom_type_description']
if '_atom_type_partial_charge' in at_dict:
self.charge = float(at_dict['_atom_type_partial_charge'])
elif '_atom_type_parital_charge' in at_dict:
#TODO(tdaff): remove for 2.0
self.charge = float(at_dict['_atom_type_parital_charge'])
[docs] def from_pdb(self, line, charges=False):
"""
Parse the ATOM line from a pdb file.
Occupancy field may be used to specify the charge as in a '.pqr' file.
"""
# pdb is defined with fixed width fields rather than splitting
self.idx = try_int(line[6:11])
self.site = line[12:16].strip()
self.molecule = try_int(line[22:26])
at_pos = float(line[30:38]), float(line[38:46]), float(line[47:54])
self.pos = at_pos
self.type = line[76:78].strip()
self.mass = WEIGHT[self.type]
if charges:
self.charge = float(line[54:60])
[docs] def from_vasp(self, line, at_type=None, cell=identity(3)):
"""Set the atom data from vasp input. Only pass cell if fractional."""
self.pos = dot([float(x) for x in line.split()[:3]], cell)
if at_type is not None:
self.type = at_type
self.mass = WEIGHT[at_type]
[docs] def from_siesta(self, line, cell):
"""Parse line from SIESTA.STRUCT_OUT file."""
self.pos = dot([float(x) for x in line.split()[2:5]], cell)
self.atomic_number = int(line.split()[1])
self.mass = WEIGHT[self.type]
[docs] def from_xyz(self, line):
"""Parse line from generic xyz file."""
split_line = line.split()
self.pos = [float(x) for x in split_line[1:4]]
if len(split_line) > 4:
try:
self.charge = float(split_line[4])
except ValueError:
# Assume a comment so skip it
pass
self.type = line.split()[0]
self.mass = WEIGHT[self.type]
[docs] def translate(self, vec):
"""Move the atom by the given vector."""
self.pos = [x + y for x, y in zip(self.pos, vec)]
[docs] def get_atomic_number(self):
"""The atomic number for the element, or closest match."""
name = self.type
atomic_number = None
while name:
try:
atomic_number = ATOMIC_NUMBER.index(name)
break
except ValueError:
name = name[:-1]
return atomic_number
[docs] def set_atomic_number(self, value):
"""Set the atom type based on the atomic number."""
self.type = ATOMIC_NUMBER[value]
atomic_number = property(get_atomic_number, set_atomic_number)
[docs] def get_fractional_coordinate(self):
"""Retrieve the fractional coordinates or calculate from the parent."""
try:
# Hopefully the attribute is just set
return self._fractional
except AttributeError:
# Not set yet
try:
self._fractional = self.fpos(self._parent.cell.inverse)
return self._fractional
except AttributeError:
return None
[docs] def set_fractional_coordinate(self, value):
"""Set the position using the fractional coordinates."""
fractional = [x % 1.0 for x in value]
self._fractional = fractional
self.pos = dot(fractional, self._parent.cell.cell)
[docs] def del_fractional_coordinate(self):
"""Remove the fractional coordinate; run after updating cell."""
try:
del self._fractional
except AttributeError:
# Attribute does not exist, so can't delete it
pass
fractional = property(get_fractional_coordinate, set_fractional_coordinate, del_fractional_coordinate)
@property
def element(self):
"""Guess the element from the type, fall back to type."""
name = self.type
while name:
if name in ATOMIC_NUMBER:
return name
else:
name = name[:-1]
return self.type
@property
def vdw_radius(self):
"""Get the vdw radius from the UFF parameters."""
return UFF[self.type][0]/2.0
@property
def covalent_radius(self):
"""Get the covalent radius from the library parameters."""
if self.type == 'C' and self.uff_type and self.uff_type in COVALENT_RADII:
return COVALENT_RADII[self.uff_type]
return COVALENT_RADII[self.type]
@property
def is_metal(self):
"""Return True if element is in a predetermined set of metals."""
return self.atomic_number in METALS
@property
def index(self):
"""Return the index of the atom in the current parent structure."""
return self._parent.atoms.index(self)
@property
def uff_coordination(self):
"""
The expected coordination based on the third character of the UFF
name for the element. Based on _ipar from OB forcefielduff.cpp
"""
coordinations = {
'1': 1,
'2': 2,
'R': 2,
'3': 3,
'4': 4,
'5': 5,
'6': 6,
'7': 7,
}
try:
# Just pull for the dict, or default to 1 for unknowns
return coordinations.get(self.uff_type[2], 1)
except (IndexError, TypeError):
# This catches things like 'H_' and when there is no uff_type
return 1
@property
def coordination(self):
"""
The actual coordination of the atom based on bonds in the parent.
Counts all the bonds and returns an integer.
Returns
-------
int
Coordination of the atom
"""
atom_index = self.index
return sum([atom_index in bond for bond in self._parent.bonds])
[docs]class Guest(object):
"""Guest molecule and properties."""
def __init__(self, ident=None, guest_path=None):
"""Populate an empty guest then load from library if required."""
self.ident = ''
self.name = "Unknown guest"
self.species = 'unknown'
self.potentials = {}
self.probability = []
self.atoms = []
self.source = "Unknown source"
self.uptake = {}
self.hoa = {}
self.c_v = {}
self.guest_locations = {}
self.binding_sites = {}
self.binding_energies = {}
self.fugacities = {}
# only load if asked, set the ident in the loader
if ident:
self.load_guest(ident, guest_path=None)
[docs] def load_guest(self, ident, guest_path=None):
"""Look in guests.lib in submit directory and default."""
# Ident set here to keep consistent
self.ident = ident
# Need the different directories
if guest_path is None:
guest_path = os.getcwd()
if __name__ != '__main__':
script_dir = path.dirname(__file__)
else:
script_dir = path.abspath(sys.path[0])
dot_faps_dir = path.join(path.expanduser('~'), '.faps')
# A parser for each location
job_guests = configparser.SafeConfigParser()
dot_faps_guests = configparser.SafeConfigParser()
lib_guests = configparser.SafeConfigParser()
# Try and find guest in guests.lib
job_guests.read(path.join(guest_path, 'guests.lib'))
dot_faps_guests.read(path.join(dot_faps_dir, 'guests.lib'))
lib_guests.read(path.join(script_dir, 'guests.lib'))
# Job dir and user defined have priority
if job_guests.has_section(ident):
debug("%s found in job dir" % ident)
self._parse_guest(job_guests.items(ident))
elif dot_faps_guests.has_section(ident):
debug("%s found in dot_faps" % ident)
self._parse_guest(dot_faps_guests.items(ident))
elif lib_guests.has_section(ident):
debug("%s found in library" % ident)
self._parse_guest(lib_guests.items(ident))
else:
error("Guest not found: %s" % ident)
def _parse_guest(self, raw_text):
"""Set attributes according to the raw input."""
for key, val in raw_text:
if key == 'atoms':
# Build a local list to replace what is there
new_atoms = []
# Only use non blank lines
atoms = strip_blanks(val.splitlines())
for atom in atoms:
atom = atom.split()
new_atoms.append(Atom(
at_type=atom[0],
mass=float(atom[1]),
charge=float(atom[2]),
pos=tuple(float(x) for x in atom[3:6])))
self.atoms = new_atoms
elif key == 'potentials':
potens = strip_blanks(val.splitlines())
for poten in potens:
poten = poten.split()
self.potentials[poten[0]] = tuple(float(x)
for x in poten[1:])
elif key == 'probability':
prob = strip_blanks(val.splitlines())
self.probability = [tuple(int(y) for y in x.split())
for x in prob]
elif key == 'molar volume':
self._molar_volume = float(val)
elif key == 'probe radius':
self.probe_radius = float(val)
else:
# Arbitrary attributes can be set
# Might also enable breakage
setattr(self, key, val)
[docs] def aligned_to(self, target, align=None, orient=None):
"""
Return the atomic positions of the guest with it located at position
and aligned using the align and orient vectors. Align and orient are
optional and the guest will be aligned using whatever is supplied.
Parameters
----------
target : tuple of int, list of float
The tuple contains the index of the atom to position the
guest and the position.
align : tuple of int, list of float
The index of the atom used to align the guest followed by the
alignment vector
orient : tuple of int, list of float
The index of the atom used to orient the guest followed by the
orientation vector
Returns
-------
positions : list of list of float
The positions of the aligned atoms in a list
"""
# Move to origin so that we can do rotation
target_idx, target = target[0], target[1]
target_guest = self.atoms[target_idx].pos
guest_position = [[x - y for x, y in zip(atom.pos, target_guest)]
for atom in self.atoms]
if align is not None:
align_idx, align = align
align_guest = guest_position[align_idx]
rotate = matrix_rotate(align_guest, align)
guest_position = [dot(rotate, pos) for pos in guest_position]
if orient is not None:
orient_idx, orient = orient
# guest position has changed
align_guest = guest_position[align_idx]
orient_guest = guest_position[orient_idx]
# align normals
normal_guest = cross(align_guest, orient_guest)
normal_orient = cross(align, orient)
# normals are [0.0, 0.0, 0.0] for parallel
if normal_guest.any() and normal_orient.any():
rotate = matrix_rotate(normal_guest, normal_orient)
guest_position = [dot(rotate, pos)
for pos in guest_position]
# move to location after orientation as it is easier
guest_position = [[x + y for x, y in zip(atom, target)]
for atom in guest_position]
return guest_position
[docs] def is_linear(self, idx_1, idx_2=None, idx_3=None):
"""
Return whether the atoms at the three indexes are linear. If fewer than
three indices are given, always return True.
Parameters
----------
idx_1 : int
Index of an atom in the guest.
idx_2 : int
Index of an atom in the guest. Optional.
idx_3 : int
Index of an atom in the guest. Optional.
Returns
-------
linear : bool
False if three atoms are given and they are not linear, True
otherwise.
"""
if idx_2 is not None and idx_3 is not None:
v_12 = [x - y for x, y in zip(self.atoms[idx_1].pos,
self.atoms[idx_1].pos)]
v_23 = [x - y for x, y in zip(self.atoms[idx_2].pos,
self.atoms[idx_3].pos)]
cross_123 = cross(v_12, v_23)
if cross_123.any():
return False
else:
return True
else:
return True
[docs] def is_reversible(self, idx_1, idx_2=None, idx_3=None):
"""
Return whether swapping indexes will give you an equivalent guest. For
example CO2 is linear and symmetric so each O atom is equivalent.
Parameters
----------
idx_1 : int
Index of an atom in the guest.
idx_2 : int
Index of an atom in the guest. Optional.
idx_3 : int
Index of an atom in the guest. Optional.
Returns
-------
reversible : bool
True if atoms are equivalent.
"""
if idx_2 is not None and idx_3 is not None:
# Ignore the central atom, assume 2 and three swap
src_idx = idx_2
dest_idx = idx_3
elif idx_2 is not None:
# Can the two atoms be reversed?
src_idx = idx_1
dest_idx = idx_2
else:
# Only one atom, can switch with itself
return True
src = self.atoms[src_idx]
dest = self.atoms[dest_idx]
src_env, dest_env = [], []
for test in self.atoms:
src_env.append([test.type, vecdist3(src.pos, test.pos)])
dest_env.append([test.type, vecdist3(dest.pos, test.pos)])
# if the distances to the closest atoms, sorted by types,
# are not all within 0.01 A, then the environments are different
for src, dest in zip(sorted(src_env), sorted(dest_env)):
if src[0] != dest[0] or (src[1] - dest[1]) > 0.01:
return False
else:
return True
# Simple attribute-like calculations
@property
def types(self):
"""Ordered list of atom types."""
return [atom.type for atom in self.atoms]
@property
def weight(self):
"""Unit cell weight."""
return sum([atom.mass for atom in self.atoms])
@property
def molar_volume(self):
"""Molar volume at STP in dm3/mol"""
if hasattr(self, '_molar_volume'):
return self._molar_volume
else:
return 22.414
@property
def natoms(self):
"""Number of atoms in the guest."""
return len(self.atoms)
@property
def com(self):
"""Centre of mass of the guest molecule."""
com = [0.0, 0.0, 0.0]
for atom in self.atoms:
com = [com[0] + atom.pos[0] * atom.mass,
com[1] + atom.pos[1] * atom.mass,
com[2] + atom.pos[2] * atom.mass]
com = [x/self.weight for x in com]
return com
[docs]class Symmetry(object):
"""Apply symmetry operations to atomic coordinates."""
def __init__(self, text):
"""Read the operation from the argument."""
self.xyz = "".join(text.split()) # remove all spaces
rt_matrix = []
for idx, sub_xyz in enumerate(self.xyz.split(',')):
row = [0.0, 0.0, 0.0, 0.0]
# Find multipliers of x, y and z for rotation
for component in re.finditer(r'([+-]?)(\d*)([x-z]+)', sub_xyz):
# signed?
value = -1.0 if component.group(1) == '-' else 1.0
# Some scaling factor?
if component.group(2):
value *= int(component.group(2))
if component.group(3) == 'x':
row[0] += value
elif component.group(3) == 'y':
row[1] += value
elif component.group(3) == 'z':
row[2] += value
# Find the translation component any trailing numbers or fractions
for component in re.finditer(r"([+-])(\d+)/*(\d*)$", sub_xyz):
# signed?
value = -1.0 if component.group(1) == '-' else 1.0
value *= int(component.group(2))
if component.group(3):
value /= int(component.group(3))
row[3] += value
rt_matrix.append(row)
rt_matrix.append([0.0, 0.0, 0.0, 1.0])
self.rt_matrix = np.array(rt_matrix)
[docs] def trans_frac(self, pos, in_cell=True):
"""Apply symmetry operation to the supplied position."""
new_pos = dot(self.rt_matrix, [pos[0], pos[1], pos[2], 1.0])[:3]
#print(translated, also_new)
if in_cell:
# translate positions into cell; leave as numpy array
new_pos %= 1.0
return new_pos
[docs] def rotate(self, vector):
"""
Apply only the rotation part of the
symmetry operation to the vector.
"""
return dot(self.rt_matrix[:3, :3], vector)
def __repr__(self):
return "Symmetry('{}')".format(self.xyz)
[docs]def mk_repeat(cube_name='REPEAT_ESP.cube', symmetry=False):
"""Standard REPEAT input file."""
if symmetry:
symmetry_flag = 1
else:
symmetry_flag = 0
repeat_input = [
"Input ESP file name in cube format\n",
"%s\n" % cube_name,
"Fit molecular(0) or periodic(1:default) system?\n",
"1\n",
"van der Waals scaling factor (default = 1.0)\n",
"1.00000\n",
"Apply RESP penalties?, no(0:default), yes(1)\n",
"0\n",
"Read cutoff radius? no(0), yes(1:default)\n",
"1\n",
"If flag above=1 provide R_cutoff next (in Bohrs)\n",
"20.00000\n",
"Apply symmetry restrain? no(0:default), yes(1)\n",
"%i\n" % symmetry_flag,
"Use Goddard-type restrains? no(0:default), yes(1)\n",
"0\n",
"If flag above=1 then provide weight next\n",
"0.00000\n",
"Max distance cut-off scaling factor (1000.0 default)\n",
"2.0\n"]
filetemp = open('REPEAT_param.inp', 'w')
filetemp.writelines(repeat_input)
filetemp.close()
[docs]def mk_connectivity_ff(sym_tree):
"""Write connectivity.ff for input tree."""
out_tree = ["%i\n" % len(sym_tree)]
# Always sort to get consistent order
for idx in sorted(sym_tree):
out_tree.extend([
"# %s tree\n" % str(idx),
"%i\n" % len(sym_tree[idx]),
" ".join(["%i" % (i+1) for i in sym_tree[idx]]),
"\n"
])
filetemp = open('connectivity.ff', 'w')
filetemp.writelines(out_tree)
filetemp.close()
[docs]def incar_extend(incar, *args):
"""
Add the specified (key, value) pairs to the incar string list only if they
are not already included in it. Only does startswith() matching, so can be
fooled/broken.
"""
for key, value in args:
# assume we get (key, value) pairs
for line in incar:
if line.lstrip().startswith(key):
# Value already specified, skip it
debug("Not overwriting %s" % key)
continue
incar.append("%-7s = %s\n" % (key, value))
[docs]def mk_incar(options, esp_grid=None):
"""Basic vasp INCAR; use defaults as much as possible."""
# We need these options
job_name = options.get('job_name')
spin = options.getbool('spin')
optim_h = options.getbool('optim_h')
optim_all = options.getbool('optim_all')
optim_cell = options.getbool('optim_cell')
dispersion = options.getbool('dispersion')
# Make sure that we have a list of lines with line endings
incar = ["%s\n" % x for x in options.get('vasp_custom_incar').splitlines()]
if incar:
# Non blank starting INCAR
info("Custom INCAR parameters selected")
debug("Base INCAR looks like: %s" % "".join(incar))
incar_extend(incar,
("SYSTEM", job_name),
("ALGO", "Fast"),
("EDIFF", "1E-5"),
("EDIFFG", -0.02),
("POTIM", 0.4),
("LREAL", "Auto"),
("LVTOT", ".TRUE."),
("LVHAR", ".TRUE."),
("ISMEAR", 0),
("SIGMA", 0.05),
("NWRITE", 0))
if optim_cell:
# Positions will be fixed by selective dynamics
info("Cell vectors will be optimized")
incar_extend(incar,
("ENCUT", 520),
("IBRION", 2),
("NSW", 800),
("ISIF", 3))
elif optim_all or optim_h:
# Just move positions
incar_extend(incar,
("IBRION", 2),
("NSW", 600),
("ISIF", 2))
else:
# Single point energy
info("Single point calculation")
incar_extend(incar,
("IBRION", 0),
("NSW", 0),
("ISIF", 0))
if spin:
info("Spin polarised calculation")
incar_extend(incar, ("ISPIN", 2))
if dispersion:
info("Dispersion correction will be used")
incar_extend(incar, ("IVDW", 12)) # DFT-D3 with BJ damping
if esp_grid is not None:
info("Changing FFT grid to %ix%ix%i" % esp_grid)
incar_extend(incar,
("NGXF", esp_grid[0]),
("NGYF", esp_grid[1]),
("NGZF", esp_grid[2]))
#TODO(jlo): if nelect is not None:
# VASP recommends, for best performance:
# NPAR = 4 - approx SQRT( number of cores)
vasp_ncpu = options.getint('vasp_ncpu')
npar = vasp_ncpu # recommended up to 8 cpus
if vasp_ncpu > 8:
npar = 4*max(int((vasp_ncpu**0.5)/4.0), 1)
incar_extend(incar, ("NPAR", npar))
return incar
[docs]def mk_kpoints(kpoints):
"""Defaults to gamma point only, or specified number."""
if len(kpoints) != 3:
error("kpoints specified incorectly; should be (i, i, i)")
kpoints = [
"Auto\n",
"0\n",
"Gamma\n",
"%i %i %i\n" % tuple(kpoints),
"0 0 0\n"]
return kpoints
[docs]def mk_gcmc_control(temperature, pressures, options, guests, supercell=None):
"""Standard GCMC CONTROL file."""
control = [
"GCMC Run\n"
"temperature %f\n" % temperature,
"steps %i\n" % options.getint('mc_prod_steps'),
"equilibration %i\n" % options.getint('mc_eq_steps'),
"cutoff %f angstrom\n" % options.getfloat('mc_cutoff'),
"delr 1.0 angstrom\n",
"ewald precision 1d-6\n",
"numguests %i\n" % options.getint('mc_numguests_freq'),
"history %i\n" % options.getint('mc_history_freq')]
if options.getbool('mc_jobcontrol'):
control.append("jobcontrol\n")
else:
control.append("# jobcontrol\n")
# Guest stuff
if options.getbool('mc_probability_plot'):
# We just comment out the probabilty plot lines if unneeded
prob_on = ""
else:
prob_on = "# "
if len(guests) > 1:
gp_zip = zip(guests, pressures)
else:
gp_zip = zip(guests, [pressures])
guest_count = 0
for guest, press in gp_zip:
guest_count += 1
control.append("&guest %i\n" % guest_count)
control.append(" pressure (bar) %f\n" % press)
control.append("%s probability %i\n" %
(prob_on, len(guest.probability)))
for prob in guest.probability:
control.append("%s %i " % (prob_on, len(prob)) +
" ".join(["%i" % x for x in prob]) + "\n")
control.append("&end\n")
if options.getbool('fold') and supercell is not None:
control.append('\ngrid factors %i %i %i\n' % supercell)
elif supercell is not None:
control.append('\n# grid factors %i %i %i\n' % supercell)
resolution = options.getfloat('mc_probability_plot_spacing')
if resolution != FASTMC_DEFAULT_GRID_SPACING:
control.append('\ngrid spacing %f\n' % resolution)
else:
control.append('\n#grid spacing %f\n' % FASTMC_DEFAULT_GRID_SPACING)
control.append("\nfinish\n")
return control
[docs]def mk_dl_poly_control(options, dummy=False):
"""CONTROL file for binding site energy calculation."""
if dummy:
stats = 1
steps = 1
else:
stats = 1
steps = 100
control = [
"# minimisation\n",
"optim energy 1.0\n", # gives approx 0.01 kcal
"steps %i\n" % steps,
"timestep 0.001 ps\n",
"#ensemble nvt hoover 0.1\n",
"cutoff %f angstrom\n" % options.getfloat('mc_cutoff'),
"delr 1.0 angstrom\n",
"ewald precision 1d-6\n",
"job time 199990 seconds\n",
"close time 2000 seconds\n",
"stats %i\n" % stats,
"#traj 1,100,2\n"
"finish\n"]
return control
[docs]def mk_gromacs_mdp(cell, mode='bfgs', verbose=False):
"""
Generate an energy minimsation file for GROMACS, with cell based cutoff.
Use mode argument to select from:
'bfgs' standard l-bfgs position optimisation
'pcoupl' zero kelivn cell semi anisotropic relaxation
'sd' steppest descent minimisation
"""
# Use 0.45 as it allows for around 10% shrinkage of the cell
# 1/2 smallest of cell lengths or diagonal elements, in nm
cutoff = 0.45*min(cell.a, cell.b, cell.c,
cell.cell[0][0], cell.cell[1][1], cell.cell[2][2])/10.0
if mode == 'bfgs':
# bfgs needs the shift potentials, which need a transition radius
rlist = min(1.2, cutoff)
rcoulomb = rlist - 0.1
rvdw = rlist - 0.1
mdp = ["integrator = l-bfgs\n",
"vdwtype = shift\n",
"coulombtype = shift\n"]
elif mode == 'pcoupl':
# This changes volume but not angles as I don't know how well it shears
rlist = min(1.2, cutoff)
rcoulomb = rlist - 0.1
rvdw = rlist - 0.1
mdp = ["integrator = md\n",
"vdwtype = shift\n",
"coulombtype = shift\n",
"pcoupl = berendsen\n", # others too bouncy
"pcoupltype = anisotropic\n",
"ref_p = 0.0 0.0 0.0 0.0 0.0 0.0\n",
"compressibility = 5.0e-6 5.0e-6 5.0e-6 0.0 0.0 0.0\n",
"tcoupl = v-rescale\n",
"ref_t = 0.0\n", # zero kelvin
"tau_t = 0.1\n",
"nstcalcenergy = 1\n", # needed for stability
"tc-grps = RESI\n"] # residue as in the gro file
elif mode == 'sd':
rlist = min(1.2, cutoff)
rcoulomb = rlist
rvdw = rlist
mdp = ["integrator = cg\n",
"nstcgsteep = 50\n"] # frequency of sd steps in cg
else:
error('Bad optimisation mode, %s, selected for gromacs mdp')
return
# common items work all round (bfgs only needs a few steps and
# 2000 steps reduces the box size enough)
mdp.extend([
"nsteps = 2000\n",
"rlist = %f\n" % rlist,
"rcoulomb = %f\n" % rcoulomb,
"rvdw = %f\n" % rvdw,
"periodic_molecules = yes\n",
"pbc = xyz\n",
";\n",
"; Energy minimizing stuff\n",
";\n",
"emtol = 10.0\n", # convergence (10.0) [kJ mol-1 nm-1]
"emstep = 0.01\n", # (0.01) step size
])
if verbose:
mdp.extend(["nstxout = 1\n", # trajectory frequency
"nstenergy = 1\n"]) # energy frequency
else:
mdp.extend(["nstxout = 2000\n", # trajectory frequency
"nstenergy = 0\n"]) # energy frequency
return mdp
[docs]def parse_qeq_params(param_tuple):
"""Convert an options tuple to a dict of values."""
param_dict = {}
# Check for predefined parameter sets
if 'mepo' in param_tuple:
info("Using MEPO-QEq base charges")
from parameters import mepo_qeq
param_dict.update(mepo_qeq)
param_tuple = [x for x in param_tuple if x != 'mepo']
# group up into ((atom, electronegativity, 0.5*hardness), ... )
param_tuple = subgroup(param_tuple, 3)
for param_set in param_tuple:
try:
atom_type = int(param_set[0])
atom_type = ATOMIC_NUMBER[atom_type]
except ValueError:
# assume it is an element symbol instead
atom_type = param_set[0]
except IndexError:
# Typed atom, ignore for now
continue
try:
param_dict[atom_type] = (float(param_set[1]), float(param_set[2]))
except IndexError:
warning("Cannot read parameters for %s" % atom_type)
return param_dict
[docs]def mk_egulp_params(param_tuple):
"""Convert an options tuple to an EGULP parameters file filling."""
# group up into ((atom, electronegativity, 0.5*hardness), ... )
param_tuple = subgroup(param_tuple, 3)
# first line is the number of parametr sets
#TODO(tdaff): try adding some error checking
egulp_file = ["%s\n" % len(param_tuple)]
for paramset in param_tuple:
try:
# catch if it is an atomic number or element symbol
atomic_number = int(paramset[0])
except ValueError:
atomic_number = ATOMIC_NUMBER.index(paramset[0])
egulp_file.append("%-4i %f %f\n" % (atomic_number, float(paramset[1]),
float(paramset[2])))
return egulp_file
[docs]def mk_egulp_ini(options):
"""Create a default ini file for egulp to do qeq calculation."""
egulp_grid = int(options.getbool('egulp_grid'))
egulp_potential = int(options.getbool('egulp_potential'))
egulp_potential_difference = int(options.getbool('egulp_potential_difference'))
egulp_grid_parameters = options.get('egulp_grid_parameters')
egulp_ini = [
"build_grid %i\n" % egulp_grid,
"build_grid_from_scratch %s\n" % egulp_grid_parameters,
"save_grid %i grid.cube\n" % egulp_grid,
"calculate_pot_diff %i\n" % egulp_potential_difference,
"calcaulte_pot %i repeat.cube\n" % egulp_potential,
"skip_everything 0\n",
"point_charges_present 0\n",
"include_pceq 0\n",
"imethod 0\n"]
return egulp_ini
[docs]def validate_gulp_output(filename):
"""Check to see if gulp calculation has finished and return the energy."""
final_energy = None
final_gnorm = None
try:
gulp_output = open(filename)
except IOError:
warning("Gulp output not found; continuing anyway")
return (False, final_energy, final_gnorm)
finished_optimisation = False
for line in gulp_output:
if line.startswith(" Cycle:"):
line = line.split()
try:
final_energy = float(line[3])
final_gnorm = float(line[5])
except ValueError:
final_energy = 999999.9
final_gnorm = 999999.9
elif "Optimisation achieved" in line:
# Great
finished_optimisation = True
break
elif "Too many failed attempts to optimise" in line:
warning("Gulp reports failed optimisation; check gnorm!")
finished_optimisation = True
break
elif "no lower point can be found" in line:
warning("Gulp reports failed optimisation; check gnorm!")
finished_optimisation = True
break
if not finished_optimisation:
warning("Gulp optimisation did not finish; check output!")
if final_gnorm > 0.1:
warning("Gulp optimisation has gnorm > 0.1; check output!")
finished_optimisation = False
debug("Gulp .out: %s" % [finished_optimisation, final_energy, final_gnorm])
return (finished_optimisation, final_energy, final_gnorm)
[docs]def fix_vasp_wrapped_types(filename='CONTCAR'):
"""
Output files from VASP with a long list of types sometimes get wrapped.
Check if lines have been wrapped and put them all on a single line.
Only works with VASP 5 outputs that include type names. Return True if
the file has been modified.
:param filename: name of the file to fix
:return: bool
"""
# Keep track of how long each line to be replaced is
# as well as the contents
types = []
types_length = 0
counts = []
counts_length = 0
# If we have only two lines then leave file as-is
wrapped_lines = 0
f = open(filename, 'r+b')
mm = mmap.mmap(f.fileno(), 0)
mm.readline() # Title
mm.readline() # Scale
mm.readline() # a
mm.readline() # b
mm.readline() # c
start_position = mm.tell()
# List of element symbols
type_line = mm.readline()
while type_line.split()[0].isalpha():
types.extend(type_line.split())
types_length += len(type_line)
type_line = mm.readline()
wrapped_lines += 1
# List of element counts
while type_line.split()[0].isdigit():
counts.extend(type_line.split())
counts_length += len(type_line)
type_line = mm.readline()
wrapped_lines += 1
if wrapped_lines == 2:
return
# Can overwrite a memmapped file in-place to avoid re-writing whole file
otypes = "".join(" %s" % x for x in types)
otypes = "%-*s" % (types_length - 1, otypes) + '\n'
ocounts = "".join(" %s" % x for x in counts)
ocounts = "%-*s" % (counts_length - 1, ocounts) + '\n'
linebreak_position = start_position + types_length
mm[start_position:linebreak_position] = otypes
mm[linebreak_position:linebreak_position + counts_length] = ocounts
mm.close()
f.close()
debug("Unwrapped lines in file %s" % filename)
return True
[docs]def unique(in_list, key=None):
"""Unique values in list ordered by first occurance"""
uniq = []
if key is not None:
keys = []
for item in in_list:
item_key = key(item)
if item_key not in keys:
uniq.append(item)
keys.append(item_key)
else:
for item in in_list:
if item not in uniq:
uniq.append(item)
return uniq
[docs]def lorentz_berthelot(left, right):
"""Lorentz-Berthelot mixing rules for (sigma, epsilon) tuples."""
sigma = (left[0] + right[0]) / 2.0
epsilon = (left[1] * right[1])**0.5
if epsilon == 0:
sigma = 0
return sigma, epsilon
# General utility functions
[docs]def mkdirs(directory):
"""Create a directory if it does not exist."""
if not path.exists(directory):
os.makedirs(directory)
[docs]def terminate(exit_code=0):
"""Exit and announce if faps is terminating normally (default)."""
if exit_code == 0:
info("Faps terminated normally")
raise SystemExit
else:
warning("Abnormal termination of faps; exit code %i" % exit_code)
raise SystemExit(exit_code)
[docs]def move_and_overwrite(src, dest):
"""Move src to dest and overwrite if it is an existing file."""
# As src and dest can be files or directories, do some checks.
if path.exists(dest):
if path.isdir(dest):
dest_full = path.join(dest, path.basename(src))
if path.exists(dest_full):
if path.isfile(dest_full):
os.remove(dest_full)
shutil.move(src, dest)
else:
raise OSError("Directory %s already exists" % dest_full)
else:
shutil.move(src, dest)
elif path.isfile(dest):
os.remove(dest)
shutil.move(src, dest)
else:
raise OSError("%s is not a folder or file" % dest)
else:
shutil.move(src, dest)
[docs]def try_symlink(src, dest):
"""Delete an existing dest file, symlink a new one if possible."""
if path.lexists(dest):
os.remove(dest)
# Catch cases like Windows which can't symlink or unsupported SAMBA
try:
os.symlink(src, dest)
except (AttributeError, OSError):
shutil.copy(src, dest)
[docs]def compressed_open(filename):
"""Return file objects for either compressed and uncompressed files"""
filenames = glob(filename) + glob(filename+".gz") + glob(filename+".bz2")
try:
filename = filenames[0]
except IndexError:
raise IOError("File not found: %s" % filename)
if filename[-4:] == ".bz2":
return bz2.BZ2File(filename)
elif filename[-3:] == ".gz":
return gzip.open(filename)
else:
return open(filename, "r")
[docs]def sys_argv_strip(argument):
"""Remove an argument from the sys.argv if it is there."""
while argument in sys.argv:
sys.argv.remove(argument)
return 0
[docs]def same_guests(base, other):
"""Test if the guests are the same index and order."""
return [item.ident for item in base] == [item.ident for item in other]
[docs]def ufloat(text):
"""Convert string to float, ignoring the uncertainty part."""
return float(re.sub('\(.*\)', '', text))
[docs]def gfloat(text):
"""Parse a gulp output float, where it could also be a rational fraction."""
# TODO(tdaff): use Fraction in 2.7+?
if "/" in text:
text = text.split('/')
return float(text[0])/float(text[1])
else:
return float(text)
[docs]def try_int(text, default=0):
"""Try to parse an integer but return a default if it fails."""
try:
return int(text)
except ValueError:
return default
[docs]def prod(seq):
"""Calculate the product of all members of a sequence."""
# numpy.prod will silently overflow 32 bit integer values
# so we can use python bignums natively
product = 1
for item in seq:
product *= item
return product
[docs]def dot3(vec1, vec2):
"""Calculate dot product for two 3d vectors."""
return vec1[0]*vec2[0] + vec1[1]*vec2[1] + vec1[2]*vec2[2]
[docs]def min_distance(first_atom, second_atom, cell=None):
"""Helper to find mimimum image criterion distance."""
if cell is None:
cell = first_atom._parent.cell.cell
return min_dist(first_atom.pos,
first_atom.fractional,
second_atom.pos,
second_atom.fractional,
cell)
[docs]def min_dist(c_coa, f_coa, c_cob, f_cob_in, box):
"""Calculate the closest distance assuming fractional, in-cell coords."""
f_cob = f_cob_in[:]
fdx = f_coa[0] - f_cob[0]
if fdx < -0.5:
f_cob[0] -= 1
elif fdx > 0.5:
f_cob[0] += 1
fdy = f_coa[1] - f_cob[1]
if fdy < -0.5:
f_cob[1] -= 1
elif fdy > 0.5:
f_cob[1] += 1
fdz = f_coa[2] - f_cob[2]
if fdz < -0.5:
f_cob[2] -= 1
elif fdz > 0.5:
f_cob[2] += 1
if f_cob == f_cob_in:
# if nothing has changed, use initial values
return vecdist3(c_coa, c_cob)
else:
new_b = [f_cob[0]*box[0][0] + f_cob[1]*box[1][0] + f_cob[2]*box[2][0],
f_cob[0]*box[0][1] + f_cob[1]*box[1][1] + f_cob[2]*box[2][1],
f_cob[0]*box[0][2] + f_cob[1]*box[1][2] + f_cob[2]*box[2][2]]
return vecdist3(c_coa, new_b)
[docs]def cif_bond_dist(first_atom, second_atom, cell):
"""
Calculate the distance to the minimum image and which boundaries have been
crossed for _geom_bond_site_symmetry_2.
:param first_atom: Atom within the cell boundary
:param second_atom: Atom to find the distance to
:param cell: Cell object
:return: distance, (dx, dy, dx)
"""
c_coa = first_atom.ipos(cell.cell, cell.inverse)
f_coa = first_atom.ifpos(cell.inverse)
c_cob = second_atom.ipos(cell.cell, cell.inverse)
f_cob_in = second_atom.ifpos(cell.inverse)
box = cell.cell
cell_shift = [0, 0, 0]
f_cob = f_cob_in[:]
fdx = f_coa[0] - f_cob[0]
if fdx < -0.5:
f_cob[0] -= 1
cell_shift[0] = -1
elif fdx > 0.5:
f_cob[0] += 1
cell_shift[0] = 1
fdy = f_coa[1] - f_cob[1]
if fdy < -0.5:
f_cob[1] -= 1
cell_shift[1] = -1
elif fdy > 0.5:
f_cob[1] += 1
cell_shift[1] = 1
fdz = f_coa[2] - f_cob[2]
if fdz < -0.5:
f_cob[2] -= 1
cell_shift[2] = -1
elif fdz > 0.5:
f_cob[2] += 1
cell_shift[2] = 1
if f_cob == f_cob_in:
# if nothing has changed, use initial values
return vecdist3(c_coa, c_cob), cell_shift
else:
new_b = [f_cob[0]*box[0][0] + f_cob[1]*box[1][0] + f_cob[2]*box[2][0],
f_cob[0]*box[0][1] + f_cob[1]*box[1][1] + f_cob[2]*box[2][1],
f_cob[0]*box[0][2] + f_cob[1]*box[1][2] + f_cob[2]*box[2][2]]
return vecdist3(c_coa, new_b), cell_shift
[docs]def vecdist3(coord1, coord2):
"""Calculate vector between two 3d points."""
#return [i - j for i, j in zip(coord1, coord2)]
# Twice as fast for fixed 3d vectors
vec = [coord2[0] - coord1[0],
coord2[1] - coord1[1],
coord2[2] - coord1[2]]
return (vec[0]*vec[0] + vec[1]*vec[1] + vec[2]*vec[2])**0.5
[docs]def angle_between(left, middle, right, cell=None):
"""
Calculate the angle between the atoms middle->left and middle->right.
If a Cell is specified, this will use the minimum image criterion to find
the angle with the closest point.
Parameters
----------
left : Atom
one of the atoms
middle : Atom
vertex atom
left : Atom
second atom
cell : Cell or None
if a Cell object is passes the minimum image criterion will be used
to calculate the angle
Returns
-------
angle: float
Angle between the atoms in degrees.
"""
if cell is None:
vleft = [i - j for i, j in zip(left.pos, middle.pos)]
vright = [i - j for i, j in zip(right.pos, middle.pos)]
else:
# Minimum image criterion
box = cell.cell
vleft = [i - j for i, j in
zip(minimum_image(middle, left, box), middle.pos)]
vright = [i - j for i, j in
zip(minimum_image(middle, right, box), middle.pos)]
angle = arccos(dot(vleft, vright)/(norm(vleft)*norm(vright)))/DEG2RAD
if angle != angle:
# angle is a nan; sometimes happened with older numpy
angle = 180
return angle
[docs]def dihedral(atom_a, atom_b, atom_c, atom_d, cell=None):
"""
Calculate the dihedral angle along the path a, b, c, d.
If a Cell is specified, this will use the minimum image criterion to find
the atoms that are closest for the vectors.
Parameters
----------
atom_a : Atom
the first atom
atom_b : Atom
one of the atoms
atom_c : Atom
one of the atoms
atom_d : Atom
one of the atoms
cell : Cell or None
if a Cell object is passes the minimum image criterion will be used
to calculate the dihedral angle
Returns
-------
angle: float
Angle between the atoms in degrees.
"""
if cell is None:
vec_a_b = [i - j for i, j in zip(atom_a.pos, atom_b.pos)]
vec_b_c = [i - j for i, j in zip(atom_b.pos, atom_c.pos)]
vec_c_d = [i - j for i, j in zip(atom_c.pos, atom_d.pos)]
else:
# Minimum image criterion
box = cell.cell
vec_a_b = [i - j for i, j in
zip(minimum_image(atom_b, atom_a, box), atom_b.pos)]
vec_b_c = [i - j for i, j in
zip(minimum_image(atom_c, atom_b, box), atom_c.pos)]
vec_c_d = [i - j for i, j in
zip(minimum_image(atom_d, atom_c, box), atom_d.pos)]
norm_1 = cross(vec_a_b, vec_b_c)
norm_2 = cross(vec_b_c, vec_c_d)
return -arctan2(dot(vec_b_c, cross(norm_2, norm_1)),
norm(vec_b_c)*dot(norm_1, norm_2))/DEG2RAD
[docs]def minimum_image(atom1, atom2, box):
"""Return the minimum image coordinates of atom2 with respect to atom1."""
f_coa = atom1.fractional[:]
f_cob = atom2.fractional[:]
fdx = f_coa[0] - f_cob[0]
if fdx < -0.5:
f_cob[0] -= 1
elif fdx > 0.5:
f_cob[0] += 1
fdy = f_coa[1] - f_cob[1]
if fdy < -0.5:
f_cob[1] -= 1
elif fdy > 0.5:
f_cob[1] += 1
fdz = f_coa[2] - f_cob[2]
if fdz < -0.5:
f_cob[2] -= 1
elif fdz > 0.5:
f_cob[2] += 1
new_b = [f_cob[0]*box[0][0] + f_cob[1]*box[1][0] + f_cob[2]*box[2][0],
f_cob[0]*box[0][1] + f_cob[1]*box[1][1] + f_cob[2]*box[2][1],
f_cob[0]*box[0][2] + f_cob[1]*box[1][2] + f_cob[2]*box[2][2]]
return new_b
[docs]def matrix_rotate(source, target):
"""Create a rotation matrix that will rotate source on to target."""
# Normalise so there is no scaling in the array
source = np.asarray(source)/norm(source)
target = np.asarray(target)/norm(target)
v = cross(source, target)
vlen = dot(v, v)
if vlen == 0.0:
# already aligned, no rotation needed
return identity(3)
c = dot(source, target)
h = (1 - c)/vlen
return array([[c + h*v[0]*v[0], h*v[0]*v[1] - v[2], h*v[0]*v[2] + v[1]],
[h*v[0]*v[1] + v[2], c + h*v[1]*v[1], h*v[1]*v[2] - v[0]],
[h*v[0]*v[2] - v[1], h*v[1]*v[2] + v[0], c + h*v[2]*v[2]]])
[docs]def remove_files(files, directory='.'):
"""
Delete any of the files if they exist using standard globbing,
remove empty directories, or ignore silently if not found.
"""
del_list = []
for file_name in files:
del_list.extend(glob(path.join(directory, file_name)))
for del_name in del_list:
debug("deleting %s" % del_name)
try:
os.remove(del_name)
except OSError:
try:
os.rmdir(del_name)
except OSError:
debug("Directory not empty: %s?" % del_name)
[docs]def compress_files(files, directory='.'):
"""Gzip any big files to keep."""
zip_list = []
for file_name in files:
zip_list.extend(glob(path.join(directory, file_name)))
for zip_name in zip_list:
debug("compressing %s" % zip_name)
gzip_command = ['gzip', '-f', zip_name]
subprocess.call(gzip_command)
#TODO(tdaff): internal gzip
[docs]def strip_blanks(lines):
"""Strip lines and remove blank lines."""
return [line.strip() for line in lines if line.strip() != '']
[docs]def subgroup(iterable, width, itype=None):
"""Split an iterable into nested sub-itypes of width members."""
# Return the same type as iterable
if itype is None:
if isinstance(iterable, list):
itype = list
else:
itype = tuple
# Will leave short groups if not enough members
return itype([itype(iterable[x:x+width])
for x in range(0, len(iterable), width)])
[docs]def state_points(temperatures, pressures, individual, nguests):
"""Group temperatures and pressures and append given state point tuples."""
# No error checking done, assume know what they are doing
for index in range(0, len(individual), nguests+1):
yield (individual[index], tuple(individual[index+1:index+nguests+1]))
for temp in temperatures:
for pressure in subgroup(pressures, nguests):
yield (temp, pressure)
[docs]def name_from_types(sites, guest):
"""Generate a string that gives the atoms represented in sites."""
stypes = []
for site_ident in sites:
if site_ident is 0:
stypes.append('COM')
else:
stypes.append(guest.atoms[site_ident-1].element)
stypes = unique(stypes)
if len(stypes) > 1:
site_name = "-".join(stypes)
else:
site_name = stypes[0]
return site_name
[docs]def count_ordered_types(atoms):
"""Generate a list of atom types and their counts for POSCAR file."""
types = [atoms[0].type]
counts = [0]
for atom in atoms:
if atom.type == types[-1]:
counts[-1] += 1
else:
types.append(atom.type)
counts.append(1)
return types, counts
[docs]def other_bond_index(bond, index):
"""Return the atom index for the other atom in a bond."""
if bond[0] == index:
return bond[1]
elif bond[1] == index:
return bond[0]
else:
raise ValueError("Index %s not found in bond %s" % (index, bond))
[docs]def welcome():
"""Print any important messages."""
print(LOGO)
print(("faps %s" % __version__).rjust(79))
[docs]def main():
"""Do a standalone calculation when run as a script."""
main_options = Options()
info("Starting faps version %s" % __version__)
# try to unpickle the job or
# fall back to starting a new simulation
job_name = main_options.get('job_name')
if " " in job_name:
warning("Spaces in job names may break submission on some systems.")
niss_name = "%s.niss" % job_name
tar_name = "%s.tar" % job_name
if path.exists(niss_name):
info("Existing simulation found: %s; loading..." % niss_name)
load_niss = open(niss_name, 'rb')
my_simulation = pickle.load(load_niss)
load_niss.close()
my_simulation.re_init(main_options)
else:
if not main_options.getbool('quiet'):
welcome()
info("Starting a new simulation...")
my_simulation = PyNiss(main_options)
# Should we extract a previous simulation?
if main_options.getbool('tar_extract_before') and path.exists(tar_name):
info("Extracting files from %s" % tar_name)
faps_tar = tarfile.open(tar_name, 'r')
faps_tar.extractall()
faps_tar.close()
debug("Deleting extracted file %s" % tar_name)
os.remove(tar_name)
# run requested jobs
my_simulation.job_dispatcher()
# Should we bundle the files after use?
if main_options.getbool('tar_after'):
info("Bundling calculation into %s" % tar_name)
# We don't overwrite an existing tar
faps_tar = tarfile.open(tar_name, 'a')
# Similar file names with _underscores_ might match each other so
# cannot just glob for them
for suffix in FOLDER_SUFFIXES:
directory = "faps_%s_%s/" % (job_name, suffix)
if path.isdir(directory):
debug("Adding directory %s" % directory)
faps_tar.add(directory)
shutil.rmtree(directory)
faps_tar.close()
my_simulation.dump_state()
terminate(0)
if __name__ == '__main__':
main()