Skip to content
Go back

mini_redis_rust_practice

rust Redis-like键值存储

支持基本命令:SET key value,GET key,DEL key

cargo new mini_redis —lib mkdir src/bin touch src/bin/server.rs src/bin/client.rs

#[derive(Clone)] 是一个 Rust 属性,它会自动为结构体实现 Clone trait
1.克隆能力:允许通过 .clone() 方法创建结构体的副本
2.浅拷贝语义:对于包含智能指针的类型(如 Arc),克隆操作是高效的(只增加引用计数)
3.自动实现:编译器会为所有字段递归实现克隆逻辑

--------

Arc 是 Atomic Reference Counter(原子引用计数器)的缩写:
作用:线程安全的共享所有权智能指针

核心特性:
✅ 允许多个所有者同时访问相同数据
✅ 自动内存管理(引用计数归零时释放数据)
✅ 线程安全(使用原子操作保证线程安全)

内存模型:
engine1 ──┐
          ├─→ [Arc] ──→ [DashMap数据]
engine2 ──┘


Arc 本身只提供共享访问,修改需要内部可变性:
Arc<Mutex<Data>>  // 互斥锁保护
Arc<RwLock<Data>> // 读写锁保护

[StorageEngine实例] (不可变引用 &self)

└──→ [Arc] ──→ [DashMap]

               ├── 内部:分段锁机制
               ├── 外部:安全修改接口
               └── 支持并发读写
[dependencies]
tokio = {version = "1.45.1", features = ["full"]}  #异步运行时
bytes = "1.10.1"   #高效字节处理
dashmap = {version = "6.1.0"}  #并发 hashmap
serde = {version = "1.0.219", features = ["derive"]}
serde_json = "1.0.140"  # 序列化
anyhow = "1.0.98"   # 简化错误处理
tracing = "0.1.41"    # 日志跟踪
tracing-subscriber = "0.3.19"  #日志配置
bincode = {version = "1.3.3"}  # 序列化

代码如下

数据结构—内存存储

// src/lib.rs
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::sync::Arc;

/// let engine1 = StorageEngine::new();
/// let engine2 = engine1.clone();  // 创建"逻辑副本"
#[derive(Clone)]
pub struct StorageEngine {
    //底层 DashMap 已处理并发安全 --- 提供内部可变性,支持通过不可变引用修改数据
    // Arc 只提供共享访问
    data: Arc<DashMap<String, String>>,
}

impl StorageEngine {
    // 构造函数不需要接收者
    pub fn new() -> Self {
        Self {
            data: Arc::new(DashMap::new()),
        }
    }
    /// 语言      	类似概念	     区别
    /// Java/C# 	隐式 this	Rust 显式声明访问权限(&self)
    pub fn set(&self, key: String, value: String) {
        self.data.insert(key, value);
    }

    pub fn get(&self, key: &str) -> Option<String> {
        self.data.get(key).map(|v| v.value().clone())
    }

    pub fn del(&self, key: &str) -> bool {
        self.data.remove(key).is_some()
    }
}

#[derive(Debug, Serialize, Deserialize)]
pub enum Command {
    Set { key: String, value: String },
    Get { key: String },
    Del { key: String },
}

#[derive(Debug, Serialize, Deserialize)]
pub enum Response {
    Value(Option<String>),
    Ok,
    Error(String),
}

TCP 服务器实现

