AnkiCardGenerator / agent.py
lostinthesea's picture
Update agent.py
59dd6a7 verified
from __future__ import annotations
import base64
import mimetypes
import os
import re
import tempfile
import xml.etree.ElementTree as ET
from pathlib import Path
from typing import Any, Dict, Optional
import requests
from langgraph.graph import StateGraph, START, END
from typing_extensions import TypedDict
import anthropic
import dotenv
# Load environment variables from .env file
dotenv.load_dotenv()
# ----------------------------------------------------------------------------
# 1. State definition
# ----------------------------------------------------------------------------
class AnkiGeneratorState(TypedDict, total=False):
user_requirements: str # Extra user instructions / tags
card_types: str # Allowed card types (string)
# Exactly one of the following
pdf_file: Optional[Path]
img_file: Optional[Path]
url: Optional[str]
input_type: str # "pdf" | "image" | "url"
# Internal artifacts
model_response: str
result: Dict[str, Any]
# ----------------------------------------------------------------------------
# 2. Helpers
# ----------------------------------------------------------------------------
ANTHROPIC_MODEL = "claude-sonnet-4-20250514"
client = anthropic.Anthropic()
def _file_to_b64(p: Path) -> str:
return base64.b64encode(p.read_bytes()).decode()
def _url_fetch(url: str, timeout: int = 15) -> tuple[str, bytes]:
r = requests.get(url, timeout=timeout)
r.raise_for_status()
mime = r.headers.get("content-type", "application/octet-stream").split(";")[0]
return mime, r.content
def _join_text(msg) -> str:
if isinstance(msg.content, list):
return "\n".join(part.get("text", "") for part in msg.content if part.get("type") == "text")
return str(msg.content)
def _extract_xml(text: str) -> str:
m = re.search(r"<anki_cards[\s\S]*?</anki_cards>", text, re.I)
if not m:
raise ValueError("LLM output missing <anki_cards> block")
return m.group()
def _parse_cards(xml_str: str) -> list[dict]:
root = ET.fromstring(xml_str)
cards = []
for card in root.findall("card"):
cards.append({
"type": (card.findtext("type") or "").strip(),
"front": (card.findtext("front") or "").strip(),
"back": (card.findtext("back") or "").strip(),
})
return cards
def _prompt(src_kind: str, state: AnkiGeneratorState) -> str:
return (
f"""You are an AI assistant tasked with generating Anki cards from a {src_kind}.
Follow these rules:\n"
1. Read the provided content.\n"
2. Allowed card types: {state.get("card_types", "")}\n
3. User notes: {state.get("user_requirements", "")}\n
4. output your response as an XML block with <anki_cards> root element.\n"""
)
# ----------------------------------------------------------------------------
# 3. Node implementations
# ----------------------------------------------------------------------------
def get_input_type(state: AnkiGeneratorState) -> AnkiGeneratorState:
if state.get("pdf_file"):
state["input_type"] = "pdf"
elif state.get("img_file"):
state["input_type"] = "image"
elif state.get("url"):
state["input_type"] = "url"
else:
raise ValueError("Must supply pdf_file, img_file or url")
return state
def process_pdf(state: AnkiGeneratorState) -> AnkiGeneratorState:
pdf_b64 = _file_to_b64(state["pdf_file"])
message = client.messages.create(
model=ANTHROPIC_MODEL,
max_tokens=10240,
messages=[
{
"role": "user",
"content": [
{
"type": "document",
"source": {
"type": "base64",
"media_type": "application/pdf",
"data": pdf_b64,
},
},
{"type": "text", "text": _prompt("PDF", state)},
],
}
],
)
state["model_response"] = message.content[0].text
return state
def process_image(state: AnkiGeneratorState) -> AnkiGeneratorState:
img_b64 = _file_to_b64(state["img_file"])
mime = mimetypes.guess_type(state["img_file"])[0] or "image/png"
message = client.messages.create(
model=ANTHROPIC_MODEL,
max_tokens=10240,
messages=[
{
"role": "user",
"content": [
{
"type": "image",
"source": {"type": "base64", "media_type": mime, "data": img_b64},
},
{"type": "text", "text": _prompt("image", state)},
],
}
],
)
state["model_response"] = message.content[0].text
return state
def process_url(state: AnkiGeneratorState) -> AnkiGeneratorState:
mime, raw = _url_fetch(state["url"])
if mime == "application/pdf" or state["url"].lower().endswith(".pdf"):
tmp = Path(tempfile.mkstemp(suffix=".pdf")[1])
tmp.write_bytes(raw)
state["pdf_file"] = tmp
return process_pdf(state)
if mime.startswith("image/"):
ext = mimetypes.guess_extension(mime) or ".png"
tmp = Path(tempfile.mkstemp(suffix=ext)[1])
tmp.write_bytes(raw)
state["img_file"] = tmp
return process_image(state)
text = raw.decode("utf-8", errors="ignore")[:15000]
message = client.messages.create(
model=ANTHROPIC_MODEL,
max_tokens=10240,
messages=[
{"role": "user", "content": [{"type": "text", "text": text}, {"type": "text", "text": _prompt("webpage", state)}]},
],
)
state["model_response"] = message.content[0].text
return state
def parse_and_generate(state: AnkiGeneratorState) -> AnkiGeneratorState:
print(state["model_response"])
xml_str = _extract_xml(state["model_response"])
cards = _parse_cards(xml_str)
if not cards:
raise ValueError("No cards extracted")
source = (
state.get("pdf_file") and state["pdf_file"].stem
) or (
state.get("img_file") and state["img_file"].stem
) or re.sub(r"\W+", "_", state.get("url", "source"))
state["result"] = {
"deck": {
"name": f"{source}_AnkiDeck",
"cards": cards,
"tags": [t.strip() for t in state.get("user_requirements", "").split(",") if t.strip()],
}
}
return state
# ----------------------------------------------------------------------------
# 4. Graph assembly
# ----------------------------------------------------------------------------
graph = StateGraph(AnkiGeneratorState)
for n, fn in [
("get_input_type", get_input_type),
("process_pdf", process_pdf),
("process_image", process_image),
("process_url", process_url),
("parse_and_generate", parse_and_generate),
]:
graph.add_node(n, fn)
# Conditional edges with single‑arg route func (current state only)
graph.add_edge(START, "get_input_type")
graph.add_conditional_edges(
"get_input_type",
lambda state: state["input_type"],
{"pdf": "process_pdf", "image": "process_image", "url": "process_url"},
)
for node in ["process_pdf", "process_image", "process_url"]:
graph.add_edge(node, "parse_and_generate")
graph.add_edge("parse_and_generate", END)
app_graph = graph.compile()
# ----------------------------------------------------------------------------
# 5. Public helper
# ----------------------------------------------------------------------------
def create_anki_deck(**kwargs) -> Dict[str, Any]:
state: AnkiGeneratorState = kwargs # type: ignore
final = app_graph.invoke(state)
return final["result"]