Skip to content
Go back

logalyzer_rust_practice

rust 日志分析工具 logalyzer

命令行参数执行

例:cargo run — -f access.log —mod ip —top 3

实现:

  1. 大致,定义了一个命令行参数接受对象. args
  2. 通过传入的文件路径,读取文件输入流 BufReader::new().
  3. 然后逐行遍历 reader.lines() ---- 然后根据逐行的信息,对应的提取 ip 或者 error
  4. 打印输出.

[dependencies]
clap = {version = "4.5.39", features = ["derive"]}   ## 命令行解析
regex = {version = "1.11.1"}  # 正则表达式
serde = {version = "1.0.219", features = ["derive"]}   可选:后续扩展JSON输出
rayon = {version = "1.10.0"}   # 添加并行处理库
memmap2 = "0.9.5"# 内存映射文件处理
indicatif = {version = "0.17.11", features = ["rayon"]}  #进度条库

代码如下

第一版

use clap::Parser;
use std::collections::HashMap;
use std::error::Error;
use std::fs::File;
use std::io::{BufRead, BufReader, ErrorKind};
use std::process::exit;


/// cargo build --release
/// cargo run -- -f access.log --mode error --top 3
fn main() {
    let args = Args::parse();

    // 类型	     所有权	     内存位置	         可变性	大小
    // String	拥有所有权	堆上	                 可变	动态大小 (3个机器字)
    // &str	    借用引用	    栈上(指向堆/静态区)	不可变	固定大小 (2个机器字)
    // &str 无内存分配:只是借用已有字符串数据
    // string 每次调用都涉及堆内存分配和复制
    // &str:明确表示函数不需要所有权,只是借用
    // String:强制转移所有权,调用后原始值失效
    // 场景	        推荐类型  	   原因
    // 只读访问	    &str	       零开销借用
    // 需要修改内容	&mut String	   可变借用避免所有权转移
    // 需要获取所有权	String	       明确所有权转移
    // 函数内部需要扩展字符串	String	需要容量控制
    match analyze_log(&args.file, &args.mode, args.top) {
        Ok(_) => (),
        Err(e) => {
            match e.downcast_ref::<std::io::Error>() {
                Some(io_error) if io_error.kind() == ErrorKind::NotFound => {
                    eprintln!("File {} not found.", args.file)
                }

                _ => eprintln!("Error: {:?}", e),
            }

            exit(1);
        }
    }

}

/// 日志分析工具
/// Debug为结构体自动生成调试输出功能--允许使用 {:?} 或 {:#?} 格式化符号打印结构体内容 -- 开发调试的必备工具,类似 Java 的 toString()
/// toString Args {{ file: {:?}, mode: {:?}, top: {:?} }}
#[derive(Parser, Debug)]
#[command(version, about)]
struct Args{
    /// 输入日志文件路径
    /// 命令行短选项 -f, 长选项 --file 但没有默认值,所以它是必须提供的。
    #[arg(short, long)]
    file: String,

    /// 分析模式:ip(统计ip)|error(错误日志)
    #[arg(short, long, default_value = "ip")]
    mode: String,

    /// 显示前N个结果  default_value_t==带类型的默认值设置
    /// usize 正整数
    #[arg(short, long, default_value_t = 5)]
    top: usize,

}


/// `Result<T, E>` 是Rust的标准枚举,用于表示成功(Ok(T))或失败(Err(E))的结果
/// () 标识 void, 意思是成功的时候 返回 OK(()) -- void
/// Box<...>	堆分配的智能指针(在堆上存储数据)
/// dyn Error	动态分发的错误特征对象(任何实现了 Error trait 的类型)
/// Box<dyn Error>  整体含义	在堆上存储的、实现了标准错误接口的任何错误类型
/// Java 近似类比 Box<dyn Error>	Throwable(所有异常的基类)
/// Result<(), Box<dyn Error>>
// │
// ├── Ok(())        → 成功(无返回值)
// │
// └── Err(          → 失败
//         Box::new(具体错误)   // 如 std::io::Error
//      )
fn analyze_log(file_path: &str, mode: &str, top_n: usize) -> Result<(), Box<dyn Error>> {
    // 1.打开文件
    let file  = File::open(file_path)?;
    let reader = BufReader::new(file);

    // 选择分析模式(String使用 &str 切片避免复制)
    match mode {
        "ip" => count_ips(reader, top_n),
        "error" => find_errors(reader, top_n),
        _ => Err(format!("Unknown mode: {}", mode).into()),
    }
}

fn find_errors(buf_reader: BufReader<File>, top_n: usize) -> Result<(), Box<dyn Error>> {
    let mut error_codes = ["400", "404", "500"];
    let mut errors = Vec::new();

    for line in buf_reader.lines() {
        let line = line?;
        if error_codes.iter().any(|code| line.contains(code)) {
            errors.push(line);
        }
    }
    println!("Found {} error(s)", errors.len());
    for err in errors.iter().take(top_n) {
        println!("{}", err);
    }
    Ok(())
}

fn count_ips(buf_reader: BufReader<File>, top_n: usize) -> Result<(), Box<dyn Error>> {
    let mut ip_counter = HashMap::new();
    let ip_regex = regex::Regex::new(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")?;

    // 逐行读取
    for line in buf_reader.lines() {
        let line = line?; // 解包result
        if let Some(ip) = ip_regex.find(&line) {
            // 使用 entry api 高效计数,类似 java map merge
            *ip_counter.entry(ip.as_str().to_string()).or_insert(0) += 1;
        }
    }

    // 排序并打印Top N
    let mut ips: Vec<_> = ip_counter.iter().collect();
    ips.sort_by(|a, b| b.1.cmp(a.1));  // 降序排序

    println!("Top {} IPs: ", top_n);
    for (ip, counter) in ips.iter().take(top_n) {
        println!("{} - {}", ip, counter);
    }

    Ok(())
}

access.log 测试用例.

192.168.1.105 - - [31/May/2025:14:28:33 +0800] "GET /index.html HTTP/1.1" 200 2326
192.168.1.23 - - [31/May/2025:14:28:34 +0800] "POST /login HTTP/1.1" 302 415
192.168.1.110 - - [31/May/2025:14:28:35 +0800] "GET /style.css HTTP/1.1" 200 1430
192.168.1.105 - - [31/May/2025:14:28:36 +0800] "GET /images/logo.png HTTP/1.1" 200 4289
192.168.1.99 - - [31/May/2025:14:28:37 +0800] "GET /about.html HTTP/1.1" 404 212
192.168.1.105 - - [31/May/2025:14:28:38 +0800] "GET /api/data HTTP/1.1" 200 8923
192.168.1.23 - - [31/May/2025:14:28:39 +0800] "GET /favicon.ico HTTP/1.1" 200 4286
192.168.1.110 - - [31/May/2025:14:28:40 +0800] "GET /products/123 HTTP/1.1" 200 3172
192.168.1.105 - - [31/May/2025:14:28:41 +0800] "POST /checkout HTTP/1.1" 500 1024
192.168.1.110 - - [31/May/2025:14:28:42 +0800] "GET /contact HTTP/1.1" 200 1842

输出样例

cargo run -- -f access.log --mode error

Found 2 error(s)
192.168.1.99 - - [31/May/2025:14:28:37 +0800] "GET /about.html HTTP/1.1" 404 212
192.168.1.105 - - [31/May/2025:14:28:41 +0800] "POST /checkout HTTP/1.1" 500 1024


添加线程池并行处理大文件

使用 rayon 库来实现高效的并行处理 rayon = {version = “1.10.0”} 工作窃取(Work Stealing): Rayon 使用工作窃取算法动态平衡负载 自动利用所有CPU核心

优化大文件处理(添加内存映射)memmap2 = “0.9.5”# 内存映射文件处理 零拷贝技术: 使用 memmap2 实现内存映射文件 避免数据在用户空间和内核空间之间的复制

let ip_counter: HashMap<String, usize> = content
    .par_lines() // 转换为并行迭代器
    .fold(...)   // 局部聚合
    .reduce(...) // 全局合并

fold 方法
并行处理每个子集,将元素聚合到局部累加器中

reduce 方法
1.接收 fold 阶段产生的多个 HashMap
2.使用树状合并策略(两两合并):


线程1结果 ───┐
线程2结果 ───┼─→ 合并结果1 ─┐
线程3结果 ───┘              ├─→ 最终结果
线程4结果 ───┐
线程5结果 ───┼─→ 合并结果2 ─┘
...

生成大日志文件(100万行)

实现: 1.纯 awk 执行,比 Bash 快 5-10 倍 2.无外部命令依赖 3.支持千万行级别日志生成 4.内存占用恒定(约 10MB)

脚本如下

#!/bin/bash
# 超大规模日志生成(使用awk) 适用于千万行级别
# ./generate_large_log_awk.sh 10000000 > 10million.log

total_lines=$1

awk -v total="$total_lines" '
BEGIN {
    srand()
    months = "Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec"
    split(months, month_arr)

    # 生成IP池
    for (i=0; i<50; i++) ips[i] = "192.168.1." i

    # 生成路径池
    for (i=0; i<20; i++) paths[i] = "/path" i

    # 当前时间戳
    now = systime()

    # 生成日志
    for (i=1; i<=total; i++) {
        # 随机IP和路径
        ip = ips[int(rand()*50)]
        path = paths[int(rand()*20)]

        # 随机状态码
        r = rand()
        status = (r < 0.85) ? 200 : (r < 0.95) ? 404 : 500

        # 随机时间偏移
        time_offset = int(rand()*3600)
        log_time = now - time_offset

        # 格式化时间
        log_str = strftime("%d/%b/%Y:%H:%M:%S", log_time)

        # 随机响应大小
        size = int(rand()*5000)

        # 输出日志行
        print ip " - - [" log_str " +0000] \"GET " path " HTTP/1.1\" " status " " size

        # 显示进度
        if (i % 1000000 == 0) {
            printf "Generated %d million lines...\n", i/1000000 > "/dev/stderr"
        }
    }
}'

添加进度条

indicatif = “0.17” # 进度条库

按时间范围过滤

// 扩展命令行参数
#[arg(long)]
start_time: Option<String>,

#[arg(long)]
end_time: Option<String>,

// 在分析函数中添加时间过滤
.filter(|line| {
    // 解析日志中的时间戳
    // 检查是否在指定时间范围内
})

结果持久化

#[arg(short, long)]
output: Option<String>,

// 在函数结束时
if let Some(output_path) = output {
    let mut file = File::create(output_path)?;
    write!(file, "{}", result_string)?;
}

代码如下:

use clap::Parser;
use rayon::iter::ParallelIterator;
use rayon::str::ParallelString;
use std::collections::HashMap;
use std::error::Error;
use std::fs::File;
use std::io::ErrorKind;
use std::process::exit;
use std::str::from_utf8;

// 添加性能计时
use indicatif::{ParallelProgressIterator, ProgressBar, ProgressStyle};
use std::time::Instant;

/// cargo build --release
/// cargo run -- -f access.log --mode error --top 3
fn main() {
    let args = Args::parse();

    let start = Instant::now();

    match analyze_log(&args.file, &args.mode, args.top) {
        Ok(_) => {
            let duration = start.elapsed();
            println!("分析完成,耗时={:.2?}", duration);
        }
        Err(e) => {
            match e.downcast_ref::<std::io::Error>() {
                Some(io_error) if io_error.kind() == ErrorKind::NotFound => {
                    eprintln!("File {} not found.", args.file)
                }

                _ => eprintln!("Error: {:?}", e),
            }

            exit(1);
        }
    }
}

/// 日志分析工具
#[derive(Parser, Debug)]
#[command(version, about)]
struct Args {
    /// 输入日志文件路径
    /// 命令行短选项 -f, 长选项 --file 但没有默认值,所以它是必须提供的。
    #[arg(short, long)]
    file: String,

    /// 分析模式:ip(统计ip)|error(错误日志)
    #[arg(short, long, default_value = "ip")]
    mode: String,

    /// 显示前N个结果  default_value_t==带类型的默认值设置
    /// usize 正整数
    #[arg(short, long, default_value_t = 5)]
    top: usize,

    // 扩展命令行参数
    #[arg(long)]
    start_time: Option<String>,

    #[arg(long)]
    end_time: Option<String>,

    // 结果持久化
    #[arg(short, long)]
    output: Option<String>,
}

fn analyze_log(file_path: &str, mode: &str, top_n: usize) -> Result<(), Box<dyn Error>> {
    // 1.打开文件
    let file = File::open(file_path)?;

    // 使用内存映射文件提高大文件读取效率
    // 使用内存映射文件而不是逐行读取
    let mmap = unsafe { memmap2::Mmap::map(&file)? };
    let content = from_utf8(&mmap)?;

    match mode {
        "ip" => count_ips_parallel(content, top_n),
        "error" => find_errors_parallel(content, top_n),
        _ => Err(format!("Unknown mode: {}", mode).into()),
    }
}

fn find_errors_parallel(content: &str, top_n: usize) -> Result<(), Box<dyn Error>> {
    let mut error_codes = ["400", "404", "500"];

    let errors: Vec<String> = content
        .par_lines()
        .filter(|line| error_codes.iter().any(|code| line.contains(code)))
        .map(|s| s.to_string())
        .collect();

    println!("Found {} errors: ", errors.len());
    for err in errors.iter().take(top_n) {
        println!("{}", err);
    }
    Ok(())
}

fn count_ips_parallel(content: &str, top_n: usize) -> Result<(), Box<dyn Error>> {
    let total_lines = content.lines().count();
    let pb = ProgressBar::new(total_lines as u64);
    pb.set_style(
        ProgressStyle::default_bar()
            .template(
                "{spinner:.green} [{elapsed_precise}] [{bar:40.cyan/blue}] {pos}/{len} ({eta})",
            )?
            .progress_chars("#>-"),
    );

    let ip_regex = regex::Regex::new(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")?;
    // par_lines() 将行处理并行化
    // fold() 用于线程局部聚合
    // reduce() 用于合并线程结果
    let ip_counter: HashMap<String, usize> = content
        .par_lines() // 并行迭代行
        .progress_with(pb)
        .fold(
            || HashMap::new(), // 初始化闭包:创建局部累加器
            |mut acc, line|  // 累加闭包:处理每个元素
                {
                    if let Some(ip) = ip_regex.find(line) {
                        *acc.entry(ip.as_str().to_string()).or_insert(0) += 1;
                    }
                    acc
                },
        )
        .reduce(
            || HashMap::new(), // 初始归并值 // 初始化闭包(通常不会用到)
            |mut a, b|  // 合并闭包:合并两个 HashMap
                {
                    for (ip, count) in b {
                        *a.entry(ip).or_insert(0) += count;
                    }
                    a
                },
        );

    let mut ips: Vec<_> = ip_counter.iter().collect();
    ips.sort_by(|a, b| b.1.cmp(a.1));

    println!("Top {} Ips: ", top_n);

    for (ip, count) in ips.iter().take(top_n) {
        println!("{} - {}", ip, count);
    }

    Ok(())
}

输出样例

picture 0

Running `target\release\logalyzer.exe -f large_access.log --mode ip --top 10`
Top 10 Ips:
192.168.1.41 - 20275
192.168.1.23 - 20267
192.168.1.14 - 20227
192.168.1.31 - 20206
192.168.1.38 - 20154
192.168.1.21 - 20137
192.168.1.46 - 20127
192.168.1.42 - 20116
192.168.1.18 - 20111
192.168.1.32 - 20110
分析完成,耗时=224.78ms

Share this post on: