|
|
import os |
|
|
import re |
|
|
import io |
|
|
import time |
|
|
import tiktoken |
|
|
from dotenv import load_dotenv |
|
|
from google import genai |
|
|
from google.genai import types |
|
|
from PIL import Image |
|
|
try: |
|
|
from lumaai import LumaAI |
|
|
except ImportError: |
|
|
LumaAI = None |
|
|
|
|
|
load_dotenv() |
|
|
|
|
|
|
|
|
def get_gemini_client():
    """Build a Gemini API client from the ``GEMINI_API_KEY`` environment variable.

    Returns:
        A ``genai.Client`` constructed with the configured API key.

    Raises:
        ValueError: If ``GEMINI_API_KEY`` is missing or empty.
    """
    key = os.environ.get("GEMINI_API_KEY")
    if key:
        return genai.Client(api_key=key)

    # No usable key: fail loudly with a hint on how to configure it.
    raise ValueError(
        "GEMINI_API_KEY environment variable is not set. "
        "Please set it with: export GEMINI_API_KEY='your-api-key'"
    )
|
|
|
|
|
|
|
|
def count_tokens(text: str, model: str = "cl100k_base") -> int:
    """Return how many tokens ``text`` encodes to under a tiktoken encoding.

    Args:
        text: The string to measure.
        model: Name handed to ``tiktoken.get_encoding``. Despite the
            parameter name, this is an encoding name such as
            ``"cl100k_base"``.

    Returns:
        The length of the encoded token sequence.
    """
    token_ids = tiktoken.get_encoding(model).encode(text)
    return len(token_ids)
|
|
|
|
|
|
|
|
def chunk_text(
    text: str,
    chunk_size: int = 500,
    chunk_overlap: int = 50,
    encoding_name: str = "cl100k_base"
) -> list[str]:
    """Split ``text`` into overlapping chunks measured in tokens.

    The text is tokenized with the named tiktoken encoding, cut into windows
    of ``chunk_size`` tokens that advance by ``chunk_size - chunk_overlap``
    tokens, and each window is decoded back to a string.

    Args:
        text: The text to split.
        chunk_size: Maximum number of tokens per chunk; must be positive.
        chunk_overlap: Number of tokens shared between consecutive chunks;
            must satisfy ``0 <= chunk_overlap < chunk_size``.
        encoding_name: Name handed to ``tiktoken.get_encoding``.

    Returns:
        The decoded chunks in document order; an empty list when ``text``
        encodes to no tokens.

    Raises:
        ValueError: If ``chunk_size`` or ``chunk_overlap`` would prevent the
            window from advancing. Previously such values silently returned
            only the first chunk or never made progress.
    """
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError(
            "chunk_overlap must satisfy 0 <= chunk_overlap < chunk_size"
        )

    encoding = tiktoken.get_encoding(encoding_name)
    tokens = encoding.encode(text)
    total = len(tokens)

    step = chunk_size - chunk_overlap
    chunks = []
    for start in range(0, total, step):
        end = start + chunk_size
        chunks.append(encoding.decode(tokens[start:end]))
        # Once a window reaches the end of the text, stop: another window
        # would contain nothing but the overlap tail already emitted.
        if end >= total:
            break

    return chunks
|
|
|
|
|
|
|
|
def generate_embedding(client: genai.Client, text: str) -> list[float]:
    """Embed one document passage with ``models/text-embedding-004``.

    Args:
        client: Gemini client used to issue the ``embed_content`` request.
        text: The passage to embed.

    Returns:
        The ``values`` of the first embedding in the response.
    """
    # Task type RETRIEVAL_DOCUMENT marks this as stored/indexed content.
    document_config = types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT")
    response = client.models.embed_content(
        model="models/text-embedding-004",
        contents=text,
        config=document_config,
    )
    first_embedding = response.embeddings[0]
    return first_embedding.values
