Spaces:
Sleeping
Sleeping
| # APP.PY | |
| from msal import PublicClientApplication | |
| import requests | |
| import gradio as gr | |
| import pandas as pd | |
| import tiktoken | |
| import tempfile | |
| from PyPDF2 import PdfReader | |
| from tqdm import tqdm | |
| from pydantic import BaseModel, Field | |
| from phi.agent import Agent, RunResponse | |
| from phi.model.groq import Groq | |
| from sentence_transformers import SentenceTransformer | |
| from sentence_transformers import CrossEncoder | |
| #from gradio_client import Client, handle_file | |
| import os | |
| from pptx import Presentation | |
| from pptx2img import PPTXConverter # For splitting slides | |
| import uuid | |
| import shutil | |
| from PIL import Image | |
| import pandas as pd | |
| import requests | |
| import gradio as gr | |
| from pydantic import BaseModel, Field | |
| from typing import List | |
| import tiktoken | |
| from datetime import datetime | |
| import zipfile | |
| from PIL import Image | |
| import gradio as gr | |
| import threading | |
| import time | |
| # Importing functions from files | |
| # from upload_function import process_presentation,get_folder_id | |
| # from view_ppt import search_ppts | |
| # from stats_dashboard import get_dashboard_stats ,update_dashboard | |
| # from search_slides import search_slides,combine_slides_as_zip | |
# Configure Microsoft authentication and shared module state.
# Secrets are read from the process environment (never hard-coded).
GROQ_API_KEY = os.getenv("GROQ_API_KEY")
CLIENT_ID = os.getenv("CLIENT_ID")
TENANT_ID = os.getenv("TENANT_ID")
ADMIN_USERNAME = os.getenv("ADMIN_USERNAME")
ADMIN_PASSWORD = os.getenv("ADMIN_PASSWORD")
AUTHORITY = f"https://login.microsoftonline.com/{TENANT_ID}"
SCOPES = ["Files.ReadWrite.All", "User.Read"]

# Re-exporting the key is a no-op when it is set; when it is missing,
# assigning None to os.environ raises an opaque TypeError at import time,
# so only write it back when there is a value.
if GROQ_API_KEY is not None:
    os.environ["GROQ_API_KEY"] = GROQ_API_KEY

# Bi-encoder used for slide/summary embeddings.
embedding_model = SentenceTransformer('nomic-ai/nomic-embed-text-v1', trust_remote_code=True)
from sentence_transformers import CrossEncoder
# Cross-encoder used for reranking search hits.
cross_encoder = CrossEncoder('cross-encoder/ms-marco-MiniLM-L-6-v2')

access_token_state = {"token": None}
flow_state = {"flow": None}

# NOTE: `global` statements at module level have no effect, so the former
# `global headers / df / search_results` lines were dropped; functions that
# rebind these names declare them `global` themselves.
from config import temp_file_path  # Import the global variable

# Shared Graph request headers; "Authorization" starts out as None here.
headers = {
    "Authorization": None,
    "Content-Type": "application/json"
}

# Local cache directory for downloaded files
LOCAL_CACHE_DIR = "local_cache"
os.makedirs(LOCAL_CACHE_DIR, exist_ok=True)

app = PublicClientApplication(client_id=CLIENT_ID, authority=AUTHORITY)
# Metadata schema that the LLM agent fills in for one presentation.
# Every field is a plain string; the Field descriptions are passed along with
# the model when it is used as the agent's response model.
class PPTMetadata(BaseModel):
    # Identifier of the presentation.
    PPT_Unique_ID: str = Field(
        description="A unique identifier for the presentation (e.g., filename or hash).",
    )
    # Human-readable title.
    Suitable_Title: str = Field(
        description="A concise and meaningful title for the presentation.",
    )
    # Theme / category of the deck.
    Slide_Category: str = Field(
        description="The category or theme of the slides (e.g., Risk management, Data Analytics, Technology etc ).",
    )
    # Who produced the presentation.
    PPT_Owner: str = Field(
        description="The owner of the presentation ie who makes the presentation (eg: NCTC,DG Systems, Directorate of Logistics etc ,Not available if not found )",
    )
    # Who the presentation was made for.
    Audience_Forum: str = Field(
        description="The intended audience or forum for the presentation/to whom the presentaiton is made (e.g., NACIN, WCO, Presentation before Member (CBIC),Not available if not found).",
    )
    # Keyword-rich summary used for search.
    Short_Summary: str = Field(
        description="A brief summary of the presentation's content with all keywords in 10 sentences covering all keywords.",
    )
# Function to download metadata file from OneDrive
def download_metadata_file(metadata_folder_id, headers):
    """Download ``Master_metadata.csv`` from a OneDrive folder to a temp file.

    Args:
        metadata_folder_id: Graph item id of the folder holding the CSV.
        headers: Request headers (including authorization) for Graph calls.

    Returns:
        Path of the temporary ``.csv`` file the content was written to. The
        file is created with ``delete=False``; the caller owns its cleanup.

    Raises:
        ValueError: Listing the folder or downloading the file failed, or the
            item carries no download URL.
        FileNotFoundError: The CSV is not present in the folder.

    NOTE(review): the path is only returned, the module-level
    ``temp_file_path`` is not rebound here - confirm the caller stores it
    where ``get_dashboard_stats`` reads it.
    """
    metadata_file_name = "Master_metadata.csv"
    request_timeout = 60  # seconds; avoids hanging forever on a stalled call

    # Graph pages the children collection, so follow @odata.nextLink until
    # the file is found or the listing is exhausted.
    url = f"https://graph.microsoft.com/v1.0/me/drive/items/{metadata_folder_id}/children"
    file_item = None
    while url and file_item is None:
        response = requests.get(url, headers=headers, timeout=request_timeout)
        if response.status_code != 200:
            raise ValueError(f"Failed to list folder contents. Error: {response.text}")
        page = response.json()
        file_item = next(
            (item for item in page.get("value", []) if item.get("name") == metadata_file_name),
            None,
        )
        url = page.get("@odata.nextLink")

    if not file_item:
        raise FileNotFoundError(f"{metadata_file_name} not found in OneDrive folder.")

    download_url = file_item.get("@microsoft.graph.downloadUrl")
    if not download_url:
        raise ValueError(f"Failed to download {metadata_file_name}. Error: no download URL returned.")

    # The download URL is pre-authenticated, so no headers are sent.
    response = requests.get(download_url, timeout=request_timeout)
    if response.status_code != 200:
        raise ValueError(f"Failed to download {metadata_file_name}. Error: {response.text}")

    # Use tempfile to create a temporary file that outlives this function.
    with tempfile.NamedTemporaryFile(delete=False, suffix=".csv") as temp_file:
        temp_file.write(response.content)
        temp_file_path = temp_file.name
    print(f"β Downloaded: {metadata_file_name} to temporary file: {temp_file_path}")
    return temp_file_path
| ##################################################### STATS DASHBOARD ################################################################## | |
def update_dashboard():
    """Build the Gradio updates that refresh the statistics dashboard.

    Returns:
        A 5-tuple of ``gr.update`` objects: container visibility, total-PPT
        card, total-slide card, monthly chart data and latest-uploads HTML.
    """
    ppt_count, slide_count, monthly_chart, latest_list_html = get_dashboard_stats()

    ppt_card = f"<div><h3>Total PPTs: {ppt_count}</h3></div>"
    slide_card = f"<div><h3>Total Slides: {slide_count}</h3></div>"

    show_dashboard = gr.update(visible=True)
    return (
        show_dashboard,
        gr.update(value=ppt_card),
        gr.update(value=slide_card),
        gr.update(value=monthly_chart),
        gr.update(value=latest_list_html),
    )
