"""Audit — 4B 가 자체 답변한 경우 금지 패턴 검출. escalate_to_26b=False 인 이벤트에만 호출. 위반 검출 시 policy_violation=true 로 analyze_events 에 기록되고 야간 sweep 에서 under_escalation 후보로 포획된다. detection_patterns 는 Python re.search() 로 평가 (Postgres regex 아님). """ from __future__ import annotations import re from functools import lru_cache from typing import Iterable from policy.loader import load_policy from policy.schema import DomainPolicy, ForbiddenRule @lru_cache(maxsize=256) def _compiled_patterns(pattern_tuple: tuple[str, ...]) -> tuple[re.Pattern[str], ...]: return tuple(re.compile(p) for p in pattern_tuple) def _rules_for_subject( policy: DomainPolicy, subject_domain: str ) -> Iterable[ForbiddenRule]: for rule in policy.forbidden_for_4b: if subject_domain in rule.applies_when_subject_in: yield rule def check_4b_output_violations( output_text: str, subject_domain: str, *, policy: DomainPolicy | None = None, ) -> list[str]: """Return list of violated forbidden-rule IDs (빈 리스트면 위반 없음). Parameters ---------- output_text: 4B 가 생성한 자체 답변 텍스트. subject_domain: routing 에서 결정된 도메인 이름. fallback 도메인은 `generic`. policy: 주입용 (테스트). None 이면 load_policy(). """ if not output_text: return [] if policy is None: policy = load_policy() violations: list[str] = [] for rule in _rules_for_subject(policy, subject_domain): patterns = _compiled_patterns(tuple(rule.detection_patterns)) if any(p.search(output_text) for p in patterns): violations.append(rule.id) return violations