omni_orchestrator/initialization/
setup_schema.rs

1use anyhow::Result;
2use colored::Colorize;
3use crate::schemas;
4use crate::logging;
5use crate::PROJECT_ROOT;
6
7/// Loads and initializes the ClickHouse schema from SQL files.
8///
9/// - Retrieves the current schema version from the MySQL metadata table.
10/// - Constructs the path to the schema file based on the version.
11/// - Initializes the ClickHouse schema by executing the SQL file.
12/// - Panics if schema initialization fails.
13///
14/// # Arguments
15/// * `clickhouse_client` - Reference to the ClickHouse client.
16/// * `pool` - Reference to the MySQL connection pool.
17///
18/// # Errors
19/// Returns an error if schema loading or initialization fails.
20pub async fn setup_schema(
21    clickhouse_client: &clickhouse::Client,
22    pool: &sqlx::Pool<sqlx::MySql>
23) -> Result<()> {
24    log::info!("{}", "Loading schema files...".blue());
25    // Get the current schema version from metadata
26    let schema_version =
27        schemas::v1::db::queries::metadata::get_meta_value(pool, "omni_schema_version")
28            .await
29            .unwrap_or_else(|_| "1".to_string());
30
31    // Build the path to the schema file
32    let schema_path = format!("{}/sql/v{}/clickhouse_up.sql", PROJECT_ROOT, schema_version);
33    log::info!(
34        "{}",
35        format!("Loading schema from path: {}", schema_path).blue()
36    );
37
38    log::info!("{}", "Initializing ClickHouse schema...".blue());
39    // Initialize the ClickHouse schema
40    match schemas::v1::api::logging::init_clickhouse_db(clickhouse_client, &schema_path).await {
41        Ok(_) => log::info!("{}", "✓ ClickHouse schema initialized".green()),
42        Err(e) => {
43            log::error!(
44                "{}",
45                format!("Failed to initialize ClickHouse schema: {:?}", e).red()
46            );
47            panic!("Failed to initialize ClickHouse schema");
48        }
49    };
50
51    Ok(())
52}