# Commit af2ebff (Rafael Poyiadzi): Move css and head back to gr.Blocks() for Gradio 5 compat
import ast
import atexit
import hashlib
import json
import os
import tempfile
import time
import uuid
from typing import Literal
import gradio as gr
import pandas as pd
import posthog
from pydantic import BaseModel, Field, create_model
# --- PostHog telemetry configuration -----------------------------------------
# Telemetry is active only when BOTH the key and host are configured; otherwise
# _POSTHOG_ENABLED gates every capture call into a no-op.
_POSTHOG_KEY = os.environ.get("POSTHOG_KEY", "")
_POSTHOG_HOST = os.environ.get("POSTHOG_HOST", "")
_POSTHOG_ENABLED = bool(_POSTHOG_KEY and _POSTHOG_HOST)
# Both attribute spellings are assigned; NOTE(review): presumably different
# posthog client versions read different ones — confirm against the client docs.
posthog.api_key = _POSTHOG_KEY
posthog.project_api_key = _POSTHOG_KEY
posthog.host = _POSTHOG_HOST
posthog.debug = os.environ.get("POSTHOG_DEBUG", "").lower() in ("1", "true", "yes")
if _POSTHOG_ENABLED:
    # Flush any queued events when the interpreter exits.
    atexit.register(posthog.shutdown)
# NOTE: Frontend JS uses an anonymous distinct_id while server-side uses a hashed
# API key. PostHog won't auto-link these identities. To link them, the frontend
# would need to call posthog.identify() with the hashed key after the user submits.
# Minified PostHog browser snippet injected into the page <head> via
# gr.Blocks(head=...); empty string when telemetry is disabled.
POSTHOG_HEAD = f"""
<script>
!function(t,e){{var o,n,p,r;e.__SV||(window.posthog=e,e._i=[],e.init=function(i,s,a){{function g(t,e){{var o=e.split(".");2==o.length&&(t=t[o[0]],e=o[1]),t[e]=function(){{t.push([e].concat(Array.prototype.slice.call(arguments,0)))}}}}(p=t.createElement("script")).type="text/javascript",p.async=!0,p.src=s.api_host+"/static/array.js",(r=t.getElementsByTagName("script")[0]).parentNode.insertBefore(p,r);var u=e;for(void 0!==a?u=e[a]=[]:a="posthog",u.people=u.people||[],u.toString=function(t){{var e="posthog";return"posthog"!==a&&(e+="."+a),t||(e+=" (stub)"),e}},u.people.toString=function(){{return u.toString(1)+".people (stub)"}},o="init capture register register_once unregister opt_in_capturing opt_out_capturing has_opted_in_capturing has_opted_out_capturing identify alias people.set people.set_once set_config reset opt_in_capturing".split(" "),n=0;n<o.length;n++)g(u,o[n]);e._i.push([i,s,a])}},e.__SV=1)}}(document,window.posthog||[]);
posthog.init('{_POSTHOG_KEY}',{{api_host:'{_POSTHOG_HOST}', person_profiles: 'identified_only'}})
</script>
""" if _POSTHOG_ENABLED else ""
from everyrow.generated.client import AuthenticatedClient
from everyrow.ops import agent_map
from everyrow.session import create_session
from everyrow.task import EffortLevel
# Base URL for the everyrow API; override via EVERYROW_API_URL for dev/staging.
_EVERYROW_API_URL = os.environ.get("EVERYROW_API_URL", "https://everyrow.io/api/v0")
# UI effort labels -> everyrow EffortLevel enum values.
EFFORT_LEVELS = {
    "Low": EffortLevel.LOW,
    "Medium": EffortLevel.MEDIUM,
    "High": EffortLevel.HIGH,
}
# Schema field type names -> Python types used to build the pydantic model.
TYPE_MAP = {
    "str": str,
    "int": int,
    "float": float,
    "bool": bool,
}
# Choices shown in the field-type dropdown; "category" is handled specially
# (becomes a Literal of the user-provided options).
FIELD_TYPES = ["str", "int", "float", "bool", "category"]
def parse_options_text(text: str) -> list[tuple[str, str]]:
    """Parse an options textarea into (value, description) pairs.

    One option per line; a line is either 'value: description' or a bare
    'value' (empty description). Blank lines are skipped.
    """
    pairs: list[tuple[str, str]] = []
    for raw_line in text.strip().splitlines():
        stripped = raw_line.strip()
        if not stripped:
            continue
        value, sep, desc = stripped.partition(":")
        if sep:
            pairs.append((value.strip(), desc.strip()))
        else:
            pairs.append((stripped, ""))
    return pairs
