API Reference

Bases: Simulator

Source code in clode/trajectory.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
class TrajectorySimulator(Simulator):
    """Simulator that integrates an ensemble and returns the stored trajectories.

    Integration is delegated to a ``TrajectorySimulatorBase`` instance.
    ``trajectory()`` runs it, while ``get_trajectory()`` and ``get_aux()``
    fetch the flat output buffers, reshape them and split them into one
    entry per ensemble member.
    """

    # Time points reshaped to (max_store, ensemble_size).
    _time_steps: np.ndarray[Any, np.dtype[np.float64]] | None
    # Flat time buffer exactly as returned by the integrator.
    _output_time_steps: np.ndarray[Any, np.dtype[np.float64]] | None
    # Flat state buffer exactly as returned by the integrator.
    _output_trajectories: np.ndarray[Any, np.dtype[np.float64]] | None
    # Flat auxiliary-variable buffer exactly as returned by the integrator.
    _output_aux: np.ndarray[Any, np.dtype[np.float64]] | None
    # State variables reshaped to (max_store, n_variables, ensemble_size).
    _data: np.ndarray[Any, np.dtype[np.float64]] | None
    # Auxiliary variables reshaped to (max_store, n_aux, ensemble_size).
    _aux: np.ndarray[Any, np.dtype[np.float64]] | None
    _integrator: TrajectorySimulatorBase

    def __init__(
        self,
        variables: Dict[str, float],
        parameters: Dict[str, float],
        src_file: Optional[str] = None,
        rhs_equation: Optional[OpenCLRhsEquation] = None,
        supplementary_equations: Optional[List[Callable[[Any], Any]]] = None,
        aux: Optional[List[str]] = None,
        num_noise: int = 0,
        t_span: Tuple[float, float] = (0.0, 1000.0),
        stepper: Stepper = Stepper.rk4,
        single_precision: bool = True,
        dt: float = 0.1,
        dtmax: float = 1.0,
        abstol: float = 1e-6,
        reltol: float = 1e-4,
        max_steps: int = 1000000,
        max_store: int = 1000000,
        nout: int = 1,
        device_type: CLDeviceType | None = None,
        vendor: CLVendor | None = None,
        platform_id: int | None = None,
        device_id: int | None = None,
        device_ids: List[int] | None = None,
    ) -> None:
        # We use Google-style docstrings: https://sphinxcontrib-napoleon.readthedocs.io/en/latest/example_google.html
        """Initialize a CLODE trajectory simulator.

        Every argument is forwarded unchanged to ``Simulator.__init__``; the
        trajectory buffers of this class are then set to ``None``.

        Args:
            variables (Dict[str, float]): Mapping from variable name to a float value (presumably the initial value -- confirm against ``Simulator``).
            parameters (Dict[str, float]): Mapping from parameter name to a float value.
            src_file (Optional[str], optional): The path to the source file to be simulated.  If the file ends with ".xpp", it will be converted to a CLODE source file. Defaults to None.
            rhs_equation (Optional[OpenCLRhsEquation], optional): Right-hand-side equation object forwarded to ``Simulator`` (presumably an alternative to ``src_file`` -- confirm). Defaults to None.
            supplementary_equations (Optional[List[Callable[[Any], Any]]], optional): Additional callables forwarded to ``Simulator``. Defaults to None.
            aux (Optional[List[str]], optional): The names of the auxiliary variables to be simulated. Defaults to None.
            num_noise (int, optional): The number of noise variables to be simulated. Defaults to 0.
            t_span (Tuple[float, float], optional): The time span to simulate over. Defaults to (0.0, 1000.0).
            stepper (Stepper, optional): The stepper to use. Defaults to Stepper.rk4.
            single_precision (bool, optional): Whether to use single precision. Defaults to True.
            dt (float, optional): The initial time step. Defaults to 0.1.
            dtmax (float, optional): The maximum time step. Defaults to 1.0.
            abstol (float, optional): The absolute tolerance. Defaults to 1e-6.
            reltol (float, optional): The relative tolerance. Defaults to 1e-4.
            max_steps (int, optional): The maximum number of steps. Defaults to 1000000.
            max_store (int, optional): The maximum number of time steps to store. Defaults to 1000000.
            nout (int, optional): The number of output time steps. Defaults to 1.
            device_type (Optional[CLDeviceType], optional): The type of device to use. Defaults to None.
            vendor (Optional[CLVendor], optional): The vendor of the device to use. Defaults to None.
            platform_id (Optional[int], optional): The platform ID of the device to use. Defaults to None.
            device_id (Optional[int], optional): The device ID of the device to use. Defaults to None.
            device_ids (Optional[List[int]], optional): The device IDs of the devices to use. Defaults to None.

        Raises:
            ValueError: If the source file does not exist (NOTE(review): presumably raised inside ``Simulator.__init__`` -- confirm).
        """

        super().__init__(
            variables=variables,
            parameters=parameters,
            src_file=src_file,
            rhs_equation=rhs_equation,
            supplementary_equations=supplementary_equations,
            aux=aux,
            num_noise=num_noise,
            t_span=t_span,
            stepper=stepper,
            single_precision=single_precision,
            dt=dt,
            dtmax=dtmax,
            abstol=abstol,
            reltol=reltol,
            max_steps=max_steps,
            max_store=max_store,
            nout=nout,
            device_type=device_type,
            vendor=vendor,
            platform_id=platform_id,
            device_id=device_id,
            device_ids=device_ids,
        )

        # Nothing has been fetched from the integrator yet.
        self._data = None
        self._output_trajectories = None
        self._time_steps = None
        self._output_time_steps = None
        self._output_aux = None
        self._aux = None

    def _build_integrator(self) -> None:
        """Create the ``TrajectorySimulatorBase`` and store it in ``self._integrator``."""
        self._integrator = TrajectorySimulatorBase(
            self._pi,
            self._stepper.value,
            self._single_precision,
            self._runtime,
            _clode_root_dir,
        )

    def trajectory(self, update_x0: bool = True) -> List[TrajectoryResult]:
        """Run a trajectory simulation.

        Args:
            update_x0 (bool, optional): If True, ``self.shift_x0()`` is called after the integration and before the results are fetched. Defaults to True.

        Raises:
            RuntimeError: If ``self.is_initialized`` is falsy.

        Returns:
            List[TrajectoryResult]: The result of ``get_trajectory()``.
        """
        if not self.is_initialized:
            raise RuntimeError("Simulator is not initialized")

        self._integrator.trajectory()
        if update_x0:
            self.shift_x0()

        return self.get_trajectory()

    def get_trajectory(self) -> List[TrajectoryResult]:
        """Get the trajectory data.

        Fetches the stored-step counts, time points and state values from the
        integrator, reshapes the flat buffers and cuts each ensemble member
        down to the number of steps actually stored for it.

        Raises:
            ValueError: If the integrator returns ``None`` for the stored-step counts, the time buffer or the state buffer.

        Returns:
            List[TrajectoryResult]: One result per ensemble member, built from ``{"t": ..., "x": ...}``, where ``t`` has shape ``(n_stored,)`` and ``x`` has shape ``(n_stored, n_variables)``.
        """

        # fetch data from device
        self._n_stored = self._integrator.get_n_stored()
        self._output_time_steps = self._integrator.get_t()
        self._output_trajectories = self._integrator.get_x()

        # Validate before slicing: checking after the reshape (as was done
        # previously) could never produce this error, because slicing a None
        # buffer fails first with a TypeError.
        if (
            self._n_stored is None
            or self._output_time_steps is None
            or self._output_trajectories is None
        ):
            raise ValueError("Must run trajectory() before getting trajectory data")

        # time_steps has one column per simulation (to support adaptive steppers)
        shape = (self._ensemble_size, self._max_store)
        arr = np.array(self._output_time_steps[: np.prod(shape)])
        time_steps = arr.reshape(shape, order="F").transpose((1, 0))
        self._time_steps = time_steps

        # data ends up as (max_store, n_variables, ensemble_size)
        data_shape = (self._ensemble_size, len(self.variable_names), self._max_store)
        arr = np.array(self._output_trajectories[: np.prod(data_shape)])
        data = arr.reshape(data_shape, order="F").transpose((2, 1, 0))
        self._data = data

        # list of trajectories, each truncated to the steps actually stored
        results = []
        for i in range(self._ensemble_size):
            ni = self._n_stored[i]
            results.append(
                TrajectoryResult({"t": time_steps[:ni, i], "x": data[:ni, :, i]})
            )

        return results

    def get_aux(self) -> List[np.ndarray[Any, np.dtype[np.float64]]]:
        """Get the auxiliary data.

        Returns:
            List[np.ndarray]: One array per ensemble member with shape ``(n_stored, n_aux)``.
        """
        # Called for its side effect of refreshing self._n_stored (it also
        # re-fetches the time and state buffers).
        _ = self.get_trajectory()
        self._output_aux = self._integrator.get_aux()

        shape = (self._ensemble_size, len(self.aux_variables), self._max_store)
        arr = np.array(self._output_aux[: np.prod(shape)])
        aux = arr.reshape(shape, order="F").transpose((2, 1, 0))
        self._aux = aux

        return [
            aux[: self._n_stored[i], :, i] for i in range(self._ensemble_size)
        ]

__init__(variables, parameters, src_file=None, rhs_equation=None, supplementary_equations=None, aux=None, num_noise=0, t_span=(0.0, 1000.0), stepper=Stepper.rk4, single_precision=True, dt=0.1, dtmax=1.0, abstol=1e-06, reltol=0.0001, max_steps=1000000, max_store=1000000, nout=1, device_type=None, vendor=None, platform_id=None, device_id=None, device_ids=None)

Initialize a CLODE trajectory object.

Parameters:
  • src_file (str, default: None ) –

    The path to the source file to be simulated. If the file ends with ".xpp", it will be converted to a CLODE source file.

  • variables (Dict[str, float]) –

    The variables to be simulated, given as a mapping from variable name to a float value.

  • parameters (Dict[str, float]) –

    The parameters to be simulated, given as a mapping from parameter name to a float value.

  • aux (Optional[List[str]], default: None ) –

    The names of the auxiliary variables to be simulated. Defaults to None.

  • num_noise (int, default: 0 ) –

    The number of noise variables to be simulated. Defaults to 0.

  • t_span (Tuple[float, float], default: (0.0, 1000.0) ) –

    The time span to simulate over. Defaults to (0.0, 1000.0).

  • stepper (Stepper, default: rk4 ) –

    The stepper to use. Defaults to Stepper.rk4.

  • single_precision (bool, default: True ) –

    Whether to use single precision. Defaults to True.

  • dt (float, default: 0.1 ) –

    The initial time step. Defaults to 0.1.

  • dtmax (float, default: 1.0 ) –

    The maximum time step. Defaults to 1.0.

  • abstol (float, default: 1e-06 ) –

    The absolute tolerance. Defaults to 1e-6.

  • reltol (float, default: 0.0001 ) –

    The relative tolerance. Defaults to 1e-4.

  • max_steps (int, default: 1000000 ) –

    The maximum number of steps. Defaults to 1000000.

  • max_store (int, default: 1000000 ) –

    The maximum number of time steps to store. Defaults to 1000000.

  • nout (int, default: 1 ) –

    The number of output time steps. Defaults to 1.

  • device_type (Optional[CLDeviceType], default: None ) –

    The type of device to use. Defaults to None.

  • vendor (Optional[CLVendor], default: None ) –

    The vendor of the device to use. Defaults to None.

  • platform_id (Optional[int], default: None ) –

    The platform ID of the device to use. Defaults to None.

  • device_id (Optional[int], default: None ) –

    The device ID of the device to use. Defaults to None.

  • device_ids (Optional[List[int]], default: None ) –

    The device IDs of the devices to use. Defaults to None.

