Skip to content

Scanners

Vulnerability scanners that query OSV, NVD, and other databases.

agent_bom.scanners

Vulnerability scanning — local SQLite DB first, OSV.dev API for gaps.

ScanOptions dataclass

Immutable per-scan scanner controls.

The legacy module-level setters remain for CLI compatibility, but request handlers and concurrent scan callers should pass explicit options so tenant policy cannot bleed through shared module state.

Source code in src/agent_bom/scanners/__init__.py
@dataclass(frozen=True)
class ScanOptions:
    """Immutable per-scan scanner controls.

    The legacy module-level setters remain for CLI compatibility, but request
    handlers and concurrent scan callers should pass explicit options so tenant
    policy cannot bleed through shared module state.
    """

    # Skip all network calls (registries, OSV, advisory feeds); rely on local data only.
    offline: bool = False
    # Materialize per-finding compliance-framework tags (OWASP, ATLAS, NIST, ...).
    compliance_enabled: bool = False
    # Expand npm/PyPI/Go transitive dependency trees before scanning.
    resolve_transitive: bool = False
    # Prefer the local SQLite DB; query OSV only for packages it does not cover.
    prefer_local_db: bool = False
IncompleteScanError

Bases: RuntimeError

Raised when a scan cannot produce a trustworthy verdict.

Source code in src/agent_bom/scanners/__init__.py
class IncompleteScanError(RuntimeError):
    """Raised when a scan cannot produce a trustworthy verdict.

    Typically raised in offline mode when packages are missing from the
    local vulnerability DB, so "no findings" would be indistinguishable
    from "not checked".
    """

default_scan_options

default_scan_options(*, compliance_enabled: bool = False, resolve_transitive: bool = False, prefer_local_db: bool | None = None, offline: bool | None = None) -> ScanOptions

Build per-scan options while preserving legacy offline defaults.

Source code in src/agent_bom/scanners/__init__.py
def default_scan_options(
    *,
    compliance_enabled: bool = False,
    resolve_transitive: bool = False,
    prefer_local_db: bool | None = None,
    offline: bool | None = None,
) -> ScanOptions:
    """Build per-scan options while preserving legacy offline defaults."""

    # A ``None`` means "inherit the legacy module-level setting". The
    # ``prefer_local_db`` keyword shadows the module global of the same name,
    # so the fallback must go through ``globals()`` explicitly.
    if prefer_local_db is None:
        prefer_local_db = globals().get("prefer_local_db", False)
    effective_offline = offline_mode if offline is None else offline
    return ScanOptions(
        offline=effective_offline,
        compliance_enabled=compliance_enabled,
        resolve_transitive=resolve_transitive,
        prefer_local_db=prefer_local_db,
    )

set_offline_mode

set_offline_mode(value: bool) -> None

Set offline mode in both scanner and http_client transport layer.

Source code in src/agent_bom/scanners/__init__.py
def set_offline_mode(value: bool) -> None:
    """Set offline mode in both scanner and http_client transport layer."""
    # Update scanner-level state first so it sticks even if the transport
    # import below were to fail.
    global offline_mode  # noqa: PLW0603
    offline_mode = value

    # Mirror the flag into the shared HTTP transport so no request slips through.
    from agent_bom.http_client import set_offline

    set_offline(value)

query_osv_batch async

query_osv_batch(packages: list[Package]) -> dict[str, list[dict]]

Query OSV API for vulnerabilities in batch.

Source code in src/agent_bom/scanners/__init__.py
async def query_osv_batch(packages: list[Package]) -> dict[str, list[dict]]:
    """Query OSV API for vulnerabilities in batch.

    Thin wrapper: wires the module-level collaborators (console, scan cache,
    API semaphore, perf counters, enrichment hook, warning recorder, ecosystem
    mapping, HTTP client factories) into ``query_osv_batch_impl`` so the
    implementation stays injectable/testable.

    Args:
        packages: Packages to look up against OSV.dev.

    Returns:
        Mapping of package key to the raw OSV vulnerability dicts found.
    """
    return await query_osv_batch_impl(
        packages,
        console=console,
        get_scan_cache=_get_scan_cache,
        get_api_semaphore=_get_api_semaphore,
        bump_scan_perf=_bump_scan_perf,
        enrich_results_if_needed_fn=_enrich_results_if_needed,
        record_scan_warning=record_scan_warning,
        osv_ecosystems_for_package=_osv_ecosystems_for_package,
        non_osv_ecosystems=_NON_OSV_ECOSYSTEMS,
        create_client_fn=create_client,
        request_with_retry_fn=request_with_retry,
    )

build_vulnerabilities

build_vulnerabilities(vuln_data_list: list[dict], package: Package) -> list[Vulnerability]

Convert OSV response data to Vulnerability objects.

Filters out false positives by verifying the package version falls within OSV affected ranges. Deduplicates by canonical CVE ID.

