"""Gradio app that runs everyrow ``agent_map`` over an uploaded CSV.

The user supplies an API key, a CSV file, a free-text instruction and an
optional list of structured output fields; the results are shown in a table
and offered as a CSV download. Usage events are sent to PostHog when
``POSTHOG_KEY`` and ``POSTHOG_HOST`` are both set.
"""

import ast
import atexit
import hashlib
import json
import os
import tempfile
import time
import uuid
from collections import Counter
from typing import Literal

import gradio as gr
import pandas as pd
import posthog
from everyrow.generated.client import AuthenticatedClient
from everyrow.ops import agent_map
from everyrow.session import create_session
from everyrow.task import EffortLevel
from pydantic import BaseModel, Field, create_model

_POSTHOG_KEY = os.environ.get("POSTHOG_KEY", "")
_POSTHOG_HOST = os.environ.get("POSTHOG_HOST", "")
_POSTHOG_ENABLED = bool(_POSTHOG_KEY and _POSTHOG_HOST)

posthog.api_key = _POSTHOG_KEY
posthog.project_api_key = _POSTHOG_KEY
posthog.host = _POSTHOG_HOST
posthog.debug = os.environ.get("POSTHOG_DEBUG", "").lower() in ("1", "true", "yes")
if _POSTHOG_ENABLED:
    # Flush any queued events before the interpreter exits.
    atexit.register(posthog.shutdown)

# NOTE: Frontend JS uses an anonymous distinct_id while server-side uses a hashed
# API key. PostHog won't auto-link these identities. To link them, the frontend
# would need to call posthog.identify() with the hashed key after the user submits.
# NOTE(review): the head snippet body is blank in the reviewed copy of this
# file — confirm the frontend PostHog snippet is present in the real source.
POSTHOG_HEAD = f""" """ if _POSTHOG_ENABLED else ""

_EVERYROW_API_URL = os.environ.get("EVERYROW_API_URL", "https://everyrow.io/api/v0")

# Value of the "app" property attached to every PostHog event.
_APP_NAME = "everyrow-research-space"

EFFORT_LEVELS = {
    "Low": EffortLevel.LOW,
    "Medium": EffortLevel.MEDIUM,
    "High": EffortLevel.HIGH,
}

TYPE_MAP = {
    "str": str,
    "int": int,
    "float": float,
    "bool": bool,
}

FIELD_TYPES = ["str", "int", "float", "bool", "category"]


def parse_options_text(text: str) -> list[tuple[str, str]]:
    """Parse options textarea: one option per line, 'value: description' or just 'value'.

    Blank lines are ignored, as are lines whose value part is empty
    (e.g. ``": some description"``). Only the first colon on a line separates
    the value from its description.

    Returns:
        A list of ``(value, description)`` pairs; the description is ``""``
        when the line has no colon.
    """
    options = []
    for raw_line in (text or "").splitlines():
        line = raw_line.strip()
        if not line:
            continue
        value, _, desc = line.partition(":")
        value = value.strip()
        if not value:
            # An empty value would become an empty-string Literal option.
            continue
        options.append((value, desc.strip()))
    return options


def _coerce_options(options) -> list[tuple[str, str]]:
    """Normalize category options given as text or as a list into (value, desc) pairs."""
    if isinstance(options, str):
        return parse_options_text(options)
    if not isinstance(options, (list, tuple)):
        return []
    pairs = []
    for item in options:
        if isinstance(item, str):
            value, desc = item, ""
        elif isinstance(item, (list, tuple)) and item:
            value = str(item[0])
            desc = str(item[1]) if len(item) > 1 else ""
        else:
            continue
        value = value.strip()
        if value:
            pairs.append((value, desc.strip()))
    return pairs


