Initial commit: smart contract templates for bash, go, python, and typescript

This commit is contained in:
2026-03-17 19:59:47 -04:00
commit 0634e66469
35 changed files with 3794 additions and 0 deletions

40
python/Makefile Executable file
View File

@@ -0,0 +1,40 @@
.PHONY: proto run clean setup venv test
# Generate Python code from proto files
proto:
python -m grpc_tools.protoc \
-I./proto \
--python_out=. \
--grpc_python_out=. \
proto/remote_sc.proto
# Create virtual environment and install dependencies
setup: venv
. venv/bin/activate && pip install -r requirements.txt
# Create virtual environment
venv:
python3 -m venv venv
# Run the smart contract
run:
python main.py --config config.yaml
# Clean generated files
clean:
rm -f *_pb2.py *_pb2_grpc.py
rm -rf __pycache__
rm -rf venv
# Run tests
test:
python -m pytest tests/ -v
# Install dependencies (without venv)
deps:
pip install -r requirements.txt
# Format code
format:
black .
isort .

205
python/README.md Executable file
View File

@@ -0,0 +1,205 @@
# Python Smart Contract Template
A Python-based smart contract client for Dragonchain Prime that connects via gRPC.
## Prerequisites
- Python 3.10 or later
- pip
## Quick Start
1. **Copy this template** to create your smart contract:
```bash
cp -r python /path/to/my-smart-contract
cd /path/to/my-smart-contract
```
2. **Set up the environment**:
```bash
make setup
source venv/bin/activate
```
Or without make:
```bash
python3 -m venv venv
source venv/bin/activate
pip install -r requirements.txt
```
3. **Generate the protobuf code**:
```bash
make proto
```
4. **Configure your connection** by editing `config.yaml`:
```yaml
server_address: "your-dragonchain-server:50051"
smart_contract_id: "your-smart-contract-id"
api_key: "your-api-key"
```
5. **Implement your smart contract logic** in `process.py` by modifying the `process` function.
6. **Run**:
```bash
python main.py --config config.yaml
```
## Configuration
| Field | Description | Default |
|-------|-------------|---------|
| `server_address` | gRPC server address | Required |
| `smart_contract_id` | Your smart contract ID | Required |
| `api_key` | API key for authentication | Required |
| `use_tls` | Enable TLS encryption | `false` |
| `tls_cert_path` | Path to TLS certificate | - |
| `num_workers` | Concurrent transaction processors | `10` |
| `reconnect_delay_seconds` | Delay between reconnection attempts | `5` |
| `max_reconnect_attempts` | Max reconnect attempts (0 = infinite) | `0` |
## Implementing Your Smart Contract
Edit the `process` function in `process.py`:
```python
def process(
tx_json: str,
env_vars: dict[str, str],
secrets: dict[str, str],
) -> ProcessResult:
# Parse the transaction
tx = Transaction.from_json(tx_json)
# Access transaction data
txn_id = tx.header.txn_id
txn_type = tx.header.txn_type
payload = tx.payload
# Access environment variables
sc_name = env_vars.get("SMART_CONTRACT_NAME")
dc_id = env_vars.get("DRAGONCHAIN_ID")
# Access secrets
my_secret = secrets.get("SC_SECRET_MY_SECRET")
# Implement your logic here
result = {
"status": "success",
"data": "your result data",
}
return ProcessResult(
data=result,
output_to_chain=True, # Set to True to persist result on chain
error=None,
)
```
### Transaction Structure
The `Transaction` class in `process.py` matches the Dragonchain transaction format:
```python
@dataclass
class Transaction:
version: str
header: TransactionHeader
payload: dict[str, Any]
@dataclass
class TransactionHeader:
tag: str
dc_id: str
txn_id: str
block_id: str
txn_type: str
timestamp: str
invoker: str
```
### Available Environment Variables
| Variable | Description |
|----------|-------------|
| `TZ` | Timezone |
| `ENVIRONMENT` | Deployment environment |
| `INTERNAL_ID` | Internal identifier |
| `DRAGONCHAIN_ID` | Dragonchain ID |
| `DRAGONCHAIN_ENDPOINT` | Dragonchain API endpoint |
| `SMART_CONTRACT_ID` | This smart contract's ID |
| `SMART_CONTRACT_NAME` | This smart contract's name |
| `SC_ENV_*` | Custom environment variables |
### Secrets
Secrets are provided in the `secrets` dict with keys prefixed by `SC_SECRET_`.
## Project Structure
```
.
├── main.py # Client infrastructure (do not modify)
├── process.py # Your smart contract logic (modify this)
├── proto/
│ └── remote_sc.proto # gRPC service definition
├── config.yaml # Configuration file
├── requirements.txt # Python dependencies
├── Makefile # Build commands
└── README.md # This file
```
### File Descriptions
- **`process.py`** - Contains the `process` function where you implement your smart contract logic. This is the only file you need to modify for most use cases.
- **`main.py`** - Contains the gRPC client infrastructure, connection handling, and worker pool. You typically don't need to modify this file.
## Make Commands
```bash
make setup # Create venv and install dependencies
make proto # Generate Python code from proto files
make run # Run with default config
make test # Run tests
make clean # Remove generated files and venv
make deps # Install dependencies (no venv)
make format # Format code with black and isort
```
## Concurrent Processing
The client uses a thread pool to process multiple transactions concurrently. The number of workers is configurable via `num_workers` in the config file.
## Error Handling
- Return errors from the `process` function via `ProcessResult.error` to report failures
- The client automatically handles reconnection on connection failures
- Logs are captured and sent back with the response
## Docker
Example `Dockerfile`:
```dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
RUN python -m grpc_tools.protoc \
-I./proto \
--python_out=. \
--grpc_python_out=. \
proto/remote_sc.proto
CMD ["python", "main.py", "--config", "config.yaml"]
```
## License
[Your License Here]

