Skip to content
Snippets Groups Projects
Commit fbec4743 authored by Nathan Daelman's avatar Nathan Daelman
Browse files

Apply file reference gathering recursively

parent 7e82dd46
No related branches found
No related tags found
No related merge requests found
from typing import Callable, Optional, Any
import os
from typing import Callable, Optional
# Type aliases for clarity
# Predicate applied to a single line to decide whether processing starts there.
StartCondition = Callable[[str], bool]
# Predicate applied to a single line to decide whether processing ends there.
EndCondition = Callable[[str], bool]
# (remaining lines, extracted segments); every segment is itself a list of
# lines. The earlier flat ``list[str]`` definition was dead code, immediately
# overwritten by this one, and has been dropped.
ProcessResult = tuple[list[str], list[list[str]]]
# Transformation applied to each raw line when a file is read.
LineProcessor = Callable[[str], str]
FileHandler = Callable[
    [str], set[str]
]  # TODO: narrow down in terms of function outputs
def process_lines(
......@@ -21,21 +26,73 @@ def process_lines(
start = start_condition or default_start_condition
end = end_condition or default_end_condition
processed: list[str] = []
remaining: list[str] = []
start_processing = False
def _process(lines: list[str], start_processing: bool) -> ProcessResult:
remaining: list[str] = []
processed: list[list[str]] = []
for line in lines:
if not start_processing and start(line):
start_processing = True
if start_processing:
if end(line):
processed.append(line.strip().strip("\n"))
remaining = lines[lines.index(line) + 1 :]
for i, line in enumerate(lines):
if not start_processing and start(line):
start_processing = True
remaining_segment, segment = _process(lines[i:], True)
remaining.extend(remaining_segment)
processed.extend(segment)
break
processed.append(line)
else:
remaining.append(line)
return remaining, processed
if start_processing:
if end(line):
processed.append([line.strip().strip("\n")] + lines[:i])
remaining = lines[i + 1:]
more_remaining, more_segments = _process(remaining, False)
remaining = more_remaining
processed.extend(more_segments)
break
processed.append([line])
return remaining, processed
return _process(lines, False)
def find_file(root_directory: str, file_reference: str) -> Optional[str]:
    """Locate a file by name somewhere below a directory.

    The tree under ``root_directory`` is walked with ``os.walk`` and the
    first directory that lists ``file_reference`` among its files wins.

    Args:
        root_directory: Directory at which the walk starts.
        file_reference: Bare file name to look for.

    Returns:
        The joined path of the first match, or ``None`` if no directory
        contains a file with that name.
    """
    # Lazy generator: the walk stops as soon as the first match is produced.
    hits = (
        os.path.join(folder, file_reference)
        for folder, _subdirs, names in os.walk(root_directory)
        if file_reference in names
    )
    return next(hits, None)
def read_file(file_path: str, line_processor: LineProcessor) -> list[str]:
    """Read a text file and return its lines after processing each one.

    Args:
        file_path: Path of the file to open in text read mode.
        line_processor: Callable applied to every raw line (including its
            trailing newline, if any) before it is stored.

    Returns:
        One processed entry per line of the file, in file order. An empty
        file yields an empty list.
    """
    # The context manager closes the file on exit, so no explicit close()
    # is needed; iterating the handle avoids building a throwaway list.
    with open(file_path, "r") as file:
        return [line_processor(line) for line in file]
def clean_line(line: str) -> str:
    """Return ``line`` without surrounding newlines and whitespace.

    Newline characters are removed from both ends first, then any other
    leading or trailing whitespace.
    """
    without_newlines = line.strip("\n")
    return without_newlines.strip()
def trace_file_references(
    file_name: str,
    search_root: str,
    file_handler: FileHandler,
    file_references: set[str],
) -> set[str]:
    """Recursively find file references extracted by a file handler.

    Starting from ``file_name``, every file that ``find_file`` locates under
    ``search_root`` is passed to ``file_handler``, and each reference the
    handler returns is traced in turn. Names that cannot be located are left
    out of the result and are not expanded further.

    Each name is expanded at most once per call. Reference cycles (``a``
    refers to ``b`` and ``b`` refers back to ``a``) therefore terminate
    instead of recursing until ``RecursionError``, and a file reachable
    through several branches is handled only once.

    Args:
        file_name: Name of the file at which tracing starts.
        search_root: Directory below which files are looked up.
        file_handler: Callable that receives a file path and returns the set
            of file names referenced by that file.
        file_references: References collected so far; not modified.

    Returns:
        A new set holding ``file_references`` plus every located file name
        reachable from ``file_name``.
    """
    # Make a copy of file_references to avoid modifying the original set
    updated_references = set(file_references)
    # Names already expanded during this call. Kept separate from the result
    # so that names pre-seeded by the caller are still traced.
    visited: set[str] = set()

    def _trace(name: str) -> None:
        """Expand ``name`` once, adding it and its references to the result."""
        if name in visited:
            return
        visited.add(name)
        file_path = find_file(search_root, name)
        if file_path is None:
            return
        updated_references.add(name)
        for new_file_reference in file_handler(file_path):
            _trace(new_file_reference)

    _trace(file_name)
    return updated_references
from dataclasses import dataclass
import re
from typing import Optional
from general_parser import process_lines
from general_parser import ProcessResult, process_lines, read_file, clean_line
# Define data structures
@dataclass
class Comment:
    """Data structure holding a single comment."""

    # Text of the comment.
    message: str
@dataclass
class Formula:
    """Data structure pairing a formula value with a file name."""

    # Formula text.
    value: str
    # Name of the file associated with the formula.
    filename: str
@dataclass
class Includes:
    """Data structure pairing an include value with a file name."""

    # Include text.
    value: str
    # Name of the file associated with the include.
    filename: str
# matching functions
def is_comment_started(line: str) -> bool:
    """Return True when ``line`` begins with the ``(*`` marker."""
    opener = "(*"
    return line[: len(opener)] == opener
def is_comment_ended(line: str) -> bool:
    """Return True when ``line``, ignoring surrounding whitespace, ends with ``*)``."""
    trimmed = line.strip()
    # After stripping whitespace, no newline can remain at either end.
    trimmed = trimmed.strip("\n")
    return trimmed[-2:] == "*)"
def is_file_reference(line: str) -> bool:
    """Return True when ``line`` begins with ``$include``."""
    directive = "$include"
    return line[: len(directive)] == directive
def is_formula_start(line: str) -> bool:
    """Return True when ``line`` contains ``:=`` anywhere."""
    return line.find(":=") != -1
def is_formula_end(line: str) -> bool:
    """Return True when ``line``, ignoring surrounding whitespace, ends with ``:``."""
    trimmed = line.strip()
    # After stripping whitespace, no newline can remain at either end.
    trimmed = trimmed.strip("\n")
    return trimmed[-1:] == ":"
# Finding functions
def find_comment(remaining: list[str]) -> tuple[list[str], list[str]]:
    """Run ``process_lines`` on ``remaining`` with the comment matchers.

    ``is_comment_started`` is passed as the start condition and
    ``is_comment_ended`` as the end condition; the result of
    ``process_lines`` is returned unchanged.
    """
    outcome = process_lines(remaining, is_comment_started, is_comment_ended)
    return outcome
def find_file_reference(remaining: list[str]) -> tuple[list[str], list[str]]:
    """Run ``process_lines`` on ``remaining`` with the file-reference matcher.

    ``is_file_reference`` is passed as the start condition and ``None`` as
    the end condition; the result of ``process_lines`` is returned unchanged.
    """
    outcome = process_lines(remaining, is_file_reference, None)
    return outcome
def find_formula(remaining: list[str]) -> tuple[list[str], list[str]]:
    """Run ``process_lines`` on ``remaining`` with the formula matchers.

    ``is_formula_start`` is passed as the start condition and
    ``is_formula_end`` as the end condition; the result of
    ``process_lines`` is returned unchanged.
    """
    outcome = process_lines(remaining, is_formula_start, is_formula_end)
    return outcome
# Processing functions
def process_file_reference(line: str) -> Optional[str]:
......@@ -50,3 +49,28 @@ def process_file_reference(line: str) -> Optional[str]:
if match:
return match.group(1)
return None
# Finding functions
def find_comments(remaining: list[str]) -> ProcessResult:
    """Run ``process_lines`` on ``remaining`` with the comment matchers.

    ``is_comment_started`` is passed as the start condition and
    ``is_comment_ended`` as the end condition; the result of
    ``process_lines`` is returned unchanged.
    """
    outcome = process_lines(remaining, is_comment_started, is_comment_ended)
    return outcome
def find_file_references(remaining: list[str]) -> tuple[list[str], set[str]]:
    """Collect the file names referenced in ``remaining``.

    ``process_lines`` is run with ``is_file_reference`` as the start
    condition and no end condition. The first entry of every extracted
    segment is handed to ``process_file_reference``; results that are
    ``None`` are discarded.

    Returns:
        The remaining lines reported by ``process_lines`` and the set of
        extracted file references.
    """
    leftover, segments = process_lines(remaining, is_file_reference, None)
    # Only the first line of each segment is inspected.
    candidates = (process_file_reference(segment[0]) for segment in segments)
    references: set[str] = {name for name in candidates if name is not None}
    return leftover, references
def find_formulas(remaining: list[str]) -> ProcessResult:
    """Run ``process_lines`` on ``remaining`` with the formula matchers.

    ``is_formula_start`` is passed as the start condition and
    ``is_formula_end`` as the end condition; the result of
    ``process_lines`` is returned unchanged.
    """
    outcome = process_lines(remaining, is_formula_start, is_formula_end)
    return outcome
# Composed, ready-made functions
def handler_file_reference(file_path: str) -> set[str]:
    """Return the file references found in the file at ``file_path``.

    The file is read with ``read_file`` using ``clean_line`` on every line,
    and the lines are passed to ``find_file_references``; only the set of
    references (the second element of its result) is returned.
    """
    cleaned_lines = read_file(file_path, clean_line)
    _leftover, references = find_file_references(cleaned_lines)
    return references
import pytest
import yaml
from general_parser import trace_file_references
from maple_parser import handler_file_reference
# Read the YAML file
# The ``with`` block closes the handle on exit; no explicit close() is needed.
with open("config.yaml", "r") as file:
    config = yaml.safe_load(file)
# Root path taken from the ``libxc.path`` entry of the configuration.
libxc_src = config["libxc"]["path"]
# Directory used as the search root in the parametrized tests below.
gga_exc = f"{libxc_src}/maple/gga_exc/"
@pytest.fixture
def gga_k_apbe():
    """Provide the lines of ``gga_k_apbe.mpl`` with newlines stripped from both ends."""
    # The ``with`` block closes the handle on exit; no explicit close() is
    # needed, and iterating the handle avoids an intermediate list.
    with open(f"{libxc_src}/maple/gga_exc/gga_k_apbe.mpl", "r") as file:
        return [line.strip("\n") for line in file]
@pytest.mark.parametrize(
    "file_name, search_root, expected",
    [
        (
            "gga_k_apbe.mpl",
            gga_exc,
            {"gga_k_apbe.mpl", "gga_x_pbe.mpl"},
        ),  # serial example
        (
            "gga_x_bpccac.mpl",
            gga_exc,
            {
                "gga_x_bpccac.mpl",
                "gga_x_pbe.mpl",
                "gga_x_pw91.mpl",
            },
        ),  # parallel example
    ],
)
def test_recursion(file_name: str, search_root: str, expected: set[str]):
    """Check that tracing from ``file_name`` yields exactly ``expected``.

    ``expected`` is a set: it is compared for equality with the set returned
    by ``trace_file_references`` when started from an empty set.
    """
    assert (
        trace_file_references(file_name, search_root, handler_file_reference, set())
        == expected
    )
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment