Browse Source

first commit

lihetongxue 1 month ago
commit
015ff13a64
12 changed files with 1527 additions and 0 deletions
  1. 294 0
      artifacts.py
  2. 107 0
      binary_repo.py
  3. 89 0
      build_mcu.py
  4. 116 0
      build_soc.py
  5. 123 0
      config.py
  6. 47 0
      environment.py
  7. 79 0
      ftp_upload.py
  8. 170 0
      gerrit.py
  9. 56 0
      logging_conf.py
  10. 194 0
      pipeline.py
  11. 113 0
      repo_manager.py
  12. 139 0
      utils.py

+ 294 - 0
artifacts.py

@@ -0,0 +1,294 @@
+"""
+artifacts.py - 构建产物管理:日志、快照、QAC、FTP准备、发布基线。
+对应原 Shell 脚本的 Collecting commit log, qac report, ftp upload,
+commit_snapshot_manifest 等部分。
+"""
+from __future__ import annotations          # 允许延迟类型注解求值
+import os
+import shutil
+import subprocess
+import logging
+import tempfile
+import xml.etree.ElementTree as ET
+from pathlib import Path
+from typing import Tuple, List
+
+from gerrit import _run_ssh                  # 直接使用 gerrit 提供的 SSH 工具
+
+logger = logging.getLogger(__name__)
+
+
+class ArtifactError(Exception):
+    """产物处理异常"""
+    pass
+
+
+# ---------------------------------------------------------------------------
+# 1. 提交日志收集
+# ---------------------------------------------------------------------------
+def collect_logs(repo_dir: str, output_csv: str) -> None:
+    """收集近 7 天的提交日志,保存为 CSV 格式"""
+    repo_dir = Path(repo_dir)
+    if not (repo_dir / '.repo').is_dir():
+        logger.warning("No .repo dir in %s, skip collect_logs", repo_dir)
+        return
+
+    script_content = '''#!/bin/sh
+git log --no-merges --after="$(date +"%Y-%m-%d %H:%M:%S" -d "-7 day")" \
+    --date=format:'%Y-%m-%d %H:%M:%S' \
+    --pretty=format:'"'$(basename $(pwd))'","%H","%an","%ae","%ad","%cd","%s"%n'
+'''
+    _run_repo_forall_with_script(repo_dir, script_content, output_csv)
+
+
+def collect_newest_one_commit(repo_dir: str, output_csv: str) -> None:
+    """收集每个项目的最新一次提交,保存为 CSV 格式"""
+    repo_dir = Path(repo_dir)
+    if not (repo_dir / '.repo').is_dir():
+        logger.warning("No .repo dir in %s, skip collect_newest_one_commit", repo_dir)
+        return
+
+    script_content = '''#!/bin/sh
+git log --no-merges -n 1 --date=format:'%Y-%m-%d %H:%M:%S' \
+    --pretty=format:'"'$(basename $(pwd))'","%H","%an","%ae","%ad","%cd","%s"%n'
+'''
+    _run_repo_forall_with_script(repo_dir, script_content, output_csv)
+
+
+def _run_repo_forall_with_script(repo_dir: Path, script: str, output_csv: str) -> None:
+    """创建临时脚本,用 repo forall -c 执行,结果追加到指定 CSV 文件"""
+    with tempfile.NamedTemporaryFile(mode='w', suffix='.sh', delete=False) as f:
+        f.write(script)
+        f.flush()
+        script_path = f.name
+
+    try:
+        cmd = ['repo', 'forall', '-c', f'sh {script_path}']
+        with open(output_csv, 'a') as out:
+            subprocess.run(cmd, cwd=str(repo_dir), stdout=out, check=True)
+    finally:
+        os.unlink(script_path)
+
+
+# ---------------------------------------------------------------------------
+# 2. 审查记录 & 快照
+# ---------------------------------------------------------------------------
+def collect_review_records(repo_dir: str, output_dir: str) -> None:
+    """
+    从 Gerrit 获取已合并的变更历史,并生成当前代码快照。
+    对应原 Shell 的 get_repo_history_from_gerrit + create_snapshot。
+    """
+    repo_dir = Path(repo_dir)
+    repo_name = repo_dir.name
+    output_dir = Path(output_dir)
+    output_dir.mkdir(parents=True, exist_ok=True)
+
+    # 获取所有 project 名称和分支
+    manifest = repo_dir / '.repo' / 'manifests' / 'default.xml'
+    if not manifest.is_file():
+        logger.warning("Manifest not found in %s, skipping review records", repo_dir)
+        return
+
+    projects, branch = _parse_manifest(manifest)
+
+    # Gerrit 连接信息 —— 直接从环境变量读取(与 config.py 保持一致的默认值)
+    host = os.environ.get('GERRIT_HOST', '10.2.90.253')
+    port = int(os.environ.get('GERRIT_PORT', '29418'))
+    user = os.environ.get('GERRIT_NAME', 'jenkins')
+
+    # 生成 Gerrit 历史文件
+    history_file = output_dir / f"{repo_name}_merged_history.txt"
+    with open(history_file, 'w') as f:
+        for project in projects:
+            try:
+                result = _get_history_to_str(host, port, user, project, branch)
+                f.write(result)
+            except Exception as e:
+                logger.warning("Failed to get history for %s: %s", project, e)
+
+    # 生成快照 XML
+    snapshot_xml = output_dir / f"{repo_name}_snapshot.xml"
+    subprocess.run(['repo', 'manifest', '-r', '-o', str(snapshot_xml)],
+                   cwd=str(repo_dir), check=True)
+
+
+def _parse_manifest(manifest_file: Path) -> Tuple[List[str], str]:
+    """从 default.xml 提取所有 project name 和 default revision"""
+    tree = ET.parse(manifest_file)
+    root = tree.getroot()
+    projects = [elem.get('name') for elem in root.findall('project')]
+    branch = root.find('default').get('revision', 'master')
+    return projects, branch
+
+
+def _get_history_to_str(host: str, port: int, user: str, project: str, branch: str) -> str:
+    """获取 Gerrit 历史并返回字符串(内部使用)"""
+    query = f"query --format=TEXT status:merged project:{project} branch:{branch} after:2020-01-01"
+    return _run_ssh(host, port, user, query)
+
+
+# ---------------------------------------------------------------------------
+# 3. QAC 报告收集
+# ---------------------------------------------------------------------------
+def collect_qac_report(mcu_sdk_dir: str, output_dir: str) -> None:
+    """复制 MCU SDK 中的 QAC 报告到输出目录"""
+    qac_source = Path(mcu_sdk_dir) / '..' / 'qac_report' / 'qac_output' / 'soc' / 'sdk_la'
+    if not qac_source.is_dir():
+        logger.info("No QAC report found at %s, skipping", qac_source)
+        return
+
+    output_dir = Path(output_dir)
+    if output_dir.exists():
+        shutil.rmtree(output_dir)
+    output_dir.mkdir(parents=True)
+
+    # 复制所有 html 和 xlsx 文件
+    for root, dirs, files in os.walk(qac_source):
+        for file in files:
+            if any(file.lower().endswith(ext) for ext in ['.html', '.xlsx']):
+                src_path = Path(root) / file
+                rel_path = src_path.relative_to(qac_source)
+                dest_path = output_dir / rel_path
+                dest_path.parent.mkdir(parents=True, exist_ok=True)
+                shutil.copy2(src_path, dest_path)
+    logger.info("QAC report copied to %s", output_dir)
+
+
+# ---------------------------------------------------------------------------
+# 4. FTP 上传准备
+# ---------------------------------------------------------------------------
+def prepare_ftp_upload(cfg, collect_log_file, newest_log_file,
+                       review_records_dir, qac_report_dir) -> None:
+    """
+    将所有构建产物移动到本地 FTP 目录,并生成 env.properties。
+    对应原 Shell 的 prepare_for_upload_ftp()。
+    """
+    # 确定 SOC 部署输出目录
+    soc_dir = Path(cfg.soc_dir)
+    deploy_root = soc_dir / 'out' / 'deploy'
+    if not deploy_root.is_dir():
+        raise ArtifactError("No deploy directory found in SOC output")
+    deploy_dirs = list(deploy_root.iterdir())
+    if not deploy_dirs:
+        raise ArtifactError("No deploy subdirectories found")
+    deploy_dir_name = deploy_dirs[0].name
+
+    image_deploy_local = Path(cfg.deploy_image_local_dir) / cfg.repo_branch / deploy_dir_name
+    image_deploy_local.mkdir(parents=True, exist_ok=True)
+
+    # 移动整个 deploy 目录
+    src = deploy_dirs[0]
+    for item in os.listdir(src):
+        src_item = src / item
+        dst_item = image_deploy_local / item
+        if src_item.is_dir():
+            shutil.move(str(src_item), str(dst_item))
+        else:
+            shutil.move(str(src_item), str(dst_item))
+    logger.info("Deploy files moved to %s", image_deploy_local)
+
+    # 决定是否需要测试提示
+    test_msg = "无需测试"
+    commit_log_filename = "no_commit"
+    if Path(collect_log_file).is_file():
+        with open(collect_log_file, 'r') as f:
+            lines = f.readlines()
+            # 排除 auto commit 行
+            non_auto = [line for line in lines if 'auto commit by server' not in line]
+            if len(non_auto) > 1:  # 至少有一行有效信息(不含表头)
+                test_msg = "需要测试"
+                commit_log_filename = Path(collect_log_file).name
+                shutil.copy2(collect_log_file, image_deploy_local)
+
+    # 复制其他文件
+    for src_file, dst_name in [
+        (newest_log_file, Path(newest_log_file).name) if Path(newest_log_file).is_file() else None,
+        (review_records_dir, Path(review_records_dir).name) if Path(review_records_dir).is_dir() else None,
+        (qac_report_dir, Path(qac_report_dir).name) if Path(qac_report_dir).is_dir() else None,
+    ]:
+        if src_file is None:
+            continue
+        src_path = Path(src_file)
+        dst_path = image_deploy_local / dst_name
+        if src_path.is_dir():
+            shutil.copytree(src_path, dst_path, dirs_exist_ok=True)
+        else:
+            shutil.copy2(src_path, dst_path)
+
+    # 构建时间戳
+    if '[' in deploy_dir_name:
+        image_build_time = deploy_dir_name.split('[')[1].split(']')[0].replace('-', '')
+    else:
+        from datetime import datetime
+        image_build_time = datetime.now().strftime('%Y%m%d%H%M%S')
+    cfg.image_build_time = image_build_time  # 存回 config 备用
+
+    # 生成 env.properties
+    ftp_user = os.environ.get('ftpUser', 'ftpuser')
+    ftp_password = os.environ.get('ftpPasword', '123456')
+    ftp_addr = os.environ.get('ftpAddr', '10.2.90.252')
+    ftp_port = os.environ.get('ftpPort', '21')
+    ftp_root = f"DB-{datetime.now().year}/{cfg.ftp_root}"
+    deploy_rel = f"{cfg.repo_branch}/{deploy_dir_name}"
+    ftp_deploy_url = f"ftp://{ftp_user}:{ftp_password}@{ftp_addr}:{ftp_port}/{ftp_root}/{deploy_rel}"
+    ftp_commit_log_url = f"{ftp_deploy_url}/{commit_log_filename}" if commit_log_filename != "no_commit" else ""
+
+    env_content = f"""store.repo_branch={cfg.repo_branch}
+store.deploy_dir={ftp_deploy_url}
+store.commit_log={ftp_commit_log_url}
+store.test_msg={test_msg}
+"""
+    env_file = image_deploy_local / 'env.properties'
+    with open(env_file, 'w') as f:
+        f.write(env_content)
+    logger.info("env.properties generated at %s", env_file)
+
+
+# ---------------------------------------------------------------------------
+# 5. 发布基线 manifest 提交
+# ---------------------------------------------------------------------------
+def commit_snapshot_manifest(repo_dir: str, version: str) -> None:
+    """生成 release_<version>.xml 并提交到 manifest 仓库的相应分支"""
+    if not version:
+        logger.info("No release version specified, skip commit manifest")
+        return
+
+    repo_dir = Path(repo_dir)
+    manifests_repo = repo_dir / '.repo' / 'manifests'
+    if not (manifests_repo / '.git').is_dir():
+        logger.warning("No manifests git repo in %s, skip commit manifest", repo_dir)
+        return
+
+    release_dir = manifests_repo / 'release'
+    release_dir.mkdir(parents=True, exist_ok=True)
+
+    # 生成快照 XML
+    snapshot_file = release_dir / f'release_{version}.xml'
+    subprocess.run(['repo', 'manifest', '-r', '-o', str(snapshot_file)],
+                   cwd=str(repo_dir), check=True)
+
+    # 提交并推送
+    current_branch = _get_current_branch(manifests_repo)
+    subprocess.run(['git', 'add', f'release/release_{version}.xml'],
+                   cwd=str(manifests_repo), check=True)
+    try:
+        subprocess.run(['git', 'commit', '-m', f'Auto commit by server: add release_{version}.xml'],
+                       cwd=str(manifests_repo), check=True, capture_output=True)
+    except subprocess.CalledProcessError as e:
+        if 'nothing to commit' in e.stderr.decode().lower():
+            logger.info("Nothing to commit, skipping push for release manifest")
+            return
+        raise
+
+    # 推送到当前分支(通常为 REPO_BRANCH 或默认分支)
+    push_branch = os.environ.get('REPO_BRANCH', current_branch)
+    subprocess.run(['git', 'push', 'origin', f'HEAD:{push_branch}'],
+                   cwd=str(manifests_repo), check=True)
+    logger.info("Release manifest %s pushed to %s", snapshot_file.name, push_branch)
+
+
+def _get_current_branch(repo_dir: Path) -> str:
+    """获取仓库当前分支"""
+    result = subprocess.run(['git', 'rev-parse', '--abbrev-ref', 'HEAD'],
+                            cwd=str(repo_dir), capture_output=True, text=True, check=True)
+    return result.stdout.strip()

