diff --git a/app/policy/audit.py b/app/policy/audit.py new file mode 100644 index 0000000..8b15ca5 --- /dev/null +++ b/app/policy/audit.py @@ -0,0 +1,56 @@ +"""Audit — 4B 가 자체 답변한 경우 금지 패턴 검출. + +escalate_to_26b=False 인 이벤트에만 호출. 위반 검출 시 policy_violation=true 로 +analyze_events 에 기록되고 야간 sweep 에서 under_escalation 후보로 포획된다. + +detection_patterns 는 Python re.search() 로 평가 (Postgres regex 아님). +""" + +from __future__ import annotations + +import re +from functools import lru_cache +from typing import Iterable + +from app.policy.loader import load_policy +from app.policy.schema import DomainPolicy, ForbiddenRule + + +@lru_cache(maxsize=256) +def _compiled_patterns(pattern_tuple: tuple[str, ...]) -> tuple[re.Pattern[str], ...]: + return tuple(re.compile(p) for p in pattern_tuple) + + +def _rules_for_subject( + policy: DomainPolicy, subject_domain: str +) -> Iterable[ForbiddenRule]: + for rule in policy.forbidden_for_4b: + if subject_domain in rule.applies_when_subject_in: + yield rule + + +def check_4b_output_violations( + output_text: str, + subject_domain: str, + *, + policy: DomainPolicy | None = None, +) -> list[str]: + """Return list of violated forbidden-rule IDs (빈 리스트면 위반 없음). + + Parameters + ---------- + output_text: 4B 가 생성한 자체 답변 텍스트. + subject_domain: routing 에서 결정된 도메인 이름. fallback 도메인은 `generic`. + policy: 주입용 (테스트). None 이면 load_policy(). + """ + if not output_text: + return [] + if policy is None: + policy = load_policy() + + violations: list[str] = [] + for rule in _rules_for_subject(policy, subject_domain): + patterns = _compiled_patterns(tuple(rule.detection_patterns)) + if any(p.search(output_text) for p in patterns): + violations.append(rule.id) + return violations