From 12ac18eb7061158268c2542130d2ab90712f129b Mon Sep 17 00:00:00 2001 From: hyungi Date: Sat, 20 Jun 2026 05:42:12 +0000 Subject: [PATCH] =?UTF-8?q?fix(collector):=20=EC=88=98=EC=A7=91=EA=B8=B0?= =?UTF-8?q?=20=EA=B2=AC=EA=B3=A0=ED=99=94=20=E2=80=94=20=ED=95=9C=20?= =?UTF-8?q?=EA=B1=B4=20=EC=8B=A4=ED=8C=A8=EA=B0=80=20=EC=A0=84=EC=B2=B4=20?= =?UTF-8?q?=EC=82=AC=EC=9D=B4=ED=81=B4=EC=9D=84=20=EC=A3=BD=EC=9D=B4?= =?UTF-8?q?=EB=8D=98=20=EA=B2=83=20=EC=B0=A8=EB=8B=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit C2 csb_collector: 주간 run 의 per-URL 루프에 try/except/continue — URL 1건 실패(page-extract 예외·DB DataError)가 run() 밖으로 전파돼 이후 URL 전부 스킵+watermark 정지하던 것 차단. 각 iteration 자체 session 이라 실패 격리. H3 news_collector: 공유 세션+종단 단일 commit → 한 소스 DB오류가 오염시켜 전 소스 insert 소실하던 구조를 소스별 독립 세션으로(csb 패턴 동형). 실패 시 rollback 후 깨끗한 상태에서 failure 기록. 실증: 수동 수집서 Taipei Times ReadTimeout 격리하고 327건 정상 완주. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/workers/csb_collector.py | 16 ++++++++++----- app/workers/news_collector.py | 38 +++++++++++++++++++++++------------ 2 files changed, 36 insertions(+), 18 deletions(-) diff --git a/app/workers/csb_collector.py b/app/workers/csb_collector.py index 14b3298..b1dd86f 100644 --- a/app/workers/csb_collector.py +++ b/app/workers/csb_collector.py @@ -374,11 +374,17 @@ async def run(bulk: bool = False, limit: int = 0) -> None: totals = {"page": 0, "pdf": 0, "skip": 0} for i, (url, lastmod) in enumerate(todo, 1): - async with async_session() as session: - src = await session.get(NewsSource, source_id) - counts = await _ingest_url(session, src, url, lastmod) - _set_watermark(src, lastmod) - await session.commit() + # 2026-06-20 C2: URL 1건 실패가 주간 run 전체를 중단(이후 URL 스킵·watermark 정지)하던 것 차단. + # 각 iteration 은 자체 session(async with) 이라 실패 격리 — 건너뛰고 계속. + try: + async with async_session() as session: + src = await session.get(NewsSource, source_id) + counts = await _ingest_url(session, src, url, lastmod) + _set_watermark(src, lastmod) + await session.commit() + except Exception as e: + logger.error(f"[csb] URL 처리 실패 (건너뜀): {url} — {str(e) or repr(e)}") + continue for k in totals: totals[k] += counts[k] if i % 10 == 0: diff --git a/app/workers/news_collector.py b/app/workers/news_collector.py index 3a4a6df..a2f06e4 100644 --- a/app/workers/news_collector.py +++ b/app/workers/news_collector.py @@ -213,17 +213,25 @@ async def _run_locked(): result = await session.execute( select(NewsSource).where(NewsSource.enabled == True) ) - sources = result.scalars().all() + source_ids = [s.id for s in result.scalars().all()] - if not sources: - logger.info("활성화된 뉴스 소스 없음") - return + if not source_ids: + logger.info("활성화된 뉴스 소스 없음") + return - total = 0 - for source in sources: - health = await _get_or_create_health(session, source.id) + # 2026-06-20 H3: 소스마다 독립 세션 — 한 소스의 DB 오류가 종단 단일 commit 을 깨뜨려 + # 전 소스 insert 를 잃던 것 차단. 실패 시 rollback 후 깨끗한 상태에서 failure 기록. + # (csb_collector 의 per-iteration 세션 패턴과 동형.) + total = 0 + for sid in source_ids: + async with async_session() as session: + source = await session.get(NewsSource, sid) + if source is None: + continue + sname = source.name + health = await _get_or_create_health(session, sid) if not _should_attempt(health, now): - logger.info(f"[{source.name}] circuit {health.circuit_state} — 이번 사이클 skip") + logger.info(f"[{sname}] circuit {health.circuit_state} — 이번 사이클 skip") continue try: if source.feed_type == "api": @@ -234,14 +242,18 @@ async def _run_locked(): source.last_fetched_at = datetime.now(timezone.utc) _record_success(health, count, status == "not_modified", now) total += count + await session.commit() except Exception as e: # str 이 빈 예외(httpx.ConnectError('')) 대비 — health 기록과 동일 규칙 - logger.error(f"[{source.name}] 수집 실패: {str(e) or repr(e)}") - source.last_fetched_at = datetime.now(timezone.utc) + await session.rollback() + logger.error(f"[{sname}] 수집 실패: {str(e) or repr(e)}") + health = await _get_or_create_health(session, sid) + src = await session.get(NewsSource, sid) + if src is not None: + src.last_fetched_at = datetime.now(timezone.utc) _record_failure(health, str(e) or repr(e), now) - - await session.commit() - logger.info(f"뉴스 수집 완료: {total}건 신규") + await session.commit() + logger.info(f"뉴스 수집 완료: {total}건 신규") MAX_RESPONSE_SIZE = 5 * 1024 * 1024 # 5MB