+ 107 - 0
binary_repo.py

@@ -0,0 +1,107 @@
+"""
+binary_repo.py - 管理二进制产物仓库。
+对应原 Shell 中的 pull_binary_from_server / commit_binary_to_server。
+"""
+import os
+import logging
+import subprocess
+from pathlib import Path
+from typing import List
+
+logger = logging.getLogger(__name__)
+
+
+class BinaryRepoError(Exception):
+    """二进制仓库操作异常"""
+    pass
+
+
+def pull_binaries(repo_dir: str) -> None:
+    """
+    拉取二进制仓库到最新状态(相当于 git checkout 后清空工作区)。
+    对应原 Shell 的 pull_binary_from_server。
+
+    Args:
+        repo_dir: 本地仓库路径
+    """
+    path = Path(repo_dir)
+    if not path.is_dir():
+        logger.warning("Binary repo not found, skipping pull: %s", repo_dir)
+        return
+
+    # 切换到仓库目录
+    original_cwd = os.getcwd()
+    os.chdir(repo_dir)
+    try:
+        # 检查是否为有效的 git 仓库
+        if not (path / '.git').is_dir():
+            raise BinaryRepoError(f"Not a git repository: {repo_dir}")
+
+        # 清理所有文件(包括隐藏文件,但保留 .git)
+        _clean_worktree()
+
+        # 恢复所有文件到 HEAD 状态
+        subprocess.run(['git', 'checkout', '.'], check=True)
+        logger.info("Binary repo synced: %s", repo_dir)
+    except subprocess.CalledProcessError as e:
+        raise BinaryRepoError(f"Failed to pull binary repo: {e}") from e
+    finally:
+        os.chdir(original_cwd)
+
+
+def commit_binaries(repo_dir: str, target_branch: str) -> None:
+    """
+    自动提交二进制产物到远端分支。
+    对应原 Shell 的 commit_binary_to_server。
+
+    Args:
+        repo_dir: 本地仓库路径
+        target_branch: 推送的目标分支
+    """
+    path = Path(repo_dir)
+    if not path.is_dir():
+        raise BinaryRepoError(f"Binary repo not found: {repo_dir}")
+
+    original_cwd = os.getcwd()
+    os.chdir(repo_dir)
+    try:
+        if not (path / '.git').is_dir():
+            raise BinaryRepoError(f"Not a git repository: {repo_dir}")
+
+        # git add 所有新增/修改文件
+        subprocess.run(['git', 'add', '*'], check=True)
+
+        # 尝试提交,如果 nothing to commit 则忽略错误
+        try:
+            subprocess.run(['git', 'commit', '-m', 'auto commit by server'],
+                           check=True, capture_output=True, text=True)
+        except subprocess.CalledProcessError as e:
+            if 'nothing to commit' in e.stderr.lower() or 'nothing added' in e.stderr.lower():
+                logger.info("Nothing to commit in %s, skipping push", repo_dir)
+                return
+            else:
+                raise
+
+        # 推送到指定分支
+        push_ref = f'HEAD:{target_branch}'
+        subprocess.run(['git', 'push', 'origin', push_ref], check=True)
+        logger.info("Binary repo pushed to %s: %s", target_branch, repo_dir)
+    except subprocess.CalledProcessError as e:
+        raise BinaryRepoError(f"Failed to commit/push binary repo: {e}") from e
+    finally:
+        os.chdir(original_cwd)
+
+
+def _clean_worktree() -> None:
+    """
+    删除工作区所有文件(包括隐藏文件),但保持 .git 目录存在。
+    """
+    cwd = Path.cwd()
+    for item in cwd.iterdir():
+        if item.name == '.git':
+            continue
+        if item.is_dir():
+            import shutil
+            shutil.rmtree(item, ignore_errors=True)
+        else:
+            item.unlink(missing_ok=True)

