"""Time-decay weight + adaptive threshold + EMA centroid greedy clustering. 플랜의 핵심 결정: - λ = ln(2)/3 (3일 반감기) - threshold: 0.75 / 0.78 / 0.80 (밀도 기반 adaptive) - centroid: EMA α=0.7 (단순 평균의 seed bias / drift 방어) - min_articles_per_topic = 3, max_topics_per_country = 10 - importance_score: country 내 0~1 normalize + max(score, 0.01) floor - raw_weight_sum 별도 보존 (cross-day 트렌드 분석용) """ import math from datetime import datetime, timezone import numpy as np from core.utils import setup_logger logger = setup_logger("digest_clustering") LAMBDA = math.log(2) / 3 # 3일 반감기 — 사용자 확정값 CENTROID_ALPHA = 0.7 # EMA: 기존 중심 70% 유지, 새 멤버 30% 반영 MIN_ARTICLES_PER_TOPIC = 3 MAX_TOPICS_PER_COUNTRY = 10 SCORE_FLOOR = 0.01 # UI 0 표시 문제 사전 차단 def adaptive_threshold(n_docs: int) -> float: """문서 밀도 기반 동적 threshold — fragmentation/blob 동시 방어.""" if n_docs > 200: return 0.80 if n_docs < 50: return 0.75 return 0.78 def _normalize(v: np.ndarray) -> np.ndarray: norm = float(np.linalg.norm(v)) if norm == 0.0: return v return v / norm def _decay_weight(now: datetime, created_at: datetime) -> float: """exp(-λ * days_ago). created_at 이 naive 면 UTC 가정.""" if created_at.tzinfo is None: created_at = created_at.replace(tzinfo=timezone.utc) days = (now - created_at).total_seconds() / 86400.0 if days < 0: days = 0.0 return math.exp(-LAMBDA * days) def cluster_country(country: str, docs: list[dict]) -> list[dict]: """단일 country 의 docs 를 cluster 로 묶어 정렬 + normalize 후 반환. Args: country: 국가 코드 (KR, US, ...) docs: loader.load_news_window 의 출력 (단일 country 슬라이스) Returns: [{centroid, members, weight_sum, raw_weight_sum, importance_score}, ...] - members 는 weight 가 채워진 doc dict 리스트 - 정렬: importance_score 내림차순, 최대 MAX_TOPICS_PER_COUNTRY 개 """ if not docs: logger.info(f"[{country}] docs=0 → skip") return [] threshold = adaptive_threshold(len(docs)) now = datetime.now(timezone.utc) # time-decay weight 계산 + 가중치 높은 순으로 seed 우선 for d in docs: d["weight"] = _decay_weight(now, d["created_at"]) docs.sort(key=lambda d: -d["weight"]) clusters: list[dict] = [] for d in docs: v = _normalize(d["embedding"]) best_idx, best_sim = -1, 0.0 for i, c in enumerate(clusters): sim = float(np.dot(c["centroid"], v)) if sim > best_sim and sim >= threshold: best_sim, best_idx = sim, i if best_idx >= 0: c = clusters[best_idx] # EMA centroid update — drift 방지 c["centroid"] = CENTROID_ALPHA * c["centroid"] + (1.0 - CENTROID_ALPHA) * v c["centroid"] = _normalize(c["centroid"]) c["members"].append(d) c["weight_sum"] += d["weight"] else: clusters.append({ "centroid": v, "members": [d], "weight_sum": d["weight"], }) raw_count = len(clusters) clusters = [c for c in clusters if len(c["members"]) >= MIN_ARTICLES_PER_TOPIC] dropped = raw_count - len(clusters) clusters.sort(key=lambda c: -c["weight_sum"]) clusters = clusters[:MAX_TOPICS_PER_COUNTRY] # country 내 normalize (0~1) + floor if clusters: max_w = max(c["weight_sum"] for c in clusters) for c in clusters: normalized = (c["weight_sum"] / max_w) if max_w > 0 else 0.0 c["raw_weight_sum"] = c["weight_sum"] c["importance_score"] = max(normalized, SCORE_FLOOR) logger.info( f"[{country}] docs={len(docs)} threshold={threshold} " f"raw_clusters={raw_count} dropped={dropped} kept={len(clusters)}" ) return clusters