# FinGraph / src/graphBuilder/neo4j/finGraph.py
# Last commit by dev-yuje (c64138a): fix: gpt-4o둜 μ—…κ·Έλ ˆμ΄λ“œ 및 κ·Έλž˜ν”„ 관계 μ—°κ²° λˆ„λ½ κ·Όλ³Έ ν•΄κ²°
"""
finGraph.py β€” AI λ‰΄μŠ€ 지식 κ·Έλž˜ν”„ λΉŒλ”
=====================================
μ‹€ν–‰ μˆœμ„œ:
1. finScrapping.py μ‹€ν–‰ β†’ Articles_*.xlsx 생성
2. 이 파일 μ‹€ν–‰ β†’ Neo4j에 μ—”ν‹°ν‹°/관계/벑터 적재
λ…Έλ“œ: AICompany, AITechnology, AIService, AIField, Article, Content, Media
관계: DEVELOPS, INVESTS_IN, PARTNERS_WITH, APPLIES, USED_IN, RELATED_TO,
MENTIONS, HAS_CHUNK, PUBLISHED
"""
import glob
import json
import os
from typing import Dict, List, TypedDict
import dotenv
import neo4j
import pandas as pd
from langchain_openai import ChatOpenAI
from langgraph.graph import END, StateGraph
from neo4j_graphrag.embeddings.openai import OpenAIEmbeddings
from neo4j_graphrag.indexes import create_vector_index
from neo4j_graphrag.llm import OpenAILLM
# Load variables from a local .env file into the process environment before
# anything below runs, so get_neo4j_driver's os.getenv("NEO4J_*") calls (and,
# presumably, the OpenAI clients' API key lookup) can see them.
dotenv.load_dotenv()
# Windows cp949 인코딩 ν™˜κ²½μ—μ„œ 이λͺ¨μ§€ 좜λ ₯ μ‹œ UnicodeEncodeError λ°©μ§€λ₯Ό μœ„ν•œ μ•ˆμ „ν•œ print ν•¨μˆ˜ μ •μ˜
def safe_print(*args, **kwargs) -> None:
import sys
try:
# endλ‚˜ sep 인자λ₯Ό μ˜¬λ°”λ₯΄κ²Œ μ²˜λ¦¬ν•  수 μžˆλ„λ‘ λ‚΄μž₯ print의 κΈ°λŠ₯ μ€€μˆ˜
sep = kwargs.get("sep", " ")
end = kwargs.get("end", "\n")
msg = sep.join(map(str, args))
sys.stdout.write(msg + end)
sys.stdout.flush()
except UnicodeEncodeError:
msg = sep.join(map(str, args))
cleaned = (
msg.replace("βœ…", "[OK]")
.replace("⚠️", "[WARN]")
.replace("🚨", "[ERR]")
.replace("⏭️", "[SKIP]")
.replace("πŸ€–", "[AI]")
.replace("🏒", "[COMP]")
.replace("🌌", "[GRAPH]")
.replace("πŸ“°", "[NEWS]")
.replace("πŸ”¬", "[TECH]")
.replace("πŸ”—", "[LINK]")
)
try:
sys.stdout.write(cleaned + end)
sys.stdout.flush()
except Exception:
ascii_msg = msg.encode("ascii", errors="replace").decode("ascii")
sys.stdout.write(ascii_msg + end)
sys.stdout.flush()
# Shadow the builtin print for the rest of this module, so every status line
# printed below goes through safe_print (emoji-safe on consoles such as cp949).
print = safe_print
def get_neo4j_driver() -> neo4j.Driver:
    """Create a Neo4j driver and verify that the server is reachable.

    The URI comes from ``NEO4J_URI`` (default ``neo4j://localhost:7687``).
    Credentials are tried in order:

    1. ``NEO4J_CLIENT_ID`` / ``NEO4J_CLIENT_SECRET``, only when both are set.
       A failure is reported and the next option is tried.
    2. ``NEO4J_USERNAME`` / ``NEO4J_PASSWORD`` (defaults ``neo4j`` / ``password``).

    A driver that fails ``verify_connectivity`` is closed before moving on or
    re-raising, so its connection pool is not leaked.

    Returns:
        A connected ``neo4j.Driver``; the caller owns it and should close it.

    Raises:
        Whatever ``verify_connectivity`` raises for the username/password
        attempt (e.g. authentication or service-unavailable errors).
    """
    uri = os.getenv("NEO4J_URI", "neo4j://localhost:7687")
    client_id = os.getenv("NEO4J_CLIENT_ID")
    client_secret = os.getenv("NEO4J_CLIENT_SECRET")
    if client_id and client_secret:
        d = None
        try:
            d = neo4j.GraphDatabase.driver(uri, auth=(client_id, client_secret))
            d.verify_connectivity()
            return d
        except Exception as err:
            if d is not None:
                d.close()
            print(f"⚠️ NEO4J_CLIENT_ID/SECRET 인증 μ‹€νŒ¨, NEO4J_USERNAME/PASSWORD둜 μž¬μ‹œλ„ν•©λ‹ˆλ‹€: {err}")
    username = os.getenv("NEO4J_USERNAME", "neo4j")
    password = os.getenv("NEO4J_PASSWORD", "password")
    d = neo4j.GraphDatabase.driver(uri, auth=(username, password))
    try:
        d.verify_connectivity()
    except Exception:
        d.close()
        raise
    return d
# Module-level Neo4j driver; main() assigns it via `global driver` and the
# loading loops use it afterwards.
driver = None
# Entity/relation extraction uses gpt-4o (temperature 0) to maximise graph
# quality; check_ai_relevance also sends its yes/no question through this client.
chat_llm = ChatOpenAI(model="gpt-4o", temperature=0)
# NOTE(review): rag_llm is not referenced anywhere in this file; presumably it
# is imported by a retrieval/QA module. Confirm before removing.
rag_llm = OpenAILLM(model_name="gpt-4o-mini", model_params={"temperature": 0})
# Embeds Content chunks in main(). Its vector size must match the
# dimensions=1536 passed to create_vector_index there.
embedder = OpenAIEmbeddings(model="text-embedding-3-small")
# Name of the Neo4j vector index over Content.embedding.
INDEX_NAME = "content_vector_index"
# ──────────────────────────────────────────
# 1. LangGraph νŒŒμ΄ν”„λΌμΈ μ •μ˜ (μ—”ν‹°ν‹°/관계 μΆ”μΆœ)
# ──────────────────────────────────────────
class ArticleState(TypedDict):
    """State carried between the LangGraph nodes while processing one article."""

    article_id: str
    title: str
    text: str  # title + "\n" + article body, as built in main()
    is_ai_related: bool  # set by check_ai_relevance
    entities: List[Dict]  # validated entities (name/type/description) from extract_entities
    relations: List[Dict]  # validated relations (source/relation/target) from extract_relations
    retry_count: int  # entity-extraction attempt counter
    reflection_feedback: str  # entity-extraction self-reflection feedback
    relation_retry_count: int  # relation-extraction attempt counter
    relation_feedback: str  # relation-extraction self-reflection feedback
def check_ai_relevance(state: ArticleState) -> ArticleState:
    """Node 1: ask the LLM whether the article is about AI.

    Only the first 400 characters of ``state["text"]`` are sent. The reply
    counts as "AI-related" when, trimmed and lower-cased, it begins with "yes".

    Returns:
        A copy of ``state`` with ``is_ai_related`` filled in.
    """
    snippet = state["text"][:400]
    question = (
        "λ‹€μŒ 기사가 AI(인곡지λŠ₯) κΈ°μˆ Β·κΈ°μ—…Β·μ„œλΉ„μŠ€μ™€ κ΄€λ ¨ 있으면 yes, μ•„λ‹ˆλ©΄ no둜만 λ‹΅ν•˜μ„Έμš”.\n\n"
        + snippet
        + "\n\nλ‹΅λ³€(yes/no):"
    )
    answer = str(chat_llm.invoke(question).content)
    verdict = answer.strip().lower().startswith("yes")
    updated = dict(state)
    updated["is_ai_related"] = verdict
    return updated
