Proto-AGI / report_enhancement.py
seawolf2357's picture
Upload report_enhancement.py
bbd74a7 verified
"""
AETHER Proto-AGI v2.2 - Report Enhancement Module
์ถœ์ฒ˜ ๊ด€๋ฆฌ, ์œ ํ˜•๋ณ„ ํฌ๋งทํ„ฐ, ์ „๋ฌธ ๋ณด๊ณ ์„œ ์ƒ์„ฑ๊ธฐ
์ด ํŒŒ์ผ์˜ ํด๋ž˜์Šค๋“ค์„ core.py์— ํ†ตํ•ฉํ•˜์„ธ์š”.
๊ธฐ์กด ReportGenerator ํด๋ž˜์Šค๋ฅผ ์•„๋ž˜ ProfessionalReportGenerator๋กœ ๊ต์ฒดํ•˜์„ธ์š”.
"""
import json
import re
from datetime import datetime
from dataclasses import dataclass, field
from typing import Optional, List, Dict, Any, Tuple
from enum import Enum
# ==================== SourceManager (P0-2) ====================
@dataclass
class WebSource:
    """One web-search result recorded as a source."""

    title: str
    url: str
    snippet: str
    # Five-element stage in which this source was used.
    element: str
    # ISO-8601 text of the moment the record was created.
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())
    relevance_score: float = 0.0
@dataclass
class KnowledgeSource:
    """One knowledge-DB entry recorded as a source."""

    knowledge_id: str
    goal: str
    quality_score: float
    # Five-element stage in which this entry was used.
    element: str
    created_at: str
    snippet: str
@dataclass
class CrawlSource:
    """One crawled URL recorded as a source."""

    url: str
    title: str
    content_preview: str
    crawled_at: str
    # Whether the crawl succeeded.
    success: bool
class SourceManager:
    """Central registry for the sources and citations used by one analysis.

    Web search hits, knowledge-DB entries and crawled pages are registered
    through the ``add_*`` methods.  Each distinct URL / knowledge id receives
    one citation number from a counter shared by all three kinds.  The
    references section and the inline markers are both rendered from those
    numbers, so a marker in the report body always points at the matching row.
    """

    # Row limits applied per source kind when rendering the references section.
    _MAX_WEB_ROWS = 10
    _MAX_KNOWLEDGE_ROWS = 5
    _MAX_CRAWL_ROWS = 5

    # Glyphs used to draw the five-step quality bar.
    _BAR_FILLED = 'โ–ˆ'
    _BAR_EMPTY = 'โ–‘'

    # Stage names appended to the element symbol in the KR web-source table.
    _ELEMENT_NAMES_KR = {"ๅœŸ": "๊ฐ๋…", "้‡‘": "๋น„ํ‰", "ๆฐด": "๋ฆฌ์„œ์น˜", "ๆœจ": "์ฐฝ๋ฐœ", "็ซ": "์‹คํ–‰"}

    # Language-specific text of the references section.
    _REFERENCE_LABELS = {
        "EN": {
            "heading": "## ๐Ÿ“š Sources & References",
            "empty": "*No external sources used in this analysis.*",
            "summary": "**Total Sources**: {total} (Web: {web}, Knowledge DB: {knowledge}, Crawl: {crawl})",
            "web_heading": "### ๐ŸŒ Web Sources",
            "web_header": "| # | Title | Source | Element |",
            "web_separator": "|---|-------|--------|---------|",
            "web_more": "| ... | *+{count} more sources* | | |",
            "knowledge_heading": "### ๐Ÿง  Knowledge Database",
            "knowledge_header": "| # | Related Goal | Quality | Element |",
            "knowledge_separator": "|---|--------------|---------|---------|",
            "crawl_heading": "### ๐Ÿ”— Crawled Pages",
        },
        "KR": {
            "heading": "## ๐Ÿ“š ์ถœ์ฒ˜ ๋ฐ ์ฐธ๊ณ ์ž๋ฃŒ",
            "empty": "*์ด ๋ถ„์„์— ์‚ฌ์šฉ๋œ ์™ธ๋ถ€ ์ถœ์ฒ˜๊ฐ€ ์—†์Šต๋‹ˆ๋‹ค.*",
            "summary": "**์ด ์ถœ์ฒ˜**: {total}๊ฑด (์›น: {web}, ์ง€์‹DB: {knowledge}, ํฌ๋กค๋ง: {crawl})",
            "web_heading": "### ๐ŸŒ ์›น ๊ฒ€์ƒ‰ ์ถœ์ฒ˜",
            "web_header": "| # | ์ œ๋ชฉ | ์ถœ์ฒ˜ | ๋‹จ๊ณ„ |",
            "web_separator": "|---|------|------|------|",
            "web_more": "| ... | *+{count}๊ฑด ์ถ”๊ฐ€* | | |",
            "knowledge_heading": "### ๐Ÿง  ์ง€์‹ ๋ฐ์ดํ„ฐ๋ฒ ์ด์Šค",
            "knowledge_header": "| # | ๊ด€๋ จ ๋ชฉํ‘œ | ํ’ˆ์งˆ | ๋‹จ๊ณ„ |",
            "knowledge_separator": "|---|----------|------|------|",
            "crawl_heading": "### ๐Ÿ”— ํฌ๋กค๋ง ํŽ˜์ด์ง€",
        },
    }

    def __init__(self):
        self.reset()

    def reset(self):
        """Forget every recorded source and restart citation numbering.

        Intended to be called when a new analysis session begins.
        """
        self.web_sources: List[WebSource] = []
        self.knowledge_sources: List[KnowledgeSource] = []
        self.crawl_sources: List[CrawlSource] = []
        self._citation_counter = 0
        # URL or knowledge id -> citation number.
        self._citation_map: Dict[str, int] = {}

    # ===== Registering sources =====
    def _next_citation(self, key: str) -> int:
        """Allocate the next citation number and bind it to *key*."""
        self._citation_counter += 1
        self._citation_map[key] = self._citation_counter
        return self._citation_counter

    def add_web_source(self, title: str, url: str, snippet: str,
                       element: str, relevance_score: float = 0.0) -> int:
        """Register a web search result and return its citation number.

        A URL that is already registered keeps its existing number and is
        not stored a second time.  The title is cut to 100 characters
        ("Unknown" when empty) and the snippet to 300.
        """
        known = self._citation_map.get(url)
        if known is not None:
            return known
        citation_num = self._next_citation(url)
        self.web_sources.append(WebSource(
            title=title[:100] if title else "Unknown",
            url=url,
            snippet=snippet[:300] if snippet else "",
            element=element,
            relevance_score=relevance_score,
        ))
        return citation_num

    def add_knowledge_source(self, knowledge_id: str, goal: str,
                             quality_score: float, element: str,
                             created_at: str, snippet: str) -> int:
        """Register a knowledge-DB entry and return its citation number.

        An id that is already registered keeps its existing number.  The goal
        is cut to 100 characters and the snippet to 200.
        """
        known = self._citation_map.get(knowledge_id)
        if known is not None:
            return known
        citation_num = self._next_citation(knowledge_id)
        self.knowledge_sources.append(KnowledgeSource(
            knowledge_id=knowledge_id,
            goal=goal[:100] if goal else "",
            quality_score=quality_score,
            element=element,
            created_at=created_at,
            snippet=snippet[:200] if snippet else "",
        ))
        return citation_num

    def add_crawl_source(self, url: str, title: str,
                         content_preview: str, crawled_at: str,
                         success: bool = True) -> int:
        """Register a crawled page and return its citation number.

        A URL that is already registered (as a web or crawl source) keeps its
        existing number.  A missing title falls back to the first 50
        characters of the URL; the preview is cut to 200 characters.
        """
        known = self._citation_map.get(url)
        if known is not None:
            return known
        citation_num = self._next_citation(url)
        self.crawl_sources.append(CrawlSource(
            url=url,
            title=title[:100] if title else url[:50],
            content_preview=content_preview[:200] if content_preview else "",
            crawled_at=crawled_at,
            success=success,
        ))
        return citation_num

    def add_from_search_result(self, search_result: Dict, element: str):
        """Register every entry of a search response as a web source.

        Nothing is added when the response is empty or its ``"success"`` flag
        is falsy.  Each item of ``"results"`` contributes its ``title``,
        ``url`` and ``description``.
        """
        if not search_result or not search_result.get("success"):
            return
        for result in search_result.get("results", []):
            self.add_web_source(
                title=result.get("title", ""),
                url=result.get("url", ""),
                snippet=result.get("description", ""),
                element=element,
            )

    # ===== Lookups =====
    def get_citation_number(self, url_or_id: str) -> Optional[int]:
        """Return the citation number for a URL or knowledge id, or ``None``."""
        return self._citation_map.get(url_or_id)

    def get_total_sources(self) -> Dict[str, int]:
        """Return the number of sources per kind plus their overall total."""
        web = len(self.web_sources)
        knowledge = len(self.knowledge_sources)
        crawl = len(self.crawl_sources)
        return {
            "web": web,
            "knowledge": knowledge,
            "crawl": crawl,
            "total": web + knowledge + crawl,
        }

    def get_sources_by_element(self) -> Dict[str, List]:
        """Group web and knowledge sources by the element stage that used them.

        Sources whose element is not one of the five known keys are left out;
        crawl sources carry no element and are never included.
        """
        by_element = {"ๅœŸ": [], "้‡‘": [], "ๆฐด": [], "ๆœจ": [], "็ซ": []}
        for src in self.web_sources:
            if src.element in by_element:
                by_element[src.element].append({
                    "type": "web",
                    "title": src.title,
                    "url": src.url,
                })
        for src in self.knowledge_sources:
            if src.element in by_element:
                by_element[src.element].append({
                    "type": "knowledge",
                    "id": src.knowledge_id,
                    "quality": src.quality_score,
                })
        return by_element

    # ===== Output formatting =====
    def format_references_section(self, language: str = "EN") -> str:
        """Render the references section of the final report as Markdown.

        ``"KR"`` selects the Korean layout; any other value renders English.
        """
        if language == "KR":
            return self._format_references_kr()
        return self._format_references_en()

    def _format_references_en(self) -> str:
        """English references section."""
        return self._render_references("EN")

    def _format_references_kr(self) -> str:
        """Korean references section."""
        return self._render_references("KR")

    def _render_references(self, language: str) -> str:
        """Build the references section using the label set of *language*."""
        labels = self._REFERENCE_LABELS[language]
        lines = [labels["heading"], ""]
        total = self.get_total_sources()
        if total["total"] == 0:
            lines.append(labels["empty"])
            return "\n".join(lines)

        # Summary statistics.
        lines.append(labels["summary"].format(**total))
        lines.append("")

        if self.web_sources:
            lines.append(labels["web_heading"])
            lines.append(labels["web_header"])
            lines.append(labels["web_separator"])
            for position, src in enumerate(self.web_sources[:self._MAX_WEB_ROWS], 1):
                # Use the number handed out at registration so the row agrees
                # with format_inline_citation(); fall back to the position
                # for sources that were appended without going through add_*.
                number = self._citation_map.get(src.url, position)
                title = self._table_cell(src.title, 40)
                domain = self._extract_domain(src.url)
                element = src.element
                if language == "KR":
                    stage = self._ELEMENT_NAMES_KR.get(src.element, src.element)
                    element = f"{src.element}({stage})"
                lines.append(f"| [{number}] | {title} | [{domain}]({src.url}) | {element} |")
            hidden = len(self.web_sources) - self._MAX_WEB_ROWS
            if hidden > 0:
                lines.append(labels["web_more"].format(count=hidden))
            lines.append("")

        if self.knowledge_sources:
            lines.append(labels["knowledge_heading"])
            lines.append(labels["knowledge_header"])
            lines.append(labels["knowledge_separator"])
            for position, src in enumerate(self.knowledge_sources[:self._MAX_KNOWLEDGE_ROWS], 1):
                number = self._citation_map.get(src.knowledge_id, position)
                goal = self._table_cell(src.goal, 35)
                bar = self._quality_bar(src.quality_score)
                lines.append(f"| [K{number}] | {goal} | {bar} {src.quality_score:.0%} | {src.element} |")
            lines.append("")

        if self.crawl_sources:
            lines.append(labels["crawl_heading"])
            for src in self.crawl_sources[:self._MAX_CRAWL_ROWS]:
                status = "โœ…" if src.success else "โš ๏ธ"
                lines.append(f"- {status} [{src.title[:50]}]({src.url})")
            lines.append("")

        return "\n".join(lines)

    @staticmethod
    def _table_cell(text: str, limit: int) -> str:
        """Cut *text* to *limit* characters (adding "...") and escape it for a table cell."""
        clipped = text[:limit] + ("..." if len(text) > limit else "")
        # A raw "|" or a line break would end the cell early and corrupt the row.
        return clipped.replace("|", "\\|").replace("\r", " ").replace("\n", " ")

    @classmethod
    def _quality_bar(cls, score: float, width: int = 5) -> str:
        """Draw a *width*-step bar for *score*, clamping scores outside 0..1."""
        filled = max(0, min(width, int(score * width)))
        return cls._BAR_FILLED * filled + cls._BAR_EMPTY * (width - filled)

    def format_inline_citation(self, url_or_id: str) -> str:
        """Return the inline marker for a registered source.

        Knowledge-DB entries render as ``[K<n>]``, everything else as
        ``[<n>]``.  An unregistered key yields an empty string.
        """
        num = self._citation_map.get(url_or_id)
        if num is None:
            return ""
        if any(src.knowledge_id == url_or_id for src in self.knowledge_sources):
            return f"[K{num}]"
        return f"[{num}]"

    def _extract_domain(self, url: str) -> str:
        """Return the host part of *url* without a leading ``www.``, max 30 characters.

        An empty URL yields an empty string.  When the URL cannot be parsed,
        its first 30 characters are returned instead.
        """
        if not url:
            return ""
        from urllib.parse import urlparse
        try:
            domain = urlparse(url).netloc
        except ValueError:
            # urlparse rejects malformed authorities such as an unclosed "[".
            return url[:30]
        if domain.startswith("www."):
            domain = domain[4:]
        return domain[:30]

    def to_dict(self) -> Dict:
        """Return a plain-dict snapshot of all sources, suitable for serialisation."""
        return {
            "web_sources": [
                {"title": s.title, "url": s.url, "element": s.element, "snippet": s.snippet}
                for s in self.web_sources
            ],
            "knowledge_sources": [
                {"id": s.knowledge_id, "goal": s.goal, "quality": s.quality_score, "element": s.element}
                for s in self.knowledge_sources
            ],
            "crawl_sources": [
                {"url": s.url, "title": s.title, "success": s.success}
                for s in self.crawl_sources
            ],
            "total": self.get_total_sources(),
        }
