"""Chunk 워커 — 문서 유형별 chunking + bge-m3 임베딩 (Phase 0.1) 승부처는 chunk 품질. 문서 유형별로 다른 전략: - 법령: 조/항 단위 (구조적, overlap 불필요) - 뉴스: 문단 단위 (overlap ~15%) - 일반 문서: 슬라이딩 윈도우 (overlap 15-25%) - 긴 PDF: 슬라이딩 윈도우 (overlap 20-30%) - 마크다운: heading section 단위 (overlap 없음) - 이메일: 본문 전체 (대부분 짧음) """ import re from datetime import datetime, timezone from sqlalchemy import delete, select from sqlalchemy.ext.asyncio import AsyncSession from ai.client import AIClient from core.utils import setup_logger from models.chunk import DocumentChunk from models.document import Document from models.news_source import NewsSource logger = setup_logger("chunk_worker") # ─── 상수 ─── # 문자 기준(bge-m3는 8192 토큰 여유 있음, 한국어 1토큰≈2.5자) DEFAULT_WINDOW_CHARS = 1500 # ~600 tokens (ko 기준) DEFAULT_OVERLAP_CHARS = 300 # ~20% overlap LONG_PDF_WINDOW_CHARS = 2000 # ~800 tokens LONG_PDF_OVERLAP_CHARS = 500 # ~25% overlap NEWS_OVERLAP_CHARS = 150 # ~15% MIN_CHUNK_CHARS = 50 # 너무 짧은 chunk는 버림 # ─── 언어 감지 (간단한 휴리스틱) ─── def _detect_language(text: str) -> str: """문자 비율 기반 언어 감지""" if not text: return "unknown" sample = text[:2000] ko = sum(1 for c in sample if "\uac00" <= c <= "\ud7a3") ja = sum(1 for c in sample if "\u3040" <= c <= "\u30ff") zh = sum(1 for c in sample if "\u4e00" <= c <= "\u9fff") en = sum(1 for c in sample if c.isascii() and c.isalpha()) total = ko + ja + zh + en if total == 0: return "unknown" # CJK 우선 (한중일은 한자 overlap이 있으므로 순서 중요) if ja / total > 0.1: return "ja" if ko / total > 0.2: return "ko" if zh / total > 0.2: return "zh" if en / total > 0.5: return "en" return "ko" # 기본값 # ─── 문서 유형 판별 ─── def _classify_chunk_strategy(doc: Document) -> str: """문서 유형에 따라 chunking 전략 선택""" if doc.source_channel == "news": return "news" if doc.ai_domain and "Legislation" in doc.ai_domain: return "legal" if doc.file_format == "md" or doc.file_format == "markdown": return "markdown" if doc.file_format in ("eml", "msg"): return "email" if doc.file_format == "pdf": # 본문 길이로 긴 PDF 구분 if doc.extracted_text and len(doc.extracted_text) > 20000: return "long_pdf" return "pdf" return "default" # ─── Chunking 전략 ─── def _chunk_legal(text: str) -> list[dict]: """법령: 제N조 단위로 분할 (상위 조문 컨텍스트 보존)""" # "제 1 조", "제1조", "제 1 조(제목)" 등 매칭 pattern = re.compile(r"(제\s*\d+\s*조(?:의\s*\d+)?(?:\([^)]*\))?)") parts = pattern.split(text) chunks = [] # parts[0] = 조 이전 서문, parts[1], parts[2] = (마커, 본문) pairs if parts[0].strip() and len(parts[0]) >= MIN_CHUNK_CHARS: chunks.append({ "text": parts[0].strip()[:DEFAULT_WINDOW_CHARS], "chunk_type": "section", "section_title": "서문", }) i = 1 while i < len(parts): marker = parts[i] body = parts[i + 1] if i + 1 < len(parts) else "" full = f"{marker} {body}".strip() if len(full) >= MIN_CHUNK_CHARS: # 너무 길면 슬라이싱 (조문이 매우 긴 경우) if len(full) <= DEFAULT_WINDOW_CHARS: chunks.append({ "text": full, "chunk_type": "legal_article", "section_title": marker.strip(), }) else: # 긴 조문은 윈도우로 추가 분할 for offset in range(0, len(full), DEFAULT_WINDOW_CHARS - 200): sub = full[offset : offset + DEFAULT_WINDOW_CHARS] if len(sub) >= MIN_CHUNK_CHARS: chunks.append({ "text": sub, "chunk_type": "legal_article", "section_title": marker.strip(), }) i += 2 # 법령이지만 조문 패턴이 없으면 기본 슬라이딩 윈도우로 fallback if not chunks: return _chunk_sliding(text, DEFAULT_WINDOW_CHARS, DEFAULT_OVERLAP_CHARS, "section") return chunks def _chunk_news(text: str) -> list[dict]: """뉴스: 문단 단위 (빈 줄 기준), 너무 짧으면 병합""" paragraphs = [p.strip() for p in re.split(r"\n\s*\n", text) if p.strip()] chunks = [] buffer = "" for p in paragraphs: if len(buffer) + len(p) < DEFAULT_WINDOW_CHARS: buffer = f"{buffer}\n\n{p}".strip() if buffer else p else: if len(buffer) >= MIN_CHUNK_CHARS: chunks.append({"text": buffer, "chunk_type": "paragraph", "section_title": None}) buffer = p if buffer and len(buffer) >= MIN_CHUNK_CHARS: chunks.append({"text": buffer, "chunk_type": "paragraph", "section_title": None}) if not chunks: return _chunk_sliding(text, DEFAULT_WINDOW_CHARS, NEWS_OVERLAP_CHARS, "paragraph") return chunks def _chunk_markdown(text: str) -> list[dict]: """마크다운: heading section 단위""" # '#', '##', '###' 기준 분할 pattern = re.compile(r"^(#{1,6}\s+.+)$", re.MULTILINE) matches = list(pattern.finditer(text)) chunks = [] if not matches: return _chunk_sliding(text, DEFAULT_WINDOW_CHARS, DEFAULT_OVERLAP_CHARS, "section") # 첫 heading 이전 서문 if matches[0].start() > 0: preface = text[: matches[0].start()].strip() if len(preface) >= MIN_CHUNK_CHARS: chunks.append({"text": preface, "chunk_type": "section", "section_title": "서문"}) for i, m in enumerate(matches): start = m.start() end = matches[i + 1].start() if i + 1 < len(matches) else len(text) section_text = text[start:end].strip() heading = m.group(1).strip("# ").strip() if len(section_text) < MIN_CHUNK_CHARS: continue # 긴 섹션은 추가 분할 if len(section_text) <= DEFAULT_WINDOW_CHARS: chunks.append({ "text": section_text, "chunk_type": "section", "section_title": heading, }) else: sub_chunks = _chunk_sliding( section_text, DEFAULT_WINDOW_CHARS, DEFAULT_OVERLAP_CHARS, "section" ) for sc in sub_chunks: sc["section_title"] = heading chunks.append(sc) return chunks def _chunk_email(text: str) -> list[dict]: """이메일: 본문 전체 (짧음)""" text = text.strip() if len(text) < MIN_CHUNK_CHARS: return [] # 너무 길면 슬라이딩으로 분할 if len(text) > DEFAULT_WINDOW_CHARS * 2: return _chunk_sliding(text, DEFAULT_WINDOW_CHARS, DEFAULT_OVERLAP_CHARS, "email_body") return [{"text": text, "chunk_type": "email_body", "section_title": None}] def _chunk_sliding( text: str, window: int, overlap: int, chunk_type: str ) -> list[dict]: """슬라이딩 윈도우 분할 (문장 경계 가능한 한 보존)""" chunks = [] stride = window - overlap if stride <= 0: stride = window i = 0 while i < len(text): end = min(i + window, len(text)) # 문장 경계에 맞춰 조정 (끝에 가까운 마침표/줄바꿈) if end < len(text): for punct in [". ", ".\n", "。", "\n\n", "\n"]: cut = text.rfind(punct, max(i + window - 300, i), end) if cut > i: end = cut + len(punct) break chunk_text = text[i:end].strip() if len(chunk_text) >= MIN_CHUNK_CHARS: chunks.append({ "text": chunk_text, "chunk_type": chunk_type, "section_title": None, }) if end >= len(text): break i = max(end - overlap, i + 1) return chunks def _chunk_document(doc: Document) -> list[dict]: """문서 유형별 chunking 디스패처""" text = doc.extracted_text or "" if not text.strip(): return [] strategy = _classify_chunk_strategy(doc) if strategy == "legal": return _chunk_legal(text) if strategy == "news": return _chunk_news(text) if strategy == "markdown": return _chunk_markdown(text) if strategy == "email": return _chunk_email(text) if strategy == "long_pdf": return _chunk_sliding(text, LONG_PDF_WINDOW_CHARS, LONG_PDF_OVERLAP_CHARS, "window") # default (pdf, general) return _chunk_sliding(text, DEFAULT_WINDOW_CHARS, DEFAULT_OVERLAP_CHARS, "window") # ─── 뉴스 소스 메타데이터 조회 ─── async def _lookup_news_source( session: AsyncSession, doc: Document ) -> tuple[str | None, str | None, str | None]: """뉴스 문서의 country/source/language를 news_sources에서 조회 매칭 방식: doc.ai_sub_group = source.name.split(' ')[0] """ if doc.source_channel != "news": return None, None, None source_name = doc.ai_sub_group or "" if not source_name: return None, None, None # news_sources에서 이름이 일치하는 레코드 찾기 (prefix match) result = await session.execute(select(NewsSource)) sources = result.scalars().all() for src in sources: if src.name.split(" ")[0] == source_name: return src.country, src.name, src.language return None, source_name, None # ─── 메인 워커 함수 ─── async def process(document_id: int, session: AsyncSession) -> None: """문서를 chunks로 분할하고 bge-m3로 임베딩""" doc = await session.get(Document, document_id) if not doc: raise ValueError(f"문서 ID {document_id}를 찾을 수 없음") if not doc.extracted_text: logger.warning(f"[chunk] document_id={document_id}: extracted_text 없음, 스킵") return # chunking chunk_dicts = _chunk_document(doc) if not chunk_dicts: logger.warning(f"[chunk] document_id={document_id}: chunks 생성 실패") return # 메타데이터 준비 language = _detect_language(doc.extracted_text) country, source, src_lang = await _lookup_news_source(session, doc) if src_lang: language = src_lang domain_category = "news" if doc.source_channel == "news" else "document" # 기존 chunks 삭제 (재처리) await session.execute(delete(DocumentChunk).where(DocumentChunk.doc_id == document_id)) # 임베딩 + 저장 client = AIClient() try: for idx, c in enumerate(chunk_dicts): try: embedding = await client.embed(c["text"]) except Exception as e: logger.warning(f"[chunk] document_id={document_id} chunk {idx} 임베딩 실패: {e}") embedding = None chunk = DocumentChunk( doc_id=document_id, chunk_index=idx, chunk_type=c["chunk_type"], section_title=c.get("section_title"), heading_path=None, # 추후 마크다운 tree에서 채움 page=None, # 추후 PDF 파서에서 채움 language=language, country=country, source=source, domain_category=domain_category, text=c["text"], embedding=embedding, created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc), ) session.add(chunk) logger.info( f"[chunk] document_id={document_id}: {len(chunk_dicts)}개 chunks 생성 " f"(strategy={_classify_chunk_strategy(doc)}, lang={language}, " f"domain={domain_category}, country={country})" ) finally: await client.close()