"""study_weakness — Eid study-weakness derived-snapshot worker (zero LLM, SQL aggregation). W3-2.

Source of the {weakness_snapshot_block}/{habit_signal_block} placeholders required by
the study overlay (study.txt). The weakness/attitude "judgment" is made by code
(SQL aggregation + bounded tier) — the LLM only translates (study_diagnosis surface).

Primary aggregation surface = study_question_progress.pattern_state (labels
precomputed by learning_pattern.py):
  chronic_wrong = wrong >= 2 among the 3 most recent attempts
  regressed     = wrong again after recovery
  unsure        = latest answer is "don't know"
Coverage gap = study_questions LEFT JOIN progress (unanswered) anti-join.
overdue = due_at <= now & stage < 4.

Append-only: every run INSERTs a new snapshot into eid_study_weakness (stamped
actor='eid' + source_generated_at). "Current" = the latest active row.
UPDATE/DELETE are blocked by a DB RULE. CronTrigger nightly (main.py).

Thresholds are tuning settings (not hard gates). conservative = the side that
makes fewer judgments (below the sample minimum the tier is capped at watch).
Pure judgment/format functions = services/study/weakness_compute.py (shared by
worker and surface).
"""
from __future__ import annotations

import logging
from collections import defaultdict
from datetime import datetime, timezone

from sqlalchemy import and_, exists, func, or_, select

from core.database import async_session
from models.eid_review_set_draft import EidReviewSetDraft
from models.eid_study_weakness import EidStudyWeakness
from models.study_question import StudyQuestion
from models.study_question_progress import StudyQuestionProgress
from models.study_quiz_session import StudyQuizSession
from models.study_topic import StudyTopic
from models.user import User  # noqa: F401 (defensive: mapper initialization)
from services.study.weakness_compute import decide_tier, overall_trend, topic_trend

logger = logging.getLogger("study_weakness")

# ── Tuning thresholds (not hard gates · conservative = fewer judgments). Single place of management. ──
MIN_TOPIC_ATTEMPTS = 5  # below this sample size → no weakness verdict (capped at watch / "topic to keep an eye on")
CHRONIC_FOCUS = 3  # chronic >= → focus tier
RELAPSE_FOCUS = 2  # relapsed >= → focus tier
REVIEW_OVERDUE = 5  # overdue >= → review tier (on its own)
RECENT_SESSIONS = 5  # trend decision window
ABANDON_WINDOW = 20  # session abandon rate over the most recent N sessions
DRAFT_CAP = 50  # max number of questions in a review-set draft
SKEW_SHARE = 0.7  # a topic holding >= this share of all attempts is reported as skew_topic
AVOIDANCE_TOP_N = 3  # number of most-"unsure" topics reported as avoidance_topics

# Sort priority of tiers inside a snapshot (lower = listed first).
_TIER_RANK = {"focus": 0, "review": 1, "watch": 2}


async def _pattern_counts(session, user_id: int, topic_id: int) -> dict[str, int]:
    """Count this user's progress rows in a topic, grouped by pattern_state.

    Returns a mapping ``pattern_state -> row count``. A NULL/empty
    pattern_state is reported under the key ``"none"``.
    """
    rows = (
        await session.execute(
            select(StudyQuestionProgress.pattern_state, func.count())
            .where(
                StudyQuestionProgress.user_id == user_id,
                StudyQuestionProgress.study_topic_id == topic_id,
            )
            .group_by(StudyQuestionProgress.pattern_state)
        )
    ).all()
    # Accumulate instead of assigning: the NULL group and a literal "none"
    # group both map to the key "none", and a plain dict comprehension would
    # silently keep only one of them (under-counting attempted questions).
    counts: dict[str, int] = defaultdict(int)
    for state, n in rows:
        counts[state or "none"] += n
    return dict(counts)


async def _overdue_count(session, user_id: int, topic_id: int, now: datetime) -> int:
    """Count progress rows whose due_at is set and <= ``now`` and whose
    review_stage is NULL or below 4."""
    return (
        await session.execute(
            select(func.count())
            .select_from(StudyQuestionProgress)
            .where(
                StudyQuestionProgress.user_id == user_id,
                StudyQuestionProgress.study_topic_id == topic_id,
                StudyQuestionProgress.due_at.is_not(None),
                StudyQuestionProgress.due_at <= now,
                or_(
                    StudyQuestionProgress.review_stage.is_(None),
                    StudyQuestionProgress.review_stage < 4,
                ),
            )
        )
    ).scalar_one()


