Spaces:
Sleeping
Sleeping
Jose Martin Rangel Espinoza
🔧 Refactor authentication to use global variables for Drive service and folders
1cd8943 | import os | |
| import shutil | |
| import tempfile | |
| import gradio as gr | |
| from PyPDF2 import PdfReader | |
| from langchain.text_splitter import CharacterTextSplitter | |
| from langchain_community.vectorstores import Chroma | |
| from langchain.memory import ConversationBufferMemory | |
| from langchain.chains import ConversationalRetrievalChain | |
| from langchain_openai import OpenAIEmbeddings, ChatOpenAI | |
| from langchain_community.document_loaders import UnstructuredPDFLoader | |
| from google.oauth2.credentials import Credentials | |
| from google_auth_oauthlib.flow import InstalledAppFlow | |
| from googleapiclient.discovery import build | |
| from googleapiclient.http import MediaIoBaseDownload | |
| import io | |
| import json | |
# --- Configuration -----------------------------------------------------------
# OAuth scope passed to the Google auth flow: read-only access to Drive.
SCOPES = ['https://www.googleapis.com/auth/drive.readonly']
# Directory in which the Chroma vector store is persisted.
CHROMA_DIR = "chroma_db"

# --- State shared between the Gradio callbacks -------------------------------
# Authenticated Drive service; assigned after a successful connection.
servicio_drive_global = None
# Mapping of Drive folder name -> folder id, filled in after connecting.
carpetas_drive_global = {}
# Conversational retrieval chain; assigned once a folder has been processed.
chain = None
# Manual (copy/paste) authentication for Hugging Face.
# Flows that have issued an authorization link and are waiting for the user's
# code, keyed by the credentials path they were created from.
_pending_auth_flows = {}


def authenticate_manual(credentials_path, code):
    """Run one step of the manual Google OAuth flow.

    The function is meant to be called twice per login: first with an empty
    ``code`` to obtain the authorization link, then again with the code the
    user pasted back.

    Args:
        credentials_path: Path to the OAuth client JSON file.
        code: Authorization code pasted by the user; falsy on the first step.

    Returns:
        A ``(service, status_message, link_markdown)`` tuple. When ``code`` is
        given and the exchange succeeds, ``service`` is the Drive v3 client
        and the link is ``""``. Otherwise ``service`` is ``None`` and the
        third item is a Markdown link to the consent page.

    Side effects:
        On success the credentials are written to ``token.json`` in the
        current working directory.
    """
    # NOTE(review): Google has announced the deprecation of this out-of-band
    # redirect URI; confirm it is still accepted for this OAuth client.
    oob_redirect = "urn:ietf:wg:oauth:2.0:oob"

    with open(credentials_path, "r", encoding="utf-8") as f:
        client_config = json.load(f)
    # Client files are keyed by client type; patch whichever section exists
    # instead of assuming "installed" (which raised KeyError for "web" files).
    for client_type in ("installed", "web"):
        if client_type in client_config:
            client_config[client_type]["redirect_uris"] = [oob_redirect]

    if code:
        # Exchange the code with the same flow object that produced the link:
        # the flow may carry per-request state (for example a PKCE code
        # verifier) that a freshly created flow would not have.
        flow = _pending_auth_flows.get(credentials_path)
        if flow is None:
            flow = InstalledAppFlow.from_client_config(
                client_config, SCOPES, redirect_uri=oob_redirect
            )
        flow.fetch_token(code=code)
        # Only forget the pending flow once the exchange worked, so a typo in
        # the pasted code can be retried.
        _pending_auth_flows.pop(credentials_path, None)
        creds = flow.credentials
        with open("token.json", "w", encoding="utf-8") as token:
            token.write(creds.to_json())
        return build('drive', 'v3', credentials=creds), "✅ Conectado correctamente.", ""

    flow = InstalledAppFlow.from_client_config(
        client_config, SCOPES, redirect_uri=oob_redirect
    )
    auth_url, _ = flow.authorization_url(prompt='consent')
    _pending_auth_flows[credentials_path] = flow
    return None, "Copia y abre este enlace para autorizar:", f"[{auth_url}]({auth_url})"
