Spaces:
Sleeping
Sleeping
| #!/usr/bin/env python3 | |
| import gradio as gr | |
| import os | |
| import logging | |
| import asyncio | |
| from typing import Tuple, Optional | |
| from pathlib import Path | |
# Configure logging at import time: INFO level, with the timestamp, logger
# name and severity in front of every message.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)

# Module-level logger used by everything in this file.
logger = logging.getLogger(__name__)
# MCP initialization state management
class MCPManager:
    """Track the one-time asynchronous initialization of the MCP components.

    Attributes:
        ready_event: ``asyncio.Event`` that is set only after initialization
            has completed successfully.
        initialization_started: True while an initialization attempt is in
            flight and after it has succeeded; reset to False when an attempt
            fails so that a later call can retry.
        initialization_complete: True once initialization has succeeded.
    """

    def __init__(self):
        self.ready_event = asyncio.Event()
        self.initialization_started = False
        self.initialization_complete = False
        # Per-attempt event that is set when the running attempt ends, whether
        # it succeeded or not. It lets concurrent callers wake up on failure
        # without changing the meaning of ``ready_event``.
        self._attempt_done = None

    async def initialize(self):
        """Initialize the MCP components asynchronously, at most once.

        The first caller performs the initialization (currently simulated
        with a one-second sleep). Callers that arrive while an attempt is
        running wait for it to finish instead of starting a second one.

        Raises:
            RuntimeError: if this call waited on another caller's attempt and
                that attempt did not complete successfully.
            Exception: whatever the initialization itself raised, re-raised
                to the caller that ran the attempt.
        """
        if self.initialization_complete:
            return

        if self.initialization_started:
            # Another caller is already initializing. Wait on the per-attempt
            # event; fall back to ready_event if the flag was set without an
            # attempt having been started through this method.
            waiter = (
                self._attempt_done
                if self._attempt_done is not None
                else self.ready_event
            )
            await waiter.wait()
            if not self.initialization_complete:
                raise RuntimeError("MCP initialization failed")
            return

        self.initialization_started = True
        attempt_done = self._attempt_done = asyncio.Event()
        logger.info("🔧 Starting MCP initialization...")

        try:
            # Simulated initialization work; leaves enough time for start-up.
            await asyncio.sleep(1.0)
            # The real MCP initialization logic can be added here.
            logger.info("✅ MCP initialization complete")
            self.initialization_complete = True
            self.ready_event.set()
        except Exception as e:
            logger.error(f"❌ MCP initialization failed: {e}")
            raise
        finally:
            if not self.initialization_complete:
                # Failure or cancellation: allow a later call to retry rather
                # than leaving the manager stuck in the "started" state.
                self.initialization_started = False
            # Always release concurrent waiters so they cannot hang forever.
            attempt_done.set()


