utils.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. """
  2. utils.py - 通用工具函数,包括:
  3. - 带重试的命令执行(解决 License 不足等临时错误)
  4. - 简单命令执行
  5. - CSV 文件清理与表头添加
  6. """
  7. import subprocess
  8. import time
  9. import re
  10. import logging
  11. from pathlib import Path
  12. from typing import List, Optional
  13. logger = logging.getLogger(__name__)
  14. class CommandError(Exception):
  15. """命令执行失败异常"""
  16. pass
  17. # ---------------------------------------------------------------------------
  18. # 1. 带重试的命令执行
  19. # ---------------------------------------------------------------------------
  20. def run_command_with_retry(
  21. cmd_parts: List[str],
  22. cwd: Optional[str] = None,
  23. retry_count: int = 5,
  24. retry_pattern: Optional[str] = None,
  25. log_file: Optional[str] = None
  26. ) -> None:
  27. """
  28. 执行命令,支持基于输出内容的重试。
  29. Args:
  30. cmd_parts: 命令及参数列表,如 ['make', 'all']
  31. cwd: 工作目录
  32. retry_count: 最大重试次数(尝试总次数 = 1 + retry_count?)
  33. 本函数实现为:最多重试 retry_count 次,
  34. 即总共执行 retry_count+1 次。
  35. retry_pattern: 用于匹配 stdout/stderr 的正则表达式;
  36. 如果匹配成功且命令失败,则重试。
  37. log_file: 可选路径,命令输出同时写入此文件。
  38. """
  39. attempt = 0
  40. max_attempts = retry_count + 1 # 总共尝试次数,至少1次
  41. while attempt < max_attempts:
  42. attempt += 1
  43. logger.debug("Running (attempt %d/%d): %s", attempt, max_attempts, ' '.join(cmd_parts))
  44. result = subprocess.run(cmd_parts, cwd=cwd, capture_output=True, text=True)
  45. # 输出到控制台(可实时显示,这里偷懒了,捕获完后统一打印)
  46. if result.stdout:
  47. logger.info(result.stdout.rstrip())
  48. if result.stderr:
  49. logger.error(result.stderr.rstrip())
  50. # 写入日志文件
  51. if log_file:
  52. Path(log_file).parent.mkdir(parents=True, exist_ok=True)
  53. with open(log_file, 'w', encoding='utf-8') as f:
  54. f.write(result.stdout)
  55. if result.stderr:
  56. f.write('\n[STDERR]\n')
  57. f.write(result.stderr)
  58. # 成功则直接返回
  59. if result.returncode == 0:
  60. logger.debug("Command succeeded on attempt %d", attempt)
  61. return
  62. # 失败处理
  63. # 检查是否需要重试
  64. if retry_pattern and re.search(retry_pattern, result.stdout + result.stderr):
  65. if attempt < max_attempts:
  66. logger.warning("Pattern '%s' found, retrying (%d/%d)...",
  67. retry_pattern, attempt, retry_count)
  68. time.sleep(1)
  69. continue
  70. else:
  71. raise CommandError(
  72. f"Command failed after {retry_count} retries (pattern match): "
  73. f"exit code {result.returncode}"
  74. )
  75. else:
  76. # 非重试类错误,直接失败
  77. raise CommandError(
  78. f"Command failed (exit code {result.returncode}):\n"
  79. f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
  80. )
  81. # 不应到达这里,但为安全起见
  82. raise CommandError(f"Command failed after {retry_count} retries")
  83. # ---------------------------------------------------------------------------
  84. # 2. 简单命令执行(无重试)
  85. # ---------------------------------------------------------------------------
  86. def run_command(cmd_parts: List[str], cwd: Optional[str] = None) -> None:
  87. """
  88. 执行命令,失败抛出 CommandError。
  89. """
  90. result = subprocess.run(cmd_parts, cwd=cwd, capture_output=True, text=True)
  91. if result.returncode != 0:
  92. raise CommandError(
  93. f"Command '{' '.join(cmd_parts)}' failed with exit code {result.returncode}\n"
  94. f"STDOUT:\n{result.stdout}\nSTDERR:\n{result.stderr}"
  95. )
  96. # ---------------------------------------------------------------------------
  97. # 3. CSV 文件清理与表头添加
  98. # ---------------------------------------------------------------------------
  99. def clean_csv_add_header(file_path: Path, header_columns: List[str]) -> None:
  100. """
  101. 删除 CSV 文件中的空白行,并在第一行插入指定的表头列。
  102. 注意:原 Shell 使用 sed 在文件最前面插入表头,这意味着原文件没有表头,
  103. 只有数据行。本函数也按此逻辑处理。
  104. """
  105. if not file_path.is_file():
  106. logger.warning("CSV file not found for header cleaning: %s", file_path)
  107. return
  108. # 读取所有行,过滤空行
  109. lines = []
  110. with open(file_path, 'r', encoding='utf-8') as f:
  111. for line in f:
  112. stripped = line.strip()
  113. if stripped: # 保留非空行
  114. lines.append(stripped)
  115. # 重新写入:先写入表头,再写入数据行
  116. with open(file_path, 'w', encoding='utf-8') as f:
  117. f.write(','.join(header_columns) + '\n')
  118. for line in lines:
  119. f.write(line + '\n')
  120. logger.debug("Cleaned CSV %s: %d data rows added header", file_path, len(lines))