class GeminiContentParser:
    """Extract SEO metadata and article text from Gemini-generated output.

    Three strategies are tried in order -- an embedded JSON payload,
    labelled "markdown table" fields, and a loose colon-separated layout.
    The first strategy that yields non-empty article content wins.
    """

    # Where the machine-readable JSON payload may live, most specific first.
    # All patterns are searched with re.DOTALL | re.IGNORECASE.
    _JSON_PATTERNS = (
        r'```json\s*(.*?)\s*```',
        r'{\s*"post_id".*?}',
        r'.*# Machine-Readable Data.*?({.*})',
        r'.*Machine-Readable Data.*?JSON.*?({.*})',
    )

    def __init__(self):
        self.parsers = [
            self.parse_json_format,
            self.parse_markdown_table_format,
            self.parse_simple_format,
        ]

    def parse(self, content: str):
        """Run each parsing strategy in turn.

        Returns the first result dict that carries article content, or
        ``None`` when no strategy produces any (including for empty or
        non-string input).
        """
        if not isinstance(content, str) or not content.strip():
            return None
        for parser in self.parsers:
            result = parser(content)
            if result and (result.get('article_content') or result.get('content')):
                return result
        return None

    # ----- shared helpers -------------------------------------------------

    @staticmethod
    def _new_result(method):
        """Return an empty result dict tagged with the parsing *method*."""
        return {
            'seo_title': '',
            'primary_keyword': '',
            'meta_description': '',
            'tags': [],
            'article_title': '',
            'content': '',
            'url_slug': '',
            'parse_method': method,
        }

    @staticmethod
    def _slugify(title):
        """Build a URL slug (max 100 chars) from *title*."""
        slug = re.sub(r'[^\w\s-]', '', str(title).lower())
        slug = re.sub(r'[-\s]+', '-', slug)
        # Strip after truncating so neither surrounding whitespace nor the
        # 100-char cut can leave a dangling hyphen.
        return slug[:100].strip('-')

    @staticmethod
    def _split_tags(raw):
        """Split a comma/semicolon separated tag string into clean names."""
        return [tag.strip() for tag in re.split(r'[,;]', raw) if tag.strip()]

    @staticmethod
    def _split_title_body(text):
        """Split article text into (first line stripped, remaining lines)."""
        title, _, body = text.partition('\n')
        return title.strip(), body

    @classmethod
    def _finalize(cls, data):
        """Apply the title fallback, then derive the slug from the final title.

        The fallback must run first; otherwise a title borrowed from the
        article heading would never get a slug.
        """
        if not data['seo_title'] and data['article_title']:
            data['seo_title'] = data['article_title']
        if data['seo_title']:
            data['url_slug'] = cls._slugify(data['seo_title'])
        return data

    # ----- strategies -----------------------------------------------------

    def parse_json_format(self, content: str):
        """Parse content that embeds a JSON payload.

        The payload is looked for in a ```json fence, as a bare object
        starting with ``"post_id"``, or after a "Machine-Readable Data"
        heading. SEO fields are read from its ``seo_toolkit`` object. The
        article comes from ``article_content`` or, failing that, from the
        longer of the text before / after the payload.

        Returns a result dict, or ``None`` if no pattern yields a JSON
        object.
        """
        decoder = json.JSONDecoder()
        for pattern in self._JSON_PATTERNS:
            match = re.search(pattern, content, re.DOTALL | re.IGNORECASE)
            if not match:
                continue

            # Patterns without a capture group carry the payload in group 0.
            group = 1 if match.re.groups else 0
            json_str = match.group(group)
            payload_end = match.end()
            try:
                data = json.loads(json_str)
            except json.JSONDecodeError:
                # A regex cannot balance braces: the non-greedy pattern stops
                # at the first '}' and the greedy ones may overshoot. Let the
                # JSON decoder find the real end of the object instead.
                if not json_str.startswith('{'):
                    continue
                try:
                    data, decoded_end = decoder.raw_decode(content, match.start(group))
                except json.JSONDecodeError:
                    continue
                payload_end = max(payload_end, decoded_end)

            # Valid JSON that is not an object (list, string, null, ...) has
            # no fields to read; try the next pattern rather than crash.
            if not isinstance(data, dict):
                continue

            seo_toolkit = data.get('seo_toolkit')
            if not isinstance(seo_toolkit, dict):
                seo_toolkit = {}

            article_content = data.get('article_content', '')
            if not isinstance(article_content, str):
                article_content = ''
            if not article_content:
                before_json = content[:match.start()].strip()
                after_json = content[payload_end:].strip()
                article_content = after_json if len(after_json) > len(before_json) else before_json

            # First line of the article, minus markdown heading markers.
            article_title = article_content.partition('\n')[0].strip('# ').strip()

            result = {
                'seo_title': seo_toolkit.get('seo_title') or article_title,
                'primary_keyword': seo_toolkit.get('primary_keyword', ''),
                'secondary_keywords': seo_toolkit.get('secondary_keywords', []),
                'meta_description': seo_toolkit.get(
                    'meta_description', seo_toolkit.get('description', '')
                ),
                'tags': seo_toolkit.get('tags', []),
                'article_title': article_title,
                'content': article_content,
                'post_id': data.get('post_id', ''),
                'character_count': seo_toolkit.get('character_count', 0),
                # Always present, like in the other strategies' results.
                'url_slug': '',
                'parse_method': 'json',
            }
            if result['seo_title']:
                result['url_slug'] = self._slugify(result['seo_title'])
            return result
        return None

    def parse_markdown_table_format(self, content):
        """Parse the labelled layout (``SEO Title:``, ``Tags:``, ...).

        The article is whatever follows a bullet/emoji marker and the word
        ``Article`` or ``Content``; its first line is the article title.
        Returns a result dict (fields left empty when not found), or
        ``None`` for non-string input.
        """
        if not isinstance(content, str):
            return None
        data = self._new_result('markdown_table')

        labelled_fields = (
            ('seo_title', 'SEO Title'),
            ('primary_keyword', 'Primary Keyword'),
            ('meta_description', 'Meta Description'),
        )
        for key, label in labelled_fields:
            # '$' lets a field on the very last line (no trailing newline) match.
            found = re.search(label + r':\s*(.+?)(?=\n|$)', content)
            if found:
                data[key] = found.group(1).strip()

        tags_match = re.search(r'Tags:\s*(.+?)(?=\n|📝|$)', content)
        if tags_match:
            data['tags'] = self._split_tags(tags_match.group(1).strip())

        article_match = re.search(
            r'(?:📝|▶|●|◆).*?(?:Article|Content)[:-]?\s*(.+)', content, re.DOTALL
        )
        if article_match:
            data['article_title'], data['content'] = self._split_title_body(
                article_match.group(1).strip()
            )

        return self._finalize(data)

    def parse_simple_format(self, content):
        """Parse the loose ``Label: value`` layout (case-insensitive).

        Each field tries its specific label first and a shorter fallback
        label second. When no ``Article``/``Content`` label exists, the text
        after the first blank line is taken as the article. Returns a result
        dict, or ``None`` for non-string input.
        """
        if not isinstance(content, str):
            return None
        data = self._new_result('simple')

        def first_value(*labels):
            # Value for the first label that matches; '' when none does.
            for label in labels:
                found = re.search(label + r'[:\s]+(.+)', content, re.IGNORECASE)
                if found:
                    return found.group(1).strip()
            return ''

        data['seo_title'] = first_value('SEO Title', 'Title')
        data['primary_keyword'] = first_value('Primary Keyword', 'Keyword')
        data['meta_description'] = first_value('Meta Description', 'Description')

        tags_str = first_value('Tags')
        if tags_str:
            data['tags'] = self._split_tags(tags_str)

        article_match = re.search(
            r'(?:Article|Content)[:\s]+(.+)', content, re.DOTALL | re.IGNORECASE
        )
        article_text = None
        if article_match:
            article_text = article_match.group(1).strip()
        else:
            metadata_end = content.find('\n\n')
            if metadata_end != -1:
                article_text = content[metadata_end:].strip()
        if article_text is not None:
            data['article_title'], data['content'] = self._split_title_body(article_text)

        return self._finalize(data)