# ==================== TypedOutputFormatter (P1-1) ====================
class TypedOutputFormatter:
"""
์งˆ๋ฌธ ์œ ํ˜•๋ณ„ ๋งž์ถค ์ถœ๋ ฅ ํฌ๋งทํ„ฐ
์˜ˆ์ธก, ๋น„๊ต, ์ „๋žต, ๋ถ„์„, ๋ฐœ๋ช…, ์Šคํ† ๋ฆฌ, ๋ ˆ์‹œํ”ผ ๋“ฑ
๊ฐ ์œ ํ˜•์— ์ตœ์ ํ™”๋œ ์ถœ๋ ฅ ๊ตฌ์กฐ๋ฅผ ์ œ๊ณตํ•ฉ๋‹ˆ๋‹ค.
"""
# ์งˆ๋ฌธ ์œ ํ˜•๋ณ„ ์•„์ด์ฝ˜
TYPE_ICONS = {
"prediction": "๐Ÿ”ฎ",
"comparison": "โš–๏ธ",
"strategy": "๐ŸŽฏ",
"analysis": "๐Ÿ”ฌ",
"invention": "๐Ÿ’ก",
"story": "๐Ÿ“–",
"recipe": "๐Ÿณ",
"general": "๐Ÿ“‹"
}
# ํ•œ๊ตญ์–ด ์œ ํ˜•๋ช…
TYPE_NAMES_KR = {
"prediction": "์˜ˆ์ธก ๋ถ„์„",
"comparison": "๋น„๊ต ๋ถ„์„",
"strategy": "์ „๋žต ์ˆ˜๋ฆฝ",
"analysis": "์‹ฌ์ธต ๋ถ„์„",
"invention": "๋ฐœ๋ช…/ํŠนํ—ˆ",
"story": "์Šคํ† ๋ฆฌ/์ฐฝ์ž‘",
"recipe": "๋ ˆ์‹œํ”ผ/์š”๋ฆฌ",
"general": "์ผ๋ฐ˜ ๋ถ„์„"
}
TYPE_NAMES_EN = {
"prediction": "Prediction Analysis",
"comparison": "Comparative Analysis",
"strategy": "Strategic Planning",
"analysis": "In-depth Analysis",
"invention": "Invention/Patent",
"story": "Story/Creative",
"recipe": "Recipe/Cooking",
"general": "General Analysis"
}
@classmethod
def format(cls, question_type: str, data: Dict, language: str = "EN") -> str:
"""์งˆ๋ฌธ ์œ ํ˜•์— ๋งž๋Š” ํฌ๋งท ์ ์šฉ"""
formatters = {
"prediction": cls.format_prediction,
"comparison": cls.format_comparison,
"strategy": cls.format_strategy,
"analysis": cls.format_analysis,
"invention": cls.format_invention,
"story": cls.format_story,
"recipe": cls.format_recipe,
"general": cls.format_general
}
formatter = formatters.get(question_type, cls.format_general)
return formatter(data, language)
@classmethod
def get_type_header(cls, question_type: str, language: str = "EN") -> str:
"""์งˆ๋ฌธ ์œ ํ˜• ํ—ค๋” ์ƒ์„ฑ"""
icon = cls.TYPE_ICONS.get(question_type, "๐Ÿ“‹")
if language == "KR":
name = cls.TYPE_NAMES_KR.get(question_type, "์ผ๋ฐ˜ ๋ถ„์„")
return f"{icon} **๋ถ„์„ ์œ ํ˜•**: {name}"
else:
name = cls.TYPE_NAMES_EN.get(question_type, "General Analysis")
return f"{icon} **Analysis Type**: {name}"
# ===== ์˜ˆ์ธก ๋ถ„์„ ํฌ๋งท =====
@classmethod
def format_prediction(cls, data: Dict, language: str = "EN") -> str:
"""์˜ˆ์ธก ๋ถ„์„์šฉ ํฌ๋งท - ์‹œ๋‚˜๋ฆฌ์˜ค, ํ™•๋ฅ , ๋ณ€์ˆ˜"""
if language == "KR":
return cls._format_prediction_kr(data)
return cls._format_prediction_en(data)
@classmethod
def _format_prediction_en(cls, data: Dict) -> str:
lines = ["### ๐Ÿ”ฎ Prediction Framework", ""]
# ์‹œ๋‚˜๋ฆฌ์˜ค ํ…Œ์ด๋ธ”
scenarios = data.get("scenarios", [])
if scenarios:
lines.append("#### Scenario Analysis")
lines.append("| Scenario | Probability | Key Drivers | Timeline |")
lines.append("|----------|-------------|-------------|----------|")
for i, s in enumerate(scenarios[:4], 1):
prob = s.get("probability", "?")
drivers = s.get("drivers", "-")[:30]
timeline = s.get("timeline", "-")
lines.append(f"| **{s.get('name', f'Scenario {i}')}** | {prob} | {drivers} | {timeline} |")
lines.append("")
# ํ•ต์‹ฌ ๋ณ€์ˆ˜
variables = data.get("key_variables", [])
if variables:
lines.append("#### Key Variables & Uncertainties")
for v in variables[:5]:
impact = v.get("impact", "medium")
icon = "๐Ÿ”ด" if impact == "high" else "๐ŸŸก" if impact == "medium" else "๐ŸŸข"
lines.append(f"- {icon} **{v.get('name', 'Variable')}**: {v.get('description', '')[:60]}")
lines.append("")
# ์˜ˆ์ธก ์‹ ๋ขฐ๋„
confidence = data.get("prediction_confidence", 0)
if confidence:
bar = 'โ–ˆ' * int(confidence * 10) + 'โ–‘' * (10 - int(confidence * 10))
lines.append(f"**Prediction Confidence**: {bar} {confidence:.0%}")
lines.append("")
return "\n".join(lines)
@classmethod
def _format_prediction_kr(cls, data: Dict) -> str:
lines = ["### ๐Ÿ”ฎ ์˜ˆ์ธก ํ”„๋ ˆ์ž„์›Œํฌ", ""]
# ์‹œ๋‚˜๋ฆฌ์˜ค ํ…Œ์ด๋ธ”
scenarios = data.get("scenarios", [])
if scenarios:
lines.append("#### ์‹œ๋‚˜๋ฆฌ์˜ค ๋ถ„์„")
lines.append("| ์‹œ๋‚˜๋ฆฌ์˜ค | ํ™•๋ฅ  | ํ•ต์‹ฌ ๋™์ธ | ์‹œ์  |")
lines.append("|----------|------|----------|------|")
for i, s in enumerate(scenarios[:4], 1):
prob = s.get("probability", "?")
drivers = s.get("drivers", "-")[:30]
timeline = s.get("timeline", "-")
lines.append(f"| **{s.get('name', f'์‹œ๋‚˜๋ฆฌ์˜ค {i}')}** | {prob} | {drivers} | {timeline} |")
lines.append("")
# ํ•ต์‹ฌ ๋ณ€์ˆ˜
variables = data.get("key_variables", [])
if variables:
lines.append("#### ํ•ต์‹ฌ ๋ณ€์ˆ˜ ๋ฐ ๋ถˆํ™•์‹ค์„ฑ")
for v in variables[:5]:
impact = v.get("impact", "medium")
icon = "๐Ÿ”ด" if impact == "high" else "๐ŸŸก" if impact == "medium" else "๐ŸŸข"
lines.append(f"- {icon} **{v.get('name', '๋ณ€์ˆ˜')}**: {v.get('description', '')[:60]}")
lines.append("")
# ์˜ˆ์ธก ์‹ ๋ขฐ๋„
confidence = data.get("prediction_confidence", 0)
if confidence:
bar = 'โ–ˆ' * int(confidence * 10) + 'โ–‘' * (10 - int(confidence * 10))
lines.append(f"**์˜ˆ์ธก ์‹ ๋ขฐ๋„**: {bar} {confidence:.0%}")
lines.append("")
return "\n".join(lines)
# ===== ๋น„๊ต ๋ถ„์„ ํฌ๋งท =====
@classmethod
def format_comparison(cls, data: Dict, language: str = "EN") -> str:
"""๋น„๊ต ๋ถ„์„์šฉ ํฌ๋งท - ๋งคํŠธ๋ฆญ์Šค, ์žฅ๋‹จ์ , ๊ถŒ์žฅ"""
if language == "KR":
return cls._format_comparison_kr(data)
return cls._format_comparison_en(data)
@classmethod
def _format_comparison_en(cls, data: Dict) -> str:
lines = ["### โš–๏ธ Comparison Matrix", ""]
# ๋น„๊ต ๋Œ€์ƒ
items = data.get("comparison_items", [])
criteria = data.get("criteria", [])
if items and criteria:
# ํ—ค๋” ์ƒ์„ฑ
header = "| Criteria |"
separator = "|----------|"
for item in items[:4]:
header += f" {item.get('name', 'Option')[:15]} |"
separator += "--------|"
lines.append(header)
lines.append(separator)
# ๊ฐ ๊ธฐ์ค€๋ณ„ ๋น„๊ต
for crit in criteria[:6]:
row = f"| **{crit.get('name', 'Criterion')[:20]}** |"
for item in items[:4]:
score = item.get("scores", {}).get(crit.get("name", ""), "-")
if isinstance(score, (int, float)):
score = f"{'โญ' * int(score)}" if score <= 5 else str(score)
row += f" {str(score)[:10]} |"
lines.append(row)
lines.append("")
# ์žฅ๋‹จ์ 
pros_cons = data.get("pros_cons", {})
if pros_cons:
lines.append("#### Pros & Cons Summary")
for item_name, pc in pros_cons.items():
lines.append(f"**{item_name}**")
if pc.get("pros"):
lines.append(f" - โœ… Pros: {', '.join(pc['pros'][:3])}")
if pc.get("cons"):
lines.append(f" - โŒ Cons: {', '.join(pc['cons'][:3])}")
lines.append("")
# ๊ถŒ์žฅ์‚ฌํ•ญ
recommendation = data.get("recommendation", "")
if recommendation:
lines.append(f"#### ๐Ÿ’ก Recommendation")
lines.append(f"> {recommendation}")
lines.append("")
return "\n".join(lines)
@classmethod
def _format_comparison_kr(cls, data: Dict) -> str:
lines = ["### โš–๏ธ ๋น„๊ต ๋งคํŠธ๋ฆญ์Šค", ""]
# ๋น„๊ต ๋Œ€์ƒ
items = data.get("comparison_items", [])
criteria = data.get("criteria", [])
if items and criteria:
header = "| ๊ธฐ์ค€ |"
separator = "|------|"
for item in items[:4]:
header += f" {item.get('name', '์˜ต์…˜')[:15]} |"
separator += "--------|"
lines.append(header)
lines.append(separator)
for crit in criteria[:6]:
row = f"| **{crit.get('name', '๊ธฐ์ค€')[:20]}** |"
for item in items[:4]:
score = item.get("scores", {}).get(crit.get("name", ""), "-")
if isinstance(score, (int, float)):
score = f"{'โญ' * int(score)}" if score <= 5 else str(score)
row += f" {str(score)[:10]} |"
lines.append(row)
lines.append("")
# ์žฅ๋‹จ์ 
pros_cons = data.get("pros_cons", {})
if pros_cons:
lines.append("#### ์žฅ๋‹จ์  ์š”์•ฝ")
for item_name, pc in pros_cons.items():
lines.append(f"**{item_name}**")
if pc.get("pros"):
lines.append(f" - โœ… ์žฅ์ : {', '.join(pc['pros'][:3])}")
if pc.get("cons"):
lines.append(f" - โŒ ๋‹จ์ : {', '.join(pc['cons'][:3])}")
lines.append("")
# ๊ถŒ์žฅ์‚ฌํ•ญ
recommendation = data.get("recommendation", "")
if recommendation:
lines.append(f"#### ๐Ÿ’ก ๊ถŒ์žฅ์‚ฌํ•ญ")
lines.append(f"> {recommendation}")
lines.append("")
return "\n".join(lines)
# ===== ์ „๋žต ๋ถ„์„ ํฌ๋งท =====
@classmethod
def format_strategy(cls, data: Dict, language: str = "EN") -> str:
"""์ „๋žต ๋ถ„์„์šฉ ํฌ๋งท - ๋ชฉํ‘œ, ๋กœ๋“œ๋งต, KPI"""
if language == "KR":
return cls._format_strategy_kr(data)
return cls._format_strategy_en(data)
@classmethod
def _format_strategy_en(cls, data: Dict) -> str:
lines = ["### ๐ŸŽฏ Strategic Framework", ""]
# ์ „๋žต ๋ชฉํ‘œ
objectives = data.get("objectives", [])
if objectives:
lines.append("#### Strategic Objectives")
for i, obj in enumerate(objectives[:5], 1):
lines.append(f"{i}. **{obj.get('name', 'Objective')}**: {obj.get('description', '')[:60]}")
lines.append("")
# ์‹คํ–‰ ๋กœ๋“œ๋งต
roadmap = data.get("roadmap", [])
if roadmap:
lines.append("#### Implementation Roadmap")
lines.append("| Phase | Timeline | Key Actions | Deliverables |")
lines.append("|-------|----------|-------------|--------------|")
for phase in roadmap[:5]:
actions = phase.get("actions", ["-"])
deliverables = phase.get("deliverables", ["-"])
lines.append(f"| {phase.get('name', 'Phase')} | {phase.get('timeline', '-')} | {', '.join(actions[:2])} | {', '.join(deliverables[:2])} |")
lines.append("")
# KPI
kpis = data.get("kpis", [])
if kpis:
lines.append("#### Key Performance Indicators")
for kpi in kpis[:5]:
target = kpi.get("target", "TBD")
lines.append(f"- ๐Ÿ“Š **{kpi.get('name', 'KPI')}**: Target {target}")
lines.append("")
return "\n".join(lines)
@classmethod
def _format_strategy_kr(cls, data: Dict) -> str:
lines = ["### ๐ŸŽฏ ์ „๋žต ํ”„๋ ˆ์ž„์›Œํฌ", ""]
# ์ „๋žต ๋ชฉํ‘œ
objectives = data.get("objectives", [])
if objectives:
lines.append("#### ์ „๋žต ๋ชฉํ‘œ")
for i, obj in enumerate(objectives[:5], 1):
lines.append(f"{i}. **{obj.get('name', '๋ชฉํ‘œ')}**: {obj.get('description', '')[:60]}")
lines.append("")
# ์‹คํ–‰ ๋กœ๋“œ๋งต
roadmap = data.get("roadmap", [])
if roadmap:
lines.append("#### ์‹คํ–‰ ๋กœ๋“œ๋งต")
lines.append("| ๋‹จ๊ณ„ | ๊ธฐ๊ฐ„ | ์ฃผ์š” ํ™œ๋™ | ์‚ฐ์ถœ๋ฌผ |")
lines.append("|------|------|----------|--------|")
for phase in roadmap[:5]:
actions = phase.get("actions", ["-"])
deliverables = phase.get("deliverables", ["-"])
lines.append(f"| {phase.get('name', '๋‹จ๊ณ„')} | {phase.get('timeline', '-')} | {', '.join(actions[:2])} | {', '.join(deliverables[:2])} |")
lines.append("")
# KPI
kpis = data.get("kpis", [])
if kpis:
lines.append("#### ํ•ต์‹ฌ ์„ฑ๊ณผ ์ง€ํ‘œ (KPI)")
for kpi in kpis[:5]:
target = kpi.get("target", "TBD")
lines.append(f"- ๐Ÿ“Š **{kpi.get('name', 'KPI')}**: ๋ชฉํ‘œ {target}")
lines.append("")
return "\n".join(lines)
# ===== ์ผ๋ฐ˜ ๋ถ„์„ ํฌ๋งท =====
@classmethod
def format_analysis(cls, data: Dict, language: str = "EN") -> str:
"""์ผ๋ฐ˜ ๋ถ„์„์šฉ ํฌ๋งท"""
if language == "KR":
return cls._format_analysis_kr(data)
return cls._format_analysis_en(data)
@classmethod
def _format_analysis_en(cls, data: Dict) -> str:
lines = ["### ๐Ÿ”ฌ Analysis Framework", ""]
# ์ฃผ์š” ๋ฐœ๊ฒฌ
findings = data.get("findings", [])
if findings:
lines.append("#### Key Findings")
for i, f in enumerate(findings[:5], 1):
lines.append(f"{i}. {f}")
lines.append("")
# ์›์ธ ๋ถ„์„
causes = data.get("causes", [])
if causes:
lines.append("#### Root Cause Analysis")
for c in causes[:4]:
lines.append(f"- **{c.get('cause', 'Cause')}** โ†’ {c.get('effect', 'Effect')[:50]}")
lines.append("")
# ๋ฐ์ดํ„ฐ ํฌ์ธํŠธ
data_points = data.get("data_points", [])
if data_points:
lines.append("#### Supporting Data")
for dp in data_points[:5]:
lines.append(f"- ๐Ÿ“ˆ {dp}")
lines.append("")
return "\n".join(lines)
@classmethod
def _format_analysis_kr(cls, data: Dict) -> str:
lines = ["### ๐Ÿ”ฌ ๋ถ„์„ ํ”„๋ ˆ์ž„์›Œํฌ", ""]
# ์ฃผ์š” ๋ฐœ๊ฒฌ
findings = data.get("findings", [])
if findings:
lines.append("#### ์ฃผ์š” ๋ฐœ๊ฒฌ์‚ฌํ•ญ")
for i, f in enumerate(findings[:5], 1):
lines.append(f"{i}. {f}")
lines.append("")
# ์›์ธ ๋ถ„์„
causes = data.get("causes", [])
if causes:
lines.append("#### ์›์ธ ๋ถ„์„")
for c in causes[:4]:
lines.append(f"- **{c.get('cause', '์›์ธ')}** โ†’ {c.get('effect', '๊ฒฐ๊ณผ')[:50]}")
lines.append("")
# ๋ฐ์ดํ„ฐ ํฌ์ธํŠธ
data_points = data.get("data_points", [])
if data_points:
lines.append("#### ๊ทผ๊ฑฐ ๋ฐ์ดํ„ฐ")
for dp in data_points[:5]:
lines.append(f"- ๐Ÿ“ˆ {dp}")
lines.append("")
return "\n".join(lines)
# ===== ๋ฐœ๋ช…/ํŠนํ—ˆ ํฌ๋งท =====
@classmethod
def format_invention(cls, data: Dict, language: str = "EN") -> str:
"""๋ฐœ๋ช…/ํŠนํ—ˆ์šฉ ํฌ๋งท"""
if language == "KR":
lines = ["### ๐Ÿ’ก ๋ฐœ๋ช… ๊ฐœ์š”", ""]
if data.get("invention_name"):
lines.append(f"**๋ฐœ๋ช…์˜ ๋ช…์นญ**: {data['invention_name']}")
lines.append("")
if data.get("problem"):
lines.append("#### ํ•ด๊ฒฐํ•˜๊ณ ์ž ํ•˜๋Š” ๊ณผ์ œ")
lines.append(data["problem"])
lines.append("")
if data.get("solution"):
lines.append("#### ๊ณผ์ œ ํ•ด๊ฒฐ ์ˆ˜๋‹จ")
lines.append(data["solution"])
lines.append("")
claims = data.get("claims", [])
if claims:
lines.append("#### ์ฒญ๊ตฌํ•ญ ์ดˆ์•ˆ")
for i, claim in enumerate(claims[:5], 1):
lines.append(f"**์ฒญ๊ตฌํ•ญ {i}**: {claim}")
lines.append("")
if data.get("advantages"):
lines.append("#### ๋ฐœ๋ช…์˜ ํšจ๊ณผ")
for adv in data["advantages"][:5]:
lines.append(f"- {adv}")
lines.append("")
else:
lines = ["### ๐Ÿ’ก Invention Overview", ""]
if data.get("invention_name"):
lines.append(f"**Title**: {data['invention_name']}")
lines.append("")
if data.get("problem"):
lines.append("#### Problem to Solve")
lines.append(data["problem"])
lines.append("")
if data.get("solution"):
lines.append("#### Solution")
lines.append(data["solution"])
lines.append("")
claims = data.get("claims", [])
if claims:
lines.append("#### Draft Claims")
for i, claim in enumerate(claims[:5], 1):
lines.append(f"**Claim {i}**: {claim}")
lines.append("")
if data.get("advantages"):
lines.append("#### Advantages")
for adv in data["advantages"][:5]:
lines.append(f"- {adv}")
lines.append("")
return "\n".join(lines)
# ===== ์Šคํ† ๋ฆฌ/์ฐฝ์ž‘ ํฌ๋งท =====
@classmethod
def format_story(cls, data: Dict, language: str = "EN") -> str:
"""์Šคํ† ๋ฆฌ/์ฐฝ์ž‘์šฉ ํฌ๋งท"""
if language == "KR":
lines = ["### ๐Ÿ“– ์Šคํ† ๋ฆฌ ๊ตฌ์กฐ", ""]
if data.get("logline"):
lines.append(f"**๋กœ๊ทธ๋ผ์ธ**: {data['logline']}")
lines.append("")
characters = data.get("characters", [])
if characters:
lines.append("#### ๋“ฑ์žฅ์ธ๋ฌผ")
for char in characters[:5]:
lines.append(f"- **{char.get('name', '์บ๋ฆญํ„ฐ')}**: {char.get('description', '')[:60]}")
lines.append("")
structure = data.get("structure", {})
if structure:
lines.append("#### ํ”Œ๋กฏ ๊ตฌ์กฐ")
for key, value in [("setup", "๋ฐœ๋‹จ"), ("conflict", "๊ฐˆ๋“ฑ"), ("climax", "์ ˆ์ •"), ("resolution", "๊ฒฐ๋ง")]:
if structure.get(key):
lines.append(f"- **{value}**: {structure[key][:80]}")
lines.append("")
if data.get("theme"):
lines.append(f"#### ์ฃผ์ œ")
lines.append(data["theme"])
lines.append("")
else:
lines = ["### ๐Ÿ“– Story Structure", ""]
if data.get("logline"):
lines.append(f"**Logline**: {data['logline']}")
lines.append("")
characters = data.get("characters", [])
if characters:
lines.append("#### Characters")
for char in characters[:5]:
lines.append(f"- **{char.get('name', 'Character')}**: {char.get('description', '')[:60]}")
lines.append("")
structure = data.get("structure", {})
if structure:
lines.append("#### Plot Structure")
for key in ["setup", "conflict", "climax", "resolution"]:
if structure.get(key):
lines.append(f"- **{key.capitalize()}**: {structure[key][:80]}")
lines.append("")
if data.get("theme"):
lines.append(f"#### Theme")
lines.append(data["theme"])
lines.append("")
return "\n".join(lines)
# ===== ๋ ˆ์‹œํ”ผ ํฌ๋งท =====
@classmethod
def format_recipe(cls, data: Dict, language: str = "EN") -> str:
"""๋ ˆ์‹œํ”ผ์šฉ ํฌ๋งท"""
if language == "KR":
lines = ["### ๐Ÿณ ๋ ˆ์‹œํ”ผ", ""]
if data.get("dish_name"):
lines.append(f"**์š”๋ฆฌ๋ช…**: {data['dish_name']}")
if data.get("servings"):
lines.append(f"**๋ถ„๋Ÿ‰**: {data['servings']}")
if data.get("time"):
lines.append(f"**์กฐ๋ฆฌ์‹œ๊ฐ„**: {data['time']}")
lines.append("")
ingredients = data.get("ingredients", [])
if ingredients:
lines.append("#### ์žฌ๋ฃŒ")
for ing in ingredients:
lines.append(f"- {ing}")
lines.append("")
steps = data.get("steps", [])
if steps:
lines.append("#### ์กฐ๋ฆฌ ์ˆœ์„œ")
for i, step in enumerate(steps, 1):
lines.append(f"{i}. {step}")
lines.append("")
tips = data.get("tips", [])
if tips:
lines.append("#### ๐Ÿ’ก ์š”๋ฆฌ ํŒ")
for tip in tips[:3]:
lines.append(f"- {tip}")
lines.append("")
else:
lines = ["### ๐Ÿณ Recipe", ""]
if data.get("dish_name"):
lines.append(f"**Dish**: {data['dish_name']}")
if data.get("servings"):
lines.append(f"**Servings**: {data['servings']}")
if data.get("time"):
lines.append(f"**Time**: {data['time']}")
lines.append("")
ingredients = data.get("ingredients", [])
if ingredients:
lines.append("#### Ingredients")
for ing in ingredients:
lines.append(f"- {ing}")
lines.append("")
steps = data.get("steps", [])
if steps:
lines.append("#### Instructions")
for i, step in enumerate(steps, 1):
lines.append(f"{i}. {step}")
lines.append("")
tips = data.get("tips", [])
if tips:
lines.append("#### ๐Ÿ’ก Tips")
for tip in tips[:3]:
lines.append(f"- {tip}")
lines.append("")
return "\n".join(lines)
# ===== ์ผ๋ฐ˜ ํฌ๋งท =====
@classmethod
def format_general(cls, data: Dict, language: str = "EN") -> str:
"""์ผ๋ฐ˜ ๋ถ„์„์šฉ ๊ธฐ๋ณธ ํฌ๋งท"""
lines = []
# ์ฃผ์š” ๋‚ด์šฉ์ด ์žˆ์œผ๋ฉด ๊ทธ๋Œ€๋กœ ์ถœ๋ ฅ
main_content = data.get("main_content", data.get("result", ""))
if main_content:
lines.append(main_content)
lines.append("")
# ํ•ต์‹ฌ ํฌ์ธํŠธ
key_points = data.get("key_points", data.get("deliverables", []))
if key_points:
header = "#### ํ•ต์‹ฌ ํฌ์ธํŠธ" if language == "KR" else "#### Key Points"
lines.append(header)
for point in key_points[:5]:
lines.append(f"- {point}")
lines.append("")
return "\n".join(lines)
# ===== ์œ ํ‹ธ๋ฆฌํ‹ฐ =====
@classmethod
def extract_typed_data(cls, element_outputs: Dict, question_type: str) -> Dict:
"""
Element ์ถœ๋ ฅ์—์„œ ์งˆ๋ฌธ ์œ ํ˜•์— ๋งž๋Š” ๊ตฌ์กฐํ™”๋œ ๋ฐ์ดํ„ฐ ์ถ”์ถœ
์‹ค์ œ LLM ์‘๋‹ต์—์„œ ํŒจํ„ด์„ ์ธ์‹ํ•˜์—ฌ ๊ตฌ์กฐํ™”ํ•ฉ๋‹ˆ๋‹ค.
"""
typed_data = {}
fire_data = element_outputs.get("็ซ", {})
water_data = element_outputs.get("ๆฐด", {})
wood_data = element_outputs.get("ๆœจ", {})
metal_data = element_outputs.get("้‡‘", {})
# ๊ณตํ†ต ๋ฐ์ดํ„ฐ
typed_data["main_content"] = fire_data.get("result", "")
typed_data["key_points"] = fire_data.get("deliverables", [])
typed_data["findings"] = water_data.get("findings", [])
typed_data["risks"] = metal_data.get("risks", [])
# ์˜ˆ์ธก ์œ ํ˜• ํŠนํ™”
if question_type == "prediction":
# ์‹œ๋‚˜๋ฆฌ์˜ค ์ถ”์ถœ ์‹œ๋„
result_text = str(fire_data.get("result", ""))
scenarios = cls._extract_scenarios(result_text)
if scenarios:
typed_data["scenarios"] = scenarios
# ๋ณ€์ˆ˜ ์ถ”์ถœ
variables = []
for risk in metal_data.get("risks", []):
variables.append({
"name": str(risk)[:30],
"description": str(risk),
"impact": "high" if any(kw in str(risk).lower() for kw in ["์ค‘์š”", "ํ•ต์‹ฌ", "critical", "major"]) else "medium"
})
typed_data["key_variables"] = variables
# ๋น„๊ต ์œ ํ˜• ํŠนํ™”
elif question_type == "comparison":
# ๋น„๊ต ํ•ญ๋ชฉ ์ถ”์ถœ ์‹œ๋„
typed_data["comparison_items"] = []
typed_data["criteria"] = []
typed_data["recommendation"] = wood_data.get("key_insight", "")
# ์ „๋žต ์œ ํ˜• ํŠนํ™”
elif question_type == "strategy":
# ๋ชฉํ‘œ ์ถ”์ถœ
objectives = []
for idea in wood_data.get("ideas", []):
objectives.append({
"name": str(idea)[:50],
"description": str(idea)
})
typed_data["objectives"] = objectives
typed_data["kpis"] = []
return typed_data
@classmethod
def _extract_scenarios(cls, text: str) -> List[Dict]:
"""ํ…์ŠคํŠธ์—์„œ ์‹œ๋‚˜๋ฆฌ์˜ค ํŒจํ„ด ์ถ”์ถœ"""
scenarios = []
# ์‹œ๋‚˜๋ฆฌ์˜ค ํŒจํ„ด ๋งค์นญ
patterns = [
r'์‹œ๋‚˜๋ฆฌ์˜ค\s*[A-Za-z0-9๊ฐ€-ํžฃ]+[:\s]+(.{10,100})',
r'Scenario\s*[A-Za-z0-9]+[:\s]+(.{10,100})',
r'(\d+)\s*[%]?\s*ํ™•๋ฅ [๋กœ์œผ]?\s+(.{10,80})',
r'(\d+)\s*[%]\s*probability[:\s]+(.{10,80})',
]
for pattern in patterns:
matches = re.findall(pattern, text, re.IGNORECASE)
for match in matches[:4]:
if isinstance(match, tuple):
scenarios.append({
"name": f"Scenario",
"probability": match[0] if match[0].isdigit() else "?",
"description": match[-1] if len(match) > 1 else match[0],
"drivers": "-",
"timeline": "-"
})
else:
scenarios.append({
"name": "Scenario",
"description": match,
"probability": "?",
"drivers": "-",
"timeline": "-"
})
return scenarios[:4]
# ==================== ProfessionalReportGenerator (P0-3) ====================
class ProfessionalReportGenerator:
"""
์ „๋ฌธ ๋ณด๊ณ ์„œ ์ƒ์„ฑ๊ธฐ (ReportGenerator ๋Œ€์ฒด)
๊ธฐ์กด ReportGenerator๋ฅผ ํ™•์žฅํ•˜์—ฌ:
- Executive Summary ์ถ”๊ฐ€
- ์งˆ๋ฌธ ์œ ํ˜•๋ณ„ ๋งž์ถค ํฌ๋งท
- ์ถœ์ฒ˜ ๊ด€๋ฆฌ ํ†ตํ•ฉ
- ํ’ˆ์งˆ ๋Œ€์‹œ๋ณด๋“œ ์ƒ๋‹จ ๋ฐฐ์น˜
"""
def __init__(self, source_manager: Optional[SourceManager] = None):
    """Attach a source registry, creating a fresh one when none is supplied."""
    if not source_manager:
        source_manager = SourceManager()
    self.source_manager = source_manager
