# Evaluator-core / app.py
# jayeshdiro
# Initial commit
# facefda
import os
import json
import logging
from dotenv import load_dotenv
from PyPDF2 import PdfReader
from pptx import Presentation
from langchain.text_splitter import CharacterTextSplitter
from goose3 import Goose
import streamlit as st
import whisper
from pytube import YouTube
from moviepy import VideoFileClip
import time
from langchain_community.vectorstores import Milvus
from pymilvus import Collection, connections, utility
from huggingface_hub import InferenceClient
from prompts import build_evaluation_prompt
# --- Model identifiers -------------------------------------------------------
# Model name passed to InferenceClient.feature_extraction for embeddings.
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
# Model name passed to InferenceClient.chat.completions.create for evaluation.
CHAT_MODEL = "deepseek-ai/DeepSeek-V3.2:novita"
# Connection settings shared by pymilvus and the LangChain Milvus wrapper.
MILVUS_CONFIG = {"host": "localhost", "port": "19530"}
# --- Chunking ----------------------------------------------------------------
# Per-source-type chunk_size values handed to CharacterTextSplitter
# (presumably measured in characters, the splitter's default — confirm).
DOCUMENT_CHUNK_SIZE = 1000
PDF_CHUNK_SIZE = 2500
PPTX_CHUNK_SIZE = 1800
CODE_CHUNK_SIZE = 1200
URL_CHUNK_SIZE = 1500
VIDEO_CHUNK_SIZE = 1000
# chunk_overlap used for every source type.
CHUNK_OVERLAP = 150
# File extensions accepted by the "Upload code files" uploader.
CODE_FILE_TYPES = [
"py", "js", "ts", "jsx", "tsx", "java", "c", "cpp", "cs", "go", "rs",
"php", "rb", "html", "css", "scss", "json", "yaml", "yml", "toml",
"ini", "sh", "sql", "xml"
]
# --- Start-up side effects (order matters) -----------------------------------
# Load .env first so the os.getenv call below can see variables defined there.
load_dotenv()
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(message)s"
)
# Opens the default pymilvus connection at import time; the `utility` and
# `Collection` calls further down rely on this "default" alias.
connections.connect(alias="default", **MILVUS_CONFIG)
# API key handed to every InferenceClient; None when HF_TOKEN is not set.
HF_TOKEN = os.getenv("HF_TOKEN")
def get_embeddings():
    """Build an embedding adapter backed by the Hugging Face Inference API.

    Returns:
        An object exposing ``embed_documents(texts)`` and
        ``embed_query(text)``, the two methods the LangChain ``Milvus``
        vector store calls on its embedding function.

    Raises:
        ValueError: raised by the adapter methods when the API answers with
            a dict payload instead of embedding vectors.
    """
    client = InferenceClient(api_key=HF_TOKEN)

    def _embed(payload):
        """Embed a string or list of strings and return plain Python lists."""
        result = client.feature_extraction(payload, model=EMBEDDING_MODEL)
        if isinstance(result, dict):
            raise ValueError(f"Embedding API error: {result}")
        # feature_extraction hands back a numpy array; the vector store
        # interface is specified in terms of lists of floats, so convert
        # when the result supports it and pass anything else through.
        return result.tolist() if hasattr(result, "tolist") else result

    class EmbeddingAdapter:
        """Minimal duck-typed stand-in for a LangChain embeddings object."""

        @staticmethod
        def embed_documents(texts):
            return _embed(texts)

        @staticmethod
        def embed_query(text):
            return _embed(text)

    return EmbeddingAdapter()
def run_llm(prompt):
    """Send ``prompt`` to the chat model and return the reply text.

    The request carries a fixed system message followed by ``prompt`` as the
    user message; the content of the first returned choice is handed back.
    """
    hf_client = InferenceClient(api_key=HF_TOKEN)
    system_message = {
        "role": "system",
        "content": "Answer only from the given context. Be concise and accurate.",
    }
    user_message = {"role": "user", "content": prompt}
    response = hf_client.chat.completions.create(
        model=CHAT_MODEL,
        messages=[system_message, user_message],
    )
    first_choice = response.choices[0]
    return first_choice.message.content
