Module mogptk.init

import torch
import numpy as np
from . import gpr

def BNSE(x, y, y_err=None, max_freq=None, n=1000, iters=100, jit=True):
    """
    Bayesian non-parametric spectral estimation (BNSE) [1] estimates the power spectral density of a signal using a Gaussian process with a spectral mixture kernel to learn the spectral representation of the signal. The resulting power spectral density follows a generalized chi-squared distribution.

    Args:
        x (numpy.ndarray): Input data of shape (data_points,).
        y (numpy.ndarray): Output data of shape (data_points,).
        y_err (numpy.ndarray): Standard deviations of the output data, of shape (data_points,).
        max_freq (float): Maximum frequency of the power spectral density. If not given, the Nyquist frequency is estimated from the data and used instead.
        n (int): Number of frequencies at which to evaluate the power spectral density.
        iters (int): Number of iterations used to train the Gaussian process.
        jit (bool): If True, compile the model before training to speed up optimization.

    Returns:
        numpy.ndarray: Frequencies of shape (n,).
        numpy.ndarray: Power spectral density mean of shape (n,).
        numpy.ndarray: Power spectral density variance of shape (n,).

    [1] F. Tobar, "Bayesian Nonparametric Spectral Estimation," Advances in Neural Information Processing Systems, 2018.
    """
    x = x - np.median(x)  # center the data without mutating the caller's array
    x_range = np.max(x)-np.min(x)
    x_dist = x_range/len(x)  # average sample spacing
    if max_freq is None:
        max_freq = 0.5/x_dist  # estimated Nyquist frequency

    x = torch.tensor(x, device=gpr.config.device, dtype=gpr.config.dtype)
    if x.ndim == 0:
        x = x.reshape(1,1)
    elif x.ndim == 1:
        x = x.reshape(-1,1)
    y = torch.tensor(y, device=gpr.config.device, dtype=gpr.config.dtype).reshape(-1,1)

    kernel = gpr.SpectralKernel()
    model = gpr.Exact(kernel, x, y, data_variance=y_err**2 if y_err is not None else None)

    # initialize parameters
    magnitude = y.var()
    mean = 0.01
    variance = 0.25 / np.pi**2 / x_dist**2  # corresponds to a lengthscale of one sample spacing
    noise = y.std()/10.0
    model.kernel.magnitude.assign(magnitude)
    model.kernel.mean.assign(mean, upper=max_freq)
    model.kernel.variance.assign(variance)
    model.likelihood.scale.assign(noise)

    if jit:
        model.compile()

    # train model
    optimizer = torch.optim.Adam(model.parameters(), lr=2.0)
    for i in range(iters):
        optimizer.step(model.loss)  # model.loss serves as the optimizer closure

    alpha = float(0.5/x_range**2)  # width parameter of the Gaussian window applied to the signal
    w = torch.linspace(0.0, max_freq, n, device=gpr.config.device, dtype=gpr.config.dtype).reshape(-1,1)

    def kernel_ff(f1, f2, magnitude, mean, variance, alpha):
        # Covariance of the windowed signal's local spectrum between
        # frequencies f1 and f2 (cf. [1]).  f1,f2: MxD;  mean,variance: D
        mean = mean.reshape(1,1,-1)
        variance = variance.reshape(1,1,-1)
        gamma = 2.0*np.pi**2*variance
        const = 0.5 * np.pi * magnitude / torch.sqrt(alpha**2 + 2.0*alpha*gamma.prod())
        exp1 = -0.5 * np.pi**2 / alpha * gpr.Kernel.squared_distance(f1,f2)  # MxMxD
        exp2a = -2.0 * np.pi**2 / (alpha+2.0*gamma) * (gpr.Kernel.average(f1,f2)-mean)**2  # MxMxD
        exp2b = -2.0 * np.pi**2 / (alpha+2.0*gamma) * (gpr.Kernel.average(f1,f2)+mean)**2  # MxMxD
        return const * (torch.exp(exp1+exp2a) + torch.exp(exp1+exp2b)).sum(dim=2)

    def kernel_tf(t, f, magnitude, mean, variance, alpha):
        # Cross-covariance between the signal at times t and its local spectrum
        # at frequencies f (cf. [1]), returned as (real, imaginary) parts.
        # t: NxD;  f: MxD;  mean,variance: D
        mean = mean.reshape(1,-1)
        variance = variance.reshape(1,-1)
        gamma = 2.0*np.pi**2*variance
        Lq_inv = np.pi**2 * (1.0/alpha + 1.0/gamma)  # 1xD
        Lq_inv = 1.0/Lq_inv  # invert: required in practice, although the paper's expression appears to omit this inversion

        const = torch.sqrt(np.pi/(alpha+gamma.prod()))  # 1
        exp1 = -np.pi**2 * torch.tensordot(t**2, Lq_inv.T, dims=1)  # Nx1
        exp2a = -torch.tensordot(np.pi**2/(alpha+gamma), (f-mean).T**2, dims=1)  # 1xM
        exp2b = -torch.tensordot(np.pi**2/(alpha+gamma), (f+mean).T**2, dims=1)  # 1xM
        exp3a = -2.0*np.pi * torch.tensordot(t.mm(Lq_inv), np.pi**2 * (f/alpha + mean/gamma).T, dims=1)  # NxM
        exp3b = -2.0*np.pi * torch.tensordot(t.mm(Lq_inv), np.pi**2 * (f/alpha - mean/gamma).T, dims=1)  # NxM

        a = 0.5 * magnitude * const * torch.exp(exp1)
        real = torch.exp(exp2a)*torch.cos(exp3a) + torch.exp(exp2b)*torch.cos(exp3b)
        imag = torch.exp(exp2a)*torch.sin(exp3a) + torch.exp(exp2b)*torch.sin(exp3b)
        return a * real, a * imag

    with torch.inference_mode():
        Ktt = kernel(x)
        Ktt += model.likelihood.scale().square() * torch.eye(x.shape[0], device=gpr.config.device, dtype=gpr.config.dtype)
        Ltt = model._cholesky(Ktt, add_jitter=True)

        Kff = kernel_ff(w, w, kernel.magnitude(), kernel.mean(), kernel.variance(), alpha)
        Pff = kernel_ff(w, -w, kernel.magnitude(), kernel.mean(), kernel.variance(), alpha)
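        # Split into the covariances of the real and imaginary parts of the
        # spectrum, using the kernel evaluated at (w, -w) for the second term.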
        Kff_real = 0.5 * (Kff + Pff)
        Kff_imag = 0.5 * (Kff - Pff)

        Ktf_real, Ktf_imag = kernel_tf(x, w, kernel.magnitude(), kernel.mean(), kernel.variance(), alpha)

        a = torch.cholesky_solve(y,Ltt)
        b = torch.linalg.solve_triangular(Ltt,Ktf_real,upper=False)
        c = torch.linalg.solve_triangular(Ltt,Ktf_imag,upper=False)

        mu_real = Ktf_real.T.mm(a)
        mu_imag = Ktf_imag.T.mm(a)
        var_real = Kff_real - b.T.mm(b)
        var_imag = Kff_imag - c.T.mm(c)

        # The PSD equals N(mu_real,var_real)^2 + N(mu_imag,var_imag)^2, which follows a generalized chi-squared distribution
        var_real = var_real.diagonal().reshape(-1,1)
        var_imag = var_imag.diagonal().reshape(-1,1)
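        # For X ~ N(m, s^2): E[X^2] = m^2 + s^2 and Var[X^2] = 2*s^4 + 4*s^2*m^2.
        # Treating the real and imaginary parts as independent, the PSD moments
        # below are the sums of the moments of the two squared normals.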
        mu = mu_real**2 + mu_imag**2 + var_real + var_imag
        var = 2.0*var_real**2 + 2.0*var_imag**2 + 4.0*var_real*mu_real**2 + 4.0*var_imag*mu_imag**2

        w = w.cpu().numpy().reshape(-1)
        mu = mu.cpu().numpy().reshape(-1)
        var = var.cpu().numpy().reshape(-1)
    return w, mu, var

Functions

def BNSE(x, y, y_err=None, max_freq=None, n=1000, iters=100, jit=True)

Bayesian non-parametric spectral estimation (BNSE) [1] estimates the power spectral density of a signal using a Gaussian process with a spectral mixture kernel to learn the spectral representation of the signal. The resulting power spectral density follows a generalized chi-squared distribution.

Args

x : numpy.ndarray
Input data of shape (data_points,).
y : numpy.ndarray
Output data of shape (data_points,).
y_err : numpy.ndarray
Standard deviations of the output data, of shape (data_points,).
max_freq : float
Maximum frequency of the power spectral density. If not given, the Nyquist frequency is estimated from the data and used instead.
n : int
Number of frequencies at which to evaluate the power spectral density.
iters : int
Number of iterations used to train the Gaussian process.
jit : bool
If True, compile the model before training to speed up optimization.

Returns

numpy.ndarray
Frequencies of shape (n,).
numpy.ndarray
Power spectral density mean of shape (n,).
numpy.ndarray
Power spectral density variance of shape (n,).

[1] F. Tobar, "Bayesian Nonparametric Spectral Estimation," Advances in Neural Information Processing Systems, 2018.
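
A minimal usage sketch (the synthetic two-tone signal and the chosen argument values below are illustrative assumptions, not part of the module):

import numpy as np
from mogptk.init import BNSE

# Hypothetical example: noisy signal with components at 1.5 and 4.0
# (in cycles per unit of x).
x = np.linspace(0.0, 10.0, 200)
y = np.sin(2.0*np.pi*1.5*x) + 0.5*np.sin(2.0*np.pi*4.0*x) + 0.1*np.random.randn(len(x))

# Estimate the power spectral density on 500 frequencies up to 6.0.
w, psd_mean, psd_var = BNSE(x, y, max_freq=6.0, n=500, iters=100)

# The peaks of psd_mean should lie near 1.5 and 4.0; psd_var quantifies the
# uncertainty of the estimate at each frequency.
print(w[np.argmax(psd_mean)])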