+ 89 - 0
build_mcu.py

@@ -0,0 +1,89 @@
+import os
+import logging
+from pathlib import Path
+from typing import Optional, List
+
+from utils import run_command_with_retry
+
+logger = logging.getLogger(__name__)
+
+
+class BuildMCUError(Exception):
+    """MCU 编译异常"""
+    pass
+
+
+def compile_mcu(mcu_sdk_dir: str, profile_mode: str, soc_project: str):
+    """
+    执行 MCU 编译。
+    对应原 Shell 的 compile_mcu() 函数。
+
+    Args:
+        mcu_sdk_dir: MCU SDK 根目录 (例如 .../mcu/mcu_sdk)
+        profile_mode: 'Release' 或 'Debug'
+        soc_project: SOC 项目名,用于判断是否启用 DV 模式
+    """
+    build_dir = os.path.join(mcu_sdk_dir, 'build')
+    if not os.path.isdir(build_dir):
+        raise BuildMCUError(f"MCU build directory not found: {build_dir}")
+
+    # 清理输出目录
+    output_dir = os.path.join(mcu_sdk_dir, 'output')
+    if os.path.isdir(output_dir):
+        import shutil
+        shutil.rmtree(output_dir)
+        logger.info("Cleaned MCU output directory: %s", output_dir)
+
+    # 构造额外参数,与原 Shell 逻辑一致
+    extra_args = _build_extra_args(profile_mode, soc_project)
+
+    # 完整构建命令
+    cmd_parts = [
+        'python3', './build_autosar.py',
+        'lite', 'evm',
+    ] + extra_args + ['j6b']
+
+    logger.info("Compiling MCU with command: %s", ' '.join(cmd_parts))
+
+    # 执行命令(带 license 重试)
+    try:
+        run_command_with_retry(
+            cmd_parts,
+            cwd=build_dir,
+            retry_count=5,
+            retry_pattern=r'No licenses available for toolchain',
+            log_file=os.path.join(os.environ.get('WORKSPACE', '/tmp'), 'command_log.txt')
+        )
+    except Exception as e:
+        raise BuildMCUError(f"MCU compilation failed: {e}") from e
+
+
+def prepare_mcu(mcu_sdk_dir: str, soc_dir: str):
+    """
+    编译前的准备工作(当前为空函数,与原 Shell 一致)。
+    如需未来扩展配置生成代码等,可在此添加。
+    """
+    # 原脚本 prepare_for_compile_mcu 为空
+    pass
+
+
+def _build_extra_args(profile_mode: str, soc_project: str) -> List[str]:
+    """
+    构建额外参数列表。
+    对应原 Shell 中的 extra_para 变量拼接逻辑:
+    - 若 SOC_PROJECT 包含 'DV',则加 'dv',否则加 'plus'
+    - 若 PROFILE_MODE 为 'Release',则加 'release',否则加 'debug'
+    """
+    extra = []
+
+    if 'DV' in soc_project.upper():
+        extra.append('dv')
+    else:
+        extra.append('plus')
+
+    if profile_mode == 'Release':
+        extra.append('release')
+    else:
+        extra.append('debug')
+
+    return extra

+ 116 - 0
build_soc.py

@@ -0,0 +1,116 @@
+import os
+import shutil
+import logging
+import subprocess
+from pathlib import Path
+from typing import List
+
+logger = logging.getLogger(__name__)
+
+
+class BuildSOCError(Exception):
+    """SOC 编译异常"""
+    pass
+
+
+def prepare_soc(mcu_sdk_dir: str, soc_dir: str, soc_project: str):
+    """
+    将 MCU 编译产物拷贝到 SOC 的 hbbin/mcubin/deploy/ (或 deploy_dv/)。
+    对应原 Shell 的 prepare_for_compile_soc()。
+    """
+    # MCU 输出目录
+    mcu_output = os.path.join(mcu_sdk_dir, 'output', 'dbg')
+    if not os.path.isdir(mcu_output):
+        raise BuildSOCError(f"MCU output directory not found: {mcu_output}")
+
+    # 确定目标目录:deploy 或 deploy_dv
+    if 'DV' in soc_project.upper():
+        deploy_dir = os.path.join(soc_dir, 'hbbin', 'mcubin', 'deploy_dv')
+    else:
+        deploy_dir = os.path.join(soc_dir, 'hbbin', 'mcubin', 'deploy')
+    os.makedirs(deploy_dir, exist_ok=True)
+
+    # 复制 J6_MCU.zip
+    src_zip = os.path.join(mcu_output, 'J6_MCU.zip')
+    if os.path.isfile(src_zip):
+        shutil.copy2(src_zip, deploy_dir)
+        logger.info("Copied %s to %s", src_zip, deploy_dir)
+    else:
+        raise BuildSOCError(f"Missing MCU archive: {src_zip}")
+
+    # 复制 .map 和 .elf 文件
+    for item in os.listdir(mcu_output):
+        if item.endswith('.map') or item.endswith('.elf'):
+            src = os.path.join(mcu_output, item)
+            if os.path.isfile(src):
+                shutil.copy2(src, deploy_dir)
+                logger.info("Copied %s to %s", src, deploy_dir)
+
+
+def compile_soc(soc_dir: str, soc_project: str,
+                secure_enable: bool = False,
+                factory_emmc_img_enable: bool = False,
+                profile_mode: str = 'Debug'):
+    """
+    执行 SOC 编译和部署。
+    对应原 Shell 的 compile_soc()。
+    """
+    # 切换到 SOC 根目录
+    original_cwd = os.getcwd()
+    os.chdir(soc_dir)
+    try:
+        # 检查 QNX 工具链
+        if not os.path.isdir('/opt/qnx800'):
+            raise BuildSOCError("QNX toolchain not found: /opt/qnx800")
+
+        # 建立软链接(如果不存在)
+        target_link = os.path.join('build_tools', 'Compiler', 'qnx800')
+        if not os.path.islink(target_link) and not os.path.exists(target_link):
+            os.makedirs(os.path.dirname(target_link), exist_ok=True)
+            os.symlink('/opt/qnx800', target_link)
+            logger.info("Created symlink: %s -> /opt/qnx800", target_link)
+
+        # 1. 编译镜像
+        build_cmd = ['./build-tools/build.sh', '-c', 'invo-all-image']
+        logger.info("SOC compile: %s", ' '.join(build_cmd))
+        subprocess.run(build_cmd, check=True)
+
+        # 2. 部署镜像(生成所有需要的镜像文件)
+        deploy_cmd = ['./build-tools/build.sh'] + _build_deploy_args(soc_project,
+                                                                     secure_enable,
+                                                                     factory_emmc_img_enable,
+                                                                     profile_mode) + ['-b', 'invo-all-image']
+        logger.info("SOC deploy: %s", ' '.join(deploy_cmd))
+        subprocess.run(deploy_cmd, check=True)
+
+    except subprocess.CalledProcessError as e:
+        raise BuildSOCError(f"SOC build failed: {e}") from e
+    finally:
+        os.chdir(original_cwd)
+
+
+def _build_deploy_args(soc_project: str, secure_enable: bool,
+                       factory_emmc_img_enable: bool, profile_mode: str) -> List[str]:
+    """
+    构造部署命令的额外参数。
+    对应原 Shell 的 extra_para 逻辑。
+    """
+    args = []
+
+    # DV 项目添加 -d
+    if 'DV' in soc_project.upper():
+        args.append('-d')
+
+    # 安全编译
+    if secure_enable:
+        args.append('-s')
+
+    # 工厂 eMMC 镜像
+    if factory_emmc_img_enable:
+        args.append('-f')
+
+    # Release 模式
+    if profile_mode == 'Release':
+        args.append('-r')
+
+    return args

+ 123 - 0
config.py

@@ -0,0 +1,123 @@
+import os
+from dataclasses import dataclass, field
+from typing import Optional, List
+
+@dataclass
+class PiplineConfig:
+    """封装流水线所有配置参数"""
+    # gerrit配置
+    gerrit_name: str = "jenkins"
+    gerrit_host: str = "10.2.90.253"
+    gerrit_port: int = 29418
+    gerrit_changes: List[str] = field(default_factory=list)
+    gerrit_flag: bool = False
+
+    # 构建基础配置
+    workspace: str = ""
+    soc_project: str = ""
+    soc_branch: str = ""
+    mcu_release_version: str = ""
+    soc_release_version: str = ""
+    profile_mode: str = "Debug"
+
+    # FTP配置
+    ftp_root: str = ""
+    deploy_image_local_dir: str = ""
+    
+    ftp_host: str = os.environ.get('ftpAddr', '10.2.90.252')
+    ftp_port: int = int(os.environ.get('ftpPort', '21'))
+    ftp_user: str = os.environ.get('ftpUser', 'ftpuser')
+    ftp_password: str = os.environ.get('ftpPasword', '123456')
+
+    # 分支
+    repo_branch: str = ""
+    auto_commit_branch: str = "AD26IDV01-dev"
+
+    # 功能开关
+    upgrade_package_bootloader: bool = False
+    upgrade_package_camera: bool = False
+    upgrade_package_recovery: bool = False
+    factory_emmc_img_enable: bool = False
+    secure_enable: bool = False
+
+    # 构建快照
+    snapshot: str = "default"
+    build_user: str = ""
+
+    # 自动提交控制
+    auto_commit_server: bool = True
+
+    # 二进制仓库列表
+    binary_repos: List[str] = field(default_factory=lambda: [])
+
+    # 路径
+    mcu_dir: str = ""
+    soc_dir: str = ""
+
+    # manifest 仓库 URL
+    mcu_manifest_url: str = ""
+    soc_manifest_url: str = ""
+
+    @classmethod
+    def from_env(cls) -> 'PiplineConfig':
+        """从环境变量中加载配置, 并处理shell脚本中的逻辑"""
+        cfg = cls()
+
+        # 基础路径
+        cfg.workspace = os.environ.get("WORKSPACE", "")
+        cfg.soc_project = os.environ.get('SOC_PROJECT', '')
+        cfg.soc_branch = os.environ.get('SOC_BRANCH', '')
+        cfg.mcu_release_version = os.environ.get('mcu_release_version', '')
+        cfg.soc_release_version = os.environ.get('soc_release_version', '')
+        cfg.build_user = os.environ.get('BUILD_USER', '')
+
+        cfg.mcu_dir = os.path.join(cfg.workspace, 'mcu')
+        cfg.soc_dir = os.path.join(cfg.workspace, 'soc')
+        cfg.mcu_manifest_url = f"ssh://{cfg.gerrit_host}/INVO/MCU/HR/manifest"
+        cfg.soc_manifest_url = f"ssh://{cfg.gerrit_host}/INVO/SOC/HR/manifest"
+
+        # ---- 分支处理 ----
+        cfg.repo_branch = cfg.soc_branch  # Shell: REPO_BRANCH="$SOC_BRANCH"
+
+        # ---- FTP 路径 ----
+        cfg.ftp_root = f"HORIZON/J6B/{cfg.soc_project}" if cfg.soc_project else ""
+        cfg.deploy_image_local_dir = f"{cfg.workspace}/{cfg.ftp_root}" if cfg.ftp_root else ""
+
+        # ---- 二进制仓库目录 ----
+        cfg.binary_repos = [f"{cfg.workspace}/soc/hbbin/mcubin/"]
+
+        # ---- Gerrit ----
+        cfg.gerrit_changes = os.environ.get('GERRIT_CHANGES', '')
+        cfg.gerrit_flag = bool(cfg.gerrit_changes.strip()) if cfg.gerrit_changes else False
+
+        # ---- 快照 ----
+        cfg.snapshot = os.environ.get('SNAPSHOT', 'default')
+
+        # ---- 功能开关 (字符串 -> 布尔) ----
+        cfg.upgrade_package_bootloader = os.environ.get('upgrade_package_bootloader', 'false').lower() == 'true'
+        cfg.upgrade_package_camera = os.environ.get('upgrade_package_camera', 'false').lower() == 'true'
+        cfg.upgrade_package_recovery = os.environ.get('upgrade_package_recovery', 'false').lower() == 'true'
+        cfg.factory_emmc_img_enable = os.environ.get('factory_emmc_img_enable', 'false').lower() == 'true'
+        cfg.secure_enable = os.environ.get('secure_enable', 'false').lower() == 'true'
+
+        # ---- 触发方式导致的默认值覆盖 ----
+        # 原脚本:如果 BUILD_USER 为空(定时触发),则关闭部分功能
+        if not cfg.build_user.strip():
+            cfg.gerrit_changes = ""
+            cfg.gerrit_flag = False
+            cfg.upgrade_package_recovery = False
+            cfg.upgrade_package_bootloader = False
+            cfg.secure_enable = False
+            cfg.factory_emmc_img_enable = False
+
+        # ---- 自动提交策略 ----
+        # Shell: AUTO_COMMIT_SERVER=1 仅当 GERRIT_FLAG=0
+        cfg.auto_commit_server = not cfg.gerrit_flag
+
+        # ---- Profile mode ----
+        cfg.profile_mode = os.environ.get('PROFILE_MODE', 'Debug')  # 原脚本有判断,但主流程未显式设置
+
+        # ---- 额外常量 ----
+        cfg.threads_num = 12  # Shell 中有 THREADS_NUM=12,但未使用,保留
+
+        return cfg