| import pandas as pd | |
| import gradio as gr | |
| import os | |
def get_dashboard_stats():
    """Compute dashboard statistics from the metadata CSV.

    Reads the CSV at the module-level ``temp_file_path`` into the module-level
    ``df`` (adding a ``month_year`` column) and derives summary figures.

    Returns:
        Tuple ``(total_ppts, total_slides, chart_data, latest_html)`` where
        ``chart_data`` is a DataFrame with columns ``Month`` / ``PPT Uploads``
        and ``latest_html`` is an HTML list of the five newest presentations.
    """
    from html import escape  # local import keeps this block self-contained

    global temp_file_path
    global df
    print('Reading CSV...',temp_file_path)
    df = pd.read_csv(temp_file_path)

    # Unparseable dates become NaT instead of raising.
    df["Upload_date"] = pd.to_datetime(df["Upload_date"], errors="coerce")
    print(df)

    # One CSV row per slide; presentations are identified by PPT_Unique_ID.
    total_ppts = df["PPT_Unique_ID"].nunique()
    total_slides = len(df)

    # Monthly PPT uploads (Gradio BarPlot requires a DataFrame).
    df["month_year"] = df["Upload_date"].dt.to_period("M").astype(str)
    chart_data = df.groupby("month_year")["PPT_Unique_ID"].nunique().reset_index()
    chart_data.columns = ["Month", "PPT Uploads"]

    # Latest 5 PPTs by upload date.
    latest_df = df.drop_duplicates(subset="PPT_Unique_ID").sort_values("Upload_date", ascending=False)
    latest_5 = latest_df[["Suitable_Title", "Slide_Category","Upload_date"]].head(5)

    # Titles and categories are generated from uploaded content, so they are
    # escaped before being placed into HTML.
    items = []
    for _, row in latest_5.iterrows():
        title = escape(str(row["Suitable_Title"]))
        category = escape(str(row["Slide_Category"]))
        date_str = row["Upload_date"].strftime("%Y-%m-%d") if pd.notnull(row["Upload_date"]) else "Unknown Date"
        items.append(
            f"<li><b>{title}</b> <br><i>{category}</i> β <span style='color:gray;'>{date_str}</span></li>"
        )
    latest_html = (
        "<h4 style='margin-bottom: 8px;'>π Top 5 Latest Uploaded PPTs</h4><ul style='line-height:1.6em;'>"
        + "".join(items)
        + "</ul>"
    )
    return total_ppts, total_slides, chart_data, latest_html
| ############################################################# UPLOAD PPT ####################################################################### | |
| import requests | |
def get_access_token():
    """Acquire a Microsoft Graph access token via the MSAL device-code flow.

    Prints the verification URL and user code, then blocks until the user
    completes the sign-in.

    Returns:
        The access token string.

    Raises:
        SystemExit: The device flow could not be started or no token was
            issued (exit status 1).
    """
    flow = app.initiate_device_flow(scopes=SCOPES)
    # A failed initiation returns an error payload without "user_code";
    # indexing it blindly would raise an unrelated KeyError.
    if "user_code" not in flow:
        print("β Could not acquire token:", flow.get("error_description"))
        raise SystemExit(1)

    print("Go to", flow["verification_uri"])
    print("Enter the code:", flow["user_code"])

    result = app.acquire_token_by_device_flow(flow)
    if "access_token" not in result:
        print("β Could not acquire token:", result.get("error_description"))
        # The `exit()` helper comes from the `site` module and exits with
        # status 0; raise SystemExit explicitly with a failure status.
        raise SystemExit(1)
    return result["access_token"]
# Function to generate a unique PPT ID
def generate_unique_ppt_id():
    """Return an 8-character identifier taken from a random UUID4."""
    # The first 8 characters of the canonical UUID string are its first
    # 8 hex digits (the first dash comes after them).
    random_uuid = uuid.uuid4()
    return random_uuid.hex[:8]
def truncate_text_to_tokens(text, max_tokens, model_name="cl100k_base"):
    """Return ``text`` cut down to at most ``max_tokens`` tokens.

    Args:
        text: Text to truncate.
        max_tokens: Number of leading tokens to keep.
        model_name: Name of the tiktoken encoding to use.
    """
    codec = tiktoken.get_encoding(model_name)
    kept_tokens = codec.encode(text)[:max_tokens]
    return codec.decode(kept_tokens)
def split_and_convert_ppt(file_path, output_folder_slides, output_folder_images):
    """Split a deck into single-slide files, render each, and collect slide text.

    For every slide, a one-slide ``.pptx`` is saved in ``output_folder_slides``
    and converted with ``PPTXConverter`` into ``output_folder_images``.

    NOTE: slide file names embed the module-global ``ppt_unique_id``, which
    this function reads but does not set (``process_presentation`` assigns it).

    Args:
        file_path: Path of the source ``.pptx``.
        output_folder_slides: Folder for the single-slide ``.pptx`` files.
        output_folder_images: Folder handed to the image converter.

    Returns:
        List with the stripped text of each slide, in slide order.
    """
    for folder in (output_folder_slides, output_folder_images):
        os.makedirs(folder, exist_ok=True)

    presentation = Presentation(file_path)
    file_name = os.path.basename(file_path).split('.')[0]
    print('File Name ',file_name)
    print('File Path ',file_path)

    slide_texts = []
    for slide_number, slide in enumerate(presentation.slides, start=1):
        unique_slide_id = f"{file_name}_{ppt_unique_id}_slide_{slide_number}"
        slide_file_path = os.path.join(output_folder_slides, f"{unique_slide_id}.pptx")
        print('Slide_file_path',slide_file_path)
        # Per the original author's note, pptx2img names its output
        # "<pptx_name>_slide_<n>.png"; a one-slide deck therefore yields
        # "..._slide_1.png".
        image_path = os.path.join(output_folder_images, f"{unique_slide_id}_slide_1.png")
        print('Image file path',image_path)

        # Step 1: re-open the deck and drop every slide except the current
        # one, walking backwards so earlier indexes stay valid.
        single_slide_deck = Presentation(file_path)
        slide_id_list = single_slide_deck.slides._sldIdLst
        keep_index = slide_number - 1
        for position in reversed(range(len(single_slide_deck.slides))):
            if position == keep_index:
                continue
            single_slide_deck.part.drop_rel(slide_id_list[position].rId)
            del slide_id_list[position]
        single_slide_deck.save(slide_file_path)

        # Step 2: render the single-slide deck to an image.
        PPTXConverter().convert_pptx_to_images(slide_file_path, output_folder_images)
        print(f"Slide {slide_number} converted to image: {image_path}")

        # Step 3: take the text from python-pptx shapes (OCR is switched off).
        shape_texts = [shape.text.strip() for shape in slide.shapes if hasattr(shape, "text")]
        slide_text = "\n".join(shape_texts).strip()
        print(f"π‘ PPTX Text Extractedfrom slide {slide_number}:\n", slide_text)
        slide_texts.append(slide_text)
    return slide_texts
def generate_metadata_with_retry(full_text, retries=3, max_tokens=5000, decrement=100, model_name="cl100k_base"):
    """Generate metadata, retrying with a smaller token budget on failure.

    Makes up to ``retries + 1`` attempts. After each failed attempt the token
    budget is reduced by ``decrement`` before the text is truncated again.

    Args:
        full_text: Complete presentation text.
        retries: Number of retries after the first attempt.
        max_tokens: Token budget for the first attempt.
        decrement: Tokens removed from the budget after each failure.
        model_name: tiktoken encoding name used for truncation.

    Returns:
        Whatever ``generate_metadata`` returns, or ``None`` once every
        attempt has failed.
    """
    total_attempts = retries + 1
    token_budget = max_tokens
    attempt = 1
    while attempt <= total_attempts:
        try:
            truncated_text = truncate_text_to_tokens(full_text, token_budget, model_name)
            print(f"π Attempt {attempt}: Generating metadata with ~{count_tokens(truncated_text)} tokens...")
            metadata = generate_metadata(truncated_text)
            print("π Metadata generated successfully.")
            return metadata
        except Exception as e:
            print(f"β Error on attempt {attempt}: {str(e)}")
            if attempt == total_attempts:
                print("π¨ Max retries reached. Metadata generation failed.")
                return None
            token_budget -= decrement
            print(f"π Retrying with {token_budget} tokens...")
        attempt += 1
# Function to generate metadata using phidata agent
def generate_metadata(ocr_text):
    """Ask the LLM agent for structured metadata describing a presentation.

    Args:
        ocr_text: Text of the presentation (already truncated by the caller).

    Returns:
        The ``content`` of the agent's run response; the agent is configured
        with ``PPTMetadata`` as its response model.
    """
    # Adjacent string literals concatenate implicitly. Inside one rule that
    # is intended, so each fragment ends with a space; between rules a comma
    # is required. The original missed both in several places, which fused
    # separate rules together ("...Not AvailableFor the 'Audience_Forum'...").
    metadata_agent = Agent(
        name="Metadata Generator",
        role="Generates structured metadata for presentations based on their content.",
        instructions=[
            "Your task is to analyze the provided text and generate structured metadata for the presentation.",
            "Carefully evaluate the content to determine the most appropriate values for each metadata field.",
            # Rule 1: PPT Unique ID
            "For the 'PPT_Unique_ID', use the first 8 characters of the MD5 hash of the input text. "
            "This ensures uniqueness across presentations.",
            # Rule 2: Suitable Title
            "For the 'Suitable_Title', create a concise and meaningful title that captures the essence of the presentation. "
            "Focus on first slide where title of presentation is given along with key themes, topics, or keywords mentioned in the text.",
            # Rule 3: Slide Category
            "For the 'Slide_Category', classify the presentation into one of the following categories: "
            "The category or theme of the slides (e.g., Risk management , Data Analytics , Technology etc) "
            "Base your decision on the overall theme or subject matter of the content.",
            # Rule 4: PPT owner
            "Find The owner of the presentation ie who makes the presentation (eg: Done by name and designation ie Mr. baswaraj ,Princpial ADG , Additional Director ,or organisations like NCTC,DG Systems, Directorate of Logistics etc) "
            "Dont Asssume if u could not found ,mention Not Available",
            # Rule 5: Audience/Forum
            "For the 'Audience_Forum', identify the target audience or forum for the presentation. "
            "(e.g.,NACIN , WCO, Presentation before Member (CBIC)etc ). "
            "Dont Asssume if could not found ,mention Not Available. "
            "Consider the tone, language, and purpose of the content.",
            # Rule 6: Short Summary
            "For the 'Short_Summary', provide a brief summary of the presentation's content with all keywords in 10 sentences. "
            "Highlight the keywords ,topics, main points or objectives of the presentation.",
            "Mention the title also in the short summary ,owner and audience of the presentation",
            # General Guidelines
            "Ensure all fields are filled and meaningful. If unsure about a field, make an educated guess based on the context.",
        ],
        model=Groq(id="deepseek-r1-distill-llama-70b"),  # Replace with actual model ID
        response_model=PPTMetadata,
        markdown=True,
        debug_mode=True,
        show_tool_calls=True,
        monitoring=True)
    # Run the agent to generate metadata
    response = metadata_agent.run(
        f"Generate data fields for the following presentation content: {ocr_text}")
    return response.content
# Function to get folder ID in OneDrive
def get_folder_id(folder_path, headers):
    """Resolve a nested OneDrive folder path to its item id, creating folders as needed.

    Walks ``folder_path`` segment by segment from the drive root. A segment
    that does not exist under its parent is created.

    Args:
        folder_path: ``/``-separated path relative to the drive root.
        headers: Request headers (including authorization) for Graph calls.

    Returns:
        The item id of the last folder in the path, or ``None`` when a Graph
        call fails or the path has no non-empty segment.
    """
    graph_drive = "https://graph.microsoft.com/v1.0/me/drive"
    request_timeout = 60  # seconds
    parent_id = None
    print("creating folder id for ",folder_path)

    # Empty segments (leading/trailing/double slashes) would otherwise try to
    # create a folder with an empty name.
    for folder_name in (part for part in folder_path.split("/") if part):
        children_url = (
            f"{graph_drive}/root/children" if not parent_id
            else f"{graph_drive}/items/{parent_id}/children"
        )

        # The children collection is paged. Stopping at the first page can
        # miss an existing folder and then create a renamed duplicate, so
        # follow @odata.nextLink until found or exhausted.
        folder_item = None
        page_url = children_url
        while page_url and folder_item is None:
            response = requests.get(page_url, headers=headers, timeout=request_timeout)
            if response.status_code != 200:
                print(f"Failed to retrieve folder '{folder_name}'. Error: {response.text}")
                return None
            page = response.json()
            folder_item = next(
                (item for item in page.get("value", [])
                 if item.get("name") == folder_name and "folder" in item),
                None,
            )
            page_url = page.get("@odata.nextLink")

        if not folder_item:
            # Create the folder if it doesn't exist
            create_response = requests.post(children_url, headers=headers, json={
                "name": folder_name,
                "folder": {},
                "@microsoft.graph.conflictBehavior": "rename"
            }, timeout=request_timeout)
            if create_response.status_code not in [200, 201]:
                print(f"Failed to create folder '{folder_name}'. Error: {create_response.text}")
                return None
            folder_item = create_response.json()
        parent_id = folder_item["id"]
    return parent_id
# Function to upload file to OneDrive
def upload_to_onedrive(file_path, folder_id, headers):
    """Upload a local file into a OneDrive folder under its own base name.

    Args:
        file_path: Path of the local file to upload.
        folder_id: Graph item id of the destination folder.
        headers: Request headers (including authorization) for Graph calls.

    Returns:
        The Graph item id of the uploaded file, or ``None`` when the upload
        was rejected (the error is printed).
    """
    from urllib.parse import quote  # local import keeps this block self-contained

    file_name = os.path.basename(file_path)
    # Quote the name: characters such as '#', '?' or '%' would otherwise be
    # read as URL syntax and address a different (or invalid) path.
    upload_url = (
        f"https://graph.microsoft.com/v1.0/me/drive/items/{folder_id}:/{quote(file_name, safe='')}:/content"
    )
    # The body is raw file content, not JSON, so override the shared
    # Content-Type for this request only (the caller's dict is not mutated).
    upload_headers = {**headers, "Content-Type": "application/octet-stream"}
    # Hand the open file to requests so it is streamed instead of being read
    # fully into memory first.
    with open(file_path, "rb") as file:
        response = requests.put(upload_url, headers=upload_headers, data=file)
    if response.status_code in [200, 201]:
        print(f"Uploaded {file_name} to OneDrive.")
        return response.json()["id"]
    print(f"Failed to upload {file_name}. Error: {response.text}")
    return None
# Function to count tokens using tiktoken
def count_tokens(text, model_name="cl100k_base"):
    """Return how many tokens ``text`` encodes to under the given tiktoken encoding."""
    token_ids = tiktoken.get_encoding(model_name).encode(text)
    return len(token_ids)
