LOADING
返回 2026-01-11

一个AnthropicToOpenai的本地运行的API转换脚本(用于ClaudeCode等)

wslll
| Code | 11 |

AI 摘要

目录

前言

近期各类国产模型的 Coding 能力感觉都是突飞猛进,因此我也是不断在测试、体验不同模型。**一般对于新的模型我都会用 NewAPI 统一管理。**但是在使用 claude code 调用时,因为有些模型默认是 OpenAI 格式的调用方式,又懒得在 NewAPI 中修改设置,于是弄了一个下面这样的脚本。只需要在服务器本地后台运行,即可自动转换格式。

基于 Python 开发,支持调用历史记录,默认将 Anthropic 格式转换为 OpenAI 格式,同时支持基本的并发限制设置。

使用方式很简单:
python main.py --baseurl https://your_newapi_url --port 8080 --limit 30
这里就是说,自己可以设置调用地址,本地监听端口以及并发限制。然后运行即可。

代码

import uvicorn
import httpx
import json
import uuid
import asyncio
import argparse
import shutil
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
from fastapi import FastAPI, Request, BackgroundTasks
from fastapi.responses import StreamingResponse, JSONResponse
from typing import List, Dict, Any
# ================= Command-line arguments =================
parser = argparse.ArgumentParser(description="Anthropic to OpenAI API Proxy with Logging")
parser.add_argument("--baseurl", type=str, required=True, help="Target OpenAI Base URL (e.g., https://api3.wlai.vip)")
parser.add_argument("--port", type=int, default=49091, help="Listen port")
parser.add_argument("--limit", type=int, default=10, help="Concurrency limit")

# Parse argv only when executed as a script; when imported (e.g. launched
# via the uvicorn CLI) fall back to a hard-coded default namespace.
args = (
    parser.parse_args()
    if __name__ == "__main__"
    else argparse.Namespace(baseurl="https://api3.wlai.vip", port=49091, limit=10)
)
# ================= Global configuration =================
# Normalize the base URL so TARGET_URL always ends in the
# OpenAI-compatible /v1/chat/completions endpoint.
BASE_URL = args.baseurl.rstrip("/")
if BASE_URL.endswith("/v1/chat/completions"):
    TARGET_URL = BASE_URL
elif BASE_URL.endswith("/v1"):
    TARGET_URL = f"{BASE_URL}/chat/completions"
else:
    TARGET_URL = f"{BASE_URL}/v1/chat/completions"

LISTEN_PORT = args.port
MAX_CONCURRENT = args.limit
LOG_RETENTION_DAYS = 7
LOG_BASE_DIR = Path("logs")

# Semaphore gating the number of simultaneous upstream requests.
request_semaphore = asyncio.Semaphore(MAX_CONCURRENT)

app = FastAPI()
# ================= 日志与清理模块 =================
def cleanup_old_logs():
    """Delete per-day log directories older than LOG_RETENTION_DAYS.

    Directories under LOG_BASE_DIR are expected to be named YYYY-MM-DD;
    anything that does not parse as such is skipped.
    """
    if not LOG_BASE_DIR.exists():
        return

    cutoff = datetime.now() - timedelta(days=LOG_RETENTION_DAYS)

    for entry in LOG_BASE_DIR.iterdir():
        if not entry.is_dir():
            continue
        try:
            entry_date = datetime.strptime(entry.name, "%Y-%m-%d")
        except ValueError:
            continue  # not a date-named directory
        if entry_date < cutoff:
            print(f"🧹 Cleaning up old logs: {entry}")
            shutil.rmtree(entry)
def save_request_log(request_id: str, request_data: dict, response_status: int, error_msg: str = None):
    """Persist one request's metadata as logs/YYYY-MM-DD/HH/<request_id>.json.

    Best-effort: any failure is printed and swallowed so that logging can
    never break the proxying path.
    """
    try:
        now = datetime.now()
        # Directory layout: logs/<date>/<hour>/
        log_dir = LOG_BASE_DIR / now.strftime("%Y-%m-%d") / now.strftime("%H")
        log_dir.mkdir(parents=True, exist_ok=True)

        payload = {
            "timestamp": now.isoformat(),
            "request_id": request_id,
            "target_url": TARGET_URL,
            "status_code": response_status,
            "error": error_msg,
            "request": {
                "model": request_data.get("model"),
                "max_tokens": request_data.get("max_tokens"),
                "temperature": request_data.get("temperature"),
                # messages_count gives a quick summary; the full body is
                # kept as well for debugging.
                "messages_count": len(request_data.get("messages", [])),
                "full_body": request_data
            }
        }

        with open(log_dir / f"{request_id}.json", "w", encoding="utf-8") as f:
            json.dump(payload, f, ensure_ascii=False, indent=2)

    except Exception as e:
        print(f"⚠️ Failed to write log: {e}")
