#!/usr/bin/env python3
"""
Docker Registry image cleanup tool.

For every repository the tags are sorted newest-first (see ``tag_sort_key``),
the most recent N (default 3) are kept, and the manifests behind the remaining
tags are deleted through the Registry HTTP API. Optionally runs
``registry garbage-collect`` inside the Registry container afterwards so the
disk space is actually reclaimed.

Typical usage (run on the Registry host):

    # keep the latest 3
    python3 registry-cleanup.py

    # explicit repository list + GC
    python3 registry-cleanup.py \\
        --registry http://localhost:5000 \\
        --keep 3 \\
        --repos viewsh-gateway,viewsh-module-iot-server \\
        --gc-container registry

    # dry run: show the plan without deleting anything
    python3 registry-cleanup.py --dry-run

Exit codes: 0 = success / 1 = bad arguments / 2 = Registry unreachable /
3 = at least one repository failed, or the GC run failed
"""

import argparse
import json
import re
import subprocess
import sys
import urllib.error
import urllib.request
from typing import Dict, List, Optional, Set, Tuple

DEFAULT_REGISTRY = "http://localhost:5000"
DEFAULT_KEEP = 3
DEFAULT_REPOS = [
    "viewsh-gateway",
    "viewsh-module-infra-server",
    "viewsh-module-iot-gateway",
    "viewsh-module-iot-server",
    "viewsh-module-ops-server",
    "viewsh-module-system-server",
]

# Cover the common manifest media types so that tags pushed by BuildKit/OCI
# tooling still resolve to a digest.
MANIFEST_ACCEPT = ", ".join([
    "application/vnd.docker.distribution.manifest.v2+json",
    "application/vnd.docker.distribution.manifest.list.v2+json",
    "application/vnd.oci.image.manifest.v1+json",
    "application/vnd.oci.image.index.v1+json",
])


def http_request(method: str, url: str, headers: Optional[dict] = None, timeout: int = 10):
    """Send an HTTP request and return the open response object.

    Args:
        method: HTTP verb, e.g. "GET", "HEAD", "DELETE".
        url: Absolute URL to request.
        headers: Optional request headers.
        timeout: Socket timeout in seconds.

    Returns:
        The response returned by ``urllib.request.urlopen``; the caller should
        close it (all callers in this file use it as a context manager).

    Raises:
        urllib.error.HTTPError: the server answered with an error status.
        urllib.error.URLError: the connection could not be established.
    """
    req = urllib.request.Request(url, method=method, headers=headers or {})
    return urllib.request.urlopen(req, timeout=timeout)


def list_tags(registry: str, repo: str) -> List[str]:
    """Return all tags of ``repo`` except ``latest``.

    A 404 (unknown repository) yields an empty list; any other HTTP error is
    re-raised. A ``"tags": null`` payload is also treated as empty.
    """
    url = f"{registry}/v2/{repo}/tags/list"
    try:
        with http_request("GET", url) as resp:
            body = resp.read().decode("utf-8")
    except urllib.error.HTTPError as e:
        if e.code == 404:
            return []
        raise
    data = json.loads(body)
    return [t for t in (data.get("tags") or []) if t != "latest"]


def get_manifest_digest(registry: str, repo: str, tag: str) -> Optional[str]:
    """Resolve ``tag`` to its manifest digest via a HEAD request.

    Returns:
        The value of the ``Docker-Content-Digest`` response header, or
        ``None`` when the header is missing or the server answers with an
        HTTP error status.
    """
    url = f"{registry}/v2/{repo}/manifests/{tag}"
    try:
        with http_request("HEAD", url, headers={"Accept": MANIFEST_ACCEPT}) as resp:
            return resp.headers.get("Docker-Content-Digest")
    except urllib.error.HTTPError:
        return None


def delete_manifest(registry: str, repo: str, digest: str) -> bool:
    """Delete the manifest identified by ``digest``.

    Returns:
        ``True`` when the Registry answered with a 2xx status, ``False`` when
        it answered with an HTTP error status.
    """
    url = f"{registry}/v2/{repo}/manifests/{digest}"
    try:
        with http_request("DELETE", url) as resp:
            return 200 <= resp.status < 300
    except urllib.error.HTTPError:
        # urllib only raises HTTPError for non-2xx responses (202 Accepted is
        # returned normally above), so reaching this branch always means the
        # deletion failed.
        return False


# Tag ordering: numeric/build number descending first, then lexicographic.
_BUILD_NUM_RE = re.compile(r"(\d+)")


def tag_sort_key(tag: str) -> Tuple[int, str]:
    """Return ``(number, tag)`` for use as a descending sort key.

    - If the tag contains digits (``build-123``, ``1.2.3``,
      ``20260424103000``) the LAST run of digits is the primary key.
    - Otherwise the numeric part is ``-1`` and only the string breaks ties.

    NOTE: because only the last digit run is used, dotted versions are
    compared by their final component (``1.2.10`` sorts above ``2.0.0``).
    This suits tags that end in a build number or timestamp.
    """
    nums = _BUILD_NUM_RE.findall(tag)
    primary = int(nums[-1]) if nums else -1
    return (primary, tag)


