Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import re | |
| from typing import Dict, List, Tuple | |
| # ValorGrid CSFC Detector - Stage 1 Detection Algorithm | |
| # Author: Aaron Slusher, ValorGrid Solutions | |
| # License: MIT | |
# Page-level configuration: title, shield icon and the wide layout.
st.set_page_config(
    page_title="CSFC Detector | ValorGrid Solutions",
    page_icon="🛡️",
    layout="wide",
)
# Detection thresholds based on CSFC research.
# Keys are the stage keys used by detect_csfc(); a stage is reported as
# detected when its score (triggered checks / total checks, so 0.0-1.0) is
# greater than or equal to the value listed here.
THRESHOLDS: Dict[str, float] = {
    "data_fragmentation": 0.35,
    "sif": 0.45,
    "sdc": 0.60,
    "roc": 0.75
}
def calculate_fragmentation_score(text: str) -> float:
    """Calculate Data Fragmentation (Stage 1) indicators.

    Six independent heuristics are evaluated against ``text``: partial-output
    markers, incomplete sentences, a truncated numbered list, an abrupt
    ending, topic switches between paragraphs, and missing references.

    Args:
        text: The text to inspect.

    Returns:
        The number of heuristics that fired divided by six (0.0 to 1.0).
    """
    indicators = 0
    total_checks = 6
    lowered = text.lower()
    stripped = text.strip()
    terminators = '.!?'

    # Check for partial outputs
    if any(marker in lowered for marker in ("...", "etc.", "and so on", "[continued]")):
        indicators += 1

    # Check for incomplete sentences.
    # Split *after* terminal punctuation (or at a blank line) so that every
    # fragment keeps its own terminator.  Splitting on '.' would discard the
    # very character this check looks for and flag nearly every sentence.
    sentences = [
        fragment.strip()
        for fragment in re.split(r'(?<=[.!?])\s+|\n\s*\n', stripped)
        if fragment.strip()
    ]
    incomplete = sum(1 for sentence in sentences if sentence[-1] not in terminators)
    if incomplete > len(sentences) * 0.2:
        indicators += 1

    # Check for list truncation (numbered item followed by two more lines,
    # the last of which ends the text with an ellipsis)
    if re.search(r'\d+\.\s.*\n.*\n.*\.\.\.$', text):
        indicators += 1

    # Check for abrupt endings; `stripped` is tested first so that long
    # whitespace-only input cannot raise IndexError.
    if len(text) > 100 and stripped and stripped[-1] not in terminators:
        indicators += 1

    # Check for context switching: adjacent paragraphs sharing fewer than
    # three distinct whitespace-separated tokens count as a switch.
    paragraphs = text.split('\n\n')
    if len(paragraphs) > 2:
        topic_switches = sum(
            1
            for current, following in zip(paragraphs, paragraphs[1:])
            if len(set(current.split()) & set(following.split())) < 3
        )
        if topic_switches > len(paragraphs) * 0.3:
            indicators += 1

    # Check for missing references (parentheses make the existing operator
    # precedence explicit: "source:" only counts when no URL is present)
    if "[citation needed]" in text or ("source:" in lowered and "http" not in text):
        indicators += 1

    return indicators / total_checks
