1use anyhow::Result;
3use async_trait::async_trait;
4use bytes::Bytes;
5use serde::{Serialize, Deserialize};
6use std::collections::HashMap;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use tokio::fs;
10use tokio::sync::RwLock;
11use tracing::{debug, info, warn, error};
12
13#[async_trait]
15pub trait Store: Send + Sync {
16 async fn get(&self, key: &str) -> Result<Option<Bytes>>;
18
19 async fn set(&self, key: &str, value: &[u8]) -> Result<()>;
21
22 async fn delete(&self, key: &str) -> Result<()>;
24
25 async fn list(&self, prefix: &str) -> Result<Vec<String>>;
27
28 async fn exists(&self, key: &str) -> Result<bool>;
30
31 async fn clear(&self) -> Result<()>;
33}
34
35pub struct MemoryStore {
37 data: RwLock<HashMap<String, Bytes>>,
38}
39
40impl MemoryStore {
41 pub fn new() -> Self {
43 Self {
44 data: RwLock::new(HashMap::new()),
45 }
46 }
47}
48
49#[async_trait]
50impl Store for MemoryStore {
51 async fn get(&self, key: &str) -> Result<Option<Bytes>> {
52 let data = self.data.read().await;
53 Ok(data.get(key).cloned())
54 }
55
56 async fn set(&self, key: &str, value: &[u8]) -> Result<()> {
57 let mut data = self.data.write().await;
58 data.insert(key.to_string(), Bytes::copy_from_slice(value));
59 Ok(())
60 }
61
62 async fn delete(&self, key: &str) -> Result<()> {
63 let mut data = self.data.write().await;
64 data.remove(key);
65 Ok(())
66 }
67
68 async fn list(&self, prefix: &str) -> Result<Vec<String>> {
69 let data = self.data.read().await;
70 let keys: Vec<String> = data.keys()
71 .filter(|k| k.starts_with(prefix))
72 .cloned()
73 .collect();
74 Ok(keys)
75 }
76
77 async fn exists(&self, key: &str) -> Result<bool> {
78 let data = self.data.read().await;
79 Ok(data.contains_key(key))
80 }
81
82 async fn clear(&self) -> Result<()> {
83 let mut data = self.data.write().await;
84 data.clear();
85 Ok(())
86 }
87}
88
89pub struct FileStore {
91 base_dir: PathBuf,
93
94 cache: RwLock<HashMap<String, Bytes>>,
96}
97
98impl FileStore {
99 pub async fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
101 let base_dir = base_dir.as_ref().to_path_buf();
102
103 if !base_dir.exists() {
105 fs::create_dir_all(&base_dir).await?;
106 }
107
108 Ok(Self {
109 base_dir,
110 cache: RwLock::new(HashMap::new()),
111 })
112 }
113
114 fn key_path(&self, key: &str) -> PathBuf {
116 let safe_key = key.replace("/", "_").replace(":", "_");
118 self.base_dir.join(safe_key)
119 }
120
121 pub async fn load_cache(&self) -> Result<()> {
123 let mut cache = self.cache.write().await;
124
125 let mut dir = fs::read_dir(&self.base_dir).await?;
126
127 while let Some(entry) = dir.next_entry().await? {
128 let path = entry.path();
129
130 if path.is_file() {
131 if let Some(filename) = path.file_name() {
132 if let Some(key) = filename.to_str() {
133 let data = fs::read(&path).await?;
135 cache.insert(key.to_string(), Bytes::from(data));
136 }
137 }
138 }
139 }
140
141 info!("Loaded {} keys into file store cache", cache.len());
142
143 Ok(())
144 }
145}
146
147#[async_trait]
148impl Store for FileStore {
149 async fn get(&self, key: &str) -> Result<Option<Bytes>> {
150 let cache = self.cache.read().await;
152 if let Some(value) = cache.get(key) {
153 return Ok(Some(value.clone()));
154 }
155 drop(cache);
156
157 let path = self.key_path(key);
159 if !path.exists() {
160 return Ok(None);
161 }
162
163 let data = fs::read(&path).await?;
165 let bytes = Bytes::from(data);
166
167 let mut cache = self.cache.write().await;
169 cache.insert(key.to_string(), bytes.clone());
170
171 Ok(Some(bytes))
172 }
173
174 async fn set(&self, key: &str, value: &[u8]) -> Result<()> {
175 let path = self.key_path(key);
177 fs::write(&path, value).await?;
178
179 let mut cache = self.cache.write().await;
181 cache.insert(key.to_string(), Bytes::copy_from_slice(value));
182
183 Ok(())
184 }
185
186 async fn delete(&self, key: &str) -> Result<()> {
187 let path = self.key_path(key);
189 if path.exists() {
190 fs::remove_file(&path).await?;
191 }
192
193 let mut cache = self.cache.write().await;
195 cache.remove(key);
196
197 Ok(())
198 }
199
200 async fn list(&self, prefix: &str) -> Result<Vec<String>> {
201 let mut keys = Vec::new();
202
203 let mut dir = fs::read_dir(&self.base_dir).await?;
204
205 while let Some(entry) = dir.next_entry().await? {
206 let path = entry.path();
207
208 if path.is_file() {
209 if let Some(filename) = path.file_name() {
210 if let Some(key) = filename.to_str() {
211 if key.starts_with(prefix) {
212 keys.push(key.to_string());
213 }
214 }
215 }
216 }
217 }
218
219 Ok(keys)
220 }
221
222 async fn exists(&self, key: &str) -> Result<bool> {
223 let cache = self.cache.read().await;
225 if cache.contains_key(key) {
226 return Ok(true);
227 }
228 drop(cache);
229
230 let path = self.key_path(key);
232 Ok(path.exists())
233 }
234
235 async fn clear(&self) -> Result<()> {
236 let mut dir = fs::read_dir(&self.base_dir).await?;
238
239 while let Some(entry) = dir.next_entry().await? {
240 let path = entry.path();
241
242 if path.is_file() {
243 fs::remove_file(&path).await?;
244 }
245 }
246
247 let mut cache = self.cache.write().await;
249 cache.clear();
250
251 Ok(())
252 }
253}
254
255pub async fn create_store(
257 data_dir: &Path,
258) -> Result<Arc<Box<dyn Store>>> {
259 let file_store = FileStore::new(data_dir).await?;
261 let store: Box<dyn Store> = Box::new(file_store);
262
263 Ok(Arc::new(store))
264}