def generate(self, state, stats: Dict, source_manager: Optional[SourceManager] = None) -> str:
    """Build the complete Markdown report for a finished analysis cycle.

    Args:
        state: Cycle state object.  ``goal``, ``satisfaction_score``,
            ``iteration``, ``element_outputs`` and ``session_id`` are read
            directly; ``language``, ``goal_clarity`` and
            ``metacog_assessments`` are optional.
        stats: Memory statistics (accepted but not read by this method).
        source_manager: When truthy, replaces the generator's current
            source manager before rendering.

    Returns:
        The report as Markdown text, in Korean when ``state.language`` is
        ``"KR"`` and in English otherwise.
    """
    if source_manager:
        self.source_manager = source_manager

    language = getattr(state, 'language', 'EN')

    # Core values of the cycle.
    goal = state.goal
    score = state.satisfaction_score
    iterations = state.iteration
    outputs = state.element_outputs

    # Prefer the goal type recorded on the state; otherwise detect it from the goal text.
    clarity = getattr(state, 'goal_clarity', None)
    if clarity and hasattr(clarity, 'goal_type'):
        question_type = clarity.goal_type
    else:
        question_type = self._detect_question_type(goal)

    # Per-element outputs, each defaulting to an empty dict.
    earth_data, metal_data, water_data, wood_data, fire_data = (
        outputs.get(key, {}) for key in ("ๅœŸ", "้‡‘", "ๆฐด", "ๆœจ", "็ซ")
    )

    metacog_assessments = getattr(state, 'metacog_assessments', [])

    # Both renderers take the same arguments; only the language differs.
    render = self._generate_korean if language == "KR" else self._generate_english
    return render(
        goal, score, iterations, question_type,
        earth_data, metal_data, water_data, wood_data, fire_data,
        metacog_assessments, state.session_id
    )
