import json
import os
import tempfile
import uuid
from datetime import datetime

# Load .env before the remaining imports so that modules imported below
# (notably the local agent package) see the variables if they read them at
# import time.
from dotenv import load_dotenv

load_dotenv()

import firebase_admin
from fastapi import FastAPI, File, HTTPException, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from firebase_admin import credentials, firestore
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from langchain.memory import ConversationBufferWindowMemory
from langchain.schema import AIMessage, HumanMessage
from langchain_google_firestore import FirestoreChatMessageHistory
from langchain_openai import OpenAIEmbeddings
from pinecone import Pinecone
from pydantic import BaseModel

from agent.retrive_agent import run_llm

# Google Drive service-account credentials, supplied as a JSON string in the
# SERVICE_ACCOUNT_SECRET environment variable (required).
service_account_secret_string = os.getenv("SERVICE_ACCOUNT_SECRET")
if not service_account_secret_string:
    raise ValueError("Environment variable SERVICE_ACCOUNT_SECRET is not set or empty.")
service_account_secret_dict = json.loads(service_account_secret_string)

# Drive folder that receives uploaded files; can be overridden with
# DRIVE_FOLDER_ID without changing code.
FOLDER_ID = os.getenv("DRIVE_FOLDER_ID") or "1SUjdlLkt58srCwSffDMAoZJKpzPCWszn"
drive_creds = service_account.Credentials.from_service_account_info(service_account_secret_dict)
drive_service = build("drive", "v3", credentials=drive_creds)

# API keys / credentials read from the environment (.env already loaded above).
chatgpt_api_key = os.getenv("OPENAI_API_KEY")
pinecone_api_key = os.getenv("PINECONE_API_KEY")
# Firebase Admin service-account JSON string; may be unset.
firebase_credentials_json = os.getenv("FIREBASE_SECRET")

app = FastAPI()


@app.get("/")
async def check():
    """Health check: return a fixed string so callers can see the API is up."""
    return "API redi bolo !"
# Names needed only by this part of the module; imported here so the block
# stays self-contained.
from datetime import timezone

from fastapi.concurrency import run_in_threadpool

# CORS. Replace "*" with the specific frontend URL(s) if needed.
# NOTE(review): FastAPI's docs advise against wildcard origins combined with
# allow_credentials=True; restrict the origins if credentials are really used.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Request bodies
class ChatRequest(BaseModel):
    """Body of POST /chat."""

    user_id: str
    prompt: str


class NewSessionRequest(BaseModel):
    """Body of POST /initialize_session."""

    user_id: str


class UserInput(BaseModel):
    """Body of POST /embeddings and POST /recommendation."""

    user_id: str


# Firebase / Firestore
PROJECT_ID = "recommendation-system-mage"
COLLECTION_NAME = "data_user"  # user documents, addressed by user_id below

# Initialise the default Firebase app from FIREBASE_SECRET unless an app already
# exists (another imported module may have initialised it).
if firebase_credentials_json and not firebase_admin._apps:
    firebase_admin.initialize_app(credentials.Certificate(json.loads(firebase_credentials_json)))

# Created unconditionally: a missing Firebase configuration now fails at startup
# instead of raising NameError on the first request that touches `client`.
client = firestore.client()

# Pinecone vector index plus the OpenAI embedding model used to fill and query it.
pc = Pinecone(api_key=pinecone_api_key)
index_name = "mage-x-embeddings-all"
index = pc.Index(index_name)
embedding_model = OpenAIEmbeddings(model="text-embedding-ada-002", openai_api_key=chatgpt_api_key)


def create_embeddings(text):
    """Return the embedding vector of *text* from the OpenAI embedding model."""
    return embedding_model.embed_query(text)


def _to_epoch_millis(value):
    """Convert a datetime or ISO-8601 string to epoch milliseconds.

    Naive datetimes are treated as UTC. Returns None when *value* is None or
    cannot be interpreted as a date-time.
    """
    if value is None:
        return None
    if isinstance(value, str):
        try:
            # fromisoformat() before Python 3.11 does not accept a trailing "Z".
            value = datetime.fromisoformat(value.replace("Z", "+00:00"))
        except ValueError:
            return None
    if not isinstance(value, datetime):
        return None
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    return int(value.timestamp() * 1000)


