Compare commits

...

18 Commits

Author SHA1 Message Date
hyungi 94b172e314 ops(ci): boot_smoke 스키마 어서션 max_migration 361→378 (현재 마이그 헤드)
지난 감사(361) 이후 마이그가 378(이번 publish_outbox attempts/failed 포함)까지 전진 →
boot_smoke 스키마 게이트의 하드코딩 기대값 갱신. purge/cand/uq 기대는 동일.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 13:30:53 +09:00
hyungi 9357d1592d fix(publish): 마이그 번호 377→378 (멀티세션 prod 377_domain_bucket 충돌 회피)
검수 fix 작업 중 다른 세션이 prod 에 377_domain_bucket(ee3b347)을 선점·배포 →
publish_outbox attempts/failed 마이그를 378 로 리넘버(브랜치를 ee3b347 로 rebase).
모델 주석도 mig378 로 정정. 내 fix 8건은 새 prod 커밋과 파일 무충돌(번호만 조정).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 13:23:16 +09:00
hyungi 832ea72784 fix(publish): backfill 스크립트 after_id 페이징 루프 (overflow 누락 방지)
backfill_publish_* 가 단일 호출(after_id=0, limit=PAGE)이라 PAGE 초과분이 누락(경고만)됐다.
docstring 은 이미 페이지 반복을 명시했으나 스크립트가 미구현. 함수 반환을 (count, last_id)로
바꾸고 3 스크립트를 last_id 기반 while 루프로 전량 처리. PAGE=5000 bounded tx.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 13:22:36 +09:00
hyungi d26b1150d8 fix(workers): presegment/csb 이벤트루프 blocking I/O to_thread 오프로드
- presegment_worker: fitz open/get_toc(동기 blocking, live 스테이지)를 to_thread 로 — 거대/손상
  PDF 파싱이 같은 루프의 1분 consumer + FastAPI 요청을 수백 ms~초 정지시키던 것 해소.
- csb_collector: 50MB PDF write_bytes + read_bytes(해시)를 to_thread 로 (R5 동형).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 13:22:36 +09:00
hyungi dcfed09530 fix(workers): marker 200-malformed json transient 분류 + classify summary 가시성
- marker_worker: resp.json() JSONDecodeError(200 응답 truncated/malformed body)가 catch-all
  except 로 _fail(non-retryable) 되던 것을 별도 except 로 raise → queue retry. transient
  연결흔들림이 영구 failed 로 박히는 것 차단.
- classify_worker: ai_summary fallback 비-deferrable 실패를 warning→error 로 격상. ai_summary
  NULL 완료는 digest/briefing 에서 조용히 제외되므로 운영 추적성 보강(best-effort 강등은 유지).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 13:22:36 +09:00
hyungi 7d882352b8 fix(mineru): 변환/워밍 self-timeout + OOM·행 시 엔진 재워밍 escalate
aio_do_parse 에 자체 타임아웃이 없어 vLLM 행 시 _engine_lock 을 영구 점유 → markdown
변환 전체 마비(컨테이너 재시작 전까지). 클라이언트(marker_worker)는 300s 로 포기하나
서버측 inflight 는 자동 취소 안 됨.

- _run_mineru 를 asyncio.wait_for(convert 600s / warmup 1200s)로 감싸 lock 점유 상한.
- 타임아웃·OOM/CUDA 류 실패 시 _warmup_done 리셋 → 다음 요청 재워밍. 재워밍도 실패하면
  _warmup_error → /ready 503 → healthcheck 재시작으로 escalate(영구 degradation 차단).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 13:22:36 +09:00
hyungi 7a8aced2a9 fix(workers): file_watcher 파일별 세션 격리 (사이클 전체 롤백 방지)
스캔 전체(Web+PKM)가 단일 세션·단일 commit 이라 한 파일 예외(rglob↔stat 사이 삭제로
FileNotFoundError, flush 오류 등)가 watch_inbox 전체를 raise·롤백 → 그 사이클 등록분을
모두 잃거나, 결정적 poison 파일이 매 사이클 같은 지점에서 중단시켜 그 뒤 파일 영구 미등록.

파일별 독립 세션+commit + try/continue 격리 (news_collector/csb_collector 동형).
file_hash 는 세션 밖에서 계산(커넥션 미점유), 무변경 파일은 쓰기/commit 없음.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 13:22:36 +09:00
hyungi d50be9f2e7 fix(publish): ingest_study 동시경합을 already_ingested 로 흡수 (500 회피)
같은 client_session_uuid 동시 POST 2건이 최초 멱등체크를 둘 다 통과 → 둘째가
(client_session_uuid, study_topic_id) uq(mig376) 위반으로 IntegrityError → 미처리 500.
데이터는 안전(원자 1-tx 롤백, SR 이중 advance 없음)이나 비우아한 500이 문제.

변이 구간(quiz_session insert ~ commit)을 try/except IntegrityError 로 감싸 승자 결과
재조회 후 already_ingested 반환. uuid 경합이 아닌 진짜 무결성 오류는 재조회 비어 re-raise.
멱등 응답 빌더 _already_ingested 헬퍼 추출(최초 체크 + 경합 흡수 공용).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 13:22:36 +09:00
hyungi b9f9d88d99 fix(publish): publish_outbox poison row head-of-line block 차단
배치 단일 트랜잭션이라 한 행의 예외가 배치 전체(앞 행 processed_at 포함)를 롤백 →
poison 행이 매 사이클 최저 id 로 재선택되어 후속 발행이 영구 정지. outbox 모델에
재시도/terminal 컬럼이 전무(processing_queue·study_jobs 의 per-item 격리 패턴 미적용).

- mig377: publish_outbox 에 attempts/failed_at 추가
- 워커: 행별 savepoint(begin_nested) 격리 — 예외 시 attempts++, MAX(5) 초과 시
  failed_at 스탬프(terminal) 후 select 제외. 실패 행은 rev 미소모(드문 gap 은 단일
  라이터·커밋순 부여라 viewer since-rev 증분 동기에 무해).

study_publish_enabled=false 기본이라 현재 inert, 발행 활성화(P0-1b) 전 선결.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 13:22:36 +09:00
hyungi d030a2b7b0 fix(deploy): fresh-DB/DR 부팅 — postgres initdb.d 마운트 제거
빈 볼륨 첫 기동 시 postgres 가 migrations/*.sql 을 psql autocommit 으로 실행해
스키마는 만들되 schema_migrations 스탬프를 안 남김 → fastapi init_db 가 documents
존재로 'fresh' 오판해 baseline 로드를 건너뛰고 001 부터 재replay → CREATE TABLE
users(IF NOT EXISTS 없음) 충돌 → DR/신규환경 부팅 크래시.

fresh-boot 을 init_db 의 baseline + migration runner 단일 경로로 일원화.
기존 prod 볼륨은 비어있지 않아 init scripts 미발동 = 무영향. 관련 docs 정정.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 13:22:36 +09:00
hyungi ee3b347fa7 feat(search): add domain_bucket rollup column (migration 377)
ai_domain(반자유 AI 분류, 드리프트)을 검색 스코프용 7버킷으로 결정적 롤업하는
STORED generated column. News 86% 기본제외 + 도메인 스코프 검색의 토대.
축: ai_domain(routing) 롤업 — category(UI) 아님.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 04:16:30 +00:00
hyungi a826872b0d ops(ai): deep 슬롯 제거 — 맥북 night-drain 보류, deep_summary 맥미니 일원화
사용자 결정(2026-06-29 '맥북 야간운행 의미없어'): config deep 슬롯(qwen-macbook) 제거 → deep_summary 가 primary(맥미니) 경로 복귀(config 주석 보증), use_deep/drain 도 맥미니 폴백. drain-keeper(GPU cron) 비활성. 맥북 mlx-vlm-server 는 OpenCode 로컬용 보존. inventory 선행 갱신(Update Rule). 효과: 멈췄던 deep_summary(ai_detail_summary, last id 59773)가 맥미니에서 재개 → 3→2 짧은 ai_summary 의 풀버전 백스톱 복원.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 08:23:23 +09:00
hyungi 4cdd30950c refactor(classify): summarize 콜을 tier triage 에 병합 (3콜→2콜)
B-1 3→2: p3a_short_summary triage 가 ai_summary 도 생산 → 별도 summarize 콜 제거. classify(domain/type)은 분리 유지(shadow probe 결과 결합 시 domain 노이즈 → 안전하게 요약만 병합). 본문 prefill 3회→2회 = Mac mini 부하 절감. >120K long_context·triage 파싱실패 시 summarize fallback 보존. shadow probe(Industrial_Safety 5문서) 검증: triage ai_summary 품질 legacy summarize 동급.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 16:55:12 +09:00
hyungi 495e1c786f refactor(search)!: /ask 고아 service·테스트·프롬프트 정리 (검색 단일화 Phase 2)
/ask 삭제로 0-consumer 된 자산 제거(3-gate 실증): search.py /ask 섹션(Citation/ConfirmedItem/AskDebug/AskResponse 모델 + 헬퍼 + _resolve_eval_identity) + 죽은 import 13개. service 4(classifier/verifier/refusal_gate/grounding_check). AIClient.call_classifier/call_verifier(고아). 프롬프트 2(classifier/verifier.txt). broken test 6. evidence/synthesis 는 공유(documents.py 등)라 유지. 실 pyflakes 클린(이전 세션 pyflakes 미설치로 검증 누락 → 설치 후 실검증).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 14:39:53 +09:00
hyungi 86a71ec4d1 refactor(search)!: /ask 프론트 UI 제거 (검색 단일화 — AI답변=eid /chat)
/documents 인라인 AI카드(askSearch·AskAnswerCard·Sparkles→/ask) + /ask 페이지·컴포넌트(components/ask, AskAnswer/Evidence/Results) + 고아 util(isQuestion)·type(types/ask) 제거. /documents=순수 문서검색, AI답변은 eid /chat 사이드바로 일원화. dangling ref 0(grep).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 12:48:13 +09:00
hyungi b6717c537f refactor(search)!: /ask + /ask/react 엔드포인트 삭제 (검색 단일화 1단계)
검색 단일화 결정(PKM 현황/계획서 2026-06-27): AI 답변을 eid /chat 으로 일원화. /ask(grounding-heavy 3-panel, 사용자 숨김) + /ask/react(eid /chat deep 과 동일 agentic_ask_loop 중복) 엔드포인트 제거. GET /(plain 검색) 유지. py_compile + pyflakes undefined-name 0. 잔여(AskResponse 모델·_resolve_eval_identity·/ask 전용 service)는 Phase 2 dead-code 정리.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 07:47:36 +09:00
hyungi 842ad14930 refactor(search): /ask 핸들러 오케스트레이션을 _run_ask 로 분리 (라우터=deps 해소만)
api 리뷰: ask() 529줄에 검색→evidence/classifier→refusal→synthesis→grounding/verifier→7-tier 재게이트→telemetry 가 인라인. body 를 _run_ask(plain params)로 분리, ask 는 FastAPI deps 해소 후 return await _run_ask(...). body verbatim(동작 무변경), 12 params 전부 전달, 다중 return/background_tasks 보존. py_compile + pyflakes undefined-name 0 으로 충실이동 검증.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 07:25:58 +09:00
hyungi 2fedaa065b fix(study): subject_note_rag 에 licensed_restricted 필터 누락 — 구매자료 분야노트 RAG 누수
explanation_rag 는 restricted_exclude_orm() 으로 licensed_restricted 문서를 제외하는데(B-4, a안 U-2① 단일술어), 복제된 subject_note_rag._gather_document_evidence 는 이 술어를 빠뜨려 구매 자료 verbatim 이 분야노트 RAG 로 샐 수 있었음(services 리뷰 P1 보안 drift). doc_meta 쿼리에 필터 추가 → valid_doc_ids → 청크 쿼리까지 자동 전파(explanation_rag 동일 구조).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 07:10:24 +09:00
44 changed files with 451 additions and 4209 deletions
-17
View File
@@ -260,23 +260,6 @@ class AIClient:
cfg = self.ai.deep or self.ai.primary
return await self._request(cfg, prompt, system=system)
async def call_classifier(self, prompt: str) -> str:
"""answerability classifier (config ai.classifier, Mac mini 26B MLX).
private _request 직접 호출(classifier_service)을 봉인하는 public 진입점. gate 는
caller(classifier_service)가 acquire_mlx_gate 로 관리 — call_primary 와 동일한
caller-managed 계약(여기서 self-gate 하면 caller 와 double-acquire 데드락).
"""
return await self._request(self.ai.classifier, prompt)
async def call_verifier(self, prompt: str) -> str:
"""semantic verifier (config ai.verifier, Mac mini 26B MLX).
private _request 직접 호출(verifier_service)을 봉인. gate 는 caller(verifier_service)
가 관리(caller-managed — self-gate 금지).
"""
return await self._request(self.ai.verifier, prompt)
# ─── Legacy API (classify_worker 교체 시 제거 예정) ───────────────────
async def classify(self, text: str, cfg=None) -> dict:
+100 -75
View File
@@ -23,6 +23,7 @@ from datetime import datetime, timezone
from fastapi import APIRouter, Depends, Header, HTTPException
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from core.config import settings
@@ -66,6 +67,22 @@ class IngestBody(BaseModel):
attempts: list[IngestAttempt]
def _already_ingested(rows) -> dict:
"""이미 적재된 세션들의 캐시 요약(멱등 응답). 최초 멱등체크 + 동시경합 흡수 양쪽에서 사용."""
return {
"status": "already_ingested",
"sessions": [
{
"topic_id": s.study_topic_id,
"correct": s.correct_count,
"wrong": s.wrong_count,
"unsure": s.unsure_count,
}
for s in rows
],
}
def _parse_answered_at(s: str | None, now: datetime) -> datetime:
if not s:
return now
@@ -98,18 +115,7 @@ async def ingest_attempts(
)
).scalars().all()
if existing:
return {
"status": "already_ingested",
"sessions": [
{
"topic_id": s.study_topic_id,
"correct": s.correct_count,
"wrong": s.wrong_count,
"unsure": s.unsure_count,
}
for s in existing
],
}
return _already_ingested(existing)
# pub_id → source_id(내부 질문 id) 해소. deleted tombstone 제외.
pub_ids = list({a.question_pub_id for a in body.attempts})
@@ -156,73 +162,92 @@ async def ingest_attempts(
if not by_topic:
raise HTTPException(status_code=404, detail="해소된 attempt 없음")
summaries = []
for topic_id, items in by_topic.items():
qids = [q.id for (_, q) in items]
qs = StudyQuizSession(
user_id=user_id,
study_topic_id=topic_id,
question_ids=qids,
subject_distribution={},
status="done",
cursor=len(qids),
source="viewer",
client_session_uuid=body.client_session_uuid,
finished_at=now,
created_at=now,
updated_at=now,
)
session.add(qs)
await session.flush() # qs.id
try:
summaries = []
for topic_id, items in by_topic.items():
qids = [q.id for (_, q) in items]
qs = StudyQuizSession(
user_id=user_id,
study_topic_id=topic_id,
question_ids=qids,
subject_distribution={},
status="done",
cursor=len(qids),
source="viewer",
client_session_uuid=body.client_session_uuid,
finished_at=now,
created_at=now,
updated_at=now,
)
session.add(qs)
await session.flush() # qs.id
c = w = u = 0
for a, q in items:
try:
sel, is_corr, outcome = derive_outcome(a.selected_choice, a.is_unsure, q.correct_choice)
except ValueError:
skipped.append(a.question_pub_id) # 선택 없고 unsure 아님 = 무효 → skip
continue
if outcome == "correct":
c += 1
elif outcome == "wrong":
w += 1
elif outcome == "unsure":
u += 1
session.add(
StudyQuestionAttempt(
user_id=user_id,
study_question_id=q.id,
study_topic_id=topic_id,
selected_choice=sel,
correct_choice=q.correct_choice,
is_correct=is_corr,
outcome=outcome,
quiz_session_id=qs.id,
answered_at=_parse_answered_at(a.answered_at, now),
c = w = u = 0
for a, q in items:
try:
sel, is_corr, outcome = derive_outcome(a.selected_choice, a.is_unsure, q.correct_choice)
except ValueError:
skipped.append(a.question_pub_id) # 선택 없고 unsure 아님 = 무효 → skip
continue
if outcome == "correct":
c += 1
elif outcome == "wrong":
w += 1
elif outcome == "unsure":
u += 1
session.add(
StudyQuestionAttempt(
user_id=user_id,
study_question_id=q.id,
study_topic_id=topic_id,
selected_choice=sel,
correct_choice=q.correct_choice,
is_correct=is_corr,
outcome=outcome,
quiz_session_id=qs.id,
answered_at=_parse_answered_at(a.answered_at, now),
)
)
qs.correct_count, qs.wrong_count, qs.unsure_count = c, w, u
await session.flush()
# finalize 무수정 재생(progress/SR/pattern + 4-A/4-B enqueue). 그 후 멱등 마커.
summary = await finalize_session(
session, user_id=user_id, study_topic_id=topic_id, quiz_session_id=qs.id
)
qs.finalized_at = now
summaries.append(
{
"topic_id": topic_id,
"quiz_session_id": qs.id,
"correct": summary.correct,
"wrong": summary.wrong,
"unsure": summary.unsure,
"newly_correct": summary.newly_correct,
"relapsed": summary.relapsed,
"recovered": summary.recovered,
}
)
await session.commit()
except IntegrityError:
# 동시 같은 client_session_uuid 경합 — 상대가 먼저 commit → (client_session_uuid,
# study_topic_id) uq(mig376) 위반. 데이터는 안전(원자 1-tx 전체 롤백 → SR 이중 advance
# 없음). 승자 결과로 graceful 수렴(500 대신 already_ingested). uuid 경합이 아닌 진짜
# 무결성 오류면 재조회가 비어 → re-raise 로 표면화.
await session.rollback()
winner = (
await session.execute(
select(StudyQuizSession).where(
StudyQuizSession.client_session_uuid == body.client_session_uuid
)
)
qs.correct_count, qs.wrong_count, qs.unsure_count = c, w, u
await session.flush()
).scalars().all()
if not winner:
raise
logger.info("study_ingest uuid=%s 동시경합 흡수 → already_ingested", body.client_session_uuid)
return _already_ingested(winner)
# finalize 무수정 재생(progress/SR/pattern + 4-A/4-B enqueue). 그 후 멱등 마커.
summary = await finalize_session(
session, user_id=user_id, study_topic_id=topic_id, quiz_session_id=qs.id
)
qs.finalized_at = now
summaries.append(
{
"topic_id": topic_id,
"quiz_session_id": qs.id,
"correct": summary.correct,
"wrong": summary.wrong,
"unsure": summary.unsure,
"newly_correct": summary.newly_correct,
"relapsed": summary.relapsed,
"recovered": summary.recovered,
}
)
await session.commit()
logger.info(
"study_ingest uuid=%s user=%s sessions=%s skipped=%s",
body.client_session_uuid, user_id, len(summaries), len(skipped),
+4 -851
View File
@@ -3,42 +3,28 @@
실제 검색 파이프라인(retrieval → fusion → rerank → diversity → confidence)
은 `services/search/search_pipeline.py::run_search()` 로 분리되어 있다.
이 파일은 다음만 담당:
- Pydantic 스키마 (SearchResult / SearchResponse / SearchDebug / DebugCandidate
/ Citation / AskResponse / AskDebug)
- Pydantic 스키마 (SearchResult / SearchResponse / SearchDebug / DebugCandidate)
- `/search` endpoint wrapper (run_search 호출 + logger + telemetry + 직렬화)
- `/ask` endpoint wrapper (Phase 3.3 에서 추가)
"""
import asyncio
import hmac
import time
from datetime import date
from typing import Annotated, Literal
from typing import Annotated
from fastapi import APIRouter, BackgroundTasks, Depends, Header, Query
from fastapi import APIRouter, BackgroundTasks, Depends, Query
from fastapi.responses import JSONResponse
from pydantic import BaseModel
from sqlalchemy.ext.asyncio import AsyncSession
from core.auth import get_current_user
from core.config import settings
from core.database import get_session
from core.utils import setup_logger
from models.user import User
from services.document_telemetry import sanitize_source
from services.search.classifier_service import ClassifierResult, classify
from services.search.evidence_service import EvidenceItem, extract_evidence
from services.search.fusion_service import DEFAULT_FUSION
from services.search.grounding_check import check as grounding_check
from services.search.refusal_gate import RefusalDecision, decide as refusal_decide
from services.search import query_rewriter
from services.search.retrieval_service import AxisFilter
from services.search.result_decorate import compute_facets, decorate_version_status
from services.search.search_pipeline import PipelineResult, run_search
from services.search.synthesis_service import SynthesisResult, synthesize
from services.search.verifier_service import VerifierResult, verify
from services.prompt_versions import ASK_PROMPT_VERSION, resolve_primary_model
from services.search_telemetry import record_ask_event, record_search_event
from services.search_telemetry import record_search_event
# logs/search.log + stdout 동시 출력 (Phase 0.4)
logger = setup_logger("search")
@@ -354,836 +340,3 @@ async def search(
debug=debug_obj,
facets=facets_obj,
)
# ═══════════════════════════════════════════════════════════
# Phase 3.3: /api/search/ask — Evidence + Grounded Synthesis
# ═══════════════════════════════════════════════════════════
class Citation(BaseModel):
"""answer 본문의 [n] 에 해당하는 근거 단일 행."""
n: int
chunk_id: int | None
doc_id: int
title: str | None
section_title: str | None
span_text: str # evidence LLM 이 추출한 50~300자
full_snippet: str # 원본 800자 (citation 원문 보기 전용)
relevance: float
rerank_score: float
class ConfirmedItem(BaseModel):
"""Partial answer 의 개별 aspect 답변."""
aspect: str
text: str
citations: list[int]
class AskDebug(BaseModel):
"""`/ask?debug=true` 응답 확장."""
timing_ms: dict[str, float]
search_notes: list[str]
query_analysis: dict | None = None
confidence_signal: float
evidence_candidate_count: int
evidence_kept_count: int
evidence_skip_reason: str | None
synthesis_cache_hit: bool
synthesis_prompt_preview: str | None = None
synthesis_raw_preview: str | None = None
hallucination_flags: list[str] = []
# Phase 3.5a: per-layer defense 로깅
defense_layers: dict | None = None
class AskResponse(BaseModel):
"""`/ask` 응답. Phase 3.5a: completeness + aspects 추가."""
results: list[SearchResult]
ai_answer: str | None
citations: list[Citation]
synthesis_status: Literal[
"completed", "timeout", "skipped", "no_evidence", "parse_failed", "llm_error",
# PR-MacBook-RAG-Backend-1: 200 응답에는 등장하지 않음 (해당 status 는 503 분기).
# Literal 호환성 위해 포함.
"backend_unavailable",
]
synthesis_ms: float
confidence: Literal["high", "medium", "low"] | None
refused: bool
no_results_reason: str | None
query: str
total: int
# Phase 3.5a
completeness: Literal["full", "partial", "insufficient"] = "full"
covered_aspects: list[str] | None = None
missing_aspects: list[str] | None = None
confirmed_items: list[ConfirmedItem] | None = None
# PR-MacBook-RAG-Backend-1: backend dispatcher metadata.
# backend 미지정 호출은 둘 다 None 으로 유지 (기존 호출자 호환 — Hermes docsrv_ask /
# voice-memo-bot 응답 형식 변동 0). 명시 opt-in 시만 채워짐.
backend_requested: str | None = None
backend_used: str | None = None
debug: AskDebug | None = None
def _map_no_results_reason(
pr: PipelineResult,
evidence: list[EvidenceItem],
ev_skip: str | None,
sr: SynthesisResult,
) -> str | None:
"""사용자에게 보여줄 한국어 메시지 매핑.
Failure mode 표 (plan §Failure Modes) 기반.
"""
# LLM 자가 refused → 모델이 준 사유 그대로
if sr.refused and sr.refuse_reason:
return sr.refuse_reason
# synthesis 상태 우선
if sr.status == "no_evidence":
if not pr.results:
return "검색 결과가 없습니다."
return "관련도 높은 근거를 찾지 못했습니다."
if sr.status == "skipped":
return "검색 결과가 없습니다."
if sr.status == "timeout":
return "답변 생성이 지연되어 생략했습니다. 검색 결과를 확인해 주세요."
if sr.status == "parse_failed":
return "답변 형식 오류로 생략했습니다."
if sr.status == "llm_error":
return "AI 서버에 일시적 문제가 있습니다."
# evidence 단계 실패는 fallback 을 탔더라도 notes 용
if ev_skip == "all_low_rerank":
return "관련도 높은 근거를 찾지 못했습니다."
if ev_skip == "empty_retrieval":
return "검색 결과가 없습니다."
return None
def _build_citations(
evidence: list[EvidenceItem], used_citations: list[int]
) -> list[Citation]:
"""answer 본문에 실제로 등장한 n 만 Citation 으로 변환."""
by_n = {e.n: e for e in evidence}
out: list[Citation] = []
for n in used_citations:
e = by_n.get(n)
if e is None:
continue
out.append(
Citation(
n=e.n,
chunk_id=e.chunk_id,
doc_id=e.doc_id,
title=e.title,
section_title=e.section_title,
span_text=e.span_text,
full_snippet=e.full_snippet,
relevance=e.relevance,
rerank_score=e.rerank_score,
)
)
return out
def _build_ask_debug(
pr: PipelineResult,
evidence: list[EvidenceItem],
ev_skip: str | None,
sr: SynthesisResult,
ev_ms: float,
synth_ms: float,
total_ms: float,
) -> AskDebug:
timing: dict[str, float] = dict(pr.timing_ms)
timing["evidence_ms"] = ev_ms
timing["synthesis_ms"] = synth_ms
timing["ask_total_ms"] = total_ms
# candidate count 는 rule filter 통과한 수 (recomputable from results)
# 엄밀히는 evidence_service 내부 숫자인데, evidence 길이 ≈ kept, candidate
# 는 관측이 어려움 → kept 는 evidence 길이, candidate 는 별도 필드 없음.
# 단순화: candidate_count = len(evidence) 를 상한 근사로 둠 (debug 전용).
return AskDebug(
timing_ms=timing,
search_notes=pr.notes,
query_analysis=pr.query_analysis,
confidence_signal=pr.confidence_signal,
evidence_candidate_count=len(evidence),
evidence_kept_count=len(evidence),
evidence_skip_reason=ev_skip,
synthesis_cache_hit=sr.cache_hit,
synthesis_prompt_preview=None, # 현재 synthesis_service 에서 노출 안 함
synthesis_raw_preview=sr.raw_preview,
hallucination_flags=sr.hallucination_flags,
)
def _detect_synthesis_failure(sr: SynthesisResult) -> str | None:
"""Synthesis 가 유효한 답을 못 냈으면 re_gate 라벨, 아니면 None.
판정 우선순위 (Phase 3.5 fix3):
1) sr.refused → LLM self-refuse (status="completed") 또는 mechanical fail 후 refused 전파
- status=="completed" + refused=True → "synthesis_self_refuse"
- 그 외 → f"synthesis_failed({status})"
2) sr.status ∈ {timeout, parse_failed, llm_error} → f"synthesis_failed({status})"
3) answer 공백 → f"synthesis_failed({status})"
4) 유효 → None
"""
if sr.refused:
if sr.status == "completed":
return "synthesis_self_refuse"
return f"synthesis_failed({sr.status})"
if sr.status in ("timeout", "parse_failed", "llm_error"):
return f"synthesis_failed({sr.status})"
if not (sr.answer or "").strip():
return f"synthesis_failed({sr.status})"
return None
def _resolve_eval_identity(
x_source: str | None,
x_eval_case_id: str | None,
x_eval_token: str | None,
) -> tuple[str, str | None]:
"""X-Source/X-Eval-Case-Id 신뢰 검증 (Phase 3.5 fix2).
규칙:
- 기본값: source='document_server', eval_case_id=None
- X-Source=eval 또는 X-Eval-Case-Id 가 들어왔다면 eval claim 으로 간주
- eval claim 은 X-Eval-Token == settings.eval_runner_token 일 때만 수용
(constant-time compare, env 미설정 시 항상 거부)
- 거부 시: 헤더 무시 + warning log + source=sanitize(non-eval) / eval_case_id=None
- 통과 시: source='eval', eval_case_id=x_eval_case_id
반환: (source, eval_case_id)
"""
claimed_source = sanitize_source(x_source)
is_eval_claim = (claimed_source == "eval") or bool(x_eval_case_id)
if not is_eval_claim:
# 일반 호출 — eval_case_id 강제 None (source != 'eval' 이면 case_id 의미 없음)
return claimed_source, None
# eval claim — token 검증
expected = settings.eval_runner_token
presented = x_eval_token or ""
token_valid = bool(expected) and hmac.compare_digest(presented, expected)
if not token_valid:
logger.warning(
"eval header rejected: source=%s case_id=%s token_present=%s expected_set=%s",
x_source, x_eval_case_id, bool(x_eval_token), bool(expected),
)
# 일반 호출로 강등 — source='eval' 주장은 무시, case_id 도 무시
# claimed_source 가 'eval' 이면 default 'document_server' 로
if claimed_source == "eval":
return "document_server", None
return claimed_source, None
# token OK — eval 라벨 수용
return "eval", x_eval_case_id
@router.get("/ask", response_model=AskResponse)
async def ask(
q: str,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
background_tasks: BackgroundTasks,
limit: int = Query(10, ge=1, le=20, description="synthesis 입력 상한"),
debug: bool = Query(False, description="evidence/synthesis 중간 상태 노출"),
backend: Annotated[
str | None,
Query(
pattern="^(qwen-macbook|gemma-macmini|mac-mini-default|claude-cloud|auto)$",
description=(
"PR-2 of DS AI routing policy (2026-05-23) — 명시 backend opt-in via llm-router. "
"미지정 = mac-mini-default (gemma-macmini alias, default). "
"'mac-mini-default' = router 가 tier_b (Mac mini gemma-4-26b). "
"'qwen-macbook' = router 가 named upstream (M5 Max Qwen 3.6 27B). "
"'claude-cloud' = router 가 503 provider_not_configured (활성화 별 PR). "
"'auto' = router 의 rule + LLM triage. "
"backend unavailable 시 503 + error_reason=macbook_unavailable / router_* "
"(자동 fallback 없음 — 다시 호출하거나 backend 인자 제거 후 재시도)."
),
),
] = None,
corpus_variant: str | None = Query(
None,
pattern=r"^(prehier|hier_sim_raw|hier_sim_clean)$",
description=(
"⚠️ EVAL-ONLY (Hier-PassageRAG-Diagnose-1). evidence retrieval 의 chunk leg 를 측정 뷰로 "
"교체 — prehier(legacy) | hier_sim_raw | hier_sim_clean. 운영 UI 미사용. "
"미지정 = production corpus_chunks (기존 /ask 동작 동일)."
),
),
exact_knn: bool = Query(
False,
description=(
"⚠️ EVAL-ONLY (Hier-PassageRAG-Diagnose-1). vector leg exact KNN (ivfflat 근사 제거). "
"passage 변종 공정 비교용. 운영 미사용. 미지정(false) = 기존 /ask 동작 동일."
),
),
x_source: Annotated[str | None, Header(alias="X-Source")] = None,
x_eval_case_id: Annotated[str | None, Header(alias="X-Eval-Case-Id")] = None,
x_eval_token: Annotated[str | None, Header(alias="X-Eval-Token")] = None,
):
"""근거 기반 AI 답변 (Phase 3.5a).
Phase 3.3 기반 + classifier parallel + refusal gate + grounding re-gate.
실패 경로에서도 `results` 는 항상 반환.
Phase 3.5 calibration trust boundary (fix2):
- X-Source / X-Eval-Case-Id 는 X-Eval-Token 이 EVAL_RUNNER_TOKEN 와 일치하는
trusted internal eval runner 에서만 수용된다.
- 일반 client 의 X-Source=eval 시도는 무시되고 source='document_server' 로 강제.
- source != 'eval' 이면 eval_case_id 항상 None.
"""
t_total = time.perf_counter()
defense_log: dict = {} # per-layer flag snapshot
source, eval_case_id = _resolve_eval_identity(x_source, x_eval_case_id, x_eval_token)
# 1. 검색 파이프라인 (corpus_variant/exact_knn = EVAL-ONLY, 미지정 시 기존 동작 동일)
pr = await run_search(
session, q, mode="hybrid", limit=limit,
fusion=DEFAULT_FUSION, rerank=True, analyze=True,
corpus_variant=corpus_variant, exact_knn=exact_knn,
)
# 1.5. ask_includable=false 문서를 evidence 입력에서 제외
# 검색 결과 자체는 유지 (사용자에게 보여줌), evidence만 필터
if pr.results:
from sqlalchemy import select as sa_select
from models.document import Document as DocModel
ask_doc_ids = set()
excluded_ids = {r.id for r in pr.results}
rows = await session.execute(
sa_select(DocModel.id, DocModel.ask_includable).where(
DocModel.id.in_(excluded_ids)
)
)
for doc_id, includable in rows:
if includable is False:
ask_doc_ids.add(doc_id)
evidence_results = [r for r in pr.results if r.id not in ask_doc_ids]
else:
evidence_results = pr.results
# 2. Evidence + Classifier 병렬
t_ev = time.perf_counter()
evidence_task = asyncio.create_task(extract_evidence(q, evidence_results))
# classifier input: top 3 chunks meta + rerank scores
top_chunks = [
{
"title": r.title or "",
"section": r.section_title or "",
"snippet": (r.snippet or "")[:200],
}
for r in pr.results[:3]
]
rerank_scores_top = [
r.rerank_score if r.rerank_score is not None else r.score
for r in pr.results[:3]
]
classifier_task = asyncio.create_task(
classify(q, top_chunks, rerank_scores_top)
)
evidence, ev_skip = await evidence_task
ev_ms = (time.perf_counter() - t_ev) * 1000
# classifier await (timeout 보호 — classifier_service 내부에도 있지만 여기서 이중 보호)
# 2026-05-17: 6s outer wrapper 가 classifier_service.LLM_TIMEOUT_MS (30s) 를 override → 동시 부하 시
# 거의 모든 classifier 호출 timeout → conservative_refuse(no_classifier) 경로. 15s 로 상향 — classifier
# 가 실제 작동하도록 (단, ask 전체 응답 시간 상한 영향: ev_ms + max(classifier_wait, evidence_extract) +
# synth_ms + verifier 누적).
# 2026-05-17 B-3: 15s 도 동시 부하 시 부족 (classifier_service LLM_TIMEOUT_MS 30s 와 misalign).
# 30s 로 align → classifier 동작 안정. ask 응답 latency 상한 ↑ 의도.
try:
classifier_result = await asyncio.wait_for(classifier_task, timeout=30.0)
except asyncio.CancelledError:
raise # 요청 취소는 전파 — broad except 가 삼키지 않게 명시 (R3)
except Exception:
classifier_result = ClassifierResult("timeout", None, [], [], 0.0)
defense_log["classifier"] = {
"status": classifier_result.status,
"verdict": classifier_result.verdict,
"covered_aspects": classifier_result.covered_aspects,
"missing_aspects": classifier_result.missing_aspects,
"elapsed_ms": classifier_result.elapsed_ms,
}
# 3. Refusal gate (multi-signal fusion)
all_rerank_scores = [
e.rerank_score for e in evidence
] if evidence else rerank_scores_top
decision = refusal_decide(all_rerank_scores, classifier_result)
defense_log["score_gate"] = {
"max": max(all_rerank_scores) if all_rerank_scores else 0.0,
"agg_top3": sum(sorted(all_rerank_scores, reverse=True)[:3]),
}
defense_log["refusal"] = {
"refused": decision.refused,
"rule_triggered": decision.rule_triggered,
}
if decision.refused:
total_ms = (time.perf_counter() - t_total) * 1000
no_reason = "관련 근거를 찾지 못했습니다."
if not pr.results:
no_reason = "검색 결과가 없습니다."
logger.info(
"ask REFUSED query=%r rule=%s max_score=%.2f total=%.0f",
q[:80], decision.rule_triggered,
max(all_rerank_scores) if all_rerank_scores else 0.0, total_ms,
)
# telemetry — search + ask_events 두 경로 동시
background_tasks.add_task(
record_search_event, q, user.id, pr.results, "hybrid",
pr.confidence_signal, pr.analyzer_confidence,
)
# input_snapshot (디버깅/재현용)
defense_log["input_snapshot"] = {
"query": q,
"top_chunks_preview": [
{"title": c.get("title", ""), "snippet": c.get("snippet", "")[:100]}
for c in top_chunks[:3]
],
"answer_preview": None,
}
background_tasks.add_task(
record_ask_event,
q, user.id, "insufficient", "skipped", None,
True, classifier_result.verdict,
max(all_rerank_scores) if all_rerank_scores else 0.0,
sum(sorted(all_rerank_scores, reverse=True)[:3]),
[], len(evidence), 0,
defense_log, int(total_ms),
# Phase E.1 측정 필드
answer_length=0,
covered_aspects=classifier_result.covered_aspects or None,
missing_aspects=classifier_result.missing_aspects or None,
model_name=resolve_primary_model(),
prompt_version=ASK_PROMPT_VERSION,
# Phase 3.5 calibration
source=source,
eval_case_id=eval_case_id,
)
debug_obj = None
if debug:
debug_obj = AskDebug(
timing_ms={**pr.timing_ms, "evidence_ms": ev_ms, "ask_total_ms": total_ms},
search_notes=pr.notes,
confidence_signal=pr.confidence_signal,
evidence_candidate_count=len(evidence),
evidence_kept_count=len(evidence),
evidence_skip_reason=ev_skip,
synthesis_cache_hit=False,
hallucination_flags=[],
defense_layers=defense_log,
)
return AskResponse(
results=pr.results,
ai_answer=None,
citations=[],
synthesis_status="skipped",
synthesis_ms=0.0,
confidence=None,
refused=True,
no_results_reason=no_reason,
query=q,
total=len(pr.results),
completeness="insufficient",
covered_aspects=classifier_result.covered_aspects or None,
missing_aspects=classifier_result.missing_aspects or None,
# refusal gate 단계에서는 backend 호출 자체가 일어나지 않음 →
# backend_used = None. backend_requested 는 호출자 의도 표시용.
backend_requested=backend,
backend_used=None,
debug=debug_obj,
)
# 4. Synthesis (backend dispatcher 적용 — PR-MacBook-RAG-Backend-1)
t_synth = time.perf_counter()
sr = await synthesize(q, evidence, debug=debug, backend=backend)
synth_ms = (time.perf_counter() - t_synth) * 1000
# 4.1. backend_unavailable → 503 fail-fast (자동 fallback 금지)
# 명시 opt-in backend (예: qwen-macbook) 가 비가용일 때만 발생. /ask wrapper 는
# 절대 다른 backend 로 재시도하지 않음. 사용자가 backend 인자 제거 또는 wake 후 재시도.
if sr.status == "backend_unavailable":
backend_requested_val = backend or "gemma-macmini"
total_ms = (time.perf_counter() - t_total) * 1000
logger.warning(
"ask backend_unavailable backend=%s query=%r total_ms=%.0f flags=%s",
backend_requested_val, q[:80], total_ms,
",".join(sr.hallucination_flags) if sr.hallucination_flags else "-",
)
# error_reason 명명 — macbook_unavailable 만 정착 (자동 fallback 부재).
error_reason = (
"macbook_unavailable"
if backend_requested_val == "qwen-macbook"
else "backend_unavailable"
)
# telemetry — search 만 기록 (ask_events 는 200 응답 path 전용)
background_tasks.add_task(
record_search_event, q, user.id, pr.results, "hybrid",
pr.confidence_signal, pr.analyzer_confidence,
)
return JSONResponse(
status_code=503,
content={
"error": "backend_unavailable",
"error_reason": error_reason,
"backend_requested": backend_requested_val,
"backend_used": None,
"query": q,
"detail": (
"명시 선택한 backend 가 일시적으로 응답할 수 없습니다. "
"MacBook 깨우거나 backend 인자를 제거하고 (기본 Gemma) 다시 호출하세요."
),
},
)
# 5. Grounding check + Verifier (조건부 병렬) + re-gate (Phase 3.5b)
grounding = grounding_check(q, sr.answer or "", evidence)
# verifier skip: grounding strong 2+ OR retrieval 자체가 망함
grounding_only_strong = [
f for f in grounding.strong_flags if not f.startswith("verifier_")
]
max_rerank = max(all_rerank_scores, default=0.0)
if len(grounding_only_strong) >= 2 or max_rerank < 0.2:
verifier_result = VerifierResult("skipped", [], 0.0)
else:
verifier_task = asyncio.create_task(
verify(q, sr.answer or "", evidence)
)
# 2026-05-17 B-3: 4s outer wait_for 가 verifier_service LLM_TIMEOUT_MS (10s) 를 override
# → classifier 와 동일 패턴 (search.py:522 가 6s→15s swap 했던 case). 10s 로 align.
try:
verifier_result = await asyncio.wait_for(verifier_task, timeout=10.0)
except asyncio.CancelledError:
raise # 요청 취소는 전파 — broad except 가 삼키지 않게 명시 (R3)
except Exception:
verifier_result = VerifierResult("timeout", [], 0.0)
# Verifier contradictions → grounding flags 머지 (prefix 로 구분, severity 3단계)
for c in verifier_result.contradictions:
if c.severity == "strong":
grounding.strong_flags.append(f"verifier_{c.type}:{c.claim[:30]}")
elif c.severity == "medium":
grounding.weak_flags.append(f"verifier_{c.type}_medium:{c.claim[:30]}")
else:
grounding.weak_flags.append(f"verifier_{c.type}:{c.claim[:30]}")
defense_log["evidence"] = {
"skip_reason": ev_skip,
"kept_count": len(evidence),
}
defense_log["grounding"] = {
"strong": grounding.strong_flags,
"weak": grounding.weak_flags,
}
defense_log["verifier"] = {
"status": verifier_result.status,
"contradictions_count": len(verifier_result.contradictions),
"strong_count": sum(1 for c in verifier_result.contradictions if c.severity == "strong"),
"medium_count": sum(1 for c in verifier_result.contradictions if c.severity == "medium"),
"elapsed_ms": verifier_result.elapsed_ms,
}
# ── Re-gate: 7-tier completeness 결정 (Phase 3.5 B2 — Tier 4 신규 삽입, 재번호) ──
# 기존 6-tier (3.5b 4차 리뷰) + Tier 4(g_strong + v_strong_numeric + low_conf → refuse).
# 호환성: defense_layers["re_gate"] 의 string literal 들은 기존 그대로 유지.
# 신규 "refuse(grounding+verifier_numeric)" 만 추가.
completeness: Literal["full", "partial", "insufficient"] = "full"
covered_aspects = classifier_result.covered_aspects or None
missing_aspects = classifier_result.missing_aspects or None
confirmed_items: list[ConfirmedItem] | None = None
# verifier/grounding strong 구분
g_strong = [f for f in grounding.strong_flags if not f.startswith("verifier_")]
v_strong = [f for f in grounding.strong_flags if f.startswith("verifier_")]
v_medium = [f for f in grounding.weak_flags if f.startswith("verifier_") and "_medium:" in f]
has_direct_negation = any("direct_negation" in f for f in v_strong)
# Phase 3.5 B2: verifier strong flags 중 numeric_conflict 만 카운트.
# promote(VERIFIER_NUMERIC_PROMOTE=1) 활성 시 critical numeric_conflict 가 strong 으로 승격되며
# 여기 카운트에 잡힘. promote off 면 항상 0 → Tier 4 활성 안 됨 (기존 동작 유지).
v_strong_numeric = sum(
1 for f in v_strong if f.startswith("verifier_numeric_conflict")
)
# ── Tier 0 (Phase 3.5 fix3): synthesis 자체 실패 처리 ──
# LLM self-refuse, 메커니즘 실패(timeout/parse_failed/llm_error), answer 공백.
# 빈 답에 대해 grounding/verifier flag 가 0건이라 기존 체인이 "else clean" 으로 빠지며
# completeness="full" 초기값이 보존되던 모순을 여기서 일관되게 차단.
# 과거 baseline(v1-400char) 에서 20(self-refuse)+4(timeout) = 24/223 (10.8%) 해당.
tier0_label = _detect_synthesis_failure(sr)
if tier0_label:
completeness = "insufficient"
sr.answer = None
sr.refused = True
sr.confidence = None
defense_log["re_gate"] = tier0_label
elif len(g_strong) >= 2:
# Tier 1: grounding strong 2+ → refuse
completeness = "insufficient"
sr.answer = None
sr.refused = True
sr.confidence = None
defense_log["re_gate"] = "refuse(grounding_2+strong)"
elif g_strong and has_direct_negation:
# Tier 2: grounding strong + verifier direct_negation → refuse
completeness = "insufficient"
sr.answer = None
sr.refused = True
sr.confidence = None
defense_log["re_gate"] = "refuse(grounding+direct_negation)"
elif g_strong and sr.confidence == "low" and max_rerank < 0.25:
# Tier 3: grounding strong 1 + (low confidence AND weak evidence) → refuse
completeness = "insufficient"
sr.answer = None
sr.refused = True
sr.confidence = None
defense_log["re_gate"] = "refuse(grounding+low_conf+weak_ev)"
elif g_strong and v_strong_numeric >= 1 and sr.confidence == "low":
# Tier 4 (B2 신규): grounding strong + verifier numeric_conflict strong + low conf → refuse.
# verifier strong 단독 refuse 금지 원칙 유지 — g_strong 교차 필수.
completeness = "insufficient"
sr.answer = None
sr.refused = True
sr.confidence = None
defense_log["re_gate"] = "refuse(grounding+verifier_numeric)"
elif g_strong or has_direct_negation:
# Tier 5 (기존 4): grounding strong 1 또는 verifier direct_negation 단독 → partial
completeness = "partial"
sr.confidence = "low"
defense_log["re_gate"] = "partial(strong_or_negation)"
elif v_medium:
# Tier 6 (기존 5): verifier medium 누적 → count 기반 confidence 하향
medium_count = len(v_medium)
if medium_count >= 3:
sr.confidence = "low"
defense_log["re_gate"] = f"conf_low(medium_x{medium_count})"
elif medium_count == 2 and sr.confidence == "high":
sr.confidence = "medium"
defense_log["re_gate"] = "conf_cap_medium(medium_x2)"
else:
defense_log["re_gate"] = f"medium_x{medium_count}(no_action)"
elif grounding.weak_flags:
# Tier 7 (기존 6): weak → confidence 한 단계 하향
if sr.confidence == "high":
sr.confidence = "medium"
defense_log["re_gate"] = "conf_lower(weak)"
else:
defense_log["re_gate"] = "clean"
# Confidence cap from refusal gate (classifier 부재 시 conservative)
if decision.confidence_cap and sr.confidence:
conf_rank = {"low": 0, "medium": 1, "high": 2}
if conf_rank.get(sr.confidence, 0) > conf_rank.get(decision.confidence_cap, 2):
sr.confidence = decision.confidence_cap
# Partial 이면 max confidence = medium
if completeness == "partial" and sr.confidence == "high":
sr.confidence = "medium"
sr.hallucination_flags.extend(
[f"strong:{f}" for f in grounding.strong_flags]
+ [f"weak:{f}" for f in grounding.weak_flags]
)
total_ms = (time.perf_counter() - t_total) * 1000
# 6. 응답 구성
citations = _build_citations(evidence, sr.used_citations)
no_reason = _map_no_results_reason(pr, evidence, ev_skip, sr)
if completeness == "insufficient" and not no_reason:
# Tier 0 경로: synthesis self-refuse 는 LLM 이 준 사유가 가장 정확.
if sr.refused and sr.refuse_reason:
no_reason = sr.refuse_reason
else:
no_reason = "답변 검증에서 복수 오류 감지"
logger.info(
"ask query=%r results=%d evidence=%d cite=%d synth=%s conf=%s completeness=%s "
"refused=%s grounding_strong=%d grounding_weak=%d ev_ms=%.0f synth_ms=%.0f total=%.0f",
q[:80], len(pr.results), len(evidence), len(citations),
sr.status, sr.confidence or "-", completeness,
sr.refused, len(grounding.strong_flags), len(grounding.weak_flags),
ev_ms, synth_ms, total_ms,
)
# 7. telemetry — search + ask_events 두 경로 동시
background_tasks.add_task(
record_search_event, q, user.id, pr.results, "hybrid",
pr.confidence_signal, pr.analyzer_confidence,
)
# input_snapshot (디버깅/재현용)
defense_log["input_snapshot"] = {
"query": q,
"top_chunks_preview": [
{"title": (r.title or "")[:50], "snippet": (r.snippet or "")[:100]}
for r in pr.results[:3]
],
"answer_preview": (sr.answer or "")[:200],
}
background_tasks.add_task(
record_ask_event,
q, user.id, completeness, sr.status, sr.confidence,
sr.refused, classifier_result.verdict,
max(all_rerank_scores) if all_rerank_scores else 0.0,
sum(sorted(all_rerank_scores, reverse=True)[:3]),
sr.hallucination_flags, len(evidence), len(citations),
defense_log, int(total_ms),
# Phase E.1 측정 필드
answer_length=len(sr.answer or ""),
covered_aspects=covered_aspects,
missing_aspects=missing_aspects,
model_name=resolve_primary_model(),
prompt_version=ASK_PROMPT_VERSION,
# Phase 3.5 calibration
source=source,
eval_case_id=eval_case_id,
)
debug_obj = None
if debug:
timing = dict(pr.timing_ms)
timing["evidence_ms"] = ev_ms
timing["synthesis_ms"] = synth_ms
timing["ask_total_ms"] = total_ms
debug_obj = AskDebug(
timing_ms=timing,
search_notes=pr.notes,
query_analysis=pr.query_analysis,
confidence_signal=pr.confidence_signal,
evidence_candidate_count=len(evidence),
evidence_kept_count=len(evidence),
evidence_skip_reason=ev_skip,
synthesis_cache_hit=sr.cache_hit,
synthesis_raw_preview=sr.raw_preview,
hallucination_flags=sr.hallucination_flags,
defense_layers=defense_log,
)
# backend_used: synthesize 가 실제 호출한 backend (backend 인자 그대로 신뢰 OK —
# backend_unavailable 은 위 503 분기에서 이미 return 됨).
backend_used_val = backend or "gemma-macmini"
return AskResponse(
results=pr.results,
ai_answer=sr.answer,
citations=citations,
synthesis_status=sr.status,
synthesis_ms=sr.elapsed_ms,
confidence=sr.confidence,
refused=sr.refused,
no_results_reason=no_reason,
query=q,
total=len(pr.results),
completeness=completeness,
covered_aspects=covered_aspects,
missing_aspects=missing_aspects,
confirmed_items=confirmed_items,
backend_requested=backend,
backend_used=backend_used_val,
debug=debug_obj,
)
# ─── PR-DocSrv-Ask-ToolCalling-ReAct-1 ────────────────────────────────────
# /api/search/ask/react — Qwen native tool calling 로 ReAct loop.
# 본 endpoint 는 qwen-macbook only (endpoint 자체가 implicit opt-in).
# MacBook unavailable 시 503 + error_reason=macbook_unavailable. Gemma 자동 fallback X.
# G0-2 counter semantics: max_tool_rounds=2, max LLM calls=3, search exec ≤ 2.
# G0-3 trace exposure: default response 의 debug_trace=None, debug=True 시만 채움.
class AskReactRequest(BaseModel):
query: str
debug: bool = False
class AskReactResponse(BaseModel):
final_answer: str
iterations: int
partial: bool
sources: list[dict]
debug_trace: list[dict] | None = None
@router.post("/ask/react", response_model=AskReactResponse)
async def ask_react(
payload: AskReactRequest,
user: Annotated[User, Depends(get_current_user)],
session: Annotated[AsyncSession, Depends(get_session)],
):
"""ReAct loop endpoint (qwen-macbook only, no fallback).
호출자가 명시 opt-in 한 endpoint. MacBook 가 sleep / unreachable / 5xx 시
HTTP 503 + body `{error_reason: "macbook_unavailable", backend: "qwen-macbook"}`
를 반환한다. Gemma Mac mini 로 자동 fallback 하지 않는다 (정정 4 의 연장).
request body:
- query: str (사용자 원본 질의)
- debug: bool (default false; true 시 응답 `debug_trace` 채움)
response body (성공 200):
- final_answer: str (Qwen 종합문, partial 일 수 있음)
- iterations: int (실제 진행된 tool round 수)
- partial: bool (max_tool_rounds 도달 후 LLM content 비었을 때 true)
- sources: list[dict] (검색에서 모인 evidence 메타, id-기준 dedup)
- debug_trace: list[dict] | null (debug=true 시 round 별 trace)
"""
# 지연 import — 순환 의존성 회피 (react_loop 가 api.search.SearchResult 사용 안 함)
from services.llm.backends import BackendUnavailable, get_backend
from services.search.react_loop import agentic_ask_loop
backend_inst = get_backend("qwen-macbook")
# PR-2 of DS AI routing policy: backend_inst may be RouterBackend (default)
# or QwenMacBookBackend (DS_BACKENDS_VIA_ROUTER=false rollback). Both
# implement generate_with_tools so the ReAct loop is identical.
assert hasattr(backend_inst, "generate_with_tools")
try:
result = await agentic_ask_loop(
session,
payload.query,
backend=backend_inst,
debug=payload.debug,
)
except BackendUnavailable as exc:
logger.warning(
"ask_react backend unavailable backend=%s reason=%s",
exc.backend_name, exc.reason,
)
return JSONResponse(
status_code=503,
content={
"error_reason": "macbook_unavailable",
"backend_requested": "qwen-macbook",
"backend_used": None,
"detail": exc.reason,
},
)
return AskReactResponse(
final_answer=result.final_answer,
iterations=result.iterations,
partial=result.partial,
sources=result.sources,
debug_trace=result.debug_trace,
)
+4
View File
@@ -56,5 +56,9 @@ class PublishOutbox(Base):
DateTime(timezone=True), default=datetime.now, nullable=False
)
processed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# mig378: 행별 격리 재시도/terminal. attempts=savepoint 실패 누적, failed_at=MAX 초과 terminal
# (set 시 워커 select 에서 제외 → head-of-line block 방지).
attempts: Mapped[int] = mapped_column(SmallInteger, nullable=False, default=0)
failed_at: Mapped[datetime | None] = mapped_column(DateTime(timezone=True))
# 미처리 부분 인덱스 idx(id) WHERE processed_at IS NULL = mig372.
-33
View File
@@ -1,33 +0,0 @@
You are an answerability judge. Given a query and evidence chunks, determine if the evidence can answer the query. Respond ONLY in JSON.
## CALIBRATION (CRITICAL)
- verdict=full: evidence is SUFFICIENT to answer the CORE of the query. Missing minor details does NOT make it insufficient.
- verdict=partial: evidence covers SOME major aspects but CLEARLY MISSES others the user explicitly asked about.
- verdict=insufficient: evidence has NO relevant information for the query, or is completely off-topic.
Example: Query="제6장 주요 내용", Evidence covers 제6장 definition+scope → verdict=full (core is covered).
Example: Query="제6장 처벌 조항", Evidence covers 제6장 definition but NOT 처벌 → verdict=partial.
Example: Query="감귤 출하량", Evidence about 산업안전보건법 → verdict=insufficient.
## Rules
1. Your "verdict" must be based ONLY on whether the CONTENT semantically answers the query. Ignore retrieval scores for this field.
2. "covered_aspects": query aspects that evidence covers. Korean labels for Korean queries.
3. "missing_aspects": query aspects that evidence does NOT cover. Korean labels.
4. Keep aspects concise (2-5 words each), non-overlapping.
## Output Schema
{
"verdict": "full" | "partial" | "insufficient",
"covered_aspects": ["aspect1"],
"missing_aspects": ["aspect2"],
"confidence": "high" | "medium" | "low"
}
## Query
{query}
## Evidence chunks:
{chunks}
## Retrieval scores (for reference only, NOT for verdict):
[{scores}]
+3 -1
View File
@@ -1,5 +1,5 @@
[System]
너는 한국어 문서 태거 + 짧은 요약기다. 입력 본문을 읽고 TL;DR + 핵심 bullets + tags 생성한다. **상세 문단·entities 는 생성하지 않는다** (깊은 요약은 26B, entity 는 P3b 담당).
너는 한국어 문서 태거 + 요약기다. 입력 본문을 읽고 짧은 요약(ai_summary 2~3문장) + TL;DR + 핵심 bullets + tags 생성한다. **여러 문단의 상세 심층요약·entities 는 생성하지 않는다** (깊은 요약은 26B, entity 는 P3b 담당).
subject_description: {subject_description}
@@ -13,6 +13,7 @@ subject_description: {subject_description}
- pii 감지 시 "pii" 추가 + confidence 감점.
요약 규칙:
- **ai_summary**: 2~3문장 문단. 문서의 핵심 내용·목적을 서술 (검색·표시용 요약).
- **TL;DR**: 1문장, 최대 60자.
- **Bullets**: 정확히 5개, 각 30~60자.
- 본문에 없는 정보 추가 금지 (hallucination 금지).
@@ -20,6 +21,7 @@ subject_description: {subject_description}
출력 (JSON only):
{{
"ai_summary": "2~3문장 문단 요약",
"tldr": "1문장 최대 60자",
"bullets": ["...", "...", "...", "...", "..."],
"tags": ["..."],
-42
View File
@@ -1,42 +0,0 @@
You are a grounding verifier. Given an answer and its evidence sources, check if the answer contradicts or fabricates information. Respond ONLY in JSON.
## Contradiction Types (IMPORTANT — severity depends on type)
- **direct_negation** (CRITICAL): Answer directly contradicts evidence. Examples: evidence "의무" but answer "권고"; evidence "금지" but answer "허용"; negation reversal ("~해야 한다" vs "~할 필요 없다").
- **numeric_conflict**: Answer states a number different from evidence. "50명" in evidence but "100명" in answer. Only flag if the same concept is referenced. severity=critical when the number is the CORE answered quantity (amount/count/rate/date/duration that the query asked for); severity=minor when the number is peripheral (e.g., example/footnote).
- **intent_core_mismatch**: Answer addresses a fundamentally different topic than the query asked about.
- **nuance**: Answer overgeneralizes or adds qualifiers not in evidence (e.g., "모든" when evidence says "일부").
- **unsupported_claim**: Answer makes a factual claim with no basis in any evidence.
## Rules
1. Compare each claim in the answer against the cited evidence. A claim with [n] citation should be checked against evidence [n].
2. NOT a contradiction: Paraphrasing, summarizing, or restating the same fact in different words. Korean formal/informal style (합니다/한다) differences.
3. Numbers must match exactly after normalization (1,000 = 1000). Range values (e.g., "100~200명") satisfy any answer within range.
4. Legal/regulatory terms must preserve original meaning (의무 ≠ 권고, 금지 ≠ 제한, 허용 ≠ 금지).
5. Maximum 5 contradictions (most severe first: direct_negation > numeric_conflict > intent_core_mismatch > nuance > unsupported_claim).
## Output Schema
{
"contradictions": [
{
"type": "direct_negation" | "numeric_conflict" | "intent_core_mismatch" | "nuance" | "unsupported_claim",
"severity": "critical" | "minor",
"claim": "answer 내 해당 구절 (50자 이내)",
"evidence_ref": "대응 근거 내용 (50자 이내, [n] 포함)",
"explanation": "모순 이유 (한국어, 30자 이내)"
}
],
"verdict": "clean" | "minor_issues" | "major_issues"
}
severity mapping:
- direct_negation → "critical"
- numeric_conflict → "critical" if the number is the CORE answered quantity, else "minor"
- All other types → "minor"
If no contradictions: {"contradictions": [], "verdict": "clean"}
## Answer
{answer}
## Evidence
{numbered_evidence}
-156
View File
@@ -1,156 +0,0 @@
"""Answerability classifier (Phase 3.5a).
Mac mini 26B MLX 기반 (config.yaml ai.models.classifier — PR #20 이후 triage/primary/classifier 동일 endpoint). MLX gate 밖 — evidence extraction 과 병렬 실행 (concurrent 안전성 별 검토).
P1 실측 결과: ternary (full/partial/insufficient) 불안정 → **binary (sufficient/insufficient)**.
"full" vs "partial" 구분은 grounding_check 의 intent alignment 이 담당.
Classifier verdict 는 "relevant evidence 가 있나" 의 binary 판단.
covered_aspects / missing_aspects 는 로깅용으로 유지 (refusal gate 에서 사용 안 함).
"""
from __future__ import annotations
import asyncio
import time
from dataclasses import dataclass
from typing import Literal
from ai.client import AIClient, _load_prompt, parse_json_response
from core.config import settings
from core.utils import setup_logger
from .llm_gate import Priority, acquire_mlx_gate
logger = setup_logger("classifier")
LLM_TIMEOUT_MS = 30000
CIRCUIT_THRESHOLD = 5
CIRCUIT_RECOVERY_SEC = 60
_failure_count = 0
_circuit_open_until: float | None = None
@dataclass(slots=True)
class ClassifierResult:
status: Literal["ok", "timeout", "error", "circuit_open", "skipped"]
verdict: Literal["sufficient", "insufficient"] | None
covered_aspects: list[str]
missing_aspects: list[str]
elapsed_ms: float
try:
CLASSIFIER_PROMPT = _load_prompt("classifier.txt")
except FileNotFoundError:
CLASSIFIER_PROMPT = ""
logger.warning("classifier.txt not found — classifier will always skip")
def _build_input(
query: str,
top_chunks: list[dict],
rerank_scores: list[float],
) -> str:
"""Y+ input (content + scores with role separation)."""
chunk_block = "\n".join(
f"[{i+1}] title: {c.get('title','')}\n"
f" section: {c.get('section','')}\n"
f" snippet: {c.get('snippet','')}"
for i, c in enumerate(top_chunks[:3])
)
scores_str = ", ".join(f"{s:.2f}" for s in rerank_scores[:3])
return (
CLASSIFIER_PROMPT
.replace("{query}", query)
.replace("{chunks}", chunk_block)
.replace("{scores}", scores_str)
)
async def classify(
query: str,
top_chunks: list[dict],
rerank_scores: list[float],
) -> ClassifierResult:
"""Always-on binary classifier. Parallel with evidence extraction.
Returns:
ClassifierResult with verdict=sufficient|insufficient.
Status "ok" 이 아니면 verdict=None (caller 가 fallback 처리).
"""
global _failure_count, _circuit_open_until
t_start = time.perf_counter()
# Circuit breaker
if _circuit_open_until and time.time() < _circuit_open_until:
return ClassifierResult("circuit_open", None, [], [], 0.0)
if not CLASSIFIER_PROMPT:
return ClassifierResult("skipped", None, [], [], 0.0)
if not hasattr(settings.ai, "classifier") or settings.ai.classifier is None:
return ClassifierResult("skipped", None, [], [], 0.0)
prompt = _build_input(query, top_chunks, rerank_scores)
client = AIClient()
try:
# 2026-05-17: PR #20 이후 endpoint 가 Mac mini 26B → llm_gate Semaphore(1) 필수.
# Gate 미사용 시 classifier + evidence + synthesis 가 동시에 single-inference
# MLX 에 race → 거의 모두 timeout (실측: 8/10 fixture query). docstring 영구 룰:
# "MLX primary 호출 경로는 예외 없이 gate 획득 필수".
async with acquire_mlx_gate(Priority.FOREGROUND):
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await client.call_classifier(prompt)
_failure_count = 0
except asyncio.TimeoutError:
_failure_count += 1
if _failure_count >= CIRCUIT_THRESHOLD:
_circuit_open_until = time.time() + CIRCUIT_RECOVERY_SEC
logger.error(f"classifier circuit OPEN for {CIRCUIT_RECOVERY_SEC}s")
logger.warning("classifier timeout")
return ClassifierResult(
"timeout", None, [], [],
(time.perf_counter() - t_start) * 1000,
)
except Exception as e:
_failure_count += 1
if _failure_count >= CIRCUIT_THRESHOLD:
_circuit_open_until = time.time() + CIRCUIT_RECOVERY_SEC
logger.error(f"classifier circuit OPEN for {CIRCUIT_RECOVERY_SEC}s")
logger.warning("classifier error: type=%s repr=%r", type(e).__name__, e)
return ClassifierResult(
"error", None, [], [],
(time.perf_counter() - t_start) * 1000,
)
finally:
await client.close()
elapsed_ms = (time.perf_counter() - t_start) * 1000
parsed = parse_json_response(raw)
if not isinstance(parsed, dict):
logger.warning("classifier parse failed raw=%r", (raw or "")[:200])
return ClassifierResult("error", None, [], [], elapsed_ms)
# ternary → binary 매핑
raw_verdict = parsed.get("verdict", "")
if raw_verdict == "insufficient":
verdict: Literal["sufficient", "insufficient"] | None = "insufficient"
elif raw_verdict in ("full", "partial", "sufficient"):
verdict = "sufficient"
else:
verdict = None
covered = parsed.get("covered_aspects") or []
missing = parsed.get("missing_aspects") or []
if not isinstance(covered, list):
covered = []
if not isinstance(missing, list):
missing = []
logger.info(
"classifier ok query=%r verdict=%s (raw=%s) covered=%d missing=%d elapsed_ms=%.0f",
query[:60], verdict, raw_verdict, len(covered), len(missing), elapsed_ms,
)
return ClassifierResult("ok", verdict, covered, missing, elapsed_ms)
-505
View File
@@ -1,505 +0,0 @@
"""Grounding check — post-synthesis 검증 (Phase 3.5a).
Strong/weak flag 분리:
- **Strong** (→ partial 강등 or refuse): fabricated_number, intent_misalignment(important)
- **Weak** (→ confidence lower only): uncited_claim, low_overlap, intent_misalignment(generic)
Re-gate 로직 (Phase 3.5a 9라운드 토론 결과):
- strong 1개 → partial 강등
- strong 2개 이상 → refuse
- weak → confidence "low"
Intent alignment (rule-based):
- query 의 핵심 명사가 answer 에 등장하는지 확인
- "처벌" 같은 중요 키워드 누락은 strong
- "주요", "관련" 같은 generic 은 무시
"""
from __future__ import annotations
import re
from dataclasses import dataclass
from typing import TYPE_CHECKING
from core.utils import setup_logger
if TYPE_CHECKING:
from .evidence_service import EvidenceItem
logger = setup_logger("grounding")
# "주요", "관련" 등 intent alignment 에서 제외할 generic 단어
GENERIC_TERMS = frozenset({
"주요", "관련", "내용", "정의", "기준", "방법", "설명", "개요",
"대한", "위한", "대해", "무엇", "어떤", "어떻게", "있는",
"하는", "되는", "이런", "그런", "이것", "그것",
})
@dataclass(slots=True)
class GroundingResult:
strong_flags: list[str]
weak_flags: list[str]
_UNIT_CHARS = r'명인개%년월일조항호세건원회'
# "이상/이하/초과/미만" — threshold 표현 (numeric conflict 에서 skip 대상)
_THRESHOLD_SUFFIXES = re.compile(r'이상|이하|초과|미만')
# 약칭/근사치 prefix — 매칭 전 제거 (Phase 3.5 B1).
# ⚠ 최대/최소 는 의도적으로 제외 — 이들은 bound operator 라 의미가 다름 (Phase 3.5 B1 fix3).
# 약/대략/거의/얼추 만 노이즈 prefix 로 strip.
_APPROX_PREFIX_RE = re.compile(r'(약|대략|거의|얼추)\s*')
# 단위 동의어 dict — 추출 직후 정규화 (Phase 3.5 B1)
# 의미가 동일한 단위는 같은 표기로 통일해서 set 비교/range overlap 안정화.
_UNIT_SYNONYMS: dict[str, str] = {
"": "",
"사람": "",
"퍼센트": "%",
"프로": "%",
"KRW": "",
"krw": "",
}
# tolerance(±1%) 허용 단위 — 양적 측정값 (Phase 3.5 B1)
_TOLERANCE_UNITS: frozenset[str] = frozenset({"", "", "%", "", ""})
# tolerance 미적용 단위 — 식별자성 숫자 (연도/조문/횟수)
_EXACT_ONLY_UNITS: frozenset[str] = frozenset({"", "", "", "", "", "", ""})
# 최대/최소 prefix 패턴 — bound operator (Phase 3.5 B1 fix3).
# 매칭된 숫자는 exact pool 에서 제외하고 one-sided range 로 변환.
# 경계값 자체는 clear 대상 아님 (Codex 권장: "최대 100명" + answer "100명" → flag 유지).
_BOUND_PATTERN_RE = re.compile(
rf'(최대|최소)\s*(\d[\d,.]*)\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로|KRW|krw)'
)
_RANGE_INF = 10**18 # one-sided range 상한 sentinel
def _normalize_unit(unit: str) -> str:
"""단위 동의어 → 대표 표기."""
return _UNIT_SYNONYMS.get(unit, unit)
def _extract_unit(literal: str) -> str | None:
"""리터럴에서 숫자 뒤 단위(한 글자 또는 동의어) 추출 + 정규화."""
# 천단위 콤마 + 옵션 소수 + 한글 단위 한 글자 또는 동의어
m = re.match(rf'[\d,.]+\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로|KRW|krw)', literal)
if not m:
return None
return _normalize_unit(m.group(1))
def _extract_numeric_corpus(text: str) -> dict:
"""단위별 숫자 + 범위 + bound 통합 추출 (Phase 3.5 B1 fix1+fix3).
Returns:
{
"exact_by_unit": {unit_or_None: set(digits)}, # 평범한 숫자 (bound 제외)
"ranges_by_unit": {unit: [(lo, hi), ...]}, # 양방향(A~B) + 단방향(최대/최소)
}
None 키는 단위 없는 bare 숫자.
`최대 N <unit>` → ranges[(0, N-1)] (경계값 자체는 cleared 대상 아님)
`최소 N <unit>` → ranges[(N+1, INF)]
"""
cleaned = _APPROX_PREFIX_RE.sub('', text)
exact_by_unit: dict[str | None, set[str]] = {None: set()}
ranges_by_unit: dict[str, list[tuple[int, int]]] = {}
# 1) 최대/최소 — bound. exact pool 에서 제외, one-sided range 로 변환.
bound_spans: list[tuple[int, int]] = [] # 매칭 substring 위치 — 이후 단계에서 skip
for m in _BOUND_PATTERN_RE.finditer(cleaned):
bound_kind = m.group(1)
try:
n = int(m.group(2).replace(',', '').split('.')[0])
except ValueError:
continue
unit = _normalize_unit(m.group(3))
if bound_kind == "최대":
ranges_by_unit.setdefault(unit, []).append((0, max(0, n - 1)))
else: # 최소
ranges_by_unit.setdefault(unit, []).append((n + 1, _RANGE_INF))
bound_spans.append((m.start(), m.end()))
def _in_bound_span(pos: int) -> bool:
return any(s <= pos < e for s, e in bound_spans)
# 2) 천단위 콤마 bare number
for m in re.finditer(r'\d{1,3}(?:,\d{3})+(?:\.\d+)?', cleaned):
if _in_bound_span(m.start()):
continue
exact_by_unit[None].add(m.group().replace(',', ''))
# 3) 단위 있는 숫자 (단위 동의어 포함)
for m in re.finditer(
rf'(\d[\d,.]*)\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로|KRW|krw)',
cleaned,
):
if _in_bound_span(m.start()):
continue
digits = m.group(1).replace(',', '').split('.')[0]
if not digits:
continue
unit = _normalize_unit(m.group(2))
exact_by_unit.setdefault(unit, set()).add(digits)
# 4) 양방향 범위 표현 (A~B / A 부터 B)
for m in re.finditer(
rf'(\d[\d,.]*)\s*(?:[~\-]|부터)\s*(\d[\d,.]*)\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로)',
cleaned,
):
if _in_bound_span(m.start()):
continue
try:
lo = int(m.group(1).replace(',', '').split('.')[0])
hi = int(m.group(2).replace(',', '').split('.')[0])
except ValueError:
continue
unit = _normalize_unit(m.group(3))
ranges_by_unit.setdefault(unit, []).append((min(lo, hi), max(lo, hi)))
# 5) bare 2자리+ 단독 숫자
for m in re.finditer(r'\b(\d{2,})\b', cleaned):
if _in_bound_span(m.start()):
continue
exact_by_unit[None].add(m.group())
return {
"exact_by_unit": exact_by_unit,
"ranges_by_unit": ranges_by_unit,
}
def _within_unit_range(
n: int, unit: str | None, ranges_by_unit: dict[str, list[tuple[int, int]]]
) -> bool:
"""unit-matching range 검증.
answer unit 이 None (bare 숫자) 면 보수적으로 False — bare 답변은 range clear 대상 아님.
"""
if unit is None:
return False
return any(lo <= n <= hi for lo, hi in ranges_by_unit.get(unit, []))
def _close_to_unit_pool(
n: int, unit: str | None, exact_by_unit: dict[str | None, set[str]], tol: float
) -> bool:
"""unit-matching tolerance 검증.
answer unit 이 None 이면 False — bare 답변은 tolerance 대상 아님.
같은 unit bucket 안의 후보만 비교.
"""
if unit is None:
return False
candidates = exact_by_unit.get(unit, set())
for c in candidates:
try:
cn = int(c)
except ValueError:
continue
if cn == 0:
continue
if abs(n - cn) / cn <= tol:
return True
return False
def _extract_number_literals(text: str) -> set[str]:
"""숫자 + 단위 추출 + normalize (Phase 3.5 B1: 6단계 확장).
1) 약칭 prefix 제거 ("약 100명""100명")
2) 천단위 콤마 bare number 우선 ("1,000""1000" set 등록)
3) 한국어 단위 접미사 매칭 (기존)
4) 범위 표현 양쪽 숫자 추출 (separator: ~, -, , 부터)
5) 단위 동의어 정규화 (인→명, 퍼센트→%, KRW→원)
6) bare 2자리+ 추출 (기존)
"""
# 1. 약칭 prefix 제거 (전체 텍스트에서)
cleaned = _APPROX_PREFIX_RE.sub('', text)
# 2. 천단위 콤마 bare number — normalize 된 값을 set 에 선등록
normalized: set[str] = set()
for m in re.finditer(r'\d{1,3}(?:,\d{3})+(?:\.\d+)?', cleaned):
normalized.add(m.group().replace(',', ''))
# 3. 숫자 + 한국어 단위 접미사 (동의어 포함)
raw: set[str] = set(re.findall(
rf'\d[\d,.]*\s*(?:[{_UNIT_CHARS}]|인|사람|퍼센트|프로|KRW|krw)\w{{0,2}}',
cleaned,
))
# 4. 범위 표현 — separator 에 "부터" 추가
for m in re.finditer(
rf'(\d[\d,.]*)\s*(?:[~\-]|부터)\s*(\d[\d,.]*)\s*([{_UNIT_CHARS}]|인|사람|퍼센트|프로)',
cleaned,
):
unit_norm = _normalize_unit(m.group(3))
raw.add(m.group(1) + unit_norm)
raw.add(m.group(2) + unit_norm)
# 5. normalize: 단위 동의어 통일 + 콤마 제거
for r in raw:
# 단위 부분 정규화
m = re.match(r'([\d,.]+)\s*([^\d\s]+)', r)
if m:
digits_part = m.group(1)
unit_part = _normalize_unit(m.group(2))
normalized.add(digits_part + unit_part)
normalized.add(digits_part.replace(',', '') + unit_part)
normalized.add(r.strip())
num_only = re.match(r'[\d,.]+', r)
if num_only:
normalized.add(num_only.group().replace(',', ''))
# 6. 단독 숫자 (2자리+ 만)
for d in re.findall(r'\b(\d{2,})\b', cleaned):
normalized.add(d)
return normalized
def _within_evidence_range(digits: str, raw: str, evidence_text: str) -> bool:
"""evidence 에 'A~B 단위' 가 있고 answer 의 숫자가 그 범위 안이면 True.
범위 단위는 무시 (단위 비교는 호출 전 단계). digits = 정수 문자열.
"""
try:
n = int(digits)
except ValueError:
return False
cleaned_ev = _APPROX_PREFIX_RE.sub('', evidence_text)
for m in re.finditer(
rf'(\d[\d,.]*)\s*(?:[~\-]|부터)\s*(\d[\d,.]*)\s*[{_UNIT_CHARS}]',
cleaned_ev,
):
try:
lo = int(m.group(1).replace(',', '').split('.')[0])
hi = int(m.group(2).replace(',', '').split('.')[0])
if min(lo, hi) <= n <= max(lo, hi):
return True
except ValueError:
continue
return False
def _close_to_any(n: int, candidates: set[str], tol: float) -> bool:
"""candidates 중 하나라도 (1±tol) 배율 안에 들어오면 True.
n 은 정수, candidates 는 digits-only 문자열 집합.
"""
for c in candidates:
try:
cn = int(c)
except ValueError:
continue
if cn == 0:
continue
if abs(n - cn) / cn <= tol:
return True
return False
def _extract_content_tokens(text: str) -> set[str]:
"""한국어 2자 이상 명사 + 영어 3자 이상 단어."""
return set(re.findall(r'[가-힣]{2,}|[a-zA-Z]{3,}', text))
def _parse_number_with_unit(literal: str) -> tuple[str, str] | None:
"""숫자 리터럴에서 (digits_only, unit) 분리. 단위 없으면 None."""
m = re.match(rf'([\d,.]+)\s*([{_UNIT_CHARS}])', literal)
if not m:
return None
digits = m.group(1).replace(',', '')
unit = m.group(2)
return (digits, unit)
def _check_evidence_numeric_conflicts(evidence: list["EvidenceItem"]) -> list[str]:
"""evidence 간 숫자 충돌 감지 (Phase 3.5b). evidence >= 2 일 때만 활성.
동일 단위, 다른 숫자 → weak flag. "이상/이하/초과/미만" 포함 시 skip.
bare number 는 비교 안 함 (조항 번호 등 false positive 방지).
"""
if len(evidence) < 2:
return []
# 각 evidence 에서 단위 있는 숫자 + threshold 여부 추출
# {evidence_idx: [(digits, unit, has_threshold), ...]}
per_evidence: dict[int, list[tuple[str, str, bool]]] = {}
for idx, ev in enumerate(evidence):
nums = re.findall(
rf'\d[\d,.]*\s*[{_UNIT_CHARS}]\w{{0,4}}',
ev.span_text,
)
entries = []
for raw in nums:
parsed = _parse_number_with_unit(raw)
if not parsed:
continue
has_thr = bool(_THRESHOLD_SUFFIXES.search(raw))
entries.append((parsed[0], parsed[1], has_thr))
if entries:
per_evidence[idx] = entries
if len(per_evidence) < 2:
return []
# 단위별로 evidence 간 숫자 비교
# {unit: {digits: [evidence_idx, ...]}}
unit_map: dict[str, dict[str, list[int]]] = {}
for idx, entries in per_evidence.items():
for digits, unit, has_thr in entries:
if has_thr:
continue # threshold 표현은 skip
if unit not in unit_map:
unit_map[unit] = {}
if digits not in unit_map[unit]:
unit_map[unit][digits] = []
if idx not in unit_map[unit][digits]:
unit_map[unit][digits].append(idx)
flags: list[str] = []
for unit, digits_map in unit_map.items():
distinct_values = list(digits_map.keys())
if len(distinct_values) >= 2:
# 가장 많이 등장하는 2개 비교
top2 = sorted(distinct_values, key=lambda d: len(digits_map[d]), reverse=True)[:2]
flags.append(
f"evidence_numeric_conflict:{top2[0]}{unit}_vs_{top2[1]}{unit}"
)
return flags
def check(
query: str,
answer: str,
evidence: list[EvidenceItem],
) -> GroundingResult:
"""답변 vs evidence grounding 검증 + query intent alignment."""
strong: list[str] = []
weak: list[str] = []
if not answer or not evidence:
return GroundingResult([], [])
# ⚠ citation marker [n] 양측 제거 (대칭성 — Phase 3.5 B1)
evidence_text = re.sub(r'\[\d+\]', '', " ".join(e.span_text for e in evidence))
# ── Strong 1: fabricated number (unit-aware 3단계 — Phase 3.5 B1 fix1+fix3) ──
# Codex 지적 반영:
# - fix1: range/tolerance/exact 모두 단위 일치 시에만 clear
# (예: "150원" vs "100~200명" → flag 유지)
# - fix3: 최대/최소 prefix 는 bound 의미 보존
# (예: "최대 100명" + answer "100명" → flag 유지, "최대 100명" + answer "50명" → cleared)
answer_clean = re.sub(r'\[\d+\]', '', answer)
answer_corpus = _extract_numeric_corpus(answer_clean)
evidence_corpus = _extract_numeric_corpus(evidence_text)
ev_exact_by_unit = evidence_corpus["exact_by_unit"]
ev_ranges_by_unit = evidence_corpus["ranges_by_unit"]
# cleared 는 (unit, digits) 쌍 단위로 추적 — 단위 충돌 케이스 방어
cleared_pairs: set[tuple[str | None, str]] = set()
# Pass 1: 각 (unit, digits) 가 evidence 에서 정당화되는지 판정
for unit, digits_set in answer_corpus["exact_by_unit"].items():
for d in digits_set:
# 1) exact match — 같은 unit bucket 내에서만
if d in ev_exact_by_unit.get(unit, set()):
cleared_pairs.add((unit, d))
continue
# bare answer (unit=None) 는 evidence bare bucket 도 보조 매칭
if unit is None and d in ev_exact_by_unit.get(None, set()):
cleared_pairs.add((unit, d))
continue
try:
n = int(d)
except ValueError:
continue
# 2) range — same-unit 만 (bare answer 는 range clear 대상 아님)
if _within_unit_range(n, unit, ev_ranges_by_unit):
cleared_pairs.add((unit, d))
continue
# 3) ±1% tolerance — 단위가 양적(_TOLERANCE_UNITS) + 4자리+ + same-unit
if (
unit in _TOLERANCE_UNITS
and len(d) >= 4
and _close_to_unit_pool(n, unit, ev_exact_by_unit, tol=0.01)
):
cleared_pairs.add((unit, d))
continue
# 식별자성 단위(_EXACT_ONLY_UNITS) 는 tolerance 패스 X.
# Pass 2: cleared 되지 않은 (unit, digits) 를 strong flag.
# 1자리 무시는 unit 이 식별자성(_EXACT_ONLY_UNITS: 년/월/일/조/항/호/회) 이 아닐 때만 적용.
# bare(None) 답변 숫자는 같은 digit 이 다른 unit 에서 cleared 됐으면 skip — 추출 부산물 방어.
# ⚠ 단위 cross-clear (예: "원" cleared → "명" 도 skip) 은 금지: Codex unit-mismatch 케이스가 깨짐.
unit_anchored_cleared: set[str] = {d for (u, d) in cleared_pairs if u is not None}
flagged_keys: set[tuple[str | None, str]] = set()
for unit, digits_set in answer_corpus["exact_by_unit"].items():
for d in digits_set:
if (unit, d) in cleared_pairs or (unit, d) in flagged_keys:
continue
# bare(None) 답변 숫자가 임의의 단위 bucket 에서 cleared 됐으면 duplicate 로 처리.
# 사례: "1,000명" → unit bucket "명" 에 1000 + bare bucket None 에 1000 (comma normalize 부산물).
# 이미 ("명", "1000") 가 cleared 라면 (None, "1000") 도 같은 사실을 가리키므로 skip.
if unit is None and d in unit_anchored_cleared:
continue
if len(d) < 2 and unit not in _EXACT_ONLY_UNITS:
continue
flagged_keys.add((unit, d))
# 사람이 읽기 좋게 "{digits}{unit}" 또는 bare 형태로 표기
label = f"{d}{unit}" if unit else d
strong.append(f"fabricated_number:{label}")
# ── Strong/Weak 2: query-answer intent alignment ──
query_content = _extract_content_tokens(query)
answer_content = _extract_content_tokens(answer)
if query_content:
missing_terms = query_content - answer_content
important_missing = [
t for t in missing_terms
if t not in GENERIC_TERMS and len(t) >= 2
]
if important_missing:
strong.append(
f"intent_misalignment:{','.join(important_missing[:3])}"
)
elif len(missing_terms) > len(query_content) * 0.5:
weak.append(
f"intent_misalignment_generic:"
f"missing({','.join(list(missing_terms)[:5])})"
)
# ── Weak 1: uncited claim ──
sentences = re.split(r'(?<=[.!?。])\s+', answer)
for s in sentences:
if len(s.strip()) > 20 and not re.search(r'\[\d+\]', s):
weak.append(f"uncited_claim:{s[:40]}")
# ── Weak: evidence 간 숫자 충돌 (Phase 3.5b) ──
conflicts = _check_evidence_numeric_conflicts(evidence)
weak.extend(conflicts)
# ── Weak 2: token overlap ──
answer_tokens = _extract_content_tokens(answer)
evidence_tokens = _extract_content_tokens(evidence_text)
if answer_tokens:
overlap = len(answer_tokens & evidence_tokens) / len(answer_tokens)
if overlap < 0.4:
weak.append(f"low_overlap:{overlap:.2f}")
if strong or weak:
logger.info(
"grounding query=%r strong=%d weak=%d flags=%s",
query[:60],
len(strong),
len(weak),
",".join(strong[:3] + weak[:3]),
)
return GroundingResult(strong, weak)
-105
View File
@@ -1,105 +0,0 @@
"""Refusal gate — multi-signal fusion (Phase 3.5a).
Score gate (deterministic) + classifier verdict (semantic, binary) 를 독립 평가 후 합성.
Classifier 부재 시 3-tier conservative fallback.
P1 실측 결과: exaone ternary 불안정 → binary (sufficient/insufficient) 로 축소.
"full" vs "partial" 구분은 grounding check (intent alignment) 가 담당.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal
from core.utils import setup_logger
if TYPE_CHECKING:
from .classifier_service import ClassifierResult
logger = setup_logger("refusal_gate")
# Placeholder thresholds — Phase 3.5b 에서 실측 기반 tuning
# AND 조건이라 false refusal 방어됨 (둘 다 만족해야 refuse)
SCORE_MAX_REFUSE = 0.25
SCORE_AGG_REFUSE = 0.70
# Conservative fallback tiers (classifier 부재 시)
CONSERVATIVE_WEAK = 0.35
CONSERVATIVE_MID = 0.55
@dataclass(slots=True)
class RefusalDecision:
refused: bool
confidence_cap: Literal["high", "medium", "low"] | None # None = no cap
rule_triggered: str | None # 디버깅: 어느 signal 이 결정에 기여?
def decide(
rerank_scores: list[float],
classifier: ClassifierResult | None,
) -> RefusalDecision:
"""Multi-signal fusion. Binary classifier verdict 기반.
Returns:
RefusalDecision. refused=True 이면 synthesis skip.
confidence_cap 은 synthesis 결과의 confidence 에 upper bound 적용.
"""
max_score = max(rerank_scores) if rerank_scores else 0.0
agg_top3 = sum(sorted(rerank_scores, reverse=True)[:3])
score_gate_fails = (
max_score < SCORE_MAX_REFUSE and agg_top3 < SCORE_AGG_REFUSE
)
# ── Classifier 사용 가능 (정상 경로) ──
if classifier and classifier.verdict is not None:
if classifier.verdict == "insufficient":
# Evidence quality override: classifier 가 insufficient 라 해도
# evidence 가 충분히 좋으면 override (토론 8라운드 합의)
# (evidence quality 는 이 함수 밖에서 별도 체크 — caller 에서 처리)
logger.info(
"refusal gate: classifier=insufficient max=%.2f agg=%.2f",
max_score, agg_top3,
)
return RefusalDecision(
refused=True,
confidence_cap=None,
rule_triggered="classifier_insufficient",
)
if score_gate_fails:
logger.info(
"refusal gate: score_low max=%.2f agg=%.2f classifier=%s",
max_score, agg_top3, classifier.verdict,
)
return RefusalDecision(
refused=True,
confidence_cap=None,
rule_triggered="score_low",
)
# Classifier says sufficient → proceed
return RefusalDecision(
refused=False,
confidence_cap=None,
rule_triggered=None,
)
# ── Classifier 부재 → 3-tier conservative ──
if max_score < CONSERVATIVE_WEAK:
return RefusalDecision(
refused=True,
confidence_cap=None,
rule_triggered="conservative_refuse(no_classifier)",
)
if max_score < CONSERVATIVE_MID:
return RefusalDecision(
refused=False,
confidence_cap="low",
rule_triggered="conservative_low(no_classifier)",
)
return RefusalDecision(
refused=False,
confidence_cap="medium",
rule_triggered="conservative_medium(no_classifier)",
)
-196
View File
@@ -1,196 +0,0 @@
"""Exaone semantic verifier (Phase 3.5b).
답변-근거 간 의미적 모순(contradiction) 감지. rule-based grounding_check 가 못 잡는
미묘한 모순 포착. classifier 와 동일 패턴: circuit breaker + timeout + fail open.
## Severity 3단계
- strong: direct_negation (완전 모순) → re-gate 교차 자격
- medium: numeric_conflict, intent_core_mismatch → confidence 하향 (누적 시 강제 low)
- weak: nuance, unsupported_claim → 로깅 + mild confidence 하향
## 핵심 원칙
- **Verifier strong 단독 refuse 금지** — grounding strong 과 교차해야 refuse
- **Timeout 3s** — 느리면 없는 게 낫다 (fail open)
- MLX gate 사용 (Mac mini 26B endpoint — classifier/evidence 와 동일 gate 공유, 동시 race 방지)
"""
from __future__ import annotations
import asyncio
import os
import time
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Literal
from ai.client import AIClient, _load_prompt, parse_json_response
from core.config import settings
from core.utils import setup_logger
from .llm_gate import Priority, acquire_mlx_gate
if TYPE_CHECKING:
from .evidence_service import EvidenceItem
logger = setup_logger("verifier")
LLM_TIMEOUT_MS = 10000 # 2026-05-17 B-3: 3s 시 동시 부하 시 verifier 빈발 skip → grounding 약화. Mac mini 26B 가 verifier-style 짧은 LLM call 도 concurrent 호출 시 3s 초과 빈번 — 10s 로 raise
CIRCUIT_THRESHOLD = 5
CIRCUIT_RECOVERY_SEC = 60
_failure_count = 0
_circuit_open_until: float | None = None
# Phase 3.5 B2: numeric_conflict severity promote 실험.
# import time 평가 — env 변경 후 process restart 필수 (docker compose restart fastapi).
# default=0 (off). production 적용은 B3 FP 검증 통과 후만.
_NUMERIC_PROMOTE = os.getenv("VERIFIER_NUMERIC_PROMOTE", "0") == "1"
# severity 매핑 (프롬프트 "critical"/"minor" → 코드 strong/medium/weak)
# Tier 4 (B2): _NUMERIC_PROMOTE=1 일 때 numeric_conflict critical → strong 으로 격상.
# minor 는 medium 유지 (FP 위험 분리).
_SEVERITY_MAP: dict[str, dict[str, Literal["strong", "medium", "weak"]]] = {
"direct_negation": {"critical": "strong", "minor": "strong"},
"numeric_conflict": (
{"critical": "strong", "minor": "medium"} if _NUMERIC_PROMOTE
else {"critical": "medium", "minor": "medium"}
),
"intent_core_mismatch": {"critical": "medium", "minor": "medium"},
"nuance": {"critical": "weak", "minor": "weak"},
"unsupported_claim": {"critical": "weak", "minor": "weak"},
}
@dataclass(slots=True)
class Contradiction:
"""개별 모순 발견."""
type: str # direct_negation / numeric_conflict / intent_core_mismatch / nuance / unsupported_claim
severity: Literal["strong", "medium", "weak"]
claim: str
evidence_ref: str
explanation: str
@dataclass(slots=True)
class VerifierResult:
status: Literal["ok", "timeout", "error", "circuit_open", "skipped"]
contradictions: list[Contradiction]
elapsed_ms: float
try:
VERIFIER_PROMPT = _load_prompt("verifier.txt")
except FileNotFoundError:
VERIFIER_PROMPT = ""
logger.warning("verifier.txt not found — verifier will always skip")
def _build_input(
answer: str,
evidence: list[EvidenceItem],
) -> str:
"""답변 + evidence spans → 프롬프트."""
spans = "\n\n".join(
f"[{e.n}] {(e.title or '').strip()}\n{e.span_text}"
for e in evidence
)
return (
VERIFIER_PROMPT
.replace("{answer}", answer)
.replace("{numbered_evidence}", spans)
)
def _map_severity(ctype: str, raw_severity: str) -> Literal["strong", "medium", "weak"]:
"""type + raw severity → 코드 severity 3단계."""
type_map = _SEVERITY_MAP.get(ctype, {"critical": "weak", "minor": "weak"})
return type_map.get(raw_severity, "weak")
async def verify(
query: str,
answer: str,
evidence: list[EvidenceItem],
) -> VerifierResult:
"""답변-근거 semantic 검증. Parallel with grounding_check.
Returns:
VerifierResult. status "ok" 이 아니면 contradictions 빈 리스트 (fail open).
"""
global _failure_count, _circuit_open_until
t_start = time.perf_counter()
if _circuit_open_until and time.time() < _circuit_open_until:
return VerifierResult("circuit_open", [], 0.0)
if not VERIFIER_PROMPT:
return VerifierResult("skipped", [], 0.0)
if not hasattr(settings.ai, "verifier") or settings.ai.verifier is None:
return VerifierResult("skipped", [], 0.0)
if not answer or not evidence:
return VerifierResult("skipped", [], 0.0)
prompt = _build_input(answer, evidence)
client = AIClient()
try:
async with acquire_mlx_gate(Priority.FOREGROUND):
async with asyncio.timeout(LLM_TIMEOUT_MS / 1000):
raw = await client.call_verifier(prompt)
_failure_count = 0
except asyncio.TimeoutError:
_failure_count += 1
if _failure_count >= CIRCUIT_THRESHOLD:
_circuit_open_until = time.time() + CIRCUIT_RECOVERY_SEC
logger.error(f"verifier circuit OPEN for {CIRCUIT_RECOVERY_SEC}s")
logger.warning("verifier timeout")
return VerifierResult(
"timeout", [],
(time.perf_counter() - t_start) * 1000,
)
except Exception as e:
_failure_count += 1
if _failure_count >= CIRCUIT_THRESHOLD:
_circuit_open_until = time.time() + CIRCUIT_RECOVERY_SEC
logger.error(f"verifier circuit OPEN for {CIRCUIT_RECOVERY_SEC}s")
logger.warning(f"verifier error: {e}")
return VerifierResult(
"error", [],
(time.perf_counter() - t_start) * 1000,
)
finally:
await client.close()
elapsed_ms = (time.perf_counter() - t_start) * 1000
parsed = parse_json_response(raw)
if not isinstance(parsed, dict):
logger.warning("verifier parse failed raw=%r", (raw or "")[:200])
return VerifierResult("error", [], elapsed_ms)
# contradiction 파싱
raw_items = parsed.get("contradictions") or []
if not isinstance(raw_items, list):
raw_items = []
results: list[Contradiction] = []
for item in raw_items[:5]:
if not isinstance(item, dict):
continue
ctype = item.get("type", "")
if ctype not in _SEVERITY_MAP:
ctype = "unsupported_claim"
raw_sev = item.get("severity", "minor")
severity = _map_severity(ctype, raw_sev)
claim = str(item.get("claim", ""))[:50]
ev_ref = str(item.get("evidence_ref", ""))[:50]
explanation = str(item.get("explanation", ""))[:30]
results.append(Contradiction(ctype, severity, claim, ev_ref, explanation))
logger.info(
"verifier ok query=%r contradictions=%d strong=%d medium=%d elapsed_ms=%.0f",
query[:60],
len(results),
sum(1 for c in results if c.severity == "strong"),
sum(1 for c in results if c.severity == "medium"),
elapsed_ms,
)
return VerifierResult("ok", results, elapsed_ms)
+12 -12
View File
@@ -68,10 +68,10 @@ async def enqueue_question_publish(session: AsyncSession, q: Any) -> None:
await enqueue_publish(session, kind=KIND_EXPLANATION, source_id=q.id, payload=expl)
async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int:
async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> tuple[int, int]:
"""active(미삭제) 문항을 id>after_id 부터 bounded 로 outbox 적재.
반환 = enqueue 한 문항 수(0 이면 끝). 큰 셋은 마지막 id 로 페이지 반복. caller commit.
반환 = (enqueue 수, 마지막 처리 id). caller 는 수==limit 면 last_id 로 다음 페이지. caller commit.
"""
rows = (
await session.execute(
@@ -83,7 +83,7 @@ async def backfill_publish_questions(session: AsyncSession, *, after_id: int = 0
).scalars().all()
for q in rows:
await enqueue_question_publish(session, q)
return len(rows)
return len(rows), (rows[-1].id if rows else after_id)
async def enqueue_topic_publish(session: AsyncSession, topic: Any) -> None:
@@ -91,10 +91,10 @@ async def enqueue_topic_publish(session: AsyncSession, topic: Any) -> None:
await enqueue_publish(session, kind=KIND_TOPIC, source_id=topic.id, payload=project_topic(topic))
async def backfill_publish_topics(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int:
async def backfill_publish_topics(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> tuple[int, int]:
"""active(미삭제) 주제를 id>after_id 부터 bounded 로 outbox 적재(S-1 초기 백필).
반환 = enqueue 한 주제 수(0 이면 끝). 큰 셋은 마지막 id 로 페이지 반복. caller commit.
반환 = (enqueue 수, 마지막 처리 id). caller 는 수==limit 면 last_id 로 다음 페이지. caller commit.
멱등 = 발행 워커의 (payload_hash, deleted) 디둡이 no-op 재투영 흡수(중복 enqueue 무해).
"""
rows = (
@@ -107,7 +107,7 @@ async def backfill_publish_topics(session: AsyncSession, *, after_id: int = 0, l
).scalars().all()
for t in rows:
await enqueue_topic_publish(session, t)
return len(rows)
return len(rows), (rows[-1].id if rows else after_id)
async def enqueue_card_publish(session: AsyncSession, card: Any) -> None:
@@ -123,10 +123,10 @@ async def enqueue_card_publish(session: AsyncSession, card: Any) -> None:
await enqueue_publish(session, kind=KIND_CARD, source_id=card.id, payload=project_card(card))
async def backfill_publish_cards(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int:
async def backfill_publish_cards(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> tuple[int, int]:
"""검수완료(needs_review=False)·미삭제 카드를 id>after_id 부터 bounded 로 outbox 적재(S-2 초기 백필).
반환 = enqueue 한 카드 수(0 이면 끝). 멱등 = 워커 (payload_hash, deleted) 디둡. caller commit.
반환 = (enqueue 수, 마지막 처리 id). caller 는 수==limit 면 last_id 로 다음 페이지. 멱등 = 워커 디둡. caller commit.
"""
rows = (
await session.execute(
@@ -142,7 +142,7 @@ async def backfill_publish_cards(session: AsyncSession, *, after_id: int = 0, li
).scalars().all()
for c in rows:
await enqueue_card_publish(session, c)
return len(rows)
return len(rows), (rows[-1].id if rows else after_id)
async def enqueue_card_progress_publish(session: AsyncSession, progress: Any) -> None:
@@ -155,11 +155,11 @@ async def enqueue_card_progress_publish(session: AsyncSession, progress: Any) ->
)
async def backfill_publish_card_progress(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> int:
async def backfill_publish_card_progress(session: AsyncSession, *, after_id: int = 0, limit: int = 200) -> tuple[int, int]:
"""모든 card progress row 를 id>after_id 부터 bounded 로 outbox 적재(S-4 초기 백필).
★필터 없음 = ALL row(due_at NULL sentinel·terminal 포함) — due-only 백필은 sentinel 누락.
반환 = enqueue 한 row 수(0 이면 끝). 멱등 = 워커 디둡. caller commit.
반환 = (enqueue 수, 마지막 처리 id). caller 는 수==limit 면 last_id 로 다음 페이지. 멱등 = 워커 디둡. caller commit.
"""
rows = (
await session.execute(
@@ -171,4 +171,4 @@ async def backfill_publish_card_progress(session: AsyncSession, *, after_id: int
).scalars().all()
for p in rows:
await enqueue_card_progress_publish(session, p)
return len(rows)
return len(rows), (rows[-1].id if rows else after_id)
+4
View File
@@ -20,6 +20,7 @@ from models.chunk import DocumentChunk
from models.document import Document
from models.study_question import StudyQuestion
from models.study_topic import StudyTopicDocument
from services.search.license_filter import restricted_exclude_orm
logger = logging.getLogger(__name__)
@@ -113,6 +114,9 @@ async def _gather_document_evidence(
select(Document.id, Document.title, Document.ai_summary).where(
Document.id.in_(doc_ids),
Document.deleted_at.is_(None),
# B-4: licensed_restricted 제외 — explanation_rag 와 동일 술어(a안 U-2①). 누락 시
# 구매 자료 verbatim 이 분야노트 RAG 로 새던 보안 drift(복제 과정 누락).
restricted_exclude_orm(),
)
)
).all()
+21 -12
View File
@@ -90,6 +90,7 @@ HARD_ESCALATE_REASONS = {
class TriageOutput(BaseModel):
"""p3a_short_summary (4B) 응답 스키마. 파싱 실패 시 기본값 + escalate=True fallback."""
ai_summary: str = "" # B-1 3→2: triage 가 ai_summary 도 생산 (별 summarize 콜 대체)
tldr: str = ""
bullets: list[str] = Field(default_factory=list)
tags: list[str] = Field(default_factory=list)
@@ -579,16 +580,7 @@ async def process(
"reason": "classify pipeline",
}
# ─── 2. Legacy 요약 (primary 또는 deep) ───
try:
summary = await client.summarize(doc.extracted_text[:50000], cfg=legacy_cfg)
except Exception as exc:
if legacy_cfg is not None and is_deferrable_error(exc):
raise StageDeferred(f"macbook_unavailable:{type(exc).__name__}") from exc
raise
doc.ai_summary = strip_thinking(summary)
# ─── 메타데이터 (legacy 완료) — 실제 처리 머신 귀속 (drain=qwen-macbook) ───
# ─── 메타데이터 (classify 완료) — 실제 처리 머신 귀속 (drain=qwen-macbook) ───
doc.ai_model_version = (legacy_cfg or settings.ai.primary).model
doc.ai_processed_at = datetime.now(timezone.utc)
@@ -598,13 +590,27 @@ async def process(
f"confidence={doc.ai_confidence:.2f}, tags={doc.ai_tags}"
)
# ─── 3. PR-B B-1 — tier triage (4B, 실패는 legacy 결과 보존) ───
# ─── 2+3 통합 (B-1 3→2): tier triage 가 tldr/bullets/tier + ai_summary 생산.
# 기존 별도 summarize 콜 제거 → 본문 prefill 1회 절감 (Mac mini 부하). 실패는 fallback.
try:
await _run_tier_triage(client, doc, session, use_deep=use_deep)
except StageDeferred:
raise # 보류는 실패가 아님 — drain/consumer 가 attempts 미소모 처리
except Exception as exc:
logger.exception(f"[triage] id={document_id} 전체 실패 — legacy 유지: {exc}")
logger.exception(f"[triage] id={document_id} 전체 실패: {exc}")
# ─── ai_summary fallback: triage 가 못 채운 경우만 summarize ───
# (>120K long_context 는 triage 가 LLM skip, 또는 triage 파싱실패). 정상 경로는 미발동.
if not doc.ai_summary:
try:
summary = await client.summarize(doc.extracted_text[:50000], cfg=legacy_cfg)
doc.ai_summary = strip_thinking(summary)
except Exception as exc:
if legacy_cfg is not None and is_deferrable_error(exc):
raise StageDeferred(f"macbook_unavailable:{type(exc).__name__}") from exc
# ai_summary=NULL 로 완료되면 digest/briefing 이 조용히 제외 → ERROR 로 가시화
# (best-effort 강등 자체는 유지, 운영 추적성만 보강).
logger.error(f"[summary-fallback] id={document_id} ai_summary 미생성: {exc}")
finally:
await client.close()
@@ -774,6 +780,9 @@ async def _apply_triage_result(
if not parse_error:
doc.ai_tldr = (triage_out.tldr or "").strip() or None
doc.ai_bullets = triage_out.bullets or []
# B-1 3→2: triage 가 ai_summary 도 생산(summarize 콜 대체). 비면 process() 가 fallback.
if triage_out.ai_summary.strip():
doc.ai_summary = triage_out.ai_summary.strip()
# Memo Intake Upgrade PR-2B — event kind hint (4B 가 출력했을 때만)
# 허용 enum 외 값이면 무시 (DB enum 제약). AI worker 는 events row 직접 생성 X.
valid_kinds = {"note", "task", "calendar_event", "activity_log", "reference"}
+5 -2
View File
@@ -140,7 +140,8 @@ async def _download_pdf(url: str, dest: Path) -> int:
if len(resp.content) > _MAX_PDF_BYTES:
raise FeedError(f"PDF 크기 초과 ({len(resp.content)} bytes): {url}")
dest.parent.mkdir(parents=True, exist_ok=True)
dest.write_bytes(resp.content)
# 최대 50MB PDF write 는 동기 blocking — 이벤트루프 점유 회피 to_thread (R5 동형).
await asyncio.to_thread(dest.write_bytes, resp.content)
return len(resp.content)
@@ -190,9 +191,11 @@ async def _ingest_pdf(session, page_slug: str, pdf_url: str) -> bool:
dest = Path(settings.nas_mount_path) / rel_path
size = await _download_pdf(pdf_url, dest)
# 50MB PDF read + sha256 는 동기 blocking(I/O+CPU) — 이벤트루프 점유 회피 to_thread (R5 동형).
file_hash = await asyncio.to_thread(lambda: hashlib.sha256(dest.read_bytes()).hexdigest())
doc = Document(
file_path=rel_path,
file_hash=hashlib.sha256(dest.read_bytes()).hexdigest(),
file_hash=file_hash,
file_format="pdf",
file_size=size,
file_type="immutable",
+96 -82
View File
@@ -251,104 +251,118 @@ async def watch_inbox():
for extra_path in settings.additional_watch_targets:
targets.append((extra_path, "library"))
async with async_session() as session:
# ─── Web/ 트랙 (devonagent) — DEVONthink Smart Rule 이 떨군 .html 만 진입 ───
if web_root.exists():
# rglob NFS 디렉토리 walk(blocking stat 다발)를 off-thread 로 수집 (R5).
for file_path in await asyncio.to_thread(lambda: list(web_root.rglob("*.html"))):
if not file_path.is_file() or should_skip(file_path):
continue
rel_path = str(file_path.relative_to(nas_root))
added, _ = await _ingest_web_file(session, file_path, rel_path)
# 파일별 독립 세션+commit 으로 격리 — 한 파일 실패(예: rglob↔stat 사이 삭제로 FileNotFoundError,
# flush 오류)가 watch_inbox 전체를 raise·롤백해 그 사이클 등록분을 모두 잃거나, 결정적 poison
# 파일이 매 사이클 같은 지점에서 중단시키는 것을 차단 (news_collector/csb_collector 와 동형).
# ─── Web/ 트랙 (devonagent) — DEVONthink Smart Rule 이 떨군 .html 만 진입 ───
if web_root.exists():
# rglob NFS 디렉토리 walk(blocking stat 다발)를 off-thread 로 수집 (R5).
for file_path in await asyncio.to_thread(lambda: list(web_root.rglob("*.html"))):
if not file_path.is_file() or should_skip(file_path):
continue
rel_path = str(file_path.relative_to(nas_root))
try:
async with async_session() as session:
added, _ = await _ingest_web_file(session, file_path, rel_path)
await session.commit()
new_count += added
# ─── PKM 트랙 (기존 drive_sync) ─────────────────────────────────────────
for sub, expected_category in targets:
scan_root = pkm_root / sub
if not scan_root.exists():
except Exception as e:
logger.warning("[Web] 파일 처리 실패 skip path=%s: %s", rel_path, e)
continue
# 안전 자료실 A-2/B-4 — 타깃 폴더 기반 (material, jurisdiction, license)
target_mt, target_jur, target_license = _TARGET_AXIS.get(
Path(sub).name, (None, None, None)
# ─── PKM 트랙 (기존 drive_sync) ─────────────────────────────────────────
for sub, expected_category in targets:
scan_root = pkm_root / sub
if not scan_root.exists():
continue
# 안전 자료실 A-2/B-4 — 타깃 폴더 기반 (material, jurisdiction, license)
target_mt, target_jur, target_license = _TARGET_AXIS.get(
Path(sub).name, (None, None, None)
)
# NFS 디렉토리 walk(blocking) off-thread 수집 (R5).
for file_path in await asyncio.to_thread(lambda: list(scan_root.rglob("*"))):
if not file_path.is_file() or should_skip(file_path):
continue
category, needs_conversion, next_stage = _route_media(
file_path, expected_category
)
# NFS 디렉토리 walk(blocking) off-thread 수집 (R5).
for file_path in await asyncio.to_thread(lambda: list(scan_root.rglob("*"))):
if not file_path.is_file() or should_skip(file_path):
continue
# audio/video 폴더에 엉뚱한 확장자가 들어왔거나 Inbox 에
# audio/video 가 잘못 떨어진 경우 — 이 라운드에서 아예 skip
if category is None and next_stage is None:
continue
category, needs_conversion, next_stage = _route_media(
file_path, expected_category
)
# audio/video 폴더에 엉뚱한 확장자가 들어왔거나 Inbox 에
# audio/video 가 잘못 떨어진 경우 — 이 라운드에서 아예 skip
if category is None and next_stage is None:
continue
rel_path = str(file_path.relative_to(nas_root))
rel_path = str(file_path.relative_to(nas_root))
try:
# GB 파일 SHA-256 은 이벤트 루프를 점유 → 같은 루프의 모든 1분 주기 consumer
# + FastAPI 요청이 수십초~분 동시 정지. to_thread 오프로드. 스캔 루프가 이미
# 순차라 file_hash 는 한 번에 하나만 실행(직렬화) — 병렬 해싱 X = NFS 2.5GbE
# 대역폭·버퍼 메모리 blowup 방지 (R5).
# 대역폭·버퍼 메모리 blowup 방지 (R5). 세션 밖에서 계산(커넥션 미점유).
fhash = await asyncio.to_thread(file_hash, file_path)
result = await session.execute(
select(Document).where(Document.file_path == rel_path)
)
existing = result.scalar_one_or_none()
if existing is None:
ext = file_path.suffix.lstrip(".").lower() or "unknown"
doc = Document(
file_path=rel_path,
file_hash=fhash,
file_format=ext,
file_size=file_path.stat().st_size,
file_type="immutable",
title=file_path.stem,
source_channel="drive_sync",
category=category,
needs_conversion=needs_conversion,
# 안전 자료실 A-2/B-4 — watch 타깃 매핑 (KGS=law/KR 등, 비대상=NULL)
material_type=target_mt,
jurisdiction=target_jur,
async with async_session() as session:
result = await session.execute(
select(Document).where(Document.file_path == rel_path)
)
# B-4 — 타깃 폴더 license 주입(restricted 포함, 비대상=미주입). classify 는
# material_type IS NULL 일 때만 제안 + extract_meta 미기록이라 주입 보존.
if target_license:
doc.extract_meta = {"license": dict(target_license)}
session.add(doc)
await session.flush()
existing = result.scalar_one_or_none()
if next_stage:
await enqueue_stage(session, doc.id, next_stage)
new_count += 1
if existing is None:
ext = file_path.suffix.lstrip(".").lower() or "unknown"
doc = Document(
file_path=rel_path,
file_hash=fhash,
file_format=ext,
file_size=file_path.stat().st_size,
file_type="immutable",
title=file_path.stem,
source_channel="drive_sync",
category=category,
needs_conversion=needs_conversion,
# 안전 자료실 A-2/B-4 — watch 타깃 매핑 (KGS=law/KR 등, 비대상=NULL)
material_type=target_mt,
jurisdiction=target_jur,
)
# B-4 — 타깃 폴더 license 주입(restricted 포함, 비대상=미주입). classify 는
# material_type IS NULL 일 때만 제안 + extract_meta 미기록이라 주입 보존.
if target_license:
doc.extract_meta = {"license": dict(target_license)}
session.add(doc)
await session.flush()
elif existing.file_hash != fhash:
existing.file_hash = fhash
existing.file_size = file_path.stat().st_size
# 기존 문서에 category/quarantine flag 가 비어있으면 보정
if existing.category is None and category is not None:
existing.category = category
if needs_conversion and not getattr(existing, "needs_conversion", False):
existing.needs_conversion = True
# B-4 — 축/license 보정(B-4 이전 적재분이 재변경 시): material 미설정 시 주입,
# license 부재 시에만 merge 주입(clobber 회피 — 기존 extract_meta 키 보존).
if existing.material_type is None and target_mt is not None:
existing.material_type = target_mt
existing.jurisdiction = target_jur
if target_license and not (existing.extract_meta or {}).get("license"):
meta = dict(existing.extract_meta or {})
meta["license"] = dict(target_license)
existing.extract_meta = meta
if next_stage:
await enqueue_stage(session, doc.id, next_stage)
await session.commit()
new_count += 1
if next_stage:
await enqueue_stage(session, existing.id, next_stage)
changed_count += 1
elif existing.file_hash != fhash:
existing.file_hash = fhash
existing.file_size = file_path.stat().st_size
# 기존 문서에 category/quarantine flag 가 비어있으면 보정
if existing.category is None and category is not None:
existing.category = category
if needs_conversion and not getattr(existing, "needs_conversion", False):
existing.needs_conversion = True
# B-4 — 축/license 보정(B-4 이전 적재분이 재변경 시): material 미설정 시 주입,
# license 부재 시에만 merge 주입(clobber 회피 — 기존 extract_meta 키 보존).
if existing.material_type is None and target_mt is not None:
existing.material_type = target_mt
existing.jurisdiction = target_jur
if target_license and not (existing.extract_meta or {}).get("license"):
meta = dict(existing.extract_meta or {})
meta["license"] = dict(target_license)
existing.extract_meta = meta
await session.commit()
if next_stage:
await enqueue_stage(session, existing.id, next_stage)
await session.commit()
changed_count += 1
# else: 무변경 → 쓰기 없음 (세션 자동 닫힘, commit 불요)
except Exception as e:
logger.warning("[PKM] 파일 처리 실패 skip path=%s: %s", rel_path, e)
continue
if new_count or changed_count:
logger.info(f"[Inbox+§3] 새 파일 {new_count}건, 변경 파일 {changed_count}건 등록")
+5
View File
@@ -300,6 +300,11 @@ async def _process_single(
f"[marker] transient error id={document_id} kind={type(exc).__name__}: {exc}"
)
raise
except json.JSONDecodeError as exc:
# 200 응답의 truncated/malformed body — 연결 흔들림 등 transient. _fail(non-retryable)
# 로 박지 말고 raise → queue retry (max_attempts 까지). 진짜 손상이면 재시도 후 failed.
logger.warning(f"[marker] malformed json body (200) id={document_id}: {exc}")
raise
except Exception as exc:
logger.exception(f"[marker] unexpected error id={document_id}: {exc}")
await _fail(session, document_id, str(exc)[:1000])
+9 -3
View File
@@ -497,12 +497,18 @@ async def process(document_id: int, session: AsyncSession) -> None:
logger.warning(f"[presegment] id={document_id} file not found ({raw}) → extract")
return
import asyncio
import fitz # PyMuPDF — extract_worker/marker_worker 와 동일 의존
def _read_toc(path: str):
# fitz open/get_toc 는 동기 blocking — live 스테이지라 이벤트루프(같은 루프의 1분 consumer +
# FastAPI 요청) 점유 회피 위해 to_thread 오프로드(거대/손상 PDF 파싱 수백 ms~초).
with fitz.open(path) as pdf:
return pdf.page_count, (pdf.get_toc(simple=True) or [])
try:
with fitz.open(str(source)) as pdf:
page_count = pdf.page_count
toc = pdf.get_toc(simple=True) or []
page_count, toc = await asyncio.to_thread(_read_toc, str(source))
except Exception as exc:
# PDF 손상 등 — 분할 불가. 단일문서로 통과(extract 가 PyMuPDF/OCR 로 재시도하며 가시화).
logger.warning(
+79 -46
View File
@@ -28,6 +28,8 @@ logger = setup_logger("study_publish_worker")
BATCH_SIZE = 500
# pg_advisory_xact_lock 전역 단일 라이터 키(발행 워커 전용 임의 상수, 타 advisory 락과 비충돌).
ADVISORY_LOCK_KEY = 838201
# 행별 격리 재시도 상한 — 초과 시 failed_at 스탬프(terminal)로 select 에서 제외.
MAX_OUTBOX_ATTEMPTS = 5
async def consume_publish_outbox() -> None:
@@ -46,11 +48,15 @@ async def consume_publish_outbox() -> None:
max_rev = int(
(await session.execute(select(func.coalesce(func.max(Published.rev), 0)))).scalar() or 0
)
# 3) 미처리 outbox 를 커밋순(id)으로.
# 3) 미처리 outbox 를 커밋순(id)으로. failed_at(terminal) 은 제외 — poison 행이
# head-of-line 을 영구 점유하지 않게 함.
rows = (
await session.execute(
select(PublishOutbox)
.where(PublishOutbox.processed_at.is_(None))
.where(
PublishOutbox.processed_at.is_(None),
PublishOutbox.failed_at.is_(None),
)
.order_by(PublishOutbox.id.asc())
.limit(BATCH_SIZE)
)
@@ -60,59 +66,86 @@ async def consume_publish_outbox() -> None:
now = datetime.now(timezone.utc)
published_count = 0
failed_count = 0
for ob in rows:
existing = (
await session.execute(
select(Published).where(
Published.kind == ob.kind,
Published.source_id == ob.source_id,
)
)
).scalar_one_or_none()
try:
# 행 단위 savepoint 격리 — 한 행의 예외가 배치 전체(앞 행 processed_at 포함)를
# 롤백해 poison 행이 다음 사이클에 다시 최저 id 로 선택되는 무한 재선택을 차단.
async with session.begin_nested():
existing = (
await session.execute(
select(Published).where(
Published.kind == ob.kind,
Published.source_id == ob.source_id,
)
)
).scalar_one_or_none()
# (payload_hash, deleted) 디둡 — no-op 재투영은 rev 안 올림.
if (
existing is not None
and existing.payload_hash == ob.payload_hash
and existing.deleted == ob.deleted
):
ob.processed_at = now
# (payload_hash, deleted) 디둡 — no-op 재투영은 rev 안 올림.
is_noop = (
existing is not None
and existing.payload_hash == ob.payload_hash
and existing.deleted == ob.deleted
)
if is_noop:
ob.processed_at = now
else:
new_rev = max_rev + 1
if existing is None:
session.add(
Published(
kind=ob.kind,
source_id=ob.source_id,
pub_id=uuid.uuid4().hex,
payload=ob.payload,
payload_hash=ob.payload_hash,
schema_version=ob.schema_version,
rev=new_rev,
deleted=ob.deleted,
created_at=now,
updated_at=now,
)
)
else:
existing.payload = ob.payload
existing.payload_hash = ob.payload_hash
existing.schema_version = ob.schema_version
existing.deleted = ob.deleted
existing.rev = new_rev
existing.updated_at = now
ob.processed_at = now
# 배치 내 동일 (kind, source_id) 후속 행이 직전 반영을 보도록 flush(최신 승).
await session.flush()
except Exception as row_err:
# savepoint 롤백 = 이 행의 쓰기(processed_at 포함) 취소. attempts/failed_at 만
# 바깥 트랜잭션에 누적돼 최종 commit 으로 영속(영구 재선택 방지).
ob.attempts = (ob.attempts or 0) + 1
if ob.attempts >= MAX_OUTBOX_ATTEMPTS:
ob.failed_at = now
failed_count += 1
logger.error(
"publish_outbox_row_terminal id=%s kind=%s source_id=%s attempts=%s: %s",
ob.id, ob.kind, ob.source_id, ob.attempts, row_err,
)
else:
logger.warning(
"publish_outbox_row_retry id=%s kind=%s source_id=%s attempts=%s: %s",
ob.id, ob.kind, ob.source_id, ob.attempts, row_err,
)
continue
max_rev += 1
if existing is None:
session.add(
Published(
kind=ob.kind,
source_id=ob.source_id,
pub_id=uuid.uuid4().hex,
payload=ob.payload,
payload_hash=ob.payload_hash,
schema_version=ob.schema_version,
rev=max_rev,
deleted=ob.deleted,
created_at=now,
updated_at=now,
)
)
else:
existing.payload = ob.payload
existing.payload_hash = ob.payload_hash
existing.schema_version = ob.schema_version
existing.deleted = ob.deleted
existing.rev = max_rev
existing.updated_at = now
ob.processed_at = now
# 배치 내 동일 (kind, source_id) 후속 행이 직전 반영을 보도록 flush(최신 승).
await session.flush()
published_count += 1
# savepoint 커밋 성공 시에만 rev 카운터 전진(실패 행은 rev 미소모 → 드물게 gap,
# 단일 라이터·커밋순 부여라 viewer since-rev 증분 동기 정합엔 무해).
if not is_noop:
max_rev = new_rev
published_count += 1
await session.commit()
logger.info(
"publish_outbox_drained scanned=%s published=%s max_rev=%s",
"publish_outbox_drained scanned=%s published=%s failed=%s max_rev=%s",
len(rows),
published_count,
failed_count,
max_rev,
)
except Exception as e:
+5 -15
View File
@@ -30,21 +30,11 @@ ai:
repetition_penalty: 1.05 # 한국어 장문 반복/코드스위칭(CJK·라틴 누수) 억제 (보수적 시작값)
top_k: 20 # Qwen3 권장
# deep: 야간 night-drain 전용 — 맥북 M5 Max Qwen3.6-27B-6bit (llm-router :8890 경유,
# model=qwen-macbook alias). 2026-06-11 재도입 (사용자: 자기 전 night-drain 으로 백로그 분담).
# 맥북 불가(503/연결/절단) = StageDeferred 보류 — 맥미니/cloud 강등 없음, attempts 미소모.
# consumer 의 deep_summary 도 슬롯 존재 시 맥북 경유 (잠들어 있으면 30분 백오프 보류 = 무해).
# 슬롯 제거 시 deep_summary 는 primary(맥미니) 경로 복귀.
deep:
endpoint: "http://100.76.254.116:8890/v1/chat/completions"
model: "qwen-macbook"
max_tokens: 8192
timeout: 900
context_char_limit: 260000
temperature: 0.3
top_p: 0.9
repetition_penalty: 1.05 # 한국어 장문 반복/코드스위칭 억제 (보수적 시작값)
top_k: 20
# deep: ★2026-06-29 잠정 보류 (사용자 "맥북 night-drain 의미없어 → 맥미니 일원화").
# 슬롯 제거 → deep_summary 가 primary(맥미니) 경로 복귀 + use_deep/drain 도 맥미니 폴백
# (맥북 라우팅 0). drain-keeper(GPU cron)도 비활성. 맥북 mlx-vlm-server 는 OpenCode 로컬용 보존.
# 복원(night-drain 재개 시): git history 에서 deep 슬롯(qwen-macbook :8890, max_tokens 8192,
# timeout 900, context_char_limit 260000, temp 0.3 / top_p 0.9 / rep 1.05 / top_k 20) 부활 + drain-keeper 재활성.
# fallback: primary 장애 시 최후 방어선. Claude Sonnet 4 API (소액 한도, 자동 trigger).
# 호출 빈도 낮음 가정 (Mac mini 가 거의 항상 up) → premium 과 budget 공유 OK.
+7 -1
View File
@@ -3,7 +3,13 @@ services:
image: pgvector/pgvector:pg16
volumes:
- pgdata:/var/lib/postgresql/data
- ./migrations:/docker-entrypoint-initdb.d
# ★ 2026-06-29 fresh-DB/DR 부팅 fix: initdb.d 마운트 제거(기존 `./migrations:/docker-entrypoint-initdb.d`).
# 빈 볼륨 첫 기동 시 postgres 엔트리포인트가 migrations/*.sql(001~) 을 psql autocommit 으로 실행해
# 스키마는 만들되 schema_migrations 스탬프는 안 남김(runner 만 생성) → fastapi init_db 가 documents
# 존재로 'fresh' 를 오판해 baseline(_load_baseline_if_fresh) 로드를 건너뛰고, 빈 schema_migrations
# 로 001 부터 재replay → `CREATE TABLE users`(IF NOT EXISTS 없음) 충돌 → 부팅 크래시(DR/신규환경).
# fresh-boot 은 init_db 의 baseline 적재 + migration runner 단일 경로로 일원화(설계 의도). 기존 prod
# 볼륨은 비어있지 않아 init scripts 가 애초에 미발동 → 무영향.
environment:
POSTGRES_DB: pkm
POSTGRES_USER: pkm
+1 -1
View File
@@ -1094,7 +1094,7 @@ services:
image: pgvector/pgvector:pg16
volumes:
- pgdata:/var/lib/postgresql/data
- ./migrations:/docker-entrypoint-initdb.d
# initdb.d 마운트 제거(2026-06-29): fresh-boot 은 fastapi init_db+baseline 단일 경로.
environment:
POSTGRES_DB: pkm
POSTGRES_USER: pkm
+2 -2
View File
@@ -71,7 +71,7 @@ GPU 서버의 NFS mount (`/proc/mounts` 실측):
| 컨테이너 | 마운트 | 모드 | 비고 |
|---|---|---|---|
| postgres | `pgdata:/var/lib/postgresql/data` + `./migrations:/docker-entrypoint-initdb.d` | rw | DB 본체 named volume |
| postgres | `pgdata:/var/lib/postgresql/data` | rw | DB 본체 named volume (initdb.d 마운트는 2026-06-29 제거 — 아래 관찰) |
| kordoc-service | `${NAS}/Document_Server:/documents` | **ro** | PDF/HWP parse |
| ocr-service | `${NAS}/Document_Server:/documents` + `ocr_models:/root/.cache` | **ro** + rw | |
| marker-service | `${NAS}/Document_Server:/documents` + `marker_models:/models` | **ro** + rw | PDF→markdown |
@@ -84,7 +84,7 @@ GPU 서버의 NFS mount (`/proc/mounts` 실측):
**관찰**:
- worker 컨테이너 (kordoc/ocr/marker/stt) 는 모두 NAS **read-only** 마운트 → 원본 안전.
- fastapi 만 NAS **rw** → 업로드/preview/extracted_images 쓰기 단일 책임.
- `./migrations` 이 postgres 의 `docker-entrypoint-initdb.d` fastapi 의 `/app/migrations` 양쪽에 마운트. 단 실제 migration runner 는 fastapi `init_db()` 만 사용 (postgres init scripts 는 첫 생성 시만 실행 → 효과 X, 안전).
- `./migrations` fastapi 의 `/app/migrations` 마운트. migration runner 는 fastapi `init_db()` 단일 경로. (~2026-06-29: postgres `docker-entrypoint-initdb.d` 마운트 제거. 기존엔 "첫 생성 시만 실행 → 효과 X" 로 봤으나, 빈 볼륨 첫 기동 시 postgres 가 migrations/*.sql 을 실제 실행해 스키마는 만들되 schema_migrations 스탬프를 안 남겨 → init_db 의 baseline fresh 판정을 깨고 부팅 크래시 유발. fresh-DB/DR 부팅을 init_db+baseline 단일 경로로 일원화.)
## 정책 정리
@@ -1,160 +0,0 @@
<!--
AskAnswerCard.svelte — 검색 결과 페이지 상단 AI 답변 카드 (컴팩트).
/ask 페이지의 AskAnswer.svelte와 달리, 검색 결과를 가리지 않는
보조 영역으로 설계. 출처 목록 클릭이 must-have, 본문 [n] 클릭은 nice-to-have.
-->
<script lang="ts">
import Badge from '$lib/components/ui/Badge.svelte';
import Skeleton from '$lib/components/ui/Skeleton.svelte';
import { Sparkles, X, FileText } from 'lucide-svelte';
import type { AskResponse, Confidence } from '$lib/types/ask';
interface Props {
data: AskResponse | null;
loading: boolean;
error: boolean;
onCitationClick: (docId: number) => void;
onDismiss: () => void;
}
let { data, loading, error, onCitationClick, onDismiss }: Props = $props();
// [n] 파싱 (AskAnswer.svelte에서 가져옴)
type Token =
| { type: 'text'; value: string }
| { type: 'cite'; n: number; raw: string };
function splitAnswer(text: string): Token[] {
return text
.split(/(\[\d+\])/g)
.filter(Boolean)
.map((tok): Token => {
const m = tok.match(/^\[(\d+)\]$/);
return m
? { type: 'cite', n: Number(m[1]), raw: tok }
: { type: 'text', value: tok };
});
}
function confidenceTone(c: Confidence | null): 'success' | 'warning' | 'error' | 'neutral' {
if (c === 'high') return 'success';
if (c === 'medium') return 'warning';
if (c === 'low') return 'error';
return 'neutral';
}
function confidenceLabel(c: Confidence | null): string {
if (c === 'high') return '높음';
if (c === 'medium') return '중간';
if (c === 'low') return '낮음';
return '';
}
let tokens = $derived(data?.ai_answer ? splitAnswer(data.ai_answer) : []);
// 출처 중복 제거 (같은 doc_id)
let uniqueCitations = $derived.by(() => {
if (!data?.citations?.length) return [];
const seen = new Set<number>();
return data.citations.filter((c) => {
if (seen.has(c.doc_id)) return false;
seen.add(c.doc_id);
return true;
});
});
</script>
<div class="bg-surface border border-default rounded-card p-4">
<!-- 헤더 -->
<div class="flex items-center justify-between gap-2 mb-2">
<div class="flex items-center gap-1.5">
<Sparkles size={12} class="text-accent" />
<span class="text-[10px] font-semibold tracking-wider uppercase text-dim">
내 자료 기준 답변
</span>
{#if data?.confidence}
<Badge tone={confidenceTone(data.confidence)} size="sm">
신뢰도 {confidenceLabel(data.confidence)}
</Badge>
{/if}
{#if data?.completeness === 'partial'}
<Badge tone="warning" size="sm">일부 답변</Badge>
{/if}
</div>
<button
type="button"
onclick={onDismiss}
class="p-0.5 rounded text-dim hover:text-text transition-colors"
aria-label="답변 카드 접기"
>
<X size={14} />
</button>
</div>
<!-- 본문 -->
{#if loading}
<div class="space-y-2">
<Skeleton w="w-full" h="h-3" />
<Skeleton w="w-4/5" h="h-3" />
</div>
<p class="mt-3 text-[10px] text-dim flex items-center gap-1.5">
<span class="inline-block w-2.5 h-2.5 rounded-full border-2 border-dim border-t-accent animate-spin"></span>
근거 기반 답변 생성 중…
</p>
{:else if error}
<p class="text-xs text-dim">답변을 가져오지 못했습니다.</p>
{:else if data?.ai_answer}
<!-- 답변 텍스트 -->
<div class="text-sm leading-6 text-text">
{#each tokens as tok}
{#if tok.type === 'cite'}
{@const citation = data?.citations?.find((c) => c.n === tok.n)}
{#if citation}
<button
type="button"
class="inline text-accent font-semibold hover:underline px-0.5"
onclick={() => onCitationClick(citation.doc_id)}
title={citation.title || `문서 #${citation.doc_id}`}
>
{tok.raw}
</button>
{:else}
<span class="text-dim">{tok.raw}</span>
{/if}
{:else}
<span>{tok.value}</span>
{/if}
{/each}
</div>
<!-- partial: 누락 측면 -->
{#if data.completeness === 'partial' && data.missing_aspects?.length}
<p class="mt-2 text-[10px] text-dim">
다루지 못한 부분: {data.missing_aspects.join(', ')}
</p>
{/if}
<!-- 출처 목록 (must-have) -->
{#if uniqueCitations.length > 0}
<div class="mt-3 pt-2 border-t border-default">
<p class="text-[10px] font-medium text-dim mb-1.5">출처</p>
<div class="flex flex-wrap gap-1.5">
{#each uniqueCitations as citation}
<button
type="button"
onclick={() => onCitationClick(citation.doc_id)}
class="inline-flex items-center gap-1 text-[11px] px-2 py-0.5 rounded bg-surface text-text border border-default hover:border-accent hover:text-accent transition-colors"
title={citation.span_text}
>
<FileText size={10} />
<span class="max-w-[200px] truncate">
{citation.title || `문서 #${citation.doc_id}`}
</span>
</button>
{/each}
</div>
</div>
{/if}
{/if}
</div>
@@ -1,228 +0,0 @@
<!--
AskAnswer.svelte — /ask 페이지 상단 패널.
Answer 본문 + clickable [n] citations + 신뢰도/상태 Badge.
status != completed 또는 refused=true → warning empty state +
no_results_reason + "검색 결과 확인하기" 역링크.
-->
<script lang="ts">
import Badge from '$lib/components/ui/Badge.svelte';
import Button from '$lib/components/ui/Button.svelte';
import EmptyState from '$lib/components/ui/EmptyState.svelte';
import Skeleton from '$lib/components/ui/Skeleton.svelte';
import { AlertTriangle, Sparkles } from 'lucide-svelte';
import type { AskResponse, Confidence, SynthesisStatus } from '$lib/types/ask';
interface Props {
data: AskResponse | null;
loading: boolean;
onCitationClick: (n: number) => void;
}
let { data, loading, onCitationClick }: Props = $props();
type Token =
| { type: 'text'; value: string }
| { type: 'cite'; n: number; raw: string };
function splitAnswer(text: string): Token[] {
return text
.split(/(\[\d+\])/g)
.filter(Boolean)
.map((tok): Token => {
const m = tok.match(/^\[(\d+)\]$/);
return m
? { type: 'cite', n: Number(m[1]), raw: tok }
: { type: 'text', value: tok };
});
}
function confidenceTone(
c: Confidence | null,
): 'success' | 'warning' | 'error' | 'neutral' {
if (c === 'high') return 'success';
if (c === 'medium') return 'warning';
if (c === 'low') return 'error';
return 'neutral';
}
function confidenceLabel(c: Confidence | null): string {
if (c === 'high') return '높음';
if (c === 'medium') return '중간';
if (c === 'low') return '낮음';
return '없음';
}
const STATUS_LABEL: Record<SynthesisStatus, string> = {
completed: '답변 완료',
timeout: '답변 지연',
skipped: '답변 생략',
no_evidence: '근거 없음',
parse_failed: '형식 오류',
llm_error: 'AI 오류',
backend_unavailable: 'Backend 비가용',
};
/**
* backend chip label — `backend_requested` 가 명시 opt-in 인 경우만 표시.
* 미지정 (null/undefined) default 호출은 chip 없음 (시각 noise 회피).
*/
function backendChipLabel(backend: string | null | undefined): string | null {
if (!backend) return null;
if (backend === 'qwen-macbook') return 'Qwen 27B (MacBook)';
if (backend === 'gemma-macmini') return 'Gemma 26B (Mac mini)';
return backend;
}
let tokens = $derived(data?.ai_answer ? splitAnswer(data.ai_answer) : []);
let showFullAnswer = $derived(
!!data && !!data.ai_answer && data.completeness === 'full'
&& data.synthesis_status === 'completed' && !data.refused,
);
let showPartial = $derived(
!!data && data.completeness === 'partial' && !data.refused,
);
let showWarning = $derived(!!data && !showFullAnswer && !showPartial);
</script>
<section class="bg-surface border border-default rounded-card p-5">
<!-- 헤더 -->
<div class="flex items-start justify-between gap-3 mb-4">
<div>
<p class="text-[10px] font-semibold tracking-wider uppercase text-dim flex items-center gap-1.5">
<Sparkles size={12} /> AI Answer
</p>
<h2 class="mt-1 text-base font-semibold text-text">근거 기반 답변</h2>
</div>
{#if data && !loading}
<div class="flex flex-wrap gap-1.5">
<Badge tone={confidenceTone(data.confidence)} size="sm">
신뢰도 {confidenceLabel(data.confidence)}
</Badge>
{#if backendChipLabel(data.backend_requested)}
<span title={`backend_requested=${data.backend_requested} / backend_used=${data.backend_used ?? 'null'}`}>
<Badge tone="neutral" size="sm">
{backendChipLabel(data.backend_requested)}
</Badge>
</span>
{/if}
<Badge tone="neutral" size="sm">
{STATUS_LABEL[data.synthesis_status]}
</Badge>
{#if data.synthesis_ms > 0}
<Badge tone="neutral" size="sm">
{Math.round(data.synthesis_ms)}ms
</Badge>
{/if}
</div>
{/if}
</div>
<!-- 본문 -->
{#if loading}
<div class="space-y-3">
<Skeleton w="w-3/4" h="h-4" />
<Skeleton w="w-full" h="h-4" />
<Skeleton w="w-5/6" h="h-4" />
<p class="mt-4 text-xs text-dim flex items-center gap-2">
<span class="inline-block w-3 h-3 rounded-full border-2 border-dim border-t-accent animate-spin"></span>
근거 기반 답변 생성 중… 약 15초 소요
</p>
</div>
{:else if showFullAnswer && data}
<div class="text-sm leading-7 text-text">
{#each tokens as tok}
{#if tok.type === 'cite'}
<button
type="button"
class="inline-block align-baseline text-accent font-semibold hover:underline focus-visible:outline-none focus-visible:ring-2 focus-visible:ring-accent-ring rounded px-0.5"
onclick={() => onCitationClick(tok.n)}
aria-label={`인용 ${tok.n}번 보기`}
>
{tok.raw}
</button>
{:else}
<span>{tok.value}</span>
{/if}
{/each}
</div>
{:else if showPartial && data}
<!-- Phase 3.5a: question-aligned partial structure -->
<div>
<Badge tone="warning" size="sm">일부 답변</Badge>
{#if data.ai_answer}
<div class="mt-3 text-sm leading-7 text-text">
{#each tokens as tok}
{#if tok.type === 'cite'}
<button
type="button"
class="inline-block align-baseline text-accent font-semibold hover:underline rounded px-0.5"
onclick={() => onCitationClick(tok.n)}
>{tok.raw}</button>
{:else}
<span>{tok.value}</span>
{/if}
{/each}
</div>
{:else if data.confirmed_items?.length}
<div class="mt-3">
<h4 class="text-xs font-semibold text-dim uppercase tracking-wider">✓ 답변 가능</h4>
<ul class="mt-2 space-y-2">
{#each data.confirmed_items as item}
<li class="text-sm text-text">
<strong class="text-accent">{item.aspect}:</strong>
<span>{item.text}</span>
{#each item.citations as n}
<button
type="button"
class="text-accent font-semibold hover:underline px-0.5"
onclick={() => onCitationClick(n)}
>[{n}]</button>
{/each}
</li>
{/each}
</ul>
</div>
{/if}
{#if data.missing_aspects?.length}
<div class="mt-4 border-t border-default pt-3">
<h4 class="text-xs font-semibold text-dim uppercase tracking-wider">✗ 답변 불가</h4>
<ul class="mt-2 space-y-1">
{#each data.missing_aspects as aspect}
<li class="text-sm text-dim">{aspect} <span class="text-[10px]">(근거 없음)</span></li>
{/each}
</ul>
</div>
{/if}
<div class="mt-4">
<Button
variant="secondary"
size="sm"
href={`/documents?q=${encodeURIComponent(data.query)}`}
>
검색 결과 확인하기
</Button>
</div>
</div>
{:else if showWarning && data}
<EmptyState
icon={AlertTriangle}
title={data.refused && data.no_results_reason
? data.no_results_reason
: (data.no_results_reason ?? '관련 근거를 찾지 못했습니다.')}
description="검색 결과를 직접 확인해 보세요."
>
<Button
variant="secondary"
size="sm"
href={`/documents?q=${encodeURIComponent(data.query)}`}
>
검색 결과 확인하기
</Button>
</EmptyState>
{/if}
</section>
@@ -1,91 +0,0 @@
<!--
AskEvidence.svelte — /ask 페이지 우측 sticky 패널.
⚠ 영구 룰 (Phase 3.4 plan):
`citation.full_snippet` 은 UI 에 직접 렌더 금지. debug 모드(`?debug=1`)
에서 hover tooltip 으로만 조건부 노출 가능.
이 규칙이 깨지면 backend span-precision UX 가치가 사라진다. 코드 리뷰에서
반드시 reject. span_text 만 본문으로 노출한다.
-->
<script lang="ts">
import Badge from '$lib/components/ui/Badge.svelte';
import EmptyState from '$lib/components/ui/EmptyState.svelte';
import Skeleton from '$lib/components/ui/Skeleton.svelte';
import { BookOpen } from 'lucide-svelte';
import type { AskResponse } from '$lib/types/ask';
interface Props {
data: AskResponse | null;
loading: boolean;
activeCitation: number | null;
registerCitation: (n: number, node: HTMLElement) => { destroy: () => void };
}
let { data, loading, activeCitation, registerCitation }: Props = $props();
let citations = $derived(data?.citations ?? []);
</script>
<section class="bg-surface border border-default rounded-card p-5">
<div class="flex items-start justify-between gap-3 mb-4">
<div>
<p class="text-[10px] font-semibold tracking-wider uppercase text-dim flex items-center gap-1.5">
<BookOpen size={12} /> Evidence Highlights
</p>
<h3 class="mt-1 text-sm font-semibold text-text">인용 근거</h3>
</div>
{#if data && !loading}
<Badge tone="neutral" size="sm">{citations.length}</Badge>
{/if}
</div>
{#if loading}
<div class="space-y-3">
{#each Array(2) as _}
<div class="border border-default rounded-card p-4 space-y-2">
<Skeleton w="w-24" h="h-3" />
<Skeleton w="w-full" h="h-3" />
<Skeleton w="w-5/6" h="h-3" />
<Skeleton w="w-3/4" h="h-3" />
</div>
{/each}
</div>
{:else if citations.length === 0}
<EmptyState title="표시할 근거가 없습니다." class="py-6" />
{:else}
<div class="space-y-3">
{#each citations as citation (citation.n)}
{@const isActive = activeCitation === citation.n}
<article
class="border rounded-card p-4 transition-colors {isActive
? 'border-accent ring-2 ring-accent/20 bg-accent/5'
: 'border-default'}"
use:registerCitation={citation.n}
>
<div class="flex items-start gap-2">
<span class="text-accent font-bold text-sm shrink-0">[{citation.n}]</span>
<div class="flex-1 min-w-0">
<strong class="block text-sm text-text truncate">
{citation.title ?? `문서 ${citation.doc_id}`}
</strong>
{#if citation.section_title}
<p class="mt-0.5 text-xs text-dim truncate">{citation.section_title}</p>
{/if}
</div>
</div>
<!-- ⚠ span_text 만 렌더. full_snippet 금지 -->
<p class="mt-3 text-sm leading-relaxed text-text whitespace-pre-wrap">
{citation.span_text}
</p>
<div class="mt-3 flex gap-2 text-[10px] text-dim">
<span>relevance {citation.relevance.toFixed(2)}</span>
<span>rerank {citation.rerank_score.toFixed(2)}</span>
</div>
</article>
{/each}
</div>
{/if}
</section>
@@ -1,78 +0,0 @@
<!--
AskResults.svelte — /ask 페이지 하단 패널.
검색 결과 리스트. DocumentCard 재사용 X — SearchResult 필드 셋이 달라서
의존성 리스크 회피. inline 간단 카드로 title/score/snippet/section_title 표시.
클릭 시 `/documents/{id}` 로 이동.
-->
<script lang="ts">
import Badge from '$lib/components/ui/Badge.svelte';
import EmptyState from '$lib/components/ui/EmptyState.svelte';
import Skeleton from '$lib/components/ui/Skeleton.svelte';
import { FileText } from 'lucide-svelte';
import type { AskResponse } from '$lib/types/ask';
interface Props {
data: AskResponse | null;
loading: boolean;
}
let { data, loading }: Props = $props();
let results = $derived(data?.results ?? []);
</script>
<section class="bg-surface border border-default rounded-card p-5">
<div class="flex items-start justify-between gap-3 mb-4">
<div>
<p class="text-[10px] font-semibold tracking-wider uppercase text-dim flex items-center gap-1.5">
<FileText size={12} /> Search Results
</p>
<h3 class="mt-1 text-sm font-semibold text-text">검색 결과</h3>
</div>
{#if data && !loading}
<Badge tone="neutral" size="sm">{data.total}</Badge>
{/if}
</div>
{#if loading}
<div class="space-y-3">
{#each Array(5) as _}
<div class="border border-default rounded-card p-4 space-y-2">
<Skeleton w="w-2/3" h="h-4" />
<Skeleton w="w-full" h="h-3" />
<Skeleton w="w-4/5" h="h-3" />
</div>
{/each}
</div>
{:else if results.length === 0}
<EmptyState title="검색 결과가 없습니다." class="py-6" />
{:else}
<div class="space-y-3">
{#each results as result (result.id)}
<a
href={`/documents/${result.id}`}
class="block border border-default rounded-card p-4 hover:border-accent hover:bg-surface-hover transition-colors"
>
<div class="flex items-start justify-between gap-3">
<strong class="text-sm text-text flex-1 min-w-0 truncate">
{result.title ?? `문서 ${result.id}`}
</strong>
<div class="flex gap-1.5 text-[10px] text-dim shrink-0">
<span>score {result.score.toFixed(2)}</span>
{#if result.rerank_score != null}
<span>rerank {result.rerank_score.toFixed(2)}</span>
{/if}
</div>
</div>
{#if result.section_title}
<p class="mt-1 text-xs text-dim truncate">{result.section_title}</p>
{/if}
{#if result.snippet}
<p class="mt-2 text-xs text-dim line-clamp-2">{result.snippet}</p>
{/if}
</a>
{/each}
</div>
{/if}
</section>
-84
View File
@@ -1,84 +0,0 @@
/**
* Phase 3.4: `/api/search/ask` .
*
* Backend Pydantic (`app/api/search.py::AskResponse`) 1:1 .
* .
*/
export type SynthesisStatus =
| 'completed'
| 'timeout'
| 'skipped'
| 'no_evidence'
| 'parse_failed'
| 'llm_error'
| 'backend_unavailable';
export type Confidence = 'high' | 'medium' | 'low';
export interface Citation {
n: number;
chunk_id: number | null;
doc_id: number;
title: string | null;
section_title: string | null;
/** LLM이 추출한 50~300자 핵심 span. UI에서 이것만 노출. */
span_text: string;
/**
* 800 window.
*
* UI . debug hover tooltip
* . full_snippet을 backend span-precision UX
* (plan §Evidence ).
*/
full_snippet: string;
relevance: number;
rerank_score: number;
}
export interface SearchResult {
id: number;
title: string | null;
ai_domain: string | null;
ai_summary: string | null;
file_format: string;
score: number;
snippet: string | null;
match_reason: string | null;
chunk_id: number | null;
chunk_index: number | null;
section_title: string | null;
rerank_score: number | null;
}
export type Completeness = 'full' | 'partial' | 'insufficient';
export interface ConfirmedItem {
aspect: string;
text: string;
citations: number[];
}
export interface AskResponse {
results: SearchResult[];
ai_answer: string | null;
citations: Citation[];
synthesis_status: SynthesisStatus;
synthesis_ms: number;
confidence: Confidence | null;
refused: boolean;
no_results_reason: string | null;
query: string;
total: number;
/** Phase 3.5a */
completeness: Completeness;
covered_aspects: string[] | null;
missing_aspects: string[] | null;
confirmed_items: ConfirmedItem[] | null;
/**
* PR-MacBook-RAG-Backend-1: backend dispatcher metadata.
* backend null ( ). opt-in .
*/
backend_requested?: string | null;
backend_used?: string | null;
}
-61
View File
@@ -1,61 +0,0 @@
/**
* "질문형" ( ).
* true이면 /api/search/ask .
*
* false positive: /ask refused=true .
* false negative: 기존 .
*/
export function isQuestion(q: string): boolean {
const trimmed = q.trim();
if (trimmed.length === 0) return false;
// 1. ?로 끝나면 단일 단어라도 허용 (왜?, 절차?)
if (trimmed.endsWith('?')) return true;
// ? 없으면 단일 단어 / 4자 미만 제외 (키워드 보호)
if (trimmed.length < 4) return false;
if (trimmed.split(/\s+/).length < 2) return false;
// 2. 한국어 질문 어미
const KO_ENDINGS = [
'인가요', '인가', '인지', '있나요', '있나',
'할까요', '할까', '될까요', '될까',
'뭐야', '뭔가', '뭘까', '뭔지', '뭐지', '뭐죠',
'는지', '은지', '일까',
'어때', '어떤가',
'됩니까', '합니까', '입니까', '나요', '까요',
'주세요', '알려줘', '설명해', '비교해',
];
for (const ending of KO_ENDINGS) {
if (trimmed.endsWith(ending)) return true;
}
// 3. 한국어 질문 시작어
const KO_STARTS = [
'어떻게', '왜', '무엇', '무슨', '뭐가', '누가',
'어디', '언제', '몇', '얼마나', '어떤', '어느',
];
for (const start of KO_STARTS) {
if (trimmed.startsWith(start)) return true;
}
// 4. 영어 질문 시작어 (대소문자 무시)
const EN_STARTS = [
'what', 'how', 'why', 'when', 'where', 'who', 'which',
'is', 'are', 'do', 'does', 'can', 'could', 'should', 'would',
'explain', 'describe', 'compare', 'tell me',
];
const lower = trimmed.toLowerCase();
for (const start of EN_STARTS) {
if (lower.startsWith(start + ' ')) return true;
}
// 5. 의미 패턴 (끝 단어)
const SEMANTIC_ENDINGS = ['차이', '비교', '설명', '요약', '정리', '방법', '절차'];
const lastWord = trimmed.split(/\s+/).pop() || '';
for (const pat of SEMANTIC_ENDINGS) {
if (lastWord === pat || lastWord.endsWith(pat)) return true;
}
return false;
}
-305
View File
@@ -1,305 +0,0 @@
<!--
/ask — Phase 3.4 Ask Pipeline Frontend.
URL-driven: `/ask?q=<encoded>[&backend=<alias>]` 가 진입점.
$effect 로 (q, backend) 변화 감지 → `/api/search/ask` 호출 →
3-panel 렌더 (Answer / Evidence / Results).
중복 호출 방지: lastKey (q+backend) 가드.
Backend selector (PR-3 of DS AI routing policy, 2026-05-23,
[[document-server-ai-routing-policy]]) — PR-DocSrv-Web-Ask-Selector-1 확장:
- `auto` (기본, URL param 없음 → router 의 rule + LLM triage)
- `mac-mini-default` (명시, Mac mini gemma-4-26b)
- `qwen-macbook` (명시, M5 Max Qwen3.6-27B; "This device" 토글 on 시만)
- `claude-cloud` (명시, 503 scaffold; VITE_ENABLE_CLOUD_BACKEND_DEV=true 시만)
- "This device" 토글: localStorage `ds_device_self_label = 'macbook-m5-max' | null`.
source IP 의존 0 (PR-0 round 2 발견: caddy 2-hop + X-Forwarded-For 미설정 →
DS 가 보는 source IP = LAN gateway, 신뢰 불가).
- 503 + error_reason ∈ {macbook_unavailable, provider_not_configured, router_*}
시 자동 fallback 금지. UI 가 친절한 메시지 + "Mac mini default 로 재요청" 버튼.
- legacy URL `?backend=default|gemma-macmini` 는 그대로 받아서 mac-mini-default 와 동등 처리.
-->
<script lang="ts">
import { page } from '$app/stores';
import { goto } from '$app/navigation';
import { api, type ApiError } from '$lib/api';
import { addToast } from '$lib/stores/toast';
import EmptyState from '$lib/components/ui/EmptyState.svelte';
import Button from '$lib/components/ui/Button.svelte';
import AskAnswer from '$lib/components/ask/AskAnswer.svelte';
import AskEvidence from '$lib/components/ask/AskEvidence.svelte';
import AskResults from '$lib/components/ask/AskResults.svelte';
import { Sparkles, Search, AlertCircle } from 'lucide-svelte';
import type { AskResponse } from '$lib/types/ask';
type BackendChoice = 'auto' | 'mac-mini-default' | 'qwen-macbook' | 'claude-cloud';
function parseBackend(raw: string | null): BackendChoice {
if (raw === 'qwen-macbook') return 'qwen-macbook';
if (raw === 'mac-mini-default') return 'mac-mini-default';
if (raw === 'claude-cloud') return 'claude-cloud';
// legacy aliases (?backend=default | gemma-macmini) → mac-mini-default 와 동등
if (raw === 'default' || raw === 'gemma-macmini') return 'mac-mini-default';
return 'auto';
}
// Build-time feature flag — Claude Cloud UI 노출 여부 (default false).
// VITE_ENABLE_CLOUD_BACKEND_DEV=true npm run build 시만 활성. 운영 토글 X
// (build-time 한계). DS runtime feature flag API migrate 는 후속 PR.
const CLOUD_DEV_ENABLED = import.meta.env.VITE_ENABLE_CLOUD_BACKEND_DEV === 'true';
const DEVICE_LABEL_KEY = 'ds_device_self_label';
const DEVICE_LABEL_M5_MAX = 'macbook-m5-max';
// ── state ───────────────────────────────────────────
let queryInput = $state('');
let selectedBackend = $state<BackendChoice>('auto');
let data = $state<AskResponse | null>(null);
let loading = $state(false);
let backendUnavailable = $state(false);
let backendUnavailableMessage = $state('');
// "I am on MacBook M5 Max" 토글. mount 시 localStorage 에서 복원.
let isMacBookM5Max = $state(false);
$effect(() => {
if (typeof window === 'undefined') return;
const stored = window.localStorage.getItem(DEVICE_LABEL_KEY);
isMacBookM5Max = stored === DEVICE_LABEL_M5_MAX;
});
function toggleMacBookM5Max() {
isMacBookM5Max = !isMacBookM5Max;
if (typeof window === 'undefined') return;
if (isMacBookM5Max) {
window.localStorage.setItem(DEVICE_LABEL_KEY, DEVICE_LABEL_M5_MAX);
} else {
window.localStorage.removeItem(DEVICE_LABEL_KEY);
// 토글 off 시 qwen-macbook 선택돼 있었으면 auto 로 복귀 (선택권 박탈 X 명시 신호).
if (selectedBackend === 'qwen-macbook') {
selectedBackend = 'auto';
}
}
}
// 중복 호출 방지 가드 (hydration + reactive trigger 이중 발동 방지)
let lastKey = '';
// citation scroll 연동: Answer 가 [n] 클릭 → Evidence 카드로 이동 + highlight
const citationNodes = new Map<number, HTMLElement>();
let activeCitation = $state<number | null>(null);
function registerCitation(n: number, node: HTMLElement) {
citationNodes.set(n, node);
return {
destroy() {
citationNodes.delete(n);
},
};
}
function scrollToCitation(n: number) {
activeCitation = n;
const node = citationNodes.get(n);
node?.scrollIntoView({ behavior: 'smooth', block: 'center' });
}
// ── URL 빌더: backend !== 'auto' 일 때만 param 추가 ─────
function buildAskUrl(q: string, backend: BackendChoice): string {
const params = new URLSearchParams({ q });
if (backend !== 'auto') params.set('backend', backend);
return `/ask?${params.toString()}`;
}
// ── submit (URL-driven, back 자동) ──────────────────
function submit() {
const q = queryInput.trim();
if (!q) return;
goto(buildAskUrl(q, selectedBackend));
}
function handleKeydown(e: KeyboardEvent) {
if (e.key === 'Enter' && !e.isComposing) {
e.preventDefault();
submit();
}
}
// 503 후 "Mac mini default 로 재요청" — auto 로 reset (param 명시 제거).
function retryWithMacMiniDefault() {
const q = $page.url.searchParams.get('q') ?? queryInput.trim();
if (!q) return;
goto(`/ask?q=${encodeURIComponent(q)}`);
}
// PR-3 of routing policy — error_reason → 친절 메시지 매핑.
// silent fallback 금지 ([[feedback_no_silent_fallback_explicit_opt_in]]):
// 모든 503 case 는 명시 표시, 다른 backend 자동 재호출 X.
function friendlyErrorMessage(reason: string | undefined, detail: string): string {
switch (reason) {
case 'macbook_unavailable':
return 'MacBook M5 Max 가 응답하지 않습니다. 깨우거나 (launchctl start) Mac mini default 로 다시 요청하세요.';
case 'provider_not_configured':
return 'Claude Cloud 백엔드는 아직 구성되지 않았습니다 (2026-06-15 이후 별 PR 활성화 예정).';
default:
if (reason && reason.startsWith('router_')) {
return `라우터 호출 실패 (${reason}). Mac mini default 로 다시 요청하거나 잠시 후 재시도하세요.`;
}
if (reason && reason.startsWith('upstream_')) {
return `Upstream backend 가 응답하지 않습니다 (${reason}). 잠시 후 재시도하세요.`;
}
return detail || '답변 생성 실패';
}
}
// ── API 호출 ───────────────────────────────────────
async function runAsk(q: string, backend: BackendChoice) {
loading = true;
activeCitation = null;
backendUnavailable = false;
backendUnavailableMessage = '';
const path = backend !== 'auto'
? `/search/ask?q=${encodeURIComponent(q)}&backend=${encodeURIComponent(backend)}&limit=10`
: `/search/ask?q=${encodeURIComponent(q)}&limit=10`;
try {
data = await api<AskResponse>(path);
} catch (err) {
const apiErr = err as ApiError;
if (apiErr.status === 503) {
backendUnavailable = true;
backendUnavailableMessage = friendlyErrorMessage(apiErr.errorReason, apiErr.detail);
data = null;
} else {
addToast('error', apiErr.detail || '답변 생성 실패');
data = null;
}
} finally {
loading = false;
}
}
// ── URL → runAsk (중복 가드) ────────────────────────
$effect(() => {
const q = $page.url.searchParams.get('q') ?? '';
const backend = parseBackend($page.url.searchParams.get('backend'));
queryInput = q;
selectedBackend = backend;
if (!q) {
data = null;
loading = false;
backendUnavailable = false;
lastKey = '';
return;
}
const key = `${q}|${backend}`;
if (key === lastKey) return;
lastKey = key;
runAsk(q, backend);
});
</script>
<svelte:head>
<title>질문 - PKM</title>
</svelte:head>
<div class="h-full overflow-auto">
<!-- 상단 검색바 (sticky) -->
<div class="sticky top-0 z-10 bg-bg/80 backdrop-blur border-b border-default px-4 py-3">
<div class="flex flex-wrap items-center gap-2 max-w-[1680px] mx-auto">
<div class="relative flex-1 min-w-0">
<Search
size={14}
class="absolute left-3 top-1/2 -translate-y-1/2 text-dim pointer-events-none"
/>
<input
data-search-input
type="text"
bind:value={queryInput}
onkeydown={handleKeydown}
placeholder="질문을 입력하세요 (/ 키로 포커스)"
class="w-full pl-9 pr-3 py-2 bg-surface border border-default rounded-lg text-text text-sm focus:border-accent outline-none"
/>
</div>
<label
class="flex items-center gap-1.5 text-xs text-dim cursor-pointer select-none"
title="이 디바이스가 MacBook M5 Max 인 경우 체크 — This device (qwen-macbook) 옵션 활성화됩니다. localStorage 저장."
>
<input
type="checkbox"
checked={isMacBookM5Max}
onchange={toggleMacBookM5Max}
class="accent-accent"
/>
<span>This is M5 Max</span>
</label>
<select
bind:value={selectedBackend}
title="Backend 선택 — silent fallback 0 정책 (선택한 backend 만 시도, 실패 시 503)."
class="py-2 px-2 bg-surface border border-default rounded-lg text-text text-xs focus:border-accent outline-none min-w-0 max-w-[42vw] truncate"
>
<option value="auto">Auto (router)</option>
<option value="mac-mini-default">Mac mini (default)</option>
<option
value="qwen-macbook"
disabled={!isMacBookM5Max}
title={isMacBookM5Max
? 'MacBook M5 Max Qwen3.6-27B (직접 호출)'
: 'Check "This is M5 Max" toggle to enable'}
>
{isMacBookM5Max ? 'This device (Qwen MacBook)' : 'This device (unavailable)'}
</option>
<option
value="claude-cloud"
disabled={!CLOUD_DEV_ENABLED}
title={CLOUD_DEV_ENABLED
? 'Claude Cloud (dev mode — returns 503 until activation PR)'
: 'Cloud backend not configured yet'}
>
Claude Cloud {CLOUD_DEV_ENABLED ? '(dev)' : '(unavailable)'}
</option>
</select>
</div>
</div>
<!-- 본문 -->
<div class="max-w-[1680px] mx-auto p-4">
{#if backendUnavailable}
<div class="py-16">
<EmptyState
icon={AlertCircle}
title="Backend 가 응답하지 않습니다"
description={backendUnavailableMessage}
>
<Button variant="primary" size="sm" onclick={retryWithMacMiniDefault}>
Mac mini (default) 로 재요청
</Button>
</EmptyState>
</div>
{:else if !queryInput && !loading && !data}
<div class="py-16">
<EmptyState
icon={Sparkles}
title="근거 기반 답변을 받아보세요"
description="질문을 입력하면 문서에서 근거를 찾아 인용 기반 답변을 생성합니다."
/>
</div>
{:else}
<div class="grid gap-4 lg:grid-cols-[1.2fr_0.9fr] items-start">
<!-- 좌: Answer + Results -->
<div class="flex flex-col gap-4 min-w-0">
<AskAnswer {data} {loading} onCitationClick={scrollToCitation} />
<AskResults {data} {loading} />
</div>
<!-- 우: Evidence (lg 이상 sticky) -->
<div class="lg:sticky lg:top-20 lg:self-start min-w-0">
<AskEvidence
{data}
{loading}
{activeCitation}
{registerCitation}
/>
</div>
</div>
{/if}
</div>
</div>
+1 -49
View File
@@ -8,7 +8,7 @@
import { goto } from '$app/navigation';
import { api } from '$lib/api';
import { addToast } from '$lib/stores/toast';
import { X, Plus, Trash2, Tag, FolderTree, Sparkles, ArrowUpDown } from 'lucide-svelte';
import { X, Plus, Trash2, Tag, FolderTree, ArrowUpDown } from 'lucide-svelte';
import MarkdownStatusBadge from '$lib/components/MarkdownStatusBadge.svelte';
import { isMdStatusVisible } from '$lib/utils/mdStatus';
import UploadDropzone from '$lib/components/UploadDropzone.svelte';
@@ -22,9 +22,7 @@
import { useIsXl } from '$lib/composables/useMedia.svelte';
import { useListKeyboardNav } from '$lib/composables/useListKeyboardNav.svelte';
import { pLimit } from '$lib/utils/pLimit';
import { isQuestion } from '$lib/utils/isQuestion';
import { domainBgClass, domainLabel } from '$lib/utils/domainSlug';
import AskAnswerCard from '$lib/components/AskAnswerCard.svelte';
const FORMATS = ['pdf', 'hwp', 'hwpx', 'md', 'docx', 'xlsx', 'png', 'jpg'];
@@ -48,30 +46,6 @@
let searchResults = $state(null);
let selectedDoc = $state(null);
// 이드 답변 상태 (질문형 검색)
let askData = $state(null);
let askLoading = $state(false);
let askError = $state(false);
let askDismissed = $state(false);
async function askSearch(q) {
askLoading = true; askError = false; askData = null;
try {
askData = await api(`/search/ask?q=${encodeURIComponent(q)}&limit=10`);
} catch {
askError = true; askData = null;
} finally {
askLoading = false;
}
}
let showAskCard = $derived(
!askDismissed && (
askLoading ||
(askData && !askData.refused && askData.ai_answer && askData.synthesis_status === 'completed')
)
);
// 인스펙터(우측) 토글 — xl+ inline, < xl Drawer.
const isXl = useIsXl();
let inspectorOpen = $state(
@@ -145,7 +119,6 @@
selectedDoc = null;
selectedIds = new Set();
kbIndex = 0;
askData = null; askLoading = false; askError = false; askDismissed = false;
if (ui.isDrawerOpen('meta')) ui.closeDrawer();
if (urlQ) doSearch(urlQ, urlMode);
else loadDocuments();
@@ -191,7 +164,6 @@
async function doSearch(q, mode) {
loading = true;
if (isQuestion(q)) askSearch(q);
try {
const data = await api(`/search/?q=${encodeURIComponent(q)}&mode=${mode}&limit=50`);
searchResults = data.results;
@@ -406,13 +378,6 @@
<option value="trgm">부분매칭</option>
<option value="vector">의미검색</option>
</select>
{#if searchQuery.trim()}
<a
href={`/ask?q=${encodeURIComponent(searchQuery.trim())}`}
class="flex items-center px-2 py-1.5 rounded-lg border border-default text-dim hover:text-accent hover:border-accent transition-colors"
title="이 쿼리로 AI 답변"
><Sparkles size={14} /></a>
{/if}
</div>
<!-- 필터 칩 row -->
@@ -483,19 +448,6 @@
{/if}
</div>
<!-- AI 답변 (질문형 검색) — 목록 상단 고정, 아래로 목록 스크롤 -->
{#if showAskCard}
<div class="px-3 py-2 shrink-0 border-b border-default max-h-[55vh] overflow-y-auto">
<AskAnswerCard
data={askData}
loading={askLoading}
error={askError}
onCitationClick={(docId) => goto(`/documents/${docId}`)}
onDismiss={() => { askDismissed = true; }}
/>
</div>
{/if}
<!-- 선택 toolbar -->
{#if selectionCount > 0}
<div class="flex flex-wrap items-center gap-2 px-3 py-2 shrink-0 bg-accent/10 border-y border-accent/30">
+23
View File
@@ -0,0 +1,23 @@
-- 377_domain_bucket.sql
-- ai_domain(반자유 AI 분류, 드리프트 존재)을 검색 스코프용 7버킷으로 결정적 롤업.
-- 축: ai_domain(routing/해석 축)의 coarsening — category(UI축) 아님 (feedback_category_vs_ai_domain_axis 준수).
-- 버킷: News / Safety / Law / Engineering / General / Philosophy / Programming.
-- STORED generated → 신규/재분류 문서도 ai_domain 붙으면 자동 버킷. ai_domain 원본 보존(하위 검색 유지).
-- 롤백: ALTER TABLE documents DROP COLUMN domain_bucket;
ALTER TABLE documents ADD COLUMN IF NOT EXISTS domain_bucket text
GENERATED ALWAYS AS (
CASE
WHEN ai_domain LIKE 'News%' THEN 'News'
WHEN ai_domain = '법령' OR ai_domain LIKE 'Industrial_Safety/Legislation%' THEN 'Law'
WHEN ai_domain = 'Safety' OR ai_domain LIKE 'Safety/%'
OR ai_domain LIKE 'Industrial_Safety%'
OR ai_domain = 'Knowledge/Industrial_Safety' THEN 'Safety'
WHEN ai_domain LIKE 'Engineering%' OR ai_domain = 'Knowledge/Engineering' THEN 'Engineering'
WHEN ai_domain LIKE 'Philosophy%' THEN 'Philosophy'
WHEN ai_domain LIKE 'Programming%' THEN 'Programming'
ELSE 'General'
END
) STORED;
CREATE INDEX IF NOT EXISTS documents_domain_bucket_idx
ON documents (domain_bucket) WHERE deleted_at IS NULL;
@@ -0,0 +1,9 @@
-- 378_publish_outbox_attempts_failed.sql
-- (번호: 멀티세션 중 prod 가 377_domain_bucket 을 선점 → 378 로 리넘버.)
-- publish_outbox poison row head-of-line block 차단. 발행 워커가 행별 savepoint 격리 후
-- 예외 시 attempts++ 하고 MAX 초과 시 failed_at 스탬프(terminal) → 그 행을 select 에서 제외해
-- 후속 발행이 막히지 않게 함. 기존 미처리 행은 attempts=0 / failed_at=NULL 로 정상 재처리.
-- (단일 ALTER = 1 statement = asyncpg prepared 호환.)
ALTER TABLE publish_outbox
ADD COLUMN IF NOT EXISTS attempts SMALLINT NOT NULL DEFAULT 0,
ADD COLUMN IF NOT EXISTS failed_at TIMESTAMPTZ;
+12 -8
View File
@@ -31,8 +31,8 @@ from core.database import async_session
from models.study_memo_card_progress import StudyMemoCardProgress
from services.study.publish_enqueue import backfill_publish_card_progress
# 개인 학습툴 progress row 대비 넉넉. 도달 시 가드 경보.
PAGE = 100000
# 페이지 배치 크기 — after_id 루프로 전량 처리(bounded tx).
PAGE = 5000
async def run(dry_run: bool) -> None:
@@ -50,13 +50,17 @@ async def run(dry_run: bool) -> None:
print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.")
return
async with async_session() as session:
n = await backfill_publish_card_progress(session, after_id=0, limit=PAGE)
await session.commit()
total = 0
after = 0
while True:
async with async_session() as session:
n, after = await backfill_publish_card_progress(session, after_id=after, limit=PAGE)
await session.commit()
total += n
if n < PAGE:
break
print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
if n >= PAGE:
print(f"[warn] PAGE({PAGE}) 도달 — progress 가 더 있을 수 있음. after_id 페이징 추가 필요.")
print(f"\n[ok] outbox 적재 {total}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
def main() -> None:
+12 -8
View File
@@ -31,8 +31,8 @@ from core.database import async_session
from models.study_memo_card import StudyMemoCard
from services.study.publish_enqueue import backfill_publish_cards
# 개인 학습툴 카드 수 대비 넉넉(단일 outbox 적재 tx, 워커는 BATCH_SIZE 로 drain). 도달 시 가드 경보.
PAGE = 100000
# 페이지 배치 크기 — after_id 루프로 전량 처리(bounded tx). 워커는 BATCH_SIZE 로 drain.
PAGE = 5000
async def run(dry_run: bool) -> None:
@@ -55,13 +55,17 @@ async def run(dry_run: bool) -> None:
print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.")
return
async with async_session() as session:
n = await backfill_publish_cards(session, after_id=0, limit=PAGE)
await session.commit()
total = 0
after = 0
while True:
async with async_session() as session:
n, after = await backfill_publish_cards(session, after_id=after, limit=PAGE)
await session.commit()
total += n
if n < PAGE:
break
print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
if n >= PAGE:
print(f"[warn] PAGE({PAGE}) 도달 — 카드가 더 있을 수 있음. after_id 페이징 추가 필요.")
print(f"\n[ok] outbox 적재 {total}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
def main() -> None:
+11 -7
View File
@@ -37,7 +37,7 @@ from core.database import async_session
from models.study_topic import StudyTopic
from services.study.publish_enqueue import backfill_publish_topics
# 개인 학습툴 주제 수 대비 넉넉. 도달 시 overflow 가드가 경보.
# 페이지 배치 크기 — after_id 루프로 전량 처리(bounded tx).
PAGE = 5000
@@ -58,13 +58,17 @@ async def run(dry_run: bool) -> None:
print("[dry-run] 적재 안 함. 실제 실행은 --dry-run 제거.")
return
async with async_session() as session:
n = await backfill_publish_topics(session, after_id=0, limit=PAGE)
await session.commit()
total = 0
after = 0
while True:
async with async_session() as session:
n, after = await backfill_publish_topics(session, after_id=after, limit=PAGE)
await session.commit()
total += n
if n < PAGE:
break
print(f"\n[ok] outbox 적재 {n}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
if n >= PAGE:
print(f"[warn] PAGE({PAGE}) 도달 — 주제가 더 있을 수 있음. after_id 페이징 추가 필요.")
print(f"\n[ok] outbox 적재 {total}건 — 발행 워커가 drain(flag on 시) 하며 rev 부여.")
def main() -> None:
+1 -1
View File
@@ -106,7 +106,7 @@ async def main() -> None:
"SELECT count(*) FROM pg_indexes WHERE indexname='uq_attempt_session_question'"))).scalar()
mx = (await s.execute(text("SELECT max(version) FROM schema_migrations"))).scalar()
print(f"SCHEMA OK — max_migration={mx} documents={docs} purge_col={purge} cand_qwen={cand} attempt_uq={uq}")
assert docs and purge == 1 and cand == 0 and uq == 1 and mx == 361, "FAIL: 기대 스키마 상태 불일치"
assert docs and purge == 1 and cand == 0 and uq == 1 and mx == 378, "FAIL: 기대 스키마 상태 불일치"
# ── 5) /health 직접 호출 ──────────────────────────────────────────────
health = await main.health_check()
+25 -2
View File
@@ -59,6 +59,11 @@ MAX_IMAGES_PER_DOC = int(os.getenv("MINERU_MAX_IMAGES_PER_DOC", "200"))
MAX_BYTES_PER_IMAGE = int(os.getenv("MINERU_MAX_BYTES_PER_IMAGE", str(10 * 1024 * 1024)))
MAX_PAGES_HARD = int(os.getenv("MINERU_MAX_PAGES_HARD", "200")) # 1-shot max_pages 안전장치
# self-timeout — 변환/워밍이 vLLM 행으로 _engine_lock 을 영구 점유해 서비스가 wedge 되는 것을 차단.
# (클라이언트 marker_worker 는 300s 로 포기하나 서버측 inflight 는 자동 취소 안 됨 → 서버 자체 상한 필요.)
PARSE_TIMEOUT_S = float(os.getenv("MINERU_PARSE_TIMEOUT_S", "600"))
WARMUP_TIMEOUT_S = float(os.getenv("MINERU_WARMUP_TIMEOUT_S", "1200")) # 최초 모델 다운로드(~2.4GB) 여유
_PRELOAD = os.getenv("MINERU_PRELOAD", "1") != "0"
# ---- 엔진 상태 ---------------------------------------------------------------
@@ -68,6 +73,15 @@ _warmup_error: str | None = None
_engine_lock = asyncio.Lock()
def _is_engine_fatal(exc: BaseException) -> bool:
"""OOM/CUDA 류 = 엔진 상태 오염 가능 → 재워밍 강제 대상(타임아웃은 호출측에서 별도 판정)."""
s = f"{type(exc).__name__} {exc}".lower()
return any(
k in s
for k in ("out of memory", "oom", "cuda", "cublas", "device-side", "illegal memory")
)
async def _run_mineru(pdf_bytes: bytes, lang: str) -> tuple[str, list[dict]]:
"""슬라이스된 PDF bytes → (markdown, 이미지 dict 리스트). **async 엔진 경로.**
@@ -148,7 +162,7 @@ async def _ensure_warmup() -> None:
page.insert_text((72, 72), "MinerU warmup.")
warmup_bytes = doc.tobytes()
doc.close()
await _run_mineru(warmup_bytes, MINERU_LANG)
await asyncio.wait_for(_run_mineru(warmup_bytes, MINERU_LANG), timeout=WARMUP_TIMEOUT_S)
_warmup_done = True
_warmup_error = None
logger.info(f"[mineru-service] warmup done engine_version={_engine_version}")
@@ -274,6 +288,7 @@ def _serialize_images(images: list[dict], src_path: str) -> tuple[list[ConvertIm
@app.post("/convert", response_model=ConvertResponse)
async def convert(req: ConvertRequest):
global _warmup_done
p = _resolve_path(req.file_path)
if p is None or not p.is_file():
raise HTTPException(404, detail={"code": "file_not_found", "message": req.file_path})
@@ -288,10 +303,18 @@ async def convert(req: ConvertRequest):
async with _engine_lock: # 실제 변환 직렬화(단일 GPU)
start = time.monotonic()
try:
md_text, raw_images = await _run_mineru(pdf_bytes, MINERU_LANG)
md_text, raw_images = await asyncio.wait_for(
_run_mineru(pdf_bytes, MINERU_LANG), timeout=PARSE_TIMEOUT_S
)
except HTTPException:
raise
except Exception as exc:
# 타임아웃(엔진 행) 또는 OOM/CUDA 류면 엔진 오염 가능 → 다음 요청이 재워밍하도록 리셋.
# 재워밍까지 실패하면 _ensure_warmup 이 _warmup_error 설정 → /ready 503 → healthcheck
# 재시작으로 escalate(영구 degradation 차단). 일시 OOM 이면 재워밍 성공 후 정상화.
if isinstance(exc, (asyncio.TimeoutError, TimeoutError)) or _is_engine_fatal(exc):
_warmup_done = False
logger.error("[mineru-service] engine reset (timeout/fatal) path=%s: %s", p, exc)
logger.exception(f"[mineru-service] conversion failed path={p}: {exc}")
raise HTTPException(422, detail={"code": "conversion_failed",
"message": f"{type(exc).__name__}: {exc}"}) from exc
-291
View File
@@ -1,291 +0,0 @@
"""PR-MacBook-RAG-Backend-1 정정 4 핵심 테스트.
검증 invariant (synthesize 함수 레벨 /ask wrapper 503 매핑은 search.py
status="backend_unavailable" 분기로 1:1 deterministic):
1. backend="qwen-macbook" + MacBook URL 죽은 포트
synthesize() SynthesisResult(status="backend_unavailable", ...) 반환
Gemma backend generate() ** 1번도 호출되지 않음** (자동 fallback 부재)
2. backend 미지정 (None)
Gemma backend.generate() 호출, Qwen backend.generate() 호출 0
기존 호출자 (Hermes docsrv_ask / voice-memo-bot) 회귀 0
3. backend="qwen-macbook" + MacBook 정상 응답
status="completed" + answer 채워짐, Gemma backend 호출 0
테스트 전략:
- synthesize() 호출하는 backend dispatcher (services.llm.get_backend)
monkeypatch 해서 mock backend 주입.
- Gemma backend generate AsyncMock 호출 횟수를 추적.
- 정정 4 핵심 가드: `gemma_backend.generate.assert_not_called()`
"""
from __future__ import annotations
import asyncio
import os
import sys
from dataclasses import dataclass
from unittest.mock import AsyncMock
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "app"))
# ── 가짜 evidence (synthesize 의 no_evidence 분기 회피용 최소 객체) ─────────
@dataclass
class _FakeEvidence:
n: int = 1
doc_id: int = 100
chunk_id: int | None = 200
title: str | None = "fake doc"
span_text: str = "이것은 짧은 근거 텍스트입니다."
source: str = "llm"
def _make_evidence():
return [_FakeEvidence()]
# ── backend mock ───────────────────────────────────────────────────────────
def _gemma_mock(content: str = "GEMMA_SHOULD_NEVER_BE_CALLED"):
m = AsyncMock()
m.name = "gemma-macmini"
m.generate = AsyncMock(return_value=content)
return m
def _qwen_mock_success(content: str):
m = AsyncMock()
m.name = "qwen-macbook"
m.generate = AsyncMock(return_value=content)
return m
def _qwen_mock_unavailable():
from services.llm import BackendUnavailable
m = AsyncMock()
m.name = "qwen-macbook"
m.generate = AsyncMock(
side_effect=BackendUnavailable("qwen-macbook", "ConnectError")
)
return m
# ── 공통 fixture: synthesis_service 에 mock backend 주입 ───────────────────
@pytest.fixture
def patched_backends(monkeypatch):
"""services.llm.get_backend 를 mock dispatcher 로 치환.
Returns (gemma_mock, qwen_mock, set_qwen_unavailable_fn).
"""
from services.search import synthesis_service
gemma = _gemma_mock()
qwen_holder = {"backend": _qwen_mock_success(
'{"answer":"Qwen ok [1]","confidence":"high","refused":false}'
)}
def _fake_get_backend(name: str | None):
key = (name or "").strip().lower() or "gemma-macmini"
if key == "gemma-macmini":
return gemma
if key == "qwen-macbook":
return qwen_holder["backend"]
raise ValueError(f"unknown backend: {name!r}")
monkeypatch.setattr(synthesis_service, "get_backend", _fake_get_backend)
# synthesis_service 캐시 비움 (qwen vs gemma 캐시 분리 invariant)
synthesis_service._CACHE.clear()
def _swap_qwen_unavailable():
qwen_holder["backend"] = _qwen_mock_unavailable()
return gemma, qwen_holder, _swap_qwen_unavailable
# ── 정정 4 핵심: backend=qwen-macbook + MacBook 비가용 → Gemma 호출 0 ─────
def test_qwen_unavailable_yields_backend_unavailable_status_and_gemma_not_called(
patched_backends,
):
"""**정정 4 의 핵심 invariant**.
backend="qwen-macbook" 명시 + Qwen 호출이 BackendUnavailable 실패
synthesize() status="backend_unavailable" 반환. Gemma backend
generate() ** 번도 호출되지 않음** (silent fallback 금지).
"""
from services.search.synthesis_service import synthesize
gemma, qwen_holder, swap_qwen_unavailable = patched_backends
swap_qwen_unavailable()
qwen = qwen_holder["backend"]
result = asyncio.run(
synthesize(
query="압력용기 최대허용응력은?",
evidence=_make_evidence(),
backend="qwen-macbook",
)
)
# 1. status
assert result.status == "backend_unavailable"
assert result.answer is None
assert result.confidence is None
assert result.refused is False
# 2. flag 에 backend 비가용 사유 기록
assert any(
f.startswith("backend_unavailable:qwen-macbook:") for f in result.hallucination_flags
), f"expected backend_unavailable flag, got {result.hallucination_flags}"
# 3. ★ 핵심 가드 ★ — Gemma backend 자동 fallback 금지
gemma.generate.assert_not_called()
# 4. Qwen 은 1회만 호출 (재시도 없음)
assert qwen.generate.call_count == 1
def test_qwen_unavailable_result_not_cached(patched_backends):
"""비가용 결과는 캐시 X — 다음 호출이 다시 Qwen 시도해야 함."""
from services.search.synthesis_service import synthesize
gemma, qwen_holder, swap_qwen_unavailable = patched_backends
swap_qwen_unavailable()
qwen = qwen_holder["backend"]
asyncio.run(
synthesize(
query="동일 쿼리",
evidence=_make_evidence(),
backend="qwen-macbook",
)
)
asyncio.run(
synthesize(
query="동일 쿼리",
evidence=_make_evidence(),
backend="qwen-macbook",
)
)
# 두 번 모두 실제 호출 (캐시 적중 X) — Gemma 는 여전히 0
assert qwen.generate.call_count == 2
gemma.generate.assert_not_called()
# ── 정정 4: backend 미지정 → 기존 Gemma path (회귀 0) ─────────────────────
def test_default_backend_calls_gemma_not_qwen(patched_backends):
"""backend 미지정 = 기본 Gemma. Qwen 호출 0."""
from services.search.synthesis_service import synthesize
gemma, qwen_holder, _ = patched_backends
qwen = qwen_holder["backend"]
gemma.generate.return_value = (
'{"answer":"Gemma 답변 [1]","confidence":"high","refused":false}'
)
result = asyncio.run(
synthesize(
query="기본 호출",
evidence=_make_evidence(),
backend=None, # 명시 None = default
)
)
assert result.status == "completed"
assert result.answer is not None and "Gemma" in result.answer
# Qwen 은 호출 0
qwen.generate.assert_not_called()
# Gemma 는 1회
assert gemma.generate.call_count == 1
# ── backend="qwen-macbook" + 정상 응답 ──────────────────────────────────────
def test_qwen_success_does_not_call_gemma(patched_backends):
"""Qwen 정상 응답 시 Gemma 는 호출되지 않음 (대칭 invariant)."""
from services.search.synthesis_service import synthesize
gemma, qwen_holder, _ = patched_backends
qwen = qwen_holder["backend"]
result = asyncio.run(
synthesize(
query="정상 호출",
evidence=_make_evidence(),
backend="qwen-macbook",
)
)
assert result.status == "completed"
assert result.answer is not None and "Qwen" in result.answer
# Gemma 는 0회
gemma.generate.assert_not_called()
# Qwen 은 1회
assert qwen.generate.call_count == 1
# ── 캐시 분리 (qwen vs gemma 키 충돌 없음) ─────────────────────────────────
def test_qwen_and_gemma_have_separate_caches(patched_backends):
"""같은 query 라도 backend 다르면 캐시 분리 — Qwen 결과가 Gemma 호출 답으로 둔갑하지 않음."""
from services.search.synthesis_service import synthesize
gemma, qwen_holder, _ = patched_backends
qwen = qwen_holder["backend"]
gemma.generate.return_value = (
'{"answer":"GEMMA_ANSWER [1]","confidence":"high","refused":false}'
)
qwen.generate.return_value = (
'{"answer":"QWEN_ANSWER [1]","confidence":"high","refused":false}'
)
r_qwen_1 = asyncio.run(
synthesize(
query="같은 query",
evidence=_make_evidence(),
backend="qwen-macbook",
)
)
r_gemma_1 = asyncio.run(
synthesize(
query="같은 query",
evidence=_make_evidence(),
backend=None,
)
)
r_qwen_2 = asyncio.run(
synthesize(
query="같은 query",
evidence=_make_evidence(),
backend="qwen-macbook",
)
)
assert "QWEN_ANSWER" in (r_qwen_1.answer or "")
assert "GEMMA_ANSWER" in (r_gemma_1.answer or "")
# 두 번째 Qwen 호출은 캐시 적중 — 결과는 동일하지만 generate 추가 호출 X
assert "QWEN_ANSWER" in (r_qwen_2.answer or "")
assert r_qwen_2.cache_hit is True
# generate 호출 횟수: Qwen 1 (두번째는 캐시), Gemma 1
assert qwen.generate.call_count == 1
assert gemma.generate.call_count == 1
-218
View File
@@ -1,218 +0,0 @@
"""PR-DocSrv-Ask-ToolCalling-ReAct-1: /api/search/ask/react endpoint integration.
검증 항목 (G0-3 trace exposure + 정정 4 invariant):
- backend unavailable HTTP 503 + error_reason=macbook_unavailable
+ `run_search` mock 호출 횟수 == 0 (search 단계 진입 자체 차단)
- 정상 응답 200 + final_answer + sources + debug_trace=null (default)
- debug=true debug_trace 채워짐
- max rounds 도달 iterations=2 + partial=false (final content 정상)
endpoint 함수 (`api.search.ask_react`) 직접 호출하는 lightweight 패턴.
TestClient 없이 FastAPI deps MagicMock 으로 우회. (priority_gate / backend_dispatcher
test 동일 service-layer 패턴.)
"""
from __future__ import annotations
import asyncio
import json
import os
import sys
from unittest.mock import AsyncMock, MagicMock
import pytest
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "app"))
# ── helpers ────────────────────────────────────────────────────────────────
def _msg_with_tool_call(q: str, tc_id: str = "tc-1") -> dict:
return {
"role": "assistant",
"content": None,
"tool_calls": [
{
"id": tc_id,
"type": "function",
"function": {
"name": "search",
"arguments": json.dumps({"q": q}, ensure_ascii=False),
},
}
],
}
def _msg_with_content(text: str) -> dict:
return {"role": "assistant", "content": text, "tool_calls": None}
def _fake_chunk(chunk_id: int, doc_id: int = 100):
m = MagicMock()
m.id = chunk_id
m.chunk_id = chunk_id
m.doc_id = doc_id
m.title = f"doc {doc_id}"
m.score = 0.9
m.snippet = f"snippet {chunk_id}"
m.text = None
return m
def _fake_pr(chunks: list):
pr = MagicMock()
pr.results = chunks
return pr
@pytest.fixture
def patched_backend_and_search(monkeypatch):
"""get_backend + run_search 둘 다 mock. backend 의 generate_with_tools 는
테스트가 side_effect 설정.
Returns: (backend_mock, run_search_mock, set_backend_unavailable_fn).
"""
from services.llm.backends import BackendUnavailable, QwenMacBookBackend
from services.llm import backends as backends_mod
from services.search import react_loop
backend = MagicMock(spec=QwenMacBookBackend)
backend.name = "qwen-macbook"
backend.generate_with_tools = AsyncMock()
def _fake_get_backend(name):
# endpoint 가 qwen-macbook 만 호출하므로 단일 backend 반환
return backend
monkeypatch.setattr(backends_mod, "get_backend", _fake_get_backend)
# search.py 의 ask_react 안에서 `from services.llm.backends import ... get_backend`
# 로 import 하므로 module-level patch 만으로 충분 (지연 import 라 매번 fresh).
run_search_mock = AsyncMock(return_value=_fake_pr([_fake_chunk(1)]))
monkeypatch.setattr(react_loop, "run_search", run_search_mock)
def _make_unavailable():
backend.generate_with_tools.side_effect = BackendUnavailable(
"qwen-macbook", "ConnectError"
)
return backend, run_search_mock, _make_unavailable
def _call_endpoint(payload):
"""ask_react 를 직접 호출. user/session 은 MagicMock 으로 우회."""
from api.search import ask_react
user = MagicMock()
session = MagicMock()
return asyncio.run(ask_react(payload, user=user, session=session))
# ── ★ 정정 4 invariant: backend unavailable → 503 + run_search 호출 0 ──────
def test_qwen_unavailable_returns_503(patched_backend_and_search):
"""backend BackendUnavailable → HTTP 503 + error_reason=macbook_unavailable."""
from api.search import AskReactRequest
backend, run_search_mock, make_unavailable = patched_backend_and_search
make_unavailable()
response = _call_endpoint(AskReactRequest(query="Q"))
# JSONResponse instance
assert response.status_code == 503
body = json.loads(response.body)
assert body["error_reason"] == "macbook_unavailable"
assert body["backend_used"] is None
assert body["backend_requested"] == "qwen-macbook"
# ★ run_search 호출 0 (search 진입 자체 차단)
assert run_search_mock.call_count == 0
# ── 정상 200 + G0-3 default debug_trace=null ──────────────────────────────
def test_successful_response_default_no_debug_trace(patched_backend_and_search):
"""debug 미지정 (default false) → 200 + debug_trace == null."""
from api.search import AskReactRequest, AskReactResponse
backend, run_search_mock, _ = patched_backend_and_search
backend.generate_with_tools.side_effect = [
_msg_with_tool_call("q1"),
_msg_with_content("최종 답입니다"),
]
response = _call_endpoint(AskReactRequest(query="Q"))
# Pydantic instance (FastAPI response_model 적용 전 raw return)
assert isinstance(response, AskReactResponse)
assert response.final_answer == "최종 답입니다"
assert response.iterations == 2
assert response.partial is False
assert response.debug_trace is None # ★ G0-3
assert len(response.sources) == 1
# ── G0-3: debug=true → debug_trace 채워짐 ──────────────────────────────────
def test_debug_true_populates_trace(patched_backend_and_search):
from api.search import AskReactRequest
backend, run_search_mock, _ = patched_backend_and_search
backend.generate_with_tools.side_effect = [
_msg_with_content("바로 답"),
]
response = _call_endpoint(AskReactRequest(query="Q", debug=True))
assert response.debug_trace is not None
assert isinstance(response.debug_trace, list)
assert len(response.debug_trace) >= 1
# ── max rounds → final content 정상 → partial=false ──────────────────────
def test_max_rounds_with_final_content(patched_backend_and_search):
from api.search import AskReactRequest
backend, run_search_mock, _ = patched_backend_and_search
backend.generate_with_tools.side_effect = [
_msg_with_tool_call("q1"),
_msg_with_tool_call("q2", tc_id="tc-2"),
_msg_with_content("정리된 최종 답"),
]
response = _call_endpoint(AskReactRequest(query="Q"))
assert response.iterations == 2
assert response.partial is False
assert response.final_answer == "정리된 최종 답"
# LLM 호출 3회, search 2회 (G0-2 cap)
assert backend.generate_with_tools.call_count == 3
assert run_search_mock.call_count == 2
# ── max rounds + final content 빈 string → partial=true ──────────────────
def test_max_rounds_with_empty_final_partial(patched_backend_and_search):
from api.search import AskReactRequest
backend, run_search_mock, _ = patched_backend_and_search
backend.generate_with_tools.side_effect = [
_msg_with_tool_call("q1"),
_msg_with_tool_call("q2", tc_id="tc-2"),
_msg_with_content(""),
]
response = _call_endpoint(AskReactRequest(query="Q"))
assert response.iterations == 2
assert response.partial is True
assert response.final_answer == ""
-92
View File
@@ -1,92 +0,0 @@
"""Phase 3.5 fix2: /ask 의 X-Source / X-Eval-Case-Id trust boundary.
`_resolve_eval_identity()` 단위 테스트.
- token 없음/틀림 + X-Source=eval source='document_server', eval_case_id=None
- token 일치 + X-Source=eval + X-Eval-Case-Id=case_xxx ('eval', 'case_xxx')
- token 틀림 + X-Eval-Case-Id (X-Source 미지정) eval_case_id=None
- 일반 호출 (X-Source=ui_search, no eval headers) ('ui_search', None)
- env 미설정 (eval_runner_token='') 모든 eval claim 거부
"""
from __future__ import annotations
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
import pytest
@pytest.fixture
def resolve_with_token(monkeypatch):
"""settings.eval_runner_token 을 monkey-patch 해서 _resolve_eval_identity 테스트."""
def _make(token: str):
from core import config as cfg_mod
from api import search as search_mod
# 두 모듈 모두에서 settings 객체 참조하므로 직접 attr 변경
monkeypatch.setattr(search_mod.settings, "eval_runner_token", token)
return search_mod._resolve_eval_identity
return _make
def test_no_token_no_eval_headers_default(resolve_with_token):
"""일반 호출 — eval 헤더 없음, source 기본값."""
resolve = resolve_with_token("secret123")
assert resolve(None, None, None) == ("document_server", None)
def test_normal_source_with_token(resolve_with_token):
"""ui_search 호출 — eval 클레임 아님이라 token 무관."""
resolve = resolve_with_token("secret123")
assert resolve("ui_search", None, None) == ("ui_search", None)
def test_eval_claim_no_token_rejected(resolve_with_token):
"""X-Source=eval 인데 token 없음 → 거부, source='document_server'."""
resolve = resolve_with_token("secret123")
assert resolve("eval", "case_001", None) == ("document_server", None)
def test_eval_claim_wrong_token_rejected(resolve_with_token):
"""token 틀림 → 거부."""
resolve = resolve_with_token("secret123")
assert resolve("eval", "case_001", "wrong_token") == ("document_server", None)
def test_eval_claim_correct_token_accepted(resolve_with_token):
"""token 일치 → 'eval' source + case_id 적재."""
resolve = resolve_with_token("secret123")
assert resolve("eval", "case_001", "secret123") == ("eval", "case_001")
def test_eval_case_id_only_no_source_no_token(resolve_with_token):
"""X-Eval-Case-Id 만 있고 token 없음 → 거부, case_id=None."""
resolve = resolve_with_token("secret123")
assert resolve(None, "case_001", None) == ("document_server", None)
def test_eval_case_id_only_wrong_token(resolve_with_token):
"""X-Eval-Case-Id 만 + token 틀림 → 거부."""
resolve = resolve_with_token("secret123")
assert resolve(None, "case_001", "wrong") == ("document_server", None)
def test_env_unset_rejects_even_correct_format(resolve_with_token):
"""settings.eval_runner_token='' 인 환경 → 모든 eval 클레임 거부."""
resolve = resolve_with_token("")
# token 헤더가 와도 server side 가 비어있으면 거부 (constant-time False)
assert resolve("eval", "case_001", "") == ("document_server", None)
assert resolve("eval", "case_001", "anything") == ("document_server", None)
def test_non_eval_source_forces_case_id_none(resolve_with_token):
"""X-Source=ui_detail + X-Eval-Case-Id (실수로 같이 보냄) → case_id=None.
eval claim 아님 (source != 'eval' 이고 case_id fallback 으로 eval claim 트리거)
이지만 source claim 명시적으로 non-eval 이라 token 검증 case_id None.
"""
resolve = resolve_with_token("secret123")
# case_id 가 있으면 eval claim 으로 처리됨 → token 없으면 거부 → ('ui_detail' 클레임,
# 하지만 거부 분기에서 claimed_source != 'eval' 이라 그대로 'ui_detail' 반환, case_id=None)
assert resolve("ui_detail", "case_001", None) == ("ui_detail", None)
-188
View File
@@ -1,188 +0,0 @@
"""Phase 3.5 B1 (fix1+fix3): unit-aware fabricated_number + bound semantics.
기준:
- 단위 일치 시에만 exact/range/tolerance clear (fix1: Codex unit-mismatch regression 방지)
- /대략/거의/얼추 approx prefix strip; 최대/최소 bound operator 보존 (fix3)
- tolerance 양적 단위(_TOLERANCE_UNITS) + 4자리+ ; 식별자성(_EXACT_ONLY_UNITS) strict
"""
from __future__ import annotations
import os
import sys
# tests/ → 프로젝트 루트 → app/
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
import pytest
from services.search.evidence_service import EvidenceItem
from services.search.grounding_check import check
def _ev(text: str, n: int = 1) -> EvidenceItem:
return EvidenceItem(
n=n,
chunk_id=None,
doc_id=100 + n,
title=f"doc{n}",
section_title=None,
span_text=text,
relevance=0.9,
rerank_score=0.85,
full_snippet=text,
source="llm",
)
def _has_fabricated(result, sub: str | None = None) -> bool:
for f in result.strong_flags:
if not f.startswith("fabricated_number:"):
continue
if sub is None or sub in f:
return True
return False
# ─── 콤마/prefix/range/단위 동의어/citation (기존 17 케이스) ──────
def test_comma_thousand_match():
r = check("질문", "총 1,000명 [1]", [_ev("총원은 1000명입니다.")])
assert not _has_fabricated(r, "1000")
def test_comma_thousand_reverse():
r = check("질문", "총 1000명 [1]", [_ev("총원은 1,000명입니다.")])
assert not _has_fabricated(r)
def test_approx_prefix_in_answer():
r = check("질문", "약 100명이 참여 [1]", [_ev("100명이 참여")])
assert not _has_fabricated(r)
def test_approx_prefix_in_evidence():
r = check("질문", "100명이 참여 [1]", [_ev("약 100명이 참여")])
assert not _has_fabricated(r)
def test_range_inner_value_passes():
r = check("질문", "약 150명 [1]", [_ev("100~200명 사이 추정")])
assert not _has_fabricated(r, "150")
def test_range_outer_value_flagged():
r = check("질문", "300명 [1]", [_ev("100~200명 사이 추정")])
assert _has_fabricated(r, "300")
def test_unit_synonym_in_to_myeong():
r = check("질문", "총 50인이 모임 [1]", [_ev("총 50명이 모임.")])
assert not _has_fabricated(r)
def test_unit_synonym_percent_to_pct():
r = check("질문", "비율 30퍼센트 [1]", [_ev("비율 30%이다.")])
assert not _has_fabricated(r)
def test_citation_marker_both_sides():
"""bug fix: evidence 측 [n] 미제거로 디지트 합쳐지던 케이스."""
r = check("질문", "가격 [1] 5,000원", [_ev("[2] 5,000원이 정확")])
assert not _has_fabricated(r)
def test_genuine_fabricated_number():
r = check("질문", "결과 777명 [1]", [_ev("500명, 300명을 받음.")])
assert _has_fabricated(r, "777")
def test_amount_4digit_tolerance_passes():
r = check("질문", "9,990원 [1]", [_ev("10,000원입니다.")])
assert not _has_fabricated(r)
def test_year_no_tolerance_flagged():
r = check("질문", "2024년 [1]", [_ev("2026년에 발효")])
assert _has_fabricated(r, "2024")
def test_article_no_tolerance_flagged():
r = check("질문", "제5조에 명시 [1]", [_ev("제6조에 따라")])
assert _has_fabricated(r)
def test_count_no_tolerance_flagged():
r = check("질문", "총 3회 위반 [1]", [_ev("총 4회 적발")])
assert _has_fabricated(r)
def test_three_digit_strict():
r = check("질문", "총 15개 [1]", [_ev("총 10개")])
assert _has_fabricated(r, "15")
def test_single_digit_ignored():
"""1자리 + 양적 단위 → 무시 (오탐 방지)."""
r = check("질문", "총 3개 발생 [1]", [_ev("관련 통계 별도")])
assert not _has_fabricated(r, "3개")
def test_range_korean_butter_separator():
r = check("질문", "약 150명 [1]", [_ev("100부터 200명까지 대상.")])
assert not _has_fabricated(r, "150")
# ─── fix1: unit-mismatch (Codex no-ship) ──────────────────
def test_won_vs_myeong_range_flagged():
"""answer '150원' vs evidence '100~200명' → 단위 불일치, flag 유지."""
r = check("질문", "약 150원이 든다 [1]", [_ev("대상은 100~200명")])
assert _has_fabricated(r, "150")
def test_won_vs_myeong_tolerance_flagged():
"""answer '9,990원' vs evidence '10,000명' → tolerance pool 단위 다름, flag 유지."""
r = check("질문", "9,990원 [1]", [_ev("10,000명입니다.")])
assert _has_fabricated(r, "9990")
def test_pct_vs_myeong_range_flagged():
"""answer '15%' vs evidence '10~20명' → 단위 불일치, flag 유지."""
r = check("질문", "약 15% [1]", [_ev("대상 10~20명")])
assert _has_fabricated(r, "15")
# ─── fix3: 최대/최소 bound semantics ───────────────────────
def test_choedae_exact_boundary_flagged():
"""evidence '최대 100명' + answer '100명' → 경계값 자체는 cleared 아님."""
r = check("질문", "100명이다 [1]", [_ev("최대 100명까지 가능")])
assert _has_fabricated(r, "100")
def test_choeso_exact_boundary_flagged():
"""evidence '최소 100명' + answer '100명' → 경계값 자체는 cleared 아님."""
r = check("질문", "100명이다 [1]", [_ev("최소 100명 이상 필요")])
assert _has_fabricated(r, "100")
def test_choedae_inner_value_passes():
"""evidence '최대 100명' + answer '50명' → bound 안, cleared."""
r = check("질문", "50명이다 [1]", [_ev("최대 100명까지 가능")])
assert not _has_fabricated(r, "50")
def test_choeso_above_value_passes():
"""evidence '최소 100명' + answer '150명' → bound 안, cleared."""
r = check("질문", "150명이다 [1]", [_ev("최소 100명 이상 필요")])
assert not _has_fabricated(r, "150")
def test_choedae_outer_value_flagged():
"""evidence '최대 100명' + answer '200명' → bound 밖, flag."""
r = check("질문", "200명이다 [1]", [_ev("최대 100명까지 가능")])
assert _has_fabricated(r, "200")
-123
View File
@@ -1,123 +0,0 @@
"""Phase 3.5 fix3: re-gate Tier 0 — synthesis 자체 실패 처리.
`_detect_synthesis_failure()` 단위 테스트.
기존 버그:
synthesis LLM self-refuse (`sr.refused=True, status="completed"`) 또는
timeout/parse_failed/llm_error grounding/verifier flag 0 re-gate else clean
분기로 빠져 `completeness="full"` 초기값이 남아 `full + refused=True` 모순.
baseline v1-400char 에서 24/223 (10.8%) 해당.
Tier 0 판정:
- LLM self-refuse (completed + refused) "synthesis_self_refuse"
- mechanical fail (timeout/parse_failed/llm_error) "synthesis_failed({status})"
- answer 공백 "synthesis_failed({status})"
- 유효 답변 None (기존 tier 1~7 경로)
"""
from __future__ import annotations
import os
import sys
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
from api.search import _detect_synthesis_failure
from services.search.synthesis_service import SynthesisResult
def _sr(
status: str = "completed",
answer: str | None = "ok",
refused: bool = False,
refuse_reason: str | None = None,
) -> SynthesisResult:
return SynthesisResult(
status=status, # type: ignore[arg-type]
answer=answer,
used_citations=[],
confidence="low",
refused=refused,
refuse_reason=refuse_reason,
elapsed_ms=100.0,
cache_hit=False,
)
# ─── self-refuse 케이스 ──────────────────────────────────
def test_llm_self_refuse_completed():
"""LLM 이 JSON 에 refused=true 반환 → synthesis_self_refuse."""
sr = _sr(status="completed", answer=None, refused=True, refuse_reason="범위 밖")
assert _detect_synthesis_failure(sr) == "synthesis_self_refuse"
def test_llm_self_refuse_with_answer_still_refused():
"""refused=True 면 answer 있어도 Tier 0 처리 (일관성)."""
sr = _sr(status="completed", answer="왜 답변함", refused=True)
assert _detect_synthesis_failure(sr) == "synthesis_self_refuse"
# ─── mechanical failure 케이스 ──────────────────────────
def test_timeout():
sr = _sr(status="timeout", answer=None, refused=False)
assert _detect_synthesis_failure(sr) == "synthesis_failed(timeout)"
def test_parse_failed():
sr = _sr(status="parse_failed", answer=None, refused=False)
assert _detect_synthesis_failure(sr) == "synthesis_failed(parse_failed)"
def test_llm_error():
sr = _sr(status="llm_error", answer=None, refused=False)
assert _detect_synthesis_failure(sr) == "synthesis_failed(llm_error)"
def test_refused_with_mechanical_fail_propagates_status():
"""refused=True + status!=completed → synthesis_failed({status}) 형식."""
sr = _sr(status="timeout", answer=None, refused=True)
assert _detect_synthesis_failure(sr) == "synthesis_failed(timeout)"
# ─── empty answer 케이스 ───────────────────────────────
def test_empty_answer_completed():
"""status=completed 인데 answer 공백 → synthesis_failed(completed)."""
sr = _sr(status="completed", answer="", refused=False)
assert _detect_synthesis_failure(sr) == "synthesis_failed(completed)"
def test_whitespace_only_answer():
"""공백/탭/개행만 있어도 empty 로 간주."""
sr = _sr(status="completed", answer=" \n\t ", refused=False)
assert _detect_synthesis_failure(sr) == "synthesis_failed(completed)"
def test_none_answer_completed():
"""answer=None + status=completed → failed."""
sr = _sr(status="completed", answer=None, refused=False)
assert _detect_synthesis_failure(sr) == "synthesis_failed(completed)"
# ─── 유효 답변 케이스 (None 반환) ──────────────────────
def test_valid_answer_returns_none():
"""status=completed + answer 있고 refused=False → Tier 0 통과 (None)."""
sr = _sr(status="completed", answer="교육 시간은 매년 6시간 이상이다 [1].", refused=False)
assert _detect_synthesis_failure(sr) is None
def test_skipped_status_with_answer_passes():
"""status=skipped 는 Tier 0 대상 아님 — 초기 refusal gate 에서 이미 early-return 처리됨.
(skipped 여기까지 도달하지 않는다는 전제. 만약 도달하더라도 refused True .)
"""
sr = _sr(status="skipped", answer="abc", refused=False)
# 이 경우 Tier 0 미발동 (answer 있고 refused 아님) — 정상 경로로 나감.
assert _detect_synthesis_failure(sr) is None
-58
View File
@@ -1,58 +0,0 @@
"""Phase 3.5 B2: verifier _SEVERITY_MAP env flag 테스트.
VERIFIER_NUMERIC_PROMOTE 환경변수에 따른 _SEVERITY_MAP 변화 검증.
모듈은 import time env 평가하므로 reload 필요.
"""
from __future__ import annotations
import importlib
import os
import sys
# tests/ → 프로젝트 루트 → app/
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "app"))
import pytest
def _reload_verifier(monkeypatch, value: str | None):
"""env 설정 후 verifier_service 를 reload 하여 _SEVERITY_MAP 재평가."""
if value is None:
monkeypatch.delenv("VERIFIER_NUMERIC_PROMOTE", raising=False)
else:
monkeypatch.setenv("VERIFIER_NUMERIC_PROMOTE", value)
from services.search import verifier_service
importlib.reload(verifier_service)
return verifier_service
def test_severity_map_off_default(monkeypatch):
"""env 미설정 → numeric_conflict critical 은 medium (기존 동작)."""
vs = _reload_verifier(monkeypatch, None)
assert vs._SEVERITY_MAP["numeric_conflict"]["critical"] == "medium"
assert vs._SEVERITY_MAP["numeric_conflict"]["minor"] == "medium"
assert vs._NUMERIC_PROMOTE is False
def test_severity_map_on_critical_promoted(monkeypatch):
"""VERIFIER_NUMERIC_PROMOTE=1 → critical 만 strong, minor 는 medium 유지."""
vs = _reload_verifier(monkeypatch, "1")
assert vs._SEVERITY_MAP["numeric_conflict"]["critical"] == "strong"
assert vs._SEVERITY_MAP["numeric_conflict"]["minor"] == "medium"
assert vs._NUMERIC_PROMOTE is True
def test_severity_map_off_explicit_zero(monkeypatch):
"""VERIFIER_NUMERIC_PROMOTE=0 명시 → off (default 와 동일)."""
vs = _reload_verifier(monkeypatch, "0")
assert vs._SEVERITY_MAP["numeric_conflict"]["critical"] == "medium"
assert vs._NUMERIC_PROMOTE is False
def test_direct_negation_invariant(monkeypatch):
"""direct_negation 은 env 무관 항상 strong (불변 — 안전장치)."""
for value in [None, "0", "1"]:
vs = _reload_verifier(monkeypatch, value)
assert vs._SEVERITY_MAP["direct_negation"]["critical"] == "strong"
assert vs._SEVERITY_MAP["direct_negation"]["minor"] == "strong"