def _generate_english(self, goal, score, iterations, question_type,
                      earth_data, metal_data, water_data, wood_data, fire_data,
                      metacog_assessments, session_id) -> str:
    """Assemble the English report from its sections in fixed order, dropping empty ones."""
    # Sections 1-4: header, executive summary, quality dashboard, main result.
    parts = [
        self._header_section_en(goal, question_type),
        self._executive_summary_en(
            goal, score, iterations, fire_data, wood_data, metacog_assessments
        ),
        self._quality_dashboard_en(metacog_assessments, score),
        self._main_result_en(fire_data, earth_data),
    ]

    # Section 5: type-specific block, included only when it is not blank.
    by_element = {"ๅœŸ": earth_data, "้‡‘": metal_data, "ๆฐด": water_data, "ๆœจ": wood_data, "็ซ": fire_data}
    typed_payload = TypedOutputFormatter.extract_typed_data(by_element, question_type)
    typed_block = TypedOutputFormatter.format(question_type, typed_payload, "EN")
    if typed_block.strip():
        parts.append(typed_block)

    # Sections 6-10: insights, facts & risks, findings, references, metadata.
    parts += [
        self._insights_section_en(wood_data),
        self._facts_risks_section_en(metal_data),
        self._findings_section_en(water_data),
        self.source_manager.format_references_section("EN"),
        self._metadata_section_en(session_id, iterations, score),
    ]
    return "\n".join(part for part in parts if part)
