# sabitax/rag/utils.py
# nexusbert's picture
# Upload 14 files
# d43d504 verified
import os
import re
import io
import time
import tiktoken
from dotenv import load_dotenv
from google import genai
from google.genai import types
from PIL import Image
try:
from lumaai import LumaAI
except ImportError:
LumaAI = None
load_dotenv()
def get_gemini_client():
    """Build a Gemini client authenticated from the environment.

    Reads the ``GEMINI_API_KEY`` environment variable and passes it to
    ``genai.Client``.

    Returns:
        The ``genai.Client`` created with the configured API key.

    Raises:
        ValueError: If ``GEMINI_API_KEY`` is unset or empty.
    """
    key = os.environ.get("GEMINI_API_KEY")
    if key:
        return genai.Client(api_key=key)
    # Unset and empty-string keys are treated the same way.
    raise ValueError(
        "GEMINI_API_KEY environment variable is not set. "
        "Please set it with: export GEMINI_API_KEY='your-api-key'"
    )
def count_tokens(text: str, model: str = "cl100k_base") -> int:
    """Return how many tokens ``text`` encodes to.

    Args:
        text: The string to measure.
        model: Name handed to ``tiktoken.get_encoding``. Despite the
            parameter name, the value is used as an encoding name.

    Returns:
        The length of the token sequence produced for ``text``.
    """
    token_ids = tiktoken.get_encoding(model).encode(text)
    return len(token_ids)
def chunk_text(
    text: str,
    chunk_size: int = 500,
    chunk_overlap: int = 50,
    encoding_name: str = "cl100k_base"
) -> list[str]:
    """Split ``text`` into overlapping chunks measured in tokens.

    The text is tokenized with the named tiktoken encoding, cut into windows
    of at most ``chunk_size`` tokens, and each window is decoded back to a
    string. Consecutive windows share ``chunk_overlap`` tokens.

    Args:
        text: The text to split.
        chunk_size: Maximum number of tokens per chunk; must be positive.
        chunk_overlap: Number of tokens shared between neighbouring chunks;
            must satisfy ``0 <= chunk_overlap < chunk_size``.
        encoding_name: Name handed to ``tiktoken.get_encoding``.

    Returns:
        The decoded chunks in document order; an empty list when ``text``
        produces no tokens.

    Raises:
        ValueError: If ``chunk_size`` or ``chunk_overlap`` would prevent the
            window from advancing or would skip tokens.
    """
    # Validate first: an overlap >= chunk_size means the window can never
    # advance, and a negative overlap would leave gaps between chunks.
    if chunk_size <= 0:
        raise ValueError("chunk_size must be a positive integer")
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError(
            "chunk_overlap must be non-negative and smaller than chunk_size"
        )

    encoding = tiktoken.get_encoding(encoding_name)
    tokens = encoding.encode(text)
    total = len(tokens)
    stride = chunk_size - chunk_overlap

    chunks = []
    for start in range(0, total, stride):
        end = start + chunk_size
        chunks.append(encoding.decode(tokens[start:end]))
        # Once a window reaches the end of the text, any further window would
        # only repeat overlap tokens that were already emitted.
        if end >= total:
            break
    return chunks
def generate_embedding(client: genai.Client, text: str) -> list[float]:
    """Embed a single document passage.

    Calls ``client.models.embed_content`` with the
    ``models/text-embedding-004`` model and the ``RETRIEVAL_DOCUMENT`` task
    type.

    Args:
        client: Gemini client used to issue the request.
        text: The passage to embed.

    Returns:
        The ``values`` of the first embedding in the response.
    """
    response = client.models.embed_content(
        model="models/text-embedding-004",
        contents=text,
        config=types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT"),
    )
    first_embedding = response.embeddings[0]
    return first_embedding.values
