Python和Pandas实现日志分析及其可视化实战指南
日志文件格式为
一、实验目的
本次实验旨在通过Python和Pandas库对杂乱的日志文件进行清洗、统计和可视化,具体目标如下:
-
导入杂乱日志文件并进行数据清洗(去重、时间格式化、提取关键字段)。
-
使用Pandas对日志数据进行统计分析,提取高频错误状态码。
-
使用Matplotlib生成可视化图表,直观展示统计结果。
二、实验环境
-
操作系统:Windows 10
-
编程语言:Python 3.8
-
主要库:
-
Pandas 1.3.5
-
Matplotlib 3.5.1
-
Regular Expressions (re)
-
Datetime
-
开发工具:PyCharm 2021.3
三、实验步骤
步骤 1:导入杂乱日志文件
-
日志文件格式:日志文件包含多条杂乱的日志记录,每条记录的格式为:
IP - - [时间戳] "请求方法 请求路径 HTTP版本" 状态码 响应大小
示例:
192.168.1.1 - - [31/Mar/2025:22:01:17 +0800] "PUT /login HTTP/1.1" 403 8192
-
读取日志文件:使用正则表达式逐行解析日志文件,提取IP、时间戳、请求方法、状态码和响应大小等字段。
步骤 2:数据清洗
-
去重:删除重复的日志条目。
-
时间格式化:将时间戳统一格式化为
datetime
对象,支持两种时间格式: -
%d/%b/%Y:%H:%M:%S %z
(如31/Mar/2025:22:01:17 +0800
) -
%Y-%m-%d %H:%M:%S
(如2025-03-31 22:01:17
) -
提取关键字段:从请求字符串中提取请求方法、路径和HTTP版本。
-
数据类型转换:将状态码和响应大小转换为整数类型。
步骤 3:统计分析
-
统计状态码频率:使用Pandas的
value_counts()
方法统计每个状态码的出现次数。 -
过滤错误状态码:筛选出4xx和5xx状态码(表示客户端或服务器错误)。
步骤 4:可视化
-
设置中文字体:使用
SimHei
字体确保中文标题和标签正确显示。 -
生成柱状图:使用Matplotlib绘制柱状图,展示高频错误状态码的分布情况。
-
图表优化:调整图表布局、字体大小和网格线,使图表更清晰易读。
四、实验结果
1. 数据清洗结果
原始日志条目数:982
去重后的日志条目数:899
有效日志条目数:899(所有时间戳均成功解析)
2. 高频错误状态码统计
统计结果显示,日志中高频错误状态码及其出现次数如下:
403 Forbidden:185次
404 Not Found:178次
500 Internal Server Error:175次
3. 可视化图表
生成的柱状图如下所示: https://example.com/error_status_codes.png
图表清晰展示了三种高频错误状态码的分布情况,其中403 Forbidden和404 Not Found的出现次数最为接近,500 Internal Server Error稍低。
五、实验总结
1. 实验收获
通过本次实验,我们成功实现了以下目标:
-
使用Python和Pandas对杂乱日志文件进行清洗和分析。
-
提取并统计了高频错误状态码。
-
使用Matplotlib生成了直观的可视化图表,并解决了中文显示问题。
2. 问题与改进
-
中文乱码问题:通过设置
SimHei
字体解决了中文显示问题,但需确保系统安装了该字体。 -
图表布局优化:通过调整
tight_layout()
和字体大小,使图表更加美观。 -
性能优化:对于大规模日志文件,可以考虑使用更高效的数据处理方法(如Dask)。
3. 未来工作
-
扩展分析维度:增加按IP地址、时间等维度的统计分析。
-
实时日志监控:开发实时日志分析和告警系统。
-
自动化报告生成:将分析结果生成定期报告,支持邮件或网页展示。
六、代码清单
# 导入必要的库
import pandas as pd
import re
from datetime import datetime
import matplotlib
matplotlib.use('TkAgg') # 显式设置后端为 TkAgg
import matplotlib.pyplot as plt
from matplotlib import rcParams# 定义日志文件路径
log_file_path = 'error_logs.log'# 定义正则表达式模式来匹配日志行
log_pattern = r'(\d+\.\d+\.\d+\.\d+) – – \[(.*?)\] "(.*?)" (\d+) (\d+|-)'# 初始化一个空的列表来存储解析后的日志
parsed_logs = []# 逐行读取日志文件
with open(log_file_path, 'r') as f:
for line in f:
match = re.match(log_pattern, line)
if match:
ip, timestamp, request, status, size = match.groups()
parsed_logs.append({
'ip': ip,
'timestamp': timestamp,
'request': request,
'status': status,
'size': size
})# 将解析后的日志转换为 DataFrame
df = pd.DataFrame(parsed_logs)
print(f"原始日志条目数: {len(parsed_logs)}")# 去重
df = df.drop_duplicates()
print(f"去重后的日志条目数: {len(df)}")# 时间格式化
def parse_timestamp(timestamp):
try:
try:
return datetime.strptime(timestamp, '%d/%b/%Y:%H:%M:%S %z')
except ValueError:
return datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
except:
return Nonedf['timestamp'] = df['timestamp'].apply(parse_timestamp)
df = df.dropna(subset=['timestamp']) # 删除无法解析时间戳的行# 提取关键字段
df['method'] = df['request'].apply(lambda x: x.split()[0] if x else '')
df['path'] = df['request'].apply(lambda x: x.split()[1] if x else '')
df['http_version'] = df['request'].apply(lambda x: x.split()[2] if x else '')# 转换数据类型
df['status'] = df['status'].astype(int)
df['size'] = df['size'].apply(lambda x: int(x) if x != '-' else 0)# 统计状态码频率
status_counts = df['status'].value_counts().sort_index()# 过滤出错误状态码(4xx 和 5xx)
error_statuses = status_counts[status_counts.index >= 400]# 绘制柱状图
rcParams['font.sans-serif'] = ['SimHei'] # 使用黑体
rcParams['axes.unicode_minus'] = False # 解决负号显示问题plt.figure(figsize=(10, 6))
error_statuses.plot(kind='bar', color='skyblue', width=0.7)
plt.title('高频错误状态码统计', fontsize=14)
plt.xlabel('状态码', fontsize=12)
plt.ylabel('出现次数', fontsize=12)
plt.xticks(rotation=0, fontsize=10)
plt.yticks(fontsize=10)
plt.grid(axis='y', linestyle='–', alpha=0.7)
plt.tight_layout()
plt.show()
作者:GTMCC1433223