def list_folder_files(folder_id, headers):
    """Return all child items of a OneDrive folder.

    Args:
        folder_id: Graph item id of the folder.
        headers: Request headers (including authorization) for Graph calls.

    Returns:
        List of driveItem dicts from the ``value`` arrays of every page.

    Raises:
        ValueError: A listing request did not return HTTP 200.
    """
    url = f"https://graph.microsoft.com/v1.0/me/drive/items/{folder_id}/children"
    items = []
    # Graph pages this collection. Returning only the first page makes
    # callers' "does this file already exist?" checks miss files in large
    # folders, so follow @odata.nextLink to the end.
    while url:
        response = requests.get(url, headers=headers, timeout=60)
        if response.status_code != 200:
            raise ValueError(f"Failed to list folder contents. Error: {response.text}")
        page = response.json()
        items.extend(page.get("value", []))
        url = page.get("@odata.nextLink")
    return items
def download_onedrive_file(file_id, filename, headers):
    """Download a OneDrive item to a local file.

    Args:
        file_id: Graph item id of the file.
        filename: Local path the content is written to.
        headers: Request headers (including authorization) for Graph calls.

    Raises:
        ValueError: The item lookup or the download failed, or the item has
            no download URL. Nothing is written to ``filename`` in that case.
    """
    request_timeout = 60  # seconds
    url = f"https://graph.microsoft.com/v1.0/me/drive/items/{file_id}"
    item_response = requests.get(url, headers=headers, timeout=request_timeout)
    if item_response.status_code != 200:
        raise ValueError(f"Failed to look up file {file_id}. Error: {item_response.text}")

    download_url = item_response.json().get("@microsoft.graph.downloadUrl")
    if not download_url:
        raise ValueError(f"Failed to download file {file_id}. Error: no download URL returned.")

    # Check the status before writing so an error body never ends up saved
    # as if it were the file content.
    response = requests.get(download_url, timeout=request_timeout)
    if response.status_code != 200:
        raise ValueError(f"Failed to download file {file_id}. Error: {response.text}")
    with open(filename, 'wb') as f:
        f.write(response.content)
def update_and_upload_metadata_simplified(metadata_list, metadata_folder_id, metadata_with_fulltext_folder_id, headers):
    """Append new slide rows to both master CSVs on OneDrive.

    ``Master_metadata.csv`` is stored without the ``PPT_OCR_Text`` column;
    ``Master_fulltext_metadata.csv`` keeps every column. Each CSV is
    downloaded if present, extended with the new rows, written to the
    current working directory and uploaded back.

    NOTE(review): this is a read-modify-write of a shared remote file using
    fixed local file names, so concurrent calls can overwrite each other.

    Args:
        metadata_list: One list of 18 values per slide, in column order.
        metadata_folder_id: Folder id holding ``Master_metadata.csv``.
        metadata_with_fulltext_folder_id: Folder id holding
            ``Master_fulltext_metadata.csv``.
        headers: Request headers (including authorization) for Graph calls.

    Returns:
        A completion message string.

    Raises:
        ValueError: Listing/downloading failed (from the helpers) or an
            updated CSV could not be uploaded.
    """
    columns = [
        "Unique_Slide_ID", "Slide_OCR_Text", "PPT_OCR_Text", "Slide_Embedding", "Short_Summary_Embedding",
        "PPT_Unique_ID", "Suitable_Title", "Slide_Category", "PPT_Owner", "Audience_Forum", "Short_Summary",
        "Slide_File_Path", "Slide_File_ID", "Full_PPT_File_Path", "Full_PPT_File_ID",
        "Thumbnail_File_Path", "Thumbnail_File_ID","Upload_date",
    ]
    df_new = pd.DataFrame(metadata_list, columns=columns)

    # (csv name, destination folder id, column to leave out of that CSV)
    targets = [
        ("Master_metadata.csv", metadata_folder_id, 'PPT_OCR_Text'),
        ("Master_fulltext_metadata.csv", metadata_with_fulltext_folder_id, None),
    ]
    for csv_file, folder_id, drop_column in targets:
        files = list_folder_files(folder_id, headers)
        file_item = next((item for item in files if item['name'] == csv_file), None)
        print('File items', file_item)
        if file_item:
            download_onedrive_file(file_item['id'], csv_file, headers)
            df_existing = pd.read_csv(csv_file)
            df_merged = pd.concat([df_existing, df_new], ignore_index=True)
        else:
            df_merged = df_new
        if drop_column:
            df_merged = df_merged.drop(columns=[drop_column])
        df_merged.to_csv(csv_file, index=False)

        # upload_to_onedrive signals failure by returning None; previously
        # that was ignored and success was reported regardless.
        if upload_to_onedrive(csv_file, folder_id, headers) is None:
            raise ValueError(f"Failed to upload {csv_file} to OneDrive.")
        print(f"β Uploaded: {csv_file}")
    return "β PPT Processing and Metadata update complete!"
# Main processing function
def process_presentation(file):
    """Ingest an uploaded ``.pptx``: split, describe, upload and index it.

    Steps: validate the file, resolve the OneDrive folders, refuse duplicate
    file names, split the deck into slides/images, generate metadata with the
    LLM agent, upload the deck and its slides, then append the rows to the
    master metadata CSVs.

    All local processing (split + metadata) happens before anything is
    uploaded, so a local failure does not leave a deck in the repository that
    would block a retry through the duplicate-name check.

    Args:
        file: Uploaded file object exposing ``.name``, or a path string.

    Returns:
        A status message string. Errors are caught and reported as
        ``"An error occurred: ..."`` rather than raised.
    """
    # split_and_convert_ppt reads this module-level name when naming slides.
    global ppt_unique_id
    temp_output_folder_slides = None
    temp_output_folder_images = None
    try:
        # Step 0: validate file format.
        file_path = file.name if hasattr(file, "name") else file
        file_extension = os.path.splitext(file_path)[-1].lower()
        if file_extension != '.pptx':
            raise ValueError("Unsupported file format. Please upload .pptx")

        ppt_file_name = os.path.basename(file_path)
        # Must stay in sync with the name derivation in split_and_convert_ppt.
        file_name = ppt_file_name.split('.')[0]
        print('File Name ',file_name)

        # Step 1: generate unique PPT ID.
        ppt_unique_id = generate_unique_ppt_id()
        upload_date = datetime.now().strftime('%Y-%m-%d')
        print('PPT_unique id',ppt_unique_id)

        # Step 2: resolve (or create) the OneDrive folders.
        gr.Info('Connecting to OneDrive..')
        ppt_repo_folder_id = get_folder_id("Projects Apps/PPT Maker/ppt_repo", headers)
        slides_repo_folder_id = get_folder_id("Projects Apps/PPT Maker/slides_repo", headers)
        slide_image_repo_folder_id = get_folder_id("Projects Apps/PPT Maker/slide_image_repo", headers)
        metadata_folder_id = get_folder_id('Projects Apps/PPT Maker/Metadata_file', headers)
        metadata_with_fulltext_folder_id = get_folder_id('Projects Apps/PPT Maker/Metadata_with_fulltext', headers)
        print('ppt_repo_folder_id',ppt_repo_folder_id)
        print('slides_repo_folder_id',slides_repo_folder_id)
        print('slide_image_repo_folder_id',slide_image_repo_folder_id)
        print('metadata_folder_id',metadata_folder_id)
        # The full-text folder is used at the end, so it is validated too.
        required_folder_ids = (
            ppt_repo_folder_id,
            slides_repo_folder_id,
            slide_image_repo_folder_id,
            metadata_folder_id,
            metadata_with_fulltext_folder_id,
        )
        if not all(required_folder_ids):
            raise ValueError("Could not find or create required folders in OneDrive.")

        # Step 3: refuse to overwrite an existing deck with the same name.
        existing_files = list_folder_files(ppt_repo_folder_id, headers)
        if any(item['name'] == ppt_file_name for item in existing_files):
            duplicate_message = (
                f"β οΈ A file named '{ppt_file_name}' already exists in the PPT repository. "
                "Please rename your file or delete the existing one before re-uploading."
            )
            # gr.Error only has an effect when raised; a warning toast shows
            # the message while the normal string is still returned.
            gr.Warning(duplicate_message)
            return duplicate_message

        # Step 4: split into slides and images inside per-call temp folders,
        # so concurrent uploads cannot clobber each other's files.
        gr.Info('Processing the PPT and indexing ..it may take a while ')
        temp_output_folder_slides = tempfile.mkdtemp(prefix="temp_slides_")
        temp_output_folder_images = tempfile.mkdtemp(prefix="temp_images_")
        slide_texts = split_and_convert_ppt(file_path, temp_output_folder_slides, temp_output_folder_images)
        print('PPT splitted and converted successfully')

        # Step 5: generate presentation-level metadata from the full text.
        full_text = "\n".join(slide_texts)
        gr.Info('AI agent processing the data .')
        metadata = generate_metadata_with_retry(full_text, retries=3, max_tokens=5000, decrement=100, model_name="cl100k_base")
        if metadata is None:
            # The retry helper returns None after its last failed attempt.
            raise ValueError("Metadata generation failed after all retries.")
        # Identical for every slide of this deck, so computed once.
        short_summary_embedding = str(embedding_model.encode(metadata.Short_Summary).tolist())

        # Step 6: upload the full deck.
        full_ppt_file_id = upload_to_onedrive(file_path, ppt_repo_folder_id, headers)
        if full_ppt_file_id is None:
            raise ValueError(f"Failed to upload {ppt_file_name} to OneDrive.")
        gr.Info('PPT uploaded to OneDrive..')
        full_ppt_file_path = f"/Projects Apps/PPT Maker/ppt_repo/{ppt_file_name}"

        # Step 7: upload each slide + image and build one metadata row per slide.
        metadata_list = []
        gr.Info('Uploading the individual slides and images into repo ')
        for i, slide_text in enumerate(slide_texts):
            unique_slide_id = f"{file_name}_{ppt_unique_id}_slide_{i + 1}"
            slide_file_path = os.path.join(temp_output_folder_slides, f"{unique_slide_id}.pptx")
            slide_image_path = os.path.join(temp_output_folder_images, f"{unique_slide_id}_slide_1.png")

            slide_file_id = upload_to_onedrive(slide_file_path, slides_repo_folder_id, headers)
            slide_file_path_onedrive = f"/Projects Apps/PPT Maker/slides_repo/{unique_slide_id}.pptx"
            print(f'Slide{i} uploaded into Onedrive')

            thumbnail_file_id = upload_to_onedrive(slide_image_path, slide_image_repo_folder_id, headers)
            # NOTE(review): the uploaded image is named
            # "<unique_slide_id>_slide_1.png", while this recorded path omits
            # the "_slide_1" suffix; kept as-is for existing rows.
            thumbnail_file_path_onedrive = f"/Projects Apps/PPT Maker/slide_image_repo/{unique_slide_id}.png"
            print(f'Image{i} uploaded into Onedrive')

            if slide_file_id is None or thumbnail_file_id is None:
                raise ValueError(f"Failed to upload slide {i + 1} or its image to OneDrive.")

            slide_embedding = embedding_model.encode(slide_text).tolist()
            metadata_list.append([
                unique_slide_id,               # Unique Slide ID
                slide_text,                    # Slide OCR Text
                full_text,                     # PPT OCR Text
                str(slide_embedding),          # Slide embedding
                short_summary_embedding,       # Short summary embedding
                ppt_unique_id,                 # PPT Unique ID
                metadata.Suitable_Title,       # Suitable Title
                metadata.Slide_Category,       # Slide Category
                metadata.PPT_Owner,            # PPT Owner
                metadata.Audience_Forum,       # Audience Forum
                metadata.Short_Summary,        # Short Summary
                slide_file_path_onedrive,      # Slide File Path (.pptx)
                slide_file_id,                 # Slide File ID (.pptx)
                full_ppt_file_path,            # Full PPT File Path
                full_ppt_file_id,              # Full PPT File ID
                thumbnail_file_path_onedrive,  # Thumbnail File Path (.png)
                thumbnail_file_id,             # Thumbnail File ID (.png)
                upload_date,                   # Upload date
            ])

        # Step 8: append the rows to the master CSVs.
        gr.Info('Vectorising the meta data and uploading in Onedrive..')
        return update_and_upload_metadata_simplified(
            metadata_list,
            metadata_folder_id,
            metadata_with_fulltext_folder_id,
            headers
        )
    except Exception as e:
        return f"An error occurred: {str(e)}"
    finally:
        # Always remove the temp folders, including on errors.
        for temp_folder in (temp_output_folder_slides, temp_output_folder_images):
            if temp_folder:
                shutil.rmtree(temp_folder, ignore_errors=True)
        print('Temp folders cleared')
| ############################################################################### SEARCH PPT ###################################### | |
| import requests | |
| from sentence_transformers import SentenceTransformer, CrossEncoder | |
| from sklearn.metrics.pairwise import cosine_similarity | |
| import os | |
| import shutil | |
| import gradio as gr | |
# Local cache directory for downloaded files
LOCAL_CACHE_DIR = "local_cache"
os.makedirs(LOCAL_CACHE_DIR, exist_ok=True)


# Function to download a file from OneDrive to the local cache
def download_file_from_onedrive(file_path, file_id, headers):
    """Return a local cached copy of a OneDrive file, downloading it if needed.

    The cache key is the base name of ``file_path``. When that file already
    exists in ``LOCAL_CACHE_DIR`` no request is made.

    Args:
        file_path: OneDrive path of the file; only its base name is used.
        file_id: Graph item id used for the download.
        headers: Request headers (including authorization) for Graph calls.

    Returns:
        Path of the file inside ``LOCAL_CACHE_DIR``.

    Raises:
        ValueError: The download did not return HTTP 200.
    """
    local_file_path = os.path.join(LOCAL_CACHE_DIR, os.path.basename(file_path))
    if os.path.exists(local_file_path):  # Avoid re-downloading
        return local_file_path

    download_url = f"https://graph.microsoft.com/v1.0/me/drive/items/{file_id}/content"
    response = requests.get(download_url, headers=headers, timeout=60)
    if response.status_code != 200:
        raise ValueError(f"Failed to download file {file_path}. Error: {response.text}")

    # Write to a side file and rename into place. An interrupted write can
    # then never leave a truncated file that later calls treat as a valid
    # cache entry.
    partial_path = local_file_path + ".part"
    with open(partial_path, "wb") as f:
        f.write(response.content)
    os.replace(partial_path, local_file_path)
    print(f"β Downloaded: {file_path} -> {local_file_path}")
    return local_file_path