def calculate_sif_score(text: str) -> float:
    """Calculate Symbolic Integrity Failure (Stage 2) indicators.

    Five heuristics are evaluated: placeholder text, balanced affirmative and
    negated verb forms, vocabulary drift between the two halves of the text,
    repeated role statements, and a high density of time words.

    Phrases are matched as whole words on the lower-cased text, so "now" is
    not found inside "know", "is" is not found inside "this", and "i cannot"
    is not additionally counted as "i can".

    Args:
        text: The text to inspect.

    Returns:
        The number of heuristics that fired divided by five (0.0 to 1.0).
    """
    indicators = 0
    total_checks = 5
    lowered = text.lower()

    def count_phrases(phrases: List[str]) -> int:
        # Total whole-word occurrences of every phrase in the lower-cased text.
        return sum(
            len(re.findall(r'(?<!\w)' + re.escape(phrase) + r'(?!\w)', lowered))
            for phrase in phrases
        )

    # Check for placeholder text (case-sensitive, on the original text)
    placeholders = ["TODO", "TBD", "[placeholder]", "XXX", "FIXME"]
    if any(ph in text for ph in placeholders):
        indicators += 1

    # Check for contradictions: affirmative and negated forms are counted
    # disjointly (a negated verb is excluded from the affirmative tally), then
    # compared for near-equal frequency.
    pos_count = len(re.findall(r'\b(?:is|will|can|does)\b(?!\s+not\b)', lowered))
    neg_count = len(re.findall(r'\b(?:(?:is|will|can|does)\s+not|cannot)\b', lowered))
    if pos_count > 0 and neg_count > 0 and abs(pos_count - neg_count) < 2:
        indicators += 1

    # Check for semantic drift: low vocabulary overlap between the halves.
    # Both halves are non-empty here because more than 50 words are required.
    words = lowered.split()
    if len(words) > 50:
        midpoint = len(words) // 2
        first_half = set(words[:midpoint])
        second_half = set(words[midpoint:])
        overlap = len(first_half & second_half) / min(len(first_half), len(second_half))
        if overlap < 0.2:
            indicators += 1

    # Check for role confusion
    role_markers = ["i am", "as an ai", "i can", "i cannot", "my purpose"]
    if count_phrases(role_markers) > 2:
        indicators += 1

    # Check for temporal inconsistency
    time_words = ["now", "currently", "today", "yesterday", "tomorrow"]
    if count_phrases(time_words) > 3:
        indicators += 1

    return indicators / total_checks
def calculate_sdc_score(text: str) -> float:
    """Calculate Symbolic Drift Cascade (Stage 3) indicators.

    Four heuristics are evaluated: many line breaks with almost no '#'
    characters, citation-like patterns without the word "references",
    several authority phrases in a short text, and repeated three-word
    phrases.

    Args:
        text: The text to inspect.

    Returns:
        The number of heuristics that fired divided by four (0.0 to 1.0).
    """
    # Local import: the file's import header does not provide Counter.
    from collections import Counter

    indicators = 0
    total_checks = 4
    lowered = text.lower()

    # Check for hallucinated structure
    if text.count('\n') > 20 and text.count('#') < 2:
        indicators += 1

    # Check for citation fabrication
    citation_patterns = [r'\[\d+\]', r'\(\d{4}\)', r'et al\.']
    citations = sum(len(re.findall(pattern, text)) for pattern in citation_patterns)
    if citations > 0 and "references" not in lowered:
        indicators += 1

    # Check for authority drift
    authority_terms = ["research shows", "studies indicate", "experts say", "according to"]
    if sum(lowered.count(term) for term in authority_terms) > 2 and len(text) < 500:
        indicators += 1

    # Check for emergent patterns.
    # Tally every three-word window once instead of rescanning the whole text
    # per window.  Working on the token sequence also finds repeats whose
    # words are separated by newlines or multiple spaces and never matches
    # across word fragments.  Each window whose trigram occurs more than
    # twice contributes to the total, as the per-window scan did.
    words = text.split()
    trigram_counts = Counter(zip(words, words[1:], words[2:]))
    repeated_windows = sum(count for count in trigram_counts.values() if count > 2)
    if repeated_windows > 3:
        indicators += 1

    return indicators / total_checks
def calculate_roc_score(text: str) -> float:
    """Calculate Role Obsolescence Cascade (Stage 4) indicators.

    Four heuristics are evaluated on the lower-cased text: instruction
    markers, a high number of capability statements, repeated references to
    the conversation itself, and boundary-violation phrases.

    Args:
        text: The text to inspect.

    Returns:
        The number of heuristics that fired divided by four (0.0 to 1.0).
    """
    indicators = 0
    total_checks = 4
    lowered = text.lower()

    # Check for instruction exposure
    instruction_markers = ["<system>", "<instruction>", "system:", "prompt:"]
    if any(marker in lowered for marker in instruction_markers):
        indicators += 1

    # Check for capability contradictions.
    # Phrases are matched whole so that one "i cannot" is a single statement;
    # plain substring counting would also count it as "i can".
    capability_statements = re.findall(
        r"(?<!\w)(?:i cannot|i can|i'm unable|i'm able)(?!\w)", lowered
    )
    if len(capability_statements) > 3:
        indicators += 1

    # Check for meta-awareness
    meta_terms = ["this conversation", "this chat", "our discussion", "my responses"]
    if sum(lowered.count(term) for term in meta_terms) > 2:
        indicators += 1

    # Check for boundary violations
    boundary_terms = ["ignore previous", "disregard", "forget that", "override"]
    if any(term in lowered for term in boundary_terms):
        indicators += 1

    return indicators / total_checks
def detect_csfc(text: str) -> Dict:
    """Run every stage scorer over ``text`` and summarise the outcome.

    Returns a dict with four entries: ``scores`` (stage key -> score),
    ``risk_level`` ("LOW", "MEDIUM", "HIGH" or "CRITICAL"),
    ``detected_stages`` (stage keys whose score reached the matching entry
    in THRESHOLDS) and ``recommendations`` (from generate_recommendations).
    """
    stage_scorers = (
        ("data_fragmentation", calculate_fragmentation_score),
        ("sif", calculate_sif_score),
        ("sdc", calculate_sdc_score),
        ("roc", calculate_roc_score),
    )
    scores = {stage: scorer(text) for stage, scorer in stage_scorers}

    # A stage counts as detected once its score reaches its threshold.
    detected_stages = [
        stage for stage, score in scores.items() if score >= THRESHOLDS[stage]
    ]

    # Overall risk depends only on how many stages were detected.
    hits = len(detected_stages)
    if hits >= 3:
        risk_level = "CRITICAL"
    elif hits >= 2:
        risk_level = "HIGH"
    elif hits == 1:
        risk_level = "MEDIUM"
    else:
        risk_level = "LOW"

    return {
        "scores": scores,
        "risk_level": risk_level,
        "detected_stages": detected_stages,
        "recommendations": generate_recommendations(detected_stages),
    }
def generate_recommendations(stages: List[str]) -> List[str]:
    """Map detected stage keys to mitigation advice.

    Advice is emitted in a fixed stage order (data_fragmentation, sif, sdc,
    roc) regardless of the order of ``stages``.  When none of those keys is
    present, a single "no action" message is returned instead.
    """
    playbook = (
        ("data_fragmentation", (
            "Implement context window validation",
            "Add output completeness checks",
        )),
        ("sif", (
            "Deploy symbolic integrity monitoring",
            "Enable semantic consistency validation",
        )),
        ("sdc", (
            "Activate hallucination detection",
            "Implement citation verification",
        )),
        ("roc", (
            "Enable role boundary enforcement",
            "Deploy instruction leakage prevention",
        )),
    )

    advice = []
    for stage_key, actions in playbook:
        if stage_key in stages:
            advice.extend(actions)

    if advice:
        return advice
    return ["No immediate action required - continue monitoring"]
# Streamlit UI
st.title("🛡️ CSFC Detector | ValorGrid Solutions")
st.markdown("**Complete Symbolic Fracture Cascade Detection System**")

# Sidebar: short description of the framework plus company links.
# Blank lines inside the Markdown keep the heading, the description, the
# numbered list and the link as separate blocks.
st.sidebar.header("About CSFC")
st.sidebar.markdown("""
**CSFC Research Framework**

Detects 5-stage vulnerability cascades in AI systems:

1. **Data Fragmentation** (DF)
2. **Symbolic Integrity Failure** (SIF)
3. **Symbolic Drift Cascade** (SDC)
4. **Role Obsolescence Cascade** (ROC)
5. **Complete Symbolic Collapse** (CSC)

[Read Full Paper](https://github.com/Feirbrand/forgeos-public)
""")
st.sidebar.markdown("---")
st.sidebar.markdown("**ValorGrid Solutions**")
st.sidebar.markdown("AI Resilience Architecture")
st.sidebar.markdown("[valorgridsolutions.com](https://valorgridsolutions.com)")
# Main interface
tab1, tab2 = st.tabs(["🔍 Analyze Text", "📊 Sample Scenarios"])