def _generate_korean(self, goal, score, iterations, question_type,
                     earth_data, metal_data, water_data, wood_data, fire_data,
                     metacog_assessments, session_id) -> str:
    """Assemble the Korean report from its sections in fixed order, dropping empty ones."""
    # Sections 1-4: header, executive summary, quality dashboard, main result.
    parts = [
        self._header_section_kr(goal, question_type),
        self._executive_summary_kr(
            goal, score, iterations, fire_data, wood_data, metacog_assessments
        ),
        self._quality_dashboard_kr(metacog_assessments, score),
        self._main_result_kr(fire_data, earth_data),
    ]

    # Section 5: type-specific block, included only when it is not blank.
    by_element = {"ๅœŸ": earth_data, "้‡‘": metal_data, "ๆฐด": water_data, "ๆœจ": wood_data, "็ซ": fire_data}
    typed_payload = TypedOutputFormatter.extract_typed_data(by_element, question_type)
    typed_block = TypedOutputFormatter.format(question_type, typed_payload, "KR")
    if typed_block.strip():
        parts.append(typed_block)

    # Sections 6-10: insights, facts & risks, findings, references, metadata.
    parts += [
        self._insights_section_kr(wood_data),
        self._facts_risks_section_kr(metal_data),
        self._findings_section_kr(water_data),
        self.source_manager.format_references_section("KR"),
        self._metadata_section_kr(session_id, iterations, score),
    ]
    return "\n".join(part for part in parts if part)