+ 47 - 0
environment.py

@@ -0,0 +1,47 @@
+import os
+import subprocess
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+def setup_environment():
+    """
+    加载 /etc/profile.d/invo.sh 的环境变量到当前 Python 进程。
+    同时设置 ulimit -n 4096 和 PYTHONUNBUFFERED=1。
+    """
+    _source_shell_script("/etc/profile.d/invo.sh")
+    _set_resource_limits()
+    os.environ['PYTHONUNBUFFERED'] = '1'
+    os.environ.setdefault('LESSCHARSET', 'utf-8')
+
+
+def _source_shell_script(script_path: str):
+    """执行 source <script> && env,将结果合并到 os.environ"""
+    if not os.path.isfile(script_path):
+        logger.warning(f"Shell script not found: {script_path}")
+        return
+
+    cmd = f"bash -c 'source {script_path} && env'"
+    try:
+        output = subprocess.check_output(cmd, shell=True, stderr=subprocess.PIPE,
+                                         universal_newlines=True)
+    except subprocess.CalledProcessError as e:
+        logger.error(f"Failed to source {script_path}: {e.stderr}")
+        raise
+
+    for line in output.splitlines():
+        key, _, value = line.partition('=')
+        if key:
+            os.environ[key] = value
+
+
+def _set_resource_limits():
+    """ulimit -n 4096"""
+    try:
+        import resource
+        soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
+        resource.setrlimit(resource.RLIMIT_NOFILE, (4096, hard))
+        logger.info("File descriptor limit set to 4096")
+    except ValueError:
+        logger.warning("Could not set file descriptor limit")

+ 79 - 0
ftp_upload.py

@@ -0,0 +1,79 @@
+"""
+ftp_upload.py - 将本地部署目录通过 FTP 上传到服务器。
+可被 artifacts.py 调用,也可在 pipeline.py 中独立使用。
+"""
+import ftplib
+import os
+import logging
+from pathlib import Path
+from typing import Optional
+
+logger = logging.getLogger(__name__)
+
+
+class FTPUploadError(Exception):
+    """FTP 上传异常"""
+    pass
+
+
+def upload_directory(cfg, local_dir: str) -> None:
+    """
+    将本地目录递归上传到 FTP 服务器。
+
+    Args:
+        cfg: PipelineConfig 对象,包含 FTP 连接信息与 ftp_root 路径。
+        local_dir: 本地待上传目录路径(如 .../HORIZON/J6B/AD26IDV01/xxx/deploy_dir)
+    """
+    local_path = Path(local_dir)
+    if not local_path.is_dir():
+        raise FTPUploadError(f"Local upload directory not found: {local_dir}")
+
+    # 目标 FTP 根路径:DB-年/FTP_ROOT,与原 shell 一致
+    import datetime
+    ftp_root = f"DB-{datetime.datetime.now().year}/{cfg.ftp_root}"
+
+    logger.info("Connecting to FTP %s:%d", cfg.ftp_host, cfg.ftp_port)
+    ftp = ftplib.FTP()
+    try:
+        ftp.connect(host=cfg.ftp_host, port=cfg.ftp_port, timeout=30)
+        ftp.login(user=cfg.ftp_user, passwd=cfg.ftp_password)
+        ftp.set_pasv(True)                     # 被动模式
+        _mkdir_p(ftp, ftp_root)                # 确保远端根目录存在
+        _upload_recursive(ftp, local_path, ftp_root)
+        logger.info("FTP upload completed successfully: %s -> %s", local_dir, ftp_root)
+    except ftplib.all_errors as e:
+        raise FTPUploadError(f"FTP upload failed: {e}") from e
+    finally:
+        try:
+            ftp.quit()
+        except Exception:
+            pass
+
+
+def _mkdir_p(ftp: ftplib.FTP, remote_path: str) -> None:
+    """递归创建远端目录,类似 mkdir -p"""
+    dirs = remote_path.strip('/').split('/')
+    current = ''
+    for d in dirs:
+        current += f"/{d}"
+        try:
+            ftp.cwd(current)
+        except ftplib.error_perm:
+            ftp.mkd(current)
+            ftp.cwd(current)
+
+
+def _upload_recursive(ftp: ftplib.FTP, local_path: Path, remote_dir: str) -> None:
+    """递归上传文件与子目录"""
+    ftp.cwd(remote_dir)
+    for item in local_path.iterdir():
+        remote_full = f"{remote_dir}/{item.name}"
+        if item.is_file():
+            with open(item, 'rb') as f:
+                ftp.storbinary(f'STOR {item.name}', f)
+            logger.debug("Uploaded file: %s", remote_full)
+        elif item.is_dir():
+            _mkdir_p(ftp, remote_full)
+            _upload_recursive(ftp, item, remote_full)
+        else:
+            logger.warning("Skipping non-regular file: %s", item)

+ 170 - 0
gerrit.py

