根因:linux.do 登录表单启用 hCaptcha 人机验证,自动化浏览器无法通过, 旧自动登录永远不可能成功;而抓取的 _forum_session 是匿名会话本就存在的 cookie,误报「自动登录成功」,导致受限主题一直 404。 修复: - 移除无效的账号密码自动登录(_auto_login_and_capture) - Cookie 改为每个 StealthySession 会话都重新注入(旧代码跨请求丢失) - 登录校验端点改用 /notifications.json(匿名 403 / 登录 200), 弃用对匿名也返回 404 的 /session/current_user.json - Cookie 配置支持多格式:完整 Cookie 头、单 name=value、裸值(向后兼容) linuxdo_username/password 保留仅为兼容,不再生效。
1182 lines
45 KiB
Python
1182 lines
45 KiB
Python
"""
|
||
astrbot_plugin_linuxdo - LinuxDo 链接检测 & 预览截图插件
|
||
|
||
检测聊天消息中的 linux.do 链接,使用 Scrapling 的 StealthySession
|
||
绕过 Cloudflare Turnstile,分两步:先 fetch 拿文本 + cookies,
|
||
再新建标签页截图(复用 cf_clearance,不重复触发验证)。
|
||
"""
|
||
|
||
import re
|
||
import asyncio
|
||
import time
|
||
import hashlib
|
||
from pathlib import Path
|
||
from concurrent.futures import ThreadPoolExecutor
|
||
import html as html_mod
|
||
import threading
|
||
|
||
from astrbot.core.utils.astrbot_path import get_astrbot_data_path
|
||
|
||
from astrbot.api.event import filter, AstrMessageEvent
|
||
from astrbot.api.star import Context, Star
|
||
from astrbot.api import logger
|
||
from astrbot.api import AstrBotConfig
|
||
|
||
try:
|
||
from scrapling.fetchers import StealthySession as _StealthySession
|
||
from lxml import html as _lh
|
||
SCRAPLING_AVAILABLE = True
|
||
except ImportError:
|
||
SCRAPLING_AVAILABLE = False
|
||
_StealthySession = None
|
||
_lh = None
|
||
|
||
# Shared worker pool: on_message hands the blocking _fetch_preview call to it
# via run_in_executor, so at most two previews run at the same time.
_EXECUTOR = ThreadPoolExecutor(max_workers=2, thread_name_prefix="linuxdo")
|
||
|
||
|
||
class LinuxDoPreviewPlugin(Star):
    """LinuxDo link preview plugin."""
|
||
|
||
def __init__(self, context: Context, config: AstrBotConfig):
    """Prepare the screenshot cache directory, counters and login state.

    Args:
        context: AstrBot plugin context, forwarded to the base class.
        config: Plugin configuration, kept as ``self.config``.
    """
    super().__init__(context)
    self.config = config

    # Screenshots are cached under <astrbot data>/plugin_data/astrbot_plugin_linuxdo/.
    plugin_root = Path(get_astrbot_data_path()) / "plugin_data" / "astrbot_plugin_linuxdo"
    self.data_dir = plugin_root
    self.screenshot_dir = plugin_root / "screenshots"
    self.screenshot_dir.mkdir(parents=True, exist_ok=True)

    logger.info(f"[LinuxDoPreview] 插件已加载,截图目录: {self.screenshot_dir}")
    if not SCRAPLING_AVAILABLE:
        install_hint = (
            "[LinuxDoPreview] scrapling 未安装!"
            "执行: pip install scrapling[fetchers] && scrapling install && playwright install-deps chromium"
        )
        logger.warning(install_hint)

    # Request counters; increments elsewhere happen under _stats_lock.
    self._stats = dict(total=0, cache_hit=0, error=0)
    self._stats_lock = threading.Lock()

    # Cached outcome of the one-off login verification.
    self._auth_check_done = False
    self._logged_in = False
|
||
|
||
async def terminate(self):
    """Shut down the shared worker pool when the plugin is unloaded."""
    # wait=False: return immediately instead of blocking on running previews.
    _EXECUTOR.shutdown(wait=False)
    logger.info("[LinuxDoPreview] 插件已卸载")
|
||
|
||
# ─────────── 消息入口 ───────────
|
||
|
||
@filter.event_message_type(filter.EventMessageType.ALL)
async def on_message(self, event: AstrMessageEvent):
    """Reply with a preview for the first linux.do link found in a message.

    Yields a "working" notice, then the screenshot (when one was produced)
    and the text summary. Any failure is counted, logged and reported to the
    chat as a short error message instead of propagating.

    Args:
        event: Incoming message event; only ``message_str`` is inspected.
    """
    text = event.message_str
    if not text:
        return

    # Host labels must not contain "." themselves: with "." inside the
    # character class the repeated group could split a dotted host in
    # exponentially many ways, letting a crafted message stall the event
    # loop through regex backtracking.
    link_pattern = r"https?://(?:[a-z0-9\-]+\.)*linux\.do/[^\s\"')>}]+"
    matched_urls = [
        m.group(0).rstrip(".,;:!?")
        for m in re.finditer(link_pattern, text, re.IGNORECASE)
    ]
    if not matched_urls:
        return

    # Only the first link of a message is previewed.
    target_url = matched_urls[0]
    logger.info(f"[LinuxDoPreview] 检测到链接: {target_url}")

    if self._should_skip(target_url):
        return

    yield event.plain_result("🔍 正在读取 linux.do 页面…")

    try:
        # The browser automation is blocking, so run it in the worker pool.
        # get_running_loop() is the supported call inside a coroutine.
        loop = asyncio.get_running_loop()
        screenshot_path, summary = await loop.run_in_executor(
            _EXECUTOR, self._fetch_preview, target_url
        )

        if screenshot_path and screenshot_path.exists():
            yield event.image_result(str(screenshot_path.absolute()))

        if summary:
            yield event.plain_result(summary)

        with self._stats_lock:
            self._stats["total"] += 1

    except Exception as e:
        with self._stats_lock:
            self._stats["error"] += 1
        logger.error(f"[LinuxDoPreview] 预览失败: {type(e).__name__}: {e}")
        yield event.plain_result(f"❌ 预览获取失败: {str(e)[:200]}")
|
||
|
||
# ─────────── 预处理 ───────────
|
||
|
||
@staticmethod
|
||
def _should_skip(url: str) -> bool:
|
||
skip = [
|
||
r"linux\.do/?$", r"linux\.do/latest", r"linux\.do/categories",
|
||
r"linux\.do/tag/", r"linux\.do/u/", r"linux\.do/my/",
|
||
]
|
||
return any(re.search(p, url, re.IGNORECASE) for p in skip)
|
||
|
||
# ─────────── 核心:两步法 ───────────
|
||
|
||
def _fetch_preview(self, url: str):
    """Fetch a linux.do page and build ``(screenshot_path, summary)`` for it.

    Blocking: drives a browser session, so on_message runs it in the worker
    pool. A cached screenshot is reused when it is younger than the
    ``cache_ttl`` config value and larger than 50 KB; the text is always
    fetched again.

    Two strategies, selected by the ``use_api_render`` config value:
      * API render (default): fetch the topic JSON and render a custom HTML
        card; falls back to the live page when the JSON is unavailable.
      * Page render: load the live page and screenshot it.

    Args:
        url: The linux.do URL detected in the chat message.

    Returns:
        ``(screenshot_path, summary)``; ``screenshot_path`` is ``None`` when
        no usable screenshot could be produced.

    Raises:
        RuntimeError: If Scrapling is not installed.
    """
    if not SCRAPLING_AVAILABLE:
        raise RuntimeError("Scrapling 未安装")

    url_hash = hashlib.md5(url.encode()).hexdigest()
    screenshot_path = self.screenshot_dir / f"{url_hash}.png"
    cache_ttl = self.config.get("cache_ttl", 1800)

    # One stat() call, tolerant of the file disappearing (the cache-clean
    # command can delete it between an exists() check and a stat()).
    screenshot_is_valid = False
    try:
        cached = screenshot_path.stat()
    except OSError:
        cached = None
    if cached is not None:
        screenshot_is_valid = (
            cache_ttl > 0
            and time.time() - cached.st_mtime < cache_ttl
            and cached.st_size > 50 * 1024  # smaller files are treated as blank/black shots
        )

    use_api_render = self.config.get("use_api_render", True)

    with _StealthySession(  # type: ignore[union-attr]
        headless=True, solve_cloudflare=True
    ) as session:
        # Optional: authenticate so restricted topics can be read.
        self._ensure_authenticated(session)

        topic_data = self._fetch_topic_data(session, url) if use_api_render else None
        if topic_data:
            # Strategy A: JSON API + custom HTML card.
            title = self._safe_title(topic_data)
            content = self._extract_content_from_topic_data(topic_data)
            if not screenshot_is_valid:
                html = self._build_preview_html(topic_data, url)
                # Without HTML there is nothing to render. Return None rather
                # than the path of an expired or undersized cache file, which
                # the caller would otherwise send as if it were fresh.
                screenshot_path = (
                    self._render_html_screenshot(session, html, screenshot_path)
                    if html
                    else None
                )
        else:
            # Strategy B, or fallback when the topic JSON could not be fetched:
            # use the live page.
            resp = session.fetch(url)
            html_str = resp.body.decode("utf-8", errors="replace")
            title = self._extract_title(html_str)
            if use_api_render:
                content = self._extract_content(html_str)
            else:
                content = self._extract_content_from_json(session, url)
                if not content:
                    content = self._extract_content(html_str)
            if not screenshot_is_valid:
                screenshot_path = self._take_screenshot(session, url, screenshot_path)

        if screenshot_is_valid:
            with self._stats_lock:
                self._stats["cache_hit"] += 1

        logger.info(
            f"[LinuxDoPreview] 标题: {title}, 内容长度: {len(content)}"
        )
        if screenshot_path:
            logger.info(
                f"[LinuxDoPreview] 使用截图: {screenshot_path.name}"
            )

    summary = self._build_summary(title, content, url)
    return screenshot_path, summary
