Claude官方开发工具包,支持Claude 4、推理模式、工具调用等最新功能
import anthropic
import asyncio
from typing import List, Dict, Any, AsyncGenerator
import json
class AnthropicClient:
    """Convenience wrapper around the Anthropic SDK's sync and async clients.

    The message helpers return plain dicts and report failures as dicts
    carrying an ``"error"`` key rather than raising. The streaming helpers
    yield text chunks and report a failure as a final ``"Error: ..."`` chunk.
    """

    # Applied by the ``**kwargs`` helpers when the caller does not pass
    # ``max_tokens``; ``messages.create`` / ``messages.stream`` require it.
    DEFAULT_MAX_TOKENS = 4000

    def __init__(self, api_key: str = None):
        """Build one synchronous and one asynchronous SDK client.

        Args:
            api_key: Key handed to both SDK constructors; ``None`` is
                forwarded unchanged.
        """
        self.client = anthropic.Anthropic(api_key=api_key)
        self.async_client = anthropic.AsyncAnthropic(api_key=api_key)

    @staticmethod
    def _collect_text(response) -> str:
        """Join the text of every block in ``response.content`` that has text.

        Blocks without a string ``text`` attribute (for example tool-use
        blocks) are skipped, and an empty content list yields ``""``.
        """
        return "".join(
            block.text
            for block in (response.content or [])
            if isinstance(getattr(block, "text", None), str)
        )

    @staticmethod
    def _usage_dict(usage) -> Dict[str, Any]:
        """Return ``usage`` as a plain dict.

        Prefers the object's ``model_dump()`` when it has one and falls back
        to a copy of its instance attributes otherwise.
        """
        dump = getattr(usage, "model_dump", None)
        return dump() if callable(dump) else dict(vars(usage))

    def create_message(self,
                       messages: List[Dict[str, str]],
                       model: str = "claude-3-5-sonnet-20241022",
                       max_tokens: int = 4000,
                       temperature: float = 0.0,
                       system: str = None) -> Dict[str, Any]:
        """Send one synchronous Messages request.

        Args:
            messages: Conversation turns as ``{"role", "content"}`` dicts.
            model: Model identifier.
            max_tokens: Upper bound on generated tokens.
            temperature: Sampling temperature.
            system: Optional system prompt; left out of the request when
                ``None``.

        Returns:
            On success a dict with ``content``, ``usage``, ``model``,
            ``stop_reason`` and ``stop_sequence``. On failure a dict with
            ``error`` (``"rate_limit"``, ``"auth"`` or ``"general"``) and
            ``message``.
        """
        request = {
            "model": model,
            "max_tokens": max_tokens,
            "temperature": temperature,
            "messages": messages,
        }
        # Only send a system prompt when one was supplied: an explicit
        # ``None`` is not the same as omitting the argument.
        if system is not None:
            request["system"] = system
        try:
            response = self.client.messages.create(**request)
            usage = response.usage
            return {
                "content": self._collect_text(response),
                "usage": {
                    "input_tokens": usage.input_tokens,
                    "output_tokens": usage.output_tokens,
                    # The cache counters may be absent or None; report 0 then.
                    "cache_creation_input_tokens": getattr(usage, 'cache_creation_input_tokens', 0) or 0,
                    "cache_read_input_tokens": getattr(usage, 'cache_read_input_tokens', 0) or 0
                },
                "model": response.model,
                "stop_reason": response.stop_reason,
                "stop_sequence": response.stop_sequence
            }
        except anthropic.RateLimitError as e:
            return {"error": "rate_limit", "message": str(e)}
        except anthropic.AuthenticationError as e:
            return {"error": "auth", "message": str(e)}
        except Exception as e:
            return {"error": "general", "message": str(e)}

    async def async_create_message(self,
                                   messages: List[Dict[str, str]],
                                   model: str = "claude-3-5-sonnet-20241022",
                                   **kwargs) -> Dict[str, Any]:
        """Send one asynchronous Messages request.

        Args:
            messages: Conversation turns as ``{"role", "content"}`` dicts.
            model: Model identifier.
            **kwargs: Forwarded to ``messages.create``; ``max_tokens``
                defaults to ``DEFAULT_MAX_TOKENS`` when not given.

        Returns:
            ``{"content", "usage", "model"}`` on success, ``{"error": str}``
            on any exception.
        """
        kwargs.setdefault("max_tokens", self.DEFAULT_MAX_TOKENS)
        try:
            response = await self.async_client.messages.create(
                model=model,
                messages=messages,
                **kwargs
            )
            return {
                "content": self._collect_text(response),
                "usage": self._usage_dict(response.usage),
                "model": response.model
            }
        except Exception as e:
            return {"error": str(e)}

    def stream_message(self,
                       messages: List[Dict[str, str]],
                       model: str = "claude-3-5-sonnet-20241022",
                       **kwargs):
        """Yield response text chunks from a synchronous stream.

        ``max_tokens`` defaults to ``DEFAULT_MAX_TOKENS`` when not given.
        Any exception ends the stream with a single ``"Error: ..."`` chunk
        instead of propagating.
        """
        kwargs.setdefault("max_tokens", self.DEFAULT_MAX_TOKENS)
        try:
            with self.client.messages.stream(
                model=model,
                messages=messages,
                **kwargs
            ) as stream:
                yield from stream.text_stream
        except Exception as e:
            yield f"Error: {str(e)}"

    async def async_stream_message(self,
                                   messages: List[Dict[str, str]],
                                   model: str = "claude-3-5-sonnet-20241022",
                                   **kwargs) -> AsyncGenerator[str, None]:
        """Yield response text chunks from an asynchronous stream.

        ``max_tokens`` defaults to ``DEFAULT_MAX_TOKENS`` when not given.
        Any exception ends the stream with a single ``"Error: ..."`` chunk
        instead of propagating.
        """
        kwargs.setdefault("max_tokens", self.DEFAULT_MAX_TOKENS)
        try:
            async with self.async_client.messages.stream(
                model=model,
                messages=messages,
                **kwargs
            ) as stream:
                async for text in stream.text_stream:
                    yield text
        except Exception as e:
            yield f"Error: {str(e)}"
