# import re
# from collections import defaultdict, deque
# def parse_step_data_blocks(text):
# blocks = re.findall(r"(.*?)", text, re.DOTALL)
# parsed_steps = []
# for block in blocks:
# match = re.search(r"Yêu cầu\s+(\d+):", block.strip())
# if match:
# step_id = int(match.group(1))
# parsed_steps.append({
# "id": step_id,
# "content": block.strip(),
# "dependencies": set(),
# })
# else:
# pass
# return parsed_steps
# # ==== Phân tích phụ thuộc ====
# def extract_dependencies(steps):
# for step in steps:
# deps = re.findall(r"Yêu cầu\s+(\d+)", step["content"])
# step["dependencies"] = {int(d) for d in deps if int(d) != step["id"]}
# # ==== Xây đồ thị phụ thuộc ====
# def build_dependency_graph(steps):
# graph = defaultdict(list)
# for step in steps:
# for dep in step["dependencies"]:
# graph[dep].append(step["id"])
# return graph
# # ==== Sắp xếp topo ====
# def topological_sort(steps, graph):
# indegree = {step["id"]: 0 for step in steps}
# for step in steps:
# for dep in step["dependencies"]:
# indegree[step["id"]] += 1
# queue = deque([sid for sid, deg in indegree.items() if deg == 0])
# sorted_order = []
# while queue:
# current = queue.popleft()
# sorted_order.append(current)
# for neighbor in graph[current]:
# indegree[neighbor] -= 1
# if indegree[neighbor] == 0:
# queue.append(neighbor)
# return sorted_order
# async def execute_steps(steps, execution_order, query_func,user_id, role,languages):
# results = {}
# step_map = {s["id"]: s for s in steps}
# for step_id in execution_order:
# step = step_map[step_id]
# merged_prompt = ""
# for dep in step["dependencies"]:
# merged_prompt += f"[Yêu cầu {dep}]\n{step_map[dep]['content']}\n"
# merged_prompt += f"[Kết quả {dep}]\n{results[dep]}\n"
# sql_warning = (
# "\n\n⚠️ **Lưu ý khi viết SQL**:\n"
# "- Tránh lỗi như `(1054, \"Unknown column 'oi.pro_id','oi.size' in 'field list'\")`\n"
# "- Vui lòng **kiểm tra kỹ tên cột** trong bảng thực tế.\n"
# "\n\n⚠️ **Lưu ý khi viết SQL** :\n"
# "- Tránh các lỗi như :\n"
# " Tuyệt đối cấm dùng các kí hiệu %, %%s để tránh bị báo lỗi"
# " (1054, \"Unknown column 'oi.pro_id' in 'field list'\")\n . Luôn đảm bảo bạn không bao giờ bị lỗi này"
# " (1054, \"Unknown column 'oi.note' in 'field list'\") . Luôn đảm bảo bạn không bao giờ bị lỗi này\n"
# " (1054, \"Unknown column 'oi.size' in 'field list'\") . Luôn đảm bảo bạn không bao giờ bị lỗi này \n"
# " (1054, \"Unknown column 'c.is_deleted' in 'on clause'\"). Luôn đảm bảo bạn không bao giờ bị lỗi này\n"
# "- 🔍 Quan trọng(Luôn luôn ghi nhớ): Bảng `order_item` **không có** cột `pro_id`, chỉ gồm các cột: `order_item_id`, `order_id`, `quantity`, `total_price`, `cart_id`,. để lấy chi tiết các item hãy nối qua cart tù cart nối cart_item\n"
# """- Tránh lỗi:(1054, "Unknown column 'c.is_deleted' in 'where clause'") vì cart không có thuộc tính is_deleted, cart_item không có thuộc tính user_id"""
# "- Quan trọng: Không cần thêm các điều kiện `is_deleted = 0` như:\n"
# " AND oi.is_deleted = 0\n"
# " AND ci.is_deleted = 0\n"
# " AND pv.is_deleted = 0\n"
# " AND p.is_deleted = 0\n"
# " AND cat.is_deleted = 0\n"
# " AND c.is_deleted = 0\n"
# "- ✅ Các câu SQL đã được kiểm tra và chạy thành công trên hệ thống – vui lòng không tự động thêm các điều kiện lọc `is_deleted`.\n"
# "Khi lấy chi tiết tiết các item thì cần lấy pro_name, size và, price"
# "Vui lòng luu ý rang order_item không có cột pro_id,size chỉ có các cột order_item_id, order_id, quantity, total_pric \n"
# "- Không cần thêm điều kiện `is_deleted = 0` trong các bảng như `oi`, `ci`, `pv`, `p`, `cat`, `c`.\n"
# )
# # Ghép prompt chính để gửi đi
# question_prompt = merged_prompt + f"[ Vui long thục hiện Yêu cầu {step_id}]\n{step['content']}" + sql_warning
# print(f"\n=== 🧩 Thực thi Yêu cầu {step_id} ===")
# # Gọi hàm async thực thi câu hỏi
# result = await query_func(question_prompt, user_id, languages, role)
# results[step_id] = result
# print(f"\n✅ 📤 Kết quả Yêu cầu {step_id}: {result}")
# return results
# async def run_pipeline(text, query_func,user_id,role,languages):
# steps = parse_step_data_blocks(text)
# extract_dependencies(steps)
# graph = build_dependency_graph(steps)
# order = topological_sort(steps, graph)
# print("\n🔗 Thứ tự thực thi hợp lệ theo phụ thuộc:", order)
# data = await execute_steps(steps, order, query_func,user_id,role,languages)
# return data
import re
from collections import defaultdict, deque
import asyncio
def parse_step_data_blocks(text):
blocks = re.findall(r"(.*?)", text, re.DOTALL)
parsed_steps = []
for block in blocks:
match = re.search(r"Yêu cầu\s+(\d+):", block.strip())
if match:
step_id = int(match.group(1))
parsed_steps.append({
"id": step_id,
"content": block.strip(),
"dependencies": set(),
})
return parsed_steps
def extract_dependencies(steps):
for step in steps:
deps = re.findall(r"Yêu cầu\s+(\d+)", step["content"])
step["dependencies"] = {int(d) for d in deps if int(d) != step["id"]}
def build_dependency_graph(steps):
graph = defaultdict(list)
for step in steps:
for dep in step["dependencies"]:
graph[dep].append(step["id"])
return graph
def topological_sort(steps, graph):
indegree = {step["id"]: 0 for step in steps}
for step in steps:
for dep in step["dependencies"]:
indegree[step["id"]] += 1
queue = deque([sid for sid, deg in indegree.items() if deg == 0])
sorted_order = []
while queue:
current = queue.popleft()
sorted_order.append(current)
for neighbor in graph[current]:
indegree[neighbor] -= 1
if indegree[neighbor] == 0:
queue.append(neighbor)
return sorted_order
async def execute_steps_parallel(steps, graph, query_func, user_id, role, languages):
results = {}
step_map = {s["id"]: s for s in steps}
indegree = {s["id"]: 0 for s in steps}
for step in steps:
for dep in step["dependencies"]:
indegree[step["id"]] += 1
queue = deque([sid for sid, deg in indegree.items() if deg == 0])
async def run_step(step_id):
step = step_map[step_id]
merged_prompt = ""
for dep in step["dependencies"]:
merged_prompt += f"[Yêu cầu {dep}]\n{step_map[dep]['content']}\n"
merged_prompt += f"[Kết quả {dep}]\n{results[dep]}\n"
sql_warning = (
"\n\n⚠️ **Lưu ý khi viết SQL**:\n"
"- Tránh lỗi như `(1054, \"Unknown column 'oi.pro_id','oi.size' in 'field list'\")`\n"
"- Vui lòng **kiểm tra kỹ tên cột** trong bảng thực tế.\n"
"\n\n⚠️ **Lưu ý khi viết SQL** :\n"
"- Tránh các lỗi như :\n"
" ###Tuyệt đối cấm dùng các kí hiệu %, %%s để tránh bị báo lỗi"
" (1054, \"Unknown column 'oi.pro_id' in 'field list'\")\n . Luôn đảm bảo bạn không bao giờ bị lỗi này"
" (1054, \"Unknown column 'oi.note' in 'field list'\") . Luôn đảm bảo bạn không bao giờ bị lỗi này\n"
" (1054, \"Unknown column 'oi.size' in 'field list'\") . Luôn đảm bảo bạn không bao giờ bị lỗi này \n"
" (1054, \"Unknown column 'c.is_deleted' in 'on clause'\"). Luôn đảm bảo bạn không bao giờ bị lỗi này\n"
"- 🔍 Quan trọng(Luôn luôn ghi nhớ): Bảng `order_item` **không có** cột `pro_id`, chỉ gồm các cột: `order_item_id`, `order_id`, `quantity`, `total_price`, `cart_id`,. để lấy chi tiết các item hãy nối qua cart tù cart nối cart_item\n"
"""- Tránh lỗi:(1054, "Unknown column 'c.is_deleted' in 'where clause'") vì cart không có thuộc tính is_deleted, cart_item không có thuộc tính user_id"""
"- Quan trọng: Không cần thêm các điều kiện `is_deleted = 0` như:\n"
" AND oi.is_deleted = 0\n"
" AND ci.is_deleted = 0\n"
" AND pv.is_deleted = 0\n"
" AND p.is_deleted = 0\n"
" AND cat.is_deleted = 0\n"
" AND c.is_deleted = 0\n"
"- ✅ Các câu SQL đã được kiểm tra và chạy thành công trên hệ thống – vui lòng không tự động thêm các điều kiện lọc `is_deleted`.\n"
"Khi lấy chi tiết tiết các item thì cần lấy pro_name, size và, price"
"Vui lòng luu ý rang order_item không có cột pro_id,size chỉ có các cột order_item_id, order_id, quantity, total_pric \n"
"- Không cần thêm điều kiện `is_deleted = 0` trong các bảng như `oi`, `ci`, `pv`, `p`, `cat`, `c`.\n"
)
question_prompt = merged_prompt + f"[ Vui lòng thực hiện Yêu cầu {step_id}]\n{step['content']}" + sql_warning
print(f"\n=== 🧩 Thực thi Yêu cầu {step_id} ===")
result = await query_func(question_prompt, user_id, languages, role)
results[step_id] = result
print(f"\n✅ 📤 Kết quả Yêu cầu {step_id}: {result}")
for neighbor in graph[step_id]:
indegree[neighbor] -= 1
if indegree[neighbor] == 0:
queue.append(neighbor)
while queue:
current_batch = list(queue)
queue.clear()
await asyncio.gather(*(run_step(sid) for sid in current_batch))
return results
async def run_pipeline(text, query_func, user_id, role, languages):
steps = parse_step_data_blocks(text)
extract_dependencies(steps)
graph = build_dependency_graph(steps)
order = topological_sort(steps, graph)
print("\n🔗 Thứ tự thực thi hợp lệ theo phụ thuộc:", order)
data = await execute_steps_parallel(steps, graph, query_func, user_id, role, languages)
return data