omni_orchestrator/schemas/v1/db/queries/
metadata.rs

1use anyhow::{Context, Result};
2use lazy_static::lazy_static;
3use sqlx::{MySql, Pool};
4use std::collections::HashMap;
5use std::sync::Arc;
6use tokio::sync::Mutex;
7
8/// Global mutex for synchronizing metadata operations.
9///
10/// This mutex prevents race conditions when multiple threads attempt to modify
11/// the metadata table simultaneously. It's particularly important for operations
12/// like table creation and key deduplication which require exclusive access.
13lazy_static! {
14    static ref METADATA_MUTEX: Mutex<()> = Mutex::new(());
15}
16
17/// Creates the metadata table in the database if it doesn't already exist.
18///
19/// This function ensures that the system has a place to store key-value metadata.
20/// It uses a mutex to prevent multiple threads from attempting to create the table
21/// simultaneously, and includes schema optimizations like indices and uniqueness constraints.
22///
23/// # Arguments
24///
25/// * `pool` - Database connection pool for executing the query
26///
27/// # Returns
28///
29/// * `Ok(())` - Table was successfully created or already existed
30/// * `Err(anyhow::Error)` - Failed to create the table
31///
32/// # Concurrency
33///
34/// This function acquires the global metadata mutex to ensure thread safety.
35/// If the table already exists, it returns immediately without performing any changes.
36///
37/// # Schema
38///
39/// The metadata table is created with the following schema:
40/// - `id`: Auto-incrementing primary key
41/// - `key`: Unique string identifier (indexed for fast lookups)
42/// - `value`: Text field storing the metadata value
43/// - `created_at`: Timestamp of when the record was created
44/// - `updated_at`: Timestamp of the last update to the record
45pub async fn create_meta_table(pool: &Pool<MySql>) -> Result<()> {
46    // Acquire a lock to ensure only one thread can modify the metadata table at a time
47    let _lock = METADATA_MUTEX.lock().await;
48
49    // First check if table exists to prevent unnecessary queries
50    if meta_table_exists_internal(pool).await? {
51        return Ok(());
52    }
53
54    // Use stronger uniqueness enforcement in schema
55    let result = sqlx::query(
56        r#"
57        CREATE TABLE IF NOT EXISTS metadata (
58            id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
59            `key` VARCHAR(255) NOT NULL,
60            value TEXT NOT NULL,
61            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
62            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
63            UNIQUE KEY unique_key (`key`),
64            INDEX idx_metadata_key (`key`)
65        ) ENGINE=InnoDB
66        "#,
67    )
68    .execute(pool)
69    .await
70    .context("Error creating metadata table")?;
71
72    log::info!("Successfully created metadata table");
73
74    // After creating the table, ensure no duplicates exist
75    cleanup_duplicate_keys_internal(pool).await?;
76
77    Ok(())
78}
79
80/// Retrieves a metadata value by its key.
81///
82/// This function performs a simple lookup in the metadata table to retrieve
83/// the value associated with the provided key.
84///
85/// # Arguments
86///
87/// * `pool` - Database connection pool for executing the query
88/// * `key` - The unique key whose value to retrieve
89///
90/// # Returns
91///
92/// * `Ok(String)` - Successfully retrieved the value for the key
93/// * `Err(anyhow::Error)` - Failed to fetch the value or key doesn't exist
94///
95/// # Error Handling
96///
97/// Returns an error if the key doesn't exist in the metadata table or if a database
98/// error occurs during the query execution.
99///
100/// # Concurrency
101///
102/// This function doesn't acquire the metadata mutex since it only performs a read
103/// operation, which doesn't risk data corruption.
104pub async fn get_meta_value(pool: &Pool<MySql>, key: &str) -> Result<String> {
105    // No need for mutex here since we're only reading
106    let value = sqlx::query_scalar::<_, String>("SELECT value FROM metadata WHERE `key` = ?")
107        .bind(key)
108        .fetch_one(pool)
109        .await
110        .context(format!("Failed to fetch metadata value for key '{}'", key))?;
111
112    Ok(value)
113}
114
115/// Sets a metadata value for a specific key.
116///
117/// This function stores a key-value pair in the metadata table. If the key already
118/// exists, its value is updated. The operation is performed atomically within a
119/// transaction and is protected by a mutex to prevent concurrent modifications.
120///
121/// # Arguments
122///
123/// * `pool` - Database connection pool for executing the query
124/// * `key` - The unique key to associate with the value
125/// * `value` - The value to store
126///
127/// # Returns
128///
129/// * `Ok(())` - Successfully stored the key-value pair
130/// * `Err(anyhow::Error)` - Failed to store the key-value pair
131///
132/// # Concurrency
133///
134/// This function acquires the global metadata mutex to ensure thread safety
135/// when multiple threads attempt to modify the same key.
136///
137/// # Transaction Handling
138///
139/// This function uses a database transaction that:
140/// 1. Deletes any existing entries with the same key
141/// 2. Inserts the new key-value pair
142///
143/// If any part of this operation fails, the entire transaction is rolled back,
144/// preserving data consistency.
145pub async fn set_meta_value(pool: &Pool<MySql>, key: &str, value: &str) -> Result<()> {
146    // Acquire a lock to ensure only one thread can modify a key at a time
147    let _lock = METADATA_MUTEX.lock().await;
148
149    // Use a transaction to ensure atomicity
150    let mut tx = pool.begin().await.context("Failed to begin transaction")?;
151
152    // First delete any existing entries with this key to ensure no duplicates
153    sqlx::query("DELETE FROM metadata WHERE `key` = ?")
154        .bind(key)
155        .execute(&mut *tx)
156        .await
157        .context(format!(
158            "Failed to delete existing metadata entry for key '{}'",
159            key
160        ))?;
161
162    // Then insert the new value
163    sqlx::query("INSERT INTO metadata (`key`, value) VALUES (?, ?)")
164        .bind(key)
165        .bind(value)
166        .execute(&mut *tx)
167        .await
168        .context(format!("Failed to insert metadata value for key '{}'", key))?;
169
170    // Commit the transaction
171    tx.commit().await.context("Failed to commit transaction")?;
172
173    Ok(())
174}
175
176/// Internal function to check if the metadata table exists.
177///
178/// This function checks if the metadata table exists in the current database
179/// without acquiring the metadata mutex. It's intended for internal use by
180/// functions that already hold the mutex.
181///
182/// # Arguments
183///
184/// * `pool` - Database connection pool for executing the query
185///
186/// # Returns
187///
188/// * `Ok(bool)` - True if the table exists, false otherwise
189/// * `Err(anyhow::Error)` - Failed to check if the table exists
190///
191/// # Note
192///
193/// This is an internal function not protected by the metadata mutex.
194/// It should only be called from functions that already hold the mutex
195/// or where concurrent access is not a concern.
196async fn meta_table_exists_internal(pool: &Pool<MySql>) -> Result<bool> {
197    let table_exists = sqlx::query_scalar::<_, i64>(
198        "SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = DATABASE() AND table_name = 'metadata'"
199    )
200    .fetch_one(pool)
201    .await
202    .context("Failed to check if metadata table exists")?;
203
204    Ok(table_exists > 0)
205}
206
207/// Checks if the metadata table exists in the database.
208///
209/// This function provides a thread-safe way to check if the metadata table
210/// has been created in the database.
211///
212/// # Arguments
213///
214/// * `pool` - Database connection pool for executing the query
215///
216/// # Returns
217///
218/// * `Ok(bool)` - True if the table exists, false otherwise
219/// * `Err(anyhow::Error)` - Failed to check if the table exists
220///
221/// # Concurrency
222///
223/// This function doesn't acquire the metadata mutex since it only performs a read
224/// operation, which doesn't risk data corruption.
225pub async fn meta_table_exists(pool: &Pool<MySql>) -> Result<bool> {
226    // No need for mutex here since we're only reading
227    meta_table_exists_internal(pool).await
228}
229
230/// Internal function to clean up duplicate metadata keys.
231///
232/// This function identifies and resolves duplicate keys in the metadata table
233/// by keeping only the most recently updated entry for each key. It's intended
234/// for internal use by functions that already hold the metadata mutex.
235///
236/// # Arguments
237///
238/// * `pool` - Database connection pool for executing the query
239///
240/// # Returns
241///
242/// * `Ok(usize)` - Number of duplicate entries that were removed
243/// * `Err(anyhow::Error)` - Failed to clean up duplicate keys
244///
245/// # Process
246///
247/// For each key with multiple entries, this function:
248/// 1. Identifies the most recently updated entry by `updated_at` timestamp
249/// 2. Deletes all other entries with the same key
250/// 3. Logs a warning if duplicates were found
251///
252/// # Note
253///
254/// This is an internal function not protected by the metadata mutex.
255/// It should only be called from functions that already hold the mutex.
256async fn cleanup_duplicate_keys_internal(pool: &Pool<MySql>) -> Result<usize> {
257    // Find duplicate keys
258    let duplicates = sqlx::query_as::<_, (String,)>(
259        r#"
260        SELECT `key` FROM metadata 
261        GROUP BY `key` 
262        HAVING COUNT(*) > 1
263        "#,
264    )
265    .fetch_all(pool)
266    .await
267    .context("Failed to fetch duplicate keys")?;
268
269    let mut cleaned_count = 0;
270
271    // For each duplicate key, keep only the latest entry
272    for (key,) in duplicates {
273        let mut tx = pool.begin().await?;
274
275        // Get the ID of the most recently updated entry
276        let latest_id = sqlx::query_scalar::<_, i64>(
277            "SELECT id FROM metadata WHERE `key` = ? ORDER BY updated_at DESC LIMIT 1",
278        )
279        .bind(&key)
280        .fetch_one(&mut *tx)
281        .await
282        .context(format!("Failed to get latest entry ID for key '{}'", key))?;
283
284        // Delete all entries with this key except the latest one
285        let deleted = sqlx::query("DELETE FROM metadata WHERE `key` = ? AND id != ?")
286            .bind(&key)
287            .bind(latest_id)
288            .execute(&mut *tx)
289            .await
290            .context(format!(
291                "Failed to delete duplicate entries for key '{}'",
292                key
293            ))?;
294
295        cleaned_count += deleted.rows_affected() as usize;
296
297        tx.commit().await.context("Failed to commit transaction")?;
298
299        if cleaned_count > 0 {
300            log::warn!(
301                "Cleaned up {} duplicate entries for key '{}'",
302                cleaned_count,
303                key
304            );
305        }
306    }
307
308    Ok(cleaned_count)
309}
310
311/// Cleans up duplicate metadata keys in a thread-safe manner.
312///
313/// This function provides a public, mutex-protected interface to clean up
314/// duplicate keys in the metadata table. It ensures that only one thread
315/// can perform this operation at a time.
316///
317/// # Arguments
318///
319/// * `pool` - Database connection pool for executing the query
320///
321/// # Returns
322///
323/// * `Ok(usize)` - Number of duplicate entries that were removed
324/// * `Err(anyhow::Error)` - Failed to clean up duplicate keys
325///
326/// # Concurrency
327///
328/// This function acquires the global metadata mutex to ensure thread safety
329/// when cleaning up duplicate keys.
330///
331/// # Use Cases
332///
333/// This function is useful for:
334/// - Periodic maintenance of the metadata table
335/// - Resolving inconsistencies that might have been introduced by bugs
336/// - Cleaning up after schema migrations or application upgrades
337pub async fn cleanup_duplicate_keys(pool: &Pool<MySql>) -> Result<usize> {
338    let _lock = METADATA_MUTEX.lock().await;
339    cleanup_duplicate_keys_internal(pool).await
340}
341
342/// Initializes the metadata system, ensuring the table exists and is clean.
343///
344/// This function provides a safe way to initialize the metadata system by:
345/// 1. Creating the metadata table if it doesn't exist
346/// 2. Cleaning up any duplicate keys that might exist
347///
348/// It's designed to be called during application startup to ensure the metadata
349/// system is in a consistent state before it's used.
350///
351/// # Arguments
352///
353/// * `pool` - Database connection pool for executing the query
354///
355/// # Returns
356///
357/// * `Ok(())` - Successfully initialized the metadata system
358/// * `Err(anyhow::Error)` - Failed to initialize the metadata system
359///
360/// # Concurrency
361///
362/// This function acquires the global metadata mutex to ensure thread safety
363/// during initialization.
364///
365/// # Warning
366///
367/// If duplicate keys are found and cleaned up during initialization, an error
368/// is logged as this could indicate issues with the application's use of the
369/// metadata API.
370pub async fn initialize_metadata_system(pool: &Pool<MySql>) -> Result<()> {
371    let _lock = METADATA_MUTEX.lock().await;
372
373    // Create table if it doesn't exist
374    if !meta_table_exists_internal(pool).await? {
375        // Use direct query execution to create table
376        sqlx::query(
377            r#"
378            CREATE TABLE IF NOT EXISTS metadata (
379                id BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
380                `key` VARCHAR(255) NOT NULL,
381                value TEXT NOT NULL,
382                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
383                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
384                UNIQUE KEY unique_key (`key`),
385                INDEX idx_metadata_key (`key`)
386            ) ENGINE=InnoDB
387            "#,
388        )
389        .execute(pool)
390        .await
391        .context("Error creating metadata table")?;
392
393        log::info!("Successfully created metadata table");
394    }
395
396    // Always clean up any duplicates that may exist
397    let cleaned = cleanup_duplicate_keys_internal(pool).await?;
398    if cleaned > 0 {
399        log::error!("Cleaned up {} total duplicate metadata entries during initialization, this could indicate issues with your usage of the matadata API", cleaned);
400    }
401
402    Ok(())
403}
404
405/// Cache implementation for metadata to reduce database access.
406///
407/// This cache maintains an in-memory copy of metadata values to minimize
408/// database queries. It provides methods to get and set values, as well as
409/// to refresh the entire cache from the database.
410///
411/// # Fields
412///
413/// * `pool` - Database connection pool for executing queries
414/// * `cache` - In-memory hash map storing key-value pairs
415///
416/// # Thread Safety
417///
418/// This struct is not thread-safe on its own and should either be used from a
419/// single thread or wrapped in a synchronization primitive like `Arc<Mutex<MetadataCache>>`.
420///
421/// # Performance Considerations
422///
423/// The cache can significantly reduce database load for frequently accessed
424/// metadata values, but it can become stale if other processes modify the
425/// metadata table directly. Use `refresh_cache` periodically or after expected
426/// external changes.
427pub struct MetadataCache {
428    /// Database connection pool for executing queries
429    pool: Pool<MySql>,
430    /// In-memory cache of metadata key-value pairs
431    cache: HashMap<String, String>,
432}
433
434impl MetadataCache {
435    /// Creates a new metadata cache with an empty cache and the provided database pool.
436    ///
437    /// # Arguments
438    ///
439    /// * `pool` - Database connection pool for executing queries
440    ///
441    /// # Returns
442    ///
443    /// A new `MetadataCache` instance with an empty cache
444    pub fn new(pool: Pool<MySql>) -> Self {
445        Self {
446            pool,
447            cache: HashMap::new(),
448        }
449    }
450
451    /// Retrieves a metadata value by its key, using the cache when possible.
452    ///
453    /// This method first checks the in-memory cache for the requested key.
454    /// If the key is not found in the cache, it queries the database and
455    /// updates the cache with the result.
456    ///
457    /// # Arguments
458    ///
459    /// * `key` - The unique key whose value to retrieve
460    ///
461    /// # Returns
462    ///
463    /// * `Ok(String)` - Successfully retrieved the value for the key
464    /// * `Err(anyhow::Error)` - Failed to fetch the value or key doesn't exist
465    ///
466    /// # Cache Behavior
467    ///
468    /// This method:
469    /// - Returns cached values without querying the database when possible
470    /// - Automatically populates the cache with values fetched from the database
471    /// - Does not refresh existing cache entries (use `refresh_cache` for that)
472    pub async fn get(&mut self, key: &str) -> Result<String> {
473        // Check cache first
474        if let Some(value) = self.cache.get(key) {
475            return Ok(value.clone());
476        }
477
478        // If not in cache, get from database
479        let value = get_meta_value(&self.pool, key).await?;
480
481        // Store in cache
482        self.cache.insert(key.to_string(), value.clone());
483
484        Ok(value)
485    }
486
487    /// Sets a metadata value for a specific key and updates the cache.
488    ///
489    /// This method updates both the database and the in-memory cache with
490    /// the new key-value pair. This ensures that subsequent `get` calls
491    /// will return the updated value without requiring a database query.
492    ///
493    /// # Arguments
494    ///
495    /// * `key` - The unique key to associate with the value
496    /// * `value` - The value to store
497    ///
498    /// # Returns
499    ///
500    /// * `Ok(())` - Successfully stored the key-value pair
501    /// * `Err(anyhow::Error)` - Failed to store the key-value pair
502    ///
503    /// # Error Handling
504    ///
505    /// If the database update fails, the cache is not updated, ensuring
506    /// consistency between the cache and the database.
507    pub async fn set(&mut self, key: &str, value: &str) -> Result<()> {
508        // Update database
509        set_meta_value(&self.pool, key, value).await?;
510
511        // Update cache
512        self.cache.insert(key.to_string(), value.to_string());
513
514        Ok(())
515    }
516
517    /// Refreshes the entire cache from the database.
518    ///
519    /// This method clears the in-memory cache and reloads all metadata entries
520    /// from the database. It's useful when the cache might be stale due to
521    /// external changes to the metadata table.
522    ///
523    /// # Returns
524    ///
525    /// * `Ok(())` - Successfully refreshed the cache
526    /// * `Err(anyhow::Error)` - Failed to refresh the cache
527    ///
528    /// # Use Cases
529    ///
530    /// This method is particularly useful in scenarios such as:
531    /// - After application startup to prime the cache
532    /// - After scheduled maintenance that might have modified metadata
533    /// - When cache staleness is detected or suspected
534    /// - Periodically in long-running applications to ensure cache freshness
535    pub async fn refresh_cache(&mut self) -> Result<()> {
536        // Clear cache
537        self.cache.clear();
538
539        // Get all metadata entries
540        let entries = sqlx::query_as::<_, (String, String)>("SELECT `key`, value FROM metadata")
541            .fetch_all(&self.pool)
542            .await
543            .context("Failed to fetch metadata entries for cache refresh")?;
544
545        // Populate cache
546        for (key, value) in entries {
547            self.cache.insert(key, value);
548        }
549
550        Ok(())
551    }
552}