def extract_entities(state: ArticleState) -> ArticleState:
    """Node 2: extract AI entities with the LLM and validate them.

    Each call counts one attempt in ``retry_count``. Feedback from a previous
    attempt (``reflection_feedback``) is appended to the prompt, so the model
    can correct itself; ``validate_entities`` drives this loop.

    An entity is kept only when it meets all of these conditions:

    * it is a JSON object;
    * its ``name`` is non-empty;
    * its ``type`` is one of AICompany/AITechnology/AIService/AIField;
    * its ``description`` is non-empty.

    Repeated names keep their first occurrence, so each name maps to exactly
    one label downstream (``upsert_relation`` matches endpoints by name only).
    Every rejected item adds a feedback line. An empty result or an
    unparsable reply also produces feedback.

    Returns:
        A copy of ``state`` with ``entities``, ``retry_count`` and
        ``reflection_feedback`` updated.
    """
    retry_count = state.get("retry_count", 0) + 1
    feedback = state.get("reflection_feedback", "")
    feedback_prompt = ""
    if feedback:
        feedback_prompt = (
            f"\n\n⚠️ [이전 μ‹œλ„μ— λŒ€ν•œ 검증 였λ₯˜ ν”Όλ“œλ°±]:\n{feedback}\n"
            "μœ„ 였λ₯˜λ₯Ό λ°˜λ“œμ‹œ λΆ„μ„ν•˜μ—¬, μ΄λ²ˆμ—λŠ” μ€‘λ³΅λ˜κ±°λ‚˜ λΉ„μ–΄μžˆκ±°λ‚˜ λΆˆμ™„μ „ν•˜μ§€ μ•Šκ³  "
            "μ •ν™•ν•œ νƒ€μž…κ³Ό μ„€λͺ…을 κ°–μΆ˜ μ˜¬λ°”λ₯Έ μ—”ν‹°ν‹°λ§Œ μ—„κ²©ν•˜κ²Œ JSON으둜 μΆ”μΆœν•΄μ£Όμ„Έμš”."
        )
    prompt = f"""λ‹€μŒ AI λ‰΄μŠ€μ—μ„œ 핡심 엔티티듀을 μΆ”μΆœν•˜μ„Έμš”.
μ—”ν‹°ν‹° μœ ν˜•:
- AICompany: κΈ°μ—…/κΈ°κ΄€ (예: μ‚Όμ„±μ „μž, OpenAI)
- AITechnology: AI 기술 (예: λŒ€κ·œλͺ¨μ–Έμ–΄λͺ¨λΈ, κ°•ν™”ν•™μŠ΅)
- AIService: μ„œλΉ„μŠ€/μ œν’ˆ (예: ChatGPT, HyperCLOVA X)
- AIField: 적용 λΆ„μ•Ό (예: 금육AI, AI λ°˜λ„μ²΄)
제λͺ©: {state["title"]}
λ³Έλ¬Έ: {state["text"][:900]}{feedback_prompt}
JSON으둜만 응닡: {{"entities":[{{"name":"...","type":"AICompany|AITechnology|AIService|AIField","description":"..."}}]}}"""
    res = chat_llm.invoke(prompt)

    allowed_types = {"AICompany", "AITechnology", "AIService", "AIField"}
    # sorted(): set order depends on string hashing, which would make the
    # feedback text (and therefore the next prompt) differ between runs.
    allowed_list = ", ".join(sorted(allowed_types))
    entities: List[Dict] = []
    new_feedback = ""
    try:
        raw = str(res.content).strip()
        if "```" in raw:
            # Take the first fenced block and drop an optional "json" tag.
            # (str.lstrip("json") strips characters, not that prefix.)
            raw = raw.split("```")[1]
            if raw[:4].lower() == "json":
                raw = raw[4:]
            raw = raw.strip()
        data = json.loads(raw)
        extracted = data.get("entities", []) if isinstance(data, dict) else None
        if not isinstance(extracted, list):
            raise ValueError("'entities' 배열이 μ—†μŠ΅λ‹ˆλ‹€.")
        seen_names = set()
        for e in extracted:
            if not isinstance(e, dict):
                new_feedback += "- μ—”ν‹°ν‹° ν•­λͺ©μ΄ JSON 객체가 μ•„λ‹™λ‹ˆλ‹€.\n"
                continue
            # `or ""` + str(): tolerate null / non-string values from the model
            # instead of letting one bad field discard the whole answer.
            name = str(e.get("name") or "").strip()
            etype = str(e.get("type") or "").strip()
            desc = str(e.get("description") or "").strip()
            if not name:
                new_feedback += "- μ—”ν‹°ν‹°μ˜ 이름(name) ν•„λ“œκ°€ λˆ„λ½λ˜μ—ˆκ±°λ‚˜ λΉ„μ–΄μžˆμŠ΅λ‹ˆλ‹€.\n"
                continue
            if etype not in allowed_types:
                new_feedback += f"- μ—”ν‹°ν‹° '{name}'의 νƒ€μž… '{etype}'은 ν—ˆμš©λœ μ’…λ₯˜({allowed_list})κ°€ μ•„λ‹™λ‹ˆλ‹€.\n"
                continue
            if not desc:
                new_feedback += f"- μ—”ν‹°ν‹° '{name}'에 λŒ€ν•œ μ„€λͺ…(description)이 μƒλž΅λ˜μ—ˆμŠ΅λ‹ˆλ‹€.\n"
                continue
            if name in seen_names:
                continue
            seen_names.add(name)
            entities.append({"name": name, "type": etype, "description": desc})
        if not entities:
            new_feedback = "μœ νš¨ν•œ μ—”ν‹°ν‹°κ°€ ν•˜λ‚˜λ„ μΆ”μΆœλ˜μ§€ μ•Šμ•˜μŠ΅λ‹ˆλ‹€."
    except Exception as err:
        # Any malformed reply becomes feedback for the next self-reflection attempt.
        entities = []
        new_feedback = f"응닡 JSON νŒŒμ‹± μ‹€νŒ¨ λ˜λŠ” ν˜•μ‹μ΄ μ˜¬λ°”λ₯΄μ§€ μ•ŠμŠ΅λ‹ˆλ‹€. μ—λŸ¬: {str(err)}"
    return {
        **state,
        "entities": entities,
        "retry_count": retry_count,
        "reflection_feedback": new_feedback.strip(),
    }
