f66b6e2f17
★ranking 변경(의도 기록): freshness soft multiplier(floor 0.7) 정책 갱신. - law_365d 폐기: 법령 현행성은 version_status(B-1 버전체인 current/superseded)가 처리. age-decay 는 current 법령을 부당 강등 → law_monitor/law 비적용으로 전환. - incident 흡수(1행): material_type='incident'(KOSHA 재해사례/사망사고) → news_90d. 시간 민감(최근 재해 가중), source_channel 무관(업로드 incident 포함). - _DocMeta/_fetch_meta 에 material_type 추가(getattr 로 mock-safe). 테스트: law 3건(policy/decay/apply) 비적용 전환 + incident 2건 신규. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
196 lines
7.5 KiB
Python
196 lines
7.5 KiB
Python
"""Time-aware retrieval freshness decay (PR-RAG-Time-1).
|
|
|
|
뉴스(source_channel='news') / 재해사례(material_type='incident', KOSHA) 도메인은
|
|
시간이 중요한 문서. 단순 relevance score 만으로는 오래된 문서가 상위에 머물러
|
|
검색 품질이 떨어짐. 본 모듈은 reranker 이후 final score 합성 단계에서
|
|
soft multiplier 로 시간 가중치 적용. 삭제는 없음 — ranking 만 demote.
|
|
|
|
설계 원칙:
|
|
- reranker = 의미 관련도, freshness decay = 운영 정책. 두 단계 분리 유지.
|
|
- floor 0.7 (multiplier 가 0.7 미만으로 안 떨어짐) — 오래되어도 죽지 않음.
|
|
- 일반 업로드 / 학습 자료 / KGS Code 원문 / ai_drafted 는 비적용 (no-op).
|
|
- ★법령(law)은 C-1 후속에서 freshness 제외 — 현행성은 version_status(B-1 버전체인)가 처리.
|
|
|
|
published_date 컬럼이 documents 에 없음 → created_at(수집 시점) 을 임시 proxy.
|
|
news/KOSHA 워커가 수집 즉시 indexing 하므로 created_at ≈ published_date.
|
|
정확도 향상은 후속 PR (worker 가 published_date 메타 채우기) 로 분리.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import math
|
|
from dataclasses import dataclass
|
|
from datetime import datetime, timezone
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
from sqlalchemy import text
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
if TYPE_CHECKING:
|
|
from api.search import SearchResult
|
|
|
|
|
|
# ─── Policy ────────────────────────────────────────────────────────
|
|
|
|
# half-life (일). 90 일: 한 달 ~0.79 / 6개월 ~0.25.
|
|
# C-1 후속(2026-06-13): law_365d 폐기 — 법령 현행성은 version_status(B-1 버전체인)가 처리,
|
|
# age-decay 는 current 법령을 부당 강등(의도 변경 기록). 재해사례(incident)는 news_90d 흡수.
|
|
HALF_LIFE_DAYS: dict[str, int] = {
|
|
"news_90d": 90,
|
|
}
|
|
|
|
# soft multiplier — final = base * (FLOOR + (1-FLOOR) * decay).
|
|
# decay=1 → 1.0배, decay=0 → FLOOR. 완전 demote 금지.
|
|
DECAY_FLOOR = 0.7
|
|
DECAY_RANGE = 1.0 - DECAY_FLOOR # 0.3
|
|
|
|
# 임시 date source — published_date 부재로 created_at 사용. 후속 PR 에서 확장.
|
|
FRESHNESS_DATE_SOURCE = "created_at"
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class _DocMeta:
|
|
source_channel: str | None
|
|
content_origin: str | None
|
|
created_at: datetime | None
|
|
material_type: str | None = None
|
|
|
|
|
|
def freshness_policy(meta: _DocMeta | None) -> str | None:
|
|
"""문서 메타 → freshness 정책 이름 또는 None (no-op).
|
|
|
|
적용:
|
|
- material_type='incident' (KOSHA 재해사례/사망사고) → news_90d (C-1 후속 흡수, 시간 민감)
|
|
- source_channel='news' → news_90d
|
|
|
|
비적용 (None 반환):
|
|
- meta 자체가 None
|
|
- content_origin='ai_drafted' (생성 시점 = 가치 시점, 시간 demote 부적합)
|
|
- ★법령(source_channel='law_monitor'/material_type='law'): C-1 후속에서 law_365d 폐기.
|
|
법령 현행성은 version_status(B-1 버전체인 current/superseded)가 처리 — age-decay 는
|
|
current 법령을 부당 강등(의도 변경 기록). law 검색 ranking = version_status decorate.
|
|
- 그 외 모든 source_channel (manual, drive_sync, inbox_route, memo 등 — 자연 비적용)
|
|
"""
|
|
if meta is None:
|
|
return None
|
|
# 가드 2: content_origin='ai_drafted' 비적용
|
|
if meta.content_origin == "ai_drafted":
|
|
return None
|
|
# 재해사례/사망사고 = 시간 민감 → news 와 동일 90d (source 무관, 업로드 incident 도 포함)
|
|
if meta.material_type == "incident":
|
|
return "news_90d"
|
|
if meta.source_channel == "news":
|
|
return "news_90d"
|
|
# 법령 law_365d 폐기 + unknown source_channel → no decay
|
|
return None
|
|
|
|
|
|
def compute_age_days(created_at: datetime | None, *, now: datetime | None = None) -> float | None:
|
|
"""created_at → 경과일. 가드 6: missing/future 처리.
|
|
|
|
- created_at None → None (no decay)
|
|
- future (now < created_at) → 0.0 (clamped, age_days 음수 금지)
|
|
"""
|
|
if created_at is None:
|
|
return None
|
|
if now is None:
|
|
now = datetime.now(timezone.utc)
|
|
# naive datetime 보호 (DB 는 timestamptz 라 정상이지만 mock 테스트 대비)
|
|
if created_at.tzinfo is None:
|
|
created_at = created_at.replace(tzinfo=timezone.utc)
|
|
delta = (now - created_at).total_seconds()
|
|
if delta < 0:
|
|
return 0.0 # future date → age 0
|
|
return delta / 86400.0
|
|
|
|
|
|
def compute_decay(age_days: float | None, policy: str | None) -> float:
|
|
"""age_days + policy → decay factor (0~1). policy None / age None → 1.0 (no decay)."""
|
|
if policy is None or age_days is None:
|
|
return 1.0
|
|
half_life = HALF_LIFE_DAYS.get(policy)
|
|
if not half_life:
|
|
return 1.0
|
|
# decay = exp(-ln(2) * age / HL) = 0.5 ^ (age / HL)
|
|
return math.exp(-math.log(2) * age_days / half_life)
|
|
|
|
|
|
def adjusted_score(base_score: float, decay_factor: float) -> float:
|
|
"""final = base * (FLOOR + RANGE * decay). decay=1 → base, decay=0 → base*FLOOR."""
|
|
multiplier = DECAY_FLOOR + DECAY_RANGE * decay_factor
|
|
return base_score * multiplier
|
|
|
|
|
|
# ─── Application (called from search_pipeline) ─────────────────────
|
|
|
|
|
|
async def _fetch_meta(
|
|
session: AsyncSession, doc_ids: list[int]
|
|
) -> dict[int, _DocMeta]:
|
|
if not doc_ids:
|
|
return {}
|
|
rows = await session.execute(
|
|
text(
|
|
"""
|
|
SELECT id, source_channel::text AS source_channel,
|
|
content_origin, material_type, created_at
|
|
FROM documents
|
|
WHERE id = ANY(:ids)
|
|
"""
|
|
),
|
|
{"ids": doc_ids},
|
|
)
|
|
return {
|
|
row.id: _DocMeta(
|
|
source_channel=row.source_channel,
|
|
content_origin=row.content_origin,
|
|
created_at=row.created_at,
|
|
material_type=getattr(row, "material_type", None),
|
|
)
|
|
for row in rows
|
|
}
|
|
|
|
|
|
async def apply_freshness_decay(
|
|
results: list["SearchResult"],
|
|
session: AsyncSession,
|
|
*,
|
|
now: datetime | None = None,
|
|
) -> list["SearchResult"]:
|
|
"""results in-place 수정 + 재정렬 + freshness_debug 채움.
|
|
|
|
- 가드 8: adjusted score 가 r.score (final sort 에 실제 사용) 에 반영됨.
|
|
- 가드 9: 모든 적용 row 에 base_score / age_days / decay_factor /
|
|
freshness_adjusted_score / freshness_policy / freshness_date_source 기록.
|
|
- 비적용 row 도 freshness_debug={..., 'freshness_policy': None} 로 명시.
|
|
"""
|
|
if not results:
|
|
return results
|
|
|
|
doc_ids = list({r.id for r in results if r.id is not None})
|
|
meta_by_id = await _fetch_meta(session, doc_ids)
|
|
|
|
for r in results:
|
|
meta = meta_by_id.get(r.id)
|
|
policy = freshness_policy(meta)
|
|
age = compute_age_days(meta.created_at if meta else None, now=now)
|
|
decay = compute_decay(age, policy)
|
|
base = r.score
|
|
new_score = adjusted_score(base, decay) if policy else base
|
|
|
|
# 가드 9: debug metadata. base_score 는 항상 보존.
|
|
r.freshness_debug = {
|
|
"base_score": base,
|
|
"age_days": int(age) if age is not None else None,
|
|
"decay_factor": decay if policy else None,
|
|
"freshness_adjusted_score": new_score,
|
|
"freshness_policy": policy,
|
|
"freshness_date_source": FRESHNESS_DATE_SOURCE,
|
|
}
|
|
if policy:
|
|
r.score = new_score
|
|
|
|
# 가드 8: adjusted score 로 재정렬.
|
|
results.sort(key=lambda x: x.score, reverse=True)
|
|
return results
|