# ========== Helper functions ==========
def parse_gemini_content(content):
    """Parse Gemini output and remember the result in module state.

    On success the module-level ``parsed_data`` and ``parse_method`` are
    updated.

    Returns:
        ``(result_dict, status_message)`` on success, ``(None, error_message)``
        when the content is empty or no strategy can parse it.
    """
    global parsed_data, parse_method

    if not content or not content.strip():
        return None, "❌ 内容为空"

    result = GeminiContentParser().parse(content)
    if not result:
        return None, "❌ 无法用任何已知格式解析内容"

    parse_method = result.get('parse_method', 'unknown')
    parsed_data = result
    method_msg = {
        'json': '✅ 使用 JSON 格式解析',
        'markdown_table': 'ℹ️ 使用 Markdown 表格格式解析',
        'simple': 'ℹ️ 使用简单格式解析',
    }.get(parse_method, f'ℹ️ 使用 {parse_method} 格式解析')
    return result, method_msg


def get_or_create_tag(tag_name, wp_config):
    """Return the WordPress ID of *tag_name*, creating the tag if needed.

    Args:
        tag_name: Tag name; matched case-insensitively against search hits.
        wp_config: Mapping with ``url``, ``username`` and ``password``.

    Returns:
        The tag ID, ``None`` when the tag could neither be found nor
        created, or an error-message string if an exception was raised.
        NOTE(review): the string return is kept for compatibility, so
        callers must not treat every non-None value as an ID.
    """
    try:
        auth = (wp_config['username'], wp_config['password'])
        endpoint = f"{wp_config['url'].rstrip('/')}/wp-json/wp/v2/tags"

        # requests never times out by default; bound both calls so a stalled
        # server cannot hang the caller.
        response = requests.get(
            endpoint,
            auth=auth,
            params={'search': tag_name, 'per_page': 10},
            timeout=30,
        )
        if response.status_code == 200:
            wanted = tag_name.lower()
            for tag in response.json():
                if tag['name'].lower() == wanted:
                    return tag['id']

        create_response = requests.post(
            endpoint, auth=auth, json={'name': tag_name}, timeout=30
        )
        if create_response.status_code == 201:
            return create_response.json()['id']

        # The exact tag may exist but sit outside the first 10 search hits;
        # WordPress then rejects the create with code "term_exists" and
        # reports the existing ID in data.term_id.
        try:
            error_body = create_response.json()
        except ValueError:
            return None
        if isinstance(error_body, dict) and error_body.get('code') == 'term_exists':
            error_data = error_body.get('data')
            if isinstance(error_data, dict):
                return error_data.get('term_id')
        return None
    except Exception as e:
        return f"❌ 处理标签 '{tag_name}' 时出错: {str(e)}"
