Spaces:
Runtime error
Runtime error
| import ast | |
| from langchain.schema import Document | |
| def chunk_python_code_with_metadata(source_code, reference): | |
| """ | |
| Entry point method to process the Python file. | |
| It invokes the iterate_ast function. | |
| """ | |
| documents = [] | |
| print(f"Processing file: {reference}") | |
| iterate_ast(source_code, documents, reference) | |
| # Determine usage based on the reference path | |
| if reference.startswith("kadi_apy/lib/"): | |
| usage = "library" | |
| elif reference.startswith("kadi_apy/cli/"): | |
| usage = "cli_library" | |
| elif reference.startswith("doc/"): | |
| usage = "doc" | |
| else: | |
| usage = "undefined" | |
| # Add metadata for usage to all documents | |
| for doc in documents: | |
| doc.metadata["reference"] = reference | |
| doc.metadata["usage"] = usage # Add the determined usage metadata | |
| return documents | |
| def iterate_ast(source_code, documents, reference): | |
| """ | |
| Parses the AST of the given Python file and delegates | |
| handling to specific methods based on node types. | |
| """ | |
| # Parse the source code into an abstract syntax tree (AST) | |
| tree = ast.parse(source_code, filename=reference) | |
| # Gather all top-level imports for later use | |
| imports_dict = extract_imports(tree) | |
| first_level_nodes = list(ast.iter_child_nodes(tree)) | |
| # Check if there are no first-level nodes | |
| if not first_level_nodes: | |
| handle_no_first_level_node_found(documents, source_code, imports_dict, reference) | |
| return | |
| all_imports = all(isinstance(node, (ast.Import, ast.ImportFrom)) for node in first_level_nodes) | |
| if all_imports: | |
| handle_first_level_imports_only(documents, source_code, imports_dict, reference) | |
| # Iterate over first-level nodes | |
| for first_level_node in ast.iter_child_nodes(tree): | |
| if isinstance(first_level_node, ast.ClassDef): | |
| handle_first_level_class(first_level_node, documents, source_code, imports_dict) | |
| elif isinstance(first_level_node, ast.FunctionDef): | |
| handle_first_level_func(first_level_node, documents, source_code, imports_dict) | |
| elif isinstance(first_level_node, ast.Assign): | |
| handle_first_level_assign(first_level_node, documents, source_code, imports_dict) | |
| def handle_first_level_imports_only(documents, source_code, imports_dict, reference): | |
| # Check if the file path before ".py" is "__init__" | |
| if reference.endswith("__init__.py"): | |
| type = "__init__-file" | |
| else: | |
| type = "undefined" | |
| # Create and store a Document with the full source code | |
| doc = Document( | |
| page_content=source_code, | |
| metadata={ | |
| "type": type, | |
| "imports": imports_dict | |
| } | |
| ) | |
| documents.append(doc) | |
| def extract_imports(tree): | |
| """ | |
| Extracts all import statements from the AST tree and organizes them | |
| into a dictionary keyed by their fully qualified names for later analysis. | |
| """ | |
| imports_dict = {} | |
| for node in ast.walk(tree): | |
| if isinstance(node, ast.Import): | |
| for alias in node.names: | |
| imports_dict[alias.name] = alias.name | |
| elif isinstance(node, ast.ImportFrom): | |
| module = node.module if node.module else "" | |
| for alias in node.names: | |
| full_name = f"{module}.{alias.name}" if module else alias.name | |
| imports_dict[alias.name] = full_name | |
| return imports_dict | |
| def analyze_imports(node, imports_dict): | |
| """ | |
| Analyzes the given node's body and signature to find relevant imports. | |
| """ | |
| relevant_imports = set() | |
| for sub_node in ast.walk(node): | |
| if isinstance(sub_node, ast.Name) and sub_node.id in imports_dict: | |
| relevant_imports.add(imports_dict[sub_node.id]) | |
| return list(relevant_imports) | |
| def handle_not_yet_defined_first_level_cases(documents, source_code, imports_dict): | |
| if source_code: | |
| doc = Document( | |
| page_content=source_code, | |
| metadata={ | |
| "type": "undefined", | |
| "imports": imports_dict | |
| } | |
| ) | |
| documents.append(doc) | |
| def handle_no_first_level_node_found(documents, source_code, imports_dict, reference): | |
| """ | |
| Handles cases where no top-level nodes are found in the AST. | |
| Stores the full content (likely comments) in a Document object | |
| with metadata indicating type 'no code' or 'init' based on the reference. | |
| """ | |
| # Check if the file path before ".py" is "__init__" | |
| if reference.endswith("__init__.py"): | |
| type = "__init__-file" | |
| else: | |
| type = "undefined" | |
| # Create and store a Document with the full source code | |
| doc = Document( | |
| page_content=source_code, | |
| metadata={ | |
| "type": type, | |
| "imports": imports_dict | |
| } | |
| ) | |
| documents.append(doc) | |
| def handle_first_level_assign(assign_node, documents, source_code, imports_dict): | |
| """ | |
| Handles assignment statements at the first level of the AST by storing them | |
| in a Document object with metadata, including relevant imports. | |
| """ | |
| # Extract assignment source code | |
| assign_start_line = assign_node.lineno | |
| assign_end_line = assign_node.end_lineno | |
| assign_source = '\n'.join(source_code.splitlines()[assign_start_line-1:assign_end_line]) | |
| # Extract relevant imports for this assignment | |
| assign_imports = analyze_imports(assign_node, imports_dict) | |
| # Create and store Document for the assignment | |
| doc = Document( | |
| page_content=assign_source, | |
| metadata={ | |
| "type": "Assign", | |
| "imports": assign_imports | |
| } | |
| ) | |
| documents.append(doc) | |
| def handle_first_level_class(class_node, documents, source_code, imports_dict): | |
| """ | |
| Handles classes at the first level of the AST by storing them | |
| in a Document object with metadata, including relevant imports. | |
| """ | |
| # Extract class source code | |
| class_start_line = class_node.lineno | |
| # Find the line where the first function (def) starts or the next top-level node | |
| class_body_lines = [child.lineno for child in class_node.body if isinstance(child, ast.FunctionDef)] | |
| class_end_line = min(class_body_lines, default=class_node.end_lineno) - 1 # Use `-1` to exclude the next node | |
| # Generate the class source code | |
| class_source = '\n'.join(source_code.splitlines()[class_start_line-1:class_end_line]) | |
| # Extract relevant imports for this class | |
| class_imports = analyze_imports(class_node, imports_dict) | |
| # Create and store Document for the class | |
| doc = Document( | |
| page_content=class_source, | |
| metadata={ | |
| "type": "class", | |
| "class": class_node.name, | |
| "visibility": "public", | |
| "imports": class_imports, | |
| } | |
| ) | |
| documents.append(doc) | |
| # Handle methods within the class | |
| for second_level_node in ast.iter_child_nodes(class_node): | |
| if isinstance(second_level_node, ast.FunctionDef): | |
| # Extract method source code | |
| method_start_line = ( | |
| second_level_node.decorator_list[0].lineno | |
| if second_level_node.decorator_list else second_level_node.lineno | |
| ) | |
| method_end_line = second_level_node.end_lineno | |
| method_source = '\n'.join(source_code.splitlines()[method_start_line-1:method_end_line]) | |
| # Determine visibility metadata | |
| visibility = "internal" if second_level_node.name.startswith("_") else "public" | |
| # Extract relevant imports for this method | |
| method_imports = analyze_imports(second_level_node, imports_dict) | |
| # Create and store Document | |
| doc = Document( | |
| page_content=method_source, | |
| metadata={ | |
| "type": "method", | |
| "method": second_level_node.name, | |
| "visibility": "visibility", | |
| "imports": method_imports, | |
| "class": class_node.name | |
| } | |
| ) | |
| documents.append(doc) | |
| def handle_first_level_func(function_node, documents, source_code, imports_dict): | |
| """ | |
| Handles functions at the first level of the AST by storing them | |
| in a Document object with metadata, including relevant imports. | |
| """ | |
| # Extract function source code | |
| function_start_line = ( | |
| function_node.decorator_list[0].lineno | |
| if function_node.decorator_list else function_node.lineno | |
| ) | |
| function_end_line = function_node.end_lineno | |
| function_source = '\n'.join(source_code.splitlines()[function_start_line-1:function_end_line]) | |
| # Determine visibility metadata | |
| visibility = "internal" if function_node.name.startswith("_") else "public" | |
| # Check if the function is a CLI command (e.g., decorated with `@apy_command`) | |
| is_command = any( | |
| decorator.id == "apy_command" # Check decorator name | |
| for decorator in function_node.decorator_list | |
| if hasattr(decorator, "id") # Ensure the decorator has an identifier | |
| ) | |
| # Extract relevant imports for this function | |
| function_imports = analyze_imports(function_node, imports_dict) | |
| # Create and store Document | |
| if is_command: | |
| doc = Document( | |
| page_content=function_source, | |
| metadata={ | |
| "type": "command", | |
| "command": function_node.name, | |
| "visibility": "public", | |
| "imports": function_imports | |
| } | |
| ) | |
| else: | |
| doc = Document( | |
| page_content=function_source, | |
| metadata={ | |
| "type": "function", | |
| "method": function_node.name, | |
| "visibility": visibility, | |
| "imports": function_imports | |
| } | |
| ) | |
| documents.append(doc) | |
| # Example usage | |
| #file_path = r"C:\Users\Anwender\Downloads\exampleScript.py" | |
| #with open(file_path, "r", encoding="utf-8") as file: | |
| # source_code = file.read() | |
| #chunkPythonFiles(source_code, file_path) | |
| import os | |
| def process_folder(folder_path): | |
| # Initialize a counter for the number of Python files | |
| python_file_count = 0 | |
| docsT = [] | |
| # Walk through all subdirectories and files in the folder | |
| for root, _, files in os.walk(folder_path): | |
| for file_name in files: | |
| # Create the full file path | |
| file_path = os.path.join(root, file_name) | |
| #print(file_path) | |
| # Ensure it's a Python file | |
| if file_name.endswith(".py"): | |
| python_file_count += 1 # Increment the counter | |
| with open(file_path, "r", encoding="utf-8") as file: | |
| source_code = file.read() | |
| print(file_name) | |
| # Call your function | |
| docs = chunkPythonFiles(source_code, file_path) | |
| print("HWHWHWWHWHWHWH!:" ,len(docs)) | |
| docsT.extend(docs) | |
| # Print the total number of Python files processed | |
| print(f"Total Python files processed: {python_file_count}") | |
| print(f"Total docs files processed: {len(docsT)}") | |