jansahayak / agent_io /profiling_io.py
Anmol4521's picture
Upload 95 files
388aa42 verified
"""
Profiling Agent I/O Handler
Manages input/output for user profiling agent
"""
import json
import os
from datetime import datetime
class ProfilingIO:
"""Handles input/output operations for profiling agent"""
def __init__(self, input_file: str = "agent_io/profiling_input.json",
output_file: str = "agent_io/profiling_output.json"):
self.input_file = input_file
self.output_file = output_file
self._ensure_directory()
def _ensure_directory(self):
"""Create agent_io directory if it doesn't exist"""
os.makedirs(os.path.dirname(self.input_file), exist_ok=True)
def read_input(self) -> dict:
"""
Read profiling agent input from file
Returns:
Raw user input dictionary
"""
try:
if os.path.exists(self.input_file):
with open(self.input_file, 'r', encoding='utf-8') as f:
return json.load(f)
else:
return {"error": "Input file not found"}
except Exception as e:
return {"error": str(e)}
def write_input(self, user_input: str, documents: list = None):
"""
Write raw user input for profiling
Args:
user_input: Raw text input from user
documents: Optional list of uploaded documents
"""
input_data = {
"timestamp": datetime.now().isoformat(),
"user_input": user_input,
"documents": documents or [],
"agent": "user_profiling"
}
with open(self.input_file, 'w', encoding='utf-8') as f:
json.dump(input_data, f, indent=2, ensure_ascii=False)
def write_output(self, profile_data: dict, confidence: dict = None):
"""
Write extracted profile to output file
Args:
profile_data: Structured profile data
confidence: Optional confidence scores for extracted fields
"""
output_data = {
"timestamp": datetime.now().isoformat(),
"profile": profile_data,
"confidence": confidence or {},
"agent": "user_profiling"
}
with open(self.output_file, 'w', encoding='utf-8') as f:
json.dump(output_data, f, indent=2, ensure_ascii=False)
def read_output(self) -> dict:
"""
Read extracted profile
Returns:
Structured profile dictionary
"""
try:
if os.path.exists(self.output_file):
with open(self.output_file, 'r', encoding='utf-8') as f:
return json.load(f)
else:
return {"error": "Output file not found"}
except Exception as e:
return {"error": str(e)}
if __name__ == "__main__":
# Test ProfilingIO
io = ProfilingIO()
# Sample input
user_text = "I am 25 years old from Maharashtra, OBC category, income 3 lakh."
io.write_input(user_text, documents=["resume.pdf"])
print("Input written successfully")
# Sample output
profile = {
"age": 25,
"state": "Maharashtra",
"caste": "OBC",
"income": "300000"
}
io.write_output(profile, confidence={"age": 1.0, "state": 1.0})
print("Output written successfully")