def build_response_model(fields_list: list[dict]) -> type[BaseModel] | None:
    """Build a dynamic pydantic model from the UI field definitions.

    Returns None when no usable (named) fields are defined, in which case the
    caller falls back to the default single-answer response.

    Raises:
        ValueError: for a category field without options, or an unknown type.
    """
    if not fields_list:
        return None
    definitions = {}
    for spec in fields_list:
        field_name = spec.get("name", "").strip()
        if not field_name:
            # Rows with a blank name are ignored entirely.
            continue
        type_str = spec.get("type", "str")
        description = spec.get("description", "")
        options_text = spec.get("options", "")
        if type_str == "category":
            parsed = parse_options_text(options_text) if isinstance(options_text, str) else []
            if not parsed:
                raise ValueError(
                    f"Category field '{field_name}' requires at least one option."
                )
            # Restrict the field to the option values via a Literal type; fold
            # any per-option descriptions into the field description.
            literal_type = Literal[tuple(v for v, _ in parsed)]
            option_descs = [f"{v}: {d}" for v, d in parsed if d]
            full_desc = description
            if option_descs:
                full_desc += (" — " if full_desc else "") + "; ".join(option_descs)
            definitions[field_name] = (literal_type, Field(description=full_desc))
            continue
        python_type = TYPE_MAP.get(type_str)
        if python_type is None:
            raise ValueError(
                f"Unknown type '{type_str}' for field '{field_name}'. "
                f"Supported: {', '.join(TYPE_MAP)}, category"
            )
        if description:
            definitions[field_name] = (python_type, Field(description=description))
        else:
            definitions[field_name] = (python_type, ...)
    if not definitions:
        return None
    return create_model("CustomResponse", **definitions)
def fields_to_schema_json(fields_list: list[dict]) -> str:
    """Serialize the field definitions to pretty-printed JSON for the preview.

    Fields with a blank name are dropped; returns "" when nothing remains.
    """
    if not fields_list:
        return ""
    entries = []
    for spec in fields_list:
        field_name = spec.get("name", "").strip()
        if not field_name:
            continue
        item = {
            "name": field_name,
            "type": spec.get("type", "str"),
            "description": spec.get("description", ""),
        }
        if spec.get("type") == "category" and spec.get("options"):
            raw_opts = spec["options"]
            # Options may be raw textarea text or already-parsed pairs.
            item["options"] = (
                parse_options_text(raw_opts) if isinstance(raw_opts, str) else raw_opts
            )
        entries.append(item)
    if not entries:
        return ""
    return json.dumps(entries, indent=2)
def _posthog_distinct_id(api_key: str) -> str:
"""Hash the API key to use as a stable distinct_id without storing the secret."""
return hashlib.sha256(api_key.encode()).hexdigest()[:16]
def _track_validation_error(api_key, session_id, error_msg):
    """Report a pre-run validation failure to PostHog.

    Does nothing when telemetry is disabled or no API key was supplied
    (without a key there is no distinct_id to attribute the event to).
    """
    if not _POSTHOG_ENABLED or not api_key:
        return
    posthog.capture(
        distinct_id=_posthog_distinct_id(api_key),
        event="validation_error",
        properties={
            "$session_id": session_id,
            "error": error_msg,
            "app": "everyrow-research-space",
        },
    )
def _schema_properties(fields_list):
"""Summarize the output schema for PostHog events."""
if not fields_list:
return {"field_count": 0}
type_counts = {}
for f in fields_list:
t = f.get("type", "str")
type_counts[t] = type_counts.get(t, 0) + 1
return {
"field_count": len(fields_list),
"field_types": type_counts,
"has_category": "category" in type_counts,
}
async def run_agent_map(api_key, file, query, effort_label, fields_list, session_id):
    """Validate inputs, run everyrow agent_map over the uploaded CSV, return results.

    Args:
        api_key: everyrow API key (required).
        file: path to the uploaded CSV (Gradio file value).
        query: natural-language annotation/research instruction.
        effort_label: one of the EFFORT_LEVELS keys ("Low"/"Medium"/"High").
        fields_list: output-field dicts used to build the response model.
        session_id: per-browser-session id used to correlate PostHog events.

    Returns:
        (output_df, download_path): result DataFrame (with any "research"
        column moved to the end) and the path of a temp CSV for download.

    Raises:
        gr.Error: on input validation failures or when agent_map reports an error.
    """
    if not api_key:
        raise gr.Error("Please enter your everyrow API key.")
    if file is None:
        _track_validation_error(api_key, session_id, "No file uploaded")
        raise gr.Error("Please upload a CSV file.")
    if not query.strip():
        _track_validation_error(api_key, session_id, "Empty query")
        raise gr.Error("Please enter a research query.")
    df = pd.read_csv(file)
    if df.empty:
        _track_validation_error(api_key, session_id, "Empty CSV")
        raise gr.Error("The uploaded CSV is empty.")
    effort_level = EFFORT_LEVELS[effort_label]
    distinct_id = _posthog_distinct_id(api_key)
    try:
        response_model = build_response_model(fields_list)
    except ValueError as e:
        _track_validation_error(api_key, session_id, str(e))
        # Chain the cause so tracebacks show the original ValueError.
        raise gr.Error(str(e)) from e
    kwargs = dict(task=query, input=df, effort_level=effort_level)
    if response_model is not None:
        kwargs["response_model"] = response_model
    if _POSTHOG_ENABLED:
        posthog.capture(
            distinct_id=distinct_id,
            event="agent_map_started",
            properties={
                "$session_id": session_id,
                "effort_level": effort_label,
                "row_count": len(df),
                "column_count": len(df.columns),
                **_schema_properties(fields_list),
                "app": "everyrow-research-space",
            },
        )
    client = AuthenticatedClient(
        base_url=_EVERYROW_API_URL,
        token=api_key,
        raise_on_unexpected_status=True,
        follow_redirects=True,
    )
    t0 = time.time()
    async with create_session(client=client) as session:
        result = await agent_map(session=session, **kwargs)
    duration_s = round(time.time() - t0, 2)
    if result.error:
        if _POSTHOG_ENABLED:
            posthog.capture(
                distinct_id=distinct_id,
                event="agent_map_failed",
                properties={
                    "$session_id": session_id,
                    "error": str(result.error),
                    "duration_s": duration_s,
                    "app": "everyrow-research-space",
                },
            )
        raise gr.Error(f"agent_map failed: {result.error}")
    if _POSTHOG_ENABLED:
        posthog.capture(
            distinct_id=distinct_id,
            event="agent_map_completed",
            properties={
                "$session_id": session_id,
                "output_rows": len(result.data),
                "output_columns": len(result.data.columns),
                "duration_s": duration_s,
                "app": "everyrow-research-space",
            },
        )
    output_df = result.data
    if "research" in output_df.columns:
        # Move the free-form research column to the end for readability.
        cols = [c for c in output_df.columns if c != "research"] + ["research"]
        output_df = output_df[cols]
    # Fix: close the NamedTemporaryFile handle before writing through its path.
    # The original left the handle open, leaking a file descriptor per run and
    # failing on Windows, where an open temp file cannot be reopened by name.
    tmp = tempfile.NamedTemporaryFile(
        delete=False, suffix=".csv", prefix="everyrow_results_"
    )
    tmp.close()
    output_df.to_csv(tmp.name, index=False)
    return output_df, tmp.name
def _parse_research_val(val):
"""Try to parse a research value into a dict."""
if isinstance(val, dict):
return val
if isinstance(val, str):
try:
parsed = ast.literal_eval(val)
if isinstance(parsed, dict):
return parsed
except (ValueError, SyntaxError):
pass
return None
def hide_research(df: pd.DataFrame) -> pd.DataFrame:
    """Default view: return df without the research column (unchanged if absent)."""
    if "research" not in df.columns:
        return df
    return df.drop(columns=["research"])
def expand_research(df: pd.DataFrame) -> pd.DataFrame:
    """Expanded view: flatten the research dicts into 'research.*' columns.

    Rows whose research value cannot be parsed as a dict contribute empty
    values; if no row parses at all, the frame is returned with the research
    column intact.
    """
    if "research" not in df.columns:
        return df
    out = df.copy()
    parsed = out["research"].apply(_parse_research_val)
    if not parsed.notna().any():
        # Nothing dict-like to expand.
        return out
    flat = pd.json_normalize(parsed.apply(lambda v: {} if v is None else v))
    flat.index = out.index
    # Prefix expanded columns with "research." to avoid clashes
    flat.columns = [f"research.{c}" for c in flat.columns]
    return pd.concat([out.drop(columns=["research"]), flat], axis=1)
# Inline styling for the error panel rendered via elem_classes=["error-box"].
_CSS = ".error-box { background: #fee; border: 1px solid #c00; border-radius: 8px; padding: 12px; color: #900; }"
# ---------------------------------------------------------------------------
# Gradio UI. css/head are passed to gr.Blocks() directly (Gradio 5 style);
# POSTHOG_HEAD is "" when telemetry is disabled, so head is a no-op then.
# ---------------------------------------------------------------------------
with gr.Blocks(
    title="everyrow annotate – AI Data Annotation & Web Research",
    css=_CSS,
    head=POSTHOG_HEAD,
) as demo:
    gr.Markdown(
        """
        # 🏷️ everyrow annotate – AI Data Annotation & Web Research
        **everyrow annotate** uses AI research agents to **label, verify, and enrich data** row by row using live web information.
        Upload a CSV, describe what you want to annotate or find, and get structured results for every row — perfect for **data annotation, dataset enrichment, and research at scale**.
        🔑 Get your API key at [everyrow.io/api-key](https://everyrow.io/api-key) ($20 free credit).
        📖 Visit [everyrow.io/docs](https://everyrow.io/docs) for documentation.
        """
    )
    # --- Inputs ---
    api_key = gr.Textbox(
        label="everyrow API key",
        type="password",
        placeholder="sk-cho-...",
    )
    file = gr.File(label="Upload CSV", file_types=[".csv"])
    # Hidden until a CSV is uploaded (wired to on_upload at the bottom of this block).
    preview_heading = gr.Markdown("### Preview", visible=False)
    preview_table = gr.Dataframe(label="Input preview", visible=False)
    query = gr.Textbox(
        label="Annotation or research instruction",
        placeholder="e.g. Label each company’s industry and whether it is B2B or B2C",
        lines=3,
    )
    effort = gr.Dropdown(
        choices=list(EFFORT_LEVELS.keys()),
        value="Low",
        label="Effort level",
        interactive=True,
    )

    # --- Output fields form builder ---
    gr.Markdown("### Output fields")
    gr.Markdown(
        "*Define the structured fields you want the AI to annotate for each row. Leave empty to return a single answer column.*"
    )
    # Callable default: evaluated per browser session, giving each visitor a
    # fresh uuid used as the PostHog $session_id.
    session_id = gr.State(lambda: str(uuid.uuid4()))
    # Source of truth for the dynamic form: list of
    # {"name", "type", "description", "options"} dicts.
    fields_state = gr.State([])

    @gr.render(inputs=fields_state)
    def render_fields(fields):
        """Rebuild the dynamic field-editor UI whenever fields_state changes.

        Components here are recreated on every render; stable `key=` values let
        Gradio preserve user-typed text across re-renders. Handlers receive the
        current component values as flattened *args sliced by position using n.
        """
        n = len(fields)  # row count; drives the *args slicing in all handlers
        name_boxes = []
        type_dds = []
        desc_boxes = []
        rm_btns = []
        opts_boxes = []  # textboxes for category options (one per category field, sparse)
        opts_field_indices = []  # which field index each opts_box corresponds to
        add_btn = gr.Button("+ Add output field", size="sm", key="add-btn")
        if not fields:
            gr.Markdown(
                "*No output fields defined — default answer column will be used.*"
            )
        for i, f in enumerate(fields):
            with gr.Row(equal_height=True):
                nb = gr.Textbox(
                    label="Name",
                    value=f.get("name", ""),
                    scale=2,
                    min_width=100,
                    key=f"name-{i}",
                )
                td = gr.Dropdown(
                    choices=FIELD_TYPES,
                    value=f.get("type", "str"),
                    label="Type",
                    scale=1,
                    min_width=80,
                    interactive=True,
                    key=f"type-{i}",
                )
                db = gr.Textbox(
                    label="Description",
                    value=f.get("description", ""),
                    scale=3,
                    min_width=150,
                    key=f"desc-{i}",
                )
                rb = gr.Button("✕", size="sm", scale=0, min_width=40, key=f"rm-{i}")
            name_boxes.append(nb)
            type_dds.append(td)
            desc_boxes.append(db)
            rm_btns.append(rb)
            # Category fields get an extra multi-line options textarea.
            if f.get("type") == "category":
                ot = gr.Textbox(
                    label=f"Options for '{f.get('name') or 'field'}'",
                    value=f.get("options", ""),
                    placeholder="tech: Technology company\nfinance: Financial services\nhealthcare",
                    lines=3,
                    key=f"opts-{i}",
                )
                opts_boxes.append(ot)
                opts_field_indices.append(i)

        # --- Wire events (no change handlers on name/desc/opts textboxes) ---
        # Helper to snapshot all text fields into state
        def _snapshot_all(old_fields, names, descs, opts_vals):
            """Snapshot name/desc from components, options from opts textboxes."""
            # opts_vals is sparse (only category fields have a textbox), so it
            # is consumed in order via an iterator rather than indexed by j.
            opts_iter = iter(opts_vals)
            result = []
            for j, f in enumerate(old_fields):
                entry = {
                    "name": names[j] if j < len(names) else f.get("name", ""),
                    "type": f.get("type", "str"),
                    "description": descs[j] if j < len(descs) else f.get("description", ""),
                    "options": f.get("options", ""),
                }
                if f.get("type") == "category":
                    entry["options"] = next(opts_iter, f.get("options", ""))
                result.append(entry)
            return result

        # Flattened component list fed to every handler:
        # [n name boxes] + [n desc boxes] + [sparse opts boxes].
        all_text_inputs = name_boxes + desc_boxes + opts_boxes

        # Add field
        def on_add(*args):
            # args = (fields_state value, *all_text_inputs values).
            old_fields = args[0]
            names = args[1:1 + n]
            descs = args[1 + n:1 + 2 * n]
            opts_vals = args[1 + 2 * n:]
            new_fields = _snapshot_all(old_fields, names, descs, opts_vals)
            new_fields.append({"name": "", "type": "str", "description": "", "options": ""})
            return new_fields

        add_btn.click(
            on_add,
            inputs=[fields_state] + all_text_inputs,
            outputs=fields_state,
        )

        # Type change
        for i in range(n):
            # Factory binds idx eagerly; a bare closure over i would late-bind.
            def _make_type_handler(idx):
                def handler(*args):
                    # args = (fields_state, changed dropdown value, *all_text_inputs).
                    old_fields = args[0]
                    new_type = args[1]
                    names = args[2:2 + n]
                    descs = args[2 + n:2 + 2 * n]
                    opts_vals = args[2 + 2 * n:]
                    new_fields = _snapshot_all(old_fields, names, descs, opts_vals)
                    new_fields[idx]["type"] = new_type
                    if new_type != "category":
                        # Leaving category discards any stale options text.
                        new_fields[idx]["options"] = ""
                    return new_fields
                return handler

            type_dds[i].change(
                _make_type_handler(i),
                inputs=[fields_state, type_dds[i]] + all_text_inputs,
                outputs=fields_state,
            )

        # Remove field
        for i in range(n):
            def _make_remove_handler(idx):
                def handler(*args):
                    old_fields = args[0]
                    names = args[1:1 + n]
                    descs = args[1 + n:1 + 2 * n]
                    opts_vals = args[1 + 2 * n:]
                    new_fields = _snapshot_all(old_fields, names, descs, opts_vals)
                    new_fields.pop(idx)
                    return new_fields
                return handler

            rm_btns[i].click(
                _make_remove_handler(i),
                inputs=[fields_state] + all_text_inputs,
                outputs=fields_state,
            )

        # --- JSON preview and submit ---
        with gr.Accordion("Advanced: edit JSON", open=False):
            # Read-only schema preview; the gr.Code is not wired to any event.
            gr.Code(
                value=fields_to_schema_json(fields),
                language="json",
                label="Schema JSON",
            )
        submit_btn = gr.Button("Run", variant="primary")

        # Submit: read name/desc/opts from components for freshest values
        async def on_submit(api_key_val, file_val, query_val, effort_val, session_id_val, runs_val, *dynamic_vals):
            """Run agent_map and update results/history/error UI.

            Returns a 7-tuple matching the click() outputs:
            (error_box, full_results_state, output_table, download_btn,
             research_toggle, runs_state, run_selector).
            """
            names = dynamic_vals[:n]
            descs = dynamic_vals[n:2 * n]
            opts_vals = list(dynamic_vals[2 * n:])
            opts_iter = iter(opts_vals)
            fields_list = []
            for j in range(n):
                entry = {
                    "name": names[j],
                    "type": fields[j].get("type", "str"),
                    "description": descs[j],
                    "options": "",
                }
                if fields[j].get("type") == "category":
                    entry["options"] = next(opts_iter, "")
                fields_list.append(entry)
            try:
                result_df, download_path = await run_agent_map(
                    api_key_val, file_val, query_val, effort_val, fields_list, session_id_val
                )
                has_research = "research" in result_df.columns
                run_label = f"Run {len(runs_val) + 1}: {query_val[:40].strip()}"
                if len(query_val) > 40:
                    run_label += "..."
                new_run = {
                    "label": run_label,
                    "full_df": result_df,
                    "download_path": download_path,
                }
                new_runs = runs_val + [new_run]
                run_choices = [r["label"] for r in new_runs]
                return (
                    gr.update(value="", visible=False),
                    result_df,
                    hide_research(result_df) if has_research else result_df,
                    gr.update(value=download_path, visible=True),
                    gr.update(value=False, visible=has_research),
                    new_runs,
                    gr.update(choices=run_choices, value=run_label, visible=True),
                )
            except gr.Error:
                # gr.Error is surfaced by Gradio's own toast; re-raise as-is.
                raise
            except Exception as e:
                # Unexpected failure: show it in the inline error box and
                # leave every other output unchanged.
                return (
                    gr.update(
                        value=f"**Error:** {e}",
                        visible=True,
                    ),
                    gr.update(),
                    gr.update(),
                    gr.update(),
                    gr.update(),
                    gr.update(),
                    gr.update(),
                )

        # NOTE(review): runs_state, error_box, output_table, etc. are defined
        # *after* this render function in the file. This appears to work because
        # render callbacks execute after the Blocks context has finished
        # building, so the names exist by then — confirm against Gradio's
        # @gr.render documentation.
        submit_btn.click(
            fn=on_submit,
            inputs=[api_key, file, query, effort, session_id, runs_state] + all_text_inputs,
            outputs=[error_box, full_results_state, output_table, download_btn, research_toggle, runs_state, run_selector],
        )

    # --- Results area ---
    gr.Markdown("### Results")
    error_box = gr.Markdown(visible=False, elem_classes=["error-box"])
    # Most recent full result, including the raw research column.
    full_results_state = gr.State(None)
    # Completed runs: list of {"label", "full_df", "download_path"} dicts.
    runs_state = gr.State([])
    run_selector = gr.Dropdown(
        label="Run history",
        choices=[],
        visible=False,
        interactive=True,
    )
    research_toggle = gr.Checkbox(
        label="Show research details",
        value=False,
        visible=False,
    )
    output_table = gr.Dataframe(label="Results", wrap=True, max_chars=200)
    download_btn = gr.File(label="Download CSV", visible=False)

    def toggle_research(show_full, full_df):
        """Swap the results table between compact and expanded research views."""
        if full_df is None:
            # No run yet: leave the table untouched.
            return gr.update()
        if show_full:
            return expand_research(full_df)
        return hide_research(full_df)

    research_toggle.change(
        toggle_research,
        inputs=[research_toggle, full_results_state],
        outputs=output_table,
    )

    def on_run_select(selected_label, runs, show_research):
        """Restore a past run's table, download link and toggle from history."""
        if not selected_label or not runs:
            return gr.update(), gr.update(), gr.update(), gr.update()
        for r in runs:
            if r["label"] == selected_label:
                full_df = r["full_df"]
                has_research = "research" in full_df.columns
                if show_research:
                    display_df = expand_research(full_df)
                else:
                    display_df = hide_research(full_df) if has_research else full_df
                return (
                    full_df,
                    display_df,
                    gr.update(value=r["download_path"], visible=True),
                    gr.update(value=show_research, visible=has_research),
                )
        # Stale selection (label no longer in history): change nothing.
        return gr.update(), gr.update(), gr.update(), gr.update()

    run_selector.change(
        on_run_select,
        inputs=[run_selector, runs_state, research_toggle],
        outputs=[full_results_state, output_table, download_btn, research_toggle],
    )

    def on_upload(file):
        """Show/refresh the input preview when a CSV is selected or cleared."""
        if file is None:
            return gr.update(visible=False), gr.update(visible=False)
        df = pd.read_csv(file)
        return gr.update(visible=True), gr.update(value=df, visible=True)

    file.change(
        fn=on_upload,
        inputs=[file],
        outputs=[preview_heading, preview_table],
    )
if __name__ == "__main__":
    # Launch the Gradio server when run as a script.
    demo.launch()