|
|
|
|
|
|
|
|
import json
import os
import shutil
import zipfile
from datetime import datetime
from pathlib import Path

import fitz
import torch
from huggingface_hub import hf_hub_download, HfApi
from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
|
|
|
|
|
|
|
|
# Instruction-tuned model used for structured extraction.
MODEL_ID = "mistralai/Mistral-7B-Instruct-v0.3"


# Mistral [INST]...[/INST] prompt; {texte_pdf} is replaced with the bulletin
# text. The (French) instructions demand a single valid JSON object and
# nothing else, so the output can be sliced between the first '{' and the
# last '}' downstream.
PROMPT_TEMPLATE = """


[INST]Tu es un expert en analyse de données financières de la BRVM. Extrais les informations du texte suivant et retourne-les sous la forme d'un objet JSON unique et valide. Ta réponse doit commencer par `{{` et se terminer par `}}`. N'inclus aucun texte, explication ou formatage en dehors de l'objet JSON. 




**Texte du bulletin à analyser :** 


{texte_pdf}[/INST] 


"""


# Upper bound on generated tokens per document (large, to fit full bulletins).
MAX_NEW_TOKENS = 8192
|
|
|
|
|
|
|
|
def initialize_model_pipeline():
    """Load the quantized extraction model and wrap it in a generation pipeline.

    Returns:
        A ``transformers`` text-generation pipeline on success, or ``None``
        when loading fails (weights download error, missing CUDA/bitsandbytes
        support, etc.). The error is printed, not raised.
    """
    try:
        # 4-bit NF4 quantization keeps the 7B model within a single-GPU budget.
        quant_cfg = BitsAndBytesConfig(
            load_in_4bit=True,
            bnb_4bit_quant_type="nf4",
            bnb_4bit_compute_dtype=torch.bfloat16,
            bnb_4bit_use_double_quant=True,
        )

        tok = AutoTokenizer.from_pretrained(MODEL_ID)
        lm = AutoModelForCausalLM.from_pretrained(
            MODEL_ID,
            quantization_config=quant_cfg,
            device_map="auto",
            torch_dtype=torch.bfloat16,
            trust_remote_code=True,
        )

        # The tokenizer ships without a pad token; reuse EOS so generation
        # with padding does not fail.
        if tok.pad_token is None:
            tok.pad_token = tok.eos_token

        return pipeline(
            "text-generation",
            model=lm,
            tokenizer=tok,
        )
    except Exception as e:
        print(f"Erreur chargement modèle : {e}")
        return None
|
|
|
|
|
|
|
|
def download_and_extract_pdfs(repo_id, zip_filename, target_folder, cache_folder):
    """Download a ZIP from a Hugging Face dataset repo and extract its PDFs.

    Args:
        repo_id: Dataset repository id on the Hugging Face Hub.
        zip_filename: Name of the ZIP file inside the repository.
        target_folder: ``Path`` directory receiving the PDFs (created if
            missing). Archive sub-directories are flattened to base names.
        cache_folder: Local cache directory handed to ``hf_hub_download``.

    Returns:
        List of ``Path`` objects for the extracted PDFs; ``[]`` on any error
        (the error is printed, not raised).
    """
    try:
        local_zip_path = hf_hub_download(
            repo_id=repo_id,
            filename=zip_filename,
            repo_type="dataset",
            cache_dir=cache_folder
        )

        target_folder.mkdir(parents=True, exist_ok=True)
        extracted_files = []

        with zipfile.ZipFile(local_zip_path, 'r') as z:
            for member in z.infolist():
                # Skip directories and defend against zip-slip style entries.
                if member.is_dir() or member.filename.startswith('/') or '..' in member.filename:
                    continue
                if member.filename.lower().endswith('.pdf'):
                    # Flattening keeps only the base name. NOTE(review): two
                    # entries with the same base name in different folders
                    # would overwrite each other — confirm archives never
                    # contain such collisions.
                    target_path = target_folder / Path(member.filename).name
                    with z.open(member) as source, open(target_path, "wb") as target:
                        # BUG FIX: was source.read(), which loaded each whole
                        # PDF into memory; stream it in chunks instead.
                        shutil.copyfileobj(source, target)
                    extracted_files.append(target_path)

        return extracted_files
    except Exception as e:
        print(f"Erreur extraction PDF : {e}")
        return []
