0a7402b327
study_memo_cards 추출 파이프라인 + 버전키 폴러 + needs_review 컬럼. 운영 SR 코드(session_finalize/quiz_selection) 무수정.
- migrations 287~298: study_memo_cards/_evidence/_jobs/_progress(P1 휴면)·study_reminders·study_topics.focused_at·study_questions needs_review 3컬럼. dedup PARTIAL UNIQUE(deleted_at IS NULL).
- 워커: in-process RAG gather → MLX {cards} → 카드 가드(정량=evidence 원문 등장·cue/cloze 누출·dedup) → supersede 구버전 retire → append. 별 consumer 로 기존 study_queue 격리.
- 폴러 study_card_enqueue: 버전키 NOT EXISTS(source_version) 멱등 + ai_explanation_generated_at NOT NULL 가드 + per-poll LIMIT(thundering-herd).
- 검증: 실 prod 스키마 덤프 위 12 마이그 적용 OK + dedup/supersede/active-unique 기능 7/7 PASS + 정규화 util 15/15.
plan: PKM plans/2026-06-05-study-memo-card-p1-plan.html
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
93 lines
3.7 KiB
Python
93 lines
3.7 KiB
Python
"""study_memo_card_jobs ORM — card_extract 비동기 작업 큐 (다형 소스).
|
|
|
|
231_study_question_jobs 복제 + source_kind/source_id/source_version(=ai_explanation_generated_at).
|
|
별도 테이블 + 별도 consumer(study_memo_card_jobs_consumer.py) 로 기존 study_queue_consumer 와 격리.
|
|
|
|
error_code 권장값:
|
|
- parse_fail / llm_timeout / unknown → 재시도 대상 (attempts < max_attempts)
|
|
- all_dropped → 0장 생성. completed 로 종결해 같은 버전 재추출 차단.
|
|
- no_ready_explanation → ai_explanation 미준비(race). skipped, 비재시도.
|
|
|
|
멱등 이중구조: active partial unique(migration 292)는 동시 active 1행만,
|
|
버전 멱등(같은 source_version 재추출 차단)은 폴러의 NOT EXISTS(source_version) 가 책임.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
from sqlalchemy import BigInteger, DateTime, ForeignKey, SmallInteger, String, Text, text
|
|
from sqlalchemy.dialects.postgresql import JSONB, insert as pg_insert
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy.orm import Mapped, mapped_column
|
|
|
|
from core.database import Base
|
|
|
|
|
|
class StudyMemoCardJob(Base):
|
|
__tablename__ = "study_memo_card_jobs"
|
|
|
|
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
|
|
user_id: Mapped[int] = mapped_column(
|
|
BigInteger, ForeignKey("users.id", ondelete="CASCADE"), nullable=False
|
|
)
|
|
|
|
source_kind: Mapped[str] = mapped_column(String(40), nullable=False)
|
|
source_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
|
|
source_version: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
|
|
|
|
kind: Mapped[str] = mapped_column(String(40), nullable=False)
|
|
status: Mapped[str] = mapped_column(String(20), nullable=False, default="pending")
|
|
attempts: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=0)
|
|
max_attempts: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=2)
|
|
error_code: Mapped[str | None] = mapped_column(String(40))
|
|
error_message: Mapped[str | None] = mapped_column(Text)
|
|
payload: Mapped[dict | None] = mapped_column(JSONB)
|
|
created_at: Mapped[datetime] = mapped_column(
|
|
DateTime(timezone=True), default=datetime.now, nullable=False
|
|
)
|
|
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
|
|
completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
|
|
|
|
# active partial unique idx (source_kind, source_id) WHERE active 는 migration 292.
|
|
|
|
|
|
async def enqueue_study_memo_card_job(
|
|
session: AsyncSession,
|
|
*,
|
|
user_id: int,
|
|
source_kind: str,
|
|
source_id: int,
|
|
source_version: datetime | None,
|
|
kind: str = "card_extract",
|
|
payload: dict[str, Any] | None = None,
|
|
) -> bool:
|
|
"""study_memo_card_jobs 에 행 추가 (DB 레벨 동시 active 중복 방어).
|
|
|
|
같은 (source_kind, source_id) 활성 행(pending/processing)이 있으면 False.
|
|
버전 멱등(같은 source_version 재추출 차단)은 호출 측 폴러의 NOT EXISTS 가 선판단.
|
|
|
|
Returns: True = 새 enqueue, False = active 중복으로 건너뜀.
|
|
"""
|
|
values: dict[str, Any] = {
|
|
"user_id": user_id,
|
|
"source_kind": source_kind,
|
|
"source_id": source_id,
|
|
"source_version": source_version,
|
|
"kind": kind,
|
|
"status": "pending",
|
|
}
|
|
if payload is not None:
|
|
values["payload"] = payload
|
|
stmt = (
|
|
pg_insert(StudyMemoCardJob)
|
|
.values(**values)
|
|
.on_conflict_do_nothing(
|
|
index_elements=["source_kind", "source_id"],
|
|
index_where=text("status IN ('pending', 'processing')"),
|
|
)
|
|
)
|
|
result = await session.execute(stmt)
|
|
return result.rowcount > 0
|