三层防御: 1. _pubg_lookup_tool 检测 player_name 类型,非 str 时从 AstrMessageEvent 恢复 2. _fetch_all 增加 player_name/platform 类型断言和自动转换 3. _api_request 参数清洗,过滤非 str/int/float 的值 v1.4.2
672 lines
25 KiB
Python
672 lines
25 KiB
Python
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import os
|
||
import tempfile
|
||
from dataclasses import dataclass
|
||
from datetime import datetime
|
||
from typing import TYPE_CHECKING, Optional
|
||
|
||
import aiohttp
|
||
from astrbot.core.star.star_tools import StarTools
|
||
from astrbot.api import AstrBotConfig, logger
|
||
from astrbot.api.event import filter, AstrMessageEvent
|
||
from astrbot.api.star import Context, Star, register
|
||
|
||
try:
|
||
from PIL import Image, ImageDraw, ImageFont
|
||
PIL_OK = True
|
||
except ImportError:
|
||
PIL_OK = False
|
||
|
||
if TYPE_CHECKING:
|
||
from PIL import ImageDraw as IDraw
|
||
from PIL.ImageFont import FreeTypeFont, ImageFont as FontClass
|
||
|
||
_MAP_NAMES = {
|
||
"Baltic_Main": "Erangel",
|
||
"Desert_Main": "Miramar",
|
||
"Savage_Main": "Sanhok",
|
||
"DihorOtok_Main": "Vikendi",
|
||
"Summerland_Main": "Karakin",
|
||
"Tiger_Main": "Taego",
|
||
"Kiki_Main": "Deston",
|
||
"Neon_Main": "Rondo",
|
||
"Range_Main": "Camp Jackal",
|
||
"Chimera_Main": "Paramo",
|
||
"Heaven_Main": "Haven",
|
||
}
|
||
|
||
_MODE_LABELS = {
|
||
"squad-fpp": "四排FPP",
|
||
"squad": "四排TPP",
|
||
"duo-fpp": "双排FPP",
|
||
"duo": "双排TPP",
|
||
"solo-fpp": "单排FPP",
|
||
"solo": "单排TPP",
|
||
}
|
||
|
||
_BAN_STATUS_LABELS = {
|
||
"PermanentBan": "永久封禁",
|
||
"TemporaryBan": "临时封禁",
|
||
"Banned": "封禁中",
|
||
}
|
||
|
||
# ── 颜色主题 ──
|
||
BG = (15, 20, 30)
|
||
CARD = (25, 32, 48)
|
||
ACCENT = (255, 180, 30)
|
||
ACCENT2 = (80, 160, 255)
|
||
WHITE = (240, 240, 240)
|
||
GRAY = (140, 150, 170)
|
||
WIN_CLR = (80, 220, 120)
|
||
SEP = (40, 50, 70)
|
||
BAN_CLR = (255, 70, 70)
|
||
WARN_CLR = (255, 200, 50)
|
||
|
||
PAD = 32
|
||
COL_W = 560
|
||
FONT_DIR = os.path.join(os.path.dirname(__file__), "fonts")
|
||
|
||
API_TIMEOUT = 15
|
||
API_MAX_RETRY = 2
|
||
MATCH_LIMIT = 7
|
||
|
||
|
||
@dataclass
class PlayerInfo:
    """Identity and ban status of a player resolved through the PUBG API."""

    # Account id taken from the players endpoint response.
    id: str
    # Player name as returned by the API (not necessarily what the user typed).
    name: str
    # Shard the player was looked up on.
    platform: str
    # Raw ban type string, or None when the account is not banned.
    ban_type: Optional[str]
