# Copyright (c) Materials Virtual Lab.
# Distributed under the terms of the BSD License.
"""RDF implementation."""
from __future__ import annotations
from collections import Counter
from math import ceil
from multiprocessing import cpu_count
import numpy as np
from joblib import Parallel, delayed
from scipy.ndimage import gaussian_filter1d
from scipy.signal import find_peaks
from scipy.stats import norm
from pymatgen.core import Structure
from pymatgen.util.plotting import pretty_plot
[docs]
class RadialDistributionFunction:
    """
    Calculate the average radial distribution function g(r) for a given set of
    structures (e.g. frames of an MD trajectory), ensemble-averaged over all
    of them.
    """

    def __init__(
        self,
        structures: list,
        indices: list,
        reference_indices: list,
        ngrid: int = 101,
        rmax: float = 10.0,
        cell_range: int = 1,
        sigma: float = 0.1,
    ):
        """
        Args:
            structures ([Structure]): list of structure
                objects with the same composition. Allow for ensemble averaging.
            indices ([int]): A list of atom index of interest (the "centers").
            reference_indices ([int]): set this option along with 'indices'
                parameter to compute radial distribution function between the
                two sets of atoms.
            ngrid (int): Number of radial grid points.
            rmax (float): Maximum of radial grid (the minimum is always zero)
                in Angstrom.
            cell_range (int): Range of translational vector elements associated
                with supercell. Default is 1, i.e. including the adjacent image
                cells along all three directions.
            sigma (float): Smearing of a Gaussian function, in Angstrom.

        Raises:
            ValueError: if ngrid < 2, sigma <= 0, or indices is empty.
        """
        if ngrid < 2:
            raise ValueError("ngrid should be greater than 1!")
        if sigma <= 0:
            raise ValueError("sigma should be a positive number!")
        if len(indices) < 1:
            raise ValueError("Given species are not in the structure!")
        lattices, rhos, fcoords_list, ref_fcoords_list = [], [], [], []
        # Bin width; the ngrid points span [0, rmax] inclusive.
        dr = rmax / (ngrid - 1)
        interval = np.linspace(0.0, rmax, ngrid)
        rdf = np.zeros((ngrid), dtype=np.double)
        raw_rdf = np.zeros((ngrid), dtype=np.double)
        # Counter mapping bin index -> number of pair distances in that bin,
        # accumulated over all structures.
        dns = Counter()  # type: ignore
        # Generate the translational vectors: all integer image offsets within
        # +/- cell_range along each of the three lattice directions.
        r = np.arange(-cell_range, cell_range + 1)
        arange = r[:, None] * np.array([1, 0, 0])[None, :]
        brange = r[:, None] * np.array([0, 1, 0])[None, :]
        crange = r[:, None] * np.array([0, 0, 1])[None, :]
        images = arange[:, None, None] + brange[None, :, None] + crange[None, None, :]
        images = images.reshape((len(r) ** 3, 3))
        # Find the zero image vector (used below to exclude an atom's
        # zero-distance pairing with itself in the home cell).
        zd = np.sum(images**2, axis=1)
        indx0 = np.argmin(zd)
        for s in structures:
            latt = s.lattice
            lattices.append(latt)
            # Number density of the selected center atoms in this frame.
            rhos.append(float(len(indices)) / latt.volume)
            all_fcoords = np.array(s.frac_coords)
            fcoords_list.append(all_fcoords[indices, :])
            ref_fcoords_list.append(all_fcoords[reference_indices, :])
        rho = sum(rhos) / len(rhos)  # The average density
        self.rhos = rhos
        self.rho = rho  # This is the average density
        for fcoords, ref_fcoords, latt in zip(fcoords_list, ref_fcoords_list, lattices):
            # Fractional displacement vectors center -> reference for every
            # image cell: shape (n_indices, n_reference, n_images, 3).
            dcf = fcoords[:, None, None, :] + images[None, None, :, :] - ref_fcoords[None, :, None, :]
            dcc = latt.get_cartesian_coords(dcf)
            d2 = np.sum(dcc**2, axis=3)
            # Collect all pair distances, skipping the self-pair (same atom
            # index AND the zero-image), which would contribute d = 0.
            dists = [
                d2[u, v, j] ** 0.5
                for u in range(len(indices))
                for v in range(len(reference_indices))
                for j in range(len(r) ** 3)
                if indices[u] != reference_indices[v] or j != indx0
            ]
            # Histogram distances below rmax into bins of width dr.
            r_indices = [int(dist / dr) for dist in filter(lambda e: e < rmax, dists)]
            dns.update(r_indices)
        # Normalize each occupied bin and smear it over the whole grid.
        for indx, dn in dns.most_common(ngrid):
            # Volume of the thin shell
            ff = 4.0 / 3.0 * np.pi * (interval[indx + 1] ** 3 - interval[indx] ** 3)
            # Gaussian-smeared g(r): each bin count contributes a normal pdf
            # centered at the bin's radius, normalized by shell volume,
            # reference-atom count, average density and frame count.
            rdf[:] += (
                norm.pdf(interval, interval[indx], sigma)
                * dn
                / float(len(reference_indices))
                / ff
                / rho
                / len(fcoords_list)
                * dr
            )
            # additional dr factor renormalises overlapping gaussians.
            raw_rdf[indx] += dn / float(len(reference_indices)) / ff / rho / len(fcoords_list)
        self.structures = structures
        self.cell_range = cell_range
        self.rmax = rmax
        self.ngrid = ngrid
        self.species = {structures[0][i].species_string for i in indices}
        self.reference_species = {structures[0][i].species_string for i in reference_indices}
        self.indices = indices
        self.reference_indices = reference_indices
        self.dr = dr
        self.rdf = rdf
        self.raw_rdf = raw_rdf
        self.interval = interval
        # Finding peak based on smeared RDF
        self.peak_indices = find_peaks(rdf)[0]
        self.peak_r = [self.interval[i] for i in self.peak_indices]
        self.peak_rdf = [self.rdf[i] for i in self.peak_indices]