# Function to search PPTs
def search_ppts(query, num_results):
    """Search presentations by summary similarity and build row updates for the UI.

    Candidates are the rows of the module-level ``df`` whose
    ``Unique_Slide_ID`` ends with ``"slide_1"`` (one row per deck). They are
    ranked by cosine similarity between the query embedding and the stored
    summary embedding, the top 50 are reranked with the cross-encoder, and the
    best ``num_results`` have their thumbnail and full deck downloaded.

    Args:
        query: Free-text search query.
        num_results: Maximum number of results to show.

    Returns:
        A flat list of ``20 * 5`` ``gr.update`` objects: for each of the 20 UI
        rows, updates for the row, image, info HTML, summary and download
        link. Rows without a result are hidden.
    """
    # Local imports keep this block self-contained.
    import ast
    from html import escape

    global df
    max_rows = 20          # number of result rows the UI defines
    updates_per_row = 5    # row + 4 components
    num_results = int(num_results)  # UI sliders may deliver a float

    gr.Info("Searching the relevant PPTs .")
    query_embedding = embedding_model.encode(query).tolist()

    # Work on a copy so adding columns never writes into a slice of `df`.
    df1 = df[df['Unique_Slide_ID'].str.endswith("slide_1", na=False)].copy()

    results = []
    if not df1.empty:
        # SECURITY: embeddings are stored as the string form of a list in a
        # CSV that is downloaded at runtime. The original used eval() on that
        # text; ast.literal_eval parses the same list literal without
        # executing arbitrary code.
        stored_embeddings = [ast.literal_eval(x) for x in df1['Short_Summary_Embedding']]
        # One vectorised call instead of one cosine_similarity call per row.
        df1['similarity'] = cosine_similarity([query_embedding], stored_embeddings)[0]
        df1 = df1.sort_values(by='similarity', ascending=False)

        # Take the top 50 by similarity and rerank them with the cross-encoder.
        top_results = df1.head(min(50, len(df1))).copy()
        pairs = [(query, summary) for summary in top_results['Short_Summary']]
        gr.Info("Doing Semantic Reranking for most appropriate results ")
        top_results['rerank_score'] = cross_encoder.predict(pairs)
        top_results = top_results.sort_values(by='rerank_score', ascending=False)
        print(top_results)

        gr.Info('Downloading PPT images and ppt')
        print('Downloading PPT images and ppt')
        for _, row in top_results.head(num_results).iterrows():
            # Download slide image locally
            slide_image_path = download_file_from_onedrive(
                row['Thumbnail_File_Path'], row['Thumbnail_File_ID'], headers
            )
            # Download full PPT locally
            ppt_download_link = download_file_from_onedrive(
                row['Full_PPT_File_Path'], row['Full_PPT_File_ID'], headers
            )
            results.append({
                "image": slide_image_path,
                "title": row['Suitable_Title'],
                "owner": row['PPT_Owner'],
                "category": row['Slide_Category'],
                "summary": row['Short_Summary'],
                "download_link": ppt_download_link
            })
        print("downloading complete ")

    # Build the updates: visible rows for results, hidden rows for the rest.
    row_updates = []
    for i in range(max_rows):
        if i < len(results):
            result = results[i]
            # Metadata is generated from uploaded content, so it is escaped
            # before being embedded in HTML.
            info_html = (
                f"<b>Title:</b> {escape(str(result['title']))}<br>"
                f"<b>Owner:</b> {escape(str(result['owner']))}<br>"
                f"<b>Category:</b> {escape(str(result['category']))}"
            )
            row_updates.extend([
                gr.update(visible=True),  # Make the row visible
                gr.update(value=result["image"], visible=True),
                gr.update(value=info_html, visible=True),
                gr.update(value=result["summary"], visible=True),
                gr.update(value=result["download_link"], visible=True),
            ])
        else:
            row_updates.extend([gr.update(visible=False)] * updates_per_row)
    return row_updates
| ################################################################ SEARCH SLIDES ######################## | |
| import requests | |
| import gradio as gr | |
| import pandas as pd | |
| import tiktoken | |
| import tempfile | |
| from PyPDF2 import PdfReader | |
| from tqdm import tqdm | |
| from pydantic import BaseModel, Field | |
| from phi.agent import Agent, RunResponse | |
| from phi.model.groq import Groq | |
| from sentence_transformers import SentenceTransformer | |
| from sentence_transformers import CrossEncoder | |
| #from gradio_client import Client, handle_file | |
| import os | |
| from pptx import Presentation | |
| from pptx2img import PPTXConverter # For splitting slides | |
| import uuid | |
| import shutil | |
| from PIL import Image | |
| import pandas as pd | |
| import requests | |
| import gradio as gr | |
| from pydantic import BaseModel, Field | |
| from typing import List | |
| import tiktoken | |
| from datetime import datetime | |
| import zipfile | |
| from PIL import Image | |
| import gradio as gr | |
| import threading | |
| import time | |
| # Global variable to store search results | |
| search_results = [] | |
def search_slides(query, num_results):
    """Search the slide metadata for ``query`` and build the result-row updates.

    The search runs in two stages: cosine similarity between the query
    embedding and each row's ``Slide_Embedding`` selects up to 50 candidates,
    then the cross-encoder reranks those candidates against ``Short_Summary``.
    The thumbnail and slide file of each of the best ``num_results`` rows are
    downloaded via ``download_file_from_onedrive``.

    Side effects: stores the result dicts in the global ``search_results``
    (read by ``combine_slides_as_zip``), and adds a ``similarity`` column to
    the global ``df`` and re-sorts it.

    Args:
        query: Free-text search query.
        num_results: Number of slides to show; coerced to an int and clamped
            to 1..20 (the number of result rows the UI pre-builds).

    Returns:
        A flat list of 100 ``gr.update`` objects: for each of the 20 rows, one
        update for the row, image, info HTML, download file and checkbox.
    """
    import ast  # local import: only needed to parse the stored embeddings

    global search_results  # Use the global variable to store results
    global df

    max_rows = 20           # result rows pre-built by the UI
    components_per_row = 5  # row, image, info, download file, checkbox

    # gr.Number can deliver a float (or None when the field is cleared);
    # DataFrame.head() needs an int, and the UI only has ``max_rows`` rows.
    try:
        wanted = int(num_results)
    except (TypeError, ValueError):
        wanted = 1
    wanted = max(1, min(wanted, max_rows))

    gr.Info("Searching the relevant slides.")

    # Generate query embedding
    query_embedding = embedding_model.encode(query).tolist()

    # Compute cosine similarity scores.
    # SECURITY: the embeddings are stored as text in the metadata file. They
    # used to be parsed with eval(), which executes arbitrary expressions found
    # in the file; ast.literal_eval only accepts Python literals.
    df['similarity'] = df['Slide_Embedding'].apply(
        lambda raw: cosine_similarity([query_embedding], [ast.literal_eval(raw)])[0][0]
    )

    # Sort by cosine similarity score
    df = df.sort_values(by='similarity', ascending=False)

    # Take the top 50 candidates for reranking (copy so the new column below
    # does not write into a view of ``df``).
    top_results = df.head(50).copy()

    if not top_results.empty:
        # Prepare (query, summary) pairs for cross-encoder reranking
        pairs = [(query, summary) for summary in top_results['Short_Summary']]
        gr.Info("Doing Semantic Reranking for most appropriate results")
        top_results['rerank_score'] = cross_encoder.predict(pairs)
        top_results = top_results.sort_values(by='rerank_score', ascending=False)

    # Prepare results
    results = []
    gr.Info('Downloading slide images')
    for _, row in top_results.head(wanted).iterrows():
        # Download slide image locally
        slide_image_path = download_file_from_onedrive(
            row['Thumbnail_File_Path'], row['Thumbnail_File_ID'], headers
        )
        # Download the individual slide file locally
        slide_download_link = download_file_from_onedrive(
            row['Slide_File_Path'], row['Slide_File_ID'], headers
        )
        results.append({
            "image": slide_image_path,
            "title": row['Suitable_Title'],
            "owner": row['PPT_Owner'],
            "category": row['Slide_Category'],
            "summary": row['Short_Summary'],
            "slide_path": slide_download_link,
        })

    # Store results in the global variable
    search_results = results

    # Build exactly ``components_per_row`` updates for every row so the list
    # lines up with the output components. (Hidden rows previously produced 6
    # updates, which broke any search returning fewer than 20 slides.)
    row_updates = []
    for i in range(max_rows):
        if i < len(results):
            result = results[i]
            row_updates.extend([
                gr.update(visible=True),  # Row visibility
                gr.update(value=result["image"], visible=True),
                gr.update(value=f"<b>Title:</b> {result['title']}<br><b>Owner:</b> {result['owner']}<br><b>Category:</b> {result['category']}", visible=True),
                gr.update(value=result["slide_path"], visible=True),  # Slide file for download
                gr.update(visible=True),  # Checkbox visibility
            ])
        else:
            row_updates.extend([gr.update(visible=False)] * components_per_row)
    return row_updates