def _plan_deletions(
    delete_tags: List[str],
    digest_by_tag: Dict[str, Optional[str]],
    protected: Set[str],
) -> List[Tuple[str, str, Optional[str]]]:
    """Classify every deletion candidate without touching the network.

    Args:
        delete_tags: Candidate tags, in the order they should be processed.
        digest_by_tag: Pre-resolved digest per tag (``None`` if unresolved).
        protected: Digests that must not be deleted because a kept tag
            (or ``latest``) points at them.

    Returns:
        One ``(action, tag, digest)`` tuple per candidate, where ``action`` is
        ``"skip"`` (no digest), ``"protected"``, ``"dedup"`` (digest already
        scheduled by an earlier tag) or ``"delete"``.
    """
    plan: List[Tuple[str, str, Optional[str]]] = []
    scheduled: Set[str] = set()
    for tag in delete_tags:
        digest = digest_by_tag.get(tag)
        if not digest:
            action = "skip"
        elif digest in protected:
            action = "protected"
        elif digest in scheduled:
            action = "dedup"
        else:
            scheduled.add(digest)
            action = "delete"
        plan.append((action, tag, digest))
    return plan


def cleanup_repo(
    registry: str,
    repo: str,
    keep: int,
    dry_run: bool,
) -> Tuple[int, int, int]:
    """Delete all but the newest ``keep`` tags of one repository.

    Manifests are deleted by digest, which removes the image for every tag
    that points at it. To avoid destroying an image that is still wanted, the
    digests of the kept tags and of ``latest`` are resolved first and never
    deleted.

    Args:
        registry: Registry base URL without trailing slash.
        repo: Repository name.
        keep: Number of newest tags to keep.
        dry_run: When true, only print what would be deleted.

    Returns:
        ``(total, deleted, skipped)``: number of tags found (``latest``
        excluded), manifests deleted (or that would be deleted in dry-run),
        and candidates that were not deleted (unresolved, protected, failed).
    """
    tags = list_tags(registry, repo)
    total = len(tags)
    if total == 0:
        print(" └─ 无 tag,跳过")
        return 0, 0, 0

    # Descending: newest first.
    tags.sort(key=tag_sort_key, reverse=True)
    keep_list = tags[:keep]
    delete_list = tags[keep:]

    print(f" ├─ 共 {total} 个 tag,保留最新 {len(keep_list)} 个:{keep_list}")
    if not delete_list:
        print(" └─ 无需删除")
        return total, 0, 0

    # Digests that must survive. `latest` is filtered out by list_tags, so it
    # has to be looked up explicitly; it is fine for it not to exist.
    protected: Set[str] = set()
    for tag in keep_list + ["latest"]:
        digest = get_manifest_digest(registry, repo, tag)
        if digest:
            protected.add(digest)
        elif tag != "latest":
            print(f" │ [WARN] {tag:30s} (保留 tag 的 digest 取不到,无法保护)")

    # Resolve every candidate BEFORE deleting anything: once a manifest is
    # gone, other tags that pointed at it can no longer be resolved, which
    # would misreport duplicates as lookup failures.
    digest_by_tag = {
        tag: get_manifest_digest(registry, repo, tag) for tag in delete_list
    }

    deleted = 0
    skipped = 0
    for action, tag, digest in _plan_deletions(delete_list, digest_by_tag, protected):
        if action == "skip":
            print(f" │ [SKIP] {tag:30s} (digest 取不到)")
            skipped += 1
        elif action == "protected":
            print(f" │ [KEEP] {tag:30s} {digest[:19]}... (与保留 tag 共用 digest)")
            skipped += 1
        elif action == "dedup":
            # Several tags can share one digest; a single DELETE covers them.
            print(f" │ [DEDUP] {tag:30s} {digest[:19]}...")
        elif dry_run:
            print(f" │ [DRY] {tag:30s} {digest[:19]}...")
            deleted += 1
        elif delete_manifest(registry, repo, digest):
            print(f" │ [DELETE] {tag:30s} {digest[:19]}...")
            deleted += 1
        else:
            print(f" │ [FAIL] {tag:30s} {digest[:19]}...")
            skipped += 1

    print(f" └─ 已删除 {deleted},跳过 {skipped}")
    return total, deleted, skipped


