diff --git a/app/api/search.py b/app/api/search.py index 7428083..8b10663 100644 --- a/app/api/search.py +++ b/app/api/search.py @@ -65,6 +65,9 @@ class SearchResult(BaseModel): # rerank 경로를 탄 chunk에만 채워짐. normalize_display_scores는 이 필드를 # 건드리지 않는다. Phase 3 evidence fast-path 판단에 사용. rerank_score: float | None = None + # PR-RAG-Time-1: freshness decay 디버그 메타. apply_freshness_decay 가 채움. + # 비적용 row 도 채워짐(freshness_policy=None). base_score 는 항상 보존. + freshness_debug: dict | None = None # ─── Phase 0.4: 디버그 응답 스키마 ───────────────────────── diff --git a/app/services/search/freshness_decay.py b/app/services/search/freshness_decay.py new file mode 100644 index 0000000..0a834fd --- /dev/null +++ b/app/services/search/freshness_decay.py @@ -0,0 +1,190 @@ +"""Time-aware retrieval freshness decay (PR-RAG-Time-1). + +뉴스(source_channel='news') / 법령 알림(source_channel='law_monitor') 도메인은 +시간이 중요한 문서. 단순 relevance score 만으로는 오래된 문서가 상위에 머물러 +검색 품질이 떨어짐. 본 모듈은 reranker 이후 final score 합성 단계에서 +soft multiplier 로 시간 가중치 적용. 삭제는 없음 — ranking 만 demote. + +설계 원칙: +- reranker = 의미 관련도, freshness decay = 운영 정책. 두 단계 분리 유지. +- floor 0.7 (multiplier 가 0.7 미만으로 안 떨어짐) — 오래되어도 죽지 않음. +- 일반 업로드 / 학습 자료 / KGS Code 원문 / ai_drafted 는 비적용 (no-op). + +published_date 컬럼이 documents 에 없음 → created_at(수집 시점) 을 임시 proxy. +news/law_monitor 워커가 수집 즉시 indexing 하므로 created_at ≈ published_date. +정확도 향상은 후속 PR (worker 가 published_date 메타 채우기) 로 분리. +""" + +from __future__ import annotations + +import math +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Any + +from sqlalchemy import text +from sqlalchemy.ext.asyncio import AsyncSession + +if TYPE_CHECKING: + from api.search import SearchResult + + +# ─── Policy ──────────────────────────────────────────────────────── + +# half-life (일). 90 일: 한 달 ~0.79 / 6개월 ~0.25. +# 365 일: 1년 ~0.5 / 3년 ~0.13. +HALF_LIFE_DAYS: dict[str, int] = { + "news_90d": 90, + "law_365d": 365, +} + +# soft multiplier — final = base * (FLOOR + (1-FLOOR) * decay). +# decay=1 → 1.0배, decay=0 → FLOOR. 완전 demote 금지. +DECAY_FLOOR = 0.7 +DECAY_RANGE = 1.0 - DECAY_FLOOR # 0.3 + +# 임시 date source — published_date 부재로 created_at 사용. 후속 PR 에서 확장. +FRESHNESS_DATE_SOURCE = "created_at" + + +@dataclass(slots=True) +class _DocMeta: + source_channel: str | None + content_origin: str | None + created_at: datetime | None + + +def freshness_policy(meta: _DocMeta | None) -> str | None: + """문서 메타 → freshness 정책 이름 또는 None (no-op). + + 적용: + - source_channel='news' → news_90d + - source_channel='law_monitor' → law_365d + + 비적용 (None 반환): + - meta 자체가 None + - content_origin='ai_drafted' (생성 시점 = 가치 시점, 시간 demote 부적합) + - 그 외 모든 source_channel (manual, drive_sync, inbox_route, memo, + Study/Manual/Reference/Academic/Checklist 류 — 자연 비적용) + """ + if meta is None: + return None + # 가드 2: content_origin='ai_drafted' 비적용 + if meta.content_origin == "ai_drafted": + return None + sc = meta.source_channel + if sc == "news": + return "news_90d" + if sc == "law_monitor": + return "law_365d" + # 가드 6: unknown source_channel → no decay + return None + + +def compute_age_days(created_at: datetime | None, *, now: datetime | None = None) -> float | None: + """created_at → 경과일. 가드 6: missing/future 처리. + + - created_at None → None (no decay) + - future (now < created_at) → 0.0 (clamped, age_days 음수 금지) + """ + if created_at is None: + return None + if now is None: + now = datetime.now(timezone.utc) + # naive datetime 보호 (DB 는 timestamptz 라 정상이지만 mock 테스트 대비) + if created_at.tzinfo is None: + created_at = created_at.replace(tzinfo=timezone.utc) + delta = (now - created_at).total_seconds() + if delta < 0: + return 0.0 # future date → age 0 + return delta / 86400.0 + + +def compute_decay(age_days: float | None, policy: str | None) -> float: + """age_days + policy → decay factor (0~1). policy None / age None → 1.0 (no decay).""" + if policy is None or age_days is None: + return 1.0 + half_life = HALF_LIFE_DAYS.get(policy) + if not half_life: + return 1.0 + # decay = exp(-ln(2) * age / HL) = 0.5 ^ (age / HL) + return math.exp(-math.log(2) * age_days / half_life) + + +def adjusted_score(base_score: float, decay_factor: float) -> float: + """final = base * (FLOOR + RANGE * decay). decay=1 → base, decay=0 → base*FLOOR.""" + multiplier = DECAY_FLOOR + DECAY_RANGE * decay_factor + return base_score * multiplier + + +# ─── Application (called from search_pipeline) ───────────────────── + + +async def _fetch_meta( + session: AsyncSession, doc_ids: list[int] +) -> dict[int, _DocMeta]: + if not doc_ids: + return {} + rows = await session.execute( + text( + """ + SELECT id, source_channel::text AS source_channel, + content_origin, created_at + FROM documents + WHERE id = ANY(:ids) + """ + ), + {"ids": doc_ids}, + ) + return { + row.id: _DocMeta( + source_channel=row.source_channel, + content_origin=row.content_origin, + created_at=row.created_at, + ) + for row in rows + } + + +async def apply_freshness_decay( + results: list["SearchResult"], + session: AsyncSession, + *, + now: datetime | None = None, +) -> list["SearchResult"]: + """results in-place 수정 + 재정렬 + freshness_debug 채움. + + - 가드 8: adjusted score 가 r.score (final sort 에 실제 사용) 에 반영됨. + - 가드 9: 모든 적용 row 에 base_score / age_days / decay_factor / + freshness_adjusted_score / freshness_policy / freshness_date_source 기록. + - 비적용 row 도 freshness_debug={..., 'freshness_policy': None} 로 명시. + """ + if not results: + return results + + doc_ids = list({r.id for r in results if r.id is not None}) + meta_by_id = await _fetch_meta(session, doc_ids) + + for r in results: + meta = meta_by_id.get(r.id) + policy = freshness_policy(meta) + age = compute_age_days(meta.created_at if meta else None, now=now) + decay = compute_decay(age, policy) + base = r.score + new_score = adjusted_score(base, decay) if policy else base + + # 가드 9: debug metadata. base_score 는 항상 보존. + r.freshness_debug = { + "base_score": base, + "age_days": int(age) if age is not None else None, + "decay_factor": decay if policy else None, + "freshness_adjusted_score": new_score, + "freshness_policy": policy, + "freshness_date_source": FRESHNESS_DATE_SOURCE, + } + if policy: + r.score = new_score + + # 가드 8: adjusted score 로 재정렬. + results.sort(key=lambda x: x.score, reverse=True) + return results diff --git a/app/services/search/search_pipeline.py b/app/services/search/search_pipeline.py index 2245422..29b7980 100644 --- a/app/services/search/search_pipeline.py +++ b/app/services/search/search_pipeline.py @@ -38,6 +38,7 @@ from .fusion_service import ( get_strategy, normalize_display_scores, ) +from .freshness_decay import apply_freshness_decay from .rerank_service import ( MAX_CHUNKS_PER_DOC, MAX_RERANK_INPUT, @@ -299,6 +300,12 @@ async def run_search( else: results = text_results + # PR-RAG-Time-1: freshness decay (reranker 이후, display 정규화 직전). + # news/law_monitor 만 적용. floor 0.7. 가드는 freshness_decay.py 참조. + t_fr = time.perf_counter() + results = await apply_freshness_decay(results, session) + timing["freshness_ms"] = (time.perf_counter() - t_fr) * 1000 + # display score 정규화 — 프론트엔드는 score*100을 % 표시. # fusion 내부 score(RRF는 0.01~0.05 범위)를 그대로 노출하면 표시가 깨짐. # Phase 3.1: rerank_score 필드는 여기서 건드리지 않음 (raw 보존). diff --git a/tests/test_freshness_decay.py b/tests/test_freshness_decay.py new file mode 100644 index 0000000..29c43a3 --- /dev/null +++ b/tests/test_freshness_decay.py @@ -0,0 +1,341 @@ +"""PR-RAG-Time-1: freshness decay 단위 테스트. + +가드레일 (plan §10): + 1. news recent vs old — recent 가 final score 높음 + 2. law_monitor recent vs old + 3. non-target source unaffected — manual 등은 score 변화 0 + 4. missing created_at unaffected — decay None, score 변화 0 + 5. future created_at clamped — age_days = 0 + 6. floor behavior — 매우 오래되어도 multiplier 0.7 미만 안 떨어짐 +""" + +from __future__ import annotations + +import math +import os +import sys +from datetime import datetime, timedelta, timezone +from types import SimpleNamespace + +# tests/ → 프로젝트 루트 → app/ +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app")) + +import pytest + +from services.search.freshness_decay import ( + DECAY_FLOOR, + HALF_LIFE_DAYS, + _DocMeta, + adjusted_score, + apply_freshness_decay, + compute_age_days, + compute_decay, + freshness_policy, +) + + +NOW = datetime(2026, 5, 3, 12, 0, 0, tzinfo=timezone.utc) + + +def _meta(channel: str | None, *, days_ago: float | None = 30.0, origin: str | None = None) -> _DocMeta: + if days_ago is None: + created = None + elif days_ago < 0: + # future: now + |days_ago| + created = NOW + timedelta(days=-days_ago) + else: + created = NOW - timedelta(days=days_ago) + return _DocMeta(source_channel=channel, content_origin=origin, created_at=created) + + +# ─── policy dispatcher ──────────────────────────────────────────── + + +def test_policy_news(): + assert freshness_policy(_meta("news")) == "news_90d" + + +def test_policy_law_monitor(): + assert freshness_policy(_meta("law_monitor")) == "law_365d" + + +def test_policy_manual_unaffected(): + assert freshness_policy(_meta("manual")) is None + + +def test_policy_drive_sync_unaffected(): + assert freshness_policy(_meta("drive_sync")) is None + + +def test_policy_inbox_route_unaffected(): + assert freshness_policy(_meta("inbox_route")) is None + + +def test_policy_memo_unaffected(): + assert freshness_policy(_meta("memo")) is None + + +def test_policy_ai_drafted_skipped_even_for_news(): + # 가드 2: content_origin='ai_drafted' 면 source_channel 무관 비적용 + assert freshness_policy(_meta("news", origin="ai_drafted")) is None + + +def test_policy_meta_none(): + assert freshness_policy(None) is None + + +def test_policy_unknown_source(): + # 가드 6: unknown source_channel → no decay + assert freshness_policy(_meta("future_channel")) is None + + +# ─── compute_age_days ───────────────────────────────────────────── + + +def test_age_normal(): + age = compute_age_days(NOW - timedelta(days=10), now=NOW) + assert age == pytest.approx(10.0) + + +def test_age_missing_date(): + # 가드 6: missing date → None + assert compute_age_days(None, now=NOW) is None + + +def test_age_future_clamped_to_zero(): + # 가드 6: future date → age_days = 0 (음수 금지) + future = NOW + timedelta(days=5) + assert compute_age_days(future, now=NOW) == 0.0 + + +def test_age_naive_datetime_treated_as_utc(): + # DB는 timestamptz 라 naive 안 오지만 mock 안전망 + naive = datetime(2026, 4, 23, 12, 0, 0) # 10일 전 + age = compute_age_days(naive, now=NOW) + assert age == pytest.approx(10.0) + + +# ─── compute_decay ──────────────────────────────────────────────── + + +def test_decay_at_half_life_news(): + # age=90 → decay = 0.5 + assert compute_decay(90.0, "news_90d") == pytest.approx(0.5, rel=1e-6) + + +def test_decay_at_half_life_law(): + assert compute_decay(365.0, "law_365d") == pytest.approx(0.5, rel=1e-6) + + +def test_decay_age_zero_full(): + assert compute_decay(0.0, "news_90d") == pytest.approx(1.0) + + +def test_decay_news_30_days(): + expected = math.exp(-math.log(2) * 30 / 90) # ~0.794 + assert compute_decay(30.0, "news_90d") == pytest.approx(expected, rel=1e-6) + + +def test_decay_policy_none_returns_one(): + assert compute_decay(100.0, None) == 1.0 + + +def test_decay_age_none_returns_one(): + assert compute_decay(None, "news_90d") == 1.0 + + +# ─── adjusted_score ─────────────────────────────────────────────── + + +def test_adjusted_score_decay_one_no_change(): + # decay=1 → final = base * 1.0 + assert adjusted_score(0.85, 1.0) == pytest.approx(0.85) + + +def test_adjusted_score_decay_zero_floor(): + # 가드 11: decay=0 → final = base * FLOOR (0.7), 그 아래로 안 내려감 + assert adjusted_score(0.85, 0.0) == pytest.approx(0.85 * DECAY_FLOOR) + + +def test_adjusted_score_old_news_floor_bound(): + # 매우 오래된 뉴스 (age >> half_life) — multiplier 가 0.7 미만 절대 아님 + decay = compute_decay(10000.0, "news_90d") # ~0 + score = adjusted_score(1.0, decay) + assert score >= DECAY_FLOOR + assert score == pytest.approx(DECAY_FLOOR, abs=0.01) + + +# ─── apply_freshness_decay (integration with mock session) ──────── + + +class _MockSession: + """SQLAlchemy AsyncSession mock — execute(text, params) 호출만 받음.""" + + def __init__(self, meta_rows: list[dict]): + self._rows = meta_rows + self.last_ids: list[int] | None = None + + async def execute(self, _stmt, params): + self.last_ids = list(params["ids"]) + # SearchPipeline 의 row._mapping 패턴이 아니라 row.field 패턴이라 + # SimpleNamespace 로 충분. + return [SimpleNamespace(**row) for row in self._rows if row["id"] in self.last_ids] + + +def _result(doc_id: int, score: float): + """SearchResult 흉내. freshness_decay 는 .id / .score / .freshness_debug 만 만짐.""" + return SimpleNamespace(id=doc_id, score=score, freshness_debug=None) + + +@pytest.mark.asyncio +async def test_apply_news_recent_vs_old_recent_higher(): + # 가드 1: news recent vs old → recent 가 final score 높음 + base = 0.50 # 동일 base + rows = [ + {"id": 1, "source_channel": "news", "content_origin": "extracted", + "created_at": NOW - timedelta(days=3)}, # 매우 최근 + {"id": 2, "source_channel": "news", "content_origin": "extracted", + "created_at": NOW - timedelta(days=365)}, # 1년 전 + ] + session = _MockSession(rows) + results = [_result(1, base), _result(2, base)] + out = await apply_freshness_decay(results, session, now=NOW) + # 재정렬: 최근(id=1) 이 위 + assert out[0].id == 1 + assert out[0].score > out[1].score + # debug + assert out[0].freshness_debug["freshness_policy"] == "news_90d" + assert out[0].freshness_debug["base_score"] == base + assert out[0].freshness_debug["age_days"] == 3 + assert out[0].freshness_debug["freshness_date_source"] == "created_at" + assert out[1].freshness_debug["age_days"] == 365 + + +@pytest.mark.asyncio +async def test_apply_law_monitor_recent_vs_old_recent_higher(): + # 가드 2: law_monitor recent 가 위 + base = 0.50 + rows = [ + {"id": 1, "source_channel": "law_monitor", "content_origin": "extracted", + "created_at": NOW - timedelta(days=10)}, + {"id": 2, "source_channel": "law_monitor", "content_origin": "extracted", + "created_at": NOW - timedelta(days=730)}, # 2년 + ] + session = _MockSession(rows) + results = [_result(1, base), _result(2, base)] + out = await apply_freshness_decay(results, session, now=NOW) + assert out[0].id == 1 + assert out[0].freshness_debug["freshness_policy"] == "law_365d" + # 2년 → law_365d 반감기 1년 → decay ~0.25 → multiplier ~ 0.775 + assert out[1].score < out[0].score + + +@pytest.mark.asyncio +async def test_apply_non_target_unaffected(): + # 가드 3: manual 은 score 변화 0 + base = 0.42 + rows = [ + {"id": 1, "source_channel": "manual", "content_origin": "extracted", + "created_at": NOW - timedelta(days=1000)}, + ] + session = _MockSession(rows) + results = [_result(1, base)] + out = await apply_freshness_decay(results, session, now=NOW) + assert out[0].score == base + assert out[0].freshness_debug["freshness_policy"] is None + assert out[0].freshness_debug["base_score"] == base + assert out[0].freshness_debug["decay_factor"] is None + + +@pytest.mark.asyncio +async def test_apply_missing_created_at_unaffected(): + # 가드 4: missing date → score 변화 0 + base = 0.50 + rows = [ + {"id": 1, "source_channel": "news", "content_origin": "extracted", + "created_at": None}, + ] + session = _MockSession(rows) + results = [_result(1, base)] + out = await apply_freshness_decay(results, session, now=NOW) + assert out[0].score == base + # policy 는 'news_90d' 인데 age None 이라 decay 1.0 → adjusted_score 호출 자체가 안 됨 (policy 없는 길 아님) + # freshness_decay 코드: policy True + age None → decay=1.0 → adjusted_score(base, 1.0) = base * 1.0 + # 결과적으로 score 변화 0 — 가드 4 통과 + assert out[0].freshness_debug["freshness_policy"] == "news_90d" + assert out[0].freshness_debug["age_days"] is None + + +@pytest.mark.asyncio +async def test_apply_future_created_at_clamped(): + # 가드 5: future created_at → age_days = 0 → decay = 1.0 → score 그대로 + base = 0.50 + rows = [ + {"id": 1, "source_channel": "news", "content_origin": "extracted", + "created_at": NOW + timedelta(days=10)}, + ] + session = _MockSession(rows) + results = [_result(1, base)] + out = await apply_freshness_decay(results, session, now=NOW) + assert out[0].freshness_debug["age_days"] == 0 + assert out[0].score == pytest.approx(base * 1.0) + + +@pytest.mark.asyncio +async def test_apply_floor_never_below_0_7(): + # 가드 6 (floor): 매우 오래된 news → multiplier 0.7 미만 절대 아님 + base = 1.0 + rows = [ + {"id": 1, "source_channel": "news", "content_origin": "extracted", + "created_at": NOW - timedelta(days=10000)}, # 27년 ㅋ + ] + session = _MockSession(rows) + results = [_result(1, base)] + out = await apply_freshness_decay(results, session, now=NOW) + assert out[0].score >= base * DECAY_FLOOR + assert out[0].score == pytest.approx(base * DECAY_FLOOR, abs=0.001) + + +@pytest.mark.asyncio +async def test_apply_ai_drafted_news_skipped(): + # 가드 2 통합: source=news 라도 content_origin='ai_drafted' 면 비적용 + base = 0.50 + rows = [ + {"id": 1, "source_channel": "news", "content_origin": "ai_drafted", + "created_at": NOW - timedelta(days=300)}, + ] + session = _MockSession(rows) + results = [_result(1, base)] + out = await apply_freshness_decay(results, session, now=NOW) + assert out[0].score == base + assert out[0].freshness_debug["freshness_policy"] is None + + +@pytest.mark.asyncio +async def test_apply_empty_results_noop(): + session = _MockSession([]) + out = await apply_freshness_decay([], session, now=NOW) + assert out == [] + + +@pytest.mark.asyncio +async def test_apply_resort_by_adjusted_score(): + # base score 가 같아도 freshness 가 정렬을 바꾼다. + rows = [ + {"id": 1, "source_channel": "news", "content_origin": "extracted", + "created_at": NOW - timedelta(days=200)}, # decay ~0.21 + {"id": 2, "source_channel": "news", "content_origin": "extracted", + "created_at": NOW - timedelta(days=10)}, # decay ~0.926 + {"id": 3, "source_channel": "manual", "content_origin": "extracted", + "created_at": NOW - timedelta(days=10000)}, # 비적용 → base 그대로 + ] + session = _MockSession(rows) + # 일부러 base 동일 0.5 → 정렬은 freshness 만으로 결정 + results = [_result(1, 0.5), _result(2, 0.5), _result(3, 0.5)] + out = await apply_freshness_decay(results, session, now=NOW) + # manual 은 base 0.5 그대로, news 10일 전 은 0.5 * (0.7 + 0.3*0.926) ≈ 0.489 + # news 200일 전 은 0.5 * (0.7 + 0.3*0.21) ≈ 0.382 + # 정렬: manual(0.5) > news_recent(0.489) > news_old(0.382) + assert out[0].id == 3 # manual + assert out[1].id == 2 # news 10일 + assert out[2].id == 1 # news 200일