File size: 20,454 Bytes
5ef3f54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
try:
    from smolagents import CodeAgent, ToolCallingAgent, InferenceClientModel as HfApiModel, Tool
except ImportError:
    from smolagents import CodeAgent, ToolCallingAgent, ApiModel as HfApiModel, Tool
from db import db_client
from vector import vector_model
import os
import httpx
import io
import time

class GraphQueryTool(Tool):
    """Agent tool that runs a Cypher query through the shared ``db_client``.

    Any extra keyword arguments given to :meth:`forward` are handed to
    ``db_client.query`` as the query-parameter mapping.
    """

    name = "graph_query"
    description = "Executes a Cypher query against the FalkorDB world memory. Pass parameters as keyword arguments."
    inputs = {"query": {"type": "string", "description": "The Cypher query to execute.", "nullable": True}}
    output_type = "string"

    def forward(self, query: str = None, **kwargs):
        """Execute ``query`` and return the stringified result.

        Args:
            query: Cypher text to run. Declared nullable, so it may arrive as
                ``None``.
            **kwargs: Query parameters, passed to ``db_client.query`` as a dict.

        Returns:
            ``str()`` of whatever ``db_client.query`` returns, or an
            ``"Error: ..."`` message when no usable query text was supplied.
        """
        # A missing/blank query would otherwise be sent to the database as-is;
        # report it the same way the other tools in this module report errors.
        if not isinstance(query, str) or not query.strip():
            return "Error: no Cypher query was provided."
        result = db_client.query(query, kwargs)
        return str(result)

    def validate_arguments(self):
        """Intentionally a no-op so ``forward`` may accept extra keyword arguments."""
        # Override to allow extra parameters in the agent's forward call
        pass

class VectorSearchTool(Tool):
    """Agent tool that looks up graph nodes by vector similarity to a text."""

    name = "vector_search"
    description = "Searches for nodes in the graph based on semantic similarity. Use index 'Rule' for game logic or 'Memory' for history."
    inputs = {
        "text": {"type": "string", "description": "The text to search for."},
        "index": {"type": "string", "description": "The vector index to use ('Rule', 'Memory').", "default": "Rule", "nullable": True}
    }
    output_type = "string"

    def forward(self, text: str, index: str = "Rule"):
        """Encode ``text`` and return the five closest nodes of ``index``.

        Args:
            text: Free text that is encoded with ``vector_model.encode``.
            index: Node label whose vector index is queried. ``None`` (the
                input is nullable) falls back to ``"Rule"``.

        Returns:
            ``str()`` of the query result, or an ``"Error: ..."`` message when
            ``index`` is not a plain identifier.
        """
        import re

        # The input is declared nullable; without this, None would be
        # interpolated into the query as the literal index name 'None'.
        index = index or "Rule"
        # The index name is spliced into the Cypher text (it is not sent as a
        # parameter), so only accept a plain identifier to rule out injection.
        if not isinstance(index, str) or not re.fullmatch(r"[A-Za-z_][A-Za-z0-9_]*", index):
            return f"Error: invalid index name {index!r}."

        vector = vector_model.encode(text)
        # Updated to use db.idx.vector.queryNodes for newer FalkorDB versions
        query = f"CALL db.idx.vector.queryNodes('{index}', 'embedding', 5, vecf32($vector)) YIELD node, score RETURN node.title, node.name, node.description, node.summary, node.content, score"
        result = db_client.query(query, {"vector": vector})
        return str(result)

