diff --git a/app/api/study_questions.py b/app/api/study_questions.py index b2eaf88..fb5486e 100644 --- a/app/api/study_questions.py +++ b/app/api/study_questions.py @@ -802,6 +802,13 @@ async def update_question( if EMBED_STALE_TRIGGER & fields_set and q.embedding_status == "ready": q.embedding_status = "stale" + # PR-12-A 후속: related 캐시 stale 전이. + # 임베딩 본문 변경 (위 EMBED_STALE_TRIGGER) + exam_round 변경 시. + # exam_round 는 related-types 의 회차 필터 조건이라 본인 round 바뀌면 candidate 재선정 필요. + RELATED_STALE_TRIGGER = EMBED_STALE_TRIGGER | {"exam_round"} + if RELATED_STALE_TRIGGER & fields_set and q.related_computed_at is not None: + q.related_computed_at = None + q.updated_at = datetime.now(timezone.utc) await session.commit() @@ -840,11 +847,26 @@ async def soft_delete_question( session: Annotated[AsyncSession, Depends(get_session)], ): """soft delete only. attempts 는 RESTRICT FK 로 보호되어 영구 보존. - hard delete 호출 경로 자체가 없음 — DB 레벨에서도 attempts 가 있으면 거부.""" + hard delete 호출 경로 자체가 없음 — DB 레벨에서도 attempts 가 있으면 거부. + + PR-12-A 후속: 같은 토픽의 다른 문제 related 캐시에 이 qid 가 candidate 로 + 남아있을 수 있음. 같은 토픽 ready 행들의 related_computed_at 을 NULL 마킹 → cron 재계산. + """ q = await session.get(StudyQuestion, question_id) q = _verify_question_ownership(q, user) q.deleted_at = datetime.now(timezone.utc) q.updated_at = q.deleted_at + await session.execute( + update(StudyQuestion) + .where( + StudyQuestion.study_topic_id == q.study_topic_id, + StudyQuestion.id != q.id, + StudyQuestion.embedding_status == "ready", + StudyQuestion.deleted_at.is_(None), + StudyQuestion.related_computed_at.is_not(None), + ) + .values(related_computed_at=None) + ) await session.commit() @@ -1141,8 +1163,17 @@ async def list_related_types( - 유사 유형 = 같은 개념·풀이 패턴이 다른 회차에 등장 (개념 복습 가치) 공통 service 함수 사용 — bulk endpoint 와 분류 로직 공유 (drift 회피). + + PR-12-A 후속: study_questions.related_* 컬럼 캐시 우선 조회. cache hit (computed_at IS NOT NULL + + threshold version 일치) 면 HNSW 검색 생략. miss 면 즉시 계산 + 캐시 저장 (cron 못 따라온 안전망). """ - from services.study.related_types import classify_related_for_question + from datetime import datetime, timezone + from services.study.related_types import ( + THRESHOLD_VERSION, + classify_related_for_question, + deserialize_candidates, + serialize_candidates, + ) src = await session.get(StudyQuestion, question_id) src = _verify_question_ownership(src, user) @@ -1164,38 +1195,58 @@ async def list_related_types( similar_round_count=0, ) + def _candidates_to_items(cands): + return [ + RelatedQuestionItem( + id=c.id, + study_topic_id=c.study_topic_id, + question_text=c.question_text, + subject=c.subject, + scope=c.scope, + exam_round=c.exam_round, + exam_question_number=c.exam_question_number, + similarity=round(c.similarity, 4), + ) + for c in cands + ] + + # cache hit 조건: computed_at 존재 + 임계값 fingerprint 일치 + if ( + src.related_computed_at is not None + and src.related_threshold_version == THRESHOLD_VERSION + ): + repeat_cands = deserialize_candidates(src.related_repeat, src.study_topic_id) + similar_cands = deserialize_candidates(src.related_similar, src.study_topic_id) + return RelatedTypesResponse( + source_id=question_id, + source_status="ready", + source_exam_round=src_round_or_none, + repeat_questions=_candidates_to_items(repeat_cands), + similar_questions=_candidates_to_items(similar_cands), + repeat_related_count=len(repeat_cands), + repeat_round_count=src.related_repeat_round_count or 0, + repeat_grade=src.related_repeat_grade, + similar_related_count=len(similar_cands), + similar_round_count=src.related_similar_round_count or 0, + ) + + # cache miss → 즉시 계산 + 저장 (cron 다음 틱 기다리지 않음, 빈 응답 회피) cls = await classify_related_for_question(session, user_id=user.id, source=src) + src.related_repeat = serialize_candidates(cls.repeat) + src.related_similar = serialize_candidates(cls.similar) + src.related_repeat_round_count = cls.repeat_round_count + src.related_similar_round_count = cls.similar_round_count + src.related_repeat_grade = cls.repeat_grade + src.related_computed_at = datetime.now(timezone.utc) + src.related_threshold_version = THRESHOLD_VERSION + await session.commit() return RelatedTypesResponse( source_id=question_id, source_status="ready", source_exam_round=src_round_or_none, - repeat_questions=[ - RelatedQuestionItem( - id=c.id, - study_topic_id=c.study_topic_id, - question_text=c.question_text, - subject=c.subject, - scope=c.scope, - exam_round=c.exam_round, - exam_question_number=c.exam_question_number, - similarity=round(c.similarity, 4), - ) - for c in cls.repeat - ], - similar_questions=[ - RelatedQuestionItem( - id=c.id, - study_topic_id=c.study_topic_id, - question_text=c.question_text, - subject=c.subject, - scope=c.scope, - exam_round=c.exam_round, - exam_question_number=c.exam_question_number, - similarity=round(c.similarity, 4), - ) - for c in cls.similar - ], + repeat_questions=_candidates_to_items(cls.repeat), + similar_questions=_candidates_to_items(cls.similar), repeat_related_count=cls.repeat_related_count, repeat_round_count=cls.repeat_round_count, repeat_grade=cls.repeat_grade, diff --git a/app/main.py b/app/main.py index 1864b58..60d4638 100644 --- a/app/main.py +++ b/app/main.py @@ -43,7 +43,10 @@ async def lifespan(app: FastAPI): from workers.mailplus_archive import run as mailplus_run from workers.news_collector import run as news_collector_run from workers.queue_consumer import consume_queue - from workers.study_question_embed_worker import run as study_q_embed_run + from workers.study_question_embed_worker import ( + refresh_stale_related as study_q_related_refresh, + run as study_q_embed_run, + ) from workers.tier_backfill import run as tier_backfill_run from workers.upload_cleanup import cleanup_orphan_uploads @@ -68,6 +71,9 @@ async def lifespan(app: FastAPI): # PR-4: study_questions 자동 임베딩 (status='none/failed/stale' 행을 batch=10 처리). # 별도 큐 테이블 없이 status 자체가 큐. backfill 도 cron 이 'none' 행을 자연스럽게 처리. scheduler.add_job(study_q_embed_run, "interval", minutes=1, id="study_q_embed") + # PR-12-A 후속: related-types 캐시 stale 행 재계산. 임베딩 워커와 분리한 별도 cron. + # 새 문제 ready / 같은 토픽 invalidation / 임계값 변경 시 NULL 마킹된 행을 batch=20 처리. + scheduler.add_job(study_q_related_refresh, "interval", minutes=1, id="study_q_related_refresh") # PR-B 레거시 tier 백필 — 30분 주기로 호출되지만 KST 00:00~06:00 시간대만 실제 enqueue. # safety > law > manual 우선순위로 25건씩. 6720 레거시 → 야간당 ~150건 → 약 45일 소화. scheduler.add_job(tier_backfill_run, "interval", minutes=30, id="tier_backfill") diff --git a/app/models/study_question.py b/app/models/study_question.py index c81f5ed..afb7f56 100644 --- a/app/models/study_question.py +++ b/app/models/study_question.py @@ -11,6 +11,7 @@ from datetime import datetime from pgvector.sqlalchemy import Vector from sqlalchemy import BigInteger, Boolean, DateTime, ForeignKey, Integer, SmallInteger, String, Text +from sqlalchemy.dialects.postgresql import JSONB from sqlalchemy.orm import Mapped, mapped_column, relationship from core.database import Base @@ -69,6 +70,16 @@ class StudyQuestion(Base): ) embedding_model: Mapped[str | None] = mapped_column(String(120)) + # PR-12-A 후속: related-types 영속 캐시. 임베딩 ready 워커가 채우고, + # 같은 토픽 다른 문제 ready 시 related_computed_at=NULL 마킹 → 다음 cron 재계산. + related_repeat: Mapped[list | None] = mapped_column(JSONB) + related_similar: Mapped[list | None] = mapped_column(JSONB) + related_repeat_round_count: Mapped[int | None] = mapped_column(Integer) + related_similar_round_count: Mapped[int | None] = mapped_column(Integer) + related_repeat_grade: Mapped[str | None] = mapped_column(String(50)) + related_computed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True)) + related_threshold_version: Mapped[str | None] = mapped_column(String(20)) + created_at: Mapped[datetime] = mapped_column( DateTime(timezone=True), default=datetime.now, nullable=False ) diff --git a/app/services/study/related_types.py b/app/services/study/related_types.py index cf8d1b9..4945303 100644 --- a/app/services/study/related_types.py +++ b/app/services/study/related_types.py @@ -32,6 +32,11 @@ REPEAT_K = 10 SIMILAR_K = 8 RELATED_QUERY_LIMIT = 50 # cosine top-K 1차 fetch (회차 필터 + threshold split 후 K cap) +# 임계값 fingerprint — 임계값 변경 시 일괄 캐시 invalidate 키. 상수 갱신 후 +# `UPDATE study_questions SET related_computed_at=NULL WHERE related_threshold_version != '<신>'` +# 한 번 돌리면 cron 이 자동 재계산. +THRESHOLD_VERSION = f"v1_{REPEAT_THRESHOLD}_{SIMILAR_THRESHOLD}" + # PR-12-B: 출제 단계 spacing (한 세션 안 같은 유형 과밀 방지). 회차 무관. SPACING_THRESHOLD = SIMILAR_THRESHOLD # 0.85 PER_TYPE_CAP = 2 # local neighbor cap (transitive cluster 보장 X) @@ -114,6 +119,41 @@ def compute_repeat_grade( return None +def serialize_candidates(cands: list[RelatedCandidate]) -> list[dict]: + """RelatedCandidate → JSONB 저장용 dict. study_topic_id 는 부모 row 와 동일하므로 생략.""" + return [ + { + "id": c.id, + "sim": round(c.similarity, 4), + "exam_round": c.exam_round, + "exam_question_number": c.exam_question_number, + "subject": c.subject, + "scope": c.scope, + "qtext": c.question_text, # 이미 80자 truncate 된 상태 + } + for c in cands + ] + + +def deserialize_candidates(rows: list[dict] | None, study_topic_id: int) -> list[RelatedCandidate]: + """JSONB → RelatedCandidate. NULL 이면 빈 리스트.""" + if not rows: + return [] + return [ + RelatedCandidate( + id=int(r["id"]), + study_topic_id=study_topic_id, + question_text=r.get("qtext") or "", + subject=r.get("subject"), + scope=r.get("scope"), + exam_round=r.get("exam_round"), + exam_question_number=r.get("exam_question_number"), + similarity=float(r.get("sim") or 0.0), + ) + for r in rows + ] + + def _norm_round(s: str | None) -> str: return (s or "").strip() diff --git a/app/workers/study_question_embed_worker.py b/app/workers/study_question_embed_worker.py index 6fba682..18be04c 100644 --- a/app/workers/study_question_embed_worker.py +++ b/app/workers/study_question_embed_worker.py @@ -30,6 +30,11 @@ from sqlalchemy.ext.asyncio import AsyncSession from ai.client import AIClient from core.database import async_session from models.study_question import StudyQuestion +from services.study.related_types import ( + THRESHOLD_VERSION, + classify_related_for_question, + serialize_candidates, +) # 별도 process 진입점(예: docker exec python -c) 에서 워커 단독 실행 시 mapper # 초기화 실패 방지를 위해 relationship + ForeignKey chain 의 모든 참조 모델 import. # 운영 path(main lifespan) 는 라우터 import 로 자동 등록되지만 defensive. @@ -112,9 +117,62 @@ async def _process_one(session: AsyncSession, qid: int, client: AIClient) -> boo q.updated_at = q.embedding_updated_at await session.commit() logger.info("study_q_embed_ok qid=%s len=%d", qid, len(vec) if vec else 0) + + # 임베딩 ready 직후 related-types 캐시 채우고 같은 토픽 다른 문제 invalidate. + # 실패해도 임베딩 자체는 성공이므로 ok 반환 (다음 cron 틱에서 _refresh_related_for 가 처리). + try: + await _fill_related_cache(session, qid) + await _invalidate_topic_related(session, q.study_topic_id, exclude_qid=qid) + await session.commit() + except Exception as e: + logger.warning("study_q_related_fill_failed qid=%s err=%s: %s", qid, type(e).__name__, e) + await session.rollback() return True +async def _fill_related_cache(session: AsyncSession, qid: int) -> None: + """단일 문제의 related 결과를 계산해서 study_questions 캐시 컬럼 7개에 저장.""" + q = await session.get(StudyQuestion, qid) + if q is None or q.deleted_at is not None or q.embedding_status != "ready": + return + src_round = (q.exam_round or "").strip() + if not src_round: + # 회차 미지정 → 빈 캐시. 다음 회차 입력 시 stale 마킹 후 재계산. + q.related_repeat = [] + q.related_similar = [] + q.related_repeat_round_count = 0 + q.related_similar_round_count = 0 + q.related_repeat_grade = None + q.related_computed_at = datetime.now(timezone.utc) + q.related_threshold_version = THRESHOLD_VERSION + return + cls = await classify_related_for_question(session, user_id=q.user_id, source=q) + q.related_repeat = serialize_candidates(cls.repeat) + q.related_similar = serialize_candidates(cls.similar) + q.related_repeat_round_count = cls.repeat_round_count + q.related_similar_round_count = cls.similar_round_count + q.related_repeat_grade = cls.repeat_grade + q.related_computed_at = datetime.now(timezone.utc) + q.related_threshold_version = THRESHOLD_VERSION + + +async def _invalidate_topic_related( + session: AsyncSession, study_topic_id: int, *, exclude_qid: int +) -> None: + """같은 토픽의 다른 ready 문제들의 related 캐시를 stale 마킹.""" + await session.execute( + update(StudyQuestion) + .where( + StudyQuestion.study_topic_id == study_topic_id, + StudyQuestion.id != exclude_qid, + StudyQuestion.embedding_status == "ready", + StudyQuestion.deleted_at.is_(None), + StudyQuestion.related_computed_at.is_not(None), # 이미 stale 인 행 재마킹 안 함 + ) + .values(related_computed_at=None) + ) + + async def run() -> None: """APScheduler cron 진입점. status in {none, failed, stale} 행을 BATCH_SIZE 만큼 처리.""" async with async_session() as session: @@ -143,3 +201,37 @@ async def run() -> None: logger.info("study_q_embed_run done ok=%d/%d", ok_count, len(rows)) finally: await client.close() + + +# ─── related cache 재계산 워커 (PR-12-A 후속) ─── +RELATED_REFRESH_BATCH = 20 # 한 cron 틱에 채울 stale 행 수 + +async def refresh_stale_related() -> None: + """related_computed_at IS NULL 인 ready 행을 재계산. 임베딩 워커와 분리한 별도 cron. + embedding 자체는 안 만지고 related 캐시만 채움 — 외부 호출 없으므로 빠름 (HNSW 검색만).""" + async with async_session() as session: + rows = ( + await session.execute( + select(StudyQuestion.id) + .where( + StudyQuestion.deleted_at.is_(None), + StudyQuestion.embedding_status == "ready", + StudyQuestion.related_computed_at.is_(None), + ) + .order_by(StudyQuestion.updated_at.asc()) + .limit(RELATED_REFRESH_BATCH) + ) + ).scalars().all() + if not rows: + return + logger.info("study_q_related_refresh candidates=%d", len(rows)) + ok = 0 + for qid in rows: + try: + await _fill_related_cache(session, qid) + await session.commit() + ok += 1 + except Exception as e: + logger.warning("study_q_related_refresh_failed qid=%s err=%s: %s", qid, type(e).__name__, e) + await session.rollback() + logger.info("study_q_related_refresh done ok=%d/%d", ok, len(rows)) diff --git a/migrations/220_study_q_related_cache.sql b/migrations/220_study_q_related_cache.sql new file mode 100644 index 0000000..e892643 --- /dev/null +++ b/migrations/220_study_q_related_cache.sql @@ -0,0 +1,22 @@ +-- 220_study_q_related_cache.sql +-- PR-12-A 후속: study_questions 에 related-types 결과 영속 캐시. +-- 매 진입마다 HNSW cosine 검색 반복하던 비용을 1회 계산 + 영속 저장으로 전환. +-- +-- 임베딩 워커가 ready 처리 후 같은 트랜잭션에서 채움. 같은 topic 다른 문제가 +-- ready 가 되면 invalidation 으로 related_computed_at = NULL 마킹 → cron 재계산. +-- +-- 컬럼: +-- related_repeat / related_similar JSONB — [{id, sim, exam_round, exam_question_number, subject, scope, qtext}] +-- related_repeat_round_count / related_similar_round_count INTEGER — 회차 distinct 수 +-- related_repeat_grade VARCHAR(50) — 단골/잘 나오는 반복 출제/반복 출제/신출/빈출 (또는 NULL) +-- related_computed_at TIMESTAMPTZ — NULL = stale, 다음 cron 재계산 대상 +-- related_threshold_version VARCHAR(20) — 'v1_0.95_0.85' 같은 임계값 fingerprint + +ALTER TABLE study_questions + ADD COLUMN IF NOT EXISTS related_repeat JSONB, + ADD COLUMN IF NOT EXISTS related_similar JSONB, + ADD COLUMN IF NOT EXISTS related_repeat_round_count INTEGER, + ADD COLUMN IF NOT EXISTS related_similar_round_count INTEGER, + ADD COLUMN IF NOT EXISTS related_repeat_grade VARCHAR(50), + ADD COLUMN IF NOT EXISTS related_computed_at TIMESTAMPTZ, + ADD COLUMN IF NOT EXISTS related_threshold_version VARCHAR(20); diff --git a/migrations/221_study_q_related_stale_idx.sql b/migrations/221_study_q_related_stale_idx.sql new file mode 100644 index 0000000..95598fd --- /dev/null +++ b/migrations/221_study_q_related_stale_idx.sql @@ -0,0 +1,6 @@ +-- 221_study_q_related_stale_idx.sql +-- 워커가 stale (related_computed_at IS NULL) 행을 토픽별로 빠르게 찾기 위한 partial index. + +CREATE INDEX IF NOT EXISTS idx_study_q_related_stale + ON study_questions (study_topic_id, related_computed_at) + WHERE deleted_at IS NULL AND embedding_status = 'ready' AND related_computed_at IS NULL;