drop agit

This commit is contained in:
Jörg Thalheim
2025-08-20 16:37:55 +02:00
parent 76eb3c13e9
commit df32da304f
5 changed files with 0 additions and 668 deletions

View File

@@ -33,7 +33,6 @@
self'.packages.tea-create-pr
self'.packages.merge-after-ci
self'.packages.pending-reviews
self'.packages.agit
# treefmt with config defined in ./flake-parts/formatting.nix
config.treefmt.build.wrapper
];

View File

@@ -1,59 +0,0 @@
# agit
A helper script for the AGit workflow with a gitea instance.
<!-- `$ agit --help` -->
```
usage: agit [-h] {create,c,list,l} ...
AGit utility for creating and pulling PRs
positional arguments:
{create,c,list,l} Commands
create (c) Create an AGit PR
list (l) List open AGit pull requests
options:
-h, --help show this help message and exit
The defaults that are assumed are:
TARGET_REMOTE_REPOSITORY = origin
DEFAULT_TARGET_BRANCH = main
Examples:
$ agit create
Opens editor to compose PR title and description (first line is title, rest is body)
$ agit create --auto
Creates PR using latest commit message automatically
$ agit create --topic "my-feature"
Set a custom topic.
$ agit create --force
Force push to a certain topic
$ agit list
Lists all open pull requests for the current repository
```
References:
- https://docs.gitea.com/usage/agit
- https://git-repo.info/en/2020/03/agit-flow-and-git-repo/
## How to fetch AGit PR's
For a hypothetical PR with the number #4077:
```
git fetch origin pull/4077/head:your-favorite-name
```
Replace `your-favorite-name` with your preferred branch name.
You can push back to the PR with with:
```
agit create --topic="The topic of the open PR"
```

View File

