Skip to main content

Overview

This example mirrors the Python HTI client pipeline used in production-style integrations. It performs three steps:
  1. Authenticate the API code
  2. Fetch the latest history snapshot
  3. Keep a real-time stream open for new ideas

Full Example

import json

import requests


class HTIApiClient:
    def __init__(self, server_url: str, api_code: str, timeout: int = 30):
        self.server_url = str(server_url or "").rstrip("/")
        self.api_code = str(api_code or "").strip()
        self.timeout = timeout
        self.session = requests.Session()

        if not self.server_url:
            raise ValueError("server_url is required")
        if not self.api_code:
            raise ValueError("api_code is required")

    def _headers(self) -> dict:
        return {
            "X-API-Code": self.api_code,
            "Accept": "application/json",
        }

    def _request(self, path: str, *, params: dict | None = None) -> dict:
        response = self.session.get(
            f"{self.server_url}{path}",
            headers=self._headers(),
            params=params,
            timeout=self.timeout,
        )

        try:
            payload = response.json()
        except Exception:
            payload = {"detail": response.text}

        if not response.ok:
            message = payload.get("detail") or payload.get("message") or response.text
            raise RuntimeError(f"HTTP {response.status_code}: {message}")

        return payload

    def authenticate(self) -> dict:
        return self._request("/client-api/hti/auth")

    def get_history(self, limit: int = 10) -> dict:
        return self._request("/client-api/hti/history", params={"limit": limit})

    def stream_realtime(self, poll_seconds: float = 2.0):
        response = self.session.get(
            f"{self.server_url}/client-api/hti/stream",
            params={"api_code": self.api_code, "poll_seconds": poll_seconds},
            headers={"Accept": "text/event-stream"},
            stream=True,
            timeout=(10, None),
        )

        if not response.ok:
            try:
                payload = response.json()
            except Exception:
                payload = {"detail": response.text}

            message = payload.get("detail") or payload.get("message") or response.text
            raise RuntimeError(f"HTTP {response.status_code}: {message}")

        event_name = "message"
        data_lines = []

        try:
            for raw_line in response.iter_lines(chunk_size=1, decode_unicode=True):
                if raw_line is None:
                    continue

                line = raw_line.strip()
                if not line:
                    if data_lines:
                        raw_payload = "\n".join(data_lines)
                        try:
                            payload = json.loads(raw_payload)
                        except Exception:
                            payload = raw_payload

                        yield {
                            "event": event_name,
                            "data": payload,
                        }
                        event_name = "message"
                        data_lines = []
                    continue

                if line.startswith(":"):
                    continue

                if line.startswith("event:"):
                    event_name = line.split(":", 1)[1].strip()
                    continue

                if line.startswith("data:"):
                    data_lines.append(line.split(":", 1)[1].strip())
        finally:
            response.close()


def print_divider(char: str = "=") -> None:
    print(char * 72)


def print_section(title: str) -> None:
    print()
    print_divider("=")
    print(title)
    print_divider("=")


def format_filter_values(values) -> str:
    if not values:
        return "ALL"
    return ", ".join(str(value) for value in values)


def print_profile(profile: dict) -> None:
    filters = profile.get("filters", {})
    print(f"Account           : {profile.get('account_name', '-')}")
    print(f"API Code          : {profile.get('api_code', '-')}")
    print(f"Validity          : {profile.get('validity_type', '-')}")
    print(f"Valid Until       : {profile.get('valid_until') or 'Unlimited'}")
    print(f"Req / Minute      : {profile.get('request_limit_per_minute', '-')}")
    print(f"Asset Classes     : {format_filter_values(filters.get('asset_classes'))}")
    print(f"Timeframes        : {format_filter_values(filters.get('timeframes'))}")
    print(f"Instruments       : {format_filter_values(filters.get('instruments'))}")
    print(f"Prefixes          : {format_filter_values(filters.get('prefixes'))}")
    print(f"Sides             : {format_filter_values(filters.get('sides'))}")
    print(f"Authenticated At  : {profile.get('authenticated_at', '-')}")