def login():
    """Render the login form and store the normalized user id in the session.

    The user id is trimmed and lower-cased, then written to
    ``st.session_state["user_id"]``. Because the id is later embedded in the
    Milvus collection name (``multigpt_<user_id>``), it is validated here:
    blank names and names containing anything other than letters, digits and
    underscores are rejected with an error instead of failing at upload time.
    """
    import re  # local import: only this function needs it

    st.title("🔐 Login")
    user = st.text_input("Enter username")
    if not st.button("Login"):
        return

    # Normalize BEFORE the emptiness check so "   " is not accepted.
    user_id = (user or "").strip().lower()
    if not user_id:
        st.error("Enter username")
        return
    # Milvus collection names accept only letters, digits and underscores.
    if not re.fullmatch(r"[a-z0-9_]+", user_id):
        st.error("Username may only contain letters, numbers, and underscores")
        return

    st.session_state["user_id"] = user_id
    logging.info(f"Logged in as {st.session_state['user_id']}")
    st.success(f"Logged in as {user}")
    st.rerun()
def build_chunks(texts, metadatas, chunk_size):
    """Split ``texts`` into chunks and return ``(chunk_texts, chunk_metadatas)``.

    Each text is split on newlines with ``CharacterTextSplitter`` using
    ``chunk_size`` and the module-wide ``CHUNK_OVERLAP``; every resulting
    chunk carries the metadata of the text it came from. Empty input yields
    two empty lists.
    """
    if texts:
        splitter = CharacterTextSplitter(
            separator="\n",
            chunk_size=chunk_size,
            chunk_overlap=CHUNK_OVERLAP,
        )
        pieces = splitter.create_documents(texts, metadatas)
        contents = []
        piece_metadata = []
        for piece in pieces:
            contents.append(piece.page_content)
        for piece in pieces:
            piece_metadata.append(piece.metadata)
        return contents, piece_metadata
    return [], []
def save_source_texts(user_id, source_type, source_name, texts, locators, chunk_size):
    """Chunk ``texts`` and store them in the user's Milvus collection.

    One metadata record (``source_type``, ``source_name``, ``locator``) is
    built per locator and passed to ``build_chunks`` together with ``texts``.
    When no chunks come back a warning is shown and nothing is stored;
    otherwise the chunks are embedded and written to the collection
    ``multigpt_<user_id>``. Progress is reported through the global
    ``process`` placeholder and the log.
    """
    source_metadata = []
    for locator in locators:
        source_metadata.append(
            {
                "source_type": source_type,
                "source_name": source_name,
                "locator": locator,
            }
        )

    chunks, chunk_metadata = build_chunks(texts, source_metadata, chunk_size)
    if not chunks:
        st.warning("No readable content was extracted from this source.")
        return

    chunk_count = len(chunks)
    process.success("Chunking done")
    logging.info(
        f"Chunking complete for {source_type} source '{source_name}' with {chunk_count} chunks"
    )

    collection_name = f"multigpt_{user_id}"
    logging.info(f"Storing {chunk_count} chunks in collection '{collection_name}'")
    Milvus.from_texts(
        chunks,
        metadatas=chunk_metadata,
        embedding=get_embeddings(),
        collection_name=collection_name,
        connection_args=MILVUS_CONFIG,
    )
    logging.info("Upload completed successfully")
    process.success("Uploaded")
def ingest_text_document(file):
    """Decode an uploaded text file as UTF-8 and store it for the current user.

    Undecodable bytes are ignored. The whole file is stored as a single text
    with an empty locator.
    """
    user_id = st.session_state["user_id"]
    logging.info(f"Reading text file '{file.name}'")
    raw_bytes = file.read()
    content = raw_bytes.decode("utf-8", errors="ignore")
    save_source_texts(
        user_id,
        "text",
        file.name,
        [content],
        [""],
        DOCUMENT_CHUNK_SIZE,
    )