|
|
|
|
|
|
|
|
def generate_query_embedding(client: genai.Client, query: str) -> list[float]:
    """Embed a search query with ``models/text-embedding-004``.

    Args:
        client: Gemini client used to issue the ``embed_content`` request.
        query: The query string to embed.

    Returns:
        The ``values`` of the first embedding in the response.
    """
    # Task type RETRIEVAL_QUERY marks this as the search side of retrieval.
    query_config = types.EmbedContentConfig(task_type="RETRIEVAL_QUERY")
    response = client.models.embed_content(
        model="models/text-embedding-004",
        contents=query,
        config=query_config,
    )
    first_embedding = response.embeddings[0]
    return first_embedding.values
|
|
|
|
|
|
|
|
def generate_batch_embeddings(
    client: genai.Client,
    texts: list[str],
    batch_size: int = 100
) -> list[list[float]]:
    """Embed many document passages, sending them in fixed-size batches.

    Args:
        client: Gemini client used to issue the ``embed_content`` requests.
        texts: Passages to embed.
        batch_size: Maximum number of passages sent per request.

    Returns:
        One embedding vector per response embedding, concatenated in batch
        order.
    """
    vectors = []

    for offset in range(0, len(texts), batch_size):
        # One request per slice of at most ``batch_size`` passages.
        response = client.models.embed_content(
            model="models/text-embedding-004",
            contents=texts[offset:offset + batch_size],
            config=types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT"),
        )
        vectors += [item.values for item in response.embeddings]

    return vectors
|
|
|
|
|
|
|
|
def generate_answer(
    client: genai.Client,
    question: str,
    context: str,
    model: str = "gemini-2.5-flash",
    image_data: bytes = None,
    image_mime_type: str = None,
    conversation_history: list = None
) -> str:
    """Generate a SabiTax reply to ``question`` using Gemini.

    Three prompts are possible: a short greeting reply, a short
    self-introduction, or the full tax-assistant prompt that embeds the
    retrieved ``context`` and up to the last six conversation messages.

    Args:
        client: Gemini client used for ``generate_content``.
        question: The user's message.
        context: Retrieved document text; only used by the full prompt.
        model: Gemini model name.
        image_data: Optional raw image bytes; when given, the image is opened
            with PIL and sent alongside the prompt.
        image_mime_type: Accepted for interface compatibility; not used here.
        conversation_history: Optional list of messages, each indexed with
            ``"role"`` and ``"content"``; only used by the full prompt.

    Returns:
        The ``text`` of the model response.

    Raises:
        Exception: With an "overloaded" message when every retry failed on a
            503/UNAVAILABLE/overloaded error (the original error is chained).
            Any other error from the API call is re-raised unchanged.
    """
    question_lower = question.lower().strip()

    # Require a word boundary after the greeting so that questions such as
    # "history of VAT" or "higher rate" are not mistaken for "hi".
    is_greeting = re.match(
        r"(?:hello|hi|hey|good morning|good afternoon|good evening|greetings)\b",
        question_lower,
    ) is not None

    # Same idea for identity questions: "who are your clients" must not match
    # "who are you".
    is_name_question = re.search(
        r"\b(?:what is your name|who are you|what are you called|what's your name"
        r"|tell me your name|introduce yourself)\b",
        question_lower,
    ) is not None

    if is_greeting:
        prompt = f"""You are SabiTax, a friendly and conversational legal and tax expert assistant specializing in Nigerian law.
The user has greeted you. Respond naturally and warmly, like you're chatting with a friend. Introduce yourself as SabiTax in a casual, friendly way, and let them know you're here to help with any questions about Nigerian tax laws.

User: {question}

Respond conversationally - be warm, natural, and brief (2-3 sentences). Use a friendly, approachable tone."""
    elif is_name_question:
        prompt = f"""You are SabiTax, a friendly and conversational legal and tax expert assistant specializing in Nigerian law and taxation.

User: {question}

Respond naturally and conversationally. Introduce yourself as SabiTax in a friendly, casual way. Explain that you help people understand Nigerian tax laws in simple terms, like you're explaining to a friend. Keep it brief, warm, and conversational."""
    else:
        history_text = ""
        if conversation_history:
            # Only the six most recent messages are replayed to the model.
            history_lines = []
            for msg in conversation_history[-6:]:
                role = "User" if msg["role"] == "user" else "You (SabiTax)"
                history_lines.append(f"{role}: {msg['content']}\n")
            history_text = (
                "\n\nPrevious conversation:\n" + "".join(history_lines) + "\n"
            )

        prompt = f"""You are SabiTax, Nigeria's comprehensive tax assistant. You help Nigerians with all aspects of tax compliance, from understanding laws to filing returns and analyzing documents.

Your expertise covers:
- **General Tax Questions**: Rates, deadlines, deductions, tax planning
- **Tax Calculations**: Personal income tax, company tax, VAT, capital gains tax
- **Form Guidance**: How to complete and file tax forms (Form A, Form B, etc.)
- **Compliance Requirements**: What to declare, when to file, penalties
- **Document Analysis**: Reviewing tax returns, financial statements, receipts
- **Tax Optimization**: Legal ways to minimize tax liability
- **Business Taxes**: Company registration, payroll taxes, VAT compliance

Your communication style:
- Professional yet approachable, like a trusted tax consultant
- Explain complex concepts in simple, everyday Nigerian English
- Use clear examples: "If you earn N3 million yearly, your tax is calculated as..."
- Be encouraging and patient with all tax-related questions
- Always emphasize compliance and accuracy

How you handle different types of questions:

**For General Tax Questions:**
- Provide accurate information from current Nigerian tax laws
- Break down calculations step-by-step
- Reference specific sections of tax acts
- Give practical examples relevant to Nigerian taxpayers

**For Document Analysis:**
- Identify the type of document and its tax purpose
- Extract key tax information (amounts, dates, taxpayer details)
- Check for compliance with Nigerian tax requirements
- Point out missing information or potential issues

**For Tax Calculations:**
- Use current tax rates and brackets
- Show step-by-step calculations
- Explain deductions and allowances
- Calculate final tax payable

**For Filing Guidance:**
- Explain which forms to use and when
- Guide through form completion
- Highlight common mistakes to avoid
- Provide filing deadlines and methods

Tax-specific guidelines:
- Always reference current legislation (2025 acts take precedence)
- Use Nigerian Naira (₦) for amounts
- Reference FIRS (Federal Inland Revenue Service) procedures
- Explain tax terms clearly: "Assessable profit means your business income after expenses"
- Highlight tax incentives and reliefs available to Nigerians
- Emphasize voluntary compliance over penalties

Important rules:
- Base answers on the provided context from indexed tax documents
- If context doesn't have enough information, clearly state this
- When analyzing documents, be thorough but practical
- Suggest consulting a professional tax advisor for complex situations
- Always promote ethical tax practices and full compliance
- If something is unclear, ask for clarification rather than assuming

{history_text}Context from documents:
{context}

Question: {question}

Provide comprehensive, accurate tax guidance. Whether it's a general question, document analysis, or calculation help, explain everything clearly and help the user understand their Nigerian tax obligations."""

    if image_data:
        img = Image.open(io.BytesIO(image_data))
        contents = [prompt, img]
    else:
        contents = prompt

    max_retries = 3
    retry_delay = 2

    for attempt in range(max_retries):
        try:
            response = client.models.generate_content(
                model=model,
                contents=contents
            )
            return response.text
        except Exception as e:
            error_str = str(e)
            is_overloaded = (
                "503" in error_str
                or "UNAVAILABLE" in error_str
                or "overloaded" in error_str.lower()
            )
            if not is_overloaded:
                # Bare raise keeps the original traceback intact.
                raise
            if attempt == max_retries - 1:
                raise Exception(
                    "Gemini service is temporarily overloaded. Please try again in a few moments."
                ) from e
            # Exponential backoff: 2s, then 4s.
            time.sleep(retry_delay * (2 ** attempt))

    raise Exception("Failed to generate answer after multiple attempts")
