fix(news): 적대 리뷰 반영 — reconcile auto-correlation·워터마크 검증 후 영속·수집 락

- fulltext_worker.reconcile_unresolved: EXISTS 서브쿼리 aliased(ProcessingQueue) —
  auto-correlation 이 FROM 전부 제거해 매 실행 InvalidRequestError (안전망 dead code).
  SQLAlchemy 2.0.50 컴파일 재현·수정 확인.
- news_collector._fetch_rss: ETag/Last-Modified/content-hash 영속을 bozo 파싱 검증
  뒤로 이동 — 부패 응답 워터마크 저장 시 영구 304-skip 차단.
- news_collector.run: 모듈 락으로 수동 collect vs 6h 스케줄 동시 실행 차단 —
  _get_or_create_health 동시 INSERT 의 uq_source_health_source_id 위반이
  사이클 전체를 죽이는 경합 봉쇄.

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-10 13:34:46 +09:00
parent 3df0ca53ab
commit dcf99b377e
2 changed files with 29 additions and 9 deletions
+7 -3
View File
@@ -24,6 +24,7 @@ from pathlib import Path
from sqlalchemy import exists, select from sqlalchemy import exists, select
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import aliased
from core.config import settings from core.config import settings
from core.crawl_politeness import CrawlBlocked, CrawlFetchError, CrawlSkip, fetch_page from core.crawl_politeness import CrawlBlocked, CrawlFetchError, CrawlSkip, fetch_page
@@ -191,11 +192,14 @@ async def reconcile_unresolved() -> None:
"""안전망 (야간 1회): fulltext 영구 실패(3회 소진)로 summarize 가 영영 안 잡힌 """안전망 (야간 1회): fulltext 영구 실패(3회 소진)로 summarize 가 영영 안 잡힌
뉴스 문서에 RSS 요약 기준 후속 단계를 enqueue. 멱등 — enqueue 후엔 조건 불일치.""" 뉴스 문서에 RSS 요약 기준 후속 단계를 enqueue. 멱등 — enqueue 후엔 조건 불일치."""
async with async_session() as session: async with async_session() as session:
# 외부 쿼리 FROM 에 ProcessingQueue 가 이미 있어 alias 없이는 auto-correlation 이
# 서브쿼리 FROM 을 전부 제거 → InvalidRequestError (queue_consumer.reset_stale_items 패턴)
pq = aliased(ProcessingQueue)
summarize_q = ( summarize_q = (
select(ProcessingQueue.id) select(pq.id)
.where( .where(
ProcessingQueue.document_id == Document.id, pq.document_id == Document.id,
ProcessingQueue.stage == "summarize", pq.stage == "summarize",
) )
) )
result = await session.execute( result = await session.execute(
+22 -6
View File
@@ -7,6 +7,7 @@ plan crawl-24x7-1 A그룹 (2026-06-10):
A-6 first-wins + 포털 전재 2차 dedup (제목+최근 3일, 12자 이상 제목 한정) A-6 first-wins + 포털 전재 2차 dedup (제목+최근 3일, 12자 이상 제목 한정)
""" """
import asyncio
import hashlib import hashlib
import re import re
from datetime import datetime, timedelta, timezone from datetime import datetime, timedelta, timezone
@@ -185,8 +186,19 @@ async def _get_or_create_health(session, source_id: int) -> SourceHealth:
return health return health
# 수동 POST /api/news/collect 와 6h 스케줄 사이클의 동시 실행 차단 (단일 프로세스·단일
# 이벤트루프). 동시 진입 시 _get_or_create_health 가 같은 source_id 를 양쪽에서 INSERT
# → uq_source_health_source_id 위반 IntegrityError 로 사이클 전체가 죽는 경합의 원천 봉쇄.
_run_lock = asyncio.Lock()
async def run(): async def run():
"""뉴스 수집 실행""" """뉴스 수집 실행"""
async with _run_lock:
await _run_locked()
async def _run_locked():
now = datetime.now(timezone.utc) now = datetime.now(timezone.utc)
async with async_session() as session: async with async_session() as session:
result = await session.execute( result = await session.execute(
@@ -337,22 +349,26 @@ async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]:
if not any(t in ct for t in ALLOWED_CONTENT_TYPES): if not any(t in ct for t in ALLOWED_CONTENT_TYPES):
raise FeedError(f"비정상 content-type: {ct}") raise FeedError(f"비정상 content-type: {ct}")
# A-1: 워터마크 갱신 + 콘텐츠 해시 변경감지 (CDN 의 ETag 회전 대비 병행) # A-1: 콘텐츠 해시 변경감지 (CDN 의 ETag 회전 대비 병행) — 저장된 해시는 항상
# 파싱 검증을 통과한 응답의 것이므로 동일성 비교는 파싱 전에 안전
new_etag = resp.headers.get("etag") new_etag = resp.headers.get("etag")
new_last_modified = resp.headers.get("last-modified") new_last_modified = resp.headers.get("last-modified")
if new_etag:
source.etag = new_etag
if new_last_modified:
source.last_modified = new_last_modified
content_hash = hashlib.sha256(resp.content).hexdigest() content_hash = hashlib.sha256(resp.content).hexdigest()
if source.feed_content_hash == content_hash: if source.feed_content_hash == content_hash:
logger.info(f"[{source.name}] 콘텐츠 해시 동일 — 파싱 skip") logger.info(f"[{source.name}] 콘텐츠 해시 동일 — 파싱 skip")
return 0, "not_modified" return 0, "not_modified"
source.feed_content_hash = content_hash
feed = feedparser.parse(resp.text) feed = feedparser.parse(resp.text)
if feed.bozo and not feed.entries: if feed.bozo and not feed.entries:
raise FeedError(f"RSS 파싱 실패: {feed.bozo_exception}") raise FeedError(f"RSS 파싱 실패: {feed.bozo_exception}")
# A-1: 워터마크 영속은 파싱 검증 통과 후에만 — 부패(bozo) 응답의 ETag 를 저장하면
# 이후 304 로 영구 skip 되는 silent corruption 차단
if new_etag:
source.etag = new_etag
if new_last_modified:
source.last_modified = new_last_modified
source.feed_content_hash = content_hash
count = 0 count = 0
for entry in feed.entries: for entry in feed.entries: