#!/usr/bin/env python
# encoding: utf-8
'''
@author: yfh
@file: increaseVolumeFromCsdn.py
@time: 2025/2/12 15:26
1.当前优化,将量化万。
'''
import baostock as bs
import pandas as pd
from datetime import datetime, timedelta
import logging
from datetime import datetime
# 配置logging模块
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S')
# 登录baostock系统
def login_baostock():
lg = bs.login()
if lg.error_code != '0':
print("登录失败:", lg.error_msg)
exit()
return lg
# 获取最近一个交易日
def get_last_trade_date():
today = datetime.now().strftime("%Y-%m-%d")
trade_dates = bs.query_trade_dates(start_date=today, end_date=today).get_data()
if trade_dates.empty or trade_dates.iloc[0]['is_trading_day'] == '0':
# 如果当天不是交易日,向前推算
last_trade_date = bs.query_trade_dates(
start_date=(datetime.now() - timedelta(days=10)).strftime("%Y-%m-%d"),
end_date=today
).get_data()
last_trade_date = last_trade_date[last_trade_date['is_trading_day'] == '1'].iloc[-1]['calendar_date']
else:
last_trade_date = today
return last_trade_date
# 获取当前日期的前一个交易日
def get_last_trade_date_yesterday():
# 获取昨天的日期
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
trade_dates = bs.query_trade_dates(start_date=yesterday, end_date=yesterday).get_data()
if trade_dates.empty or trade_dates.iloc[0]['is_trading_day'] == '0':
# 如果当天不是交易日,向前推算
last_trade_date = bs.query_trade_dates(
start_date=(datetime.now() - timedelta(days=10)).strftime("%Y-%m-%d"),
end_date=yesterday
).get_data()
last_trade_date = last_trade_date[last_trade_date['is_trading_day'] == '1'].iloc[-1]['calendar_date']
else:
last_trade_date = yesterday
return last_trade_date
# 获取所有A股股票代码
# def get_all_stock_codes(last_trade_date):
# rs = bs.query_all_stock(day=last_trade_date) # 使用最近一个交易日获取股票列表
# stock_list = []
# while (rs.error_code == '0') & rs.next():
# stock_code = rs.get_row_data()[0] # 股票代码格式如:sh.600000
# stock_list.append(stock_code)
# return stock_list
# 获取所有A股股票代码(剔除指数代码)
def get_all_stock_codes(last_trade_date):
rs = bs.query_all_stock(day=last_trade_date) # 使用最近一个交易日获取股票列表
stock_list = []
while (rs.error_code == '0') & rs.next():
stock_code = rs.get_row_data()[0] # 股票代码格式如:sh.600000
# 剔除指数代码(指数代码通常以 sh.000 或 sz.399 开头)
if not (stock_code.startswith("sh.000") or stock_code.startswith("sz.399")):
stock_list.append(stock_code)
return stock_list
# 获取股票名称
def get_stock_name(stock_code):
stock_info = bs.query_stock_basic(code=stock_code).get_data()
if stock_info.empty:
return None
if 'code_name' in stock_info.columns: # baostock返回的字段可能是code_name
return stock_info.iloc[0]['code_name']
elif 'stock_name' in stock_info.columns: # 也可能是stock_name
return stock_info.iloc[0]['stock_name']
return None
# 剔除不符合条件的股票(北交所、科创板、ST股票)
def filter_stock(stock_code):
# 剔除北交所股票(代码以 bj. 开头)
if stock_code.startswith("bj."):
return False
# 剔除科创板股票(代码以 sh.688 开头)
if stock_code.startswith("sh.688"):
return False
# 获取股票名称并剔除ST股票
stock_name = get_stock_name(stock_code)
if stock_name is None:
return False
if "ST" in stock_name or "*ST" in stock_name:
return False
return True
# rs = bs.query_history_k_data_plus("sh.600000",
# "date,code,open,high,low,close,preclose,volume,amount,adjustflag,turn,tradestatus,pctChg,isST",
# start_date='2025-02-10',
# end_date='2025-02-10',
# frequency="d",
# adjustflag="3")
# 计算成交量放大倍数
def calculate_volume_multiple(stock_code, target_date):
# print(f"calculate_volume_multiple ====== stock_code is {stock_code}")
# 查询目标日期的成交量
k_data = bs.query_history_k_data_plus(
code=stock_code,
fields="date,volume",
start_date=target_date,
end_date=target_date,
frequency="d",
adjustflag="3"
).get_data()
# print(f"calculate_volume_multiple ====== k_data is {k_data}")
# 检查是否有数据,以及成交量是否为空
if k_data.empty or k_data.iloc[0]['volume'] == '':
return None # 跳过无数据或停牌的股票
current_volume = float(k_data.iloc[0]['volume'])
current_date = k_data.iloc[0]['date']
# 获取前5个交易日的平均成交量
# 计算前5个交易日的起始日期(需手动调整,避免非交易日)
start_date = (datetime.strptime(current_date, "%Y-%m-%d") - timedelta(days=10)).strftime("%Y-%m-%d")
# print(f"calculate_volume_multiple ====== start_date is {start_date}")
df_history = bs.query_history_k_data_plus(
code=stock_code,
fields="date,volume",
start_date=start_date,
end_date=current_date,
frequency="d",
adjustflag="3"
).get_data()
# print(f"calculate_volume_multiple ====== df_history is {df_history}")
# 过滤掉空值数据
df_history = df_history[df_history['volume'] != '']
# 取前5日(排除目标日期自身)
# df_history = df_history[df_history['date'] < current_date]
df_history = df_history[df_history['date'] <= current_date]
if len(df_history) < 5:
return None # 忽略上市不足4日的新股
# 计算前5日平均成交量
avg_volume = df_history.tail(5)['volume'].astype(float).mean()
# print(f"calculate_volume_multiple ====== avg_volume is {avg_volume}")
# 计算成交量放大倍数
if avg_volume == 0:
return None # 避免除以零
volume_multiple = current_volume / avg_volume
# 获取股票名称
stock_name = get_stock_name(stock_code)
if stock_name is None:
return None
# volume 成交量(累计 单位:股) 万手要处理 6 位数
wanshou="万手"
current_volume_string = str(round(current_volume/1000000, 2)) + wanshou
avg_volume_string = str(round(avg_volume/1000000, 2)) + wanshou
return {
"股票代码": stock_code,
"股票名称": stock_name,
"日期": current_date,
"当日成交量": current_volume_string,
"前5日平均成交量": avg_volume_string,
"成交量倍数": round(volume_multiple, 2)
}
def is_before_6pm():
now = datetime.now()
current_hour = now.hour
return current_hour < 18
def get_trade_date():
if is_before_6pm():
# 小于当天18点,取前一天。baostock库交易日18时之前,日 K 数据还未入库,所以找前一天。
last_trade_date = get_last_trade_date_yesterday()
else:
# 大于当天18点,交易日取前当天
last_trade_date = get_last_trade_date()
return last_trade_date
# 主函数
def main():
# 登录baostock
login_baostock()
# 获取最近一个交易日
last_trade_date = get_trade_date()
print(f"last_trade_date is {last_trade_date}")
# 查询日期,可以包含多个,当前只查一天
target_dates = [last_trade_date]
# 获取所有A股股票代码
stock_list = get_all_stock_codes(last_trade_date)
# 筛选符合条件的股票
result = []
for code in stock_list:
if not filter_stock(code):
continue
for date in target_dates:
volume_data = calculate_volume_multiple(code, date)
if volume_data and volume_data['成交量倍数'] >= 3:
result.append(volume_data)
# 将结果转换为DataFrame并排序
if result:
df_result = pd.DataFrame(result)
df_result = df_result.sort_values(by="成交量倍数", ascending=False) # 按成交量倍数倒序排序
# 输出到Excel文件
local_path = "D:/name/tools/bat/volumeIncrease"
output_file = f"{local_path}/成交量放大股票_{last_trade_date}.xlsx"
df_result.to_excel(output_file, index=False)
print(f"结果已保存到文件: {output_file}")
print("\n符合条件的股票列表:")
print(df_result.to_string(index=False))
else:
print("未找到符合条件的股票。")
# 登出系统
bs.logout()
def main2():
# 登录baostock
login_baostock()
# yesterday = get_last_trade_date_yesterday()
yesterday = "2025-02-06"
print(f"yesterday trade date is {yesterday}")
trade_dates = [yesterday]
# 获取所有A股股票代码
stock_list = get_all_stock_codes(yesterday)
# 使用切片来获取前十条元素
# 注意:如果列表长度小于10,这会返回整个列表
first_ten = stock_list[:30]
print(first_ten)
result = []
for code in first_ten:
if not filter_stock(code):
continue
for date in trade_dates:
volume_data = calculate_volume_multiple(code, date)
print(volume_data)
# if volume_data and volume_data['成交量倍数'] >= 3:
# result.append(volume_data)
# 登出系统
bs.logout()
def main3():
# 登录baostock
login_baostock()
# 获取最近一个交易日
last_trade_date = get_trade_date()
print(f"last_trade_date is {last_trade_date}")
kk = calculate_volume_multiple("sh.600666", "2025-02-11")
print(kk)
# 登出系统
bs.logout()
# 程序入口
if __name__ == "__main__":
logging.info("程序开始执行...")
main()
# 记录另一条信息日志
logging.info("程序执行完成。")