From dcf99b377ea4c23ce4f95241dbbdc4d30a606263 Mon Sep 17 00:00:00 2001 From: hyungi Date: Wed, 10 Jun 2026 13:34:46 +0900 Subject: [PATCH] =?UTF-8?q?fix(news):=20=EC=A0=81=EB=8C=80=20=EB=A6=AC?= =?UTF-8?q?=EB=B7=B0=20=EB=B0=98=EC=98=81=20=E2=80=94=20reconcile=20auto-c?= =?UTF-8?q?orrelation=C2=B7=EC=9B=8C=ED=84=B0=EB=A7=88=ED=81=AC=20?= =?UTF-8?q?=EA=B2=80=EC=A6=9D=20=ED=9B=84=20=EC=98=81=EC=86=8D=C2=B7?= =?UTF-8?q?=EC=88=98=EC=A7=91=20=EB=9D=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - fulltext_worker.reconcile_unresolved: EXISTS 서브쿼리 aliased(ProcessingQueue) — auto-correlation 이 FROM 전부 제거해 매 실행 InvalidRequestError (안전망 dead code). SQLAlchemy 2.0.50 컴파일 재현·수정 확인. - news_collector._fetch_rss: ETag/Last-Modified/content-hash 영속을 bozo 파싱 검증 뒤로 이동 — 부패 응답 워터마크 저장 시 영구 304-skip 차단. - news_collector.run: 모듈 락으로 수동 collect vs 6h 스케줄 동시 실행 차단 — _get_or_create_health 동시 INSERT 의 uq_source_health_source_id 위반이 사이클 전체를 죽이는 경합 봉쇄. Co-Authored-By: Claude Fable 5 --- app/workers/fulltext_worker.py | 10 +++++++--- app/workers/news_collector.py | 28 ++++++++++++++++++++++------ 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/app/workers/fulltext_worker.py b/app/workers/fulltext_worker.py index 1fbb505..c9d7ee3 100644 --- a/app/workers/fulltext_worker.py +++ b/app/workers/fulltext_worker.py @@ -24,6 +24,7 @@ from pathlib import Path from sqlalchemy import exists, select from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.orm import aliased from core.config import settings from core.crawl_politeness import CrawlBlocked, CrawlFetchError, CrawlSkip, fetch_page @@ -191,11 +192,14 @@ async def reconcile_unresolved() -> None: """안전망 (야간 1회): fulltext 영구 실패(3회 소진)로 summarize 가 영영 안 잡힌 뉴스 문서에 RSS 요약 기준 후속 단계를 enqueue. 멱등 — enqueue 후엔 조건 불일치.""" async with async_session() as session: + # 외부 쿼리 FROM 에 ProcessingQueue 가 이미 있어 alias 없이는 auto-correlation 이 + # 서브쿼리 FROM 을 전부 제거 → InvalidRequestError (queue_consumer.reset_stale_items 패턴) + pq = aliased(ProcessingQueue) summarize_q = ( - select(ProcessingQueue.id) + select(pq.id) .where( - ProcessingQueue.document_id == Document.id, - ProcessingQueue.stage == "summarize", + pq.document_id == Document.id, + pq.stage == "summarize", ) ) result = await session.execute( diff --git a/app/workers/news_collector.py b/app/workers/news_collector.py index c64cf95..cba44ae 100644 --- a/app/workers/news_collector.py +++ b/app/workers/news_collector.py @@ -7,6 +7,7 @@ plan crawl-24x7-1 A그룹 (2026-06-10): A-6 first-wins + 포털 전재 2차 dedup (제목+최근 3일, 12자 이상 제목 한정) """ +import asyncio import hashlib import re from datetime import datetime, timedelta, timezone @@ -185,8 +186,19 @@ async def _get_or_create_health(session, source_id: int) -> SourceHealth: return health +# 수동 POST /api/news/collect 와 6h 스케줄 사이클의 동시 실행 차단 (단일 프로세스·단일 +# 이벤트루프). 동시 진입 시 _get_or_create_health 가 같은 source_id 를 양쪽에서 INSERT +# → uq_source_health_source_id 위반 IntegrityError 로 사이클 전체가 죽는 경합의 원천 봉쇄. +_run_lock = asyncio.Lock() + + async def run(): """뉴스 수집 실행""" + async with _run_lock: + await _run_locked() + + +async def _run_locked(): now = datetime.now(timezone.utc) async with async_session() as session: result = await session.execute( @@ -337,22 +349,26 @@ async def _fetch_rss(session, source: NewsSource) -> tuple[int, str]: if not any(t in ct for t in ALLOWED_CONTENT_TYPES): raise FeedError(f"비정상 content-type: {ct}") - # A-1: 워터마크 갱신 + 콘텐츠 해시 변경감지 (CDN 의 ETag 회전 대비 병행) + # A-1: 콘텐츠 해시 변경감지 (CDN 의 ETag 회전 대비 병행) — 저장된 해시는 항상 + # 파싱 검증을 통과한 응답의 것이므로 동일성 비교는 파싱 전에 안전 new_etag = resp.headers.get("etag") new_last_modified = resp.headers.get("last-modified") - if new_etag: - source.etag = new_etag - if new_last_modified: - source.last_modified = new_last_modified content_hash = hashlib.sha256(resp.content).hexdigest() if source.feed_content_hash == content_hash: logger.info(f"[{source.name}] 콘텐츠 해시 동일 — 파싱 skip") return 0, "not_modified" - source.feed_content_hash = content_hash feed = feedparser.parse(resp.text) if feed.bozo and not feed.entries: raise FeedError(f"RSS 파싱 실패: {feed.bozo_exception}") + + # A-1: 워터마크 영속은 파싱 검증 통과 후에만 — 부패(bozo) 응답의 ETag 를 저장하면 + # 이후 304 로 영구 skip 되는 silent corruption 차단 + if new_etag: + source.etag = new_etag + if new_last_modified: + source.last_modified = new_last_modified + source.feed_content_hash = content_hash count = 0 for entry in feed.entries: