Spaces:
Sleeping
Sleeping
| import os | |
| import csv | |
| import json | |
| import logging | |
| import asyncio | |
| from pathlib import Path | |
| import inference_logic | |
| # Configure Logging | |
| logger = logging.getLogger(__name__) | |
| # --- Prompts for User Analysis --- | |
| PROMPT_USER_PROFILING = """ | |
| You are an Expert Intelligence Analyst specializing in Information Integrity and Social Influence Operations. | |
| **TASK:** | |
| Analyze the following timeline of social media posts from a single user: "@{username}". | |
| Your goal is to construct a "Credibility & Bias Profile" based on their historical behavior. | |
| **INPUT DATA (Recent Posts):** | |
| {timeline_text} | |
| **ANALYSIS REQUIREMENTS:** | |
| 1. **Thematic Clusters:** What subjects does this user repeatedly post about? (e.g., "Crypto", "US Politics", "Climate Skepticism"). | |
| 2. **Echo Chamber Indicators:** Does the user frequently repost specific domains or engage with specific narratives without adding nuance? | |
| 3. **Emotional Valence:** Analyze the aggregate emotional tone (Alarmist, Neutral, Aggressive, Satirical). | |
| 4. **Bias Detection:** Identify explicit political or ideological biases based on the text. | |
| 5. **Credibility Weighting:** Based on the content, assign a "Historical Credibility Score" (0.0 to 1.0). | |
| * 0.0 = High frequency of inflammatory/unverified claims. | |
| * 1.0 = Consistently neutral or verified sourcing. | |
| **OUTPUT FORMAT (Strict JSON):** | |
| {{ | |
| "username": "@{username}", | |
| "thematic_clusters": ["Topic A", "Topic B"], | |
| "echo_chamber_detected": boolean, | |
| "bias_assessment": "Description of bias...", | |
| "emotional_valence": "Dominant tone...", | |
| "credibility_score": float, | |
| "summary_profile": "A concise paragraph summarizing the user's role in the information ecosystem." | |
| }} | |
| """ | |
| async def load_user_history(username: str, limit: int = 50) -> str: | |
| """ | |
| Reads the user's history.csv and formats it into a text block for the LLM. | |
| """ | |
| csv_path = Path(f"data/profiles/{username}/history.csv") | |
| if not csv_path.exists(): | |
| return "" | |
| timeline_entries =[] | |
| try: | |
| with open(csv_path, 'r', encoding='utf-8', errors='replace') as f: | |
| reader = csv.DictReader(f) | |
| # Read all, sort by date descending if needed, but scraper usually does desc | |
| rows = list(reader) | |
| # Take latest 'limit' posts | |
| recent_rows = rows[-limit:] | |
| for row in recent_rows: | |
| entry = ( | |
| f"[{row['timestamp']}] " | |
| f"{'REPOST' if row.get('is_reply')=='True' else 'POST'}: " | |
| f"\"{row['text']}\" " | |
| f"(Likes: {row['metric_likes']}, Views: {row['metric_views']})" | |
| ) | |
| timeline_entries.append(entry) | |
| except Exception as e: | |
| logger.error(f"Error reading history for {username}: {e}") | |
| return "" | |
| return "\n".join(timeline_entries) | |
| async def generate_user_profile_report(username: str): | |
| """ | |
| Orchestrates the analysis pipeline: | |
| 1. Load History. | |
| 2. Construct Prompt. | |
| 3. Call LLM (using Vertex/Gemini config from environment or default). | |
| 4. Save JSON Report. | |
| """ | |
| logger.info(f"Starting analysis for user: {username}") | |
| timeline_text = await load_user_history(username) | |
| if not timeline_text: | |
| return {"error": "No history found or empty timeline."} | |
| # Format Prompt | |
| prompt = PROMPT_USER_PROFILING.format(username=username, timeline_text=timeline_text) | |
| # Use Vertex AI by default if configured, else try Gemini Legacy | |
| # For now, we reuse the pipeline functions in inference_logic if available, | |
| # or create a direct call here for simplicity. | |
| # We'll assume Vertex is the primary backend for this advanced analysis | |
| # This requires valid credentials in the environment or passed config. | |
| # Fallback to a placeholder if no model is loaded. | |
| report_json = {} | |
| try: | |
| # Attempt to use the existing Vertex Client in inference_logic if initialized | |
| # Otherwise, we instantiate a quick one if env vars exist | |
| project_id = os.getenv("VERTEX_PROJECT_ID") | |
| location = os.getenv("VERTEX_LOCATION", "us-central1") | |
| api_key = os.getenv("VERTEX_API_KEY") | |
| if inference_logic.genai and project_id: | |
| from google.genai import Client | |
| from google.genai.types import GenerateContentConfig | |
| if api_key: | |
| client = Client(vertexai=True, project=project_id, location=location, api_key=api_key) | |
| else: | |
| client = Client(vertexai=True, project=project_id, location=location) | |
| response = client.models.generate_content( | |
| model="gemini-1.5-pro", | |
| contents=prompt, | |
| config=GenerateContentConfig(response_mime_type="application/json") | |
| ) | |
| report_text = response.text | |
| report_json = json.loads(report_text) | |
| else: | |
| # Fallback Mock for Demo/LITE mode | |
| logger.warning("Vertex AI credentials not found. Generating Mock Analysis.") | |
| report_json = { | |
| "username": f"@{username}", | |
| "thematic_clusters":["Simulated Topic 1", "Simulated Topic 2"], | |
| "bias_assessment": "System running in LITE mode. Configure Vertex AI for real analysis.", | |
| "credibility_score": 0.5, | |
| "summary_profile": "Mock profile generated because AI backend is not active." | |
| } | |
| except Exception as e: | |
| logger.error(f"LLM Analysis failed: {e}") | |
| report_json = {"error": str(e)} | |
| # Save Report | |
| output_path = Path(f"data/profiles/{username}/analysis_report.json") | |
| with open(output_path, 'w', encoding='utf-8') as f: | |
| json.dump(report_json, f, indent=2) | |
| return report_json | |