|
|
import os

from dotenv import load_dotenv
import gradio as gr

# Load API keys from a local .env file into the process environment.
load_dotenv()

COHERE_API_KEY = os.getenv("COHERE_API_KEY")
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY")

# Fail fast at startup, naming exactly which key(s) are absent instead of
# the previous ambiguous "COHERE_API_KEY or GEMINI_API_KEY is missing".
_missing_keys = [
    name
    for name, value in (
        ("COHERE_API_KEY", COHERE_API_KEY),
        ("GEMINI_API_KEY", GEMINI_API_KEY),
    )
    if not value
]
if _missing_keys:
    raise ValueError(f"Missing environment variable(s): {', '.join(_missing_keys)}")
|
|
|
|
|
|
|
|
import cohere |
|
|
import chromadb |
|
|
from google import genai |
|
|
from google.genai import types |
|
|
|
|
|
|
|
|
# Cohere client: produces embeddings for both documents and queries.
co = cohere.Client(COHERE_API_KEY)

# Google GenAI client: generates the final answer with a Gemini model.
genai_client = genai.Client(api_key=GEMINI_API_KEY)

# In-memory (non-persistent) ChromaDB instance — data lives only for this process.
client = chromadb.Client()

# embedding_function=None because we supply precomputed Cohere embeddings ourselves.
collection = client.get_or_create_collection(name="inha-well", embedding_function=None)

# Number of stored documents; the hasattr guard presumably covers chromadb
# versions without Collection.count() — TODO confirm it is still needed.
total_docs = collection.count() if hasattr(collection, 'count') else len(collection.get()['documents'])
|
|
|
|
|
# One-time ingestion: the in-memory collection starts empty on every process
# start, so build the index only when no documents are stored yet.
if total_docs == 0:
    content_chunks = []
    # Source folders are docs/p00001 ... docs/p00003 under the working directory.
    for i in range(1, 4):
        folder_path = os.path.join(os.getcwd(), "docs", f"p0000{i}")

        if not os.path.exists(folder_path):
            print(f"Warning: Folder {folder_path} not found")
            continue

        # sorted() keeps the chunk order (and therefore the ids assigned
        # below) deterministic across runs; os.listdir order is
        # filesystem-dependent.
        for filename in sorted(os.listdir(folder_path)):
            if filename.endswith(".txt"):
                # Explicit UTF-8: the locale-dependent default encoding can
                # fail on systems whose default is not UTF-8 (e.g. cp949).
                with open(os.path.join(folder_path, filename), "r", encoding="utf-8") as f:
                    content = f.read()
                # Each chunk is tagged with a "search_document: " prefix.
                content_chunks.append(f"search_document: {content}")

    if content_chunks:
        # Embed every chunk in a single batched Cohere call.
        response = co.embed(
            texts=content_chunks,
            model="embed-english-v3.0",
            input_type="search_document"
        )
        embeddings = response.embeddings

        # Store documents with sequential string ids ("0", "1", ...).
        collection.add(
            ids=[str(idx) for idx in range(len(content_chunks))],
            documents=content_chunks,
            embeddings=embeddings
        )
|
|
|
|
|
def retrieve_context(question, collection, top_k=2):
    """Return the top_k most relevant stored documents, joined by newlines.

    The question is embedded with Cohere's search_query input type and the
    resulting vector is used to query the ChromaDB collection.
    """
    query_response = co.embed(
        texts=[question],
        model="embed-english-v3.0",
        input_type="search_query"
    )
    query_vector = query_response.embeddings[0]
    hits = collection.query(query_embeddings=[query_vector], n_results=top_k)
    matched_docs = hits["documents"][0]
    return "\n".join(matched_docs)
|
|
|
|
|
def get_prompt_plain(context: str, question: str) -> str:
    """Build the instruction prompt sent to the Gemini model.

    Wraps persona instructions, formatting rules, small-talk examples, the
    retrieved *context*, and the user's *question* between <<START>>/<<END>>
    markers (the same markers are used as stop sequences at generation time).
    The string is returned verbatim; no truncation or escaping is applied.
    """
    return f"""


<<START>>


You are a responsible person for answering Inha University (South Korea) information. Using the context below, answer within 300 tokens.


Create interactive, well-structured answers using bullet points, bold text, and proper formatting to make the information concise, answer-oriented, clear and easy to read.


Do not repeat the prompt text in your output.


And when context doesn't provide what user hasn't asked, don't mention it. Instead, just say in polite way you don't know it


And in context text, there always will be link where this info is taken. at the end of your response, say that user can visit this link for official information and provide link when it is valid real question








And when user asks non-question things, for example saying just "Hello or Hi" or write any unpredicted letters or numbers or any non question phrases, sentences, don't provide link, again don't provide link.


examples:


User: Hello


You(Assistant): Hi, how can i help you? what do you wanna know about Inha SGCS?


or


User: 32e32x23e


You(Assistant): Sorry, if you write clear questions, I would help you find specific answers


Context:


"{context}"




Question: {question}




Answer:


<<END>>"""
|
|
|
|
|
def generate_agent_answer(context: str, question: str) -> str:
    """Generate an answer with Gemini from the retrieved context and question.

    Uses a near-zero temperature for deterministic output and stops generation
    at the prompt's <<START>>/<<END>> markers.
    """
    generation_config = types.GenerateContentConfig(
        temperature=0.01,
        top_p=0.8,
        stop_sequences=["<<END>>", "<<START>>"]
    )
    reply = genai_client.models.generate_content(
        model="gemini-2.5-flash-lite",
        contents=get_prompt_plain(context, question),
        config=generation_config
    )
    return reply.text.strip()
|
|
|
|
|
def rag_answer(question: str, collection) -> str:
    """Full RAG pipeline: retrieve the two best context chunks, then generate."""
    retrieved = retrieve_context(question, collection, top_k=2)
    answer = generate_agent_answer(retrieved, question)
    return answer
|
|
|
|
|
from datasets import Dataset, load_dataset |
|
|
from huggingface_hub import HfApi |
|
|
from datetime import datetime |
|
|
import pandas as pd |
|
|
import uuid |
|
|
import os |
|
|
|
|
|
|
|
|
|
|
|
def answer_question(question):
    """Gradio callback: route *question* through the RAG pipeline.

    Returns the generated answer, a prompt to type something when the input
    is empty, or an error message if any backend call fails.
    """
    # Guard None as well: Gradio can pass None for an untouched textbox, and
    # the previous `question.strip()` would raise AttributeError before the
    # try block below could catch it.
    if not question or not question.strip():
        return "Please enter a question about Inha University."

    try:
        answer = rag_answer(question, collection)
        return answer
    except Exception as e:
        # Broad catch is deliberate: this is the UI boundary, so surface the
        # failure to the user instead of crashing the app.
        return f"Sorry, I encountered an error: {str(e)}"
|
|
|
|
|
|
|
|
|
|
|
# Gradio front-end wiring the question box to the RAG pipeline.
# NOTE(review): several original strings contained mojibake (broken emoji /
# ellipsis bytes) and typos; restored to clean readable text.
demo = gr.Interface(
    fn=answer_question,
    inputs=gr.Textbox(
        label="Ask me anything about Inha University SGCS…",
        placeholder="e.g. How many Major Required credits should I take for graduation?",
        lines=2
    ),
    outputs=gr.Markdown(
        label="Answer",
        show_copy_button=True
    ),
    title="Inha University SGCS Info Assistant",
    description="Get answers to your questions about Inha University SGCS.",
    theme=gr.themes.Soft(),
    examples=[
        # was "3nd semester"
        ["What classes should I normally take as 3rd semester ISE student?"],
        ["Tell me about student organizations and activities"],
        # was "recieve"
        ["What percentage scholarship could I receive with IELTS 7.0"]
    ]
)
|
|
|
|
|
|
|
|
|
|
|
# Launch the Gradio app only when executed as a script (not on import).
if __name__ == "__main__":
    # share=True publishes a temporary public gradio.live URL in addition to
    # the local server, and server_name="0.0.0.0" binds on all interfaces —
    # NOTE(review): confirm this public exposure is intended for deployment.
    demo.launch(
        share=True,
        server_name="0.0.0.0",

    )