diff --git a/app/workers/arxiv_collector.py b/app/workers/arxiv_collector.py index 386733f..65f9327 100644 --- a/app/workers/arxiv_collector.py +++ b/app/workers/arxiv_collector.py @@ -303,10 +303,12 @@ async def run(bulk: bool = False, limit: int = 0) -> None: src = await session.get(NewsSource, source_id) watermark = _watermark(src, category) newest_seen: datetime | None = None + capped = False # 이번 run 이 cap 으로 카테고리 중도 절단됐는지 (R4) max_pages = (10**6 if bulk else _MAX_PAGES_PER_CAT) try: for page in range(max_pages): if inserted >= run_cap: + capped = True break xml_text = await _fetch(client, query, page * _PAGE_SIZE) total, entries = parse_arxiv_feed(xml_text) @@ -329,12 +331,18 @@ async def run(bulk: bool = False, limit: int = 0) -> None: else: await session.rollback() if inserted >= run_cap: + capped = True break await asyncio.sleep(_REQ_SLEEP) if stop or (page + 1) * _PAGE_SIZE >= total: break - # 카테고리 워터마크 전진(이번 run 최신 발행일) - if newest_seen: + # 카테고리 워터마크 전진 — cap 으로 절단된 run 은 미전진 (R4). + # 절단 시 newest_seen 으로 전진하면 [oldest-ingested, 옛 watermark] 사이 + # 미적재 항목이 다음 run 의 watermark 필터(entry.published <= watermark)에 + # 영구 배제(silent data loss). 미전진하면 다음 run 이 최신부터 재스캔하며 + # 적재분은 dedup-skip(_ingest_entry False, cap 미소모)하고 gap 까지 내려가 + # 이어 적재 → 백로그가 run 당 cap 씩 소화(livelock 회피). bulk 은 cap 무관. + if newest_seen and not capped: async with async_session() as session: src = await session.get(NewsSource, source_id) _set_watermark(src, category, newest_seen) diff --git a/app/workers/openalex_collector.py b/app/workers/openalex_collector.py index a52c867..b31d5e4 100644 --- a/app/workers/openalex_collector.py +++ b/app/workers/openalex_collector.py @@ -331,11 +331,13 @@ async def run(bulk: bool = False, limit: int = 0) -> None: filter_str = (build_issn_filter(wm_key, watermark) if kind == "issn" else build_filter(wm_key, watermark)) newest: str | None = None + capped = False # 이번 run 이 cap 으로 시드 중도 절단됐는지 (R4) cursor = "*" max_pages = (10**6 if bulk else _MAX_PAGES_PER_KW) try: for _page in range(max_pages): if inserted >= run_cap: + capped = True break text = await _fetch(client, key, filter_str, cursor) _count, next_cursor, works = parse_openalex_works(text) @@ -353,12 +355,17 @@ async def run(bulk: bool = False, limit: int = 0) -> None: else: await session.rollback() if inserted >= run_cap: + capped = True break await asyncio.sleep(_REQ_SLEEP) if not next_cursor: break cursor = next_cursor - if newest: + # cap 절단 시 워터마크 미전진 — 미페치 works 가 다음 run 의 watermark 필터 + # (publication_date > watermark)에 영구 배제되는 silent loss 방지. 미전진하면 + # 다음 run 이 옛 watermark 부터 재페치하며 적재분 dedup-skip(cap 미소모) 후 + # 이어 적재 → 백로그 run 당 cap 소화 (R4). bulk 은 cap 무관. + if newest and not capped: async with async_session() as session: src = await session.get(NewsSource, source_id) _set_watermark(src, wm_key, newest)