fix(publish): backfill 스크립트 after_id 페이징 루프 (overflow 누락 방지)
backfill_publish_* 가 단일 호출(after_id=0, limit=PAGE)이라 PAGE 초과분이 누락(경고만)됐다. docstring 은 이미 페이지 반복을 명시했으나 스크립트가 미구현. 함수 반환을 (count, last_id)로 바꾸고 3 스크립트를 last_id 기반 while 루프로 전량 처리. PAGE=5000 bounded tx. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -68,10 +68,10 @@ async def enqueue_question_publish(session: AsyncSession, q: Any) -> None:
|
||||
await enqueue_publish(session, kind=KIND_EXPLANATION, source_id=q.id, payload=expl)
|
||||
|
||||
|
||||
async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int:
|
||||
async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> tuple[int, int]:
|
||||
"""active(미삭제) 문항을 id>after_id 부터 bounded 로 outbox 적재.
|
||||
|
||||
반환 = enqueue 한 문항 수(0 이면 끝). 큰 셋은 마지막 id 로 페이지 반복. caller commit.
|
||||
반환 = (enqueue 수, 마지막 처리 id). caller 는 수==limit 면 last_id 로 다음 페이지. caller commit.
|
||||
"""
|
||||
rows = (
|
||||
await session.execute(
|
||||
@@ -83,7 +83,7 @@ async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0
|
||||
).scalars().all()
|
||||
for q in rows:
|
||||
await enqueue_question_publish(session, q)
|
||||
return len(rows)
|
||||
return len(rows), (rows[-1].id if rows else after_id)
|
||||
|
||||
|
||||
async def enqueue_topic_publish(session: AsyncSession, topic: Any) -> None:
|
||||
@@ -91,10 +91,10 @@ async def enqueue_topic_publish(session: AsyncSession, topic: Any) -> None:
|
||||
await enqueue_publish(session, kind=KIND_TOPIC, source_id=topic.id, payload=project_topic(topic))
|
||||
|
||||
|
||||
async def backfill_publish_topics(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int:
|
||||
async def backfill_publish_topics(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> tuple[int, int]:
|
||||
"""active(미삭제) 주제를 id>after_id 부터 bounded 로 outbox 적재(S-1 초기 백필).
|
||||
|
||||
반환 = enqueue 한 주제 수(0 이면 끝). 큰 셋은 마지막 id 로 페이지 반복. caller commit.
|
||||
반환 = (enqueue 수, 마지막 처리 id). caller 는 수==limit 면 last_id 로 다음 페이지. caller commit.
|
||||
멱등 = 발행 워커의 (payload_hash, deleted) 디둡이 no-op 재투영 흡수(중복 enqueue 무해).
|
||||
"""
|
||||
rows = (
|
||||
@@ -107,7 +107,7 @@ async def backfill_publish_topics(session: AsyncSession, *, after_id: int = 0, l
|
||||
).scalars().all()
|
||||
for t in rows:
|
||||
await enqueue_topic_publish(session, t)
|
||||
return len(rows)
|
||||
return len(rows), (rows[-1].id if rows else after_id)
|
||||
|
||||
|
||||
async def enqueue_card_publish(session: AsyncSession, card: Any) -> None:
|
||||
@@ -123,10 +123,10 @@ async def enqueue_card_publish(session: AsyncSession, card: Any) -> None:
|
||||
await enqueue_publish(session, kind=KIND_CARD, source_id=card.id, payload=project_card(card))
|
||||
|
||||
|
||||
async def backfill_publish_cards(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int:
|
||||
async def backfill_publish_cards(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> tuple[int, int]:
|
||||
"""검수완료(needs_review=False)·미삭제 카드를 id>after_id 부터 bounded 로 outbox 적재(S-2 초기 백필).
|
||||
|
||||
반환 = enqueue 한 카드 수(0 이면 끝). 멱등 = 워커 (payload_hash, deleted) 디둡. caller commit.
|
||||
반환 = (enqueue 수, 마지막 처리 id). caller 는 수==limit 면 last_id 로 다음 페이지. 멱등 = 워커 디둡. caller commit.
|
||||
"""
|
||||
rows = (
|
||||
await session.execute(
|
||||
@@ -142,7 +142,7 @@ async def backfill_publish_cards(session: AsyncSession, *, after_id: int = 0, li
|
||||
).scalars().all()
|
||||
for c in rows:
|
||||
await enqueue_card_publish(session, c)
|
||||
return len(rows)
|
||||
return len(rows), (rows[-1].id if rows else after_id)
|
||||
|
||||
|
||||
async def enqueue_card_progress_publish(session: AsyncSession, progress: Any) -> None:
|
||||
@@ -155,11 +155,11 @@ async def enqueue_card_progress_publish(session: AsyncSession, progress: Any) ->
|
||||
)
|
||||
|
||||
|
||||
async def backfill_publish_card_progress(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int:
|
||||
async def backfill_publish_card_progress(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> tuple[int, int]:
|
||||
"""모든 card progress row 를 id>after_id 부터 bounded 로 outbox 적재(S-4 초기 백필).
|
||||
|
||||
★필터 없음 = ALL row(due_at NULL sentinel·terminal 포함) — due-only 백필은 sentinel 누락.
|
||||
반환 = enqueue 한 row 수(0 이면 끝). 멱등 = 워커 디둡. caller commit.
|
||||
반환 = (enqueue 수, 마지막 처리 id). caller 는 수==limit 면 last_id 로 다음 페이지. 멱등 = 워커 디둡. caller commit.
|
||||
"""
|
||||
rows = (
|
||||
await session.execute(
|
||||
@@ -171,4 +171,4 @@ async def backfill_publish_card_progress(session: AsyncSession, *, after_id: int
|
||||
).scalars().all()
|
||||
for p in rows:
|
||||
await enqueue_card_progress_publish(session, p)
|
||||
return len(rows)
|
||||
return len(rows), (rows[-1].id if rows else after_id)
|
||||
|
||||
@@ -31,8 +31,8 @@ from core.database import async_session
|
||||
from models.study_memo_card_progress import StudyMemoCardProgress
|
||||
from services.study.publish_enqueue import backfill_publish_card_progress
|
||||
|
||||
# 개인 학습툴 progress row 대비 넉넉. 도달 시 가드 경보.
|
||||
PAGE = 100000
|
||||
# 페이지 배치 크기 — after_id 루프로 전량 처리(bounded tx).
|
||||
PAGE = 5000
|
||||
|
||||
|
||||
async def run(dry_run: bool) -> None:
|
||||
@@ -50,13 +50,17 @@ async def run(dry_run: bool) -> None:
|
||||
print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.")
|
||||
return
|
||||
|
||||
async with async_session() as session:
|
||||
n = await backfill_publish_card_progress(session, after_id=0, limit=PAGE)
|
||||
await session.commit()
|
||||
total = 0
|
||||
after = 0
|
||||
while True:
|
||||
async with async_session() as session:
|
||||
n, after = await backfill_publish_card_progress(session, after_id=after, limit=PAGE)
|
||||
await session.commit()
|
||||
total += n
|
||||
if n < PAGE:
|
||||
break
|
||||
|
||||
print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
|
||||
if n >= PAGE:
|
||||
print(f"[warn] PAGE({PAGE}) 도달 — progress 가 더 있을 수 있음. after_id 페이징 추가 필요.")
|
||||
print(f"\n[ok] outbox 적재 {total}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
|
||||
@@ -31,8 +31,8 @@ from core.database import async_session
|
||||
from models.study_memo_card import StudyMemoCard
|
||||
from services.study.publish_enqueue import backfill_publish_cards
|
||||
|
||||
# 개인 학습툴 카드 수 대비 넉넉(단일 outbox 적재 tx, 워커는 BATCH_SIZE 로 drain). 도달 시 가드 경보.
|
||||
PAGE = 100000
|
||||
# 페이지 배치 크기 — after_id 루프로 전량 처리(bounded tx). 워커는 BATCH_SIZE 로 drain.
|
||||
PAGE = 5000
|
||||
|
||||
|
||||
async def run(dry_run: bool) -> None:
|
||||
@@ -55,13 +55,17 @@ async def run(dry_run: bool) -> None:
|
||||
print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.")
|
||||
return
|
||||
|
||||
async with async_session() as session:
|
||||
n = await backfill_publish_cards(session, after_id=0, limit=PAGE)
|
||||
await session.commit()
|
||||
total = 0
|
||||
after = 0
|
||||
while True:
|
||||
async with async_session() as session:
|
||||
n, after = await backfill_publish_cards(session, after_id=after, limit=PAGE)
|
||||
await session.commit()
|
||||
total += n
|
||||
if n < PAGE:
|
||||
break
|
||||
|
||||
print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
|
||||
if n >= PAGE:
|
||||
print(f"[warn] PAGE({PAGE}) 도달 — 카드가 더 있을 수 있음. after_id 페이징 추가 필요.")
|
||||
print(f"\n[ok] outbox 적재 {total}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
|
||||
@@ -37,7 +37,7 @@ from core.database import async_session
|
||||
from models.study_topic import StudyTopic
|
||||
from services.study.publish_enqueue import backfill_publish_topics
|
||||
|
||||
# 개인 학습툴 주제 수 대비 넉넉. 도달 시 overflow 가드가 경보.
|
||||
# 페이지 배치 크기 — after_id 루프로 전량 처리(bounded tx).
|
||||
PAGE = 5000
|
||||
|
||||
|
||||
@@ -58,13 +58,17 @@ async def run(dry_run: bool) -> None:
|
||||
print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.")
|
||||
return
|
||||
|
||||
async with async_session() as session:
|
||||
n = await backfill_publish_topics(session, after_id=0, limit=PAGE)
|
||||
await session.commit()
|
||||
total = 0
|
||||
after = 0
|
||||
while True:
|
||||
async with async_session() as session:
|
||||
n, after = await backfill_publish_topics(session, after_id=after, limit=PAGE)
|
||||
await session.commit()
|
||||
total += n
|
||||
if n < PAGE:
|
||||
break
|
||||
|
||||
print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
|
||||
if n >= PAGE:
|
||||
print(f"[warn] PAGE({PAGE}) 도달 — 주제가 더 있을 수 있음. after_id 페이징 추가 필요.")
|
||||
print(f"\n[ok] outbox 적재 {total}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
|
||||
|
||||
|
||||
def main() -> None:
|
||||
|
||||
Reference in New Issue
Block a user