"""document_chunks table ORM — chunk-level retrieval (Phase 0.1).

Maps migrations/014_document_chunks.sql.  One row per semantic chunk of a
document (legal article / paragraph / markdown section / sliding window /
email body), carrying language/country/source metadata for domain-aware
retrieval plus the bge-m3 embedding.  Populated by the 'chunk' processing
stage (app/workers/chunk_worker.py).
"""

from datetime import datetime, timezone

from pgvector.sqlalchemy import Vector
from sqlalchemy import BigInteger, DateTime, ForeignKey, Integer, String, Text, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column

from core.database import Base


class DocumentChunk(Base):
    __tablename__ = "document_chunks"

    id: Mapped[int] = mapped_column(BigInteger, primary_key=True)
    # Chunks are deleted together with their parent document (ON DELETE CASCADE).
    doc_id: Mapped[int] = mapped_column(
        BigInteger, ForeignKey("documents.id", ondelete="CASCADE"), nullable=False
    )
    # Order of the chunk within its document (unique per doc, see __table_args__).
    chunk_index: Mapped[int] = mapped_column(Integer, nullable=False)

    # chunking-strategy metadata
    # chunk_type: legal_article | paragraph | section | window | email_body
    chunk_type: Mapped[str] = mapped_column(String(30), nullable=False)
    section_title: Mapped[str | None] = mapped_column(Text)
    # heading_path: filled later from the markdown heading tree (e.g. "Ch 3 > Sec 2")
    heading_path: Mapped[str | None] = mapped_column(Text)
    # page: filled later by the PDF parser
    page: Mapped[int | None] = mapped_column(Integer)

    # multilingual / domain metadata
    language: Mapped[str | None] = mapped_column(String(10))
    country: Mapped[str | None] = mapped_column(String(10))
    source: Mapped[str | None] = mapped_column(String(100))
    # domain_category: "document" | "news" — pipeline branch key
    domain_category: Mapped[str] = mapped_column(String(20), nullable=False)

    # body + embedding (bge-m3, 1024 dims; nullable when embedding failed)
    text: Mapped[str] = mapped_column(Text, nullable=False)
    embedding = mapped_column(Vector(1024), nullable=True)

    # Bug fix: defaults were naive local `datetime.now`, while the column is
    # TIMESTAMPTZ and the chunk worker writes explicit UTC — use aware UTC
    # here too so every writer agrees on the timezone.
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), default=lambda: datetime.now(timezone.utc)
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    __table_args__ = (
        UniqueConstraint("doc_id", "chunk_index", name="uq_chunks_doc_index"),
    )
