"""Gradio app for asking questions about the PDFs stored in a Google Drive folder.

Workflow exposed by the UI:
  1. Authenticate against Google Drive with a manual (copy/paste) OAuth code.
  2. Pick a Drive folder; its PDFs are downloaded, parsed, embedded and stored
     in a Chroma vector store.
  3. Chat with a ConversationalRetrievalChain built on top of that store.

NOTE: the Drive service, the folder map and the QA chain are module-level
globals, so they are shared by everything served from this process.
"""
import io
import json
import os
import shutil
import tempfile

import gradio as gr
from PyPDF2 import PdfReader
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.document_loaders import UnstructuredPDFLoader
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload

SCOPES = ['https://www.googleapis.com/auth/drive.readonly']
CHROMA_DIR = "chroma_db"
OOB_REDIRECT_URI = "urn:ietf:wg:oauth:2.0:oob"

chain = None
carpetas_drive_global = {}
servicio_drive_global = None

# Flow that produced the last authorization URL. The authorization code the
# user pastes back has to be redeemed by this same flow (see
# authenticate_manual), so it is kept between the two button clicks.
_pending_auth_flow = None


def _new_oauth_flow(client_config):
    """Build an installed-app OAuth flow that uses the manual copy/paste redirect."""
    return InstalledAppFlow.from_client_config(
        client_config, SCOPES, redirect_uri=OOB_REDIRECT_URI
    )


# Manual authentication (intended for Hugging Face, where no local browser
# redirect is available).
def authenticate_manual(credentials_path, code):
    """Run one step of the manual OAuth flow.

    Args:
        credentials_path: Path to the OAuth client ``credentials.json`` file.
        code: Authorization code pasted by the user; falsy on the first step.

    Returns:
        A ``(service, status_message, link_markdown)`` tuple. Without ``code``
        the service is ``None`` and ``link_markdown`` holds the authorization
        URL. With ``code`` the Drive v3 service is returned and the obtained
        credentials are written to ``token.json``.

    Raises:
        ValueError: If the credentials file has no ``installed`` section.
        Exception: Whatever the OAuth library raises when the code exchange
            fails (callers in this file catch ``Exception``).
    """
    global _pending_auth_flow

    with open(credentials_path, "r", encoding="utf-8") as f:
        client_config = json.load(f)
    if "installed" not in client_config:
        raise ValueError(
            "❌ credentials.json no contiene la sección 'installed' "
            "(se requiere un cliente OAuth de tipo aplicación de escritorio)."
        )
    client_config["installed"]["redirect_uris"] = [OOB_REDIRECT_URI]

    if code:
        # Redeem the code with the flow that generated the URL: the flow
        # carries per-authorization state (and a PKCE code verifier when the
        # library generates one), which a freshly built flow would not match.
        flow = _pending_auth_flow
        if flow is None:
            # No URL was generated in this process; fall back to the original
            # behaviour of building a flow on the spot.
            flow = _new_oauth_flow(client_config)
            flow.authorization_url(prompt='consent')
        flow.fetch_token(code=code)
        _pending_auth_flow = None
        creds = flow.credentials
        # NOTE(review): token.json is written here but nothing in the code
        # visible in this file reads it back.
        with open("token.json", 'w', encoding="utf-8") as token:
            token.write(creds.to_json())
        return build('drive', 'v3', credentials=creds), "✅ Conectado correctamente.", ""

    flow = _new_oauth_flow(client_config)
    auth_url, _ = flow.authorization_url(prompt='consent')
    _pending_auth_flow = flow
    return None, "Copia y abre este enlace para autorizar:", f"[{auth_url}]({auth_url})"


def _list_drive_files(service, query):
    """Return every Drive file (``id`` and ``name``) matching *query*, following pagination."""
    files = []
    page_token = None
    while True:
        response = service.files().list(
            q=query,
            fields="nextPageToken, files(id, name)",
            pageToken=page_token,
        ).execute()
        files.extend(response.get('files', []))
        page_token = response.get('nextPageToken')
        if not page_token:
            return files


# List Drive folders
def listar_carpetas_drive(service):
    """Return a ``{folder_name: folder_id}`` map of the non-trashed Drive folders.

    Folders sharing a name collapse into a single entry (the last one listed wins).
    """
    folders = _list_drive_files(
        service,
        "mimeType='application/vnd.google-apps.folder' and trashed=false",
    )
    return {f['name']: f['id'] for f in folders}


def _local_pdf_name(pdf, taken):
    """Turn a Drive file entry into a safe, unique local ``*.pdf`` file name.

    Args:
        pdf: Drive file dict with ``id`` and ``name`` keys.
        taken: Set of names already used in the target directory; updated in place.
    """
    # Drive names may contain path separators; keep only the last component so
    # the file always lands inside the temp directory.
    name = os.path.basename(pdf['name'].replace("\\", "/")) or pdf['id']
    stem, ext = os.path.splitext(name)
    if ext.lower() != ".pdf":
        # The Drive query selects by MIME type, so the name may lack the suffix
        # the extraction step filters on.
        stem, ext = name, ".pdf"
    candidate = f"{stem}{ext}"
    if candidate in taken:
        # Drive allows several files with the same name in one folder.
        candidate = f"{stem}_{pdf['id']}{ext}"
    taken.add(candidate)
    return candidate


# Download PDFs
def get_pdfs_from_drive_by_id(service, folder_id):
    """Download every PDF directly inside a Drive folder into a temp directory.

    The directory (``<tmp>/google_drive_rag_pdfs``) is emptied first. A failed
    download is reported with ``print`` and skipped; it does not abort the rest.

    Returns:
        Path of the directory holding the downloaded files.
    """
    # trashed=false keeps this consistent with the folder listing.
    query = f"'{folder_id}' in parents and mimeType='application/pdf' and trashed=false"
    pdfs = _list_drive_files(service, query)

    temp_dir = os.path.join(tempfile.gettempdir(), "google_drive_rag_pdfs")
    shutil.rmtree(temp_dir, ignore_errors=True)
    os.makedirs(temp_dir, exist_ok=True)

    taken = set()
    for pdf in pdfs:
        file_path = os.path.join(temp_dir, _local_pdf_name(pdf, taken))
        try:
            request = service.files().get_media(fileId=pdf['id'])
            # The context manager closes the handle even if a chunk fails.
            with io.FileIO(file_path, 'wb') as fh:
                downloader = MediaIoBaseDownload(fh, request)
                done = False
                while not done:
                    _, done = downloader.next_chunk()
        except Exception as e:
            print(f"❌ Error al descargar {pdf['name']}: {e}")
            # Do not leave a truncated file for the extraction step.
            try:
                os.remove(file_path)
            except OSError:
                pass
    return temp_dir


# Extract text
def extract_text_from_pdfs(folder_path):
    """Return the text of every ``*.pdf`` file in *folder_path*, joined by blank lines.

    Files that fail to parse are reported with ``print`` and skipped.
    """
    pieces = []
    for filename in sorted(os.listdir(folder_path)):
        if not filename.lower().endswith(".pdf"):
            continue
        pdf_path = os.path.join(folder_path, filename)
        try:
            documents = UnstructuredPDFLoader(pdf_path).load()
        except Exception as e:
            print(f"❌ Error procesando {filename}: {e}")
            continue
        pieces.extend(doc.page_content for doc in documents if doc.page_content)
    # A blank line between documents keeps the last word of one from fusing
    # with the first word of the next.
    return "\n\n".join(pieces)


# Vector store
def create_vectorstore_from_text(text, api_key):
    """Split *text* into chunks, embed them with OpenAI and persist them in Chroma.

    Any existing ``CHROMA_DIR`` is deleted first. ``OPENAI_API_KEY`` is set in
    the process environment so the chat model created later picks it up too.

    Raises:
        ValueError: If *text* is blank or no API key was supplied.
    """
    if not text.strip():
        raise ValueError("❌ No se extrajo texto válido de los PDFs.")
    if not api_key or not api_key.strip():
        raise ValueError("❌ Falta la OpenAI API Key.")

    if os.path.exists(CHROMA_DIR):
        shutil.rmtree(CHROMA_DIR)

    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    docs = text_splitter.create_documents([text])

    os.environ["OPENAI_API_KEY"] = api_key.strip()
    embeddings = OpenAIEmbeddings()
    vectordb = Chroma.from_documents(docs, embedding=embeddings, persist_directory=CHROMA_DIR)
    return vectordb


# Conversational chain
def setup_qa_chain(vectordb):
    """Build a ConversationalRetrievalChain with buffer memory over *vectordb*."""
    retriever = vectordb.as_retriever()
    memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
    return ConversationalRetrievalChain.from_llm(
        llm=ChatOpenAI(model_name="gpt-3.5-turbo"),
        retriever=retriever,
        memory=memory,
    )


