EUFRAUDE / appV0.py
MMOON's picture
Rename app.py to appV0.py
dca68e4 verified
import streamlit as st
import requests
from bs4 import BeautifulSoup
import re
import os
import sqlite3
import pandas as pd
from datetime import datetime
import urllib.parse
import io
import PyPDF2
import time
import random
# Configuration de l'application
st.set_page_config(
page_title="Moniteur de Fraudes Alimentaires EU",
page_icon="🍽️",
layout="wide"
)
# Constantes
BASE_URL = "https://knowledge4policy.ec.europa.eu/food-fraud-quality/monthly-food-fraud-summary-reports_en"
REPORTS_BASE_URL = "https://knowledge4policy.ec.europa.eu/food-fraud-quality/monthly-food-fraud-summary-reports/publication/"
DB_PATH = "food_fraud_database.db"
REPORTS_DIR = "reports"
# Création des dossiers nécessaires s'ils n'existent pas
if not os.path.exists(REPORTS_DIR):
os.makedirs(REPORTS_DIR)
# Informations sur le stockage
storage_info = """
### Informations sur le stockage des données
- **Base de données** : Les informations sont stockées dans `food_fraud_database.db` (SQLite)
- **Fichiers PDF** : Les rapports sont stockés dans le dossier `reports/`, organisés par année
- **Persistance** : Les données sont conservées entre les sessions
"""
# Initialisation de la base de données
def init_db():
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
# Table des rapports mensuels
c.execute('''
CREATE TABLE IF NOT EXISTS reports (
id INTEGER PRIMARY KEY AUTOINCREMENT,
month TEXT,
year INTEGER,
filename TEXT,
url TEXT,
local_path TEXT,
download_date TIMESTAMP,
processed BOOLEAN
)
''')
# Table des cas de fraude
c.execute('''
CREATE TABLE IF NOT EXISTS fraud_cases (
id INTEGER PRIMARY KEY AUTOINCREMENT,
report_id INTEGER,
case_date TEXT,
country TEXT,
product_type TEXT,
fraud_type TEXT,
description TEXT,
source_urls TEXT,
FOREIGN KEY (report_id) REFERENCES reports (id)
)
''')
conn.commit()
conn.close()
# Fonction pour extraire les mois et années d'une URL de rapport
def extract_month_year_from_url(url):
try:
filename = url.split('/')[-1]
patterns = [
r'food-fraud-(?:summary|monthly)-(?:report-)?([a-zA-Z\-&]+)-(\d{4})(?:_en)?',
r'-([a-zA-Z]+(?:-[a-zA-Z]+)?)-(\d{4})(?:_en)?',
r'-([a-zA-Z]+(?:-?&-?[a-zA-Z]+)?)-(\d{4})(?:_en)?'
]
for pattern in patterns:
match = re.search(pattern, filename)
if match:
month_text = match.group(1).strip().lower()
year = int(match.group(2))
month = month_text.replace('-', ' ').replace('&', ' and ')
return month, year
parts = filename.split('-')
if len(parts) >= 4:
month_part = parts[-2]
year_part = parts[-1].split('_')[0]
if year_part.isdigit() and len(year_part) == 4:
return month_part, int(year_part)
print(f"Impossible d'extraire le mois et l'année: {url}")
return None, None
except Exception as e:
print(f"Erreur d'extraction: {e}, URL: {url}")
return None, None
# Fonction pour récupérer les rapports mensuels
def get_monthly_reports():
try:
response = requests.get(BASE_URL)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
report_links = []
report_set = set()
# Rechercher dans le texte
all_text = soup.get_text()
report_pattern = r'Food Fraud (?:Monthly|Summary) Report - ([A-Za-z\s&]+) (\d{4})'
report_matches = re.finditer(report_pattern, all_text)
for match in report_matches:
month_text = match.group(1).strip()
year = match.group(2)
report_key = f"{month_text}_{year}"
if report_key not in report_set:
report_set.add(report_key)
month_url_part = month_text.lower().replace(' ', '-').replace('&', '-')
report_url = f"{REPORTS_BASE_URL}food-fraud-summary-{month_url_part}-{year}_en"
report_links.append({
'title': f"Food Fraud Monthly Report - {month_text} {year}",
'url': report_url,
'month': month_text.lower(),
'year': int(year)
})
# Chercher les liens réels
links = soup.find_all('a', href=True)
for link in links:
href = link['href']
title = link.get_text().strip()
if ('/food-fraud-quality/monthly-food-fraud-summary-reports/publication/' in href and
('food-fraud-summary-' in href or 'food-fraud-monthly-' in href)):
full_url = urllib.parse.urljoin(BASE_URL, href)
month, year = extract_month_year_from_url(href)
if month and year:
report_key = f"{month}_{year}"
if report_key not in report_set:
report_set.add(report_key)
report_links.append({
'title': title if title else f"Food Fraud Report - {month} {year}",
'url': full_url,
'month': month,
'year': year
})
# Trier les rapports
report_links.sort(key=lambda x: (x['year'], x['month']), reverse=True)
return report_links
except Exception as e:
st.error(f"Erreur lors de l'extraction des liens: {e}")
return []
# Fonction pour générer plusieurs formats d'URL possibles
def generate_possible_urls(month, year):
# Normaliser le mois
month_normalized = month.lower().strip()
# Remplacer les caractères spéciaux et espaces pour l'URL
month_url = month_normalized.replace(' ', '-').replace('&', '-')
month_url_and = month_normalized.replace(' ', '-').replace('&', 'and')
month_url_no_and = month_normalized.replace(' & ', '-').replace('&', '')
# Formats d'URL possibles
url_formats = [
# Format standard
f"{REPORTS_BASE_URL}food-fraud-summary-{month_url}-{year}_en",
# Format avec 'monthly-report'
f"{REPORTS_BASE_URL}food-fraud-monthly-report-{month_url}-{year}_en",
# Format avec 'and' au lieu de '&'
f"{REPORTS_BASE_URL}food-fraud-summary-{month_url_and}-{year}_en",
# Format sans '&'
f"{REPORTS_BASE_URL}food-fraud-summary-{month_url_no_and}-{year}_en",
# Variantes sans '_en' à la fin
f"{REPORTS_BASE_URL}food-fraud-summary-{month_url}-{year}",
f"{REPORTS_BASE_URL}food-fraud-monthly-report-{month_url}-{year}",
# Autres variantes possibles
f"{REPORTS_BASE_URL}monthly-food-fraud-summary-{month_url}-{year}_en",
f"{REPORTS_BASE_URL}monthly-food-fraud-report-{month_url}-{year}_en"
]
# Cas spécial pour les mois composés (comme "November & December")
if '&' in month_normalized:
parts = month_normalized.split('&')
if len(parts) == 2:
first_month = parts[0].strip()
second_month = parts[1].strip()
# Format avec tiret entre les mois
url_formats.append(f"{REPORTS_BASE_URL}food-fraud-summary-{first_month}-{second_month}-{year}_en")
url_formats.append(f"{REPORTS_BASE_URL}food-fraud-monthly-report-{first_month}-{second_month}-{year}_en")
# Format avec 'and' entre les mois
url_formats.append(f"{REPORTS_BASE_URL}food-fraud-summary-{first_month}-and-{second_month}-{year}_en")
url_formats.append(f"{REPORTS_BASE_URL}food-fraud-monthly-report-{first_month}-and-{second_month}-{year}_en")
return url_formats
# Fonction pour obtenir le lien de téléchargement PDF
def get_pdf_download_link(report_url):
try:
# Ajouter un délai aléatoire pour éviter trop de requêtes
time.sleep(random.uniform(1, 3))
# Essayer l'URL standard
response = requests.get(report_url)
response.raise_for_status()
soup = BeautifulSoup(response.text, 'html.parser')
# Rechercher plusieurs types de liens de téléchargement
pdf_links = []
# Liens avec la classe 'ecl-file__download'
download_links = soup.find_all('a', class_='ecl-file__download')
for link in download_links:
href = link.get('href')
if href and href.endswith('.pdf'):
full_url = urllib.parse.urljoin(report_url, href)
pdf_links.append(full_url)
# Autres liens PDF
other_pdf_links = soup.find_all('a', href=lambda href: href and href.endswith('.pdf'))
for link in other_pdf_links:
href = link.get('href')
full_url = urllib.parse.urljoin(report_url, href)
if full_url not in pdf_links:
pdf_links.append(full_url)
# Liens contenant "download"
download_text_links = soup.find_all('a', text=lambda text: text and 'download' in text.lower())
for link in download_text_links:
href = link.get('href')
if href:
full_url = urllib.parse.urljoin(report_url, href)
if full_url not in pdf_links:
pdf_links.append(full_url)
# Si l'URL standard ne fonctionne pas, essayer un format alternatif
if not pdf_links:
# Extraire le mois et l'année de l'URL
parts = report_url.split('/')
filename_part = parts[-1]
if 'food-fraud-summary-' in filename_part:
# Essayer un format alternatif courant
alt_filename = filename_part.replace('food-fraud-summary-', 'food-fraud-monthly-report-')
alt_url = '/'.join(parts[:-1]) + '/' + alt_filename
try:
alt_response = requests.get(alt_url)
alt_response.raise_for_status()
alt_soup = BeautifulSoup(alt_response.text, 'html.parser')
alt_links = alt_soup.find_all('a', class_='ecl-file__download')
for link in alt_links:
href = link.get('href')
if href and href.endswith('.pdf'):
full_url = urllib.parse.urljoin(alt_url, href)
pdf_links.append(full_url)
# Rechercher d'autres liens PDF
other_pdf_links = alt_soup.find_all('a', href=lambda href: href and href.endswith('.pdf'))
for link in other_pdf_links:
href = link.get('href')
full_url = urllib.parse.urljoin(alt_url, href)
if full_url not in pdf_links:
pdf_links.append(full_url)
except Exception as alt_e:
pass
# Cas spécial pour les mois composés
if not pdf_links and "november-&-december" in report_url.lower():
try:
alt_url = report_url.replace("november-&-december", "november-december")
alt_url = alt_url.replace("November-&-December", "November-December")
alt_response = requests.get(alt_url)
alt_response.raise_for_status()
alt_soup = BeautifulSoup(alt_response.text, 'html.parser')
alt_links = alt_soup.find_all('a', class_='ecl-file__download')
for link in alt_links:
href = link.get('href')
if href and href.endswith('.pdf'):
full_url = urllib.parse.urljoin(alt_url, href)
pdf_links.append(full_url)
except Exception as alt_e:
pass
# Chercher des liens alternatifs dans la page
if not pdf_links:
for link in soup.find_all('a', href=True):
href = link['href']
if '/publication/' in href and href != report_url:
try:
alt_url = urllib.parse.urljoin(report_url, href)
time.sleep(random.uniform(1, 2))
alt_response = requests.get(alt_url)
alt_response.raise_for_status()
alt_soup = BeautifulSoup(alt_response.text, 'html.parser')
alt_links = alt_soup.find_all('a', class_='ecl-file__download')
for link in alt_links:
href = link.get('href')
if href and href.endswith('.pdf'):
full_url = urllib.parse.urljoin(alt_url, href)
pdf_links.append(full_url)
except Exception as alt_e:
continue
return pdf_links[0] if pdf_links else None
except requests.exceptions.RequestException as e:
if isinstance(e, requests.exceptions.TooManyRedirects):
st.error(f"Trop de redirections lors de l'accès à {report_url}")
elif isinstance(e, requests.exceptions.HTTPError) and e.response.status_code == 429:
st.error(f"Trop de requêtes. Attendez quelques minutes avant de réessayer.")
else:
st.error(f"Erreur lors de l'extraction du lien PDF: {e}")
return None
except Exception as e:
st.error(f"Erreur lors de l'extraction du lien PDF: {e}")
return None
# Fonction pour télécharger le PDF avec système de réessai
def download_pdf(url, month, year, max_retries=3):
# Nettoyer le mois pour le nom de fichier
clean_month = month.lower().replace('&', 'and').replace(' ', '_')
for attempt in range(max_retries):
try:
# Ajouter un délai entre les tentatives
if attempt > 0:
delay = 2 ** attempt # Backoff exponentiel: 2, 4, 8 secondes
st.info(f"Tentative {attempt+1}/{max_retries} dans {delay} secondes...")
time.sleep(delay)
# En-têtes pour simuler un navigateur
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
'Accept-Language': 'en-US,en;q=0.5',
'DNT': '1',
'Connection': 'keep-alive',
'Upgrade-Insecure-Requests': '1',
}
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
# Vérifier que le contenu est bien un PDF
content_type = response.headers.get('Content-Type', '')
if 'application/pdf' not in content_type and not url.lower().endswith('.pdf'):
st.warning(f"Le contenu téléchargé ne semble pas être un PDF (Content-Type: {content_type})")
# Vérifier si c'est une page HTML qui pourrait contenir le lien vers le PDF
if 'text/html' in content_type:
soup = BeautifulSoup(response.text, 'html.parser')
pdf_links = []
# Chercher des liens PDF
for link in soup.find_all('a', href=True):
href = link['href']
if href.lower().endswith('.pdf'):
pdf_url = urllib.parse.urljoin(url, href)
pdf_links.append(pdf_url)
if pdf_links:
st.info(f"Lien PDF trouvé dans la page HTML. Tentative de téléchargement...")
# Télécharger le premier PDF trouvé
pdf_response = requests.get(pdf_links[0], headers=headers, timeout=30)
pdf_response.raise_for_status()
# Créer le dossier de l'année si nécessaire
year_dir = os.path.join(REPORTS_DIR, str(year))
if not os.path.exists(year_dir):
os.makedirs(year_dir)
# Nom du fichier local
filename = f"food_fraud_{clean_month}_{year}.pdf"
local_path = os.path.join(year_dir, filename)
# Sauvegarder le fichier
with open(local_path, 'wb') as f:
f.write(pdf_response.content)
return local_path
else:
continue # Essayer la prochaine tentative si aucun PDF n'est trouvé
# Créer le dossier de l'année si nécessaire
year_dir = os.path.join(REPORTS_DIR, str(year))
if not os.path.exists(year_dir):
os.makedirs(year_dir)
# Nom du fichier local
filename = f"food_fraud_{clean_month}_{year}.pdf"
local_path = os.path.join(year_dir, filename)
# Sauvegarder le fichier
with open(local_path, 'wb') as f:
f.write(response.content)
# Vérifier la taille du fichier
file_size = os.path.getsize(local_path)
if file_size < 1000: # Moins de 1KB, probablement pas un PDF valide
st.warning(f"Le fichier téléchargé est trop petit ({file_size} octets), pourrait ne pas être un PDF valide.")
# Mais continuer quand même car certains PDFs peuvent être petits
return local_path
except requests.exceptions.RequestException as e:
if isinstance(e, requests.exceptions.HTTPError) and e.response.status_code == 429:
if attempt < max_retries - 1:
st.warning(f"Trop de requêtes (429). Nouvelle tentative {attempt+2}/{max_retries} après pause...")
time.sleep(10) # Pause plus longue pour les erreurs 429
else:
st.error(f"Échec après {max_retries} tentatives: trop de requêtes (429)")
else:
st.error(f"Erreur lors du téléchargement (tentative {attempt+1}/{max_retries}): {e}")
if attempt >= max_retries - 1:
st.error(f"Échec après {max_retries} tentatives")
except Exception as e:
st.error(f"Erreur inattendue lors du téléchargement (tentative {attempt+1}/{max_retries}): {e}")
if attempt >= max_retries - 1:
st.error(f"Échec après {max_retries} tentatives")
return None # Toutes les tentatives ont échoué
# Fonction pour télécharger un rapport avec différentes URLs
def download_report(report_info, delay_between_reports=5):
title = report_info['title']
month = report_info['month']
year = report_info['year']
st.info(f"Tentative de téléchargement de {title}...")
# Ajouter un délai entre les téléchargements
time.sleep(delay_between_reports)
# Générer plusieurs URLs possibles
possible_urls = generate_possible_urls(month, year)
# D'abord essayer l'URL fournie
if 'url' in report_info and report_info['url']:
possible_urls.insert(0, report_info['url'])
for url in possible_urls:
st.info(f"Essai de l'URL: {url}")
# Tenter d'obtenir le lien de téléchargement PDF
pdf_url = get_pdf_download_link(url)
if pdf_url:
# Télécharger le PDF
local_path = download_pdf(pdf_url, month, year)
if local_path:
# Sauvegarder les informations dans la base de données
filename = os.path.basename(local_path)
report_id = save_report_to_db(month, year, filename, url, local_path)
# Extraire les cas de fraude du PDF
fraud_cases = extract_fraud_cases_from_pdf(local_path)
if fraud_cases:
# Sauvegarder les cas de fraude dans la base de données
save_fraud_cases_to_db(report_id, fraud_cases)
return True, local_path, len(fraud_cases)
else:
return True, local_path, 0
# Toutes les URLs ont échoué
return False, None, 0
# Fonction pour extraire les cas de fraude du PDF
def extract_fraud_cases_from_pdf(pdf_path):
fraud_cases = []
try:
with open(pdf_path, 'rb') as f:
pdf_reader = PyPDF2.PdfReader(f)
text = ""
for page_num in range(len(pdf_reader.pages)):
page = pdf_reader.pages[page_num]
text += page.extract_text()
case_pattern = r'(\d{2}/\d{2}/\d{4})\s+([^\n]+?)\s+([A-Z][a-z]+)\s+([^\n]+?)\s+([^\n]+?)(?=\n)'
matches = re.finditer(case_pattern, text)
for match in matches:
date = match.group(1)
sources = match.group(2).strip().split()
country = match.group(3).strip()
product_type = match.group(4).strip()
fraud_type = match.group(5).strip()
start_pos = match.end()
next_match = re.search(pattern=case_pattern, string=text[start_pos:])
if next_match:
description_text = text[start_pos:start_pos + next_match.start()].strip()
else:
description_text = text[start_pos:start_pos + 500].strip()
description_text = re.sub(r'\n+', ' ', description_text)
case = {
'date': date,
'country': country,
'product_type': product_type,
'fraud_type': fraud_type,
'description': description_text,
'sources': sources
}
fraud_cases.append(case)
return fraud_cases
except Exception as e:
st.error(f"Erreur lors de l'extraction: {e}")
return []
# Fonction pour sauvegarder les rapports dans la base de données
def save_report_to_db(month, year, filename, url, local_path=None):
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
c.execute("SELECT id FROM reports WHERE month = ? AND year = ?", (month, year))
existing = c.fetchone()
if existing:
report_id = existing[0]
c.execute('''
UPDATE reports
SET filename = ?, url = ?, local_path = ?, download_date = ?, processed = ?
WHERE id = ?
''', (filename, url, local_path, datetime.now(), False, report_id))
else:
c.execute('''
INSERT INTO reports (month, year, filename, url, local_path, download_date, processed)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (month, year, filename, url, local_path, datetime.now(), False))
report_id = c.lastrowid
conn.commit()
conn.close()
return report_id
# Fonction pour sauvegarder les cas de fraude dans la base de données
def save_fraud_cases_to_db(report_id, fraud_cases):
conn = sqlite3.connect(DB_PATH)
c = conn.cursor()
for case in fraud_cases:
sources = ','.join(case.get('sources', []))
c.execute('''
INSERT INTO fraud_cases
(report_id, case_date, country, product_type, fraud_type, description, source_urls)
VALUES (?, ?, ?, ?, ?, ?, ?)
''', (
report_id,
case.get('date', ''),
case.get('country', ''),
case.get('product_type', ''),
case.get('fraud_type', ''),
case.get('description', ''),
sources
))
c.execute("UPDATE reports SET processed = ? WHERE id = ?", (True, report_id))
conn.commit()
conn.close()
# Fonction pour obtenir les rapports stockés
def get_stored_reports():
conn = sqlite3.connect(DB_PATH)
df = pd.read_sql_query("SELECT * FROM reports ORDER BY year DESC, month DESC", conn)
conn.close()
return df
# Fonction pour obtenir les cas de fraude d'un rapport
def get_fraud_cases_for_report(report_id):
conn = sqlite3.connect(DB_PATH)
df = pd.read_sql_query("SELECT * FROM fraud_cases WHERE report_id = ?", conn, params=(report_id,))
conn.close()
return df
# Fonction pour rechercher des cas de fraude
def search_fraud_cases(search_term=None, country=None, product_type=None, fraud_type=None):
conn = sqlite3.connect(DB_PATH)
query = "SELECT * FROM fraud_cases WHERE 1=1"
params = []
if search_term:
query += " AND (description LIKE ? OR country LIKE ? OR product_type LIKE ? OR fraud_type LIKE ?)"
search_pattern = f"%{search_term}%"
params.extend([search_pattern, search_pattern, search_pattern, search_pattern])
if country:
query += " AND country = ?"
params.append(country)
if product_type:
query += " AND product_type = ?"
params.append(product_type)
if fraud_type:
query += " AND fraud_type = ?"
params.append(fraud_type)
df = pd.read_sql_query(query, conn, params=params)
conn.close()
return df
# Interface utilisateur Streamlit
def main():
st.title("Moniteur de Fraudes Alimentaires EU")
# Initialisation de la base de données
init_db()
# Initialisation de la session state
if 'selected_reports' not in st.session_state:
st.session_state.selected_reports = []
if 'reports_data' not in st.session_state:
st.session_state.reports_data = None
# Menu latéral
st.sidebar.title("Navigation")
page = st.sidebar.radio("Aller à", ["Télécharger des rapports", "Explorer les données", "Recherche avancée", "Statistiques"])
if page == "Télécharger des rapports":
st.header("Téléchargement des Rapports Mensuels")
# Informations sur le stockage
with st.expander("Informations sur le stockage des données", expanded=False):
st.markdown(storage_info)
# Bouton pour récupérer les rapports
get_reports = st.button("Récupérer les rapports disponibles")
# Si le bouton est cliqué ou si les données sont déjà en mémoire
if get_reports or st.session_state.reports_data is not None:
# Ne récupérer les données que si le bouton est cliqué ou si les données ne sont pas en mémoire
if get_reports or st.session_state.reports_data is None:
with st.spinner("Récupération des rapports..."):
reports = get_monthly_reports()
if reports:
# Créer le DataFrame des rapports
report_df = pd.DataFrame([
{
'Titre': r['title'],
'URL': r['url'],
'Mois': r['month'],
'Année': r['year'],
'ID': f"{r['month']}_{r['year']}_{i}"
} for i, r in enumerate(reports) if r['month'] is not None
])
# Supprimer les doublons
report_df_no_dupes = report_df.drop_duplicates(subset=['Mois', 'Année'], keep='first')
# Stocker les données en mémoire
st.session_state.reports_data = report_df_no_dupes
else:
st.error("Aucun rapport mensuel trouvé.")
st.session_state.reports_data = None
# Afficher les données si elles sont disponibles
if st.session_state.reports_data is not None:
report_df_no_dupes = st.session_state.reports_data
st.success(f"{len(report_df_no_dupes)} rapports trouvés.")
st.dataframe(report_df_no_dupes[['Titre', 'URL', 'Mois', 'Année']])
st.subheader("Sélectionnez les rapports à télécharger")
# Organiser par année
years = sorted(report_df_no_dupes['Année'].unique(), reverse=True)
# Pour chaque année
for year in years:
year_reports = report_df_no_dupes[report_df_no_dupes['Année'] == year]
with st.expander(f"Rapports de {year} ({len(year_reports)} disponibles)", expanded=True if year == years[0] else False):
# Pour chaque rapport de cette année
for i, row in year_reports.iterrows():
# Créer une clé unique pour ce rapport
report_id = f"{row['Mois']}_{row['Année']}"
checkbox_key = f"check_{year}_{row['Mois']}_{i}"
# Vérifier si ce rapport est déjà dans la liste des sélections
is_already_selected = any(
r['month'] == row['Mois'] and r['year'] == row['Année']
for r in st.session_state.selected_reports
)
# Créer le checkbox
is_selected = st.checkbox(
f"{row['Titre']}",
value=is_already_selected,
key=checkbox_key
)
# Mettre à jour la liste des rapports sélectionnés
report_info = {
'title': row['Titre'],
'url': row['URL'],
'month': row['Mois'],
'year': row['Année'],
'id': report_id
}
if is_selected and not is_already_selected:
st.session_state.selected_reports.append(report_info)
elif not is_selected and is_already_selected:
st.session_state.selected_reports = [
r for r in st.session_state.selected_reports
if not (r['month'] == report_info['month'] and r['year'] == report_info['year'])
]
# Afficher le nombre de rapports sélectionnés
num_selected = len(st.session_state.selected_reports)
if num_selected > 0:
st.success(f"{num_selected} rapport{'s' if num_selected > 1 else ''} sélectionné{'s' if num_selected > 1 else ''}")
# Liste des rapports sélectionnés
st.write("Rapports sélectionnés :")
for report in st.session_state.selected_reports:
st.write(f"- {report['title']}")
# Bouton de téléchargement
if st.button("Télécharger les rapports sélectionnés", key="download_btn"):
download_statuses = []
for report_info in st.session_state.selected_reports:
# Utiliser la nouvelle fonction de téléchargement avec multiples tentatives
success, local_path, num_cases = download_report(report_info, delay_between_reports=3)
if success:
if num_cases > 0:
download_statuses.append({
"title": report_info['title'],
"status": "success",
"message": f"Téléchargé et {num_cases} cas de fraude extraits."
})
else:
download_statuses.append({
"title": report_info['title'],
"status": "warning",
"message": "Téléchargé mais aucun cas de fraude n'a pu être extrait."
})
else:
download_statuses.append({
"title": report_info['title'],
"status": "error",
"message": "Échec du téléchargement après plusieurs tentatives."
})
# Afficher les résultats des téléchargements
st.subheader("Résultats des téléchargements")
for status in download_statuses:
if status["status"] == "success":
st.success(f"{status['title']}: {status['message']}")
elif status["status"] == "warning":
st.warning(f"{status['title']}: {status['message']}")
else:
st.error(f"{status['title']}: {status['message']}")
# Bouton pour effacer la sélection
if st.button("Effacer la sélection et recommencer"):
st.session_state.selected_reports = []
st.experimental_rerun()
# Afficher les rapports déjà stockés
st.header("Rapports stockés dans la base de données")
stored_reports = get_stored_reports()
if not stored_reports.empty:
st.write(f"**{len(stored_reports)} rapports déjà stockés dans la base de données**")
col1, col2 = st.columns(2)
with col1:
years = sorted(stored_reports['year'].unique(), reverse=True)
for year in years:
year_reports = stored_reports[stored_reports['year'] == year]
st.subheader(f"Année {year} ({len(year_reports)} rapports)")
months_list = [f"- {row['month'].capitalize()}" for _, row in year_reports.iterrows()]
st.markdown("\n".join(months_list))
with col2:
st.subheader("Emplacement des fichiers")
st.info(f"Les rapports PDF sont stockés dans le dossier: `{os.path.abspath(REPORTS_DIR)}`")
st.info(f"La base de données se trouve à: `{os.path.abspath(DB_PATH)}`")
# Calcul de la taille totale des PDF
total_size = 0
for root, dirs, files in os.walk(REPORTS_DIR):
for file in files:
if file.endswith('.pdf'):
file_path = os.path.join(root, file)
if os.path.exists(file_path):
total_size += os.path.getsize(file_path)
total_size_mb = total_size / (1024 * 1024)
st.info(f"Espace disque utilisé par les PDF: {total_size_mb:.2f} MB")
with st.expander("Afficher les détails complets des rapports stockés", expanded=False):
st.dataframe(stored_reports)
else:
st.info("Aucun rapport n'est encore stocké dans la base de données.")
elif page == "Explorer les données":
st.header("Explorer les données de fraude alimentaire")
stored_reports = get_stored_reports()
if not stored_reports.empty:
report_options = [f"{row['month']} {row['year']}" for _, row in stored_reports.iterrows()]
selected_report = st.selectbox("Sélectionnez un rapport à explorer", options=report_options)
if selected_report:
month, year = selected_report.split()
report_id = stored_reports[(stored_reports['month'] == month) & (stored_reports['year'] == int(year))]['id'].iloc[0]
fraud_cases = get_fraud_cases_for_report(report_id)
if not fraud_cases.empty:
st.write(f"**{len(fraud_cases)} cas de fraude trouvés dans le rapport {selected_report}**")
st.dataframe(fraud_cases)
st.subheader("Répartition par pays")
country_counts = fraud_cases['country'].value_counts()
st.bar_chart(country_counts)
st.subheader("Répartition par type de produit")
product_counts = fraud_cases['product_type'].value_counts()
st.bar_chart(product_counts)
st.subheader("Répartition par type de fraude")
fraud_counts = fraud_cases['fraud_type'].value_counts()
st.bar_chart(fraud_counts)
else:
st.info(f"Aucun cas de fraude trouvé dans le rapport {selected_report}.")
else:
st.info("Aucun rapport n'est encore stocké dans la base de données.")
elif page == "Recherche avancée":
st.header("Recherche avancée de cas de fraude")
conn = sqlite3.connect(DB_PATH)
countries = pd.read_sql_query("SELECT DISTINCT country FROM fraud_cases WHERE country != ''", conn)['country'].tolist()
product_types = pd.read_sql_query("SELECT DISTINCT product_type FROM fraud_cases WHERE product_type != ''", conn)['product_type'].tolist()
fraud_types = pd.read_sql_query("SELECT DISTINCT fraud_type FROM fraud_cases WHERE fraud_type != ''", conn)['fraud_type'].tolist()
conn.close()
col1, col2 = st.columns(2)
with col1:
search_term = st.text_input("Recherche par mot-clé")
selected_country = st.selectbox("Filtrer par pays", options=[""] + countries)
with col2:
selected_product = st.selectbox("Filtrer par type de produit", options=[""] + product_types)
selected_fraud = st.selectbox("Filtrer par type de fraude", options=[""] + fraud_types)
if st.button("Rechercher"):
results = search_fraud_cases(
search_term=search_term if search_term else None,
country=selected_country if selected_country else None,
product_type=selected_product if selected_product else None,
fraud_type=selected_fraud if selected_fraud else None
)
if not results.empty:
st.success(f"{len(results)} cas de fraude trouvés.")
st.dataframe(results)
# Exporter les résultats
csv_data = results.to_csv(index=False).encode('utf-8')
st.download_button(
label="Télécharger les résultats (CSV)",
data=csv_data,
file_name="resultats_recherche_fraude.csv",
mime="text/csv"
)
else:
st.info("Aucun résultat trouvé pour ces critères de recherche.")
elif page == "Statistiques":
st.header("Statistiques sur les fraudes alimentaires")
col1, col2 = st.columns(2)
with col1:
start_year = st.number_input("Année de début", min_value=2000, max_value=datetime.now().year, value=2020)
with col2:
end_year = st.number_input("Année de fin", min_value=2000, max_value=datetime.now().year, value=datetime.now().year)
if st.button("Générer les statistiques"):
conn = sqlite3.connect(DB_PATH)
query = """
SELECT fc.*
FROM fraud_cases fc
JOIN reports r ON fc.report_id = r.id
WHERE r.year >= ? AND r.year <= ?
"""
filtered_cases = pd.read_sql_query(query, conn, params=(start_year, end_year))
conn.close()
if not filtered_cases.empty:
st.success(f"{len(filtered_cases)} cas de fraude trouvés pour la période {start_year}-{end_year}.")
st.subheader("Évolution du nombre de cas par année")
yearly_counts = pd.read_sql_query("""
SELECT r.year, COUNT(*) as count
FROM fraud_cases fc
JOIN reports r ON fc.report_id = r.id
WHERE r.year >= ? AND r.year <= ?
GROUP BY r.year
ORDER BY r.year
""", sqlite3.connect(DB_PATH), params=(start_year, end_year))
st.line_chart(yearly_counts.set_index('year'))
# Top 10 des pays avec le plus de cas
st.subheader("Top 10 des pays avec le plus de cas")
top_countries = filtered_cases['country'].value_counts().head(10)
st.bar_chart(top_countries)
# Top 10 des types de produits concernés
st.subheader("Top 10 des types de produits concernés")
top_products = filtered_cases['product_type'].value_counts().head(10)
st.bar_chart(top_products)
# Types de fraude les plus courants
st.subheader("Types de fraude les plus courants")
top_frauds = filtered_cases['fraud_type'].value_counts().head(5)
st.bar_chart(top_frauds)
# Matrice de corrélation pays/produit
st.subheader("Matrice de corrélation pays/produit")
cross_tab = pd.crosstab(filtered_cases['country'], filtered_cases['product_type'])
# Limiter aux 10 pays et 10 produits les plus fréquents pour la lisibilité
top_countries = filtered_cases['country'].value_counts().head(10).index
top_products = filtered_cases['product_type'].value_counts().head(10).index
cross_tab_filtered = cross_tab.loc[top_countries, top_products]
st.dataframe(cross_tab_filtered)
else:
st.info(f"Aucun cas de fraude trouvé pour la période {start_year}-{end_year}.")
# Point d'entrée du programme
if __name__ == "__main__":
main()