# ================= API 逻辑 =================
def convert_to_openai_messages(anthropic_body: Dict[str, Any]) -> List[Dict[str, str]]:
    """Flatten an Anthropic /v1/messages body into OpenAI chat messages.

    The optional top-level "system" field becomes a leading system
    message. List-style content is reduced to its text blocks joined by
    newlines; non-text blocks (e.g. images) are dropped.
    """
    result: List[Dict[str, str]] = []

    system = anthropic_body.get("system")
    if system:
        if isinstance(system, list):
            joined = "\n".join(part["text"] for part in system if part.get("type") == "text")
        else:
            joined = str(system)
        result.append({"role": "system", "content": joined})

    for entry in anthropic_body.get("messages", []):
        body = entry["content"]
        if isinstance(body, str):
            text = body
        elif isinstance(body, list):
            # Claude Code normally sends plain text; image blocks would
            # need a richer conversion and are simply skipped here.
            text = "\n".join(part["text"] for part in body if part.get("type") == "text")
        else:
            text = ""
        result.append({"role": entry["role"], "content": text})

    return result
def create_event(event_type: str, data: Dict[str, Any]) -> str:
    """Serialize one server-sent event in the `event:`/`data:` wire format."""
    payload = json.dumps(data)
    return f"event: {event_type}\ndata: {payload}\n\n"
@app.on_event("startup")
async def startup_event():
    # Purge log directories older than the retention window once at
    # server start (no periodic re-check while running).
    cleanup_old_logs()
