From 2ad32c5c84ac900bb6c7f9659299c09a6f5b1e64 Mon Sep 17 00:00:00 2001 From: hyungi Date: Tue, 16 Jun 2026 13:28:04 +0900 Subject: [PATCH] =?UTF-8?q?fix(collectors):=20=EC=9B=8C=ED=84=B0=EB=A7=88?= =?UTF-8?q?=ED=81=AC=20cap=20=EC=A0=88=EB=8B=A8=20=EC=8B=9C=20=EB=AF=B8?= =?UTF-8?q?=EC=A0=84=EC=A7=84=20=E2=80=94=20silent=20backlog=20loss=20?= =?UTF-8?q?=EC=B0=A8=EB=8B=A8=20(R4)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit arxiv/openalex 수집기가 run_cap 도달로 카테고리/시드 중도 절단돼도 워터마크를 newest 로 전진시켜, [oldest-ingested, 옛 watermark] 사이 미적재 항목이 다음 run 의 watermark 필터에 영구 배제되던 silent data loss 수정. capped 플래그: cap 으로 루프 절단 시 set → 워터마크 미전진. 미전진하면 다음 run 이 최신부터 재스캔하며 적재분은 dedup-skip(cap 미소모)하고 gap 까지 내려가 이어 적재 → 백로그 run 당 cap 소화(livelock 회피). 정상 완주(watermark 도달/cursor 소진) 시에만 전진. bulk(CLI)은 cap 무관. docstring 의 '다음 run 이월' 약속을 실제 동작과 일치. 검증: py_compile 통과. kosha 부분실패 per-case commit 은 R4 후속. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/workers/arxiv_collector.py | 12 ++++++++++-- app/workers/openalex_collector.py | 9 ++++++++- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/app/workers/arxiv_collector.py b/app/workers/arxiv_collector.py index 386733f..65f9327 100644 --- a/app/workers/arxiv_collector.py +++ b/app/workers/arxiv_collector.py @@ -303,10 +303,12 @@ async def run(bulk: bool = False, limit: int = 0) -> None: src = await session.get(NewsSource, source_id) watermark = _watermark(src, category) newest_seen: datetime | None = None + capped = False # 이번 run 이 cap 으로 카테고리 중도 절단됐는지 (R4) max_pages = (10**6 if bulk else _MAX_PAGES_PER_CAT) try: for page in range(max_pages): if inserted >= run_cap: + capped = True break xml_text = await _fetch(client, query, page * _PAGE_SIZE) total, entries = parse_arxiv_feed(xml_text) @@ -329,12 +331,18 @@ async def run(bulk: bool = False, limit: int = 0) -> None: else: await session.rollback() if inserted >= run_cap: + capped = True break await asyncio.sleep(_REQ_SLEEP) if stop or (page + 1) * _PAGE_SIZE >= total: break - # 카테고리 워터마크 전진(이번 run 최신 발행일) - if newest_seen: + # 카테고리 워터마크 전진 — cap 으로 절단된 run 은 미전진 (R4). + # 절단 시 newest_seen 으로 전진하면 [oldest-ingested, 옛 watermark] 사이 + # 미적재 항목이 다음 run 의 watermark 필터(entry.published <= watermark)에 + # 영구 배제(silent data loss). 미전진하면 다음 run 이 최신부터 재스캔하며 + # 적재분은 dedup-skip(_ingest_entry False, cap 미소모)하고 gap 까지 내려가 + # 이어 적재 → 백로그가 run 당 cap 씩 소화(livelock 회피). bulk 은 cap 무관. + if newest_seen and not capped: async with async_session() as session: src = await session.get(NewsSource, source_id) _set_watermark(src, category, newest_seen) diff --git a/app/workers/openalex_collector.py b/app/workers/openalex_collector.py index a52c867..b31d5e4 100644 --- a/app/workers/openalex_collector.py +++ b/app/workers/openalex_collector.py @@ -331,11 +331,13 @@ async def run(bulk: bool = False, limit: int = 0) -> None: filter_str = (build_issn_filter(wm_key, watermark) if kind == "issn" else build_filter(wm_key, watermark)) newest: str | None = None + capped = False # 이번 run 이 cap 으로 시드 중도 절단됐는지 (R4) cursor = "*" max_pages = (10**6 if bulk else _MAX_PAGES_PER_KW) try: for _page in range(max_pages): if inserted >= run_cap: + capped = True break text = await _fetch(client, key, filter_str, cursor) _count, next_cursor, works = parse_openalex_works(text) @@ -353,12 +355,17 @@ async def run(bulk: bool = False, limit: int = 0) -> None: else: await session.rollback() if inserted >= run_cap: + capped = True break await asyncio.sleep(_REQ_SLEEP) if not next_cursor: break cursor = next_cursor - if newest: + # cap 절단 시 워터마크 미전진 — 미페치 works 가 다음 run 의 watermark 필터 + # (publication_date > watermark)에 영구 배제되는 silent loss 방지. 미전진하면 + # 다음 run 이 옛 watermark 부터 재페치하며 적재분 dedup-skip(cap 미소모) 후 + # 이어 적재 → 백로그 run 당 cap 소화 (R4). bulk 은 cap 무관. + if newest and not capped: async with async_session() as session: src = await session.get(NewsSource, source_id) _set_watermark(src, wm_key, newest)