diff --git a/.github/workflows/sync.yml b/.github/workflows/sync.yml
new file mode 100644
index 0000000..4f98013
--- /dev/null
+++ b/.github/workflows/sync.yml
@@ -0,0 +1,39 @@
+name: "sync from forge.joshwel.co"
+on:
+  workflow_dispatch:
+  schedule:
+    - cron: "0 * * * *" # every hour
+permissions:
+  contents: write
+
+jobs:
+  sync:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: AdityaGarg8/remove-unwanted-software@v4.1
+        with:
+          remove-dotnet: 'true'
+          remove-android: 'true'
+          remove-haskell: 'true'
+          remove-codeql: 'true'
+          remove-docker-images: 'true'
+          remove-large-packages: 'true'
+          remove-cached-tools: 'true'
+
+      - uses: cachix/install-nix-action@v27
+        with:
+          nix_path: nixpkgs=channel:nixos-unstable
+          github_access_token: ${{ secrets.GITHUB_TOKEN }}
+      - uses: DeterminateSystems/magic-nix-cache-action@main
+
+      - name: clone forge.joshwel.co/Ryan/SSLR
+        run: |
+          mkdir -p ${{ runner.temp }}/SSLR
+          git clone https://forge.joshwel.co/Ryan/SSLR.git ${{ runner.temp }}/SSLR
+
+      - name: restep
+        env:
+          SS_RESTEPPER_TOKEN: ${{ secrets.PAT }}
+        run: |
+          cd ${{ runner.temp }}/SSLR
+          nix develop --command python sync.py
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..c18dd8d
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+__pycache__/
diff --git a/flake.lock b/flake.lock
new file mode 100644
index 0000000..240098e
--- /dev/null
+++ b/flake.lock
@@ -0,0 +1,61 @@
+{
+  "nodes": {
+    "flake-utils": {
+      "inputs": {
+        "systems": "systems"
+      },
+      "locked": {
+        "lastModified": 1710146030,
+        "narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
+        "owner": "numtide",
+        "repo": "flake-utils",
+        "rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
+        "type": "github"
+      },
+      "original": {
+        "owner": "numtide",
+        "repo": "flake-utils",
+        "type": "github"
+      }
+    },
+    "nixpkgs": {
+      "locked": {
+        "lastModified": 1722062969,
+        "narHash": "sha256-QOS0ykELUmPbrrUGmegAUlpmUFznDQeR4q7rFhl8eQg=",
+        "owner": "NixOS",
+        "repo": "nixpkgs",
+        "rev": "b73c2221a46c13557b1b3be9c2070cc42cf01eb3",
+        "type": "github"
+      },
+      "original": {
+        "owner": "NixOS",
+        "ref": "nixos-unstable",
+        "repo": "nixpkgs",
+        "type": "github"
+      }
+    },
+    "root": {
+      "inputs": {
+        "flake-utils": "flake-utils",
+        "nixpkgs": "nixpkgs"
+      }
+    },
+    "systems": {
+      "locked": {
+        "lastModified": 1681028828,
+        "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
+        "owner": "nix-systems",
+        "repo": "default",
+        "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
+        "type": "github"
+      },
+      "original": {
+        "owner": "nix-systems",
+        "repo": "default",
+        "type": "github"
+      }
+    }
+  },
+  "root": "root",
+  "version": 7
+}
diff --git a/flake.nix b/flake.nix
new file mode 100644
index 0000000..fdc37f2
--- /dev/null
+++ b/flake.nix
@@ -0,0 +1,28 @@
+{
+  description = "flake for running the sota staircase ReStepper";
+
+  inputs = {
+    nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
+    flake-utils.url = "github:numtide/flake-utils";
+  };
+
+  outputs = { self, nixpkgs, flake-utils }:
+    flake-utils.lib.eachDefaultSystem (system:
+      let
+        pkgs = nixpkgs.legacyPackages.${system};
+      in
+      with pkgs; {
+        devShells.default = mkShellNoCC {
+          buildInputs = [
+            git
+            git-lfs
+            git-filter-repo
+            (python312.withPackages (python-pkgs: [
+              python-pkgs.tqdm
+              python-pkgs.gitignore-parser
+            ]))
+          ];
+        };
+      }
+    );
+}
diff --git a/sidestepper.py b/sidestepper.py
new file mode 100644
index 0000000..db26b85
--- /dev/null
+++ b/sidestepper.py
@@ -0,0 +1,616 @@
+# sota staircase SideStepper
+# a somewhat fast .gitignore-respecting large file finder
+# licence: 0BSD
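+#
+# usage (illustrative; from the repo root, sidestepper is also imported by sync.py):
+#
+#     python sidestepper.py             writes/updates .sotaignore
+#     python sidestepper.py --parallel  same, but multiprocessed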
+
+from dataclasses import dataclass
+from functools import cache
+from multiprocessing import Manager, cpu_count
+
+# noinspection PyProtectedMember
+from multiprocessing.managers import ListProxy
+from os import getenv
+from os.path import abspath
+from pathlib import Path
+from subprocess import CompletedProcess
+from subprocess import run as _run
+from sys import argv, executable, stderr
+from textwrap import indent
+from time import time
+from traceback import format_tb
+from typing import Final, Generator, Generic, Iterable, Iterator, NamedTuple, TypeVar
+
+# constants
+INDENT = "    "
+REPO_DIR: Final[Path] = Path(__file__).parent
+REPO_SOTAIGNORE: Final[Path] = REPO_DIR.joinpath(".sotaignore")
+
+_SOTA_SIDESTEP_CHUNK_SIZE = getenv("SIDESTEP_CHUNK_SIZE")
+SOTA_SIDESTEP_CHUNK_SIZE: Final[int] = (
+    int(_SOTA_SIDESTEP_CHUNK_SIZE)
+    if (
+        (_SOTA_SIDESTEP_CHUNK_SIZE is not None)
+        and (_SOTA_SIDESTEP_CHUNK_SIZE.isdigit())
+    )
+    else 16
+)
+
+_SOTA_SIDESTEP_MAX_WORKERS = getenv("SIDESTEP_MAX_WORKERS")
+SOTA_SIDESTEP_MAX_WORKERS: Final[int] = (
+    int(_SOTA_SIDESTEP_MAX_WORKERS)
+    if (
+        (_SOTA_SIDESTEP_MAX_WORKERS is not None)
+        and (_SOTA_SIDESTEP_MAX_WORKERS.isdigit())
+    )
+    else cpu_count()
+)
+
+SOTA_SIDESTEP_LARGE_FILE_SIZE: Final[int] = 100_000_000  # 100 MB
+SOTA_SIDESTEP_PARALLEL: Final[bool] = getenv("SIDESTEP_PARALLEL") is not None
+
+
+# define these before importing third-party modules because we use them in the import check
+def generate_command_failure_message(cp: CompletedProcess) -> str:
+    return "\n".join(
+        [
+            f"\n\nfailure: command '{cp.args}' failed with exit code {cp.returncode}",
+            f"{INDENT}stdout:",
+            (
+                indent(text=cp.stdout.decode(), prefix=f"{INDENT}{INDENT}")
+                if (isinstance(cp.stdout, bytes) and (cp.stdout != b""))
+                else f"{INDENT}{INDENT}(no output)"
+            ),
+            f"{INDENT}stderr:",
+            (
+                indent(text=cp.stderr.decode(), prefix=f"{INDENT}{INDENT}")
+                if (isinstance(cp.stderr, bytes) and (cp.stderr != b""))
+                else f"{INDENT}{INDENT}(no output)"
+            )
+            + "\n",
+        ]
+    )
+
+
+def run(
+    command: str | list,
+    cwd: Path | str | None = None,
+    capture_output: bool = True,
+    give_input: str | None = None,
+) -> CompletedProcess:
+    """
+    exception-safe-ish wrapper around subprocess.run()
+
+    args:
+        command: str | list
+            the command to run
+        cwd: Path | str | None = None
+            the working directory
+        capture_output: bool = True
+            whether to capture the output
+        give_input: str | None = None
+            text to pipe to the command's stdin, if any
+
+    returns: CompletedProcess
+        the return object from subprocess.run()
+    """
+
+    # noinspection PyBroadException
+    try:
+        cp = _run(
+            command,
+            shell=False if isinstance(command, list) else True,
+            cwd=cwd,
+            capture_output=capture_output,
+            input=give_input.encode() if give_input else None,
+        )
+    except Exception as run_exc:
+        print(
+            f"\n\nfailure: command '{command}' failed with exception",
+            f"{INDENT}{run_exc.__class__.__name__}: {run_exc}",
+            indent(text="\n".join(format_tb(run_exc.__traceback__)), prefix=INDENT),
+            sep="\n",
+        )
+        exit(-1)
+    return cp
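+
+
+# usage sketch (illustrative): run() prints and exits on unexpected exceptions
+# instead of raising, so callers just check the return code:
+#
+#     cp = run(["git", "status", "--porcelain"], cwd=REPO_DIR)
+#     if cp.returncode != 0:
+#         print(generate_command_failure_message(cp))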
+
+
+# attempt to import third-party modules
+# if they're not installed, prompt the user to optionally install them automatically
+_could_not_import: list[str] = []
+_could_not_import_exc: Exception | None = None
+
+try:
+    from gitignore_parser import IgnoreRule, rule_from_pattern  # type: ignore
+except ImportError as _import_exc:
+    _could_not_import.append("gitignore_parser")
+    _could_not_import_exc = _import_exc
+
+try:
+    # noinspection PyUnresolvedReferences
+    from tqdm import tqdm
+
+    # noinspection PyUnresolvedReferences
+    from tqdm.contrib.concurrent import process_map
+except ImportError as _import_exc:
+    _could_not_import.append("tqdm")
+    _could_not_import_exc = _import_exc
+
+if _could_not_import:
+    for module in _could_not_import:
+        print(
+            f"critical error: '{module}' is not installed, "
+            f"please run 'pip install {module}' to install it",
+        )
+
+    # install the missing modules
+    if input("\ninstall these with pip? y/n: ").lower() == "y":
+        print("installing...", end="", flush=True)
+        _cp = run([executable, "-m", "pip", "install", *_could_not_import])
+        if _cp.returncode != 0:
+            print(generate_command_failure_message(_cp))
+            exit(-1)
+        print(" done", flush=True)
+
+        # check if they were installed successfully
+        _cp = run(
+            [
+                executable,
+                "-c",
+                ";".join([f"import {module}" for module in _could_not_import]),
+            ]
+        )
+        if _cp.returncode != 0:
+            print(generate_command_failure_message(_cp))
+
+            print(
+                "critical error: post-install check failed. reverting installation...",
+                end="",
+                flush=True,
+            )
+            _cp = run([executable, "-m", "pip", "uninstall", *_could_not_import, "-y"])
+            if _cp.returncode != 0:
+                print(generate_command_failure_message(_cp))
+            print(" done", flush=True)
+
+            exit(-1)
+
+        elif __name__ == "__main__":
+            # rerun the script if we're running as one
+            exit(
+                run(
+                    [executable, Path(__file__).absolute(), *argv[1:]],
+                    capture_output=False,
+                ).returncode
+            )
+
+        else:
+            # we're being imported, raise an error
+            raise EnvironmentError(
+                "automatic dependency installation successful"
+            ) from _could_not_import_exc
+
+A = TypeVar("A")
+B = TypeVar("B")
+
+
+class OneSided(Generic[A, B], NamedTuple):
+    """
+    generic tuple with two elements, a and b, given by a generator
+    in which element 'a' is a constant and 'b' is from an iterable/iterator
+    """
+
+    a: A
+    b: B
+
+
+def one_sided(a: A, bbb: Iterable[B]) -> Iterator[OneSided[A, B]]:
+    """
+    generator that yields OneSided instances with a constant 'a' element
+    and elements from the given iterable/iterator 'bbb' as the 'b' element
+    """
+    for b in bbb:
+        yield OneSided(a, b)
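+
+
+# usage sketch (illustrative): one_sided() pairs a constant with every element,
+# which lets map-style APIs (like process_map below) receive shared arguments
+# alongside each work item:
+#
+#     for pair in one_sided(0, "ab"):
+#         print(pair.a, pair.b)  # -> '0 a', then '0 b'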
+
+
+def generate_time_elapsed_string(time_taken: float) -> str:
+    """generates a human-readable time-elapsed string from a time taken float"""
+    hours = int(time_taken // 3600)
+    minutes = int(time_taken % 3600 // 60)
+    seconds = int(time_taken % 60)
+
+    time_taken_string: str
+
+    if time_taken > 3600:
+        time_taken_string = f"{hours}h {minutes}′ {seconds}″"
+    elif time_taken > 60:
+        time_taken_string = f"{minutes}′ {seconds}″"
+    else:
+        time_taken_string = f"{time_taken:.2f}″"
+
+    return time_taken_string
+
+
+@dataclass(eq=True, frozen=True)
+class SideStepIgnoreMatcher:
+    """immutable gitignore matcher"""
+
+    root: Path
+    # (
+    #     (.gitignore file directory path, (ignore rule, ...)),
+    #     (.gitignore file directory path, (ignore rule, ...)),
+    #     ...
+    # )
+    rules: tuple[tuple[Path, tuple[IgnoreRule, ...]], ...] = tuple()
+
+    def add_gitignore(self, gitignore: Path) -> "SideStepIgnoreMatcher":
+        """returns a new SideStepIgnoreMatcher with rules from the given gitignore file"""
+
+        new_ruleset: list[IgnoreRule] = []
+        for line_no, line_text in enumerate(gitignore.read_text().splitlines()):
+            rule = rule_from_pattern(
+                pattern=line_text.rstrip("\n"),
+                base_path=Path(abspath(gitignore.parent)),
+                source=(gitignore, line_no),
+            )
+            if rule:
+                new_ruleset.append(rule)
+
+        return SideStepIgnoreMatcher(
+            root=self.root, rules=self.rules + ((gitignore.parent, tuple(new_ruleset)),)
+        )
+
+    def match(self, file: Path | str) -> bool:
+        """returns True if the file is ignored by any of the rules in the gitignore files, False otherwise"""
+        matched = False
+
+        # check to see if the gitignore affects the file
+        for ignore_dir, ruleset in self.rules:
+            if str(ignore_dir) not in str(file):
+                continue
+            if not self._possibly_negated(ruleset):
+                matched = matched or any(r.match(file) for r in ruleset)
+            else:
+                for rule in reversed(ruleset):
+                    if rule.match(file):
+                        matched = matched or not rule.negation
+        return matched
+
+    def match_trytrytry(self, file: Path) -> Path | None:
+        """
+        same as match, but also checks if the gitignore files ignore any parent directories;
+        horribly slow and dumb, thus the name 'trytrytry'
+
+        returns the ignored parent path if the file is ignored, None otherwise
+        """
+
+        trytrytry: Path = file
+        while trytrytry != trytrytry.parent:
+            if self.match(trytrytry):
+                return trytrytry
+            if len(self.root.parts) == len(trytrytry.parts):
+                return None
+            trytrytry = trytrytry.parent
+        return None
+
+    @cache
+    def _possibly_negated(self, ruleset: tuple[IgnoreRule, ...]) -> bool:
+        return any(rule.negation for rule in ruleset)
+
+
+@dataclass(eq=True, frozen=True)
+class LargeFileFilterResult:
+    """
+    result data structure of the large file filter
+
+    files: tuple[Path, ...]
+        large files found
+    matcher: SideStepIgnoreMatcher
+        the *ignore matcher instance
+    ignore_directories: tuple[Path, ...]
+        directories that were ignored
+    """
+
+    files: tuple[Path, ...]
+    matcher: SideStepIgnoreMatcher
+    ignore_directories: tuple[Path, ...]
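+
+
+# usage sketch (illustrative): the matcher is immutable, so each add_gitignore()
+# call returns a new instance, which is then queried per path:
+#
+#     sim = SideStepIgnoreMatcher(root=REPO_DIR)
+#     sim = sim.add_gitignore(REPO_DIR.joinpath(".gitignore"))
+#     sim.match(REPO_DIR.joinpath("__pycache__/mod.pyc"))  # True if a rule matches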
+
+
+def _parallel() -> bool:
+    """
+    helper function to determine if we should use multiprocessing;
+    checks the environment variable SIDESTEP_PARALLEL and the command line arguments
+
+    returns: bool
+    """
+    if SOTA_SIDESTEP_PARALLEL:
+        return True
+    elif "--parallel" in argv:
+        return True
+    return False
+
+
+def _iter_files(
+    target: Path,
+    pattern: str = "*",
+) -> Generator[Path, None, None]:
+    """
+    generator that yields files in the target directory excluding '.git/**'
+
+    args:
+        target: Path
+            the directory to search in
+        pattern: str = "*"
+            the file pattern to search for
+
+    yields: Path
+        file in the target directory
+    """
+    repo_dir = target.joinpath(".git/")
+    for target_file in target.rglob(pattern):
+        if not target_file.is_file():
+            continue
+        if repo_dir in target_file.parents:
+            continue
+        yield target_file
+
+
+def iter_files(target_dir: Path) -> tuple[tuple[Path, ...], SideStepIgnoreMatcher]:
+    """
+    get all non-git files and register .gitignore files
+
+    args:
+        target_dir: Path
+            the directory to search in
+
+    returns: tuple[tuple[Path, ...], SideStepIgnoreMatcher]
+        tuple of all files in the target directory and a SideStepIgnoreMatcher instance
+    """
+
+    all_files: list[Path] = []
+    sim = SideStepIgnoreMatcher(root=target_dir)
+
+    for file in tqdm(
+        _iter_files(target_dir),
+        desc="1 pre | finding large files - scanning (1/3)",
+        leave=False,
+    ):
+        all_files.append(file)
+        if file.name == ".gitignore":
+            sim = sim.add_gitignore(file)
+
+    return tuple(all_files), sim
+
+
+def _filter_sim_match(
+    os: OneSided[tuple[list[Path], SideStepIgnoreMatcher], Path],
+) -> Path | None:
+    """first filter pass function, thread-safe-ish"""
+    (ignore_dirs, sim), file = os.a, os.b
+
+    ignored = False
+    for ign_dir in ignore_dirs:
+        if str(ign_dir) in str(file):
+            ignored = True
+            break
+
+    if (not ignored) and ((ttt := sim.match_trytrytry(file)) is not None):
+        if ttt.is_dir() and ttt not in ignore_dirs:
+            ignore_dirs.append(ttt)
+        return None
+    return file
+
+
+def _filter_ign_dirs_and_size(os: OneSided[list[Path], Path]) -> Path | None:
+    """second filter pass function, thread-safe-ish"""
+    ignore_dirs, file = os.a, os.b
+
+    for ign_dir in ignore_dirs:
+        if str(ign_dir) in str(file):
+            return None
+    else:
+        # we're here because the file is not ignored by any of the rules
+        # (the 'else' clause is only executed if the for loop completes without breaking)
+        if file.stat().st_size > SOTA_SIDESTEP_LARGE_FILE_SIZE:
+            return file
+    return None
+
+
+def _find_large_files_single(
+    files: tuple[Path, ...], sim: SideStepIgnoreMatcher
+) -> LargeFileFilterResult:
+    """single-process implementation of find_large_files"""
+    ignore_dirs: list[Path] = []
+
+    _files = []
+    for fsm_os in tqdm(
+        one_sided(a=(ignore_dirs, sim), bbb=files),
+        desc="1 pre | finding large files - iod-ttt file matching (2/3)",
+        leave=False,
+        total=len(files),
+    ):
+        if f := _filter_sim_match(fsm_os):
+            _files.append(f)
+
+    large_files = []
+    for fds_os in tqdm(
+        one_sided(a=ignore_dirs, bbb=_files),
+        desc="1 pre | finding large files - dir rematching (3/3)",
+        leave=False,
+        total=len(_files),
+    ):
+        f = _filter_ign_dirs_and_size(fds_os)
+        if f is not None:
+            large_files.append(f)
+
+    return LargeFileFilterResult(
+        files=tuple(large_files),
+        matcher=sim,
+        ignore_directories=tuple(ignore_dirs),
+    )
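+
+
+# note: the parallel variant below shares ignore_dirs across worker processes
+# through a multiprocessing.Manager() list proxy; a plain list would be copied
+# into each worker, and appends would be invisible to the other workers.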
+
+
+def _find_large_files_parallel(
+    files: tuple[Path, ...], sim: SideStepIgnoreMatcher
+) -> LargeFileFilterResult:
+    """multiprocess implementation of find_large_files"""
+    manager = Manager()
+    ignore_dirs: ListProxy[Path] = manager.list()
+
+    _files: list[Path] = [
+        f
+        for f in process_map(
+            _filter_sim_match,
+            one_sided(a=(ignore_dirs, sim), bbb=files),
+            desc="1 pre | finding large files - iod-ttt file matching (2/3)",
+            leave=False,
+            chunksize=SOTA_SIDESTEP_CHUNK_SIZE,
+            max_workers=SOTA_SIDESTEP_MAX_WORKERS,
+            total=len(files),
+        )
+        if f is not None
+    ]
+
+    large_files: tuple[Path, ...] = tuple(
+        [
+            f
+            for f in process_map(
+                _filter_ign_dirs_and_size,
+                one_sided(a=ignore_dirs, bbb=_files),
+                desc="1 pre | finding large files - dir rematching (3/3)",
+                leave=False,
+                chunksize=SOTA_SIDESTEP_CHUNK_SIZE,
+                max_workers=SOTA_SIDESTEP_MAX_WORKERS,
+                total=len(_files),
+            )
+            if f is not None
+        ]
+    )
+
+    return LargeFileFilterResult(
+        files=large_files,
+        matcher=sim,
+        ignore_directories=tuple(ignore_dirs),
+    )
+
+
+def find_large_files(
+    files: tuple[Path, ...], matcher: SideStepIgnoreMatcher
+) -> LargeFileFilterResult:
+    """
+    finds all files larger than a certain size in a directory;
+    uses SOTA_SIDESTEP_LARGE_FILE_SIZE as the size threshold
+
+    args:
+        files: tuple[Path, ...]
+            list of files to search through
+        matcher: SideStepIgnoreMatcher
+            the ignore matcher instance from iter_files()
+
+    returns: LargeFileFilterResult
+    """
+    if _parallel():
+        return _find_large_files_parallel(files, matcher)
+    else:
+        return _find_large_files_single(files, matcher)
+
+
+def write_sotaignore(large_files: tuple[Path, ...]) -> bool:
+    """
+    writes out a .sotaignore file with a list of large files,
+    updating an existing one if already present
+
+    args:
+        large_files: tuple[Path, ...]
+            list of large files
+
+    returns: bool
+        True if anything was written, False otherwise (no changes)
+    """
+    if not large_files:
+        return False
+
+    old_sotaignore = (
+        REPO_SOTAIGNORE.read_text().strip().splitlines()
+        if REPO_SOTAIGNORE.exists()
+        else []
+    )
+
+    new_sotaignore = [ln for ln in old_sotaignore] + [
+        lf.relative_to(REPO_DIR).as_posix()
+        for lf in large_files
+        if lf.relative_to(REPO_DIR).as_posix() not in old_sotaignore
+    ]
+
+    if new_sotaignore == old_sotaignore:
+        return False
+
+    # check if the sotaignore file starts with a comment
+    if new_sotaignore and not new_sotaignore[0].startswith("#"):
+        for line in [
+            "# .sotaignore file generated by sota staircase ReStepper/SideStepper",
+            "# anything here either can't or shouldn't be uploaded to github",
+            "# unless you know what you're doing, don't edit this file! >:(",
+        ][::-1]:
+            new_sotaignore.insert(0, line)
+
+    REPO_SOTAIGNORE.touch(exist_ok=True)
+    REPO_SOTAIGNORE.write_text("\n".join(new_sotaignore) + "\n")
+    return True
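+
+
+# illustrative .sotaignore produced above (paths are repo-relative, posix-style;
+# the file entry shown is hypothetical):
+#
+#     # .sotaignore file generated by sota staircase ReStepper/SideStepper
+#     # anything here either can't or shouldn't be uploaded to github
+#     # unless you know what you're doing, don't edit this file! >:(
+#     Assets/Recordings/flythrough.mp4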
", end="", file=stderr) + start_time = time() + large_files = find_large_files(files, sim).files + end_time = time() + print( + f"2/3{INDENT}finding large files... " + f"done in {generate_time_elapsed_string(end_time - start_time)} " + f"(found {len(large_files)})", + file=stderr, + ) + + print(f"3/3{INDENT}writing .sotaignore file... ", end="", file=stderr) + start_time = time() + was_written = write_sotaignore(large_files) + end_time = time() + print( + ("done" if was_written else "skipped") + + f" in {generate_time_elapsed_string(end_time - start_time)}\n", + file=stderr, + ) + + for file in large_files: + print(file.relative_to(REPO_DIR).as_posix()) + + cumulative_end_time = time() + print( + f"\n--- done! took {generate_time_elapsed_string(cumulative_end_time - cumulative_start_time)}~ " + "☆*: .。. o(≧▽≦)o .。.:*☆ ---", + flush=True, + file=stderr, + ) + + +if __name__ == "__main__": + main() diff --git a/sync.py b/sync.py new file mode 100644 index 0000000..1f219c8 --- /dev/null +++ b/sync.py @@ -0,0 +1,644 @@ +# sota staircase ReStepper +# forge -> github one-way repo sync script +# licence: 0BSD +from multiprocessing.pool import ThreadPool +from os import getenv +from pathlib import Path +from pprint import pformat +from shutil import copy2, copytree +from subprocess import CompletedProcess +from subprocess import run as _run +from sys import argv, executable +from tempfile import TemporaryDirectory +from textwrap import indent +from time import time +from traceback import format_tb +from typing import Callable, Final, TypeVar + +try: + from sidestepper import ( + SOTA_SIDESTEP_MAX_WORKERS, + LargeFileFilterResult, + find_large_files, + generate_command_failure_message, + generate_time_elapsed_string, + iter_files, + run, + write_sotaignore, + ) +except EnvironmentError: + # specific error raised when third-party modules not found, but were automatically + # installed, so we need to restart the script + exit(_run([executable, Path(__file__).absolute(), *argv[1:]]).returncode) + +# we can only guarantee third-party modules are installed after sidestepper +from tqdm import tqdm + +# constants +INDENT: Final[str] = " " +REPO_DIR: Final[Path] = Path(__file__).parent +REPO_SOTAIGNORE: Final[Path] = REPO_DIR.joinpath(".sotaignore") +REPO_URL_GITHUB: Final[str] = "github.com/Sc0rch-thinks/sslr" +REPO_URL_FORGE: Final[str] = "forge.joshwel.co/Ryan/SSLR" +COMMIT_MESSAGE: Final[str] = "chore(restep): sync with forge" +COMMIT_AUTHOR: Final[str] = "sota staircase ReStepper " +NEUTERED_GITATTRIBUTES: Final[str] = ( + """# auto detect text files and perform lf normalization\n* text=auto\n""" +) +GH_ACT: Final[bool] = getenv("GITHUB_ACTIONS", "").lower() == "true" +GH_TOKEN: Final[str] = getenv("SS_RESTEPPER_TOKEN", "") +if GH_ACT and GH_TOKEN == "": + print( + "critical error: no personal access token found in SS_RESTEP_TOKEN, " + "may not have permission to push to github" + ) + exit(1) + +# dictionary to share state across steps +r: dict[str, str] = {} + +R = TypeVar("R") + + +class CopyHighway: + """ + multithreaded file copying class that gives a copy2-like function + for use with shutil.copytree(); also displays a progress bar + """ + + pool: ThreadPool + pbar: tqdm + lff_result: LargeFileFilterResult | None + respect_ignore: bool = True + + def __init__( + self, message: str, total: int, lff_result: LargeFileFilterResult | None + ): + """ + multithreaded file copying class that gives a copy2-like function + for use with shutil.copytree() + + args: + message: str + message to display in the 
+
+
+def _default_post_func(cp: R) -> R:
+    """
+    default post-call function for steps; does nothing
+
+    for steps that return a CompletedProcess, this function will run the
+    `_command_post_func` function
+
+    args:
+        cp: R
+            return object from a step function
+
+    returns: R
+        the return object from the step function
+    """
+    if isinstance(cp, CompletedProcess):
+        _command_post_func(cp)
+    return cp
+
+
+def _command_post_func(
+    cp: CompletedProcess,
+    fail_on_error: bool = True,
+    quit_early: bool = False,
+    quit_message: str = "the command gave unexpected output",
+) -> CompletedProcess:
+    """
+    default post-call function for command steps; checks if the command was
+    successful and prints the output if it wasn't
+
+    if the command was successful, the stdout and stderr are stored in the
+    shared state dictionary r under 'stdout' and 'stderr' respectively
+
+    args:
+        cp: CompletedProcess
+            return object from subprocess.run()
+        fail_on_error: bool
+            whether to fail on error
+        quit_early: bool
+            whether to quit early
+        quit_message: str
+            the message to print if quitting early
+
+    returns: CompletedProcess
+        the return object from subprocess.run()
+    """
+
+    if quit_early:
+        print(f"\n\nfailure: {quit_message}\n")
+
+    else:
+        r["stdout"] = cp.stdout.decode() if isinstance(cp.stdout, bytes) else "\0"
+        r["stderr"] = cp.stderr.decode() if isinstance(cp.stderr, bytes) else "\0"
+        r["blank/stdout"] = "yes" if (r["stdout"].strip() == "") else ""
+        r["blank/stderr"] = "yes" if (r["stderr"].strip() == "") else ""
+        r["blank"] = "yes" if (r["blank/stdout"] and r["blank/stderr"]) else ""
+        r["errored"] = "" if (cp.returncode == 0) else str(cp.returncode)
+
+        # return if the command was successful
+        # or if we're not failing on error
+        if (cp.returncode == 0) or (not fail_on_error):
+            return cp
+        else:
+            print(generate_command_failure_message(cp))
+
+    exit(
+        cp.returncode if (isinstance(cp.returncode, int) and cp.returncode != 0) else 1
+    )
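+
+
+# after a command step, _command_post_func leaves these in r (illustrative):
+#
+#     r["stdout"], r["stderr"]   decoded output, or "\0" when not captured
+#     r["blank"]                 "yes" when both streams were empty, else ""
+#     r["errored"]               "" on success, else the return code as a string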
+
+
+def post_filter_repo_check(cp: CompletedProcess) -> CompletedProcess:
+    """
+    post-call function for checking if git-filter-repo is installed
+    and optionally installing it if it isn't
+    """
+
+    if cp.returncode == 0:
+        return cp
+
+    if input("git filter-repo is not installed, install it? y/n: ").lower() != "y":
+        print(
+            "install it using 'pip install git-filter-repo' "
+            "or 'pipx install git-filter-repo'",
+        )
+        return cp
+
+    # check if pipx is installed
+    use_pipx = False
+
+    check_pipx_cp = run(["pipx", "--version"])
+    if check_pipx_cp.returncode == 0:
+        use_pipx = True
+
+    # install git-filter-repo
+    pip_invocation: list[str] = ["pipx"] if use_pipx else [executable, "-m", "pip"]
+    print(
+        f"running '{' '.join([*pip_invocation, 'install', 'git-filter-repo'])}'... ",
+        end="",
+        flush=True,
+    )
+    install_rc = run([*pip_invocation, "install", "git-filter-repo"])
+    if install_rc.returncode != 0:
+        print("error")
+        _command_post_func(install_rc)
+        exit(install_rc.returncode)
+    else:
+        print("done\n")
+
+    # check if it is reachable
+    if run(["git", "filter-repo", "--version"]).returncode != 0:
+        # revert
+        run([*pip_invocation, "uninstall", "git-filter-repo"])
+        print(
+            "failure: could not install git-filter-repo automatically. "
+            "do it yourself o(*≧▽≦)ツ┏━┓"
+        )
+        exit(-1)
+
+    return cp
+
+
+def rewrite_gitattributes(target_dir: Path) -> None:
+    """
+    rewrite the .gitattributes files in a directory to disable git-lfs
+
+    args:
+        target_dir: Path
+            the directory to search
+    """
+
+    # recursively search for .gitattributes files
+    for repo_file in target_dir.rglob(".gitattributes"):
+        repo_file.write_text(NEUTERED_GITATTRIBUTES, encoding="utf-8")
+
+
+def step(
+    func: Callable[[], R],
+    desc: str = "",
+    post_func: Callable[[R], R] = _default_post_func,
+    post_print: bool = True,
+) -> R:
+    """
+    helper function for running steps
+
+    args:
+        func: Callable[[], R]
+            function to run
+        desc: str
+            description of the step
+        post_func: Callable[[R], R]
+            post-function to run after func
+        post_print: bool
+            whether to print done after the step
+
+    returns: R
+        return object from func
+    """
+
+    # run the function
+    if desc != "":
+        print(f"{desc}..", end="", flush=True)
+
+    start_time = time()
+
+    try:
+        cp = func()
+
+    except Exception as exc:
+        print(
+            f"\n\nfailure running step: {exc} ({exc.__class__.__name__})",
+            "\n".join(format_tb(exc.__traceback__)) + "\n",
+            sep="\n",
+        )
+        exit(1)
+
+    if desc != "":
+        print(".", end="", flush=True)
+
+    # run the post-function
+    try:
+        rp = post_func(cp)
+
+    except Exception as exc:
+        print(
+            f"\n\nfailure running post-step: {exc} ({exc.__class__.__name__})",
+            "\n".join(format_tb(exc.__traceback__)) + "\n",
+            sep="\n",
+        )
+        exit(1)
+
+    end_time = time()
+
+    # yay
+    if desc != "" and post_print:
+        print(
+            f" done in {generate_time_elapsed_string(end_time - start_time)}",
+            flush=True,
+        )
+
+    return rp
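+
+
+# usage sketch (illustrative; cmd() is the closure defined inside main() below):
+#
+#     step(desc="6 lfs | uninstall lfs in repo", func=cmd("git lfs uninstall"))
+#
+# which prints '6 lfs | uninstall lfs in repo... done in 0.10″' and records the
+# command's output in r via _default_post_func/_command_post_func.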
+
+
+def post_remote_v(cp: CompletedProcess) -> CompletedProcess:
+    """
+    post-call function for 'git remote -v' command, parses the output and
+    checks for the forge and github remotes, storing them in the shared state
+    under 'remote/forge', 'remote/forge/url', 'remote/github', and
+    'remote/github/url' respectively
+    """
+
+    if not isinstance(cp.stdout, bytes):
+        return _command_post_func(cp)
+
+    for line in cp.stdout.decode().split("\n"):
+        # github  https://github.com/markjoshwel/sota (fetch)
+        # github  https://github.com/markjoshwel/sota (push)
+        # origin  https://forge.joshwel.co/mark/sota.git (fetch)
+        # origin  https://forge.joshwel.co/mark/sota.git (push)
+
+        split_line = line.split(maxsplit=1)
+        if len(split_line) < 2:
+            continue
+
+        # remote='origin' url='https://forge.joshwel.co/mark/sota.git (fetch)'
+        remote, url = split_line
+
+        # clean up the url
+        if (REPO_URL_FORGE in url) or (REPO_URL_GITHUB in url):
+            # url='https://forge.joshwel.co/mark/sota.git'
+            url = url.split("(", maxsplit=1)[0].strip()
+
+        if REPO_URL_FORGE in url:
+            r["remote/forge"] = remote
+            r["remote/forge/url"] = url
+
+        elif REPO_URL_GITHUB in url:
+            r["remote/github"] = remote
+            r["remote/github/url"] = url
+
+    return _command_post_func(cp)
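+
+
+# illustrative parse result for the remotes shown in the comments above
+# (upstream's markjoshwel/sota urls; this repo matches against
+# REPO_URL_FORGE and REPO_URL_GITHUB instead):
+#
+#     r["remote/forge"]      = "origin"
+#     r["remote/forge/url"]  = "https://forge.joshwel.co/mark/sota.git"
+#     r["remote/github"]     = "github"
+#     r["remote/github/url"] = "https://github.com/markjoshwel/sota"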
+
+
+def err(message: str, exc: Exception | None = None) -> None:
+    """
+    helper function for printing error messages, prints the message and the
+    shared state dictionary r
+
+    args:
+        message: str
+            the error message to print
+        exc: Exception | None
+            the exception that caused the error, if any
+    """
+
+    print(
+        "\n" + message,
+        (
+            ""
+            if (exc is None)
+            else indent(
+                text=(
+                    f"{exc} ({exc.__class__.__name__})\n"
+                    f"{'\n'.join(format_tb(exc.__traceback__))}\n"
+                ),
+                prefix=INDENT,
+            )
+        )
+        + (indent(text=pformat(r), prefix=INDENT) + "\n"),
+        sep="\n",
+    )
+    exit(1)
+
+
+def main() -> None:
+    """
+    command line entry point
+    """
+
+    cumulative_start_time = time()
+    with TemporaryDirectory(delete="--keep" not in argv) as dir_temp:
+        print(
+            "\nsota staircase ReStepper\n"
+            "\n"
+            "directories\n"
+            f"  real repo : {REPO_DIR}\n"
+            f"  temp repo : {dir_temp}\n",
+            f"  is gh act : {GH_ACT}\n" if GH_ACT else "",
+            sep="",
+        )
+
+        # helper partial function for command steps
+        def cmd(
+            command: str,
+            wd: Path | str = dir_temp,
+            capture_output: bool = True,
+            give_input: str | None = None,
+        ) -> Callable[[], CompletedProcess]:
+            return lambda: run(
+                command,
+                cwd=wd,
+                capture_output=capture_output,
+                give_input=give_input,
+            )
+
+        step(
+            func=cmd("git filter-repo --version"),
+            post_func=post_filter_repo_check,
+        )
+
+        step(cmd("git status --porcelain", wd=REPO_DIR))
+        if (not r["blank"]) and ("--iknowwhatimdoing" not in argv):
+            err(
+                "critical error: repository is not clean, please commit changes first",
+            )
+
+        start_time = time()
+        print("1 pre | finding large files", end="", flush=True)
+        files, sim = iter_files(REPO_DIR)
+
+        flf_filter_result: LargeFileFilterResult | None = None
+        if "--skipsotaignoregen" not in argv:
+            flf_filter_result = find_large_files(files, sim)
+            large_files = flf_filter_result.files
+            end_time = time()
+            print(
+                "1 pre | finding large files... "
+                f"done in {generate_time_elapsed_string(end_time - start_time)} (found {len(large_files)})"
+            )
+
+            if large_files:
+                start_time = time()
+                was_written = step(
+                    desc="2 pre | writing .sotaignore",
+                    func=lambda: write_sotaignore(large_files),
+                    post_func=lambda cp: cp,
+                    post_print=False,
+                )
+                end_time = time()
+                if was_written:
+                    print(
+                        f" done in {generate_time_elapsed_string(end_time - start_time)}"
+                    )
+                else:
+                    print(" not needed")
+        else:
+            end_time = time()
+            print(
+                "1 pre | finding large files... "
+                f"skipped in {generate_time_elapsed_string(end_time - start_time)}"
+            )
+
+        print("3 pre | duplicating repo... pre-scanning", end="", flush=True)
+
+        start_time = time()
+        with CopyHighway(
+            message="3 pre | duplicating repo",
+            total=len(list(REPO_DIR.rglob("*"))),
+            lff_result=flf_filter_result,
+        ) as copier:
+            copytree(
+                src=REPO_DIR,
+                dst=dir_temp,
+                copy_function=copier.copy2,
+                dirs_exist_ok=True,
+            )
+        end_time = time()
+        print(
+            f"3 pre | duplicating repo... done in {generate_time_elapsed_string(end_time - start_time)}",
+            flush=True,
+        )
+
+        step(cmd('python -c "import pathlib; print(pathlib.Path.cwd().absolute())"'))
+        if str(Path(dir_temp).absolute()) != r["stdout"].strip():
+            err(
+                "critical error (whuh? internal?): "
+                f"not inside the temp dir '{str(Path(dir_temp).absolute())}'"
+            )
+
+        # check for forge and github remotes
+        step(
+            func=cmd("git remote -v"),
+            post_func=post_remote_v,
+        )
+        if "remote/forge" not in r:
+            err("critical error (whuh?): no forge remote found")
+
+        # get the current branch
+        step(cmd("git branch --show-current"))
+        branch = r["stdout"].strip()
+        if r.get("errored", "yes") or branch == "":
+            err("critical error (whuh?): couldn't get current branch")
+
+        step(cmd(f"git fetch {r['remote/forge']}"))
+        step(cmd(f"git rev-list HEAD...{r['remote/forge']}/{branch} --count"))
+        if (r.get("stdout", "").strip() != "0") and ("--dirty" not in argv):
+            err(
+                "critical error (whuh?): "
+                "not up to date with forge... sync your changes first?"
+            )
+
+        step(desc="4 lfs | fetch lfs objects", func=cmd("git lfs fetch"))
+
+        step(
+            desc="5 lfs | migrating lfs objects",
+            func=cmd(
+                'git lfs migrate export --everything --include="*" --remote=origin',
+                give_input="y\n",
+            ),
+        )
+
+        step(
+            desc="6 lfs | uninstall lfs in repo",
+            func=cmd("git lfs uninstall"),
+        )
+
+        step(
+            func=cmd("git lfs ls-files"),
+        )
+        if not r["blank"]:
+            err(
+                "critical error (whuh? internal?): "
+                "lfs objects still exist post-migrate and uninstall"
+            )
+
+        if REPO_SOTAIGNORE.exists():
+            try:
+                sotaignore = REPO_SOTAIGNORE.read_text(encoding="utf-8").strip()
+            except Exception as exc:
+                err("critical error: couldn't read .sotaignore file", exc=exc)
+
+            sotaignored_files: list[str] = [
+                line
+                for line in sotaignore.splitlines()
+                if not line.startswith("#") and line.strip() != ""
+            ]
+
+            step(
+                desc=f"7 lfs | filter repo and {len(sotaignored_files)} file(s)",
+                func=cmd(
+                    "git filter-repo --force --strip-blobs-bigger-than 100M --invert-paths "
+                    + " ".join(f'--path "{lf}"' for lf in sotaignored_files)
+                ),
+            )
+
+            # also copy to the temp repo; step 5 (lfs migrate) wipes uncommitted changes
+            copy2(REPO_SOTAIGNORE, Path(dir_temp).joinpath(".sotaignore"))
+
+        def add_and_commit() -> CompletedProcess:
+            if GH_ACT:
+                cp = cmd("git config user.name 'github-actions[bot]'")()
+                if cp.returncode != 0:
+                    return cp
+
+                cp = cmd(
+                    "git config user.email 'github-actions[bot]@users.noreply.github.com'"
+                )()
+                if cp.returncode != 0:
+                    return cp
+
+            cp = cmd("git add -A")()
+            if cp.returncode != 0:
+                return cp
+
+            return cmd(
+                "git commit --allow-empty "
+                f'-am "{COMMIT_MESSAGE}" --author="{COMMIT_AUTHOR}"',
+            )()
+
+        def neuter_and_commit():
+            rewrite_gitattributes(Path(dir_temp))
+            add_and_commit()
+
+        step(
+            desc="8 fin | neuter .gitattributes and commit",
+            func=neuter_and_commit,
+        )
+
+        if r.get("remote/github") is None:
+            step(
+                func=cmd(f"git remote add github https://{REPO_URL_GITHUB}.git"),
+            )
+            if r.get("errored", "yes"):
+                err("critical error (whuh?): couldn't add github remote")
+            r["remote/github"] = "github"
+
+        step(
+            desc=f"9 fin | fetch {r['remote/github']}",
+            func=cmd(f"git fetch {r['remote/github']}"),
+        )
+
+        push_invocation = (
+            f"git push {r['remote/github']} {branch} --force"
+            if not GH_ACT
+            else f"git push https://markjoshwel:{GH_TOKEN}@{REPO_URL_GITHUB}.git {branch} --force"
+        )
+
+        step(
+            desc=f"X fin | pushing to {r['remote/github']}/{branch}",
+            func=cmd(push_invocation if ("--test" not in argv) else "git --version"),
+        )
+
+    cumulative_end_time = time()
+    print(
+        f"\n--- done! took {generate_time_elapsed_string(cumulative_end_time - cumulative_start_time)}~ "
+        "☆*: .。. o(≧▽≦)o .。.:*☆ ---",
+        flush=True,
+    )
+
+
+if __name__ == "__main__":
+    main()