def generate_query_embedding(client: genai.Client, query: str) -> list[float]:
    """Embed a search query.

    Same request as the document variant, except the task type is
    ``RETRIEVAL_QUERY``.

    Args:
        client: Gemini client used to issue the request.
        query: The user query to embed.

    Returns:
        The ``values`` of the first embedding in the response.
    """
    response = client.models.embed_content(
        model="models/text-embedding-004",
        contents=query,
        config=types.EmbedContentConfig(task_type="RETRIEVAL_QUERY"),
    )
    first_embedding = response.embeddings[0]
    return first_embedding.values
def generate_batch_embeddings(
    client: genai.Client,
    texts: list[str],
    batch_size: int = 100
) -> list[list[float]]:
    """Embed many document passages, sending them in fixed-size batches.

    Args:
        client: Gemini client used to issue the requests.
        texts: Passages to embed.
        batch_size: Maximum number of passages sent per request.

    Returns:
        One embedding vector per returned embedding, concatenated in batch
        order.
    """
    vectors = []
    for offset in range(0, len(texts), batch_size):
        window = texts[offset:offset + batch_size]
        response = client.models.embed_content(
            model="models/text-embedding-004",
            contents=window,
            config=types.EmbedContentConfig(task_type="RETRIEVAL_DOCUMENT"),
        )
        vectors += [embedding.values for embedding in response.embeddings]
    return vectors
