File size: 5,718 Bytes
1bd7131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
"""
Debug script to test Gemini service with API keys from environment.
Keys should be in GEMINI_KEYS environment variable, comma-separated.

Usage:
    GEMINI_KEYS="key1,key2,key3" python tests/debug_gemini_service.py
"""
import os
import sys
import asyncio
import logging
import base64
from dotenv import load_dotenv

# Load environment variables via python-dotenv at import time, i.e. before
# main() reads GEMINI_KEYS with os.getenv().
load_dotenv()

# Add parent directory to path. This must stay above the `services` import
# below (presumably the `services` package lives in that parent directory --
# confirm against the project layout).
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

from services.gemini_service import GeminiService, MODELS

# Configure logging: INFO level on the root logger so the logger.info()
# progress and summary messages emitted by this script are shown.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Test image path: "test.jpg" in the parent of the directory containing this
# script. abspath() is applied first (matching the sys.path setup above) so the
# result does not depend on the current working directory when the interpreter
# hands the script a relative __file__.
TEST_IMAGE_PATH = os.path.join(
    os.path.dirname(os.path.dirname(os.path.abspath(__file__))),
    "test.jpg",
)


def load_test_image():
    """Read the test image from disk and base64-encode it.

    Returns:
        A ``(base64_string, mime_type)`` tuple. The MIME type is always
        ``"image/jpeg"``. If no file exists at ``TEST_IMAGE_PATH`` an error
        is logged and ``(None, None)`` is returned instead.
    """
    if os.path.exists(TEST_IMAGE_PATH):
        with open(TEST_IMAGE_PATH, "rb") as image_file:
            raw_bytes = image_file.read()

        encoded = base64.b64encode(raw_bytes).decode("utf-8")
        logger.info(f"Loaded test image: {TEST_IMAGE_PATH} ({len(raw_bytes)} bytes)")
        return encoded, "image/jpeg"

    logger.error(f"Test image not found: {TEST_IMAGE_PATH}")
    return None, None


async def test_generate_text(service: GeminiService, key_index: int):
    """Run a one-word text-generation request and report success.

    Args:
        service: Service instance whose ``generate_text`` coroutine is awaited.
        key_index: Number used to tag the log lines for this key.

    Returns:
        True if the request and the logging of the first 100 characters of
        its result complete without raising; False if any ``Exception`` is
        raised (the error is logged).
    """
    logger.info(f"[Key {key_index}] Testing text generation...")
    succeeded = False
    try:
        reply = await service.generate_text("Say hello in one word.")
        # Slicing stays inside the try so a non-sliceable reply counts as a failure.
        logger.info(f"[Key {key_index}] Text generation result: {reply[:100]}...")
        succeeded = True
    except Exception as exc:
        logger.error(f"[Key {key_index}] Text generation failed: {exc}")
    return succeeded


async def test_analyze_image(service: GeminiService, key_index: int, base64_image: str, mime_type: str):
    """Ask the service for a one-sentence description of the image and report success.

    Args:
        service: Service instance whose ``analyze_image`` coroutine is awaited.
        key_index: Number used to tag the log lines for this key.
        base64_image: Base64 image payload forwarded to the service.
        mime_type: MIME type forwarded alongside the payload.

    Returns:
        True if the request and the logging of the first 100 characters of
        its result complete without raising; False if any ``Exception`` is
        raised (the error is logged).
    """
    logger.info(f"[Key {key_index}] Testing image analysis...")
    succeeded = False
    try:
        description = await service.analyze_image(
            prompt="Describe this image in one sentence.",
            mime_type=mime_type,
            base64_image=base64_image,
        )
        # Slicing stays inside the try so a non-sliceable result counts as a failure.
        logger.info(f"[Key {key_index}] Image analysis result: {description[:100]}...")
        succeeded = True
    except Exception as exc:
        logger.error(f"[Key {key_index}] Image analysis failed: {exc}")
    return succeeded


async def test_generate_animation_prompt(service: GeminiService, key_index: int, base64_image: str, mime_type: str):
    """Request an animation prompt for the image and report success.

    Args:
        service: Service instance whose ``generate_animation_prompt``
            coroutine is awaited.
        key_index: Number used to tag the log lines for this key.
        base64_image: Base64 image payload forwarded to the service.
        mime_type: MIME type forwarded alongside the payload.

    Returns:
        True if the request and the logging of the first 100 characters of
        its result complete without raising; False if any ``Exception`` is
        raised (the error is logged).
    """
    logger.info(f"[Key {key_index}] Testing animation prompt generation...")
    succeeded = False
    try:
        animation_prompt = await service.generate_animation_prompt(
            mime_type=mime_type,
            base64_image=base64_image,
        )
        # Slicing stays inside the try so a non-sliceable result counts as a failure.
        logger.info(f"[Key {key_index}] Animation prompt result: {animation_prompt[:100]}...")
        succeeded = True
    except Exception as exc:
        logger.error(f"[Key {key_index}] Animation prompt generation failed: {exc}")
    return succeeded


