Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| from fastapi import FastAPI, HTTPException, File, UploadFile | |
| from pydantic import BaseModel | |
| from firebase_admin import credentials, firestore | |
| from langchain_google_firestore import FirestoreChatMessageHistory | |
| from langchain.memory import ConversationBufferWindowMemory | |
| from langchain.schema import HumanMessage, AIMessage | |
| from dotenv import load_dotenv | |
| load_dotenv() | |
| from google.oauth2 import service_account | |
| from googleapiclient.discovery import build | |
| from googleapiclient.http import MediaFileUpload | |
| from fastapi.middleware.cors import CORSMiddleware | |
| import uuid | |
| import tempfile | |
| import firebase_admin | |
| from agent.retrive_agent import run_llm | |
| from pinecone import Pinecone | |
| from langchain_openai import OpenAIEmbeddings | |
| from datetime import datetime | |
# Load the Google service-account credentials (used for the Google Drive API)
# from the SERVICE_ACCOUNT_SECRET environment variable, which must hold the
# service-account key as a JSON string. Variables from a .env file were
# already loaded by the load_dotenv() call that runs right after the imports.
service_account_secret_string = os.getenv("SERVICE_ACCOUNT_SECRET")
if not service_account_secret_string:
    raise ValueError("Environment variable SERVICE_ACCOUNT_SECRET is not set or empty.")
try:
    service_account_secret_dict = json.loads(service_account_secret_string)
except json.JSONDecodeError as err:
    # JSONDecodeError is a ValueError subclass, so callers still see ValueError;
    # the message just names the offending variable (without echoing the secret).
    raise ValueError("Environment variable SERVICE_ACCOUNT_SECRET does not contain valid JSON.") from err

# Initialize the Google Drive API client. The destination folder can be
# overridden with DRIVE_FOLDER_ID; the previously hard-coded folder is the default.
FOLDER_ID = os.getenv("DRIVE_FOLDER_ID") or '1SUjdlLkt58srCwSffDMAoZJKpzPCWszn'
drive_creds = service_account.Credentials.from_service_account_info(service_account_secret_dict)
drive_service = build('drive', 'v3', credentials=drive_creds)

# API keys / secrets consumed further down in this module.
chatgpt_api_key = os.getenv("OPENAI_API_KEY")
pinecone_api_key = os.getenv("PINECONE_API_KEY")
firebase_credentials_json = os.getenv("FIREBASE_SECRET")
# Initialize FastAPI
app = FastAPI()


async def check():
    """Health-check handler: return a fixed string confirming the API is running.

    NOTE(review): no route decorator is visible for this handler here — confirm
    it is registered on ``app`` (e.g. as the root GET route).
    """
    return "API redi bolo !"


