progessing on client

This commit is contained in:
2024-12-19 10:18:05 +08:00
parent 31930b362b
commit 7d35fe286d
25 changed files with 1634 additions and 504 deletions

View File

@@ -1,5 +1,8 @@
import { useAccount } from '@/api/account'
import DanmakuClient, { AuthInfo, RoomAuthInfo } from '@/data/DanmakuClient'
import OpenLiveClient, {
AuthInfo,
RoomAuthInfo
} from '@/data/DanmakuClients/OpenLiveClient'
import { defineStore } from 'pinia'
import { computed, ref } from 'vue'
@@ -9,7 +12,7 @@ export interface BCMessage {
}
export const useDanmakuClient = defineStore('DanmakuClient', () => {
const danmakuClient = ref<DanmakuClient>(new DanmakuClient(null))
const danmakuClient = ref<OpenLiveClient>(new OpenLiveClient())
let bc: BroadcastChannel
const isOwnedDanmakuClient = ref(false)
const status = ref<'waiting' | 'initializing' | 'listening' | 'running'>(
@@ -77,8 +80,10 @@ export const useDanmakuClient = defineStore('DanmakuClient', () => {
async (lock) => {
if (lock) {
status.value = 'initializing'
bc = new BroadcastChannel('vtsuru.danmaku.' + accountInfo.value?.id)
console.log('[DanmakuClient] 创建 BroadcastChannel: ' + bc.name)
bc = new BroadcastChannel(
'vtsuru.danmaku.open-live' + accountInfo.value?.id
)
console.log('[DanmakuClient] 创建 BroadcastChannel: ' + bc.name)
bc.onmessage = (event) => {
const message: BCMessage = event.data as BCMessage
const data = message.data ? JSON.parse(message.data) : {}
@@ -86,7 +91,7 @@ export const useDanmakuClient = defineStore('DanmakuClient', () => {
case 'check-client':
sendBCMessage('response-client-status', {
status: status.value,
auth: authInfo.value,
auth: authInfo.value
})
break
case 'response-client-status':
@@ -178,7 +183,7 @@ export const useDanmakuClient = defineStore('DanmakuClient', () => {
const events = danmakuClient.value.events
const eventsAsModel = danmakuClient.value.eventsAsModel
danmakuClient.value = new DanmakuClient(auth || null)
danmakuClient.value = new OpenLiveClient(auth)
danmakuClient.value.events = events
danmakuClient.value.eventsAsModel = eventsAsModel
@@ -192,7 +197,7 @@ export const useDanmakuClient = defineStore('DanmakuClient', () => {
status: 'running',
auth: authInfo.value
})
danmakuClient.value.onEvent('all', (data) => {
danmakuClient.value.on('all', (data) => {
sendBCMessage('on-danmaku', data)
})
return true

View File

@@ -0,0 +1,234 @@
import { useAccount } from '@/api/account'
import { defineStore } from 'pinia'
import { computed, ref } from 'vue'
export interface BCMessage {
type: string
data: string
}
export const useDirectDanmakuClient = defineStore('DirectDanmakuClient', () => {
const danmakuClient = ref<OpenLiveClient>(new OpenLiveClient(null))
let bc: BroadcastChannel
const isOwnedDirectDanmakuClient = ref(false)
const status = ref<'waiting' | 'initializing' | 'listening' | 'running'>(
'waiting'
)
const connected = computed(
() => status.value === 'running' || status.value === 'listening'
)
const authInfo = ref<RoomAuthInfo>()
const accountInfo = useAccount()
let existOtherClient = false
let isInitializing = false
function on(
eventName: 'danmaku' | 'gift' | 'sc' | 'guard',
listener: (...args: any[]) => void
) {
if (!danmakuClient.value.events[eventName]) {
danmakuClient.value.events[eventName] = []
}
danmakuClient.value.events[eventName].push(listener)
}
function onEvent(
eventName: 'danmaku' | 'gift' | 'sc' | 'guard' | 'all',
listener: (...args: any[]) => void
) {
if (!danmakuClient.value.eventsAsModel[eventName]) {
danmakuClient.value.eventsAsModel[eventName] = []
}
danmakuClient.value.eventsAsModel[eventName].push(listener)
}
function off(
eventName: 'danmaku' | 'gift' | 'sc' | 'guard',
listener: (...args: any[]) => void
) {
if (danmakuClient.value.events[eventName]) {
const index = danmakuClient.value.events[eventName].indexOf(listener)
if (index > -1) {
danmakuClient.value.events[eventName].splice(index, 1)
}
}
}
function offEvent(
eventName: 'danmaku' | 'gift' | 'sc' | 'guard' | 'all',
listener: (...args: any[]) => void
) {
if (danmakuClient.value.eventsAsModel[eventName]) {
const index =
danmakuClient.value.eventsAsModel[eventName].indexOf(listener)
if (index > -1) {
danmakuClient.value.eventsAsModel[eventName].splice(index, 1)
}
}
}
async function initClient(auth?: AuthInfo) {
if (!isInitializing && !connected.value) {
isInitializing = true
navigator.locks.request(
'danmakuClientInit',
{ ifAvailable: true },
async (lock) => {
if (lock) {
status.value = 'initializing'
bc = new BroadcastChannel('vtsuru.danmaku.open-live' + accountInfo.value?.id)
console.log('[DirectDanmakuClient] 创建 BroadcastChannel: ' + bc.name)
bc.onmessage = (event) => {
const message: BCMessage = event.data as BCMessage
const data = message.data ? JSON.parse(message.data) : {}
switch (message.type) {
case 'check-client':
sendBCMessage('response-client-status', {
status: status.value,
auth: authInfo.value,
})
break
case 'response-client-status':
switch (
data.status //如果存在已经在运行或者正在启动的客户端, 状态设为 listening
) {
case 'running':
case 'initializing':
status.value = 'listening'
existOtherClient = true
authInfo.value = data.auth
break
}
break
case 'on-danmaku':
const danmaku = JSON.parse(data)
switch (danmaku.cmd) {
case 'LIVE_OPEN_PLATFORM_DM':
danmakuClient.value.onDanmaku(danmaku)
break
case 'LIVE_OPEN_PLATFORM_SEND_GIFT':
danmakuClient.value.onGift(danmaku)
break
case 'LIVE_OPEN_PLATFORM_SUPER_CHAT':
danmakuClient.value.onSC(danmaku)
break
case 'LIVE_OPEN_PLATFORM_GUARD':
danmakuClient.value.onGuard(danmaku)
break
default:
danmakuClient.value.onRawMessage(danmaku)
break
}
break
}
}
console.log('[DirectDanmakuClient] 正在检查客户端状态...')
sendBCMessage('check-client')
setTimeout(() => {
if (!connected.value) {
isOwnedDirectDanmakuClient.value = true
initClientInternal(auth)
} else {
console.log(
'[DirectDanmakuClient] 已存在其他页面弹幕客户端, 开始监听 BroadcastChannel...'
)
}
setInterval(checkClientStatus, 500)
}, 1000)
}
}
)
}
isInitializing = false
return useDirectDanmakuClient()
}
function sendBCMessage(type: string, data?: any) {
bc.postMessage({
type,
data: JSON.stringify(data)
})
}
function checkClientStatus() {
if (!existOtherClient && !isOwnedDirectDanmakuClient.value) {
//当不存在其他客户端, 且自己不是弹幕客户端
//则自己成为新的弹幕客户端
if (status.value != 'initializing') {
console.log('[DirectDanmakuClient] 其他 Client 离线, 开始初始化...')
initClientInternal()
}
} else {
existOtherClient = false //假设其他客户端不存在
sendBCMessage('check-client') //检查其他客户端是否存在
}
}
async function initClientInternal(auth?: AuthInfo) {
status.value = 'initializing'
await navigator.locks.request(
'danmakuClientInitInternal',
{
ifAvailable: true
},
async (lock) => {
if (lock) {
// 有锁
isOwnedDirectDanmakuClient.value = true
const events = danmakuClient.value.events
const eventsAsModel = danmakuClient.value.eventsAsModel
danmakuClient.value = new OpenLiveClient(auth || null)
danmakuClient.value.events = events
danmakuClient.value.eventsAsModel = eventsAsModel
const init = async () => {
const result = await danmakuClient.value.Start()
if (result.success) {
authInfo.value = danmakuClient.value.roomAuthInfo
status.value = 'running'
console.log('[DirectDanmakuClient] 初始化成功')
sendBCMessage('response-client-status', {
status: 'running',
auth: authInfo.value
})
danmakuClient.value.onEvent('all', (data) => {
sendBCMessage('on-danmaku', data)
})
return true
} else {
console.log(
'[DirectDanmakuClient] 初始化失败, 5秒后重试: ' + result.message
)
return false
}
}
while (!(await init())) {
await new Promise((resolve) => {
setTimeout(() => {
resolve(true)
}, 5000)
})
}
} else {
// 无锁
console.log('[DirectDanmakuClient] 正在等待其他页面弹幕客户端初始化...')
status.value = 'listening'
isOwnedDirectDanmakuClient.value = false
}
}
)
}
return {
danmakuClient,
isOwnedDirectDanmakuClient,
status,
connected,
authInfo,
on,
off,
onEvent,
offEvent,
initClient
}
})

View File

@@ -1,102 +1,200 @@
import DanmakuClient from '@/data/DanmakuClient'
import { BASE_HUB_URL } from '@/data/constants'
import BaseDanmakuClient from '@/data/DanmakuClients/BaseDanmakuClient'
import DirectClient, {
DirectClientAuthInfo
} from '@/data/DanmakuClients/DirectClient'
import OpenLiveClient from '@/data/DanmakuClients/OpenLiveClient'
import * as signalR from '@microsoft/signalr'
import * as msgpack from '@microsoft/signalr-protocol-msgpack'
import { useLocalStorage } from '@vueuse/core'
import { format } from 'date-fns'
import { defineStore } from 'pinia'
import { ref } from 'vue'
import { computed, ref } from 'vue'
import { useRoute } from 'vue-router'
import { compress } from 'brotli-compress'
export const useWebFetcher = defineStore('WebFetcher', () => {
const cookie = useLocalStorage('JWT_Token', '')
const route = useRoute()
const startedAt = ref<Date>()
const client = new DanmakuClient(null)
const client = ref<BaseDanmakuClient>()
const signalRClient = ref<signalR.HubConnection>()
const events: string[] = []
const isStarted = ref(false)
let timer: any
let signalRClient: signalR.HubConnection | null = null
let disconnectedByServer = false
async function Start() {
let useCookie = false
/**
* 是否来自Tauri客户端
*/
let isFromClient = false
const prefix = computed(() => {
if (isFromClient) {
return '[web-fetcher-iframe] '
}
return '[web-fetcher] '
})
async function restartDanmakuClient(
type: 'openlive' | 'direct',
directAuthInfo?: DirectClientAuthInfo
) {
console.log(prefix.value + '正在重启弹幕客户端...')
if (
client.value?.state === 'connected' ||
client.value?.state === 'connecting'
) {
client.value.Stop()
}
return await connectDanmakuClient(type, directAuthInfo)
}
async function Start(
type: 'openlive' | 'direct' = 'openlive',
directAuthInfo?: DirectClientAuthInfo,
_isFromClient: boolean = false
): Promise<{ success: boolean; message: string }> {
if (isStarted.value) {
return
}
while (!(await connectSignalR())) {
console.log('[WEB-FETCHER] 连接失败, 5秒后重试')
await new Promise((resolve) => setTimeout(resolve, 5000))
startedAt.value = new Date()
return { success: true, message: '已启动' }
}
const result = await navigator.locks.request(
'webFetcherStart',
async () => {
isFromClient = _isFromClient
while (!(await connectSignalR())) {
console.log(prefix.value + '连接失败, 5秒后重试')
await new Promise((resolve) => setTimeout(resolve, 5000))
}
let result = await connectDanmakuClient(type, directAuthInfo)
while (!result?.success) {
console.log(prefix.value + '弹幕客户端启动失败, 5秒后重试')
await new Promise((resolve) => setTimeout(resolve, 5000))
result = await connectDanmakuClient(type, directAuthInfo)
}
isStarted.value = true
disconnectedByServer = false
return result
}
)
return result
}
function Stop() {
if (!isStarted.value) {
return
}
isStarted.value = false
client.Stop()
client.value?.Stop()
client.value = undefined
if (timer) {
clearInterval(timer)
timer = undefined
}
signalRClient?.stop()
signalRClient = null
signalRClient.value?.stop()
signalRClient.value = undefined
startedAt.value = undefined
}
async function connectDanmakuClient() {
console.log('[WEB-FETCHER] 正在连接弹幕客户端...')
const result = await client.Start()
if (result.success) {
console.log('[WEB-FETCHER] 加载完成, 开始监听弹幕')
client.onEvent('all', onGetDanmakus)
/************* ✨ Codeium Command ⭐ *************/
/**
* Connects to the danmaku client based on the specified type.
*
* @param type - The type of danmaku client to connect, either 'openlive' or 'direct'.
* @param directConnectInfo - Optional authentication information required when connecting to a 'direct' type client.
* It should include a token, roomId, tokenUserId, and buvid.
*
* @returns A promise that resolves to an object containing a success flag and a message.
* If the connection and client start are successful, the client starts listening to danmaku events.
* If the connection fails or the authentication information is not provided for a 'direct' type client,
* the function returns with a failure message.
*/
/****** 3431380f-29f6-41b0-801a-7f081b59b4ff *******/
async function connectDanmakuClient(
type: 'openlive' | 'direct',
directConnectInfo?: {
token: string
roomId: number
tokenUserId: number
buvid: string
}
) {
if (
client.value?.state === 'connected' ||
client.value?.state === 'connecting'
) {
return { success: true, message: '弹幕客户端已启动' }
}
console.log(prefix.value + '正在连接弹幕客户端...')
if (!client.value) {
//只有在没有客户端的时候才创建, 并添加事件
if (type == 'openlive') {
client.value = new OpenLiveClient()
} else {
if (!directConnectInfo) {
return { success: false, message: '未提供弹幕客户端认证信息' }
}
client.value = new DirectClient(directConnectInfo)
}
client.value?.on('all', (data) => onGetDanmakus(data))
}
const result = await client.value?.Start()
if (result?.success) {
console.log(prefix.value + '加载完成, 开始监听弹幕')
timer ??= setInterval(() => {
sendEvents()
}, 1500)
} else {
console.log('[WEB-FETCHER] 弹幕客户端启动失败: ' + result.message)
console.log(prefix.value + '弹幕客户端启动失败: ' + result?.message)
}
return result
}
async function connectSignalR() {
console.log('[WEB-FETCHER] 正在连接到 vtsuru 服务器...')
console.log(prefix.value + '正在连接到 vtsuru 服务器...')
const connection = new signalR.HubConnectionBuilder()
.withUrl(BASE_HUB_URL + 'web-fetcher?token=' + route.query.token, {
headers: {
Authorization: `Bearer ${cookie.value}`,
Authorization: `Bearer ${cookie.value}`
},
skipNegotiation: true,
transport: signalR.HttpTransportType.WebSockets,
transport: signalR.HttpTransportType.WebSockets
})
.withAutomaticReconnect([0, 2000, 10000, 30000])
.withHubProtocol(new msgpack.MessagePackHubProtocol())
.build()
connection.on('Disconnect', (reason: unknown) => {
console.log('[WEB-FETCHER] 被服务器断开连接: ' + reason)
console.log(prefix.value + '被服务器断开连接: ' + reason)
disconnectedByServer = true
connection.stop()
signalRClient = null
signalRClient.value = undefined
})
connection.on('ConnectClient', async () => {
if (client.isRunning) {
/*connection.on('ConnectClient', async () => {
if (client?.state === 'connected') {
return
}
let result = await connectDanmakuClient()
while (!result.success) {
console.log('[WEB-FETCHER] 弹幕客户端启动失败, 5秒后重试')
while (!result?.success) {
console.log(prefix.value + '弹幕客户端启动失败, 5秒后重试')
await new Promise((resolve) => setTimeout(resolve, 5000))
result = await connectDanmakuClient()
}
isStarted.value = true
disconnectedByServer = false
})
})*/
connection.onclose(reconnect)
try {
await connection.start()
console.log('[WEB-FETCHER] 已连接到 vtsuru 服务器')
signalRClient = connection
console.log(prefix.value + '已连接到 vtsuru 服务器')
await connection.send('Finished')
if (isFromClient) {
// 如果来自Tauri客户端设置自己为VTsuru客户端
await connection.send('SetAsVTsuruClient')
}
signalRClient.value = connection
return true
} catch (e) {
console.log('[WEB-FETCHER] 无法连接到 vtsuru 服务器: ' + e)
console.log(prefix.value + '无法连接到 vtsuru 服务器: ' + e)
return false
}
}
@@ -105,8 +203,12 @@ export const useWebFetcher = defineStore('WebFetcher', () => {
return
}
try {
await signalRClient?.start()
console.log('[WEB-FETCHER] 已重新连接')
await signalRClient.value?.start()
await signalRClient.value?.send('Reconnected')
if (isFromClient) {
await signalRClient.value?.send('SetAsVTsuruClient')
}
console.log(prefix.value + '已重新连接')
} catch (err) {
console.log(err)
setTimeout(reconnect, 5000) // 如果连接失败则每5秒尝试一次重新启动连接
@@ -116,7 +218,7 @@ export const useWebFetcher = defineStore('WebFetcher', () => {
events.push(command)
}
async function sendEvents() {
if (signalRClient?.state !== 'Connected') {
if (signalRClient.value?.state !== 'Connected') {
return
}
let tempEvents: string[] = []
@@ -129,15 +231,29 @@ export const useWebFetcher = defineStore('WebFetcher', () => {
count = events.length
}
if (tempEvents.length > 0) {
const result = await signalRClient?.invoke<{
const compressed = await compress(
new TextEncoder().encode(
JSON.stringify({
Events: tempEvents.map((e) =>
typeof e === 'string' ? e : JSON.stringify(e)
),
Version: '1.0.0',
OSInfo: navigator.userAgent,
UseCookie: useCookie
})
)
)
const result = await signalRClient.value?.invoke<{
Success: boolean
Message: string
}>('UploadEvents', tempEvents, false)
}>('UploadEventsCompressed', compressed)
if (result?.Success) {
events.splice(0, count)
console.log(`[WEB-FETCHER] <${format(new Date(), 'HH:mm:ss')}> 上传了 ${count} 条弹幕`)
console.log(
`[WEB-FETCHER] <${format(new Date(), 'HH:mm:ss')}> 上传了 ${count} 条弹幕`
)
} else {
console.error('[WEB-FETCHER] 上传弹幕失败: ' + result?.Message)
console.error(prefix.value + '上传弹幕失败: ' + result?.Message)
}
}
}
@@ -145,7 +261,10 @@ export const useWebFetcher = defineStore('WebFetcher', () => {
return {
Start,
Stop,
restartDanmakuClient,
client,
signalRClient,
isStarted,
startedAt
}
})