Files
prime-sdk-python/prime_sdk/client.py
Andrew Miller 5410b283e2 client: allow injecting a requests.Session + stop following redirects
The HTTP Session was hardcoded with no injection point and followed
redirects by default, so a server-side caller pointing the client at an
attacker-influenced base_url (a tenant's prime_endpoint) had no way to
attach an SSRF policy, and a public endpoint could 302-redirect the
request to an internal address (e.g. the cloud metadata service).

- Client/DragonchainSDK now accept an optional `session` so callers can
  inject a Session whose transport adapter refuses internal IPs. Default
  stays unguarded for trusted/CLI use — the guard belongs in the server.
- Requests are sent with allow_redirects=False; Prime never legitimately
  redirects, and a 3xx now surfaces to the caller instead of being followed.
2026-06-04 12:41:12 -04:00

184 lines
5.9 KiB
Python

"""Low-level HTTP client implementing Dragonchain's DC1-HMAC-SHA256 auth.
This is a direct port of the Go SDK's ``client/client.go``. The signing scheme
must match the prime-node authorizer byte-for-byte: the signed message is six
fields joined by ``\\n`` (no trailing newline) in this exact order::
UPPER(method)
path # full path incl. /api/v1/..., no query string
publicId
timestamp
contentType
base64(sha256(body))
"""
import base64
import hashlib
import hmac
import json
import time
from typing import Any, Optional, Type, TypeVar
import requests
from .errors import DragonchainAPIError
T = TypeVar("T")
CONTENT_TYPE_JSON = "application/json"
DEFAULT_TIMEOUT = 30 # seconds
class Client:
"""Authenticated HTTP client for a single Dragonchain node."""
def __init__(
self,
public_id: str,
auth_key_id: str,
auth_key: str,
base_url: str,
timeout: int = DEFAULT_TIMEOUT,
session: Optional[requests.Session] = None,
):
self.public_id = public_id
self.auth_key_id = auth_key_id
self.auth_key = auth_key
self.base_url = base_url.rstrip("/")
self.timeout = timeout
# A caller may inject a pre-configured Session — e.g. one with a
# transport adapter that refuses to connect to internal IPs — when the
# base_url is attacker-influenced (a tenant's prime_endpoint). The
# default Session is unguarded, which is fine for trusted/CLI use; the
# SSRF policy belongs in the server that points this client at
# untrusted endpoints, not baked into the client library.
self._session = session if session is not None else requests.Session()
# -- introspection helpers (mirror the Go accessors) --------------------
def internal_id(self) -> str:
return self.public_id
def get_public_id(self) -> str:
return self.public_id
def get_auth_key_id(self) -> str:
return self.auth_key_id
def endpoint(self) -> str:
return self.base_url
# -- auth ---------------------------------------------------------------
def _hmac_message(
self, method: str, path: str, timestamp: str, content_type: str, body: bytes
) -> str:
b64_content = base64.b64encode(hashlib.sha256(body).digest()).decode("ascii")
return "\n".join(
[
method.upper(),
path,
self.public_id,
timestamp,
content_type,
b64_content,
]
)
def _generate_auth_header(
self, method: str, path: str, timestamp: str, content_type: str, body: bytes
) -> str:
msg = self._hmac_message(method, path, timestamp, content_type, body)
digest = hmac.new(
self.auth_key.encode("utf-8"), msg.encode("utf-8"), hashlib.sha256
).digest()
b64_hmac = base64.b64encode(digest).decode("ascii")
return f"DC1-HMAC-SHA256 {self.auth_key_id}:{b64_hmac}"
# -- request plumbing ---------------------------------------------------
def _do_request(
self,
method: str,
path: str,
content_type: str = "",
body: Any = None,
response_cls: Optional[Type[T]] = None,
) -> Optional[T]:
if isinstance(body, (bytes, bytearray)):
body_bytes = bytes(body)
elif body is not None:
body_bytes = json.dumps(_to_jsonable(body)).encode("utf-8")
content_type = CONTENT_TYPE_JSON
else:
body_bytes = b""
timestamp = str(int(time.time()))
auth_header = self._generate_auth_header(
method, path, timestamp, content_type, body_bytes
)
headers = {
"Authorization": auth_header,
"Dragonchain": self.public_id,
"Timestamp": timestamp,
}
if content_type:
headers["Content-Type"] = content_type
url = self.base_url + path
# allow_redirects=False: Prime is a JSON API that never legitimately
# 3xx-redirects, and following redirects is an SSRF vector (a public
# endpoint that 302s to an internal address). A redirect surfaces as a
# 3xx the caller can inspect rather than being silently followed.
resp = self._session.request(
method,
url,
data=body_bytes,
headers=headers,
timeout=self.timeout,
allow_redirects=False,
)
resp_text = resp.text
if resp.status_code >= 400:
raise DragonchainAPIError(resp.status_code, resp_text.strip())
if response_cls is not None and resp_text:
parsed = json.loads(resp_text)
return response_cls.from_dict(parsed) # type: ignore[attr-defined]
return None
# -- verb helpers -------------------------------------------------------
def get(self, path: str, response_cls: Optional[Type[T]] = None) -> Optional[T]:
return self._do_request("GET", path, response_cls=response_cls)
def post(
self,
path: str,
content_type: str,
body: Any,
response_cls: Optional[Type[T]] = None,
) -> Optional[T]:
return self._do_request("POST", path, content_type, body, response_cls)
def put(
self,
path: str,
content_type: str,
body: Any,
response_cls: Optional[Type[T]] = None,
) -> Optional[T]:
return self._do_request("PUT", path, content_type, body, response_cls)
def delete(self, path: str, response_cls: Optional[Type[T]] = None) -> Optional[T]:
return self._do_request("DELETE", path, response_cls=response_cls)
def _to_jsonable(body: Any) -> Any:
"""Convert request models (which expose ``to_dict``) to JSON-ready data."""
if hasattr(body, "to_dict"):
return body.to_dict()
return body