| from typing import Dict, List, Optional |
| from datetime import datetime |
| import asyncio |
| import logging |
| from sqlalchemy.orm import Session |
|
|
| from core.ai_gateway import AIGateway, ModelProvider |
| from database.models import ContentItem, User |
|
|
| logger = logging.getLogger(__name__) |
|
|
| class ContentStudioService: |
| def __init__(self, ai_gateway: AIGateway): |
| self.ai_gateway = ai_gateway |
| |
| async def generate_content( |
| self, |
| topic: str, |
| content_type: str, |
| platform: str, |
| user_id: str, |
| db: Session |
| ) -> ContentItem: |
| """ |
| Generate content based on topic and requirements |
| """ |
| try: |
| |
| prompt = self._create_content_prompt(topic, content_type, platform) |
| |
| |
| generated_content = await self.ai_gateway.generate_text( |
| prompt=prompt, |
| provider=ModelProvider.LOCAL_LLAMA, |
| max_length=1024 |
| ) |
| |
| |
| content_item = ContentItem( |
| owner_id=user_id, |
| title=f"{topic[:50]}...", |
| content_type=content_type, |
| original_prompt=prompt, |
| generated_content=generated_content, |
| platform_specific_variants={}, |
| tags=[] |
| ) |
| |
| |
| platform_variants = await self._generate_platform_variants( |
| generated_content, platform |
| ) |
| content_item.platform_specific_variants = platform_variants |
| |
| |
| tags = self._extract_tags(generated_content) |
| content_item.tags = tags |
| |
| |
| db.add(content_item) |
| db.commit() |
| db.refresh(content_item) |
| |
| logger.info(f"Generated content for user {user_id}: {content_item.id}") |
| return content_item |
| |
| except Exception as e: |
| logger.error(f"Error generating content: {e}") |
| db.rollback() |
| raise |
| |
| def _create_content_prompt(self, topic: str, content_type: str, platform: str) -> str: |
| """ |
| Create appropriate prompt based on content type and platform |
| """ |
| platform_guidelines = { |
| "twitter": "Keep it under 280 characters, use engaging language, include relevant hashtags", |
| "instagram": "Focus on visual appeal, use emojis, include call-to-action", |
| "youtube": "Create compelling hook in first 15 seconds, include timestamps", |
| "blog": "Structure with headings, include SEO keywords, make it scannable" |
| } |
| |
| guidelines = platform_guidelines.get(platform, "Make it engaging and platform-appropriate") |
| |
| prompts = { |
| "social": f"Create engaging social media content about '{topic}'. {guidelines}.", |
| "blog": f"Write a comprehensive blog post about '{topic}'. {guidelines}.", |
| "video_script": f"Generate a video script about '{topic}'. {guidelines}.", |
| "newsletter": f"Draft newsletter content about '{topic}'. {guidelines}." |
| } |
| |
| return prompts.get(content_type, f"Create content about '{topic}' for {platform}. {guidelines}.") |
| |
| async def _generate_platform_variants( |
| self, |
| original_content: str, |
| target_platform: str |
| ) -> Dict[str, str]: |
| """ |
| Generate platform-specific content variants |
| """ |
| variants = {} |
| |
| |
| platforms = ["twitter", "instagram", "linkedin", "facebook", "youtube"] |
| |
| for platform in platforms: |
| if platform != target_platform: |
| prompt = f"Convert this content for {platform}: {original_content}" |
| try: |
| variant = await self.ai_gateway.generate_text( |
| prompt=prompt, |
| max_length=512 |
| ) |
| variants[platform] = variant |
| except Exception as e: |
| logger.warning(f"Could not generate variant for {platform}: {e}") |
| variants[platform] = original_content |
| |
| return variants |
| |
| def _extract_tags(self, content: str) -> List[str]: |
| """ |
| Extract tags from content (simplified implementation) |
| """ |
| |
| words = content.lower().split() |
| |
| word_counts = {} |
| for word in words: |
| clean_word = ''.join(c for c in word if c.isalnum()) |
| if len(clean_word) > 3: |
| word_counts[clean_word] = word_counts.get(clean_word, 0) + 1 |
| |
| |
| sorted_words = sorted(word_counts.items(), key=lambda x: x[1], reverse=True) |
| return [word for word, count in sorted_words[:5]] |
| |
| async def optimize_content( |
| self, |
| content_id: str, |
| optimization_type: str, |
| db: Session |
| ) -> ContentItem: |
| """ |
| Optimize existing content for specific purposes |
| """ |
| content_item = db.query(ContentItem).filter(ContentItem.id == content_id).first() |
| if not content_item: |
| raise ValueError(f"Content item {content_id} not found") |
| |
| optimization_prompts = { |
| "seo": f"Optimize this content for SEO: {content_item.generated_content}", |
| "engagement": f"Rewrite this to increase engagement: {content_item.generated_content}", |
| "readability": f"Improve readability of this content: {content_item.generated_content}", |
| "conversion": f"Optimize this content for conversion: {content_item.generated_content}" |
| } |
| |
| if optimization_type not in optimization_prompts: |
| raise ValueError(f"Unknown optimization type: {optimization_type}") |
| |
| optimized_content = await self.ai_gateway.generate_text( |
| prompt=optimization_prompts[optimization_type], |
| max_length=1024 |
| ) |
| |
| content_item.generated_content = optimized_content |
| content_item.updated_at = datetime.utcnow() |
| |
| db.commit() |
| db.refresh(content_item) |
| |
| return content_item |