def fetch_and_embed_data(user_id):
    """Embed the document with id ``user_id`` from each content collection.

    Despite its name, ``user_id`` is used as a *document id*: it is looked up in
    ``data_wisata``, ``data_umkm`` and ``data_berita``. Every collection holding
    a document with that id gets it embedded and upserted into Pinecone, using
    the document id as the vector id.

    Returns:
        A confirmation message. It is returned even when no collection
        contained the document.
    """
    for collection in ("data_wisata", "data_umkm", "data_berita"):
        doc = client.collection(collection).document(user_id).get()
        if not doc.exists:
            continue

        data = doc.to_dict()

        raw_created_at = data.get("created_at")
        created_at_timestamp = _to_epoch_millis(raw_created_at)
        if raw_created_at is not None and created_at_timestamp is None:
            print(f"Error: Format tanggal tidak valid untuk item: {doc.id}")
        if isinstance(raw_created_at, datetime):
            # json.dumps() below cannot serialise datetimes; store a naive ISO string.
            data["created_at"] = raw_created_at.replace(tzinfo=None).isoformat()

        # The embedded text is every non-null field value joined by spaces.
        text = " ".join(str(value) for value in data.values() if value is not None)
        embedding = create_embeddings(text)

        metadata = {
            "firebase_id": str(doc.id),
            "created_at": created_at_timestamp,  # epoch milliseconds
            "likes_count": data.get("likes_count"),
            "location": data.get("location", ""),
            "category": data.get("category", ""),
            "collection_type": collection,
            # NOTE(review): raises TypeError if the document holds other
            # non-JSON types (e.g. further timestamp fields) — confirm schema.
            "text": json.dumps(data),
        }
        # Pinecone does not accept null metadata values, so drop missing fields.
        metadata = {key: value for key, value in metadata.items() if value is not None}

        # NOTE(review): the same document id in two collections maps to the same
        # vector id, so the later upsert overwrites the earlier one.
        index.upsert(vectors=[{"id": str(doc.id), "values": embedding, "metadata": metadata}])

    return f"Data {user_id} berhasil di embbedings"


@app.post("/embeddings")
async def create_document_embeddings(user_input: UserInput):
    """Embed the content documents whose id is ``user_input.user_id`` into Pinecone."""
    # Firestore, OpenAI and Pinecone calls are blocking; keep them off the event loop.
    processed_documents = await run_in_threadpool(fetch_and_embed_data, user_input.user_id)
    return {
        "status": "success",
        "processed_documents": processed_documents,
    }


def recommend_similar_items(user_preferences, desired_collection_types, top_k=20):
    """Return indexed items closest to the mean embedding of *user_preferences*.

    Args:
        user_preferences: Reference texts describing the user's interests.
        desired_collection_types: ``collection_type`` metadata values to search
            in (e.g. ``["data_wisata"]``).
        top_k: Maximum number of matches to return.

    Returns:
        One dict per match holding ``firebase_id``, ``score`` and the stored
        document fields (decoded from the ``text`` metadata), in the order
        Pinecone returns them.

    Raises:
        HTTPException: 400 when *user_preferences* is empty.
    """
    if not user_preferences:
        raise HTTPException(status_code=400, detail="User preferences are empty.")

    preference_embeddings = [create_embeddings(preference) for preference in user_preferences]
    # Element-wise mean of the preference vectors.
    user_embedding = [sum(values) / len(values) for values in zip(*preference_embeddings)]

    # Filter inside Pinecone so top_k counts only the requested collections;
    # filtering the global top_k afterwards can leave a category nearly empty.
    response = index.query(
        namespace="",
        vector=user_embedding,
        top_k=top_k,
        filter={"collection_type": {"$in": list(desired_collection_types)}},
        include_values=False,  # vectors are not used in the result
        include_metadata=True,
    )

    results = []
    for match in response["matches"]:
        metadata = match["metadata"]
        results.append({
            "firebase_id": metadata["firebase_id"],
            "score": match["score"],
            **json.loads(metadata.get("text", "{}")),  # stored document fields
        })
    return results


def _build_recommendations(user_id):
    """Compute the POST /recommendation payload for *user_id* (blocking I/O)."""
    categories = {
        "wisata": "data_wisata",
        "umkm": "data_umkm",
        "berita": "data_berita",
    }

    user_doc = client.collection(COLLECTION_NAME).document(user_id).get()
    if not user_doc.exists:
        raise HTTPException(status_code=404, detail="User not found")
    user_data = user_doc.to_dict()

    recommendations = {}
    for category, collection_name in categories.items():
        # Preferences for this category live in the "<category>_reference" list.
        references = user_data.get(f"{category}_reference", [])
        if not isinstance(references, list) or not references:
            recommendations[category] = {"error": f"No valid {category} reference found for user"}
            continue
        try:
            items = recommend_similar_items(
                user_preferences=references,
                desired_collection_types=[collection_name],
            )
        except Exception as e:
            # Report per-category failures without failing the whole request.
            recommendations[category] = {"error": str(e)}
            continue
        recommendations[category] = sorted(items, key=lambda item: item["score"], reverse=True)[:20]
    return recommendations


