@dataclass
class Finding:
    """Single finding record shared by every issue type and every source.

    Phase 1 carries CVE findings (migrated from BlastRadius).
    Phase 2 is planned to add cloud CIS, proxy, SAST and skill findings.

    On construction, ``__post_init__`` folds the legacy per-framework tag
    lists into ``controls`` and, when no ``id`` was supplied, fills ``id``
    with the value returned by ``canonical_finding_id``.
    """

    # --- Core identity -----------------------------------------------------
    finding_type: FindingType
    source: FindingSource
    asset: Asset
    # Mirrors a Severity enum value; kept as a plain str for forward-compat.
    severity: str

    # --- Severity variants -------------------------------------------------
    # Severity exactly as reported by the vendor / source scanner.
    vendor_severity: Optional[str] = None
    # Severity normalised from the CVSS base score.
    cvss_severity: Optional[str] = None

    # --- Finding content ---------------------------------------------------
    title: str = ""
    description: str = ""
    cve_id: Optional[str] = None  # e.g. "CVE-2024-1234"
    cwe_ids: list[str] = field(default_factory=list)  # e.g. ["CWE-79"]
    cvss_score: Optional[float] = None
    epss_score: Optional[float] = None
    is_kev: bool = False  # CISA Known Exploited Vulnerability

    # --- Remediation -------------------------------------------------------
    fixed_version: Optional[str] = None
    remediation_guidance: Optional[str] = None

    # --- Compliance mappings (same tags as BlastRadius for parity) ---------
    # Tags from all frameworks combined.
    compliance_tags: list[str] = field(default_factory=list)
    # Slugs of the frameworks that govern this finding; set by
    # compliance_hub.apply_hub_classification. Not to be confused with the
    # per-framework `*_tags` fields further down, which hold control codes.
    applicable_frameworks: list[str] = field(default_factory=list)
    controls: list[ControlTag] = field(default_factory=list)
    owasp_tags: list[str] = field(default_factory=list)
    atlas_tags: list[str] = field(default_factory=list)
    attack_tags: list[str] = field(default_factory=list)
    nist_ai_rmf_tags: list[str] = field(default_factory=list)
    owasp_mcp_tags: list[str] = field(default_factory=list)
    owasp_agentic_tags: list[str] = field(default_factory=list)
    eu_ai_act_tags: list[str] = field(default_factory=list)
    nist_csf_tags: list[str] = field(default_factory=list)
    iso_27001_tags: list[str] = field(default_factory=list)
    soc2_tags: list[str] = field(default_factory=list)
    cis_tags: list[str] = field(default_factory=list)
    cmmc_tags: list[str] = field(default_factory=list)
    nist_800_53_tags: list[str] = field(default_factory=list)
    fedramp_tags: list[str] = field(default_factory=list)
    pci_dss_tags: list[str] = field(default_factory=list)

    # --- Graph / correlation -----------------------------------------------
    related_findings: list[str] = field(default_factory=list)  # IDs of related findings
    evidence: dict = field(default_factory=dict)  # raw evidence payload

    # --- Risk --------------------------------------------------------------
    risk_score: float = 0.0  # 0-10 unified risk score

    # --- Unique ID ---------------------------------------------------------
    # Left empty, it is computed deterministically in __post_init__.
    # Pass an explicit id= to override (e.g. when ingesting from an
    # external scanner).
    id: str = field(default="")

    def __post_init__(self) -> None:
        """Normalise ``controls`` and derive ``id`` when none was given."""
        # Accept both ControlTag instances and their dict form, then append
        # the controls implied by the legacy per-framework tag lists.
        merged = []
        for entry in self.controls:
            if not isinstance(entry, ControlTag):
                entry = ControlTag.from_dict(entry)
            merged.append(entry)
        merged.extend(self._legacy_control_tags())
        self.controls = _dedupe_control_tags(merged)

        # An explicitly supplied id always wins.
        if self.id:
            return

        # Deterministic ID: the same CVE (or title, when there is no CVE)
        # on the same asset always yields the same ID.
        subject = self.cve_id or self.title
        name, version = "", ""
        if self.asset.asset_type == "package" and self.asset.identifier:
            # purl such as "pkg:pypi/torch@2.3.0": keep the text after the
            # last "/" and split name from version on the last "@".
            tail = self.asset.identifier.rsplit("/", 1)[-1]
            if "@" in tail:
                name, _, version = tail.rpartition("@")
        self.id = canonical_finding_id(self.asset.stable_id, subject, name, version)

    @property
    def canonical_id(self) -> str:
        """Alias of ``id`` for report and graph consumers."""
        return self.id

    def _legacy_control_tags(self) -> list[ControlTag]:
        """Build ControlTag objects from the legacy per-framework tag lists.

        Walks ``LEGACY_CONTROL_FIELDS`` in order and emits one tag for every
        truthy value found on the corresponding attribute.
        """
        derived: list[ControlTag] = []
        for attr, framework in LEGACY_CONTROL_FIELDS:
            derived.extend(
                ControlTag(
                    framework=framework,
                    control=str(raw),
                    version=_LEGACY_CONTROL_VERSION_BY_FRAMEWORK.get(framework, "legacy"),
                    confidence=0.75,
                    source=f"legacy:{attr}",
                    via=attr,
                )
                for raw in getattr(self, attr)
                if raw  # skip empty / falsy entries
            )
        return derived

    def normalized_controls(self) -> list[ControlTag]:
        """Return the structured controls of this finding, deduplicated.

        The legacy tag lists are re-read on every call, so values added to
        them after construction are included as well.
        """
        combined = list(self.controls)
        combined.extend(self._legacy_control_tags())
        return _dedupe_control_tags(combined)

    def all_compliance_tags(self) -> list[str]:
        """Return every compliance tag once, in first-seen order.

        Sources, in order: ``compliance_tags``, each per-framework tag list,
        then the control codes of ``normalized_controls()``.
        """
        groups = (
            self.compliance_tags,
            self.owasp_tags,
            self.atlas_tags,
            self.attack_tags,
            self.nist_ai_rmf_tags,
            self.owasp_mcp_tags,
            self.owasp_agentic_tags,
            self.eu_ai_act_tags,
            self.nist_csf_tags,
            self.iso_27001_tags,
            self.soc2_tags,
            self.cis_tags,
            self.cmmc_tags,
            self.nist_800_53_tags,
            self.fedramp_tags,
            self.pci_dss_tags,
            [control.control for control in self.normalized_controls()],
        )
        # A dict keeps insertion order and ignores repeated keys, which
        # gives "unique, first occurrence wins" without a separate set.
        unique: dict[str, None] = {}
        for group in groups:
            unique.update(dict.fromkeys(group))
        return list(unique)

    def effective_severity(self) -> str:
        """Pick the severity to report: vendor first, then CVSS, then base."""
        if self.vendor_severity:
            return self.vendor_severity
        if self.cvss_severity:
            return self.cvss_severity
        return self.severity

    def to_dict(self) -> dict:
        """Return a JSON-serializable finding payload.

        ``applicable_frameworks`` is copied; the other list/dict attributes
        (per-framework tags, ``cwe_ids``, ``related_findings``, ``evidence``,
        the asset's ``source_ids``) are placed in the payload as-is.
        """
        asset = self.asset
        asset_payload = {
            "name": asset.name,
            "asset_type": asset.asset_type,
            "identifier": asset.identifier,
            "location": asset.location,
            "stable_id": asset.stable_id,
            "canonical_id": asset.canonical_id,
            "source_ids": asset.source_ids,
        }

        payload = {
            "schema_version": FINDING_SCHEMA_VERSION,
            "id": self.id,
            "canonical_id": self.canonical_id,
            "finding_type": self.finding_type.value,
            "source": self.source.value,
            "asset": asset_payload,
            "severity": self.severity,
            "effective_severity": self.effective_severity(),
            "vendor_severity": self.vendor_severity,
            "cvss_severity": self.cvss_severity,
            "title": self.title,
            "description": self.description,
            "cve_id": self.cve_id,
            "cwe_ids": self.cwe_ids,
            "cvss_score": self.cvss_score,
            "epss_score": self.epss_score,
            "is_kev": self.is_kev,
            "fixed_version": self.fixed_version,
            "remediation_guidance": self.remediation_guidance,
            "compliance_tags": self.all_compliance_tags(),
            "applicable_frameworks": list(self.applicable_frameworks),
            "controls": [control.to_dict() for control in self.normalized_controls()],
        }

        # Per-framework tag lists are emitted under their attribute names,
        # in declaration order.
        for attr in (
            "owasp_tags",
            "atlas_tags",
            "attack_tags",
            "nist_ai_rmf_tags",
            "owasp_mcp_tags",
            "owasp_agentic_tags",
            "eu_ai_act_tags",
            "nist_csf_tags",
            "iso_27001_tags",
            "soc2_tags",
            "cis_tags",
            "cmmc_tags",
            "nist_800_53_tags",
            "fedramp_tags",
            "pci_dss_tags",
        ):
            payload[attr] = getattr(self, attr)

        payload["related_findings"] = self.related_findings
        payload["evidence"] = self.evidence
        payload["risk_score"] = self.risk_score
        return payload