class RegisterKnowledgeTool(Tool):
    """Agent tool that creates a graph node carrying both attributes and an embedding."""

    name = "register_knowledge"
    description = "Registers detailed information about an entity (Item, Skill, Location) in both the knowledge graph and vector database."
    inputs = {
        "label": {"type": "string", "description": "The type of node (e.g., 'Item', 'Skill', 'Location', 'NPC')."},
        "name": {"type": "string", "description": "The unique name of the entity."},
        "content": {"type": "string", "description": "Detailed textual description and mechanics for vector search."},
        "properties": {"type": "string", "description": "JSON string of additional attributes (e.g., '{ \"damage\": \"1d8\", \"type\": \"Slashing\" }').", "default": "{}", "nullable": True}
    }
    output_type = "string"

    def forward(self, label: str, name: str, content: str, properties: str = "{}"):
        """Create one ``label`` node with ``name``, ``content`` and an embedding.

        Args:
            label: Node label; must be a plain identifier because it is
                spliced into the Cypher text.
            name: Stored as both the ``name`` and ``id`` properties.
            content: Stored as ``content`` and encoded with
                ``vector_model.encode`` for the ``embedding`` property.
            properties: JSON object of extra attributes. Unparseable input,
                ``None`` or a non-object JSON value is treated as ``{}``.

        Returns:
            A success message (mentioning any property names that had to be
            skipped), or an ``"Error: ..."`` message for an invalid label.
        """
        import json
        import re

        identifier = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")

        # Labels cannot be passed as query parameters, so validate before
        # interpolating to keep agent-supplied text from altering the query.
        if not isinstance(label, str) or not identifier.fullmatch(label):
            return f"Error: invalid label {label!r}."

        # Best-effort parse: bad JSON or None simply means "no extra properties".
        try:
            parsed = json.loads(properties)
        except (TypeError, ValueError):
            parsed = {}
        if not isinstance(parsed, dict):
            parsed = {}

        # Property names end up in the query text both as map keys and as
        # $parameter names, so anything that is not an identifier is skipped.
        skipped = [key for key in parsed if not identifier.fullmatch(key)]
        props = {
            key: value
            for key, value in parsed.items()
            if key not in skipped and key != "embedding"
        }

        props["name"] = name
        props["id"] = name # Ensure id is set for consistent API retrieval
        props["content"] = content

        # Build Cypher CREATE query
        # Use vecf32 wrapper for the embedding property
        prop_parts = [f"{key}: ${key}" for key in props]
        prop_parts.append("embedding: vecf32($embedding)")
        prop_str = ", ".join(prop_parts)

        params = dict(props)
        params["embedding"] = vector_model.encode(content)

        query = f"CREATE (n:{label} {{{prop_str}}})"
        db_client.query(query, params)

        message = f"Successfully registered {label} '{name}' with vectorized content."
        if skipped:
            message += f" Skipped invalid property names: {skipped}."
        return message

class ImageGenerationTool(Tool):
    """Agent tool that requests an image from the Hugging Face Inference API."""

    name = "image_generation"
    description = "Generates an image based on a prompt using Hugging Face Inference API."
    inputs = {
        "prompt": {"type": "string", "description": "The description of the image to generate."},
        "model_id": {"type": "string", "description": "The model ID to use.", "default": "Anashel/rpg", "nullable": True}
    }
    output_type = "string"

    def forward(self, prompt: str, model_id: str = "Anashel/rpg"):
        """POST ``prompt`` to the model endpoint and report the outcome as text.

        Args:
            prompt: Description of the image to generate.
            model_id: Hub model id; an empty value or ``"disabled"``
                (case-insensitive) turns generation off.

        Returns:
            ``"IMAGE_GENERATION_DISABLED"``, ``"IMAGE_GENERATED_SUCCESSFULLY"``,
            ``"Error: <status> <body>"`` for a non-200 reply, or
            ``"Exception: <message>"`` if the request itself failed.
        """
        if not model_id or model_id.lower() == "disabled":
            return "IMAGE_GENERATION_DISABLED"
        api_url = f"https://api-inference.huggingface.co/models/{model_id}"
        # Only authenticate when a token is configured; the literal header
        # "Bearer None" would otherwise be sent when HF_TOKEN is unset.
        token = os.getenv("HF_TOKEN")
        headers = {"Authorization": f"Bearer {token}"} if token else {}
        try:
            with httpx.Client(timeout=60.0) as client:
                response = client.post(api_url, headers=headers, json={"inputs": prompt})
                # NOTE: only the status is reported; the response payload is
                # not stored or returned by this tool.
                if response.status_code == 200:
                    return "IMAGE_GENERATED_SUCCESSFULLY"
                return f"Error: {response.status_code} {response.text}"
        except Exception as e:
            # Tool boundary: surface any failure to the agent as text.
            return f"Exception: {str(e)}"

class AudioGenerationTool(Tool):
    """Agent tool that requests audio from the Hugging Face Inference API."""

    name = "audio_generation"
    description = "Generates ambient background music or sounds based on a prompt."
    inputs = {
        "prompt": {"type": "string", "description": "The description of the audio to generate."},
        "model_id": {"type": "string", "description": "The model ID to use.", "default": "cvssp/audioldm2", "nullable": True}
    }
    output_type = "string"

    def forward(self, prompt: str, model_id: str = "cvssp/audioldm2"):
        """POST ``prompt`` to the model endpoint and report the outcome as text.

        Args:
            prompt: Description of the audio to generate.
            model_id: Hub model id; an empty value or ``"disabled"``
                (case-insensitive) turns generation off.

        Returns:
            ``"AUDIO_GENERATION_DISABLED"``, ``"AUDIO_GENERATED_SUCCESSFULLY"``,
            ``"Error: <status> <body>"`` for a non-200 reply, or
            ``"Exception: <message>"`` if the request itself failed.
        """
        if not model_id or model_id.lower() == "disabled":
            return "AUDIO_GENERATION_DISABLED"
        api_url = f"https://api-inference.huggingface.co/models/{model_id}"
        # Only authenticate when a token is configured; the literal header
        # "Bearer None" would otherwise be sent when HF_TOKEN is unset.
        token = os.getenv("HF_TOKEN")
        headers = {"Authorization": f"Bearer {token}"} if token else {}
        try:
            with httpx.Client(timeout=60.0) as client:
                response = client.post(api_url, headers=headers, json={"inputs": prompt})
                # NOTE: only the status is reported; the response payload is
                # not stored or returned by this tool.
                if response.status_code == 200:
                    return "AUDIO_GENERATED_SUCCESSFULLY"
                return f"Error: {response.status_code} {response.text}"
        except Exception as e:
            # Tool boundary: surface any failure to the agent as text.
            return f"Exception: {str(e)}"

class SmartValidatorTool(Tool):
    """Agent tool that asks a language model to review a proposed Cypher update."""

    name = "smart_validator"
    description = "Validates proposed Cypher queries against game rules and world logic using a reasoning model."
    inputs = {
        "user_input": {
            "type": "string",
            "description": "The user's original action/input.",
            "nullable": True,
        },
        "proposed_cypher": {
            "type": "string",
            "description": "The Cypher query proposed by the Archivist.",
            "nullable": True,
        },
        "rules_context": {
            "type": "string",
            "description": "Relevant game rules retrieved from memory.",
            "nullable": True,
        },
    }
    output_type = "string"

    def __init__(self, model, *args, **kwargs):
        """Keep a reference to ``model`` and then run the base initialiser."""
        self.model = model
        super().__init__(*args, **kwargs)

    def forward(self, user_input: str = None, proposed_cypher: str = None, rules_context: str = None):
        """Send the review prompt to ``self.model`` and return its answer.

        Returns the reply's ``content`` attribute when it has one, otherwise
        ``str()`` of the reply.
        """
        review_prompt = f"""
        Review the following proposed database update for a roleplaying game.

        User Action: {user_input}
        Game Rules: {rules_context}
        Proposed Cypher: {proposed_cypher}

        Is this Cypher query logically sound according to the rules and user intent?
        Check for:
        1. Coordinates: Ensure x, y coordinates are updated if movement occurred.
           MOVEMENT RULE: A character can move a maximum of ONE grid square per interaction (e.g. from [0,0] to [0,1] or [1,1]). Teleporting multiple squares is ILLEGAL.
        2. Splash damage (if applicable).
        3. Illegal actions (e.g. dead NPCs acting).
        4. Missing environmental effects.

        If it's valid, return 'VALIDATED'.
        If it's missing something or incorrect, describe what needs to be changed in detail.
        """
        conversation = [{"role": "user", "content": review_prompt}]
        reply = self.model.generate(messages=conversation)
        return reply.content if hasattr(reply, 'content') else str(reply)