def extract_relations(state: ArticleState) -> ArticleState:
    """Node 3: extract relations between the validated entities and check them.

    With no entities, nothing is asked: ``relations`` is emptied and the
    relation counters are reset. Otherwise each call counts one attempt in
    ``relation_retry_count``, and feedback from the previous attempt is added
    to the prompt.

    A relation is kept only when its ``source`` and ``target`` name known
    entities and differ, and its type is one of the six allowed types.

    * Endpoint names are matched ignoring case and extra whitespace, then
      replaced by the exact extracted name, so the relation connects to the
      node MERGEd under that name.
    * Relation types are upper-cased, with spaces turned into underscores (as
      ``upsert_relation`` does).
    * Repeated (source, relation, target) triples are dropped.

    Every rejected item adds a feedback line. With two or more entities and
    no valid relation, the feedback asks the model to find related pairs.

    Returns:
        A copy of ``state`` with ``relations``, ``relation_retry_count`` and
        ``relation_feedback`` updated.
    """
    if not state["entities"]:
        return {**state, "relations": [], "relation_retry_count": 0, "relation_feedback": ""}
    relation_retry = state.get("relation_retry_count", 0) + 1
    rel_feedback = state.get("relation_feedback", "")
    # Give the LLM the exact entity names so it does not paraphrase them.
    names_list = [e["name"] for e in state["entities"]]
    elist = "\n".join([f"- {e['name']} ({e['type']})" for e in state["entities"]])
    feedback_prompt = ""
    if rel_feedback:
        # The text before this block already ends with a blank line. End the
        # block with one too, so it is not glued to the JSON instruction.
        feedback_prompt = (
            f"⚠️ [이전 μ‹œλ„ 관계 μΆ”μΆœ 였λ₯˜ ν”Όλ“œλ°±]:\n{rel_feedback}\n"
            "μœ„ 였λ₯˜λ₯Ό λ°˜λ“œμ‹œ μˆ˜μ •ν•˜μ—¬, source/target 이름이 μ—”ν‹°ν‹° λͺ©λ‘μ— μžˆλŠ” 이름과 μ •ν™•νžˆ μΌμΉ˜ν•˜λŠ” "
            "κ΄€κ³„λ§Œ JSON으둜 μ‘λ‹΅ν•˜μ„Έμš”.\n\n"
        )
    prompt = (
        f"λ‹€μŒ AI λ‰΄μŠ€μ—μ„œ μ—”ν‹°ν‹° κ°„μ˜ 관계λ₯Ό μΆ”μΆœν•˜μ„Έμš”.\n\n"
        f"μ—”ν‹°ν‹° λͺ©λ‘ (이름은 μ •ν™•νžˆ 이 λͺ©λ‘μ—μ„œλ§Œ μ‚¬μš©):\n{elist}\n\n"
        f"λ³Έλ¬Έ: {state['text'][:900]}\n\n"
        "관계 μœ ν˜•:\n"
        "- DEVELOPS: 기업이 기술/μ„œλΉ„μŠ€λ₯Ό 개발\n"
        "- INVESTS_IN: 기업이 λ‹€λ₯Έ κΈ°μ—…/뢄야에 투자\n"
        "- PARTNERS_WITH: κΈ°μ—… κ°„ νŒŒνŠΈλ„ˆμ‹­/ν˜‘λ ₯\n"
        "- APPLIES: 기업이 κΈ°μˆ μ„ νŠΉμ • 뢄야에 적용\n"
        "- USED_IN: 기술/μ„œλΉ„μŠ€κ°€ νŠΉμ • λΆ„μ•Ό/μ œν’ˆμ— ν™œμš©\n"
        "- RELATED_TO: 일반적 μ—°κ΄€ 관계\n\n"
        "κ·œμΉ™: source와 target은 λ°˜λ“œμ‹œ μœ„ μ—”ν‹°ν‹° λͺ©λ‘μ˜ μ •ν™•ν•œ 이름을 μ‚¬μš©ν•  것. "
        "μ—”ν‹°ν‹°κ°€ μ΅œμ†Œ 2개 이상이면 λ°˜λ“œμ‹œ 1개 μ΄μƒμ˜ 관계λ₯Ό μΆ”μΆœν•  것.\n\n"
        f"{feedback_prompt}"
        'JSON으둜만 응닡: {"relations":[{"source":"μ—”ν‹°ν‹°λͺ…","relation":"κ΄€κ³„μœ ν˜•","target":"μ—”ν‹°ν‹°λͺ…"}]}'
    )
    res = chat_llm.invoke(prompt)

    def _key(name: str) -> str:
        # Lookup key for names, ignoring case and extra whitespace.
        return " ".join(name.split()).casefold()

    # Map each normalised key back to the exact extracted name (first one wins).
    canonical: Dict[str, str] = {}
    for n in names_list:
        canonical.setdefault(_key(n), n)
    allowed = {"DEVELOPS", "INVESTS_IN", "PARTNERS_WITH", "APPLIES", "USED_IN", "RELATED_TO"}

    relations: List[Dict] = []
    new_rel_feedback = ""
    try:
        raw = str(res.content).strip()
        if "```" in raw:
            # Take the first fenced block and drop an optional "json" tag.
            raw = raw.split("```")[1]
            if raw[:4].lower() == "json":
                raw = raw[4:]
            raw = raw.strip()
        data = json.loads(raw)
        parsed = data.get("relations", []) if isinstance(data, dict) else None
        if not isinstance(parsed, list):
            raise ValueError("'relations' 배열이 μ—†μŠ΅λ‹ˆλ‹€.")
        seen = set()
        for r in parsed:
            if not isinstance(r, dict):
                new_rel_feedback += "- 관계 ν•­λͺ©μ΄ JSON 객체가 μ•„λ‹˜\n"
                continue
            raw_src = str(r.get("source") or "").strip()
            raw_tgt = str(r.get("target") or "").strip()
            rel = str(r.get("relation") or "").strip().upper().replace(" ", "_")
            src = canonical.get(_key(raw_src))
            tgt = canonical.get(_key(raw_tgt))
            if src is None:
                new_rel_feedback += f"- source '{raw_src}'이 μ—”ν‹°ν‹° λͺ©λ‘μ— μ—†μŒ\n"
                continue
            if tgt is None:
                new_rel_feedback += f"- target '{raw_tgt}'이 μ—”ν‹°ν‹° λͺ©λ‘μ— μ—†μŒ\n"
                continue
            if rel not in allowed:
                new_rel_feedback += f"- κ΄€κ³„μœ ν˜• '{rel}'은 ν—ˆμš©λ˜μ§€ μ•ŠμŒ\n"
                continue
            if src == tgt:
                new_rel_feedback += f"- source와 target이 동일({src})ν•˜μ—¬ μ œμ™Έ\n"
                continue
            triple = (src, rel, tgt)
            if triple in seen:
                continue
            seen.add(triple)
            relations.append({"source": src, "relation": rel, "target": tgt})
        # Two or more entities but no valid relation: ask the model to try again.
        if len(names_list) >= 2 and not relations:
            new_rel_feedback = (
                f"μ—”ν‹°ν‹°κ°€ {len(names_list)}κ°œμž„μ—λ„ 유효 관계가 0κ°œμž…λ‹ˆλ‹€. "
                "λ³Έλ¬Έμ—μ„œ λ°˜λ“œμ‹œ μ—°κ΄€λ˜λŠ” μ—”ν‹°ν‹° μŒμ„ μ°Ύμ•„ 관계λ₯Ό μΆ”μΆœν•˜μ„Έμš”."
            )
    except Exception as err:
        relations = []
        new_rel_feedback = f"JSON νŒŒμ‹± μ‹€νŒ¨: {str(err)}"
    return {
        **state,
        "relations": relations,
        "relation_retry_count": relation_retry,
        "relation_feedback": new_rel_feedback.strip(),
    }
