"""HumanEval code-generation benchmark: evaluates a model's programming
ability on 164 hand-written programming problems."""
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import ast
import sys
import io
import contextlib
@dataclass
class HumanEvalProblem:
    """A single HumanEval task record.

    Mirrors one JSON line of the HumanEval dataset: the function prompt to
    complete, the reference solution, the unit-test code, and the name of
    the function under test.
    """
    task_id: str              # unique task identifier, e.g. "HumanEval/0"
    prompt: str               # function signature + docstring to be completed
    canonical_solution: str   # reference implementation (function body)
    test: str                 # test code; defines check() and calls it on the entry point
    entry_point: str          # name of the function being tested
    description: Optional[str] = None  # optional extra description (may be absent)
class HumanEvalDataset:
    """Loads and analyzes the HumanEval problem set."""

    def __init__(self, data_path: str = "human-eval/data/HumanEval.jsonl"):
        """Load problems from *data_path* and print dataset statistics."""
        self.problems = self.load_problems(data_path)
        self.analyze_dataset()

    def load_problems(self, data_path: str) -> Dict[str, HumanEvalProblem]:
        """Load HumanEval problems from a JSONL file (one problem per line).

        Falls back to a built-in sample problem when the file is missing so
        the rest of the pipeline can still be demonstrated.
        """
        import json
        problems: Dict[str, HumanEvalProblem] = {}
        try:
            with open(data_path, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue  # tolerate blank / trailing lines
                    problem_data = json.loads(line)
                    problem = HumanEvalProblem(
                        task_id=problem_data["task_id"],
                        prompt=problem_data["prompt"],
                        canonical_solution=problem_data["canonical_solution"],
                        test=problem_data["test"],
                        entry_point=problem_data["entry_point"],
                        description=problem_data.get("description")
                    )
                    problems[problem.task_id] = problem
        except FileNotFoundError:
            print(f"数据文件未找到: {data_path}")
            # Fall back to the bundled demo problem.
            problems = self.create_sample_problems()
        return problems

    def create_sample_problems(self) -> Dict[str, HumanEvalProblem]:
        """Build the demo problem set (HumanEval/0).

        The prompt begins with `from typing import List`; without it the
        `List[float]` annotation raises NameError when the prompt is executed
        standalone by the code executor.
        """
        sample_problems = {
            "HumanEval/0": HumanEvalProblem(
                task_id="HumanEval/0",
                prompt='''from typing import List


def has_close_elements(numbers: List[float], threshold: float) -> bool:
    """ Check if in given list of numbers, are any two numbers closer to each other than
    given threshold.
    >>> has_close_elements([1.0, 2.0, 3.0], 0.5)
    False
    >>> has_close_elements([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3)
    True
    """
''',
                canonical_solution='''    for idx, elem in enumerate(numbers):
        for idx2, elem2 in enumerate(numbers):
            if idx != idx2:
                distance = abs(elem - elem2)
                if distance < threshold:
                    return True
    return False
''',
                test='''def check(candidate):
    assert candidate([1.0, 2.0, 3.0], 0.5) == False
    assert candidate([1.0, 2.8, 3.0, 4.0, 5.0, 2.0], 0.3) == True
    assert candidate([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.3) == True
    assert candidate([1.0, 2.0, 3.9, 4.0, 5.0, 2.2], 0.05) == False
    assert candidate([1.0, 2.0, 5.9, 4.0, 5.0], 0.95) == True
    assert candidate([1.0, 2.0, 5.9, 4.0, 5.0], 0.8) == False
    assert candidate([1.0, 2.0, 3.0, 4.0, 5.0, 2.0], 0.1) == True
check(has_close_elements)
''',
                entry_point="has_close_elements"
            )
        }
        return sample_problems

    def analyze_dataset(self):
        """Print basic dataset statistics (counts and average lengths)."""
        if not self.problems:
            return
        total_problems = len(self.problems)
        avg_prompt_length = sum(len(p.prompt) for p in self.problems.values()) / total_problems
        avg_solution_length = sum(len(p.canonical_solution) for p in self.problems.values()) / total_problems
        print(f"HumanEval数据集分析:")
        print(f"- 总问题数: {total_problems}")
        print(f"- 平均提示长度: {avg_prompt_length:.0f}字符")
        print(f"- 平均解决方案长度: {avg_solution_length:.0f}字符")
        self.categorize_problems()

    def categorize_problems(self):
        """Rough keyword-based categorization of problem prompts.

        Only called from analyze_dataset() when the problem set is
        non-empty, so the percentage division below cannot divide by zero.
        """
        categories = {
            "字符串处理": 0,
            "数学计算": 0,
            "列表操作": 0,
            "算法逻辑": 0,
            "数据结构": 0
        }
        for problem in self.problems.values():
            prompt_lower = problem.prompt.lower()
            if any(keyword in prompt_lower for keyword in ["string", "str", "char", "text"]):
                categories["字符串处理"] += 1
            elif any(keyword in prompt_lower for keyword in ["math", "sum", "product", "calculate"]):
                categories["数学计算"] += 1
            elif any(keyword in prompt_lower for keyword in ["list", "array", "sort", "filter"]):
                categories["列表操作"] += 1
            elif any(keyword in prompt_lower for keyword in ["tree", "graph", "node", "dict"]):
                categories["数据结构"] += 1
            else:
                categories["算法逻辑"] += 1
        print("\n问题类型分布:")
        for category, count in categories.items():
            percentage = count / len(self.problems) * 100
            print(f"- {category}: {count}个 ({percentage:.1f}%)")
import subprocess
import tempfile
import os
import signal
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import multiprocessing
class SafeCodeExecutor:
    """Sandboxed code execution: runs code in a subprocess with memory/CPU
    limits (where the platform supports them) and a wall-clock timeout."""

    # Child program template. The target file path and the limits arrive via
    # argv, so no user-controlled data is interpolated into the command
    # string (avoids quote/injection breakage in the original f-string).
    _RUNNER = (
        "import sys\n"
        "try:\n"
        "    import resource\n"
        "    limit = int(sys.argv[2]) * 1024 * 1024\n"
        "    resource.setrlimit(resource.RLIMIT_AS, (limit, limit))\n"
        "    cpu = int(sys.argv[3])\n"
        "    resource.setrlimit(resource.RLIMIT_CPU, (cpu, cpu))\n"
        "except Exception:\n"
        "    pass  # limits unavailable on some platforms (e.g. Windows/macOS)\n"
        "with open(sys.argv[1], 'r', encoding='utf-8') as f:\n"
        "    exec(compile(f.read(), sys.argv[1], 'exec'))\n"
    )

    def __init__(self, timeout: int = 10, memory_limit: int = 128):
        """
        Args:
            timeout: CPU-time limit in seconds (wall-clock gets +5s buffer).
            memory_limit: address-space limit in MB.
        """
        self.timeout = timeout
        self.memory_limit = memory_limit  # MB

    def execute_code(self, code: str, test_code: str) -> Dict[str, Any]:
        """Run *code* followed by *test_code* in a sandboxed subprocess.

        Returns a dict with a "status" key ("passed", "failed", "timeout"
        or "error") plus captured stdout/stderr where available.
        """
        full_code = f"{code}\n\n{test_code}"
        with tempfile.NamedTemporaryFile(
            mode='w', suffix='.py', delete=False, encoding='utf-8'
        ) as f:
            f.write(full_code)
            temp_file = f.name
        try:
            return self.run_with_timeout(temp_file)
        finally:
            os.unlink(temp_file)  # always remove the temp script

    def run_with_timeout(self, file_path: str) -> Dict[str, Any]:
        """Execute *file_path* in a child interpreter with limits applied."""
        try:
            # sys.executable instead of a hard-coded "python3": works in
            # venvs and on systems where "python3" is not on PATH.
            cmd = [
                sys.executable, "-c", self._RUNNER,
                file_path, str(self.memory_limit), str(self.timeout),
            ]
            process = subprocess.run(
                cmd,
                capture_output=True,
                text=True,
                timeout=self.timeout + 5  # extra wall-clock buffer
            )
            if process.returncode == 0:
                return {
                    "status": "passed",
                    "stdout": process.stdout,
                    "stderr": process.stderr,
                    "execution_time": None  # not measured yet
                }
            return {
                "status": "failed",
                "stdout": process.stdout,
                "stderr": process.stderr,
                "return_code": process.returncode
            }
        except subprocess.TimeoutExpired:
            return {
                "status": "timeout",
                "error": f"代码执行超时 ({self.timeout}秒)"
            }
        except Exception as e:
            return {
                "status": "error",
                "error": str(e)
            }

    def validate_syntax(self, code: str) -> Dict[str, Any]:
        """Statically check *code* for syntax errors without executing it."""
        try:
            ast.parse(code)
            return {"valid": True}
        except SyntaxError as e:
            return {
                "valid": False,
                "error": f"语法错误: {e.msg} (第{e.lineno}行)",
                "lineno": e.lineno
            }
        except Exception as e:
            return {
                "valid": False,
                "error": f"解析错误: {str(e)}"
            }
class HumanEvalExecutor:
    """Runs generated solutions against HumanEval tests and scores them."""

    def __init__(self, dataset: "HumanEvalDataset"):
        self.dataset = dataset
        self.executor = SafeCodeExecutor()

    def evaluate_single_problem(self, task_id: str, generated_code: str) -> Dict[str, Any]:
        """Evaluate one generated completion for *task_id*.

        The completion is appended to the problem prompt and executed
        against the problem's unit tests. The result dict carries a
        "passed" boolean (absent only when the task id is unknown).
        """
        if task_id not in self.dataset.problems:
            return {"error": f"未找到问题: {task_id}"}
        problem = self.dataset.problems[task_id]
        # Fast path: reject syntactically invalid code without executing it.
        syntax_check = self.executor.validate_syntax(generated_code)
        if not syntax_check["valid"]:
            return {
                "task_id": task_id,
                "status": "syntax_error",
                "error": syntax_check["error"],
                "passed": False
            }
        # The prompt holds the signature/docstring; the completion is the body.
        complete_code = problem.prompt + generated_code
        execution_result = self.executor.execute_code(complete_code, problem.test)
        return {
            "task_id": task_id,
            "generated_code": generated_code,
            "execution_result": execution_result,
            "passed": execution_result["status"] == "passed"
        }

    def batch_evaluate(self, solutions: Dict[str, List[str]],
                       k_values: Optional[List[int]] = None) -> Dict[str, Any]:
        """Evaluate several samples per task and compute pass@k.

        Args:
            solutions: task_id -> list of generated code samples.
            k_values: k values for pass@k; None means [1, 10, 100]
                (avoids a mutable default argument).
        """
        if k_values is None:
            k_values = [1, 10, 100]
        results: Dict[str, List[Dict[str, Any]]] = {}
        for task_id, code_samples in solutions.items():
            task_results = []
            for i, code in enumerate(code_samples):
                result = self.evaluate_single_problem(task_id, code)
                result["sample_index"] = i
                task_results.append(result)
            results[task_id] = task_results
        pass_at_k_results = self.calculate_pass_at_k(results, k_values)
        return {
            "detailed_results": results,
            "pass_at_k": pass_at_k_results,
            "summary": self.generate_summary(results, pass_at_k_results)
        }

    def calculate_pass_at_k(self, results: Dict[str, List[Dict]], k_values: List[int]) -> Dict[int, float]:
        """Empirical pass@k: a problem counts as solved at k when any of
        its first k samples passed. Problems with fewer than k samples are
        excluded from the denominator."""
        pass_at_k = {}
        for k in k_values:
            total_problems = 0
            passed_problems = 0
            for task_results in results.values():
                if len(task_results) >= k:
                    total_problems += 1
                    if any(r["passed"] for r in task_results[:k]):
                        passed_problems += 1
            pass_at_k[k] = passed_problems / total_problems if total_problems > 0 else 0
        return pass_at_k

    def generate_summary(self, results: Dict, pass_at_k: Dict[int, float]) -> Dict[str, Any]:
        """Aggregate counts and success rates across all tasks/samples."""
        total_problems = len(results)
        total_samples = sum(len(task_results) for task_results in results.values())
        total_passed = sum(
            sum(1 for result in task_results if result["passed"])
            for task_results in results.values()
        )
        return {
            "total_problems": total_problems,
            "total_samples": total_samples,
            "total_passed": total_passed,
            "overall_success_rate": total_passed / total_samples if total_samples > 0 else 0,
            "pass_at_k_summary": {f"pass@{k}": f"{v:.1%}" for k, v in pass_at_k.items()}
        }
from abc import ABC, abstractmethod
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
class CodeGenerationModel(ABC):
    """Abstract interface every code-generation backend must implement."""

    @abstractmethod
    def generate_code(self, prompt: str, num_samples: int = 1, temperature: float = 0.0) -> List[str]:
        """Produce *num_samples* code completions for *prompt*."""
        ...

    @abstractmethod
    def get_model_info(self) -> Dict[str, Any]:
        """Describe the underlying model (name, provider, type, ...)."""
        ...
class OpenAICodeModel(CodeGenerationModel):
    """Code generation backed by the OpenAI chat-completions API."""

    def __init__(self, model_name: str = "gpt-4", api_key: Optional[str] = None):
        # `OpenAI` was never imported anywhere in this module (NameError on
        # construction). Import it lazily here so the module can be loaded
        # (and other model backends used) without the third-party `openai`
        # package installed.
        from openai import OpenAI
        self.model_name = model_name
        self.client = OpenAI(api_key=api_key)

    def generate_code(self, prompt: str, num_samples: int = 1, temperature: float = 0.0) -> List[str]:
        """Sample *num_samples* completions from the chat API.

        A failed API call yields a placeholder body so the sample count
        stays consistent for pass@k accounting.
        """
        responses = []
        for _ in range(num_samples):
            try:
                response = self.client.chat.completions.create(
                    model=self.model_name,
                    messages=[
                        {
                            "role": "system",
                            "content": "你是一个专业的Python程序员。请根据给定的函数签名和文档字符串完成函数实现。只返回函数体代码,不要包含函数定义。"
                        },
                        {
                            "role": "user",
                            "content": prompt
                        }
                    ],
                    temperature=temperature,
                    max_tokens=500,
                    # Stop sequences keep the model from emitting a new
                    # definition instead of just the function body.
                    stop=["def ", "class ", "import ", "from "]
                )
                code = response.choices[0].message.content.strip()
                responses.append(code)
            except Exception as e:
                print(f"代码生成错误: {e}")
                responses.append(" pass # Error in generation")
        return responses

    def get_model_info(self) -> Dict[str, Any]:
        """Static descriptor used in reports and logs."""
        return {
            "model_name": self.model_name,
            "provider": "OpenAI",
            "type": "chat_completion"
        }
class HumanEvalBenchmark:
    """Top-level orchestration: run models over HumanEval and report results."""

    def __init__(self, dataset_path: Optional[str] = None):
        """
        Args:
            dataset_path: path to HumanEval.jsonl; None uses the dataset's
                built-in default path.
        """
        # Passing None straight through would make HumanEvalDataset call
        # open(None) -> TypeError (not caught as FileNotFoundError); fall
        # back to the dataset's own default path instead.
        if dataset_path is None:
            self.dataset = HumanEvalDataset()
        else:
            self.dataset = HumanEvalDataset(dataset_path)
        self.executor = HumanEvalExecutor(self.dataset)
        self.results_history: List[Dict[str, Any]] = []

    def evaluate_model(self,
                       model: "CodeGenerationModel",
                       num_samples_per_problem: int = 1,
                       temperature: float = 0.0,
                       k_values: Optional[List[int]] = None) -> Dict[str, Any]:
        """Generate and score solutions for every problem with *model*.

        Args:
            k_values: k values for pass@k; None means [1, 10, 100]
                (avoids a mutable default argument).
        """
        if k_values is None:
            k_values = [1, 10, 100]
        print(f"开始评估模型: {model.get_model_info()['model_name']}")
        start_time = time.time()
        solutions = {}
        for i, (task_id, problem) in enumerate(self.dataset.problems.items()):
            print(f"生成问题 {i+1}/{len(self.dataset.problems)}: {task_id}")
            try:
                code_samples = model.generate_code(
                    prompt=problem.prompt,
                    num_samples=num_samples_per_problem,
                    temperature=temperature
                )
                solutions[task_id] = code_samples
            except Exception as e:
                # A generation failure becomes a placeholder that will fail
                # the tests, keeping sample counts consistent per problem.
                print(f"生成失败 {task_id}: {e}")
                solutions[task_id] = [" pass # Generation failed"] * num_samples_per_problem
        print("执行代码测试...")
        evaluation_results = self.executor.batch_evaluate(solutions, k_values)
        evaluation_results["metadata"] = {
            "model_info": model.get_model_info(),
            "evaluation_time": time.time() - start_time,
            "num_samples_per_problem": num_samples_per_problem,
            "temperature": temperature,
            "timestamp": time.time()
        }
        self.results_history.append(evaluation_results)
        return evaluation_results

    def compare_models(self, models: List["CodeGenerationModel"], **eval_kwargs) -> Dict[str, Any]:
        """Evaluate every model with the same settings and build a report."""
        comparison_results = {}
        for model in models:
            model_name = model.get_model_info()["model_name"]
            results = self.evaluate_model(model, **eval_kwargs)
            comparison_results[model_name] = results
        comparison_report = self.generate_comparison_report(comparison_results)
        return {
            "individual_results": comparison_results,
            "comparison_report": comparison_report
        }

    def generate_comparison_report(self, results: Dict[str, Any]) -> Dict[str, Any]:
        """Build a cross-model summary with pass@k scores and a pass@1 ranking."""
        report = {
            "model_rankings": {},
            "performance_summary": {},
            "detailed_comparison": {}
        }
        model_scores = {}
        for model_name, result in results.items():
            pass_at_k = result["pass_at_k"]
            model_scores[model_name] = pass_at_k
            report["performance_summary"][model_name] = {
                "pass@1": f"{pass_at_k.get(1, 0):.1%}",
                "pass@10": f"{pass_at_k.get(10, 0):.1%}",
                "pass@100": f"{pass_at_k.get(100, 0):.1%}",
                "evaluation_time": f"{result['metadata']['evaluation_time']:.1f}s"
            }
        # Rank by pass@1 only when the metric was actually computed.
        if 1 in next(iter(model_scores.values()), {}):
            sorted_models = sorted(
                model_scores.items(),
                key=lambda x: x[1].get(1, 0),
                reverse=True
            )
            report["model_rankings"]["pass@1"] = [
                {"rank": i+1, "model": model, "score": f"{score.get(1, 0):.1%}"}
                for i, (model, score) in enumerate(sorted_models)
            ]
        return report

    def export_results(self, results: Dict[str, Any], output_path: str):
        """Write the evaluation results to *output_path* as JSON."""
        import json
        serializable_results = self.make_serializable(results)
        with open(output_path, 'w', encoding='utf-8') as f:
            json.dump(serializable_results, f, ensure_ascii=False, indent=2)
        print(f"评估结果已导出至: {output_path}")

    def make_serializable(self, obj):
        """Recursively convert objects with __dict__ into plain dicts/lists."""
        if isinstance(obj, dict):
            return {k: self.make_serializable(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [self.make_serializable(item) for item in obj]
        elif hasattr(obj, '__dict__'):
            return self.make_serializable(obj.__dict__)
        else:
            return obj
# Usage example
def main():
    """Demo entry point: compare two OpenAI models and export a report."""
    benchmark = HumanEvalBenchmark()
    # Models under comparison.
    candidates = [
        OpenAICodeModel("gpt-4"),
        OpenAICodeModel("gpt-3.5-turbo"),
    ]
    # Run the full comparison.
    results = benchmark.compare_models(
        models=candidates,
        num_samples_per_problem=10,
        temperature=0.2,
        k_values=[1, 5, 10]
    )
    # Persist the complete results.
    benchmark.export_results(results, "humaneval_comparison.json")
    # Print the per-model summary.
    summaries = results["comparison_report"]["performance_summary"]
    for model_name, summary in summaries.items():
        print(f"\n{model_name}:")
        for metric, value in summary.items():
            print(f" {metric}: {value}")
if __name__ == "__main__":
    main()
import math
from scipy.special import comb
def calculate_pass_at_k_unbiased(n: int, c: int, k: int) -> float:
    """Unbiased pass@k estimator from the HumanEval paper (Chen et al., 2021).

    pass@k = 1 - C(n-c, k) / C(n, k): the probability that a random subset
    of k out of n samples contains at least one of the c passing samples.

    Args:
        n: total number of generated samples.
        c: number of samples that passed the tests.
        k: subset size considered.

    Returns:
        The unbiased pass@k estimate in [0, 1].
    """
    if n - c < k:
        # Every size-k subset must contain at least one passing sample.
        return 1.0
    # math.comb works in exact integer arithmetic, so this stays precise
    # even for large n, where scipy.special.comb's float result can
    # overflow to inf and yield nan.
    return 1.0 - math.comb(n - c, k) / math.comb(n, k)
class PassAtKCalculator:
    """Computes pass@k scores per problem and across a dataset."""

    def __init__(self):
        self.cache = {}

    def calculate_problem_pass_at_k(self, results: List[bool], k: int) -> float:
        """pass@k for one problem from its per-sample pass/fail outcomes."""
        n, c = len(results), sum(results)
        # Trivial cases short-circuit before the combinatorial estimate.
        if k > n or c == 0:
            return 0.0
        if c >= k:
            return 1.0
        return calculate_pass_at_k_unbiased(n, c, k)

    def calculate_dataset_pass_at_k(self, all_results: Dict[str, List[bool]], k: int) -> Dict[str, Any]:
        """Average pass@k over every problem, plus per-problem scores."""
        problem_scores = {
            pid: self.calculate_problem_pass_at_k(outcomes, k)
            for pid, outcomes in all_results.items()
        }
        avg_score = (
            sum(problem_scores.values()) / len(all_results) if all_results else 0
        )
        return {
            "pass_at_k": avg_score,
            "k": k,
            "problem_scores": problem_scores,
            "total_problems": len(all_results),
            "distribution": self.analyze_score_distribution(problem_scores)
        }

    def analyze_score_distribution(self, scores: Dict[str, float]) -> Dict[str, Any]:
        """Basic statistics over the per-problem scores."""
        values = list(scores.values())
        if not values:
            return {}
        ordered = sorted(values)
        return {
            "mean": sum(values) / len(values),
            "median": ordered[len(ordered) // 2],  # upper median for even counts
            "min": ordered[0],
            "max": ordered[-1],
            "perfect_score_count": sum(1 for v in values if v == 1.0),
            "zero_score_count": sum(1 for v in values if v == 0.0)
        }
def benchmark_comparison_2025():
    """Print a 2025 HumanEval leaderboard snapshot and return the raw data."""
    # Example leaderboard numbers collected from 2025 search results.
    model_performance = {
        "Cursor+Claude 4.0": {"pass@1": 0.867, "domain": "科研领域"},
        "DeepSeek-Coder": {"pass@1": 0.825, "domain": "通用编程"},
        "GPT-4": {"pass@1": 0.813, "domain": "通用编程"},
        "Claude-3.5-Sonnet": {"pass@1": 0.792, "domain": "代码生成"},
        "GPT-3.5-Turbo": {"pass@1": 0.748, "domain": "轻量化"},
        "CodeLlama-34B": {"pass@1": 0.736, "domain": "开源模型"},
        "Llama-3.1-70B": {"pass@1": 0.712, "domain": "开源通用"},
    }
    print("2025年HumanEval排行榜 (Pass@1):")
    print("-" * 50)
    ranked = sorted(
        model_performance.items(),
        key=lambda item: item[1]["pass@1"],
        reverse=True,
    )
    for i, (model, data) in enumerate(ranked, 1):
        print(f"{i:2d}. {model:<20} {data['pass@1']:.1%} ({data['domain']})")
    return model_performance
class HumanEvalErrorAnalyzer:
    """Post-hoc analysis of failed HumanEval attempts."""

    def __init__(self):
        # Human-readable labels for the categories produced by classify_error().
        self.error_categories = {
            "syntax_error": "语法错误",
            "runtime_error": "运行时错误",
            "logic_error": "逻辑错误",
            "timeout": "超时错误",
            "incomplete": "代码不完整"
        }

    def analyze_failures(self, evaluation_results: Dict[str, Any]) -> Dict[str, Any]:
        """Collect every failed attempt and summarize error types,
        per-problem difficulty and recurring failure patterns."""
        failure_analysis = {
            "error_distribution": {},
            "problem_difficulty": {},
            "common_patterns": [],
            "improvement_suggestions": []
        }
        all_failures = []
        for task_id, task_results in evaluation_results["detailed_results"].items():
            for failure in (r for r in task_results if not r["passed"]):
                all_failures.append({
                    "task_id": task_id,
                    "error_type": self.classify_error(failure),
                    "generated_code": failure.get("generated_code", ""),
                    "execution_result": failure.get("execution_result", {})
                })
        error_counts: Dict[str, int] = {}
        for failure in all_failures:
            error_type = failure["error_type"]
            error_counts[error_type] = error_counts.get(error_type, 0) + 1
        # error_counts is non-empty only when all_failures is non-empty,
        # so the percentage division below cannot divide by zero.
        failure_analysis["error_distribution"] = {
            error_type: {
                "count": count,
                "percentage": count / len(all_failures) * 100
            }
            for error_type, count in error_counts.items()
        }
        failure_analysis["problem_difficulty"] = self.analyze_problem_difficulty(
            evaluation_results["detailed_results"]
        )
        failure_analysis["common_patterns"] = self.identify_common_patterns(all_failures)
        return failure_analysis

    def classify_error(self, failure_result: Dict[str, Any]) -> str:
        """Map a failed attempt onto one of the keys in error_categories."""
        execution_result = failure_result.get("execution_result", {})
        status = execution_result.get("status")
        if status == "syntax_error":
            return "syntax_error"
        if status == "timeout":
            return "timeout"
        stderr = (execution_result.get("stderr") or "").lower()
        if stderr:
            if "syntaxerror" in stderr:
                return "syntax_error"
            # A failed test assertion means the code ran but produced the
            # wrong answer: that is a logic error, not a runtime crash.
            # (Previously this fell through to "runtime_error".)
            if "assertionerror" in stderr:
                return "logic_error"
            return "runtime_error"
        if status == "failed":
            return "logic_error"
        return "incomplete"

    def analyze_problem_difficulty(self, detailed_results: Dict[str, List[Dict]]) -> Dict[str, Any]:
        """Bucket problems into difficulty levels by empirical success rate."""
        problem_success_rates = {}
        for task_id, results in detailed_results.items():
            total_attempts = len(results)
            successful_attempts = sum(1 for r in results if r["passed"])
            problem_success_rates[task_id] = (
                successful_attempts / total_attempts if total_attempts > 0 else 0
            )
        difficulty_levels = {
            "简单": [tid for tid, rate in problem_success_rates.items() if rate > 0.8],
            "中等": [tid for tid, rate in problem_success_rates.items() if 0.4 <= rate <= 0.8],
            "困难": [tid for tid, rate in problem_success_rates.items() if rate < 0.4]
        }
        return {
            "success_rates": problem_success_rates,
            "difficulty_levels": difficulty_levels,
            "hardest_problems": sorted(
                problem_success_rates.items(),
                key=lambda x: x[1]
            )[:10]  # ten lowest success rates
        }

    def identify_common_patterns(self, failures: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Surface recurring failure patterns (syntax and indentation errors)."""
        patterns = []
        syntax_errors = [f for f in failures if f["error_type"] == "syntax_error"]
        if syntax_errors:
            patterns.append({
                "type": "syntax_patterns",
                "description": "常见语法错误",
                "count": len(syntax_errors),
                "examples": [f["execution_result"].get("stderr", "")[:100] for f in syntax_errors[:3]]
            })
        indentation_errors = [
            f for f in failures
            if "indentationerror" in f["execution_result"].get("stderr", "").lower()
        ]
        if indentation_errors:
            patterns.append({
                "type": "indentation_errors",
                "description": "缩进错误",
                "count": len(indentation_errors),
                "suggestion": "模型需要更好地处理Python缩进规则"
            })
        return patterns
class NextGenCodeEval:
"""下一代代码评估框架"""
def __init__(self):
self.evaluation_dimensions = {
"functionality": "功能正确性",
"efficiency": "执行效率",
"readability": "代码可读性",
"maintainability": "可维护性",
"security": "安全性"
}
    def comprehensive_evaluation(self, code: str, problem_spec: Dict) -> Dict[str, Any]:
        """Score *code* along several quality dimensions.

        NOTE(review): relies on self.test_functionality() and
        self.security_check(), which are not defined in the visible part of
        this class — confirm they exist elsewhere before calling this.
        """
        results = {}
        # Functional correctness (classic HumanEval-style testing).
        results["functionality"] = self.test_functionality(code, problem_spec)
        # Efficiency estimate.
        results["efficiency"] = self.analyze_efficiency(code)
        # Readability estimate.
        results["readability"] = self.analyze_readability(code)
        # Security screening.
        results["security"] = self.security_check(code)
        return results
def analyze_efficiency(self, code: str) -> Dict[str, Any]:
"""分析代码效率"""
# 这里可以集成复杂度分析工具
return {
"time_complexity": "估算时间复杂度",
"space_complexity": "估算空间复杂度",
"execution_time": "实际执行时间"
}
def analyze_readability(self, code: str) -> Dict[str, Any]:
"""分析代码可读性"""
# 可以使用pylint、flake8等工具
return {
"style_score": 85,
"naming_convention": "良好",
"comment_coverage": "需要改进"
}