# Usage example
client = AnthropicClient()

# Synchronous call
response = client.create_message(
    messages=[{"role": "user", "content": "解释量子计算的基本原理"}],
)

# Streaming output: print each chunk as soon as it arrives
print("流式输出:")
for chunk in client.stream_message(
    messages=[{"role": "user", "content": "写一个Python排序算法"}],
):
    print(chunk, end='', flush=True)


# Asynchronous call
async def async_example():
    """Await a single async_create_message call and return its result dict."""
    return await client.async_create_message(
        messages=[{"role": "user", "content": "介绍机器学习的发展历程"}],
    )

# asyncio.run(async_example())
import Anthropic from '@anthropic-ai/sdk';
/**
 * Wrapper around the official Anthropic JavaScript SDK client.
 *
 * Custom HTTP headers are sent through the SDK's per-request options
 * (second argument of `messages.create`), never inside the request body.
 */
class AnthropicJSClient {
  /**
   * @param {string} apiKey API key passed to the SDK constructor.
   */
  constructor(apiKey) {
    this.client = new Anthropic({
      apiKey: apiKey,
      timeout: 600000, // request timeout in milliseconds (10 minutes)
      maxRetries: 3
    });
  }

  /**
   * Concatenate the text of every content block that carries text.
   * Blocks without a string `text` field (e.g. tool_use) are skipped.
   * @param {Array<object>} content
   * @returns {string}
   */
  _joinText(content) {
    return (content || [])
      .filter((block) => typeof block.text === 'string')
      .map((block) => block.text)
      .join('');
  }

  /**
   * Build the per-request options object carrying extra HTTP headers.
   * @param {object} [headers]
   * @returns {{headers: object}|undefined} undefined when there is nothing to send.
   */
  _requestOptions(headers) {
    return headers && Object.keys(headers).length > 0 ? { headers } : undefined;
  }

  /**
   * Send one Messages request.
   * @param {Array<object>} messages Conversation turns.
   * @param {object} [options] model, maxTokens, temperature, system, tools, extraHeaders.
   * @returns {Promise<object>} `{content, usage, model, stopReason}` on success, or
   *   `{error: 'api_error', message, status}` for SDK API errors. Other errors are rethrown.
   */
  async createMessage(messages, options = {}) {
    try {
      const response = await this.client.messages.create(
        {
          model: options.model || 'claude-3-5-sonnet-20241022',
          max_tokens: options.maxTokens || 4000,
          temperature: options.temperature || 0,
          system: options.system,
          messages: messages,
          tools: options.tools || undefined
        },
        this._requestOptions(options.extraHeaders)
      );
      return {
        // With tools enabled the first block may be tool_use, so gather text blocks.
        content: this._joinText(response.content),
        usage: response.usage,
        model: response.model,
        stopReason: response.stop_reason
      };
    } catch (error) {
      if (error instanceof Anthropic.APIError) {
        return {
          error: 'api_error',
          message: error.message,
          status: error.status
        };
      }
      throw error;
    }
  }

