Spaces:
Sleeping
Sleeping
| import streamlit as st | |
| import base64 | |
| import requests | |
| from PIL import Image, ImageDraw | |
| from io import BytesIO | |
| import fitz # PyMuPDF | |
| import time | |
| # Configuration - Get API key from Streamlit secrets | |
| GEMINI_API_KEY = st.secrets["GEMINI_API_KEY"] | |
| GEMINI_MODEL = "gemini-2-flash" | |
| DOCUMENT_TYPES = ["Land Records", "Caste Certificates", "Property Registrations"] | |
| # Initialize session state | |
| def initialize_session_state(): | |
| if "chat_history" not in st.session_state: | |
| st.session_state["chat_history"] = [] | |
| if "processed_doc" not in st.session_state: | |
| st.session_state["processed_doc"] = None | |
| if "doc_preview" not in st.session_state: | |
| st.session_state["doc_preview"] = None | |
| if "uploaded_file" not in st.session_state: | |
| st.session_state["uploaded_file"] = None | |
| # Reset session state | |
| def reset_session_state(): | |
| for key in ["chat_history", "processed_doc", "doc_preview", "uploaded_file"]: | |
| st.session_state.pop(key, None) | |
| # Encode uploaded file to base64 | |
| def encode_file(uploaded_file): | |
| try: | |
| file_bytes = uploaded_file.getvalue() | |
| if uploaded_file.type == "application/pdf": | |
| pdf = fitz.open(stream=BytesIO(file_bytes)) | |
| page = pdf[0] | |
| pix = page.get_pixmap() | |
| img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples) | |
| elif uploaded_file.type.startswith('image/'): | |
| img = Image.open(BytesIO(file_bytes)) | |
| elif uploaded_file.type == "text/plain": | |
| text = file_bytes.decode('utf-8') | |
| img = Image.new('RGB', (800, 600), color=(73, 109, 137)) | |
| d = ImageDraw.Draw(img) | |
| d.text((10, 10), text, fill=(255, 255, 0)) | |
| else: | |
| st.error("Unsupported file format") | |
| return None | |
| img_byte_arr = BytesIO() | |
| img.save(img_byte_arr, format='JPEG') | |
| return base64.b64encode(img_byte_arr.getvalue()).decode('utf-8') | |
| except Exception as e: | |
| st.error(f"File processing error: {str(e)}") | |
| return None | |
| # Query Gemini API | |
| def query_gemini(prompt, image_b64=None): | |
| url = f"https://generativelanguage.googleapis.com/v1/models/{GEMINI_MODEL}:generateContent?key={GEMINI_API_KEY}" | |
| parts = [{"text": prompt}] | |
| if image_b64: | |
| parts.append({"inline_data": {"mime_type": "image/jpeg", "data": image_b64}}) | |
| try: | |
| response = requests.post( | |
| url, | |
| json={"contents": [{"parts": parts}]}, | |
| headers={"Content-Type": "application/json"}, | |
| timeout=30 | |
| ) | |
| if response.status_code != 200: | |
| st.error(f"API Request failed with status code: {response.status_code}") | |
| return None | |
| data = response.json() | |
| if 'error' in data: | |
| st.error(f"API Error: {data['error'].get('message', 'Unknown error')}") | |
| return None | |
| if not data.get('candidates'): | |
| st.error("No response candidates found in API response") | |
| return None | |
| candidate = data['candidates'][0] | |
| return candidate.get('content', {}).get('parts', [{}])[0].get('text', 'No response text found') | |
| except requests.exceptions.RequestException as e: | |
| st.error(f"API Request failed: {str(e)}") | |
| return None | |
| except Exception as e: | |
| st.error(f"Unexpected error: {str(e)}") | |
| return None | |
| # Process the uploaded document | |
| def process_document(): | |
| if not st.session_state.uploaded_file: | |
| st.error("Please upload a document first.") | |
| return | |
| try: | |
| with st.spinner("Analyzing document..."): | |
| image_b64 = encode_file(st.session_state.uploaded_file) | |
| if not image_b64: | |
| return | |
| classify_prompt = f"Classify this document into one of these categories: {', '.join(DOCUMENT_TYPES)}. Respond only with the category name." | |
| doc_type = query_gemini(classify_prompt, image_b64) | |
| extract_prompt = """Extract key details including: | |
| - Names | |
| - Dates | |
| - Identification numbers | |
| - Locations | |
| Format as a bullet-point list.""" | |
| details = query_gemini(extract_prompt, image_b64) | |
| verify_prompt = "Analyze this document for signs of tampering. Provide verification status." | |
| verification = query_gemini(verify_prompt, image_b64) | |
| st.session_state.processed_doc = { | |
| "type": doc_type or "Unclassified", | |
| "details": details or "No details extracted", | |
| "verification": verification or "Verification failed", | |
| } | |
| st.success("Document processing complete!") | |
| time.sleep(1) | |
| except Exception as e: | |
| st.error(f"Document processing failed: {str(e)}") | |
| st.session_state.processed_doc = None | |
| # Main app function | |
| def main(): | |
| st.set_page_config(page_title="DocVerify AI", layout="wide") | |
| initialize_session_state() | |
| st.sidebar.header("Document Controls") | |
| st.sidebar.file_uploader("Upload Document", type=["pdf", "jpg", "jpeg", "png", "txt"], key="uploaded_file", on_change=process_document) | |
| if st.sidebar.button("New Document"): | |
| reset_session_state() | |
| st.rerun() | |
| st.title("DocVerify AI - Document Analysis") | |
| if st.session_state.processed_doc: | |
| st.subheader("Document Summary") | |
| st.markdown(f"**Type:** {st.session_state.processed_doc['type']}") | |
| st.markdown(f"**Verification Status:** {st.session_state.processed_doc['verification']}") | |
| st.text_area("Extracted Details", st.session_state.processed_doc['details'], height=200) | |
| if __name__ == "__main__": | |
| main() | |