TypeScript WebRTC 示例
本文档演示如何使用 TypeScript 通过 MQTT 信令与 EMQX 多媒体服务进行 WebRTC 交互,实现音视频通话、语音识别和文字转语音功能。
EMQX 多媒体服务通过 MQTT 协议进行 WebRTC 信令交换,客户端可以轻松集成音视频通话功能,并享受 AI 驱动的语音识别(ASR)和文字转语音(TTS)服务。
核心架构
TypeScript Client ←→ MQTT 信令 ←→ EMQX 多媒体服务
↓
WebRTC P2P 连接
关键 MQTT 主题:
$webrtc/{device_id}
- 接收服务端的 SDP answer 和 ICE candidates$webrtc/{device_id}/multimedia_proxy
- 发送 SDP offer 和 ICE candidates$message/{device_id}
- 接收 ASR、TTS 和消息数据
核心实现
WebRTC 信令类详解
MqttWebRTCSignaling
是核心的信令管理类,负责处理 MQTT 消息传递、WebRTC 连接建立和多媒体服务交互。该类封装了复杂的信令逻辑,提供简洁的 API 供开发者使用。
import type { MqttClient } from 'mqtt'
interface SignalingMessage {
type: 'sdp_offer' | 'sdp_answer' | 'ice_candidate' | 'webrtc_terminated'
| 'asr_response' | 'tts_begin' | 'tts_text' | 'tts_complete' | 'tts_terminate'
| 'chat' | 'message'
data?: any
payload?: any
reason?: string
results?: string
text?: string
task_id?: string
}
interface WebRTCCallbacks {
onLocalStream?: (stream: MediaStream) => void
onRemoteStream?: (stream: MediaStream) => void
onConnectionStateChange?: (state: string) => void
onASRResponse?: (text: string) => void
onTTSText?: (text: string) => void
onMessage?: (message: string) => void
onError?: (error: Error) => void
}
export class MqttWebRTCSignaling {
private mqttClient: MqttClient
private pc: RTCPeerConnection | null = null
private localStream: MediaStream | null = null
private remoteStream: MediaStream | null = null
private clientId: string
private callbacks: WebRTCCallbacks
private messageHandler: ((topic: string, message: Buffer) => void) | null = null
private ttsText: string = ''
constructor(options: {
mqttClient: MqttClient
clientId: string
callbacks?: WebRTCCallbacks
}) {
this.mqttClient = options.mqttClient
this.clientId = options.clientId
this.callbacks = options.callbacks || {}
}
async connect(): Promise<void> {
if (!this.mqttClient.connected) {
throw new Error('MQTT client is not connected')
}
// 订阅主题
await this.subscribeToTopics()
// 设置消息处理
this.setupMessageHandler()
// 初始化 WebRTC
await this.setupWebRTC()
}
private async subscribeToTopics(): Promise<void> {
const topics = [
`$webrtc/${this.clientId}`,
`$message/${this.clientId}`
]
return new Promise((resolve, reject) => {
this.mqttClient.subscribe(topics, { qos: 0 }, (err) => {
if (err) reject(err)
else resolve()
})
})
}
private setupMessageHandler(): void {
this.messageHandler = (topic: string, message: Buffer) => {
if (topic === `$webrtc/${this.clientId}` || topic === `$message/${this.clientId}`) {
try {
const payload = JSON.parse(message.toString()) as SignalingMessage
this.handleSignalingMessage(topic, payload)
} catch (error) {
console.error('Failed to parse message:', error)
}
}
}
this.mqttClient.on('message', this.messageHandler)
}
private async handleSignalingMessage(topic: string, message: SignalingMessage): Promise<void> {
if (topic === `$webrtc/${this.clientId}`) {
// WebRTC 信令处理
switch (message.type) {
case 'sdp_answer':
if (this.pc && message.data) {
const answer = new RTCSessionDescription({
type: 'answer',
sdp: message.data.sdp || message.data
})
await this.pc.setRemoteDescription(answer)
}
break
case 'ice_candidate':
if (this.pc && message.data) {
const candidate = new RTCIceCandidate(message.data)
await this.pc.addIceCandidate(candidate)
}
break
case 'webrtc_terminated':
this.callbacks.onError?.(new Error(`WebRTC terminated: ${message.reason}`))
this.disconnect()
break
}
} else if (topic === `$message/${this.clientId}`) {
// 多媒体服务消息处理
switch (message.type) {
case 'asr_response':
this.callbacks.onASRResponse?.(message.results || '')
break
case 'tts_begin':
this.ttsText = ''
break
case 'tts_text':
this.ttsText += message.text || ''
this.callbacks.onTTSText?.(this.ttsText)
break
case 'tts_complete':
console.log('TTS complete:', message.task_id)
break
case 'tts_terminate':
console.log('TTS terminated:', message.task_id)
break
case 'message':
this.callbacks.onMessage?.(message.payload || '')
break
}
}
}
private async setupWebRTC(): Promise<void> {
// 获取用户媒体
this.localStream = await navigator.mediaDevices.getUserMedia({
video: true,
audio: {
echoCancellation: true,
noiseSuppression: true,
autoGainControl: true,
sampleRate: 48000
}
})
this.callbacks.onLocalStream?.(this.localStream)
// 创建 RTCPeerConnection
this.pc = new RTCPeerConnection({
iceServers: [{ urls: 'stun:stun.l.google.com:19302' }]
})
this.remoteStream = new MediaStream()
// 添加本地流
this.localStream.getTracks().forEach(track => {
this.pc!.addTrack(track, this.localStream!)
})
// 设置事件处理
this.setupPeerConnectionHandlers()
// 创建并发送 offer
const offer = await this.pc.createOffer()
await this.pc.setLocalDescription(offer)
this.sendSignal('sdp_offer', offer)
}
private setupPeerConnectionHandlers(): void {
if (!this.pc) return
// ICE 候选处理
this.pc.onicecandidate = (event) => {
if (event.candidate) {
this.sendSignal('ice_candidate', event.candidate)
}
}
// 连接状态变化
this.pc.onconnectionstatechange = () => {
if (this.pc) {
this.callbacks.onConnectionStateChange?.(this.pc.connectionState)
if (this.pc.connectionState === 'failed') {
this.callbacks.onError?.(new Error('Connection failed'))
}
}
}
// 远程流处理
this.pc.ontrack = (event) => {
if (this.remoteStream) {
this.remoteStream.addTrack(event.track)
this.callbacks.onRemoteStream?.(this.remoteStream)
}
}
}
private sendSignal(type: string, data: any): void {
const message = JSON.stringify({ type, data })
const topic = `$webrtc/${this.clientId}/multimedia_proxy`
this.mqttClient.publish(topic, message, { qos: 0 }, (err) => {
if (err) {
console.error(`Failed to send ${type}:`, err)
}
})
}
// 音视频控制
toggleAudio(enabled?: boolean): void {
if (!this.localStream) return
this.localStream.getAudioTracks().forEach(track => {
track.enabled = enabled !== undefined ? enabled : !track.enabled
})
}
toggleVideo(enabled?: boolean): void {
if (!this.localStream) return
this.localStream.getVideoTracks().forEach(track => {
track.enabled = enabled !== undefined ? enabled : !track.enabled
})
}
// 发送聊天消息
sendChatMessage(message: string): void {
this.sendSignal('chat', message)
}
disconnect(): void {
// 停止媒体流
if (this.localStream) {
this.localStream.getTracks().forEach(track => track.stop())
this.localStream = null
}
if (this.remoteStream) {
this.remoteStream.getTracks().forEach(track => track.stop())
this.remoteStream = null
}
// 关闭 peer connection
if (this.pc) {
this.pc.close()
this.pc = null
}
// 清理消息处理器
if (this.mqttClient && this.messageHandler) {
this.mqttClient.removeListener('message', this.messageHandler)
this.messageHandler = null
}
// 取消订阅
const topics = [`$webrtc/${this.clientId}`, `$message/${this.clientId}`]
this.mqttClient.unsubscribe(topics)
this.callbacks.onConnectionStateChange?.('disconnected')
}
}
使用示例
以下是基础的使用示例,展示核心 API 的使用方法。该 WebRTC 信令类可以在各种前端环境中使用,包括 Vue、React 等框架,也可以直接在原生 JavaScript 和 HTML 中实现。
完整的实现示例可以查看:MCP AI Companion Demo 项目中的 web 部分。
基础连接和回调设置
创建 WebRTC 连接需要先建立 MQTT 连接,然后配置各种回调函数来处理音视频流和 AI 服务响应。
import mqtt from 'mqtt'
import { MqttWebRTCSignaling } from './mqtt-webrtc-signaling'
// 连接 MQTT
const mqttClient = mqtt.connect('ws://broker.emqx.io:8083/mqtt', {
clientId: 'device_123',
username: 'your-username',
password: 'your-password'
})
// 创建 WebRTC 信令
const signaling = new MqttWebRTCSignaling({
mqttClient,
clientId: 'device_123',
callbacks: {
onLocalStream: (stream) => {
document.getElementById('localVideo').srcObject = stream
},
onRemoteStream: (stream) => {
document.getElementById('remoteVideo').srcObject = stream
},
onASRResponse: (text) => {
console.log('语音识别:', text)
},
onTTSText: (text) => {
console.log('AI 回复:', text)
},
onConnectionStateChange: (state) => {
console.log('连接状态:', state)
}
}
})
// 开始连接
await signaling.connect()
多媒体功能控制
连接建立后,可以通过简单的 API 调用来控制音视频设备、发送消息和管理连接状态。
// 音视频控制
signaling.toggleAudio(false) // 静音
signaling.toggleVideo(true) // 开启摄像头
// 发送聊天消息
signaling.sendChatMessage('Hello AI!')
// 断开连接
signaling.disconnect()
关键交互说明
MQTT 信令交换流程
WebRTC 连接建立通过 MQTT 主题进行信令交换,遵循标准的 offer/answer 模式:
1. 客户端创建 offer → 发送到 $webrtc/{device_id}/multimedia_proxy
2. 服务端处理 offer → 返回 answer 到 $webrtc/{device_id}
3. 双方交换 ICE candidates → 通过上述两个主题进行
4. 建立 P2P 连接 → 开始音视频传输
多媒体 AI 服务
EMQX 多媒体服务提供三种核心 AI 功能,通过 $message/{device_id}
主题进行数据交换:
- ASR (语音识别): 实时识别用户语音并返回文本结果,支持流式识别
- TTS (文字转语音): 将文本转换为自然语音,支持流式文本推送
- 智能对话: 结合 ASR 和 TTS,实现完整的语音交互体验
错误处理和最佳实践
在实际应用中,需要处理各种可能出现的错误情况:
// 常见错误处理
callbacks: {
onError: (error) => {
if (error.message.includes('getUserMedia')) {
alert('无法访问摄像头/麦克风,请检查权限')
} else if (error.message.includes('Connection failed')) {
// 自动重连逻辑
setTimeout(() => signaling?.connect(), 2000)
}
}
}
常见问题解决方案:
- 媒体权限问题: 确保浏览器已授权摄像头和麦克风访问
- 网络连接问题: 实现自动重连机制,处理网络波动
- 信令超时: 设置合理的连接超时时间,提供用户友好的错误提示
高级配置
ICE 服务器配置
为确保 WebRTC 连接在各种网络环境下正常工作,需要配置适当的 ICE 服务器:
const iceServers = [
{ urls: 'stun:stun.l.google.com:19302' }, // 公共 STUN 服务器
{
urls: 'turn:your-turn-server.com:3478', // 私有 TURN 服务器(生产环境推荐)
username: 'turnuser',
credential: 'turnpass'
}
]
媒体约束优化
根据应用场景调整音视频质量参数,平衡质量和带宽消耗:
const mediaConstraints = {
video: {
width: { ideal: 1280 }, // 视频宽度
height: { ideal: 720 }, // 视频高度
frameRate: { ideal: 30 } // 帧率
},
audio: {
echoCancellation: true, // 回声消除
noiseSuppression: true, // 噪声抑制
autoGainControl: true, // 自动增益控制
sampleRate: 48000, // 采样率
sampleSize: 16, // 采样位深
channelCount: 2 // 声道数
}
}
总结
通过 EMQX 多媒体服务的 TypeScript WebRTC 集成,你可以轻松实现:
- 音视频通话: 高质量的实时音视频传输
- AI 语音识别: 实时语音转文字,支持多语言
- 智能语音合成: 自然流畅的文字转语音
- 跨平台兼容: 支持各种前端框架和原生 JavaScript
这为构建现代化的 AI 驱动通信应用提供了完整的技术基础。