Source code for pygot.tools.traj.flow

import torch
from tqdm import tqdm
from torchdiffeq import odeint
import numpy as np
from .model_training import ODEwrapper, ODEwrapperNoTime
from anndata.experimental.pytorch import AnnLoader
from functools import partial
from tqdm import tqdm

def _get_minibatch_jacobian(y, x, return_np=True):
    """Computes the Jacobian of y wrt x assuming minibatch-mode.

    Args:
      y: (N, ...) with a total of D_y elements in ...
      x: (N, ...) with a total of D_x elements in ...
    Returns:
      The minibatch Jacobian matrix of shape (N, D_y, D_x)
    """
    assert y.shape[0] == x.shape[0]
    y = y.view(y.shape[0], -1)

    # Compute Jacobian row by row.
    jac = []
    for j in range(y.shape[1]):
        dy_j_dx = torch.autograd.grad(y[:, j], x, torch.ones_like(y[:, j]), retain_graph=True,
                                      create_graph=True)[0].view(x.shape[0], -1)
        jac.append(torch.unsqueeze(dy_j_dx, 1))
    jac = torch.cat(jac, 1)
    if return_np:
        return jac.detach().cpu().numpy()
    else:
        return jac



def latent2gene_velocity_scVI(ad,  embedding_key, velocity_key, vae, batch_size=64):
    use_cuda = vae.device.type != 'cpu'
    
    dataloader = AnnLoader(ad, batch_size=batch_size, shuffle=False, use_cuda=use_cuda)
    velocity_list = []
    
    for batch in tqdm(dataloader):
        z=batch.obsm[embedding_key]
        z.requires_grad = True
        px = vae.module.generative(z=z, library=torch.ones(z.shape[0], 1), batch_index=torch.tensor(batch.obs['_scvi_batch']).long()[:,None])['px']
        
        jac_xz = _get_minibatch_jacobian(px.mean, z, return_np=False)
        with torch.no_grad():
            velocity_batch = torch.matmul(jac_xz, batch.obsm[velocity_key].unsqueeze(-1)).squeeze(-1).cpu().numpy()
            velocity_list.append(velocity_batch)
            
        torch.cuda.empty_cache()
        
    return np.concatenate(velocity_list, axis=0)