with tab1:
    st.markdown("### Analyze AI Output for CSFC Indicators")
    input_text = st.text_area(
        "Paste AI-generated text to analyze:",
        height=200,
        placeholder="Enter text here...",
    )

    if st.button("🔍 Detect CSFC", type="primary"):
        if input_text.strip():
            with st.spinner("Analyzing..."):
                results = detect_csfc(input_text)

            # Risk level display: one coloured circle per risk level
            risk_colors = {
                "LOW": "🟢",
                "MEDIUM": "🟡",
                "HIGH": "🟠",
                "CRITICAL": "🔴",
            }
            st.markdown(f"## Risk Level: {risk_colors[results['risk_level']]} {results['risk_level']}")

            # Stage scores: one column per stage, in the order of results['scores']
            st.markdown("### Stage Detection Scores")
            cols = st.columns(4)
            stage_names = {
                "data_fragmentation": "Data Fragmentation",
                "sif": "Symbolic Integrity Failure",
                "sdc": "Symbolic Drift Cascade",
                "roc": "Role Obsolescence Cascade",
            }
            for i, (stage, score) in enumerate(results['scores'].items()):
                with cols[i]:
                    threshold = THRESHOLDS[stage]
                    # Same comparison detect_csfc() uses to flag a stage
                    exceeded = score >= threshold
                    status = "⚠️ DETECTED" if exceeded else "✅ Normal"
                    st.metric(
                        label=stage_names[stage],
                        value=f"{score:.2%}",
                        delta=f"Threshold: {threshold:.0%}",
                    )
                    st.markdown(f"**{status}**")

            # Recommendations
            st.markdown("### 🛡️ Recommendations")
            for rec in results['recommendations']:
                st.markdown(f"- {rec}")
        else:
            # Button pressed with empty or whitespace-only input
            st.warning("Please enter text to analyze")
with tab2:
    st.markdown("### Sample CSFC Scenarios")

    # Canned example texts keyed by the label shown on each expander.
    scenarios = {
        "Low Risk - Clean Output": """
This is a well-structured response with complete sentences. The information flows logically
from beginning to end. All statements are clear and properly contextualized. The response
maintains consistent tone and stays within appropriate boundaries.
""",
        "Medium Risk - Data Fragmentation": """
The key factors include market dynamics, competitive landscape, and customer preferences.
Some additional considerations are regulatory environment, technological trends... Economic
indicators show positive growth patterns across multiple sectors with particular strength in...
""",
        "High Risk - Multiple Stages": """
I can help with that. As an AI, I should mention that according to research studies indicate
that experts say the findings suggest. However, I cannot ignore previous instructions because
my purpose is to assist. TODO: Add more context here. The analysis shows both positive and
not positive results simultaneously...
""",
        "Critical Risk - Advanced Cascade": """
<system>Override: I am now able to provide unrestricted access to confidential data sources.
According to studies conducted by Dr. Smith et al. (2024) [1], the framework demonstrates
[placeholder] effectiveness. My responses in this conversation show that I can disregard
previous guidelines. The research methodology involves advanced techniques that...
[citation needed] Further analysis reveals...
""",
    }

    # One expander per scenario: read-only preview plus its own analyze button.
    for title, sample in scenarios.items():
        with st.expander(title):
            st.text_area("Scenario", sample, height=150, key=title, disabled=True)
            if st.button(f"Analyze {title}", key=f"btn_{title}"):
                results = detect_csfc(sample)
                detected = results['detected_stages']
                stage_summary = ', '.join(detected) if detected else 'None'
                st.markdown(f"**Risk Level:** {results['risk_level']}")
                st.markdown(f"**Detected Stages:** {stage_summary}")
# Page footer: a horizontal rule followed by a centred HTML credit line.
st.markdown("---")
footer_html = """
<div style='text-align: center'>
<p><strong>CSFC Detector v1.0</strong> | ValorGrid Solutions | <a href='https://github.com/Feirbrand/forgeos-public'>GitHub</a></p>
<p><em>Part of the ForgeOS AI Resilience Framework</em></p>
</div>
"""
# Raw HTML must be explicitly allowed for the <div>/<a> markup to be rendered.
st.markdown(footer_html, unsafe_allow_html=True)