Source code in src/agent_bom/scanners/__init__.py
def build_vulnerabilities(vuln_data_list: list[dict], package: Package) -> list[Vulnerability]:
    """Convert OSV response data to Vulnerability objects.

    Filters out false positives by verifying the package version falls
    within OSV affected ranges.  Deduplicates by canonical CVE ID.

    Args:
        vuln_data_list: Raw OSV vulnerability dicts for ``package``.
        package: The package the vulns were reported against; its version
            and ecosystem drive range filtering and fixed-version parsing.

    Returns:
        Deduplicated ``Vulnerability`` objects, one per canonical ID,
        each tagged with ``advisory_sources=["osv"]``.
    """
    vulns: list[Vulnerability] = []
    seen_ids: set[str] = set()

    for vuln_data in vuln_data_list:
        vuln_id = vuln_data.get("id", "unknown")

        # Version-range filter: skip vulns that don't affect our version
        # (only possible when we actually know the installed version).
        if package.version and package.version not in ("unknown", "latest"):
            if not _is_version_affected(
                vuln_data,
                package.name,
                package.version,
                package.ecosystem,
                source_package=package.source_package,
            ):
                _logger.debug(
                    "Filtered %s: version %s not in affected range for %s",
                    vuln_id,
                    package.version,
                    package.name,
                )
                continue

        # Compute canonical ID early so dedup catches alias overlaps
        # (prefer the CVE alias unless the OSV record itself is a CVE).
        aliases = vuln_data.get("aliases", [])
        cve_alias = next((a for a in aliases if a.startswith("CVE-")), None)
        canonical_id = cve_alias if cve_alias and not vuln_id.startswith("CVE-") else vuln_id

        # Deduplicate by canonical ID AND raw ID — prevents PYSEC/GHSA duplicates
        if canonical_id in seen_ids or vuln_id in seen_ids:
            continue
        seen_ids.add(canonical_id)
        seen_ids.add(vuln_id)
        # Also mark all aliases as seen to prevent future duplicates
        for alias in aliases:
            seen_ids.add(alias)

        severity, cvss_score, sev_source = parse_osv_severity(vuln_data)
        fixed = parse_fixed_version(
            vuln_data,
            package.name,
            package.ecosystem,
            current_version=package.version or "",
            source_package=package.source_package,
        )

        references = [ref.get("url", "") for ref in vuln_data.get("references", []) if ref.get("url")]

        # Truncate to keep report payloads small; "details" is the fallback.
        summary = vuln_data.get("summary", vuln_data.get("details", "No description available"))[:200]

        # Collect all aliases (original ID + OSV aliases, minus the canonical)
        all_aliases = [a for a in aliases if a != canonical_id]
        if vuln_id != canonical_id:
            all_aliases.append(vuln_id)

        # Extract CWE IDs from database_specific (GHSA entries store them here)
        cwe_ids: list[str] = []
        db_specific = vuln_data.get("database_specific", {})
        if isinstance(db_specific, dict):
            raw_cwes = db_specific.get("cwe_ids", [])
            if isinstance(raw_cwes, list):
                cwe_ids = [c for c in raw_cwes if isinstance(c, str) and c.startswith("CWE-")]

        vulns.append(
            Vulnerability(
                id=canonical_id,
                summary=summary,
                severity=severity,
                severity_source=sev_source,
                cvss_score=cvss_score,
                fixed_version=fixed,
                references=references,
                published_at=vuln_data.get("published"),
                modified_at=vuln_data.get("modified"),
                aliases=all_aliases,
                cwe_ids=cwe_ids,
                advisory_sources=["osv"],
            )
        )

    return vulns

deduplicate_packages

deduplicate_packages(packages: list) -> list

Remove duplicate packages across discovery sources.

Deduplicates by (ecosystem, normalized_name, version) fingerprint. When duplicates exist, the first occurrence is kept (preserves source ordering).

This prevents redundant OSV API calls and duplicate vulnerability findings when the same package is discovered from multiple sources (local, K8s, cloud).

Parameters:

Name Type Description Default
packages list

List of Package objects from one or more discovery sources.

required

Returns:

Type Description
list

Deduplicated list, preserving first-seen order.

Source code in src/agent_bom/scanners/__init__.py
def deduplicate_packages(packages: list) -> list:
    """Remove duplicate packages across discovery sources.

    Deduplicates by (ecosystem, normalized_name, version) fingerprint.
    When duplicates exist, the first occurrence is kept (preserves source ordering).

    This prevents redundant OSV API calls and duplicate vulnerability findings
    when the same package is discovered from multiple sources (local, K8s, cloud).

    Args:
        packages: List of Package objects from one or more discovery sources.

    Returns:
        Deduplicated list, preserving first-seen order.
    """
    fingerprints_seen: set[tuple[str, str, str]] = set()
    unique_packages = []
    for candidate in packages:
        # Fingerprint on the normalized identity (PEP 503: torch == Torch == pytorch)
        fingerprint = canonical_package_identity(
            getattr(candidate, "name", "") or "",
            getattr(candidate, "version", "") or "",
            getattr(candidate, "ecosystem", "") or "",
            getattr(candidate, "purl", None),
        )
        if fingerprint in fingerprints_seen:
            continue
        fingerprints_seen.add(fingerprint)
        unique_packages.append(candidate)
    return unique_packages

scan_packages async

scan_packages(packages: list[Package], *, resolve_transitive: bool = False, options: ScanOptions | None = None) -> int

Scan a list of packages for vulnerabilities. Returns count of vulns found.

Source code in src/agent_bom/scanners/__init__.py
async def scan_packages(
    packages: list[Package],
    *,
    resolve_transitive: bool = False,
    options: ScanOptions | None = None,
) -> int:
    """Scan a list of packages for vulnerabilities. Returns count of vulns found.

    Pipeline, in order:
      1. Deduplicate packages across discovery sources.
      2. Normalize PyPI names (PEP 503) and strip pip extras.
      3. Resolve missing versions — locally installed packages first,
         then registry fallback (skipped entirely in offline mode).
      4. Optionally expand transitive dependencies (npm / PyPI / Go).
      5. Query the local SQLite DB, then OSV only for uncovered packages.
      6. Supplement with NVIDIA and GHSA advisories, run typosquat and
         dependency-confusion checks, apply .agent-bom-ignore suppression.

    Args:
        packages: Packages to scan. Mutated in place — versions may be
            resolved, vulnerabilities attached, malicious flags set.
        resolve_transitive: Legacy toggle; superseded by ``options`` when given.
        options: Explicit per-scan controls; defaults to module-level state.

    Returns:
        Total number of vulnerabilities found (after ignore-rule suppression).

    Raises:
        IncompleteScanError: In offline mode when the local DB cannot cover
            every scannable package, so no trustworthy verdict is possible.
    """
    scan_options = options or default_scan_options(resolve_transitive=resolve_transitive)
    scan_offline = scan_options.offline
    scan_resolve_transitive = scan_options.resolve_transitive
    scan_prefer_local_db = scan_options.prefer_local_db
    # Deduplicate packages across discovery sources before scanning.
    # Prevents redundant OSV API calls when the same package is discovered
    # from multiple sources (local, K8s, cloud).
    original_count = len(packages)
    packages = deduplicate_packages(packages)
    deduped = original_count - len(packages)
    reset_scan_performance()
    _bump_scan_perf("packages_seen", original_count)
    if deduped > 0:
        _bump_scan_perf("packages_deduplicated", deduped)
    if deduped > 0:
        _logger.info("Deduplicated %d duplicate packages (kept %d unique)", deduped, len(packages))

    # Normalize package names for consistent matching (PEP 503 for PyPI)
    # and strip pip extras notation (OSV doesn't understand extras)
    for pkg in packages:
        if pkg.ecosystem.lower() == "pypi":
            if "[" in pkg.name:
                pkg.name = _strip_extras(pkg.name)
            pkg.name = normalize_package_name(pkg.name, pkg.ecosystem)

    reset_scan_warnings()
    try:
        from agent_bom.resolver import reset_performance_stats as _reset_resolver_performance

        _reset_resolver_performance()
    except Exception:  # noqa: BLE001
        pass

    # ── Local version resolution (installed packages) ──────────────────────
    # Try resolving versions from locally installed packages FIRST.
    # This is more accurate than registry fallback because it reflects
    # what's actually on disk (e.g. npm list, pip list).
    unresolved = [p for p in packages if p.version in ("latest", "unknown", "") and p.ecosystem.lower() in ("npm", "pypi", "go")]
    if unresolved:
        try:
            from agent_bom.resolvers.runtime_resolver import (
                resolve_go_versions,
                resolve_npm_versions,
                resolve_pip_versions,
            )

            local_resolved = 0

            # Resolve pip packages from locally installed versions
            pip_unresolved = [p for p in unresolved if p.ecosystem.lower() == "pypi"]
            if pip_unresolved:
                pip_versions = resolve_pip_versions()
                for pkg in pip_unresolved:
                    # Try hyphen/underscore variants — pip metadata is inconsistent here.
                    installed_ver = (
                        pip_versions.get(pkg.name.lower())
                        or pip_versions.get(pkg.name.lower().replace("-", "_"))
                        or pip_versions.get(pkg.name.lower().replace("_", "-"))
                    )
                    if installed_ver:
                        pkg.version = installed_ver
                        pkg.purl = f"pkg:{pkg.ecosystem}/{pkg.name}@{installed_ver}"
                        pkg.version_source = "installed"
                        local_resolved += 1

            # Resolve npm packages from locally installed versions
            npm_unresolved = [p for p in unresolved if p.ecosystem.lower() == "npm"]
            if npm_unresolved:
                from pathlib import Path as _NpmPath

                # Try CWD — npm ls reports the full dependency tree
                npm_versions = resolve_npm_versions(_NpmPath.cwd())
                for pkg in npm_unresolved:
                    installed_ver = npm_versions.get(pkg.name)
                    if installed_ver:
                        pkg.version = installed_ver
                        pkg.purl = f"pkg:{pkg.ecosystem}/{pkg.name}@{installed_ver}"
                        pkg.version_source = "installed"
                        local_resolved += 1

            # Resolve Go packages from locally installed versions
            go_unresolved = [p for p in unresolved if p.ecosystem.lower() == "go"]
            if go_unresolved:
                from pathlib import Path as _GoPath

                go_versions = resolve_go_versions(_GoPath.cwd())
                for pkg in go_unresolved:
                    installed_ver = go_versions.get(pkg.name)
                    if installed_ver:
                        pkg.version = installed_ver
                        pkg.purl = f"pkg:{pkg.ecosystem}/{pkg.name}@{installed_ver}"
                        pkg.version_source = "installed"
                        local_resolved += 1

            if local_resolved:
                console.print(f"  [green]✓[/green] Resolved {local_resolved} package version(s) from local install")
        except Exception as exc:
            # Best-effort: a failed local resolution just falls through to the registry path.
            _logger.debug("Local version resolution failed: %s", exc)

    # ── Registry fallback for still-unresolved versions ──────────────────
    # Only hit npm/PyPI registries for packages we couldn't resolve locally.
    # In offline mode, skip all registry calls entirely.
    still_unresolved = [p for p in packages if p.version in ("latest", "unknown", "") and p.ecosystem.lower() in ("npm", "pypi", "conda")]
    if still_unresolved and not scan_offline:
        try:
            from agent_bom.resolver import resolve_all_versions

            resolved_count = await resolve_all_versions(still_unresolved)
            if resolved_count:
                # Mark these as registry-resolved so output shows confidence
                for pkg in still_unresolved:
                    if pkg.version not in ("latest", "unknown", ""):
                        pkg.version_source = "registry_fallback"
                console.print(f"  [green]✓[/green] Auto-resolved {resolved_count} package version(s) from registry")
        except Exception as exc:
            _logger.warning("Version resolution failed for %d package(s): %s", len(still_unresolved), exc)
            console.print(f"  [yellow]⚠[/yellow] Version resolution skipped: {exc}")
            record_scan_warning("version resolution failed for one or more packages")
    elif still_unresolved and scan_offline:
        _logger.info("Offline mode: skipping registry version resolution for %d package(s)", len(still_unresolved))

    # ── Transitive dependency resolution (npm / PyPI / Go) ───────────────────
    if scan_resolve_transitive and not scan_offline:
        transitive_ecosystems = {"npm", "pypi", "go"}
        eligible = [p for p in packages if p.ecosystem.lower() in transitive_ecosystems]
        if eligible:
            try:
                from agent_bom.transitive import resolve_transitive_dependencies

                _logger.info("Resolving transitive dependencies for %d package(s)...", len(eligible))
                transitive_pkgs = await resolve_transitive_dependencies(eligible)
                if transitive_pkgs:
                    # Only queue transitive packages we haven't already seen.
                    existing_keys = {f"{p.ecosystem.lower()}:{normalize_package_name(p.name, p.ecosystem)}@{p.version}" for p in packages}
                    new_pkgs = [
                        p
                        for p in transitive_pkgs
                        if f"{p.ecosystem.lower()}:{normalize_package_name(p.name, p.ecosystem)}@{p.version}" not in existing_keys
                    ]
                    if new_pkgs:
                        packages = packages + new_pkgs
                        packages = deduplicate_packages(packages)
                        console.print(f"  [cyan]→[/cyan] Transitive resolution: {len(new_pkgs)} additional package(s) queued")
            except Exception as exc:  # noqa: BLE001
                _logger.warning("Transitive resolution failed, scanning direct dependencies only: %s", exc)
                record_scan_warning("transitive dependency resolution failed")

    # SAST packages already carry vulns from Semgrep — skip OSV query for them
    scannable = [p for p in packages if p.version not in ("unknown", "latest", "") and p.ecosystem.lower() != "sast"]

    # Warn about packages that could not be resolved — no silent failures
    still_unresolved = [p for p in packages if p.version in ("unknown", "latest", "") and p.ecosystem.lower() != "sast"]
    if still_unresolved:
        names = ", ".join(f"{p.name}@{p.version}" for p in still_unresolved[:10])
        suffix = f" (+{len(still_unresolved) - 10} more)" if len(still_unresolved) > 10 else ""
        console.print(f"  [yellow]⚠[/yellow] {len(still_unresolved)} package(s) skipped (unresolved version): {names}{suffix}")
        _logger.warning(
            "Skipped %d package(s) with unresolved versions: %s",
            len(still_unresolved),
            names + suffix,
        )
        record_scan_warning(f"{len(still_unresolved)} package(s) skipped due to unresolved versions")

    if not scannable:
        return 0

    # ── Local DB lookup (fast, offline-capable) ───────────────────────────────
    # Query the local SQLite DB first.  Packages covered by the DB skip the
    # OSV API call — saving round-trips and enabling fully offline scanning
    # when the DB is populated via `agent-bom db update`.
    local_count, db_covered = _scan_packages_local_db(scannable)
    if local_count:
        local_label = "vulnerability found" if local_count == 1 else "vulnerabilities found"
        console.print(f"  [green]✓[/green] Local DB: {local_count} {local_label} (offline)")

    # Only call OSV for packages not already covered by the local DB
    def _db_key(p: Package) -> str:
        return f"{p.ecosystem.lower()}:{normalize_package_name(p.name, p.ecosystem)}@{p.version}"

    osv_targets = [p for p in scannable if _db_key(p) not in db_covered]

    if scan_offline or (scan_prefer_local_db and not osv_targets):
        # Offline scans must fail loudly rather than report a misleading "clean".
        if scan_offline and not db_covered:
            raise IncompleteScanError(
                "Offline mode requires a populated local vulnerability DB. Run `agent-bom db update` before using `--offline`."
            )
        if osv_targets and scan_offline:
            _logger.info("Offline mode: skipping OSV API for %d package(s) not in local DB", len(osv_targets))
            skipped_names = ", ".join(f"{pkg.name}@{pkg.version}" for pkg in osv_targets[:5])
            suffix = f" (+{len(osv_targets) - 5} more)" if len(osv_targets) > 5 else ""
            raise IncompleteScanError(
                "Offline mode cannot produce a trustworthy verdict because "
                f"{len(osv_targets)} package(s) are missing from the local vulnerability DB: "
                f"{skipped_names}{suffix}. Run `agent-bom db update` before using `--offline`."
            )
        results = {}
    elif scan_prefer_local_db and osv_targets:
        # DB is fresh — only query OSV for packages genuinely missing from DB
        _logger.debug("Local DB preferred: querying OSV for %d uncovered package(s) only", len(osv_targets))
        results = await query_osv_batch(osv_targets)
    elif osv_targets:
        results = await query_osv_batch(osv_targets)
    else:
        results = {}

    total_vulns = local_count
    for pkg in osv_targets:
        norm = normalize_package_name(pkg.name, pkg.ecosystem)
        key = f"{pkg.ecosystem.lower()}:{norm}@{pkg.version}"
        vuln_data = results.get(key, [])
        if vuln_data:
            new_vulns = build_vulnerabilities(vuln_data, pkg)
            # Merge: don't duplicate what the local DB already found
            existing_ids = {v.id for v in pkg.vulnerabilities}
            merged = [v for v in new_vulns if v.id not in existing_ids]
            pkg.vulnerabilities.extend(merged)
            total_vulns += len(merged)
            # Tag each CVE with compliance framework codes (pre-enrichment)
            for v in merged:
                v.compliance_tags = _tag_vuln(v, pkg)
            # Flag packages with MAL- prefixed vulnerability IDs as malicious
            flag_malicious_from_vulns(pkg)

    # Back-fill: also run OSV tagging for packages that came from local DB only
    for pkg in scannable:
        if pkg in osv_targets:
            continue  # already processed above
        for v in pkg.vulnerabilities:
            if not v.compliance_tags:
                v.compliance_tags = _tag_vuln(v, pkg)

    # Supplemental: check NVIDIA advisories for all AI framework packages.
    # nvidia_advisory.py maps NVIDIA CSAF products to bundling frameworks (torch,
    # jax, vllm, etc.) so we pass ALL AI packages — not just nvidia-prefixed ones.
    nvidia_packages = [
        p
        for p in scannable
        if p.name.lower().replace("-", "_") in _AI_FRAMEWORK_PACKAGES or p.name.lower().replace("-", "") in _AI_FRAMEWORK_PACKAGES
    ]
    if nvidia_packages and not scan_offline:
        try:
            from agent_bom.scanners.nvidia_advisory import check_nvidia_advisories

            nvidia_new = await check_nvidia_advisories(nvidia_packages)
            if nvidia_new:
                total_vulns += nvidia_new
                console.print(f"  [green]✓[/green] NVIDIA advisories: {nvidia_new} additional CVE(s)")
        except Exception as exc:
            _logger.warning("NVIDIA advisory check failed for %d package(s): %s", len(nvidia_packages), exc)
            console.print(f"  [yellow]⚠[/yellow] NVIDIA advisory check skipped: {exc}")
            record_scan_warning("NVIDIA advisory enrichment skipped")

    # Supplemental: check GitHub Security Advisories for all packages
    if scannable and not scan_offline:
        try:
            from agent_bom.scanners.ghsa_advisory import check_github_advisories

            ghsa_new = await check_github_advisories(scannable)
            if ghsa_new:
                total_vulns += ghsa_new
                console.print(f"  [green]✓[/green] GHSA advisories: {ghsa_new} additional CVE(s)")
        except Exception as exc:
            _logger.warning("GHSA advisory check failed for %d package(s): %s", len(scannable), exc)
            console.print(f"  [yellow]⚠[/yellow] GHSA advisory check skipped: {exc}")
            record_scan_warning("GHSA advisory enrichment skipped")

    # Typosquat detection for all scanned packages
    for pkg in scannable:
        if not pkg.is_malicious:
            target = check_typosquat(pkg.name, pkg.ecosystem)
            if target:
                pkg.is_malicious = True
                pkg.malicious_reason = f"Possible typosquat of '{target}'"
            # Dependency confusion check
            confusion_warning = check_dependency_confusion(pkg)
            if confusion_warning and not pkg.is_malicious:
                pkg.is_malicious = True
                pkg.malicious_reason = confusion_warning

    # Apply .agent-bom-ignore suppression rules
    try:
        from agent_bom.ignore import apply_ignore_rules, load_ignore_file

        rules = load_ignore_file()
        if not rules.is_empty:
            suppressed = apply_ignore_rules(scannable, rules)
            if suppressed:
                total_vulns -= suppressed
                console.print(f"  [yellow]⚠[/yellow] Suppressed {suppressed} finding(s) via .agent-bom-ignore")
    except Exception as exc:
        _logger.warning("Ignore file processing skipped: %s", exc)
        record_scan_warning("ignore-file processing failed")

    return total_vulns

scan_agents async

scan_agents(agents: list[Agent], *, compliance_enabled: bool = False, resolve_transitive: bool = False, show_scan_banner: bool = True, options: ScanOptions | None = None) -> list[BlastRadius]

Scan all agents' MCP server packages for vulnerabilities.

Source code in src/agent_bom/scanners/__init__.py
async def scan_agents(
    agents: list[Agent],
    *,
    compliance_enabled: bool = False,
    resolve_transitive: bool = False,
    show_scan_banner: bool = True,
    options: ScanOptions | None = None,
) -> list[BlastRadius]:
    """Scan all agents' MCP server packages for vulnerabilities.

    Collects all packages across agents' MCP servers, scans the deduplicated
    set via ``scan_packages``, propagates findings back to every package
    instance, and builds one ``BlastRadius`` per (package, vulnerability)
    pair — enriched with exposed credentials/tools, CWE-aware impact
    filtering, AI-framework risk context, and (optionally) compliance tags.

    Args:
        agents: Agents whose MCP server packages should be scanned (mutated:
            vulnerabilities are attached to their packages).
        compliance_enabled: Legacy toggle; superseded by ``options`` when given.
        resolve_transitive: Legacy toggle; superseded by ``options`` when given.
        show_scan_banner: Print the scan banner/progress lines to the console.
        options: Explicit per-scan controls; defaults to module-level state.

    Returns:
        BlastRadius findings sorted by risk score, highest first.
    """
    scan_options = options or default_scan_options(
        compliance_enabled=compliance_enabled,
        resolve_transitive=resolve_transitive,
    )
    if show_scan_banner:
        console.print("\n[bold blue]🛡️  Scanning for vulnerabilities...[/bold blue]\n")

    def _pkg_key(pkg: Package) -> str:
        # Canonical identity string — also used to join servers/agents back to packages.
        return canonical_package_key(pkg.name, pkg.version, pkg.ecosystem, pkg.purl)

    # Collect all unique packages
    all_packages = []
    pkg_to_servers: dict[str, list[MCPServer]] = {}
    pkg_to_agents: dict[str, list[Agent]] = {}

    for agent in agents:
        for server in agent.mcp_servers:
            for pkg in server.packages:
                key = _pkg_key(pkg)
                all_packages.append(pkg)

                if key not in pkg_to_servers:
                    pkg_to_servers[key] = []
                pkg_to_servers[key].append(server)

                if key not in pkg_to_agents:
                    pkg_to_agents[key] = []
                if agent not in pkg_to_agents[key]:
                    pkg_to_agents[key].append(agent)

    # Deduplicate packages for scanning — uses canonical deduplicate_packages()
    # which normalizes by (ecosystem, normalized_name, version) fingerprint.
    unique_packages = deduplicate_packages(all_packages)

    if show_scan_banner:
        console.print(f"  Scanning {len(unique_packages)} unique packages across {len(agents)} agent(s)...")

    total_vulns = await scan_packages(unique_packages, options=scan_options)

    # Propagate vulnerabilities back to all instances
    vuln_map: dict[str, list] = {}
    for pkg in unique_packages:
        if pkg.vulnerabilities:
            vuln_map[_pkg_key(pkg)] = pkg.vulnerabilities

    for agent in agents:
        for server in agent.mcp_servers:
            for pkg in server.packages:
                if _pkg_key(pkg) in vuln_map:
                    pkg.vulnerabilities = vuln_map[_pkg_key(pkg)]

    # Build blast radius analysis
    blast_radii = []
    for pkg in unique_packages:
        if not pkg.vulnerabilities:
            continue

        key = _pkg_key(pkg)
        affected_servers = pkg_to_servers.get(key, [])
        affected_agents = pkg_to_agents.get(key, [])

        # Collect exposed credentials and tools — enrich from registry when server
        # config doesn't have explicit tool/credential data.
        # Cache registry lookups per server to avoid duplicate tool creation.
        #
        # IMPORTANT: Registry-sourced tools are "phantom" — they reflect what
        # the registry CLAIMS the server has, not what was introspected.
        # We include them for visibility but mark them so blast radius
        # consumers can distinguish confirmed vs phantom tools.
        from agent_bom.parsers import get_registry_entry

        exposed_creds: list[str] = []
        exposed_tools: list = []
        _registry_cache: dict[str, dict | None] = {}
        _has_phantom_tools = False
        for server in affected_servers:
            server_creds = server.credential_names
            server_tools = list(server.tools)  # copy — don't mutate server

            # Registry enrichment: if no tools/creds known from config, use registry
            if not server_tools or not server_creds:
                if server.name not in _registry_cache:
                    _registry_cache[server.name] = get_registry_entry(server)
                reg = _registry_cache[server.name]
                if reg:
                    if not server_tools and reg.get("tools"):
                        from agent_bom.models import MCPTool

                        # Mark as registry-sourced (phantom) — not confirmed by introspection
                        server_tools = [MCPTool(name=t, description="(registry — unverified)") for t in reg["tools"]]
                        _has_phantom_tools = True
                    if not server_creds and reg.get("credential_env_vars"):
                        server_creds = reg["credential_env_vars"]

            exposed_creds.extend(server_creds)
            exposed_tools.extend(server_tools)

        # Deduplicate credentials and tools to prevent inflation
        exposed_creds_deduped = list(set(exposed_creds))
        seen_tool_names: set[str] = set()
        deduped_tools = []
        for t in exposed_tools:
            if t.name not in seen_tool_names:
                seen_tool_names.add(t.name)
                deduped_tools.append(t)
        exposed_tools = deduped_tools

        # AI-native risk context: elevated when an AI framework has creds + tools
        is_ai_framework = (
            pkg.name.lower().replace("-", "_") in {n.replace("-", "_") for n in _AI_FRAMEWORK_PACKAGES}
            or pkg.name.lower() in _AI_FRAMEWORK_PACKAGES
        )
        has_creds = bool(exposed_creds_deduped)
        has_tools = bool(exposed_tools)
        if is_ai_framework and has_creds and has_tools:
            phantom_note = " (some tools unverified — from registry)" if _has_phantom_tools else ""
            ai_risk_context = (
                f"AI framework '{pkg.name}' runs inside an agent with {len(exposed_creds_deduped)} "
                f"exposed credential(s) and {len(exposed_tools)} reachable tool(s){phantom_note}. "
                f"A compromise here gives an attacker both identity and capability."
            )
        elif is_ai_framework and has_creds:
            ai_risk_context = (
                f"AI framework '{pkg.name}' has access to {len(exposed_creds_deduped)} "
                f"credential(s). Exploitation could exfiltrate secrets via LLM output."
            )
        elif is_ai_framework:
            ai_risk_context = "AI framework package — vulnerability affects LLM inference/orchestration pipeline."
        else:
            ai_risk_context = None

        for vuln in pkg.vulnerabilities:
            if not vuln.compliance_tags:
                vuln.compliance_tags = _tag_vuln(vuln, pkg)

            # CWE-aware filtering: only expose credentials/tools the vuln
            # type can realistically reach. A DoS (CWE-400) doesn't steal
            # DATABASE_URL. An RCE (CWE-94) does.
            from agent_bom.cwe_impact import (
                build_attack_vector_summary,
                classify_cwe_impact,
                filter_credentials_by_impact,
                filter_tools_by_impact,
            )

            impact_cat = classify_cwe_impact(vuln.cwe_ids)
            filtered_creds = filter_credentials_by_impact(
                impact_cat,
                exposed_creds_deduped,
            )
            filtered_tools = filter_tools_by_impact(
                impact_cat,
                exposed_tools,
            )
            attack_summary = build_attack_vector_summary(
                cwe_ids=vuln.cwe_ids,
                category=impact_cat,
                filtered_creds=filtered_creds,
                filtered_tools=filtered_tools,
                severity=vuln.severity.value if vuln.severity else None,
                is_kev=vuln.is_kev,
            )

            br = BlastRadius(
                vulnerability=vuln,
                package=pkg,
                affected_servers=affected_servers,
                affected_agents=affected_agents,
                exposed_credentials=filtered_creds,
                exposed_tools=filtered_tools,
                ai_risk_context=ai_risk_context,
                impact_category=impact_cat,
                all_server_credentials=list(exposed_creds_deduped),
                all_server_tools=list(exposed_tools),
                attack_vector_summary=attack_summary,
            )
            br.calculate_risk_score()
            # Context-aware tagging remains opt-in for explicit compliance
            # views, but effective framework tags are always materialized.
            if scan_options.compliance_enabled:
                br.owasp_tags = tag_blast_radius(br)
                br.atlas_tags = tag_atlas_techniques(br)
                br.attack_tags = tag_attack_techniques(br)
                br.nist_ai_rmf_tags = tag_nist_ai_rmf(br)
                br.owasp_mcp_tags = tag_owasp_mcp(br)
                br.owasp_agentic_tags = tag_owasp_agentic(br)
                br.eu_ai_act_tags = tag_eu_ai_act(br)
                br.nist_csf_tags = tag_nist_csf(br)
                br.iso_27001_tags = tag_iso_27001(br)
                br.soc2_tags = tag_soc2(br)
                br.cis_tags = tag_cis_controls(br)
                br.cmmc_tags = tag_cmmc(br)
                br.nist_800_53_tags = tag_nist_800_53(br)
                br.fedramp_tags = tag_fedramp(br)
            apply_effective_blast_radius_tags(br)
            blast_radii.append(br)

    # Sort by risk score descending
    blast_radii.sort(key=lambda br: br.risk_score, reverse=True)

    if total_vulns:
        console.print(f"  [red]⚠ Found {total_vulns} vulnerabilities across {len(blast_radii)} findings[/red]")
    else:
        console.print("  [green]✓ No known vulnerabilities found[/green]")

    _logger.info(
        "Scan summary: %d packages scanned, %d vulnerabilities, %d blast radius findings across %d agent(s)",
        len(unique_packages),
        total_vulns,
        len(blast_radii),
        len(agents),
    )

    return blast_radii

