Initial commit — Director app (API + UI)
This commit is contained in:
63
api/pubsub/pubsub.go
Normal file
63
api/pubsub/pubsub.go
Normal file
@@ -0,0 +1,63 @@
|
||||
package pubsub
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"log"
|
||||
|
||||
"director/api/ws"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
const Channel = "director:events"
|
||||
|
||||
type Event struct {
|
||||
Type string `json:"event"`
|
||||
Data any `json:"data"`
|
||||
}
|
||||
|
||||
type PubSub struct {
|
||||
rdb *redis.Client
|
||||
hub *ws.Hub
|
||||
}
|
||||
|
||||
func New(redisAddr string, hub *ws.Hub) *PubSub {
|
||||
rdb := redis.NewClient(&redis.Options{
|
||||
Addr: redisAddr,
|
||||
})
|
||||
return &PubSub{rdb: rdb, hub: hub}
|
||||
}
|
||||
|
||||
// Publish sends an event to Redis pub/sub.
|
||||
// Called by the API handlers and can also be called by external agents via redis-cli.
|
||||
func (ps *PubSub) Publish(ctx context.Context, event string, data any) error {
|
||||
msg, err := json.Marshal(Event{Type: event, Data: data})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return ps.rdb.Publish(ctx, Channel, msg).Err()
|
||||
}
|
||||
|
||||
// Subscribe listens for events on Redis pub/sub and forwards them to WebSocket clients.
|
||||
// This bridges external publishers (OpenClaw agents, cron jobs) to the React UI.
|
||||
func (ps *PubSub) Subscribe(ctx context.Context) {
|
||||
sub := ps.rdb.Subscribe(ctx, Channel)
|
||||
ch := sub.Channel()
|
||||
|
||||
log.Printf("Redis pub/sub subscribed to %s", Channel)
|
||||
|
||||
for msg := range ch {
|
||||
ps.hub.Broadcast([]byte(msg.Payload))
|
||||
}
|
||||
}
|
||||
|
||||
// Ping tests the Redis connection.
|
||||
func (ps *PubSub) Ping(ctx context.Context) error {
|
||||
return ps.rdb.Ping(ctx).Err()
|
||||
}
|
||||
|
||||
// Close shuts down the Redis client.
|
||||
func (ps *PubSub) Close() error {
|
||||
return ps.rdb.Close()
|
||||
}
|
||||
Reference in New Issue
Block a user