async def _coverage_gap(session, user_id: int, topic_id: int) -> int:
    """Number of active questions this user has never attempted = anti-join (docstring contract).

    Deliberately NOT computed as ``total_active - attempted``: progress rows of
    soft-deleted/inactive questions remain (RESTRICT FK) and used to inflate
    ``attempted``, under-counting the gap (W3 review #2).
    """
    return (
        await session.execute(
            select(func.count())
            .select_from(StudyQuestion)
            .where(
                StudyQuestion.study_topic_id == topic_id,
                StudyQuestion.is_active.is_(True),
                StudyQuestion.deleted_at.is_(None),
                ~exists().where(
                    and_(
                        StudyQuestionProgress.study_question_id == StudyQuestion.id,
                        StudyQuestionProgress.user_id == user_id,
                    )
                ),
            )
        )
    ).scalar_one()


async def _recent_sessions(session, user_id: int, topic_id: int) -> list[dict]:
    """Return the counters of the latest RECENT_SESSIONS "done" quiz sessions
    of this user/topic, newest first."""
    rows = (
        await session.execute(
            select(
                StudyQuizSession.newly_correct_count,
                StudyQuizSession.relapsed_count,
                StudyQuizSession.chronic_remaining_count,
            )
            .where(
                StudyQuizSession.user_id == user_id,
                StudyQuizSession.study_topic_id == topic_id,
                StudyQuizSession.status == "done",
            )
            .order_by(StudyQuizSession.created_at.desc())
            .limit(RECENT_SESSIONS)
        )
    ).all()
    return [
        {"newly_correct": nc, "relapsed": rl, "chronic_remaining": cr}
        for nc, rl, cr in rows
    ]


async def _draft_question_ids(session, user_id: int, topic_id: int) -> list[int]:
    """Return ids of this user's questions in the topic whose pattern_state is
    chronic_wrong or regressed, in ascending question-id order."""
    rows = (
        await session.execute(
            select(StudyQuestionProgress.study_question_id)
            .where(
                StudyQuestionProgress.user_id == user_id,
                StudyQuestionProgress.study_topic_id == topic_id,
                StudyQuestionProgress.pattern_state.in_(["chronic_wrong", "regressed"]),
            )
            # Stable order: the caller truncates to DRAFT_CAP, and without an
            # ORDER BY the surviving subset could differ from run to run.
            .order_by(StudyQuestionProgress.study_question_id)
        )
    ).scalars().all()
    return [int(q) for q in rows]


async def _abandon_rate(session, user_id: int) -> float:
    """Share of "abandoned" sessions among the user's latest ABANDON_WINDOW
    sessions (0.0 when the user has no sessions)."""
    rows = (
        await session.execute(
            select(StudyQuizSession.status)
            .where(StudyQuizSession.user_id == user_id)
            .order_by(StudyQuizSession.created_at.desc())
            .limit(ABANDON_WINDOW)
        )
    ).scalars().all()
    if not rows:
        return 0.0
    return rows.count("abandoned") / len(rows)


def _draft_reason(chronic: int, relapsed: int) -> str:
    """Derive the draft reason from the contributing patterns (not hard-coded — W3 review #3).

    Returns "relapse" when only relapsed contributed, "chronic" when only
    chronic contributed, and "mixed" otherwise (both or neither).
    """
    if relapsed and not chronic:
        return "relapse"
    if chronic and not relapsed:
        return "chronic"
    return "mixed"


def _skew_topic(attempts_by_topic: dict[str, int]) -> str | None:
    """Return the topic name that dominates the user's attempts, or None.

    A topic is reported only when it has at least MIN_TOPIC_ATTEMPTS attempts
    and holds at least SKEW_SHARE of all attempts. Ties resolve to the first
    topic in mapping order.
    """
    if not attempts_by_topic:
        return None
    top_name, top_n = max(attempts_by_topic.items(), key=lambda kv: kv[1])
    total = sum(attempts_by_topic.values()) or 1  # guard the all-zero case
    if top_n >= MIN_TOPIC_ATTEMPTS and top_n >= SKEW_SHARE * total:
        return top_name
    return None


