diff --git a/app/api/documents.py b/app/api/documents.py index 397eba5..d2b847b 100644 --- a/app/api/documents.py +++ b/app/api/documents.py @@ -543,6 +543,8 @@ async def list_documents( category: str | None = Query(None, description="doc_category enum — 지정 시 기본 news/memo 제외 해제"), has_suggestion: bool | None = Query(None, description="true: ai_suggestion IS NOT NULL"), proposed_category: str | None = Query(None, description="ai_suggestion.proposed_category 필터"), + material_type: str | None = Query(None, description="안전 자료실 C-1: 자료유형. 지정 시 기본 exclude 해제"), + jurisdiction: str | None = Query(None, description="안전 자료실 C-1: 관할 (KR/US/...)"), ): """문서 목록 조회 (페이지네이션 + 필터). @@ -556,6 +558,10 @@ async def list_documents( if category: # 명시적 카테고리 필터 — 기본 exclude 해제 query = query.where(Document.category == category) + elif material_type: + # 안전 자료실 C-1: material_type 지정 = 기본 exclude(news·law_monitor·note) 해제. + # 안전 코퍼스 본체(KOSHA 사례·CSB·법령 등)가 전부 note/crawl 채널이라 exclude 면 빈 화면. + query = query.where(Document.material_type == material_type) else: # 기본 목록: 뉴스/메모/법령 제외 (문서함 용도) query = query.where( @@ -564,6 +570,9 @@ async def list_documents( Document.file_type != "note", ) + if jurisdiction: + query = query.where(Document.jurisdiction == jurisdiction) + if has_suggestion is True: query = query.where(Document.ai_suggestion.isnot(None)) elif has_suggestion is False: diff --git a/app/api/search.py b/app/api/search.py index 431f0c3..65a415d 100644 --- a/app/api/search.py +++ b/app/api/search.py @@ -12,6 +12,7 @@ import asyncio import hmac import time +from datetime import date from typing import Annotated, Literal from fastapi import APIRouter, BackgroundTasks, Depends, Header, Query @@ -31,6 +32,7 @@ from services.search.fusion_service import DEFAULT_FUSION from services.search.grounding_check import check as grounding_check from services.search.refusal_gate import RefusalDecision, decide as refusal_decide from services.search import query_rewriter +from services.search.retrieval_service import AxisFilter from services.search.search_pipeline import PipelineResult, run_search from services.search.synthesis_service import SynthesisResult, synthesize from services.search.verifier_service import VerifierResult, verify @@ -70,6 +72,11 @@ class SearchResult(BaseModel): # PR-RAG-Time-1: freshness decay 디버그 메타. apply_freshness_decay 가 채움. # 비적용 row 도 채워짐(freshness_policy=None). base_score 는 항상 보존. freshness_debug: dict | None = None + # 안전 자료실 C-1: 분류 축 메타 (3 leg SELECT 에서 채움 — additive, ranking 무관). + # D-1 UI 결과 카드 유형별 렌더 + 해외 법령(B-5) 가동 시 국가 무표지 혼재 차단의 선행 조건. + material_type: str | None = None + jurisdiction: str | None = None + published_date: date | None = None # ─── Phase 0.4: 디버그 응답 스키마 ───────────────────────── @@ -205,9 +212,22 @@ async def search( "분리용. production 검색에는 사용 금지 (latency 큼)." ), ), + material_type: str | None = Query( + None, description="안전 자료실 C-1: 자료유형 필터 CSV (law,paper,incident,...). material_type = ANY"), + jurisdiction: str | None = Query( + None, description="안전 자료실 C-1: 관할 필터 (KR/US/EU/JP/GB/INT)"), + year_from: int | None = Query(None, ge=1900, le=2100, description="published_date 연도 하한 (NULL=created_at fallback)"), + year_to: int | None = Query(None, ge=1900, le=2100, description="published_date 연도 상한"), ): """문서 검색 — FTS + ILIKE + 벡터 결합 (Phase 3.1 이후 run_search wrapper)""" try: + axis = AxisFilter( + material_types=[m.strip() for m in material_type.split(",") if m.strip()] + if material_type else None, + jurisdiction=jurisdiction, + year_from=year_from, + year_to=year_to, + ) pr = await run_search( session, q, @@ -223,6 +243,7 @@ async def search( rewrite_backend=rewrite_backend, corpus_variant=corpus_variant, exact_knn=exact_knn, + axis=axis, ) except ValueError as e: # _resolve_backend / _resolve_reranker / _resolve_rewrite_backend / _resolve_corpus_variant unknown slug → HTTP 400 diff --git a/app/services/search/retrieval_service.py b/app/services/search/retrieval_service.py index 56a338b..448e25c 100644 --- a/app/services/search/retrieval_service.py +++ b/app/services/search/retrieval_service.py @@ -24,6 +24,7 @@ import asyncio import hashlib import re import time +from dataclasses import dataclass from typing import TYPE_CHECKING, Any from sqlalchemy import text @@ -98,6 +99,46 @@ QWEN3_QUERY_INSTRUCT = ( "\nQuery: " ) +# ─── 안전 자료실 C-1: 분류 축 명시 필터 (3 leg 동등, byte 불변) ─────────────── +# 미지정(active=False) 시 모든 SQL 절이 빈 문자열 → 기존 SQL byte 불변(run_eval 회귀 0). +# year 는 published_date NULL fallback created_at (freshness 와 동일 COALESCE 사상). +@dataclass +class AxisFilter: + material_types: list[str] | None = None # CSV → list, material_type = ANY + jurisdiction: str | None = None + year_from: int | None = None + year_to: int | None = None + + def active(self) -> bool: + return bool(self.material_types or self.jurisdiction + or self.year_from is not None or self.year_to is not None) + + +def _axis_sql(alias: str, af: "AxisFilter | None", params: dict) -> str: + """alias 기준 axis 필터 SQL — 미지정 시 '' (byte 불변). 반환 형태 ' AND ...'. + + alias='' 이면 컬럼 직접 참조(단일 테이블 FROM documents 경로). 파라미터는 af_ prefix + 로 호출측 기존 bind 와 충돌 방지. + """ + if af is None or not af.active(): + return "" + p = (alias + ".") if alias else "" + cl: list[str] = [] + if af.material_types: + cl.append(f"{p}material_type = ANY(:af_mt)") + params["af_mt"] = af.material_types + if af.jurisdiction: + cl.append(f"{p}jurisdiction = :af_jur") + params["af_jur"] = af.jurisdiction + if af.year_from is not None: + cl.append(f"COALESCE({p}published_date, {p}created_at::date) >= make_date(:af_yf, 1, 1)") + params["af_yf"] = af.year_from + if af.year_to is not None: + cl.append(f"COALESCE({p}published_date, {p}created_at::date) <= make_date(:af_yt, 12, 31)") + params["af_yt"] = af.year_to + return " AND " + " AND ".join(cl) + + # 2단계 gate (R2-B1) — SQL string interpolation 직전 final allowlist. _VALID_DOCS_TABLE = re.compile(r"^(documents|documents_cand_[a-z0-9_]+)$") # corpus_chunks = document_chunks WHERE in_corpus=true 뷰 (Hier-Decomp-1 c2 choke point). @@ -235,7 +276,7 @@ def query_embed_cache_stats() -> dict[str, int]: async def search_text( - session: AsyncSession, query: str, limit: int + session: AsyncSession, query: str, limit: int, *, axis: "AxisFilter | None" = None ) -> list["SearchResult"]: """FTS + trigram 필드별 가중치 검색 (Phase 1.2-B UNION 분해). @@ -266,8 +307,11 @@ async def search_text( # SQLAlchemy async session 내 두 execute는 같은 connection 사용 await session.execute(text("SELECT set_limit(0.15)")) + _params: dict[str, Any] = {"q": query, "limit": limit} + _where = _axis_sql("d", axis, _params) # 미지정 시 '' (byte 불변) + result = await session.execute( - text(""" + text(f""" WITH candidates AS ( -- title trigram (idx_documents_title_trgm) SELECT id FROM documents @@ -320,13 +364,15 @@ async def search_text( WHEN similarity(coalesce(d.ai_summary, ''), :q) >= 0.3 THEN 'summary' WHEN similarity(coalesce(d.extracted_text, ''), :q) >= 0.3 THEN 'content' ELSE 'fts' - END AS match_reason + END AS match_reason, + d.material_type, d.jurisdiction, d.published_date FROM documents d JOIN candidates c ON d.id = c.id + {("WHERE" + _where[4:]) if _where else ""} ORDER BY score DESC LIMIT :limit """), - {"q": query, "limit": limit}, + _params, ) return [SearchResult(**row._mapping) for row in result] @@ -341,6 +387,7 @@ async def search_vector( snapshot_chunk_id_max: int | None = None, corpus_variant: str | None = None, exact_knn: bool = False, + axis: "AxisFilter | None" = None, ) -> list["SearchResult"]: """Hybrid 벡터 검색 — doc + chunks 동시 retrieval (Phase 1.2-G). @@ -415,6 +462,7 @@ async def search_vector( docs_table=docs_table, snapshot_doc_id_max=snapshot_doc_id_max, exact_knn=exact_knn, + axis=axis, ) async def _chunks_call() -> list["SearchResult"]: @@ -424,6 +472,7 @@ async def search_vector( chunks_table=chunks_table, snapshot_chunk_id_max=snapshot_chunk_id_max, exact_knn=exact_knn, + axis=axis, ) doc_results, chunk_results = await asyncio.gather(_docs_call(), _chunks_call()) @@ -439,6 +488,7 @@ async def _search_vector_docs( docs_table: str = "documents", snapshot_doc_id_max: int | None = None, exact_knn: bool = False, + axis: "AxisFilter | None" = None, ) -> list["SearchResult"]: """documents (또는 documents_cand_).embedding 직접 검색. @@ -463,28 +513,32 @@ async def _search_vector_docs( if snapshot_doc_id_max is not None: snapshot_clause = " AND id <= :snapshot_doc_id_max" params["snapshot_doc_id_max"] = snapshot_doc_id_max + axis_clause = _axis_sql("", axis, params) # alias 없음 (단일 FROM documents) sql = f""" SELECT id, title, ai_domain, ai_summary, file_format, (1 - (embedding <=> cast(:embedding AS vector))) AS score, left(extracted_text, 1200) AS snippet, 'vector_doc' AS match_reason, - NULL::bigint AS chunk_id, NULL::integer AS chunk_index, NULL::text AS section_title + NULL::bigint AS chunk_id, NULL::integer AS chunk_index, NULL::text AS section_title, + material_type, jurisdiction, published_date FROM documents - WHERE embedding IS NOT NULL AND deleted_at IS NULL{snapshot_clause} + WHERE embedding IS NOT NULL AND deleted_at IS NULL{snapshot_clause}{axis_clause} ORDER BY embedding <=> cast(:embedding AS vector) LIMIT :limit """ else: # candidate: docs_table 은 (doc_id, embed_input, embed_input_hash, embedding) 만 보유 → JOIN documents + axis_clause = _axis_sql("d", axis, params) sql = f""" SELECT d.id, d.title, d.ai_domain, d.ai_summary, d.file_format, (1 - (c.embedding <=> cast(:embedding AS vector))) AS score, left(d.extracted_text, 1200) AS snippet, 'vector_doc' AS match_reason, - NULL::bigint AS chunk_id, NULL::integer AS chunk_index, NULL::text AS section_title + NULL::bigint AS chunk_id, NULL::integer AS chunk_index, NULL::text AS section_title, + d.material_type, d.jurisdiction, d.published_date FROM {docs_table} c JOIN documents d ON d.id = c.doc_id - WHERE d.deleted_at IS NULL + WHERE d.deleted_at IS NULL{axis_clause} ORDER BY c.embedding <=> cast(:embedding AS vector) LIMIT :limit """ @@ -500,6 +554,7 @@ async def _search_vector_chunks( chunks_table: str = "document_chunks", snapshot_chunk_id_max: int | None = None, exact_knn: bool = False, + axis: "AxisFilter | None" = None, ) -> list["SearchResult"]: """document_chunks (또는 document_chunks_cand_).embedding window partition. @@ -525,12 +580,21 @@ async def _search_vector_chunks( snapshot_clause = " AND c.id <= :snapshot_chunk_id_max" params["snapshot_chunk_id_max"] = snapshot_chunk_id_max + # C-1: axis 필터는 inner topk 에 JOIN (R6 결정 — outer post-filter 면 ANN top-:inner_k + # 후보를 뽑은 뒤 거르므로 좁은 필터(GB 법령 등)에서 후보 붕괴). 미지정 시 JOIN 없음 = byte 불변. + if axis and axis.active(): + chunk_join = " JOIN documents df ON df.id = c.doc_id" + chunk_axis = _axis_sql("df", axis, params) + else: + chunk_join = "" + chunk_axis = "" + sql = f""" WITH topk AS ( SELECT c.id AS chunk_id, c.doc_id, c.chunk_index, c.section_title, c.text, c.embedding <=> cast(:embedding AS vector) AS dist - FROM {chunks_table} c - WHERE c.embedding IS NOT NULL{snapshot_clause} + FROM {chunks_table} c{chunk_join} + WHERE c.embedding IS NOT NULL{snapshot_clause}{chunk_axis} ORDER BY c.embedding <=> cast(:embedding AS vector) LIMIT :inner_k ), @@ -543,7 +607,9 @@ async def _search_vector_chunks( d.ai_summary AS ai_summary, d.file_format AS file_format, (1 - r.dist) AS score, left(r.text, 1200) AS snippet, 'vector_chunk' AS match_reason, - r.chunk_id AS chunk_id, r.chunk_index AS chunk_index, r.section_title AS section_title + r.chunk_id AS chunk_id, r.chunk_index AS chunk_index, r.section_title AS section_title, + d.material_type AS material_type, d.jurisdiction AS jurisdiction, + d.published_date AS published_date FROM ranked r JOIN documents d ON d.id = r.doc_id WHERE r.rn <= 2 AND d.deleted_at IS NULL diff --git a/app/services/search/search_pipeline.py b/app/services/search/search_pipeline.py index 0d7d9b9..59b1acb 100644 --- a/app/services/search/search_pipeline.py +++ b/app/services/search/search_pipeline.py @@ -47,6 +47,7 @@ from .rerank_service import ( rerank_chunks, ) from .retrieval_service import ( + AxisFilter, compress_chunks_to_docs, search_text, search_vector, @@ -148,6 +149,7 @@ async def run_search( rewrite_backend: str | None = None, corpus_variant: str | None = None, exact_knn: bool = False, + axis: AxisFilter | None = None, ) -> PipelineResult: """검색 파이프라인 실행. @@ -275,6 +277,7 @@ async def run_search( snapshot_chunk_id_max=snapshot_chunk_id_max, corpus_variant=corpus_variant, exact_knn=exact_knn, + axis=axis, ) timing["vector_ms"] = (time.perf_counter() - t0) * 1000 if not raw_chunks: @@ -284,7 +287,7 @@ async def run_search( results = vector_results else: t0 = time.perf_counter() - text_results = await search_text(session, q, limit) + text_results = await search_text(session, q, limit, axis=axis) timing["text_ms"] = (time.perf_counter() - t0) * 1000 if mode == "hybrid": @@ -306,6 +309,7 @@ async def run_search( snapshot_chunk_id_max=snapshot_chunk_id_max, corpus_variant=corpus_variant, exact_knn=exact_knn, + axis=axis, ) timing["vector_ms"] = (time.perf_counter() - t1) * 1000