// Package ws implements a room-based publish/subscribe hub for WebSocket
// clients.
package ws

import (
	"encoding/json"
	"sync"

	"github.com/gorilla/websocket"
)

// Client pairs a WebSocket connection with its outbound message queue.
type Client struct {
	Conn *websocket.Conn
	// Send carries outbound messages queued by the Hub. The Hub closes it
	// exactly once: when the client leaves its last room, or when the client
	// is dropped because the queue was full during a broadcast. A Client must
	// not be registered again after Send has been closed, because sending on
	// a closed channel panics.
	Send chan []byte
}

// Subscription binds a Client to a named room.
type Subscription struct {
	Client *Client
	Room   string
	done   chan struct{} // closed by Run once a RegisterSync request is processed
}

// RoomMessage is an already-encoded payload addressed to a single room.
type RoomMessage struct {
	Room    string
	Message []byte
}

// Hub tracks which clients belong to which rooms and fans messages out to
// them. All membership changes are applied by the goroutine running Run.
type Hub struct {
	rooms      map[string]map[*Client]bool
	register   chan *Subscription
	unregister chan *Subscription
	broadcast  chan *RoomMessage
	mu         sync.RWMutex // guards rooms
}

// NewHub returns a Hub with no rooms. Run must be started (normally in its
// own goroutine) before the Hub processes any request.
func NewHub() *Hub {
	return &Hub{
		rooms:      make(map[string]map[*Client]bool),
		register:   make(chan *Subscription, 10),
		unregister: make(chan *Subscription, 10),
		broadcast:  make(chan *RoomMessage, 100),
	}
}

// Run processes register, unregister and broadcast requests one at a time.
// It never returns.
func (h *Hub) Run() {
	for {
		select {
		case sub := <-h.register:
			h.addSubscription(sub)
		case sub := <-h.unregister:
			h.removeSubscription(sub)
		case msg := <-h.broadcast:
			h.deliver(msg)
		}
	}
}

// addSubscription puts sub.Client into sub.Room, creating the room on demand.
func (h *Hub) addSubscription(sub *Subscription) {
	if sub == nil {
		return
	}
	// Registered first so it runs last: the waiter is released only after the
	// lock is dropped, and also when the subscription is rejected below.
	if sub.done != nil {
		defer close(sub.done)
	}
	if sub.Client == nil {
		// A nil client would make a later broadcast dereference nil.
		return
	}

	h.mu.Lock()
	defer h.mu.Unlock()

	members := h.rooms[sub.Room]
	if members == nil {
		members = make(map[*Client]bool)
		h.rooms[sub.Room] = members
	}
	members[sub.Client] = true
}

// removeSubscription takes sub.Client out of sub.Room. The client's Send
// channel is closed only once it is no longer a member of any room, so a
// client subscribed to several rooms can be unregistered from each of them
// without a double close.
func (h *Hub) removeSubscription(sub *Subscription) {
	if sub == nil || sub.Client == nil {
		return
	}

	h.mu.Lock()
	defer h.mu.Unlock()

	members, ok := h.rooms[sub.Room]
	if !ok {
		return
	}
	if _, joined := members[sub.Client]; !joined {
		return
	}
	delete(members, sub.Client)
	if len(members) == 0 {
		// Drop empty rooms so the map does not grow without bound.
		delete(h.rooms, sub.Room)
	}
	if !h.subscribedLocked(sub.Client) {
		closeSend(sub.Client)
	}
}

// deliver queues msg for every member of msg.Room without blocking. A member
// whose Send queue is full is dropped from the Hub entirely.
func (h *Hub) deliver(msg *RoomMessage) {
	if msg == nil {
		return
	}

	// Exclusive lock: dropping a slow client mutates the room maps.
	h.mu.Lock()
	defer h.mu.Unlock()

	for client := range h.rooms[msg.Room] {
		select {
		case client.Send <- msg.Message:
		default:
			h.dropLocked(client)
		}
	}
}

// dropLocked removes c from every room and closes its Send channel.
// The caller must hold h.mu for writing.
func (h *Hub) dropLocked(c *Client) {
	for room, members := range h.rooms {
		delete(members, c)
		if len(members) == 0 {
			delete(h.rooms, room)
		}
	}
	closeSend(c)
}

// subscribedLocked reports whether c is a member of at least one room.
// The caller must hold h.mu.
func (h *Hub) subscribedLocked(c *Client) bool {
	for _, members := range h.rooms {
		if _, ok := members[c]; ok {
			return true
		}
	}
	return false
}

// closeSend closes c.Send unless it is nil; closing a nil channel panics.
func closeSend(c *Client) {
	if c.Send != nil {
		close(c.Send)
	}
}

// Register queues sub for processing by Run and returns without waiting for
// it to take effect. It blocks while the register queue is full.
func (h *Hub) Register(sub *Subscription) {
	h.register <- sub
}

// RegisterSync registers a subscription and blocks until the Hub has processed it,
// ensuring the client is in the room before returning. It blocks indefinitely
// if Run is not being executed.
func (h *Hub) RegisterSync(sub *Subscription) {
	if sub == nil {
		return
	}
	// Enqueue a private copy so the completion channel never lingers on the
	// caller's Subscription; reusing sub afterwards cannot cause a second
	// close of the same channel.
	done := make(chan struct{})
	pending := *sub
	pending.done = done
	h.register <- &pending
	<-done
}

// Unregister queues the removal of sub.Client from sub.Room. It blocks while
// the unregister queue is full.
func (h *Hub) Unregister(sub *Subscription) {
	h.unregister <- sub
}

// BroadcastToRoom JSON-encodes data and queues it for every client in room.
// If data cannot be encoded the message is silently discarded, because the
// signature has no way to report the error. It blocks while the broadcast
// queue is full.
func (h *Hub) BroadcastToRoom(room string, data interface{}) {
	jsonData, err := json.Marshal(data)
	if err != nil {
		return
	}
	h.broadcast <- &RoomMessage{Room: room, Message: jsonData}
}