# Global MCP manager instance
mcp_manager = MCPManager()
# Document extractor
class SimpleDocumentExtractor:
    """Extract text from PDF, DOCX and TXT files and wrap it as Markdown.

    Every public result is a string that starts with a ``# <file name>``
    heading; problems are reported inside that string rather than raised.
    """

    def __init__(self):
        self.initialized = False

    def initialize(self):
        """Run the one-time synchronous setup (dependency check and logging)."""
        if self.initialized:
            return
        logger.info("📄 Initializing document extractor...")
        self._check_dependencies()
        self.initialized = True
        logger.info("✅ Document extractor initialized")

    def _check_dependencies(self):
        """Log whether the optional PDF/DOCX libraries can be imported."""
        try:
            import importlib.util

            checks = (
                (
                    "PyPDF2",
                    "✅ PyPDF2 available for PDF processing",
                    "⚠️ PyPDF2 not available",
                ),
                (
                    "docx",
                    "✅ python-docx available for DOCX processing",
                    "⚠️ python-docx not available",
                ),
            )
            for module_name, available_msg, missing_msg in checks:
                if importlib.util.find_spec(module_name) is not None:
                    logger.info(available_msg)
                else:
                    logger.warning(missing_msg)
        except Exception as e:
            # Best effort only: a failed check must not block start-up.
            logger.warning(f"⚠️ Error checking dependencies: {e}")

    def extract(self, file_path: str) -> str:
        """Return the content of ``file_path`` as a Markdown string.

        The format is chosen from the lower-cased file suffix (``.pdf``,
        ``.docx`` or ``.txt``). Unsupported suffixes, empty documents and
        unexpected errors are reported in the returned text.
        """
        path = Path(file_path)
        file_name = path.name
        file_ext = path.suffix.lower()

        handlers = {
            ".pdf": self._extract_pdf,
            ".docx": self._extract_docx,
            ".txt": self._extract_txt,
        }
        handler = handlers.get(file_ext)
        if handler is None:
            return f"# {file_name}\n\n❌ Unsupported file format: {file_ext}"

        try:
            content = handler(file_path)
            if not content.strip():
                return f"# {file_name}\n\n⚠️ No text content found in the document."
            return f"# {file_name}\n\n{content}"
        except Exception as e:
            logger.error(f"Error extracting content from {file_name}: {e}")
            return f"# {file_name}\n\n❌ Error extracting content: {str(e)}"

    def _extract_pdf(self, file_path: str) -> str:
        """Extract text from a PDF, one ``## Page N`` section per page with text."""
        try:
            import PyPDF2

            with open(file_path, "rb") as file:
                reader = PyPDF2.PdfReader(file)
                text_content = []
                for page_num, page in enumerate(reader.pages, 1):
                    # extract_text() may yield None for pages without a text
                    # layer; treat that as an empty page instead of failing.
                    page_text = page.extract_text() or ""
                    if page_text.strip():
                        text_content.append(f"## Page {page_num}\n\n{page_text}")
                return "\n\n".join(text_content)
        except ImportError:
            return "❌ PyPDF2 library not available. Please install it with: pip install PyPDF2"
        except Exception as e:
            return f"❌ Error reading PDF: {str(e)}"

    def _extract_docx(self, file_path: str) -> str:
        """Extract the non-empty paragraphs of a DOCX file, blank-line separated."""
        try:
            import docx

            doc = docx.Document(file_path)
            paragraphs = [para.text for para in doc.paragraphs if para.text.strip()]
            return "\n\n".join(paragraphs)
        except ImportError:
            return "❌ python-docx library not available. Please install it with: pip install python-docx"
        except Exception as e:
            return f"❌ Error reading DOCX: {str(e)}"

    def _extract_txt(self, file_path: str) -> str:
        """Read a text file as UTF-8, falling back to Latin-1 on decode errors."""
        try:
            # utf-8-sig decodes plain UTF-8 as well, but also drops a leading
            # byte-order mark so it does not leak into the extracted content.
            with open(file_path, "r", encoding="utf-8-sig") as file:
                return file.read()
        except UnicodeDecodeError:
            # Try another encoding.
            try:
                with open(file_path, "r", encoding="latin-1") as file:
                    return file.read()
            except Exception as e:
                return f"❌ Error reading text file with encoding: {str(e)}"
        except Exception as e:
            return f"❌ Error reading text file: {str(e)}"
# Lazily created, process-wide extractor instance.
_extractor = None


def get_extractor() -> SimpleDocumentExtractor:
    """Return the shared extractor, creating and initializing it on first use."""
    global _extractor
    if _extractor is not None:
        return _extractor
    _extractor = SimpleDocumentExtractor()
    _extractor.initialize()
    return _extractor
def extract_document(file) -> Tuple[str, str]:
    """Handle a document extraction request from the UI.

    Returns a ``(content, status)`` pair: the extracted Markdown (empty on
    failure) and a human-readable status message.
    """
    if file is None:
        return "", "❌ Please upload a file"

    try:
        # Debug information about what the UI handed over.
        logger.info(f"Received file object type: {type(file)}")
        logger.info(f"File object content: {file}")

        resolved_path = _extract_file_path(file)
        if not resolved_path:
            return "", f"❌ Invalid file object: {type(file)}"
        logger.info(f"Extracted file path: {resolved_path}")

        if not os.path.exists(resolved_path):
            return "", f"❌ File not found: {resolved_path}"

        markdown = get_extractor().extract(resolved_path)
        status = f"✅ Extracted content from {Path(resolved_path).name}"
        return markdown, status
    except Exception as error:
        logger.error(f"Extraction error: {error}")
        return "", f"❌ Extraction failed: {str(error)}"
