omni_orchestrator/db_manager/
connection.rs1use sqlx::{MySql, MySqlPool, Pool};
2use std::sync::Arc;
3use tokio::sync::RwLock;
4use std::collections::HashMap;
5use log::{info, warn, error};
6use crate::db_manager::error::DatabaseError;
7
8pub struct ConnectionManager {
10 base_url: String,
12
13 main_pool: Pool<MySql>,
15
16 platform_pools: Arc<RwLock<HashMap<i64, Pool<MySql>>>>,
18}
19
20impl ConnectionManager {
21 pub async fn new(base_url: &str) -> Result<Self, DatabaseError> {
23 info!("Connecting to MySQL server at {}", base_url);
25 let server_pool = MySqlPool::connect(base_url)
26 .await
27 .map_err(|e| DatabaseError::ConnectionError(e.to_string()))?;
28
29 Self::ensure_database_exists(&server_pool, "omni").await?;
31
32 let main_db_url = format!("{}/omni", base_url);
34 info!("Connecting to main database at {}", main_db_url);
35 let main_pool = MySqlPool::connect(&main_db_url)
36 .await
37 .map_err(|e| DatabaseError::ConnectionError(format!(
38 "Failed to connect to main database: {}", e
39 )))?;
40
41 info!("✓ Database connection established");
42
43 Ok(Self {
44 base_url: base_url.to_string(),
45 main_pool,
46 platform_pools: Arc::new(RwLock::new(HashMap::new())),
47 })
48 }
49
50 pub async fn ensure_database_exists(pool: &Pool<MySql>, db_name: &str) -> Result<(), DatabaseError> {
52 info!("Ensuring database exists: {}", db_name);
53 let query = format!("CREATE DATABASE IF NOT EXISTS `{}`", db_name);
54 sqlx::query(&query)
55 .execute(pool)
56 .await
57 .map_err(|e| DatabaseError::SqlxError(e))?;
58
59 info!("✓ Database {} exists or was created", db_name);
60 Ok(())
61 }
62
63 pub fn main_pool(&self) -> &Pool<MySql> {
65 &self.main_pool
66 }
67
68 pub async fn platform_pool(&self, platform_id: i64, platform_name: &str) -> Result<Pool<MySql>, DatabaseError> {
70 {
72 let pools = self.platform_pools.read().await;
73 if let Some(pool) = pools.get(&platform_id) {
74 return Ok(pool.clone());
75 }
76 }
77
78 let db_name = format!("omni_p_{}", platform_name);
80
81 let server_pool = MySqlPool::connect(&self.base_url)
83 .await
84 .map_err(|e| DatabaseError::ConnectionError(e.to_string()))?;
85
86 Self::ensure_database_exists(&server_pool, &db_name).await?;
87
88 let platform_db_url = format!("{}/{}", self.base_url, db_name);
90 info!("Creating pool for platform {}: {}", platform_name, platform_db_url);
91
92 let pool = MySqlPool::connect(&platform_db_url)
93 .await
94 .map_err(|e| DatabaseError::ConnectionError(format!(
95 "Failed to connect to platform database {}: {}",
96 db_name, e
97 )))?;
98
99 {
101 let mut pools = self.platform_pools.write().await;
102 pools.insert(platform_id, pool.clone());
103 }
104
105 Ok(pool)
106 }
107}