def ingest_pdf_document(file):
    """Extract text page by page from an uploaded PDF and store it.

    Pages whose extracted text is missing or whitespace-only are skipped.
    Each kept page is stored with the locator ``page=<1-based page number>``.
    """
    user_id = st.session_state["user_id"]
    logging.info(f"Reading PDF '{file.name}'")
    pdf = PdfReader(file)

    page_texts, page_locators = [], []
    for number, page in enumerate(pdf.pages, start=1):
        extracted = page.extract_text() or ""
        if not extracted.strip():
            continue
        page_texts.append(extracted)
        page_locators.append(f"page={number}")

    save_source_texts(
        user_id, "pdf", file.name, page_texts, page_locators, PDF_CHUNK_SIZE
    )
def ingest_pptx_document(file):
    """Extract text slide by slide from an uploaded PPTX and store it.

    For every slide the non-empty text of each shape is stripped and joined
    with newlines. Slides without any text are skipped; kept slides are
    stored with the locator ``slide=<1-based slide number>``.
    """
    user_id = st.session_state["user_id"]
    logging.info(f"Reading PPTX '{file.name}'")
    deck = Presentation(file)

    slide_texts, slide_locators = [], []
    for number, slide in enumerate(deck.slides, start=1):
        fragments = [
            shape.text.strip()
            for shape in slide.shapes
            if hasattr(shape, "text") and shape.text
        ]
        combined = "\n".join(fragment for fragment in fragments if fragment)
        if not combined:
            continue
        slide_texts.append(combined)
        slide_locators.append(f"slide={number}")

    save_source_texts(
        user_id, "pptx", file.name, slide_texts, slide_locators, PPTX_CHUNK_SIZE
    )
def ingest_code_files(files):
    """Store every uploaded code file as its own ``code`` source.

    Each file is decoded as UTF-8 (undecodable bytes ignored) and saved with
    its file name as both the source name and the locator.
    """
    user_id = st.session_state["user_id"]
    for upload in files:
        filename = upload.name
        logging.info(f"Reading code file '{filename}'")
        source_code = upload.read().decode("utf-8", errors="ignore")
        save_source_texts(
            user_id,
            "code",
            filename,
            [source_code],
            [filename],
            CODE_CHUNK_SIZE,
        )
def ingest_url(url):
    """Fetch ``url``, extract its main article text and store it.

    The page is fetched and cleaned with Goose; the resulting text is saved
    as a single ``url`` source whose name and locator are the URL itself.
    """
    user_id = st.session_state["user_id"]
    logging.info(f"Fetching URL '{url}'")
    # Use Goose as a context manager so the resources it holds are released
    # once extraction is done, even when the fetch fails.
    with Goose() as extractor:
        text = extractor.extract(url=url).cleaned_text
    save_source_texts(user_id, "url", url, [text], [url], URL_CHUNK_SIZE)
def ingest_youtube_video(link):
    """Download a YouTube video, transcribe its audio and store the transcript.

    Steps: download the highest-resolution stream, extract the audio track,
    transcribe it with the Whisper ``base`` model, then save the transcript
    as a single ``video`` source whose name and locator are ``link``.

    All intermediate files live in a private temporary directory, so
    concurrent sessions cannot overwrite each other's ``video.mp4`` /
    ``audio.mp3`` and nothing is left behind afterwards. If the link has no
    downloadable stream or the video has no audio track, an error is shown
    and nothing is stored.
    """
    import tempfile  # local import: only this function needs it

    user_id = st.session_state["user_id"]
    logging.info(f"Starting video ingestion for '{link}'")

    stream = YouTube(link).streams.get_highest_resolution()
    if stream is None:
        st.error("No downloadable video stream was found for this link.")
        return

    with tempfile.TemporaryDirectory() as workdir:
        process.warning("Downloading video")
        # download() blocks until the file is written and returns its path,
        # so no polling for the file's existence is needed.
        video_path = stream.download(output_path=workdir, filename="video.mp4")
        logging.info("Video download completed")

        audio_path = os.path.join(workdir, "audio.mp3")
        process.warning("Extracting audio")
        logging.info("Extracting audio from video")
        video = VideoFileClip(video_path)
        try:
            audio = video.audio
            if audio is None:
                st.error("This video has no audio track to transcribe.")
                return
            audio.write_audiofile(audio_path)
        finally:
            # Release the clip's file handles before the directory is removed.
            video.close()

        process.warning("Transcribing")
        logging.info("Running Whisper transcription")
        model = whisper.load_model("base")
        result = model.transcribe(audio_path)

    save_source_texts(user_id, "video", link, [result["text"]], [link], VIDEO_CHUNK_SIZE)
