# Jose Martin Rangel Espinoza
# 🔧 Refactor authentication to use global variables for Drive service and folders
# 1cd8943
import os
import shutil
import tempfile
import gradio as gr
from PyPDF2 import PdfReader
from langchain.text_splitter import CharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain.memory import ConversationBufferMemory
from langchain.chains import ConversationalRetrievalChain
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_community.document_loaders import UnstructuredPDFLoader
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
import io
import json
# OAuth scope requested from Google: read-only access to Drive.
SCOPES = ["https://www.googleapis.com/auth/drive.readonly"]

# Directory in which the Chroma vector store is persisted.
CHROMA_DIR = "chroma_db"

# Module-level state shared by the UI callbacks.
chain = None  # conversational chain; assigned once a folder has been processed
carpetas_drive_global = dict()  # Drive folder name -> Drive folder id
servicio_drive_global = None  # Drive service stored after a successful connection
# Manual (copy/paste code) authentication for Hugging Face
def authenticate_manual(credentials_path, code):
    """Run the copy/paste OAuth flow and build a Drive service.

    The flow is two-step: called without a code it returns the authorization
    link to show to the user; called again with the code the user pasted it
    exchanges the code for credentials, saves them to ``token.json`` in the
    working directory and returns a Drive v3 service.

    Args:
        credentials_path: Path to the OAuth client ``credentials.json``.
        code: Authorization code pasted by the user; empty/``None`` on the
            first step.

    Returns:
        A ``(service, status_text, link_markdown)`` tuple. ``service`` is
        ``None`` while the user still has to authorize.

    Raises:
        ValueError: If the credentials file has no ``"installed"`` client
            section.
    """
    with open(credentials_path, "r", encoding="utf-8") as f:
        client_config = json.load(f)

    # Fail with a readable message instead of a bare KeyError('installed').
    installed = client_config.get("installed") if isinstance(client_config, dict) else None
    if not isinstance(installed, dict):
        raise ValueError(
            "credentials.json inválido: se esperaba un cliente OAuth de tipo "
            "'installed' (aplicación de escritorio)."
        )

    # NOTE(review): this relies on the out-of-band (OOB) redirect, which
    # Google has announced as deprecated — confirm it still works for the
    # OAuth client in use.
    installed["redirect_uris"] = ["urn:ietf:wg:oauth:2.0:oob"]
    flow = InstalledAppFlow.from_client_config(client_config, SCOPES, redirect_uri="urn:ietf:wg:oauth:2.0:oob")
    auth_url, _ = flow.authorization_url(prompt='consent')

    code = (code or "").strip()
    if code:
        flow.fetch_token(code=code)
        creds = flow.credentials
        # NOTE(review): token.json holds OAuth credentials and is written to
        # the current working directory — confirm that location is private.
        with open("token.json", 'w', encoding="utf-8") as token:
            token.write(creds.to_json())
        return build('drive', 'v3', credentials=creds), "✅ Conectado correctamente.", ""
    return None, "Copia y abre este enlace para autorizar:", f"[{auth_url}]({auth_url})"
# List Drive folders
def listar_carpetas_drive(service):
    """Return a ``{folder name: folder id}`` mapping of non-trashed Drive folders.

    Every result page is fetched by following ``nextPageToken``; reading only
    the first response would silently drop folders once the account has more
    than one page of them.

    Args:
        service: Drive v3 service object (as returned by ``build``).

    Returns:
        dict mapping folder name to folder id. When several folders share a
        name, the one listed last wins.
    """
    folders = {}
    page_token = None
    while True:
        response = service.files().list(
            q="mimeType='application/vnd.google-apps.folder' and trashed=false",
            fields="nextPageToken, files(id, name)",
            pageSize=1000,
            pageToken=page_token,
        ).execute()
        for folder in response.get('files', []):
            folders[folder['name']] = folder['id']
        page_token = response.get('nextPageToken')
        if not page_token:
            return folders
# Download PDFs
def _local_pdf_name(drive_name, file_id):
    """Return a filesystem-safe file name ending in lowercase ``.pdf``."""
    # Drive names may contain path separators; neutralise them so the file
    # always lands directly inside the download directory.
    cleaned = drive_name.replace("/", "_").replace("\\", "_").strip() or file_id
    stem, ext = os.path.splitext(cleaned)
    if ext.lower() != ".pdf":
        stem = cleaned
    return f"{stem}.pdf"


