港股实时行情API接入全流程
港股行情接口支持两种访问方式:
-
HTTP 接口:
适合拉取历史数据、K线数据或新股上市清单等非实时场景。 -
WebSocket 实时接口:
用于订阅实时推送的行情数据,包括逐笔成交、盘口和K线数据。该方式延迟极低,更适合高频交易或量化研究使用。
HTTP 接口示例
HTTP 接口通常用于按需拉取数据,例如历史K线、新股列表等。下面示例演示如何使用HTTP方式查询指定股票的K线。
请求示例
import requests# 申请API KEY: www.infoway.io
api_url = 'https://data.infoway.io/stock/batch_kline/1/10/002594.SZ%2C00285.HK%2CTSLA.US'# 设置请求头
headers = {'User-Agent': 'Mozilla/5.0','Accept': 'application/json','apiKey': 'yourApikey'
}# 发送GET请求
response = requests.get(api_url, headers=headers)# 输出结果
print(f"HTTP code: {response.status_code}")
print(f"message: {response.text}")
请求地址:
data.infoway.io/stock/batch_kline/{klineType}/{klineNum}/{codes}
| 参数名 | 类型 | 示例 | 说明 |
|---|---|---|---|
| klineType | string | 1 | K线周期,支持分钟、日、周、月、年,详细请看官方文档。 |
| klineNum | string | 10 | 返回的K线数量,一次支持最多500根K线 |
| codes | string | 01810.HK | 股票代码,同时支持A股、港股、美股 |
响应示例
{"s": "00005.HK", //产品代码"respList": [{"t": "1752825540", //秒时间戳(UTC+8)"h": "98.250", //最高价"o": "98.200", //开盘价"l": "98.150", //最低价"c": "98.150", //收盘价"v": "44000", //成交量"vw": "4320240.000", //成交额"pc": "-0.05%", //涨跌幅"pca": "-0.050" //涨跌额}]
}
WebSocket 实时行情接口
如果你的应用需要实时推送的数据,例如自动交易系统或高频策略,建议使用 WebSocket 实时接口。WebSocket 能够持续接收服务器推送的最新行情更新,延迟通常在毫秒级。
1. 建立连接
WS订阅地址如下:
股票产品订阅地址:
wss://data.infoway.io/ws?business=stock&apikey=YourAPIKey数字币产品订阅地址:
wss://data.infoway.io/ws?business=crypto&apikey=YourAPIKey外汇、期货等产品订阅地址:
wss://data.infoway.io/ws?business=common&apikey=YourAPIKey
2. 订阅示例:小米集团(1810.HK)
下面的示例展示如何通过 WebSocket 一次性订阅小米集团的 K线、逐笔成交、盘口数据。
示例代码包含了心跳机制和自动重连逻辑,确保连接的稳定性。
import json
import time
import schedule
import threading
import websocket
from loguru import loggerclass WebsocketExample:def __init__(self):self.session = Noneself.ws_url = "wss://data.infoway.io/ws?business=stock&apikey=yourApikey" # 申请API KEY: www.infoway.ioself.reconnecting = Falseself.is_ws_connected = False # 添加连接状态标志def connect_all(self):"""建立WebSocket连接并启动自动重连机制"""try:self.connect(self.ws_url)self.start_reconnection(self.ws_url)except Exception as e:logger.error(f"Failed to connect to {self.ws_url}: {str(e)}")def start_reconnection(self, url):"""启动定时重连检查"""def check_connection():if not self.is_connected():logger.debug("Reconnection attempt...")self.connect(url)# 使用线程定期检查连接状态schedule.every(10).seconds.do(check_connection)def run_scheduler():while True:schedule.run_pending()time.sleep(1)threading.Thread(target=run_scheduler, daemon=True).start()def is_connected(self):"""检查WebSocket连接状态"""return self.session and self.is_ws_connecteddef connect(self, url):"""建立WebSocket连接"""try:if self.is_connected():self.session.close()self.session = websocket.WebSocketApp(url,on_open=self.on_open,on_message=self.on_message,on_error=self.on_error,on_close=self.on_close)# 启动WebSocket连接(非阻塞模式)threading.Thread(target=self.session.run_forever, daemon=True).start()except Exception as e:logger.error(f"Failed to connect to the server: {str(e)}")def on_open(self, ws):"""WebSocket连接建立成功后的回调"""logger.info(f"Connection opened")self.is_ws_connected = True # 设置连接状态为Truetry:# 发送实时成交明细订阅请求trade_send_obj = {"code": 10000,"trace": "01213e9d-90a0-426e-a380-ebed633cba7a","data": {"codes": "01810.HK"}}self.send_message(trade_send_obj)# 不同请求之间间隔一段时间time.sleep(5)# 发送实时盘口数据订阅请求depth_send_obj = {"code": 10003,"trace": "01213e9d-90a0-426e-a380-ebed633cba7a","data": {"codes": "01810.HK"}}self.send_message(depth_send_obj)# 不同请求之间间隔一段时间time.sleep(5)# 发送实时K线数据订阅请求kline_data = {"arr": [{"type": 1,"codes": "01810.HK"}]}kline_send_obj = {"code": 10006,"trace": "01213e9d-90a0-426e-a380-ebed633cba7a","data": kline_data}self.send_message(kline_send_obj)# 启动定时心跳任务schedule.every(30).seconds.do(self.ping)except Exception as e:logger.error(f"Error sending initial messages: {str(e)}")def on_message(self, ws, message):"""接收消息的回调"""try:logger.info(f"Message received: {message}")except Exception as e:logger.error(f"Error processing message: {str(e)}")def on_close(self, ws, close_status_code, close_msg):"""连接关闭的回调"""logger.info(f"Connection closed: {close_status_code} - {close_msg}")self.is_ws_connected = False # 设置连接状态为Falsedef on_error(self, ws, error):"""错误处理的回调"""logger.error(f"WebSocket error: {str(error)}")self.is_ws_connected = False # 发生错误时设置连接状态为Falsedef send_message(self, message_obj):"""发送消息到WebSocket服务器"""if self.is_connected():try:self.session.send(json.dumps(message_obj))except Exception as e:logger.error(f"Error sending message: {str(e)}")else:logger.warning("Cannot send message: Not connected")def ping(self):"""发送心跳包"""ping_obj = {"code": 10010,"trace": "01213e9d-90a0-426e-a380-ebed633cba7a"}self.send_message(ping_obj)# 使用示例
if __name__ == "__main__":ws_client = WebsocketExample()ws_client.connect_all()# 保持主线程运行try:while True:schedule.run_pending()time.sleep(1)except KeyboardInterrupt:logger.info("Exiting...")if ws_client.is_connected():ws_client.session.close()
4. 心跳机制
为保持连接活跃,客户端需要每隔 15 秒 发送一次心跳包:
{"action": "ping"}
服务器会返回:
{"action": "pong"}
如果超过 30 秒未收到 pong 响应,应主动重连。
更多接入教程可以参考这个Github项目