def combine_slides_as_zip(*checkbox_values):
    """Zip the slide files whose checkboxes are ticked.

    Checkbox values are matched positionally against the global
    ``search_results`` filled by the last ``search_slides`` call.

    Args:
        *checkbox_values: One boolean per result-row checkbox, in row order.

    Returns:
        Path of the ZIP written to ``LOCAL_CACHE_DIR``, or ``None`` (after
        showing a warning) when no slide is selected. The handler feeds a
        file output, so a plain message string would be treated as a path.
    """
    selected_files = [
        result["slide_path"]
        for result, selected in zip(search_results, checkbox_values)
        if selected and result["slide_path"]  # skip slides without a local file
    ]
    if not selected_files:
        gr.Warning("No slides selected.")
        return None

    zip_filename = os.path.join(LOCAL_CACHE_DIR, "selected_slides.zip")
    used_names = set()
    with zipfile.ZipFile(zip_filename, 'w') as zipf:
        for position, file_path in enumerate(selected_files, start=1):
            arcname = os.path.basename(file_path)  # Only filename in zip
            if arcname in used_names:
                # Two slides can share a file name; keep both in the archive.
                arcname = f"{position}_{arcname}"
            used_names.add(arcname)
            zipf.write(file_path, arcname=arcname)
    return zip_filename
# Background thread to wait for login
def background_login(flow):
    """Wait for the device-code login to finish and publish the outcome.

    Meant to run in a worker thread (see ``login_action``): the call to
    ``app.acquire_token_by_device_flow`` returns only once the flow has
    produced a result.

    On success the global ``headers`` is set to the bearer-token request
    headers and ``access_token_state["token"]`` to the token. On failure
    ``access_token_state["token"]`` is set to the sentinel ``"ERROR"``, which
    ``check_login_status`` reports to the user.

    Args:
        flow: The device-flow dict returned by ``app.initiate_device_flow``.
    """
    global headers
    result = app.acquire_token_by_device_flow(flow)

    # A failed login returns a dict without "access_token"; indexing it
    # unconditionally raised KeyError and the "ERROR" state was never recorded.
    access_token = result.get("access_token")
    if access_token:
        # Publish the headers before the token so that code reacting to the
        # token state never sees a token without usable headers.
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json"
        }
        access_token_state["token"] = access_token
    else:
        access_token_state["token"] = "ERROR"
def login_action():
    """Start a Microsoft device-code login and show the user what to do.

    Initiates the device flow for ``SCOPES``, remembers it in ``flow_state``,
    starts ``background_login`` in a daemon thread to wait for completion, and
    returns the sign-in link and code as HTML.

    Returns:
        A ``gr.update`` that makes the instructions component visible with
        the verification URL and user code.

    Raises:
        gr.Error: If the device flow could not be initiated (the response
            carries no ``user_code``).
    """
    flow = app.initiate_device_flow(scopes=SCOPES)
    if "user_code" not in flow:
        # A failed initiation has neither "user_code" nor "verification_uri";
        # report it instead of failing with a KeyError below.
        raise gr.Error(
            f"Could not start the login flow ({flow.get('error', 'unknown error')}). Please try again."
        )

    flow_state["flow"] = flow
    # Clear the outcome of any earlier attempt so a previous "ERROR" is not
    # reported while this new attempt is still pending.
    access_token_state["token"] = None

    login_url = flow["verification_uri"]
    login_code = flow["user_code"]
    instructions = f"""
<p style='text-align:center; color:#1E3A8A;'>Please go to the following link to authenticate:</p>
<p style='text-align:center;'><a href='{login_url}' target='_blank'>{login_url}</a></p>
<p style='text-align:center;'>Enter the code: <strong>{login_code}</strong></p>
"""
    # Start background login thread
    threading.Thread(target=background_login, args=(flow,), daemon=True).start()
    return gr.update(value=instructions, visible=True)
# Check token and control UI switch
def check_login_status():
    """Translate the stored login state into visibility updates.

    Reads ``access_token_state["token"]`` and returns three ``gr.update``
    objects, in the order the status button wires them: error textbox,
    login section, main application section.

    * empty/unset token - login still pending: keep the login section.
    * ``"ERROR"``       - login failed: show the error, keep the login section.
    * any other value   - logged in: hide the login section, show the app.
    """
    current = access_token_state["token"]

    if not current:
        # Nothing recorded yet; stay on the login screen without an error.
        return gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)

    if current == "ERROR":
        failure_notice = gr.update(visible=True, value="β Login failed.Click Login button again to Try again")
        return failure_notice, gr.update(visible=True), gr.update(visible=False)

    # A real token is present: clear the error and switch to the main app.
    cleared_notice = gr.update(value="", visible=False)
    return cleared_notice, gr.update(visible=False), gr.update(visible=True)
def validate_admin_access(username, password):
    """Check the admin login form and toggle the upload UI accordingly.

    Args:
        username: Value of the username textbox.
        password: Value of the password textbox.

    Returns:
        Three ``gr.update`` objects, in the order the Proceed button wires
        them: admin login form, admin upload UI, message textbox.
    """
    import hmac  # local import: constant-time comparison of the secrets

    def _matches(supplied, expected):
        # Unset or empty configured credentials must never authenticate.
        if not expected or supplied is None:
            return False
        # compare_digest only accepts ASCII str, so compare UTF-8 bytes.
        return hmac.compare_digest(
            str(supplied).encode("utf-8"), str(expected).encode("utf-8")
        )

    # Evaluate both checks so the response time does not reveal which field
    # was wrong.
    username_ok = _matches(username, ADMIN_USERNAME)
    password_ok = _matches(password, ADMIN_PASSWORD)

    if username_ok and password_ok:
        return (
            gr.update(visible=False),  # Hide admin login form
            gr.update(visible=True),  # Show admin upload UI
            gr.update(visible=False, value="")  # Clear any error
        )
    return (
        gr.update(visible=True),  # Keep admin login form
        gr.update(visible=False),  # Keep upload UI hidden
        gr.update(visible=True, value="β Invalid credentials")
    )
def load_and_store_metadata_df():
    """Download the master metadata file after a successful login.

    Resolves the metadata folder with ``get_folder_id``, downloads the file
    with ``download_metadata_file`` and stores the returned local path in the
    global ``temp_file_path``. A warning is shown if the file is not there.

    Returns:
        None. The status-button chain wires this step with no outputs, so
        problems are reported through ``gr.Warning`` rather than a return
        value.
    """
    global temp_file_path

    # This step runs after every status check, including ones made before the
    # login has completed; without a token there are no usable request headers.
    token = access_token_state["token"]
    if not token or token == "ERROR":
        return None

    # Load metadata file
    gr.Info("Downloading the master file ..We will be ready shortly")
    metadata_folder_id = get_folder_id("Projects Apps/PPT Maker/Metadata_file", headers)
    temp_file_path = download_metadata_file(metadata_folder_id, headers)  # Explicit call to download metadata

    # NOTE(review): nothing here reads the file into the global ``df`` used by
    # search_slides - presumably download_metadata_file does that; confirm.
    if not temp_file_path or not os.path.exists(temp_file_path):
        gr.Warning("Metadata file not found.")
    return None
