Examples

Simple Koopman pipeline

"""Example of how to use the Koopman pipeline."""

from matplotlib import pyplot as plt
from sklearn.preprocessing import MaxAbsScaler, StandardScaler

import pykoop

plt.rc('lines', linewidth=2)
plt.rc('axes', grid=True)
plt.rc('grid', linestyle='--')


def example_pipeline_simple() -> None:
    """Demonstrate how to use the Koopman pipeline."""
    # Get example mass-spring-damper data
    eg = pykoop.example_data_msd()

    # Create pipeline
    kp = pykoop.KoopmanPipeline(
        lifting_functions=[
            ('ma', pykoop.SkLearnLiftingFn(MaxAbsScaler())),
            ('pl', pykoop.PolynomialLiftingFn(order=2)),
            ('ss', pykoop.SkLearnLiftingFn(StandardScaler())),
        ],
        regressor=pykoop.Edmd(alpha=1),
    )

    # Fit the pipeline
    kp.fit(
        eg['X_train'],
        n_inputs=eg['n_inputs'],
        episode_feature=eg['episode_feature'],
    )

    # Predict using the pipeline
    X_pred = kp.predict_trajectory(eg['x0_valid'], eg['u_valid'])

    # Score using the pipeline
    score = kp.score(eg['X_valid'])

    # Plot results
    kp.plot_predicted_trajectory(eg['X_valid'], plot_input=True)


if __name__ == '__main__':
    example_pipeline_simple()
    plt.show()

(Source code, png, hires.png, pdf)

_images/1_example_pipeline_simple.png

Van der Pol Oscillator

"""Example of how to use the Koopman pipeline."""

import numpy as np
from matplotlib import pyplot as plt

import pykoop

plt.rc('lines', linewidth=2)
plt.rc('axes', grid=True)
plt.rc('grid', linestyle='--')


def example_pipeline_vdp() -> None:
    """Demonstrate how to use the Koopman pipeline."""
    # Get example Van der Pol data
    eg = pykoop.example_data_vdp()

    # Create pipeline
    kp = pykoop.KoopmanPipeline(
        lifting_functions=[(
            'sp',
            pykoop.SplitPipeline(
                lifting_functions_state=[
                    ('pl', pykoop.PolynomialLiftingFn(order=3))
                ],
                lifting_functions_input=None,
            ),
        )],
        regressor=pykoop.Edmd(),
    )

    # Fit the pipeline
    kp.fit(
        eg['X_train'],
        n_inputs=eg['n_inputs'],
        episode_feature=eg['episode_feature'],
    )

    # Predict with re-lifting between timesteps (default)
    X_pred_local = kp.predict_trajectory(
        eg['x0_valid'],
        eg['u_valid'],
        relift_state=True,
    )

    # Predict without re-lifting between timesteps
    X_pred_global = kp.predict_trajectory(
        eg['x0_valid'],
        eg['u_valid'],
        relift_state=False,
    )

    # Plot trajectories in phase space
    fig, ax = plt.subplots(constrained_layout=True, figsize=(6, 6))
    ax.plot(
        eg['X_valid'][:, 1],
        eg['X_valid'][:, 2],
        label='True trajectory',
    )
    ax.plot(
        X_pred_local[:, 1],
        X_pred_local[:, 2],
        '--',
        label='Local prediction',
    )
    ax.plot(
        X_pred_global[:, 1],
        X_pred_global[:, 2],
        '--',
        label='Global prediction',
    )
    ax.set_xlabel('$x_1[k]$')
    ax.set_ylabel('$x_2[k]$')
    ax.legend()
    ax.set_title('True and predicted phase-space trajectories')

    # Lift validation set
    Psi_valid = kp.lift(eg['X_valid'])

    # Predict lifted state with re-lifting between timesteps (default)
    Psi_pred_local = kp.predict_trajectory(
        eg['x0_valid'],
        eg['u_valid'],
        relift_state=True,
        return_lifted=True,
        return_input=True,
    )

    # Predict lifted state without re-lifting between timesteps
    Psi_pred_global = kp.predict_trajectory(
        eg['x0_valid'],
        eg['u_valid'],
        relift_state=False,
        return_lifted=True,
        return_input=True,
    )

    # Get feature names
    names = kp.get_feature_names_out(format='latex')

    fig, ax = plt.subplots(
        kp.n_states_out_,
        1,
        constrained_layout=True,
        sharex=True,
        squeeze=False,
        figsize=(6, 12),
    )
    for i in range(ax.shape[0]):
        ax[i, 0].plot(
            Psi_valid[:, i + 1],
            label='True trajectory',
        )
        ax[i, 0].plot(
            Psi_pred_local[:, i + 1],
            '--',
            label='Local prediction',
        )
        ax[i, 0].plot(
            Psi_pred_global[:, i + 1],
            '--',
            label='Global prediction',
        )
        ax[i, 0].set_ylabel(rf'$\vartheta_{i + 1} = {names[i + 1]}$')

    ax[-1, 0].set_xlabel('$k$')
    ax[0, 0].set_title('True and predicted lifted states')
    ax[-1, -1].legend(loc='lower right')
    fig.align_ylabels()


