From 8966008025a54734a1949d92bf66a17e009d44a1 Mon Sep 17 00:00:00 2001 From: Mark Joshwel Date: Mon, 15 Jul 2024 03:54:14 +0800 Subject: [PATCH] docs,tooling: cooler restepper + sidestepper - docs: added detailed docs on restepper and sidestepper - re/sidestepper: command inovations are safer - re/sidestepper: option to automatically install dependencies - sidestepper: behaviour is now correct + multiprocessing option - restepper: rely on sidestepper for a few functions - restepper: multithreaded repo duplication - restepper: chunk filtering into a single command --- README.md | 121 +++++++++-- sidestepper.py | 555 +++++++++++++++++++++++++++++++++++++++++++++++++ sync.py | 514 ++++++++++++++++++++++----------------------- 3 files changed, 910 insertions(+), 280 deletions(-) create mode 100644 sidestepper.py diff --git a/README.md b/README.md index 9cb3c35..896cdfb 100644 --- a/README.md +++ b/README.md @@ -14,14 +14,16 @@ Submission Mirror: | sai | lead 3d artist | quality checker | @sai-thinks | @sippy-thinks | - [Handbook](#handbook) - - [on 3D Modelling (Maya, Blender, ZBrush, etc.)](#on-3d-modelling-maya-blender-zbrush-etc) - - [on Graphic and UI/UX Design](#on-graphic-and-uiux-design) - - [on Game Development](#on-game-development) - - [on Game and Level Design](#on-game-and-level-design) - - [on Documentation (for All Modules)](#on-documentation-for-all-modules) - - [on Repository Syncing](#on-repository-syncing) + - [on 3D Modelling (Maya, Blender, ZBrush, etc.)](#on-3d-modelling-maya-blender-zbrush-etc) + - [on Graphic and UI/UX Design](#on-graphic-and-uiux-design) + - [on Game Development](#on-game-development) + - [on Game and Level Design](#on-game-and-level-design) + - [on Documentation (for All Modules)](#on-documentation-for-all-modules) + - [on Repository Syncing](#on-repository-syncing) + - [Syncing to GitHub via ReStepper](#syncing-to-github-via-restepper) + - [SideStepper and the .sotaignore file](#sidestepper-and-the-sotaignore-file) - [Licence 
and Credits](#licence-and-credits) - - [Third-party Licences](#third-party-licences) + - [Third-party Licences](#third-party-licences) ## Handbook @@ -33,7 +35,7 @@ Submission Mirror: design-as-in-modelling your assets with modularity in mind, anything that can be modular should be modular -design-as-in-look should be checked with the group +design-as-in-look should be checked with the group structure your files similarly: @@ -63,7 +65,8 @@ Modelling |:----:|:----:| if it involves the brand: -follow the brand guidelines at [Documentation/sota staircase Brand Guidelines.pdf](Documentation/sota%20staircase%20Brand%20Guidelines.pdf) +follow the brand guidelines +at [Documentation/sota staircase Brand Guidelines.pdf](Documentation/sota%20staircase%20Brand%20Guidelines.pdf) and then send it to mark for approval (●'◡'●) @@ -105,7 +108,8 @@ on [the forge](https://forge.joshwel.co/mark/sota/issues) | Lead | kinda everyone more so mark | |:----:|:---------------------------:| -follow the brand guidelines at [Documentation/sota staircase Brand Guidelines.pdf](Documentation/sota%20staircase%20Brand%20Guidelines.pdf) +follow the brand guidelines +at [Documentation/sota staircase Brand Guidelines.pdf](Documentation/sota%20staircase%20Brand%20Guidelines.pdf) source files (.docx, .fig, etc.) 
should be in the respective modules' directory, and then exported as .pdfs to `Documentation/*.pdf` @@ -115,18 +119,89 @@ and then exported as .pdfs to `Documentation/*.pdf` | Wizard | Mark | |:------:|:----:| +#### Syncing to GitHub via ReStepper + instructions: ```text -python restepper.py +python sync.py ``` -if it screams at you, fix them - +if it screams at you, fix them if it breaks, refer to the resident "wizard" for what the script does, see the script itself: [sync.py](sync.py) +##### Advanced Usage + +you probably don't need to ever use these :p + +the following environment variables can be set: + +- `SOTA_SIDESTEP_MAX_WORKERS` + how many workers to use for repository duplication, + default is how many cpu threads are available + +the following command line arguments can be used: + +- `--skipsotaignoregen` + skips generating a `.sotaignore` file, + useful if you _know_ you've already generated one beforehand + +- `--test` + does everything except actually pushing to github + +there's more, but god forbid you need to use them unless you're changing the script, +search for `argv` in the script if you're comfortable with dragons + +#### SideStepper and the .sotaignore file + +the `.sotaignore` file is a file that tells the sync script what to ignore when syncing +to github, and should be in the root of the repository + +it is automatically generated by the sync script and should not be manually edited +unless there's a file we want to exclude + +any file over 100MB is automatically added when running ReStepper (the sync script) or +SideStepper (the script that generates the `.sotaignore` file) + +to manually generate this without syncing, run: + +```text +python sidestepper.py +``` + +we may or may not want to add the contents of the `.sotaignore` file to the `.gitignore` +but that's probably better off as a case-by-case basis type thing + +for what the script does, see the script itself: [sidestepper.py](sidestepper.py) + +##### Advanced Usage + +you 
probably don't need to ever use these :p + +the following environment variables can be set: + +- `SOTA_SIDESTEP_CHUNK_SIZE` + how many files to chunk for file finding, default is 16 + +- `SOTA_SIDESTEP_MAX_WORKERS` + how many workers to use for file finding, + default is how many cpu threads are available + +- `SOTA_SIDESTEP_PARALLEL` + whether to use multiprocessing for large file finding, default is false + + hilariously it's ~4-5x slower than single-threaded file finding, but the option + is still present because it was made before the fourth implementation of + the large file finding algorithm + (now called SideStepper because names are fun, sue me) + +the following command line arguments can be used: + +- `--parallel` + same behaviour as setting the `SOTA_SIDESTEP_PARALLEL` environment variable + ## Licence and Credits "NP resources" hereby refers to resources provided by Ngee Ann Polytechnic (NP) for the @@ -145,15 +220,15 @@ development of the project specifically coming from, or in part have had the following software and/or services involved: - - Autodesk Maya - - Adobe Substance 3D - - Substance 3D Modeler - - Substance 3D Sampler - - Substance 3D Designer - - Substance 3D Painter - - Substance 3D Stager - - Substance 3D Assets - + - Autodesk Maya + - Adobe Substance 3D + - Substance 3D Modeler + - Substance 3D Sampler + - Substance 3D Designer + - Substance 3D Painter + - Substance 3D Stager + - Substance 3D Assets + would be all rights reserved, unless otherwise stated (_i mean mr q said this already lol_) @@ -186,6 +261,6 @@ exceptions to the above licences are as follows: > Example: > > - Frogman by Frog Creator: Standard Unity Asset Store EULA (Extension Asset) -> `Assets/Characters/Frogman` + > `Assets/Characters/Frogman` > > comma-separate multiple licences, and use code blocks if you need to list multiple files/directories/patterns diff --git a/sidestepper.py b/sidestepper.py new file mode 100644 index 0000000..a8044a5 --- /dev/null +++ 
b/sidestepper.py @@ -0,0 +1,555 @@ +# sota staircase SideStepper +# a somewhat fast .gitignore-respecting large file finder +# licence: 0BSD + +from dataclasses import dataclass +from functools import cache +from multiprocessing import Manager, cpu_count + +# noinspection PyProtectedMember +from multiprocessing.managers import ListProxy +from os import getenv +from os.path import abspath +from pathlib import Path +from subprocess import CompletedProcess +from subprocess import run as _run +from sys import argv, executable, stderr +from textwrap import indent +from time import time +from traceback import format_tb +from typing import Final, Generator, Generic, Iterable, Iterator, NamedTuple, TypeVar + +# constants +INDENT = " " +REPO_DIR: Final[Path] = Path(__file__).parent +REPO_SOTAIGNORE: Final[Path] = REPO_DIR.joinpath(".sotaignore") +_SOTA_SIDESTEP_CHUNK_SIZE = getenv("SIDESTEP_CHUNK_SIZE") +SOTA_SIDESTEP_CHUNK_SIZE: Final[int] = ( + int(_SOTA_SIDESTEP_CHUNK_SIZE) + if ( + (_SOTA_SIDESTEP_CHUNK_SIZE is not None) + and (_SOTA_SIDESTEP_CHUNK_SIZE.isdigit()) + ) + else 16 +) +_SOTA_SIDESTEP_MAX_WORKERS = getenv("SIDESTEP_MAX_WORKERS") +SOTA_SIDESTEP_MAX_WORKERS: Final[int] = ( + int(_SOTA_SIDESTEP_MAX_WORKERS) + if ( + (_SOTA_SIDESTEP_MAX_WORKERS is not None) + and (_SOTA_SIDESTEP_MAX_WORKERS.isdigit()) + ) + else cpu_count() +) +SOTA_SIDESTEP_LARGE_FILE_SIZE: Final[int] = 100000000 # 100mb +SOTA_SIDESTEP_PARALLEL: Final[bool] = getenv("SIDESTEP_PARALLEL") is not None + + +# define these before importing third-party modules because we use them in the import check +def generate_command_failure_message(cp: CompletedProcess) -> str: + return "\n".join( + [ + f"\n\nfailure: command '{cp.args}' failed with exit code {cp.returncode}", + f"{INDENT}stdout:", + ( + indent(text=cp.stdout.decode(), prefix=f"{INDENT}{INDENT}") + if (isinstance(cp.stdout, bytes) and (cp.stdout != b"")) + else f"{INDENT}{INDENT}(no output)" + ), + f"{INDENT}stderr:", + ( + 
indent(text=cp.stderr.decode(), prefix=f"{INDENT}{INDENT}") + if (isinstance(cp.stderr, bytes) and (cp.stderr != b"")) + else f"{INDENT}{INDENT}(no output)" + ) + + "\n", + ] + ) + + +def run( + command: str | list, + cwd: Path | str | None = None, + capture_output: bool = True, + give_input: str | None = None, +) -> CompletedProcess: + """ + exception-safe-ish wrapper around subprocess.run() + + args: + command: str | list + the command to run + cwd: Path | str | None = None + the working directory + capture_output: bool = True + whether to capture the output + + returns: CompletedProcess + the return object from subprocess.run() + """ + + # noinspection PyBroadException + try: + cp = _run( + command, + shell=True if isinstance(command, list) else False, + cwd=cwd, + capture_output=capture_output, + input=give_input.encode() if give_input else None, + ) + except Exception as run_exc: + print( + f"\n\nfailure: command '{command}' failed with exception", + f"{INDENT}{run_exc.__class__.__name__}: {run_exc}", + indent(text="\n".join(format_tb(run_exc.__traceback__)), prefix=INDENT), + sep="\n", + ) + exit(-1) + return cp + + +# attempt to import third-party modules +# if they're not installed, prompt the user to optionally install them automatically +_could_not_import: list[str] = [] +_could_not_import_exc: Exception | None = None + +try: + from gitignore_parser import IgnoreRule, rule_from_pattern # type: ignore +except ImportError as _import_exc: + _could_not_import.append("gitignore_parser") + _could_not_import_exc = _import_exc + +try: + # noinspection PyUnresolvedReferences + from tqdm import tqdm + + # noinspection PyUnresolvedReferences + from tqdm.contrib.concurrent import process_map +except ImportError as _import_exc: + _could_not_import.append("tqdm") + _could_not_import_exc = _import_exc + +if _could_not_import: + for module in _could_not_import: + print( + f"critical error: '{module}' is not installed, " + f"please run 'pip install {module}' to install 
it", + ) + + # install the missing modules + if input("\ninstall these with pip? y/n: ").lower() == "y": + print("installing...", end="", flush=True) + _cp = run([executable, "-m", "pip", "install", *_could_not_import]) + if _cp.returncode != 0: + print(generate_command_failure_message(_cp)) + exit(-1) + print(" done", flush=True) + + # check if they were installed successfully + _cp = run( + [ + executable, + "-c", + ";".join([f"import {module}" for module in _could_not_import]), + ] + ) + if _cp.returncode != 0: + print(generate_command_failure_message(_cp)) + + print( + "critical error: post-install check failed. reverting installation...", + end="", + flush=True, + ) + _cp = run([executable, "-m", "pip", "uninstall", *_could_not_import, "-y"]) + if _cp.returncode != 0: + print(generate_command_failure_message(_cp)) + print(" done", flush=True) + + exit(-1) + + elif __name__ == "__main__": + # rerun the script if we're running as one + exit( + run( + [executable, Path(__file__).absolute(), *argv[1:]], capture_output=False + ).returncode + ) + + else: + # we're being imported, raise an error + raise EnvironmentError( + "automatic dependency installation successful" + ) from _could_not_import_exc + +A = TypeVar("A") +B = TypeVar("B") + + +class OneSided(Generic[A, B], NamedTuple): + """ + generic tuple with two elements, a and b, given by a generator + in which element 'a' is a constant and b is from an iterable/iterator + """ + + a: A + b: B + + +def one_sided(a: A, bbb: Iterable[B]) -> Iterator[OneSided[A, B]]: + """ + generator that yields OneSided instances with a constant 'a' element + and elements from the given iterable/iterator 'bbb' as the 'b' element + """ + for b in bbb: + yield OneSided(a, b) + + +@dataclass(eq=True, frozen=True) +class SideStepIgnoreMatcher: + """immutable gitignore matcher""" + + root: Path + # ( + # (.gitignore file directory path, (ignore rule, ...)), + # (.gitignore file directory path, (ignore rule, ...)), + # ... 
+ # ) + rules: tuple[tuple[Path, tuple[IgnoreRule, ...]], ...] = tuple() + + def add_gitignore(self, gitignore: Path) -> "SideStepIgnoreMatcher": + """returns a new SidestepIgnoreMatcher with rules from the given gitignore file""" + + new_ruleset: list[IgnoreRule] = [] + for line_no, line_text in enumerate(gitignore.read_text().splitlines()): + rule = rule_from_pattern( + pattern=line_text.rstrip("\n"), + base_path=Path(abspath(gitignore.parent)), + source=(gitignore, line_no), + ) + if rule: + new_ruleset.append(rule) + + return SideStepIgnoreMatcher( + root=self.root, rules=self.rules + ((gitignore.parent, tuple(new_ruleset)),) + ) + + def match(self, file: Path) -> bool: + """returns True if the file is ignored by any of the rules in the gitignore files, False otherwise""" + matched = False + + # check to see if the gitignore affects the file + for ignore_dir, ruleset in self.rules: + if str(ignore_dir) not in str(file): + continue + if not self._possibly_negated(ruleset): + matched = matched or any(r.match(file) for r in ruleset) + else: + for rule in reversed(ruleset): + if rule.match(file): + matched = matched or not rule.negation + return matched + + def match_trytrytry(self, file: Path) -> Path | None: + """ + same as match, but also checks if the gitignore files ignore any parent directories; + horribly slow and dumb, thus the name 'trytrytry' + + returns the ignored parent path if the file is ignored, None otherwise + """ + + trytrytry: Path = file + while trytrytry != trytrytry.parent: + if self.match(trytrytry): + return trytrytry + if len(self.root.parts) == len(trytrytry.parts): + return None + trytrytry = trytrytry.parent + return None + + @cache + def _possibly_negated(self, ruleset: tuple[IgnoreRule, ...]) -> bool: + return any(rule.negation for rule in ruleset) + + +def _parallel() -> bool: + """ + helper function to determine if we should use multiprocessing; + checks the environment variable SIDESTEP_PARALLEL and the command line arguments + + 
returns: bool + """ + if SOTA_SIDESTEP_PARALLEL: + return True + elif "--parallel" in argv: + return True + return False + + +def _iter_files( + target: Path, + pattern: str = "*", +) -> Generator[Path, None, None]: + """ + generator that yields files in the target directory excluding '.git/**' + + args: + target: Path + the directory to search in + pattern: str = "*" + the file pattern to search for + + yields: Path + file in the target directory + """ + repo_dir = target.joinpath(".git/") + for target_file in target.rglob(pattern): + if not target_file.is_file(): + continue + if repo_dir in target_file.parents: + continue + yield target_file + + +def iter_files(target_dir: Path) -> tuple[list[Path], SideStepIgnoreMatcher]: + """ + get all non-git files and register .gitignore files + + args: + target_dir: Path + the directory to search in + + returns: tuple[list[Path], SideStepIgnoreMatcher] + list of all files in the target directory and a SideStepIgnoreMatcher instance + """ + + all_files: list[Path] = [] + sim = SideStepIgnoreMatcher(root=target_dir) + + for file in tqdm( + _iter_files(target_dir), + desc="1 pre | finding large files - scanning (1/3)", + leave=False, + ): + all_files.append(file) + if file.name == ".gitignore": + sim = sim.add_gitignore(file) + + return all_files, sim + + +def _filter_sim_match( + os: OneSided[tuple[list[Path], SideStepIgnoreMatcher], Path], +) -> Path | None: + """first filter pass function, thread-safe-ish""" + (ignore_dirs, sim), file = os.a, os.b + + ignored = False + for ign_dir in ignore_dirs: + if str(ign_dir) in str(file): + ignored = True + break + + if (not ignored) and ((ttt := sim.match_trytrytry(file)) is not None): + if ttt.is_dir() and ttt not in ignore_dirs: + ignore_dirs.append(ttt) + return None + return file + + +def _filter_ign_dirs_and_size(os: OneSided[list[Path], Path]) -> Path | None: + """second filter pass function, thread-safe-ish""" + ignore_dirs, file = os.a, os.b + + for ign_dir in ignore_dirs: + 
def _find_large_files_parallel(target: Path) -> list[Path]:
    """
    multiprocess implementation of find_large_files

    args:
        target: Path
            the directory to search in

    returns: list[Path]
        list of large files
    """
    files, sim = iter_files(target)

    # the manager runs a server process for the shared list; the context manager
    # makes sure it is shut down once both passes are done
    with Manager() as manager:
        ignore_dirs: ListProxy[Path] = manager.list()

        # first pass: drop files the gitignore matcher reports as ignored
        _files: list[Path] = [
            f
            for f in process_map(
                _filter_sim_match,
                one_sided(a=(ignore_dirs, sim), bbb=files),
                desc="1 pre | finding large files - iod-ttt file matching (2/3)",
                leave=False,
                chunksize=SOTA_SIDESTEP_CHUNK_SIZE,
                max_workers=SOTA_SIDESTEP_MAX_WORKERS,
                total=len(files),
            )
            if f is not None
        ]

        # second pass: re-check against the collected ignored directories and keep
        # only files over the size threshold
        return [
            f
            for f in process_map(
                _filter_ign_dirs_and_size,
                one_sided(a=ignore_dirs, bbb=_files),
                desc="1 pre | finding large files - dir rematching (3/3)",
                leave=False,
                chunksize=SOTA_SIDESTEP_CHUNK_SIZE,
                max_workers=SOTA_SIDESTEP_MAX_WORKERS,
                # this pass iterates the survivors of the first pass, not every file
                total=len(_files),
            )
            if f is not None
        ]
def write_sotaignore(large_files: list[Path]) -> bool:
    """
    writes out a .sotaignore file with a list of large files,
    updating an existing one if already present

    existing lines are kept as-is and in order; new entries are appended as
    posix-style paths relative to the repository root, each at most once

    args:
        large_files: list[Path]
            list of large files

    returns: bool
        True if anything was written, False otherwise (no changes)
    """
    if not large_files:
        return False

    old_sotaignore = (
        REPO_SOTAIGNORE.read_text(encoding="utf-8").strip().splitlines()
        if REPO_SOTAIGNORE.exists()
        else []
    )

    # set for O(1) membership checks; also guards against duplicates in large_files
    known_entries = set(old_sotaignore)
    new_sotaignore = list(old_sotaignore)
    for large_file in large_files:
        entry = large_file.relative_to(REPO_DIR).as_posix()
        if entry not in known_entries:
            known_entries.add(entry)
            new_sotaignore.append(entry)

    if new_sotaignore == old_sotaignore:
        return False

    # check if the sotaignore file starts with a comment
    if new_sotaignore and not new_sotaignore[0].startswith("#"):
        new_sotaignore[:0] = [
            "# .sotaignore file generated by sota staircase ReStepper/SideStepper",
            "# anything here either can't or shouldn't be uploaded github",
            "# unless you know what you're doing, don't edit this file! >:(",
        ]

    # explicit encoding so non-ascii file names round-trip regardless of locale
    REPO_SOTAIGNORE.write_text("\n".join(new_sotaignore) + "\n", encoding="utf-8")
    return True
", end="", file=stderr) + start_time = time() + large_files = find_large_files(REPO_DIR) + end_time = time() + print( + f"1/2{INDENT}finding large files... " + f"done in {end_time - start_time:.2f}″ " + f"(found {len(large_files)})", + file=stderr, + ) + + print(f"2/2{INDENT}writing .sotaignore file... ", end="", file=stderr) + start_time = time() + was_written = write_sotaignore(large_files) + end_time = time() + print( + ("done" if was_written else "skipped") + f" in {end_time - start_time:.2f}″\n", + file=stderr, + ) + + for file in large_files: + print(file.relative_to(REPO_DIR)) + + cumulative_end_time = time() + time_taken = cumulative_end_time - cumulative_start_time + time_taken_string: str + if time_taken > 60: + time_taken_string = f"{int(time_taken // 60)}′{int(time_taken % 60)}″" + else: + time_taken_string = f"{time_taken:.2f}″" + print( + f"\n--- done! took {time_taken_string}~ " "☆*: .。. o(≧▽≦)o .。.:*☆ ---", + flush=True, + file=stderr, + ) + + +if __name__ == "__main__": + main() diff --git a/sync.py b/sync.py index a69fc3c..e40a1d5 100644 --- a/sync.py +++ b/sync.py @@ -1,79 +1,132 @@ # sota staircase ReStepper +# forge -> github one-way repo sync script # licence: 0BSD - -from os.path import getsize +from multiprocessing.pool import ThreadPool from pathlib import Path from pprint import pformat -from shutil import copytree -from subprocess import CompletedProcess, run -from sys import argv, stderr +from shutil import copy2, copytree +from subprocess import CompletedProcess +from subprocess import run as _run +from sys import argv, executable from tempfile import TemporaryDirectory from textwrap import indent +from time import time from traceback import format_tb -from typing import Any, Callable, Final, TypeVar +from typing import Callable, Final, TypeVar try: - from gitignore_parser import parse_gitignore # type: ignore -except ImportError: - print( - "critical error: 'gitignore_parser' is not installed, please run 'pip install gitignore-parser' 
class CopyHighway:
    """
    multithreaded file copying class that gives a copy2-like function
    for use with shutil.copytree(); also displays a progress bar

    copies are only queued by copy2(); they are guaranteed to have finished once
    the context manager exits, which is also when failed copies are reported
    """

    def __init__(self, message: str, total: int):
        """
        multithreaded file copying class that gives a copy2-like function
        for use with shutil.copytree()

        args:
            message: str
                message to display in the progress bar
            total: int
                total number of files to copy
        """
        self.pool = ThreadPool(
            processes=SOTA_SIDESTEP_MAX_WORKERS,
        )
        self.pbar = tqdm(
            total=total,
            desc=message,
            unit=" files",
            leave=False,
        )
        # exceptions raised by queued copies, filled in by error_callback()
        self.errors: list[BaseException] = []

    def callback(self, a: "R"):
        """success callback for a queued copy; advances the progress bar"""
        self.pbar.update()
        return a

    def error_callback(self, exc: BaseException) -> None:
        """failure callback for a queued copy; records the error for __exit__"""
        self.errors.append(exc)
        self.pbar.update()

    def copy2(self, source: str, dest: str):
        """shutil.copy2()-like function for use with shutil.copytree()"""
        self.pool.apply_async(
            copy2,
            args=(source, dest),
            callback=self.callback,
            # without this, a failed copy would vanish without a trace
            error_callback=self.error_callback,
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # wait for every queued copy before tearing down the progress bar
        self.pool.close()
        self.pool.join()
        self.pbar.close()

        # surface failed copies, but never mask an exception already in flight
        if self.errors and exc_type is None:
            raise OSError(
                f"{len(self.errors)} file(s) could not be copied, "
                f"first error: {self.errors[0]}"
            ) from self.errors[0]
def post_filter_repo_check(cp: CompletedProcess) -> CompletedProcess:
    """
    post-call function for checking if git-filter-repo is installed
    and optionally installing it if it isn't

    args:
        cp: CompletedProcess
            return object of the command that probed for git-filter-repo;
            a return code of 0 means it is already available

    returns: CompletedProcess
        the same object that was passed in
    """

    if cp.returncode == 0:
        return cp

    if input("git filter-repo is not installed, install it? y/n: ").lower() != "y":
        print(
            "install it using 'pip install git-filter-repo' "
            "or 'pipx install git-filter-repo'",
        )
        return cp

    # check if pipx is installed
    use_pipx = False

    check_pipx_cp = run(["pipx", "--version"])
    if check_pipx_cp.returncode == 0:
        use_pipx = True
    else:
        run([executable, "-m", "pip", "install", "pipx"])

        # double check
        check_pipx_cp = run(["pipx", "--version"])
        if check_pipx_cp.returncode == 0:
            use_pipx = True
        # if pipx still can't be found, might be some environment fuckery

    # install git-filter-repo
    pip_invocation: list[str] = ["pipx"] if use_pipx else [executable, "-m", "pip"]
    install_command = [*pip_invocation, "install", "git-filter-repo"]
    print(
        f"running '{' '.join(install_command)}'... ",
        end="",
    )
    install_rc = run(install_command)
    if install_rc.returncode != 0:
        print("error")
        _command_post_func(install_rc)
    else:
        print("done\n")

    # check if it is reachable
    if run(["git", "filter-repo", "--version"]).returncode != 0:
        # revert
        uninstall_command = [*pip_invocation, "uninstall", "git-filter-repo"]
        if not use_pipx:
            # pip asks for confirmation; our output is captured, so the prompt
            # would never be seen and the script would wait on it
            uninstall_command.append("-y")
        run(uninstall_command)
        print(
            "failure: could not install git-filter-repo automatically. "
            "do it yourself o(*≧▽≦)ツ┏━┓"
        )

    return cp
post_remote_v(rc: CompletedProcess) -> CompletedProcess: +def post_remote_v(cp: CompletedProcess) -> CompletedProcess: """ post-call function for 'git remote -v' command, parses the output and checks for the forge and github remotes, storing them in the shared state under 'remote/forge', 'remote/forge/url', 'remote/github', and 'remote/github/url' respectively - - args: - rc: CompletedProcess - return object from subprocess.run - - returns: - CompletedProcess - return object from subprocess.run """ - if not isinstance(rc.stdout, bytes): - return _command_post_func(rc) + if not isinstance(cp.stdout, bytes): + return _command_post_func(cp) - for line in rc.stdout.decode().split("\n"): + for line in cp.stdout.decode().split("\n"): # github https://github.com/markjoshwel/sota (fetch) # github https://github.com/markjoshwel/sota (push) # origin https://forge.joshwel.co/mark/sota.git (fetch) # origin https://forge.joshwel.co/mark/sota.git (push) - sline = line.split(maxsplit=1) + split_line = line.split(maxsplit=1) if len(line) < 2: continue # remote='origin' url='https://forge.joshwel.co/mark/sota.git (fetch)' - remote, url = sline + remote, url = split_line # clean up the url if (REPO_URL_FORGE in url) or (REPO_URL_GITHUB in url): @@ -369,7 +333,7 @@ def post_remote_v(rc: CompletedProcess) -> CompletedProcess: r["remote/github"] = remote r["remote/github/url"] = url - return _command_post_func(rc) + return _command_post_func(cp) def err(message: str, exc: Exception | None = None) -> None: @@ -398,7 +362,6 @@ def err(message: str, exc: Exception | None = None) -> None: ) ) + (indent(text=pformat(r), prefix=INDENT) + "\n"), - file=stderr, sep="\n", ) exit(1) @@ -409,6 +372,7 @@ def main() -> None: command line entry point """ + cumulative_start_time = time() with TemporaryDirectory(delete="--keep" not in argv) as dir_temp: print( "\nsota staircase ReStepper\n" @@ -420,53 +384,76 @@ def main() -> None: # helper partial function for command def cmd( - command: str, wd: 
Path | str = dir_temp, **kwargs + command: str, + wd: Path | str = dir_temp, + capture_output: bool = True, + give_input: str | None = None, ) -> Callable[[], CompletedProcess]: return lambda: run( command, - shell=True, cwd=wd, - capture_output=True, - **kwargs, + capture_output=capture_output, + give_input=give_input, ) step( func=cmd("git filter-repo --version"), - post_func=lambda rc: _command_post_func( - rc, - quit_early=rc.returncode != 0, - quit_message="git filter-repo is not installed, install it using 'pip install git-filter-repo' or 'pipx install git-filter-repo'", - ), + post_func=post_filter_repo_check, ) - step(func=cmd("git status --porcelain", wd=REPO_DIR)) + step(cmd("git status --porcelain", wd=REPO_DIR)) if (not r["blank"]) and ("--iknowwhatimdoing" not in argv): err( "critical error: repository is not clean, please commit changes first", ) - step( - desc="1 pre\tgenerating .sotaignore", - func=lambda: generate_sotaignore(get_large_files(REPO_DIR)), - ) + if "--skipsotaignoregen" not in argv: + (print("1 pre | finding large files", end="", flush=True),) + start_time = time() + large_files = find_large_files(REPO_DIR) + end_time = time() + print( + "1 pre | finding large files... " + f"done in {end_time - start_time:.2f}″ (found {len(large_files)})" + ) - step( - desc="2 pre\tduplicating repo", - func=lambda: ( - copytree( - src=REPO_DIR, - dst=dir_temp, - dirs_exist_ok=True, + if large_files: + start_time = time() + was_written = step( + desc="2 pre | writing .sotaignore", + func=lambda: write_sotaignore(large_files), + post_func=lambda cp: cp, + post_print=False, ) - ), + end_time = time() + if was_written: + print(f" done in {end_time - start_time:.2f}″") + else: + print(" not needed") + + print("3 pre | duplicating repo... 
pre-scanning", end="", flush=True) + + start_time = time() + with CopyHighway( + "3 pre | duplicating repo", total=len(list(REPO_DIR.rglob("*"))) + ) as copier: + copytree( + src=REPO_DIR, + dst=dir_temp, + copy_function=copier.copy2, + dirs_exist_ok=True, + ) + end_time = time() + print( + f"3 pre | duplicating repo... done in {end_time - start_time:.2f}″", + flush=True, ) - step( - func=cmd('python -c "import pathlib; print(pathlib.Path.cwd().absolute())"') - ) + step(cmd('python -c "import pathlib; print(pathlib.Path.cwd().absolute())"')) if str(Path(dir_temp).absolute()) != r["stdout"].strip(): err( - f"critical error (whuh? internal?): not inside the temp dir '{str(Path(dir_temp).absolute())}'" + "critical error (whuh? internal?): " + f"not inside the temp dir '{str(Path(dir_temp).absolute())}'" ) # check for forge and github remotes @@ -478,31 +465,31 @@ def main() -> None: err("critical error (whuh?): no forge remote found") # get the current branch - step( - func=cmd("git branch --show-current"), - ) + step(cmd("git branch --show-current")) branch = r["stdout"].strip() if r.get("errored", "yes") or branch == "": err("critical error (whuh?): couldn't get current branch") - step(func=cmd(f"git fetch {r['remote/forge']}")) - step(func=cmd(f"git rev-list HEAD...{r['remote/forge']}/{branch} --count")) + step(cmd(f"git fetch {r['remote/forge']}")) + step(cmd(f"git rev-list HEAD...{r['remote/forge']}/{branch} --count")) if (r.get("stdout", "").strip() != "0") and ("--dirty" not in argv): err( - "critical error (whuh?): not up to date with forge... sync your changes first?" + "critical error (whuh?): " + "not up to date with forge... sync your changes first?" 
) - step(desc="3 lfs\tfetch lfs objects", func=cmd("git lfs fetch")) + step(desc="4 lfs | fetch lfs objects", func=cmd("git lfs fetch")) step( - desc="4 lfs\tmigrating lfs objects", + desc="5 lfs | migrating lfs objects", func=cmd( - 'git lfs migrate export --everything --include="*" --remote=origin' + 'git lfs migrate export --everything --include="*" --remote=origin', + give_input="y\n", ), ) step( - desc="5 lfs\tuninstall lfs in repo", + desc="6 lfs | uninstall lfs in repo", func=cmd("git lfs uninstall"), ) @@ -511,42 +498,50 @@ def main() -> None: ) if not r["blank"]: err( - "critical error (whuh? internal?): lfs objects still exist post-migrate and uninstall" + "critical error (whuh? internal?): " + "lfs objects still exist post-migrate and uninstall" ) - temp_sotaignore = Path(dir_temp).joinpath(".sotaignore") - - if temp_sotaignore.exists(): + if REPO_SOTAIGNORE.exists(): try: - sotaignore = temp_sotaignore.read_text(encoding="utf-8").strip() + sotaignore = REPO_SOTAIGNORE.read_text(encoding="utf-8").strip() except Exception as exc: err("critical error: couldn't read .sotaignore file", exc=exc) - sotaignore_large_files: list[str] = [ + sotaignored_files: list[str] = [ line for line in sotaignore.splitlines() if not line.startswith("#") and line.strip() != "" ] - # FUTURE: if this becomes slow, start chunking --path arguments - # https://stackoverflow.com/questions/43762338/how-to-remove-file-from-git-history + step( + desc=f"7 lfs | filtering {len(sotaignored_files)} file(s)", + func=cmd( + "git filter-repo --force --invert-paths " + + " ".join(f'--path ""{lf}' "" for lf in sotaignored_files) + ), + ) - for n, lf in enumerate(sotaignore_large_files, start=1): - step( - desc=f"6 lfs\tfilter ({n}/{len(sotaignore_large_files)}) - {lf}", - func=cmd(f'git filter-repo --force --invert-paths --path "{lf}"'), - ) + # also copy to the temp repo; step 5 (lfs migrate) wipes uncommitted changes + copy2(REPO_SOTAIGNORE, Path(dir_temp).joinpath(".sotaignore")) step( - 
desc="7 fin\tneuter .gitattributes", + desc="8 fin | neuter .gitattributes", func=lambda: rewrite_gitattributes(Path(dir_temp)), ) + def add_and_commit() -> CompletedProcess: + cp = cmd("git add *")() + if cp.returncode != 0: + return cp + return cmd( + "git commit --allow-empty " + f'-am "{COMMIT_MESSAGE}" --author="{COMMIT_AUTHOR}"', + )() + step( - desc="8 fin\tcommit", - func=cmd( - f"""git commit -am "{COMMIT_MESSAGE}" --author="{COMMIT_AUTHOR}" --allow-empty""", - ), + desc="9 fin | commit", + func=add_and_commit, ) if r.get("remote/github") is None: @@ -558,7 +553,7 @@ def main() -> None: r["remote/github"] = "github" step( - desc=f"9 fin\tpushing to github/{branch}", + desc=f"X fin | pushing to github/{branch}", func=cmd( f"git push {r['remote/github']} {branch} --force" if ("--test" not in argv) @@ -566,12 +561,17 @@ def main() -> None: ), ) - step( - desc="X fin\tcleanup", - func=lambda: None, - ) - - print("\n--- done! ☆*: .。. o(≧▽≦)o .。.:*☆ ---\n", file=stderr) + cumulative_end_time = time() + time_taken = cumulative_end_time - cumulative_start_time + time_taken_string: str + if time_taken > 60: + time_taken_string = f"{int(time_taken // 60)}′{int(time_taken % 60)}″" + else: + time_taken_string = f"{time_taken:.2f}″" + print( + f"\n--- done! took {time_taken_string}~ " "☆*: .。. o(≧▽≦)o .。.:*☆ ---", + flush=True, + ) if __name__ == "__main__":