# MAGE10 / api / api_mage_x.py
# Bima Ardhia
# test
# a724005
import os
import json
from fastapi import FastAPI, HTTPException, File, UploadFile
from pydantic import BaseModel
from firebase_admin import credentials, firestore
from langchain_google_firestore import FirestoreChatMessageHistory
from langchain.memory import ConversationBufferWindowMemory
from langchain.schema import HumanMessage, AIMessage
from dotenv import load_dotenv
load_dotenv()
from google.oauth2 import service_account
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from fastapi.middleware.cors import CORSMiddleware
import uuid
import tempfile
import firebase_admin
from agent.retrive_agent import run_llm
from pinecone import Pinecone
from langchain_openai import OpenAIEmbeddings
from datetime import datetime
# Google Drive service-account credentials are supplied as a JSON string in the
# SERVICE_ACCOUNT_SECRET environment variable. The .env file has already been
# loaded by the load_dotenv() call that sits among the imports, so it is not
# loaded a second time here.
service_account_secret_string = os.getenv("SERVICE_ACCOUNT_SECRET")
if not service_account_secret_string:
    raise ValueError("Environment variable SERVICE_ACCOUNT_SECRET is not set or empty.")
try:
    service_account_secret_dict = json.loads(service_account_secret_string)
except json.JSONDecodeError as exc:
    # JSONDecodeError is a ValueError subclass, so anything that handled the
    # raw decode error still handles this one; the message now names the
    # offending variable.
    raise ValueError(
        "Environment variable SERVICE_ACCOUNT_SECRET does not contain valid JSON."
    ) from exc

# Initialize Google Drive API
FOLDER_ID = '1SUjdlLkt58srCwSffDMAoZJKpzPCWszn'  # Drive folder passed to upload_to_drive() by /upload/
drive_creds = service_account.Credentials.from_service_account_info(service_account_secret_dict)
drive_service = build('drive', 'v3', credentials=drive_creds)

# API keys and the Firebase service-account JSON, read once at import time.
chatgpt_api_key = os.getenv("OPENAI_API_KEY")
pinecone_api_key = os.getenv("PINECONE_API_KEY")
firebase_credentials_json = os.getenv("FIREBASE_SECRET")
# Initialize FastAPI
# Application instance; the route decorators and the CORS middleware in this
# module are all registered on it.
app = FastAPI()
@app.get("/")
async def check():
    """Liveness probe: answer the root path with a fixed readiness string."""
    readiness_message = "API redi bolo !"
    return readiness_message
# CORS configuration.
# Allowed origins can be supplied as a comma-separated list in the
# CORS_ALLOWED_ORIGINS environment variable; when it is unset or empty the
# previous behaviour (any origin, "*") is kept.
# NOTE(review): FastAPI's CORS documentation states that wildcard origins,
# methods and headers must not be combined with allow_credentials=True — set
# CORS_ALLOWED_ORIGINS to the real frontend URL(s) in production.
_cors_allowed_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOWED_ORIGINS", "").split(",")
    if origin.strip()
] or ["*"]

app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Define data models
class ChatRequest(BaseModel):
    """Request body of the ``/chat`` endpoint."""

    # ID of the user's document in the Firestore user collection.
    user_id: str
    # Message text that is forwarded to the LLM as the query.
    prompt: str
class NewSessionRequest(BaseModel):
    """Request body of the ``/initialize_session`` endpoint."""

    # ID of the user's document in the Firestore user collection.
    user_id: str
class UserInput(BaseModel):
    """Request body shared by the ``/embeddings`` and ``/recommendation`` endpoints."""

    # For /recommendation this is the user's document ID; /embeddings passes the
    # same field on as the ID of the content document to embed.
    user_id: str