chunking + bge-m3 임베딩 (Phase 0.1) + +승부처는 chunk 품질. 문서 유형별로 다른 전략: +- 법령: 조/항 단위 (구조적, overlap 불필요) +- 뉴스: 문단 단위 (overlap ~15%) +- 일반 문서: 슬라이딩 윈도우 (overlap 15-25%) +- 긴 PDF: 슬라이딩 윈도우 (overlap 20-30%) +- 마크다운: heading section 단위 (overlap 없음) +- 이메일: 본문 전체 (대부분 짧음) +""" + +import re +from datetime import datetime, timezone + +from sqlalchemy import delete, select +from sqlalchemy.ext.asyncio import AsyncSession + +from ai.client import AIClient +from core.utils import setup_logger +from models.chunk import DocumentChunk +from models.document import Document +from models.news_source import NewsSource + +logger = setup_logger("chunk_worker") + +# ─── 상수 ─── +# 문자 기준(bge-m3는 8192 토큰 여유 있음, 한국어 1토큰≈2.5자) +DEFAULT_WINDOW_CHARS = 1500 # ~600 tokens (ko 기준) +DEFAULT_OVERLAP_CHARS = 300 # ~20% overlap +LONG_PDF_WINDOW_CHARS = 2000 # ~800 tokens +LONG_PDF_OVERLAP_CHARS = 500 # ~25% overlap +NEWS_OVERLAP_CHARS = 150 # ~15% +MIN_CHUNK_CHARS = 50 # 너무 짧은 chunk는 버림 + + +# ─── 언어 감지 (간단한 휴리스틱) ─── +def _detect_language(text: str) -> str: + """문자 비율 기반 언어 감지""" + if not text: + return "unknown" + sample = text[:2000] + ko = sum(1 for c in sample if "\uac00" <= c <= "\ud7a3") + ja = sum(1 for c in sample if "\u3040" <= c <= "\u30ff") + zh = sum(1 for c in sample if "\u4e00" <= c <= "\u9fff") + en = sum(1 for c in sample if c.isascii() and c.isalpha()) + total = ko + ja + zh + en + if total == 0: + return "unknown" + # CJK 우선 (한중일은 한자 overlap이 있으므로 순서 중요) + if ja / total > 0.1: + return "ja" + if ko / total > 0.2: + return "ko" + if zh / total > 0.2: + return "zh" + if en / total > 0.5: + return "en" + return "ko" # 기본값 + + +# ─── 문서 유형 판별 ─── +def _classify_chunk_strategy(doc: Document) -> str: + """문서 유형에 따라 chunking 전략 선택""" + if doc.source_channel == "news": + return "news" + if doc.ai_domain and "Legislation" in doc.ai_domain: + return "legal" + if doc.file_format == "md" or doc.file_format == "markdown": + return "markdown" + if doc.file_format in ("eml", "msg"): + return 
"email" + if doc.file_format == "pdf": + # 본문 길이로 긴 PDF 구분 + if doc.extracted_text and len(doc.extracted_text) > 20000: + return "long_pdf" + return "pdf" + return "default" + + +# ─── Chunking 전략 ─── +def _chunk_legal(text: str) -> list[dict]: + """법령: 제N조 단위로 분할 (상위 조문 컨텍스트 보존)""" + # "제 1 조", "제1조", "제 1 조(제목)" 등 매칭 + pattern = re.compile(r"(제\s*\d+\s*조(?:의\s*\d+)?(?:\([^)]*\))?)") + parts = pattern.split(text) + + chunks = [] + # parts[0] = 조 이전 서문, parts[1], parts[2] = (마커, 본문) pairs + if parts[0].strip() and len(parts[0]) >= MIN_CHUNK_CHARS: + chunks.append({ + "text": parts[0].strip()[:DEFAULT_WINDOW_CHARS], + "chunk_type": "section", + "section_title": "서문", + }) + + i = 1 + while i < len(parts): + marker = parts[i] + body = parts[i + 1] if i + 1 < len(parts) else "" + full = f"{marker} {body}".strip() + if len(full) >= MIN_CHUNK_CHARS: + # 너무 길면 슬라이싱 (조문이 매우 긴 경우) + if len(full) <= DEFAULT_WINDOW_CHARS: + chunks.append({ + "text": full, + "chunk_type": "legal_article", + "section_title": marker.strip(), + }) + else: + # 긴 조문은 윈도우로 추가 분할 + for offset in range(0, len(full), DEFAULT_WINDOW_CHARS - 200): + sub = full[offset : offset + DEFAULT_WINDOW_CHARS] + if len(sub) >= MIN_CHUNK_CHARS: + chunks.append({ + "text": sub, + "chunk_type": "legal_article", + "section_title": marker.strip(), + }) + i += 2 + + # 법령이지만 조문 패턴이 없으면 기본 슬라이딩 윈도우로 fallback + if not chunks: + return _chunk_sliding(text, DEFAULT_WINDOW_CHARS, DEFAULT_OVERLAP_CHARS, "section") + return chunks + + +def _chunk_news(text: str) -> list[dict]: + """뉴스: 문단 단위 (빈 줄 기준), 너무 짧으면 병합""" + paragraphs = [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()] + chunks = [] + buffer = "" + for p in paragraphs: + if len(buffer) + len(p) < DEFAULT_WINDOW_CHARS: + buffer = f"{buffer}\n\n{p}".strip() if buffer else p + else: + if len(buffer) >= MIN_CHUNK_CHARS: + chunks.append({"text": buffer, "chunk_type": "paragraph", "section_title": None}) + buffer = p + if buffer and len(buffer) >= MIN_CHUNK_CHARS: 
+ chunks.append({"text": buffer, "chunk_type": "paragraph", "section_title": None}) + if not chunks: + return _chunk_sliding(text, DEFAULT_WINDOW_CHARS, NEWS_OVERLAP_CHARS, "paragraph") + return chunks + + +def _chunk_markdown(text: str) -> list[dict]: + """마크다운: heading section 단위""" + # '#', '##', '###' 기준 분할 + pattern = re.compile(r"^(#{1,6}\s+.+)$", re.MULTILINE) + matches = list(pattern.finditer(text)) + + chunks = [] + if not matches: + return _chunk_sliding(text, DEFAULT_WINDOW_CHARS, DEFAULT_OVERLAP_CHARS, "section") + + # 첫 heading 이전 서문 + if matches[0].start() > 0: + preface = text[: matches[0].start()].strip() + if len(preface) >= MIN_CHUNK_CHARS: + chunks.append({"text": preface, "chunk_type": "section", "section_title": "서문"}) + + for i, m in enumerate(matches): + start = m.start() + end = matches[i + 1].start() if i + 1 < len(matches) else len(text) + section_text = text[start:end].strip() + heading = m.group(1).strip("# ").strip() + if len(section_text) < MIN_CHUNK_CHARS: + continue + # 긴 섹션은 추가 분할 + if len(section_text) <= DEFAULT_WINDOW_CHARS: + chunks.append({ + "text": section_text, + "chunk_type": "section", + "section_title": heading, + }) + else: + sub_chunks = _chunk_sliding( + section_text, DEFAULT_WINDOW_CHARS, DEFAULT_OVERLAP_CHARS, "section" + ) + for sc in sub_chunks: + sc["section_title"] = heading + chunks.append(sc) + return chunks + + +def _chunk_email(text: str) -> list[dict]: + """이메일: 본문 전체 (짧음)""" + text = text.strip() + if len(text) < MIN_CHUNK_CHARS: + return [] + # 너무 길면 슬라이딩으로 분할 + if len(text) > DEFAULT_WINDOW_CHARS * 2: + return _chunk_sliding(text, DEFAULT_WINDOW_CHARS, DEFAULT_OVERLAP_CHARS, "email_body") + return [{"text": text, "chunk_type": "email_body", "section_title": None}] + + +def _chunk_sliding( + text: str, window: int, overlap: int, chunk_type: str +) -> list[dict]: + """슬라이딩 윈도우 분할 (문장 경계 가능한 한 보존)""" + chunks = [] + stride = window - overlap + if stride <= 0: + stride = window + + i = 0 + while i < 
from datetime import datetime, timezone

from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession

from ai.client import AIClient
from core.utils import setup_logger
from models.chunk import DocumentChunk
from models.document import Document
from models.news_source import NewsSource

logger = setup_logger("chunk_worker")


def _chunk_document(doc: Document) -> list[dict]:
    """Dispatch to the strategy-specific chunker for this document."""
    text = doc.extracted_text or ""
    if not text.strip():
        return []

    dispatch = {
        "legal": lambda: _chunk_legal(text),
        "news": lambda: _chunk_news(text),
        "markdown": lambda: _chunk_markdown(text),
        "email": lambda: _chunk_email(text),
        "long_pdf": lambda: _chunk_sliding(
            text, LONG_PDF_WINDOW_CHARS, LONG_PDF_OVERLAP_CHARS, "window"
        ),
    }
    handler = dispatch.get(_classify_chunk_strategy(doc))
    if handler is not None:
        return handler()
    # default (pdf, general): plain sliding window
    return _chunk_sliding(text, DEFAULT_WINDOW_CHARS, DEFAULT_OVERLAP_CHARS, "window")


async def _lookup_news_source(
    session: AsyncSession, doc: Document
) -> tuple[str | None, str | None, str | None]:
    """Resolve (country, source, language) for a news document.

    Matching rule: doc.ai_sub_group equals the first whitespace-separated
    token of NewsSource.name (prefix match).  Non-news documents and
    documents with no sub-group resolve to (None, None, None).
    """
    if doc.source_channel != "news":
        return None, None, None

    name_key = doc.ai_sub_group or ""
    if not name_key:
        return None, None, None

    # Scan all sources; matching happens on the first name token, which
    # cannot be expressed as a simple SQL equality.
    rows = (await session.execute(select(NewsSource))).scalars().all()
    for candidate in rows:
        if candidate.name.split(" ")[0] == name_key:
            return candidate.country, candidate.name, candidate.language

    return None, name_key, None


async def process(document_id: int, session: AsyncSession) -> None:
    """Split one document into chunks and embed each chunk with bge-m3.

    Re-entrant: existing chunks for the document are deleted before new
    ones are inserted.  Embedding failures are logged per-chunk and stored
    as NULL rather than failing the whole document.

    Raises:
        ValueError: when no document exists for ``document_id``.
    """
    doc = await session.get(Document, document_id)
    if not doc:
        raise ValueError(f"문서 ID {document_id}를 찾을 수 없음")

    if not doc.extracted_text:
        logger.warning(f"[chunk] document_id={document_id}: extracted_text 없음, 스킵")
        return

    pieces = _chunk_document(doc)
    if not pieces:
        logger.warning(f"[chunk] document_id={document_id}: chunks 생성 실패")
        return

    # Metadata: heuristic language detection, overridden by the registered
    # news-source language when available.
    language = _detect_language(doc.extracted_text)
    country, source, src_lang = await _lookup_news_source(session, doc)
    if src_lang:
        language = src_lang
    domain_category = "news" if doc.source_channel == "news" else "document"

    # Drop any chunks from a previous run before re-inserting.
    await session.execute(delete(DocumentChunk).where(DocumentChunk.doc_id == document_id))

    client = AIClient()
    try:
        for position, piece in enumerate(pieces):
            try:
                vector = await client.embed(piece["text"])
            except Exception as e:
                logger.warning(f"[chunk] document_id={document_id} chunk {position} 임베딩 실패: {e}")
                vector = None

            now_utc = datetime.now(timezone.utc)
            session.add(DocumentChunk(
                doc_id=document_id,
                chunk_index=position,
                chunk_type=piece["chunk_type"],
                section_title=piece.get("section_title"),
                heading_path=None,  # to be filled from the markdown heading tree later
                page=None,          # to be filled by the PDF parser later
                language=language,
                country=country,
                source=source,
                domain_category=domain_category,
                text=piece["text"],
                embedding=vector,
                created_at=now_utc,
                updated_at=now_utc,
            ))

        logger.info(
            f"[chunk] document_id={document_id}: {len(pieces)}개 chunks 생성 "
            f"(strategy={_classify_chunk_strategy(doc)}, lang={language}, "
            f"domain={domain_category}, country={country})"
        )
    finally:
        await client.close()
-- migrations/014_document_chunks.sql
-- Chunk-level retrieval table (Phase 0.1): documents are split into semantic
-- units (legal articles / paragraphs / sections / sliding windows / email
-- bodies) stored one row per chunk.  Retrieval quality hinges on chunk
-- quality + reranker input quality.
--
-- NOTE(review): this migration ships together with app-side wiring in the
-- same change (summarized here for traceability):
--   * processing_queue gains a 'chunk' stage (queue enum + BATCH_SIZE chunk=1),
--   * completing 'classify' now enqueues both 'embed' and 'chunk'.

-- The embedding column requires pgvector; declare the dependency explicitly
-- so this migration is self-contained (no-op if already installed).
CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE document_chunks (
    id BIGSERIAL PRIMARY KEY,
    doc_id BIGINT REFERENCES documents(id) ON DELETE CASCADE NOT NULL,
    chunk_index INTEGER NOT NULL,         -- order within the document

    -- chunking-strategy metadata
    chunk_type VARCHAR(30) NOT NULL,      -- legal_article | paragraph | section | window | email_body
    section_title TEXT,                   -- e.g. "제80조", "3.1 Methods"
    heading_path TEXT,                    -- e.g. "Chapter 3 > Section 2"
    page INTEGER,                         -- PDF page number

    -- multilingual / domain-aware retrieval metadata
    language VARCHAR(10),                 -- ko | en | ja | zh | fr | de
    country VARCHAR(10),                  -- KR | US | JP | CN | FR | DE (required for news)
    source VARCHAR(100),                  -- 경향신문 | NYT | ... (required for news)
    domain_category VARCHAR(20) NOT NULL, -- document | news (pipeline branch key)

    -- body + embedding
    text TEXT NOT NULL,
    embedding vector(1024),               -- bge-m3

    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW(),

    UNIQUE (doc_id, chunk_index)
);

-- Vector similarity index (IVFFlat, cosine).
-- Tune `lists` as data grows; 100 is sized for an initial 500-5000 chunks.
CREATE INDEX idx_chunks_embedding ON document_chunks
    USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);

-- Full-text search index (simple tokenizer — Korean is covered by the
-- trigram index below).
CREATE INDEX idx_chunks_fts ON document_chunks
    USING GIN (to_tsvector('simple', text));

-- Trigram index (Korean / partial-match support).
CREATE EXTENSION IF NOT EXISTS pg_trgm;
CREATE INDEX idx_chunks_trgm ON document_chunks
    USING GIN (text gin_trgm_ops);

-- B-tree filter indexes.
CREATE INDEX idx_chunks_doc_id ON document_chunks (doc_id);
CREATE INDEX idx_chunks_domain_category ON document_chunks (domain_category);
CREATE INDEX idx_chunks_country ON document_chunks (country) WHERE country IS NOT NULL;
CREATE INDEX idx_chunks_source ON document_chunks (source) WHERE source IS NOT NULL;
CREATE INDEX idx_chunks_language ON document_chunks (language) WHERE language IS NOT NULL;

-- Register the 'chunk' stage on the processing-queue enum.
-- NOTE(review): PostgreSQL cannot USE a freshly added enum value inside the
-- same transaction that adds it — ensure the migration runner commits this
-- statement before any worker enqueues a 'chunk' item.
ALTER TYPE process_stage ADD VALUE IF NOT EXISTS 'chunk';