Files
astrbot_plugin_Gitparser/main.py

157 lines
6.1 KiB
Python

import re
import aiohttp
from astrbot.api.event import filter, AstrMessageEvent
from astrbot.api.star import Context, Star, star
from astrbot.api import logger, AstrBotConfig
GITHUB_API_BASE = "https://api.github.com"
_REQUEST_TIMEOUT = aiohttp.ClientTimeout(total=10)
_REPO_PATTERN = re.compile(
r'(?<![a-zA-Z0-9.-])github\.com/([a-zA-Z0-9._-]+)/([a-zA-Z0-9._-]+)'
r'(?:\.git)?'
r'(?:\s|$|[^\w./-])'
)
_RELEASE_TAG_PATTERN = re.compile(
r'(?<![a-zA-Z0-9.-])github\.com/([a-zA-Z0-9._-]+)/([a-zA-Z0-9._-]+)/releases/tag/([^\s/]+)'
r'(?:\s|$|[^\w./-])'
)
_RELEASES_PAGE_PATTERN = re.compile(
r'(?<![a-zA-Z0-9.-])github\.com/([a-zA-Z0-9._-]+)/([a-zA-Z0-9._-]+)/releases'
r'(?:\s|$|[^\w./-])'
)
def _find_first_url(text: str, pattern: re.Pattern) -> re.Match | None:
return pattern.search(text)
def _check_rate_limited(data: dict | None) -> bool:
return isinstance(data, dict) and data.get("error") == "rate_limited"
@star
class GitparserPlugin(Star):
def __init__(self, context: Context, config: AstrBotConfig):
super().__init__(context)
self.config = config
token = config.get("github_token", "").strip()
self._headers = {"Accept": "application/vnd.github+json"}
if token:
self._headers["Authorization"] = f"Bearer {token}"
self._session = aiohttp.ClientSession()
async def terminate(self):
if self._session and not self._session.closed:
await self._session.close()
@filter.event_message_type(filter.EventMessageType.ALL)
async def parse_github_link(self, event: AstrMessageEvent):
text = event.message_str
m = _find_first_url(text, _RELEASE_TAG_PATTERN)
if m:
async for result in self._handle_release_by_tag(event, m.group(1), m.group(2), m.group(3)):
yield result
return
m = _find_first_url(text, _RELEASES_PAGE_PATTERN)
if m:
async for result in self._handle_latest_release(event, m.group(1), m.group(2)):
yield result
return
m = _find_first_url(text, _REPO_PATTERN)
if m:
owner, repo = m.group(1), m.group(2)
async for result in self._handle_repo(event, owner, repo):
yield result
async def _fetch_api(self, path: str) -> dict | None:
url = f"{GITHUB_API_BASE}{path}"
try:
async with self._session.get(url, headers=self._headers, timeout=_REQUEST_TIMEOUT) as resp:
if resp.status == 404:
return None
if resp.status == 429:
logger.warning(f"GitHub API rate limited: {path}")
return {"error": "rate_limited"}
if resp.status != 200:
logger.warning(f"GitHub API error {resp.status}: {path}")
return None
return await resp.json()
except aiohttp.ClientError as e:
logger.error(f"HTTP error fetching {path}: {e}")
return None
except Exception as e:
logger.error(f"Unexpected error fetching {path}: {e}")
return None
async def _handle_repo(self, event: AstrMessageEvent, owner: str, repo: str):
data = await self._fetch_api(f"/repos/{owner}/{repo}")
if data is None:
return
if _check_rate_limited(data):
yield event.plain_result("GitHub API 限流,请稍后再试")
return
full_name = data.get("full_name", f"{owner}/{repo}")
description = data.get("description") or "(无描述)"
html_url = data.get("html_url", "")
stars = data.get("stargazers_count", 0)
forks = data.get("forks_count", 0)
watchers = data.get("watchers_count", 0)
open_issues = data.get("open_issues_count", 0)
language = data.get("language") or "未知"
created_at = data.get("created_at", "")[:10]
updated_at = data.get("updated_at", "")[:10]
license_info = data.get("license")
license_name = license_info["spdx_id"] if license_info and isinstance(license_info, dict) else ""
topics = data.get("topics", [])
lines = [
f"\U0001f4e6 {full_name}",
f"{description}",
f"\U0001f517 {html_url}",
f"\u2b50 {stars:,} \U0001f374 {forks:,} \U0001f441 {watchers:,} \u2757 {open_issues:,}",
f"\U0001f524 {language} \U0001f4c5 Updated {updated_at} \U0001f4c6 Created {created_at}",
]
if topics:
lines.append(f"\U0001f3f7 {' '.join(f'#{t}' for t in topics[:8])}")
lines.append(f"\U0001f513 {license_name}")
yield event.plain_result("\n".join(lines))
def _build_release_message(self, owner: str, repo: str, data: dict, fallback_tag: str = "unknown") -> str:
tag_name = data.get("tag_name", fallback_tag)
name = data.get("name") or tag_name
published_at = data.get("published_at", "")[:10]
zip_url = data.get("zipball_url", "")
lines = [
f"\U0001f680 {owner}/{repo} - {tag_name}",
f"\U0001f4dd {name}",
f"\U0001f4c5 发布于: {published_at}",
f"\U0001f4e6 下载: {zip_url}",
]
return "\n".join(lines)
async def _handle_latest_release(self, event: AstrMessageEvent, owner: str, repo: str):
data = await self._fetch_api(f"/repos/{owner}/{repo}/releases/latest")
if data is None:
return
if _check_rate_limited(data):
yield event.plain_result("GitHub API 限流,请稍后再试")
return
yield event.plain_result(self._build_release_message(owner, repo, data))
async def _handle_release_by_tag(self, event: AstrMessageEvent, owner: str, repo: str, tag: str):
data = await self._fetch_api(f"/repos/{owner}/{repo}/releases/tags/{tag}")
if data is None:
return
if _check_rate_limited(data):
yield event.plain_result("GitHub API 限流,请稍后再试")
return
yield event.plain_result(self._build_release_message(owner, repo, data, tag))