|
||
|
||
# ─────────── 截图(复用 StealthySession 的浏览器上下文) ───────────
|
||
|
||
def _take_screenshot(self, session, url: str, save_path: Path) -> Path | None:
    """Open the live topic page in a new tab and screenshot the first post.

    The tab is created in ``session.context`` so it shares that context's
    cookies (including cf_clearance). Before the shot, page chrome and
    replies are hidden, truncated content is expanded, and the first post is
    scrolled through so lazy-loaded images get a chance to load.

    Args:
        session: Active StealthySession whose browser context is reused.
        url: Topic URL to open.
        save_path: Destination PNG path.

    Returns:
        ``save_path`` on success, ``None`` on any failure (failures are logged).
    """
    timeout_ms = self.config.get("screenshot_timeout", 15) * 1000
    page = None
    try:
        ctx = session.context
        if not ctx:
            return None

        page = ctx.new_page()
        page.set_viewport_size({"width": 1280, "height": 900})

        # Wait for network idle so JS-rendered content is present.
        page.goto(url, wait_until="networkidle", timeout=timeout_ms)

        # Wait for the first post; fall back to a fixed delay.
        try:
            page.wait_for_selector("#post_1", timeout=min(timeout_ms, 10000))
        except Exception:
            page.wait_for_timeout(3000)

        # Hide navigation chrome and every reply, keeping only the first post.
        # NOTE(review): if #post_1 is a descendant of `.post-stream`, hiding
        # `.post-stream` here hides the first post as well — confirm against
        # the live DOM.
        page.evaluate("""() => {
            const hide = (sel) => {
                const el = document.querySelector(sel);
                if (el) el.style.display = 'none';
            };
            hide('.d-header'); // 顶部导航栏
            hide('.sidebar-wrapper'); // 左侧边栏
            hide('.topic-navigation-wrapper'); // 帖子导航条
            hide('.footer-nav.visible'); // 底部导航
            hide('.post-stream'); // 隐藏整个帖子流(后面单独显示楼主)

            // 隐藏所有回复帖子,只保留楼主
            const posts = document.querySelectorAll('.topic-post');
            posts.forEach((post, i) => { if (i > 0) post.style.display = 'none'; });

            // 滚动到顶部
            window.scrollTo(0, 0);
        }""")

        # Expand long posts that are truncated on the page.
        page.evaluate("""() => {
            // 移除所有展开按钮和截断遮罩
            const removeSelectors = [
                '.expand-post',
                '.gap-bottom',
                '.gap',
                '.large-post-container .show-more',
                '.topic-body .show-more',
                '.cooked .show-more',
                '.lightbox',
            ];
            removeSelectors.forEach(sel => {
                document.querySelectorAll(sel).forEach(el => el.remove());
            });

            // 移除所有 max-height / overflow 限制
            const unclampSelectors = [
                '.cooked',
                '.topic-body',
                '#post_1 .cooked',
                '#post_1 .topic-body',
                '#post_1 .contents',
                '.large-post-container',
            ];
            unclampSelectors.forEach(sel => {
                document.querySelectorAll(sel).forEach(el => {
                    el.style.maxHeight = 'none';
                    el.style.overflow = 'visible';
                    el.style.height = 'auto';
                });
            });

            // 展开 Discourse 长帖截断(data-* 属性方式)
            document.querySelectorAll('[data-expanded]').forEach(el => {
                el.setAttribute('data-expanded', 'true');
            });
            // 移除 truncated 标记
            document.querySelectorAll('.truncated').forEach(el => {
                el.classList.remove('truncated');
            });
        }""")

        # Click any expand buttons that are still present (best effort).
        try:
            expand_buttons = page.query_selector_all(
                '#post_1 .expand-post, #post_1 .show-more, '
                '#post_1 button[class*="expand"], '
                '#post_1 a[class*="expand"]'
            )
            for btn in expand_buttons:
                try:
                    btn.click()
                    page.wait_for_timeout(300)
                except Exception:
                    pass
        except Exception:
            pass

        # Un-clamp again in case clicking re-applied truncation.
        page.evaluate("""() => {
            ['#post_1 .cooked', '#post_1 .topic-body', '#post_1 .contents'].forEach(sel => {
                document.querySelectorAll(sel).forEach(el => {
                    el.style.maxHeight = 'none';
                    el.style.overflow = 'visible';
                    el.style.height = 'auto';
                });
            });
            // 确保图片容器也不截断
            document.querySelectorAll('#post_1 .lightbox-wrapper').forEach(el => {
                el.style.maxHeight = 'none';
                el.style.overflow = 'visible';
            });
        }""")

        # Scroll through the first post in 400px steps to trigger lazy images.
        post1_box = page.evaluate("""() => {
            const p1 = document.querySelector('#post_1');
            if (!p1) return null;
            const rect = p1.getBoundingClientRect();
            return { top: rect.top + window.scrollY, height: rect.height };
        }""")
        if post1_box:
            post_top = int(post1_box.get('top', 0))
            post_height = int(post1_box.get('height', 0))
            for y in range(post_top, post_top + post_height, 400):
                page.evaluate(f"window.scrollTo(0, {y})")
                page.wait_for_timeout(200)
        else:
            # Fallback: scroll the whole page.
            total_height = page.evaluate("document.body.scrollHeight")
            for y in range(0, total_height, 400):
                page.evaluate(f"window.scrollTo(0, {y})")
                page.wait_for_timeout(200)

        # Wait (at most 3 s) for the first post's images to finish loading.
        page.evaluate("""() => {
            return new Promise(resolve => {
                const imgs = document.querySelectorAll('#post_1 img');
                let loaded = 0;
                const total = imgs.length;
                if (total === 0) return resolve();
                imgs.forEach(img => {
                    if (img.complete) {
                        loaded++;
                        if (loaded >= total) resolve();
                    } else {
                        img.onload = img.onerror = () => {
                            loaded++;
                            if (loaded >= total) resolve();
                        };
                    }
                });
                // 最多等 3 秒
                setTimeout(resolve, 3000);
            });
        }""")

        # Back to the top before shooting.
        page.evaluate("window.scrollTo(0, 0)")
        page.wait_for_timeout(500)

        full_page = self.config.get("screenshot_full_page", True)
        page.screenshot(
            path=str(save_path),
            full_page=full_page,
            timeout=timeout_ms,
        )

        sz = save_path.stat().st_size
        logger.info(
            f"[LinuxDoPreview] 截图保存: {save_path.name} ({sz / 1024:.1f} KB)"
        )
        return save_path

    except Exception as e:
        logger.warning(f"[LinuxDoPreview] 截图失败: {type(e).__name__}: {e}")
        return None

    finally:
        # Always release the tab: previously it was closed only on success,
        # so every failed screenshot left an open page in the context.
        if page is not None:
            try:
                page.close()
            except Exception:
                pass  # the context may already be gone; nothing left to release
|
||
|
||
# ─────────── 文本提取 ───────────
|
||
|
||
def _extract_content_from_json(self, session, url: str) -> str:
    """Return the first post's plain text using the topic's ``.json`` endpoint.

    The JSON carries the complete cooked HTML of the post, so the result does
    not depend on how the rendered page truncates or lazy-loads content.

    Args:
        session: Active StealthySession used to fetch the JSON.
        url: Topic URL as found in the chat message.

    Returns:
        The first post's text, or ``""`` when the request fails, the status
        is not 200, or the topic has no posts.
    """
    try:
        import json
        from urllib.parse import urlsplit, urlunsplit

        # Build the JSON URL from the path only. Shared links often carry a
        # query (?u=name) or a #fragment; appending ".json" after those
        # produced a URL that does not address the JSON endpoint.
        parts = urlsplit(url)
        path = parts.path.rstrip('/')
        if not path.endswith('.json'):
            segments = path.split('/')
            if 't' in segments:
                t_idx = segments.index('t')
                if len(segments) > t_idx + 2:
                    # /t/slug/id/post_number -> /t/slug/id
                    path = '/'.join(segments[:t_idx + 3])
            path += '.json'
        json_url = urlunsplit((parts.scheme, parts.netloc, path, '', ''))

        logger.info(f"[LinuxDoPreview] JSON API 请求: {json_url}")
        resp = session.fetch(json_url)
        if resp.status != 200:
            logger.info(f"[LinuxDoPreview] JSON API 返回 {resp.status}")
            return ""

        data = json.loads(resp.body.decode("utf-8", errors="replace"))
        # Same extraction as the API-render path: first post's cooked HTML
        # converted to plain text.
        return self._extract_content_from_topic_data(data)

    except Exception as e:
        logger.info(f"[LinuxDoPreview] JSON API 提取失败: {type(e).__name__}: {e}")
        return ""