@@ -0,0 +1,170 @@
+import subprocess
+import logging
+import os
+from typing import List, Dict, Optional
+
+logger = logging.getLogger(__name__)
+
+
+class GerritError(Exception):
+    """Gerrit 操作异常"""
+    pass
+
+
+def _run_ssh(host: str, port: int, user: str, command: str) -> str:
+    """执行 Gerrit SSH 命令并返回标准输出"""
+    cmd = [
+        'ssh',
+        '-p', str(port),
+        f'{user}@{host}',
+        'gerrit', *command.split()
+    ]
+    try:
+        result = subprocess.run(cmd, capture_output=True, text=True, check=True)
+        return result.stdout
+    except subprocess.CalledProcessError as e:
+        error_msg = e.stderr.strip()
+        logger.error("Gerrit SSH command failed: %s", error_msg)
+        raise GerritError(error_msg) from e
+
+
+def query_changes(host: str, port: int, user: str,
+                  change_ids: List[str]) -> List[Dict[str, str]]:
+    """
+    查询 Gerrit 上的变更信息。
+
+    Args:
+        host: Gerrit 主机
+        port: SSH 端口
+        user: Gerrit 用户名
+        change_ids: 变更 ID 列表(如 ['12345', '12346'])
+
+    Returns:
+        每个变更的信息字典列表,包含 change_id, project, revision, ref
+    """
+    changes_info = []
+    for cid in change_ids:
+        output = _run_ssh(host, port, user,
+                          f"query --current-patch-set status:open change:{cid}")
+
+        project = revision = ref = None
+        for line in output.splitlines():
+            line = line.strip()
+            if line.startswith('project:'):
+                project = line.split(':', 1)[1].strip()
+            elif line.startswith('revision:'):
+                revision = line.split(':', 1)[1].strip()
+            elif line.startswith('ref:') and cid in line:
+                # ref 行格式:ref: refs/changes/xx/12345/1
+                ref = line.split('ref:', 1)[1].strip()
+                # 确保提取到 refs/changes/... 部分
+                if 'refs/' in ref:
+                    ref = ref[ref.index('refs/'):]
+
+        if not project or not revision:
+            raise GerritError(f"Cannot parse project/revision for change {cid}")
+
+        changes_info.append({
+            'change_id': cid,
+            'project': project,
+            'revision': revision,
+            'ref': ref or ''
+        })
+        logger.info("Change %s: project=%s, revision=%s", cid, project, revision)
+
+    return changes_info
+
+
+def _find_repo_root(project: str, mcu_dir: str, soc_dir: str) -> Optional[str]:
+    """
+    根据 project 名称,找到它属于 MCU 还是 SOC 工程。
+    优先检查 manifest/default.xml 中是否包含该项目。
+    """
+    for repo_dir in [soc_dir, mcu_dir]:
+        manifest = os.path.join(repo_dir, '.repo', 'manifests', 'default.xml')
+        if os.path.isfile(manifest):
+            try:
+                with open(manifest, 'r') as f:
+                    if project in f.read():
+                        return repo_dir
+            except Exception:
+                continue
+    # 备选:使用 repo list 命令查找
+    for repo_dir in [soc_dir, mcu_dir]:
+        try:
+            result = subprocess.run(
+                ['repo', 'list'], cwd=repo_dir,
+                capture_output=True, text=True, check=False
+            )
+            if project in result.stdout:
+                return repo_dir
+        except Exception:
+            continue
+
+    return None
+
+
+def apply_changes(changes_info: List[Dict], mcu_dir: str, soc_dir: str):
+    """
+    将 Gerrit 变更通过 repo download 应用到对应的工程。
+
+    Args:
+        changes_info: query_changes 的返回结果
+        mcu_dir: MCU 工程的根目录
+        soc_dir: SOC 工程的根目录
+    """
+    for info in changes_info:
+        project = info['project']
+        ref = info['ref']
+
+        repo_root = _find_repo_root(project, mcu_dir, soc_dir)
+        if repo_root is None:
+            raise GerritError(f"Project {project} not found in MCU or SOC manifest")
+
+        # ref 可能带前缀,确保为 refs/changes/... 的形式
+        if not ref.startswith('refs/'):
+            ref = 'refs/changes/' + ref.lstrip('/')
+        # repo download 需要 'project_path change/ref' 的形式
+        # 如果 ref 已包含 'refs/changes/...',直接用;否则拼接
+        download_ref = f"{ref}" if '/' in ref else f"refs/changes/{ref}"
+        # 原脚本中 download_gerrit_project 使用了 change$change_index,即直接使用查询到的 ref 部分
+        # 这里我们直接使用 ref 即可(通常是 refs/changes/<NN>/<change>/<patchset>)
+
+        cmd = ['repo', 'download', project, ref]
+        logger.info("Applying change %s: %s", info['change_id'], ' '.join(cmd))
+        try:
+            subprocess.run(cmd, cwd=repo_root, check=True)
+        except subprocess.CalledProcessError as e:
+            raise GerritError(f"repo download failed for {info['change_id']}: {e}") from e
+
+
+def label_verified(host: str, port: int, user: str,
+                   changes_info: List[Dict]):
+    """
+    给所有变更打上 Verified +1 标签。
+
+    Args:
+        changes_info: query_changes 的返回结果
+    """
+    for info in changes_info:
+        revision = info['revision']
+        _run_ssh(host, port, user, f"review --verified +1 {revision}")
+        logger.info("Verified +1 on change %s", info['change_id'])
+
+
+def get_history(host: str, port: int, user: str,
+                project: str, branch: str, output_file: str):
+    """
+    查询已合并的评审记录,并保存到指定文件。
+
+    Args:
+        project: Git 项目名(如 INVO/MCU/APP/canbus_mng)
+        branch: 分支名
+        output_file: 输出文件路径
+    """
+    # 查询已合并的状态,起始时间定为较早的时间,确保覆盖所有历史
+    query = f"query --format=TEXT status:merged project:{project} branch:{branch} after:2020-01-01"
+    output = _run_ssh(host, port, user, query)
+    with open(output_file, 'w', encoding='utf-8') as f:
+        f.write(output)
+    logger.info("Saved merged history for %s to %s", project, output_file)

+ 56 - 0
logging_conf.py

@@ -0,0 +1,56 @@
+"""
+logging_conf.py - 统一日志配置
+对应原 Shell 中 echo + tee 的组合效果。
+"""
+import logging
+import sys
+from pathlib import Path
+from typing import Optional
+
+
+def setup_logging(
+    level: int = logging.INFO,
+    log_file: Optional[str] = None,
+    console: bool = True,
+) -> None:
+    """
+    配置根日志记录器。
+
+    Args:
+        level: 日志级别,例如 logging.DEBUG / logging.INFO
+        log_file: 文件路径,如果提供则同时写入该文件(相当于 tee 效果)
+        console: 是否输出到控制台(sys.stdout)
+    """
+    root = logging.getLogger()
+    root.setLevel(level)
+
+    # 清除已有 handlers,避免重复
+    root.handlers.clear()
+
+    formatter = logging.Formatter(
+        "%(asctime)s [%(levelname)-8s] %(name)s: %(message)s",
+        datefmt="%Y-%m-%d %H:%M:%S",
+    )
+
+    if console:
+        console_handler = logging.StreamHandler(sys.stdout)
+        console_handler.setLevel(level)
+        console_handler.setFormatter(formatter)
+        root.addHandler(console_handler)
+
+    if log_file:
+        log_path = Path(log_file)
+        log_path.parent.mkdir(parents=True, exist_ok=True)
+        file_handler = logging.FileHandler(str(log_path), encoding="utf-8")
+        file_handler.setLevel(level)
+        file_handler.setFormatter(formatter)
+        root.addHandler(file_handler)
+
+    # 减少第三方库的日志噪音
+    logging.getLogger("paramiko").setLevel(logging.WARNING)
+    logging.getLogger("ftplib").setLevel(logging.WARNING)
+
+
+def get_logger(name: str) -> logging.Logger:
+    """获取命名日志记录器,推荐使用 __name__ 作为参数"""
+    return logging.getLogger(name)

+ 194 - 0
pipeline.py

