Spaces:
Sleeping
Sleeping
| import os | |
| import json | |
| import logging | |
| from dotenv import load_dotenv | |
| from PyPDF2 import PdfReader | |
| from pptx import Presentation | |
| from langchain.text_splitter import CharacterTextSplitter | |
| from goose3 import Goose | |
| import streamlit as st | |
| import whisper | |
| from pytube import YouTube | |
| from moviepy import VideoFileClip | |
| import time | |
| from langchain_community.vectorstores import Milvus | |
| from pymilvus import Collection, connections, utility | |
| from huggingface_hub import InferenceClient | |
| from prompts import build_evaluation_prompt | |
# Hugging Face model identifiers used for embeddings and chat completions.
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
CHAT_MODEL = "deepseek-ai/DeepSeek-V3.2:novita"

# Connection arguments shared by the pymilvus connection opened below and by
# the LangChain Milvus wrapper calls elsewhere in this file.
MILVUS_CONFIG = dict(host="localhost", port="19530")

# chunk_size handed to the text splitter for each kind of source.
DOCUMENT_CHUNK_SIZE = 1000
PDF_CHUNK_SIZE = 2500
PPTX_CHUNK_SIZE = 1800
CODE_CHUNK_SIZE = 1200
URL_CHUNK_SIZE = 1500
VIDEO_CHUNK_SIZE = 1000
# chunk_overlap handed to the text splitter for every kind of source.
CHUNK_OVERLAP = 150

# File extensions accepted by the code-file uploader.
CODE_FILE_TYPES = (
    "py js ts jsx tsx java c cpp cs go rs "
    "php rb html css scss json yaml yml toml "
    "ini sh sql xml"
).split()

# Load .env values first so the environment lookup below can see them.
load_dotenv()

logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(message)s",
    level=logging.INFO,
)

# Opened at import time under the "default" alias.
connections.connect(alias="default", **MILVUS_CONFIG)

HF_TOKEN = os.environ.get("HF_TOKEN")
def get_embeddings():
    """Build an embedding adapter backed by the Hugging Face Inference API.

    The returned object is what this file passes to the Milvus wrapper as
    ``embedding`` / ``embedding_function``.

    Returns:
        An object exposing ``embed_documents(texts)`` and
        ``embed_query(text)``. Both return plain Python lists whenever the
        API result offers ``tolist()``; otherwise the result is returned
        unchanged.

    Raises:
        ValueError: Raised by either method when the API answers with a dict
            instead of vectors.
    """
    client = InferenceClient(api_key=HF_TOKEN)

    def _extract(payload):
        # Single place for the API call and its error check.
        result = client.feature_extraction(payload, model=EMBEDDING_MODEL)
        if isinstance(result, dict):
            raise ValueError(f"Embedding API error: {result}")
        # Array-like results are converted to lists so downstream code gets
        # ordinary Python floats rather than array scalars.
        return result.tolist() if hasattr(result, "tolist") else result

    class EmbeddingAdapter:
        """Minimal embeddings object with the two methods callers use."""

        @staticmethod
        def embed_documents(texts):
            texts = list(texts)
            if not texts:
                # Nothing to embed; skip the remote call entirely.
                return []
            return _extract(texts)

        @staticmethod
        def embed_query(text):
            return _extract(text)

    return EmbeddingAdapter()
def run_llm(prompt):
    """Send *prompt* to the configured chat model and return the reply text.

    Args:
        prompt: Text sent as the user message.

    Returns:
        The ``content`` of the first choice's message.
    """
    system_message = {
        "role": "system",
        "content": "Answer only from the given context. Be concise and accurate.",
    }
    user_message = {"role": "user", "content": prompt}

    chat_client = InferenceClient(api_key=HF_TOKEN)
    response = chat_client.chat.completions.create(
        model=CHAT_MODEL,
        messages=[system_message, user_message],
    )
    first_choice = response.choices[0]
    return first_choice.message.content
