Files
GoFundBot/Backend/fund_api.py
2026-01-18 23:55:25 +08:00

569 lines
23 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import requests
import json
import re
from datetime import datetime
from typing import Dict, List, Any, Union
from stock_service import StockService
# --- 数据清洗器 (原 api_handler.py) ---
class FundDataCleaner:
    """Turn the raw variable dictionary scraped for one fund into structured data.

    Each ``clean_*`` method receives the raw dictionary (JavaScript variable
    names mapped to already-decoded values) and returns plain dicts/lists that
    can be serialised to JSON.  Sections whose raw value has an unexpected type
    (for example a string left over from a failed JSON decode) degrade to empty
    containers instead of raising, so one malformed variable does not discard
    the whole fund.
    """

    # Optionally signed integers/decimals such as "12", "-3.5" or "7.".
    _NUMBER_RE = re.compile(r'^-?\d+\.?\d*$')
    # Placeholder strings that mean "no rate available".
    _EMPTY_RATE_MARKERS = ('--', '-', 'null', 'undefined')

    def __init__(self):
        self.cleaned_data = {}
        self.stock_service = StockService()

    # ------------------------------------------------------------------
    # Small type guards used by the section cleaners
    # ------------------------------------------------------------------
    @staticmethod
    def _as_dict(value: Any) -> Dict[str, Any]:
        """Return *value* when it is a dict, otherwise an empty dict."""
        return value if isinstance(value, dict) else {}

    @staticmethod
    def _as_list(value: Any) -> List[Any]:
        """Return *value* when it is a list, otherwise an empty list."""
        return value if isinstance(value, list) else []

    @staticmethod
    def _clean_identifier(value: Any) -> Any:
        """Return *value* as a stripped, unquoted string without numeric coercion.

        Identifiers such as fund codes may start with ``0``; converting them
        to ``int`` would silently drop those digits.  ``None`` stays ``None``.
        """
        if value is None:
            return None
        text = str(value).strip()
        if len(text) >= 2 and text[0] == text[-1] and text[0] in ('"', "'"):
            text = text[1:-1]
        return text

    # ------------------------------------------------------------------
    # Scalar helpers
    # ------------------------------------------------------------------
    def clean_js_variable(self, value: Any) -> Any:
        """Convert a JavaScript scalar (given as text) into a Python value.

        ``"true"``/``"false"`` (any case) become booleans, numeric text becomes
        ``int`` or ``float``, and a value wrapped in matching quotes is
        unquoted.  Anything else is returned as its stripped string form.

        NOTE(review): digit-only identifiers with leading zeros (e.g. stock
        codes) are also converted to ``int`` here and lose those zeros.  That
        behaviour is kept because callers outside this class may depend on it.
        """
        if value is None:
            return None
        value_str = str(value).strip()

        lowered = value_str.lower()
        if lowered in ('true', 'false'):
            return lowered == 'true'

        if self._NUMBER_RE.match(value_str):
            try:
                return float(value_str) if '.' in value_str else int(value_str)
            except (ValueError, TypeError):
                return value_str

        double_quoted = value_str.startswith('"') and value_str.endswith('"')
        single_quoted = value_str.startswith("'") and value_str.endswith("'")
        if double_quoted or single_quoted:
            return value_str[1:-1]
        return value_str

    def parse_timestamp(self, timestamp: int) -> str:
        """Format a millisecond epoch timestamp as ``YYYY-MM-DD``.

        If the value cannot be converted (wrong type, out of range for the
        platform) its ``str()`` form is returned instead of raising.

        NOTE(review): the conversion uses the process-local timezone, so the
        resulting date can differ between hosts - confirm this is intended.
        """
        try:
            return datetime.fromtimestamp(timestamp / 1000).strftime('%Y-%m-%d')
        except (ValueError, TypeError, OverflowError, OSError):
            # OverflowError/OSError are raised for out-of-range timestamps.
            return str(timestamp)

    def clean_rate(self, value: Any) -> Any:
        """Parse a fee-rate value into a ``float``; return ``None`` if absent or invalid."""
        if value is None:
            return None
        value_str = str(value).strip()
        if not value_str or value_str in self._EMPTY_RATE_MARKERS:
            return None
        try:
            return float(value_str.replace('%', '').strip())
        except (ValueError, TypeError):
            return None

    # ------------------------------------------------------------------
    # Array helpers
    # ------------------------------------------------------------------
    def _clean_net_worth_points(self, data: Any) -> List[Dict[str, Any]]:
        """Convert unit-net-worth trend points into dictionaries."""
        cleaned = [
            {
                'date': self.parse_timestamp(item.get('x')),
                'net_worth': item.get('y'),
                'equity_return': item.get('equityReturn'),
                'dividend': item.get('unitMoney'),
            }
            for item in data
            if isinstance(item, dict)
        ]
        # Drop an anomalous first day (e.g. face value 1.0 followed by a real
        # net worth of 100+).  Newly launched ETFs often show the face value on
        # day one and the reference net worth on day two, which would otherwise
        # produce an absurd computed gain.
        if len(cleaned) >= 2:
            try:
                first = float(cleaned[0]['net_worth'])
                second = float(cleaned[1]['net_worth'])
                if first > 0 and abs((second - first) / first) > 0.5:
                    cleaned.pop(0)
            except (ValueError, TypeError):
                pass
        return cleaned

    def _clean_timestamp_pairs(self, data: Any) -> List[Dict[str, Any]]:
        """Convert ``[timestamp, value]`` pairs into date/percentage dictionaries."""
        return [
            {
                'date': self.parse_timestamp(item[0]),
                'position_percentage': item[1],
            }
            for item in data
            if isinstance(item, list) and len(item) >= 2
        ]

    def _clean_performance_series(self, data: Any) -> List[Dict[str, Any]]:
        """Convert named comparison series whose points are ``[timestamp, value]``."""
        cleaned = []
        for item in data:
            if not isinstance(item, dict):
                continue
            points = [
                {'date': self.parse_timestamp(point[0]), 'value': point[1]}
                for point in (item.get('data') or [])
                if isinstance(point, list) and len(point) >= 2
            ]
            cleaned.append({'name': item.get('name'), 'data': points})
        return cleaned

    def _clean_ranking_points(self, data: Any) -> List[Dict[str, Any]]:
        """Convert ranking trend points into dictionaries."""
        return [
            {
                'date': self.parse_timestamp(item.get('x')),
                'rank': item.get('y'),
                'total_funds': item.get('sc'),
            }
            for item in data
            if isinstance(item, dict)
        ]

    def clean_array_data(self, data: Any, data_type: str = 'general') -> Any:
        """Clean an array-like raw value according to *data_type*.

        Supported types are ``'net_worth'``, ``'position'``, ``'performance'``
        and ``'ranking'``; any other type applies :meth:`clean_js_variable` to
        every element.  Empty/missing data yields ``[]``.

        For the structured types a value that is not a list/tuple yields
        ``[]``.  For the general type a plain string is treated as a
        comma-separated list of identifiers (kept as strings) rather than
        being iterated character by character.
        """
        if not data:
            return []

        handlers = {
            'net_worth': self._clean_net_worth_points,
            'position': self._clean_timestamp_pairs,
            'performance': self._clean_performance_series,
            'ranking': self._clean_ranking_points,
        }
        handler = handlers.get(data_type)
        if handler is not None:
            if not isinstance(data, (list, tuple)):
                return []
            return handler(data)

        if isinstance(data, str):
            tokens = (self._clean_identifier(part) for part in data.split(','))
            return [token for token in tokens if token]
        return [self.clean_js_variable(item) for item in data]

    # ------------------------------------------------------------------
    # Section cleaners
    # ------------------------------------------------------------------
    def clean_fund_info(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
        """Build the basic-information section.

        The fund code is kept as a string so leading zeros survive.  The fund
        type comes from ``fund_type_from_cache`` when present and falls back
        to a fixed default otherwise.
        """
        fund_type = raw_data.get('fund_type_from_cache')
        if not fund_type:
            fund_type = '混合型'  # default fund type when the cache has no entry
        return {
            'fund_name': self.clean_js_variable(raw_data.get('fS_name')),
            'fund_code': self._clean_identifier(raw_data.get('fS_code')),
            'fund_type': fund_type,
            'original_rate': self.clean_rate(raw_data.get('fund_sourceRate')),
            'current_rate': self.clean_rate(raw_data.get('fund_Rate')),
            'min_subscription_amount': self.clean_js_variable(raw_data.get('fund_minsg')),
            'is_hb': self.clean_js_variable(raw_data.get('ishb')),
        }

    def clean_performance_data(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
        """Build the period-return section from the ``syl_*`` variables."""
        return {
            '1_year_return': self.clean_js_variable(raw_data.get('syl_1n')),
            '6_month_return': self.clean_js_variable(raw_data.get('syl_6y')),
            '3_month_return': self.clean_js_variable(raw_data.get('syl_3y')),
            '1_month_return': self.clean_js_variable(raw_data.get('syl_1y')),
        }

    def clean_portfolio_data(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
        """Build the portfolio section, enriching stock codes via the stock service.

        A stock whose lookup fails is still listed, with placeholder name and
        market, so the list stays aligned with the raw codes.
        """
        enriched_stocks = []
        for code in self.clean_array_data(raw_data.get('stockCodes')):
            try:
                stock_info = self.stock_service.get_stock_info(code)
                display_code = self.stock_service.normalize_code(code)
                enriched_stocks.append({
                    'code': display_code,
                    'original_code': code,
                    'name': stock_info.get('name', 'Unknown'),
                    'market': stock_info.get('market', '--'),
                    'ratio': 0,  # the data source has no holding ratio
                })
            except Exception as e:
                print(f"Error processing stock code {code}: {e}")
                # Same keys as the normal entry so consumers see one shape.
                enriched_stocks.append({
                    'code': str(code),
                    'original_code': code,
                    'name': 'Unknown',
                    'market': '--',
                    'ratio': 0,
                })

        return {
            'stock_codes': enriched_stocks,
            'bond_codes': self.clean_array_data(raw_data.get('zqCodes')),
            # The raw "new" stock codes use a more complex format (116.xxxx),
            # so the already-enriched list is exposed under this key as well
            # and the frontend can use either one.
            'stock_codes_new': enriched_stocks,
            'bond_codes_new': self.clean_array_data(raw_data.get('zqCodesNew')),
        }

    def clean_asset_allocation(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
        """Build the asset-allocation section (categories plus trimmed series)."""
        asset_data = self._as_dict(raw_data.get('Data_assetAllocation'))
        return {
            'categories': asset_data.get('categories', []),
            'series': [
                {
                    'name': series.get('name'),
                    'type': series.get('type'),
                    'data': series.get('data', []),
                    'yAxis': series.get('yAxis'),
                }
                for series in self._as_list(asset_data.get('series'))
                if isinstance(series, dict)
            ],
        }

    def clean_fund_manager(self, raw_data: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Build the list of current fund managers with renamed fields."""
        cleaned_managers = []
        for manager in self._as_list(raw_data.get('Data_currentFundManager')):
            if not isinstance(manager, dict):
                continue
            # Either sub-object may be missing or null; treat both as empty.
            power = self._as_dict(manager.get('power'))
            profit = self._as_dict(manager.get('profit'))
            cleaned_managers.append({
                'id': manager.get('id'),
                'name': manager.get('name'),
                'photo_url': manager.get('pic'),
                'star_rating': manager.get('star'),
                'work_experience': manager.get('workTime'),
                'managed_fund_size': manager.get('fundSize'),
                'ability_assessment': {
                    'average_score': power.get('avr'),
                    'categories': power.get('categories', []),
                    'scores': power.get('data', []),
                    'assessment_date': power.get('jzrq'),
                },
                'performance': {
                    'categories': profit.get('categories', []),
                    'series': profit.get('series', []),
                    'assessment_date': profit.get('jzrq'),
                },
            })
        return cleaned_managers

    def clean_holder_structure(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
        """Build the holder-structure section (categories plus name/data series)."""
        holder_data = self._as_dict(raw_data.get('Data_holderStructure'))
        return {
            'categories': holder_data.get('categories', []),
            'series': [
                {'name': series.get('name'), 'data': series.get('data', [])}
                for series in self._as_list(holder_data.get('series'))
                if isinstance(series, dict)
            ],
        }

    def clean_same_type_funds(self, raw_data: Dict[str, Any]) -> List[List[Dict[str, Any]]]:
        """Parse ``swithSameType`` entries of the form ``code_name_rate``.

        Returns one list per raw category (possibly empty).  Entries with
        fewer than three ``_``-separated parts, or that are not strings, are
        skipped.
        """
        cleaned_categories = []
        for category in self._as_list(raw_data.get('swithSameType')):
            cleaned_funds = []
            for fund_str in self._as_list(category):
                if not isinstance(fund_str, str):
                    continue
                parts = fund_str.split('_')
                if len(parts) >= 3:
                    cleaned_funds.append({
                        'code': parts[0],
                        'name': parts[1],
                        'return_rate': self.clean_js_variable(parts[2]),
                    })
            cleaned_categories.append(cleaned_funds)
        return cleaned_categories

    def clean_all_data(self, raw_data: Dict[str, Any]) -> Dict[str, Any]:
        """Assemble every cleaned section into one result dictionary."""
        return {
            'basic_info': self.clean_fund_info(raw_data),
            'performance': self.clean_performance_data(raw_data),
            'portfolio': self.clean_portfolio_data(raw_data),
            # Realtime estimate fields (from the fundgz endpoint).
            'realtime_estimate': {
                'name': raw_data.get('name'),               # fund name
                'fund_code': raw_data.get('fundcode'),      # fund code
                'net_worth': raw_data.get('dwjz'),          # unit net worth
                'net_worth_date': raw_data.get('jzrq'),     # net worth date
                'estimate_value': raw_data.get('gsz'),      # estimated net worth
                'estimate_change': raw_data.get('gszzl'),   # estimated change (%)
                'estimate_time': raw_data.get('gztime'),    # estimate time
            },
            'net_worth_trend': self.clean_array_data(
                raw_data.get('Data_netWorthTrend'), 'net_worth'
            ),
            # The next entries reuse the 'position' parser, so their values
            # are emitted under the 'position_percentage' key.
            'accumulated_net_worth': self.clean_array_data(
                raw_data.get('Data_ACWorthTrend'), 'position'
            ),
            'position_trend': self.clean_array_data(
                raw_data.get('Data_fundSharesPositions'), 'position'
            ),
            'total_return_trend': self.clean_array_data(
                raw_data.get('Data_grandTotal'), 'performance'
            ),
            'ranking_trend': self.clean_array_data(
                raw_data.get('Data_rateInSimilarType'), 'ranking'
            ),
            'ranking_percentage': self.clean_array_data(
                raw_data.get('Data_rateInSimilarPersent'), 'position'
            ),
            'scale_fluctuation': raw_data.get('Data_fluctuationScale', {}),
            'holder_structure': self.clean_holder_structure(raw_data),
            'asset_allocation': self.clean_asset_allocation(raw_data),
            'performance_evaluation': raw_data.get('Data_performanceEvaluation', {}),
            'fund_managers': self.clean_fund_manager(raw_data),
            'subscription_redemption': raw_data.get('Data_buySedemption', {}),
            'same_type_funds': self.clean_same_type_funds(raw_data),
            'cleaning_timestamp': datetime.now().isoformat(),
        }
# --- 基金 API 客户端 ---
class FundAPI:
    """Client that downloads fund data, parses the JavaScript payload and cleans it.

    Detail data comes from a ``pingzhongdata/<code>.js`` script made of
    ``var name = value;`` declarations; realtime estimates come from a JSONP
    endpoint.  Network and parse failures are reported with ``print`` and
    surface to callers as ``None`` / ``[]`` rather than exceptions.
    """

    # Matches the start of a declaration and captures the variable name.
    _JS_VAR_PATTERN = re.compile(r'var\s+(\w+)\s*=\s*')

    def __init__(self):
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        self.cleaner = FundDataCleaner()
        self._fund_type_cache = None  # lazily loaded fund_code -> fund_type map

    def _load_fund_type_cache(self):
        """Return the fund_code -> fund_type mapping, loading it on first use.

        The mapping is read from ``Data/fund_list_cache.json`` located two
        directory levels above this file.  A missing or unreadable file yields
        an empty mapping; the result is memoised on the instance either way.
        """
        if self._fund_type_cache is not None:
            return self._fund_type_cache
        try:
            import os
            base_dir = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
            cache_path = os.path.join(base_dir, 'Data', 'fund_list_cache.json')
            if os.path.exists(cache_path):
                with open(cache_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                funds = data.get('funds', [])
                # Build the fund_code -> fund_type mapping.
                self._fund_type_cache = {
                    f.get('CODE'): f.get('TYPE') for f in funds if f.get('CODE')
                }
            else:
                self._fund_type_cache = {}
        except Exception as e:
            print(f"Error loading fund type cache: {e}")
            self._fund_type_cache = {}
        return self._fund_type_cache

    def get_fund_data(self, fund_code: str) -> Union[Dict[str, Any], None]:
        """Fetch and clean all data for one fund.

        Returns the cleaned dictionary (basic info, performance, holdings,
        net-worth trend, ...) or ``None`` when fetching or cleaning fails.
        """
        raw_data = self._fetch_raw_data(fund_code)
        if not raw_data:
            return None

        # Attach the fund type from the local cache when it is known.
        fund_type = self._load_fund_type_cache().get(fund_code, '')
        if fund_type:
            raw_data['fund_type_from_cache'] = fund_type

        try:
            return self.cleaner.clean_all_data(raw_data)
        except Exception as e:
            print(f"Error cleaning data for {fund_code}: {e}")
            return None

    def search_funds(self, keyword: str) -> List[Dict[str, Any]]:
        """Search funds by *keyword*; return the matching entries or ``[]`` on failure."""
        url = "https://fundsuggest.eastmoney.com/FundSearch/api/FundSearchAPI.ashx"
        params = {
            'm': 1,
            'key': keyword
        }
        try:
            response = requests.get(url, params=params, headers=self.headers, timeout=5)
            if response.status_code == 200:
                data = response.json()
                if 'Datas' in data:
                    # Keep only entries whose category is "fund".
                    return [
                        item for item in data['Datas']
                        if item.get('CATEGORYDESC') == '基金'
                    ]
            return []
        except Exception as e:
            print(f"Search error: {e}")
            return []

    @staticmethod
    def _skip_js_string(text: str, start: int) -> int:
        """Return the index just past the string literal opening at *start*.

        ``text[start]`` must be the opening quote.  A backslash always escapes
        the following character, so a literal ending in an escaped backslash
        (``"a\\\\"``) terminates correctly.  An unterminated literal yields
        ``len(text)``.
        """
        quote = text[start]
        length = len(text)
        index = start + 1
        while index < length:
            char = text[index]
            if char == '\\':
                index += 2  # skip the escaped character as well
                continue
            if char == quote:
                return index + 1
            index += 1
        return length

    def _find_bracket_end(self, text: str, start: int) -> int:
        """Return the index just past the bracket that closes ``text[start]``.

        ``text[start]`` must be ``[`` or ``{``.  Only brackets of that same
        kind are counted and brackets inside string literals are ignored.  An
        unbalanced value yields ``len(text)``.
        """
        opener = text[start]
        closer = ']' if opener == '[' else '}'
        length = len(text)
        depth = 1
        index = start + 1
        while index < length and depth > 0:
            char = text[index]
            if char == '"' or char == "'":
                index = self._skip_js_string(text, index)
                continue
            if char == opener:
                depth += 1
            elif char == closer:
                depth -= 1
            index += 1
        return min(index, length)

    def _parse_js_value(self, js_content: str, start_pos: int) -> tuple:
        """Extract the source text of the JS value starting at *start_pos*.

        Leading whitespace is skipped.  Arrays, objects and string literals
        are returned with their delimiters; any other value (number, boolean,
        ...) is read up to the next ``;`` and stripped.

        Returns ``(value_text, end_position)``, or ``(None, position)`` when
        only whitespace remains.
        """
        length = len(js_content)
        pos = start_pos
        while pos < length and js_content[pos] in ' \t\n\r':
            pos += 1
        if pos >= length:
            return None, pos

        char = js_content[pos]
        if char == '[' or char == '{':
            end_pos = self._find_bracket_end(js_content, pos)
            return js_content[pos:end_pos], end_pos
        if char == '"' or char == "'":
            end_pos = self._skip_js_string(js_content, pos)
            return js_content[pos:end_pos], end_pos

        # Numbers, booleans and other bare tokens run up to the semicolon.
        end_pos = js_content.find(';', pos)
        if end_pos == -1:
            end_pos = length
        return js_content[pos:end_pos].strip(), end_pos

    @staticmethod
    def _decode_js_literal(raw_value: str) -> Any:
        """Decode one JS literal produced by :meth:`_parse_js_value`.

        Arrays/objects are parsed as JSON.  Strict JSON is tried first so
        apostrophes inside double-quoted strings survive; only if that fails
        are single quotes rewritten to double quotes (for JS-style quoting).
        If both attempts fail the raw text is returned.  Quoted strings are
        unquoted; anything else is returned unchanged.
        """
        if raw_value.startswith('[') or raw_value.startswith('{'):
            for candidate in (raw_value, raw_value.replace("'", '"')):
                try:
                    return json.loads(candidate)
                except json.JSONDecodeError:
                    continue
            return raw_value
        if raw_value.startswith('"') and raw_value.endswith('"'):
            return raw_value[1:-1]
        if raw_value.startswith("'") and raw_value.endswith("'"):
            return raw_value[1:-1]
        return raw_value

    def _parse_js_variables(self, js_content: str) -> Dict[str, Any]:
        """Collect every ``var name = value`` declaration found in *js_content*.

        Declarations whose value is empty are skipped; a later declaration
        with the same name overwrites an earlier one.
        """
        variables = {}
        for match in self._JS_VAR_PATTERN.finditer(js_content):
            raw_value, _ = self._parse_js_value(js_content, match.end())
            if raw_value:
                variables[match.group(1)] = self._decode_js_literal(raw_value)
        return variables

    def _fetch_raw_data(self, fund_code: str) -> Union[Dict[str, Any], None]:
        """Download the raw data for *fund_code* as a flat dictionary.

        The dictionary holds every JS variable of the detail script plus, when
        available, the realtime-estimate fields merged on top.  Returns
        ``None`` if the detail request raises or if nothing was collected.
        """
        data = {}

        # 1. Detailed data: a JS file made of `var xxx = ...;` declarations.
        url = f"https://fund.eastmoney.com/pingzhongdata/{fund_code}.js"
        try:
            response = requests.get(url, headers=self.headers, timeout=10)
            if response.status_code == 200:
                data.update(self._parse_js_variables(response.text))
        except Exception as e:
            print(f"Error fetching detail for {fund_code}: {e}")
            return None

        # 2. Realtime estimate (optional): a failure here must not discard the
        #    detail data, so errors are reported and otherwise ignored.
        try:
            real_time_url = f"http://fundgz.1234567.com.cn/js/{fund_code}.js"
            response = requests.get(real_time_url, headers=self.headers, timeout=3)
            if response.status_code == 200:
                match = re.search(r"jsonpgz\((.*?)\);", response.text)
                payload = match.group(1).strip() if match else ''
                # An empty `jsonpgz();` payload simply means "no estimate".
                if payload:
                    rt_data = json.loads(payload)
                    if isinstance(rt_data, dict) and rt_data:
                        # Keys are merged flat into the same dictionary; a
                        # realtime key overwrites a detail variable that
                        # shares its name.
                        data.update(rt_data)
        except Exception as e:
            print(f"Realtime estimate unavailable for {fund_code}: {e}")

        if not data:
            return None

        # Make sure the fund code is always present.
        if 'fS_code' not in data:
            data['fS_code'] = fund_code
        return data
if __name__ == "__main__":
    # Manual smoke test: fetch one fund, print a short summary and dump the
    # cleaned payload to a JSON file in the current working directory.
    api = FundAPI()
    code = "019127"
    print(f"Fetching data for {code}...")
    fund_data = api.get_fund_data(code)
    if not fund_data:
        print("Failed to fetch data.")
    else:
        print("\n=== Data Fetch Success ===")
        fund_name = fund_data['basic_info']['fund_name']
        print(f"Name: {fund_name}")
        manager_count = len(fund_data['fund_managers'])
        print(f"Manager: {manager_count} managers recorded")
        trend = fund_data['net_worth_trend']
        latest = trend[-1] if trend else 'N/A'
        print(f"Latest Net Worth: {latest}")
        # Persist the full cleaned result for manual inspection.
        output_path = f"fund_{code}_full.json"
        with open(output_path, 'w', encoding='utf-8') as out_file:
            json.dump(fund_data, out_file, ensure_ascii=False, indent=2)
        print(f"Saved to {output_path}")