# Allowed CORS origins come from CORS_ALLOW_ORIGINS as a comma-separated list.
# When it is unset or empty, every origin ("*") is allowed, as before.
# NOTE(review): "*" together with allow_credentials=True effectively lets any
# website make credentialed requests; set CORS_ALLOW_ORIGINS to the real
# frontend URL(s) in production.
_cors_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "").split(",")
    if origin.strip()
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_origins or ["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Define data models (request bodies for the API handlers)
class ChatRequest(BaseModel):
    """Request body for the chat handler."""

    user_id: str  # looked up as a document ID in the data_user collection
    prompt: str  # the user's message; passed to run_llm as the query


class NewSessionRequest(BaseModel):
    """Request body for starting a new chat session."""

    user_id: str  # looked up as a document ID in the data_user collection


class UserInput(BaseModel):
    """Request body for the recommendation / embedding handlers."""

    user_id: str  # used as a Firestore document ID by the handlers that accept this model
# Firebase / Firestore configuration
PROJECT_ID = "recommendation-system-mage"
COLLECTION_NAME = "data_user"  # collection holding one document per user

# Parse the Firebase service-account JSON from FIREBASE_SECRET when it is provided.
if firebase_credentials_json:
    firebase_credentials_dict = json.loads(firebase_credentials_json)

# Initialize the default Firebase app only if this process has not done so yet.
if not firebase_admin._apps:
    if not firebase_credentials_json:
        # Fail fast with a clear message (like the SERVICE_ACCOUNT_SECRET check)
        # instead of leaving `client` undefined and failing on the first request.
        raise ValueError("Environment variable FIREBASE_SECRET is not set or empty.")
    firebase_admin.initialize_app(credentials.Certificate(firebase_credentials_dict))

# Firestore client shared by every handler in this module.
client = firestore.client()

# Pinecone index that stores the content embeddings.
pc = Pinecone(api_key=pinecone_api_key)
index_name = "mage-x-embeddings-all"
index = pc.Index(index_name)

# OpenAI embedding model used both when indexing documents and when embedding
# user preferences (via create_embeddings).
embedding_model = OpenAIEmbeddings(model="text-embedding-ada-002", openai_api_key=chatgpt_api_key)
def create_embeddings(text):
    """Embed *text* with the module-level OpenAI embedding model.

    Thin wrapper around ``embedding_model.embed_query``; returns whatever
    vector that call produces.
    """
    vector = embedding_model.embed_query(text)
    return vector
def _created_at_millis(value):
    """Convert a ``created_at`` value to a Unix timestamp in milliseconds.

    Accepts a ``datetime`` (timezone-aware, or naive and then treated as UTC)
    or an ISO-8601 string (a trailing ``Z`` is accepted). Returns ``None`` for
    ``None``, unparseable strings and any other type.
    """
    from datetime import timezone

    if isinstance(value, str):
        text = value.strip()
        if text.endswith("Z"):
            # datetime.fromisoformat only understands a "Z" suffix from Python 3.11 on.
            text = text[:-1] + "+00:00"
        try:
            value = datetime.fromisoformat(text)
        except ValueError:
            return None
    if not isinstance(value, datetime):
        return None
    if value.tzinfo is None:
        value = value.replace(tzinfo=timezone.utc)
    return int(value.timestamp() * 1000)


def fetch_and_embed_data(user_id):
    """Embed the document with ID ``user_id`` from each content collection and upsert it into Pinecone.

    Despite the parameter name, ``user_id`` is used as a *document ID*: it is
    looked up in ``data_wisata``, ``data_umkm`` and ``data_berita``. Each
    collection that has a document with that ID contributes one vector. The
    vector's Pinecone ID is the document ID, and its metadata carries the
    collection name and the document as JSON.

    Returns:
        A status message string.
    """
    collections = ["data_wisata", "data_umkm", "data_berita"]
    for collection in collections:
        doc = client.collection(collection).document(user_id).get()
        if not doc.exists:
            continue
        data = doc.to_dict()

        # Compute the timestamp from the raw value. The old code round-tripped
        # through isoformat() and then parsed with a "...Z" format that
        # isoformat() never produces, so the timestamp was always None.
        raw_created_at = data.get("created_at")
        created_at_timestamp = _created_at_millis(raw_created_at)
        if raw_created_at is not None and created_at_timestamp is None:
            print(f"Error: Format tanggal tidak valid untuk item: {doc.id}")
        if isinstance(raw_created_at, datetime):
            # Keep a JSON-friendly ISO string in the document text/metadata.
            data["created_at"] = raw_created_at.replace(tzinfo=None).isoformat()

        text = ' '.join(str(value) for value in data.values() if value is not None)
        print(text)
        embedding = create_embeddings(text)

        metadata = {
            "firebase_id": str(doc.id),
            "created_at": created_at_timestamp,  # epoch milliseconds (UTC)
            "likes_count": data.get("likes_count"),
            "location": data.get("location", ""),
            "category": data.get("category", ""),
            "collection_type": collection,
            # default=str: other Firestore-native values (e.g. timestamps) are
            # not JSON-serializable and would otherwise abort the upsert.
            "text": json.dumps(data, default=str),
        }
        # Pinecone does not accept null metadata values, so omit missing fields.
        metadata = {key: value for key, value in metadata.items() if value is not None}

        # NOTE(review): the vector ID is the bare document ID, so documents that
        # share an ID across collections overwrite each other in the index.
        index.upsert(vectors=[{
            "id": str(doc.id),
            "values": embedding,
            "metadata": metadata,
        }])
    return f"Data {user_id} berhasil di embbedings"
async def get_recommendations(user_input: UserInput):
    """Embed and index the content documents whose ID equals ``user_input.user_id``.

    Returns ``{"status": "success", "processed_documents": <message>}``, where
    the message is the status string from ``fetch_and_embed_data``.

    NOTE(review): a second ``get_recommendations`` is defined later in this
    module, so the module-level name ends up bound to that one. This handler is
    only reachable if it was registered (e.g. as a route) before being
    shadowed; consider giving it a distinct name.
    """
    from fastapi.concurrency import run_in_threadpool

    # fetch_and_embed_data performs blocking Firestore/OpenAI/Pinecone calls;
    # run it in the threadpool so the event loop keeps serving other requests.
    processed_documents = await run_in_threadpool(fetch_and_embed_data, user_input.user_id)
    return {
        "status": "success",
        "processed_documents": processed_documents,
    }
# create_embeddings is defined once, earlier in this module, and json is
# imported at the top of the file; neither needs to be repeated here.
def _mean_vector(vectors):
    """Return the element-wise mean of equally sized vectors."""
    return [sum(column) / len(column) for column in zip(*vectors)]


def _parse_matches(matches, desired_collection_types):
    """Flatten Pinecone matches from the wanted collections into result dicts.

    Each result is the JSON document stored in the match's ``text`` metadata,
    plus ``firebase_id`` and the similarity ``score``. Those two keys are
    written last so a document field that happens to be named ``score`` cannot
    replace the similarity score that callers sort by.
    """
    wanted = set(desired_collection_types)
    results = []
    for match in matches:
        metadata = match['metadata'] or {}
        if metadata.get('collection_type') not in wanted:
            continue
        document = json.loads(metadata.get('text', '{}'))
        results.append({
            **document,
            "firebase_id": metadata.get('firebase_id'),
            "score": match['score'],
        })
    return results


def recommend_similar_items(user_preferences, desired_collection_types, top_k=20):
    """Return the indexed items most similar to the user's preferences.

    Every preference is embedded, and the embeddings are averaged into one
    query vector. Pinecone is then queried for the ``top_k`` nearest items
    whose ``collection_type`` is in *desired_collection_types*. The filter is
    applied inside Pinecone, so a category is not starved by matches from other
    collections.

    Args:
        user_preferences: Iterable of preference texts to embed.
        desired_collection_types: Collection names (e.g. ``"data_wisata"``) to keep.
        top_k: Maximum number of matches to request from Pinecone.

    Returns:
        A list of dicts (document fields + ``firebase_id`` + ``score``) in the
        order Pinecone returned them.

    Raises:
        HTTPException: 400 if *user_preferences* is empty.
    """
    preferences = list(user_preferences)
    if not preferences:
        raise HTTPException(status_code=400, detail="User preferences are empty.")
    collection_types = list(desired_collection_types)
    if not collection_types:
        return []

    user_embedding = _mean_vector([create_embeddings(preference) for preference in preferences])

    response = index.query(
        namespace="",
        vector=user_embedding,
        top_k=top_k,
        include_values=False,  # the stored vectors themselves are never used
        include_metadata=True,
        filter={"collection_type": {"$in": collection_types}},
    )
    # Also filter client-side, so results stay correct even if the server-side
    # filter is ignored.
    return _parse_matches(response['matches'], collection_types)
async def get_recommendations(user_input: UserInput):
    """Return recommendations for the wisata, umkm and berita categories based on the user's preferences.

    For each category, the ``<category>_reference`` list stored on the user's
    ``data_user`` document is matched against the vector index. Each category
    maps to up to 20 results sorted by descending ``score``. A category whose
    reference list is missing/empty, or whose lookup fails, maps to an
    ``{"error": ...}`` dict instead.

    Raises:
        HTTPException: 404 if the user document does not exist.
    """
    from fastapi.concurrency import run_in_threadpool

    def _build_recommendations():
        # All work here is blocking Firestore/OpenAI/Pinecone I/O.
        categories = {
            "wisata": "data_wisata",
            "umkm": "data_umkm",
            "berita": "data_berita",
        }
        user_doc = client.collection(COLLECTION_NAME).document(user_input.user_id).get()
        if not user_doc.exists:
            raise HTTPException(status_code=404, detail="User not found")
        user_data = user_doc.to_dict() or {}

        recommendations = {}
        for category, collection_name in categories.items():
            category_reference = user_data.get(f"{category}_reference", [])
            if not isinstance(category_reference, list) or not category_reference:
                recommendations[category] = {"error": f"No valid {category} reference found for user"}
                continue
            try:
                items = recommend_similar_items(
                    user_preferences=category_reference,
                    desired_collection_types=[collection_name],
                )
                recommendations[category] = sorted(items, key=lambda x: x["score"], reverse=True)[:20]
            except Exception as e:
                # Best effort per category: one failing category must not hide the others.
                recommendations[category] = {"error": str(e)}
        return recommendations

    # Run the blocking work in the threadpool so the event loop is not stalled;
    # an HTTPException raised inside propagates through the await unchanged.
    return await run_in_threadpool(_build_recommendations)
# Handler that starts a new chat session for an existing user
def initialize_session(request: NewSessionRequest):
    """Start a fresh chat session for the user identified by ``request.user_id``.

    Generates a new UUID session id and stores it as ``last_session_id`` on
    the user's ``data_user`` document (merge, so other fields are untouched).
    It then creates an empty document with that id in the user's
    ``message_history`` subcollection.

    Returns:
        ``{"session_id": <new session id>}``.

    Raises:
        HTTPException: 404 if the user document does not exist.
    """
    user_ref = client.collection(COLLECTION_NAME).document(request.user_id)
    if not user_ref.get().exists:
        raise HTTPException(status_code=404, detail="User ID not found in Firebase.")

    new_session_id = str(uuid.uuid4())
    user_ref.set({"last_session_id": new_session_id}, merge=True)
    history_doc = user_ref.collection("message_history").document(new_session_id)
    history_doc.set({})
    return {"session_id": new_session_id}
def chat(request: ChatRequest):
    """Answer ``request.prompt`` with the LLM inside the user's current chat session.

    Steps:
    - Reuse the user's ``last_session_id``; if none is stored, create a new
      UUID session id and save it.
    - Pass the last few exchanges of that session to ``run_llm`` as context.
    - Remove every ``` sequence from the answer.
    - Append both the prompt and the answer to the session's Firestore history.

    Returns:
        ``{"response": <answer>, "session_id": <session id used>}``.

    Raises:
        HTTPException: 404 if the user document does not exist.
    """
    # Number of past prompt/answer exchanges handed to the LLM as context.
    history_window = 5

    user_doc_ref = client.collection(COLLECTION_NAME).document(request.user_id)
    user_doc = user_doc_ref.get()
    if not user_doc.exists:
        raise HTTPException(status_code=404, detail="User ID not found in Firebase.")

    # A missing or empty last_session_id starts a new session; only then is
    # the user document written.
    session_id = (user_doc.to_dict() or {}).get("last_session_id")
    if not session_id:
        session_id = str(uuid.uuid4())
        user_doc_ref.set({"last_session_id": session_id}, merge=True)

    chat_history = FirestoreChatMessageHistory(
        session_id=session_id,
        collection=f"{COLLECTION_NAME}/{request.user_id}/message_history",
        client=client,
    )
    memory = ConversationBufferWindowMemory(k=history_window, chat_memory=chat_history)

    # Reading memory.chat_memory.messages directly returns the *entire* stored
    # history and bypasses the window. Keep only the last k exchanges
    # (two messages each), as the window memory intends.
    recent_messages = memory.chat_memory.messages[-2 * history_window:]
    generated_response = run_llm(query=request.prompt, chat_history=recent_messages)
    generated_response = generated_response.replace("```", "").strip()

    memory.save_context({"input": request.prompt}, {"output": generated_response})
    return {"response": generated_response, "session_id": session_id}
# Handler that returns the stored chat history of one session
def get_chat_history(user_id: str, session_id: str):
    """Return the messages stored for *session_id* under the given user.

    Human messages are labelled ``"user"``; every other message type is
    labelled ``"assistant"``.

    Returns:
        ``{"chat_history": [{"role": ..., "content": ...}, ...]}`` in stored order.
    """
    history = FirestoreChatMessageHistory(
        session_id=session_id,
        collection=f"{COLLECTION_NAME}/{user_id}/message_history",
        client=client,
    )
    transcript = []
    for message in history.messages:
        if isinstance(message, HumanMessage):
            role = "user"
        else:
            role = "assistant"
        transcript.append({"role": role, "content": message.content})
    return {"chat_history": transcript}
# Upload a file to Google Drive
def upload_to_drive(file_path: str, folder_id: str) -> str:
    """Upload a local file into a Google Drive folder and make it publicly readable.

    Args:
        file_path: Local file to upload; its basename becomes the Drive file name.
        folder_id: ID of the Drive folder the file is created in.

    Returns:
        A ``https://drive.google.com/uc?export=view&id=...`` link to the file.
    """
    file_metadata = {
        'name': os.path.basename(file_path),
        'parents': [folder_id],
    }
    media = MediaFileUpload(file_path, resumable=True)
    try:
        uploaded_file = drive_service.files().create(
            body=file_metadata, media_body=media, fields='id'
        ).execute()
    finally:
        # MediaFileUpload opens file_path itself and otherwise only closes the
        # handle when the object is garbage-collected. Close it now so the
        # caller can delete the file immediately (required on Windows).
        media.stream().close()

    file_id = uploaded_file['id']
    # Grant read access to anyone with the link.
    drive_service.permissions().create(
        fileId=file_id, body={'type': 'anyone', 'role': 'reader'}
    ).execute()
    return f"https://drive.google.com/uc?export=view&id={file_id}"
# Handler that uploads a file to Google Drive
async def upload_file(file: UploadFile = File(...)):
    """Receive an uploaded file, store it in the Drive folder FOLDER_ID and return its public link.

    The upload is staged in a uniquely named temporary file. That file is
    removed afterwards whether or not the Drive upload succeeds.

    Returns:
        ``{"message": "File uploaded successfully", "link": <public link>}``.

    Raises:
        HTTPException: 500 carrying the underlying error message on any failure.
    """
    from fastapi.concurrency import run_in_threadpool

    # The client-supplied filename is untrusted: keep only its last path
    # component so it cannot inject directory separators into the temp path.
    client_name = os.path.basename((file.filename or "").replace("\\", "/")) or "upload"
    unique_filename = f"{uuid.uuid4()}_{client_name}"
    file_path = os.path.join(tempfile.gettempdir(), unique_filename)

    def _store_and_upload(contents):
        # Blocking disk and Drive I/O; executed in the threadpool.
        with open(file_path, "wb") as buffer:
            buffer.write(contents)
        return upload_to_drive(file_path, FOLDER_ID)

    try:
        contents = await file.read()
        public_link = await run_in_threadpool(_store_and_upload, contents)
        return {"message": "File uploaded successfully", "link": public_link}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e
    finally:
        # Always clean up the temp copy; it may not exist if reading the upload failed.
        try:
            os.remove(file_path)
        except FileNotFoundError:
            pass