def login():
    """Render the login form and store the normalized user id in the session.

    The entered name is stripped and lower-cased before it is validated, so a
    whitespace-only entry is rejected instead of producing an empty user id.
    Because the user id becomes part of the ``multigpt_<user_id>`` collection
    name, only letters, digits and underscores are accepted.
    """
    import re

    st.title("🔐 Login")
    user = st.text_input("Enter username")
    if not st.button("Login"):
        return

    user_id = user.strip().lower()
    if not user_id:
        st.error("Enter username")
        return
    if not re.fullmatch(r"[a-z0-9_]+", user_id):
        # Reject early: such a name would only fail later, at upload time,
        # when the collection is created.
        st.error("Username may only contain letters, numbers, and underscores")
        return

    st.session_state["user_id"] = user_id
    logging.info(f"Logged in as {st.session_state['user_id']}")
    st.success(f"Logged in as {user}")
    st.rerun()
def build_chunks(texts, metadatas, chunk_size):
    """Split *texts* into chunks and return them with their metadata.

    Args:
        texts: Source strings to split.
        metadatas: One metadata dict per entry in *texts*.
        chunk_size: ``chunk_size`` passed to the splitter.

    Returns:
        A ``(chunks, metadatas)`` pair of parallel lists; both are empty when
        *texts* is empty.
    """
    if not texts:
        return [], []

    splitter = CharacterTextSplitter(
        separator="\n",
        chunk_size=chunk_size,
        chunk_overlap=CHUNK_OVERLAP,
    )
    contents = []
    chunk_metadatas = []
    for document in splitter.create_documents(texts, metadatas):
        contents.append(document.page_content)
        chunk_metadatas.append(document.metadata)
    return contents, chunk_metadatas
def save_source_texts(user_id, source_type, source_name, texts, locators, chunk_size):
    """Chunk extracted texts and store them in the user's Milvus collection.

    Args:
        user_id: Used to build the ``multigpt_<user_id>`` collection name.
        source_type: Label stored in each chunk's metadata.
        source_name: Name stored in each chunk's metadata.
        texts: Extracted text segments.
        locators: One locator per entry in *texts*.
        chunk_size: ``chunk_size`` forwarded to ``build_chunks``.

    Shows a warning and returns without storing anything when chunking
    yields no chunks. Progress is reported through the module-level
    ``process`` placeholder.
    """
    source_metadatas = [
        dict(source_type=source_type, source_name=source_name, locator=locator)
        for locator in locators
    ]
    chunks, metadatas = build_chunks(texts, source_metadatas, chunk_size)
    if not chunks:
        st.warning("No readable content was extracted from this source.")
        return

    process.success("Chunking done")
    logging.info(
        f"Chunking complete for {source_type} source '{source_name}' with {len(chunks)} chunks"
    )

    collection_name = f"multigpt_{user_id}"
    logging.info(f"Storing {len(chunks)} chunks in collection '{collection_name}'")
    store_options = {
        "metadatas": metadatas,
        "embedding": get_embeddings(),
        "collection_name": collection_name,
        "connection_args": MILVUS_CONFIG,
    }
    Milvus.from_texts(chunks, **store_options)

    logging.info("Upload completed successfully")
    process.success("Uploaded")
def ingest_text_document(file):
    """Decode an uploaded text file as UTF-8 and store it for the current user.

    Bytes that are not valid UTF-8 are dropped. The whole file is stored as
    one text with an empty locator.
    """
    user_id = st.session_state["user_id"]
    logging.info(f"Reading text file '{file.name}'")
    raw_bytes = file.read()
    content = raw_bytes.decode("utf-8", errors="ignore")
    save_source_texts(
        user_id,
        "text",
        file.name,
        [content],
        [""],
        DOCUMENT_CHUNK_SIZE,
    )
def ingest_pdf_document(file):
    """Extract text page by page from an uploaded PDF and store it.

    Pages whose extracted text is empty or whitespace-only are skipped. Each
    kept page gets a ``page=<n>`` locator, numbered from 1.
    """
    user_id = st.session_state["user_id"]
    logging.info(f"Reading PDF '{file.name}'")

    extracted = []
    for page_number, page in enumerate(PdfReader(file).pages, start=1):
        content = page.extract_text() or ""
        if not content.strip():
            continue
        extracted.append((content, f"page={page_number}"))

    texts = [content for content, _ in extracted]
    locators = [locator for _, locator in extracted]
    save_source_texts(user_id, "pdf", file.name, texts, locators, PDF_CHUNK_SIZE)