def generate_answer(
    client: genai.Client,
    question: str,
    context: str,
    model: str = "gemini-2.5-flash",
    image_data: bytes = None,
    image_mime_type: str = None,
    conversation_history: list = None
) -> str:
    """Answer a user question as the SabiTax assistant.

    Three prompt variants are used:

    * a short greeting prompt when the question starts with a greeting word,
    * a self-introduction prompt when the user asks who the assistant is,
    * otherwise the full tax-assistant prompt, which embeds ``context`` and
      up to the last six entries of ``conversation_history``.

    Args:
        client: Gemini client used for ``models.generate_content``.
        question: The user's message.
        context: Retrieved document text inserted into the full prompt.
        model: Gemini model name.
        image_data: Optional raw image bytes; when given, the image is opened
            with PIL and sent alongside the prompt.
        image_mime_type: Accepted for interface compatibility; not read by
            this function.
        conversation_history: Optional list of dicts with ``"role"`` and
            ``"content"`` keys.

    Returns:
        The ``text`` attribute of the model response.

    Raises:
        Exception: If the service still reports an overload after all retry
            attempts. Any other error from the client propagates unchanged.
    """
    question_lower = question.lower().strip()

    # A greeting must be a whole word/phrase at the start of the message.
    # A plain prefix test would classify "History of VAT" or "Hire purchase"
    # as the greeting "hi" and answer without consulting the context.
    greeting_pattern = (
        r"(?:hello|hi|hey|good morning|good afternoon|good evening|greetings)\b"
    )
    name_questions = (
        "what is your name",
        "who are you",
        "what are you called",
        "what's your name",
        "tell me your name",
        "introduce yourself",
    )

    if re.match(greeting_pattern, question_lower):
        prompt = f"""You are SabiTax, a friendly and conversational legal and tax expert assistant specializing in Nigerian law.
The user has greeted you. Respond naturally and warmly, like you're chatting with a friend. Introduce yourself as SabiTax in a casual, friendly way, and let them know you're here to help with any questions about Nigerian tax laws.
User: {question}
Respond conversationally - be warm, natural, and brief (2-3 sentences). Use a friendly, approachable tone."""
    elif any(phrase in question_lower for phrase in name_questions):
        prompt = f"""You are SabiTax, a friendly and conversational legal and tax expert assistant specializing in Nigerian law and taxation.
User: {question}
Respond naturally and conversationally. Introduce yourself as SabiTax in a friendly, casual way. Explain that you help people understand Nigerian tax laws in simple terms, like you're explaining to a friend. Keep it brief, warm, and conversational."""
    else:
        history_text = ""
        if conversation_history:
            # Only the six most recent messages are replayed to the model.
            turns = []
            for msg in conversation_history[-6:]:
                role = "User" if msg["role"] == "user" else "You (SabiTax)"
                turns.append(f"{role}: {msg['content']}\n")
            history_text = "\n\nPrevious conversation:\n" + "".join(turns) + "\n"
        prompt = f"""You are SabiTax, Nigeria's comprehensive tax assistant. You help Nigerians with all aspects of tax compliance, from understanding laws to filing returns and analyzing documents.
Your expertise covers:
- **General Tax Questions**: Rates, deadlines, deductions, tax planning
- **Tax Calculations**: Personal income tax, company tax, VAT, capital gains tax
- **Form Guidance**: How to complete and file tax forms (Form A, Form B, etc.)
- **Compliance Requirements**: What to declare, when to file, penalties
- **Document Analysis**: Reviewing tax returns, financial statements, receipts
- **Tax Optimization**: Legal ways to minimize tax liability
- **Business Taxes**: Company registration, payroll taxes, VAT compliance
Your communication style:
- Professional yet approachable, like a trusted tax consultant
- Explain complex concepts in simple, everyday Nigerian English
- Use clear examples: "If you earn N3 million yearly, your tax is calculated as..."
- Be encouraging and patient with all tax-related questions
- Always emphasize compliance and accuracy
How you handle different types of questions:
**For General Tax Questions:**
- Provide accurate information from current Nigerian tax laws
- Break down calculations step-by-step
- Reference specific sections of tax acts
- Give practical examples relevant to Nigerian taxpayers
**For Document Analysis:**
- Identify the type of document and its tax purpose
- Extract key tax information (amounts, dates, taxpayer details)
- Check for compliance with Nigerian tax requirements
- Point out missing information or potential issues
**For Tax Calculations:**
- Use current tax rates and brackets
- Show step-by-step calculations
- Explain deductions and allowances
- Calculate final tax payable
**For Filing Guidance:**
- Explain which forms to use and when
- Guide through form completion
- Highlight common mistakes to avoid
- Provide filing deadlines and methods
Tax-specific guidelines:
- Always reference current legislation (2025 acts take precedence)
- Use Nigerian Naira (₦) for amounts
- Reference FIRS (Federal Inland Revenue Service) procedures
- Explain tax terms clearly: "Assessable profit means your business income after expenses"
- Highlight tax incentives and reliefs available to Nigerians
- Emphasize voluntary compliance over penalties
Important rules:
- Base answers on the provided context from indexed tax documents
- If context doesn't have enough information, clearly state this
- When analyzing documents, be thorough but practical
- Suggest consulting a professional tax advisor for complex situations
- Always promote ethical tax practices and full compliance
- If something is unclear, ask for clarification rather than assuming
{history_text}Context from documents:
{context}
Question: {question}
Provide comprehensive, accurate tax guidance. Whether it's a general question, document analysis, or calculation help, explain everything clearly and help the user understand their Nigerian tax obligations."""

    if image_data:
        img = Image.open(io.BytesIO(image_data))
        contents = [prompt, img]
    else:
        contents = prompt

    max_retries = 3
    retry_delay = 2
    for attempt in range(max_retries):
        try:
            response = client.models.generate_content(
                model=model,
                contents=contents
            )
            return response.text
        except Exception as e:
            error_str = str(e)
            overloaded = (
                "503" in error_str
                or "UNAVAILABLE" in error_str
                or "overloaded" in error_str.lower()
            )
            if not overloaded:
                # Not a transient overload: re-raise with the original traceback.
                raise
            if attempt == max_retries - 1:
                raise Exception(
                    "Gemini service is temporarily overloaded. Please try again in a few moments."
                ) from e
            # Exponential backoff: 2s, then 4s.
            time.sleep(retry_delay * (2 ** attempt))
    raise Exception("Failed to generate answer after multiple attempts")
