|
|
""" |
|
|
OCR and portfolio parsing module. |
|
|
|
|
|
Handles: |
|
|
- Text extraction from portfolio screenshots using Tesseract OCR |
|
|
- Parsing tickers and amounts using regex |
|
|
- JSON validation for user-edited portfolio data |
|
|
- Image preprocessing for better OCR accuracy |
|
|
""" |
|
|
|
|
|
import re |
|
|
import json |
|
|
from typing import Dict, Tuple, Optional |
|
|
from PIL import Image, ImageEnhance, ImageFilter |
|
|
import pytesseract |
|
|
import numpy as np |
|
|
|
|
|
|
|
|
# Maximum number of distinct tickers validate_portfolio_json accepts in one
# portfolio; larger inputs are rejected with an error message.
MAX_TICKERS = 100
|
|
|
|
|
|
|
|
|
|
|
# Generic ticker/amount regexes. parse_portfolio runs each of them over the
# whole OCR text with re.findall (flags: re.MULTILINE | re.IGNORECASE) after
# the Revolut-specific parser has returned nothing.
TICKER_PATTERNS = [
    # Ticker followed by an amount, optional currency symbol in between:
    # "AAPL 5000", "AAPL $5,000.00".
    r'([A-Z]{1,5})\s*[\$€£]?\s*([\d,]+\.?\d*)',

    # Amount followed by a ticker: "$5,000 AAPL".
    r'[\$€£]?\s*([\d,]+\.?\d*)\s+([A-Z]{1,5})',

    # Ticker on one line, amount on the next line.
    r'([A-Z]{1,5})\s*\n\s*[\$€£]?\s*([\d,]+\.?\d*)',

    # Ticker and amount separated by ":" or "|": "AAPL | $5,000.00".
    r'([A-Z]{1,5})\s*[:|]\s*[\$€£]?\s*([\d,]+\.?\d*)',

    # "<number> TICKER - <number>" rows.
    # NOTE(review): this pattern has a single capture group (the ticker), so
    # re.findall returns plain strings for it. Those do not fit the
    # (group1, group2) unpacking in parse_portfolio and therefore contribute
    # no entries -- confirm whether an amount group is missing here.
    r'[\d,]+\.?\d*\s+([A-Z]{2,5})\s*[-–]\s*[\d,]+',
]
|
|
|
|
|
|
|
|
|
|
|
# Two-line Revolut row as a single regex: group 1 is an amount immediately
# followed by a currency symbol before a line break, group 2 is a 2-5 letter
# ticker followed by "-" or an en dash.
# NOTE(review): nothing in the visible part of this module references this
# constant (parse_revolut_format uses its own line-based regexes) -- confirm
# whether it is still needed.
REVOLUT_PATTERN = r'([\d,]+\.?\d*)\s*[\$€£]\s*\n.*?\s+([A-Z]{2,5})\s*[-–]'
|
|
|
|
|
|
|
|
def is_dark_theme(image: Image.Image, threshold: float = 128) -> bool:
    """
    Detect if image uses dark theme (dark background, light text).

    The image is converted to grayscale and the mean brightness of its
    central region (from 1/4 to 3/4 of the width and height) is compared
    against ``threshold``.

    Args:
        image: PIL Image object
        threshold: Brightness cut-off on the 0-255 grayscale range; a mean
            strictly below it counts as dark. Defaults to 128.

    Returns:
        True if dark theme detected, False otherwise
    """
    gray = image.convert('L')

    width, height = gray.size
    sample_region = gray.crop((
        width // 4,
        height // 4,
        3 * width // 4,
        3 * height // 4,
    ))

    pixels = np.array(sample_region)
    if pixels.size == 0:
        # A 1-pixel-wide or 1-pixel-high image yields an empty central crop;
        # fall back to the whole image instead of averaging nothing (NaN).
        pixels = np.array(gray)
    if pixels.size == 0:
        # Zero-sized image: nothing to measure, treat as not dark.
        return False

    # Convert numpy.bool_ to a plain bool to honour the annotated return type.
    return bool(np.mean(pixels) < threshold)
