b20c4f933b
문제: 1천+ 문항 토픽에서 보기 페이지 prev/next 가 page_size=200 cap 으로
회차 외 문항만 받아 같은 회차 prev/next 누락 회귀.
해결:
- /study-topics/{tid}/questions 에 exam_round Query 파라미터 추가 (exact match).
- StudyQuestionSummary 응답에 exam_question_number 필드 추가.
- exam_round 필터 시 정렬 = exam_question_number asc NULLS LAST, created_at asc.
- 보기 페이지 loadRoundSiblings 가 ?exam_round= 로 한 회차만 fetch.
- 토스트 문구 "토픽 200문제 초과" → "이 회차에 200문항 초과" (의미 일치).
추가 — 가스기사 기출 일괄 import 스크립트:
- scripts/import_gas_questions.py: md 파서 + dry-run + apply.
· exam_question_number 3소스 (파일명/제목/메타) 일치 검증.
· subject 정규화 (괄호 세부분류는 scope 로 이동, 5과목 통일).
· 이미지 4케이스 판정 + import_reports/{회차}_image_required.md 생성.
· 첫 실패 abort 기본, --skip-existing/--continue-on-error 옵션.
· 토큰 사전 검사 (GET /study-topics/{tid}).
- import_reports/: 2019년 1~3회 + 2020년 1~2회 리포트.
- 운영: 4회분 360문항 자동 import 완료 (이미지 4건 자동 첨부).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
723 lines
27 KiB
Python
723 lines
27 KiB
Python
"""scripts/import_gas_questions.py — 가스기사 기출 md 일괄 import.
|
|
|
|
usage:
|
|
export STUDY_IMPORT_TOKEN=eyJ... # /api/auth/login 으로 발급
|
|
python scripts/import_gas_questions.py "~/Desktop/가스기사/2019년 2회"
|
|
python scripts/import_gas_questions.py "~/Desktop/가스기사/2019년 2회" --apply
|
|
python scripts/import_gas_questions.py "~/Desktop/가스기사/2019년 2회" --apply --skip-existing
|
|
|
|
규칙 (plan crispy-petting-dijkstra.md 기준):
|
|
- exam_question_number 는 파일명/제목/메타 3소스 일치 의무. 불일치 시 dry-run 실패.
|
|
- 이미지: 첫 줄 "**⚠ 이미지 별도 업로드 필요**" 마커 + {N}.png 매트릭스 4종.
|
|
· marker yes + png yes → attach_planned (apply 시 첨부)
|
|
· marker yes + png no → missing_png (dry-run 실패)
|
|
· marker no + png yes → placeholder_skipped (참고)
|
|
· marker no + png no → 텍스트 문항 (정상)
|
|
- outer fence only strip: terminated wrap 또는 unterminated (백틱 그룹 1).
|
|
- apply 기본 = 첫 실패에서 abort. --continue-on-error / --skip-existing 옵션.
|
|
- dry-run 시 import_reports/{exam_round}_image_required.md 항상 생성.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import os
|
|
import re
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
import httpx
|
|
|
|
|
|
API_BASE = os.environ.get("STUDY_IMPORT_API", "https://document.hyungi.net/api")
|
|
TOPIC_ID = int(os.environ.get("STUDY_IMPORT_TOPIC_ID", "4"))
|
|
TOKEN = os.environ.get("STUDY_IMPORT_TOKEN", "")
|
|
|
|
# 폴더명 → subject 매핑 (NN 접두 + 공백 또는 _ 분리 모두 허용).
|
|
SUBJECT_MAP = {
|
|
"01 가스유체역학": "가스유체역학",
|
|
"01_가스유체역학": "가스유체역학",
|
|
"01 가스유체": "가스유체역학",
|
|
"02 연소공학": "연소공학",
|
|
"02_연소공학": "연소공학",
|
|
"03 가스설비": "가스설비",
|
|
"03_가스설비": "가스설비",
|
|
"04 가스안전관리": "가스안전관리",
|
|
"04_가스안전관리": "가스안전관리",
|
|
"05 가스계측기기": "가스계측기기",
|
|
"05_가스계측기기": "가스계측기기",
|
|
"05 가스계측": "가스계측기기",
|
|
}
|
|
|
|
CIRCLED = {"①": 1, "②": 2, "③": 3, "④": 4}
|
|
IMAGE_MARKER = "⚠ 이미지 별도 업로드 필요"
|
|
|
|
# subject 정규화: 5개 시험 과목으로 고정. md 가 "연소공학 (열역학)" 같이
|
|
# 괄호 세부 분류를 가지면 subject 는 head 만, 괄호 안은 scope 앞에 붙임.
|
|
CANONICAL_SUBJECTS = {
|
|
"가스유체역학",
|
|
"연소공학",
|
|
"가스설비",
|
|
"가스안전관리",
|
|
"가스계측기기",
|
|
}
|
|
|
|
|
|
def normalize_subject(
|
|
raw_subject: str, raw_scope: Optional[str]
|
|
) -> tuple[str, Optional[str]]:
|
|
"""('연소공학 (열역학)', 'X') → ('연소공학', '열역학 · X')."""
|
|
s = (raw_subject or "").strip()
|
|
if not s or s in CANONICAL_SUBJECTS:
|
|
return s, raw_scope
|
|
m = re.match(r"^(.+?)\s*\(\s*([^()]+?)\s*\)\s*$", s)
|
|
if not m:
|
|
return s, raw_scope
|
|
head = m.group(1).strip()
|
|
detail = m.group(2).strip()
|
|
if head not in CANONICAL_SUBJECTS:
|
|
# 정규 5과목 외면 정규화 안 함 (사용자 검토 대상)
|
|
return s, raw_scope
|
|
if raw_scope:
|
|
new_scope = f"{detail} · {raw_scope}"
|
|
else:
|
|
new_scope = detail
|
|
return head, new_scope
|
|
|
|
|
|
# ─── outer fence strip (PR-12 stripOuterFence 와 동일 룰) ───
|
|
|
|
TERM_RE = re.compile(r"^```[A-Za-z0-9_-]*[ \t]*\n([\s\S]*?)\n```$")
|
|
UNTERM_RE = re.compile(r"^```[A-Za-z0-9_-]*[ \t]*\n([\s\S]*)$")
|
|
|
|
|
|
def strip_outer_fence(text: str) -> str:
|
|
"""전체 텍스트가 단일 fenced block 으로 감싸진 경우만 unwrap. 아니면 원본."""
|
|
if not text:
|
|
return text
|
|
trimmed = text.strip()
|
|
# (1) terminated
|
|
m = TERM_RE.match(trimmed)
|
|
if m:
|
|
inner = m.group(1)
|
|
if "```" not in inner:
|
|
return inner
|
|
return text
|
|
# (2) unterminated (백틱 그룹 == 1)
|
|
if trimmed.count("```") == 1:
|
|
m2 = UNTERM_RE.match(trimmed)
|
|
if m2:
|
|
return m2.group(1)
|
|
return text
|
|
|
|
|
|
# ─── md 파서 ───
|
|
|
|
|
|
@dataclass
|
|
class ParsedQuestion:
|
|
file_path: Path
|
|
exam_round: str # 메타에서 추출
|
|
exam_question_number: int # 파일명/제목/메타 일치 후
|
|
subject: str
|
|
scope: Optional[str]
|
|
source_note: Optional[str]
|
|
question_text: str
|
|
choice_1: str
|
|
choice_2: str
|
|
choice_3: str
|
|
choice_4: str
|
|
correct_choice: int # 1~4
|
|
explanation: Optional[str]
|
|
image_marker: bool
|
|
image_description: Optional[str]
|
|
png_path: Optional[Path] # 존재하면 path, 없으면 None
|
|
parse_errors: list[str] = field(default_factory=list)
|
|
|
|
|
|
@dataclass
|
|
class ImageStatus:
|
|
"""4 케이스: attach_planned / missing_png / placeholder_skipped / text_only."""
|
|
state: str
|
|
description: Optional[str] = None
|
|
|
|
|
|
def classify_image(p: ParsedQuestion) -> ImageStatus:
|
|
if p.image_marker and p.png_path and p.png_path.exists():
|
|
return ImageStatus("attach_planned", p.image_description)
|
|
if p.image_marker and (not p.png_path or not p.png_path.exists()):
|
|
return ImageStatus("missing_png", p.image_description)
|
|
if not p.image_marker and p.png_path and p.png_path.exists():
|
|
return ImageStatus("placeholder_skipped")
|
|
return ImageStatus("text_only")
|
|
|
|
|
|
def _extract_section_fence(md: str, header: str) -> Optional[str]:
|
|
"""### {header} 다음 fenced block 본문. 없으면 None.
|
|
|
|
fenced block 이 없으면 다음 ### 또는 EOF 까지 plain 텍스트 가져옴 (fallback).
|
|
"""
|
|
pat_fenced = re.compile(
|
|
rf"^### {re.escape(header)}\s*$\s*\n```[A-Za-z0-9_-]*[ \t]*\n([\s\S]*?)\n```",
|
|
re.MULTILINE,
|
|
)
|
|
m = pat_fenced.search(md)
|
|
if m:
|
|
return m.group(1).rstrip()
|
|
# fallback — fence 없는 경우
|
|
pat_plain = re.compile(
|
|
rf"^### {re.escape(header)}\s*$\s*\n([\s\S]*?)(?=^###|\Z)",
|
|
re.MULTILINE,
|
|
)
|
|
m = pat_plain.search(md)
|
|
if m:
|
|
return m.group(1).strip()
|
|
return None
|
|
|
|
|
|
def _extract_meta(md: str) -> dict[str, str]:
|
|
"""### 메타 섹션의 - **키**: 값 항목 dict."""
|
|
out: dict[str, str] = {}
|
|
section = re.search(r"^### 메타\s*$\s*\n([\s\S]*?)(?=^###|\Z)", md, re.MULTILINE)
|
|
if not section:
|
|
return out
|
|
body = section.group(1)
|
|
for line in body.splitlines():
|
|
m = re.match(r"^\s*-\s*\*\*([^*]+)\*\*\s*:\s*(.+?)\s*$", line)
|
|
if m:
|
|
out[m.group(1).strip()] = m.group(2).strip()
|
|
return out
|
|
|
|
|
|
def _parse_choices(raw: str) -> tuple[str, str, str, str]:
|
|
"""보기 fenced block → 4개 텍스트.
|
|
|
|
한 줄당 "{N}번: ..." 형태. 멀티라인 보기는 다음 N번 보기 직전까지 합침.
|
|
"""
|
|
lines = raw.splitlines()
|
|
buckets: dict[int, list[str]] = {1: [], 2: [], 3: [], 4: []}
|
|
current: Optional[int] = None
|
|
for line in lines:
|
|
m = re.match(r"^\s*(\d)번\s*:\s*(.*)$", line)
|
|
if m:
|
|
current = int(m.group(1))
|
|
if 1 <= current <= 4:
|
|
buckets[current].append(m.group(2).rstrip())
|
|
else:
|
|
current = None
|
|
elif current is not None:
|
|
stripped = line.rstrip()
|
|
if stripped:
|
|
buckets[current].append(stripped)
|
|
result: list[str] = []
|
|
for i in (1, 2, 3, 4):
|
|
joined = "\n".join(buckets[i]).strip()
|
|
result.append(joined)
|
|
return tuple(result) # type: ignore
|
|
|
|
|
|
def _parse_correct(raw: str) -> Optional[int]:
|
|
"""### 정답 본문에서 1~4 추출. **N** 우선, ①~④ 폴백."""
|
|
if not raw:
|
|
return None
|
|
text = raw.strip()
|
|
m = re.search(r"\*\*\s*([1-4])\s*\*\*", text)
|
|
if m:
|
|
return int(m.group(1))
|
|
for ch, n in CIRCLED.items():
|
|
if ch in text:
|
|
return n
|
|
m2 = re.search(r"\b([1-4])\b", text)
|
|
if m2:
|
|
return int(m2.group(1))
|
|
return None
|
|
|
|
|
|
def parse_md(file_path: Path) -> ParsedQuestion:
|
|
"""단일 md 파싱. parse_errors 비어있으면 OK, 있으면 dry-run 실패 사유."""
|
|
md = file_path.read_text(encoding="utf-8")
|
|
errors: list[str] = []
|
|
|
|
# 첫 줄 + 이미지 마커
|
|
lines = md.splitlines()
|
|
title_line = lines[0] if lines else ""
|
|
image_marker = False
|
|
image_desc: Optional[str] = None
|
|
for line in lines[1:6]:
|
|
if IMAGE_MARKER in line:
|
|
image_marker = True
|
|
mdesc = re.search(r"이미지 별도 업로드 필요\s*\*\*\s*[—\-]\s*(.+)$", line)
|
|
if mdesc:
|
|
image_desc = mdesc.group(1).strip()
|
|
break
|
|
|
|
# 제목에서 회차/과목/번호 추출
|
|
title_qnum: Optional[int] = None
|
|
title_round: Optional[str] = None
|
|
title_subject: Optional[str] = None
|
|
m_title = re.match(r"^#\s*(.+?)\s+(\S+)\s+(\d+)번\s*$", title_line)
|
|
if m_title:
|
|
title_round = m_title.group(1).strip()
|
|
title_subject = m_title.group(2).strip()
|
|
title_qnum = int(m_title.group(3))
|
|
|
|
# 파일명에서 번호
|
|
file_qnum: Optional[int] = None
|
|
m_file = re.match(r"^(\d+)\.md$", file_path.name)
|
|
if m_file:
|
|
file_qnum = int(m_file.group(1))
|
|
|
|
# 메타
|
|
meta = _extract_meta(md)
|
|
meta_qnum: Optional[int] = None
|
|
if "문항 번호" in meta:
|
|
try:
|
|
meta_qnum = int(meta["문항 번호"])
|
|
except ValueError:
|
|
errors.append(f"메타 문항 번호 파싱 실패: {meta['문항 번호']!r}")
|
|
|
|
# 3 소스 일치 검증
|
|
qnums = {x for x in (file_qnum, title_qnum, meta_qnum) if x is not None}
|
|
if len(qnums) == 0:
|
|
errors.append("문항 번호 추출 실패 (파일명/제목/메타 모두)")
|
|
final_qnum = -1
|
|
elif len(qnums) > 1:
|
|
errors.append(
|
|
f"문항 번호 불일치: 파일명={file_qnum} 제목={title_qnum} 메타={meta_qnum}"
|
|
)
|
|
final_qnum = file_qnum or title_qnum or meta_qnum or -1
|
|
else:
|
|
final_qnum = qnums.pop()
|
|
|
|
# 메타 필수 — subject / exam_round
|
|
subject_meta = meta.get("과목", "").strip()
|
|
if not subject_meta:
|
|
# 폴더명 폴백
|
|
folder = file_path.parent.name
|
|
subject_meta = SUBJECT_MAP.get(folder, "")
|
|
if not subject_meta:
|
|
errors.append("과목 메타 누락")
|
|
|
|
exam_round = meta.get("회차", "").strip()
|
|
if not exam_round and title_round:
|
|
exam_round = title_round
|
|
if not exam_round:
|
|
errors.append("회차 메타 누락")
|
|
|
|
scope = meta.get("범위", "").strip() or None
|
|
source_note = meta.get("출처/메모", "").strip() or None
|
|
|
|
# subject 정규화: 괄호 세부 분류는 scope 로 이동. 5과목 외면 그대로 보존.
|
|
subject_meta, scope = normalize_subject(subject_meta, scope)
|
|
|
|
# 본문/보기/정답/해설
|
|
qtext_raw = _extract_section_fence(md, "문제 본문")
|
|
if qtext_raw is None:
|
|
errors.append("### 문제 본문 섹션 없음")
|
|
qtext_raw = ""
|
|
question_text = strip_outer_fence(qtext_raw).strip()
|
|
if not question_text:
|
|
errors.append("문제 본문 비어있음")
|
|
|
|
choices_raw = _extract_section_fence(md, "보기")
|
|
if choices_raw is None:
|
|
errors.append("### 보기 섹션 없음")
|
|
c1, c2, c3, c4 = "", "", "", ""
|
|
else:
|
|
c1, c2, c3, c4 = _parse_choices(strip_outer_fence(choices_raw))
|
|
for i, c in enumerate((c1, c2, c3, c4), 1):
|
|
if not c:
|
|
errors.append(f"보기 {i}번 비어있음")
|
|
|
|
correct_raw = _extract_section_fence(md, "정답")
|
|
if correct_raw is None:
|
|
# ### 정답 다음 단순 한 줄 (fenced 없는 경우 _extract_section_fence 가 fallback 처리)
|
|
errors.append("### 정답 섹션 없음")
|
|
correct = -1
|
|
else:
|
|
cc = _parse_correct(correct_raw)
|
|
if cc is None:
|
|
errors.append(f"정답 파싱 실패: {correct_raw[:50]!r}")
|
|
correct = -1
|
|
else:
|
|
correct = cc
|
|
|
|
expl_raw = _extract_section_fence(md, "해설")
|
|
explanation = strip_outer_fence(expl_raw).strip() if expl_raw else None
|
|
|
|
# 이미지 png 경로
|
|
png_path: Optional[Path] = None
|
|
if file_qnum is not None:
|
|
candidate = file_path.parent / f"{file_qnum}.png"
|
|
if candidate.exists():
|
|
png_path = candidate
|
|
|
|
# 이미지 4 케이스 검증
|
|
if image_marker and (png_path is None):
|
|
errors.append(f"이미지 마커 있는데 png 누락: {file_qnum}.png")
|
|
|
|
return ParsedQuestion(
|
|
file_path=file_path,
|
|
exam_round=exam_round,
|
|
exam_question_number=final_qnum,
|
|
subject=subject_meta,
|
|
scope=scope,
|
|
source_note=source_note,
|
|
question_text=question_text,
|
|
choice_1=c1,
|
|
choice_2=c2,
|
|
choice_3=c3,
|
|
choice_4=c4,
|
|
correct_choice=correct,
|
|
explanation=explanation,
|
|
image_marker=image_marker,
|
|
image_description=image_desc,
|
|
png_path=png_path,
|
|
parse_errors=errors,
|
|
)
|
|
|
|
|
|
# ─── HTTP client ───
|
|
|
|
|
|
def _client() -> httpx.Client:
|
|
if not TOKEN:
|
|
print("ERROR: STUDY_IMPORT_TOKEN 환경변수 필요. /api/auth/login 으로 발급.", file=sys.stderr)
|
|
sys.exit(2)
|
|
return httpx.Client(
|
|
base_url=API_BASE,
|
|
headers={"Authorization": f"Bearer {TOKEN}"},
|
|
timeout=30.0,
|
|
)
|
|
|
|
|
|
def verify_token(c: httpx.Client) -> dict:
|
|
"""GET /study-topics/{TOPIC_ID} 로 토큰 + 접근 권한 검사. 실패 시 exit."""
|
|
r = c.get(f"/study-topics/{TOPIC_ID}")
|
|
if r.status_code == 200:
|
|
return r.json()
|
|
print(f"ERROR: 토큰/권한 검증 실패 ({r.status_code}): {r.text[:200]}", file=sys.stderr)
|
|
sys.exit(3)
|
|
|
|
|
|
def fetch_existing_for_conflict(c: httpx.Client, exam_round: str) -> dict[int, dict]:
|
|
"""충돌 출력용 — qnum → 기존 행 메타.
|
|
|
|
/study-topics/{tid}/questions 의 StudyQuestionSummary 에는 exam_question_number
|
|
필드가 없어 통합뷰 detail (StudyTopicQuestionSummary, exam_question_number 포함)
|
|
을 사용한다. PR-11 에서 추가됨.
|
|
"""
|
|
out: dict[int, dict] = {}
|
|
r = c.get(f"/study-topics/{TOPIC_ID}")
|
|
r.raise_for_status()
|
|
body = r.json()
|
|
for it in body.get("sections", {}).get("questions", []):
|
|
if it.get("exam_round") == exam_round and it.get("exam_question_number"):
|
|
out[int(it["exam_question_number"])] = {
|
|
"id": it["id"],
|
|
"subject": it.get("subject"),
|
|
"scope": it.get("scope"),
|
|
}
|
|
return out
|
|
|
|
|
|
def post_question(c: httpx.Client, p: ParsedQuestion) -> dict:
|
|
body = {
|
|
"question_text": p.question_text,
|
|
"choice_1": p.choice_1,
|
|
"choice_2": p.choice_2,
|
|
"choice_3": p.choice_3,
|
|
"choice_4": p.choice_4,
|
|
"correct_choice": p.correct_choice,
|
|
"subject": p.subject or None,
|
|
"scope": p.scope,
|
|
"exam_name": None,
|
|
"exam_round": p.exam_round,
|
|
"exam_question_number": p.exam_question_number, # 항상 명시
|
|
"explanation": p.explanation,
|
|
"source_note": p.source_note,
|
|
"is_active": True,
|
|
}
|
|
r = c.post(f"/study-topics/{TOPIC_ID}/questions", json=body)
|
|
if r.status_code != 201:
|
|
raise RuntimeError(f"POST 실패 ({r.status_code}): {r.text[:300]}")
|
|
return r.json()
|
|
|
|
|
|
def post_image(c: httpx.Client, qid: int, png_path: Path) -> dict:
|
|
with png_path.open("rb") as f:
|
|
r = c.post(
|
|
f"/study-questions/{qid}/images",
|
|
files={"file": (png_path.name, f, "image/png")},
|
|
)
|
|
if r.status_code != 201:
|
|
raise RuntimeError(f"이미지 POST 실패 qid={qid} ({r.status_code}): {r.text[:300]}")
|
|
return r.json()
|
|
|
|
|
|
# ─── 리포트 ───
|
|
|
|
|
|
def write_image_report(out_dir: Path, exam_round: str, items: list[ParsedQuestion]) -> Path:
|
|
out_dir.mkdir(parents=True, exist_ok=True)
|
|
safe_round = exam_round.replace("/", "_").replace(" ", "_")
|
|
out_path = out_dir / f"{safe_round}_image_required.md"
|
|
|
|
classified: dict[str, list[ParsedQuestion]] = {
|
|
"attach_planned": [], "missing_png": [], "placeholder_skipped": []
|
|
}
|
|
for p in items:
|
|
s = classify_image(p).state
|
|
if s in classified:
|
|
classified[s].append(p)
|
|
|
|
lines: list[str] = []
|
|
lines.append(f"# {exam_round} — 이미지 필요 문항 리포트\n")
|
|
|
|
lines.append("## 사용자가 확인할 것\n")
|
|
n_missing = len(classified["missing_png"])
|
|
n_placeholder = len(classified["placeholder_skipped"])
|
|
n_attach = len(classified["attach_planned"])
|
|
|
|
if n_missing > 0:
|
|
lines.append(f"- **missing_png — {n_missing}건 ⚠** (이 항목이 있으면 apply 금지)")
|
|
for p in classified["missing_png"]:
|
|
lines.append(f" - {p.exam_question_number}번 / {p.subject} — {p.exam_question_number}.png 필요")
|
|
else:
|
|
lines.append("- missing_png — 0건")
|
|
|
|
if n_placeholder > 0:
|
|
lines.append(
|
|
f"- **placeholder_skipped — {n_placeholder}건** "
|
|
"(마커 없는데 png 있음 — 실제 도식이면 md 첫 줄에 `**⚠ 이미지 별도 업로드 필요**` 마커 추가 검토)"
|
|
)
|
|
for p in classified["placeholder_skipped"]:
|
|
lines.append(f" - {p.exam_question_number}번 / {p.subject}")
|
|
else:
|
|
lines.append("- placeholder_skipped — 0건")
|
|
|
|
lines.append(f"- **attach_planned — {n_attach}건** (apply 시 자동 첨부 예정)")
|
|
lines.append("")
|
|
|
|
lines.append("---\n")
|
|
|
|
if n_attach > 0:
|
|
lines.append(f"## 첨부 예정 (attach_planned) — {n_attach}건\n")
|
|
for p in classified["attach_planned"]:
|
|
lines.append(f"### {p.exam_question_number}번 / {p.subject}{' / ' + p.scope if p.scope else ''}")
|
|
lines.append(f"- md: {p.file_path}")
|
|
lines.append(f"- png: {p.png_path}")
|
|
lines.append(f"- 설명: {p.image_description or '(없음)'}")
|
|
lines.append("- 상태: attach_planned (apply 시 자동 첨부)\n")
|
|
|
|
if n_missing > 0:
|
|
lines.append(f"## 누락 (missing_png) — {n_missing}건 ⚠\n")
|
|
for p in classified["missing_png"]:
|
|
lines.append(f"### {p.exam_question_number}번 / {p.subject}{' / ' + p.scope if p.scope else ''}")
|
|
lines.append(f"- md: {p.file_path}")
|
|
lines.append(f"- png: MISSING ({p.exam_question_number}.png)")
|
|
lines.append(f"- 설명: {p.image_description or '(없음)'}")
|
|
lines.append("- 상태: missing_png (dry-run 실패. apply 금지)")
|
|
lines.append("- 처리: 이미지 파일 필요\n")
|
|
|
|
if n_placeholder > 0:
|
|
lines.append(f"## OCR placeholder (참고) — {n_placeholder}건\n")
|
|
lines.append("마커는 없지만 png 가 존재 — OCR 원본일 가능성. 마커 누락 실수인지 검토.\n")
|
|
for p in classified["placeholder_skipped"]:
|
|
lines.append(f"### {p.exam_question_number}번 / {p.subject}")
|
|
lines.append(f"- md: {p.file_path}")
|
|
lines.append(f"- png: {p.png_path}")
|
|
lines.append("- 마커: 없음 — 첨부 안 함\n")
|
|
|
|
out_path.write_text("\n".join(lines), encoding="utf-8")
|
|
return out_path
|
|
|
|
|
|
# ─── 메인 ───
|
|
|
|
|
|
def discover_md_files(round_dir: Path) -> list[Path]:
|
|
"""회차 폴더 안 모든 *.md 수집. 폴더 정렬 + 파일 번호 정렬."""
|
|
out: list[Path] = []
|
|
if not round_dir.is_dir():
|
|
return out
|
|
for sub in sorted(round_dir.iterdir()):
|
|
if not sub.is_dir():
|
|
continue
|
|
for f in sub.glob("*.md"):
|
|
out.append(f)
|
|
return sorted(out, key=lambda p: (p.parent.name, int(re.match(r"^(\d+)", p.name).group(1)) if re.match(r"^\d", p.name) else 9999))
|
|
|
|
|
|
def cmd_run(round_dir: Path, apply: bool, skip_existing: bool, continue_on_error: bool) -> int:
|
|
print(f"[{'APPLY' if apply else 'DRY-RUN'}] 회차 폴더: {round_dir}\n")
|
|
md_files = discover_md_files(round_dir)
|
|
if not md_files:
|
|
print("md 파일 없음.", file=sys.stderr)
|
|
return 1
|
|
print(f"발견 md: {len(md_files)}개\n")
|
|
|
|
# 1차 — 파싱
|
|
parsed: list[ParsedQuestion] = []
|
|
parse_failed: list[ParsedQuestion] = []
|
|
for f in md_files:
|
|
p = parse_md(f)
|
|
parsed.append(p)
|
|
if p.parse_errors:
|
|
parse_failed.append(p)
|
|
|
|
# 회차명 (모든 파싱 결과의 exam_round 가 같아야 함)
|
|
rounds = {p.exam_round for p in parsed if p.exam_round}
|
|
if len(rounds) != 1:
|
|
print(f"회차명 불일치: {rounds}", file=sys.stderr)
|
|
return 1
|
|
exam_round = next(iter(rounds))
|
|
|
|
# 분포 + 이미지 분류
|
|
by_subject: dict[str, int] = {}
|
|
for p in parsed:
|
|
by_subject[p.subject] = by_subject.get(p.subject, 0) + 1
|
|
print(f"회차: {exam_round}")
|
|
print(f"과목 분포: {', '.join(f'{k} {v}' for k, v in by_subject.items())}\n")
|
|
|
|
# 이미지 4 케이스
|
|
img_attach = [p for p in parsed if classify_image(p).state == "attach_planned"]
|
|
img_missing = [p for p in parsed if classify_image(p).state == "missing_png"]
|
|
img_placeholder = [p for p in parsed if classify_image(p).state == "placeholder_skipped"]
|
|
print(f"이미지 첨부 예정: {len(img_attach)}건 / 누락: {len(img_missing)}건 / placeholder: {len(img_placeholder)}건\n")
|
|
|
|
# 리포트 항상 생성
|
|
report_dir = Path(__file__).parent.parent / "import_reports"
|
|
report_path = write_image_report(report_dir, exam_round, parsed)
|
|
print(f"리포트 저장: {report_path}\n")
|
|
|
|
# 파싱 실패 출력
|
|
if parse_failed:
|
|
print(f"⚠ 파싱 실패: {len(parse_failed)}건")
|
|
for p in parse_failed:
|
|
print(f" {p.file_path}")
|
|
for e in p.parse_errors:
|
|
print(f" - {e}")
|
|
print()
|
|
|
|
# 토큰 검사 + 충돌 검사
|
|
with _client() as c:
|
|
verify_token(c)
|
|
existing = fetch_existing_for_conflict(c, exam_round)
|
|
|
|
conflicts = [p for p in parsed if p.exam_question_number in existing]
|
|
if conflicts:
|
|
print(f"⚠ 서버 충돌: {len(conflicts)}건 (같은 회차+문항번호 이미 등록)")
|
|
for p in conflicts:
|
|
ex = existing[p.exam_question_number]
|
|
print(f" 충돌: {p.exam_round} {p.exam_question_number}번")
|
|
print(f" - 기존: id={ex['id']}, subject={ex['subject']}, scope={ex.get('scope')}")
|
|
print(f" - import: file={p.file_path}, subject={p.subject}, scope={p.scope}")
|
|
print()
|
|
|
|
# Dry-run 종료 판정
|
|
if not apply:
|
|
# dry-run 결과 — 통과/실패 판정만 출력
|
|
print("─── DRY-RUN 결과 ───")
|
|
ok = (
|
|
not parse_failed
|
|
and not img_missing
|
|
and (not conflicts or skip_existing)
|
|
)
|
|
if ok:
|
|
target = [p for p in parsed if p.exam_question_number not in existing] if skip_existing else parsed
|
|
print(f"✓ apply 가능 — 등록 예정 {len(target)}건 (이미지 첨부 {sum(1 for p in target if classify_image(p).state == 'attach_planned')}건)")
|
|
else:
|
|
print("✗ apply 불가:")
|
|
if parse_failed:
|
|
print(f" - 파싱 실패 {len(parse_failed)}건 (md 수정 필요)")
|
|
if img_missing:
|
|
print(f" - 이미지 누락 {len(img_missing)}건 (png 추가 필요)")
|
|
if conflicts and not skip_existing:
|
|
print(f" - 서버 충돌 {len(conflicts)}건 (--skip-existing 으로 우회 가능)")
|
|
return 0 if ok else 1
|
|
|
|
# ─── APPLY ───
|
|
if parse_failed and not continue_on_error:
|
|
print("✗ 파싱 실패 있음. apply 중단.", file=sys.stderr)
|
|
return 1
|
|
if img_missing and not continue_on_error:
|
|
print("✗ 이미지 누락 있음. apply 중단.", file=sys.stderr)
|
|
return 1
|
|
if conflicts and not skip_existing:
|
|
print("✗ 서버 충돌 있음. --skip-existing 옵션 필요.", file=sys.stderr)
|
|
return 1
|
|
|
|
target = [
|
|
p for p in parsed
|
|
if p.exam_question_number not in existing
|
|
and not p.parse_errors
|
|
and classify_image(p).state != "missing_png"
|
|
]
|
|
print(f"\n등록 시작: {len(target)}건\n")
|
|
|
|
registered = 0
|
|
failed_q: list[tuple[ParsedQuestion, str]] = []
|
|
img_success = 0
|
|
img_fail: list[tuple[int, str]] = []
|
|
|
|
with _client() as c:
|
|
for p in target:
|
|
try:
|
|
resp = post_question(c, p)
|
|
qid = resp["id"]
|
|
registered += 1
|
|
print(f" ✓ {p.exam_question_number}번 등록 (qid={qid})")
|
|
|
|
if classify_image(p).state == "attach_planned" and p.png_path is not None:
|
|
try:
|
|
post_image(c, qid, p.png_path)
|
|
img_success += 1
|
|
print(f" + 이미지 첨부 OK ({p.png_path.name})")
|
|
except Exception as ie:
|
|
img_fail.append((qid, str(ie)))
|
|
print(f" ⚠ 이미지 첨부 실패 qid={qid}: {ie}", file=sys.stderr)
|
|
if not continue_on_error:
|
|
print(
|
|
f"\n abort. qid={qid} 까지 등록됨. "
|
|
f"실패한 png={p.png_path}. "
|
|
f"문제 수정 후 --skip-existing 으로 재실행 가능.",
|
|
file=sys.stderr,
|
|
)
|
|
return 2
|
|
except Exception as e:
|
|
failed_q.append((p, str(e)))
|
|
print(f" ✗ {p.exam_question_number}번 실패: {e}", file=sys.stderr)
|
|
if not continue_on_error:
|
|
print(
|
|
f"\n abort. 이전 {registered}건 등록됨. "
|
|
f"실패: {p.exam_round} {p.exam_question_number}번 ({p.file_path}). "
|
|
f"수정 후 --skip-existing 으로 재실행 가능.",
|
|
file=sys.stderr,
|
|
)
|
|
return 2
|
|
|
|
print(f"\n─── APPLY 결과 ───")
|
|
print(f"등록: {registered} / 실패: {len(failed_q)}")
|
|
print(f"이미지 첨부: 성공 {img_success} / 실패 {len(img_fail)} / placeholder skip {len(img_placeholder)}")
|
|
print("임베딩 트리거 자동. 1분 후 ready 예상.")
|
|
return 0
|
|
|
|
|
|
def main() -> None:
|
|
p = argparse.ArgumentParser()
|
|
p.add_argument("round_dir", help="회차 폴더 (예: ~/Desktop/가스기사/2019년 2회)")
|
|
p.add_argument("--apply", action="store_true", help="실제 import (기본은 dry-run)")
|
|
p.add_argument("--skip-existing", action="store_true", help="서버 충돌 문항 skip")
|
|
p.add_argument("--continue-on-error", action="store_true", help="한 문항 실패해도 다음 진행")
|
|
args = p.parse_args()
|
|
|
|
round_dir = Path(args.round_dir).expanduser().resolve()
|
|
sys.exit(cmd_run(round_dir, apply=args.apply, skip_existing=args.skip_existing, continue_on_error=args.continue_on_error))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|