Madrid Content Analyzer - Complete Code Walkthrough

This document explains how the entire application works, step by step.


Table of Contents

  1. Application Overview
  2. Application Startup
  3. Database Layer
  4. Data Repository
  5. Content Fetching
  6. Clarity Analysis (Aclarador)
  7. Background Scheduler
  8. Gradio UI
  9. Complete Data Flow
  10. Error Handling

Application Overview

Purpose: Automatically fetch content from Madrid City Council sources, analyze language clarity using AI, and display results in an interactive dashboard.

Tech Stack:

  • Gradio: Web UI framework
  • DuckDB: Analytics database
  • APScheduler: Background task scheduler
  • Groq API: AI-powered text analysis
  • Feedparser: RSS feed parsing

Hosting: Hugging Face Spaces (free tier)


Application Startup

File: app.py (lines 1-43)

Step 1: Import Dependencies

import gradio as gr
import pandas as pd
import plotly.express as px
from apscheduler.schedulers.background import BackgroundScheduler

Step 2: Setup Logging

import logging

from utils.logger import setup_logging
setup_logging()
logger = logging.getLogger(__name__)

Creates a logger for tracking events and errors.

Step 3: Initialize Database

from config.database import init_database
init_database()

What happens:

  1. Creates database connection (DuckDB)
  2. Creates 5 tables if they don't exist:
    • content_sources - RSS feeds/API sources
    • content_items - Fetched content
    • clarity_analyses - Analysis results
    • analysis_history - Time-series data
    • fetch_logs - Operation logs
  3. Inserts default data sources
  4. Creates indexes for faster queries
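
A condensed sketch of that flow (helper names here are illustrative, not the actual code; the real CREATE TABLE statements appear in the Database Layer section):

def init_database():
    conn = get_connection()

    for ddl in TABLE_DEFINITIONS:      # 5 CREATE TABLE IF NOT EXISTS statements
        conn.execute(ddl)

    insert_default_sources(conn)       # Madrid RSS feed + Open Data portal
    create_indexes(conn)               # see "Indexes" below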

Step 4: Start Background Scheduler

scheduler = BackgroundScheduler()
scheduler.add_job(
    fetch_and_analyze_content,  # Function to run
    'interval',                  # Run periodically
    hours=6,                     # Every 6 hours
    id='content_fetch',
    replace_existing=True
)
scheduler.start()

What this does:

  • Creates a background thread
  • Every 6 hours, runs fetch_and_analyze_content()
  • Keeps running while the app is alive
  • Independent of user interactions

Step 5: Initialize Repository

repo = ContentRepository()

Creates the data access layer for database operations.


Database Layer

File: config/database.py

Database Path Selection (lines 10-23)

try:
    DATA_DIR = Path('/data')           # Try HF Spaces persistent storage
    DATA_DIR.mkdir(exist_ok=True)
    DB_PATH = DATA_DIR / 'madrid.duckdb'
    print("βœ… Using persistent storage")
except (PermissionError, OSError):
    # Fallback to local directory
    DATA_DIR = Path(__file__).parent.parent / 'data'
    DATA_DIR.mkdir(exist_ok=True)
    DB_PATH = DATA_DIR / 'madrid.duckdb'
    print("βœ… Using local storage")

Logic:

  1. Try /data directory (HF Spaces with persistent storage enabled)
  2. If fails (permission denied), use local ./data/ directory
  3. Important: Local storage is NOT persistent on HF Spaces - data lost on restart!

Database Connection (lines 29-38)

_conn = None  # Global connection singleton

def get_connection():
    global _conn
    if _conn is None:
        _conn = duckdb.connect(str(DB_PATH))
        _conn.execute("SET TimeZone='UTC'")
    return _conn

Pattern: Singleton pattern

  • One connection shared across entire app
  • Created on first use (lazy initialization)
  • Reused for all subsequent queries
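
A quick way to verify the behavior:

from config.database import get_connection

c1 = get_connection()
c2 = get_connection()
assert c1 is c2  # same object: created once, reused for every query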

Schema Creation (lines 41-153)

Content Sources Table

CREATE TABLE IF NOT EXISTS content_sources (
    id INTEGER PRIMARY KEY,
    source_type VARCHAR(50),           -- 'RSS' or 'API'
    source_name VARCHAR(200),          -- Human-readable name
    source_url VARCHAR(500),           -- URL to fetch from
    last_fetched TIMESTAMP,            -- When last fetched
    is_active BOOLEAN DEFAULT TRUE,    -- Can disable sources
    fetch_frequency_hours INTEGER DEFAULT 6
)

Purpose: Define where to get content from.

Default data:

  1. Madrid City Council RSS Feed (https://diario.madrid.es/feed)
  2. Madrid Open Data Portal

Content Items Table

CREATE TABLE IF NOT EXISTS content_items (
    id INTEGER PRIMARY KEY,
    source_id INTEGER,                 -- Which source it came from
    title VARCHAR(500),
    content_text TEXT,                 -- HTML converted to plain text
    content_html TEXT,                 -- Original HTML
    url VARCHAR(1000) UNIQUE,          -- Prevents duplicates by URL
    content_hash VARCHAR(64) UNIQUE,   -- SHA256 - prevents duplicate content
    published_at TIMESTAMP,            -- When Madrid published it
    fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    category VARCHAR(100),             -- e.g., "Noticias", "Documentos"
    tags JSON,
    is_processed BOOLEAN DEFAULT FALSE,  -- Has it been analyzed?
    processing_error TEXT
)

Deduplication: Uses both url and content_hash to prevent storing the same content twice.
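
A minimal illustration of the check (the real version appears in the Background Scheduler section):

existing = conn.execute(
    "SELECT id FROM content_items WHERE url = ? OR content_hash = ?",
    [item['url'], item['content_hash']]
).fetchone()
if existing is None:
    repo.insert_content(item)  # only genuinely new content is stored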

Clarity Analyses Table

CREATE TABLE IF NOT EXISTS clarity_analyses (
    id INTEGER PRIMARY KEY,
    content_id INTEGER,                -- Links to content_items
    overall_score FLOAT,               -- 0-100 clarity score
    readability_score FLOAT,
    complexity_score FLOAT,
    sentence_stats JSON,               -- {count, avg_length, etc.}
    vocabulary_stats JSON,
    readability_metrics JSON,
    grammar_stats JSON,
    jargon_count INTEGER,
    jargon_words JSON,                 -- Array of jargon terms found
    long_sentences_count INTEGER,
    suggestions JSON,                  -- Array of improvement suggestions
    analyzer_version VARCHAR(50),
    analysis_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    processing_time_ms INTEGER
)

Purpose: Stores results from Groq/Aclarador analysis.

Indexes (lines 134-138)

CREATE INDEX idx_content_published ON content_items(published_at);
CREATE INDEX idx_content_category ON content_items(category);
CREATE INDEX idx_analysis_score ON clarity_analyses(overall_score);

Why: Makes queries faster (e.g., "find content from last 7 days", "find low clarity items").


Data Repository

File: storage/repository.py

Pattern: Repository pattern - all database access goes through this layer.

Get Statistics (lines 20-78)

def get_statistics(self) -> Dict[str, Any]:
    # Count total items
    total_result = self.conn.execute("SELECT COUNT(*) FROM content_items").fetchone()
    total = total_result[0] if total_result else 0

    # Count analyzed items
    analyzed_result = self.conn.execute(
        "SELECT COUNT(*) FROM content_items WHERE is_processed = TRUE"
    ).fetchone()
    analyzed = analyzed_result[0] if analyzed_result else 0

    # Average clarity score
    avg_result = self.conn.execute(
        "SELECT AVG(overall_score) FROM clarity_analyses"
    ).fetchone()
    avg_clarity = avg_result[0] if (avg_result and avg_result[0] is not None) else 0

    # Last successful fetch
    fetch_result = self.conn.execute(
        "SELECT MAX(fetch_end) FROM fetch_logs WHERE status = 'success'"
    ).fetchone()
    last_fetch = fetch_result[0] if (fetch_result and fetch_result[0]) else None

    # date_range_str (built from MIN/MAX of published_at) is computed in the
    # full method; that part is elided from this excerpt
    return {
        'total_items': total,
        'analyzed_items': analyzed,
        'avg_clarity': avg_clarity,
        'last_fetch': last_fetch.strftime('%Y-%m-%d %H:%M') if last_fetch else 'Never',
        'date_range': date_range_str
    }

Null Safety:

  • Checks if result exists before accessing [0]
  • Returns 0 or default values for empty database
  • Prevents crashes when database has no data

Search Content (lines 73-131)

def search_content(self, days=7, category=None, min_clarity=0, max_clarity=100,
                   search_text=None, limit=100):
    query = """
        SELECT c.id, c.title, c.published_at, c.category, c.url,
               a.overall_score as clarity_score
        FROM content_items c
        LEFT JOIN clarity_analyses a ON c.id = a.content_id
        WHERE c.published_at >= ?
    """
    params = [datetime.utcnow() - timedelta(days=days)]

    # Add filters dynamically
    if category and category != "All":
        query += " AND c.category = ?"
        params.append(category)

    if min_clarity > 0 or max_clarity < 100:
        query += " AND a.overall_score BETWEEN ? AND ?"
        params.append(min_clarity)
        params.append(max_clarity)

    if search_text:
        query += " AND (c.title LIKE ? OR c.content_text LIKE ?)"
        search_param = f"%{search_text}%"
        params.extend([search_param, search_param])

    query += " ORDER BY c.published_at DESC LIMIT ?"
    params.append(limit)

    result = self.conn.execute(query, params).fetchall()

Dynamic Query Building:

  • Starts with base query
  • Adds WHERE clauses based on user filters
  • Uses parameterized queries (prevents SQL injection)
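
For example, a call equivalent to the Search Flow described later:

rows = repo.search_content(
    days=7,
    category="Noticias",
    min_clarity=50,
    max_clarity=100,
    search_text="normativa",
)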

Insert Content (lines 235-262)

def insert_content(self, content_data: Dict[str, Any]) -> Optional[int]:
    result = self.conn.execute("""
        INSERT INTO content_items (
            source_id, title, content_text, content_html,
            url, content_hash, published_at, category, tags
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        RETURNING id
    """, [
        content_data['source_id'],
        content_data['title'],
        content_data['content_text'],
        content_data.get('content_html'),
        content_data['url'],
        content_data['content_hash'],
        content_data['published_at'],
        content_data.get('category'),
        content_data.get('tags')
    ]).fetchone()

    self.conn.commit()
    return result[0] if result else None

RETURNING id: Gets the auto-generated ID of inserted row.

Insert Analysis (lines 264-301)

def insert_analysis(self, content_id: int, analysis_data: Dict[str, Any]) -> bool:
    self.conn.execute("""
        INSERT INTO clarity_analyses (
            content_id, overall_score, readability_score, complexity_score,
            sentence_stats, vocabulary_stats, readability_metrics, grammar_stats,
            jargon_count, jargon_words, long_sentences_count, suggestions
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, [
        content_id,
        analysis_data['overall_score'],
        analysis_data['readability_score'],
        analysis_data['complexity_score'],
        analysis_data.get('sentence_stats'),
        analysis_data.get('vocabulary_stats'),
        analysis_data.get('readability_metrics'),
        analysis_data.get('grammar_stats'),
        analysis_data['jargon_count'],
        analysis_data['jargon_words'],
        analysis_data['long_sentences_count'],
        analysis_data['suggestions']
    ])

    # Mark content as processed
    self.conn.execute(
        "UPDATE content_items SET is_processed = TRUE WHERE id = ?",
        [content_id]
    )

    self.conn.commit()
    return True

Transaction: Both INSERT and UPDATE happen together (atomic operation).


Content Fetching

File: fetchers/rss_fetcher.py

Fetch Process (lines 22-49)

def fetch(self, max_items: int = 100) -> List[Dict[str, Any]]:
    # Parse RSS feed
    feed = feedparser.parse(self.source_url)

    if not feed.entries:
        logger.warning("No entries found")
        return []

    items = []
    for entry in feed.entries[:max_items]:
        try:
            item = self._parse_entry(entry)
            if item:
                items.append(item)
        except Exception as e:
            logger.error(f"Error parsing entry: {e}")
            continue  # Skip this entry, continue with others

    return items

Error Resilience: One bad entry doesn't stop the whole fetch.

Parse Entry (lines 51-91)

def _parse_entry(self, entry) -> Optional[Dict[str, Any]]:
    # Extract title
    title = entry.get('title', 'Sin título')

    # Extract content (try multiple fields)
    content_html = entry.get('content', [{}])[0].get('value', '') or entry.get('summary', '')

    # Convert HTML to plain text
    content_text = self._html_to_text(content_html)

    # Skip if too short
    if not content_text or len(content_text) < 50:
        return None

    # Extract URL
    url = entry.get('link', '')

    # Extract published date
    published_at = self._parse_date(entry.get('published', ''))

    # Extract category
    category = self._extract_category(entry)

    # Generate hash for deduplication
    content_hash = hashlib.sha256(content_text.encode()).hexdigest()

    return {
        'source_id': self.source_id,
        'title': title,
        'content_text': content_text,
        'content_html': content_html,
        'url': url,
        'content_hash': content_hash,
        'published_at': published_at,
        'category': category,
        'tags': entry.get('tags', [])
    }

Content Hash: SHA256 of text content - even if URL changes, duplicate content is detected.
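
For example:

import hashlib

text = "El Ayuntamiento de Madrid informa..."
print(hashlib.sha256(text.encode()).hexdigest())
# The same text always produces the same 64-character digest,
# regardless of which URL served it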

HTML to Text (lines 93-117)

def _html_to_text(self, html: str) -> str:
    soup = BeautifulSoup(html, 'html.parser')

    # Remove script and style elements
    for script in soup(["script", "style"]):
        script.decompose()

    # Get text
    text = soup.get_text()

    # Clean up whitespace
    lines = (line.strip() for line in text.splitlines())
    chunks = (phrase.strip() for line in lines for phrase in line.split("  "))
    text = ' '.join(chunk for chunk in chunks if chunk)

    return text

Why:

  • Removes HTML tags
  • Removes scripts/styles
  • Collapses multiple spaces
  • Gives clean text for analysis
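
A quick demo of the effect (condensed: ' '.join(text.split()) collapses whitespace the same way):

from bs4 import BeautifulSoup

html = '<p>El   Ayuntamiento <b>informa</b></p><script>track()</script>'
soup = BeautifulSoup(html, 'html.parser')
for tag in soup(["script", "style"]):
    tag.decompose()
print(' '.join(soup.get_text().split()))  # El Ayuntamiento informa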

Clarity Analysis (Aclarador)

File: analyzers/analyzer_wrapper.py

Initialization (lines 28-41)

def __init__(self):
    self.system_prompt = self._load_system_prompt()
    self.groq_client = None

    if GROQ_AVAILABLE:
        api_key = os.getenv('GROQ_API_KEY')
        if api_key:
            self.groq_client = Groq(api_key=api_key)
            logger.info("βœ… Aclarador initialized with Groq API")
        else:
            logger.warning("⚠️ GROQ_API_KEY not found - using fallback")

Two Modes:

  1. Groq API Mode: If GROQ_API_KEY is set
  2. Fallback Mode: Simple heuristics if no API key

Load System Prompt (lines 43-61)

def _load_system_prompt(self) -> str:
    prompt_path = Path(__file__).parent / 'aclarador' / 'system_prompt.md'
    with open(prompt_path, 'r', encoding='utf-8') as f:
        content = f.read()

    # Extract content between ``` markers
    if '```' in content:
        parts = content.split('```')
        if len(parts) >= 3:
            return parts[1].strip()

    return content

System Prompt: Contains Spanish clarity guidelines (sentence length, active voice, clear vocabulary, etc.)
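
For illustration, if system_prompt.md looked like this (hypothetical contents), only the fenced body would be used:

content = "# Notes for maintainers\n```\nEres un experto en lenguaje claro...\n```\nMore notes"
parts = content.split('```')
# parts[0] = '# Notes for maintainers\n'
# parts[1] = '\nEres un experto en lenguaje claro...\n'   <- returned, stripped
# parts[2] = '\nMore notes'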

Analyze Text with Groq (lines 71-104)

def analyze(self, text: str, title: str = None) -> Dict[str, Any]:
    if not self.groq_client:
        return self._fallback_analysis(text)

    # Call Groq API
    response = self.groq_client.chat.completions.create(
        model="llama-3.3-70b-versatile",
        messages=[
            {"role": "system", "content": self.system_prompt},
            {"role": "user", "content": text}
        ],
        temperature=0.3,     # Low = more consistent
        max_tokens=2000      # Max response length
    )

    # Extract response text
    analysis_text = response.choices[0].message.content

    # Parse response and calculate scores
    return self._parse_groq_response(analysis_text, text)

Temperature 0.3: Low temperature = more deterministic, consistent analysis.

Parse Groq Response (lines 106-158)

def _parse_groq_response(self, analysis_text: str, original_text: str):
    # Extract sections
    sections = self._extract_sections(analysis_text)

    # Groq returns:
    # ### TEXTO CORREGIDO
    # [improved text]
    #
    # ### EXPLICACIÓN DE MEJORAS
    # - Issue 1
    # - Issue 2
    #
    # ### PRINCIPIOS APLICADOS
    # - Principle 1

    # Detect issues from explanation
    issues = self._extract_issues_from_explanation(sections.get('explicacion', ''))

    # Calculate scores
    readability_score = self._calculate_readability_from_analysis(original_text, issues)
    complexity_score = self._calculate_complexity_from_analysis(issues)
    overall_score = (readability_score * 0.5 + complexity_score * 0.5)

    # Extract suggestions
    suggestions = self._extract_suggestions(sections.get('explicacion', ''))

    # Detect jargon (tokenize first; the full method builds `words` earlier)
    words = original_text.split()
    jargon_words = self._detect_jargon(words)

    return {
        'overall_score': overall_score,
        'readability_score': readability_score,
        'complexity_score': complexity_score,
        'sentence_stats': {...},
        'vocabulary_stats': {...},
        'jargon_count': len(jargon_words),
        'jargon_words': jargon_words,
        'suggestions': suggestions
    }

Issue Detection (lines 182-200):

def _extract_issues_from_explanation(self, explanation: str):
    issues = []
    explanation_lower = explanation.lower()

    if 'oración' in explanation_lower and 'larga' in explanation_lower:
        issues.append('long_sentences')

    if 'vocabulario' in explanation_lower or 'tecnicismo' in explanation_lower:
        issues.append('complex_vocabulary')

    if 'voz pasiva' in explanation_lower:
        issues.append('passive_voice')

    if 'redundancia' in explanation_lower:
        issues.append('redundancy')

    return issues
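
For example, given a typical explanation (assuming analyzer is an AclaradorAnalyzer instance):

issues = analyzer._extract_issues_from_explanation(
    "La oración es demasiado larga y usa voz pasiva."
)
# -> ['long_sentences', 'passive_voice']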

Scoring Logic (lines 224-247):

def _calculate_readability_from_analysis(self, text: str, issues: List[str]):
    sentences = [s.strip() for s in text.split('.') if s.strip()]
    avg_length = sum(len(s.split()) for s in sentences) / len(sentences)

    # Optimal sentence length: 20 words
    score = 100 - abs(avg_length - 20) * 2

    # Penalize for issues
    score -= len(issues) * 8

    return max(0, min(100, score))

def _calculate_complexity_from_analysis(self, issues: List[str]):
    score = 100.0
    score -= len(issues) * 12  # Each issue reduces score by 12 points
    return max(0, min(100, score))
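
A worked example with assumed inputs (average sentence length of 28 words, two detected issues):

readability = 100 - abs(28 - 20) * 2 - 2 * 8        # 100 - 16 - 16 = 68
complexity  = 100.0 - 2 * 12                        # 100 - 24 = 76.0
overall     = readability * 0.5 + complexity * 0.5  # 72.0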

Fallback Analysis (lines 305-342)

def _fallback_analysis(self, text: str):
    """Simple heuristic-based analysis when Groq unavailable"""
    sentences = [s.strip() for s in text.split('.') if s.strip()]
    words = text.split()

    avg_sentence_length = sum(len(s.split()) for s in sentences) / len(sentences)

    # Score: closer to 20 words = better
    readability_score = max(0, 100 - abs(avg_sentence_length - 20) * 2)

    long_sentences = [s for s in sentences if len(s.split()) > 30]
    complexity_score = max(0, 100 - len(long_sentences) * 10)

    overall_score = (readability_score + complexity_score) / 2

    return {
        'overall_score': overall_score,
        'readability_score': readability_score,
        'complexity_score': complexity_score,
        'suggestions': ['Groq API no disponible - usando análisis simple']
    }

Background Scheduler

File: schedulers/background_tasks.py

Main Pipeline (lines 15-179)

def fetch_and_analyze_content():
    conn = get_connection()
    repo = ContentRepository()
    analyzer = AclaradorAnalyzer()

    # Get active sources
    sources = conn.execute("""
        SELECT id, source_type, source_name, source_url
        FROM content_sources
        WHERE is_active = TRUE
    """).fetchall()

    total_fetched = 0
    total_new = 0
    total_analyzed = 0

    # Process each source
    for source in sources:
        source_id, source_type, source_name, source_url = source

        fetch_start = datetime.utcnow()

        try:
            # 1. FETCH
            if source_type == 'RSS':
                fetcher = MadridRSSFetcher(source_id, source_url)
                items = fetcher.fetch(max_items=100)
            else:
                continue  # non-RSS sources elided from this excerpt

            total_fetched += len(items)

            new_count = 0
            # 2. STORE & ANALYZE
            for item in items:
                # Check if exists
                existing = conn.execute(
                    "SELECT id FROM content_items WHERE url = ? OR content_hash = ?",
                    [item['url'], item['content_hash']]
                ).fetchone()

                if existing:
                    continue  # Skip duplicates

                # Insert new item
                content_id = repo.insert_content(item)

                if content_id:
                    new_count += 1
                    total_new += 1

                    # 3. ANALYZE
                    analysis_result = analyzer.analyze(
                        item['content_text'],
                        item['title']
                    )

                    # 4. STORE ANALYSIS
                    if repo.insert_analysis(content_id, analysis_result):
                        total_analyzed += 1

            # 5. LOG SUCCESS
            fetch_end = datetime.utcnow()
            conn.execute("""
                INSERT INTO fetch_logs (
                    source_id, fetch_start, fetch_end,
                    items_fetched, items_new, status
                ) VALUES (?, ?, ?, ?, ?, ?)
            """, [source_id, fetch_start, fetch_end, len(items), new_count, 'success'])
            conn.commit()

        except Exception as e:
            # 6. LOG FAILURE
            logger.error(f"Error: {e}")
            conn.execute("""
                INSERT INTO fetch_logs (
                    source_id, fetch_start, fetch_end,
                    items_fetched, items_new, status, error_message
                ) VALUES (?, ?, ?, ?, ?, ?, ?)
            """, [source_id, fetch_start, datetime.utcnow(), 0, 0, 'failed', str(e)])
            conn.commit()

Pipeline Steps:

  1. Fetch content from sources
  2. Check for duplicates (URL or content hash)
  3. Insert new content
  4. Analyze with Groq/Aclarador
  5. Store analysis results
  6. Log operation (success or failure)

Error Handling: If one source fails, others continue.


Gradio UI

File: app.py (lines 425-596)

Dashboard Tab (lines 452-489)

with gr.Tab("πŸ“Š Dashboard"):
    gr.Markdown("### Overview Statistics")

    stats_display = gr.JSON(label="Statistics")
    refresh_stats_btn = gr.Button("🔄 Refresh Statistics", variant="primary")
    refresh_stats_btn.click(get_dashboard_stats, outputs=stats_display)

    # Charts
    with gr.Row():
        distribution_chart = gr.Plot(label="Clarity Score Distribution")
        category_chart = gr.Plot(label="Scores by Category")

    timeline_chart = gr.Plot(label="Content Timeline")

    refresh_charts_btn = gr.Button("🔄 Refresh Charts")

    def refresh_all_charts():
        return (
            get_clarity_distribution(),
            get_category_scores(),
            get_content_timeline()
        )

    refresh_charts_btn.click(
        refresh_all_charts,
        outputs=[distribution_chart, category_chart, timeline_chart]
    )

Pattern:

  • Define UI components
  • Define functions
  • Connect with .click()

Browse Content Tab (lines 494-529)

with gr.Tab("πŸ“ Browse Content"):
    # Filters
    with gr.Row():
        days_slider = gr.Slider(1, 90, value=7, step=1, label="Last N Days")
        category_dropdown = gr.Dropdown(
            ["All", "Noticias", "Documentos", "Anuncios"],
            value="All",
            label="Category"
        )

    with gr.Row():
        min_clarity = gr.Slider(0, 100, value=0, label="Min Clarity Score")
        max_clarity = gr.Slider(0, 100, value=100, label="Max Clarity Score")

    search_box = gr.Textbox(label="Search Text")
    search_btn = gr.Button("🔍 Search")

    results_table = gr.Dataframe(label="Search Results")

    # Connect button to function
    search_btn.click(
        search_content,
        inputs=[days_slider, category_dropdown, min_clarity, max_clarity, search_box],
        outputs=results_table
    )

Dynamic Filtering: All filter values passed to search_content() function.

Settings Tab (lines 563-587)

with gr.Tab("βš™οΈ Settings"):
    gr.Markdown("### Manual Operations")

    fetch_btn = gr.Button("🔄 Trigger Manual Fetch")
    fetch_status = gr.Textbox(label="Fetch Status")

    fetch_btn.click(trigger_manual_fetch, outputs=fetch_status)

Manual Fetch: User can trigger fetch immediately (doesn't wait 6 hours).
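
trigger_manual_fetch() itself isn't shown in this walkthrough; a minimal sketch (assuming it simply reuses the scheduler's pipeline) could be:

def trigger_manual_fetch():
    # Hypothetical sketch - run the same pipeline the scheduler runs
    try:
        fetch_and_analyze_content()
        return "Fetch completed - refresh the dashboard to see new data"
    except Exception as e:
        return f"Fetch failed: {e}"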

Chart Functions

Clarity Distribution (lines 70-109):

def get_clarity_distribution():
    query = """
        SELECT
            CASE
                WHEN overall_score < 30 THEN '0-29 (Poor)'
                WHEN overall_score < 50 THEN '30-49 (Fair)'
                WHEN overall_score < 70 THEN '50-69 (Good)'
                WHEN overall_score < 90 THEN '70-89 (Very Good)'
                ELSE '90-100 (Excellent)'
            END as score_range,
            COUNT(*) as count
        FROM clarity_analyses
        GROUP BY score_range
    """

    df = conn.execute(query).df()

    fig = px.bar(
        df,
        x='score_range',
        y='count',
        title='Clarity Score Distribution',
        color='count',
        color_continuous_scale='RdYlGn'
    )

    return fig

SQL CASE: Buckets scores into ranges.

Plotly: Creates interactive bar chart with color gradient.


Complete Data Flow

Automatic Flow (Every 6 Hours)

┌────────────────────────────────────────────────────────────┐
│ 1. APScheduler triggers fetch_and_analyze_content()        │
└────────────────────────────────────────────────────────────┘
                            ↓
┌────────────────────────────────────────────────────────────┐
│ 2. Get active sources from database                        │
│    - Madrid RSS Feed                                       │
│    - Open Data Portal                                      │
└────────────────────────────────────────────────────────────┘
                            ↓
┌────────────────────────────────────────────────────────────┐
│ 3. For each source:                                        │
│    a. MadridRSSFetcher.fetch()                             │
│       - Parse RSS feed                                     │
│       - Extract entries                                    │
│       - Convert HTML to text                               │
│       - Generate content hash                              │
└────────────────────────────────────────────────────────────┘
                            ↓
┌────────────────────────────────────────────────────────────┐
│ 4. For each item:                                          │
│    a. Check if exists (by URL or hash)                     │
│    b. If duplicate → Skip                                  │
│    c. If new → Insert into content_items                   │
└────────────────────────────────────────────────────────────┘
                            ↓
┌────────────────────────────────────────────────────────────┐
│ 5. For each new item:                                      │
│    a. AclaradorAnalyzer.analyze()                          │
│       - If GROQ_API_KEY set:                               │
│         * Load system prompt                               │
│         * Call Groq API                                    │
│         * Parse response                                   │
│       - Else:                                              │
│         * Use fallback heuristics                          │
└────────────────────────────────────────────────────────────┘
                            ↓
┌────────────────────────────────────────────────────────────┐
│ 6. Insert analysis results:                                │
│    a. Insert into clarity_analyses                         │
│    b. Update content_items.is_processed = TRUE             │
└────────────────────────────────────────────────────────────┘
                            ↓
┌────────────────────────────────────────────────────────────┐
│ 7. Log operation:                                          │
│    - Insert into fetch_logs (success/failure)              │
│    - Update content_sources.last_fetched                   │
└────────────────────────────────────────────────────────────┘
                            ↓
┌────────────────────────────────────────────────────────────┐
│ 8. Data available in UI:                                   │
│    - Dashboard shows updated statistics                    │
│    - Browse shows new content                              │
│    - Analytics shows trends                                │
└────────────────────────────────────────────────────────────┘

User Interaction Flow

User clicks "πŸ”„ Refresh Statistics"
        ↓
Gradio calls get_dashboard_stats()
        ↓
ContentRepository.get_statistics()
        ↓
Execute SQL queries:
  - COUNT content items
  - COUNT analyzed items
  - AVG clarity score
  - MAX last fetch time
        ↓
Return dict with stats
        ↓
Gradio displays in JSON component

Search Flow

User sets filters:
  - Last 7 days
  - Category: "Noticias"
  - Min clarity: 50
  - Search text: "normativa"
        ↓
User clicks "πŸ” Search"
        ↓
Gradio calls search_content(7, "Noticias", 50, 100, "normativa")
        ↓
ContentRepository.search_content()
        ↓
Build dynamic SQL query:
  SELECT ...
  WHERE published_at >= (NOW - 7 days)
    AND category = 'Noticias'
    AND overall_score >= 50
    AND (title LIKE '%normativa%' OR content_text LIKE '%normativa%')
  ORDER BY published_at DESC
  LIMIT 100
        ↓
Convert to DataFrame
        ↓
Gradio displays in table

Error Handling

Database Errors

Problem: Database empty on first run

# ❌ Old code - crashes
total = self.conn.execute("SELECT COUNT(*) FROM content_items").fetchone()[0]

# ✅ New code - safe
total_result = self.conn.execute("SELECT COUNT(*) FROM content_items").fetchone()
total = total_result[0] if total_result else 0

Fetch Errors

Problem: One bad RSS entry crashes entire fetch

# ✅ Error handling
for entry in feed.entries:
    try:
        item = self._parse_entry(entry)
        if item:
            items.append(item)
    except Exception as e:
        logger.error(f"Error parsing entry: {e}")
        continue  # Skip this entry, continue with others

Analysis Errors

Problem: Groq API fails

def analyze(self, text: str):
    try:
        # Try Groq API
        response = self.groq_client.chat.completions.create(...)
        return self._parse_groq_response(response.choices[0].message.content, text)
    except Exception as e:
        logger.error(f"Groq API error: {e}")
        # Fall back to heuristics
        return self._fallback_analysis(text)

Permission Errors

Problem: /data directory not accessible

try:
    DATA_DIR = Path('/data')
    DATA_DIR.mkdir(exist_ok=True)
except (PermissionError, OSError):
    # Fall back to local directory
    DATA_DIR = Path(__file__).parent.parent / 'data'
    DATA_DIR.mkdir(exist_ok=True)

Key Design Patterns

1. Singleton Pattern

Where: Database connection
Why: One connection shared across app

2. Repository Pattern

Where: ContentRepository
Why: Separates data access from business logic

3. Fallback Pattern

Where: Aclarador analyzer, database path
Why: Graceful degradation when resources unavailable

4. Error Resilience

Where: Fetch pipeline, analysis
Why: One failure doesn't stop entire process

5. Null Safety

Where: All database queries
Why: Empty database doesn't crash app


Performance Optimizations

1. Database Indexes

CREATE INDEX idx_content_published ON content_items(published_at);

Impact: Fast date-range queries

2. Connection Reuse

_conn = None  # Reuse the same connection (a singleton, not a true pool)

Impact: No per-query overhead from opening new connections

3. Limit Results

LIMIT 100

Impact: Don't load entire database into memory

4. Deduplication at Insert

SELECT id FROM content_items WHERE url = ? OR content_hash = ?

Impact: No duplicate processing

5. Background Processing

scheduler = BackgroundScheduler()

Impact: Fetch doesn't block UI


Configuration Points

1. Fetch Frequency

File: app.py:38

hours=6  # Change to 1, 3, 12, 24

2. Groq API Key

Environment Variable: GROQ_API_KEY

3. Database Path

File: config/database.py:13-23

DATA_DIR = Path('/data')  # Or local path

4. Max Items Per Fetch

File: schedulers/background_tasks.py:57

items = fetcher.fetch(max_items=100)  # Increase/decrease

5. Default Data Sources

File: config/database.py:144-149

INSERT INTO content_sources (id, source_type, source_name, source_url, fetch_frequency_hours)
VALUES
    (1, 'RSS', 'Madrid...', 'https://...', 6),
    (2, 'API', 'Open Data', 'https://...', 24)

Summary

The app is a closed loop:

  1. Scheduler triggers fetch every 6 hours
  2. Fetcher gets content from Madrid RSS
  3. Repository stores in DuckDB (deduplicates)
  4. Analyzer analyzes with Groq AI
  5. Repository stores analysis results
  6. UI displays everything to user

Key Features:

  • ✅ Fully automated
  • ✅ Error resilient
  • ✅ Deduplicates content
  • ✅ AI-powered analysis
  • ✅ Interactive dashboard
  • ✅ Free hosting on HF Spaces

All configuration: Environment variables and simple code changes - no complex config files!