# List the Drive folders.
def listar_carpetas_drive(service):
    """Return a ``{folder_name: folder_id}`` mapping of non-trashed Drive folders.

    The Drive ``files.list`` call is paginated, so every page is fetched;
    reading only the first response would silently drop folders.

    Args:
        service: Drive v3 service object exposing ``files().list(...)``.

    Returns:
        Dict mapping each folder name to its id. If several folders share a
        name, the one listed last wins.
    """
    carpetas = {}
    page_token = None
    while True:
        respuesta = service.files().list(
            q="mimeType='application/vnd.google-apps.folder' and trashed=false",
            fields="nextPageToken, files(id, name)",
            pageToken=page_token,
        ).execute()
        for carpeta in respuesta.get('files', []):
            carpetas[carpeta['name']] = carpeta['id']
        page_token = respuesta.get('nextPageToken')
        if not page_token:
            return carpetas
# Download the PDFs.
def get_pdfs_from_drive_by_id(service, folder_id):
    """Download the PDFs directly inside a Drive folder into a temp directory.

    The target directory (``<system temp>/google_drive_rag_pdfs``) is emptied
    before downloading, so it only ever holds the files of the latest call.
    A failed download is reported on stdout and skipped; it does not abort
    the remaining downloads.

    Args:
        service: Drive v3 service object.
        folder_id: Id of the Drive folder whose PDFs are downloaded.

    Returns:
        Path of the local directory containing the downloaded files.
    """
    # Skip trashed files, consistent with the folder listing query.
    query = f"'{folder_id}' in parents and mimeType='application/pdf' and trashed=false"
    pdfs = []
    page_token = None
    while True:  # files.list is paginated; collect every page
        response = service.files().list(
            q=query,
            fields="nextPageToken, files(id, name)",
            pageToken=page_token,
        ).execute()
        pdfs.extend(response.get('files', []))
        page_token = response.get('nextPageToken')
        if not page_token:
            break

    temp_dir = os.path.join(tempfile.gettempdir(), "google_drive_rag_pdfs")
    # rmtree also copes with leftover subdirectories, which os.remove cannot.
    shutil.rmtree(temp_dir, ignore_errors=True)
    os.makedirs(temp_dir, exist_ok=True)

    used_names = set()
    for pdf in pdfs:
        # Drive names may contain path separators, lack the extension, or be
        # duplicated within a folder; derive a safe, unique local file name.
        local_name = pdf['name'].replace("/", "_").replace("\\", "_")
        if not local_name.lower().endswith(".pdf"):
            local_name += ".pdf"
        if local_name in used_names:
            stem, ext = os.path.splitext(local_name)
            local_name = f"{stem}_{pdf['id']}{ext}"
        used_names.add(local_name)
        file_path = os.path.join(temp_dir, local_name)
        try:
            request = service.files().get_media(fileId=pdf['id'])
            # The context manager guarantees the handle is closed even when a
            # chunk download fails.
            with io.FileIO(file_path, 'wb') as fh:
                downloader = MediaIoBaseDownload(fh, request)
                done = False
                while not done:
                    _, done = downloader.next_chunk()
        except Exception as e:
            print(f"❌ Error al descargar {pdf['name']}: {e}")
            # Do not leave a truncated file behind for the text extraction.
            if os.path.exists(file_path):
                os.remove(file_path)
    return temp_dir
# Extract the text.
def extract_text_from_pdfs(folder_path):
    """Return the concatenated text of every PDF found in ``folder_path``.

    Files are visited in sorted order so the result is deterministic. The
    ``.pdf`` extension is matched case-insensitively. A file that cannot be
    loaded is reported on stdout and skipped.

    Args:
        folder_path: Directory containing the PDF files.

    Returns:
        The non-empty document texts joined by a blank line, or ``""`` when
        nothing could be extracted.
    """
    fragments = []
    for filename in sorted(os.listdir(folder_path)):
        if not filename.lower().endswith(".pdf"):
            continue
        pdf_path = os.path.join(folder_path, filename)
        try:
            documents = UnstructuredPDFLoader(pdf_path).load()
        except Exception as e:
            print(f"❌ Error procesando {filename}: {e}")
            continue
        fragments.extend(doc.page_content for doc in documents if doc.page_content)
    # A blank line between documents keeps words from different files from
    # being fused together.
    return "\n\n".join(fragments)
# Vector store.
def create_vectorstore_from_text(text, api_key):
    """Split ``text`` into chunks, embed them and persist a Chroma store.

    Any existing store in ``CHROMA_DIR`` is deleted first, but only after the
    inputs have been validated, so a bad call does not destroy the previous
    index.

    Args:
        text: Full text to index.
        api_key: OpenAI API key. It is exported as ``OPENAI_API_KEY`` in the
            process environment, where the OpenAI clients pick it up.

    Returns:
        The Chroma vector store built from the text chunks.

    Raises:
        ValueError: If ``text`` is empty/blank or ``api_key`` is missing.
    """
    if not text or not text.strip():
        raise ValueError("❌ No se extrajo texto válido de los PDFs.")
    # Pasted keys often carry stray whitespace; a missing key would otherwise
    # surface as an opaque TypeError from os.environ.
    api_key = (api_key or "").strip()
    if not api_key:
        raise ValueError("❌ Falta la OpenAI API Key.")

    if os.path.exists(CHROMA_DIR):
        shutil.rmtree(CHROMA_DIR)
    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    docs = text_splitter.create_documents([text])
    os.environ["OPENAI_API_KEY"] = api_key
    embeddings = OpenAIEmbeddings()
    vectordb = Chroma.from_documents(docs, embedding=embeddings, persist_directory=CHROMA_DIR)
    return vectordb
# Conversational chain.
def setup_qa_chain(vectordb):
    """Build the question-answering chain on top of ``vectordb``.

    Args:
        vectordb: Vector store; its ``as_retriever()`` supplies the retriever.

    Returns:
        A ``ConversationalRetrievalChain`` using ``gpt-3.5-turbo`` and a
        buffer memory stored under the ``chat_history`` key.
    """
    doc_retriever = vectordb.as_retriever()
    chat_memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
    chat_model = ChatOpenAI(model_name="gpt-3.5-turbo")
    qa_chain = ConversationalRetrievalChain.from_llm(
        llm=chat_model,
        retriever=doc_retriever,
        memory=chat_memory,
    )
    return qa_chain