# ===== Section Generators (English) =====
def _header_section_en(self, goal: str, question_type: str) -> str:
    """Return the English report header: title, the quoted question and the question-type header."""
    type_header = TypedOutputFormatter.get_type_header(question_type, "EN")
    header_lines = [
        "# ๐ŸŽฏ Analysis Report",
        "---",
        "## ๐Ÿ“‹ Question",
        f"> **{goal}**",
        f"{type_header}",
        "---",
        "",  # keeps the trailing newline of the section
    ]
    return "\n".join(header_lines)
def _executive_summary_en(self, goal, score, iterations, fire_data, wood_data, metacog_assessments) -> str:
"""Executive Summary ์„น์…˜"""
lines = ["## ๐Ÿ“Š Executive Summary", ""]
# ํ•ต์‹ฌ ๊ฒฐ๋ก  ์ถ”์ถœ (3์ค„)
main_result = fire_data.get("result", "")
key_insight = wood_data.get("key_insight", wood_data.get("reframe", ""))
deliverables = fire_data.get("deliverables", [])
conclusions = []
if main_result:
# ์ฒซ ๋ฌธ์žฅ ๋˜๋Š” ํ•ต์‹ฌ ๋ถ€๋ถ„ ์ถ”์ถœ
first_sentence = main_result.split('.')[0][:150] if main_result else ""
if first_sentence:
conclusions.append(first_sentence)
if key_insight:
conclusions.append(str(key_insight)[:100])
if deliverables and len(conclusions) < 3:
for d in deliverables[:2]:
if len(conclusions) >= 3:
break
conclusions.append(str(d)[:80])
if conclusions:
lines.append("### ๐ŸŽฏ Key Conclusions")
for i, c in enumerate(conclusions[:3], 1):
lines.append(f"{i}. {c}{'...' if len(c) >= 80 else ''}")
lines.append("")
# ์‹ ๋ขฐ๋„ ๋ฐ”
score_bar = 'โ–ˆ' * int(score * 10) + 'โ–‘' * (10 - int(score * 10))
avg_quality = 0
if metacog_assessments:
avg_quality = sum(a.overall_score for a in metacog_assessments) / len(metacog_assessments)
quality_bar = 'โ–ˆ' * int(avg_quality * 10) + 'โ–‘' * (10 - int(avg_quality * 10))
lines.append("### ๐Ÿ“ˆ Analysis Metrics")
lines.append(f"| Metric | Score |")
lines.append(f"|--------|-------|")
lines.append(f"| Confidence | {score_bar} **{score:.0%}** |")
lines.append(f"| Quality | {quality_bar} **{avg_quality:.0%}** |")
lines.append(f"| Iterations | **{iterations}** cycles |")
lines.append("")
return "\n".join(lines)
def _quality_dashboard_en(self, metacog_assessments, score) -> str:
"""ํ’ˆ์งˆ ๋Œ€์‹œ๋ณด๋“œ ์„น์…˜"""
if not metacog_assessments:
return ""
lines = ["## ๐Ÿง  Quality Dashboard", ""]
# ํ‰๊ท  ๊ณ„์‚ฐ
avg_factual = sum(a.factual_confidence for a in metacog_assessments) / len(metacog_assessments)
avg_logic = sum(a.logical_coherence for a in metacog_assessments) / len(metacog_assessments)
avg_complete = sum(a.completeness for a in metacog_assessments) / len(metacog_assessments)
avg_specific = sum(a.specificity for a in metacog_assessments) / len(metacog_assessments)
lines.append("| Dimension | Score | Status |")
lines.append("|-----------|-------|--------|")
for name, val in [
("Factual Grounding", avg_factual),
("Logical Coherence", avg_logic),
("Completeness", avg_complete),
("Specificity", avg_specific)
]:
bar = 'โ–ˆ' * int(val * 10) + 'โ–‘' * (10 - int(val * 10))
status = "โœ…" if val >= 0.7 else "โš ๏ธ" if val >= 0.5 else "โŒ"
lines.append(f"| {name} | {bar} {val:.0%} | {status} |")
lines.append("")
# ๊ฐœ์„  ๊ถŒ์žฅ์‚ฌํ•ญ
all_recommendations = []
for a in metacog_assessments[-3:]: # ์ตœ๊ทผ 3๊ฐœ
all_recommendations.extend(a.recommendations)
if all_recommendations:
unique_recs = list(dict.fromkeys(all_recommendations))[:3]
lines.append("### ๐Ÿ’ก Improvement Suggestions")
for rec in unique_recs:
lines.append(f"- {rec}")
lines.append("")
return "\n".join(lines)
def _main_result_en(self, fire_data, earth_data) -> str:
"""๋ฉ”์ธ ๊ฒฐ๊ณผ ์„น์…˜"""
lines = ["## ๐Ÿ’ก Main Analysis Result", ""]
main_result = fire_data.get("result", "")
if main_result:
lines.append(main_result)
lines.append("")
else:
assessment = earth_data.get("assessment", "")
if assessment:
lines.append(assessment)
lines.append("")
else:
lines.append("*Analysis not completed. Please increase iteration count and try again.*")
lines.append("")
return "\n".join(lines)
def _insights_section_en(self, wood_data) -> str:
"""์ธ์‚ฌ์ดํŠธ ์„น์…˜"""
lines = []
key_insight = wood_data.get("key_insight", wood_data.get("reframe", ""))
top3 = wood_data.get("selected_top3", wood_data.get("ideas", []))
if key_insight or top3:
lines.append("## ๐Ÿ”‘ Key Insights")
lines.append("")
if key_insight:
lines.append(f"> ๐Ÿ’ก **Core Insight**: {key_insight}")
lines.append("")
if top3:
lines.append("### Top Ideas")
for i, idea in enumerate(top3[:3], 1):
lines.append(f"**{i}.** {idea}")
lines.append("")
return "\n".join(lines)
def _facts_risks_section_en(self, metal_data) -> str:
"""๊ฒ€์ฆ๋œ ์‚ฌ์‹ค ๋ฐ ๋ฆฌ์Šคํฌ ์„น์…˜"""
lines = []
verified = metal_data.get("verified_facts", [])
risks = metal_data.get("risks", [])
critiques = metal_data.get("critiques", [])
if verified or risks or critiques:
lines.append("## โš–๏ธ Verification & Risks")
lines.append("")
if verified:
lines.append("### โœ… Verified Facts")
for fact in verified[:5]:
lines.append(f"- {fact}")
lines.append("")
if risks:
lines.append("### โš ๏ธ Identified Risks")
for risk in risks[:4]:
lines.append(f"- ๐Ÿ”ธ {risk}")
lines.append("")
if critiques:
lines.append("### ๐Ÿ” Critical Points")
for critique in critiques[:3]:
lines.append(f"- {critique}")
lines.append("")
return "\n".join(lines)
def _findings_section_en(self, water_data) -> str:
"""์—ฐ๊ตฌ ๋ฐœ๊ฒฌ ์„น์…˜"""
lines = []
findings = water_data.get("findings", [])
key_data = water_data.get("key_data", [])
expert_views = water_data.get("expert_views", [])
if findings or key_data:
lines.append("## ๐Ÿ“Š Research Findings")
lines.append("")
if findings:
lines.append("### Key Findings")
for finding in findings[:5]:
lines.append(f"- {finding}")
lines.append("")
if key_data:
lines.append("### Supporting Data")
for data in key_data[:3]:
lines.append(f"- ๐Ÿ“ˆ {data}")
lines.append("")
if expert_views:
lines.append("### Expert Perspectives")
for view in expert_views[:2]:
lines.append(f"- ๐Ÿ’ฌ {view}")
lines.append("")
return "\n".join(lines)
def _metadata_section_en(self, session_id, iterations, score) -> str:
"""๋ฉ”ํƒ€๋ฐ์ดํ„ฐ ์„น์…˜"""
from datetime import datetime
return f"""---
## ๐Ÿ“‹ Analysis Information
| Item | Value |
|------|-------|
| Session ID | `{session_id}` |
| Iterations | {iterations} cycles |
| Final Confidence | {score:.0%} |
| Generated At | {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} |
| Engine | AETHER Proto-AGI v2.2 |
---
*This report was generated by AETHER Proto-AGI (SOMA Five Elements ยท SLAI Self-Learning ยท MAIA Emergence)*
"""
# ===== Section Generators (Korean) =====
def _header_section_kr(self, goal: str, question_type: str) -> str:
    """Return the Korean report header: title, the quoted question and the question-type header."""
    type_header = TypedOutputFormatter.get_type_header(question_type, "KR")
    header_lines = [
        "# ๐ŸŽฏ ๋ถ„์„ ๋ณด๊ณ ์„œ",
        "---",
        "## ๐Ÿ“‹ ์งˆ๋ฌธ",
        f"> **{goal}**",
        f"{type_header}",
        "---",
        "",  # keeps the trailing newline of the section
    ]
    return "\n".join(header_lines)