def run_gc(container: str, dry_run: bool) -> bool:
    """Run ``registry garbage-collect`` inside ``container`` via ``docker exec``.

    Args:
        container: Name of the Registry container.
        dry_run: When true, print the command and return without running it.

    Returns:
        ``True`` on success (or dry-run); ``False`` when the command times
        out after 5 minutes, ``docker`` is not installed, or the command
        exits with a non-zero status.
    """
    cmd = [
        "docker", "exec", container,
        "registry", "garbage-collect",
        "--delete-untagged=true",
        "/etc/docker/registry/config.yml",
    ]
    print(f"\n🧹 触发 Registry GC:{' '.join(cmd)}")
    if dry_run:
        print(" (--dry-run 已跳过实际执行)")
        return True
    try:
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=300)
    except subprocess.TimeoutExpired:
        print(" ❌ GC 超时(>5min)")
        return False
    except FileNotFoundError:
        print(" ❌ 找不到 docker 命令,确认脚本跑在 Registry 宿主机")
        return False
    if result.returncode == 0:
        # GC output can be very long; only show the tail.
        tail = "\n".join(result.stdout.splitlines()[-10:])
        print(f" ✅ GC 成功(输出末 10 行):\n{tail}")
        return True
    print(f" ❌ GC 失败 (rc={result.returncode})")
    if result.stderr:
        print(f" stderr: {result.stderr.strip()[:500]}")
    return False


def parse_args(argv: Optional[List[str]] = None) -> argparse.Namespace:
    """Parse command-line options.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    p = argparse.ArgumentParser(description="Docker Registry 镜像清理")
    p.add_argument("--registry", default=DEFAULT_REGISTRY,
                   help=f"Registry URL(默认 {DEFAULT_REGISTRY})")
    p.add_argument("--keep", type=int, default=DEFAULT_KEEP,
                   help=f"每仓库保留版本数(默认 {DEFAULT_KEEP})")
    p.add_argument("--repos", default=",".join(DEFAULT_REPOS),
                   help="逗号分隔的仓库名列表(默认:内置服务清单)")
    p.add_argument("--gc-container", default=None,
                   help="Registry 容器名;指定则删除完后触发 garbage-collect")
    p.add_argument("--dry-run", action="store_true",
                   help="只打印计划,不实际删除")
    return p.parse_args(argv)


def main(argv: Optional[List[str]] = None) -> int:
    """Entry point; returns the process exit code (see module docstring).

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.
    """
    args = parse_args(argv)
    if args.keep < 1:
        print("❌ --keep 必须 >= 1", file=sys.stderr)
        return 1
    repos = [r.strip() for r in args.repos.split(",") if r.strip()]
    if not repos:
        print("❌ 仓库列表为空", file=sys.stderr)
        return 1

    # A trailing slash would produce "//v2/..." URLs; a missing scheme makes
    # urllib raise ValueError instead of a clean "unreachable" error.
    registry = args.registry.strip().rstrip("/")
    if not registry.startswith(("http://", "https://")):
        print("❌ --registry 必须以 http:// 或 https:// 开头", file=sys.stderr)
        return 1

    # Connectivity check.
    try:
        with http_request("GET", f"{registry}/v2/", timeout=5):
            pass
    except urllib.error.HTTPError as e:
        # A 401 from /v2/ still proves the Registry is up (auth enabled).
        if e.code != 401:
            print(f"❌ Registry 不可达:{registry} — {e}", file=sys.stderr)
            return 2
    except OSError as e:
        # URLError is an OSError; this also covers raw socket timeouts.
        print(f"❌ Registry 不可达:{registry} — {e}", file=sys.stderr)
        return 2

    print(f"🎯 Registry={registry} keep={args.keep} dry_run={args.dry_run}")
    print(f"📦 仓库:{repos}\n")

    overall_total = overall_deleted = overall_skipped = 0
    failed_repos = []
    for repo in repos:
        print(f"=== {repo} ===")
        try:
            t, d, s = cleanup_repo(registry, repo, args.keep, args.dry_run)
        except Exception as e:
            # Top-level boundary: one broken repository must not stop the
            # rest; it is reported and reflected in the exit code.
            print(f" ❌ 处理异常:{e}")
            failed_repos.append(repo)
        else:
            overall_total += t
            overall_deleted += d
            overall_skipped += s

    print(f"\n📊 总计:扫描 {overall_total} / 删除 {overall_deleted} / 跳过 {overall_skipped}")
    if failed_repos:
        print(f"⚠️ 失败仓库:{failed_repos}")

    # GC
    if args.gc_container and overall_deleted > 0:
        ok = run_gc(args.gc_container, args.dry_run)
        if not ok:
            return 3
    elif args.gc_container:
        print("\n🟡 无 manifest 被删除,跳过 GC")
    else:
        print("\n💡 未指定 --gc-container,逻辑删除完成但磁盘尚未释放;")
        print(" 请在 Registry 宿主机手动执行:")
        # Same command run_gc builds: container name first, then the
        # `registry` binary inside it.
        print(" docker exec <容器名> registry garbage-collect \\")
        print(" --delete-untagged=true /etc/docker/registry/config.yml")

    return 3 if failed_repos else 0


if __name__ == "__main__":
    sys.exit(main())