Spaces:
Sleeping
Sleeping
| import os | |
| from typing import List, Dict | |
| import duckdb | |
| import pandas as pd | |
| from huggingface_hub import hf_hub_download | |
| # Initialisations | |
| REPO_ID = "Loren/articles_database" | |
| cache_dir = "/tmp" | |
| os.makedirs(cache_dir, exist_ok=True) | |
| # Rediriger le cache HF globalement | |
| os.environ["HF_HOME"] = cache_dir | |
| os.environ["HF_DATASETS_CACHE"] = cache_dir | |
| os.environ["TRANSFORMERS_CACHE"] = cache_dir | |
| # Téléchargement des fichiers Parquet depuis Hugging Face | |
| articles_parquet = hf_hub_download( | |
| repo_id=REPO_ID, | |
| filename="articles.parquet", | |
| repo_type="dataset", | |
| cache_dir=cache_dir) | |
| tags_parquet = hf_hub_download( | |
| repo_id=REPO_ID, | |
| filename="tags.parquet", | |
| repo_type="dataset", | |
| cache_dir=cache_dir) | |
| tag_article_parquet = hf_hub_download( | |
| repo_id=REPO_ID, | |
| filename="tag_article.parquet", | |
| repo_type="dataset", | |
| cache_dir=cache_dir) | |
| # Connexion DuckDB en mémoire | |
| con = duckdb.connect() | |
| # Créer des tables DuckDB directement à partir des fichiers Parquet | |
| con.execute(f"CREATE VIEW articles AS SELECT * FROM parquet_scan('{articles_parquet}')") | |
| con.execute(f"CREATE VIEW tags AS SELECT * FROM parquet_scan('{tags_parquet}')") | |
| con.execute(f"CREATE VIEW tag_article AS SELECT * FROM parquet_scan('{tag_article_parquet}')") | |
| # Fonctions d'accès aux données | |
| def fetch_tags() -> List[str]: | |
| """ | |
| Récupère la liste de tous les tags disponibles dans la base de données. | |
| Returns: | |
| Dict: Un dictionnaire contenant le statut et les résultats. | |
| - Si succès : | |
| { | |
| "status": "ok", | |
| "result": List[str] # Liste des noms de tags triés par ordre alphabétique | |
| } | |
| - En cas d'erreur : | |
| { | |
| "status": "error", | |
| "code": str, # Nom de l'exception | |
| "message": str # Message de l'exception | |
| } | |
| """ | |
| try: | |
| query = "SELECT tag_name FROM tags ORDER BY tag_name" | |
| result = con.execute(query).fetchall() | |
| return {"status": "ok", "result": [row[0] for row in result]} | |
| except Exception as e: | |
| return {"status": "error", "code": type(e).__name__, "message": str(e)} | |
| def fetch_articles_by_tags(tags: List[str]) -> List[Dict]: | |
| """ | |
| Récupère les articles associés à un ou plusieurs tags. | |
| Args: | |
| tags (List[str]): Une liste de noms de tags pour filtrer les articles. | |
| Returns: | |
| Dict: Un dictionnaire contenant le statut et les résultats. | |
| - Si succès : | |
| { | |
| "status": "ok", | |
| "result": List[Dict] # Liste de dictionnaires représentant les articles | |
| } | |
| Chaque dictionnaire contient les clés : | |
| - 'article_id': int, ID de l'article | |
| - 'article_title': str, Titre de l'article | |
| - 'article_url': str, URL de l'article | |
| - En cas d'erreur ou si aucun tag fourni : | |
| { | |
| "status": "error", | |
| "code": str, # Code d'erreur ou nom de l'exception | |
| "message": str # Message d'erreur | |
| } | |
| Notes: | |
| - Si la liste `tags` est vide, la fonction retourne une liste vide. | |
| - Les résultats incluent uniquement les articles correspondant à au moins un des tags fournis. | |
| """ | |
| if not tags: | |
| return {"status": "error", "code": "no_tags", "message": "Aucun tag fourni."} | |
| try: | |
| placeholders = ",".join(["?"] * len(tags)) | |
| query = f"""SELECT distinct a.article_id, a.article_title, a.article_url | |
| FROM tags t, tag_article ta, articles a | |
| WHERE t.tag_id = ta.tag_id | |
| AND ta.article_id = a.article_id | |
| AND t.tag_name IN ({placeholders}) | |
| """ | |
| result = con.execute(query, tags).fetchdf() | |
| return {"status": "ok", "result": result.to_dict(orient="records")} | |
| except Exception as e: | |
| return {"status": "error", "code": type(e).__name__, "message": str(e)} |