Files
hyungi_document_server/app/services/digest/loader.py
Hyungi Ahn 46ba9dd231 fix(digest/loader): raw SQL pgvector string 형태 파싱 지원
raw text() SQL + asyncpg 조합에서는 pgvector Vector(1024) 컬럼이
'[0.087,0.305,...]' 형태의 string 으로 반환되며 numpy 변환이 실패함
(ORM 을 쓰면 type 등록되지만 raw SQL 은 안 됨).

_to_numpy_embedding 에서 string 이면 json.loads 로 먼저 파싱한 뒤
numpy.asarray. 변환 실패 시 None 반환 (해당 doc 자동 drop).

Phase 4 deploy 워커 첫 실행 검증 중 발견.
2026-04-09 08:00:43 +09:00

150 lines
4.6 KiB
Python

"""뉴스 7일 window 로드 + country 정규화
- documents 테이블엔 country 컬럼이 없으므로 document_chunks.country 를 first non-null 로 조인.
- chunk-level country 도 NULL 이면 news_sources.name prefix(ai_sub_group) 매칭으로 fallback.
- 그래도 NULL 이면 drop(로그 경고).
- ai_summary / embedding 이 NULL 이면 처음부터 제외 (재요약/재임베딩 0회 원칙).
"""
from collections import defaultdict
from datetime import datetime
from typing import Any
import numpy as np
from sqlalchemy import text
from core.database import async_session
from core.utils import setup_logger
logger = setup_logger("digest_loader")
_NEWS_WINDOW_SQL = text("""
SELECT
d.id,
d.title,
d.ai_summary,
d.embedding,
d.created_at,
d.edit_url,
d.ai_sub_group,
(
SELECT c.country
FROM document_chunks c
WHERE c.doc_id = d.id AND c.country IS NOT NULL
LIMIT 1
) AS chunk_country
FROM documents d
WHERE d.source_channel = 'news'
AND d.deleted_at IS NULL
AND d.created_at >= :window_start
AND d.created_at < :window_end
AND d.embedding IS NOT NULL
AND d.ai_summary IS NOT NULL
""")
_SOURCE_COUNTRY_SQL = text("""
SELECT name, country FROM news_sources WHERE country IS NOT NULL
""")
def _to_numpy_embedding(raw: Any) -> np.ndarray | None:
"""pgvector 컬럼을 numpy array(float32)로 정규화.
raw SQL + asyncpg 조합에서 pgvector type 이 등록 안 되어 있으면
embedding 이 '[0.1,0.2,...]' 같은 string 으로 반환된다. ORM 을 안 쓰므로
이 경우 직접 파싱해야 한다.
"""
if raw is None:
return None
if isinstance(raw, str):
import json
try:
raw = json.loads(raw)
except json.JSONDecodeError:
return None
try:
arr = np.asarray(raw, dtype=np.float32)
except (TypeError, ValueError):
return None
if arr.size == 0:
return None
return arr
async def _load_source_country_map(session) -> dict[str, str]:
"""news_sources name → country 매핑.
name 은 '경향신문 문화' 형태이고 documents.ai_sub_group 은 '경향신문' (split[0]).
prefix 매칭이 가능하도록 첫 토큰 → country 로 인덱싱.
"""
rows = await session.execute(_SOURCE_COUNTRY_SQL)
mapping: dict[str, str] = {}
for name, country in rows:
if not name or not country:
continue
prefix = name.split(" ")[0].strip()
if prefix and prefix not in mapping:
mapping[prefix] = country
return mapping
async def load_news_window(
window_start: datetime,
window_end: datetime,
) -> dict[str, list[dict]]:
"""주어진 윈도우 안의 뉴스 documents 를 country 별 dict 로 반환.
Returns:
{"KR": [doc_dict, ...], "US": [...], ...}
"""
docs_by_country: dict[str, list[dict]] = defaultdict(list)
null_country_count = 0
total = 0
async with async_session() as session:
source_country = await _load_source_country_map(session)
result = await session.execute(
_NEWS_WINDOW_SQL,
{"window_start": window_start, "window_end": window_end},
)
for row in result.mappings():
embedding = _to_numpy_embedding(row["embedding"])
if embedding is None:
continue
country = row["chunk_country"]
if not country:
# news_sources prefix fallback
ai_sub_group = (row["ai_sub_group"] or "").strip()
if ai_sub_group:
country = source_country.get(ai_sub_group)
if not country:
null_country_count += 1
continue
country = country.upper()
docs_by_country[country].append({
"id": int(row["id"]),
"title": row["title"] or "",
"ai_summary": row["ai_summary"] or "",
"embedding": embedding,
"created_at": row["created_at"],
"edit_url": row["edit_url"] or "",
"ai_sub_group": row["ai_sub_group"] or "",
})
total += 1
if null_country_count:
logger.warning(
f"[loader] country 분류 실패로 drop된 문서 {null_country_count}"
f"(chunk_country + news_sources fallback 모두 실패)"
)
logger.info(
f"[loader] window {window_start.date()} ~ {window_end.date()}"
f"{total}건 ({len(docs_by_country)}개 국가)"
)
return dict(docs_by_country)