| """ |
| AETHER Proto-AGI v2.2 - Report Enhancement Module |
| ์ถ์ฒ ๊ด๋ฆฌ, ์ ํ๋ณ ํฌ๋งทํฐ, ์ ๋ฌธ ๋ณด๊ณ ์ ์์ฑ๊ธฐ |
| |
| ์ด ํ์ผ์ ํด๋์ค๋ค์ core.py์ ํตํฉํ์ธ์. |
| ๊ธฐ์กด ReportGenerator ํด๋์ค๋ฅผ ์๋ ProfessionalReportGenerator๋ก ๊ต์ฒดํ์ธ์. |
| """ |
|
|
| import json |
| import re |
| from datetime import datetime |
| from dataclasses import dataclass, field |
| from typing import Optional, List, Dict, Any, Tuple |
| from enum import Enum |
|
|
|
|
| |
|
|
| @dataclass |
| class WebSource: |
| """์น ๊ฒ์ ์ถ์ฒ""" |
| title: str |
| url: str |
| snippet: str |
| element: str |
| timestamp: str = field(default_factory=lambda: datetime.now().isoformat()) |
| relevance_score: float = 0.0 |
|
|
| @dataclass |
| class KnowledgeSource: |
| """์ง์ DB ์ถ์ฒ""" |
| knowledge_id: str |
| goal: str |
| quality_score: float |
| element: str |
| created_at: str |
| snippet: str |
|
|
| @dataclass |
| class CrawlSource: |
| """URL ํฌ๋กค๋ง ์ถ์ฒ""" |
| url: str |
| title: str |
| content_preview: str |
| crawled_at: str |
| success: bool |
|
|
|
|
| class SourceManager: |
| """ |
| ์ถ์ฒ/์ธ์ฉ ํตํฉ ๊ด๋ฆฌ ์์คํ
|
| |
| ๋ชจ๋ ๋ถ์ ๊ณผ์ ์์ ์ฌ์ฉ๋ ์ถ์ฒ๋ฅผ ์ถ์ ํ๊ณ |
| ์ต์ข
๋ณด๊ณ ์์ ์ฒด๊ณ์ ์ผ๋ก ํ์ํฉ๋๋ค. |
| """ |
| |
| def __init__(self): |
| self.web_sources: List[WebSource] = [] |
| self.knowledge_sources: List[KnowledgeSource] = [] |
| self.crawl_sources: List[CrawlSource] = [] |
| self._citation_counter = 0 |
| self._citation_map: Dict[str, int] = {} |
| |
| def reset(self): |
| """์ ๋ถ์ ์ธ์
์์ ์ ์ด๊ธฐํ""" |
| self.web_sources = [] |
| self.knowledge_sources = [] |
| self.crawl_sources = [] |
| self._citation_counter = 0 |
| self._citation_map = {} |
| |
| |
| |
| def add_web_source(self, title: str, url: str, snippet: str, |
| element: str, relevance_score: float = 0.0) -> int: |
| """์น ๊ฒ์ ๊ฒฐ๊ณผ ์ถ์ฒ ์ถ๊ฐ""" |
| if url in self._citation_map: |
| return self._citation_map[url] |
| |
| self._citation_counter += 1 |
| citation_num = self._citation_counter |
| self._citation_map[url] = citation_num |
| |
| source = WebSource( |
| title=title[:100] if title else "Unknown", |
| url=url, |
| snippet=snippet[:300] if snippet else "", |
| element=element, |
| relevance_score=relevance_score |
| ) |
| self.web_sources.append(source) |
| return citation_num |
| |
| def add_knowledge_source(self, knowledge_id: str, goal: str, |
| quality_score: float, element: str, |
| created_at: str, snippet: str) -> int: |
| """์ง์ DB ์ถ์ฒ ์ถ๊ฐ""" |
| if knowledge_id in self._citation_map: |
| return self._citation_map[knowledge_id] |
| |
| self._citation_counter += 1 |
| citation_num = self._citation_counter |
| self._citation_map[knowledge_id] = citation_num |
| |
| source = KnowledgeSource( |
| knowledge_id=knowledge_id, |
| goal=goal[:100] if goal else "", |
| quality_score=quality_score, |
| element=element, |
| created_at=created_at, |
| snippet=snippet[:200] if snippet else "" |
| ) |
| self.knowledge_sources.append(source) |
| return citation_num |
| |
| def add_crawl_source(self, url: str, title: str, |
| content_preview: str, crawled_at: str, |
| success: bool = True) -> int: |
| """ํฌ๋กค๋ง ์ถ์ฒ ์ถ๊ฐ""" |
| if url in self._citation_map: |
| return self._citation_map[url] |
| |
| self._citation_counter += 1 |
| citation_num = self._citation_counter |
| self._citation_map[url] = citation_num |
| |
| source = CrawlSource( |
| url=url, |
| title=title[:100] if title else url[:50], |
| content_preview=content_preview[:200] if content_preview else "", |
| crawled_at=crawled_at, |
| success=success |
| ) |
| self.crawl_sources.append(source) |
| return citation_num |
| |
| def add_from_search_result(self, search_result: Dict, element: str): |
| """Brave ๊ฒ์ ๊ฒฐ๊ณผ์์ ์ถ์ฒ ์ผ๊ด ์ถ๊ฐ""" |
| if not search_result or not search_result.get("success"): |
| return |
| |
| for result in search_result.get("results", []): |
| self.add_web_source( |
| title=result.get("title", ""), |
| url=result.get("url", ""), |
| snippet=result.get("description", ""), |
| element=element |
| ) |
| |
| |
| |
| def get_citation_number(self, url_or_id: str) -> Optional[int]: |
| """URL ๋๋ ID๋ก ์ธ์ฉ ๋ฒํธ ์กฐํ""" |
| return self._citation_map.get(url_or_id) |
| |
| def get_total_sources(self) -> Dict[str, int]: |
| """์ถ์ฒ ์ ํ๋ณ ๊ฐ์""" |
| return { |
| "web": len(self.web_sources), |
| "knowledge": len(self.knowledge_sources), |
| "crawl": len(self.crawl_sources), |
| "total": len(self.web_sources) + len(self.knowledge_sources) + len(self.crawl_sources) |
| } |
| |
| def get_sources_by_element(self) -> Dict[str, List]: |
| """์คํ ๋จ๊ณ๋ณ ์ฌ์ฉ๋ ์ถ์ฒ""" |
| by_element = {"ๅ": [], "้": [], "ๆฐด": [], "ๆจ": [], "็ซ": []} |
| |
| for src in self.web_sources: |
| if src.element in by_element: |
| by_element[src.element].append({ |
| "type": "web", |
| "title": src.title, |
| "url": src.url |
| }) |
| |
| for src in self.knowledge_sources: |
| if src.element in by_element: |
| by_element[src.element].append({ |
| "type": "knowledge", |
| "id": src.knowledge_id, |
| "quality": src.quality_score |
| }) |
| |
| return by_element |
| |
| |
| |
| def format_references_section(self, language: str = "EN") -> str: |
| """์ต์ข
๋ณด๊ณ ์์ฉ ์ถ์ฒ ์น์
์์ฑ""" |
| if language == "KR": |
| return self._format_references_kr() |
| return self._format_references_en() |
| |
| def _format_references_en(self) -> str: |
| """์์ด ์ถ์ฒ ์น์
""" |
| lines = ["## ๐ Sources & References", ""] |
| |
| total = self.get_total_sources() |
| if total["total"] == 0: |
| lines.append("*No external sources used in this analysis.*") |
| return "\n".join(lines) |
| |
| |
| lines.append(f"**Total Sources**: {total['total']} " |
| f"(Web: {total['web']}, Knowledge DB: {total['knowledge']}, Crawl: {total['crawl']})") |
| lines.append("") |
| |
| |
| if self.web_sources: |
| lines.append("### ๐ Web Sources") |
| lines.append("| # | Title | Source | Element |") |
| lines.append("|---|-------|--------|---------|") |
| for i, src in enumerate(self.web_sources[:10], 1): |
| domain = self._extract_domain(src.url) |
| lines.append(f"| [{i}] | {src.title[:40]}{'...' if len(src.title) > 40 else ''} | [{domain}]({src.url}) | {src.element} |") |
| if len(self.web_sources) > 10: |
| lines.append(f"| ... | *+{len(self.web_sources) - 10} more sources* | | |") |
| lines.append("") |
| |
| |
| if self.knowledge_sources: |
| lines.append("### ๐ง Knowledge Database") |
| lines.append("| # | Related Goal | Quality | Element |") |
| lines.append("|---|--------------|---------|---------|") |
| for i, src in enumerate(self.knowledge_sources[:5], 1): |
| quality_bar = 'โ' * int(src.quality_score * 5) + 'โ' * (5 - int(src.quality_score * 5)) |
| lines.append(f"| [K{i}] | {src.goal[:35]}{'...' if len(src.goal) > 35 else ''} | {quality_bar} {src.quality_score:.0%} | {src.element} |") |
| lines.append("") |
| |
| |
| if self.crawl_sources: |
| lines.append("### ๐ Crawled Pages") |
| for i, src in enumerate(self.crawl_sources[:5], 1): |
| status = "โ
" if src.success else "โ ๏ธ" |
| lines.append(f"- {status} [{src.title[:50]}]({src.url})") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| def _format_references_kr(self) -> str: |
| """ํ๊ตญ์ด ์ถ์ฒ ์น์
""" |
| lines = ["## ๐ ์ถ์ฒ ๋ฐ ์ฐธ๊ณ ์๋ฃ", ""] |
| |
| total = self.get_total_sources() |
| if total["total"] == 0: |
| lines.append("*์ด ๋ถ์์ ์ฌ์ฉ๋ ์ธ๋ถ ์ถ์ฒ๊ฐ ์์ต๋๋ค.*") |
| return "\n".join(lines) |
| |
| |
| lines.append(f"**์ด ์ถ์ฒ**: {total['total']}๊ฑด " |
| f"(์น: {total['web']}, ์ง์DB: {total['knowledge']}, ํฌ๋กค๋ง: {total['crawl']})") |
| lines.append("") |
| |
| |
| if self.web_sources: |
| lines.append("### ๐ ์น ๊ฒ์ ์ถ์ฒ") |
| lines.append("| # | ์ ๋ชฉ | ์ถ์ฒ | ๋จ๊ณ |") |
| lines.append("|---|------|------|------|") |
| for i, src in enumerate(self.web_sources[:10], 1): |
| domain = self._extract_domain(src.url) |
| element_name = {"ๅ": "๊ฐ๋
", "้": "๋นํ", "ๆฐด": "๋ฆฌ์์น", "ๆจ": "์ฐฝ๋ฐ", "็ซ": "์คํ"}.get(src.element, src.element) |
| lines.append(f"| [{i}] | {src.title[:40]}{'...' if len(src.title) > 40 else ''} | [{domain}]({src.url}) | {src.element}({element_name}) |") |
| if len(self.web_sources) > 10: |
| lines.append(f"| ... | *+{len(self.web_sources) - 10}๊ฑด ์ถ๊ฐ* | | |") |
| lines.append("") |
| |
| |
| if self.knowledge_sources: |
| lines.append("### ๐ง ์ง์ ๋ฐ์ดํฐ๋ฒ ์ด์ค") |
| lines.append("| # | ๊ด๋ จ ๋ชฉํ | ํ์ง | ๋จ๊ณ |") |
| lines.append("|---|----------|------|------|") |
| for i, src in enumerate(self.knowledge_sources[:5], 1): |
| quality_bar = 'โ' * int(src.quality_score * 5) + 'โ' * (5 - int(src.quality_score * 5)) |
| lines.append(f"| [K{i}] | {src.goal[:35]}{'...' if len(src.goal) > 35 else ''} | {quality_bar} {src.quality_score:.0%} | {src.element} |") |
| lines.append("") |
| |
| |
| if self.crawl_sources: |
| lines.append("### ๐ ํฌ๋กค๋ง ํ์ด์ง") |
| for i, src in enumerate(self.crawl_sources[:5], 1): |
| status = "โ
" if src.success else "โ ๏ธ" |
| lines.append(f"- {status} [{src.title[:50]}]({src.url})") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| def format_inline_citation(self, url_or_id: str) -> str: |
| """๋ณธ๋ฌธ ๋ด ์ธ๋ผ์ธ ์ธ์ฉ ํ์ ์์ฑ [1], [K2] ๋ฑ""" |
| num = self._citation_map.get(url_or_id) |
| if num is None: |
| return "" |
| |
| |
| for src in self.knowledge_sources: |
| if src.knowledge_id == url_or_id: |
| return f"[K{num}]" |
| |
| return f"[{num}]" |
| |
| def _extract_domain(self, url: str) -> str: |
| """URL์์ ๋๋ฉ์ธ ์ถ์ถ""" |
| try: |
| from urllib.parse import urlparse |
| parsed = urlparse(url) |
| domain = parsed.netloc |
| |
| if domain.startswith("www."): |
| domain = domain[4:] |
| return domain[:30] |
| except: |
| return url[:30] |
| |
| def to_dict(self) -> Dict: |
| """์ง๋ ฌํ์ฉ ๋์
๋๋ฆฌ ๋ณํ""" |
| return { |
| "web_sources": [ |
| {"title": s.title, "url": s.url, "element": s.element, "snippet": s.snippet} |
| for s in self.web_sources |
| ], |
| "knowledge_sources": [ |
| {"id": s.knowledge_id, "goal": s.goal, "quality": s.quality_score, "element": s.element} |
| for s in self.knowledge_sources |
| ], |
| "crawl_sources": [ |
| {"url": s.url, "title": s.title, "success": s.success} |
| for s in self.crawl_sources |
| ], |
| "total": self.get_total_sources() |
| } |
|
|
|
|
| |
|
|
| class TypedOutputFormatter: |
| """ |
| ์ง๋ฌธ ์ ํ๋ณ ๋ง์ถค ์ถ๋ ฅ ํฌ๋งทํฐ |
| |
| ์์ธก, ๋น๊ต, ์ ๋ต, ๋ถ์, ๋ฐ๋ช
, ์คํ ๋ฆฌ, ๋ ์ํผ ๋ฑ |
| ๊ฐ ์ ํ์ ์ต์ ํ๋ ์ถ๋ ฅ ๊ตฌ์กฐ๋ฅผ ์ ๊ณตํฉ๋๋ค. |
| """ |
| |
| |
| TYPE_ICONS = { |
| "prediction": "๐ฎ", |
| "comparison": "โ๏ธ", |
| "strategy": "๐ฏ", |
| "analysis": "๐ฌ", |
| "invention": "๐ก", |
| "story": "๐", |
| "recipe": "๐ณ", |
| "general": "๐" |
| } |
| |
| |
| TYPE_NAMES_KR = { |
| "prediction": "์์ธก ๋ถ์", |
| "comparison": "๋น๊ต ๋ถ์", |
| "strategy": "์ ๋ต ์๋ฆฝ", |
| "analysis": "์ฌ์ธต ๋ถ์", |
| "invention": "๋ฐ๋ช
/ํนํ", |
| "story": "์คํ ๋ฆฌ/์ฐฝ์", |
| "recipe": "๋ ์ํผ/์๋ฆฌ", |
| "general": "์ผ๋ฐ ๋ถ์" |
| } |
| |
| TYPE_NAMES_EN = { |
| "prediction": "Prediction Analysis", |
| "comparison": "Comparative Analysis", |
| "strategy": "Strategic Planning", |
| "analysis": "In-depth Analysis", |
| "invention": "Invention/Patent", |
| "story": "Story/Creative", |
| "recipe": "Recipe/Cooking", |
| "general": "General Analysis" |
| } |
| |
| @classmethod |
| def format(cls, question_type: str, data: Dict, language: str = "EN") -> str: |
| """์ง๋ฌธ ์ ํ์ ๋ง๋ ํฌ๋งท ์ ์ฉ""" |
| formatters = { |
| "prediction": cls.format_prediction, |
| "comparison": cls.format_comparison, |
| "strategy": cls.format_strategy, |
| "analysis": cls.format_analysis, |
| "invention": cls.format_invention, |
| "story": cls.format_story, |
| "recipe": cls.format_recipe, |
| "general": cls.format_general |
| } |
| |
| formatter = formatters.get(question_type, cls.format_general) |
| return formatter(data, language) |
| |
| @classmethod |
| def get_type_header(cls, question_type: str, language: str = "EN") -> str: |
| """์ง๋ฌธ ์ ํ ํค๋ ์์ฑ""" |
| icon = cls.TYPE_ICONS.get(question_type, "๐") |
| if language == "KR": |
| name = cls.TYPE_NAMES_KR.get(question_type, "์ผ๋ฐ ๋ถ์") |
| return f"{icon} **๋ถ์ ์ ํ**: {name}" |
| else: |
| name = cls.TYPE_NAMES_EN.get(question_type, "General Analysis") |
| return f"{icon} **Analysis Type**: {name}" |
| |
| |
| |
| @classmethod |
| def format_prediction(cls, data: Dict, language: str = "EN") -> str: |
| """์์ธก ๋ถ์์ฉ ํฌ๋งท - ์๋๋ฆฌ์ค, ํ๋ฅ , ๋ณ์""" |
| if language == "KR": |
| return cls._format_prediction_kr(data) |
| return cls._format_prediction_en(data) |
| |
| @classmethod |
| def _format_prediction_en(cls, data: Dict) -> str: |
| lines = ["### ๐ฎ Prediction Framework", ""] |
| |
| |
| scenarios = data.get("scenarios", []) |
| if scenarios: |
| lines.append("#### Scenario Analysis") |
| lines.append("| Scenario | Probability | Key Drivers | Timeline |") |
| lines.append("|----------|-------------|-------------|----------|") |
| for i, s in enumerate(scenarios[:4], 1): |
| prob = s.get("probability", "?") |
| drivers = s.get("drivers", "-")[:30] |
| timeline = s.get("timeline", "-") |
| lines.append(f"| **{s.get('name', f'Scenario {i}')}** | {prob} | {drivers} | {timeline} |") |
| lines.append("") |
| |
| |
| variables = data.get("key_variables", []) |
| if variables: |
| lines.append("#### Key Variables & Uncertainties") |
| for v in variables[:5]: |
| impact = v.get("impact", "medium") |
| icon = "๐ด" if impact == "high" else "๐ก" if impact == "medium" else "๐ข" |
| lines.append(f"- {icon} **{v.get('name', 'Variable')}**: {v.get('description', '')[:60]}") |
| lines.append("") |
| |
| |
| confidence = data.get("prediction_confidence", 0) |
| if confidence: |
| bar = 'โ' * int(confidence * 10) + 'โ' * (10 - int(confidence * 10)) |
| lines.append(f"**Prediction Confidence**: {bar} {confidence:.0%}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| @classmethod |
| def _format_prediction_kr(cls, data: Dict) -> str: |
| lines = ["### ๐ฎ ์์ธก ํ๋ ์์ํฌ", ""] |
| |
| |
| scenarios = data.get("scenarios", []) |
| if scenarios: |
| lines.append("#### ์๋๋ฆฌ์ค ๋ถ์") |
| lines.append("| ์๋๋ฆฌ์ค | ํ๋ฅ | ํต์ฌ ๋์ธ | ์์ |") |
| lines.append("|----------|------|----------|------|") |
| for i, s in enumerate(scenarios[:4], 1): |
| prob = s.get("probability", "?") |
| drivers = s.get("drivers", "-")[:30] |
| timeline = s.get("timeline", "-") |
| lines.append(f"| **{s.get('name', f'์๋๋ฆฌ์ค {i}')}** | {prob} | {drivers} | {timeline} |") |
| lines.append("") |
| |
| |
| variables = data.get("key_variables", []) |
| if variables: |
| lines.append("#### ํต์ฌ ๋ณ์ ๋ฐ ๋ถํ์ค์ฑ") |
| for v in variables[:5]: |
| impact = v.get("impact", "medium") |
| icon = "๐ด" if impact == "high" else "๐ก" if impact == "medium" else "๐ข" |
| lines.append(f"- {icon} **{v.get('name', '๋ณ์')}**: {v.get('description', '')[:60]}") |
| lines.append("") |
| |
| |
| confidence = data.get("prediction_confidence", 0) |
| if confidence: |
| bar = 'โ' * int(confidence * 10) + 'โ' * (10 - int(confidence * 10)) |
| lines.append(f"**์์ธก ์ ๋ขฐ๋**: {bar} {confidence:.0%}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| |
| |
| @classmethod |
| def format_comparison(cls, data: Dict, language: str = "EN") -> str: |
| """๋น๊ต ๋ถ์์ฉ ํฌ๋งท - ๋งคํธ๋ฆญ์ค, ์ฅ๋จ์ , ๊ถ์ฅ""" |
| if language == "KR": |
| return cls._format_comparison_kr(data) |
| return cls._format_comparison_en(data) |
| |
| @classmethod |
| def _format_comparison_en(cls, data: Dict) -> str: |
| lines = ["### โ๏ธ Comparison Matrix", ""] |
| |
| |
| items = data.get("comparison_items", []) |
| criteria = data.get("criteria", []) |
| |
| if items and criteria: |
| |
| header = "| Criteria |" |
| separator = "|----------|" |
| for item in items[:4]: |
| header += f" {item.get('name', 'Option')[:15]} |" |
| separator += "--------|" |
| lines.append(header) |
| lines.append(separator) |
| |
| |
| for crit in criteria[:6]: |
| row = f"| **{crit.get('name', 'Criterion')[:20]}** |" |
| for item in items[:4]: |
| score = item.get("scores", {}).get(crit.get("name", ""), "-") |
| if isinstance(score, (int, float)): |
| score = f"{'โญ' * int(score)}" if score <= 5 else str(score) |
| row += f" {str(score)[:10]} |" |
| lines.append(row) |
| lines.append("") |
| |
| |
| pros_cons = data.get("pros_cons", {}) |
| if pros_cons: |
| lines.append("#### Pros & Cons Summary") |
| for item_name, pc in pros_cons.items(): |
| lines.append(f"**{item_name}**") |
| if pc.get("pros"): |
| lines.append(f" - โ
Pros: {', '.join(pc['pros'][:3])}") |
| if pc.get("cons"): |
| lines.append(f" - โ Cons: {', '.join(pc['cons'][:3])}") |
| lines.append("") |
| |
| |
| recommendation = data.get("recommendation", "") |
| if recommendation: |
| lines.append(f"#### ๐ก Recommendation") |
| lines.append(f"> {recommendation}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| @classmethod |
| def _format_comparison_kr(cls, data: Dict) -> str: |
| lines = ["### โ๏ธ ๋น๊ต ๋งคํธ๋ฆญ์ค", ""] |
| |
| |
| items = data.get("comparison_items", []) |
| criteria = data.get("criteria", []) |
| |
| if items and criteria: |
| header = "| ๊ธฐ์ค |" |
| separator = "|------|" |
| for item in items[:4]: |
| header += f" {item.get('name', '์ต์
')[:15]} |" |
| separator += "--------|" |
| lines.append(header) |
| lines.append(separator) |
| |
| for crit in criteria[:6]: |
| row = f"| **{crit.get('name', '๊ธฐ์ค')[:20]}** |" |
| for item in items[:4]: |
| score = item.get("scores", {}).get(crit.get("name", ""), "-") |
| if isinstance(score, (int, float)): |
| score = f"{'โญ' * int(score)}" if score <= 5 else str(score) |
| row += f" {str(score)[:10]} |" |
| lines.append(row) |
| lines.append("") |
| |
| |
| pros_cons = data.get("pros_cons", {}) |
| if pros_cons: |
| lines.append("#### ์ฅ๋จ์ ์์ฝ") |
| for item_name, pc in pros_cons.items(): |
| lines.append(f"**{item_name}**") |
| if pc.get("pros"): |
| lines.append(f" - โ
์ฅ์ : {', '.join(pc['pros'][:3])}") |
| if pc.get("cons"): |
| lines.append(f" - โ ๋จ์ : {', '.join(pc['cons'][:3])}") |
| lines.append("") |
| |
| |
| recommendation = data.get("recommendation", "") |
| if recommendation: |
| lines.append(f"#### ๐ก ๊ถ์ฅ์ฌํญ") |
| lines.append(f"> {recommendation}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| |
| |
| @classmethod |
| def format_strategy(cls, data: Dict, language: str = "EN") -> str: |
| """์ ๋ต ๋ถ์์ฉ ํฌ๋งท - ๋ชฉํ, ๋ก๋๋งต, KPI""" |
| if language == "KR": |
| return cls._format_strategy_kr(data) |
| return cls._format_strategy_en(data) |
| |
| @classmethod |
| def _format_strategy_en(cls, data: Dict) -> str: |
| lines = ["### ๐ฏ Strategic Framework", ""] |
| |
| |
| objectives = data.get("objectives", []) |
| if objectives: |
| lines.append("#### Strategic Objectives") |
| for i, obj in enumerate(objectives[:5], 1): |
| lines.append(f"{i}. **{obj.get('name', 'Objective')}**: {obj.get('description', '')[:60]}") |
| lines.append("") |
| |
| |
| roadmap = data.get("roadmap", []) |
| if roadmap: |
| lines.append("#### Implementation Roadmap") |
| lines.append("| Phase | Timeline | Key Actions | Deliverables |") |
| lines.append("|-------|----------|-------------|--------------|") |
| for phase in roadmap[:5]: |
| actions = phase.get("actions", ["-"]) |
| deliverables = phase.get("deliverables", ["-"]) |
| lines.append(f"| {phase.get('name', 'Phase')} | {phase.get('timeline', '-')} | {', '.join(actions[:2])} | {', '.join(deliverables[:2])} |") |
| lines.append("") |
| |
| |
| kpis = data.get("kpis", []) |
| if kpis: |
| lines.append("#### Key Performance Indicators") |
| for kpi in kpis[:5]: |
| target = kpi.get("target", "TBD") |
| lines.append(f"- ๐ **{kpi.get('name', 'KPI')}**: Target {target}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| @classmethod |
| def _format_strategy_kr(cls, data: Dict) -> str: |
| lines = ["### ๐ฏ ์ ๋ต ํ๋ ์์ํฌ", ""] |
| |
| |
| objectives = data.get("objectives", []) |
| if objectives: |
| lines.append("#### ์ ๋ต ๋ชฉํ") |
| for i, obj in enumerate(objectives[:5], 1): |
| lines.append(f"{i}. **{obj.get('name', '๋ชฉํ')}**: {obj.get('description', '')[:60]}") |
| lines.append("") |
| |
| |
| roadmap = data.get("roadmap", []) |
| if roadmap: |
| lines.append("#### ์คํ ๋ก๋๋งต") |
| lines.append("| ๋จ๊ณ | ๊ธฐ๊ฐ | ์ฃผ์ ํ๋ | ์ฐ์ถ๋ฌผ |") |
| lines.append("|------|------|----------|--------|") |
| for phase in roadmap[:5]: |
| actions = phase.get("actions", ["-"]) |
| deliverables = phase.get("deliverables", ["-"]) |
| lines.append(f"| {phase.get('name', '๋จ๊ณ')} | {phase.get('timeline', '-')} | {', '.join(actions[:2])} | {', '.join(deliverables[:2])} |") |
| lines.append("") |
| |
| |
| kpis = data.get("kpis", []) |
| if kpis: |
| lines.append("#### ํต์ฌ ์ฑ๊ณผ ์งํ (KPI)") |
| for kpi in kpis[:5]: |
| target = kpi.get("target", "TBD") |
| lines.append(f"- ๐ **{kpi.get('name', 'KPI')}**: ๋ชฉํ {target}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| |
| |
| @classmethod |
| def format_analysis(cls, data: Dict, language: str = "EN") -> str: |
| """์ผ๋ฐ ๋ถ์์ฉ ํฌ๋งท""" |
| if language == "KR": |
| return cls._format_analysis_kr(data) |
| return cls._format_analysis_en(data) |
| |
| @classmethod |
| def _format_analysis_en(cls, data: Dict) -> str: |
| lines = ["### ๐ฌ Analysis Framework", ""] |
| |
| |
| findings = data.get("findings", []) |
| if findings: |
| lines.append("#### Key Findings") |
| for i, f in enumerate(findings[:5], 1): |
| lines.append(f"{i}. {f}") |
| lines.append("") |
| |
| |
| causes = data.get("causes", []) |
| if causes: |
| lines.append("#### Root Cause Analysis") |
| for c in causes[:4]: |
| lines.append(f"- **{c.get('cause', 'Cause')}** โ {c.get('effect', 'Effect')[:50]}") |
| lines.append("") |
| |
| |
| data_points = data.get("data_points", []) |
| if data_points: |
| lines.append("#### Supporting Data") |
| for dp in data_points[:5]: |
| lines.append(f"- ๐ {dp}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| @classmethod |
| def _format_analysis_kr(cls, data: Dict) -> str: |
| lines = ["### ๐ฌ ๋ถ์ ํ๋ ์์ํฌ", ""] |
| |
| |
| findings = data.get("findings", []) |
| if findings: |
| lines.append("#### ์ฃผ์ ๋ฐ๊ฒฌ์ฌํญ") |
| for i, f in enumerate(findings[:5], 1): |
| lines.append(f"{i}. {f}") |
| lines.append("") |
| |
| |
| causes = data.get("causes", []) |
| if causes: |
| lines.append("#### ์์ธ ๋ถ์") |
| for c in causes[:4]: |
| lines.append(f"- **{c.get('cause', '์์ธ')}** โ {c.get('effect', '๊ฒฐ๊ณผ')[:50]}") |
| lines.append("") |
| |
| |
| data_points = data.get("data_points", []) |
| if data_points: |
| lines.append("#### ๊ทผ๊ฑฐ ๋ฐ์ดํฐ") |
| for dp in data_points[:5]: |
| lines.append(f"- ๐ {dp}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| |
| |
| @classmethod |
| def format_invention(cls, data: Dict, language: str = "EN") -> str: |
| """๋ฐ๋ช
/ํนํ์ฉ ํฌ๋งท""" |
| if language == "KR": |
| lines = ["### ๐ก ๋ฐ๋ช
๊ฐ์", ""] |
| |
| if data.get("invention_name"): |
| lines.append(f"**๋ฐ๋ช
์ ๋ช
์นญ**: {data['invention_name']}") |
| lines.append("") |
| |
| if data.get("problem"): |
| lines.append("#### ํด๊ฒฐํ๊ณ ์ ํ๋ ๊ณผ์ ") |
| lines.append(data["problem"]) |
| lines.append("") |
| |
| if data.get("solution"): |
| lines.append("#### ๊ณผ์ ํด๊ฒฐ ์๋จ") |
| lines.append(data["solution"]) |
| lines.append("") |
| |
| claims = data.get("claims", []) |
| if claims: |
| lines.append("#### ์ฒญ๊ตฌํญ ์ด์") |
| for i, claim in enumerate(claims[:5], 1): |
| lines.append(f"**์ฒญ๊ตฌํญ {i}**: {claim}") |
| lines.append("") |
| |
| if data.get("advantages"): |
| lines.append("#### ๋ฐ๋ช
์ ํจ๊ณผ") |
| for adv in data["advantages"][:5]: |
| lines.append(f"- {adv}") |
| lines.append("") |
| else: |
| lines = ["### ๐ก Invention Overview", ""] |
| |
| if data.get("invention_name"): |
| lines.append(f"**Title**: {data['invention_name']}") |
| lines.append("") |
| |
| if data.get("problem"): |
| lines.append("#### Problem to Solve") |
| lines.append(data["problem"]) |
| lines.append("") |
| |
| if data.get("solution"): |
| lines.append("#### Solution") |
| lines.append(data["solution"]) |
| lines.append("") |
| |
| claims = data.get("claims", []) |
| if claims: |
| lines.append("#### Draft Claims") |
| for i, claim in enumerate(claims[:5], 1): |
| lines.append(f"**Claim {i}**: {claim}") |
| lines.append("") |
| |
| if data.get("advantages"): |
| lines.append("#### Advantages") |
| for adv in data["advantages"][:5]: |
| lines.append(f"- {adv}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| |
| |
| @classmethod |
| def format_story(cls, data: Dict, language: str = "EN") -> str: |
| """์คํ ๋ฆฌ/์ฐฝ์์ฉ ํฌ๋งท""" |
| if language == "KR": |
| lines = ["### ๐ ์คํ ๋ฆฌ ๊ตฌ์กฐ", ""] |
| |
| if data.get("logline"): |
| lines.append(f"**๋ก๊ทธ๋ผ์ธ**: {data['logline']}") |
| lines.append("") |
| |
| characters = data.get("characters", []) |
| if characters: |
| lines.append("#### ๋ฑ์ฅ์ธ๋ฌผ") |
| for char in characters[:5]: |
| lines.append(f"- **{char.get('name', '์บ๋ฆญํฐ')}**: {char.get('description', '')[:60]}") |
| lines.append("") |
| |
| structure = data.get("structure", {}) |
| if structure: |
| lines.append("#### ํ๋กฏ ๊ตฌ์กฐ") |
| for key, value in [("setup", "๋ฐ๋จ"), ("conflict", "๊ฐ๋ฑ"), ("climax", "์ ์ "), ("resolution", "๊ฒฐ๋ง")]: |
| if structure.get(key): |
| lines.append(f"- **{value}**: {structure[key][:80]}") |
| lines.append("") |
| |
| if data.get("theme"): |
| lines.append(f"#### ์ฃผ์ ") |
| lines.append(data["theme"]) |
| lines.append("") |
| else: |
| lines = ["### ๐ Story Structure", ""] |
| |
| if data.get("logline"): |
| lines.append(f"**Logline**: {data['logline']}") |
| lines.append("") |
| |
| characters = data.get("characters", []) |
| if characters: |
| lines.append("#### Characters") |
| for char in characters[:5]: |
| lines.append(f"- **{char.get('name', 'Character')}**: {char.get('description', '')[:60]}") |
| lines.append("") |
| |
| structure = data.get("structure", {}) |
| if structure: |
| lines.append("#### Plot Structure") |
| for key in ["setup", "conflict", "climax", "resolution"]: |
| if structure.get(key): |
| lines.append(f"- **{key.capitalize()}**: {structure[key][:80]}") |
| lines.append("") |
| |
| if data.get("theme"): |
| lines.append(f"#### Theme") |
| lines.append(data["theme"]) |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| |
| |
| @classmethod |
| def format_recipe(cls, data: Dict, language: str = "EN") -> str: |
| """๋ ์ํผ์ฉ ํฌ๋งท""" |
| if language == "KR": |
| lines = ["### ๐ณ ๋ ์ํผ", ""] |
| |
| if data.get("dish_name"): |
| lines.append(f"**์๋ฆฌ๋ช
**: {data['dish_name']}") |
| if data.get("servings"): |
| lines.append(f"**๋ถ๋**: {data['servings']}") |
| if data.get("time"): |
| lines.append(f"**์กฐ๋ฆฌ์๊ฐ**: {data['time']}") |
| lines.append("") |
| |
| ingredients = data.get("ingredients", []) |
| if ingredients: |
| lines.append("#### ์ฌ๋ฃ") |
| for ing in ingredients: |
| lines.append(f"- {ing}") |
| lines.append("") |
| |
| steps = data.get("steps", []) |
| if steps: |
| lines.append("#### ์กฐ๋ฆฌ ์์") |
| for i, step in enumerate(steps, 1): |
| lines.append(f"{i}. {step}") |
| lines.append("") |
| |
| tips = data.get("tips", []) |
| if tips: |
| lines.append("#### ๐ก ์๋ฆฌ ํ") |
| for tip in tips[:3]: |
| lines.append(f"- {tip}") |
| lines.append("") |
| else: |
| lines = ["### ๐ณ Recipe", ""] |
| |
| if data.get("dish_name"): |
| lines.append(f"**Dish**: {data['dish_name']}") |
| if data.get("servings"): |
| lines.append(f"**Servings**: {data['servings']}") |
| if data.get("time"): |
| lines.append(f"**Time**: {data['time']}") |
| lines.append("") |
| |
| ingredients = data.get("ingredients", []) |
| if ingredients: |
| lines.append("#### Ingredients") |
| for ing in ingredients: |
| lines.append(f"- {ing}") |
| lines.append("") |
| |
| steps = data.get("steps", []) |
| if steps: |
| lines.append("#### Instructions") |
| for i, step in enumerate(steps, 1): |
| lines.append(f"{i}. {step}") |
| lines.append("") |
| |
| tips = data.get("tips", []) |
| if tips: |
| lines.append("#### ๐ก Tips") |
| for tip in tips[:3]: |
| lines.append(f"- {tip}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| |
| |
| @classmethod |
| def format_general(cls, data: Dict, language: str = "EN") -> str: |
| """์ผ๋ฐ ๋ถ์์ฉ ๊ธฐ๋ณธ ํฌ๋งท""" |
| lines = [] |
| |
| |
| main_content = data.get("main_content", data.get("result", "")) |
| if main_content: |
| lines.append(main_content) |
| lines.append("") |
| |
| |
| key_points = data.get("key_points", data.get("deliverables", [])) |
| if key_points: |
| header = "#### ํต์ฌ ํฌ์ธํธ" if language == "KR" else "#### Key Points" |
| lines.append(header) |
| for point in key_points[:5]: |
| lines.append(f"- {point}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| |
| |
| @classmethod |
| def extract_typed_data(cls, element_outputs: Dict, question_type: str) -> Dict: |
| """ |
| Element ์ถ๋ ฅ์์ ์ง๋ฌธ ์ ํ์ ๋ง๋ ๊ตฌ์กฐํ๋ ๋ฐ์ดํฐ ์ถ์ถ |
| |
| ์ค์ LLM ์๋ต์์ ํจํด์ ์ธ์ํ์ฌ ๊ตฌ์กฐํํฉ๋๋ค. |
| """ |
| typed_data = {} |
| |
| fire_data = element_outputs.get("็ซ", {}) |
| water_data = element_outputs.get("ๆฐด", {}) |
| wood_data = element_outputs.get("ๆจ", {}) |
| metal_data = element_outputs.get("้", {}) |
| |
| |
| typed_data["main_content"] = fire_data.get("result", "") |
| typed_data["key_points"] = fire_data.get("deliverables", []) |
| typed_data["findings"] = water_data.get("findings", []) |
| typed_data["risks"] = metal_data.get("risks", []) |
| |
| |
| if question_type == "prediction": |
| |
| result_text = str(fire_data.get("result", "")) |
| scenarios = cls._extract_scenarios(result_text) |
| if scenarios: |
| typed_data["scenarios"] = scenarios |
| |
| |
| variables = [] |
| for risk in metal_data.get("risks", []): |
| variables.append({ |
| "name": str(risk)[:30], |
| "description": str(risk), |
| "impact": "high" if any(kw in str(risk).lower() for kw in ["์ค์", "ํต์ฌ", "critical", "major"]) else "medium" |
| }) |
| typed_data["key_variables"] = variables |
| |
| |
| elif question_type == "comparison": |
| |
| typed_data["comparison_items"] = [] |
| typed_data["criteria"] = [] |
| typed_data["recommendation"] = wood_data.get("key_insight", "") |
| |
| |
| elif question_type == "strategy": |
| |
| objectives = [] |
| for idea in wood_data.get("ideas", []): |
| objectives.append({ |
| "name": str(idea)[:50], |
| "description": str(idea) |
| }) |
| typed_data["objectives"] = objectives |
| typed_data["kpis"] = [] |
| |
| return typed_data |
| |
| @classmethod |
| def _extract_scenarios(cls, text: str) -> List[Dict]: |
| """ํ
์คํธ์์ ์๋๋ฆฌ์ค ํจํด ์ถ์ถ""" |
| scenarios = [] |
| |
| |
| patterns = [ |
| r'์๋๋ฆฌ์ค\s*[A-Za-z0-9๊ฐ-ํฃ]+[:\s]+(.{10,100})', |
| r'Scenario\s*[A-Za-z0-9]+[:\s]+(.{10,100})', |
| r'(\d+)\s*[%]?\s*ํ๋ฅ [๋ก์ผ]?\s+(.{10,80})', |
| r'(\d+)\s*[%]\s*probability[:\s]+(.{10,80})', |
| ] |
| |
| for pattern in patterns: |
| matches = re.findall(pattern, text, re.IGNORECASE) |
| for match in matches[:4]: |
| if isinstance(match, tuple): |
| scenarios.append({ |
| "name": f"Scenario", |
| "probability": match[0] if match[0].isdigit() else "?", |
| "description": match[-1] if len(match) > 1 else match[0], |
| "drivers": "-", |
| "timeline": "-" |
| }) |
| else: |
| scenarios.append({ |
| "name": "Scenario", |
| "description": match, |
| "probability": "?", |
| "drivers": "-", |
| "timeline": "-" |
| }) |
| |
| return scenarios[:4] |
|
|
|
|
| |
|
|
| class ProfessionalReportGenerator: |
| """ |
| ์ ๋ฌธ ๋ณด๊ณ ์ ์์ฑ๊ธฐ (ReportGenerator ๋์ฒด) |
| |
| ๊ธฐ์กด ReportGenerator๋ฅผ ํ์ฅํ์ฌ: |
| - Executive Summary ์ถ๊ฐ |
| - ์ง๋ฌธ ์ ํ๋ณ ๋ง์ถค ํฌ๋งท |
| - ์ถ์ฒ ๊ด๋ฆฌ ํตํฉ |
| - ํ์ง ๋์๋ณด๋ ์๋จ ๋ฐฐ์น |
| """ |
| |
| def __init__(self, source_manager: Optional[SourceManager] = None): |
| self.source_manager = source_manager or SourceManager() |
| |
| def generate(self, state, stats: Dict, source_manager: Optional[SourceManager] = None) -> str: |
| """ |
| ์ ๋ฌธ ๋ณด๊ณ ์ ์์ฑ |
| |
| Args: |
| state: CycleState ๊ฐ์ฒด |
| stats: ๋ฉ๋ชจ๋ฆฌ ํต๊ณ |
| source_manager: ์ถ์ฒ ๊ด๋ฆฌ์ (์ ํ) |
| |
| Returns: |
| ๋งํฌ๋ค์ด ํ์์ ์ ๋ฌธ ๋ณด๊ณ ์ |
| """ |
| if source_manager: |
| self.source_manager = source_manager |
| |
| language = getattr(state, 'language', 'EN') |
| |
| |
| goal = state.goal |
| score = state.satisfaction_score |
| iterations = state.iteration |
| element_outputs = state.element_outputs |
| question_type = getattr(state, 'goal_clarity', None) |
| if question_type and hasattr(question_type, 'goal_type'): |
| question_type = question_type.goal_type |
| else: |
| question_type = self._detect_question_type(goal) |
| |
| |
| earth_data = element_outputs.get("ๅ", {}) |
| metal_data = element_outputs.get("้", {}) |
| water_data = element_outputs.get("ๆฐด", {}) |
| wood_data = element_outputs.get("ๆจ", {}) |
| fire_data = element_outputs.get("็ซ", {}) |
| |
| |
| metacog_assessments = getattr(state, 'metacog_assessments', []) |
| |
| |
| if language == "KR": |
| return self._generate_korean( |
| goal, score, iterations, question_type, |
| earth_data, metal_data, water_data, wood_data, fire_data, |
| metacog_assessments, state.session_id |
| ) |
| else: |
| return self._generate_english( |
| goal, score, iterations, question_type, |
| earth_data, metal_data, water_data, wood_data, fire_data, |
| metacog_assessments, state.session_id |
| ) |
| |
| def _generate_english(self, goal, score, iterations, question_type, |
| earth_data, metal_data, water_data, wood_data, fire_data, |
| metacog_assessments, session_id) -> str: |
| """์์ด ๋ณด๊ณ ์ ์์ฑ""" |
| sections = [] |
| |
| |
| sections.append(self._header_section_en(goal, question_type)) |
| |
| |
| sections.append(self._executive_summary_en( |
| goal, score, iterations, fire_data, wood_data, metacog_assessments |
| )) |
| |
| |
| sections.append(self._quality_dashboard_en(metacog_assessments, score)) |
| |
| |
| sections.append(self._main_result_en(fire_data, earth_data)) |
| |
| |
| typed_data = TypedOutputFormatter.extract_typed_data( |
| {"ๅ": earth_data, "้": metal_data, "ๆฐด": water_data, "ๆจ": wood_data, "็ซ": fire_data}, |
| question_type |
| ) |
| type_section = TypedOutputFormatter.format(question_type, typed_data, "EN") |
| if type_section.strip(): |
| sections.append(type_section) |
| |
| |
| sections.append(self._insights_section_en(wood_data)) |
| |
| |
| sections.append(self._facts_risks_section_en(metal_data)) |
| |
| |
| sections.append(self._findings_section_en(water_data)) |
| |
| |
| sections.append(self.source_manager.format_references_section("EN")) |
| |
| |
| sections.append(self._metadata_section_en(session_id, iterations, score)) |
| |
| return "\n".join(filter(None, sections)) |
| |
| def _generate_korean(self, goal, score, iterations, question_type, |
| earth_data, metal_data, water_data, wood_data, fire_data, |
| metacog_assessments, session_id) -> str: |
| """ํ๊ตญ์ด ๋ณด๊ณ ์ ์์ฑ""" |
| sections = [] |
| |
| |
| sections.append(self._header_section_kr(goal, question_type)) |
| |
| |
| sections.append(self._executive_summary_kr( |
| goal, score, iterations, fire_data, wood_data, metacog_assessments |
| )) |
| |
| |
| sections.append(self._quality_dashboard_kr(metacog_assessments, score)) |
| |
| |
| sections.append(self._main_result_kr(fire_data, earth_data)) |
| |
| |
| typed_data = TypedOutputFormatter.extract_typed_data( |
| {"ๅ": earth_data, "้": metal_data, "ๆฐด": water_data, "ๆจ": wood_data, "็ซ": fire_data}, |
| question_type |
| ) |
| type_section = TypedOutputFormatter.format(question_type, typed_data, "KR") |
| if type_section.strip(): |
| sections.append(type_section) |
| |
| |
| sections.append(self._insights_section_kr(wood_data)) |
| |
| |
| sections.append(self._facts_risks_section_kr(metal_data)) |
| |
| |
| sections.append(self._findings_section_kr(water_data)) |
| |
| |
| sections.append(self.source_manager.format_references_section("KR")) |
| |
| |
| sections.append(self._metadata_section_kr(session_id, iterations, score)) |
| |
| return "\n".join(filter(None, sections)) |
| |
| |
| |
| def _header_section_en(self, goal: str, question_type: str) -> str: |
| type_header = TypedOutputFormatter.get_type_header(question_type, "EN") |
| return f"""# ๐ฏ Analysis Report |
| |
| --- |
| |
| ## ๐ Question |
| > **{goal}** |
| |
| {type_header} |
| |
| --- |
| """ |
| |
| def _executive_summary_en(self, goal, score, iterations, fire_data, wood_data, metacog_assessments) -> str: |
| """Executive Summary ์น์
""" |
| lines = ["## ๐ Executive Summary", ""] |
| |
| |
| main_result = fire_data.get("result", "") |
| key_insight = wood_data.get("key_insight", wood_data.get("reframe", "")) |
| deliverables = fire_data.get("deliverables", []) |
| |
| conclusions = [] |
| if main_result: |
| |
| first_sentence = main_result.split('.')[0][:150] if main_result else "" |
| if first_sentence: |
| conclusions.append(first_sentence) |
| if key_insight: |
| conclusions.append(str(key_insight)[:100]) |
| if deliverables and len(conclusions) < 3: |
| for d in deliverables[:2]: |
| if len(conclusions) >= 3: |
| break |
| conclusions.append(str(d)[:80]) |
| |
| if conclusions: |
| lines.append("### ๐ฏ Key Conclusions") |
| for i, c in enumerate(conclusions[:3], 1): |
| lines.append(f"{i}. {c}{'...' if len(c) >= 80 else ''}") |
| lines.append("") |
| |
| |
| score_bar = 'โ' * int(score * 10) + 'โ' * (10 - int(score * 10)) |
| avg_quality = 0 |
| if metacog_assessments: |
| avg_quality = sum(a.overall_score for a in metacog_assessments) / len(metacog_assessments) |
| quality_bar = 'โ' * int(avg_quality * 10) + 'โ' * (10 - int(avg_quality * 10)) |
| |
| lines.append("### ๐ Analysis Metrics") |
| lines.append(f"| Metric | Score |") |
| lines.append(f"|--------|-------|") |
| lines.append(f"| Confidence | {score_bar} **{score:.0%}** |") |
| lines.append(f"| Quality | {quality_bar} **{avg_quality:.0%}** |") |
| lines.append(f"| Iterations | **{iterations}** cycles |") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| def _quality_dashboard_en(self, metacog_assessments, score) -> str: |
| """ํ์ง ๋์๋ณด๋ ์น์
""" |
| if not metacog_assessments: |
| return "" |
| |
| lines = ["## ๐ง Quality Dashboard", ""] |
| |
| |
| avg_factual = sum(a.factual_confidence for a in metacog_assessments) / len(metacog_assessments) |
| avg_logic = sum(a.logical_coherence for a in metacog_assessments) / len(metacog_assessments) |
| avg_complete = sum(a.completeness for a in metacog_assessments) / len(metacog_assessments) |
| avg_specific = sum(a.specificity for a in metacog_assessments) / len(metacog_assessments) |
| |
| lines.append("| Dimension | Score | Status |") |
| lines.append("|-----------|-------|--------|") |
| |
| for name, val in [ |
| ("Factual Grounding", avg_factual), |
| ("Logical Coherence", avg_logic), |
| ("Completeness", avg_complete), |
| ("Specificity", avg_specific) |
| ]: |
| bar = 'โ' * int(val * 10) + 'โ' * (10 - int(val * 10)) |
| status = "โ
" if val >= 0.7 else "โ ๏ธ" if val >= 0.5 else "โ" |
| lines.append(f"| {name} | {bar} {val:.0%} | {status} |") |
| |
| lines.append("") |
| |
| |
| all_recommendations = [] |
| for a in metacog_assessments[-3:]: |
| all_recommendations.extend(a.recommendations) |
| |
| if all_recommendations: |
| unique_recs = list(dict.fromkeys(all_recommendations))[:3] |
| lines.append("### ๐ก Improvement Suggestions") |
| for rec in unique_recs: |
| lines.append(f"- {rec}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| def _main_result_en(self, fire_data, earth_data) -> str: |
| """๋ฉ์ธ ๊ฒฐ๊ณผ ์น์
""" |
| lines = ["## ๐ก Main Analysis Result", ""] |
| |
| main_result = fire_data.get("result", "") |
| if main_result: |
| lines.append(main_result) |
| lines.append("") |
| else: |
| assessment = earth_data.get("assessment", "") |
| if assessment: |
| lines.append(assessment) |
| lines.append("") |
| else: |
| lines.append("*Analysis not completed. Please increase iteration count and try again.*") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| def _insights_section_en(self, wood_data) -> str: |
| """์ธ์ฌ์ดํธ ์น์
""" |
| lines = [] |
| |
| key_insight = wood_data.get("key_insight", wood_data.get("reframe", "")) |
| top3 = wood_data.get("selected_top3", wood_data.get("ideas", [])) |
| |
| if key_insight or top3: |
| lines.append("## ๐ Key Insights") |
| lines.append("") |
| |
| if key_insight: |
| lines.append(f"> ๐ก **Core Insight**: {key_insight}") |
| lines.append("") |
| |
| if top3: |
| lines.append("### Top Ideas") |
| for i, idea in enumerate(top3[:3], 1): |
| lines.append(f"**{i}.** {idea}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| def _facts_risks_section_en(self, metal_data) -> str: |
| """๊ฒ์ฆ๋ ์ฌ์ค ๋ฐ ๋ฆฌ์คํฌ ์น์
""" |
| lines = [] |
| |
| verified = metal_data.get("verified_facts", []) |
| risks = metal_data.get("risks", []) |
| critiques = metal_data.get("critiques", []) |
| |
| if verified or risks or critiques: |
| lines.append("## โ๏ธ Verification & Risks") |
| lines.append("") |
| |
| if verified: |
| lines.append("### โ
Verified Facts") |
| for fact in verified[:5]: |
| lines.append(f"- {fact}") |
| lines.append("") |
| |
| if risks: |
| lines.append("### โ ๏ธ Identified Risks") |
| for risk in risks[:4]: |
| lines.append(f"- ๐ธ {risk}") |
| lines.append("") |
| |
| if critiques: |
| lines.append("### ๐ Critical Points") |
| for critique in critiques[:3]: |
| lines.append(f"- {critique}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| def _findings_section_en(self, water_data) -> str: |
| """์ฐ๊ตฌ ๋ฐ๊ฒฌ ์น์
""" |
| lines = [] |
| |
| findings = water_data.get("findings", []) |
| key_data = water_data.get("key_data", []) |
| expert_views = water_data.get("expert_views", []) |
| |
| if findings or key_data: |
| lines.append("## ๐ Research Findings") |
| lines.append("") |
| |
| if findings: |
| lines.append("### Key Findings") |
| for finding in findings[:5]: |
| lines.append(f"- {finding}") |
| lines.append("") |
| |
| if key_data: |
| lines.append("### Supporting Data") |
| for data in key_data[:3]: |
| lines.append(f"- ๐ {data}") |
| lines.append("") |
| |
| if expert_views: |
| lines.append("### Expert Perspectives") |
| for view in expert_views[:2]: |
| lines.append(f"- ๐ฌ {view}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| def _metadata_section_en(self, session_id, iterations, score) -> str: |
| """๋ฉํ๋ฐ์ดํฐ ์น์
""" |
| from datetime import datetime |
| |
| return f"""--- |
| |
| ## ๐ Analysis Information |
| |
| | Item | Value | |
| |------|-------| |
| | Session ID | `{session_id}` | |
| | Iterations | {iterations} cycles | |
| | Final Confidence | {score:.0%} | |
| | Generated At | {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} | |
| | Engine | AETHER Proto-AGI v2.2 | |
| |
| --- |
| |
| *This report was generated by AETHER Proto-AGI (SOMA Five Elements ยท SLAI Self-Learning ยท MAIA Emergence)* |
| """ |
| |
| |
| |
| def _header_section_kr(self, goal: str, question_type: str) -> str: |
| type_header = TypedOutputFormatter.get_type_header(question_type, "KR") |
| return f"""# ๐ฏ ๋ถ์ ๋ณด๊ณ ์ |
| |
| --- |
| |
| ## ๐ ์ง๋ฌธ |
| > **{goal}** |
| |
| {type_header} |
| |
| --- |
| """ |
| |
| def _executive_summary_kr(self, goal, score, iterations, fire_data, wood_data, metacog_assessments) -> str: |
| """Executive Summary ์น์
(ํ๊ตญ์ด)""" |
| lines = ["## ๐ ํต์ฌ ์์ฝ (Executive Summary)", ""] |
| |
| |
| main_result = fire_data.get("result", "") |
| key_insight = wood_data.get("key_insight", wood_data.get("reframe", "")) |
| deliverables = fire_data.get("deliverables", []) |
| |
| conclusions = [] |
| if main_result: |
| first_sentence = main_result.split('.')[0][:150] if main_result else "" |
| if first_sentence: |
| conclusions.append(first_sentence) |
| if key_insight: |
| conclusions.append(str(key_insight)[:100]) |
| if deliverables and len(conclusions) < 3: |
| for d in deliverables[:2]: |
| if len(conclusions) >= 3: |
| break |
| conclusions.append(str(d)[:80]) |
| |
| if conclusions: |
| lines.append("### ๐ฏ ํต์ฌ ๊ฒฐ๋ก ") |
| for i, c in enumerate(conclusions[:3], 1): |
| lines.append(f"{i}. {c}{'...' if len(c) >= 80 else ''}") |
| lines.append("") |
| |
| |
| score_bar = 'โ' * int(score * 10) + 'โ' * (10 - int(score * 10)) |
| avg_quality = 0 |
| if metacog_assessments: |
| avg_quality = sum(a.overall_score for a in metacog_assessments) / len(metacog_assessments) |
| quality_bar = 'โ' * int(avg_quality * 10) + 'โ' * (10 - int(avg_quality * 10)) |
| |
| lines.append("### ๐ ๋ถ์ ์งํ") |
| lines.append(f"| ์งํ | ์ ์ |") |
| lines.append(f"|------|------|") |
| lines.append(f"| ์ ๋ขฐ๋ | {score_bar} **{score:.0%}** |") |
| lines.append(f"| ํ์ง | {quality_bar} **{avg_quality:.0%}** |") |
| lines.append(f"| ์ํ | **{iterations}**ํ |") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| def _quality_dashboard_kr(self, metacog_assessments, score) -> str: |
| """ํ์ง ๋์๋ณด๋ ์น์
(ํ๊ตญ์ด)""" |
| if not metacog_assessments: |
| return "" |
| |
| lines = ["## ๐ง ํ์ง ๋์๋ณด๋", ""] |
| |
| avg_factual = sum(a.factual_confidence for a in metacog_assessments) / len(metacog_assessments) |
| avg_logic = sum(a.logical_coherence for a in metacog_assessments) / len(metacog_assessments) |
| avg_complete = sum(a.completeness for a in metacog_assessments) / len(metacog_assessments) |
| avg_specific = sum(a.specificity for a in metacog_assessments) / len(metacog_assessments) |
| |
| lines.append("| ์ฐจ์ | ์ ์ | ์ํ |") |
| lines.append("|------|------|------|") |
| |
| for name, val in [ |
| ("์ฌ์ค ๊ทผ๊ฑฐ", avg_factual), |
| ("๋
ผ๋ฆฌ์ ์ผ๊ด์ฑ", avg_logic), |
| ("์์ฑ๋", avg_complete), |
| ("๊ตฌ์ฒด์ฑ", avg_specific) |
| ]: |
| bar = 'โ' * int(val * 10) + 'โ' * (10 - int(val * 10)) |
| status = "โ
" if val >= 0.7 else "โ ๏ธ" if val >= 0.5 else "โ" |
| lines.append(f"| {name} | {bar} {val:.0%} | {status} |") |
| |
| lines.append("") |
| |
| |
| all_recommendations = [] |
| for a in metacog_assessments[-3:]: |
| all_recommendations.extend(a.recommendations) |
| |
| if all_recommendations: |
| unique_recs = list(dict.fromkeys(all_recommendations))[:3] |
| lines.append("### ๐ก ๊ฐ์ ๊ถ์ฅ์ฌํญ") |
| for rec in unique_recs: |
| lines.append(f"- {rec}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| def _main_result_kr(self, fire_data, earth_data) -> str: |
| """๋ฉ์ธ ๊ฒฐ๊ณผ ์น์
(ํ๊ตญ์ด)""" |
| lines = ["## ๐ก ํต์ฌ ๋ถ์ ๊ฒฐ๊ณผ", ""] |
| |
| main_result = fire_data.get("result", "") |
| if main_result: |
| lines.append(main_result) |
| lines.append("") |
| else: |
| assessment = earth_data.get("assessment", "") |
| if assessment: |
| lines.append(assessment) |
| lines.append("") |
| else: |
| lines.append("*๋ถ์์ด ์๋ฃ๋์ง ์์์ต๋๋ค. ์ํ ํ์๋ฅผ ๋๋ ค ๋ค์ ์๋ํด์ฃผ์ธ์.*") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| def _insights_section_kr(self, wood_data) -> str: |
| """์ธ์ฌ์ดํธ ์น์
(ํ๊ตญ์ด)""" |
| lines = [] |
| |
| key_insight = wood_data.get("key_insight", wood_data.get("reframe", "")) |
| top3 = wood_data.get("selected_top3", wood_data.get("ideas", [])) |
| |
| if key_insight or top3: |
| lines.append("## ๐ ํต์ฌ ์ธ์ฌ์ดํธ") |
| lines.append("") |
| |
| if key_insight: |
| lines.append(f"> ๐ก **ํต์ฌ ํต์ฐฐ**: {key_insight}") |
| lines.append("") |
| |
| if top3: |
| lines.append("### ์ฃผ์ ์์ด๋์ด") |
| for i, idea in enumerate(top3[:3], 1): |
| lines.append(f"**{i}.** {idea}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| def _facts_risks_section_kr(self, metal_data) -> str: |
| """๊ฒ์ฆ๋ ์ฌ์ค ๋ฐ ๋ฆฌ์คํฌ ์น์
(ํ๊ตญ์ด)""" |
| lines = [] |
| |
| verified = metal_data.get("verified_facts", []) |
| risks = metal_data.get("risks", []) |
| critiques = metal_data.get("critiques", []) |
| |
| if verified or risks or critiques: |
| lines.append("## โ๏ธ ๊ฒ์ฆ ๋ฐ ๋ฆฌ์คํฌ") |
| lines.append("") |
| |
| if verified: |
| lines.append("### โ
๊ฒ์ฆ๋ ์ฌ์ค") |
| for fact in verified[:5]: |
| lines.append(f"- {fact}") |
| lines.append("") |
| |
| if risks: |
| lines.append("### โ ๏ธ ์๋ณ๋ ๋ฆฌ์คํฌ") |
| for risk in risks[:4]: |
| lines.append(f"- ๐ธ {risk}") |
| lines.append("") |
| |
| if critiques: |
| lines.append("### ๐ ๋นํ์ ๊ฒํ ") |
| for critique in critiques[:3]: |
| lines.append(f"- {critique}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| def _findings_section_kr(self, water_data) -> str: |
| """์ฐ๊ตฌ ๋ฐ๊ฒฌ ์น์
(ํ๊ตญ์ด)""" |
| lines = [] |
| |
| findings = water_data.get("findings", []) |
| key_data = water_data.get("key_data", []) |
| expert_views = water_data.get("expert_views", []) |
| |
| if findings or key_data: |
| lines.append("## ๐ ์ฐ๊ตฌ ๋ฐ๊ฒฌ") |
| lines.append("") |
| |
| if findings: |
| lines.append("### ์ฃผ์ ๋ฐ๊ฒฌ") |
| for finding in findings[:5]: |
| lines.append(f"- {finding}") |
| lines.append("") |
| |
| if key_data: |
| lines.append("### ๊ทผ๊ฑฐ ๋ฐ์ดํฐ") |
| for data in key_data[:3]: |
| lines.append(f"- ๐ {data}") |
| lines.append("") |
| |
| if expert_views: |
| lines.append("### ์ ๋ฌธ๊ฐ ์๊ฒฌ") |
| for view in expert_views[:2]: |
| lines.append(f"- ๐ฌ {view}") |
| lines.append("") |
| |
| return "\n".join(lines) |
| |
| def _metadata_section_kr(self, session_id, iterations, score) -> str: |
| """๋ฉํ๋ฐ์ดํฐ ์น์
(ํ๊ตญ์ด)""" |
| from datetime import datetime |
| |
| return f"""--- |
| |
| ## ๐ ๋ถ์ ์ ๋ณด |
| |
| | ํญ๋ชฉ | ๊ฐ | |
| |------|-----| |
| | ์ธ์
ID | `{session_id}` | |
| | ๋ถ์ ์ํ | {iterations}ํ | |
| | ์ต์ข
์ ๋ขฐ๋ | {score:.0%} | |
| | ์์ฑ ์๊ฐ | {datetime.now().strftime("%Y๋
%m์ %d์ผ %H:%M:%S")} | |
| | ์์ง | AETHER Proto-AGI v2.2 | |
| |
| --- |
| |
| *์ด ๋ณด๊ณ ์๋ AETHER Proto-AGI (SOMA ์คํ ์ํ ยท SLAI ์๊ธฐํ์ต ยท MAIA ์ฐฝ๋ฐ)์ ์ํด ์๋ ์์ฑ๋์์ต๋๋ค.* |
| """ |
| |
| |
| |
| def _detect_question_type(self, goal: str) -> str: |
| """์ง๋ฌธ ์ ํ ๊ฐ์ง""" |
| goal_lower = goal.lower() |
| |
| if any(kw in goal_lower for kw in ['์์ธก', '์ ๋ง', '๋ ๊น', '๋ ๊ฒ', '๋ฏธ๋', '2025', '2026', '2027', '2028', 'predict', 'forecast']): |
| return "prediction" |
| elif any(kw in goal_lower for kw in ['์ ๋ต', '๋ฐฉ๋ฒ', '์ด๋ป๊ฒ', '๋ฐฉ์', '๊ณํ', '์๋ฆฝ', 'strategy', 'how to', 'plan']): |
| return "strategy" |
| elif any(kw in goal_lower for kw in ['๋น๊ต', '์ฐจ์ด', 'vs', 'VS', '๋', '์ด๋ ๊ฒ', 'compare', 'versus', 'difference']): |
| return "comparison" |
| elif any(kw in goal_lower for kw in ['๋ถ์', '์', '์์ธ', '์ํฅ', 'ํํฉ', 'analyze', 'why', 'cause', 'impact']): |
| return "analysis" |
| elif any(kw in goal_lower for kw in ['ํนํ', '๋ฐ๋ช
', '์์ด๋์ด', 'ํ์ ', 'patent', 'invention', 'innovate']): |
| return "invention" |
| elif any(kw in goal_lower for kw in ['์์ค', '์คํ ๋ฆฌ', '์๋๋ฆฌ์ค', '์นํฐ', 'story', 'novel', 'script']): |
| return "story" |
| elif any(kw in goal_lower for kw in ['์๋ฆฌ', '๋ ์ํผ', '์์', 'recipe', 'cook', 'dish']): |
| return "recipe" |
| else: |
| return "general" |
| |
| @staticmethod |
| def generate_progress(state) -> str: |
| """์งํ ์ค ๋ณด๊ณ ์ (๊ธฐ์กด ํธํ)""" |
| progress_bar = 'โ' * int(state.satisfaction_score * 10) + 'โ' * (10 - int(state.satisfaction_score * 10)) |
| |
| current_element = "" |
| if state.history: |
| last = state.history[-1] |
| elem_name = last.get("element", "") |
| elem_map = { |
| "ๅ": "๐ค ๅ ๊ฐ๋
", "้": "โช ้ ๋นํ", "ๆฐด": "๐ต ๆฐด ๋ฆฌ์์น", |
| "ๆจ": "๐ข ๆจ ์ฐฝ๋ฐ", "็ซ": "๐ด ็ซ ์คํ" |
| } |
| current_element = elem_map.get(elem_name, elem_name) |
| |
| language = getattr(state, 'language', 'EN') |
| |
| if language == "KR": |
| return f"""# โณ ๋ถ์ ์งํ ์ค... |
| |
| ## ๐ ์ง๋ฌธ |
| > **{state.goal}** |
| |
| ## ๐ ํ์ฌ ์ํ |
| | ํญ๋ชฉ | ์ํ | |
| |------|------| |
| | ์งํ ์ํ | {state.iteration}ํ | |
| | ํ์ฌ ๋จ๊ณ | {current_element} | |
| | ์งํ๋ | {progress_bar} {state.satisfaction_score:.0%} | |
| |
| --- |
| ๐ก **์ค์ผ์คํธ๋ ์ด์
๋ก๊ทธ**๋ฅผ ํผ์ณ์ ์ค์๊ฐ ๋ถ์ ๊ณผ์ ์ ํ์ธํ์ธ์. |
| |
| *ๅ(๊ฐ๋
) โ ้(๋นํ) โ ๆฐด(๋ฆฌ์์น) โ ๆจ(์ฐฝ๋ฐ) โ ็ซ(์คํ) ์ํ ์ค...* |
| """ |
| else: |
| return f"""# โณ Analysis in Progress... |
| |
| ## ๐ Question |
| > **{state.goal}** |
| |
| ## ๐ Current Status |
| | Item | Status | |
| |------|--------| |
| | Iteration | {state.iteration} | |
| | Current Stage | {current_element} | |
| | Progress | {progress_bar} {state.satisfaction_score:.0%} | |
| |
| --- |
| ๐ก Expand **Orchestration Log** to see real-time analysis progress. |
| |
| *Earth โ Metal โ Water โ Wood โ Fire cycle in progress...* |
| """ |
|
|
|
|
| |
|
|
| |
| class ReportGenerator: |
| """ |
| ๊ธฐ์กด ํธํ์ฑ์ ์ํ ๋ํผ ํด๋์ค |
| ProfessionalReportGenerator๋ก ์์ํฉ๋๋ค. |
| """ |
| |
| _instance = None |
| _professional_generator = None |
| |
| @classmethod |
| def _get_generator(cls): |
| if cls._professional_generator is None: |
| cls._professional_generator = ProfessionalReportGenerator() |
| return cls._professional_generator |
| |
| @staticmethod |
| def generate(state, stats: Dict) -> str: |
| """๊ธฐ์กด ํธ์ถ ๋ฐฉ์ ํธํ""" |
| generator = ReportGenerator._get_generator() |
| return generator.generate(state, stats) |
| |
| @staticmethod |
| def generate_progress(state) -> str: |
| """์งํ ์ค ๋ณด๊ณ ์ (๊ธฐ์กด ํธํ)""" |
| return ProfessionalReportGenerator.generate_progress(state) |
|
|
|
|
| |
|
|
| if __name__ == "__main__": |
| print("=" * 60) |
| print("AETHER Proto-AGI v2.2 - Report Enhancement Module") |
| print("=" * 60) |
| print() |
| print("์ด ๋ชจ๋์ ํด๋์ค๋ค์ core.py์ ํตํฉํ์ธ์:") |
| print() |
| print("1. SourceManager - ์ถ์ฒ/์ธ์ฉ ๊ด๋ฆฌ") |
| print(" - add_web_source(), add_knowledge_source()") |
| print(" - format_references_section()") |
| print() |
| print("2. TypedOutputFormatter - ์ง๋ฌธ ์ ํ๋ณ ํฌ๋งท") |
| print(" - format_prediction(), format_comparison(), etc.") |
| print() |
| print("3. ProfessionalReportGenerator - ์ ๋ฌธ ๋ณด๊ณ ์ ์์ฑ") |
| print(" - Executive Summary") |
| print(" - Quality Dashboard") |
| print(" - Type-specific formatting") |
| print(" - Source references") |
| print() |
| print("=" * 60) |
| |
| |
| print("\n[TypedOutputFormatter ํ
์คํธ]") |
| test_data = { |
| "scenarios": [ |
| {"name": "Scenario A", "probability": "60%", "drivers": "Strong growth", "timeline": "2025 Q2"}, |
| {"name": "Scenario B", "probability": "30%", "drivers": "Moderate", "timeline": "2025 Q4"}, |
| ], |
| "key_variables": [ |
| {"name": "Interest Rate", "description": "Fed policy changes", "impact": "high"}, |
| {"name": "Tech Innovation", "description": "AI disruption", "impact": "medium"}, |
| ] |
| } |
| print(TypedOutputFormatter.format_prediction(test_data, "EN")) |
| |
| print("\n[SourceManager ํ
์คํธ]") |
| sm = SourceManager() |
| sm.add_web_source("Example Article", "https://example.com/article", "This is a test", "ๆฐด") |
| sm.add_web_source("Another Source", "https://test.com/page", "More content here", "้") |
| print(sm.format_references_section("EN")) |
|
|