Spaces:
Sleeping
Sleeping
File size: 4,253 Bytes
70ca2a3 585d753 70ca2a3 01a35a9 70ca2a3 01a35a9 70ca2a3 01a35a9 70ca2a3 01a35a9 70ca2a3 01a35a9 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
import os
from typing import List, Dict
import duckdb
import pandas as pd
from huggingface_hub import hf_hub_download
# Initialisations
REPO_ID = "Loren/articles_database"
cache_dir = "/tmp"
os.makedirs(cache_dir, exist_ok=True)
# Rediriger le cache HF globalement
os.environ["HF_HOME"] = cache_dir
os.environ["HF_DATASETS_CACHE"] = cache_dir
os.environ["TRANSFORMERS_CACHE"] = cache_dir
# Téléchargement des fichiers Parquet depuis Hugging Face
articles_parquet = hf_hub_download(
repo_id=REPO_ID,
filename="articles.parquet",
repo_type="dataset",
cache_dir=cache_dir)
tags_parquet = hf_hub_download(
repo_id=REPO_ID,
filename="tags.parquet",
repo_type="dataset",
cache_dir=cache_dir)
tag_article_parquet = hf_hub_download(
repo_id=REPO_ID,
filename="tag_article.parquet",
repo_type="dataset",
cache_dir=cache_dir)
# Connexion DuckDB en mémoire
con = duckdb.connect()
# Créer des tables DuckDB directement à partir des fichiers Parquet
con.execute(f"CREATE VIEW articles AS SELECT * FROM parquet_scan('{articles_parquet}')")
con.execute(f"CREATE VIEW tags AS SELECT * FROM parquet_scan('{tags_parquet}')")
con.execute(f"CREATE VIEW tag_article AS SELECT * FROM parquet_scan('{tag_article_parquet}')")
# Fonctions d'accès aux données
def fetch_tags() -> List[str]:
"""
Récupère la liste de tous les tags disponibles dans la base de données.
Returns:
Dict: Un dictionnaire contenant le statut et les résultats.
- Si succès :
{
"status": "ok",
"result": List[str] # Liste des noms de tags triés par ordre alphabétique
}
- En cas d'erreur :
{
"status": "error",
"code": str, # Nom de l'exception
"message": str # Message de l'exception
}
"""
try:
query = "SELECT tag_name FROM tags ORDER BY tag_name"
result = con.execute(query).fetchall()
return {"status": "ok", "result": [row[0] for row in result]}
except Exception as e:
return {"status": "error", "code": type(e).__name__, "message": str(e)}
def fetch_articles_by_tags(tags: List[str]) -> List[Dict]:
"""
Récupère les articles associés à un ou plusieurs tags.
Args:
tags (List[str]): Une liste de noms de tags pour filtrer les articles.
Returns:
Dict: Un dictionnaire contenant le statut et les résultats.
- Si succès :
{
"status": "ok",
"result": List[Dict] # Liste de dictionnaires représentant les articles
}
Chaque dictionnaire contient les clés :
- 'article_id': int, ID de l'article
- 'article_title': str, Titre de l'article
- 'article_url': str, URL de l'article
- En cas d'erreur ou si aucun tag fourni :
{
"status": "error",
"code": str, # Code d'erreur ou nom de l'exception
"message": str # Message d'erreur
}
Notes:
- Si la liste `tags` est vide, la fonction retourne une liste vide.
- Les résultats incluent uniquement les articles correspondant à au moins un des tags fournis.
"""
if not tags:
return {"status": "error", "code": "no_tags", "message": "Aucun tag fourni."}
try:
placeholders = ",".join(["?"] * len(tags))
query = f"""SELECT distinct a.article_id, a.article_title, a.article_url
FROM tags t, tag_article ta, articles a
WHERE t.tag_id = ta.tag_id
AND ta.article_id = a.article_id
AND t.tag_name IN ({placeholders})
"""
result = con.execute(query, tags).fetchdf()
return {"status": "ok", "result": result.to_dict(orient="records")}
except Exception as e:
return {"status": "error", "code": type(e).__name__, "message": str(e)} |