diff --git a/app/services/study/publish_enqueue.py b/app/services/study/publish_enqueue.py index 40698fe..9a73c3a 100644 --- a/app/services/study/publish_enqueue.py +++ b/app/services/study/publish_enqueue.py @@ -68,10 +68,10 @@ async def enqueue_question_publish(session: AsyncSession, q: Any) -> None: await enqueue_publish(session, kind=KIND_EXPLANATION, source_id=q.id, payload=expl) -async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int: +async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> tuple[int, int]: """active(미삭제) 문항을 id>after_id 부터 bounded 로 outbox 적재. - 반환 = enqueue 한 문항 수(0 이면 끝). 큰 셋은 마지막 id 로 페이지 반복. caller commit. + 반환 = (enqueue 수, 마지막 처리 id). caller 는 수==limit 면 last_id 로 다음 페이지. caller commit. """ rows = ( await session.execute( @@ -83,7 +83,7 @@ async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0 ).scalars().all() for q in rows: await enqueue_question_publish(session, q) - return len(rows) + return len(rows), (rows[-1].id if rows else after_id) async def enqueue_topic_publish(session: AsyncSession, topic: Any) -> None: @@ -91,10 +91,10 @@ async def enqueue_topic_publish(session: AsyncSession, topic: Any) -> None: await enqueue_publish(session, kind=KIND_TOPIC, source_id=topic.id, payload=project_topic(topic)) -async def backfill_publish_topics(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int: +async def backfill_publish_topics(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> tuple[int, int]: """active(미삭제) 주제를 id>after_id 부터 bounded 로 outbox 적재(S-1 초기 백필). - 반환 = enqueue 한 주제 수(0 이면 끝). 큰 셋은 마지막 id 로 페이지 반복. caller commit. + 반환 = (enqueue 수, 마지막 처리 id). caller 는 수==limit 면 last_id 로 다음 페이지. caller commit. 멱등 = 발행 워커의 (payload_hash, deleted) 디둡이 no-op 재투영 흡수(중복 enqueue 무해). """ rows = ( @@ -107,7 +107,7 @@ async def backfill_publish_topics(session: AsyncSession, *, after_id: int = 0, l ).scalars().all() for t in rows: await enqueue_topic_publish(session, t) - return len(rows) + return len(rows), (rows[-1].id if rows else after_id) async def enqueue_card_publish(session: AsyncSession, card: Any) -> None: @@ -123,10 +123,10 @@ async def enqueue_card_publish(session: AsyncSession, card: Any) -> None: await enqueue_publish(session, kind=KIND_CARD, source_id=card.id, payload=project_card(card)) -async def backfill_publish_cards(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int: +async def backfill_publish_cards(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> tuple[int, int]: """검수완료(needs_review=False)·미삭제 카드를 id>after_id 부터 bounded 로 outbox 적재(S-2 초기 백필). - 반환 = enqueue 한 카드 수(0 이면 끝). 멱등 = 워커 (payload_hash, deleted) 디둡. caller commit. + 반환 = (enqueue 수, 마지막 처리 id). caller 는 수==limit 면 last_id 로 다음 페이지. 멱등 = 워커 디둡. caller commit. """ rows = ( await session.execute( @@ -142,7 +142,7 @@ async def backfill_publish_cards(session: AsyncSession, *, after_id: int = 0, li ).scalars().all() for c in rows: await enqueue_card_publish(session, c) - return len(rows) + return len(rows), (rows[-1].id if rows else after_id) async def enqueue_card_progress_publish(session: AsyncSession, progress: Any) -> None: @@ -155,11 +155,11 @@ async def enqueue_card_progress_publish(session: AsyncSession, progress: Any) -> ) -async def backfill_publish_card_progress(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int: +async def backfill_publish_card_progress(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> tuple[int, int]: """모든 card progress row 를 id>after_id 부터 bounded 로 outbox 적재(S-4 초기 백필). ★필터 없음 = ALL row(due_at NULL sentinel·terminal 포함) — due-only 백필은 sentinel 누락. - 반환 = enqueue 한 row 수(0 이면 끝). 멱등 = 워커 디둡. caller commit. + 반환 = (enqueue 수, 마지막 처리 id). caller 는 수==limit 면 last_id 로 다음 페이지. 멱등 = 워커 디둡. caller commit. """ rows = ( await session.execute( @@ -171,4 +171,4 @@ async def backfill_publish_card_progress(session: AsyncSession, *, after_id: int ).scalars().all() for p in rows: await enqueue_card_progress_publish(session, p) - return len(rows) + return len(rows), (rows[-1].id if rows else after_id) diff --git a/scripts/backfill_publish_card_progress.py b/scripts/backfill_publish_card_progress.py index ab80e83..a86c91e 100644 --- a/scripts/backfill_publish_card_progress.py +++ b/scripts/backfill_publish_card_progress.py @@ -31,8 +31,8 @@ from core.database import async_session from models.study_memo_card_progress import StudyMemoCardProgress from services.study.publish_enqueue import backfill_publish_card_progress -# 개인 학습툴 progress row 대비 넉넉. 도달 시 가드 경보. -PAGE = 100000 +# 페이지 배치 크기 — after_id 루프로 전량 처리(bounded tx). +PAGE = 5000 async def run(dry_run: bool) -> None: @@ -50,13 +50,17 @@ async def run(dry_run: bool) -> None: print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.") return - async with async_session() as session: - n = await backfill_publish_card_progress(session, after_id=0, limit=PAGE) - await session.commit() + total = 0 + after = 0 + while True: + async with async_session() as session: + n, after = await backfill_publish_card_progress(session, after_id=after, limit=PAGE) + await session.commit() + total += n + if n < PAGE: + break - print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.") - if n >= PAGE: - print(f"[warn] PAGE({PAGE}) 도달 — progress 가 더 있을 수 있음. after_id 페이징 추가 필요.") + print(f"\n[ok] outbox 적재 {total}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.") def main() -> None: diff --git a/scripts/backfill_publish_cards.py b/scripts/backfill_publish_cards.py index 4e27833..3cffa6b 100644 --- a/scripts/backfill_publish_cards.py +++ b/scripts/backfill_publish_cards.py @@ -31,8 +31,8 @@ from core.database import async_session from models.study_memo_card import StudyMemoCard from services.study.publish_enqueue import backfill_publish_cards -# 개인 학습툴 카드 수 대비 넉넉(단일 outbox 적재 tx, 워커는 BATCH_SIZE 로 drain). 도달 시 가드 경보. -PAGE = 100000 +# 페이지 배치 크기 — after_id 루프로 전량 처리(bounded tx). 워커는 BATCH_SIZE 로 drain. +PAGE = 5000 async def run(dry_run: bool) -> None: @@ -55,13 +55,17 @@ async def run(dry_run: bool) -> None: print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.") return - async with async_session() as session: - n = await backfill_publish_cards(session, after_id=0, limit=PAGE) - await session.commit() + total = 0 + after = 0 + while True: + async with async_session() as session: + n, after = await backfill_publish_cards(session, after_id=after, limit=PAGE) + await session.commit() + total += n + if n < PAGE: + break - print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.") - if n >= PAGE: - print(f"[warn] PAGE({PAGE}) 도달 — 카드가 더 있을 수 있음. after_id 페이징 추가 필요.") + print(f"\n[ok] outbox 적재 {total}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.") def main() -> None: diff --git a/scripts/backfill_publish_topics.py b/scripts/backfill_publish_topics.py index 7f299b2..d4724fc 100644 --- a/scripts/backfill_publish_topics.py +++ b/scripts/backfill_publish_topics.py @@ -37,7 +37,7 @@ from core.database import async_session from models.study_topic import StudyTopic from services.study.publish_enqueue import backfill_publish_topics -# 개인 학습툴 주제 수 대비 넉넉. 도달 시 overflow 가드가 경보. +# 페이지 배치 크기 — after_id 루프로 전량 처리(bounded tx). PAGE = 5000 @@ -58,13 +58,17 @@ async def run(dry_run: bool) -> None: print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.") return - async with async_session() as session: - n = await backfill_publish_topics(session, after_id=0, limit=PAGE) - await session.commit() + total = 0 + after = 0 + while True: + async with async_session() as session: + n, after = await backfill_publish_topics(session, after_id=after, limit=PAGE) + await session.commit() + total += n + if n < PAGE: + break - print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.") - if n >= PAGE: - print(f"[warn] PAGE({PAGE}) 도달 — 주제가 더 있을 수 있음. after_id 페이징 추가 필요.") + print(f"\n[ok] outbox 적재 {total}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.") def main() -> None: