Feature extraction

...Under Construction...

CLODE can simulate a large number of ODEs simultaneously using OpenCL. To keep memory usage low, CLODE extracts features on-the-fly using configurable observers. These observers are written in OpenCL and can capture properties like the minimum, maximum, or average of a variable.

Advanced observers can also capture local maxima, neighbourhoods, and thresholds.

Observers

CLODE's observers are highly configurable. You can choose the following:

  • basic - Captures one variable
  • basic_all_variables - Captures all variables
  • local_max - Captures local maxima
  • neighbourhood_2
  • threshold_2 - Captures all values above a threshold

Example

The following example extracts features from the Van der Pol oscillator using the dormand_prince45 integrator.

import numpy as np
import matplotlib.pyplot as plt
import clode


# Van der Pol Dormand Prince oscillator
def getRHS(
        t: float,
        var: list[float],
        par: list[float],
        derivatives: list[float],
        aux: list[float],
        wiener: list[float],
) -> None:
    mu: float = par[0]
    x: float = var[0]
    y: float = var[1]

    dx: float = y
    dy: float = mu * (1 - x ** 2) * y - x

    derivatives[0] = dx
    derivatives[1] = dy


def scipy_solve_ivp_wrapper(func, aux=None, wiener=None):
    if aux is None:
        aux = []
    if wiener is None:
        wiener = []

    def wrapper(t, y, *args):
        dydt = np.zeros_like(y)
        func(t, y, args, dydt, aux, wiener)
        return dydt

    return wrapper


wrap = scipy_solve_ivp_wrapper(getRHS)
xx = solve_ivp(
    wrap,
    [0, 1000],
    [1, 1],
    args=(-1, 0, 1),
    atol=1e-10,
    rtol=1e-10,
    mxstep=1000000,
)

# Invoke the wrapper with scipy's odeint

getRHS = scipy_odeint_wrapper(getRHS)

from scipy.integrate import odeint

res = odeint(
    getRHS,
    [1, 1],
    [0, 1000],
    args=([-1], [], []),
    atol=1e-10,
    rtol=1e-10,
    mxstep=1000000,
)

integrator = clode.FeatureSimulator(
    rhs_equation=getRHS,
    variable_names=["x", "y"],
    parameter_names=["mu"],
    observer=clode.Observer.threshold_2,
    stepper=clode.Stepper.dormand_prince,
    tspan=(0.0, 1000.0),
)

parameters = [-1, 0, 0.01, 0.1, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0]

x0 = np.tile([1, 1], (len(parameters), 1))

pars_v = np.array([[par] for par in parameters])
integrator.set_ensemble(x0, pars_v)

integrator.transient()
integrator.features()
observer_output = integrator.get_observer_results()

periods = observer_output.get_var_max("period")
plt.plot(parameters, periods[:, 0])
plt.title("Van der Pol oscillator")
plt.xlabel("mu")
plt.ylabel("period")

plt.show()

New definition

import numpy as np
import matplotlib.pyplot as plt
import clode


# Van der Pol Dormand Prince oscillator
def get_rhs(
        t: float,
        var: list[float],
        mu: float,
        kl: float,
        weiner1: float,
) -> list[float]:
    x: float = var[0]
    y: float = var[1]

    kk: float = weiner1 * kl

    dx: float = y
    dy: float = mu * (1 - x ** 2) * y - x

    aux: float = x + kk

    return [dx, dy]


ivp = clode.IVP(
    rhs=get_rhs,
    variables: dict[str, float] = {"x": 1.0, "y": 1.0},
parameters: dict[str, float] = {"mu": 0.1},
aux: list["str"] = ["aux"],
noise: list["str"] = ["weiner1"]
)


integrator = clode.FeatureSimulator(
    ivp=ivp,
    solver=clode.Solver.dormand_prince,
)

integrator.set_ensemble(
    parameters={"mu": [-1, 0, 0.01, 0.1, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0]},
    variables={"x": [1.0, 2.0], },
)

integrator.transient()
integrator.features()
observer_output = integrator.get_observer_results()

periods = observer_output.get_var_max("period")

plt.plot(parameters, periods[:, 0])
plt.title("Van der Pol oscillator")
plt.xlabel("mu")
plt.ylabel("period")

plt.show()

XPP

init x = 0.1 y = 0.1

par mu = 0.1
y' = mu * (1 - x*x) * y - x
x' = y

@ dt=0.05, total=5000, maxstor=20000000
@ bounds=10000000, xp=t, yp=v
@ xlo=0, xhi=5000, ylo=-75, yhi=0
@ method=Euler

Python

import numpy as np

import clode


def cuberoot(x):
    return x ** (1 / 3.)


def vdp_dormand_prince(end: int, input_file: str):
    tspan = (0.0, 1000.0)

    integrator = clode.CLODEFeatures(
        src_file=input_file,
        variable_names=["x", "y"],
        parameter_names=["mu"],
        num_noise=0,
        observer=clode.Observer.threshold_2,
        stepper=clode.Stepper.dormand_prince,
        tspan=tspan,
    )

    parameters = [-1, 0, 0.01, 0.1, 0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 3.5, 4.0] +
    list(range(5, end))


x0 = np.tile([1, 1], (len(parameters), 1))

pars_v = np.array([[par] for par in parameters])
integrator.set_ensemble(x0, pars_v)

integrator.transient()
integrator.features()
observer_output = integrator.get_observer_results()

return observer_output

vdp_dormand_prince(100, "vdp_oscillator.xpp")
def scipy_solve_ivp_wrapper(func, aux=None, wiener=None):
    if aux is None:
        aux = []
    if wiener is None:
        wiener = []
    def wrapper(t, y, *args):
        dydt = np.zeros_like(y)
        func(t, y, args, dydt, aux, wiener)
        return dydt

    return wrapper

wrap = scipy_solve_ivp_wrapper(getRHS)
xx = solve_ivp(
    wrap,
    [0, 1000],
    [1, 1],
    args=(-1, 0, 1),
    atol=1e-10,
    rtol=1e-10,
    mxstep=1000000,
)

# Invoke the wrapper with scipy's odeint

getRHS = scipy_odeint_wrapper(getRHS)