scan_agents_with_enrichment async

scan_agents_with_enrichment(agents: list[Agent], nvd_api_key: Optional[str] = None, enable_enrichment: bool = True, compliance_enabled: bool = False, show_scan_banner: bool = True, options: ScanOptions | None = None) -> list[BlastRadius]

Scan agents and enrich vulnerabilities with NVD/EPSS/KEV data.

Source code in src/agent_bom/scanners/__init__.py
async def scan_agents_with_enrichment(
    agents: list[Agent],
    nvd_api_key: Optional[str] = None,
    enable_enrichment: bool = True,
    compliance_enabled: bool = False,
    show_scan_banner: bool = True,
    options: ScanOptions | None = None,
) -> list[BlastRadius]:
    """Scan agents and enrich vulnerabilities with NVD/EPSS/KEV data."""
    # Resolve effective options up front; legacy callers that only pass
    # ``compliance_enabled`` still get a fully-populated ScanOptions.
    scan_options = default_scan_options(compliance_enabled=compliance_enabled) if options is None else options

    # Stage 1: baseline OSV scan.
    blast_radii = await scan_agents(
        agents,
        compliance_enabled=scan_options.compliance_enabled,
        show_scan_banner=show_scan_banner,
        options=scan_options,
    )

    # Stage 2: external enrichment — skipped entirely when disabled or when
    # the scan produced no findings to enrich.
    if enable_enrichment and blast_radii:
        from agent_bom.enrichment import enrich_vulnerabilities
        from agent_bom.resolver import enrich_supply_chain_metadata

        # Flatten every package (and its vulnerabilities) across all agents.
        all_pkgs: list[Package] = [
            pkg
            for agent in agents
            for server in agent.mcp_servers
            for pkg in server.packages
        ]
        all_vulns = [vuln for pkg in all_pkgs for vuln in pkg.vulnerabilities]

        if all_vulns:
            await enrich_vulnerabilities(
                all_vulns,
                nvd_api_key=nvd_api_key,
                enable_nvd=True,
                enable_epss=True,
                enable_kev=True,
            )

            # Refresh CVE-level compliance tags now that CWE/KEV/EPSS data is populated
            for pkg in all_pkgs:
                for vuln in pkg.vulnerabilities:
                    vuln.compliance_tags = _tag_vuln(vuln, pkg)

        # Supply-chain metadata enrichment — feeds Scorecard repo resolution.
        # Best-effort: a failure degrades coverage, never the scan itself.
        try:
            async with create_client(timeout=10.0) as client:
                await enrich_supply_chain_metadata(all_pkgs, client)
        except Exception as exc:  # noqa: BLE001
            _logger.warning("Supply chain metadata enrichment failed (scorecard coverage may be incomplete): %s", exc)
            record_scan_warning("supply-chain metadata enrichment failed")

        # Scorecard enrichment — adds supply-chain quality signal (also best-effort).
        try:
            from agent_bom.scorecard import enrich_packages_with_scorecard

            if all_pkgs:
                await enrich_packages_with_scorecard(all_pkgs)
        except Exception as exc:  # noqa: BLE001
            _logger.warning("Scorecard auto-enrichment failed (risk scores may be understated): %s", exc)
            record_scan_warning("OpenSSF Scorecard enrichment failed")

        # Stage 3: recompute risk with the enriched data and re-rank.
        for br in blast_radii:
            br.calculate_risk_score()
            apply_effective_blast_radius_tags(br)

        blast_radii.sort(key=lambda br: br.risk_score, reverse=True)

    return blast_radii