@@ -1,580 +0,0 @@
import argparse
import contextlib
import json
import os
import subprocess
import sys
import tempfile
import urllib.error
import urllib.request
from pathlib import Path
# push origin HEAD:refs/for/main
# HEAD: The target branch
# origin: The target repository (not a fork!)
# HEAD: The local branch containing the changes you are proposing
TARGET_REMOTE_REPOSITORY = "origin"
DEFAULT_TARGET_BRANCH = "main"
def get_gitea_api_url(remote: str = "origin") -> str:
"""Parse the gitea api url, this parser is fairly naive, but should work for most setups"""
exit_code, remote_url, error = run_git_command(["git", "remote", "get-url", remote])
if exit_code != 0:
print(f"Error getting remote URL for '{remote}': {error}")
sys.exit(1)
# Parse different remote URL formats
# SSH formats: git@git.clan.lol:clan/clan-core.git or gitea@git.clan.lol:clan/clan-core.git
# HTTPS format: https://git.clan.lol/clan/clan-core.git
if (
"@" in remote_url
and ":" in remote_url
and not remote_url.startswith("https://")
):
# SSH format: [user]@git.clan.lol:clan/clan-core.git
host_and_path = remote_url.split("@")[1] # git.clan.lol:clan/clan-core.git
host = host_and_path.split(":")[0] # git.clan.lol
repo_path = host_and_path.split(":")[1] # clan/clan-core.git
repo_path = repo_path.removesuffix(".git") # clan/clan-core
elif remote_url.startswith("https://"):
# HTTPS format: https://git.clan.lol/clan/clan-core.git
url_parts = remote_url.replace("https://", "").split("/")
host = url_parts[0] # git.clan.lol
repo_path = "/".join(url_parts[1:]) # clan/clan-core.git
if repo_path.endswith(".git"):
repo_path = repo_path.removesuffix(".git") # clan/clan-core
else:
print(f"Unsupported remote URL format: {remote_url}")
sys.exit(1)
api_url = f"https://{host}/api/v1/repos/{repo_path}/pulls"
return api_url
def fetch_open_prs(remote: str = "origin") -> list[dict]:
"""Fetch open pull requests from the Gitea API."""
api_url = get_gitea_api_url(remote)
try:
with urllib.request.urlopen(f"{api_url}?state=open") as response:
data = json.loads(response.read().decode())
return data
except urllib.error.URLError as e:
print(f"Error fetching PRs from {api_url}: {e}")
sys.exit(1)
except json.JSONDecodeError as e:
print(f"Error parsing JSON response: {e}")
sys.exit(1)
def get_repo_info_from_api_url(api_url: str) -> tuple[str, str]:
"""Extract repository owner and name from API URL."""
# api_url format: https://git.clan.lol/api/v1/repos/clan/clan-core/pulls
parts = api_url.split("/")
if len(parts) >= 6 and "repos" in parts:
repo_index = parts.index("repos")
if repo_index + 2 < len(parts):
owner = parts[repo_index + 1]
repo_name = parts[repo_index + 2]
return owner, repo_name
msg = f"Invalid API URL format: {api_url}"
raise ValueError(msg)
def fetch_pr_statuses(
repo_owner: str,
repo_name: str,
commit_sha: str,
host: str,
) -> list[dict]:
"""Fetch CI statuses for a specific commit SHA."""
status_url = (
f"https://{host}/api/v1/repos/{repo_owner}/{repo_name}/statuses/{commit_sha}"
)
try:
request = urllib.request.Request(status_url)
with urllib.request.urlopen(request, timeout=3) as response:
data = json.loads(response.read().decode())
return data
except (urllib.error.URLError, json.JSONDecodeError, TimeoutError):
# Fail silently for individual status requests to keep listing fast
return []
def get_latest_status_by_context(statuses: list[dict]) -> dict[str, str]:
"""Group statuses by context and return the latest status for each context."""
context_statuses = {}
for status in statuses:
context = status.get("context", "unknown")
created_at = status.get("created_at", "")
status_state = status.get("status", "unknown")
if (
context not in context_statuses
or created_at > context_statuses[context]["created_at"]
):
context_statuses[context] = {
"status": status_state,
"created_at": created_at,
}
return {context: info["status"] for context, info in context_statuses.items()}
def status_to_emoji(status: str) -> str:
"""Convert status string to emoji."""
status_map = {"success": "", "failure": "", "pending": "🟡", "error": ""}
return status_map.get(status.lower(), "")
def create_osc8_link(url: str, text: str) -> str:
return f"\033]8;;{url}\033\\{text}\033]8;;\033\\"
def format_pr_with_status(pr: dict, remote: str = "origin") -> str:
"""Format PR title with status emojis and OSC8 link."""
title = pr["title"]
pr_url = pr.get("html_url", "")
commit_sha = pr.get("head", {}).get("sha")
if not commit_sha:
if pr_url:
return create_osc8_link(pr_url, title)
return title
try:
api_url = get_gitea_api_url(remote)
repo_owner, repo_name = get_repo_info_from_api_url(api_url)
host = api_url.split("/")[2]
statuses = fetch_pr_statuses(repo_owner, repo_name, commit_sha, host)
if not statuses:
if pr_url:
return create_osc8_link(pr_url, title)
return title
latest_statuses = get_latest_status_by_context(statuses)
emojis = [status_to_emoji(status) for status in latest_statuses.values()]
formatted_title = f"{title} {' '.join(emojis)}" if emojis else title
return create_osc8_link(pr_url, formatted_title) if pr_url else formatted_title
except (ValueError, IndexError):
# If there's any error in processing, just return the title with link if available
if pr_url:
return create_osc8_link(pr_url, title)
return title
def run_git_command(command: list) -> tuple[int, str, str]:
"""Run a git command and return exit code, stdout, and stderr."""
try:
result = subprocess.run(command, capture_output=True, text=True, check=False)
return result.returncode, result.stdout.strip(), result.stderr.strip()
except (OSError, subprocess.SubprocessError) as e:
return 1, "", str(e)
def get_current_branch_name() -> str:
exit_code, branch_name, error = run_git_command(
["git", "rev-parse", "--abbrev-ref", "HEAD"],
)
if exit_code != 0:
print(f"Error getting branch name: {error}")
sys.exit(1)
return branch_name.strip()
def get_latest_commit_info() -> tuple[str, str]:
"""Get the title and body of the latest commit."""
exit_code, commit_msg, error = run_git_command(
["git", "log", "-1", "--pretty=format:%B"],
)
if exit_code != 0:
print(f"Error getting commit info: {error}")
sys.exit(1)
lines = commit_msg.strip().split("\n")
title = lines[0].strip() if lines else ""
body_lines = []
for line in lines[1:]:
if body_lines or line.strip():
body_lines.append(line)
body = "\n".join(body_lines).strip()
return title, body
def get_commits_since_main() -> list[tuple[str, str]]:
"""Get all commits since main as (title, body) tuples."""
exit_code, commit_log, error = run_git_command(
[
"git",
"log",
"main..HEAD",
"--no-merges",
"--pretty=format:%s|%b|---END---",
],
)
if exit_code != 0:
print(f"Error getting commits since main: {error}")
return []
if not commit_log:
return []
commits = []
commit_messages = commit_log.split("---END---")
for commit_msg in commit_messages:
commit_msg = commit_msg.strip()
if not commit_msg:
continue
parts = commit_msg.split("|")
if len(parts) < 2:
continue
title = parts[0].strip()
body = parts[1].strip() if len(parts) > 1 else ""
if not title:
continue
commits.append((title, body))
return commits
def open_editor_for_pr() -> tuple[str, str]:
"""Open editor to get PR title and description. First line is title, rest is description."""
commits_since_main = get_commits_since_main()
with tempfile.NamedTemporaryFile(
mode="w+",
suffix="COMMIT_EDITMSG",
delete=False,
) as temp_file:
temp_file.flush()
temp_file_path = temp_file.name
for title, body in commits_since_main:
temp_file.write(f"{title}\n")
if body:
temp_file.write(f"{body}\n")
temp_file.write("\n")
temp_file.write("\n")
temp_file.write("# Please enter the PR title on the first line.\n")
temp_file.write("# Lines starting with '#' will be ignored.\n")
temp_file.write("# The first line will be used as the PR title.\n")
temp_file.write("# Everything else will be used as the PR description.\n")
temp_file.write(
"# To abort creation of the PR, close editor with an error code.\n",
)
temp_file.write("# In vim for example you can use :cq!\n")
temp_file.write("#\n")
temp_file.write("# All commits since main:\n")
temp_file.write("#\n")
for i, (title, body) in enumerate(commits_since_main, 1):
temp_file.write(f"# Commit {i}:\n")
temp_file.write(f"# {title}\n")
if body:
for line in body.split("\n"):
temp_file.write(f"# {line}\n")
temp_file.write("#\n")
try:
editor = os.environ.get("EDITOR", "vim")
exit_code = subprocess.call([editor, temp_file_path])
if exit_code != 0:
print(f"Editor exited with code {exit_code}.")
print("AGit PR creation has been aborted.")
sys.exit(1)
with Path(temp_file_path).open() as f:
content = f.read()
lines = [
line for line in content.split("\n") if not line.lstrip().startswith("#")
]
cleaned_content = "\n".join(lines).strip()
if not cleaned_content:
print("No content provided, aborting.")
sys.exit(0)
content_lines = cleaned_content.split("\n")
title = content_lines[0].strip()
if not title:
print("No title provided, aborting.")
sys.exit(0)
description_lines = []
for line in content_lines[1:]:
if description_lines or line.strip():
description_lines.append(line)
description = "\n".join(description_lines).strip()
return title, description
finally:
with contextlib.suppress(OSError):
Path(temp_file_path).unlink()
def create_agit_push(
remote: str = "origin",
branch: str = "main",
topic: str | None = None,
title: str | None = None,
description: str | None = None,
force_push: bool = False,
local_branch: str = "HEAD",
) -> None:
if topic is None:
if title is not None:
topic = title
else:
topic = get_current_branch_name()
refspec = f"{local_branch}:refs/for/{branch}"
push_cmd = ["git", "push", remote, refspec]
push_cmd.extend(["-o", f"topic={topic}"])
if title:
push_cmd.extend(["-o", f"title={title}"])
if description:
escaped_desc = description.rstrip("\n").replace('"', '\\"')
push_cmd.extend(["-o", f"description={escaped_desc}"])
if force_push:
push_cmd.extend(["-o", "force-push"])
if description:
print(
f" Description: {description[:50]}..."
if len(description) > 50
else f" Description: {description}",
)
print()
exit_code, stdout, stderr = run_git_command(push_cmd)
if stdout:
print(stdout)
if stderr:
print(stderr, file=sys.stderr)
if exit_code != 0:
print("\nPush failed!")
sys.exit(exit_code)
else:
print("\nPush successful!")
def cmd_create(args: argparse.Namespace) -> None:
"""Handle the create subcommand."""
title = args.title
description = args.description
if not args.auto and (title is None or description is None):
editor_title, editor_description = open_editor_for_pr()
if title is None:
title = editor_title
if description is None:
description = editor_description
create_agit_push(
remote=args.remote,
branch=args.branch,
topic=args.topic,
title=title,
description=description,
force_push=args.force,
local_branch=args.local_branch,
)
def cmd_list(args: argparse.Namespace) -> None:
"""Handle the list subcommand."""
prs = fetch_open_prs(args.remote)
if not prs:
print("No open AGit pull requests found.")
return
# This is the only way I found to query the actual AGit PRs
# Gitea doesn't seem to have an actual api endpoint for them
filtered_prs = [pr for pr in prs if pr.get("head", {}).get("label", "") == ""]
if not filtered_prs:
print("No open AGit pull requests found.")
return
for pr in filtered_prs:
formatted_pr = format_pr_with_status(pr, args.remote)
print(formatted_pr)
def create_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
prog="agit",
description="AGit utility for creating and pulling PRs",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=f"""
The defaults that are assumed are:
TARGET_REMOTE_REPOSITORY = {TARGET_REMOTE_REPOSITORY}
DEFAULT_TARGET_BRANCH = {DEFAULT_TARGET_BRANCH}
Examples:
$ agit create
Opens editor to compose PR title and description (first line is title, rest is body)
$ agit create --auto
Creates PR using latest commit message automatically
$ agit create --topic "my-feature"
Set a custom topic.
$ agit create --force
Force push to a certain topic
$ agit list
Lists all open pull requests for the current repository
""",
)
subparsers = parser.add_subparsers(dest="subcommand", help="Commands")
create_parser = subparsers.add_parser(
"create",
aliases=["c"],
help="Create an AGit PR",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
$ agit create
Opens editor to compose PR title and description (first line is title, rest is body).
$ agit create --auto
Creates PR using latest commit message automatically (old behavior).
$ agit create --topic "my-feature"
Set a custom topic.
$ agit create --force
Force push to a certain topic
""",
)
list_parser = subparsers.add_parser(
"list",
aliases=["l"],
help="List open AGit pull requests",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=f"""
Examples:
$ agit list
Lists all open AGit PRs for the current repository.
$ agit list --remote upstream
Lists PRs using the 'upstream' remote instead of '{TARGET_REMOTE_REPOSITORY}'.
""",
)
list_parser.add_argument(
"-r",
"--remote",
default=TARGET_REMOTE_REPOSITORY,
help=f"Git remote to use for fetching PRs (default: {TARGET_REMOTE_REPOSITORY})",
)
create_parser.add_argument(
"-r",
"--remote",
default=TARGET_REMOTE_REPOSITORY,
help=f"Git remote to push to (default: {TARGET_REMOTE_REPOSITORY})",
)
create_parser.add_argument(
"-b",
"--branch",
default=DEFAULT_TARGET_BRANCH,
help=f"Target branch for the PR (default: {DEFAULT_TARGET_BRANCH})",
)
create_parser.add_argument(
"-l",
"--local-branch",
default="HEAD",
help="Local branch to push (default: HEAD)",
)
create_parser.add_argument(
"-t",
"--topic",
help="Set PR topic (default: current branch name)",
)
create_parser.add_argument(
"--title",
help="Set the PR title (default: last commit title)",
)
create_parser.add_argument(
"--description",
help="Override the PR description (default: commit body)",
)
create_parser.add_argument(
"-f",
"--force",
action="store_true",
help="Force push the changes",
)
create_parser.add_argument(
"-a",
"--auto",
action="store_true",
help="Skip editor and use commit message automatically",
)
create_parser.set_defaults(func=cmd_create)
list_parser.set_defaults(func=cmd_list)
return parser
def main() -> None:
parser = create_parser()
args = parser.parse_args()
if args.subcommand is None:
parser.print_help()
sys.exit(0)
args.func(args)
if __name__ == "__main__":
main()

View File

@@ -1,27 +0,0 @@
{
bash,
callPackage,
git,
lib,
openssh,
...
}:
let
writers = callPackage ../builders/script-writers.nix { };
in
writers.writePython3Bin "agit" {
flakeIgnore = [
"E501"
"W503" # treefmt reapplies the conditions to trigger this check
];
makeWrapperArgs = [
"--prefix"
"PATH"
":"
(lib.makeBinPath [
bash
git
openssh
])
];
} ./agit.py

View File

@@ -16,7 +16,6 @@
{ config, pkgs, ... }:
{
packages = {
agit = pkgs.callPackage ./agit { };
tea-create-pr = pkgs.callPackage ./tea-create-pr { };
zerotier-members = pkgs.callPackage ./zerotier-members { };
merge-after-ci = pkgs.callPackage ./merge-after-ci { inherit (config.packages) tea-create-pr; };