  /**
   * Stream a response, invoking `onChunk(chunk, fullResponse)` per text delta.
   * @param {Array<object>} messages Conversation turns.
   * @param {object} [options] model, maxTokens, temperature, system, extraHeaders.
   * @param {function(string, string): void} [onChunk] Called with the new chunk and the text so far.
   * @returns {Promise<string>} The full concatenated text.
   */
  async streamMessage(messages, options = {}, onChunk) {
    const stream = await this.client.messages.create(
      {
        model: options.model || 'claude-3-5-sonnet-20241022',
        max_tokens: options.maxTokens || 4000,
        // Honour the same options createMessage accepts; undefined fields are not sent.
        temperature: options.temperature,
        system: options.system,
        messages: messages,
        stream: true
      },
      this._requestOptions(options.extraHeaders)
    );
    let fullResponse = '';
    for await (const messageStreamEvent of stream) {
      if (messageStreamEvent.type !== 'content_block_delta') {
        continue;
      }
      const chunk = messageStreamEvent.delta && messageStreamEvent.delta.text;
      if (chunk) {
        fullResponse += chunk;
        if (onChunk) {
          onChunk(chunk, fullResponse);
        }
      }
    }
    return fullResponse;
  }

  /**
   * Send a request with extended thinking enabled.
   * @param {Array<object>} messages Conversation turns.
   * @param {number} [thinkingBudget=20000] Token budget for thinking; raised to 1024 if lower.
   * @param {string} [model='claude-sonnet-4-20250514'] Must be a model that supports extended thinking.
   * @returns {Promise<object>} The raw SDK response (thinking and text blocks).
   */
  async createReasoningMessage(messages, thinkingBudget = 20000, model = 'claude-sonnet-4-20250514') {
    // The API expects `thinking: {type, budget_tokens}` with a minimum budget of 1024.
    const budget = Math.max(1024, thinkingBudget);
    return await this.client.messages.create({
      model: model,
      // max_tokens must exceed the thinking budget; leave 4000 tokens for the answer.
      max_tokens: budget + 4000,
      messages: messages,
      thinking: {
        type: 'enabled',
        budget_tokens: budget
      }
    });
  }

  /**
   * Send a request with the extended cache TTL beta header.
   * @param {Array<object>} messages Conversation turns.
   * @param {*} cacheControlBlocks Forwarded unchanged as `cache_control`.
   * @returns {Promise<object>} The raw SDK response.
   */
  async createCachedMessage(messages, cacheControlBlocks) {
    return await this.client.messages.create(
      {
        model: 'claude-3-5-sonnet-20241022',
        max_tokens: 4000,
        messages: messages,
        // NOTE(review): prompt caching is documented as `cache_control` on individual
        // content blocks; confirm that this top-level field is accepted by the API.
        cache_control: cacheControlBlocks
      },
      {
        headers: {
          'anthropic-beta': 'extended-cache-ttl-2025-04-11'
        }
      }
    );
  }
}
// Usage example
const client = new AnthropicJSClient(process.env.ANTHROPIC_API_KEY);

// Standard call
const response = await client.createMessage(
  [{ role: 'user', content: 'Hello Claude!' }]
);

// Streaming call: echo every chunk to stdout as it arrives
await client.streamMessage(
  [{ role: 'user', content: '写一篇关于AI的文章' }],
  {},
  (chunk, fullResponse) => {
    process.stdout.write(chunk);
  }
);

