feat(chunk): Phase 0.1 chunk 인덱싱 — ORM/worker/migration 정리

GPU 서버에 untracked로만 존재하던 Phase 0.1 코드를 정식 commit:
- app/models/chunk.py — DocumentChunk ORM (country/source/domain 메타 포함)
- app/workers/chunk_worker.py — 6가지 chunking 전략 (legal/news/markdown/email/long_pdf/default)
- migrations/014_document_chunks.sql — pgvector + FTS + trigram 인덱스
- app/models/queue.py — ProcessingQueue enum에 'chunk' stage 추가
- app/workers/queue_consumer.py — chunk stage 등록, classify→[embed,chunk] 자동 연결

Phase 1 reranker 통합 작업의 전제 조건. document_chunks 테이블 기반 retrieval에 사용.
This commit is contained in:
Hyungi Ahn
2026-04-07 13:26:37 +09:00
parent a2941487fe
commit 378fbc7845
5 changed files with 442 additions and 3 deletions

46
app/models/chunk.py Normal file
View File

@@ -0,0 +1,46 @@
"""document_chunks 테이블 ORM — chunk 단위 검색 (Phase 0.1)"""
from datetime import datetime
from pgvector.sqlalchemy import Vector
from sqlalchemy import BigInteger, DateTime, ForeignKey, Integer, String, Text, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from core.database import Base
class DocumentChunk(Base):
__tablename__ = "document_chunks"
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
doc_id: Mapped[int] = mapped_column(
BigInteger, ForeignKey("documents.id", ondelete="CASCADE"), nullable=False
)
chunk_index: Mapped[int] = mapped_column(Integer, nullable=False)
# chunking 전략 메타
chunk_type: Mapped[str] = mapped_column(String(30), nullable=False)
section_title: Mapped[str | None] = mapped_column(Text)
heading_path: Mapped[str | None] = mapped_column(Text)
page: Mapped[int | None] = mapped_column(Integer)
# 다국어/domain 메타
language: Mapped[str | None] = mapped_column(String(10))
country: Mapped[str | None] = mapped_column(String(10))
source: Mapped[str | None] = mapped_column(String(100))
domain_category: Mapped[str] = mapped_column(String(20), nullable=False)
# 본문 + 임베딩
text: Mapped[str] = mapped_column(Text, nullable=False)
embedding = mapped_column(Vector(1024), nullable=True)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), default=datetime.now, onupdate=datetime.now
)
__table_args__ = (
UniqueConstraint("doc_id", "chunk_index", name="uq_chunks_doc_index"),
)

View File

@@ -14,7 +14,7 @@ class ProcessingQueue(Base):
id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
document_id: Mapped[int] = mapped_column(BigInteger, ForeignKey("documents.id"), nullable=False)
stage: Mapped[str] = mapped_column(
Enum("extract", "classify", "summarize", "embed", "preview", name="process_stage"), nullable=False
Enum("extract", "classify", "summarize", "embed", "chunk", "preview", name="process_stage"), nullable=False
)
status: Mapped[str] = mapped_column(
Enum("pending", "processing", "completed", "failed", name="process_status"),

337
app/workers/chunk_worker.py Normal file
View File

@@ -0,0 +1,337 @@
"""Chunk 워커 — 문서 유형별 chunking + bge-m3 임베딩 (Phase 0.1)
승부처는 chunk 품질. 문서 유형별로 다른 전략:
- 법령: 조/항 단위 (구조적, overlap 불필요)
- 뉴스: 문단 단위 (overlap ~15%)
- 일반 문서: 슬라이딩 윈도우 (overlap 15-25%)
- 긴 PDF: 슬라이딩 윈도우 (overlap 20-30%)
- 마크다운: heading section 단위 (overlap 없음)
- 이메일: 본문 전체 (대부분 짧음)
"""
import re
from datetime import datetime, timezone
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from ai.client import AIClient
from core.utils import setup_logger
from models.chunk import DocumentChunk
from models.document import Document
from models.news_source import NewsSource
logger = setup_logger("chunk_worker")
# ─── 상수 ───
# 문자 기준(bge-m3는 8192 토큰 여유 있음, 한국어 1토큰≈2.5자)
DEFAULT_WINDOW_CHARS = 1500 # ~600 tokens (ko 기준)
DEFAULT_OVERLAP_CHARS = 300 # ~20% overlap
LONG_PDF_WINDOW_CHARS = 2000 # ~800 tokens
LONG_PDF_OVERLAP_CHARS = 500 # ~25% overlap
NEWS_OVERLAP_CHARS = 150 # ~15%
MIN_CHUNK_CHARS = 50 # 너무 짧은 chunk는 버림
# ─── 언어 감지 (간단한 휴리스틱) ───
def _detect_language(text: str) -> str:
"""문자 비율 기반 언어 감지"""
if not text:
return "unknown"
sample = text[:2000]
ko = sum(1 for c in sample if "\uac00" <= c <= "\ud7a3")
ja = sum(1 for c in sample if "\u3040" <= c <= "\u30ff")
zh = sum(1 for c in sample if "\u4e00" <= c <= "\u9fff")
en = sum(1 for c in sample if c.isascii() and c.isalpha())
total = ko + ja + zh + en
if total == 0:
return "unknown"
# CJK 우선 (한중일은 한자 overlap이 있으므로 순서 중요)
if ja / total > 0.1:
return "ja"
if ko / total > 0.2:
return "ko"
if zh / total > 0.2:
return "zh"
if en / total > 0.5:
return "en"
return "ko" # 기본값
# ─── 문서 유형 판별 ───
def _classify_chunk_strategy(doc: Document) -> str:
"""문서 유형에 따라 chunking 전략 선택"""
if doc.source_channel == "news":
return "news"
if doc.ai_domain and "Legislation" in doc.ai_domain:
return "legal"
if doc.file_format == "md" or doc.file_format == "markdown":
return "markdown"
if doc.file_format in ("eml", "msg"):
return "email"
if doc.file_format == "pdf":
# 본문 길이로 긴 PDF 구분
if doc.extracted_text and len(doc.extracted_text) > 20000:
return "long_pdf"
return "pdf"
return "default"
# ─── Chunking 전략 ───
def _chunk_legal(text: str) -> list[dict]:
"""법령: 제N조 단위로 분할 (상위 조문 컨텍스트 보존)"""
# "제 1 조", "제1조", "제 1 조(제목)" 등 매칭
pattern = re.compile(r"(제\s*\d+\s*조(?:의\s*\d+)?(?:\([^)]*\))?)")
parts = pattern.split(text)
chunks = []
# parts[0] = 조 이전 서문, parts[1], parts[2] = (마커, 본문) pairs
if parts[0].strip() and len(parts[0]) >= MIN_CHUNK_CHARS:
chunks.append({
"text": parts[0].strip()[:DEFAULT_WINDOW_CHARS],
"chunk_type": "section",
"section_title": "서문",
})
i = 1
while i < len(parts):
marker = parts[i]
body = parts[i + 1] if i + 1 < len(parts) else ""
full = f"{marker} {body}".strip()
if len(full) >= MIN_CHUNK_CHARS:
# 너무 길면 슬라이싱 (조문이 매우 긴 경우)
if len(full) <= DEFAULT_WINDOW_CHARS:
chunks.append({
"text": full,
"chunk_type": "legal_article",
"section_title": marker.strip(),
})
else:
# 긴 조문은 윈도우로 추가 분할
for offset in range(0, len(full), DEFAULT_WINDOW_CHARS - 200):
sub = full[offset : offset + DEFAULT_WINDOW_CHARS]
if len(sub) >= MIN_CHUNK_CHARS:
chunks.append({
"text": sub,
"chunk_type": "legal_article",
"section_title": marker.strip(),
})
i += 2
# 법령이지만 조문 패턴이 없으면 기본 슬라이딩 윈도우로 fallback
if not chunks:
return _chunk_sliding(text, DEFAULT_WINDOW_CHARS, DEFAULT_OVERLAP_CHARS, "section")
return chunks
def _chunk_news(text: str) -> list[dict]:
"""뉴스: 문단 단위 (빈 줄 기준), 너무 짧으면 병합"""
paragraphs = [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()]
chunks = []
buffer = ""
for p in paragraphs:
if len(buffer) + len(p) < DEFAULT_WINDOW_CHARS:
buffer = f"{buffer}\n\n{p}".strip() if buffer else p
else:
if len(buffer) >= MIN_CHUNK_CHARS:
chunks.append({"text": buffer, "chunk_type": "paragraph", "section_title": None})
buffer = p
if buffer and len(buffer) >= MIN_CHUNK_CHARS:
chunks.append({"text": buffer, "chunk_type": "paragraph", "section_title": None})
if not chunks:
return _chunk_sliding(text, DEFAULT_WINDOW_CHARS, NEWS_OVERLAP_CHARS, "paragraph")
return chunks
def _chunk_markdown(text: str) -> list[dict]:
"""마크다운: heading section 단위"""
# '#', '##', '###' 기준 분할
pattern = re.compile(r"^(#{1,6}\s+.+)$", re.MULTILINE)
matches = list(pattern.finditer(text))
chunks = []
if not matches:
return _chunk_sliding(text, DEFAULT_WINDOW_CHARS, DEFAULT_OVERLAP_CHARS, "section")
# 첫 heading 이전 서문
if matches[0].start() > 0:
preface = text[: matches[0].start()].strip()
if len(preface) >= MIN_CHUNK_CHARS:
chunks.append({"text": preface, "chunk_type": "section", "section_title": "서문"})
for i, m in enumerate(matches):
start = m.start()
end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
section_text = text[start:end].strip()
heading = m.group(1).strip("# ").strip()
if len(section_text) < MIN_CHUNK_CHARS:
continue
# 긴 섹션은 추가 분할
if len(section_text) <= DEFAULT_WINDOW_CHARS:
chunks.append({
"text": section_text,
"chunk_type": "section",
"section_title": heading,
})
else:
sub_chunks = _chunk_sliding(
section_text, DEFAULT_WINDOW_CHARS, DEFAULT_OVERLAP_CHARS, "section"
)
for sc in sub_chunks:
sc["section_title"] = heading
chunks.append(sc)
return chunks
def _chunk_email(text: str) -> list[dict]:
"""이메일: 본문 전체 (짧음)"""
text = text.strip()
if len(text) < MIN_CHUNK_CHARS:
return []
# 너무 길면 슬라이딩으로 분할
if len(text) > DEFAULT_WINDOW_CHARS * 2:
return _chunk_sliding(text, DEFAULT_WINDOW_CHARS, DEFAULT_OVERLAP_CHARS, "email_body")
return [{"text": text, "chunk_type": "email_body", "section_title": None}]
def _chunk_sliding(
text: str, window: int, overlap: int, chunk_type: str
) -> list[dict]:
"""슬라이딩 윈도우 분할 (문장 경계 가능한 한 보존)"""
chunks = []
stride = window - overlap
if stride <= 0:
stride = window
i = 0
while i < len(text):
end = min(i + window, len(text))
# 문장 경계에 맞춰 조정 (끝에 가까운 마침표/줄바꿈)
if end < len(text):
for punct in [". ", ".\n", "", "\n\n", "\n"]:
cut = text.rfind(punct, max(i + window - 300, i), end)
if cut > i:
end = cut + len(punct)
break
chunk_text = text[i:end].strip()
if len(chunk_text) >= MIN_CHUNK_CHARS:
chunks.append({
"text": chunk_text,
"chunk_type": chunk_type,
"section_title": None,
})
if end >= len(text):
break
i = max(end - overlap, i + 1)
return chunks
def _chunk_document(doc: Document) -> list[dict]:
"""문서 유형별 chunking 디스패처"""
text = doc.extracted_text or ""
if not text.strip():
return []
strategy = _classify_chunk_strategy(doc)
if strategy == "legal":
return _chunk_legal(text)
if strategy == "news":
return _chunk_news(text)
if strategy == "markdown":
return _chunk_markdown(text)
if strategy == "email":
return _chunk_email(text)
if strategy == "long_pdf":
return _chunk_sliding(text, LONG_PDF_WINDOW_CHARS, LONG_PDF_OVERLAP_CHARS, "window")
# default (pdf, general)
return _chunk_sliding(text, DEFAULT_WINDOW_CHARS, DEFAULT_OVERLAP_CHARS, "window")
# ─── 뉴스 소스 메타데이터 조회 ───
async def _lookup_news_source(
session: AsyncSession, doc: Document
) -> tuple[str | None, str | None, str | None]:
"""뉴스 문서의 country/source/language를 news_sources에서 조회
매칭 방식: doc.ai_sub_group = source.name.split(' ')[0]
"""
if doc.source_channel != "news":
return None, None, None
source_name = doc.ai_sub_group or ""
if not source_name:
return None, None, None
# news_sources에서 이름이 일치하는 레코드 찾기 (prefix match)
result = await session.execute(select(NewsSource))
sources = result.scalars().all()
for src in sources:
if src.name.split(" ")[0] == source_name:
return src.country, src.name, src.language
return None, source_name, None
# ─── 메인 워커 함수 ───
async def process(document_id: int, session: AsyncSession) -> None:
"""문서를 chunks로 분할하고 bge-m3로 임베딩"""
doc = await session.get(Document, document_id)
if not doc:
raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")
if not doc.extracted_text:
logger.warning(f"[chunk] document_id={document_id}: extracted_text 없음, 스킵")
return
# chunking
chunk_dicts = _chunk_document(doc)
if not chunk_dicts:
logger.warning(f"[chunk] document_id={document_id}: chunks 생성 실패")
return
# 메타데이터 준비
language = _detect_language(doc.extracted_text)
country, source, src_lang = await _lookup_news_source(session, doc)
if src_lang:
language = src_lang
domain_category = "news" if doc.source_channel == "news" else "document"
# 기존 chunks 삭제 (재처리)
await session.execute(delete(DocumentChunk).where(DocumentChunk.doc_id == document_id))
# 임베딩 + 저장
client = AIClient()
try:
for idx, c in enumerate(chunk_dicts):
try:
embedding = await client.embed(c["text"])
except Exception as e:
logger.warning(f"[chunk] document_id={document_id} chunk {idx} 임베딩 실패: {e}")
embedding = None
chunk = DocumentChunk(
doc_id=document_id,
chunk_index=idx,
chunk_type=c["chunk_type"],
section_title=c.get("section_title"),
heading_path=None, # 추후 마크다운 tree에서 채움
page=None, # 추후 PDF 파서에서 채움
language=language,
country=country,
source=source,
domain_category=domain_category,
text=c["text"],
embedding=embedding,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
session.add(chunk)
logger.info(
f"[chunk] document_id={document_id}: {len(chunk_dicts)}개 chunks 생성 "
f"(strategy={_classify_chunk_strategy(doc)}, lang={language}, "
f"domain={domain_category}, country={country})"
)
finally:
await client.close()

View File

@@ -11,7 +11,7 @@ from models.queue import ProcessingQueue
logger = setup_logger("queue_consumer")
# stage별 배치 크기
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "preview": 2}
BATCH_SIZE = {"extract": 5, "classify": 3, "summarize": 3, "embed": 1, "chunk": 1, "preview": 2}
STALE_THRESHOLD_MINUTES = 10
@@ -34,7 +34,7 @@ async def reset_stale_items():
async def enqueue_next_stage(document_id: int, current_stage: str):
"""현재 stage 완료 후 다음 stage를 pending으로 등록"""
next_stages = {"extract": ["classify", "preview"], "classify": ["embed"]}
next_stages = {"extract": ["classify", "preview"], "classify": ["embed", "chunk"]}
stages = next_stages.get(current_stage, [])
if not stages:
return
@@ -62,6 +62,7 @@ async def enqueue_next_stage(document_id: int, current_stage: str):
async def consume_queue():
"""큐에서 pending 항목을 가져와 stage별 워커 실행"""
from workers.classify_worker import process as classify_process
from workers.chunk_worker import process as chunk_process
from workers.embed_worker import process as embed_process
from workers.extract_worker import process as extract_process
from workers.preview_worker import process as preview_process
@@ -72,6 +73,7 @@ async def consume_queue():
"classify": classify_process,
"summarize": summarize_process,
"embed": embed_process,
"chunk": chunk_process,
"preview": preview_process,
}

View File

@@ -0,0 +1,54 @@
-- 문서 chunk 단위 검색 테이블 (chunk 중심 retrieval)
-- Phase 0.1: 문서를 의미 단위(조항/문단/슬라이딩 윈도우)로 쪼개어 별도 테이블에 저장
-- 검색 품질 승부처는 chunk 품질 + reranker 입력 품질
CREATE TABLE document_chunks (
id BIGSERIAL PRIMARY KEY,
doc_id BIGINT REFERENCES documents(id) ON DELETE CASCADE NOT NULL,
chunk_index INTEGER NOT NULL, -- 문서 내 순서
-- chunking 전략 메타
chunk_type VARCHAR(30) NOT NULL, -- legal_article | paragraph | section | window | email_body
section_title TEXT, -- "제80조", "3.1 Methods" 등
heading_path TEXT, -- "Chapter 3 > Section 2"
page INTEGER, -- PDF 페이지 번호
-- 다국어 및 domain-aware retrieval 메타
language VARCHAR(10), -- ko | en | ja | zh | fr | de
country VARCHAR(10), -- KR | US | JP | CN | FR | DE (뉴스 필수)
source VARCHAR(100), -- 경향신문 | NYT | ... (뉴스 필수)
domain_category VARCHAR(20) NOT NULL, -- document | news (파이프라인 분기 키)
-- 본문 + 임베딩
text TEXT NOT NULL,
embedding vector(1024), -- bge-m3
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW(),
UNIQUE (doc_id, chunk_index)
);
-- 벡터 유사도 인덱스 (IVFFlat, cosine)
-- lists는 데이터 증가에 따라 조정 필요. 초기 500-5000 chunks 기준.
CREATE INDEX idx_chunks_embedding ON document_chunks
USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
-- FTS 인덱스 (simple tokenizer — 한국어는 trigram 보조)
CREATE INDEX idx_chunks_fts ON document_chunks
USING GIN (to_tsvector('simple', text));
-- Trigram 인덱스 (한국어/부분매치 보조)
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE INDEX idx_chunks_trgm ON document_chunks
USING GIN (text gin_trgm_ops);
-- 필터용 B-tree 인덱스
CREATE INDEX idx_chunks_doc_id ON document_chunks (doc_id);
CREATE INDEX idx_chunks_domain_category ON document_chunks (domain_category);
CREATE INDEX idx_chunks_country ON document_chunks (country) WHERE country IS NOT NULL;
CREATE INDEX idx_chunks_source ON document_chunks (source) WHERE source IS NOT NULL;
CREATE INDEX idx_chunks_language ON document_chunks (language) WHERE language IS NOT NULL;
-- chunk stage 추가 (processing_queue에서 chunk 단계로 처리)
ALTER TYPE process_stage ADD VALUE IF NOT EXISTS 'chunk';