rust Redis-like键值存储
支持基本命令:SET key value,GET key,DEL key
cargo new mini_redis —lib mkdir src/bin touch src/bin/server.rs src/bin/client.rs
#[derive(Clone)] 是一个 Rust 属性,它会自动为结构体实现 Clone trait
1.克隆能力:允许通过 .clone() 方法创建结构体的副本
2.浅拷贝语义:对于包含智能指针的类型(如 Arc),克隆操作是高效的(只增加引用计数)
3.自动实现:编译器会为所有字段递归实现克隆逻辑
--------
Arc 是 Atomic Reference Counter(原子引用计数器)的缩写:
作用:线程安全的共享所有权智能指针
核心特性:
✅ 允许多个所有者同时访问相同数据
✅ 自动内存管理(引用计数归零时释放数据)
✅ 线程安全(使用原子操作保证线程安全)
内存模型:
engine1 ──┐
├─→ [Arc] ──→ [DashMap数据]
engine2 ──┘
Arc 本身只提供共享访问,修改需要内部可变性:
Arc<Mutex<Data>> // 互斥锁保护
Arc<RwLock<Data>> // 读写锁保护
[StorageEngine实例] (不可变引用 &self)
│
└──→ [Arc] ──→ [DashMap]
│
├── 内部:分段锁机制
├── 外部:安全修改接口
└── 支持并发读写
[dependencies]
tokio = {version = "1.45.1", features = ["full"]} #异步运行时
bytes = "1.10.1" #高效字节处理
dashmap = {version = "6.1.0"} #并发 hashmap
serde = {version = "1.0.219", features = ["derive"]}
serde_json = "1.0.140" # 序列化
anyhow = "1.0.98" # 简化错误处理
tracing = "0.1.41" # 日志跟踪
tracing-subscriber = "0.3.19" #日志配置
bincode = {version = "1.3.3"} # 序列化
代码如下
数据结构—内存存储
// src/lib.rs
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
/// let engine1 = StorageEngine::new();
/// let engine2 = engine1.clone(); // 创建"逻辑副本"
#[derive(Clone)]
pub struct StorageEngine {
//底层 DashMap 已处理并发安全 --- 提供内部可变性,支持通过不可变引用修改数据
// Arc 只提供共享访问
data: Arc<DashMap<String, String>>,
}
impl StorageEngine {
// 构造函数不需要接收者
pub fn new() -> Self {
Self {
data: Arc::new(DashMap::new()),
}
}
/// 语言 类似概念 区别
/// Java/C# 隐式 this Rust 显式声明访问权限(&self)
pub fn set(&self, key: String, value: String) {
self.data.insert(key, value);
}
pub fn get(&self, key: &str) -> Option<String> {
self.data.get(key).map(|v| v.value().clone())
}
pub fn del(&self, key: &str) -> bool {
self.data.remove(key).is_some()
}
}
#[derive(Debug, Serialize, Deserialize)]
pub enum Command {
Set { key: String, value: String },
Get { key: String },
Del { key: String },
}
#[derive(Debug, Serialize, Deserialize)]
pub enum Response {
Value(Option<String>),
Ok,
Error(String),
}
TCP 服务器实现
// src/bin/server.rs
use mini_redis::{Command, Response, StorageEngine};
use std::fmt::format;
use tokio::io::AsyncWriteExt;
use tokio::net::{TcpListener, TcpStream};
use tracing::{error, info};
#[tokio::main]
async fn main() -> anyhow::Result<()> {
// 配置日志
tracing_subscriber::fmt::init();
let addr = "127.0.0.1:6379";
let listener = TcpListener::bind(addr).await?;
info!("Server Listening on: {}", addr);
let storage = StorageEngine::new();
// 创建无限循环,持续接受客户端连接
loop {
// 等待新客户端连接 (listener.accept().await)
let (socket, client_addr) = listener.accept().await?;
info!("Accepted connection from: {}", client_addr);
// 克隆共享存储
let storage_clone = storage.clone();
// 为每个连接生成独立处理任务 -- 然后回到循环开始等待链接.
// 创建异步任务
tokio::spawn(async move {
// 转移所有权的异步块
// move === 转移所有权:将 socket, storage_clone, client_addr 的所有权转移到新任务中
// 避免借用问题:主循环会继续执行,这些变量在下一次循环会被覆盖
// 线程安全:确保每个任务独占资源
if let Err(e) = handle_client(socket, storage_clone).await {
error!("Client {} error: {:?}", client_addr, e);
};
});
// 主循环迭代1:
// socket = 连接1
// storage_clone = 存储引用1
// client_addr = 地址1
// 生成任务 → 转移所有权到任务1
//
// 主循环迭代2: (任务1仍在运行)
// socket = 连接2
// storage_clone = 存储引用2
// client_addr = 地址2
// 生成任务 → 转移所有权到任务2
}
}
async fn handle_client(mut socket: TcpStream, storage: StorageEngine) -> anyhow::Result<()> {
use tokio::io::AsyncReadExt;
let mut buf = [0; 1024];
loop {
let n = socket.read(&mut buf).await?;
if n == 0 {
return Ok(()); //链接关闭
}
// 反序列化命令
let command: Command = match bincode::deserialize(&buf[..n]) {
Ok(cmd) => cmd,
Err(e) => {
let response = Response::Error(format!("Invalid command: {:?}", e));
socket.write_all(&bincode::serialize(&response)?).await?;
continue;
}
};
let response = match command {
Command::Set { key, value } => {
storage.set(key, value);
Response::Ok
}
Command::Get { key } => {
let value = storage.get(&key);
Response::Value(value)
}
Command::Del { key } => {
let result = storage.del(&key);
if result {
Response::Ok
} else {
Response::Error("key not found".to_string())
}
}
};
// 发送响应
let response_bytes = bincode::serialize(&response)?;
socket.write_all(&response_bytes).await?;
}
}
客户端实现
// src/bin/client.rs
use mini_redis::{Command, Response};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let mut stream = TcpStream::connect("127.0.0.1:6379").await?;
// 测试 set 命令
let set_cmd = Command::Set {
key: "name".to_string(),
value: "neo ray".to_string(),
};
send_command(&mut stream, set_cmd).await?;
// 测试 get 命令
let get_cmd = Command::Get {
key: "name".to_string(),
};
let response = send_command(&mut stream, get_cmd).await?;
if let Response::Value(Some(value)) = response {
println!("get name: {}", value);
}
Ok(())
}
async fn send_command(socket: &mut TcpStream, command: Command) -> anyhow::Result<Response> {
// 序列化
let cmd_bytes = bincode::serialize(&command)?;
socket.write_all(&cmd_bytes).await?;
// 读取响应
let mut buf = [0; 1024];
let n = socket.read(&mut buf).await?;
// 反序列化
let response: Response = bincode::deserialize(&buf[..n])?;
Ok(response)
}
运行
cargo run --bin server
# 输出: Server listening on 127.0.0.1:6379
cargo run --bin client
# 输出: GET name: neo ray
如果不使用 move 会怎样?
编译错误示例:
error[E0373]: async block may outlive the current function, but it borrows `socket`, which is owned by the current function
--> src/main.rs:15:9
|
15 | tokio::spawn(async {
| ^^^^^^^^^^^^ may outlive borrowed value `socket`
16 | handle_client(socket, storage_clone).await
| ------ `socket` is borrowed here
|
note: function requires argument type to outlive `'static`
原因:
- 异步任务可能比当前函数活得更久
- 普通借用不能满足生命周期要求
- move 强制所有权转移解决此问题
为什么需要 storage.clone()?
let storage_clone = storage.clone();
- storage 是 StorageEngine 类型(实现了 Clone)
- clone() 只增加 Arc 的引用计数(零开销)
- 每个任务需要独立的存储引用
- 避免跨任务共享引用导致的生命周期问题