if __name__ == '__main__':
    example_pipeline_vdp()
    plt.show()

(Source code)

_images/2_example_pipeline_vdp_00.png

(png, hires.png, pdf)

_images/2_example_pipeline_vdp_01.png

(png, hires.png, pdf)

Cross-validation with scikit-learn

To enable cross-validation, pykoop strives to be fully-compatible with scikit-learn. All of its regressors and lifting functions pass scikit-learn’s estimator checks, with minor exceptions made when necessary.

Regressor parameters and lifting functions can easily be cross-validated using scikit-learn:

"""Example of cross-validating lifting functions and regressor parameters."""

import numpy as np
import sklearn.model_selection
import sklearn.preprocessing
from matplotlib import pyplot as plt

import pykoop
import pykoop.dynamic_models

plt.rc('lines', linewidth=2)
plt.rc('axes', grid=True)
plt.rc('grid', linestyle='--')


def example_pipeline_cv() -> None:
    """Cross-validate regressor parameters."""
    # Get example mass-spring-damper data
    eg = pykoop.example_data_msd()

    # Create pipeline. Don't need to set lifting functions since they'll be set
    # during cross-validation.
    kp = pykoop.KoopmanPipeline(regressor=pykoop.Edmd())

    # Split data episode-by-episode
    episode_feature = eg['X_train'][:, 0]
    cv = sklearn.model_selection.GroupShuffleSplit(
        random_state=1234,
        n_splits=3,
    ).split(eg['X_train'], groups=episode_feature)

    # Define search parameters
    params = {
        # Lifting functions to try
        'lifting_functions': [
            [(
                'ss',
                pykoop.SkLearnLiftingFn(
                    sklearn.preprocessing.StandardScaler()),
            )],
            [
                (
                    'ma',
                    pykoop.SkLearnLiftingFn(
                        sklearn.preprocessing.MaxAbsScaler()),
                ),
                (
                    'pl',
                    pykoop.PolynomialLiftingFn(order=2),
                ),
                (
                    'ss',
                    pykoop.SkLearnLiftingFn(
                        sklearn.preprocessing.StandardScaler()),
                ),
            ],
        ],
        # Regressor parameters to try
        'regressor__alpha': [0, 0.1, 1, 10],
    }

    # Set up grid search
    gs = sklearn.model_selection.GridSearchCV(
        kp,
        params,
        cv=cv,
        # Score using short and long prediction time frames
        scoring={
            f'full_episode': pykoop.KoopmanPipeline.make_scorer(),
            f'ten_steps': pykoop.KoopmanPipeline.make_scorer(n_steps=10),
        },
        # Rank according to short time frame
        refit='ten_steps',
    )

    # Fit the pipeline
    gs.fit(
        eg['X_train'],
        n_inputs=eg['n_inputs'],
        episode_feature=eg['episode_feature'],
    )

    # Get results
    cv_results = gs.cv_results_
    best_estimator = gs.best_estimator_

    # Predict using the pipeline
    X_pred = best_estimator.predict_trajectory(eg['x0_valid'], eg['u_valid'])

    # Score using the pipeline
    score = best_estimator.score(eg['X_valid'])

    # Plot results
    best_estimator.plot_predicted_trajectory(eg['X_valid'], plot_input=True)


if __name__ == '__main__':
    example_pipeline_cv()
    plt.show()

(Source code, png, hires.png, pdf)

_images/3_example_pipeline_cv.png

Asymptotic stability constraint

In this example, three experimental EDMD-based regressors are compared to EDMD. Specifically, EDMD is compared to the asymptotic stability constraint and the H-infinity norm regularizer from [DF22] and [DF21], and the dissipativity constraint from [HIS19].

"""Example comparing eigenvalues of different versions of EDMD."""

import numpy as np
from matplotlib import pyplot as plt
from scipy import integrate, linalg

import pykoop
import pykoop.lmi_regressors

plt.rc('lines', linewidth=2)
plt.rc('axes', grid=True)
plt.rc('grid', linestyle='--')


def example_eigenvalue_comparison() -> None:
    """Compare eigenvalues of different versions of EDMD."""
    # Get example data
    eg = pykoop.example_data_msd()
    # Set solver (you can switch to ``'mosek'`` if you have a license).
    solver = 'cvxopt'

    # Regressor with no constraint
    reg_no_const = pykoop.KoopmanPipeline(regressor=pykoop.Edmd())
    reg_no_const.fit(
        eg['X_train'],
        n_inputs=eg['n_inputs'],
        episode_feature=eg['episode_feature'],
    )
    U_no_const = reg_no_const.regressor_.coef_.T

    # Regressor H-infinity norm constraint of 1.5
    gamma = 1.5
    Xi = np.block([
        [1 / gamma * np.eye(2), np.zeros((2, 1))],
        [np.zeros((1, 2)), -gamma * np.eye(1)],
    ])  # yapf: disable
    reg_diss_const = pykoop.KoopmanPipeline(
        regressor=pykoop.lmi_regressors.LmiEdmdDissipativityConstr(
            supply_rate=Xi,
            solver_params={'solver': solver},
        ))
    reg_diss_const.fit(
        eg['X_train'],
        n_inputs=eg['n_inputs'],
        episode_feature=eg['episode_feature'],
    )
    U_diss_const = reg_diss_const.regressor_.coef_.T

    # Regressor with H-infinity regularization
    reg_hinf_reg = pykoop.KoopmanPipeline(
        regressor=pykoop.lmi_regressors.LmiEdmdHinfReg(
            alpha=1e-2,
            solver_params={'solver': solver},
        ))
    reg_hinf_reg.fit(
        eg['X_train'],
        n_inputs=eg['n_inputs'],
        episode_feature=eg['episode_feature'],
    )
    U_hinf_reg = reg_hinf_reg.regressor_.coef_.T

    # Regressor with spectral radius constraint
    reg_sr_const = pykoop.KoopmanPipeline(
        regressor=pykoop.lmi_regressors.LmiEdmdSpectralRadiusConstr(
            spectral_radius=0.8,
            solver_params={'solver': solver},
        ))
    reg_sr_const.fit(
        eg['X_train'],
        n_inputs=eg['n_inputs'],
        episode_feature=eg['episode_feature'],
    )
    U_sr_const = reg_sr_const.regressor_.coef_.T

    # Predict using the pipeline
    X_pred_no_const = reg_no_const.predict_trajectory(
        eg['x0_valid'],
        eg['u_valid'],
    )
    X_pred_diss_const = reg_diss_const.predict_trajectory(
        eg['x0_valid'],
        eg['u_valid'],
    )
    X_pred_hinf_reg = reg_hinf_reg.predict_trajectory(
        eg['x0_valid'],
        eg['u_valid'],
    )
    X_pred_sr_const = reg_sr_const.predict_trajectory(
        eg['x0_valid'],
        eg['u_valid'],
    )

    # Plot results
    fig, ax = plt.subplots(
        reg_no_const.n_states_in_ + reg_no_const.n_inputs_in_,
        1,
        constrained_layout=True,
        sharex=True,
        figsize=(6, 6),
    )
    # Plot input
    ax[2].plot(eg['t'], eg['X_valid'][:, 3])
    # Plot predicted trajectory
    ax[0].plot(eg['t'], X_pred_no_const[:, 1], label='EDMD')
    ax[1].plot(
        eg['t'],
        X_pred_no_const[:, 2],
    )
    ax[0].plot(
        eg['t'],
        X_pred_diss_const[:, 1],
        '--',
        label='Dissipativity const.',
    )
    ax[1].plot(eg['t'], X_pred_diss_const[:, 2], '--')
    ax[0].plot(
        eg['t'],
        X_pred_hinf_reg[:, 1],
        '--',
        label='H-infinity reg.',
    )
    ax[1].plot(eg['t'], X_pred_hinf_reg[:, 2], '--')
    ax[0].plot(
        eg['t'],
        X_pred_sr_const[:, 1],
        '--',
        label=r'A.S. const. ($\bar{\rho}=0.8$)',
    )
    ax[1].plot(
        eg['t'],
        X_pred_sr_const[:, 2],
        '--',
    )
    # Add labels
    ax[-1].set_xlabel('$t$')
    ax[0].set_ylabel('$x(t)$')
    ax[1].set_ylabel(r'$\dot{x}(t)$')
    ax[2].set_ylabel('$u$')
    ax[0].set_title(f'True and predicted states')
    ax[0].legend(loc='upper right')

    # Plot results
    fig = plt.figure(figsize=(6, 6))
    ax = plt.subplot(projection='polar')
    ax.set_xlabel(r'$\mathrm{Re}(\lambda)$')
    ax.set_ylabel(r'$\mathrm{Im}(\lambda)$', labelpad=30)
    plt_eig(U_no_const[:2, :2], ax, 'EDMD', marker='o')
    plt_eig(U_diss_const[:2, :2], ax, r'Dissipativity const.')
    plt_eig(U_hinf_reg[:2, :2], ax, r'H-infinity reg.')
    plt_eig(
        U_sr_const[:2, :2],
        ax,
        r'A.S. const. ($\bar{\rho}=0.8$)',
    )
    ax.set_rmax(1.1)
    ax.legend(loc='lower left')
    ax.set_title(r'Eigenvalues of $\bf{A}$')


