sota/sidestepper.py
# sota staircase SideStepper
# a somewhat fast .gitignore-respecting large file finder
# licence: 0BSD
from dataclasses import dataclass
from functools import cache
from multiprocessing import Manager, cpu_count
# noinspection PyProtectedMember
from multiprocessing.managers import ListProxy
from os import getenv
from os.path import abspath
from pathlib import Path
from subprocess import CompletedProcess
from subprocess import run as _run
from sys import argv, executable, stderr
from textwrap import indent
from time import time
from traceback import format_tb
from typing import Final, Generator, Generic, Iterable, Iterator, NamedTuple, TypeVar
# constants
INDENT = " "
REPO_DIR: Final[Path] = Path(__file__).parent
REPO_SOTAIGNORE: Final[Path] = REPO_DIR.joinpath(".sotaignore")
_SOTA_SIDESTEP_CHUNK_SIZE = getenv("SIDESTEP_CHUNK_SIZE")
SOTA_SIDESTEP_CHUNK_SIZE: Final[int] = (
int(_SOTA_SIDESTEP_CHUNK_SIZE)
if (
(_SOTA_SIDESTEP_CHUNK_SIZE is not None)
and (_SOTA_SIDESTEP_CHUNK_SIZE.isdigit())
)
else 16
)
_SOTA_SIDESTEP_MAX_WORKERS = getenv("SIDESTEP_MAX_WORKERS")
SOTA_SIDESTEP_MAX_WORKERS: Final[int] = (
int(_SOTA_SIDESTEP_MAX_WORKERS)
if (
(_SOTA_SIDESTEP_MAX_WORKERS is not None)
and (_SOTA_SIDESTEP_MAX_WORKERS.isdigit())
)
else cpu_count()
)
SOTA_SIDESTEP_LARGE_FILE_SIZE: Final[int] = 100_000_000  # 100 MB
SOTA_SIDESTEP_PARALLEL: Final[bool] = getenv("SIDESTEP_PARALLEL") is not None
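# (the SIDESTEP_* environment variables above are the only runtime knobs:
#  SIDESTEP_CHUNK_SIZE and SIDESTEP_MAX_WORKERS tune the process_map() calls
#  further below, and setting SIDESTEP_PARALLEL to any value enables
#  multiprocessing, same as passing --parallel on the command line)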
# define these before importing third-party modules because we use them in the import check
def generate_command_failure_message(cp: CompletedProcess) -> str:
return "\n".join(
[
f"\n\nfailure: command '{cp.args}' failed with exit code {cp.returncode}",
f"{INDENT}stdout:",
(
indent(text=cp.stdout.decode(), prefix=f"{INDENT}{INDENT}")
if (isinstance(cp.stdout, bytes) and (cp.stdout != b""))
else f"{INDENT}{INDENT}(no output)"
),
f"{INDENT}stderr:",
(
indent(text=cp.stderr.decode(), prefix=f"{INDENT}{INDENT}")
if (isinstance(cp.stderr, bytes) and (cp.stderr != b""))
else f"{INDENT}{INDENT}(no output)"
)
+ "\n",
]
)
def run(
command: str | list,
cwd: Path | str | None = None,
capture_output: bool = True,
give_input: str | None = None,
) -> CompletedProcess:
"""
exception-safe-ish wrapper around subprocess.run()
args:
command: str | list
the command to run
cwd: Path | str | None = None
the working directory
        capture_output: bool = True
            whether to capture the output
        give_input: str | None = None
            text to send to the process's stdin, if any
returns: CompletedProcess
the return object from subprocess.run()
"""
# noinspection PyBroadException
try:
cp = _run(
command,
            shell=isinstance(command, str),  # only plain-string commands go through the shell
cwd=cwd,
capture_output=capture_output,
input=give_input.encode() if give_input else None,
)
except Exception as run_exc:
print(
f"\n\nfailure: command '{command}' failed with exception",
f"{INDENT}{run_exc.__class__.__name__}: {run_exc}",
indent(text="\n".join(format_tb(run_exc.__traceback__)), prefix=INDENT),
sep="\n",
)
exit(-1)
return cp
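# illustrative usage of run(), mirroring how it is called later in this file:
#   cp = run([executable, "-m", "pip", "install", "tqdm"])
#   if cp.returncode != 0:
#       print(generate_command_failure_message(cp))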
# attempt to import third-party modules
# if they're not installed, prompt the user to optionally install them automatically
_could_not_import: list[str] = []
_could_not_import_exc: Exception | None = None
try:
from gitignore_parser import IgnoreRule, rule_from_pattern # type: ignore
except ImportError as _import_exc:
_could_not_import.append("gitignore_parser")
_could_not_import_exc = _import_exc
try:
# noinspection PyUnresolvedReferences
from tqdm import tqdm
# noinspection PyUnresolvedReferences
from tqdm.contrib.concurrent import process_map
except ImportError as _import_exc:
_could_not_import.append("tqdm")
_could_not_import_exc = _import_exc
if _could_not_import:
for module in _could_not_import:
print(
f"critical error: '{module}' is not installed, "
f"please run 'pip install {module}' to install it",
)
# install the missing modules
if input("\ninstall these with pip? y/n: ").lower() == "y":
print("installing...", end="", flush=True)
_cp = run([executable, "-m", "pip", "install", *_could_not_import])
if _cp.returncode != 0:
print(generate_command_failure_message(_cp))
exit(-1)
print(" done", flush=True)
# check if they were installed successfully
_cp = run(
[
executable,
"-c",
";".join([f"import {module}" for module in _could_not_import]),
]
)
if _cp.returncode != 0:
print(generate_command_failure_message(_cp))
print(
"critical error: post-install check failed. reverting installation...",
end="",
flush=True,
)
_cp = run([executable, "-m", "pip", "uninstall", *_could_not_import, "-y"])
if _cp.returncode != 0:
print(generate_command_failure_message(_cp))
print(" done", flush=True)
exit(-1)
elif __name__ == "__main__":
# rerun the script if we're running as one
exit(
run(
[executable, Path(__file__).absolute(), *argv[1:]], capture_output=False
).returncode
)
else:
# we're being imported, raise an error
raise EnvironmentError(
"automatic dependency installation successful"
) from _could_not_import_exc
A = TypeVar("A")
B = TypeVar("B")
class OneSided(Generic[A, B], NamedTuple):
"""
generic tuple with two elements, a and b, given by a generator
in which element 'a' is a constant and b is from an iterable/iterator
"""
a: A
b: B
def one_sided(a: A, bbb: Iterable[B]) -> Iterator[OneSided[A, B]]:
"""
generator that yields OneSided instances with a constant 'a' element
and elements from the given iterable/iterator 'bbb' as the 'b' element
"""
for b in bbb:
yield OneSided(a, b)
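# e.g. list(one_sided(0, "ab")) == [OneSided(a=0, b="a"), OneSided(a=0, b="b")];
# this lets one shared context object ride along with every item fed to process_map()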
def generate_time_elapsed_string(time_taken: float) -> str:
"""generates a human-readable time-elapsed string from a time taken float"""
hours = int(time_taken // 3600)
minutes = int(time_taken % 3600 // 60)
seconds = int(time_taken % 60)
time_taken_string: str
    if time_taken > 3600:
        time_taken_string = f"{hours}h {minutes}′ {seconds}″"
    elif time_taken > 60:
        time_taken_string = f"{minutes}′ {seconds}″"
    else:
        time_taken_string = f"{time_taken:.2f}″"
return time_taken_string
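# e.g. generate_time_elapsed_string(83.0) -> "1′ 23″"
#      generate_time_elapsed_string(4.56) -> "4.56″"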
@dataclass(eq=True, frozen=True)
class SideStepIgnoreMatcher:
"""immutable gitignore matcher"""
root: Path
# (
# (.gitignore file directory path, (ignore rule, ...)),
# (.gitignore file directory path, (ignore rule, ...)),
# ...
# )
rules: tuple[tuple[Path, tuple[IgnoreRule, ...]], ...] = tuple()
def add_gitignore(self, gitignore: Path) -> "SideStepIgnoreMatcher":
"""returns a new SidestepIgnoreMatcher with rules from the given gitignore file"""
new_ruleset: list[IgnoreRule] = []
for line_no, line_text in enumerate(gitignore.read_text().splitlines()):
rule = rule_from_pattern(
pattern=line_text.rstrip("\n"),
base_path=Path(abspath(gitignore.parent)),
source=(gitignore, line_no),
)
if rule:
new_ruleset.append(rule)
return SideStepIgnoreMatcher(
root=self.root, rules=self.rules + ((gitignore.parent, tuple(new_ruleset)),)
)
def match(self, file: Path | str) -> bool:
"""returns True if the file is ignored by any of the rules in the gitignore files, False otherwise"""
matched = False
# check to see if the gitignore affects the file
for ignore_dir, ruleset in self.rules:
if str(ignore_dir) not in str(file):
continue
if not self._possibly_negated(ruleset):
matched = matched or any(r.match(file) for r in ruleset)
else:
                for rule in reversed(ruleset):
                    if rule.match(file):
                        # the last matching rule decides, per gitignore semantics
                        matched = matched or not rule.negation
                        break
return matched
def match_trytrytry(self, file: Path) -> Path | None:
"""
same as match, but also checks if the gitignore files ignore any parent directories;
horribly slow and dumb, thus the name 'trytrytry'
returns the ignored parent path if the file is ignored, None otherwise
"""
trytrytry: Path = file
while trytrytry != trytrytry.parent:
if self.match(trytrytry):
return trytrytry
if len(self.root.parts) == len(trytrytry.parts):
return None
trytrytry = trytrytry.parent
return None
@cache
def _possibly_negated(self, ruleset: tuple[IgnoreRule, ...]) -> bool:
return any(rule.negation for rule in ruleset)
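# usage sketch (illustrative): matchers are built up immutably, e.g.
#   sim = SideStepIgnoreMatcher(root=repo_root)
#   sim = sim.add_gitignore(repo_root / ".gitignore")
#   sim.match(repo_root / "build" / "big.bin")  # True if some rule ignores it
# note that deciding whether a .gitignore applies to a file is a plain substring
# test on the paths, a deliberate speed-over-precision trade-off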
@dataclass(eq=True, frozen=True)
class LargeFileFilterResult:
"""
result data structure of the large file filter
files: tuple[Path, ...]
large files found
matcher: SideStepIgnoreMatcher
the *ignore matcher instance
ignore_directories: tuple[Path, ...]
directories that were ignored
"""
files: tuple[Path, ...]
matcher: SideStepIgnoreMatcher
ignore_directories: tuple[Path, ...]
def _parallel() -> bool:
"""
helper function to determine if we should use multiprocessing;
checks the environment variable SIDESTEP_PARALLEL and the command line arguments
returns: bool
"""
if SOTA_SIDESTEP_PARALLEL:
return True
elif "--parallel" in argv:
return True
return False
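# i.e. either of the following enables multiprocessing (illustrative invocations):
#   SIDESTEP_PARALLEL=1 python sidestepper.py
#   python sidestepper.py --parallel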
def _iter_files(
target: Path,
pattern: str = "*",
) -> Generator[Path, None, None]:
"""
generator that yields files in the target directory excluding '.git/**'
args:
target: Path
the directory to search in
pattern: str = "*"
the file pattern to search for
yields: Path
file in the target directory
"""
repo_dir = target.joinpath(".git/")
for target_file in target.rglob(pattern):
if not target_file.is_file():
continue
if repo_dir in target_file.parents:
continue
yield target_file
def iter_files(target_dir: Path) -> tuple[tuple[Path, ...], SideStepIgnoreMatcher]:
"""
get all non-git files and register .gitignore files
args:
target_dir: Path
the directory to search in
returns: tuple[tuple[Path, ...], SideStepIgnoreMatcher]
tuple of all files in the target directory and a SideStepIgnoreMatcher instance
"""
all_files: list[Path] = []
sim = SideStepIgnoreMatcher(root=target_dir)
for file in tqdm(
_iter_files(target_dir),
desc="1 pre | finding large files - scanning (1/3)",
leave=False,
):
all_files.append(file)
if file.name == ".gitignore":
sim = sim.add_gitignore(file)
return tuple(all_files), sim
def _filter_sim_match(
os: OneSided[tuple[list[Path], SideStepIgnoreMatcher], Path],
) -> Path | None:
"""first filter pass function, thread-safe-ish"""
(ignore_dirs, sim), file = os.a, os.b
ignored = False
for ign_dir in ignore_dirs:
if str(ign_dir) in str(file):
ignored = True
break
if (not ignored) and ((ttt := sim.match_trytrytry(file)) is not None):
if ttt.is_dir() and ttt not in ignore_dirs:
ignore_dirs.append(ttt)
return None
return file
def _filter_ign_dirs_and_size(os: OneSided[list[Path], Path]) -> Path | None:
"""second filter pass function, thread-safe-ish"""
ignore_dirs, file = os.a, os.b
for ign_dir in ignore_dirs:
if str(ign_dir) in str(file):
return None
else:
# we're here because the file is not ignored by any of the rules
# (the 'else' clause is only executed if the for loop completes without breaking)
if file.stat().st_size > SOTA_SIDESTEP_LARGE_FILE_SIZE:
return file
return None
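# the two filter passes above form a pipeline: pass one (_filter_sim_match)
# drops files matched by the *ignore rules and, as a side effect, records every
# ignored directory it discovers; pass two (_filter_ign_dirs_and_size) re-checks
# the survivors against the complete directory list (catching files that slipped
# through before their parent directory was recorded) and then applies the
# SOTA_SIDESTEP_LARGE_FILE_SIZE threshold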
def _find_large_files_single(
files: tuple[Path, ...], sim: SideStepIgnoreMatcher
) -> LargeFileFilterResult:
"""single-process implementation of find_large_files"""
ignore_dirs: list[Path] = []
_files = []
for fsm_os in tqdm(
one_sided(a=(ignore_dirs, sim), bbb=files),
desc="1 pre | finding large files - iod-ttt file matching (2/3)",
leave=False,
total=len(files),
):
if f := _filter_sim_match(fsm_os):
_files.append(f)
large_files = []
for fds_os in tqdm(
one_sided(a=ignore_dirs, bbb=_files),
desc="1 pre | finding large files - dir rematching (3/3)",
leave=False,
total=len(_files),
):
f = _filter_ign_dirs_and_size(fds_os)
if f is not None:
large_files.append(f)
return LargeFileFilterResult(
files=tuple(large_files),
matcher=sim,
ignore_directories=tuple(ignore_dirs),
)
def _find_large_files_parallel(
files: tuple[Path, ...], sim: SideStepIgnoreMatcher
) -> LargeFileFilterResult:
"""multiprocess implementation of find_large_files"""
manager = Manager()
ignore_dirs: ListProxy[Path] = manager.list()
_files: list[Path] = [
f
for f in process_map(
_filter_sim_match,
one_sided(a=(ignore_dirs, sim), bbb=files),
desc="1 pre | finding large files - iod-ttt file matching (2/3)",
leave=False,
chunksize=SOTA_SIDESTEP_CHUNK_SIZE,
max_workers=SOTA_SIDESTEP_MAX_WORKERS,
total=len(files),
)
if f is not None
]
large_files: tuple[Path, ...] = tuple(
[
f
for f in process_map(
_filter_ign_dirs_and_size,
one_sided(a=ignore_dirs, bbb=_files),
desc="1 pre | finding large files - dir rematching (3/3)",
leave=False,
chunksize=SOTA_SIDESTEP_CHUNK_SIZE,
max_workers=SOTA_SIDESTEP_MAX_WORKERS,
            total=len(_files),
)
if f is not None
]
)
return LargeFileFilterResult(
files=large_files,
matcher=sim,
ignore_directories=tuple(ignore_dirs),
)
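# in the parallel path, ignore_dirs is a Manager().list() proxy, so appends made
# by _filter_sim_match inside worker processes are visible to the second pass;
# ordering across workers is not deterministic, which is fine here because the
# list is only ever appended to and scanned, never indexed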
def find_large_files(
files: tuple[Path, ...], matcher: SideStepIgnoreMatcher
) -> LargeFileFilterResult:
"""
finds all files larger than a certain size in a directory;
uses SOTA_SIDESTEP_LARGE_FILE_SIZE as the size threshold
args:
files: tuple[Path, ...]
list of files to search through
matcher: SideStepIgnoreMatcher
the ignore matcher instance from iter_files()
returns: LargeFileFilterResult
"""
if _parallel():
return _find_large_files_parallel(files, matcher)
else:
return _find_large_files_single(files, matcher)
def write_sotaignore(large_files: tuple[Path, ...]) -> bool:
"""
writes out a .sotaignore file with a list of large files,
updating an existing one if already present
args:
        large_files: tuple[Path, ...]
list of large files
returns: bool
True if anything was written, False otherwise (no changes)
"""
if not large_files:
return False
old_sotaignore = (
REPO_SOTAIGNORE.read_text().strip().splitlines()
if REPO_SOTAIGNORE.exists()
else []
)
new_sotaignore = [ln for ln in old_sotaignore] + [
lf.relative_to(REPO_DIR).as_posix()
for lf in large_files
if lf.relative_to(REPO_DIR).as_posix() not in old_sotaignore
]
if new_sotaignore == old_sotaignore:
return False
# check if the sotaignore file starts with a comment
if new_sotaignore and not new_sotaignore[0].startswith("#"):
for line in [
"# .sotaignore file generated by sota staircase ReStepper/SideStepper",
"# anything here either can't or shouldn't be uploaded github",
"# unless you know what you're doing, don't edit this file! >:(",
][::-1]:
new_sotaignore.insert(0, line)
REPO_SOTAIGNORE.touch(exist_ok=True)
REPO_SOTAIGNORE.write_text("\n".join(new_sotaignore) + "\n")
return True
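# a freshly generated .sotaignore might look like this (paths are illustrative):
#   # .sotaignore file generated by sota staircase ReStepper/SideStepper
#   # anything here either can't or shouldn't be uploaded to github
#   # unless you know what you're doing, don't edit this file! >:(
#   assets/video/intro.mp4
#   data/model.bin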
def main() -> None:
"""command-line entry function"""
print(
"\nsota staircase SideStepper",
f" repo root : {REPO_DIR.relative_to(Path.cwd())}",
(
f" .sotaignore : {REPO_SOTAIGNORE.relative_to(Path.cwd())} "
f"({'exists' if REPO_SOTAIGNORE.exists() else 'does not exist'})"
),
f" parallel? : {'yes' if _parallel() else 'no'}\n",
sep="\n",
file=stderr,
)
cumulative_start_time = time()
print(f"1/3{INDENT}pre-scanning repository... ", end="", file=stderr)
start_time = time()
files, sim = iter_files(REPO_DIR)
end_time = time()
print(
f"1/3{INDENT}pre-scanning repository... "
f"done in {generate_time_elapsed_string(end_time - start_time)} "
f"(found {len(files)})",
file=stderr,
)
print(f"2/3{INDENT}finding large files... ", end="", file=stderr)
start_time = time()
large_files = find_large_files(files, sim).files
end_time = time()
print(
f"2/3{INDENT}finding large files... "
f"done in {generate_time_elapsed_string(end_time - start_time)} "
f"(found {len(large_files)})",
file=stderr,
)
print(f"3/3{INDENT}writing .sotaignore file... ", end="", file=stderr)
start_time = time()
was_written = write_sotaignore(large_files)
end_time = time()
print(
("done" if was_written else "skipped")
+ f" in {generate_time_elapsed_string(end_time - start_time)}\n",
file=stderr,
)
for file in large_files:
print(file.relative_to(REPO_DIR))
cumulative_end_time = time()
print(
f"\n--- done! took {generate_time_elapsed_string(cumulative_end_time - cumulative_start_time)}~ "
"☆*: .。. o(≧▽≦)o .。.:*☆ ---",
flush=True,
file=stderr,
)
if __name__ == "__main__":
main()