Files
hyungi_document_server/app/workers/study_explanation_worker.py
T
Hyungi Ahn ff41feb3e3 fix(study): Phase 4-A parse_fail 디버깅 — 파서 fallback + raw 저장
운영 데이터에서 4-A study_question_jobs 의 33/114 가 'envelope JSON parse failed'
로 종결. parse_json_response 의 balanced 정규식이 못 잡는 케이스 다수 추정.

원인 분류 위해:
1. 파서 보강 (app/ai/client.py)
   - 기존 4단계 파싱 (fenced / balanced finditer / 전체 cleaned) 보존
   - 5단계 fallback 추가: first '{' ~ last '}' greedy slice → json.loads
   - envelope JSON 안에 내부 따옴표/뉴라인/escape 때문에 balanced 가 못 잡는
     케이스 방어. 모델이 JSON 앞뒤 자유 텍스트 섞어도 본체만 추출.
   - 회귀 위험 낮은 추가만 (앞 단계 성공 시 즉시 반환)

2. parse_fail 시 raw preview 저장 (study_explanation_worker)
   - 3개 inline parse_fail 분기 (not_dict / invalid_answer_choice /
     empty_explanation_md) 모두 _save_raw_preview() 헬퍼 호출
   - job.payload.debug_raw_preview = raw_text[:1000]
   - job.payload.parse_fail_reason = 분류 키
   - 향후 parse_fail row 의 payload 분석으로 원인 정확히 분류 가능

다음 단계: 배포 후 재발생 추이 + raw preview 분석 → prompt 추가 강화 또는
parser 추가 보강.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-02 07:48:10 +09:00

228 lines
9.0 KiB
Python

"""Phase 4-A 풀이 prefetch worker — wrong/unsure 문제의 AI 풀이를 batch 로 미리 생성.
Plan: ~/.claude/plans/nifty-sparking-spindle.md
study_question_jobs (kind='explanation') row 1건을 받아 처리:
1. RAG 근거 수집 (PR-3 의 explanation_rag.py 재사용)
2. evidence 둘 다 비어있으면 LLM 호출 X → status='skipped'
3. MLX primary 호출 (gate Semaphore(1) 공유) → envelope JSON
4. 환각 가드 — answer_choice == question.correct_choice 검증
5. 통과 시 study_questions.ai_explanation 캐시 박기
terminal status (completed/failed/skipped) 는 completed_at 항상 기록.
재시도 정책 — guard_fail/evidence_missing 은 final, 그 외 (llm_timeout/parse_fail/unknown)
는 attempts < max_attempts 면 pending 으로 복귀.
"""
from __future__ import annotations

import asyncio
import json
import logging
import re
from datetime import datetime, timezone
from pathlib import Path

import httpx
from sqlalchemy.ext.asyncio import AsyncSession

from ai.client import AIClient, parse_json_response
from models.study_question import StudyQuestion
from models.study_question_job import StudyQuestionJob
from services.search.llm_gate import get_mlx_gate
from services.study.explanation_rag import (
    gather_explanation_context,
    render_evidence_block,
)
# Name of the envelope prompt template, resolved by _load_envelope_prompt
# under the ``prompts`` directory next to this package.
_ENVELOPE_PROMPT_FILE: str = "study_explanation_envelope.txt"

# Template text, filled on first use and reused for the process lifetime.
_envelope_template_cache: str | None = None

# Same safety margin as PR-3's LLM_TIMEOUT_S (26B model averages ~10s; value
# chosen with the serialized MLX gate in mind).
LLM_TIMEOUT_S: float = 30.0

logger = logging.getLogger(__name__)
def _load_envelope_prompt() -> str:
    """Return the envelope prompt template text, reading it from disk only once.

    The file ``_ENVELOPE_PROMPT_FILE`` is looked up in the ``prompts`` directory
    two levels above this module (i.e. the package root's ``prompts/``), read as
    UTF-8, and memoized in the module-level ``_envelope_template_cache``.
    """
    global _envelope_template_cache
    if _envelope_template_cache is not None:
        return _envelope_template_cache
    template_path = Path(__file__).resolve().parents[1] / "prompts" / _ENVELOPE_PROMPT_FILE
    _envelope_template_cache = template_path.read_text(encoding="utf-8")
    return _envelope_template_cache
# Matches exactly the placeholders the envelope template defines; any other
# brace text in the template (e.g. a JSON example) is left untouched.
_ENVELOPE_PLACEHOLDER_RE = re.compile(
    r"\{(question_text|choice_[1-4]|correct_choice"
    r"|documents_evidence_block|questions_evidence_block)\}"
)


def _render_envelope_prompt(
    q: StudyQuestion,
    doc_block: str,
    q_block: str,
    *,
    template: str | None = None,
) -> str:
    """Fill the envelope prompt template for one question.

    Substitution is a single pass over the template. Text inserted for one
    placeholder (question text, choices, evidence blocks) is never re-scanned,
    so user or evidence content that happens to contain e.g. ``{correct_choice}``
    cannot be expanded into the answer key or another block.

    Args:
        q: Question providing ``question_text``, ``choice_1``..``choice_4`` and
            ``correct_choice``. ``None`` text fields render as an empty string.
        doc_block: Rendered document-evidence block.
        q_block: Rendered question-evidence block.
        template: Template text to fill. Defaults to the cached envelope prompt
            file from ``_load_envelope_prompt()``.

    Returns:
        The template with every known placeholder replaced. Unknown ``{...}``
        sequences are kept verbatim.
    """
    if template is None:
        template = _load_envelope_prompt()
    values = {
        "question_text": q.question_text or "",
        "choice_1": q.choice_1 or "",
        "choice_2": q.choice_2 or "",
        "choice_3": q.choice_3 or "",
        "choice_4": q.choice_4 or "",
        "correct_choice": str(q.correct_choice),
        "documents_evidence_block": doc_block,
        "questions_evidence_block": q_block,
    }
    # A callable replacement inserts values literally (no backslash-escape
    # processing), unlike a replacement string.
    return _ENVELOPE_PLACEHOLDER_RE.sub(lambda m: values[m.group(1)], template)
def _save_raw_preview(job: StudyQuestionJob, raw_text: str | None, reason: str) -> None:
    """Store a truncated raw LLM response on ``job.payload`` for parse_fail triage.

    Writes ``debug_raw_preview`` (first 1000 characters of ``raw_text``) and
    ``parse_fail_reason`` (classification key). A new dict is assigned instead
    of mutating in place, presumably so the ORM detects the change on a plain
    JSON column (TODO confirm column type).
    """
    payload = dict(job.payload or {})
    payload["debug_raw_preview"] = (raw_text or "")[:1000]
    payload["parse_fail_reason"] = reason
    job.payload = payload


def _coerce_answer_choice(value: object) -> int | None:
    """Return ``value`` as a choice number 1-4, or ``None`` if it is not one.

    Accepts a JSON integer, or a string holding a single digit 1-4 (e.g. "3",
    a common LLM deviation). ``bool`` is rejected explicitly: it subclasses
    ``int``, so ``True`` would otherwise be read as choice 1.
    """
    if isinstance(value, bool):
        return None
    if isinstance(value, str):
        text = value.strip()
        return int(text) if text in ("1", "2", "3", "4") else None
    if isinstance(value, int) and value in (1, 2, 3, 4):
        return value
    return None


async def _call_primary_model(prompt: str) -> tuple[str | None, str]:
    """Send ``prompt`` to the MLX primary model while holding the shared MLX gate.

    ``LLM_TIMEOUT_S`` bounds only the model call, not the wait for the gate.
    The client is always closed. Exceptions from the call (e.g. the timeout)
    propagate unchanged.

    Returns:
        ``(raw_text, model_name)``. ``model_name`` is ``ai.primary.model`` when
        the client exposes it, otherwise "primary".
    """
    ai_client = AIClient()
    try:
        async with get_mlx_gate():
            async with asyncio.timeout(LLM_TIMEOUT_S):
                raw_text = await ai_client.call_primary(prompt)
        primary_cfg = getattr(ai_client.ai, "primary", None)
        primary_name = getattr(primary_cfg, "model", "primary")
    finally:
        await ai_client.close()
    return raw_text, primary_name