|
||
|
||
# ─────────── 登录支持(Cookie 注入) ───────────
|
||
|
||
def _has_session_cookie(self) -> bool:
|
||
"""检查是否配置了会话 cookie"""
|
||
cookie = self.config.get("linuxdo_session_cookie", "") or ""
|
||
return bool(cookie.strip())
|
||
|
||
def _has_auto_login(self) -> bool:
|
||
"""检查是否配置了自动登录凭据"""
|
||
u = self.config.get("linuxdo_username", "") or ""
|
||
p = self.config.get("linuxdo_password", "") or ""
|
||
return bool(u.strip() and p.strip())
|
||
|
||
# Shape of an acceptable cookie name when parsing a full Cookie header.
_COOKIE_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_.-]*$")
# Cookie names that may appear on linux.do / Discourse / Cloudflare. Used to
# decide whether a single "x=y" config value is a name=value pair.
_KNOWN_COOKIE_NAMES = {
    "_t", "_forum_session", "cf_clearance", "_bypass_cache", "dosp",
    "_pf", "_bblean", "theme_ids", "previousVisitAt", "messages-last-modified",
    "_ga", "_gid", "_gcl_au",
}
|
||
|
||
def _parse_cookie_pairs(self, cookie_str: str) -> list[dict]:
|
||
"""将用户配置的 cookie 字符串解析为 (name, value) 列表。
|
||
|
||
支持三种输入:
|
||
- 完整 Cookie 头(含分号):'_t=xxx; _forum_session=yyy'
|
||
- 单个 'name=value'(name 须是已知 cookie 名):'_t=xxx'
|
||
- 单个裸值(直接当作 _forum_session 的值,向后兼容)
|
||
|
||
说明:Discourse 的 _forum_session 值是 base64,常带 '=' 填充,因此不能
|
||
仅凭是否含 '=' 判断格式,否则会把裸值误判成 name=value。
|
||
"""
|
||
pairs: list[dict] = []
|
||
s = (cookie_str or "").strip()
|
||
if not s:
|
||
return pairs
|
||
if ";" in s:
|
||
# 完整 Cookie 头:按分号拆分
|
||
for part in s.split(";"):
|
||
part = part.strip()
|
||
if "=" not in part:
|
||
continue
|
||
name, value = part.split("=", 1)
|
||
name = name.strip()
|
||
if self._COOKIE_NAME_RE.match(name):
|
||
pairs.append({"name": name, "value": value.strip()})
|
||
elif "=" in s:
|
||
# 无分号但含 '=':仅当前缀是已知 cookie 名时才按 name=value 解析
|
||
name, value = s.split("=", 1)
|
||
name = name.strip()
|
||
if name in self._KNOWN_COOKIE_NAMES and self._COOKIE_NAME_RE.match(name):
|
||
pairs.append({"name": name, "value": value.strip()})
|
||
if not pairs:
|
||
# 裸值 → 当作 _forum_session(向后兼容)
|
||
pairs.append({"name": "_forum_session", "value": s})
|
||
return pairs
|
||
|
||
def _inject_session_cookie(self, session, cookie_value: str = "") -> bool:
    """Add the configured session cookie(s) to the session's browser context.

    Cookies are not carried over between sessions, so this has to run for
    every new session.

    Args:
        session: Active StealthySession whose ``context`` receives the cookies.
        cookie_value: Cookie string to inject; when empty, the
            ``linuxdo_session_cookie`` config value is used.

    Returns:
        True when the cookies were added, False otherwise.
    """
    raw = cookie_value
    if not raw:
        raw = (self.config.get("linuxdo_session_cookie", "") or "").strip()
    if not raw:
        return False

    ctx = session.context
    if not ctx:
        return False

    pairs = self._parse_cookie_pairs(raw)
    if not pairs:
        return False

    # _t and _forum_session are marked HttpOnly; everything else is a plain cookie.
    http_only_names = ("_t", "_forum_session")
    cookies = [
        {
            "name": pair["name"],
            "value": pair["value"],
            "domain": "linux.do",
            "path": "/",
            "httpOnly": pair["name"] in http_only_names,
            "secure": True,
            "sameSite": "Lax",
        }
        for pair in pairs
    ]

    try:
        ctx.add_cookies(cookies)
        injected = [c['name'] for c in cookies]
        logger.info(
            f"[LinuxDoPreview] 已注入会话 cookie: {injected}"
        )
        return True
    except Exception as e:
        logger.warning(f"[LinuxDoPreview] Cookie 注入失败: {type(e).__name__}: {e}")
        return False
|
||
|
||
def _check_login_state(self, session) -> bool:
    """Return True when the session is logged in to linux.do.

    Probes /notifications.json and treats HTTP 200 as "logged in"; any other
    status, or any error during the request, counts as "not logged in".
    (/session/current_user.json is not used because it answers 404 for
    anonymous users too and therefore cannot tell the two apart.)
    """
    probe_url = "https://linux.do/notifications.json"
    try:
        status = session.fetch(probe_url, timeout=30000).status
    except Exception:
        return False
    return status == 200
|
||
|
||
def _ensure_authenticated(self, session) -> bool:
    """Authenticate the given session when credentials are configured.

    Cookies must be injected into every new session, while the result of the
    login verification is computed once and cached on the instance.

    Behaviour:
      1. ``linuxdo_session_cookie`` configured: inject it; verify the login
         on the first successful injection and cache the outcome.
      2. Only username/password configured: automatic login is not possible
         (hCaptcha), so warn once and continue anonymously.
      3. Nothing configured: anonymous access.

    Args:
        session: Active StealthySession to authenticate.

    Returns:
        True when the session is considered logged in, False otherwise.
    """
    if self._has_session_cookie():
        cookie_value = (self.config.get("linuxdo_session_cookie", "") or "").strip()
        if not self._inject_session_cookie(session, cookie_value):
            # This session is anonymous, but the failure says nothing about
            # the cookie itself. Leave the cached verification state alone:
            # latching "checked, not logged in" here meant the cookie was
            # never verified on later sessions where injection worked.
            return False

        # Verify only once.
        if not self._auth_check_done:
            self._logged_in = self._check_login_state(session)
            self._auth_check_done = True
            if self._logged_in:
                logger.info("[LinuxDoPreview] Cookie 登录验证成功")
            else:
                logger.warning(
                    "[LinuxDoPreview] 会话 Cookie 无效或已过期,将匿名访问。"
                    "请在浏览器重新获取 Cookie(推荐 _t,长效)后填入配置。"
                )
        return self._logged_in

    # Username/password only: blocked by hCaptcha, so just explain once.
    if self._has_auto_login() and not self._auth_check_done:
        self._auth_check_done = True
        logger.warning(
            "[LinuxDoPreview] linux.do 登录启用了 hCaptcha 人机验证,账号密码"
            "自动登录不可用。请在浏览器登录 linux.do 后,F12 → Application → "
            "Cookies → 复制 _t(推荐,长效)或 _forum_session 的值,填入 "
            "linuxdo_session_cookie 配置项。本次降级为匿名访问。"
        )

    # Nothing usable configured: anonymous access.
    self._logged_in = False
    return False
