omni_orchestrator/schemas/v1/db/queries/
cost.rs

1use anyhow::Context;
2use serde::Serialize;
3use sqlx::{MySql, Pool};
4use sqlx::Row;
5use chrono::{DateTime, Utc};
6
7use libomni::types::db::v1 as types;
8use types::cost::{CostMetric, CostMetricWithType, ResourcePricing, CostBudget, CostProjection, CostAllocationTag};
9use types::util_tables::ResourceType;
10
11/// Retrieves a paginated list of resource types from the database.
12///
13/// # Arguments
14///
15/// * `pool` - Database connection pool for executing the query
16/// * `page` - Zero-based page number (e.g., 0 for first page, 1 for second page)
17/// * `per_page` - Number of records to fetch per page
18///
19/// # Returns
20///
21/// * `Ok(Vec<ResourceType>)` - Successfully retrieved list of resource types
22/// * `Err(anyhow::Error)` - Failed to fetch resource types, with context
23pub async fn list_resource_types(pool: &Pool<MySql>, page: i64, per_page: i64) -> anyhow::Result<Vec<ResourceType>> {
24    println!("Attempting to fetch resource types from database...");
25
26    let result = sqlx::query_as::<_, ResourceType>(
27        r#"
28        SELECT * FROM resource_types
29        ORDER BY name ASC
30        LIMIT ? OFFSET ?
31        "#,
32    )
33    .bind(per_page)
34    .bind(page * per_page)
35    .fetch_all(pool)
36    .await;
37
38    match result {
39        Ok(types) => {
40            println!("Successfully fetched {} resource types", types.len());
41            Ok(types)
42        }
43        Err(e) => {
44            eprintln!("Error fetching resource types: {:#?}", e);
45            Err(anyhow::Error::new(e).context("Failed to fetch resource types"))
46        }
47    }
48}
49
50/// Counts the total number of resource types in the database.
51pub async fn count_resource_types(pool: &Pool<MySql>) -> anyhow::Result<i64> {
52    let count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM resource_types")
53        .fetch_one(pool)
54        .await
55        .context("Failed to count resource types")?;
56
57    Ok(count)
58}
59
60/// Retrieves a specific resource type by its unique identifier.
61pub async fn get_resource_type_by_id(pool: &Pool<MySql>, id: i32) -> anyhow::Result<ResourceType> {
62    let resource_type = sqlx::query_as::<_, ResourceType>("SELECT * FROM resource_types WHERE id = ?")
63        .bind(id)
64        .fetch_one(pool)
65        .await
66        .context("Failed to fetch resource type")?;
67
68    Ok(resource_type)
69}
70
71/// Creates a new resource type in the database.
72pub async fn create_resource_type(
73    pool: &Pool<MySql>,
74    name: &str,
75    category: &str,
76    unit_of_measurement: &str,
77    description: Option<&str>
78) -> anyhow::Result<ResourceType> {
79    let mut tx = pool.begin().await?;
80
81    let resource_type = sqlx::query_as::<_, ResourceType>(
82        r#"INSERT INTO resource_types (
83            name, category, unit_of_measurement, description
84        ) VALUES (?, ?, ?, ?)"#,
85    )
86    .bind(name)
87    .bind(category)
88    .bind(unit_of_measurement)
89    .bind(description)
90    .fetch_one(&mut *tx)
91    .await
92    .context("Failed to create resource type")?;
93
94    tx.commit().await?;
95    Ok(resource_type)
96}
97
98/// Updates an existing resource type in the database.
99pub async fn update_resource_type(
100    pool: &Pool<MySql>,
101    id: i32,
102    name: Option<&str>,
103    category: Option<&str>,
104    unit_of_measurement: Option<&str>,
105    description: Option<&str>
106) -> anyhow::Result<ResourceType> {
107    // Define which fields are being updated
108    let update_fields = [
109        (name.is_some(), "name = ?"),
110        (category.is_some(), "category = ?"),
111        (unit_of_measurement.is_some(), "unit_of_measurement = ?"),
112        (description.is_some(), "description = ?"),
113    ];
114
115    // Build update query with only the fields that have values
116    let field_clauses = update_fields
117        .iter()
118        .filter(|(has_value, _)| *has_value)
119        .map(|(_, field)| format!(", {}", field))
120        .collect::<String>();
121
122    let query = format!(
123        "UPDATE resource_types SET updated_at = CURRENT_TIMESTAMP{} WHERE id = ?",
124        field_clauses
125    );
126
127    // Start binding parameters
128    let mut db_query = sqlx::query_as::<_, ResourceType>(&query);
129
130    // Bind parameters
131    if let Some(val) = name {
132        db_query = db_query.bind(val);
133    }
134    if let Some(val) = category {
135        db_query = db_query.bind(val);
136    }
137    if let Some(val) = unit_of_measurement {
138        db_query = db_query.bind(val);
139    }
140    if let Some(val) = description {
141        db_query = db_query.bind(val);
142    }
143
144    // Bind the ID parameter
145    db_query = db_query.bind(id);
146
147    // Execute the query in a transaction
148    let mut tx = pool.begin().await?;
149    let resource_type = db_query
150        .fetch_one(&mut *tx)
151        .await
152        .context("Failed to update resource type")?;
153
154    tx.commit().await?;
155    Ok(resource_type)
156}
157
158/// Deletes a resource type from the database.
159pub async fn delete_resource_type(pool: &Pool<MySql>, id: i32) -> anyhow::Result<()> {
160    let mut tx = pool.begin().await?;
161
162    sqlx::query("DELETE FROM resource_types WHERE id = ?")
163        .bind(id)
164        .execute(&mut *tx)
165        .await
166        .context("Failed to delete resource type")?;
167
168    tx.commit().await?;
169    Ok(())
170}
171
172/// Retrieves a paginated list of cost metrics from the database, with optional filtering.
173///
174/// # Arguments
175///
176/// * `pool` - Database connection pool for executing the query
177/// * `page` - Zero-based page number (e.g., 0 for first page, 1 for second page)
178/// * `per_page` - Number of records to fetch per page
179/// * `resource_type_id` - Optional filter by resource type
180/// * `provider_id` - Optional filter by provider
181/// * `app_id` - Optional filter by application
182/// * `start_date` - Optional filter for metrics after this date
183/// * `end_date` - Optional filter for metrics before this date
184/// * `billing_period` - Optional filter by billing period (e.g., "2025-05")
185///
186/// # Returns
187///
188/// * `Ok(Vec<CostMetricWithType>)` - Successfully retrieved list of cost metrics with type information
189/// * `Err(anyhow::Error)` - Failed to fetch cost metrics, with context
190pub async fn list_cost_metrics(
191    pool: &Pool<MySql>, 
192    page: i64, 
193    per_page: i64,
194    resource_type_id: Option<i32>,
195    provider_id: Option<i64>,
196    app_id: Option<i64>,
197    start_date: Option<DateTime<Utc>>,
198    end_date: Option<DateTime<Utc>>,
199    billing_period: Option<&str>
200) -> anyhow::Result<Vec<CostMetricWithType>> {
201    println!("Attempting to fetch cost metrics from database with filters...");
202
203    // Start building the query with filters
204    let mut query = String::from(
205        r#"
206        SELECT 
207            cm.*,
208            rt.name as resource_type_name,
209            rt.category as resource_type_category,
210            rt.unit_of_measurement
211        FROM 
212            cost_metrics cm
213        JOIN 
214            resource_types rt ON cm.resource_type_id = rt.id
215        WHERE 1=1
216        "#
217    );
218    
219    let mut params: Vec<String> = Vec::new();
220    
221    if resource_type_id.is_some() {
222        query.push_str(" AND cm.resource_type_id = ?");
223        params.push("resource_type_id".to_string());
224    }
225    
226    if provider_id.is_some() {
227        query.push_str(" AND cm.provider_id = ?");
228        params.push("provider_id".to_string());
229    }
230    
231    if app_id.is_some() {
232        query.push_str(" AND cm.app_id = ?");
233        params.push("app_id".to_string());
234    }
235    
236    if start_date.is_some() {
237        query.push_str(" AND cm.start_time >= ?");
238        params.push("start_date".to_string());
239    }
240    
241    if end_date.is_some() {
242        query.push_str(" AND cm.end_time <= ?");
243        params.push("end_date".to_string());
244    }
245    
246    if billing_period.is_some() {
247        query.push_str(" AND cm.billing_period = ?");
248        params.push("billing_period".to_string());
249    }
250    
251    // Add ordering and pagination
252    query.push_str(" ORDER BY cm.start_time DESC LIMIT ? OFFSET ?");
253    
254    // Prepare the query
255    let mut db_query = sqlx::query_as::<_, CostMetricWithType>(&query);
256    
257    // Bind parameters
258    if let Some(val) = resource_type_id {
259        db_query = db_query.bind(val);
260    }
261    
262    if let Some(val) = provider_id {
263        db_query = db_query.bind(val);
264    }
265    
266    if let Some(val) = app_id {
267        db_query = db_query.bind(val);
268    }
269    
270    if let Some(val) = start_date {
271        db_query = db_query.bind(val);
272    }
273    
274    if let Some(val) = end_date {
275        db_query = db_query.bind(val);
276    }
277    
278    if let Some(val) = billing_period {
279        db_query = db_query.bind(val);
280    }
281    
282    // Add pagination parameters
283    db_query = db_query.bind(per_page).bind(page * per_page);
284    
285    // Execute query
286    let result = db_query.fetch_all(pool).await;
287
288    match result {
289        Ok(metrics) => {
290            println!("Successfully fetched {} cost metrics", metrics.len());
291            Ok(metrics)
292        }
293        Err(e) => {
294            eprintln!("Error fetching cost metrics: {:#?}", e);
295            Err(anyhow::Error::new(e).context("Failed to fetch cost metrics"))
296        }
297    }
298}
299
300/// Counts the total number of cost metrics in the database with optional filtering.
301pub async fn count_cost_metrics(
302    pool: &Pool<MySql>,
303    resource_type_id: Option<i32>,
304    provider_id: Option<i64>,
305    app_id: Option<i64>,
306    start_date: Option<DateTime<Utc>>,
307    end_date: Option<DateTime<Utc>>,
308    billing_period: Option<&str>
309) -> anyhow::Result<i64> {
310    // Start building the query with filters
311    let mut query = String::from(
312        "SELECT COUNT(*) FROM cost_metrics WHERE 1=1"
313    );
314    
315    if resource_type_id.is_some() {
316        query.push_str(" AND resource_type_id = ?");
317    }
318    
319    if provider_id.is_some() {
320        query.push_str(" AND provider_id = ?");
321    }
322    
323    if app_id.is_some() {
324        query.push_str(" AND app_id = ?");
325    }
326    
327    if start_date.is_some() {
328        query.push_str(" AND start_time >= ?");
329    }
330    
331    if end_date.is_some() {
332        query.push_str(" AND end_time <= ?");
333    }
334    
335    if billing_period.is_some() {
336        query.push_str(" AND billing_period = ?");
337    }
338    
339    // Prepare the query
340    let mut db_query = sqlx::query_scalar::<_, i64>(&query);
341    
342    // Bind parameters
343    if let Some(val) = resource_type_id {
344        db_query = db_query.bind(val);
345    }
346    
347    if let Some(val) = provider_id {
348        db_query = db_query.bind(val);
349    }
350    
351    if let Some(val) = app_id {
352        db_query = db_query.bind(val);
353    }
354    
355    if let Some(val) = start_date {
356        db_query = db_query.bind(val);
357    }
358    
359    if let Some(val) = end_date {
360        db_query = db_query.bind(val);
361    }
362    
363    if let Some(val) = billing_period {
364        db_query = db_query.bind(val);
365    }
366    
367    let count = db_query
368        .fetch_one(pool)
369        .await
370        .context("Failed to count cost metrics")?;
371
372    Ok(count)
373}
374
375/// Creates a new cost metric in the database.
376pub async fn create_cost_metric(
377    pool: &Pool<MySql>,
378    resource_type_id: i32,
379    provider_id: Option<i64>,
380    region_id: Option<i64>,
381    app_id: Option<i64>,
382    worker_id: Option<i64>,
383    org_id: Option<i64>,
384    start_time: DateTime<Utc>,
385    end_time: DateTime<Utc>,
386    usage_quantity: f64,
387    unit_cost: f64,
388    currency: &str,
389    total_cost: f64,
390    discount_percentage: Option<f64>,
391    discount_reason: Option<&str>,
392    billing_period: Option<&str>
393) -> anyhow::Result<CostMetric> {
394    let mut tx = pool.begin().await?;
395
396    let cost_metric = sqlx::query_as::<_, CostMetric>(
397        r#"INSERT INTO cost_metrics (
398            resource_type_id, provider_id, region_id, app_id, worker_id, org_id,
399            start_time, end_time, usage_quantity, unit_cost, currency, total_cost,
400            discount_percentage, discount_reason, billing_period
401        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
402    )
403    .bind(resource_type_id)
404    .bind(provider_id)
405    .bind(region_id)
406    .bind(app_id)
407    .bind(worker_id)
408    .bind(org_id)
409    .bind(start_time)
410    .bind(end_time)
411    .bind(usage_quantity)
412    .bind(unit_cost)
413    .bind(currency)
414    .bind(total_cost)
415    .bind(discount_percentage)
416    .bind(discount_reason)
417    .bind(billing_period)
418    .fetch_one(&mut *tx)
419    .await
420    .context("Failed to create cost metric")?;
421
422    tx.commit().await?;
423    Ok(cost_metric)
424}
425
426/// Retrieves a specific cost metric by its unique identifier, with resource type information.
427pub async fn get_cost_metric_by_id(pool: &Pool<MySql>, id: i64) -> anyhow::Result<CostMetricWithType> {
428    let cost_metric = sqlx::query_as::<_, CostMetricWithType>(
429        r#"
430        SELECT 
431            cm.id,
432            cm.resource_type_id,
433            cm.provider_id,
434            cm.region_id,
435            cm.app_id,
436            cm.worker_id,
437            cm.org_id,
438            cm.start_time,
439            cm.end_time,
440            cm.usage_quantity,
441            cm.unit_cost,
442            cm.currency,
443            cm.total_cost,
444            cm.discount_percentage,
445            cm.discount_reason,
446            cm.billing_period,
447            cm.created_at,
448            cm.updated_at,
449            rt.name AS resource_type_name,
450            rt.category AS resource_type_category,
451            rt.unit_of_measurement
452        FROM 
453            cost_metrics cm
454        JOIN 
455            resource_types rt ON cm.resource_type_id = rt.id
456        WHERE 
457            cm.id = ?
458        "#
459    )
460    .bind(id)
461    .fetch_one(pool)
462    .await
463    .map_err(|e| {
464        eprintln!("Failed to fetch cost metric by id {}: {:?}", id, e);
465        anyhow::Error::new(e).context(format!("Failed to fetch cost metric with id {}", id))
466    })?
467;
468
469    Ok(cost_metric)
470}
471
472/// Deletes a cost metric from the database.
473pub async fn delete_cost_metric(pool: &Pool<MySql>, id: i64) -> anyhow::Result<()> {
474    let mut tx = pool.begin().await?;
475
476    sqlx::query("DELETE FROM cost_metrics WHERE id = ?")
477        .bind(id)
478        .execute(&mut *tx)
479        .await
480        .context("Failed to delete cost metric")?;
481
482    tx.commit().await?;
483    Ok(())
484}
485
486/// Get aggregate cost metrics grouped by a specific dimension.
487///
488/// # Arguments
489///
490/// * `pool` - Database connection pool for executing the query
491/// * `dimension` - Dimension to group by ('app', 'provider', 'resource_type', 'region', 'worker', 'org')
492/// * `start_date` - Filter for metrics after this date
493/// * `end_date` - Filter for metrics before this date
494/// * `limit` - Maximum number of results to return
495///
496/// # Returns
497///
498/// * `Ok(Vec<(String, f64)>)` - Successfully retrieved aggregated costs by dimension
499/// * `Err(anyhow::Error)` - Failed to fetch cost metrics, with context
500pub async fn get_cost_metrics_by_dimension(
501    pool: &Pool<MySql>,
502    dimension: &str,
503    start_date: DateTime<Utc>,
504    end_date: DateTime<Utc>,
505    limit: i64
506) -> anyhow::Result<Vec<(String, f64)>> {
507    // Validate and map the dimension to the appropriate SQL expression
508    let (group_field, join_clause) = match dimension {
509        "app" => ("apps.name", "LEFT JOIN apps ON cost_metrics.app_id = apps.id"),
510        "provider" => ("providers.name", "LEFT JOIN providers ON cost_metrics.provider_id = providers.id"),
511        "resource_type" => ("resource_types.name", "LEFT JOIN resource_types ON cost_metrics.resource_type_id = resource_types.id"),
512        "region" => ("regions.name", "LEFT JOIN regions ON cost_metrics.region_id = regions.id"),
513        "worker" => ("workers.name", "LEFT JOIN workers ON cost_metrics.worker_id = workers.id"),
514        "org" => ("orgs.name", "LEFT JOIN orgs ON cost_metrics.org_id = orgs.id"),
515        _ => return Err(anyhow::anyhow!("Invalid dimension: {}", dimension)),
516    };
517
518    let query = format!(
519        r#"
520        SELECT 
521            {} as name,
522            SUM(total_cost) as total_cost
523        FROM 
524            cost_metrics
525        {}
526        WHERE 
527            start_time >= ? AND end_time <= ?
528        GROUP BY 
529            name
530        ORDER BY 
531            total_cost DESC
532        LIMIT ?
533        "#,
534        group_field, join_clause
535    );
536
537    let results = sqlx::query(&query)
538        .bind(start_date)
539        .bind(end_date)
540        .bind(limit)
541        .map(|row: sqlx::mysql::MySqlRow| {
542            let name: String = row.get("name");
543            let total_cost: f64 = row.get("total_cost");
544            (name, total_cost)
545        })
546        .fetch_all(pool)
547        .await
548        .context("Failed to fetch aggregated cost metrics")?;
549
550    Ok(results)
551}
552
553/// Retrieves cost metrics over time for a specific application.
554///
555/// # Arguments
556///
557/// * `pool` - Database connection pool for executing the query
558/// * `app_id` - ID of the application to analyze
559/// * `interval` - Time interval ('day', 'week', 'month')
560/// * `start_date` - Filter for metrics after this date
561/// * `end_date` - Filter for metrics before this date
562///
563/// # Returns
564///
565/// * `Ok(Vec<(DateTime<Utc>, f64)>)` - Successfully retrieved costs over time
566/// * `Err(anyhow::Error)` - Failed to fetch cost metrics, with context
567pub async fn get_app_cost_over_time(
568    pool: &Pool<MySql>,
569    app_id: i64,
570    interval: &str,
571    start_date: DateTime<Utc>,
572    end_date: DateTime<Utc>
573) -> anyhow::Result<Vec<(DateTime<Utc>, f64)>> {
574    // Map the interval to the appropriate SQL date function
575    let date_function = match interval {
576        "day" => "DATE(start_time)",
577        "week" => "DATE(start_time - INTERVAL WEEKDAY(start_time) DAY)", // First day of week
578        "month" => "DATE_FORMAT(start_time, '%Y-%m-01')", // First day of month
579        _ => return Err(anyhow::anyhow!("Invalid interval: {}", interval)),
580    };
581
582    let query = format!(
583        r#"
584        SELECT 
585            {} as time_bucket,
586            SUM(total_cost) as total_cost
587        FROM 
588            cost_metrics
589        WHERE 
590            app_id = ? AND start_time >= ? AND end_time <= ?
591        GROUP BY 
592            time_bucket
593        ORDER BY 
594            time_bucket ASC
595        "#,
596        date_function
597    );
598
599    let results = sqlx::query(&query)
600        .bind(app_id)
601        .bind(start_date)
602        .bind(end_date)
603        .map(|row: sqlx::mysql::MySqlRow| {
604            let time_bucket: DateTime<Utc> = row.get("time_bucket");
605            let total_cost: f64 = row.get("total_cost");
606            (time_bucket, total_cost)
607        })
608        .fetch_all(pool)
609        .await
610        .context("Failed to fetch app cost over time")?;
611
612    Ok(results)
613}
614
615/// Retrieves a paginated list of cost budgets from the database.
616pub async fn list_cost_budgets(pool: &Pool<MySql>, page: i64, per_page: i64) -> anyhow::Result<Vec<CostBudget>> {
617    println!("Attempting to fetch cost budgets from database...");
618
619    let result = sqlx::query_as::<_, CostBudget>(
620        r#"
621        SELECT * FROM cost_budgets
622        ORDER BY created_at DESC
623        LIMIT ? OFFSET ?
624        "#,
625    )
626    .bind(per_page)
627    .bind(page * per_page)
628    .fetch_all(pool)
629    .await;
630
631    match result {
632        Ok(budgets) => {
633            println!("Successfully fetched {} cost budgets", budgets.len());
634            Ok(budgets)
635        }
636        Err(e) => {
637            eprintln!("Error fetching cost budgets: {:#?}", e);
638            Err(anyhow::Error::new(e).context("Failed to fetch cost budgets"))
639        }
640    }
641}
642
643/// Counts the total number of cost budgets in the database.
644pub async fn count_cost_budgets(pool: &Pool<MySql>) -> anyhow::Result<i64> {
645    let count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM cost_budgets")
646        .fetch_one(pool)
647        .await
648        .context("Failed to count cost budgets")?;
649
650    Ok(count)
651}
652
653/// Creates a new cost budget in the database.
654pub async fn create_cost_budget(
655    pool: &Pool<MySql>,
656    org_id: i64,
657    app_id: Option<i64>,
658    budget_name: &str,
659    budget_amount: f64,
660    currency: &str,
661    budget_period: &str,
662    period_start: DateTime<Utc>,
663    period_end: DateTime<Utc>,
664    alert_threshold_percentage: f64,
665    alert_contacts: &str,
666    created_by: i64
667) -> anyhow::Result<CostBudget> {
668    let mut tx = pool.begin().await?;
669
670    let cost_budget = sqlx::query_as::<_, CostBudget>(
671        r#"INSERT INTO cost_budgets (
672            org_id, app_id, budget_name, budget_amount, currency, budget_period,
673            period_start, period_end, alert_threshold_percentage, alert_contacts,
674            is_active, created_by
675        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, TRUE, ?)"#,
676    )
677    .bind(org_id)
678    .bind(app_id)
679    .bind(budget_name)
680    .bind(budget_amount)
681    .bind(currency)
682    .bind(budget_period)
683    .bind(period_start)
684    .bind(period_end)
685    .bind(alert_threshold_percentage)
686    .bind(alert_contacts)
687    .bind(created_by)
688    .fetch_one(&mut *tx)
689    .await
690    .context("Failed to create cost budget")?;
691
692    tx.commit().await?;
693    Ok(cost_budget)
694}
695
696/// Retrieves a specific cost budget by its unique identifier.
697pub async fn get_cost_budget_by_id(pool: &Pool<MySql>, id: i64) -> anyhow::Result<CostBudget> {
698    let cost_budget = sqlx::query_as::<_, CostBudget>("SELECT * FROM cost_budgets WHERE id = ?")
699        .bind(id)
700        .fetch_one(pool)
701        .await
702        .context("Failed to fetch cost budget")?;
703
704    Ok(cost_budget)
705}
706
707/// Updates an existing cost budget in the database.
708pub async fn update_cost_budget(
709    pool: &Pool<MySql>,
710    id: i64,
711    budget_name: Option<&str>,
712    budget_amount: Option<f64>,
713    alert_threshold_percentage: Option<f64>,
714    alert_contacts: Option<&str>,
715    is_active: Option<bool>
716) -> anyhow::Result<CostBudget> {
717    // Define which fields are being updated
718    let update_fields = [
719        (budget_name.is_some(), "budget_name = ?"),
720        (budget_amount.is_some(), "budget_amount = ?"),
721        (alert_threshold_percentage.is_some(), "alert_threshold_percentage = ?"),
722        (alert_contacts.is_some(), "alert_contacts = ?"),
723        (is_active.is_some(), "is_active = ?"),
724    ];
725
726    // Build update query with only the fields that have values
727    let field_clauses = update_fields
728        .iter()
729        .filter(|(has_value, _)| *has_value)
730        .map(|(_, field)| format!(", {}", field))
731        .collect::<String>();
732
733    let query = format!(
734        "UPDATE cost_budgets SET updated_at = CURRENT_TIMESTAMP{} WHERE id = ?",
735        field_clauses
736    );
737
738    // Start binding parameters
739    let mut db_query = sqlx::query_as::<_, CostBudget>(&query);
740
741    // Bind parameters
742    if let Some(val) = budget_name {
743        db_query = db_query.bind(val);
744    }
745    if let Some(val) = budget_amount {
746        db_query = db_query.bind(val);
747    }
748    if let Some(val) = alert_threshold_percentage {
749        db_query = db_query.bind(val);
750    }
751    if let Some(val) = alert_contacts {
752        db_query = db_query.bind(val);
753    }
754    if let Some(val) = is_active {
755        db_query = db_query.bind(val);
756    }
757
758    // Bind the ID parameter
759    db_query = db_query.bind(id);
760
761    // Execute the query in a transaction
762    let mut tx = pool.begin().await?;
763    let cost_budget = db_query
764        .fetch_one(&mut *tx)
765        .await
766        .context("Failed to update cost budget")?;
767
768    tx.commit().await?;
769    Ok(cost_budget)
770}
771
772/// Deletes a cost budget from the database.
773pub async fn delete_cost_budget(pool: &Pool<MySql>, id: i64) -> anyhow::Result<()> {
774    let mut tx = pool.begin().await?;
775
776    sqlx::query("DELETE FROM cost_budgets WHERE id = ?")
777        .bind(id)
778        .execute(&mut *tx)
779        .await
780        .context("Failed to delete cost budget")?;
781
782    tx.commit().await?;
783    Ok(())
784}
785
786/// Retrieves a paginated list of cost projections from the database.
787pub async fn list_cost_projections(pool: &Pool<MySql>, page: i64, per_page: i64) -> anyhow::Result<Vec<CostProjection>> {
788    println!("Attempting to fetch cost projections from database...");
789
790    let result = sqlx::query_as::<_, CostProjection>(
791        r#"
792        SELECT * FROM cost_projections
793        ORDER BY created_at DESC
794        LIMIT ? OFFSET ?
795        "#,
796    )
797    .bind(per_page)
798    .bind(page * per_page)
799    .fetch_all(pool)
800    .await;
801
802    match result {
803        Ok(projections) => {
804            println!("Successfully fetched {} cost projections", projections.len());
805            Ok(projections)
806        }
807        Err(e) => {
808            eprintln!("Error fetching cost projections: {:#?}", e);
809            Err(anyhow::Error::new(e).context("Failed to fetch cost projections"))
810        }
811    }
812}
813
814/// Creates a new cost projection in the database.
815pub async fn create_cost_projection(
816    pool: &Pool<MySql>,
817    org_id: i64,
818    app_id: Option<i64>,
819    projection_period: &str,
820    start_date: DateTime<Utc>,
821    end_date: DateTime<Utc>,
822    projected_cost: f64,
823    currency: &str,
824    projection_model: &str,
825    confidence_level: Option<f64>,
826    metadata: Option<&str>
827) -> anyhow::Result<CostProjection> {
828    let mut tx = pool.begin().await?;
829
830    let cost_projection = sqlx::query_as::<_, CostProjection>(
831        r#"INSERT INTO cost_projections (
832            org_id, app_id, projection_period, start_date, end_date,
833            projected_cost, currency, projection_model, confidence_level, metadata
834        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
835    )
836    .bind(org_id)
837    .bind(app_id)
838    .bind(projection_period)
839    .bind(start_date)
840    .bind(end_date)
841    .bind(projected_cost)
842    .bind(currency)
843    .bind(projection_model)
844    .bind(confidence_level)
845    .bind(metadata)
846    .fetch_one(&mut *tx)
847    .await
848    .context("Failed to create cost projection")?;
849
850    tx.commit().await?;
851    Ok(cost_projection)
852}
853
854/// Retrieves a specific cost projection by its unique identifier.
855pub async fn get_cost_projection_by_id(pool: &Pool<MySql>, id: i64) -> anyhow::Result<CostProjection> {
856    let cost_projection = sqlx::query_as::<_, CostProjection>("SELECT * FROM cost_projections WHERE id = ?")
857        .bind(id)
858        .fetch_one(pool)
859        .await
860        .context("Failed to fetch cost projection")?;
861
862    Ok(cost_projection)
863}
864
865/// Deletes a cost projection from the database.
866pub async fn delete_cost_projection(pool: &Pool<MySql>, id: i64) -> anyhow::Result<()> {
867    let mut tx = pool.begin().await?;
868
869    sqlx::query("DELETE FROM cost_projections WHERE id = ?")
870        .bind(id)
871        .execute(&mut *tx)
872        .await
873        .context("Failed to delete cost projection")?;
874
875    tx.commit().await?;
876    Ok(())
877}
878
879/// Retrieves a paginated list of resource pricing entries from the database.
880pub async fn list_resource_pricing(
881    pool: &Pool<MySql>, 
882    page: i64, 
883    per_page: i64,
884    resource_type_id: Option<i32>,
885    provider_id: Option<i64>,
886    region_id: Option<i64>,
887    pricing_model: Option<&str>,
888    tier_name: Option<&str>
889) -> anyhow::Result<Vec<ResourcePricing>> {
890    println!("Attempting to fetch resource pricing from database with filters...");
891
892    // Start building the query with filters
893    let mut query = String::from(
894        "SELECT * FROM resource_pricing WHERE 1=1"
895    );
896    
897    if resource_type_id.is_some() {
898        query.push_str(" AND resource_type_id = ?");
899    }
900    
901    if provider_id.is_some() {
902        query.push_str(" AND provider_id = ?");
903    }
904    
905    if region_id.is_some() {
906        query.push_str(" AND region_id = ?");
907    }
908    
909    if pricing_model.is_some() {
910        query.push_str(" AND pricing_model = ?");
911    }
912    
913    if tier_name.is_some() {
914        query.push_str(" AND tier_name = ?");
915    }
916    
917    // Check for current pricing (effective_to is NULL or in the future)
918    query.push_str(" AND (effective_to IS NULL OR effective_to > CURRENT_TIMESTAMP)");
919    
920    // Add ordering and pagination
921    query.push_str(" ORDER BY created_at DESC LIMIT ? OFFSET ?");
922    
923    // Prepare the query
924    let mut db_query = sqlx::query_as::<_, ResourcePricing>(&query);
925    
926    // Bind parameters
927    if let Some(val) = resource_type_id {
928        db_query = db_query.bind(val);
929    }
930    
931    if let Some(val) = provider_id {
932        db_query = db_query.bind(val);
933    }
934    
935    if let Some(val) = region_id {
936        db_query = db_query.bind(val);
937    }
938    
939    if let Some(val) = pricing_model {
940        db_query = db_query.bind(val);
941    }
942    
943    if let Some(val) = tier_name {
944        db_query = db_query.bind(val);
945    }
946    
947    // Add pagination parameters
948    db_query = db_query.bind(per_page).bind(page * per_page);
949    
950    // Execute query
951    let result = db_query.fetch_all(pool).await;
952
953    match result {
954        Ok(pricing) => {
955            println!("Successfully fetched {} resource pricing entries", pricing.len());
956            Ok(pricing)
957        }
958        Err(e) => {
959            eprintln!("Error fetching resource pricing: {:#?}", e);
960            Err(anyhow::Error::new(e).context("Failed to fetch resource pricing"))
961        }
962    }
963}
964
965/// Creates a new resource pricing entry in the database.
966pub async fn create_resource_pricing(
967    pool: &Pool<MySql>,
968    resource_type_id: i32,
969    provider_id: i64,
970    region_id: Option<i64>,
971    tier_name: &str,
972    unit_price: f64,
973    currency: &str,
974    effective_from: DateTime<Utc>,
975    effective_to: Option<DateTime<Utc>>,
976    pricing_model: &str,
977    commitment_period: Option<&str>,
978    volume_discount_tiers: Option<&str>
979) -> anyhow::Result<ResourcePricing> {
980    let mut tx = pool.begin().await?;
981
982    let resource_pricing = sqlx::query_as::<_, ResourcePricing>(
983        r#"INSERT INTO resource_pricing (
984            resource_type_id, provider_id, region_id, tier_name, unit_price,
985            currency, effective_from, effective_to, pricing_model, 
986            commitment_period, volume_discount_tiers
987        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
988    )
989    .bind(resource_type_id)
990    .bind(provider_id)
991    .bind(region_id)
992    .bind(tier_name)
993    .bind(unit_price)
994    .bind(currency)
995    .bind(effective_from)
996    .bind(effective_to)
997    .bind(pricing_model)
998    .bind(commitment_period)
999    .bind(volume_discount_tiers)
1000    .fetch_one(&mut *tx)
1001    .await
1002    .context("Failed to create resource pricing")?;
1003
1004    tx.commit().await?;
1005    Ok(resource_pricing)
1006}
1007
1008/// Retrieves a specific resource pricing entry by its unique identifier.
1009pub async fn get_resource_pricing_by_id(pool: &Pool<MySql>, id: i64) -> anyhow::Result<ResourcePricing> {
1010    let resource_pricing = sqlx::query_as::<_, ResourcePricing>("SELECT * FROM resource_pricing WHERE id = ?")
1011        .bind(id)
1012        .fetch_one(pool)
1013        .await
1014        .context("Failed to fetch resource pricing")?;
1015
1016    Ok(resource_pricing)
1017}
1018
1019/// Updates an existing resource pricing entry in the database.
1020pub async fn update_resource_pricing(
1021    pool: &Pool<MySql>,
1022    id: i64,
1023    unit_price: Option<f64>,
1024    effective_to: Option<DateTime<Utc>>,
1025    volume_discount_tiers: Option<&str>
1026) -> anyhow::Result<ResourcePricing> {
1027    // Define which fields are being updated
1028    let update_fields = [
1029        (unit_price.is_some(), "unit_price = ?"),
1030        (effective_to.is_some(), "effective_to = ?"),
1031        (volume_discount_tiers.is_some(), "volume_discount_tiers = ?"),
1032    ];
1033
1034    // Build update query with only the fields that have values
1035    let field_clauses = update_fields
1036        .iter()
1037        .filter(|(has_value, _)| *has_value)
1038        .map(|(_, field)| format!(", {}", field))
1039        .collect::<String>();
1040
1041    let query = format!(
1042        "UPDATE resource_pricing SET updated_at = CURRENT_TIMESTAMP{} WHERE id = ?",
1043        field_clauses
1044    );
1045
1046    // Start binding parameters
1047    let mut db_query = sqlx::query_as::<_, ResourcePricing>(&query);
1048
1049    // Bind parameters
1050    if let Some(val) = unit_price {
1051        db_query = db_query.bind(val);
1052    }
1053    if let Some(val) = effective_to {
1054        db_query = db_query.bind(val);
1055    }
1056    if let Some(val) = volume_discount_tiers {
1057        db_query = db_query.bind(val);
1058    }
1059
1060    // Bind the ID parameter
1061    db_query = db_query.bind(id);
1062
1063    // Execute the query in a transaction
1064    let mut tx = pool.begin().await?;
1065    let resource_pricing = db_query
1066        .fetch_one(&mut *tx)
1067        .await
1068        .context("Failed to update resource pricing")?;
1069
1070    tx.commit().await?;
1071    Ok(resource_pricing)
1072}
1073
1074/// Deletes a resource pricing entry from the database.
1075pub async fn delete_resource_pricing(pool: &Pool<MySql>, id: i64) -> anyhow::Result<()> {
1076    let mut tx = pool.begin().await?;
1077
1078    sqlx::query("DELETE FROM resource_pricing WHERE id = ?")
1079        .bind(id)
1080        .execute(&mut *tx)
1081        .await
1082        .context("Failed to delete resource pricing")?;
1083
1084    tx.commit().await?;
1085    Ok(())
1086}
1087
1088/// Retrieves a list of cost allocation tags for a specific resource.
1089pub async fn get_cost_allocation_tags(
1090    pool: &Pool<MySql>,
1091    resource_id: i64,
1092    resource_type: &str
1093) -> anyhow::Result<Vec<CostAllocationTag>> {
1094    let tags = sqlx::query_as::<_, CostAllocationTag>(
1095        "SELECT * FROM cost_allocation_tags WHERE resource_id = ? AND resource_type = ?"
1096    )
1097    .bind(resource_id)
1098    .bind(resource_type)
1099    .fetch_all(pool)
1100    .await
1101    .context("Failed to fetch cost allocation tags")?;
1102
1103    Ok(tags)
1104}
1105
1106/// Creates a new cost allocation tag in the database.
1107pub async fn create_cost_allocation_tag(
1108    pool: &Pool<MySql>,
1109    tag_key: &str,
1110    tag_value: &str,
1111    resource_id: i64,
1112    resource_type: &str
1113) -> anyhow::Result<CostAllocationTag> {
1114    let mut tx = pool.begin().await?;
1115
1116    let tag = sqlx::query_as::<_, CostAllocationTag>(
1117        r#"INSERT INTO cost_allocation_tags (
1118            tag_key, tag_value, resource_id, resource_type
1119        ) VALUES (?, ?, ?, ?)"#,
1120    )
1121    .bind(tag_key)
1122    .bind(tag_value)
1123    .bind(resource_id)
1124    .bind(resource_type)
1125    .fetch_one(&mut *tx)
1126    .await
1127    .context("Failed to create cost allocation tag")?;
1128
1129    tx.commit().await?;
1130    Ok(tag)
1131}
1132
1133/// Deletes a cost allocation tag from the database.
1134pub async fn delete_cost_allocation_tag(pool: &Pool<MySql>, id: i64) -> anyhow::Result<()> {
1135    let mut tx = pool.begin().await?;
1136
1137    sqlx::query("DELETE FROM cost_allocation_tags WHERE id = ?")
1138        .bind(id)
1139        .execute(&mut *tx)
1140        .await
1141        .context("Failed to delete cost allocation tag")?;
1142
1143    tx.commit().await?;
1144    Ok(())
1145}