1ca6d8b522
Phase 4 Global Digest 의 클러스터링 핵심 알고리즘 (time-decay weight, adaptive threshold, greedy cosine assign + EMA centroid, importance normalize) 을 `app/services/clustering_common.py` 로 추출. country 축은 caller 책임 — Phase 4 cluster_country 는 그대로 country 별 호출, 신규 morning briefing 모듈이 country 없이 cluster_global 로 호출 예정. selection.py 의 중복 _normalize 도 공통 util 로 통일. 동작 변경 0: - LAMBDA / threshold / EMA alpha / MIN_ARTICLES 모두 Phase 4 기본값 유지 - docs.sort (in-place) → sorted (copy) 변경했으나 caller 가 정렬된 docs 를 재사용하지 않으므로 무관 (dict element 의 weight 부여는 reference 라 그대로 반영) 다음 commit 에서 Phase 4 회귀 검증 (digest regenerate diff 0).
125 lines
4.1 KiB
Python
125 lines
4.1 KiB
Python
"""Cluster 알고리즘 공통 util — digest(country×topic) / briefing(topic×country) 양쪽이 import.
|
||
|
||
추출 원칙:
|
||
- digest.clustering.cluster_country / briefing.clustering.cluster_global 의 country 축은 caller 책임.
|
||
- 본 모듈은 docs list (이미 분류된 슬라이스 또는 전체) 에 대한 순수 greedy assign + normalize.
|
||
- LAMBDA / threshold / EMA alpha / MIN_ARTICLES 는 caller 가 주입 (Phase 4 = 3일 / Briefing = 2시간 등).
|
||
"""
|
||
|
||
import math
|
||
from datetime import datetime, timezone
|
||
|
||
import numpy as np
|
||
|
||
|
||
SCORE_FLOOR = 0.01
|
||
|
||
|
||
def normalize_vector(v: np.ndarray) -> np.ndarray:
|
||
norm = float(np.linalg.norm(v))
|
||
if norm == 0.0:
|
||
return v
|
||
return v / norm
|
||
|
||
|
||
def time_decay_weight(now: datetime, created_at: datetime, lambda_val: float) -> float:
|
||
"""exp(-λ · days_ago). created_at naive → UTC 가정."""
|
||
if created_at.tzinfo is None:
|
||
created_at = created_at.replace(tzinfo=timezone.utc)
|
||
days = (now - created_at).total_seconds() / 86400.0
|
||
if days < 0:
|
||
days = 0.0
|
||
return math.exp(-lambda_val * days)
|
||
|
||
|
||
def adaptive_threshold_by_density(
|
||
n_docs: int,
|
||
*,
|
||
low_n: int = 50,
|
||
high_n: int = 200,
|
||
t_low: float = 0.75,
|
||
t_mid: float = 0.78,
|
||
t_high: float = 0.80,
|
||
) -> float:
|
||
"""문서 밀도 기반 동적 threshold — fragmentation / blob 동시 방어."""
|
||
if n_docs > high_n:
|
||
return t_high
|
||
if n_docs < low_n:
|
||
return t_low
|
||
return t_mid
|
||
|
||
|
||
def greedy_assign_cluster(
|
||
docs: list[dict],
|
||
*,
|
||
threshold: float,
|
||
centroid_alpha: float = 0.7,
|
||
min_articles: int = 3,
|
||
max_topics: int = 10,
|
||
now: datetime | None = None,
|
||
lambda_val: float,
|
||
) -> tuple[list[dict], int]:
|
||
"""time-decay weight 적용 + greedy cosine assign + EMA centroid + MIN drop.
|
||
|
||
Args:
|
||
docs: [{embedding: np.ndarray, created_at: datetime, ...}]. 함수가 in-place 로 `weight` 키 추가.
|
||
threshold: cosine 유사도 cluster 병합 임계.
|
||
centroid_alpha: EMA 계수 (0.7 = 기존 70% 유지).
|
||
min_articles: cluster 당 최소 article 수 (미만 시 drop).
|
||
max_topics: 상위 cluster 보존 개수.
|
||
now: 기준 시각 (default = datetime.now(UTC)).
|
||
lambda_val: time-decay λ (caller 가 윈도우 폭에 맞게 주입).
|
||
|
||
Returns:
|
||
(clusters, raw_cluster_count_before_drop)
|
||
clusters = [{centroid, members, weight_sum, raw_weight_sum, importance_score}, ...]
|
||
"""
|
||
if not docs:
|
||
return [], 0
|
||
|
||
now = now or datetime.now(timezone.utc)
|
||
|
||
for d in docs:
|
||
d["weight"] = time_decay_weight(now, d["created_at"], lambda_val)
|
||
docs_sorted = sorted(docs, key=lambda d: -d["weight"])
|
||
|
||
clusters: list[dict] = []
|
||
for d in docs_sorted:
|
||
v = normalize_vector(d["embedding"])
|
||
best_idx, best_sim = -1, 0.0
|
||
for i, c in enumerate(clusters):
|
||
sim = float(np.dot(c["centroid"], v))
|
||
if sim > best_sim and sim >= threshold:
|
||
best_sim, best_idx = sim, i
|
||
if best_idx >= 0:
|
||
c = clusters[best_idx]
|
||
c["centroid"] = centroid_alpha * c["centroid"] + (1.0 - centroid_alpha) * v
|
||
c["centroid"] = normalize_vector(c["centroid"])
|
||
c["members"].append(d)
|
||
c["weight_sum"] += d["weight"]
|
||
else:
|
||
clusters.append({
|
||
"centroid": v,
|
||
"members": [d],
|
||
"weight_sum": d["weight"],
|
||
})
|
||
|
||
raw_count = len(clusters)
|
||
clusters = [c for c in clusters if len(c["members"]) >= min_articles]
|
||
clusters.sort(key=lambda c: -c["weight_sum"])
|
||
clusters = clusters[:max_topics]
|
||
|
||
normalize_importance_scores(clusters)
|
||
return clusters, raw_count
|
||
|
||
|
||
def normalize_importance_scores(clusters: list[dict], *, floor: float = SCORE_FLOOR) -> None:
|
||
"""cluster.weight_sum 을 0~1 로 정규화 + floor. in-place. raw_weight_sum 보존."""
|
||
if not clusters:
|
||
return
|
||
max_w = max(c["weight_sum"] for c in clusters)
|
||
for c in clusters:
|
||
normalized = (c["weight_sum"] / max_w) if max_w > 0 else 0.0
|
||
c["raw_weight_sum"] = c["weight_sum"]
|
||
c["importance_score"] = max(normalized, floor)
|