diff --git a/go/README.md b/go/README.md
index 6fb7fde..cec3dee 100755
--- a/go/README.md
+++ b/go/README.md
@@ -43,6 +43,7 @@ make tools
 4. **Configure your connection** by editing `config.yaml`:
    ```yaml
    server_address: "your-dragonchain-server:50051"
+   chain_id: "your-chain-public-id"
    smart_contract_id: "your-smart-contract-id"
    api_key: "your-api-key"
    ```
@@ -60,13 +61,24 @@ make tools
 | Field | Description | Default |
 |-------|-------------|---------|
 | `server_address` | gRPC server address | Required |
+| `chain_id` | Public chain id the SC is registered on (sent as `x-chain-id` metadata) | Required |
 | `smart_contract_id` | Your smart contract ID | Required |
 | `api_key` | API key for authentication | Required |
 | `use_tls` | Enable TLS encryption | `false` |
 | `tls_cert_path` | Path to TLS certificate | - |
 | `num_workers` | Concurrent transaction processors | `10` |
-| `reconnect_delay_seconds` | Delay between reconnection attempts | `5` |
-| `max_reconnect_attempts` | Max reconnect attempts (0 = infinite) | `0` |
+| `reconnect_delay_seconds` | Base delay for exponential backoff between reconnect attempts | `3` |
+| `max_backoff_seconds` | Ceiling for the exponential backoff | `120` |
+| `max_reconnect_attempts` | Max reconnect attempts (0 = infinite, recommended) | `0` |
+
+## Durability guarantees (provided by `main.go`, no work for you)
+
+- **Server restart, update, crash, or network blip** → the client auto-reconnects and resumes processing. Transactions observed while the stream was down stay queued on the Dragonchain Prime side and are delivered (oldest first) on reconnect.
+- **Client restart or long outage** → when this process comes back up (minutes, hours, months later), it rejoins the stream and prime re-delivers every still-pending transaction that should have invoked it.
+- **Half-open TCP** (silent peer, resumed laptop, corporate NAT dropping idle flows) is detected within ~13 seconds via gRPC keepalive and triggers a reconnect. No dangling ghost streams.
+- **Reconnect storms** are avoided: exponential backoff with jitter means many clients reconnecting after a server restart don't all slam `accept()` at the same instant. The timer resets after a stream has been healthy for 60 seconds.
+
+These are invariants of the template — you do not add any of this in `process.go`.
 
 ## Implementing Your Smart Contract
diff --git a/go/config.yaml b/go/config.yaml
index fa0a0fe..cd44909 100755
--- a/go/config.yaml
+++ b/go/config.yaml
@@ -4,6 +4,11 @@
 # The gRPC server address to connect to
 server_address: "localhost:50051"
 
+# The public chain id on which this smart contract is registered.
+# Sent as the x-chain-id gRPC metadata header — prime rejects streams
+# without it.
+chain_id: "your-chain-public-id"
+
 # Your smart contract ID (provided by Dragonchain)
 smart_contract_id: "your-smart-contract-id"
 
@@ -19,6 +24,12 @@ use_tls: false
 # Number of worker goroutines for processing transactions concurrently
 num_workers: 10
 
-# Reconnect settings
-reconnect_delay_seconds: 5
+# Reconnect settings. The client uses exponential backoff with jitter:
+#   effective delay = min(max_backoff_seconds, reconnect_delay_seconds * 2^attempts) + random(0, reconnect_delay_seconds)
+# Keep max_reconnect_attempts at 0 (infinite) unless you have a specific
+# reason to stop — the client is designed to survive arbitrarily long
+# outages and resume processing from the prime-side queue when the
+# server returns.
+reconnect_delay_seconds: 3
+max_backoff_seconds: 120
 max_reconnect_attempts: 0  # 0 = infinite retries
diff --git a/go/main.go b/go/main.go
index e722016..4acaa3b 100755
--- a/go/main.go
+++ b/go/main.go
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"io"
 	"log"
+	"math/rand"
 	"os"
 	"os/signal"
 	"sync"
@@ -17,6 +18,7 @@ import (
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/credentials"
 	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/keepalive"
 	"google.golang.org/grpc/metadata"
 	"gopkg.in/yaml.v3"
 )
@@ -25,18 +27,41 @@ import (
 // Configuration and Client Infrastructure
 // Do not modify this file unless you need to customize the client behavior.
 // Implement your smart contract logic in process.go instead.
+//
+// Durability contract (provided by this file, no work for the user):
+//   - If the Dragonchain Prime server restarts, updates, or momentarily
+//     drops the network, this client auto-reconnects. Transactions
+//     observed during the outage are queued by prime and delivered once
+//     the stream is re-established.
+//   - If this client restarts (crash, deploy, long sleep), it rejoins
+//     the stream and prime re-delivers every still-pending transaction
+//     that should have invoked it, oldest first.
+//   - Half-open TCP (a silent peer that never sent FIN) is detected
+//     within ~13 s via gRPC keepalive pings. No dangling ghost streams.
 // =============================================================================
 
 // Config holds the client configuration loaded from YAML
 type Config struct {
-	ServerAddress        string `yaml:"server_address"`
-	SmartContractID      string `yaml:"smart_contract_id"`
-	APIKey               string `yaml:"api_key"`
-	UseTLS               bool   `yaml:"use_tls"`
-	TLSCertPath          string `yaml:"tls_cert_path"`
-	NumWorkers           int    `yaml:"num_workers"`
-	ReconnectDelaySecs   int    `yaml:"reconnect_delay_seconds"`
-	MaxReconnectAttempts int    `yaml:"max_reconnect_attempts"`
+	ServerAddress   string `yaml:"server_address"`
+	ChainID         string `yaml:"chain_id"`
+	SmartContractID string `yaml:"smart_contract_id"`
+	APIKey          string `yaml:"api_key"`
+	UseTLS          bool   `yaml:"use_tls"`
+	TLSCertPath     string `yaml:"tls_cert_path"`
+	NumWorkers      int    `yaml:"num_workers"`
+
+	// ReconnectDelaySecs is the BASE backoff between reconnect attempts.
+	// The effective delay is `base * 2^attempts + jitter` capped at
+	// MaxBackoffSeconds — so repeated failures back off, but a clean
+	// server restart is picked up within a few seconds.
+	ReconnectDelaySecs int `yaml:"reconnect_delay_seconds"`
+
+	// MaxBackoffSeconds caps the exponential backoff. Default 120.
+	MaxBackoffSeconds int `yaml:"max_backoff_seconds"`
+
+	// MaxReconnectAttempts: 0 = infinite (default and recommended — the
+	// whole point of this client is to stay available indefinitely).
+	MaxReconnectAttempts int `yaml:"max_reconnect_attempts"`
 }
 
 // Client manages the gRPC connection and request processing
@@ -72,6 +97,18 @@ func (c *Client) Connect() error {
 		opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
 	}
 
+	// Keepalive is the load-bearing piece for detecting a half-open
+	// connection. Without it, a silent peer (prime restarted without
+	// sending FIN; laptop resumed from sleep; corporate NAT dropped the
+	// flow) leaves us in a "connected" state until the OS-level TCP
+	// keepalive eventually fires — which on Linux defaults to ~2 hours.
+	// 10 s ping + 3 s timeout catches all of that within ~13 s.
+	opts = append(opts, grpc.WithKeepaliveParams(keepalive.ClientParameters{
+		Time:                10 * time.Second,
+		Timeout:             3 * time.Second,
+		PermitWithoutStream: true,
+	}))
+
 	conn, err := grpc.NewClient(c.config.ServerAddress, opts...)
 	if err != nil {
 		return fmt.Errorf("failed to connect to server: %w", err)
 	}
@@ -91,17 +128,31 @@ func (c *Client) Close() error {
 	return nil
 }
 
-// Run starts the client and processes incoming requests
+// Run starts the client and processes incoming requests. It returns
+// when the stream terminates for any reason (server close, network
+// error, ctx cancellation). The outer reconnect loop in main() calls
+// Run again after a backoff.
 func (c *Client) Run(ctx context.Context) error {
-	// Create metadata with authentication headers
+	// Wrap ctx with our own cancel so the sender goroutine can tear down
+	// the stream on Send errors — otherwise stream.Recv() in the main
+	// loop could block forever waiting for a peer that is never coming
+	// back. Any cancel from here propagates to both directions of the
+	// bidi stream.
+	streamCtx, streamCancel := context.WithCancel(ctx)
+	defer streamCancel()
+
+	// Auth + routing metadata. x-chain-id is required by the server; a
+	// missing header yields "missing chain ID" from prime and no
+	// transactions will arrive.
 	md := metadata.Pairs(
 		"x-api-key", c.config.APIKey,
 		"x-smart-contract-id", c.config.SmartContractID,
+		"x-chain-id", c.config.ChainID,
 	)
-	ctx = metadata.NewOutgoingContext(ctx, md)
+	streamCtx = metadata.NewOutgoingContext(streamCtx, md)
 
 	// Establish the bi-directional stream
-	stream, err := c.grpcClient.Run(ctx)
+	stream, err := c.grpcClient.Run(streamCtx)
 	if err != nil {
 		return fmt.Errorf("failed to establish stream: %w", err)
 	}
@@ -110,15 +161,17 @@
 	// Channel to collect responses from workers
 	responseChan := make(chan *pb.SmartContractResponse, c.config.NumWorkers*2)
-	errChan := make(chan error, 1)
+	errChan := make(chan error, 2)
 
 	// Start worker goroutines
 	for i := 0; i < c.config.NumWorkers; i++ {
 		c.wg.Add(1)
-		go c.worker(ctx, responseChan)
+		go c.worker(streamCtx, responseChan)
 	}
 
-	// Goroutine to send responses back to server
+	// Sender: forwards worker responses back to the server. Any Send
+	// error immediately cancels streamCtx so the Recv loop below exits
+	// instead of blocking forever.
 	go func() {
 		for resp := range responseChan {
 			if err := stream.Send(resp); err != nil {
@@ -127,12 +180,17 @@
 				case errChan <- err:
 				default:
 				}
+				streamCancel()
 				return
 			}
 		}
 	}()
 
-	// Main loop: receive requests and dispatch to workers
+	// Main loop: receive requests and dispatch to workers. stream.Recv
+	// returns when the peer closes the stream, when streamCtx is cancelled
+	// (e.g. because the sender goroutine hit an error), or on a real
+	// transport error.
+	var recvErr error
 	for {
 		req, err := stream.Recv()
 		if err == io.EOF {
@@ -140,24 +198,40 @@
 			break
 		}
 		if err != nil {
-			return fmt.Errorf("error receiving request: %w", err)
+			recvErr = err
+			break
 		}
 
 		c.logger.Printf("Received request: transaction_id=%s", req.TransactionId)
 
 		select {
		case c.workChan <- req:
-		case <-ctx.Done():
-			return ctx.Err()
+		case <-streamCtx.Done():
+			recvErr = streamCtx.Err()
+			goto cleanup
 		}
 	}
 
-	// Cleanup
+cleanup:
+	// Tear down in-flight workers. Cancelling streamCtx was already done
+	// via defer; close(workChan) lets the worker goroutines exit their
+	// range loop cleanly.
 	close(c.workChan)
 	c.wg.Wait()
 	close(responseChan)
+	c.workChan = make(chan *pb.SmartContractRequest, c.config.NumWorkers*2)
 
-	return nil
+	if recvErr != nil {
+		return fmt.Errorf("error receiving request: %w", recvErr)
+	}
+	// Surface any earlier Send error the sender goroutine parked on
+	// errChan so the reconnect loop sees it.
+	select {
+	case err := <-errChan:
+		return fmt.Errorf("stream send error: %w", err)
+	default:
+		return nil
+	}
 }
 
 // worker processes requests from the work channel
@@ -222,7 +296,8 @@ func LoadConfig(path string) (*Config, error) {
 
 	config := &Config{
 		NumWorkers:         10,
-		ReconnectDelaySecs: 5,
+		ReconnectDelaySecs: 3,
+		MaxBackoffSeconds:  120,
 	}
 
 	if err := yaml.Unmarshal(data, config); err != nil {
@@ -233,6 +308,9 @@
 	if config.ServerAddress == "" {
 		return nil, fmt.Errorf("server_address is required")
 	}
+	if config.ChainID == "" {
+		return nil, fmt.Errorf("chain_id is required")
+	}
 	if config.SmartContractID == "" {
 		return nil, fmt.Errorf("smart_contract_id is required")
 	}
@@ -243,6 +321,39 @@ func LoadConfig(path string) (*Config, error) {
 	return config, nil
 }
 
+// nextBackoff returns the duration to sleep before the next reconnect.
+// Computed as base * 2^attempts with a random jitter in [0, base) and
+// capped at MaxBackoffSeconds. Jitter matters when many clients
+// reconnect simultaneously after a server restart — it desynchronises
+// them so they don't all slam accept() at the same instant.
+func nextBackoff(cfg *Config, attempts int) time.Duration {
+	base := time.Duration(cfg.ReconnectDelaySecs) * time.Second
+	if base <= 0 {
+		base = 3 * time.Second
+	}
+	maxBackoff := time.Duration(cfg.MaxBackoffSeconds) * time.Second
+	if maxBackoff <= 0 {
+		maxBackoff = 120 * time.Second
+	}
+
+	// Cap the exponent so we don't overflow; base * 2^10 is already
+	// clipped by maxBackoff anyway, but this keeps the math bounded.
+	shift := attempts
+	if shift > 10 {
+		shift = 10
+	}
+	delay := base << shift
+	if delay > maxBackoff {
+		delay = maxBackoff
+	}
+
+	// Jitter range == base, independent of attempts. Adding it ensures
+	// we don't schedule a thundering herd on the next attempt even if
+	// every client started with the same `attempts` count.
+	jitter := time.Duration(rand.Int63n(int64(base)))
+	return delay + jitter
+}
+
 func main() {
 	configPath := flag.String("config", "config.yaml", "Path to configuration file")
 	flag.Parse()
@@ -268,20 +379,29 @@
 		cancel()
 	}()
 
-	// Connection loop with reconnection logic
+	// Connection loop with reconnection logic. A "successful" session
+	// is defined as one where Run() was active for at least 60 s —
+	// anything longer is almost certainly real work, so attempts is
+	// reset and the next failure starts the backoff schedule fresh.
 	attempts := 0
+	const healthyRunThreshold = 60 * time.Second
+
 	for {
 		if err := client.Connect(); err != nil {
 			log.Printf("Connection failed: %v", err)
 		} else {
-			attempts = 0
+			start := time.Now()
 			if err := client.Run(ctx); err != nil {
 				if ctx.Err() != nil {
 					log.Println("Shutdown requested")
+					_ = client.Close()
 					break
 				}
 				log.Printf("Stream error: %v", err)
 			}
+			if time.Since(start) > healthyRunThreshold {
+				attempts = 0
+			}
 		}
 
 		_ = client.Close()
@@ -297,13 +417,13 @@
 			break
 		}
 
-		delay := time.Duration(config.ReconnectDelaySecs) * time.Second
+		delay := nextBackoff(config, attempts-1)
 		log.Printf("Reconnecting in %v (attempt %d)...", delay, attempts)
 
 		select {
 		case <-time.After(delay):
 		case <-ctx.Done():
-			break
+			return
 		}
 	}
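
As a sanity check on the reconnect schedule this patch introduces, the standalone sketch below mirrors the `nextBackoff` arithmetic with the shipped defaults (`reconnect_delay_seconds: 3`, `max_backoff_seconds: 120`). It reimplements the math inline rather than importing the client, so treat it as illustrative only; the `backoff` helper and the `maxDelay` name are local to the sketch, not part of the patched code.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// backoff mirrors nextBackoff from main.go:
// min(maxDelay, base * 2^attempts) plus a jitter drawn from [0, base).
func backoff(base, maxDelay time.Duration, attempts int) time.Duration {
	shift := attempts
	if shift > 10 {
		shift = 10 // keep the left-shift bounded, as in main.go
	}
	delay := base << shift
	if delay > maxDelay {
		delay = maxDelay
	}
	return delay + time.Duration(rand.Int63n(int64(base)))
}

func main() {
	base, maxDelay := 3*time.Second, 120*time.Second // shipped defaults
	for n := 0; n < 8; n++ {
		fmt.Printf("attempt %d: %v\n", n+1, backoff(base, maxDelay, n))
	}
	// Roughly: 3-6s, 6-9s, 12-15s, 24-27s, 48-51s, 96-99s, then pinned
	// at the 120-123s ceiling until a 60s-healthy stream resets attempts.
}
```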