def upload_image_to_wordpress(image_file, wp_config, filename_slug):
    """Re-encode an image as JPEG and upload it to the WordPress media library.

    The image is flattened to RGB (transparent areas become white), scaled
    down to at most 2000px on its longest side only when the input exceeds
    5 MB, and posted to ``/wp-json/wp/v2/media``. After a successful upload
    the media item's alt text (from the module-level ``image_description``),
    caption and description are set on a best-effort basis.

    Args:
        image_file: Path to an image file, or a binary file-like object.
        wp_config: Mapping with ``url``, ``username`` and ``password``.
        filename_slug: Base name for the uploaded file; also used to build
            the caption and description.

    Returns:
        ``(media_id, media_url, message)`` on success, otherwise
        ``(None, None, error_message)``. No exception is propagated.
    """
    try:
        if isinstance(image_file, str):
            with open(image_file, 'rb') as f:
                image_data = f.read()
        else:
            image_data = image_file.read()
        # Decode from the bytes already in memory: no second file handle is
        # left open, and both input kinds share one code path.
        img = Image.open(BytesIO(image_data))

        # JPEG has no alpha channel. Composite anything transparent onto
        # white; a plain convert('RGB') would turn those areas black.
        has_alpha = img.mode in ('RGBA', 'LA') or (
            img.mode == 'P' and 'transparency' in img.info
        )
        if has_alpha:
            rgba = img.convert('RGBA')
            background = Image.new('RGB', rgba.size, (255, 255, 255))
            background.paste(rgba, mask=rgba.split()[-1])
            img = background
        elif img.mode != 'RGB':
            img = img.convert('RGB')

        # Keep the original dimensions unless the source file is over 5 MB.
        if len(image_data) > 5 * 1024 * 1024:
            max_size = 2000
            if max(img.size) > max_size:
                ratio = max_size / max(img.size)
                new_size = tuple(int(dim * ratio) for dim in img.size)
                img = img.resize(new_size, Image.Resampling.LANCZOS)

        # NOTE(review): the source formatting was collapsed. The re-encode is
        # assumed to run for every upload, because the filename and MIME
        # type below are always JPEG -- confirm against the original layout.
        buffer = BytesIO()
        img.save(buffer, format='JPEG', quality=85, optimize=True)
        image_data = buffer.getvalue()

        filename = f"{filename_slug}.jpg"
        files = {'file': (filename, image_data, 'image/jpeg')}
        auth = (wp_config['username'], wp_config['password'])
        media_endpoint = f"{wp_config['url'].rstrip('/')}/wp-json/wp/v2/media"
        readable_name = filename_slug.replace('-', ' ').title()

        max_retries = 3
        retry_delay = 2
        for attempt in range(max_retries):
            can_retry = attempt < max_retries - 1
            try:
                # TLS verification stays on (the requests default): this
                # request carries Basic-auth credentials.
                response = requests.post(
                    media_endpoint, auth=auth, files=files, timeout=60
                )
                status = response.status_code

                if status == 201:
                    media_data = response.json()
                    try:
                        requests.post(
                            f"{media_endpoint}/{media_data['id']}",
                            auth=auth,
                            json={
                                'alt_text': image_description,
                                'caption': readable_name,
                                'description': f"Featured image for: {readable_name}",
                            },
                            timeout=10,
                        )
                    except requests.exceptions.RequestException:
                        # Best effort: the file is already uploaded, so a
                        # failed metadata update must not fail the upload.
                        pass
                    media_url = media_data.get('source_url', '')
                    return media_data['id'], media_url, f"✅ 图片上传成功!(ID: {media_data['id']})"
                if status == 413:
                    return None, None, "❌ 图片太大,请压缩后重试"
                if status == 401:
                    return None, None, "❌ 认证失败,请检查 WordPress 凭据"

                # Only server-side or rate-limit failures can succeed on a
                # retry; other 4xx answers would fail identically.
                transient = status >= 500 or status == 429
                if not (transient and can_retry):
                    return None, None, f"❌ 图片上传失败,状态码: {response.status_code}"
            except requests.exceptions.Timeout:
                if not can_retry:
                    return None, None, "❌ 图片上传超时"
            except requests.exceptions.ConnectionError:
                if not can_retry:
                    return None, None, "❌ 连接失败,请检查网络和服务器"
            except Exception as e:
                return None, None, f"❌ 上传时出错: {str(e)}"

            # Linear back-off before the next attempt: 2s, then 4s.
            time.sleep(retry_delay * (attempt + 1))

        return None, None, "❌ 图片上传失败"
    except Exception as e:
        return None, None, f"❌ 处理图片时出错: {str(e)}"
parsed_data.get('seo_title', ''), 'rank_math_description': parsed_data.get('meta_description', ''), 'rank_math_robots': ['index'] if post_status == 'publish' else ['noindex'], 'rank_math_news_sitemap_robots': 'index', 'rank_math_facebook_title': parsed_data.get('seo_title', ''), 'rank_math_facebook_description': parsed_data.get('meta_description', ''), 'rank_math_twitter_title': parsed_data.get('seo_title', ''), 'rank_math_twitter_description': parsed_data.get('meta_description', ''), 'rank_math_canonical_url': '', } if parsed_data.get('primary_keyword'): rank_math_meta['rank_math_focus_keyword'] = parsed_data['primary_keyword'] # 转换Markdown为HTML html_content = markdown.markdown(parsed_data.get('content', '')) # 如果上传了图片且有媒体URL,将图片添加到文章内容底部 if media_url: # 获取原始图片尺寸信息(如果可用) size_info = "" try: if uploaded_image_path: with Image.open(uploaded_image_path) as img: width, height = img.size size_info = f"尺寸: {width}×{height}px" except: pass # 创建图片HTML,使用原始大小(不设置width/height,让WordPress决定) img_html = f'''