SDK 与集成
Python SDK
用 OpenAI Python SDK 接入 CokeAPI,所有 chat / images / video 接口零改动。
CokeAPI 与 OpenAI 协议完全兼容,直接用 OpenAI 官方 Python SDK 即可,不需要单独的 SDK。
安装
pip install openai
# 推荐 1.50+ 版本基础用法
from openai import OpenAI
client = OpenAI(
base_url="https://api.cokeapi.com/v1",
api_key="sk-coke-xxxxxxxx",
)
# 文本
resp = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "用一句话介绍 CokeAPI"}],
)
print(resp.choices[0].message.content)
# 图片
img = client.images.generate(
model="gpt-image-2",
prompt="一只在京都樱花树下的赛博狐狸",
size="1024x1024",
n=1,
)
print(img.data[0].url)流式
stream = client.chat.completions.create(
model="grok-4.20-fast",
messages=[{"role": "user", "content": "讲个三句的笑话"}],
stream=True,
)
for chunk in stream:
print(chunk.choices[0].delta.content or "", end="", flush=True)异步 / 并发
import asyncio
from openai import AsyncOpenAI
client = AsyncOpenAI(
base_url="https://api.cokeapi.com/v1",
api_key="sk-coke-xxxxxxxx",
)
async def main():
tasks = [
client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": q}],
)
for q in ["你好", "再见", "晚安"]
]
results = await asyncio.gather(*tasks)
for r in results:
print(r.choices[0].message.content)
asyncio.run(main())视频生成 (异步任务)
OpenAI SDK 没有视频接口,直接用 httpx:
import httpx, time
BASE = "https://api.cokeapi.com/v1"
HEADERS = {"Authorization": "Bearer sk-coke-xxxxxxxx"}
r = httpx.post(
f"{BASE}/video/generations",
headers=HEADERS,
json={
"model": "grok-imagine-video",
"prompt": "雨夜霓虹中漫步的猫",
"quality": "hd",
"duration": 6,
},
timeout=60,
)
task_id = r.json()["id"]
while True:
poll = httpx.get(f"{BASE}/video/generations/{task_id}", headers=HEADERS, timeout=30)
data = poll.json()
if data["status"] in ("completed", "failed"):
print(data)
break
time.sleep(5)错误处理
from openai import OpenAI, RateLimitError, APIStatusError
try:
client.chat.completions.create(...)
except RateLimitError as e:
# 429,看 Retry-After 头退避
print("被限流,等待重试")
except APIStatusError as e:
# 其他 4xx / 5xx
print(f"业务错误: code={e.body.get('error', {}).get('code')}, "
f"trace_id={e.body.get('error', {}).get('trace_id')}")