@@ -0,0 +1,194 @@
+#!/usr/bin/env python3
+"""
+pipeline.py - 域控软件集成流水线主入口
+
+完全替代原 Shell 脚本的主流程。
+使用模块化设计,每个步骤对应一个或多个模块函数。
+"""
+import os
+import sys
+import logging
+from pathlib import Path
+
+# 导入各模块
+from config import PipelineConfig
+from environment import setup_environment
+from logging_conf import setup_logging
+
+# 其他模块按需导入,避免循环依赖(可在函数内导入)
+# 这里为了代码清晰,提前导入全部
+from binary_repo import pull_binaries, commit_binaries
+from repo_manager import download_mcu, download_soc
+from gerrit import query_changes, apply_changes, label_verified
+from build_mcu import prepare_mcu, compile_mcu
+from build_soc import prepare_soc, compile_soc
+from artifacts import (
+    collect_logs,
+    collect_newest_one_commit,
+    collect_review_records,
+    collect_qac_report,
+    prepare_ftp_upload,
+    commit_snapshot_manifest,
+    clean_csv_add_header,
+)
+from ftp_upload import upload_directory
+from utils import CommandError, RepoError, BuildMCUError, BuildSOCError, GerritError, ArtifactError
+
+
+def run_pipeline():
+    """执行完整的 CI/CD 流水线。"""
+
+    # ---------- 1. 加载配置 ----------
+    cfg = PipelineConfig.from_env()
+
+    # ---------- 2. 配置日志 ----------
+    log_file = os.path.join(cfg.workspace, 'pipeline.log')
+    setup_logging(level=logging.INFO, log_file=log_file, console=True)
+    logger = logging.getLogger(__name__)
+    logger.info("Pipeline started - configuration: %s", cfg)
+
+    try:
+        # ---------- 3. 初始化环境 ----------
+        logger.info("===== Setting up environment =====")
+        setup_environment()
+        # 确保 PYTHONUNBUFFERED 在子进程中也生效(已在 environment 中设置)
+
+        # ---------- 4. 同步二进制仓库 ----------
+        logger.info("===== Syncing binary repositories =====")
+        for repo in cfg.binary_repos:
+            pull_binaries(repo)
+
+        # ---------- 5. 下载源码 ----------
+        logger.info("===== Downloading source code =====")
+        download_mcu(
+            cfg.mcu_dir,
+            cfg.mcu_manifest_url,
+            cfg.repo_branch,
+            f"ssh://{cfg.gerrit_host}/tools/repo",
+            cfg.mcu_release_version,
+            cfg.snapshot,
+        )
+        download_soc(
+            cfg.soc_dir,
+            cfg.soc_manifest_url,
+            cfg.repo_branch,
+            f"ssh://{cfg.gerrit_host}/tools/repo",
+            cfg.soc_release_version,
+            cfg.snapshot,
+        )
+
+        # ---------- 6. 应用 Gerrit 变更 ----------
+        changes = []
+        if cfg.gerrit_flag:
+            logger.info("===== Downloading Gerrit changes =====")
+            changes = query_changes(
+                cfg.gerrit_host,
+                cfg.gerrit_port,
+                cfg.gerrit_name,
+                cfg.gerrit_changes_list,
+            )
+            apply_changes(changes, cfg.mcu_dir, cfg.soc_dir)
+
+        # ---------- 7. 编译 MCU ----------
+        logger.info("===== Compiling MCU =====")
+        prepare_mcu(cfg.mcu_sdk_dir, cfg.soc_dir)   # 当前为空,保留扩展
+        compile_mcu(cfg.mcu_sdk_dir, cfg.profile_mode, cfg.soc_project)
+
+        # ---------- 8. 编译 SOC ----------
+        logger.info("===== Compiling SOC =====")
+        prepare_soc(cfg.mcu_sdk_dir, cfg.soc_dir, cfg.soc_project)
+        compile_soc(
+            cfg.soc_dir,
+            cfg.soc_project,
+            cfg.secure_enable,
+            cfg.factory_emmc_img_enable,
+            cfg.profile_mode,
+        )
+
+        # ---------- 9. Gerrit 打分 ----------
+        if cfg.gerrit_flag and changes:
+            logger.info("===== Labeling Gerrit changes =====")
+            label_verified(
+                cfg.gerrit_host,
+                cfg.gerrit_port,
+                cfg.gerrit_name,
+                changes,
+            )
+
+        # ---------- 10. 自动提交二进制产物 ----------
+        if cfg.auto_commit_server:
+            logger.info("===== Committing binary repos =====")
+            for repo in cfg.binary_repos:
+                commit_binaries(repo, cfg.auto_commit_branch)
+
+        # ---------- 11. 收集日志、快照、QAC ----------
+        logger.info("===== Collecting commit logs and snapshots =====")
+        collect_log_file = Path(cfg.workspace) / 'commit_log.csv'
+        newest_log_file = Path(cfg.workspace) / 'newest_one_commit_log.csv'
+        review_records_dir = Path(cfg.workspace) / 'review_records'
+
+        # 清理旧文件
+        collect_log_file.unlink(missing_ok=True)
+        newest_log_file.unlink(missing_ok=True)
+        if review_records_dir.exists():
+            import shutil
+            shutil.rmtree(review_records_dir)
+        review_records_dir.mkdir(parents=True)
+
+        for repo_dir in [cfg.mcu_dir, cfg.soc_dir]:
+            logger.info("Collecting logs from %s", repo_dir)
+            collect_logs(repo_dir, str(collect_log_file))
+            collect_newest_one_commit(repo_dir, str(newest_log_file))
+            collect_review_records(repo_dir, str(review_records_dir))
+
+        # 清理 CSV 并添加表头
+        header_commit = [
+            'project name', 'commit hash', 'auth name', 'auth email',
+            'auth date', 'commit date', 'subject',
+        ]
+        header_newest = header_commit + ['code lines']  # 在新脚本中可保留兼容
+        clean_csv_add_header(collect_log_file, header_commit)
+        clean_csv_add_header(newest_log_file, header_newest)
+
+        # 收集 QAC 报告
+        qac_report_dir = Path(cfg.workspace) / 'qac_report'
+        collect_qac_report(cfg.mcu_sdk_dir, str(qac_report_dir))
+
+        # ---------- 12. FTP 上传准备 ----------
+        logger.info("===== Preparing FTP upload =====")
+        prepare_ftp_upload(
+            cfg,
+            str(collect_log_file),
+            str(newest_log_file),
+            str(review_records_dir),
+            str(qac_report_dir) if qac_report_dir.exists() else None,
+        )
+
+        # ---------- 13. 提交发布基线 manifest ----------
+        logger.info("===== Committing release manifests =====")
+        commit_snapshot_manifest(cfg.mcu_dir, cfg.mcu_release_version)
+        commit_snapshot_manifest(cfg.soc_dir, cfg.soc_release_version)
+
+        # ---------- 14. 可选:触发实际的 FTP 上传 ----------
+        if os.environ.get('FTP_UPLOAD_ENABLE', 'true').lower() == 'true':
+            logger.info("===== Uploading to FTP =====")
+            # 需要本地部署目录存在,由 prepare_ftp_upload 创建
+            deploy_dir = Path(cfg.deploy_image_local_dir) / cfg.repo_branch
+            # 找到最新的部署子目录(时间戳命名)
+            deploy_subdirs = sorted(deploy_dir.iterdir(), key=lambda p: p.name, reverse=True)
+            if deploy_subdirs:
+                latest_deploy = deploy_subdirs[0]
+                upload_directory(cfg, str(latest_deploy))
+            else:
+                logger.warning("No deploy directory found for FTP upload")
+
+        logger.info("===== Pipeline completed successfully =====")
+
+    except (CommandError, BuildMCUError, BuildSOCError, GerritError,
+            ArtifactError, Exception) as e:
+        logger.exception("Pipeline failed: %s", e)
+        sys.exit(1)
+
+
+if __name__ == '__main__':
+    run_pipeline()

+ 113 - 0
repo_manager.py