|
||
|
||
|
||
def _load_font(size: int, bold: bool = False) -> FreeTypeFont | FontClass:
    """Locate a usable font and load it at ``size``.

    Candidates are tried in this order and the first one that loads wins:

    1. any ``.ttf`` / ``.ttc`` / ``.otf`` file in ``FONT_DIR`` (sorted by name),
    2. a fixed list of well-known Windows / Linux font paths,
    3. ``fc-match`` for a list of CJK family patterns,
    4. every file reported by ``fc-list :lang=zh``,
    5. Pillow's built-in default font.

    ``bold`` only affects step 2; the other steps take whatever they find.
    """

    def _open(font_path: str, **options):
        # A font that fails to load is not fatal: report None so the caller
        # simply moves on to the next candidate.
        try:
            return ImageFont.truetype(font_path, size, **options)
        except Exception:
            return None

    # 1. Fonts bundled next to the plugin.
    if os.path.isdir(FONT_DIR):
        for entry in sorted(os.listdir(FONT_DIR)):
            if not entry.lower().endswith((".ttf", ".ttc", ".otf")):
                continue
            font = _open(os.path.join(FONT_DIR, entry))
            if font is not None:
                return font

    # 2. Well-known system paths, stored as (regular, bold) pairs.
    known_pairs = (
        ("C:/Windows/Fonts/msyh.ttc", "C:/Windows/Fonts/msyhbd.ttc"),
        ("C:/Windows/Fonts/simsun.ttc", "C:/Windows/Fonts/simhei.ttf"),
        (
            "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
            "/usr/share/fonts/truetype/noto/NotoSansCJK-Bold.ttc",
        ),
        (
            "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
            "/usr/share/fonts/opentype/noto/NotoSansCJK-Bold.ttc",
        ),
        (
            "/usr/share/fonts/truetype/noto/NotoSerifCJK-Regular.ttc",
            "/usr/share/fonts/truetype/noto/NotoSerifCJK-Bold.ttc",
        ),
        (
            "/usr/share/fonts/opentype/noto/NotoSerifCJK-Regular.ttc",
            "/usr/share/fonts/opentype/noto/NotoSerifCJK-Bold.ttc",
        ),
    )
    variant = 1 if bold else 0
    for pair in known_pairs:
        system_path = pair[variant]
        if os.path.exists(system_path):
            font = _open(system_path, encoding="unic")
            if font is not None:
                return font

    # 3 + 4. Ask fontconfig. Any failure here (missing binary, timeout, ...)
    # abandons both steps and falls through to the built-in font.
    try:
        import subprocess

        fc_patterns = (
            "Noto Sans CJK SC:lang=zh",
            "Noto Sans CJK:lang=zh",
            "WenQuanYi Micro Hei:lang=zh",
            "WenQuanYi Zen Hei:lang=zh",
            "Droid Sans Fallback:lang=zh",
            "sans:lang=zh",
        )
        # 3. Specific CJK families first.
        for fc_pattern in fc_patterns:
            matched = subprocess.run(
                ["fc-match", "-f", "%{file}", fc_pattern],
                capture_output=True, text=True, timeout=5,
            )
            located = matched.stdout.strip()
            if not located:
                continue
            font = _open(located, encoding="unic")
            if font is not None:
                return font

        # 4. Otherwise take any font that declares Chinese coverage.
        listing = subprocess.run(
            ["fc-list", ":lang=zh", "-f", "%{file}\n"],
            capture_output=True, text=True, timeout=5,
        )
        if listing.returncode == 0:
            for raw_line in listing.stdout.strip().splitlines():
                located = raw_line.strip()
                if not located:
                    continue
                font = _open(located, encoding="unic")
                if font is not None:
                    return font
    except Exception:
        pass

    # 5. Last resort: Pillow's built-in font.
    return ImageFont.load_default()
|
||
|
||
|
||
def _text_w(draw: IDraw.ImageDraw, text: str, font: FreeTypeFont | FontClass) -> int:
|
||
bbox = draw.textbbox((0, 0), text, font=font)
|
||
return bbox[2] - bbox[0]
|
||
|
||
def _truncate_text(draw: IDraw.ImageDraw, text: str, font: FreeTypeFont | FontClass, max_px: int) -> str:
    """Shorten ``text`` so that it fits into ``max_px`` pixels.

    Text that already fits is returned unchanged. Otherwise trailing
    characters are removed one at a time until the remainder plus an
    ellipsis fits; if nothing fits, the result is the ellipsis alone.
    """
    if _text_w(draw, text, font) <= max_px:
        return text

    ellipsis = "…"
    clipped = text
    while clipped:
        if _text_w(draw, clipped + ellipsis, font) <= max_px:
            break
        clipped = clipped[:-1]
    return clipped + ellipsis
