"""Gradio front-end that parses Gemini-generated articles and publishes them to WordPress.

The module parses pasted Gemini output (JSON block, "markdown table" labels or
simple ``Key: value`` lines), optionally uploads a featured image, and creates
a WordPress post with Rank Math SEO metadata through the WordPress REST API.
"""
import html
import json
import os
import re
import time
from datetime import datetime
from io import BytesIO

import gradio as gr
import markdown
import requests
from dotenv import load_dotenv
from PIL import Image

# ========== Load environment variables ==========
load_dotenv(dotenv_path='rank_cd.env')

# ========== Global state ==========
parsed_data = None
uploaded_image = None
uploaded_image_path = None  # path of the image file selected in the UI
image_description = ""
wp_config = {
    'url': os.getenv('WORDPRESS_URL', 'https://cdgarment.com'),
    'username': os.getenv('WORDPRESS_USERNAME', ''),
    'password': os.getenv('WORDPRESS_APP_PASSWORD', ''),
    'status': os.getenv('DEFAULT_STATUS', 'draft'),
}
wp_config_locked = False
post_status = "draft"
parse_method = "auto"

# Timeout (seconds) for the short WordPress REST calls, so a stalled server
# cannot hang the UI forever.
REQUEST_TIMEOUT = 30

# Uploads larger than this are re-encoded as JPEG before being sent.
MAX_UPLOAD_BYTES = 5 * 1024 * 1024

# Longest edge, in pixels, allowed for an image that has to be re-encoded
# because it exceeded MAX_UPLOAD_BYTES.
MAX_IMAGE_EDGE = 2000

# Formats uploaded byte-for-byte, mapped to (file extension, MIME type).
_PASSTHROUGH_FORMATS = {
    'JPEG': ('jpg', 'image/jpeg'),
    'PNG': ('png', 'image/png'),
    'GIF': ('gif', 'image/gif'),
    'WEBP': ('webp', 'image/webp'),
}


def _slugify(title):
    """Return a lowercase, hyphen-separated slug of at most 100 characters."""
    slug = title.lower()
    slug = re.sub(r'[^\w\s-]', '', slug)
    slug = re.sub(r'[-\s]+', '-', slug)
    return slug[:100]


def _split_title_and_body(text):
    """Split *text* into (stripped first line, everything after the first newline)."""
    first_line, _, rest = text.partition('\n')
    return first_line.strip(), rest


