"""A collection of miscellaneous functions used for Cluster Expansion."""
from collections.abc import Iterable
from itertools import chain, combinations, filterfalse, permutations, product
import logging
from pathlib import Path
from random import sample, shuffle
import re
from typing import Dict, List, Optional, Sequence, Set, Tuple, Union
from typing import Iterable as tIterable
import ase
import ase.build.supercells as ase_sc
from ase.db import connect
from ase.db.core import parse_selection
import numpy as np
from packaging.version import Version, parse
from scipy.optimize import linprog
from scipy.spatial import cKDTree as KDTree
from typing_extensions import Protocol
ASE_VERSION = parse(ase.__version__)
logger = logging.getLogger(__name__)
class ApproxEqualityList:
"""
Wrapper around a list which implements a new comparison operator. If two
items in the list is equal within a given tolerance, the items are
considered to be equal.
Parameters:
array: list
List that is wrapped
tol: float
Toleracnce for comparison check
"""
def __init__(self, array, tol=1e-5):
self.array = array
self.tol = tol
def __lt__(self, other):
for x, y in zip(self.array, other.array):
if x - y < -self.tol:
return True
if x - y > self.tol:
return False
return False
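# Example of the tolerant comparison (a minimal sketch; the numbers are
# arbitrary). A difference of 1e-8 is within the default tolerance, so the
# comparison is decided by the second entry:
#
#   >>> ApproxEqualityList([1.0, 2.0]) < ApproxEqualityList([1.0 + 1e-8, 3.0])
#   True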
def index_by_position(atoms: ase.Atoms) -> List[int]:
"""Set atomic indices by its position."""
# add zero to avoid negative zeros
tags = atoms.get_positions() + 0
tags = tags.tolist()
tags = [ApproxEqualityList(x) for x in tags]
deco = sorted([(tag, i) for i, tag in enumerate(tags)])
indices = [i for tag, i in deco]
return indices
def sort_by_position(atoms: ase.Atoms) -> ase.Atoms:
"""Sort atoms by its position."""
# Return a new Atoms object with sorted atomic order.
# The default is to order according to chemical symbols,
# but if *tags* is not None, it will be used instead.
# A stable sorting algorithm is used.
indices = index_by_position(atoms)
return atoms[indices]
def wrap_and_sort_by_position(atoms: ase.Atoms, copy: bool = True) -> ase.Atoms:
"""Wrap and sort atoms by their positions.
Args:
atoms (ase.Atoms): Atoms object to be wrapped and sorted
copy (bool, optional): If True, a copy of the atoms object is returned,
to avoid mutating the original atoms object. Defaults to True.
Returns:
ase.Atoms: The wrapped and sorted Atoms object.
"""
if copy:
atoms = atoms.copy()
atoms.wrap()
atoms = sort_by_position(atoms)
return atoms
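# Usage sketch (the rocksalt cell below is an arbitrary choice):
#
#   >>> from ase.build import bulk
#   >>> atoms = bulk("NaCl", "rocksalt", a=5.64) * (2, 2, 2)
#   >>> atoms = wrap_and_sort_by_position(atoms)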
def ndarray2list(data):
"""
    Convert a nested structure of lists and numpy arrays
    into nested lists of lists.
"""
if not isinstance(data, list) and not isinstance(data, np.ndarray):
return data
data = list(data)
for i, value in enumerate(data):
data[i] = ndarray2list(value)
return list(data)
def dec_string(deco, equiv_sites):
"""Create the decoration string based on equiv sites."""
equiv_dec = sorted(equivalent_deco(deco, equiv_sites))
return "".join(str(i) for i in equiv_dec[0])
def equivalent_deco(deco, equiv_sites):
"""Generate equivalent decoration numbers based on equivalent sites."""
if not equiv_sites:
return [deco]
perm = []
for equiv in equiv_sites:
perm.append(list(permutations(equiv)))
equiv_deco = []
for comb in product(*perm):
order = []
for item in comb:
order += list(item)
orig_order = list(range(len(deco)))
for i, srt_indx in enumerate(sorted(order)):
orig_order[srt_indx] = order[i]
equiv_deco.append([deco[indx] for indx in orig_order])
unique_deco = []
for eq_dec in equiv_deco:
if eq_dec not in unique_deco:
unique_deco.append(eq_dec)
return unique_deco
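# Worked example: with deco = [0, 1] and a single group of equivalent sites
# [[0, 1]], both site orderings are equivalent, and the decoration string is
# built from the lexicographically smallest one:
#
#   >>> equivalent_deco([0, 1], [[0, 1]])
#   [[0, 1], [1, 0]]
#   >>> dec_string([0, 1], [[0, 1]])
#   '01'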
def flatten(x):
"""Flatten list."""
if isinstance(x, Iterable):
return [a for i in x for a in flatten(i)]
return [x]
def flatten_iter(x):
"""Flatten an iterable as a generator"""
if isinstance(x, Iterable):
return (a for i in x for a in flatten_iter(i))
return [x]
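# Examples:
#
#   >>> flatten([1, [2, [3, 4]]])
#   [1, 2, 3, 4]
#   >>> list(flatten_iter([1, [2, [3, 4]]]))
#   [1, 2, 3, 4]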
def nested_array2list(array):
"""Convert a nested array/tuple to a nested list."""
if isinstance(array, np.ndarray):
array = array.tolist()
else:
array = list(array)
try:
for i, value in enumerate(array):
if isinstance(value, np.ndarray):
array[i] = value.tolist()
else:
array[i] = list(value)
except TypeError:
pass
return array
def update_db(
uid_initial=None,
final_struct=None,
db_name=None,
custom_kvp_init: Optional[dict] = None,
custom_kvp_final: Optional[dict] = None,
):
"""Update the database.
Parameters:
uid_initial: int
entry ID of the initial structure in the database
final_struct: Atoms
Atoms object with the final structure with a physical
quantity that needs to be modeled (e.g., DFT energy)
db_name: str
Database name
custom_kvp_init: dict (optional)
If desired, one can pass additional key-value-pairs for the
entry containing the initial structure
custom_kvp_final: dict (optional)
If desired, one can pass additional key-value-pairs for the
entry containing the final structure
"""
if custom_kvp_init is None:
custom_kvp_init = {}
if custom_kvp_final is None:
custom_kvp_final = {}
db = connect(db_name)
init_row = db.get(id=uid_initial)
    # Check if a final structure already exists
name = init_row.name
select_cond = [("name", "=", name), ("struct_type", "=", "final")]
exist = sum(1 for row in db.select(select_cond))
if exist >= 1:
logger.warning(
"A structure with 'name'=%s and 'struct_type'=final already exits in DB.",
name,
)
return
# Write the final structure to database
kvp_final = {"struct_type": "final", "name": name}
kvp_final.update(custom_kvp_final)
uid_final = db.write(final_struct, key_value_pairs=kvp_final)
kvp_update_init = {
"converged": True,
"started": "",
"queued": "",
}
if kvp_final["struct_type"] == "final":
kvp_update_init["final_struct_id"] = uid_final
kvp_update_init.update(custom_kvp_init)
# Update info for the initial structure
db.update(uid_initial, **kvp_update_init)
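# Usage sketch (hypothetical names; assumes "clease.db" contains an initial
# structure with id 1, and that relaxed_atoms holds the relaxed structure):
#
#   >>> update_db(uid_initial=1, final_struct=relaxed_atoms,
#   ...           db_name="clease.db",
#   ...           custom_kvp_final={"energy_corrected": -1.23})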
def exclude_information_entries():
"""Return selection condition to exlcude all entries in the database that
only contain information about the clusters.
"""
return [("name", "!=", "primitive_cell"), ("name", "!=", "template")]
def get_all_internal_distances(atoms, max_dist, ref_indices):
"""Obtain all internal distances of the passed atoms object and return a
Numpy array containing all the distances sorted in an ascending order.
"""
tree = KDTree(atoms.get_positions())
distances = []
for ind in ref_indices:
indices = tree.query_ball_point(atoms[ind].position, max_dist)
dists = atoms.get_distances(ind, indices)
for d in dists:
if np.any(np.abs(np.array(distances) - d) < 1e-6):
continue
distances.append(d)
distances = sorted(distances)
# Make sure that the first element is 0
assert distances[0] < 1e-6
return np.array(distances[1:])
def reconfigure(settings, **kwargs) -> None:
"""Reconfigure the correlation functions for a settings object.
Args:
settings (ClusterExpansionSettings):
Instance of a cluster expansion settings object.
"""
from clease.corr_func import CorrFunction
from clease.settings import ClusterExpansionSettings
    # Note: we cannot import ClusterExpansionSettings at module level for
    # typing, as that would cause a cyclic import.
if not isinstance(settings, ClusterExpansionSettings):
raise TypeError(f"settings must be of type ClusterExpansionSettings, got {type(settings)}")
CorrFunction(settings).reconfigure_db_entries(**kwargs)
def split_dataset(
X: np.ndarray, y: np.ndarray, nsplits: int = 10, groups: Sequence[int] = ()
) -> List[Dict[str, np.ndarray]]:
"""Split the dataset such that it can be used for k-fold
cross validation.
:param X: Design matrix
:param y: Target values
    :param nsplits: Number of partitions
    :param groups: List of the same length as y. Each row
        in y with the same group tag is treated as the same
        group. Datapoints in the same group are never split
        across different partitions. If an empty list is given,
        each item in y is assumed to constitute its own group.
"""
if not groups:
groups = list(range(len(y)))
unique_groups = list(set(groups))
if len(unique_groups) < nsplits:
        raise ValueError(
            "The number of unique groups must be at least the number of partitions."
        )
shuffle(unique_groups)
partitions = []
num_validation = int(len(unique_groups) / nsplits)
# Must at least be 1
num_validation = max(num_validation, 1)
for i in range(nsplits):
start = i * num_validation
end = (i + 1) * num_validation
if i == nsplits - 1:
chosen_groups = unique_groups[start:]
else:
chosen_groups = unique_groups[start:end]
group_mask = np.zeros(len(groups), dtype=np.uint8)
group_mask[chosen_groups] = 1
index_mask = np.zeros(len(y), dtype=np.uint8)
for j, g in enumerate(groups):
if group_mask[g]:
index_mask[j] = 1
train_mask = index_mask == 0
validate_mask = index_mask == 1
train_index = np.arange(len(y))[train_mask]
validate_index = np.arange(len(y))[validate_mask]
data = {
"train_X": X[train_mask, :],
"train_y": y[train_mask],
"train_index": train_index,
"validate_X": X[validate_mask, :],
"validate_y": y[validate_mask],
"validate_index": validate_index,
}
partitions.append(data)
return partitions
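# Example (a minimal sketch with random data): 20 datapoints split into
# 5 folds with 4 validation points each:
#
#   >>> X = np.random.rand(20, 3)
#   >>> y = np.random.rand(20)
#   >>> partitions = split_dataset(X, y, nsplits=5)
#   >>> len(partitions)
#   5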
def random_validation_set(
num: int = 10, select_cond: Optional[list] = None, db_name: Optional[str] = None
):
"""
Construct a random test set.
:param num: Number of datapoints to include in the test set
:param select_cond: Select condition to be used to select items from
the database. If not given, it will be struct_type='initial',
converged=True
:param db_name: Name of the database
"""
if select_cond is None:
select_cond = [("struct_type", "=", "initial"), ("converged", "=", True)]
if db_name is None:
raise ValueError("No database provided!")
all_ids = get_ids(select_cond, db_name)
return sample(all_ids, num)
def exclude_ids(ids: List[int]) -> List[tuple]:
"""
Construct a select condition based on the ids passed.
    :param ids: List of IDs
"""
return [("id", "!=", x) for x in ids]
def symbols2integer(basis_functions):
"""Convert each symbol in the basis function
to a unique integer.
Parameters:
basis_functions: list of dict
The basis function dictionary from ClusterExpansionSettings
"""
symb_id = {}
for i, symb in enumerate(basis_functions[0].keys()):
symb_id[symb] = i
return symb_id
def bf2npyarray(basis_functions, symb_id):
"""Convert the basis function dictionary to a 2D
numpy array
Parameters:
    basis_functions: list of dict
Basis function dictionary from ClusterExpansionSettings
symb_id: dict
Dictionary of symbol integer (see symbols2integer)
"""
bf_npy = np.zeros((len(basis_functions), len(symb_id)))
for i, bf in enumerate(basis_functions):
for k, v in bf.items():
bf_npy[i, symb_id[k]] = v
return bf_npy
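# Example for a binary system (the basis function values are illustrative):
#
#   >>> bfs = [{"Au": 1.0, "Cu": -1.0}]
#   >>> symb_id = symbols2integer(bfs)  # {'Au': 0, 'Cu': 1}
#   >>> bf2npyarray(bfs, symb_id)
#   array([[ 1., -1.]])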
def nested_list2str(nested_list):
"""Convert a nested list to string."""
return "x".join(",".join(str(x) for x in item) for item in nested_list)
def str2nested_list(string):
"""Convert string to nested list."""
def _as_int(x):
return int(x)
return [list(map(_as_int, item.split(","))) for item in string.split("x")]
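# Round-trip example:
#
#   >>> nested_list2str([[1, 2], [3, 4]])
#   '1,2x3,4'
#   >>> str2nested_list('1,2x3,4')
#   [[1, 2], [3, 4]]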
def min_distance_from_facet(x, cell):
"""
Calculate the minimum distance from a point to the cell facet.
Parameters:
x: np.array
Position from which to calculate the minimum distance
cell: Cell
Cell of an Atoms object
"""
dists = []
for plane in combinations([0, 1, 2], r=2):
# Unit normal vector
n = np.cross(cell[plane[0], :], cell[plane[1], :])
n /= np.sqrt(n.dot(n))
# Plane with origin in it
dist = np.abs(n.dot(x))
dists.append(dist)
# Opposite facet
remaining = list(set([0, 1, 2]) - set(plane))[0]
vec = cell[remaining, :]
dist = np.abs(n.dot(x - vec))
dists.append(dist)
return min(dists)
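# Example (a minimal sketch): for a point at the center of a unit cube, the
# distance to every facet is 0.5:
#
#   >>> min_distance_from_facet(np.array([0.5, 0.5, 0.5]), np.eye(3))
#   0.5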
def trans_matrix_index2tags(trans_matrix, tagged_atoms, indices=None):
"""
Convert from indices to tags
Parameters:
trans_matrix: list of dict
Original translation matrix
tagged_atoms: Atoms
Atoms with a tag that should be used instead of the
index
indices: list of int
Atom indices corresponding to each row in trans_matrix.
If None, it is assumed that len(trans_matrix) == len(tagged_atoms)
and each row in trans_matrix corresponds to the atom with the same
index in tagged_atoms.
"""
unique_tags = sorted(list(set(atom.tag for atom in tagged_atoms)))
if indices is None:
indices = list(range(len(trans_matrix)))
# Make sure we have a continuous series of tags
assert len(unique_tags) == max(unique_tags) + 1
new_tm = [{} for _ in range(len(unique_tags))]
used_tags = [False for _ in range(len(unique_tags))]
for i, row in enumerate(trans_matrix):
tag = tagged_atoms[indices[i]].tag
if used_tags[tag]:
continue
new_row = {tagged_atoms[k].tag: tagged_atoms[v].tag for k, v in row.items()}
used_tags[tag] = True
new_tm[tag] = new_row
return new_tm
def indices2tags(supercell, clusters):
    """Replace the atom indices in each figure of the clusters by the tags
    of the corresponding atoms in the supercell."""
for cluster in clusters:
for i, figure in enumerate(cluster):
cluster[i] = [int(supercell[x].tag) for x in figure]
return clusters
def list2str(array):
return "-".join([str(x) for x in array])
def factorize(n):
    """Yield the prime factors of n in ascending order."""
while n > 1:
for i in range(2, n + 1):
if n % i == 0:
n = int(n / i)
yield i
break
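# Example:
#
#   >>> list(factorize(12))
#   [2, 2, 3]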
def count_atoms(atoms):
"""
    Return dictionary with the number of atoms of each species
"""
count = {}
for s in atoms.symbols:
count[s] = count.get(s, 0) + 1
return count
def all_integer_transform_matrices_given_diag(diag):
rng1 = range(0, diag[0] + 1)
rng2 = rng1
rng3 = range(0, diag[1] + 1)
for off_diag in product(rng1, rng2, rng3):
yield np.array(
[
[diag[0], off_diag[0], off_diag[1]],
[0, diag[1], off_diag[2]],
[0, 0, diag[2]],
]
)
def all_integer_transform_matrices_per_diag(n):
"""
    Yield generators of integer transform matrices, one generator per diagonal.
"""
diags = filterfalse(lambda x: x[0] * x[1] * x[2] != n, product(range(n + 1), repeat=3))
for d in diags:
yield all_integer_transform_matrices_given_diag(d)
def all_integer_transform_matrices(n):
return chain(*all_integer_transform_matrices_per_diag(n))
def rotate_cells(cell, target_cell):
"""
    Rotate the cells such that one vector is parallel,
    and the other vector lies in a given plane.
"""
dot_prod_cell = cell.dot(cell.T)
dot_prod_target_cell = target_cell.dot(target_cell.T)
# Calculate unit vector cells
uvec_cell = np.zeros_like(cell)
uvec_target_cell = np.zeros_like(target_cell)
for i in range(3):
uvec_cell[i, :] = cell[i, :] / np.sqrt(dot_prod_cell[i, i])
uvec_target_cell[i, :] = target_cell[i, :] / np.sqrt(dot_prod_target_cell[i, i])
# Rotate one vector to be parallel
v = np.cross(uvec_cell[0, :], uvec_target_cell[0, :])
c = uvec_cell[0, :].dot(uvec_target_cell[0, :])
    v_cross = np.zeros((3, 3))
    v_cross[0, 1] = -v[2]
    v_cross[0, 2] = v[1]
    v_cross[1, 0] = v[2]
    v_cross[1, 2] = -v[0]
    v_cross[2, 0] = -v[1]
    v_cross[2, 1] = v[0]
R = np.eye(3) + v_cross + v_cross.dot(v_cross) / (1 + c)
target_cell = R.dot(target_cell)
return target_cell
def species_chempot2eci(bf_list, species_chempot):
"""
Convert chemical potentials given for species to their corresponding
singlet ECI values.
Parameters:
bf_list: list
List of basis function values for each species
species_chempot: dict
Dictionary containing the chemical potential for each species. Note
that the chemical potential for one species can be set to 0.0 without
any loss of generality. Hence, the species_chempot should contain
chemical potentials for all elements except one.
"""
if len(species_chempot) != len(bf_list):
msg = "Inconsistent number of chemical potentials. Basis functions\n"
msg += f"{bf_list}. Passed chemical potentials {species_chempot}"
raise ValueError(msg)
n = len(species_chempot)
mat = np.zeros((n, n))
rhs = np.zeros(n)
row = 0
for sp, mu in species_chempot.items():
for col, bf in enumerate(bf_list):
mat[row, col] = bf[sp]
rhs[row] = mu
row += 1
try:
eci_chem_pot = np.linalg.solve(mat, rhs)
except np.linalg.LinAlgError:
inv_mat = invert_matrix(mat)
eci_chem_pot = inv_mat.dot(rhs)
eci_chem_pot = eci_chem_pot.tolist()
eci_dct = {f"c1_{i}": v for i, v in enumerate(eci_chem_pot)}
return eci_dct
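# Example for a binary alloy with a single singlet basis function (the
# chemical potential value is arbitrary):
#
#   >>> species_chempot2eci([{"Au": 1.0, "Cu": -1.0}], {"Au": 0.04})
#   {'c1_0': 0.04}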
def bf2matrix(bfs):
"""
Convert a list of basis functions to a matrix. Each column represents
    a species and each row represents a basis function.
    Parameters:
bfs: list of dict
List of dictionaries containing the basis function values
"""
nrows = len(bfs)
ncols = len(bfs[0])
mat = np.zeros((nrows, ncols))
keys = sorted(list(bfs[0].keys()))
for i, bf in enumerate(bfs):
for j, symb in enumerate(keys):
mat[i, j] = bf[symb]
return mat
def singlets2conc(bf_list, singlets):
"""
Convert singlets to concentrations.
Parameters:
bf_list: list
        List with the basis functions (e.g. [{'Au': 1, 'Cu': -1.0}])
singlets: np.ndarray
Array with singlets (NxM), M has to match the length of bf_list.
        The columns are assumed to be given in sorted order (i.e. c1_0, c1_1,
c1_2 etc.)
"""
mat = bf2matrix(bf_list)
# Add row to force that concentrations sum to 1
mat = np.vstack((mat, np.ones(mat.shape[1])))
singlets = np.hstack((singlets, np.ones((singlets.shape[0], 1))))
res = np.linalg.solve(mat, singlets.T).T
symbs = sorted(list(bf_list[0].keys()))
concs = []
for i in range(res.shape[0]):
concs.append({symbs[j]: res[i, j] for j in range(res.shape[1])})
return concs
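# Example (a minimal sketch for a binary system): a singlet value of 0.5 with
# the basis function {'Au': 1.0, 'Cu': -1.0} corresponds to 75% Au:
#
#   >>> singlets2conc([{"Au": 1.0, "Cu": -1.0}], np.array([[0.5]]))
#   [{'Au': 0.75, 'Cu': 0.25}]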
def rate_bf_subsets(elems, bfs):
"""
    Rate different combinations of basis functions according to how
    well they are able to distinguish the elements in the basis.
Example:
bfs = [{'Li': 1.0, 'X': 0.0, 'V': 0.0, 'F': 0.0},
{'Li': 0.0, 'X': 1.0, 'V': 0.0, 'F': 0.0},
{'Li': 0.0, 'X': 0.0, 'V': 1.0, 'F': 0.0}]
    If one wants basis functions for the triplet [Li, X, F], two basis
    functions are needed. The following combinations are possible
(bfs[0], bfs[1]), (bfs[0], bfs[2]), (bfs[1], bfs[2])
The score is defined as the sum of the absolute value of the difference
    between the basis function values for the selected symbols. The score for
    the first combination is thus
score1 = |bfs[0]['Li'] - bfs[0]['F']| + |bfs[0]['Li'] - bfs[0]['X']| +
|bfs[0]['F'] - bfs[0]['X']| + |bfs[1]['Li'] - bfs[1]['F']| +
|bfs[1]['Li'] - bfs[1]['X']| + |bfs[1]['F'] - bfs[1]['X']|
Therefore,
score1 = 1.0 + 1.0 + 0.0 + 0.0 + 1.0 + 1.0 = 4.0
    Parameters:
    elems: list of str
        Elements that the basis functions should distinguish
    bfs: list of dict
        List with dictionaries with the basis function values
"""
score_and_comb = []
for comb in combinations(range(len(bfs)), r=len(elems) - 1):
chosen = [{s: bfs[x][s] for s in elems} for x in comb]
mat = bf2matrix(chosen)
score = 0.0
for row in range(mat.shape[0]):
for i in product(range(mat.shape[1]), repeat=2):
score += abs(mat[row, i[0]] - mat[row, i[1]])
score_and_comb.append((score, list(comb)))
score_and_comb.sort(reverse=True)
return score_and_comb
def select_bf_subsets(basis_elems, bfs):
"""
Select a subset of basis functions that best describes the basis.
The best subset is the one that yields the highest sum of the scores for
each sublattice. For definition of score see docstring
of `rate_bf_subsets`
Parameters:
    basis_elems: nested list of strings
Basis elements (e.g. [[Li, V], [O]])
bfs: list of dicts
List of dictionaries holding the basis function values
(e.g. [{'Li' : 1.0, 'V': 0.0, 'O': 0.0},
{'Li': 0.0, 'O': 1.0, 'V': 0.0}])
"""
rates = []
for elems in basis_elems:
rates.append(rate_bf_subsets(elems, bfs))
# Find the combination of subset selections that gives the overall highest
# score given that a basis function can be present in only one basis
best_selection = []
best_score = None
for comb in product(*rates):
total_score = sum(rate[0] for rate in comb)
selection = [rate[1] for rate in comb]
# Check that no basis function is selected twice
selected_bfs = set()
duplicates = False
for s in selection:
for bf in s:
if bf in selected_bfs:
duplicates = True
else:
selected_bfs.add(bf)
        # Add a penalty to the combinations that have duplicate basis
        # functions. This way we avoid selecting the same basis function
        # in multiple atomic bases whenever possible
if duplicates:
total_score -= 1000.0
if best_score is None or total_score > best_score:
best_score = total_score
best_selection = selection
return best_selection
def cname_lt(cname1, cname2):
"""
Compare two cluster names to check if the first cluster name is
smaller than (less than, or lt) the second cluster name. Since the cluster
names take a form 'c#_d####_#', the prefix ('c#_d####') is evaluated as a
string while the suffix ('#') is evaluated as an integer.
Return `True` if cname1 < cname2 and `False` otherwise.
"""
    if not (isinstance(cname1, str) and isinstance(cname2, str)):
        raise TypeError("cnames should be strings.")
if cname1 in ("c0", "c1"):
prefix1 = cname1
else:
prefix1 = cname1.rpartition("_")[0]
if cname2 in ("c0", "c1"):
prefix2 = cname2
else:
prefix2 = cname2.rpartition("_")[0]
if prefix1 < prefix2:
return True
if prefix1 > prefix2:
return False
# Case where prefixes are the same.
if cname1 in ("c0", "c1"):
suffix1 = 0
else:
suffix1 = int(cname1.rpartition("_")[-1])
if cname2 in ("c0", "c1"):
suffix2 = 0
else:
suffix2 = int(cname2.rpartition("_")[-1])
if suffix1 < suffix2:
return True
return False
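# Examples: prefixes are compared as strings, suffixes as integers:
#
#   >>> cname_lt("c2_d0001_2", "c2_d0001_10")
#   True
#   >>> cname_lt("c2_d0002_0", "c2_d0001_10")
#   False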
def aic(mse, num_features, num_data_points):
"""
    Return the Akaike information criterion (AIC)
Parameters:
mse: float
Mean square error
num_features: int
Number of features in the model
num_data_points: int
Number of data points
"""
return 2.0 * num_features + num_data_points * np.log(mse)
def aicc(mse, num_features, num_data_points):
"""
    Return the corrected Akaike information criterion (AICc)
Parameters:
mse: float
Mean square error
num_features: int
Number of features in the model
num_data_points: int
Number of data points
"""
    if num_features >= num_data_points - 1:
        denom = 1.0
    else:
        denom = num_data_points - num_features - 1
    corr = (2 * num_features**2 + 2 * num_features) / denom
return aic(mse, num_features, num_data_points) + corr
def bic(mse, num_features, num_data_points):
"""
    Return the Bayesian information criterion (BIC)
Parameters:
mse: float
Mean square error
num_features: int
Number of features
num_data_points: int
Number of data points
"""
return np.log(num_data_points) * num_features + num_data_points * np.log(mse)
def get_extension(fname: Union[str, Path]) -> str:
"""
Return the file extension of a filename
Parameter:
fname: str
Filename
"""
return Path(fname).suffix
def add_file_extension(fname: Union[str, Path], ext: str) -> str:
"""
    Adds the wanted file extension to a filename. If a file extension
    is already present and it matches the wanted file extension, nothing
    is done. If it does not match, a ValueError is raised. Finally, if
    no file extension exists, the wanted extension is added.
:param fname: file name
:param ext: extension (with .) example (.csv, .txt, .json)
"""
fname = Path(fname)
current_ext = fname.suffix
if current_ext == ext:
return str(fname)
if current_ext == "":
return str(fname.with_suffix(ext))
raise ValueError(f"Passed extenstion {current_ext} expected {ext}")
def get_size_from_cf_name(cf_name: str) -> int:
"""Extract the size from a CF name."""
if not cf_name.startswith("c"):
raise ValueError(f"The name {cf_name} doesn't look like a CF name.")
    # Regex for finding the first integer occurrence.
res = re.search(r"\d+", cf_name)
if not res:
# No numbers were found in the string.
raise ValueError(f"No numbers present in name: {cf_name}")
return int(res.group())
def get_diameter_from_cf_name(cf_name: str) -> int:
"""Extract the diameter ground from the name.
Example: "c2_d0001_0_00" returns 1."""
size = get_size_from_cf_name(cf_name)
if size in {0, 1}:
# 0- and 1-body clusters have a slightly different convention.
return 0
dia_str = cf_name.split("_")[1]
# The diameter delimiter should start with a "d".
if not dia_str.startswith("d"):
raise ValueError(
f"Diameter format looks incorrect for cf name '{cf_name}'. Found '{dia_str}'."
)
return int(dia_str[1:])
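# Examples:
#
#   >>> get_size_from_cf_name("c3_d0005_0_001")
#   3
#   >>> get_diameter_from_cf_name("c3_d0005_0_001")
#   5
#   >>> get_diameter_from_cf_name("c1_1")
#   0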
def sort_cf_names(cf_names: tIterable[str]) -> List[str]:
"""
Return a sorted list of correlation function names. The names are
sorted according to the following criteria
1. Size
2. Diameter
3. Lexicographical order of the name itself
"""
# Helper sorting function to define the order of the sorting.
def _sort_ordering(name: str):
return get_size_from_cf_name(name), get_diameter_from_cf_name(name), name
return sorted(cf_names, key=_sort_ordering)
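# Example:
#
#   >>> sort_cf_names(["c2_d0002_0", "c1_1", "c2_d0001_0"])
#   ['c1_1', 'c2_d0001_0', 'c2_d0002_0']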
def get_ids(select_cond: List[tuple], db_name: str) -> List[int]:
"""
    Return ids in the database that correspond to the passed selection.
    :param select_cond: ASE select conditions.
    :param db_name: Name of the database.
    :return: List of database IDs matching the select conditions.
"""
keys, cmps = parse_selection(select_cond)
db = connect(db_name)
sql, args = db.create_select_statement(keys, cmps)
    # Extract the ids in the database that correspond to select_cond
sql = sql.replace("systems.*", "systems.id")
with connect(db_name) as db:
con = db.connection
cur = con.cursor()
cur.execute(sql, args)
ids = [row[0] for row in cur.fetchall()]
ids.sort()
return ids
class SQLCursor(Protocol):
def execute(self, sql: str, placeholder: Tuple[str]) -> None:
pass
def fetchall(self) -> tuple:
pass
def get_attribute(ids: List[int], cur: SQLCursor, key: str, table: str) -> list:
"""
Retrieve the value of the given key for the rows with the given ID of
the database entry.
:param ids: list of IDs
:param cur: cursor for the database
:param key: name of the key
:param table: name of the table
"""
known_tables = ["text_key_values", "number_key_values"]
if table not in known_tables:
raise ValueError(f"Table has to be one of {known_tables}")
sql = f"SELECT value, id FROM {table} WHERE key=?"
id_set = set(ids)
cur.execute(sql, (key,))
row_id_value = {}
for value, row_id in cur.fetchall():
if row_id in id_set:
row_id_value[row_id] = value
    # Convert to a list that matches the order of the IDs that
    # were passed
return [row_id_value[k] for k in ids]
def common_cf_names(ids: Set[int], cur: SQLCursor, table: str) -> Set[str]:
"""
    Extract all correlation function names that are present for all
    ids.
    :param ids: Set of ids that should be checked
:param cur: SQL cursor
:param table: Table to check
"""
known_tables = ["polynomial_cf", "binary_linear_cf", "trigonometric_cf"]
if table not in known_tables:
raise ValueError(f"Table has to be one of {known_tables}")
sql = f"SELECT key, id FROM {table}"
cur.execute(sql)
cf_names = {}
for cf_name, row_id in cur.fetchall():
if row_id in ids:
current = cf_names.get(row_id, set())
current.add(cf_name)
cf_names[row_id] = current
# Calculate the intersection between all sets
return set.intersection(*list(cf_names.values()))
def constraint_is_redundant(
A_lb: np.ndarray,
b_lb: np.ndarray,
c_lb: np.ndarray,
d: float,
A_eq: Optional[np.ndarray] = None,
b_eq: Optional[np.ndarray] = None,
) -> bool:
"""
The method considers the following system
min c.dot(x) for some arbitrary c
subject to
A_lb.dot(x) >= b_lb
A_eq.dot(x) = b_eq
    If the additional constraint c_lb.dot(x) >= d is redundant, the method
    returns True. The constraint specified by c_lb is redundant if the solution
    to min c_lb.dot(x) subject to the constraints above satisfies
    c_lb.dot(x) >= d. This method is known as the Linear Programming Method.
    :param A_lb: Matrix specifying the lower bounds
    :param b_lb: Vector specifying the right hand side of the lower bound
        constraints.
    :param c_lb: Vector specifying the additional inequality constraint
    :param d: Right hand side of the additional inequality
    :param A_eq: Matrix specifying the equality constraints. If None,
        no equality constraints exist.
    :param b_eq: Vector specifying the right hand side of the equality
        constraints. If None, no equality constraints exist.
"""
    # Scipy uses upper bounds instead of lower bounds; convert lower bounds
    # to upper bounds by changing the sign
res = linprog(c_lb, A_ub=-A_lb, b_ub=-b_lb, A_eq=A_eq, b_eq=b_eq)
return c_lb @ res.x >= d
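# Example (a minimal sketch in one variable): given x >= 1, the additional
# constraint x >= 0 is redundant. Note that scipy's linprog also bounds the
# variables to be non-negative by default:
#
#   >>> constraint_is_redundant(np.array([[1.0]]), np.array([1.0]),
#   ...                         np.array([1.0]), 0.0)
#   True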
def remove_redundant_constraints(
A_lb: np.ndarray,
b_lb: np.ndarray,
A_eq: Optional[np.ndarray] = None,
b_eq: Optional[np.ndarray] = None,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Remove all redundant constraints from A_lb and b_lb.
min c.dot(x) for some arbitrary c
subject to
A_lb.dot(x) >= b_lb
A_eq.dot(x) = b_eq
:param A_lb: Matrix specifying the lower bounds
:param b_lb: Vector specifying the right hand side of the lower bound
constraint.
:param A_eq: Matrix specifying the equality constraint. If None,
no equality constraints exist.
    :param b_eq: Vector specifying the right hand side of the equality
        constraints. If None, no equality constraints exist.
:return:
A_lb, b_lb with only non-redundant constraints
"""
redundant = []
perturb = 1
for i in range(A_lb.shape[0]):
c_lb = A_lb[i, :]
d = b_lb[i]
# Make the constraint under consideration more generous by lowering the bound
b_lb[i] -= perturb
if constraint_is_redundant(A_lb, b_lb, c_lb, d, A_eq, b_eq):
redundant.append(i)
# Set the constraint back to the original value
b_lb[i] += perturb
return np.delete(A_lb, redundant, axis=0), np.delete(b_lb, redundant)
def remove_redundant_equations(A, b, tol=1e-6):
    """Remove linearly dependent rows from the equation system A.dot(x) = b,
    using a QR factorization of A.T to detect the dependent rows."""
R_trimmed = []
indices = []
Q, R = np.linalg.qr(A.T)
k = 0
for i in range(0, R.shape[1]):
if abs(R[k, i]) > tol:
R_trimmed.append(R[:, i])
indices.append(i)
k += 1
if k == R.shape[0]:
break
R_trimmed = np.array(R_trimmed).T
A_trimmed = Q.dot(R_trimmed).T
return A_trimmed.copy(), b[indices]
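# Example: the second equation below is twice the first and is removed:
#
#   >>> A = np.array([[1.0, 0.0], [2.0, 0.0], [0.0, 1.0]])
#   >>> b = np.array([1.0, 2.0, 3.0])
#   >>> A_trim, b_trim = remove_redundant_equations(A, b)
#   >>> A_trim.shape
#   (2, 2)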
def get_cubicness(atoms: ase.Atoms) -> float:
"""Get the 'cubicness' of an atoms object,
the closer to 0, the more cubic the cell is."""
cell = np.array(atoms.get_cell())
# Normalize the cell
cell /= atoms.get_volume()
# Get diag of cell, and broadcast back into (3x3)
diag = np.diag(np.diag(cell))
# Take absolute value, as determinants can be negative.
# We just need it to be as close to 0 as possible
return abs(np.linalg.det(cell - diag))
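# Example (fcc Al is an arbitrary choice): a perfectly cubic cell gives 0:
#
#   >>> from ase.build import bulk
#   >>> get_cubicness(bulk("Al", cubic=True))
#   0.0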
def make_supercell(
prim: ase.Atoms, P: np.ndarray, wrap: bool = True, tol: float = 1e-5
) -> ase.Atoms:
"""Dispatcher for ase.build.supercells.make_supercell,
since the version prior to 3.22.1 had poor scaling with number of atoms."""
if ASE_VERSION <= Version("3.22.1"):
# Old version, use our local copy
logger.debug("Dispatching make_supercell to internal version, ASE version: %s", ASE_VERSION)
fnc = _make_supercell
else:
        # Use the ASE implementation, since it contains the newer version
        logger.debug(
            "Dispatching make_supercell to the ASE version, ASE version: %s", ASE_VERSION
        )
fnc = ase_sc.make_supercell
return fnc(prim, P, wrap=wrap, tol=tol)
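# Usage sketch: a 2x2x2 repetition expressed as a diagonal transformation
# matrix:
#
#   >>> from ase.build import bulk
#   >>> sc = make_supercell(bulk("Al"), 2 * np.eye(3))
#   >>> len(sc)
#   8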
def _make_supercell(prim, P, wrap=True, tol=1e-5):
r"""
NOTE: This is a copy of the ase make_supercell implementation,
until !2639 (in the ASE GitLab repo) gets merged. This was due to an exceptionally
poor scaling with number of atoms.
Generate a supercell by applying a general transformation (*P*) to
the input configuration (*prim*).
The transformation is described by a 3x3 integer matrix
`\mathbf{P}`. Specifically, the new cell metric
`\mathbf{h}` is given in terms of the metric of the input
configuration `\mathbf{h}_p` by `\mathbf{P h}_p =
\mathbf{h}`.
Parameters:
prim: ASE Atoms object
Input configuration.
P: 3x3 integer matrix
Transformation matrix `\mathbf{P}`.
wrap: bool
wrap in the end
tol: float
tolerance for wrapping
"""
supercell_matrix = P
supercell = ase_sc.clean_matrix(supercell_matrix @ prim.cell)
# cartesian lattice points
lattice_points_frac = ase_sc.lattice_points_in_supercell(supercell_matrix)
lattice_points = np.dot(lattice_points_frac, supercell)
shifted = prim.get_positions()[:, None] + lattice_points
N = len(lattice_points)
# Reshape such that the order is [atom1_shift1, atom2_shift1, atom1_shift2, atom2_shift2, ...]
# i.e. alternating between atom 1 and atom 2
# This preserves the order from older implementations.
shifted_reshaped = np.reshape(shifted, (len(prim) * N, 3), order="F")
superatoms = ase.Atoms(positions=shifted_reshaped, cell=supercell, pbc=prim.pbc)
# Copy over any other possible arrays, inspired by atoms.__imul__
for name, arr in prim.arrays.items():
if name == "positions":
# This was added during construction of the super cell
continue
# Shape borrowed from atoms.__imul__
new_arr = np.tile(arr, (N,) + (1,) * (len(arr.shape) - 1))
superatoms.set_array(name, new_arr)
# check number of atoms is correct
n_target = abs(int(np.round(np.linalg.det(supercell_matrix) * len(prim))))
if n_target != len(superatoms):
msg = "Number of atoms in supercell: {}, expected: {}".format(n_target, len(superatoms))
raise ase_sc.SupercellError(msg)
if wrap:
superatoms.wrap(eps=tol)
return superatoms
def invert_matrix(mat: np.ndarray) -> np.ndarray:
"""Invert a matrix. Will first try to use np.linalg.inv,
and in case that fails, fall back to np.linalg.pinv."""
try:
return np.linalg.inv(mat)
except np.linalg.LinAlgError:
return np.linalg.pinv(mat)
def is_trivial_supercell(prim: ase.Atoms, atoms: ase.Atoms) -> bool:
"""Check whether a given supercell is trivially a supercell of the primitive,
i.e. if the supercell can be expressed as atoms = prim * (n1, n2, n3),
where n1, n2 and n3 are positive integers."""
# Get repetition matrix
cell = prim.get_cell()
try:
p_inv = invert_matrix(cell)
except np.linalg.LinAlgError:
return False
P = atoms.get_cell() @ p_inv
P = ase_sc.clean_matrix(P)
# Check if the repetition matrix is diagonal, i.e. if all non-diagonal elements are close to 0
is_diag = np.allclose(P - np.diag(np.diagonal(P)), 0)
if not is_diag:
return False
# Check if the diagonal is positive
diag = np.diag(P)
if (diag < 0).any():
return False
# Check if the diagonal elements are all integer
return np.allclose(diag - np.rint(diag), 0)
def get_repetition(prim: ase.Atoms, atoms: ase.Atoms) -> np.ndarray:
"""Get the repetitions of a primitive and supercell
Note: This assumes they are a simple repetition, i.e.
atoms = prim * (n1, n2, n3). No check will be performed!
"""
cell = prim.get_cell()
p_inv = invert_matrix(cell)
P = atoms.get_cell() @ p_inv
return np.rint(np.diag(P)).astype(int)
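# Example (fcc Cu is an arbitrary choice):
#
#   >>> from ase.build import bulk
#   >>> prim = bulk("Cu")
#   >>> is_trivial_supercell(prim, prim * (2, 2, 2))
#   True
#   >>> get_repetition(prim, prim * (2, 2, 2))
#   array([2, 2, 2])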
def wrap_positions_3d(
positions: np.ndarray,
cell: np.ndarray,
center=(0.5, 0.5, 0.5),
cell_T_inv: Optional[np.ndarray] = None,
eps: float = 1e-7,
) -> np.ndarray:
"""Similar to the ase.geometry.wrap_positions but this implementation assumes that the
cell is a fully 3-D periodic.
Assumes all directions are periodic! Also assumes the cell is correctly completed,
without any missing lattice vectors.
The inverse of the transposed cell is required, and can be provided as a pre-computed
quantity for reuse. This will be computed if not provided.
Args:
positions (np.ndarray): The positions to be wrapped.
cell (np.ndarray): Unit cell vectors of the atoms object.
        center (tuple, optional): The positions in fractional coordinates that the new positions
will be nearest possible to. Defaults to (0.5, 0.5, 0.5).
cell_T_inv (np.ndarray, optional): A pre-computed inverse of the transposed cell.
Will be calculated, if it is not provided. Will not check if the provided
inverse is correct. Defaults to None.
eps (float, optional): Small number to prevent slightly negative coordinates from being
wrapped. Defaults to 1e-7.
Returns:
np.ndarray: The wrapped positions
"""
shift = np.asarray(center) - 0.5 - eps
# If the cell_T_inv is not None, assume it was provided correctly.
if cell_T_inv is None:
cell_T_inv = np.linalg.inv(cell.T)
fractional = cell_T_inv.dot(positions.T).T - shift
fractional %= 1.0
fractional += shift
return np.dot(fractional, cell)
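# Example: wrap a position that lies outside a unit cube back into the cell;
# the result is approximately [0.25, 0.75, 0.5]:
#
#   >>> pos = np.array([[1.25, -0.25, 0.5]])
#   >>> wrapped = wrap_positions_3d(pos, np.eye(3))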