feat(safety): C-1 검색 명시 필터 — material_type/jurisdiction/year 3-leg 동등 + documents exclude 해제

plan safety-library-1 C-1 (검색 핵심 경로 — byte 불변 invariant):
- AxisFilter + _axis_sql 헬퍼: 미지정 시 모든 SQL 절 빈 문자열(run_eval 회귀 0 보장)
- 3 leg 동등 적용: search_text(JOIN 후 WHERE) / _search_vector_docs(prod+cand) /
  _search_vector_chunks(★inner topk JOIN — R6 결정: outer post-filter면 ANN top-k 후
  좁은 필터 후보 붕괴. 미지정 시 JOIN 없음=byte 불변)
- SearchResult + material_type/jurisdiction/published_date (3 leg SELECT additive)
- year = COALESCE(published_date, created_at) (freshness 동일 사상)
- GET /documents/: material_type 지정 시 기본 exclude(news·law_monitor·note) 해제
- _axis_sql 단위 테스트 PASS (미지정=빈문자열+param0 / active 4절 / alias 분기)

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
This commit is contained in:
hyungi
2026-06-13 12:59:18 +09:00
parent 9a7e231dcc
commit 79deae0644
4 changed files with 112 additions and 12 deletions
+9
View File
@@ -543,6 +543,8 @@ async def list_documents(
category: str | None = Query(None, description="doc_category enum — 지정 시 기본 news/memo 제외 해제"),
has_suggestion: bool | None = Query(None, description="true: ai_suggestion IS NOT NULL"),
proposed_category: str | None = Query(None, description="ai_suggestion.proposed_category 필터"),
material_type: str | None = Query(None, description="안전 자료실 C-1: 자료유형. 지정 시 기본 exclude 해제"),
jurisdiction: str | None = Query(None, description="안전 자료실 C-1: 관할 (KR/US/...)"),
):
"""문서 목록 조회 (페이지네이션 + 필터).
@@ -556,6 +558,10 @@ async def list_documents(
if category:
# 명시적 카테고리 필터 — 기본 exclude 해제
query = query.where(Document.category == category)
elif material_type:
# 안전 자료실 C-1: material_type 지정 = 기본 exclude(news·law_monitor·note) 해제.
# 안전 코퍼스 본체(KOSHA 사례·CSB·법령 등)가 전부 note/crawl 채널이라 exclude 면 빈 화면.
query = query.where(Document.material_type == material_type)
else:
# 기본 목록: 뉴스/메모/법령 제외 (문서함 용도)
query = query.where(
@@ -564,6 +570,9 @@ async def list_documents(
Document.file_type != "note",
)
if jurisdiction:
query = query.where(Document.jurisdiction == jurisdiction)
if has_suggestion is True:
query = query.where(Document.ai_suggestion.isnot(None))
elif has_suggestion is False:
+21
View File
@@ -12,6 +12,7 @@
import asyncio
import hmac
import time
from datetime import date
from typing import Annotated, Literal
from fastapi import APIRouter, BackgroundTasks, Depends, Header, Query
@@ -31,6 +32,7 @@ from services.search.fusion_service import DEFAULT_FUSION
from services.search.grounding_check import check as grounding_check
from services.search.refusal_gate import RefusalDecision, decide as refusal_decide
from services.search import query_rewriter
from services.search.retrieval_service import AxisFilter
from services.search.search_pipeline import PipelineResult, run_search
from services.search.synthesis_service import SynthesisResult, synthesize
from services.search.verifier_service import VerifierResult, verify
@@ -70,6 +72,11 @@ class SearchResult(BaseModel):
# PR-RAG-Time-1: freshness decay 디버그 메타. apply_freshness_decay 가 채움.
# 비적용 row 도 채워짐(freshness_policy=None). base_score 는 항상 보존.
freshness_debug: dict | None = None
# 안전 자료실 C-1: 분류 축 메타 (3 leg SELECT 에서 채움 — additive, ranking 무관).
# D-1 UI 결과 카드 유형별 렌더 + 해외 법령(B-5) 가동 시 국가 무표지 혼재 차단의 선행 조건.
material_type: str | None = None
jurisdiction: str | None = None
published_date: date | None = None
# ─── Phase 0.4: 디버그 응답 스키마 ─────────────────────────
@@ -205,9 +212,22 @@ async def search(
"분리용. production 검색에는 사용 금지 (latency 큼)."
),
),
material_type: str | None = Query(
None, description="안전 자료실 C-1: 자료유형 필터 CSV (law,paper,incident,...). material_type = ANY"),
jurisdiction: str | None = Query(
None, description="안전 자료실 C-1: 관할 필터 (KR/US/EU/JP/GB/INT)"),
year_from: int | None = Query(None, ge=1900, le=2100, description="published_date 연도 하한 (NULL=created_at fallback)"),
year_to: int | None = Query(None, ge=1900, le=2100, description="published_date 연도 상한"),
):
"""문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 3.1 이후 run_search wrapper)"""
try:
axis = AxisFilter(
material_types=[m.strip() for m in material_type.split(",") if m.strip()]
if material_type else None,
jurisdiction=jurisdiction,
year_from=year_from,
year_to=year_to,
)
pr = await run_search(
session,
q,
@@ -223,6 +243,7 @@ async def search(
rewrite_backend=rewrite_backend,
corpus_variant=corpus_variant,
exact_knn=exact_knn,
axis=axis,
)
except ValueError as e:
# _resolve_backend / _resolve_reranker / _resolve_rewrite_backend / _resolve_corpus_variant unknown slug → HTTP 400
+77 -11
View File
@@ -24,6 +24,7 @@ import asyncio
import hashlib
import re
import time
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any
from sqlalchemy import text
@@ -98,6 +99,46 @@ QWEN3_QUERY_INSTRUCT = (
"\nQuery: "
)
# ─── 안전 자료실 C-1: 분류 축 명시 필터 (3 leg 동등, byte 불변) ───────────────
# 미지정(active=False) 시 모든 SQL 절이 빈 문자열 → 기존 SQL byte 불변(run_eval 회귀 0).
# year 는 published_date NULL fallback created_at (freshness 와 동일 COALESCE 사상).
@dataclass
class AxisFilter:
material_types: list[str] | None = None # CSV → list, material_type = ANY
jurisdiction: str | None = None
year_from: int | None = None
year_to: int | None = None
def active(self) -> bool:
return bool(self.material_types or self.jurisdiction
or self.year_from is not None or self.year_to is not None)
def _axis_sql(alias: str, af: "AxisFilter | None", params: dict) -> str:
"""alias 기준 axis 필터 SQL — 미지정 시 '' (byte 불변). 반환 형태 ' AND ...'.
alias='' 이면 컬럼 직접 참조(단일 테이블 FROM documents 경로). 파라미터는 af_ prefix
호출측 기존 bind 충돌 방지.
"""
if af is None or not af.active():
return ""
p = (alias + ".") if alias else ""
cl: list[str] = []
if af.material_types:
cl.append(f"{p}material_type = ANY(:af_mt)")
params["af_mt"] = af.material_types
if af.jurisdiction:
cl.append(f"{p}jurisdiction = :af_jur")
params["af_jur"] = af.jurisdiction
if af.year_from is not None:
cl.append(f"COALESCE({p}published_date, {p}created_at::date) >= make_date(:af_yf, 1, 1)")
params["af_yf"] = af.year_from
if af.year_to is not None:
cl.append(f"COALESCE({p}published_date, {p}created_at::date) <= make_date(:af_yt, 12, 31)")
params["af_yt"] = af.year_to
return " AND " + " AND ".join(cl)
# 2단계 gate (R2-B1) — SQL string interpolation 직전 final allowlist.
_VALID_DOCS_TABLE = re.compile(r"^(documents|documents_cand_[a-z0-9_]+)$")
# corpus_chunks = document_chunks WHERE in_corpus=true 뷰 (Hier-Decomp-1 c2 choke point).
@@ -235,7 +276,7 @@ def query_embed_cache_stats() -> dict[str, int]:
async def search_text(
session: AsyncSession, query: str, limit: int
session: AsyncSession, query: str, limit: int, *, axis: "AxisFilter | None" = None
) -> list["SearchResult"]:
"""FTS + trigram 필드별 가중치 검색 (Phase 1.2-B UNION 분해).
@@ -266,8 +307,11 @@ async def search_text(
# SQLAlchemy async session 내 두 execute는 같은 connection 사용
await session.execute(text("SELECT set_limit(0.15)"))
_params: dict[str, Any] = {"q": query, "limit": limit}
_where = _axis_sql("d", axis, _params) # 미지정 시 '' (byte 불변)
result = await session.execute(
text("""
text(f"""
WITH candidates AS (
-- title trigram (idx_documents_title_trgm)
SELECT id FROM documents
@@ -320,13 +364,15 @@ async def search_text(
WHEN similarity(coalesce(d.ai_summary, ''), :q) >= 0.3 THEN 'summary'
WHEN similarity(coalesce(d.extracted_text, ''), :q) >= 0.3 THEN 'content'
ELSE 'fts'
END AS match_reason
END AS match_reason,
d.material_type, d.jurisdiction, d.published_date
FROM documents d
JOIN candidates c ON d.id = c.id
{("WHERE" + _where[4:]) if _where else ""}
ORDER BY score DESC
LIMIT :limit
"""),
{"q": query, "limit": limit},
_params,
)
return [SearchResult(**row._mapping) for row in result]
@@ -341,6 +387,7 @@ async def search_vector(
snapshot_chunk_id_max: int | None = None,
corpus_variant: str | None = None,
exact_knn: bool = False,
axis: "AxisFilter | None" = None,
) -> list["SearchResult"]:
"""Hybrid 벡터 검색 — doc + chunks 동시 retrieval (Phase 1.2-G).
@@ -415,6 +462,7 @@ async def search_vector(
docs_table=docs_table,
snapshot_doc_id_max=snapshot_doc_id_max,
exact_knn=exact_knn,
axis=axis,
)
async def _chunks_call() -> list["SearchResult"]:
@@ -424,6 +472,7 @@ async def search_vector(
chunks_table=chunks_table,
snapshot_chunk_id_max=snapshot_chunk_id_max,
exact_knn=exact_knn,
axis=axis,
)
doc_results, chunk_results = await asyncio.gather(_docs_call(), _chunks_call())
@@ -439,6 +488,7 @@ async def _search_vector_docs(
docs_table: str = "documents",
snapshot_doc_id_max: int | None = None,
exact_knn: bool = False,
axis: "AxisFilter | None" = None,
) -> list["SearchResult"]:
"""documents (또는 documents_cand_<slug>).embedding 직접 검색.
@@ -463,28 +513,32 @@ async def _search_vector_docs(
if snapshot_doc_id_max is not None:
snapshot_clause = " AND id <= :snapshot_doc_id_max"
params["snapshot_doc_id_max"] = snapshot_doc_id_max
axis_clause = _axis_sql("", axis, params) # alias 없음 (단일 FROM documents)
sql = f"""
SELECT id, title, ai_domain, ai_summary, file_format,
(1 - (embedding <=> cast(:embedding AS vector))) AS score,
left(extracted_text, 1200) AS snippet,
'vector_doc' AS match_reason,
NULL::bigint AS chunk_id, NULL::integer AS chunk_index, NULL::text AS section_title
NULL::bigint AS chunk_id, NULL::integer AS chunk_index, NULL::text AS section_title,
material_type, jurisdiction, published_date
FROM documents
WHERE embedding IS NOT NULL AND deleted_at IS NULL{snapshot_clause}
WHERE embedding IS NOT NULL AND deleted_at IS NULL{snapshot_clause}{axis_clause}
ORDER BY embedding <=> cast(:embedding AS vector)
LIMIT :limit
"""
else:
# candidate: docs_table 은 (doc_id, embed_input, embed_input_hash, embedding) 만 보유 → JOIN documents
axis_clause = _axis_sql("d", axis, params)
sql = f"""
SELECT d.id, d.title, d.ai_domain, d.ai_summary, d.file_format,
(1 - (c.embedding <=> cast(:embedding AS vector))) AS score,
left(d.extracted_text, 1200) AS snippet,
'vector_doc' AS match_reason,
NULL::bigint AS chunk_id, NULL::integer AS chunk_index, NULL::text AS section_title
NULL::bigint AS chunk_id, NULL::integer AS chunk_index, NULL::text AS section_title,
d.material_type, d.jurisdiction, d.published_date
FROM {docs_table} c
JOIN documents d ON d.id = c.doc_id
WHERE d.deleted_at IS NULL
WHERE d.deleted_at IS NULL{axis_clause}
ORDER BY c.embedding <=> cast(:embedding AS vector)
LIMIT :limit
"""
@@ -500,6 +554,7 @@ async def _search_vector_chunks(
chunks_table: str = "document_chunks",
snapshot_chunk_id_max: int | None = None,
exact_knn: bool = False,
axis: "AxisFilter | None" = None,
) -> list["SearchResult"]:
"""document_chunks (또는 document_chunks_cand_<slug>).embedding window partition.
@@ -525,12 +580,21 @@ async def _search_vector_chunks(
snapshot_clause = " AND c.id <= :snapshot_chunk_id_max"
params["snapshot_chunk_id_max"] = snapshot_chunk_id_max
# C-1: axis 필터는 inner topk 에 JOIN (R6 결정 — outer post-filter 면 ANN top-:inner_k
# 후보를 뽑은 뒤 거르므로 좁은 필터(GB 법령 등)에서 후보 붕괴). 미지정 시 JOIN 없음 = byte 불변.
if axis and axis.active():
chunk_join = " JOIN documents df ON df.id = c.doc_id"
chunk_axis = _axis_sql("df", axis, params)
else:
chunk_join = ""
chunk_axis = ""
sql = f"""
WITH topk AS (
SELECT c.id AS chunk_id, c.doc_id, c.chunk_index, c.section_title, c.text,
c.embedding <=> cast(:embedding AS vector) AS dist
FROM {chunks_table} c
WHERE c.embedding IS NOT NULL{snapshot_clause}
FROM {chunks_table} c{chunk_join}
WHERE c.embedding IS NOT NULL{snapshot_clause}{chunk_axis}
ORDER BY c.embedding <=> cast(:embedding AS vector)
LIMIT :inner_k
),
@@ -543,7 +607,9 @@ async def _search_vector_chunks(
d.ai_summary AS ai_summary, d.file_format AS file_format,
(1 - r.dist) AS score, left(r.text, 1200) AS snippet,
'vector_chunk' AS match_reason,
r.chunk_id AS chunk_id, r.chunk_index AS chunk_index, r.section_title AS section_title
r.chunk_id AS chunk_id, r.chunk_index AS chunk_index, r.section_title AS section_title,
d.material_type AS material_type, d.jurisdiction AS jurisdiction,
d.published_date AS published_date
FROM ranked r
JOIN documents d ON d.id = r.doc_id
WHERE r.rn <= 2 AND d.deleted_at IS NULL
+5 -1
View File
@@ -47,6 +47,7 @@ from .rerank_service import (
rerank_chunks,
)
from .retrieval_service import (
AxisFilter,
compress_chunks_to_docs,
search_text,
search_vector,
@@ -148,6 +149,7 @@ async def run_search(
rewrite_backend: str | None = None,
corpus_variant: str | None = None,
exact_knn: bool = False,
axis: AxisFilter | None = None,
) -> PipelineResult:
"""검색 파이프라인 실행.
@@ -275,6 +277,7 @@ async def run_search(
snapshot_chunk_id_max=snapshot_chunk_id_max,
corpus_variant=corpus_variant,
exact_knn=exact_knn,
axis=axis,
)
timing["vector_ms"] = (time.perf_counter() - t0) * 1000
if not raw_chunks:
@@ -284,7 +287,7 @@ async def run_search(
results = vector_results
else:
t0 = time.perf_counter()
text_results = await search_text(session, q, limit)
text_results = await search_text(session, q, limit, axis=axis)
timing["text_ms"] = (time.perf_counter() - t0) * 1000
if mode == "hybrid":
@@ -306,6 +309,7 @@ async def run_search(
snapshot_chunk_id_max=snapshot_chunk_id_max,
corpus_variant=corpus_variant,
exact_knn=exact_knn,
axis=axis,
)
timing["vector_ms"] = (time.perf_counter() - t1) * 1000