def build_response_model(fields_list: list[dict]) -> type[BaseModel] | None:
    """Build a pydantic model named ``CustomResponse`` from the field definitions.

    Each entry of ``fields_list`` is a dict with the keys ``name``, ``type``
    (one of ``FIELD_TYPES``), ``description`` and, for ``category`` fields,
    ``options``. Entries with a blank name are skipped.

    Returns:
        The generated model class, or ``None`` when no usable field is defined.

    Raises:
        ValueError: if a category field has no options, a type is unknown, or
            two fields share the same name.
    """
    if not fields_list:
        return None

    model_fields = {}
    for f in fields_list:
        name = (f.get("name") or "").strip()
        if not name:
            continue
        if name in model_fields:
            # Previously the later definition silently replaced the earlier one.
            raise ValueError(f"Duplicate field name '{name}'.")

        type_str = f.get("type", "str")
        description = f.get("description") or ""

        if type_str == "category":
            parsed = _coerce_options(f.get("options", ""))
            if not parsed:
                raise ValueError(
                    f"Category field '{name}' requires at least one option."
                )
            option_values = [value for value, _ in parsed]
            desc_parts = [f"{value}: {desc}" for value, desc in parsed if desc]
            python_type = Literal[tuple(option_values)]
            # Append the per-option descriptions to the field description.
            full_desc = description
            if desc_parts:
                full_desc += (" — " if full_desc else "") + "; ".join(desc_parts)
            model_fields[name] = (python_type, Field(description=full_desc))
            continue

        python_type = TYPE_MAP.get(type_str)
        if python_type is None:
            raise ValueError(
                f"Unknown type '{type_str}' for field '{name}'. "
                f"Supported: {', '.join(TYPE_MAP)}, category"
            )
        if description:
            model_fields[name] = (python_type, Field(description=description))
        else:
            model_fields[name] = (python_type, ...)

    if not model_fields:
        return None
    return create_model("CustomResponse", **model_fields)


def fields_to_schema_json(fields_list: list[dict]) -> str:
    """Render the named fields as an indented JSON array for the schema preview.

    Returns ``""`` when there are no fields with a non-blank name.
    """
    if not fields_list:
        return ""

    clean = []
    for f in fields_list:
        name = (f.get("name") or "").strip()
        if not name:
            continue
        entry = {
            "name": name,
            "type": f.get("type", "str"),
            "description": f.get("description", ""),
        }
        opts = f.get("options")
        if f.get("type") == "category" and opts:
            entry["options"] = parse_options_text(opts) if isinstance(opts, str) else opts
        clean.append(entry)

    return json.dumps(clean, indent=2) if clean else ""


def _posthog_distinct_id(api_key: str) -> str:
    """Hash the API key to use as a stable distinct_id without storing the secret."""
    return hashlib.sha256(api_key.encode()).hexdigest()[:16]


def _capture(distinct_id: str, event: str, session_id, properties: dict | None = None) -> None:
    """Send one PostHog event with the shared session/app properties; no-op when disabled."""
    if not _POSTHOG_ENABLED:
        return
    posthog.capture(
        distinct_id=distinct_id,
        event=event,
        properties={
            "$session_id": session_id,
            **(properties or {}),
            "app": _APP_NAME,
        },
    )


def _track_validation_error(api_key, session_id, error_msg):
    """Track validation errors in PostHog when we have an API key."""
    if _POSTHOG_ENABLED and api_key:
        _capture(
            _posthog_distinct_id(api_key),
            "validation_error",
            session_id,
            {"error": error_msg},
        )


def _schema_properties(fields_list):
    """Summarize the output schema for PostHog events."""
    if not fields_list:
        return {"field_count": 0}
    type_counts = dict(Counter(f.get("type", "str") for f in fields_list))
    return {
        "field_count": len(fields_list),
        "field_types": type_counts,
        "has_category": "category" in type_counts,
    }