def _executive_summary_kr(self, goal, score, iterations, fire_data, wood_data, metacog_assessments) -> str:
"""Executive Summary ์„น์…˜ (ํ•œ๊ตญ์–ด)"""
lines = ["## ๐Ÿ“Š ํ•ต์‹ฌ ์š”์•ฝ (Executive Summary)", ""]
# ํ•ต์‹ฌ ๊ฒฐ๋ก  ์ถ”์ถœ (3์ค„)
main_result = fire_data.get("result", "")
key_insight = wood_data.get("key_insight", wood_data.get("reframe", ""))
deliverables = fire_data.get("deliverables", [])
conclusions = []
if main_result:
first_sentence = main_result.split('.')[0][:150] if main_result else ""
if first_sentence:
conclusions.append(first_sentence)
if key_insight:
conclusions.append(str(key_insight)[:100])
if deliverables and len(conclusions) < 3:
for d in deliverables[:2]:
if len(conclusions) >= 3:
break
conclusions.append(str(d)[:80])
if conclusions:
lines.append("### ๐ŸŽฏ ํ•ต์‹ฌ ๊ฒฐ๋ก ")
for i, c in enumerate(conclusions[:3], 1):
lines.append(f"{i}. {c}{'...' if len(c) >= 80 else ''}")
lines.append("")
# ์‹ ๋ขฐ๋„ ๋ฐ”
score_bar = 'โ–ˆ' * int(score * 10) + 'โ–‘' * (10 - int(score * 10))
avg_quality = 0
if metacog_assessments:
avg_quality = sum(a.overall_score for a in metacog_assessments) / len(metacog_assessments)
quality_bar = 'โ–ˆ' * int(avg_quality * 10) + 'โ–‘' * (10 - int(avg_quality * 10))
lines.append("### ๐Ÿ“ˆ ๋ถ„์„ ์ง€ํ‘œ")
lines.append(f"| ์ง€ํ‘œ | ์ ์ˆ˜ |")
lines.append(f"|------|------|")
lines.append(f"| ์‹ ๋ขฐ๋„ | {score_bar} **{score:.0%}** |")
lines.append(f"| ํ’ˆ์งˆ | {quality_bar} **{avg_quality:.0%}** |")
lines.append(f"| ์ˆœํ™˜ | **{iterations}**ํšŒ |")
lines.append("")
return "\n".join(lines)
def _quality_dashboard_kr(self, metacog_assessments, score) -> str:
"""ํ’ˆ์งˆ ๋Œ€์‹œ๋ณด๋“œ ์„น์…˜ (ํ•œ๊ตญ์–ด)"""
if not metacog_assessments:
return ""
lines = ["## ๐Ÿง  ํ’ˆ์งˆ ๋Œ€์‹œ๋ณด๋“œ", ""]
avg_factual = sum(a.factual_confidence for a in metacog_assessments) / len(metacog_assessments)
avg_logic = sum(a.logical_coherence for a in metacog_assessments) / len(metacog_assessments)
avg_complete = sum(a.completeness for a in metacog_assessments) / len(metacog_assessments)
avg_specific = sum(a.specificity for a in metacog_assessments) / len(metacog_assessments)
lines.append("| ์ฐจ์› | ์ ์ˆ˜ | ์ƒํƒœ |")
lines.append("|------|------|------|")
for name, val in [
("์‚ฌ์‹ค ๊ทผ๊ฑฐ", avg_factual),
("๋…ผ๋ฆฌ์  ์ผ๊ด€์„ฑ", avg_logic),
("์™„์„ฑ๋„", avg_complete),
("๊ตฌ์ฒด์„ฑ", avg_specific)
]:
bar = 'โ–ˆ' * int(val * 10) + 'โ–‘' * (10 - int(val * 10))
status = "โœ…" if val >= 0.7 else "โš ๏ธ" if val >= 0.5 else "โŒ"
lines.append(f"| {name} | {bar} {val:.0%} | {status} |")
lines.append("")
# ๊ฐœ์„  ๊ถŒ์žฅ์‚ฌํ•ญ
all_recommendations = []
for a in metacog_assessments[-3:]:
all_recommendations.extend(a.recommendations)
if all_recommendations:
unique_recs = list(dict.fromkeys(all_recommendations))[:3]
lines.append("### ๐Ÿ’ก ๊ฐœ์„  ๊ถŒ์žฅ์‚ฌํ•ญ")
for rec in unique_recs:
lines.append(f"- {rec}")
lines.append("")
return "\n".join(lines)
def _main_result_kr(self, fire_data, earth_data) -> str:
"""๋ฉ”์ธ ๊ฒฐ๊ณผ ์„น์…˜ (ํ•œ๊ตญ์–ด)"""
lines = ["## ๐Ÿ’ก ํ•ต์‹ฌ ๋ถ„์„ ๊ฒฐ๊ณผ", ""]
main_result = fire_data.get("result", "")
if main_result:
lines.append(main_result)
lines.append("")
else:
assessment = earth_data.get("assessment", "")
if assessment:
lines.append(assessment)
lines.append("")
else:
lines.append("*๋ถ„์„์ด ์™„๋ฃŒ๋˜์ง€ ์•Š์•˜์Šต๋‹ˆ๋‹ค. ์ˆœํ™˜ ํšŸ์ˆ˜๋ฅผ ๋Š˜๋ ค ๋‹ค์‹œ ์‹œ๋„ํ•ด์ฃผ์„ธ์š”.*")
lines.append("")
return "\n".join(lines)
def _insights_section_kr(self, wood_data) -> str:
"""์ธ์‚ฌ์ดํŠธ ์„น์…˜ (ํ•œ๊ตญ์–ด)"""
lines = []
key_insight = wood_data.get("key_insight", wood_data.get("reframe", ""))
top3 = wood_data.get("selected_top3", wood_data.get("ideas", []))
if key_insight or top3:
lines.append("## ๐Ÿ”‘ ํ•ต์‹ฌ ์ธ์‚ฌ์ดํŠธ")
lines.append("")
if key_insight:
lines.append(f"> ๐Ÿ’ก **ํ•ต์‹ฌ ํ†ต์ฐฐ**: {key_insight}")
lines.append("")
if top3:
lines.append("### ์ฃผ์š” ์•„์ด๋””์–ด")
for i, idea in enumerate(top3[:3], 1):
lines.append(f"**{i}.** {idea}")
lines.append("")
return "\n".join(lines)
def _facts_risks_section_kr(self, metal_data) -> str:
"""๊ฒ€์ฆ๋œ ์‚ฌ์‹ค ๋ฐ ๋ฆฌ์Šคํฌ ์„น์…˜ (ํ•œ๊ตญ์–ด)"""
lines = []
verified = metal_data.get("verified_facts", [])
risks = metal_data.get("risks", [])
critiques = metal_data.get("critiques", [])
if verified or risks or critiques:
lines.append("## โš–๏ธ ๊ฒ€์ฆ ๋ฐ ๋ฆฌ์Šคํฌ")
lines.append("")
if verified:
lines.append("### โœ… ๊ฒ€์ฆ๋œ ์‚ฌ์‹ค")
for fact in verified[:5]:
lines.append(f"- {fact}")
lines.append("")
if risks:
lines.append("### โš ๏ธ ์‹๋ณ„๋œ ๋ฆฌ์Šคํฌ")
for risk in risks[:4]:
lines.append(f"- ๐Ÿ”ธ {risk}")
lines.append("")
if critiques:
lines.append("### ๐Ÿ” ๋น„ํŒ์  ๊ฒ€ํ† ")
for critique in critiques[:3]:
lines.append(f"- {critique}")
lines.append("")
return "\n".join(lines)
def _findings_section_kr(self, water_data) -> str:
"""์—ฐ๊ตฌ ๋ฐœ๊ฒฌ ์„น์…˜ (ํ•œ๊ตญ์–ด)"""
lines = []
findings = water_data.get("findings", [])
key_data = water_data.get("key_data", [])
expert_views = water_data.get("expert_views", [])
if findings or key_data:
lines.append("## ๐Ÿ“Š ์—ฐ๊ตฌ ๋ฐœ๊ฒฌ")
lines.append("")
if findings:
lines.append("### ์ฃผ์š” ๋ฐœ๊ฒฌ")
for finding in findings[:5]:
lines.append(f"- {finding}")
lines.append("")
if key_data:
lines.append("### ๊ทผ๊ฑฐ ๋ฐ์ดํ„ฐ")
for data in key_data[:3]:
lines.append(f"- ๐Ÿ“ˆ {data}")
lines.append("")
if expert_views:
lines.append("### ์ „๋ฌธ๊ฐ€ ์˜๊ฒฌ")
for view in expert_views[:2]:
lines.append(f"- ๐Ÿ’ฌ {view}")
lines.append("")
return "\n".join(lines)
def _metadata_section_kr(self, session_id, iterations, score) -> str:
"""๋ฉ”ํƒ€๋ฐ์ดํ„ฐ ์„น์…˜ (ํ•œ๊ตญ์–ด)"""
from datetime import datetime
return f"""---
## ๐Ÿ“‹ ๋ถ„์„ ์ •๋ณด
| ํ•ญ๋ชฉ | ๊ฐ’ |
|------|-----|
| ์„ธ์…˜ ID | `{session_id}` |
| ๋ถ„์„ ์ˆœํ™˜ | {iterations}ํšŒ |
| ์ตœ์ข… ์‹ ๋ขฐ๋„ | {score:.0%} |
| ์ƒ์„ฑ ์‹œ๊ฐ„ | {datetime.now().strftime("%Y๋…„ %m์›” %d์ผ %H:%M:%S")} |
| ์—”์ง„ | AETHER Proto-AGI v2.2 |
---
*์ด ๋ณด๊ณ ์„œ๋Š” AETHER Proto-AGI (SOMA ์˜คํ–‰ ์ˆœํ™˜ ยท SLAI ์ž๊ธฐํ•™์Šต ยท MAIA ์ฐฝ๋ฐœ)์— ์˜ํ•ด ์ž๋™ ์ƒ์„ฑ๋˜์—ˆ์Šต๋‹ˆ๋‹ค.*
"""
# ===== Utility Methods =====
def _detect_question_type(self, goal: str) -> str:
"""์งˆ๋ฌธ ์œ ํ˜• ๊ฐ์ง€"""
goal_lower = goal.lower()
if any(kw in goal_lower for kw in ['์˜ˆ์ธก', '์ „๋ง', '๋ ๊นŒ', '๋  ๊ฒƒ', '๋ฏธ๋ž˜', '2025', '2026', '2027', '2028', 'predict', 'forecast']):
return "prediction"
elif any(kw in goal_lower for kw in ['์ „๋žต', '๋ฐฉ๋ฒ•', '์–ด๋–ป๊ฒŒ', '๋ฐฉ์•ˆ', '๊ณ„ํš', '์ˆ˜๋ฆฝ', 'strategy', 'how to', 'plan']):
return "strategy"
elif any(kw in goal_lower for kw in ['๋น„๊ต', '์ฐจ์ด', 'vs', 'VS', '๋Œ€', '์–ด๋А ๊ฒƒ', 'compare', 'versus', 'difference']):
return "comparison"
elif any(kw in goal_lower for kw in ['๋ถ„์„', '์™œ', '์›์ธ', '์˜ํ–ฅ', 'ํ˜„ํ™ฉ', 'analyze', 'why', 'cause', 'impact']):
return "analysis"
elif any(kw in goal_lower for kw in ['ํŠนํ—ˆ', '๋ฐœ๋ช…', '์•„์ด๋””์–ด', 'ํ˜์‹ ', 'patent', 'invention', 'innovate']):
return "invention"
elif any(kw in goal_lower for kw in ['์†Œ์„ค', '์Šคํ† ๋ฆฌ', '์‹œ๋‚˜๋ฆฌ์˜ค', '์›นํˆฐ', 'story', 'novel', 'script']):
return "story"
elif any(kw in goal_lower for kw in ['์š”๋ฆฌ', '๋ ˆ์‹œํ”ผ', '์Œ์‹', 'recipe', 'cook', 'dish']):
return "recipe"
else:
return "general"
@staticmethod
def generate_progress(state) -> str:
"""์ง„ํ–‰ ์ค‘ ๋ณด๊ณ ์„œ (๊ธฐ์กด ํ˜ธํ™˜)"""
progress_bar = 'โ–ˆ' * int(state.satisfaction_score * 10) + 'โ–‘' * (10 - int(state.satisfaction_score * 10))
current_element = ""
if state.history:
last = state.history[-1]
elem_name = last.get("element", "")
elem_map = {
"ๅœŸ": "๐ŸŸค ๅœŸ ๊ฐ๋…", "้‡‘": "โšช ้‡‘ ๋น„ํ‰", "ๆฐด": "๐Ÿ”ต ๆฐด ๋ฆฌ์„œ์น˜",
"ๆœจ": "๐ŸŸข ๆœจ ์ฐฝ๋ฐœ", "็ซ": "๐Ÿ”ด ็ซ ์‹คํ–‰"
}
current_element = elem_map.get(elem_name, elem_name)
language = getattr(state, 'language', 'EN')
if language == "KR":
return f"""# โณ ๋ถ„์„ ์ง„ํ–‰ ์ค‘...
## ๐Ÿ“‹ ์งˆ๋ฌธ
> **{state.goal}**
## ๐Ÿ“Š ํ˜„์žฌ ์ƒํƒœ
| ํ•ญ๋ชฉ | ์ƒํƒœ |
|------|------|
| ์ง„ํ–‰ ์ˆœํ™˜ | {state.iteration}ํšŒ |
| ํ˜„์žฌ ๋‹จ๊ณ„ | {current_element} |
| ์ง„ํ–‰๋„ | {progress_bar} {state.satisfaction_score:.0%} |
---
๐Ÿ’ก **์˜ค์ผ€์ŠคํŠธ๋ ˆ์ด์…˜ ๋กœ๊ทธ**๋ฅผ ํŽผ์ณ์„œ ์‹ค์‹œ๊ฐ„ ๋ถ„์„ ๊ณผ์ •์„ ํ™•์ธํ•˜์„ธ์š”.
*ๅœŸ(๊ฐ๋…) โ†’ ้‡‘(๋น„ํ‰) โ†’ ๆฐด(๋ฆฌ์„œ์น˜) โ†’ ๆœจ(์ฐฝ๋ฐœ) โ†’ ็ซ(์‹คํ–‰) ์ˆœํ™˜ ์ค‘...*
"""
else:
return f"""# โณ Analysis in Progress...
## ๐Ÿ“‹ Question
> **{state.goal}**
## ๐Ÿ“Š Current Status
| Item | Status |
|------|--------|
| Iteration | {state.iteration} |
| Current Stage | {current_element} |
| Progress | {progress_bar} {state.satisfaction_score:.0%} |
---
๐Ÿ’ก Expand **Orchestration Log** to see real-time analysis progress.
*Earth โ†’ Metal โ†’ Water โ†’ Wood โ†’ Fire cycle in progress...*
"""
# ==================== Compatibility with the legacy ReportGenerator ====================
# Wrapper for existing code that still uses ReportGenerator
class ReportGenerator:
    """
    Backward-compatibility wrapper class.

    Delegates to ProfessionalReportGenerator so callers of the legacy
    static API keep working.
    """
    _instance = None
    _professional_generator = None

    @classmethod
    def _get_generator(cls):
        """Return the cached ProfessionalReportGenerator, creating it on first use."""
        cached = cls._professional_generator
        if cached is None:
            cached = ProfessionalReportGenerator()
            cls._professional_generator = cached
        return cached

    @staticmethod
    def generate(state, stats: Dict) -> str:
        """Legacy entry point: build the full report for state via the cached generator."""
        return ReportGenerator._get_generator().generate(state, stats)

    @staticmethod
    def generate_progress(state) -> str:
        """Legacy entry point: build the in-progress report for state."""
        return ProfessionalReportGenerator.generate_progress(state)
