OpenAI官方开发工具包,支持GPT、DALL-E、Whisper等模型的完整API接口
import openai
from openai import OpenAI
import asyncio
from typing import List, Dict, Any
class OpenAIClient:
def __init__(self, api_key: str = None):
self.client = OpenAI(api_key=api_key)
self.async_client = openai.AsyncOpenAI(api_key=api_key)
def chat_completion(self, messages: List[Dict], model: str = "gpt-4"):
"""同步聊天完成"""
try:
response = self.client.chat.completions.create(
model=model,
messages=messages,
temperature=0.7,
max_tokens=2000,
top_p=1.0,
frequency_penalty=0.0,
presence_penalty=0.0
)
return {
"content": response.choices[0].message.content,
"usage": {
"prompt_tokens": response.usage.prompt_tokens,
"completion_tokens": response.usage.completion_tokens,
"total_tokens": response.usage.total_tokens
},
"model": response.model,
"finish_reason": response.choices[0].finish_reason
}
except openai.RateLimitError as e:
return {"error": "rate_limit", "message": str(e)}
except openai.AuthenticationError as e:
return {"error": "auth", "message": str(e)}
except Exception as e:
return {"error": "general", "message": str(e)}
async def async_chat_completion(self, messages: List[Dict], model: str = "gpt-4"):
"""异步聊天完成"""
try:
response = await self.async_client.chat.completions.create(
model=model,
messages=messages,
temperature=0.7,
max_tokens=2000
)
return {
"content": response.choices[0].message.content,
"usage": dict(response.usage),
"model": response.model
}
except Exception as e:
return {"error": str(e)}
# 使用示例
client = OpenAIClient()
# 同步调用
response = client.chat_completion([
{"role": "system", "content": "你是一个有用的AI助手。"},
{"role": "user", "content": "解释什么是机器学习"}
])
# 异步调用
async def main():
response = await client.async_chat_completion([
{"role": "user", "content": "介绍Python的优势"}
])
return response
# asyncio.run(main())
import OpenAI from 'openai';
class OpenAIJSClient {
constructor(apiKey) {
this.client = new OpenAI({
apiKey: apiKey,
dangerouslyAllowBrowser: false // 生产环境设为false
});
}
async chatCompletion(messages, options = {}) {
try {
const completion = await this.client.chat.completions.create({
model: options.model || 'gpt-4',
messages: messages,
temperature: options.temperature || 0.7,
max_tokens: options.maxTokens || 2000,
stream: options.stream || false
});
if (options.stream) {
return completion; // 返回流对象
}
return {
content: completion.choices[0].message.content,
usage: completion.usage,
model: completion.model
};
} catch (error) {
if (error instanceof OpenAI.APIError) {
return {
error: 'api_error',
message: error.message,
status: error.status
};
}
throw error;
}
}
async streamingCompletion(messages, onChunk) {
const stream = await this.client.chat.completions.create({
model: 'gpt-4',
messages: messages,
stream: true,
temperature: 0.7
});
let fullResponse = '';
for await (const chunk of stream) {
const content = chunk.choices[0]?.delta?.content || '';
if (content) {
fullResponse += content;
if (onChunk) {
onChunk(content, fullResponse);
}
}
}
return fullResponse;
}
}
// 使用示例
const client = new OpenAIJSClient(process.env.OPENAI_API_KEY);
// 标准调用
const response = await client.chatCompletion([
{ role: 'user', content: 'Hello, how are you?' }
]);
// 流式调用
await client.streamingCompletion([
{ role: 'user', content: '写一个关于AI的故事' }
], (chunk, fullResponse) => {
process.stdout.write(chunk);
});
import openai
from typing import Generator, Iterator
class StreamingHandler:
def __init__(self, client: OpenAI):
self.client = client
def stream_chat_completion(self, messages: List[Dict], model: str = "gpt-4") -> Generator[str, None, None]:
"""生成器模式的流式输出"""
try:
stream = self.client.chat.completions.create(
model=model,
messages=messages,
stream=True,
temperature=0.7
)
for chunk in stream:
if chunk.choices[0].delta.content is not None:
yield chunk.choices[0].delta.content
except Exception as e:
yield f"Error: {str(e)}"
def stream_with_callback(self, messages: List[Dict], callback_func, model: str = "gpt-4"):
"""回调模式的流式输出"""
full_response = ""
try:
stream = self.client.chat.completions.create(
model=model,
messages=messages,
stream=True
)
for chunk in stream:
if chunk.choices[0].delta.content is not None:
content = chunk.choices[0].delta.content
full_response += content
# 调用回调函数
callback_func(
chunk_content=content,
full_content=full_response,
chunk_data=chunk
)
except Exception as e:
callback_func(error=str(e))
return full_response
async def async_stream_completion(self, messages: List[Dict]) -> AsyncGenerator[str, None]:
"""异步流式输出"""
try:
async_client = openai.AsyncOpenAI()
stream = await async_client.chat.completions.create(
model="gpt-4",
messages=messages,
stream=True
)
async for chunk in stream:
if chunk.choices[0].delta.content is not None:
yield chunk.choices[0].delta.content
except Exception as e:
yield f"Error: {str(e)}"
# 使用示例
client = OpenAI()
streaming_handler = StreamingHandler(client)
# 生成器模式
messages = [{"role": "user", "content": "写一首关于春天的诗"}]
print("流式输出开始:")
for chunk in streaming_handler.stream_chat_completion(messages):
print(chunk, end='', flush=True)
# 回调模式
def on_stream_chunk(chunk_content=None, full_content=None, chunk_data=None, error=None):
if error:
print(f"错误: {error}")
else:
print(chunk_content, end='', flush=True)
# 可以在这里添加更多逻辑
if len(full_content) > 100: # 例如:每100个字符保存一次
save_to_cache(full_content)
streaming_handler.stream_with_callback(messages, on_stream_chunk)
import json
from typing import List, Dict, Callable
class FunctionCallingHandler:
def __init__(self, client: OpenAI):
self.client = client
self.available_functions = {}
def register_function(self, name: str, func: Callable, description: str, parameters: Dict):
"""注册可调用函数"""
self.available_functions[name] = {
"function": func,
"description": description,
"parameters": parameters
}
def get_function_definitions(self) -> List[Dict]:
"""获取函数定义列表"""
functions = []
for name, func_info in self.available_functions.items():
functions.append({
"type": "function",
"function": {
"name": name,
"description": func_info["description"],
"parameters": func_info["parameters"]
}
})
return functions
def execute_function_call(self, function_name: str, arguments: str):
"""执行函数调用"""
if function_name not in self.available_functions:
return {"error": f"Function {function_name} not found"}
try:
# 解析参数
args = json.loads(arguments)
# 执行函数
func = self.available_functions[function_name]["function"]
result = func(**args)
return {"result": result}
except Exception as e:
return {"error": str(e)}
def chat_with_functions(self, messages: List[Dict], model: str = "gpt-4"):
"""支持函数调用的聊天"""
tools = self.get_function_definitions()
response = self.client.chat.completions.create(
model=model,
messages=messages,
tools=tools,
tool_choice="auto"
)
# 检查是否需要函数调用
response_message = response.choices[0].message
if response_message.tool_calls:
# 执行函数调用
messages.append(response_message)
for tool_call in response_message.tool_calls:
function_name = tool_call.function.name
function_args = tool_call.function.arguments
# 执行函数
function_result = self.execute_function_call(function_name, function_args)
# 添加函数结果到消息
messages.append({
"tool_call_id": tool_call.id,
"role": "tool",
"content": json.dumps(function_result)
})
# 再次调用GPT获取最终回答
final_response = self.client.chat.completions.create(
model=model,
messages=messages
)
return final_response.choices[0].message.content
return response_message.content
# 示例函数
def get_weather(city: str, date: str = None):
"""获取天气信息"""
# 这里应该调用真实的天气API
return f"{city}今天天气晴朗,温度25°C"
def calculate(expression: str):
"""计算数学表达式"""
try:
result = eval(expression) # 生产环境需要使用更安全的计算方法
return str(result)
except:
return "计算错误"
# 使用示例
client = OpenAI()
function_handler = FunctionCallingHandler(client)
# 注册函数
function_handler.register_function(
name="get_weather",
func=get_weather,
description="获取指定城市的天气信息",
parameters={
"type": "object",
"properties": {
"city": {
"type": "string",
"description": "城市名称"
},
"date": {
"type": "string",
"description": "日期,格式YYYY-MM-DD"
}
},
"required": ["city"]
}
)
function_handler.register_function(
name="calculate",
func=calculate,
description="计算数学表达式",
parameters={
"type": "object",
"properties": {
"expression": {
"type": "string",
"description": "要计算的数学表达式"
}
},
"required": ["expression"]
}
)
# 测试函数调用
messages = [
{"role": "user", "content": "北京今天天气怎么样?另外帮我算一下 15 * 8 等于多少?"}
]
result = function_handler.chat_with_functions(messages)
print(result)
import base64
import requests
from PIL import Image
import io
class MultiModalHandler:
def __init__(self, client: OpenAI):
self.client = client
def encode_image(self, image_path: str) -> str:
"""将图片编码为base64"""
with open(image_path, "rb") as image_file:
return base64.b64encode(image_file.read()).decode('utf-8')
def vision_chat(self, text_prompt: str, image_path: str = None, image_url: str = None):
"""视觉聊天 - 支持图片理解"""
messages = [
{
"role": "user",
"content": [
{"type": "text", "text": text_prompt}
]
}
]
# 添加图片内容
if image_path:
base64_image = self.encode_image(image_path)
messages[0]["content"].append({
"type": "image_url",
"image_url": {
"url": f"data:image/jpeg;base64,{base64_image}",
"detail": "high"
}
})
elif image_url:
messages[0]["content"].append({
"type": "image_url",
"image_url": {
"url": image_url,
"detail": "high"
}
})
response = self.client.chat.completions.create(
model="gpt-4-vision-preview",
messages=messages,
max_tokens=1000
)
return response.choices[0].message.content
def generate_image(self, prompt: str, size: str = "1024x1024", quality: str = "standard"):
"""使用DALL-E生成图片"""
try:
response = self.client.images.generate(
model="dall-e-3",
prompt=prompt,
size=size,
quality=quality,
n=1
)
return {
"image_url": response.data[0].url,
"revised_prompt": response.data[0].revised_prompt
}
except Exception as e:
return {"error": str(e)}
def speech_to_text(self, audio_file_path: str, language: str = None):
"""使用Whisper进行语音转文本"""
try:
with open(audio_file_path, "rb") as audio_file:
transcript = self.client.audio.transcriptions.create(
model="whisper-1",
file=audio_file,
language=language,
response_format="verbose_json",
timestamp_granularities=["word"]
)
return {
"text": transcript.text,
"language": transcript.language,
"duration": transcript.duration,
"words": transcript.words if hasattr(transcript, 'words') else None
}
except Exception as e:
return {"error": str(e)}
def text_to_speech(self, text: str, voice: str = "alloy", speed: float = 1.0):
"""使用TTS进行文本转语音"""
try:
response = self.client.audio.speech.create(
model="tts-1",
voice=voice, # alloy, echo, fable, onyx, nova, shimmer
input=text,
speed=speed
)
# 保存音频文件
audio_file_path = f"output_speech.mp3"
with open(audio_file_path, "wb") as f:
f.write(response.content)
return {"audio_file": audio_file_path}
except Exception as e:
return {"error": str(e)}
# 使用示例
client = OpenAI()
multimodal_handler = MultiModalHandler(client)
# 图片理解
result = multimodal_handler.vision_chat(
text_prompt="这张图片中有什么?请详细描述。",
image_path="./example_image.jpg"
)
print("图片分析结果:", result)
# 图片生成
image_result = multimodal_handler.generate_image(
prompt="一只可爱的橙色小猫坐在阳光明媚的花园里",
size="1024x1024",
quality="hd"
)
print("生成图片URL:", image_result["image_url"])
# 语音转文本
transcript_result = multimodal_handler.speech_to_text("./audio_sample.mp3")
print("转录结果:", transcript_result["text"])
# 文本转语音
tts_result = multimodal_handler.text_to_speech(
text="你好,这是一个AI生成的语音消息。",
voice="nova"
)
print("语音文件保存至:", tts_result["audio_file"])
import time
import random
from typing import Optional, Any
class RobustOpenAIClient:
def __init__(self, api_key: str, max_retries: int = 3, base_delay: float = 1.0):
self.client = OpenAI(api_key=api_key)
self.max_retries = max_retries
self.base_delay = base_delay
def exponential_backoff_retry(self, func, *args, **kwargs) -> Any:
"""指数退避重试机制"""
last_exception = None
for attempt in range(self.max_retries + 1):
try:
return func(*args, **kwargs)
except openai.RateLimitError as e:
last_exception = e
if attempt == self.max_retries:
break
# 处理速率限制
retry_after = getattr(e.response.headers, 'retry-after', None)
if retry_after:
delay = int(retry_after)
else:
delay = self.base_delay * (2 ** attempt) + random.uniform(0, 1)
print(f"速率限制,等待 {delay} 秒后重试...")
time.sleep(delay)
except openai.APIConnectionError as e:
last_exception = e
if attempt == self.max_retries:
break
delay = self.base_delay * (2 ** attempt)
print(f"连接错误,{delay} 秒后重试...")
time.sleep(delay)
except openai.AuthenticationError as e:
# 认证错误不重试
raise e
except Exception as e:
last_exception = e
if attempt == self.max_retries:
break
delay = self.base_delay * (2 ** attempt)
print(f"其他错误,{delay} 秒后重试: {e}")
time.sleep(delay)
raise last_exception
def safe_chat_completion(self, messages: List[Dict], **kwargs) -> Dict:
"""安全的聊天完成调用"""
def _call():
return self.client.chat.completions.create(
messages=messages,
**kwargs
)
try:
response = self.exponential_backoff_retry(_call)
return {
"success": True,
"content": response.choices[0].message.content,
"usage": dict(response.usage),
"model": response.model
}
except Exception as e:
return {
"success": False,
"error": str(e),
"error_type": type(e).__name__
}
import logging
import time
import functools
from typing import Dict, Any
class OpenAIMonitor:
def __init__(self, logger_name: str = "openai_monitor"):
self.logger = logging.getLogger(logger_name)
self.setup_logging()
self.metrics = {
"total_requests": 0,
"successful_requests": 0,
"failed_requests": 0,
"total_tokens": 0,
"total_cost": 0.0
}
def setup_logging(self):
"""设置日志配置"""
handler = logging.StreamHandler()
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
def monitor_request(self, func):
"""请求监控装饰器"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = time.time()
self.metrics["total_requests"] += 1
try:
result = func(*args, **kwargs)
# 记录成功请求
self.metrics["successful_requests"] += 1
# 记录token使用情况
if isinstance(result, dict) and "usage" in result:
usage = result["usage"]
self.metrics["total_tokens"] += usage.get("total_tokens", 0)
# 估算成本(需要根据模型定价更新)
cost = self.estimate_cost(kwargs.get("model", "gpt-4"), usage)
self.metrics["total_cost"] += cost
duration = time.time() - start_time
self.logger.info(f"请求成功 - 耗时: {duration:.2f}s, 模型: {kwargs.get('model', 'unknown')}")
return result
except Exception as e:
self.metrics["failed_requests"] += 1
duration = time.time() - start_time
self.logger.error(f"请求失败 - 耗时: {duration:.2f}s, 错误: {str(e)}")
raise
return wrapper
def estimate_cost(self, model: str, usage: Dict) -> float:
"""估算API调用成本"""
# 2025年价格(示例,需要根据实际价格更新)
pricing = {
"gpt-4": {"input": 0.03, "output": 0.06}, # per 1K tokens
"gpt-4-turbo": {"input": 0.01, "output": 0.03},
"gpt-3.5-turbo": {"input": 0.0015, "output": 0.002}
}
if model not in pricing:
return 0.0
input_cost = (usage.get("prompt_tokens", 0) / 1000) * pricing[model]["input"]
output_cost = (usage.get("completion_tokens", 0) / 1000) * pricing[model]["output"]
return input_cost + output_cost
def get_metrics(self) -> Dict[str, Any]:
"""获取监控指标"""
success_rate = (
self.metrics["successful_requests"] / self.metrics["total_requests"]
if self.metrics["total_requests"] > 0 else 0
)
return {
**self.metrics,
"success_rate": success_rate,
"average_cost_per_request": (
self.metrics["total_cost"] / self.metrics["successful_requests"]
if self.metrics["successful_requests"] > 0 else 0
)
}
# 使用示例
monitor = OpenAIMonitor()
client = OpenAI()
@monitor.monitor_request
def monitored_chat(messages, model="gpt-4"):
response = client.chat.completions.create(
model=model,
messages=messages
)
return {
"content": response.choices[0].message.content,
"usage": dict(response.usage)
}
# 执行一些请求
result = monitored_chat([{"role": "user", "content": "Hello!"}])
# 查看监控指标
metrics = monitor.get_metrics()
print(f"成功率: {metrics['success_rate']:.2%}")
print(f"总成本: ${metrics['total_cost']:.4f}")