# Initialize Firebase
# NOTE(review): PROJECT_ID is not referenced anywhere in the visible part of this module.
PROJECT_ID = "recommendation-system-mage"
# Firestore collection with one document per user; the handlers below read the
# session pointer and the *_reference preference lists from it and keep chat
# history in its "message_history" sub-collection.
COLLECTION_NAME = "data_user"
# Initialize the Firebase Admin SDK from the JSON held in FIREBASE_SECRET.
if firebase_credentials_json:
    firebase_credentials_dict = json.loads(firebase_credentials_json)
    # Initialize Firebase only if no app has been initialized in this process yet.
    if not firebase_admin._apps:
        cred = credentials.Certificate(firebase_credentials_dict)
        firebase_admin.initialize_app(cred)

# Fail fast with a clear message (mirroring the SERVICE_ACCOUNT_SECRET check)
# instead of letting a missing secret surface later as an obscure error when
# the Firestore client is created or first used.
if not firebase_admin._apps:
    raise ValueError("Environment variable FIREBASE_SECRET is not set or empty.")

# Create the Firestore client shared by every handler in this module.
client = firestore.client()
# Pinecone client and a handle to the index that stores the content embeddings.
pc = Pinecone(api_key=pinecone_api_key)
index_name = "mage-x-embeddings-all"
index = pc.Index(index_name)
# OpenAI embedding model; create_embeddings() uses it both for stored documents
# and for user-preference queries.
embedding_model = OpenAIEmbeddings(model="text-embedding-ada-002", openai_api_key=chatgpt_api_key)
def create_embeddings(text):
    """Return the embedding vector of *text* produced by the module-level embedding model."""
    vector = embedding_model.embed_query(text)
    return vector
def _created_at_to_epoch_ms(created_at):
    """Return *created_at* as integer epoch milliseconds, or None if it is unusable.

    Accepts a ``datetime`` or a string in ``%Y-%m-%dT%H:%M:%SZ`` format; any
    other value (including ``None``) yields ``None``.
    """
    from datetime import timezone  # the module itself only imports `datetime`

    if isinstance(created_at, datetime):
        if created_at.tzinfo is None:
            # NOTE(review): naive values are treated as UTC — confirm.
            created_at = created_at.replace(tzinfo=timezone.utc)
        return int(created_at.timestamp() * 1000)
    if isinstance(created_at, str):
        try:
            parsed = datetime.strptime(created_at, "%Y-%m-%dT%H:%M:%SZ")
        except ValueError:
            return None
        # The literal trailing "Z" in the format denotes UTC.
        return int(parsed.replace(tzinfo=timezone.utc).timestamp() * 1000)
    return None


def fetch_and_embed_data(user_id):
    """Embed the document with ID *user_id* from each content collection into Pinecone.

    For every collection in ``data_wisata``, ``data_umkm`` and ``data_berita``
    the document with that ID is read; if it exists, its non-null field values
    are joined into one text, embedded, and upserted into the Pinecone index
    under the document ID.

    NOTE(review): despite the parameter name, the value is used as a content
    document ID, not as a key into the user collection. The vector ID is the
    bare document ID, so the same ID in two collections overwrites one vector.

    Args:
        user_id: ID of the document to look up in each content collection.

    Returns:
        A fixed confirmation string containing *user_id*; it is returned even
        when no collection contained the document.
    """
    collections = ["data_wisata", "data_umkm", "data_berita"]
    for collection in collections:
        doc = client.collection(collection).document(user_id).get()
        if not doc.exists:
            continue

        data = doc.to_dict()

        # Derive the numeric timestamp from the raw value *before* it is
        # stringified. The previous code parsed the isoformat() output with a
        # format that requires a trailing "Z", which never matched, and raised
        # on a missing or None field.
        raw_created_at = data.get("created_at")
        created_at_timestamp = _created_at_to_epoch_ms(raw_created_at)
        if created_at_timestamp is None:
            print(f"Error: Format tanggal tidak valid untuk item: {doc.id}")
        if isinstance(raw_created_at, datetime):
            # Store a JSON-friendly, timezone-stripped ISO string in the payload.
            data["created_at"] = raw_created_at.replace(tzinfo=None).isoformat()

        text = ' '.join(str(value) for value in data.values() if value is not None)
        print(text)
        embedding = create_embeddings(text)

        metadata = {
            "firebase_id": str(doc.id),
            "created_at": created_at_timestamp,  # epoch milliseconds
            "likes_count": data.get("likes_count"),
            "location": data.get("location", ""),
            "category": data.get("category", ""),
            "collection_type": collection,
            # default=str is deliberate: any remaining field JSON cannot encode
            # is stored as its string form instead of failing the whole upsert.
            "text": json.dumps(data, default=str),
        }
        # Pinecone does not accept null metadata values; omit those keys.
        metadata = {key: value for key, value in metadata.items() if value is not None}

        index.upsert(vectors=[{
            "id": str(doc.id),
            "values": embedding,
            "metadata": metadata,
        }])

    return f"Data {user_id} berhasil di embbedings"
