Skip to content

Hub (rpx_benchmark.hub)

Task-aware HuggingFace dataset downloader. Only fetches the modalities a given task actually needs; the HF content-addressed cache makes switching tasks on the same scenes free.

hub

HuggingFace Hub integration for RPX benchmark.

Task-aware downloads: fetches only the modalities a given task needs, reusing HF's content-addressed cache so switching tasks on the same scenes only pulls the new label files.

Repo layout (on HF)::

rpx-benchmark/
├── metadata/
│   ├── scenes.parquet
│   └── esd_scores.parquet
├── manifests/
│   └── <task>/<difficulty>.json      # logical views, not duplicates
└── scenes/scene_000/{0,1,2}/
    ├── rgb/*.png
    ├── depth/*.png                   # 16-bit mm
    ├── mask/*.png                    # integer instance IDs
    ├── pose/*.npz
    ├── tracklets.json
    ├── questionnaires.json
    ├── spatial_qa.json
    └── general_qa.json

fetch_manifest(task: TaskType | str, split: Difficulty | str, repo_id: str = DEFAULT_REPO_ID, cache_dir: str | Path | None = None, revision: str | None = None) -> Dict[str, Any]

Download and parse the task-level manifest for (task, split).

Manifests are small (hundreds of KB) and are fetched eagerly so the caller can discover which (scene, phase) dirs the split references before kicking off a bulk download.

Parameters:

Name Type Description Default
task TaskType or str
required
split Difficulty or str
required
repo_id str

HuggingFace dataset repo id. Defaults to `DEFAULT_REPO_ID` ("IRVLUTD/rpx-benchmark").

DEFAULT_REPO_ID
cache_dir str or Path
None
revision str
None

Returns:

Type Description
dict

Parsed manifest JSON.

Raises:

Type Description
DownloadError

If the download fails (network, auth, bad repo id) or the manifest file does not exist on the hub.

ManifestError

If the downloaded file is not valid JSON.

Source code in rpx_benchmark/hub.py
def fetch_manifest(
    task: TaskType | str,
    split: Difficulty | str,
    repo_id: str = DEFAULT_REPO_ID,
    cache_dir: str | Path | None = None,
    revision: str | None = None,
) -> Dict[str, Any]:
    """Fetch and decode the manifest JSON for ``(task, split)``.

    The manifest is tiny (hundreds of KB), so it is always downloaded up
    front; callers use it to learn which (scene, phase) directories the
    split references before starting any bulk transfer.

    Parameters
    ----------
    task : TaskType or str
    split : Difficulty or str
    repo_id : str
        HuggingFace dataset repo id. Defaults to
        :data:`DEFAULT_REPO_ID` (``"IRVLUTD/rpx-benchmark"``).
    cache_dir : str or Path, optional
    revision : str, optional

    Returns
    -------
    dict
        Parsed manifest JSON.

    Raises
    ------
    DownloadError
        If the download fails (network, auth, bad repo id) or the
        manifest file does not exist on the hub.
    ManifestError
        If the downloaded file is not valid JSON.
    """
    hub = _hub()
    repo_path = _manifest_repo_path(task, split)
    # Any hub failure — network, auth, missing file, bad repo id — is
    # surfaced uniformly as DownloadError with a remediation hint.
    try:
        local = hub.hf_hub_download(
            repo_id=repo_id,
            repo_type=REPO_TYPE,
            filename=repo_path,
            cache_dir=str(cache_dir) if cache_dir else None,
            revision=revision,
        )
    except Exception as exc:
        raise DownloadError(
            f"Failed to download manifest {repo_path!r} from {repo_id}: {exc}",
            hint=(
                "Check your network connection and that the repo id is "
                "spelled correctly. Private repos need HF_TOKEN set."
            ),
        ) from exc
    raw_text = Path(local).read_text(encoding="utf-8")
    try:
        return json.loads(raw_text)
    except json.JSONDecodeError as exc:
        raise ManifestError(
            f"Manifest file at {local} is not valid JSON.",
        ) from exc

download_split(task: TaskType | str, split: Difficulty | str, repo_id: str = DEFAULT_REPO_ID, cache_dir: str | Path | None = None, revision: str | None = None, extra_modalities: Sequence[str] | None = None, max_workers: int = 8) -> Path

Download only the files (task, split) needs, return resolved manifest path.

The resolved manifest is a JSON file whose root field points to the local HF snapshot directory, so it can be fed directly to `RPXDataset.from_manifest`.

Source code in rpx_benchmark/hub.py
def download_split(
    task: TaskType | str,
    split: Difficulty | str,
    repo_id: str = DEFAULT_REPO_ID,
    cache_dir: str | Path | None = None,
    revision: str | None = None,
    extra_modalities: Sequence[str] | None = None,
    max_workers: int = 8,
) -> Path:
    """Download only the files (task, split) needs, return resolved manifest path.

    The resolved manifest is a JSON file whose ``root`` field points to
    the local HF snapshot directory, so it can be fed directly to
    :meth:`RPXDataset.from_manifest`.

    Parameters
    ----------
    task : TaskType or str
    split : Difficulty or str
    repo_id : str
        HuggingFace dataset repo id. Defaults to :data:`DEFAULT_REPO_ID`.
    cache_dir : str or Path, optional
        HF cache directory override.
    revision : str, optional
        Repo revision (branch, tag, or commit) to download.
    extra_modalities : sequence of str, optional
        Modalities to fetch in addition to the task's defaults.
    max_workers : int
        Parallel download workers passed to ``snapshot_download``.

    Returns
    -------
    Path
        Path to the locally written, resolved manifest JSON.

    Raises
    ------
    DownloadError
        If the manifest or snapshot download fails.
    ManifestError
        If the manifest references no scenes (or is not valid JSON).
    """
    hf = _hub()
    # Coerce strings to enums without touching Enum's private
    # ``_value2member_map_``: an unknown task string passes through
    # unchanged (historical behavior, handled downstream by
    # ``_modalities_for``), while an unknown split string raises
    # ``ValueError`` from the Difficulty constructor.
    task_enum: TaskType | str
    if isinstance(task, str):
        try:
            task_enum = TaskType(task)
        except ValueError:
            task_enum = task
    else:
        task_enum = task
    split_enum = Difficulty(split) if isinstance(split, str) else split

    # Hoist the enum-or-string unwrapping used repeatedly below.
    task_name = task_enum.value if isinstance(task_enum, TaskType) else str(task_enum)
    split_name = split_enum.value if isinstance(split_enum, Difficulty) else str(split_enum)

    manifest = fetch_manifest(task_enum, split_enum, repo_id, cache_dir, revision)

    pairs = _extract_scene_phase_pairs(manifest)
    if not pairs:
        raise ManifestError(
            f"Manifest {task}/{split} references no scenes; cannot "
            "derive download patterns.",
            hint="This usually means the manifest was generated against "
                 "an empty scene list — re-run the upload script.",
        )

    modalities = list(_modalities_for(task_enum))
    if extra_modalities:
        modalities.extend(extra_modalities)

    allow_patterns = _build_allow_patterns(modalities, pairs)
    # Include the manifest itself so the snapshot is self-describing.
    allow_patterns.append(_manifest_repo_path(task_enum, split_enum))

    log.info(
        "downloading %d file patterns for task=%s split=%s from %s",
        len(allow_patterns),
        task_name,
        split_name,
        repo_id,
    )
    try:
        snapshot_root = hf.snapshot_download(
            repo_id=repo_id,
            repo_type=REPO_TYPE,
            allow_patterns=allow_patterns,
            cache_dir=str(cache_dir) if cache_dir else None,
            revision=revision,
            max_workers=max_workers,
        )
    except Exception as e:
        raise DownloadError(
            f"snapshot_download failed for {repo_id}: {e}",
            hint="Rerun with --cache-dir pointing at a writable directory "
                 "or set HF_HUB_OFFLINE=1 to use a prebuilt local cache.",
        ) from e

    # Rewrite the manifest so its root points at the local snapshot.
    resolved = dict(manifest)
    resolved["root"] = str(snapshot_root)
    resolved.setdefault("task", task_name)

    out_dir = _rpx_cache_dir() / repo_id.replace("/", "__") / "manifests" / task_name
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / f"{split_name}.json"
    with out_path.open("w", encoding="utf-8") as f:
        json.dump(resolved, f)
    return out_path

load(task: TaskType | str, split: Difficulty | str, repo_id: str = DEFAULT_REPO_ID, cache_dir: str | Path | None = None, revision: str | None = None, batch_size: int = 1) -> RPXDataset

Download (task, split) and return an iterable `RPXDataset`.

Incremental re-use::

# First run: fetches rgb + depth for 'hard' scenes.
depth_ds = rpx.load("monocular_depth", "hard")

# Second run: rgb/depth already cached, only spatial_qa.json fetched.
qa_ds    = rpx.load("visual_grounding", "hard")
Source code in rpx_benchmark/hub.py
def load(
    task: TaskType | str,
    split: Difficulty | str,
    repo_id: str = DEFAULT_REPO_ID,
    cache_dir: str | Path | None = None,
    revision: str | None = None,
    batch_size: int = 1,
) -> RPXDataset:
    """Download ``(task, split)`` and return an iterable :class:`RPXDataset`.

    Switching tasks over the same scenes is cheap because the HF cache is
    content-addressed — only files the new task adds are fetched::

        # First run: fetches rgb + depth for 'hard' scenes.
        depth_ds = rpx.load("monocular_depth", "hard")

        # Second run: rgb/depth already cached, only spatial_qa.json fetched.
        qa_ds    = rpx.load("visual_grounding", "hard")
    """
    resolved_manifest = download_split(
        task=task,
        split=split,
        repo_id=repo_id,
        cache_dir=cache_dir,
        revision=revision,
    )
    return RPXDataset.from_manifest(resolved_manifest, batch_size=batch_size)

mount(repo_id: str = DEFAULT_REPO_ID)

Return an ``(HfFileSystem, root_path)`` pair for lazily browsing the RPX repo.

Each read goes over the network; prefer :func:load for real workloads.

Source code in rpx_benchmark/hub.py
def mount(repo_id: str = DEFAULT_REPO_ID):
    """Return ``(fs, root)``: an ``HfFileSystem`` plus the repo's root path.

    Each read goes over the network; prefer :func:`load` for real workloads.
    """
    hub = _hub()
    filesystem = hub.HfFileSystem()
    root = f"datasets/{repo_id}"
    return filesystem, root