| def _extract_file_path(file) -> Optional[str]: | |
| """从file对象中提取文件路径""" | |
| try: | |
| # 处理 Gradio 文件对象 | |
| if file is None: | |
| return None | |
| # 如果是字符串路径,直接返回 | |
| if isinstance(file, str) and file.strip(): | |
| return file.strip() | |
| # 如果有 name 属性(标准文件对象) | |
| if hasattr(file, "name") and file.name: | |
| return str(file.name) | |
| # 如果是字典格式 | |
| if isinstance(file, dict): | |
| # 检查不同的可能键名 | |
| for key in ["name", "path", "filepath", "file_path"]: | |
| if key in file and file[key]: | |
| return str(file[key]) | |
| # 如果支持文件系统路径协议 | |
| if hasattr(file, "__fspath__"): | |
| return str(file) | |
| # 记录未知的文件对象类型以便调试 | |
| logger.debug(f"Unknown file object type: {type(file)}, content: {file}") | |
| return None | |
| except Exception as e: | |
| logger.error(f"Error extracting file path: {e}") | |
| return None | |
def check_mcp_status() -> str:
    """Report the MCP initialization state as a short status string."""
    # Completion takes precedence over "started"; anything else means that
    # initialization has not begun yet.
    if mcp_manager.initialization_complete:
        return "🟢 Ready"
    if mcp_manager.initialization_started:
        return "🟡 Initializing..."
    return "🔴 Not Started"
def create_interface():
    """Build and return the Gradio Blocks interface.

    Layout:
      * a read-only MCP status box with a refresh button,
      * a file upload (PDF, DOCX or TXT) with an extract button,
      * a processing-status box and a read-only box for the extracted
        Markdown.

    The MCP status box is also refreshed once when the page loads.
    """
    with gr.Blocks(
        title="Document Extractor with MCP",
        theme=gr.themes.Soft(),
        css="""
        .status-ready { color: green !important; }
        .status-init { color: orange !important; }
        .status-error { color: red !important; }
        """,
    ) as app:
        gr.Markdown("# 📄 Document Extraction Tool with MCP Support")
        # Keep this text in sync with the file_types accepted below.
        gr.Markdown("Upload PDF, DOCX or TXT files to extract content as Markdown.")

        with gr.Row():
            with gr.Column(scale=2):
                status_output = gr.Textbox(
                    label="🔧 MCP Server Status",
                    interactive=False,
                    value="🟡 Initializing...",
                )
            with gr.Column(scale=1):
                check_btn = gr.Button("🔄 Refresh Status", variant="secondary")

        check_btn.click(fn=check_mcp_status, inputs=[], outputs=status_output)

        gr.Markdown("---")

        with gr.Row():
            with gr.Column():
                file_input = gr.File(
                    file_types=[".pdf", ".docx", ".txt"],
                    label="📁 Upload Document",
                    type="filepath",
                )
                extract_btn = gr.Button("🚀 Extract Content", variant="primary")
            with gr.Column():
                status_text = gr.Textbox(
                    label="📊 Processing Status",
                    interactive=False,
                    placeholder="Upload a file and click Extract Content",
                )

        content_output = gr.Textbox(
            label="📝 Extracted Markdown Content",
            lines=20,
            interactive=False,
            show_copy_button=True,
            placeholder="Extracted content will appear here...",
        )

        extract_btn.click(
            fn=extract_document,
            inputs=file_input,
            outputs=[content_output, status_text],
        )

        # Refresh the MCP status once when the page is first loaded.
        app.load(fn=check_mcp_status, inputs=[], outputs=status_output)

    return app
async def async_main():
    """Asynchronous entry point: prepare the extractor and MCP, then launch the UI.

    Any exception raised during start-up is logged and re-raised.
    """
    logger.info("🚀 Starting document extraction tool with MCP support...")

    try:
        # Create and initialize the shared document extractor.
        logger.info("📄 Initializing document extractor...")
        get_extractor()

        # Wait for the asynchronous MCP initialization to finish.
        await mcp_manager.initialize()

        # Build the interface and start serving it.
        # NOTE(review): the original comment states that share=True is
        # required on Hugging Face Spaces - confirm against the Gradio
        # launch() documentation.
        interface = create_interface()
        interface.launch(show_error=True, share=True)
    except Exception as error:
        logger.error(f"❌ Application startup failed: {error}")
        raise
def main():
    """Program entry point: run ``async_main`` on a dedicated event loop.

    A keyboard interrupt is logged and swallowed; any other exception is
    logged and re-raised.
    """
    try:
        # On Windows, select the proactor event loop policy explicitly.
        running_on_windows = os.name == "nt"
        if running_on_windows:
            asyncio.set_event_loop_policy(asyncio.WindowsProactorEventLoopPolicy())

        # Drive the coroutine from a loop that is created, installed and
        # closed here.
        event_loop = asyncio.new_event_loop()
        asyncio.set_event_loop(event_loop)
        try:
            event_loop.run_until_complete(async_main())
        finally:
            event_loop.close()
    except KeyboardInterrupt:
        logger.info("🛑 Application stopped by user")
    except Exception as error:
        logger.error(f"❌ Application error: {error}")
        raise


if __name__ == "__main__":
    main()