Tasks (rpx_benchmark.tasks)¶
End-to-end benchmark pipelines plus the task plugin registry. Nine of ten tasks have a runnable pipeline; object tracking is deferred until a sequence-per-sample protocol is decided.
Registry¶
registry
¶
Pluggable task registry for RPX benchmark pipelines.
Each benchmark task (monocular depth, segmentation, tracking, …)
registers a :class:TaskSpec describing:
- which modalities must be downloaded from HuggingFace,
- which metric drives the ESD-weighted phase score,
- how to parse CLI arguments into a typed config,
- how to run the end-to-end pipeline (download → load → evaluate → report).
The CLI auto-discovers every registered task at startup and
generates one rpx bench <task> subcommand per entry, so adding a
new task requires zero changes to cli.py — you write a new
module under rpx_benchmark/tasks/ and decorate the runner with
@register_task(...).
Example
::

    from rpx_benchmark.api import TaskType
    from rpx_benchmark.tasks.registry import TaskSpec, register_task

    @register_task(
        TaskSpec(
            task=TaskType.OBJECT_SEGMENTATION,
            display_name="Object Segmentation",
            primary_metric="miou",
            higher_is_better=True,
            required_modalities=["rgb", "mask"],
            build_config=_build_seg_config,
            run=_run_segmentation,
            add_cli_arguments=_add_seg_cli_arguments,
        )
    )
    def _run_segmentation(cfg): ...
TaskSpec(task: TaskType, display_name: str, primary_metric: str, required_modalities: List[str], build_config: BuildConfigFn, run: RunFn, add_cli_arguments: AddCliArgsFn, higher_is_better: bool = False, description: str = '')
dataclass
¶
Everything the CLI + runner need to drive a task end-to-end.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `task` | `TaskType` | Enum value identifying the task. | *required* |
| `display_name` | `str` | Human-readable name shown in CLI help and UI headers. | *required* |
| `primary_metric` | `str` | Metric key (as produced by the task's calculators) used to drive the ESD-weighted phase score. Interpreted lower-is-better unless `higher_is_better` is set. | *required* |
| `required_modalities` | `list[str]` | Hint describing which RPX modalities (e.g. `"rgb"`, `"mask"`) must be downloaded from HuggingFace. | *required* |
| `build_config` | `callable` | Parses CLI arguments into the task's typed config. | *required* |
| `run` | `callable` | Runs the task's end-to-end pipeline. | *required* |
| `add_cli_arguments` | `callable` | Adds the task's extra arguments to its CLI subparser. | *required* |
| `higher_is_better` | `bool` | When True, the primary metric is interpreted higher-is-better in reports and deltas. Default False (error metrics). | `False` |
| `description` | `str` | One-line description used in subparser help. | `''` |
register_task(spec: TaskSpec) -> TaskSpec
¶
Install a :class:TaskSpec into the global registry.
Returns the spec unchanged, so it can be used as a decorator or a direct call.
Raises:

| Type | Description |
|---|---|
| `ConfigError` | If another spec is already registered under the same task. |
Source code in rpx_benchmark/tasks/registry.py
unregister_task(task: TaskType) -> bool
¶
Remove the spec registered under task, if one exists.
Returns True when a spec was removed, False when nothing was registered.
get_task_spec(task: TaskType) -> TaskSpec
¶
Look up a registered spec.
Raises:

| Type | Description |
|---|---|
| `ConfigError` | If the task is not registered. The error lists every registered task so the user can quickly spot typos. |
Source code in rpx_benchmark/tasks/registry.py
available_tasks() -> List[TaskType]
¶
Return the list of currently registered tasks.
Shared pipeline helper¶
_pipeline
¶
Shared plumbing for RPX task-level pipelines.
Every task runner under :mod:rpx_benchmark.tasks executes the same
five-step flow — download the split, load the dataset, resolve the
model, run the benchmark runner, write JSON + markdown reports.
That flow lives here so each task module only has to describe what
is task-specific (config subclass, model resolver, primary metric,
CLI argument parser) and delegate the boilerplate.
Design

- :class:`TaskRunConfig` owns every field that every task's run function takes. Subclass it with `@dataclass` when a task wants extra fields.
- :func:`validate_model_selector` enforces the three-way "exactly one of `model` / `model_name` / `hf_checkpoint`" contract shared by every task.
- :func:`normalise_split` turns a string like `"hard"` into :class:`Difficulty.HARD` at config construction time.
- :func:`resolve_device` applies the CUDA-availability fallback before any weights are downloaded.
- :func:`run_pipeline` is the entry point every task runner calls. It takes the task enum, the config, a primary metric key, and a task-specific callable that turns the config into a :class:`BenchmarkableModel`.
Refactor history: the original run_monocular_depth and
run_segmentation each duplicated ~250 lines of plumbing. Adding
a new task used to mean cloning + editing a big file; now it is
roughly 80 lines of subclass + resolver + registration.
TaskRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None)
dataclass
¶
Base configuration shared by every task runner.
Task-specific config classes should inherit from this dataclass
and add their own fields. The common fields below are consumed
by :func:run_pipeline and cover the download, load, model
resolution, and reporting steps.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `model` | `BenchmarkableModel` | A fully-constructed model object. Takes precedence over the other two selectors when provided. | `None` |
| `model_name` | `str` | Name of a registered model factory. | `None` |
| `hf_checkpoint` | `str` | HuggingFace Hub checkpoint id. Only meaningful when the task runner ships an HF fast path. | `None` |
| `split` | `Difficulty or str` | ESD difficulty split to evaluate on. String values are normalised to :class:`Difficulty` at config construction time. | `HARD` |
| `repo_id` | `str` | HuggingFace dataset repo hosting RPX. Defaults to the library's default repo id when None. | `None` |
| `cache_dir` | `str` | HuggingFace cache directory override. | `None` |
| `revision` | `str` | HF revision (branch / tag) to pin. | `None` |
| `batch_size` | `int` | Dataset batch size. | `1` |
| `device` | `str` | Torch device string. Auto-falls back to `'cpu'` when CUDA is unavailable. | `'cuda'` |
| `output_dir` | `str` | Directory the JSON and markdown reports are written into. | `None` |
| `model_kwargs` | `dict` | Extra kwargs forwarded to the model factory when resolving from `model_name`. | `dict()` |
| `progress` | `ProgressCallback` | Per-sample progress callback the runner will invoke. | `None` |
Raises:

| Type | Description |
|---|---|
| `ConfigError` | If the model selectors or split are invalid. Subclasses must call :func:`validate_model_selector` from their `__post_init__`. |
validate_model_selector(cfg: TaskRunConfig) -> None
¶
Raise :class:ConfigError unless exactly one selector is set.
Called from every task config's __post_init__. The three
selectors (model, model_name, hf_checkpoint) are
mutually exclusive — supplying zero or two+ is a user error.
Source code in rpx_benchmark/tasks/_pipeline.py
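The selector contract can be sketched as follows (the `ConfigError` class and the minimal `Cfg` dataclass are hypothetical stand-ins for the real types):

```python
from dataclasses import dataclass
from typing import Any, Optional


class ConfigError(ValueError):  # stand-in for rpx_benchmark's ConfigError
    pass


@dataclass
class Cfg:  # hypothetical minimal config carrying just the three selectors
    model: Optional[Any] = None
    model_name: Optional[str] = None
    hf_checkpoint: Optional[str] = None


def validate_model_selector(cfg) -> None:
    # Exactly one of the three mutually exclusive selectors must be set;
    # zero or two-plus is a user error.
    n = sum(s is not None
            for s in (cfg.model, cfg.model_name, cfg.hf_checkpoint))
    if n != 1:
        raise ConfigError(
            f"exactly one of model / model_name / hf_checkpoint required, got {n}"
        )
```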
normalise_split(split: Difficulty | str) -> Difficulty
¶
Turn a string split into a :class:Difficulty enum.
Raises :class:ConfigError with a usable hint when the string
does not match one of the three ESD difficulty levels.
Source code in rpx_benchmark/tasks/_pipeline.py
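A sketch of the normalisation, assuming illustrative level names (only `"hard"` is confirmed by this page; `EASY` and `MEDIUM` are placeholders for the real ESD levels):

```python
from enum import Enum


class Difficulty(Enum):  # stand-in; "hard" is the only name confirmed above
    EASY = "easy"
    MEDIUM = "medium"
    HARD = "hard"


class ConfigError(ValueError):  # stand-in for rpx_benchmark's ConfigError
    pass


def normalise_split(split):
    # Pass enum values through; map strings case-insensitively.
    if isinstance(split, Difficulty):
        return split
    try:
        return Difficulty(str(split).lower())
    except ValueError:
        valid = ", ".join(d.value for d in Difficulty)
        raise ConfigError(
            f"unknown split {split!r}; expected one of: {valid}"
        ) from None
```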
resolve_device(requested: str) -> str
¶
Return requested or fall back to 'cpu' when CUDA is unavailable.
Emits a WARNING log line on fallback so users see why their
--device cuda request quietly landed on CPU.
Source code in rpx_benchmark/tasks/_pipeline.py
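The fallback logic, sketched with the CUDA probe injected as a parameter (the real helper would read `torch.cuda.is_available()`; it is a plain argument here so the sketch carries no torch dependency):

```python
import logging


def resolve_device(requested: str, cuda_available: bool) -> str:
    # Fall back to CPU when CUDA was requested but is not present,
    # and warn so the user sees why their request landed on CPU.
    if requested.startswith("cuda") and not cuda_available:
        logging.getLogger("rpx").warning(
            "--device %s requested but CUDA is unavailable; using cpu",
            requested,
        )
        return "cpu"
    return requested
```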
display_name(cfg: TaskRunConfig, model: BenchmarkableModel) -> str
¶
Pick the best human name for a model across the three selectors.
Source code in rpx_benchmark/tasks/_pipeline.py
run_pipeline(*, task: TaskType, primary_metric: str, cfg: TaskRunConfig, model_resolver: Callable[[TaskRunConfig], BenchmarkableModel], compute_ts: bool = True, compute_sgc: bool = False) -> PipelineResult
¶
Execute a task end-to-end.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `task` | `TaskType` | The task enum. Used for the HuggingFace download scope and for metric suite construction. | *required* |
| `primary_metric` | `str` | Metric key that drives the ESD-weighted phase score (lower-is-better metrics such as absrel, rmse, rotation_error_deg are the default). | *required* |
| `cfg` | `TaskRunConfig` | A task-specific subclass of :class:`TaskRunConfig`. | *required* |
| `model_resolver` | `callable` | Turns the config into a :class:`BenchmarkableModel`. | *required* |
| `compute_ts` | `bool` | Whether to compute Temporal Stability. Set False for tasks where TS is meaningless (single-sample predictions across unrelated pairs, e.g. pose / keypoint matching). | `True` |
| `compute_sgc` | `bool` | Whether to compute Stack-Level Geometric Coherence. Only meaningful when running segmentation + depth jointly. | `False` |

Returns:

| Type | Description |
|---|---|
| `(BenchmarkResult, DeploymentReadinessReport, dict)` | Same shape for every task. The dict contains the `json` and `markdown` report paths. |

Raises:

| Type | Description |
|---|---|
| `(ConfigError, DownloadError, ManifestError, ModelError, MetricError)` | Propagated from the respective subsystem with a hint. |
Source code in rpx_benchmark/tasks/_pipeline.py
Monocular absolute depth¶
monocular_depth
¶
Monocular absolute depth benchmark pipeline.
Reference implementation for a task module. Every other task follows
exactly the same structure: subclass :class:TaskRunConfig, define a
model resolver, wire a :class:TaskSpec. The heavy lifting lives in
:mod:rpx_benchmark.tasks._pipeline so each task file stays small.
Three usage paths

1. Zero-code — any HuggingFace depth checkpoint via the CLI::

       rpx bench monocular_depth \
           --hf-checkpoint depth-anything/Depth-Anything-V2-Metric-Indoor-Large-hf \
           --split hard

2. One-line Python — a plain numpy callable::

       import numpy as np
       import rpx_benchmark as rpx

       def my_depth(rgb: np.ndarray) -> np.ndarray:
           return ...  # H x W float32, metres

       bm = rpx.make_numpy_depth_model(my_depth, name="my_model")
       cfg = rpx.MonocularDepthRunConfig(model=bm, split="hard", device="cpu")
       result, report, paths = rpx.run_monocular_depth(cfg)

3. Custom adapter stack — provide your own
   :class:`~rpx_benchmark.adapters.BenchmarkableModel` and hand it to
   the config via model=.
MonocularDepthRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None)
dataclass
¶
Bases: TaskRunConfig
Runtime configuration for :func:run_monocular_depth.
Inherits every standard field from :class:TaskRunConfig. Adds
no task-specific fields — monocular depth is the "base case" a
new user encounters.
Examples:
>>> from rpx_benchmark import MonocularDepthRunConfig
>>> cfg = MonocularDepthRunConfig(
... hf_checkpoint="depth-anything/Depth-Anything-V2-Metric-Indoor-Small-hf",
... split="hard",
... device="cpu",
... )
run_monocular_depth(cfg: MonocularDepthRunConfig) -> PipelineResult
¶
Run the monocular absolute depth benchmark end-to-end.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `cfg` | `MonocularDepthRunConfig` | Fully-populated run configuration. | *required* |

Returns:

| Type | Description |
|---|---|
| `(BenchmarkResult, DeploymentReadinessReport, dict)` | Same tuple shape as every task; the dict holds the `json` and `markdown` report paths. |

Raises:

| Type | Description |
|---|---|
| `(ConfigError, DownloadError, ManifestError, ModelError, MetricError)` | Propagated from the respective subsystem with actionable hints. |
Examples:
>>> import rpx_benchmark as rpx
>>> import numpy as np
>>> def my_depth(rgb): return np.ones(rgb.shape[:2], dtype=np.float32) * 2.0
>>> bm = rpx.make_numpy_depth_model(my_depth, name="unit")
>>> cfg = rpx.MonocularDepthRunConfig(model=bm, split="hard", device="cpu")
>>> result, report, paths = rpx.run_monocular_depth(cfg)
Source code in rpx_benchmark/tasks/monocular_depth.py
Object segmentation¶
segmentation
¶
Object segmentation benchmark pipeline.
Same template as :mod:rpx_benchmark.tasks.monocular_depth — swap
the task-specific knobs (TaskType, primary metric,
higher_is_better, model resolver), inherit everything else from
:func:rpx_benchmark.tasks._pipeline.run_pipeline.
Usage
::

    rpx bench object_segmentation \
        --hf-checkpoint facebook/mask2former-swin-tiny-coco-instance \
        --split hard
SegmentationRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None)
dataclass
¶
Bases: TaskRunConfig
Runtime configuration for :func:run_segmentation.
See :class:TaskRunConfig for the full field reference.
Raises:

| Type | Description |
|---|---|
| `ConfigError` | If zero or more than one model selector is set, or if the split is not a valid ESD difficulty. |
run_segmentation(cfg: SegmentationRunConfig) -> PipelineResult
¶
Run the object-segmentation benchmark end-to-end.
Returns the same (BenchmarkResult, DeploymentReadinessReport,
{json, markdown}) tuple shape as :func:run_monocular_depth.
primary_metric="miou" and higher_is_better=True so the
deployment-readiness report interprets deltas accordingly.
Source code in rpx_benchmark/tasks/segmentation.py
Object detection + open-vocab detection¶
detection
¶
Object detection + open-vocabulary detection pipeline.
Handles both :attr:TaskType.OBJECT_DETECTION and
:attr:TaskType.OPEN_VOCAB_DETECTION — they share the same
Prediction / GroundTruth contract, only the ground-truth vocabulary
differs.
ObjectDetectionRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None)
dataclass
¶
Bases: TaskRunConfig
Runtime configuration for :func:run_object_detection.
See :class:TaskRunConfig for the shared field reference.
Detection currently has no registered model factories, so the
only supported selector is model= (a pre-built
:class:BenchmarkableModel, typically from
:func:rpx_benchmark.make_numpy_detection_model).
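A sketch of the kind of numpy callable one might hand to `make_numpy_detection_model`. The `(boxes, scores, labels)` return format is an assumption for illustration; check the wrapper's actual contract before relying on it:

```python
import numpy as np


def no_op_detector(rgb: np.ndarray):
    # Degenerate baseline returning zero detections. Assumed format:
    # (N, 4) xyxy pixel boxes, (N,) confidence scores, (N,) class ids.
    boxes = np.zeros((0, 4), dtype=np.float32)
    scores = np.zeros((0,), dtype=np.float32)
    labels = np.zeros((0,), dtype=np.int64)
    return boxes, scores, labels
```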
run_object_detection(cfg: ObjectDetectionRunConfig) -> PipelineResult
¶
Run the object-detection benchmark end-to-end.
The returned BenchmarkResult.aggregated has precision,
recall, and f1 keys produced by
:class:rpx_benchmark.metrics.detection.DetectionMetrics. The
deployment report uses f1 as the primary metric and treats
higher as better.
Source code in rpx_benchmark/tasks/detection.py
run_open_vocab_detection(cfg: ObjectDetectionRunConfig) -> PipelineResult
¶
Run the open-vocabulary detection benchmark end-to-end.
Uses the same metric suite as :func:run_object_detection but
evaluates on :attr:TaskType.OPEN_VOCAB_DETECTION manifests.
Source code in rpx_benchmark/tasks/detection.py
Visual grounding¶
visual_grounding
¶
Visual grounding benchmark pipeline.
VisualGroundingRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None)
dataclass
¶
Bases: TaskRunConfig
Runtime configuration for :func:run_visual_grounding.
Grounding models take (rgb, text) and return the referred
bounding box. The referring expression is plucked from
sample.ground_truth.text by the adapter so the model never
sees the GT boxes.
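The `(rgb, text)` contract can be illustrated with a degenerate baseline; the `(x_min, y_min, x_max, y_max)` pixel convention assumed here is not confirmed by this page:

```python
import numpy as np


def full_image_box(rgb: np.ndarray, text: str) -> np.ndarray:
    # Always ground the referring expression to the whole frame,
    # assuming an (x_min, y_min, x_max, y_max) pixel box convention.
    h, w = rgb.shape[:2]
    return np.array([0.0, 0.0, float(w), float(h)], dtype=np.float32)
```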
run_visual_grounding(cfg: VisualGroundingRunConfig) -> PipelineResult
¶
Run the visual-grounding benchmark end-to-end.
Source code in rpx_benchmark/tasks/visual_grounding.py
Relative camera pose¶
relative_pose
¶
Relative camera pose benchmark pipeline.
RelativePoseRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None)
dataclass
¶
Bases: TaskRunConfig
Runtime configuration for :func:run_relative_pose.
Pose models receive two RGB frames (rgb_a and rgb_b
plucked from sample.metadata by the adapter) and return the
predicted rotation + translation from frame A to frame B.
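A "no motion" baseline illustrating the two-frame contract; the `(3, 3)` rotation matrix plus `(3,)` translation return shape is an assumption about the adapter's expected format:

```python
import numpy as np


def static_camera_pose(rgb_a: np.ndarray, rgb_b: np.ndarray):
    # Predicts zero camera motion from frame A to frame B:
    # identity rotation and zero translation.
    return np.eye(3, dtype=np.float32), np.zeros(3, dtype=np.float32)
```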
run_relative_pose(cfg: RelativePoseRunConfig) -> PipelineResult
¶
Run the relative-camera-pose benchmark end-to-end.
Source code in rpx_benchmark/tasks/relative_pose.py
Keypoint matching¶
keypoint_matching
¶
Keypoint matching benchmark pipeline.
KeypointMatchingRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None)
dataclass
¶
Bases: TaskRunConfig
Runtime configuration for :func:run_keypoint_matching.
Matching models receive two RGB frames (rgb_a + rgb_b)
and return corresponding points in each image's pixel grid.
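The contract can be sketched with a degenerate matcher; returning two `(N, 2)` arrays of `(x, y)` pixels, one per image, is an assumed convention:

```python
import numpy as np


def centre_match(rgb_a: np.ndarray, rgb_b: np.ndarray):
    # A single "match" at each image centre; real matchers return
    # N corresponding points per image.
    ha, wa = rgb_a.shape[:2]
    hb, wb = rgb_b.shape[:2]
    pts_a = np.array([[wa / 2.0, ha / 2.0]], dtype=np.float32)
    pts_b = np.array([[wb / 2.0, hb / 2.0]], dtype=np.float32)
    return pts_a, pts_b
```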
run_keypoint_matching(cfg: KeypointMatchingRunConfig) -> PipelineResult
¶
Run the keypoint-matching benchmark end-to-end.
Source code in rpx_benchmark/tasks/keypoint_matching.py
Sparse depth¶
sparse_depth
¶
Sparse depth benchmark pipeline.
SparseDepthRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None)
dataclass
¶
Bases: TaskRunConfig
Runtime configuration for :func:run_sparse_depth.
Sparse-depth models receive (rgb, coordinates) where
coordinates is the (N, 2) float array of pixel locations
where depth is queried, and return an (N,) array of depth
values in metres at those exact coordinates.
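A constant-depth baseline honouring the `(rgb, coordinates)` → `(N,)` contract described above:

```python
import numpy as np


def constant_sparse_depth(rgb: np.ndarray, coordinates: np.ndarray) -> np.ndarray:
    # One depth value (metres) per queried (N, 2) pixel location.
    assert coordinates.ndim == 2 and coordinates.shape[1] == 2
    return np.full(coordinates.shape[0], 2.0, dtype=np.float32)
```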
run_sparse_depth(cfg: SparseDepthRunConfig) -> PipelineResult
¶
Run the sparse-depth benchmark end-to-end.
Source code in rpx_benchmark/tasks/sparse_depth.py
Novel view synthesis¶
novel_view_synthesis
¶
Novel view synthesis benchmark pipeline.
NovelViewSynthesisRunConfig(model: Optional[BenchmarkableModel] = None, model_name: Optional[str] = None, hf_checkpoint: Optional[str] = None, split: Difficulty | str = Difficulty.HARD, repo_id: Optional[str] = None, cache_dir: Optional[str] = None, revision: Optional[str] = None, batch_size: int = 1, device: str = 'cuda', output_dir: Optional[str] = None, model_kwargs: Dict[str, Any] = dict(), progress: Optional[ProgressCallback] = None)
dataclass
¶
Bases: TaskRunConfig
Runtime configuration for :func:run_novel_view_synthesis.
NVS models receive (rgb_source, target_pose) where the
target pose is a 4×4 SE(3) camera-to-world matrix plucked from
sample.ground_truth.camera_pose by the adapter. They return
a synthesised RGB image at the target viewpoint.
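The `(rgb_source, target_pose)` contract, illustrated with a trivial copy-the-source baseline; a real NVS model would re-render the scene from the 4×4 target pose:

```python
import numpy as np


def copy_source_view(rgb_source: np.ndarray, target_pose: np.ndarray) -> np.ndarray:
    # Ignores the 4x4 SE(3) camera-to-world target pose and echoes
    # the source view back as the "synthesised" image.
    assert target_pose.shape == (4, 4)
    return rgb_source.copy()
```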
run_novel_view_synthesis(cfg: NovelViewSynthesisRunConfig) -> PipelineResult
¶
Run the novel-view-synthesis benchmark end-to-end.