go: keepalive, exponential backoff, chain_id metadata, durability guarantees
Three related fixes that turn the go template into a client that survives the full matrix of server restart, client restart, network blip, half-open TCP, and long outages (hours → months) — without the user writing a line of reconnect logic in process.go. 1. gRPC keepalive: Time=10s, Timeout=3s, PermitWithoutStream=true. Half-open TCP (silent server restart, resumed laptop, NAT drop) is detected within ~13s. Previously the OS TCP keepalive took ~2h to notice, leaving the client as a ghost stream while prime logged "no active gRPC connection" for every skipped transaction. 2. Exponential backoff with jitter on reconnect. Effective delay = min(max_backoff_seconds, reconnect_delay_seconds * 2^attempts) + random(0, reconnect_delay_seconds). The attempts counter resets after any session that runs healthy for 60+ seconds. Jitter desynchronises clients so a server restart doesn't trigger a thundering herd. New max_backoff_seconds config field, default 120. 3. Unified error signalling: the sender goroutine now tears down the stream's context when it hits a Send error. Previously only Recv errors triggered a reconnect — a stale stream where only Send was broken could sit there indefinitely. Also: chain_id is a required config field now and goes in the x-chain-id gRPC metadata header alongside x-api-key and x-smart-contract-id. Prime rejects streams without it with "missing chain ID", which was silently breaking every template-based client until users discovered it the hard way. README documents the durability contract so contract authors know they don't have to reimplement any of it.
This commit is contained in:
172
go/main.go
172
go/main.go
@@ -7,6 +7,7 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"math/rand"
|
||||
"os"
|
||||
"os/signal"
|
||||
"sync"
|
||||
@@ -17,6 +18,7 @@ import (
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
@@ -25,18 +27,41 @@ import (
|
||||
// Configuration and Client Infrastructure
|
||||
// Do not modify this file unless you need to customize the client behavior.
|
||||
// Implement your smart contract logic in process.go instead.
|
||||
//
|
||||
// Durability contract (provided by this file, no work for the user):
|
||||
// - If the Dragonchain Prime server restarts, updates, or momentarily
|
||||
// drops the network, this client auto-reconnects. Transactions
|
||||
// observed during the outage are queued by prime and delivered once
|
||||
// the stream is re-established.
|
||||
// - If this client restarts (crash, deploy, long sleep), it rejoins
|
||||
// the stream and prime re-delivers every still-pending transaction
|
||||
// that should have invoked it, oldest first.
|
||||
// - Half-open TCP (a silent peer that never sent FIN) is detected
|
||||
// within ~13 s via gRPC keepalive pings. No dangling ghost streams.
|
||||
// =============================================================================
|
||||
|
||||
// Config holds the client configuration loaded from YAML.
type Config struct {
	ServerAddress   string `yaml:"server_address"`
	ChainID         string `yaml:"chain_id"`
	SmartContractID string `yaml:"smart_contract_id"`
	APIKey          string `yaml:"api_key"`
	UseTLS          bool   `yaml:"use_tls"`
	TLSCertPath     string `yaml:"tls_cert_path"`
	NumWorkers      int    `yaml:"num_workers"`

	// ReconnectDelaySecs is the BASE backoff between reconnect attempts.
	// The effective delay is `base * 2^attempts + jitter` capped at
	// MaxBackoffSeconds — so repeated failures back off, but a clean
	// server restart is picked up within a few seconds.
	ReconnectDelaySecs int `yaml:"reconnect_delay_seconds"`

	// MaxBackoffSeconds caps the exponential backoff. Default 120.
	MaxBackoffSeconds int `yaml:"max_backoff_seconds"`

	// MaxReconnectAttempts: 0 = infinite (default and recommended — the
	// whole point of this client is to stay available indefinitely).
	MaxReconnectAttempts int `yaml:"max_reconnect_attempts"`
}
|
||||
|
||||
// Client manages the gRPC connection and request processing
|
||||
@@ -72,6 +97,18 @@ func (c *Client) Connect() error {
|
||||
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
||||
}
|
||||
|
||||
// Keepalive is the load-bearing piece for detecting a half-open
|
||||
// connection. Without it, a silent peer (prime restarted without
|
||||
// sending FIN; laptop resumed from sleep; corporate NAT dropped the
|
||||
// flow) leaves us in a "connected" state until the OS-level TCP
|
||||
// keepalive eventually fires — which on Linux defaults to ~2 hours.
|
||||
// 10 s ping + 3 s timeout catches all of that within ~13 s.
|
||||
opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||
Time: 10 * time.Second,
|
||||
Timeout: 3 * time.Second,
|
||||
PermitWithoutStream: true,
|
||||
}))
|
||||
|
||||
conn, err := grpc.NewClient(c.config.ServerAddress, opts...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to connect to server: %w", err)
|
||||
@@ -91,17 +128,31 @@ func (c *Client) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run starts the client and processes incoming requests
|
||||
// Run starts the client and processes incoming requests. It returns
|
||||
// when the stream terminates for any reason (server close, network
|
||||
// error, ctx cancellation). The outer reconnect loop in main() calls
|
||||
// Run again after a backoff.
|
||||
func (c *Client) Run(ctx context.Context) error {
|
||||
// Create metadata with authentication headers
|
||||
// Wrap ctx with our own cancel so the sender goroutine can tear down
|
||||
// the stream on Send errors — otherwise stream.Recv() in the main
|
||||
// loop could block forever waiting for a peer that is never coming
|
||||
// back. Any cancel from here propagates to both directions of the
|
||||
// bidi stream.
|
||||
streamCtx, streamCancel := context.WithCancel(ctx)
|
||||
defer streamCancel()
|
||||
|
||||
// Auth + routing metadata. x-chain-id is required by the server; a
|
||||
// missing header yields "missing chain ID" from prime and no
|
||||
// transactions will arrive.
|
||||
md := metadata.Pairs(
|
||||
"x-api-key", c.config.APIKey,
|
||||
"x-smart-contract-id", c.config.SmartContractID,
|
||||
"x-chain-id", c.config.ChainID,
|
||||
)
|
||||
ctx = metadata.NewOutgoingContext(ctx, md)
|
||||
streamCtx = metadata.NewOutgoingContext(streamCtx, md)
|
||||
|
||||
// Establish the bi-directional stream
|
||||
stream, err := c.grpcClient.Run(ctx)
|
||||
stream, err := c.grpcClient.Run(streamCtx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to establish stream: %w", err)
|
||||
}
|
||||
@@ -110,15 +161,17 @@ func (c *Client) Run(ctx context.Context) error {
|
||||
|
||||
// Channel to collect responses from workers
|
||||
responseChan := make(chan *pb.SmartContractResponse, c.config.NumWorkers*2)
|
||||
errChan := make(chan error, 1)
|
||||
errChan := make(chan error, 2)
|
||||
|
||||
// Start worker goroutines
|
||||
for i := 0; i < c.config.NumWorkers; i++ {
|
||||
c.wg.Add(1)
|
||||
go c.worker(ctx, responseChan)
|
||||
go c.worker(streamCtx, responseChan)
|
||||
}
|
||||
|
||||
// Goroutine to send responses back to server
|
||||
// Sender: forwards worker responses back to the server. Any Send
|
||||
// error immediately cancels streamCtx so the Recv loop below exits
|
||||
// instead of blocking forever.
|
||||
go func() {
|
||||
for resp := range responseChan {
|
||||
if err := stream.Send(resp); err != nil {
|
||||
@@ -127,12 +180,17 @@ func (c *Client) Run(ctx context.Context) error {
|
||||
case errChan <- err:
|
||||
default:
|
||||
}
|
||||
streamCancel()
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// Main loop: receive requests and dispatch to workers
|
||||
// Main loop: receive requests and dispatch to workers. stream.Recv
|
||||
// returns when the peer closes the stream, when streamCtx is cancelled
|
||||
// (e.g. because the sender goroutine hit an error), or on a real
|
||||
// transport error.
|
||||
var recvErr error
|
||||
for {
|
||||
req, err := stream.Recv()
|
||||
if err == io.EOF {
|
||||
@@ -140,24 +198,40 @@ func (c *Client) Run(ctx context.Context) error {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
return fmt.Errorf("error receiving request: %w", err)
|
||||
recvErr = err
|
||||
break
|
||||
}
|
||||
|
||||
c.logger.Printf("Received request: transaction_id=%s", req.TransactionId)
|
||||
|
||||
select {
|
||||
case c.workChan <- req:
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
case <-streamCtx.Done():
|
||||
recvErr = streamCtx.Err()
|
||||
goto cleanup
|
||||
}
|
||||
}
|
||||
|
||||
// Cleanup
|
||||
cleanup:
|
||||
// Tear down in-flight workers. Cancelling streamCtx was already done
|
||||
// via defer; close(workChan) lets the worker goroutines exit their
|
||||
// range loop cleanly.
|
||||
close(c.workChan)
|
||||
c.wg.Wait()
|
||||
close(responseChan)
|
||||
c.workChan = make(chan *pb.SmartContractRequest, c.config.NumWorkers*2)
|
||||
|
||||
return nil
|
||||
if recvErr != nil {
|
||||
return fmt.Errorf("error receiving request: %w", recvErr)
|
||||
}
|
||||
// Surface any earlier Send error the sender goroutine parked on
|
||||
// errChan so the reconnect loop sees it.
|
||||
select {
|
||||
case err := <-errChan:
|
||||
return fmt.Errorf("stream send error: %w", err)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// worker processes requests from the work channel
|
||||
@@ -222,7 +296,8 @@ func LoadConfig(path string) (*Config, error) {
|
||||
|
||||
config := &Config{
|
||||
NumWorkers: 10,
|
||||
ReconnectDelaySecs: 5,
|
||||
ReconnectDelaySecs: 3,
|
||||
MaxBackoffSeconds: 120,
|
||||
}
|
||||
|
||||
if err := yaml.Unmarshal(data, config); err != nil {
|
||||
@@ -233,6 +308,9 @@ func LoadConfig(path string) (*Config, error) {
|
||||
if config.ServerAddress == "" {
|
||||
return nil, fmt.Errorf("server_address is required")
|
||||
}
|
||||
if config.ChainID == "" {
|
||||
return nil, fmt.Errorf("chain_id is required")
|
||||
}
|
||||
if config.SmartContractID == "" {
|
||||
return nil, fmt.Errorf("smart_contract_id is required")
|
||||
}
|
||||
@@ -243,6 +321,39 @@ func LoadConfig(path string) (*Config, error) {
|
||||
return config, nil
|
||||
}
|
||||
|
||||
// nextBackoff returns the duration to sleep before the next reconnect.
|
||||
// Computed as base * 2^attempts with a random jitter in [0, base) and
|
||||
// capped at MaxBackoffSeconds. Jitter matters when many clients
|
||||
// reconnect simultaneously after a server restart — it desynchronises
|
||||
// them so they don't all slam accept() at the same instant.
|
||||
func nextBackoff(cfg *Config, attempts int) time.Duration {
|
||||
base := time.Duration(cfg.ReconnectDelaySecs) * time.Second
|
||||
if base <= 0 {
|
||||
base = 3 * time.Second
|
||||
}
|
||||
maxBackoff := time.Duration(cfg.MaxBackoffSeconds) * time.Second
|
||||
if maxBackoff <= 0 {
|
||||
maxBackoff = 120 * time.Second
|
||||
}
|
||||
|
||||
// Cap the exponent so we don't overflow. 2^10 = 1024 ≈ always
|
||||
// clipped by maxBackoff anyway, but keep the math bounded.
|
||||
shift := attempts
|
||||
if shift > 10 {
|
||||
shift = 10
|
||||
}
|
||||
delay := base << shift
|
||||
if delay > maxBackoff {
|
||||
delay = maxBackoff
|
||||
}
|
||||
|
||||
// Jitter range == base, independent of attempts. Adding it ensures
|
||||
// we don't schedule a thundering herd on the next attempt even if
|
||||
// every client started with the same `attempts` count.
|
||||
jitter := time.Duration(rand.Int63n(int64(base)))
|
||||
return delay + jitter
|
||||
}
|
||||
|
||||
func main() {
|
||||
configPath := flag.String("config", "config.yaml", "Path to configuration file")
|
||||
flag.Parse()
|
||||
@@ -268,20 +379,29 @@ func main() {
|
||||
cancel()
|
||||
}()
|
||||
|
||||
// Connection loop with reconnection logic
|
||||
// Connection loop with reconnection logic. A "successful" session
|
||||
// is defined as one where Run() was active for at least 60 s —
|
||||
// anything longer is almost certainly real work, so reset attempts
|
||||
// so the next failure starts the backoff schedule fresh.
|
||||
attempts := 0
|
||||
const healthyRunThreshold = 60 * time.Second
|
||||
|
||||
for {
|
||||
if err := client.Connect(); err != nil {
|
||||
log.Printf("Connection failed: %v", err)
|
||||
} else {
|
||||
attempts = 0
|
||||
start := time.Now()
|
||||
if err := client.Run(ctx); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
log.Println("Shutdown requested")
|
||||
_ = client.Close()
|
||||
break
|
||||
}
|
||||
log.Printf("Stream error: %v", err)
|
||||
}
|
||||
if time.Since(start) > healthyRunThreshold {
|
||||
attempts = 0
|
||||
}
|
||||
}
|
||||
|
||||
_ = client.Close()
|
||||
@@ -297,13 +417,13 @@ func main() {
|
||||
break
|
||||
}
|
||||
|
||||
delay := time.Duration(config.ReconnectDelaySecs) * time.Second
|
||||
delay := nextBackoff(config, attempts-1)
|
||||
log.Printf("Reconnecting in %v (attempt %d)...", delay, attempts)
|
||||
|
||||
select {
|
||||
case <-time.After(delay):
|
||||
case <-ctx.Done():
|
||||
break
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user