136 lines
2.6 KiB
Go
136 lines
2.6 KiB
Go
|
package videoparser
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"net/http"
|
||
|
"nhooyr.io/websocket"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// subscriber represents a subscriber.
|
||
|
// Messages are sent on the msgs channel and if the client
|
||
|
// cannot keep up with the messages, closeSlow is called.
|
||
|
type subscriber struct {
|
||
|
msgs chan []byte
|
||
|
closeSlow func()
|
||
|
}
|
||
|
|
||
|
type ChatSender struct {
|
||
|
subscribersMu sync.Mutex
|
||
|
subscribers map[*subscriber]struct{}
|
||
|
}
|
||
|
|
||
|
func newChatSender() *ChatSender {
|
||
|
return &ChatSender{
|
||
|
subscribers: make(map[*subscriber]struct{}),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (t *ChatSender) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||
|
c, err := websocket.Accept(w, r, &websocket.AcceptOptions{
|
||
|
OriginPatterns: []string{"*"},
|
||
|
})
|
||
|
if err != nil {
|
||
|
fmt.Println(err.Error())
|
||
|
return
|
||
|
}
|
||
|
defer c.Close(websocket.StatusInternalError, "")
|
||
|
|
||
|
err = t.subscribe(r.Context(), c)
|
||
|
if errors.Is(err, context.Canceled) {
|
||
|
return
|
||
|
}
|
||
|
if websocket.CloseStatus(err) == websocket.StatusNormalClosure ||
|
||
|
websocket.CloseStatus(err) == websocket.StatusGoingAway {
|
||
|
return
|
||
|
}
|
||
|
if err != nil {
|
||
|
fmt.Println(err.Error())
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (t *ChatSender) subscribe(ctx context.Context, c *websocket.Conn) error {
|
||
|
ctx = c.CloseRead(ctx)
|
||
|
|
||
|
s := &subscriber{
|
||
|
msgs: make(chan []byte, 16),
|
||
|
closeSlow: func() {
|
||
|
c.Close(websocket.StatusPolicyViolation, "connection too slow to keep up with messages")
|
||
|
},
|
||
|
}
|
||
|
t.addSubscriber(s)
|
||
|
defer t.deleteSubscriber(s)
|
||
|
|
||
|
for {
|
||
|
select {
|
||
|
case msg := <-s.msgs:
|
||
|
err := writeTimeout(ctx, time.Second*5, c, msg)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
case <-ctx.Done():
|
||
|
return ctx.Err()
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type MessageBase struct {
|
||
|
Action string
|
||
|
}
|
||
|
|
||
|
type TextMessage struct {
|
||
|
MessageBase
|
||
|
|
||
|
Message string
|
||
|
}
|
||
|
|
||
|
type ReindexEvent struct {
|
||
|
MessageBase
|
||
|
|
||
|
Event string
|
||
|
}
|
||
|
|
||
|
func (t *ChatSender) Publish(msg []byte) {
|
||
|
t.subscribersMu.Lock()
|
||
|
defer t.subscribersMu.Unlock()
|
||
|
|
||
|
for s := range t.subscribers {
|
||
|
select {
|
||
|
case s.msgs <- msg:
|
||
|
default:
|
||
|
go s.closeSlow()
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
var IndexSender = newChatSender()
|
||
|
|
||
|
func SetupSettingsWebsocket() {
|
||
|
http.Handle("/subscribe", IndexSender)
|
||
|
}
|
||
|
|
||
|
// addSubscriber registers a subscriber.
|
||
|
func (t *ChatSender) addSubscriber(s *subscriber) {
|
||
|
t.subscribersMu.Lock()
|
||
|
t.subscribers[s] = struct{}{}
|
||
|
t.subscribersMu.Unlock()
|
||
|
}
|
||
|
|
||
|
// deleteSubscriber deletes the given subscriber.
|
||
|
func (t *ChatSender) deleteSubscriber(s *subscriber) {
|
||
|
t.subscribersMu.Lock()
|
||
|
delete(t.subscribers, s)
|
||
|
t.subscribersMu.Unlock()
|
||
|
}
|
||
|
|
||
|
func writeTimeout(ctx context.Context, timeout time.Duration, c *websocket.Conn, msg []byte) error {
|
||
|
ctx, cancel := context.WithTimeout(ctx, timeout)
|
||
|
defer cancel()
|
||
|
|
||
|
return c.Write(ctx, websocket.MessageText, msg)
|
||
|
}
|