@app.post("/recommendation")
async def get_recommendations(user_input: UserInput):
    """Return recommendations for the wisata, umkm and berita categories.

    Each category holds at most 20 items sorted by descending similarity, or an
    ``{"error": ...}`` object when the user has no references for it or the
    lookup failed. Responds 404 when the user document does not exist.
    """
    # Blocking Firestore/OpenAI/Pinecone work runs in a worker thread.
    return await run_in_threadpool(_build_recommendations, user_input.user_id)


@app.post("/initialize_session")
def initialize_session(request: NewSessionRequest):
    """Start a new chat session for an existing user.

    Stores a fresh UUID as the user's ``last_session_id`` and creates an empty
    document for it in the user's ``message_history`` subcollection.

    Raises:
        HTTPException: 404 when the user document does not exist.
    """
    user_doc_ref = client.collection(COLLECTION_NAME).document(request.user_id)
    if not user_doc_ref.get().exists:
        raise HTTPException(status_code=404, detail="User ID not found in Firebase.")

    session_id = str(uuid.uuid4())
    user_doc_ref.set({"last_session_id": session_id}, merge=True)
    user_doc_ref.collection("message_history").document(session_id).set({})
    return {"session_id": session_id}


@app.post("/chat")
def chat(request: ChatRequest):
    """Answer ``request.prompt`` within the user's current chat session.

    Uses the user's ``last_session_id``, creating and storing one if absent.
    Passes the most recent messages to ``run_llm``, persists the new
    prompt/response pair in Firestore and returns the response.

    Raises:
        HTTPException: 404 when the user document does not exist.
    """
    user_doc_ref = client.collection(COLLECTION_NAME).document(request.user_id)
    user_doc = user_doc_ref.get()
    if not user_doc.exists:
        raise HTTPException(status_code=404, detail="User ID not found in Firebase.")

    session_id = (user_doc.to_dict() or {}).get("last_session_id")
    if not session_id:
        session_id = str(uuid.uuid4())
        user_doc_ref.set({"last_session_id": session_id}, merge=True)

    chat_history = FirestoreChatMessageHistory(
        session_id=session_id,
        collection=f"{COLLECTION_NAME}/{request.user_id}/message_history",
        client=client,
    )
    memory = ConversationBufferWindowMemory(k=5, chat_memory=chat_history)

    # buffer_as_messages applies the k=5 window (last 2*k messages);
    # chat_memory.messages would hand the model the entire stored history.
    generated_response = run_llm(query=request.prompt, chat_history=memory.buffer_as_messages)
    # Strip Markdown code fences from the model output.
    generated_response = generated_response.replace("```", "").strip()

    memory.save_context({"input": request.prompt}, {"output": generated_response})
    return {"response": generated_response, "session_id": session_id}


@app.get("/chat_history/{user_id}/{session_id}")
def get_chat_history(user_id: str, session_id: str):
    """Return the stored messages of one chat session.

    Human messages get role "user"; every other message type gets "assistant".
    """
    chat_history = FirestoreChatMessageHistory(
        session_id=session_id,
        collection=f"{COLLECTION_NAME}/{user_id}/message_history",
        client=client,
    )
    messages = [
        {"role": "user" if isinstance(msg, HumanMessage) else "assistant", "content": msg.content}
        for msg in chat_history.messages
    ]
    return {"chat_history": messages}


def upload_to_drive(file_path: str, folder_id: str) -> str:
    """Upload a local file into a Google Drive folder and return a view link.

    The file is shared with ``anyone`` as ``reader``, so the returned URL is
    readable by anyone who has it.
    """
    file_metadata = {
        "name": os.path.basename(file_path),
        "parents": [folder_id],
    }
    media = MediaFileUpload(file_path, resumable=True)
    uploaded_file = drive_service.files().create(
        body=file_metadata,
        media_body=media,
        fields="id",
    ).execute()
    file_id = uploaded_file["id"]

    permission = {"type": "anyone", "role": "reader"}
    drive_service.permissions().create(fileId=file_id, body=permission).execute()
    return f"https://drive.google.com/uc?export=view&id={file_id}"


@app.post("/upload/")
async def upload_file(file: UploadFile = File(...)):
    """Store the uploaded file in Google Drive and return its public link.

    Any failure is reported as HTTP 500 with the error text as detail.
    """
    # basename() drops any directory part a client puts in the filename.
    safe_name = os.path.basename(file.filename or "") or "upload"
    file_path = os.path.join(tempfile.gettempdir(), f"{uuid.uuid4()}_{safe_name}")
    try:
        with open(file_path, "wb") as buffer:
            buffer.write(await file.read())
        # NOTE(review): this Drive call blocks the event loop. It is not moved to
        # a thread because the shared httplib2-based drive_service is not
        # thread-safe.
        public_link = upload_to_drive(file_path, FOLDER_ID)
        return {"message": "File uploaded successfully", "link": public_link}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Best-effort cleanup of the temporary copy, also when the upload fails.
        try:
            os.remove(file_path)
        except OSError:
            pass