
Tasks (rpx_benchmark.tasks)

End-to-end benchmark pipelines plus the task plugin registry. Nine of ten tasks have a runnable pipeline; object tracking is deferred until a sequence-per-sample protocol is decided.

Registry

registry

Pluggable task registry for RPX benchmark pipelines.

Each benchmark task (monocular depth, segmentation, tracking, …) registers a :class:TaskSpec describing:

  • which modalities must be downloaded from HuggingFace,
  • which metric drives the ESD-weighted phase score,
  • how to parse CLI arguments into a typed config,
  • how to run the end-to-end pipeline (download → load → evaluate → report).

The CLI auto-discovers every registered task at startup and generates one rpx bench <task> subcommand per entry, so adding a new task requires zero changes to cli.py: you write a new module under rpx_benchmark/tasks/ and register the runner with register_task(...).

Example

::

    from rpx_benchmark.api import TaskType
    from rpx_benchmark.tasks.registry import TaskSpec, register_task

    def _run_segmentation(cfg): ...

    register_task(
        TaskSpec(
            task=TaskType.OBJECT_SEGMENTATION,
            display_name="Object Segmentation",
            primary_metric="miou",
            required_modalities=["rgb", "mask"],
            build_config=_build_seg_config,
            run=_run_segmentation,
            add_cli_arguments=_add_seg_cli_arguments,
        )
    )

TaskSpec(task: TaskType, display_name: str, primary_metric: str, required_modalities: List[str], build_config: BuildConfigFn, run: RunFn, add_cli_arguments: AddCliArgsFn, higher_is_better: bool = False, description: str = '') dataclass

Everything the CLI + runner need to drive a task end-to-end.

Parameters:

- task (TaskType, required): Enum value identifying the task.
- display_name (str, required): Human-readable name shown in CLI help and UI headers.
- primary_metric (str, required): Metric key (as produced by the task's calculators) used to drive the ESD-weighted phase score. Lower is better unless :attr:higher_is_better is set.
- required_modalities (list[str], required): Hint describing which RPX modalities (rgb, depth, mask, …) this task needs. Used for docs and for the hub downloader; the per-task run function is responsible for the actual download.
- build_config (callable, required): (argparse.Namespace) -> config. Converts CLI args into a typed config dataclass the task's run function expects.
- run (callable, required): (config) -> (BenchmarkResult, DeploymentReadinessReport | None, {paths}). The end-to-end pipeline entrypoint.
- add_cli_arguments (callable, required): (argparse.ArgumentParser) -> None. Populates a subparser with the task's flags. The CLI calls this once when building its parser tree.
- higher_is_better (bool, default False): When True, the primary metric is interpreted higher-is-better in reports and deltas; the default False suits error metrics.
- description (str, default ''): One-line description used in subparser help.

register_task(spec: TaskSpec) -> TaskSpec

Install a :class:TaskSpec into the global registry.

Returns the spec unchanged, so it can be used as a decorator or a direct call.

Raises:

- ConfigError: If another spec is already registered under the same task.

Source code in rpx_benchmark/tasks/registry.py
def register_task(spec: TaskSpec) -> TaskSpec:
    """Install a :class:`TaskSpec` into the global registry.

    Returns the spec unchanged, so it can be used as a decorator or a
    direct call.

    Raises
    ------
    ConfigError
        If another spec is already registered under the same task.
    """
    if spec.task in _TASK_REGISTRY:
        raise ConfigError(
            f"Task {spec.task.value!r} is already registered.",
            hint="Call unregister_task() first if you intended to replace it.",
        )
    _TASK_REGISTRY[spec.task] = spec
    log.debug("registered task %s (primary_metric=%s)",
              spec.task.value, spec.primary_metric)
    return spec

unregister_task(task: TaskType) -> bool

Remove a registered task. Returns True if something was removed.

Source code in rpx_benchmark/tasks/registry.py
def unregister_task(task: TaskType) -> bool:
    """Remove a registered task. Returns True if something was removed."""
    return _TASK_REGISTRY.pop(task, None) is not None

get_task_spec(task: TaskType) -> TaskSpec

Look up a registered spec.

Raises:

- ConfigError: If the task is not registered. The error lists every task that is registered so the user can quickly spot typos.

Source code in rpx_benchmark/tasks/registry.py
def get_task_spec(task: TaskType) -> TaskSpec:
    """Look up a registered spec.

    Raises
    ------
    ConfigError
        If the task is not registered. The error lists every task
        that *is* registered so the user can quickly spot typos.
    """
    if task not in _TASK_REGISTRY:
        raise ConfigError(
            f"No runner registered for task {task.value!r}.",
            hint=(
                "Registered tasks: "
                + ", ".join(sorted(t.value for t in _TASK_REGISTRY))
            ),
        )
    return _TASK_REGISTRY[task]

available_tasks() -> List[TaskType]

Return the sorted list of registered task enum values.

Source code in rpx_benchmark/tasks/registry.py
def available_tasks() -> List[TaskType]:
    """Return the sorted list of registered task enum values."""
    return sorted(_TASK_REGISTRY.keys(), key=lambda t: t.value)

iter_task_specs() -> List[TaskSpec]

Return every registered spec in stable order.

Source code in rpx_benchmark/tasks/registry.py
def iter_task_specs() -> List[TaskSpec]:
    """Return every registered spec in stable order."""
    return [_TASK_REGISTRY[t] for t in available_tasks()]

Shared pipeline helper

_pipeline

Shared plumbing for RPX task-level pipelines.

Every task runner under :mod:rpx_benchmark.tasks executes the same five-step flow — download the split, load the dataset, resolve the model, run the benchmark runner, write JSON + markdown reports. That flow lives here so each task module only has to describe what is task-specific (config subclass, model resolver, primary metric, CLI argument parser) and delegate the boilerplate.

Design
  • :class:TaskRunConfig owns every field that every task's run function takes. Subclass it with @dataclass when a task wants extra fields.
  • :func:validate_model_selector enforces the three-way "exactly one of model / model_name / hf_checkpoint" contract shared by every task.
  • :func:normalise_split turns a string like "hard" into :class:Difficulty.HARD at config construction time.
  • :func:resolve_device applies the CUDA-available fallback before any weights are downloaded.
  • :func:run_pipeline is the entry point every task runner calls. It takes the task enum, the config, a primary metric key, and a task-specific callable that turns the config into a :class:BenchmarkableModel.

Refactor history: the original run_monocular_depth and run_segmentation each duplicated ~250 lines of plumbing. Adding a new task used to mean cloning + editing a big file; now it is roughly 80 lines of subclass + resolver + registration.

TaskRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None) dataclass

Base configuration shared by every task runner.

Task-specific config classes should inherit from this dataclass and add their own fields. The common fields below are consumed by :func:run_pipeline and cover the download, load, model resolution, and reporting steps.

Parameters:

- model (BenchmarkableModel, default None): A fully-constructed model object. Takes precedence over the other two selectors when provided.
- model_name (str, default None): Name of a registered model factory (run rpx models to list the current slate).
- hf_checkpoint (str, default None): HuggingFace Hub checkpoint id. Only meaningful when the task runner ships an HF fast path (currently :func:rpx_benchmark.adapters.depth_hf.make_hf_depth_model for depth and :func:rpx_benchmark.adapters.seg_hf.make_hf_instance_seg_model for segmentation).
- split (Difficulty or str, default HARD): ESD difficulty split to evaluate on. String values are normalised to :class:Difficulty at config construction time.
- repo_id (str, default None): HuggingFace dataset repo hosting RPX. Defaults to the value of :data:rpx_benchmark.hub.DEFAULT_REPO_ID when not provided.
- cache_dir (str, default None): HuggingFace cache directory override.
- revision (str, default None): HF revision (branch / tag) to pin.
- batch_size (int, default 1): Dataset batch size.
- device (str, default 'cuda'): Torch device string. Auto-falls back to "cpu" at run time when "cuda" is requested but unavailable.
- output_dir (str, default None): Directory to write result.json and summary.md into. Defaults to ./rpx_results/<model>/<split>/.
- model_kwargs (dict, default dict()): Extra kwargs forwarded to the model factory when resolving from model_name or hf_checkpoint.
- progress (ProgressCallback, default None): Per-sample progress callback the runner will invoke.

Raises:

- ConfigError: Subclasses must call :meth:_validate_common from their __post_init__ to enforce the selector / split / batch-size invariants.
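The subclass-and-validate pattern can be sketched with a standalone dataclass (the base class below is a toy stand-in for TaskRunConfig, and confidence_threshold is a hypothetical task-specific field):

```python
from dataclasses import dataclass
from typing import Optional

# Minimal stand-in for TaskRunConfig, showing the subclass-and-validate shape.
@dataclass
class BaseRunConfig:
    model: Optional[object] = None
    model_name: Optional[str] = None
    hf_checkpoint: Optional[str] = None
    batch_size: int = 1

    def _validate_common(self) -> None:
        # Exactly one model selector, and a sane batch size.
        given = [x for x in (self.model, self.model_name, self.hf_checkpoint) if x]
        if len(given) != 1:
            raise ValueError("exactly one of model / model_name / hf_checkpoint")
        if self.batch_size < 1:
            raise ValueError("batch_size must be >= 1")

@dataclass
class MyTaskRunConfig(BaseRunConfig):
    # Hypothetical extra knob added by the task-specific subclass.
    confidence_threshold: float = 0.5

    def __post_init__(self) -> None:
        self._validate_common()
```

New fields get defaults so they slot in after the inherited ones, and __post_init__ runs the shared invariants at construction time.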

validate_model_selector(cfg: TaskRunConfig) -> None

Raise :class:ConfigError unless exactly one selector is set.

Called from every task config's __post_init__. The three selectors (model, model_name, hf_checkpoint) are mutually exclusive — supplying zero or two+ is a user error.

Source code in rpx_benchmark/tasks/_pipeline.py
def validate_model_selector(cfg: TaskRunConfig) -> None:
    """Raise :class:`ConfigError` unless exactly one selector is set.

    Called from every task config's ``__post_init__``. The three
    selectors (``model``, ``model_name``, ``hf_checkpoint``) are
    mutually exclusive — supplying zero or two+ is a user error.
    """
    given = [x for x in (cfg.model, cfg.model_name, cfg.hf_checkpoint) if x]
    if len(given) != 1:
        raise ConfigError(
            f"{type(cfg).__name__} requires exactly one of "
            "`model`, `model_name`, or `hf_checkpoint`.",
            hint=(
                "Pick a registered model (`rpx models` lists the slate), "
                "a HuggingFace checkpoint id, or a fully-constructed "
                "BenchmarkableModel instance."
            ),
            details={"given_fields": [
                k for k, v in (
                    ("model", cfg.model),
                    ("model_name", cfg.model_name),
                    ("hf_checkpoint", cfg.hf_checkpoint),
                ) if v
            ]},
        )

normalise_split(split: Difficulty | str) -> Difficulty

Turn a string split into a :class:Difficulty enum.

Raises :class:ConfigError with a usable hint when the string does not match one of the three ESD difficulty levels.

Source code in rpx_benchmark/tasks/_pipeline.py
def normalise_split(split: Difficulty | str) -> Difficulty:
    """Turn a string split into a :class:`Difficulty` enum.

    Raises :class:`ConfigError` with a usable hint when the string
    does not match one of the three ESD difficulty levels.
    """
    if isinstance(split, Difficulty):
        return split
    try:
        return Difficulty(split)
    except ValueError as e:
        raise ConfigError(
            f"Unknown difficulty split {split!r}.",
            hint=f"Use one of: {[d.value for d in Difficulty]}",
        ) from e

resolve_device(requested: str) -> str

Return requested or fall back to 'cpu' when CUDA is unavailable.

Emits a WARNING log line on fallback so users see why their --device cuda request quietly landed on CPU.

Source code in rpx_benchmark/tasks/_pipeline.py
def resolve_device(requested: str) -> str:
    """Return ``requested`` or fall back to ``'cpu'`` when CUDA is unavailable.

    Emits a ``WARNING`` log line on fallback so users see why their
    ``--device cuda`` request quietly landed on CPU.
    """
    if requested != "cuda":
        return requested
    try:
        import torch
    except ImportError:
        return requested
    if torch.cuda.is_available():
        return requested
    log.warning(
        "CUDA requested but torch.cuda.is_available() is False; "
        "falling back to device='cpu'.",
    )
    return "cpu"

display_name(cfg: TaskRunConfig, model: BenchmarkableModel) -> str

Pick the best human name for a model across the three selectors.

Source code in rpx_benchmark/tasks/_pipeline.py
def display_name(
    cfg: TaskRunConfig,
    model: BenchmarkableModel,
) -> str:
    """Pick the best human name for a model across the three selectors."""
    if cfg.model_name:
        return cfg.model_name
    if cfg.hf_checkpoint:
        return cfg.hf_checkpoint
    return getattr(model, "name", "model")

run_pipeline(*, task: TaskType, primary_metric: str, cfg: TaskRunConfig, model_resolver: Callable[[TaskRunConfig], BenchmarkableModel], compute_ts: bool = True, compute_sgc: bool = False) -> PipelineResult

Execute a task end-to-end.

Parameters:

- task (TaskType, required): The task enum. Used for the HuggingFace download scope and for metric suite construction.
- primary_metric (str, required): Metric key that drives the ESD-weighted phase score. For lower-is-better metrics (absrel, rmse, rotation_error_deg, ...) the report's delta_int / delta_rec read as "positive = model got worse", and vice versa for higher-is-better metrics.
- cfg (TaskRunConfig, required): A task-specific subclass of :class:TaskRunConfig. Its __post_init__ must have already called :meth:TaskRunConfig._validate_common.
- model_resolver (callable, required): Turns the config into a :class:BenchmarkableModel. Each task's runner supplies its own resolver so it can ship task-specific HF fast paths.
- compute_ts (bool, default True): Whether to compute Temporal Stability. Set False for tasks where TS is meaningless (single-sample predictions across unrelated pairs, e.g. pose / keypoint matching).
- compute_sgc (bool, default False): Whether to compute Stack-Level Geometric Coherence. Only meaningful when running segmentation + depth jointly.

Returns:

- (BenchmarkResult, DeploymentReadinessReport, dict): Same shape for every task. The dict contains "json" and "markdown" keys pointing at the written reports.

Raises:

- ConfigError, DownloadError, ManifestError, ModelError, MetricError: Propagated from the respective subsystem with a hint.

Source code in rpx_benchmark/tasks/_pipeline.py
def run_pipeline(
    *,
    task: TaskType,
    primary_metric: str,
    cfg: TaskRunConfig,
    model_resolver: Callable[[TaskRunConfig], BenchmarkableModel],
    compute_ts: bool = True,
    compute_sgc: bool = False,
) -> PipelineResult:
    """Execute a task end-to-end.

    Parameters
    ----------
    task : TaskType
        The task enum. Used for the HuggingFace download scope and
        for metric suite construction.
    primary_metric : str
        Metric key that drives the ESD-weighted phase score. For
        lower-is-better metrics (absrel, rmse, rotation_error_deg,
        ...) the report's ``delta_int`` / ``delta_rec`` read as
        "positive = model got worse" and vice versa for
        higher-is-better metrics.
    cfg : TaskRunConfig
        A task-specific subclass of :class:`TaskRunConfig`. Its
        ``__post_init__`` must have already called
        :meth:`TaskRunConfig._validate_common`.
    model_resolver : callable
        Turns the config into a :class:`BenchmarkableModel`. Each
        task's runner supplies its own resolver so it can ship
        task-specific HF fast paths.
    compute_ts : bool
        Whether to compute Temporal Stability. Set False for tasks
        where TS is meaningless (single-sample predictions across
        unrelated pairs, e.g. pose / keypoint matching).
    compute_sgc : bool
        Whether to compute Stack-Level Geometric Coherence. Only
        meaningful when running segmentation + depth jointly.

    Returns
    -------
    (BenchmarkResult, DeploymentReadinessReport, dict)
        Same shape for every task. The dict contains ``"json"`` and
        ``"markdown"`` keys pointing at the written reports.

    Raises
    ------
    ConfigError, DownloadError, ManifestError, ModelError, MetricError
        Propagated from the respective subsystem with a hint.
    """
    cfg.device = resolve_device(cfg.device)

    split_name = cfg.split.value if isinstance(cfg.split, Difficulty) else str(cfg.split)
    log.info(
        "pipeline start: task=%s split=%s device=%s",
        task.value, split_name, cfg.device,
    )

    # 1. Download the split's task-scoped modalities.
    manifest_path = download_split(
        task=task,
        split=cfg.split,
        repo_id=cfg.repo_id or "IRVLUTD/rpx-benchmark",
        cache_dir=cfg.cache_dir,
        revision=cfg.revision,
    )
    dataset = RPXDataset.from_manifest(manifest_path, batch_size=cfg.batch_size)
    log.info("loaded %d samples from %s", len(dataset), manifest_path)

    # 2. Resolve the model — task-specific, handles all three selectors.
    model = model_resolver(cfg)
    name = display_name(cfg, model)

    # 3. Setup (outside the runner so we can profile before the run)
    # and static parameter count.
    model.setup()
    efficiency = _count_params_only(model)
    if efficiency.params_m is not None:
        log.info("model %s: %.2f M params", name, efficiency.params_m)

    # 4. Run the benchmark.
    runner = BenchmarkRunner(
        model=model,
        dataset=dataset,
        metric_suite=MetricSuite.for_task(task),
        call_setup=False,
    )
    result, dr_report = runner.run_with_deployment_readiness(
        primary_metric=primary_metric,
        model_name=name,
        efficiency=efficiency,
        compute_ts=compute_ts,
        compute_sgc_flag=compute_sgc,
        progress=cfg.progress,
    )

    # 5. Write reports.
    safe_name = name.replace("/", "__")
    out_dir = Path(cfg.output_dir or f"./rpx_results/{safe_name}/{split_name}")
    out_dir.mkdir(parents=True, exist_ok=True)
    json_path = out_dir / "result.json"
    md_path = out_dir / "summary.md"

    write_json(
        json_path,
        task=task.value,
        model_name=name,
        split=split_name,
        repo_id=cfg.repo_id or "IRVLUTD/rpx-benchmark",
        result=result,
        dr_report=dr_report,
    )
    md_path.write_text(
        format_markdown_summary(
            task=task.value,
            model_name=name,
            split=split_name,
            repo_id=cfg.repo_id or "IRVLUTD/rpx-benchmark",
            result=result,
            dr_report=dr_report,
        ),
        encoding="utf-8",
    )
    log.info("wrote %s and %s", json_path, md_path)

    return result, dr_report, {"json": json_path, "markdown": md_path}

Monocular absolute depth

monocular_depth

Monocular absolute depth benchmark pipeline.

Reference implementation for a task module. Every other task follows exactly the same structure: subclass :class:TaskRunConfig, define a model resolver, wire a :class:TaskSpec. The heavy lifting lives in :mod:rpx_benchmark.tasks._pipeline so each task file stays small.

Three usage paths

1. Zero-code — any HuggingFace depth checkpoint via the CLI::

rpx bench monocular_depth \
    --hf-checkpoint depth-anything/Depth-Anything-V2-Metric-Indoor-Large-hf \
    --split hard

2. One-line Python — a plain numpy callable::

import numpy as np
import rpx_benchmark as rpx

def my_depth(rgb: np.ndarray) -> np.ndarray:
    return ...  # H x W float32, metres

bm = rpx.make_numpy_depth_model(my_depth, name="my_model")
cfg = rpx.MonocularDepthRunConfig(model=bm, split="hard", device="cpu")
result, report, paths = rpx.run_monocular_depth(cfg)

3. Custom adapter stack — provide your own :class:~rpx_benchmark.adapters.BenchmarkableModel and hand it to the config via model=.

MonocularDepthRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None) dataclass

Bases: TaskRunConfig

Runtime configuration for :func:run_monocular_depth.

Inherits every standard field from :class:TaskRunConfig. Adds no task-specific fields — monocular depth is the "base case" a new user encounters.

Examples:

>>> from rpx_benchmark import MonocularDepthRunConfig
>>> cfg = MonocularDepthRunConfig(
...     hf_checkpoint="depth-anything/Depth-Anything-V2-Metric-Indoor-Small-hf",
...     split="hard",
...     device="cpu",
... )

run_monocular_depth(cfg: MonocularDepthRunConfig) -> PipelineResult

Run the monocular absolute depth benchmark end-to-end.

Parameters:

- cfg (MonocularDepthRunConfig, required).

Returns:

- (BenchmarkResult, DeploymentReadinessReport, dict): dict has "json" + "markdown" keys pointing at the written report files.

Raises:

- ConfigError, DownloadError, ManifestError, ModelError, MetricError: Propagated from the respective subsystem with actionable hints.

Examples:

>>> import rpx_benchmark as rpx
>>> import numpy as np
>>> def my_depth(rgb): return np.ones(rgb.shape[:2], dtype=np.float32) * 2.0
>>> bm = rpx.make_numpy_depth_model(my_depth, name="unit")
>>> cfg = rpx.MonocularDepthRunConfig(model=bm, split="hard", device="cpu")
>>> result, report, paths = rpx.run_monocular_depth(cfg)
Source code in rpx_benchmark/tasks/monocular_depth.py
def run_monocular_depth(cfg: MonocularDepthRunConfig) -> PipelineResult:
    """Run the monocular absolute depth benchmark end-to-end.

    Parameters
    ----------
    cfg : MonocularDepthRunConfig

    Returns
    -------
    (BenchmarkResult, DeploymentReadinessReport, dict)
        ``dict`` has ``"json"`` + ``"markdown"`` keys pointing at
        the written report files.

    Raises
    ------
    ConfigError, DownloadError, ManifestError, ModelError, MetricError
        Propagated from the respective subsystem with actionable hints.

    Examples
    --------
    >>> import rpx_benchmark as rpx
    >>> import numpy as np
    >>> def my_depth(rgb): return np.ones(rgb.shape[:2], dtype=np.float32) * 2.0
    >>> bm = rpx.make_numpy_depth_model(my_depth, name="unit")
    >>> cfg = rpx.MonocularDepthRunConfig(model=bm, split="hard", device="cpu")
    >>> result, report, paths = rpx.run_monocular_depth(cfg)  # doctest: +SKIP
    """
    return run_pipeline(
        task=TaskType.MONOCULAR_DEPTH,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=True,
        compute_sgc=False,
    )

Object segmentation

segmentation

Object segmentation benchmark pipeline.

Same template as :mod:rpx_benchmark.tasks.monocular_depth — swap the task-specific knobs (TaskType, primary metric, higher_is_better, model resolver), inherit everything else from :func:rpx_benchmark.tasks._pipeline.run_pipeline.

Usage

::

rpx bench object_segmentation \
    --hf-checkpoint facebook/mask2former-swin-tiny-coco-instance \
    --split hard

SegmentationRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None) dataclass

Bases: TaskRunConfig

Runtime configuration for :func:run_segmentation.

See :class:TaskRunConfig for the full field reference.

Raises:

- ConfigError: If zero or more than one model selector is set, if the split is not a valid ESD difficulty, or if batch_size < 1.

run_segmentation(cfg: SegmentationRunConfig) -> PipelineResult

Run the object-segmentation benchmark end-to-end.

Returns the same (BenchmarkResult, DeploymentReadinessReport, {json, markdown}) tuple shape as :func:run_monocular_depth. primary_metric="miou" and higher_is_better=True so the deployment-readiness report interprets deltas accordingly.

Source code in rpx_benchmark/tasks/segmentation.py
def run_segmentation(cfg: SegmentationRunConfig) -> PipelineResult:
    """Run the object-segmentation benchmark end-to-end.

    Returns the same ``(BenchmarkResult, DeploymentReadinessReport,
    {json, markdown})`` tuple shape as :func:`run_monocular_depth`.
    ``primary_metric="miou"`` and ``higher_is_better=True`` so the
    deployment-readiness report interprets deltas accordingly.
    """
    return run_pipeline(
        task=TaskType.OBJECT_SEGMENTATION,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=True,
        compute_sgc=False,
    )

Object detection + open-vocab detection

detection

Object detection + open-vocabulary detection pipeline.

Handles both :attr:TaskType.OBJECT_DETECTION and :attr:TaskType.OPEN_VOCAB_DETECTION — they share the same Prediction / GroundTruth contract, only the ground-truth vocabulary differs.

ObjectDetectionRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None) dataclass

Bases: TaskRunConfig

Runtime configuration for :func:run_object_detection.

See :class:TaskRunConfig for the shared field reference. Detection currently has no registered model factories, so the only supported selector is model= (a pre-built :class:BenchmarkableModel, typically from :func:rpx_benchmark.make_numpy_detection_model).
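A minimal callable of the kind make_numpy_detection_model is described as wrapping might look like the sketch below. The exact prediction format is defined by the adapter; the dict of (N, 4) xyxy pixel boxes plus per-box scores used here is an assumption, so check the adapter's docs for the real contract:

```python
import numpy as np

def my_detector(rgb: np.ndarray) -> dict:
    """Toy detector: one box covering the full image, with a dummy score.

    Assumed output convention: float32 (N, 4) xyxy pixel boxes plus
    per-box confidence scores.
    """
    h, w = rgb.shape[:2]
    boxes = np.array([[0.0, 0.0, float(w), float(h)]], dtype=np.float32)
    scores = np.array([0.9], dtype=np.float32)
    return {"boxes": boxes, "scores": scores}
```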

run_object_detection(cfg: ObjectDetectionRunConfig) -> PipelineResult

Run the object-detection benchmark end-to-end.

The returned BenchmarkResult.aggregated has precision, recall, and f1 keys produced by :class:rpx_benchmark.metrics.detection.DetectionMetrics. The deployment report uses f1 as the primary metric and treats higher as better.

Source code in rpx_benchmark/tasks/detection.py
def run_object_detection(cfg: ObjectDetectionRunConfig) -> PipelineResult:
    """Run the object-detection benchmark end-to-end.

    The returned ``BenchmarkResult.aggregated`` has ``precision``,
    ``recall``, and ``f1`` keys produced by
    :class:`rpx_benchmark.metrics.detection.DetectionMetrics`. The
    deployment report uses ``f1`` as the primary metric and treats
    higher as better.
    """
    return run_pipeline(
        task=TaskType.OBJECT_DETECTION,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=False,
        compute_sgc=False,
    )

run_open_vocab_detection(cfg: ObjectDetectionRunConfig) -> PipelineResult

Run the open-vocabulary detection benchmark end-to-end.

Uses the same metric suite as :func:run_object_detection but evaluates on :attr:TaskType.OPEN_VOCAB_DETECTION manifests.

Source code in rpx_benchmark/tasks/detection.py
def run_open_vocab_detection(cfg: ObjectDetectionRunConfig) -> PipelineResult:
    """Run the open-vocabulary detection benchmark end-to-end.

    Uses the same metric suite as :func:`run_object_detection` but
    evaluates on :attr:`TaskType.OPEN_VOCAB_DETECTION` manifests.
    """
    return run_pipeline(
        task=TaskType.OPEN_VOCAB_DETECTION,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=False,
        compute_sgc=False,
    )

Visual grounding

visual_grounding

Visual grounding benchmark pipeline.

VisualGroundingRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None) dataclass

Bases: TaskRunConfig

Runtime configuration for :func:run_visual_grounding.

Grounding models take (rgb, text) and return the referred bounding box. The referring expression is plucked from sample.ground_truth.text by the adapter so the model never sees the GT boxes.
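The (rgb, text) -> box contract can be illustrated with a toy callable; the xyxy pixel-box output convention is an assumption here:

```python
import numpy as np

def my_grounder(rgb: np.ndarray, text: str) -> np.ndarray:
    """Toy grounding model: ignores the referring expression and returns
    a centred box covering the middle half of the image (xyxy, assumed)."""
    h, w = rgb.shape[:2]
    return np.array([w * 0.25, h * 0.25, w * 0.75, h * 0.75], dtype=np.float32)
```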

run_visual_grounding(cfg: VisualGroundingRunConfig) -> PipelineResult

Run the visual-grounding benchmark end-to-end.

Source code in rpx_benchmark/tasks/visual_grounding.py
def run_visual_grounding(cfg: VisualGroundingRunConfig) -> PipelineResult:
    """Run the visual-grounding benchmark end-to-end."""
    return run_pipeline(
        task=TaskType.VISUAL_GROUNDING,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=False,
        compute_sgc=False,
    )

Relative camera pose

relative_pose

Relative camera pose benchmark pipeline.

RelativePoseRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None) dataclass

Bases: TaskRunConfig

Runtime configuration for :func:run_relative_pose.

Pose models receive two RGB frames (rgb_a and rgb_b plucked from sample.metadata by the adapter) and return the predicted rotation + translation from frame A to frame B.
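A toy model satisfying that contract (the 3x3-rotation-plus-3-vector return shape is an assumption):

```python
import numpy as np

def my_pose_model(rgb_a: np.ndarray, rgb_b: np.ndarray):
    """Toy pose model: predicts the identity rotation and zero
    translation from frame A to frame B."""
    rotation = np.eye(3, dtype=np.float32)       # 3x3 rotation matrix
    translation = np.zeros(3, dtype=np.float32)  # translation, A -> B
    return rotation, translation
```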

run_relative_pose(cfg: RelativePoseRunConfig) -> PipelineResult

Run the relative-camera-pose benchmark end-to-end.

Source code in rpx_benchmark/tasks/relative_pose.py
def run_relative_pose(cfg: RelativePoseRunConfig) -> PipelineResult:
    """Run the relative-camera-pose benchmark end-to-end."""
    return run_pipeline(
        task=TaskType.RELATIVE_CAMERA_POSE,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=False,
        compute_sgc=False,
    )

Keypoint matching

keypoint_matching

Keypoint matching benchmark pipeline.

KeypointMatchingRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None) dataclass

Bases: TaskRunConfig

Runtime configuration for :func:run_keypoint_matching.

Matching models receive two RGB frames (rgb_a + rgb_b) and return corresponding points in each image's pixel grid.
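A toy matcher meeting that contract (the paired (N, 2) x-then-y pixel arrays are an assumed convention):

```python
import numpy as np

def my_matcher(rgb_a: np.ndarray, rgb_b: np.ndarray):
    """Toy matcher: returns each image's four corners as 'matches',
    expressed in that image's own pixel grid as (N, 2) arrays."""
    ha, wa = rgb_a.shape[:2]
    hb, wb = rgb_b.shape[:2]
    pts_a = np.array([[0, 0], [wa - 1, 0], [0, ha - 1], [wa - 1, ha - 1]],
                     dtype=np.float32)
    pts_b = np.array([[0, 0], [wb - 1, 0], [0, hb - 1], [wb - 1, hb - 1]],
                     dtype=np.float32)
    return pts_a, pts_b
```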

run_keypoint_matching(cfg: KeypointMatchingRunConfig) -> PipelineResult

Run the keypoint-matching benchmark end-to-end.

Source code in rpx_benchmark/tasks/keypoint_matching.py
def run_keypoint_matching(cfg: KeypointMatchingRunConfig) -> PipelineResult:
    """Run the keypoint-matching benchmark end-to-end."""
    return run_pipeline(
        task=TaskType.KEYPOINT_MATCHING,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=False,
        compute_sgc=False,
    )

Sparse depth

sparse_depth

Sparse depth benchmark pipeline.

SparseDepthRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None) dataclass

Bases: TaskRunConfig

Runtime configuration for :func:run_sparse_depth.

Sparse-depth models receive (rgb, coordinates) where coordinates is the (N, 2) float array of pixel locations where depth is queried, and return an (N,) array of depth values in metres at those exact coordinates.
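That contract can be sketched with a constant-depth toy model:

```python
import numpy as np

def my_sparse_depth(rgb: np.ndarray, coordinates: np.ndarray) -> np.ndarray:
    """Toy sparse-depth model: predicts a constant 2 m at every queried
    (N, 2) pixel coordinate, returning an (N,) float32 array of metres."""
    return np.full(coordinates.shape[0], 2.0, dtype=np.float32)
```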

run_sparse_depth(cfg: SparseDepthRunConfig) -> PipelineResult

Run the sparse-depth benchmark end-to-end.

Source code in rpx_benchmark/tasks/sparse_depth.py
def run_sparse_depth(cfg: SparseDepthRunConfig) -> PipelineResult:
    """Run the sparse-depth benchmark end-to-end."""
    return run_pipeline(
        task=TaskType.SPARSE_DEPTH,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=False,
        compute_sgc=False,
    )

Novel view synthesis

novel_view_synthesis

Novel view synthesis benchmark pipeline.

NovelViewSynthesisRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None) dataclass

Bases: TaskRunConfig

Runtime configuration for :func:run_novel_view_synthesis.

NVS models receive (rgb_source, target_pose) where the target pose is a 4×4 SE(3) camera-to-world matrix plucked from sample.ground_truth.camera_pose by the adapter. They return a synthesised RGB image at the target viewpoint.
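A degenerate toy model meeting that contract (it ignores the pose, which a real NVS model obviously would not):

```python
import numpy as np

def my_nvs_model(rgb_source: np.ndarray, target_pose: np.ndarray) -> np.ndarray:
    """Toy NVS model: checks the 4x4 SE(3) target pose shape and echoes
    the source view back as the 'synthesised' image."""
    assert target_pose.shape == (4, 4)
    return rgb_source.copy()
```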

run_novel_view_synthesis(cfg: NovelViewSynthesisRunConfig) -> PipelineResult

Run the novel-view-synthesis benchmark end-to-end.

Source code in rpx_benchmark/tasks/novel_view_synthesis.py
def run_novel_view_synthesis(cfg: NovelViewSynthesisRunConfig) -> PipelineResult:
    """Run the novel-view-synthesis benchmark end-to-end."""
    return run_pipeline(
        task=TaskType.NOVEL_VIEW_SYNTHESIS,
        primary_metric=PRIMARY_METRIC,
        cfg=cfg,
        model_resolver=_resolve_model,
        compute_ts=False,
        compute_sgc=False,
    )