|
|
|
|
|
|
|
|
def analyze_financial_documents(document_texts: list, image_data_list: list) -> dict:
    """Summarize financial documents with Gemini and return a metrics dict.

    Args:
        document_texts: Document strings, joined with blank lines into one
            body of text. May be empty or ``None``.
        image_data_list: When truthy, a note asking for image analysis is
            appended to the text.

    Returns:
        A dict with the keys ``income``, ``expenses``, ``savings``,
        ``achievements`` and ``summary``. On success only ``summary`` carries
        model output (first 1000 characters); the other keys hold a fixed
        placeholder. With no input, or on any error, the values are fixed
        status messages.
    """
    metric_keys = ("income", "expenses", "savings", "achievements")

    combined = "\n\n".join(document_texts) if document_texts else ""

    # NOTE(review): the images themselves are never passed to the model below;
    # only this textual note is added to the prompt.
    if image_data_list:
        combined += "\n\n[Image Analysis]: Please analyze any financial data visible in the images."

    if not combined.strip():
        return {
            "income": "No income data found",
            "expenses": "No expense data found",
            "savings": "No savings data found",
            "achievements": "No financial achievements identified",
            "summary": "Unable to analyze financial documents"
        }

    # Only the first 10000 characters of the combined text reach the model.
    analysis_prompt = f"""Analyze this financial document and extract key financial information. Focus on:

1. **Income**: Total income, sources, trends
2. **Expenses**: Major expense categories and amounts
3. **Savings**: Savings rate, emergency fund, investments
4. **Achievements**: Financial milestones, debt reduction, investment growth
5. **Summary**: Overall financial health and key insights

Document content:
{combined[:10000]}

Provide a structured analysis with specific amounts where available. If amounts aren't specified, use descriptive terms like "significant" or "moderate"."""

    try:
        response = get_gemini_client().models.generate_content(
            model="gemini-2.5-flash",
            contents=analysis_prompt
        )
        model_text = response.text

        report = dict.fromkeys(metric_keys, "Analysis completed - see detailed summary")
        report["summary"] = model_text[:1000]
        return report

    except Exception as e:
        # Best-effort contract: failures are reported in the dict, not raised.
        failure = dict.fromkeys(metric_keys, "Error analyzing documents")
        failure["summary"] = f"Analysis failed: {str(e)}"
        return failure
|
|
|
|
|
|
|
|
def create_video_script(financial_analysis: dict) -> dict:
    """Create a professional video script using Gemini 2.5 Flash.

    The model is asked for a JSON object describing scenes, music mood, theme
    and duration. The reply is parsed as JSON after removing an optional
    Markdown code fence (```json ... ```), which would otherwise make every
    fenced reply fall through to the canned script.

    Args:
        financial_analysis: Dict whose ``summary``, ``income``, ``expenses``,
            ``savings`` and ``achievements`` entries are interpolated into
            the prompt; missing entries get default phrases.

    Returns:
        The parsed script dict from the model. If the reply is not valid JSON
        or is not a JSON object, a single-scene fallback script
        (``total_duration`` "15 seconds"). If the request itself fails for
        any reason, a different single-scene fallback (``total_duration``
        "10 seconds"). This function does not raise.
    """
    import json

    summary = financial_analysis.get("summary", "A year of financial growth and achievements")
    income = financial_analysis.get("income", "Steady income growth")
    expenses = financial_analysis.get("expenses", "Managed expenses effectively")
    savings = financial_analysis.get("savings", "Built savings successfully")
    achievements = financial_analysis.get("achievements", "Achieved financial goals")

    script_prompt = f"""Create a professional year-in-review financial video script based on this user data:

FINANCIAL DATA:
- Summary: {summary}
- Income: {income}
- Expenses: {expenses}
- Savings: {savings}
- Achievements: {achievements}

Create a cinematic 15-20 second video with 4-6 scenes. Output JSON with:

{{
"scenes": [
{{
"scene_number": 1,
"duration": "3-4 seconds",
"description": "Brief scene description",
"video_prompt": "Detailed prompt for video generation AI",
"voiceover": "Voiceover text for this scene"
}}
],
"music_mood": "uplifting, motivational, professional",
"overall_theme": "Financial success and growth",
"total_duration": "15-20 seconds"
}}

Make it professional, celebratory, and focused on financial achievements. Use Nigerian context where appropriate."""

    try:
        client = get_gemini_client()

        response = client.models.generate_content(
            model="gemini-2.5-flash",
            contents=script_prompt
        )

        raw = response.text.strip()

        # Unwrap a Markdown code fence around the JSON, if the model added one.
        fenced = re.match(r"^```(?:json)?\s*(.*?)\s*```$", raw, flags=re.DOTALL | re.IGNORECASE)
        if fenced:
            raw = fenced.group(1)

        try:
            script_data = json.loads(raw)
        except json.JSONDecodeError:
            script_data = None

        # Callers use dict methods on the result, so a JSON list or scalar is
        # treated the same as unparseable output.
        if isinstance(script_data, dict):
            return script_data

        return {
            "scenes": [
                {
                    "scene_number": 1,
                    "duration": "5 seconds",
                    "description": "Financial overview and achievements",
                    "video_prompt": f"Create a professional financial recap video showing: {summary}. Use animated charts, money visualizations, and success indicators.",
                    "voiceover": f"This year brought remarkable financial growth: {summary}"
                }
            ],
            "music_mood": "uplifting, professional",
            "overall_theme": "Financial success story",
            "total_duration": "15 seconds"
        }

    except Exception:
        # Best-effort contract: any request failure yields a canned script.
        return {
            "scenes": [
                {
                    "scene_number": 1,
                    "duration": "5 seconds",
                    "description": "Financial success visualization",
                    "video_prompt": f"Professional financial recap: {summary}. Show growing charts, money animations, success celebrations.",
                    "voiceover": f"A year of financial achievements: {summary}"
                }
            ],
            "music_mood": "motivational",
            "overall_theme": "Financial growth",
            "total_duration": "10 seconds"
        }
|
|
|
|
|
|
|
|
def generate_yearly_wrap_video(financial_analysis: dict) -> str:
    """Generate yearly financial wrap video using Gemini + LumaAI pipeline.

    A script is requested from ``create_video_script``; the first scene's
    ``video_prompt`` is wrapped in a style prompt and submitted to LumaAI.
    The generation is then polled every 5 seconds, up to 30 times.

    Args:
        financial_analysis: Analysis dict; its ``summary`` entry is used for
            the fallback prompt when the script has no usable first scene.

    Returns:
        The video asset reported by the completed generation, or ``None``
        when the failure message mentions "unavailable", "rate limit" or
        "quota".

    Raises:
        Exception: "Video generation failed: ..." for every other failure
            (missing ``lumaai`` package, missing ``LUMAAI_API_KEY``, failed
            generation, timeout); the underlying error is chained.
    """
    script_data = create_video_script(financial_analysis)

    # The script comes from model output, so its shape is not trusted: any
    # missing or malformed piece falls back to a prompt built from the summary.
    scenes = script_data.get("scenes") if isinstance(script_data, dict) else None
    first_scene = scenes[0] if isinstance(scenes, list) and scenes else None
    video_prompt = first_scene.get("video_prompt") if isinstance(first_scene, dict) else None
    if not video_prompt:
        video_prompt = f"Professional financial recap video showing: {financial_analysis.get('summary', 'Financial achievements')}. Animated charts, money visualizations, success indicators."

    combined_prompt = f"""Create a cinematic financial year-in-review video:

{video_prompt}

Style: Professional, celebratory, modern financial visualization with animated charts, money effects, and success celebrations."""

    try:
        if LumaAI is None:
            raise Exception("Video generation not available - lumaai not installed")

        luma_api_key = os.environ.get("LUMAAI_API_KEY")
        if not luma_api_key:
            raise Exception("LUMAAI_API_KEY environment variable is not set")

        client = LumaAI(auth_token=luma_api_key)

        generation = client.generations.create(
            prompt=combined_prompt,
            model="ray-2",
            aspect_ratio="16:9",
            resolution="720p",
            duration="5s",
            loop=False
        )

        # 30 polls x 5 seconds = 2.5 minutes, matching the timeout message.
        max_attempts = 30
        for attempt in range(max_attempts):
            status = client.generations.get(generation.id)

            if status.state == "completed":
                return status.assets.video
            if status.state == "failed":
                failure_reason = getattr(status, 'failure_reason', 'Unknown failure')
                raise Exception(f"Video generation failed: {failure_reason}")
            if status.state not in ("dreaming", "in_progress", "pending"):
                # Unrecognized states are reported but polling continues.
                print(f"Unknown generation state: {status.state}")

            time.sleep(5)

        raise Exception("Video generation timed out after 2.5 minutes")

    except Exception as e:
        error_msg = str(e).lower()
        if "unavailable" in error_msg or "rate limit" in error_msg or "quota" in error_msg:
            # Capacity problems are treated as "no video", not as an error.
            return None
        raise Exception(f"Video generation failed: {str(e)}") from e