def print_idea(idea: dict, index: int | None = None) -> None:
    title = f"[{index}] {idea.get('INSTRUMENT', '-')}" if index is not None else str(idea.get("INSTRUMENT", "-"))
    print_divider("-")
    print(f"{title} | {idea.get('TIMEFRAME', '-')} | {idea.get('ASSET_CLASS', '-')}")
    print_divider("-")
    print(f"Signal            : {idea.get('SIDE', '-')} {idea.get('OPEN_ORDER_TYPE', '-')}")
    print(f"Confidence        : {idea.get('CONFIDENCE', '-')}")
    print(f"Estimated P/L     : {idea.get('ESTIMATED_PL', '-')}")
    print(f"Price             : {idea.get('PRICE', '-')}")
    print(f"Take Profit       : {idea.get('TAKE_PROFIT', '-')}")
    print(f"Stop Loss         : {idea.get('STOP_LOSS', '-')}")
    print(f"Prefix            : {idea.get('PREFIX', '-')}")
    print(f"Idea ID           : {idea.get('idea_id', idea.get('ID', '-'))}")
    print(f"Created At        : {idea.get('CREATION_TIME', '-')}")
    print(f"Last Update       : {idea.get('LAST_UPDATE', '-')}")
    print(f"Expiration Time   : {idea.get('EXPIRATION_TIME', '-')}")
    description = str(idea.get("DESCRIPTION", "")).strip()
    if description:
        print("Description       :")
        print(description)


def print_history(payload: dict) -> None:
    ideas = payload.get("ideas", [])
    print(f"Returned Items    : {payload.get('count', 0)}")
    print(f"Requested Limit   : {payload.get('limit', '-')}")
    print(f"Served At         : {payload.get('served_at', '-')}")

    if not ideas:
        print("\nNo history items returned.")
        return

    for index, idea in enumerate(ideas, start=1):
        print()
        print_idea(idea, index=index)


def print_stream_guide() -> None:
    print("READY  = stream connected")
    print("PING   = stream is still alive")
    print("IDEA   = new matching trade idea received")
    print("ERROR  = server closed the stream")


def print_stream_event(message: dict) -> None:
    event_name = str(message.get("event", "message")).upper()
    data = message.get("data")

    if event_name == "READY":
        print()
        print_divider("=")
        print("STREAM READY")
        print_divider("=")
        print(f"Connected At      : {data.get('connected_at', '-')}")
        print(f"Poll Seconds      : {data.get('poll_seconds', '-')}")
        profile = data.get("profile", {})
        if profile:
            print(f"Account           : {profile.get('account_name', '-')}")
            print(f"API Code          : {profile.get('api_code', '-')}")
        return

    if event_name == "PING":
        print(f"[PING] {data.get('ts', '-')}")
        return

    if event_name == "IDEA":
        print()
        print_divider("=")
        print("NEW IDEA RECEIVED")
        print_divider("=")
        print_idea(data)
        return

    if event_name == "ERROR":
        print()
        print_divider("=")
        print("STREAM ERROR")
        print_divider("=")
        print(data)
        return

    print()
    print_divider("=")
    print(f"EVENT: {event_name}")
    print_divider("=")
    print(data)


if __name__ == "__main__":
    SERVER_URL = "http://your-hti-environment:3500"
    CUSTOMER_API_CODE = "YOUR_API_CODE"

    client = HTIApiClient(server_url=SERVER_URL, api_code=CUSTOMER_API_CODE)

    print_section("AUTH")
    auth_payload = client.authenticate()
    profile = auth_payload.get("profile", {})
    print_profile(profile)

    print_section("HISTORY")
    history_payload = client.get_history(limit=10)
    print_history(history_payload)

    print_section("REALTIME")
    print_stream_guide()

    try:
        for message in client.stream_realtime(poll_seconds=2.0):
            print_stream_event(message)
    except KeyboardInterrupt:
        print("\nClient stopped.")

Notes

  • Keep the SERVER_URL exactly as provided for your environment
  • Replace only CUSTOMER_API_CODE
  • Run the script as a long-lived process if you want to keep receiving real-time ideas
  • Use the history block as the initial snapshot and the stream block for new arrivals