Spaces:
Sleeping
Sleeping
| # UPLOAD FUNCTIONS.PY | |
| import requests | |
| import gradio as gr | |
| import pandas as pd | |
| import tiktoken | |
| import tempfile | |
| from PyPDF2 import PdfReader | |
| from tqdm import tqdm | |
| from pydantic import BaseModel, Field | |
| from phi.agent import Agent, RunResponse | |
| from phi.model.groq import Groq | |
| from sentence_transformers import SentenceTransformer | |
| from sentence_transformers import CrossEncoder | |
| #from gradio_client import Client, handle_file | |
| import os | |
| from pptx import Presentation | |
| from pptx2img import PPTXConverter # For splitting slides | |
| import uuid | |
| import shutil | |
| from PIL import Image | |
| import pandas as pd | |
| import requests | |
| import gradio as gr | |
| from pydantic import BaseModel, Field | |
| from typing import List | |
| import tiktoken | |
| from datetime import datetime | |
| import zipfile | |
| from PIL import Image | |
| import gradio as gr | |
| import threading | |
| import time | |
| import requests | |
def get_access_token():
    """Acquire an access token through a device-code sign-in flow.

    Prints the verification URL and the user code returned by
    ``app.initiate_device_flow`` and then calls
    ``app.acquire_token_by_device_flow`` to obtain the token result.

    Relies on the module-level ``app`` and ``SCOPES`` objects, which are
    defined outside this block (``app`` is presumably an MSAL public client
    application -- TODO confirm).

    Returns:
        The ``access_token`` value from the token result.

    Raises:
        SystemExit: with exit status 1 when the result holds no access token.
    """
    flow = app.initiate_device_flow(scopes=SCOPES)
    print("Go to", flow["verification_uri"])
    print("Enter the code:", flow["user_code"])
    result = app.acquire_token_by_device_flow(flow)
    if "access_token" not in result:
        print("❌ Could not acquire token:", result.get("error_description"))
        # ``exit()`` is a helper injected by the ``site`` module for interactive
        # sessions and would report status 0; raise SystemExit directly so the
        # failure is signalled with a non-zero exit status.
        raise SystemExit(1)
    return result["access_token"]
# Build a short random identifier for an uploaded presentation
def generate_unique_ppt_id():
    """Return the first eight hexadecimal characters of a fresh UUID4."""
    random_uuid = uuid.uuid4()
    # The canonical string form starts with the same eight hex digits.
    return random_uuid.hex[:8]
def truncate_text_to_tokens(text, max_tokens, model_name="cl100k_base"):
    """Cut ``text`` down to its first ``max_tokens`` tokens.

    Args:
        text: The string to shorten.
        max_tokens: Upper bound used to slice the encoded token list.
        model_name: Name passed to ``tiktoken.get_encoding``.

    Returns:
        The decoded string for the retained tokens.
    """
    codec = tiktoken.get_encoding(model_name)
    kept_ids = codec.encode(text)[:max_tokens]
    return codec.decode(kept_ids)
