omni_orchestrator/db_manager/
connection.rs

1use sqlx::{MySql, MySqlPool, Pool};
2use std::sync::Arc;
3use tokio::sync::RwLock;
4use std::collections::HashMap;
5use log::{info, warn, error};
6use crate::db_manager::error::DatabaseError;
7
8/// Manages database connections across the application
9pub struct ConnectionManager {
10    /// Base URL for database connections
11    base_url: String,
12    
13    /// Main application database pool
14    main_pool: Pool<MySql>,
15    
16    /// Platform-specific database pools
17    platform_pools: Arc<RwLock<HashMap<i64, Pool<MySql>>>>,
18}
19
20impl ConnectionManager {
21    /// Creates a new connection manager
22    pub async fn new(base_url: &str) -> Result<Self, DatabaseError> {
23        // Connect to the MySQL server without specifying a database
24        info!("Connecting to MySQL server at {}", base_url);
25        let server_pool = MySqlPool::connect(base_url)
26            .await
27            .map_err(|e| DatabaseError::ConnectionError(e.to_string()))?;
28            
29        // Ensure the main database exists
30        Self::ensure_database_exists(&server_pool, "omni").await?;
31            
32        // Connect to the main database
33        let main_db_url = format!("{}/omni", base_url);
34        info!("Connecting to main database at {}", main_db_url);
35        let main_pool = MySqlPool::connect(&main_db_url)
36            .await
37            .map_err(|e| DatabaseError::ConnectionError(format!(
38                "Failed to connect to main database: {}", e
39            )))?;
40        
41        info!("✓ Database connection established");
42            
43        Ok(Self {
44            base_url: base_url.to_string(),
45            main_pool,
46            platform_pools: Arc::new(RwLock::new(HashMap::new())),
47        })
48    }
49    
50    /// Ensures a database exists, creating it if necessary
51    pub async fn ensure_database_exists(pool: &Pool<MySql>, db_name: &str) -> Result<(), DatabaseError> {
52        info!("Ensuring database exists: {}", db_name);
53        let query = format!("CREATE DATABASE IF NOT EXISTS `{}`", db_name);
54        sqlx::query(&query)
55            .execute(pool)
56            .await
57            .map_err(|e| DatabaseError::SqlxError(e))?;
58            
59        info!("✓ Database {} exists or was created", db_name);
60        Ok(())
61    }
62    
63    /// Gets the main database pool
64    pub fn main_pool(&self) -> &Pool<MySql> {
65        &self.main_pool
66    }
67    
68    /// Gets or creates a platform-specific database pool
69    pub async fn platform_pool(&self, platform_id: i64, platform_name: &str) -> Result<Pool<MySql>, DatabaseError> {
70        // Check if we already have this pool
71        {
72            let pools = self.platform_pools.read().await;
73            if let Some(pool) = pools.get(&platform_id) {
74                return Ok(pool.clone());
75            }
76        }
77        
78        // If not found, create a new pool
79        let db_name = format!("omni_p_{}", platform_name);
80        
81        // Ensure the database exists
82        let server_pool = MySqlPool::connect(&self.base_url)
83            .await
84            .map_err(|e| DatabaseError::ConnectionError(e.to_string()))?;
85            
86        Self::ensure_database_exists(&server_pool, &db_name).await?;
87        
88        // Connect to the platform database
89        let platform_db_url = format!("{}/{}", self.base_url, db_name);
90        info!("Creating pool for platform {}: {}", platform_name, platform_db_url);
91        
92        let pool = MySqlPool::connect(&platform_db_url)
93            .await
94            .map_err(|e| DatabaseError::ConnectionError(format!(
95                "Failed to connect to platform database {}: {}", 
96                db_name, e
97            )))?;
98            
99        // Store the pool
100        {
101            let mut pools = self.platform_pools.write().await;
102            pools.insert(platform_id, pool.clone());
103        }
104        
105        Ok(pool)
106    }
107}