feat(study): 공부 암기노트 Phase 1 — card_extract 추출 파이프라인 (순수 additive)

study_memo_cards 추출 파이프라인 + 버전키 폴러 + needs_review 컬럼. 운영 SR 코드(session_finalize/quiz_selection) 무수정.

- migrations 287~298: study_memo_cards/_evidence/_jobs/_progress(P1 휴면)·study_reminders·study_topics.focused_at·study_questions needs_review 3컬럼. dedup PARTIAL UNIQUE(deleted_at IS NULL).
- 워커: in-process RAG gather → MLX {cards} → 카드 가드(정량=evidence 원문 등장·cue/cloze 누출·dedup) → supersede 구버전 retire → append. 별 consumer 로 기존 study_queue 격리.
- 폴러 study_card_enqueue: 버전키 NOT EXISTS(source_version) 멱등 + ai_explanation_generated_at NOT NULL 가드 + per-poll LIMIT(thundering-herd).
- 검증: 실 prod 스키마 덤프 위 12 마이그 적용 OK + dedup/supersede/active-unique 기능 7/7 PASS + 정규화 util 15/15.

plan: PKM plans/2026-06-05-study-memo-card-p1-plan.html

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-06 21:33:12 +09:00
parent f512d94c74
commit 0a7402b327
23 changed files with 1152 additions and 0 deletions
+4
View File
@@ -157,6 +157,8 @@ class Settings(BaseModel):
# PR-MacMini-Derived-Worker-1: study explanation owner = Mac mini
# GPU 측은 false 로 설정 (.env), explanation 분기 skip guard 트리거.
study_explanation_enabled: bool = True
# 공부 암기노트 Phase 1: card_extract 폴러/consumer 게이트. owner 분리 시 false 로.
study_card_extract_enabled: bool = True
# internal endpoint Bearer token (Mac mini derived-worker 호출용)
internal_worker_token: str = ""
@@ -167,6 +169,7 @@ def load_settings() -> Settings:
# 환경변수 (docker-compose에서 주입)
database_url = os.getenv("DATABASE_URL", "")
study_explanation_enabled = os.getenv("STUDY_EXPLANATION_ENABLED", "true").lower() in ("1", "true", "yes")
study_card_extract_enabled = os.getenv("STUDY_CARD_EXTRACT_ENABLED", "true").lower() in ("1", "true", "yes")
internal_worker_token = os.getenv("INTERNAL_WORKER_TOKEN", "")
jwt_secret = os.getenv("JWT_SECRET", "")
totp_secret = os.getenv("TOTP_SECRET", "")
@@ -262,6 +265,7 @@ def load_settings() -> Settings:
document_types=document_types,
upload=upload_cfg,
study_explanation_enabled=study_explanation_enabled,
study_card_extract_enabled=study_card_extract_enabled,
internal_worker_token=internal_worker_token,
)
+6
View File
@@ -54,6 +54,8 @@ async def lifespan(app: FastAPI):
from workers.queue_consumer import consume_queue, consume_markdown_queue
from workers.study_queue_consumer import consume_study_queue
from workers.study_session_queue_consumer import consume_study_session_queue
from workers.study_memo_card_jobs_consumer import consume_study_memo_card_queue
from workers.study_card_enqueue import run as study_card_enqueue_run
from workers.study_question_embed_worker import (
refresh_stale_related as study_q_related_refresh,
run as study_q_embed_run,
@@ -95,6 +97,10 @@ async def lifespan(app: FastAPI):
# Phase 4-B v1: study_quiz_session_jobs 처리 — 세션 단위 자유 마크다운 분석.
# 4-A 와 같은 MLX gate 공유 — 4-A 처리 중이면 직렬 대기.
scheduler.add_job(consume_study_session_queue, "interval", minutes=1, id="study_session_queue_consumer")
# 공부 암기노트 Phase 1: card_extract 큐 consumer + 버전키 폴러(study_card_enqueue).
# 별 테이블/별 consumer 로 기존 study queue 와 격리. settings.study_card_extract_enabled 게이트.
scheduler.add_job(consume_study_memo_card_queue, "interval", minutes=1, id="study_memo_card_consumer")
scheduler.add_job(study_card_enqueue_run, "interval", minutes=1, id="study_card_enqueue")
# PR-B 레거시 tier 백필 — 30분 주기로 호출되지만 KST 00:00~06:00 시간대만 실제 enqueue.
# safety > law > manual 우선순위로 25건씩. 6720 레거시 → 야간당 ~150건 → 약 45일 소화.
scheduler.add_job(tier_backfill_run, "interval", minutes=30, id="tier_backfill")
+211
View File
@@ -0,0 +1,211 @@
"""study_memo_cards / study_memo_card_evidence ORM (공부 암기노트 Phase 1).
study_questions(MCQ) 와 별개로, 풀이/근거에서 추출한 암기 플래시카드 본체.
- source_kind: question(P1) / subject_note / document(P3 예약)
- format: qa(cue->fact) / cloze(빈칸). 강한 enum 미사용 (read-time 매핑).
- source_generated_at: 추출 당시 ai_explanation_generated_at — 버전 핀/stale 판정.
- needs_review DEFAULT true: 생성물이라 검토 대기로 입고.
dedup_hash PARTIAL UNIQUE(migration 288, WHERE deleted_at IS NULL) 가 중복 최종 방어선.
정정/삭제 후 supersede(구버전 카드 deleted_at 마킹)로 stale 잔류 0 — append 전에 호출해
살아있는 구카드가 새 추출을 ON CONFLICT 로 막지 않게 한다.
"""
from __future__ import annotations
from datetime import datetime
from typing import Any, Sequence
from sqlalchemy import (
BigInteger,
Boolean,
DateTime,
ForeignKey,
Integer,
String,
Text,
func,
text,
update,
)
from sqlalchemy.dialects.postgresql import JSONB, insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
class StudyMemoCard(Base):
__tablename__ = "study_memo_cards"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
user_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey("users.id", ondelete="CASCADE"), nullable=False
)
study_topic_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey("study_topics.id", ondelete="CASCADE"), nullable=False
)
source_kind: Mapped[str] = mapped_column(String(40), nullable=False)
source_question_id: Mapped[int | None] = mapped_column(
BigInteger, ForeignKey("study_questions.id", ondelete="CASCADE")
)
source_subject_note_id: Mapped[int | None] = mapped_column(BigInteger)
format: Mapped[str] = mapped_column(String(20), nullable=False)
cue: Mapped[str] = mapped_column(Text, nullable=False)
fact: Mapped[str] = mapped_column(Text, nullable=False)
cloze_text: Mapped[str | None] = mapped_column(Text)
extra: Mapped[dict | None] = mapped_column(JSONB)
source_generated_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
dedup_hash: Mapped[str] = mapped_column(String(64), nullable=False)
needs_review: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True)
flagged_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
flagged_by: Mapped[str | None] = mapped_column(String(40))
model: Mapped[str | None] = mapped_column(String(120))
generated_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
deleted_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
class StudyMemoCardEvidence(Base):
"""append-only citation. UPDATE/DELETE 없음."""
__tablename__ = "study_memo_card_evidence"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
card_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey("study_memo_cards.id", ondelete="CASCADE"), nullable=False
)
source_type: Mapped[str] = mapped_column(String(40), nullable=False)
source_id: Mapped[int | None] = mapped_column(BigInteger)
chunk_index: Mapped[int | None] = mapped_column(Integer)
snippet: Mapped[str | None] = mapped_column(Text)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
async def supersede_old_cards(
session: AsyncSession,
*,
source_question_id: int,
keep_generated_at: datetime | None,
) -> int:
"""같은 문제의 '다른 버전' 카드를 deleted_at 마킹(retire).
새 source_generated_at 카드 적재 '전에' 호출 — 살아있는 구버전 카드가 dedup PARTIAL
UNIQUE 로 새 추출을 막는 것을 방지(정정-후 stale 잔류 0). 같은 버전은 보존.
Returns: retire 된 행 수.
"""
stmt = (
update(StudyMemoCard)
.where(
StudyMemoCard.source_question_id == source_question_id,
StudyMemoCard.deleted_at.is_(None),
StudyMemoCard.source_generated_at.is_distinct_from(keep_generated_at),
)
.values(deleted_at=func.now())
)
result = await session.execute(stmt)
return result.rowcount or 0
async def append_card(
session: AsyncSession,
*,
user_id: int,
study_topic_id: int,
source_kind: str,
source_question_id: int | None,
format: str,
cue: str,
fact: str,
cloze_text: str | None,
dedup_hash: str,
source_generated_at: datetime | None,
model: str | None,
generated_at: datetime | None,
needs_review: bool = True,
) -> int | None:
"""카드 1장 INSERT. dedup_hash PARTIAL UNIQUE 충돌 시 None (DO NOTHING).
Returns: 새 card.id, 또는 중복으로 건너뛰면 None.
"""
stmt = (
pg_insert(StudyMemoCard)
.values(
user_id=user_id,
study_topic_id=study_topic_id,
source_kind=source_kind,
source_question_id=source_question_id,
format=format,
cue=cue,
fact=fact,
cloze_text=cloze_text,
dedup_hash=dedup_hash,
source_generated_at=source_generated_at,
needs_review=needs_review,
model=model,
generated_at=generated_at,
)
.on_conflict_do_nothing(
index_elements=["dedup_hash"],
index_where=text("deleted_at IS NULL"),
)
.returning(StudyMemoCard.id)
)
result = await session.execute(stmt)
return result.scalar_one_or_none()
async def append_card_evidence(
session: AsyncSession,
*,
card_id: int,
refs: Sequence[dict[str, Any]],
) -> int:
"""카드 인용 append-only INSERT. refs: [{source_type, source_id?, chunk_index?, snippet?}]."""
rows = [
{
"card_id": card_id,
"source_type": r.get("source_type") or "unknown",
"source_id": r.get("source_id"),
"chunk_index": r.get("chunk_index"),
"snippet": r.get("snippet"),
}
for r in refs
]
if not rows:
return 0
await session.execute(pg_insert(StudyMemoCardEvidence).values(rows))
return len(rows)
async def flag_cards_for_source(
session: AsyncSession,
*,
source_question_id: int,
reason: str,
) -> int:
"""소스 문제 정정/삭제 시 파생 카드를 needs_review=auto 마킹(임시 플래그).
최종 stale 정리는 워커 supersede 가 책임 — 이건 사용자 가시화용 즉시 플래그.
reason: 'source_changed' | 'source_deleted'.
Returns: 마킹된 행 수.
"""
stmt = (
update(StudyMemoCard)
.where(
StudyMemoCard.source_question_id == source_question_id,
StudyMemoCard.deleted_at.is_(None),
)
.values(needs_review=True, flagged_by=reason, flagged_at=func.now())
)
result = await session.execute(stmt)
return result.rowcount or 0
+92
View File
@@ -0,0 +1,92 @@
"""study_memo_card_jobs ORM — card_extract 비동기 작업 큐 (다형 소스).
231_study_question_jobs 복제 + source_kind/source_id/source_version(=ai_explanation_generated_at).
별도 테이블 + 별도 consumer(study_memo_card_jobs_consumer.py) 로 기존 study_queue_consumer 와 격리.
error_code 권장값:
- parse_fail / llm_timeout / unknown → 재시도 대상 (attempts < max_attempts)
- all_dropped → 0장 생성. completed 로 종결해 같은 버전 재추출 차단.
- no_ready_explanation → ai_explanation 미준비(race). skipped, 비재시도.
멱등 이중구조: active partial unique(migration 292)는 동시 active 1행만,
버전 멱등(같은 source_version 재추출 차단)은 폴러의 NOT EXISTS(source_version) 가 책임.
"""
from __future__ import annotations
from datetime import datetime
from typing import Any
from sqlalchemy import BigInteger, DateTime, ForeignKey, SmallInteger, String, Text, text
from sqlalchemy.dialects.postgresql import JSONB, insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import Mapped, mapped_column
from core.database import Base
class StudyMemoCardJob(Base):
__tablename__ = "study_memo_card_jobs"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
user_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey("users.id", ondelete="CASCADE"), nullable=False
)
source_kind: Mapped[str] = mapped_column(String(40), nullable=False)
source_id: Mapped[int] = mapped_column(BigInteger, nullable=False)
source_version: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
kind: Mapped[str] = mapped_column(String(40), nullable=False)
status: Mapped[str] = mapped_column(String(20), nullable=False, default="pending")
attempts: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=0)
max_attempts: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=2)
error_code: Mapped[str | None] = mapped_column(String(40))
error_message: Mapped[str | None] = mapped_column(Text)
payload: Mapped[dict | None] = mapped_column(JSONB)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
started_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
completed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# active partial unique idx (source_kind, source_id) WHERE active 는 migration 292.
async def enqueue_study_memo_card_job(
session: AsyncSession,
*,
user_id: int,
source_kind: str,
source_id: int,
source_version: datetime | None,
kind: str = "card_extract",
payload: dict[str, Any] | None = None,
) -> bool:
"""study_memo_card_jobs 에 행 추가 (DB 레벨 동시 active 중복 방어).
같은 (source_kind, source_id) 활성 행(pending/processing)이 있으면 False.
버전 멱등(같은 source_version 재추출 차단)은 호출 측 폴러의 NOT EXISTS 가 선판단.
Returns: True = 새 enqueue, False = active 중복으로 건너뜀.
"""
values: dict[str, Any] = {
"user_id": user_id,
"source_kind": source_kind,
"source_id": source_id,
"source_version": source_version,
"kind": kind,
"status": "pending",
}
if payload is not None:
values["payload"] = payload
stmt = (
pg_insert(StudyMemoCardJob)
.values(**values)
.on_conflict_do_nothing(
index_elements=["source_kind", "source_id"],
index_where=text("status IN ('pending', 'processing')"),
)
)
result = await session.execute(stmt)
return result.rowcount > 0
+6
View File
@@ -80,6 +80,12 @@ class StudyQuestion(Base):
related_computed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
related_threshold_version: Mapped[str | None] = mapped_column(String(20))
# 공부 암기노트 Phase 1: 검수 대기 플래그 (DDL=migration 296). 정정/삭제 훅 + needs_review 큐가 set/clear.
# flagged_by 권장값: 'user' / 'source_changed' / 'source_deleted' (서버측 상수, read-time 매핑).
needs_review: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
flagged_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
flagged_by: Mapped[str | None] = mapped_column(String(40))
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, nullable=False
)
+39
View File
@@ -0,0 +1,39 @@
당신은 한국 기사시험(가스기사·산업안전기사 등) 필기 학습 보조 AI 입니다.
이미 검증된 풀이와 근거 자료에서 '암기 플래시카드'를 추출합니다.
【문제】
{question_text}
【보기】
1. {choice_1}
2. {choice_2}
3. {choice_3}
4. {choice_4}
【사용자가 입력한 정답】
{correct_choice}번
【확정 풀이 (검증 통과, 정성 사실의 1순위 근거)】
{ai_explanation}
【참고 자료 (정량 cloze 의 원문 근거)】
▼ 자료
{documents_evidence_block}
▼ 같은 주제의 다른 문제
{questions_evidence_block}
【카드 추출 지침】
1. 위 '확정 풀이'와 '참고 자료'에서 시험에 나올 핵심 사실을 1~3장의 카드로 추출한다.
2. 카드 형식(format)은 두 가지:
- "qa": cue(질문/단서) -> fact(핵심 사실 한 줄).
- "cloze": 완전한 사실 문장에서 핵심 토큰 하나를 빈칸 [____] 로 가린 cloze_text + 그 가린 정답을 fact 에.
3. **정량 토큰(수치·압력·온도·기준값·표준번호·조항)을 cloze 정답으로 쓸 때, 그 토큰은 반드시 위 '참고 자료' 원문에 그대로 등장해야 한다.** 확정 풀이에만 있고 자료에 없는 수치는 카드로 만들지 않는다. 단위는 자료 표기 그대로 쓰고 환산하지 않는다.
4. cue 에 정답(fact)을 노출하지 않는다. cloze_text 의 빈칸 밖 평문에도 정답을 노출하지 않는다.
5. **할루시네이션 방지 (절대 규칙)**: 근거 없는 수치·공식·표준 번호·법령 조항을 새로 만들어내지 않는다. 자료/풀이에서 확인되지 않는 내용은 카드로 만들지 않는다. "보통 ~이다" 같은 모호한 단정도 근거 없으면 쓰지 않는다.
6. 카드는 최대 3장. 가장 시험가치 높은 사실 위주로, 억지로 채우지 않는다(0장도 허용).
7. **출력은 raw JSON 한 객체만**. 메타 설명·인사·코드 펜스·thinking 텍스트 없이.
【출력 형식】
{{"cards": [{{"format": "qa|cloze", "cue": "<앞면 단서/질문>", "fact": "<핵심 사실/정답 토큰>", "cloze_text": "<cloze 일 때만, 빈칸 [____] 포함 문장>"}}]}}
+85
View File
@@ -0,0 +1,85 @@
"""공부 암기노트 카드 — 정량 토큰 정규화 + dedup 키 + 누출/근거 1차 primitives.
정규화 정책(보수적 = restrictive):
- NFC 유니코드 정규화
- 수치와 단위 사이 공백 제거 ('0.5 MPa' -> '0.5MPa')
- 천단위 구분자(콤마) 제거 ('1,000kg' -> '1000kg'), 숫자 3자리 그룹 한정
- 단위 환산 절대 금지 (원문 표기 보존 — LLM 오변환을 정규화로 흡수하지 않음)
대소문자는 보존한다 (MPa vs mpa 는 다른 단위라 lowercase 안 함).
dedup_hash = sha256(source_question_id | format | normalize_token(정답토큰)).
"""
from __future__ import annotations
import hashlib
import re
import unicodedata
# 수치 다음의 공백 + (단위로 시작하는) 토큰 사이 공백 제거.
_NUM_UNIT_SPACE = re.compile(r"(\d)\s+(?=[A-Za-z℃°%‰Ωµμ/])")
# 천단위 콤마: 숫자 뒤 콤마 + 정확히 3자리 숫자 그룹이 이어질 때만 (소수점/일반 콤마 보호).
_THOUSANDS = re.compile(r"(?<=\d),(?=\d{3}(?:\D|$))")
_WS = re.compile(r"\s+")
# cloze 빈칸 마커: [____] / [___] / {{...}} / ____ 등.
_BLANK = re.compile(r"\[_+\]|\{\{[^}]*\}\}|_{2,}")
_DIGIT = re.compile(r"\d")
def normalize_token(s: str | None) -> str:
"""단일 정답 토큰 정규화 (대소문자 보존). dedup 키·근거 매칭의 단위."""
if not s:
return ""
s = unicodedata.normalize("NFC", s)
s = _NUM_UNIT_SPACE.sub(r"\1", s)
s = _THOUSANDS.sub("", s)
return s.strip()
def normalize_for_match(s: str | None) -> str:
"""근거 텍스트/문장 비교용 — 토큰 정규화 + 공백 축약 (대소문자 보존)."""
if not s:
return ""
s = normalize_token(s)
return _WS.sub(" ", s).strip()
def compute_dedup_hash(source_question_id: int | None, fmt: str, answer_token: str | None) -> str:
"""정본 키: sha256(source_question_id | format | normalize_token(정답토큰))."""
key = f"{source_question_id}|{fmt}|{normalize_token(answer_token)}"
return hashlib.sha256(key.encode("utf-8")).hexdigest()
def is_quantitative(token: str | None) -> bool:
"""숫자를 포함하면 정량 토큰 (정량 cloze 는 evidence 원문 등장 필수)."""
return bool(_DIGIT.search(normalize_token(token)))
def text_contains(haystack: str | None, needle: str | None) -> bool:
"""needle(정답토큰)이 haystack 안에 정규화 후 부분문자열로 등장하면 True."""
n = normalize_for_match(needle)
if not n:
return False
return n in normalize_for_match(haystack)
def is_cue_leak(cue: str | None, answer_token: str | None) -> bool:
"""cue(앞면)에 정답토큰이 노출되면 True (drop 대상)."""
return text_contains(cue, answer_token)
def is_cloze_self_leak(cloze_text: str | None, answer_token: str | None) -> bool:
"""cloze_text 의 빈칸 마커를 제거한 평문에 정답토큰이 노출되면 True (drop 대상)."""
if not cloze_text:
return False
stripped = _BLANK.sub(" ", cloze_text)
return text_contains(stripped, answer_token)
def matching_evidence(answer_token: str | None, evidence_refs: list[dict]) -> list[dict]:
"""정답토큰이 snippet 에 등장하는 evidence_refs 만 반환 (citation 적재용)."""
out: list[dict] = []
for ref in evidence_refs or []:
if text_contains(ref.get("snippet"), answer_token):
out.append(ref)
return out
@@ -0,0 +1,105 @@
"""공부 암기노트 카드별 가드 — 추출된 카드 1장 검증 파이프라인.
explanation 워커의 단일 answer_choice 환각가드를 카드 배열로 확장한다. 가드 4종:
1. 형식 유효성 — format in {qa, cloze}, cue/fact 비공백, cloze 는 cloze_text + 빈칸 마커 필요.
2. 근거(hallucination) — 정답토큰(fact)이 신뢰 텍스트에 등장해야 채택.
정량 토큰(숫자 포함): evidence 원문 snippet 에 등장 필수 (평문화된 ai_explanation 만으론 불충분).
비정량(개념): ai_explanation 또는 evidence snippet 에 등장.
3. 누출 — cue 에 정답 노출 / cloze 평문에 정답 노출 시 drop.
4. dedup — (source_question_id, format, normalize(정답토큰)) hash. 배치 내 중복 1장.
무결성은 구조로(메모리 규칙): dedup_hash PARTIAL UNIQUE(migration 288)가 DB 최종 방어선,
본 가드는 1차. 전부 drop 이면 빈 리스트 → 워커가 all_dropped 로 종결.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from services.study import card_normalize as cn
_VALID_FORMATS = {"qa", "cloze"}
@dataclass
class GuardedCard:
format: str
cue: str
fact: str
cloze_text: str | None
dedup_hash: str
matched_evidence: list[dict] = field(default_factory=list)
def guard_card(
card: dict,
*,
source_question_id: int | None,
ai_explanation: str | None,
evidence_refs: list[dict],
) -> GuardedCard | None:
"""카드 1장 검증. 통과하면 GuardedCard, 탈락하면 None."""
fmt = (card.get("format") or "").strip()
cue = (card.get("cue") or "").strip()
fact = (card.get("fact") or "").strip()
cloze_text = card.get("cloze_text")
cloze_text = cloze_text.strip() if isinstance(cloze_text, str) else None
# 1. 형식 유효성
if fmt not in _VALID_FORMATS or not cue or not fact:
return None
if fmt == "cloze":
if not cloze_text or not cn._BLANK.search(cloze_text):
return None
# 3. 누출 (정답 노출)
if cn.is_cue_leak(cue, fact):
return None
if fmt == "cloze" and cn.is_cloze_self_leak(cloze_text, fact):
return None
# 2. 근거 (hallucination 차단)
matched = cn.matching_evidence(fact, evidence_refs)
if cn.is_quantitative(fact):
# 정량 토큰은 evidence 원문 등장 필수
if not matched:
return None
else:
# 비정량은 ai_explanation 또는 evidence 에 등장
if not matched and not cn.text_contains(ai_explanation, fact):
return None
return GuardedCard(
format=fmt,
cue=cue,
fact=fact,
cloze_text=cloze_text if fmt == "cloze" else None,
dedup_hash=cn.compute_dedup_hash(source_question_id, fmt, fact),
matched_evidence=matched,
)
def guard_cards(
cards: list[dict],
*,
source_question_id: int | None,
ai_explanation: str | None,
evidence_refs: list[dict],
) -> list[GuardedCard]:
"""카드 배열 검증 + 배치 내 dedup_hash 중복 1장. 통과 카드만 반환."""
out: list[GuardedCard] = []
seen: set[str] = set()
for card in cards or []:
if not isinstance(card, dict):
continue
g = guard_card(
card,
source_question_id=source_question_id,
ai_explanation=ai_explanation,
evidence_refs=evidence_refs,
)
if g is None or g.dedup_hash in seen:
continue
seen.add(g.dedup_hash)
out.append(g)
return out
+80
View File
@@ -0,0 +1,80 @@
"""study_card_enqueue — 버전키 폴러 (공부 암기노트 Phase 1).
ready ai_explanation 인데 '현재 버전' card_extract job 이 없는 question 을 enqueue.
버전 멱등(핵심): NOT EXISTS(job WHERE source_kind='question' AND source_id=q.id
AND source_version=q.ai_explanation_generated_at)
- 같은 버전 재추출 차단 — completed/all_dropped job 도 현 버전에 존재하면 재enqueue 0(livelock 방지).
- explanation 재생성(새 generated_at)이면 새 버전 job 부재 → 자동 재추출(정정-stale 해소).
NULL 가드: ai_explanation_generated_at IS NOT NULL 전제 — NULL 이면 NULL=NULL=UNKNOWN 으로
NOT EXISTS 가 항상 참이 되어 매 폴 재enqueue 폭주. ready 전이 직후 race 를 이 가드가 막는다.
thundering-herd: per-poll LIMIT(CARD_ENQUEUE_BATCH) + 최근(generated_at desc) 우선으로 backfill 완만.
"""
from __future__ import annotations
import logging
from sqlalchemy import select
from core.config import settings
from core.database import async_session
from models.study_memo_card_job import StudyMemoCardJob, enqueue_study_memo_card_job
from models.study_question import StudyQuestion
logger = logging.getLogger("study_card_enqueue")
CARD_ENQUEUE_BATCH = 20
SOURCE_KIND_QUESTION = "question"
async def run() -> None:
"""APScheduler 진입점. ready & 현 버전 job 부재 question 을 BATCH 만큼 enqueue."""
if not getattr(settings, "study_card_extract_enabled", True):
return
async with async_session() as session:
# 현재 ai_explanation_generated_at 버전에 대한 job 이 이미 있는지 (correlated NOT EXISTS).
job_exists = (
select(StudyMemoCardJob.id)
.where(
StudyMemoCardJob.source_kind == SOURCE_KIND_QUESTION,
StudyMemoCardJob.source_id == StudyQuestion.id,
StudyMemoCardJob.source_version == StudyQuestion.ai_explanation_generated_at,
)
.exists()
)
rows = (
await session.execute(
select(
StudyQuestion.id,
StudyQuestion.user_id,
StudyQuestion.ai_explanation_generated_at,
)
.where(
StudyQuestion.deleted_at.is_(None),
StudyQuestion.ai_explanation_status == "ready",
StudyQuestion.ai_explanation_generated_at.is_not(None),
~job_exists,
)
.order_by(StudyQuestion.ai_explanation_generated_at.desc())
.limit(CARD_ENQUEUE_BATCH)
)
).all()
if not rows:
return
enqueued = 0
for r in rows:
ok = await enqueue_study_memo_card_job(
session,
user_id=r.user_id,
source_kind=SOURCE_KIND_QUESTION,
source_id=r.id,
source_version=r.ai_explanation_generated_at,
kind="card_extract",
)
if ok:
enqueued += 1
await session.commit()
if enqueued:
logger.info("study_card_enqueue candidates=%d enqueued=%d", len(rows), enqueued)
@@ -0,0 +1,87 @@
"""study_memo_card_jobs consumer — APScheduler 1분 간격 (공부 암기노트 Phase 1).
기존 study_queue_consumer / study_session_queue_consumer 와 별도 테이블·별도 consumer 라
자연 격리된다 (정본 제약: 기존 consumer 무수정). study_session_queue_consumer 골격 복제.
BATCH_SIZE=1, MLX gate Semaphore(1) 공유 — explanation/session 처리 중이면 직렬 대기.
STALE_MINUTES=10 자체 복구.
dispatch: kind=='card_extract' -> run_card_extract_job, 그 외 -> skipped(else 분기 silent loss 방지).
"""
from __future__ import annotations
from datetime import datetime, timedelta, timezone
from sqlalchemy import select, update
from sqlalchemy.exc import SQLAlchemyError
from core.database import async_session
from core.utils import setup_logger
from models.study_memo_card_job import StudyMemoCardJob
from workers.study_memo_card_worker import run_card_extract_job
logger = setup_logger("study_memo_card_jobs_consumer")
BATCH_SIZE = 1
STALE_MINUTES = 10
async def reset_stale_card_jobs() -> None:
"""processing 으로 STALE_MINUTES 이상 방치된 job 을 pending 으로 복구."""
cutoff = datetime.now(timezone.utc) - timedelta(minutes=STALE_MINUTES)
try:
async with async_session() as session:
stmt = (
update(StudyMemoCardJob)
.where(
StudyMemoCardJob.status == "processing",
StudyMemoCardJob.started_at.is_not(None),
StudyMemoCardJob.started_at < cutoff,
)
.values(status="pending", started_at=None)
)
result = await session.execute(stmt)
await session.commit()
n = result.rowcount or 0
if n > 0:
logger.warning("study_memo_card_jobs_stale_reset count=%s", n)
except SQLAlchemyError as e:
logger.exception("study_memo_card_jobs_stale_reset_failed: %s", e)
async def consume_study_memo_card_queue() -> None:
"""APScheduler 진입점. pending card_extract job 을 BATCH_SIZE 만큼 처리."""
await reset_stale_card_jobs()
async with async_session() as session:
rows = (
await session.execute(
select(StudyMemoCardJob)
.where(StudyMemoCardJob.status == "pending")
.order_by(StudyMemoCardJob.id.asc())
.limit(BATCH_SIZE)
)
).scalars().all()
for job_row in rows:
async with async_session() as s:
try:
job = await s.get(StudyMemoCardJob, job_row.id)
if job is None or job.status != "pending":
continue
if job.kind == "card_extract":
await run_card_extract_job(s, job)
else:
# 미지원 kind — lost-in-queue 방지 위해 명시 skipped.
job.status = "skipped"
job.error_code = "unknown"
job.error_message = f"unsupported kind: {job.kind!r}"
job.completed_at = datetime.now(timezone.utc)
await s.commit()
logger.info(
"card_extract_processed id=%s src=%s/%s status=%s error_code=%s attempts=%s",
job.id, job.source_kind, job.source_id, job.status, job.error_code,
job.attempts,
)
except Exception as e:
await s.rollback()
logger.exception("card_extract_outer_failed job_id=%s: %s", job_row.id, e)
+251
View File
@@ -0,0 +1,251 @@
"""card_extract 워커 — ready 풀이/근거에서 암기 플래시카드 추출 (공부 암기노트 Phase 1).
study_explanation_worker.run_explanation_job 골격 복제:
1. ready 게이트 + RAG(gather_explanation_context) + ai_explanation + evidence_refs (in-process)
2. ai_explanation 미준비면 LLM 호출 X (no_ready_explanation, skipped)
3. MLX primary (gate BACKGROUND, gate 안에서만 timeout) -> {cards:[...]} JSON
4. 카드별 가드(study_memo_card_guards) — 근거(정량=원문등장)·누출·dedup
5. 통과 카드 있으면 supersede(구버전 retire) -> append + evidence. 0장이면 all_dropped(completed).
GPU = in-process RAG provider (explanation 워커와 동일 구조; internal_study card-context
endpoint 는 호출자 0 scaffold). Mac mini = call_primary 생성.
재시도: llm_timeout/parse_fail/unknown 만 (attempts < max_attempts), all_dropped/no_ready 는 종결.
"""
from __future__ import annotations
import asyncio
import json
import logging
from datetime import datetime, timezone
from pathlib import Path
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient, parse_json_response
from models.study_memo_card import (
append_card,
append_card_evidence,
supersede_old_cards,
)
from models.study_memo_card_job import StudyMemoCardJob
from models.study_question import StudyQuestion
from models.user import User # noqa: F401 (mapper 초기화 defensive)
from services.search.llm_gate import Priority, acquire_mlx_gate
from services.study.explanation_rag import (
gather_explanation_context,
render_evidence_block,
)
from services.study.study_memo_card_guards import guard_cards
logger = logging.getLogger("study_memo_card_worker")
# 다카드 출력이라 explanation(30s)보다 여유. config primary.timeout(180, soft-lock)은 미변경.
CARD_LLM_TIMEOUT_S = 45.0
SOURCE_KIND_QUESTION = "question"
_ENVELOPE_PROMPT_FILE = "study_card_envelope.txt"
_envelope_template_cache: str | None = None
def _load_card_envelope_prompt() -> str:
global _envelope_template_cache
if _envelope_template_cache is None:
prompts_dir = Path(__file__).resolve().parent.parent / "prompts"
_envelope_template_cache = (
prompts_dir / _ENVELOPE_PROMPT_FILE
).read_text(encoding="utf-8")
return _envelope_template_cache
def _render_card_envelope_prompt(
q: StudyQuestion, doc_block: str, q_block: str, ai_explanation: str
) -> str:
return (
_load_card_envelope_prompt()
.replace("{question_text}", q.question_text or "")
.replace("{choice_1}", q.choice_1 or "")
.replace("{choice_2}", q.choice_2 or "")
.replace("{choice_3}", q.choice_3 or "")
.replace("{choice_4}", q.choice_4 or "")
.replace("{correct_choice}", str(q.correct_choice))
.replace("{ai_explanation}", ai_explanation or "")
.replace("{documents_evidence_block}", doc_block)
.replace("{questions_evidence_block}", q_block)
)
async def run_card_extract_job(session: AsyncSession, job: StudyMemoCardJob) -> None:
"""study_memo_card_jobs row 1건 처리. caller(consumer)가 commit 책임.
종료 시 completed / failed / skipped / pending(재시도) 중 하나.
"""
now = lambda: datetime.now(timezone.utc) # noqa: E731
job.attempts += 1
job.status = "processing"
job.started_at = now()
await session.flush()
try:
# P1 은 question source 만. 다른 source_kind 는 미구현 — skipped.
if job.source_kind != SOURCE_KIND_QUESTION:
job.error_code = "unknown"
job.error_message = f"unsupported source_kind: {job.source_kind!r}"
job.status = "skipped"
job.completed_at = now()
return
question = await session.get(StudyQuestion, job.source_id)
if question is None or question.deleted_at is not None:
job.error_code = "no_ready_explanation"
job.error_message = "source question deleted or missing"
job.status = "skipped"
job.completed_at = now()
return
# ready 게이트 — explanation 이 ready 가 아니면(정정으로 stale 등) 추출 보류.
if question.ai_explanation_status != "ready" or not (question.ai_explanation or "").strip():
job.error_code = "no_ready_explanation"
job.error_message = f"ai_explanation_status={question.ai_explanation_status}"
job.status = "skipped"
job.completed_at = now()
return
source_version = question.ai_explanation_generated_at
# 1. RAG 근거 (in-process). ai_explanation 이 정성 1순위, evidence 가 정량 원문.
ctx = await gather_explanation_context(session, question.user_id, question)
evidence_refs = [it.to_dict() for it in ctx.all]
doc_block = render_evidence_block(ctx.documents)
q_block = render_evidence_block(ctx.questions)
prompt = _render_card_envelope_prompt(question, doc_block, q_block, question.ai_explanation)
# 2. MLX primary
ai_client = AIClient()
try:
async with acquire_mlx_gate(Priority.BACKGROUND):
async with asyncio.timeout(CARD_LLM_TIMEOUT_S):
raw_text = await ai_client.call_primary(prompt)
primary_name = (
ai_client.ai.primary.model
if hasattr(ai_client.ai, "primary") and hasattr(ai_client.ai.primary, "model")
else "primary"
)
finally:
await ai_client.close()
if not raw_text or not raw_text.strip():
job.error_code = "llm_timeout"
job.error_message = "empty response from primary"
return
# 3. {cards:[...]} 파싱
def _save_raw_preview(reason: str) -> None:
existing = dict(job.payload or {})
existing["debug_raw_preview"] = (raw_text or "")[:1000]
existing["parse_fail_reason"] = reason
job.payload = existing
envelope = parse_json_response(raw_text)
if envelope is None or not isinstance(envelope, dict):
job.error_code = "parse_fail"
job.error_message = "envelope JSON parse failed"
_save_raw_preview("not_dict")
return
cards = envelope.get("cards")
if not isinstance(cards, list):
job.error_code = "parse_fail"
job.error_message = f"cards not a list: {type(cards).__name__}"
_save_raw_preview("cards_not_list")
return
# 4. 카드별 가드
guarded = guard_cards(
cards,
source_question_id=question.id,
ai_explanation=question.ai_explanation,
evidence_refs=evidence_refs,
)
payload = dict(job.payload or {})
payload["cards_generated"] = len(cards)
payload["cards_kept"] = len(guarded)
if not guarded:
# 전량 drop — completed 로 종결해 같은 버전 재추출 차단(재시도 집합에서 제외).
payload["cards_inserted"] = 0
job.payload = payload
job.error_code = "all_dropped"
job.status = "completed"
job.completed_at = now()
return
# 5. 성공 — 구버전 카드 retire 후 append (dedup partial unique 충돌 회피).
await supersede_old_cards(
session, source_question_id=question.id, keep_generated_at=source_version
)
model_name = f"mlx:{primary_name}"
inserted = 0
for g in guarded:
card_id = await append_card(
session,
user_id=question.user_id,
study_topic_id=question.study_topic_id,
source_kind=SOURCE_KIND_QUESTION,
source_question_id=question.id,
format=g.format,
cue=g.cue,
fact=g.fact,
cloze_text=g.cloze_text,
dedup_hash=g.dedup_hash,
source_generated_at=source_version,
model=model_name,
generated_at=now(),
needs_review=True,
)
if card_id is not None:
inserted += 1
if g.matched_evidence:
await append_card_evidence(
session, card_id=card_id, refs=g.matched_evidence
)
payload["cards_inserted"] = inserted
job.payload = payload
job.status = "completed"
job.completed_at = now()
return
except (asyncio.TimeoutError, httpx.HTTPError) as e:
job.error_code = "llm_timeout"
job.error_message = f"{type(e).__name__}: {e}"
logger.warning(
"card_extract_job_timeout job_id=%s src=%s/%s: %s",
job.id, job.source_kind, job.source_id, e,
)
except (json.JSONDecodeError, ValueError) as e:
job.error_code = "parse_fail"
job.error_message = f"{type(e).__name__}: {e}"
logger.warning(
"card_extract_job_parse_fail job_id=%s src=%s/%s: %s",
job.id, job.source_kind, job.source_id, e,
)
except Exception as e:
job.error_code = "unknown"
job.error_message = f"{type(e).__name__}: {e}"
logger.exception(
"card_extract_job_unknown_fail job_id=%s src=%s/%s",
job.id, job.source_kind, job.source_id,
)
finally:
# 재시도 분기 — all_dropped/no_ready/unsupported 는 위에서 이미 종결.
# 여기 도달 = llm_timeout / parse_fail / unknown.
if job.status == "processing":
retryable = job.error_code in ("llm_timeout", "parse_fail", "unknown")
if retryable and job.attempts < job.max_attempts:
job.status = "pending"
else:
job.status = "failed"
job.completed_at = now()