def split_and_convert_ppt(file_path, output_folder_slides, output_folder_images):
    """Split a presentation into single-slide decks, render them, and collect text.

    For every slide in ``file_path`` this saves a one-slide ``.pptx`` into
    ``output_folder_slides``, asks ``PPTXConverter`` to render that file into
    ``output_folder_images``, and gathers the text of the slide's shapes.

    Reads the module-level ``ppt_unique_id`` (set outside this block) to build
    the per-slide file names.

    Args:
        file_path: Path of the source ``.pptx`` file.
        output_folder_slides: Folder receiving the single-slide ``.pptx`` files.
        output_folder_images: Folder receiving the rendered images.

    Returns:
        A list with one stripped text string per slide, in slide order.
    """
    for folder in (output_folder_slides, output_folder_images):
        os.makedirs(folder, exist_ok=True)

    presentation = Presentation(file_path)
    file_name = os.path.basename(file_path).split('.')[0]
    print('File Name ',file_name)
    print('File Path ',file_path)

    slide_texts = []
    for i, slide in enumerate(presentation.slides):
        slide_number = i + 1
        unique_slide_id = f"{file_name}_{ppt_unique_id}_slide_{slide_number}"

        slide_file_path = os.path.join(output_folder_slides, f"{unique_slide_id}.pptx")
        print('Slide_file_path',slide_file_path)
        # Per the original author's note, pptx2img names its output
        # f"{pptx_name}_slide_{idx + 1}.png"; a one-slide deck therefore
        # yields "<unique_slide_id>_slide_1.png".
        image_path = os.path.join(output_folder_images, f"{unique_slide_id}_slide_1.png")
        print('Image file path',image_path)

        # Step 1: reload the deck and drop every slide except slide ``i``.
        # Walking the indexes from the end keeps the remaining ones valid.
        single_deck = Presentation(file_path)
        for idx in reversed(range(len(single_deck.slides))):
            if idx == i:
                continue
            relationship_id = single_deck.slides._sldIdLst[idx].rId
            single_deck.part.drop_rel(relationship_id)
            del single_deck.slides._sldIdLst[idx]
        single_deck.save(slide_file_path)
        del single_deck

        # Step 2: render the single-slide deck to an image.
        PPTXConverter().convert_pptx_to_images(slide_file_path, output_folder_images)
        print(f"Slide {i+1} converted to image: {image_path}")

        # Step 3: OCR is switched off; take the editable text straight from
        # the shapes of the original slide instead.
        pptx_text = "".join(
            shape.text.strip() + "\n"
            for shape in slide.shapes
            if hasattr(shape, "text")
        )
        cleaned_text = pptx_text.strip()
        print(f"🔡 PPTX Text Extractedfrom slide {i + 1}:\n", cleaned_text)
        slide_texts.append(cleaned_text)

    return slide_texts
def generate_metadata_with_retry(full_text, retries=3, max_tokens=5000, decrement=100, model_name="cl100k_base"):
    """Call ``generate_metadata`` with a shrinking token budget until it succeeds.

    The first attempt truncates ``full_text`` to ``max_tokens`` tokens; after
    each failure the budget is lowered by ``decrement``. In total
    ``retries + 1`` attempts are made.

    Args:
        full_text: Text handed (after truncation) to ``generate_metadata``.
        retries: Number of retries allowed after the first attempt.
        max_tokens: Token budget of the first attempt.
        decrement: Amount subtracted from the budget after each failure.
        model_name: Encoding name forwarded to ``truncate_text_to_tokens``.

    Returns:
        Whatever ``generate_metadata`` returns on the first successful
        attempt, or ``None`` when every attempt raised.
    """
    last_attempt = retries + 1
    token_budget = max_tokens
    for attempt in range(1, last_attempt + 1):
        try:
            truncated_text = truncate_text_to_tokens(full_text, token_budget, model_name)
            print(f"🔁 Attempt {attempt}: Generating metadata with ~{count_tokens(truncated_text)} tokens...")
            metadata = generate_metadata(truncated_text)
            print("📝 Metadata generated successfully.")
            return metadata
        except Exception as e:
            print(f"❌ Error on attempt {attempt}: {str(e)}")
            if attempt == last_attempt:
                print("🚨 Max retries reached. Metadata generation failed.")
                return None
            # Not the last attempt: shrink the budget and go round again.
            token_budget -= decrement
            print(f"🔄 Retrying with {token_budget} tokens...")