def _mask_api_key(api_key: str) -> str:
    """Return a log-safe preview of *api_key*.

    Keys longer than 14 characters keep the ``first10...last4`` format. For
    shorter keys the head and tail slices would overlap and print the whole
    secret, so a fixed mask is returned instead.
    """
    if len(api_key) <= 14:
        return "****"
    return f"{api_key[:10]}...{api_key[-4:]}"


async def test_key(api_key: str, key_index: int, base64_image: str, mime_type: str):
    """Test all basic operations with a single API key.

    Args:
        api_key: Key passed to ``GeminiService``.
        key_index: Number used to tag log lines and the result.
        base64_image: Base64 image payload forwarded to the image tests.
        mime_type: MIME type forwarded alongside the payload.

    Returns:
        A dict with ``key_index``, ``key_preview`` and ``valid``. When the
        service cannot be constructed it also carries ``error`` (the
        exception text). Otherwise it carries one boolean per test
        (``text_generation``, ``image_analysis``, ``animation_prompt``), and
        ``valid`` is True only if all three passed.
    """
    # Build the masked preview once; the raw key is never logged.
    key_preview = _mask_api_key(api_key)

    logger.info(f"\n{'='*50}")
    logger.info(f"Testing Key {key_index}: {key_preview}")
    logger.info(f"{'='*50}")

    try:
        service = GeminiService(api_key)
    except Exception as e:
        logger.error(f"[Key {key_index}] Failed to initialize service: {e}")
        return {
            "key_index": key_index,
            "key_preview": key_preview,
            "valid": False,
            "error": str(e),
        }

    # The three checks run one after another, in this order, for the same service.
    results = {
        "key_index": key_index,
        "key_preview": key_preview,
        "text_generation": await test_generate_text(service, key_index),
        "image_analysis": await test_analyze_image(service, key_index, base64_image, mime_type),
        "animation_prompt": await test_generate_animation_prompt(service, key_index, base64_image, mime_type),
    }

    results["valid"] = all(
        results[test_name]
        for test_name in ("text_generation", "image_analysis", "animation_prompt")
    )

    return results


async def main():
    """Test every key listed in GEMINI_KEYS and log a per-key summary.

    Loads the test image, reads the comma-separated GEMINI_KEYS environment
    variable, runs ``test_key`` for each non-blank key in order, then logs how
    many keys passed and which individual tests failed for the others.

    Returns early (after logging an error) when the test image cannot be
    loaded, GEMINI_KEYS is unset or empty, or it contains no non-blank keys.
    """
    # Load test image
    base64_image, mime_type = load_test_image()
    if not base64_image:
        logger.error("Cannot run tests without test image. Please add test.jpg to project root.")
        return

    gemini_keys_str = os.getenv("GEMINI_KEYS", "")

    if not gemini_keys_str:
        logger.error("GEMINI_KEYS environment variable not set.")
        logger.info("Usage: GEMINI_KEYS='key1,key2,key3' python tests/debug_gemini_service.py")
        return

    # Drop surrounding whitespace and empty entries (e.g. from a trailing comma).
    keys = [k.strip() for k in gemini_keys_str.split(",") if k.strip()]

    if not keys:
        logger.error("No valid keys found in GEMINI_KEYS.")
        return

    logger.info(f"Found {len(keys)} API key(s) to test.")
    logger.info(f"Available models: {MODELS}")

    # Keys are tested one at a time; key numbers in the logs are 1-based.
    all_results = []
    for i, key in enumerate(keys, start=1):
        all_results.append(await test_key(key, i, base64_image, mime_type))

    # Summary
    logger.info(f"\n{'='*50}")
    logger.info("SUMMARY")
    logger.info(f"{'='*50}")

    valid_count = sum(1 for r in all_results if r.get("valid", False))
    logger.info(f"Valid keys: {valid_count}/{len(keys)}")

    for result in all_results:
        # Check mark / ballot X written as escapes so the markers survive
        # re-encoding of this file (the literals had turned into mojibake).
        status = "\u2713 VALID" if result.get("valid") else "\u2717 INVALID"
        logger.info(f"  Key {result['key_index']}: {status}")
        if not result.get("valid"):
            # Results from a failed service initialization carry no per-test
            # entries, hence the membership check.
            for test_name in ["text_generation", "image_analysis", "animation_prompt"]:
                if test_name in result and not result[test_name]:
                    logger.info(f"    - {test_name}: FAILED")


# Run the async entry point only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    asyncio.run(main())