scan_agents_sync

scan_agents_sync(agents: list[Agent], enable_enrichment: bool = False, nvd_api_key: Optional[str] = None, blast_radius_depth: int = 1, compliance_enabled: bool = False, resolve_transitive: bool = False, show_scan_banner: bool = True, offline: bool | None = None, prefer_local_db: bool | None = None, options: ScanOptions | None = None) -> list[BlastRadius]

Synchronous wrapper for scan_agents.

Source code in src/agent_bom/scanners/__init__.py
def scan_agents_sync(
    agents: list[Agent],
    enable_enrichment: bool = False,
    nvd_api_key: Optional[str] = None,
    blast_radius_depth: int = 1,
    compliance_enabled: bool = False,
    resolve_transitive: bool = False,
    show_scan_banner: bool = True,
    offline: bool | None = None,
    prefer_local_db: bool | None = None,
    options: ScanOptions | None = None,
) -> list[BlastRadius]:
    """Synchronous wrapper for scan_agents."""
    # Explicit options win; otherwise fold the legacy keyword arguments into
    # a per-scan ScanOptions so module-level state cannot leak through.
    scan_options = options if options is not None else default_scan_options(
        compliance_enabled=compliance_enabled,
        resolve_transitive=resolve_transitive,
        prefer_local_db=prefer_local_db,
        offline=offline,
    )

    # Build the appropriate coroutine, then drive it with a single
    # asyncio.run call so both paths share one event-loop entry point.
    if enable_enrichment:
        scan_coro = scan_agents_with_enrichment(
            agents,
            nvd_api_key=nvd_api_key,
            enable_enrichment=enable_enrichment,
            compliance_enabled=scan_options.compliance_enabled,
            show_scan_banner=show_scan_banner,
            options=scan_options,
        )
    else:
        scan_coro = scan_agents(
            agents,
            compliance_enabled=scan_options.compliance_enabled,
            resolve_transitive=scan_options.resolve_transitive,
            show_scan_banner=show_scan_banner,
            options=scan_options,
        )
    blast_radii = asyncio.run(scan_coro)

    # Optionally widen blast-radius findings beyond direct (1-hop) impact.
    if blast_radius_depth > 1:
        expand_blast_radius_hops(blast_radii, agents, max_depth=blast_radius_depth)
    return blast_radii