# Function to generate metadata using phidata agent
def generate_metadata(ocr_text):
    """Ask the metadata agent for structured metadata describing ``ocr_text``.

    Builds an ``Agent`` backed by a Groq model with ``PPTMetadata`` as its
    response model, runs it on the supplied text, and returns the content of
    the response.

    Args:
        ocr_text: Text of the presentation that is embedded in the prompt.

    Returns:
        ``response.content`` of the agent run (expected to be a
        ``PPTMetadata`` instance because of ``response_model`` -- TODO confirm).
    """
    # Adjacent string literals are concatenated by Python. Every rule below is
    # therefore written so that continuation strings end with a space and each
    # rule ends with a comma; previously several rules were fused together
    # without any separator (e.g. "...etc)Base your decision...").
    metadata_agent = Agent(
        name="Metadata Generator",
        role="Generates structured metadata for presentations based on their content.",
        instructions=[
            "Your task is to analyze the provided text and generate structured metadata for the presentation.",
            "Carefully evaluate the content to determine the most appropriate values for each metadata field.",
            # Rule 1: PPT Unique ID
            # NOTE(review): process_presentation stores its own generated id,
            # not this field -- confirm whether this rule is still needed.
            "For the 'PPT_Unique_ID', use the first 8 characters of the MD5 hash of the input text. "
            "This ensures uniqueness across presentations.",
            # Rule 2: Suitable Title
            "For the 'Suitable_Title', create a concise and meaningful title that captures the essence of the presentation. "
            "Focus on first slide where title of presentation is given along with key themes, topics, or keywords mentioned in the text.",
            # Rule 3: Slide Category
            "For the 'Slide_Category', classify the presentation into one of the following categories: "
            "The category or theme of the slides (e.g., Risk management , Data Analytics , Technology etc) "
            "Base your decision on the overall theme or subject matter of the content.",
            # Rule 4: PPT owner
            "Find The owner of the presentation ie who makes the presentation (eg: Done by name and designation ie Mr. baswaraj ,Princpial ADG , Additional Director ,or organisations like NCTC,DG Systems, Directorate of Logistics etc) "
            "Dont Asssume if u could not found ,mention Not Available",
            # Rule 5: Audience/Forum
            "For the 'Audience_Forum', identify the target audience or forum for the presentation. "
            "(e.g.,NACIN , WCO, Presentation before Member (CBIC)etc ). "
            "Dont Asssume if could not found ,mention Not Available. "
            "Consider the tone, language, and purpose of the content.",
            # Rule 6: Short Summary
            "For the 'Short_Summary', provide a brief summary of the presentation's content with all keywords in 10 sentences. "
            "Highlight the keywords ,topics, main points or objectives of the presentation.",
            "Mention the title also in the short summary ,owner and audience of the presentation",
            # General Guidelines
            "Ensure all fields are filled and meaningful. If unsure about a field, make an educated guess based on the context.",
        ],
        model=Groq(id="deepseek-r1-distill-llama-70b"),  # Replace with actual model ID
        response_model=PPTMetadata,
        markdown=True,
        debug_mode=True,
        show_tool_calls=True,
        monitoring=True)
    # Run the agent to generate metadata
    response = metadata_agent.run(
        f"Generate data fields for the following presentation content: {ocr_text}")
    return response.content
# Function to get folder ID in OneDrive
def get_folder_id(folder_path, headers):
    """Resolve (creating when necessary) a OneDrive folder path to its item id.

    Walks ``folder_path`` one segment at a time, starting at the drive root.
    For each segment the children of the current parent are listed and
    searched for a folder with that exact name; when none exists the folder
    is created.

    Args:
        folder_path: Slash-separated folder path relative to the drive root.
            Empty segments (leading, trailing or doubled slashes) are skipped.
        headers: HTTP headers sent with every Graph request.

    Returns:
        The id of the last folder in the path, or ``None`` when a listing or
        creation request fails (the error is printed). ``None`` is also
        returned when the path contains no non-empty segment.
    """
    parent_id = None
    print("creating folder id for ",folder_path)
    for folder_name in folder_path.split("/"):
        if not folder_name:
            # An empty name can never match or be created; ignore it.
            continue

        if parent_id:
            children_url = f"https://graph.microsoft.com/v1.0/me/drive/items/{parent_id}/children"
        else:
            children_url = "https://graph.microsoft.com/v1.0/me/drive/root/children"

        # The children collection is paged. Keep following "@odata.nextLink"
        # until the folder is found or the pages run out; otherwise a folder
        # living on a later page would be re-created under a renamed copy.
        folder_item = None
        page_url = children_url
        while page_url and folder_item is None:
            response = requests.get(page_url, headers=headers)
            if response.status_code != 200:
                print(f"Failed to retrieve folder '{folder_name}'. Error: {response.text}")
                return None
            payload = response.json()
            folder_item = next(
                (
                    item
                    for item in payload.get("value", [])
                    if item["name"] == folder_name and "folder" in item
                ),
                None,
            )
            page_url = payload.get("@odata.nextLink")

        if folder_item is None:
            # Create the folder if it doesn't exist
            create_response = requests.post(children_url, headers=headers, json={
                "name": folder_name,
                "folder": {},
                "@microsoft.graph.conflictBehavior": "rename"
            })
            if create_response.status_code not in [200, 201]:
                print(f"Failed to create folder '{folder_name}'. Error: {create_response.text}")
                return None
            folder_item = create_response.json()

        parent_id = folder_item["id"]
    return parent_id
