Deployment metrics (rpx_benchmark.deployment)

ESD-weighted phase scoring, State-Transition Robustness, Temporal Stability, and Stack-Level Geometric Coherence. These are the second-tier metrics that go into the DeploymentReadinessReport beyond the raw per-sample task metrics.

deployment

Deployment-readiness metrics for RPX: TS, STR, SGC, ESD, weighted scoring.

These are the core novel contributions of the RPX benchmark paper.

Four metrics characterise how well a model generalises under deployment conditions:

TS  — Temporal Stability: pose-compensated frame-to-frame consistency
STR — State-Transition Robustness: performance drop / recovery across scene phases
SGC — Stack-Level Geometric Coherence: mask–depth boundary alignment
ESD — Effort-Stratified Difficulty: per-difficulty breakdown (Easy/Medium/Hard)

Plus the weighted phase scoring scheme:

    S_p       = 0.25·M(p,Easy) + 0.35·M(p,Med) + 0.40·M(p,Hard)
    S_overall = (S_clutter + S_interaction + S_clean) / 3
    Δ_int     = S_interaction − S_clutter   (interaction drop)
    Δ_rec     = S_clean − S_interaction     (recovery)
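Spelled out with hypothetical metric values, the scheme is plain arithmetic (the numbers below are illustrative, not results from the paper):

```python
# Hypothetical per-(phase, difficulty) metric values; higher is better.
M = {
    "clutter":     {"easy": 0.82, "medium": 0.74, "hard": 0.61},
    "interaction": {"easy": 0.78, "medium": 0.66, "hard": 0.49},
    "clean":       {"easy": 0.84, "medium": 0.77, "hard": 0.65},
}

def phase_score(cells):
    # S_p = 0.25·Easy + 0.35·Medium + 0.40·Hard
    return 0.25 * cells["easy"] + 0.35 * cells["medium"] + 0.40 * cells["hard"]

s_c = phase_score(M["clutter"])      # 0.708
s_i = phase_score(M["interaction"])  # 0.622
s_l = phase_score(M["clean"])        # 0.7395

s_overall = (s_c + s_i + s_l) / 3
delta_int = s_i - s_c  # interaction drop: -0.086 (negative = degrades)
delta_rec = s_l - s_i  # recovery: +0.1175 (positive = recovers)
```

Hard-phase performance dominates each phase score (weight 0.40), so a model that only does well on Easy samples cannot hide behind the average.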

TemporalStabilityResult(ts_score: float, num_pairs: int, per_pair: List[float] = list()) dataclass

TS score per task type.

TS_seg   = E[IoU(P_t, warp(P_{t+1}, ΔT))]              (segmentation)
TS_depth = E[||D_t − warp(D_{t+1}, ΔT)||_1 / N_valid]  (depth)

Warping uses the relative SE(3) pose from the T265 ground-truth track. When exact warping is not feasible, a consistency proxy (unchanged-pixel fraction) is used as a lower bound.

StateTransitionRobustnessResult(str_c_to_i: float, str_i_to_l: float, metric_clutter: float, metric_interaction: float, metric_clean: float) dataclass

STR captures performance change across phase boundaries.

STR_{C→I} = M(interaction) − M(clutter)  ← interaction drop (negative = worse)
STR_{I→L} = M(clean) − M(interaction)    ← recovery (positive = better)

StackGeometricCoherenceResult(sgc_score: float, precision: float, recall: float, num_samples: int) dataclass

SGC measures mask–depth boundary alignment.

SGC = F-score(boundary(mask), boundary(depth_gradient > τ))

Boundary pixels are extracted via Sobel gradient magnitude thresholding.

ESDResult(easy: float | None, medium: float | None, hard: float | None, metric_key: str) dataclass

Per-difficulty metric breakdown (Effort-Stratified Difficulty).

weighted_score() -> float

S_p = 0.25·Easy + 0.35·Medium + 0.40·Hard.

Source code in rpx_benchmark/deployment.py
def weighted_score(self) -> float:
    """S_p = 0.25·Easy + 0.35·Medium + 0.40·Hard."""
    total, weight = 0.0, 0.0
    for diff, w in ESD_WEIGHTS.items():
        val = getattr(self, diff.value)
        if val is not None:
            total += w * val
            weight += w
    return total / weight if weight > 0 else 0.0
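A notable detail of `weighted_score` is that the weights renormalise when a difficulty bucket has no samples. A minimal stand-alone sketch of that behaviour (plain functions standing in for the actual dataclass and `ESD_WEIGHTS`):

```python
# Stand-in weights mirroring ESD_WEIGHTS (Easy/Medium/Hard).
WEIGHTS = {"easy": 0.25, "medium": 0.35, "hard": 0.40}

def weighted_score(values):
    """values: dict of difficulty -> metric, with None for empty buckets."""
    total, weight = 0.0, 0.0
    for diff, w in WEIGHTS.items():
        if values[diff] is not None:
            total += w * values[diff]
            weight += w
    return total / weight if weight > 0 else 0.0

# All three buckets present: plain weighted sum.
full = weighted_score({"easy": 0.8, "medium": 0.7, "hard": 0.6})
# 0.25*0.8 + 0.35*0.7 + 0.40*0.6 = 0.685

# Hard bucket empty: weights renormalise over Easy+Medium (0.60).
partial = weighted_score({"easy": 0.8, "medium": 0.7, "hard": None})
# (0.25*0.8 + 0.35*0.7) / 0.60 ≈ 0.742
```

Renormalising, rather than treating a missing bucket as zero, keeps scores comparable across models evaluated on slightly different sample subsets.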

WeightedPhaseScore(clutter: ESDResult, interaction: ESDResult, clean: ESDResult) dataclass

Full deployment-readiness scoring table.

Per phase:        S_p = 0.25·M(p,Easy) + 0.35·M(p,Medium) + 0.40·M(p,Hard)
Overall:          S_overall = (S_C + S_I + S_L) / 3
Interaction drop: Δ_int = S_I − S_C
Recovery:         Δ_rec = S_L − S_I

delta_int: float property

Interaction drop: S_I − S_C (negative = model degrades on interaction).

delta_rec: float property

Recovery: S_L − S_I (positive = model recovers after interaction).

DeploymentReadinessReport(task: str, model_name: str, weighted_phase_score: WeightedPhaseScore | None = None, temporal_stability: TemporalStabilityResult | None = None, state_transition: StateTransitionRobustnessResult | None = None, geometric_coherence: StackGeometricCoherenceResult | None = None, params_m: float | None = None, flops_g: float | None = None, actmem_gb_fp16: float | None = None, latency_ms_per_sample: float | None = None) dataclass

Aggregated deployment-readiness report for a model on a task.

compute_esd(per_sample_metrics: List[Dict[str, float]], per_sample_difficulties: List[Difficulty | None], metric_key: str) -> ESDResult

Compute per-difficulty metric averages from per-sample results.

Args:
    per_sample_metrics: list of metric dicts, one per sample.
    per_sample_difficulties: difficulty label per sample (may be None).
    metric_key: which metric key to stratify (e.g. "absrel", "miou").

Returns:
    ESDResult with easy/medium/hard averages.

Source code in rpx_benchmark/deployment.py
def compute_esd(
    per_sample_metrics: List[Dict[str, float]],
    per_sample_difficulties: List[Difficulty | None],
    metric_key: str,
) -> ESDResult:
    """Compute per-difficulty metric averages from per-sample results.

    Args:
        per_sample_metrics: list of metric dicts, one per sample.
        per_sample_difficulties: difficulty label per sample (may be None).
        metric_key: which metric key to stratify (e.g. "absrel", "miou").

    Returns:
        ESDResult with easy/medium/hard averages.
    """
    buckets: Dict[Difficulty, List[float]] = {d: [] for d in Difficulty}

    for metrics, diff in zip(per_sample_metrics, per_sample_difficulties):
        if diff is None or metric_key not in metrics:
            continue
        buckets[diff].append(metrics[metric_key])

    def mean_or_none(vals: List[float]) -> float | None:
        return float(np.mean(vals)) if vals else None

    return ESDResult(
        easy=mean_or_none(buckets[Difficulty.EASY]),
        medium=mean_or_none(buckets[Difficulty.MEDIUM]),
        hard=mean_or_none(buckets[Difficulty.HARD]),
        metric_key=metric_key,
    )

compute_weighted_phase_score(per_sample_metrics: List[Dict[str, float]], per_sample_phases: List[Phase | None], per_sample_difficulties: List[Difficulty | None], metric_key: str) -> WeightedPhaseScore

Compute the full weighted phase scoring table.

Groups samples by (phase, difficulty) and computes:

    S_p = 0.25·M(p,Easy) + 0.35·M(p,Medium) + 0.40·M(p,Hard)

for each phase, then the overall score and transition deltas.

Source code in rpx_benchmark/deployment.py
def compute_weighted_phase_score(
    per_sample_metrics: List[Dict[str, float]],
    per_sample_phases: List[Phase | None],
    per_sample_difficulties: List[Difficulty | None],
    metric_key: str,
) -> WeightedPhaseScore:
    """Compute the full weighted phase scoring table.

    Groups samples by (phase, difficulty) and computes:
        S_p = 0.25·M(p,Easy) + 0.35·M(p,Medium) + 0.40·M(p,Hard)
    for each phase, then overall score and transition deltas.
    """
    phase_sample_metrics: Dict[Phase, Tuple[List, List]] = {
        p: ([], []) for p in Phase
    }

    for m, ph, diff in zip(per_sample_metrics, per_sample_phases, per_sample_difficulties):
        if ph is None:
            continue
        phase_sample_metrics[ph][0].append(m)
        phase_sample_metrics[ph][1].append(diff)

    def esd_for_phase(ph: Phase) -> ESDResult:
        metrics_list, diff_list = phase_sample_metrics[ph]
        return compute_esd(metrics_list, diff_list, metric_key)

    return WeightedPhaseScore(
        clutter=esd_for_phase(Phase.CLUTTER),
        interaction=esd_for_phase(Phase.INTERACTION),
        clean=esd_for_phase(Phase.CLEAN),
    )

compute_str(phase_scores: Dict[Phase, float]) -> StateTransitionRobustnessResult

Compute STR from per-phase aggregated scores.

Args:
    phase_scores: dict mapping Phase → scalar metric value.

Source code in rpx_benchmark/deployment.py
def compute_str(
    phase_scores: Dict[Phase, float],
) -> StateTransitionRobustnessResult:
    """Compute STR from per-phase aggregated scores.

    Args:
        phase_scores: dict mapping Phase → scalar metric value.
    """
    m_c = phase_scores.get(Phase.CLUTTER, 0.0)
    m_i = phase_scores.get(Phase.INTERACTION, 0.0)
    m_l = phase_scores.get(Phase.CLEAN, 0.0)
    return StateTransitionRobustnessResult(
        str_c_to_i=m_i - m_c,
        str_i_to_l=m_l - m_i,
        metric_clutter=m_c,
        metric_interaction=m_i,
        metric_clean=m_l,
    )

compute_temporal_stability_seg(pred_masks: Sequence[np.ndarray], camera_poses: Sequence[np.ndarray | None]) -> TemporalStabilityResult

TS_seg = E[IoU(P_t, warp(P_{t+1}, ΔT))].

When T265 pose data is available, we use the relative rotation to compensate for camera motion before computing IoU between adjacent frames. Without pixel-accurate warping (which requires depth for backprojection), we apply a simplified affine proxy using the in-plane rotation component only.

This gives a conservative lower-bound TS_seg that is still a meaningful stability signal when scenes have modest depth variation.

Args:
    pred_masks: sequence of predicted segmentation masks (H×W int).
    camera_poses: per-frame 4×4 SE(3) matrices (camera-to-world), or None.

Returns:
    TemporalStabilityResult.

Source code in rpx_benchmark/deployment.py
def compute_temporal_stability_seg(
    pred_masks: Sequence[np.ndarray],
    camera_poses: Sequence[np.ndarray | None],
) -> TemporalStabilityResult:
    """TS_seg = E[IoU(P_t, warp(P_{t+1}, ΔT))].

    When T265 pose data is available, we use the relative rotation to compensate
    for camera motion before computing IoU between adjacent frames.  Without
    pixel-accurate warping (which requires depth for backprojection), we apply
    a simplified affine proxy using the in-plane rotation component only.

    This gives a conservative lower-bound TS_seg that is still a meaningful
    stability signal when scenes have modest depth variation.

    Args:
        pred_masks: sequence of predicted segmentation masks (H×W int).
        camera_poses: per-frame 4×4 SE(3) matrices (camera-to-world), or None.

    Returns:
        TemporalStabilityResult.
    """
    if len(pred_masks) < 2:
        return TemporalStabilityResult(ts_score=1.0, num_pairs=0)

    per_pair = []
    for t in range(len(pred_masks) - 1):
        m_t = np.asarray(pred_masks[t], dtype=np.int32)
        m_t1 = np.asarray(pred_masks[t + 1], dtype=np.int32)

        # Attempt pose-compensated warp if poses are available
        if camera_poses[t] is not None and camera_poses[t + 1] is not None:
            m_t1_warped = _warp_mask_approx(m_t1, camera_poses[t], camera_poses[t + 1])
        else:
            m_t1_warped = m_t1

        # Per-class IoU then mean
        classes = np.unique(np.concatenate([m_t.flatten(), m_t1_warped.flatten()]))
        classes = classes[classes >= 0]
        if len(classes) == 0:
            per_pair.append(1.0)
            continue
        ious = []
        for c in classes:
            inter = float(((m_t == c) & (m_t1_warped == c)).sum())
            union = float(((m_t == c) | (m_t1_warped == c)).sum())
            ious.append(inter / union if union > 0 else 1.0)
        per_pair.append(float(np.mean(ious)))

    return TemporalStabilityResult(
        ts_score=float(np.mean(per_pair)),
        num_pairs=len(per_pair),
        per_pair=per_pair,
    )
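The helper `_warp_mask_approx` is referenced above but its source is not shown. One way the in-plane rotation proxy could look — this is a hypothetical sketch, not the actual rpx_benchmark implementation — is a nearest-neighbour rotation about the image centre by the roll component of the relative pose:

```python
import numpy as np

def warp_mask_inplane(mask: np.ndarray, pose_t: np.ndarray,
                      pose_t1: np.ndarray) -> np.ndarray:
    """Hypothetical stand-in for _warp_mask_approx: rotate the t+1 mask by
    the in-plane component of the relative camera rotation."""
    # Relative rotation taking camera-(t+1) coordinates into camera-t.
    r_rel = pose_t[:3, :3].T @ pose_t1[:3, :3]
    # In-plane (roll, about the optical axis) angle from the top-left 2x2 block.
    theta = np.arctan2(r_rel[1, 0], r_rel[0, 0])

    h, w = mask.shape
    cy, cx = (h - 1) / 2.0, (w - 1) / 2.0
    ys, xs = np.mgrid[0:h, 0:w]

    # Inverse mapping: for each output pixel, sample the source mask at the
    # coordinates rotated back by theta (nearest neighbour keeps labels int).
    c, s = np.cos(theta), np.sin(theta)
    src_x = c * (xs - cx) + s * (ys - cy) + cx
    src_y = -s * (xs - cx) + c * (ys - cy) + cy

    out = mask[np.clip(np.round(src_y).astype(int), 0, h - 1),
               np.clip(np.round(src_x).astype(int), 0, w - 1)]
    # Pixels that rotated in from outside the frame become background (0).
    out[(src_x < -0.5) | (src_x > w - 0.5) |
        (src_y < -0.5) | (src_y > h - 0.5)] = 0
    return out
```

With identity poses this is a no-op, so the proxy degrades gracefully to the unwarped IoU used when pose data is missing.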

compute_temporal_stability_depth(pred_depths: Sequence[np.ndarray], camera_poses: Sequence[np.ndarray | None]) -> TemporalStabilityResult

TS_depth = E[||D_t − warp(D_{t+1}, ΔT)||_1 / N_valid].

The L1 error is normalised by the valid-depth range of the reference frame and clamped, giving a higher-is-better stability score in [0, 1] (TS = max(0, 1 − L1 / range)).

Source code in rpx_benchmark/deployment.py
def compute_temporal_stability_depth(
    pred_depths: Sequence[np.ndarray],
    camera_poses: Sequence[np.ndarray | None],
) -> TemporalStabilityResult:
    """TS_depth = E[||D_t − warp(D_{t+1}, ΔT)||_1 / N_valid].

    The L1 error is normalised by the valid-depth range of the reference
    frame and clamped, giving a higher-is-better stability score in [0, 1]
    (TS = max(0, 1 − L1 / range)).
    """
    if len(pred_depths) < 2:
        return TemporalStabilityResult(ts_score=1.0, num_pairs=0)

    per_pair = []
    for t in range(len(pred_depths) - 1):
        d_t = np.asarray(pred_depths[t], dtype=np.float32)
        d_t1 = np.asarray(pred_depths[t + 1], dtype=np.float32)

        if camera_poses[t] is not None and camera_poses[t + 1] is not None:
            d_t1_warped = _warp_depth_approx(d_t1, camera_poses[t], camera_poses[t + 1])
        else:
            d_t1_warped = d_t1

        valid = (d_t > 0) & (d_t1_warped > 0)
        if valid.sum() == 0:
            per_pair.append(1.0)
            continue
        l1 = float(np.abs(d_t[valid] - d_t1_warped[valid]).mean())
        depth_range = max(float(d_t[valid].max() - d_t[valid].min()), 1e-3)
        ts = max(0.0, 1.0 - l1 / depth_range)
        per_pair.append(ts)

    return TemporalStabilityResult(
        ts_score=float(np.mean(per_pair)),
        num_pairs=len(per_pair),
        per_pair=per_pair,
    )
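For a single frame pair with no pose data (so no warping), the per-pair score reduces to the normalised L1 step in the loop above. A small worked example with hypothetical depth values:

```python
import numpy as np

# Two hypothetical consecutive depth predictions (metres); zeros are invalid.
d_t  = np.array([[1.0, 2.0], [3.0, 4.0]], dtype=np.float32)
d_t1 = np.array([[1.1, 2.0], [2.9, 4.2]], dtype=np.float32)

valid = (d_t > 0) & (d_t1 > 0)
l1 = float(np.abs(d_t[valid] - d_t1[valid]).mean())
# (0.1 + 0.0 + 0.1 + 0.2) / 4 = 0.1

depth_range = max(float(d_t[valid].max() - d_t[valid].min()), 1e-3)  # 3.0
ts = max(0.0, 1.0 - l1 / depth_range)  # ≈ 0.967
```

The 1e-3 floor on the range guards against division blow-ups on near-planar frames.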

compute_sgc(pred_masks: Sequence[np.ndarray], pred_depths: Sequence[np.ndarray], depth_gradient_threshold: float = 0.1, boundary_dilation: int = 2) -> StackGeometricCoherenceResult

Stack-Level Geometric Coherence: boundary F-score between mask and depth edges.

SGC = F-score(boundary(mask), boundary(depth_gradient > τ))

A high SGC means segmentation boundaries are geometrically consistent with the depth discontinuities — indicating the model perceives coherent surfaces.

Args:
    pred_masks: sequence of predicted segmentation masks (H×W int).
    pred_depths: sequence of predicted depth maps (H×W float32, metres).
    depth_gradient_threshold: τ for depth gradient thresholding.
    boundary_dilation: pixel tolerance for boundary matching.

Source code in rpx_benchmark/deployment.py
def compute_sgc(
    pred_masks: Sequence[np.ndarray],
    pred_depths: Sequence[np.ndarray],
    depth_gradient_threshold: float = 0.1,
    boundary_dilation: int = 2,
) -> StackGeometricCoherenceResult:
    """Stack-Level Geometric Coherence: boundary F-score between mask and depth edges.

    SGC = F-score(boundary(mask), boundary(depth_gradient > τ))

    A high SGC means segmentation boundaries are geometrically consistent with
    the depth discontinuities — indicating the model perceives coherent surfaces.

    Args:
        pred_masks: sequence of predicted segmentation masks (H×W int).
        pred_depths: sequence of predicted depth maps (H×W float32, metres).
        depth_gradient_threshold: τ for depth gradient thresholding.
        boundary_dilation: pixel tolerance for boundary matching.
    """
    if len(pred_masks) == 0:
        return StackGeometricCoherenceResult(sgc_score=0.0, precision=0.0, recall=0.0, num_samples=0)

    precisions, recalls = [], []
    for mask, depth in zip(pred_masks, pred_depths):
        mask = np.asarray(mask, dtype=np.int32)
        depth = np.asarray(depth, dtype=np.float32)

        mask_boundary = _extract_boundary(mask, dilation=boundary_dilation)
        depth_boundary = _extract_depth_boundary(depth, threshold=depth_gradient_threshold,
                                                 dilation=boundary_dilation)

        tp = float((mask_boundary & depth_boundary).sum())
        fp = float((mask_boundary & ~depth_boundary).sum())
        fn = float((~mask_boundary & depth_boundary).sum())

        p = tp / (tp + fp) if (tp + fp) > 0 else 1.0
        r = tp / (tp + fn) if (tp + fn) > 0 else 1.0
        precisions.append(p)
        recalls.append(r)

    mean_p = float(np.mean(precisions))
    mean_r = float(np.mean(recalls))
    f1 = (2 * mean_p * mean_r) / (mean_p + mean_r) if (mean_p + mean_r) > 0 else 0.0

    return StackGeometricCoherenceResult(
        sgc_score=f1,
        precision=mean_p,
        recall=mean_r,
        num_samples=len(pred_masks),
    )
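The helpers `_extract_boundary` and `_extract_depth_boundary` are used above but their source is not included here. A plausible NumPy-only sketch — label-change edges for the mask, a gradient-magnitude stand-in for the Sobel filter the docs mention, and square-structuring-element dilation for the pixel tolerance; all assumptions, not the actual helpers:

```python
import numpy as np

def _dilate(b: np.ndarray, r: int) -> np.ndarray:
    """Binary dilation by a (2r+1)x(2r+1) square via shifted ORs."""
    pad = np.pad(b, r)  # pads with False
    h, w = b.shape
    out = np.zeros_like(b)
    for dy in range(2 * r + 1):
        for dx in range(2 * r + 1):
            out |= pad[dy:dy + h, dx:dx + w]
    return out

def mask_boundary(mask: np.ndarray, dilation: int = 2) -> np.ndarray:
    """Boundary = pixels whose label differs from a right/down neighbour."""
    b = np.zeros(mask.shape, dtype=bool)
    b[:, :-1] |= mask[:, :-1] != mask[:, 1:]
    b[:-1, :] |= mask[:-1, :] != mask[1:, :]
    return _dilate(b, dilation)

def depth_boundary(depth: np.ndarray, threshold: float = 0.1,
                   dilation: int = 2) -> np.ndarray:
    """Boundary = depth gradient magnitude above tau (np.gradient here,
    as a simple stand-in for a Sobel filter)."""
    gy, gx = np.gradient(depth.astype(np.float32))
    return _dilate(np.hypot(gx, gy) > threshold, dilation)
```

Feeding these two boundary maps into the TP/FP/FN counts in `compute_sgc` above yields the boundary precision and recall directly.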