async def run() -> None:
    """APScheduler cron entry point.

    For every user with focused, non-deleted topics, derive a weakness
    snapshot and append it to eid_study_weakness; when focus/review topics
    contributed chronic/regressed questions, also append a review-set draft
    bound to that snapshot. Everything is committed once at the end.
    """
    now = datetime.now(timezone.utc)
    inserted = 0
    async with async_session() as session:
        topics = (
            await session.execute(
                select(StudyTopic.id, StudyTopic.user_id, StudyTopic.name).where(
                    StudyTopic.focused_at.is_not(None),
                    StudyTopic.deleted_at.is_(None),
                )
            )
        ).all()
        if not topics:
            return

        by_user: dict[int, list] = defaultdict(list)
        for t in topics:
            by_user[t.user_id].append(t)

        for uid, topic_list in by_user.items():
            weaknesses: list[dict] = []
            topic_trends: list[str] = []
            unsure_topics: list[tuple[str, int]] = []
            attempts_by_topic: dict[str, int] = {}
            draft_qids: list[int] = []
            draft_chronic = 0
            draft_relapsed = 0
            total_attempted = 0
            total_overdue = 0

            for t in topic_list:
                counts = await _pattern_counts(session, uid, t.id)
                attempted = sum(counts.values())  # progress row count = questions attempted
                chronic = counts.get("chronic_wrong", 0)
                relapsed = counts.get("regressed", 0)
                unsure = counts.get("unsure", 0)
                overdue = await _overdue_count(session, uid, t.id, now)
                coverage_gap = await _coverage_gap(session, uid, t.id)
                trend = topic_trend(await _recent_sessions(session, uid, t.id))

                total_attempted += attempted
                total_overdue += overdue
                # Sum rather than assign so two topics sharing a name are
                # merged instead of the later one overwriting the earlier.
                attempts_by_topic[t.name] = attempts_by_topic.get(t.name, 0) + attempted
                if unsure:
                    unsure_topics.append((t.name, unsure))

                tier = decide_tier(
                    chronic=chronic,
                    relapsed=relapsed,
                    overdue=overdue,
                    unsure=unsure,
                    attempted=attempted,
                    min_attempts=MIN_TOPIC_ATTEMPTS,
                    chronic_focus=CHRONIC_FOCUS,
                    relapse_focus=RELAPSE_FOCUS,
                    review_overdue=REVIEW_OVERDUE,
                )
                if tier is None:
                    continue

                topic_trends.append(trend)
                weaknesses.append({
                    "topic_id": t.id,
                    "topic": t.name,
                    "chronic": chronic,
                    "relapsed": relapsed,
                    "unsure": unsure,
                    "coverage_gap": coverage_gap,
                    "overdue": overdue,
                    "trend": trend,
                    "tier": tier,
                })
                if tier in ("focus", "review"):
                    draft_qids.extend(await _draft_question_ids(session, uid, t.id))
                    draft_chronic += chronic
                    draft_relapsed += relapsed

            # Sort by weakness strength (focus > review > watch, then most chronic first).
            weaknesses.sort(
                key=lambda w: (_TIER_RANK.get(w["tier"], 9), -w["chronic"], -w["relapsed"])
            )

            # Attitude signals (user-level).
            unsure_topics.sort(key=lambda x: -x[1])
            habits = {
                "avoidance_topics": [n for n, _ in unsure_topics[:AVOIDANCE_TOP_N]],
                "session_abandon_rate": await _abandon_rate(session, uid),
                "stale_due_count": total_overdue,
                "skew_topic": _skew_topic(attempts_by_topic),
            }

            weakness = EidStudyWeakness(
                user_id=uid,
                weaknesses=weaknesses,
                habit_signals=habits,
                trend_label=overall_trend(topic_trends),
                sample_attempts=total_attempted,
                is_shallow_sample=total_attempted < MIN_TOPIC_ATTEMPTS,
                status="active",
                actor="eid",
                source_generated_at=now,
            )
            session.add(weakness)
            # Obtain weakness.id for the draft binding; the commit happens once at the end (append-only).
            await session.flush()

            if draft_qids:
                # Order-preserving de-duplication before applying the cap.
                uniq = list(dict.fromkeys(draft_qids))
                session.add(EidReviewSetDraft(
                    user_id=uid,
                    study_topic_id=None,
                    question_ids=uniq[:DRAFT_CAP],
                    reason=_draft_reason(draft_chronic, draft_relapsed),
                    actor="eid",
                    source_weakness_id=weakness.id,  # snapshot binding (W3 review #5/#6)
                    source_generated_at=now,
                ))
            inserted += 1

        await session.commit()

    if inserted:
        logger.info("study_weakness snapshot users=%d at=%s", inserted, now.isoformat())