omni_orchestrator/schemas/v1/db/queries/metadata.rs
1use anyhow::{Context, Result};
2use lazy_static::lazy_static;
3use sqlx::{MySql, Pool};
4use std::collections::HashMap;
5use std::sync::Arc;
6use tokio::sync::Mutex;
7
8/// Global mutex for synchronizing metadata operations.
9///
10/// This mutex prevents race conditions when multiple threads attempt to modify
11/// the metadata table simultaneously. It's particularly important for operations
12/// like table creation and key deduplication which require exclusive access.
13lazy_static! {
14 static ref METADATA_MUTEX: Mutex<()> = Mutex::new(());
15}
16
17/// Creates the metadata table in the database if it doesn't already exist.
18///
19/// This function ensures that the system has a place to store key-value metadata.
20/// It uses a mutex to prevent multiple threads from attempting to create the table
21/// simultaneously, and includes schema optimizations like indices and uniqueness constraints.
22///
23/// # Arguments
24///
25/// * `pool` - Database connection pool for executing the query
26///
27/// # Returns
28///
29/// * `Ok(())` - Table was successfully created or already existed
30/// * `Err(anyhow::Error)` - Failed to create the table
31///
32/// # Concurrency
33///
34/// This function acquires the global metadata mutex to ensure thread safety.
35/// If the table already exists, it returns immediately without performing any changes.
36///
37/// # Schema
38///
39/// The metadata table is created with the following schema:
40/// - `id`: Auto-incrementing primary key
41/// - `key`: Unique string identifier (indexed for fast lookups)
42/// - `value`: Text field storing the metadata value
43/// - `created_at`: Timestamp of when the record was created
44/// - `updated_at`: Timestamp of the last update to the record
45pub async fn create_meta_table(pool: &Pool<MySql>) -> Result<()> {
46 // Acquire a lock to ensure only one thread can modify the metadata table at a time
47 let _lock = METADATA_MUTEX.lock().await;
48
49 // First check if table exists to prevent unnecessary queries
50 if meta_table_exists_internal(pool).await? {
51 return Ok(());
52 }
53
54 // Use stronger uniqueness enforcement in schema
55 let result = sqlx::query(
56 r#"
57 CREATE TABLE IF NOT EXISTS metadata (
58 id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
59 `key` VARCHAR(255) NOT NULL,
60 value TEXT NOT NULL,
61 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
62 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
63 UNIQUE KEY unique_key (`key`),
64 INDEX idx_metadata_key (`key`)
65 ) ENGINE=InnoDB
66 "#,
67 )
68 .execute(pool)
69 .await
70 .context("Error creating metadata table")?;
71
72 log::info!("Successfully created metadata table");
73
74 // After creating the table, ensure no duplicates exist
75 cleanup_duplicate_keys_internal(pool).await?;
76
77 Ok(())
78}
79
80/// Retrieves a metadata value by its key.
81///
82/// This function performs a simple lookup in the metadata table to retrieve
83/// the value associated with the provided key.
84///
85/// # Arguments
86///
87/// * `pool` - Database connection pool for executing the query
88/// * `key` - The unique key whose value to retrieve
89///
90/// # Returns
91///
92/// * `Ok(String)` - Successfully retrieved the value for the key
93/// * `Err(anyhow::Error)` - Failed to fetch the value or key doesn't exist
94///
95/// # Error Handling
96///
97/// Returns an error if the key doesn't exist in the metadata table or if a database
98/// error occurs during the query execution.
99///
100/// # Concurrency
101///
102/// This function doesn't acquire the metadata mutex since it only performs a read
103/// operation, which doesn't risk data corruption.
104pub async fn get_meta_value(pool: &Pool<MySql>, key: &str) -> Result<String> {
105 // No need for mutex here since we're only reading
106 let value = sqlx::query_scalar::<_, String>("SELECT value FROM metadata WHERE `key` = ?")
107 .bind(key)
108 .fetch_one(pool)
109 .await
110 .context(format!("Failed to fetch metadata value for key '{}'", key))?;
111
112 Ok(value)
113}
114
115/// Sets a metadata value for a specific key.
116///
117/// This function stores a key-value pair in the metadata table. If the key already
118/// exists, its value is updated. The operation is performed atomically within a
119/// transaction and is protected by a mutex to prevent concurrent modifications.
120///
121/// # Arguments
122///
123/// * `pool` - Database connection pool for executing the query
124/// * `key` - The unique key to associate with the value
125/// * `value` - The value to store
126///
127/// # Returns
128///
129/// * `Ok(())` - Successfully stored the key-value pair
130/// * `Err(anyhow::Error)` - Failed to store the key-value pair
131///
132/// # Concurrency
133///
134/// This function acquires the global metadata mutex to ensure thread safety
135/// when multiple threads attempt to modify the same key.
136///
137/// # Transaction Handling
138///
139/// This function uses a database transaction that:
140/// 1. Deletes any existing entries with the same key
141/// 2. Inserts the new key-value pair
142///
143/// If any part of this operation fails, the entire transaction is rolled back,
144/// preserving data consistency.
145pub async fn set_meta_value(pool: &Pool<MySql>, key: &str, value: &str) -> Result<()> {
146 // Acquire a lock to ensure only one thread can modify a key at a time
147 let _lock = METADATA_MUTEX.lock().await;
148
149 // Use a transaction to ensure atomicity
150 let mut tx = pool.begin().await.context("Failed to begin transaction")?;
151
152 // First delete any existing entries with this key to ensure no duplicates
153 sqlx::query("DELETE FROM metadata WHERE `key` = ?")
154 .bind(key)
155 .execute(&mut *tx)
156 .await
157 .context(format!(
158 "Failed to delete existing metadata entry for key '{}'",
159 key
160 ))?;
161
162 // Then insert the new value
163 sqlx::query("INSERT INTO metadata (`key`, value) VALUES (?, ?)")
164 .bind(key)
165 .bind(value)
166 .execute(&mut *tx)
167 .await
168 .context(format!("Failed to insert metadata value for key '{}'", key))?;
169
170 // Commit the transaction
171 tx.commit().await.context("Failed to commit transaction")?;
172
173 Ok(())
174}
175
176/// Internal function to check if the metadata table exists.
177///
178/// This function checks if the metadata table exists in the current database
179/// without acquiring the metadata mutex. It's intended for internal use by
180/// functions that already hold the mutex.
181///
182/// # Arguments
183///
184/// * `pool` - Database connection pool for executing the query
185///
186/// # Returns
187///
188/// * `Ok(bool)` - True if the table exists, false otherwise
189/// * `Err(anyhow::Error)` - Failed to check if the table exists
190///
191/// # Note
192///
193/// This is an internal function not protected by the metadata mutex.
194/// It should only be called from functions that already hold the mutex
195/// or where concurrent access is not a concern.
196async fn meta_table_exists_internal(pool: &Pool<MySql>) -> Result<bool> {
197 let table_exists = sqlx::query_scalar::<_, i64>(
198 "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = 'metadata'"
199 )
200 .fetch_one(pool)
201 .await
202 .context("Failed to check if metadata table exists")?;
203
204 Ok(table_exists > 0)
205}
206
207/// Checks if the metadata table exists in the database.
208///
209/// This function provides a thread-safe way to check if the metadata table
210/// has been created in the database.
211///
212/// # Arguments
213///
214/// * `pool` - Database connection pool for executing the query
215///
216/// # Returns
217///
218/// * `Ok(bool)` - True if the table exists, false otherwise
219/// * `Err(anyhow::Error)` - Failed to check if the table exists
220///
221/// # Concurrency
222///
223/// This function doesn't acquire the metadata mutex since it only performs a read
224/// operation, which doesn't risk data corruption.
225pub async fn meta_table_exists(pool: &Pool<MySql>) -> Result<bool> {
226 // No need for mutex here since we're only reading
227 meta_table_exists_internal(pool).await
228}
229
230/// Internal function to clean up duplicate metadata keys.
231///
232/// This function identifies and resolves duplicate keys in the metadata table
233/// by keeping only the most recently updated entry for each key. It's intended
234/// for internal use by functions that already hold the metadata mutex.
235///
236/// # Arguments
237///
238/// * `pool` - Database connection pool for executing the query
239///
240/// # Returns
241///
242/// * `Ok(usize)` - Number of duplicate entries that were removed
243/// * `Err(anyhow::Error)` - Failed to clean up duplicate keys
244///
245/// # Process
246///
247/// For each key with multiple entries, this function:
248/// 1. Identifies the most recently updated entry by `updated_at` timestamp
249/// 2. Deletes all other entries with the same key
250/// 3. Logs a warning if duplicates were found
251///
252/// # Note
253///
254/// This is an internal function not protected by the metadata mutex.
255/// It should only be called from functions that already hold the mutex.
256async fn cleanup_duplicate_keys_internal(pool: &Pool<MySql>) -> Result<usize> {
257 // Find duplicate keys
258 let duplicates = sqlx::query_as::<_, (String,)>(
259 r#"
260 SELECT `key` FROM metadata
261 GROUP BY `key`
262 HAVING COUNT(*) > 1
263 "#,
264 )
265 .fetch_all(pool)
266 .await
267 .context("Failed to fetch duplicate keys")?;
268
269 let mut cleaned_count = 0;
270
271 // For each duplicate key, keep only the latest entry
272 for (key,) in duplicates {
273 let mut tx = pool.begin().await?;
274
275 // Get the ID of the most recently updated entry
276 let latest_id = sqlx::query_scalar::<_, i64>(
277 "SELECT id FROM metadata WHERE `key` = ? ORDER BY updated_at DESC LIMIT 1",
278 )
279 .bind(&key)
280 .fetch_one(&mut *tx)
281 .await
282 .context(format!("Failed to get latest entry ID for key '{}'", key))?;
283
284 // Delete all entries with this key except the latest one
285 let deleted = sqlx::query("DELETE FROM metadata WHERE `key` = ? AND id != ?")
286 .bind(&key)
287 .bind(latest_id)
288 .execute(&mut *tx)
289 .await
290 .context(format!(
291 "Failed to delete duplicate entries for key '{}'",
292 key
293 ))?;
294
295 cleaned_count += deleted.rows_affected() as usize;
296
297 tx.commit().await.context("Failed to commit transaction")?;
298
299 if cleaned_count > 0 {
300 log::warn!(
301 "Cleaned up {} duplicate entries for key '{}'",
302 cleaned_count,
303 key
304 );
305 }
306 }
307
308 Ok(cleaned_count)
309}
310
311/// Cleans up duplicate metadata keys in a thread-safe manner.
312///
313/// This function provides a public, mutex-protected interface to clean up
314/// duplicate keys in the metadata table. It ensures that only one thread
315/// can perform this operation at a time.
316///
317/// # Arguments
318///
319/// * `pool` - Database connection pool for executing the query
320///
321/// # Returns
322///
323/// * `Ok(usize)` - Number of duplicate entries that were removed
324/// * `Err(anyhow::Error)` - Failed to clean up duplicate keys
325///
326/// # Concurrency
327///
328/// This function acquires the global metadata mutex to ensure thread safety
329/// when cleaning up duplicate keys.
330///
331/// # Use Cases
332///
333/// This function is useful for:
334/// - Periodic maintenance of the metadata table
335/// - Resolving inconsistencies that might have been introduced by bugs
336/// - Cleaning up after schema migrations or application upgrades
337pub async fn cleanup_duplicate_keys(pool: &Pool<MySql>) -> Result<usize> {
338 let _lock = METADATA_MUTEX.lock().await;
339 cleanup_duplicate_keys_internal(pool).await
340}
341
342/// Initializes the metadata system, ensuring the table exists and is clean.
343///
344/// This function provides a safe way to initialize the metadata system by:
345/// 1. Creating the metadata table if it doesn't exist
346/// 2. Cleaning up any duplicate keys that might exist
347///
348/// It's designed to be called during application startup to ensure the metadata
349/// system is in a consistent state before it's used.
350///
351/// # Arguments
352///
353/// * `pool` - Database connection pool for executing the query
354///
355/// # Returns
356///
357/// * `Ok(())` - Successfully initialized the metadata system
358/// * `Err(anyhow::Error)` - Failed to initialize the metadata system
359///
360/// # Concurrency
361///
362/// This function acquires the global metadata mutex to ensure thread safety
363/// during initialization.
364///
365/// # Warning
366///
367/// If duplicate keys are found and cleaned up during initialization, an error
368/// is logged as this could indicate issues with the application's use of the
369/// metadata API.
370pub async fn initialize_metadata_system(pool: &Pool<MySql>) -> Result<()> {
371 let _lock = METADATA_MUTEX.lock().await;
372
373 // Create table if it doesn't exist
374 if !meta_table_exists_internal(pool).await? {
375 // Use direct query execution to create table
376 sqlx::query(
377 r#"
378 CREATE TABLE IF NOT EXISTS metadata (
379 id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
380 `key` VARCHAR(255) NOT NULL,
381 value TEXT NOT NULL,
382 created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
383 updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
384 UNIQUE KEY unique_key (`key`),
385 INDEX idx_metadata_key (`key`)
386 ) ENGINE=InnoDB
387 "#,
388 )
389 .execute(pool)
390 .await
391 .context("Error creating metadata table")?;
392
393 log::info!("Successfully created metadata table");
394 }
395
396 // Always clean up any duplicates that may exist
397 let cleaned = cleanup_duplicate_keys_internal(pool).await?;
398 if cleaned > 0 {
399 log::error!("Cleaned up {} total duplicate metadata entries during initialization, this could indicate issues with your usage of the matadata API", cleaned);
400 }
401
402 Ok(())
403}
404
405/// Cache implementation for metadata to reduce database access.
406///
407/// This cache maintains an in-memory copy of metadata values to minimize
408/// database queries. It provides methods to get and set values, as well as
409/// to refresh the entire cache from the database.
410///
411/// # Fields
412///
413/// * `pool` - Database connection pool for executing queries
414/// * `cache` - In-memory hash map storing key-value pairs
415///
416/// # Thread Safety
417///
418/// This struct is not thread-safe on its own and should either be used from a
419/// single thread or wrapped in a synchronization primitive like `Arc<Mutex<MetadataCache>>`.
420///
421/// # Performance Considerations
422///
423/// The cache can significantly reduce database load for frequently accessed
424/// metadata values, but it can become stale if other processes modify the
425/// metadata table directly. Use `refresh_cache` periodically or after expected
426/// external changes.
427pub struct MetadataCache {
428 /// Database connection pool for executing queries
429 pool: Pool<MySql>,
430 /// In-memory cache of metadata key-value pairs
431 cache: HashMap<String, String>,
432}
433
434impl MetadataCache {
435 /// Creates a new metadata cache with an empty cache and the provided database pool.
436 ///
437 /// # Arguments
438 ///
439 /// * `pool` - Database connection pool for executing queries
440 ///
441 /// # Returns
442 ///
443 /// A new `MetadataCache` instance with an empty cache
444 pub fn new(pool: Pool<MySql>) -> Self {
445 Self {
446 pool,
447 cache: HashMap::new(),
448 }
449 }
450
451 /// Retrieves a metadata value by its key, using the cache when possible.
452 ///
453 /// This method first checks the in-memory cache for the requested key.
454 /// If the key is not found in the cache, it queries the database and
455 /// updates the cache with the result.
456 ///
457 /// # Arguments
458 ///
459 /// * `key` - The unique key whose value to retrieve
460 ///
461 /// # Returns
462 ///
463 /// * `Ok(String)` - Successfully retrieved the value for the key
464 /// * `Err(anyhow::Error)` - Failed to fetch the value or key doesn't exist
465 ///
466 /// # Cache Behavior
467 ///
468 /// This method:
469 /// - Returns cached values without querying the database when possible
470 /// - Automatically populates the cache with values fetched from the database
471 /// - Does not refresh existing cache entries (use `refresh_cache` for that)
472 pub async fn get(&mut self, key: &str) -> Result<String> {
473 // Check cache first
474 if let Some(value) = self.cache.get(key) {
475 return Ok(value.clone());
476 }
477
478 // If not in cache, get from database
479 let value = get_meta_value(&self.pool, key).await?;
480
481 // Store in cache
482 self.cache.insert(key.to_string(), value.clone());
483
484 Ok(value)
485 }
486
487 /// Sets a metadata value for a specific key and updates the cache.
488 ///
489 /// This method updates both the database and the in-memory cache with
490 /// the new key-value pair. This ensures that subsequent `get` calls
491 /// will return the updated value without requiring a database query.
492 ///
493 /// # Arguments
494 ///
495 /// * `key` - The unique key to associate with the value
496 /// * `value` - The value to store
497 ///
498 /// # Returns
499 ///
500 /// * `Ok(())` - Successfully stored the key-value pair
501 /// * `Err(anyhow::Error)` - Failed to store the key-value pair
502 ///
503 /// # Error Handling
504 ///
505 /// If the database update fails, the cache is not updated, ensuring
506 /// consistency between the cache and the database.
507 pub async fn set(&mut self, key: &str, value: &str) -> Result<()> {
508 // Update database
509 set_meta_value(&self.pool, key, value).await?;
510
511 // Update cache
512 self.cache.insert(key.to_string(), value.to_string());
513
514 Ok(())
515 }
516
517 /// Refreshes the entire cache from the database.
518 ///
519 /// This method clears the in-memory cache and reloads all metadata entries
520 /// from the database. It's useful when the cache might be stale due to
521 /// external changes to the metadata table.
522 ///
523 /// # Returns
524 ///
525 /// * `Ok(())` - Successfully refreshed the cache
526 /// * `Err(anyhow::Error)` - Failed to refresh the cache
527 ///
528 /// # Use Cases
529 ///
530 /// This method is particularly useful in scenarios such as:
531 /// - After application startup to prime the cache
532 /// - After scheduled maintenance that might have modified metadata
533 /// - When cache staleness is detected or suspected
534 /// - Periodically in long-running applications to ensure cache freshness
535 pub async fn refresh_cache(&mut self) -> Result<()> {
536 // Clear cache
537 self.cache.clear();
538
539 // Get all metadata entries
540 let entries = sqlx::query_as::<_, (String, String)>("SELECT `key`, value FROM metadata")
541 .fetch_all(&self.pool)
542 .await
543 .context("Failed to fetch metadata entries for cache refresh")?;
544
545 // Populate cache
546 for (key, value) in entries {
547 self.cache.insert(key, value);
548 }
549
550 Ok(())
551 }
552}