Commit 226f8dc6 authored by Ask Hjorth Larsen's avatar Ask Hjorth Larsen
Browse files

move delta-ml beaker files to this git repo

parent 1edbd392
container=ae5fbce12ee3
docker cp $container:/home/beaker/notebooks/ex1-molecules/molkrr/ml_chemical_space.bkr python-modules/molkrr/
container=ae5fbce12ee3
docker cp python-modules/molkrr $container:/home/beaker/notebooks/ex1-molecules/
docker exec $container chown -R beaker.beaker /home/beaker/notebooks/ex1-molecules/molkrr
This source diff could not be displayed because it is too large. You can view the blob instead.
"""
Little python code that implements the machine learning kernel
rigde regression (KKR) algorithm.
Authors: Alejandro Perez Paz. Frid Sep 23, 2016.
Ask Hjorth Larsen <asklarsen@gmail.com>, 2016.
"""
from __future__ import print_function
import os
import ase
import math
import json as js
import gzip as gz
import numpy as np
import datetime as dt
#import orcaparse as op
from glob import glob
from time import time
from contextlib import contextmanager
from ase.io import read
from readxyz import iread_xyz
@contextmanager
def timer(name):
"""
count time on a given context
:param name: context name,name of a funtion, method, or enviroment
:type name: string
:return: None
:rtype: None
"""
tstart = time()
yield
tstop = time()
print('Time {}: {:1.3}s'.format(name, tstop - tstart))
def gen_random_index(nsystems, ntestsystems, seed=0):
"""
Generate random indices to be use as test systems
:param nsystems: total number of systems
:type nsystems: int
:param ntestsystems: total numbers os test system
:type ntestsystems: int
:param seed: randomness of the systems
:type seed: int
:return: a list of indices
:rtype: int array
"""
gen = np.random.RandomState(0)
assert ntestsystems < nsystems
test_indices = gen.choice(range(nsystems), size=ntestsystems,
replace=False)
return test_indices
def random_choice(systems, refs, ntestsystems, seed=0):
"""
separate learn_systems from test_systems on
:param systems: complete systems group
:type systems: ase.Atoms
:param refs: array with the data use to learn
:type refs: array, floats, same dimention as systems
:param ntestsystems: numbers of test systems
:type ntestsystems: int
:param seed: randomnes of the selected system
:type seed: int
:return: learn_systems, learn_references,
test_systems, test_references, test_indices
:rtype: learn_systems and test_systems, ase.Atoms array
learn_references and test_references, same as refs array
test_indices int array
"""
nsystems = len(systems)
test_indices = gen_random_index(nsystems, ntestsystems, seed)
testset = set(test_indices)
learn_systems = []
learn_references = []
test_systems = []
test_references = []
for i, (system, ref) in enumerate(zip(systems, refs)):
if i in testset:
test_systems.append(system)
test_references.append(ref)
else:
learn_systems.append(system)
learn_references.append(ref)
return learn_systems, learn_references, \
test_systems, test_references, test_indices
def coulomb_matrix(atoms):
"""
compute the coulumb matrix of a atom
:param atoms: atom object
:type atoms: ase.Atoms
:return: numpy.matrix
:rtype: numpy.matrix float
"""
natoms = len(atoms)
C = np.zeros((natoms, natoms), dtype=float)
Z = atoms.get_atomic_numbers()
r = np.array(atoms.get_positions())
# calculate Rupp's coulomb matrix: do only the lower triangular part
for i in range(natoms):
C[i][i] = 0.5 * Z[i]**2.4
for j in range(i):
C[i][j] = Z[i]*Z[j]/np.linalg.norm(r[i]-r[j])
C = C + C.T - np.diag(np.diag(C))
Cf = C.ravel()
return Cf
def sorted_coulomb_matrix(atoms):
"""
compute the sorted coulumb matrix using the norm2 of each row
and then re arrange the atoms from high value to low value
:param atoms: atom object
:type atoms: ase.Atom
:return: numpy.matrix
:rtype: numpy.matrix float
"""
cm = coulomb_matrix(atoms)
N = len(atoms)
cm = cm.reshape((N, N))
norm2s = -cm.sum(axis=1)**2
args = np.argsort(norm2s)
atoms = atoms[args]
cm = coulomb_matrix(atoms)
return cm
def diagonalize_coulumb_matrix(atoms):
N = len(atoms)
cm = coulomb_matrix(atoms)
v, dcm = np.linalg.eigh(cm.reshape((N, N)))
return dcm.ravel()
def read22k(nsystems=None, data_root_dir=None):
"""
read data from a file, probably this will be remove in the future
:param nsystems: how many systems do you want to read
:param data_root_dir:
:return: systems array
:rtype: ase.Atoms
"""
if data_root_dir is None:
thisdir = os.path.dirname(__file__)
filepath = os.path.join(thisdir, '../data/XYZ_B3LYP_631G2dfp.dat')
else:
filepath = data_root_dir + '/' + 'XYZ_B3LYP_631G2dfp.dat'
with open(filepath) as fd:
xyz = iread_xyz(fd)
systems = []
if nsystems is None:
systems = list(xyz)
else:
for i in range(nsystems):
systems.append(next(xyz))
return systems
def read22k_energy(nsystems=None):
"""
read the energy values from the dataset, probably it will remove in the
future
:param nsystems: number of systems to read
:type nsystems:
:return: numpy.array
:rtype:
"""
headers = ('Index E1-CC2 E2-CC2 f1-CC2 f2-CC2'
' E1-PBE0 E2-PBE0 f1-PBE0 f2-PBE0'
' E1-PBE0 E2-PBE0 f1-PBE0 f2-PBE0'
' E1-CAM E2-CAM f1-CAM f2-CAM').split()
all_energies = {}
with open('22k-testset/gdb8_22k_elec_spec.txt') as fd, timer('read energy'):
for line in fd:
if not line.lstrip().startswith('#'):
break
numbers = []
if nsystems is None:
numbers = [line.split() for line in fd]
else:
for i in range(nsystems):
numbers.append(line.split())
line = next(fd)
numbers = np.array(numbers).astype(float).T.copy()
assert numbers.flags.contiguous
for header, array in zip(headers, numbers):
all_energies[header] = array.astype(float)
return all_energies
def read22k_all(nsystems=None):
"""
this wil read systems and energies values from the dataset,
it will be remove in the future
:param nsystems: number of systems to read
:type nsystems: int
:return: a system array and a energies array
:rtype: ase.Atom, numpy.array(float)
"""
return read22k(nsystems), read22k_energy(nsystems)
def read_orca22kall(nsystems=None, index=None, xyz=True, theory="PBE0",
data_root_dir=""
):
"""
read the orca calculations, use the orcaparse.py to do the heavy part
:param nsystems:
:param index:
:param xyz:
:param theory:
:param data_root_dir:
:return:
"""
all_gs = []
all_absspec = []
systems = []
if index is None:
index = gen_random_index(21785, nsystems)
for i in index:
filepath = data_root_dir + "/{}/{:05}/{:05}".format(theory, i, i)
fd = open(filepath + ".out")
gs, absspec = op.parse(fd)
if xyz:
system = read22k(1, filepath + ".xyz")[0]
systems.append(system)
all_gs.append(gs)
all_absspec.append(absspec)
return systems, all_gs, all_absspec, index
def read_orca22json(nsystems=None, index=None, xyz=True,
theory="pbe0", energies=True, hlgap=True,
data_root_dir=""):
"""
read the orca calculations, use the orcaparse.py to do the heavy part
:param nsystems:
:param index:
:param xyz:
:param theory:
:param energies:
:param hlgap:
:param data_root_dir:
:return:
"""
if index is None:
index = gen_random_index(21785, nsystems,
seed=dt.datetime.now().microsecond)
data = {}
if energies:
with gz.open(data_root_dir +
"/split/orca-{}.absspec.e_1_eV.json.gz".format(theory)) \
as fd:
data['exe'] = np.array(js.load(fd))[index]
else:
with gz.open(data_root_dir +
"/split/orca-{}.absspec.f_1.json.gz".format(theory)) \
as fd:
data['fosc'] = np.array(js.load(fd))[index]
if hlgap:
with gz.open(data_root_dir +
"/split/orca-{}.gs.homolumogap.json.gz".format(theory)) \
as fd:
data['hlgap'] = np.array(js.load(fd))[index]
# data = js.load(open(data_root_dir + "/{}.json".format(theory)))
# data = np.array(data, object)
# data = data[index]
if xyz:
systems = np.array(read22k(None, data_root_dir))
systems = systems[index].tolist()
else:
systems = []
return systems, data, index
def distmatrix2kernel(*distmatrix):
"""
compute exp of the kernel rigde regression
:param distmatrix: distance matrix
:type distmatrix: np.array[float]
:return: numpy.array(float)
:rtype: numpy.array(float), one dimention
"""
exp = 1
for dist in distmatrix:
if isinstance(dist, tuple):
for d in dist:
# exp *= np.exp(-0.5 * d)
exp *= np.exp(-d)
return exp
else:
exp = np.exp(-dist)
# exp = np.exp(-0.5 * dist)
return exp
def get_sigma(d):
"""width for the Laplace/Gaussian? kernel.
Recommended value: max{|Di-Dj|}/log(2)."""
N = len(d)
L1norm = []
# do only the lower triangular part
for i in range(N):
for j in range(i):
L1norm.append(np.linalg.norm((d[i] - d[j]), ord=1))
sigma = np.max(L1norm)/np.log(2.0)
return sigma
# ---------------------------Laplace Kernel-------------------------------------
def laplace_kernel_all(descriptors, s_sigma, e_sigma):
"""
Define Laplace kernel matrix K (square NxN matrix).
Uses molecular descriptors of the training set
:param descriptors: obje with all descriptors
:type descriptors: GDesc
:param s_sigma: width of the kernel
:type s_sigma: int, float
:param e_sigma: width of the kernel, not used if L1 norm is use to calculate
distances
:type e_sigma: int, float
:return: kernel matrix
:rtype: numpy.matrix
"""
nsys = descriptors.nsystems
K = np.zeros((nsys, nsys))
for i in range(nsys):
s, e = descriptors.dists_l1(i, i, nsys)
if s_sigma is not None:
s /= s_sigma
if e_sigma is not None:
e /= e_sigma
K[i:, i] = distmatrix2kernel(s + e)
K[i, i] = 1
# symmetrize
K = K + K.T - np.diag(np.diag(K))
return K
def dist_l1_d1_d2(desc1, desc2, s_sigma, e_sigma):
"""
compute the distance between two descriptors, L1 norm
:param desc1: base descriptor
:type desc1: GDesc
:param desc2: new descriptor
:type desc2: GDesc
:param s_sigma:
:param e_sigma:
:return: distance matrix or array, depending of the dimentions od the
descriptors
:rtype: numpy.array, numpy.matrix
"""
s_dist, e_dist = desc1.dists_v_l1(desc2)
if s_sigma is not None:
s_dist /= s_sigma
if e_sigma is not None:
e_dist /= e_sigma
return s_dist, e_dist
def predict_v_l1(learn_desc, test_desc, coeffs, s_sigma, e_sigma):
"""
prediction funtion, use L1 norm to calculate distance between descriptors
:param learn_desc: base descriptor
:type learn_desc: GDesc
:param test_desc: new descriptor
:type test_desc: GDesc
:param coeffs: base coeficient extract from the learning process
:type coeffs: numpy.array[float]
:param s_sigma: width of the lernel
:type s_sigma: int, float
:param e_sigma: not use with this norm, needed for compativility reason
:type e_sigma: int, float
:return: predicte values
:rtype: float, int
"""
s_dist, e_dist = \
dist_l1_d1_d2(learn_desc, test_desc, s_sigma, e_sigma)
dis_m = \
distmatrix2kernel((s_dist + e_dist))
return [np.dot(dis_m, coeffs)]
# ---------------------------Gaussian Kernel------------------------------------
def gausian_kernel_all(descriptors, s_sigma, e_sigma):
"""
Define Gaussian kernel matrix K (square NxN matrix).
Uses molecular descriptors of the training set
:param descriptors: obje with all descriptors
:type descriptors: GDesc
:param s_sigma: width of the kernel
:type s_sigma: int, float
:param e_sigma: width of the kernel, not used if L1 norm is use to calculate
distances
:type e_sigma: int, float
:return: kernel matrix
:rtype: numpy.matrix
"""
nsys = descriptors.nsystems
K = np.zeros((nsys, nsys))
for i in range(nsys):
s, e = descriptors.dists_l2(i, i, nsys)
if s_sigma is not None:
s /= s_sigma
if e_sigma is not None:
e /= e_sigma
K[i:, i] = distmatrix2kernel(np.sqrt(s+e))
K[i, i] = 1
# symmetrize
K = K + K.T - np.diag(np.diag(K))
return K
def dist_l2_d1_d2(desc1, desc2, s_sigma=1, e_sigma=1):
"""
compute the distance between two descriptors, L2 norm
:param desc1: base descriptor
:type desc1: GDesc
:param desc2: new descriptor
:type desc2: GDesc
:param s_sigma:
:param e_sigma:
:return: distance matrix or array, depending of the dimentions od the
descriptors
:rtype: numpy.array, numpy.matrix
"""
s_dist, e_dist = desc1.dists_v_l2(desc2)
if s_sigma is not None:
s_dist /= s_sigma
if e_sigma is not None:
e_dist /= e_sigma
return s_dist, e_dist
def predict_v_l2(learn_desc, test_desc, coeffs, s_sigma, e_sigma):
"""
prediction funtion, use L2 norm to calculate distance between descriptors
:param learn_desc: base descriptor
:type learn_desc: GDesc
:param test_desc: new descriptor
:type test_desc: GDesc
:param coeffs: base coeficient extract from the learning process
:type coeffs: numpy.array[float]
:param s_sigma: width of the lernel
:type s_sigma: int, float
:param e_sigma: not use with this norm, needed for compativility reason
:type e_sigma: int, float
:return: predicte values
:rtype: float, int
"""
# dist = dist_l2_d1_d2(learn_desc, test_desc, s_sigma, e_sigma)
# values = []
# for s in dist:
# dis_m = distmatrix2kernel(s)
# dis_m *= coeffs
# val = np.sum(dis_m, axis=0)
# values.append(val)
s_dist, e_dist = dist_l2_d1_d2(learn_desc, test_desc,
s_sigma, e_sigma)
dis_m = distmatrix2kernel(np.sqrt(s_dist + e_dist))
return [np.dot(dis_m, coeffs)]
class Machine:
"""
machine class for the kernel rigde regression
"""
def __init__(self, desc,
struct_sigma=None,
elect_sigma=None,
kernel=laplace_kernel_all):
"""
contructor function, with some default values
:param desc: base descriptors
:type desc: GDesc
:param struct_sigma: structural width of the kernel, the value 900
prove to be the first value not to wide and not to narrow
:type struct_sigma: int, float
:param elect_sigma: electrical width of the kernel
:type elect_sigma: int, float
:param kernel: function to be use to calculate the kernel
:type kernel: function pointer, lapalace is default
"""
self.descriptors = desc
self.s_sigma = struct_sigma
self.e_sigma = elect_sigma
if desc is None:
return
self.K = kernel(desc, self.s_sigma, self.e_sigma)
def save_state(self, filepath="./ml_kernel"):
"""
serialization routine, to save the current state of the kernel
:param filepath: where to save the kernel
:type filepath: string
:return: None
:rtype: None
"""
with timer('save_kernel'):
fml_kernel_c = open(filepath + "_coef", "w")
fml_kernel_d = open(filepath + "_desc", "w")
np.save(file=fml_kernel_c, arr=self.coeffs)
np.save(file=fml_kernel_d, arr=self.descriptors.buf)
def load_state(self, filepath="./ml_kernel"):
"""
load some save kernel state
:param filepath: where the kernel is
:type filepath: string
:return:
:rtype:
"""
with timer('load_kernel'):
fml_kernel_c = open(filepath + "_coef", "r")
fml_kernel_d = open(filepath + "_desc", "r")
self.descriptors = SCMDescriptors(0, 0, 0)
self.coeffs = np.load(file=fml_kernel_c)
self.descriptors.buf = np.load(file=fml_kernel_d)
self.descriptors.nsystems = len(self.descriptors.buf)
self.descriptors.maxlen = math.sqrt(len(self.descriptors.buf[0]))
def coeff_cal(self, references, gamma=0.0):
"""
compute the coefficient, from the kernel matrix using some reference
value as
:param references: numpy.array of values to train the algorithm,
:type references: numpy.array[float]
:param gamma: not use
:type gamma: float
:return: None. change the internal state of the kernel
:rtype: None
"""
# regularization parameter. Determine its value by cross-validation!
self.coeffs = np.linalg.solve(self.K + gamma *
np.eye(len(self.descriptors)),
references)
# print("coeff")
# print(self.coeffs)
def predict(self, test_des, dist=predict_v_l1):
"""
evalute the distance between the kernel and a new descriptor
to produce a predicter result base of that distance and the coefficient
:param test_des: new descriptor
:type test_des: GDesc
:param dist: function use to evaluate the distance
:type dist: funtion pointer, default predict_v_l1
:return: predicted value(s)
:rtype: numpy.array[float] dimention can chage depending of test_des
"""
# maching descriptors size to
# t = self.descriptors.buf.copy()
# if test_des.maxlen > self.descriptors.maxlen:
# gap = (test_des.maxlen ** 2 - int(self.descriptors.maxlen) ** 2)
# self.descriptors.buf = np.require(self.descriptors.buf,
# requirements=['OWNDATA'])
# self.descriptors.buf = np.c_[self.descriptors.buf,
# np.zeros((len(self.descriptors.buf),
# gap))]
values = dist(self.descriptors, test_des, self.coeffs,
self.s_sigma, self.e_sigma)
values = np.array(values)
return values
class GDesc:
"""
descriptor class, only take care of mantain descriptor list
and calculate distance, of descriptors