|
||
|
||
|
||
def _render_image(
    name: str,
    platform: str,
    gm_stats: dict,
    player_id: str,
    match_results: list,
    ban_type: Optional[str] = None,
) -> str:
    """Render the player's stats card as a PNG and return the file path.

    Args:
        name: Player name shown in the header (truncated if too wide).
        platform: Shard name; shown upper-cased in the header.
        gm_stats: Mapping of game-mode key to that mode's lifetime stats dict.
        player_id: Account id used to find the player inside each match.
        match_results: Raw match payloads; entries that cannot be parsed
            for this player are skipped.
        ban_type: Ban type string, or a falsy value when not banned.

    Returns:
        Path of a temporary ``.png`` file. The caller owns the file and is
        responsible for deleting it.

    Raises:
        Exception: whatever Pillow raises while saving; the temporary file
            is removed before the exception propagates.
    """
    # Only modes with a non-zero round count get a card. This also keeps the
    # per-round averages below free of division by zero.
    mode_rows = []
    for mode_key, mode_label in _MODE_LABELS.items():
        stats = gm_stats.get(mode_key, {})
        if stats.get("roundsPlayed", 0):
            mode_rows.append((mode_label, stats))

    match_rows = [
        entry
        for entry in (_parse_match(match_data, player_id) for match_data in match_results)
        if entry
    ]

    # Truthiness (not "is not None") so an empty ban string counts as
    # "not banned", the same rule the text renderer applies.
    is_banned = bool(ban_type)
    ban_label = _BAN_STATUS_LABELS.get(ban_type, ban_type) if is_banned else None

    H_HEADER = 90
    H_BAN_BAR = 32 if is_banned else 0
    H_SEC_TITLE = 44
    H_MODE_ROW = 110
    H_MATCH_ROW = 80
    H_EMPTY = 36  # height of the "no data" line shown when no mode was played
    H_FOOTER = 36

    modes_h = len(mode_rows) * (H_MODE_ROW + 10) if mode_rows else H_EMPTY
    matches_h = (
        PAD // 2 + H_SEC_TITLE + len(match_rows) * (H_MATCH_ROW + 8)
        if match_rows else 0
    )
    total_h = (
        PAD + H_HEADER + PAD + H_BAN_BAR
        + H_SEC_TITLE + modes_h
        + matches_h
        + H_FOOTER + PAD
    )

    W = COL_W + PAD * 2
    img = Image.new("RGB", (W, total_h), BG)
    draw = ImageDraw.Draw(img)

    f_big = _load_font(28, bold=True)
    f_med = _load_font(20, bold=True)
    f_norm = _load_font(18)
    f_small = _load_font(15)

    # ── Header card: player name + platform on the left, title on the right ──
    y = PAD
    title = "PUBG 战绩"
    title_w = _text_w(draw, title, f_med)
    draw.rectangle([PAD, y, W - PAD, y + H_HEADER - 10], fill=CARD, outline=ACCENT, width=2)
    # Leave room for the title so a long name cannot run into it.
    name_max_w = COL_W - 36 - title_w - 28
    draw.text((PAD + 18, y + 14), _truncate_text(draw, name, f_big, name_max_w), font=f_big, fill=ACCENT)
    draw.text((PAD + 18, y + 50), f"[{platform.upper()}]", font=f_norm, fill=GRAY)
    draw.text((W - PAD - 18 - title_w, y + 28), title, font=f_med, fill=ACCENT2)
    y += H_HEADER + PAD // 2

    # ── Ban bar (only for banned accounts) ──
    if is_banned:
        perm = ban_type in ("PermanentBan", "Banned")
        ban_color = BAN_CLR if perm else WARN_CLR
        draw.rectangle([PAD, y, W - PAD, y + H_BAN_BAR], fill=(40, 20, 20) if perm else (40, 35, 10))
        draw.rectangle([PAD, y, PAD + 8, y + H_BAN_BAR], fill=ban_color)
        draw.text((PAD + 18, y + 4), f"⚠ 账号状态: {ban_label}", font=f_norm, fill=ban_color)
        y += H_BAN_BAR

    # ── Lifetime stats section ──
    draw.text((PAD, y), "◆ 终身战绩", font=f_med, fill=ACCENT)
    draw.line([(PAD, y + 30), (W - PAD, y + 30)], fill=ACCENT, width=1)
    y += H_SEC_TITLE

    if not mode_rows:
        # Same empty-state message the text renderer shows.
        draw.text((PAD + 16, y + 4), "暂无战绩数据", font=f_norm, fill=GRAY)
        y += H_EMPTY

    col1_x = PAD + 16
    col2_x = PAD + 16 + (COL_W // 3)
    col3_x = PAD + 16 + (COL_W // 3) * 2

    for mode_label, s in mode_rows:
        rounds = s.get("roundsPlayed", 0)
        wins = s.get("wins", 0)
        top10 = s.get("top10s", 0)
        kills = s.get("kills", 0)
        damage = s.get("damageDealt", 0.0)
        survived = s.get("timeSurvived", 0.0)

        # rounds is non-zero here (see the filter that builds mode_rows).
        kd = kills / rounds
        win_pct = wins / rounds * 100
        top10_pct = top10 / rounds * 100
        avg_dmg = damage / rounds
        avg_min = survived / rounds / 60

        draw.rectangle([PAD, y, W - PAD, y + H_MODE_ROW], fill=CARD)
        draw.rectangle([PAD, y, PAD + 8, y + H_MODE_ROW], fill=ACCENT2)
        draw.text((PAD + 16, y + 10), mode_label, font=f_med, fill=WHITE)

        row2_y = y + 42
        row3_y = y + 72

        draw.text((col1_x, row2_y), f"场次 {rounds}", font=f_norm, fill=GRAY)
        draw.text((col2_x, row2_y), f"胜场 {wins} ({win_pct:.1f}%)", font=f_norm, fill=WIN_CLR)
        draw.text((col3_x, row2_y), f"Top10 {top10} ({top10_pct:.1f}%)", font=f_norm, fill=GRAY)
        draw.text((col1_x, row3_y), f"K/D {kd:.2f}", font=f_norm, fill=ACCENT)
        draw.text((col2_x, row3_y), f"场均伤害 {avg_dmg:.0f}", font=f_norm, fill=WHITE)
        draw.text((col3_x, row3_y), f"场均存活 {avg_min:.1f}min", font=f_norm, fill=GRAY)

        y += H_MODE_ROW + 10

    # ── Recent matches section ──
    if match_rows:
        y += PAD // 2
        draw.text((PAD, y), "◆ 最近对局", font=f_med, fill=ACCENT)
        draw.line([(PAD, y + 30), (W - PAD, y + 30)], fill=ACCENT, width=1)
        y += H_SEC_TITLE

        for idx, entry in enumerate(match_rows, 1):
            is_win = entry["place"] == 1
            card_color = (30, 55, 35) if is_win else CARD
            draw.rectangle([PAD, y, W - PAD, y + H_MATCH_ROW], fill=card_color)
            draw.rectangle([PAD, y, PAD + 8, y + H_MATCH_ROW], fill=(WIN_CLR if is_win else ACCENT2))

            rank_text = f"#{entry['place']}"
            rank_color = WIN_CLR if is_win else WHITE

            header_line = f"[{idx}] {entry['date']} {entry['mode']} {entry['map']}"
            draw.text((PAD + 16, y + 8), header_line, font=f_small, fill=GRAY)
            draw.text((W - PAD - 18 - _text_w(draw, rank_text, f_med), y + 6), rank_text, font=f_med, fill=rank_color)

            stats_line = (
                f"击杀 {entry['kills']} 伤害 {entry['damage']:.0f} "
                f"助攻 {entry['assists']} 爆头 {entry['headshots']} "
                f"最远 {entry['longest']:.0f}m 存活 {entry['survive']:.1f}min"
            )
            draw.text((PAD + 16, y + 36), stats_line, font=f_norm, fill=WHITE)

            y += H_MATCH_ROW + 8

    # ── Footer ──
    y += PAD // 2
    ts = datetime.now().strftime("%Y-%m-%d %H:%M")
    draw.text((PAD, y), f"数据来源: api.pubg.com · {ts}", font=f_small, fill=SEP)

    # Reserve a unique path, then release our handle before Pillow opens the
    # path itself, so the file is never held open twice.
    tmp = tempfile.NamedTemporaryFile(suffix=".png", delete=False)
    tmp.close()
    try:
        img.save(tmp.name, format="PNG")
    except Exception:
        os.remove(tmp.name)
        raise
    return tmp.name
|
||
|
||
|
||
def _render_text(
    name: str,
    platform: str,
    gm_stats: dict,
    player_id: str,
    match_results: list,
    ban_type: Optional[str] = None,
) -> str:
    """Render the player's stats as plain text.

    Args:
        name: Player name shown in the title box.
        platform: Shard name; shown upper-cased.
        gm_stats: Mapping of game-mode key to that mode's lifetime stats dict.
        player_id: Account id used to find the player inside each match.
        match_results: Raw match payloads; entries that cannot be parsed
            for this player are skipped.
        ban_type: Ban type string, or a falsy value when not banned.

    Returns:
        The report as a single newline-joined string without trailing
        whitespace.
    """
    W = 38
    bar = "─" * W
    lines = [
        "┌" + "─" * W + "┐",
        "│" + f" {name} [{platform.upper()}]".center(W) + "│",
        "└" + "─" * W + "┘",
    ]

    if ban_type:
        ban_label = _BAN_STATUS_LABELS.get(ban_type, ban_type)
        lines += ["", f"⚠ 账号状态: {ban_label}", ""]
    else:
        lines += ["", "✓ 账号状态: 正常", ""]

    lines += ["", "◆ 终身战绩", bar]

    has_any = False
    for mode_key, mode_label in _MODE_LABELS.items():
        s = gm_stats.get(mode_key, {})
        rounds = s.get("roundsPlayed", 0)
        if not rounds:
            # Unplayed mode: nothing to show, and no per-round average exists.
            continue
        has_any = True
        wins = s.get("wins", 0)
        top10 = s.get("top10s", 0)
        kills = s.get("kills", 0)
        assists = s.get("assists", 0)
        damage = s.get("damageDealt", 0.0)
        kd = kills / rounds
        lines += [
            f"▌{mode_label}",
            f" 场次 {rounds} 胜场 {wins}({wins/rounds*100:.1f}%) Top10 {top10}({top10/rounds*100:.1f}%)",
            f" 击杀 {kills} 助攻 {assists} K/D {kd:.2f}",
            f" 场均伤害 {damage/rounds:.0f} 场均存活 {s.get('timeSurvived',0)/rounds/60:.1f}min",
            "",
        ]
    if not has_any:
        lines += [" 暂无战绩数据", ""]

    # Parse first, then number: indices stay contiguous when a match is
    # skipped, and the section header is omitted when nothing is left to
    # list. This matches how the image renderer treats matches.
    parsed_matches = [
        e for e in (_parse_match(md, player_id) for md in match_results) if e
    ]
    if parsed_matches:
        lines += ["◆ 最近对局", bar]
        for idx, e in enumerate(parsed_matches, 1):
            tag = "吃鸡" if e["place"] == 1 else f"#{e['place']}"
            lines += [
                f"[{idx}] {e['date']} {e['mode']} {e['map']}",
                f" 排名 {tag} 击杀 {e['kills']} 伤害 {e['damage']:.0f}",
                f" 助攻 {e['assists']} 爆头 {e['headshots']} 最远 {e['longest']:.0f}m 存活 {e['survive']:.1f}min",
                "",
            ]

    return "\n".join(lines).rstrip()
|
||
|
||
|
||
def _parse_match(match_data: dict, player_id: str) -> Optional[dict]:
    """Extract one player's result from a raw match payload.

    Args:
        match_data: Match response; read from ``data.attributes`` and the
            ``included`` list of participants.
        player_id: Account id to look for among the participants.

    Returns:
        A dict with the keys ``date``, ``mode``, ``map``, ``place``,
        ``kills``, ``assists``, ``damage``, ``headshots``, ``longest`` and
        ``survive`` (minutes), or ``None`` when the player is not in the
        match or the payload does not have the expected structure.
    """
    try:
        attrs = match_data["data"]["attributes"]
        raw_map = attrs.get("mapName", "")
        raw_mode = attrs.get("gameMode", "")
        # Unknown identifiers are shown as-is rather than dropped.
        map_name = _MAP_NAMES.get(raw_map, raw_map)
        mode_label = _MODE_LABELS.get(raw_mode, raw_mode)
        date_str = _fmt_date(attrs.get("createdAt", ""))

        for item in match_data.get("included", []):
            if item.get("type") != "participant":
                continue
            stats = item["attributes"]["stats"]
            if stats.get("playerId") != player_id:
                continue
            return {
                "date": date_str, "mode": mode_label, "map": map_name,
                "place": stats.get("winPlace", 0),
                "kills": stats.get("kills", 0),
                "assists": stats.get("assists", 0),
                "damage": stats.get("damageDealt", 0.0),
                "headshots": stats.get("headshotKills", 0),
                "longest": stats.get("longestKill", 0.0),
                "survive": stats.get("timeSurvived", 0.0) / 60,
            }
    except (KeyError, TypeError, AttributeError):
        # AttributeError covers nested values that are present but are not
        # dicts (e.g. "stats": null). Such a match is skipped like any other
        # malformed one instead of aborting the whole render.
        pass
    return None
|
||
|
||
|
||
def _fmt_date(iso: str) -> str:
|
||
try:
|
||
dt = datetime.fromisoformat(iso.replace("Z", "+00:00"))
|
||
return dt.astimezone().strftime("%m-%d %H:%M")
|
||
except Exception:
|
||
return iso[:10]
|
||
|
||
|
||
async def _api_request(
    session: aiohttp.ClientSession,
    url: str,
    params: Optional[dict] = None,
    retry: int = 0,
    request_timeout: int = 10,
) -> dict:
    """GET ``url`` and return the decoded JSON body.

    Args:
        session: Session used to issue the request.
        url: Absolute request URL.
        params: Optional query parameters. Values that are not ``str``,
            ``int`` or ``float`` (``bool`` included) are dropped.
        retry: Number of additional attempts after the first one. Only
            HTTP 429 and network/timeout errors are retried, with a
            2s, 4s, 8s, ... back-off.
        request_timeout: Total timeout per attempt, in seconds.

    Returns:
        The parsed JSON response.

    Raises:
        PubgApiError: on any non-200 status, on network failure once the
            retries are used up, or when the body is not valid JSON.
    """
    # Parameter sanitising: keep only plain scalar values so that stray
    # objects (such as an event) never reach the URL builder. bool is an int
    # subclass but is not accepted as a query value, so it is excluded too.
    if params is not None:
        params = {
            key: value
            for key, value in params.items()
            if isinstance(value, (str, int, float)) and not isinstance(value, bool)
        }

    # Statuses for which retrying cannot help.
    fatal_statuses = {
        404: "资源不存在 (404)",
        401: "API Key 无效或已过期,请检查配置。",
        403: "无权限访问该资源,请检查 API Key 权限。",
    }
    timeout = aiohttp.ClientTimeout(total=request_timeout)

    for attempt in range(retry + 1):
        wait = 2 ** (attempt + 1)
        try:
            async with session.get(url, params=params, timeout=timeout) as resp:
                status = resp.status
                if status in fatal_statuses:
                    raise PubgApiError(fatal_statuses[status])
                if status == 429:
                    if attempt < retry:
                        logger.warning(f"[pubg_plugin] 触发限流,{wait}s 后重试…")
                        await asyncio.sleep(wait)
                        continue
                    raise PubgApiError("请求过于频繁,请稍后再试。")
                if status != 200:
                    raise PubgApiError(f"API 请求失败 (HTTP {status})")
                # content_type=None: decode regardless of the declared type.
                return await resp.json(content_type=None)
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            if attempt < retry:
                logger.warning(f"[pubg_plugin] 请求异常 ({e}),{wait}s 后重试…")
                await asyncio.sleep(wait)
                continue
            raise PubgApiError(f"网络请求失败: {e}") from e
        except ValueError as e:
            # A 200 response whose body is not valid JSON. Surface it as the
            # error type callers already handle.
            raise PubgApiError("API 返回了无法解析的响应数据。") from e

    # Only reachable when retry is negative and the loop never runs.
    raise PubgApiError("请求失败(已达最大重试次数)")
|
||
|
||
|
||
@register(
    "astrbot_plugin_pubg",
    "sakuradairong",
    "PUBG 玩家战绩查询插件",
    "1.4.2",
    "https://github.com/sakuradairong/astrbot_plugin_pubg",
)
class PubgPlugin(Star):
    """AstrBot plugin that looks up PUBG player stats.

    Two entry points are offered: the ``/pubg`` chat command
    (``query_stats``) and an LLM function-calling tool
    (``_pubg_lookup_tool``). Both go through ``_fetch_all``.
    """

    # Shards accepted by both entry points. The platform becomes part of the
    # request URL path, so it is always checked against this set.
    _VALID_PLATFORMS = frozenset({"steam", "psn", "xbox", "kakao", "stadia"})

    def __init__(self, context: Context, config: AstrBotConfig | None = None):
        """Store the configuration and register the LLM tool.

        A failure to register the tool is logged and does not prevent the
        plugin from loading.
        """
        super().__init__(context)
        self.config = config
        self.api_base = "https://api.pubg.com/shards"
        logger.info(f"[pubg_plugin] 插件已加载,Pillow={'可用' if PIL_OK else '不可用(将回退为文字输出)'}")
        if not PIL_OK:
            logger.warning("[pubg_plugin] 未安装 Pillow,将回退为文字输出。pip install Pillow")
        # Register as LLM function-calling tool for agent mode
        try:
            StarTools.register_llm_tool(
                name="pubg_query",
                func_args=[
                    {"type": "string", "name": "player_name", "description": "PUBG 玩家游戏昵称,必须精确匹配"},
                    {"type": "string", "name": "platform", "description": "游戏平台,可选: steam(默认), psn, xbox, kakao, stadia"},
                ],
                desc="查询 PUBG 玩家终身战绩和最近对局数据,包括 K/D、胜率、场均伤害、场均存活时间。当用户要求查询 PUBG 战绩时调用此工具。",
                func_obj=self._pubg_lookup_tool,
            )
            logger.info("[pubg_plugin] 已注册 LLM 工具 pubg_query")
        except Exception as e:
            logger.warning(f"[pubg_plugin] 注册 LLM 工具失败: {e}")

    def _get_api_key(self) -> str:
        """Return the configured API key, or an empty string without config."""
        if self.config is None:
            return ""
        return self.config.get("api_key", "")

    def _get_platform(self) -> str:
        """Return the configured default platform, ``"steam"`` if unset."""
        if self.config is None:
            return "steam"
        return self.config.get("default_platform", "steam")

    @filter.command("pubg", alias={"查ID", "查询"})
    async def query_stats(self, event: AstrMessageEvent):
        """Handle ``/pubg <player> [platform]``.

        Yields a usage hint, a validation error, or a progress message
        followed by the result: an image when Pillow is available,
        plain text otherwise.
        """
        logger.info(f"[pubg_plugin] query_stats 被调用: message={event.message_str!r}")
        parts = event.message_str.strip().split()
        if len(parts) < 2:
            yield event.plain_result(
                "用法: /pubg <玩家名> [平台]\n"
                "平台可选: steam(默认) | psn | xbox | kakao | stadia\n"
                "示例: /pubg shroud steam"
            )
            return

        player_name = parts[1]
        platform = parts[2].lower() if len(parts) > 2 else self._get_platform()
        logger.info(f"[pubg_plugin] 开始查询: player={player_name}, platform={platform}")

        if platform not in self._VALID_PLATFORMS:
            yield event.plain_result(
                f"不支持的平台: {platform}\n可选: {', '.join(sorted(self._VALID_PLATFORMS))}"
            )
            return

        api_key = self._get_api_key()
        if not api_key:
            yield event.plain_result(
                "未配置 PUBG API Key,请在插件配置中填写 api_key。\n"
                "申请地址: https://developer.pubg.com/"
            )
            return

        yield event.plain_result(f"正在查询 {player_name} 的战绩,请稍候…")

        tmp_path = None
        try:
            player_info, gm_stats, match_results = await self._fetch_all(player_name, platform, api_key)

            if PIL_OK:
                tmp_path = _render_image(
                    player_info.name, player_info.platform,
                    gm_stats, player_info.id, match_results,
                    ban_type=player_info.ban_type,
                )
                yield event.image_result(tmp_path)
            else:
                text = _render_text(
                    player_info.name, player_info.platform,
                    gm_stats, player_info.id, match_results,
                    ban_type=player_info.ban_type,
                )
                yield event.plain_result(text)

        except PubgApiError as e:
            yield event.plain_result(str(e))
        except Exception as e:
            logger.error(f"[pubg_plugin] 查询异常: {e}")
            yield event.plain_result("查询时发生未知错误,请稍后重试。")
        finally:
            # Best-effort cleanup of the rendered image.
            if tmp_path and os.path.exists(tmp_path):
                try:
                    os.remove(tmp_path)
                except Exception:
                    pass

    async def _pubg_lookup_tool(self, event, player_name: str = "", platform: str = "steam") -> str:
        """PUBG lookup function exposed to the agent as an LLM tool.

        Always returns text: the rendered stats on success, otherwise a
        message describing what went wrong. It never raises into the
        tool-calling machinery.
        """
        # === Argument defence: make sure player_name is a plain string and
        # not an event object ===
        if not isinstance(player_name, str):
            logger.warning(f"[pubg_plugin] _pubg_lookup_tool: player_name 类型异常 ({type(player_name).__name__}), 尝试修复")
            if isinstance(player_name, AstrMessageEvent):
                # The event was passed in as player_name; try to recover the
                # player name from the message text.
                raw = player_name.message_str.strip()
                parts = raw.split()
                # The text may be "/pubg <name>", "查询 <name>" or just "<name>".
                if len(parts) >= 2 and parts[0] in ("/pubg", "pubg", "查ID", "查询"):
                    player_name = parts[1]
                elif len(parts) >= 1:
                    player_name = parts[0]
                else:
                    return "请提供玩家昵称。用法: /pubg <玩家名> [平台]"
            else:
                player_name = str(player_name)

        # Make sure platform is a string.
        if not isinstance(platform, str):
            platform = str(platform) if isinstance(platform, (int, float)) else "steam"

        # Reject an empty player name.
        player_name = player_name.strip()
        if not player_name:
            return "玩家昵称不能为空。用法: /pubg <玩家名> [平台]"

        # The platform ends up in the URL path, so normalise and validate it
        # exactly as the chat command does. An empty value falls back to the
        # configured default.
        platform = platform.strip().lower() or self._get_platform()
        if platform not in self._VALID_PLATFORMS:
            return f"不支持的平台: {platform}\n可选: {', '.join(sorted(self._VALID_PLATFORMS))}"

        api_key = self._get_api_key()
        if not api_key:
            return "未配置 PUBG API Key,请在插件配置中填写 api_key。"

        try:
            info, stats, matches = await self._fetch_all(player_name, platform, api_key)
            return _render_text(info.name, info.platform, stats, info.id, matches, info.ban_type)
        except PubgApiError as e:
            return str(e)
        except Exception as e:
            # Same fallback as the chat command: log, then answer with a
            # generic message instead of propagating.
            logger.error(f"[pubg_plugin] 查询异常: {e}")
            return "查询时发生未知错误,请稍后重试。"

    async def _fetch_all(self, player_name: str, platform: str, api_key: str):
        """Fetch player identity, lifetime stats and recent matches.

        Args:
            player_name: Name to look up.
            platform: Shard name; becomes part of the request path.
            api_key: Bearer token for the API.

        Returns:
            A ``(PlayerInfo, game_mode_stats, match_payloads)`` tuple.
            Matches whose request failed are left out.

        Raises:
            PubgApiError: when the player is not found, the lifetime request
                fails, or a response lacks the expected fields.
        """
        # Argument defence: make sure player_name and platform are strings.
        if not isinstance(player_name, str):
            logger.warning(f"[pubg_plugin] _fetch_all: player_name 类型异常 ({type(player_name).__name__}), 强制转换")
            player_name = str(player_name)
        if not isinstance(platform, str):
            platform = str(platform)

        headers = {
            "Authorization": f"Bearer {api_key}",
            "Accept": "application/vnd.api+json",
        }

        async with aiohttp.ClientSession(headers=headers) as session:
            player_data = await _api_request(
                session,
                f"{self.api_base}/{platform}/players",
                params={"filter[playerNames]": player_name},
                retry=API_MAX_RETRY,
                request_timeout=API_TIMEOUT,
            )

            try:
                players = player_data.get("data")
                if not players:
                    raise PubgApiError(f"找不到玩家: {player_name}(平台: {platform})")

                player = players[0]
                player_id = player["id"]
                player_name_real = player["attributes"]["name"]
                raw_ban = player["attributes"].get("banType")
                match_ids = [
                    m["id"]
                    for m in player.get("relationships", {})
                    .get("matches", {})
                    .get("data", [])
                ][:MATCH_LIMIT]
            except (KeyError, IndexError, TypeError, AttributeError) as e:
                # The response does not have the expected structure.
                raise PubgApiError("API 返回的玩家数据格式异常。") from e

            # These two values mean "not banned"; anything else is shown.
            ban_type = raw_ban if raw_ban and raw_ban not in ("Innocent", "NotBanned") else None

            logger.info(f"[pubg_plugin] 玩家 {player_name_real} banType={ban_type}")
            logger.info(f"[pubg_plugin] 获取该玩家的 {len(match_ids)} 场对局详情")

            # Lifetime stats and all match details are requested together.
            # return_exceptions=True keeps one failed match from cancelling
            # the rest.
            results = await asyncio.gather(
                _api_request(
                    session,
                    f"{self.api_base}/{platform}/players/{player_id}/seasons/lifetime",
                    retry=API_MAX_RETRY,
                    request_timeout=API_TIMEOUT,
                ),
                *[
                    _api_request(
                        session,
                        f"{self.api_base}/{platform}/matches/{mid}",
                        retry=API_MAX_RETRY,
                        request_timeout=API_TIMEOUT,
                    )
                    for mid in match_ids
                ],
                return_exceptions=True,
            )

        lifetime_data = results[0]
        match_results_raw = results[1:]

        # Lifetime stats are mandatory; matches are optional extras.
        if isinstance(lifetime_data, Exception):
            if isinstance(lifetime_data, PubgApiError):
                raise lifetime_data
            raise PubgApiError(f"获取生涯数据失败: {lifetime_data}")

        match_results = [
            r for r in match_results_raw
            if not isinstance(r, Exception)
        ]
        if match_results_raw and not match_results:
            logger.warning("[pubg_plugin] 所有对局详情获取失败,仅显示生涯数据")

        try:
            gm_stats = lifetime_data["data"]["attributes"]["gameModeStats"]
        except (KeyError, TypeError) as e:
            raise PubgApiError("API 返回的生涯数据格式异常。") from e

        player_info = PlayerInfo(
            id=player_id,
            name=player_name_real,
            platform=platform,
            ban_type=ban_type,
        )
        return player_info, gm_stats, match_results
|
||
|
||
|
||
class PubgApiError(Exception):
    """Error raised for failed PUBG API calls.

    The message is written for end users: callers send ``str(error)``
    straight back to the chat.
    """
|