@@ -0,0 +1,113 @@
+import os
+import subprocess
+import logging
+from pathlib import Path
+from typing import Optional, List
+
+logger = logging.getLogger(__name__)
+
+
+class RepoError(Exception):
+    """Repo 操作异常"""
+    pass
+
+
+def _run(cmd: List[str], cwd: Optional[str] = None) -> None:
+    """执行命令,失败时抛出异常,同时输出 stdout/stderr 到控制台"""
+    logger.debug("Running: %s", ' '.join(cmd))
+    subprocess.run(cmd, cwd=cwd, check=True)
+
+
+def _repo_init(manifest_url: str, branch: str, repo_url: str,
+               manifest_file: Optional[str] = None, depth: int = 7,
+               cwd: Optional[str] = None):
+    """
+    执行 `repo init`。
+    可选参数 `manifest_file` 用于指定 release manifest(相对路径,如 'release.xml' 或 'release/release_xxx.xml')
+    """
+    cmd = [
+        'repo', 'init',
+        f'--depth={depth}',
+        '-u', manifest_url,
+        '-b', branch,
+        '--repo-url', repo_url,
+    ]
+    if manifest_file:
+        cmd += ['-m', manifest_file]
+    _run(cmd, cwd=cwd)
+
+
+def _repo_sync(jobs: int = 4, cwd: Optional[str] = None):
+    """
+    执行 `repo sync`,参数与 Shell 版本完全一致:
+    -c: 只下载当前分支
+    -d: 分离到 manifest 指定版本
+    -f: 即使某个项目失败也继续
+    --force-sync: 覆盖错误的 object 目录
+    --no-tags: 不下载 tags
+    --prune: 删除不存在远端分支的本地分支
+    """
+    cmd = [
+        'repo', 'sync',
+        '-c', '-d', '-f',
+        '--force-sync',
+        '--no-tags',
+        '--prune',
+        f'-j{jobs}',
+    ]
+    _run(cmd, cwd=cwd)
+
+
+def _get_release_manifest_path(project_dir: str, version: str, snapshot: str) -> Optional[str]:
+    """
+    根据快照配置返回需要使用的 manifest 文件参数(相对路径)。
+    返回 None 表示不需要额外 manifest。
+    逻辑完全对应原 Shell 中的 update_repo_extra_para()。
+    """
+    if not version:
+        return None
+
+    manifests_dir = Path(project_dir) / '.repo' / 'manifests'
+    release_dir = manifests_dir / 'release'
+
+    # 默认使用 release.xml(原脚本:当 version 非空时,先设置 -m release.xml)
+    manifest_file = 'release.xml'
+
+    if snapshot != 'default':
+        specific_file = f"release_{version}.xml"
+        specific_path = release_dir / specific_file
+        if specific_path.is_file():
+            # 使用具体快照文件
+            manifest_file = f"release/{specific_file}"
+        else:
+            raise RepoError(f"Snapshot manifest not found: {specific_path}")
+    return manifest_file
+
+
+def download_project(project_dir: str, manifest_url: str, branch: str,
+                     repo_url: str, version: Optional[str] = None,
+                     snapshot: str = 'default', sync_jobs: int = 4):
+    """
+    下载一个 repo 工程到指定目录。
+    对应原 Shell 中的 download_mcu / download_soc。
+    """
+    target = Path(project_dir)
+    target.mkdir(parents=True, exist_ok=True)
+
+    # 第一次 repo init(不带 -m)
+    _repo_init(manifest_url, branch, repo_url, depth=7, cwd=str(target))
+
+    # 是否需要额外的 manifest(release.xml 或 release/release_xxx.xml)
+    try:
+        extra_manifest = _get_release_manifest_path(str(target), version, snapshot)
+        if extra_manifest:
+            logger.info("Using extra manifest: %s", extra_manifest)
+            # 第二次 repo init 带上 -m 参数(覆盖)
+            _repo_init(manifest_url, branch, repo_url,
+                       manifest_file=extra_manifest, depth=7, cwd=str(target))
+    except RepoError:
+        logger.exception("Failed to determine release manifest")
+        raise
+
+    # repo sync
+    _repo_sync(jobs=sync_jobs, cwd=str(target))

+ 139 - 0
utils.py

@@ -0,0 +1,139 @@
+"""
+utils.py - 通用工具函数,包括:
+  - 带重试的命令执行(解决 License 不足等临时错误)
+  - 简单命令执行
+  - CSV 文件清理与表头添加
+"""
+import subprocess
+import time
+import re
+import logging
+from pathlib import Path
+from typing import List, Optional
+
+logger = logging.getLogger(__name__)
+
+
+class CommandError(Exception):
+    """命令执行失败异常"""
+    pass
+
+
+# ---------------------------------------------------------------------------
+# 1. 带重试的命令执行
+# ---------------------------------------------------------------------------
+def run_command_with_retry(
+    cmd_parts: List[str],
+    cwd: Optional[str] = None,
+    retry_count: int = 5,
+    retry_pattern: Optional[str] = None,
+    log_file: Optional[str] = None
+) -> None:
+    """
+    执行命令,支持基于输出内容的重试。
+
+    Args:
+        cmd_parts: 命令及参数列表,如 ['make', 'all']
+        cwd: 工作目录
+        retry_count: 最大重试次数(尝试总次数 = 1 + retry_count?)
+                     本函数实现为:最多重试 retry_count 次,
+                     即总共执行 retry_count+1 次。
+        retry_pattern: 用于匹配 stdout/stderr 的正则表达式;
+                       如果匹配成功且命令失败,则重试。
+        log_file: 可选路径,命令输出同时写入此文件。
+    """
+    attempt = 0
+    max_attempts = retry_count + 1   # 总共尝试次数,至少1次
+
+    while attempt < max_attempts:
+        attempt += 1
+        logger.debug("Running (attempt %d/%d): %s", attempt, max_attempts, ' '.join(cmd_parts))
+        result = subprocess.run(cmd_parts, cwd=cwd, capture_output=True, text=True)
+
+        # 输出到控制台(可实时显示,这里偷懒了,捕获完后统一打印)
+        if result.stdout:
+            logger.info(result.stdout.rstrip())
+        if result.stderr:
+            logger.error(result.stderr.rstrip())
+
+        # 写入日志文件
+        if log_file:
+            Path(log_file).parent.mkdir(parents=True, exist_ok=True)
+            with open(log_file, 'w', encoding='utf-8') as f:
+                f.write(result.stdout)
+                if result.stderr:
+                    f.write('\n[STDERR]\n')
+                    f.write(result.stderr)
+
+        # 成功则直接返回
+        if result.returncode == 0:
+            logger.debug("Command succeeded on attempt %d", attempt)
+            return
+
+        # 失败处理
+        # 检查是否需要重试
+        if retry_pattern and re.search(retry_pattern, result.stdout + result.stderr):
+            if attempt < max_attempts:
+                logger.warning("Pattern '%s' found, retrying (%d/%d)...",
+                               retry_pattern, attempt, retry_count)
+                time.sleep(1)
+                continue
+            else:
+                raise CommandError(
+                    f"Command failed after {retry_count} retries (pattern match): "
+                    f"exit code {result.returncode}"
+                )
+        else:
+            # 非重试类错误,直接失败
+            raise CommandError(
+                f"Command failed (exit code {result.returncode}):\n"
+                f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
+            )
+
+    # 不应到达这里,但为安全起见
+    raise CommandError(f"Command failed after {retry_count} retries")
+
+
+# ---------------------------------------------------------------------------
+# 2. 简单命令执行(无重试)
+# ---------------------------------------------------------------------------
+def run_command(cmd_parts: List[str], cwd: Optional[str] = None) -> None:
+    """
+    执行命令,失败抛出 CommandError。
+    """
+    result = subprocess.run(cmd_parts, cwd=cwd, capture_output=True, text=True)
+    if result.returncode != 0:
+        raise CommandError(
+            f"Command '{' '.join(cmd_parts)}' failed with exit code {result.returncode}\n"
+            f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
+        )
+
+
+# ---------------------------------------------------------------------------
+# 3. CSV 文件清理与表头添加
+# ---------------------------------------------------------------------------
+def clean_csv_add_header(file_path: Path, header_columns: List[str]) -> None:
+    """
+    删除 CSV 文件中的空白行,并在第一行插入指定的表头列。
+    注意:原 Shell 使用 sed 在文件最前面插入表头,这意味着原文件没有表头,
+          只有数据行。本函数也按此逻辑处理。
+    """
+    if not file_path.is_file():
+        logger.warning("CSV file not found for header cleaning: %s", file_path)
+        return
+
+    # 读取所有行,过滤空行
+    lines = []
+    with open(file_path, 'r', encoding='utf-8') as f:
+        for line in f:
+            stripped = line.strip()
+            if stripped:  # 保留非空行
+                lines.append(stripped)
+
+    # 重新写入:先写入表头,再写入数据行
+    with open(file_path, 'w', encoding='utf-8') as f:
+        f.write(','.join(header_columns) + '\n')
+        for line in lines:
+            f.write(line + '\n')
+
+    logger.debug("Cleaned CSV %s: %d data rows added header", file_path, len(lines))