def route_after_check(state: ArticleState) -> str:
    """Route after the relevance check.

    Non-AI articles stop the pipeline (``END``); AI articles continue to
    ``extract_entities``.
    """
    if not state["is_ai_related"]:
        return END
    return "extract_entities"
def validate_entities(state: ArticleState) -> str:
    """Pick the next node after entity extraction (self-reflection loop).

    The loop returns to ``extract_entities`` while both hold:

    * there is feedback, or no entity was extracted;
    * fewer than three attempts have been made (``retry_count`` counts attempts).

    Otherwise it moves on to ``extract_relations``. If feedback remains after
    the third attempt, it logs that the result is accepted anyway.
    """
    attempts = state.get("retry_count", 0)
    note = state.get("reflection_feedback", "")
    needs_retry = bool(note) or not state.get("entities", [])
    if needs_retry and attempts < 3:
        print(f" ⚠️ [μ—”ν‹°ν‹° Self-Reflection] ν’ˆμ§ˆ 미달 ({attempts}/3). ν”Όλ“œλ°±: {note[:80]}")
        return "extract_entities"
    if note and attempts >= 3:
        print(f" 🚨 [μ—”ν‹°ν‹° Self-Reflection] 3회 초과, κ°•μ œ 톡과. ν”Όλ“œλ°±: {note[:80]}")
    return "extract_relations"
def validate_relations(state: ArticleState) -> str:
    """Pick the next node after relation extraction.

    The loop returns to ``extract_relations`` only when all of these hold:

    * there are at least two entities;
    * no valid relation was kept;
    * ``relation_retry_count`` is still below 2.

    ``extract_relations`` counts one per LLM attempt, so this allows at most
    one retry (two attempts). Otherwise the pipeline ends. If some relations
    were kept while feedback was produced, that partial success is logged.
    """
    attempts = state.get("relation_retry_count", 0)
    note = state.get("relation_feedback", "")
    kept = state.get("relations", [])
    entity_count = len(state.get("entities", []))
    if entity_count >= 2 and not kept and attempts < 2:
        print(f" ⚠️ [관계 Self-Reflection] 관계 0개 ({attempts}/2). μž¬μ‹œλ„: {note[:80]}")
        return "extract_relations"
    if kept and note:
        print(f" ⚠️ [관계 Self-Reflection] 일뢀 무효 관계 μ œμ™Έλ¨. 유효 관계: {len(kept)}개")
    return END
# Per-article LangGraph pipeline:
#   check_ai --(AI-related)--> extract_entities --> extract_relations --> END
#   check_ai --(not AI)------> END
# Both extraction nodes can loop back to themselves (self-reflection).
builder = StateGraph(ArticleState)
builder.add_node("check_ai", check_ai_relevance)
builder.add_node("extract_entities", extract_entities)
builder.add_node("extract_relations", extract_relations)
builder.set_entry_point("check_ai")
# Relevance gate. The explicit path map, as on the two loops below, declares
# every possible target, so the graph and its drawings show exactly these two
# branches instead of assuming the router may return any node.
builder.add_conditional_edges(
    "check_ai",
    route_after_check,
    {
        "extract_entities": "extract_entities",
        END: END,
    },
)
# Entity self-reflection loop: repeat extract_entities until validate_entities
# accepts the result, then continue to relation extraction.
builder.add_conditional_edges(
    "extract_entities",
    validate_entities,
    {
        "extract_entities": "extract_entities",
        "extract_relations": "extract_relations",
    },
)
# Relation self-reflection loop: repeat extract_relations or finish.
builder.add_conditional_edges(
    "extract_relations",
    validate_relations,
    {
        "extract_relations": "extract_relations",
        END: END,
    },
)
pipeline = builder.compile()
# ──────────────────────────────────────────
# 2. Neo4j μŠ€ν‚€λ§ˆ μ΄ˆκΈ°ν™” 및 적재 ν•¨μˆ˜
# ──────────────────────────────────────────
# Allowed entity types mapped to the Neo4j label each one is stored under
# (currently an identity mapping). upsert_entity and upsert_article_and_mentions
# fall back to "AICompany" for any type not listed here.
ENTITY_TYPE_MAP = {
    label: label
    for label in ("AICompany", "AITechnology", "AIService", "AIField")
}
def setup_schema(tx) -> None:
    """Create the uniqueness constraints the loader relies on (idempotent).

    The keys are:

    * entity nodes: ``name``, per label;
    * Article: ``article_id``;
    * Content: ``content_id``;
    * Media: ``name``.

    Every statement uses ``IF NOT EXISTS``, so this can run on each start-up.

    This is best effort: a statement that raises is reported and the remaining
    constraints are still attempted. The old code discarded the error silently.

    NOTE(review): with the official driver, errors from ``tx.run`` may only
    surface when the transaction commits. In that case they propagate from
    ``execute_write`` instead of being caught here.
    """
    constraints = [
        "CREATE CONSTRAINT IF NOT EXISTS FOR (n:AICompany) REQUIRE n.name IS UNIQUE",
        "CREATE CONSTRAINT IF NOT EXISTS FOR (n:AITechnology) REQUIRE n.name IS UNIQUE",
        "CREATE CONSTRAINT IF NOT EXISTS FOR (n:AIService) REQUIRE n.name IS UNIQUE",
        "CREATE CONSTRAINT IF NOT EXISTS FOR (n:AIField) REQUIRE n.name IS UNIQUE",
        "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Article) REQUIRE n.article_id IS UNIQUE",
        "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Content) REQUIRE n.content_id IS UNIQUE",
        "CREATE CONSTRAINT IF NOT EXISTS FOR (n:Media) REQUIRE n.name IS UNIQUE",
    ]
    for stmt in constraints:
        try:
            tx.run(stmt)
        except Exception as err:
            print(f"⚠️ μ œμ•½μ‘°κ±΄ 생성 μ‹€νŒ¨ (κ±΄λ„ˆλœ€): {stmt} ({err})")
def upsert_entity(tx, e: Dict) -> None:
    """MERGE one entity node by ``name`` under the label for its ``type``.

    A missing or unknown type is stored as ``AICompany``. A new node gets the
    given description. An existing node keeps its description unless that is
    null. The label always comes from ``ENTITY_TYPE_MAP``'s values, so
    interpolating it into the Cypher text cannot inject arbitrary labels.
    """
    label = ENTITY_TYPE_MAP.get(e.get("type", "AICompany"), "AICompany")
    cypher = (
        f"MERGE (n:{label} {{name:$name}}) "
        "ON CREATE SET n.description=$desc "
        "ON MATCH SET n.description=COALESCE(n.description,$desc)"
    )
    tx.run(cypher, name=e["name"], desc=e.get("description", ""))
def upsert_relation(tx, r: Dict) -> None:
    """MERGE a typed relationship between two existing entity nodes.

    Args:
        tx: Neo4j transaction (only ``tx.run`` is used).
        r: Dict with ``source``, ``target`` and ``relation``. The relation
            defaults to ``RELATED_TO``; it is upper-cased, with spaces turned
            into underscores.

    Nothing is written when any of these holds:

    * the relation type is not one of the six allowed types;
    * an endpoint name is missing;
    * either endpoint does not exist.

    Endpoints are matched by ``name`` only among the entity labels. A
    label-less ``MATCH (s {name:...})`` could also hit ``Media`` nodes (which
    have ``name`` too) and wire a relation to a news outlet. The label list
    mirrors ``ENTITY_TYPE_MAP``; keep them in sync.

    Errors from ``tx.run`` are reported and skipped (best effort).
    """
    # `or ""` turns a None relation into an unknown type instead of crashing.
    rel = str(r.get("relation", "RELATED_TO") or "").upper().replace(" ", "_")
    allowed = {
        "DEVELOPS",
        "INVESTS_IN",
        "PARTNERS_WITH",
        "APPLIES",
        "USED_IN",
        "RELATED_TO",
    }
    if rel not in allowed:
        return
    src = r.get("source")
    tgt = r.get("target")
    if not src or not tgt:
        return
    entity_labels = ("AICompany", "AITechnology", "AIService", "AIField")
    src_pred = " OR ".join(f"s:{label}" for label in entity_labels)
    tgt_pred = " OR ".join(f"t:{label}" for label in entity_labels)
    # `rel` is whitelisted above, so interpolating it cannot inject Cypher.
    query = (
        f"MATCH (s) WHERE s.name = $src AND ({src_pred}) "
        f"MATCH (t) WHERE t.name = $tgt AND ({tgt_pred}) "
        f"MERGE (s)-[:{rel}]->(t)"
    )
    try:
        tx.run(query, src=src, tgt=tgt)
    except Exception as err:
        print(f"⚠️ 관계 적재 μ‹€νŒ¨ ({src} -[{rel}]-> {tgt}): {err}")
def upsert_article_and_mentions(tx, row: pd.Series, entities: List[Dict]) -> None:
    """MERGE the Article for ``row``, link its Media publisher, and add MENTIONS edges.

    Args:
        tx: Neo4j transaction (only ``tx.run`` is used).
        row: One article row as yielded by ``DataFrame.iterrows()``. Reads the
            ``article_id``, ``title``, ``url``, ``published_date`` and
            ``source`` columns when they are present.
        entities: Validated entity dicts (``name`` and ``type``) found in the article.

    Article key: ``str(row.get("article_id", f"ART_{row.name}"))``. This is
    the id main() computes for its skip check and for the HAS_CHUNK links
    (``row.name`` is the iterrows index label). The old code stored the raw
    cell instead: a number for numeric Excel ids, or "" when the column was
    missing. main()'s string lookups never matched those values, so the
    article was re-processed on every run.

    Other behaviour:

    * Blank cells (None/NaN/NaT) are stored as "" instead of NaN or "nan".
    * A Media node is created only for a non-blank ``source``.
    * A missing ``source`` column no longer raises ``KeyError``.
    * A MENTIONS failure is reported and skipped.
    """

    def _as_text(value) -> str:
        # Blank spreadsheet cells arrive as None/NaN/NaT.
        if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
            return ""
        return str(value)

    aid = str(row.get("article_id", f"ART_{row.name}"))
    tx.run(
        "MERGE (a:Article {article_id:$aid}) SET a.title=$title, a.url=$url, a.published_date=$date",
        aid=aid,
        title=_as_text(row.get("title")),
        url=_as_text(row.get("url")),
        date=_as_text(row.get("published_date")),
    )
    source = _as_text(row.get("source"))
    if source.strip():
        tx.run(
            "MERGE (m:Media {name:$src}) WITH m MATCH (a:Article {article_id:$aid}) MERGE (m)-[:PUBLISHED]->(a)",
            src=source,
            aid=aid,
        )
    for e in entities:
        # The label comes from ENTITY_TYPE_MAP, so the f-string cannot inject Cypher.
        ntype = ENTITY_TYPE_MAP.get(e.get("type", "AICompany"), "AICompany")
        try:
            tx.run(
                f"MATCH (a:Article {{article_id:$aid}}) MATCH (n:{ntype} {{name:$name}}) MERGE (a)-[:MENTIONS]->(n)",
                aid=aid,
                name=e["name"],
            )
        except Exception as err:
            print(f"⚠️ MENTIONS 적재 μ‹€νŒ¨ ({aid} -> {e.get('name')}): {err}")