def get_vector_store(collection_name):
    """Return a LangChain ``Milvus`` store bound to ``collection_name``.

    The store uses the Hugging Face embedding adapter and the shared Milvus
    connection settings.
    """
    store_options = {
        "embedding_function": get_embeddings(),
        "collection_name": collection_name,
        "connection_args": MILVUS_CONFIG,
    }
    return Milvus(**store_options)
def collection_has_data(collection_name):
    """Return True when the collection exists and holds at least one entity."""
    if utility.has_collection(collection_name):
        store = get_vector_store(collection_name)
        entity_count = store.col.num_entities
        return entity_count > 0
    return False
def get_source_inventory(collection_name):
    """Summarize the stored chunks of a collection, grouped by source.

    Returns a list of dicts with ``source_type``, ``source_name``, ``chunks``
    (number of stored chunks) and ``locators`` (sorted distinct non-empty
    locators), ordered by source type and then source name. A missing
    collection yields an empty list.
    """
    if not utility.has_collection(collection_name):
        return []

    milvus_collection = Collection(collection_name)
    milvus_collection.load()
    records = milvus_collection.query(
        expr="pk >= 0",
        output_fields=["source_type", "source_name", "locator"],
    )

    # Group rows by (type, name), counting chunks and collecting locators.
    grouped = {}
    for record in records:
        source_type = record.get("source_type", "unknown")
        source_name = record.get("source_name", "unknown")
        entry = grouped.setdefault(
            (source_type, source_name),
            {
                "source_type": source_type,
                "source_name": source_name,
                "chunks": 0,
                "locators": set(),
            },
        )
        entry["chunks"] += 1
        locator = record.get("locator")
        if locator:
            entry["locators"].add(locator)

    inventory = [
        {
            "source_type": entry["source_type"],
            "source_name": entry["source_name"],
            "chunks": entry["chunks"],
            "locators": sorted(entry["locators"]),
        }
        for entry in grouped.values()
    ]
    inventory.sort(key=lambda entry: (entry["source_type"], entry["source_name"]))
    return inventory
def render_evidence_inventory():
    """Show a table of the current user's indexed sources.

    Displays an informational message instead of the table when the user's
    collection does not exist or contains no source records.
    """
    user_id = st.session_state["user_id"]
    collection_name = f"multigpt_{user_id}"
    st.subheader("Evidence Inventory")

    collection_exists = utility.has_collection(collection_name)
    if not collection_exists:
        logging.info(f"No collection found yet for '{collection_name}'")
        st.info("No project data has been uploaded for this user yet.")
        return

    inventory = get_source_inventory(collection_name)
    total_chunks = 0
    for entry in inventory:
        total_chunks += entry["chunks"]
    logging.info(
        f"Loaded inventory for '{collection_name}' with {len(inventory)} sources and {total_chunks} chunks"
    )
    st.caption(f"{len(inventory)} sources indexed across {total_chunks} chunks")

    if len(inventory) == 0:
        st.info("The collection exists, but no source records were found.")
        return

    table_rows = [
        {
            "Type": entry["source_type"].upper(),
            "Source": entry["source_name"],
            "Chunks": entry["chunks"],
            "Locators": len(entry["locators"]),
        }
        for entry in inventory
    ]
    st.table(table_rows)
