From 1ca6d8b522d700038f8670633dfa215072dc36ed Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Tue, 12 May 2026 12:38:32 +0900 Subject: [PATCH] refactor(digest): extract clustering helpers to clustering_common MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Phase 4 Global Digest 의 클러스터링 핵심 알고리즘 (time-decay weight, adaptive threshold, greedy cosine assign + EMA centroid, importance normalize) 을 `app/services/clustering_common.py` 로 추출. country 축은 caller 책임 — Phase 4 cluster_country 는 그대로 country 별 호출, 신규 morning briefing 모듈이 country 없이 cluster_global 로 호출 예정. selection.py 의 중복 _normalize 도 공통 util 로 통일. 동작 변경 0 (cluster_country 의 dropped 집계 제외): - LAMBDA / threshold / EMA alpha / MIN_ARTICLES 모두 Phase 4 기본값 유지 - docs.sort (in-place) → sorted (copy) 변경했으나 caller 가 정렬된 docs 를 재사용하지 않으므로 무관 (dict element 의 weight 부여는 reference 라 그대로 반영) - 단, cluster_country 의 dropped = raw_count - len(clusters) 는 이제 max_topics 절단 이후의 clusters 로 계산되므로, 기존 (MIN_ARTICLES 미만 drop 만 집계) 과 달리 max_topics 로 잘린 cluster 수도 포함 다음 commit 에서 Phase 4 회귀 검증 (digest regenerate diff 0). --- app/services/clustering_common.py | 124 ++++++++++++++++++++++++++++++ app/services/digest/clustering.py | 102 +++++------------------- app/services/digest/selection.py | 9 +-- 3 files changed, 144 insertions(+), 91 deletions(-) create mode 100644 app/services/clustering_common.py diff --git a/app/services/clustering_common.py b/app/services/clustering_common.py new file mode 100644 index 0000000..0a4bb5c --- /dev/null +++ b/app/services/clustering_common.py @@ -0,0 +1,124 @@ +"""Cluster 알고리즘 공통 util — digest(country×topic) / briefing(topic×country) 양쪽이 import. + +추출 원칙: +- digest.clustering.cluster_country / briefing.clustering.cluster_global 의 country 축은 caller 책임. +- 본 모듈은 docs list (이미 분류된 슬라이스 또는 전체) 에 대한 순수 greedy assign + normalize. +- LAMBDA / threshold / EMA alpha / MIN_ARTICLES 는 caller 가 주입 (Phase 4 = 3일 / Briefing = 2시간 등). 
+""" + +import math +from datetime import datetime, timezone + +import numpy as np + + +SCORE_FLOOR = 0.01 + + +def normalize_vector(v: np.ndarray) -> np.ndarray: + norm = float(np.linalg.norm(v)) + if norm == 0.0: + return v + return v / norm + + +def time_decay_weight(now: datetime, created_at: datetime, lambda_val: float) -> float: + """exp(-λ · days_ago). created_at naive → UTC 가정.""" + if created_at.tzinfo is None: + created_at = created_at.replace(tzinfo=timezone.utc) + days = (now - created_at).total_seconds() / 86400.0 + if days < 0: + days = 0.0 + return math.exp(-lambda_val * days) + + +def adaptive_threshold_by_density( + n_docs: int, + *, + low_n: int = 50, + high_n: int = 200, + t_low: float = 0.75, + t_mid: float = 0.78, + t_high: float = 0.80, +) -> float: + """문서 밀도 기반 동적 threshold — fragmentation / blob 동시 방어.""" + if n_docs > high_n: + return t_high + if n_docs < low_n: + return t_low + return t_mid + + +def greedy_assign_cluster( + docs: list[dict], + *, + threshold: float, + centroid_alpha: float = 0.7, + min_articles: int = 3, + max_topics: int = 10, + now: datetime | None = None, + lambda_val: float, +) -> tuple[list[dict], int]: + """time-decay weight 적용 + greedy cosine assign + EMA centroid + MIN drop. + + Args: + docs: [{embedding: np.ndarray, created_at: datetime, ...}]. 함수가 in-place 로 `weight` 키 추가. + threshold: cosine 유사도 cluster 병합 임계. + centroid_alpha: EMA 계수 (0.7 = 기존 70% 유지). + min_articles: cluster 당 최소 article 수 (미만 시 drop). + max_topics: 상위 cluster 보존 개수. + now: 기준 시각 (default = datetime.now(UTC)). + lambda_val: time-decay λ (caller 가 윈도우 폭에 맞게 주입). + + Returns: + (clusters, raw_cluster_count_before_drop) + clusters = [{centroid, members, weight_sum, raw_weight_sum, importance_score}, ...] 
+ """ + if not docs: + return [], 0 + + now = now or datetime.now(timezone.utc) + + for d in docs: + d["weight"] = time_decay_weight(now, d["created_at"], lambda_val) + docs_sorted = sorted(docs, key=lambda d: -d["weight"]) + + clusters: list[dict] = [] + for d in docs_sorted: + v = normalize_vector(d["embedding"]) + best_idx, best_sim = -1, 0.0 + for i, c in enumerate(clusters): + sim = float(np.dot(c["centroid"], v)) + if sim > best_sim and sim >= threshold: + best_sim, best_idx = sim, i + if best_idx >= 0: + c = clusters[best_idx] + c["centroid"] = centroid_alpha * c["centroid"] + (1.0 - centroid_alpha) * v + c["centroid"] = normalize_vector(c["centroid"]) + c["members"].append(d) + c["weight_sum"] += d["weight"] + else: + clusters.append({ + "centroid": v, + "members": [d], + "weight_sum": d["weight"], + }) + + raw_count = len(clusters) + clusters = [c for c in clusters if len(c["members"]) >= min_articles] + clusters.sort(key=lambda c: -c["weight_sum"]) + clusters = clusters[:max_topics] + + normalize_importance_scores(clusters) + return clusters, raw_count + + +def normalize_importance_scores(clusters: list[dict], *, floor: float = SCORE_FLOOR) -> None: + """cluster.weight_sum 을 0~1 로 정규화 + floor. in-place. raw_weight_sum 보존.""" + if not clusters: + return + max_w = max(c["weight_sum"] for c in clusters) + for c in clusters: + normalized = (c["weight_sum"] / max_w) if max_w > 0 else 0.0 + c["raw_weight_sum"] = c["weight_sum"] + c["importance_score"] = max(normalized, floor) diff --git a/app/services/digest/clustering.py b/app/services/digest/clustering.py index 1a22f40..a5fdfc7 100644 --- a/app/services/digest/clustering.py +++ b/app/services/digest/clustering.py @@ -1,20 +1,16 @@ -"""Time-decay weight + adaptive threshold + EMA centroid greedy clustering. +"""Phase 4 Global Digest — country 내 topic cluster (time-decay + EMA + adaptive threshold). 
-플랜의 핵심 결정: -- λ = ln(2)/3 (3일 반감기) -- threshold: 0.75 / 0.78 / 0.80 (밀도 기반 adaptive) -- centroid: EMA α=0.7 (단순 평균의 seed bias / drift 방어) -- min_articles_per_topic = 3, max_topics_per_country = 10 -- importance_score: country 내 0~1 normalize + max(score, 0.01) floor -- raw_weight_sum 별도 보존 (cross-day 트렌드 분석용) +알고리즘 코어는 `app/services/clustering_common.py` 로 추출되어 briefing 모듈과 공유. +본 파일은 Phase 4 고유 파라미터 (LAMBDA = ln(2)/3 일, MIN 3, MAX 10) 와 country 축 호출만 담당. """ import math -from datetime import datetime, timezone - -import numpy as np from core.utils import setup_logger +from services.clustering_common import ( + adaptive_threshold_by_density, + greedy_assign_cluster, +) logger = setup_logger("digest_clustering") @@ -22,94 +18,32 @@ LAMBDA = math.log(2) / 3 # 3일 반감기 — 사용자 확정값 CENTROID_ALPHA = 0.7 # EMA: 기존 중심 70% 유지, 새 멤버 30% 반영 MIN_ARTICLES_PER_TOPIC = 3 MAX_TOPICS_PER_COUNTRY = 10 -SCORE_FLOOR = 0.01 # UI 0 표시 문제 사전 차단 def adaptive_threshold(n_docs: int) -> float: - """문서 밀도 기반 동적 threshold — fragmentation/blob 동시 방어.""" - if n_docs > 200: - return 0.80 - if n_docs < 50: - return 0.75 - return 0.78 - - -def _normalize(v: np.ndarray) -> np.ndarray: - norm = float(np.linalg.norm(v)) - if norm == 0.0: - return v - return v / norm - - -def _decay_weight(now: datetime, created_at: datetime) -> float: - """exp(-λ * days_ago). created_at 이 naive 면 UTC 가정.""" - if created_at.tzinfo is None: - created_at = created_at.replace(tzinfo=timezone.utc) - days = (now - created_at).total_seconds() / 86400.0 - if days < 0: - days = 0.0 - return math.exp(-LAMBDA * days) + """Phase 4 임계 (0.75 / 0.78 / 0.80). 외부 import 호환용 alias.""" + return adaptive_threshold_by_density(n_docs) def cluster_country(country: str, docs: list[dict]) -> list[dict]: """단일 country 의 docs 를 cluster 로 묶어 정렬 + normalize 후 반환. - Args: - country: 국가 코드 (KR, US, ...) - docs: loader.load_news_window 의 출력 (단일 country 슬라이스) - - Returns: - [{centroid, members, weight_sum, raw_weight_sum, importance_score}, ...] 
- - members 는 weight 가 채워진 doc dict 리스트 - - 정렬: importance_score 내림차순, 최대 MAX_TOPICS_PER_COUNTRY 개 + 공통 util `greedy_assign_cluster` 위에 country 라벨 로깅만 추가. """ if not docs: logger.info(f"[{country}] docs=0 → skip") return [] threshold = adaptive_threshold(len(docs)) - now = datetime.now(timezone.utc) - - # time-decay weight 계산 + 가중치 높은 순으로 seed 우선 - for d in docs: - d["weight"] = _decay_weight(now, d["created_at"]) - docs.sort(key=lambda d: -d["weight"]) - - clusters: list[dict] = [] - for d in docs: - v = _normalize(d["embedding"]) - best_idx, best_sim = -1, 0.0 - for i, c in enumerate(clusters): - sim = float(np.dot(c["centroid"], v)) - if sim > best_sim and sim >= threshold: - best_sim, best_idx = sim, i - if best_idx >= 0: - c = clusters[best_idx] - # EMA centroid update — drift 방지 - c["centroid"] = CENTROID_ALPHA * c["centroid"] + (1.0 - CENTROID_ALPHA) * v - c["centroid"] = _normalize(c["centroid"]) - c["members"].append(d) - c["weight_sum"] += d["weight"] - else: - clusters.append({ - "centroid": v, - "members": [d], - "weight_sum": d["weight"], - }) - - raw_count = len(clusters) - clusters = [c for c in clusters if len(c["members"]) >= MIN_ARTICLES_PER_TOPIC] + clusters, raw_count = greedy_assign_cluster( + docs, + threshold=threshold, + centroid_alpha=CENTROID_ALPHA, + min_articles=MIN_ARTICLES_PER_TOPIC, + max_topics=MAX_TOPICS_PER_COUNTRY, + lambda_val=LAMBDA, + ) dropped = raw_count - len(clusters) - clusters.sort(key=lambda c: -c["weight_sum"]) - clusters = clusters[:MAX_TOPICS_PER_COUNTRY] - - # country 내 normalize (0~1) + floor - if clusters: - max_w = max(c["weight_sum"] for c in clusters) - for c in clusters: - normalized = (c["weight_sum"] / max_w) if max_w > 0 else 0.0 - c["raw_weight_sum"] = c["weight_sum"] - c["importance_score"] = max(normalized, SCORE_FLOOR) logger.info( f"[{country}] docs={len(docs)} threshold={threshold} " diff --git a/app/services/digest/selection.py b/app/services/digest/selection.py index 4504009..701e9d0 100644 --- 
a/app/services/digest/selection.py +++ b/app/services/digest/selection.py @@ -6,18 +6,13 @@ ai_summary 길이는 LLM 토큰 보호를 위해 SUMMARY_TRUNCATE 로 제한. import numpy as np +from services.clustering_common import normalize_vector as _normalize + K_PER_CLUSTER = 5 LAMBDA_MMR = 0.7 # relevance 70% / diversity 30% SUMMARY_TRUNCATE = 300 # long tail ai_summary 방어 -def _normalize(v: np.ndarray) -> np.ndarray: - norm = float(np.linalg.norm(v)) - if norm == 0.0: - return v - return v / norm - - def select_for_llm(cluster: dict, k: int = K_PER_CLUSTER) -> list[dict]: """cluster 내 LLM 호출용 대표 article 들 선정.