|
||
|
||
def _fetch_topic_data(self, session, url: str) -> dict | None:
    """Fetch the topic's JSON document from its ``.json`` endpoint.

    The parsed document (cooked HTML, author, tags, counters, ...) feeds both
    the text extraction and the custom HTML rendering.

    Args:
        session: Active StealthySession used for the request.
        url: Topic URL as found in the chat message.

    Returns:
        The parsed JSON, or ``None`` when the request fails, the status is
        not 200, or the body is not valid JSON.
    """
    try:
        import json
        from urllib.parse import urlsplit, urlunsplit

        # Work on the path only: a query (?u=name) or #fragment on a shared
        # link would otherwise end up in front of the ".json" suffix and the
        # request would not reach the JSON endpoint.
        parts = urlsplit(url)
        path = parts.path.rstrip('/')
        if not path.endswith('.json'):
            segments = path.split('/')
            if 't' in segments:
                t_idx = segments.index('t')
                if len(segments) > t_idx + 2:
                    # /t/slug/id/post_number -> /t/slug/id
                    path = '/'.join(segments[:t_idx + 3])
            path += '.json'
        json_url = urlunsplit((parts.scheme, parts.netloc, path, '', ''))

        logger.info(f"[LinuxDoPreview] 拉取 topic JSON: {json_url}")
        resp = session.fetch(json_url)
        if resp.status != 200:
            logger.info(f"[LinuxDoPreview] topic JSON 返回 {resp.status}")
            return None
        return json.loads(resp.body.decode("utf-8", errors="replace"))
    except Exception as e:
        logger.info(f"[LinuxDoPreview] topic JSON 拉取失败: {type(e).__name__}: {e}")
        return None
|
||
|
||
@staticmethod
|
||
def _extract_title(html_str: str) -> str:
|
||
m = re.search(r"<title>(.*?)</title>", html_str, re.DOTALL | re.IGNORECASE)
|
||
if m:
|
||
t = m.group(1).strip()
|
||
t = re.sub(
|
||
r"\s*[-–—|]\s*(LINUX\s*DO|LINUXDO).*$", "", t, flags=re.IGNORECASE
|
||
)
|
||
return t.strip()
|
||
return "无标题"
|
||
|
||
def _extract_content(self, html_str: str) -> str:
|
||
try:
|
||
return self._extract_via_lxml(html_str)
|
||
except Exception:
|
||
pass
|
||
try:
|
||
return self._extract_via_regex(html_str)
|
||
except Exception:
|
||
pass
|
||
return ""
|
||
|
||
def _extract_via_lxml(self, html_str: str) -> str:
    """Extract the first post's text from page HTML using lxml CSS selectors.

    Looks for ``#post_1`` and returns the text of its first ``.cooked``
    element. When the page has no ``#post_1``, the first ``.cooked`` element
    anywhere whose text is longer than 15 characters is used.

    Returns:
        The cleaned text, or ``""`` when lxml is unavailable or nothing matches.
    """
    if _lh is None:
        return ""

    root = _lh.fromstring(html_str)
    first_post = root.cssselect("#post_1")
    if first_post:
        bodies = first_post[0].cssselect(".cooked")
        return _clean_text(bodies[0].text_content()) if bodies else ""

    # No #post_1 on the page: take the first sufficiently long .cooked block.
    for node in root.cssselect(".cooked"):
        candidate = _clean_text(node.text_content())
        if len(candidate) > 15:
            return candidate
    return ""
|
||
|
||
def _extract_via_regex(self, html_str: str) -> str:
|
||
# 使用更精确的正则表达式:匹配楼主帖子
|
||
# 先尝试匹配 #post_1 的帖子
|
||
post_1_match = re.search(
|
||
r'<article[^>]*id="post_1"[^>]*>.*?<div\s+class="cooked">(.*?)</div>\s*</article>',
|
||
html_str,
|
||
re.DOTALL
|
||
)
|
||
if post_1_match:
|
||
text = re.sub(r"<[^>]+>", " ", post_1_match.group(1))
|
||
text = re.sub(r"\s+", " ", text).strip()
|
||
if len(text) > 15:
|
||
return text
|
||
# 回退:提取第一个 .cooked
|
||
for m in re.finditer(
|
||
r'<div\s+class="cooked">(.*?)</div>\s*</article>', html_str, re.DOTALL
|
||
):
|
||
text = re.sub(r"<[^>]+>", " ", m.group(1))
|
||
text = re.sub(r"\s+", " ", text).strip()
|
||
if len(text) > 15:
|
||
return text
|
||
return ""
|
||
|
||
def _build_summary(self, title: str, content: str, url: str) -> str:
|
||
lines = [f"📌 {title}"]
|
||
if content:
|
||
lines.append("")
|
||
max_len = self.config.get("max_content_length", 400)
|
||
lines.append(content[:max_len])
|
||
if len(content) > max_len:
|
||
lines[-1] += "…"
|
||
lines.append("")
|
||
lines.append(f"🔗 {url}")
|
||
return "\n".join(lines)
|
||
|
||
@staticmethod
|
||
def _safe_title(topic_data: dict | None) -> str:
|
||
"""从 topic JSON 中安全提取标题(剥除尾部 - Linux DO 后缀)"""
|
||
if not topic_data:
|
||
return "无标题"
|
||
title = topic_data.get("title") or topic_data.get("fancy_title") or "无标题"
|
||
title = re.sub(
|
||
r"\s*[-–—|]\s*(LINUX\s*DO|LINUXDO).*$", "", title, flags=re.IGNORECASE
|
||
)
|
||
return title.strip() or "无标题"
|
||
|
||
def _extract_content_from_topic_data(self, topic_data: dict) -> str:
|
||
"""从已拉取的 topic JSON 中提取楼主帖子纯文本"""
|
||
try:
|
||
post_stream = topic_data.get("post_stream", {}) or {}
|
||
posts = post_stream.get("posts", []) or []
|
||
if not posts:
|
||
return ""
|
||
cooked_html = posts[0].get("cooked", "") or ""
|
||
if not cooked_html:
|
||
return ""
|
||
if _lh is not None:
|
||
tree = _lh.fromstring(cooked_html)
|
||
return _clean_text(tree.text_content())
|
||
text = re.sub(r"<[^>]+>", " ", cooked_html)
|
||
text = re.sub(r"\s+", " ", text).strip()
|
||
return html_mod.unescape(text)
|
||
except Exception as e:
|
||
logger.info(f"[LinuxDoPreview] topic JSON 文本提取失败: {type(e).__name__}: {e}")
|
||
return ""
|
||
|
||
# ─────────── HTML 预览渲染(API + Scrapling 协作) ───────────
|
||
|
||
def _build_preview_html(self, topic_data: dict, url: str) -> str:
|
||
"""根据 topic JSON 生成自定义预览 HTML
|
||
|
||
优点:布局干净、包含完整内容(不受 Discourse 页面截断/懒加载影响)、
|
||
可控制样式适配聊天平台预览图。
|
||
"""
|
||
# 抽取关键字段
|
||
title = html_mod.escape(topic_data.get("title", "无标题") or "无标题")
|
||
fancy_title = html_mod.escape(topic_data.get("fancy_title", title) or title)
|
||
posts_count = topic_data.get("posts_count", 0)
|
||
views = topic_data.get("views", 0)
|
||
like_count = topic_data.get("like_count", 0)
|
||
created_at = topic_data.get("created_at", "")
|
||
tags = topic_data.get("tags", []) or []
|
||
|
||
post_stream = topic_data.get("post_stream", {}) or {}
|
||
posts = post_stream.get("posts", []) or []
|
||
if not posts:
|
||
return ""
|
||
first = posts[0]
|
||
author_name = html_mod.escape(first.get("name", "") or first.get("username", "") or "")
|
||
author_username = html_mod.escape(first.get("username", "") or "")
|
||
author_initial = (author_name or author_username or "?").strip()[:1].upper()
|
||
author_avatar_raw = first.get("avatar_template", "") or ""
|
||
# 绝对化 + 替换 {size}。Discourse 模板形如:
|
||
# //host/.../avatar.png{size} → 需保留 {size} 占位
|
||
# //host/.../avatar.png → 无占位,原样
|
||
# 任何模板中包含 {size} 都要换为像素数值;否则不强制改动
|
||
if author_avatar_raw and author_avatar_raw.startswith("//"):
|
||
author_avatar = "https:" + author_avatar_raw
|
||
elif author_avatar_raw and author_avatar_raw.startswith("/"):
|
||
author_avatar = "https://linux.do" + author_avatar_raw
|
||
else:
|
||
author_avatar = author_avatar_raw
|
||
if "{size}" in author_avatar:
|
||
author_avatar = author_avatar.replace("{size}", "120")
|
||
post_created = first.get("created_at", "") or ""
|
||
post_like = first.get("like_count", 0)
|
||
cooked_html = first.get("cooked", "") or ""
|
||
|
||
# 把 Discourse 相对资源 URL 补全为绝对 URL
|
||
cooked_html = self._normalize_cooked_urls(cooked_html)
|
||
|
||
# 发布时间格式化
|
||
created_text = ""
|
||
if post_created:
|
||
try:
|
||
created_text = post_created.split("T")[0]
|
||
except Exception:
|
||
created_text = post_created
|
||
|
||
tags_html = "".join(
|
||
f'<span class="tag">#{html_mod.escape(t["name"] if isinstance(t, dict) else str(t))}</span>'
|
||
for t in tags[:6]
|
||
)
|
||
|
||
# 头像、统计数字格式化
|
||
views_text = self._format_count(views)
|
||
posts_text = self._format_count(posts_count)
|
||
likes_text = self._format_count(like_count)
|
||
|
||
# 头像 img 标签(未提供 URL 时仅渲染 fallback 块)
|
||
if author_avatar:
|
||
avatar_img_html = (
|
||
'<img class="avatar" src="'
|
||
+ html_mod.escape(author_avatar)
|
||
+ '" alt="avatar" onerror="this.style.display='
|
||
+ chr(39) + 'none' + chr(39) + '">'
|
||
)
|
||
else:
|
||
avatar_img_html = ''
|
||
|
||
# 完整预览 HTML(含内联 CSS)
|
||
return f"""<!DOCTYPE html>
|
||
<html lang="zh-CN">
|
||
<head>
|
||
<meta charset="utf-8">
|
||
<title>{title}</title>
|
||
<style>
|
||
* {{ box-sizing: border-box; }}
|
||
html, body {{ margin: 0; padding: 0; }}
|
||
body {{
|
||
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", "PingFang SC",
|
||
"Hiragino Sans GB", "Microsoft YaHei", Arial, sans-serif;
|
||
background: #f5f6f8;
|
||
color: #1c1c1c;
|
||
padding: 24px;
|
||
line-height: 1.6;
|
||
}}
|
||
.card {{
|
||
background: #ffffff;
|
||
border-radius: 12px;
|
||
box-shadow: 0 2px 8px rgba(0,0,0,0.06);
|
||
max-width: 760px;
|
||
margin: 0 auto;
|
||
overflow: hidden;
|
||
}}
|
||
.header {{
|
||
padding: 20px 24px 16px 24px;
|
||
border-bottom: 1px solid #eef0f3;
|
||
}}
|
||
.title {{
|
||
font-size: 20px;
|
||
font-weight: 700;
|
||
color: #1769c4;
|
||
margin: 0 0 10px 0;
|
||
line-height: 1.4;
|
||
word-break: break-word;
|
||
}}
|
||
.meta {{
|
||
display: flex;
|
||
align-items: center;
|
||
gap: 10px;
|
||
color: #6a737c;
|
||
font-size: 13px;
|
||
}}
|
||
.meta img.avatar {{
|
||
width: 28px; height: 28px; border-radius: 50%;
|
||
object-fit: cover; background: #ddd;
|
||
}}
|
||
.meta .avatar-wrap {{
|
||
position: relative;
|
||
width: 28px; height: 28px;
|
||
display: inline-block;
|
||
}}
|
||
.meta .avatar-wrap img {{
|
||
position: absolute; inset: 0;
|
||
}}
|
||
.meta .avatar-fallback {{
|
||
position: absolute; inset: 0;
|
||
width: 28px; height: 28px;
|
||
border-radius: 50%;
|
||
background: linear-gradient(135deg, #1769c4, #5a3ec8);
|
||
color: #fff; font-weight: 600;
|
||
display: flex; align-items: center; justify-content: center;
|
||
font-size: 13px;
|
||
text-transform: uppercase;
|
||
}}
|
||
.meta .name {{ color: #1c1c1c; font-weight: 500; }}
|
||
.stats {{
|
||
padding: 10px 24px;
|
||
display: flex;
|
||
gap: 18px;
|
||
color: #6a737c;
|
||
font-size: 13px;
|
||
border-bottom: 1px solid #eef0f3;
|
||
background: #fafbfc;
|
||
}}
|
||
.stats span::before {{ margin-right: 4px; }}
|
||
.tags {{
|
||
padding: 10px 24px 0 24px;
|
||
display: flex;
|
||
gap: 6px;
|
||
flex-wrap: wrap;
|
||
}}
|
||
.tag {{
|
||
background: #e8f0fe;
|
||
color: #1769c4;
|
||
padding: 2px 8px;
|
||
border-radius: 4px;
|
||
font-size: 12px;
|
||
}}
|
||
.content {{
|
||
padding: 16px 24px 8px 24px;
|
||
word-break: break-word;
|
||
}}
|
||
.content p {{ margin: 0 0 10px 0; }}
|
||
.content h1, .content h2, .content h3 {{ margin: 16px 0 8px 0; }}
|
||
.content img {{
|
||
max-width: 100%;
|
||
height: auto;
|
||
border-radius: 6px;
|
||
display: block;
|
||
margin: 8px 0;
|
||
}}
|
||
.content pre, .content code {{
|
||
background: #f6f8fa;
|
||
border-radius: 4px;
|
||
padding: 2px 6px;
|
||
font-family: "SFMono-Regular", Consolas, "Liberation Mono", Menlo, monospace;
|
||
font-size: 13px;
|
||
}}
|
||
.content pre {{ padding: 10px 12px; overflow-x: auto; }}
|
||
.content blockquote {{
|
||
border-left: 3px solid #d0d7de;
|
||
margin: 8px 0;
|
||
padding: 0 12px;
|
||
color: #57606a;
|
||
background: #f6f8fa;
|
||
}}
|
||
.content a {{ color: #1769c4; text-decoration: none; }}
|
||
.content ul, .content ol {{ padding-left: 24px; }}
|
||
.footer {{
|
||
padding: 12px 24px 18px 24px;
|
||
border-top: 1px solid #eef0f3;
|
||
color: #6a737c;
|
||
font-size: 12px;
|
||
word-break: break-all;
|
||
}}
|
||
.footer a {{ color: #1769c4; text-decoration: none; }}
|
||
</style>
|
||
</head>
|
||
<body>
|
||
<div class="card">
|
||
<div class="header">
|
||
<h1 class="title">{fancy_title}</h1>
|
||
<div class="meta">
|
||
<div class="avatar-wrap">
|
||
{avatar_img_html}
|
||
<div class="avatar-fallback">{html_mod.escape(author_initial)}</div>
|
||
</div>
|
||
<span class="name">{author_name}</span>
|
||
<span>·</span>
|
||
<span>{html_mod.escape(created_text)}</span>
|
||
</div>
|
||
</div>
|
||
<div class="stats">
|
||
<span>👀 {views_text}</span>
|
||
<span>💬 {posts_text}</span>
|
||
<span>❤ {likes_text}</span>
|
||
</div>
|
||
{('<div class="tags">' + tags_html + '</div>') if tags else ''}
|
||
<div class="content">
|
||
{cooked_html}
|
||
</div>
|
||
<div class="footer">
|
||
🔗 <a href="{html_mod.escape(url)}">{html_mod.escape(url)}</a>
|
||
</div>
|
||
</div>
|
||
</body>
|
||
</html>"""
|
||
|
||
@staticmethod
|
||
def _format_count(n: int) -> str:
|
||
try:
|
||
n = int(n)
|
||
except (TypeError, ValueError):
|
||
return str(n)
|
||
if n >= 10000:
|
||
return f"{n/10000:.1f}w"
|
||
if n >= 1000:
|
||
return f"{n/1000:.1f}k"
|
||
return str(n)
|
||
|
||
@staticmethod
|
||
def _normalize_cooked_urls(cooked_html: str) -> str:
|
||
"""将 cooked 中的相对资源 URL 转绝对 URL,剥离轻臾框包裹与 meta 信息
|
||
|
||
Discourse 的话题里图片常被包在多层 div 中:
|
||
<div class="lightbox-wrapper">
|
||
<a class="lightbox" href="...">
|
||
<img src="...">
|
||
</a>
|
||
<div class="meta">
|
||
<span class="filename">image.png</span>
|
||
<span>988×703 46.8 KB</span>
|
||
</div>
|
||
</div>
|
||
只保留 <img>、丢弃 meta 信息,避免图加载失败后占据巨大空白。
|
||
"""
|
||
if not cooked_html:
|
||
return ""
|
||
try:
|
||
import re as _re
|
||
# 1) 绝对化 src/href(相对与协议无关 URL)
|
||
cooked_html = _re.sub(
|
||
r'(src|href)="(//[^"]+)"',
|
||
r'\1="https:\2',
|
||
cooked_html,
|
||
)
|
||
cooked_html = _re.sub(
|
||
r'(src|href)="(/uploads/[^"]+)"',
|
||
r'\1="https://linux.do\2',
|
||
cooked_html,
|
||
)
|
||
|
||
# 2) 整块剥离 lightbox-wrapper:仅保留内部 <img>,丢弃其余
|
||
def _pick_imgs(block: str) -> str:
|
||
imgs = _re.findall(r'<img\b[^>]*>', block, flags=_re.IGNORECASE)
|
||
return "".join(imgs)
|
||
|
||
cooked_html = _re.sub(
|
||
r'<div[^>]*class="[^"]*lightbox-wrapper[^"]*"[^>]*>(.*?)</div>',
|
||
lambda m: _pick_imgs(m.group(1)),
|
||
cooked_html,
|
||
flags=_re.DOTALL,
|
||
)
|
||
|
||
# 3) 退路:直接裸的 <a class="lightbox"> 包裹,剥 a、保留 img
|
||
cooked_html = _re.sub(
|
||
r'<a [^>]*class="[^"]*\blightbox\b[^"]*"[^>]*>(.*?)</a>',
|
||
r'\1',
|
||
cooked_html,
|
||
flags=_re.DOTALL,
|
||
)
|
||
|
||
# 4) 删除所有残留的 meta 信息块(文件尺寸、文件名、下载按钮等)
|
||
cooked_html = _re.sub(
|
||
r'<div[^>]*class="[^"]*\bmeta\b[^"]*"[^>]*>.*?</div>',
|
||
'',
|
||
cooked_html,
|
||
flags=_re.DOTALL,
|
||
)
|
||
cooked_html = _re.sub(
|
||
r'<span[^>]*class="[^"]*\bfilename\b[^"]*"[^>]*>.*?</span>',
|
||
'',
|
||
cooked_html,
|
||
flags=_re.DOTALL,
|
||
)
|
||
|
||
# 5) 删除代码块顶部的工具栏(copy/undo 按钮)防止占位
|
||
cooked_html = _re.sub(
|
||
r'<div[^>]*class="[^"]*\bcodeblock-buttons\b[^"]*"[^>]*>.*?</div>',
|
||
'',
|
||
cooked_html,
|
||
flags=_re.DOTALL,
|
||
)
|
||
cooked_html = _re.sub(
|
||
r'<pre[^>]*>\s*<div[^>]*class="[^"]*\bpre-actions\b[^"]*"[^>]*>.*?</div>',
|
||
'<pre>',
|
||
cooked_html,
|
||
flags=_re.DOTALL,
|
||
)
|
||
|
||
# 6) 删除 download 按钮、悬浮提示等装饰
|
||
cooked_html = _re.sub(
|
||
r'<a[^>]*class="[^"]*\bdownload[^"]*"[^>]*>.*?</a>',
|
||
'',
|
||
cooked_html,
|
||
flags=_re.DOTALL,
|
||
)
|
||
|
||
except Exception:
|
||
pass
|
||
return cooked_html
|
||
|
||
def _render_html_screenshot(self, session, html: str, save_path: Path) -> Path | None:
    """Render the custom preview HTML in a new tab and screenshot it.

    ``page.set_content()`` renders the document locally without navigating
    to the forum. The ``.card`` element is captured when present, so the
    image is bounded by the content; a page screenshot is the fallback.

    Args:
        session: Active StealthySession whose browser context is reused.
        html: Complete HTML document to render.
        save_path: Destination PNG path.

    Returns:
        ``save_path`` on success; ``None`` when ``html`` is empty, no browser
        context is available, or rendering fails (failures are logged).
    """
    timeout_ms = self.config.get("screenshot_timeout", 15) * 1000
    if not html:
        return None

    page = None
    try:
        ctx = session.context
        if not ctx:
            return None
        page = ctx.new_page()
        page.set_viewport_size({"width": 820, "height": 1200})

        page.set_content(html, wait_until="domcontentloaded", timeout=timeout_ms)

        # Wait (at most 3 s) for every <img>; images that failed to load are
        # removed so they do not leave a large blank area.
        page.evaluate("""() => new Promise(resolve => {
            const imgs = document.querySelectorAll('img');
            if (!imgs.length) return resolve();
            let done = 0;
            const tick = (img) => {
                done++;
                // 图加载失败:移除 <img> 避免占位巨大空白
                if (img.complete && img.naturalWidth === 0) {
                    img.remove();
                }
                if (done >= imgs.length) resolve();
            };
            imgs.forEach(img => {
                if (img.complete) tick(img);
                else {
                    img.addEventListener('load', () => tick(img), { once: true });
                    img.addEventListener('error', () => tick(img), { once: true });
                }
            });
            setTimeout(resolve, 3000);
        })""")

        page.wait_for_timeout(300)

        # Prefer an element screenshot of .card. `screenshot_full_page` only
        # applies to the page-screenshot fallback.
        card_locator = page.locator(".card")
        full_page = self.config.get("screenshot_full_page", True)
        try:
            if card_locator.count() > 0:
                card_locator.first.screenshot(
                    path=str(save_path),
                    timeout=timeout_ms,
                )
            else:
                page.screenshot(
                    path=str(save_path),
                    full_page=full_page,
                    timeout=timeout_ms,
                )
        except Exception:
            # Element screenshot failed: fall back to a page screenshot.
            page.screenshot(
                path=str(save_path),
                full_page=full_page,
                timeout=timeout_ms,
            )

        sz = save_path.stat().st_size
        logger.info(
            f"[LinuxDoPreview] 渲染截图: {save_path.name} ({sz / 1024:.1f} KB)"
        )
        return save_path

    except Exception as e:
        logger.warning(f"[LinuxDoPreview] HTML 渲染失败: {type(e).__name__}: {e}")
        return None

    finally:
        # Always release the tab: previously it was closed only on success,
        # so every failed render left an open page in the context.
        if page is not None:
            try:
                page.close()
            except Exception:
                pass  # the context may already be gone; nothing left to release