def analyze_financial_documents(document_texts: list, image_data_list: list) -> dict:
    """Summarise financial documents with Gemini.

    The document texts are joined into one string. When images are supplied,
    only a textual note asking for image analysis is appended; the image
    data itself is not sent by this function. At most the first 10,000
    characters of the combined text are placed in the prompt.

    Args:
        document_texts: Extracted text of each document (may be empty/None).
        image_data_list: Image payloads; only its truthiness is used here.

    Returns:
        A dict with the keys ``income``, ``expenses``, ``savings``,
        ``achievements`` and ``summary``. On success the first four hold a
        fixed placeholder and ``summary`` holds the first 1,000 characters of
        the model response. With no input, or on any error, the values
        describe that situation instead; this function does not raise for
        client failures.
    """
    categories = ("income", "expenses", "savings", "achievements")

    combined = "\n\n".join(document_texts) if document_texts else ""
    if image_data_list:
        combined += "\n\n[Image Analysis]: Please analyze any financial data visible in the images."

    # Nothing to analyse: report that without calling the model.
    if not combined.strip():
        return {
            "income": "No income data found",
            "expenses": "No expense data found",
            "savings": "No savings data found",
            "achievements": "No financial achievements identified",
            "summary": "Unable to analyze financial documents"
        }

    analysis_prompt = f"""Analyze this financial document and extract key financial information. Focus on:
1. **Income**: Total income, sources, trends
2. **Expenses**: Major expense categories and amounts
3. **Savings**: Savings rate, emergency fund, investments
4. **Achievements**: Financial milestones, debt reduction, investment growth
5. **Summary**: Overall financial health and key insights
Document content:
{combined[:10000]}
Provide a structured analysis with specific amounts where available. If amounts aren't specified, use descriptive terms like "significant" or "moderate"."""

    try:
        gemini = get_gemini_client()
        reply = gemini.models.generate_content(
            model="gemini-2.5-flash",
            contents=analysis_prompt
        )
        # The per-category fields are placeholders; the model output is only
        # surfaced (truncated) through "summary".
        result = dict.fromkeys(categories, "Analysis completed - see detailed summary")
        result["summary"] = reply.text[:1000]
        return result
    except Exception as exc:
        failure = dict.fromkeys(categories, "Error analyzing documents")
        failure["summary"] = f"Analysis failed: {str(exc)}"
        return failure
def create_video_script(financial_analysis: dict) -> dict:
    """Create a year-in-review video script with Gemini 2.5 Flash.

    The model is asked to return a JSON object describing the scenes. The
    reply is parsed as JSON after removing a surrounding Markdown code fence,
    if present. When the reply cannot be parsed into a JSON object, a
    single-scene script built from the summary is returned; when anything
    else fails (client creation, the request itself), a shorter single-scene
    script is returned. This function does not raise for those failures.

    Args:
        financial_analysis: Dict that may contain ``summary``, ``income``,
            ``expenses``, ``savings`` and ``achievements``; missing keys fall
            back to generic text.

    Returns:
        A dict with ``scenes``, ``music_mood``, ``overall_theme`` and
        ``total_duration`` in the fallback cases, or whatever JSON object the
        model returned.
    """
    import json

    summary = financial_analysis.get("summary", "A year of financial growth and achievements")
    income = financial_analysis.get("income", "Steady income growth")
    expenses = financial_analysis.get("expenses", "Managed expenses effectively")
    savings = financial_analysis.get("savings", "Built savings successfully")
    achievements = financial_analysis.get("achievements", "Achieved financial goals")

    script_prompt = f"""Create a professional year-in-review financial video script based on this user data:
FINANCIAL DATA:
- Summary: {summary}
- Income: {income}
- Expenses: {expenses}
- Savings: {savings}
- Achievements: {achievements}
Create a cinematic 15-20 second video with 4-6 scenes. Output JSON with:
{{
"scenes": [
{{
"scene_number": 1,
"duration": "3-4 seconds",
"description": "Brief scene description",
"video_prompt": "Detailed prompt for video generation AI",
"voiceover": "Voiceover text for this scene"
}}
],
"music_mood": "uplifting, motivational, professional",
"overall_theme": "Financial success and growth",
"total_duration": "15-20 seconds"
}}
Make it professional, celebratory, and focused on financial achievements. Use Nigerian context where appropriate."""

    try:
        client = get_gemini_client()
        response = client.models.generate_content(
            model="gemini-2.5-flash",
            contents=script_prompt
        )
        raw = response.text.strip()

        # Models often wrap JSON in a Markdown fence (```json ... ```), which
        # json.loads rejects; unwrap it so a valid script is not discarded.
        fenced = re.search(r"```(?:json)?\s*(.*?)```", raw, flags=re.DOTALL | re.IGNORECASE)
        if fenced:
            raw = fenced.group(1).strip()

        try:
            script_data = json.loads(raw)
        except json.JSONDecodeError:
            script_data = None

        # Callers use dict methods on the result, so only accept a JSON object.
        if isinstance(script_data, dict):
            return script_data

        # Fallback: create structured script from the summary text.
        return {
            "scenes": [
                {
                    "scene_number": 1,
                    "duration": "5 seconds",
                    "description": "Financial overview and achievements",
                    "video_prompt": f"Create a professional financial recap video showing: {summary}. Use animated charts, money visualizations, and success indicators.",
                    "voiceover": f"This year brought remarkable financial growth: {summary}"
                }
            ],
            "music_mood": "uplifting, professional",
            "overall_theme": "Financial success story",
            "total_duration": "15 seconds"
        }
    except Exception:
        # Ultimate fallback: script generation is best-effort, so any failure
        # still yields a usable single-scene script.
        return {
            "scenes": [
                {
                    "scene_number": 1,
                    "duration": "5 seconds",
                    "description": "Financial success visualization",
                    "video_prompt": f"Professional financial recap: {summary}. Show growing charts, money animations, success celebrations.",
                    "voiceover": f"A year of financial achievements: {summary}"
                }
            ],
            "music_mood": "motivational",
            "overall_theme": "Financial growth",
            "total_duration": "10 seconds"
        }