async def run_agent_map(api_key, file, query, effort_label, fields_list, session_id):
    """Validate the inputs, run ``agent_map`` on the CSV and write the result to disk.

    Args:
        api_key: everyrow API key, used as the client token.
        file: CSV to read with ``pandas.read_csv``.
        query: task text passed to ``agent_map``.
        effort_label: a key of ``EFFORT_LEVELS``.
        fields_list: output field definitions (see ``build_response_model``).
        session_id: value reported as ``$session_id`` on PostHog events.

    Returns:
        ``(output_df, csv_path)`` where ``csv_path`` is a temporary CSV file
        holding ``output_df``; the ``research`` column, if present, is moved
        to the end.

    Raises:
        gr.Error: on invalid input or when the result carries an error.
    """
    if not api_key:
        raise gr.Error("Please enter your everyrow API key.")
    if file is None:
        _track_validation_error(api_key, session_id, "No file uploaded")
        raise gr.Error("Please upload a CSV file.")
    if not (query or "").strip():
        _track_validation_error(api_key, session_id, "Empty query")
        raise gr.Error("Please enter a research query.")

    try:
        df = pd.read_csv(file)
    except pd.errors.EmptyDataError:
        # A zero-byte file has no header to parse; report it as an empty CSV.
        df = pd.DataFrame()
    if df.empty:
        _track_validation_error(api_key, session_id, "Empty CSV")
        raise gr.Error("The uploaded CSV is empty.")

    effort_level = EFFORT_LEVELS.get(effort_label)
    if effort_level is None:
        _track_validation_error(api_key, session_id, "Unknown effort level")
        raise gr.Error(f"Unknown effort level: {effort_label!r}")

    distinct_id = _posthog_distinct_id(api_key)

    try:
        response_model = build_response_model(fields_list)
    except ValueError as e:
        _track_validation_error(api_key, session_id, str(e))
        raise gr.Error(str(e)) from e

    kwargs = dict(task=query, input=df, effort_level=effort_level)
    if response_model is not None:
        kwargs["response_model"] = response_model

    _capture(
        distinct_id,
        "agent_map_started",
        session_id,
        {
            "effort_level": effort_label,
            "row_count": len(df),
            "column_count": len(df.columns),
            **_schema_properties(fields_list),
        },
    )

    client = AuthenticatedClient(
        base_url=_EVERYROW_API_URL,
        token=api_key,
        raise_on_unexpected_status=True,
        follow_redirects=True,
    )

    # perf_counter is monotonic, so the duration cannot go negative on clock changes.
    t0 = time.perf_counter()
    async with create_session(client=client) as session:
        result = await agent_map(session=session, **kwargs)
    duration_s = round(time.perf_counter() - t0, 2)

    if result.error:
        _capture(
            distinct_id,
            "agent_map_failed",
            session_id,
            {"error": str(result.error), "duration_s": duration_s},
        )
        raise gr.Error(f"agent_map failed: {result.error}")

    _capture(
        distinct_id,
        "agent_map_completed",
        session_id,
        {
            "output_rows": len(result.data),
            "output_columns": len(result.data.columns),
            "duration_s": duration_s,
        },
    )

    output_df = result.data
    if "research" in output_df.columns:
        cols = [c for c in output_df.columns if c != "research"] + ["research"]
        output_df = output_df[cols]

    # The file must outlive this call so it can be downloaded; close the
    # descriptor right away so it is not leaked and pandas can reopen the path.
    fd, csv_path = tempfile.mkstemp(suffix=".csv", prefix="everyrow_results_")
    os.close(fd)
    output_df.to_csv(csv_path, index=False)

    return output_df, csv_path


def _parse_research_val(val):
    """Try to parse a research value into a dict.

    Dicts are returned as-is. Strings are parsed first as a Python literal
    and then as JSON; anything that does not yield a dict gives ``None``.
    """
    if isinstance(val, dict):
        return val
    if not isinstance(val, str):
        return None
    for parser in (ast.literal_eval, json.loads):
        try:
            parsed = parser(val)
        except (ValueError, SyntaxError, TypeError, MemoryError, RecursionError):
            continue
        if isinstance(parsed, dict):
            return parsed
    return None


def hide_research(df: pd.DataFrame) -> pd.DataFrame:
    """Default view: drop the research column."""
    if "research" not in df.columns:
        return df
    return df.drop(columns=["research"])


def expand_research(df: pd.DataFrame) -> pd.DataFrame:
    """Expanded view: explode research dict into separate columns.

    Returns ``df`` itself when it has no ``research`` column, and a copy of
    ``df`` when no research value can be parsed into a dict.
    """
    if "research" not in df.columns:
        return df

    result = df.copy()
    parsed_dicts = result["research"].apply(_parse_research_val)
    if not parsed_dicts.notna().any():
        return result

    # Rows without a parseable dict become empty records (all-NaN columns).
    records = [v if isinstance(v, dict) else {} for v in parsed_dicts]
    expanded = pd.json_normalize(records)
    expanded.index = result.index
    # Prefix expanded columns with "research." to avoid clashes
    expanded.columns = [f"research.{c}" for c in expanded.columns]

    result = result.drop(columns=["research"])
    return pd.concat([result, expanded], axis=1)


_CSS = (
    ".error-box { background: #fee; border: 1px solid #c00; "
    "border-radius: 8px; padding: 12px; color: #900; }"
)

# NOTE(review): line/paragraph breaks below were restored by hand from a copy
# of the file that had lost them — confirm against the rendered page.
_INTRO_MARKDOWN = """
# 🏷️ everyrow annotate – AI Data Annotation & Web Research

**everyrow annotate** uses AI research agents to **label, verify, and enrich data** row by row using live web information.
Upload a CSV, describe what you want to annotate or find, and get structured results for every row — perfect for **data annotation, dataset enrichment, and research at scale**.

🔑 Get your API key at [everyrow.io/api-key](https://everyrow.io/api-key) ($20 free credit).

📖 Visit [everyrow.io/docs](https://everyrow.io/docs) for documentation.
"""