|
||
|
||
# ─────────── 管理指令 ───────────
|
||
|
||
@filter.command("linuxdo_stats")
async def show_stats(self, event: AstrMessageEvent):
    """Reply with request counters and the size of the screenshot cache."""
    cached_shots = list(self.screenshot_dir.glob("*.png"))
    total_bytes = 0
    for shot in cached_shots:
        total_bytes += shot.stat().st_size
    cache_size = total_bytes / 1024  # reported in KB

    report = (
        f"📊 LinuxDo Preview 统计\n"
        f" 请求总数: {self._stats['total']}\n"
        f" 缓存命中: {self._stats['cache_hit']}\n"
        f" 错误次数: {self._stats['error']}\n"
        f" 缓存截图: {len(cached_shots)} ({cache_size:.1f} KB)"
    )
    yield event.plain_result(report)
|
||
|
||
@filter.command("linuxdo_clean")
async def clean_cache(self, event: AstrMessageEvent):
    """Delete all cached screenshots and reply with how many were removed.

    A file that vanished in the meantime or cannot be deleted is skipped
    rather than aborting the whole cleanup without a reply.
    """
    count = 0
    for f in self.screenshot_dir.glob("*.png"):
        try:
            f.unlink()
        except FileNotFoundError:
            # Already removed, e.g. by a concurrent cleanup.
            continue
        except OSError as e:
            logger.warning(
                f"[LinuxDoPreview] 清理缓存失败: {f.name}: {type(e).__name__}: {e}"
            )
            continue
        count += 1
    yield event.plain_result(f"🧹 已清理 {count} 个缓存截图")
|
||
|
||
|
||
def _clean_text(text: str) -> str:
|
||
text = re.sub(r"<[^>]+>", " ", text)
|
||
text = re.sub(r"\s+", " ", text)
|
||
text = html_mod.unescape(text)
|
||
return text.strip()
|