def plt_eig(A: np.ndarray,
            ax: plt.Axes,
            label: str = '',
            marker: str = 'x') -> None:
    """Eigenvalue plotting helper function."""
    eigv = linalg.eig(A)[0]
    ax.scatter(np.angle(eigv), np.absolute(eigv), marker=marker, label=label)


if __name__ == '__main__':
    example_eigenvalue_comparison()
    plt.show()

(Source code)

_images/4_example_eigenvalue_comparison_00.png

(png, hires.png, pdf)

_images/4_example_eigenvalue_comparison_01.png

(png, hires.png, pdf)

Sparse regression

This example shows how to use pykoop.EdmdMeta to implement sparse regression with sklearn.linear_model.Lasso. The lasso promotes empty columns in the Koopman matrix, which means the corresponding lifting functions can be removed from the model.

"""Example of sparse regression with :class:`EdmdMeta`."""

from matplotlib import pyplot as plt
from sklearn.linear_model import Lasso

import pykoop

plt.rc('lines', linewidth=2)
plt.rc('axes', grid=True)
plt.rc('grid', linestyle='--')


def example_sparse_regression() -> None:
    """Demonstrate sparse regression with :class:`EdmdMeta`."""
    # Get example mass-spring-damper data
    eg = pykoop.example_data_msd()

    # Choose lifting functions
    poly3 = [('pl', pykoop.PolynomialLiftingFn(order=3))]

    # Create dense pipeline
    kp_dense = pykoop.KoopmanPipeline(
        lifting_functions=poly3,
        regressor=pykoop.Edmd(alpha=1e-9),
    )
    # Create sparse pipeline
    kp_sparse = pykoop.KoopmanPipeline(
        lifting_functions=poly3,
        # Also try out :class:`sklearn.linear_model.OrthogonalMatchingPursuit`!
        regressor=pykoop.EdmdMeta(regressor=Lasso(alpha=1e-9)),
    )

    # Fit the pipelines
    kp_dense.fit(
        eg['X_train'],
        n_inputs=eg['n_inputs'],
        episode_feature=eg['episode_feature'],
    )
    kp_sparse.fit(
        eg['X_train'],
        n_inputs=eg['n_inputs'],
        episode_feature=eg['episode_feature'],
    )

    # Plot dense prediction and Koopman matrix
    fig, ax = kp_dense.plot_predicted_trajectory(
        eg['X_valid'],
        plot_input=True,
    )
    ax[0, 0].set_title('Dense regression')
    fig, ax = kp_dense.plot_koopman_matrix()
    ax.set_title('Dense regression')

    # Plot sparse prediction and Koopman matrix
    fig, ax = kp_sparse.plot_predicted_trajectory(
        eg['X_valid'],
        plot_input=True,
    )
    ax[0, 0].set_title('Sparse regression')
    fig, ax = kp_sparse.plot_koopman_matrix()
    ax.set_title('Sparse regression')