// Reasoning-mode call with a thinking budget of 30000
const reasoningResponse = await client.createReasoningMessage(
  [{ role: 'user', content: '解决这个复杂的数学问题:...' }],
  30000
);
# Claude Code SDK - 2025年新增
import subprocess
import json
from pathlib import Path
class ClaudeCodeSDK:
    """Claude Code SDK integration.

    Constructing an instance attempts a global ``npm install`` of
    ``@anthropic-ai/claude-code``. A failed install is reported on stdout and
    never raised, so construction always succeeds.
    """

    def __init__(self, api_key: str):
        """Store ``api_key`` and run the install step.

        Args:
            api_key: Key later handed to ``ProjectAssistant``.
        """
        self.api_key = api_key
        self.setup_claude_code()

    def setup_claude_code(self):
        """Install the Claude Code package globally via npm (best effort).

        Prints a success line when npm exits cleanly, and a failure line
        when npm exits non-zero or cannot be started at all.
        """
        try:
            subprocess.run(['npm', 'install', '-g', '@anthropic-ai/claude-code'],
                           check=True, capture_output=True)
        except (OSError, subprocess.CalledProcessError) as e:
            # OSError covers a missing npm executable (FileNotFoundError),
            # which would otherwise abort the constructor.
            print(f"安装失败: {e}")
        else:
            print("Claude Code SDK已安装")

    async def query_with_tools(self, prompt: str, max_turns: int = 3, tools: List[str] = None):
        """Run one query and collect every message it produces.

        Args:
            prompt: Prompt text.
            max_turns: Sent as the ``maxTurns`` option.
            tools: Tool names sent as ``allowedTools``; ``None`` becomes ``[]``.

        Returns:
            List of the messages yielded by ``query``, in order.
        """
        # NOTE(review): `anthropic_ai_claude_code` and the dict-style call below
        # mirror the JavaScript SDK; confirm this module exists for Python.
        from anthropic_ai_claude_code import query

        request = {
            "prompt": prompt,
            "abortController": None,  # a cancellation controller could be supplied here
            "options": {
                "maxTurns": max_turns,
                "allowedTools": tools or [],
                "model": "claude-3-5-sonnet-20241022"
            },
        }
        return [message async for message in query(request)]

    def create_project_assistant(self, project_path: str):
        """Return a ``ProjectAssistant`` for ``project_path`` using this instance's API key."""
        return ProjectAssistant(project_path, self.api_key)
class ProjectAssistant:
    """Project-level Claude assistant.

    Reads files below ``project_path`` and sends them to Claude through
    ``self.client.create_message`` for review, test generation or explanation.
    """

    # analyze_codebase skips files of this many bytes or more.
    MAX_FILE_BYTES = 100000

    def __init__(self, project_path: str, api_key: str):
        """
        Args:
            project_path: Root directory of the project.
            api_key: Key used to build the ``AnthropicClient``.
        """
        self.project_path = Path(project_path)
        self.client = AnthropicClient(api_key)
        self.context_cache = {}

    def _read_lines(self, file_path: str) -> list:
        """Read ``file_path`` (relative to the project root) as UTF-8 lines."""
        with open(self.project_path / file_path, 'r', encoding='utf-8') as f:
            return f.readlines()

    @staticmethod
    def _content_of(response: dict) -> str:
        """Return ``response["content"]``.

        Raises:
            KeyError: When the client returned an error dict. The message
                includes the reported error so the failure is diagnosable.
        """
        if "content" not in response:
            detail = response.get("message", response.get("error"))
            raise KeyError(f"content (API call failed: {detail})")
        return response["content"]

    def analyze_codebase(self):
        """Collect the text of every small Python file in the project.

        Returns:
            Dict mapping each file's path relative to the project root to its
            UTF-8 text. Files of ``MAX_FILE_BYTES`` or more, and files that
            cannot be read or decoded, are skipped.
        """
        files_content = {}
        for py_file in self.project_path.rglob("*.py"):
            try:
                if not py_file.is_file() or py_file.stat().st_size >= self.MAX_FILE_BYTES:
                    continue
                text = py_file.read_text(encoding='utf-8')
            except (OSError, UnicodeDecodeError):
                # Best effort: an unreadable or non-UTF-8 file is skipped.
                continue
            files_content[str(py_file.relative_to(self.project_path))] = text
        return files_content

    def code_review(self, file_path: str, changes: str = None):
        """Ask Claude to review a file or a supplied change.

        Args:
            file_path: Path relative to the project root; always named in the
                prompt, and read from disk when ``changes`` is empty.
            changes: Text to review instead of the file's contents.

        Returns:
            The review text.

        Raises:
            KeyError: When the API call returned an error dict.
        """
        content = changes if changes else ''.join(self._read_lines(file_path))
        system_prompt = (
            "你是一个专业的代码审查专家。请审查以下代码并提供:\n"
            "1. 代码质量评估\n"
            "2. 潜在bug和问题\n"
            "3. 性能优化建议\n"
            "4. 最佳实践建议\n"
            "5. 安全性考虑"
        )
        response = self.client.create_message(
            messages=[{
                "role": "user",
                "content": f"请审查以下代码文件 {file_path}:\n\n```python\n{content}\n```"
            }],
            system=system_prompt
        )
        return self._content_of(response)

    def generate_tests(self, file_path: str):
        """Ask Claude to write pytest tests for a file.

        Args:
            file_path: Path relative to the project root.

        Returns:
            The generated test code.

        Raises:
            KeyError: When the API call returned an error dict.
        """
        source_code = ''.join(self._read_lines(file_path))
        system_prompt = (
            "你是一个测试专家。请为给定的Python代码生成全面的pytest测试用例,包括:\n"
            "1. 正常情况测试\n"
            "2. 边界情况测试\n"
            "3. 异常情况测试\n"
            "4. Mock测试(如需要)"
        )
        response = self.client.create_message(
            messages=[{
                "role": "user",
                "content": f"为以下代码生成测试用例:\n\n```python\n{source_code}\n```"
            }],
            system=system_prompt
        )
        return self._content_of(response)

    def explain_code(self, file_path: str, line_start: int = None, line_end: int = None):
        """Ask Claude to explain a file or a 1-based, inclusive line range of it.

        Args:
            file_path: Path relative to the project root.
            line_start: First line to include; defaults to 1 when only
                ``line_end`` is given. Values below 1 are treated as 1.
            line_end: Last line to include; defaults to the end of the file
                when only ``line_start`` is given.

        Returns:
            The explanation text.

        Raises:
            KeyError: When the API call returned an error dict.
        """
        lines = self._read_lines(file_path)
        if line_start is None and line_end is None:
            code_snippet = ''.join(lines)
            context = "完整文件"
        else:
            # A one-sided range is honoured rather than silently ignored, and
            # a start below 1 is clamped so the slice never wraps around.
            first = max(line_start or 1, 1)
            last = len(lines) if line_end is None else line_end
            code_snippet = ''.join(lines[first - 1:last])
            context = f"代码片段 (第{first}-{last}行)"
        response = self.client.create_message(
            messages=[{
                "role": "user",
                "content": f"请详细解释以下{context}的代码逻辑:\n\n```python\n{code_snippet}\n```"
            }]
        )
        return self._content_of(response)