# ========== Content parser ==========
class GeminiContentParser:
    """Parse Gemini output by trying several formats in order of strictness."""

    def __init__(self):
        self.parsers = [
            self.parse_json_format,
            self.parse_markdown_table_format,
            self.parse_simple_format,
        ]

    def parse(self, content: str):
        """Return the first parser result that contains article content, else None."""
        for parser in self.parsers:
            result = parser(content)
            if result and (result.get('article_content') or result.get('content')):
                return result
        return None

    def parse_json_format(self, content: str):
        """Parse content that embeds a JSON metadata object.

        Returns a dict of SEO fields plus the article text, or None when no
        candidate JSON object can be decoded.
        """
        json_patterns = [
            r'```json\s*(.*?)\s*```',
            r'{\s*"post_id".*?}',
            r'.*# Machine-Readable Data.*?({.*})',
            r'.*Machine-Readable Data.*?JSON.*?({.*})',
        ]

        for pattern in json_patterns:
            match = re.search(pattern, content, re.DOTALL | re.IGNORECASE)
            if not match:
                continue

            # Not every pattern has a capture group; fall back to the whole match.
            try:
                json_str = match.group(1)
            except IndexError:
                json_str = match.group(0)

            try:
                data = json.loads(json_str)
            except json.JSONDecodeError:
                continue
            if not isinstance(data, dict):
                # Valid JSON, but not the metadata object this parser expects.
                continue

            seo_toolkit = data.get('seo_toolkit') or {}
            if not isinstance(seo_toolkit, dict):
                seo_toolkit = {}

            article_content = data.get('article_content', '')
            if not article_content:
                # The article is whichever side of the JSON block is longer.
                before_json = content[:match.start()].strip()
                after_json = content[match.end():].strip()
                article_content = after_json if len(after_json) > len(before_json) else before_json

            lines = article_content.split('\n')
            article_title = lines[0].strip('# ').strip() if lines else ""

            result = {
                'seo_title': seo_toolkit.get('seo_title', article_title),
                'primary_keyword': seo_toolkit.get('primary_keyword', ''),
                'secondary_keywords': seo_toolkit.get('secondary_keywords', []),
                'meta_description': seo_toolkit.get(
                    'meta_description', seo_toolkit.get('description', '')
                ),
                'tags': seo_toolkit.get('tags', []),
                'article_title': article_title,
                'content': article_content,
                'post_id': data.get('post_id', ''),
                'character_count': seo_toolkit.get('character_count', 0),
                'parse_method': 'json',
            }
            if result['seo_title']:
                result['url_slug'] = _slugify(result['seo_title'])
            return result

        return None

    def parse_markdown_table_format(self, content):
        """Parse the labelled "SEO Title: ..." layout followed by an article marker.

        Returns a dict of fields (empty strings/lists for anything not found),
        or None when *content* cannot be searched.
        """
        data = {
            'seo_title': '',
            'primary_keyword': '',
            'meta_description': '',
            'tags': [],
            'article_title': '',
            'content': '',
            'url_slug': '',
            'parse_method': 'markdown_table',
        }

        try:
            seo_title_match = re.search(r'SEO Title:\s*(.+?)(?=\n)', content)
            if seo_title_match:
                data['seo_title'] = seo_title_match.group(1).strip()

            keyword_match = re.search(r'Primary Keyword:\s*(.+?)(?=\n)', content)
            if keyword_match:
                data['primary_keyword'] = keyword_match.group(1).strip()

            meta_match = re.search(r'Meta Description:\s*(.+?)(?=\n)', content)
            if meta_match:
                data['meta_description'] = meta_match.group(1).strip()

            # `$` (end of input) rather than a literal dollar sign, so a Tags
            # line that ends the text is still recognised.
            tags_match = re.search(r'Tags:\s*(.+?)(?=\n|📝|$)', content)
            if tags_match:
                tags_str = tags_match.group(1).strip()
                data['tags'] = [tag.strip() for tag in re.split(r'[,;]', tags_str) if tag.strip()]

            article_match = re.search(
                r'(?:📝|▶|●|◆).*?(?:Article|Content)[:-]?\s*(.+)', content, re.DOTALL
            )
            if article_match:
                title, body = _split_title_and_body(article_match.group(1).strip())
                data['article_title'] = title
                data['content'] = body

            # Fall back to the article title first so the slug is never left
            # empty when a usable title exists.
            if not data['seo_title'] and data['article_title']:
                data['seo_title'] = data['article_title']
            if data['seo_title']:
                data['url_slug'] = _slugify(data['seo_title'])

            return data
        except (TypeError, re.error):
            return None

    def parse_simple_format(self, content):
        """Parse loose, case-insensitive "Key: value" lines plus an article body.

        Returns a dict of fields (empty strings/lists for anything not found),
        or None when *content* cannot be searched.
        """
        data = {
            'seo_title': '',
            'primary_keyword': '',
            'meta_description': '',
            'tags': [],
            'article_title': '',
            'content': '',
            'url_slug': '',
            'parse_method': 'simple',
        }

        try:
            seo_title_match = re.search(r'SEO Title[:\s]+(.+)', content, re.IGNORECASE)
            if not seo_title_match:
                seo_title_match = re.search(r'Title[:\s]+(.+)', content, re.IGNORECASE)
            if seo_title_match:
                data['seo_title'] = seo_title_match.group(1).strip()

            keyword_match = re.search(r'Primary Keyword[:\s]+(.+)', content, re.IGNORECASE)
            if not keyword_match:
                keyword_match = re.search(r'Keyword[:\s]+(.+)', content, re.IGNORECASE)
            if keyword_match:
                data['primary_keyword'] = keyword_match.group(1).strip()

            meta_match = re.search(r'Meta Description[:\s]+(.+)', content, re.IGNORECASE)
            if not meta_match:
                meta_match = re.search(r'Description[:\s]+(.+)', content, re.IGNORECASE)
            if meta_match:
                data['meta_description'] = meta_match.group(1).strip()

            tags_match = re.search(r'Tags[:\s]+(.+)', content, re.IGNORECASE)
            if tags_match:
                tags_str = tags_match.group(1).strip()
                data['tags'] = [tag.strip() for tag in re.split(r'[,;]', tags_str) if tag.strip()]

            article_match = re.search(
                r'(?:Article|Content)[:\s]+(.+)', content, re.DOTALL | re.IGNORECASE
            )
            if article_match:
                article_text = article_match.group(1).strip()
            else:
                # No explicit marker: treat everything after the first blank
                # line as the article.
                metadata_end = content.find('\n\n')
                article_text = content[metadata_end:].strip() if metadata_end != -1 else None

            if article_text is not None:
                title, body = _split_title_and_body(article_text)
                data['article_title'] = title
                data['content'] = body

            # Fall back to the article title first so the slug is never left
            # empty when a usable title exists.
            if not data['seo_title'] and data['article_title']:
                data['seo_title'] = data['article_title']
            if data['seo_title']:
                data['url_slug'] = _slugify(data['seo_title'])

            return data
        except (TypeError, re.error):
            return None