if __name__ == '__main__':
    example_sparse_regression()
    plt.show()

(Source code)

_images/5_example_sparse_regression_00.png

(png, hires.png, pdf)

_images/5_example_sparse_regression_01.png

(png, hires.png, pdf)

_images/5_example_sparse_regression_02.png

(png, hires.png, pdf)

_images/5_example_sparse_regression_03.png

(png, hires.png, pdf)

Radial basis functions on a pendulum

This example shows how thin-plate radial basis functions can be used as lifting functions to identify pendulum dynamics (where all trajectories have zero initial velocity). Latin hypercube sampling is used to generate 100 centers.

"""Example of radial basis functions on a pendulum."""
import numpy as np
import scipy.stats
from matplotlib import pyplot as plt

import pykoop

plt.rc('lines', linewidth=2)
plt.rc('axes', grid=True)
plt.rc('grid', linestyle='--')


def example_radial_basis_functions() -> None:
    """Demonstrate radial basis functions on a pendulum."""
    # Get example pendulum data
    eg = pykoop.example_data_pendulum()

    # Create pipeline
    kp = pykoop.KoopmanPipeline(
        lifting_functions=[(
            'rbf',
            pykoop.RbfLiftingFn(
                rbf='thin_plate',
                centers=pykoop.QmcCenters(
                    n_centers=100,
                    qmc=scipy.stats.qmc.LatinHypercube,
                    random_state=666,
                ),
            ),
        )],
        regressor=pykoop.Edmd(),
    )

    # Fit the pipeline
    kp.fit(
        eg['X_train'],
        n_inputs=eg['n_inputs'],
        episode_feature=eg['episode_feature'],
    )

    # Get training and validation episodes
    ep_train = np.unique(eg['X_train'][:, 0])
    ep_valid = np.unique(eg['X_valid'][:, 0])

    # Plot training trajectories
    fig, ax = plt.subplots()
    for ep in ep_train:
        idx = eg['X_train'][:, 0] == ep
        cax_line = ax.plot(
            eg['X_train'][idx, 1],
            eg['X_train'][idx, 2],
            color='C0',
        )
    # Plot centers
    c = kp.lifting_functions_[0][1].centers_.centers_
    for i in range(c.shape[0]):
        cax_scatter = ax.scatter(
            c[i, 0],
            c[i, 1],
            color='C1',
            marker='x',
            zorder=2,
        )
    # Set legend
    ax.legend(
        [cax_line[0], cax_scatter],
        [
            'Training traj.',
            'RBF center',
        ],
        loc='upper right',
    )
    # Set labels
    ax.set_title('Training trajectories and RBF centers')
    ax.set_xlabel('$x_1[k]$')
    ax.set_ylabel('$x_2[k]$')
    # Save axis limits for next figure
    xlim = ax.get_xlim()
    ylim = ax.get_ylim()

    # Predict new trajectories
    X_pred = kp.predict_trajectory(
        eg['x0_valid'],
        eg['u_valid'],
    )

    # Plot validation trajectories
    fig, ax = plt.subplots()
    tab20 = plt.cm.tab20(np.arange(0, 1, 0.05))
    color = [
        (tab20[1], tab20[0]),
        (tab20[3], tab20[2]),
        (tab20[5], tab20[4]),
    ]
    for (i, ep) in enumerate(ep_valid):
        idx = eg['X_valid'][:, 0] == ep
        cax_valid = ax.plot(
            eg['X_valid'][idx, 1],
            eg['X_valid'][idx, 2],
            color=color[i][0],
            label=f'True traj. {i}',
        )
        cax_pred = ax.plot(
            X_pred[idx, 1],
            X_pred[idx, 2],
            label=f'Pred. traj. {i}',
        )
    # Set axis limits
    ax.set_xlim(*xlim)
    ax.set_ylim(*ylim)
    # Plot true vector field
    pend = eg['dynamic_model']
    xx, yy = np.meshgrid(np.linspace(*xlim, 25), np.linspace(*ylim, 25))
    x = np.vstack((xx.ravel(), yy.ravel()))
    x_dot = np.array([pend.f(0, x[:, k], 0) for k in range(x.shape[1])]).T
    ax.quiver(x[0, :], x[1, :], x_dot[0, :], x_dot[1, :], color='grey')
    # Set legend
    ax.legend(loc='lower center', ncol=3)
    # Set labels
    ax.set_title('True and predicted trajectories')
    ax.set_xlabel('$x_1[k]$')
    ax.set_ylabel('$x_2[k]$')