Raises:
  • ValueError

    If the source file does not exist.

Returns (CLODETrajectory): The initialized CLODE trajectory object.

Source code in clode/trajectory.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
def __init__(
    self,
    variables: Dict[str, float],
    parameters: Dict[str, float],
    src_file: Optional[str] = None,
    rhs_equation: Optional[OpenCLRhsEquation] = None,
    supplementary_equations: Optional[List[Callable[[Any], Any]]] = None,
    aux: Optional[List[str]] = None,
    num_noise: int = 0,
    t_span: Tuple[float, float] = (0.0, 1000.0),
    stepper: Stepper = Stepper.rk4,
    single_precision: bool = True,
    dt: float = 0.1,
    dtmax: float = 1.0,
    abstol: float = 1e-6,
    reltol: float = 1e-4,
    max_steps: int = 1000000,
    max_store: int = 1000000,
    nout: int = 1,
    device_type: CLDeviceType | None = None,
    vendor: CLVendor | None = None,
    platform_id: int | None = None,
    device_id: int | None = None,
    device_ids: List[int] | None = None,
) -> None:
    # We use Google-style docstrings: https://sphinxcontrib-napoleon.readthedocs.io/en/latest/example_google.html
    """Initialize a CLODE trajectory simulator.

    Every argument is forwarded unchanged to the parent ``__init__``; the
    trajectory buffers of this class are then set to ``None``.

    Args:
        variables (Dict[str, float]): Mapping from variable name to a float value (presumably the initial value -- confirm against the parent class).
        parameters (Dict[str, float]): Mapping from parameter name to a float value.
        src_file (Optional[str], optional): The path to the source file to be simulated.  If the file ends with ".xpp", it will be converted to a CLODE source file. Defaults to None.
        rhs_equation (Optional[OpenCLRhsEquation], optional): Right-hand-side equation object forwarded to the parent class (presumably an alternative to ``src_file`` -- confirm). Defaults to None.
        supplementary_equations (Optional[List[Callable[[Any], Any]]], optional): Additional callables forwarded to the parent class. Defaults to None.
        aux (Optional[List[str]], optional): The names of the auxiliary variables to be simulated. Defaults to None.
        num_noise (int, optional): The number of noise variables to be simulated. Defaults to 0.
        t_span (Tuple[float, float], optional): The time span to simulate over. Defaults to (0.0, 1000.0).
        stepper (Stepper, optional): The stepper to use. Defaults to Stepper.rk4.
        single_precision (bool, optional): Whether to use single precision. Defaults to True.
        dt (float, optional): The initial time step. Defaults to 0.1.
        dtmax (float, optional): The maximum time step. Defaults to 1.0.
        abstol (float, optional): The absolute tolerance. Defaults to 1e-6.
        reltol (float, optional): The relative tolerance. Defaults to 1e-4.
        max_steps (int, optional): The maximum number of steps. Defaults to 1000000.
        max_store (int, optional): The maximum number of time steps to store. Defaults to 1000000.
        nout (int, optional): The number of output time steps. Defaults to 1.
        device_type (Optional[CLDeviceType], optional): The type of device to use. Defaults to None.
        vendor (Optional[CLVendor], optional): The vendor of the device to use. Defaults to None.
        platform_id (Optional[int], optional): The platform ID of the device to use. Defaults to None.
        device_id (Optional[int], optional): The device ID of the device to use. Defaults to None.
        device_ids (Optional[List[int]], optional): The device IDs of the devices to use. Defaults to None.

    Raises:
        ValueError: If the source file does not exist (NOTE(review): presumably raised inside the parent ``__init__`` -- confirm).
    """

    super().__init__(
        variables=variables,
        parameters=parameters,
        src_file=src_file,
        rhs_equation=rhs_equation,
        supplementary_equations=supplementary_equations,
        aux=aux,
        num_noise=num_noise,
        t_span=t_span,
        stepper=stepper,
        single_precision=single_precision,
        dt=dt,
        dtmax=dtmax,
        abstol=abstol,
        reltol=reltol,
        max_steps=max_steps,
        max_store=max_store,
        nout=nout,
        device_type=device_type,
        vendor=vendor,
        platform_id=platform_id,
        device_id=device_id,
        device_ids=device_ids,
    )

    # Nothing has been fetched from the integrator yet.
    self._data = None
    self._output_trajectories = None
    self._time_steps = None
    self._output_time_steps = None
    self._output_aux = None
    self._aux = None