@app.post("/embeddings")
async def get_recommendations(user_input: UserInput):
    """Embed the content document named by ``user_input.user_id`` and report the outcome.

    The work is delegated to ``fetch_and_embed_data``, which performs blocking
    network calls; it is run in the thread pool so the event loop stays free
    for other requests.

    NOTE(review): the ``/recommendation`` handler later in this module uses the
    same function name. Both routes register (the decorator runs at definition
    time), but the module-level name ends up bound to the later one.

    Returns:
        ``{"status": "success", "processed_documents": <message string>}``.
    """
    # Local import so this handler does not depend on other edits to the file.
    from fastapi.concurrency import run_in_threadpool

    processed_documents = await run_in_threadpool(fetch_and_embed_data, user_input.user_id)
    return {
        "status": "success",
        "processed_documents": processed_documents,
    }
def create_embeddings(text):
    """Embed *text* with the module-level embedding model and return the vector."""
    # NOTE(review): this repeats an identical definition of create_embeddings
    # that appears earlier in this module; this later one is what the name
    # resolves to at call time.
    embedded = embedding_model.embed_query(text)
    return embedded
import json
def recommend_similar_items(user_preferences, desired_collection_types):
    """Return items from the wanted collections that are most similar to the user's preferences.

    Each preference is embedded, the embeddings are averaged component-wise,
    and the averaged vector is used to query the Pinecone index.

    Args:
        user_preferences: Iterable of preference texts.
        desired_collection_types: Iterable of ``collection_type`` metadata
            values to keep (e.g. ``["data_wisata"]``).

    Returns:
        A list of dicts, one per match, holding ``firebase_id``, ``score`` and
        the fields decoded from the match's ``text`` metadata. At most 20
        entries, in the order Pinecone returned them.

    Raises:
        HTTPException: 400 when *user_preferences* is empty.
    """
    preferences = list(user_preferences)
    if not preferences:
        raise HTTPException(status_code=400, detail="User preferences are empty.")

    allowed_types = list(desired_collection_types)
    if not allowed_types:
        # Nothing can match an empty set of collections; skip the remote calls.
        return []

    # Component-wise mean of all preference embeddings.
    preference_embeddings = [create_embeddings(preference) for preference in preferences]
    user_embedding = [
        sum(values) / len(values)
        for values in zip(*preference_embeddings)
    ]

    # Filter by collection on the server: filtering only after a global top-20
    # query can leave a category with few or no results when other collections
    # dominate the nearest neighbours. Vector values are not used below, so
    # they are not requested.
    all_items_response = index.query(
        namespace="",
        vector=user_embedding,
        top_k=20,  # at most 20 items
        filter={"collection_type": {"$in": allowed_types}},
        include_values=False,
        include_metadata=True,
    )

    filtered_matches = []
    for match in all_items_response['matches']:
        metadata = match['metadata']
        # Client-side safety net; also tolerates vectors without the key.
        if metadata.get('collection_type') not in allowed_types:
            continue

        # The original document is stored as a JSON string under "text".
        try:
            fields = json.loads(metadata.get('text', '{}'))
        except (TypeError, json.JSONDecodeError):
            fields = {}
        if not isinstance(fields, dict):
            fields = {}

        # Document fields must not shadow the similarity score or the ID:
        # callers rank results by "score".
        fields = {
            key: value
            for key, value in fields.items()
            if key not in ("firebase_id", "score")
        }
        filtered_matches.append({
            "firebase_id": metadata['firebase_id'],
            "score": match['score'],
            **fields,
        })
    return filtered_matches
