omni_orchestrator/schemas/v1/db/queries/
alert.rs

1use crate::models::alert::{
2    Alert, AlertWithAcknowledgments, AlertAcknowledgment, 
3    AlertEscalation, AlertHistory, AlertWithRelatedData
4};
5use anyhow::Context;
6use serde::Serialize;
7use sqlx::{MySql, Pool, Row};
8use chrono::{DateTime, Utc};
9use serde_json::Value as JsonValue;
10
// =================== Alert Management ===================
12
13/// Retrieves a paginated list of alerts from the database.
14///
15/// This function fetches a subset of alerts based on pagination parameters,
16/// ordering them by timestamp in descending order (newest first). Filtering
17/// options allow for narrowing down results by various criteria.
18///
19/// # Arguments
20///
21/// * `pool` - Database connection pool for executing the query
22/// * `page` - Zero-based page number (e.g., 0 for first page, 1 for second page)
23/// * `per_page` - Number of records to fetch per page
24/// * `status` - Optional filter for alert status
25/// * `severity` - Optional filter for alert severity
26/// * `org_id` - Optional filter for organization ID
27/// * `app_id` - Optional filter for application ID
28/// * `service` - Optional filter for service name
29/// * `from_date` - Optional filter for alerts after this timestamp
30/// * `to_date` - Optional filter for alerts before this timestamp
31///
32/// # Returns
33///
34/// * `Ok(Vec<Alert>)` - Successfully retrieved list of alerts
35/// * `Err(anyhow::Error)` - Failed to fetch alerts, with context
36pub async fn list_alerts(
37    pool: &Pool<MySql>,
38    page: i64,
39    per_page: i64,
40    status: Option<&str>,
41    severity: Option<&str>,
42    org_id: Option<i64>,
43    app_id: Option<i64>,
44    service: Option<&str>,
45    from_date: Option<DateTime<Utc>>,
46    to_date: Option<DateTime<Utc>>,
47) -> anyhow::Result<Vec<Alert>> {
48    println!("Attempting to fetch alerts from database with filtering...");
49
50    // Start building the query with base selection
51    let mut query_string = String::from(
52        "SELECT * FROM alerts WHERE 1=1"
53    );
54    
55    // Add optional filters
56    if let Some(s) = status {
57        query_string.push_str(" AND status = ?");
58    }
59    if let Some(s) = severity {
60        query_string.push_str(" AND severity = ?");
61    }
62    if let Some(_) = org_id {
63        query_string.push_str(" AND org_id = ?");
64    }
65    if let Some(_) = app_id {
66        query_string.push_str(" AND app_id = ?");
67    }
68    if let Some(s) = service {
69        query_string.push_str(" AND service = ?");
70    }
71    if let Some(_) = from_date {
72        query_string.push_str(" AND timestamp >= ?");
73    }
74    if let Some(_) = to_date {
75        query_string.push_str(" AND timestamp <= ?");
76    }
77    
78    // Add order and limit
79    query_string.push_str(" ORDER BY timestamp DESC LIMIT ? OFFSET ?");
80    
81    // Build the query
82    let mut query = sqlx::query_as::<_, Alert>(&query_string);
83    
84    // Bind parameters in the same order
85    if let Some(s) = status {
86        query = query.bind(s);
87    }
88    if let Some(s) = severity {
89        query = query.bind(s);
90    }
91    if let Some(id) = org_id {
92        query = query.bind(id);
93    }
94    if let Some(id) = app_id {
95        query = query.bind(id);
96    }
97    if let Some(s) = service {
98        query = query.bind(s);
99    }
100    if let Some(date) = from_date {
101        query = query.bind(date);
102    }
103    if let Some(date) = to_date {
104        query = query.bind(date);
105    }
106    
107    // Bind pagination params
108    query = query.bind(per_page).bind(page * per_page);
109    
110    // Execute the query
111    let result = query.fetch_all(pool).await;
112
113    match result {
114        Ok(alerts) => {
115            println!("Successfully fetched {} alerts", alerts.len());
116            Ok(alerts)
117        }
118        Err(e) => {
119            eprintln!("Error fetching alerts: {:#?}", e);
120            Err(anyhow::Error::new(e).context("Failed to fetch alerts"))
121        }
122    }
123}
124
125/// Retrieves a specific alert by its ID, along with related acknowledgments and escalations.
126///
127/// This function fetches a single alert record along with its associated acknowledgments,
128/// escalations, and history records to provide comprehensive information about the alert.
129///
130/// # Arguments
131///
132/// * `pool` - Database connection pool for executing the query
133/// * `id` - Unique identifier of the alert to retrieve
134///
135/// # Returns
136///
137/// * `Ok(AlertWithRelatedData)` - Successfully retrieved alert with related data
138/// * `Err(anyhow::Error)` - Failed to fetch alert or related data
139pub async fn get_alert_with_related_data(
140    pool: &Pool<MySql>,
141    id: i64,
142) -> anyhow::Result<AlertWithRelatedData> {
143    // First fetch the alert
144    let alert = sqlx::query_as::<_, Alert>("SELECT * FROM alerts WHERE id = ?")
145        .bind(id)
146        .fetch_one(pool)
147        .await
148        .context("Failed to fetch alert")?;
149        
150    // Fetch acknowledgments
151    let acknowledgments = sqlx::query_as::<_, AlertAcknowledgment>(
152        "SELECT * FROM alert_acknowledgments WHERE alert_id = ? ORDER BY acknowledged_at DESC"
153    )
154    .bind(id)
155    .fetch_all(pool)
156    .await
157    .context("Failed to fetch alert acknowledgments")?;
158    
159    // Fetch escalations
160    let escalations = sqlx::query_as::<_, AlertEscalation>(
161        "SELECT * FROM alert_escalations WHERE alert_id = ? ORDER BY escalated_at DESC"
162    )
163    .bind(id)
164    .fetch_all(pool)
165    .await
166    .context("Failed to fetch alert escalations")?;
167    
168    // Fetch history
169    let history = sqlx::query_as::<_, AlertHistory>(
170        "SELECT * FROM alert_history WHERE alert_id = ? ORDER BY performed_at DESC"
171    )
172    .bind(id)
173    .fetch_all(pool)
174    .await
175    .context("Failed to fetch alert history")?;
176    
177    Ok(AlertWithRelatedData {
178        alert,
179        acknowledgments,
180        escalations,
181        history,
182    })
183}
184
185/// Counts the total number of alerts in the database with optional filtering.
186///
187/// This function retrieves the total count of alerts that match the provided filter criteria,
188/// which is useful for pagination and reporting purposes.
189///
190/// # Arguments
191///
192/// * `pool` - Database connection pool for executing the query
193/// * `status` - Optional filter for alert status
194/// * `severity` - Optional filter for alert severity
195/// * `org_id` - Optional filter for organization ID
196/// * `app_id` - Optional filter for application ID
197///
198/// # Returns
199///
200/// * `Ok(i64)` - Successfully retrieved count of alerts
201/// * `Err(anyhow::Error)` - Failed to count alerts
202pub async fn count_alerts(
203    pool: &Pool<MySql>,
204    status: Option<&str>,
205    severity: Option<&str>,
206    org_id: Option<i64>,
207    app_id: Option<i64>,
208) -> anyhow::Result<i64> {
209    // Start building the query
210    let mut query_string = String::from("SELECT COUNT(*) FROM alerts WHERE 1=1");
211    
212    // Add optional filters
213    if let Some(_) = status {
214        query_string.push_str(" AND status = ?");
215    }
216    if let Some(_) = severity {
217        query_string.push_str(" AND severity = ?");
218    }
219    if let Some(_) = org_id {
220        query_string.push_str(" AND org_id = ?");
221    }
222    if let Some(_) = app_id {
223        query_string.push_str(" AND app_id = ?");
224    }
225    
226    // Build the query
227    let mut query = sqlx::query_scalar::<_, i64>(&query_string);
228    
229    // Bind parameters in the same order
230    if let Some(s) = status {
231        query = query.bind(s);
232    }
233    if let Some(s) = severity {
234        query = query.bind(s);
235    }
236    if let Some(id) = org_id {
237        query = query.bind(id);
238    }
239    if let Some(id) = app_id {
240        query = query.bind(id);
241    }
242    
243    // Execute the query
244    let count = query
245        .fetch_one(pool)
246        .await
247        .context("Failed to count alerts")?;
248
249    Ok(count)
250}
251
252/// Creates a new alert in the database.
253///
254/// This function inserts a new alert record with the provided parameters and
255/// adds an initial history record to document the alert creation.
256///
257/// # Arguments
258///
259/// * `pool` - Database connection pool for executing the query
260/// * `alert_type` - Type of alert (e.g., "cpu_usage", "memory_usage", "disk_space")
261/// * `severity` - Severity level of the alert (critical, warning, info)
262/// * `service` - Name of the service that generated the alert
263/// * `message` - Alert message text describing the issue
264/// * `metadata` - Optional JSON data with additional alert details
265/// * `org_id` - Optional organization ID related to the alert
266/// * `app_id` - Optional application ID related to the alert
267/// * `instance_id` - Optional instance ID related to the alert
268/// * `region_id` - Optional region ID related to the alert
269/// * `node_id` - Optional node/worker ID related to the alert
270///
271/// # Returns
272///
273/// * `Ok(Alert)` - Successfully created alert, including database-assigned fields
274/// * `Err(anyhow::Error)` - Failed to create alert
275pub async fn create_alert(
276    pool: &Pool<MySql>,
277    alert_type: &str,
278    severity: &str,
279    service: &str,
280    message: &str,
281    metadata: Option<JsonValue>,
282    org_id: Option<i64>,
283    app_id: Option<i64>,
284    instance_id: Option<i64>,
285    region_id: Option<i64>,
286    node_id: Option<i64>,
287) -> anyhow::Result<Alert> {
288    // Begin transaction
289    let mut tx = pool.begin().await?;
290
291    // Insert alert
292    let alert = sqlx::query_as::<_, Alert>(
293        r#"INSERT INTO alerts (
294            alert_type, severity, service, message, timestamp, status,
295            metadata, org_id, app_id, instance_id, region_id, node_id
296        ) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, 'active', ?, ?, ?, ?, ?, ?)"#,
297    )
298    .bind(alert_type)
299    .bind(severity)
300    .bind(service)
301    .bind(message)
302    .bind(metadata)
303    .bind(org_id)
304    .bind(app_id)
305    .bind(instance_id)
306    .bind(region_id)
307    .bind(node_id)
308    .fetch_one(&mut *tx)
309    .await
310    .context("Failed to create alert")?;
311    
312    // Add history record for alert creation
313    sqlx::query(
314        r#"INSERT INTO alert_history (
315            alert_id, action, performed_at, previous_state, new_state
316        ) VALUES (?, 'created', CURRENT_TIMESTAMP, NULL, ?)"#,
317    )
318    .bind(alert.id)
319    .bind(serde_json::to_value(&alert).unwrap_or(serde_json::Value::Null))
320    .execute(&mut *tx)
321    .await
322    .context("Failed to create alert history record")?;
323
324    // Commit transaction
325    tx.commit().await?;
326
327    // Return newly created alert
328    Ok(alert)
329}
330
331/// Updates the status of an alert.
332///
333/// This function changes the status of an alert and records the change
334/// in the alert history table. It can mark alerts as acknowledged, resolved,
335/// or auto-resolved based on system or user actions.
336///
337/// # Arguments
338///
339/// * `pool` - Database connection pool for executing the query
340/// * `id` - Unique identifier of the alert to update
341/// * `new_status` - New status for the alert (active, acknowledged, resolved, auto_resolved)
342/// * `user_id` - Optional ID of the user who performed the action
343/// * `notes` - Optional notes about the status change
344///
345/// # Returns
346///
347/// * `Ok(Alert)` - Successfully updated alert
348/// * `Err(anyhow::Error)` - Failed to update alert
349pub async fn update_alert_status(
350    pool: &Pool<MySql>,
351    id: i64,
352    new_status: &str,
353    user_id: Option<i64>,
354    notes: Option<&str>,
355) -> anyhow::Result<Alert> {
356    // Begin transaction
357    let mut tx = pool.begin().await?;
358    
359    // Get current alert state
360    let current_alert = sqlx::query_as::<_, Alert>("SELECT * FROM alerts WHERE id = ?")
361        .bind(id)
362        .fetch_one(&mut *tx)
363        .await
364        .context("Failed to fetch current alert state")?;
365    
366    // Prepare update query with resolved_at and resolved_by if needed
367    let (query, resolved_at, resolved_by) = if new_status == "resolved" || new_status == "auto_resolved" {
368        (
369            "UPDATE alerts SET status = ?, resolved_at = CURRENT_TIMESTAMP, resolved_by = ? WHERE id = ?",
370            Some(chrono::Utc::now()),
371            user_id,
372        )
373    } else {
374        (
375            "UPDATE alerts SET status = ? WHERE id = ?",
376            None,
377            None,
378        )
379    };
380    
381    // Execute update based on query type
382    let updated_alert = if new_status == "resolved" || new_status == "auto_resolved" {
383        sqlx::query_as::<_, Alert>(query)
384            .bind(new_status)
385            .bind(user_id)
386            .bind(id)
387            .fetch_one(&mut *tx)
388            .await
389            .context("Failed to update alert status")?
390    } else {
391        sqlx::query_as::<_, Alert>(query)
392            .bind(new_status)
393            .bind(id)
394            .fetch_one(&mut *tx)
395            .await
396            .context("Failed to update alert status")?
397    };
398    
399    // Add history record
400    sqlx::query(
401        r#"INSERT INTO alert_history (
402            alert_id, action, performed_by, performed_at, 
403            previous_state, new_state, notes
404        ) VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
405    )
406    .bind(id)
407    .bind(format!("status_change_to_{}", new_status))
408    .bind(user_id)
409    .bind(serde_json::to_value(&current_alert).unwrap_or(serde_json::Value::Null))
410    .bind(serde_json::to_value(&updated_alert).unwrap_or(serde_json::Value::Null))
411    .bind(notes)
412    .execute(&mut *tx)
413    .await
414    .context("Failed to create alert history record for status update")?;
415    
416    // Create acknowledgment record if acknowledging
417    if new_status == "acknowledged" && user_id.is_some() {
418        sqlx::query(
419            r#"INSERT INTO alert_acknowledgments (
420                alert_id, user_id, acknowledged_at, notes
421            ) VALUES (?, ?, CURRENT_TIMESTAMP, ?)"#,
422        )
423        .bind(id)
424        .bind(user_id.unwrap())
425        .bind(notes)
426        .execute(&mut *tx)
427        .await
428        .context("Failed to create alert acknowledgment record")?;
429    }
430
431    // Commit transaction
432    tx.commit().await?;
433    
434    Ok(updated_alert)
435}
436
437/// Acknowledges an alert by a specific user.
438///
439/// This function creates an acknowledgment record for the alert and
440/// optionally updates the alert status to 'acknowledged' if it's currently 'active'.
441///
442/// # Arguments
443///
444/// * `pool` - Database connection pool for executing the query
445/// * `alert_id` - ID of the alert to acknowledge
446/// * `user_id` - ID of the user acknowledging the alert
447/// * `notes` - Optional notes about the acknowledgment
448/// * `update_status` - Whether to update the alert status to 'acknowledged'
449///
450/// # Returns
451///
452/// * `Ok(AlertAcknowledgment)` - Successfully created acknowledgment
453/// * `Err(anyhow::Error)` - Failed to acknowledge alert
454pub async fn acknowledge_alert(
455    pool: &Pool<MySql>,
456    alert_id: i64,
457    user_id: i64,
458    notes: Option<&str>,
459    update_status: bool,
460) -> anyhow::Result<AlertAcknowledgment> {
461    // Begin transaction
462    let mut tx = pool.begin().await?;
463    
464    // Create acknowledgment
465    let acknowledgment = sqlx::query_as::<_, AlertAcknowledgment>(
466        r#"INSERT INTO alert_acknowledgments (
467            alert_id, user_id, acknowledged_at, notes
468        ) VALUES (?, ?, CURRENT_TIMESTAMP, ?)"#,
469    )
470    .bind(alert_id)
471    .bind(user_id)
472    .bind(notes)
473    .fetch_one(&mut *tx)
474    .await
475    .context("Failed to create alert acknowledgment")?;
476    
477    // Update alert status if requested
478    if update_status {
479        // Fetch current alert status
480        let current_alert = sqlx::query_as::<_, Alert>("SELECT * FROM alerts WHERE id = ?")
481            .bind(alert_id)
482            .fetch_one(&mut *tx)
483            .await
484            .context("Failed to fetch current alert state")?;
485        
486        // Only update if currently active
487        if current_alert.status == "active" {
488            // Update status
489            let updated_alert = sqlx::query_as::<_, Alert>(
490                "UPDATE alerts SET status = 'acknowledged' WHERE id = ?"
491            )
492            .bind(alert_id)
493            .fetch_one(&mut *tx)
494            .await
495            .context("Failed to update alert status to acknowledged")?;
496            
497            // Add history record
498            sqlx::query(
499                r#"INSERT INTO alert_history (
500                    alert_id, action, performed_by, performed_at, 
501                    previous_state, new_state, notes
502                ) VALUES (?, 'status_change_to_acknowledged', ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
503            )
504            .bind(alert_id)
505            .bind(user_id)
506            .bind(serde_json::to_value(&current_alert).unwrap_or(serde_json::Value::Null))
507            .bind(serde_json::to_value(&updated_alert).unwrap_or(serde_json::Value::Null))
508            .bind(notes)
509            .execute(&mut *tx)
510            .await
511            .context("Failed to create alert history record for acknowledgment")?;
512        }
513    }
514    
515    // Commit transaction
516    tx.commit().await?;
517    
518    Ok(acknowledgment)
519}
520
521/// Resolves an alert by a specific user.
522///
523/// This function updates the alert status to 'resolved', sets the resolved_at timestamp
524/// and resolved_by user ID, and creates a history record for the resolution.
525///
526/// # Arguments
527///
528/// * `pool` - Database connection pool for executing the query
529/// * `id` - ID of the alert to resolve
530/// * `user_id` - ID of the user resolving the alert
531/// * `notes` - Optional notes about the resolution
532///
533/// # Returns
534///
535/// * `Ok(Alert)` - Successfully resolved alert
536/// * `Err(anyhow::Error)` - Failed to resolve alert
537pub async fn resolve_alert(
538    pool: &Pool<MySql>,
539    id: i64,
540    user_id: i64,
541    notes: Option<&str>,
542) -> anyhow::Result<Alert> {
543    // Begin transaction
544    let mut tx = pool.begin().await?;
545    
546    // Get current alert state
547    let current_alert = sqlx::query_as::<_, Alert>("SELECT * FROM alerts WHERE id = ?")
548        .bind(id)
549        .fetch_one(&mut *tx)
550        .await
551        .context("Failed to fetch current alert state")?;
552    
553    // Update alert
554    let updated_alert = sqlx::query_as::<_, Alert>(
555        r#"UPDATE alerts 
556           SET status = 'resolved', 
557               resolved_at = CURRENT_TIMESTAMP, 
558               resolved_by = ? 
559           WHERE id = ?"#,
560    )
561    .bind(user_id)
562    .bind(id)
563    .fetch_one(&mut *tx)
564    .await
565    .context("Failed to resolve alert")?;
566    
567    // Add history record
568    sqlx::query(
569        r#"INSERT INTO alert_history (
570            alert_id, action, performed_by, performed_at, 
571            previous_state, new_state, notes
572        ) VALUES (?, 'resolved', ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
573    )
574    .bind(id)
575    .bind(user_id)
576    .bind(serde_json::to_value(&current_alert).unwrap_or(serde_json::Value::Null))
577    .bind(serde_json::to_value(&updated_alert).unwrap_or(serde_json::Value::Null))
578    .bind(notes)
579    .execute(&mut *tx)
580    .await
581    .context("Failed to create alert history record for resolution")?;
582    
583    // Commit transaction
584    tx.commit().await?;
585    
586    Ok(updated_alert)
587}
588
589/// Creates an escalation record for an alert.
590///
591/// This function adds an escalation record to indicate that an alert has been
592/// escalated to another level of attention, such as notifying administrators
593/// or external systems when an alert has not been addressed in a timely manner.
594///
595/// # Arguments
596///
597/// * `pool` - Database connection pool for executing the query
598/// * `alert_id` - ID of the alert being escalated
599/// * `escalation_level` - Level of escalation (typically 1, 2, 3, etc.)
600/// * `escalated_to` - JSON data specifying where/who the alert was escalated to
601/// * `escalation_method` - Method used for escalation (email, SMS, webhook, etc.)
602/// * `response_required_by` - Optional deadline for when a response is required
603///
604/// # Returns
605///
606/// * `Ok(AlertEscalation)` - Successfully created escalation record
607/// * `Err(anyhow::Error)` - Failed to create escalation record
608pub async fn create_alert_escalation(
609    pool: &Pool<MySql>,
610    alert_id: i64,
611    escalation_level: i64,
612    escalated_to: JsonValue,
613    escalation_method: &str,
614    response_required_by: Option<DateTime<Utc>>,
615) -> anyhow::Result<AlertEscalation> {
616    // Begin transaction
617    let mut tx = pool.begin().await?;
618    
619    // Create escalation record
620    let escalation = sqlx::query_as::<_, AlertEscalation>(
621        r#"INSERT INTO alert_escalations (
622            alert_id, escalation_level, escalated_at, 
623            escalated_to, escalation_method, response_required_by
624        ) VALUES (?, ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
625    )
626    .bind(alert_id)
627    .bind(escalation_level)
628    .bind(escalated_to)
629    .bind(escalation_method)
630    .bind(response_required_by)
631    .fetch_one(&mut *tx)
632    .await
633    .context("Failed to create alert escalation")?;
634    
635    // Add history record
636    sqlx::query(
637        r#"INSERT INTO alert_history (
638            alert_id, action, performed_at, new_state
639        ) VALUES (?, ?, CURRENT_TIMESTAMP, ?)"#,
640    )
641    .bind(alert_id)
642    .bind(format!("escalated_level_{}", escalation_level))
643    .bind(serde_json::to_value(&escalation).unwrap_or(serde_json::Value::Null))
644    .execute(&mut *tx)
645    .await
646    .context("Failed to create alert history record for escalation")?;
647    
648    // Commit transaction
649    tx.commit().await?;
650    
651    Ok(escalation)
652}
653
654/// Adds a custom history entry for an alert.
655///
656/// This function allows adding arbitrary history records for an alert,
657/// which is useful for tracking manual interventions or system actions
658/// that don't fit into predefined categories.
659///
660/// # Arguments
661///
662/// * `pool` - Database connection pool for executing the query
663/// * `alert_id` - ID of the alert
664/// * `action` - Description of the action being recorded
665/// * `performed_by` - Optional ID of the user who performed the action
666/// * `notes` - Optional notes about the action
667/// * `previous_state` - Optional JSON data representing the state before the action
668/// * `new_state` - Optional JSON data representing the state after the action
669///
670/// # Returns
671///
672/// * `Ok(AlertHistory)` - Successfully created history record
673/// * `Err(anyhow::Error)` - Failed to create history record
674pub async fn add_alert_history(
675    pool: &Pool<MySql>,
676    alert_id: i64,
677    action: &str,
678    performed_by: Option<i64>,
679    notes: Option<&str>,
680    previous_state: Option<JsonValue>,
681    new_state: Option<JsonValue>,
682) -> anyhow::Result<AlertHistory> {
683    let history = sqlx::query_as::<_, AlertHistory>(
684        r#"INSERT INTO alert_history (
685            alert_id, action, performed_by, performed_at, 
686            previous_state, new_state, notes
687        ) VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
688    )
689    .bind(alert_id)
690    .bind(action)
691    .bind(performed_by)
692    .bind(previous_state)
693    .bind(new_state)
694    .bind(notes)
695    .fetch_one(pool)
696    .await
697    .context("Failed to create alert history record")?;
698    
699    Ok(history)
700}
701
702/// Retrieves a list of recent alerts for a specific application.
703///
704/// This function fetches alerts associated with an application,
705/// typically for display on an application dashboard or status page.
706///
707/// # Arguments
708///
709/// * `pool` - Database connection pool for executing the query
710/// * `app_id` - ID of the application
711/// * `limit` - Maximum number of alerts to retrieve
712/// * `include_resolved` - Whether to include resolved alerts
713///
714/// # Returns
715///
716/// * `Ok(Vec<Alert>)` - Successfully retrieved list of alerts
717/// * `Err(anyhow::Error)` - Failed to fetch alerts
718pub async fn get_recent_app_alerts(
719    pool: &Pool<MySql>,
720    app_id: i64,
721    limit: i64,
722    include_resolved: bool,
723) -> anyhow::Result<Vec<Alert>> {
724    // Build query based on whether to include resolved alerts
725    let query = if include_resolved {
726        r#"
727        SELECT * FROM alerts
728        WHERE app_id = ?
729        ORDER BY timestamp DESC
730        LIMIT ?
731        "#
732    } else {
733        r#"
734        SELECT * FROM alerts
735        WHERE app_id = ? AND status IN ('active', 'acknowledged')
736        ORDER BY timestamp DESC
737        LIMIT ?
738        "#
739    };
740    
741    let alerts = sqlx::query_as::<_, Alert>(query)
742        .bind(app_id)
743        .bind(limit)
744        .fetch_all(pool)
745        .await
746        .context("Failed to fetch app alerts")?;
747    
748    Ok(alerts)
749}
750
751/// Retrieves a list of active alerts for a specific organization.
752///
753/// This function fetches active and acknowledged alerts across all applications
754/// and services within an organization, ordered by severity and timestamp.
755///
756/// # Arguments
757///
758/// * `pool` - Database connection pool for executing the query
759/// * `org_id` - ID of the organization
760/// * `limit` - Maximum number of alerts to retrieve
761///
762/// # Returns
763///
764/// * `Ok(Vec<Alert>)` - Successfully retrieved list of alerts
765/// * `Err(anyhow::Error)` - Failed to fetch alerts
766pub async fn get_org_active_alerts(
767    pool: &Pool<MySql>,
768    org_id: i64,
769    limit: i64,
770) -> anyhow::Result<Vec<Alert>> {
771    let alerts = sqlx::query_as::<_, Alert>(
772        r#"
773        SELECT * FROM alerts
774        WHERE org_id = ? AND status IN ('active', 'acknowledged')
775        ORDER BY 
776            CASE 
777                WHEN severity = 'critical' THEN 1
778                WHEN severity = 'warning' THEN 2
779                WHEN severity = 'info' THEN 3
780                ELSE 4
781            END,
782            timestamp DESC
783        LIMIT ?
784        "#
785    )
786    .bind(org_id)
787    .bind(limit)
788    .fetch_all(pool)
789    .await
790    .context("Failed to fetch org active alerts")?;
791    
792    Ok(alerts)
793}
794
795/// Gets statistics about alerts for an organization grouped by severity and status.
796///
797/// This function retrieves counts of alerts for an organization, grouped by
798/// different categories to provide an overview of the alert landscape.
799///
800/// # Arguments
801///
802/// * `pool` - Database connection pool for executing the query
803/// * `org_id` - ID of the organization
804/// * `days` - Number of days to look back for statistics
805///
806/// # Returns
807///
808/// * `Ok(JsonValue)` - JSON object with alert statistics
809/// * `Err(anyhow::Error)` - Failed to fetch alert statistics
810pub async fn get_alert_stats(
811    pool: &Pool<MySql>,
812    org_id: i64,
813    days: i64,
814) -> anyhow::Result<JsonValue> {
815    // Get counts by severity
816    let severity_counts = sqlx::query(
817        r#"
818        SELECT severity, COUNT(*) as count
819        FROM alerts
820        WHERE org_id = ? AND timestamp >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
821        GROUP BY severity
822        "#
823    )
824    .bind(org_id)
825    .bind(days)
826    .fetch_all(pool)
827    .await
828    .context("Failed to fetch severity counts")?;
829    
830    // Get counts by status
831    let status_counts = sqlx::query(
832        r#"
833        SELECT status, COUNT(*) as count
834        FROM alerts
835        WHERE org_id = ? AND timestamp >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
836        GROUP BY status
837        "#
838    )
839    .bind(org_id)
840    .bind(days)
841    .fetch_all(pool)
842    .await
843    .context("Failed to fetch status counts")?;
844    
845    // Get counts by service
846    let service_counts = sqlx::query(
847        r#"
848        SELECT service, COUNT(*) as count
849        FROM alerts
850        WHERE org_id = ? AND timestamp >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
851        GROUP BY service
852        "#
853    )
854    .bind(org_id)
855    .bind(days)
856    .fetch_all(pool)
857    .await
858    .context("Failed to fetch service counts")?;
859    
860    // Get daily trend data
861    let daily_trends = sqlx::query(
862        r#"
863        SELECT 
864            DATE(timestamp) as date,
865            COUNT(*) as total,
866            SUM(CASE WHEN severity = 'critical' THEN 1 ELSE 0 END) as critical,
867            SUM(CASE WHEN severity = 'warning' THEN 1 ELSE 0 END) as warning,
868            SUM(CASE WHEN severity = 'info' THEN 1 ELSE 0 END) as info
869        FROM alerts
870        WHERE org_id = ? AND timestamp >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
871        GROUP BY DATE(timestamp)
872        ORDER BY DATE(timestamp)
873        "#
874    )
875    .bind(org_id)
876    .bind(days)
877    .fetch_all(pool)
878    .await
879    .context("Failed to fetch daily trend data")?;
880    
881    // Format all results as JSON
882    let mut severity_json = serde_json::Map::new();
883    for row in severity_counts {
884        let severity: String = row.get("severity");
885        let count: i64 = row.get("count");
886        severity_json.insert(severity, serde_json::Value::Number(count.into()));
887    }
888    
889    let mut status_json = serde_json::Map::new();
890    for row in status_counts {
891        let status: String = row.get("status");
892        let count: i64 = row.get("count");
893        status_json.insert(status, serde_json::Value::Number(count.into()));
894    }
895    
896    let mut service_json = serde_json::Map::new();
897    for row in service_counts {
898        let service: String = row.get("service");
899        let count: i64 = row.get("count");
900        service_json.insert(service, serde_json::Value::Number(count.into()));
901    }
902    
903    let mut daily_json = Vec::new();
904    for row in daily_trends {
905        let date: chrono::NaiveDate = row.get("date");
906        let total: i64 = row.get("total");
907        let critical: i64 = row.get("critical");
908        let warning: i64 = row.get("warning");
909        let info: i64 = row.get("info");
910        
911        let day_data = serde_json::json!({
912            "date": date.format("%Y-%m-%d").to_string(),
913            "total": total,
914            "critical": critical,
915            "warning": warning,
916            "info": info
917        });
918        
919        daily_json.push(day_data);
920    }
921    
922    // Combine all statistics
923    let stats = serde_json::json!({
924        "by_severity": severity_json,
925        "by_status": status_json,
926        "by_service": service_json,
927        "daily_trends": daily_json,
928        "period_days": days
929    });
930    
931    Ok(stats)
932}
933
934/// Retrieves alerts that need escalation based on age and status.
935///
936/// This function identifies alerts that have been active for longer than a specified
937/// threshold period without being acknowledged or resolved, which may indicate
938/// they need to be escalated to ensure appropriate attention.
939///
940/// # Arguments
941///
942/// * `pool` - Database connection pool for executing the query
943/// * `org_id` - Optional organization ID to filter alerts
944/// * `hours_threshold` - Age in hours after which an alert should be considered for escalation
945///
946/// # Returns
947///
948/// * `Ok(Vec<Alert>)` - List of alerts needing escalation
949/// * `Err(anyhow::Error)` - Failed to fetch alerts
950pub async fn get_alerts_needing_escalation(
951    pool: &Pool<MySql>,
952    org_id: Option<i64>,
953    hours_threshold: i64,
954) -> anyhow::Result<Vec<Alert>> {
955    // Start building the query
956    let mut query_string = String::from(
957        r#"
958        SELECT * FROM alerts
959        WHERE status = 'active'
960          AND timestamp <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? HOUR)
961        "#
962    );
963    
964    // Add organization filter if provided
965    if let Some(_) = org_id {
966        query_string.push_str(" AND org_id = ?");
967    }
968    
969    // Add order by
970    query_string.push_str(
971        r#"
972        ORDER BY 
973            CASE 
974                WHEN severity = 'critical' THEN 1
975                WHEN severity = 'warning' THEN 2
976                ELSE 3
977            END,
978            timestamp ASC
979        "#
980    );
981    
982    // Build and execute query
983    let mut query = sqlx::query_as::<_, Alert>(&query_string)
984        .bind(hours_threshold);
985    
986    if let Some(id) = org_id {
987        query = query.bind(id);
988    }
989    
990    let alerts = query
991        .fetch_all(pool)
992        .await
993        .context("Failed to fetch alerts needing escalation")?;
994    
995    Ok(alerts)
996}
997
998/// Auto-resolves alerts that have been active for longer than a specified period.
999///
1000/// This function updates the status of old alerts to 'auto_resolved' based on criteria
1001/// such as age, severity, and current status. It's typically used for housekeeping
1002/// to prevent the accumulation of stale alerts.
1003///
1004/// # Arguments
1005///
1006/// * `pool` - Database connection pool for executing the query
1007/// * `days_threshold` - Age in days after which an alert should be auto-resolved
1008/// * `severity_levels` - Optional vector of severity levels to include (e.g., only auto-resolve 'info' alerts)
1009///
1010/// # Returns
1011///
1012/// * `Ok(i64)` - Number of alerts auto-resolved
1013/// * `Err(anyhow::Error)` - Failed to auto-resolve alerts
1014pub async fn auto_resolve_old_alerts(
1015    pool: &Pool<MySql>,
1016    days_threshold: i64,
1017    severity_levels: Option<Vec<&str>>,
1018) -> anyhow::Result<i64> {
1019    // Begin transaction
1020    let mut tx = pool.begin().await?;
1021    
1022    // Build base query
1023    let mut query_string = String::from(
1024        r#"
1025        UPDATE alerts
1026        SET status = 'auto_resolved',
1027            resolved_at = CURRENT_TIMESTAMP
1028        WHERE 
1029            status IN ('active', 'acknowledged')
1030            AND timestamp <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
1031        "#
1032    );
1033    
1034    // Add severity filter if provided
1035    if let Some(levels) = &severity_levels {
1036        if !levels.is_empty() {
1037            query_string.push_str(" AND severity IN (");
1038            query_string.push_str(&std::iter::repeat("?")
1039                .take(levels.len())
1040                .collect::<Vec<_>>()
1041                .join(", "));
1042            query_string.push_str(")");
1043        }
1044    }
1045    
1046    // Build query
1047    let mut query = sqlx::query(&query_string)
1048        .bind(days_threshold);
1049    
1050    // Bind severity levels if provided
1051    if let Some(levels) = severity_levels {
1052        for level in levels {
1053            query = query.bind(level);
1054        }
1055    }
1056    
1057    // Execute update
1058    let result = query
1059        .execute(&mut *tx)
1060        .await
1061        .context("Failed to auto-resolve old alerts")?;
1062    
1063    // Get the affected alerts to create history records
1064    let affected_alerts = sqlx::query_as::<_, Alert>(
1065        r#"
1066        SELECT * FROM alerts
1067        WHERE 
1068            status = 'auto_resolved'
1069            AND resolved_at >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 1 MINUTE)
1070        "#
1071    )
1072    .fetch_all(&mut *tx)
1073    .await
1074    .context("Failed to fetch auto-resolved alerts")?;
1075    
1076    // Create history records for each affected alert
1077    for alert in &affected_alerts {
1078        sqlx::query(
1079            r#"
1080            INSERT INTO alert_history (
1081                alert_id, action, performed_at, new_state, notes
1082            ) VALUES (?, 'auto_resolved', CURRENT_TIMESTAMP, ?, 'Alert auto-resolved due to age')
1083            "#
1084        )
1085        .bind(alert.id)
1086        .bind(serde_json::to_value(&alert).unwrap_or(serde_json::Value::Null))
1087        .execute(&mut *tx)
1088        .await
1089        .context("Failed to create history record for auto-resolved alert")?;
1090    }
1091    
1092    // Commit transaction
1093    tx.commit().await?;
1094    
1095    Ok(result.rows_affected() as i64)
1096}
1097
1098/// Retrieves alerts that match a specific search query.
1099///
1100/// This function searches for alerts containing specific text in their message,
1101/// type, or service fields. It's useful for implementing search functionality
1102/// in the alerts UI.
1103///
1104/// # Arguments
1105///
1106/// * `pool` - Database connection pool for executing the query
1107/// * `search_query` - Text to search for in alert fields
1108/// * `org_id` - Optional organization ID to filter alerts
1109/// * `page` - Zero-based page number for pagination
1110/// * `per_page` - Number of records per page
1111///
1112/// # Returns
1113///
1114/// * `Ok(Vec<Alert>)` - List of matching alerts
1115/// * `Err(anyhow::Error)` - Failed to search for alerts
1116pub async fn search_alerts(
1117    pool: &Pool<MySql>,
1118    search_query: &str,
1119    org_id: Option<i64>,
1120    page: i64,
1121    per_page: i64,
1122) -> anyhow::Result<Vec<Alert>> {
1123    // Prepare search pattern
1124    let pattern = format!("%{}%", search_query);
1125    
1126    // Start building the query
1127    let mut query_string = String::from(
1128        r#"
1129        SELECT * FROM alerts
1130        WHERE (
1131            message LIKE ? OR
1132            alert_type LIKE ? OR
1133            service LIKE ?
1134        )
1135        "#
1136    );
1137    
1138    // Add organization filter if provided
1139    if let Some(_) = org_id {
1140        query_string.push_str(" AND org_id = ?");
1141    }
1142    
1143    // Add order by and pagination
1144    query_string.push_str(
1145        r#"
1146        ORDER BY timestamp DESC
1147        LIMIT ? OFFSET ?
1148        "#
1149    );
1150    
1151    // Build and execute query
1152    let mut query = sqlx::query_as::<_, Alert>(&query_string)
1153        .bind(&pattern)
1154        .bind(&pattern)
1155        .bind(&pattern);
1156    
1157    if let Some(id) = org_id {
1158        query = query.bind(id);
1159    }
1160    
1161    query = query
1162        .bind(per_page)
1163        .bind(page * per_page);
1164    
1165    let alerts = query
1166        .fetch_all(pool)
1167        .await
1168        .context("Failed to search alerts")?;
1169    
1170    Ok(alerts)
1171}
1172
1173/// Gets the count of matching alerts for a search query.
1174///
1175/// This function counts the number of alerts that match a specific search query,
1176/// which is useful for pagination in search results.
1177///
1178/// # Arguments
1179///
1180/// * `pool` - Database connection pool for executing the query
1181/// * `search_query` - Text to search for in alert fields
1182/// * `org_id` - Optional organization ID to filter alerts
1183///
1184/// # Returns
1185///
1186/// * `Ok(i64)` - Count of matching alerts
1187/// * `Err(anyhow::Error)` - Failed to count alerts
1188pub async fn count_search_alerts(
1189    pool: &Pool<MySql>,
1190    search_query: &str,
1191    org_id: Option<i64>,
1192) -> anyhow::Result<i64> {
1193    // Prepare search pattern
1194    let pattern = format!("%{}%", search_query);
1195    
1196    // Start building the query
1197    let mut query_string = String::from(
1198        r#"
1199        SELECT COUNT(*) FROM alerts
1200        WHERE (
1201            message LIKE ? OR
1202            alert_type LIKE ? OR
1203            service LIKE ?
1204        )
1205        "#
1206    );
1207    
1208    // Add organization filter if provided
1209    if let Some(_) = org_id {
1210        query_string.push_str(" AND org_id = ?");
1211    }
1212    
1213    // Build and execute query
1214    let mut query = sqlx::query_scalar::<_, i64>(&query_string)
1215        .bind(&pattern)
1216        .bind(&pattern)
1217        .bind(&pattern);
1218    
1219    if let Some(id) = org_id {
1220        query = query.bind(id);
1221    }
1222    
1223    let count = query
1224        .fetch_one(pool)
1225        .await
1226        .context("Failed to count search alerts")?;
1227    
1228    Ok(count)
1229}
1230
1231/// Bulk updates the status of multiple alerts.
1232///
1233/// This function changes the status of multiple alerts at once based on provided criteria,
1234/// which is useful for operations like resolving all alerts for a specific service or application.
1235///
1236/// # Arguments
1237///
1238/// * `pool` - Database connection pool for executing the query
1239/// * `ids` - Optional vector of alert IDs to update
1240/// * `service` - Optional service name to filter alerts
1241/// * `app_id` - Optional application ID to filter alerts
1242/// * `new_status` - Status to set for matching alerts
1243/// * `user_id` - ID of the user performing the bulk update
1244/// * `notes` - Optional notes about the bulk update
1245///
1246/// # Returns
1247///
1248/// * `Ok(i64)` - Number of alerts updated
1249/// * `Err(anyhow::Error)` - Failed to update alerts
1250pub async fn bulk_update_alert_status(
1251    pool: &Pool<MySql>,
1252    ids: Option<Vec<i64>>,
1253    service: Option<&str>,
1254    app_id: Option<i64>,
1255    new_status: &str,
1256    user_id: i64,
1257    notes: Option<&str>,
1258) -> anyhow::Result<i64> {
1259    // Validate that at least one filter is provided
1260    if ids.is_none() && service.is_none() && app_id.is_none() {
1261        return Err(anyhow::anyhow!("At least one filter (ids, service, or app_id) must be provided"));
1262    }
1263    
1264    // Begin transaction
1265    let mut tx = pool.begin().await?;
1266    
1267    // First, get all alerts that will be affected to create history records
1268    let mut select_query_string = String::from("SELECT * FROM alerts WHERE status IN ('active', 'acknowledged')");
1269    
1270    if let Some(alert_ids) = &ids {
1271        if !alert_ids.is_empty() {
1272            select_query_string.push_str(" AND id IN (");
1273            select_query_string.push_str(&std::iter::repeat("?")
1274                .take(alert_ids.len())
1275                .collect::<Vec<_>>()
1276                .join(", "));
1277            select_query_string.push_str(")");
1278        }
1279    }
1280    
1281    if let Some(_) = service {
1282        select_query_string.push_str(" AND service = ?");
1283    }
1284    
1285    if let Some(_) = app_id {
1286        select_query_string.push_str(" AND app_id = ?");
1287    }
1288    
1289    // Build select query
1290    let mut select_query = sqlx::query_as::<_, Alert>(&select_query_string);
1291    
1292    // Bind parameters
1293    if let Some(alert_ids) = &ids {
1294        for id in alert_ids {
1295            select_query = select_query.bind(*id);
1296        }
1297    }
1298    
1299    if let Some(s) = service {
1300        select_query = select_query.bind(s);
1301    }
1302    
1303    if let Some(id) = app_id {
1304        select_query = select_query.bind(id);
1305    }
1306    
1307    // Execute select query
1308    let affected_alerts = select_query
1309        .fetch_all(&mut *tx)
1310        .await
1311        .context("Failed to fetch alerts for bulk update")?;
1312    
1313    // If no alerts match the criteria, return early
1314    if affected_alerts.is_empty() {
1315        return Ok(0);
1316    }
1317    
1318    // Now prepare the update query
1319    let mut update_query_string = String::from(
1320        "UPDATE alerts SET status = ?"
1321    );
1322    
1323    // Add resolved_at and resolved_by if applicable
1324    if new_status == "resolved" || new_status == "auto_resolved" {
1325        update_query_string.push_str(", resolved_at = CURRENT_TIMESTAMP, resolved_by = ?");
1326    }
1327    
1328    // Add WHERE clause
1329    update_query_string.push_str(" WHERE status IN ('active', 'acknowledged')");
1330    
1331    if let Some(alert_ids) = &ids {
1332        if !alert_ids.is_empty() {
1333            update_query_string.push_str(" AND id IN (");
1334            update_query_string.push_str(&std::iter::repeat("?")
1335                .take(alert_ids.len())
1336                .collect::<Vec<_>>()
1337                .join(", "));
1338            update_query_string.push_str(")");
1339        }
1340    }
1341    
1342    if let Some(_) = service {
1343        update_query_string.push_str(" AND service = ?");
1344    }
1345    
1346    if let Some(_) = app_id {
1347        update_query_string.push_str(" AND app_id = ?");
1348    }
1349    
1350    // Build update query
1351    let mut update_query = sqlx::query(&update_query_string)
1352        .bind(new_status);
1353    
1354    // Bind resolved_by if applicable
1355    if new_status == "resolved" || new_status == "auto_resolved" {
1356        update_query = update_query.bind(user_id);
1357    }
1358    
1359    // Bind filter parameters
1360    if let Some(alert_ids) = &ids {
1361        for id in alert_ids {
1362            update_query = update_query.bind(*id);
1363        }
1364    }
1365    
1366    if let Some(s) = service {
1367        update_query = update_query.bind(s);
1368    }
1369    
1370    if let Some(id) = app_id {
1371        update_query = update_query.bind(id);
1372    }
1373    
1374    // Execute update
1375    let update_result = update_query
1376        .execute(&mut *tx)
1377        .await
1378        .context("Failed to bulk update alert status")?;
1379    
1380    // Create history records for each affected alert
1381    for alert in &affected_alerts {
1382        // Create a new Alert instance with updated fields
1383        let updated_alert = Alert {
1384            id: alert.id,
1385            alert_type: alert.alert_type.clone(),
1386            severity: alert.severity.clone(),
1387            service: alert.service.clone(),
1388            message: alert.message.clone(),
1389            timestamp: alert.timestamp,
1390            status: new_status.to_string(),
1391            metadata: alert.metadata.clone(),
1392            org_id: alert.org_id,
1393            app_id: alert.app_id,
1394            instance_id: alert.instance_id,
1395            region_id: alert.region_id,
1396            node_id: alert.node_id,
1397            resolved_at: if new_status == "resolved" || new_status == "auto_resolved" {
1398                Some(chrono::Utc::now())
1399            } else {
1400                alert.resolved_at
1401            },
1402            resolved_by: if new_status == "resolved" || new_status == "auto_resolved" {
1403                Some(user_id)
1404            } else {
1405                alert.resolved_by
1406            }
1407        };
1408        
1409        // Add history record
1410        sqlx::query(
1411            r#"
1412            INSERT INTO alert_history (
1413                alert_id, action, performed_by, performed_at, 
1414                previous_state, new_state, notes
1415            ) VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?, ?, ?)
1416            "#
1417        )
1418        .bind(alert.id)
1419        .bind(format!("bulk_status_change_to_{}", new_status))
1420        .bind(user_id)
1421        .bind(serde_json::to_value(&alert).unwrap_or(serde_json::Value::Null))
1422        .bind(serde_json::to_value(&updated_alert).unwrap_or(serde_json::Value::Null))
1423        .bind(notes)
1424        .execute(&mut *tx)
1425        .await
1426        .context("Failed to create history record for bulk update")?;
1427        
1428        // If acknowledging, create acknowledgment records
1429        if new_status == "acknowledged" {
1430            sqlx::query(
1431                r#"
1432                INSERT INTO alert_acknowledgments (
1433                    alert_id, user_id, acknowledged_at, notes
1434                ) VALUES (?, ?, CURRENT_TIMESTAMP, ?)
1435                "#
1436            )
1437            .bind(alert.id)
1438            .bind(user_id)
1439            .bind(notes)
1440            .execute(&mut *tx)
1441            .await
1442            .context("Failed to create acknowledgment record for bulk update")?;
1443        }
1444    }
1445    
1446    // Commit transaction
1447    tx.commit().await?;
1448    
1449    Ok(update_result.rows_affected() as i64)
1450}