# ========== WordPress helpers ==========
def parse_gemini_content(content):
    """Parse *content* and remember the result in the module-level state.

    Returns:
        (result dict, status message) on success, or (None, error message).
    """
    global parsed_data, parse_method

    if not content or not content.strip():
        return None, "❌ 内容为空"

    result = GeminiContentParser().parse(content)
    if not result:
        return None, "❌ 无法用任何已知格式解析内容"

    parse_method = result.get('parse_method', 'unknown')
    parsed_data = result
    method_msg = {
        'json': '✅ 使用 JSON 格式解析',
        'markdown_table': 'ℹ️ 使用 Markdown 表格格式解析',
        'simple': 'ℹ️ 使用简单格式解析',
    }.get(parse_method, f'ℹ️ 使用 {parse_method} 格式解析')
    return result, method_msg


def get_or_create_tag(tag_name, wp_config):
    """Look up a WordPress tag by exact (case-insensitive) name, creating it if absent.

    Returns:
        The integer tag id, None when the tag could not be found or created,
        or an error-message string when a request or response failed. Callers
        must check for ``int`` before using the value.
    """
    try:
        auth = (wp_config['username'], wp_config['password'])
        tags_endpoint = f"{wp_config['url']}/wp-json/wp/v2/tags"

        response = requests.get(
            tags_endpoint,
            auth=auth,
            params={'search': tag_name, 'per_page': 10},
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code == 200:
            for tag in response.json():
                if tag['name'].lower() == tag_name.lower():
                    return tag['id']

        create_response = requests.post(
            tags_endpoint,
            auth=auth,
            json={'name': tag_name},
            timeout=REQUEST_TIMEOUT,
        )
        if create_response.status_code == 201:
            return create_response.json()['id']
        return None
    except (requests.RequestException, ValueError, KeyError, TypeError, AttributeError) as e:
        return f"❌ 处理标签 '{tag_name}' 时出错: {str(e)}"


def _flatten_to_rgb(img):
    """Return *img* as RGB, compositing any alpha channel onto white."""
    if img.mode == 'RGBA':
        background = Image.new('RGB', img.size, (255, 255, 255))
        background.paste(img, mask=img.split()[-1])
        return background
    if img.mode != 'RGB':
        return img.convert('RGB')
    return img


def _prepare_image_payload(image_data):
    """Return (bytes, extension, mime type) describing what should be uploaded.

    Small images in a format listed in _PASSTHROUGH_FORMATS are sent untouched
    under their real extension and MIME type. Oversized images, and images in
    any other format, are re-encoded as JPEG.
    """
    img = Image.open(BytesIO(image_data))
    source_format = (img.format or '').upper()
    too_large = len(image_data) > MAX_UPLOAD_BYTES

    if not too_large and source_format in _PASSTHROUGH_FORMATS:
        extension, mime_type = _PASSTHROUGH_FORMATS[source_format]
        return image_data, extension, mime_type

    img = _flatten_to_rgb(img)
    if too_large and max(img.size) > MAX_IMAGE_EDGE:
        ratio = MAX_IMAGE_EDGE / max(img.size)
        new_size = tuple(int(dim * ratio) for dim in img.size)
        img = img.resize(new_size, Image.Resampling.LANCZOS)

    buffer = BytesIO()
    img.save(buffer, format='JPEG', quality=85, optimize=True)
    return buffer.getvalue(), 'jpg', 'image/jpeg'


def upload_image_to_wordpress(image_file, wp_config, filename_slug):
    """Upload an image to the WordPress media library.

    Args:
        image_file: Path to the image, or a binary file-like object.
        wp_config: Dict with 'url', 'username' and 'password'.
        filename_slug: Base name for the uploaded file; 'post' is used when empty.

    Returns:
        (media id, source URL, status message) on success, otherwise
        (None, None, error message). This function does not raise.
    """
    try:
        if isinstance(image_file, str):
            with open(image_file, 'rb') as f:
                image_data = f.read()
        else:
            image_data = image_file.read()

        image_data, extension, mime_type = _prepare_image_payload(image_data)

        base_name = filename_slug or 'post'
        readable_name = base_name.replace('-', ' ').title()
        files = {'file': (f"{base_name}.{extension}", image_data, mime_type)}
        auth = (wp_config['username'], wp_config['password'])

        max_retries = 3
        retry_delay = 2

        for attempt in range(max_retries):
            is_last_attempt = attempt >= max_retries - 1
            try:
                # TLS verification is left enabled: this request carries credentials.
                response = requests.post(
                    f"{wp_config['url']}/wp-json/wp/v2/media",
                    auth=auth,
                    files=files,
                    timeout=60,
                )

                if response.status_code == 201:
                    media_data = response.json()
                    try:
                        requests.post(
                            f"{wp_config['url']}/wp-json/wp/v2/media/{media_data['id']}",
                            auth=auth,
                            json={
                                'alt_text': image_description,
                                'caption': readable_name,
                                'description': f"Featured image for: {readable_name}",
                            },
                            timeout=10,
                        )
                    except requests.RequestException:
                        # Best effort: the upload itself succeeded, so missing
                        # alt text/caption must not fail the whole publish.
                        pass

                    media_url = media_data.get('source_url', '')
                    return media_data['id'], media_url, f"✅ 图片上传成功!(ID: {media_data['id']})"

                if response.status_code == 413:
                    return None, None, "❌ 图片太大,请压缩后重试"
                if response.status_code == 401:
                    return None, None, "❌ 认证失败,请检查 WordPress 凭据"
                if is_last_attempt:
                    return None, None, f"❌ 图片上传失败,状态码: {response.status_code}"

            except requests.exceptions.Timeout:
                if is_last_attempt:
                    return None, None, "❌ 图片上传超时"
            except requests.exceptions.ConnectionError:
                if is_last_attempt:
                    return None, None, "❌ 连接失败,请检查网络和服务器"
            except Exception as e:
                return None, None, f"❌ 上传时出错: {str(e)}"

            # Linear back-off before the next attempt.
            time.sleep(retry_delay * (attempt + 1))

        return None, None, "❌ 图片上传失败"

    except Exception as e:
        return None, None, f"❌ 处理图片时出错: {str(e)}"


def _build_image_html(media_url):
    """Return the HTML block that embeds the uploaded image at the end of the post.

    No width/height is set on the image, leaving its display size to WordPress.
    """
    size_info = ""
    try:
        if uploaded_image_path:
            with Image.open(uploaded_image_path) as img:
                width, height = img.size
            size_info = f"尺寸: {width}×{height}px"
    except OSError:
        # The dimensions are only decoration for the caption.
        pass

    # Escape the description: it is free text typed into the UI.
    alt_text = html.escape(image_description, quote=True)
    caption = html.escape(image_description if image_description else "文章配图")
    return f'''
<div class="post-image-attachment">
<h3>图片附件</h3>
<figure>
<img src="{html.escape(media_url, quote=True)}" alt="{alt_text}">
<figcaption>{caption} {size_info}</figcaption>
</figure>
</div>
'''


def create_wordpress_post(parsed_data, wp_config, media_id=None, media_url=None):
    """Create a WordPress post carrying Rank Math SEO metadata.

    When *media_url* is given the image is appended to the end of the post
    content; when *media_id* is given it is also set as the featured image and
    as the Facebook/Twitter image.

    Returns:
        (post JSON, None) on success, otherwise (None, error message).
    """
    try:
        auth = (wp_config['username'], wp_config['password'])
        seo_title = parsed_data.get('seo_title', '')
        meta_description = parsed_data.get('meta_description', '')

        rank_math_meta = {
            'rank_math_title': seo_title,
            'rank_math_description': meta_description,
            'rank_math_robots': ['index'] if post_status == 'publish' else ['noindex'],
            'rank_math_news_sitemap_robots': 'index',
            'rank_math_facebook_title': seo_title,
            'rank_math_facebook_description': meta_description,
            'rank_math_twitter_title': seo_title,
            'rank_math_twitter_description': meta_description,
            'rank_math_canonical_url': '',
        }
        if parsed_data.get('primary_keyword'):
            rank_math_meta['rank_math_focus_keyword'] = parsed_data['primary_keyword']

        # Convert the Markdown article to HTML.
        html_content = markdown.markdown(parsed_data.get('content') or '')
        if media_url:
            html_content += _build_image_html(media_url)

        post_data = {
            # `or` rather than a .get default: the key may exist but be empty.
            'title': seo_title or 'Untitled',
            'content': html_content,
            'slug': parsed_data.get('url_slug', ''),
            'status': post_status,
            'meta': rank_math_meta,
            'categories': [1],  # default category
        }

        if parsed_data.get('tags'):
            tag_ids = []
            for tag_name in parsed_data['tags']:
                tag_id = get_or_create_tag(tag_name, wp_config)
                # get_or_create_tag reports failures as None or a message string.
                if tag_id and isinstance(tag_id, int):
                    tag_ids.append(tag_id)
            if tag_ids:
                post_data['tags'] = tag_ids

        if media_id:
            post_data['featured_media'] = media_id

            # Prefer the URL WordPress reports for the media item; fall back to
            # the one returned by the upload.
            social_image = media_url
            try:
                media_response = requests.get(
                    f"{wp_config['url']}/wp-json/wp/v2/media/{media_id}",
                    auth=auth,
                    timeout=REQUEST_TIMEOUT,
                )
                if media_response.status_code == 200:
                    social_image = media_response.json().get('source_url', media_url)
            except (requests.RequestException, ValueError):
                pass
            if social_image:
                rank_math_meta['rank_math_facebook_image'] = social_image
                rank_math_meta['rank_math_twitter_image'] = social_image

        response = requests.post(
            f"{wp_config['url']}/wp-json/wp/v2/posts",
            auth=auth,
            json=post_data,
            headers={'Content-Type': 'application/json'},
            timeout=REQUEST_TIMEOUT,
        )
        if response.status_code != 201:
            return None, f"WordPress API 错误: {response.text}"

        post_result = response.json()

        # Point the canonical URL at the new post. The post already exists at
        # this point, so a failure here must not be reported as a failed
        # publish (a retry would create a duplicate post).
        try:
            requests.post(
                f"{wp_config['url']}/wp-json/wp/v2/posts/{post_result['id']}",
                auth=auth,
                json={'meta': {'rank_math_canonical_url': post_result['link']}},
                timeout=REQUEST_TIMEOUT,
            )
        except (requests.RequestException, KeyError):
            pass

        return post_result, None

    except Exception as e:
        return None, f"创建文章时出错: {str(e)}"


# ========== Gradio UI callbacks ==========
def parse_content_callback(content):
    """Parse the pasted text and return (status message, Markdown preview)."""
    result, msg = parse_gemini_content(content)
    if not result:
        return msg, ""

    preview = "\n\n".join([
        "### 📊 解析结果",
        f"**标题:** {result['seo_title']}",
        f"**主关键词:** {result.get('primary_keyword', '无')}",
        f"**标签:** {', '.join(str(tag) for tag in result.get('tags', []))}",
        f"**URL Slug:** {result.get('url_slug', '自动生成')}",
        f"**内容长度:** {len(result.get('content', ''))} 字符",
    ])
    return msg, preview


def update_image_description_callback(image):
    """Record the selected image and derive a default description from its file name.

    Returns three values for the bound outputs: the description (twice) and an
    update for the image preview.
    """
    global uploaded_image, uploaded_image_path, image_description

    if not image:
        # The upload was cleared: forget it so it is not published anyway.
        uploaded_image = None
        uploaded_image_path = None
        image_description = ""
        return "", "", gr.update()

    uploaded_image_path = image
    with open(image, 'rb') as f:
        uploaded_image = f.read()

    filename = os.path.basename(image)
    name_without_ext = os.path.splitext(filename)[0]
    readable_name = name_without_ext.replace('_', ' ').replace('-', ' ').title()
    image_description = f"Featured image: {readable_name}"

    # Append the pixel dimensions when the file can be read as an image.
    try:
        with Image.open(image) as img:
            width, height = img.size
        image_description += f" ({width}×{height}px)"
    except OSError:
        pass

    return image_description, image_description, gr.update(value=image, label=f"📷 {filename}")


def set_image_description_callback(description):
    """Store the description typed in the UI so it is used for alt text and caption."""
    global image_description
    image_description = description or ""


def save_config_callback(url, username, password, status):
    """Store the WordPress settings unless the configuration is locked."""
    global wp_config, wp_config_locked, post_status

    if wp_config_locked:
        return "🔒 配置已锁定,无法保存"

    wp_config = {
        'url': url.rstrip('/'),
        'username': username,
        'password': password,
        'status': status,
    }
    post_status = status
    return "✅ 配置已保存!"


def toggle_lock_callback(locked):
    """Lock or unlock the three WordPress credential fields."""
    global wp_config_locked
    wp_config_locked = locked
    return (
        gr.update(interactive=not locked),
        gr.update(interactive=not locked),
        gr.update(interactive=not locked),
    )


def publish_callback():
    """Upload the image (if any) and create the post, streaming progress to the UI.

    This is a generator: every outcome, including errors, is yielded. A value
    passed to ``return`` inside a generator is not delivered to the consumer.
    """
    global parsed_data, uploaded_image_path, wp_config, post_status, image_description

    if not parsed_data:
        yield "❌ 请先解析内容", "", ""
        return

    if not all([wp_config.get('url'), wp_config.get('username'), wp_config.get('password')]):
        yield "❌ WordPress 配置不完整", "", ""
        return

    media_id = None
    media_url = None

    if uploaded_image_path:
        yield "🖼️ 正在上传图片到 WordPress...", "", ""
        media_id, media_url, img_msg = upload_image_to_wordpress(
            uploaded_image_path,
            wp_config,
            parsed_data.get('url_slug') or 'post',
        )
        if not media_id:
            yield f"❌ {img_msg}", "", ""
            return

    yield "📤 正在创建文章并设置 Rank Math 元数据...", "", ""

    result, error = create_wordpress_post(parsed_data, wp_config, media_id, media_url)
    if not result:
        yield f"❌ 发布失败: {error}", "", ""
        return

    summary_lines = [
        "### 📝 发布成功",
        f"**文章ID:** {result['id']}",
        f"**状态:** {result['status']}",
        f"**日期:** {result['date'][:10]}",
        f"**链接:** {result['link']}",
    ]
    if media_url:
        summary_lines.append("**图片位置:** 已添加到文章内容底部(原始大小)")

    headline = f"✅ 文章{'已发布' if post_status == 'publish' else '已保存为草稿'}!(ID: {result['id']})"
    payload = json.dumps({
        'post_id': result['id'],
        'title': result['title']['rendered'],
        'link': result['link'],
        'status': result['status'],
        'slug': result['slug'],
        'published_at': datetime.now().isoformat(),
        'parse_method': parse_method,
        'image_added_to_content': bool(media_url),
        'image_url': media_url,
    }, indent=2)
    yield headline, "\n\n".join(summary_lines), payload


# ========== Gradio UI layout ==========
example_content = """# Article #25: Smart Manufacturing: AI & Automation in our Humen Factory

The apparel industry is undergoing a digital revolution...

---

# Machine-Readable Data (For your Script)

```json
{
  "post_id": "25",
  "seo_toolkit": {
    "primary_keyword": "AI in Garment Manufacturing",
    "secondary_keywords": [
      "Smart apparel factory Humen",
      "Automated fabric cutting",
      "Digital clothing production 2026"
    ],
    "seo_title": "Smart Manufacturing: AI & Automation in our Humen Factory | CdGarment",
    "description": "Explore the future of fashion...",
    "character_count": 159,
    "tags": ["AI garment manufacturing", "smart factory", "automated apparel production", "digital fashion", "Humen factory"]
  }
}
```"""

with gr.Blocks(title="CdGarment WordPress Publisher", theme=gr.themes.Soft()) as demo:
    gr.Markdown("# 🏭 CdGarment WordPress Publisher")
    gr.Markdown("### ✓ 专为 Rank Math SEO 优化 • 图片将添加到文章底部(原始大小)")

    with gr.Row():
        with gr.Column(scale=2):
            gr.Markdown("### 📋 粘贴 Gemini 内容")
            gr.Markdown("支持 JSON 和传统格式")

            content_input = gr.Textbox(
                label="粘贴你的 Gemini 输出",
                value=example_content,
                lines=15,
                placeholder="在此粘贴完整的 Gemini 输出(JSON 或传统格式)",
            )
            parse_btn = gr.Button("🔍 解析内容", variant="primary")
            parse_msg = gr.Markdown()
            preview_box = gr.Markdown()

        with gr.Column(scale=1):
            gr.Markdown("### 🖼️ 图片上传")
            gr.Markdown("上传的图片将:1) 设置为特色图片 2) 以原始大小添加到文章内容底部")
            image_upload = gr.File(
                label="上传特色图片",
                file_types=["image"],
                type="filepath",
            )
            img_desc_input = gr.Textbox(
                label="图片描述(用于alt文本)",
                placeholder="输入图片描述,将显示在图片下方",
                value="",
            )
            image_preview = gr.Image(label="图片预览", height=200)

            gr.Markdown("---")
            gr.Markdown("### ⚙️ WordPress 设置")
            lock_toggle = gr.Checkbox(label="🔒 锁定配置", value=False)
            wp_url = gr.Textbox(
                label="WordPress URL",
                value=wp_config.get('url', 'https://cdgarment.com'),
            )
            wp_username = gr.Textbox(
                label="用户名",
                value=wp_config.get('username', ''),
                placeholder="admin",
            )
            wp_password = gr.Textbox(
                label="应用密码",
                value=wp_config.get('password', ''),
                type="password",
                placeholder="••••••••",
            )
            status_select = gr.Radio(
                choices=["draft", "publish"],
                value=post_status,
                label="文章状态",
            )
            save_btn = gr.Button("💾 保存配置")
            save_msg = gr.Markdown()

    gr.Markdown("---")
    gr.Markdown("### 🚀 准备发布")
    with gr.Row():
        content_status = gr.Markdown("❌ 内容")
        image_status = gr.Markdown("⚠️ 图片可选")
        wp_status = gr.Markdown("❓ WordPress")

    status_display = gr.Markdown(f"📊 状态: {post_status.upper()}")
    publish_btn = gr.Button("🚀 推送到 WordPress", variant="primary", size="lg")
    publish_progress = gr.Markdown()
    publish_result = gr.Markdown()
    json_output = gr.JSON(label="文章数据", visible=False)
    download_btn = gr.DownloadButton(
        "📥 下载文章数据",
        value="",
        visible=False,
    )

    # ========== Event bindings ==========
    parse_btn.click(
        parse_content_callback,
        inputs=[content_input],
        outputs=[parse_msg, preview_box],
    )
    image_upload.change(
        update_image_description_callback,
        inputs=[image_upload],
        outputs=[img_desc_input, img_desc_input, image_preview],
    )
    # Keep the stored description in step with what the user types.
    img_desc_input.change(
        set_image_description_callback,
        inputs=[img_desc_input],
        outputs=[],
    )
    lock_toggle.change(
        toggle_lock_callback,
        inputs=[lock_toggle],
        outputs=[wp_url, wp_username, wp_password],
    )
    save_btn.click(
        save_config_callback,
        inputs=[wp_url, wp_username, wp_password, status_select],
        outputs=[save_msg],
    )
    publish_btn.click(
        publish_callback,
        inputs=[],
        outputs=[publish_progress, publish_result, json_output],
    ).then(
        lambda: (gr.update(visible=True), gr.update(visible=True)),
        outputs=[json_output, download_btn],
    )

    gr.Markdown(f"---\nCdGarment WordPress Publisher • {datetime.now().year} • 格式: {parse_method.upper()} • 图片将添加到文章底部")

if __name__ == "__main__":
    demo.launch(server_name="0.0.0.0", server_port=7860, share=True)