@app.post("/recommendation")
async def get_recommendations(user_input: UserInput):
    """Return recommendations for the wisata, umkm and berita categories.

    The user's document is read from Firestore; for each category the list
    stored under ``<category>_reference`` is used as the preference input.
    Every category yields either up to 20 items sorted by descending
    similarity score, or an ``{"error": ...}`` dict when the user has no valid
    reference list or the lookup failed.

    Firestore, OpenAI and Pinecone calls are blocking, so they are run in the
    thread pool instead of directly on the event loop.

    Raises:
        HTTPException: 404 when the user document does not exist.
    """
    # Local import so this handler does not depend on other edits to the file.
    from fastapi.concurrency import run_in_threadpool

    user_id = user_input.user_id
    categories = {
        "wisata": "data_wisata",
        "umkm": "data_umkm",
        "berita": "data_berita",
    }

    # Fetch the user's document from Firestore.
    user_doc_ref = client.collection(COLLECTION_NAME).document(user_id)
    user_doc = await run_in_threadpool(user_doc_ref.get)
    if not user_doc.exists:
        raise HTTPException(status_code=404, detail="User not found")
    user_data = user_doc.to_dict()

    recommendations = {}
    for category, collection_name in categories.items():
        # Preferences for this category.
        category_reference = user_data.get(f"{category}_reference", [])
        if not isinstance(category_reference, list) or not category_reference:
            recommendations[category] = {"error": f"No valid {category} reference found for user"}
            continue

        try:
            items = await run_in_threadpool(
                recommend_similar_items,
                user_preferences=category_reference,
                desired_collection_types=[collection_name],
            )
            # Keep up to 20 recommendations, best score first.
            recommendations[category] = sorted(items, key=lambda x: x["score"], reverse=True)[:20]
        except Exception as e:
            # Best effort per category: one failing category must not hide the others.
            recommendations[category] = {"error": str(e)}

    return recommendations
# Endpoint to initialize a new chat session
@app.post("/initialize_session")
def initialize_session(request: NewSessionRequest):
    """Start a fresh chat session for an existing user and return its ID.

    A new UUID is stored as the user's ``last_session_id`` and an empty
    document with that ID is created in the user's ``message_history``
    sub-collection.

    Raises:
        HTTPException: 404 when the user document does not exist.
    """
    user_ref = client.collection(COLLECTION_NAME).document(request.user_id)
    snapshot = user_ref.get()
    if not snapshot.exists:
        raise HTTPException(status_code=404, detail="User ID not found in Firebase.")

    # Create new session
    new_session_id = str(uuid.uuid4())
    user_ref.set({"last_session_id": new_session_id}, merge=True)
    history_doc = user_ref.collection("message_history").document(new_session_id)
    history_doc.set({})
    return {"session_id": new_session_id}
@app.post("/chat")
def chat(request: ChatRequest):
    """Answer a user prompt with the LLM, using and updating the stored chat history.

    The user's ``last_session_id`` selects the conversation; if none is stored
    a new session ID is generated and saved. The prompt and the generated
    answer are appended to the Firestore-backed history.

    Returns:
        ``{"response": <answer text>, "session_id": <session ID>}``.

    Raises:
        HTTPException: 404 when the user document does not exist.
    """
    user_doc_ref = client.collection(COLLECTION_NAME).document(request.user_id)
    user_doc = user_doc_ref.get()
    if not user_doc.exists:
        raise HTTPException(status_code=404, detail="User ID not found in Firebase.")

    # Reuse the stored session; create and persist one only when it is missing
    # (the existence of the document was already checked above, and rewriting
    # an unchanged value on every request is an unnecessary Firestore write).
    session_id = (user_doc.to_dict() or {}).get("last_session_id")
    if not session_id:
        session_id = str(uuid.uuid4())
        user_doc_ref.set({"last_session_id": session_id}, merge=True)

    # Initialize chat history and memory
    chat_history = FirestoreChatMessageHistory(
        session_id=session_id,
        collection=f"{COLLECTION_NAME}/{request.user_id}/message_history",
        client=client,
    )
    window_turns = 5
    memory = ConversationBufferWindowMemory(k=window_turns, chat_memory=chat_history)

    # Apply the window explicitly: chat_memory.messages is the complete stored
    # history, so passing it as-is ignores k. One turn is a human message plus
    # an AI message, hence 2 * k messages.
    recent_messages = list(memory.chat_memory.messages)[-2 * window_turns:]

    # Generate response
    generated_response = run_llm(query=request.prompt, chat_history=recent_messages)
    # Strip Markdown code fences from the answer.
    generated_response = generated_response.replace("```", "").strip()
    memory.save_context({"input": request.prompt}, {"output": generated_response})
    return {"response": generated_response, "session_id": session_id}
