omni_orchestrator/schemas/v1/db/queries/
cost.rs

1use crate::models::cost::{CostMetric, CostMetricWithType, ResourcePricing, CostBudget, CostProjection, CostAllocationTag};
2use crate::models::util_tables::ResourceType;
3use anyhow::Context;
4use serde::Serialize;
5use sqlx::{MySql, Pool};
6use sqlx::Row;
7use chrono::{DateTime, Utc};
8
9/// Retrieves a paginated list of resource types from the database.
10///
11/// # Arguments
12///
13/// * `pool` - Database connection pool for executing the query
14/// * `page` - Zero-based page number (e.g., 0 for first page, 1 for second page)
15/// * `per_page` - Number of records to fetch per page
16///
17/// # Returns
18///
19/// * `Ok(Vec<ResourceType>)` - Successfully retrieved list of resource types
20/// * `Err(anyhow::Error)` - Failed to fetch resource types, with context
21pub async fn list_resource_types(pool: &Pool<MySql>, page: i64, per_page: i64) -> anyhow::Result<Vec<ResourceType>> {
22    println!("Attempting to fetch resource types from database...");
23
24    let result = sqlx::query_as::<_, ResourceType>(
25        r#"
26        SELECT * FROM resource_types
27        ORDER BY name ASC
28        LIMIT ? OFFSET ?
29        "#,
30    )
31    .bind(per_page)
32    .bind(page * per_page)
33    .fetch_all(pool)
34    .await;
35
36    match result {
37        Ok(types) => {
38            println!("Successfully fetched {} resource types", types.len());
39            Ok(types)
40        }
41        Err(e) => {
42            eprintln!("Error fetching resource types: {:#?}", e);
43            Err(anyhow::Error::new(e).context("Failed to fetch resource types"))
44        }
45    }
46}
47
48/// Counts the total number of resource types in the database.
49pub async fn count_resource_types(pool: &Pool<MySql>) -> anyhow::Result<i64> {
50    let count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM resource_types")
51        .fetch_one(pool)
52        .await
53        .context("Failed to count resource types")?;
54
55    Ok(count)
56}
57
58/// Retrieves a specific resource type by its unique identifier.
59pub async fn get_resource_type_by_id(pool: &Pool<MySql>, id: i32) -> anyhow::Result<ResourceType> {
60    let resource_type = sqlx::query_as::<_, ResourceType>("SELECT * FROM resource_types WHERE id = ?")
61        .bind(id)
62        .fetch_one(pool)
63        .await
64        .context("Failed to fetch resource type")?;
65
66    Ok(resource_type)
67}
68
69/// Creates a new resource type in the database.
70pub async fn create_resource_type(
71    pool: &Pool<MySql>,
72    name: &str,
73    category: &str,
74    unit_of_measurement: &str,
75    description: Option<&str>
76) -> anyhow::Result<ResourceType> {
77    let mut tx = pool.begin().await?;
78
79    let resource_type = sqlx::query_as::<_, ResourceType>(
80        r#"INSERT INTO resource_types (
81            name, category, unit_of_measurement, description
82        ) VALUES (?, ?, ?, ?)"#,
83    )
84    .bind(name)
85    .bind(category)
86    .bind(unit_of_measurement)
87    .bind(description)
88    .fetch_one(&mut *tx)
89    .await
90    .context("Failed to create resource type")?;
91
92    tx.commit().await?;
93    Ok(resource_type)
94}
95
96/// Updates an existing resource type in the database.
97pub async fn update_resource_type(
98    pool: &Pool<MySql>,
99    id: i32,
100    name: Option<&str>,
101    category: Option<&str>,
102    unit_of_measurement: Option<&str>,
103    description: Option<&str>
104) -> anyhow::Result<ResourceType> {
105    // Define which fields are being updated
106    let update_fields = [
107        (name.is_some(), "name = ?"),
108        (category.is_some(), "category = ?"),
109        (unit_of_measurement.is_some(), "unit_of_measurement = ?"),
110        (description.is_some(), "description = ?"),
111    ];
112
113    // Build update query with only the fields that have values
114    let field_clauses = update_fields
115        .iter()
116        .filter(|(has_value, _)| *has_value)
117        .map(|(_, field)| format!(", {}", field))
118        .collect::<String>();
119
120    let query = format!(
121        "UPDATE resource_types SET updated_at = CURRENT_TIMESTAMP{} WHERE id = ?",
122        field_clauses
123    );
124
125    // Start binding parameters
126    let mut db_query = sqlx::query_as::<_, ResourceType>(&query);
127
128    // Bind parameters
129    if let Some(val) = name {
130        db_query = db_query.bind(val);
131    }
132    if let Some(val) = category {
133        db_query = db_query.bind(val);
134    }
135    if let Some(val) = unit_of_measurement {
136        db_query = db_query.bind(val);
137    }
138    if let Some(val) = description {
139        db_query = db_query.bind(val);
140    }
141
142    // Bind the ID parameter
143    db_query = db_query.bind(id);
144
145    // Execute the query in a transaction
146    let mut tx = pool.begin().await?;
147    let resource_type = db_query
148        .fetch_one(&mut *tx)
149        .await
150        .context("Failed to update resource type")?;
151
152    tx.commit().await?;
153    Ok(resource_type)
154}
155
156/// Deletes a resource type from the database.
157pub async fn delete_resource_type(pool: &Pool<MySql>, id: i32) -> anyhow::Result<()> {
158    let mut tx = pool.begin().await?;
159
160    sqlx::query("DELETE FROM resource_types WHERE id = ?")
161        .bind(id)
162        .execute(&mut *tx)
163        .await
164        .context("Failed to delete resource type")?;
165
166    tx.commit().await?;
167    Ok(())
168}
169
170/// Retrieves a paginated list of cost metrics from the database, with optional filtering.
171///
172/// # Arguments
173///
174/// * `pool` - Database connection pool for executing the query
175/// * `page` - Zero-based page number (e.g., 0 for first page, 1 for second page)
176/// * `per_page` - Number of records to fetch per page
177/// * `resource_type_id` - Optional filter by resource type
178/// * `provider_id` - Optional filter by provider
179/// * `app_id` - Optional filter by application
180/// * `start_date` - Optional filter for metrics after this date
181/// * `end_date` - Optional filter for metrics before this date
182/// * `billing_period` - Optional filter by billing period (e.g., "2025-05")
183///
184/// # Returns
185///
186/// * `Ok(Vec<CostMetricWithType>)` - Successfully retrieved list of cost metrics with type information
187/// * `Err(anyhow::Error)` - Failed to fetch cost metrics, with context
188pub async fn list_cost_metrics(
189    pool: &Pool<MySql>, 
190    page: i64, 
191    per_page: i64,
192    resource_type_id: Option<i32>,
193    provider_id: Option<i64>,
194    app_id: Option<i64>,
195    start_date: Option<DateTime<Utc>>,
196    end_date: Option<DateTime<Utc>>,
197    billing_period: Option<&str>
198) -> anyhow::Result<Vec<CostMetricWithType>> {
199    println!("Attempting to fetch cost metrics from database with filters...");
200
201    // Start building the query with filters
202    let mut query = String::from(
203        r#"
204        SELECT 
205            cm.*,
206            rt.name as resource_type_name,
207            rt.category as resource_type_category,
208            rt.unit_of_measurement
209        FROM 
210            cost_metrics cm
211        JOIN 
212            resource_types rt ON cm.resource_type_id = rt.id
213        WHERE 1=1
214        "#
215    );
216    
217    let mut params: Vec<String> = Vec::new();
218    
219    if resource_type_id.is_some() {
220        query.push_str(" AND cm.resource_type_id = ?");
221        params.push("resource_type_id".to_string());
222    }
223    
224    if provider_id.is_some() {
225        query.push_str(" AND cm.provider_id = ?");
226        params.push("provider_id".to_string());
227    }
228    
229    if app_id.is_some() {
230        query.push_str(" AND cm.app_id = ?");
231        params.push("app_id".to_string());
232    }
233    
234    if start_date.is_some() {
235        query.push_str(" AND cm.start_time >= ?");
236        params.push("start_date".to_string());
237    }
238    
239    if end_date.is_some() {
240        query.push_str(" AND cm.end_time <= ?");
241        params.push("end_date".to_string());
242    }
243    
244    if billing_period.is_some() {
245        query.push_str(" AND cm.billing_period = ?");
246        params.push("billing_period".to_string());
247    }
248    
249    // Add ordering and pagination
250    query.push_str(" ORDER BY cm.start_time DESC LIMIT ? OFFSET ?");
251    
252    // Prepare the query
253    let mut db_query = sqlx::query_as::<_, CostMetricWithType>(&query);
254    
255    // Bind parameters
256    if let Some(val) = resource_type_id {
257        db_query = db_query.bind(val);
258    }
259    
260    if let Some(val) = provider_id {
261        db_query = db_query.bind(val);
262    }
263    
264    if let Some(val) = app_id {
265        db_query = db_query.bind(val);
266    }
267    
268    if let Some(val) = start_date {
269        db_query = db_query.bind(val);
270    }
271    
272    if let Some(val) = end_date {
273        db_query = db_query.bind(val);
274    }
275    
276    if let Some(val) = billing_period {
277        db_query = db_query.bind(val);
278    }
279    
280    // Add pagination parameters
281    db_query = db_query.bind(per_page).bind(page * per_page);
282    
283    // Execute query
284    let result = db_query.fetch_all(pool).await;
285
286    match result {
287        Ok(metrics) => {
288            println!("Successfully fetched {} cost metrics", metrics.len());
289            Ok(metrics)
290        }
291        Err(e) => {
292            eprintln!("Error fetching cost metrics: {:#?}", e);
293            Err(anyhow::Error::new(e).context("Failed to fetch cost metrics"))
294        }
295    }
296}
297
298/// Counts the total number of cost metrics in the database with optional filtering.
299pub async fn count_cost_metrics(
300    pool: &Pool<MySql>,
301    resource_type_id: Option<i32>,
302    provider_id: Option<i64>,
303    app_id: Option<i64>,
304    start_date: Option<DateTime<Utc>>,
305    end_date: Option<DateTime<Utc>>,
306    billing_period: Option<&str>
307) -> anyhow::Result<i64> {
308    // Start building the query with filters
309    let mut query = String::from(
310        "SELECT COUNT(*) FROM cost_metrics WHERE 1=1"
311    );
312    
313    if resource_type_id.is_some() {
314        query.push_str(" AND resource_type_id = ?");
315    }
316    
317    if provider_id.is_some() {
318        query.push_str(" AND provider_id = ?");
319    }
320    
321    if app_id.is_some() {
322        query.push_str(" AND app_id = ?");
323    }
324    
325    if start_date.is_some() {
326        query.push_str(" AND start_time >= ?");
327    }
328    
329    if end_date.is_some() {
330        query.push_str(" AND end_time <= ?");
331    }
332    
333    if billing_period.is_some() {
334        query.push_str(" AND billing_period = ?");
335    }
336    
337    // Prepare the query
338    let mut db_query = sqlx::query_scalar::<_, i64>(&query);
339    
340    // Bind parameters
341    if let Some(val) = resource_type_id {
342        db_query = db_query.bind(val);
343    }
344    
345    if let Some(val) = provider_id {
346        db_query = db_query.bind(val);
347    }
348    
349    if let Some(val) = app_id {
350        db_query = db_query.bind(val);
351    }
352    
353    if let Some(val) = start_date {
354        db_query = db_query.bind(val);
355    }
356    
357    if let Some(val) = end_date {
358        db_query = db_query.bind(val);
359    }
360    
361    if let Some(val) = billing_period {
362        db_query = db_query.bind(val);
363    }
364    
365    let count = db_query
366        .fetch_one(pool)
367        .await
368        .context("Failed to count cost metrics")?;
369
370    Ok(count)
371}
372
373/// Creates a new cost metric in the database.
374pub async fn create_cost_metric(
375    pool: &Pool<MySql>,
376    resource_type_id: i32,
377    provider_id: Option<i64>,
378    region_id: Option<i64>,
379    app_id: Option<i64>,
380    worker_id: Option<i64>,
381    org_id: Option<i64>,
382    start_time: DateTime<Utc>,
383    end_time: DateTime<Utc>,
384    usage_quantity: f64,
385    unit_cost: f64,
386    currency: &str,
387    total_cost: f64,
388    discount_percentage: Option<f64>,
389    discount_reason: Option<&str>,
390    billing_period: Option<&str>
391) -> anyhow::Result<CostMetric> {
392    let mut tx = pool.begin().await?;
393
394    let cost_metric = sqlx::query_as::<_, CostMetric>(
395        r#"INSERT INTO cost_metrics (
396            resource_type_id, provider_id, region_id, app_id, worker_id, org_id,
397            start_time, end_time, usage_quantity, unit_cost, currency, total_cost,
398            discount_percentage, discount_reason, billing_period
399        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
400    )
401    .bind(resource_type_id)
402    .bind(provider_id)
403    .bind(region_id)
404    .bind(app_id)
405    .bind(worker_id)
406    .bind(org_id)
407    .bind(start_time)
408    .bind(end_time)
409    .bind(usage_quantity)
410    .bind(unit_cost)
411    .bind(currency)
412    .bind(total_cost)
413    .bind(discount_percentage)
414    .bind(discount_reason)
415    .bind(billing_period)
416    .fetch_one(&mut *tx)
417    .await
418    .context("Failed to create cost metric")?;
419
420    tx.commit().await?;
421    Ok(cost_metric)
422}
423
424/// Retrieves a specific cost metric by its unique identifier, with resource type information.
425pub async fn get_cost_metric_by_id(pool: &Pool<MySql>, id: i64) -> anyhow::Result<CostMetricWithType> {
426    let cost_metric = sqlx::query_as::<_, CostMetricWithType>(
427        r#"
428        SELECT 
429            cm.id,
430            cm.resource_type_id,
431            cm.provider_id,
432            cm.region_id,
433            cm.app_id,
434            cm.worker_id,
435            cm.org_id,
436            cm.start_time,
437            cm.end_time,
438            cm.usage_quantity,
439            cm.unit_cost,
440            cm.currency,
441            cm.total_cost,
442            cm.discount_percentage,
443            cm.discount_reason,
444            cm.billing_period,
445            cm.created_at,
446            cm.updated_at,
447            rt.name AS resource_type_name,
448            rt.category AS resource_type_category,
449            rt.unit_of_measurement
450        FROM 
451            cost_metrics cm
452        JOIN 
453            resource_types rt ON cm.resource_type_id = rt.id
454        WHERE 
455            cm.id = ?
456        "#
457    )
458    .bind(id)
459    .fetch_one(pool)
460    .await
461    .map_err(|e| {
462        eprintln!("Failed to fetch cost metric by id {}: {:?}", id, e);
463        anyhow::Error::new(e).context(format!("Failed to fetch cost metric with id {}", id))
464    })?
465;
466
467    Ok(cost_metric)
468}
469
470/// Deletes a cost metric from the database.
471pub async fn delete_cost_metric(pool: &Pool<MySql>, id: i64) -> anyhow::Result<()> {
472    let mut tx = pool.begin().await?;
473
474    sqlx::query("DELETE FROM cost_metrics WHERE id = ?")
475        .bind(id)
476        .execute(&mut *tx)
477        .await
478        .context("Failed to delete cost metric")?;
479
480    tx.commit().await?;
481    Ok(())
482}
483
484/// Get aggregate cost metrics grouped by a specific dimension.
485///
486/// # Arguments
487///
488/// * `pool` - Database connection pool for executing the query
489/// * `dimension` - Dimension to group by ('app', 'provider', 'resource_type', 'region', 'worker', 'org')
490/// * `start_date` - Filter for metrics after this date
491/// * `end_date` - Filter for metrics before this date
492/// * `limit` - Maximum number of results to return
493///
494/// # Returns
495///
496/// * `Ok(Vec<(String, f64)>)` - Successfully retrieved aggregated costs by dimension
497/// * `Err(anyhow::Error)` - Failed to fetch cost metrics, with context
498pub async fn get_cost_metrics_by_dimension(
499    pool: &Pool<MySql>,
500    dimension: &str,
501    start_date: DateTime<Utc>,
502    end_date: DateTime<Utc>,
503    limit: i64
504) -> anyhow::Result<Vec<(String, f64)>> {
505    // Validate and map the dimension to the appropriate SQL expression
506    let (group_field, join_clause) = match dimension {
507        "app" => ("apps.name", "LEFT JOIN apps ON cost_metrics.app_id = apps.id"),
508        "provider" => ("providers.name", "LEFT JOIN providers ON cost_metrics.provider_id = providers.id"),
509        "resource_type" => ("resource_types.name", "LEFT JOIN resource_types ON cost_metrics.resource_type_id = resource_types.id"),
510        "region" => ("regions.name", "LEFT JOIN regions ON cost_metrics.region_id = regions.id"),
511        "worker" => ("workers.name", "LEFT JOIN workers ON cost_metrics.worker_id = workers.id"),
512        "org" => ("orgs.name", "LEFT JOIN orgs ON cost_metrics.org_id = orgs.id"),
513        _ => return Err(anyhow::anyhow!("Invalid dimension: {}", dimension)),
514    };
515
516    let query = format!(
517        r#"
518        SELECT 
519            {} as name,
520            SUM(total_cost) as total_cost
521        FROM 
522            cost_metrics
523        {}
524        WHERE 
525            start_time >= ? AND end_time <= ?
526        GROUP BY 
527            name
528        ORDER BY 
529            total_cost DESC
530        LIMIT ?
531        "#,
532        group_field, join_clause
533    );
534
535    let results = sqlx::query(&query)
536        .bind(start_date)
537        .bind(end_date)
538        .bind(limit)
539        .map(|row: sqlx::mysql::MySqlRow| {
540            let name: String = row.get("name");
541            let total_cost: f64 = row.get("total_cost");
542            (name, total_cost)
543        })
544        .fetch_all(pool)
545        .await
546        .context("Failed to fetch aggregated cost metrics")?;
547
548    Ok(results)
549}
550
551/// Retrieves cost metrics over time for a specific application.
552///
553/// # Arguments
554///
555/// * `pool` - Database connection pool for executing the query
556/// * `app_id` - ID of the application to analyze
557/// * `interval` - Time interval ('day', 'week', 'month')
558/// * `start_date` - Filter for metrics after this date
559/// * `end_date` - Filter for metrics before this date
560///
561/// # Returns
562///
563/// * `Ok(Vec<(DateTime<Utc>, f64)>)` - Successfully retrieved costs over time
564/// * `Err(anyhow::Error)` - Failed to fetch cost metrics, with context
565pub async fn get_app_cost_over_time(
566    pool: &Pool<MySql>,
567    app_id: i64,
568    interval: &str,
569    start_date: DateTime<Utc>,
570    end_date: DateTime<Utc>
571) -> anyhow::Result<Vec<(DateTime<Utc>, f64)>> {
572    // Map the interval to the appropriate SQL date function
573    let date_function = match interval {
574        "day" => "DATE(start_time)",
575        "week" => "DATE(start_time - INTERVAL WEEKDAY(start_time) DAY)", // First day of week
576        "month" => "DATE_FORMAT(start_time, '%Y-%m-01')", // First day of month
577        _ => return Err(anyhow::anyhow!("Invalid interval: {}", interval)),
578    };
579
580    let query = format!(
581        r#"
582        SELECT 
583            {} as time_bucket,
584            SUM(total_cost) as total_cost
585        FROM 
586            cost_metrics
587        WHERE 
588            app_id = ? AND start_time >= ? AND end_time <= ?
589        GROUP BY 
590            time_bucket
591        ORDER BY 
592            time_bucket ASC
593        "#,
594        date_function
595    );
596
597    let results = sqlx::query(&query)
598        .bind(app_id)
599        .bind(start_date)
600        .bind(end_date)
601        .map(|row: sqlx::mysql::MySqlRow| {
602            let time_bucket: DateTime<Utc> = row.get("time_bucket");
603            let total_cost: f64 = row.get("total_cost");
604            (time_bucket, total_cost)
605        })
606        .fetch_all(pool)
607        .await
608        .context("Failed to fetch app cost over time")?;
609
610    Ok(results)
611}
612
613/// Retrieves a paginated list of cost budgets from the database.
614pub async fn list_cost_budgets(pool: &Pool<MySql>, page: i64, per_page: i64) -> anyhow::Result<Vec<CostBudget>> {
615    println!("Attempting to fetch cost budgets from database...");
616
617    let result = sqlx::query_as::<_, CostBudget>(
618        r#"
619        SELECT * FROM cost_budgets
620        ORDER BY created_at DESC
621        LIMIT ? OFFSET ?
622        "#,
623    )
624    .bind(per_page)
625    .bind(page * per_page)
626    .fetch_all(pool)
627    .await;
628
629    match result {
630        Ok(budgets) => {
631            println!("Successfully fetched {} cost budgets", budgets.len());
632            Ok(budgets)
633        }
634        Err(e) => {
635            eprintln!("Error fetching cost budgets: {:#?}", e);
636            Err(anyhow::Error::new(e).context("Failed to fetch cost budgets"))
637        }
638    }
639}
640
641/// Counts the total number of cost budgets in the database.
642pub async fn count_cost_budgets(pool: &Pool<MySql>) -> anyhow::Result<i64> {
643    let count = sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM cost_budgets")
644        .fetch_one(pool)
645        .await
646        .context("Failed to count cost budgets")?;
647
648    Ok(count)
649}
650
651/// Creates a new cost budget in the database.
652pub async fn create_cost_budget(
653    pool: &Pool<MySql>,
654    org_id: i64,
655    app_id: Option<i64>,
656    budget_name: &str,
657    budget_amount: f64,
658    currency: &str,
659    budget_period: &str,
660    period_start: DateTime<Utc>,
661    period_end: DateTime<Utc>,
662    alert_threshold_percentage: f64,
663    alert_contacts: &str,
664    created_by: i64
665) -> anyhow::Result<CostBudget> {
666    let mut tx = pool.begin().await?;
667
668    let cost_budget = sqlx::query_as::<_, CostBudget>(
669        r#"INSERT INTO cost_budgets (
670            org_id, app_id, budget_name, budget_amount, currency, budget_period,
671            period_start, period_end, alert_threshold_percentage, alert_contacts,
672            is_active, created_by
673        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, TRUE, ?)"#,
674    )
675    .bind(org_id)
676    .bind(app_id)
677    .bind(budget_name)
678    .bind(budget_amount)
679    .bind(currency)
680    .bind(budget_period)
681    .bind(period_start)
682    .bind(period_end)
683    .bind(alert_threshold_percentage)
684    .bind(alert_contacts)
685    .bind(created_by)
686    .fetch_one(&mut *tx)
687    .await
688    .context("Failed to create cost budget")?;
689
690    tx.commit().await?;
691    Ok(cost_budget)
692}
693
694/// Retrieves a specific cost budget by its unique identifier.
695pub async fn get_cost_budget_by_id(pool: &Pool<MySql>, id: i64) -> anyhow::Result<CostBudget> {
696    let cost_budget = sqlx::query_as::<_, CostBudget>("SELECT * FROM cost_budgets WHERE id = ?")
697        .bind(id)
698        .fetch_one(pool)
699        .await
700        .context("Failed to fetch cost budget")?;
701
702    Ok(cost_budget)
703}
704
705/// Updates an existing cost budget in the database.
706pub async fn update_cost_budget(
707    pool: &Pool<MySql>,
708    id: i64,
709    budget_name: Option<&str>,
710    budget_amount: Option<f64>,
711    alert_threshold_percentage: Option<f64>,
712    alert_contacts: Option<&str>,
713    is_active: Option<bool>
714) -> anyhow::Result<CostBudget> {
715    // Define which fields are being updated
716    let update_fields = [
717        (budget_name.is_some(), "budget_name = ?"),
718        (budget_amount.is_some(), "budget_amount = ?"),
719        (alert_threshold_percentage.is_some(), "alert_threshold_percentage = ?"),
720        (alert_contacts.is_some(), "alert_contacts = ?"),
721        (is_active.is_some(), "is_active = ?"),
722    ];
723
724    // Build update query with only the fields that have values
725    let field_clauses = update_fields
726        .iter()
727        .filter(|(has_value, _)| *has_value)
728        .map(|(_, field)| format!(", {}", field))
729        .collect::<String>();
730
731    let query = format!(
732        "UPDATE cost_budgets SET updated_at = CURRENT_TIMESTAMP{} WHERE id = ?",
733        field_clauses
734    );
735
736    // Start binding parameters
737    let mut db_query = sqlx::query_as::<_, CostBudget>(&query);
738
739    // Bind parameters
740    if let Some(val) = budget_name {
741        db_query = db_query.bind(val);
742    }
743    if let Some(val) = budget_amount {
744        db_query = db_query.bind(val);
745    }
746    if let Some(val) = alert_threshold_percentage {
747        db_query = db_query.bind(val);
748    }
749    if let Some(val) = alert_contacts {
750        db_query = db_query.bind(val);
751    }
752    if let Some(val) = is_active {
753        db_query = db_query.bind(val);
754    }
755
756    // Bind the ID parameter
757    db_query = db_query.bind(id);
758
759    // Execute the query in a transaction
760    let mut tx = pool.begin().await?;
761    let cost_budget = db_query
762        .fetch_one(&mut *tx)
763        .await
764        .context("Failed to update cost budget")?;
765
766    tx.commit().await?;
767    Ok(cost_budget)
768}
769
770/// Deletes a cost budget from the database.
771pub async fn delete_cost_budget(pool: &Pool<MySql>, id: i64) -> anyhow::Result<()> {
772    let mut tx = pool.begin().await?;
773
774    sqlx::query("DELETE FROM cost_budgets WHERE id = ?")
775        .bind(id)
776        .execute(&mut *tx)
777        .await
778        .context("Failed to delete cost budget")?;
779
780    tx.commit().await?;
781    Ok(())
782}
783
784/// Retrieves a paginated list of cost projections from the database.
785pub async fn list_cost_projections(pool: &Pool<MySql>, page: i64, per_page: i64) -> anyhow::Result<Vec<CostProjection>> {
786    println!("Attempting to fetch cost projections from database...");
787
788    let result = sqlx::query_as::<_, CostProjection>(
789        r#"
790        SELECT * FROM cost_projections
791        ORDER BY created_at DESC
792        LIMIT ? OFFSET ?
793        "#,
794    )
795    .bind(per_page)
796    .bind(page * per_page)
797    .fetch_all(pool)
798    .await;
799
800    match result {
801        Ok(projections) => {
802            println!("Successfully fetched {} cost projections", projections.len());
803            Ok(projections)
804        }
805        Err(e) => {
806            eprintln!("Error fetching cost projections: {:#?}", e);
807            Err(anyhow::Error::new(e).context("Failed to fetch cost projections"))
808        }
809    }
810}
811
812/// Creates a new cost projection in the database.
813pub async fn create_cost_projection(
814    pool: &Pool<MySql>,
815    org_id: i64,
816    app_id: Option<i64>,
817    projection_period: &str,
818    start_date: DateTime<Utc>,
819    end_date: DateTime<Utc>,
820    projected_cost: f64,
821    currency: &str,
822    projection_model: &str,
823    confidence_level: Option<f64>,
824    metadata: Option<&str>
825) -> anyhow::Result<CostProjection> {
826    let mut tx = pool.begin().await?;
827
828    let cost_projection = sqlx::query_as::<_, CostProjection>(
829        r#"INSERT INTO cost_projections (
830            org_id, app_id, projection_period, start_date, end_date,
831            projected_cost, currency, projection_model, confidence_level, metadata
832        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
833    )
834    .bind(org_id)
835    .bind(app_id)
836    .bind(projection_period)
837    .bind(start_date)
838    .bind(end_date)
839    .bind(projected_cost)
840    .bind(currency)
841    .bind(projection_model)
842    .bind(confidence_level)
843    .bind(metadata)
844    .fetch_one(&mut *tx)
845    .await
846    .context("Failed to create cost projection")?;
847
848    tx.commit().await?;
849    Ok(cost_projection)
850}
851
852/// Retrieves a specific cost projection by its unique identifier.
853pub async fn get_cost_projection_by_id(pool: &Pool<MySql>, id: i64) -> anyhow::Result<CostProjection> {
854    let cost_projection = sqlx::query_as::<_, CostProjection>("SELECT * FROM cost_projections WHERE id = ?")
855        .bind(id)
856        .fetch_one(pool)
857        .await
858        .context("Failed to fetch cost projection")?;
859
860    Ok(cost_projection)
861}
862
863/// Deletes a cost projection from the database.
864pub async fn delete_cost_projection(pool: &Pool<MySql>, id: i64) -> anyhow::Result<()> {
865    let mut tx = pool.begin().await?;
866
867    sqlx::query("DELETE FROM cost_projections WHERE id = ?")
868        .bind(id)
869        .execute(&mut *tx)
870        .await
871        .context("Failed to delete cost projection")?;
872
873    tx.commit().await?;
874    Ok(())
875}
876
877/// Retrieves a paginated list of resource pricing entries from the database.
878pub async fn list_resource_pricing(
879    pool: &Pool<MySql>, 
880    page: i64, 
881    per_page: i64,
882    resource_type_id: Option<i32>,
883    provider_id: Option<i64>,
884    region_id: Option<i64>,
885    pricing_model: Option<&str>,
886    tier_name: Option<&str>
887) -> anyhow::Result<Vec<ResourcePricing>> {
888    println!("Attempting to fetch resource pricing from database with filters...");
889
890    // Start building the query with filters
891    let mut query = String::from(
892        "SELECT * FROM resource_pricing WHERE 1=1"
893    );
894    
895    if resource_type_id.is_some() {
896        query.push_str(" AND resource_type_id = ?");
897    }
898    
899    if provider_id.is_some() {
900        query.push_str(" AND provider_id = ?");
901    }
902    
903    if region_id.is_some() {
904        query.push_str(" AND region_id = ?");
905    }
906    
907    if pricing_model.is_some() {
908        query.push_str(" AND pricing_model = ?");
909    }
910    
911    if tier_name.is_some() {
912        query.push_str(" AND tier_name = ?");
913    }
914    
915    // Check for current pricing (effective_to is NULL or in the future)
916    query.push_str(" AND (effective_to IS NULL OR effective_to > CURRENT_TIMESTAMP)");
917    
918    // Add ordering and pagination
919    query.push_str(" ORDER BY created_at DESC LIMIT ? OFFSET ?");
920    
921    // Prepare the query
922    let mut db_query = sqlx::query_as::<_, ResourcePricing>(&query);
923    
924    // Bind parameters
925    if let Some(val) = resource_type_id {
926        db_query = db_query.bind(val);
927    }
928    
929    if let Some(val) = provider_id {
930        db_query = db_query.bind(val);
931    }
932    
933    if let Some(val) = region_id {
934        db_query = db_query.bind(val);
935    }
936    
937    if let Some(val) = pricing_model {
938        db_query = db_query.bind(val);
939    }
940    
941    if let Some(val) = tier_name {
942        db_query = db_query.bind(val);
943    }
944    
945    // Add pagination parameters
946    db_query = db_query.bind(per_page).bind(page * per_page);
947    
948    // Execute query
949    let result = db_query.fetch_all(pool).await;
950
951    match result {
952        Ok(pricing) => {
953            println!("Successfully fetched {} resource pricing entries", pricing.len());
954            Ok(pricing)
955        }
956        Err(e) => {
957            eprintln!("Error fetching resource pricing: {:#?}", e);
958            Err(anyhow::Error::new(e).context("Failed to fetch resource pricing"))
959        }
960    }
961}
962
963/// Creates a new resource pricing entry in the database.
964pub async fn create_resource_pricing(
965    pool: &Pool<MySql>,
966    resource_type_id: i32,
967    provider_id: i64,
968    region_id: Option<i64>,
969    tier_name: &str,
970    unit_price: f64,
971    currency: &str,
972    effective_from: DateTime<Utc>,
973    effective_to: Option<DateTime<Utc>>,
974    pricing_model: &str,
975    commitment_period: Option<&str>,
976    volume_discount_tiers: Option<&str>
977) -> anyhow::Result<ResourcePricing> {
978    let mut tx = pool.begin().await?;
979
980    let resource_pricing = sqlx::query_as::<_, ResourcePricing>(
981        r#"INSERT INTO resource_pricing (
982            resource_type_id, provider_id, region_id, tier_name, unit_price,
983            currency, effective_from, effective_to, pricing_model, 
984            commitment_period, volume_discount_tiers
985        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"#,
986    )
987    .bind(resource_type_id)
988    .bind(provider_id)
989    .bind(region_id)
990    .bind(tier_name)
991    .bind(unit_price)
992    .bind(currency)
993    .bind(effective_from)
994    .bind(effective_to)
995    .bind(pricing_model)
996    .bind(commitment_period)
997    .bind(volume_discount_tiers)
998    .fetch_one(&mut *tx)
999    .await
1000    .context("Failed to create resource pricing")?;
1001
1002    tx.commit().await?;
1003    Ok(resource_pricing)
1004}
1005
1006/// Retrieves a specific resource pricing entry by its unique identifier.
1007pub async fn get_resource_pricing_by_id(pool: &Pool<MySql>, id: i64) -> anyhow::Result<ResourcePricing> {
1008    let resource_pricing = sqlx::query_as::<_, ResourcePricing>("SELECT * FROM resource_pricing WHERE id = ?")
1009        .bind(id)
1010        .fetch_one(pool)
1011        .await
1012        .context("Failed to fetch resource pricing")?;
1013
1014    Ok(resource_pricing)
1015}
1016
1017/// Updates an existing resource pricing entry in the database.
1018pub async fn update_resource_pricing(
1019    pool: &Pool<MySql>,
1020    id: i64,
1021    unit_price: Option<f64>,
1022    effective_to: Option<DateTime<Utc>>,
1023    volume_discount_tiers: Option<&str>
1024) -> anyhow::Result<ResourcePricing> {
1025    // Define which fields are being updated
1026    let update_fields = [
1027        (unit_price.is_some(), "unit_price = ?"),
1028        (effective_to.is_some(), "effective_to = ?"),
1029        (volume_discount_tiers.is_some(), "volume_discount_tiers = ?"),
1030    ];
1031
1032    // Build update query with only the fields that have values
1033    let field_clauses = update_fields
1034        .iter()
1035        .filter(|(has_value, _)| *has_value)
1036        .map(|(_, field)| format!(", {}", field))
1037        .collect::<String>();
1038
1039    let query = format!(
1040        "UPDATE resource_pricing SET updated_at = CURRENT_TIMESTAMP{} WHERE id = ?",
1041        field_clauses
1042    );
1043
1044    // Start binding parameters
1045    let mut db_query = sqlx::query_as::<_, ResourcePricing>(&query);
1046
1047    // Bind parameters
1048    if let Some(val) = unit_price {
1049        db_query = db_query.bind(val);
1050    }
1051    if let Some(val) = effective_to {
1052        db_query = db_query.bind(val);
1053    }
1054    if let Some(val) = volume_discount_tiers {
1055        db_query = db_query.bind(val);
1056    }
1057
1058    // Bind the ID parameter
1059    db_query = db_query.bind(id);
1060
1061    // Execute the query in a transaction
1062    let mut tx = pool.begin().await?;
1063    let resource_pricing = db_query
1064        .fetch_one(&mut *tx)
1065        .await
1066        .context("Failed to update resource pricing")?;
1067
1068    tx.commit().await?;
1069    Ok(resource_pricing)
1070}
1071
1072/// Deletes a resource pricing entry from the database.
1073pub async fn delete_resource_pricing(pool: &Pool<MySql>, id: i64) -> anyhow::Result<()> {
1074    let mut tx = pool.begin().await?;
1075
1076    sqlx::query("DELETE FROM resource_pricing WHERE id = ?")
1077        .bind(id)
1078        .execute(&mut *tx)
1079        .await
1080        .context("Failed to delete resource pricing")?;
1081
1082    tx.commit().await?;
1083    Ok(())
1084}
1085
1086/// Retrieves a list of cost allocation tags for a specific resource.
1087pub async fn get_cost_allocation_tags(
1088    pool: &Pool<MySql>,
1089    resource_id: i64,
1090    resource_type: &str
1091) -> anyhow::Result<Vec<CostAllocationTag>> {
1092    let tags = sqlx::query_as::<_, CostAllocationTag>(
1093        "SELECT * FROM cost_allocation_tags WHERE resource_id = ? AND resource_type = ?"
1094    )
1095    .bind(resource_id)
1096    .bind(resource_type)
1097    .fetch_all(pool)
1098    .await
1099    .context("Failed to fetch cost allocation tags")?;
1100
1101    Ok(tags)
1102}
1103
1104/// Creates a new cost allocation tag in the database.
1105pub async fn create_cost_allocation_tag(
1106    pool: &Pool<MySql>,
1107    tag_key: &str,
1108    tag_value: &str,
1109    resource_id: i64,
1110    resource_type: &str
1111) -> anyhow::Result<CostAllocationTag> {
1112    let mut tx = pool.begin().await?;
1113
1114    let tag = sqlx::query_as::<_, CostAllocationTag>(
1115        r#"INSERT INTO cost_allocation_tags (
1116            tag_key, tag_value, resource_id, resource_type
1117        ) VALUES (?, ?, ?, ?)"#,
1118    )
1119    .bind(tag_key)
1120    .bind(tag_value)
1121    .bind(resource_id)
1122    .bind(resource_type)
1123    .fetch_one(&mut *tx)
1124    .await
1125    .context("Failed to create cost allocation tag")?;
1126
1127    tx.commit().await?;
1128    Ok(tag)
1129}
1130
1131/// Deletes a cost allocation tag from the database.
1132pub async fn delete_cost_allocation_tag(pool: &Pool<MySql>, id: i64) -> anyhow::Result<()> {
1133    let mut tx = pool.begin().await?;
1134
1135    sqlx::query("DELETE FROM cost_allocation_tags WHERE id = ?")
1136        .bind(id)
1137        .execute(&mut *tx)
1138        .await
1139        .context("Failed to delete cost allocation tag")?;
1140
1141    tx.commit().await?;
1142    Ok(())
1143}