# import re # from collections import defaultdict, deque # def parse_step_data_blocks(text): # blocks = re.findall(r"(.*?)", text, re.DOTALL) # parsed_steps = [] # for block in blocks: # match = re.search(r"Yêu cầu\s+(\d+):", block.strip()) # if match: # step_id = int(match.group(1)) # parsed_steps.append({ # "id": step_id, # "content": block.strip(), # "dependencies": set(), # }) # else: # pass # return parsed_steps # # ==== Phân tích phụ thuộc ==== # def extract_dependencies(steps): # for step in steps: # deps = re.findall(r"Yêu cầu\s+(\d+)", step["content"]) # step["dependencies"] = {int(d) for d in deps if int(d) != step["id"]} # # ==== Xây đồ thị phụ thuộc ==== # def build_dependency_graph(steps): # graph = defaultdict(list) # for step in steps: # for dep in step["dependencies"]: # graph[dep].append(step["id"]) # return graph # # ==== Sắp xếp topo ==== # def topological_sort(steps, graph): # indegree = {step["id"]: 0 for step in steps} # for step in steps: # for dep in step["dependencies"]: # indegree[step["id"]] += 1 # queue = deque([sid for sid, deg in indegree.items() if deg == 0]) # sorted_order = [] # while queue: # current = queue.popleft() # sorted_order.append(current) # for neighbor in graph[current]: # indegree[neighbor] -= 1 # if indegree[neighbor] == 0: # queue.append(neighbor) # return sorted_order # async def execute_steps(steps, execution_order, query_func,user_id, role,languages): # results = {} # step_map = {s["id"]: s for s in steps} # for step_id in execution_order: # step = step_map[step_id] # merged_prompt = "" # for dep in step["dependencies"]: # merged_prompt += f"[Yêu cầu {dep}]\n{step_map[dep]['content']}\n" # merged_prompt += f"[Kết quả {dep}]\n{results[dep]}\n" # sql_warning = ( # "\n\n⚠️ **Lưu ý khi viết SQL**:\n" # "- Tránh lỗi như `(1054, \"Unknown column 'oi.pro_id','oi.size' in 'field list'\")`\n" # "- Vui lòng **kiểm tra kỹ tên cột** trong bảng thực tế.\n" # "\n\n⚠️ **Lưu ý khi viết SQL** :\n" # "- Tránh các lỗi như :\n" # " Tuyệt đối cấm dùng các kí hiệu %, %%s để tránh bị báo lỗi" # " (1054, \"Unknown column 'oi.pro_id' in 'field list'\")\n . Luôn đảm bảo bạn không bao giờ bị lỗi này" # " (1054, \"Unknown column 'oi.note' in 'field list'\") . Luôn đảm bảo bạn không bao giờ bị lỗi này\n" # " (1054, \"Unknown column 'oi.size' in 'field list'\") . Luôn đảm bảo bạn không bao giờ bị lỗi này \n" # " (1054, \"Unknown column 'c.is_deleted' in 'on clause'\"). Luôn đảm bảo bạn không bao giờ bị lỗi này\n" # "- 🔍 Quan trọng(Luôn luôn ghi nhớ): Bảng `order_item` **không có** cột `pro_id`, chỉ gồm các cột: `order_item_id`, `order_id`, `quantity`, `total_price`, `cart_id`,. để lấy chi tiết các item hãy nối qua cart tù cart nối cart_item\n" # """- Tránh lỗi:(1054, "Unknown column 'c.is_deleted' in 'where clause'") vì cart không có thuộc tính is_deleted, cart_item không có thuộc tính user_id""" # "- Quan trọng: Không cần thêm các điều kiện `is_deleted = 0` như:\n" # " AND oi.is_deleted = 0\n" # " AND ci.is_deleted = 0\n" # " AND pv.is_deleted = 0\n" # " AND p.is_deleted = 0\n" # " AND cat.is_deleted = 0\n" # " AND c.is_deleted = 0\n" # "- ✅ Các câu SQL đã được kiểm tra và chạy thành công trên hệ thống – vui lòng không tự động thêm các điều kiện lọc `is_deleted`.\n" # "Khi lấy chi tiết tiết các item thì cần lấy pro_name, size và, price" # "Vui lòng luu ý rang order_item không có cột pro_id,size chỉ có các cột order_item_id, order_id, quantity, total_pric \n" # "- Không cần thêm điều kiện `is_deleted = 0` trong các bảng như `oi`, `ci`, `pv`, `p`, `cat`, `c`.\n" # ) # # Ghép prompt chính để gửi đi # question_prompt = merged_prompt + f"[ Vui long thục hiện Yêu cầu {step_id}]\n{step['content']}" + sql_warning # print(f"\n=== 🧩 Thực thi Yêu cầu {step_id} ===") # # Gọi hàm async thực thi câu hỏi # result = await query_func(question_prompt, user_id, languages, role) # results[step_id] = result # print(f"\n✅ 📤 Kết quả Yêu cầu {step_id}: {result}") # return results # async def run_pipeline(text, query_func,user_id,role,languages): # steps = parse_step_data_blocks(text) # extract_dependencies(steps) # graph = build_dependency_graph(steps) # order = topological_sort(steps, graph) # print("\n🔗 Thứ tự thực thi hợp lệ theo phụ thuộc:", order) # data = await execute_steps(steps, order, query_func,user_id,role,languages) # return data import re from collections import defaultdict, deque import asyncio def parse_step_data_blocks(text): blocks = re.findall(r"(.*?)", text, re.DOTALL) parsed_steps = [] for block in blocks: match = re.search(r"Yêu cầu\s+(\d+):", block.strip()) if match: step_id = int(match.group(1)) parsed_steps.append({ "id": step_id, "content": block.strip(), "dependencies": set(), }) return parsed_steps def extract_dependencies(steps): for step in steps: deps = re.findall(r"Yêu cầu\s+(\d+)", step["content"]) step["dependencies"] = {int(d) for d in deps if int(d) != step["id"]} def build_dependency_graph(steps): graph = defaultdict(list) for step in steps: for dep in step["dependencies"]: graph[dep].append(step["id"]) return graph def topological_sort(steps, graph): indegree = {step["id"]: 0 for step in steps} for step in steps: for dep in step["dependencies"]: indegree[step["id"]] += 1 queue = deque([sid for sid, deg in indegree.items() if deg == 0]) sorted_order = [] while queue: current = queue.popleft() sorted_order.append(current) for neighbor in graph[current]: indegree[neighbor] -= 1 if indegree[neighbor] == 0: queue.append(neighbor) return sorted_order async def execute_steps_parallel(steps, graph, query_func, user_id, role, languages): results = {} step_map = {s["id"]: s for s in steps} indegree = {s["id"]: 0 for s in steps} for step in steps: for dep in step["dependencies"]: indegree[step["id"]] += 1 queue = deque([sid for sid, deg in indegree.items() if deg == 0]) async def run_step(step_id): step = step_map[step_id] merged_prompt = "" for dep in step["dependencies"]: merged_prompt += f"[Yêu cầu {dep}]\n{step_map[dep]['content']}\n" merged_prompt += f"[Kết quả {dep}]\n{results[dep]}\n" sql_warning = ( "\n\n⚠️ **Lưu ý khi viết SQL**:\n" "- Tránh lỗi như `(1054, \"Unknown column 'oi.pro_id','oi.size' in 'field list'\")`\n" "- Vui lòng **kiểm tra kỹ tên cột** trong bảng thực tế.\n" "\n\n⚠️ **Lưu ý khi viết SQL** :\n" "- Tránh các lỗi như :\n" " ###Tuyệt đối cấm dùng các kí hiệu %, %%s để tránh bị báo lỗi" " (1054, \"Unknown column 'oi.pro_id' in 'field list'\")\n . Luôn đảm bảo bạn không bao giờ bị lỗi này" " (1054, \"Unknown column 'oi.note' in 'field list'\") . Luôn đảm bảo bạn không bao giờ bị lỗi này\n" " (1054, \"Unknown column 'oi.size' in 'field list'\") . Luôn đảm bảo bạn không bao giờ bị lỗi này \n" " (1054, \"Unknown column 'c.is_deleted' in 'on clause'\"). Luôn đảm bảo bạn không bao giờ bị lỗi này\n" "- 🔍 Quan trọng(Luôn luôn ghi nhớ): Bảng `order_item` **không có** cột `pro_id`, chỉ gồm các cột: `order_item_id`, `order_id`, `quantity`, `total_price`, `cart_id`,. để lấy chi tiết các item hãy nối qua cart tù cart nối cart_item\n" """- Tránh lỗi:(1054, "Unknown column 'c.is_deleted' in 'where clause'") vì cart không có thuộc tính is_deleted, cart_item không có thuộc tính user_id""" "- Quan trọng: Không cần thêm các điều kiện `is_deleted = 0` như:\n" " AND oi.is_deleted = 0\n" " AND ci.is_deleted = 0\n" " AND pv.is_deleted = 0\n" " AND p.is_deleted = 0\n" " AND cat.is_deleted = 0\n" " AND c.is_deleted = 0\n" "- ✅ Các câu SQL đã được kiểm tra và chạy thành công trên hệ thống – vui lòng không tự động thêm các điều kiện lọc `is_deleted`.\n" "Khi lấy chi tiết tiết các item thì cần lấy pro_name, size và, price" "Vui lòng luu ý rang order_item không có cột pro_id,size chỉ có các cột order_item_id, order_id, quantity, total_pric \n" "- Không cần thêm điều kiện `is_deleted = 0` trong các bảng như `oi`, `ci`, `pv`, `p`, `cat`, `c`.\n" ) question_prompt = merged_prompt + f"[ Vui lòng thực hiện Yêu cầu {step_id}]\n{step['content']}" + sql_warning print(f"\n=== 🧩 Thực thi Yêu cầu {step_id} ===") result = await query_func(question_prompt, user_id, languages, role) results[step_id] = result print(f"\n✅ 📤 Kết quả Yêu cầu {step_id}: {result}") for neighbor in graph[step_id]: indegree[neighbor] -= 1 if indegree[neighbor] == 0: queue.append(neighbor) while queue: current_batch = list(queue) queue.clear() await asyncio.gather(*(run_step(sid) for sid in current_batch)) return results async def run_pipeline(text, query_func, user_id, role, languages): steps = parse_step_data_blocks(text) extract_dependencies(steps) graph = build_dependency_graph(steps) order = topological_sort(steps, graph) print("\n🔗 Thứ tự thực thi hợp lệ theo phụ thuộc:", order) data = await execute_steps_parallel(steps, graph, query_func, user_id, role, languages) return data