A Deep Dive into Python Campus Automation: Compliant Crawlers, Anti-Anti-Scraping Strategies, and High-Concurrency Course Registration System Design
Abstract: This article presents an implementation scheme for a campus automation system, covering its core modules: object-oriented design, multi-protocol adaptation, a distributed task queue, and hybrid CAPTCHA solving, together with stress-tested code and technical documentation.
1. System Architecture and Engineering Conventions
1.1 Project Structure
campus_automation/
├── configs/                 # Configuration files
│   ├── settings.yaml        # Global settings
│   └── proxies.txt          # Proxy IP pool
├── core/
│   ├── auth/                # Authentication
│   │   ├── sso.py           # Single sign-on
│   │   └── captcha/         # CAPTCHA solving
│   ├── scheduler/           # Task scheduling
│   │   ├── rabbitmq.py      # Message queue
│   │   └── priority.py      # Priority policies
│   └── utils/               # Utilities
│       ├── logger.py        # Logging
│       └── anti_spider.py   # Anti-scraping countermeasures
├── services/                # Service modules
│   ├── course.py            # Course selection
│   ├── grade.py             # Grade lookup
│   └── paper.py             # Literature retrieval
└── main.py                  # Entry point
1.2 Configuration Center (YAML Example)
# configs/settings.yaml
database:
  host: 127.0.0.1
  port: 3306
  user: campus
  password: secure_password
scheduler:
  interval: 10s
  retries: 5
  timeout: 30s
captcha:
  mode: hybrid              # hybrid local/API mode
  local_threshold: 0.7      # confidence threshold for local recognition
  api_key: YOUR_API_KEY
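A minimal sketch of how this configuration might be loaded at startup with PyYAML; the load_settings helper below is illustrative and not part of the original project tree:

# Illustrative config loader; load_settings is an assumed helper
import yaml  # PyYAML

def load_settings(path: str = "configs/settings.yaml") -> dict:
    """Read the global YAML configuration into a plain dict."""
    with open(path, "r", encoding="utf-8") as f:
        return yaml.safe_load(f)

settings = load_settings()
threshold = settings["captcha"]["local_threshold"]  # 0.7 in the example above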
2. Complete Implementation of the Core Modules
2.1 Authentication System (Supporting Multiple Institutions' Protocols)
# core/auth/sso.py
from abc import ABC, abstractmethod
from typing import Dict

import requests
import structlog
from requests.exceptions import RequestException

class SSOBase(ABC):
    """Abstract base class for unified authentication."""

    def __init__(self, username: str, password: str):
        self.username = username
        self.password = password
        self.session = requests.Session()
        self.logger = structlog.get_logger()

    @abstractmethod
    def login(self) -> bool:
        """Run the login flow."""

    @abstractmethod
    def get_cookies(self) -> Dict[str, str]:
        """Return the authenticated session cookies."""

class JWCSSO(SSOBase):
    """Authentication against the academic affairs (JWC) system."""

    def __init__(self, username: str, password: str, school_code: str):
        super().__init__(username, password)
        self.school_code = school_code
        # AESCryptor is a project-local helper (see the sketch below)
        self._encryptor = AESCryptor(school_code)

    def login(self) -> bool:
        try:
            # Encrypt the password before submission
            encrypted_pwd = self._encryptor.encrypt(self.password)
            # Stage 1: obtain a login token
            token_resp = self.session.post(
                "https://sso.example.edu.cn/getToken",
                data={"school": self.school_code}
            )
            token = token_resp.json()['token']
            # Stage 2: submit the credentials
            login_resp = self.session.post(
                "https://sso.example.edu.cn/login",
                json={
                    "username": self.username,
                    "password": encrypted_pwd,
                    "token": token
                },
                headers={"X-Requested-With": "XMLHttpRequest"}
            )
            return login_resp.json().get("success", False)
        except (RequestException, KeyError) as e:
            self.logger.error("Login failed", error=str(e))
            return False

    def get_cookies(self) -> Dict[str, str]:
        return requests.utils.dict_from_cookiejar(self.session.cookies)
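AESCryptor is referenced above but never defined in the article; every school's SSO front end uses its own parameter scheme. The following is only a minimal sketch, assuming AES-CBC from pycryptodome with a key derived from school_code; the MD5 key derivation, IV reuse, and Base64 encoding are all illustrative assumptions:

# core/auth/crypto.py (illustrative; the real scheme is school-specific)
import base64
import hashlib

from Crypto.Cipher import AES        # pycryptodome
from Crypto.Util.Padding import pad

class AESCryptor:
    def __init__(self, school_code: str):
        # Assumption: derive a 16-byte key from the school code
        self._key = hashlib.md5(school_code.encode()).digest()

    def encrypt(self, plaintext: str) -> str:
        # Assumption: the key doubles as the IV, as some portals do
        cipher = AES.new(self._key, AES.MODE_CBC, self._key)
        ciphertext = cipher.encrypt(pad(plaintext.encode(), AES.block_size))
        return base64.b64encode(ciphertext).decode()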
2.2 Hybrid CAPTCHA Solving System
# core/auth/captcha/hybrid.py
import io
from typing import Optional

import cv2
import numpy as np
import requests
from PIL import Image

class CaptchaSolver:
    """Hybrid CAPTCHA solver: local model first, commercial API as fallback."""

    def __init__(self, local_model_path: str, api_endpoint: Optional[str] = None):
        self.local_model = self._load_model(local_model_path)
        self.api_endpoint = api_endpoint
        self.preprocessor = CaptchaPreprocessor()

    def solve(self, image: bytes, mode: str = 'auto') -> str:
        """
        CAPTCHA solving entry point.
        :param image: raw image bytes
        :param mode: auto / local / api
        """
        processed_img = self.preprocessor.process(image)
        if mode == 'auto':
            confidence = self._predict_confidence(processed_img)
            # 0.7 mirrors captcha.local_threshold in settings.yaml
            if confidence >= 0.7:
                return self._local_predict(processed_img)
            return self._api_predict(image)
        elif mode == 'local':
            return self._local_predict(processed_img)
        else:
            return self._api_predict(image)

    def _load_model(self, path: str):
        """Load the local CNN model (placeholder; see the ONNX sketch below)."""
        # Model loading logic goes here
        return None

    def _predict_confidence(self, image: np.ndarray) -> float:
        """Return the model's confidence for this image (placeholder)."""
        # Confidence prediction goes here
        return 0.0

    def _local_predict(self, image: np.ndarray) -> str:
        """Run local model inference (placeholder)."""
        # Local inference goes here
        return ""

    def _api_predict(self, image: bytes) -> str:
        """Call the commercial recognition API."""
        if not self.api_endpoint:
            raise ValueError("API endpoint not configured")
        files = {'image': image}
        resp = requests.post(self.api_endpoint, files=files)
        return resp.json()['result']

class CaptchaPreprocessor:
    """CAPTCHA preprocessing pipeline."""

    def process(self, image: bytes) -> np.ndarray:
        img = np.array(Image.open(io.BytesIO(image)).convert('RGB'))
        # Grayscale (PIL delivers RGB, not BGR)
        gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
        # Binarize
        _, thresh = cv2.threshold(gray, 127, 255, cv2.THRESH_BINARY)
        # Denoise
        denoised = cv2.fastNlMeansDenoising(thresh, h=10)
        # Morphological opening to remove speckles
        kernel = np.ones((2, 2), np.uint8)
        opened = cv2.morphologyEx(denoised, cv2.MORPH_OPEN, kernel)
        return opened
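One way to fill in the three placeholder methods is to export the trained CNN to ONNX (ONNX also appears in the tech stack overview at the end) and run it with onnxruntime. A minimal inference sketch; the model path, tensor layout, and CHARSET are assumptions:

# Illustrative local inference via onnxruntime; shapes and charset are assumed
import numpy as np
import onnxruntime as ort

CHARSET = "abcdefghijklmnopqrstuvwxyz0123456789"       # assumed label set
session = ort.InferenceSession("models/captcha_cnn.onnx")  # assumed path

def local_predict(image: np.ndarray) -> tuple[str, float]:
    # Assumed input (1, 1, H, W) float32 in [0, 1]; output (num_chars, len(CHARSET)) logits
    x = image.astype(np.float32)[None, None, :, :] / 255.0
    logits = session.run(None, {session.get_inputs()[0].name: x})[0]
    e = np.exp(logits - logits.max(axis=-1, keepdims=True))
    probs = e / e.sum(axis=-1, keepdims=True)           # numerically stable softmax
    text = "".join(CHARSET[i] for i in probs.argmax(axis=-1))
    confidence = float(probs.max(axis=-1).mean())       # mean per-character confidence
    return text, confidence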
3. High-Availability Course Registration Service
3.1 Course Selection Service Class (with Retry Logic)
# services/course.py
import time

import requests
import structlog
from requests.exceptions import RequestException, JSONDecodeError
from tenacity import retry, stop_after_attempt, wait_exponential

from core.auth.sso import SSOBase

class CourseService:
    """Core course-selection service."""

    def __init__(self, sso: SSOBase):
        self.sso = sso
        self.cookies = sso.get_cookies()
        self.logger = structlog.get_logger()
        self._init_session()

    def _init_session(self):
        """Initialize the HTTP session with the SSO cookies."""
        self.session = requests.Session()
        self.session.cookies.update(self.cookies)
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
            'Referer': 'https://jwc.example.edu.cn'
        })

    @retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, max=10))
    def select_course(self, course_id: str) -> bool:
        """
        Attempt to register for a course.
        :param course_id: course identifier
        :return: whether registration succeeded
        """
        try:
            # Check course availability first
            status = self._check_course_status(course_id)
            if not status['available']:
                self.logger.info("Course not available", course_id=course_id)
                return False
            # Submit the registration request
            resp = self.session.post(
                f"https://jwc.example.edu.cn/select/{course_id}",
                json={"confirm": True},
                timeout=5
            )
            if resp.status_code != 200:
                raise CourseSelectError(f"HTTP Error: {resp.status_code}")
            result = resp.json()
            if result.get('success'):
                self.logger.info("Course selected successfully", course_id=course_id)
                return True
            self.logger.warning("Course selection failed",
                                error=result.get('message'))
            return False
        except (RequestException, JSONDecodeError) as e:
            self.logger.error("Selection error", exc_info=True)
            raise CourseSelectError("course selection request failed") from e

    def _check_course_status(self, course_id: str) -> dict:
        """Query the course's current status."""
        resp = self.session.get(
            f"https://jwc.example.edu.cn/course/{course_id}/status",
            params={'t': int(time.time())}  # cache buster
        )
        return resp.json()

class CourseSelectError(Exception):
    """Base exception for course-selection business errors."""
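A short usage sketch tying the two classes together. Note that with this tenacity configuration, exhausting all three attempts raises tenacity.RetryError rather than CourseSelectError; credentials and IDs below are placeholders:

# Illustrative usage; credentials and IDs are placeholders
from tenacity import RetryError

from core.auth.sso import JWCSSO
from services.course import CourseService

sso = JWCSSO("student_id", "password", school_code="10001")
if sso.login():
    service = CourseService(sso)
    try:
        ok = service.select_course("CS101-02")  # retried up to 3 times
        print("selected" if ok else "not selected")
    except RetryError:
        print("gave up after 3 attempts")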
4. Literature Crawler System (with PDF Download)
4.1 Crawling Paper Metadata
# services/paper.py
import random
import re
import time
from typing import Optional
from urllib.parse import urljoin

import requests
from bs4 import BeautifulSoup

class PaperCrawler:
    """CNKI literature crawler."""

    BASE_URL = "https://cnki.net"

    def __init__(self, keyword: str, max_page: int = 5):
        self.keyword = keyword
        self.max_page = max_page
        self.session = requests.Session()
        # _get_proxy draws from configs/proxies.txt (see the sketch below)
        self.session.proxies = self._get_proxy()

    def crawl(self) -> list:
        """Run the crawl loop."""
        results = []
        for page in range(1, self.max_page + 1):
            try:
                html = self._fetch_page(page)
                papers = self._parse_page(html)
                results.extend(papers)
                time.sleep(random.uniform(2, 5))  # random delay between pages
            except StopIteration:
                # _parse_page signals that the result pages are exhausted
                break
        return results

    def _fetch_page(self, page: int) -> str:
        """Fetch one page of search results."""
        url = urljoin(self.BASE_URL, "/search")
        params = {
            'keyword': self.keyword,
            'page': page,
            'sort': 'relevance'
        }
        resp = self.session.get(url, params=params)
        resp.raise_for_status()
        return resp.text

    def _parse_page(self, html: str) -> list:
        """Parse one results page."""
        soup = BeautifulSoup(html, 'lxml')
        items = soup.select('.result-item')
        if not items:
            raise StopIteration  # no more results
        papers = []
        for item in items:
            title = item.select_one('.title').text.strip()
            link = urljoin(self.BASE_URL, item.select_one('a')['href'])
            abstract = item.select_one('.abstract').text.strip()
            # Extract the PDF download link
            pdf_link = self._extract_pdf_link(item)
            papers.append({
                'title': title,
                'link': link,
                'abstract': abstract,
                'pdf': pdf_link
            })
        return papers

    def _extract_pdf_link(self, item) -> Optional[str]:
        """Extract and decrypt the PDF download link."""
        script = item.find('script', string=re.compile('pdfLink'))
        if not script:
            return None
        # Pull the encrypted parameter out with a regex
        match = re.search(r"pdfLink\('(.*?)'\)", script.string)
        if match:
            encrypted = match.group(1)
            return self._decrypt_link(encrypted)
        return None

    def _decrypt_link(self, encrypted: str) -> str:
        """Decrypt the download link (example logic only)."""
        # The real decryption algorithm goes here
        return f"https://example.com/pdf/{encrypted}"
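_get_proxy is referenced in __init__ but not shown; a minimal sketch that draws a random entry from the configs/proxies.txt pool listed in the project structure (the one-proxy-per-line file format is an assumption):

# Illustrative _get_proxy; assumes proxies.txt holds one host:port per line
import random

def _get_proxy(self, path: str = "configs/proxies.txt") -> dict:
    with open(path, encoding="utf-8") as f:
        pool = [line.strip() for line in f if line.strip()]
    if not pool:
        return {}  # fall back to a direct connection
    proxy = random.choice(pool)
    return {"http": f"http://{proxy}", "https": f"http://{proxy}"}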
5. System Monitoring and Logging
5.1 Structured Logging Configuration
# core/utils/logger.py
import structlog
from structlog.typing import EventDict

def add_service_context(_, __, event_dict: EventDict) -> EventDict:
    """Attach the service name to every log entry."""
    event_dict['service'] = 'campus_automation'
    return event_dict

def setup_logging():
    structlog.configure(
        processors=[
            structlog.contextvars.merge_contextvars,
            add_service_context,
            structlog.processors.TimeStamper(fmt="iso"),
            structlog.processors.JSONRenderer()
        ],
        wrapper_class=structlog.BoundLogger,
        logger_factory=structlog.WriteLoggerFactory(
            file=open("app.log", "a")
        )
    )
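Call setup_logging() once at startup; every module then emits JSON lines through the same factory:

# Illustrative usage
import structlog

from core.utils.logger import setup_logging

setup_logging()
log = structlog.get_logger()
log.info("course_poll_started", course_id="CS101-02")
# -> {"service": "campus_automation", "timestamp": "...", "event": "course_poll_started", ...}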
5.2 Prometheus Metrics
import functools
import time

from prometheus_client import start_http_server, Counter, Gauge

# Metric definitions (the 'status' label distinguishes success from error)
REQUESTS_TOTAL = Counter('requests_total', 'Total API requests')
COURSE_SELECTION = Counter('course_selection', 'Course selection attempts', ['status'])
LATENCY = Gauge('request_latency', 'API latency in seconds')

# Decorator that collects metrics around a call
def track_metrics(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        REQUESTS_TOTAL.inc()
        try:
            result = func(*args, **kwargs)
            COURSE_SELECTION.labels(status='success').inc()
            return result
        except Exception:
            COURSE_SELECTION.labels(status='error').inc()
            raise
        finally:
            # Latency is recorded whether the call succeeded or failed
            LATENCY.set(time.time() - start_time)
    return wrapper
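A brief wiring sketch: expose the metrics endpoint and wrap the selection call (the port and the guarded_select helper are illustrative):

# Illustrative wiring; port choice and helper name are arbitrary
start_http_server(8000)  # serves /metrics for Prometheus to scrape

@track_metrics
def guarded_select(service, course_id):
    return service.select_course(course_id)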
6. Deployment and Operations
6.1 Docker Deployment File
FROM python:3.10-slim
WORKDIR /app
# Install browser and driver first so the layer is cached across code changes
RUN apt-get update && apt-get install -y \
    chromium \
    chromium-driver \
    && rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "-u", "main.py"]
6.2 systemd Service Configuration
# /etc/systemd/system/campus-auto.service
[Unit]
Description=Campus Automation Service
After=network.target
[Service]
User=appuser
WorkingDirectory=/opt/campus_auto
ExecStart=/usr/local/bin/poetry run python main.py
Restart=always
[Install]
WantedBy=multi-user.target
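main.py itself never appears in the article; the following is a minimal sketch of what the entry point might wire together from the modules above (the polling loop, port, and placeholder credentials are illustrative):

# main.py (illustrative entry point assembling the modules above)
import time

import structlog
from prometheus_client import start_http_server
from tenacity import RetryError

from core.auth.sso import JWCSSO
from core.utils.logger import setup_logging
from services.course import CourseService

def main():
    setup_logging()
    start_http_server(8000)  # metrics endpoint from section 5.2
    log = structlog.get_logger()

    sso = JWCSSO("student_id", "password", school_code="10001")  # placeholders
    if not sso.login():
        log.error("sso_login_failed")
        return

    service = CourseService(sso)
    while True:  # poll until the course is secured
        try:
            if service.select_course("CS101-02"):
                log.info("course_selected")
                break
        except RetryError:
            log.warning("retries_exhausted_polling_again")
        time.sleep(10)  # scheduler.interval from settings.yaml

if __name__ == "__main__":
    main()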
Tech stack overview:
- Core services: Python 3.10
- Async IO: FastAPI
- Distributed tasks: Celery, Redis
- Machine learning: PyTorch, ONNX
- Monitoring system
Legal Notice:
- This code is provided for learning and research only; use for unlawful purposes is prohibited.
- Written authorization from the target system's operator must be obtained before use.
- The developer accepts no legal liability for consequences arising from misuse of this code.
Author: WHCIS