Spaces:
Sleeping
Sleeping
| import re | |
| from bs4 import BeautifulSoup | |
| import requests | |
| import json | |
| import io | |
| import fitz | |
| from pptx import Presentation | |
| from io import BytesIO | |
| import chardet | |
| from docx import Document | |
| import pandas as pd | |
| from sumarize import summarize | |
| from io import BytesIO | |
| from pdfminer.pdfinterp import PDFResourceManager, PDFPageInterpreter | |
| from pdfminer.converter import TextConverter | |
| from io import StringIO | |
| from pdfminer.layout import LAParams | |
| from pdfminer.pdfpage import PDFPage | |
def trim_input_words(input_str, max_new_tokens=512, max_total_tokens=32768):
    """Trim *input_str* (split on whitespace) to fit a model context window.

    Keeps at most ``max_total_tokens - max_new_tokens - 100`` words, reserving
    ``max_new_tokens`` for generation plus a 100-word safety margin.

    Args:
        input_str: Raw text to trim.
        max_new_tokens: Budget reserved for the model's generated output.
        max_total_tokens: Total context budget (input + output).

    Returns:
        The (possibly shortened) text, re-joined with single spaces.
    """
    words = input_str.split()
    # Bug fix: the original checked ``len(words) > limit - 100`` but sliced to
    # ``limit``, so the 100-word margin was tested yet never applied. Apply the
    # margin consistently in both the check and the slice.
    max_input_words = max_total_tokens - max_new_tokens - 100
    if len(words) > max_input_words:
        words = words[:max_input_words]
    return ' '.join(words)
def select_words_until_char_limit(s, char_limit):
    """Greedily keep leading words of *s* within *char_limit* characters.

    Punctuation is stripped first (whitespace preserved), then whole words are
    accumulated — each costing its length plus one joining space — until the
    next word would exceed the limit. The result is passed through
    ``trim_input_words`` as a final token-budget safeguard.
    """
    # Drop punctuation but keep whitespace so word boundaries survive.
    cleaned = re.sub(r'[^\w\s]', '', s)
    kept = []
    used = 0
    for token in cleaned.split():
        cost = len(token) + 1  # +1 for the space added by the join below
        if used + cost > char_limit:
            break
        kept.append(token)
        used += cost
    return trim_input_words(' '.join(kept))
def downl(url):
    """Scrape *url* and return the href of the last <li> anchor inside the
    last ``ul.dropdown-menu`` element.

    Returns "" on any failure (non-200 status, network error, missing
    elements) — callers treat an empty string as "no link found".
    """
    try:
        # Timeout added: requests.get with no timeout can block forever on a
        # stalled server.
        rq = requests.get(url, timeout=30)
        if rq.status_code != 200:
            return ""
        bs = BeautifulSoup(rq.text, features='lxml')
        lis = bs.find_all('ul', class_='dropdown-menu')[-1].find_all('li')
        link = lis[-1].find('a').get('href')
        print(link)
        return link
    except Exception:
        # Best-effort scraper: any failure degrades to "no link".
        return ""
def pdf(url):
    """Download the PDF at *url* and return its extracted text, capped at
    roughly 30000 characters via ``select_words_until_char_limit``.

    The pdfminer converter and the in-memory text buffer are now released in a
    ``finally`` block — the original leaked both if extraction raised.
    """
    # Timeout added: an unresponsive server must not hang the caller.
    response = requests.get(url, timeout=60)
    # Wrap the downloaded bytes so pdfminer can treat them as a file.
    pdf_file = BytesIO(response.content)
    resource_manager = PDFResourceManager()
    fake_file_handle = StringIO()
    converter = TextConverter(resource_manager, fake_file_handle, laparams=LAParams())
    page_interpreter = PDFPageInterpreter(resource_manager, converter)
    try:
        for page in PDFPage.get_pages(pdf_file):
            page_interpreter.process_page(page)
        text = fake_file_handle.getvalue()
        return select_words_until_char_limit(text, 30000)
    finally:
        # Always release pdfminer resources, even when a page fails to parse.
        converter.close()
        fake_file_handle.close()
def excel(link: str) -> str:
    """Download an Excel workbook and return up to 50 sampled rows rendered as
    a JSON-like string, trimmed to ~32000 characters.

    Returns an error-message string (never raises) on download/parse failure.
    """
    try:
        # Timeout added so a stalled download cannot hang the caller.
        response = requests.get(link, timeout=60)
        if response.status_code != 200:
            # Early return flattens the original nested success branch; error
            # message typo ("No dat avaible") fixed.
            print("Failed to download file")
            return "No data available error"
        df = pd.read_excel(BytesIO(response.content))
        # Cap at 50 rows; fixed seed keeps the sample reproducible.
        sample_df = df.sample(n=50, random_state=42) if df.shape[0] > 50 else df
        js = json.loads(sample_df.to_json(orient='records'))
        return select_words_until_char_limit(f"{js}", 32000)
    except Exception as e:
        print(e)
        return "No data available"
def csv(link: str) -> str:
    """Download a semicolon-separated CSV and return up to 50 sampled rows
    rendered as a JSON-like string, trimmed to ~32000 characters.

    Always returns a string: the original implicitly returned ``None`` when
    the HTTP status was not 200, breaking the declared ``-> str`` contract.
    """
    try:
        # Timeout added so a stalled download cannot hang the caller.
        response = requests.get(link, timeout=60)
        if response.status_code != 200:
            # Bug fix: the original fell through here and returned None.
            return 'No data available'
        file_content = response.content
        # Encoding is sniffed because sources vary; chardet picks the best guess.
        detected_encoding = chardet.detect(file_content)['encoding']
        # NOTE(review): separator is hard-coded to ';' — assumes European-style
        # CSV exports; confirm against the data sources actually used.
        df = pd.read_csv(io.BytesIO(file_content), encoding=detected_encoding, sep=';')
        if df.empty:
            print("The DataFrame is empty.")
            return 'The data frame is empty'
        # Cap at 50 rows; fixed seed keeps the sample reproducible.
        sample_df = df.sample(n=50, random_state=42) if df.shape[0] > 50 else df
        js = json.loads(sample_df.to_json(orient='records'))
        return select_words_until_char_limit(f"{js}", 32000)
    except Exception:
        return 'No data available'
def docx(url: str) -> str:
    """Download a .docx document from *url* and return its paragraph text,
    newline-joined and trimmed to ~32000 characters.

    Returns an error-message string (never raises) on download/parse failure.
    """
    try:
        # Timeout added so a stalled download cannot hang the caller.
        response = requests.get(url, timeout=60)
        response.raise_for_status()  # surface bad HTTP statuses as exceptions
        doc = Document(io.BytesIO(response.content))
        # Generator join replaces the original append loop.
        full_text = "\n".join(para.text for para in doc.paragraphs)
        return select_words_until_char_limit(full_text, 32000)
    except Exception as e:
        print(f"An error occurred: {e}")
        return 'No data available'
def pptx(url: str) -> str:
    """Download a .pptx presentation from *url* and return the text of every
    shape on every slide, newline-joined and trimmed to ~32000 characters.

    Returns an error-message string (never raises) on download/parse failure.
    """
    try:
        # Timeout added so a stalled download cannot hang the caller.
        response = requests.get(url, timeout=60)
        response.raise_for_status()  # surface bad HTTP statuses as exceptions
        presentation = Presentation(io.BytesIO(response.content))
        # Not every shape carries text (pictures, charts) — guard with hasattr.
        full_text = "\n".join(
            shape.text
            for slide in presentation.slides
            for shape in slide.shapes
            if hasattr(shape, "text")
        )
        return select_words_until_char_limit(full_text, 32000)
    except Exception as e:
        print(f"An error occurred: {e}")
        return 'No data available'
def get_data(url):
    """Resolve a page *url* to its downloadable file, extract the file's
    content by extension, and return a summary produced by ``summarize``.

    Returns a plain message string for unsupported or unresolvable inputs.
    """
    # Strip agent-scratchpad artifacts from the URL. Bug fix: the quoted
    # variant must be removed FIRST — the original removed '\nObservation'
    # first, so '"\nObservation' could never match and a stray '"' survived.
    ki = url.replace('"\nObservation', '').replace('\nObservation', '')
    link = downl(ki)
    ext = link.split(".")[-1].lower()
    # Dispatch table replaces the original if/elif chain.
    handlers = {
        'xlsx': excel,
        'xls': excel,
        'xlsm': excel,
        'pdf': pdf,
        'docx': docx,
        'csv': csv,
        'pptx': pptx,
        'ppt': pptx,
    }
    handler = handlers.get(ext)
    if handler is not None:
        rs = handler(link)
        return summarize.invoke({"input": rs})
    if ext == 'doc':
        return "L'extension .doc non supportée."
    return "No data returned"