rust 日志分析工具 logalyzer
命令行参数执行
例:cargo run — -f access.log —mod ip —top 3
实现:
- 大致,定义了一个命令行参数接受对象. args
- 通过传入的文件路径,读取文件输入流 BufReader::new().
- 然后逐行遍历 reader.lines() ---- 然后根据逐行的信息,对应的提取 ip 或者 error
- 打印输出.
[dependencies]
clap = {version = "4.5.39", features = ["derive"]} ## 命令行解析
regex = {version = "1.11.1"} # 正则表达式
serde = {version = "1.0.219", features = ["derive"]} 可选:后续扩展JSON输出
rayon = {version = "1.10.0"} # 添加并行处理库
memmap2 = "0.9.5"# 内存映射文件处理
indicatif = {version = "0.17.11", features = ["rayon"]} #进度条库
代码如下
第一版
use clap::Parser;
use std::collections::HashMap;
use std::error::Error;
use std::fs::File;
use std::io::{BufRead, BufReader, ErrorKind};
use std::process::exit;
/// cargo build --release
/// cargo run -- -f access.log --mode error --top 3
fn main() {
let args = Args::parse();
// 类型 所有权 内存位置 可变性 大小
// String 拥有所有权 堆上 可变 动态大小 (3个机器字)
// &str 借用引用 栈上(指向堆/静态区) 不可变 固定大小 (2个机器字)
// &str 无内存分配:只是借用已有字符串数据
// string 每次调用都涉及堆内存分配和复制
// &str:明确表示函数不需要所有权,只是借用
// String:强制转移所有权,调用后原始值失效
// 场景 推荐类型 原因
// 只读访问 &str 零开销借用
// 需要修改内容 &mut String 可变借用避免所有权转移
// 需要获取所有权 String 明确所有权转移
// 函数内部需要扩展字符串 String 需要容量控制
match analyze_log(&args.file, &args.mode, args.top) {
Ok(_) => (),
Err(e) => {
match e.downcast_ref::<std::io::Error>() {
Some(io_error) if io_error.kind() == ErrorKind::NotFound => {
eprintln!("File {} not found.", args.file)
}
_ => eprintln!("Error: {:?}", e),
}
exit(1);
}
}
}
/// 日志分析工具
/// Debug为结构体自动生成调试输出功能--允许使用 {:?} 或 {:#?} 格式化符号打印结构体内容 -- 开发调试的必备工具,类似 Java 的 toString()
/// toString Args {{ file: {:?}, mode: {:?}, top: {:?} }}
#[derive(Parser, Debug)]
#[command(version, about)]
struct Args{
/// 输入日志文件路径
/// 命令行短选项 -f, 长选项 --file 但没有默认值,所以它是必须提供的。
#[arg(short, long)]
file: String,
/// 分析模式:ip(统计ip)|error(错误日志)
#[arg(short, long, default_value = "ip")]
mode: String,
/// 显示前N个结果 default_value_t==带类型的默认值设置
/// usize 正整数
#[arg(short, long, default_value_t = 5)]
top: usize,
}
/// `Result<T, E>` 是Rust的标准枚举,用于表示成功(Ok(T))或失败(Err(E))的结果
/// () 标识 void, 意思是成功的时候 返回 OK(()) -- void
/// Box<...> 堆分配的智能指针(在堆上存储数据)
/// dyn Error 动态分发的错误特征对象(任何实现了 Error trait 的类型)
/// Box<dyn Error> 整体含义 在堆上存储的、实现了标准错误接口的任何错误类型
/// Java 近似类比 Box<dyn Error> Throwable(所有异常的基类)
/// Result<(), Box<dyn Error>>
// │
// ├── Ok(()) → 成功(无返回值)
// │
// └── Err( → 失败
// Box::new(具体错误) // 如 std::io::Error
// )
fn analyze_log(file_path: &str, mode: &str, top_n: usize) -> Result<(), Box<dyn Error>> {
// 1.打开文件
let file = File::open(file_path)?;
let reader = BufReader::new(file);
// 选择分析模式(String使用 &str 切片避免复制)
match mode {
"ip" => count_ips(reader, top_n),
"error" => find_errors(reader, top_n),
_ => Err(format!("Unknown mode: {}", mode).into()),
}
}
fn find_errors(buf_reader: BufReader<File>, top_n: usize) -> Result<(), Box<dyn Error>> {
let mut error_codes = ["400", "404", "500"];
let mut errors = Vec::new();
for line in buf_reader.lines() {
let line = line?;
if error_codes.iter().any(|code| line.contains(code)) {
errors.push(line);
}
}
println!("Found {} error(s)", errors.len());
for err in errors.iter().take(top_n) {
println!("{}", err);
}
Ok(())
}
fn count_ips(buf_reader: BufReader<File>, top_n: usize) -> Result<(), Box<dyn Error>> {
let mut ip_counter = HashMap::new();
let ip_regex = regex::Regex::new(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")?;
// 逐行读取
for line in buf_reader.lines() {
let line = line?; // 解包result
if let Some(ip) = ip_regex.find(&line) {
// 使用 entry api 高效计数,类似 java map merge
*ip_counter.entry(ip.as_str().to_string()).or_insert(0) += 1;
}
}
// 排序并打印Top N
let mut ips: Vec<_> = ip_counter.iter().collect();
ips.sort_by(|a, b| b.1.cmp(a.1)); // 降序排序
println!("Top {} IPs: ", top_n);
for (ip, counter) in ips.iter().take(top_n) {
println!("{} - {}", ip, counter);
}
Ok(())
}
access.log 测试用例.
192.168.1.105 - - [31/May/2025:14:28:33 +0800] "GET /index.html HTTP/1.1" 200 2326
192.168.1.23 - - [31/May/2025:14:28:34 +0800] "POST /login HTTP/1.1" 302 415
192.168.1.110 - - [31/May/2025:14:28:35 +0800] "GET /style.css HTTP/1.1" 200 1430
192.168.1.105 - - [31/May/2025:14:28:36 +0800] "GET /images/logo.png HTTP/1.1" 200 4289
192.168.1.99 - - [31/May/2025:14:28:37 +0800] "GET /about.html HTTP/1.1" 404 212
192.168.1.105 - - [31/May/2025:14:28:38 +0800] "GET /api/data HTTP/1.1" 200 8923
192.168.1.23 - - [31/May/2025:14:28:39 +0800] "GET /favicon.ico HTTP/1.1" 200 4286
192.168.1.110 - - [31/May/2025:14:28:40 +0800] "GET /products/123 HTTP/1.1" 200 3172
192.168.1.105 - - [31/May/2025:14:28:41 +0800] "POST /checkout HTTP/1.1" 500 1024
192.168.1.110 - - [31/May/2025:14:28:42 +0800] "GET /contact HTTP/1.1" 200 1842
输出样例
cargo run -- -f access.log --mode error
Found 2 error(s)
192.168.1.99 - - [31/May/2025:14:28:37 +0800] "GET /about.html HTTP/1.1" 404 212
192.168.1.105 - - [31/May/2025:14:28:41 +0800] "POST /checkout HTTP/1.1" 500 1024
添加线程池并行处理大文件
使用 rayon 库来实现高效的并行处理 rayon = {version = “1.10.0”} 工作窃取(Work Stealing): Rayon 使用工作窃取算法动态平衡负载 自动利用所有CPU核心
优化大文件处理(添加内存映射)memmap2 = “0.9.5”# 内存映射文件处理 零拷贝技术: 使用 memmap2 实现内存映射文件 避免数据在用户空间和内核空间之间的复制
let ip_counter: HashMap<String, usize> = content
.par_lines() // 转换为并行迭代器
.fold(...) // 局部聚合
.reduce(...) // 全局合并
fold 方法
并行处理每个子集,将元素聚合到局部累加器中
reduce 方法
1.接收 fold 阶段产生的多个 HashMap
2.使用树状合并策略(两两合并):
线程1结果 ───┐
线程2结果 ───┼─→ 合并结果1 ─┐
线程3结果 ───┘ ├─→ 最终结果
线程4结果 ───┐ │
线程5结果 ───┼─→ 合并结果2 ─┘
... ┘
生成大日志文件(100万行)
实现: 1.纯 awk 执行,比 Bash 快 5-10 倍 2.无外部命令依赖 3.支持千万行级别日志生成 4.内存占用恒定(约 10MB)
脚本如下
#!/bin/bash
# 超大规模日志生成(使用awk) 适用于千万行级别
# ./generate_large_log_awk.sh 10000000 > 10million.log
total_lines=$1
awk -v total="$total_lines" '
BEGIN {
srand()
months = "Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec"
split(months, month_arr)
# 生成IP池
for (i=0; i<50; i++) ips[i] = "192.168.1." i
# 生成路径池
for (i=0; i<20; i++) paths[i] = "/path" i
# 当前时间戳
now = systime()
# 生成日志
for (i=1; i<=total; i++) {
# 随机IP和路径
ip = ips[int(rand()*50)]
path = paths[int(rand()*20)]
# 随机状态码
r = rand()
status = (r < 0.85) ? 200 : (r < 0.95) ? 404 : 500
# 随机时间偏移
time_offset = int(rand()*3600)
log_time = now - time_offset
# 格式化时间
log_str = strftime("%d/%b/%Y:%H:%M:%S", log_time)
# 随机响应大小
size = int(rand()*5000)
# 输出日志行
print ip " - - [" log_str " +0000] \"GET " path " HTTP/1.1\" " status " " size
# 显示进度
if (i % 1000000 == 0) {
printf "Generated %d million lines...\n", i/1000000 > "/dev/stderr"
}
}
}'
添加进度条
indicatif = “0.17” # 进度条库
按时间范围过滤
// 扩展命令行参数
#[arg(long)]
start_time: Option<String>,
#[arg(long)]
end_time: Option<String>,
// 在分析函数中添加时间过滤
.filter(|line| {
// 解析日志中的时间戳
// 检查是否在指定时间范围内
})
结果持久化
#[arg(short, long)]
output: Option<String>,
// 在函数结束时
if let Some(output_path) = output {
let mut file = File::create(output_path)?;
write!(file, "{}", result_string)?;
}
代码如下:
use clap::Parser;
use rayon::iter::ParallelIterator;
use rayon::str::ParallelString;
use std::collections::HashMap;
use std::error::Error;
use std::fs::File;
use std::io::ErrorKind;
use std::process::exit;
use std::str::from_utf8;
// 添加性能计时
use indicatif::{ParallelProgressIterator, ProgressBar, ProgressStyle};
use std::time::Instant;
/// cargo build --release
/// cargo run -- -f access.log --mode error --top 3
fn main() {
let args = Args::parse();
let start = Instant::now();
match analyze_log(&args.file, &args.mode, args.top) {
Ok(_) => {
let duration = start.elapsed();
println!("分析完成,耗时={:.2?}", duration);
}
Err(e) => {
match e.downcast_ref::<std::io::Error>() {
Some(io_error) if io_error.kind() == ErrorKind::NotFound => {
eprintln!("File {} not found.", args.file)
}
_ => eprintln!("Error: {:?}", e),
}
exit(1);
}
}
}
/// 日志分析工具
#[derive(Parser, Debug)]
#[command(version, about)]
struct Args {
/// 输入日志文件路径
/// 命令行短选项 -f, 长选项 --file 但没有默认值,所以它是必须提供的。
#[arg(short, long)]
file: String,
/// 分析模式:ip(统计ip)|error(错误日志)
#[arg(short, long, default_value = "ip")]
mode: String,
/// 显示前N个结果 default_value_t==带类型的默认值设置
/// usize 正整数
#[arg(short, long, default_value_t = 5)]
top: usize,
// 扩展命令行参数
#[arg(long)]
start_time: Option<String>,
#[arg(long)]
end_time: Option<String>,
// 结果持久化
#[arg(short, long)]
output: Option<String>,
}
fn analyze_log(file_path: &str, mode: &str, top_n: usize) -> Result<(), Box<dyn Error>> {
// 1.打开文件
let file = File::open(file_path)?;
// 使用内存映射文件提高大文件读取效率
// 使用内存映射文件而不是逐行读取
let mmap = unsafe { memmap2::Mmap::map(&file)? };
let content = from_utf8(&mmap)?;
match mode {
"ip" => count_ips_parallel(content, top_n),
"error" => find_errors_parallel(content, top_n),
_ => Err(format!("Unknown mode: {}", mode).into()),
}
}
fn find_errors_parallel(content: &str, top_n: usize) -> Result<(), Box<dyn Error>> {
let mut error_codes = ["400", "404", "500"];
let errors: Vec<String> = content
.par_lines()
.filter(|line| error_codes.iter().any(|code| line.contains(code)))
.map(|s| s.to_string())
.collect();
println!("Found {} errors: ", errors.len());
for err in errors.iter().take(top_n) {
println!("{}", err);
}
Ok(())
}
fn count_ips_parallel(content: &str, top_n: usize) -> Result<(), Box<dyn Error>> {
let total_lines = content.lines().count();
let pb = ProgressBar::new(total_lines as u64);
pb.set_style(
ProgressStyle::default_bar()
.template(
"{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})",
)?
.progress_chars("#>-"),
);
let ip_regex = regex::Regex::new(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")?;
// par_lines() 将行处理并行化
// fold() 用于线程局部聚合
// reduce() 用于合并线程结果
let ip_counter: HashMap<String, usize> = content
.par_lines() // 并行迭代行
.progress_with(pb)
.fold(
|| HashMap::new(), // 初始化闭包:创建局部累加器
|mut acc, line| // 累加闭包:处理每个元素
{
if let Some(ip) = ip_regex.find(line) {
*acc.entry(ip.as_str().to_string()).or_insert(0) += 1;
}
acc
},
)
.reduce(
|| HashMap::new(), // 初始归并值 // 初始化闭包(通常不会用到)
|mut a, b| // 合并闭包:合并两个 HashMap
{
for (ip, count) in b {
*a.entry(ip).or_insert(0) += count;
}
a
},
);
let mut ips: Vec<_> = ip_counter.iter().collect();
ips.sort_by(|a, b| b.1.cmp(a.1));
println!("Top {} Ips: ", top_n);
for (ip, count) in ips.iter().take(top_n) {
println!("{} - {}", ip, count);
}
Ok(())
}
输出样例

Running `target\release\logalyzer.exe -f large_access.log --mode ip --top 10`
Top 10 Ips:
192.168.1.41 - 20275
192.168.1.23 - 20267
192.168.1.14 - 20227
192.168.1.31 - 20206
192.168.1.38 - 20154
192.168.1.21 - 20137
192.168.1.46 - 20127
192.168.1.42 - 20116
192.168.1.18 - 20111
192.168.1.32 - 20110
分析完成,耗时=224.78ms