def chunk_text(text: str, size: int = 500, overlap: int = 50) -> List[str]:
    """Split ``text`` into overlapping character windows for embedding.

    Windows are ``size`` characters long and start every ``size - overlap``
    characters. Each window is whitespace-stripped, and empty ones are dropped.
    A new window is started only while it still adds characters the previous
    window did not cover. The tail therefore no longer yields a chunk lying
    entirely inside its predecessor; for example, a 480-char text used to give
    a redundant 30-char second chunk.

    Args:
        text: Text to split. ``None``, NaN/NA and falsy values give ``[]``;
            anything else is converted with ``str``.
        size: Window length in characters; must be > 0.
        overlap: Characters shared by consecutive windows; must satisfy
            0 <= overlap < size.

    Returns:
        The non-empty chunks, in order.

    Raises:
        ValueError: If ``size``/``overlap`` would not move the window forward.
    """
    if size <= 0 or not 0 <= overlap < size:
        raise ValueError(f"invalid chunking parameters: size={size}, overlap={overlap}")
    if pd.api.types.is_scalar(text) and pd.isna(text):
        return []
    if not text:
        return []
    text = str(text)
    step = size - overlap
    # The window at `start` adds new characters only if text extends past
    # start + overlap (what the previous window already covered). Always keep
    # at least the first window.
    last_start = max(len(text) - overlap, 1)
    windows = (text[start : start + size].strip() for start in range(0, last_start, step))
    return [chunk for chunk in windows if chunk]
# ──────────────────────────────────────────
# 2-1. Incremental-load helper: check whether an article is already stored in Neo4j
# ──────────────────────────────────────────
def is_article_loaded(tx, aid: str) -> bool:
    """Return True if an Article node with ``article_id == aid`` already exists.

    main() uses this to skip articles stored by an earlier run, so the LLM
    pipeline is not called again for them.
    """
    record = tx.run(
        "MATCH (a:Article {article_id:$aid}) RETURN count(a) as cnt", aid=aid
    ).single()
    if not record:
        return False
    return record["cnt"] > 0
# ──────────────────────────────────────────
# 3. 메인 μ‹€ν–‰ (슀크립트둜 직접 호좜 μ‹œ)
# ──────────────────────────────────────────
def _cell_text(value) -> str:
    """Return a spreadsheet cell as text; None and NaN/NaT (blank cells) become ""."""
    if value is None or (pd.api.types.is_scalar(value) and pd.isna(value)):
        return ""
    return str(value)


def _load_articles() -> pd.DataFrame:
    """Read every Articles_*.xlsx (repo root and scrapping folder), merged and de-duplicated by url.

    Raises:
        FileNotFoundError: If no Articles_*.xlsx file exists.
        ValueError: If every file failed to load.
    """
    xlsx_files = sorted(
        glob.glob("Articles_*.xlsx")
        + glob.glob(os.path.join("src", "graphBuilder", "scrapping", "Articles_*.xlsx"))
    )
    if not xlsx_files:
        raise FileNotFoundError("Articles_*.xlsx 파일이 μ—†μŠ΅λ‹ˆλ‹€. finScrapping.pyλ₯Ό λ¨Όμ € μ‹€ν–‰ν•˜μ„Έμš”.")
    dfs = []
    for f in xlsx_files:
        try:
            dfs.append(pd.read_excel(f))
        except Exception as e:
            print(f"⚠️ {f} λ‘œλ“œ μ‹€νŒ¨: {e}")
    if not dfs:
        # pd.concat([]) would fail here with an unhelpful "No objects to concatenate".
        raise ValueError("Articles_*.xlsx νŒŒμΌμ„ ν•˜λ‚˜λ„ λ‘œλ“œν•˜μ§€ λͺ»ν–ˆμŠ΅λ‹ˆλ‹€.")
    df = pd.concat(dfs, ignore_index=True).drop_duplicates(subset=["url"])
    print(f"βœ… λ‘œλ“œ μ™„λ£Œ: 총 {len(xlsx_files)}개 μ—‘μ…€ 파일 톡합 μ™„λ£Œ ({len(df)}건의 고유 기사 λŒ€μƒ)")
    return df


