docs,tooling: cooler restepper + sidestepper

- docs: added detailed docs on restepper and sidestepper
- re/sidestepper: command invocations are safer
- re/sidestepper: option to automatically install dependencies
- sidestepper: behaviour is now correct + multiprocessing option
- restepper: rely on sidestepper for a few functions
- restepper: multithreaded repo duplication
- restepper: chunk filtering into a single command
Mark Joshwel 2024-07-15 03:54:14 +08:00
parent f572cef7b3
commit 8966008025
3 changed files with 910 additions and 280 deletions

README.md (121 lines changed)

@@ -14,14 +14,16 @@ Submission Mirror: <https://github.com/markjoshwel/sota>
| sai | lead 3d artist | quality checker | @sai-thinks | @sippy-thinks |
- [Handbook](#handbook)
  - [on 3D Modelling (Maya, Blender, ZBrush, etc.)](#on-3d-modelling-maya-blender-zbrush-etc)
  - [on Graphic and UI/UX Design](#on-graphic-and-uiux-design)
  - [on Game Development](#on-game-development)
  - [on Game and Level Design](#on-game-and-level-design)
  - [on Documentation (for All Modules)](#on-documentation-for-all-modules)
  - [on Repository Syncing](#on-repository-syncing)
    - [Syncing to GitHub via ReStepper](#syncing-to-github-via-restepper)
    - [SideStepper and the .sotaignore file](#sidestepper-and-the-sotaignore-file)
- [Licence and Credits](#licence-and-credits)
  - [Third-party Licences](#third-party-licences)
## Handbook
@@ -33,7 +35,7 @@ Submission Mirror: <https://github.com/markjoshwel/sota>
design-as-in-modelling your assets with modularity in mind, anything that can
be modular should be modular
design-as-in-look should be checked with the group
structure your files similarly:
@@ -63,7 +65,8 @@ Modelling
|:----:|:----:|
if it involves the brand:
follow the brand guidelines
at [Documentation/sota staircase Brand Guidelines.pdf](Documentation/sota%20staircase%20Brand%20Guidelines.pdf)
and then send it to mark for approval (●'◡'●)
@@ -105,7 +108,8 @@ on [the forge](https://forge.joshwel.co/mark/sota/issues)
| Lead | kinda everyone more so mark |
|:----:|:---------------------------:|
follow the brand guidelines
at [Documentation/sota staircase Brand Guidelines.pdf](Documentation/sota%20staircase%20Brand%20Guidelines.pdf)
source files (.docx, .fig, etc.) should be in the respective modules' directory,
and then exported as .pdfs to `Documentation/*.pdf`
@@ -115,18 +119,89 @@ and then exported as .pdfs to `Documentation/*.pdf`
| Wizard | Mark |
|:------:|:----:|
#### Syncing to GitHub via ReStepper
instructions:
```text
python sync.py
```
if it screams at you, fix them
if it breaks, refer to the resident "wizard"
for what the script does, see the script itself: [sync.py](sync.py)
##### Advanced Usage
you probably don't need to ever use these :p
the following environment variables can be set:
- `SOTA_SIDESTEP_MAX_WORKERS`
how many workers to use for repository duplication,
default is how many cpu threads are available
the following command line arguments can be used:
- `--skipsotaignoregen`
skips generating a `.sotaignore` file,
useful if you _know_ you've already generated one beforehand
- `--test`
does everything except actually pushing to github
there's more, but god forbid you need to use them unless you're changing the script,
search for `argv` in the script if you're comfortable with dragons
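
putting those together: a dry run that reuses an existing `.sotaignore` and caps
repository duplication at four workers might look like this (a unix-style shell is
assumed here; set the variable however your shell likes):

```text
SOTA_SIDESTEP_MAX_WORKERS=4 python sync.py --skipsotaignoregen --test
```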
#### SideStepper and the .sotaignore file
the `.sotaignore` file tells the sync script what to ignore when syncing to github,
and lives in the root of the repository
it is automatically generated by the sync script and should not be manually edited
unless there's a file we want to exclude
any file over 100MB is automatically added when running ReStepper (the sync script) or
SideStepper (the script that generates the `.sotaignore` file)
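for illustration, a generated `.sotaignore` might look like this (the paths below
are made up):

```text
# .sotaignore file generated by sota staircase ReStepper/SideStepper
# anything here either can't or shouldn't be uploaded to github
# unless you know what you're doing, don't edit this file! >:(
Game/Assets/Scenes/BigLevel/Lightmap-0_comp_dir.exr
Game/Assets/Audio/sota_ost_master.wav
```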
to manually generate this without syncing, run:
```text
python sidestepper.py
```
we may or may not want to add the contents of the `.sotaignore` file to the `.gitignore`,
but that's probably better decided on a case-by-case basis
for what the script does, see the script itself: [sidestepper.py](sidestepper.py)
##### Advanced Usage
you probably don't need to ever use these :p
the following environment variables can be set:
- `SOTA_SIDESTEP_CHUNK_SIZE`
how many files to chunk for file finding, default is 16
- `SOTA_SIDESTEP_MAX_WORKERS`
how many workers to use for file finding,
default is how many cpu threads are available
- `SOTA_SIDESTEP_PARALLEL`
whether to use multiprocessing for large file finding, default is false
hilariously it's ~4-5x slower than single-threaded file finding, but the option
is still present because it was made before the fourth implementation of
the large file finding algorithm
(now called SideStepper because names are fun, sue me)
the following command line arguments can be used:
- `--parallel`
same behaviour as setting the `SOTA_SIDESTEP_PARALLEL` environment variable
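
as an example, a parallel run with a smaller chunk size (unix-style shell assumed):

```text
SOTA_SIDESTEP_CHUNK_SIZE=8 python sidestepper.py --parallel
```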
## Licence and Credits
"NP resources" hereby refers to resources provided by Ngee Ann Polytechnic (NP) for the
@ -145,15 +220,15 @@ development of the project
specifically coming from, or in part have had the following software and/or
services involved:
- Autodesk Maya
- Adobe Substance 3D
- Substance 3D Modeler
- Substance 3D Sampler
- Substance 3D Designer
- Substance 3D Painter
- Substance 3D Stager
- Substance 3D Assets
- Autodesk Maya
- Adobe Substance 3D
- Substance 3D Modeler
- Substance 3D Sampler
- Substance 3D Designer
- Substance 3D Painter
- Substance 3D Stager
- Substance 3D Assets
would be all rights reserved, unless otherwise stated
(_i mean mr q said this already lol_)
@ -186,6 +261,6 @@ exceptions to the above licences are as follows:
> Example:
>
> - Frogman by Frog Creator: Standard Unity Asset Store EULA (Extension Asset)
> `Assets/Characters/Frogman`
> `Assets/Characters/Frogman`
>
> comma-separate multiple licences, and use code blocks if you need to list multiple files/directories/patterns

sidestepper.py (new file, 555 lines)

@@ -0,0 +1,555 @@
# sota staircase SideStepper
# a somewhat fast .gitignore-respecting large file finder
# licence: 0BSD
from dataclasses import dataclass
from functools import cache
from multiprocessing import Manager, cpu_count
# noinspection PyProtectedMember
from multiprocessing.managers import ListProxy
from os import getenv
from os.path import abspath
from pathlib import Path
from subprocess import CompletedProcess
from subprocess import run as _run
from sys import argv, executable, stderr
from textwrap import indent
from time import time
from traceback import format_tb
from typing import Final, Generator, Generic, Iterable, Iterator, NamedTuple, TypeVar
# constants
INDENT = " "
REPO_DIR: Final[Path] = Path(__file__).parent
REPO_SOTAIGNORE: Final[Path] = REPO_DIR.joinpath(".sotaignore")
_SOTA_SIDESTEP_CHUNK_SIZE = getenv("SOTA_SIDESTEP_CHUNK_SIZE")  # name as documented in the README
SOTA_SIDESTEP_CHUNK_SIZE: Final[int] = (
int(_SOTA_SIDESTEP_CHUNK_SIZE)
if (
(_SOTA_SIDESTEP_CHUNK_SIZE is not None)
and (_SOTA_SIDESTEP_CHUNK_SIZE.isdigit())
)
else 16
)
_SOTA_SIDESTEP_MAX_WORKERS = getenv("SOTA_SIDESTEP_MAX_WORKERS")
SOTA_SIDESTEP_MAX_WORKERS: Final[int] = (
int(_SOTA_SIDESTEP_MAX_WORKERS)
if (
(_SOTA_SIDESTEP_MAX_WORKERS is not None)
and (_SOTA_SIDESTEP_MAX_WORKERS.isdigit())
)
else cpu_count()
)
SOTA_SIDESTEP_LARGE_FILE_SIZE: Final[int] = 100000000 # 100mb
SOTA_SIDESTEP_PARALLEL: Final[bool] = getenv("SOTA_SIDESTEP_PARALLEL") is not None
# define these before importing third-party modules because we use them in the import check
def generate_command_failure_message(cp: CompletedProcess) -> str:
return "\n".join(
[
f"\n\nfailure: command '{cp.args}' failed with exit code {cp.returncode}",
f"{INDENT}stdout:",
(
indent(text=cp.stdout.decode(), prefix=f"{INDENT}{INDENT}")
if (isinstance(cp.stdout, bytes) and (cp.stdout != b""))
else f"{INDENT}{INDENT}(no output)"
),
f"{INDENT}stderr:",
(
indent(text=cp.stderr.decode(), prefix=f"{INDENT}{INDENT}")
if (isinstance(cp.stderr, bytes) and (cp.stderr != b""))
else f"{INDENT}{INDENT}(no output)"
)
+ "\n",
]
)
def run(
command: str | list,
cwd: Path | str | None = None,
capture_output: bool = True,
give_input: str | None = None,
) -> CompletedProcess:
"""
exception-safe-ish wrapper around subprocess.run()
args:
command: str | list
the command to run
cwd: Path | str | None = None
the working directory
        capture_output: bool = True
            whether to capture the output
        give_input: str | None = None
            text to pipe to the command's stdin, if any
returns: CompletedProcess
the return object from subprocess.run()
"""
# noinspection PyBroadException
try:
cp = _run(
command,
            shell=isinstance(command, str),  # strings go through the shell; argument lists run directly
cwd=cwd,
capture_output=capture_output,
input=give_input.encode() if give_input else None,
)
except Exception as run_exc:
print(
f"\n\nfailure: command '{command}' failed with exception",
f"{INDENT}{run_exc.__class__.__name__}: {run_exc}",
indent(text="\n".join(format_tb(run_exc.__traceback__)), prefix=INDENT),
sep="\n",
)
exit(-1)
return cp
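# example (hypothetical invocation): run("git status --porcelain", cwd=REPO_DIR)
# returns a CompletedProcess with captured stdout/stderr, or exits on failure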
# attempt to import third-party modules
# if they're not installed, prompt the user to optionally install them automatically
_could_not_import: list[str] = []
_could_not_import_exc: Exception | None = None
try:
from gitignore_parser import IgnoreRule, rule_from_pattern # type: ignore
except ImportError as _import_exc:
_could_not_import.append("gitignore_parser")
_could_not_import_exc = _import_exc
try:
# noinspection PyUnresolvedReferences
from tqdm import tqdm
# noinspection PyUnresolvedReferences
from tqdm.contrib.concurrent import process_map
except ImportError as _import_exc:
_could_not_import.append("tqdm")
_could_not_import_exc = _import_exc
if _could_not_import:
for module in _could_not_import:
print(
f"critical error: '{module}' is not installed, "
f"please run 'pip install {module}' to install it",
)
# install the missing modules
if input("\ninstall these with pip? y/n: ").lower() == "y":
print("installing...", end="", flush=True)
_cp = run([executable, "-m", "pip", "install", *_could_not_import])
if _cp.returncode != 0:
print(generate_command_failure_message(_cp))
exit(-1)
print(" done", flush=True)
# check if they were installed successfully
_cp = run(
[
executable,
"-c",
";".join([f"import {module}" for module in _could_not_import]),
]
)
if _cp.returncode != 0:
print(generate_command_failure_message(_cp))
print(
"critical error: post-install check failed. reverting installation...",
end="",
flush=True,
)
_cp = run([executable, "-m", "pip", "uninstall", *_could_not_import, "-y"])
if _cp.returncode != 0:
print(generate_command_failure_message(_cp))
print(" done", flush=True)
exit(-1)
elif __name__ == "__main__":
# rerun the script if we're running as one
exit(
run(
[executable, Path(__file__).absolute(), *argv[1:]], capture_output=False
).returncode
)
else:
# we're being imported, raise an error
raise EnvironmentError(
"automatic dependency installation successful"
) from _could_not_import_exc
A = TypeVar("A")
B = TypeVar("B")
class OneSided(Generic[A, B], NamedTuple):
"""
generic tuple with two elements, a and b, given by a generator
in which element 'a' is a constant and b is from an iterable/iterator
"""
a: A
b: B
def one_sided(a: A, bbb: Iterable[B]) -> Iterator[OneSided[A, B]]:
"""
generator that yields OneSided instances with a constant 'a' element
and elements from the given iterable/iterator 'bbb' as the 'b' element
"""
for b in bbb:
yield OneSided(a, b)
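# e.g. list(one_sided(0, "ab")) yields OneSided(a=0, b='a'), OneSided(a=0, b='b');
# this lets process_map feed a constant context plus each item to a one-argument worker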
@dataclass(eq=True, frozen=True)
class SideStepIgnoreMatcher:
"""immutable gitignore matcher"""
root: Path
# (
# (.gitignore file directory path, (ignore rule, ...)),
# (.gitignore file directory path, (ignore rule, ...)),
# ...
# )
rules: tuple[tuple[Path, tuple[IgnoreRule, ...]], ...] = tuple()
def add_gitignore(self, gitignore: Path) -> "SideStepIgnoreMatcher":
"""returns a new SidestepIgnoreMatcher with rules from the given gitignore file"""
new_ruleset: list[IgnoreRule] = []
for line_no, line_text in enumerate(gitignore.read_text().splitlines()):
rule = rule_from_pattern(
pattern=line_text.rstrip("\n"),
base_path=Path(abspath(gitignore.parent)),
source=(gitignore, line_no),
)
if rule:
new_ruleset.append(rule)
return SideStepIgnoreMatcher(
root=self.root, rules=self.rules + ((gitignore.parent, tuple(new_ruleset)),)
)
def match(self, file: Path) -> bool:
"""returns True if the file is ignored by any of the rules in the gitignore files, False otherwise"""
matched = False
# check to see if the gitignore affects the file
for ignore_dir, ruleset in self.rules:
if str(ignore_dir) not in str(file):
continue
if not self._possibly_negated(ruleset):
matched = matched or any(r.match(file) for r in ruleset)
else:
for rule in reversed(ruleset):
if rule.match(file):
matched = matched or not rule.negation
return matched
def match_trytrytry(self, file: Path) -> Path | None:
"""
same as match, but also checks if the gitignore files ignore any parent directories;
horribly slow and dumb, thus the name 'trytrytry'
returns the ignored parent path if the file is ignored, None otherwise
"""
trytrytry: Path = file
while trytrytry != trytrytry.parent:
if self.match(trytrytry):
return trytrytry
if len(self.root.parts) == len(trytrytry.parts):
return None
trytrytry = trytrytry.parent
return None
@cache
def _possibly_negated(self, ruleset: tuple[IgnoreRule, ...]) -> bool:
return any(rule.negation for rule in ruleset)
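# example usage (hypothetical paths):
#     sim = SideStepIgnoreMatcher(root=REPO_DIR).add_gitignore(REPO_DIR / ".gitignore")
#     sim.match(REPO_DIR / "Library" / "cache.bin")  # True if some rule ignores it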
def _parallel() -> bool:
"""
helper function to determine if we should use multiprocessing;
    checks the environment variable SOTA_SIDESTEP_PARALLEL and the command line arguments
returns: bool
"""
if SOTA_SIDESTEP_PARALLEL:
return True
elif "--parallel" in argv:
return True
return False
def _iter_files(
target: Path,
pattern: str = "*",
) -> Generator[Path, None, None]:
"""
generator that yields files in the target directory excluding '.git/**'
args:
target: Path
the directory to search in
pattern: str = "*"
the file pattern to search for
yields: Path
file in the target directory
"""
repo_dir = target.joinpath(".git/")
for target_file in target.rglob(pattern):
if not target_file.is_file():
continue
if repo_dir in target_file.parents:
continue
yield target_file
def iter_files(target_dir: Path) -> tuple[list[Path], SideStepIgnoreMatcher]:
"""
get all non-git files and register .gitignore files
args:
target_dir: Path
the directory to search in
returns: tuple[list[Path], SideStepIgnoreMatcher]
list of all files in the target directory and a SideStepIgnoreMatcher instance
"""
all_files: list[Path] = []
sim = SideStepIgnoreMatcher(root=target_dir)
for file in tqdm(
_iter_files(target_dir),
desc="1 pre | finding large files - scanning (1/3)",
leave=False,
):
all_files.append(file)
if file.name == ".gitignore":
sim = sim.add_gitignore(file)
return all_files, sim
def _filter_sim_match(
os: OneSided[tuple[list[Path], SideStepIgnoreMatcher], Path],
) -> Path | None:
"""first filter pass function, thread-safe-ish"""
(ignore_dirs, sim), file = os.a, os.b
ignored = False
for ign_dir in ignore_dirs:
if str(ign_dir) in str(file):
ignored = True
break
if (not ignored) and ((ttt := sim.match_trytrytry(file)) is not None):
if ttt.is_dir() and ttt not in ignore_dirs:
ignore_dirs.append(ttt)
return None
return file
def _filter_ign_dirs_and_size(os: OneSided[list[Path], Path]) -> Path | None:
"""second filter pass function, thread-safe-ish"""
ignore_dirs, file = os.a, os.b
for ign_dir in ignore_dirs:
if str(ign_dir) in str(file):
return None
else:
# we're here because the file is not ignored by any of the rules
# (the 'else' clause is only executed if the for loop completes without breaking)
if file.stat().st_size > SOTA_SIDESTEP_LARGE_FILE_SIZE:
return file
return None
def _find_large_files_single(target: Path) -> list[Path]:
"""single-process implementation of find_large_files"""
files, sim = iter_files(target)
ignore_dirs: list[Path] = []
_files = []
for fsm_os in tqdm(
one_sided(a=(ignore_dirs, sim), bbb=files),
desc="1 pre | finding large files - iod-ttt file matching (2/3)",
leave=False,
total=len(files),
):
if f := _filter_sim_match(fsm_os):
_files.append(f)
large_files = []
for fds_os in tqdm(
one_sided(a=ignore_dirs, bbb=_files),
desc="1 pre | finding large files - dir rematching (3/3)",
leave=False,
total=len(_files),
):
if f := _filter_ign_dirs_and_size(fds_os):
large_files.append(f)
return large_files
def _find_large_files_parallel(target: Path) -> list[Path]:
"""multiprocess implementation of find_large_files"""
files, sim = iter_files(target)
manager = Manager()
ignore_dirs: ListProxy[Path] = manager.list()
_files: list[Path] = [
f
for f in process_map(
_filter_sim_match,
one_sided(a=(ignore_dirs, sim), bbb=files),
desc="1 pre | finding large files - iod-ttt file matching (2/3)",
leave=False,
chunksize=SOTA_SIDESTEP_CHUNK_SIZE,
max_workers=SOTA_SIDESTEP_MAX_WORKERS,
total=len(files),
)
if f is not None
]
return [
f
for f in process_map(
_filter_ign_dirs_and_size,
one_sided(a=ignore_dirs, bbb=_files),
desc="1 pre | finding large files - dir rematching (3/3)",
leave=False,
chunksize=SOTA_SIDESTEP_CHUNK_SIZE,
max_workers=SOTA_SIDESTEP_MAX_WORKERS,
            total=len(_files),
)
if f is not None
]
def find_large_files(target: Path) -> list[Path]:
"""
finds all files larger than a certain size in a directory;
uses SOTA_SIDESTEP_LARGE_FILE_SIZE as the size threshold
args:
target_dir: Path
the directory to search in
returns: list[Path]
list of large files
"""
if _parallel():
return _find_large_files_parallel(target)
else:
return _find_large_files_single(target)
def write_sotaignore(large_files: list[Path]) -> bool:
"""
writes out a .sotaignore file with a list of large files,
updating an existing one if already present
args:
large_files: list[Path]
list of large files
returns: bool
True if anything was written, False otherwise (no changes)
"""
if not large_files:
return False
old_sotaignore = (
REPO_SOTAIGNORE.read_text().strip().splitlines()
if REPO_SOTAIGNORE.exists()
else []
)
new_sotaignore = [ln for ln in old_sotaignore] + [
lf.relative_to(REPO_DIR).as_posix()
for lf in large_files
if lf.relative_to(REPO_DIR).as_posix() not in old_sotaignore
]
if new_sotaignore == old_sotaignore:
return False
# check if the sotaignore file starts with a comment
if new_sotaignore and not new_sotaignore[0].startswith("#"):
for line in [
"# .sotaignore file generated by sota staircase ReStepper/SideStepper",
"# anything here either can't or shouldn't be uploaded github",
"# unless you know what you're doing, don't edit this file! >:(",
][::-1]:
new_sotaignore.insert(0, line)
REPO_SOTAIGNORE.touch(exist_ok=True)
REPO_SOTAIGNORE.write_text("\n".join(new_sotaignore) + "\n")
return True
def main() -> None:
"""command-line entry function"""
print(
"\nsota staircase SideStepper",
f" repo root : {REPO_DIR.relative_to(Path.cwd())}",
(
f" .sotaignore : {REPO_SOTAIGNORE.relative_to(Path.cwd())} "
f"({'exists' if REPO_SOTAIGNORE.exists() else 'does not exist'})"
),
f" parallel? : {'yes' if _parallel() else 'no'}\n",
sep="\n",
file=stderr,
)
cumulative_start_time = time()
print(f"1/2{INDENT}finding large files... ", end="", file=stderr)
start_time = time()
large_files = find_large_files(REPO_DIR)
end_time = time()
    print(
        f"1/2{INDENT}finding large files... "
        f"done in {end_time - start_time:.2f}″ "
        f"(found {len(large_files)})",
        file=stderr,
    )
print(f"2/2{INDENT}writing .sotaignore file... ", end="", file=stderr)
start_time = time()
was_written = write_sotaignore(large_files)
end_time = time()
print(
("done" if was_written else "skipped") + f" in {end_time - start_time:.2f}\n",
file=stderr,
)
for file in large_files:
print(file.relative_to(REPO_DIR))
cumulative_end_time = time()
time_taken = cumulative_end_time - cumulative_start_time
time_taken_string: str
    if time_taken > 60:
        time_taken_string = f"{int(time_taken // 60)}′{int(time_taken % 60)}″"
    else:
        time_taken_string = f"{time_taken:.2f}″"
print(
f"\n--- done! took {time_taken_string}~ " "☆*: .。. o(≧▽≦)o .。.:*☆ ---",
flush=True,
file=stderr,
)
if __name__ == "__main__":
main()

sync.py (514 lines changed)

@@ -1,79 +1,132 @@
# sota staircase ReStepper
# forge -> github one-way repo sync script
# licence: 0BSD
from multiprocessing.pool import ThreadPool
from pathlib import Path
from pprint import pformat
from shutil import copy2, copytree
from subprocess import CompletedProcess
from subprocess import run as _run
from sys import argv, executable
from tempfile import TemporaryDirectory
from textwrap import indent
from time import time
from traceback import format_tb
from typing import Callable, Final, TypeVar
try:
    from sidestepper import (
        SOTA_SIDESTEP_MAX_WORKERS,
        find_large_files,
        generate_command_failure_message,
        run,
        write_sotaignore,
    )
except EnvironmentError:
    # specific error raised when third-party modules not found, but were automatically
    # installed, so we need to restart the script
    exit(_run([executable, Path(__file__).absolute(), *argv[1:]]).returncode)

# we can only guarantee third-party modules are installed after sidestepper
from tqdm import tqdm
# constants
INDENT: Final[str] = " "
REPO_DIR: Final[Path] = Path(__file__).parent
REPO_SOTAIGNORE: Final[Path] = REPO_DIR.joinpath(".sotaignore")
REPO_URL_GITHUB: Final[str] = "github.com/markjoshwel/sota"
REPO_URL_FORGE: Final[str] = "forge.joshwel.co/mark/sota"
COMMIT_MESSAGE: Final[str] = "chore(restep): sync with forge"
COMMIT_AUTHOR: Final[str] = "sota staircase ReStepper <ssrestepper@joshwel.co>"
NEUTERED_GITATTRIBUTES: Final[str] = (
"""# auto detect text files and perform lf normalization\n* text=auto\n"""
)
# generics because i <3 static types
R = TypeVar("R")

# dictionary to share state across steps
r: dict[str, str] = {}
class CopyHighway:
    """
    multithreaded file copying class that gives a copy2-like function
    for use with shutil.copytree(); also displays a progress bar
    """
def __init__(self, message: str, total: int):
"""
multithreaded file copying class that gives a copy2-like function
for use with shutil.copytree()
args:
message: str
message to display in the progress bar
total: int
total number of files to copy
"""
self.pool = ThreadPool(
processes=SOTA_SIDESTEP_MAX_WORKERS,
)
self.pbar = tqdm(
total=total,
desc=message,
unit=" files",
leave=False,
)
def callback(self, a: R):
self.pbar.update()
return a
def copy2(self, source: str, dest: str):
"""shutil.copy2()-like function for use with shutil.copytree()"""
self.pool.apply_async(copy2, args=(source, dest), callback=self.callback)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.pool.close()
self.pool.join()
self.pbar.close()
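# example usage (hypothetical paths; mirrors how main() drives it further below):
#     with CopyHighway("copying", total=len(list(src.rglob("*")))) as copier:
#         copytree(src=src, dst=dst, copy_function=copier.copy2, dirs_exist_ok=True)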
def _default_post_func(cp: R) -> R:
    """
    default post-call function for steps; does nothing

    for steps that return a CompletedProcess, this function will run the
    `_command_post_func` function

    args:
        cp: R
            return object from a step function

    returns: R
        the return object from the step function
    """
    if isinstance(cp, CompletedProcess):
        _command_post_func(cp)
    return cp
def _command_post_func(
    cp: CompletedProcess,
    fail_on_error: bool = True,
    quit_early: bool = False,
    quit_message: str = "the command gave unexpected output",
) -> CompletedProcess:
"""
default post-call function for command steps, checks if the command was
default post-call function for command steps; checks if the command was
successful and prints the output if it wasn't
if the command was successful, the stdout and stderr are stored in the
shared state dictionary r under 'stdout' and 'stderr' respectively
args:
rc: CompletedProcess
return object from subprocess.run
cp: CompletedProcess
return object from subprocess.run()
fail_on_error: bool
whether to fail on error
quit_early: bool
@@ -81,169 +134,87 @@ def _command_post_func(
quit_message: str
the message to print if quitting early
    returns: CompletedProcess
        the return object from subprocess.run()
"""
    if quit_early:
        print(f"\n\nfailure: {quit_message}\n")
    else:
        r["stdout"] = cp.stdout.decode() if isinstance(cp.stdout, bytes) else "\0"
        r["stderr"] = cp.stderr.decode() if isinstance(cp.stderr, bytes) else "\0"
        r["blank/stdout"] = "yes" if (r["stdout"].strip() == "") else ""
        r["blank/stderr"] = "yes" if (r["stderr"].strip() == "") else ""
        r["blank"] = "yes" if (r["blank/stdout"] and r["blank/stderr"]) else ""
        r["errored"] = "" if (cp.returncode == 0) else str(cp.returncode)
    # return if the command was successful
    # or if we're not failing on error
    if (cp.returncode == 0) or (not fail_on_error):
        return cp
    else:
        print(generate_command_failure_message(cp))
        exit(
            cp.returncode
            if (isinstance(cp.returncode, int) and cp.returncode != 0)
            else 1
        )
def post_filter_repo_check(cp: CompletedProcess) -> CompletedProcess:
    """
    post-call function for checking if git-filter-repo is installed
    and optionally installing it if it isn't
    """
    if cp.returncode == 0:
        return cp

    if input("git filter-repo is not installed, install it? y/n: ").lower() != "y":
        print(
            "install it using 'pip install git-filter-repo' "
            "or 'pipx install git-filter-repo'",
        )
        return cp

    # check if pipx is installed
    use_pipx = False
    check_pipx_cp = run(["pipx", "--version"])
    if check_pipx_cp.returncode == 0:
        use_pipx = True
    else:
        run([executable, "-m", "pip", "install", "pipx"])

        # double check
        check_pipx_cp = run(["pipx", "--version"])
        if check_pipx_cp.returncode == 0:
            use_pipx = True
        # if pipx still can't be found, might be some environment fuckery

    # install git-filter-repo
    pip_invocation: list[str] = ["pipx"] if use_pipx else [executable, "-m", "pip"]
    print(
        f"running '{' '.join([*pip_invocation, 'install', 'git-filter-repo'])}'... ",
        end="",
    )
    install_rc = run([*pip_invocation, "install", "git-filter-repo"])
    if install_rc.returncode != 0:
        print("error")
        _command_post_func(install_rc)
    else:
        print("done\n")

    # check if it is reachable
    if run(["git", "filter-repo", "--version"]).returncode != 0:
        # revert; -y avoids a confirmation prompt hanging while output is captured
        run([*pip_invocation, "uninstall", "git-filter-repo", "-y"])
        print(
            "failure: could not install git-filter-repo automatically. "
            "do it yourself o(*≧▽≦)ツ┏━┓"
        )
    return cp
def rewrite_gitattributes(target_dir: Path) -> None:
@@ -260,101 +231,94 @@ def rewrite_gitattributes(target_dir: Path) -> None:
repo_file.write_text(NEUTERED_GITATTRIBUTES, encoding="utf-8")
# helper function for running steps
def step(
    func: Callable[[], R],
    desc: str = "",
    post_func: Callable[[R], R] = _default_post_func,
    post_print: bool = True,
) -> R:
    """
    helper function for running steps

    args:
        desc: str
            description of the step
        func: Callable[[], R]
            function to run
        post_func: Callable[[R], R]
            post-function to run after func
        post_print: bool
            whether to print done after the step

    returns:
        R
            return object from func
    """
    # run the function
    if desc != "":
        print(f"{desc}..", end="", flush=True)
    start_time = time()
    try:
        cp = func()
    except Exception as exc:
        print(
            f"\n\nfailure running step: {exc} ({exc.__class__.__name__})",
            "\n".join(format_tb(exc.__traceback__)) + "\n",
            sep="\n",
        )
        exit(1)
    if desc != "":
        print(".", end="", flush=True)

    # run the post-function
    try:
        rp = post_func(cp)
    except Exception as exc:
        print(
            f"\n\nfailure running post-step: {exc} ({exc.__class__.__name__})",
            "\n".join(format_tb(exc.__traceback__)) + "\n",
            sep="\n",
        )
        exit(1)
    end_time = time()

    # yay
    if desc != "" and post_print:
        print(f" done in {end_time - start_time:.2f}″", flush=True)
    return rp
def post_remote_v(cp: CompletedProcess) -> CompletedProcess:
    """
    post-call function for 'git remote -v' command, parses the output and
    checks for the forge and github remotes, storing them in the shared state
    under 'remote/forge', 'remote/forge/url', 'remote/github', and
    'remote/github/url' respectively

    args:
        cp: CompletedProcess
            return object from subprocess.run()

    returns: CompletedProcess
        the return object from subprocess.run()
    """
    if not isinstance(cp.stdout, bytes):
        return _command_post_func(cp)

    for line in cp.stdout.decode().split("\n"):
        # github  https://github.com/markjoshwel/sota (fetch)
        # github  https://github.com/markjoshwel/sota (push)
        # origin  https://forge.joshwel.co/mark/sota.git (fetch)
        # origin  https://forge.joshwel.co/mark/sota.git (push)
        split_line = line.split(maxsplit=1)
        if len(split_line) < 2:
            continue

        # remote='origin' url='https://forge.joshwel.co/mark/sota.git (fetch)'
        remote, url = split_line

        # clean up the url
        if (REPO_URL_FORGE in url) or (REPO_URL_GITHUB in url):
@@ -369,7 +333,7 @@ def post_remote_v(rc: CompletedProcess) -> CompletedProcess:
r["remote/github"] = remote
r["remote/github/url"] = url
    return _command_post_func(cp)
def err(message: str, exc: Exception | None = None) -> None:
@@ -398,7 +362,6 @@ def err(message: str, exc: Exception | None = None) -> None:
)
)
        + (indent(text=pformat(r), prefix=INDENT) + "\n"),
        sep="\n",
    )
exit(1)
@@ -409,6 +372,7 @@ def main() -> None:
command line entry point
"""
cumulative_start_time = time()
with TemporaryDirectory(delete="--keep" not in argv) as dir_temp:
print(
"\nsota staircase ReStepper\n"
@@ -420,53 +384,76 @@
# helper partial function for command
        def cmd(
            command: str,
            wd: Path | str = dir_temp,
            capture_output: bool = True,
            give_input: str | None = None,
        ) -> Callable[[], CompletedProcess]:
            return lambda: run(
                command,
                cwd=wd,
                capture_output=capture_output,
                give_input=give_input,
            )
        step(
            func=cmd("git filter-repo --version"),
            post_func=post_filter_repo_check,
        )

        step(cmd("git status --porcelain", wd=REPO_DIR))
if (not r["blank"]) and ("--iknowwhatimdoing" not in argv):
err(
"critical error: repository is not clean, please commit changes first",
)
        if "--skipsotaignoregen" not in argv:
            print("1 pre | finding large files", end="", flush=True)
            start_time = time()
            large_files = find_large_files(REPO_DIR)
            end_time = time()
            print(
                "1 pre | finding large files... "
                f"done in {end_time - start_time:.2f}″ (found {len(large_files)})"
            )

            if large_files:
                start_time = time()
                was_written = step(
                    desc="2 pre | writing .sotaignore",
                    func=lambda: write_sotaignore(large_files),
                    post_func=lambda cp: cp,
                    post_print=False,
                )
                end_time = time()
                if was_written:
                    print(f" done in {end_time - start_time:.2f}″")
                else:
                    print(" not needed")

        print("3 pre | duplicating repo... pre-scanning", end="", flush=True)
        start_time = time()
        with CopyHighway(
            "3 pre | duplicating repo", total=len(list(REPO_DIR.rglob("*")))
        ) as copier:
            copytree(
                src=REPO_DIR,
                dst=dir_temp,
                copy_function=copier.copy2,
                dirs_exist_ok=True,
            )
        end_time = time()
        print(
            f"3 pre | duplicating repo... done in {end_time - start_time:.2f}″",
            flush=True,
        )
        step(cmd('python -c "import pathlib; print(pathlib.Path.cwd().absolute())"'))
        if str(Path(dir_temp).absolute()) != r["stdout"].strip():
            err(
                "critical error (whuh? internal?): "
                f"not inside the temp dir '{str(Path(dir_temp).absolute())}'"
            )
# check for forge and github remotes
@@ -478,31 +465,31 @@ def main() -> None:
err("critical error (whuh?): no forge remote found")
        # get the current branch
        step(cmd("git branch --show-current"))
        branch = r["stdout"].strip()
        if r.get("errored", "yes") or branch == "":
            err("critical error (whuh?): couldn't get current branch")

        step(cmd(f"git fetch {r['remote/forge']}"))
        step(cmd(f"git rev-list HEAD...{r['remote/forge']}/{branch} --count"))
        if (r.get("stdout", "").strip() != "0") and ("--dirty" not in argv):
            err(
                "critical error (whuh?): "
                "not up to date with forge... sync your changes first?"
            )
step(desc="3 lfs\tfetch lfs objects", func=cmd("git lfs fetch"))
step(desc="4 lfs | fetch lfs objects", func=cmd("git lfs fetch"))
step(
desc="4 lfs\tmigrating lfs objects",
desc="5 lfs | migrating lfs objects",
func=cmd(
'git lfs migrate export --everything --include="*" --remote=origin'
'git lfs migrate export --everything --include="*" --remote=origin',
give_input="y\n",
),
)
step(
desc="5 lfs\tuninstall lfs in repo",
desc="6 lfs | uninstall lfs in repo",
func=cmd("git lfs uninstall"),
)
@@ -511,42 +498,50 @@ def main() -> None:
)
if not r["blank"]:
            err(
                "critical error (whuh? internal?): "
                "lfs objects still exist post-migrate and uninstall"
            )
        if REPO_SOTAIGNORE.exists():
            try:
                sotaignore = REPO_SOTAIGNORE.read_text(encoding="utf-8").strip()
            except Exception as exc:
                err("critical error: couldn't read .sotaignore file", exc=exc)

            sotaignored_files: list[str] = [
                line
                for line in sotaignore.splitlines()
                if not line.startswith("#") and line.strip() != ""
            ]

            # FUTURE: if this becomes slow, start chunking --path arguments
            # https://stackoverflow.com/questions/43762338/how-to-remove-file-from-git-history
            step(
                desc=f"7 lfs | filtering {len(sotaignored_files)} file(s)",
                func=cmd(
                    "git filter-repo --force --invert-paths "
                    + " ".join(f'--path "{lf}"' for lf in sotaignored_files)
                ),
            )

            # also copy to the temp repo; step 5 (lfs migrate) wipes uncommitted changes
            copy2(REPO_SOTAIGNORE, Path(dir_temp).joinpath(".sotaignore"))
step(
desc="7 fin\tneuter .gitattributes",
desc="8 fin | neuter .gitattributes",
func=lambda: rewrite_gitattributes(Path(dir_temp)),
)
        def add_and_commit() -> CompletedProcess:
            cp = cmd("git add *")()
            if cp.returncode != 0:
                return cp
            return cmd(
                "git commit --allow-empty "
                f'-am "{COMMIT_MESSAGE}" --author="{COMMIT_AUTHOR}"',
            )()

        step(
            desc="9 fin | commit",
            func=add_and_commit,
        )
if r.get("remote/github") is None:
@@ -558,7 +553,7 @@ def main() -> None:
r["remote/github"] = "github"
step(
desc=f"9 fin\tpushing to github/{branch}",
desc=f"X fin | pushing to github/{branch}",
func=cmd(
f"git push {r['remote/github']} {branch} --force"
if ("--test" not in argv)
@@ -566,12 +561,17 @@
),
)
    cumulative_end_time = time()
    time_taken = cumulative_end_time - cumulative_start_time
    time_taken_string: str
    if time_taken > 60:
        time_taken_string = f"{int(time_taken // 60)}′{int(time_taken % 60)}″"
    else:
        time_taken_string = f"{time_taken:.2f}″"
    print(
        f"\n--- done! took {time_taken_string}~ " "☆*: .。. o(≧▽≦)o .。.:*☆ ---",
        flush=True,
    )
if __name__ == "__main__":