def generate_yearly_wrap_video(financial_analysis: dict) -> "str | None":
    """Generate a yearly financial wrap video using a Gemini + LumaAI pipeline.

    A script is obtained from ``create_video_script``; the ``video_prompt``
    of its first scene (or a generic prompt built from the summary when the
    script has no usable scene) is sent to LumaAI, and the generation is
    polled every 5 seconds for up to 30 attempts.

    Args:
        financial_analysis: Analysis dict; ``summary`` is used for the
            generic prompt.

    Returns:
        ``status.assets.video`` of the completed generation, or ``None`` when
        the error text indicates the service is unavailable, rate limited or
        out of quota.

    Raises:
        Exception: For every other failure, including a missing ``lumaai``
            package, a missing ``LUMAAI_API_KEY``, a failed generation, or a
            polling timeout.
    """
    # Step 1: Gemini creates the video script.
    script_data = create_video_script(financial_analysis)

    # Step 2: pick the prompt of the first scene. The script comes from model
    # output, so tolerate a missing/ill-typed "scenes" list or scene entry
    # instead of failing with KeyError/AttributeError.
    scenes = script_data.get("scenes") if isinstance(script_data, dict) else None
    scene_prompt = None
    if isinstance(scenes, list) and scenes and isinstance(scenes[0], dict):
        scene_prompt = scenes[0].get("video_prompt")
    if not scene_prompt:
        scene_prompt = f"Professional financial recap video showing: {financial_analysis.get('summary', 'Financial achievements')}. Animated charts, money visualizations, success indicators."

    # Only the first scene is used: a single 5-second clip is requested below.
    combined_prompt = f"""Create a cinematic financial year-in-review video:
{scene_prompt}
Style: Professional, celebratory, modern financial visualization with animated charts, money effects, and success celebrations."""

    try:
        if LumaAI is None:
            raise Exception("Video generation not available - lumaai not installed")
        luma_api_key = os.environ.get("LUMAAI_API_KEY")
        if not luma_api_key:
            raise Exception("LUMAAI_API_KEY environment variable is not set")

        client = LumaAI(auth_token=luma_api_key)
        generation = client.generations.create(
            prompt=combined_prompt,
            model="ray-2",
            aspect_ratio="16:9",
            resolution="720p",
            duration="5s",
            loop=False
        )

        # Poll for completion: 30 attempts x 5 seconds = 2.5 minutes max.
        max_attempts = 30
        for _ in range(max_attempts):
            status = client.generations.get(generation.id)
            if status.state == "completed":
                return status.assets.video
            if status.state == "failed":
                failure_reason = getattr(status, 'failure_reason', 'Unknown failure')
                raise Exception(f"Video generation failed: {failure_reason}")
            if status.state not in ("dreaming", "in_progress", "pending"):
                # Unknown state: report it and keep polling.
                print(f"Unknown generation state: {status.state}")
            time.sleep(5)
        raise Exception("Video generation timed out after 2.5 minutes")
    except Exception as e:
        message = str(e)
        lowered = message.lower()
        if "unavailable" in lowered or "rate limit" in lowered or "quota" in lowered:
            # Service temporarily unavailable - return None instead of failing.
            return None
        if message.startswith("Video generation"):
            # Raised above with a complete message; re-wrapping would yield
            # "Video generation failed: Video generation failed: ...".
            raise
        raise Exception(f"Video generation failed: {message}") from e
def clean_text(text: str) -> str:
    """Normalise text extracted from documents before chunking/indexing.

    Steps, in order:

    1. Drop characters that cannot be encoded as UTF-8 and control
       characters (newlines and tabs are kept at this stage).
    2. Remove page furniture: "Page X of Y", digit-only lines and rule lines
       made of ``-``, ``_`` or ``=``.
    3. Normalise dot leaders, underscore runs, long dashes, tabs, repeated
       spaces and runs of blank lines.
    4. Repair decimals split by spaces ("7 . 5" -> "7.5") and words
       hyphenated across a line break ("tax-\\npayer" -> "taxpayer").
    5. Strip each line and drop non-empty lines of one or two characters.
    6. Drop repeated short lines (under 50 characters, compared
       case-insensitively), such as running headers.

    Args:
        text: Raw extracted text.

    Returns:
        The cleaned text with surrounding whitespace removed.
    """
    text = text.encode("utf-8", errors="ignore").decode("utf-8")
    text = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f\x7f-\x9f]", "", text)

    # Page furniture.
    text = re.sub(r"Page \d+ of \d+", "", text, flags=re.IGNORECASE)
    text = re.sub(r"^\d+\s*$", "", text, flags=re.MULTILINE)
    text = re.sub(r"^[-_=]{3,}$", "", text, flags=re.MULTILINE)

    # Punctuation and whitespace normalisation.
    text = re.sub(r"\.{3,}", "...", text)
    text = re.sub(r"_{2,}", " ", text)
    text = re.sub(r"-{3,}", " - ", text)
    text = re.sub(r"\t+", " ", text)
    text = re.sub(r" +", " ", text)
    text = re.sub(r"\n{3,}", "\n\n", text)

    # Re-join decimals split by horizontal whitespace only; matching across a
    # newline would glue the end of one line to a number starting the next.
    text = re.sub(r"(\d+)[ \t]*\.[ \t]*(\d+)", r"\1.\2", text)

    # De-hyphenate only words broken across a line break. Removing every
    # hyphen between letters would corrupt real compounds ("self-employment")
    # and spaced dashes ("income - expenses").
    text = re.sub(r"([a-z])-[ \t]*\n\s*([a-z])", r"\1\2", text)

    # Keep blank lines (paragraph breaks) and lines longer than two characters.
    stripped_lines = (raw_line.strip() for raw_line in text.split("\n"))
    kept_lines = [line for line in stripped_lines if line == "" or len(line) > 2]

    # Drop repeated short lines. Only lines longer than five characters are
    # remembered, so blank and very short lines are never treated as repeats.
    seen = set()
    final_lines = []
    for line in kept_lines:
        key = line.lower()
        if len(key) < 50 and key in seen:
            continue
        if len(key) > 5:
            seen.add(key)
        final_lines.append(line)

    # Filtering can leave several blank lines in a row; collapse them again.
    cleaned = re.sub(r"\n{3,}", "\n\n", "\n".join(final_lines))
    return cleaned.strip()