|
|
from __future__ import annotations |
|
|
|
|
|
import base64 |
|
|
import mimetypes |
|
|
import os |
|
|
import re |
|
|
import tempfile |
|
|
import xml.etree.ElementTree as ET |
|
|
from pathlib import Path |
|
|
from typing import Any, Dict, Optional |
|
|
|
|
|
import requests |
|
|
from langgraph.graph import StateGraph, START, END |
|
|
from typing_extensions import TypedDict |
|
|
|
|
|
import anthropic |
|
|
import dotenv |
|
|
|
|
|
# Load environment variables from a local .env file at import time
# (presumably the Anthropic API key consumed by anthropic.Anthropic()
# below — confirm which variables the deployment actually expects).
dotenv.load_dotenv()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class AnkiGeneratorState(TypedDict, total=False):
    """State dict threaded through the LangGraph pipeline.

    ``total=False``: every key is optional; nodes fill keys in as the
    graph progresses (input classification -> model call -> parsed deck).
    """

    # Free-form notes from the caller; parse_and_generate also splits this
    # on commas to derive deck tags.
    user_requirements: str
    # Comma-separated list of allowed card types, interpolated into the prompt.
    card_types: str

    # Exactly one of the three inputs below is expected (checked in
    # get_input_type, priority: pdf_file > img_file > url).
    pdf_file: Optional[Path]
    img_file: Optional[Path]
    url: Optional[str]

    # Set by get_input_type: "pdf", "image" or "url"; drives graph routing.
    input_type: str

    # Raw text returned by the model (should contain an <anki_cards> block).
    model_response: str
    # Final output: {"deck": {"name", "cards", "tags"}} built by parse_and_generate.
    result: Dict[str, Any]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Model used for every generation call in this module.
ANTHROPIC_MODEL = "claude-sonnet-4-20250514"

# Shared module-level client; reads credentials from the environment
# (populated by load_dotenv() above).
client = anthropic.Anthropic()
|
|
|
|
|
|
|
|
def _file_to_b64(p: Path) -> str: |
|
|
return base64.b64encode(p.read_bytes()).decode() |
|
|
|
|
|
|
|
|
def _url_fetch(url: str, timeout: int = 15) -> tuple[str, bytes]:
    """GET *url* and return ``(mime_type, body)``.

    Raises ``requests.HTTPError`` on a non-2xx response. The mime type is
    the content-type header with any ``; charset=...`` suffix stripped.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    content_type = response.headers.get("content-type", "application/octet-stream")
    mime, _, _params = content_type.partition(";")
    return mime, response.content
|
|
|
|
|
|
|
|
def _join_text(msg) -> str: |
|
|
if isinstance(msg.content, list): |
|
|
return "\n".join(part.get("text", "") for part in msg.content if part.get("type") == "text") |
|
|
return str(msg.content) |
|
|
|
|
|
|
|
|
def _extract_xml(text: str) -> str: |
|
|
m = re.search(r"<anki_cards[\s\S]*?</anki_cards>", text, re.I) |
|
|
if not m: |
|
|
raise ValueError("LLM output missing <anki_cards> block") |
|
|
return m.group() |
|
|
|
|
|
|
|
|
def _parse_cards(xml_str: str) -> list[dict]: |
|
|
root = ET.fromstring(xml_str) |
|
|
cards = [] |
|
|
for card in root.findall("card"): |
|
|
cards.append({ |
|
|
"type": (card.findtext("type") or "").strip(), |
|
|
"front": (card.findtext("front") or "").strip(), |
|
|
"back": (card.findtext("back") or "").strip(), |
|
|
}) |
|
|
return cards |
|
|
|
|
|
|
|
|
def _prompt(src_kind: str, state: AnkiGeneratorState) -> str: |
|
|
return ( |
|
|
f"""You are an AI assistant tasked with generating Anki cards from a {src_kind}. |
|
|
Follow these rules:\n" |
|
|
1. Read the provided content.\n" |
|
|
2. Allowed card types: {state.get("card_types", "")}\n |
|
|
3. User notes: {state.get("user_requirements", "")}\n |
|
|
4. output your response as an XML block with <anki_cards> root element.\n""" |
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_input_type(state: AnkiGeneratorState) -> AnkiGeneratorState:
    """Classify the input source and record it under ``state["input_type"]``.

    Checks in priority order (pdf_file, then img_file, then url) and
    raises ``ValueError`` when none of the three is present/truthy.
    """
    priority = (("pdf_file", "pdf"), ("img_file", "image"), ("url", "url"))
    for key, kind in priority:
        if state.get(key):
            state["input_type"] = kind
            return state
    raise ValueError("Must supply pdf_file, img_file or url")
|
|
|
|
|
|
|
|
def process_pdf(state: AnkiGeneratorState) -> AnkiGeneratorState:
    """Send the state's PDF plus the instruction prompt to the model.

    Stores the first text block of the reply in ``state["model_response"]``.
    """
    document_part = {
        "type": "document",
        "source": {
            "type": "base64",
            "media_type": "application/pdf",
            "data": _file_to_b64(state["pdf_file"]),
        },
    }
    instruction_part = {"type": "text", "text": _prompt("PDF", state)}
    message = client.messages.create(
        model=ANTHROPIC_MODEL,
        max_tokens=10240,
        messages=[{"role": "user", "content": [document_part, instruction_part]}],
    )
    state["model_response"] = message.content[0].text
    return state
|
|
|
|
|
|
|
|
def process_image(state: AnkiGeneratorState) -> AnkiGeneratorState:
    """Send the state's image plus the instruction prompt to the model.

    The media type is guessed from the filename, defaulting to image/png.
    Stores the first text block of the reply in ``state["model_response"]``.
    """
    img_path = state["img_file"]
    media_type = mimetypes.guess_type(img_path)[0] or "image/png"
    image_part = {
        "type": "image",
        "source": {
            "type": "base64",
            "media_type": media_type,
            "data": _file_to_b64(img_path),
        },
    }
    instruction_part = {"type": "text", "text": _prompt("image", state)}
    message = client.messages.create(
        model=ANTHROPIC_MODEL,
        max_tokens=10240,
        messages=[{"role": "user", "content": [image_part, instruction_part]}],
    )
    state["model_response"] = message.content[0].text
    return state
|
|
|
|
|
|
|
|
def process_url(state: AnkiGeneratorState) -> AnkiGeneratorState:
    """Fetch a URL and dispatch on its content type.

    PDFs and images are written to a temp file and delegated to
    process_pdf / process_image; anything else is treated as text and
    sent to the model directly.

    Bug fix: the original called ``tempfile.mkstemp`` and never closed the
    returned file descriptor, leaking one fd per PDF/image URL processed.
    ``_write_tmp`` now closes it explicitly.
    """
    mime, raw = _url_fetch(state["url"])
    if mime == "application/pdf" or state["url"].lower().endswith(".pdf"):
        state["pdf_file"] = _write_tmp(raw, ".pdf")
        return process_pdf(state)
    if mime.startswith("image/"):
        ext = mimetypes.guess_extension(mime) or ".png"
        state["img_file"] = _write_tmp(raw, ext)
        return process_image(state)
    # Fall back to plain text; cap the size so the prompt stays bounded.
    text = raw.decode("utf-8", errors="ignore")[:15000]
    message = client.messages.create(
        model=ANTHROPIC_MODEL,
        max_tokens=10240,
        messages=[
            {"role": "user", "content": [{"type": "text", "text": text}, {"type": "text", "text": _prompt("webpage", state)}]},
        ],
    )
    state["model_response"] = message.content[0].text
    return state


def _write_tmp(data: bytes, suffix: str) -> Path:
    """Write *data* to a fresh temp file and return its path.

    Closes the mkstemp file descriptor even if the write fails.
    The file itself is intentionally left on disk for the downstream node.
    """
    fd, name = tempfile.mkstemp(suffix=suffix)
    try:
        os.write(fd, data)
    finally:
        os.close(fd)
    return Path(name)
|
|
|
|
|
|
|
|
def parse_and_generate(state: AnkiGeneratorState) -> AnkiGeneratorState:
    """Turn the model's raw reply into the final deck structure.

    Extracts the <anki_cards> XML block, parses its cards and stores
    ``{"deck": {"name", "cards", "tags"}}`` under ``state["result"]``.
    Raises ``ValueError`` when the reply has no XML block or no cards.

    Bug fix: removed a leftover debug ``print`` of the full model response.
    The deck-name derivation is also rewritten as explicit if/elif — the
    previous ``and``/``or`` chain silently fell through when a file stem
    was empty.
    """
    xml_str = _extract_xml(state["model_response"])
    cards = _parse_cards(xml_str)
    if not cards:
        raise ValueError("No cards extracted")
    # Deck name: prefer the source file's stem, else a sanitized URL.
    if state.get("pdf_file"):
        source = state["pdf_file"].stem
    elif state.get("img_file"):
        source = state["img_file"].stem
    else:
        source = re.sub(r"\W+", "_", state.get("url", "source"))
    # NOTE(review): tags are derived by splitting user_requirements on
    # commas, which treats free-form notes as tags — confirm intended.
    tags = [t.strip() for t in state.get("user_requirements", "").split(",") if t.strip()]
    state["result"] = {
        "deck": {
            "name": f"{source}_AnkiDeck",
            "cards": cards,
            "tags": tags,
        }
    }
    return state
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Build the LangGraph pipeline over the shared AnkiGeneratorState dict.
graph = StateGraph(AnkiGeneratorState)

# Register every node; names match the function names for readability
# in traces and in the edge declarations below.
for n, fn in [
    ("get_input_type", get_input_type),
    ("process_pdf", process_pdf),
    ("process_image", process_image),
    ("process_url", process_url),
    ("parse_and_generate", parse_and_generate),
]:
    graph.add_node(n, fn)

# Entry: classify the input first.
graph.add_edge(START, "get_input_type")

# Route to the matching processor based on the classified input_type.
graph.add_conditional_edges(
    "get_input_type",
    lambda state: state["input_type"],
    {"pdf": "process_pdf", "image": "process_image", "url": "process_url"},
)

# All three processors converge on the parsing/output node.
for node in ["process_pdf", "process_image", "process_url"]:
    graph.add_edge(node, "parse_and_generate")

graph.add_edge("parse_and_generate", END)

# Compiled, invokable pipeline used by create_anki_deck below.
app_graph = graph.compile()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_anki_deck(**kwargs) -> Dict[str, Any]:
    """Run the full pipeline and return the generated deck dict.

    Keyword arguments form the initial graph state — one of
    ``pdf_file`` / ``img_file`` / ``url``, plus optional ``card_types``
    and ``user_requirements``. Returns the ``result`` produced by
    parse_and_generate.
    """
    initial_state: AnkiGeneratorState = kwargs
    final_state = app_graph.invoke(initial_state)
    return final_state["result"]
|
|
|
|
|
|
|
|
|