"""Build the daily report prompt from System1 API data and QC data, then generate the report text."""

import asyncio
import logging
from typing import Any, Optional

import httpx

from services.ollama_client import ollama_client
from services.db_client import get_daily_qc_stats, get_issues_for_date
from services.utils import load_prompt
from config import settings

_logger = logging.getLogger(__name__)

REPORT_PROMPT_PATH = "prompts/daily_report.txt"

# Placeholder inserted into the prompt when a data source returned nothing.
_NO_DATA = "데이터 없음"


async def _fetch_one(
    client: httpx.AsyncClient, url: str, params: dict, headers: dict
) -> Optional[Any]:
    """GET ``url`` and return its decoded JSON body, or ``None`` on any failure.

    This is a best-effort fetch: a non-200 status, a transport error, or an
    undecodable body all yield ``None`` so that one unavailable endpoint does
    not abort the caller. Failures are logged instead of being dropped
    silently.

    Args:
        client: Open HTTP client used to send the request.
        url: Absolute URL to request.
        params: Query-string parameters.
        headers: Request headers. They are never logged.

    Returns:
        The value of ``response.json()`` for a 200 response, otherwise ``None``.
    """
    try:
        response = await client.get(url, params=params, headers=headers)
        if response.status_code == 200:
            return response.json()
        _logger.warning("GET %s returned HTTP %s", url, response.status_code)
    except Exception:
        # Deliberately broad: the caller treats every failure as "no data".
        _logger.warning("GET %s failed", url, exc_info=True)
    return None


async def _fetch_system1_data(date_str: str, token: str) -> dict:
    """Fetch attendance, work-report and patrol data concurrently.

    The three endpoints under ``settings.SYSTEM1_API_URL`` are requested with
    ``date_str`` as the ``date`` query parameter and ``token`` as a Bearer
    token.

    Args:
        date_str: Date value forwarded unchanged as the ``date`` parameter.
        token: Token placed in the ``Authorization: Bearer`` header.

    Returns:
        A dict with keys ``"attendance"``, ``"work_reports"`` and ``"patrol"``.
        Each value is the decoded JSON of its endpoint, or ``None`` when that
        fetch failed.
    """
    headers = {"Authorization": f"Bearer {token}"}
    params = {"date": date_str}
    base = settings.SYSTEM1_API_URL
    attendance = work_reports = patrol = None
    try:
        async with httpx.AsyncClient(timeout=15.0) as client:
            attendance, work_reports, patrol = await asyncio.gather(
                _fetch_one(client, f"{base}/api/attendance/daily-status", params, headers),
                _fetch_one(client, f"{base}/api/daily-work-reports/summary", params, headers),
                _fetch_one(client, f"{base}/api/patrol/today-status", params, headers),
            )
    except Exception:
        # _fetch_one already absorbs per-request errors, so this only covers
        # failures around the client itself. Fall back to "no data" for all.
        _logger.warning("Fetching System1 data failed", exc_info=True)
        attendance = work_reports = patrol = None
    return {"attendance": attendance, "work_reports": work_reports, "patrol": patrol}


def _format_attendance(data) -> str:
    """Render attendance data as prompt text.

    Falsy data becomes the "no data" placeholder, a dict becomes one
    ``key: value`` line per entry, and anything else is passed through
    ``str()``.
    """
    if not data:
        return _NO_DATA
    if isinstance(data, dict):
        return "\n".join(f" {k}: {v}" for k, v in data.items())
    return str(data)


def _format_work_reports(data) -> str:
    """Render work-report data as prompt text (``str(data)`` or the placeholder)."""
    return str(data) if data else _NO_DATA


def _format_qc_issues(issues: list[dict], stats: dict) -> str:
    """Render QC statistics and up to ten issues as prompt text.

    Args:
        issues: Issue dicts. The keys ``category``, ``description`` and
            ``review_status`` are read, and missing keys render as empty
            strings. Only the first ten issues are listed, with each
            description truncated to 50 characters.
        stats: Counters read by the keys ``total``, ``new_today``,
            ``in_progress``, ``completed`` and ``pending``. Missing keys
            render as 0, and ``None`` is treated as an empty dict.

    Returns:
        The summary lines joined with newlines.
    """
    stats = stats or {}  # tolerate a missing stats object instead of raising
    lines = [
        f"전체: {stats.get('total', 0)}건",
        f"금일 신규: {stats.get('new_today', 0)}건",
        f"진행중: {stats.get('in_progress', 0)}건",
        f"완료: {stats.get('completed', 0)}건",
        f"미검토: {stats.get('pending', 0)}건",
    ]
    if issues:
        lines.append("\n금일 신규 이슈:")
        for issue in issues[:10]:
            cat = issue.get("category", "")
            desc = (issue.get("description") or "")[:50]
            status = issue.get("review_status", "")
            lines.append(f" - [{cat}] {desc} (상태: {status})")
    return "\n".join(lines)


def _format_patrol(data) -> str:
    """Render patrol data as prompt text (``str(data)`` or the placeholder)."""
    return str(data) if data else _NO_DATA


async def generate_daily_report(
    date_str: str, project_id: Optional[int] = None, token: str = ""
) -> dict:
    """Generate the daily report for ``date_str``.

    Collects System1 data over HTTP and QC data from the DB helpers, fills
    the prompt template at ``REPORT_PROMPT_PATH``, and passes the prompt to
    ``ollama_client.generate_text``.

    Args:
        date_str: Date value forwarded to the System1 API and the DB helpers.
        project_id: Accepted for interface compatibility; not used by this
            function.
        token: Bearer token for the System1 API.

    Returns:
        A dict with ``"date"``, ``"report"`` (the generated text) and
        ``"stats"``. ``"stats"`` holds ``"qc"`` (the value returned by
        ``get_daily_qc_stats``) and ``"new_issues_count"``.
    """
    system1_data = await _fetch_system1_data(date_str, token)

    # NOTE(review): these helpers are called synchronously. If they perform
    # blocking I/O they will stall the event loop - confirm against db_client.
    qc_stats = get_daily_qc_stats(date_str)
    qc_issues = get_issues_for_date(date_str)

    template = load_prompt(REPORT_PROMPT_PATH)
    prompt = template.format(
        date=date_str,
        attendance_data=_format_attendance(system1_data["attendance"]),
        work_report_data=_format_work_reports(system1_data["work_reports"]),
        qc_issue_data=_format_qc_issues(qc_issues, qc_stats),
        patrol_data=_format_patrol(system1_data["patrol"]),
    )

    report_text = await ollama_client.generate_text(prompt)

    return {
        "date": date_str,
        "report": report_text,
        "stats": {
            "qc": qc_stats,
            # `or []` keeps the count at 0 if the DB helper returned None.
            "new_issues_count": len(qc_issues or []),
        },
    }