// src/bin/server.rs
use mini_redis::{Command, Response, StorageEngine};
use std::fmt::format;
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream};
use tracing::{error, info};

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // 配置日志
    tracing_subscriber::fmt::init();

    let addr = "127.0.0.1:6379";
    let listener = TcpListener::bind(addr).await?;
    info!("Server Listening on: {}", addr);

    let storage = StorageEngine::new();

    // 创建无限循环,持续接受客户端连接
    loop {
        // 等待新客户端连接 (listener.accept().await)
        let (socket, client_addr) = listener.accept().await?;
        info!("Accepted connection from: {}", client_addr);

        // 克隆共享存储
        let storage_clone = storage.clone();

        // 为每个连接生成独立处理任务 --  然后回到循环开始等待链接.
        // 创建异步任务
        tokio::spawn(async move {
            // 转移所有权的异步块
            // move === 转移所有权:将 socket, storage_clone, client_addr 的所有权转移到新任务中
            // 避免借用问题:主循环会继续执行,这些变量在下一次循环会被覆盖
            // 线程安全:确保每个任务独占资源
            if let Err(e) = handle_client(socket, storage_clone).await {
                error!("Client {} error: {:?}", client_addr, e);
            };
        });

        //        主循环迭代1:
        //           socket = 连接1
        //           storage_clone = 存储引用1
        //           client_addr = 地址1
        //           生成任务 → 转移所有权到任务1
        //
        //         主循环迭代2: (任务1仍在运行)
        //           socket = 连接2
        //           storage_clone = 存储引用2
        //           client_addr = 地址2
        //           生成任务 → 转移所有权到任务2
    }
}

async fn handle_client(mut socket: TcpStream, storage: StorageEngine) -> anyhow::Result<()> {
    use tokio::io::AsyncReadExt;

    let mut buf = [0; 1024];

    loop {
        let n = socket.read(&mut buf).await?;

        if n == 0 {
            return Ok(()); //链接关闭
        }

        // 反序列化命令
        let command: Command = match bincode::deserialize(&buf[..n]) {
            Ok(cmd) => cmd,
            Err(e) => {
                let response = Response::Error(format!("Invalid command: {:?}", e));
                socket.write_all(&bincode::serialize(&response)?).await?;
                continue;
            }
        };

        let response = match command {
            Command::Set { key, value } => {
                storage.set(key, value);
                Response::Ok
            }
            Command::Get { key } => {
                let value = storage.get(&key);
                Response::Value(value)
            }
            Command::Del { key } => {
                let result = storage.del(&key);
                if result {
                    Response::Ok
                } else {
                    Response::Error("key not found".to_string())
                }
            }
        };

        // 发送响应
        let response_bytes = bincode::serialize(&response)?;
        socket.write_all(&response_bytes).await?;
    }
}

客户端实现

// src/bin/client.rs
use mini_redis::{Command, Response};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:6379").await?;

    // 测试 set 命令
    let set_cmd = Command::Set {
        key: "name".to_string(),
        value: "neo ray".to_string(),
    };

    send_command(&mut stream, set_cmd).await?;

    // 测试 get 命令
    let get_cmd = Command::Get {
        key: "name".to_string(),
    };

    let response = send_command(&mut stream, get_cmd).await?;

    if let Response::Value(Some(value)) = response {
        println!("get name: {}", value);
    }

    Ok(())
}

async fn send_command(socket: &mut TcpStream, command: Command) -> anyhow::Result<Response> {
    // 序列化
    let cmd_bytes = bincode::serialize(&command)?;
    socket.write_all(&cmd_bytes).await?;

    // 读取响应
    let mut buf = [0; 1024];
    let n = socket.read(&mut buf).await?;
    // 反序列化
    let response: Response = bincode::deserialize(&buf[..n])?;
    Ok(response)
}

运行

cargo run --bin server
# 输出: Server listening on 127.0.0.1:6379

cargo run --bin client
# 输出: GET name: neo ray

如果不使用 move 会怎样?

编译错误示例:

error[E0373]: async block may outlive the current function, but it borrows `socket`, which is owned by the current function
  --> src/main.rs:15:9
   |
15 |         tokio::spawn(async {
   |         ^^^^^^^^^^^^ may outlive borrowed value `socket`
16 |             handle_client(socket, storage_clone).await
   |                          ------ `socket` is borrowed here
   |
note: function requires argument type to outlive `'static`

原因:

  1. 异步任务可能比当前函数活得更久
  2. 普通借用不能满足生命周期要求
  3. move 强制所有权转移解决此问题

为什么需要 storage.clone()?

let storage_clone = storage.clone();
  1. storage 是 StorageEngine 类型(实现了 Clone)
  2. clone() 只增加 Arc 的引用计数(零开销)
  3. 每个任务需要独立的存储引用
  4. 避免跨任务共享引用导致的生命周期问题



Share this post on: