Files
prime-sdk-python/prime_sdk/client.py
Andrew Miller 4a7d8b875a Initial commit: Python SDK for Dragonchain (Prime)
Synchronous Python SDK modeled on prime-sdk-go. Provides DC1-HMAC-SHA256
auth, dataclass models, and resource clients for system, transaction,
transaction-type, smart-contract, and block endpoints, plus a YAML
credentials loader.
2026-05-29 16:53:16 -04:00

168 lines
5.0 KiB
Python

"""Low-level HTTP client implementing Dragonchain's DC1-HMAC-SHA256 auth.

This is a direct port of the Go SDK's ``client/client.go``. The signing scheme
must match the prime-node authorizer byte-for-byte: the signed message is six
fields joined by ``\\n`` (no trailing newline) in this exact order::

    UPPER(method)
    path                    # full path incl. /api/v1/..., no query string
    publicId
    timestamp
    contentType
    base64(sha256(body))
"""
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Optional, Type, TypeVar
import requests
from .errors import DragonchainAPIError
# Type variable tying a request's ``response_cls`` argument to its return type.
T = TypeVar("T")
# Content type applied whenever the client JSON-encodes a request body itself.
CONTENT_TYPE_JSON = "application/json"
# Default per-request timeout used by ``Client`` when none is supplied.
DEFAULT_TIMEOUT = 30  # seconds
class Client:
    """Authenticated HTTP client for a single Dragonchain node.

    Every request is signed with the DC1-HMAC-SHA256 scheme and sent through
    one shared ``requests.Session``. Call :meth:`close`, or use the client as
    a context manager, to release the session when finished.
    """

    def __init__(
        self,
        public_id: str,
        auth_key_id: str,
        auth_key: str,
        base_url: str,
        timeout: int = DEFAULT_TIMEOUT,
    ):
        """Store credentials and create the HTTP session.

        Args:
            public_id: Chain identifier; signed into every request and sent
                as the ``Dragonchain`` header.
            auth_key_id: Key identifier placed in the ``Authorization`` header.
            auth_key: Secret used as the HMAC key.
            base_url: Node base URL; trailing slashes are removed so that
                ``base_url + path`` never contains a doubled slash.
            timeout: Per-request timeout in seconds.
        """
        self.public_id = public_id
        self.auth_key_id = auth_key_id
        self.auth_key = auth_key
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self._session = requests.Session()

    # -- lifecycle ----------------------------------------------------------

    def close(self) -> None:
        """Close the underlying HTTP session."""
        self._session.close()

    def __enter__(self) -> "Client":
        """Return the client itself so it can be used in a ``with`` block."""
        return self

    def __exit__(self, exc_type, exc, tb) -> None:
        """Close the session on leaving the ``with`` block."""
        self.close()

    # -- introspection helpers (mirror the Go accessors) --------------------

    def internal_id(self) -> str:
        """Return the public id (same value as :meth:`get_public_id`)."""
        return self.public_id

    def get_public_id(self) -> str:
        """Return the public id this client signs requests for."""
        return self.public_id

    def get_auth_key_id(self) -> str:
        """Return the auth key id sent in the ``Authorization`` header."""
        return self.auth_key_id

    def endpoint(self) -> str:
        """Return the base URL with trailing slashes removed."""
        return self.base_url

    # -- auth ---------------------------------------------------------------

    def _hmac_message(
        self, method: str, path: str, timestamp: str, content_type: str, body: bytes
    ) -> str:
        """Build the string that gets signed.

        The message is six fields joined by ``"\\n"`` with no trailing
        newline: upper-cased method, path, public id, timestamp, content
        type, and the base64-encoded SHA-256 digest of ``body``.
        """
        b64_content = base64.b64encode(hashlib.sha256(body).digest()).decode("ascii")
        return "\n".join(
            [
                method.upper(),
                path,
                self.public_id,
                timestamp,
                content_type,
                b64_content,
            ]
        )

    def _generate_auth_header(
        self, method: str, path: str, timestamp: str, content_type: str, body: bytes
    ) -> str:
        """Return the ``Authorization`` header value for one request.

        The value has the form ``DC1-HMAC-SHA256 <auth_key_id>:<signature>``
        where the signature is the base64-encoded HMAC-SHA256 of the signed
        message, keyed with the UTF-8 bytes of ``auth_key``.
        """
        msg = self._hmac_message(method, path, timestamp, content_type, body)
        digest = hmac.new(
            self.auth_key.encode("utf-8"), msg.encode("utf-8"), hashlib.sha256
        ).digest()
        b64_hmac = base64.b64encode(digest).decode("ascii")
        return f"DC1-HMAC-SHA256 {self.auth_key_id}:{b64_hmac}"

    # -- request plumbing ---------------------------------------------------

    def _do_request(
        self,
        method: str,
        path: str,
        content_type: str = "",
        body: Any = None,
        response_cls: Optional[Type[T]] = None,
    ) -> Optional[T]:
        """Sign and send one request, optionally parsing the JSON response.

        Args:
            method: HTTP verb.
            path: Request path appended to the base URL. It may carry a query
                string; the query is sent but is not part of the signature.
            content_type: Content type for a raw ``bytes`` body. Overridden
                with ``application/json`` when the body is JSON-encoded here.
            body: ``bytes``/``bytearray`` are sent verbatim; any other
                non-``None`` value is JSON-encoded; ``None`` sends no body.
            response_cls: Class exposing ``from_dict`` used to build the
                return value from the parsed JSON response.

        Returns:
            ``response_cls.from_dict(parsed)`` when ``response_cls`` is given
            and the response body is non-empty; otherwise ``None``.

        Raises:
            DragonchainAPIError: If the response status code is 400 or above.
        """
        if isinstance(body, (bytes, bytearray)):
            body_bytes = bytes(body)
        elif body is not None:
            body_bytes = json.dumps(_to_jsonable(body)).encode("utf-8")
            content_type = CONTENT_TYPE_JSON
        else:
            body_bytes = b""

        # The signed path must not include the query string (see the module
        # docstring), so sign only the part before "?" while still sending
        # the full path in the URL below.
        signed_path = path.split("?", 1)[0]

        timestamp = str(int(time.time()))
        auth_header = self._generate_auth_header(
            method, signed_path, timestamp, content_type, body_bytes
        )
        headers = {
            "Authorization": auth_header,
            "Dragonchain": self.public_id,
            "Timestamp": timestamp,
        }
        # Body-less requests sign an empty content type and send no header.
        if content_type:
            headers["Content-Type"] = content_type

        url = self.base_url + path
        resp = self._session.request(
            method, url, data=body_bytes, headers=headers, timeout=self.timeout
        )
        resp_text = resp.text
        if resp.status_code >= 400:
            raise DragonchainAPIError(resp.status_code, resp_text.strip())
        if response_cls is not None and resp_text:
            parsed = json.loads(resp_text)
            return response_cls.from_dict(parsed)  # type: ignore[attr-defined]
        return None

    # -- verb helpers -------------------------------------------------------

    def get(self, path: str, response_cls: Optional[Type[T]] = None) -> Optional[T]:
        """Send a signed GET request with no body."""
        return self._do_request("GET", path, response_cls=response_cls)

    def post(
        self,
        path: str,
        content_type: str,
        body: Any,
        response_cls: Optional[Type[T]] = None,
    ) -> Optional[T]:
        """Send a signed POST request carrying ``body``."""
        return self._do_request("POST", path, content_type, body, response_cls)

    def put(
        self,
        path: str,
        content_type: str,
        body: Any,
        response_cls: Optional[Type[T]] = None,
    ) -> Optional[T]:
        """Send a signed PUT request carrying ``body``."""
        return self._do_request("PUT", path, content_type, body, response_cls)

    def delete(self, path: str, response_cls: Optional[Type[T]] = None) -> Optional[T]:
        """Send a signed DELETE request with no body."""
        return self._do_request("DELETE", path, response_cls=response_cls)
def _to_jsonable(body: Any) -> Any:
"""Convert request models (which expose ``to_dict``) to JSON-ready data."""
if hasattr(body, "to_dict"):
return body.to_dict()
return body