# Usage example
code_sdk = ClaudeCodeSDK("your-api-key")
assistant = code_sdk.create_project_assistant("./my_project")

# Code review
review_result = assistant.code_review(file_path="src/main.py")
print("代码审查结果:", review_result)

# Test generation
test_code = assistant.generate_tests(file_path="src/utils.py")
print("生成的测试代码:", test_code)
from typing import Callable, Dict, Any, List
import json
class AnthropicToolHandler:
    """Registers local Python functions as tools and runs the tool-use loop."""

    def __init__(self, client: "AnthropicClient"):
        """
        Args:
            client: Wrapper whose ``.client`` attribute is the SDK client
                used for ``messages.create``.
        """
        self.client = client
        self.available_tools = {}
        self.mcp_servers = {}

    def register_tool(self, name: str, func: Callable, description: str, parameters: Dict):
        """Register ``func`` under ``name``.

        Args:
            name: Tool name exposed to the model.
            func: Callable invoked with the model's arguments as keywords.
            description: Tool description sent to the model.
            parameters: Dict with a ``properties`` mapping and an optional
                ``required`` list.
        """
        self.available_tools[name] = {
            "function": func,
            "description": description,
            "parameters": parameters
        }

    def register_mcp_server(self, server_name: str, server_config: Dict):
        """Store ``server_config`` under ``server_name``."""
        self.mcp_servers[server_name] = server_config

    def get_tool_definitions(self) -> List[Dict]:
        """Return the registered tools as ``name``/``description``/``input_schema`` dicts.

        A missing ``properties`` or ``required`` entry defaults to empty.
        """
        return [
            {
                "name": name,
                "description": tool_info["description"],
                "input_schema": {
                    "type": "object",
                    "properties": tool_info["parameters"].get("properties", {}),
                    "required": tool_info["parameters"].get("required", [])
                }
            }
            for name, tool_info in self.available_tools.items()
        ]

    def execute_tool(self, tool_name: str, arguments: Dict[str, Any]) -> Dict[str, Any]:
        """Call a registered tool.

        Returns:
            ``{"result": value}`` on success, ``{"error": message}`` when the
            tool is unknown or raises.
        """
        if tool_name not in self.available_tools:
            return {"error": f"Tool {tool_name} not found"}
        try:
            func = self.available_tools[tool_name]["function"]
            return {"result": func(**arguments)}
        except Exception as e:
            return {"error": str(e)}

    def _tool_result_block(self, tool_call) -> Dict[str, Any]:
        """Execute one ``tool_use`` block and build its ``tool_result`` block."""
        outcome = self.execute_tool(tool_call.name, tool_call.input)
        block = {
            "type": "tool_result",
            "tool_use_id": tool_call.id,
            # default=str keeps a tool that returns a non-JSON value (datetime,
            # Path, ...) from crashing the whole conversation.
            "content": json.dumps(outcome, ensure_ascii=False, default=str)
        }
        if "error" in outcome:
            block["is_error"] = True
        return block

    @staticmethod
    def _text_of(response) -> str:
        """Join the text of every block in ``response.content`` that has text."""
        return "".join(
            block.text
            for block in (response.content or [])
            if isinstance(getattr(block, "text", None), str)
        )

    def chat_with_tools(self,
                        messages: List[Dict[str, str]],
                        model: str = "claude-3-5-sonnet-20241022",
                        max_tool_rounds: int = 5):
        """Run a conversation, executing any tools the model requests.

        After each response containing ``tool_use`` blocks, the assistant
        turn and the tool results are appended to ``messages`` (the caller's
        list is extended in place) and the model is called again.

        Args:
            messages: Conversation so far; mutated as described above.
            model: Model identifier.
            max_tool_rounds: Maximum number of tool-execution rounds before
                the latest response is returned as-is.

        Returns:
            The concatenated text of the last response.
        """
        request = {"model": model, "max_tokens": 4000, "messages": messages}
        tools = self.get_tool_definitions()
        if tools:
            # The same tool list is sent on every round: a history that
            # contains tool_use/tool_result blocks needs the definitions.
            request["tools"] = tools

        response = self.client.client.messages.create(**request)
        for _ in range(max_tool_rounds):
            tool_calls = [
                block for block in (response.content or [])
                if getattr(block, "type", None) == "tool_use"
            ]
            if not tool_calls:
                break
            tool_results = [self._tool_result_block(call) for call in tool_calls]
            messages.append({"role": "assistant", "content": response.content})
            messages.append({"role": "user", "content": tool_results})
            response = self.client.client.messages.create(**request)
        return self._text_of(response)