def conectar_drive(cred_file, auth_code):
    """Button handler: run an auth step and, once connected, fill the folder dropdown.

    Returns:
        ``(dropdown_update, status_text, link_markdown)``.
    """
    global carpetas_drive_global, servicio_drive_global
    if cred_file is None:
        return gr.update(choices=[]), "❌ Sube primero el archivo credentials.json.", ""
    try:
        # The upload may arrive as a file-like object or as a plain path.
        cred_path = getattr(cred_file, "name", cred_file)
        service, estado_text, link = authenticate_manual(cred_path, (auth_code or "").strip())
        if not service:
            return gr.update(choices=[]), estado_text, link
        carpetas = listar_carpetas_drive(service)
        carpetas_drive_global = carpetas
        servicio_drive_global = service
        return gr.update(choices=list(carpetas.keys())), estado_text, ""
    except Exception as e:
        return gr.update(choices=[]), f"❌ Error: {e}", ""


def procesar_pdfs(nombre_carpeta, cred_file, tok_file, api_key):
    """Button handler: download, parse and index the PDFs of the chosen folder.

    ``cred_file`` and ``tok_file`` are wired in from the UI but not used here.

    Returns:
        Status text for the UI.
    """
    global chain
    try:
        service = servicio_drive_global
        if service is None:
            return "❌ Primero conecta con Google Drive."
        folder_id = carpetas_drive_global.get(nombre_carpeta)
        if not folder_id:
            return "❌ Carpeta no encontrada."
        folder_path = get_pdfs_from_drive_by_id(service, folder_id)
        text = extract_text_from_pdfs(folder_path)
        vectordb = create_vectorstore_from_text(text, api_key)
        chain = setup_qa_chain(vectordb)
        return f"✅ PDFs procesados correctamente desde carpeta: {nombre_carpeta}"
    except Exception as e:
        return f"❌ Error: {e}"


def preguntar(pregunta, chat_history):
    """Button handler: answer *pregunta* with the QA chain and append it to the chat.

    Returns:
        ``("", updated_history)``; the empty string clears the input box.
    """
    chat_history = chat_history or []
    if not (pregunta or "").strip():
        return "", chat_history
    if chain is None:
        # Nothing has been indexed yet; answer in the chat instead of crashing.
        chat_history.append((pregunta, "❌ Primero procesa los PDFs de una carpeta."))
        return "", chat_history
    try:
        respuesta = chain({"question": pregunta})
        chat_history.append((pregunta, respuesta['answer']))
    except Exception as e:
        chat_history.append((pregunta, f"❌ Error: {e}"))
    return "", chat_history


# Gradio UI
with gr.Blocks(title="Google Drive RAG") as demo:
    gr.Markdown(
        "# 📂 Google Drive RAG\n"
        "Conecta tu cuenta de Google Drive, selecciona una carpeta con PDFs, "
        "y realiza preguntas sobre su contenido usando RAG + OpenAI."
    )

    with gr.Accordion("🔐 Paso 1: Autenticación", open=True):
        with gr.Row():
            credentials_file = gr.File(label="📄 Subir credentials.json")
            token_file = gr.File(label="🔑 Subir token.json (opcional)")
        api_key_input = gr.Textbox(label="🔑 OpenAI API Key", type="password", placeholder="sk-...")
        codigo_auth = gr.Textbox(label="📥 Pega el código de autorización aquí (después de abrir el link)")
        cargar_btn = gr.Button("🔁 Conectar con Google Drive")
        estado = gr.Textbox(label="Estado de conexión", interactive=False)
        auth_link = gr.Markdown("")

    with gr.Accordion("📁 Paso 2: Seleccionar carpeta y procesar PDFs", open=True):
        carpeta_dropdown = gr.Dropdown(label="📂 Carpetas disponibles")
        procesar_btn = gr.Button("📥 Procesar PDFs de la carpeta seleccionada")
        estado_proceso = gr.Textbox(label="Estado del procesamiento", interactive=False)

    with gr.Accordion("💬 Paso 3: Pregunta sobre tus documentos", open=True):
        chatbot = gr.Chatbot()
        user_input = gr.Textbox(label="Escribe tu pregunta")
        enviar_btn = gr.Button("Enviar pregunta")

    cargar_btn.click(
        fn=conectar_drive,
        inputs=[credentials_file, codigo_auth],
        outputs=[carpeta_dropdown, estado, auth_link],
    )
    procesar_btn.click(
        fn=procesar_pdfs,
        inputs=[carpeta_dropdown, credentials_file, token_file, api_key_input],
        outputs=estado_proceso,
    )
    enviar_btn.click(
        fn=preguntar,
        inputs=[user_input, chatbot],
        outputs=[user_input, chatbot],
    )

# For Hugging Face Spaces
if __name__ == "__main__":
    demo.launch(share=True)