"""
Spatial archetype analysis functions.
Analyze spatial co-localization and neighborhood enrichment of archetypal
patterns using squidpy. Requires spatial transcriptomics data with tissue
coordinates in ``adata.obsm['spatial']`` and archetype assignments in
``adata.obs['archetypes']``.
Main Functions:
- spatial_neighbors(): Build spatial connectivity graph
- archetype_nhood_enrichment(): Test archetype co-localization via permutation
- archetype_co_occurrence(): Distance-dependent archetype co-occurrence
- archetype_spatial_autocorr(): Moran's I / Geary's C per archetype weight
- archetype_interaction_boundaries(): Detect cross-cell-type gradient fronts
Requires: ``pip install peach[spatial]`` (squidpy >= 1.3.0)
"""
from typing import Any
import numpy as np
from anndata import AnnData
def _check_squidpy():
"""Import squidpy with helpful error on failure."""
try:
import squidpy as sq
return sq
except ImportError:
raise ImportError(
"squidpy is required for spatial analysis. "
"Install it with: pip install peach[spatial] "
"or: pip install squidpy"
)
def _validate_spatial_data(adata: AnnData, spatial_key: str = "spatial", require_archetypes: bool = False):
"""Validate that adata has spatial coordinates and optionally archetype assignments."""
if spatial_key not in adata.obsm:
available = list(adata.obsm.keys())
raise ValueError(
f"No spatial coordinates found at adata.obsm['{spatial_key}']. "
f"Available keys: {available}. "
f"Spatial transcriptomics data must have 2D tissue coordinates."
)
coords = adata.obsm[spatial_key]
if coords.shape[1] < 2:
raise ValueError(f"Spatial coordinates must be at least 2D, got shape {coords.shape}")
if require_archetypes:
if "archetypes" not in adata.obs.columns:
raise ValueError(
"No archetype assignments found in adata.obs['archetypes']. "
"Run pc.tl.assign_archetypes(adata) first."
)
def _ensure_categorical(adata: AnnData, key: str):
"""Ensure an obs column is categorical (required by squidpy)."""
if not hasattr(adata.obs[key], "cat"):
adata.obs[key] = adata.obs[key].astype("category")
[docs]
def spatial_neighbors(
adata: AnnData,
*,
spatial_key: str = "spatial",
n_neighs: int = 30,
coord_type: str = "generic",
**kwargs: Any,
) -> None:
"""Build spatial neighbor graph from tissue coordinates.
Wrapper around ``squidpy.gr.spatial_neighbors()`` with PEACH-appropriate
defaults for bead-based and imaging-based spatial transcriptomics.
Parameters
----------
adata : AnnData
Annotated data with spatial coordinates in ``adata.obsm[spatial_key]``.
spatial_key : str, default: "spatial"
Key in ``adata.obsm`` containing 2D spatial coordinates.
n_neighs : int, default: 30
Number of nearest neighbors for the spatial graph.
coord_type : str, default: "generic"
Coordinate type. Use "generic" for Slide-seq/MERFISH (continuous coords)
or "grid" for Visium (hexagonal grid). See squidpy docs for details.
**kwargs
Additional arguments passed to ``squidpy.gr.spatial_neighbors()``.
Returns
-------
None
Modifies ``adata`` in place:
- ``adata.obsp['spatial_connectivities']``: sparse connectivity matrix
- ``adata.obsp['spatial_distances']``: sparse distance matrix
"""
sq = _check_squidpy()
_validate_spatial_data(adata, spatial_key=spatial_key)
n_cells = adata.n_obs
print(f" Building spatial neighbor graph: {n_cells} cells, {n_neighs} neighbors, coord_type='{coord_type}'")
sq.gr.spatial_neighbors(
adata,
spatial_key=spatial_key,
n_neighs=n_neighs,
coord_type=coord_type,
**kwargs,
)
print(f" Spatial graph stored in adata.obsp['spatial_connectivities'] and adata.obsp['spatial_distances']")
[docs]
def archetype_nhood_enrichment(
adata: AnnData,
*,
cluster_key: str = "archetypes",
n_perms: int = 1000,
seed: int = 42,
**kwargs: Any,
) -> dict[str, np.ndarray]:
"""Test spatial neighborhood enrichment between archetype groups.
For each pair of archetypes, tests whether cells of one archetype are
found more or less frequently in the spatial neighborhood of the other
archetype than expected by chance (permutation test).
Parameters
----------
adata : AnnData
Annotated data with spatial graph (run ``spatial_neighbors`` first)
and archetype assignments in ``adata.obs[cluster_key]``.
cluster_key : str, default: "archetypes"
Column in ``adata.obs`` with archetype labels.
n_perms : int, default: 1000
Number of permutations for significance testing.
seed : int, default: 42
Random seed for permutation reproducibility.
**kwargs
Additional arguments passed to ``squidpy.gr.nhood_enrichment()``.
Returns
-------
dict
Dictionary with 'zscore' and 'count' arrays [n_archetypes x n_archetypes].
Also stored in ``adata.uns['archetype_nhood_enrichment']``.
Examples
--------
>>> pc.tl.spatial_neighbors(adata)
>>> result = pc.tl.archetype_nhood_enrichment(adata)
>>> print(result['zscore']) # positive = enriched, negative = depleted
"""
sq = _check_squidpy()
if "spatial_connectivities" not in adata.obsp:
raise ValueError("No spatial graph found. Run pc.tl.spatial_neighbors(adata) first.")
if cluster_key not in adata.obs.columns:
raise ValueError(
f"'{cluster_key}' not found in adata.obs. "
f"Run pc.tl.assign_archetypes(adata) first."
)
_ensure_categorical(adata, cluster_key)
n_categories = adata.obs[cluster_key].nunique()
print(f" Computing neighborhood enrichment: {n_categories} archetype groups, {n_perms} permutations")
# Default n_jobs=1 to avoid macOS multiprocessing spawn issues.
# Users can override via kwargs if running on Linux or with proper guards.
kwargs.setdefault("n_jobs", 1)
sq.gr.nhood_enrichment(
adata,
cluster_key=cluster_key,
seed=seed,
n_perms=n_perms,
**kwargs,
)
# squidpy stores results in adata.uns[f'{cluster_key}_nhood_enrichment']
squidpy_key = f"{cluster_key}_nhood_enrichment"
result = adata.uns.get(squidpy_key, {})
# Also store under our standardized key
adata.uns["archetype_nhood_enrichment"] = result
if "zscore" in result:
zscore = result["zscore"]
print(f" Enrichment z-scores range: [{zscore.min():.2f}, {zscore.max():.2f}]")
return result
[docs]
def archetype_co_occurrence(
adata: AnnData,
*,
cluster_key: str = "archetypes",
spatial_key: str = "spatial",
interval: int = 50,
**kwargs: Any,
) -> dict[str, np.ndarray]:
"""Compute distance-dependent co-occurrence of archetype groups.
Measures how the co-occurrence ratio between archetype pairs varies
with spatial distance. Useful for identifying distance-dependent
spatial relationships (e.g., archetypes that co-occur at short but
not long distances).
Parameters
----------
adata : AnnData
Annotated data with spatial coordinates and archetype assignments.
cluster_key : str, default: "archetypes"
Column in ``adata.obs`` with archetype labels.
spatial_key : str, default: "spatial"
Key in ``adata.obsm`` with spatial coordinates.
interval : int, default: 50
Number of distance intervals to evaluate.
**kwargs
Additional arguments passed to ``squidpy.gr.co_occurrence()``.
Returns
-------
dict
Dictionary with 'occ' (co-occurrence ratios) and 'interval' (distance bins).
Also stored in ``adata.uns['archetype_co_occurrence']``.
Examples
--------
>>> pc.tl.spatial_neighbors(adata)
>>> result = pc.tl.archetype_co_occurrence(adata)
>>> print(result['occ'].shape) # [n_archetypes, n_archetypes, n_intervals]
"""
sq = _check_squidpy()
_validate_spatial_data(adata, spatial_key=spatial_key)
if cluster_key not in adata.obs.columns:
raise ValueError(
f"'{cluster_key}' not found in adata.obs. "
f"Run pc.tl.assign_archetypes(adata) first."
)
_ensure_categorical(adata, cluster_key)
n_categories = adata.obs[cluster_key].nunique()
print(f" Computing co-occurrence: {n_categories} archetype groups, {interval} distance intervals")
# Default n_jobs=1 to avoid macOS multiprocessing spawn issues.
kwargs.setdefault("n_jobs", 1)
sq.gr.co_occurrence(
adata,
cluster_key=cluster_key,
spatial_key=spatial_key,
interval=interval,
**kwargs,
)
# squidpy stores results in adata.uns[f'{cluster_key}_co_occurrence']
squidpy_key = f"{cluster_key}_co_occurrence"
result = adata.uns.get(squidpy_key, {})
# Also store under our standardized key
adata.uns["archetype_co_occurrence"] = result
if "occ" in result:
print(f" Co-occurrence computed: {result['occ'].shape}")
return result
[docs]
def archetype_spatial_autocorr(
adata: AnnData,
*,
weights_key: str = "cell_archetype_weights",
mode: str = "moran",
n_perms: int = 100,
n_jobs: int = 1,
**kwargs: Any,
):
"""Compute spatial autocorrelation (Moran's I or Geary's C) per archetype weight.
Tests whether each archetype weight is spatially smooth (positive
autocorrelation), spatially random, or spatially checkered (negative
autocorrelation). Moran's I > 0 indicates spatial clustering of that
archetype; I ~ 0 indicates random distribution; I < 0 indicates
dispersal.
Parameters
----------
adata : AnnData
Annotated data with spatial graph (run ``spatial_neighbors`` first)
and archetype weights in ``adata.obsm[weights_key]``.
weights_key : str, default: "archetype_weights"
Key in ``adata.obsm`` with archetype weight matrix [n_cells, n_archetypes].
mode : str, default: "moran"
Autocorrelation statistic: "moran" (Moran's I) or "geary" (Geary's C).
n_perms : int, default: 100
Number of permutations for p-value estimation. Set to None for
analytical p-values only.
n_jobs : int, default: 1
Number of parallel jobs. Default 1 for macOS compatibility.
**kwargs
Additional arguments passed to ``squidpy.gr.spatial_autocorr()``.
Returns
-------
pandas.DataFrame
DataFrame with autocorrelation statistics per archetype weight.
Also stored in ``adata.uns['archetype_spatial_autocorr']``.
Examples
--------
>>> pc.tl.spatial_neighbors(adata)
>>> autocorr = pc.tl.archetype_spatial_autocorr(adata)
>>> print(autocorr) # Moran's I per archetype
"""
import pandas as pd
sq = _check_squidpy()
if "spatial_connectivities" not in adata.obsp:
raise ValueError("No spatial graph found. Run pc.tl.spatial_neighbors(adata) first.")
if weights_key not in adata.obsm:
raise ValueError(
f"No archetype weights found at adata.obsm['{weights_key}']. "
f"Run pc.tl.extract_archetype_weights(adata) first."
)
W = adata.obsm[weights_key]
n_arch = W.shape[1]
print(f" Computing spatial autocorrelation ({mode}): {n_arch} archetype weights")
# Temporarily store archetype weights as obs columns for squidpy
arch_cols = []
for i in range(n_arch):
col = f"_arch_weight_{i}"
adata.obs[col] = W[:, i]
arch_cols.append(col)
try:
sq.gr.spatial_autocorr(
adata,
mode=mode,
attr="obs",
genes=arch_cols,
n_perms=n_perms,
n_jobs=n_jobs,
**kwargs,
)
# squidpy stores results in adata.uns['moranI'] or adata.uns['gearyC']
result_key = "moranI" if mode == "moran" else "gearyC"
if result_key in adata.uns:
result_df = adata.uns[result_key].loc[arch_cols].copy()
# Rename index from _arch_weight_0 to archetype_0
result_df.index = [f"archetype_{i}" for i in range(n_arch)]
adata.uns["archetype_spatial_autocorr"] = result_df
stat_col = "I" if mode == "moran" else "C"
print(f"\n Spatial autocorrelation results ({mode}):")
for idx, row in result_df.iterrows():
val = row[stat_col]
pval = row.get("pval_norm", row.get("pval_z_sim", np.nan))
sig = "*" if pval < 0.05 else " "
print(f" {idx:20s}: {stat_col}={val:.4f} p={pval:.2e} {sig}")
return result_df
finally:
# Clean up temporary columns
adata.obs.drop(columns=arch_cols, inplace=True, errors="ignore")
[docs]
def archetype_interaction_boundaries(
adata: AnnData,
*,
cell_type_col: str = "Cell_Type",
weights_key: str = "cell_archetype_weights",
cell_type_a: str | None = None,
cell_type_b: str | None = None,
) -> dict:
"""Detect spatial fronts where archetype weight mixtures diverge between cell types.
Uses the existing spatial neighbor graph (from ``pc.tl.spatial_neighbors()``)
and continuous archetype weights (from ``pc.tl.extract_archetype_weights()``)
to identify tissue regions where two cell types run different archetype
programs in the same neighborhood.
For each cell, computes the **mean archetype weight vector** of its
spatial neighbors that belong to cell type A and cell type B separately,
then measures divergence (Jensen-Shannon) between these two distributions.
High-scoring cells sit at spatial fronts where the local archetype
program of one cell type is shifting in a different direction from the
other — e.g., macrophages trending toward inflammatory archetypes while
neighboring fibroblasts trend toward fibrotic archetypes.
Parameters
----------
adata : AnnData
Annotated data with:
- Spatial neighbor graph in ``adata.obsp['spatial_connectivities']``
(from ``pc.tl.spatial_neighbors()``)
- Archetype weights in ``adata.obsm[weights_key]``
(from ``pc.tl.extract_archetype_weights()``)
- Cell type labels in ``adata.obs[cell_type_col]``
cell_type_col : str, default: "Cell_Type"
Column in ``adata.obs`` with cell type labels.
weights_key : str, default: "archetype_weights"
Key in ``adata.obsm`` with archetype weight matrix
[n_cells, n_archetypes]. Each row sums to 1.
cell_type_a : str | None, default: None
First cell type for pairwise comparison. If None, uses the
two most abundant cell types.
cell_type_b : str | None, default: None
Second cell type.
Returns
-------
dict
Dictionary with:
- ``'boundary_scores'``: np.ndarray [n_cells] — per-cell JSD boundary score
- ``'mean_weights_a'``: np.ndarray [n_cells, n_archetypes] — mean weight
vector of type-A neighbors per cell
- ``'mean_weights_b'``: np.ndarray [n_cells, n_archetypes] — mean weight
vector of type-B neighbors per cell
- ``'cross_correlations'``: pd.DataFrame — per-archetype Spearman r
between cell types across space
- ``'cell_type_a'``, ``'cell_type_b'``: str — cell type names
- ``'n_archetypes'``: int — number of archetype dimensions
Also stored in ``adata.uns['archetype_interaction_boundaries']``
and boundary scores in ``adata.obs['boundary_score']``.
Examples
--------
>>> pc.tl.spatial_neighbors(adata, n_neighs=30)
>>> weights = pc.tl.extract_archetype_weights(adata)
>>> result = pc.tl.archetype_interaction_boundaries(
... adata, cell_type_col="Cell_Type",
... cell_type_a="Myeloid", cell_type_b="Treg",
... )
>>> pc.pl.interaction_boundaries(adata)
"""
import pandas as pd
import scipy.sparse as sp
# --- validate inputs ---
if "spatial_connectivities" not in adata.obsp:
raise ValueError(
"No spatial neighbor graph found. "
"Run pc.tl.spatial_neighbors(adata) first."
)
if weights_key not in adata.obsm:
raise ValueError(
f"No archetype weights at adata.obsm['{weights_key}']. "
f"Run pc.tl.extract_archetype_weights(adata) first."
)
if cell_type_col not in adata.obs.columns:
raise ValueError(f"'{cell_type_col}' not found in adata.obs.")
W = adata.obsm[weights_key] # [n_cells, n_arch], rows sum to 1
S = adata.obsp["spatial_connectivities"] # [n_cells, n_cells], sparse
n_cells, n_arch = W.shape
# --- pick cell types ---
if cell_type_a is None or cell_type_b is None:
top2 = adata.obs[cell_type_col].value_counts().head(2).index.tolist()
cell_type_a = cell_type_a or top2[0]
cell_type_b = cell_type_b or top2[1]
mask_a = (adata.obs[cell_type_col] == cell_type_a).values
mask_b = (adata.obs[cell_type_col] == cell_type_b).values
n_a, n_b = mask_a.sum(), mask_b.sum()
if n_a == 0 or n_b == 0:
raise ValueError(
f"Cell types not found: {cell_type_a} ({n_a}), {cell_type_b} ({n_b})"
)
print(f" Interaction boundaries: {cell_type_a} ({n_a}) vs {cell_type_b} ({n_b})")
print(f" Using {n_arch} archetype weight dimensions, {S.nnz // n_cells} avg neighbors")
# --- vectorized mean-weight computation ---
# S_a[i,j] = S[i,j] if cell j is type A, else 0
# mean_w_a[i] = mean archetype weight vector of cell i's type-A neighbors
diag_a = sp.diags(mask_a.astype(float))
diag_b = sp.diags(mask_b.astype(float))
S_a = S @ diag_a # connectivity masked to type-A columns
S_b = S @ diag_b
count_a = np.asarray(S_a.sum(axis=1)).ravel() # n type-A neighbors per cell
count_b = np.asarray(S_b.sum(axis=1)).ravel()
# Sum of neighbor weights, then divide by count for mean
sum_w_a = np.asarray(S_a @ W) # dense [n_cells, n_arch]
sum_w_b = np.asarray(S_b @ W)
safe_a = np.where(count_a > 0, count_a, 1.0)
safe_b = np.where(count_b > 0, count_b, 1.0)
mean_w_a = sum_w_a / safe_a[:, np.newaxis]
mean_w_b = sum_w_b / safe_b[:, np.newaxis]
# Zero out where no neighbors of that type exist
mean_w_a[count_a == 0] = 0.0
mean_w_b[count_b == 0] = 0.0
# --- boundary score: Jensen-Shannon divergence ---
# JSD(P, Q) = sqrt(0.5 * KL(P||M) + 0.5 * KL(Q||M)), M = (P+Q)/2
# Vectorized over all cells at once
both_present = (count_a > 0) & (count_b > 0)
boundary_scores = np.zeros(n_cells)
if both_present.any():
P = mean_w_a[both_present]
Q = mean_w_b[both_present]
M = 0.5 * (P + Q)
eps = 1e-12
kl_pm = np.sum(np.where(P > eps, P * np.log(P / np.maximum(M, eps)), 0.0), axis=1)
kl_qm = np.sum(np.where(Q > eps, Q * np.log(Q / np.maximum(M, eps)), 0.0), axis=1)
jsd = np.sqrt(np.maximum(0.5 * kl_pm + 0.5 * kl_qm, 0.0))
boundary_scores[both_present] = jsd
adata.obs["boundary_score"] = boundary_scores
# --- cross-correlations: per archetype dimension ---
n_both = both_present.sum()
cross_corr_data = []
if n_both > 50:
from scipy.stats import spearmanr
for k in range(n_arch):
r, p = spearmanr(mean_w_a[both_present, k], mean_w_b[both_present, k])
cross_corr_data.append({
"archetype": f"archetype_{k}",
"spearman_r": r,
"pvalue": p,
"n_cells": int(n_both),
"mean_weight_a": float(mean_w_a[both_present, k].mean()),
"mean_weight_b": float(mean_w_b[both_present, k].mean()),
})
cross_corr_df = pd.DataFrame(cross_corr_data)
# --- report ---
print(f" Cells with both types in neighborhood: {n_both}")
bs_active = boundary_scores[both_present]
if len(bs_active) > 0:
print(f" Boundary score: mean={bs_active.mean():.4f}, "
f"median={np.median(bs_active):.4f}, max={bs_active.max():.4f}")
if len(cross_corr_data) > 0:
print(f"\n Per-archetype cross-correlations ({cell_type_a} vs {cell_type_b}):")
for _, row in cross_corr_df.iterrows():
sig = "*" if row["pvalue"] < 0.05 else " "
direction = "co-localize" if row["spearman_r"] > 0 else "anti-corr"
print(f" {row['archetype']:15s}: r={row['spearman_r']:+.3f} "
f"p={row['pvalue']:.2e} {sig} ({direction})")
result = {
"boundary_scores": boundary_scores,
"mean_weights_a": mean_w_a,
"mean_weights_b": mean_w_b,
"cross_correlations": cross_corr_df,
"cell_type_a": cell_type_a,
"cell_type_b": cell_type_b,
"n_archetypes": n_arch,
}
adata.uns["archetype_interaction_boundaries"] = result
return result