The HTTP Session was hardcoded with no injection point and followed redirects by default, so a server-side caller pointing the client at an attacker-influenced base_url (a tenant's prime_endpoint) had no way to attach an SSRF policy, and a public endpoint could 302-redirect the request to an internal address (e.g. the cloud metadata service). - Client/DragonchainSDK now accept an optional `session` so callers can inject a Session whose transport adapter refuses internal IPs. Default stays unguarded for trusted/CLI use — the guard belongs in the server. - Requests are sent with allow_redirects=False; Prime never legitimately redirects, and a 3xx now surfaces to the caller instead of being followed.
184 lines
5.9 KiB
Python
184 lines
5.9 KiB
Python
"""Low-level HTTP client implementing Dragonchain's DC1-HMAC-SHA256 auth.
|
|
|
|
This is a direct port of the Go SDK's ``client/client.go``. The signing scheme
|
|
must match the prime-node authorizer byte-for-byte: the signed message is six
|
|
fields joined by ``\\n`` (no trailing newline) in this exact order::
|
|
|
|
UPPER(method)
|
|
path # full path incl. /api/v1/..., no query string
|
|
publicId
|
|
timestamp
|
|
contentType
|
|
base64(sha256(body))
|
|
"""
|
|
|
|
import base64
|
|
import hashlib
|
|
import hmac
|
|
import json
|
|
import time
|
|
from typing import Any, Optional, Type, TypeVar
|
|
|
|
import requests
|
|
|
|
from .errors import DragonchainAPIError
|
|
|
|
T = TypeVar("T")
|
|
|
|
CONTENT_TYPE_JSON = "application/json"
|
|
DEFAULT_TIMEOUT = 30 # seconds
|
|
|
|
|
|
class Client:
|
|
"""Authenticated HTTP client for a single Dragonchain node."""
|
|
|
|
def __init__(
|
|
self,
|
|
public_id: str,
|
|
auth_key_id: str,
|
|
auth_key: str,
|
|
base_url: str,
|
|
timeout: int = DEFAULT_TIMEOUT,
|
|
session: Optional[requests.Session] = None,
|
|
):
|
|
self.public_id = public_id
|
|
self.auth_key_id = auth_key_id
|
|
self.auth_key = auth_key
|
|
self.base_url = base_url.rstrip("/")
|
|
self.timeout = timeout
|
|
# A caller may inject a pre-configured Session — e.g. one with a
|
|
# transport adapter that refuses to connect to internal IPs — when the
|
|
# base_url is attacker-influenced (a tenant's prime_endpoint). The
|
|
# default Session is unguarded, which is fine for trusted/CLI use; the
|
|
# SSRF policy belongs in the server that points this client at
|
|
# untrusted endpoints, not baked into the client library.
|
|
self._session = session if session is not None else requests.Session()
|
|
|
|
# -- introspection helpers (mirror the Go accessors) --------------------
|
|
|
|
def internal_id(self) -> str:
|
|
return self.public_id
|
|
|
|
def get_public_id(self) -> str:
|
|
return self.public_id
|
|
|
|
def get_auth_key_id(self) -> str:
|
|
return self.auth_key_id
|
|
|
|
def endpoint(self) -> str:
|
|
return self.base_url
|
|
|
|
# -- auth ---------------------------------------------------------------
|
|
|
|
def _hmac_message(
|
|
self, method: str, path: str, timestamp: str, content_type: str, body: bytes
|
|
) -> str:
|
|
b64_content = base64.b64encode(hashlib.sha256(body).digest()).decode("ascii")
|
|
return "\n".join(
|
|
[
|
|
method.upper(),
|
|
path,
|
|
self.public_id,
|
|
timestamp,
|
|
content_type,
|
|
b64_content,
|
|
]
|
|
)
|
|
|
|
def _generate_auth_header(
|
|
self, method: str, path: str, timestamp: str, content_type: str, body: bytes
|
|
) -> str:
|
|
msg = self._hmac_message(method, path, timestamp, content_type, body)
|
|
digest = hmac.new(
|
|
self.auth_key.encode("utf-8"), msg.encode("utf-8"), hashlib.sha256
|
|
).digest()
|
|
b64_hmac = base64.b64encode(digest).decode("ascii")
|
|
return f"DC1-HMAC-SHA256 {self.auth_key_id}:{b64_hmac}"
|
|
|
|
# -- request plumbing ---------------------------------------------------
|
|
|
|
def _do_request(
|
|
self,
|
|
method: str,
|
|
path: str,
|
|
content_type: str = "",
|
|
body: Any = None,
|
|
response_cls: Optional[Type[T]] = None,
|
|
) -> Optional[T]:
|
|
if isinstance(body, (bytes, bytearray)):
|
|
body_bytes = bytes(body)
|
|
elif body is not None:
|
|
body_bytes = json.dumps(_to_jsonable(body)).encode("utf-8")
|
|
content_type = CONTENT_TYPE_JSON
|
|
else:
|
|
body_bytes = b""
|
|
|
|
timestamp = str(int(time.time()))
|
|
auth_header = self._generate_auth_header(
|
|
method, path, timestamp, content_type, body_bytes
|
|
)
|
|
|
|
headers = {
|
|
"Authorization": auth_header,
|
|
"Dragonchain": self.public_id,
|
|
"Timestamp": timestamp,
|
|
}
|
|
if content_type:
|
|
headers["Content-Type"] = content_type
|
|
|
|
url = self.base_url + path
|
|
# allow_redirects=False: Prime is a JSON API that never legitimately
|
|
# 3xx-redirects, and following redirects is an SSRF vector (a public
|
|
# endpoint that 302s to an internal address). A redirect surfaces as a
|
|
# 3xx the caller can inspect rather than being silently followed.
|
|
resp = self._session.request(
|
|
method,
|
|
url,
|
|
data=body_bytes,
|
|
headers=headers,
|
|
timeout=self.timeout,
|
|
allow_redirects=False,
|
|
)
|
|
|
|
resp_text = resp.text
|
|
if resp.status_code >= 400:
|
|
raise DragonchainAPIError(resp.status_code, resp_text.strip())
|
|
|
|
if response_cls is not None and resp_text:
|
|
parsed = json.loads(resp_text)
|
|
return response_cls.from_dict(parsed) # type: ignore[attr-defined]
|
|
return None
|
|
|
|
# -- verb helpers -------------------------------------------------------
|
|
|
|
def get(self, path: str, response_cls: Optional[Type[T]] = None) -> Optional[T]:
|
|
return self._do_request("GET", path, response_cls=response_cls)
|
|
|
|
def post(
|
|
self,
|
|
path: str,
|
|
content_type: str,
|
|
body: Any,
|
|
response_cls: Optional[Type[T]] = None,
|
|
) -> Optional[T]:
|
|
return self._do_request("POST", path, content_type, body, response_cls)
|
|
|
|
def put(
|
|
self,
|
|
path: str,
|
|
content_type: str,
|
|
body: Any,
|
|
response_cls: Optional[Type[T]] = None,
|
|
) -> Optional[T]:
|
|
return self._do_request("PUT", path, content_type, body, response_cls)
|
|
|
|
def delete(self, path: str, response_cls: Optional[Type[T]] = None) -> Optional[T]:
|
|
return self._do_request("DELETE", path, response_cls=response_cls)
|
|
|
|
|
|
def _to_jsonable(body: Any) -> Any:
|
|
"""Convert request models (which expose ``to_dict``) to JSON-ready data."""
|
|
if hasattr(body, "to_dict"):
|
|
return body.to_dict()
|
|
return body
|