24
python/config.yaml Executable file
View File

@@ -0,0 +1,24 @@
# Smart Contract Client Configuration
# Copy this file and fill in your values
# The gRPC server address to connect to
server_address: "localhost:50051"
# Your smart contract ID (provided by Dragonchain)
smart_contract_id: "your-smart-contract-id"
# API key for authentication (provided by Dragonchain)
api_key: "your-api-key"
# Whether to use TLS for the connection
use_tls: false
# Path to TLS certificate (required if use_tls is true)
# tls_cert_path: "/path/to/cert.pem"
# Number of worker threads for processing transactions concurrently
num_workers: 10
# Reconnect settings
reconnect_delay_seconds: 5
max_reconnect_attempts: 0 # 0 = infinite retries

297
python/main.py Executable file
View File

@@ -0,0 +1,297 @@
#!/usr/bin/env python3
"""
Dragonchain Smart Contract Client
A gRPC client that connects to Dragonchain Prime server to process
smart contract transactions.
Do not modify this file unless you need to customize the client behavior.
Implement your smart contract logic in process.py instead.
"""
import argparse
import json
import logging
import queue
import signal
import sys
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Optional
import grpc
import yaml
import remote_sc_pb2 as pb
import remote_sc_pb2_grpc as pb_grpc
from process import ProcessResult, process
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("SmartContract")
# =============================================================================
# Configuration and Client Infrastructure
# Do not modify this file unless you need to customize the client behavior.
# Implement your smart contract logic in process.py instead.
# =============================================================================
@dataclass
class Config:
"""Client configuration loaded from YAML."""
server_address: str
smart_contract_id: str
api_key: str
use_tls: bool = False
tls_cert_path: Optional[str] = None
num_workers: int = 10
reconnect_delay_seconds: int = 5
max_reconnect_attempts: int = 0 # 0 = infinite
class SmartContractClient:
"""gRPC client for smart contract execution."""
def __init__(self, config: Config):
self.config = config
self.channel: Optional[grpc.Channel] = None
self.stub: Optional[pb_grpc.SmartContractServiceStub] = None
self.running = False
self.work_queue: queue.Queue = queue.Queue()
self.response_queue: queue.Queue = queue.Queue()
self.executor: Optional[ThreadPoolExecutor] = None
def connect(self) -> bool:
"""Establish connection to the gRPC server."""
try:
if self.config.use_tls:
if not self.config.tls_cert_path:
logger.error("TLS enabled but no certificate path provided")
return False
with open(self.config.tls_cert_path, "rb") as f:
creds = grpc.ssl_channel_credentials(f.read())
self.channel = grpc.secure_channel(self.config.server_address, creds)
else:
self.channel = grpc.insecure_channel(self.config.server_address)
self.stub = pb_grpc.SmartContractServiceStub(self.channel)
logger.info(f"Connected to server at {self.config.server_address}")
return True
except Exception as e:
logger.error(f"Failed to connect: {e}")
return False
def close(self):
"""Close the gRPC connection."""
if self.channel:
self.channel.close()
self.channel = None
self.stub = None
def _response_generator(self):
"""Generator that yields responses from the response queue."""
while self.running:
try:
response = self.response_queue.get(timeout=1.0)
if response is None:
break
yield response
except queue.Empty:
continue
def _process_request(self, request: pb.SmartContractRequest):
"""Process a single request and queue the response."""
logs = ""
try:
result = process(
tx_json=request.transaction_json,
env_vars=dict(request.env_vars),
secrets=dict(request.secrets),
)
response = pb.SmartContractResponse(
transaction_id=request.transaction_id,
output_to_chain=result.output_to_chain,
logs=logs,
)
if result.error:
response.error = result.error
logger.error(
f"Error processing transaction {request.transaction_id}: {result.error}"
)
else:
response.result_json = json.dumps(result.data) if result.data else "{}"
logger.info(f"Successfully processed transaction {request.transaction_id}")
except Exception as e:
response = pb.SmartContractResponse(
transaction_id=request.transaction_id,
error=str(e),
logs=logs,
)
logger.exception(f"Exception processing transaction {request.transaction_id}")
self.response_queue.put(response)
def _worker(self):
"""Worker thread that processes requests from the queue."""
while self.running:
try:
request = self.work_queue.get(timeout=1.0)
if request is None:
break
self._process_request(request)
except queue.Empty:
continue
def run(self) -> bool:
"""Run the client and process incoming requests."""
if not self.stub:
logger.error("Not connected to server")
return False
self.running = True
self.executor = ThreadPoolExecutor(max_workers=self.config.num_workers)
# Start worker threads
workers = []
for _ in range(self.config.num_workers):
future = self.executor.submit(self._worker)
workers.append(future)
logger.info(f"Started {self.config.num_workers} worker threads")
# Create metadata for authentication
metadata = [
("x-api-key", self.config.api_key),
("x-smart-contract-id", self.config.smart_contract_id),
]
try:
# Establish bi-directional stream
stream = self.stub.Run(self._response_generator(), metadata=metadata)
logger.info("Stream established, waiting for requests...")
# Receive and dispatch requests
for request in stream:
if not self.running:
break
logger.info(f"Received request: transaction_id={request.transaction_id}")
self.work_queue.put(request)
logger.info("Server closed the stream")
return True
except grpc.RpcError as e:
logger.error(f"gRPC error: {e.code()} - {e.details()}")
return False
except Exception as e:
logger.exception(f"Error in run loop: {e}")
return False
finally:
self.running = False
# Signal workers to stop
for _ in range(self.config.num_workers):
self.work_queue.put(None)
self.response_queue.put(None)
# Wait for workers to finish
if self.executor:
self.executor.shutdown(wait=True)
def stop(self):
"""Stop the client gracefully."""
logger.info("Stopping client...")
self.running = False
def load_config(path: str) -> Config:
"""Load configuration from a YAML file."""
with open(path, "r") as f:
data = yaml.safe_load(f)
# Validate required fields
required = ["server_address", "smart_contract_id", "api_key"]
for field in required:
if field not in data or not data[field]:
raise ValueError(f"Missing required config field: {field}")
return Config(
server_address=data["server_address"],
smart_contract_id=data["smart_contract_id"],
api_key=data["api_key"],
use_tls=data.get("use_tls", False),
tls_cert_path=data.get("tls_cert_path"),
num_workers=data.get("num_workers", 10),
reconnect_delay_seconds=data.get("reconnect_delay_seconds", 5),
max_reconnect_attempts=data.get("max_reconnect_attempts", 0),
)
def main():
parser = argparse.ArgumentParser(description="Dragonchain Smart Contract Client")
parser.add_argument(
"--config",
"-c",
default="config.yaml",
help="Path to configuration file",
)
args = parser.parse_args()
# Load configuration
try:
config = load_config(args.config)
except Exception as e:
logger.error(f"Failed to load config: {e}")
sys.exit(1)
# Create client
client = SmartContractClient(config)
# Setup signal handling for graceful shutdown
def signal_handler(signum, frame):
logger.info(f"Received signal {signum}, shutting down...")
client.stop()
signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)
# Connection loop with reconnection logic
attempts = 0
while True:
if client.connect():
attempts = 0
if not client.run():
if not client.running:
logger.info("Shutdown requested")
break
client.close()
attempts += 1
if config.max_reconnect_attempts > 0 and attempts >= config.max_reconnect_attempts:
logger.error(f"Max reconnection attempts ({config.max_reconnect_attempts}) reached")
break
delay = config.reconnect_delay_seconds
logger.info(f"Reconnecting in {delay} seconds (attempt {attempts})...")
time.sleep(delay)
logger.info("Client shut down")
if __name__ == "__main__":
main()

6
python/package-lock.json generated Executable file
View File

@@ -0,0 +1,6 @@
{
"name": "python",
"lockfileVersion": 3,
"requires": true,
"packages": {}
}

140
python/process.py Executable file
View File

@@ -0,0 +1,140 @@
"""
Smart Contract Processing Logic
This file contains the transaction processing logic for your smart contract.
Modify the `process` function to implement your business logic.
"""
import json
from dataclasses import dataclass
from typing import Any, Optional
# =============================================================================
# SMART CONTRACT IMPLEMENTATION - MODIFY THIS FILE
# =============================================================================
@dataclass
class TransactionHeader:
"""Transaction metadata from Dragonchain."""
tag: str
dc_id: str
txn_id: str
block_id: str
txn_type: str
timestamp: str
invoker: str
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "TransactionHeader":
"""Parse header from dictionary."""
return cls(
tag=data.get("tag", ""),
dc_id=data.get("dc_id", ""),
txn_id=data.get("txn_id", ""),
block_id=data.get("block_id", ""),
txn_type=data.get("txn_type", ""),
timestamp=data.get("timestamp", ""),
invoker=data.get("invoker", ""),
)
@dataclass
class Transaction:
"""
Parsed transaction from the server.
Customize this class to match your transaction payload structure.
"""
version: str
header: TransactionHeader
payload: dict[str, Any]
@classmethod
def from_json(cls, json_str: str) -> "Transaction":
"""Parse a transaction from JSON string."""
data = json.loads(json_str)
return cls(
version=data.get("version", ""),
header=TransactionHeader.from_dict(data.get("header", {})),
payload=data.get("payload", {}),
)
@dataclass
class ProcessResult:
"""Result from the process function."""
data: Optional[dict[str, Any]] = None
output_to_chain: bool = True
error: Optional[str] = None
def process(
tx_json: str,
env_vars: dict[str, str],
secrets: dict[str, str],
) -> ProcessResult:
"""
Process an incoming transaction.
Implement your smart contract logic here.
Args:
tx_json: Raw transaction JSON string
env_vars: Environment variables passed from the server
secrets: Secrets passed from the server (e.g., API keys, credentials)
Returns:
ProcessResult containing the result data, whether to output to chain, and any error
"""
try:
# Parse the transaction JSON
tx = Transaction.from_json(tx_json)
except json.JSONDecodeError as e:
return ProcessResult(error=f"Failed to parse transaction: {e}")
# ==========================================================================
# TODO: Implement your smart contract logic here
# ==========================================================================
#
# Example: Access transaction data
# txn_id = tx.header.txn_id
# txn_type = tx.header.txn_type
# payload = tx.payload
#
# Example: Access environment variables
# sc_name = env_vars.get("SMART_CONTRACT_NAME")
# dc_id = env_vars.get("DRAGONCHAIN_ID")
#
# Example: Access secrets
# api_key = secrets.get("SC_SECRET_MY_API_KEY")
#
# Example: Process based on payload action
# action = tx.payload.get("action")
# if action == "create":
# # Handle create operation
# pass
# elif action == "update":
# # Handle update operation
# pass
# else:
# return ProcessResult(error=f"Unknown action: {action}")
# Default implementation: echo back the transaction
result = {
"status": "processed",
"transaction_id": tx.header.txn_id,
"txn_type": tx.header.txn_type,
"payload": tx.payload,
"message": "Transaction processed successfully",
}
return ProcessResult(
data=result,
output_to_chain=True,
error=None,
)

48
python/proto/remote_sc.proto Executable file
View File

@@ -0,0 +1,48 @@
syntax = "proto3";
package remote_sc;
// SmartContractService defines the bi-directional streaming service for remote
// smart contract execution. External workers connect to this service to receive
// execution tasks and send back results.
service SmartContractService {
// Run establishes a bi-directional stream. The server sends SmartContractRequest
// messages to the client for execution, and the client sends back
// SmartContractResponse messages with results.
rpc Run(stream SmartContractResponse) returns (stream SmartContractRequest);
}
// SmartContractRequest is sent from the server to the connected worker
// to request execution of a smart contract.
message SmartContractRequest {
// Unique identifier for this execution request, used to correlate responses
string transaction_id = 1;
// Full transaction JSON to be processed by the smart contract
string transaction_json = 2;
// Environment variables to set for the smart contract execution
map<string, string> env_vars = 3;
// Secrets to be made available to the smart contract
map<string, string> secrets = 4;
}
// SmartContractResponse is sent from the worker back to the server
// with the results of smart contract execution.
message SmartContractResponse {
// The transaction_id from the original request, for correlation
string transaction_id = 1;
// The result data from the smart contract execution as JSON
string result_json = 2;
// Logs captured during smart contract execution
string logs = 3;
// Whether to persist the output to the chain
bool output_to_chain = 4;
// Error message if execution failed
string error = 5;
}

4
python/requirements.txt Executable file
View File

@@ -0,0 +1,4 @@
grpcio>=1.60.0
grpcio-tools>=1.60.0
protobuf>=4.25.0
pyyaml>=6.0