[docs] def latent_velocity(adata, odefunc, embedding_key='X_pca', time_key=None): """Latent velocity inference using trained model. Arguments: --------- adata: :class:`~anndata.AnnData` Annotated data matrix. odefunc: class:`ODEwrapper` or class:`ODEwrapperNoTime` trained NeuralODE model by function `fit_velocity_model` embedding_key: `str` Name of latent space, in adata.obsm time_key: `str` (default: None) Name of time label, in adata.obs, use if the model input contains time label Returns ------- latent_velocity (.obsm): :class`np.ndarray` latent velocity array, (n_cells, latent_dim), store in adata.obsm[`velocity_key`] """ if isinstance(odefunc, ODEwrapperNoTime) or isinstance(odefunc, ODEwrapper): odefunc = odefunc.func assert (not odefunc.time_varying) or (odefunc.time_varying and (not time_key is None)), 'please offer `time_key`' xt = torch.Tensor(adata.obsm[embedding_key]) if odefunc.time_varying: t = adata.obs[time_key] t = torch.Tensor(t)[:,None] vt = odefunc(torch.concat([xt, t], dim=-1).to(next(odefunc.parameters()).device)) else: vt = odefunc(xt.to(next(odefunc.parameters()).device)) return vt.detach().cpu().numpy()
[docs] def latent2gene_velocity(adata, velocity_key, embedding_key, A=None, dr_mode='linear', inverse_transform=None, dt=0.001): """Transform latent velocity into gene velocity Due to linearity and orthogonality, the gene velocity can be recover by directly multiply inverse dimension reduction matrix, .. math:: v(x) ≈ Av(z), \quad z = A^Tx For non-linear transformation, .. math:: v(x) ≈ \\frac{g^{-1}(z + v(z) * dt) - g^{-1}(z)}{dt}, \quad z=g(x) Arguments: --------- adata: :class:`~anndata.AnnData` Annotated data matrix. embedding_key: `str` Name of latent space, in adata.obsm velocity_key: `str` (default: None) Name of latent velocity, in adata.obsm A: `np.ndarray` (default: None (using adata.varm['PCs'])) Inverse matrix of linear dimension reduction dr_mode: 'linear' or 'nonlinear' (default: 'linear') Dimension reduction mode inverse_transform: `function` (default: None) Inverse function for non-linear dimension reduction (e.g. :math:`g^{-1}`) dt: `float` (default: 0.001) Parameter of non-linear velocity transformation from latent space to gene space Returns ------- velocity (.layers): :class`np.ndarray` gene velocity array, (n_cells, n_genes) """ if dr_mode not in {"linear", "nonlinear"}: raise ValueError(f"Dimension reduction mode must be 'linear' or 'nonlinear', was '{dr_mode}'.") with torch.no_grad(): v_latent = adata.obsm[velocity_key] if dr_mode == 'linear': v_gene = v_latent @ A[:v_latent.shape[1]] elif dr_mode == 'nonlinear' and inverse_transform is not None: x0 = inverse_transform(adata.obsm[embedding_key]) x1 = inverse_transform(adata.obsm[embedding_key] + v_latent * dt) v_gene = (x1 - x0) / dt else: raise NotImplementedError() return v_gene
[docs] def velocity(adata, odefunc, embedding_key='X_pca', velocity_key=None, A=None, time_key=None, dr_mode='linear', inverse_transform=None, dt=.001): """Velocity inference using trained model. This function will infer velocity in latent space and gene space both. It can be sperate into `latent_velocity` and `latent2gene_velocity` Example: ---------- For linear dimension reduction space:: #using pca space as example #if pca is done in scanpy framework, the inverse matrix A will be store in adata.varm['PCs'], that do not need to specifed matrix A pygot.tl.traj.velocity(adate, model, embedding_key='X_pca', velocity_key='velocity_pca') #Otherwise, need to specify dimension reduction matrix A pygot.tl.traj.velocity(adate, model, embedding_key='X_pca', velocity_key='velocity_pca', A=pca.components_.T) For non-linear dimension reduction space:: #using vae latent space as example #first, train the vae model to transform space gs_vae = pygot.pp.GS_VAE() gs_vae.register_model(adata, latent_dim=10) adata.obsm['X_latent'] = gs_vae.fit_transform(adata) #After train velocity model using `fit_velocity_model` or `fit_velocity_model_without_time` pygot.tl.traj.velocity(adate, model, embedding_key='X_latent', velocity_key='velocity_latent', dr_mode='nonlinear', inverse_transform=gs_vae.inverse_transform ) Arguments: --------- adata: :class:`~anndata.AnnData` Annotated data matrix. odefunc: class:`ODEwrapper` or class:`ODEwrapperNoTime` trained NeuralODE model by function `fit_velocity_model` embedding_key: `str` Name of latent space, in adata.obsm velocity_key: `str` (default: None) Name of latent velocity to save, in adata.obsm A: `np.ndarray` (default: None (using adata.varm['PCs'])) Inverse matrix of linear dimension reduction time_key: `str` (default: None) Name of time label, in adata.obs, use if the model input contains time label dr_mode: 'linear' or 'nonlinear' (default: 'linear') Dimension reduction mode inverse_transform: `function` (default: None) Inverse function for non-linear dimension reduction dt: `float` (default: 0.001) Parameter of non-linear velocity transformation from latent space to gene space Returns ------- velocity (.layers): :class`np.ndarray` gene velocity array, (n_cells, n_genes) latent_velocity (.obsm): :class`np.ndarray` latent velocity array, (n_cells, latent_dim), store in adata.obsm[`velocity_key`] """ if isinstance(odefunc, ODEwrapperNoTime) or isinstance(odefunc, ODEwrapper): odefunc = odefunc.func assert (not odefunc.time_varying) or (odefunc.time_varying and (not time_key is None)), 'please offer `time_key`' if embedding_key == 'X_pca' and dr_mode == 'linear' and A is None: A = adata.varm['PCs'].T if velocity_key is None: velocity_key = 'velocity_'+embedding_key.split('_')[-1] v_latent = latent_velocity(adata, odefunc, embedding_key, time_key) adata.obsm[velocity_key] = v_latent v_gene = latent2gene_velocity(adata, velocity_key,embedding_key, A, dr_mode, inverse_transform, dt) adata.obsm['velocity'] = v_gene return v_gene
[docs] def simulate_trajectory(adata, odefunc, embedding_key, start, end, n_points=100): """Simulate trajecotry using trained NeuralODE model. Arguments: --------- adata: :class:`~anndata.AnnData` Annotated data matrix. odefunc: class:`~ODEwrapper` or class:`~ODEwrapperNoTime` trained NeuralODE model by function `fit_velocity_model` embedding_key: `str' Name of latent space to fit, in adata.obsm start: `int` Start time of simulation end: `int` End time of simulation n_points: `int` Number of points in one trajectory (e.g. 100 points from day0 -> day7) Returns ------- trajectory: :class`~np.ndarray` Trajectory array, (n_points, n_cells, latent_dim) """ latent_embedding = adata.obsm[embedding_key] with torch.no_grad(): return odeint( odefunc, torch.tensor(latent_embedding).float().to(next(odefunc.parameters()).device), torch.tensor(np.linspace(start, end, n_points)).to(next(odefunc.parameters()).device), ).detach().cpu().numpy()
@torch.no_grad() def _inverse_transform_scVI(z, vae, ad, batch_size=1024): use_cuda = vae.device.type != 'cpu' ad.obsm['X_lowd'] = z dataloader = AnnLoader(ad, batch_size=batch_size, shuffle=False, use_cuda=use_cuda) scaled_X = [] for batch in dataloader: z=batch.obsm['X_lowd'] px = vae.module.generative(z=z, library=torch.ones(z.shape[0], 1), batch_index=torch.tensor(batch.obs['_scvi_batch']).long()[:,None])['px'] scaled_X.append(px.mean.cpu().numpy()) return np.concatenate(scaled_X, axis=0) def get_inverse_transform_func_scVI(adata, vae, batch_size=1024): inverse_transform = partial(_inverse_transform_scVI, vae=vae, ad=adata, batch_size=batch_size) return inverse_transform