File size: 8,461 Bytes
38ab39c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1c47eb5
 
 
 
38ab39c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
from fastapi import FastAPI, Form, HTTPException, BackgroundTasks
from fastapi.responses import Response
from cora_engine import CoraEngine
from cora_curator import CoraCurator
from cora_vision import CoraVision
from cora_memory import CoraMemory
import io
import os
import uuid

from pydantic import BaseModel

app = FastAPI(title="Cora API", description="Fake Historical Archive Generator")
engine = CoraEngine()
curator = CoraCurator()
vision = CoraVision()
memory = CoraMemory()

class AgentPrompt(BaseModel):
    prompt: str
    use_curator: bool = True

@app.get("/health")
def health_check():
    """Checks if the engine and HF connection are ready."""
    status = {"status": "online", "model": engine.MODEL_ID}
    if not engine.client:
        status["status"] = "offline (engine)"
    if not curator.client:
        status["curator"] = "offline"
    else:
        status["curator"] = curator.MODEL_ID
    
    # Check Vision/Memory (simple check if initialized)
    status["vision"] = "online" if vision.clip_model else "offline"
    status["memory"] = "online" if memory.client else "offline"
    
    return status

def archive_generation(image, prompt):
    """Helper to save image and metadata to Visual Memory."""
    try:
        filename = f"{uuid.uuid4()}.png"
        filepath = os.path.join("archive_images", filename)
        
        # Save to disk
        image.save(filepath)
        
        # Analyze (Vision)
        embedding = vision.embed_image(image)
        tags = vision.detect_tags(image)
        
        # Save to Memory (Vector DB)
        memory.save(filepath, embedding, prompt, tags)
        print(f"✅ Background Archiving Complete: {filepath} with tags {tags}")
    except Exception as e:
        print(f"❌ Background Archiving Failed: {e}")

@app.post("/agent/generate")
async def agent_generate(request: AgentPrompt, background_tasks: BackgroundTasks):
    """

    Agent-friendly endpoint receiving JSON.

    Returns the raw PNG image.

    """
    try:
        # Validate input
        if not request.prompt or not request.prompt.strip():
            raise HTTPException(
                status_code=400, 
                detail="Prompt cannot be empty. Please provide a description."
            )
        
        # 1. Curate (Refine Prompt)
        final_prompt = request.prompt
        if request.use_curator:
            try:
                final_prompt = curator.refine_prompt(request.prompt)
            except Exception as curator_error:
                print(f"Curator failed: {curator_error}, using original prompt")
                # Fallback to original if curator fails
                final_prompt = request.prompt
        
        # 2. Generate
        result = engine.generate_from_text(final_prompt)
        
        # 3. Archive (Background Task)
        # We pass a copy or the object itself. Since PIL images are in memory,
        # we need to be careful. However, 'result' is a PIL Image.
        # It's safer to pass the image object. background_tasks will run after return.
        background_tasks.add_task(archive_generation, result, final_prompt)
        
        # Return as PNG
        img_byte_arr = io.BytesIO()
        result.save(img_byte_arr, format='PNG')
        return Response(content=img_byte_arr.getvalue(), media_type="image/png")
        
    except HTTPException:
        raise
    except ValueError as e:
        # User input errors
        raise HTTPException(
            status_code=400, 
            detail=f"Invalid request: {str(e)}"
        )
    except RuntimeError as e:
        # Server/API errors
        error_msg = str(e).lower()
        if "timeout" in error_msg or "took too long" in error_msg:
            raise HTTPException(
                status_code=500,
                detail="Image generation timed out. Try a simpler prompt."
            )
        else:
            raise HTTPException(
                status_code=500, 
                detail=f"Generation failed: {str(e)}"
            )
    except Exception as e:
        print(f"Unexpected server error: {e}")
        raise HTTPException(
            status_code=500, 
            detail="An unexpected error occurred. Please try again."
        )

@app.post("/v1/archive")
async def generate_archive(

    background_tasks: BackgroundTasks,

    prompt: str = Form(...)

):
    """

    Generates an 'archive' style image from text.

    """
    try:
        # 1. Curate (Auto-refine for UI)
        enhanced_prompt = curator.refine_prompt(prompt)

        # 2. Generate
        result = engine.generate_from_text(enhanced_prompt)
        
        # 3. Archive (Background Task)
        background_tasks.add_task(archive_generation, result, enhanced_prompt)
        
        # Return as PNG
        img_byte_arr = io.BytesIO()
        result.save(img_byte_arr, format='PNG')
        return Response(content=img_byte_arr.getvalue(), media_type="image/png")
        
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
    except RuntimeError as e:
        raise HTTPException(status_code=500, detail=str(e))
    except Exception as e:
        print(f"Server Error: {e}")
        raise HTTPException(status_code=500, detail="Internal Server Error")

class SearchQuery(BaseModel):
    query: str
    limit: int = 10

@app.post("/curator/search")
async def curator_search(request: SearchQuery):
    """

    Semantic search for the UI gallery with intelligent filtering.

    """
    try:
        # 1. Embed query
        emb = vision.embed_text(request.query)
        if not emb:
            return {"results": []}
        
        # 2. Extract potential tags from query for filtering
        query_lower = request.query.lower()
        tag_hints = []
        source_hint = None
        
        # Detect cultural/temporal keywords
        cultural_markers = {
            "roman": ["roman", "rome"],
            "greek": ["greek", "greece", "hellenic"],
            "egyptian": ["egypt", "egyptian"],
            "medieval": ["medieval", "middle ages"],
            "renaissance": ["renaissance"], 
            "enlightment century": ["enlightment century"],
            "industrial revolution":["industrial revolution"],
            "modern times" : ["modern times", "20th century", "21st century"],
        }
        
        for culture, keywords in cultural_markers.items():
            if any(kw in query_lower for kw in keywords):
                tag_hints.extend(keywords)
        
        # 3. Use hybrid search if we detected cultural markers
        if tag_hints:
            results = memory.search_hybrid(emb, k=request.limit, tag_filter=tag_hints)
        else:
            # Fallback to pure semantic if no specific markers
            results = memory.search_by_vector(emb, k=request.limit)
        
        # 4. Format result
        images = []
        if results['ids']:
            ids = results['ids'][0]
            metadatas = results['metadatas'][0]
            distances = results['distances'][0]
            
            for i, uid in enumerate(ids):
                path = metadatas[i].get('path')
                tags = metadatas[i].get('tags')
                prompt = metadatas[i].get('prompt')
                if path and os.path.exists(path):
                    # Convert local path to URL
                    filename = os.path.basename(path)
                    image_url = f"http://localhost:8000/archive_images/{filename}"
                    
                    images.append({
                        "path": image_url,  # Now a URL, not a local path
                        "tags": tags,
                        "prompt": prompt,
                        "score": float(distances[i])
                    })
        return {"results": images}
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

# Mount static files to serve images to UI if needed
from fastapi.staticfiles import StaticFiles
if not os.path.exists("archive_images"):
    os.makedirs("archive_images")
app.mount("/archive_images", StaticFiles(directory="archive_images"), name="archive_images")

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)