get_aux()

Get the auxiliary data.

Returns:
  • List[ndarray[Any, dtype[float64]]]

    The auxiliary data, as one array per ensemble member.

Source code in clode/trajectory.py
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def get_aux(self) -> List[np.ndarray[Any, np.dtype[np.float64]]]:
    """Fetch the auxiliary variables and split them per ensemble member.

    ``get_trajectory()`` is called first and its return value discarded; the
    per-member stored-step counts in ``self._n_stored`` are then used to
    truncate each member's auxiliary data.

    Returns:
        List[np.ndarray]: One array per ensemble member with shape
        ``(n_stored, n_aux)``.
    """
    self.get_trajectory()

    flat_aux = self._integrator.get_aux()
    self._output_aux = flat_aux

    n_members = self._ensemble_size
    buffer_shape = (n_members, len(self.aux_variables), self._max_store)

    # The flat buffer is laid out column-major as (member, aux, step);
    # reorder the axes to (step, aux, member).
    used = np.array(flat_aux[: np.prod(buffer_shape)])
    self._aux = used.reshape(buffer_shape, order="F").transpose((2, 1, 0))

    return [
        self._aux[: self._n_stored[member], :, member]
        for member in range(n_members)
    ]

get_trajectory()

Get the trajectory data.

Returns:
  • List[TrajectoryResult]

    The trajectory data, as one result per ensemble member.