# ==================== Usage example ====================
if __name__ == "__main__":
    # Banner describing the classes this module provides.
    rule = "=" * 60
    banner = [
        rule,
        "AETHER Proto-AGI v2.2 - Report Enhancement Module",
        rule,
        "",
        "์ด ๋ชจ๋“ˆ์˜ ํด๋ž˜์Šค๋“ค์„ core.py์— ํ†ตํ•ฉํ•˜์„ธ์š”:",
        "",
        "1. SourceManager - ์ถœ์ฒ˜/์ธ์šฉ ๊ด€๋ฆฌ",
        " - add_web_source(), add_knowledge_source()",
        " - format_references_section()",
        "",
        "2. TypedOutputFormatter - ์งˆ๋ฌธ ์œ ํ˜•๋ณ„ ํฌ๋งท",
        " - format_prediction(), format_comparison(), etc.",
        "",
        "3. ProfessionalReportGenerator - ์ „๋ฌธ ๋ณด๊ณ ์„œ ์ƒ์„ฑ",
        " - Executive Summary",
        " - Quality Dashboard",
        " - Type-specific formatting",
        " - Source references",
        "",
        rule,
    ]
    for banner_line in banner:
        print(banner_line)

    # Sample data for the prediction formatter demo.
    print("\n[TypedOutputFormatter ํ…Œ์ŠคํŠธ]")
    sample_scenarios = [
        {"name": "Scenario A", "probability": "60%", "drivers": "Strong growth", "timeline": "2025 Q2"},
        {"name": "Scenario B", "probability": "30%", "drivers": "Moderate", "timeline": "2025 Q4"},
    ]
    sample_variables = [
        {"name": "Interest Rate", "description": "Fed policy changes", "impact": "high"},
        {"name": "Tech Innovation", "description": "AI disruption", "impact": "medium"},
    ]
    test_data = {"scenarios": sample_scenarios, "key_variables": sample_variables}
    print(TypedOutputFormatter.format_prediction(test_data, "EN"))

    # Register two web sources and print the formatted reference list.
    print("\n[SourceManager ํ…Œ์ŠคํŠธ]")
    sm = SourceManager()
    demo_sources = (
        ("Example Article", "https://example.com/article", "This is a test", "ๆฐด"),
        ("Another Source", "https://test.com/page", "More content here", "้‡‘"),
    )
    for title, url, snippet, element in demo_sources:
        sm.add_web_source(title, url, snippet, element)
    print(sm.format_references_section("EN"))