| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107 |
- """
- binary_repo.py - 管理二进制产物仓库。
- 对应原 Shell 中的 pull_binary_from_server / commit_binary_to_server。
- """
- import os
- import logging
- import subprocess
- from pathlib import Path
- from typing import List
- logger = logging.getLogger(__name__)
- class BinaryRepoError(Exception):
- """二进制仓库操作异常"""
- pass
- def pull_binaries(repo_dir: str) -> None:
- """
- 拉取二进制仓库到最新状态(相当于 git checkout 后清空工作区)。
- 对应原 Shell 的 pull_binary_from_server。
- Args:
- repo_dir: 本地仓库路径
- """
- path = Path(repo_dir)
- if not path.is_dir():
- logger.warning("Binary repo not found, skipping pull: %s", repo_dir)
- return
- # 切换到仓库目录
- original_cwd = os.getcwd()
- os.chdir(repo_dir)
- try:
- # 检查是否为有效的 git 仓库
- if not (path / '.git').is_dir():
- raise BinaryRepoError(f"Not a git repository: {repo_dir}")
- # 清理所有文件(包括隐藏文件,但保留 .git)
- _clean_worktree()
- # 恢复所有文件到 HEAD 状态
- subprocess.run(['git', 'checkout', '.'], check=True)
- logger.info("Binary repo synced: %s", repo_dir)
- except subprocess.CalledProcessError as e:
- raise BinaryRepoError(f"Failed to pull binary repo: {e}") from e
- finally:
- os.chdir(original_cwd)
- def commit_binaries(repo_dir: str, target_branch: str) -> None:
- """
- 自动提交二进制产物到远端分支。
- 对应原 Shell 的 commit_binary_to_server。
- Args:
- repo_dir: 本地仓库路径
- target_branch: 推送的目标分支
- """
- path = Path(repo_dir)
- if not path.is_dir():
- raise BinaryRepoError(f"Binary repo not found: {repo_dir}")
- original_cwd = os.getcwd()
- os.chdir(repo_dir)
- try:
- if not (path / '.git').is_dir():
- raise BinaryRepoError(f"Not a git repository: {repo_dir}")
- # git add 所有新增/修改文件
- subprocess.run(['git', 'add', '*'], check=True)
- # 尝试提交,如果 nothing to commit 则忽略错误
- try:
- subprocess.run(['git', 'commit', '-m', 'auto commit by server'],
- check=True, capture_output=True, text=True)
- except subprocess.CalledProcessError as e:
- if 'nothing to commit' in e.stderr.lower() or 'nothing added' in e.stderr.lower():
- logger.info("Nothing to commit in %s, skipping push", repo_dir)
- return
- else:
- raise
- # 推送到指定分支
- push_ref = f'HEAD:{target_branch}'
- subprocess.run(['git', 'push', 'origin', push_ref], check=True)
- logger.info("Binary repo pushed to %s: %s", target_branch, repo_dir)
- except subprocess.CalledProcessError as e:
- raise BinaryRepoError(f"Failed to commit/push binary repo: {e}") from e
- finally:
- os.chdir(original_cwd)
- def _clean_worktree() -> None:
- """
- 删除工作区所有文件(包括隐藏文件),但保持 .git 目录存在。
- """
- cwd = Path.cwd()
- for item in cwd.iterdir():
- if item.name == '.git':
- continue
- if item.is_dir():
- import shutil
- shutil.rmtree(item, ignore_errors=True)
- else:
- item.unlink(missing_ok=True)
|