From 22cc0e10732d67fe17ef17dff7f4d462353179af Mon Sep 17 00:00:00 2001 From: Cameron Otsuka Date: Wed, 7 Jan 2026 19:51:42 -0800 Subject: [PATCH] get websocket and sse message passing working --- astro.config.mjs | 9 ++-- src/actions/index.ts | 24 +++++++++ src/libs/obs.ts | 87 -------------------------------- src/libs/sse.ts | 13 +++++ src/pages/api/control-panel.ts | 30 +++++++++++ src/pages/api/overlay.ts | 20 ++++++++ src/pages/index.astro | 56 +++++++++++++++++---- src/server/obs-manager.ts | 92 ++++++++++++++++++++++++++++++++++ src/server/sse.ts | 28 +++++++++++ src/stores/obs.ts | 18 +++++-- tsconfig.json | 1 + 11 files changed, 270 insertions(+), 108 deletions(-) create mode 100644 src/actions/index.ts delete mode 100644 src/libs/obs.ts create mode 100644 src/libs/sse.ts create mode 100644 src/pages/api/control-panel.ts create mode 100644 src/pages/api/overlay.ts create mode 100644 src/server/obs-manager.ts create mode 100644 src/server/sse.ts diff --git a/astro.config.mjs b/astro.config.mjs index 87c117a..0347edc 100644 --- a/astro.config.mjs +++ b/astro.config.mjs @@ -1,10 +1,7 @@ // @ts-check -import { defineConfig } from 'astro/config'; - -import node from '@astrojs/node'; +import { defineConfig } from "astro/config"; +import node from "@astrojs/node"; export default defineConfig({ - adapter: node({ - mode: 'standalone', - }), + adapter: node({ mode: "standalone" }), }); diff --git a/src/actions/index.ts b/src/actions/index.ts new file mode 100644 index 0000000..052011a --- /dev/null +++ b/src/actions/index.ts @@ -0,0 +1,24 @@ +import { defineAction } from "astro:actions"; +import { z } from "astro:schema"; +import { obsManager } from "@server/obs-manager"; + +export const server = { + obs: { + connect: defineAction({ + handler: async () => { + await obsManager.connect(); + }, + }), + disconnect: defineAction({ + handler: async () => { + await obsManager.disconnect(); + }, + }), + switchScene: defineAction({ + input: z.object({ sceneName: z.string() }), + handler: async ({ sceneName }) => { + await obsManager.switchScene(sceneName); + }, + }), + }, +}; diff --git a/src/libs/obs.ts b/src/libs/obs.ts deleted file mode 100644 index bb14b33..0000000 --- a/src/libs/obs.ts +++ /dev/null @@ -1,87 +0,0 @@ -import OBSWebSocket from 'obs-websocket-js'; -import type { OBSEventTypes, OBSRequestTypes, OBSResponseTypes } from 'obs-websocket-js'; -import { $connected, $currentScene, $sceneList } from '@stores/obs'; - -class OBSClient { - private obs: OBSWebSocket; - private reconnectTimeout?: NodeJS.Timeout; - private host: string = 'localhost'; - private port: number = 4455; - private password: string = ''; - - constructor() { - this.obs = new OBSWebSocket(); - - // event listeners - this.obs.on('ConnectionClosed', this.scheduleReconnect); - this.obs.on('ConnectionError', this.scheduleReconnect); - this.obs.on('Identified', this.initState); - this.obs.on('CurrentProgramSceneChanged', this.sceneChanged) - this.obs.on('SceneListChanged', this.fetchScenes) - } - - private scheduleReconnect() { - $connected.set(false); - if (this.reconnectTimeout) { - clearTimeout(this.reconnectTimeout); - } - this.reconnectTimeout = setTimeout(() => { - this.connect(); - }, 5000); - } - - private async initState() { - $connected.set(true); - await this.fetchScenes(); - } - - private async fetchScenes() { - try { - const {scenes, currentProgramSceneName}: OBSResponseTypes['GetSceneList'] = await this.obs.call('GetSceneList'); - const sceneList = scenes.map((scene) => ({ - name: scene.sceneName as string, - index: scene.sceneIndex as number - })).reverse(); - $currentScene.set(currentProgramSceneName); - $sceneList.set(sceneList); - } catch (error) { - console.error(error); - } - } - - private sceneChanged(event: OBSEventTypes['CurrentProgramSceneChanged']) { - $currentScene.set(event.sceneName); - } - - async switchScene(switchSceneRequest: OBSRequestTypes['SetCurrentProgramScene']) { - try { - await this.obs.call('SetCurrentProgramScene', switchSceneRequest); - } catch (error) { - console.error(error); - } - } - - async connect() { - const url = `ws://${this.host}:${this.port}`; - try { - await this.obs.connect(url, this.password); - } catch (error) { - console.error(error); - this.scheduleReconnect(); - } - } - - async disconnect() { - if (this.reconnectTimeout) { - clearTimeout(this.reconnectTimeout); - } - await this.obs.disconnect(); - } - - isConnected(): boolean { - return $connected.get(); - } -} - -// Singleton instance -export const obsClient = new OBSClient(); diff --git a/src/libs/sse.ts b/src/libs/sse.ts new file mode 100644 index 0000000..e85f68a --- /dev/null +++ b/src/libs/sse.ts @@ -0,0 +1,13 @@ +export function initSSE( + eventSourceEndpoint: string, + eventType: string, + onmessage: (ev: MessageEvent) => void +) { + const eventSource = new EventSource(eventSourceEndpoint); + eventSource.addEventListener(eventType, onmessage); + + eventSource.onerror = () => { + eventSource.close(); + setTimeout(() => initSSE(eventSourceEndpoint, eventType, onmessage), 3000); + }; +} diff --git a/src/pages/api/control-panel.ts b/src/pages/api/control-panel.ts new file mode 100644 index 0000000..eee7bdb --- /dev/null +++ b/src/pages/api/control-panel.ts @@ -0,0 +1,30 @@ +import type { APIRoute } from "astro"; +import { sse } from "@server/sse"; +import { $obs } from "@stores/obs"; + +export const GET: APIRoute = ({ request }) => { + const stream = new ReadableStream({ + start(controller) { + // Send initial OBS state + const encoder = new TextEncoder(); + const initialState = $obs.get(); + controller.enqueue( + encoder.encode( + `event: obs\ndata: ${JSON.stringify({ category: "state", data: initialState })}\n\n`, + ), + ); + + sse.connect(controller, ["obs"]); + request.signal.addEventListener("abort", () => + sse.disconnect(controller), + ); + }, + }); + + return new Response(stream, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + }, + }); +}; diff --git a/src/pages/api/overlay.ts b/src/pages/api/overlay.ts new file mode 100644 index 0000000..678141c --- /dev/null +++ b/src/pages/api/overlay.ts @@ -0,0 +1,20 @@ +import type { APIRoute } from "astro"; +import { sse } from "@server/sse"; + +export const GET: APIRoute = ({ request }) => { + const stream = new ReadableStream({ + start(controller) { + sse.connect(controller, ["overlay"]); + request.signal.addEventListener("abort", () => + sse.disconnect(controller), + ); + }, + }); + + return new Response(stream, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + }, + }); +}; diff --git a/src/pages/index.astro b/src/pages/index.astro index c429820..867fede 100644 --- a/src/pages/index.astro +++ b/src/pages/index.astro @@ -1,15 +1,49 @@ --- -import Base from '@layouts/base.astro'; +import Base from "@layouts/base.astro"; --- -
-