def _extract_and_load(df: pd.DataFrame) -> None:
    """Run the extraction pipeline on articles not yet in Neo4j and write the results.

    Articles that already have an Article node are skipped, so a re-run only
    pays for new articles. Non-AI articles are not written.
    """
    total = len(df)
    print(f"총 {total}건 쀑 μ‹ κ·œ 기사 필터링 및 처리 μ‹œμž‘...")
    # `pos` is the 1-based progress counter. `idx` is the DataFrame label, which
    # has gaps after drop_duplicates; it is used only for the ART_{idx} fallback id.
    for pos, (idx, row) in enumerate(df.iterrows(), start=1):
        aid = str(row.get("article_id", f"ART_{idx}"))
        title = _cell_text(row.get("title", ""))
        with driver.session() as s:
            exists = s.execute_read(is_article_loaded, aid)
        if exists:
            print(f" ⏭️ [{pos}/{total}] 이미 적재됨 (μŠ€ν‚΅): {title[:35]}...")
            continue
        state: ArticleState = dict(
            article_id=aid,
            title=title,
            text=title + "\n" + _cell_text(row.get("content", "")),
            is_ai_related=False,
            entities=[],
            relations=[],
            retry_count=0,
            reflection_feedback="",
            relation_retry_count=0,
            relation_feedback="",
        )
        out = pipeline.invoke(state)
        if not out["is_ai_related"]:
            print(f" ⏭️ [{pos}/{total}] AI λΉ„κ΄€λ ¨ (적재 μ œμ™Έ): {title[:35]}...")
            continue
        with driver.session() as s:
            # Entities go first, so the relation and MENTIONS MATCHes can find them.
            # The Article node is written last, so an interrupted load is retried
            # on the next run (is_article_loaded only looks for the Article).
            for entity in out["entities"]:
                s.execute_write(upsert_entity, entity)
            for r in out["relations"]:
                s.execute_write(upsert_relation, r)
            s.execute_write(upsert_article_and_mentions, row, out["entities"])
        rel_cnt = len(out["relations"])
        ent_cnt = len(out["entities"])
        # Warn when two or more entities produced no relation at all.
        rel_warn = " ⚠️ 관계=0" if ent_cnt >= 2 and rel_cnt == 0 else ""
        print(
            f" βœ… [{pos}/{total}] μ‹ κ·œ μ μž¬μ™„λ£Œ: {title[:35]}... "
            f"| μ—”ν‹°ν‹°: {ent_cnt}개 | 관계: {rel_cnt}개{rel_warn}"
        )
    print("\nβœ… μ—”ν‹°ν‹°/관계 μΆ”μΆœ 및 Neo4j 증뢄 적재 μ™„λ£Œ")


def _embed_contents(df: pd.DataFrame) -> None:
    """Chunk, embed and link the content of stored articles that have no chunks yet.

    Articles without an Article node are skipped: non-AI articles, or ones
    whose load failed. Their chunks could never be linked via HAS_CHUNK. The
    old code embedded them anyway, leaving orphan Content nodes that were
    re-embedded on every run, because the HAS_CHUNK check never matched.
    """
    print("Content λ…Έλ“œ 생성 및 μ‹ κ·œ μž„λ² λ”© μ‹œμž‘...")
    for idx, row in df.iterrows():
        aid = str(row.get("article_id", f"ART_{idx}"))
        with driver.session() as s:
            # Grouping on a.article_id: no row at all when the Article is
            # missing, otherwise one row with its chunk count.
            record = s.run(
                "MATCH (a:Article {article_id:$aid}) "
                "OPTIONAL MATCH (a)-[:HAS_CHUNK]->(c:Content) "
                "RETURN a.article_id AS aid, count(c) AS cnt",
                aid=aid,
            ).single()
        if record is None or record["cnt"] > 0:
            continue
        chunks = chunk_text(_cell_text(row.get("content", "")))
        with driver.session() as s:
            for i, chunk in enumerate(chunks):
                cid = f"{aid}_chunk_{i}"
                vec = embedder.embed_query(chunk)
                s.run(
                    "MERGE (c:Content {content_id:$cid}) "
                    "SET c.chunk=$chunk, c.article_id=$aid, c.chunk_index=$i, c.embedding=$vec "
                    "WITH c MATCH (a:Article {article_id:$aid}) MERGE (a)-[:HAS_CHUNK]->(c)",
                    cid=cid,
                    chunk=chunk,
                    aid=aid,
                    i=i,
                    vec=vec,
                )
    print("βœ… Content λ…Έλ“œ μ‹ κ·œ μž„λ² λ”© 적재 μ™„λ£Œ")


def main() -> None:
    """Load scraped articles into the Neo4j knowledge graph, incrementally.

    Steps:

    1. Connect to Neo4j; the driver is stored in the module-level ``driver``
       and left open for the caller.
    2. Read and merge the Articles_*.xlsx files.
    3. Ensure the uniqueness constraints exist.
    4. Extract and load entities/relations for new AI articles.
    5. Embed the content chunks of stored articles.
    6. Create the Content vector index if it is absent.
    """
    global driver
    driver = get_neo4j_driver()
    df = _load_articles()
    # Prepare the schema only; existing data is kept.
    with driver.session() as s:
        s.execute_write(setup_schema)
    print("βœ… Neo4j μŠ€ν‚€λ§ˆ μ€€λΉ„ μ™„λ£Œ (κΈ°μ‘΄ 데이터 보쑴)")
    _extract_and_load(df)
    _embed_contents(df)
    # dimensions must match the embedder's output size (text-embedding-3-small).
    create_vector_index(
        driver,
        INDEX_NAME,
        label="Content",
        embedding_property="embedding",
        dimensions=1536,
        similarity_fn="cosine",
    )
    print(f"βœ… 벑터 인덱슀 [{INDEX_NAME}] κ°±μ‹  및 검증 μ™„λ£Œ")
if __name__ == "__main__":
    try:
        main()
    finally:
        # main() leaves the module-level driver open for interactive callers.
        # When run as a script, close it so the connection pool is released
        # even if a step fails.
        if driver is not None:
            driver.close()