refactor(digest): extract clustering helpers to clustering_common
Phase 4 Global Digest 의 클러스터링 핵심 알고리즘 (time-decay weight, adaptive threshold, greedy cosine assign + EMA centroid, importance normalize) 을 `app/services/clustering_common.py` 로 추출. country 축은 caller 책임 — Phase 4 cluster_country 는 그대로 country 별 호출, 신규 morning briefing 모듈이 country 없이 cluster_global 로 호출 예정. selection.py 의 중복 _normalize 도 공통 util 로 통일. 동작 변경 0: - LAMBDA / threshold / EMA alpha / MIN_ARTICLES 모두 Phase 4 기본값 유지 - docs.sort (in-place) → sorted (copy) 변경했으나 caller 가 정렬된 docs 를 재사용하지 않으므로 무관 (dict element 의 weight 부여는 reference 라 그대로 반영) 다음 commit 에서 Phase 4 회귀 검증 (digest regenerate diff 0).
This commit is contained in:
@@ -0,0 +1,124 @@
|
||||
"""Cluster 알고리즘 공통 util — digest(country×topic) / briefing(topic×country) 양쪽이 import.
|
||||
|
||||
추출 원칙:
|
||||
- digest.clustering.cluster_country / briefing.clustering.cluster_global 의 country 축은 caller 책임.
|
||||
- 본 모듈은 docs list (이미 분류된 슬라이스 또는 전체) 에 대한 순수 greedy assign + normalize.
|
||||
- LAMBDA / threshold / EMA alpha / MIN_ARTICLES 는 caller 가 주입 (Phase 4 = 3일 / Briefing = 2시간 등).
|
||||
"""
|
||||
|
||||
import math
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
SCORE_FLOOR = 0.01
|
||||
|
||||
|
||||
def normalize_vector(v: np.ndarray) -> np.ndarray:
|
||||
norm = float(np.linalg.norm(v))
|
||||
if norm == 0.0:
|
||||
return v
|
||||
return v / norm
|
||||
|
||||
|
||||
def time_decay_weight(now: datetime, created_at: datetime, lambda_val: float) -> float:
|
||||
"""exp(-λ · days_ago). created_at naive → UTC 가정."""
|
||||
if created_at.tzinfo is None:
|
||||
created_at = created_at.replace(tzinfo=timezone.utc)
|
||||
days = (now - created_at).total_seconds() / 86400.0
|
||||
if days < 0:
|
||||
days = 0.0
|
||||
return math.exp(-lambda_val * days)
|
||||
|
||||
|
||||
def adaptive_threshold_by_density(
|
||||
n_docs: int,
|
||||
*,
|
||||
low_n: int = 50,
|
||||
high_n: int = 200,
|
||||
t_low: float = 0.75,
|
||||
t_mid: float = 0.78,
|
||||
t_high: float = 0.80,
|
||||
) -> float:
|
||||
"""문서 밀도 기반 동적 threshold — fragmentation / blob 동시 방어."""
|
||||
if n_docs > high_n:
|
||||
return t_high
|
||||
if n_docs < low_n:
|
||||
return t_low
|
||||
return t_mid
|
||||
|
||||
|
||||
def greedy_assign_cluster(
|
||||
docs: list[dict],
|
||||
*,
|
||||
threshold: float,
|
||||
centroid_alpha: float = 0.7,
|
||||
min_articles: int = 3,
|
||||
max_topics: int = 10,
|
||||
now: datetime | None = None,
|
||||
lambda_val: float,
|
||||
) -> tuple[list[dict], int]:
|
||||
"""time-decay weight 적용 + greedy cosine assign + EMA centroid + MIN drop.
|
||||
|
||||
Args:
|
||||
docs: [{embedding: np.ndarray, created_at: datetime, ...}]. 함수가 in-place 로 `weight` 키 추가.
|
||||
threshold: cosine 유사도 cluster 병합 임계.
|
||||
centroid_alpha: EMA 계수 (0.7 = 기존 70% 유지).
|
||||
min_articles: cluster 당 최소 article 수 (미만 시 drop).
|
||||
max_topics: 상위 cluster 보존 개수.
|
||||
now: 기준 시각 (default = datetime.now(UTC)).
|
||||
lambda_val: time-decay λ (caller 가 윈도우 폭에 맞게 주입).
|
||||
|
||||
Returns:
|
||||
(clusters, raw_cluster_count_before_drop)
|
||||
clusters = [{centroid, members, weight_sum, raw_weight_sum, importance_score}, ...]
|
||||
"""
|
||||
if not docs:
|
||||
return [], 0
|
||||
|
||||
now = now or datetime.now(timezone.utc)
|
||||
|
||||
for d in docs:
|
||||
d["weight"] = time_decay_weight(now, d["created_at"], lambda_val)
|
||||
docs_sorted = sorted(docs, key=lambda d: -d["weight"])
|
||||
|
||||
clusters: list[dict] = []
|
||||
for d in docs_sorted:
|
||||
v = normalize_vector(d["embedding"])
|
||||
best_idx, best_sim = -1, 0.0
|
||||
for i, c in enumerate(clusters):
|
||||
sim = float(np.dot(c["centroid"], v))
|
||||
if sim > best_sim and sim >= threshold:
|
||||
best_sim, best_idx = sim, i
|
||||
if best_idx >= 0:
|
||||
c = clusters[best_idx]
|
||||
c["centroid"] = centroid_alpha * c["centroid"] + (1.0 - centroid_alpha) * v
|
||||
c["centroid"] = normalize_vector(c["centroid"])
|
||||
c["members"].append(d)
|
||||
c["weight_sum"] += d["weight"]
|
||||
else:
|
||||
clusters.append({
|
||||
"centroid": v,
|
||||
"members": [d],
|
||||
"weight_sum": d["weight"],
|
||||
})
|
||||
|
||||
raw_count = len(clusters)
|
||||
clusters = [c for c in clusters if len(c["members"]) >= min_articles]
|
||||
clusters.sort(key=lambda c: -c["weight_sum"])
|
||||
clusters = clusters[:max_topics]
|
||||
|
||||
normalize_importance_scores(clusters)
|
||||
return clusters, raw_count
|
||||
|
||||
|
||||
def normalize_importance_scores(clusters: list[dict], *, floor: float = SCORE_FLOOR) -> None:
|
||||
"""cluster.weight_sum 을 0~1 로 정규화 + floor. in-place. raw_weight_sum 보존."""
|
||||
if not clusters:
|
||||
return
|
||||
max_w = max(c["weight_sum"] for c in clusters)
|
||||
for c in clusters:
|
||||
normalized = (c["weight_sum"] / max_w) if max_w > 0 else 0.0
|
||||
c["raw_weight_sum"] = c["weight_sum"]
|
||||
c["importance_score"] = max(normalized, floor)
|
||||
@@ -1,20 +1,16 @@
|
||||
"""Time-decay weight + adaptive threshold + EMA centroid greedy clustering.
|
||||
"""Phase 4 Global Digest — country 내 topic cluster (time-decay + EMA + adaptive threshold).
|
||||
|
||||
플랜의 핵심 결정:
|
||||
- λ = ln(2)/3 (3일 반감기)
|
||||
- threshold: 0.75 / 0.78 / 0.80 (밀도 기반 adaptive)
|
||||
- centroid: EMA α=0.7 (단순 평균의 seed bias / drift 방어)
|
||||
- min_articles_per_topic = 3, max_topics_per_country = 10
|
||||
- importance_score: country 내 0~1 normalize + max(score, 0.01) floor
|
||||
- raw_weight_sum 별도 보존 (cross-day 트렌드 분석용)
|
||||
알고리즘 코어는 `app/services/clustering_common.py` 로 추출되어 briefing 모듈과 공유.
|
||||
본 파일은 Phase 4 고유 파라미터 (LAMBDA = ln(2)/3 일, MIN 3, MAX 10) 와 country 축 호출만 담당.
|
||||
"""
|
||||
|
||||
import math
|
||||
from datetime import datetime, timezone
|
||||
|
||||
import numpy as np
|
||||
|
||||
from core.utils import setup_logger
|
||||
from services.clustering_common import (
|
||||
adaptive_threshold_by_density,
|
||||
greedy_assign_cluster,
|
||||
)
|
||||
|
||||
logger = setup_logger("digest_clustering")
|
||||
|
||||
@@ -22,94 +18,32 @@ LAMBDA = math.log(2) / 3 # 3일 반감기 — 사용자 확정값
|
||||
CENTROID_ALPHA = 0.7 # EMA: 기존 중심 70% 유지, 새 멤버 30% 반영
|
||||
MIN_ARTICLES_PER_TOPIC = 3
|
||||
MAX_TOPICS_PER_COUNTRY = 10
|
||||
SCORE_FLOOR = 0.01 # UI 0 표시 문제 사전 차단
|
||||
|
||||
|
||||
def adaptive_threshold(n_docs: int) -> float:
|
||||
"""문서 밀도 기반 동적 threshold — fragmentation/blob 동시 방어."""
|
||||
if n_docs > 200:
|
||||
return 0.80
|
||||
if n_docs < 50:
|
||||
return 0.75
|
||||
return 0.78
|
||||
|
||||
|
||||
def _normalize(v: np.ndarray) -> np.ndarray:
|
||||
norm = float(np.linalg.norm(v))
|
||||
if norm == 0.0:
|
||||
return v
|
||||
return v / norm
|
||||
|
||||
|
||||
def _decay_weight(now: datetime, created_at: datetime) -> float:
|
||||
"""exp(-λ * days_ago). created_at 이 naive 면 UTC 가정."""
|
||||
if created_at.tzinfo is None:
|
||||
created_at = created_at.replace(tzinfo=timezone.utc)
|
||||
days = (now - created_at).total_seconds() / 86400.0
|
||||
if days < 0:
|
||||
days = 0.0
|
||||
return math.exp(-LAMBDA * days)
|
||||
"""Phase 4 임계 (0.75 / 0.78 / 0.80). 외부 import 호환용 alias."""
|
||||
return adaptive_threshold_by_density(n_docs)
|
||||
|
||||
|
||||
def cluster_country(country: str, docs: list[dict]) -> list[dict]:
|
||||
"""단일 country 의 docs 를 cluster 로 묶어 정렬 + normalize 후 반환.
|
||||
|
||||
Args:
|
||||
country: 국가 코드 (KR, US, ...)
|
||||
docs: loader.load_news_window 의 출력 (단일 country 슬라이스)
|
||||
|
||||
Returns:
|
||||
[{centroid, members, weight_sum, raw_weight_sum, importance_score}, ...]
|
||||
- members 는 weight 가 채워진 doc dict 리스트
|
||||
- 정렬: importance_score 내림차순, 최대 MAX_TOPICS_PER_COUNTRY 개
|
||||
공통 util `greedy_assign_cluster` 위에 country 라벨 로깅만 추가.
|
||||
"""
|
||||
if not docs:
|
||||
logger.info(f"[{country}] docs=0 → skip")
|
||||
return []
|
||||
|
||||
threshold = adaptive_threshold(len(docs))
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
# time-decay weight 계산 + 가중치 높은 순으로 seed 우선
|
||||
for d in docs:
|
||||
d["weight"] = _decay_weight(now, d["created_at"])
|
||||
docs.sort(key=lambda d: -d["weight"])
|
||||
|
||||
clusters: list[dict] = []
|
||||
for d in docs:
|
||||
v = _normalize(d["embedding"])
|
||||
best_idx, best_sim = -1, 0.0
|
||||
for i, c in enumerate(clusters):
|
||||
sim = float(np.dot(c["centroid"], v))
|
||||
if sim > best_sim and sim >= threshold:
|
||||
best_sim, best_idx = sim, i
|
||||
if best_idx >= 0:
|
||||
c = clusters[best_idx]
|
||||
# EMA centroid update — drift 방지
|
||||
c["centroid"] = CENTROID_ALPHA * c["centroid"] + (1.0 - CENTROID_ALPHA) * v
|
||||
c["centroid"] = _normalize(c["centroid"])
|
||||
c["members"].append(d)
|
||||
c["weight_sum"] += d["weight"]
|
||||
else:
|
||||
clusters.append({
|
||||
"centroid": v,
|
||||
"members": [d],
|
||||
"weight_sum": d["weight"],
|
||||
})
|
||||
|
||||
raw_count = len(clusters)
|
||||
clusters = [c for c in clusters if len(c["members"]) >= MIN_ARTICLES_PER_TOPIC]
|
||||
clusters, raw_count = greedy_assign_cluster(
|
||||
docs,
|
||||
threshold=threshold,
|
||||
centroid_alpha=CENTROID_ALPHA,
|
||||
min_articles=MIN_ARTICLES_PER_TOPIC,
|
||||
max_topics=MAX_TOPICS_PER_COUNTRY,
|
||||
lambda_val=LAMBDA,
|
||||
)
|
||||
dropped = raw_count - len(clusters)
|
||||
clusters.sort(key=lambda c: -c["weight_sum"])
|
||||
clusters = clusters[:MAX_TOPICS_PER_COUNTRY]
|
||||
|
||||
# country 내 normalize (0~1) + floor
|
||||
if clusters:
|
||||
max_w = max(c["weight_sum"] for c in clusters)
|
||||
for c in clusters:
|
||||
normalized = (c["weight_sum"] / max_w) if max_w > 0 else 0.0
|
||||
c["raw_weight_sum"] = c["weight_sum"]
|
||||
c["importance_score"] = max(normalized, SCORE_FLOOR)
|
||||
|
||||
logger.info(
|
||||
f"[{country}] docs={len(docs)} threshold={threshold} "
|
||||
|
||||
@@ -6,18 +6,13 @@ ai_summary 길이는 LLM 토큰 보호를 위해 SUMMARY_TRUNCATE 로 제한.
|
||||
|
||||
import numpy as np
|
||||
|
||||
from services.clustering_common import normalize_vector as _normalize
|
||||
|
||||
K_PER_CLUSTER = 5
|
||||
LAMBDA_MMR = 0.7 # relevance 70% / diversity 30%
|
||||
SUMMARY_TRUNCATE = 300 # long tail ai_summary 방어
|
||||
|
||||
|
||||
def _normalize(v: np.ndarray) -> np.ndarray:
|
||||
norm = float(np.linalg.norm(v))
|
||||
if norm == 0.0:
|
||||
return v
|
||||
return v / norm
|
||||
|
||||
|
||||
def select_for_llm(cluster: dict, k: int = K_PER_CLUSTER) -> list[dict]:
|
||||
"""cluster 내 LLM 호출용 대표 article 들 선정.
|
||||
|
||||
|
||||
Reference in New Issue
Block a user