Top-level package

The rpx_benchmark top-level package re-exports the most commonly used symbols so user code can write import rpx_benchmark as rpx and reach everything through one namespace.

rpx_benchmark

RPX — choose and rank perception models for robot learning.

rpx_benchmark is the reference toolkit for the RPX benchmark: a unified real-world RGB-D evaluation suite for the models actually deployed inside robot learning stacks. Bring your model (any HuggingFace checkpoint, numpy callable, or custom torch stack) and the toolkit handles dataset download, splits, metrics, reports, and ESD-weighted deployment-readiness scoring.

See the top-level README and the online documentation for getting started, the adapter framework, and the extension guides.

Difficulty

Bases: str, Enum

Effort-Stratified Difficulty (ESD) split label.

ESD splits are derived per (scene, phase) from the annotation-effort signal described in paper §4. See :mod:rpx_benchmark.deployment for the scoring details.

Members

EASY: Few annotation iterations, low occlusion, stable visibility.
MEDIUM
HARD: Many annotation iterations, dense occlusion, high depth-invalid fraction, high jerk.

Phase

Bases: str, Enum

Capture phases of the three-phase RPX reconfiguration protocol.

Every scene is recorded in three phases so the benchmark can attribute performance changes to scene state rather than to lighting / viewpoint / camera identity.

Members

CLUTTER: Initial dense object arrangement; significant inter-object occlusion.
INTERACTION: Human operator grasps and moves objects; introduces hand-object contact and transient occlusion.
CLEAN: Same objects re-organised sparsely; serves as a within-scene control for the other two phases.

TaskType

Bases: str, Enum

Enumeration of every task the benchmark toolkit recognises.

Members are plain strings so they serialise cleanly to JSON and can be used as dict keys for logging / table rows.

Members

MONOCULAR_DEPTH: Dense metric depth from a single RGB frame.
OBJECT_DETECTION: Closed-vocabulary detection with category labels.
OBJECT_SEGMENTATION: Instance segmentation masks with per-pixel instance IDs.
OBJECT_TRACKING: Multi-object tracking with persistent track IDs.
RELATIVE_CAMERA_POSE: 6-DoF pose of frame B relative to frame A.
OPEN_VOCAB_DETECTION: Detection conditioned on a free-text vocabulary.
VISUAL_GROUNDING: Referring expression → bounding box on the image.
SPARSE_DEPTH: Depth values at a sparse set of image locations only.
NOVEL_VIEW_SYNTHESIS: RGB synthesis from a held-out target pose.
KEYPOINT_MATCHING: Dense/sparse correspondences between two images.

Examples:

>>> from rpx_benchmark.api import TaskType
>>> TaskType.MONOCULAR_DEPTH.value
'monocular_depth'
>>> TaskType("monocular_depth") is TaskType.MONOCULAR_DEPTH
True

Sample(id: str, rgb: np.ndarray, ground_truth: Any, metadata: Dict[str, Any] | None = None, phase: Phase | None = None, difficulty: Difficulty | None = None, camera_pose: np.ndarray | None = None) dataclass

One input unit handed by :class:RPXDataset to a model.

Samples are produced by the loader and consumed by BenchmarkModel.predict. Every field is deliberately simple (numpy arrays, enums, plain dicts) so models and adapters don't need to know anything about the on-disk dataset format.

Parameters:

id (str, required)
    Unique identifier of the form {scene}_{phase}_{frame}. Used for joining per-sample metrics back to scenes / phases.

rgb (ndarray, required)
    H × W × 3 uint8 RGB image in row-major order.

ground_truth (Any, required)
    Task-specific GroundTruth dataclass (e.g. :class:DepthGroundTruth). The concrete type is determined by :attr:RPXDataset.task.

metadata (dict, default None)
    Free-form metadata the loader can attach; conventionally holds fisheye images, secondary RGB frames for pair tasks, and any label paths that do not fit into the ground-truth dataclass. Consumers should treat unknown keys as opaque.

phase (Phase, default None)
    Capture phase the frame belongs to. Required for ESD-weighted phase scoring.

difficulty (Difficulty, default None)
    ESD difficulty label of the (scene, phase) this sample belongs to.

camera_pose (ndarray, default None)
    4 × 4 float64 SE(3) matrix (camera → world) sourced from the T265 tracker. Used for the temporal-stability metric.
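Because scene names themselves contain underscores (e.g. scene_001), joining per-sample metrics back by id is easiest from the right. A hypothetical helper (not part of the toolkit) splitting the documented id form:

```python
# Hypothetical helper (not rpx_benchmark code): split the documented
# "{scene}_{phase}_{frame}" id form. Scene names contain underscores
# ("scene_001"), so split from the right: the last two components
# are phase and frame, everything before them is the scene.
def split_sample_id(sample_id: str) -> tuple[str, str, str]:
    scene, phase, frame = sample_id.rsplit("_", 2)
    return scene, phase, frame

print(split_sample_id("scene_001_clutter_00000"))
# ('scene_001', 'clutter', '00000')
```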

BenchmarkModel

Bases: ABC

Abstract base class every RPX-compatible model must implement.

In practice, most users should not subclass this directly — instead compose a :class:rpx_benchmark.adapters.BenchmarkableModel from an input adapter, a model callable, and an output adapter. BenchmarkableModel already implements :meth:predict and :meth:setup correctly for you.

Subclass only when you need complete control over how samples are routed to your model (e.g. true minibatching across GPU devices).

Attributes:

task (TaskType)
    The task this model solves. Must be set by subclasses (either at class level or in __init__). The runner checks that model.task == dataset.task before running.

Examples:

Minimal subclass::

class MyDepth(BenchmarkModel):
    task = TaskType.MONOCULAR_DEPTH

    def setup(self):
        self.net = load_my_checkpoint()

    def predict(self, batch):
        return [
            DepthPrediction(depth_map=self.net(s.rgb))
            for s in batch
        ]

Composed via :class:BenchmarkableModel::

bm = rpx.BenchmarkableModel(
    task=TaskType.MONOCULAR_DEPTH,
    input_adapter=MyInputAdapter(),
    model=my_nn_module,
    output_adapter=MyOutputAdapter(),
    name="my_model",
)

setup() -> None abstractmethod

Load checkpoints, warm CUDA, and do any other one-time init.

The runner calls this exactly once before iterating the dataset, unless BenchmarkRunner(call_setup=False) was passed — in which case the caller is responsible.

Source code in rpx_benchmark/api.py
@abstractmethod
def setup(self) -> None:
    """Load checkpoints, warm CUDA, and do any other one-time init.

    The runner calls this exactly once before iterating the
    dataset, unless ``BenchmarkRunner(call_setup=False)`` was
    passed — in which case the caller is responsible.
    """

predict(batch: Sequence[Sample]) -> Sequence[Any] abstractmethod

Run inference on a batch of samples.

Parameters:

batch (sequence of Sample, required)
    One or more samples. Length equals dataset.batch_size except possibly for the final tail batch.

Returns:

sequence
    One task-specific Prediction dataclass per input sample, in the same order. The prediction dataclass must match what :class:MetricSuite expects for this task.

Raises:

ModelError
    (By convention) when a sample cannot be processed. The runner surfaces it as a clean error rather than a stack trace.

Source code in rpx_benchmark/api.py
@abstractmethod
def predict(self, batch: Sequence[Sample]) -> Sequence[Any]:
    """Run inference on a batch of samples.

    Parameters
    ----------
    batch : sequence of Sample
        One or more samples. Length equals ``dataset.batch_size``
        except possibly for the final tail batch.

    Returns
    -------
    sequence
        One task-specific Prediction dataclass per input sample,
        in the same order. The prediction dataclass must match
        what :class:`MetricSuite` expects for this task.

    Raises
    ------
    ModelError
        (By convention) when a sample cannot be processed. The
        runner surfaces it as a clean error rather than a stack
        trace.
    """

RPXDataset(samples: List[Dict[str, Any]], task: TaskType, root: Path, batch_size: int = 1) dataclass

Iterates over RPX samples for a specific task.

Manifest format (JSON)::

{
  "task": "object_segmentation",
  "root": "/path/to/data",
  "samples": [
    {
      "id": "scene_001_clutter_00000",
      "scene": "scene_001",
      "phase": "clutter",
      "difficulty": "hard",
      "rgb":   "scene_001/0/rgb/00000.png",
      "depth": "scene_001/0/depth/00000.png",
      "mask":  "scene_001/0/mask/00000.png",
      "pose":  "scene_001/0/pose/00000.npz",
      ...
    }
  ]
}

All paths are relative to root. depth files are 16-bit PNG in millimetres (as saved by save_device_data.py). pose files are NPZ with keys position ([x,y,z] metres) and orientation ([x,y,z,w] quaternion) from the T265 tracker.
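The 4 × 4 camera_pose matrix can be assembled from those two NPZ keys. A sketch of the conversion (not toolkit code; assumes a unit-norm quaternion) using the standard quaternion-to-rotation formula:

```python
import numpy as np

# Sketch (not toolkit code): build a 4x4 camera->world SE(3) matrix from
# the documented pose NPZ keys: position = [x, y, z] metres and
# orientation = [x, y, z, w] quaternion (assumed unit-norm).
def pose_to_matrix(position, orientation):
    x, y, z, w = orientation
    # Standard unit-quaternion -> rotation-matrix conversion.
    R = np.array([
        [1 - 2 * (y * y + z * z), 2 * (x * y - z * w),     2 * (x * z + y * w)],
        [2 * (x * y + z * w),     1 - 2 * (x * x + z * z), 2 * (y * z - x * w)],
        [2 * (x * z - y * w),     2 * (y * z + x * w),     1 - 2 * (x * x + y * y)],
    ], dtype=np.float64)
    T = np.eye(4, dtype=np.float64)
    T[:3, :3] = R           # rotation block
    T[:3, 3] = position     # translation (metres)
    return T

# Identity orientation leaves only the translation.
T = pose_to_matrix([0.1, 0.2, 0.3], [0.0, 0.0, 0.0, 1.0])
```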

from_manifest(manifest_path: str | Path, batch_size: int = 1) -> 'RPXDataset' classmethod

Load a manifest JSON file from disk and return a dataset.

Parameters:

manifest_path (str or Path, required)
    Path to the manifest JSON. Produced either by :func:rpx_benchmark.hub.download_split or by a custom upload script.

batch_size (int, default 1)
    Number of samples per iteration.

Returns:

RPXDataset

Raises:

ManifestError
    If the manifest file is missing, not valid JSON, or is missing required top-level fields.

Source code in rpx_benchmark/loader.py
@classmethod
def from_manifest(cls, manifest_path: str | Path, batch_size: int = 1) -> "RPXDataset":
    """Load a manifest JSON file from disk and return a dataset.

    Parameters
    ----------
    manifest_path : str or Path
        Path to the manifest JSON. Produced either by
        :func:`rpx_benchmark.hub.download_split` or by a custom
        upload script.
    batch_size : int
        Number of samples per iteration. Default 1.

    Returns
    -------
    RPXDataset

    Raises
    ------
    ManifestError
        If the manifest file is missing, not valid JSON, or is
        missing required top-level fields.
    """
    manifest_path = Path(manifest_path)
    if not manifest_path.is_file():
        raise ManifestError(
            f"Manifest file not found: {manifest_path}",
            hint="Did the HuggingFace download fail? Try rerunning with "
                 "--cache-dir pointing at a writable location.",
        )
    try:
        with manifest_path.open("r", encoding="utf-8") as f:
            manifest = json.load(f)
    except json.JSONDecodeError as e:
        raise ManifestError(
            f"Manifest at {manifest_path} is not valid JSON: {e}",
        ) from e
    return cls.from_dict(
        manifest, batch_size=batch_size, default_root=manifest_path.parent
    )

from_dict(manifest: Dict[str, Any], batch_size: int = 1, default_root: str | Path | None = None) -> 'RPXDataset' classmethod

Build a dataset from an already-parsed manifest dict.

Raises:

ManifestError
    If task is missing or unknown, or if samples is missing.

Source code in rpx_benchmark/loader.py
@classmethod
def from_dict(
    cls,
    manifest: Dict[str, Any],
    batch_size: int = 1,
    default_root: str | Path | None = None,
) -> "RPXDataset":
    """Build a dataset from an already-parsed manifest dict.

    Raises
    ------
    ManifestError
        If ``task`` is missing or unknown, or if ``samples`` is
        missing.
    """
    if "task" not in manifest:
        raise ManifestError(
            "Manifest is missing required field 'task'.",
            hint="Task must be one of: " +
                 ", ".join(t.value for t in TaskType),
        )
    try:
        task = TaskType(manifest["task"])
    except ValueError as e:
        raise ManifestError(
            f"Manifest task {manifest['task']!r} is not a known TaskType.",
            hint="Expected one of: " +
                 ", ".join(t.value for t in TaskType),
        ) from e

    if "samples" not in manifest:
        raise ManifestError(
            "Manifest is missing required field 'samples'.",
        )

    root = Path(manifest.get("root") or default_root or ".")
    samples = manifest["samples"]
    if not isinstance(samples, list):
        raise ManifestError(
            f"Manifest 'samples' must be a list, got {type(samples).__name__}",
        )
    log.debug("loaded manifest: task=%s root=%s samples=%d",
              task.value, root, len(samples))
    return cls(samples=samples, task=task, root=root, batch_size=batch_size)

MetricSuite(task: TaskType)

Thin wrapper around the metric registry used by the runner.

Kept as a class rather than a function because the historical API expects MetricSuite.for_task(...).evaluate(pred, gt). New code can call :func:compute_metrics directly.

Source code in rpx_benchmark/metrics/registry.py
def __init__(self, task: TaskType) -> None:
    self.task = task

for_task(task: TaskType) -> 'MetricSuite' classmethod

Create a suite for the given task.

Raises:

MetricError
    If no calculators are registered for task.

Source code in rpx_benchmark/metrics/registry.py
@classmethod
def for_task(cls, task: TaskType) -> "MetricSuite":
    """Create a suite for the given task.

    Raises
    ------
    MetricError
        If no calculators are registered for ``task``.
    """
    if not get_calculators(task):
        raise MetricError(
            f"No metric calculator registered for task {task.value!r}.",
            hint="Import rpx_benchmark.metrics to trigger built-ins.",
        )
    return cls(task=task)

evaluate(prediction: Any, ground_truth: Any) -> Dict[str, float]

Run every registered calculator and return merged results.

Raises:

MetricError
    Propagated from individual calculators when inputs are shape-mismatched or wrong-typed.

Source code in rpx_benchmark/metrics/registry.py
def evaluate(self, prediction: Any, ground_truth: Any) -> Dict[str, float]:
    """Run every registered calculator and return merged results.

    Raises
    ------
    MetricError
        Propagated from individual calculators when inputs are
        shape-mismatched or wrong-typed.
    """
    return compute_metrics(self.task, prediction, ground_truth)

aggregate(per_sample: List[Dict[str, Any]]) -> Dict[str, float]

Mean over numeric metric keys; non-numeric metadata is skipped.

Parameters:

per_sample (list of dict, required)
    Per-sample rows. May contain metric floats and metadata strings/enums in the same dict.

Returns:

dict[str, float]
    One float per numeric key. Empty dict if per_sample is empty or contains no numeric values.

Source code in rpx_benchmark/metrics/registry.py
def aggregate(self, per_sample: List[Dict[str, Any]]) -> Dict[str, float]:
    """Mean over numeric metric keys; non-numeric metadata is skipped.

    Parameters
    ----------
    per_sample : list of dict
        Per-sample rows. May contain metric floats and metadata
        strings/enums in the same dict.

    Returns
    -------
    dict[str, float]
        One float per numeric key. Empty dict if ``per_sample`` is
        empty or contains no numeric values.
    """
    if not per_sample:
        return {}
    numeric_keys = [
        k for k, v in per_sample[0].items()
        if isinstance(v, (int, float)) and not isinstance(v, bool)
    ]
    out: Dict[str, float] = {}
    for k in numeric_keys:
        vals = [
            m[k] for m in per_sample
            if isinstance(m.get(k), (int, float))
            and not isinstance(m.get(k), bool)
        ]
        if vals:
            out[k] = float(np.mean(vals))
    return out
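A standalone illustration of the aggregation rule above (plain Python plus numpy, not toolkit code): numeric keys are averaged, while metadata strings and booleans are skipped.

```python
import numpy as np

# Mixed metric/metadata rows like those produced per sample.
rows = [
    {"absrel": 0.10, "delta1": 0.90, "id": "scene_001_clutter_00000", "ok": True},
    {"absrel": 0.20, "delta1": 0.80, "id": "scene_001_clutter_00001", "ok": False},
]

# Numeric keys only; bool is excluded explicitly (it is an int subclass).
numeric = [k for k, v in rows[0].items()
           if isinstance(v, (int, float)) and not isinstance(v, bool)]
agg = {k: round(float(np.mean([r[k] for r in rows])), 6) for k in numeric}
print(agg)  # {'absrel': 0.15, 'delta1': 0.85}
```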

build_result(per_sample: List[Dict[str, Any]]) -> 'BenchmarkResult'

Convenience: wrap per_sample and its aggregate in a :class:BenchmarkResult.

Source code in rpx_benchmark/metrics/registry.py
def build_result(self, per_sample: List[Dict[str, Any]]) -> "BenchmarkResult":
    """Convenience: wrap ``per_sample`` and its aggregate in a
    :class:`BenchmarkResult`.
    """
    return BenchmarkResult(
        task=self.task,
        per_sample=per_sample,
        aggregated=self.aggregate(per_sample),
        num_samples=len(per_sample),
    )

BenchmarkResult(task: TaskType, per_sample: List[Dict[str, Any]], aggregated: Dict[str, float], num_samples: int) dataclass

Outcome of running a :class:BenchmarkRunner against a dataset.

Attributes:

task (TaskType)
    Which task was evaluated.

per_sample (list of dict)
    One dict per sample. Each dict mixes metric keys (numeric) and metadata keys (id, phase, difficulty, scene) that :class:MetricSuite.aggregate silently skips when computing means.

aggregated (dict[str, float])
    Mean over the numeric metric keys in :attr:per_sample.

num_samples (int)
    Number of evaluated samples (equal to len(per_sample)).

BenchmarkRunner(model: BenchmarkModel, dataset: RPXDataset, metric_suite: MetricSuite | None = None, call_setup: bool = True)

Runs a benchmark end-to-end for a given model.

Basic usage::

runner = BenchmarkRunner(model=model, dataset=dataset)
result = runner.run()
print(result.aggregated)

Phase-stratified usage (requires manifest with phase / difficulty fields)::

runner = BenchmarkRunner(model=model, dataset=dataset)
result, dr_report = runner.run_with_deployment_readiness(
    primary_metric="absrel",
    model_name="MyDepthModel",
)
Source code in rpx_benchmark/runner.py
def __init__(
    self,
    model: BenchmarkModel,
    dataset: RPXDataset,
    metric_suite: MetricSuite | None = None,
    call_setup: bool = True,
):
    self.model = model
    self.dataset = dataset
    self.metric_suite = metric_suite or MetricSuite.for_task(model.task)
    self.call_setup = call_setup
    self._validate_task_alignment()

run(progress: Optional[ProgressCallback] = None) -> BenchmarkResult

Run benchmark and return flat per-sample + aggregated metrics.

Source code in rpx_benchmark/runner.py
def run(
    self,
    progress: Optional[ProgressCallback] = None,
) -> BenchmarkResult:
    """Run benchmark and return flat per-sample + aggregated metrics."""
    if self.call_setup:
        if progress:
            progress(0, None, "setup")
        self.model.setup()

    total = len(self.dataset)
    per_sample: List[dict] = []
    if progress:
        progress(0, total, "predict")
    for batch in self.dataset:
        predictions = self.model.predict(batch)
        if len(predictions) != len(batch):
            raise ModelError(
                f"Model returned {len(predictions)} predictions for "
                f"a batch of {len(batch)} samples — must return one "
                "prediction per sample.",
            )
        for sample, pred in zip(batch, predictions):
            validate_prediction(self.model.task, pred, sample)
            metrics = self.metric_suite.evaluate(pred, sample.ground_truth)
            per_sample.append(metrics)
            if progress:
                progress(len(per_sample), total, "predict")

    return self.metric_suite.build_result(per_sample)

run_with_deployment_readiness(primary_metric: str, model_name: str = 'model', efficiency: EfficiencyMetadata | None = None, compute_ts: bool = True, compute_sgc_flag: bool = True, progress: Optional[ProgressCallback] = None) -> tuple[BenchmarkResult, DeploymentReadinessReport]

Run benchmark and compute all deployment-readiness metrics.

Parameters:

primary_metric
    Metric key used for ESD/STR scoring (e.g. "absrel", "miou").
model_name
    Display name for the report.
efficiency
    Pre-computed EfficiencyMetadata (params, FLOPs).
compute_ts
    Whether to compute Temporal Stability (needs sequential frames).
compute_sgc_flag
    Whether to compute SGC (needs both seg + depth predictions).

Returns:

(BenchmarkResult, DeploymentReadinessReport)

Source code in rpx_benchmark/runner.py
def run_with_deployment_readiness(
    self,
    primary_metric: str,
    model_name: str = "model",
    efficiency: EfficiencyMetadata | None = None,
    compute_ts: bool = True,
    compute_sgc_flag: bool = True,
    progress: Optional[ProgressCallback] = None,
) -> tuple[BenchmarkResult, DeploymentReadinessReport]:
    """Run benchmark and compute all deployment-readiness metrics.

    Args:
        primary_metric: metric key used for ESD/STR scoring (e.g. "absrel", "miou").
        model_name: display name for the report.
        efficiency: pre-computed EfficiencyMetadata (params, FLOPs).
        compute_ts: whether to compute Temporal Stability (needs sequential frames).
        compute_sgc_flag: whether to compute SGC (needs both seg + depth predictions).

    Returns:
        (BenchmarkResult, DeploymentReadinessReport)
    """
    if self.call_setup:
        if progress:
            progress(0, None, "setup")
        self.model.setup()

    total = len(self.dataset)
    per_sample_metrics: List[dict] = []
    per_sample_phases: List[Phase | None] = []
    per_sample_difficulties: List[Difficulty | None] = []
    per_sample_poses: List[Any] = []
    all_predictions: List[Any] = []
    all_samples: List[Any] = []

    # Per-sample wall-clock inference time (seconds). First batch is
    # recorded separately so callers can discard warmup from the median.
    per_sample_seconds: List[float] = []
    first_batch_flops_g: Optional[float] = None

    if progress:
        progress(0, total, "predict")

    first_batch = True
    for batch in self.dataset:
        t0 = time.perf_counter()
        if first_batch:
            flops_g, predictions = _count_flops_of(self.model.predict, batch)
            if flops_g is not None and len(batch) > 0:
                first_batch_flops_g = flops_g / len(batch)
            first_batch = False
        else:
            predictions = self.model.predict(batch)
        batch_seconds = time.perf_counter() - t0

        if len(predictions) != len(batch):
            raise ModelError(
                f"Model returned {len(predictions)} predictions for "
                f"a batch of {len(batch)} samples — must return one "
                "prediction per sample.",
            )
        per_sample_seconds.extend([batch_seconds / len(batch)] * len(batch))

        for sample, pred in zip(batch, predictions):
            validate_prediction(self.model.task, pred, sample)
            metrics = self.metric_suite.evaluate(pred, sample.ground_truth)
            metrics.update(_sample_meta(sample))
            per_sample_metrics.append(metrics)
            per_sample_phases.append(sample.phase)
            per_sample_difficulties.append(sample.difficulty)
            per_sample_poses.append(sample.camera_pose)
            all_predictions.append(pred)
            all_samples.append(sample)
            if progress:
                progress(len(per_sample_metrics), total, "predict")

    result = self.metric_suite.build_result(per_sample_metrics)

    # Compute latency median after skipping the first batch (warmup).
    latency_ms: Optional[float] = None
    if per_sample_seconds:
        warm = per_sample_seconds[1:] if len(per_sample_seconds) > 1 else per_sample_seconds
        latency_ms = round(1000.0 * float(np.median(warm)), 3)

    # --- Weighted Phase Score + STR ---
    wps = compute_weighted_phase_score(
        per_sample_metrics=per_sample_metrics,
        per_sample_phases=per_sample_phases,
        per_sample_difficulties=per_sample_difficulties,
        metric_key=primary_metric,
    )

    str_result = compute_str({
        Phase.CLUTTER: wps.s_clutter,
        Phase.INTERACTION: wps.s_interaction,
        Phase.CLEAN: wps.s_clean,
    })

    # --- Temporal Stability ---
    ts_result: TemporalStabilityResult | None = None
    if compute_ts and len(all_predictions) >= 2:
        task = self.model.task
        if task == TaskType.OBJECT_SEGMENTATION:
            masks = [p.mask for p in all_predictions]
            ts_result = compute_temporal_stability_seg(masks, per_sample_poses)
        elif task == TaskType.MONOCULAR_DEPTH:
            depths = [p.depth_map for p in all_predictions]
            ts_result = compute_temporal_stability_depth(depths, per_sample_poses)

    # --- Stack-Level Geometric Coherence (requires seg + depth in metadata) ---
    sgc_result: StackGeometricCoherenceResult | None = None
    if compute_sgc_flag:
        # SGC requires both mask and depth predictions in the same run.
        # When running segmentation, check if depth maps are in sample metadata.
        if self.model.task == TaskType.OBJECT_SEGMENTATION:
            depth_maps = [
                s.metadata.get("depth_map") if s.metadata else None
                for s in all_samples
            ]
            if any(d is not None for d in depth_maps):
                valid = [(p.mask, d) for p, d in zip(all_predictions, depth_maps)
                         if d is not None]
                masks_sgc = [v[0] for v in valid]
                depths_sgc = [np.asarray(v[1], dtype=np.float32) for v in valid]
                sgc_result = compute_sgc(masks_sgc, depths_sgc)

    # Merge counted FLOPs + measured latency into the passed-in
    # efficiency object (caller usually pre-computed params_m).
    flops_g = (efficiency.flops_g if efficiency and efficiency.flops_g else
               first_batch_flops_g)
    report = DeploymentReadinessReport(
        task=self.model.task.value,
        model_name=model_name,
        weighted_phase_score=wps,
        temporal_stability=ts_result,
        state_transition=str_result,
        geometric_coherence=sgc_result,
        params_m=efficiency.params_m if efficiency else None,
        flops_g=flops_g,
        actmem_gb_fp16=efficiency.actmem_gb_fp16 if efficiency else None,
        latency_ms_per_sample=latency_ms,
    )

    return result, report

DeploymentReadinessReport(task: str, model_name: str, weighted_phase_score: WeightedPhaseScore | None = None, temporal_stability: TemporalStabilityResult | None = None, state_transition: StateTransitionRobustnessResult | None = None, geometric_coherence: StackGeometricCoherenceResult | None = None, params_m: float | None = None, flops_g: float | None = None, actmem_gb_fp16: float | None = None, latency_ms_per_sample: float | None = None) dataclass

Aggregated deployment-readiness report for a model on a task.

ESDResult(easy: float | None, medium: float | None, hard: float | None, metric_key: str) dataclass

Per-difficulty metric breakdown (Effort-Stratified Difficulty).

weighted_score() -> float

S_p = 0.25·Easy + 0.35·Medium + 0.40·Hard.

Source code in rpx_benchmark/deployment.py
def weighted_score(self) -> float:
    """S_p = 0.25·Easy + 0.35·Medium + 0.40·Hard."""
    total, weight = 0.0, 0.0
    for diff, w in ESD_WEIGHTS.items():
        val = getattr(self, diff.value)
        if val is not None:
            total += w * val
            weight += w
    return total / weight if weight > 0 else 0.0
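A worked numeric example of the weighting above, including the renormalisation the source applies when a split is absent (standalone sketch, not toolkit code):

```python
# ESD split weights from the docstring: 0.25 Easy, 0.35 Medium, 0.40 Hard.
ESD_WEIGHTS = {"easy": 0.25, "medium": 0.35, "hard": 0.40}

def weighted_score(easy, medium, hard):
    total = weight = 0.0
    for key, val in (("easy", easy), ("medium", medium), ("hard", hard)):
        if val is not None:          # missing splits are skipped...
            total += ESD_WEIGHTS[key] * val
            weight += ESD_WEIGHTS[key]
    return total / weight if weight > 0 else 0.0   # ...and weights renormalised

print(round(weighted_score(0.9, 0.8, 0.6), 4))   # 0.225 + 0.28 + 0.24 = 0.745
print(round(weighted_score(0.9, None, 0.6), 4))  # (0.225 + 0.24) / 0.65 ≈ 0.7154
```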

StackGeometricCoherenceResult(sgc_score: float, precision: float, recall: float, num_samples: int) dataclass

SGC measures mask–depth boundary alignment.

SGC = F-score(boundary(mask), boundary(depth_gradient > τ))

Boundary pixels are extracted via Sobel gradient magnitude thresholding.
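Since the SGC score is a boundary F-score, it reduces to the harmonic mean of the reported precision and recall fields. A one-line check (standalone, not toolkit code):

```python
# Boundary F-score = harmonic mean of boundary precision and recall.
def f_score(precision: float, recall: float) -> float:
    return 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0

print(round(f_score(0.8, 0.6), 4))  # 0.6857
```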

StateTransitionRobustnessResult(str_c_to_i: float, str_i_to_l: float, metric_clutter: float, metric_interaction: float, metric_clean: float) dataclass

STR captures performance change across phase boundaries.

STR_{C→I} = M(interaction) − M(clutter)   (interaction drop; negative = worse)
STR_{I→L} = M(clean) − M(interaction)     (recovery; positive = better)

TemporalStabilityResult(ts_score: float, num_pairs: int, per_pair: List[float] = list()) dataclass

TS score per task type.

TS_seg   = E[IoU(P_t, warp(P_{t+1}, ΔT))]                (segmentation)
TS_depth = E[||D_t − warp(D_{t+1}, ΔT)||_1 / N_valid]    (depth)

Warping uses the relative SE(3) pose from the T265 ground-truth track. When exact warping is not feasible, a consistency proxy (unchanged-pixel fraction) is used as a lower bound.
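The documented consistency proxy for infeasible warps can be sketched as the unchanged-pixel fraction between consecutive predictions (standalone sketch; the helper name is hypothetical):

```python
import numpy as np

# Fallback proxy: fraction of pixels unchanged between consecutive masks,
# averaged over adjacent pairs. A lower bound on the warped-IoU TS score.
def ts_proxy(masks):
    pair_scores = [float(np.mean(a == b)) for a, b in zip(masks, masks[1:])]
    return float(np.mean(pair_scores)) if pair_scores else 0.0

m0 = np.zeros((4, 4), dtype=np.int32)
m1 = m0.copy()
m1[0, 0] = 1                 # one of 16 pixels changes between frames
print(ts_proxy([m0, m1]))    # 15/16 = 0.9375
```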

WeightedPhaseScore(clutter: ESDResult, interaction: ESDResult, clean: ESDResult) dataclass

Full deployment-readiness scoring table.

Per phase:        S_p = 0.25·M(p, Easy) + 0.35·M(p, Medium) + 0.40·M(p, Hard)
Overall:          S_overall = (S_C + S_I + S_L) / 3
Interaction drop: Δ_int = S_I − S_C
Recovery:         Δ_rec = S_L − S_I

delta_int: float property

Interaction drop: S_I − S_C (negative = model degrades on interaction).

delta_rec: float property

Recovery: S_L − S_I (positive = model recovers after interaction).
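A quick numeric walk-through of the roll-up above, with assumed per-phase scores (illustrative values only):

```python
# Assumed per-phase weighted scores S_C, S_I, S_L (illustrative values only).
s_clutter, s_interaction, s_clean = 0.70, 0.55, 0.68

s_overall = (s_clutter + s_interaction + s_clean) / 3
delta_int = s_interaction - s_clutter   # negative: degrades during interaction
delta_rec = s_clean - s_interaction     # positive: recovers afterwards

print(round(s_overall, 4), round(delta_int, 2), round(delta_rec, 2))
# 0.6433 -0.15 0.13
```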

EfficiencyMetadata(params_m: float | None = None, flops_g: float | None = None, actmem_gb_fp16: float | None = None, latency_ms_per_sample: float | None = None, model_type: str = 'local', notes: str = '') dataclass

Hardware-agnostic efficiency metadata for a model.

to_table_row() -> dict

Produce result-table-ready dict (None → 'N/A (API)' for API models).

Source code in rpx_benchmark/profiler.py
def to_table_row(self) -> dict:
    """Produce result-table-ready dict (None → 'N/A (API)' for API models)."""
    na = "N/A (API)" if self.model_type == "api" else None
    return {
        "type": self.model_type,
        "params_m": self.params_m if self.params_m is not None else na,
        "flops_g": self.flops_g if self.flops_g is not None else na,
        "actmem_gb_fp16": self.actmem_gb_fp16,
        "latency_ms_per_sample": self.latency_ms_per_sample,
    }
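A standalone mimic of the substitution rule above (not toolkit code): API-hosted models have no countable params/FLOPs, so those cells render as 'N/A (API)' rather than None.

```python
# Mimic of the documented rule: unknown params/FLOPs for "api" models
# become the string "N/A (API)"; local models keep None.
def to_table_row(model_type, params_m=None, flops_g=None):
    na = "N/A (API)" if model_type == "api" else None
    return {
        "type": model_type,
        "params_m": params_m if params_m is not None else na,
        "flops_g": flops_g if flops_g is not None else na,
    }

print(to_table_row("api"))
# {'type': 'api', 'params_m': 'N/A (API)', 'flops_g': 'N/A (API)'}
print(to_table_row("local", params_m=86.0))
# {'type': 'local', 'params_m': 86.0, 'flops_g': None}
```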

AdapterError(message: str, *, hint: Optional[str] = None, details: Optional[dict[str, Any]] = None)

Bases: ModelError

Input or output adapter produced an invalid payload.

Examples:

  • InputAdapter.prepare raised during preprocessing.
  • OutputAdapter.finalize returned a non-DepthPrediction for the monocular depth task.
  • HF processor's post_process_depth_estimation signature does not accept the kwargs the adapter wants to pass.
Source code in rpx_benchmark/exceptions.py
def __init__(
    self,
    message: str,
    *,
    hint: Optional[str] = None,
    details: Optional[dict[str, Any]] = None,
) -> None:
    self.message = message
    self.hint = hint
    self.details = dict(details) if details else {}
    full = message
    if hint:
        full = f"{message}\n  hint: {hint}"
    super().__init__(full)
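Every exception class in this module shares this constructor pattern; a minimal stand-in (class name is illustrative) showing how the optional hint folds into the rendered message:

```python
# Minimal stand-in for the shared RPX exception constructor: the optional
# hint is appended to the message on its own indented line.
class DemoError(Exception):
    def __init__(self, message, *, hint=None):
        self.message = message
        self.hint = hint
        full = f"{message}\n  hint: {hint}" if hint else message
        super().__init__(full)

err = DemoError("Manifest file not found: manifest.json",
                hint="Check that --cache-dir points at a writable location.")
print(str(err))
# Manifest file not found: manifest.json
#   hint: Check that --cache-dir points at a writable location.
```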

ConfigError(message: str, *, hint: Optional[str] = None, details: Optional[dict[str, Any]] = None)

Bases: RPXError

Raised when a user-supplied config is invalid.

Examples:

  • MonocularDepthRunConfig built with both model and hf_checkpoint set.
  • CLI given --device cuda on a CPU-only host with --strict-device enabled.
  • Unknown difficulty split.
Source code in rpx_benchmark/exceptions.py
def __init__(
    self,
    message: str,
    *,
    hint: Optional[str] = None,
    details: Optional[dict[str, Any]] = None,
) -> None:
    self.message = message
    self.hint = hint
    self.details = dict(details) if details else {}
    full = message
    if hint:
        full = f"{message}\n  hint: {hint}"
    super().__init__(full)

DatasetError(message: str, *, hint: Optional[str] = None, details: Optional[dict[str, Any]] = None)

Bases: RPXError

Base class for dataset load / manifest / download failures.

Source code in rpx_benchmark/exceptions.py
def __init__(
    self,
    message: str,
    *,
    hint: Optional[str] = None,
    details: Optional[dict[str, Any]] = None,
) -> None:
    self.message = message
    self.hint = hint
    self.details = dict(details) if details else {}
    full = message
    if hint:
        full = f"{message}\n  hint: {hint}"
    super().__init__(full)

DownloadError(message: str, *, hint: Optional[str] = None, details: Optional[dict[str, Any]] = None)

Bases: DatasetError

HuggingFace download or cache lookup failed.

Raised by :mod:rpx_benchmark.hub when snapshot_download or hf_hub_download fails (network issue, bad repo id, missing revision, permission denied).

Source code in rpx_benchmark/exceptions.py
def __init__(
    self,
    message: str,
    *,
    hint: Optional[str] = None,
    details: Optional[dict[str, Any]] = None,
) -> None:
    self.message = message
    self.hint = hint
    self.details = dict(details) if details else {}
    full = message
    if hint:
        full = f"{message}\n  hint: {hint}"
    super().__init__(full)

ManifestError(message: str, *, hint: Optional[str] = None, details: Optional[dict[str, Any]] = None)

Bases: DatasetError

Manifest JSON is missing, malformed, or references missing files.

Raised by :class:rpx_benchmark.loader.RPXDataset when the loader cannot resolve a sample from the manifest it was handed.

Examples:

  • Manifest missing the task field.
  • Sample lists rgb that does not exist on disk.
  • Task value is not in :class:rpx_benchmark.api.TaskType.
Source code in rpx_benchmark/exceptions.py
def __init__(
    self,
    message: str,
    *,
    hint: Optional[str] = None,
    details: Optional[dict[str, Any]] = None,
) -> None:
    self.message = message
    self.hint = hint
    self.details = dict(details) if details else {}
    full = message
    if hint:
        full = f"{message}\n  hint: {hint}"
    super().__init__(full)

MetricError(message: str, *, hint: Optional[str] = None, details: Optional[dict[str, Any]] = None)

Bases: RPXError

Raised when a metric calculator cannot compute a score.

Examples:

  • Prediction dataclass is the wrong type for the task.
  • Ground-truth shape does not match prediction shape.
  • Unknown metric name requested from a registry.
Source code in rpx_benchmark/exceptions.py
def __init__(
    self,
    message: str,
    *,
    hint: Optional[str] = None,
    details: Optional[dict[str, Any]] = None,
) -> None:
    self.message = message
    self.hint = hint
    self.details = dict(details) if details else {}
    full = message
    if hint:
        full = f"{message}\n  hint: {hint}"
    super().__init__(full)

ModelError(message: str, *, hint: Optional[str] = None, details: Optional[dict[str, Any]] = None)

Bases: RPXError

Raised by model factories or the runner when a model misbehaves.

Examples:

  • Model returned a different number of predictions than samples.
  • Model's task attribute does not match the dataset task.
  • Prediction dataclass has a wrong shape.
Source code in rpx_benchmark/exceptions.py
def __init__(
    self,
    message: str,
    *,
    hint: Optional[str] = None,
    details: Optional[dict[str, Any]] = None,
) -> None:
    self.message = message
    self.hint = hint
    self.details = dict(details) if details else {}
    full = message
    if hint:
        full = f"{message}\n  hint: {hint}"
    super().__init__(full)

RPXError(message: str, *, hint: Optional[str] = None, details: Optional[dict[str, Any]] = None)

Bases: Exception

Base exception for every error raised by the RPX benchmark toolkit.

All library code raises a subclass of this so user code can write a single except RPXError to catch all benchmark failures without accidentally swallowing unrelated exceptions.

Parameters:

message (str, required)
    Human-readable description of what failed. Should include a hint about what the user can do next.
hint (str, default None)
    Additional remediation advice rendered after the main message.
details (dict, default None)
    Structured context (e.g. offending field, expected value) that higher-level code may inspect.

Examples:

>>> from rpx_benchmark.exceptions import RPXError
>>> raise RPXError("something went wrong", hint="check the cache dir")
Traceback (most recent call last):
    ...
rpx_benchmark.exceptions.RPXError: something went wrong
  hint: check the cache dir
Source code in rpx_benchmark/exceptions.py
def __init__(
    self,
    message: str,
    *,
    hint: Optional[str] = None,
    details: Optional[dict[str, Any]] = None,
) -> None:
    self.message = message
    self.hint = hint
    self.details = dict(details) if details else {}
    full = message
    if hint:
        full = f"{message}\n  hint: {hint}"
    super().__init__(full)
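The catch-all pattern can be tried out in isolation. The sketch below re-declares the `__init__` shown in the source listing above as a standalone class, so the hint rendering and the `details` dict can be inspected without installing the toolkit:

```python
from typing import Any, Optional

# Standalone replica of the RPXError.__init__ shown above, for illustration only.
class RPXError(Exception):
    def __init__(
        self,
        message: str,
        *,
        hint: Optional[str] = None,
        details: Optional[dict[str, Any]] = None,
    ) -> None:
        self.message = message
        self.hint = hint
        self.details = dict(details) if details else {}
        full = message
        if hint:
            full = f"{message}\n  hint: {hint}"
        super().__init__(full)

try:
    raise RPXError("manifest missing 'task'", hint="re-run the download", details={"field": "task"})
except RPXError as e:
    rendered = str(e)              # message plus the indented hint line
    offending = e.details["field"]  # structured context survives the catch
```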

BenchmarkableModel(task: TaskType, input_adapter: InputAdapter, model: Any, output_adapter: OutputAdapter, invoker: ModelInvoker = default_invoker, name: str = 'benchmarkable_model', setup_hook: Optional[Callable[[], None]] = None) dataclass

Bases: BenchmarkModel

Compose an input adapter, a model, and an output adapter.

This is the canonical way to plug a model into the RPX benchmark harness. The BenchmarkRunner only ever sees the :class:BenchmarkModel contract (setup, predict); all the model-family-specific logic lives in the adapters so the harness stays task-agnostic.

Example — wrap a HuggingFace depth model::

from rpx_benchmark.adapters.depth_hf import make_hf_depth_model
bm = make_hf_depth_model(
    "depth-anything/Depth-Anything-V2-Metric-Indoor-Large-hf",
    device="cuda",
)
# Hand `bm` to the benchmark runner or the task entrypoint.

Example — wrap a plain numpy callable::

import numpy as np

def my_depth(rgb: np.ndarray) -> np.ndarray:
    h, w = rgb.shape[:2]
    depth_in_metres = np.full((h, w), 1.5, dtype=np.float32)  # your model here
    return depth_in_metres

bm = make_numpy_depth_model(my_depth)

InputAdapter

Bases: Protocol

Sample → model-ready payload.

setup() -> None

Optional one-time setup (e.g., build a processor on first use).

Source code in rpx_benchmark/adapters/base.py
def setup(self) -> None:  # pragma: no cover - optional hook
    """Optional one-time setup (e.g., build a processor on first use)."""

OutputAdapter

Bases: Protocol

Model output → RPX prediction object.

setup() -> None

Optional one-time setup.

Source code in rpx_benchmark/adapters/base.py
def setup(self) -> None:  # pragma: no cover - optional hook
    """Optional one-time setup."""

PreparedInput(payload: Any, context: Dict[str, Any] = dict()) dataclass

Everything a model needs for one sample, plus context for post-processing.

payload is whatever the model's forward call accepts. If it is a dict, the default invoker calls model(**payload); otherwise model(payload).

context is a free-form dict the output adapter receives back. Use it to stash things like target image size, original intrinsics, or any preprocessing metadata the postprocessing step needs.
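The dispatch rule for payload can be sketched with a hypothetical stand-in for the default invoker (`default_invoker_sketch` and `fake_model` below are illustrative names, not toolkit APIs):

```python
# Hedged sketch of the dispatch rule stated above: dict payloads are splatted
# as keyword arguments, anything else is passed positionally.
def default_invoker_sketch(model, payload):
    if isinstance(payload, dict):
        return model(**payload)
    return model(payload)

def fake_model(pixel_values=None):
    # Stand-in for a real forward call.
    return {"num_pixels": len(pixel_values)}

out = default_invoker_sketch(fake_model, {"pixel_values": [0.1, 0.2, 0.3]})
```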

MonocularDepthRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None) dataclass

Bases: TaskRunConfig

Runtime configuration for :func:run_monocular_depth.

Inherits every standard field from :class:TaskRunConfig. Adds no task-specific fields — monocular depth is the "base case" a new user encounters.

Examples:

>>> from rpx_benchmark import MonocularDepthRunConfig
>>> cfg = MonocularDepthRunConfig(
...     hf_checkpoint="depth-anything/Depth-Anything-V2-Metric-Indoor-Small-hf",
...     split="hard",
...     device="cpu",
... )

SegmentationRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None) dataclass

Bases: TaskRunConfig

Runtime configuration for :func:run_segmentation.

See :class:TaskRunConfig for the full field reference.

Raises:

ConfigError
    If zero or more than one model selector is set, if the split is not a valid ESD difficulty, or if batch_size < 1.

ObjectDetectionRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None) dataclass

Bases: TaskRunConfig

Runtime configuration for :func:run_object_detection.

See :class:TaskRunConfig for the shared field reference. Detection currently has no registered model factories, so the only supported selector is model= (a pre-built :class:BenchmarkableModel, typically from :func:rpx_benchmark.make_numpy_detection_model).

VisualGroundingRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None) dataclass

Bases: TaskRunConfig

Runtime configuration for :func:run_visual_grounding.

Grounding models take (rgb, text) and return the referred bounding box. The referring expression is plucked from sample.ground_truth.text by the adapter so the model never sees the GT boxes.

RelativePoseRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None) dataclass

Bases: TaskRunConfig

Runtime configuration for :func:run_relative_pose.

Pose models receive two RGB frames (rgb_a and rgb_b plucked from sample.metadata by the adapter) and return the predicted rotation + translation from frame A to frame B.

KeypointMatchingRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None) dataclass

Bases: TaskRunConfig

Runtime configuration for :func:run_keypoint_matching.

Matching models receive two RGB frames (rgb_a + rgb_b) and return corresponding points in each image's pixel grid.

SparseDepthRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None) dataclass

Bases: TaskRunConfig

Runtime configuration for :func:run_sparse_depth.

Sparse-depth models receive (rgb, coordinates) where coordinates is the (N, 2) float array of pixel locations where depth is queried, and return an (N,) array of depth values in metres at those exact coordinates.
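A minimal sketch of that callable contract (`constant_depth` is a hypothetical baseline, not part of the toolkit):

```python
import numpy as np

def constant_depth(rgb: np.ndarray, coordinates: np.ndarray) -> np.ndarray:
    # Trivially predict 1.0 m at every queried pixel location.
    return np.full(coordinates.shape[0], 1.0, dtype=np.float32)

rgb = np.zeros((480, 640, 3), dtype=np.uint8)
coords = np.array([[100.5, 200.0], [10.0, 20.0]], dtype=np.float32)  # (N, 2)
depths = constant_depth(rgb, coords)                                 # (N,)
```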

NovelViewSynthesisRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None) dataclass

Bases: TaskRunConfig

Runtime configuration for :func:run_novel_view_synthesis.

NVS models receive (rgb_source, target_pose) where the target pose is a 4×4 SE(3) camera-to-world matrix plucked from sample.ground_truth.camera_pose by the adapter. They return a synthesised RGB image at the target viewpoint.

compute_esd(per_sample_metrics: List[Dict[str, float]], per_sample_difficulties: List[Difficulty | None], metric_key: str) -> ESDResult

Compute per-difficulty metric averages from per-sample results.

Args:
    per_sample_metrics: list of metric dicts, one per sample.
    per_sample_difficulties: difficulty label per sample (may be None).
    metric_key: which metric key to stratify (e.g. "absrel", "miou").

Returns:
    ESDResult with easy/medium/hard averages.

Source code in rpx_benchmark/deployment.py
def compute_esd(
    per_sample_metrics: List[Dict[str, float]],
    per_sample_difficulties: List[Difficulty | None],
    metric_key: str,
) -> ESDResult:
    """Compute per-difficulty metric averages from per-sample results.

    Args:
        per_sample_metrics: list of metric dicts, one per sample.
        per_sample_difficulties: difficulty label per sample (may be None).
        metric_key: which metric key to stratify (e.g. "absrel", "miou").

    Returns:
        ESDResult with easy/medium/hard averages.
    """
    buckets: Dict[Difficulty, List[float]] = {d: [] for d in Difficulty}

    for metrics, diff in zip(per_sample_metrics, per_sample_difficulties):
        if diff is None or metric_key not in metrics:
            continue
        buckets[diff].append(metrics[metric_key])

    def mean_or_none(vals: List[float]) -> float | None:
        return float(np.mean(vals)) if vals else None

    return ESDResult(
        easy=mean_or_none(buckets[Difficulty.EASY]),
        medium=mean_or_none(buckets[Difficulty.MEDIUM]),
        hard=mean_or_none(buckets[Difficulty.HARD]),
        metric_key=metric_key,
    )
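The bucketing behaviour can be illustrated with plain strings standing in for the Difficulty enum (the sample values are made up):

```python
import numpy as np

per_sample_metrics = [{"absrel": 0.05}, {"absrel": 0.08}, {"absrel": 0.20}, {}]
per_sample_difficulties = ["easy", "easy", "hard", "medium"]

buckets = {"easy": [], "medium": [], "hard": []}
for m, d in zip(per_sample_metrics, per_sample_difficulties):
    if d is None or "absrel" not in m:
        continue  # samples without a label or without the metric are skipped
    buckets[d].append(m["absrel"])

averages = {k: (float(np.mean(v)) if v else None) for k, v in buckets.items()}
# easy = mean(0.05, 0.08) = 0.065; medium has no scored samples, so None
```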

compute_sgc(pred_masks: Sequence[np.ndarray], pred_depths: Sequence[np.ndarray], depth_gradient_threshold: float = 0.1, boundary_dilation: int = 2) -> StackGeometricCoherenceResult

Stack-Level Geometric Coherence: boundary F-score between mask and depth edges.

SGC = F-score(boundary(mask), boundary(depth_gradient > τ))

A high SGC means segmentation boundaries are geometrically consistent with the depth discontinuities — indicating the model perceives coherent surfaces.

Args:
    pred_masks: sequence of predicted segmentation masks (H×W int).
    pred_depths: sequence of predicted depth maps (H×W float32, metres).
    depth_gradient_threshold: τ for depth gradient thresholding.
    boundary_dilation: pixel tolerance for boundary matching.

Source code in rpx_benchmark/deployment.py
def compute_sgc(
    pred_masks: Sequence[np.ndarray],
    pred_depths: Sequence[np.ndarray],
    depth_gradient_threshold: float = 0.1,
    boundary_dilation: int = 2,
) -> StackGeometricCoherenceResult:
    """Stack-Level Geometric Coherence: boundary F-score between mask and depth edges.

    SGC = F-score(boundary(mask), boundary(depth_gradient > τ))

    A high SGC means segmentation boundaries are geometrically consistent with
    the depth discontinuities — indicating the model perceives coherent surfaces.

    Args:
        pred_masks: sequence of predicted segmentation masks (H×W int).
        pred_depths: sequence of predicted depth maps (H×W float32, metres).
        depth_gradient_threshold: τ for depth gradient thresholding.
        boundary_dilation: pixel tolerance for boundary matching.
    """
    if len(pred_masks) == 0:
        return StackGeometricCoherenceResult(sgc_score=0.0, precision=0.0, recall=0.0, num_samples=0)

    precisions, recalls = [], []
    for mask, depth in zip(pred_masks, pred_depths):
        mask = np.asarray(mask, dtype=np.int32)
        depth = np.asarray(depth, dtype=np.float32)

        mask_boundary = _extract_boundary(mask, dilation=boundary_dilation)
        depth_boundary = _extract_depth_boundary(depth, threshold=depth_gradient_threshold,
                                                 dilation=boundary_dilation)

        tp = float((mask_boundary & depth_boundary).sum())
        fp = float((mask_boundary & ~depth_boundary).sum())
        fn = float((~mask_boundary & depth_boundary).sum())

        p = tp / (tp + fp) if (tp + fp) > 0 else 1.0
        r = tp / (tp + fn) if (tp + fn) > 0 else 1.0
        precisions.append(p)
        recalls.append(r)

    mean_p = float(np.mean(precisions))
    mean_r = float(np.mean(recalls))
    f1 = (2 * mean_p * mean_r) / (mean_p + mean_r) if (mean_p + mean_r) > 0 else 0.0

    return StackGeometricCoherenceResult(
        sgc_score=f1,
        precision=mean_p,
        recall=mean_r,
        num_samples=len(pred_masks),
    )
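A toy pair of boundary maps shows the per-sample precision/recall that the function averages (the 3×3 arrays are made up):

```python
import numpy as np

mask_boundary  = np.array([[0, 1, 0], [0, 1, 0], [0, 1, 0]], dtype=bool)
depth_boundary = np.array([[0, 1, 0], [0, 1, 1], [0, 0, 0]], dtype=bool)

tp = float((mask_boundary & depth_boundary).sum())   # 2 boundary pixels agree
fp = float((mask_boundary & ~depth_boundary).sum())  # 1 mask-only pixel
fn = float((~mask_boundary & depth_boundary).sum())  # 1 depth-only pixel

p = tp / (tp + fp)        # 2/3
r = tp / (tp + fn)        # 2/3
f1 = 2 * p * r / (p + r)  # 2/3: the per-pair SGC contribution
```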

compute_str(phase_scores: Dict[Phase, float]) -> StateTransitionRobustnessResult

Compute STR from per-phase aggregated scores.

Args:
    phase_scores: dict mapping Phase → scalar metric value.

Source code in rpx_benchmark/deployment.py
def compute_str(
    phase_scores: Dict[Phase, float],
) -> StateTransitionRobustnessResult:
    """Compute STR from per-phase aggregated scores.

    Args:
        phase_scores: dict mapping Phase → scalar metric value.
    """
    m_c = phase_scores.get(Phase.CLUTTER, 0.0)
    m_i = phase_scores.get(Phase.INTERACTION, 0.0)
    m_l = phase_scores.get(Phase.CLEAN, 0.0)
    return StateTransitionRobustnessResult(
        str_c_to_i=m_i - m_c,
        str_i_to_l=m_l - m_i,
        metric_clutter=m_c,
        metric_interaction=m_i,
        metric_clean=m_l,
    )
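STR is just the pair of differences between per-phase scores; with hypothetical mIoU values:

```python
# Hypothetical per-phase mIoU scores.
phase_scores = {"clutter": 0.62, "interaction": 0.48, "clean": 0.71}

# Negative delta: degradation under interaction; positive: recovery in clean.
str_c_to_i = phase_scores["interaction"] - phase_scores["clutter"]
str_i_to_l = phase_scores["clean"] - phase_scores["interaction"]
```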

compute_temporal_stability_depth(pred_depths: Sequence[np.ndarray], camera_poses: Sequence[np.ndarray | None]) -> TemporalStabilityResult

TS_depth = E[||D_t − warp(D_{t+1}, ΔT)||_1 / N_valid].

Normalised to [0,1] by dividing by the max depth range to give a higher-is-better stability score (TS = 1 − normalised_L1).

Source code in rpx_benchmark/deployment.py
def compute_temporal_stability_depth(
    pred_depths: Sequence[np.ndarray],
    camera_poses: Sequence[np.ndarray | None],
) -> TemporalStabilityResult:
    """TS_depth = E[||D_t − warp(D_{t+1}, ΔT)||_1 / N_valid].

    Normalised to [0,1] by dividing by the max depth range to give a
    higher-is-better stability score (TS = 1 − normalised_L1).
    """
    if len(pred_depths) < 2:
        return TemporalStabilityResult(ts_score=1.0, num_pairs=0)

    per_pair = []
    for t in range(len(pred_depths) - 1):
        d_t = np.asarray(pred_depths[t], dtype=np.float32)
        d_t1 = np.asarray(pred_depths[t + 1], dtype=np.float32)

        if camera_poses[t] is not None and camera_poses[t + 1] is not None:
            d_t1_warped = _warp_depth_approx(d_t1, camera_poses[t], camera_poses[t + 1])
        else:
            d_t1_warped = d_t1

        valid = (d_t > 0) & (d_t1_warped > 0)
        if valid.sum() == 0:
            per_pair.append(1.0)
            continue
        l1 = float(np.abs(d_t[valid] - d_t1_warped[valid]).mean())
        depth_range = max(float(d_t[valid].max() - d_t[valid].min()), 1e-3)
        ts = max(0.0, 1.0 - l1 / depth_range)
        per_pair.append(ts)

    return TemporalStabilityResult(
        ts_score=float(np.mean(per_pair)),
        num_pairs=len(per_pair),
        per_pair=per_pair,
    )
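One frame pair, without the pose warp, illustrates the normalisation (the 2×2 depth maps are made up; 0.0 marks invalid depth):

```python
import numpy as np

d_t  = np.array([[1.0, 2.0], [3.0, 0.0]], dtype=np.float32)
d_t1 = np.array([[1.1, 2.0], [2.8, 0.0]], dtype=np.float32)

valid = (d_t > 0) & (d_t1 > 0)                       # 3 valid pixels
l1 = float(np.abs(d_t[valid] - d_t1[valid]).mean())  # (0.1 + 0.0 + 0.2) / 3
depth_range = max(float(d_t[valid].max() - d_t[valid].min()), 1e-3)  # 2.0
ts = max(0.0, 1.0 - l1 / depth_range)                # roughly 0.95
```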

compute_temporal_stability_seg(pred_masks: Sequence[np.ndarray], camera_poses: Sequence[np.ndarray | None]) -> TemporalStabilityResult

TS_seg = E[IoU(P_t, warp(P_{t+1}, ΔT))].

When T265 pose data is available, we use the relative rotation to compensate for camera motion before computing IoU between adjacent frames. Without pixel-accurate warping (which requires depth for backprojection), we apply a simplified affine proxy using the in-plane rotation component only.

This gives a conservative lower-bound TS_seg that is still a meaningful stability signal when scenes have modest depth variation.

Args:
    pred_masks: sequence of predicted segmentation masks (H×W int).
    camera_poses: per-frame 4×4 SE(3) matrices (camera-to-world), or None.

Returns:
    TemporalStabilityResult.

Source code in rpx_benchmark/deployment.py
def compute_temporal_stability_seg(
    pred_masks: Sequence[np.ndarray],
    camera_poses: Sequence[np.ndarray | None],
) -> TemporalStabilityResult:
    """TS_seg = E[IoU(P_t, warp(P_{t+1}, ΔT))].

    When T265 pose data is available, we use the relative rotation to compensate
    for camera motion before computing IoU between adjacent frames.  Without
    pixel-accurate warping (which requires depth for backprojection), we apply
    a simplified affine proxy using the in-plane rotation component only.

    This gives a conservative lower-bound TS_seg that is still a meaningful
    stability signal when scenes have modest depth variation.

    Args:
        pred_masks: sequence of predicted segmentation masks (H×W int).
        camera_poses: per-frame 4×4 SE(3) matrices (camera-to-world), or None.

    Returns:
        TemporalStabilityResult.
    """
    if len(pred_masks) < 2:
        return TemporalStabilityResult(ts_score=1.0, num_pairs=0)

    per_pair = []
    for t in range(len(pred_masks) - 1):
        m_t = np.asarray(pred_masks[t], dtype=np.int32)
        m_t1 = np.asarray(pred_masks[t + 1], dtype=np.int32)

        # Attempt pose-compensated warp if poses are available
        if camera_poses[t] is not None and camera_poses[t + 1] is not None:
            m_t1_warped = _warp_mask_approx(m_t1, camera_poses[t], camera_poses[t + 1])
        else:
            m_t1_warped = m_t1

        # Per-class IoU then mean
        classes = np.unique(np.concatenate([m_t.flatten(), m_t1_warped.flatten()]))
        classes = classes[classes >= 0]
        if len(classes) == 0:
            per_pair.append(1.0)
            continue
        ious = []
        for c in classes:
            inter = float(((m_t == c) & (m_t1_warped == c)).sum())
            union = float(((m_t == c) | (m_t1_warped == c)).sum())
            ious.append(inter / union if union > 0 else 1.0)
        per_pair.append(float(np.mean(ious)))

    return TemporalStabilityResult(
        ts_score=float(np.mean(per_pair)),
        num_pairs=len(per_pair),
        per_pair=per_pair,
    )
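The per-class IoU averaging for one frame pair, on made-up 2×2 masks:

```python
import numpy as np

m_t  = np.array([[1, 1], [2, 2]])
m_t1 = np.array([[1, 1], [2, 0]])  # class 2 shrank; background (0) appeared

classes = np.unique(np.concatenate([m_t.ravel(), m_t1.ravel()]))
ious = []
for c in classes:
    inter = float(((m_t == c) & (m_t1 == c)).sum())
    union = float(((m_t == c) | (m_t1 == c)).sum())
    ious.append(inter / union if union > 0 else 1.0)

pair_iou = float(np.mean(ious))  # mean of IoU(0)=0, IoU(1)=1, IoU(2)=0.5
```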

compute_weighted_phase_score(per_sample_metrics: List[Dict[str, float]], per_sample_phases: List[Phase | None], per_sample_difficulties: List[Difficulty | None], metric_key: str) -> WeightedPhaseScore

Compute the full weighted phase scoring table.

Groups samples by (phase, difficulty) and computes: S_p = 0.25·M(p,Easy) + 0.35·M(p,Medium) + 0.40·M(p,Hard) for each phase, then overall score and transition deltas.

Source code in rpx_benchmark/deployment.py
def compute_weighted_phase_score(
    per_sample_metrics: List[Dict[str, float]],
    per_sample_phases: List[Phase | None],
    per_sample_difficulties: List[Difficulty | None],
    metric_key: str,
) -> WeightedPhaseScore:
    """Compute the full weighted phase scoring table.

    Groups samples by (phase, difficulty) and computes:
        S_p = 0.25·M(p,Easy) + 0.35·M(p,Medium) + 0.40·M(p,Hard)
    for each phase, then overall score and transition deltas.
    """
    phase_sample_metrics: Dict[Phase, Tuple[List, List]] = {
        p: ([], []) for p in Phase
    }

    for m, ph, diff in zip(per_sample_metrics, per_sample_phases, per_sample_difficulties):
        if ph is None:
            continue
        phase_sample_metrics[ph][0].append(m)
        phase_sample_metrics[ph][1].append(diff)

    def esd_for_phase(ph: Phase) -> ESDResult:
        metrics_list, diff_list = phase_sample_metrics[ph]
        return compute_esd(metrics_list, diff_list, metric_key)

    return WeightedPhaseScore(
        clutter=esd_for_phase(Phase.CLUTTER),
        interaction=esd_for_phase(Phase.INTERACTION),
        clean=esd_for_phase(Phase.CLEAN),
    )
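The per-phase weighting itself is a fixed convex combination of the ESD averages; with hypothetical values for one phase:

```python
# Hypothetical easy/medium/hard averages for one phase.
m_easy, m_medium, m_hard = 0.80, 0.65, 0.40

# S_p = 0.25*Easy + 0.35*Medium + 0.40*Hard, as in the docstring above.
s_p = 0.25 * m_easy + 0.35 * m_medium + 0.40 * m_hard
# 0.20 + 0.2275 + 0.16 = 0.5875 (hard samples carry the largest weight)
```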

profile_model(model: Any, input_shape: Tuple[int, ...] = (3, 480, 640), device: str = 'cpu', model_type: str = 'local', notes: str = '') -> EfficiencyMetadata

Auto-profile a model and return EfficiencyMetadata.

Args:
    model: a model object (PyTorch nn.Module recommended).
    input_shape: (C, H, W) for FLOPs counting. Default 640×480 RGB.
    device: device for dummy input tensor.
    model_type: "local" or "api".
    notes: free-text notes (e.g. "ViT-L/14, FP16 inference").

Returns:
    EfficiencyMetadata with params_m and flops_g filled where possible.

Source code in rpx_benchmark/profiler.py
def profile_model(
    model: Any,
    input_shape: Tuple[int, ...] = (3, 480, 640),
    device: str = "cpu",
    model_type: str = "local",
    notes: str = "",
) -> EfficiencyMetadata:
    """Auto-profile a model and return EfficiencyMetadata.

    Args:
        model: a model object (PyTorch nn.Module recommended).
        input_shape: (C, H, W) for FLOPs counting. Default 640×480 RGB.
        device: device for dummy input tensor.
        model_type: "local" or "api".
        notes: free-text notes (e.g. "ViT-L/14, FP16 inference").

    Returns:
        EfficiencyMetadata with params_m and flops_g filled where possible.
    """
    if model_type == "api":
        return EfficiencyMetadata(model_type="api", notes=notes)

    params_m = count_parameters(model)
    flops_g = count_flops_torch(model, input_shape, device=device)

    return EfficiencyMetadata(
        params_m=params_m,
        flops_g=flops_g,
        model_type=model_type,
        notes=notes,
    )

count_parameters(model: Any) -> float | None

Count trainable parameters in millions.

Supports PyTorch nn.Module and any object with a parameters() method. Returns None if the model type is not supported.

Source code in rpx_benchmark/profiler.py
def count_parameters(model: Any) -> float | None:
    """Count trainable parameters in millions.

    Supports PyTorch ``nn.Module`` and any object with a ``parameters()`` method.
    Returns ``None`` if the model type is not supported.
    """
    try:
        params = sum(p.numel() for p in model.parameters() if p.requires_grad)
        return round(params / 1e6, 3)
    except AttributeError:
        pass

    # JAX / Flax: model may expose a ``params`` pytree
    try:
        import jax
        leaves = jax.tree_util.tree_leaves(model.params)
        params = sum(leaf.size for leaf in leaves)
        return round(params / 1e6, 3)
    except (AttributeError, ImportError):
        pass

    return None
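Because the function relies only on duck typing, the arithmetic can be exercised with a stand-in model (`FakeParam` and `FakeModel` are illustrative, not toolkit classes):

```python
class FakeParam:
    def __init__(self, n: int, trainable: bool = True):
        self._n = n
        self.requires_grad = trainable

    def numel(self) -> int:
        return self._n

class FakeModel:
    def parameters(self):
        return [FakeParam(2_000_000), FakeParam(500_000, trainable=False)]

# Same arithmetic as the torch branch above: frozen parameters are excluded.
params = sum(p.numel() for p in FakeModel().parameters() if p.requires_grad)
params_m = round(params / 1e6, 3)  # 2.0
```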

download_split(task: TaskType | str, split: Difficulty | str, repo_id: str = DEFAULT_REPO_ID, cache_dir: str | Path | None = None, revision: str | None = None, extra_modalities: Sequence[str] | None = None, max_workers: int = 8) -> Path

Download only the files (task, split) needs, return resolved manifest path.

The resolved manifest is a JSON file whose root field points to the local HF snapshot directory, so it can be fed directly to :meth:RPXDataset.from_manifest.

Source code in rpx_benchmark/hub.py
def download_split(
    task: TaskType | str,
    split: Difficulty | str,
    repo_id: str = DEFAULT_REPO_ID,
    cache_dir: str | Path | None = None,
    revision: str | None = None,
    extra_modalities: Sequence[str] | None = None,
    max_workers: int = 8,
) -> Path:
    """Download only the files (task, split) needs, return resolved manifest path.

    The resolved manifest is a JSON file whose ``root`` field points to
    the local HF snapshot directory, so it can be fed directly to
    :meth:`RPXDataset.from_manifest`.
    """
    hf = _hub()
    task_enum = TaskType(task) if isinstance(task, str) and task in TaskType._value2member_map_ else task
    split_enum = Difficulty(split) if isinstance(split, str) else split

    manifest = fetch_manifest(task_enum, split_enum, repo_id, cache_dir, revision)

    pairs = _extract_scene_phase_pairs(manifest)
    if not pairs:
        raise ManifestError(
            f"Manifest {task}/{split} references no scenes; cannot "
            "derive download patterns.",
            hint="This usually means the manifest was generated against "
                 "an empty scene list — re-run the upload script.",
        )

    modalities = list(_modalities_for(task_enum))
    if extra_modalities:
        modalities.extend(extra_modalities)

    allow_patterns = _build_allow_patterns(modalities, pairs)
    allow_patterns.append(_manifest_repo_path(task_enum, split_enum))

    log.info(
        "downloading %d file patterns for task=%s split=%s from %s",
        len(allow_patterns),
        task_enum.value if isinstance(task_enum, TaskType) else task_enum,
        split_enum.value if isinstance(split_enum, Difficulty) else split_enum,
        repo_id,
    )
    try:
        snapshot_root = hf.snapshot_download(
            repo_id=repo_id,
            repo_type=REPO_TYPE,
            allow_patterns=allow_patterns,
            cache_dir=str(cache_dir) if cache_dir else None,
            revision=revision,
            max_workers=max_workers,
        )
    except Exception as e:
        raise DownloadError(
            f"snapshot_download failed for {repo_id}: {e}",
            hint="Rerun with --cache-dir pointing at a writable directory "
                 "or set HF_HUB_OFFLINE=1 to use a prebuilt local cache.",
        ) from e

    resolved = dict(manifest)
    resolved["root"] = str(snapshot_root)
    resolved.setdefault(
        "task",
        task_enum.value if isinstance(task_enum, TaskType) else str(task_enum),
    )

    task_name = task_enum.value if isinstance(task_enum, TaskType) else str(task_enum)
    out_dir = _rpx_cache_dir() / repo_id.replace("/", "__") / "manifests" / task_name
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / f"{split_enum.value if isinstance(split_enum, Difficulty) else split_enum}.json"
    with out_path.open("w", encoding="utf-8") as f:
        json.dump(resolved, f)
    return out_path

fetch_manifest(task: TaskType | str, split: Difficulty | str, repo_id: str = DEFAULT_REPO_ID, cache_dir: str | Path | None = None, revision: str | None = None) -> Dict[str, Any]

Download and parse the task-level manifest for (task, split).

Manifests are small (hundreds of KB) and are fetched eagerly so the caller can discover which (scene, phase) dirs the split references before kicking off a bulk download.

Parameters:

task (TaskType or str, required)
split (Difficulty or str, required)
repo_id (str, default DEFAULT_REPO_ID)
    HuggingFace dataset repo id. Defaults to :data:DEFAULT_REPO_ID ("IRVLUTD/rpx-benchmark").
cache_dir (str or Path, default None)
revision (str, default None)

Returns:

dict
    Parsed manifest JSON.

Raises:

DownloadError
    If the download fails (network, auth, bad repo id) or the manifest file does not exist on the hub.
ManifestError
    If the downloaded file is not valid JSON.

Source code in rpx_benchmark/hub.py
def fetch_manifest(
    task: TaskType | str,
    split: Difficulty | str,
    repo_id: str = DEFAULT_REPO_ID,
    cache_dir: str | Path | None = None,
    revision: str | None = None,
) -> Dict[str, Any]:
    """Download and parse the task-level manifest for ``(task, split)``.

    Manifests are small (hundreds of KB) and are fetched eagerly so the
    caller can discover which (scene, phase) dirs the split references
    before kicking off a bulk download.

    Parameters
    ----------
    task : TaskType or str
    split : Difficulty or str
    repo_id : str
        HuggingFace dataset repo id. Defaults to
        :data:`DEFAULT_REPO_ID` (``"IRVLUTD/rpx-benchmark"``).
    cache_dir : str or Path, optional
    revision : str, optional

    Returns
    -------
    dict
        Parsed manifest JSON.

    Raises
    ------
    DownloadError
        If the download fails (network, auth, bad repo id) or the
        manifest file does not exist on the hub.
    ManifestError
        If the downloaded file is not valid JSON.
    """
    hf = _hub()
    repo_path = _manifest_repo_path(task, split)
    try:
        local = hf.hf_hub_download(
            repo_id=repo_id,
            repo_type=REPO_TYPE,
            filename=repo_path,
            cache_dir=str(cache_dir) if cache_dir else None,
            revision=revision,
        )
    except Exception as e:
        raise DownloadError(
            f"Failed to download manifest {repo_path!r} from {repo_id}: {e}",
            hint=(
                "Check your network connection and that the repo id is "
                "spelled correctly. Private repos need HF_TOKEN set."
            ),
        ) from e
    try:
        with open(local, "r", encoding="utf-8") as f:
            return json.load(f)
    except json.JSONDecodeError as e:
        raise ManifestError(
            f"Manifest file at {local} is not valid JSON.",
        ) from e

load(task: TaskType | str, split: Difficulty | str, repo_id: str = DEFAULT_REPO_ID, cache_dir: str | Path | None = None, revision: str | None = None, batch_size: int = 1) -> RPXDataset

Download (task, split) and return an iterable :class:RPXDataset.

Incremental re-use::

# First run: fetches rgb + depth for 'hard' scenes.
depth_ds = rpx.load("monocular_depth", "hard")

# Second run: rgb/depth already cached, only spatial_qa.json fetched.
qa_ds    = rpx.load("visual_grounding", "hard")
Source code in rpx_benchmark/hub.py
def load(
    task: TaskType | str,
    split: Difficulty | str,
    repo_id: str = DEFAULT_REPO_ID,
    cache_dir: str | Path | None = None,
    revision: str | None = None,
    batch_size: int = 1,
) -> RPXDataset:
    """Download (task, split) and return an iterable :class:`RPXDataset`.

    Incremental re-use::

        # First run: fetches rgb + depth for 'hard' scenes.
        depth_ds = rpx.load("monocular_depth", "hard")

        # Second run: rgb/depth already cached, only spatial_qa.json fetched.
        qa_ds    = rpx.load("visual_grounding", "hard")
    """
    manifest_path = download_split(
        task=task,
        split=split,
        repo_id=repo_id,
        cache_dir=cache_dir,
        revision=revision,
    )
    return RPXDataset.from_manifest(manifest_path, batch_size=batch_size)

mount(repo_id: str = DEFAULT_REPO_ID)

Return an HfFileSystem rooted at the RPX repo for lazy browsing.

Each read goes over the network; prefer :func:load for real workloads.

Source code in rpx_benchmark/hub.py
def mount(repo_id: str = DEFAULT_REPO_ID):
    """Return an ``HfFileSystem`` rooted at the RPX repo for lazy browsing.

    Each read goes over the network; prefer :func:`load` for real workloads.
    """
    hf = _hub()
    fs = hf.HfFileSystem()
    return fs, f"datasets/{repo_id}"

show_banner(*, context: Optional[str] = None, subtitle: Optional[str] = None, enabled: Optional[bool] = None, file: Optional[TextIO] = None) -> None

Print the RPX startup banner to a stream (default sys.stderr).

Parameters:

- context (str, optional): Short status line appended under the links (e.g. "task=monocular_depth split=hard device=cuda"). Default: None.
- subtitle (str, optional): Secondary line directly under the RPX tagline. Use this for the current script name or a per-operation label. Default: None.
- enabled (bool, optional): Force banner on or off. When None (default), the banner prints unless RPX_NO_BANNER is set in the environment.
- file (file-like, optional): Output stream. Defaults to :data:sys.stderr so the banner does not pollute stdout-parsing pipelines.

Examples:

>>> from rpx_benchmark.banner import show_banner
>>> show_banner(context="my smoke run")
Source code in rpx_benchmark/banner.py
def show_banner(
    *,
    context: Optional[str] = None,
    subtitle: Optional[str] = None,
    enabled: Optional[bool] = None,
    file: Optional[TextIO] = None,
) -> None:
    """Print the RPX startup banner to a stream (default ``sys.stderr``).

    Parameters
    ----------
    context : str, optional
        Short status line appended under the links (e.g.
        ``"task=monocular_depth  split=hard  device=cuda"``).
    subtitle : str, optional
        Secondary line directly under the RPX tagline. Use this for
        the current script name or a per-operation label.
    enabled : bool, optional
        Force banner on or off. When ``None`` (default), the banner
        prints unless ``RPX_NO_BANNER`` is set in the environment.
    file : file-like, optional
        Output stream. Defaults to :data:`sys.stderr` so the banner
        does not pollute ``stdout``-parsing pipelines.

    Examples
    --------
    >>> from rpx_benchmark.banner import show_banner
    >>> show_banner(context="my smoke run")  # doctest: +SKIP
    """
    if not _should_show(enabled):
        return
    target = file if file is not None else sys.stderr

    try:
        _render_rich(target, context=context, subtitle=subtitle)
    except ImportError:
        _render_plain(target, context=context, subtitle=subtitle)
    except Exception as e:  # pragma: no cover — never break the caller
        log.debug("banner render failed (%s); falling back to plain", e)
        _render_plain(target, context=context, subtitle=subtitle)
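The enabled / RPX_NO_BANNER gate described above can be sketched as a standalone helper. The real `_should_show` internals are not shown in this page, so the function body below is an assumption that mirrors only the documented behaviour:

```python
import os

def should_show(enabled=None):
    # Explicit flag wins; otherwise the RPX_NO_BANNER env var
    # suppresses the banner (documented behaviour; internals assumed).
    if enabled is not None:
        return bool(enabled)
    return "RPX_NO_BANNER" not in os.environ
```

This keeps the banner suppressible from CI (`RPX_NO_BANNER=1`) without touching call sites.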

configure_logging(level: str | int = 'INFO', *, force: bool = False, use_rich: Optional[bool] = None) -> logging.Logger

Install a handler on the root rpx_benchmark logger.

Safe to call multiple times: subsequent calls are no-ops unless force=True. The CLI calls this once at the start of main; library users can call it from their own entrypoint.

Parameters:

- level (str or int): Logging level name ("DEBUG", "INFO", "WARNING", "ERROR") or numeric level. Can be overridden at runtime via the RPX_LOG_LEVEL environment variable. Default: 'INFO'.
- force (bool): If True, re-install the handler even if one is already present. Use with care — multiple handlers cause duplicate output. Default: False.
- use_rich (bool, optional): Force rich or plain handler selection. When omitted, auto-detects: rich if the rich package is importable and stderr is a TTY, otherwise plain. Default: None.

Returns:

- logging.Logger: The configured root logger, for chaining.

Examples:

>>> from rpx_benchmark.logging_utils import configure_logging
>>> log = configure_logging("DEBUG")
>>> log.info("benchmark starting")
Source code in rpx_benchmark/logging_utils.py
def configure_logging(
    level: str | int = "INFO",
    *,
    force: bool = False,
    use_rich: Optional[bool] = None,
) -> logging.Logger:
    """Install a handler on the root ``rpx_benchmark`` logger.

    Safe to call multiple times: subsequent calls are no-ops unless
    ``force=True``. The CLI calls this once at the start of ``main``;
    library users can call it from their own entrypoint.

    Parameters
    ----------
    level : str or int
        Logging level name (``"DEBUG"``, ``"INFO"``, ``"WARNING"``,
        ``"ERROR"``) or numeric level. Can be overridden at runtime
        via the ``RPX_LOG_LEVEL`` environment variable.
    force : bool
        If True, re-install the handler even if one is already
        present. Use with care — multiple handlers cause duplicate
        output.
    use_rich : bool, optional
        Force rich or plain handler selection. When omitted, auto-
        detects: rich if the ``rich`` package is importable and stderr
        is a TTY, otherwise plain.

    Returns
    -------
    logging.Logger
        The configured root logger, for chaining.

    Examples
    --------
    >>> from rpx_benchmark.logging_utils import configure_logging
    >>> log = configure_logging("DEBUG")  # doctest: +SKIP
    >>> log.info("benchmark starting")    # doctest: +SKIP
    """
    global _CONFIGURED

    env_level = os.environ.get("RPX_LOG_LEVEL")
    if env_level:
        level = env_level

    if isinstance(level, str):
        level_value = logging.getLevelName(level.upper())
        if not isinstance(level_value, int):
            # Unknown level name -> fall back to INFO.
            level_value = logging.INFO
    else:
        level_value = int(level)

    root = logging.getLogger(_ROOT_LOGGER_NAME)
    root.setLevel(level_value)
    root.propagate = False

    if _CONFIGURED and not force:
        return root

    # Wipe any prior handlers so re-configuration is idempotent.
    for h in list(root.handlers):
        root.removeHandler(h)

    handler = _build_handler(level_value, use_rich=use_rich)
    root.addHandler(handler)
    _CONFIGURED = True
    return root
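The level-name resolution above leans on a stdlib quirk: `logging.getLevelName` returns an int for known level names but a string like `"Level VERBOSE"` for unknown ones, which is why the code checks `isinstance(level_value, int)` before falling back. A minimal standalone check:

```python
import logging

known = logging.getLevelName("DEBUG")      # known names map to an int
unknown = logging.getLevelName("VERBOSE")  # unknown names come back as a str

# The isinstance check above is what separates the two cases.
level_value = known if isinstance(known, int) else logging.INFO
```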

get_logger(name: str) -> logging.Logger

Return a module-scoped logger nested under the rpx_benchmark root.

Parameters:

- name (str, required): Usually __name__. If it starts with rpx_benchmark it is returned as-is; otherwise it is attached as a child of the root logger so external integrations can still funnel output.

Returns:

- logging.Logger: A logger ready to use. Call log.info(...) etc.

Examples:

>>> from rpx_benchmark.logging_utils import get_logger
>>> log = get_logger("rpx_benchmark.hub")
>>> log.name
'rpx_benchmark.hub'
Source code in rpx_benchmark/logging_utils.py
def get_logger(name: str) -> logging.Logger:
    """Return a module-scoped logger nested under the ``rpx_benchmark`` root.

    Parameters
    ----------
    name : str
        Usually ``__name__``. If it starts with ``rpx_benchmark`` it is
        returned as-is; otherwise it is attached as a child of the root
        logger so external integrations can still funnel output.

    Returns
    -------
    logging.Logger
        A logger ready to use. Call ``log.info(...)`` etc.

    Examples
    --------
    >>> from rpx_benchmark.logging_utils import get_logger
    >>> log = get_logger("rpx_benchmark.hub")
    >>> log.name
    'rpx_benchmark.hub'
    """
    if not name:
        name = _ROOT_LOGGER_NAME
    if name == _ROOT_LOGGER_NAME or name.startswith(_ROOT_LOGGER_NAME + "."):
        return logging.getLogger(name)
    return logging.getLogger(f"{_ROOT_LOGGER_NAME}.{name}")
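Because every returned logger lives under the `rpx_benchmark` root, records propagate to the single handler installed by `configure_logging`. The parent/child wiring this relies on is plain stdlib behaviour:

```python
import logging

root = logging.getLogger("rpx_benchmark")
child = logging.getLogger("rpx_benchmark.hub")

# Child loggers delegate records to the root's handlers via the
# dotted-name hierarchy; no per-module handler is needed.
assert child.parent is root
```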

make_numpy_depth_model(fn: Callable[[np.ndarray], np.ndarray], *, name: str = 'numpy_depth_model') -> BenchmarkableModel

Wrap a plain numpy depth callable as a :class:BenchmarkableModel.

The callable must accept a (H, W, 3) uint8 RGB image and return a (H', W') float metric depth map (in metres). If (H', W') != (H, W), the output is bilinearly resized to match the ground truth.

Parameters:

- fn (callable, required): The depth function. Signature: fn(rgb_uint8) -> depth_float.
- name (str): Display name used in logs and reports. Default: 'numpy_depth_model'.

Examples:

>>> import numpy as np
>>> import rpx_benchmark as rpx
>>> def my_depth(rgb):
...     return np.full(rgb.shape[:2], 2.0, dtype=np.float32)
>>> bm = rpx.make_numpy_depth_model(my_depth, name="mine")
>>> bm.task is rpx.TaskType.MONOCULAR_DEPTH
True
Source code in rpx_benchmark/adapters/base.py
def make_numpy_depth_model(
    fn: Callable[[np.ndarray], np.ndarray],
    *,
    name: str = "numpy_depth_model",
) -> BenchmarkableModel:
    """Wrap a plain numpy depth callable as a :class:`BenchmarkableModel`.

    The callable must accept a ``(H, W, 3) uint8`` RGB image and return a
    ``(H', W') float`` metric depth map (in metres). If ``(H', W') !=
    (H, W)``, the output is bilinearly resized to match the ground truth.

    Parameters
    ----------
    fn : callable
        The depth function. Signature: ``fn(rgb_uint8) -> depth_float``.
    name : str
        Display name used in logs and reports.

    Examples
    --------
    >>> import numpy as np
    >>> import rpx_benchmark as rpx
    >>> def my_depth(rgb):
    ...     return np.full(rgb.shape[:2], 2.0, dtype=np.float32)
    >>> bm = rpx.make_numpy_depth_model(my_depth, name="mine")
    >>> bm.task is rpx.TaskType.MONOCULAR_DEPTH
    True
    """
    return BenchmarkableModel(
        task=TaskType.MONOCULAR_DEPTH,
        input_adapter=_NumpyDepthInput(),
        model=fn,
        output_adapter=_NumpyDepthOutput(),
        invoker=lambda model, payload: model(payload),
        name=name,
    )

make_numpy_detection_model(fn: Callable[[np.ndarray], Any], *, name: str = 'numpy_detection_model', task: TaskType = TaskType.OBJECT_DETECTION) -> BenchmarkableModel

Wrap a plain numpy detection callable as a :class:BenchmarkableModel.

The callable must accept a (H, W, 3) uint8 RGB image and return either a dict with keys "boxes" / "scores" / "labels" or a (boxes, scores, labels) tuple in that order. Boxes are pixel coordinates in (x1, y1, x2, y2) format, scores are floats in [0, 1], labels are strings.

Parameters:

- fn (callable, required): fn(rgb_uint8) -> dict | tuple.
- name (str): Display name for reports. Default: 'numpy_detection_model'.
- task (TaskType): Use :attr:TaskType.OBJECT_DETECTION for closed-vocabulary detection or :attr:TaskType.OPEN_VOCAB_DETECTION for open-vocab (the Prediction contract is the same). Default: OBJECT_DETECTION.

Examples:

>>> import numpy as np
>>> import rpx_benchmark as rpx
>>> def my_det(rgb):
...     return {
...         "boxes": np.array([[10, 10, 30, 30]], dtype=np.float32),
...         "scores": np.array([0.9], dtype=np.float32),
...         "labels": ["cup"],
...     }
>>> bm = rpx.make_numpy_detection_model(my_det)
>>> bm.task is rpx.TaskType.OBJECT_DETECTION
True
Source code in rpx_benchmark/adapters/base.py
def make_numpy_detection_model(
    fn: Callable[[np.ndarray], Any],
    *,
    name: str = "numpy_detection_model",
    task: TaskType = TaskType.OBJECT_DETECTION,
) -> BenchmarkableModel:
    """Wrap a plain numpy detection callable as a :class:`BenchmarkableModel`.

    The callable must accept a ``(H, W, 3) uint8`` RGB image and
    return either a dict with keys ``"boxes"`` / ``"scores"`` /
    ``"labels"`` or a ``(boxes, scores, labels)`` tuple in that
    order. Boxes are pixel coordinates in ``(x1, y1, x2, y2)``
    format, scores are floats in ``[0, 1]``, labels are strings.

    Parameters
    ----------
    fn : callable
        ``fn(rgb_uint8) -> dict | tuple``.
    name : str
        Display name for reports.
    task : TaskType
        Use :attr:`TaskType.OBJECT_DETECTION` for closed-vocabulary
        detection or :attr:`TaskType.OPEN_VOCAB_DETECTION` for
        open-vocab (the Prediction contract is the same).

    Examples
    --------
    >>> import numpy as np
    >>> import rpx_benchmark as rpx
    >>> def my_det(rgb):
    ...     return {
    ...         "boxes": np.array([[10, 10, 30, 30]], dtype=np.float32),
    ...         "scores": np.array([0.9], dtype=np.float32),
    ...         "labels": ["cup"],
    ...     }
    >>> bm = rpx.make_numpy_detection_model(my_det)
    >>> bm.task is rpx.TaskType.OBJECT_DETECTION
    True
    """
    return BenchmarkableModel(
        task=task,
        input_adapter=_NumpyRgbInput(),
        model=fn,
        output_adapter=_NumpyDetectionOutput(),
        invoker=_passthrough_invoker,
        name=name,
    )
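The `(x1, y1, x2, y2)` pixel convention above is what downstream matching metrics assume. A small IoU helper (illustrative only, not the toolkit's own metric code) makes the convention concrete:

```python
def iou_xyxy(a, b):
    """IoU of two (x1, y1, x2, y2) boxes in pixel coordinates."""
    # Intersection rectangle; empty overlaps clamp to zero area.
    x1, y1 = max(a[0], b[0]), max(a[1], b[1])
    x2, y2 = min(a[2], b[2]), min(a[3], b[3])
    inter = max(0.0, x2 - x1) * max(0.0, y2 - y1)
    area_a = (a[2] - a[0]) * (a[3] - a[1])
    area_b = (b[2] - b[0]) * (b[3] - b[1])
    return inter / (area_a + area_b - inter)
```

Identical boxes score 1.0 and disjoint boxes score 0.0, so a mixed-up coordinate order shows up immediately as near-zero IoU against ground truth.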

make_numpy_grounding_model(fn: Callable[[np.ndarray, str], Any], *, name: str = 'numpy_grounding_model') -> BenchmarkableModel

Wrap a visual-grounding callable as a :class:BenchmarkableModel.

The callable takes (rgb_uint8, text) and returns either a dict with keys "boxes" / "scores" or a tuple (boxes, scores). Boxes are (x1, y1, x2, y2) pixel coordinates; scores are floats. The referring expression text is plucked from sample.ground_truth.text by the adapter so the callable never sees the GT boxes.

Examples:

>>> import numpy as np
>>> import rpx_benchmark as rpx
>>> def my_ground(rgb, text):
...     return (
...         np.array([[10, 10, 30, 30]], dtype=np.float32),
...         np.array([0.8], dtype=np.float32),
...     )
>>> bm = rpx.make_numpy_grounding_model(my_ground)
Source code in rpx_benchmark/adapters/base.py
def make_numpy_grounding_model(
    fn: Callable[[np.ndarray, str], Any],
    *,
    name: str = "numpy_grounding_model",
) -> BenchmarkableModel:
    """Wrap a visual-grounding callable as a :class:`BenchmarkableModel`.

    The callable takes ``(rgb_uint8, text)`` and returns either a
    dict with keys ``"boxes"`` / ``"scores"`` or a tuple
    ``(boxes, scores)``. Boxes are ``(x1, y1, x2, y2)`` pixel
    coordinates; scores are floats. The referring expression
    ``text`` is plucked from ``sample.ground_truth.text`` by the
    adapter so the callable never sees the GT boxes.

    Examples
    --------
    >>> import numpy as np
    >>> import rpx_benchmark as rpx
    >>> def my_ground(rgb, text):
    ...     return (
    ...         np.array([[10, 10, 30, 30]], dtype=np.float32),
    ...         np.array([0.8], dtype=np.float32),
    ...     )
    >>> bm = rpx.make_numpy_grounding_model(my_ground)
    """
    return BenchmarkableModel(
        task=TaskType.VISUAL_GROUNDING,
        input_adapter=_NumpyGroundingInput(),
        model=fn,
        output_adapter=_NumpyGroundingOutput(),
        invoker=_grounding_invoker,
        name=name,
    )

make_numpy_keypoint_model(fn: Callable[[np.ndarray, np.ndarray], Any], *, name: str = 'numpy_keypoint_model') -> BenchmarkableModel

Wrap a keypoint-matching callable as a :class:BenchmarkableModel.

The callable takes (rgb_a, rgb_b) and returns either a dict with keys "points0", "points1" and optional "scores" or a 2/3-tuple in the same order. Points are (N, 2) pixel coordinates.

Examples:

>>> import numpy as np
>>> import rpx_benchmark as rpx
>>> def my_matcher(rgb_a, rgb_b):
...     pts = np.array([[10, 10], [20, 20]], dtype=np.float32)
...     return pts, pts
>>> bm = rpx.make_numpy_keypoint_model(my_matcher)
Source code in rpx_benchmark/adapters/base.py
def make_numpy_keypoint_model(
    fn: Callable[[np.ndarray, np.ndarray], Any],
    *,
    name: str = "numpy_keypoint_model",
) -> BenchmarkableModel:
    """Wrap a keypoint-matching callable as a :class:`BenchmarkableModel`.

    The callable takes ``(rgb_a, rgb_b)`` and returns either a dict
    with keys ``"points0"``, ``"points1"`` and optional ``"scores"``
    or a 2/3-tuple in the same order. Points are ``(N, 2)`` pixel
    coordinates.

    Examples
    --------
    >>> import numpy as np
    >>> import rpx_benchmark as rpx
    >>> def my_matcher(rgb_a, rgb_b):
    ...     pts = np.array([[10, 10], [20, 20]], dtype=np.float32)
    ...     return pts, pts
    >>> bm = rpx.make_numpy_keypoint_model(my_matcher)
    """
    return BenchmarkableModel(
        task=TaskType.KEYPOINT_MATCHING,
        input_adapter=_NumpyKeypointInput(),
        model=fn,
        output_adapter=_NumpyKeypointOutput(),
        invoker=_keypoint_invoker,
        name=name,
    )
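The dict-or-tuple contract above can be normalised in a few lines. This is a sketch of the accepted shapes, not the adapter's actual implementation:

```python
import numpy as np

def normalize_keypoint_output(out):
    # Accepts {"points0": ..., "points1": ..., "scores": ...} or a
    # 2/3-tuple in the same order; returns (points0, points1, scores)
    # with scores=None when the model does not provide them.
    if isinstance(out, dict):
        return out["points0"], out["points1"], out.get("scores")
    if len(out) == 2:
        p0, p1 = out
        return p0, p1, None
    p0, p1, scores = out
    return p0, p1, scores
```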

make_numpy_mask_model(fn: Callable[[np.ndarray], np.ndarray], *, name: str = 'numpy_mask_model') -> BenchmarkableModel

Wrap a plain numpy instance-mask callable as a :class:BenchmarkableModel.

The callable must accept a (H, W, 3) uint8 RGB image and return a (H', W') int instance mask where pixel values are instance IDs (0 is conventionally background). If (H', W') != (H, W) the output is nearest-neighbour resized to match the GT mask so integer IDs are preserved.

Parameters:

- fn (callable, required): fn(rgb_uint8) -> mask_int.
- name (str): Display name used in logs and reports. Default: 'numpy_mask_model'.

Examples:

>>> import numpy as np
>>> import rpx_benchmark as rpx
>>> def my_seg(rgb):
...     mask = np.zeros(rgb.shape[:2], dtype=np.int32)
...     mask[rgb.sum(-1) > 384] = 1  # trivial brightness threshold
...     return mask
>>> bm = rpx.make_numpy_mask_model(my_seg, name="mine")
>>> bm.task is rpx.TaskType.OBJECT_SEGMENTATION
True
Source code in rpx_benchmark/adapters/base.py
def make_numpy_mask_model(
    fn: Callable[[np.ndarray], np.ndarray],
    *,
    name: str = "numpy_mask_model",
) -> BenchmarkableModel:
    """Wrap a plain numpy instance-mask callable as a :class:`BenchmarkableModel`.

    The callable must accept a ``(H, W, 3) uint8`` RGB image and return
    a ``(H', W') int`` instance mask where pixel values are instance
    IDs (``0`` is conventionally background). If ``(H', W') != (H, W)``
    the output is nearest-neighbour resized to match the GT mask so
    integer IDs are preserved.

    Parameters
    ----------
    fn : callable
        ``fn(rgb_uint8) -> mask_int``.
    name : str
        Display name used in logs and reports.

    Examples
    --------
    >>> import numpy as np
    >>> import rpx_benchmark as rpx
    >>> def my_seg(rgb):
    ...     mask = np.zeros(rgb.shape[:2], dtype=np.int32)
    ...     mask[rgb.sum(-1) > 384] = 1  # trivial brightness threshold
    ...     return mask
    >>> bm = rpx.make_numpy_mask_model(my_seg, name="mine")
    >>> bm.task is rpx.TaskType.OBJECT_SEGMENTATION
    True
    """
    return BenchmarkableModel(
        task=TaskType.OBJECT_SEGMENTATION,
        input_adapter=_NumpyMaskInput(),
        model=fn,
        output_adapter=_NumpyMaskOutput(),
        invoker=lambda model, payload: model(payload),
        name=name,
    )
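Nearest-neighbour resizing is what keeps integer instance IDs intact (bilinear interpolation would invent fractional IDs at instance boundaries). A numpy-only sketch of the idea, using a simple index mapping:

```python
import numpy as np

def resize_mask_nearest(mask, out_h, out_w):
    # Map each output pixel to its nearest source pixel; values are
    # copied, never interpolated, so instance IDs stay exact integers.
    h, w = mask.shape
    ys = (np.arange(out_h) * h) // out_h
    xs = (np.arange(out_w) * w) // out_w
    return mask[ys[:, None], xs[None, :]]
```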

make_numpy_nvs_model(fn: Callable[[np.ndarray, np.ndarray], np.ndarray], *, name: str = 'numpy_nvs_model') -> BenchmarkableModel

Wrap a novel-view-synthesis callable as a :class:BenchmarkableModel.

The callable takes (rgb_uint8, target_pose) where the target pose is a 4×4 SE(3) camera-to-world matrix (float64). It returns an RGB image for the target viewpoint. Non-uint8 output is clipped and cast; shape mismatches are bilinearly resized.

Examples:

>>> import numpy as np
>>> import rpx_benchmark as rpx
>>> def my_nvs(rgb, target_pose):
...     return rgb  # identity baseline
>>> bm = rpx.make_numpy_nvs_model(my_nvs)
Source code in rpx_benchmark/adapters/base.py
def make_numpy_nvs_model(
    fn: Callable[[np.ndarray, np.ndarray], np.ndarray],
    *,
    name: str = "numpy_nvs_model",
) -> BenchmarkableModel:
    """Wrap a novel-view-synthesis callable as a :class:`BenchmarkableModel`.

    The callable takes ``(rgb_uint8, target_pose)`` where the target
    pose is a 4×4 SE(3) camera-to-world matrix (float64). It
    returns an RGB image for the target viewpoint. Non-uint8 output
    is clipped and cast; shape mismatches are bilinearly resized.

    Examples
    --------
    >>> import numpy as np
    >>> import rpx_benchmark as rpx
    >>> def my_nvs(rgb, target_pose):
    ...     return rgb  # identity baseline
    >>> bm = rpx.make_numpy_nvs_model(my_nvs)
    """
    return BenchmarkableModel(
        task=TaskType.NOVEL_VIEW_SYNTHESIS,
        input_adapter=_NumpyNVSInput(),
        model=fn,
        output_adapter=_NumpyNVSOutput(),
        invoker=_nvs_invoker,
        name=name,
    )
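Since the target pose is a 4×4 SE(3) camera-to-world matrix, a quick sanity check before rendering can catch malformed poses early. A sketch (the tolerance is illustrative):

```python
import numpy as np

def is_se3(T, atol=1e-6):
    # A valid SE(3) matrix has a proper rotation block (orthonormal,
    # det +1), an arbitrary translation column, and a fixed
    # (0, 0, 0, 1) bottom row.
    if T.shape != (4, 4) or not np.allclose(T[3], [0.0, 0.0, 0.0, 1.0], atol=atol):
        return False
    R = T[:3, :3]
    return bool(
        np.allclose(R @ R.T, np.eye(3), atol=atol)
        and np.isclose(np.linalg.det(R), 1.0, atol=atol)
    )
```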

make_numpy_pose_model(fn: Callable[[np.ndarray, np.ndarray], Any], *, name: str = 'numpy_pose_model') -> BenchmarkableModel

Wrap a relative-camera-pose callable as a :class:BenchmarkableModel.

The callable takes (rgb_a, rgb_b) and returns either a dict with keys "rotation" (3×3 rotation matrix or 4-element quaternion) and "translation" (3-vector, metres) or a (rotation, translation) tuple.

Examples:

>>> import numpy as np
>>> import rpx_benchmark as rpx
>>> def my_pose(rgb_a, rgb_b):
...     return {"rotation": np.eye(3), "translation": np.zeros(3)}
>>> bm = rpx.make_numpy_pose_model(my_pose)
Source code in rpx_benchmark/adapters/base.py
def make_numpy_pose_model(
    fn: Callable[[np.ndarray, np.ndarray], Any],
    *,
    name: str = "numpy_pose_model",
) -> BenchmarkableModel:
    """Wrap a relative-camera-pose callable as a :class:`BenchmarkableModel`.

    The callable takes ``(rgb_a, rgb_b)`` and returns either a dict
    with keys ``"rotation"`` (3×3 rotation matrix or 4-element
    quaternion) and ``"translation"`` (3-vector, metres) or a
    ``(rotation, translation)`` tuple.

    Examples
    --------
    >>> import numpy as np
    >>> import rpx_benchmark as rpx
    >>> def my_pose(rgb_a, rgb_b):
    ...     return {"rotation": np.eye(3), "translation": np.zeros(3)}
    >>> bm = rpx.make_numpy_pose_model(my_pose)
    """
    return BenchmarkableModel(
        task=TaskType.RELATIVE_CAMERA_POSE,
        input_adapter=_NumpyPoseInput(),
        model=fn,
        output_adapter=_NumpyPoseOutput(),
        invoker=_pose_invoker,
        name=name,
    )
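Because the contract accepts either a 3×3 matrix or a 4-element quaternion, a converter is handy when a model emits quaternions. The (w, x, y, z) ordering below is an assumption — the docs do not pin down the convention, so check what your model actually outputs:

```python
import numpy as np

def quat_to_matrix(q):
    # Unit quaternion in (w, x, y, z) order -> 3x3 rotation matrix.
    # The ordering convention is an assumption, not the toolkit's spec.
    w, x, y, z = q / np.linalg.norm(q)
    return np.array([
        [1 - 2 * (y * y + z * z), 2 * (x * y - w * z),     2 * (x * z + w * y)],
        [2 * (x * y + w * z),     1 - 2 * (x * x + z * z), 2 * (y * z - w * x)],
        [2 * (x * z - w * y),     2 * (y * z + w * x),     1 - 2 * (x * x + y * y)],
    ])
```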

make_numpy_sparse_depth_model(fn: Callable[[np.ndarray, np.ndarray], np.ndarray], *, name: str = 'numpy_sparse_depth_model') -> BenchmarkableModel

Wrap a sparse-depth callable as a :class:BenchmarkableModel.

The callable takes (rgb_uint8, coords) where coords is a (N, 2) float32 array of pixel coordinates and returns an (N,) float32 array of depths in metres at those exact coordinates.

Examples:

>>> import numpy as np
>>> import rpx_benchmark as rpx
>>> def my_sd(rgb, coords):
...     return np.full(len(coords), 2.0, dtype=np.float32)
>>> bm = rpx.make_numpy_sparse_depth_model(my_sd)
Source code in rpx_benchmark/adapters/base.py
def make_numpy_sparse_depth_model(
    fn: Callable[[np.ndarray, np.ndarray], np.ndarray],
    *,
    name: str = "numpy_sparse_depth_model",
) -> BenchmarkableModel:
    """Wrap a sparse-depth callable as a :class:`BenchmarkableModel`.

    The callable takes ``(rgb_uint8, coords)`` where ``coords`` is a
    ``(N, 2)`` float32 array of pixel coordinates and returns an
    ``(N,)`` float32 array of depths in metres at those exact
    coordinates.

    Examples
    --------
    >>> import numpy as np
    >>> import rpx_benchmark as rpx
    >>> def my_sd(rgb, coords):
    ...     return np.full(len(coords), 2.0, dtype=np.float32)
    >>> bm = rpx.make_numpy_sparse_depth_model(my_sd)
    """
    return BenchmarkableModel(
        task=TaskType.SPARSE_DEPTH,
        input_adapter=_NumpySparseDepthInput(),
        model=fn,
        output_adapter=_NumpySparseDepthOutput(),
        invoker=_sparse_depth_invoker,
        name=name,
    )
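A baseline sparse-depth model can simply look a dense prediction up at the requested coordinates. The (x, y) ordering of each coordinate pair is an assumption here — the docs above only say `(N, 2)` pixel coordinates:

```python
import numpy as np

def sample_depth(dense_depth, coords):
    # Nearest-pixel lookup of a dense (H, W) depth map at (N, 2) coords,
    # clipped to the image bounds; returns (N,) float32 metres.
    xs = np.clip(np.rint(coords[:, 0]).astype(int), 0, dense_depth.shape[1] - 1)
    ys = np.clip(np.rint(coords[:, 1]).astype(int), 0, dense_depth.shape[0] - 1)
    return dense_depth[ys, xs].astype(np.float32)
```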

make_hf_depth_model(checkpoint: str, *, device: str = 'cuda', dtype: str | None = None, name: str | None = None) -> BenchmarkableModel

One-line factory for any HuggingFace depth-estimation checkpoint.

Parameters:

- checkpoint (str, required): HuggingFace Hub path, e.g. "depth-anything/Depth-Anything-V2-Metric-Indoor-Large-hf".
- device (str): Device string passed to .to(device). Default: 'cuda'.
- dtype (str, optional): One of "float16" / "bfloat16" / "float32". If provided, the model is cast to the matching torch dtype. Default: None.
- name (str, optional): Display name; defaults to the checkpoint id.
Source code in rpx_benchmark/adapters/depth_hf.py
def make_hf_depth_model(
    checkpoint: str,
    *,
    device: str = "cuda",
    dtype: str | None = None,
    name: str | None = None,
) -> BenchmarkableModel:
    """One-line factory for any HuggingFace depth-estimation checkpoint.

    Parameters
    ----------
    checkpoint : str
        HuggingFace Hub path, e.g.
        ``"depth-anything/Depth-Anything-V2-Metric-Indoor-Large-hf"``.
    device : str
        Device string passed to ``.to(device)``.
    dtype : str, optional
        One of ``"float16"`` / ``"bfloat16"`` / ``"float32"``. If provided,
        the model is cast to the matching ``torch`` dtype.
    name : str, optional
        Display name; defaults to the checkpoint id.
    """
    try:
        import torch
        from transformers import AutoImageProcessor, AutoModelForDepthEstimation
    except ImportError as e:  # pragma: no cover - guarded at install time
        raise ImportError(
            "make_hf_depth_model needs torch + transformers. "
            "Install with: pip install 'rpx-benchmark[depth-hf]'"
        ) from e

    processor = AutoImageProcessor.from_pretrained(checkpoint)
    model = AutoModelForDepthEstimation.from_pretrained(checkpoint)

    if dtype is not None:
        torch_dtype = {
            "float16": torch.float16,
            "bfloat16": torch.bfloat16,
            "float32": torch.float32,
        }[dtype]
        model = model.to(dtype=torch_dtype)

    model = model.to(device).eval()

    return BenchmarkableModel(
        task=TaskType.MONOCULAR_DEPTH,
        input_adapter=HFDepthInputAdapter(processor=processor, device=device),
        model=model,
        output_adapter=HFDepthOutputAdapter(processor=processor),
        name=name or checkpoint,
    )

make_hf_instance_seg_model(checkpoint: str, *, device: str = 'cuda', threshold: float = 0.5, name: Optional[str] = None, model_class_hint: Optional[str] = None) -> BenchmarkableModel

One-line factory for a HuggingFace segmentation checkpoint.

Parameters:

- checkpoint (str, required): HuggingFace Hub id (e.g. "facebook/mask2former-swin-tiny-coco-instance").
- device (str): Device string passed to .to(device). Default: 'cuda'.
- threshold (float): Score threshold for instance acceptance (passed to the processor's post-process if it accepts the kwarg). Default: 0.5.
- name (str, optional): Display name. Defaults to checkpoint.
- model_class_hint (str, optional): One of "instance", "universal", "semantic". Most users should leave this as None and rely on AutoModelForUniversalSegmentation (the super-class used by Mask2Former / OneFormer). Only set this if the auto class does not dispatch correctly for your checkpoint.

Raises:

- AdapterError: If the processor exposes no post-process method we can use.
- ImportError: If torch or transformers are not installed.

Source code in rpx_benchmark/adapters/seg_hf.py
def make_hf_instance_seg_model(
    checkpoint: str,
    *,
    device: str = "cuda",
    threshold: float = 0.5,
    name: Optional[str] = None,
    model_class_hint: Optional[str] = None,
) -> BenchmarkableModel:
    """One-line factory for a HuggingFace segmentation checkpoint.

    Parameters
    ----------
    checkpoint : str
        HuggingFace Hub id
        (e.g. ``"facebook/mask2former-swin-tiny-coco-instance"``).
    device : str
    threshold : float
        Score threshold for instance acceptance (passed to the
        processor's post-process if it accepts the kwarg).
    name : str, optional
        Display name. Defaults to ``checkpoint``.
    model_class_hint : str, optional
        One of ``"instance"``, ``"universal"``, ``"semantic"``. Most
        users should leave this as ``None`` and rely on
        ``AutoModelForUniversalSegmentation`` (the super-class used
        by Mask2Former / OneFormer). Only set this if the auto class
        does not dispatch correctly for your checkpoint.

    Raises
    ------
    AdapterError
        If the processor exposes no post-process method we can use.
    ImportError
        If ``torch`` or ``transformers`` are not installed.
    """
    try:
        import torch  # noqa: F401
        from transformers import AutoImageProcessor
    except ImportError as e:  # pragma: no cover
        raise ImportError(
            "make_hf_instance_seg_model needs torch + transformers. "
            "Install with: pip install 'rpx-benchmark[depth-hf]'"
        ) from e

    processor = AutoImageProcessor.from_pretrained(checkpoint)

    model_cls_name = {
        None: "AutoModelForUniversalSegmentation",
        "universal": "AutoModelForUniversalSegmentation",
        "instance": "AutoModelForInstanceSegmentation",
        "semantic": "AutoModelForSemanticSegmentation",
    }[model_class_hint]
    import transformers
    try:
        model_cls = getattr(transformers, model_cls_name)
    except AttributeError as e:
        raise AdapterError(
            f"transformers has no class named {model_cls_name!r}.",
        ) from e
    model = model_cls.from_pretrained(checkpoint).to(device).eval()

    return BenchmarkableModel(
        task=TaskType.OBJECT_SEGMENTATION,
        input_adapter=HFInstanceSegInputAdapter(processor=processor, device=device),
        model=model,
        output_adapter=HFInstanceSegOutputAdapter(
            processor=processor, threshold=threshold,
        ),
        name=name or checkpoint,
    )

available_models(include_deferred: bool = False) -> List[str]

Return sorted registered model names.

By default excludes deferred stubs so the CLI's --model choice list is runnable-only. Pass include_deferred=True to list the full intended slate.

Source code in rpx_benchmark/models/registry.py
def available_models(include_deferred: bool = False) -> List[str]:
    """Return sorted registered model names.

    By default excludes deferred stubs so the CLI's ``--model`` choice
    list is runnable-only. Pass ``include_deferred=True`` to list the
    full intended slate.
    """
    names = sorted(MODEL_REGISTRY.keys())
    if include_deferred:
        return names
    return [n for n in names if n not in DEFERRED_MODELS]

get_factory(name: str) -> Callable[..., BenchmarkableModel]

Return the factory function registered under name (lazy import).

Parameters:

- name (str, required): Registered model name. Use :func:available_models to list the current slate.

Returns:

- Callable: The factory function. Instantiate the model by calling it with the appropriate device / kwargs.

Raises:

- ConfigError: If name is not in the registry. The error lists every currently registered model so typos are obvious.

Source code in rpx_benchmark/models/registry.py
def get_factory(name: str) -> Callable[..., BenchmarkableModel]:
    """Return the factory function registered under ``name`` (lazy import).

    Parameters
    ----------
    name : str
        Registered model name. Use :func:`available_models` to list
        the current slate.

    Returns
    -------
    Callable
        The factory function. Instantiate the model by calling it
        with the appropriate device / kwargs.

    Raises
    ------
    ConfigError
        If ``name`` is not in the registry. The error lists every
        currently registered model so typos are obvious.
    """
    if name not in MODEL_REGISTRY:
        raise ConfigError(
            f"Unknown model {name!r}.",
            hint=(
                "Registered models: "
                + ", ".join(available_models(include_deferred=True))
            ),
        )
    module_suffix, factory_name = MODEL_REGISTRY[name]
    module = importlib.import_module(f"rpx_benchmark.models.{module_suffix}")
    return getattr(module, factory_name)
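The lazy-import pattern above (import the implementing module only when its name is looked up) can be sketched in a self-contained way. This analogue maps names to stdlib modules purely so it runs anywhere; the real registry maps to modules under `rpx_benchmark.models.*` and raises `ConfigError` rather than `KeyError`:

```python
import importlib

# Simplified analogue of MODEL_REGISTRY: name -> (module, attribute).
REGISTRY = {"loads": ("json", "loads"), "sqrt": ("math", "sqrt")}

def get_factory(name):
    if name not in REGISTRY:
        # Listing the full slate in the error makes typos obvious.
        raise KeyError(
            f"Unknown model {name!r}. Registered: {', '.join(sorted(REGISTRY))}"
        )
    module_name, attr = REGISTRY[name]
    module = importlib.import_module(module_name)  # import deferred until lookup
    return getattr(module, attr)

sqrt = get_factory("sqrt")
print(sqrt(9.0))  # 3.0
```

Deferring the import keeps `import rpx_benchmark` cheap: heavyweight model dependencies are only pulled in for the model actually requested.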

resolve(name: str, *, device: str = 'cuda', **kwargs) -> BenchmarkableModel

Look up name and call the factory with device + extra kwargs.

Source code in rpx_benchmark/models/registry.py
def resolve(name: str, *, device: str = "cuda", **kwargs) -> BenchmarkableModel:
    """Look up ``name`` and call the factory with ``device`` + extra kwargs."""
    factory = get_factory(name)
    return factory(device=device, **kwargs)

run_monocular_depth(cfg: MonocularDepthRunConfig) -> PipelineResult

Run the monocular absolute depth benchmark end-to-end.

Parameters:

cfg : MonocularDepthRunConfig (required)

Returns:

(BenchmarkResult, DeploymentReadinessReport, dict)
    The dict has "json" and "markdown" keys pointing at the written report files.

Raises:

ConfigError, DownloadError, ManifestError, ModelError, MetricError
    Propagated from the respective subsystem with actionable hints.

Examples:

>>> import rpx_benchmark as rpx
>>> import numpy as np
>>> def my_depth(rgb): return np.ones(rgb.shape[:2], dtype=np.float32) * 2.0
>>> bm = rpx.make_numpy_depth_model(my_depth, name="unit")
>>> cfg = rpx.MonocularDepthRunConfig(model=bm, split="hard", device="cpu")
>>> result, report, paths = rpx.run_monocular_depth(cfg)
Source code in rpx_benchmark/tasks/monocular_depth.py
def run_monocular_depth(cfg: MonocularDepthRunConfig) -> PipelineResult:
    """Run the monocular absolute depth benchmark end-to-end.

    Parameters
    ----------
    cfg : MonocularDepthRunConfig

    Returns
    -------
    (BenchmarkResult, DeploymentReadinessReport, dict)
        ``dict`` has ``"json"`` + ``"markdown"`` keys pointing at
        the written report files.

    Raises
    ------
    ConfigError, DownloadError, ManifestError, ModelError, MetricError
        Propagated from the respective subsystem with actionable hints.

    Examples
    --------
    >>> import rpx_benchmark as rpx
    >>> import numpy as np
    >>> def my_depth(rgb): return np.ones(rgb.shape[:2], dtype=np.float32) * 2.0
    >>> bm = rpx.make_numpy_depth_model(my_depth, name="unit")
    >>> cfg = rpx.MonocularDepthRunConfig(model=bm, split="hard", device="cpu")
    >>> result, report, paths = rpx.run_monocular_depth(cfg)  # doctest: +SKIP
    """
    return run_pipeline(
        task=TaskType.MONOCULAR_DEPTH,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=True,
        compute_sgc=False,
    )

run_segmentation(cfg: SegmentationRunConfig) -> PipelineResult

Run the object-segmentation benchmark end-to-end.

Returns the same (BenchmarkResult, DeploymentReadinessReport, {json, markdown}) tuple shape as :func:run_monocular_depth. primary_metric="miou" and higher_is_better=True so the deployment-readiness report interprets deltas accordingly.

Source code in rpx_benchmark/tasks/segmentation.py
def run_segmentation(cfg: SegmentationRunConfig) -> PipelineResult:
    """Run the object-segmentation benchmark end-to-end.

    Returns the same ``(BenchmarkResult, DeploymentReadinessReport,
    {json, markdown})`` tuple shape as :func:`run_monocular_depth`.
    ``primary_metric="miou"`` and ``higher_is_better=True`` so the
    deployment-readiness report interprets deltas accordingly.
    """
    return run_pipeline(
        task=TaskType.OBJECT_SEGMENTATION,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=True,
        compute_sgc=False,
    )

run_object_detection(cfg: ObjectDetectionRunConfig) -> PipelineResult

Run the object-detection benchmark end-to-end.

The returned BenchmarkResult.aggregated has precision, recall, and f1 keys produced by :class:rpx_benchmark.metrics.detection.DetectionMetrics. The deployment report uses f1 as the primary metric and treats higher as better.

Source code in rpx_benchmark/tasks/detection.py
def run_object_detection(cfg: ObjectDetectionRunConfig) -> PipelineResult:
    """Run the object-detection benchmark end-to-end.

    The returned ``BenchmarkResult.aggregated`` has ``precision``,
    ``recall``, and ``f1`` keys produced by
    :class:`rpx_benchmark.metrics.detection.DetectionMetrics`. The
    deployment report uses ``f1`` as the primary metric and treats
    higher as better.
    """
    return run_pipeline(
        task=TaskType.OBJECT_DETECTION,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=False,
        compute_sgc=False,
    )

run_open_vocab_detection(cfg: ObjectDetectionRunConfig) -> PipelineResult

Run the open-vocabulary detection benchmark end-to-end.

Uses the same metric suite as :func:run_object_detection but evaluates on :attr:TaskType.OPEN_VOCAB_DETECTION manifests.

Source code in rpx_benchmark/tasks/detection.py
def run_open_vocab_detection(cfg: ObjectDetectionRunConfig) -> PipelineResult:
    """Run the open-vocabulary detection benchmark end-to-end.

    Uses the same metric suite as :func:`run_object_detection` but
    evaluates on :attr:`TaskType.OPEN_VOCAB_DETECTION` manifests.
    """
    return run_pipeline(
        task=TaskType.OPEN_VOCAB_DETECTION,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=False,
        compute_sgc=False,
    )

run_visual_grounding(cfg: VisualGroundingRunConfig) -> PipelineResult

Run the visual-grounding benchmark end-to-end.

Source code in rpx_benchmark/tasks/visual_grounding.py
def run_visual_grounding(cfg: VisualGroundingRunConfig) -> PipelineResult:
    """Run the visual-grounding benchmark end-to-end."""
    return run_pipeline(
        task=TaskType.VISUAL_GROUNDING,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=False,
        compute_sgc=False,
    )

run_relative_pose(cfg: RelativePoseRunConfig) -> PipelineResult

Run the relative-camera-pose benchmark end-to-end.

Source code in rpx_benchmark/tasks/relative_pose.py
def run_relative_pose(cfg: RelativePoseRunConfig) -> PipelineResult:
    """Run the relative-camera-pose benchmark end-to-end."""
    return run_pipeline(
        task=TaskType.RELATIVE_CAMERA_POSE,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=False,
        compute_sgc=False,
    )

run_keypoint_matching(cfg: KeypointMatchingRunConfig) -> PipelineResult

Run the keypoint-matching benchmark end-to-end.

Source code in rpx_benchmark/tasks/keypoint_matching.py
def run_keypoint_matching(cfg: KeypointMatchingRunConfig) -> PipelineResult:
    """Run the keypoint-matching benchmark end-to-end."""
    return run_pipeline(
        task=TaskType.KEYPOINT_MATCHING,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=False,
        compute_sgc=False,
    )

run_sparse_depth(cfg: SparseDepthRunConfig) -> PipelineResult

Run the sparse-depth benchmark end-to-end.

Source code in rpx_benchmark/tasks/sparse_depth.py
def run_sparse_depth(cfg: SparseDepthRunConfig) -> PipelineResult:
    """Run the sparse-depth benchmark end-to-end."""
    return run_pipeline(
        task=TaskType.SPARSE_DEPTH,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=False,
        compute_sgc=False,
    )

run_novel_view_synthesis(cfg: NovelViewSynthesisRunConfig) -> PipelineResult

Run the novel-view-synthesis benchmark end-to-end.

Source code in rpx_benchmark/tasks/novel_view_synthesis.py
def run_novel_view_synthesis(cfg: NovelViewSynthesisRunConfig) -> PipelineResult:
    """Run the novel-view-synthesis benchmark end-to-end."""
    return run_pipeline(
        task=TaskType.NOVEL_VIEW_SYNTHESIS,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=False,
        compute_sgc=False,
    )
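Every `run_*` task function above follows the same wrapper pattern: fix the task, its primary metric, and per-task scoring flags, then delegate to the shared pipeline. A minimal sketch of that pattern (the `PipelineCall` container and the `"abs_rel"` metric name are hypothetical; the real `run_pipeline` also handles dataset download, inference, metrics, and report writing):

```python
from dataclasses import dataclass

@dataclass
class PipelineCall:
    task: str
    primary_metric: str
    compute_ts: bool   # temporal-stability scoring, enabled only for some tasks
    compute_sgc: bool

def run_pipeline(*, task, primary_metric, cfg, compute_ts, compute_sgc):
    # Stand-in: the real pipeline runs the model over the split and
    # aggregates metrics; here we only record the dispatch arguments.
    return PipelineCall(task, primary_metric, compute_ts, compute_sgc)

def run_monocular_depth(cfg):
    # Depth (like segmentation) enables temporal-stability scoring.
    return run_pipeline(task="monocular_depth", primary_metric="abs_rel",
                        cfg=cfg, compute_ts=True, compute_sgc=False)

print(run_monocular_depth(cfg=None).compute_ts)  # True
```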

format_markdown_summary(*, task: str, model_name: str, split: str, repo_id: str, result: BenchmarkResult, dr_report: DeploymentReadinessReport | None = None) -> str

Render a benchmark result as a human-readable markdown report.

Parameters:

task : str (required)
model_name : str (required)
split : str (required)
repo_id : str (required)
result : BenchmarkResult (required)
dr_report : DeploymentReadinessReport, optional (default: None)
    When provided, the output includes the Weighted Phase Score table, State-Transition Robustness deltas, Temporal Stability score, and an Efficiency table (params, FLOPs, latency).

Returns:

str
    A Markdown-formatted report. Matches the terminal UI tables the CLI prints so on-disk reports and terminal output stay in sync.

Examples:

>>> from rpx_benchmark.reports import format_markdown_summary
>>> md = format_markdown_summary(
...     task="monocular_depth", model_name="depth_pro",
...     split="hard", repo_id="IRVLUTD/rpx-benchmark",
...     result=result, dr_report=report,
... )
Source code in rpx_benchmark/reports.py
def format_markdown_summary(
    *,
    task: str,
    model_name: str,
    split: str,
    repo_id: str,
    result: BenchmarkResult,
    dr_report: DeploymentReadinessReport | None = None,
) -> str:
    """Render a benchmark result as a human-readable markdown report.

    Parameters
    ----------
    task : str
    model_name : str
    split : str
    repo_id : str
    result : BenchmarkResult
    dr_report : DeploymentReadinessReport, optional
        When provided, the output includes the Weighted Phase Score
        table, State-Transition Robustness deltas, Temporal Stability
        score, and an Efficiency table (params, FLOPs, latency).

    Returns
    -------
    str
        A Markdown-formatted report. Matches the terminal UI tables
        the CLI prints so on-disk reports and terminal output stay in
        sync.

    Examples
    --------
    >>> from rpx_benchmark.reports import format_markdown_summary  # doctest: +SKIP
    >>> md = format_markdown_summary(                              # doctest: +SKIP
    ...     task="monocular_depth", model_name="depth_pro",
    ...     split="hard", repo_id="IRVLUTD/rpx-benchmark",
    ...     result=result, dr_report=report,
    ... )
    """
    lines = [
        f"# RPX benchmark — {task}",
        "",
        f"- **Model:** `{model_name}`",
        f"- **Split:** `{split}`",
        f"- **Repo:** `{repo_id}`",
        f"- **Samples:** {result.num_samples}",
        "",
        "## Aggregated metrics",
        "",
        "| metric | value |",
        "|---|---|",
    ]
    for k, v in result.aggregated.items():
        lines.append(f"| {k} | {v:.4f} |")

    if dr_report is not None and dr_report.weighted_phase_score is not None:
        wps = dr_report.weighted_phase_score
        lines += [
            "",
            "## Weighted Phase Score",
            "",
            "| phase | score |",
            "|---|---|",
            f"| clutter | {wps.s_clutter:.4f} |",
            f"| interaction | {wps.s_interaction:.4f} |",
            f"| clean | {wps.s_clean:.4f} |",
            f"| **overall** | **{wps.s_overall:.4f}** |",
            f"| Δ interaction (S_I − S_C) | {wps.delta_int:+.4f} |",
            f"| Δ recovery    (S_L − S_I) | {wps.delta_rec:+.4f} |",
        ]
        if dr_report.state_transition is not None:
            st = dr_report.state_transition
            lines += [
                "",
                f"- **STR C→I (interaction drop):** {st.str_c_to_i:+.4f}",
                f"- **STR I→L (recovery):**         {st.str_i_to_l:+.4f}",
            ]
        if dr_report.temporal_stability is not None:
            lines += [
                f"- **Temporal stability (TS):** "
                f"{dr_report.temporal_stability.ts_score:.4f}",
            ]

    if dr_report is not None:
        eff_rows = []
        if dr_report.params_m is not None:
            eff_rows.append(("params (M)", f"{dr_report.params_m:.2f}"))
        if dr_report.flops_g is not None:
            eff_rows.append(("FLOPs (G)", f"{dr_report.flops_g:.2f}"))
        if dr_report.latency_ms_per_sample is not None:
            eff_rows.append(
                ("latency (ms/sample)", f"{dr_report.latency_ms_per_sample:.1f}")
            )
        if eff_rows:
            lines += ["", "## Efficiency", "", "| metric | value |", "|---|---|"]
            for k, v in eff_rows:
                lines.append(f"| {k} | {v} |")
    return "\n".join(lines) + "\n"

write_json(path: str | Path, *, task: str, model_name: str, split: str, repo_id: str, result: BenchmarkResult, dr_report: DeploymentReadinessReport | None = None, extra: Dict[str, Any] | None = None) -> Path

Serialise a benchmark result + deployment report to JSON.

Parameters:

path : str or Path (required)
    Output file path. Parent directories are created if missing.
task : str (required)
    Task name string (e.g. "monocular_depth").
model_name : str (required)
    Display name of the model under test.
split : str (required)
    ESD difficulty split ("easy", "medium", "hard").
repo_id : str (required)
    HuggingFace dataset repo id the samples came from.
result : BenchmarkResult (required)
    Per-sample + aggregated metric container returned by :class:~rpx_benchmark.runner.BenchmarkRunner.
dr_report : DeploymentReadinessReport, optional (default: None)
    Weighted Phase Score, STR, TS, efficiency metadata. Omitted when None.
extra : dict, optional (default: None)
    Arbitrary free-form extra fields to embed under the "extra" key of the payload. Useful for run-specific metadata a caller wants preserved alongside the standard keys.

Returns:

Path
    The resolved output path, for chaining.

Notes

Dataclasses are converted via :func:dataclasses.asdict, enums are lowered to their .value, and unknown objects pass through unchanged. The output is pretty-printed with indent=2 for diff-friendliness.

Source code in rpx_benchmark/reports.py
def write_json(
    path: str | Path,
    *,
    task: str,
    model_name: str,
    split: str,
    repo_id: str,
    result: BenchmarkResult,
    dr_report: DeploymentReadinessReport | None = None,
    extra: Dict[str, Any] | None = None,
) -> Path:
    """Serialise a benchmark result + deployment report to JSON.

    Parameters
    ----------
    path : str or Path
        Output file path. Parent directories are created if missing.
    task : str
        Task name string (e.g. ``"monocular_depth"``).
    model_name : str
        Display name of the model under test.
    split : str
        ESD difficulty split (``"easy"``, ``"medium"``, ``"hard"``).
    repo_id : str
        HuggingFace dataset repo id the samples came from.
    result : BenchmarkResult
        Per-sample + aggregated metric container returned by
        :class:`~rpx_benchmark.runner.BenchmarkRunner`.
    dr_report : DeploymentReadinessReport, optional
        Weighted Phase Score, STR, TS, efficiency metadata. Omitted
        when ``None``.
    extra : dict, optional
        Arbitrary free-form extra fields to embed under the ``extra``
        key of the payload. Useful for run-specific metadata a caller
        wants preserved alongside the standard keys.

    Returns
    -------
    Path
        The resolved output path, for chaining.

    Notes
    -----
    Dataclasses are converted via :func:`dataclasses.asdict`, enums
    are lowered to their ``.value``, and unknown objects pass through
    unchanged. The output is pretty-printed with indent=2 for
    diff-friendliness.
    """
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    payload: Dict[str, Any] = {
        "task": task,
        "model": model_name,
        "split": split,
        "repo_id": repo_id,
        "num_samples": result.num_samples,
        "aggregated": result.aggregated,
        "per_sample": result.per_sample,
    }
    if dr_report is not None:
        payload["deployment_readiness"] = _to_jsonable(dr_report)
    if extra:
        payload["extra"] = extra
    with path.open("w", encoding="utf-8") as f:
        json.dump(_to_jsonable(payload), f, indent=2)
    return path
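The conversion described in the Notes (dataclasses via `dataclasses.asdict`, enums lowered to `.value`, unknown objects passed through) can be sketched self-containedly. The helper name and the `Split`/`Report` types are hypothetical; the real `_to_jsonable` is private to `rpx_benchmark.reports`:

```python
import dataclasses
import enum
import json

def to_jsonable(obj):
    """Plausible sketch of the recursive JSON-lowering described above."""
    if dataclasses.is_dataclass(obj) and not isinstance(obj, type):
        return {k: to_jsonable(v) for k, v in dataclasses.asdict(obj).items()}
    if isinstance(obj, enum.Enum):
        return obj.value          # enums are lowered to their .value
    if isinstance(obj, dict):
        return {k: to_jsonable(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [to_jsonable(v) for v in obj]
    return obj                    # unknown objects pass through unchanged

class Split(str, enum.Enum):
    HARD = "hard"

@dataclasses.dataclass
class Report:
    split: Split
    score: float

print(json.dumps(to_jsonable(Report(Split.HARD, 0.42)), indent=2))
```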