import requests import pandas as pd import json from datetime import datetime, timedelta import time import matplotlib.pyplot as plt import matplotlib # 设置全局字体 matplotlib.rcParams['font.sans-serif'] = ['Microsoft YaHei', 'SimHei', 'SimSun', 'Arial Unicode MS'] matplotlib.rcParams['axes.unicode_minus'] = False # 用来正常显示负号 import numpy as np import tkinter as tk from matplotlib.backends.backend_tkagg import FigureCanvasTkAgg from matplotlib.figure import Figure def get_stock_data(stock_code, market_code, days=90): """获取股票历史数据,限制为过去90天的数据""" url = "http://push2his.eastmoney.com/api/qt/stock/kline/get" # 获取最近90天的数据 end_date = datetime.now() start_date = end_date - timedelta(days=days) # 格式化日期 beg = start_date.strftime('%Y%m%d') end = end_date.strftime('%Y%m%d') params = { 'secid': f'{market_code}.{stock_code}', 'fields1': 'f1,f2,f3,f4,f5,f6,f7,f8,f9,f10,f11,f12,f13', 'fields2': 'f51,f52,f53,f54,f55,f56,f57,f58,f59,f60,f61', 'klt': '101', # 101表示日K线数据 'fqt': '0', # 不复权 'beg': beg, # 起始日期 'end': end, # 结束日期 'lmt': days, # 获取的天数 '_': int(time.time() * 1000) } headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36' } try: response = requests.get(url, params=params, headers=headers) data = json.loads(response.text) if data['data'] is None: raise Exception(f"未能获取到{stock_code}的数据") stock_data = data['data']['klines'] parsed_data = [] for item in stock_data: values = item.split(',') parsed_data.append({ '日期': values[0], '开盘价': float(values[1]), '收盘价': float(values[2]), '最高价': float(values[3]), '最低价': float(values[4]), '成交量': float(values[5]), '成交额': float(values[6]), '振幅': float(values[7]), '涨跌幅': float(values[8]), '涨跌额': float(values[9]), '换手率': float(values[10]) }) df = pd.DataFrame(parsed_data) df['RSI'] = calculate_rsi(df['收盘价'].values) # 返回90天的数据,但仅保留最后30天的数据用于可视化 return df.tail(30) # 只返回最后30天的数据 except Exception as e: print(f"获取数据时发生错误: {str(e)}") return None def calculate_rsi(prices, periods=14): """计算RSI指标""" rsi = np.zeros_like(prices) if len(prices) <= periods: return rsi changes = np.diff(prices) gains = np.where(changes > 0, changes, 0) losses = np.where(changes < 0, -changes, 0) avg_gain = np.mean(gains[:periods]) avg_loss = np.mean(losses[:periods]) if avg_loss == 0: rsi[periods] = 100 else: rs = avg_gain / avg_loss rsi[periods] = 100 - (100 / (1 + rs)) for i in range(periods + 1, len(prices)): avg_gain = (avg_gain * (periods - 1) + gains[i - 1]) / periods avg_loss = (avg_loss * (periods - 1) + losses[i - 1]) / periods if avg_loss == 0: rsi[i] = 100 else: rs = avg_gain / avg_loss rsi[i] = 100 - (100 / (1 + rs)) # 确保RSI值在0到100之间 rsi[i] = max(0, min(rsi[i], 100)) return rsi def create_visualization_window(rsi_data): """创建可视化窗口""" root = tk.Tk() root.title("股票指数 RSI走势") # 设置标题 root.geometry("1200x800") root.configure(bg="#f0f0f0") # 设置背景颜色 # 创建Figure fig = Figure(figsize=(12, 8), dpi=100) ax = fig.add_subplot(111) # 设置样式 ax.set_facecolor('#f0f0f0') fig.patch.set_facecolor('white') # 绘制RSI线 colors = ['#2878B5', '#9AC9DB', '#C82423'] # 设置不同的颜色 for (index_name, data), color in zip(rsi_data.items(), colors): dates = pd.to_datetime(data['日期']) ax.plot(dates, data['RSI'], label=index_name, color=color, linewidth=2, marker='o', markersize=4) # 添加过度买入/卖出区域 ax.axhline(y=70, color='#C82423', linestyle='--', alpha=0.3) ax.axhline(y=30, color='#2878B5', linestyle='--', alpha=0.3) ax.fill_between(ax.get_xlim(), 70, 100, color='#C82423', alpha=0.1) ax.fill_between(ax.get_xlim(), 0, 30, color='#2878B5', alpha=0.1) # 设置y轴范围 ax.set_ylim(0, 100) # 设置标题和标签 ax.set_title('股票指数 RSI走势', fontsize=16, pad=20) ax.set_xlabel('日期', fontsize=12) ax.set_ylabel('RSI值', fontsize=12) # 设置图例 ax.legend(loc='best', fontsize=10) # 设置网格 ax.grid(True, linestyle='--', alpha=0.3) # 调整x轴标签 plt.setp(ax.get_xticklabels(), rotation=45) # 调整布局 fig.tight_layout() # 创建canvas canvas = FigureCanvasTkAgg(fig, master=root) canvas.draw() canvas.get_tk_widget().pack(side=tk.TOP, fill=tk.BOTH, expand=1) # 添加关闭按钮 close_button = tk.Button(root, text="关闭", command=root.quit, font=('SimHei', 12), bg='#C82423', fg='white', padx=10, pady=5) close_button.pack(side=tk.BOTTOM, pady=10) return root def save_to_excel(df, filename): """安全地保存数据到Excel文件""" try: # 尝试在data子目录中保存文件 import os data_dir = 'data' # 创建data目录(如果不存在) if not os.path.exists(data_dir): os.makedirs(data_dir) # 构建完整的文件路径 file_path = os.path.join(data_dir, filename) # 如果文件已存在,尝试生成一个新的文件名 base_name = os.path.splitext(filename)[0] extension = os.path.splitext(filename)[1] counter = 1 while os.path.exists(file_path): new_filename = f"{base_name}_{counter}{extension}" file_path = os.path.join(data_dir, new_filename) counter += 1 # 保存文件 df.to_excel(file_path, index=False) print(f"数据已成功保存到: {file_path}") return True except Exception as e: print(f"保存文件时发生错误: {str(e)}") return False def main(): # 爬取多个指数的数据 indices = [ {'code': '000001', 'market': 1, 'name': '上证指数'}, {'code': '399001', 'market': 0, 'name': '深证成指'}, {'code': '399006', 'market': 0, 'name': '创业板指'} ] # 存储多个指数的数据 rsi_data = {} current_date = datetime.now().strftime('%Y%m%d') # 爬取数据 for index in indices: df = get_stock_data(index['code'], index['market'], days=90) # 获取90天的数据 if df is not None: # 保存完整数据到Excel output_file = f'stock_data_{index["code"]}_{current_date}.xlsx' if save_to_excel(df, output_file): # 保存RSI数据用于可视化 rsi_data[index['name']] = df[['日期', 'RSI']] # 保存RSI数据到单独的Excel rsi_df = pd.DataFrame() first_dates = None for index_name, data in rsi_data.items(): if first_dates is None: first_dates = data['日期'] rsi_df['日期'] = first_dates rsi_df[f'{index_name}_RSI'] = data['RSI'] rsi_output = f'RSI_data_{current_date}.xlsx' if save_to_excel(rsi_df, rsi_output): # 创建并显示股票指数的RSI折线图 root = create_visualization_window(rsi_data) root.mainloop() if __name__ == "__main__": main()