# 示例工具函数
def get_weather(city: str, date: str = None) -> str:
    """Return a canned weather report for ``city``.

    ``date`` is accepted for interface compatibility but is not used.
    """
    report = f"{city}今天天气晴朗,温度25°C"
    return report
def search_web(query: str, num_results: int = 5) -> List[str]:
    """Return ``num_results`` placeholder search hits for ``query``.

    Hits are numbered from 0 and each one embeds the query text.
    """
    hits = []
    for index in range(num_results):
        hits.append(f"搜索结果 {index}: {query} 相关内容")
    return hits
def execute_code(code: str, language: str = "python") -> Dict[str, Any]:
    """Return a fixed placeholder execution result.

    Neither ``code`` nor ``language`` is used: nothing is actually executed.
    """
    # A sandboxed execution environment should be plugged in here.
    outcome = {}
    outcome["output"] = "代码执行结果"
    outcome["success"] = True
    return outcome
# Usage example
client = AnthropicClient()
tool_handler = AnthropicToolHandler(client)

# Register the weather tool
tool_handler.register_tool(
    "get_weather",
    get_weather,
    "获取指定城市的天气信息",
    {
        "properties": {
            "city": {"type": "string", "description": "城市名称"},
            "date": {"type": "string", "description": "日期"}
        },
        "required": ["city"]
    },
)

# Register the web-search tool
tool_handler.register_tool(
    "search_web",
    search_web,
    "搜索网络信息",
    {
        "properties": {
            "query": {"type": "string", "description": "搜索查询"},
            "num_results": {"type": "integer", "description": "结果数量"}
        },
        "required": ["query"]
    },
)