[docs]
@classmethod
def from_species(
cls,
structures: list,
ngrid: int = 101,
rmax: float = 10.0,
cell_range: int = 1,
sigma: float = 0.1,
species: tuple | list = ("Li", "Na"),
reference_species: tuple | list | None = None,
):
"""
Initialize using species.
Args:
structures (list of pmg_structure objects): list of structure
objects with the same composition. Allow for ensemble averaging.
ngrid (int): Number of radial grid points.
rmax (float): Maximum of radial grid (the minimum is always zero).
cell_range (int): Range of translational vector elements associated
with supercell. Default is 1, i.e. including the adjacent image
cells along all three directions.
sigma (float): Smearing of a Gaussian function.
species (list[string]): A list of specie symbols of interest.
reference_species (list[string]): set this option along with
'species' parameter to compute radial distribution function.
eg: species=["H"], reference_species=["O"] to compute
O-H pair distribution in a water MD simulation.
"""
indices = [j for j, site in enumerate(structures[0]) if site.specie.symbol in species]
if reference_species:
reference_indices = [j for j, site in enumerate(structures[0]) if site.specie.symbol in reference_species]
if len(reference_indices) < 1:
raise ValueError("Given reference species are not in the structure!")
else:
reference_indices = indices
return cls(
structures=structures,
ngrid=ngrid,
rmax=rmax,
cell_range=cell_range,
sigma=sigma,
indices=indices,
reference_indices=reference_indices,
)
@property
def coordination_number(self):
"""
returns running coordination number.
Returns:
numpy array
"""
# Note: The average density from all input structures is used here.
intervals = np.append(self.interval, self.interval[-1] + self.dr)
return np.cumsum(self.raw_rdf * self.rho * 4.0 / 3.0 * np.pi * (intervals[1:] ** 3 - intervals[:-1] ** 3))
[docs]
def get_rdf_plot(
self,
label: str | None = None,
xlim: tuple = (0.0, 8.0),
ylim: tuple = (-0.005, 3.0),
loc_peak: bool = False,
):
"""
Plot the average RDF function.
Args:
label (str): The legend label.
xlim (list): Set the x limits of the current axes.
ylim (list): Set the y limits of the current axes.
loc_peak (bool): Label peaks if True.
"""
if label is None:
symbol_list = [e.symbol for e in self.structures[0].composition]
symbol_list = [symbol for symbol in symbol_list if symbol in self.species]
label = symbol_list[0] if len(symbol_list) == 1 else "-".join(symbol_list)
ax = pretty_plot(12, 8)
ax.plot(self.interval, self.rdf, label=label, linewidth=4.0, zorder=1)
if loc_peak:
ax.scatter(
self.peak_r,
self.peak_rdf,
marker="P",
s=240,
c="k",
linewidths=0.1,
alpha=0.7,
zorder=2,
label="Peaks",
)
ax.set_xlabel("$r$ ($\\rm\\AA$)")
ax.set_ylabel("$g(r)$")
ax.legend(loc="upper right", fontsize=36)
ax.set_xlim(xlim[0], xlim[1])
ax.set_ylim(ylim[0], ylim[1])
return ax
[docs]
def export_rdf(self, filename: str):
"""
Output RDF data to a csv file.
Args:
filename (str): Filename. Supported formats are csv and dat. If
the extension is csv, a csv file is written. Otherwise,
a dat format is assumed.
"""
fmt = "csv" if filename.lower().endswith(".csv") else "dat"
delimiter = ", " if fmt == "csv" else " "
with open(filename, "w") as f:
if fmt == "dat":
f.write("# ")
f.write(delimiter.join(["r", "g(r)"]))
f.write("\n")
for r, gr in zip(self.interval, self.rdf):
f.write(delimiter.join([str(v) for v in [r, gr]]))
f.write("\n")
[docs]
class RadialDistributionFunctionFast:
    """Fast radial distribution analysis."""

    def __init__(
        self,
        structures: Structure | list[Structure],
        rmin: float = 0.0,
        rmax: float = 10.0,
        ngrid: int = 101,
        sigma: float = 0.0,
        n_jobs: int | None = None,
    ):
        """
        This method calculates rdf on `np.linspace(rmin, rmax, ngrid)` points.

        Args:
            structures (list of pymatgen Structures): structures to compute RDF
            rmin (float): minimal radius
            rmax (float): maximal radius
            ngrid (int): number of grid points, defaults to 101
            sigma (float): smooth parameter; 0 disables smoothing
            n_jobs (int): number of CPUs in processing. None means 1 (serial);
                a negative value uses all available CPUs.
        """
        if n_jobs is None:
            n_jobs = 1
        if n_jobs < 0:
            n_jobs = cpu_count()
        self.n_jobs = n_jobs
        if isinstance(structures, Structure):
            structures = [structures]
        self.structures = structures
        # Number of atoms in all structures should be the same
        assert len({len(i) for i in self.structures}) == 1
        elements = [[i.specie for i in j.sites] for j in self.structures]
        unique_elements_on_sites = [len(set(i)) == 1 for i in list(zip(*elements))]
        # For the same site index, all structures should have the same element there
        if not all(unique_elements_on_sites):
            raise RuntimeError("Elements are not the same at least for one site")
        self.rmin = rmin
        self.rmax = rmax
        self.ngrid = ngrid
        self.dr = (self.rmax - self.rmin) / (self.ngrid - 1)  # end points are on grid
        self.r = np.linspace(self.rmin, self.rmax, self.ngrid)  # type: ignore
        max_r = self.rmax + self.dr / 2.0  # add a small shell to improve robustness
        if self.n_jobs > 1:
            self.neighbor_lists = Parallel(n_jobs=self.n_jobs)(
                delayed(_get_neighbor_list)(s, max_r) for s in self.structures
            )
        else:
            self.neighbor_lists = [i.get_neighbor_list(max_r) for i in self.structures]
        # each neighbor list is a tuple of
        # center_indices, neighbor_indices, image_vectors, distances
        (
            self.center_indices,
            self.neighbor_indices,
            self.image_vectors,
            self.distances,
        ) = list(zip(*self.neighbor_lists))
        elements = np.array([str(i.specie) for i in structures[0]])  # type: ignore
        self.center_elements = [elements[i] for i in self.center_indices]
        self.neighbor_elements = [elements[i] for i in self.neighbor_indices]
        # BUG FIX: the original `[{}] * len(self.structures)` repeats a single
        # shared dict object, so the per-structure density loop below would
        # overwrite one dict and every entry would end up holding the LAST
        # structure's densities. Build one independent dict per structure.
        self.density = [{} for _ in self.structures]  # type: list[dict]
        self.natoms = [i.composition.to_data_dict["unit_cell_composition"] for i in self.structures]
        for s_index, natoms in enumerate(self.natoms):
            for i, j in natoms.items():
                # per-element number density (atoms per volume) per structure
                self.density[s_index][i] = j / self.structures[s_index].volume
        # Spherical shell volume associated with each grid point: 4*pi*r^2*dr
        self.volumes = 4.0 * np.pi * self.r**2 * self.dr
        self.volumes[self.volumes < 1e-8] = 1e8  # avoid divide by zero
        self.n_structures = len(self.structures)
        # sigma expressed as a (rounded-up) number of grid points, consumed by
        # gaussian_filter1d in get_one_rdf
        self.sigma = ceil(sigma / self.dr)