def get_pdfs_from_drive_by_id(service, folder_id):
    """Download every non-trashed PDF of a Drive folder into a temp directory.

    The directory ``<tmp>/google_drive_rag_pdfs`` is emptied first, so it only
    ever holds the files of the most recently requested folder. A failed
    download is reported on stdout and skipped; the remaining files are still
    downloaded.

    Args:
        service: Drive v3 service object.
        folder_id: Id of the Drive folder to read.

    Returns:
        Path of the directory containing the downloaded PDFs.
    """
    query = f"'{folder_id}' in parents and mimeType='application/pdf' and trashed=false"
    pdfs = []
    page_token = None
    while True:
        response = service.files().list(
            q=query,
            fields="nextPageToken, files(id, name)",
            pageSize=1000,
            pageToken=page_token,
        ).execute()
        pdfs.extend(response.get('files', []))
        page_token = response.get('nextPageToken')
        if not page_token:
            break

    temp_dir = os.path.join(tempfile.gettempdir(), "google_drive_rag_pdfs")
    # rmtree also copes with stray sub-directories, which os.remove cannot.
    shutil.rmtree(temp_dir, ignore_errors=True)
    os.makedirs(temp_dir, exist_ok=True)

    for pdf in pdfs:
        file_path = os.path.join(temp_dir, _local_pdf_name(pdf['name'], pdf['id']))
        if os.path.exists(file_path):
            # Two Drive files with the same name: keep both on disk.
            file_path = os.path.join(
                temp_dir, _local_pdf_name(f"{pdf['id']}_{pdf['name']}", pdf['id'])
            )
        try:
            request = service.files().get_media(fileId=pdf['id'])
            # The context manager closes the handle even if a chunk fails.
            with io.FileIO(file_path, 'wb') as fh:
                downloader = MediaIoBaseDownload(fh, request)
                done = False
                while not done:
                    _, done = downloader.next_chunk()
        except Exception as e:
            print(f"❌ Error al descargar {pdf['name']}: {e}")
            # Do not leave a truncated PDF behind for the extraction step.
            if os.path.exists(file_path):
                os.remove(file_path)
    return temp_dir
# Extract text
def extract_text_from_pdfs(folder_path):
    """Return the concatenated text of every PDF found in *folder_path*.

    Files are visited in sorted order and matched on a case-insensitive
    ``.pdf`` suffix. A file that cannot be loaded is reported on stdout and
    skipped. Document texts are joined with a blank line so the end of one
    document is not fused with the start of the next.

    Args:
        folder_path: Directory to scan (not recursive).

    Returns:
        The combined text, or ``""`` when nothing could be extracted.
    """
    pieces = []
    for filename in sorted(os.listdir(folder_path)):
        if not filename.lower().endswith(".pdf"):
            continue
        pdf_path = os.path.join(folder_path, filename)
        try:
            documents = UnstructuredPDFLoader(pdf_path).load()
        except Exception as e:
            print(f"❌ Error procesando {filename}: {e}")
            continue
        pieces.extend(doc.page_content for doc in documents if doc.page_content)
    return "\n\n".join(pieces)
# Vector store
def create_vectorstore_from_text(text, api_key):
    """Split *text* into chunks, embed them and persist a fresh Chroma store.

    Both inputs are validated before the previous store is deleted, so a bad
    call cannot destroy an existing index.

    Args:
        text: Raw text extracted from the PDFs.
        api_key: OpenAI API key. It is exported as ``OPENAI_API_KEY`` because
            the OpenAI clients in this file are created without an explicit
            key.

    Returns:
        The Chroma vector store persisted under ``CHROMA_DIR``.

    Raises:
        ValueError: If *text* is empty/blank or *api_key* is missing.
    """
    if not text or not text.strip():
        raise ValueError("❌ No se extrajo texto válido de los PDFs.")
    api_key = (api_key or "").strip()
    if not api_key:
        raise ValueError("Falta la clave de API de OpenAI.")

    # Start from a clean directory so chunks of a previous folder do not leak
    # into the new index.
    if os.path.exists(CHROMA_DIR):
        shutil.rmtree(CHROMA_DIR)

    text_splitter = CharacterTextSplitter(chunk_size=1000, chunk_overlap=100)
    docs = text_splitter.create_documents([text])
    os.environ["OPENAI_API_KEY"] = api_key
    embeddings = OpenAIEmbeddings()
    vectordb = Chroma.from_documents(docs, embedding=embeddings, persist_directory=CHROMA_DIR)
    return vectordb
# Conversational chain
def setup_qa_chain(vectordb):
    """Build the conversational retrieval chain on top of *vectordb*.

    The chain combines the store's default retriever, a buffer memory kept
    under the ``chat_history`` key, and a ``gpt-3.5-turbo`` chat model.

    Args:
        vectordb: Vector store exposing ``as_retriever()``.

    Returns:
        The chain produced by ``ConversationalRetrievalChain.from_llm``.
    """
    doc_retriever = vectordb.as_retriever()
    history = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
    chat_model = ChatOpenAI(model_name="gpt-3.5-turbo")
    qa_chain = ConversationalRetrievalChain.from_llm(
        llm=chat_model,
        retriever=doc_retriever,
        memory=history,
    )
    return qa_chain
