""" astrbot_plugin_linuxdo - LinuxDo 链接检测 & 预览截图插件 检测聊天消息中的 linux.do 链接,使用 Scrapling 的 StealthySession 绕过 Cloudflare Turnstile,分两步:先 fetch 拿文本 + cookies, 再新建标签页截图(复用 cf_clearance,不重复触发验证)。 """ import re import asyncio import time import hashlib from pathlib import Path from concurrent.futures import ThreadPoolExecutor import html as html_mod import threading from astrbot.core.utils.astrbot_path import get_astrbot_data_path from astrbot.api.event import filter, AstrMessageEvent from astrbot.api.star import Context, Star from astrbot.api import logger from astrbot.api import AstrBotConfig try: from scrapling.fetchers import StealthySession as _StealthySession from lxml import html as _lh SCRAPLING_AVAILABLE = True except ImportError: SCRAPLING_AVAILABLE = False _StealthySession = None _lh = None _EXECUTOR = ThreadPoolExecutor(max_workers=2, thread_name_prefix="linuxdo") class LinuxDoPreviewPlugin(Star): """LinuxDo 链接预览插件""" def __init__(self, context: Context, config: AstrBotConfig): super().__init__(context) self.config = config self.data_dir = Path(get_astrbot_data_path()) / "plugin_data" / "astrbot_plugin_linuxdo" self.screenshot_dir = self.data_dir / "screenshots" self.screenshot_dir.mkdir(parents=True, exist_ok=True) logger.info(f"[LinuxDoPreview] 插件已加载,截图目录: {self.screenshot_dir}") if not SCRAPLING_AVAILABLE: logger.warning( "[LinuxDoPreview] scrapling 未安装!" "执行: pip install scrapling[fetchers] && scrapling install && playwright install-deps chromium" ) self._stats = {"total": 0, "cache_hit": 0, "error": 0} self._stats_lock = threading.Lock() # 登录状态:跨 fetch 复用在同一 StealthySession 中 self._auth_check_done = False self._logged_in = False async def terminate(self): _EXECUTOR.shutdown(wait=False) logger.info("[LinuxDoPreview] 插件已卸载") # ─────────── 消息入口 ─────────── @filter.event_message_type(filter.EventMessageType.ALL) async def on_message(self, event: AstrMessageEvent): text = event.message_str if not text: return urls = re.finditer( r"https?://(?:[a-z0-9.\-]+\.)*linux\.do/[^\s\"')>}]+", text, re.IGNORECASE, ) matched_urls = [m.group(0).rstrip(".,;:!?") for m in urls] if not matched_urls: return target_url = matched_urls[0] logger.info(f"[LinuxDoPreview] 检测到链接: {target_url}") if self._should_skip(target_url): return yield event.plain_result("🔍 正在读取 linux.do 页面…") try: screenshot_path, summary = await asyncio \ .get_event_loop() \ .run_in_executor(_EXECUTOR, self._fetch_preview, target_url) if screenshot_path and screenshot_path.exists(): yield event.image_result(str(screenshot_path.absolute())) if summary: yield event.plain_result(summary) with self._stats_lock: self._stats["total"] += 1 except Exception as e: with self._stats_lock: self._stats["error"] += 1 logger.error(f"[LinuxDoPreview] 预览失败: {type(e).__name__}: {e}") yield event.plain_result(f"❌ 预览获取失败: {str(e)[:200]}") # ─────────── 预处理 ─────────── @staticmethod def _should_skip(url: str) -> bool: skip = [ r"linux\.do/?$", r"linux\.do/latest", r"linux\.do/categories", r"linux\.do/tag/", r"linux\.do/u/", r"linux\.do/my/", ] return any(re.search(p, url, re.IGNORECASE) for p in skip) # ─────────── 核心:两步法 ─────────── def _fetch_preview(self, url: str): if not SCRAPLING_AVAILABLE: raise RuntimeError("Scrapling 未安装") url_hash = hashlib.md5(url.encode()).hexdigest() screenshot_path = self.screenshot_dir / f"{url_hash}.png" cache_ttl = self.config.get("cache_ttl", 1800) screenshot_is_valid = False if screenshot_path.exists(): sz = screenshot_path.stat().st_size age = time.time() - screenshot_path.stat().st_mtime screenshot_is_valid = ( cache_ttl > 0 and age < cache_ttl and sz > 50 * 1024 # 小于 50KB 的截图视为无效(黑屏/空白) ) use_api_render = self.config.get("use_api_render", True) with _StealthySession( # type: ignore[union-attr] headless=True, solve_cloudflare=True ) as session: # ── 可选:按需登录以访问受限内容 ── self._ensure_authenticated(session) if use_api_render: # ── 方案 A:API + 自定义 HTML 渲染(推荐)── topic_data = self._fetch_topic_data(session, url) title = self._safe_title(topic_data) if topic_data: content = self._extract_content_from_topic_data(topic_data) if not screenshot_is_valid: html = self._build_preview_html(topic_data, url) if html: screenshot_path = self._render_html_screenshot( session, html, screenshot_path ) else: # API 拉取失败 → 回退原方案 resp = session.fetch(url) html_str = resp.body.decode("utf-8", errors="replace") title = self._extract_title(html_str) content = self._extract_content(html_str) if not screenshot_is_valid: screenshot_path = self._take_screenshot( session, url, screenshot_path ) else: # ── 方案 B:传统页面 + JS 隐藏 ── resp = session.fetch(url) html_str = resp.body.decode("utf-8", errors="replace") title = self._extract_title(html_str) content = self._extract_content_from_json(session, url) if not content: content = self._extract_content(html_str) if not screenshot_is_valid: screenshot_path = self._take_screenshot( session, url, screenshot_path ) if screenshot_is_valid: with self._stats_lock: self._stats["cache_hit"] += 1 logger.info( f"[LinuxDoPreview] 标题: {title}, 内容长度: {len(content)}" ) if screenshot_path: logger.info( f"[LinuxDoPreview] 使用截图: {screenshot_path.name}" ) summary = self._build_summary(title, content, url) return screenshot_path, summary # ─────────── 截图(复用 StealthySession 的浏览器上下文) ─────────── def _take_screenshot(self, session, url: str, save_path: Path) -> Path | None: """在已有 cf_clearance 的上下文中新建标签页截图""" timeout_ms = self.config.get("screenshot_timeout", 15) * 1000 try: ctx = session.context if not ctx: return None page = ctx.new_page() page.set_viewport_size({"width": 1280, "height": 900}) # ── 导航:等 networkidle 确保 JS 动态内容加载完成 ── page.goto(url, wait_until="networkidle", timeout=timeout_ms) # ── 等待 Discourse 帖子内容渲染 ── try: page.wait_for_selector("#post_1", timeout=min(timeout_ms, 10000)) except Exception: page.wait_for_timeout(3000) # 回退:固定等待 # ── 隐藏非楼主内容,只保留第一篇帖子完整展示 ── page.evaluate("""() => { const hide = (sel) => { const el = document.querySelector(sel); if (el) el.style.display = 'none'; }; hide('.d-header'); // 顶部导航栏 hide('.sidebar-wrapper'); // 左侧边栏 hide('.topic-navigation-wrapper'); // 帖子导航条 hide('.footer-nav.visible'); // 底部导航 hide('.post-stream'); // 隐藏整个帖子流(后面单独显示楼主) // 隐藏所有回复帖子,只保留楼主 const posts = document.querySelectorAll('.topic-post'); posts.forEach((post, i) => { if (i > 0) post.style.display = 'none'; }); // 滚动到顶部 window.scrollTo(0, 0); }""") # ── 展开 Discourse 截断的长帖 ── page.evaluate("""() => { // 移除所有展开按钮和截断遮罩 const removeSelectors = [ '.expand-post', '.gap-bottom', '.gap', '.large-post-container .show-more', '.topic-body .show-more', '.cooked .show-more', '.lightbox', ]; removeSelectors.forEach(sel => { document.querySelectorAll(sel).forEach(el => el.remove()); }); // 移除所有 max-height / overflow 限制 const unclampSelectors = [ '.cooked', '.topic-body', '#post_1 .cooked', '#post_1 .topic-body', '#post_1 .contents', '.large-post-container', ]; unclampSelectors.forEach(sel => { document.querySelectorAll(sel).forEach(el => { el.style.maxHeight = 'none'; el.style.overflow = 'visible'; el.style.height = 'auto'; }); }); // 展开 Discourse 长帖截断(data-* 属性方式) document.querySelectorAll('[data-expanded]').forEach(el => { el.setAttribute('data-expanded', 'true'); }); // 移除 truncated 标记 document.querySelectorAll('.truncated').forEach(el => { el.classList.remove('truncated'); }); }""") # ── 点击可能存在的展开按钮 ── try: expand_buttons = page.query_selector_all( '#post_1 .expand-post, #post_1 .show-more, ' '#post_1 button[class*="expand"], ' '#post_1 a[class*="expand"]' ) for btn in expand_buttons: try: btn.click() page.wait_for_timeout(300) except Exception: pass except Exception: pass # ── 再次展开,防止点击按钮后重新截断 ── page.evaluate("""() => { ['#post_1 .cooked', '#post_1 .topic-body', '#post_1 .contents'].forEach(sel => { document.querySelectorAll(sel).forEach(el => { el.style.maxHeight = 'none'; el.style.overflow = 'visible'; el.style.height = 'auto'; }); }); // 确保图片容器也不截断 document.querySelectorAll('#post_1 .lightbox-wrapper').forEach(el => { el.style.maxHeight = 'none'; el.style.overflow = 'visible'; }); }""") # ── 滚动楼主帖子,触发懒加载图片 ── post1_box = page.evaluate("""() => { const p1 = document.querySelector('#post_1'); if (!p1) return null; const rect = p1.getBoundingClientRect(); return { top: rect.top + window.scrollY, height: rect.height }; }""") if post1_box: post_top = int(post1_box.get('top', 0)) post_height = int(post1_box.get('height', 0)) for y in range(post_top, post_top + post_height, 400): page.evaluate(f"window.scrollTo(0, {y})") page.wait_for_timeout(200) else: # 回退:滚动整个页面 total_height = page.evaluate("document.body.scrollHeight") for y in range(0, total_height, 400): page.evaluate(f"window.scrollTo(0, {y})") page.wait_for_timeout(200) # ── 等待图片加载完成 ── page.evaluate("""() => { return new Promise(resolve => { const imgs = document.querySelectorAll('#post_1 img'); let loaded = 0; const total = imgs.length; if (total === 0) return resolve(); imgs.forEach(img => { if (img.complete) { loaded++; if (loaded >= total) resolve(); } else { img.onload = img.onerror = () => { loaded++; if (loaded >= total) resolve(); }; } }); // 最多等 3 秒 setTimeout(resolve, 3000); }); }""") # ── 滚动回顶部 ── page.evaluate("window.scrollTo(0, 0)") page.wait_for_timeout(500) # ── 截图:全页模式,隐藏导航栏后内容干净 ── full_page = self.config.get("screenshot_full_page", True) page.screenshot( path=str(save_path), full_page=full_page, timeout=timeout_ms, ) sz = save_path.stat().st_size logger.info( f"[LinuxDoPreview] 截图保存: {save_path.name} ({sz / 1024:.1f} KB)" ) page.close() return save_path except Exception as e: logger.warning(f"[LinuxDoPreview] 截图失败: {type(e).__name__}: {e}") return None # ─────────── 文本提取 ─────────── def _extract_content_from_json(self, session, url: str) -> str: """通过 Discourse JSON API 获取完整的楼主帖子内容 Discourse 的 .json 端点返回结构化数据,包含完整的 cooked HTML, 不受页面截断、懒加载或 Cloudflare 渲染问题的影响。 """ try: # 构造 JSON URL:topic-url.json 或 topic-url/1.json json_url = url.rstrip('/') if not json_url.endswith('.json'): # 对于帖子链接如 /t/topic-slug/12345/5,取前两段 parts = json_url.split('/') # 找到 /t/ 后的部分 t_idx = -1 for i, p in enumerate(parts): if p == 't': t_idx = i break if t_idx >= 0 and len(parts) > t_idx + 2: # 重建为 /t/slug/id 格式 json_url = '/'.join(parts[:t_idx + 3]) json_url += '.json' logger.info(f"[LinuxDoPreview] JSON API 请求: {json_url}") resp = session.fetch(json_url) if resp.status != 200: logger.info(f"[LinuxDoPreview] JSON API 返回 {resp.status}") return "" import json data = json.loads(resp.body.decode("utf-8", errors="replace")) # 从 post_stream 中提取第一个帖子(楼主) post_stream = data.get("post_stream", {}) posts = post_stream.get("posts", []) if not posts: return "" first_post = posts[0] cooked_html = first_post.get("cooked", "") if not cooked_html: return "" # 使用 lxml 解析 HTML 并提取纯文本 if _lh is not None: tree = _lh.fromstring(cooked_html) return _clean_text(tree.text_content()) # 回退:正则去标签 text = re.sub(r"<[^>]+>", " ", cooked_html) text = re.sub(r"\s+", " ", text).strip() return html_mod.unescape(text) except Exception as e: logger.info(f"[LinuxDoPreview] JSON API 提取失败: {type(e).__name__}: {e}") return "" # ─────────── 登录支持(Cookie 注入) ─────────── def _has_session_cookie(self) -> bool: """检查是否配置了会话 cookie""" cookie = self.config.get("linuxdo_session_cookie", "") or "" return bool(cookie.strip()) def _has_auto_login(self) -> bool: """检查是否配置了自动登录凭据""" u = self.config.get("linuxdo_username", "") or "" p = self.config.get("linuxdo_password", "") or "" return bool(u.strip() and p.strip()) _COOKIE_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_.-]*$") # linux.do / Discourse / Cloudflare 可能出现的 cookie 名 _KNOWN_COOKIE_NAMES = { "_t", "_forum_session", "cf_clearance", "_bypass_cache", "dosp", "_pf", "_bblean", "theme_ids", "previousVisitAt", "messages-last-modified", "_ga", "_gid", "_gcl_au", } def _parse_cookie_pairs(self, cookie_str: str) -> list[dict]: """将用户配置的 cookie 字符串解析为 (name, value) 列表。 支持三种输入: - 完整 Cookie 头(含分号):'_t=xxx; _forum_session=yyy' - 单个 'name=value'(name 须是已知 cookie 名):'_t=xxx' - 单个裸值(直接当作 _forum_session 的值,向后兼容) 说明:Discourse 的 _forum_session 值是 base64,常带 '=' 填充,因此不能 仅凭是否含 '=' 判断格式,否则会把裸值误判成 name=value。 """ pairs: list[dict] = [] s = (cookie_str or "").strip() if not s: return pairs if ";" in s: # 完整 Cookie 头:按分号拆分 for part in s.split(";"): part = part.strip() if "=" not in part: continue name, value = part.split("=", 1) name = name.strip() if self._COOKIE_NAME_RE.match(name): pairs.append({"name": name, "value": value.strip()}) elif "=" in s: # 无分号但含 '=':仅当前缀是已知 cookie 名时才按 name=value 解析 name, value = s.split("=", 1) name = name.strip() if name in self._KNOWN_COOKIE_NAMES and self._COOKIE_NAME_RE.match(name): pairs.append({"name": name, "value": value.strip()}) if not pairs: # 裸值 → 当作 _forum_session(向后兼容) pairs.append({"name": "_forum_session", "value": s}) return pairs def _inject_session_cookie(self, session, cookie_value: str = "") -> bool: """将会话 cookie 注入到当前浏览器上下文。 注意:StealthySession 每次请求都是新建的浏览器上下文,cookie 不会跨 请求保留,因此【每个会话都必须重新注入】。 Returns: True 表示注入成功,False 表示失败 """ if not cookie_value: cookie_value = (self.config.get("linuxdo_session_cookie", "") or "").strip() if not cookie_value: return False ctx = session.context if not ctx: return False pairs = self._parse_cookie_pairs(cookie_value) if not pairs: return False cookies = [] for p in pairs: # _t / _forum_session 是 HttpOnly;其余 cookie 按普通处理 http_only = p["name"] in ("_t", "_forum_session") cookies.append({ "name": p["name"], "value": p["value"], "domain": "linux.do", "path": "/", "httpOnly": http_only, "secure": True, "sameSite": "Lax", }) try: ctx.add_cookies(cookies) logger.info( f"[LinuxDoPreview] 已注入会话 cookie: {[c['name'] for c in cookies]}" ) return True except Exception as e: logger.warning(f"[LinuxDoPreview] Cookie 注入失败: {type(e).__name__}: {e}") return False def _check_login_state(self, session) -> bool: """检查当前会话是否已登录。 使用 /notifications.json:已登录返回 200,匿名返回 403。 (/session/current_user.json 对匿名用户也返回 404,无法区分,故弃用。) """ try: resp = session.fetch( "https://linux.do/notifications.json", timeout=30000 ) return resp.status == 200 except Exception: return False def _ensure_authenticated(self, session) -> bool: """在已绕过 CF 的上下文中按需认证。 重要:StealthySession 每次请求都会新建,浏览器上下文不跨请求保留,因此 配置的 Cookie 必须【每次都注入】当前会话;而【是否登录】的校验结果可以 缓存(Cookie 有效性不会在请求间变化)。 逻辑: 1) 配置了 linuxdo_session_cookie → 每次注入;首次校验后缓存结果 2) 仅配置了用户名/密码 → linux.do 登录受 hCaptcha 保护,无法自动登录, 仅提示一次并降级为匿名访问 3) 都没配置 → 匿名访问 """ # ── 手动 Cookie:每次请求都注入(上下文是新建的) ── if self._has_session_cookie(): cookie_value = (self.config.get("linuxdo_session_cookie", "") or "").strip() if not self._inject_session_cookie(session, cookie_value): self._auth_check_done = True self._logged_in = False return False # 校验结果只算一次(Cookie 有效性跨请求稳定) if not self._auth_check_done: self._logged_in = self._check_login_state(session) self._auth_check_done = True if self._logged_in: logger.info("[LinuxDoPreview] Cookie 登录验证成功") else: logger.warning( "[LinuxDoPreview] 会话 Cookie 无效或已过期,将匿名访问。" "请在浏览器重新获取 Cookie(推荐 _t,长效)后填入配置。" ) return self._logged_in # ── 仅用户名/密码:受 hCaptcha 限制,无法自动登录(仅提示一次) ── if self._has_auto_login() and not self._auth_check_done: self._auth_check_done = True logger.warning( "[LinuxDoPreview] linux.do 登录启用了 hCaptcha 人机验证,账号密码" "自动登录不可用。请在浏览器登录 linux.do 后,F12 → Application → " "Cookies → 复制 _t(推荐,长效)或 _forum_session 的值,填入 " "linuxdo_session_cookie 配置项。本次降级为匿名访问。" ) # 都没配置 / 自动登录不可用 → 匿名 self._logged_in = False return False def _fetch_topic_data(self, session, url: str) -> dict | None: """通过 Discourse JSON API 获取完整的主题数据 返回的 dict 包含帖子原始数据(cooked HTML、作者、标签、统计等), 可同时供文本提取和自定义 HTML 渲染使用。 """ try: json_url = url.rstrip('/') if not json_url.endswith('.json'): parts = json_url.split('/') t_idx = -1 for i, p in enumerate(parts): if p == 't': t_idx = i break if t_idx >= 0 and len(parts) > t_idx + 2: json_url = '/'.join(parts[:t_idx + 3]) json_url += '.json' logger.info(f"[LinuxDoPreview] 拉取 topic JSON: {json_url}") resp = session.fetch(json_url) if resp.status != 200: logger.info(f"[LinuxDoPreview] topic JSON 返回 {resp.status}") return None import json return json.loads(resp.body.decode("utf-8", errors="replace")) except Exception as e: logger.info(f"[LinuxDoPreview] topic JSON 拉取失败: {type(e).__name__}: {e}") return None @staticmethod def _extract_title(html_str: str) -> str: m = re.search(r"
]*>\s*]*class="[^"]*\bpre-actions\b[^"]*"[^>]*>.*?', '', cooked_html, flags=_re.DOTALL, ) # 6) 删除 download 按钮、悬浮提示等装饰 cooked_html = _re.sub( r']*class="[^"]*\bdownload[^"]*"[^>]*>.*?', '', cooked_html, flags=_re.DOTALL, ) except Exception: pass return cooked_html def _render_html_screenshot(self, session, html: str, save_path: Path) -> Path | None: """在已破解 CF 的浏览器上下文中渲染自定义 HTML 并截图 page.set_content() 不走网络导航,纯本地渲染:零 Cloudflare、零超时、 零依赖 Discourse 页面布局。content-length 限制为实际内容大小。 """ timeout_ms = self.config.get("screenshot_timeout", 15) * 1000 if not html: return None try: ctx = session.context if not ctx: return None page = ctx.new_page() page.set_viewport_size({"width": 820, "height": 1200}) # 设置内容,等待图片资源加载 page.set_content(html, wait_until="domcontentloaded", timeout=timeout_ms) # 主动等所有加载完成(最多 3s),并剔除加载失败的图 page.evaluate("""() => new Promise(resolve => { const imgs = document.querySelectorAll('img'); if (!imgs.length) return resolve(); let done = 0; const tick = (img) => { done++; // 图加载失败:移除
避免占位巨大空白 if (img.complete && img.naturalWidth === 0) { img.remove(); } if (done >= imgs.length) resolve(); }; imgs.forEach(img => { if (img.complete) tick(img); else { img.addEventListener('load', () => tick(img), { once: true }); img.addEventListener('error', () => tick(img), { once: true }); } }); setTimeout(resolve, 3000); })""") page.wait_for_timeout(300) # ── 自适应截图:总是优先对 .card 元素截图,按内容实际边界拍 ── # 元素截图零空白、零截断,不受 viewport 高度限制。 # `screenshot_full_page` 仅作为后备回退:元素截图失败时才使用。 card_locator = page.locator(".card") full_page = self.config.get("screenshot_full_page", True) try: if card_locator.count() > 0: card_locator.first.screenshot( path=str(save_path), timeout=timeout_ms, ) else: page.screenshot( path=str(save_path), full_page=full_page, timeout=timeout_ms, ) except Exception: # 回退:若元素截图失败(少见),退到全页截图 page.screenshot( path=str(save_path), full_page=full_page, timeout=timeout_ms, ) sz = save_path.stat().st_size logger.info( f"[LinuxDoPreview] 渲染截图: {save_path.name} ({sz / 1024:.1f} KB)" ) page.close() return save_path except Exception as e: logger.warning(f"[LinuxDoPreview] HTML 渲染失败: {type(e).__name__}: {e}") return None # ─────────── 管理指令 ─────────── @filter.command("linuxdo_stats") async def show_stats(self, event: AstrMessageEvent): screenshots = list(self.screenshot_dir.glob("*.png")) cache_size = sum(f.stat().st_size for f in screenshots) / 1024 yield event.plain_result( f"📊 LinuxDo Preview 统计\n" f" 请求总数: {self._stats['total']}\n" f" 缓存命中: {self._stats['cache_hit']}\n" f" 错误次数: {self._stats['error']}\n" f" 缓存截图: {len(screenshots)} ({cache_size:.1f} KB)" ) @filter.command("linuxdo_clean") async def clean_cache(self, event: AstrMessageEvent): count = 0 for f in self.screenshot_dir.glob("*.png"): f.unlink() count += 1 yield event.plain_result(f"🧹 已清理 {count} 个缓存截图") def _clean_text(text: str) -> str: text = re.sub(r"<[^>]+>", " ", text) text = re.sub(r"\s+", " ", text) text = html_mod.unescape(text) return text.strip()