312 lines
7.8 KiB
Go
Executable File
312 lines
7.8 KiB
Go
Executable File
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"flag"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
pb "github.com/your-org/smart-contract/proto"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/grpc/metadata"
|
|
"gopkg.in/yaml.v3"
|
|
)
|
|
|
|
// =============================================================================
|
|
// Configuration and Client Infrastructure
|
|
// Do not modify this file unless you need to customize the client behavior.
|
|
// Implement your smart contract logic in process.go instead.
|
|
// =============================================================================
|
|
|
|
// Config holds the client configuration loaded from YAML
|
|
type Config struct {
	// ServerAddress is the host:port of the gRPC server (required).
	ServerAddress string `yaml:"server_address"`
	// SmartContractID identifies this contract to the server; sent as the
	// x-smart-contract-id metadata header on the stream (required).
	SmartContractID string `yaml:"smart_contract_id"`
	// APIKey authenticates the client; sent as the x-api-key metadata
	// header on the stream (required).
	APIKey string `yaml:"api_key"`
	// UseTLS enables transport security; when false the client connects
	// with insecure credentials.
	UseTLS bool `yaml:"use_tls"`
	// TLSCertPath is the CA certificate file used to verify the server
	// when UseTLS is true.
	TLSCertPath string `yaml:"tls_cert_path"`
	// NumWorkers is the number of concurrent request-processing
	// goroutines (defaults to 10 in LoadConfig).
	NumWorkers int `yaml:"num_workers"`
	// ReconnectDelaySecs is the wait between reconnection attempts
	// (defaults to 5 in LoadConfig).
	ReconnectDelaySecs int `yaml:"reconnect_delay_seconds"`
	// MaxReconnectAttempts caps consecutive failed reconnects; zero or
	// negative means retry forever (see main).
	MaxReconnectAttempts int `yaml:"max_reconnect_attempts"`
}
|
|
|
|
// Client manages the gRPC connection and request processing
|
|
type Client struct {
	// config is the loaded YAML configuration (see LoadConfig).
	config *Config
	// conn is the underlying gRPC connection; nil until Connect succeeds.
	conn *grpc.ClientConn
	// grpcClient is the generated service stub bound to conn.
	grpcClient pb.SmartContractServiceClient
	// workChan carries incoming requests from Run's receive loop to the
	// worker goroutines; buffered to NumWorkers*2 in NewClient.
	workChan chan *pb.SmartContractRequest
	// wg tracks worker goroutines so Run can wait for them to drain.
	wg sync.WaitGroup
	// logger writes timestamped client logs to stdout.
	logger *log.Logger
}
|
|
|
|
// NewClient creates a new smart contract client
|
|
func NewClient(config *Config) *Client {
|
|
return &Client{
|
|
config: config,
|
|
workChan: make(chan *pb.SmartContractRequest, config.NumWorkers*2),
|
|
logger: log.New(os.Stdout, "[SC-Client] ", log.LstdFlags|log.Lmicroseconds),
|
|
}
|
|
}
|
|
|
|
// Connect establishes a connection to the gRPC server
|
|
func (c *Client) Connect() error {
|
|
var opts []grpc.DialOption
|
|
|
|
if c.config.UseTLS {
|
|
creds, err := credentials.NewClientTLSFromFile(c.config.TLSCertPath, "")
|
|
if err != nil {
|
|
return fmt.Errorf("failed to load TLS credentials: %w", err)
|
|
}
|
|
opts = append(opts, grpc.WithTransportCredentials(creds))
|
|
} else {
|
|
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
}
|
|
|
|
conn, err := grpc.NewClient(c.config.ServerAddress, opts...)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to connect to server: %w", err)
|
|
}
|
|
|
|
c.conn = conn
|
|
c.grpcClient = pb.NewSmartContractServiceClient(conn)
|
|
c.logger.Printf("Connected to server at %s", c.config.ServerAddress)
|
|
return nil
|
|
}
|
|
|
|
// Close closes the gRPC connection
|
|
func (c *Client) Close() error {
|
|
if c.conn != nil {
|
|
return c.conn.Close()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Run starts the client and processes incoming requests
|
|
func (c *Client) Run(ctx context.Context) error {
|
|
// Create metadata with authentication headers
|
|
md := metadata.Pairs(
|
|
"x-api-key", c.config.APIKey,
|
|
"x-smart-contract-id", c.config.SmartContractID,
|
|
)
|
|
ctx = metadata.NewOutgoingContext(ctx, md)
|
|
|
|
// Establish the bi-directional stream
|
|
stream, err := c.grpcClient.Run(ctx)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to establish stream: %w", err)
|
|
}
|
|
|
|
c.logger.Printf("Stream established, starting %d workers", c.config.NumWorkers)
|
|
|
|
// Channel to collect responses from workers
|
|
responseChan := make(chan *pb.SmartContractResponse, c.config.NumWorkers*2)
|
|
errChan := make(chan error, 1)
|
|
|
|
// Start worker goroutines
|
|
for i := 0; i < c.config.NumWorkers; i++ {
|
|
c.wg.Add(1)
|
|
go c.worker(ctx, responseChan)
|
|
}
|
|
|
|
// Goroutine to send responses back to server
|
|
go func() {
|
|
for resp := range responseChan {
|
|
if err := stream.Send(resp); err != nil {
|
|
c.logger.Printf("Error sending response: %v", err)
|
|
select {
|
|
case errChan <- err:
|
|
default:
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Main loop: receive requests and dispatch to workers
|
|
for {
|
|
req, err := stream.Recv()
|
|
if err == io.EOF {
|
|
c.logger.Println("Server closed the stream")
|
|
break
|
|
}
|
|
if err != nil {
|
|
return fmt.Errorf("error receiving request: %w", err)
|
|
}
|
|
|
|
c.logger.Printf("Received request: transaction_id=%s", req.TransactionId)
|
|
|
|
select {
|
|
case c.workChan <- req:
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// Cleanup
|
|
close(c.workChan)
|
|
c.wg.Wait()
|
|
close(responseChan)
|
|
|
|
return nil
|
|
}
|
|
|
|
// worker processes requests from the work channel
|
|
func (c *Client) worker(ctx context.Context, responseChan chan<- *pb.SmartContractResponse) {
|
|
defer c.wg.Done()
|
|
|
|
for {
|
|
select {
|
|
case req, ok := <-c.workChan:
|
|
if !ok {
|
|
return
|
|
}
|
|
c.processRequest(ctx, req, responseChan)
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// processRequest handles a single request
|
|
func (c *Client) processRequest(ctx context.Context, req *pb.SmartContractRequest, responseChan chan<- *pb.SmartContractResponse) {
|
|
// Capture logs (in production, you might want a more sophisticated logging approach)
|
|
var logs string
|
|
|
|
// Call the user-defined Process function
|
|
result := Process(ctx, req.TransactionJson, req.EnvVars, req.Secrets)
|
|
|
|
// Build the response
|
|
resp := &pb.SmartContractResponse{
|
|
TransactionId: req.TransactionId,
|
|
OutputToChain: result.OutputToChain,
|
|
Logs: logs,
|
|
}
|
|
|
|
if result.Error != nil {
|
|
resp.Error = result.Error.Error()
|
|
c.logger.Printf("Error processing transaction %s: %v", req.TransactionId, result.Error)
|
|
} else {
|
|
// Marshal the result data to JSON
|
|
resultJSON, err := json.Marshal(result.Data)
|
|
if err != nil {
|
|
resp.Error = fmt.Sprintf("failed to marshal result: %v", err)
|
|
c.logger.Printf("Error marshaling result for transaction %s: %v", req.TransactionId, err)
|
|
} else {
|
|
resp.ResultJson = string(resultJSON)
|
|
c.logger.Printf("Successfully processed transaction %s", req.TransactionId)
|
|
}
|
|
}
|
|
|
|
select {
|
|
case responseChan <- resp:
|
|
case <-ctx.Done():
|
|
}
|
|
}
|
|
|
|
// LoadConfig loads configuration from a YAML file
|
|
func LoadConfig(path string) (*Config, error) {
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to read config file: %w", err)
|
|
}
|
|
|
|
config := &Config{
|
|
NumWorkers: 10,
|
|
ReconnectDelaySecs: 5,
|
|
}
|
|
|
|
if err := yaml.Unmarshal(data, config); err != nil {
|
|
return nil, fmt.Errorf("failed to parse config file: %w", err)
|
|
}
|
|
|
|
// Validate required fields
|
|
if config.ServerAddress == "" {
|
|
return nil, fmt.Errorf("server_address is required")
|
|
}
|
|
if config.SmartContractID == "" {
|
|
return nil, fmt.Errorf("smart_contract_id is required")
|
|
}
|
|
if config.APIKey == "" {
|
|
return nil, fmt.Errorf("api_key is required")
|
|
}
|
|
|
|
return config, nil
|
|
}
|
|
|
|
func main() {
|
|
configPath := flag.String("config", "config.yaml", "Path to configuration file")
|
|
flag.Parse()
|
|
|
|
// Load configuration
|
|
config, err := LoadConfig(*configPath)
|
|
if err != nil {
|
|
log.Fatalf("Failed to load config: %v", err)
|
|
}
|
|
|
|
// Create client
|
|
client := NewClient(config)
|
|
|
|
// Setup signal handling for graceful shutdown
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
sigChan := make(chan os.Signal, 1)
|
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
|
go func() {
|
|
sig := <-sigChan
|
|
log.Printf("Received signal %v, shutting down...", sig)
|
|
cancel()
|
|
}()
|
|
|
|
// Connection loop with reconnection logic
|
|
attempts := 0
|
|
for {
|
|
if err := client.Connect(); err != nil {
|
|
log.Printf("Connection failed: %v", err)
|
|
} else {
|
|
attempts = 0
|
|
if err := client.Run(ctx); err != nil {
|
|
if ctx.Err() != nil {
|
|
log.Println("Shutdown requested")
|
|
break
|
|
}
|
|
log.Printf("Stream error: %v", err)
|
|
}
|
|
}
|
|
|
|
_ = client.Close()
|
|
|
|
// Check if we should stop reconnecting
|
|
if ctx.Err() != nil {
|
|
break
|
|
}
|
|
|
|
attempts++
|
|
if config.MaxReconnectAttempts > 0 && attempts >= config.MaxReconnectAttempts {
|
|
log.Printf("Max reconnection attempts (%d) reached, exiting", config.MaxReconnectAttempts)
|
|
break
|
|
}
|
|
|
|
delay := time.Duration(config.ReconnectDelaySecs) * time.Second
|
|
log.Printf("Reconnecting in %v (attempt %d)...", delay, attempts)
|
|
|
|
select {
|
|
case <-time.After(delay):
|
|
case <-ctx.Done():
|
|
break
|
|
}
|
|
}
|
|
|
|
log.Println("Client shut down")
|
|
}
|