OBS Control Panel

-
-
- Test content -
- - \ No newline at end of file +
+

OBS Control Panel

+
+
+ + +
+ + + + diff --git a/src/server/obs-manager.ts b/src/server/obs-manager.ts new file mode 100644 index 0000000..f3eab1c --- /dev/null +++ b/src/server/obs-manager.ts @@ -0,0 +1,92 @@ +import OBSWebSocket from "obs-websocket-js"; +import type { OBSEventTypes, OBSResponseTypes } from "obs-websocket-js"; +import { $obs, obsInitialState } from "@stores/obs"; +import { sse } from "@server/sse"; + +// Broadcast store changes via SSE +$obs.listen((state) => { + sse.send("obs", { category: "state", data: state }); +}); + +class OBSManager { + private obs: OBSWebSocket; + + private host = process.env.OBS_HOST ?? "localhost"; + private port = parseInt(process.env.OBS_PORT ?? "4455"); + private password = process.env.OBS_PASSWORD ?? ""; + + constructor() { + this.obs = new OBSWebSocket(); + this.setupEventListeners(); + } + + private setupEventListeners() { + this.obs.on("ConnectionClosed", this.handleDisconnect); + this.obs.on("ConnectionError", this.handleConnectionError); + this.obs.on("Identified", this.handleIdentified); + this.obs.on("CurrentProgramSceneChanged", this.handleSceneChanged); + this.obs.on("SceneListChanged", this.handleSceneListChanged); + } + + private handleDisconnect = () => { + $obs.set(obsInitialState); + }; + + private handleConnectionError = (error: Error) => { + console.error("OBS Connection Error:", error); + $obs.setKey("connected", false); + }; + + private handleIdentified = async () => { + $obs.setKey("connected", true); + await this.fetchScenes(); + }; + + private handleSceneChanged = ( + event: OBSEventTypes["CurrentProgramSceneChanged"], + ) => { + $obs.setKey("currentScene", event.sceneName); + }; + + private handleSceneListChanged = async () => { + await this.fetchScenes(); + }; + + private async fetchScenes() { + try { + const response: OBSResponseTypes["GetSceneList"] = + await this.obs.call("GetSceneList"); + const sceneList = response.scenes + .map((scene) => ({ + name: scene.sceneName as string, + index: scene.sceneIndex as number, + })) + .reverse(); + + $obs.setKey("currentScene", response.currentProgramSceneName); + $obs.setKey("sceneList", sceneList); + } catch (error) { + console.error("Failed to fetch scenes:", error); + } + } + + async connect() { + const url = `ws://${this.host}:${this.port}`; + try { + await this.obs.connect(url, this.password); + } catch (error) { + console.error("Failed to connect to OBS:", error); + throw error; + } + } + + async disconnect() { + await this.obs.disconnect(); + } + + async switchScene(sceneName: string) { + await this.obs.call("SetCurrentProgramScene", { sceneName }); + } +} + +export const obsManager = new OBSManager(); diff --git a/src/server/sse.ts b/src/server/sse.ts new file mode 100644 index 0000000..1e45680 --- /dev/null +++ b/src/server/sse.ts @@ -0,0 +1,28 @@ +export interface SSEMessage { + category: string; + data: T; +} + +type Controller = ReadableStreamDefaultController; + +const clients = new Map>(); +const encoder = new TextEncoder(); + +export const sse = { + connect: (c: Controller, eventTypes: string[]) => + clients.set(c, new Set(eventTypes)), + disconnect: (c: Controller) => clients.delete(c), + send: (type: string, data: unknown) => { + const msg = encoder.encode( + `event: ${type}\ndata: ${JSON.stringify(data)}\n\n`, + ); + for (const [c, types] of clients) { + if (!types.has(type)) continue; + try { + c.enqueue(msg); + } catch { + clients.delete(c); + } + } + }, +}; diff --git a/src/stores/obs.ts b/src/stores/obs.ts index 71d6176..9728a48 100644 --- a/src/stores/obs.ts +++ b/src/stores/obs.ts @@ -1,5 +1,15 @@ -import { atom } from 'nanostores'; +import { map } from "nanostores"; -export const $connected = atom(false); -export const $currentScene = atom(''); -export const $sceneList = atom<{name: string; index: number;}[]>([]); +export interface OBSState { + connected: boolean; + currentScene: string; + sceneList: { name: string; index: number }[]; +} + +export const obsInitialState: OBSState = { + connected: false, + currentScene: "", + sceneList: [], +} as const; + +export const $obs = map(obsInitialState); diff --git a/tsconfig.json b/tsconfig.json index 763ae54..c88c46e 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -9,6 +9,7 @@ "@components/*": ["components/*"], "@layouts/*": ["layouts/*"], "@libs/*": ["libs/*"], + "@server/*": ["server/*"], "@stores/*": ["stores/*"], "@styles/*": ["styles/*"], }, -- 2.52.0