# %%
import os
import sys
# Change the current working directory to the directory where the script is located
# __file__ =
current_dir = os.path.dirname(os.path.abspath(__file__))
os.chdir(current_dir)
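# Note: __file__ is undefined in a bare interactive session, which is likely
# why the placeholder assignment above exists. A defensive variant (a sketch,
# assuming the script may also be run interactively):
# current_dir = (os.path.dirname(os.path.abspath(__file__))
#                if "__file__" in globals() else os.getcwd())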
# %%
# import requests
# from bs4 import BeautifulSoup
# from urllib.parse import urljoin
# import time
# import concurrent.futures
# from queue import Queue
# from threading import Lock

# def fetch_and_parse_links(url, base_url):
#     try:
#         response = requests.get(url, timeout=10)
#         response.raise_for_status()
#         soup = BeautifulSoup(response.content, 'html.parser')
#         main_div = soup.find('div', id='main')
#         if not main_div:
#             print(f"No div with id='main' found in {url}")
#             return []
#         links = main_div.find_all('a', href=True)
#         paths = []
#         for link in links:
#             href = urljoin(url, link['href'])
#             if href.startswith(base_url) and '#' not in href:
#                 path = href[len(base_url):].strip("/")
#                 if path and path not in paths:
#                     paths.append(path)
#         return paths
#     except requests.RequestException as e:
#         print(f"Error fetching {url}: {e}")
#         return []
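# A quick illustrative call (hedged: real output depends on the live page;
# the paths shown are plausible examples drawn from URLs used later in this
# file, not captured from a real run):
# fetch_and_parse_links(
#     "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/",
#     "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/",
# )
# # e.g. ['atendimento-presencial', 'duvidas-na-conta-gov.br']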
# def worker(base_url, to_visit_queue, visited_paths, unvisited_paths, tuples_list, lock):
#     while True:
#         current_path = to_visit_queue.get()
#         if current_path is None:
#             break
#         with lock:
#             if current_path in visited_paths:
#                 to_visit_queue.task_done()
#                 continue
#             visited_paths.add(current_path)
#         current_url = urljoin(base_url, current_path)
#         print(f"Visiting: {current_url}")
#         new_paths = fetch_and_parse_links(current_url, base_url)
#         with lock:
#             for new_path in new_paths:
#                 if new_path not in visited_paths:
#                     to_visit_queue.put(new_path)
#                     unvisited_paths.add(new_path)
#                 from_url = f"{base_url}{current_path}"
#                 to_url = f"{base_url}{new_path}"
#                 new_tuple = (from_url, to_url)
#                 if new_tuple not in tuples_list:
#                     tuples_list.append(new_tuple)
#             if current_path in unvisited_paths:
#                 unvisited_paths.remove(current_path)
#         to_visit_queue.task_done()
#         time.sleep(1)  # Be polite to the server

# def create_tuples_from_paths(base_url, max_workers=5):
#     visited_paths = set()
#     unvisited_paths = set()
#     tuples_list = []
#     to_visit_queue = Queue()
#     lock = Lock()
#     to_visit_queue.put("")  # Start with an empty string to represent the root
#     with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
#         futures = []
#         for _ in range(max_workers):
#             future = executor.submit(worker, base_url, to_visit_queue, visited_paths, unvisited_paths, tuples_list, lock)
#             futures.append(future)
#         to_visit_queue.join()
#         for _ in range(max_workers):
#             to_visit_queue.put(None)
#         concurrent.futures.wait(futures)
#     return tuples_list, visited_paths, unvisited_paths

# # Define the base URL
# base_url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/"

# import json
# def load_json(file_path):
#     with open(file_path, 'r', encoding='utf-8') as file:
#         return json.load(file)

# def flatten_list(nested_list):
#     for item in nested_list:
#         if isinstance(item, list):
#             yield from flatten_list(item)  # Recursively yield from nested lists
#         else:
#             yield item

# import polars as pl
# # Define the base URL
# base_url = 'https://www.gov.br/governodigital/pt-br/'
# # Example usage
# file_path = 'memory/graph_data_tiplet.json'  # Replace with your actual file path
# json_data = load_json(file_path)
# json_data = list(flatten_list(json_data))
# # Convert the list of URLs to a Polars DataFrame
# df = pl.DataFrame({
#     'url': json_data
# })
# # Remove the base URL and convert to path
# df = df.with_columns(
#     (pl.col('url').str.replace(base_url, '')).alias('path')
# )
# # Extract paths as a list
# paths = df['path'].to_list()
# # Build a hierarchical structure
# def build_tree(paths):
#     tree = {}
#     for path in paths:
#         parts = path.strip('/').split('/')
#         current_level = tree
#         for part in parts:
#             if part not in current_level:
#                 current_level[part] = {}
#             current_level = current_level[part]
#     return tree
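# A worked example of the nested-dict shape build_tree produces (input paths
# are illustrative):
# build_tree(['duvidas-na-conta-gov.br/recuperar-conta-gov.br',
#             'atendimento-presencial'])
# # -> {'duvidas-na-conta-gov.br': {'recuperar-conta-gov.br': {}},
# #     'atendimento-presencial': {}}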
# %%
from utils.llm import chat
from utils.file import File
import json

system = File("prompts/system.md")
knowledge = File("prompts/knowledge.md")
graph = File("interface/visualization.html")
graph_data = File("memory/graph_data.json")
# user_question = input("Question?")
# messages = [
#     {
#         "role": "system",
#         "content": [
#             {
#                 "type": "text",
#                 "text": system
#             }
#         ]
#     },
#     {
#         "role": "user",
#         "content": [
#             {
#                 "type": "text",
#                 "text": user_question
#             }
#         ]
#     }
# ]
def pipeline(messages):
    """Send the message list to the LLM and return the assistant's reply text."""
    res = chat(messages=messages)
    response = res.choices[0].message.content
    return response
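# A minimal usage sketch (assumptions: utils.llm.chat returns an OpenAI-style
# chat-completions object, as the res.choices[0].message.content access above
# implies, and str(File) yields the file's text; the question is illustrative):
# example_messages = [
#     {"role": "system", "content": [{"type": "text", "text": str(system)}]},
#     {"role": "user", "content": [{"type": "text", "text": "Como recupero minha conta gov.br?"}]},
# ]
# print(pipeline(example_messages))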
# if __name__ == "__main__":
#     res = chat(messages=messages)
#     response = res.choices[0].message.content
#     print(response)
# %%
# from IPython.display import display, Markdown

# def build_tree_structure(tree, indent=0):
#     """
#     Recursively builds a string representation of the tree structure.
#
#     Args:
#         tree (dict): The hierarchical tree structure.
#         indent (int): The current level of indentation.
#
#     Returns:
#         str: A string representing the tree structure.
#     """
#     result = ""
#     for key, subtree in tree.items():
#         result += f"{' ' * indent} - {key}/\n"
#         if isinstance(subtree, dict):
#             result += build_tree_structure(subtree, indent + 1)
#     return result

# # Create and print the hierarchical structure
# tree_structure = build_tree(paths)
# obj = build_tree_structure(tree_structure)
# print(obj)
# display(Markdown(obj))
# # print(json.dumps(tree_structure, indent=2))
# # %%
# # Create tuples from paths and track visited/unvisited paths
# tuples_list, visited_paths, unvisited_paths = create_tuples_from_paths(base_url, 10)

# # Print the resulting list of tuples
# print("\nTuples:")
# for t in tuples_list:
#     print(t)

# # Print visited and unvisited paths
# print("\nVisited Paths:")
# for p in visited_paths:
#     print(f"{base_url}{p}")
# print("\nUnvisited Paths:")
# for p in unvisited_paths:
#     print(f"{base_url}{p}")

# # Print summary
# print(f"\nTotal links found: {len(tuples_list)}")
# print(f"Visited pages: {len(visited_paths)}")
# print(f"Unvisited pages: {len(unvisited_paths)}")

# # Create a dictionary to hold our graph data
# graph_data = {
#     "nodes": [],
#     "edges": []
# }

# import json
# # Create a set to keep track of nodes we've added
# added_nodes = set()

# # Process the tuples to create nodes and edges
# for from_url, to_url in tuples_list:
#     from_path = from_url[len(base_url):].strip("/") or "root"
#     to_path = to_url[len(base_url):].strip("/")
#     if from_path not in added_nodes:
#         graph_data["nodes"].append({"id": from_path, "label": from_path})
#         added_nodes.add(from_path)
#     if to_path not in added_nodes:
#         graph_data["nodes"].append({"id": to_path, "label": to_path})
#         added_nodes.add(to_path)
#     graph_data["edges"].append({"from": from_path, "to": to_path})
# # Save the graph data to a JSON file
# with open('graph_data.json', 'w') as f:
#     json.dump(graph_data, f)

# # Save the raw (from_url, to_url) tuples to a separate JSON file
# with open('graph_data_tiplet.json', 'w') as f:
#     json.dump(tuples_list, f)
# print("Graph data saved to graph_data.json")
# # %%
# import requests
# from bs4 import BeautifulSoup
# from markdownify import markdownify as md
# import os
# os.chdir("/home/zuz/Projetos/LAMFO/SGD/prototipo01_atendimento_govBR")
# from Banco_de_Dados.Estruturado.data2json import format_for_markdown

# # URL of the web page (successive reassignments; only the last one is used)
# url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br"
# url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/atendimento-presencial"
# url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/duvidas-na-conta-gov.br"
# url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br/duvidas-na-conta-gov.br/recuperar-conta-gov.br"

# # Fetch the page HTML
# response = requests.get(url)
# html_content = response.text

# # Parse the HTML with BeautifulSoup
# soup = BeautifulSoup(html_content, 'html.parser')

# # Extract the content of the div with id 'main'
# main_div = soup.find('div', id='main')
# a = format_for_markdown(main_div)
# print(a)

# if main_div:
#     # Convert the div content to Markdown
#     markdown_content = md(str(main_div))
#     # Remove extra blank lines (\n\n)
#     markdown_content = "\n".join([line for line in markdown_content.split("\n\n") if line.strip()])
#     print(markdown_content)
#     # Save the Markdown content to a file
#     with open("main_content.md", "w", encoding="utf-8") as file:
#         file.write(markdown_content)
#     print("Conversion complete and saved to 'main_content.md'.")
# else:
#     print("Div with id 'main' not found.")
# # %%
# import requests
# # (Note: this earlier scratch pipeline() would shadow the active
# # pipeline(messages) defined above if uncommented.)
# def pipeline():
#     # url = input("website: ")
#     url = "https://www.gov.br/governodigital/pt-br/acessibilidade-e-usuario/atendimento-gov.br"
#     response = requests.get(url).text
#     print(response)

# import os
# def print_directory_structure(path, level=0):
#     if not os.path.isdir(path):
#         print(f"{path} is not a valid directory.")
#         return
#     prefix = ' ' * 4 * level + '|-- '
#     print(prefix + os.path.basename(path) + '/')
#     for item in os.listdir(path):
#         item_path = os.path.join(path, item)
#         if os.path.isdir(item_path):
#             print_directory_structure(item_path, level + 1)
#         else:
#             print(' ' * 4 * (level + 1) + '|-- ' + item)

# # Replace 'your_path_here' with the path you want to print
# your_path_here = '/home/zuz/Projetos/LAMFO/SGD/prototipo01_atendimento_govBR/AI_agent'
# print_directory_structure(your_path_here)

# if __name__ == "__main__":
#     pipeline()