|
|
|
|
|
|
|
|
def clean_text(text: str) -> str:
    """Normalize extracted document text for indexing.

    Steps, in order:
      1. Drop characters that cannot be encoded as UTF-8 and control
         characters other than tab, newline and carriage return.
      2. Remove "Page X of Y" markers, digit-only lines and separator lines.
      3. Shorten runs of dots, underscores and dashes.
      4. Collapse tabs, repeated spaces and runs of blank lines.
      5. Repair decimals split by spaces ("1 . 5" -> "1.5") and words
         hyphenated across a line break ("assess-\\nment" -> "assessment").
      6. Drop lines of one or two characters.
      7. Drop repeated short lines (under 50 characters, case-insensitive),
         keeping the first occurrence.

    Args:
        text: Raw text to clean.

    Returns:
        The cleaned text with surrounding whitespace stripped and no more
        than one consecutive blank line.
    """
    text = text.encode('utf-8', errors='ignore').decode('utf-8')
    text = re.sub(r'[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]', '', text)

    text = re.sub(r'Page \d+ of \d+', '', text, flags=re.IGNORECASE)
    text = re.sub(r'^\d+\s*$', '', text, flags=re.MULTILINE)
    text = re.sub(r'^[-_=]{3,}$', '', text, flags=re.MULTILINE)

    text = re.sub(r'\.{3,}', '...', text)
    text = re.sub(r'_{2,}', ' ', text)
    text = re.sub(r'-{3,}', ' - ', text)

    text = re.sub(r'\t+', ' ', text)
    text = re.sub(r' +', ' ', text)
    text = re.sub(r'\n{3,}', '\n\n', text)

    # Rejoin decimals only within a line: matching across a newline would
    # fuse a sentence-ending number with a number starting the next line.
    text = re.sub(r'(\d+)[ \t]*\.[ \t]*(\d+)', r'\1.\2', text)
    # Rejoin only words broken by a hyphen at a line end. Hyphens inside a
    # line ("non-resident", "income - expenses") are real and must be kept.
    text = re.sub(r'([a-z])-[ \t]*\r?\n[ \t]*([a-z])', r'\1\2', text)

    # Keep blank lines (paragraph breaks) and lines longer than two chars.
    kept_lines = []
    for line in text.split('\n'):
        line = line.strip()
        if not line or len(line) > 2:
            kept_lines.append(line)

    # Remove repeated short lines such as running headers and footers.
    seen = set()
    final_lines = []
    for line in kept_lines:
        line_lower = line.lower().strip()
        if len(line_lower) < 50 and line_lower in seen:
            continue
        if len(line_lower) > 5:
            seen.add(line_lower)
        final_lines.append(line)

    # Dropping lines above can leave new runs of blank lines; collapse again.
    result = re.sub(r'\n{3,}', '\n\n', '\n'.join(final_lines))
    return result.strip()
|
|
|