使用WebSocket实时获取印度股票数据源(无调用次数限制)实战
使用WebSocket实时获取印度股票数据源(无调用次数限制)实战
一、前置准备
1. 获取API密钥
登录 StockTV开发者平台 → 联系客服获取测试Key(格式MY4b781f618e3f43c4b055f25fa61941ad
),该密钥无调用次数限制且支持实时数据持续订阅。
2. 安装Python依赖
pip install websocket-client json pandas
二、核心代码实现
1. 建立WebSocket连接
import websocket
import json
import timeAPI_KEY = "YOUR_API_KEY"
WS_URL = f"wss://ws-api.stocktv.top/connect?key={API_KEY}"def on_message(ws, message):"""处理实时行情推送"""data = json.loads(message)if data.get('type') == 'stock':print(f"[{data['symbol']}] 价格: {data['last']} 涨跌幅: {data['pcp']}%")def on_error(ws, error):print(f"连接异常: {error}")def on_close(ws, close_status_code, close_msg):print(f"连接关闭: {close_msg}")def on_open(ws):"""连接成功后订阅股票"""subscribe_msg = json.dumps({"action": "subscribe","symbols": ["RELIANCE", "NSEI"] # 印度信实工业/Nifty50指数})ws.send(subscribe_msg)print("订阅成功,开始接收实时数据...")
2. 启动实时监听(含自动重连)
def start_websocket():while True:try:ws = websocket.WebSocketApp(WS_URL,on_message=on_message,on_error=on_error,on_close=on_close)ws.on_open = on_openws.run_forever()except Exception as e:print(f"连接异常,5秒后重连: {str(e)}")time.sleep(5)# 启动线程持续运行
import threading
threading.Thread(target=start_websocket, daemon=True).start()
3. 添加心跳机制(保持长连接)
def send_heartbeat(ws):"""每30秒发送心跳包"""while True:try:ws.send(json.dumps({"action": "ping"}))time.sleep(30)except Exception as e:break# 在on_open函数中启动心跳线程
def on_open(ws):# ...原有订阅代码...threading.Thread(target=send_heartbeat, args=(ws,), daemon=True).start()
三、实时数据示例输出
订阅成功,开始接收实时数据...
[RELIANCE] 价格: 2856.15 涨跌幅: +1.23%
[NSEI] 价格: 22985.40 涨跌幅: +0.75%
[RELIANCE] 价格: 2857.80 涨跌幅: +1.35%
四、关键参数说明
字段 | 说明 | 示例值 |
---|---|---|
symbol | 股票/指数代码 | RELIANCE, NSEI |
last | 最新成交价 | 2856.15 |
pcp | 涨跌幅百分比(自动带±号) | +1.23% |
volume | 成交量(股) | 1254875 |
timestamp | 数据时间戳(Unix毫秒级) | 1725002394123 |
五、注意事项
-
连接稳定性
通过自动重连机制+心跳包保障7×24小时持续运行 -
数据时效性
印度市场交易时段为IST 9:15-15:30(北京时间11:45-18:00),非交易时段无实时数据推送 -
性能优化
建议使用异步处理框架(如asyncio
)避免数据堆积,实测单连接可承载100+标的实时推送