# Gradio UI
# NOTE(review): the widget nesting below (which components sit inside the Row
# and each Accordion) is a reconstruction — confirm against the intended layout.
with gr.Blocks(title="Google Drive RAG") as demo:
    gr.Markdown("""
# 📂 Google Drive RAG
Conecta tu cuenta de Google Drive, selecciona una carpeta con PDFs, y realiza preguntas sobre su contenido usando RAG + OpenAI.
""")

    with gr.Accordion("🔐 Paso 1: Autenticación", open=True):
        with gr.Row():
            credentials_file = gr.File(label="📄 Subir credentials.json")
            token_file = gr.File(label="🔑 Subir token.json (opcional)")
        api_key_input = gr.Textbox(label="🔑 OpenAI API Key", type="password", placeholder="sk-...")
        codigo_auth = gr.Textbox(label="📥 Pega el código de autorización aquí (después de abrir el link)")
        cargar_btn = gr.Button("🔁 Conectar con Google Drive")
        estado = gr.Textbox(label="Estado de conexión", interactive=False)
        auth_link = gr.Markdown("")

    with gr.Accordion("📁 Paso 2: Seleccionar carpeta y procesar PDFs", open=True):
        carpeta_dropdown = gr.Dropdown(label="📂 Carpetas disponibles")
        procesar_btn = gr.Button("📥 Procesar PDFs de la carpeta seleccionada")
        estado_proceso = gr.Textbox(label="Estado del procesamiento", interactive=False)

    with gr.Accordion("💬 Paso 3: Pregunta sobre tus documentos", open=True):
        chatbot = gr.Chatbot()
        user_input = gr.Textbox(label="Escribe tu pregunta")
        enviar_btn = gr.Button("Enviar pregunta")

    def conectar_drive(cred_file, auth_code):
        """Authenticate against Drive and fill the folder dropdown.

        Returns a ``(dropdown update, status text, link markdown)`` tuple.
        On success the service and the folder mapping are stored in the
        module-level globals used by the other callbacks.
        """
        global carpetas_drive_global, servicio_drive_global
        if cred_file is None:
            # Explicit message instead of "'NoneType' object has no attribute 'name'".
            return gr.update(choices=[]), "❌ Sube primero el archivo credentials.json.", ""
        try:
            # Accept both an uploaded-file object exposing ``.name`` and a plain path.
            cred_path = getattr(cred_file, "name", cred_file)
            service, estado_text, link = authenticate_manual(cred_path, (auth_code or "").strip())
            if not service:
                return gr.update(choices=[]), estado_text, link
            carpetas = listar_carpetas_drive(service)
            carpetas_drive_global = carpetas
            servicio_drive_global = service
            return gr.update(choices=list(carpetas.keys())), estado_text, ""
        except Exception as e:
            return gr.update(choices=[]), f"❌ Error: {e}", ""

    def procesar_pdfs(nombre_carpeta, cred_file, tok_file, api_key):
        """Download, index and wire up the PDFs of the selected folder.

        ``cred_file`` and ``tok_file`` are unused; they are kept because the
        click binding passes them. Returns the status text to display.
        """
        global chain
        try:
            service = servicio_drive_global
            if service is None:
                return "❌ Primero conecta con Google Drive."
            folder_id = carpetas_drive_global.get(nombre_carpeta)
            if not folder_id:
                return "❌ Carpeta no encontrada."
            folder_path = get_pdfs_from_drive_by_id(service, folder_id)
            text = extract_text_from_pdfs(folder_path)
            vectordb = create_vectorstore_from_text(text, api_key)
            chain = setup_qa_chain(vectordb)
            return f"✅ PDFs procesados correctamente desde carpeta: {nombre_carpeta}"
        except Exception as e:
            return f"❌ Error: {e}"

    def preguntar(pregunta, chat_history):
        """Answer a question with the current chain and extend the chat.

        Returns ``("", history)`` so the input box is cleared and the chatbot
        refreshed.
        """
        chat_history = chat_history or []
        if not pregunta or not pregunta.strip():
            return "", chat_history
        if chain is None:
            # No folder processed yet: explain instead of calling ``None``.
            chat_history.append((pregunta, "⚠️ Primero procesa los PDFs de una carpeta."))
            return "", chat_history
        try:
            respuesta = chain({"question": pregunta})
            answer = respuesta['answer']
        except Exception as e:
            # Same reporting style as the other callbacks.
            answer = f"❌ Error: {e}"
        chat_history.append((pregunta, answer))
        return "", chat_history

    cargar_btn.click(fn=conectar_drive, inputs=[credentials_file, codigo_auth], outputs=[carpeta_dropdown, estado, auth_link])
    procesar_btn.click(fn=procesar_pdfs, inputs=[carpeta_dropdown, credentials_file, token_file, api_key_input], outputs=estado_proceso)
    enviar_btn.click(fn=preguntar, inputs=[user_input, chatbot], outputs=[user_input, chatbot])
# For Hugging Face Spaces
# NOTE(review): share=True asks Gradio for a public share link — confirm that
# is intended for an app that keeps Drive credentials in module state.
if __name__ == "__main__":
    demo.launch(share=True)