# everyrow annotate — Gradio app (extraction residue "Spaces: Sleeping" removed)
import ast
import atexit
import hashlib
import json
import os
import tempfile
import time
import uuid
from collections import Counter
from typing import Literal

import gradio as gr
import pandas as pd
import posthog
from pydantic import BaseModel, Field, create_model
| _POSTHOG_KEY = os.environ.get("POSTHOG_KEY", "") | |
| _POSTHOG_HOST = os.environ.get("POSTHOG_HOST", "") | |
| _POSTHOG_ENABLED = bool(_POSTHOG_KEY and _POSTHOG_HOST) | |
| posthog.api_key = _POSTHOG_KEY | |
| posthog.project_api_key = _POSTHOG_KEY | |
| posthog.host = _POSTHOG_HOST | |
| posthog.debug = os.environ.get("POSTHOG_DEBUG", "").lower() in ("1", "true", "yes") | |
| if _POSTHOG_ENABLED: | |
| atexit.register(posthog.shutdown) | |
| # NOTE: Frontend JS uses an anonymous distinct_id while server-side uses a hashed | |
| # API key. PostHog won't auto-link these identities. To link them, the frontend | |
| # would need to call posthog.identify() with the hashed key after the user submits. | |
| POSTHOG_HEAD = f""" | |
| <script> | |
| !function(t,e){{var o,n,p,r;e.__SV||(window.posthog=e,e._i=[],e.init=function(i,s,a){{function g(t,e){{var o=e.split(".");2==o.length&&(t=t[o[0]],e=o[1]),t[e]=function(){{t.push([e].concat(Array.prototype.slice.call(arguments,0)))}}}}(p=t.createElement("script")).type="text/javascript",p.async=!0,p.src=s.api_host+"/static/array.js",(r=t.getElementsByTagName("script")[0]).parentNode.insertBefore(p,r);var u=e;for(void 0!==a?u=e[a]=[]:a="posthog",u.people=u.people||[],u.toString=function(t){{var e="posthog";return"posthog"!==a&&(e+="."+a),t||(e+=" (stub)"),e}},u.people.toString=function(){{return u.toString(1)+".people (stub)"}},o="init capture register register_once unregister opt_in_capturing opt_out_capturing has_opted_in_capturing has_opted_out_capturing identify alias people.set people.set_once set_config reset opt_in_capturing".split(" "),n=0;n<o.length;n++)g(u,o[n]);e._i.push([i,s,a])}},e.__SV=1)}}(document,window.posthog||[]); | |
| posthog.init('{_POSTHOG_KEY}',{{api_host:'{_POSTHOG_HOST}', person_profiles: 'identified_only'}}) | |
| </script> | |
| """ if _POSTHOG_ENABLED else "" | |
| from everyrow.generated.client import AuthenticatedClient | |
| from everyrow.ops import agent_map | |
| from everyrow.session import create_session | |
| from everyrow.task import EffortLevel | |
| _EVERYROW_API_URL = os.environ.get("EVERYROW_API_URL", "https://everyrow.io/api/v0") | |
| EFFORT_LEVELS = { | |
| "Low": EffortLevel.LOW, | |
| "Medium": EffortLevel.MEDIUM, | |
| "High": EffortLevel.HIGH, | |
| } | |
| TYPE_MAP = { | |
| "str": str, | |
| "int": int, | |
| "float": float, | |
| "bool": bool, | |
| } | |
| FIELD_TYPES = ["str", "int", "float", "bool", "category"] | |
def parse_options_text(text: str) -> list[tuple[str, str]]:
    """Parse an options textarea into (value, description) pairs.

    One option per line, written either as 'value: description' or as a
    bare 'value'. Blank lines are skipped; values and descriptions are
    whitespace-trimmed.
    """
    parsed: list[tuple[str, str]] = []
    for raw_line in text.strip().splitlines():
        option = raw_line.strip()
        if not option:
            continue
        value, sep, desc = option.partition(":")
        if sep:
            parsed.append((value.strip(), desc.strip()))
        else:
            parsed.append((option, ""))
    return parsed
