fix: resolve black screenshot and text extraction issues

- Root cause: StealthySession.fetch() recycles the page to about:blank
  after returning, so the screenshot captured a blank page.
- Fix: Two-step approach - first fetch() to solve Cloudflare and
  get cookies, then new_page() in same context (with cf_clearance)
  to navigate and screenshot.
- Text extraction: switched from Scrapling CSS selectors to lxml-based
  extraction of .cooked elements, with a regex fallback. Strips HTML
  tags and normalizes whitespace.
This commit is contained in:
RainySY
2026-06-15 17:23:41 +08:00
parent 1ea2414c32
commit 1bded8efda

227
main.py
View File

@@ -1,8 +1,9 @@
"""
astrbot_plugin_linuxdo - LinuxDo 链接检测 & 预览截图插件
检测聊天消息中的 linux.do 链接,使用 Scrapling 的 StealthyFetcher
绕过 Cloudflare Turnstile自动截图并提取内容摘要发送预览。
检测聊天消息中的 linux.do 链接,使用 Scrapling 的 StealthySession
绕过 Cloudflare Turnstile,分两步:先 fetch 拿文本 + cookies,
再新建标签页截图(复用 cf_clearance,不重复触发验证)。
"""
import re
@@ -12,6 +13,7 @@ import time
import hashlib
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor
import html as html_mod
from astrbot.core.utils.astrbot_path import get_astrbot_data_path
@@ -23,13 +25,14 @@ import astrbot.api.message_components as Comp
try:
from scrapling.fetchers import StealthyFetcher, StealthySession as _StealthySession
from lxml import html as _lh
SCRAPLING_AVAILABLE = True
except ImportError:
SCRAPLING_AVAILABLE = False
StealthyFetcher = None # type: ignore[assignment]
_StealthySession = None # type: ignore[assignment]
StealthyFetcher = None
_StealthySession = None
_lh = None
# 全局线程池,避免阻塞 AstrBot 事件循环
_EXECUTOR = ThreadPoolExecutor(max_workers=2, thread_name_prefix="linuxdo")
@@ -40,25 +43,20 @@ class LinuxDoPreviewPlugin(Star):
super().__init__(context)
self.config = config
# 插件数据目录 data/plugin_data/astrbot_plugin_linuxdo/
self.data_dir = Path(get_astrbot_data_path()) / "plugin_data" / "astrbot_plugin_linuxdo"
self.screenshot_dir = self.data_dir / "screenshots"
self.screenshot_dir.mkdir(parents=True, exist_ok=True)
logger.info(
f"[LinuxDoPreview] 插件已加载,截图目录: {self.screenshot_dir}"
)
logger.info(f"[LinuxDoPreview] 插件已加载,截图目录: {self.screenshot_dir}")
if not SCRAPLING_AVAILABLE:
logger.warning(
"[LinuxDoPreview] scrapling 未安装!请执行: "
"pip install scrapling[fetchers] && scrapling install"
"[LinuxDoPreview] scrapling 未安装!"
"执行: pip install scrapling[fetchers] && scrapling install && playwright install-deps chromium"
)
# 缓存统计
self._stats = {"total": 0, "cache_hit": 0, "error": 0}
async def terminate(self):
    """Release the shared worker pool and log that the plugin was unloaded."""
    # wait=False: return immediately instead of blocking on pending jobs.
    _EXECUTOR.shutdown(wait=False)
    logger.info("[LinuxDoPreview] 插件已卸载")