def format_context(documents):
    """Render retrieved documents as numbered evidence entries for the prompt.

    Args:
        documents: iterable of objects exposing ``metadata`` (dict or None)
            and ``page_content`` (str).

    Returns:
        A string with one entry per document, separated by blank lines. Each
        entry is a header line of the form
        ``[Evidence N] source_type=...; source_name=...; locator=...``
        followed by the document text. Missing metadata fields, and a missing
        or empty locator, are rendered as ``unknown``.
    """
    entries = []
    for index, doc in enumerate(documents, start=1):
        metadata = doc.metadata or {}
        source_type = metadata.get("source_type", "unknown")
        source_name = metadata.get("source_name", "unknown")
        # The header already prints the "locator=" prefix, so the fallback
        # must be the bare value; otherwise it reads "locator=locator=unknown".
        locator_text = metadata.get("locator") or "unknown"
        entries.append(
            f"[Evidence {index}] source_type={source_type}; "
            f"source_name={source_name}; locator={locator_text}\n"
            f"{doc.page_content}"
        )
    return "\n\n".join(entries)
def get_rubric_criteria():
    """Return the evaluation rubric criteria, in display order, as a new list."""
    criteria = (
        "Problem Understanding",
        "Technical Approach",
        "Implementation Quality",
        "Innovation / Originality",
        "Communication & Demo Clarity",
        "Claim vs Reality Alignment",
        "Prototype Functionality",
    )
    return list(criteria)
def parse_json_response(raw_response):
    """Parse a model reply as JSON, tolerating text around the JSON object.

    The reply is first parsed as-is. If that fails, the span from the first
    ``{`` to the last ``}`` is parsed instead, which covers replies wrapped
    in prose or Markdown code fences.

    Args:
        raw_response: the model reply (normally a ``str``).

    Returns:
        The decoded JSON value.

    Raises:
        json.JSONDecodeError: when the reply is not text (for example
            ``None``) or contains no parseable JSON. Non-text input is
            reported with this exception type so callers need a single
            ``except`` clause.
    """
    if not isinstance(raw_response, (str, bytes, bytearray)):
        raise json.JSONDecodeError("Model response is not text", "", 0)
    try:
        return json.loads(raw_response)
    except json.JSONDecodeError:
        if isinstance(raw_response, str):
            text = raw_response
        else:
            text = bytes(raw_response).decode("utf-8", errors="replace")
        start = text.find("{")
        end = text.rfind("}")
        if start != -1 and end > start:
            return json.loads(text[start:end + 1])
        # Nothing that looks like an object: surface the original error.
        raise
def _coerce_score(value):
    """Return ``value`` as an integer score clamped to 1-5, or 1 when unusable."""
    try:
        number = round(float(value))
    except (TypeError, ValueError, OverflowError):
        # Covers None, non-numeric strings, NaN and infinities.
        return 1
    return max(1, min(5, number))


def _coerce_confidence(value):
    """Return ``value`` as a float clamped to 0.0-1.0, or 0.0 when unusable."""
    try:
        number = float(value)
    except (TypeError, ValueError, OverflowError):
        return 0.0
    if number != number:  # NaN compares unequal to itself
        return 0.0
    return max(0.0, min(1.0, number))


def normalize_evaluation_response(data):
    """Coerce a parsed model reply into the fixed evaluation schema.

    Unknown top-level keys are dropped, missing or wrongly-typed sections are
    replaced with empty defaults, and ``scores`` is rebuilt so that it holds
    exactly one entry per rubric criterion, in rubric order. Criteria the
    model did not score get score 1, empty reasoning, no citations and
    confidence 0.0.

    Scores given as numbers or numeric strings (including floats such as
    ``4.0``) are rounded and clamped to 1-5; per-criterion confidences are
    clamped to 0.0-1.0. Values that cannot be interpreted fall back to the
    defaults instead of raising.

    Args:
        data: the decoded JSON reply; anything that is not a dict is treated
            as an empty reply.

    Returns:
        A dict with the keys ``project_summary``, ``sources_used``,
        ``claims_detected``, ``capabilities_detected``, ``evidence``,
        ``gaps_or_risks``, ``scores`` and ``overall_assessment``.
    """
    if not isinstance(data, dict):
        # Normalize as an empty reply so the result still has the full
        # schema, including one default score per rubric criterion.
        data = {}

    summary = data.get("project_summary")
    if not isinstance(summary, dict):
        summary = {}
    assessment = data.get("overall_assessment")
    if not isinstance(assessment, dict):
        assessment = {}

    normalized = {
        "project_summary": {
            "purpose": summary.get("purpose", ""),
            "high_level_description": summary.get("high_level_description", ""),
        }
    }
    for key in (
        "sources_used",
        "claims_detected",
        "capabilities_detected",
        "evidence",
        "gaps_or_risks",
    ):
        value = data.get(key)
        normalized[key] = value if isinstance(value, list) else []

    raw_scores = data.get("scores")
    if not isinstance(raw_scores, list):
        raw_scores = []

    score_lookup = {}
    for item in raw_scores:
        if not isinstance(item, dict):
            continue
        criterion = item.get("criterion")
        # Only non-empty string names can match a rubric criterion.
        if not criterion or not isinstance(criterion, str):
            continue
        citations = item.get("citations", [])
        score_lookup[criterion] = {
            "criterion": criterion,
            "score": _coerce_score(item.get("score", 1)),
            "reasoning": item.get("reasoning", ""),
            "citations": citations if isinstance(citations, list) else [],
            "confidence": _coerce_confidence(item.get("confidence", 0.0)),
        }

    normalized["scores"] = [
        score_lookup.get(
            criterion,
            {
                "criterion": criterion,
                "score": 1,
                "reasoning": "",
                "citations": [],
                "confidence": 0.0,
            },
        )
        for criterion in get_rubric_criteria()
    ]
    normalized["overall_assessment"] = {
        "verdict": assessment.get("verdict", ""),
        "confidence": assessment.get("confidence", "low"),
        "reason": assessment.get("reason", ""),
    }
    return normalized
def run_evaluation():
    """Evaluate the current user's uploaded evidence and display the result.

    Retrieves up to 16 evidence chunks from the user's collection with a
    fixed similarity query, builds the evaluation prompt, asks the chat
    model, and renders the normalized JSON result. Shows a warning when the
    user has no data or nothing is retrieved, and shows the raw reply when it
    cannot be parsed as JSON.
    """
    user_id = st.session_state["user_id"]
    collection_name = f"multigpt_{user_id}"
    logging.info(f"Starting evaluation for collection '{collection_name}'")
    if not collection_has_data(collection_name):
        logging.info("Evaluation skipped because no uploaded project data was found")
        st.warning("No uploaded project data found for this user yet.")
        return

    process.warning("Retrieving project evidence")
    logging.info("Retrieving project evidence from Milvus")
    db = get_vector_store(collection_name)
    documents = db.similarity_search(
        "Evaluate this software project using all available uploaded evidence. "
        "Summarize capabilities, evidence, gaps, and overall assessment.",
        k=16
    )
    if not documents:
        logging.info("Evaluation stopped because no retrievable evidence was found")
        st.warning("No retrievable evidence was found for evaluation.")
        return

    prompt = build_evaluation_prompt(format_context(documents), get_rubric_criteria())
    process.warning("Running evaluation")
    logging.info(f"Running evaluator on {len(documents)} retrieved evidence chunks")
    # The chat API may return no content at all; treat that as an empty reply
    # so it goes through the invalid-JSON path below instead of raising a
    # TypeError that the handler would not catch.
    raw_response = run_llm(prompt) or ""
    try:
        parsed_response = normalize_evaluation_response(parse_json_response(raw_response))
    except json.JSONDecodeError:
        logging.info("Model response was not valid JSON")
        st.error("The model response was not valid JSON.")
        st.code(raw_response, language="json")
        return

    logging.info("Evaluation completed successfully")
    process.success("Evaluation ready")
    st.json(parsed_response)
def add_evidence_page():
    """Render the "Add Evidence" page and ingest whatever the user supplies.

    The sidebar selects the evidence type (document, code, URL or video) and
    the matching input widget is shown. Streamlit re-runs this function on
    every interaction while the widgets keep their values, so each source is
    ingested at most once per session: already-processed uploads, URLs and
    links are remembered in ``st.session_state["ingested_sources"]`` and
    skipped on later runs. A source is only remembered after its ingestion
    call returns, so a failed ingestion can be retried.
    """
    placeholder.title("Add Evidence")
    choice = st.sidebar.radio("Evidence Type", ['', 'DOCUMENT', 'CODE', 'URL', 'VIDEO'])

    if "ingested_sources" not in st.session_state:
        st.session_state["ingested_sources"] = set()
    ingested = st.session_state["ingested_sources"]

    def _ingest_once(key, ingest, source):
        """Run ``ingest(source)`` unless ``key`` was already processed."""
        if key in ingested:
            st.info("This source has already been ingested in this session.")
            return
        ingest(source)
        ingested.add(key)

    def _file_key(kind, upload):
        """Identify an upload by evidence kind, file name and size."""
        return (kind, upload.name, getattr(upload, "size", None))

    if choice == 'DOCUMENT':
        st.caption("Upload decks, notes, specs, or README-style documents.")
        file = st.file_uploader("Upload document", type=["txt", "md", "pdf", "pptx"])
        if file:
            extension = os.path.splitext(file.name)[1].lower()
            handlers = {
                ".txt": ingest_text_document,
                ".md": ingest_text_document,
                ".pdf": ingest_pdf_document,
                ".pptx": ingest_pptx_document,
            }
            handler = handlers.get(extension)
            if handler is None:
                st.error("Unsupported document type.")
            else:
                _ingest_once(_file_key("DOCUMENT", file), handler, file)
    elif choice == 'CODE':
        st.caption("Upload source or configuration files that represent the implementation.")
        files = st.file_uploader(
            "Upload code files",
            type=CODE_FILE_TYPES,
            accept_multiple_files=True
        )
        # Handle files one by one so that adding a file to the uploader does
        # not re-ingest the files that were already stored.
        for file in files or []:
            _ingest_once(_file_key("CODE", file), ingest_code_files, [file])
    elif choice == 'URL':
        st.caption("Add a product page, documentation page, or prototype URL.")
        url = st.text_input("Enter URL")
        if url:
            _ingest_once(("URL", url), ingest_url, url)
    elif choice == 'VIDEO':
        st.caption("Add a YouTube demo or walkthrough link.")
        link = st.text_input("YouTube link")
        if link:
            _ingest_once(("VIDEO", link), ingest_youtube_video, link)
def evaluate_page():
    """Render the "Run Evaluation" page.

    Shows the evidence inventory and starts an evaluation when the button is
    pressed.
    """
    placeholder.title("Run Evaluation")
    st.write("Generate a structured evaluation using all uploaded evidence for this submission.")
    render_evidence_inventory()
    evaluate_clicked = st.button("Run Evaluation")
    if evaluate_clicked:
        run_evaluation()
def main():
    """Application entry point: set up placeholders and route to a page.

    Creates the global ``placeholder`` (page title) and ``process`` (status
    line) slots used by the page functions, shows the login form until a
    user id is present in the session, and otherwise dispatches on the
    sidebar navigation. Choosing "Logout" clears the session and re-runs.
    """
    global placeholder, process
    placeholder, process = st.empty(), st.empty()

    logged_in = "user_id" in st.session_state
    if not logged_in:
        login()
        return

    st.sidebar.write(f"👤 {st.session_state['user_id']}")
    page = st.sidebar.radio("Navigate", ['Add Evidence', 'Evaluate', 'Logout'])

    page_handlers = {
        "Add Evidence": add_evidence_page,
        "Evaluate": evaluate_page,
    }
    if page in page_handlers:
        page_handlers[page]()
    elif page == "Logout":
        logging.info("Logging out and clearing session")
        st.session_state.clear()
        st.rerun()


# Run the app only when executed as a script, not when imported.
if __name__ == "__main__":
    main()