|
|
|
|
|
def extract_text_from_pdf(pdf_path):
    """Return the full text of *pdf_path*, pages joined by newlines.

    Any read/parse failure is printed and an empty string is returned
    instead of raising.
    """
    try:
        with fitz.open(pdf_path) as doc:
            pages = [page.get_text() for page in doc]
        return "\n".join(pages)
    except Exception as e:
        print(f"Erreur lecture PDF : {e}")
        return ""
|
|
|
|
|
|
|
|
def parse_json_from_model_output(raw_output):
    """Extract and parse the JSON object embedded in a raw model completion.

    Everything after the last ``[/INST]`` tag is considered the answer; the
    substring from the first ``{`` to the last ``}`` is parsed as JSON.

    Returns:
        The parsed ``dict`` on success, otherwise a diagnostic dict with
        ``error``, ``details`` and ``raw_output`` keys (never raises).
    """
    try:
        # rpartition keeps only the text after the last [/INST]; when the tag
        # is absent the whole string lands in `tail`.
        _, _, tail = raw_output.rpartition("[/INST]")
        answer = tail.strip()
        opening = answer.find('{')
        closing = answer.rfind('}')
        if opening == -1 or closing <= opening:
            raise ValueError("Accolades JSON non trouvées.")
        return json.loads(answer[opening:closing + 1])
    except Exception as e:
        return {"error": "ParsingFailed", "details": str(e), "raw_output": raw_output}
|
|
|
|
|
def process_single_pdf(pdf_path, pipeline):
    """Run the extraction prompt on one PDF and return the parsed result.

    Args:
        pdf_path: ``Path`` of the PDF to process.
        pipeline: Text-generation pipeline from ``initialize_model_pipeline``.
            (The parameter intentionally keeps its original name — it shadows
            the module-level ``transformers.pipeline`` factory, but renaming
            it would break keyword callers.)

    Returns:
        The parsed data dict with a ``source_file`` key added, or an error
        descriptor dict (``error`` + ``source_file``) on failure.
    """
    text = extract_text_from_pdf(pdf_path)
    if not text.strip():
        return {"error": "PDF vide", "source_file": pdf_path.name}

    # Truncate the text to keep the prompt within the model's context window.
    prompt = PROMPT_TEMPLATE.format(texte_pdf=text[:30000])
    try:
        response = pipeline(
            prompt,
            max_new_tokens=MAX_NEW_TOKENS,
            # BUG FIX: temperature=0.2 was passed alongside do_sample=False;
            # greedy decoding ignores temperature and recent transformers
            # versions warn/reject the combination, so the dead flag is gone.
            do_sample=False,
            return_full_text=False,
            pad_token_id=pipeline.tokenizer.eos_token_id
        )
        raw_output = response[0]['generated_text']
        # return_full_text=False strips the prompt, so re-wrap the output with
        # the prompt and an [/INST] marker before handing it to the parser.
        data = parse_json_from_model_output(f"[INST]{prompt}[/INST]{raw_output}")
        data['source_file'] = pdf_path.name
        return data
    except Exception as e:
        return {"error": "PipelineError", "details": str(e), "source_file": pdf_path.name}
|
|
|
|
|
|
|
|
def upload_results_to_hf_single(result, repo_id, hf_token):
    """Upload one extraction result as ``<source_file>.json`` to a dataset repo.

    Args:
        result: Result dict; must contain a ``source_file`` key.
        repo_id: Target dataset repository on the Hugging Face Hub.
        hf_token: Hub API token; when falsy the upload is skipped with a
            message.

    Returns:
        None. Errors are printed, not raised.
    """
    if not hf_token:
        print("HF_TOKEN manquant.")
        return
    temp_path = Path("temp_result.json")
    try:
        api = HfApi(token=hf_token)
        with open(temp_path, "w", encoding="utf-8") as f:
            json.dump(result, f, ensure_ascii=False, indent=2)

        api.upload_file(
            path_or_fileobj=str(temp_path),
            repo_id=repo_id,
            repo_type="dataset",
            path_in_repo=f"{result['source_file']}.json",
            commit_message=f"Upload {result['source_file']}"
        )
    except Exception as e:
        print(f"Erreur upload : {e}")
    finally:
        # BUG FIX: unlink() previously ran only on success, leaking the temp
        # file whenever upload_file raised; always clean up.
        if temp_path.exists():
            temp_path.unlink()
|
|
|