Madrid Content Analyzer - Complete Code Walkthrough
This document explains how the entire application works, step by step.
Table of Contents
- Application Overview
- Application Startup
- Database Layer
- Data Repository
- Content Fetching
- Clarity Analysis (Aclarador)
- Background Scheduler
- Gradio UI
- Complete Data Flow
- Error Handling
Application Overview
Purpose: Automatically fetch content from Madrid City Council sources, analyze language clarity using AI, and display results in an interactive dashboard.
Tech Stack:
- Gradio: Web UI framework
- DuckDB: Analytics database
- APScheduler: Background task scheduler
- Groq API: AI-powered text analysis
- Feedparser: RSS feed parsing
Hosting: Hugging Face Spaces (free tier)
Application Startup
File: app.py (lines 1-43)
Step 1: Import Dependencies
import gradio as gr
import pandas as pd
import plotly.express as px
from apscheduler.schedulers.background import BackgroundScheduler
Step 2: Setup Logging
from utils.logger import setup_logging
setup_logging()
logger = logging.getLogger(__name__)
Creates a logger for tracking events and errors.
Step 3: Initialize Database
from config.database import init_database
init_database()
What happens:
- Creates database connection (DuckDB)
- Creates 5 tables if they don't exist:
  - content_sources - RSS feeds/API sources
  - content_items - Fetched content
  - clarity_analyses - Analysis results
  - analysis_history - Time-series data
  - fetch_logs - Operation logs
- Inserts default data sources
- Creates indexes for faster queries
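A minimal sketch of what init_database() in config/database.py presumably does (only content_sources is shown; the column lists come from the schema in the Database Layer section below, and the empty-table check is one plausible way to keep the seeding idempotent):
def init_database():
    """Create tables if missing, seed default sources, add indexes."""
    conn = get_connection()
    # One CREATE TABLE IF NOT EXISTS per table; the other four tables
    # (content_items, clarity_analyses, analysis_history, fetch_logs) follow the same pattern.
    conn.execute("""
        CREATE TABLE IF NOT EXISTS content_sources (
            id INTEGER PRIMARY KEY,
            source_type VARCHAR(50),
            source_name VARCHAR(200),
            source_url VARCHAR(500),
            is_active BOOLEAN DEFAULT TRUE,
            fetch_frequency_hours INTEGER DEFAULT 6
        )
    """)
    # Seed default sources only when the table is empty, so restarts don't duplicate rows
    if conn.execute("SELECT COUNT(*) FROM content_sources").fetchone()[0] == 0:
        conn.execute(
            "INSERT INTO content_sources (id, source_type, source_name, source_url) "
            "VALUES (1, 'RSS', 'Madrid City Council RSS Feed', 'https://diario.madrid.es/feed')"
        )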
Step 4: Start Background Scheduler
scheduler = BackgroundScheduler()
scheduler.add_job(
fetch_and_analyze_content, # Function to run
'interval', # Run periodically
hours=6, # Every 6 hours
id='content_fetch',
replace_existing=True
)
scheduler.start()
What this does:
- Creates a background thread
- Every 6 hours, runs fetch_and_analyze_content()
- Keeps running while the app is alive
- Independent of user interactions
Step 5: Initialize Repository
repo = ContentRepository()
Creates the data access layer for database operations.
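The repository itself is thin: each instance grabs the shared DuckDB connection and exposes query methods. A sketch of what the constructor presumably looks like (the attribute name self.conn matches the methods shown later):
from config.database import get_connection

class ContentRepository:
    """Data access layer: all SQL for content, analyses, and logs lives here."""

    def __init__(self):
        # Reuse the app-wide singleton connection (see config/database.py)
        self.conn = get_connection()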
Database Layer
File: config/database.py
Database Path Selection (lines 10-23)
try:
DATA_DIR = Path('/data') # Try HF Spaces persistent storage
DATA_DIR.mkdir(exist_ok=True)
DB_PATH = DATA_DIR / 'madrid.duckdb'
print("β
Using persistent storage")
except (PermissionError, OSError):
# Fallback to local directory
DATA_DIR = Path(__file__).parent.parent / 'data'
DATA_DIR.mkdir(exist_ok=True)
DB_PATH = DATA_DIR / 'madrid.duckdb'
print("β
Using local storage")
Logic:
- Try the /data directory (HF Spaces with persistent storage enabled)
- If that fails (permission denied), use the local ./data/ directory
- Important: local storage is NOT persistent on HF Spaces - data is lost on restart!
Database Connection (lines 29-38)
_conn = None # Global connection singleton
def get_connection():
global _conn
if _conn is None:
_conn = duckdb.connect(str(DB_PATH))
_conn.execute("SET TimeZone='UTC'")
return _conn
Pattern: Singleton pattern
- One connection shared across entire app
- Created on first use (lazy initialization)
- Reused for all subsequent queries
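Because of the module-level _conn, repeated calls return the very same object; a quick illustration:
c1 = get_connection()
c2 = get_connection()
assert c1 is c2  # same DuckDB connection object, created on the first call and then reused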
Schema Creation (lines 41-153)
Content Sources Table
CREATE TABLE IF NOT EXISTS content_sources (
id INTEGER PRIMARY KEY,
source_type VARCHAR(50), -- 'RSS' or 'API'
source_name VARCHAR(200), -- Human-readable name
source_url VARCHAR(500), -- URL to fetch from
last_fetched TIMESTAMP, -- When last fetched
is_active BOOLEAN DEFAULT TRUE, -- Can disable sources
fetch_frequency_hours INTEGER DEFAULT 6
)
Purpose: Define where to get content from.
Default data:
- Madrid City Council RSS Feed (https://diario.madrid.es/feed)
- Madrid Open Data Portal
Content Items Table
CREATE TABLE IF NOT EXISTS content_items (
id INTEGER PRIMARY KEY,
source_id INTEGER, -- Which source it came from
title VARCHAR(500),
content_text TEXT, -- HTML converted to plain text
content_html TEXT, -- Original HTML
url VARCHAR(1000) UNIQUE, -- Prevents duplicates by URL
content_hash VARCHAR(64) UNIQUE, -- SHA256 - prevents duplicate content
published_at TIMESTAMP, -- When Madrid published it
fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
category VARCHAR(100), -- e.g., "Noticias", "Documentos"
tags JSON,
is_processed BOOLEAN DEFAULT FALSE, -- Has it been analyzed?
processing_error TEXT
)
Deduplication: Uses both url and content_hash to prevent storing the same content twice.
Clarity Analyses Table
CREATE TABLE IF NOT EXISTS clarity_analyses (
id INTEGER PRIMARY KEY,
content_id INTEGER, -- Links to content_items
overall_score FLOAT, -- 0-100 clarity score
readability_score FLOAT,
complexity_score FLOAT,
sentence_stats JSON, -- {count, avg_length, etc.}
vocabulary_stats JSON,
readability_metrics JSON,
grammar_stats JSON,
jargon_count INTEGER,
jargon_words JSON, -- Array of jargon terms found
long_sentences_count INTEGER,
suggestions JSON, -- Array of improvement suggestions
analyzer_version VARCHAR(50),
analysis_date TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
processing_time_ms INTEGER
)
Purpose: Stores results from Groq/Aclarador analysis.
Indexes (lines 134-138)
CREATE INDEX idx_content_published ON content_items(published_at);
CREATE INDEX idx_content_category ON content_items(category);
CREATE INDEX idx_analysis_score ON clarity_analyses(overall_score);
Why: Makes queries faster (e.g., "find content from last 7 days", "find low clarity items").
Data Repository
File: storage/repository.py
Pattern: Repository pattern - all database access goes through this layer.
Get Statistics (lines 20-78)
def get_statistics(self) -> Dict[str, Any]:
# Count total items
total_result = self.conn.execute("SELECT COUNT(*) FROM content_items").fetchone()
total = total_result[0] if total_result else 0
# Count analyzed items
analyzed_result = self.conn.execute(
"SELECT COUNT(*) FROM content_items WHERE is_processed = TRUE"
).fetchone()
analyzed = analyzed_result[0] if analyzed_result else 0
# Average clarity score
avg_result = self.conn.execute(
"SELECT AVG(overall_score) FROM clarity_analyses"
).fetchone()
avg_clarity = avg_result[0] if (avg_result and avg_result[0] is not None) else 0
# Last successful fetch
fetch_result = self.conn.execute(
"SELECT MAX(fetch_end) FROM fetch_logs WHERE status = 'success'"
).fetchone()
last_fetch = fetch_result[0] if (fetch_result and fetch_result[0]) else None
return {
'total_items': total,
'analyzed_items': analyzed,
'avg_clarity': avg_clarity,
'last_fetch': last_fetch.strftime('%Y-%m-%d %H:%M') if last_fetch else 'Never',
'date_range': date_range_str
}
Null Safety:
- Checks that the result exists before accessing [0]
- Returns 0 or default values for an empty database
- Prevents crashes when database has no data
Search Content (lines 73-131)
def search_content(self, days=7, category=None, min_clarity=0, max_clarity=100,
search_text=None, limit=100):
query = """
SELECT c.id, c.title, c.published_at, c.category, c.url,
a.overall_score as clarity_score
FROM content_items c
LEFT JOIN clarity_analyses a ON c.id = a.content_id
WHERE c.published_at >= ?
"""
params = [datetime.utcnow() - timedelta(days=days)]
# Add filters dynamically
if category and category != "All":
query += " AND c.category = ?"
params.append(category)
if min_clarity > 0 or max_clarity < 100:
query += " AND a.overall_score BETWEEN ? AND ?"
params.append(min_clarity)
params.append(max_clarity)
if search_text:
query += " AND (c.title LIKE ? OR c.content_text LIKE ?)"
search_param = f"%{search_text}%"
params.extend([search_param, search_param])
query += " ORDER BY c.published_at DESC LIMIT ?"
params.append(limit)
result = self.conn.execute(query, params).fetchall()
Dynamic Query Building:
- Starts with base query
- Adds WHERE clauses based on user filters
- Uses parameterized queries (prevents SQL injection)
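For example, a call like the one below (hypothetical filter values) appends a category clause, a score range, and a LIKE clause to the base query before executing it:
rows = repo.search_content(
    days=7,                  # published in the last 7 days
    category="Noticias",     # adds: AND c.category = ?
    min_clarity=50,          # adds: AND a.overall_score BETWEEN ? AND ?
    max_clarity=100,
    search_text="normativa"  # adds: AND (c.title LIKE ? OR c.content_text LIKE ?)
)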
Insert Content (lines 235-262)
def insert_content(self, content_data: Dict[str, Any]) -> Optional[int]:
result = self.conn.execute("""
INSERT INTO content_items (
source_id, title, content_text, content_html,
url, content_hash, published_at, category, tags
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
RETURNING id
""", [
content_data['source_id'],
content_data['title'],
content_data['content_text'],
content_data.get('content_html'),
content_data['url'],
content_data['content_hash'],
content_data['published_at'],
content_data.get('category'),
content_data.get('tags')
]).fetchone()
self.conn.commit()
return result[0] if result else None
RETURNING id: Gets the auto-generated ID of inserted row.
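Usage sketch (hypothetical values): the caller passes a dict with the keys the method reads and gets back the new row's id thanks to RETURNING id; content_html, category, and tags are optional because they are read with .get().
from datetime import datetime
import hashlib

text = "El Ayuntamiento aprueba la nueva normativa de movilidad."
new_id = repo.insert_content({
    'source_id': 1,
    'title': 'Ejemplo',
    'content_text': text,
    'url': 'https://diario.madrid.es/ejemplo',  # hypothetical URL
    'content_hash': hashlib.sha256(text.encode()).hexdigest(),
    'published_at': datetime.utcnow(),
    'category': 'Noticias',
})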
Insert Analysis (lines 264-301)
def insert_analysis(self, content_id: int, analysis_data: Dict[str, Any]) -> bool:
self.conn.execute("""
INSERT INTO clarity_analyses (
content_id, overall_score, readability_score, complexity_score,
sentence_stats, vocabulary_stats, readability_metrics, grammar_stats,
jargon_count, jargon_words, long_sentences_count, suggestions
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""", [
content_id,
analysis_data['overall_score'],
analysis_data['readability_score'],
analysis_data['complexity_score'],
analysis_data.get('sentence_stats'),
analysis_data.get('vocabulary_stats'),
analysis_data.get('readability_metrics'),
analysis_data.get('grammar_stats'),
analysis_data['jargon_count'],
analysis_data['jargon_words'],
analysis_data['long_sentences_count'],
analysis_data['suggestions']
])
# Mark content as processed
self.conn.execute(
"UPDATE content_items SET is_processed = TRUE WHERE id = ?",
[content_id]
)
self.conn.commit()
Pairing: The INSERT and the UPDATE are issued together and committed at the end of the method, so an item is only marked as processed after its analysis row has been written.
Content Fetching
File: fetchers/rss_fetcher.py
Fetch Process (lines 22-49)
def fetch(self, max_items: int = 100) -> List[Dict[str, Any]]:
# Parse RSS feed
feed = feedparser.parse(self.source_url)
if not feed.entries:
logger.warning("No entries found")
return []
items = []
for entry in feed.entries[:max_items]:
try:
item = self._parse_entry(entry)
if item:
items.append(item)
except Exception as e:
logger.error(f"Error parsing entry: {e}")
continue # Skip this entry, continue with others
return items
Error Resilience: One bad entry doesn't stop the whole fetch.
Parse Entry (lines 51-91)
def _parse_entry(self, entry) -> Optional[Dict[str, Any]]:
# Extract title
title = entry.get('title', 'Sin título')
# Extract content (try multiple fields)
content_html = entry.get('content', [{}])[0].get('value', '') or entry.get('summary', '')
# Convert HTML to plain text
content_text = self._html_to_text(content_html)
# Skip if too short
if not content_text or len(content_text) < 50:
return None
# Extract URL
url = entry.get('link', '')
# Extract published date
published_at = self._parse_date(entry.get('published', ''))
# Extract category
category = self._extract_category(entry)
# Generate hash for deduplication
content_hash = hashlib.sha256(content_text.encode()).hexdigest()
return {
'source_id': self.source_id,
'title': title,
'content_text': content_text,
'content_html': content_html,
'url': url,
'content_hash': content_hash,
'published_at': published_at,
'category': category,
'tags': entry.get('tags', [])
}
Content Hash: SHA256 of text content - even if URL changes, duplicate content is detected.
HTML to Text (lines 93-117)
def _html_to_text(self, html: str) -> str:
soup = BeautifulSoup(html, 'html.parser')
# Remove script and style elements
for script in soup(["script", "style"]):
script.decompose()
# Get text
text = soup.get_text()
# Clean up whitespace
lines = (line.strip() for line in text.splitlines())
chunks = (phrase.strip() for line in lines for phrase in line.split(" "))
text = ' '.join(chunk for chunk in chunks if chunk)
return text
Why:
- Removes HTML tags
- Removes scripts/styles
- Collapses multiple spaces
- Gives clean text for analysis
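A quick illustration of the effect with a made-up snippet (constructor arguments are positional, as in the scheduler code shown later):
fetcher = MadridRSSFetcher(1, "https://diario.madrid.es/feed")
html = "<p>El   Ayuntamiento <b>informa</b> de una   nueva convocatoria.<script>track()</script></p>"
print(fetcher._html_to_text(html))
# -> "El Ayuntamiento informa de una nueva convocatoria."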
Clarity Analysis (Aclarador)
File: analyzers/analyzer_wrapper.py
Initialization (lines 28-41)
def __init__(self):
self.system_prompt = self._load_system_prompt()
self.groq_client = None
if GROQ_AVAILABLE:
api_key = os.getenv('GROQ_API_KEY')
if api_key:
self.groq_client = Groq(api_key=api_key)
logger.info("β
Aclarador initialized with Groq API")
else:
logger.warning("β οΈ GROQ_API_KEY not found - using fallback")
Two Modes:
- Groq API Mode: used when GROQ_API_KEY is set
- Fallback Mode: simple heuristics when no API key is available
Load System Prompt (lines 43-61)
def _load_system_prompt(self) -> str:
prompt_path = Path(__file__).parent / 'aclarador' / 'system_prompt.md'
with open(prompt_path, 'r', encoding='utf-8') as f:
content = f.read()
# Extract content between ``` markers
if '```' in content:
parts = content.split('```')
if len(parts) >= 3:
return parts[1].strip()
return content
System Prompt: Contains Spanish clarity guidelines (sentence length, active voice, clear vocabulary, etc.)
Analyze Text with Groq (lines 71-104)
def analyze(self, text: str, title: str = None) -> Dict[str, Any]:
if not self.groq_client:
return self._fallback_analysis(text)
# Call Groq API
response = self.groq_client.chat.completions.create(
model="llama-3.3-70b-versatile",
messages=[
{"role": "system", "content": self.system_prompt},
{"role": "user", "content": text}
],
temperature=0.3, # Low = more consistent
max_tokens=2000 # Max response length
)
# Extract response text
analysis_text = response.choices[0].message.content
# Parse response and calculate scores
return self._parse_groq_response(analysis_text, text)
Temperature 0.3: Low temperature = more deterministic, consistent analysis.
Parse Groq Response (lines 106-158)
def _parse_groq_response(self, analysis_text: str, original_text: str):
# Extract sections
sections = self._extract_sections(analysis_text)
# Groq returns:
# ### TEXTO CORREGIDO
# [improved text]
#
# ### EXPLICACIÓN DE MEJORAS
# - Issue 1
# - Issue 2
#
# ### PRINCIPIOS APLICADOS
# - Principle 1
# Detect issues from explanation
issues = self._extract_issues_from_explanation(sections.get('explicacion', ''))
# Calculate scores
readability_score = self._calculate_readability_from_analysis(original_text, issues)
complexity_score = self._calculate_complexity_from_analysis(issues)
overall_score = (readability_score * 0.5 + complexity_score * 0.5)
# Extract suggestions
suggestions = self._extract_suggestions(sections.get('explicacion', ''))
# Detect jargon
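    # 'words' is presumably the tokenized original_text, built earlier in the method (elided in this excerpt)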
jargon_words = self._detect_jargon(words)
return {
'overall_score': overall_score,
'readability_score': readability_score,
'complexity_score': complexity_score,
'sentence_stats': {...},
'vocabulary_stats': {...},
'jargon_count': len(jargon_words),
'jargon_words': jargon_words,
'suggestions': suggestions
}
Issue Detection (lines 182-200):
def _extract_issues_from_explanation(self, explanation: str):
issues = []
explanation_lower = explanation.lower()
if 'oración' in explanation_lower and 'larga' in explanation_lower:
issues.append('long_sentences')
if 'vocabulario' in explanation_lower or 'tecnicismo' in explanation_lower:
issues.append('complex_vocabulary')
if 'voz pasiva' in explanation_lower:
issues.append('passive_voice')
if 'redundancia' in explanation_lower:
issues.append('redundancy')
return issues
Scoring Logic (lines 224-247):
def _calculate_readability_from_analysis(self, text: str, issues: List[str]):
sentences = [s.strip() for s in text.split('.') if s.strip()]
avg_length = sum(len(s.split()) for s in sentences) / len(sentences)
# Optimal sentence length: 20 words
score = 100 - abs(avg_length - 20) * 2
# Penalize for issues
score -= len(issues) * 8
return max(0, min(100, score))
def _calculate_complexity_from_analysis(self, issues: List[str]):
score = 100.0
score -= len(issues) * 12 # Each issue reduces score by 12 points
return max(0, min(100, score))
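A rough worked example with made-up numbers: suppose the text averages 28 words per sentence and Groq's explanation flags two issues (long_sentences and complex_vocabulary).
issues = ['long_sentences', 'complex_vocabulary']
avg_length = 28

readability = 100 - abs(avg_length - 20) * 2 - len(issues) * 8   # 100 - 16 - 16 = 68
complexity  = 100 - len(issues) * 12                             # 100 - 24      = 76
overall     = readability * 0.5 + complexity * 0.5               # 72.0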
Fallback Analysis (lines 305-342)
def _fallback_analysis(self, text: str):
"""Simple heuristic-based analysis when Groq unavailable"""
sentences = [s.strip() for s in text.split('.') if s.strip()]
words = text.split()
avg_sentence_length = sum(len(s.split()) for s in sentences) / len(sentences)
# Score: closer to 20 words = better
readability_score = max(0, 100 - abs(avg_sentence_length - 20) * 2)
long_sentences = [s for s in sentences if len(s.split()) > 30]
complexity_score = max(0, 100 - len(long_sentences) * 10)
overall_score = (readability_score + complexity_score) / 2
return {
'overall_score': overall_score,
'readability_score': readability_score,
'complexity_score': complexity_score,
'suggestions': ['Groq API no disponible - usando análisis simple']
}
Background Scheduler
File: schedulers/background_tasks.py
Main Pipeline (lines 15-179)
def fetch_and_analyze_content():
conn = get_connection()
repo = ContentRepository()
analyzer = AclaradorAnalyzer()
# Get active sources
sources = conn.execute("""
SELECT id, source_type, source_name, source_url
FROM content_sources
WHERE is_active = TRUE
""").fetchall()
total_fetched = 0
total_new = 0
total_analyzed = 0
# Process each source
for source in sources:
source_id, source_type, source_name, source_url = source
fetch_start = datetime.utcnow()
try:
# 1. FETCH
if source_type == 'RSS':
fetcher = MadridRSSFetcher(source_id, source_url)
items = fetcher.fetch(max_items=100)
total_fetched += len(items)
new_count = 0
# 2. STORE & ANALYZE
for item in items:
# Check if exists
existing = conn.execute(
"SELECT id FROM content_items WHERE url = ? OR content_hash = ?",
[item['url'], item['content_hash']]
).fetchone()
if existing:
continue # Skip duplicates
# Insert new item
content_id = repo.insert_content(item)
if content_id:
new_count += 1
total_new += 1
# 3. ANALYZE
analysis_result = analyzer.analyze(
item['content_text'],
item['title']
)
# 4. STORE ANALYSIS
if repo.insert_analysis(content_id, analysis_result):
total_analyzed += 1
# 5. LOG SUCCESS
fetch_end = datetime.utcnow()
conn.execute("""
INSERT INTO fetch_logs (
source_id, fetch_start, fetch_end,
items_fetched, items_new, status
) VALUES (?, ?, ?, ?, ?, ?)
""", [source_id, fetch_start, fetch_end, len(items), new_count, 'success'])
conn.commit()
except Exception as e:
# 6. LOG FAILURE
logger.error(f"Error: {e}")
conn.execute("""
INSERT INTO fetch_logs (
source_id, fetch_start, fetch_end,
items_fetched, items_new, status, error_message
) VALUES (?, ?, ?, ?, ?, ?, ?)
""", [source_id, fetch_start, datetime.utcnow(), 0, 0, 'failed', str(e)])
conn.commit()
Pipeline Steps:
- Fetch content from sources
- Check for duplicates (URL or content hash)
- Insert new content
- Analyze with Groq/Aclarador
- Store analysis results
- Log operation (success or failure)
Error Handling: If one source fails, others continue.
Gradio UI
File: app.py (lines 425-596)
Dashboard Tab (lines 452-489)
with gr.Tab("π Dashboard"):
gr.Markdown("### Overview Statistics")
stats_display = gr.JSON(label="Statistics")
refresh_stats_btn = gr.Button("Refresh Statistics", variant="primary")
refresh_stats_btn.click(get_dashboard_stats, outputs=stats_display)
# Charts
with gr.Row():
distribution_chart = gr.Plot(label="Clarity Score Distribution")
category_chart = gr.Plot(label="Scores by Category")
timeline_chart = gr.Plot(label="Content Timeline")
refresh_charts_btn = gr.Button("Refresh Charts")
def refresh_all_charts():
return (
get_clarity_distribution(),
get_category_scores(),
get_content_timeline()
)
refresh_charts_btn.click(
refresh_all_charts,
outputs=[distribution_chart, category_chart, timeline_chart]
)
Pattern:
- Define UI components
- Define functions
- Connect them with .click()
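get_dashboard_stats() itself is not shown above; it is presumably a thin wrapper that forwards the repository's statistics dict to the gr.JSON component, roughly:
def get_dashboard_stats():
    """Return the statistics dict; gr.JSON renders it as-is."""
    try:
        return repo.get_statistics()
    except Exception as e:
        logger.error(f"Failed to load statistics: {e}")
        return {"error": str(e)}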
Browse Content Tab (lines 494-529)
with gr.Tab("π Browse Content"):
# Filters
with gr.Row():
days_slider = gr.Slider(1, 90, value=7, step=1, label="Last N Days")
category_dropdown = gr.Dropdown(
["All", "Noticias", "Documentos", "Anuncios"],
value="All",
label="Category"
)
with gr.Row():
min_clarity = gr.Slider(0, 100, value=0, label="Min Clarity Score")
max_clarity = gr.Slider(0, 100, value=100, label="Max Clarity Score")
search_box = gr.Textbox(label="Search Text")
search_btn = gr.Button("Search")
results_table = gr.Dataframe(label="Search Results")
# Connect button to function
search_btn.click(
search_content,
inputs=[days_slider, category_dropdown, min_clarity, max_clarity, search_box],
outputs=results_table
)
Dynamic Filtering: All filter values passed to search_content() function.
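The app-level search_content() bound to the button is presumably a small adapter that forwards the widget values to the repository and reshapes the rows into a DataFrame for gr.Dataframe, along these lines (column labels are illustrative):
import pandas as pd

def search_content(days, category, min_clarity, max_clarity, search_text):
    rows = repo.search_content(
        days=int(days),
        category=category,
        min_clarity=min_clarity,
        max_clarity=max_clarity,
        search_text=search_text or None,
    )
    # Column names mirror the SELECT list in the repository query
    return pd.DataFrame(rows, columns=["ID", "Title", "Published", "Category", "URL", "Clarity"])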
Settings Tab (lines 563-587)
with gr.Tab("βοΈ Settings"):
gr.Markdown("### Manual Operations")
fetch_btn = gr.Button("Trigger Manual Fetch")
fetch_status = gr.Textbox(label="Fetch Status")
fetch_btn.click(trigger_manual_fetch, outputs=fetch_status)
Manual Fetch: User can trigger fetch immediately (doesn't wait 6 hours).
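trigger_manual_fetch() is not shown above either; a plausible sketch is that it simply runs the same pipeline function the scheduler uses and reports a status string back to the textbox:
def trigger_manual_fetch():
    """Run the scheduled pipeline immediately and report the outcome."""
    try:
        fetch_and_analyze_content()  # same function APScheduler runs every 6 hours
        stats = repo.get_statistics()
        return f"Fetch completed. Total items: {stats['total_items']}, analyzed: {stats['analyzed_items']}"
    except Exception as e:
        logger.error(f"Manual fetch failed: {e}")
        return f"Fetch failed: {e}"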
Chart Functions
Clarity Distribution (lines 70-109):
def get_clarity_distribution():
query = """
SELECT
CASE
WHEN overall_score < 30 THEN '0-29 (Poor)'
WHEN overall_score < 50 THEN '30-49 (Fair)'
WHEN overall_score < 70 THEN '50-69 (Good)'
WHEN overall_score < 90 THEN '70-89 (Very Good)'
ELSE '90-100 (Excellent)'
END as score_range,
COUNT(*) as count
FROM clarity_analyses
GROUP BY score_range
"""
df = conn.execute(query).df()
fig = px.bar(
df,
x='score_range',
y='count',
title='Clarity Score Distribution',
color='count',
color_continuous_scale='RdYlGn'
)
return fig
SQL CASE: Buckets scores into ranges.
Plotly: Creates interactive bar chart with color gradient.
Complete Data Flow
Automatic Flow (Every 6 Hours)
1. APScheduler triggers fetch_and_analyze_content()
2. Get active sources from the database
   - Madrid RSS Feed
   - Open Data Portal
3. For each source, MadridRSSFetcher.fetch():
   - Parse the RSS feed
   - Extract entries
   - Convert HTML to text
   - Generate a content hash
4. For each item:
   - Check if it already exists (by URL or hash)
   - If duplicate: skip
   - If new: insert into content_items
5. For each new item, AclaradorAnalyzer.analyze():
   - If GROQ_API_KEY is set: load the system prompt, call the Groq API, parse the response
   - Else: use fallback heuristics
6. Insert analysis results:
   - Insert into clarity_analyses
   - Update content_items.is_processed = TRUE
7. Log the operation:
   - Insert into fetch_logs (success/failure)
   - Update content_sources.last_fetched
8. Data becomes available in the UI:
   - Dashboard shows updated statistics
   - Browse shows new content
   - Analytics shows trends
User Interaction Flow
User clicks "π Refresh Statistics"
β
Gradio calls get_dashboard_stats()
β
ContentRepository.get_statistics()
β
Execute SQL queries:
- COUNT content items
- COUNT analyzed items
- AVG clarity score
- MAX last fetch time
β
Return dict with stats
β
Gradio displays in JSON component
Search Flow
User sets filters:
    - Last 7 days
    - Category: "Noticias"
    - Min clarity: 50
    - Search text: "normativa"
    ↓
User clicks "Search"
    ↓
Gradio calls search_content(7, "Noticias", 50, 100, "normativa")
    ↓
ContentRepository.search_content()
    ↓
Build dynamic SQL query:
    SELECT ...
    WHERE published_at >= (NOW - 7 days)
      AND category = 'Noticias'
      AND overall_score >= 50
      AND (title LIKE '%normativa%' OR content_text LIKE '%normativa%')
    ORDER BY published_at DESC
    LIMIT 100
    ↓
Convert to DataFrame
    ↓
Gradio displays it in the table
Error Handling
Database Errors
Problem: Database empty on first run
# ❌ Old code - crashes
total = self.conn.execute("SELECT COUNT(*) FROM content_items").fetchone()[0]
# ✅ New code - safe
total_result = self.conn.execute("SELECT COUNT(*) FROM content_items").fetchone()
total = total_result[0] if total_result else 0
Fetch Errors
Problem: One bad RSS entry crashes entire fetch
# ✅ Error handling
for entry in feed.entries:
try:
item = self._parse_entry(entry)
if item:
items.append(item)
except Exception as e:
logger.error(f"Error parsing entry: {e}")
continue # Skip this entry, continue with others
Analysis Errors
Problem: Groq API fails
def analyze(self, text: str):
try:
# Try Groq API
response = self.groq_client.chat.completions.create(...)
return self._parse_groq_response(response.choices[0].message.content, text)
except Exception as e:
logger.error(f"Groq API error: {e}")
# Fall back to heuristics
return self._fallback_analysis(text)
Permission Errors
Problem: /data directory not accessible
try:
DATA_DIR = Path('/data')
DATA_DIR.mkdir(exist_ok=True)
except (PermissionError, OSError):
# Fall back to local directory
DATA_DIR = Path(__file__).parent.parent / 'data'
DATA_DIR.mkdir(exist_ok=True)
Key Design Patterns
1. Singleton Pattern
Where: Database connection
Why: One connection shared across the app
2. Repository Pattern
Where: ContentRepository
Why: Separates data access from business logic
3. Fallback Pattern
Where: Aclarador analyzer, database path
Why: Graceful degradation when resources are unavailable
4. Error Resilience
Where: Fetch pipeline, analysis
Why: One failure doesn't stop the entire process
5. Null Safety
Where: All database queries
Why: An empty database doesn't crash the app
Performance Optimizations
1. Database Indexes
CREATE INDEX idx_content_published ON content_items(published_at);
Impact: Fast date-range queries
2. Connection Reuse
_conn = None  # Reuse the same connection
Impact: No overhead from repeatedly opening connections
3. Limit Results
LIMIT 100
Impact: Don't load entire database into memory
4. Deduplication at Insert
SELECT id FROM content_items WHERE url = ? OR content_hash = ?
Impact: No duplicate processing
5. Background Processing
scheduler = BackgroundScheduler()
Impact: Fetch doesn't block UI
Configuration Points
1. Fetch Frequency
File: app.py:38
hours=6 # Change to 1, 3, 12, 24
2. Groq API Key
Environment Variable: GROQ_API_KEY
3. Database Path
File: config/database.py:13-23
DATA_DIR = Path('/data') # Or local path
4. Max Items Per Fetch
File: schedulers/background_tasks.py:57
items = fetcher.fetch(max_items=100) # Increase/decrease
5. Default Data Sources
File: config/database.py:144-149
INSERT INTO content_sources VALUES
(1, 'RSS', 'Madrid...', 'https://...', 6),
(2, 'API', 'Open Data', 'https://...', 24)
Summary
The app is a closed loop:
- Scheduler triggers fetch every 6 hours
- Fetcher gets content from Madrid RSS
- Repository stores in DuckDB (deduplicates)
- Analyzer analyzes with Groq AI
- Repository stores analysis results
- UI displays everything to user
Key Features:
- ✅ Fully automated
- ✅ Error resilient
- ✅ Deduplicates content
- ✅ AI-powered analysis
- ✅ Interactive dashboard
- ✅ Free hosting on HF Spaces
All configuration: Environment variables and simple code changes - no complex config files!