64 lines
1.4 KiB
Go
64 lines
1.4 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"log"
|
|
|
|
"director/api/ws"
|
|
|
|
"github.com/redis/go-redis/v9"
|
|
)
|
|
|
|
// Channel is the Redis pub/sub channel name that Publish writes to and
// Subscribe reads from.
const Channel = "director:events"
|
|
|
|
// Event is the JSON envelope that Publish marshals and sends over Channel.
type Event struct {
	// Type names the event; it is encoded under the "event" JSON key.
	Type string `json:"event"`
	// Data is the event payload; it is encoded under the "data" JSON key.
	Data any `json:"data"`
}
|
|
|
|
type PubSub struct {
|
|
rdb *redis.Client
|
|
hub *ws.Hub
|
|
}
|
|
|
|
func New(redisAddr string, hub *ws.Hub) *PubSub {
|
|
rdb := redis.NewClient(&redis.Options{
|
|
Addr: redisAddr,
|
|
})
|
|
return &PubSub{rdb: rdb, hub: hub}
|
|
}
|
|
|
|
// Publish sends an event to Redis pub/sub.
|
|
// Called by the API handlers and can also be called by external agents via redis-cli.
|
|
func (ps *PubSub) Publish(ctx context.Context, event string, data any) error {
|
|
msg, err := json.Marshal(Event{Type: event, Data: data})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return ps.rdb.Publish(ctx, Channel, msg).Err()
|
|
}
|
|
|
|
// Subscribe listens for events on Redis pub/sub and forwards them to WebSocket clients.
|
|
// This bridges external publishers (OpenClaw agents, cron jobs) to the React UI.
|
|
func (ps *PubSub) Subscribe(ctx context.Context) {
|
|
sub := ps.rdb.Subscribe(ctx, Channel)
|
|
ch := sub.Channel()
|
|
|
|
log.Printf("Redis pub/sub subscribed to %s", Channel)
|
|
|
|
for msg := range ch {
|
|
ps.hub.Broadcast([]byte(msg.Payload))
|
|
}
|
|
}
|
|
|
|
// Ping tests the Redis connection.
|
|
func (ps *PubSub) Ping(ctx context.Context) error {
|
|
return ps.rdb.Ping(ctx).Err()
|
|
}
|
|
|
|
// Close shuts down the Redis client.
|
|
func (ps *PubSub) Close() error {
|
|
return ps.rdb.Close()
|
|
}
|