Realtime transcription API
wss://api.kotobatech.ai/v1/realtime
マイクやデスクトップ音声、電話などの音声データを、低遅延でテキストに変換できます。
主なユースケース
- 字幕や議事録をリアルタイムで生成
- 電話回線を使った音声対話エージェントや、音声対応アプリの構築
- ターン(発話の区切り)検出
現在はインターフェースとして WebSocket のみを提供しています。
Rate Limit
Realtime APIに関して、 1アカウント(=API key)ごとに、およびシステム全体にて、 同時接続数の制限がございます。
上限に達した状態で接続を試みると、
接続自体が却下されるか、接続受け入れ後のErrorイベントとして
error[code]=rate_limit_errorのError JSONが返却されます。
接続方法: WebSocket
WebSockets はリアルタイムデータ転送に広く使われており、HTTPリクエストの送受信ができる任意のプラットフォームで利用できます。
ブラウザやモバイルなどのエンドユーザーの環境からもご利用いただけますが、APIキーは安全なサーバー環境でのみ使用し、ブラウザなどのクライアント環境では絶対に使用しないでください。
エンドユーザーのクライアント環境からWebSocket接続する場合は、サーバー環境で先に有効期限付きのclient secretを生成し、それをクライアント環境に渡して認証に使ってください。
| 接続先URL | wss://api.kotobatech.ai/v1/realtime |
| リクエストヘッダー | 認証ヘッダー (ブラウザライクな環境からはWebSocket 第二引数を利用) |
client secret を活用するサンプルコード
client secretを生成するを参考に、 以下のようなエンドポイントをサーバーサイドに用意して、 クライアントサイドから呼び出してclient_secretを渡してください。
ex, Node.js/Expressサーバーでの実装
import express from "express";
const app = express();
// An endpoint which would work with the client code above - it returns
// the contents of a REST API request to this protected endpoint
app.get("/session", async (req, res) => {
const r = await fetch("https://api.kotobatech.ai/v1/realtime/transcription_sessions", {
method: "POST",
headers: {
"Authorization": `Bearer ${process.env.KOTOBA_API_KEY}`,
"Content-Type": "application/json",
},
body: JSON.stringify({
}),
});
const data = await r.json();
res.send(data.client_secret.value);
});
app.listen(3000);
サーバーサイド: サンプルコード
Python 接続とsession初期化まで
# requirements:
# pip install websocket-client
import os
import json
import websocket
url = "wss://api.kotobatech.ai/v1/realtime"
headers = [
"Authorization: Bearer " + os.environ.get("KOTOBA_API_KEY"),
]
def on_open(ws):
print("Connected to server.")
def on_message(ws, message):
data = json.loads(message)
print("Received event:", json.dumps(data, indent=2))
if data['type'] == 'transcription_session.created':
update_event = {
"type": "transcription_session.update",
"session": {
"input_audio_format": "pcm16",
"input_audio_sample_rate": 24000,
"input_audio_number_of_channels": 1,
"input_audio_transcription": {"language": "ja", "target_language": "ja"},
},
}
ws.send_text(json.dumps(update_event))
if data['type'] == 'transcription_session.updated':
# 以降、音声データを送信すると書き起こしが始まります。
pass
ws = websocket.WebSocketApp(
url,
header=headers,
on_open=on_open,
on_message=on_message,
)
ws.run_forever()
Python >=3.11 Twilio Media Stream連携
Twilio Media Streamを使って電話音声を書き起こすサンプルです。
事前にTwilio管理画面からauth tokenの取得とWebHook設定を済ませてください。
WebHookは例えば、以下を実行後に ngrok http localhost:8080 でhttpsドメインを作り登録します。
実行後、Twilioの電話番号へ架電すると、通話内容の書き起こしが print されます。
# requirements:
# pip install websocket-client fastapi twilio uvicorn python-multipart
from fastapi import (
FastAPI, Request, Depends, HTTPException, WebSocketDisconnect, WebSocket
)
from fastapi.responses import HTMLResponse
from twilio.request_validator import RequestValidator
from websocket import WebSocket as wss
import json
import os
import threading
import time
auth_token = 'twilio auth token'
API_KEY = os.environ.get('KOTOBA_API_KEY')
DOMAIN = os.environ.get('TWILIO_WEBHOOK_DOMAIN')
url = "wss://api.kotobatech.ai/v1/realtime"
phase = "created"
app = FastAPI()
validator = RequestValidator(auth_token)
# https://www.twilio.com/docs/voice/twiml/stream
twiml = f"""<?xml version="1.0" encoding="UTF-8"?>
<Response>
<Connect>
<Stream url="wss://{DOMAIN}/" statusCallback="https://{DOMAIN}/status"></Stream>
</Connect>
</Response>"""
async def verify_token(request: Request):
print(f"{request.method} {request.url.path} {request.headers} Body={await request.body()}")
async with request.form() as form:
data = {k: v for k, v in form.items()}
if not validator.validate(f"https://{DOMAIN}{request.url.path}", data, request.headers.get('x-twilio-signature', '')):
raise HTTPException(status_code=400, detail="x-twilio-signature header invalid")
@app.post("/twiml", response_class=HTMLResponse,
dependencies=[Depends(verify_token)]
)
async def twiml_endpoint():
return twiml
# https://www.twilio.com/docs/voice/twiml/stream#statuscallback
@app.post("/status",
dependencies=[Depends(verify_token)]
)
async def status_endpoint(request: Request):
print(f"{request.method} {request.url.path} {request.headers} Body={await request.body()}")
return {}
@app.websocket("/")
async def websocket_endpoint(websocket: WebSocket):
# validator.validate(f"wss://{DOMAIN}/", {}, websocket.headers.get('x_twilio_signature'))
await websocket.accept()
ws = simulate()
chunk_num = 1
call_sid = None
try:
while True:
# まずはtwilio media stream処理を記述
chunk = await websocket.receive_text()
try:
data = json.loads(chunk)
print(chunk)
except:
break
if data['event'] == 'connected' or \
data['event'] == 'mark' or \
data['event'] == 'dtmf':
continue
elif data['event'] == 'start':
call_sid = data['start']['callSid']
continue
elif data['event'] == 'media':
stream_sid = data['streamSid']
if call_sid is None:
print("Invalid chunk order")
break
media = data.get('media', None)
if media is None or str(chunk_num) != media.get('chunk', None):
print("Invalid chunk received")
continue
chunk_num += 1
payload = media.get('payload', '')
# 受信した電話音声をkotoba realtime APIへ
append_event = {
"type": "input_audio_buffer.append",
"audio": payload,
}
ws.send_text(json.dumps(append_event))
elif data['event'] == 'stop':
await websocket.close()
break
else:
print("Unknown event received: ", chunk)
continue
except WebSocketDisconnect:
print("Connection closed")
except Exception as e:
print(e)
finally:
await websocket.close()
def receive_events(ws: wss):
global phase
while True:
data = ws.recv()
if not data:
continue
json_data = json.loads(data)
print(json_data)
if json_data["type"] == "transcription_session.updated":
phase = "updated"
def simulate():
global phase
ws = wss()
ws.connect(url, header=["Authorization: Bearer " + API_KEY])
receive_thread = threading.Thread(target=receive_events, args=(ws,), daemon=True)
receive_thread.start()
update_event = {
"type": "transcription_session.update",
"session": {
"input_audio_format": "twilio",
"input_audio_sample_rate": 8_000,
"input_audio_number_of_channels": 1,
"input_audio_transcription": {"language": "ja", "target_language": "ja"},
},
}
ws.send_text(json.dumps(update_event))
while phase != "updated":
time.sleep(0.01)
if not ws.connected:
break
continue
return ws
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", port=8080, log_level="info")
Node.js 接続まで
// requirements:
// npm install ws
import WebSocket from "ws";
const ws = new WebSocket("wss://api.kotobatech.ai/v1/realtime", {
headers: {
"Authorization": "Bearer " + process.env.KOTOBA_API_KEY,
},
});
ws.on("open", function open() {
console.log("Connected to server.");
});
ws.on("message", function incoming(message) {
console.log(JSON.parse(message.toString()));
});
クライアントサイド: サンプルコード
ブラウザをはじめ、ブラウザライクなJS環境(ex, ReactNative, Deno, Cloudflare Workers)ではデフォルトでWebSocket APIが用意されています。
認証方法について任意のHeaderを送信できないため、第二引数sec-websocket-protocolヘッダーを以下のように工夫してください。
なお、WebSocketではCORS制限は適用されません。
const ws = new WebSocket(
"wss://api.kotobatech.ai/v1/realtime",
[
"realtime",
"kotoba-insecure-api-key." + CLIENT_SECRET_VALUE,
]
);
ws.onopen = () => {
console.log("サーバーに接続しました。");
}
function incoming(message) {
console.log(message.data);
}
ws.onmessage = incoming
接続後のイベント処理
Realtime API接続後は双方向でメッセージを送り合います。
メッセージは必ずJSON形式で、type にイベント名を含める必要があります。
Client Event: 利用者が送信するイベント
Server Event: 弊社サーバーから受信するイベント
transcription_session.createdtranscription_session.updatedinput_audio_buffer.committedconversation.item.createdconversation.item.input_audio_transcription.deltaconversation.item.input_audio_transcription.completederror
# 音声を送信
{
"event_id": "optional_any_id_1",
"type": "input_audio_buffer.append",
"audio": "Base64EncodedAudioData"
}
# 書き起こしを受信
{
"event_id": "xxx",
"type": "conversation.item.input_audio_transcription.delta",
"item_id": "optional_any_id_1",
"delta": "こんにちは、"
}
{
"event_id": "yyy",
"type": "conversation.item.input_audio_transcription.delta",
"item_id": "optional_any_id_1",
"delta": "お元気ですか?"
}
{
"event_id": "zzz",
"type": "conversation.item.input_audio_transcription.completed",
"item_id": "optional_any_id_1",
"transcript": "こんにちは、お元気ですか?"
}
以下にイベント発生例を記載します(6~9のイベント発生順序は保証されません)。
turn_detection=true は現在準備中です。 そのため conversation.item.input_audio_transcription.completed は発生しません。
| Client | Server | 説明 | |
|---|---|---|---|
| 1 | transcription_session.created | 接続開始時に発生 (quotaカウント開始) | |
| 2 | transcription_session.update | Sessionの初期化。開始時に送信必須 (注1) | |
| 3 | transcription_session.updated | 初期化成功時に発生 (以降input_audio_buffer.appendを受け入れ可能) | |
| 4 | input_audio_buffer.append | マイクやファイルから音声データのchunkを送信 | |
| 5 | conversation.item.created | 初めてのinput_audio_buffer.append受信時に発生 | |
| 6 | conversation.item.input_audio_transcription.delta | 書き起こし文の一部を返却 | |
| 7 | ~ | 6の繰り返し | |
| 8 | conversation.item.input_audio_transcription.completed | turn_detection=trueかつターン終了の検知時にのみ発生。1ターン分の全文書き起こしを返却 | |
| 9 | ~ | 4, 6~8を繰り返す | |
| 10 | input_audio_buffer.commit | 書き起こし終了の合図 | |
| 11 | conversation.item.input_audio_transcription.delta | 最後(ストレージされていた音声)の書き起こし文を返却(注2) | |
| 12 | input_audio_buffer.committed | input_audio_buffer.commitイベント受信後、サーバーが全てのSessionにストレージされた音声を書き起こして返却した後に発生 | |
| 13 | 通信を切断 | quotaカウント終了 |
言語の変更など、SessionのメタデータをSession中に変更することはできません。 新しい通信を作成していただく必要があります。
最後の書き起こしにおいて、必ずしもターン検出が行われるとは限りません。
そのためconversation.item.input_audio_transcription.deltaもリッスンしておくことを推奨します。