365 lines
12 KiB
Python
Executable File
365 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
완전한 PDF -> HTML 번역 시스템
|
|
PDF OCR -> NLLB 번역 -> KoBART 요약 -> HTML 생성
|
|
"""
|
|
|
|
import torch
|
|
import time
|
|
import json
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional, Tuple
|
|
import re
|
|
from dataclasses import dataclass
|
|
|
|
# 문서 처리
|
|
try:
|
|
import PyPDF2
|
|
import pdfplumber
|
|
from docx import Document
|
|
except ImportError:
|
|
print("문서 처리 라이브러리 설치 필요: pip install PyPDF2 pdfplumber python-docx")
|
|
|
|
# 번역 및 요약 모델
|
|
from transformers import (
|
|
AutoTokenizer, AutoModelForSeq2SeqLM,
|
|
PreTrainedTokenizerFast, BartForConditionalGeneration
|
|
)
|
|
|
|
@dataclass
|
|
class TranslationResult:
|
|
original_text: str
|
|
translated_text: str
|
|
summary: str
|
|
processing_time: float
|
|
metadata: Dict
|
|
|
|
class IntegratedTranslationSystem:
|
|
def __init__(self):
|
|
self.device = self._setup_device()
|
|
self.models = {}
|
|
self.tokenizers = {}
|
|
self.config = self._load_config()
|
|
|
|
print(f"번역 시스템 초기화 (디바이스: {self.device})")
|
|
|
|
def _setup_device(self) -> torch.device:
|
|
"""최적 디바이스 설정"""
|
|
if torch.backends.mps.is_available():
|
|
return torch.device("mps")
|
|
elif torch.cuda.is_available():
|
|
return torch.device("cuda")
|
|
else:
|
|
return torch.device("cpu")
|
|
|
|
def _load_config(self) -> Dict:
|
|
"""설정 파일 로드"""
|
|
config_path = Path("config/settings.json")
|
|
if config_path.exists():
|
|
with open(config_path, 'r', encoding='utf-8') as f:
|
|
return json.load(f)
|
|
|
|
# 기본 설정
|
|
return {
|
|
"translation": {
|
|
"chunk_size": 500,
|
|
"max_length": 512,
|
|
"num_beams": 4,
|
|
"batch_size": 4
|
|
},
|
|
"summarization": {
|
|
"max_length": 150,
|
|
"min_length": 30,
|
|
"num_beams": 4
|
|
}
|
|
}
|
|
|
|
def load_models(self):
|
|
"""모든 모델 로드"""
|
|
print("모델 로딩 중...")
|
|
|
|
# 1. NLLB 번역 모델
|
|
print(" NLLB 번역 모델...")
|
|
try:
|
|
self.tokenizers['nllb'] = AutoTokenizer.from_pretrained("facebook/nllb-200-3.3B")
|
|
self.models['nllb'] = AutoModelForSeq2SeqLM.from_pretrained(
|
|
"facebook/nllb-200-3.3B",
|
|
torch_dtype=torch.float16,
|
|
low_cpu_mem_usage=True
|
|
).to(self.device)
|
|
print(" NLLB 모델 로드 완료")
|
|
except Exception as e:
|
|
print(f" NLLB 모델 로드 실패: {e}")
|
|
return False
|
|
|
|
# 2. KoBART 요약 모델
|
|
print(" KoBART 요약 모델...")
|
|
try:
|
|
self.tokenizers['kobart'] = PreTrainedTokenizerFast.from_pretrained("gogamza/kobart-summarization")
|
|
self.models['kobart'] = BartForConditionalGeneration.from_pretrained(
|
|
"gogamza/kobart-summarization",
|
|
torch_dtype=torch.float16,
|
|
low_cpu_mem_usage=True
|
|
).to(self.device)
|
|
print(" KoBART 모델 로드 완료")
|
|
except Exception as e:
|
|
print(f" KoBART 모델 로드 실패: {e}")
|
|
print(" 요약 없이 번역만 진행")
|
|
self.models['kobart'] = None
|
|
|
|
print("모델 로딩 완료!")
|
|
return True
|
|
|
|
def detect_language(self, text: str) -> str:
|
|
"""언어 자동 감지"""
|
|
# 간단한 휴리스틱 언어 감지
|
|
korean_chars = len(re.findall(r'[가-힣]', text))
|
|
japanese_chars = len(re.findall(r'[ひらがなカタカナ一-龯]', text))
|
|
english_chars = len(re.findall(r'[a-zA-Z]', text))
|
|
|
|
total_chars = len(text.replace(' ', ''))
|
|
|
|
if total_chars == 0:
|
|
return "unknown"
|
|
|
|
if korean_chars / total_chars > 0.3:
|
|
return "korean"
|
|
elif japanese_chars / total_chars > 0.1:
|
|
return "japanese"
|
|
elif english_chars / total_chars > 0.5:
|
|
return "english"
|
|
else:
|
|
return "unknown"
|
|
|
|
def extract_text_from_pdf(self, pdf_path: str) -> str:
|
|
"""PDF에서 텍스트 추출"""
|
|
print(f"PDF 텍스트 추출: {pdf_path}")
|
|
|
|
text = ""
|
|
try:
|
|
# pdfplumber 우선 시도
|
|
import pdfplumber
|
|
with pdfplumber.open(pdf_path) as pdf:
|
|
for page_num, page in enumerate(pdf.pages, 1):
|
|
page_text = page.extract_text()
|
|
if page_text:
|
|
text += f"\n\n{page_text}"
|
|
|
|
print(f"PDF 텍스트 추출 완료: {len(text)}자")
|
|
|
|
except Exception as e:
|
|
print(f"pdfplumber 실패: {e}")
|
|
|
|
# PyPDF2 백업
|
|
try:
|
|
import PyPDF2
|
|
with open(pdf_path, 'rb') as file:
|
|
pdf_reader = PyPDF2.PdfReader(file)
|
|
for page in pdf_reader.pages:
|
|
page_text = page.extract_text()
|
|
if page_text:
|
|
text += f"\n\n{page_text}"
|
|
|
|
print(f"PyPDF2로 텍스트 추출 완료: {len(text)}자")
|
|
|
|
except Exception as e2:
|
|
print(f"PDF 텍스트 추출 완전 실패: {e2}")
|
|
return ""
|
|
|
|
return self._clean_text(text)
|
|
|
|
def _clean_text(self, text: str) -> str:
|
|
"""추출된 텍스트 정리"""
|
|
# 과도한 공백 정리
|
|
text = re.sub(r'\n\s*\n\s*\n', '\n\n', text)
|
|
text = re.sub(r'[ \t]+', ' ', text)
|
|
|
|
# 페이지 번호 제거
|
|
text = re.sub(r'\n\d+\n', '\n', text)
|
|
|
|
return text.strip()
|
|
|
|
def split_text_into_chunks(self, text: str, chunk_size: int = 500) -> List[str]:
|
|
"""텍스트를 번역 가능한 청크로 분할"""
|
|
sentences = re.split(r'[.!?]\s+', text)
|
|
chunks = []
|
|
current_chunk = ""
|
|
|
|
for sentence in sentences:
|
|
test_chunk = current_chunk + " " + sentence if current_chunk else sentence
|
|
|
|
if len(test_chunk) > chunk_size and current_chunk:
|
|
chunks.append(current_chunk.strip())
|
|
current_chunk = sentence
|
|
else:
|
|
current_chunk = test_chunk
|
|
|
|
if current_chunk:
|
|
chunks.append(current_chunk.strip())
|
|
|
|
print(f"텍스트 분할: {len(chunks)}개 청크")
|
|
return chunks
|
|
|
|
def translate_text(self, text: str, src_lang: str = "english") -> str:
|
|
"""NLLB로 텍스트 번역"""
|
|
if src_lang == "korean":
|
|
return text # 한국어는 번역하지 않음
|
|
|
|
# 언어 코드 매핑
|
|
lang_map = {
|
|
"english": "eng_Latn",
|
|
"japanese": "jpn_Jpan",
|
|
"korean": "kor_Hang"
|
|
}
|
|
|
|
src_code = lang_map.get(src_lang, "eng_Latn")
|
|
tgt_code = "kor_Hang"
|
|
|
|
tokenizer = self.tokenizers['nllb']
|
|
model = self.models['nllb']
|
|
|
|
# 청크별 번역
|
|
chunks = self.split_text_into_chunks(text, self.config["translation"]["chunk_size"])
|
|
translated_chunks = []
|
|
|
|
print(f"번역 시작: {src_lang} -> 한국어")
|
|
|
|
for i, chunk in enumerate(chunks):
|
|
print(f" 청크 {i+1}/{len(chunks)} 번역 중...")
|
|
|
|
inputs = tokenizer(chunk, return_tensors="pt", padding=True, truncation=True).to(self.device)
|
|
|
|
with torch.no_grad():
|
|
translated_tokens = model.generate(
|
|
**inputs,
|
|
forced_bos_token_id=tokenizer.convert_tokens_to_ids(tgt_code),
|
|
max_length=self.config["translation"]["max_length"],
|
|
num_beams=self.config["translation"]["num_beams"],
|
|
early_stopping=True
|
|
)
|
|
|
|
translated_chunk = tokenizer.decode(translated_tokens[0], skip_special_tokens=True)
|
|
translated_chunks.append(translated_chunk)
|
|
|
|
result = "\n\n".join(translated_chunks)
|
|
print(f"번역 완료: {len(result)}자")
|
|
return result
|
|
|
|
def summarize_text(self, text: str) -> str:
|
|
"""KoBART로 한국어 텍스트 요약"""
|
|
if self.models['kobart'] is None:
|
|
print("요약 모델 없음, 첫 300자 반환")
|
|
return text[:300] + "..." if len(text) > 300 else text
|
|
|
|
print("텍스트 요약 중...")
|
|
|
|
tokenizer = self.tokenizers['kobart']
|
|
model = self.models['kobart']
|
|
|
|
inputs = tokenizer(
|
|
text,
|
|
return_tensors="pt",
|
|
max_length=1024,
|
|
truncation=True,
|
|
padding=True,
|
|
return_token_type_ids=False
|
|
).to(self.device)
|
|
|
|
with torch.no_grad():
|
|
summary_ids = model.generate(
|
|
input_ids=inputs['input_ids'],
|
|
attention_mask=inputs['attention_mask'],
|
|
max_length=self.config["summarization"]["max_length"],
|
|
min_length=self.config["summarization"]["min_length"],
|
|
num_beams=self.config["summarization"]["num_beams"],
|
|
early_stopping=True,
|
|
no_repeat_ngram_size=2,
|
|
length_penalty=1.2
|
|
)
|
|
|
|
summary = tokenizer.decode(summary_ids[0], skip_special_tokens=True)
|
|
print(f"요약 완료: {len(summary)}자")
|
|
return summary
|
|
|
|
def process_document(self, input_path: str, output_dir: str = "output") -> TranslationResult:
|
|
"""전체 문서 처리 파이프라인"""
|
|
start_time = time.time()
|
|
|
|
print(f"문서 처리 시작: {input_path}")
|
|
|
|
# 1. 텍스트 추출
|
|
if input_path.lower().endswith('.pdf'):
|
|
original_text = self.extract_text_from_pdf(input_path)
|
|
else:
|
|
with open(input_path, 'r', encoding='utf-8') as f:
|
|
original_text = f.read()
|
|
|
|
if not original_text:
|
|
raise ValueError("텍스트 추출 실패")
|
|
|
|
# 2. 언어 감지
|
|
detected_lang = self.detect_language(original_text)
|
|
print(f"감지된 언어: {detected_lang}")
|
|
|
|
# 3. 번역
|
|
if detected_lang == "korean":
|
|
translated_text = original_text
|
|
print("한국어 문서, 번역 생략")
|
|
else:
|
|
translated_text = self.translate_text(original_text, detected_lang)
|
|
|
|
# 4. 요약
|
|
summary = self.summarize_text(translated_text)
|
|
|
|
# 5. 결과 저장
|
|
output_path = Path(output_dir)
|
|
output_path.mkdir(exist_ok=True)
|
|
|
|
base_name = Path(input_path).stem
|
|
|
|
# 텍스트 파일 저장
|
|
with open(output_path / f"{base_name}_translated.txt", 'w', encoding='utf-8') as f:
|
|
f.write(translated_text)
|
|
|
|
with open(output_path / f"{base_name}_summary.txt", 'w', encoding='utf-8') as f:
|
|
f.write(summary)
|
|
|
|
processing_time = time.time() - start_time
|
|
|
|
result = TranslationResult(
|
|
original_text=original_text,
|
|
translated_text=translated_text,
|
|
summary=summary,
|
|
processing_time=processing_time,
|
|
metadata={
|
|
"input_file": input_path,
|
|
"detected_language": detected_lang,
|
|
"original_chars": len(original_text),
|
|
"translated_chars": len(translated_text),
|
|
"summary_chars": len(summary),
|
|
"compression_ratio": len(summary) / len(translated_text) * 100 if translated_text else 0
|
|
}
|
|
)
|
|
|
|
print(f"문서 처리 완료! ({processing_time/60:.1f}분 소요)")
|
|
return result
|
|
|
|
def main():
|
|
"""메인 실행 함수"""
|
|
system = IntegratedTranslationSystem()
|
|
|
|
if not system.load_models():
|
|
print("모델 로딩 실패")
|
|
return None
|
|
|
|
print("\n" + "="*60)
|
|
print("통합 번역 시스템 준비 완료!")
|
|
print("사용법:")
|
|
print(" result = system.process_document('input.pdf')")
|
|
print("="*60)
|
|
|
|
return system
|
|
|
|
if __name__ == "__main__":
|
|
system = main()
|