hyungi_document_server/app/services/clustering_common.py
Hyungi Ahn 1ca6d8b522 refactor(digest): extract clustering helpers to clustering_common
Extract the core clustering algorithms of the Phase 4 Global Digest
(time-decay weight, adaptive threshold, greedy cosine assign + EMA
centroid, importance normalize) into `app/services/clustering_common.py`.
The country axis is the caller's responsibility: Phase 4 cluster_country
keeps calling per country as before, and the new morning briefing module
will call through cluster_global without a country.
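
A minimal sketch of what "the country axis is the caller's responsibility" could look like. `cluster_stub`, `cluster_per_country`, `cluster_all` and the `country` key are hypothetical names used only for illustration; they are not the project's actual callers or the shared helper itself.

```python
from collections import defaultdict


def cluster_stub(docs):
    """Hypothetical stand-in for the shared helper: one cluster per call."""
    return [{"members": list(docs)}]


def cluster_per_country(docs):
    """Digest-style caller: slice by country first, then cluster each slice."""
    by_country = defaultdict(list)
    for d in docs:
        by_country[d["country"]].append(d)  # "country" key is an assumption
    return {country: cluster_stub(group) for country, group in by_country.items()}


def cluster_all(docs):
    """Briefing-style caller: pass the whole list, with no country slicing."""
    return cluster_stub(docs)


docs = [
    {"id": 1, "country": "KR"},
    {"id": 2, "country": "US"},
    {"id": 3, "country": "KR"},
]
per_country = cluster_per_country(docs)
global_clusters = cluster_all(docs)
```

Either way the shared helper only ever sees a plain list of docs, which is why it needs no knowledge of countries.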

The duplicate _normalize in selection.py is also unified into the common util.

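No behavior change:
- LAMBDA / threshold / EMA alpha / MIN_ARTICLES all keep their Phase 4 defaults
- docs.sort (in-place) was changed to sorted (copy), but this has no effect
  because no caller reuses the sorted docs (the weight assigned to each dict
  element is still visible to the caller, since list elements are references)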
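
The second point can be checked with a small sketch. The ids and weights below are made up for illustration; only the `sorted` versus in-place semantics matter.

```python
docs = [{"id": "a"}, {"id": "b"}, {"id": "c"}]
weights = {"a": 0.2, "b": 0.9, "c": 0.5}  # made-up weights for illustration

# Mutating each dict is visible through every list that references it.
for d in docs:
    d["weight"] = weights[d["id"]]

# sorted() builds a new list but reuses the same dict objects.
docs_sorted = sorted(docs, key=lambda d: -d["weight"])

caller_order = [d["id"] for d in docs]         # caller's list order is untouched
sorted_order = [d["id"] for d in docs_sorted]  # highest weight first
shares_objects = docs_sorted[0] is docs[1]     # same dict object, not a copy
```

The caller's list keeps its original order, yet every element carries the new `weight` key.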

The next commit verifies the Phase 4 regression (digest regenerate diff 0).
2026-05-12 12:38:32 +09:00

125 lines
4.1 KiB
Python

"""Common clustering utilities, imported by both digest (country×topic) and briefing (topic×country).

Extraction principles:
- The country axis of digest.clustering.cluster_country / briefing.clustering.cluster_global is the caller's responsibility.
- This module only performs pure greedy assign + normalize on a docs list (an already-classified slice, or the full set).
- LAMBDA / threshold / EMA alpha / MIN_ARTICLES are injected by the caller (e.g. Phase 4 = 3 days, Briefing = 2 hours).
"""
import math
from datetime import datetime, timezone

import numpy as np

# Lower bound applied to normalized importance scores.
SCORE_FLOOR = 0.01


def normalize_vector(v: np.ndarray) -> np.ndarray:
    """Scale v to unit L2 norm; a zero vector is returned unchanged."""
    norm = float(np.linalg.norm(v))
    if norm == 0.0:
        return v
    return v / norm


def time_decay_weight(now: datetime, created_at: datetime, lambda_val: float) -> float:
    """exp(-λ · days_ago). A naive created_at is assumed to be UTC."""
    if created_at.tzinfo is None:
        created_at = created_at.replace(tzinfo=timezone.utc)
    days = (now - created_at).total_seconds() / 86400.0
    if days < 0:
        # Timestamps in the future are clamped to zero age (weight 1.0).
        days = 0.0
    return math.exp(-lambda_val * days)


def adaptive_threshold_by_density(
    n_docs: int,
    *,
    low_n: int = 50,
    high_n: int = 200,
    t_low: float = 0.75,
    t_mid: float = 0.78,
    t_high: float = 0.80,
) -> float:
    """Dynamic threshold based on document density; guards against both fragmentation and blobs."""
    if n_docs > high_n:
        return t_high
    if n_docs < low_n:
        return t_low
    return t_mid


def greedy_assign_cluster(
    docs: list[dict],
    *,
    threshold: float,
    centroid_alpha: float = 0.7,
    min_articles: int = 3,
    max_topics: int = 10,
    now: datetime | None = None,
    lambda_val: float,
) -> tuple[list[dict], int]:
    """Apply time-decay weights, then greedy cosine assign + EMA centroid + MIN drop.

    Args:
        docs: [{embedding: np.ndarray, created_at: datetime, ...}]. The function adds a `weight` key in place.
        threshold: cosine-similarity threshold for merging a doc into a cluster.
        centroid_alpha: EMA coefficient (0.7 = keep 70% of the existing centroid).
        min_articles: minimum number of articles per cluster (smaller clusters are dropped).
        max_topics: number of top clusters to keep.
        now: reference time (default = datetime.now(UTC)).
        lambda_val: time-decay λ (injected by the caller to match the window width).

    Returns:
        (clusters, raw_cluster_count_before_drop)
        clusters = [{centroid, members, weight_sum, raw_weight_sum, importance_score}, ...]
    """
    if not docs:
        return [], 0

    now = now or datetime.now(timezone.utc)
    for d in docs:
        d["weight"] = time_decay_weight(now, d["created_at"], lambda_val)

    # Highest-weight docs seed clusters first; sorted() leaves the caller's list order intact.
    docs_sorted = sorted(docs, key=lambda d: -d["weight"])

    clusters: list[dict] = []
    for d in docs_sorted:
        v = normalize_vector(d["embedding"])
        best_idx, best_sim = -1, 0.0
        for i, c in enumerate(clusters):
            # Centroid and v are L2-normalized, so the dot product is their cosine similarity.
            sim = float(np.dot(c["centroid"], v))
            if sim > best_sim and sim >= threshold:
                best_sim, best_idx = sim, i
        if best_idx >= 0:
            c = clusters[best_idx]
            # EMA update of the centroid, then re-normalize to unit length.
            c["centroid"] = centroid_alpha * c["centroid"] + (1.0 - centroid_alpha) * v
            c["centroid"] = normalize_vector(c["centroid"])
            c["members"].append(d)
            c["weight_sum"] += d["weight"]
        else:
            # No cluster is similar enough: start a new one seeded by this doc.
            clusters.append({
                "centroid": v,
                "members": [d],
                "weight_sum": d["weight"],
            })

    raw_count = len(clusters)
    clusters = [c for c in clusters if len(c["members"]) >= min_articles]
    clusters.sort(key=lambda c: -c["weight_sum"])
    clusters = clusters[:max_topics]
    normalize_importance_scores(clusters)
    return clusters, raw_count


def normalize_importance_scores(clusters: list[dict], *, floor: float = SCORE_FLOOR) -> None:
    """Normalize cluster.weight_sum to 0~1 and apply a floor. In place; the unnormalized value is kept as raw_weight_sum."""
    if not clusters:
        return
    max_w = max(c["weight_sum"] for c in clusters)
    for c in clusters:
        normalized = (c["weight_sum"] / max_w) if max_w > 0 else 0.0
        c["raw_weight_sum"] = c["weight_sum"]
        c["importance_score"] = max(normalized, floor)
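
The three numeric steps in this module (time decay, EMA centroid update, importance normalization) can be traced by hand with a standard-library-only sketch. It re-implements the arithmetic with plain lists instead of importing the module or NumPy. The 3-day half-life used to derive `lambda_val`, and the helper names `decay` and `ema_update`, are assumptions for illustration, not the project's actual configuration.

```python
import math
from datetime import datetime, timedelta, timezone

now = datetime(2026, 5, 12, tzinfo=timezone.utc)
lambda_val = math.log(2) / 3.0  # assumption: 3-day half-life, not the real setting


def decay(created_at):
    """exp(-λ · days_ago), with negative ages clamped to zero."""
    days = max((now - created_at).total_seconds() / 86400.0, 0.0)
    return math.exp(-lambda_val * days)


w_fresh = decay(now)                     # zero age gives weight 1.0
w_old = decay(now - timedelta(days=3))   # one half-life gives about 0.5


def ema_update(centroid, v, alpha=0.7):
    """Blend alpha of the old centroid with (1 - alpha) of v, then re-normalize."""
    mixed = [alpha * c + (1.0 - alpha) * x for c, x in zip(centroid, v)]
    norm = math.sqrt(sum(x * x for x in mixed))
    return [x / norm for x in mixed] if norm else mixed


# An orthogonal newcomer pulls the centroid only part of the way toward itself.
centroid = ema_update([1.0, 0.0], [0.0, 1.0])

# Importance scores: divide by the largest weight_sum, then apply the 0.01 floor.
weight_sums = [4.0, 1.0, 0.02]
scores = [max(w / max(weight_sums), 0.01) for w in weight_sums]
```

With `alpha = 0.7` the centroid stays closer to its previous direction than to the new vector, and the floor keeps the weakest cluster at 0.01 instead of 0.005.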