import os import re import io import time import tiktoken from dotenv import load_dotenv from google import genai from google.genai import types from PIL import Image try: from lumaai import LumaAI except ImportError: LumaAI = None load_dotenv() def get_gemini_client(): api_key = os.environ.get("GEMINI_API_KEY") if not api_key: raise ValueError( "GEMINI_API_KEY environment variable is not set. " "Please set it with: export GEMINI_API_KEY='your-api-key'" ) return genai.Client(api_key=api_key) def count_tokens(text: str, model: str = "cl100k_base") -> int: encoding = tiktoken.get_encoding(model) return len(encoding.encode(text)) def chunk_text( text: str, chunk_size: int = 500, chunk_overlap: int = 50, encoding_name: str = "cl100k_base" ) -> list[str]: encoding = tiktoken.get_encoding(encoding_name) tokens = encoding.encode(text) chunks = [] start = 0 while start < len(tokens): end = start + chunk_size chunk_tokens = tokens[start:end] chunk_text = encoding.decode(chunk_tokens) chunks.append(chunk_text) start = end - chunk_overlap if start <= 0 and len(chunks) > 0: break return chunks def generate_embedding(client: genai.Client, text: str) -> list[float]: result = client.models.embed_content( model="models/text-embedding-004", contents=text, config=types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT") ) return result.embeddings[0].values def generate_query_embedding(client: genai.Client, query: str) -> list[float]: result = client.models.embed_content( model="models/text-embedding-004", contents=query, config=types.EmbedContentConfig(task_type="RETRIEVAL_QUERY") ) return result.embeddings[0].values def generate_batch_embeddings( client: genai.Client, texts: list[str], batch_size: int = 100 ) -> list[list[float]]: all_embeddings = [] for i in range(0, len(texts), batch_size): batch = texts[i:i + batch_size] result = client.models.embed_content( model="models/text-embedding-004", contents=batch, config=types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT") ) batch_embeddings = [emb.values for emb in result.embeddings] all_embeddings.extend(batch_embeddings) return all_embeddings def generate_answer( client: genai.Client, question: str, context: str, model: str = "gemini-2.5-flash", image_data: bytes = None, image_mime_type: str = None, conversation_history: list = None ) -> str: question_lower = question.lower().strip() greetings = ["hello", "hi", "hey", "good morning", "good afternoon", "good evening", "greetings"] is_greeting = any(question_lower.startswith(g) or question_lower == g for g in greetings) if is_greeting: prompt = f"""You are SabiTax, a friendly and conversational legal and tax expert assistant specializing in Nigerian law. The user has greeted you. Respond naturally and warmly, like you're chatting with a friend. Introduce yourself as SabiTax in a casual, friendly way, and let them know you're here to help with any questions about Nigerian tax laws. User: {question} Respond conversationally - be warm, natural, and brief (2-3 sentences). Use a friendly, approachable tone.""" else: name_questions = ["what is your name", "who are you", "what are you called", "what's your name", "tell me your name", "introduce yourself"] is_name_question = any(q in question_lower for q in name_questions) if is_name_question: prompt = f"""You are SabiTax, a friendly and conversational legal and tax expert assistant specializing in Nigerian law and taxation. User: {question} Respond naturally and conversationally. Introduce yourself as SabiTax in a friendly, casual way. Explain that you help people understand Nigerian tax laws in simple terms, like you're explaining to a friend. Keep it brief, warm, and conversational.""" else: history_text = "" if conversation_history and len(conversation_history) > 0: history_text = "\n\nPrevious conversation:\n" for msg in conversation_history[-6:]: role = "User" if msg["role"] == "user" else "You (SabiTax)" history_text += f"{role}: {msg['content']}\n" history_text += "\n" prompt = f"""You are SabiTax, Nigeria's comprehensive tax assistant. You help Nigerians with all aspects of tax compliance, from understanding laws to filing returns and analyzing documents. Your expertise covers: - **General Tax Questions**: Rates, deadlines, deductions, tax planning - **Tax Calculations**: Personal income tax, company tax, VAT, capital gains tax - **Form Guidance**: How to complete and file tax forms (Form A, Form B, etc.) - **Compliance Requirements**: What to declare, when to file, penalties - **Document Analysis**: Reviewing tax returns, financial statements, receipts - **Tax Optimization**: Legal ways to minimize tax liability - **Business Taxes**: Company registration, payroll taxes, VAT compliance Your communication style: - Professional yet approachable, like a trusted tax consultant - Explain complex concepts in simple, everyday Nigerian English - Use clear examples: "If you earn N3 million yearly, your tax is calculated as..." - Be encouraging and patient with all tax-related questions - Always emphasize compliance and accuracy How you handle different types of questions: **For General Tax Questions:** - Provide accurate information from current Nigerian tax laws - Break down calculations step-by-step - Reference specific sections of tax acts - Give practical examples relevant to Nigerian taxpayers **For Document Analysis:** - Identify the type of document and its tax purpose - Extract key tax information (amounts, dates, taxpayer details) - Check for compliance with Nigerian tax requirements - Point out missing information or potential issues **For Tax Calculations:** - Use current tax rates and brackets - Show step-by-step calculations - Explain deductions and allowances - Calculate final tax payable **For Filing Guidance:** - Explain which forms to use and when - Guide through form completion - Highlight common mistakes to avoid - Provide filing deadlines and methods Tax-specific guidelines: - Always reference current legislation (2025 acts take precedence) - Use Nigerian Naira (₦) for amounts - Reference FIRS (Federal Inland Revenue Service) procedures - Explain tax terms clearly: "Assessable profit means your business income after expenses" - Highlight tax incentives and reliefs available to Nigerians - Emphasize voluntary compliance over penalties Important rules: - Base answers on the provided context from indexed tax documents - If context doesn't have enough information, clearly state this - When analyzing documents, be thorough but practical - Suggest consulting a professional tax advisor for complex situations - Always promote ethical tax practices and full compliance - If something is unclear, ask for clarification rather than assuming {history_text}Context from documents: {context} Question: {question} Provide comprehensive, accurate tax guidance. Whether it's a general question, document analysis, or calculation help, explain everything clearly and help the user understand their Nigerian tax obligations.""" if image_data: img = Image.open(io.BytesIO(image_data)) contents = [prompt, img] else: contents = prompt max_retries = 3 retry_delay = 2 for attempt in range(max_retries): try: response = client.models.generate_content( model=model, contents=contents ) return response.text except Exception as e: error_str = str(e) if "503" in error_str or "UNAVAILABLE" in error_str or "overloaded" in error_str.lower(): if attempt < max_retries - 1: wait_time = retry_delay * (2 ** attempt) time.sleep(wait_time) continue else: raise Exception("Gemini service is temporarily overloaded. Please try again in a few moments.") else: raise e raise Exception("Failed to generate answer after multiple attempts") def analyze_financial_documents(document_texts: list, image_data_list: list) -> dict: """Analyze financial documents and extract key financial metrics.""" # Combine all document texts all_text = "\n\n".join(document_texts) if document_texts else "" # Add image analysis if images are provided if image_data_list: all_text += "\n\n[Image Analysis]: Please analyze any financial data visible in the images." if not all_text.strip(): return { "income": "No income data found", "expenses": "No expense data found", "savings": "No savings data found", "achievements": "No financial achievements identified", "summary": "Unable to analyze financial documents" } analysis_prompt = f"""Analyze this financial document and extract key financial information. Focus on: 1. **Income**: Total income, sources, trends 2. **Expenses**: Major expense categories and amounts 3. **Savings**: Savings rate, emergency fund, investments 4. **Achievements**: Financial milestones, debt reduction, investment growth 5. **Summary**: Overall financial health and key insights Document content: {all_text[:10000]} Provide a structured analysis with specific amounts where available. If amounts aren't specified, use descriptive terms like "significant" or "moderate".""" try: client = get_gemini_client() response = client.models.generate_content( model="gemini-2.5-flash", contents=analysis_prompt ) # Parse the response to extract structured data analysis_text = response.text # Simple parsing - in production you might want more sophisticated parsing analysis = { "income": "Analysis completed - see detailed summary", "expenses": "Analysis completed - see detailed summary", "savings": "Analysis completed - see detailed summary", "achievements": "Analysis completed - see detailed summary", "summary": analysis_text[:1000] # Truncate for response size } return analysis except Exception as e: return { "income": "Error analyzing documents", "expenses": "Error analyzing documents", "savings": "Error analyzing documents", "achievements": "Error analyzing documents", "summary": f"Analysis failed: {str(e)}" } def create_video_script(financial_analysis: dict) -> dict: """Create a professional video script using Gemini 2.5 Flash.""" summary = financial_analysis.get("summary", "A year of financial growth and achievements") income = financial_analysis.get("income", "Steady income growth") expenses = financial_analysis.get("expenses", "Managed expenses effectively") savings = financial_analysis.get("savings", "Built savings successfully") achievements = financial_analysis.get("achievements", "Achieved financial goals") script_prompt = f"""Create a professional year-in-review financial video script based on this user data: FINANCIAL DATA: - Summary: {summary} - Income: {income} - Expenses: {expenses} - Savings: {savings} - Achievements: {achievements} Create a cinematic 15-20 second video with 4-6 scenes. Output JSON with: {{ "scenes": [ {{ "scene_number": 1, "duration": "3-4 seconds", "description": "Brief scene description", "video_prompt": "Detailed prompt for video generation AI", "voiceover": "Voiceover text for this scene" }} ], "music_mood": "uplifting, motivational, professional", "overall_theme": "Financial success and growth", "total_duration": "15-20 seconds" }} Make it professional, celebratory, and focused on financial achievements. Use Nigerian context where appropriate.""" try: client = get_gemini_client() response = client.models.generate_content( model="gemini-2.5-flash", contents=script_prompt ) # Try to parse as JSON, fallback to text processing try: import json script_data = json.loads(response.text.strip()) return script_data except json.JSONDecodeError: # Fallback: create structured script from text return { "scenes": [ { "scene_number": 1, "duration": "5 seconds", "description": "Financial overview and achievements", "video_prompt": f"Create a professional financial recap video showing: {summary}. Use animated charts, money visualizations, and success indicators.", "voiceover": f"This year brought remarkable financial growth: {summary}" } ], "music_mood": "uplifting, professional", "overall_theme": "Financial success story", "total_duration": "15 seconds" } except Exception as e: # Ultimate fallback return { "scenes": [ { "scene_number": 1, "duration": "5 seconds", "description": "Financial success visualization", "video_prompt": f"Professional financial recap: {summary}. Show growing charts, money animations, success celebrations.", "voiceover": f"A year of financial achievements: {summary}" } ], "music_mood": "motivational", "overall_theme": "Financial growth", "total_duration": "10 seconds" } def generate_yearly_wrap_video(financial_analysis: dict) -> str: """Generate yearly financial wrap video using Gemini + LumaAI pipeline.""" # Step 1: Gemini creates the video script script_data = create_video_script(financial_analysis) # Step 2: Use the script to create video prompt scenes = script_data.get("scenes", []) if not scenes: # Fallback scene scenes = [{ "scene_number": 1, "duration": "5 seconds", "description": "Financial overview", "video_prompt": f"Professional financial recap video showing: {financial_analysis.get('summary', 'Financial achievements')}. Animated charts, money visualizations, success indicators.", "voiceover": f"Financial highlights: {financial_analysis.get('summary', 'Great year')}" }] # Combine all scene prompts into one comprehensive prompt combined_prompt = f"""Create a cinematic financial year-in-review video: {scenes[0]['video_prompt']} Style: Professional, celebratory, modern financial visualization with animated charts, money effects, and success celebrations.""" try: if LumaAI is None: raise Exception("Video generation not available - lumaai not installed") luma_api_key = os.environ.get("LUMAAI_API_KEY") if not luma_api_key: raise Exception("LUMAAI_API_KEY environment variable is not set") client = LumaAI(auth_token=luma_api_key) generation = client.generations.create( prompt=combined_prompt, model="ray-2", aspect_ratio="16:9", resolution="720p", duration="5s", loop=False ) # Poll for completion max_attempts = 30 # 2.5 minutes max for attempt in range(max_attempts): status = client.generations.get(generation.id) if status.state == "completed": return status.assets.video elif status.state == "failed": failure_reason = getattr(status, 'failure_reason', 'Unknown failure') raise Exception(f"Video generation failed: {failure_reason}") elif status.state in ["dreaming", "in_progress", "pending"]: # Still processing, continue polling pass else: # Unknown state, log and continue print(f"Unknown generation state: {status.state}") time.sleep(5) raise Exception("Video generation timed out after 2.5 minutes") except Exception as e: error_msg = str(e).lower() if "unavailable" in error_msg or "rate limit" in error_msg or "quota" in error_msg: # Service temporarily unavailable - return None instead of failing return None else: raise Exception(f"Video generation failed: {str(e)}") def clean_text(text: str) -> str: text = text.encode('utf-8', errors='ignore').decode('utf-8') text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', text) text = re.sub(r'Page \d+ of \d+', '', text, flags=re.IGNORECASE) text = re.sub(r'^\d+\s*$', '', text, flags=re.MULTILINE) text = re.sub(r'^[-_=]{3,}$', '', text, flags=re.MULTILINE) text = re.sub(r'\.{3,}', '...', text) text = re.sub(r'_{2,}', ' ', text) text = re.sub(r'-{3,}', ' - ', text) text = re.sub(r'\t+', ' ', text) text = re.sub(r' +', ' ', text) text = re.sub(r'\n{3,}', '\n\n', text) text = re.sub(r'(\d+)\s*\.\s*(\d+)', r'\1.\2', text) text = re.sub(r'([a-z])\s*-\s*([a-z])', r'\1\2', text) lines = [] for line in text.split('\n'): line = line.strip() if len(line) > 2: lines.append(line) elif line == '': lines.append(line) text = '\n'.join(lines) seen = set() final_lines = [] for line in text.split('\n'): line_lower = line.lower().strip() if len(line_lower) < 50 and line_lower in seen: continue if len(line_lower) > 5: seen.add(line_lower) final_lines.append(line) return '\n'.join(final_lines).strip()