Module mogptk.gpr.model
import sys
import torch
import numpy as np
from scipy.stats import qmc, gaussian_kde
from IPython.display import display, HTML
from . import Parameter, Mean, Kernel, MultiOutputKernel, Likelihood, MultiOutputLikelihood, GaussianLikelihood, config, plot_gram
import warnings
warnings.simplefilter('ignore', torch.jit.TracerWarning)
def _init_grid(N, X):
n = np.power(N,1.0/X.shape[1])
if not n.is_integer():
raise ValueError("number of inducing points must equal N = n^%d" % X.shape[1])
n = int(n)
Z = torch.empty((N,X.shape[1]), device=config.device, dtype=config.dtype)
grid = torch.meshgrid([torch.linspace(torch.min(X[:,i]), torch.max(X[:,i]), n) for i in range(X.shape[1])], indexing='ij')
for i in range(X.shape[1]):
Z[:,i] = grid[i].flatten()
return Z
def _init_random(N, X):
sampler = qmc.Halton(d=X.shape[1])
samples = torch.tensor(sampler.random(n=N), device=config.device, dtype=config.dtype)
Z = torch.empty((N,X.shape[1]), device=config.device, dtype=config.dtype)
for i in range(X.shape[1]):
Z[:,i] = torch.min(X[:,i]) + (torch.max(X[:,i])-torch.min(X[:,i]))*samples[:,i]
return Z
def _init_density(N, X):
kernel = gaussian_kde(X.T.detach().cpu().numpy(), bw_method='scott')
Z = torch.tensor(kernel.resample(N).T, device=config.device, dtype=config.dtype)
return Z
def init_inducing_points(Z, X, method='grid', output_dims=None):
"""
Initialize locations for inducing points.
Args:
Z (int,list): Number of inducing points. A list of ints of shape (output_dims,) will initialize the specified number of inducing points per output dimension.
X (torch.tensor): Input data of shape (data_points,input_dims).
method (str): Method for initialization, can be `grid`, `random`, or `density`.
output_dims (int): Number of output dimensions. If not None, the first input dimension of the input data must contain the channel IDs.
Returns:
torch.tensor: Inducing point locations of shape (inducing_points,input_dims). In case of multiple output dimensions, the first input dimension will be the channel ID.
"""
_init = _init_grid
if method == 'random':
_init = _init_random
elif method == 'density':
_init = _init_density
if output_dims is not None:
if isinstance(Z, int) or all(isinstance(z, int) for z in Z) and len(Z) == output_dims:
if isinstance(Z, int):
Z = [Z] * output_dims
M = Z
Z = torch.zeros((sum(M),X.shape[1]))
for j in range(len(M)):
m0 = sum(M[:j])
m = M[j]
Z[m0:m0+m,0] = j
Z[m0:m0+m,1:] = _init(m, X[X[:,0] == j,1:])
elif isinstance(Z, int):
M = Z
Z = _init(M, X)
return Z
class CholeskyException(Exception):
def __init__(self, message, K, model):
self.message = message
self.K = K
self.model = model
def __str__(self):
return self.message
class Model(torch.nn.Module):
"""
Base model class.
Attributes:
kernel (mogptk.gpr.kernel.Kernel): Kernel.
likelihood (mogptk.gpr.likelihood.Likelihood): Likelihood.
mean (mogptk.gpr.mean.Mean): Mean.
"""
def __init__(self, kernel, X, y, likelihood=GaussianLikelihood(1.0), jitter=1e-8, mean=None):
super().__init__()
if not issubclass(type(kernel), Kernel):
raise ValueError("kernel must derive from mogptk.gpr.Kernel")
X, y = self._check_input(X, y)
if mean is not None:
if not issubclass(type(mean), Mean):
raise ValueError("mean must derive from mogptk.gpr.Mean")
mu = mean(X).reshape(-1,1)
if mu.shape != y.shape:
raise ValueError("mean and y data must match shapes: %s != %s" % (mu.shape, y.shape))
if issubclass(type(likelihood), MultiOutputLikelihood) and likelihood.output_dims != kernel.output_dims:
raise ValueError("kernel and likelihood must have matching output dimensions")
likelihood.validate_y(X, y)
# limit to number of significant digits
if config.dtype == torch.float32:
jitter = max(jitter, 1e-6)
elif config.dtype == torch.float64:
jitter = max(jitter, 1e-15)
self.kernel = kernel
self.X = X
self.y = y
self.mean = mean
self.likelihood = likelihood
self.jitter = jitter
self.input_dims = X.shape[1]
self._compiled_forward = None
def name(self):
return self.__class__.__name__
def forward(self, x=None):
return -self.log_marginal_likelihood() - self.log_prior()
def compile(self):
if self._compiled_forward is None:
self._compiled_forward = torch.jit.trace(self.forward, ())
def __getstate__(self):
state = super().__getstate__()
state['_modules'] = state['_modules'].copy()
state['_modules'].pop('_compiled_forward', None)
return state
def __setattr__(self, name, val):
if hasattr(self, name) and isinstance(getattr(self, name), Parameter):
raise AttributeError("parameter is read-only, use Parameter.assign()")
if isinstance(val, Parameter) and val._name is None:
val._name = '%s.%s' % (self.__class__.__name__, name)
elif isinstance(val, torch.nn.ModuleList):
for i, item in enumerate(val):
for p in item.parameters():
p._name = '%s[%d].%s' % (self.__class__.__name__, i, p._name)
super().__setattr__(name, val)
def _check_input(self, X, y=None):
if not isinstance(X, torch.Tensor):
X = torch.tensor(X, device=config.device, dtype=config.dtype)
else:
X = X.to(config.device, config.dtype)
if X.ndim == 0:
X = X.reshape(1,1)
elif X.ndim == 1:
X = X.reshape(-1,1)
elif X.ndim != 2:
raise ValueError("X must have dimensions (data_points,input_dims) with input_dims optional")
if X.shape[0] == 0 or X.shape[1] == 0:
raise ValueError("X must not be empty")
if y is not None:
if not isinstance(y, torch.Tensor):
y = torch.tensor(y, device=config.device, dtype=config.dtype)
else:
y = y.to(config.device, config.dtype)
if y.ndim == 0:
y = y.reshape(1,1)
elif y.ndim == 1:
y = y.reshape(-1,1)
elif y.ndim != 2 or y.shape[1] != 1:
raise ValueError("y must have one dimension (data_points,)")
if X.shape[0] != y.shape[0]:
raise ValueError("number of data points for X and y must match")
return X, y
else:
# X is for prediction
if X.shape[1] != self.input_dims:
raise ValueError("X must have %s input dimensions" % self.input_dims)
return X
def _index_channel(self, value, X):
if self.kernel.output_dims is not None and 0 < value.ndim and value.shape[0] == self.kernel.output_dims:
return torch.index_select(value, dim=0, index=X[:,0].long())
return value
def print_parameters(self, file=None):
"""
Print parameters and their values.
"""
def param_range(lower, upper, train=True, pegged=False):
if lower is not None:
if lower.size == 1:
lower = lower.item()
elif (lower.max()-lower.min())/lower.mean() < 1e-6:
lower = lower.mean().item()
else:
lower = lower.tolist()
if upper is not None:
if upper.size == 1:
upper = upper.item()
elif (upper.max()-upper.min())/upper.mean() < 1e-6:
upper = upper.mean().item()
else:
upper = upper.tolist()
if pegged:
return "pegged"
elif not train:
return "fixed"
if lower is None and upper is None:
return "(-∞, ∞)"
elif lower is None:
return "(-∞, %s]" % upper
elif upper is None:
return "[%s, ∞)" % lower
return "[%s, %s]" % (lower, upper)
if file is None:
try:
get_ipython # fails if we're not in a notebook
table = '<table><tr><th style="text-align:left">Name</th><th>Range</th><th>Value</th></tr>'
for p in self.parameters():
table += '<tr><td style="text-align:left">%s</td><td>%s</td><td>%s</td></tr>' % (p._name, param_range(p.lower, p.upper, p.train, p.pegged), p.numpy())
table += '</table>'
display(HTML(table))
return
except Exception as e:
pass
vals = [["Name", "Range", "Value"]]
for p in self.parameters():
vals.append([p._name, param_range(p.lower, p.upper, p.train, p.pegged), p.numpy().tolist()])
nameWidth = max([len(val[0]) for val in vals])
rangeWidth = max([len(val[1]) for val in vals])
for val in vals:
#print("%-*s %-*s %s" % (nameWidth, val[0], rangeWidth, val[1], val[2]), file=file)
print("%-*s %s" % (nameWidth, val[0], val[2]), file=file)
def _cholesky(self, K, add_jitter=False):
if add_jitter:
K = K + (self.jitter * K.diagonal().mean()).repeat(K.shape[0]).diagflat()
try:
return torch.linalg.cholesky(K)
except RuntimeError as e:
print("ERROR:", e.args[0], file=sys.__stdout__)
if K.isnan().any():
print("ERROR: kernel matrix has NaNs!", file=sys.__stdout__)
if K.isinf().any():
print("ERROR: kernel matrix has infinities!", file=sys.__stdout__)
self.print_parameters()
plot_gram(K)
raise CholeskyException(e.args[0], K, self)
def log_marginal_likelihood(self):
"""
Return the log marginal likelihood given by
$$ \\log p(y) $$
Returns:
torch.tensor: Log marginal likelihood.
"""
raise NotImplementedError()
def log_prior(self):
"""
Return the log prior given by
$$ \\log p(\\theta) $$
Returns:
torch.tensor: Log prior.
"""
return sum([p.log_prior() for p in self.parameters()])
def loss(self):
"""
Model loss for training.
Returns:
torch.tensor: Loss.
"""
self.zero_grad(set_to_none=True)
if self._compiled_forward is None:
loss = self.forward()
else:
loss = self._compiled_forward()
loss.backward()
return loss
def K(self, X1, X2=None):
"""
Evaluate kernel at `X1` and `X2` and return the NumPy representation.
Args:
X1 (torch.tensor): Input of shape (data_points1,input_dims).
X2 (torch.tensor): Input of shape (data_points2,input_dims).
Returns:
numpy.ndarray: Kernel matrix of shape (data_points1,data_points2).
"""
with torch.inference_mode():
return self.kernel(X1, X2)
def predict_f(self, X, full=False):
"""
Get the predictive posterior for values of f.
Args:
X (torch.tensor): Input of shape (data_points,input_dims) needed for multi-output likelihood.
full (bool): Return the full variance matrix as opposed to the diagonal.
Returns:
torch.tensor: Mean of the predictive posterior of shape (data_points,1).
torch.tensor: Variance of the predictive posterior of shape (data_points,1) or (data_points,data_points).
"""
raise NotImplementedError()
def predict_y(self, X, ci=None, sigma=None, n=10000):
"""
Get the predictive posterior for values of y.
Args:
X (torch.tensor): Input of shape (data_points,input_dims) needed for multi-output likelihood.
ci (list of float): Two percentages [lower, upper] in the range of [0,1] that represent the confidence interval.
sigma (float): Number of standard deviations of the confidence interval. Only used as a short-path for the Gaussian likelihood.
n (int): Number of samples used from distribution to estimate quantile.
Returns:
torch.tensor: Mean of the predictive posterior of shape (data_points,1).
torch.tensor: Lower confidence boundary of the predictive posterior of shape (data_points,1).
torch.tensor: Upper confidence boundary of the predictive posterior of shape (data_points,1).
"""
with torch.inference_mode():
X = self._check_input(X) # MxD
mu, var = self.predict_f(X)
if ci is None and sigma is not None:
p = 0.5*(1.0 + float(torch.erf(torch.tensor(sigma/np.sqrt(2.0)))))
ci = [1.0-p, p]
return self.likelihood.predict(X, mu, var, ci, sigma=sigma, n=n)
def sample_f(self, Z, n=None, prior=False):
"""
Sample f values from model.
Args:
Z (torch.tensor): Input of shape (data_points,input_dims).
n (int): Number of samples.
prior (boolean): Sample from prior instead of posterior.
Returns:
torch.tensor: Samples of shape (data_points,n) or (data_points,) if `n` is not given.
"""
with torch.inference_mode():
Z = self._check_input(Z) # MxD
S = n
if n is None:
S = 1
if prior:
mu, var = (self.mean(Z) if self.mean is not None else torch.zeros(Z.shape[0], device=config.device, dtype=config.dtype)), self.kernel(Z)  # fall back to a zero mean when no mean function is set
else:
mu, var = self.predict_f(Z, full=True) # Mx1, MxM
eye = torch.eye(var.shape[0], device=config.device, dtype=config.dtype)
var += self.jitter * var.diagonal().mean() * eye # MxM
samples_f = torch.distributions.multivariate_normal.MultivariateNormal(mu.reshape(-1), var).sample([S])
if n is None:
samples_f = samples_f.squeeze()
return samples_f
def sample_y(self, Z, n=None):
"""
Sample y values from model.
Args:
Z (torch.tensor): Input of shape (data_points,input_dims).
n (int): Number of samples.
Returns:
torch.tensor: Samples of shape (data_points,n) or (data_points,) if `n` is not given.
"""
with torch.inference_mode():
Z = self._check_input(Z) # MxD
S = n
if n is None:
S = 1
samples_f = self.sample_f(Z, n=S)
samples_y = self.likelihood.conditional_sample(Z, samples_f)
if n is None:
samples_y = samples_y.squeeze()
return samples_y
class Exact(Model):
"""
Regular Gaussian process regression with a Gaussian likelihood which allows for exact inference.
$$ y \\sim \\mathcal{N}(0, K + \\sigma^2I) $$
Args:
kernel (mogptk.gpr.kernel.Kernel): Kernel.
X (torch.tensor): Input data of shape (data_points,input_dims).
y (torch.tensor): Output data of shape (data_points,).
variance (float,torch.tensor): Gaussian likelihood initial variance. Passing a float will train a single variance for all channels. Passing a tensor of shape (channels,) will assign and train different variances per multi-output channel.
data_variance (torch.tensor): Assign different and fixed variances per data point of shape (data_points,). These are added to the kernel's diagonal while still training an additional Gaussian variance.
jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
mean (mogptk.gpr.mean.Mean): Mean.
"""
def __init__(self, kernel, X, y, variance=1.0, data_variance=None, jitter=1e-8, mean=None):
if data_variance is not None:
data_variance = Parameter.to_tensor(data_variance)
if data_variance.ndim != 1 or X.ndim == 2 and data_variance.shape[0] != X.shape[0]:
raise ValueError("data variance must have shape (data_points,)")
data_variance = data_variance.diagflat()
self.data_variance = data_variance
variance = Parameter.to_tensor(variance)
channels = 1
if kernel.output_dims is not None:
channels = kernel.output_dims
if 1 < variance.ndim or variance.ndim == 1 and variance.shape[0] != channels:
raise ValueError("variance must be float or have shape (channels,)")
super().__init__(kernel, X, y, GaussianLikelihood(torch.sqrt(variance)), jitter, mean)
self.eye = torch.eye(self.X.shape[0], device=config.device, dtype=config.dtype)
self.log_marginal_likelihood_constant = 0.5*self.X.shape[0]*np.log(2.0*np.pi)
def log_marginal_likelihood(self):
Kff = self.kernel.K(self.X)
Kff += self._index_channel(self.likelihood.scale().square(), self.X) * self.eye # NxN
if self.data_variance is not None:
Kff += self.data_variance
L = self._cholesky(Kff, add_jitter=True) # NxN
if self.mean is not None:
y = self.y - self.mean(self.X).reshape(-1,1) # Nx1
else:
y = self.y # Nx1
p = -self.log_marginal_likelihood_constant
p -= L.diagonal().log().sum() # 0.5 is taken inside the log: L is the square root
p -= 0.5*y.T.mm(torch.cholesky_solve(y,L)).squeeze()
return p
def predict_f(self, X, full=False):
with torch.inference_mode():
X = self._check_input(X) # MxD
if self.mean is not None:
y = self.y - self.mean(self.X).reshape(-1,1) # Nx1
else:
y = self.y # Nx1
Kff = self.kernel.K(self.X)
Kff += self._index_channel(self.likelihood.scale().square(), self.X) * self.eye # NxN
if self.data_variance is not None:
Kff += self.data_variance
Kfs = self.kernel.K(self.X,X) # NxM
Lff = self._cholesky(Kff, add_jitter=True) # NxN
v = torch.linalg.solve_triangular(Lff,Kfs,upper=False) # NxM
mu = Kfs.T.mm(torch.cholesky_solve(y,Lff)) # Mx1
if self.mean is not None:
mu += self.mean(X).reshape(-1,1) # Mx1
if full:
Kss = self.kernel.K(X) # MxM
var = Kss - v.T.mm(v) # MxM
else:
Kss_diag = self.kernel.K_diag(X) # M
var = Kss_diag - v.T.square().sum(dim=1) # M
var = var.reshape(-1,1)
return mu, var
class Snelson(Model):
"""
A sparse Gaussian process regression based on Snelson and Ghahramani [1] with a Gaussian likelihood and inducing points.
Args:
kernel (mogptk.gpr.kernel.Kernel): Kernel.
X (torch.tensor): Input data of shape (data_points,input_dims).
y (torch.tensor): Output data of shape (data_points,).
Z (int,torch.tensor): Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
Z_init (str): Method for initialization of inducing points, can be `grid`, `random`, or `density`.
variance (float,torch.tensor): Gaussian likelihood initial variance. Passing a float will train a single variance for all channels. Passing a tensor of shape (channels,) will assign and train different variances per multi-output channel.
jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
mean (mogptk.gpr.mean.Mean): Mean.
[1] E. Snelson, Z. Ghahramani, "Sparse Gaussian Processes using Pseudo-inputs", 2005
"""
def __init__(self, kernel, X, y, Z=10, Z_init='grid', variance=1.0, jitter=1e-8, mean=None):
variance = Parameter.to_tensor(variance).squeeze()
if 1 < variance.ndim or variance.ndim == 1 and variance.shape[0] != kernel.output_dims:
raise ValueError("variance must be float or have shape (channels,)")
super().__init__(kernel, X, y, GaussianLikelihood(torch.sqrt(variance)), jitter, mean)
Z = init_inducing_points(Z, self.X, method=Z_init, output_dims=kernel.output_dims)
Z = self._check_input(Z)
self.eye = torch.eye(Z.shape[0], device=config.device, dtype=config.dtype)
self.log_marginal_likelihood_constant = 0.5*self.X.shape[0]*np.log(2.0*np.pi)
self.Z = Parameter(Z, name="induction_points")
if kernel.output_dims is not None:
self.Z.num_parameters -= self.Z().shape[0]
def log_marginal_likelihood(self):
if self.mean is not None:
y = self.y - self.mean(self.X).reshape(-1,1) # Nx1
else:
y = self.y # Nx1
Kff_diag = self.kernel.K_diag(self.X) # N
Kuf = self.kernel.K(self.Z(),self.X) # MxN
Kuu = self.kernel.K(self.Z()) # MxM
Luu = self._cholesky(Kuu, add_jitter=True) # MxM; Luu = Kuu^(1/2)
v = torch.linalg.solve_triangular(Luu,Kuf,upper=False) # MxN; Kuu^(-1/2).Kuf
g = Kff_diag - v.T.square().sum(dim=1) + self._index_channel(self.likelihood.scale().square(), self.X) # N; diag(Kff-Qff) + sigma^2.I
G = torch.diagflat(1.0/g) # N
L = self._cholesky(v.mm(G).mm(v.T) + self.eye) # MxM; (Kuu^(-1/2).Kuf.G.Kfu.Kuu^(-1/2) + I)^(1/2)
c = torch.linalg.solve_triangular(L,v.mm(G).mm(y),upper=False) # Mx1; L^(-1).Kuu^(-1/2).Kuf.G.y
p = -self.log_marginal_likelihood_constant
p -= L.diagonal().log().sum() # 0.5 is taken as the square root of L
p -= 0.5*g.log().sum()
p -= 0.5*y.T.mm(G).mm(y).squeeze()
p += 0.5*c.T.mm(c).squeeze()
return p
def predict_f(self, X, full=False):
with torch.inference_mode():
X = self._check_input(X) # MxD
if self.mean is not None:
y = self.y - self.mean(self.X).reshape(-1,1) # Nx1
else:
y = self.y # Nx1
Kff_diag = self.kernel.K_diag(self.X) # N
Kuf = self.kernel.K(self.Z(),self.X) # MxN
Kuu = self.kernel.K(self.Z()) # MxM
Kus = self.kernel.K(self.Z(),X) # MxS
Luu = self._cholesky(Kuu, add_jitter=True) # MxM; Kuu^(1/2)
v = torch.linalg.solve_triangular(Luu,Kuf,upper=False) # MxN; Kuu^(-1/2).Kuf
g = Kff_diag - v.T.square().sum(dim=1) + self._index_channel(self.likelihood.scale().square(), self.X)
G = torch.diagflat(1.0/g) # N
L = self._cholesky(v.mm(G).mm(v.T) + self.eye) # MxM; (Kuu^(-1/2).Kuf.G.Kfu.Kuu^(-1/2) + I)^(1/2)
a = torch.linalg.solve_triangular(Luu,Kus,upper=False) # NxM
b = torch.linalg.solve_triangular(L,a,upper=False)
c = torch.linalg.solve_triangular(L,v.mm(G).mm(y),upper=False) # Mx1; L^(-1).Kuu^(-1/2).Kuf.G.y
mu = b.T.mm(c) # Mx1
if self.mean is not None:
mu += self.mean(X).reshape(-1,1) # Mx1
if full:
Kss = self.kernel(X) # MxM
var = Kss - a.T.mm(a) + b.T.mm(b) # MxM
else:
Kss_diag = self.kernel.K_diag(X) # M
var = Kss_diag - a.T.square().sum(dim=1) + b.T.square().sum(dim=1) # M
var = var.reshape(-1,1)
return mu, var
class OpperArchambeau(Model):
"""
A Gaussian process regression based on Opper and Archambeau [1] with a non-Gaussian likelihood.
Args:
kernel (mogptk.gpr.kernel.Kernel): Kernel.
X (torch.tensor): Input data of shape (data_points,input_dims).
y (torch.tensor): Output data of shape (data_points,).
likelihood (mogptk.gpr.likelihood.Likelihood): Likelihood.
jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
mean (mogptk.gpr.mean.Mean): Mean.
[1] M. Opper, C. Archambeau, "The Variational Gaussian Approximation Revisited", 2009
"""
def __init__(self, kernel, X, y, likelihood=GaussianLikelihood(1.0),
jitter=1e-8, mean=None):
super().__init__(kernel, X, y, likelihood, jitter, mean)
n = self.X.shape[0]
self.eye = torch.eye(n, device=config.device, dtype=config.dtype)
self.q_nu = Parameter(torch.zeros(n,1))
self.q_lambda = Parameter(torch.ones(n,1), lower=config.positive_minimum)
self.likelihood = likelihood
def elbo(self):
if self.mean is not None:
y = self.y - self.mean(self.X).reshape(-1,1) # Nx1
else:
y = self.y # Nx1
q_nu = self.q_nu()
q_lambda = self.q_lambda()
Kff = self.kernel(self.X) # NxN
L = self._cholesky(q_lambda*q_lambda.T*Kff + self.eye)
invL = torch.linalg.solve_triangular(L,self.eye,upper=False) # NxN
qf_mu = Kff.mm(q_nu)
qf_var_diag = 1.0/q_lambda.square() - (invL.T.mm(invL)/q_lambda/q_lambda.T).diagonal().reshape(-1,1)
kl = q_nu.T.mm(qf_mu).squeeze() # Mahalanobis
kl += L.diagonal().square().log().sum() # determinant TODO: is this correct?
#kl += invL.diagonal().square().sum() # trace
kl += invL.square().sum() # trace
kl -= q_nu.shape[0]
if self.mean is not None:
qf_mu = qf_mu - self.mean(self.X).reshape(-1,1) # Sx1
var_exp = self.likelihood.variational_expectation(self.X, y, qf_mu, qf_var_diag)
#eye = torch.eye(q_lambda.shape[0], device=config.device, dtype=config.dtype)
#qf_var = (1.0/q_lambda.square())*eye - invL.T.mm(invL)/q_lambda/q_lambda.T
#kl = -q_nu.shape[0]
#kl += q_nu.T.mm(qf_mu).squeeze() # Mahalanobis
#kl -= qf_var.det().log() # determinant
#kl += invL.diagonal().square().sum() # trace
#kl = -q_nu.shape[0]
#kl += q_nu.T.mm(q_nu).squeeze() # Mahalanobis
#kl -= qf_var.det().log() # determinant
#kl += qf_var_diag.sum() # trace
return var_exp - 0.5*kl
def log_marginal_likelihood(self):
# maximize the lower bound
return self.elbo()
def predict_f(self, X, full=False):
with torch.inference_mode():
X = self._check_input(X) # MxD
Kff = self.kernel(self.X)
Kfs = self.kernel(self.X,X) # NxS
L = self._cholesky(Kff + (1.0/self.q_lambda().square()).diagflat()) # NxN
a = torch.linalg.solve_triangular(L,Kfs,upper=False) # NxS; Kuu^(-1/2).Kus
mu = Kfs.T.mm(self.q_nu()) # Sx1
if self.mean is not None:
mu += self.mean(X).reshape(-1,1) # Sx1
if full:
Kss = self.kernel(X) # SxS
var = Kss - a.T.mm(a) # SxS
else:
Kss_diag = self.kernel.K_diag(X) # M
var = Kss_diag - a.T.square().sum(dim=1) # M
var = var.reshape(-1,1)
return mu, var
class Titsias(Model):
"""
A sparse Gaussian process regression based on Titsias [1] with a Gaussian likelihood.
Args:
kernel (mogptk.gpr.kernel.Kernel): Kernel.
X (torch.tensor): Input data of shape (data_points,input_dims).
y (torch.tensor): Output data of shape (data_points,).
Z (int,torch.tensor): Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
Z_init (str): Method for initialization of inducing points, can be `grid`, `random`, or `density`.
variance (float): Gaussian likelihood initial variance.
jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
mean (mogptk.gpr.mean.Mean): Mean.
[1] M. Titsias, "Variational Learning of Inducing Variables in Sparse Gaussian Processes", 2009
"""
# See: http://krasserm.github.io/2020/12/12/gaussian-processes-sparse/
def __init__(self, kernel, X, y, Z, Z_init='grid', variance=1.0, jitter=1e-8, mean=None):
# TODO: variance per channel
variance = Parameter.to_tensor(variance)
super().__init__(kernel, X, y, GaussianLikelihood(torch.sqrt(variance)), jitter, mean)
Z = init_inducing_points(Z, self.X, method=Z_init, output_dims=kernel.output_dims)
Z = self._check_input(Z)
self.eye = torch.eye(Z.shape[0], device=config.device, dtype=config.dtype)
self.log_marginal_likelihood_constant = 0.5*self.X.shape[0]*np.log(2.0*np.pi)
self.Z = Parameter(Z, name="induction_points")
if kernel.output_dims is not None:
self.Z.num_parameters -= self.Z().shape[0]
def elbo(self):
if self.mean is not None:
y = self.y - self.mean(self.X).reshape(-1,1) # Nx1
else:
y = self.y # Nx1
Kff_diag = self.kernel.K_diag(self.X) # N
Kuf = self.kernel(self.Z(),self.X) # MxN
Kuu = self.kernel(self.Z()) # MxM
Luu = self._cholesky(Kuu, add_jitter=True) # MxM; Kuu^(1/2)
v = torch.linalg.solve_triangular(Luu,Kuf,upper=False) # MxN; Kuu^(-1/2).Kuf
Q = v.mm(v.T) # MxM; Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2)
L = self._cholesky(Q/self.likelihood.scale().square() + self.eye) # MxM; (Q/sigma^2 + I)^(1/2)
c = torch.linalg.solve_triangular(L,v.mm(y),upper=False)/self.likelihood.scale().square() # Mx1; L^(-1).Kuu^(-1/2).Kuf.y
# p = log N(0, Kfu.Kuu^(-1).Kuf + I/sigma^2) - 1/(2.sigma^2).Trace(Kff - Kfu.Kuu^(-1).Kuf)
p = -self.log_marginal_likelihood_constant
p -= L.diagonal().log().sum() # 0.5 is taken as the square root of L
p -= self.X.shape[0]*self.likelihood.scale().log()
p -= 0.5*y.T.mm(y).squeeze()/self.likelihood.scale().square()
p += 0.5*c.T.mm(c).squeeze()
p -= 0.5*(Kff_diag.sum() - Q.trace())/self.likelihood.scale().square() # trace
return p
def log_marginal_likelihood(self):
# maximize the lower bound
return self.elbo()
def predict_f(self, X, full=False):
with torch.inference_mode():
X = self._check_input(X) # MxD
if self.mean is not None:
y = self.y - self.mean(self.X).reshape(-1,1) # Nx1
else:
y = self.y # Nx1
Kus = self.kernel(self.Z(),X) # MxS
Kuf = self.kernel(self.Z(),self.X) # MxN
Kuu = self.kernel(self.Z()) # MxM
Luu = self._cholesky(Kuu, add_jitter=True) # MxM; Kuu^(1/2)
v = torch.linalg.solve_triangular(Luu,Kuf,upper=False) # MxN; Kuu^(-1/2).Kuf
L = self._cholesky(v.mm(v.T)/self.likelihood.scale().square() + self.eye) # MxM; (Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2)/sigma^2 + I)^(1/2)
a = torch.linalg.solve_triangular(Luu,Kus,upper=False) # MxS; Kuu^(-1/2).Kus
b = torch.linalg.solve_triangular(L,a,upper=False) # MxS; L^(-1).Kuu^(-1/2).Kus
c = torch.linalg.solve_triangular(L,v.mm(y),upper=False)/self.likelihood.scale().square() # Mx1; L^(-1).Kuu^(-1/2).Kuf.y
# mu = sigma^(-2).Ksu.Kuu^(-1/2).(sigma^(-2).Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2) + I)^(-1).Kuu^(-1/2).Kuf.y
mu = b.T.mm(c) # Mx1
if self.mean is not None:
mu += self.mean(X).reshape(-1,1) # Mx1
# var = Kss - Qsf.(Qff + sigma^2 I)^(-1).Qfs
# below is the equivalent but more stable version by using the matrix inversion lemma
# var = Kss - Ksu.Kuu^(-1).Kus + Ksu.Kuu^(-1/2).(sigma^(-2).Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2) + I)^(-1).Kuu^(-1/2).Kus
if full:
Kss = self.kernel(X) # MxM
var = Kss - a.T.mm(a) + b.T.mm(b) # MxM
else:
Kss_diag = self.kernel.K_diag(X) # M
var = Kss_diag - a.T.square().sum(dim=1) + b.T.square().sum(dim=1) # M
var = var.reshape(-1,1)
return mu, var
class SparseHensman(Model):
"""
A sparse Gaussian process regression based on Hensman et al. [1] with a non-Gaussian likelihood.
Args:
kernel (mogptk.gpr.kernel.Kernel): Kernel.
X (torch.tensor): Input data of shape (data_points,input_dims).
y (torch.tensor): Output data of shape (data_points,).
Z (int,torch.tensor): Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
Z_init (str): Method for initialization of inducing points, can be `grid`, `random`, or `density`.
likelihood (mogptk.gpr.likelihood.Likelihood): Likelihood.
jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
mean (mogptk.gpr.mean.Mean): Mean.
[1] J. Hensman, et al., "Scalable Variational Gaussian Process Classification", 2015
"""
# This version replaces mu_q by L.mu_q and sigma_q by L.sigma_q.L^T, where LL^T = Kuu
# So that p(u) ~ N(0,1) and q(u) ~ N(L.mu_q, L.sigma_q.L^T)
def __init__(self, kernel, X, y, Z=None, Z_init='grid', likelihood=GaussianLikelihood(1.0),
jitter=1e-8, mean=None):
super().__init__(kernel, X, y, likelihood, jitter, mean)
n = self.X.shape[0]
self.is_sparse = Z is not None
if self.is_sparse:
Z = init_inducing_points(Z, self.X, method=Z_init, output_dims=kernel.output_dims)
Z = self._check_input(Z)
n = Z.shape[0]
self.eye = torch.eye(n, device=config.device, dtype=config.dtype)
self.log_marginal_likelihood_constant = 0.5*self.X.shape[0]*np.log(2.0*np.pi)
self.q_mu = Parameter(torch.zeros(n,1))
self.q_sqrt = Parameter(torch.eye(n))
self.q_sqrt.num_parameters = int((n*n+n)/2)
if self.is_sparse:
self.Z = Parameter(Z, name="induction_points")
if kernel.output_dims is not None:
self.Z.num_parameters -= self.Z().shape[0]
else:
self.Z = Parameter(self.X, train=False) # don't use inducing points
def kl_gaussian(self, q_mu, q_sqrt):
S_diag = q_sqrt.diagonal().square() # NxN
kl = q_mu.T.mm(q_mu).squeeze() # Mahalanobis
kl -= S_diag.log().sum() # determinant of q_var
kl += S_diag.sum() # same as Trace(p_var^(-1).q_var)
kl -= q_mu.shape[0]
return 0.5*kl
def elbo(self):
if self.mean is not None:
y = self.y - self.mean(self.X).reshape(-1,1) # Nx1
else:
y = self.y # Nx1
if self.is_sparse:
qf_mu, qf_var_diag = self._predict_f(self.X, full=False)
else:
Kff = self.kernel(self.X)
Lff = self._cholesky(Kff, add_jitter=True) # NxN
qf_mu = Lff.mm(self.q_mu())
if self.mean is not None:
qf_mu -= self.mean(self.X).reshape(-1,1) # Sx1
qf_sqrt = Lff.mm(self.q_sqrt().tril())
qf_var_diag = qf_sqrt.mm(qf_sqrt.T).diagonal().reshape(-1,1)
var_exp = self.likelihood.variational_expectation(self.X, y, qf_mu, qf_var_diag)
kl = self.kl_gaussian(self.q_mu(), self.q_sqrt())
return var_exp - kl
def log_marginal_likelihood(self):
# maximize the lower bound
return self.elbo()
def _predict_f(self, X, full=False):
Kuu = self.kernel(self.Z())
Kus = self.kernel(self.Z(),X) # NxS
Luu = self._cholesky(Kuu, add_jitter=True) # NxN
a = torch.linalg.solve_triangular(Luu,Kus,upper=False) # NxS; Kuu^(-1/2).Kus
b = self.q_sqrt().tril().T.mm(torch.linalg.solve_triangular(Luu,Kus,upper=False))
mu = Kus.T.mm(torch.linalg.solve_triangular(Luu.T,self.q_mu(),upper=True)) # Sx1
if full:
Kss = self.kernel(X) # SxS
var = Kss - a.T.mm(a) + b.T.mm(b) # SxS
else:
Kss_diag = self.kernel.K_diag(X) # M
var = Kss_diag - a.T.square().sum(dim=1) + b.T.square().sum(dim=1) # M
var = var.reshape(-1,1)
return mu, var
def predict_f(self, X, full=False):
with torch.inference_mode():
X = self._check_input(X) # MxD
mu, var = self._predict_f(X, full=full)
if self.mean is not None:
mu += self.mean(X).reshape(-1,1) # Mx1
return mu, var
class Hensman(SparseHensman):
"""
A Gaussian process regression based on Hensman et al. [1] with a non-Gaussian likelihood. This is equivalent to the `SparseHensman` model with the inducing points set equal to the data points and inducing point training turned off.
Args:
kernel (mogptk.gpr.kernel.Kernel): Kernel.
X (torch.tensor): Input data of shape (data_points,input_dims).
y (torch.tensor): Output data of shape (data_points,).
likelihood (mogptk.gpr.likelihood.Likelihood): Likelihood.
jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
mean (mogptk.gpr.mean.Mean): Mean.
[1] J. Hensman, et al., "Scalable Variational Gaussian Process Classification", 2015
"""
def __init__(self, kernel, X, y, likelihood=GaussianLikelihood(1.0), jitter=1e-8, mean=None):
super().__init__(kernel, X, y, None, 'grid', likelihood, jitter, mean)
Functions
def init_inducing_points(Z, X, method='grid', output_dims=None)
-
Initialize locations for inducing points.
Args
Z
:int,list
- Number of inducing points. A list of ints of shape (output_dims,) will initialize the specified number of inducing points per output dimension.
X
:torch.tensor
- Input data of shape (data_points,input_dims).
method
:str
- Method for initialization, can be `grid`, `random`, or `density`.
output_dims
:int
- Number of output dimensions. If not None, the first input dimension of the input data must contain the channel IDs.
Returns
torch.tensor
- Inducing point locations of shape (inducing_points,input_dims). In case of multiple output dimensions, the first input dimension will be the channel ID.
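A minimal usage sketch (illustrative, not part of the generated documentation): place nine inducing points over 1-D inputs; the variable names are hypothetical.
import torch
from mogptk.gpr.model import init_inducing_points
X = torch.linspace(0.0, 10.0, 100).reshape(-1, 1)     # (data_points, input_dims) = (100, 1)
Z_grid = init_inducing_points(9, X, method='grid')    # 9 evenly spaced locations over [0, 10]
Z_rand = init_inducing_points(9, X, method='random')  # 9 Halton quasi-random locations in the same range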
Classes
class CholeskyException (message, K, model)
-
Raised when the Cholesky decomposition of a kernel matrix fails; carries the error message, the offending matrix K, and the model.
Ancestors
- builtins.Exception
- builtins.BaseException
class Exact (kernel, X, y, variance=1.0, data_variance=None, jitter=1e-08, mean=None)
-
Regular Gaussian process regression with a Gaussian likelihood which allows for exact inference.
y \sim \mathcal{N}(0, K + \sigma^2I)
Args
kernel
:mogptk.gpr.kernel.Kernel
- Kernel.
X
:torch.tensor
- Input data of shape (data_points,input_dims).
y
:torch.tensor
- Output data of shape (data_points,).
variance
:float,torch.tensor
- Gaussian likelihood initial variance. Passing a float will train a single variance for all channels. Passing a tensor of shape (channels,) will assign and train different variances per multi-output channel.
data_variance
:torch.tensor
- Assign different and fixed variances per data point of shape (data_points,). These are added to the kernel's diagonal while still training an additional Gaussian variance.
jitter
:float
- Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
mean
:Mean
- Mean.
Initializes internal Module state, shared by both nn.Module and ScriptModule.
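A hedged end-to-end sketch: the kernel class name SquaredExponentialKernel and its constructor arguments are assumptions about mogptk.gpr and may need adjusting, and Exact is assumed to be re-exported at mogptk.gpr (otherwise import it from mogptk.gpr.model); everything else follows the methods documented below.
import torch
from mogptk import gpr
X = torch.linspace(0.0, 6.0, 50).reshape(-1, 1)
y = torch.sin(X) + 0.1 * torch.randn(50, 1)
kernel = gpr.SquaredExponentialKernel(input_dims=1)   # assumed kernel class and signature
model = gpr.Exact(kernel, X, y, variance=0.01)
loss = model.loss()                                   # -(log marginal likelihood + log prior), gradients populated
mu, var = model.predict_f(torch.linspace(0.0, 6.0, 200).reshape(-1, 1))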
Ancestors
- Model
- torch.nn.modules.module.Module
Inherited members
class Hensman (kernel, X, y, likelihood=GaussianLikelihood(), jitter=1e-08, mean=None)
-
A Gaussian process regression based on Hensman et al. [1] with a non-Gaussian likelihood. This is equivalent to the `SparseHensman` model with the inducing points set equal to the data points and inducing point training turned off.
Args
kernel
:Kernel
- Kernel.
X
:torch.tensor
- Input data of shape (data_points,input_dims).
y
:torch.tensor
- Output data of shape (data_points,).
likelihood
:Likelihood
- Likelihood.
jitter
:float
- Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
mean
:Mean
- Mean.
[1] J. Hensman, et al., "Scalable Variational Gaussian Process Classification", 2015
Initializes internal Module state, shared by both nn.Module and ScriptModule.
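A hedged sketch continuing the Exact example above; the StudentTLikelihood name is an assumption about mogptk.gpr.likelihood and may differ.
from mogptk import gpr
model = gpr.Hensman(kernel, X, y, likelihood=gpr.StudentTLikelihood())  # likelihood class name is assumed
loss = model.loss()                                   # negative ELBO plus negative log prior
mu, lower, upper = model.predict_y(X, ci=[0.025, 0.975])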
Ancestors
- SparseHensman
- Model
- torch.nn.modules.module.Module
Inherited members
class Model (kernel, X, y, likelihood=GaussianLikelihood(), jitter=1e-08, mean=None)
-
Base model class.
Attributes
kernel
:Kernel
- Kernel.
likelihood
:Likelihood
- Likelihood.
mean
:Mean
- Mean.
Initializes internal Module state, shared by both nn.Module and ScriptModule.
Ancestors
- torch.nn.modules.module.Module
Subclasses
- Exact
- Snelson
- OpperArchambeau
- Titsias
- SparseHensman
Methods
def K(self, X1, X2=None)
-
Evaluate kernel at `X1` and `X2` and return the NumPy representation.
Args
X1
:torch.tensor
- Input of shape (data_points1,input_dims).
X2
:torch.tensor
- Input of shape (data_points2,input_dims).
Returns
numpy.ndarray
- Kernel matrix of shape (data_points1,data_points2).
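Continuing the Exact sketch above, the kernel matrix between training inputs and new inputs can be inspected directly:
X_new = torch.linspace(0.0, 6.0, 10).reshape(-1, 1)
K_cross = model.K(model.X, X_new)   # cross-covariance of shape (50, 10)
K_train = model.K(model.X)          # training covariance of shape (50, 50)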
def compile(self)
-
Compile the model's forward pass. On the first call, `forward` is traced with `torch.jit.trace` and cached; `loss()` then reuses the traced forward on subsequent evaluations.
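A short sketch of the intended call order, following the source above:
model.compile()        # first call traces forward() with torch.jit.trace and caches it
loss = model.loss()    # later loss evaluations reuse the traced forward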
def forward(self, x=None)
-
Defines the computation performed at every call.
Should be overridden by all subclasses.
Note
Although the recipe for the forward pass needs to be defined within this function, one should call the `Module` instance afterwards instead of this, since the former takes care of running the registered hooks while the latter silently ignores them.
def log_marginal_likelihood(self)
-
Return the log marginal likelihood given by
\log p(y)
Returns
torch.tensor
- Log marginal likelihood.
def log_prior(self)
-
Return the log prior given by
\log p(\theta)
Returns
torch.tensor
- Log prior.
def loss(self)
-
Model loss for training.
Returns
torch.tensor
- Loss.
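A minimal sketch of a single call (the optimizer step that normally follows, as driven by mogptk's higher-level training code, is omitted here):
loss = model.loss()    # zero gradients, evaluate forward (or its traced version), backpropagate
print(float(loss))     # -(log marginal likelihood + log prior) at the current parameters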
def name(self)
-
def predict_f(self, X, full=False)
-
Get the predictive posterior for values of f.
Args
X
:torch.tensor
- Input of shape (data_points,input_dims) needed for multi-output likelihood.
full
:bool
- Return the full variance matrix as opposed to the diagonal.
Returns
torch.tensor
- Mean of the predictive posterior of shape (data_points,1).
torch.tensor
- Variance of the predictive posterior of shape (data_points,1) or (data_points,data_points).
def predict_y(self, X, ci=None, sigma=None, n=10000)
-
Get the predictive posterior for values of y.
Args
X
:torch.tensor
- Input of shape (data_points,input_dims) needed for multi-output likelihood.
ci
:list of float
- Two percentages [lower, upper] in the range of [0,1] that represent the confidence interval.
sigma
:float
- Number of standard deviations of the confidence interval. Only used as a short-path for the Gaussian likelihood.
n
:int
- Number of samples used from distribution to estimate quantile.
Returns
torch.tensor
- Mean of the predictive posterior of shape (data_points,1).
torch.tensor
- Lower confidence boundary of the predictive posterior of shape (data_points,1).
torch.tensor
- Upper confidence boundary of the predictive posterior of shape (data_points,1).
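For example, a roughly 95% interval can be requested via quantiles or, for a Gaussian likelihood, via a number of standard deviations:
mu, lower, upper = model.predict_y(X_new, ci=[0.025, 0.975])  # quantile-based interval
mu, lower, upper = model.predict_y(X_new, sigma=2.0)          # ±2 standard deviations (Gaussian short-path)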
def print_parameters(self, file=None)
-
Print parameters and their values.
def sample_f(self, Z, n=None, prior=False)
-
Sample f values from model.
Args
Z
:torch.tensor
- Input of shape (data_points,input_dims).
n
:int
- Number of samples.
prior
:boolean
- Sample from prior instead of posterior.
Returns
torch.tensor
- Samples of shape (data_points,n) or (data_points,) if `n` is not given.
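For example, drawing posterior function samples on a dense grid (continuing the sketches above):
Z_plot = torch.linspace(0.0, 6.0, 200).reshape(-1, 1)
f_post = model.sample_f(Z_plot, n=5)   # 5 posterior samples of f
f_one = model.sample_f(Z_plot)         # a single posterior sample with the sample dimension squeezed out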
def sample_y(self, Z, n=None)
-
Sample y values from model.
Args
    Z (torch.tensor): Input of shape (data_points,input_dims).
    n (int): Number of samples.
Returns
    torch.tensor: Samples of shape (data_points,n), or (data_points,) if `n` is not given.
def sample_y(self, Z, n=None):
    """
    Sample y values from model.

    Args:
        Z (torch.tensor): Input of shape (data_points,input_dims).
        n (int): Number of samples.

    Returns:
        torch.tensor: Samples of shape (data_points,n) or (data_points,) if `n` is not given.
    """
    with torch.inference_mode():
        Z = self._check_input(Z) # MxD

        S = n
        if n is None:
            S = 1

        samples_f = self.sample_f(Z, n=S)
        samples_y = self.likelihood.conditional_sample(Z, samples_f)
        if n is None:
            samples_y = samples_y.squeeze()
        return samples_y
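A minimal sampling sketch, again assuming the placeholder names `model` and `X_test` from the example above:

f_post = model.sample_f(X_test, n=5)               # five samples of the latent function f
f_prior = model.sample_f(X_test, n=5, prior=True)  # five samples drawn from the prior instead
y_post = model.sample_y(X_test, n=5)               # five samples of y, including likelihood noise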
class OpperArchambeau (kernel, X, y, likelihood=GaussianLikelihood(), jitter=1e-08, mean=None)
-
A Gaussian process regression based on Opper and Archambeau [1] with a non-Gaussian likelihood.
Args
    kernel (Kernel): Kernel.
    X (torch.tensor): Input data of shape (data_points,input_dims).
    y (torch.tensor): Output data of shape (data_points,).
    likelihood (Likelihood): Likelihood.
    jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
    mean (Mean): Mean.
[1] M. Opper, C. Archambeau, "The Variational Gaussian Approximation Revisited", 2009
class OpperArchambeau(Model):
    """
    A Gaussian process regression based on Opper and Archambeau [1] with a non-Gaussian likelihood.

    Args:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        y (torch.tensor): Output data of shape (data_points,).
        likelihood (mogptk.gpr.likelihood.Likelihood): Likelihood.
        jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
        mean (mogptk.gpr.mean.Mean): Mean.

    [1] M. Opper, C. Archambeau, "The Variational Gaussian Approximation Revisited", 2009
    """
    def __init__(self, kernel, X, y, likelihood=GaussianLikelihood(1.0), jitter=1e-8, mean=None):
        super().__init__(kernel, X, y, likelihood, jitter, mean)

        n = self.X.shape[0]
        self.eye = torch.eye(n, device=config.device, dtype=config.dtype)
        self.q_nu = Parameter(torch.zeros(n,1))
        self.q_lambda = Parameter(torch.ones(n,1), lower=config.positive_minimum)
        self.likelihood = likelihood

    def elbo(self):
        if self.mean is not None:
            y = self.y - self.mean(self.X).reshape(-1,1) # Nx1
        else:
            y = self.y # Nx1

        q_nu = self.q_nu()
        q_lambda = self.q_lambda()

        Kff = self.kernel(self.X) # NxN
        L = self._cholesky(q_lambda*q_lambda.T*Kff + self.eye)
        invL = torch.linalg.solve_triangular(L,self.eye,upper=False) # NxN

        qf_mu = Kff.mm(q_nu)
        qf_var_diag = 1.0/q_lambda.square() - (invL.T.mm(invL)/q_lambda/q_lambda.T).diagonal().reshape(-1,1)

        kl = q_nu.T.mm(qf_mu).squeeze() # Mahalanobis
        kl += L.diagonal().square().log().sum() # determinant TODO: is this correct?
        #kl += invL.diagonal().square().sum() # trace
        kl += invL.square().sum() # trace
        kl -= q_nu.shape[0]

        if self.mean is not None:
            qf_mu = qf_mu - self.mean(self.X).reshape(-1,1) # Sx1
        var_exp = self.likelihood.variational_expectation(self.X, y, qf_mu, qf_var_diag)

        #eye = torch.eye(q_lambda.shape[0], device=config.device, dtype=config.dtype)
        #qf_var = (1.0/q_lambda.square())*eye - invL.T.mm(invL)/q_lambda/q_lambda.T
        #kl = -q_nu.shape[0]
        #kl += q_nu.T.mm(qf_mu).squeeze() # Mahalanobis
        #kl -= qf_var.det().log() # determinant
        #kl += invL.diagonal().square().sum() # trace
        #kl = -q_nu.shape[0]
        #kl += q_nu.T.mm(q_nu).squeeze() # Mahalanobis
        #kl -= qf_var.det().log() # determinant
        #kl += qf_var_diag.sum() # trace
        return var_exp - 0.5*kl

    def log_marginal_likelihood(self):
        # maximize the lower bound
        return self.elbo()

    def predict_f(self, X, full=False):
        with torch.inference_mode():
            X = self._check_input(X) # MxD

            Kff = self.kernel(self.X)
            Kfs = self.kernel(self.X,X) # NxS

            L = self._cholesky(Kff + (1.0/self.q_lambda().square()).diagflat()) # NxN
            a = torch.linalg.solve_triangular(L,Kfs,upper=False) # NxS; Kuu^(-1/2).Kus

            mu = Kfs.T.mm(self.q_nu()) # Sx1
            if self.mean is not None:
                mu += self.mean(X).reshape(-1,1) # Sx1

            if full:
                Kss = self.kernel(X) # SxS
                var = Kss - a.T.mm(a) # SxS
            else:
                Kss_diag = self.kernel.K_diag(X) # M
                var = Kss_diag - a.T.square().sum(dim=1) # M
                var = var.reshape(-1,1)
            return mu, var
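As a reading aid (not part of the library source), the variational family parametrized by q_nu and q_lambda above is the Opper–Archambeau form; assuming the standard notation with \Lambda = \mathrm{diag}(\lambda), it can be written as

q(\mathbf{f}) = \mathcal{N}\!\left(K\nu,\ (K^{-1} + \Lambda^{2})^{-1}\right),

and the posterior variance diagonal is evaluated through the matrix-inversion-lemma identity

(K^{-1} + \Lambda^{2})^{-1} = \Lambda^{-2} - \Lambda^{-1}(\Lambda K \Lambda + I)^{-1}\Lambda^{-1},

which is why elbo() only factorizes q_lambda*q_lambda.T*Kff + I rather than inverting the kernel matrix directly.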
Ancestors
- Model
- torch.nn.modules.module.Module
Methods
def elbo(self)
-
def elbo(self):
    if self.mean is not None:
        y = self.y - self.mean(self.X).reshape(-1,1) # Nx1
    else:
        y = self.y # Nx1

    q_nu = self.q_nu()
    q_lambda = self.q_lambda()

    Kff = self.kernel(self.X) # NxN
    L = self._cholesky(q_lambda*q_lambda.T*Kff + self.eye)
    invL = torch.linalg.solve_triangular(L,self.eye,upper=False) # NxN

    qf_mu = Kff.mm(q_nu)
    qf_var_diag = 1.0/q_lambda.square() - (invL.T.mm(invL)/q_lambda/q_lambda.T).diagonal().reshape(-1,1)

    kl = q_nu.T.mm(qf_mu).squeeze() # Mahalanobis
    kl += L.diagonal().square().log().sum() # determinant TODO: is this correct?
    #kl += invL.diagonal().square().sum() # trace
    kl += invL.square().sum() # trace
    kl -= q_nu.shape[0]

    if self.mean is not None:
        qf_mu = qf_mu - self.mean(self.X).reshape(-1,1) # Sx1
    var_exp = self.likelihood.variational_expectation(self.X, y, qf_mu, qf_var_diag)

    #eye = torch.eye(q_lambda.shape[0], device=config.device, dtype=config.dtype)
    #qf_var = (1.0/q_lambda.square())*eye - invL.T.mm(invL)/q_lambda/q_lambda.T
    #kl = -q_nu.shape[0]
    #kl += q_nu.T.mm(qf_mu).squeeze() # Mahalanobis
    #kl -= qf_var.det().log() # determinant
    #kl += invL.diagonal().square().sum() # trace
    #kl = -q_nu.shape[0]
    #kl += q_nu.T.mm(q_nu).squeeze() # Mahalanobis
    #kl -= qf_var.det().log() # determinant
    #kl += qf_var_diag.sum() # trace
    return var_exp - 0.5*kl
Inherited members
class Snelson (kernel, X, y, Z=10, Z_init='grid', variance=1.0, jitter=1e-08, mean=None)
-
A sparse Gaussian process regression based on Snelson and Ghahramani [1] with a Gaussian likelihood and inducing points.
Args
    kernel (Kernel): Kernel.
    X (torch.tensor): Input data of shape (data_points,input_dims).
    y (torch.tensor): Output data of shape (data_points,).
    Z (int,torch.tensor): Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
    Z_init (str): Method for initialization of inducing points, can be `grid`, `random`, or `density`.
    variance (float,torch.tensor): Gaussian likelihood initial variance. Passing a float will train a single variance for all channels. Passing a tensor of shape (channels,) will assign and train different variances per multi-output channel.
    jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
    mean (Mean): Mean.
[1] E. Snelson, Z. Ghahramani, "Sparse Gaussian Processes using Pseudo-inputs", 2005
class Snelson(Model):
    """
    A sparse Gaussian process regression based on Snelson and Ghahramani [1] with a Gaussian likelihood and inducing points.

    Args:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        y (torch.tensor): Output data of shape (data_points,).
        Z (int,torch.tensor): Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
        Z_init (str): Method for initialization of inducing points, can be `grid`, `random`, or `density`.
        variance (float,torch.tensor): Gaussian likelihood initial variance. Passing a float will train a single variance for all channels. Passing a tensor of shape (channels,) will assign and train different variances per multi-output channel.
        jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
        mean (mogptk.gpr.mean.Mean): Mean.

    [1] E. Snelson, Z. Ghahramani, "Sparse Gaussian Processes using Pseudo-inputs", 2005
    """
    def __init__(self, kernel, X, y, Z=10, Z_init='grid', variance=1.0, jitter=1e-8, mean=None):
        variance = Parameter.to_tensor(variance).squeeze()
        if 1 < variance.ndim or variance.ndim == 1 and variance.shape[0] != kernel.output_dims:
            raise ValueError("variance must be float or have shape (channels,)")
        super().__init__(kernel, X, y, GaussianLikelihood(torch.sqrt(variance)), jitter, mean)

        Z = init_inducing_points(Z, self.X, method=Z_init, output_dims=kernel.output_dims)
        Z = self._check_input(Z)

        self.eye = torch.eye(Z.shape[0], device=config.device, dtype=config.dtype)
        self.log_marginal_likelihood_constant = 0.5*self.X.shape[0]*np.log(2.0*np.pi)
        self.Z = Parameter(Z, name="induction_points")
        if kernel.output_dims is not None:
            self.Z.num_parameters -= self.Z().shape[0]

    def log_marginal_likelihood(self):
        if self.mean is not None:
            y = self.y - self.mean(self.X).reshape(-1,1) # Nx1
        else:
            y = self.y # Nx1

        Kff_diag = self.kernel.K_diag(self.X) # N
        Kuf = self.kernel.K(self.Z(),self.X) # MxN
        Kuu = self.kernel.K(self.Z()) # MxM

        Luu = self._cholesky(Kuu, add_jitter=True) # MxM; Luu = Kuu^(1/2)
        v = torch.linalg.solve_triangular(Luu,Kuf,upper=False) # MxN; Kuu^(-1/2).Kuf
        g = Kff_diag - v.T.square().sum(dim=1) + self._index_channel(self.likelihood.scale().square(), self.X) # N; diag(Kff-Qff) + sigma^2.I
        G = torch.diagflat(1.0/g) # N
        L = self._cholesky(v.mm(G).mm(v.T) + self.eye) # MxM; (Kuu^(-1/2).Kuf.G.Kfu.Kuu^(-1/2) + I)^(1/2)
        c = torch.linalg.solve_triangular(L,v.mm(G).mm(y),upper=False) # Mx1; L^(-1).Kuu^(-1/2).Kuf.G.y

        p = -self.log_marginal_likelihood_constant
        p -= L.diagonal().log().sum() # 0.5 is taken as the square root of L
        p -= 0.5*g.log().sum()
        p -= 0.5*y.T.mm(G).mm(y).squeeze()
        p += 0.5*c.T.mm(c).squeeze()
        return p

    def predict_f(self, X, full=False):
        with torch.inference_mode():
            X = self._check_input(X) # MxD

            if self.mean is not None:
                y = self.y - self.mean(self.X).reshape(-1,1) # Nx1
            else:
                y = self.y # Nx1

            Kff_diag = self.kernel.K_diag(self.X) # N
            Kuf = self.kernel.K(self.Z(),self.X) # MxN
            Kuu = self.kernel.K(self.Z()) # MxM
            Kus = self.kernel.K(self.Z(),X) # MxS

            Luu = self._cholesky(Kuu, add_jitter=True) # MxM; Kuu^(1/2)
            v = torch.linalg.solve_triangular(Luu,Kuf,upper=False) # MxN; Kuu^(-1/2).Kuf
            g = Kff_diag - v.T.square().sum(dim=1) + self._index_channel(self.likelihood.scale().square(), self.X)
            G = torch.diagflat(1.0/g) # N
            L = self._cholesky(v.mm(G).mm(v.T) + self.eye) # MxM; (Kuu^(-1/2).Kuf.G.Kfu.Kuu^(-1/2) + I)^(1/2)

            a = torch.linalg.solve_triangular(Luu,Kus,upper=False) # MxS; Kuu^(-1/2).Kus
            b = torch.linalg.solve_triangular(L,a,upper=False)
            c = torch.linalg.solve_triangular(L,v.mm(G).mm(y),upper=False) # Mx1; L^(-1).Kuu^(-1/2).Kuf.G.y

            mu = b.T.mm(c) # Mx1
            if self.mean is not None:
                mu += self.mean(X).reshape(-1,1) # Mx1

            if full:
                Kss = self.kernel(X) # MxM
                var = Kss - a.T.mm(a) + b.T.mm(b) # MxM
            else:
                Kss_diag = self.kernel.K_diag(X) # M
                var = Kss_diag - a.T.square().sum(dim=1) + b.T.square().sum(dim=1) # M
                var = var.reshape(-1,1)
            return mu, var
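As a reading aid (not part of the source), log_marginal_likelihood() above evaluates the FITC approximation of Snelson & Ghahramani; writing Q_{ff} = K_{fu}K_{uu}^{-1}K_{uf}, it is

\log p(\mathbf{y}) \approx \log \mathcal{N}\!\left(\mathbf{y}\ \middle|\ \mathbf{0},\ Q_{ff} + \mathrm{diag}(K_{ff} - Q_{ff}) + \sigma^{2} I\right).

The vector g in the code is exactly the diagonal correction \mathrm{diag}(K_{ff} - Q_{ff}) + \sigma^{2} (with a per-channel \sigma^{2} for multi-output kernels, via _index_channel), and the two Cholesky factors Luu and L apply the matrix inversion lemma so that only MxM systems need to be solved.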
Ancestors
- Model
- torch.nn.modules.module.Module
Inherited members
class SparseHensman (kernel, X, y, Z=None, Z_init='grid', likelihood=GaussianLikelihood(), jitter=1e-08, mean=None)
-
A sparse Gaussian process regression based on Hensman et al. [1] with a non-Gaussian likelihood.
Args
    kernel (Kernel): Kernel.
    X (torch.tensor): Input data of shape (data_points,input_dims).
    y (torch.tensor): Output data of shape (data_points,).
    Z (int,torch.tensor): Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
    Z_init (str): Method for initialization of inducing points, can be `grid`, `random`, or `density`.
    likelihood (Likelihood): Likelihood.
    jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
    mean (Mean): Mean.
[1] J. Hensman, et al., "Scalable Variational Gaussian Process Classification", 2015
class SparseHensman(Model):
    """
    A sparse Gaussian process regression based on Hensman et al. [1] with a non-Gaussian likelihood.

    Args:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        y (torch.tensor): Output data of shape (data_points,).
        Z (int,torch.tensor): Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
        Z_init (str): Method for initialization of inducing points, can be `grid`, `random`, or `density`.
        likelihood (mogptk.gpr.likelihood.Likelihood): Likelihood.
        jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
        mean (mogptk.gpr.mean.Mean): Mean.

    [1] J. Hensman, et al., "Scalable Variational Gaussian Process Classification", 2015
    """
    # This version replaces mu_q by L.mu_q and sigma_q by L.sigma_q.L^T, where LL^T = Kuu
    # So that p(u) ~ N(0,1) and q(u) ~ N(L.mu_q, L.sigma_q.L^T)
    def __init__(self, kernel, X, y, Z=None, Z_init='grid', likelihood=GaussianLikelihood(1.0), jitter=1e-8, mean=None):
        super().__init__(kernel, X, y, likelihood, jitter, mean)

        n = self.X.shape[0]
        self.is_sparse = Z is not None
        if self.is_sparse:
            Z = init_inducing_points(Z, self.X, method=Z_init, output_dims=kernel.output_dims)
            Z = self._check_input(Z)
            n = Z.shape[0]

        self.eye = torch.eye(n, device=config.device, dtype=config.dtype)
        self.log_marginal_likelihood_constant = 0.5*self.X.shape[0]*np.log(2.0*np.pi)
        self.q_mu = Parameter(torch.zeros(n,1))
        self.q_sqrt = Parameter(torch.eye(n))
        self.q_sqrt.num_parameters = int((n*n+n)/2)
        if self.is_sparse:
            self.Z = Parameter(Z, name="induction_points")
            if kernel.output_dims is not None:
                self.Z.num_parameters -= self.Z().shape[0]
        else:
            self.Z = Parameter(self.X, train=False) # don't use inducing points

    def kl_gaussian(self, q_mu, q_sqrt):
        S_diag = q_sqrt.diagonal().square() # NxN
        kl = q_mu.T.mm(q_mu).squeeze() # Mahalanobis
        kl -= S_diag.log().sum() # determinant of q_var
        kl += S_diag.sum() # same as Trace(p_var^(-1).q_var)
        kl -= q_mu.shape[0]
        return 0.5*kl

    def elbo(self):
        if self.mean is not None:
            y = self.y - self.mean(self.X).reshape(-1,1) # Nx1
        else:
            y = self.y # Nx1

        if self.is_sparse:
            qf_mu, qf_var_diag = self._predict_f(self.X, full=False)
        else:
            Kff = self.kernel(self.X)
            Lff = self._cholesky(Kff, add_jitter=True) # NxN
            qf_mu = Lff.mm(self.q_mu())
            if self.mean is not None:
                qf_mu -= self.mean(self.X).reshape(-1,1) # Sx1
            qf_sqrt = Lff.mm(self.q_sqrt().tril())
            qf_var_diag = qf_sqrt.mm(qf_sqrt.T).diagonal().reshape(-1,1)

        var_exp = self.likelihood.variational_expectation(self.X, y, qf_mu, qf_var_diag)
        kl = self.kl_gaussian(self.q_mu(), self.q_sqrt())
        return var_exp - kl

    def log_marginal_likelihood(self):
        # maximize the lower bound
        return self.elbo()

    def _predict_f(self, X, full=False):
        Kuu = self.kernel(self.Z())
        Kus = self.kernel(self.Z(),X) # NxS

        Luu = self._cholesky(Kuu, add_jitter=True) # NxN
        a = torch.linalg.solve_triangular(Luu,Kus,upper=False) # NxS; Kuu^(-1/2).Kus
        b = self.q_sqrt().tril().T.mm(torch.linalg.solve_triangular(Luu,Kus,upper=False))

        mu = Kus.T.mm(torch.linalg.solve_triangular(Luu.T,self.q_mu(),upper=True)) # Sx1
        if full:
            Kss = self.kernel(X) # SxS
            var = Kss - a.T.mm(a) + b.T.mm(b) # SxS
        else:
            Kss_diag = self.kernel.K_diag(X) # M
            var = Kss_diag - a.T.square().sum(dim=1) + b.T.square().sum(dim=1) # M
            var = var.reshape(-1,1)
        return mu, var

    def predict_f(self, X, full=False):
        with torch.inference_mode():
            X = self._check_input(X) # MxD

            mu, var = self._predict_f(X, full=full)
            if self.mean is not None:
                mu += self.mean(X).reshape(-1,1) # Mx1
            return mu, var
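The in-code comment ("This version replaces mu_q by L.mu_q ...") corresponds to the whitened parametrization commonly used for sparse variational GPs; as a sketch in that notation, with L_{uu}L_{uu}^{\top} = K_{uu}, m the value of q_mu and q_sqrt the lower-triangular factor,

q(\mathbf{u}) = \mathcal{N}\!\left(L_{uu}\,\mathbf{m},\ L_{uu}\,S\,L_{uu}^{\top}\right), \qquad S = \mathrm{tril}(q\_sqrt)\,\mathrm{tril}(q\_sqrt)^{\top},

so the prior over the whitened variable is \mathcal{N}(\mathbf{0}, I) and the KL term in elbo() reduces to kl_gaussian(q_mu, q_sqrt) against a standard normal.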
Ancestors
- Model
- torch.nn.modules.module.Module
Subclasses
Methods
def elbo(self)
-
def elbo(self):
    if self.mean is not None:
        y = self.y - self.mean(self.X).reshape(-1,1) # Nx1
    else:
        y = self.y # Nx1

    if self.is_sparse:
        qf_mu, qf_var_diag = self._predict_f(self.X, full=False)
    else:
        Kff = self.kernel(self.X)
        Lff = self._cholesky(Kff, add_jitter=True) # NxN
        qf_mu = Lff.mm(self.q_mu())
        if self.mean is not None:
            qf_mu -= self.mean(self.X).reshape(-1,1) # Sx1
        qf_sqrt = Lff.mm(self.q_sqrt().tril())
        qf_var_diag = qf_sqrt.mm(qf_sqrt.T).diagonal().reshape(-1,1)

    var_exp = self.likelihood.variational_expectation(self.X, y, qf_mu, qf_var_diag)
    kl = self.kl_gaussian(self.q_mu(), self.q_sqrt())
    return var_exp - kl
def kl_gaussian(self, q_mu, q_sqrt)
-
def kl_gaussian(self, q_mu, q_sqrt):
    S_diag = q_sqrt.diagonal().square() # NxN
    kl = q_mu.T.mm(q_mu).squeeze() # Mahalanobis
    kl -= S_diag.log().sum() # determinant of q_var
    kl += S_diag.sum() # same as Trace(p_var^(-1).q_var)
    kl -= q_mu.shape[0]
    return 0.5*kl
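For reference, this is the standard closed form of the KL divergence from a Gaussian to a standard normal (a sketch; n is the number of variational parameters and S the variational covariance):

\mathrm{KL}\!\left(\mathcal{N}(\mathbf{m}, S)\ \|\ \mathcal{N}(\mathbf{0}, I)\right) = \tfrac{1}{2}\left(\mathrm{tr}(S) + \mathbf{m}^{\top}\mathbf{m} - n - \log|S|\right),

where the code obtains the log-determinant from the diagonal of the Cholesky-style factor q_sqrt and uses the same squared diagonal for the trace term.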
Inherited members
class Titsias (kernel, X, y, Z, Z_init='grid', variance=1.0, jitter=1e-08, mean=None)
-
A sparse Gaussian process regression based on Titsias [1] with a Gaussian likelihood.
Args
    kernel (Kernel): Kernel.
    X (torch.tensor): Input data of shape (data_points,input_dims).
    y (torch.tensor): Output data of shape (data_points,).
    Z (int,torch.tensor): Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
    Z_init (str): Method for initialization of inducing points, can be `grid`, `random`, or `density`.
    variance (float): Gaussian likelihood initial variance.
    jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
    mean (Mean): Mean.
[1] Titsias, "Variational learning of induced variables in sparse Gaussian processes", 2009
class Titsias(Model):
    """
    A sparse Gaussian process regression based on Titsias [1] with a Gaussian likelihood.

    Args:
        kernel (mogptk.gpr.kernel.Kernel): Kernel.
        X (torch.tensor): Input data of shape (data_points,input_dims).
        y (torch.tensor): Output data of shape (data_points,).
        Z (int,torch.tensor): Number of inducing points to be distributed over the input space. Passing a tensor of shape (inducing_points,input_dims) sets the initial positions of the inducing points.
        Z_init (str): Method for initialization of inducing points, can be `grid`, `random`, or `density`.
        variance (float): Gaussian likelihood initial variance.
        jitter (float): Relative jitter of the diagonal's mean added to the kernel's diagonal before calculating the Cholesky.
        mean (mogptk.gpr.mean.Mean): Mean.

    [1] Titsias, "Variational learning of induced variables in sparse Gaussian processes", 2009
    """
    # See: http://krasserm.github.io/2020/12/12/gaussian-processes-sparse/
    def __init__(self, kernel, X, y, Z, Z_init='grid', variance=1.0, jitter=1e-8, mean=None):
        # TODO: variance per channel
        variance = Parameter.to_tensor(variance)
        super().__init__(kernel, X, y, GaussianLikelihood(torch.sqrt(variance)), jitter, mean)

        Z = init_inducing_points(Z, self.X, method=Z_init, output_dims=kernel.output_dims)
        Z = self._check_input(Z)

        self.eye = torch.eye(Z.shape[0], device=config.device, dtype=config.dtype)
        self.log_marginal_likelihood_constant = 0.5*self.X.shape[0]*np.log(2.0*np.pi)
        self.Z = Parameter(Z, name="induction_points")
        if kernel.output_dims is not None:
            self.Z.num_parameters -= self.Z().shape[0]

    def elbo(self):
        if self.mean is not None:
            y = self.y - self.mean(self.X).reshape(-1,1) # Nx1
        else:
            y = self.y # Nx1

        Kff_diag = self.kernel.K_diag(self.X) # N
        Kuf = self.kernel(self.Z(),self.X) # MxN
        Kuu = self.kernel(self.Z()) # MxM

        Luu = self._cholesky(Kuu, add_jitter=True) # MxM; Kuu^(1/2)
        v = torch.linalg.solve_triangular(Luu,Kuf,upper=False) # MxN; Kuu^(-1/2).Kuf
        Q = v.mm(v.T) # MxM; Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2)
        L = self._cholesky(Q/self.likelihood.scale().square() + self.eye) # MxM; (Q/sigma^2 + I)^(1/2)
        c = torch.linalg.solve_triangular(L,v.mm(y),upper=False)/self.likelihood.scale().square() # Mx1; L^(-1).Kuu^(-1/2).Kuf.y

        # p = log N(0, Kfu.Kuu^(-1).Kuf + I/sigma^2) - 1/(2.sigma^2).Trace(Kff - Kfu.Kuu^(-1).Kuf)
        p = -self.log_marginal_likelihood_constant
        p -= L.diagonal().log().sum() # 0.5 is taken as the square root of L
        p -= self.X.shape[0]*self.likelihood.scale().log()
        p -= 0.5*y.T.mm(y).squeeze()/self.likelihood.scale().square()
        p += 0.5*c.T.mm(c).squeeze()
        p -= 0.5*(Kff_diag.sum() - Q.trace())/self.likelihood.scale().square() # trace
        return p

    def log_marginal_likelihood(self):
        # maximize the lower bound
        return self.elbo()

    def predict_f(self, X, full=False):
        with torch.inference_mode():
            X = self._check_input(X) # MxD

            if self.mean is not None:
                y = self.y - self.mean(self.X).reshape(-1,1) # Nx1
            else:
                y = self.y # Nx1

            Kus = self.kernel(self.Z(),X) # MxS
            Kuf = self.kernel(self.Z(),self.X) # MxN
            Kuu = self.kernel(self.Z()) # MxM

            Luu = self._cholesky(Kuu, add_jitter=True) # MxM; Kuu^(1/2)
            v = torch.linalg.solve_triangular(Luu,Kuf,upper=False) # MxN; Kuu^(-1/2).Kuf
            L = self._cholesky(v.mm(v.T)/self.likelihood.scale().square() + self.eye) # MxM; (Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2)/sigma^2 + I)^(1/2)

            a = torch.linalg.solve_triangular(Luu,Kus,upper=False) # MxS; Kuu^(-1/2).Kus
            b = torch.linalg.solve_triangular(L,a,upper=False) # MxS; L^(-1).Kuu^(-1/2).Kus
            c = torch.linalg.solve_triangular(L,v.mm(y),upper=False)/self.likelihood.scale().square() # Mx1; L^(-1).Kuu^(-1/2).Kuf.y

            # mu = sigma^(-2).Ksu.Kuu^(-1/2).(sigma^(-2).Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2) + I)^(-1).Kuu^(-1/2).Kuf.y
            mu = b.T.mm(c) # Mx1
            if self.mean is not None:
                mu += self.mean(X).reshape(-1,1) # Mx1

            # var = Kss - Qsf.(Qff + sigma^2 I)^(-1).Qfs
            # below is the equivalent but more stable version by using the matrix inversion lemma
            # var = Kss - Ksu.Kuu^(-1).Kus + Ksu.Kuu^(-1/2).(sigma^(-2).Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2) + I)^(-1).Kuu^(-1/2).Kus
            if full:
                Kss = self.kernel(X) # MxM
                var = Kss - a.T.mm(a) + b.T.mm(b) # MxM
            else:
                Kss_diag = self.kernel.K_diag(X) # M
                var = Kss_diag - a.T.square().sum(dim=1) + b.T.square().sum(dim=1) # M
                var = var.reshape(-1,1)
            return mu, var
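As a reading aid (not part of the source), elbo() above evaluates the collapsed Titsias bound; in the usual notation of [1], with Q_{ff} = K_{fu}K_{uu}^{-1}K_{uf},

\mathrm{ELBO} = \log \mathcal{N}\!\left(\mathbf{y}\ \middle|\ \mathbf{0},\ Q_{ff} + \sigma^{2} I\right) - \frac{1}{2\sigma^{2}}\,\mathrm{tr}\!\left(K_{ff} - Q_{ff}\right).

The last line of elbo() (subtracting 0.5*(Kff_diag.sum() - Q.trace())/sigma^2) is the trace penalty, and the remaining terms evaluate the collapsed log-density via the Cholesky factor L of Q/sigma^2 + I.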
Ancestors
- Model
- torch.nn.modules.module.Module
Methods
def elbo(self)
-
def elbo(self):
    if self.mean is not None:
        y = self.y - self.mean(self.X).reshape(-1,1) # Nx1
    else:
        y = self.y # Nx1

    Kff_diag = self.kernel.K_diag(self.X) # N
    Kuf = self.kernel(self.Z(),self.X) # MxN
    Kuu = self.kernel(self.Z()) # MxM

    Luu = self._cholesky(Kuu, add_jitter=True) # MxM; Kuu^(1/2)
    v = torch.linalg.solve_triangular(Luu,Kuf,upper=False) # MxN; Kuu^(-1/2).Kuf
    Q = v.mm(v.T) # MxM; Kuu^(-1/2).Kuf.Kfu.Kuu^(-1/2)
    L = self._cholesky(Q/self.likelihood.scale().square() + self.eye) # MxM; (Q/sigma^2 + I)^(1/2)
    c = torch.linalg.solve_triangular(L,v.mm(y),upper=False)/self.likelihood.scale().square() # Mx1; L^(-1).Kuu^(-1/2).Kuf.y

    # p = log N(0, Kfu.Kuu^(-1).Kuf + I/sigma^2) - 1/(2.sigma^2).Trace(Kff - Kfu.Kuu^(-1).Kuf)
    p = -self.log_marginal_likelihood_constant
    p -= L.diagonal().log().sum() # 0.5 is taken as the square root of L
    p -= self.X.shape[0]*self.likelihood.scale().log()
    p -= 0.5*y.T.mm(y).squeeze()/self.likelihood.scale().square()
    p += 0.5*c.T.mm(c).squeeze()
    p -= 0.5*(Kff_diag.sum() - Q.trace())/self.likelihood.scale().square() # trace
    return p
Inherited members