async def run_explanation_job(session: AsyncSession, job: StudyQuestionJob) -> None:
    """Process one ``study_question_jobs`` row (kind='explanation'). The caller commits.

    Expects ``job.status == 'pending'`` on entry. Increments ``job.attempts``,
    marks the job 'processing' and flushes. It then leaves the job in one of
    these states:

    * ``skipped`` — the question is deleted or missing, or there is no RAG
      evidence at all (``error_code='evidence_missing'``, no LLM call).
    * ``completed`` — the question already had a ready explanation, or the
      model's envelope passed validation and the answer guard. Error fields
      left by earlier attempts are cleared.
    * ``failed`` — guard_fail (the model's answer differs from
      ``correct_choice``), or a retryable error once ``max_attempts`` is
      reached.
    * ``pending`` — a retryable error (llm_timeout / parse_fail / unknown)
      with attempts remaining.

    Every terminal state records ``completed_at``. On an envelope parse_fail,
    the first 1000 characters of the raw response are kept in
    ``job.payload['debug_raw_preview']`` together with ``parse_fail_reason``.
    """
    def now() -> datetime:
        return datetime.now(timezone.utc)

    # Claim the job for this attempt before doing any slow work.
    job.attempts += 1
    job.status = "processing"
    job.started_at = now()
    await session.flush()

    try:
        question = await session.get(StudyQuestion, job.study_question_id)
        if question is None or question.deleted_at is not None:
            # Question deleted: close the job as skipped.
            job.error_code = "evidence_missing"
            job.error_message = "question deleted or missing"
            job.status = "skipped"
            job.completed_at = now()
            return

        # Race guard: the real-time (PR-3) path may already have stored a ready explanation.
        if question.ai_explanation_status == "ready":
            # Clear errors from earlier attempts so a completed row is not
            # counted in parse_fail / timeout analysis.
            job.error_code = None
            job.error_message = None
            job.status = "completed"
            job.completed_at = now()
            return

        # 1. Gather RAG evidence.
        ctx = await gather_explanation_context(session, job.user_id, question)
        if not ctx.documents and not ctx.questions:
            # No evidence: skip the LLM call and mark both job and question skipped.
            job.error_code = "evidence_missing"
            job.error_message = "no document/question evidence in this topic"
            job.status = "skipped"
            job.completed_at = now()
            question.ai_explanation_status = "skipped"
            question.updated_at = now()
            return

        # 2. Build the prompt and call the MLX primary model.
        prompt = _render_envelope_prompt(
            question,
            render_evidence_block(ctx.documents),
            render_evidence_block(ctx.questions),
        )
        raw_text, primary_name = await _call_primary_model(prompt)
        if not raw_text or not raw_text.strip():
            # Treat an empty reply like a timeout so that it is retried.
            job.error_code = "llm_timeout"
            job.error_message = "empty response from primary"
            return

        # 3. Parse and validate the envelope. Each parse_fail keeps a raw preview.
        envelope = parse_json_response(raw_text)
        if not isinstance(envelope, dict):
            job.error_code = "parse_fail"
            job.error_message = "envelope JSON parse failed"
            _save_raw_preview(job, raw_text, "not_dict")
            return

        raw_choice = envelope.get("answer_choice")
        answer_choice = _coerce_answer_choice(raw_choice)
        if answer_choice is None:
            job.error_code = "parse_fail"
            job.error_message = f"invalid answer_choice: {raw_choice!r}"
            _save_raw_preview(job, raw_text, "invalid_answer_choice")
            return

        explanation_md = envelope.get("explanation_md") or ""
        if not isinstance(explanation_md, str):
            # A list/object/number here would otherwise crash on .strip() and
            # be misfiled as 'unknown' without a raw preview.
            job.error_code = "parse_fail"
            job.error_message = (
                f"invalid explanation_md type: {type(explanation_md).__name__}"
            )
            _save_raw_preview(job, raw_text, "invalid_explanation_md")
            return
        if not explanation_md.strip():
            job.error_code = "parse_fail"
            job.error_message = "empty explanation_md"
            _save_raw_preview(job, raw_text, "empty_explanation_md")
            return

        confidence = envelope.get("confidence")

        # 4. Hallucination guard: the model must land on the stored correct answer.
        if answer_choice != question.correct_choice:
            job.error_code = "guard_fail"
            job.error_message = (
                f"answer_choice={answer_choice} != correct_choice={question.correct_choice}"
            )
            job.status = "failed"
            job.completed_at = now()
            question.ai_explanation_status = "failed"
            question.updated_at = now()
            return

        # 5. Success. confidence is not gated yet (threshold deferred to Phase 4-B);
        # it is kept in the payload for later analysis.
        payload = dict(job.payload or {})
        payload["confidence"] = confidence
        job.payload = payload

        generated_at = now()
        question.ai_explanation = explanation_md
        question.ai_explanation_status = "ready"
        question.ai_explanation_generated_at = generated_at
        question.ai_explanation_model = f"mlx:{primary_name}"
        question.updated_at = generated_at

        # Clear errors from earlier attempts; this attempt succeeded.
        job.error_code = None
        job.error_message = None
        job.status = "completed"
        job.completed_at = now()
        return
    except (asyncio.TimeoutError, httpx.HTTPError) as e:
        job.error_code = "llm_timeout"
        job.error_message = f"{type(e).__name__}: {e}"
        logger.warning(
            "study_explanation_job_timeout job_id=%s qid=%s: %s",
            job.id, job.study_question_id, e,
        )
    except (json.JSONDecodeError, ValueError) as e:
        job.error_code = "parse_fail"
        job.error_message = f"{type(e).__name__}: {e}"
        logger.warning(
            "study_explanation_job_parse_fail job_id=%s qid=%s: %s",
            job.id, job.study_question_id, e,
        )
    except Exception as e:
        # Unexpected failure. Set 'unknown' explicitly so the retry policy below
        # applies (a None error_code would be treated as non-retryable).
        job.error_code = "unknown"
        job.error_message = f"{type(e).__name__}: {e}"
        logger.exception(
            "study_explanation_job_unknown_fail job_id=%s qid=%s",
            job.id, job.study_question_id,
        )
    finally:
        # Terminal outcomes (skipped/completed/guard_fail) already changed the
        # status. Anything still 'processing' is retried while attempts remain.
        if job.status == "processing":
            retryable = job.error_code in ("llm_timeout", "parse_fail", "unknown")
            if retryable and job.attempts < job.max_attempts:
                job.status = "pending"  # picked up again next cycle
            else:
                job.status = "failed"
                job.completed_at = now()