|
|
|
|
|
|
|
|
def preprocess_image(image: Image.Image) -> Image.Image:
    """
    Prepare a screenshot for OCR.

    Steps, in order:
    - Grayscale conversion (dark-theme images are colour-inverted first so
      the text ends up dark on a light background)
    - Contrast enhancement by a factor of 2.0
    - Sharpening
    - Upscaling with LANCZOS resampling when either side is below 800 px

    Args:
        image: PIL Image object

    Returns:
        Preprocessed PIL Image object
    """
    if is_dark_theme(image):
        from PIL import ImageOps
        grayscale = ImageOps.invert(image.convert('RGB')).convert('L')
    else:
        grayscale = image.convert('L')

    boosted = ImageEnhance.Contrast(grayscale).enhance(2.0)
    sharpened = boosted.filter(ImageFilter.SHARPEN)

    width, height = sharpened.size
    if width >= 800 and height >= 800:
        # Already large enough; no resize needed.
        return sharpened

    # Scale uniformly so that both sides reach at least 800 px.
    scale = max(800 / width, 800 / height)
    target_size = (int(width * scale), int(height * scale))
    return sharpened.resize(target_size, Image.Resampling.LANCZOS)
|
|
|
|
|
|
|
|
def extract_text_from_image(image: Image.Image) -> Tuple[Optional[str], Optional[str]]:
    """
    Run Tesseract OCR on a portfolio screenshot.

    The image is preprocessed with preprocess_image and then recognised with
    ``--oem 3 --psm 6``; if that yields only whitespace, a second attempt is
    made with ``--oem 3 --psm 4``.

    Args:
        image: PIL Image object

    Returns:
        Tuple of (extracted_text, error_message)
        - If successful: (text, None)
        - If failed: (None, error_message)
    """
    # Tesseract configurations, tried in this order until one returns text.
    ocr_configs = (r'--oem 3 --psm 6', r'--oem 3 --psm 4')

    try:
        # Raises TesseractNotFoundError early when the binary is missing.
        pytesseract.get_tesseract_version()

        prepared = preprocess_image(image)

        for ocr_config in ocr_configs:
            text = pytesseract.image_to_string(prepared, config=ocr_config)
            if text.strip():
                return text, None

        return None, "No text detected in image. Please upload a clearer screenshot or enter data manually."

    except pytesseract.TesseractNotFoundError:
        return None, "OCR engine (Tesseract) not available. Please check installation."
    except Exception as exc:
        # Boundary handler: any other failure is reported as a message.
        return None, f"OCR failed: {str(exc)}"
|
|
|
|
|
|
|
|
# Line 2 of a Revolut row, anchored at the line start: "<shares> TICKER<sep>".
_REVOLUT_TICKER_LINE_RE = re.compile(r'^[\d,]+[.,]?\d*\s*[A-Z]{2,5}[\s\-–:*«»]')

# Same shape, unanchored, capturing the ticker symbol.
_REVOLUT_TICKER_RE = re.compile(r'[\d,]+[.,]?\d*\s*([A-Z]{2,5})[\s\-–:*«»]')

# An amount followed by a currency symbol. Group 1 is only set when the
# number is introduced by "-" / en dash (a negative amount); group 2 is the
# amount text that gets converted.
_REVOLUT_VALUE_RE = re.compile(
    r'(?<![\-–])([\-–][\d.,]+)?([\d\s,]+(?:[.,]\d{1,2})?)[:\']?\s*[\$€£]'
)

# Upper-case tokens that look like tickers but are not.
_REVOLUT_NON_TICKERS = frozenset({'AM', 'PM', 'USD', 'EUR', 'GBP', 'JPY', 'CHF'})

# Amounts below this are not accepted as a position value.
# NOTE(review): presumably filters out prices/changes picked up by mistake.
_REVOLUT_MIN_VALUE = 50


def _revolut_line_value(line: str) -> Optional[float]:
    """Return the first non-negative currency amount on ``line``, or None."""
    # Skip amounts that carry a minus sign. A plain lookbehind is not enough:
    # the search would simply restart one digit later inside the same number
    # ("-1234,56 $" would be read as "234,56").
    value_match = next(
        (m for m in _REVOLUT_VALUE_RE.finditer(line) if not m.group(1)),
        None,
    )
    if value_match is None:
        return None

    # The regex accepts any whitespace as digit grouping ("3 212,85"), so
    # remove all of it -- tabs and non-breaking spaces included.
    digits = re.sub(r'\s', '', value_match.group(2))

    if len(digits) > 2 and not re.search(r'[.,]', digits):
        # OCR dropped the decimal separator ("172312"): the last two digits
        # are taken as the decimals.
        digits = digits[:-2] + '.' + digits[-2:]
    else:
        # Decimal comma -> decimal point.
        digits = digits.replace(',', '.')

    try:
        amount = float(digits)
    except ValueError:
        return None

    return amount if amount >= _REVOLUT_MIN_VALUE else None


def parse_revolut_format(text: str) -> Dict[str, float]:
    """
    Parse Revolut-specific format.

    Revolut format (typically 2 lines per stock):
    Line 1: [icon] Company Name [portfolio_value]$
    Line 2: [shares] TICKER[separator] [price_per_share]$ [change%]

    Examples:
    Line 1: "@ Micron Technology 3 212,85 $"
    Line 2: "8,31 MU» 386,56 $ 4 109,73%"

    Handles variations:
    - Whitespace-grouped numbers: "3 256,40" (any whitespace character)
    - Different separators after ticker: "-", ":", "*", "»", "«"
    - Numbers without decimals: "172312" (read as 1723.12)
    - Negative amounts (prefixed with "-" or an en dash) are ignored

    For every line carrying a value of at least 50, the next two lines are
    searched for a ticker. The first value seen for a ticker wins.

    Args:
        text: Extracted text from OCR

    Returns:
        Dictionary mapping tickers to amounts
    """
    portfolio: Dict[str, float] = {}
    if not text:
        return portfolio

    lines = text.split('\n')

    i = 0
    while i < len(lines):
        current_line = lines[i].strip()
        next_i = i + 1

        # Blank lines carry nothing. Ticker lines are skipped so that their
        # per-share price is not mistaken for a position value.
        if current_line and not _REVOLUT_TICKER_LINE_RE.match(current_line):
            amount = _revolut_line_value(current_line)
            if amount is not None:
                # The ticker normally sits on the next line; allow one extra
                # line of OCR noise in between.
                for lookahead in (1, 2):
                    if i + lookahead >= len(lines):
                        break
                    ticker_match = _REVOLUT_TICKER_RE.search(lines[i + lookahead].strip())
                    if not ticker_match:
                        continue
                    ticker = ticker_match.group(1)
                    if ticker in _REVOLUT_NON_TICKERS:
                        continue
                    # Keep the first amount seen for a ticker.
                    portfolio.setdefault(ticker, amount)
                    # Resume after the consumed ticker line.
                    next_i = i + lookahead + 1
                    break

        i = next_i

    return portfolio
|
|
|
|
|
|
|
|
def parse_portfolio(text: str) -> Dict[str, float]:
    """
    Parse portfolio from extracted text using multiple regex patterns.

    Tries various patterns to handle different screenshot formats:
    - Revolut format (priority; its result is returned as-is when non-empty)
    - Ticker followed by amount: "AAPL 5000" or "AAPL $5,000.00"
    - Amount followed by ticker: "$5,000 AAPL"
    - Multi-line format: ticker on one line, amount on next
    - With separators: "AAPL | $5,000.00"

    Commas in amounts are treated as thousands separators. Amounts must be
    greater than 1; when a ticker is matched several times the largest
    amount is kept. Currency codes, "ID", "AM"/"PM", month abbreviations and
    tokens longer than five letters are discarded as false positives.

    Args:
        text: Extracted text from OCR

    Returns:
        Dictionary mapping tickers to amounts: {ticker: amount}
        Returns empty dict if no valid tickers found
    """
    if not text:
        return {}

    revolut_portfolio = parse_revolut_format(text)
    if revolut_portfolio:
        return revolut_portfolio

    # Exact tokens that are never tickers. Kept as a set of whole words: the
    # regex form '^JAN|FEB|...|DEC$' binds the anchors to the first and last
    # alternative only and so rejects every ticker that merely starts with a
    # month abbreviation (MARA, NOVA, ...).
    false_positives = frozenset({
        'ID', 'USD', 'EUR', 'GBP', 'JPY', 'CHF',
        'AM', 'PM',
        'JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN',
        'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC',
    })

    portfolio: Dict[str, float] = {}

    for pattern in TICKER_PATTERNS:
        for match in re.findall(pattern, text, re.MULTILINE | re.IGNORECASE):
            # Patterns with a single capture group produce plain strings;
            # only (x, y) pairs carry both a ticker and an amount.
            if not (isinstance(match, tuple) and len(match) == 2):
                continue
            group1, group2 = match

            # Either order is possible: decide which group is the number.
            if re.match(r'^[\d,.]+$', group1):
                amount_str, ticker = group1, group2.upper()
            else:
                ticker, amount_str = group1.upper(), group2

            if not re.match(r'^[A-Z]{1,10}$', ticker):
                continue
            if ticker in false_positives or len(ticker) > 5:
                continue

            # Strip currency symbols, thousands commas and whitespace.
            clean_amount = re.sub(r'[\$€£,\s]', '', amount_str)
            try:
                amount = float(clean_amount)
            except ValueError:
                # e.g. a lone "," matched by the amount group
                continue

            # Ignore trivial amounts; keep the largest value per ticker.
            if amount > 1 and (ticker not in portfolio or amount > portfolio[ticker]):
                portfolio[ticker] = amount

    return portfolio
|
|
|
|
|
|
|
|
def validate_portfolio_json(
    json_str: str,
    max_tickers: Optional[int] = None,
) -> Tuple[bool, Optional[Dict[str, float]], str]:
    """
    Validate user-edited portfolio JSON.

    Expected format: {"AAPL": 5000, "GOOGL": 3000, ...}

    Rules:
    - The document must be a JSON object with at least one entry.
    - Tickers are 1-10 characters long and uppercase.
    - Amounts must convert to a finite float greater than zero. JSON
      booleans and NaN / Infinity are rejected; numeric strings such as
      "5000" are accepted.

    Args:
        json_str: JSON string to validate
        max_tickers: Maximum number of tickers allowed. Defaults to the
            module-level MAX_TICKERS when omitted.

    Returns:
        Tuple of (is_valid, parsed_dict, error_message)
        - If valid: (True, portfolio_dict, "")
        - If invalid: (False, None, error_message)
    """
    import math

    if not json_str or not json_str.strip():
        return False, None, "JSON is empty"

    try:
        data = json.loads(json_str)

        if not isinstance(data, dict):
            return False, None, "JSON must be a dictionary/object, not a list or other type"

        portfolio = {}
        for ticker, amount in data.items():
            if not isinstance(ticker, str):
                return False, None, f"Ticker '{ticker}' must be a string"

            # Length first: "".isupper() is False, so an empty ticker would
            # otherwise be reported as a casing problem.
            if len(ticker) < 1 or len(ticker) > 10:
                return False, None, f"Ticker '{ticker}' length should be between 1-10 characters"

            if not ticker.isupper():
                return False, None, f"Ticker '{ticker}' should be uppercase (e.g., 'AAPL' not 'aapl')"

            # float(True) == 1.0, so JSON booleans must be rejected explicitly.
            if isinstance(amount, bool):
                return False, None, f"Amount for {ticker} must be a number, got: {amount}"

            try:
                amount_float = float(amount)
            except (TypeError, ValueError):
                return False, None, f"Amount for {ticker} must be a number, got: {amount}"

            # json.loads accepts NaN / Infinity, and NaN slips past "<= 0".
            if not math.isfinite(amount_float):
                return False, None, f"Amount for {ticker} must be a finite number, got: {amount_float}"

            if amount_float <= 0:
                return False, None, f"Amount for {ticker} must be positive, got: {amount_float}"

            portfolio[ticker] = amount_float

        if len(portfolio) == 0:
            return False, None, "Portfolio must contain at least one ticker"

        limit = MAX_TICKERS if max_tickers is None else max_tickers
        if len(portfolio) > limit:
            return False, None, f"Portfolio exceeds maximum of {limit} tickers"

        return True, portfolio, ""

    except json.JSONDecodeError as e:
        return False, None, f"Invalid JSON format: {str(e)}"
    except Exception as e:
        # Boundary handler: this validator reports problems, it never raises.
        return False, None, f"Validation error: {str(e)}"
|
|
|
|
|
|
|
|
def merge_portfolios(portfolios: list[Dict[str, float]]) -> Dict[str, float]:
    """
    Combine several portfolios into one.

    Amounts of a ticker that occurs in more than one portfolio are added
    together. The input dictionaries are not modified.

    Args:
        portfolios: List of portfolio dictionaries

    Returns:
        New dictionary mapping each ticker to its summed amount
    """
    totals = {}

    for holdings in portfolios:
        for symbol, value in holdings.items():
            if symbol not in totals:
                # First occurrence: store the amount as-is.
                totals[symbol] = value
                continue
            totals[symbol] += value

    return totals
|
|
|
|
|
|
|
|
def format_portfolio_json(portfolio: Dict[str, float], indent: int = 2) -> str:
    """
    Serialise a portfolio as human-readable JSON with tickers sorted.

    Args:
        portfolio: Dictionary of {ticker: amount}
        indent: Number of spaces for indentation

    Returns:
        Formatted JSON string
    """
    serialized = json.dumps(
        portfolio,
        sort_keys=True,  # stable, alphabetical ticker order
        indent=indent,
    )
    return serialized
|
|
|