with gr.Blocks(
    title="everyrow annotate – AI Data Annotation & Web Research",
    css=_CSS,
    head=POSTHOG_HEAD,
) as demo:
    gr.Markdown(_INTRO_MARKDOWN)

    api_key = gr.Textbox(
        label="everyrow API key",
        type="password",
        placeholder="sk-cho-...",
    )
    file = gr.File(label="Upload CSV", file_types=[".csv"])
    preview_heading = gr.Markdown("### Preview", visible=False)
    preview_table = gr.Dataframe(label="Input preview", visible=False)
    query = gr.Textbox(
        label="Annotation or research instruction",
        placeholder="e.g. Label each company’s industry and whether it is B2B or B2C",
        lines=3,
    )
    effort = gr.Dropdown(
        choices=list(EFFORT_LEVELS.keys()),
        value="Low",
        label="Effort level",
        interactive=True,
    )

    # --- Output fields form builder ---
    gr.Markdown("### Output fields")
    gr.Markdown(
        "*Define the structured fields you want the AI to annotate for each row. "
        "Leave empty to return a single answer column.*"
    )

    session_id = gr.State(lambda: str(uuid.uuid4()))
    fields_state = gr.State([])

    @gr.render(inputs=fields_state)
    def render_fields(fields):
        """Render one editable row per output field and wire its events.

        The results components referenced below (``error_box``, ``runs_state``
        and so on) are created further down in this ``gr.Blocks`` context; the
        names are only looked up when this function runs.
        """
        n = len(fields)
        name_boxes = []
        type_dds = []
        desc_boxes = []
        rm_btns = []
        opts_boxes = []  # textboxes for category options (one per category field, sparse)

        add_btn = gr.Button("+ Add output field", size="sm", key="add-btn")

        if not fields:
            gr.Markdown(
                "*No output fields defined — default answer column will be used.*"
            )

        for i, f in enumerate(fields):
            with gr.Row(equal_height=True):
                nb = gr.Textbox(
                    label="Name",
                    value=f.get("name", ""),
                    scale=2,
                    min_width=100,
                    key=f"name-{i}",
                )
                td = gr.Dropdown(
                    choices=FIELD_TYPES,
                    value=f.get("type", "str"),
                    label="Type",
                    scale=1,
                    min_width=80,
                    interactive=True,
                    key=f"type-{i}",
                )
                db = gr.Textbox(
                    label="Description",
                    value=f.get("description", ""),
                    scale=3,
                    min_width=150,
                    key=f"desc-{i}",
                )
                rb = gr.Button("✕", size="sm", scale=0, min_width=40, key=f"rm-{i}")
            name_boxes.append(nb)
            type_dds.append(td)
            desc_boxes.append(db)
            rm_btns.append(rb)

            if f.get("type") == "category":
                ot = gr.Textbox(
                    label=f"Options for '{f.get('name') or 'field'}'",
                    value=f.get("options", ""),
                    placeholder="tech: Technology company\nfinance: Financial services\nhealthcare",
                    lines=3,
                    key=f"opts-{i}",
                )
                opts_boxes.append(ot)

        # --- Wire events (no change handlers on name/desc/opts textboxes) ---

        # Helper to snapshot all text fields into state
        def _snapshot_all(old_fields, names, descs, opts_vals):
            """Snapshot name/desc from components, options from opts textboxes."""
            opts_iter = iter(opts_vals)
            result = []
            for j, f in enumerate(old_fields):
                entry = {
                    "name": names[j] if j < len(names) else f.get("name", ""),
                    "type": f.get("type", "str"),
                    "description": descs[j] if j < len(descs) else f.get("description", ""),
                    "options": f.get("options", ""),
                }
                if f.get("type") == "category":
                    # opts_vals holds one value per category field, in field order.
                    entry["options"] = next(opts_iter, f.get("options", ""))
                result.append(entry)
            return result

        def _split_text_values(values):
            """Split the flat textbox values into (names, descs, opts_vals)."""
            return values[:n], values[n:2 * n], values[2 * n:]

        all_text_inputs = name_boxes + desc_boxes + opts_boxes

        # Add field
        def on_add(*args):
            old_fields, *text_vals = args
            new_fields = _snapshot_all(old_fields, *_split_text_values(text_vals))
            new_fields.append({"name": "", "type": "str", "description": "", "options": ""})
            return new_fields

        add_btn.click(
            on_add,
            inputs=[fields_state] + all_text_inputs,
            outputs=fields_state,
        )

        # Type change
        def _make_type_handler(idx):
            def handler(*args):
                old_fields, new_type, *text_vals = args
                new_fields = _snapshot_all(old_fields, *_split_text_values(text_vals))
                new_fields[idx]["type"] = new_type
                if new_type != "category":
                    new_fields[idx]["options"] = ""
                return new_fields

            return handler

        for idx, type_dd in enumerate(type_dds):
            type_dd.change(
                _make_type_handler(idx),
                inputs=[fields_state, type_dd] + all_text_inputs,
                outputs=fields_state,
            )

        # Remove field
        def _make_remove_handler(idx):
            def handler(*args):
                old_fields, *text_vals = args
                new_fields = _snapshot_all(old_fields, *_split_text_values(text_vals))
                new_fields.pop(idx)
                return new_fields

            return handler

        for idx, rm_btn in enumerate(rm_btns):
            rm_btn.click(
                _make_remove_handler(idx),
                inputs=[fields_state] + all_text_inputs,
                outputs=fields_state,
            )

        # --- JSON preview and submit ---
        with gr.Accordion("Advanced: edit JSON", open=False):
            gr.Code(
                value=fields_to_schema_json(fields),
                language="json",
                label="Schema JSON",
            )

        submit_btn = gr.Button("Run", variant="primary")

        # Submit: read name/desc/opts from components for freshest values
        async def on_submit(api_key_val, file_val, query_val, effort_val,
                            session_id_val, runs_val, *dynamic_vals):
            fields_list = _snapshot_all(fields, *_split_text_values(dynamic_vals))

            try:
                result_df, download_path = await run_agent_map(
                    api_key_val, file_val, query_val, effort_val, fields_list, session_id_val
                )
            except gr.Error:
                raise
            except Exception as e:
                # Show unexpected failures in the error box; leave other outputs untouched.
                return (
                    gr.update(value=f"**Error:** {e}", visible=True),
                    *(gr.update() for _ in range(6)),
                )

            has_research = "research" in result_df.columns
            run_label = f"Run {len(runs_val) + 1}: {query_val[:40].strip()}"
            if len(query_val) > 40:
                run_label += "..."
            new_run = {
                "label": run_label,
                "full_df": result_df,
                "download_path": download_path,
            }
            new_runs = runs_val + [new_run]
            run_choices = [r["label"] for r in new_runs]
            return (
                gr.update(value="", visible=False),
                result_df,
                hide_research(result_df) if has_research else result_df,
                gr.update(value=download_path, visible=True),
                gr.update(value=False, visible=has_research),
                new_runs,
                gr.update(choices=run_choices, value=run_label, visible=True),
            )

        submit_btn.click(
            fn=on_submit,
            inputs=[api_key, file, query, effort, session_id, runs_state] + all_text_inputs,
            outputs=[error_box, full_results_state, output_table, download_btn,
                     research_toggle, runs_state, run_selector],
        )

    gr.Markdown("### Results")
    error_box = gr.Markdown(visible=False, elem_classes=["error-box"])
    full_results_state = gr.State(None)
    runs_state = gr.State([])
    run_selector = gr.Dropdown(
        label="Run history",
        choices=[],
        visible=False,
        interactive=True,
    )
    research_toggle = gr.Checkbox(
        label="Show research details",
        value=False,
        visible=False,
    )
    output_table = gr.Dataframe(label="Results", wrap=True, max_chars=200)
    download_btn = gr.File(label="Download CSV", visible=False)

    def toggle_research(show_full, full_df):
        """Switch the results table between the expanded and the hidden research view."""
        if full_df is None:
            return gr.update()
        if show_full:
            return expand_research(full_df)
        return hide_research(full_df)

    research_toggle.change(
        toggle_research,
        inputs=[research_toggle, full_results_state],
        outputs=output_table,
    )

    def on_run_select(selected_label, runs, show_research):
        """Load the stored run whose label matches the dropdown selection."""
        unchanged = gr.update(), gr.update(), gr.update(), gr.update()
        if not selected_label or not runs:
            return unchanged
        for r in runs:
            if r["label"] != selected_label:
                continue
            full_df = r["full_df"]
            has_research = "research" in full_df.columns
            if show_research:
                display_df = expand_research(full_df)
            else:
                display_df = hide_research(full_df) if has_research else full_df
            return (
                full_df,
                display_df,
                gr.update(value=r["download_path"], visible=True),
                gr.update(value=show_research, visible=has_research),
            )
        return unchanged

    run_selector.change(
        on_run_select,
        inputs=[run_selector, runs_state, research_toggle],
        outputs=[full_results_state, output_table, download_btn, research_toggle],
    )

    def on_upload(file):
        """Show a preview of the uploaded CSV, or hide the preview when there is none."""
        hidden = gr.update(visible=False), gr.update(visible=False)
        if file is None:
            return hidden
        try:
            df = pd.read_csv(file)
        except pd.errors.EmptyDataError:
            # Nothing to preview; the empty file is reported when the user submits.
            return hidden
        except (pd.errors.ParserError, UnicodeDecodeError) as e:
            raise gr.Error(f"Could not read the uploaded CSV: {e}") from e
        return gr.update(visible=True), gr.update(value=df, visible=True)

    file.change(
        fn=on_upload,
        inputs=[file],
        outputs=[preview_heading, preview_table],
    )

if __name__ == "__main__":
    demo.launch()