@app.post("/v1/messages")
async def proxy_messages(request: Request, background_tasks: BackgroundTasks):
    """Proxy an Anthropic /v1/messages call to the OpenAI-style upstream.

    Converts the request body, forwards it as a streamed chat-completions
    request, and re-emits the upstream SSE stream as Anthropic protocol
    events. Each request is logged via save_request_log in a background task.
    """
    request_id = f"req_{uuid.uuid4().hex[:8]}"
    anthropic_body = {}

    try:
        # 1. Read the JSON request body.
        anthropic_body = await request.json()

        # 2. Header handling: accept either x-api-key or Authorization,
        #    normalizing to a Bearer token for the upstream.
        auth_header = request.headers.get("x-api-key") or request.headers.get("authorization")
        if auth_header and not auth_header.startswith("Bearer "):
            auth_header = f"Bearer {auth_header}"

        headers = {
            "Authorization": auth_header,
            "Content-Type": "application/json"
        }

        # 3. Convert to the OpenAI format (always streamed upstream).
        openai_body = {
            "model": anthropic_body.get("model"),
            "messages": convert_to_openai_messages(anthropic_body),
            "stream": True,
            "max_tokens": anthropic_body.get("max_tokens", 4096),
        }
        if "stop_sequences" in anthropic_body:
            openai_body["stop"] = anthropic_body["stop_sequences"]
        if "temperature" in anthropic_body:
            openai_body["temperature"] = anthropic_body["temperature"]

        # 4. Upstream streaming (semaphore-limited, with logging).
        async def upstream_generator():
            log_status = 200
            log_error = None

            # Timestamp when we start waiting for a concurrency slot.
            queue_start = time.time()

            async with request_semaphore:
                # How long this request sat in the queue.
                wait_time = time.time() - queue_start

                # Simple retry loop, mainly for upstream 429s.
                max_retries = 3
                for attempt in range(max_retries):
                    try:
                        async with httpx.AsyncClient(timeout=120.0) as client:
                            async with client.stream("POST", TARGET_URL, headers=headers, json=openai_body) as response:

                                # 429: back off and retry.
                                # NOTE(review): if every attempt returns 429 the loop
                                # simply ends and the stream closes without emitting
                                # an error event to the client.
                                if response.status_code == 429:
                                    print(f"⚠️ [429] Rate Limit. Retrying... ({attempt+1}/{max_retries})")
                                    await asyncio.sleep(2 + attempt)
                                    continue

                                # Other non-200: log and surface an Anthropic error event.
                                if response.status_code != 200:
                                    log_status = response.status_code
                                    error_content = await response.aread()
                                    error_text = error_content.decode('utf-8')
                                    log_error = error_text
                                    print(f"❌ Error {response.status_code}: {error_text}")

                                    # Record the failure.
                                    background_tasks.add_task(save_request_log, request_id, anthropic_body, log_status, log_error)

                                    yield create_event("error", {
                                        "type": "error",
                                        "error": {"type": "api_error", "message": f"Upstream: {response.status_code}"}
                                    })
                                    return

                                # Connected successfully.
                                print(f"✅ [{request_id}] Streaming... (Waited: {wait_time:.2f}s)")

                                # Emit the Anthropic protocol preamble.
                                yield create_event("message_start", {
                                    "type": "message_start",
                                    "message": {
                                        "id": request_id,
                                        "type": "message",
                                        "role": "assistant",
                                        "content": [],
                                        "model": anthropic_body.get("model"),
                                        "stop_reason": None,
                                        "stop_sequence": None,
                                        "usage": {"input_tokens": 0, "output_tokens": 0}
                                    }
                                })
                                yield create_event("content_block_start", {
                                    "type": "content_block_start",
                                    "index": 0,
                                    "content_block": {"type": "text", "text": ""}
                                })

                                # Forward the stream: each OpenAI delta chunk becomes
                                # an Anthropic content_block_delta event.
                                async for line in response.aiter_lines():
                                    if not line.strip(): continue
                                    if line.startswith("data: "):
                                        data_str = line[6:]
                                        if data_str.strip() == "[DONE]": continue
                                        try:
                                            chunk = json.loads(data_str)
                                            if chunk.get("choices"):
                                                content = chunk["choices"][0].get("delta", {}).get("content", "")
                                                if content:
                                                    yield create_event("content_block_delta", {
                                                        "type": "content_block_delta",
                                                        "index": 0,
                                                        "delta": {"type": "text_delta", "text": content}
                                                    })
                                        except:
                                            # NOTE(review): bare except silently drops malformed
                                            # chunks; json.JSONDecodeError would be narrower.
                                            continue

                                # Close the stream with the Anthropic trailer events.
                                yield create_event("content_block_stop", {"type": "content_block_stop", "index": 0})
                                yield create_event("message_delta", {
                                    "type": "message_delta",
                                    "delta": {"stop_reason": "end_turn"},
                                    "usage": {"output_tokens": 0}
                                })
                                yield create_event("message_stop", {"type": "message_stop"})

                                # Record the success.
                                background_tasks.add_task(save_request_log, request_id, anthropic_body, 200, None)
                                return
                    except Exception as e:
                        # Only the final attempt reports the error to the client.
                        if attempt == max_retries - 1:
                            print(f"❌ Stream Exception: {e}")
                            log_status = 500
                            log_error = str(e)
                            background_tasks.add_task(save_request_log, request_id, anthropic_body, 500, str(e))
                            yield create_event("error", {"type": "error", "error": {"type": "api_error", "message": str(e)}})
                        await asyncio.sleep(1)

        return StreamingResponse(upstream_generator(), media_type="text/event-stream")
    except Exception as e:
        print(f"Server Error: {e}")
        # Record the hard failure (e.g. unparseable request body).
        background_tasks.add_task(save_request_log, request_id, anthropic_body, 500, str(e))
        return JSONResponse(status_code=500, content={"error": {"type": "server_error", "message": str(e)}})
if __name__ == "__main__":
    banner = [
        f"🚀 Proxy Starting...",
        f" Target: {TARGET_URL}",
        f" Listen: 0.0.0.0:{LISTEN_PORT}",
        f" Limit: {MAX_CONCURRENT} concurrent requests",
        f" Logging: ./logs/ (7 days retention)",
    ]
    print("\n".join(banner))

    uvicorn.run(app, host="0.0.0.0", port=LISTEN_PORT, log_level="warning")