// Package pubsub bridges Redis pub/sub and the WebSocket hub: events published
// on the Redis channel are forwarded to connected WebSocket clients.
package pubsub

import (
	"context"
	"encoding/json"
	"log"

	"director/api/ws"
	"github.com/redis/go-redis/v9"
)

// Channel is the Redis pub/sub channel that carries all director events.
const Channel = "director:events"

// Event is the JSON envelope written to the Redis channel.
type Event struct {
	// Type is the event name; it is serialized under the "event" key.
	Type string `json:"event"`
	// Data is the event payload; it is serialized under the "data" key.
	Data any `json:"data"`
}

// PubSub couples a Redis client with the WebSocket hub that receives
// forwarded messages.
type PubSub struct {
	rdb *redis.Client
	hub *ws.Hub
}

// New creates a PubSub backed by a Redis client for redisAddr that forwards
// received messages to hub. No connectivity check is made here; use Ping to
// verify the connection.
func New(redisAddr string, hub *ws.Hub) *PubSub {
	rdb := redis.NewClient(&redis.Options{
		Addr: redisAddr,
	})
	return &PubSub{rdb: rdb, hub: hub}
}

// Publish sends an event to Redis pub/sub.
// Called by the API handlers and can also be called by external agents via redis-cli.
//
// The event name and data are wrapped in an Event and JSON-encoded. It returns
// the marshalling error if data cannot be encoded, otherwise the error (if
// any) reported by the Redis PUBLISH command.
func (ps *PubSub) Publish(ctx context.Context, event string, data any) error {
	msg, err := json.Marshal(Event{Type: event, Data: data})
	if err != nil {
		return err
	}
	return ps.rdb.Publish(ctx, Channel, msg).Err()
}

// Subscribe listens for events on Redis pub/sub and forwards them to WebSocket clients.
// This bridges external publishers (OpenClaw agents, cron jobs) to the React UI.
//
// Subscribe blocks until ctx is cancelled or the subscription's message
// channel is closed, so callers normally run it in its own goroutine. The
// subscription is closed before Subscribe returns.
func (ps *PubSub) Subscribe(ctx context.Context) {
	sub := ps.rdb.Subscribe(ctx, Channel)
	// Always release the subscription; without this its connection and
	// receive goroutine would outlive this call.
	defer func() {
		if err := sub.Close(); err != nil {
			log.Printf("Redis pub/sub close: %v", err)
		}
	}()

	ch := sub.Channel()
	log.Printf("Redis pub/sub subscribed to %s", Channel)

	for {
		select {
		case <-ctx.Done():
			// Ranging over ch alone would not observe cancellation, so the
			// context is watched explicitly.
			return
		case msg, ok := <-ch:
			if !ok {
				// The message channel was closed (e.g. the client was shut down).
				return
			}
			ps.hub.Broadcast([]byte(msg.Payload))
		}
	}
}

// Ping tests the Redis connection.
func (ps *PubSub) Ping(ctx context.Context) error {
	return ps.rdb.Ping(ctx).Err()
}

// Close shuts down the Redis client.
func (ps *PubSub) Close() error {
	return ps.rdb.Close()
}