def ingest_pptx_document(file):
    """Extract text slide by slide from an uploaded PPTX and store it.

    For each slide, the text of every shape that has any is stripped and
    joined with newlines. Slides with no text are skipped. Each kept slide
    gets a ``slide=<n>`` locator, numbered from 1.
    """
    user_id = st.session_state["user_id"]
    logging.info(f"Reading PPTX '{file.name}'")

    texts, locators = [], []
    deck = Presentation(file)
    for slide_number, slide in enumerate(deck.slides, start=1):
        # Shapes without a usable text attribute contribute an empty string.
        raw_fragments = (getattr(shape, "text", "") or "" for shape in slide.shapes)
        stripped = [fragment.strip() for fragment in raw_fragments]
        slide_text = "\n".join(piece for piece in stripped if piece)
        if not slide_text:
            continue
        texts.append(slide_text)
        locators.append(f"slide={slide_number}")

    save_source_texts(user_id, "pptx", file.name, texts, locators, PPTX_CHUNK_SIZE)
def ingest_code_files(files):
    """Store each uploaded code file for the current user.

    Every file is decoded as UTF-8 (undecodable bytes dropped) and saved as a
    single text whose locator is the file name.
    """
    user_id = st.session_state["user_id"]
    for code_file in files:
        logging.info(f"Reading code file '{code_file.name}'")
        source = code_file.read().decode("utf-8", errors="ignore")
        save_source_texts(
            user_id,
            "code",
            code_file.name,
            [source],
            [code_file.name],
            CODE_CHUNK_SIZE,
        )
def ingest_url(url):
    """Fetch a web page, extract its cleaned text, and store it.

    Args:
        url: Address of the page to extract; also used as the source name
            and locator.

    The extractor is used as a context manager so that it is closed once
    extraction finishes, even when extraction raises.
    """
    user_id = st.session_state["user_id"]
    logging.info(f"Fetching URL '{url}'")
    with Goose() as extractor:
        text = extractor.extract(url=url).cleaned_text
    # A missing text is stored as "" so the "no readable content" warning is
    # shown instead of the splitter failing on None.
    save_source_texts(user_id, "url", url, [text or ""], [url], URL_CHUNK_SIZE)
def ingest_youtube_video(link):
    """Download a YouTube video, transcribe its audio, and store the text.

    Args:
        link: YouTube URL; also used as the source name and locator.

    The video and audio files are written to a private temporary directory
    that is removed when processing ends, so sessions running at the same
    time do not overwrite each other's files and nothing is left on disk.
    Shows an error and returns without storing anything when no stream or no
    audio track is available.
    """
    import tempfile

    user_id = st.session_state["user_id"]
    logging.info(f"Starting video ingestion for '{link}'")

    stream = YouTube(link).streams.get_highest_resolution()
    if stream is None:
        st.error("No downloadable video stream was found for this link.")
        return

    with tempfile.TemporaryDirectory() as workdir:
        process.warning("Downloading video")
        # download() blocks until the file is written and returns its path,
        # so no polling for the file is needed afterwards.
        video_path = stream.download(output_path=workdir, filename="video.mp4")
        logging.info("Video download completed")

        process.warning("Extracting audio")
        logging.info("Extracting audio from video")
        audio_path = os.path.join(workdir, "audio.mp3")
        video = VideoFileClip(video_path)
        try:
            if video.audio is None:
                st.error("This video has no audio track to transcribe.")
                return
            video.audio.write_audiofile(audio_path)
        finally:
            # Close the clip so the temporary directory can be removed.
            video.close()

        process.warning("Transcribing")
        logging.info("Running Whisper transcription")
        result = whisper.load_model("base").transcribe(audio_path)

    save_source_texts(user_id, "video", link, [result["text"]], [link], VIDEO_CHUNK_SIZE)
def get_vector_store(collection_name):
    """Return a Milvus vector store bound to *collection_name*.

    The store is created with a fresh embedding adapter and the shared
    connection settings.
    """
    store_options = {
        "embedding_function": get_embeddings(),
        "collection_name": collection_name,
        "connection_args": MILVUS_CONFIG,
    }
    return Milvus(**store_options)
def collection_has_data(collection_name):
    """Return True when the collection exists and holds at least one entity."""
    if utility.has_collection(collection_name):
        entity_count = get_vector_store(collection_name).col.num_entities
        return entity_count > 0
    return False