# Model ids for narrative/dialogue generation. Order matters: entry 0 is the
# default used by get_model(), run_recruiter(), create_registrar() and
# Narrator, and the remaining entries are tried in list order as fallbacks.
STORYTELLING_MODELS = [
    "meta-llama/Llama-3.1-8B-Instruct",
    "Qwen/Qwen2.5-7B-Instruct",
    "meta-llama/Llama-3.3-70B-Instruct",
    "Qwen/Qwen2.5-72B-Instruct",
    "Sao10K/L3-8B-Stheno-v3.2",
    "mistralai/Mistral-Nemo-Instruct-2407",
    "google/gemma-2-9b-it",
    "google/gemma-2-27b-it",
    "microsoft/Phi-3.5-mini-instruct",
    "mistralai/Mistral-7B-Instruct-v0.3"
]

# Model ids for the reasoning-oriented agents. Entry 0 is the default for
# create_archivist() and create_librarian(); the list order is their fallback
# order. Note that entry 0 is currently the same id as STORYTELLING_MODELS[0].
REASONING_MODELS = [
    "meta-llama/Llama-3.1-8B-Instruct",
    "Qwen/Qwen2.5-7B-Instruct",
    "meta-llama/Llama-3.3-70B-Instruct",
    "deepseek-ai/DeepSeek-R1-Distill-Llama-8B",
    "deepseek-ai/DeepSeek-R1-Distill-Qwen-7B",
    "mistralai/Mistral-7B-Instruct-v0.3",
    "Qwen/Qwen2.5-Coder-32B-Instruct"
]

# Image model ids. Not referenced by the code in this part of the file;
# presumably offered to callers as valid `model_id` choices for
# ImageGenerationTool -- TODO confirm against the importing module.
IMAGE_MODELS = [
    "black-forest-labs/FLUX.1-schnell",
    "stabilityai/stable-diffusion-xl-base-1.0",
    "Anashel/rpg",
    "ByteDance/SDXL-Lightning",
    "runwayml/stable-diffusion-v1-5",
    "stabilityai/stable-diffusion-2-1",
    "stabilityai/sdxl-turbo",
    "SG161222/RealVisXL_V4.0",
    "Lykon/AnyLoRA"
]

# Audio model ids. Not referenced by the code in this part of the file;
# presumably offered to callers as valid `model_id` choices for
# AudioGenerationTool -- TODO confirm against the importing module.
AUDIO_MODELS = [
    "facebook/musicgen-small",
    "facebook/musicgen-medium",
    "cvssp/audioldm2",
    "stabilityai/stable-audio-open-1.0",
    "facebook/audiogen-medium"
]

# Module-level memo of model clients, keyed by model id.
_model_cache = {}


def get_model(model_id=None):
    """Return the model client for ``model_id``, building it on first use.

    A falsy ``model_id`` selects ``STORYTELLING_MODELS[0]``. Clients are kept
    in ``_model_cache`` so repeated calls with the same id return the same
    object.
    """
    key = model_id or STORYTELLING_MODELS[0]
    try:
        return _model_cache[key]
    except KeyError:
        pass

    # InferenceClientModel is the preferred class for HF Inference API in latest smolagents
    hf_token = os.getenv("HF_TOKEN")
    client = HfApiModel(model_id=key, token=hf_token)
    _model_cache[key] = client
    return client

def create_archivist(model_id=None):
    """Build the "Archivist" CodeAgent, falling back to other models on failure.

    Args:
        model_id: Preferred model id; a falsy value selects
            ``REASONING_MODELS[0]``.

    Returns:
        The first agent that could be instantiated, or ``None`` when every
        candidate model failed.
    """
    target_model_id = model_id or REASONING_MODELS[0]

    def try_instantiate(mid):
        """Make up to three attempts to build the agent on ``mid``; None on failure."""
        for retries in range(2, -1, -1):
            try:
                print(f"Attempting to instantiate Archivist with model: {mid} (Retries left: {retries})")
                model = get_model(mid)
                return CodeAgent(
                    tools=[GraphQueryTool(), SmartValidatorTool(model=model), ImageGenerationTool(), AudioGenerationTool()],
                    model=model,
                    name="Archivist",
                    description="""Updates world state based on user actions and retrieved context.
                Writes Cypher and uses smart_validator to ensure logic is perfect.
                If the user moves to a coordinate that doesn't exist yet, procedurally generate a new Location and potentially NPCs.
                CRITICAL: Every Location, NPC, and Player MUST have 'x' and 'y' integer coordinates and a unique 'id' (use its 'name' for the 'id').
                Every Player and NPC MUST have: strength, dexterity, constitution, intelligence, wisdom, charisma, hp, and max_hp.
                Movement is limited to ONE grid square per interaction.
                Handle status expiration by checking the 'conditions' list for tags like '_Until_EndOfNextTurn' and removing them if triggered.
                Ensure new entities have unique coordinates and logical links to the world.""",
                    additional_authorized_imports=['random', 'collections', 'datetime', 'math', 're', 'itertools'],
                )
            except Exception as exc:
                print(f"Failed to instantiate Archivist with {mid}: {exc}")
                if retries > 0:
                    time.sleep(2) # Small delay before retry

        print(f"Giving up on Archivist model {mid} after all retries.")
        return None

    # Preferred model first, then the reasoning list, then the storytelling
    # default as a last resort. dict.fromkeys de-duplicates while keeping this
    # order, so a model that already exhausted its retries is not tried again.
    candidates = dict.fromkeys([target_model_id, *REASONING_MODELS, STORYTELLING_MODELS[0]])
    for mid in candidates:
        agent = try_instantiate(mid)
        if agent:
            return agent
    return None

def run_recruiter(user_input: str, history: list, model_id: str = None):
    """Run one turn of the character-creation dialogue and return the reply text.

    The prior ``history`` and the new ``user_input`` are embedded in a task
    prompt that is given to a freshly built "Recruiter" CodeAgent. The
    requested model (default ``STORYTELLING_MODELS[0]``) is tried first, then
    every other reasoning/storytelling model in order. If none produces a
    truthy answer, an apology string is returned.
    """
    # Render the transcript; items lacking `is_user`/`text` attributes are
    # shown as a user line containing str(item).
    transcript = []
    for msg in history:
        speaker = 'User' if getattr(msg, 'is_user', True) else 'Recruiter'
        body = getattr(msg, 'text', str(msg))
        transcript.append(f"{speaker}: {body}")
    history_str = "\n".join(transcript)

    task = f"""
    Guide the player through character creation in a medieval fantasy world.

    ### STYLE:
    - Be EXTREMELY concise. Use short, direct questions and confirmations.
    - Provide a very brief thematic confirmation when a choice is made (e.g., "Humans dominate the world, great choice. What is your class?").
    - Do NOT provide unsolicited help, clues, or reminders of the setting unless specifically asked.
    - If the user asks for information about the world or rules, use 'vector_search' on index 'Rule' to find the answer.

    ### SEQUENCE:
    1. RACE -> 2. CLASS -> 3. NAME.
    Only after all three (Race, Class, AND Name) are explicitly chosen and confirmed by the user, generate the character draft in JSON.
    - DO NOT generate JSON during the initial greeting.
    - DO NOT generate JSON for incomplete characters.
    - DO NOT use placeholder or default values (like "...", "TBD", or "Unknown") if the user hasn't made a choice.
    - If it is the first interaction (empty history), simply greet the player and ask for their RACE. Do NOT suggest a character.

    ### CRITICAL RULES:
    - Theme: Medieval fantasy.
    - Attributes: D&D 2024 scores (8-18).
    - Formatting: Title Case for Skills and Items.
    - Starting Context: Generate a "home location" (internal use, do not mention).

    ### RESPONSE FORMAT:
    - Your response should be natural dialogue.
    - DO NOT use the 'input()' function or any interactive Python code.
    - If you need more information, simply ask the user in your response dialogue.
    - IF AND ONLY IF the character is complete (Name, Race, Class known), append a JSON block:
    ```json
    {{
      "name": "...",
      "race": "...",
      "class": "...",
      "stats": {{"strength": 10, "dexterity": 10, "constitution": 10, "intelligence": 10, "wisdom": 10, "charisma": 10}},
      "skills": ["...", "..."],
      "items": ["...", "..."],
      "starting_context": "..."
    }}
    ```

    ### CURRENT CONTEXT:
    {history_str}

    ### USER INPUT:
    {user_input}
    """

    def try_run(mid):
        """Run the task on model ``mid``; return None if anything goes wrong."""
        try:
            # Fresh agent per request to ensure context is correctly captured.
            # Use CodeAgent but with strict instructions against interactive code.
            # CodeAgent is more reliable on HF Inference API than ToolCallingAgent
            # for many models due to tool_choice restrictions.
            recruiter = CodeAgent(
                tools=[VectorSearchTool()],
                model=get_model(mid),
                add_base_tools=False,
                name="Recruiter",
                description="A character creation assistant that guides players through Race, Class, and Name selection in a medieval fantasy setting.",
                additional_authorized_imports=['random', 'math'],
            )
            return recruiter.run(task)
        except Exception as exc:
            print(f"Recruiter failed with model {mid}: {exc}")
            return None

    target_model_id = model_id or STORYTELLING_MODELS[0]
    response = try_run(target_model_id)

    if not response:
        # Fallback to other models, in de-duplicated list order.
        for mid in dict.fromkeys(REASONING_MODELS + STORYTELLING_MODELS):
            if mid != target_model_id:
                response = try_run(mid)
                if response:
                    break

    if response:
        return str(response)
    return str("I am sorry, I am having trouble connecting to my creative centers. Please try again in a moment.")

def create_registrar(model_id=None):
    """Agent that handles the formal database registration of a new character and world start.

    Args:
        model_id: Preferred model id; a falsy value selects
            ``STORYTELLING_MODELS[0]``.

    Returns:
        The first "Registrar" CodeAgent that could be instantiated, or
        ``None`` when every candidate model failed.
    """
    # Prioritize storytelling models for descriptive item/location generation
    target_model_id = model_id or STORYTELLING_MODELS[0]

    def try_instantiate(mid):
        """Make up to three attempts to build the agent on ``mid``; None on failure."""
        for retries in range(2, -1, -1):
            try:
                print(f"Attempting to instantiate Registrar with model: {mid}")
                model = get_model(mid)
                return CodeAgent(
                    tools=[GraphQueryTool(), RegisterKnowledgeTool()],
                    model=model,
                    name="Registrar",
                    description="""Registers the player character, starting location, items, and skills in the database.
                Input: A validated character draft (JSON).

                TASKS:
                1. Player: Create a 'Player' node. MANDATORY: id, name, race, class, strength, dexterity, constitution, intelligence, wisdom, charisma, hp, max_hp, x:0, y:0.
                   NOTE: 'id' should be the character's name.
                   CALCULATION: max_hp = 5 + ceil(strength / 2) + constitution.
                2. Items: For EACH item in the draft:
                   - Generate a vivid description and mechanical stats (damage, weight, properties).
                   - Use register_knowledge(label='Item', name=..., content=description, properties=json_stats) to vectorize and save.
                   - Link Item to Player: MATCH (p:Player), (i:Item {name:...}) CREATE (p)-[:HAS_ITEM]->(i).
                3. Skills: For EACH skill in the draft:
                   - Generate a vivid description and mechanical effects (bonus, cost, status effects).
                   - Use register_knowledge(label='Skill', name=..., content=description, properties=json_effects) to vectorize and save.
                   - Link Skill to Player: MATCH (p:Player), (s:Skill {name:...}) CREATE (p)-[:HAS_SKILL]->(s).
                4. Location: "Prime" the starting location from 'starting_context'.
                   - Generate a rich, environmental description of the location at x:0, y:0.
                   - Use register_knowledge(label='Location', name=..., content=description, properties='{"x":0, "y":0}') to vectorize and save.""",
                    additional_authorized_imports=['random', 'math', 'json'],
                )
            except Exception as exc:
                print(f"Failed to instantiate Registrar with {mid}: {exc}")
                if retries > 0:
                    time.sleep(2) # Small delay before retry, as in the sibling factories

        print(f"Giving up on Registrar model {mid} after all retries.")
        return None

    # The old fallback, REASONING_MODELS[0], is the same id as the default
    # target, so it only repeated the failed attempts. Keep it as the first
    # fallback, then continue through the remaining storytelling models.
    # dict.fromkeys de-duplicates while preserving this order.
    candidates = dict.fromkeys([target_model_id, REASONING_MODELS[0], *STORYTELLING_MODELS])
    for mid in candidates:
        agent = try_instantiate(mid)
        if agent:
            return agent
    return None

def create_librarian(model_id=None):
    """Build the "Librarian" CodeAgent, falling back to other models on failure.

    Args:
        model_id: Preferred model id; a falsy value selects
            ``REASONING_MODELS[0]``.

    Returns:
        The first agent that could be instantiated, or ``None`` when every
        candidate model failed.
    """
    target_model_id = model_id or REASONING_MODELS[0]

    def try_instantiate(mid):
        """Make up to three attempts to build the agent on ``mid``; None on failure."""
        for retries in range(2, -1, -1):
            try:
                print(f"Attempting to instantiate Librarian with model: {mid} (Retries left: {retries})")
                model = get_model(mid)
                return CodeAgent(
                    tools=[GraphQueryTool(), VectorSearchTool()],
                    model=model,
                    name="Librarian",
                    description="""Retrieves relevant rules, historical memories, and current world state context.
                Summarize findings to provide the most critical information for decision-making.
                Prioritize game mechanics and recent events."""
                )
            except Exception as exc:
                print(f"Failed to instantiate Librarian with {mid}: {exc}")
                if retries > 0:
                    time.sleep(2) # Small delay before retry

        print(f"Giving up on Librarian model {mid} after all retries.")
        return None

    # Preferred model first, then the reasoning list, then the storytelling
    # default as a last resort. dict.fromkeys de-duplicates while keeping this
    # order, so a model that already exhausted its retries is not tried again.
    candidates = dict.fromkeys([target_model_id, *REASONING_MODELS, STORYTELLING_MODELS[0]])
    for mid in candidates:
        agent = try_instantiate(mid)
        if agent:
            return agent
    return None

class Narrator:
    """Produces narrative prose for a turn by prompting a storytelling model."""

    def __init__(self):
        # Model used when run() is called without an explicit model_id.
        self.primary_model_id = STORYTELLING_MODELS[0]

    def run(self, context: str, user_input: str, changes: str, model_id: str = None):
        """Describe the outcome of ``user_input`` given ``context`` and ``changes``.

        The requested model (or the primary one) is tried first, followed by
        the other storytelling models in list order. Returns the first reply's
        ``content`` (or ``str()`` of the reply when it has no such attribute);
        if every model raises, returns ``"Error: <last exception>"``.
        """
        preferred = model_id or self.primary_model_id
        candidates = [preferred]
        candidates.extend(other for other in STORYTELLING_MODELS if other != preferred)

        last_error = None
        for candidate in candidates:
            try:
                llm = get_model(candidate)
                prompt = f"""
                You are the Narrator.
                Lore/Context: {context}
                World Changes: {changes}
                User Action: {user_input}

                Describe the outcome vividly. Raw prose only.
                """
                reply = llm.generate(messages=[{"role": "user", "content": prompt}])
                return reply.content if hasattr(reply, 'content') else str(reply)
            except Exception as exc:
                # Remember the failure and move on to the next candidate.
                print(f"Model {candidate} failed: {exc}")
                last_error = exc
        return f"Error: {last_error}"


# Shared module-level instance.
narrator = Narrator()