# Try a prompt that should trigger both tools
messages = [
    {"role": "user", "content": "北京今天天气怎么样?然后帮我搜索一下最新的AI发展趋势"}
]
result = tool_handler.chat_with_tools(messages)
print(result)
import hashlib
import time
from typing import Optional
import redis
class AnthropicCacheManager:
    """Caches Anthropic responses in Redis, keyed by a hash of the request."""

    def __init__(self, redis_url: str = "redis://localhost:6379"):
        """
        Args:
            redis_url: Connection URL passed to ``redis.from_url``.
        """
        self.redis_client = redis.from_url(redis_url)
        self.default_ttl = 3600  # 1 hour, in seconds
        self.extended_ttl = 3600 * 24  # 24 hours, used when use_extended_cache is set

    def generate_cache_key(self, messages: List[Dict], model: str, **kwargs) -> str:
        """Return a deterministic key for a request.

        The key is ``"anthropic:"`` plus the MD5 hex digest of the
        sorted-key JSON encoding of ``messages``, ``model`` and ``kwargs``.
        Values that JSON cannot encode are hashed via their ``str()`` form
        instead of raising.
        """
        content = json.dumps({
            "messages": messages,
            "model": model,
            **kwargs
        }, sort_keys=True, default=str)
        return f"anthropic:{hashlib.md5(content.encode()).hexdigest()}"

    def get_cached_response(self, cache_key: str) -> Optional[Dict]:
        """Return the cached response for ``cache_key``.

        Returns ``None`` on a miss; a lookup or decode error is printed and
        also yields ``None``.
        """
        try:
            cached_data = self.redis_client.get(cache_key)
            if cached_data:
                return json.loads(cached_data)
        except Exception as e:
            print(f"缓存获取错误: {e}")
        return None

    def cache_response(self, cache_key: str, response: Dict, ttl: int = None):
        """Store ``response`` as JSON under ``cache_key``.

        Args:
            cache_key: Redis key.
            response: JSON-serialisable dict.
            ttl: Expiry in seconds; falls back to ``default_ttl`` when
                ``None`` or 0. Storage errors are printed, not raised.
        """
        try:
            ttl = ttl or self.default_ttl
            self.redis_client.setex(
                cache_key,
                ttl,
                json.dumps(response)
            )
        except Exception as e:
            print(f"缓存存储错误: {e}")

    @staticmethod
    def _text_of(response) -> str:
        """Join the text of every block in ``response.content`` that has text."""
        return "".join(
            block.text
            for block in (response.content or [])
            if isinstance(getattr(block, "text", None), str)
        )

    @staticmethod
    def _usage_dict(usage) -> Dict[str, Any]:
        """Return ``usage`` as a plain dict so the result can be stored as JSON."""
        dump = getattr(usage, "model_dump", None)
        return dump() if callable(dump) else dict(vars(usage))

    def create_cached_message(self,
                              client: "AnthropicClient",
                              messages: List[Dict[str, str]],
                              model: str = "claude-3-5-sonnet-20241022",
                              use_extended_cache: bool = False,
                              **kwargs) -> Dict[str, Any]:
        """Return a cached response, or call the API and cache the result.

        Args:
            client: Wrapper whose ``.client`` attribute is the SDK client.
            messages: Conversation turns.
            model: Model identifier.
            use_extended_cache: Adds the extended-cache beta header and
                stores the entry with ``extended_ttl``.
            **kwargs: Request parameters. They are part of the cache key and
                are forwarded to ``messages.create``. ``max_tokens`` defaults
                to 4000 and ``temperature`` to 0; a ``system`` of ``None``
                is dropped.

        Returns:
            ``{"content", "usage", "model", "cached", "timestamp"}``, or
            ``{"error", "cached": False}`` when the API call fails. Failures
            are not cached.
        """
        cache_key = self.generate_cache_key(messages, model, **kwargs)

        cached_response = self.get_cached_response(cache_key)
        if cached_response:
            cached_response["cached"] = True
            return cached_response

        # Forward every parameter that went into the key; otherwise requests
        # differing only in e.g. `system` would get distinct cache entries
        # for identical API calls.
        request = dict(kwargs)
        request.setdefault("max_tokens", 4000)
        request.setdefault("temperature", 0)
        if request.get("system") is None:
            request.pop("system", None)
        extra_headers = dict(request.pop("extra_headers", None) or {})

        ttl = self.default_ttl
        if use_extended_cache:
            extra_headers['anthropic-beta'] = 'extended-cache-ttl-2025-04-11'
            ttl = self.extended_ttl

        try:
            response = client.client.messages.create(
                model=model,
                messages=messages,
                extra_headers=extra_headers,
                **request
            )
            result = {
                "content": self._text_of(response),
                "usage": self._usage_dict(response.usage),
                "model": response.model,
                "cached": False,
                "timestamp": time.time()
            }
            self.cache_response(cache_key, result, ttl)
            return result
        except Exception as e:
            return {"error": str(e), "cached": False}
