import requests import json import pandas as pd from wordcloud import WordCloud import matplotlib.pyplot as plt def get_fund_flow_data(): """获取资金流向数据""" # 修改 pz 参数为 50,获取更多数据 url = "https://push2.eastmoney.com/api/qt/clist/get?cb=jQuery112309006219872217847_1740232310161&fid=f62&po=1&pz=50&pn=1&np=1&fltt=2&invt=2&ut=8dec03ba335b81bf4ebdf7b29ec27d15&fs=m%3A90+t%3A2&fields=f12%2Cf14%2Cf2%2Cf3%2Cf62%2Cf184%2Cf66%2Cf69%2Cf72%2Cf75%2Cf78%2Cf81%2Cf84%2Cf87%2Cf204%2Cf205%2Cf124%2Cf1%2Cf13" headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36', } try: response = requests.get(url, headers=headers) response_text = response.text[response.text.find('(') + 1: response.text.rfind(')')] data = json.loads(response_text) if 'data' not in data or 'diff' not in data['data']: raise Exception("未能获取到数据") items = data['data']['diff'] parsed_data = [] for item in items: net_flow = item['f62'] - item['f66'] parsed_data.append({ '板块': item['f14'], '净流入': net_flow, }) df = pd.DataFrame(parsed_data) return df except Exception as e: print(f"获取数据时发生错误: {str(e)}") return None def generate_wordcloud(data): """生成词云图,增加层次感""" # 获取所有板块及其净流入值 sectors = data['板块'].tolist() net_flows = data['净流入'].tolist() # 将负值转换为正值,保持相对大小关系 min_flow = min(net_flows) if min_flow < 0: net_flows = [flow - min_flow + 1 for flow in net_flows] # 加1避免出现0值 # 准备词云输入数据 wordcloud_data = {sectors[i]: net_flows[i] for i in range(len(sectors))} # 修改词云参数以突出层次感 wordcloud = WordCloud( width=1600, # 增加宽度 height=1000, # 增加高度 background_color='white', font_path='C:\\Windows\\Fonts\\msyh.ttc', min_font_size=8, # 设置最小字体大小 max_font_size=100, # 设置最大字体大小 max_words=50, # 显示最多50个词 relative_scaling=0.8, # 增加层次感,调整词大小的相对比例 colormap='coolwarm', # 使用渐变色方案增加层次感 contour_color='black', # 边缘色 contour_width=2 # 边缘宽度 ).generate_from_frequencies(wordcloud_data) # 显示词云 plt.figure(figsize=(16, 10)) # 增加图像大小 plt.imshow(wordcloud, interpolation='bilinear') plt.axis('off') plt.show() def main(): print("正在获取资金流向数据...") data = get_fund_flow_data() if data is not None: print("正在生成词云图...") generate_wordcloud(data) else: print("获取数据失败,请检查网络连接或接口可用性。") if __name__ == "__main__": main()