// Lodestone/store.rs

1// src/store.rs
2use anyhow::Result;
3use async_trait::async_trait;
4use bytes::Bytes;
5use serde::{Serialize, Deserialize};
6use std::collections::HashMap;
7use std::path::{Path, PathBuf};
8use std::sync::Arc;
9use tokio::fs;
10use tokio::sync::RwLock;
11use tracing::{debug, info, warn, error};
12
13/// Store interface for persisting data
14#[async_trait]
15pub trait Store: Send + Sync {
16    /// Get a value by key
17    async fn get(&self, key: &str) -> Result<Option<Bytes>>;
18    
19    /// Set a value by key
20    async fn set(&self, key: &str, value: &[u8]) -> Result<()>;
21    
22    /// Delete a key
23    async fn delete(&self, key: &str) -> Result<()>;
24    
25    /// List keys with a prefix
26    async fn list(&self, prefix: &str) -> Result<Vec<String>>;
27    
28    /// Check if a key exists
29    async fn exists(&self, key: &str) -> Result<bool>;
30    
31    /// Clear all data
32    async fn clear(&self) -> Result<()>;
33}
34
35/// Memory store for transient storage
36pub struct MemoryStore {
37    data: RwLock<HashMap<String, Bytes>>,
38}
39
40impl MemoryStore {
41    /// Create a new memory store
42    pub fn new() -> Self {
43        Self {
44            data: RwLock::new(HashMap::new()),
45        }
46    }
47}
48
49#[async_trait]
50impl Store for MemoryStore {
51    async fn get(&self, key: &str) -> Result<Option<Bytes>> {
52        let data = self.data.read().await;
53        Ok(data.get(key).cloned())
54    }
55    
56    async fn set(&self, key: &str, value: &[u8]) -> Result<()> {
57        let mut data = self.data.write().await;
58        data.insert(key.to_string(), Bytes::copy_from_slice(value));
59        Ok(())
60    }
61    
62    async fn delete(&self, key: &str) -> Result<()> {
63        let mut data = self.data.write().await;
64        data.remove(key);
65        Ok(())
66    }
67    
68    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
69        let data = self.data.read().await;
70        let keys: Vec<String> = data.keys()
71            .filter(|k| k.starts_with(prefix))
72            .cloned()
73            .collect();
74        Ok(keys)
75    }
76    
77    async fn exists(&self, key: &str) -> Result<bool> {
78        let data = self.data.read().await;
79        Ok(data.contains_key(key))
80    }
81    
82    async fn clear(&self) -> Result<()> {
83        let mut data = self.data.write().await;
84        data.clear();
85        Ok(())
86    }
87}
88
89/// File system store for persistent storage
90pub struct FileStore {
91    /// Base directory to store data
92    base_dir: PathBuf,
93    
94    /// Cache
95    cache: RwLock<HashMap<String, Bytes>>,
96}
97
98impl FileStore {
99    /// Create a new file store
100    pub async fn new<P: AsRef<Path>>(base_dir: P) -> Result<Self> {
101        let base_dir = base_dir.as_ref().to_path_buf();
102        
103        // Create directory if it doesn't exist
104        if !base_dir.exists() {
105            fs::create_dir_all(&base_dir).await?;
106        }
107        
108        Ok(Self {
109            base_dir,
110            cache: RwLock::new(HashMap::new()),
111        })
112    }
113    
114    /// Get the path for a key
115    fn key_path(&self, key: &str) -> PathBuf {
116        // Sanitize key to be a valid filename
117        let safe_key = key.replace("/", "_").replace(":", "_");
118        self.base_dir.join(safe_key)
119    }
120    
121    /// Load all keys into cache
122    pub async fn load_cache(&self) -> Result<()> {
123        let mut cache = self.cache.write().await;
124        
125        let mut dir = fs::read_dir(&self.base_dir).await?;
126        
127        while let Some(entry) = dir.next_entry().await? {
128            let path = entry.path();
129            
130            if path.is_file() {
131                if let Some(filename) = path.file_name() {
132                    if let Some(key) = filename.to_str() {
133                        // Load file content
134                        let data = fs::read(&path).await?;
135                        cache.insert(key.to_string(), Bytes::from(data));
136                    }
137                }
138            }
139        }
140        
141        info!("Loaded {} keys into file store cache", cache.len());
142        
143        Ok(())
144    }
145}
146
147#[async_trait]
148impl Store for FileStore {
149    async fn get(&self, key: &str) -> Result<Option<Bytes>> {
150        // Try cache first
151        let cache = self.cache.read().await;
152        if let Some(value) = cache.get(key) {
153            return Ok(Some(value.clone()));
154        }
155        drop(cache);
156        
157        // Check if file exists
158        let path = self.key_path(key);
159        if !path.exists() {
160            return Ok(None);
161        }
162        
163        // Read from file
164        let data = fs::read(&path).await?;
165        let bytes = Bytes::from(data);
166        
167        // Update cache
168        let mut cache = self.cache.write().await;
169        cache.insert(key.to_string(), bytes.clone());
170        
171        Ok(Some(bytes))
172    }
173    
174    async fn set(&self, key: &str, value: &[u8]) -> Result<()> {
175        // Update file
176        let path = self.key_path(key);
177        fs::write(&path, value).await?;
178        
179        // Update cache
180        let mut cache = self.cache.write().await;
181        cache.insert(key.to_string(), Bytes::copy_from_slice(value));
182        
183        Ok(())
184    }
185    
186    async fn delete(&self, key: &str) -> Result<()> {
187        // Delete file
188        let path = self.key_path(key);
189        if path.exists() {
190            fs::remove_file(&path).await?;
191        }
192        
193        // Update cache
194        let mut cache = self.cache.write().await;
195        cache.remove(key);
196        
197        Ok(())
198    }
199    
200    async fn list(&self, prefix: &str) -> Result<Vec<String>> {
201        let mut keys = Vec::new();
202        
203        let mut dir = fs::read_dir(&self.base_dir).await?;
204        
205        while let Some(entry) = dir.next_entry().await? {
206            let path = entry.path();
207            
208            if path.is_file() {
209                if let Some(filename) = path.file_name() {
210                    if let Some(key) = filename.to_str() {
211                        if key.starts_with(prefix) {
212                            keys.push(key.to_string());
213                        }
214                    }
215                }
216            }
217        }
218        
219        Ok(keys)
220    }
221    
222    async fn exists(&self, key: &str) -> Result<bool> {
223        // Check cache first
224        let cache = self.cache.read().await;
225        if cache.contains_key(key) {
226            return Ok(true);
227        }
228        drop(cache);
229        
230        // Check file system
231        let path = self.key_path(key);
232        Ok(path.exists())
233    }
234    
235    async fn clear(&self) -> Result<()> {
236        // Clear all files
237        let mut dir = fs::read_dir(&self.base_dir).await?;
238        
239        while let Some(entry) = dir.next_entry().await? {
240            let path = entry.path();
241            
242            if path.is_file() {
243                fs::remove_file(&path).await?;
244            }
245        }
246        
247        // Clear cache
248        let mut cache = self.cache.write().await;
249        cache.clear();
250        
251        Ok(())
252    }
253}
254
255/// Factory function to create the appropriate store based on configuration
256pub async fn create_store(
257    data_dir: &Path,
258) -> Result<Arc<Box<dyn Store>>> {
259    // Use file store directly
260    let file_store = FileStore::new(data_dir).await?;
261    let store: Box<dyn Store> = Box::new(file_store);
262    
263    Ok(Arc::new(store))
264}