meta: adding sync

Sc0rch-thinks committed 2025-01-18 02:26:35 +08:00
parent cafdc99db1
commit a01fe1893c
6 changed files with 1389 additions and 0 deletions

.github/workflows/sync.yml (vendored, new file, +39)

@@ -0,0 +1,39 @@
name: "sync from forge.joshwel.co"
on:
workflow_dispatch:
schedule:
- cron: "0 * * * *" # every hour
permissions:
contents: write
jobs:
sync:
runs-on: ubuntu-latest
steps:
- uses: AdityaGarg8/remove-unwanted-software@v4.1
with:
remove-dotnet: 'true'
remove-android: 'true'
remove-haskell: 'true'
remove-codeql: 'true'
remove-docker-images: 'true'
remove-large-packages: 'true'
remove-cached-tools: 'true'
- uses: cachix/install-nix-action@v27
with:
nix_path: nixpkgs=channel:nixos-unstable
github_access_token: ${{ secrets.GITHUB_TOKEN }}
- uses: DeterminateSystems/magic-nix-cache-action@main
- name: clone forge.joshwel.co/Ryan/SSLR
run: |
mkdir -p ${{ runner.temp }}/SSLR
git clone https://forge.joshwel.co/Ryan/SSLR.git ${{ runner.temp }}/SSLR
- name: restep
env:
SS_RESTEPPER_TOKEN: ${{ secrets.PAT }}
run: |
cd ${{ runner.temp }}/SSLR
nix develop --command python sync.py

.gitignore (vendored, new file, +1)

@@ -0,0 +1 @@
__pycache__/

flake.lock (new file, +61)

@@ -0,0 +1,61 @@
{
"nodes": {
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1710146030,
"narHash": "sha256-SZ5L6eA7HJ/nmkzGG7/ISclqe6oZdOZTNoesiInkXPQ=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "b1d9ab70662946ef0850d488da1c9019f3a9752a",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1722062969,
"narHash": "sha256-QOS0ykELUmPbrrUGmegAUlpmUFznDQeR4q7rFhl8eQg=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "b73c2221a46c13557b1b3be9c2070cc42cf01eb3",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

flake.nix (new file, +28)

@@ -0,0 +1,28 @@
{
description = "flake for running the sota staircase ReStepper";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
flake-utils.url = "github:numtide/flake-utils";
};
outputs = { self, nixpkgs, flake-utils }:
flake-utils.lib.eachDefaultSystem (system:
let
pkgs = nixpkgs.legacyPackages.${system};
in
with pkgs; {
devShells.default = mkShellNoCC {
buildInputs = [
git
git-lfs
git-filter-repo
(python312.withPackages (python-pkgs: [
python-pkgs.tqdm
python-pkgs.gitignore-parser
]))
];
};
}
);
}
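# a small usage note: `nix develop` enters this shell locally; the sync
# workflow above runs `nix develop --command python sync.py` the same way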

sidestepper.py (new file, +616)

@@ -0,0 +1,616 @@
# sota staircase SideStepper
# a somewhat fast .gitignore-respecting large file finder
# licence: 0BSD
from dataclasses import dataclass
from functools import cache
from multiprocessing import Manager, cpu_count
# noinspection PyProtectedMember
from multiprocessing.managers import ListProxy
from os import getenv
from os.path import abspath
from pathlib import Path
from subprocess import CompletedProcess
from subprocess import run as _run
from sys import argv, executable, stderr
from textwrap import indent
from time import time
from traceback import format_tb
from typing import Final, Generator, Generic, Iterable, Iterator, NamedTuple, TypeVar
# constants
INDENT = " "
REPO_DIR: Final[Path] = Path(__file__).parent
REPO_SOTAIGNORE: Final[Path] = REPO_DIR.joinpath(".sotaignore")
_SOTA_SIDESTEP_CHUNK_SIZE = getenv("SIDESTEP_CHUNK_SIZE")
SOTA_SIDESTEP_CHUNK_SIZE: Final[int] = (
int(_SOTA_SIDESTEP_CHUNK_SIZE)
if (
(_SOTA_SIDESTEP_CHUNK_SIZE is not None)
and (_SOTA_SIDESTEP_CHUNK_SIZE.isdigit())
)
else 16
)
_SOTA_SIDESTEP_MAX_WORKERS = getenv("SIDESTEP_MAX_WORKERS")
SOTA_SIDESTEP_MAX_WORKERS: Final[int] = (
int(_SOTA_SIDESTEP_MAX_WORKERS)
if (
(_SOTA_SIDESTEP_MAX_WORKERS is not None)
and (_SOTA_SIDESTEP_MAX_WORKERS.isdigit())
)
else cpu_count()
)
SOTA_SIDESTEP_LARGE_FILE_SIZE: Final[int] = 100000000 # 100mb
SOTA_SIDESTEP_PARALLEL: Final[bool] = getenv("SIDESTEP_PARALLEL") is not None
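# the constants above are read from the environment at import time; a quick
# sketch of overriding the defaults when invoking the script (values are
# illustrative):
#   SIDESTEP_CHUNK_SIZE=32 SIDESTEP_MAX_WORKERS=4 SIDESTEP_PARALLEL=1 python sidestepper.py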
# define these before importing third-party modules because we use them in the import check
def generate_command_failure_message(cp: CompletedProcess) -> str:
return "\n".join(
[
f"\n\nfailure: command '{cp.args}' failed with exit code {cp.returncode}",
f"{INDENT}stdout:",
(
indent(text=cp.stdout.decode(), prefix=f"{INDENT}{INDENT}")
if (isinstance(cp.stdout, bytes) and (cp.stdout != b""))
else f"{INDENT}{INDENT}(no output)"
),
f"{INDENT}stderr:",
(
indent(text=cp.stderr.decode(), prefix=f"{INDENT}{INDENT}")
if (isinstance(cp.stderr, bytes) and (cp.stderr != b""))
else f"{INDENT}{INDENT}(no output)"
)
+ "\n",
]
)
def run(
command: str | list,
cwd: Path | str | None = None,
capture_output: bool = True,
give_input: str | None = None,
) -> CompletedProcess:
"""
exception-safe-ish wrapper around subprocess.run()
args:
command: str | list
the command to run
cwd: Path | str | None = None
the working directory
        capture_output: bool = True
            whether to capture the output
        give_input: str | None = None
            text to pass to the command's stdin, if any
returns: CompletedProcess
the return object from subprocess.run()
"""
# noinspection PyBroadException
try:
cp = _run(
command,
shell=False if isinstance(command, list) else True,
cwd=cwd,
capture_output=capture_output,
input=give_input.encode() if give_input else None,
)
except Exception as run_exc:
print(
f"\n\nfailure: command '{command}' failed with exception",
f"{INDENT}{run_exc.__class__.__name__}: {run_exc}",
indent(text="\n".join(format_tb(run_exc.__traceback__)), prefix=INDENT),
sep="\n",
)
exit(-1)
return cp
# attempt to import third-party modules
# if they're not installed, prompt the user to optionally install them automatically
_could_not_import: list[str] = []
_could_not_import_exc: Exception | None = None
try:
from gitignore_parser import IgnoreRule, rule_from_pattern # type: ignore
except ImportError as _import_exc:
_could_not_import.append("gitignore_parser")
_could_not_import_exc = _import_exc
try:
# noinspection PyUnresolvedReferences
from tqdm import tqdm
# noinspection PyUnresolvedReferences
from tqdm.contrib.concurrent import process_map
except ImportError as _import_exc:
_could_not_import.append("tqdm")
_could_not_import_exc = _import_exc
if _could_not_import:
for module in _could_not_import:
print(
f"critical error: '{module}' is not installed, "
f"please run 'pip install {module}' to install it",
)
# install the missing modules
if input("\ninstall these with pip? y/n: ").lower() == "y":
print("installing...", end="", flush=True)
_cp = run([executable, "-m", "pip", "install", *_could_not_import])
if _cp.returncode != 0:
print(generate_command_failure_message(_cp))
exit(-1)
print(" done", flush=True)
# check if they were installed successfully
_cp = run(
[
executable,
"-c",
";".join([f"import {module}" for module in _could_not_import]),
]
)
if _cp.returncode != 0:
print(generate_command_failure_message(_cp))
print(
"critical error: post-install check failed. reverting installation...",
end="",
flush=True,
)
_cp = run([executable, "-m", "pip", "uninstall", *_could_not_import, "-y"])
if _cp.returncode != 0:
print(generate_command_failure_message(_cp))
print(" done", flush=True)
exit(-1)
elif __name__ == "__main__":
# rerun the script if we're running as one
exit(
run(
[executable, Path(__file__).absolute(), *argv[1:]], capture_output=False
).returncode
)
else:
# we're being imported, raise an error
raise EnvironmentError(
"automatic dependency installation successful"
) from _could_not_import_exc
A = TypeVar("A")
B = TypeVar("B")
class OneSided(Generic[A, B], NamedTuple):
"""
generic tuple with two elements, a and b, given by a generator
in which element 'a' is a constant and b is from an iterable/iterator
"""
a: A
b: B
def one_sided(a: A, bbb: Iterable[B]) -> Iterator[OneSided[A, B]]:
"""
generator that yields OneSided instances with a constant 'a' element
and elements from the given iterable/iterator 'bbb' as the 'b' element
"""
for b in bbb:
yield OneSided(a, b)
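# a minimal sketch of what one_sided() yields, pairing a constant with each
# element of an iterable:
#   list(one_sided("k", [1, 2, 3]))
#   # -> [OneSided(a='k', b=1), OneSided(a='k', b=2), OneSided(a='k', b=3)]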
def generate_time_elapsed_string(time_taken: float) -> str:
"""generates a human-readable time-elapsed string from a time taken float"""
hours = int(time_taken // 3600)
minutes = int(time_taken % 3600 // 60)
seconds = int(time_taken % 60)
time_taken_string: str
    if time_taken > 3600:
        time_taken_string = f"{hours}h {minutes}m {seconds}s"
    elif time_taken > 60:
        time_taken_string = f"{minutes}m {seconds}s"
    else:
        time_taken_string = f"{time_taken:.2f}s"
return time_taken_string
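# e.g. generate_time_elapsed_string(42.5) -> "42.50s"
#      generate_time_elapsed_string(3725.0) -> "1h 2m 5s"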
@dataclass(eq=True, frozen=True)
class SideStepIgnoreMatcher:
"""immutable gitignore matcher"""
root: Path
# (
# (.gitignore file directory path, (ignore rule, ...)),
# (.gitignore file directory path, (ignore rule, ...)),
# ...
# )
rules: tuple[tuple[Path, tuple[IgnoreRule, ...]], ...] = tuple()
def add_gitignore(self, gitignore: Path) -> "SideStepIgnoreMatcher":
"""returns a new SidestepIgnoreMatcher with rules from the given gitignore file"""
new_ruleset: list[IgnoreRule] = []
for line_no, line_text in enumerate(gitignore.read_text().splitlines()):
rule = rule_from_pattern(
pattern=line_text.rstrip("\n"),
base_path=Path(abspath(gitignore.parent)),
source=(gitignore, line_no),
)
if rule:
new_ruleset.append(rule)
return SideStepIgnoreMatcher(
root=self.root, rules=self.rules + ((gitignore.parent, tuple(new_ruleset)),)
)
def match(self, file: Path | str) -> bool:
"""returns True if the file is ignored by any of the rules in the gitignore files, False otherwise"""
matched = False
# check to see if the gitignore affects the file
for ignore_dir, ruleset in self.rules:
if str(ignore_dir) not in str(file):
continue
if not self._possibly_negated(ruleset):
matched = matched or any(r.match(file) for r in ruleset)
else:
for rule in reversed(ruleset):
if rule.match(file):
matched = matched or not rule.negation
return matched
def match_trytrytry(self, file: Path) -> Path | None:
"""
same as match, but also checks if the gitignore files ignore any parent directories;
horribly slow and dumb, thus the name 'trytrytry'
returns the ignored parent path if the file is ignored, None otherwise
"""
trytrytry: Path = file
while trytrytry != trytrytry.parent:
if self.match(trytrytry):
return trytrytry
if len(self.root.parts) == len(trytrytry.parts):
return None
trytrytry = trytrytry.parent
return None
@cache
def _possibly_negated(self, ruleset: tuple[IgnoreRule, ...]) -> bool:
return any(rule.negation for rule in ruleset)
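# a usage sketch (hypothetical paths): the matcher is immutable, so each
# add_gitignore() call returns a new instance
#   sim = SideStepIgnoreMatcher(root=REPO_DIR)
#   sim = sim.add_gitignore(REPO_DIR / ".gitignore")
#   sim.match(REPO_DIR / "build" / "big.bin")            # True if a rule ignores it
#   sim.match_trytrytry(REPO_DIR / "build" / "big.bin")  # ignored ancestor path or None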
@dataclass(eq=True, frozen=True)
class LargeFileFilterResult:
"""
result data structure of the large file filter
files: tuple[Path, ...]
large files found
matcher: SideStepIgnoreMatcher
the *ignore matcher instance
ignore_directories: tuple[Path, ...]
directories that were ignored
"""
files: tuple[Path, ...]
matcher: SideStepIgnoreMatcher
ignore_directories: tuple[Path, ...]
def _parallel() -> bool:
"""
helper function to determine if we should use multiprocessing;
checks the environment variable SIDESTEP_PARALLEL and the command line arguments
returns: bool
"""
if SOTA_SIDESTEP_PARALLEL:
return True
elif "--parallel" in argv:
return True
return False
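# i.e. both `SIDESTEP_PARALLEL=1 python sidestepper.py` and
# `python sidestepper.py --parallel` enable the multiprocessing code path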
def _iter_files(
target: Path,
pattern: str = "*",
) -> Generator[Path, None, None]:
"""
generator that yields files in the target directory excluding '.git/**'
args:
target: Path
the directory to search in
pattern: str = "*"
the file pattern to search for
yields: Path
file in the target directory
"""
repo_dir = target.joinpath(".git/")
for target_file in target.rglob(pattern):
if not target_file.is_file():
continue
if repo_dir in target_file.parents:
continue
yield target_file
def iter_files(target_dir: Path) -> tuple[tuple[Path, ...], SideStepIgnoreMatcher]:
"""
get all non-git files and register .gitignore files
args:
target_dir: Path
the directory to search in
returns: tuple[tuple[Path, ...], SideStepIgnoreMatcher]
tuple of all files in the target directory and a SideStepIgnoreMatcher instance
"""
all_files: list[Path] = []
sim = SideStepIgnoreMatcher(root=target_dir)
for file in tqdm(
_iter_files(target_dir),
desc="1 pre | finding large files - scanning (1/3)",
leave=False,
):
all_files.append(file)
if file.name == ".gitignore":
sim = sim.add_gitignore(file)
return tuple(all_files), sim
def _filter_sim_match(
os: OneSided[tuple[list[Path], SideStepIgnoreMatcher], Path],
) -> Path | None:
"""first filter pass function, thread-safe-ish"""
(ignore_dirs, sim), file = os.a, os.b
ignored = False
for ign_dir in ignore_dirs:
if str(ign_dir) in str(file):
ignored = True
break
if (not ignored) and ((ttt := sim.match_trytrytry(file)) is not None):
if ttt.is_dir() and ttt not in ignore_dirs:
ignore_dirs.append(ttt)
return None
return file
def _filter_ign_dirs_and_size(os: OneSided[list[Path], Path]) -> Path | None:
"""second filter pass function, thread-safe-ish"""
ignore_dirs, file = os.a, os.b
for ign_dir in ignore_dirs:
if str(ign_dir) in str(file):
return None
else:
# we're here because the file is not ignored by any of the rules
# (the 'else' clause is only executed if the for loop completes without breaking)
if file.stat().st_size > SOTA_SIDESTEP_LARGE_FILE_SIZE:
return file
return None
def _find_large_files_single(
files: tuple[Path, ...], sim: SideStepIgnoreMatcher
) -> LargeFileFilterResult:
"""single-process implementation of find_large_files"""
ignore_dirs: list[Path] = []
_files = []
for fsm_os in tqdm(
one_sided(a=(ignore_dirs, sim), bbb=files),
desc="1 pre | finding large files - iod-ttt file matching (2/3)",
leave=False,
total=len(files),
):
if f := _filter_sim_match(fsm_os):
_files.append(f)
large_files = []
for fds_os in tqdm(
one_sided(a=ignore_dirs, bbb=_files),
desc="1 pre | finding large files - dir rematching (3/3)",
leave=False,
total=len(_files),
):
f = _filter_ign_dirs_and_size(fds_os)
if f is not None:
large_files.append(f)
return LargeFileFilterResult(
files=tuple(large_files),
matcher=sim,
ignore_directories=tuple(ignore_dirs),
)
def _find_large_files_parallel(
files: tuple[Path, ...], sim: SideStepIgnoreMatcher
) -> LargeFileFilterResult:
"""multiprocess implementation of find_large_files"""
manager = Manager()
ignore_dirs: ListProxy[Path] = manager.list()
_files: list[Path] = [
f
for f in process_map(
_filter_sim_match,
one_sided(a=(ignore_dirs, sim), bbb=files),
desc="1 pre | finding large files - iod-ttt file matching (2/3)",
leave=False,
chunksize=SOTA_SIDESTEP_CHUNK_SIZE,
max_workers=SOTA_SIDESTEP_MAX_WORKERS,
total=len(files),
)
if f is not None
]
large_files: tuple[Path, ...] = tuple(
[
f
for f in process_map(
_filter_ign_dirs_and_size,
one_sided(a=ignore_dirs, bbb=_files),
desc="1 pre | finding large files - dir rematching (3/3)",
leave=False,
chunksize=SOTA_SIDESTEP_CHUNK_SIZE,
max_workers=SOTA_SIDESTEP_MAX_WORKERS,
            total=len(_files),  # the second pass iterates _files, not files
)
if f is not None
]
)
return LargeFileFilterResult(
files=large_files,
matcher=sim,
ignore_directories=tuple(ignore_dirs),
)
def find_large_files(
files: tuple[Path, ...], matcher: SideStepIgnoreMatcher
) -> LargeFileFilterResult:
"""
finds all files larger than a certain size in a directory;
uses SOTA_SIDESTEP_LARGE_FILE_SIZE as the size threshold
args:
files: tuple[Path, ...]
list of files to search through
matcher: SideStepIgnoreMatcher
the ignore matcher instance from iter_files()
returns: LargeFileFilterResult
"""
if _parallel():
return _find_large_files_parallel(files, matcher)
else:
return _find_large_files_single(files, matcher)
def write_sotaignore(large_files: tuple[Path, ...]) -> bool:
"""
writes out a .sotaignore file with a list of large files,
updating an existing one if already present
args:
large_files: list[Path]
list of large files
returns: bool
True if anything was written, False otherwise (no changes)
"""
if not large_files:
return False
old_sotaignore = (
REPO_SOTAIGNORE.read_text().strip().splitlines()
if REPO_SOTAIGNORE.exists()
else []
)
new_sotaignore = [ln for ln in old_sotaignore] + [
lf.relative_to(REPO_DIR).as_posix()
for lf in large_files
if lf.relative_to(REPO_DIR).as_posix() not in old_sotaignore
]
if new_sotaignore == old_sotaignore:
return False
# check if the sotaignore file starts with a comment
if new_sotaignore and not new_sotaignore[0].startswith("#"):
for line in [
"# .sotaignore file generated by sota staircase ReStepper/SideStepper",
"# anything here either can't or shouldn't be uploaded github",
"# unless you know what you're doing, don't edit this file! >:(",
][::-1]:
new_sotaignore.insert(0, line)
REPO_SOTAIGNORE.touch(exist_ok=True)
REPO_SOTAIGNORE.write_text("\n".join(new_sotaignore) + "\n")
return True
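# an illustrative .sotaignore after a run (file paths here are hypothetical):
#   # .sotaignore file generated by sota staircase ReStepper/SideStepper
#   # anything here either can't or shouldn't be uploaded to github
#   # unless you know what you're doing, don't edit this file! >:(
#   assets/video/intro.mp4
#   build/world.blend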
def main() -> None:
"""command-line entry function"""
print(
"\nsota staircase SideStepper",
f" repo root : {REPO_DIR.relative_to(Path.cwd())}",
(
f" .sotaignore : {REPO_SOTAIGNORE.relative_to(Path.cwd())} "
f"({'exists' if REPO_SOTAIGNORE.exists() else 'does not exist'})"
),
f" parallel? : {'yes' if _parallel() else 'no'}\n",
sep="\n",
file=stderr,
)
cumulative_start_time = time()
print(f"1/3{INDENT}pre-scanning repository... ", end="", file=stderr)
start_time = time()
files, sim = iter_files(REPO_DIR)
end_time = time()
print(
f"1/3{INDENT}pre-scanning repository... "
f"done in {generate_time_elapsed_string(end_time - start_time)} "
f"(found {len(files)})",
file=stderr,
)
print(f"2/3{INDENT}finding large files... ", end="", file=stderr)
start_time = time()
large_files = find_large_files(files, sim).files
end_time = time()
print(
f"2/3{INDENT}finding large files... "
f"done in {generate_time_elapsed_string(end_time - start_time)} "
f"(found {len(large_files)})",
file=stderr,
)
print(f"3/3{INDENT}writing .sotaignore file... ", end="", file=stderr)
start_time = time()
was_written = write_sotaignore(large_files)
end_time = time()
print(
("done" if was_written else "skipped")
+ f" in {generate_time_elapsed_string(end_time - start_time)}\n",
file=stderr,
)
for file in large_files:
print(file.relative_to(REPO_DIR).as_posix())
cumulative_end_time = time()
print(
f"\n--- done! took {generate_time_elapsed_string(cumulative_end_time - cumulative_start_time)}~ "
"☆*: .。. o(≧▽≦)o .。.:*☆ ---",
flush=True,
file=stderr,
)
if __name__ == "__main__":
main()

sync.py (new file, +644)

@@ -0,0 +1,644 @@
# sota staircase ReStepper
# forge -> github one-way repo sync script
# licence: 0BSD
from multiprocessing.pool import ThreadPool
from os import getenv
from pathlib import Path
from pprint import pformat
from shutil import copy2, copytree
from subprocess import CompletedProcess
from subprocess import run as _run
from sys import argv, executable
from tempfile import TemporaryDirectory
from textwrap import indent
from time import time
from traceback import format_tb
from typing import Callable, Final, TypeVar
try:
from sidestepper import (
SOTA_SIDESTEP_MAX_WORKERS,
LargeFileFilterResult,
find_large_files,
generate_command_failure_message,
generate_time_elapsed_string,
iter_files,
run,
write_sotaignore,
)
except EnvironmentError:
# specific error raised when third-party modules not found, but were automatically
# installed, so we need to restart the script
exit(_run([executable, Path(__file__).absolute(), *argv[1:]]).returncode)
# we can only guarantee third-party modules are installed after sidestepper
from tqdm import tqdm
# constants
INDENT: Final[str] = " "
REPO_DIR: Final[Path] = Path(__file__).parent
REPO_SOTAIGNORE: Final[Path] = REPO_DIR.joinpath(".sotaignore")
REPO_URL_GITHUB: Final[str] = "github.com/Sc0rch-thinks/sslr"
REPO_URL_FORGE: Final[str] = "forge.joshwel.co/Ryan/SSLR"
COMMIT_MESSAGE: Final[str] = "chore(restep): sync with forge"
COMMIT_AUTHOR: Final[str] = "sota staircase ReStepper <ssrestepper@joshwel.co>"
NEUTERED_GITATTRIBUTES: Final[str] = (
"""# auto detect text files and perform lf normalization\n* text=auto\n"""
)
GH_ACT: Final[bool] = getenv("GITHUB_ACTIONS", "").lower() == "true"
GH_TOKEN: Final[str] = getenv("SS_RESTEPPER_TOKEN", "")
if GH_ACT and GH_TOKEN == "":
print(
"critical error: no personal access token found in SS_RESTEP_TOKEN, "
"may not have permission to push to github"
)
exit(1)
# dictionary to share state across steps
r: dict[str, str] = {}
R = TypeVar("R")
class CopyHighway:
"""
multithreaded file copying class that gives a copy2-like function
for use with shutil.copytree(); also displays a progress bar
"""
pool: ThreadPool
pbar: tqdm
lff_result: LargeFileFilterResult | None
respect_ignore: bool = True
def __init__(
self, message: str, total: int, lff_result: LargeFileFilterResult | None
):
"""
multithreaded file copying class that gives a copy2-like function
for use with shutil.copytree()
args:
message: str
message to display in the progress bar
total: int
total number of files to copy
lff_result: LargeFileFilterResult
result of the large file filter
"""
self.pool = ThreadPool(
processes=SOTA_SIDESTEP_MAX_WORKERS,
)
self.pbar = tqdm(
total=total,
desc=message,
unit=" files",
leave=False,
)
self.lff_result = lff_result
        self.respect_ignore = "--dupethelongway" not in argv
def callback(self, a: R):
self.pbar.update()
return a
def copy2(self, source: Path | str, dest: Path | str) -> None:
"""shutil.copy2()-like function for use with shutil.copytree()"""
if self.respect_ignore and (self.lff_result is not None):
# ignore check 1: dir
for ign_dir in self.lff_result.ignore_directories:
if str(ign_dir) in str(source):
self.pbar.update()
return None
# ignore check 2: file
# ... we don't need to use the trytrytry method
# ... because we already did that as part of the large file filter,
# ... and as such we checked for it with the first check above
if self.lff_result.matcher.match(source):
self.pbar.update()
return None
self.pool.apply_async(copy2, args=(source, dest), callback=self.callback)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.pool.close()
self.pool.join()
self.pbar.close()
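# a usage sketch: CopyHighway plugs into shutil.copytree() as its
# copy_function, mirroring how main() uses it below (paths hypothetical):
#   with CopyHighway("copying", total=1234, lff_result=None) as copier:
#       copytree(src="/src/repo", dst="/tmp/dupe",
#                copy_function=copier.copy2, dirs_exist_ok=True)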
def _default_post_func(cp: R) -> R:
"""
default post-call function for steps; does nothing
for steps that return a CompletedProcess, this function will run the
`_command_post_func` function
args:
cp: R
return object from a step function
returns: R
the return object from the step function
"""
if isinstance(cp, CompletedProcess):
_command_post_func(cp)
return cp
def _command_post_func(
cp: CompletedProcess,
fail_on_error: bool = True,
quit_early: bool = False,
quit_message: str = "the command gave unexpected output",
) -> CompletedProcess:
"""
default post-call function for command steps; checks if the command was
successful and prints the output if it wasn't
if the command was successful, the stdout and stderr are stored in the
shared state dictionary r under 'stdout' and 'stderr' respectively
args:
cp: CompletedProcess
return object from subprocess.run()
fail_on_error: bool
whether to fail on error
quit_early: bool
whether to quit early
quit_message: str
the message to print if quitting early
returns: CompletedProcess
the return object from subprocess.run()
"""
if quit_early:
print(f"\n\nfailure: {quit_message}\n")
else:
r["stdout"] = cp.stdout.decode() if isinstance(cp.stdout, bytes) else "\0"
r["stderr"] = cp.stderr.decode() if isinstance(cp.stderr, bytes) else "\0"
r["blank/stdout"] = "yes" if (r["stdout"].strip() == "") else ""
r["blank/stderr"] = "yes" if (r["stderr"].strip() == "") else ""
r["blank"] = "yes" if (r["blank/stdout"] and r["blank/stderr"]) else ""
r["errored"] = "" if (cp.returncode == 0) else str(cp.returncode)
# return if the command was successful
# or if we're not failing on error
if (cp.returncode == 0) or (not fail_on_error):
return cp
else:
print(generate_command_failure_message(cp))
exit(
cp.returncode if (isinstance(cp.returncode, int) and cp.returncode != 0) else 1
)
def post_filter_repo_check(cp: CompletedProcess) -> CompletedProcess:
"""
post-call function for checking if git-filter-repo is installed
and optionally installing it if it isn't
"""
if cp.returncode == 0:
return cp
if input("git filter-repo is not installed, install it? y/n: ").lower() != "y":
print(
"install it using 'pip install git-filter-repo' "
"or 'pipx install git-filter-repo'",
)
return cp
# check if pipx is installed
use_pipx = False
check_pipx_cp = run(["pipx", "--version"])
if check_pipx_cp.returncode == 0:
use_pipx = True
# install git-filter-repo
pip_invocation: list[str] = ["pipx"] if use_pipx else [executable, "-m", "pip"]
print(
f"running '{' '.join([*pip_invocation, 'install', 'git-filter-repo'])}'... ",
end="",
flush=True,
)
install_rc = run([*pip_invocation, "install", "git-filter-repo"])
if install_rc.returncode != 0:
print("error")
_command_post_func(install_rc)
exit(install_rc.returncode)
else:
print("done\n")
# check if it is reachable
if run(["git", "filter-repo", "--version"]).returncode != 0:
# revert
run([*pip_invocation, "uninstall", "git-filter-repo"])
print(
"failure: could not install git-filter-repo automatically. "
"do it yourself o(*≧▽≦)ツ┏━┓"
)
exit(-1)
return cp
def rewrite_gitattributes(target_dir: Path) -> None:
"""
rewrite the .gitattributes file in a directory to disable git-lfs
args:
target_dir: Path
the directory to search
"""
# recursively search for .gitattributes files
for repo_file in target_dir.rglob(".gitattributes"):
repo_file.write_text(NEUTERED_GITATTRIBUTES, encoding="utf-8")
def step(
func: Callable[[], R],
desc: str = "",
post_func: Callable[[R], R] = _default_post_func,
post_print: bool = True,
) -> R:
"""
helper function for running steps
args:
desc: str
description of the step
func: Callable[[], R]
function to run
post_func: Callable[[R], R]
post-function to run after func
post_print: bool
whether to print done after the step
returns:
R
return object from func
"""
# run the function
if desc != "":
print(f"{desc}..", end="", flush=True)
start_time = time()
try:
cp = func()
except Exception as exc:
print(
f"\n\nfailure running step: {exc} ({exc.__class__.__name__})",
"\n".join(format_tb(exc.__traceback__)) + "\n",
sep="\n",
)
exit(1)
if desc != "":
print(".", end="", flush=True)
# run the post-function
try:
rp = post_func(cp)
except Exception as exc:
print(
f"\n\nfailure running post-step: {exc} ({exc.__class__.__name__})",
"\n".join(format_tb(exc.__traceback__)) + "\n",
sep="\n",
)
exit(1)
end_time = time()
# yay
if desc != "" and post_print:
print(
f" done in {generate_time_elapsed_string(end_time - start_time)}",
flush=True,
)
return rp
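# a usage sketch: wrap any callable so it is timed and its result checked
#   step(desc="0 pre | checking git", func=lambda: run("git --version"))
# (main() below builds such callables with its cmd() helper)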
def post_remote_v(cp: CompletedProcess) -> CompletedProcess:
"""
post-call function for 'git remote -v' command, parses the output and
checks for the forge and github remotes, storing them in the shared state
under 'remote/forge', 'remote/forge/url', 'remote/github', and
'remote/github/url' respectively
"""
if not isinstance(cp.stdout, bytes):
return _command_post_func(cp)
for line in cp.stdout.decode().split("\n"):
# github https://github.com/markjoshwel/sota (fetch)
# github https://github.com/markjoshwel/sota (push)
# origin https://forge.joshwel.co/mark/sota.git (fetch)
# origin https://forge.joshwel.co/mark/sota.git (push)
split_line = line.split(maxsplit=1)
        if len(split_line) < 2:
continue
# remote='origin' url='https://forge.joshwel.co/mark/sota.git (fetch)'
remote, url = split_line
# clean up the url
if (REPO_URL_FORGE in url) or (REPO_URL_GITHUB in url):
# url='https://forge.joshwel.co/mark/sota.git'
url = url.split("(", maxsplit=1)[0].strip()
if REPO_URL_FORGE in url:
r["remote/forge"] = remote
r["remote/forge/url"] = url
elif REPO_URL_GITHUB in url:
r["remote/github"] = remote
r["remote/github/url"] = url
return _command_post_func(cp)
def err(message: str, exc: Exception | None = None) -> None:
"""
helper function for printing error messages, prints the message and the
shared state dictionary r
args:
message: str
the error message to print
exc: Exception | None
the exception that caused the error, if any
"""
print(
"\n" + message,
(
""
if (exc is None)
else indent(
text=(
f"{exc} ({exc.__class__.__name__})\n"
f"{'\n'.join(format_tb(exc.__traceback__))}\n"
),
prefix=INDENT,
)
)
+ (indent(text=pformat(r), prefix=INDENT) + "\n"),
sep="\n",
)
exit(1)
def main() -> None:
"""
command line entry point
"""
cumulative_start_time = time()
with TemporaryDirectory(delete="--keep" not in argv) as dir_temp:
print(
"\nsota staircase ReStepper\n"
"\n"
"directories\n"
f" real repo : {REPO_DIR}\n"
f" temp repo : {dir_temp}\n",
f" is gh act : {GH_ACT}\n" if GH_ACT else "",
sep="",
)
# helper partial function for command
def cmd(
command: str,
wd: Path | str = dir_temp,
capture_output: bool = True,
give_input: str | None = None,
) -> Callable[[], CompletedProcess]:
return lambda: run(
command,
cwd=wd,
capture_output=capture_output,
give_input=give_input,
)
step(
func=cmd("git filter-repo --version"),
post_func=post_filter_repo_check,
)
step(cmd("git status --porcelain", wd=REPO_DIR))
if (not r["blank"]) and ("--iknowwhatimdoing" not in argv):
err(
"critical error: repository is not clean, please commit changes first",
)
start_time = time()
print("1 pre | finding large files", end="", flush=True)
files, sim = iter_files(REPO_DIR)
flf_filter_result: LargeFileFilterResult | None = None
if "--skipsotaignoregen" not in argv:
flf_filter_result = find_large_files(files, sim)
large_files = flf_filter_result.files
end_time = time()
print(
"1 pre | finding large files... "
f"done in {generate_time_elapsed_string(end_time - start_time)} (found {len(large_files)})"
)
if large_files:
start_time = time()
was_written = step(
desc="2 pre | writing .sotaignore",
func=lambda: write_sotaignore(large_files),
post_func=lambda cp: cp,
post_print=False,
)
end_time = time()
if was_written:
print(
f" done in {generate_time_elapsed_string(end_time - start_time)}"
)
else:
print(" not needed")
else:
end_time = time()
print(
"1 pre | finding large files... "
f"skipped in {generate_time_elapsed_string(end_time - start_time)}"
)
print("3 pre | duplicating repo... pre-scanning", end="", flush=True)
start_time = time()
with CopyHighway(
message="3 pre | duplicating repo",
total=len(list(REPO_DIR.rglob("*"))),
lff_result=flf_filter_result,
) as copier:
copytree(
src=REPO_DIR,
dst=dir_temp,
copy_function=copier.copy2,
dirs_exist_ok=True,
)
end_time = time()
print(
f"3 pre | duplicating repo... done in {generate_time_elapsed_string(end_time - start_time)}",
flush=True,
)
step(cmd('python -c "import pathlib; print(pathlib.Path.cwd().absolute())"'))
if str(Path(dir_temp).absolute()) != r["stdout"].strip():
err(
"critical error (whuh? internal?): "
f"not inside the temp dir '{str(Path(dir_temp).absolute())}'"
)
# check for forge and github remotes
step(
func=cmd("git remote -v"),
post_func=post_remote_v,
)
if "remote/forge" not in r:
err("critical error (whuh?): no forge remote found")
# get the current branch
step(cmd("git branch --show-current"))
branch = r["stdout"].strip()
if r.get("errored", "yes") or branch == "":
err("critical error (whuh?): couldn't get current branch")
step(cmd(f"git fetch {r['remote/forge']}"))
step(cmd(f"git rev-list HEAD...{r['remote/forge']}/{branch} --count"))
if (r.get("stdout", "").strip() != "0") and ("--dirty" not in argv):
err(
"critical error (whuh?): "
"not up to date with forge... sync your changes first?"
)
step(desc="4 lfs | fetch lfs objects", func=cmd("git lfs fetch"))
step(
desc="5 lfs | migrating lfs objects",
func=cmd(
'git lfs migrate export --everything --include="*" --remote=origin',
give_input="y\n",
),
)
step(
desc="6 lfs | uninstall lfs in repo",
func=cmd("git lfs uninstall"),
)
step(
func=cmd("git lfs ls-files"),
)
if not r["blank"]:
err(
"critical error (whuh? internal?): "
"lfs objects still exist post-migrate and uninstall"
)
if REPO_SOTAIGNORE.exists():
try:
sotaignore = REPO_SOTAIGNORE.read_text(encoding="utf-8").strip()
except Exception as exc:
err("critical error: couldn't read .sotaignore file", exc=exc)
sotaignored_files: list[str] = [
line
for line in sotaignore.splitlines()
if not line.startswith("#") and line.strip() != ""
]
step(
desc=f"7 lfs | filter repo and {len(sotaignored_files)} file(s)",
func=cmd(
"git filter-repo --force --strip-blobs-bigger-than 100M --invert-paths "
+ " ".join(f'--path ""{lf}' "" for lf in sotaignored_files)
),
)
# also copy to the temp repo; step 5 (lfs migrate) wipes uncommitted changes
copy2(REPO_SOTAIGNORE, Path(dir_temp).joinpath(".sotaignore"))
def add_and_commit() -> CompletedProcess:
if GH_ACT:
cp = cmd("git config user.name 'github-actions[bot]'")()
if cp.returncode != 0:
return cp
cp = cmd(
"git config user.email 'github-actions[bot]@users.noreply.github.com'"
)()
if cp.returncode != 0:
return cp
cp = cmd("git add -A")()
if cp.returncode != 0:
return cp
return cmd(
"git commit --allow-empty "
f'-am "{COMMIT_MESSAGE}" --author="{COMMIT_AUTHOR}"',
)()
def neuter_and_commit():
rewrite_gitattributes(Path(dir_temp))
add_and_commit()
step(
desc="8 fin | neuter .gitattributes and commit",
func=neuter_and_commit,
)
if r.get("remote/github") is None:
step(
func=cmd(f"git remote add github https://{REPO_URL_GITHUB}.git"),
)
if r.get("errored", "yes"):
err("critical error (whuh?): couldn't add github remote")
r["remote/github"] = "github"
step(
desc=f"9 fin | fetch {r['remote/github']}",
func=cmd(f"git fetch {r['remote/github']}"),
)
push_invocation = (
f"git push {r['remote/github']} {branch} --force"
if not GH_ACT
else f"git push https://markjoshwel:{GH_TOKEN}@{REPO_URL_GITHUB}.git {branch} --force"
)
step(
desc=f"X fin | pushing to {r['remote/github']}/{branch}",
func=cmd(push_invocation if ("--test" not in argv) else "git --version"),
)
cumulative_end_time = time()
print(
f"\n--- done! took {generate_time_elapsed_string(cumulative_end_time - cumulative_start_time)}~ "
"☆*: .。. o(≧▽≦)o .。.:*☆ ---",
flush=True,
)
if __name__ == "__main__":
main()