# Endpoint to get chat history for a session
@app.get("/chat_history/{user_id}/{session_id}")
def get_chat_history(user_id: str, session_id: str):
    """Return the stored messages of one chat session as role/content pairs.

    Human messages are labelled ``"user"``; every other message type is
    labelled ``"assistant"``.
    """
    history_collection = f"{COLLECTION_NAME}/{user_id}/message_history"
    stored_history = FirestoreChatMessageHistory(
        session_id=session_id,
        collection=history_collection,
        client=client,
    )

    messages = []
    for entry in stored_history.messages:
        if isinstance(entry, HumanMessage):
            role = "user"
        else:
            role = "assistant"
        messages.append({"role": role, "content": entry.content})
    return {"chat_history": messages}
# Upload a file to Google Drive
def upload_to_drive(file_path: str, folder_id: str) -> str:
    """Upload a local file into a Drive folder, make it world-readable, and return its view URL.

    Args:
        file_path: Path of the local file; its base name becomes the Drive file name.
        folder_id: ID of the Drive folder that becomes the file's parent.

    Returns:
        A ``drive.google.com/uc?export=view`` URL for the uploaded file.
    """
    drive_metadata = {
        'name': os.path.basename(file_path),
        'parents': [folder_id]
    }
    upload_body = MediaFileUpload(file_path, resumable=True)
    create_request = drive_service.files().create(
        body=drive_metadata, media_body=upload_body, fields='id'
    )
    created_file = create_request.execute()
    file_id = created_file['id']

    # Grant read access to anyone so the returned link works without sign-in.
    anyone_reader = {'type': 'anyone', 'role': 'reader'}
    drive_service.permissions().create(fileId=file_id, body=anyone_reader).execute()

    return f"https://drive.google.com/uc?export=view&id={file_id}"
# Endpoint to upload a file to Google Drive
@app.post("/upload/")
async def upload_file(file: UploadFile = File(...)):
    """Store an uploaded file on Google Drive and return its public link.

    The upload is first written to a uniquely named file in the system temp
    directory, then passed to ``upload_to_drive``. The temp file is removed
    whether or not the Drive upload succeeds.

    Returns:
        ``{"message": "File uploaded successfully", "link": <public URL>}``.

    Raises:
        HTTPException: 500 with the underlying error text when any step fails.
    """
    # Local import so this handler does not depend on other edits to the file.
    from fastapi.concurrency import run_in_threadpool

    file_path = None
    try:
        # The filename comes from the client: keep only its final path
        # component so separators cannot steer the temp path elsewhere.
        client_name = (file.filename or "").replace("\\", "/")
        safe_name = os.path.basename(client_name) or "upload"
        unique_filename = f"{uuid.uuid4()}_{safe_name}"
        file_path = os.path.join(tempfile.gettempdir(), unique_filename)

        with open(file_path, "wb") as buffer:
            buffer.write(await file.read())

        # The Drive client is blocking; keep it off the event loop.
        public_link = await run_in_threadpool(upload_to_drive, file_path, FOLDER_ID)
        return {"message": "File uploaded successfully", "link": public_link}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Clean up on failure as well, so failed uploads do not pile up in the temp dir.
        if file_path is not None and os.path.exists(file_path):
            os.remove(file_path)