From 9d4aa201a87dbfe4a7ed33cd3ba757e803e14481 Mon Sep 17 00:00:00 2001 From: Hyungi Ahn Date: Tue, 28 Apr 2026 08:54:02 +0900 Subject: [PATCH] =?UTF-8?q?feat(study):=20study=5Fquestions=20=EC=9E=90?= =?UTF-8?q?=EB=8F=99=20=EC=9E=84=EB=B2=A0=EB=94=A9=20(PR-4)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 문제 본문 + 보기 1~4 → bge-m3 1024차원. status 자체가 큐 역할 (별도 큐 테이블 없음 — ProcessingQueue 인프라 영향 0). APScheduler 1분 cron 이 status in {none, failed, stale} 행을 batch=10 처리. 새 문제는 default 'none' 으로 자동 backfill. 데이터 모델 (migrations 193~194): - study_questions: embedding vector(1024), embedding_status VARCHAR(20) DEFAULT 'none' (none/pending/ready/failed/stale), embedding_updated_at, embedding_model - HNSW partial index (vector_cosine_ops) WHERE deleted_at IS NULL AND embedding IS NOT NULL — bge-m3 cosine 기준, documents.embedding (ivfflat) 과 ops 일관 재계산 트리거: question_text / choice_1~4 변경 시 ready→stale 자동. correct_choice / explanation / subject / scope 변경은 재계산 안 함 (의미 검색에 영향 없음). 워커 (workers/study_question_embed_worker.py): - race-safe pending 마킹 (조건부 UPDATE WHERE status IN none/failed/stale) - AIClient.embed(text) bge-m3 호출, 15s timeout - 실패 시 status='failed', 직전 embedding 보존, 다음 cron 틱에 재시도 - 본문 = "문제: ...\n보기:\n1. ...\n2. ...\n3. ...\n4. ..." (subject/scope 의도 제외 — 분류명이 의미 검색 노이즈) 후속 PR 예정: 비슷한 문제 검색 UI / 중복 입력 감지 / RAG 정확도 향상 / 오답 클러스터링. 본 PR 은 임베딩 저장·재계산·backfill 까지만. Co-Authored-By: Claude Opus 4.7 (1M context) --- app/api/study_questions.py | 12 +- app/main.py | 4 + app/models/study_question.py | 13 ++ app/workers/study_question_embed_worker.py | 134 ++++++++++++++++++ migrations/193_study_questions_embedding.sql | 18 +++ .../194_study_questions_embedding_idx.sql | 11 ++ 6 files changed, 190 insertions(+), 2 deletions(-) create mode 100644 app/workers/study_question_embed_worker.py create mode 100644 migrations/193_study_questions_embedding.sql create mode 100644 migrations/194_study_questions_embedding_idx.sql diff --git a/app/api/study_questions.py b/app/api/study_questions.py index 52c9d86..ad533a8 100644 --- a/app/api/study_questions.py +++ b/app/api/study_questions.py @@ -620,12 +620,20 @@ async def update_question( # PR-3: 문제 핵심 필드 변경 시 AI 해설 stale 전이 (본문은 보존, UI 배지로 안내). # ready 상태에서만 stale 로 전이 — pending/failed/none/stale 은 변경 안 함. - STALE_TRIGGER = { + AI_STALE_TRIGGER = { "question_text", "choice_1", "choice_2", "choice_3", "choice_4", "correct_choice", } - if STALE_TRIGGER & fields_set and q.ai_explanation_status == "ready": + if AI_STALE_TRIGGER & fields_set and q.ai_explanation_status == "ready": q.ai_explanation_status = "stale" + # PR-4: 임베딩 stale 전이. 본문(question_text/choice_*)이 바뀌었을 때만 재계산. + # correct_choice 변경은 의미 검색에 영향 없으므로 재계산 안 함. + EMBED_STALE_TRIGGER = { + "question_text", "choice_1", "choice_2", "choice_3", "choice_4", + } + if EMBED_STALE_TRIGGER & fields_set and q.embedding_status == "ready": + q.embedding_status = "stale" + q.updated_at = datetime.now(timezone.utc) await session.commit() diff --git a/app/main.py b/app/main.py index 750f1b6..1864b58 100644 --- a/app/main.py +++ b/app/main.py @@ -43,6 +43,7 @@ async def lifespan(app: FastAPI): from workers.mailplus_archive import run as mailplus_run from workers.news_collector import run as news_collector_run from workers.queue_consumer import consume_queue + from workers.study_question_embed_worker import run as study_q_embed_run from workers.tier_backfill import run as tier_backfill_run from workers.upload_cleanup import cleanup_orphan_uploads @@ -64,6 +65,9 @@ async def lifespan(app: FastAPI): scheduler.add_job(consume_queue, "interval", minutes=1, id="queue_consumer") scheduler.add_job(watch_inbox, "interval", minutes=5, id="file_watcher") scheduler.add_job(cleanup_orphan_uploads, "interval", minutes=10, id="upload_cleanup") + # PR-4: study_questions 자동 임베딩 (status='none/failed/stale' 행을 batch=10 처리). + # 별도 큐 테이블 없이 status 자체가 큐. backfill 도 cron 이 'none' 행을 자연스럽게 처리. + scheduler.add_job(study_q_embed_run, "interval", minutes=1, id="study_q_embed") # PR-B 레거시 tier 백필 — 30분 주기로 호출되지만 KST 00:00~06:00 시간대만 실제 enqueue. # safety > law > manual 우선순위로 25건씩. 6720 레거시 → 야간당 ~150건 → 약 45일 소화. scheduler.add_job(tier_backfill_run, "interval", minutes=30, id="tier_backfill") diff --git a/app/models/study_question.py b/app/models/study_question.py index 0d5bef0..e930a9e 100644 --- a/app/models/study_question.py +++ b/app/models/study_question.py @@ -9,6 +9,7 @@ PR-2 가드레일: from datetime import datetime +from pgvector.sqlalchemy import Vector from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, SmallInteger, String, Text from sqlalchemy.orm import Mapped, mapped_column, relationship @@ -53,6 +54,18 @@ class StudyQuestion(Base): ) ai_explanation_model: Mapped[str | None] = mapped_column(String(120)) + # PR-4: 자동 임베딩 (bge-m3 1024차원). status 가 큐 역할. + # 재계산 트리거 = question_text / choice_1~4 변경. + # correct_choice / subject / scope / explanation 변경은 재계산 안 함. + embedding = mapped_column(Vector(1024), nullable=True) + embedding_status: Mapped[str] = mapped_column( + String(20), default="none", nullable=False + ) + embedding_updated_at: Mapped[datetime | None] = mapped_column( + DateTime(timezone=True) + ) + embedding_model: Mapped[str | None] = mapped_column(String(120)) + created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), default=datetime.now, nullable=False ) diff --git a/app/workers/study_question_embed_worker.py b/app/workers/study_question_embed_worker.py new file mode 100644 index 0000000..b1bc9be --- /dev/null +++ b/app/workers/study_question_embed_worker.py @@ -0,0 +1,134 @@ +"""study_questions 자동 임베딩 워커 (PR-4). + +별도 큐 테이블 없이 `embedding_status` 자체가 큐 역할: + status in {none, failed, stale} → cron 처리 대상 + status='pending' → race-safe 마킹 (조건부 UPDATE) + status='ready' → embedding 완료 + status='failed' → 다음 cron 틱에 재시도 + +호출: + - main.py lifespan APScheduler 가 1분 간격 cron 으로 run() 진입 + - 새 문제 입력은 default 'none' → 다음 틱에 자동 처리 (zero-config) + - PATCH question_text/choice_* 로 'stale' 전이 → 다음 틱에 재계산 + - backfill 별도 명령 없음 — cron 이 'none' 행을 자동 backfill + +임베딩 본문 = `문제: \n보기:\n1. \n2. \n3. \n4. ` +subject/scope 는 의도적으로 제외 (분류명이 의미 검색에 노이즈). + +bge-m3 1024차원, vector_cosine_ops 인덱스 (HNSW partial). +""" + +from __future__ import annotations + +import asyncio +import logging +from datetime import datetime, timezone + +from sqlalchemy import select, update +from sqlalchemy.ext.asyncio import AsyncSession + +from ai.client import AIClient +from core.database import async_session +from models.study_question import StudyQuestion + +logger = logging.getLogger("study_question_embed_worker") + +EMBED_MODEL = "bge-m3" +BATCH_SIZE = 10 # 한 cron 틱에 처리할 최대 행 수 +EMBED_TIMEOUT_S = 15.0 # bge-m3 단일 호출 timeout (Ollama, 보통 < 1s) + + +def _build_embed_text(q: StudyQuestion) -> str: + """문제 본문 + 보기 1~4. subject/scope 제외 (분류명 노이즈 방지).""" + return ( + f"문제: {q.question_text}\n" + f"보기:\n" + f"1. {q.choice_1}\n" + f"2. {q.choice_2}\n" + f"3. {q.choice_3}\n" + f"4. {q.choice_4}" + ) + + +async def _claim_question(session: AsyncSession, qid: int) -> bool: + """status 를 'pending' 으로 race-safe 마킹. 이미 pending 이면 False (다른 cron 인스턴스가 잡았음). + none/failed/stale 상태에서만 lock 획득. + """ + result = await session.execute( + update(StudyQuestion) + .where( + StudyQuestion.id == qid, + StudyQuestion.embedding_status.in_(("none", "failed", "stale")), + ) + .values(embedding_status="pending", updated_at=datetime.now(timezone.utc)) + .returning(StudyQuestion.id) + ) + return result.scalar_one_or_none() is not None + + +async def _process_one(session: AsyncSession, qid: int, client: AIClient) -> bool: + """단일 question 임베딩. 성공 True, 실패 False.""" + if not await _claim_question(session, qid): + # 다른 인스턴스가 이미 잡음 — 스킵 + return False + await session.commit() + + q = await session.get(StudyQuestion, qid) + if q is None or q.deleted_at is not None: + # 삭제됨 — pending 그대로 두지 말고 failed 로 (다음 cron 에서 다시 안 잡힘은 아님 — 어쨌든 정리) + if q is not None: + q.embedding_status = "failed" + await session.commit() + return False + + text = _build_embed_text(q) + try: + async with asyncio.timeout(EMBED_TIMEOUT_S): + vec = await client.embed(text) + except (asyncio.TimeoutError, Exception) as e: + logger.warning("study_q_embed_failed qid=%s err=%s: %s", qid, type(e).__name__, e) + # 실패 — status='failed'. 직전 embedding 보존. + q.embedding_status = "failed" + q.updated_at = datetime.now(timezone.utc) + await session.commit() + return False + + # 성공 + q.embedding = vec + q.embedding_status = "ready" + q.embedding_model = EMBED_MODEL + q.embedding_updated_at = datetime.now(timezone.utc) + q.updated_at = q.embedding_updated_at + await session.commit() + logger.info("study_q_embed_ok qid=%s len=%d", qid, len(vec) if vec else 0) + return True + + +async def run() -> None: + """APScheduler cron 진입점. status in {none, failed, stale} 행을 BATCH_SIZE 만큼 처리.""" + async with async_session() as session: + rows = ( + await session.execute( + select(StudyQuestion.id) + .where( + StudyQuestion.deleted_at.is_(None), + StudyQuestion.embedding_status.in_(("none", "failed", "stale")), + ) + .order_by(StudyQuestion.updated_at.asc()) + .limit(BATCH_SIZE) + ) + ).scalars().all() + + if not rows: + return + + logger.info("study_q_embed_run candidates=%d", len(rows)) + client = AIClient() + try: + ok_count = 0 + for qid in rows: + if await _process_one(session, qid, client): + ok_count += 1 + logger.info("study_q_embed_run done ok=%d/%d", ok_count, len(rows)) + finally: + await client.close() diff --git a/migrations/193_study_questions_embedding.sql b/migrations/193_study_questions_embedding.sql new file mode 100644 index 0000000..b71b224 --- /dev/null +++ b/migrations/193_study_questions_embedding.sql @@ -0,0 +1,18 @@ +-- 193_study_questions_embedding.sql (1/2) +-- study_questions 자동 임베딩 (PR-4). 문제 본문 + 보기 1~4 → bge-m3 1024차원. +-- +-- embedding_status 권장값 (강한 enum 미사용, status 자체가 큐 역할): +-- none — 신규 입력 (default, 한번도 임베딩 안 됨) +-- pending — 처리 진행 중 (race-safe 조건부 UPDATE 로 보호) +-- ready — 완료 +-- failed — 실패 (재시도 가능). 직전 embedding 보존. +-- stale — question_text/choice_1~4 변경되어 outdated. cron 이 다음 틱에 재계산. +-- +-- 재계산 트리거: question_text / choice_1~4 변경. correct_choice·explanation·subject·scope 변경은 재계산 안 함. +-- 별도 큐 테이블 미신설 — embedding_status 가 큐 역할 (cron polling). ProcessingQueue 인프라 영향 없음. + +ALTER TABLE study_questions + ADD COLUMN IF NOT EXISTS embedding vector(1024), + ADD COLUMN IF NOT EXISTS embedding_status VARCHAR(20) NOT NULL DEFAULT 'none', + ADD COLUMN IF NOT EXISTS embedding_updated_at TIMESTAMPTZ, + ADD COLUMN IF NOT EXISTS embedding_model VARCHAR(120); diff --git a/migrations/194_study_questions_embedding_idx.sql b/migrations/194_study_questions_embedding_idx.sql new file mode 100644 index 0000000..400db97 --- /dev/null +++ b/migrations/194_study_questions_embedding_idx.sql @@ -0,0 +1,11 @@ +-- 194_study_questions_embedding_idx.sql (2/2) +-- HNSW partial index — bge-m3 cosine 기준 (documents 인덱스가 vector_cosine_ops 와 일관). +-- partial: 삭제·미생성 행 제외해서 인덱스 부피 절약. +-- +-- documents 는 ivfflat 사용했지만 study_questions 는 데이터 규모가 작고 검색 빈도가 낮아 +-- HNSW recall 우위·튜닝 단순함이 더 큼. 향후 데이터 폭증 시 ivfflat 으로 변경 검토. + +CREATE INDEX IF NOT EXISTS idx_study_questions_embedding_hnsw + ON study_questions + USING hnsw (embedding vector_cosine_ops) + WHERE deleted_at IS NULL AND embedding IS NOT NULL;