def _dist_to_counts(self, d):
"""
Convert a distance array for counts in the bin.
Args:
d: (1D np.array)
Returns:
1D array of counts in the bins centered on self.r
"""
counts = np.zeros((self.ngrid,))
indices = np.array(np.floor((d - self.rmin + 0.5 * self.dr) / self.dr), dtype=int)
unique, val_counts = np.unique(indices, return_counts=True)
counts[unique] = val_counts
return counts
[docs]
def get_rdf(
self,
ref_species: str | list[str],
species: str | list[str],
is_average=True,
):
"""
Args:
ref_species (list of species or just single specie str): the reference species.
The rdfs are calculated with these species at the center
species (list of species or just single specie str): the species that we are interested in.
The rdfs are calculated on these species.
is_average (bool): whether to take the average over
all structures.
Returns:
(x, rdf) x is the radial points, and rdf is the rdf value.
"""
if self.n_jobs > 1:
all_rdfs = Parallel(n_jobs=self.n_jobs)(
self.get_one_rdf(ref_species, species, i) for i in range(self.n_structures)
)
all_rdfs = [i[1] for i in all_rdfs]
else:
all_rdfs = [self.get_one_rdf(ref_species, species, i)[1] for i in range(self.n_structures)]
if is_average:
all_rdfs = np.mean(all_rdfs, axis=0)
return self.r, all_rdfs
[docs]
def get_one_rdf(
self,
ref_species: str | list[str],
species: str | list[str],
index=0,
):
"""
Get the RDF for one structure, indicated by the index of the structure
in all structures.
Args:
ref_species (list of species or just single specie str): the reference species.
The rdfs are calculated with these species at the center
species (list of species or just single specie str): the species that we are interested in.
The rdfs are calculated on these species.
index (int): structure index in the list
Returns:
(x, rdf) x is the radial points, and rdf is the rdf value.
"""
if isinstance(ref_species, str):
ref_species = [ref_species]
if isinstance(species, str):
species = [species]
indices = (
(np.isin(self.center_elements[index], ref_species))
& (np.isin(self.neighbor_elements[index], species))
& (self.distances[index] >= self.rmin - self.dr / 2.0)
& (self.distances[index] <= self.rmax + self.dr / 2.0)
& (self.distances[index] > 1e-8)
)
density = sum(self.density[index][i] for i in species)
natoms = sum(self.natoms[index][i] for i in ref_species)
distances = self.distances[index][indices]
counts = self._dist_to_counts(distances)
rdf_temp = counts / density / self.volumes / natoms
if self.sigma > 1e-8:
rdf_temp = gaussian_filter1d(rdf_temp, self.sigma)
return self.r, rdf_temp
[docs]
def get_coordination_number(self, ref_species, species, is_average=True):
"""
returns running coordination number.
Args:
ref_species (list of species or just single specie str): the reference species.
The rdfs are calculated with these species at the center
species (list of species or just single specie str): the species that we are interested in.
The rdfs are calculated on these species.
is_average (bool): whether to take structural average
Returns:
numpy array
"""
# Note: The average density from all input structures is used here.
all_rdf = self.get_rdf(ref_species, species, is_average=False)[1]
if isinstance(species, str):
species = [species]
density = [sum(i[j] for j in species) for i in self.density]
cn = [np.cumsum(rdf * density[i] * 4.0 * np.pi * self.r**2 * self.dr) for i, rdf in enumerate(all_rdf)]
if is_average:
cn = np.mean(cn, axis=0)
return self.r, cn
def _get_neighbor_list(structure, r) -> tuple:
    """
    Thin wrapper to enable parallel calculations.

    NOTE(review): presumably defined at module level (rather than as a lambda
    or bound method) so joblib can pickle it when dispatching to worker
    processes — confirm against the Parallel call sites.

    Args:
        structure (pymatgen Structure): pymatgen structure
        r (float): cutoff radius

    Returns:
        tuple of neighbor list: (center_indices, neighbor_indices,
        image_vectors, distances)
    """
    return structure.get_neighbor_list(r)