class AnthropicOptimizer:
    """Adds response caching, request statistics and batch fan-out to a client."""

    def __init__(self,
                 client: "AnthropicClient",
                 cache_manager: "Optional[AnthropicCacheManager]" = None):
        """
        Args:
            client: Client used for API calls.
            cache_manager: Cache to use; a default ``AnthropicCacheManager()``
                is created when omitted.
        """
        self.client = client
        self.cache_manager = cache_manager if cache_manager is not None else AnthropicCacheManager()
        self.request_stats = {"total": 0, "cached": 0, "errors": 0}

    def optimized_create_message(self,
                                 messages: List[Dict[str, str]],
                                 enable_cache: bool = True,
                                 use_extended_cache: bool = False,
                                 **kwargs) -> Dict[str, Any]:
        """Create a message, optionally through the cache, and update the stats.

        Args:
            messages: Conversation turns.
            enable_cache: Route the request through the cache manager.
            use_extended_cache: Passed to the cache manager.
            **kwargs: Forwarded to the cache manager or to
                ``client.create_message``.

        Returns:
            The result dict from whichever path handled the request.
        """
        self.request_stats["total"] += 1
        if enable_cache:
            result = self.cache_manager.create_cached_message(
                self.client,
                messages,
                use_extended_cache=use_extended_cache,
                **kwargs
            )
            if result.get("cached"):
                self.request_stats["cached"] += 1
        else:
            result = self.client.create_message(messages, **kwargs)
        # Count failures on both paths so error_rate reflects direct calls too.
        if "error" in result:
            self.request_stats["errors"] += 1
        return result

    def get_performance_stats(self) -> Dict[str, Any]:
        """Summarise the request counters.

        Returns:
            ``{"message": ...}`` before any request was made; otherwise the
            totals plus cache-hit and error rates formatted as percentages.
        """
        total = self.request_stats["total"]
        if total == 0:
            return {"message": "No requests made yet"}
        cached = self.request_stats["cached"]
        errors = self.request_stats["errors"]
        return {
            "total_requests": total,
            "cached_requests": cached,
            "cache_hit_rate": f"{cached / total:.2%}",
            "error_requests": errors,
            "error_rate": f"{errors / total:.2%}",
            "api_requests": total - cached
        }

    def batch_process_messages(self,
                               message_batches: List[List[Dict[str, str]]],
                               concurrent_limit: int = 5) -> List[Dict[str, Any]]:
        """Send several conversations concurrently via the async client.

        Starts its own event loop with ``asyncio.run``, so it must not be
        called from code that is already running inside an event loop.

        Args:
            message_batches: One message list per request.
            concurrent_limit: Maximum number of requests in flight.

        Returns:
            One result dict per batch, in input order.

        Raises:
            ValueError: If ``concurrent_limit`` is less than 1 (a zero-slot
                semaphore would block forever).
        """
        import asyncio

        if concurrent_limit < 1:
            raise ValueError("concurrent_limit must be at least 1")
        if not message_batches:
            return []

        async def batch_runner():
            semaphore = asyncio.Semaphore(concurrent_limit)

            async def limited_process(messages):
                async with semaphore:
                    # max_tokens is required by messages.create, so pass it
                    # explicitly instead of relying on the callee's defaults.
                    return await self.client.async_create_message(messages, max_tokens=4000)

            return await asyncio.gather(
                *(limited_process(messages) for messages in message_batches)
            )

        return asyncio.run(batch_runner())
# Usage example
client = AnthropicClient()
optimizer = AnthropicOptimizer(client)

# Create a message through the cache
response = optimizer.optimized_create_message(
    messages=[{"role": "user", "content": "什么是量子计算?"}],
    enable_cache=True,
    use_extended_cache=True
)
if "error" in response:
    # A failed call returns an error dict without a "content" key, so
    # indexing it unconditionally would raise KeyError.
    print("请求失败:", response["error"])
else:
    print("响应:", response["content"])
print("是否使用缓存:", response.get("cached", False))

# Show the performance statistics
stats = optimizer.get_performance_stats()
print("性能统计:", stats)