这些设置分散在项目的不同配置文件中,主要是 scheduler.yaml, interceptor.py, runtime.yaml 以及自定义的 pipeline 和 middleware。

以下是 OpenClaw 进阶设置的核心模块和配置指南:
任务调度高级配置 (scheduler.yaml)
这是进阶的核心,用于控制抓取任务的节奏、并发和去重。
worker:
max_running: 10 # 全局最大并发任务数,根据机器和网站承受能力调整
trigger_interval: 1 # 触发新任务检查的间隔(秒),频繁任务可调小
# **重点:任务优先级与队列控制**
priority_strategy: "fifo" # 队列策略:fifo(默认), lifo, priority
# 如果使用 priority,需要在任务中指定 priority 字段(值越小优先级越高)
# **重点:智能速率限制(反爬关键)**
request:
delay:
fixed_delay: 2 # 固定延迟(秒),每个请求后等待时间,保守策略。
# 或使用随机延迟,更模拟人类行为
random_delay:
min: 1
max: 5
# **域名并发控制**:防止对单一站点攻击性抓取
concurrency_per_domain: 2 # 同一域名下同时运行的最大任务数
queue_capacity_per_domain: 100 # 同一域名等待队列的最大任务数
# **重点:持久化与断点续爬**
persistence:
enabled: true # 启用任务状态持久化
type: "json" # 存储类型:json, sqlite
file_path: "./data/task_status.json" # 状态文件路径
# 服务重启后,会根据此文件恢复未完成的任务
# **布隆过滤器(高效内存去重)**
deduplication:
enabled: true
type: "bloom_filter" # 使用布隆过滤器,适用于海量URL去重
filter_capacity: 1000000 # 期望处理的URL总数
error_rate: 0.001 # 可接受的误判率
# 重启后会丢失,适合一次性抓取,如需持久化,需使用 `redis` 类型。
运行时动态配置 (runtime.yaml)
用于在爬虫运行过程中动态调整参数,无需修改代码。
# runtime.yaml
variables:
# 1. 动态分页控制
max_page: 50 # 爬虫运行时可修改的最大翻页数
# 在 interceptor 中可通过 `context.runtime_variables.get('max_page')` 获取
# 2. 代理配置轮换
proxy_enabled: true
proxy_list:
- "http://proxy1.example.com:8080"
- "http://proxy2.example.com:8080"
current_proxy_index: 0 # 可在失败时通过拦截器动态切换
# 3. 请求头轮换池(反爬)
user_agents:
- "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 ..."
- "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 ..."
# 4. 可重试的状态码列表
retry_status_codes: [500, 502, 503, 504, 429, 408]
拦截器高级应用 (interceptor.py)
拦截器是实现复杂逻辑和反爬策略的“大脑”。
# interceptor.py 进阶示例
from openclaw.interceptor.base import BaseInterceptor
class AdvancedInterceptor(BaseInterceptor):
async def before_request(self, context):
"""请求前:动态配置请求参数"""
# 1. 动态切换User-Agent
import random
ua_list = context.runtime_variables.get('user_agents', [])
if ua_list:
context.request.headers['User-Agent'] = random.choice(ua_list)
# 2. 智能代理切换(当遇到429/403时)
if hasattr(context, 'last_response_status') and context.last_response_status in [429, 403]:
self._rotate_proxy(context)
# 3. 为特定站点添加Cookies或签名(应对反爬)
if "example.com" in context.request.url:
context.request.cookies.update({'session_id': 'dynamic_value'})
# 或计算动态参数
import time, hashlib
timestamp = int(time.time())
context.request.params['_t'] = timestamp
context.request.params['_sign'] = self._generate_sign(timestamp)
async def after_response(self, context):
"""响应后:复杂处理与状态判断"""
# 1. 保存上一次响应状态,供before_request使用
context.last_response_status = context.response.status
# 2. 检查响应内容是否被封(如出现“验证码”、“访问过于频繁”等关键词)
if context.response and context.response.text:
if "验证码" in context.response.text or "access denied" in context.response.text.lower():
context.logger.warning("触发反爬机制,尝试减缓速度或更换代理")
# 可以主动抛出特定异常,触发重试或任务暂停
# raise RequestBlockedError("页面被拦截")
# 3. 动态更新任务优先级(发现重要链接提升其优先级)
if "detail" in context.request.url:
context.task.priority = 1 # 高优先级
def _rotate_proxy(self, context):
"""轮换代理IP"""
proxy_list = context.runtime_variables.get('proxy_list', [])
if proxy_list:
current = context.runtime_variables.get('current_proxy_index', 0)
next_index = (current + 1) % len(proxy_list)
context.runtime_variables['current_proxy_index'] = next_index
context.request.proxy = proxy_list[next_index]
context.logger.info(f"切换代理至: {proxy_list[next_index]}")
def _generate_sign(self, timestamp):
"""生成请求签名(示例)"""
secret = "your_secret_key"
string_to_sign = f"{timestamp}{secret}"
return hashlib.md5(string_to_sign.encode()).hexdigest()
数据处理管道 (pipeline.py)
用于对抓取到的 Item 进行复杂的后处理。
# pipeline.py 进阶示例
class AdvancedPipeline:
async def process_item(self, item, spider_name):
"""处理抓取到的数据项"""
# 1. 数据清洗
item = self._clean_data(item)
# 2. 数据验证
if not self._validate_item(item):
raise DropItem(f"数据验证失败: {item}")
# 3. 数据增强(如根据标题获取情感分类)
item['sentiment'] = self._analyze_sentiment(item.get('title', ''))
# 4. 关联外部数据或数据库查询
item['category_id'] = await self._query_category_from_db(item['category_name'])
# 5. 分派到不同的存储(根据类型)
if item['type'] == 'news':
await self._save_to_news_db(item)
elif item['type'] == 'product':
await self._save_to_es(item) # 存到Elasticsearch
return item
def _clean_data(self, item):
"""高级清洗:去除HTML标签、空格、规范化日期等"""
import re
for key, value in item.items():
if isinstance(value, str):
# 去除HTML标签
value = re.sub(r'<[^>]+>', '', value)
# 规范化空白字符
value = ' '.join(value.split())
item[key] = value.strip()
return item
扩展中间件 (Middleware)
用于在请求-响应生命周期中注入更底层的逻辑。
# 示例:自定义重试中间件,针对特定异常增加延迟
from openclaw.middleware.retry import RetryMiddleware
import asyncio
class CustomRetryMiddleware(RetryMiddleware):
async def process_exception(self, request, exception, spider):
# 如果是连接超时,等待更长时间再重试
if "Timeout" in str(exception):
await asyncio.sleep(10) # 等待10秒
return await self._retry(request, exception, spider)
# 其他异常使用默认行为
return await super().process_exception(request, exception, spider)
性能与稳定性调优建议
-
数据库优化:
- 为
task_id,url等字段建立索引。 - 定期清理已完成的任务记录(
scheduler表)。
- 为
-
内存与队列监控:
- 在
interceptor中记录队列长度,当待处理任务过多时,暂停新增种子任务。if context.scheduler.get_waiting_count() > 10000: context.logger.error("任务队列堆积,暂停接收新任务")
- 在
-
分布式扩展(高级):
- 修改
scheduler.persistence.type为redis,可以让多个 OpenClaw 实例共享任务队列和去重集合,实现分布式抓取。
- 修改
-
日志与监控:
- 配置详细的日志级别 (
logging.yaml) 来跟踪问题。 - 将关键指标(如请求速度、成功率)发送到监控系统(如 Prometheus)。
- 配置详细的日志级别 (
进阶工作流
- 配置
scheduler.yaml:设定符合目标网站的速率限制、并发策略和持久化。 - 编写强大的
interceptor.py:实现动态请求头、代理轮换、反爬检测和绕过逻辑。 - 利用
runtime.yaml:将可能变化的参数外置,实现动态调参。 - 完善
pipeline.py:确保数据被正确清洗、验证和存储。 - 根据需要编写
middleware:处理底层网络异常或特殊重试逻辑。 - 启动前,使用
openclaw check命令验证所有配置文件的正确性。
请注意:修改任何配置文件后,需要重启 OpenClaw 服务 (openclaw run) 才能使更改生效,对于 runtime.yaml 中的部分变量,如果拦截器支持热加载,则可能无需重启。
版权声明:除非特别标注,否则均为本站原创文章,转载时请以链接形式注明文章出处。