From 832ea72784511ff6e8d9c3775ed8dded22129e41 Mon Sep 17 00:00:00 2001 From: hyungi Date: Mon, 29 Jun 2026 13:18:24 +0900 Subject: [PATCH] =?UTF-8?q?fix(publish):=20backfill=20=EC=8A=A4=ED=81=AC?= =?UTF-8?q?=EB=A6=BD=ED=8A=B8=20after=5Fid=20=ED=8E=98=EC=9D=B4=EC=A7=95?= =?UTF-8?q?=20=EB=A3=A8=ED=94=84=20(overflow=20=EB=88=84=EB=9D=BD=20?= =?UTF-8?q?=EB=B0=A9=EC=A7=80)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit backfill_publish_* 가 단일 호출(after_id=0, limit=PAGE)이라 PAGE 초과분이 누락(경고만)됐다. docstring 은 이미 페이지 반복을 명시했으나 스크립트가 미구현. 함수 반환을 (count, last_id)로 바꾸고 3 스크립트를 last_id 기반 while 루프로 전량 처리. PAGE=5000 bounded tx. Co-Authored-By: Claude Opus 4.8 (1M context) --- app/services/study/publish_enqueue.py | 24 +++++++++++------------ scripts/backfill_publish_card_progress.py | 20 +++++++++++-------- scripts/backfill_publish_cards.py | 20 +++++++++++-------- scripts/backfill_publish_topics.py | 18 ++++++++++------- 4 files changed, 47 insertions(+), 35 deletions(-) diff --git a/app/services/study/publish_enqueue.py b/app/services/study/publish_enqueue.py index 40698fe..9a73c3a 100644 --- a/app/services/study/publish_enqueue.py +++ b/app/services/study/publish_enqueue.py @@ -68,10 +68,10 @@ async def enqueue_question_publish(session: AsyncSession, q: Any) -> None: await enqueue_publish(session, kind=KIND_EXPLANATION, source_id=q.id, payload=expl) -async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int: +async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> tuple[int, int]: """active(미삭제) 문항을 id>after_id 부터 bounded 로 outbox 적재. - 반환 = enqueue 한 문항 수(0 이면 끝). 큰 셋은 마지막 id 로 페이지 반복. caller commit. + 반환 = (enqueue 수, 마지막 처리 id). caller 는 수==limit 면 last_id 로 다음 페이지. caller commit. """ rows = ( await session.execute( @@ -83,7 +83,7 @@ async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0 ).scalars().all() for q in rows: await enqueue_question_publish(session, q) - return len(rows) + return len(rows), (rows[-1].id if rows else after_id) async def enqueue_topic_publish(session: AsyncSession, topic: Any) -> None: @@ -91,10 +91,10 @@ async def enqueue_topic_publish(session: AsyncSession, topic: Any) -> None: await enqueue_publish(session, kind=KIND_TOPIC, source_id=topic.id, payload=project_topic(topic)) -async def backfill_publish_topics(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int: +async def backfill_publish_topics(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> tuple[int, int]: """active(미삭제) 주제를 id>after_id 부터 bounded 로 outbox 적재(S-1 초기 백필). - 반환 = enqueue 한 주제 수(0 이면 끝). 큰 셋은 마지막 id 로 페이지 반복. caller commit. + 반환 = (enqueue 수, 마지막 처리 id). caller 는 수==limit 면 last_id 로 다음 페이지. caller commit. 멱등 = 발행 워커의 (payload_hash, deleted) 디둡이 no-op 재투영 흡수(중복 enqueue 무해). """ rows = ( @@ -107,7 +107,7 @@ async def backfill_publish_topics(session: AsyncSession, *, after_id: int = 0, l ).scalars().all() for t in rows: await enqueue_topic_publish(session, t) - return len(rows) + return len(rows), (rows[-1].id if rows else after_id) async def enqueue_card_publish(session: AsyncSession, card: Any) -> None: @@ -123,10 +123,10 @@ async def enqueue_card_publish(session: AsyncSession, card: Any) -> None: await enqueue_publish(session, kind=KIND_CARD, source_id=card.id, payload=project_card(card)) -async def backfill_publish_cards(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int: +async def backfill_publish_cards(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> tuple[int, int]: """검수완료(needs_review=False)·미삭제 카드를 id>after_id 부터 bounded 로 outbox 적재(S-2 초기 백필). - 반환 = enqueue 한 카드 수(0 이면 끝). 멱등 = 워커 (payload_hash, deleted) 디둡. caller commit. + 반환 = (enqueue 수, 마지막 처리 id). caller 는 수==limit 면 last_id 로 다음 페이지. 멱등 = 워커 디둡. caller commit. """ rows = ( await session.execute( @@ -142,7 +142,7 @@ async def backfill_publish_cards(session: AsyncSession, *, after_id: int = 0, li ).scalars().all() for c in rows: await enqueue_card_publish(session, c) - return len(rows) + return len(rows), (rows[-1].id if rows else after_id) async def enqueue_card_progress_publish(session: AsyncSession, progress: Any) -> None: @@ -155,11 +155,11 @@ async def enqueue_card_progress_publish(session: AsyncSession, progress: Any) -> ) -async def backfill_publish_card_progress(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int: +async def backfill_publish_card_progress(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> tuple[int, int]: """모든 card progress row 를 id>after_id 부터 bounded 로 outbox 적재(S-4 초기 백필). ★필터 없음 = ALL row(due_at NULL sentinel·terminal 포함) — due-only 백필은 sentinel 누락. - 반환 = enqueue 한 row 수(0 이면 끝). 멱등 = 워커 디둡. caller commit. + 반환 = (enqueue 수, 마지막 처리 id). caller 는 수==limit 면 last_id 로 다음 페이지. 멱등 = 워커 디둡. caller commit. """ rows = ( await session.execute( @@ -171,4 +171,4 @@ async def backfill_publish_card_progress(session: AsyncSession, *, after_id: int ).scalars().all() for p in rows: await enqueue_card_progress_publish(session, p) - return len(rows) + return len(rows), (rows[-1].id if rows else after_id) diff --git a/scripts/backfill_publish_card_progress.py b/scripts/backfill_publish_card_progress.py index ab80e83..a86c91e 100644 --- a/scripts/backfill_publish_card_progress.py +++ b/scripts/backfill_publish_card_progress.py @@ -31,8 +31,8 @@ from core.database import async_session from models.study_memo_card_progress import StudyMemoCardProgress from services.study.publish_enqueue import backfill_publish_card_progress -# 개인 학습툴 progress row 대비 넉넉. 도달 시 가드 경보. -PAGE = 100000 +# 페이지 배치 크기 — after_id 루프로 전량 처리(bounded tx). +PAGE = 5000 async def run(dry_run: bool) -> None: @@ -50,13 +50,17 @@ async def run(dry_run: bool) -> None: print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.") return - async with async_session() as session: - n = await backfill_publish_card_progress(session, after_id=0, limit=PAGE) - await session.commit() + total = 0 + after = 0 + while True: + async with async_session() as session: + n, after = await backfill_publish_card_progress(session, after_id=after, limit=PAGE) + await session.commit() + total += n + if n < PAGE: + break - print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.") - if n >= PAGE: - print(f"[warn] PAGE({PAGE}) 도달 — progress 가 더 있을 수 있음. after_id 페이징 추가 필요.") + print(f"\n[ok] outbox 적재 {total}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.") def main() -> None: diff --git a/scripts/backfill_publish_cards.py b/scripts/backfill_publish_cards.py index 4e27833..3cffa6b 100644 --- a/scripts/backfill_publish_cards.py +++ b/scripts/backfill_publish_cards.py @@ -31,8 +31,8 @@ from core.database import async_session from models.study_memo_card import StudyMemoCard from services.study.publish_enqueue import backfill_publish_cards -# 개인 학습툴 카드 수 대비 넉넉(단일 outbox 적재 tx, 워커는 BATCH_SIZE 로 drain). 도달 시 가드 경보. -PAGE = 100000 +# 페이지 배치 크기 — after_id 루프로 전량 처리(bounded tx). 워커는 BATCH_SIZE 로 drain. +PAGE = 5000 async def run(dry_run: bool) -> None: @@ -55,13 +55,17 @@ async def run(dry_run: bool) -> None: print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.") return - async with async_session() as session: - n = await backfill_publish_cards(session, after_id=0, limit=PAGE) - await session.commit() + total = 0 + after = 0 + while True: + async with async_session() as session: + n, after = await backfill_publish_cards(session, after_id=after, limit=PAGE) + await session.commit() + total += n + if n < PAGE: + break - print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.") - if n >= PAGE: - print(f"[warn] PAGE({PAGE}) 도달 — 카드가 더 있을 수 있음. after_id 페이징 추가 필요.") + print(f"\n[ok] outbox 적재 {total}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.") def main() -> None: diff --git a/scripts/backfill_publish_topics.py b/scripts/backfill_publish_topics.py index 7f299b2..d4724fc 100644 --- a/scripts/backfill_publish_topics.py +++ b/scripts/backfill_publish_topics.py @@ -37,7 +37,7 @@ from core.database import async_session from models.study_topic import StudyTopic from services.study.publish_enqueue import backfill_publish_topics -# 개인 학습툴 주제 수 대비 넉넉. 도달 시 overflow 가드가 경보. +# 페이지 배치 크기 — after_id 루프로 전량 처리(bounded tx). PAGE = 5000 @@ -58,13 +58,17 @@ async def run(dry_run: bool) -> None: print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.") return - async with async_session() as session: - n = await backfill_publish_topics(session, after_id=0, limit=PAGE) - await session.commit() + total = 0 + after = 0 + while True: + async with async_session() as session: + n, after = await backfill_publish_topics(session, after_id=after, limit=PAGE) + await session.commit() + total += n + if n < PAGE: + break - print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.") - if n >= PAGE: - print(f"[warn] PAGE({PAGE}) 도달 — 주제가 더 있을 수 있음. after_id 페이징 추가 필요.") + print(f"\n[ok] outbox 적재 {total}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.") def main() -> None: