import os import json import logging from dotenv import load_dotenv from PyPDF2 import PdfReader from pptx import Presentation from langchain.text_splitter import CharacterTextSplitter from goose3 import Goose import streamlit as st import whisper from pytube import YouTube from moviepy import VideoFileClip import time from langchain_community.vectorstores import Milvus from pymilvus import Collection, connections, utility from huggingface_hub import InferenceClient from prompts import build_evaluation_prompt EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2" CHAT_MODEL = "deepseek-ai/DeepSeek-V3.2:novita" MILVUS_CONFIG = {"host": "localhost", "port": "19530"} DOCUMENT_CHUNK_SIZE = 1000 PDF_CHUNK_SIZE = 2500 PPTX_CHUNK_SIZE = 1800 CODE_CHUNK_SIZE = 1200 URL_CHUNK_SIZE = 1500 VIDEO_CHUNK_SIZE = 1000 CHUNK_OVERLAP = 150 CODE_FILE_TYPES = [ "py", "js", "ts", "jsx", "tsx", "java", "c", "cpp", "cs", "go", "rs", "php", "rb", "html", "css", "scss", "json", "yaml", "yml", "toml", "ini", "sh", "sql", "xml" ] load_dotenv() logging.basicConfig( level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s" ) connections.connect(alias="default", **MILVUS_CONFIG) HF_TOKEN = os.getenv("HF_TOKEN") def get_embeddings(): client = InferenceClient(api_key=HF_TOKEN) def embed_documents(texts): result = client.feature_extraction(texts, model=EMBEDDING_MODEL) if isinstance(result, dict): raise ValueError(f"Embedding API error: {result}") return result def embed_query(text): result = client.feature_extraction(text, model=EMBEDDING_MODEL) if isinstance(result, dict): raise ValueError(f"Embedding API error: {result}") return result return type( "EmbeddingAdapter", (), { "embed_documents": staticmethod(embed_documents), "embed_query": staticmethod(embed_query), }, )() def run_llm(prompt): client = InferenceClient(api_key=HF_TOKEN) completion = client.chat.completions.create( model=CHAT_MODEL, messages=[ { "role": "system", "content": "Answer only from the given context. Be concise and accurate." }, { "role": "user", "content": prompt } ], ) return completion.choices[0].message.content def login(): st.title("🔐 Login") user = st.text_input("Enter username") if st.button("Login"): if user: st.session_state["user_id"] = user.strip().lower() logging.info(f"Logged in as {st.session_state['user_id']}") st.success(f"Logged in as {user}") st.rerun() else: st.error("Enter username") def build_chunks(texts, metadatas, chunk_size): if not texts: return [], [] documents = CharacterTextSplitter( separator="\n", chunk_size=chunk_size, chunk_overlap=CHUNK_OVERLAP ).create_documents(texts, metadatas) return [doc.page_content for doc in documents], [doc.metadata for doc in documents] def save_source_texts(user_id, source_type, source_name, texts, locators, chunk_size): metadatas = [ { "source_type": source_type, "source_name": source_name, "locator": locator } for locator in locators ] chunks, metadatas = build_chunks(texts, metadatas, chunk_size) if not chunks: st.warning("No readable content was extracted from this source.") return process.success("Chunking done") logging.info( f"Chunking complete for {source_type} source '{source_name}' with {len(chunks)} chunks" ) collection_name = f"multigpt_{user_id}" logging.info(f"Storing {len(chunks)} chunks in collection '{collection_name}'") Milvus.from_texts( chunks, metadatas=metadatas, embedding=get_embeddings(), collection_name=collection_name, connection_args=MILVUS_CONFIG ) logging.info("Upload completed successfully") process.success("Uploaded") def ingest_text_document(file): user_id = st.session_state["user_id"] logging.info(f"Reading text file '{file.name}'") text = file.read().decode("utf-8", errors="ignore") save_source_texts(user_id, "text", file.name, [text], [""], DOCUMENT_CHUNK_SIZE) def ingest_pdf_document(file): user_id = st.session_state["user_id"] logging.info(f"Reading PDF '{file.name}'") reader = PdfReader(file) texts = [] locators = [] for index, page in enumerate(reader.pages, start=1): page_text = page.extract_text() or "" if page_text.strip(): texts.append(page_text) locators.append(f"page={index}") save_source_texts(user_id, "pdf", file.name, texts, locators, PDF_CHUNK_SIZE) def ingest_pptx_document(file): user_id = st.session_state["user_id"] logging.info(f"Reading PPTX '{file.name}'") presentation = Presentation(file) texts = [] locators = [] for index, slide in enumerate(presentation.slides, start=1): slide_parts = [] for shape in slide.shapes: if hasattr(shape, "text") and shape.text: slide_parts.append(shape.text) slide_text = "\n".join(part.strip() for part in slide_parts if part.strip()) if slide_text: texts.append(slide_text) locators.append(f"slide={index}") save_source_texts(user_id, "pptx", file.name, texts, locators, PPTX_CHUNK_SIZE) def ingest_code_files(files): user_id = st.session_state["user_id"] for file in files: logging.info(f"Reading code file '{file.name}'") text = file.read().decode("utf-8", errors="ignore") save_source_texts(user_id, "code", file.name, [text], [file.name], CODE_CHUNK_SIZE) def ingest_url(url): user_id = st.session_state["user_id"] logging.info(f"Fetching URL '{url}'") g = Goose() text = g.extract(url=url).cleaned_text save_source_texts(user_id, "url", url, [text], [url], URL_CHUNK_SIZE) def ingest_youtube_video(link): user_id = st.session_state["user_id"] logging.info(f"Starting video ingestion for '{link}'") yt = YouTube(link).streams.get_highest_resolution() yt.download(filename="video.mp4") process.success("Downloading video") logging.info("Video download completed") while not os.path.exists("video.mp4"): time.sleep(5) video = VideoFileClip("video.mp4") process.warning("Extracting audio") logging.info("Extracting audio from video") audio = video.audio audio.write_audiofile("audio.mp3") process.warning("Transcribing") logging.info("Running Whisper transcription") model = whisper.load_model("base") result = model.transcribe("audio.mp3") save_source_texts(user_id, "video", link, [result["text"]], [link], VIDEO_CHUNK_SIZE) def get_vector_store(collection_name): return Milvus( embedding_function=get_embeddings(), collection_name=collection_name, connection_args=MILVUS_CONFIG ) def collection_has_data(collection_name): if not utility.has_collection(collection_name): return False return get_vector_store(collection_name).col.num_entities > 0 def get_source_inventory(collection_name): if not utility.has_collection(collection_name): return [] collection = Collection(collection_name) collection.load() rows = collection.query( expr="pk >= 0", output_fields=["source_type", "source_name", "locator"] ) summary = {} for row in rows: key = (row.get("source_type", "unknown"), row.get("source_name", "unknown")) if key not in summary: summary[key] = { "source_type": key[0], "source_name": key[1], "chunks": 0, "locators": set() } summary[key]["chunks"] += 1 if row.get("locator"): summary[key]["locators"].add(row["locator"]) inventory = [] for item in summary.values(): inventory.append( { "source_type": item["source_type"], "source_name": item["source_name"], "chunks": item["chunks"], "locators": sorted(item["locators"]) if item["locators"] else [] } ) return sorted(inventory, key=lambda item: (item["source_type"], item["source_name"])) def render_evidence_inventory(): user_id = st.session_state["user_id"] collection_name = f"multigpt_{user_id}" st.subheader("Evidence Inventory") if not utility.has_collection(collection_name): logging.info(f"No collection found yet for '{collection_name}'") st.info("No project data has been uploaded for this user yet.") return inventory = get_source_inventory(collection_name) total_chunks = sum(item["chunks"] for item in inventory) logging.info( f"Loaded inventory for '{collection_name}' with {len(inventory)} sources and {total_chunks} chunks" ) st.caption(f"{len(inventory)} sources indexed across {total_chunks} chunks") if not inventory: st.info("The collection exists, but no source records were found.") return table_rows = [] for item in inventory: table_rows.append( { "Type": item["source_type"].upper(), "Source": item["source_name"], "Chunks": item["chunks"], "Locators": len(item["locators"]) } ) st.table(table_rows) def format_context(documents): entries = [] for index, doc in enumerate(documents, start=1): metadata = doc.metadata or {} source_type = metadata.get("source_type", "unknown") source_name = metadata.get("source_name", "unknown") locator_text = metadata.get("locator", "locator=unknown") entries.append( f"[Evidence {index}] source_type={source_type}; " f"source_name={source_name}; locator={locator_text}\n" f"{doc.page_content}" ) return "\n\n".join(entries) def get_rubric_criteria(): return [ "Problem Understanding", "Technical Approach", "Implementation Quality", "Innovation / Originality", "Communication & Demo Clarity", "Claim vs Reality Alignment", "Prototype Functionality" ] def parse_json_response(raw_response): try: return json.loads(raw_response) except json.JSONDecodeError: start = raw_response.find("{") end = raw_response.rfind("}") if start != -1 and end != -1 and end > start: return json.loads(raw_response[start:end + 1]) raise def normalize_evaluation_response(data): defaults = { "project_summary": { "purpose": "", "high_level_description": "" }, "sources_used": [], "claims_detected": [], "capabilities_detected": [], "evidence": [], "gaps_or_risks": [], "scores": [], "overall_assessment": { "verdict": "", "confidence": "low", "reason": "" } } if not isinstance(data, dict): return defaults normalized = defaults.copy() normalized.update({key: value for key, value in data.items() if key in normalized}) if not isinstance(normalized["project_summary"], dict): normalized["project_summary"] = defaults["project_summary"] else: normalized["project_summary"] = { "purpose": normalized["project_summary"].get("purpose", ""), "high_level_description": normalized["project_summary"].get("high_level_description", "") } if not isinstance(normalized["overall_assessment"], dict): normalized["overall_assessment"] = defaults["overall_assessment"] else: normalized["overall_assessment"] = { "verdict": normalized["overall_assessment"].get("verdict", ""), "confidence": normalized["overall_assessment"].get("confidence", "low"), "reason": normalized["overall_assessment"].get("reason", "") } for key in ["sources_used", "claims_detected", "capabilities_detected", "evidence", "gaps_or_risks", "scores"]: if not isinstance(normalized[key], list): normalized[key] = [] score_lookup = {} for item in normalized["scores"]: if not isinstance(item, dict): continue criterion = item.get("criterion") if criterion: score_lookup[criterion] = { "criterion": criterion, "score": max(1, min(5, int(item.get("score", 1)))) if str(item.get("score", "")).isdigit() else 1, "reasoning": item.get("reasoning", ""), "citations": item.get("citations", []) if isinstance(item.get("citations", []), list) else [], "confidence": max(0.0, min(1.0, float(item.get("confidence", 0.0)))) if isinstance(item.get("confidence", 0.0), (int, float)) else 0.0 } normalized["scores"] = [] for criterion in get_rubric_criteria(): normalized["scores"].append( score_lookup.get( criterion, { "criterion": criterion, "score": 1, "reasoning": "", "citations": [], "confidence": 0.0 } ) ) return normalized def run_evaluation(): user_id = st.session_state["user_id"] collection_name = f"multigpt_{user_id}" logging.info(f"Starting evaluation for collection '{collection_name}'") if not collection_has_data(collection_name): logging.info("Evaluation skipped because no uploaded project data was found") st.warning("No uploaded project data found for this user yet.") return process.warning("Retrieving project evidence") logging.info("Retrieving project evidence from Milvus") db = get_vector_store(collection_name) documents = db.similarity_search( "Evaluate this software project using all available uploaded evidence. " "Summarize capabilities, evidence, gaps, and overall assessment.", k=16 ) if not documents: logging.info("Evaluation stopped because no retrievable evidence was found") st.warning("No retrievable evidence was found for evaluation.") return prompt = build_evaluation_prompt(format_context(documents), get_rubric_criteria()) process.warning("Running evaluation") logging.info(f"Running evaluator on {len(documents)} retrieved evidence chunks") raw_response = run_llm(prompt) try: parsed_response = normalize_evaluation_response(parse_json_response(raw_response)) except json.JSONDecodeError: logging.info("Model response was not valid JSON") st.error("The model response was not valid JSON.") st.code(raw_response, language="json") return logging.info("Evaluation completed successfully") process.success("Evaluation ready") st.json(parsed_response) def add_evidence_page(): placeholder.title("Add Evidence") choice = st.sidebar.radio("Evidence Type", ['', 'DOCUMENT', 'CODE', 'URL', 'VIDEO']) if choice == 'DOCUMENT': st.caption("Upload decks, notes, specs, or README-style documents.") file = st.file_uploader("Upload document", type=["txt", "md", "pdf", "pptx"]) if file: extension = os.path.splitext(file.name)[1].lower() if extension in [".txt", ".md"]: ingest_text_document(file) elif extension == ".pdf": ingest_pdf_document(file) elif extension == ".pptx": ingest_pptx_document(file) else: st.error("Unsupported document type.") elif choice == 'CODE': st.caption("Upload source or configuration files that represent the implementation.") files = st.file_uploader( "Upload code files", type=CODE_FILE_TYPES, accept_multiple_files=True ) if files: ingest_code_files(files) elif choice == 'URL': st.caption("Add a product page, documentation page, or prototype URL.") url = st.text_input("Enter URL") if url: ingest_url(url) elif choice == 'VIDEO': st.caption("Add a YouTube demo or walkthrough link.") link = st.text_input("YouTube link") if link: ingest_youtube_video(link) def evaluate_page(): placeholder.title("Run Evaluation") st.write("Generate a structured evaluation using all uploaded evidence for this submission.") render_evidence_inventory() if st.button("Run Evaluation"): run_evaluation() def main(): global placeholder, process placeholder = st.empty() process = st.empty() if "user_id" not in st.session_state: login() return st.sidebar.write(f"👤 {st.session_state['user_id']}") page = st.sidebar.radio("Navigate", ['Add Evidence', 'Evaluate', 'Logout']) if page == "Add Evidence": add_evidence_page() elif page == "Evaluate": evaluate_page() elif page == "Logout": logging.info("Logging out and clearing session") st.session_state.clear() st.rerun() if __name__ == "__main__": main()