@@ -66,12 +64,10 @@ class LinuxDoPreviewPlugin(Star):
@filter.event_message_type(filter.EventMessageType.ALL)
async def on_message(self, event: AstrMessageEvent):
"""检测消息中的 linux.do 链接并触发预览"""
text = event.message_str
if not text:
return
# 提取所有 linux.do 链接
urls = re.finditer(
r"https?://(?:[a-z0-9.\-]+\.)*linux\.do/[^\s\"')>}]+",
text,
@@ -84,23 +80,20 @@ class LinuxDoPreviewPlugin(Star):
target_url = matched_urls[0]
logger.info(f"[LinuxDoPreview] 检测到链接: {target_url}")
# 检查是否在忽略列表中(如首页等不需要预览的链接)
if self._should_skip(target_url):
return
# 发送等待提示
yield event.plain_result(f"🔍 正在获取 linux.do 预览,请稍候…")
yield event.plain_result("🔍 正在读取 linux.do 页面…")
try:
screenshot_path, summary = await asyncio.get_event_loop().run_in_executor(
_EXECUTOR,
self._fetch_preview,
target_url,
)
screenshot_path, summary = await asyncio \
.get_event_loop() \
.run_in_executor(_EXECUTOR, self._fetch_preview, target_url)
if screenshot_path and screenshot_path.exists():
yield event.image_result(str(screenshot_path.absolute()))
if summary:
yield event.plain_result(summary)
self._stats["total"] += 1
@@ -110,32 +103,24 @@ class LinuxDoPreviewPlugin(Star):
logger.error(f"[LinuxDoPreview] 预览失败: {type(e).__name__}: {e}")
yield event.plain_result(f"❌ 预览获取失败: {str(e)[:200]}")
# ─────────── 核心逻辑 ───────────
# ─────────── 预处理 ───────────
def _should_skip(self, url: str) -> bool:
"""跳过不需要预览的链接"""
skip_patterns = [
r"linux\.do/?$",
r"linux\.do/latest",
r"linux\.do/categories",
r"linux\.do/tag/",
r"linux\.do/u/",
r"linux\.do/my/",
@staticmethod
def _should_skip(url: str) -> bool:
skip = [
r"linux\.do/?$", r"linux\.do/latest", r"linux\.do/categories",
r"linux\.do/tag/", r"linux\.do/u/", r"linux\.do/my/",
]
for pat in skip_patterns:
if re.search(pat, url, re.IGNORECASE):
return True
return False
return any(re.search(p, url, re.IGNORECASE) for p in skip)
# ─────────── 核心:两步法 ───────────
def _fetch_preview(self, url: str):
"""同步获取页面截图和摘要(在子线程中执行)"""
if not SCRAPLING_AVAILABLE:
raise RuntimeError(
"Scrapling 未安装,请执行: pip install scrapling[fetchers] && scrapling install"
)
raise RuntimeError("Scrapling 未安装")
url_hash = hashlib.md5(url.encode()).hexdigest()
screenshot_path: Path | None = self.screenshot_dir / f"{url_hash}.png"
screenshot_path = self.screenshot_dir / f"{url_hash}.png"
cache_ttl = self.config.get("cache_ttl", 1800)
cache_valid = (
screenshot_path.exists()
@@ -145,78 +130,115 @@ class LinuxDoPreviewPlugin(Star):
StealthyFetcher.adaptive = True # type: ignore[union-attr]
with _StealthySession(headless=True, solve_cloudflare=True) as session: # type: ignore[union-attr]
page = session.fetch(url)
# 提取标题
title = page.css("title::text").get()
if not title:
title = page.css("h1::text, .fancy-title::text").get()
title = (title or "无标题").strip()
with _StealthySession( # type: ignore[union-attr]
headless=True, solve_cloudflare=True
) as session:
# ── Step 1: fetch 触发 Cloudflare 解决,拿 HTML ──
resp = session.fetch(url)
html_str = resp.body.decode("utf-8", errors="replace")
title = self._extract_title(html_str)
content = self._extract_content(html_str)
logger.info(f"[LinuxDoPreview] 标题: {title}")
# 截图(如果缓存已过期或无缓存)
# ── Step 2: 新建标签页(复用 cf_clearance截图 ──
if not cache_valid:
try:
ctx = session.context
if ctx and ctx.pages:
ctx.pages[0].screenshot(
path=str(screenshot_path),
full_page=True,
timeout=self.config.get("screenshot_timeout", 15) * 1000,
screenshot_path = self._take_screenshot(
session, url, screenshot_path
)
logger.info(
f"[LinuxDoPreview] 截图保存: {screenshot_path.name}"
)
except Exception as e:
logger.warning(f"[LinuxDoPreview] 截图失败: {e}")
screenshot_path = None
else:
self._stats["cache_hit"] += 1
logger.info(f"[LinuxDoPreview] 使用缓存截图: {screenshot_path.name}")
# 提取内容摘要
content = self._extract_page_text(page)
# 组装回复文本
summary = self._build_summary(title, content, url)
return screenshot_path, summary
def _extract_page_text(self, page) -> str:
"""从页面中提取可读的文字摘要"""
# 尝试常见的 Discourse 内容选择器
selectors = [
".post .cooked p::text",
".topic-body .cooked p::text",
".regular.contents .cooked p::text",
"article .post-content p::text",
".topic-post .post-content p::text",
'[itemprop="articleBody"] p::text',
"p::text",
"body::text",
]
# ─────────── 截图(复用 StealthySession 的浏览器上下文) ───────────
collected = []
for sel in selectors:
parts = page.css(sel).getall()
if parts:
for p in parts:
t = p.strip()
if t and len(t) > 10:
collected.append(t)
if len(collected) >= 5:
@staticmethod
@staticmethod
def _take_screenshot(session, url: str, save_path: Path) -> Path | None:
"""在已有 cf_clearance 的上下文中新建标签页截图"""
try:
ctx = session.context
if not ctx:
return None
page = ctx.new_page()
page.set_viewport_size({"width": 1280, "height": 1024})
# 导航(已有 cf_clearance cookie不应再触发 Cloudflare
page.goto(url, wait_until="load", timeout=30000)
page.wait_for_timeout(3000)
page.screenshot(
path=str(save_path),
full_page=True,
timeout=20000,
)
sz = save_path.stat().st_size
logger.info(
f"[LinuxDoPreview] 截图保存: {save_path.name} ({sz / 1024:.1f} KB)"
)
page.close()
return save_path
except Exception as e:
logger.warning(f"[LinuxDoPreview] 截图失败: {type(e).__name__}: {e}")
return None
# ─────────── 文本提取 ───────────
@staticmethod
def _extract_title(html_str: str) -> str:
m = re.search(r"<title>(.*?)</title>", html_str, re.DOTALL | re.IGNORECASE)
if m:
t = m.group(1).strip()
t = re.sub(
r"\s*[-–—|]\s*(LINUX\s*DO|LINUXDO).*$", "", t, flags=re.IGNORECASE
)
return t.strip()
return "无标题"
def _extract_content(self, html_str: str) -> str:
try:
return self._extract_via_lxml(html_str)
except Exception:
pass
try:
return self._extract_via_regex(html_str)
except Exception:
pass
return ""
def _extract_via_lxml(self, html_str: str) -> str:
    """Collect the text of up to three ``.cooked`` elements using lxml.

    Each matching element's text content is passed through
    ``_clean_text``; results of 15 characters or fewer are dropped.
    The kept texts are joined with a blank line between them.
    """
    document = _lh.fromstring(html_str)
    kept = []
    for node in document.cssselect(".cooked"):
        if len(kept) >= 3:
            break
        cleaned = _clean_text(node.text_content())
        if len(cleaned) <= 15:
            continue
        kept.append(cleaned)
    return "\n\n".join(kept)
max_len = self.config.get("max_content_length", 400)
text = " ".join(collected)[:max_len + 200]
return text.strip()[:max_len]
def _extract_via_regex(self, html_str: str) -> str:
parts = []
for m in re.finditer(
r'<div\s+class="cooked">(.*?)</div>\s*</article>', html_str, re.DOTALL
):
text = re.sub(r"<[^>]+>", " ", m.group(1))
text = re.sub(r"\s+", " ", text).strip()
if len(text) > 15:
parts.append(text)
if len(parts) >= 3:
break
return "\n\n".join(parts)
def _build_summary(self, title: str, content: str, url: str) -> str:
"""组装预览文本"""
@staticmethod
def _build_summary(title: str, content: str, url: str) -> str:
lines = [f"📌 {title}"]
if content:
lines.append("")
max_len = self.config.get("max_content_length", 400)
max_len = 400
lines.append(content[:max_len])
if len(content) > max_len:
lines[-1] += ""
@@ -224,11 +246,10 @@ class LinuxDoPreviewPlugin(Star):
lines.append(f"🔗 {url}")
return "\n".join(lines)
# ─────────── 调试指令 ───────────
# ─────────── 管理指令 ───────────
@filter.command("linuxdo_stats")
async def show_stats(self, event: AstrMessageEvent):
"""查看插件统计信息"""
screenshots = list(self.screenshot_dir.glob("*.png"))
cache_size = sum(f.stat().st_size for f in screenshots) / 1024
yield event.plain_result(
@@ -241,9 +262,15 @@ class LinuxDoPreviewPlugin(Star):
@filter.command("linuxdo_clean")
async def clean_cache(self, event: AstrMessageEvent):
    """清理截图缓存"""
    # NOTE(review): the docstring above is kept verbatim — command
    # frameworks commonly surface handler docstrings as help text;
    # confirm before translating it.
    # Delete every cached PNG in the screenshot directory and report
    # how many files were removed.
    removed = 0
    for removed, png in enumerate(self.screenshot_dir.glob("*.png"), start=1):
        png.unlink()
    yield event.plain_result(f"🧹 已清理 {removed} 个缓存截图")
def _clean_text(text: str) -> str:
text = re.sub(r"<[^>]+>", " ", text)
text = re.sub(r"\s+", " ", text)
text = html_mod.unescape(text)
return text.strip()