def build_response_model(fields_list: list[dict]) -> type[BaseModel] | None:
    """Build a dynamic pydantic model from the UI field definitions.

    Returns None when no usable (named) fields exist, in which case the
    caller falls back to the default answer column.

    Raises:
        ValueError: for a category field without options, or an unknown
            type name.
    """
    if not fields_list:
        return None
    model_fields = {}
    for field_def in fields_list:
        name = field_def.get("name", "").strip()
        if not name:
            continue  # skip rows the user left blank
        type_str = field_def.get("type", "str")
        description = field_def.get("description", "")
        options_text = field_def.get("options", "")
        if type_str == "category":
            # A category becomes a Literal over its option values.
            options = parse_options_text(options_text) if isinstance(options_text, str) else []
            if not options:
                raise ValueError(
                    f"Category field '{name}' requires at least one option."
                )
            values = tuple(v for v, _ in options)
            # Per-option descriptions are folded into the field description.
            annotated = "; ".join(f"{v}: {d}" for v, d in options if d)
            if annotated:
                description = f"{description} — {annotated}" if description else annotated
            model_fields[name] = (Literal[values], Field(description=description))
        else:
            python_type = TYPE_MAP.get(type_str)
            if python_type is None:
                raise ValueError(
                    f"Unknown type '{type_str}' for field '{name}'. "
                    f"Supported: {', '.join(TYPE_MAP)}, category"
                )
            field_spec = Field(description=description) if description else ...
            model_fields[name] = (python_type, field_spec)
    if not model_fields:
        return None
    return create_model("CustomResponse", **model_fields)
def fields_to_schema_json(fields_list: list[dict]) -> str:
    """Render the UI field definitions as pretty-printed JSON ('' if none).

    Nameless rows are dropped; category fields get their parsed options
    attached so the JSON mirrors what the response model will contain.
    """
    if not fields_list:
        return ""
    entries = []
    for field_def in fields_list:
        name = field_def.get("name", "").strip()
        if not name:
            continue
        entry = {
            "name": name,
            "type": field_def.get("type", "str"),
            "description": field_def.get("description", ""),
        }
        options = field_def.get("options")
        if field_def.get("type") == "category" and options:
            entry["options"] = (
                parse_options_text(options) if isinstance(options, str) else options
            )
        entries.append(entry)
    return json.dumps(entries, indent=2) if entries else ""
| def _posthog_distinct_id(api_key: str) -> str: | |
| """Hash the API key to use as a stable distinct_id without storing the secret.""" | |
| return hashlib.sha256(api_key.encode()).hexdigest()[:16] | |
def _track_validation_error(api_key, session_id, error_msg):
    """Report a pre-flight validation failure to PostHog.

    No-op unless telemetry is enabled AND the user supplied an API key
    (the key is hashed before being used as the distinct_id).
    """
    if not (_POSTHOG_ENABLED and api_key):
        return
    posthog.capture(
        distinct_id=_posthog_distinct_id(api_key),
        event="validation_error",
        properties={
            "$session_id": session_id,
            "error": error_msg,
            "app": "everyrow-research-space",
        },
    )
| def _schema_properties(fields_list): | |
| """Summarize the output schema for PostHog events.""" | |
| if not fields_list: | |
| return {"field_count": 0} | |
| type_counts = {} | |
| for f in fields_list: | |
| t = f.get("type", "str") | |
| type_counts[t] = type_counts.get(t, 0) + 1 | |
| return { | |
| "field_count": len(fields_list), | |
| "field_types": type_counts, | |
| "has_category": "category" in type_counts, | |
| } | |
async def run_agent_map(api_key, file, query, effort_label, fields_list, session_id):
    """Run everyrow agent_map over the uploaded CSV.

    Validates the inputs, emits PostHog telemetry around the run, and
    writes the result frame to a temp CSV for the download widget.

    Returns:
        (output_df, csv_path) — the result DataFrame (with any 'research'
        column moved last) and the path of the CSV written for download.

    Raises:
        gr.Error: missing key/file/query, empty CSV, invalid schema, or a
            failed agent_map run. All messages are user-facing.
    """
    if not api_key:
        raise gr.Error("Please enter your everyrow API key.")
    if file is None:
        _track_validation_error(api_key, session_id, "No file uploaded")
        raise gr.Error("Please upload a CSV file.")
    if not query.strip():
        _track_validation_error(api_key, session_id, "Empty query")
        raise gr.Error("Please enter a research query.")
    df = pd.read_csv(file)
    if df.empty:
        _track_validation_error(api_key, session_id, "Empty CSV")
        raise gr.Error("The uploaded CSV is empty.")
    effort_level = EFFORT_LEVELS[effort_label]
    distinct_id = _posthog_distinct_id(api_key)
    try:
        response_model = build_response_model(fields_list)
    except ValueError as e:
        _track_validation_error(api_key, session_id, str(e))
        # Chain the cause so server logs keep the original traceback.
        raise gr.Error(str(e)) from e
    kwargs = dict(task=query, input=df, effort_level=effort_level)
    if response_model is not None:
        kwargs["response_model"] = response_model
    if _POSTHOG_ENABLED:
        posthog.capture(
            distinct_id=distinct_id,
            event="agent_map_started",
            properties={
                "$session_id": session_id,
                "effort_level": effort_label,
                "row_count": len(df),
                "column_count": len(df.columns),
                **_schema_properties(fields_list),
                "app": "everyrow-research-space",
            },
        )
    client = AuthenticatedClient(
        base_url=_EVERYROW_API_URL,
        token=api_key,
        raise_on_unexpected_status=True,
        follow_redirects=True,
    )
    t0 = time.time()
    async with create_session(client=client) as session:
        result = await agent_map(session=session, **kwargs)
    duration_s = round(time.time() - t0, 2)
    if result.error:
        if _POSTHOG_ENABLED:
            posthog.capture(
                distinct_id=distinct_id,
                event="agent_map_failed",
                properties={
                    "$session_id": session_id,
                    "error": str(result.error),
                    "duration_s": duration_s,
                    "app": "everyrow-research-space",
                },
            )
        raise gr.Error(f"agent_map failed: {result.error}")
    if _POSTHOG_ENABLED:
        posthog.capture(
            distinct_id=distinct_id,
            event="agent_map_completed",
            properties={
                "$session_id": session_id,
                "output_rows": len(result.data),
                "output_columns": len(result.data.columns),
                "duration_s": duration_s,
                "app": "everyrow-research-space",
            },
        )
    output_df = result.data
    # Keep the bulky 'research' column last so the table reads left-to-right.
    if "research" in output_df.columns:
        cols = [c for c in output_df.columns if c != "research"] + ["research"]
        output_df = output_df[cols]
    # Reserve a temp path for the download widget. Close the handle right
    # away: the original leaked the open fd, and an open handle also blocks
    # the subsequent to_csv() write on Windows.
    tmp = tempfile.NamedTemporaryFile(
        delete=False, suffix=".csv", prefix="everyrow_results_"
    )
    tmp.close()
    output_df.to_csv(tmp.name, index=False)
    return output_df, tmp.name
| def _parse_research_val(val): | |
| """Try to parse a research value into a dict.""" | |
| if isinstance(val, dict): | |
| return val | |
| if isinstance(val, str): | |
| try: | |
| parsed = ast.literal_eval(val) | |
| if isinstance(parsed, dict): | |
| return parsed | |
| except (ValueError, SyntaxError): | |
| pass | |
| return None | |
def hide_research(df: pd.DataFrame) -> pd.DataFrame:
    """Default results view: the frame without its 'research' column.

    Returns the input frame itself (not a copy) when the column is absent.
    """
    return df.drop(columns=["research"]) if "research" in df.columns else df
def expand_research(df: pd.DataFrame) -> pd.DataFrame:
    """Expanded results view: explode research dicts into separate columns.

    Rows whose research value does not parse as a dict contribute empty
    expansions; if no row parses at all, a plain copy is returned. A frame
    without a 'research' column is returned as-is.
    """
    if "research" not in df.columns:
        return df
    out = df.copy()
    dicts = out["research"].apply(_parse_research_val)
    if not dicts.notna().any():
        return out
    flat = pd.json_normalize(dicts.apply(lambda d: {} if d is None else d))
    flat.index = out.index
    # Namespace the new columns so they can't clash with existing ones.
    flat.columns = [f"research.{col}" for col in flat.columns]
    return pd.concat([out.drop(columns=["research"]), flat], axis=1)
| _CSS = ".error-box { background: #fee; border: 1px solid #c00; border-radius: 8px; padding: 12px; color: #900; }" | |
with gr.Blocks(
    title="everyrow annotate – AI Data Annotation & Web Research",
    css=_CSS,
    head=POSTHOG_HEAD,
) as demo:
    gr.Markdown(
        """
# 🏷️ everyrow annotate – AI Data Annotation & Web Research
**everyrow annotate** uses AI research agents to **label, verify, and enrich data** row by row using live web information.
Upload a CSV, describe what you want to annotate or find, and get structured results for every row — perfect for **data annotation, dataset enrichment, and research at scale**.
🔑 Get your API key at [everyrow.io/api-key](https://everyrow.io/api-key) ($20 free credit).
📖 Visit [everyrow.io/docs](https://everyrow.io/docs) for documentation.
"""
    )
    api_key = gr.Textbox(
        label="everyrow API key",
        type="password",
        placeholder="sk-cho-...",
    )
    file = gr.File(label="Upload CSV", file_types=[".csv"])
    preview_heading = gr.Markdown("### Preview", visible=False)
    preview_table = gr.Dataframe(label="Input preview", visible=False)
    query = gr.Textbox(
        label="Annotation or research instruction",
        placeholder="e.g. Label each company’s industry and whether it is B2B or B2C",
        lines=3,
    )
    effort = gr.Dropdown(
        choices=list(EFFORT_LEVELS.keys()),
        value="Low",
        label="Effort level",
        interactive=True,
    )

    # --- Output fields form builder ---
    gr.Markdown("### Output fields")
    gr.Markdown(
        "*Define the structured fields you want the AI to annotate for each row. Leave empty to return a single answer column.*"
    )
    session_id = gr.State(lambda: str(uuid.uuid4()))
    fields_state = gr.State([])

    # NOTE(review): the original source showed render_fields with neither a
    # decorator nor a call site, yet its shape (keyed components, event wiring
    # rebuilt per invocation) is exactly Gradio's @gr.render pattern, and it
    # references components (error_box, runs_state, ...) defined later in this
    # Blocks context — which only works if the function runs after the full UI
    # is constructed. The decorator is restored here; confirm upstream.
    @gr.render(inputs=fields_state)
    def render_fields(fields):
        """Render one editable row per output field plus add/remove/submit wiring."""
        n = len(fields)
        name_boxes = []
        type_dds = []
        desc_boxes = []
        rm_btns = []
        opts_boxes = []  # textboxes for category options (one per category field, sparse)
        opts_field_indices = []  # which field index each opts_box corresponds to
        add_btn = gr.Button("+ Add output field", size="sm", key="add-btn")
        if not fields:
            gr.Markdown(
                "*No output fields defined — default answer column will be used.*"
            )
        for i, f in enumerate(fields):
            with gr.Row(equal_height=True):
                nb = gr.Textbox(
                    label="Name",
                    value=f.get("name", ""),
                    scale=2,
                    min_width=100,
                    key=f"name-{i}",
                )
                td = gr.Dropdown(
                    choices=FIELD_TYPES,
                    value=f.get("type", "str"),
                    label="Type",
                    scale=1,
                    min_width=80,
                    interactive=True,
                    key=f"type-{i}",
                )
                db = gr.Textbox(
                    label="Description",
                    value=f.get("description", ""),
                    scale=3,
                    min_width=150,
                    key=f"desc-{i}",
                )
                rb = gr.Button("✕", size="sm", scale=0, min_width=40, key=f"rm-{i}")
            name_boxes.append(nb)
            type_dds.append(td)
            desc_boxes.append(db)
            rm_btns.append(rb)
            if f.get("type") == "category":
                # Full-width options textarea rendered under the field row.
                ot = gr.Textbox(
                    label=f"Options for '{f.get('name') or 'field'}'",
                    value=f.get("options", ""),
                    placeholder="tech: Technology company\nfinance: Financial services\nhealthcare",
                    lines=3,
                    key=f"opts-{i}",
                )
                opts_boxes.append(ot)
                opts_field_indices.append(i)

        # --- Wire events (no change handlers on name/desc/opts textboxes) ---
        # Helper to snapshot all text fields into state
        def _snapshot_all(old_fields, names, descs, opts_vals):
            """Snapshot name/desc from components, options from opts textboxes."""
            opts_iter = iter(opts_vals)
            result = []
            for j, f in enumerate(old_fields):
                entry = {
                    "name": names[j] if j < len(names) else f.get("name", ""),
                    "type": f.get("type", "str"),
                    "description": descs[j] if j < len(descs) else f.get("description", ""),
                    "options": f.get("options", ""),
                }
                if f.get("type") == "category":
                    # opts_boxes is sparse: only category fields consume one.
                    entry["options"] = next(opts_iter, f.get("options", ""))
                result.append(entry)
            return result

        all_text_inputs = name_boxes + desc_boxes + opts_boxes

        # Add field
        def on_add(*args):
            """Snapshot current widget values, then append a blank field."""
            old_fields = args[0]
            names = args[1:1 + n]
            descs = args[1 + n:1 + 2 * n]
            opts_vals = args[1 + 2 * n:]
            new_fields = _snapshot_all(old_fields, names, descs, opts_vals)
            new_fields.append({"name": "", "type": "str", "description": "", "options": ""})
            return new_fields

        add_btn.click(
            on_add,
            inputs=[fields_state] + all_text_inputs,
            outputs=fields_state,
        )

        # Type change
        for i in range(n):
            def _make_type_handler(idx):
                """Bind idx now to avoid the late-binding closure pitfall."""
                def handler(*args):
                    old_fields = args[0]
                    new_type = args[1]
                    names = args[2:2 + n]
                    descs = args[2 + n:2 + 2 * n]
                    opts_vals = args[2 + 2 * n:]
                    new_fields = _snapshot_all(old_fields, names, descs, opts_vals)
                    new_fields[idx]["type"] = new_type
                    if new_type != "category":
                        # Options are meaningless outside category fields.
                        new_fields[idx]["options"] = ""
                    return new_fields
                return handler

            type_dds[i].change(
                _make_type_handler(i),
                inputs=[fields_state, type_dds[i]] + all_text_inputs,
                outputs=fields_state,
            )

        # Remove field
        for i in range(n):
            def _make_remove_handler(idx):
                """Bind idx now to avoid the late-binding closure pitfall."""
                def handler(*args):
                    old_fields = args[0]
                    names = args[1:1 + n]
                    descs = args[1 + n:1 + 2 * n]
                    opts_vals = args[1 + 2 * n:]
                    new_fields = _snapshot_all(old_fields, names, descs, opts_vals)
                    new_fields.pop(idx)
                    return new_fields
                return handler

            rm_btns[i].click(
                _make_remove_handler(i),
                inputs=[fields_state] + all_text_inputs,
                outputs=fields_state,
            )

        # --- JSON preview and submit ---
        with gr.Accordion("Advanced: edit JSON", open=False):
            gr.Code(
                value=fields_to_schema_json(fields),
                language="json",
                label="Schema JSON",
            )
        submit_btn = gr.Button("Run", variant="primary")

        # Submit: read name/desc/opts from components for freshest values
        async def on_submit(api_key_val, file_val, query_val, effort_val, session_id_val, runs_val, *dynamic_vals):
            """Rebuild the field list from live widget values, run the job, update UI."""
            names = dynamic_vals[:n]
            descs = dynamic_vals[n:2 * n]
            opts_vals = list(dynamic_vals[2 * n:])
            opts_iter = iter(opts_vals)
            fields_list = []
            for j in range(n):
                entry = {
                    "name": names[j],
                    "type": fields[j].get("type", "str"),
                    "description": descs[j],
                    "options": "",
                }
                if fields[j].get("type") == "category":
                    entry["options"] = next(opts_iter, "")
                fields_list.append(entry)
            try:
                result_df, download_path = await run_agent_map(
                    api_key_val, file_val, query_val, effort_val, fields_list, session_id_val
                )
                has_research = "research" in result_df.columns
                run_label = f"Run {len(runs_val) + 1}: {query_val[:40].strip()}"
                if len(query_val) > 40:
                    run_label += "..."
                new_run = {
                    "label": run_label,
                    "full_df": result_df,
                    "download_path": download_path,
                }
                new_runs = runs_val + [new_run]
                run_choices = [r["label"] for r in new_runs]
                return (
                    gr.update(value="", visible=False),
                    result_df,
                    hide_research(result_df) if has_research else result_df,
                    gr.update(value=download_path, visible=True),
                    gr.update(value=False, visible=has_research),
                    new_runs,
                    gr.update(choices=run_choices, value=run_label, visible=True),
                )
            except gr.Error:
                raise  # already carries a user-facing message; let Gradio show it
            except Exception as e:
                # Unexpected failure: show the inline error box, leave the
                # other six outputs untouched.
                return (
                    gr.update(
                        value=f"**Error:** {e}",
                        visible=True,
                    ),
                    gr.update(),
                    gr.update(),
                    gr.update(),
                    gr.update(),
                    gr.update(),
                    gr.update(),
                )

        # Components below are created later in this Blocks context; they
        # exist by the time this render function actually executes.
        submit_btn.click(
            fn=on_submit,
            inputs=[api_key, file, query, effort, session_id, runs_state] + all_text_inputs,
            outputs=[error_box, full_results_state, output_table, download_btn, research_toggle, runs_state, run_selector],
        )

    gr.Markdown("### Results")
    error_box = gr.Markdown(visible=False, elem_classes=["error-box"])
    full_results_state = gr.State(None)
    runs_state = gr.State([])
    run_selector = gr.Dropdown(
        label="Run history",
        choices=[],
        visible=False,
        interactive=True,
    )
    research_toggle = gr.Checkbox(
        label="Show research details",
        value=False,
        visible=False,
    )
    output_table = gr.Dataframe(label="Results", wrap=True, max_chars=200)
    download_btn = gr.File(label="Download CSV", visible=False)

    def toggle_research(show_full, full_df):
        """Swap the results table between hidden and expanded research views."""
        if full_df is None:
            return gr.update()
        if show_full:
            return expand_research(full_df)
        return hide_research(full_df)

    research_toggle.change(
        toggle_research,
        inputs=[research_toggle, full_results_state],
        outputs=output_table,
    )

    def on_run_select(selected_label, runs, show_research):
        """Restore a previous run's table, download, and toggle from history."""
        if not selected_label or not runs:
            return gr.update(), gr.update(), gr.update(), gr.update()
        for r in runs:
            if r["label"] == selected_label:
                full_df = r["full_df"]
                has_research = "research" in full_df.columns
                if show_research:
                    display_df = expand_research(full_df)
                else:
                    display_df = hide_research(full_df) if has_research else full_df
                return (
                    full_df,
                    display_df,
                    gr.update(value=r["download_path"], visible=True),
                    gr.update(value=show_research, visible=has_research),
                )
        return gr.update(), gr.update(), gr.update(), gr.update()

    run_selector.change(
        on_run_select,
        inputs=[run_selector, runs_state, research_toggle],
        outputs=[full_results_state, output_table, download_btn, research_toggle],
    )

    def on_upload(file):
        """Show a preview of the uploaded CSV (hide the preview on clear)."""
        if file is None:
            return gr.update(visible=False), gr.update(visible=False)
        df = pd.read_csv(file)
        return gr.update(visible=True), gr.update(value=df, visible=True)

    file.change(
        fn=on_upload,
        inputs=[file],
        outputs=[preview_heading, preview_table],
    )
| if __name__ == "__main__": | |
| demo.launch() | |