fix(collector): 수집기 견고화 — 한 건 실패가 전체 사이클을 죽이던 것 차단

C2 csb_collector: 주간 run 의 per-URL 루프에 try/except/continue — URL 1건 실패(page-extract
예외·DB DataError)가 run() 밖으로 전파돼 이후 URL 전부 스킵+watermark 정지하던 것 차단. 각
iteration 자체 session 이라 실패 격리.
H3 news_collector: 공유 세션+종단 단일 commit → 한 소스 DB오류가 오염시켜 전 소스 insert 소실하던
구조를 소스별 독립 세션으로(csb 패턴 동형). 실패 시 rollback 후 깨끗한 상태에서 failure 기록.
실증: 수동 수집서 Taipei Times ReadTimeout 격리하고 327건 정상 완주.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-20 05:42:12 +00:00
parent 35af85c7f2
commit 12ac18eb70
2 changed files with 36 additions and 18 deletions
+11 -5
View File
@@ -374,11 +374,17 @@ async def run(bulk: bool = False, limit: int = 0) -> None:
totals = {"page": 0, "pdf": 0, "skip": 0}
for i, (url, lastmod) in enumerate(todo, 1):
async with async_session() as session:
src = await session.get(NewsSource, source_id)
counts = await _ingest_url(session, src, url, lastmod)
_set_watermark(src, lastmod)
await session.commit()
# 2026-06-20 C2: URL 1건 실패가 주간 run 전체를 중단(이후 URL 스킵·watermark 정지)하던 것 차단.
# 각 iteration 은 자체 session(async with) 이라 실패 격리 — 건너뛰고 계속.
try:
async with async_session() as session:
src = await session.get(NewsSource, source_id)
counts = await _ingest_url(session, src, url, lastmod)
_set_watermark(src, lastmod)
await session.commit()
except Exception as e:
logger.error(f"[csb] URL 처리 실패 (건너뜀): {url}{str(e) or repr(e)}")
continue
for k in totals:
totals[k] += counts[k]
if i % 10 == 0:
+25 -13
View File
@@ -213,17 +213,25 @@ async def _run_locked():
result = await session.execute(
select(NewsSource).where(NewsSource.enabled == True)
)
sources = result.scalars().all()
source_ids = [s.id for s in result.scalars().all()]
if not sources:
logger.info("활성화된 뉴스 소스 없음")
return
if not source_ids:
logger.info("활성화된 뉴스 소스 없음")
return
total = 0
for source in sources:
health = await _get_or_create_health(session, source.id)
# 2026-06-20 H3: 소스마다 독립 세션 — 한 소스의 DB 오류가 종단 단일 commit 을 깨뜨려
# 전 소스 insert 를 잃던 것 차단. 실패 시 rollback 후 깨끗한 상태에서 failure 기록.
# (csb_collector 의 per-iteration 세션 패턴과 동형.)
total = 0
for sid in source_ids:
async with async_session() as session:
source = await session.get(NewsSource, sid)
if source is None:
continue
sname = source.name
health = await _get_or_create_health(session, sid)
if not _should_attempt(health, now):
logger.info(f"[{source.name}] circuit {health.circuit_state} — 이번 사이클 skip")
logger.info(f"[{sname}] circuit {health.circuit_state} — 이번 사이클 skip")
continue
try:
if source.feed_type == "api":
@@ -234,14 +242,18 @@ async def _run_locked():
source.last_fetched_at = datetime.now(timezone.utc)
_record_success(health, count, status == "not_modified", now)
total += count
await session.commit()
except Exception as e:
# str 이 빈 예외(httpx.ConnectError('')) 대비 — health 기록과 동일 규칙
logger.error(f"[{source.name}] 수집 실패: {str(e) or repr(e)}")
source.last_fetched_at = datetime.now(timezone.utc)
await session.rollback()
logger.error(f"[{sname}] 수집 실패: {str(e) or repr(e)}")
health = await _get_or_create_health(session, sid)
src = await session.get(NewsSource, sid)
if src is not None:
src.last_fetched_at = datetime.now(timezone.utc)
_record_failure(health, str(e) or repr(e), now)
await session.commit()
logger.info(f"뉴스 수집 완료: {total}건 신규")
await session.commit()
logger.info(f"뉴스 수집 완료: {total}건 신규")
MAX_RESPONSE_SIZE = 5 * 1024 * 1024 # 5MB