"""Cluster 알고리즘 공통 util — digest(country×topic) / briefing(topic×country) 양쪽이 import. 추출 원칙: - digest.clustering.cluster_country / briefing.clustering.cluster_global 의 country 축은 caller 책임. - 본 모듈은 docs list (이미 분류된 슬라이스 또는 전체) 에 대한 순수 greedy assign + normalize. - LAMBDA / threshold / EMA alpha / MIN_ARTICLES 는 caller 가 주입 (Phase 4 = 3일 / Briefing = 2시간 등). """ import math from datetime import datetime, timezone import numpy as np SCORE_FLOOR = 0.01 def normalize_vector(v: np.ndarray) -> np.ndarray: norm = float(np.linalg.norm(v)) if norm == 0.0: return v return v / norm def time_decay_weight(now: datetime, created_at: datetime, lambda_val: float) -> float: """exp(-λ · days_ago). created_at naive → UTC 가정.""" if created_at.tzinfo is None: created_at = created_at.replace(tzinfo=timezone.utc) days = (now - created_at).total_seconds() / 86400.0 if days < 0: days = 0.0 return math.exp(-lambda_val * days) def adaptive_threshold_by_density( n_docs: int, *, low_n: int = 50, high_n: int = 200, t_low: float = 0.75, t_mid: float = 0.78, t_high: float = 0.80, ) -> float: """문서 밀도 기반 동적 threshold — fragmentation / blob 동시 방어.""" if n_docs > high_n: return t_high if n_docs < low_n: return t_low return t_mid def greedy_assign_cluster( docs: list[dict], *, threshold: float, centroid_alpha: float = 0.7, min_articles: int = 3, max_topics: int = 10, now: datetime | None = None, lambda_val: float, ) -> tuple[list[dict], int]: """time-decay weight 적용 + greedy cosine assign + EMA centroid + MIN drop. Args: docs: [{embedding: np.ndarray, created_at: datetime, ...}]. 함수가 in-place 로 `weight` 키 추가. threshold: cosine 유사도 cluster 병합 임계. centroid_alpha: EMA 계수 (0.7 = 기존 70% 유지). min_articles: cluster 당 최소 article 수 (미만 시 drop). max_topics: 상위 cluster 보존 개수. now: 기준 시각 (default = datetime.now(UTC)). lambda_val: time-decay λ (caller 가 윈도우 폭에 맞게 주입). Returns: (clusters, raw_cluster_count_before_drop) clusters = [{centroid, members, weight_sum, raw_weight_sum, importance_score}, ...] """ if not docs: return [], 0 now = now or datetime.now(timezone.utc) for d in docs: d["weight"] = time_decay_weight(now, d["created_at"], lambda_val) docs_sorted = sorted(docs, key=lambda d: -d["weight"]) clusters: list[dict] = [] for d in docs_sorted: v = normalize_vector(d["embedding"]) best_idx, best_sim = -1, 0.0 for i, c in enumerate(clusters): sim = float(np.dot(c["centroid"], v)) if sim > best_sim and sim >= threshold: best_sim, best_idx = sim, i if best_idx >= 0: c = clusters[best_idx] c["centroid"] = centroid_alpha * c["centroid"] + (1.0 - centroid_alpha) * v c["centroid"] = normalize_vector(c["centroid"]) c["members"].append(d) c["weight_sum"] += d["weight"] else: clusters.append({ "centroid": v, "members": [d], "weight_sum": d["weight"], }) raw_count = len(clusters) clusters = [c for c in clusters if len(c["members"]) >= min_articles] clusters.sort(key=lambda c: -c["weight_sum"]) clusters = clusters[:max_topics] normalize_importance_scores(clusters) return clusters, raw_count def normalize_importance_scores(clusters: list[dict], *, floor: float = SCORE_FLOOR) -> None: """cluster.weight_sum 을 0~1 로 정규화 + floor. in-place. raw_weight_sum 보존.""" if not clusters: return max_w = max(c["weight_sum"] for c in clusters) for c in clusters: normalized = (c["weight_sum"] / max_w) if max_w > 0 else 0.0 c["raw_weight_sum"] = c["weight_sum"] c["importance_score"] = max(normalized, floor)