sc-templates/go/main.go
Andrew Miller 2bc57c073d go: keepalive, exponential backoff, chain_id metadata, durability guarantees
Three related fixes that turn the Go template into a client that
survives the full matrix of failure modes: server restarts, client
restarts, network blips, half-open TCP, and long outages (hours to
months), all without the user writing a line of reconnect logic in
process.go.

1. gRPC keepalive: Time=10s, Timeout=3s, PermitWithoutStream=true.
   Half-open TCP (silent server restart, resumed laptop, NAT drop)
   is detected within ~13s. Previously the OS TCP keepalive took
   ~2h to notice, leaving the client as a ghost stream while prime
   logged "no active gRPC connection" for every skipped transaction.

2. Exponential backoff with jitter on reconnect. Effective delay =
   min(max_backoff_seconds, reconnect_delay_seconds * 2^attempts)
   + random(0, reconnect_delay_seconds). The attempts counter resets
   after any session that runs healthy for 60+ seconds. Jitter
   desynchronises clients so a server restart doesn't trigger a
   thundering herd. New max_backoff_seconds config field, default 120.
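
   With the defaults (base 3 s, cap 120 s) plugged into the formula,

       delay(n) = min(120, 3 * 2^n) + random(0, 3)

   the schedule works out to roughly 3, 6, 12, 24, 48, 96, 120, 120,
   ... seconds, each plus up to 3 s of random jitter.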

3. Unified error signalling: the sender goroutine now tears down the
   stream's context when it hits a Send error. Previously only Recv
   errors triggered a reconnect, so a stale stream where only Send
   was broken could sit there indefinitely.
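
   The shape of the fix, condensed from Run() in main.go below: both
   directions share one cancellable context, so a failure on either
   side ends both.

       streamCtx, streamCancel := context.WithCancel(ctx)
       go func() { // sender
           for resp := range responseChan {
               if err := stream.Send(resp); err != nil {
                   streamCancel() // wakes the stream.Recv loop too
                   return
               }
           }
       }()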

Also: chain_id is now a required config field and goes in the
x-chain-id gRPC metadata header alongside x-api-key and
x-smart-contract-id. Prime rejects streams that lack it with a
"missing chain ID" error, which was silently breaking every
template-based client until users discovered it the hard way. The
README documents the durability contract so contract authors know
they don't have to reimplement any of it.
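
Example config.yaml with the new fields (values are placeholders;
the field names come from the Config struct in main.go):

    server_address: "prime.example.com:443"
    chain_id: "your-chain-id"               # new: required, sent as x-chain-id
    smart_contract_id: "your-contract-id"
    api_key: "your-api-key"
    use_tls: true
    num_workers: 10
    reconnect_delay_seconds: 3              # base backoff
    max_backoff_seconds: 120                # new: caps the exponential backoff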
2026-04-19 21:23:47 -04:00


package main

import (
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	pb "github.com/your-org/smart-contract/proto"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"gopkg.in/yaml.v3"
)

// =============================================================================
// Configuration and Client Infrastructure
// Do not modify this file unless you need to customize the client behavior.
// Implement your smart contract logic in process.go instead.
//
// Durability contract (provided by this file, no work for the user):
//   - If the Dragonchain Prime server restarts, updates, or momentarily
//     drops the network, this client auto-reconnects. Transactions
//     observed during the outage are queued by prime and delivered once
//     the stream is re-established.
//   - If this client restarts (crash, deploy, long sleep), it rejoins
//     the stream and prime re-delivers every still-pending transaction
//     that should have invoked it, oldest first.
//   - Half-open TCP (a silent peer that never sent FIN) is detected
//     within ~13 s via gRPC keepalive pings. No dangling ghost streams.
// =============================================================================

// Config holds the client configuration loaded from YAML
type Config struct {
	ServerAddress   string `yaml:"server_address"`
	ChainID         string `yaml:"chain_id"`
	SmartContractID string `yaml:"smart_contract_id"`
	APIKey          string `yaml:"api_key"`
	UseTLS          bool   `yaml:"use_tls"`
	TLSCertPath     string `yaml:"tls_cert_path"`
	NumWorkers      int    `yaml:"num_workers"`
	// ReconnectDelaySecs is the BASE backoff between reconnect attempts.
	// The effective delay is `base * 2^attempts + jitter` capped at
	// MaxBackoffSeconds — so repeated failures back off, but a clean
	// server restart is picked up within a few seconds.
	ReconnectDelaySecs int `yaml:"reconnect_delay_seconds"`
	// MaxBackoffSeconds caps the exponential backoff. Default 120.
	MaxBackoffSeconds int `yaml:"max_backoff_seconds"`
	// MaxReconnectAttempts: 0 = infinite (default and recommended — the
	// whole point of this client is to stay available indefinitely).
	MaxReconnectAttempts int `yaml:"max_reconnect_attempts"`
}

// Client manages the gRPC connection and request processing
type Client struct {
	config     *Config
	conn       *grpc.ClientConn
	grpcClient pb.SmartContractServiceClient
	workChan   chan *pb.SmartContractRequest
	wg         sync.WaitGroup
	logger     *log.Logger
}

// NewClient creates a new smart contract client
func NewClient(config *Config) *Client {
	return &Client{
		config:   config,
		workChan: make(chan *pb.SmartContractRequest, config.NumWorkers*2),
		logger:   log.New(os.Stdout, "[SC-Client] ", log.LstdFlags|log.Lmicroseconds),
	}
}

// Connect establishes a connection to the gRPC server
func (c *Client) Connect() error {
	var opts []grpc.DialOption
	if c.config.UseTLS {
		creds, err := credentials.NewClientTLSFromFile(c.config.TLSCertPath, "")
		if err != nil {
			return fmt.Errorf("failed to load TLS credentials: %w", err)
		}
		opts = append(opts, grpc.WithTransportCredentials(creds))
	} else {
		opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
	}
	// Keepalive is the load-bearing piece for detecting a half-open
	// connection. Without it, a silent peer (prime restarted without
	// sending FIN; laptop resumed from sleep; corporate NAT dropped the
	// flow) leaves us in a "connected" state until the OS-level TCP
	// keepalive eventually fires — which on Linux defaults to ~2 hours.
	// 10 s ping + 3 s timeout catches all of that within ~13 s.
	opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
		Time:                10 * time.Second,
		Timeout:             3 * time.Second,
		PermitWithoutStream: true,
	}))
	conn, err := grpc.NewClient(c.config.ServerAddress, opts...)
	if err != nil {
		return fmt.Errorf("failed to connect to server: %w", err)
	}
	c.conn = conn
	c.grpcClient = pb.NewSmartContractServiceClient(conn)
	c.logger.Printf("Connected to server at %s", c.config.ServerAddress)
	return nil
}

// Close closes the gRPC connection
func (c *Client) Close() error {
	if c.conn != nil {
		return c.conn.Close()
	}
	return nil
}

// Run starts the client and processes incoming requests. It returns
// when the stream terminates for any reason (server close, network
// error, ctx cancellation). The outer reconnect loop in main() calls
// Run again after a backoff.
func (c *Client) Run(ctx context.Context) error {
	// Wrap ctx with our own cancel so the sender goroutine can tear down
	// the stream on Send errors — otherwise stream.Recv() in the main
	// loop could block forever waiting for a peer that is never coming
	// back. Any cancel from here propagates to both directions of the
	// bidi stream.
	streamCtx, streamCancel := context.WithCancel(ctx)
	defer streamCancel()

	// Auth + routing metadata. x-chain-id is required by the server; a
	// missing header yields "missing chain ID" from prime and no
	// transactions will arrive.
	md := metadata.Pairs(
		"x-api-key", c.config.APIKey,
		"x-smart-contract-id", c.config.SmartContractID,
		"x-chain-id", c.config.ChainID,
	)
	streamCtx = metadata.NewOutgoingContext(streamCtx, md)

	// Establish the bi-directional stream
	stream, err := c.grpcClient.Run(streamCtx)
	if err != nil {
		return fmt.Errorf("failed to establish stream: %w", err)
	}
	c.logger.Printf("Stream established, starting %d workers", c.config.NumWorkers)

	// Channel to collect responses from workers
	responseChan := make(chan *pb.SmartContractResponse, c.config.NumWorkers*2)
	errChan := make(chan error, 2)

	// Start worker goroutines
	for i := 0; i < c.config.NumWorkers; i++ {
		c.wg.Add(1)
		go c.worker(streamCtx, responseChan)
	}

	// Sender: forwards worker responses back to the server. Any Send
	// error immediately cancels streamCtx so the Recv loop below exits
	// instead of blocking forever.
	go func() {
		for resp := range responseChan {
			if err := stream.Send(resp); err != nil {
				c.logger.Printf("Error sending response: %v", err)
				select {
				case errChan <- err:
				default:
				}
				streamCancel()
				return
			}
		}
	}()

	// Main loop: receive requests and dispatch to workers. stream.Recv
	// returns when the peer closes the stream, when streamCtx is cancelled
	// (e.g. because the sender goroutine hit an error), or on a real
	// transport error.
	var recvErr error
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			c.logger.Println("Server closed the stream")
			break
		}
		if err != nil {
			recvErr = err
			break
		}
		c.logger.Printf("Received request: transaction_id=%s", req.TransactionId)
		select {
		case c.workChan <- req:
		case <-streamCtx.Done():
			recvErr = streamCtx.Err()
			goto cleanup
		}
	}

cleanup:
	// Tear down in-flight workers: close(workChan) lets the worker
	// goroutines see a closed channel and exit their receive loop
	// cleanly; the deferred streamCancel covers anything still
	// blocked on the context.
	close(c.workChan)
	c.wg.Wait()
	close(responseChan)
	c.workChan = make(chan *pb.SmartContractRequest, c.config.NumWorkers*2)
	if recvErr != nil {
		return fmt.Errorf("error receiving request: %w", recvErr)
	}
	// Surface any earlier Send error the sender goroutine parked on
	// errChan so the reconnect loop sees it.
	select {
	case err := <-errChan:
		return fmt.Errorf("stream send error: %w", err)
	default:
		return nil
	}
}

// worker processes requests from the work channel
func (c *Client) worker(ctx context.Context, responseChan chan<- *pb.SmartContractResponse) {
	defer c.wg.Done()
	for {
		select {
		case req, ok := <-c.workChan:
			if !ok {
				return
			}
			c.processRequest(ctx, req, responseChan)
		case <-ctx.Done():
			return
		}
	}
}

// processRequest handles a single request
func (c *Client) processRequest(ctx context.Context, req *pb.SmartContractRequest, responseChan chan<- *pb.SmartContractResponse) {
	// Log capture is a stub: logs stays empty here. In production you
	// might want a more sophisticated approach that captures output
	// from Process.
	var logs string
	// Call the user-defined Process function
	result := Process(ctx, req.TransactionJson, req.EnvVars, req.Secrets)

	// Build the response
	resp := &pb.SmartContractResponse{
		TransactionId: req.TransactionId,
		OutputToChain: result.OutputToChain,
		Logs:          logs,
	}
	if result.Error != nil {
		resp.Error = result.Error.Error()
		c.logger.Printf("Error processing transaction %s: %v", req.TransactionId, result.Error)
	} else {
		// Marshal the result data to JSON
		resultJSON, err := json.Marshal(result.Data)
		if err != nil {
			resp.Error = fmt.Sprintf("failed to marshal result: %v", err)
			c.logger.Printf("Error marshaling result for transaction %s: %v", req.TransactionId, err)
		} else {
			resp.ResultJson = string(resultJSON)
			c.logger.Printf("Successfully processed transaction %s", req.TransactionId)
		}
	}
	select {
	case responseChan <- resp:
	case <-ctx.Done():
	}
}

// LoadConfig loads configuration from a YAML file
func LoadConfig(path string) (*Config, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, fmt.Errorf("failed to read config file: %w", err)
	}
	config := &Config{
		NumWorkers:         10,
		ReconnectDelaySecs: 3,
		MaxBackoffSeconds:  120,
	}
	if err := yaml.Unmarshal(data, config); err != nil {
		return nil, fmt.Errorf("failed to parse config file: %w", err)
	}
	// Validate required fields
	if config.ServerAddress == "" {
		return nil, fmt.Errorf("server_address is required")
	}
	if config.ChainID == "" {
		return nil, fmt.Errorf("chain_id is required")
	}
	if config.SmartContractID == "" {
		return nil, fmt.Errorf("smart_contract_id is required")
	}
	if config.APIKey == "" {
		return nil, fmt.Errorf("api_key is required")
	}
	return config, nil
}

// nextBackoff returns the duration to sleep before the next reconnect.
// Computed as base * 2^attempts with a random jitter in [0, base) and
// capped at MaxBackoffSeconds. Jitter matters when many clients
// reconnect simultaneously after a server restart — it desynchronises
// them so they don't all slam accept() at the same instant.
func nextBackoff(cfg *Config, attempts int) time.Duration {
	base := time.Duration(cfg.ReconnectDelaySecs) * time.Second
	if base <= 0 {
		base = 3 * time.Second
	}
	maxBackoff := time.Duration(cfg.MaxBackoffSeconds) * time.Second
	if maxBackoff <= 0 {
		maxBackoff = 120 * time.Second
	}
	// Cap the exponent so the shift can't overflow. With the default
	// base, 2^10 = 1024 already lands well past maxBackoff, but keep
	// the math bounded regardless.
	shift := attempts
	if shift > 10 {
		shift = 10
	}
	delay := base << shift
	if delay > maxBackoff {
		delay = maxBackoff
	}
	// Jitter range == base, independent of attempts. Adding it ensures
	// we don't schedule a thundering herd on the next attempt even if
	// every client started with the same `attempts` count.
	jitter := time.Duration(rand.Int63n(int64(base)))
	return delay + jitter
}

func main() {
	configPath := flag.String("config", "config.yaml", "Path to configuration file")
	flag.Parse()

	// Load configuration
	config, err := LoadConfig(*configPath)
	if err != nil {
		log.Fatalf("Failed to load config: %v", err)
	}

	// Create client
	client := NewClient(config)

	// Setup signal handling for graceful shutdown
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		sig := <-sigChan
		log.Printf("Received signal %v, shutting down...", sig)
		cancel()
	}()

	// Connection loop with reconnection logic. A "successful" session
	// is one where Run() stayed up for at least 60 s; a session that
	// long is almost certainly real work, so reset attempts and let
	// the next failure start the backoff schedule fresh.
	attempts := 0
	const healthyRunThreshold = 60 * time.Second
	for {
		if err := client.Connect(); err != nil {
			log.Printf("Connection failed: %v", err)
		} else {
			start := time.Now()
			if err := client.Run(ctx); err != nil {
				if ctx.Err() != nil {
					log.Println("Shutdown requested")
					_ = client.Close()
					break
				}
				log.Printf("Stream error: %v", err)
			}
			if time.Since(start) > healthyRunThreshold {
				attempts = 0
			}
		}
		_ = client.Close()

		// Check if we should stop reconnecting
		if ctx.Err() != nil {
			break
		}
		attempts++
		if config.MaxReconnectAttempts > 0 && attempts >= config.MaxReconnectAttempts {
			log.Printf("Max reconnection attempts (%d) reached, exiting", config.MaxReconnectAttempts)
			break
		}
		delay := nextBackoff(config, attempts-1)
		log.Printf("Reconnecting in %v (attempt %d)...", delay, attempts)
		select {
		case <-time.After(delay):
		case <-ctx.Done():
			return
		}
	}
	log.Println("Client shut down")
}