# Function to upload file to OneDrive
def upload_to_onedrive(file_path, folder_id, headers):
    """Upload a local file into a OneDrive folder with a single PUT request.

    The remote item is addressed relative to ``folder_id`` using the local
    file's base name.

    Args:
        file_path: Path of the local file to upload.
        folder_id: Id of the destination folder item.
        headers: HTTP headers sent with the request.

    Returns:
        The id of the uploaded item, or ``None`` when the request does not
        answer with status 200 or 201 (the error is printed).
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote

    file_name = os.path.basename(file_path)
    # Percent-encode the name: characters such as '#', '?' or '%' would
    # otherwise be read as URL syntax and the upload would target a wrong path.
    encoded_name = quote(file_name)
    upload_url = f"https://graph.microsoft.com/v1.0/me/drive/items/{folder_id}:/{encoded_name}:/content"
    with open(file_path, "rb") as file:
        file_content = file.read()
    response = requests.put(upload_url, headers=headers, data=file_content)
    if response.status_code in [200, 201]:
        print(f"Uploaded {file_name} to OneDrive.")
        return response.json()["id"]
    print(f"Failed to upload {file_name}. Error: {response.text}")
    return None
# Token counting helper built on tiktoken
def count_tokens(text, model_name="cl100k_base"):
    """Return how many tokens ``text`` encodes to under ``model_name``.

    Args:
        text: The string to measure.
        model_name: Name passed to ``tiktoken.get_encoding``.
    """
    return len(tiktoken.get_encoding(model_name).encode(text))
def list_folder_files(folder_id, headers):
    """Return every child item of a OneDrive folder.

    Args:
        folder_id: Id of the folder item whose children are listed.
        headers: HTTP headers sent with every request.

    Returns:
        A list with the item dictionaries of all children.

    Raises:
        ValueError: when any listing request does not answer with status 200.
    """
    url = f"https://graph.microsoft.com/v1.0/me/drive/items/{folder_id}/children"
    items = []
    # The collection is paged: keep requesting "@odata.nextLink" until it is
    # absent, otherwise items beyond the first page are silently missed.
    while url:
        response = requests.get(url, headers=headers)
        if response.status_code != 200:
            raise ValueError(f"Failed to list folder contents. Error: {response.text}")
        payload = response.json()
        items.extend(payload.get("value", []))
        url = payload.get("@odata.nextLink")
    return items
def download_onedrive_file(file_id, filename, headers):
    """Download a OneDrive item to a local file.

    Fetches the item's metadata, reads its ``@microsoft.graph.downloadUrl``
    and writes the content served at that URL to ``filename``.

    Args:
        file_id: Id of the drive item to download.
        filename: Local path the content is written to.
        headers: HTTP headers sent with the metadata request.

    Raises:
        ValueError: when the metadata request fails, the item exposes no
            download URL, or the content request fails. Nothing is written
            to ``filename`` in those cases.
    """
    url = f"https://graph.microsoft.com/v1.0/me/drive/items/{file_id}"
    metadata_response = requests.get(url, headers=headers)
    if metadata_response.status_code != 200:
        raise ValueError(f"Failed to read item '{file_id}'. Error: {metadata_response.text}")

    download_url = metadata_response.json().get("@microsoft.graph.downloadUrl")
    if not download_url:
        raise ValueError(f"Item '{file_id}' has no download URL.")

    # The download URL is requested without the headers, as before.
    response = requests.get(download_url)
    if response.status_code != 200:
        # Never store an error body as if it were the file content.
        raise ValueError(f"Failed to download item '{file_id}'. Error: {response.text}")

    with open(filename, 'wb') as f:
        f.write(response.content)
def update_and_upload_metadata_simplified(metadata_list, metadata_folder_id, metadata_with_fulltext_folder_id, headers):
    """Append new slide rows to both master metadata CSVs and upload them.

    ``Master_metadata.csv`` is stored without the ``PPT_OCR_Text`` column,
    ``Master_fulltext_metadata.csv`` keeps every column. When a CSV already
    exists in its folder it is downloaded and the new rows are appended;
    otherwise the CSV is created from the new rows. The CSV files are written
    to the current working directory under their own names.

    Args:
        metadata_list: Rows to append, one list per slide, ordered like the
            column list below.
        metadata_folder_id: Folder id holding ``Master_metadata.csv``.
        metadata_with_fulltext_folder_id: Folder id holding
            ``Master_fulltext_metadata.csv``.
        headers: HTTP headers passed on to the OneDrive helpers.

    Returns:
        A success message string.

    Raises:
        RuntimeError: when uploading one of the CSV files fails.
    """
    columns = [
        "Unique_Slide_ID", "Slide_OCR_Text", "PPT_OCR_Text", "Slide_Embedding", "Short_Summary_Embedding",
        "PPT_Unique_ID", "Suitable_Title", "Slide_Category", "PPT_Owner", "Audience_Forum", "Short_Summary",
        "Slide_File_Path", "Slide_File_ID", "Full_PPT_File_Path", "Full_PPT_File_ID",
        "Thumbnail_File_Path", "Thumbnail_File_ID", "Upload_date",
    ]
    df_new = pd.DataFrame(metadata_list, columns=columns)

    # (csv name, destination folder, column removed before saving)
    targets = [
        ("Master_metadata.csv", metadata_folder_id, 'PPT_OCR_Text'),
        ("Master_fulltext_metadata.csv", metadata_with_fulltext_folder_id, None),
    ]
    for csv_file, folder_id, drop_column in targets:
        files = list_folder_files(folder_id, headers)
        file_item = next((item for item in files if item['name'] == csv_file), None)
        print('File items', file_item)
        if file_item:
            download_onedrive_file(file_item['id'], csv_file, headers)
            df_existing = pd.read_csv(csv_file)
            df_merged = pd.concat([df_existing, df_new], ignore_index=True)
        else:
            df_merged = df_new
        if drop_column:
            df_merged = df_merged.drop(columns=[drop_column])
        df_merged.to_csv(csv_file, index=False)

        # upload_to_onedrive signals failure by returning None; do not report
        # success when the master file never reached OneDrive.
        if upload_to_onedrive(csv_file, folder_id, headers) is None:
            raise RuntimeError(f"Failed to upload {csv_file} to OneDrive.")
        print(f"✅ Uploaded: {csv_file}")
    return "✅PPT Processing and Metadata update complete!"
# Main processing function
def process_presentation(file):
    """Upload a .pptx to OneDrive, split it into slides and index every slide.

    Steps:
      1. Validate the file extension and generate a presentation id.
      2. Resolve (or create) the OneDrive folders used by the app.
      3. Refuse files whose name already exists in the PPT repository.
      4. Upload the full presentation.
      5. Split it into single-slide decks and images and collect slide text.
      6. Generate presentation-level metadata from the combined text.
      7. Upload each slide and image, embed the text, and build one metadata
         row per slide.
      8. Append the rows to the master metadata CSVs.

    Relies on the module-level ``headers`` and ``embedding_model`` objects,
    which are defined outside this block, and publishes the generated id in
    the module-level ``ppt_unique_id`` read by ``split_and_convert_ppt``.

    Args:
        file: Either a path string or an object whose ``name`` attribute
            holds the path of the uploaded file.

    Returns:
        A status string: the success message of
        ``update_and_upload_metadata_simplified``, a duplicate-file warning,
        or ``"An error occurred: ..."`` when any step raised an exception.
    """
    # split_and_convert_ppt reads the id from this module-level name.
    global ppt_unique_id
    work_dir = None
    try:
        # Step 0: Validate file format
        file_path = file.name if hasattr(file, "name") else file
        file_extension = os.path.splitext(file_path)[-1].lower()
        if file_extension not in ['.pptx']:
            raise ValueError("Unsupported file format. Please upload .pptx")
        # Base file name; must stay identical to the expression used in
        # split_and_convert_ppt because both build the same slide file names.
        file_name = os.path.basename(file_path).split('.')[0]
        print('File Name ',file_name)

        # Step 1: Generate unique PPT ID
        ppt_unique_id = generate_unique_ppt_id()
        upload_date = datetime.now().strftime('%Y-%m-%d')
        print('PPT_unique id',ppt_unique_id)

        # Step 2: Get folder IDs for OneDrive
        gr.Info('Connecting to OneDrive..')
        ppt_repo_folder_id = get_folder_id("Projects Apps/PPT Maker/ppt_repo", headers)
        slides_repo_folder_id = get_folder_id("Projects Apps/PPT Maker/slides_repo", headers)
        slide_image_repo_folder_id = get_folder_id("Projects Apps/PPT Maker/slide_image_repo", headers)
        metadata_folder_id = get_folder_id('Projects Apps/PPT Maker/Metadata_file', headers)
        metadata_with_fulltext_folder_id = get_folder_id('Projects Apps/PPT Maker/Metadata_with_fulltext', headers)
        print('ppt_repo_folder_id',ppt_repo_folder_id)
        print('slides_repo_folder_id',slides_repo_folder_id)
        print('slide_image_repo_folder_id',slide_image_repo_folder_id)
        print('metadata_folder_id',metadata_folder_id)
        required_folder_ids = (
            ppt_repo_folder_id,
            slides_repo_folder_id,
            slide_image_repo_folder_id,
            metadata_folder_id,
            # Used for the full-text CSV later on, so it must be valid too.
            metadata_with_fulltext_folder_id,
        )
        if not all(required_folder_ids):
            raise ValueError("Could not find or create required folders in OneDrive.")

        # Step 3: Check if file already exists in ppt_repo
        existing_files = list_folder_files(ppt_repo_folder_id, headers)
        ppt_file_name = os.path.basename(file_path)
        if any(item['name'] == ppt_file_name for item in existing_files):
            duplicate_message = f"⚠️ A file named '{ppt_file_name}' already exists in the PPT repository. Please rename your file or delete the existing one before re-uploading."
            # gr.Error only has an effect when raised; gr.Warning shows the
            # message while still letting the function return normally.
            gr.Warning(duplicate_message)
            return duplicate_message

        # Step 4: Upload the full PPT file to OneDrive
        full_ppt_file_id = upload_to_onedrive(file_path, ppt_repo_folder_id, headers)
        gr.Info('PPT uploaded to OneDrive..')
        full_ppt_file_path = f"/Projects Apps/PPT Maker/ppt_repo/{ppt_file_name}"

        # Step 5: Split PPT into individual slides and convert to images.
        # A private temporary directory avoids depending on a writable
        # "/temp" at the filesystem root; it is removed in ``finally``.
        gr.Info('Processing the PPT and indexing ..it may take a while ')
        work_dir = tempfile.mkdtemp(prefix="ppt_upload_")
        temp_output_folder_slides = os.path.join(work_dir, "temp_slides")
        temp_output_folder_images = os.path.join(work_dir, "temp_images")
        slide_texts = split_and_convert_ppt(file_path, temp_output_folder_slides, temp_output_folder_images)
        print('PPT splitted and converted successfully')

        # Step 6: Compile the full text and generate presentation metadata
        full_text = "\n".join(slide_texts)
        gr.Info('AI agent processing the data .')
        metadata = generate_metadata_with_retry(full_text, retries=3, max_tokens=5000, decrement=100, model_name="cl100k_base")
        if metadata is None:
            # generate_metadata_with_retry returns None once all attempts
            # failed; stop before any slide is uploaded.
            raise ValueError("Metadata generation failed; the presentation was not indexed.")

        # The summary is the same for every slide, so embed it only once.
        short_summary_embedding = str(embedding_model.encode(metadata.Short_Summary).tolist())

        # Step 7: Process each slide and prepare metadata for storage
        metadata_list = []
        gr.Info('Uploading the individual slides and images into repo ')
        for i, slide_text in enumerate(slide_texts):
            unique_slide_id = f"{file_name}_{ppt_unique_id}_slide_{i + 1}"
            slide_file_path = os.path.join(temp_output_folder_slides, f"{unique_slide_id}.pptx")
            slide_image_path = os.path.join(temp_output_folder_images, f"{unique_slide_id}_slide_1.png")

            # Upload individual slide (.pptx) to slides_repo
            slide_file_id = upload_to_onedrive(slide_file_path, slides_repo_folder_id, headers)
            slide_file_path_onedrive = f"/Projects Apps/PPT Maker/slides_repo/{unique_slide_id}.pptx"
            print(f'Slide{i} uploaded into Onedrive')

            # Upload slide image (.png) to slide_image_repo. The remote name
            # is the local base name, so record exactly that name.
            thumbnail_file_id = upload_to_onedrive(slide_image_path, slide_image_repo_folder_id, headers)
            thumbnail_file_path_onedrive = f"/Projects Apps/PPT Maker/slide_image_repo/{os.path.basename(slide_image_path)}"
            print(f'Image{i} uploaded into Onedrive')

            # Generate embedding for the slide
            slide_embedding = embedding_model.encode(slide_text).tolist()

            # One row per slide, ordered like the columns expected by
            # update_and_upload_metadata_simplified.
            metadata_list.append([
                unique_slide_id,               # Unique Slide ID
                slide_text,                    # Slide OCR Text
                full_text,                     # PPT OCR Text
                str(slide_embedding),          # Slide Embedding
                short_summary_embedding,       # Short Summary Embedding
                ppt_unique_id,                 # PPT Unique ID
                metadata.Suitable_Title,       # Suitable Title
                metadata.Slide_Category,       # Slide Category
                metadata.PPT_Owner,            # PPT Owner
                metadata.Audience_Forum,       # Audience Forum
                metadata.Short_Summary,        # Short Summary
                slide_file_path_onedrive,      # Slide File Path (.pptx)
                slide_file_id,                 # Slide File ID (.pptx)
                full_ppt_file_path,            # Full PPT File Path
                full_ppt_file_id,              # Full PPT File ID
                thumbnail_file_path_onedrive,  # Thumbnail File Path (.png)
                thumbnail_file_id,             # Thumbnail File ID (.png)
                upload_date,                   # Upload date
            ])

        # Step 8: Append the rows to the master CSVs and upload them
        gr.Info('Vectorising the meta data and uploading in Onedrive..')
        return update_and_upload_metadata_simplified(
            metadata_list,
            metadata_folder_id,
            metadata_with_fulltext_folder_id,
            headers
        )
    except Exception as e:
        # Boundary for the UI: report the failure as the returned status text.
        return f"An error occurred: {str(e)}"
    finally:
        # Remove the temporary slides and images on success and on failure.
        if work_dir is not None:
            shutil.rmtree(work_dir, ignore_errors=True)
            print('Temp folders cleared')