# Custom CSS passed to gr.Blocks(css=css): button colours, the card styling of
# the "checkbox-column" class (applied to the checkbox column of each slide
# result row via elem_classes) and the checkbox label font.
css="""
.gr-button {
background-color: #1E3A8A;
color: white;
}
/* Style for checkbox column */
.checkbox-column {
background-color: #EFF6FF;
border-radius: 10px;
padding: 10px;
margin-top: 8px;
margin-bottom: 8px;
box-shadow: 0 1px 4px rgba(0,0,0,0.1);
transition: box-shadow 0.3s ease;
}
.checkbox-column:hover {
box-shadow: 0 2px 8px rgba(0,0,0,0.2);
}
/* Style the checkbox directly */
.gr-checkbox {
font-weight: bold;
color: #1D4ED8;
}
"""
# # # MAIN APP # # #
# Builds the Gradio UI and launches it:
#   * login_section    - device-code login screen (visible at start)
#   * main_app_section - dashboard, upload, PPT search and slide search tabs
#                        (hidden until check_login_status reveals it)
# NOTE(review): several labels start with "π" or "β"; these look like emoji
# that were mis-decoded at some point - confirm against the file encoding.
# NOTE(review): the container nesting below was reconstructed from statement
# order - confirm the intended layout.
with gr.Blocks(css=css) as demo:
    with gr.Column(visible=True) as login_section:
        gr.HTML("<h1 style='text-align:center; color:#1E3A8A;'>NCTC SlideFinder</h1>")
        # Subheading
        gr.HTML("<h3 style='text-align:center; color:#0F766E;'>PPT Repo and Smart Search Powered by AI</h3>")
        gr.HTML("""
<div style='text-align:center;'>
<img src='/file=logo.jpg' width='200' height='200' style='margin-top:10px;' />
</div>
""")
        login_button = gr.Button("π Login")
        auth_instructions = gr.HTML(visible=False)
        login_error = gr.Textbox(visible=False, interactive=False, label="", show_label=False)
        status_checker = gr.Button("β Check Login Status")

    with gr.Column(visible=False) as main_app_section:
        gr.Markdown("<h2 style='text-align:center; color:#0F766E;'>Welcome to NCTC PPT Repository</h2>")

        with gr.Tab("π Stats Dashboard"):
            with gr.Column() as dashboard_section:
                gr.Markdown("### π Dashboard Overview")
                with gr.Row():
                    total_ppt_box = gr.HTML()
                    total_slides_box = gr.HTML()
                with gr.Row():
                    chart_output = gr.BarPlot(x="Month", y="PPT Uploads", label="Monthly PPT Uploads")
                    latest_ppts_output = gr.HTML()

        with gr.Tab("Upload PPT"):
            # Uploading is gated behind the admin form: validate_admin_access
            # swaps the form for the upload UI on success.
            with gr.Column() as admin_access_section:
                gr.Markdown("### π Admin Access Required")
                username_input = gr.Textbox(label="Username", placeholder="Enter username")
                password_input = gr.Textbox(label="Password", type="password", placeholder="Enter password")
                admin_login_msg = gr.Textbox(visible=False, interactive=False, show_label=False)
                admin_login_button = gr.Button("π Proceed")
            with gr.Column(visible=False) as admin_upload_ui:
                file_input = gr.File(label="Upload PPT File")
                output_text = gr.Textbox(label="Processing Status")
                submit_button = gr.Button("Process")
                submit_button.click(process_presentation, inputs=file_input, outputs=output_text)
            admin_login_button.click(
                validate_admin_access,
                inputs=[username_input, password_input],
                outputs=[admin_access_section, admin_upload_ui, admin_login_msg]
            )

        with gr.Tab("Search PPT"):
            query_input = gr.Textbox(label="Enter Search Query", placeholder="e.g., Risk Management")
            # precision=0 makes the handler receive an int rather than a float.
            num_results_input = gr.Number(label="Number of Results", value=5, minimum=1, maximum=20, precision=0)
            search_button = gr.Button("π Search")
            result_rows = []
            result_components = []
            # Pre-build 20 hidden result rows; the search handler reveals and
            # fills as many as it has results for.
            for i in range(20):
                with gr.Row(visible=False) as row:
                    with gr.Column(scale=2):  # slide image
                        image_output = gr.Image(label="Slide Image")
                    with gr.Column(scale=1):  # PPT info
                        info_output = gr.HTML(label="PPT Info")
                    with gr.Column(scale=2):  # short summary
                        summary_output = gr.Textbox(label="Short Summary", lines=3)
                    with gr.Column(scale=1):  # download
                        download_file = gr.File(label="π₯ Download PPT")
                result_rows.append(row)  # Track rows
                # Five outputs per row, in the order the handler must return them.
                result_components.extend([row, image_output, info_output, summary_output, download_file])
            search_button.click(
                search_ppts,
                inputs=[query_input, num_results_input],
                outputs=result_components
            )

        with gr.Tab("Search and Combine Slides"):
            # The names below are deliberately re-bound for this tab; the
            # handlers of the previous tab already hold their own components.
            query_input = gr.Textbox(label="Enter Search Query to search slides", placeholder="e.g., Risk Management")
            num_results_input = gr.Number(label="Number of Slides you need", value=5, minimum=1, maximum=20, precision=0)
            search_button = gr.Button("π Search")
            result_rows = []
            result_components = []
            checkboxes = []
            for i in range(20):
                with gr.Row(visible=False) as row:
                    with gr.Column(scale=4):  # slide image
                        image_output = gr.Image(label="Slide Image")
                    with gr.Column(scale=2):  # slide info
                        info_output = gr.HTML(label="Slide Info")
                    with gr.Column(scale=1):  # download
                        download_file = gr.File(label="π₯ Download Slide")
                    with gr.Column(scale=1, elem_classes=["checkbox-column"]):  # selection checkbox
                        checkbox = gr.Checkbox(label="Select to Combine")
                        checkboxes.append(checkbox)
                result_rows.append(row)  # Track rows
                # Five outputs per row, matching what search_slides returns.
                result_components.extend([row, image_output, info_output, download_file, checkbox])
            combine_button = gr.Button("Combine Selected Slides")
            combined_ppt_output = gr.File(label="Download Combined PPT")
            search_button.click(
                search_slides,
                inputs=[query_input, num_results_input],
                outputs=result_components
            )
            # Deliver the ZIP into the declared output component. It used to go
            # to a second File created inline in this call, which left
            # combined_ppt_output permanently empty.
            combine_button.click(
                combine_slides_as_zip,
                inputs=checkboxes,
                outputs=combined_ppt_output
            )

    login_button.click(login_action, inputs=[], outputs=[auth_instructions])
    # Status check: switch screens, then fetch the metadata file, then refresh
    # the dashboard.
    status_checker.click(
        check_login_status,
        inputs=[],
        outputs=[login_error, login_section, main_app_section]
    ).then(
        fn=load_and_store_metadata_df,
        inputs=[],
        outputs=[]
    ).then(
        fn=update_dashboard,
        inputs=[],
        outputs=[dashboard_section, total_ppt_box, total_slides_box, chart_output, latest_ppts_output]
    )

# allowed_paths lets Gradio serve the files downloaded into LOCAL_CACHE_DIR.
demo.launch(debug=True, allowed_paths=[LOCAL_CACHE_DIR])