import json
import requests
class HTIApiClient:
def __init__(self, server_url: str, api_code: str, timeout: int = 30):
self.server_url = str(server_url or "").rstrip("/")
self.api_code = str(api_code or "").strip()
self.timeout = timeout
self.session = requests.Session()
if not self.server_url:
raise ValueError("server_url is required")
if not self.api_code:
raise ValueError("api_code is required")
def _headers(self) -> dict:
return {
"X-API-Code": self.api_code,
"Accept": "application/json",
}
def _request(self, path: str, *, params: dict | None = None) -> dict:
response = self.session.get(
f"{self.server_url}{path}",
headers=self._headers(),
params=params,
timeout=self.timeout,
)
try:
payload = response.json()
except Exception:
payload = {"detail": response.text}
if not response.ok:
message = payload.get("detail") or payload.get("message") or response.text
raise RuntimeError(f"HTTP {response.status_code}: {message}")
return payload
def authenticate(self) -> dict:
return self._request("/client-api/hti/auth")
def get_history(self, limit: int = 10) -> dict:
return self._request("/client-api/hti/history", params={"limit": limit})
def stream_realtime(self, poll_seconds: float = 2.0):
response = self.session.get(
f"{self.server_url}/client-api/hti/stream",
params={"api_code": self.api_code, "poll_seconds": poll_seconds},
headers={"Accept": "text/event-stream"},
stream=True,
timeout=(10, None),
)
if not response.ok:
try:
payload = response.json()
except Exception:
payload = {"detail": response.text}
message = payload.get("detail") or payload.get("message") or response.text
raise RuntimeError(f"HTTP {response.status_code}: {message}")
event_name = "message"
data_lines = []
try:
for raw_line in response.iter_lines(chunk_size=1, decode_unicode=True):
if raw_line is None:
continue
line = raw_line.strip()
if not line:
if data_lines:
raw_payload = "\n".join(data_lines)
try:
payload = json.loads(raw_payload)
except Exception:
payload = raw_payload
yield {
"event": event_name,
"data": payload,
}
event_name = "message"
data_lines = []
continue
if line.startswith(":"):
continue
if line.startswith("event:"):
event_name = line.split(":", 1)[1].strip()
continue
if line.startswith("data:"):
data_lines.append(line.split(":", 1)[1].strip())
finally:
response.close()
def print_divider(char: str = "=") -> None:
    """Print a horizontal rule made of *char* repeated 72 times."""
    rule = char * 72
    print(rule)
def print_section(title: str) -> None:
    """Print a blank line, then *title* framed by two ``=`` dividers."""
    print()
    # Same rule above and below the title.
    for position in ("top", "bottom"):
        if position == "bottom":
            print(title)
        print_divider("=")
def format_filter_values(values) -> str:
    """Render a filter list as comma-separated text.

    An empty or missing filter is shown as ``"ALL"``.
    """
    return ", ".join(map(str, values)) if values else "ALL"
def print_profile(profile: dict) -> None:
    """Print the account details and filter settings found in *profile*.

    Missing scalar fields are shown as ``-``, a missing or empty
    ``valid_until`` as ``Unlimited``, and each filter list is rendered with
    ``format_filter_values``.
    """
    filters = profile.get("filters", {})

    leading_fields = (
        ("Account", "account_name"),
        ("API Code", "api_code"),
        ("Validity", "validity_type"),
    )
    for label, key in leading_fields:
        print(f"{label} : {profile.get(key, '-')}")

    print(f"Valid Until : {profile.get('valid_until') or 'Unlimited'}")
    print(f"Req / Minute : {profile.get('request_limit_per_minute', '-')}")

    filter_fields = (
        ("Asset Classes", "asset_classes"),
        ("Timeframes", "timeframes"),
        ("Instruments", "instruments"),
        ("Prefixes", "prefixes"),
        ("Sides", "sides"),
    )
    for label, key in filter_fields:
        print(f"{label} : {format_filter_values(filters.get(key))}")

    print(f"Authenticated At : {profile.get('authenticated_at', '-')}")