Source code in clode/trajectory.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def get_trajectory(self) -> List[TrajectoryResult]:
    """Get the trajectory data.

    Fetches the stored-step counts, time points and state values from the
    integrator, reshapes the flat buffers and cuts each ensemble member
    down to the number of steps actually stored for it.

    Raises:
        ValueError: If the integrator returns ``None`` for the stored-step
            counts, the time buffer or the state buffer.

    Returns:
        List[TrajectoryResult]: One result per ensemble member, built from
        ``{"t": ..., "x": ...}``, where ``t`` has shape ``(n_stored,)`` and
        ``x`` has shape ``(n_stored, n_variables)``.
    """

    # fetch data from device
    self._n_stored = self._integrator.get_n_stored()
    self._output_time_steps = self._integrator.get_t()
    self._output_trajectories = self._integrator.get_x()

    # Validate before slicing: checking after the reshape (as was done
    # previously) could never produce this error, because slicing a None
    # buffer fails first with a TypeError.
    if (
        self._n_stored is None
        or self._output_time_steps is None
        or self._output_trajectories is None
    ):
        raise ValueError("Must run trajectory() before getting trajectory data")

    # time_steps has one column per simulation (to support adaptive steppers)
    shape = (self._ensemble_size, self._max_store)
    arr = np.array(self._output_time_steps[: np.prod(shape)])
    time_steps = arr.reshape(shape, order="F").transpose((1, 0))
    self._time_steps = time_steps

    # data ends up as (max_store, n_variables, ensemble_size)
    data_shape = (self._ensemble_size, len(self.variable_names), self._max_store)
    arr = np.array(self._output_trajectories[: np.prod(data_shape)])
    data = arr.reshape(data_shape, order="F").transpose((2, 1, 0))
    self._data = data

    # list of trajectories, each truncated to the steps actually stored
    results = []
    for i in range(self._ensemble_size):
        ni = self._n_stored[i]
        results.append(
            TrajectoryResult({"t": time_steps[:ni, i], "x": data[:ni, :, i]})
        )

    return results

trajectory(update_x0=True)

Run a trajectory simulation.

Returns:
  • List[TrajectoryResult]

    List[TrajectoryResult]

Source code in clode/trajectory.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def trajectory(self, update_x0: bool = True) -> List[TrajectoryResult]:
    """Integrate the ensemble and return the stored trajectories.

    Args:
        update_x0: When true (the default), ``self.shift_x0()`` is called
            after the integration and before the results are fetched.

    Returns:
        List[TrajectoryResult]: Whatever ``self.get_trajectory()`` returns.

    Raises:
        RuntimeError: If ``self.is_initialized`` is falsy.
    """
    if self.is_initialized:
        self._integrator.trajectory()
        if update_x0:
            self.shift_x0()
        return self.get_trajectory()

    raise RuntimeError("Simulator is not initialized")