# Gradio UI: layout, callbacks and event wiring.
with gr.Blocks(title="Google Drive RAG") as demo:
    gr.Markdown(
        "\n"
        "# 📂 Google Drive RAG\n"
        "Conecta tu cuenta de Google Drive, selecciona una carpeta con PDFs, y realiza preguntas sobre su contenido usando RAG + OpenAI.\n"
    )

    with gr.Accordion("🔐 Paso 1: Autenticación", open=True):
        with gr.Row():
            credentials_file = gr.File(label="📄 Subir credentials.json")
            token_file = gr.File(label="🔑 Subir token.json (opcional)")
        api_key_input = gr.Textbox(label="🔑 OpenAI API Key", type="password", placeholder="sk-...")
        codigo_auth = gr.Textbox(label="📥 Pega el código de autorización aquí (después de abrir el link)")
        cargar_btn = gr.Button("🔁 Conectar con Google Drive")
        estado = gr.Textbox(label="Estado de conexión", interactive=False)
        auth_link = gr.Markdown("")

    with gr.Accordion("📁 Paso 2: Seleccionar carpeta y procesar PDFs", open=True):
        carpeta_dropdown = gr.Dropdown(label="📂 Carpetas disponibles")
        procesar_btn = gr.Button("📥 Procesar PDFs de la carpeta seleccionada")
        estado_proceso = gr.Textbox(label="Estado del procesamiento", interactive=False)

    with gr.Accordion("💬 Paso 3: Pregunta sobre tus documentos", open=True):
        chatbot = gr.Chatbot()
        user_input = gr.Textbox(label="Escribe tu pregunta")
        enviar_btn = gr.Button("Enviar pregunta")

    def conectar_drive(cred_file, auth_code):
        """Authenticate against Drive and fill the folder dropdown.

        Returns a ``(dropdown_update, status_text, link_markdown)`` tuple.
        Without an authorization code the status and link ask the user to
        authorize; with a valid code the folder names are loaded and the
        service and folder mapping are stored in the module globals.
        """
        global carpetas_drive_global, servicio_drive_global
        if cred_file is None:
            # Previously surfaced as a cryptic attribute error on None.
            return gr.update(choices=[]), "❌ Sube primero el archivo credentials.json.", ""
        try:
            # Accept both an upload object exposing ``.name`` and a plain path.
            cred_path = getattr(cred_file, "name", cred_file)
            service, estado_text, link = authenticate_manual(cred_path, (auth_code or "").strip())
            if not service:
                return gr.update(choices=[]), estado_text, link
            carpetas = listar_carpetas_drive(service)
            carpetas_drive_global = carpetas
            servicio_drive_global = service
            return gr.update(choices=list(carpetas.keys())), estado_text, ""
        except Exception as e:
            return gr.update(choices=[]), f"❌ Error: {e}", ""

    def procesar_pdfs(nombre_carpeta, cred_file, tok_file, api_key):
        """Download, index and prepare the chain for the selected folder.

        Returns a status message for the UI. On success the module-level
        ``chain`` is replaced with a chain built from the folder's PDFs.

        NOTE(review): ``cred_file`` and ``tok_file`` are received but not
        used; the Drive service stored by ``conectar_drive`` is used instead.
        """
        global chain
        service = servicio_drive_global
        if service is None:
            # Without this check the user was told the folder did not exist.
            return "❌ Conecta primero con Google Drive."
        try:
            folder_id = carpetas_drive_global.get(nombre_carpeta)
            if not folder_id:
                return "❌ Carpeta no encontrada."
            folder_path = get_pdfs_from_drive_by_id(service, folder_id)
            text = extract_text_from_pdfs(folder_path)
            vectordb = create_vectorstore_from_text(text, api_key)
            chain = setup_qa_chain(vectordb)
            return f"✅ PDFs procesados correctamente desde carpeta: {nombre_carpeta}"
        except Exception as e:
            return f"❌ Error: {e}"

    def preguntar(pregunta, chat_history):
        """Answer a question and append the exchange to the chat history.

        Returns ``("", chat_history)`` so the input box is cleared. Blank
        questions leave the history unchanged. If no folder has been
        processed yet, or the chain fails, the problem is shown as the
        answer instead of raising.
        """
        chat_history = chat_history or []
        if not pregunta or not pregunta.strip():
            return "", chat_history
        if chain is None:
            # Calling a missing chain raised "'NoneType' is not callable".
            chat_history.append((pregunta, "⚠️ Primero procesa los PDFs de una carpeta."))
            return "", chat_history
        try:
            respuesta = chain({"question": pregunta})
            contestacion = respuesta['answer']
        except Exception as e:
            # Same reporting style as the other callbacks.
            contestacion = f"❌ Error: {e}"
        chat_history.append((pregunta, contestacion))
        return "", chat_history

    cargar_btn.click(
        fn=conectar_drive,
        inputs=[credentials_file, codigo_auth],
        outputs=[carpeta_dropdown, estado, auth_link],
    )
    procesar_btn.click(
        fn=procesar_pdfs,
        inputs=[carpeta_dropdown, credentials_file, token_file, api_key_input],
        outputs=estado_proceso,
    )
    enviar_btn.click(
        fn=preguntar,
        inputs=[user_input, chatbot],
        outputs=[user_input, chatbot],
    )
# Entry point for Hugging Face Spaces: start the app when run as a script.
if __name__ == "__main__":
    launch_options = {"share": True}
    demo.launch(**launch_options)