feat(policy): audit — forbidden pattern detection
check_4b_output_violations(text, subject_domain) → list[str]. Python re.search 기반 (Postgres regex 아님). forbidden_for_4b 에서 해당 subject 에 적용되는 rule 만 선택 후 detection_patterns 순회. 컴파일된 패턴 lru_cache 로 반복 호출 비용 감소. escalate_to_26b=False 인 event 에만 호출하여 policy_violation=true 기록 + under_escalation 재처리 후보로 포획. plan: ~/.claude/plans/wise-gliding-hippo.md Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,56 @@
|
||||
"""Audit — 4B 가 자체 답변한 경우 금지 패턴 검출.
|
||||
|
||||
escalate_to_26b=False 인 이벤트에만 호출. 위반 검출 시 policy_violation=true 로
|
||||
analyze_events 에 기록되고 야간 sweep 에서 under_escalation 후보로 포획된다.
|
||||
|
||||
detection_patterns 는 Python re.search() 로 평가 (Postgres regex 아님).
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import re
|
||||
from functools import lru_cache
|
||||
from typing import Iterable
|
||||
|
||||
from app.policy.loader import load_policy
|
||||
from app.policy.schema import DomainPolicy, ForbiddenRule
|
||||
|
||||
|
||||
@lru_cache(maxsize=256)
|
||||
def _compiled_patterns(pattern_tuple: tuple[str, ...]) -> tuple[re.Pattern[str], ...]:
|
||||
return tuple(re.compile(p) for p in pattern_tuple)
|
||||
|
||||
|
||||
def _rules_for_subject(
|
||||
policy: DomainPolicy, subject_domain: str
|
||||
) -> Iterable[ForbiddenRule]:
|
||||
for rule in policy.forbidden_for_4b:
|
||||
if subject_domain in rule.applies_when_subject_in:
|
||||
yield rule
|
||||
|
||||
|
||||
def check_4b_output_violations(
|
||||
output_text: str,
|
||||
subject_domain: str,
|
||||
*,
|
||||
policy: DomainPolicy | None = None,
|
||||
) -> list[str]:
|
||||
"""Return list of violated forbidden-rule IDs (빈 리스트면 위반 없음).
|
||||
|
||||
Parameters
|
||||
----------
|
||||
output_text: 4B 가 생성한 자체 답변 텍스트.
|
||||
subject_domain: routing 에서 결정된 도메인 이름. fallback 도메인은 `generic`.
|
||||
policy: 주입용 (테스트). None 이면 load_policy().
|
||||
"""
|
||||
if not output_text:
|
||||
return []
|
||||
if policy is None:
|
||||
policy = load_policy()
|
||||
|
||||
violations: list[str] = []
|
||||
for rule in _rules_for_subject(policy, subject_domain):
|
||||
patterns = _compiled_patterns(tuple(rule.detection_patterns))
|
||||
if any(p.search(output_text) for p in patterns):
|
||||
violations.append(rule.id)
|
||||
return violations
|
||||
Reference in New Issue
Block a user