if __name__ == '__main__':
    example_radial_basis_functions()
    plt.show()

(Source code)

_images/6_example_rbf_pendulum_00.png

(png, hires.png, pdf)

_images/6_example_rbf_pendulum_01.png

(png, hires.png, pdf)

Random Fourier features on a Duffing oscillator

This example shows how random Fourier features (and randomly binned features) can be used as lifting functions to identify Duffing oscillator dynamics. For more details on how these features are generated, see [RR07].

"""Example of random Fourier features on a Duffing oscillator."""
import numpy as np
import scipy.stats
from matplotlib import pyplot as plt

import pykoop

plt.rc('lines', linewidth=2)
plt.rc('axes', grid=True)
plt.rc('grid', linestyle='--')


def example_random_fourier_features() -> None:
    """Demonstrate random Fourier featuers on a Duffing oscillator."""
    # Get example Duffing oscillator data
    eg = pykoop.example_data_duffing()

    # Create RFF pipeline
    kp_rff = pykoop.KoopmanPipeline(
        lifting_functions=[(
            'rff',
            pykoop.KernelApproxLiftingFn(
                kernel_approx=pykoop.RandomFourierKernelApprox(
                    n_components=100,
                    random_state=1234,
                )),
        )],
        regressor=pykoop.Edmd(),
    )

    # Create random binning pipeline for comparison
    kp_bin = pykoop.KoopmanPipeline(
        lifting_functions=[(
            'bin',
            pykoop.KernelApproxLiftingFn(
                kernel_approx=pykoop.RandomBinningKernelApprox(
                    n_components=10,
                    random_state=1234,
                )),
        )],
        regressor=pykoop.Edmd(),
    )

    for label, kp in [
        ('Random Fourier features', kp_rff),
        ('Randomly binned features', kp_bin),
    ]:
        # Fit the pipeline
        kp.fit(
            eg['X_train'],
            n_inputs=eg['n_inputs'],
            episode_feature=eg['episode_feature'],
        )

        # Get training and validation episodes
        ep_train = np.unique(eg['X_train'][:, 0])
        ep_valid = np.unique(eg['X_valid'][:, 0])

        # Predict new trajectories
        X_pred = kp.predict_trajectory(
            eg['x0_valid'],
            eg['u_valid'],
        )

        # Plot validation trajectories
        fig, ax = plt.subplots()
        tab20 = plt.cm.tab20(np.arange(0, 1, 0.05))
        color = [
            (tab20[1], tab20[0]),
            (tab20[3], tab20[2]),
            (tab20[5], tab20[4]),
        ]
        for (i, ep) in enumerate(ep_valid):
            idx = eg['X_valid'][:, 0] == ep
            cax_valid = ax.plot(
                eg['X_valid'][idx, 1],
                eg['X_valid'][idx, 2],
                color=color[i][0],
                label=f'True traj. {i}',
            )
            cax_pred = ax.plot(
                X_pred[idx, 1],
                X_pred[idx, 2],
                label=f'Pred. traj. {i}',
            )
        # Set legend
        ax.legend(loc='lower center', ncol=3)
        # Set labels
        ax.set_title(f'{label}: true and predicted trajectories')
        ax.set_xlabel('$x_1[k]$')
        ax.set_ylabel('$x_2[k]$')


if __name__ == '__main__':
    example_random_fourier_features()
    plt.show()

(Source code)

_images/7_example_rff_duffing_00.png

(png, hires.png, pdf)

_images/7_example_rff_duffing_01.png

(png, hires.png, pdf)