def get_source_inventory(collection_name):
    """Summarize the stored chunks of a collection per source.

    Args:
        collection_name: Name of the Milvus collection to inspect.

    Returns:
        A list of dicts with ``source_type``, ``source_name``, ``chunks``
        (row count) and ``locators`` (sorted distinct non-empty locators),
        ordered by source type and then source name. The list is empty when
        the collection does not exist.
    """
    if not utility.has_collection(collection_name):
        return []

    collection = Collection(collection_name)
    collection.load()
    rows = collection.query(
        expr="pk >= 0",
        output_fields=["source_type", "source_name", "locator"],
    )

    # Group rows by (source_type, source_name); missing fields read "unknown".
    grouped = {}
    for row in rows:
        source_key = (
            row.get("source_type", "unknown"),
            row.get("source_name", "unknown"),
        )
        entry = grouped.setdefault(source_key, {"chunks": 0, "locators": set()})
        entry["chunks"] += 1
        locator = row.get("locator")
        if locator:
            entry["locators"].add(locator)

    inventory = [
        {
            "source_type": source_type,
            "source_name": source_name,
            "chunks": entry["chunks"],
            "locators": sorted(entry["locators"]),
        }
        for (source_type, source_name), entry in grouped.items()
    ]
    inventory.sort(key=lambda item: (item["source_type"], item["source_name"]))
    return inventory
def render_evidence_inventory():
    """Render a table of the current user's indexed sources.

    Shows an informational message instead of the table when the user's
    collection does not exist or contains no source records.
    """
    collection_name = f"multigpt_{st.session_state['user_id']}"
    st.subheader("Evidence Inventory")

    collection_exists = utility.has_collection(collection_name)
    if not collection_exists:
        logging.info(f"No collection found yet for '{collection_name}'")
        st.info("No project data has been uploaded for this user yet.")
        return

    inventory = get_source_inventory(collection_name)
    total_chunks = sum(item["chunks"] for item in inventory)
    logging.info(
        f"Loaded inventory for '{collection_name}' with {len(inventory)} sources and {total_chunks} chunks"
    )
    st.caption(f"{len(inventory)} sources indexed across {total_chunks} chunks")

    if not inventory:
        st.info("The collection exists, but no source records were found.")
        return

    table_rows = [
        {
            "Type": item["source_type"].upper(),
            "Source": item["source_name"],
            "Chunks": item["chunks"],
            "Locators": len(item["locators"]),
        }
        for item in inventory
    ]
    st.table(table_rows)
def format_context(documents):
    """Render retrieved documents as numbered evidence entries.

    Args:
        documents: Objects exposing ``metadata`` (a dict or ``None``) and
            ``page_content``.

    Returns:
        The entries joined by blank lines. Each entry is a header line
        ``[Evidence N] source_type=...; source_name=...; locator=...``
        followed by the document content. A missing or empty locator is
        rendered as ``unknown``.
    """
    entries = []
    for index, doc in enumerate(documents, start=1):
        metadata = doc.metadata or {}
        source_type = metadata.get("source_type", "unknown")
        source_name = metadata.get("source_name", "unknown")
        # The "locator=" label is added below, so the fallback must be the
        # bare value; `or` also covers locators stored as empty strings.
        locator_text = metadata.get("locator") or "unknown"
        entries.append(
            f"[Evidence {index}] source_type={source_type}; "
            f"source_name={source_name}; locator={locator_text}\n"
            f"{doc.page_content}"
        )
    return "\n\n".join(entries)
def get_rubric_criteria():
    """Return the rubric criterion names, in order, as a new list."""
    criteria = (
        "Problem Understanding",
        "Technical Approach",
        "Implementation Quality",
        "Innovation / Originality",
        "Communication & Demo Clarity",
        "Claim vs Reality Alignment",
        "Prototype Functionality",
    )
    return list(criteria)
def parse_json_response(raw_response):
    """Parse a model reply as JSON, tolerating text around the JSON object.

    The reply is parsed as-is first. If that fails, the span from the first
    ``{`` to the last ``}`` is parsed instead, which recovers objects wrapped
    in prose or code fences.

    Args:
        raw_response: The model reply (``str``; ``bytes`` are decoded as
            UTF-8).

    Returns:
        The decoded JSON value.

    Raises:
        json.JSONDecodeError: When the reply is not text or contains no
            parsable JSON. Non-text input such as ``None`` is reported with
            this exception type as well, so callers need only one handler.
    """
    if isinstance(raw_response, (bytes, bytearray)):
        raw_response = bytes(raw_response).decode("utf-8", errors="replace")
    if not isinstance(raw_response, str):
        raise json.JSONDecodeError("Model response is not text", "", 0)

    try:
        return json.loads(raw_response)
    except json.JSONDecodeError:
        start = raw_response.find("{")
        end = raw_response.rfind("}")
        if start == -1 or end <= start:
            # No object-like span to fall back on; surface the original error.
            raise
        return json.loads(raw_response[start:end + 1])
def _coerce_score(value):
    """Return *value* as an int clamped to 1..5, or 1 when it is not numeric."""
    if isinstance(value, bool):
        return 1
    try:
        # Accepts ints, floats such as 4.0, and numeric strings.
        number = round(float(value))
    except (TypeError, ValueError, OverflowError):
        # Non-numeric, NaN (ValueError) or infinite (OverflowError) input.
        return 1
    return max(1, min(5, number))


def _coerce_confidence(value):
    """Return *value* as a float clamped to 0.0..1.0, or 0.0 when unusable."""
    try:
        number = float(value)
    except (TypeError, ValueError):
        return 0.0
    if number != number:
        # NaN compares unequal to itself and cannot be clamped meaningfully.
        return 0.0
    return max(0.0, min(1.0, number))


def normalize_evaluation_response(data):
    """Coerce a parsed model reply into the fixed evaluation schema.

    Args:
        data: The decoded JSON value from the model. Anything that is not a
            dict is treated as an empty reply.

    Returns:
        A dict with exactly these keys: ``project_summary``,
        ``sources_used``, ``claims_detected``, ``capabilities_detected``,
        ``evidence``, ``gaps_or_risks``, ``scores`` and
        ``overall_assessment``. Missing or wrongly typed parts are replaced
        by defaults. ``scores`` always contains one entry per rubric
        criterion, in rubric order; criteria the model did not score get
        score 1 and confidence 0.0.
    """
    source = data if isinstance(data, dict) else {}

    summary = source.get("project_summary")
    if not isinstance(summary, dict):
        summary = {}
    assessment = source.get("overall_assessment")
    if not isinstance(assessment, dict):
        assessment = {}

    def _list_field(key):
        value = source.get(key)
        return value if isinstance(value, list) else []

    # Index the model's score entries by criterion name; a later duplicate
    # replaces an earlier one.
    score_lookup = {}
    for item in _list_field("scores"):
        if not isinstance(item, dict):
            continue
        criterion = item.get("criterion")
        if not isinstance(criterion, str) or not criterion.strip():
            continue
        criterion = criterion.strip()
        citations = item.get("citations", [])
        score_lookup[criterion] = {
            "criterion": criterion,
            "score": _coerce_score(item.get("score", 1)),
            "reasoning": item.get("reasoning", ""),
            "citations": citations if isinstance(citations, list) else [],
            "confidence": _coerce_confidence(item.get("confidence", 0.0)),
        }

    scores = [
        score_lookup.get(
            criterion,
            {
                "criterion": criterion,
                "score": 1,
                "reasoning": "",
                "citations": [],
                "confidence": 0.0,
            },
        )
        for criterion in get_rubric_criteria()
    ]

    return {
        "project_summary": {
            "purpose": summary.get("purpose", ""),
            "high_level_description": summary.get("high_level_description", ""),
        },
        "sources_used": _list_field("sources_used"),
        "claims_detected": _list_field("claims_detected"),
        "capabilities_detected": _list_field("capabilities_detected"),
        "evidence": _list_field("evidence"),
        "gaps_or_risks": _list_field("gaps_or_risks"),
        "scores": scores,
        "overall_assessment": {
            "verdict": assessment.get("verdict", ""),
            "confidence": assessment.get("confidence", "low"),
            "reason": assessment.get("reason", ""),
        },
    }
def run_evaluation():
    """Retrieve the user's evidence, run the evaluator, and show the result.

    Stops with a warning when the user's collection has no data or the
    similarity search returns nothing. When the model reply cannot be parsed
    as JSON, shows an error together with the raw reply.
    """
    user_id = st.session_state["user_id"]
    collection_name = f"multigpt_{user_id}"
    logging.info(f"Starting evaluation for collection '{collection_name}'")

    if not collection_has_data(collection_name):
        logging.info("Evaluation skipped because no uploaded project data was found")
        st.warning("No uploaded project data found for this user yet.")
        return

    process.warning("Retrieving project evidence")
    logging.info("Retrieving project evidence from Milvus")
    retrieval_query = (
        "Evaluate this software project using all available uploaded evidence. "
        "Summarize capabilities, evidence, gaps, and overall assessment."
    )
    vector_store = get_vector_store(collection_name)
    documents = vector_store.similarity_search(retrieval_query, k=16)
    if not documents:
        logging.info("Evaluation stopped because no retrievable evidence was found")
        st.warning("No retrievable evidence was found for evaluation.")
        return

    context = format_context(documents)
    prompt = build_evaluation_prompt(context, get_rubric_criteria())
    process.warning("Running evaluation")
    logging.info(f"Running evaluator on {len(documents)} retrieved evidence chunks")
    raw_response = run_llm(prompt)

    try:
        decoded = parse_json_response(raw_response)
        parsed_response = normalize_evaluation_response(decoded)
    except json.JSONDecodeError:
        logging.info("Model response was not valid JSON")
        st.error("The model response was not valid JSON.")
        st.code(raw_response, language="json")
        return

    logging.info("Evaluation completed successfully")
    process.success("Evaluation ready")
    st.json(parsed_response)
def add_evidence_page():
    """Render the "Add Evidence" page and ingest whatever the user supplies.

    The sidebar radio selects the evidence type; the matching uploader or
    text input is shown and its value is passed to the corresponding
    ingestion function.
    """
    placeholder.title("Add Evidence")
    choice = st.sidebar.radio("Evidence Type", ['', 'DOCUMENT', 'CODE', 'URL', 'VIDEO'])

    if choice == 'DOCUMENT':
        st.caption("Upload decks, notes, specs, or README-style documents.")
        file = st.file_uploader("Upload document", type=["txt", "md", "pdf", "pptx"])
        if not file:
            return
        # Map each accepted extension to its ingestion function.
        document_handlers = {
            ".txt": ingest_text_document,
            ".md": ingest_text_document,
            ".pdf": ingest_pdf_document,
            ".pptx": ingest_pptx_document,
        }
        _, extension = os.path.splitext(file.name)
        handler = document_handlers.get(extension.lower())
        if handler is None:
            st.error("Unsupported document type.")
        else:
            handler(file)
        return

    if choice == 'CODE':
        st.caption("Upload source or configuration files that represent the implementation.")
        files = st.file_uploader(
            "Upload code files",
            type=CODE_FILE_TYPES,
            accept_multiple_files=True,
        )
        if files:
            ingest_code_files(files)
        return

    if choice == 'URL':
        st.caption("Add a product page, documentation page, or prototype URL.")
        url = st.text_input("Enter URL")
        if url:
            ingest_url(url)
        return

    if choice == 'VIDEO':
        st.caption("Add a YouTube demo or walkthrough link.")
        link = st.text_input("YouTube link")
        if link:
            ingest_youtube_video(link)
def evaluate_page():
    """Render the "Run Evaluation" page: inventory first, then the run button."""
    placeholder.title("Run Evaluation")
    st.write("Generate a structured evaluation using all uploaded evidence for this submission.")
    render_evidence_inventory()

    run_requested = st.button("Run Evaluation")
    if run_requested:
        run_evaluation()
def main():
    """Entry point: show the login form or route to the selected page.

    Creates the module-level ``placeholder`` and ``process`` containers that
    the page and ingestion functions write to.
    """
    global placeholder, process
    placeholder, process = st.empty(), st.empty()

    if "user_id" not in st.session_state:
        login()
        return

    current_user = st.session_state["user_id"]
    st.sidebar.write(f"👤 {current_user}")
    page = st.sidebar.radio("Navigate", ['Add Evidence', 'Evaluate', 'Logout'])

    if page == "Logout":
        logging.info("Logging out and clearing session")
        st.session_state.clear()
        st.rerun()
        return

    page_handlers = {
        "Add Evidence": add_evidence_page,
        "Evaluate": evaluate_page,
    }
    handler = page_handlers.get(page)
    if handler is not None:
        handler()


if __name__ == "__main__":
    main()