def print_idea(idea: dict, index: int | None = None) -> None:
    """Print one trade idea as a titled block of ``label : value`` lines.

    Args:
        idea: Mapping with upper-case field names; missing fields print
            as ``-``.
        index: Optional position shown as ``[index]`` before the instrument.
    """
    instrument = idea.get("INSTRUMENT", "-")
    if index is None:
        title = str(instrument)
    else:
        title = f"[{index}] {instrument}"

    print_divider("-")
    print(f"{title} | {idea.get('TIMEFRAME', '-')} | {idea.get('ASSET_CLASS', '-')}")
    print_divider("-")

    print(f"Signal : {idea.get('SIDE', '-')} {idea.get('OPEN_ORDER_TYPE', '-')}")
    pricing_fields = (
        ("Confidence", "CONFIDENCE"),
        ("Estimated P/L", "ESTIMATED_PL"),
        ("Price", "PRICE"),
        ("Take Profit", "TAKE_PROFIT"),
        ("Stop Loss", "STOP_LOSS"),
        ("Prefix", "PREFIX"),
    )
    for label, key in pricing_fields:
        print(f"{label} : {idea.get(key, '-')}")

    # The identifier is read from "idea_id" first, then "ID".
    print(f"Idea ID : {idea.get('idea_id', idea.get('ID', '-'))}")

    timing_fields = (
        ("Created At", "CREATION_TIME"),
        ("Last Update", "LAST_UPDATE"),
        ("Expiration Time", "EXPIRATION_TIME"),
    )
    for label, key in timing_fields:
        print(f"{label} : {idea.get(key, '-')}")

    description = str(idea.get("DESCRIPTION", "")).strip()
    if not description:
        return
    print("Description :")
    print(description)
def print_history(payload: dict) -> None:
    """Print the summary of a history payload followed by each idea in it."""
    ideas = payload.get("ideas", [])

    summary = (
        f"Returned Items : {payload.get('count', 0)}",
        f"Requested Limit : {payload.get('limit', '-')}",
        f"Served At : {payload.get('served_at', '-')}",
    )
    for summary_line in summary:
        print(summary_line)

    if ideas:
        # Ideas are numbered from 1 and separated by a blank line.
        for position, entry in enumerate(ideas, start=1):
            print()
            print_idea(entry, index=position)
    else:
        print("\nNo history items returned.")
def print_stream_guide() -> None:
    """Print a legend explaining the event names shown for the stream."""
    legend = (
        "READY = stream connected",
        "PING = stream is still alive",
        "IDEA = new matching trade idea received",
        "ERROR = server closed the stream",
    )
    for entry in legend:
        print(entry)
def print_stream_event(message: dict) -> None:
    """Print one stream event in a format chosen by its event name.

    Args:
        message: Mapping with an ``event`` name (compared case-insensitively,
            defaulting to ``message``) and a ``data`` payload.

    READY, PING and IDEA events are rendered field by field and therefore
    need a mapping payload. When the payload is anything else (for example
    a raw string, or no data at all) it is printed verbatim on a single
    ``[EVENT] payload`` line instead of raising. ERROR and unknown events
    print the payload as-is under a banner.
    """
    event_name = str(message.get("event", "message")).upper()
    data = message.get("data")

    # The structured branches below call data.get(...) / print_idea(data),
    # which only work on a mapping.
    if event_name in ("READY", "PING", "IDEA") and not isinstance(data, dict):
        print(f"[{event_name}] {data}")
        return

    if event_name == "READY":
        print()
        print_divider("=")
        print("STREAM READY")
        print_divider("=")
        print(f"Connected At : {data.get('connected_at', '-')}")
        print(f"Poll Seconds : {data.get('poll_seconds', '-')}")
        profile = data.get("profile", {})
        if profile:
            print(f"Account : {profile.get('account_name', '-')}")
            print(f"API Code : {profile.get('api_code', '-')}")
        return

    if event_name == "PING":
        print(f"[PING] {data.get('ts', '-')}")
        return

    if event_name == "IDEA":
        print()
        print_divider("=")
        print("NEW IDEA RECEIVED")
        print_divider("=")
        print_idea(data)
        return

    # ERROR gets its own banner; every other event name is shown generically.
    banner = "STREAM ERROR" if event_name == "ERROR" else f"EVENT: {event_name}"
    print()
    print_divider("=")
    print(banner)
    print_divider("=")
    print(data)
if __name__ == "__main__":
    # Placeholder connection settings; these look like values to be replaced
    # with a real server URL and API code before running.
    SERVER_URL = "http://your-hti-environment:3500"
    CUSTOMER_API_CODE = "YOUR_API_CODE"

    client = HTIApiClient(server_url=SERVER_URL, api_code=CUSTOMER_API_CODE)

    # 1. Authenticate and show the account profile.
    print_section("AUTH")
    print_profile(client.authenticate().get("profile", {}))

    # 2. Show the ten most recent history items.
    print_section("HISTORY")
    print_history(client.get_history(limit=10))

    # 3. Follow the realtime stream until interrupted with Ctrl+C.
    print_section("REALTIME")
    print_stream_guide()
    try:
        for event in client.stream_realtime(poll_seconds=2.0):
            print_stream_event(event)
    except KeyboardInterrupt:
        print("\nClient stopped.")