omni_orchestrator/schemas/v1/db/queries/
alert.rs

1use anyhow::Context;
2use serde::Serialize;
3use sqlx::{MySql, Pool, Row};
4use chrono::{DateTime, Utc};
5use serde_json::Value as JsonValue;
6
7use libomni::types::db::v1 as types;
8use types::alert::{
9    Alert, AlertWithAcknowledgments, AlertAcknowledgment, 
10    AlertEscalation, AlertHistory, AlertWithRelatedData
11};
12
13// =================== Alert Management ===================
14
15/// Retrieves a paginated list of alerts from the database.
16///
17/// This function fetches a subset of alerts based on pagination parameters,
18/// ordering them by timestamp in descending order (newest first). Filtering
19/// options allow for narrowing down results by various criteria.
20///
21/// # Arguments
22///
23/// * `pool` - Database connection pool for executing the query
24/// * `page` - Zero-based page number (e.g., 0 for first page, 1 for second page)
25/// * `per_page` - Number of records to fetch per page
26/// * `status` - Optional filter for alert status
27/// * `severity` - Optional filter for alert severity
28/// * `org_id` - Optional filter for organization ID
29/// * `app_id` - Optional filter for application ID
30/// * `service` - Optional filter for service name
31/// * `from_date` - Optional filter for alerts after this timestamp
32/// * `to_date` - Optional filter for alerts before this timestamp
33///
34/// # Returns
35///
36/// * `Ok(Vec<Alert>)` - Successfully retrieved list of alerts
37/// * `Err(anyhow::Error)` - Failed to fetch alerts, with context
38pub async fn list_alerts(
39    pool: &Pool<MySql>,
40    page: i64,
41    per_page: i64,
42    status: Option<&str>,
43    severity: Option<&str>,
44    org_id: Option<i64>,
45    app_id: Option<i64>,
46    service: Option<&str>,
47    from_date: Option<DateTime<Utc>>,
48    to_date: Option<DateTime<Utc>>,
49) -> anyhow::Result<Vec<Alert>> {
50    println!("Attempting to fetch alerts from database with filtering...");
51
52    // Start building the query with base selection
53    let mut query_string = String::from(
54        "SELECT * FROM alerts WHERE 1=1"
55    );
56    
57    // Add optional filters
58    if let Some(s) = status {
59        query_string.push_str(" AND status = ?");
60    }
61    if let Some(s) = severity {
62        query_string.push_str(" AND severity = ?");
63    }
64    if let Some(_) = org_id {
65        query_string.push_str(" AND org_id = ?");
66    }
67    if let Some(_) = app_id {
68        query_string.push_str(" AND app_id = ?");
69    }
70    if let Some(s) = service {
71        query_string.push_str(" AND service = ?");
72    }
73    if let Some(_) = from_date {
74        query_string.push_str(" AND timestamp >= ?");
75    }
76    if let Some(_) = to_date {
77        query_string.push_str(" AND timestamp <= ?");
78    }
79    
80    // Add order and limit
81    query_string.push_str(" ORDER BY timestamp DESC LIMIT ? OFFSET ?");
82    
83    // Build the query
84    let mut query = sqlx::query_as::<_, Alert>(&query_string);
85    
86    // Bind parameters in the same order
87    if let Some(s) = status {
88        query = query.bind(s);
89    }
90    if let Some(s) = severity {
91        query = query.bind(s);
92    }
93    if let Some(id) = org_id {
94        query = query.bind(id);
95    }
96    if let Some(id) = app_id {
97        query = query.bind(id);
98    }
99    if let Some(s) = service {
100        query = query.bind(s);
101    }
102    if let Some(date) = from_date {
103        query = query.bind(date);
104    }
105    if let Some(date) = to_date {
106        query = query.bind(date);
107    }
108    
109    // Bind pagination params
110    query = query.bind(per_page).bind(page * per_page);
111    
112    // Execute the query
113    let result = query.fetch_all(pool).await;
114
115    match result {
116        Ok(alerts) => {
117            println!("Successfully fetched {} alerts", alerts.len());
118            Ok(alerts)
119        }
120        Err(e) => {
121            eprintln!("Error fetching alerts: {:#?}", e);
122            Err(anyhow::Error::new(e).context("Failed to fetch alerts"))
123        }
124    }
125}
126
127/// Retrieves a specific alert by its ID, along with related acknowledgments and escalations.
128///
129/// This function fetches a single alert record along with its associated acknowledgments,
130/// escalations, and history records to provide comprehensive information about the alert.
131///
132/// # Arguments
133///
134/// * `pool` - Database connection pool for executing the query
135/// * `id` - Unique identifier of the alert to retrieve
136///
137/// # Returns
138///
139/// * `Ok(AlertWithRelatedData)` - Successfully retrieved alert with related data
140/// * `Err(anyhow::Error)` - Failed to fetch alert or related data
141pub async fn get_alert_with_related_data(
142    pool: &Pool<MySql>,
143    id: i64,
144) -> anyhow::Result<AlertWithRelatedData> {
145    // First fetch the alert
146    let alert = sqlx::query_as::<_, Alert>("SELECT * FROM alerts WHERE id = ?")
147        .bind(id)
148        .fetch_one(pool)
149        .await
150        .context("Failed to fetch alert")?;
151        
152    // Fetch acknowledgments
153    let acknowledgments = sqlx::query_as::<_, AlertAcknowledgment>(
154        "SELECT * FROM alert_acknowledgments WHERE alert_id = ? ORDER BY acknowledged_at DESC"
155    )
156    .bind(id)
157    .fetch_all(pool)
158    .await
159    .context("Failed to fetch alert acknowledgments")?;
160    
161    // Fetch escalations
162    let escalations = sqlx::query_as::<_, AlertEscalation>(
163        "SELECT * FROM alert_escalations WHERE alert_id = ? ORDER BY escalated_at DESC"
164    )
165    .bind(id)
166    .fetch_all(pool)
167    .await
168    .context("Failed to fetch alert escalations")?;
169    
170    // Fetch history
171    let history = sqlx::query_as::<_, AlertHistory>(
172        "SELECT * FROM alert_history WHERE alert_id = ? ORDER BY performed_at DESC"
173    )
174    .bind(id)
175    .fetch_all(pool)
176    .await
177    .context("Failed to fetch alert history")?;
178    
179    Ok(AlertWithRelatedData {
180        alert,
181        acknowledgments,
182        escalations,
183        history,
184    })
185}
186
187/// Counts the total number of alerts in the database with optional filtering.
188///
189/// This function retrieves the total count of alerts that match the provided filter criteria,
190/// which is useful for pagination and reporting purposes.
191///
192/// # Arguments
193///
194/// * `pool` - Database connection pool for executing the query
195/// * `status` - Optional filter for alert status
196/// * `severity` - Optional filter for alert severity
197/// * `org_id` - Optional filter for organization ID
198/// * `app_id` - Optional filter for application ID
199///
200/// # Returns
201///
202/// * `Ok(i64)` - Successfully retrieved count of alerts
203/// * `Err(anyhow::Error)` - Failed to count alerts
204pub async fn count_alerts(
205    pool: &Pool<MySql>,
206    status: Option<&str>,
207    severity: Option<&str>,
208    org_id: Option<i64>,
209    app_id: Option<i64>,
210) -> anyhow::Result<i64> {
211    // Start building the query
212    let mut query_string = String::from("SELECT COUNT(*) FROM alerts WHERE 1=1");
213    
214    // Add optional filters
215    if let Some(_) = status {
216        query_string.push_str(" AND status = ?");
217    }
218    if let Some(_) = severity {
219        query_string.push_str(" AND severity = ?");
220    }
221    if let Some(_) = org_id {
222        query_string.push_str(" AND org_id = ?");
223    }
224    if let Some(_) = app_id {
225        query_string.push_str(" AND app_id = ?");
226    }
227    
228    // Build the query
229    let mut query = sqlx::query_scalar::<_, i64>(&query_string);
230    
231    // Bind parameters in the same order
232    if let Some(s) = status {
233        query = query.bind(s);
234    }
235    if let Some(s) = severity {
236        query = query.bind(s);
237    }
238    if let Some(id) = org_id {
239        query = query.bind(id);
240    }
241    if let Some(id) = app_id {
242        query = query.bind(id);
243    }
244    
245    // Execute the query
246    let count = query
247        .fetch_one(pool)
248        .await
249        .context("Failed to count alerts")?;
250
251    Ok(count)
252}
253
254/// Creates a new alert in the database.
255///
256/// This function inserts a new alert record with the provided parameters and
257/// adds an initial history record to document the alert creation.
258///
259/// # Arguments
260///
261/// * `pool` - Database connection pool for executing the query
262/// * `alert_type` - Type of alert (e.g., "cpu_usage", "memory_usage", "disk_space")
263/// * `severity` - Severity level of the alert (critical, warning, info)
264/// * `service` - Name of the service that generated the alert
265/// * `message` - Alert message text describing the issue
266/// * `metadata` - Optional JSON data with additional alert details
267/// * `org_id` - Optional organization ID related to the alert
268/// * `app_id` - Optional application ID related to the alert
269/// * `instance_id` - Optional instance ID related to the alert
270/// * `region_id` - Optional region ID related to the alert
271/// * `node_id` - Optional node/worker ID related to the alert
272///
273/// # Returns
274///
275/// * `Ok(Alert)` - Successfully created alert, including database-assigned fields
276/// * `Err(anyhow::Error)` - Failed to create alert
277pub async fn create_alert(
278    pool: &Pool<MySql>,
279    alert_type: &str,
280    severity: &str,
281    service: &str,
282    message: &str,
283    metadata: Option<JsonValue>,
284    org_id: Option<i64>,
285    app_id: Option<i64>,
286    instance_id: Option<i64>,
287    region_id: Option<i64>,
288    node_id: Option<i64>,
289) -> anyhow::Result<Alert> {
290    // Begin transaction
291    let mut tx = pool.begin().await?;
292
293    // Insert alert
294    let alert = sqlx::query_as::<_, Alert>(
295        r#"INSERT INTO alerts (
296            alert_type, severity, service, message, timestamp, status,
297            metadata, org_id, app_id, instance_id, region_id, node_id
298        ) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, 'active', ?, ?, ?, ?, ?, ?)"#,
299    )
300    .bind(alert_type)
301    .bind(severity)
302    .bind(service)
303    .bind(message)
304    .bind(metadata)
305    .bind(org_id)
306    .bind(app_id)
307    .bind(instance_id)
308    .bind(region_id)
309    .bind(node_id)
310    .fetch_one(&mut *tx)
311    .await
312    .context("Failed to create alert")?;
313    
314    // Add history record for alert creation
315    sqlx::query(
316        r#"INSERT INTO alert_history (
317            alert_id, action, performed_at, previous_state, new_state
318        ) VALUES (?, 'created', CURRENT_TIMESTAMP, NULL, ?)"#,
319    )
320    .bind(alert.id)
321    .bind(serde_json::to_value(&alert).unwrap_or(serde_json::Value::Null))
322    .execute(&mut *tx)
323    .await
324    .context("Failed to create alert history record")?;
325
326    // Commit transaction
327    tx.commit().await?;
328
329    // Return newly created alert
330    Ok(alert)
331}
332
333/// Updates the status of an alert.
334///
335/// This function changes the status of an alert and records the change
336/// in the alert history table. It can mark alerts as acknowledged, resolved,
337/// or auto-resolved based on system or user actions.
338///
339/// # Arguments
340///
341/// * `pool` - Database connection pool for executing the query
342/// * `id` - Unique identifier of the alert to update
343/// * `new_status` - New status for the alert (active, acknowledged, resolved, auto_resolved)
344/// * `user_id` - Optional ID of the user who performed the action
345/// * `notes` - Optional notes about the status change
346///
347/// # Returns
348///
349/// * `Ok(Alert)` - Successfully updated alert
350/// * `Err(anyhow::Error)` - Failed to update alert
351pub async fn update_alert_status(
352    pool: &Pool<MySql>,
353    id: i64,
354    new_status: &str,
355    user_id: Option<i64>,
356    notes: Option<&str>,
357) -> anyhow::Result<Alert> {
358    // Begin transaction
359    let mut tx = pool.begin().await?;
360    
361    // Get current alert state
362    let current_alert = sqlx::query_as::<_, Alert>("SELECT * FROM alerts WHERE id = ?")
363        .bind(id)
364        .fetch_one(&mut *tx)
365        .await
366        .context("Failed to fetch current alert state")?;
367    
368    // Prepare update query with resolved_at and resolved_by if needed
369    let (query, resolved_at, resolved_by) = if new_status == "resolved" || new_status == "auto_resolved" {
370        (
371            "UPDATE alerts SET status = ?, resolved_at = CURRENT_TIMESTAMP, resolved_by = ? WHERE id = ?",
372            Some(chrono::Utc::now()),
373            user_id,
374        )
375    } else {
376        (
377            "UPDATE alerts SET status = ? WHERE id = ?",
378            None,
379            None,
380        )
381    };
382    
383    // Execute update based on query type
384    let updated_alert = if new_status == "resolved" || new_status == "auto_resolved" {
385        sqlx::query_as::<_, Alert>(query)
386            .bind(new_status)
387            .bind(user_id)
388            .bind(id)
389            .fetch_one(&mut *tx)
390            .await
391            .context("Failed to update alert status")?
392    } else {
393        sqlx::query_as::<_, Alert>(query)
394            .bind(new_status)
395            .bind(id)
396            .fetch_one(&mut *tx)
397            .await
398            .context("Failed to update alert status")?
399    };
400    
401    // Add history record
402    sqlx::query(
403        r#"INSERT INTO alert_history (
404            alert_id, action, performed_by, performed_at, 
405            previous_state, new_state, notes
406        ) VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
407    )
408    .bind(id)
409    .bind(format!("status_change_to_{}", new_status))
410    .bind(user_id)
411    .bind(serde_json::to_value(&current_alert).unwrap_or(serde_json::Value::Null))
412    .bind(serde_json::to_value(&updated_alert).unwrap_or(serde_json::Value::Null))
413    .bind(notes)
414    .execute(&mut *tx)
415    .await
416    .context("Failed to create alert history record for status update")?;
417    
418    // Create acknowledgment record if acknowledging
419    if new_status == "acknowledged" && user_id.is_some() {
420        sqlx::query(
421            r#"INSERT INTO alert_acknowledgments (
422                alert_id, user_id, acknowledged_at, notes
423            ) VALUES (?, ?, CURRENT_TIMESTAMP, ?)"#,
424        )
425        .bind(id)
426        .bind(user_id.unwrap())
427        .bind(notes)
428        .execute(&mut *tx)
429        .await
430        .context("Failed to create alert acknowledgment record")?;
431    }
432
433    // Commit transaction
434    tx.commit().await?;
435    
436    Ok(updated_alert)
437}
438
439/// Acknowledges an alert by a specific user.
440///
441/// This function creates an acknowledgment record for the alert and
442/// optionally updates the alert status to 'acknowledged' if it's currently 'active'.
443///
444/// # Arguments
445///
446/// * `pool` - Database connection pool for executing the query
447/// * `alert_id` - ID of the alert to acknowledge
448/// * `user_id` - ID of the user acknowledging the alert
449/// * `notes` - Optional notes about the acknowledgment
450/// * `update_status` - Whether to update the alert status to 'acknowledged'
451///
452/// # Returns
453///
454/// * `Ok(AlertAcknowledgment)` - Successfully created acknowledgment
455/// * `Err(anyhow::Error)` - Failed to acknowledge alert
456pub async fn acknowledge_alert(
457    pool: &Pool<MySql>,
458    alert_id: i64,
459    user_id: i64,
460    notes: Option<&str>,
461    update_status: bool,
462) -> anyhow::Result<AlertAcknowledgment> {
463    // Begin transaction
464    let mut tx = pool.begin().await?;
465    
466    // Create acknowledgment
467    let acknowledgment = sqlx::query_as::<_, AlertAcknowledgment>(
468        r#"INSERT INTO alert_acknowledgments (
469            alert_id, user_id, acknowledged_at, notes
470        ) VALUES (?, ?, CURRENT_TIMESTAMP, ?)"#,
471    )
472    .bind(alert_id)
473    .bind(user_id)
474    .bind(notes)
475    .fetch_one(&mut *tx)
476    .await
477    .context("Failed to create alert acknowledgment")?;
478    
479    // Update alert status if requested
480    if update_status {
481        // Fetch current alert status
482        let current_alert = sqlx::query_as::<_, Alert>("SELECT * FROM alerts WHERE id = ?")
483            .bind(alert_id)
484            .fetch_one(&mut *tx)
485            .await
486            .context("Failed to fetch current alert state")?;
487        
488        // Only update if currently active
489        if current_alert.status == "active" {
490            // Update status
491            let updated_alert = sqlx::query_as::<_, Alert>(
492                "UPDATE alerts SET status = 'acknowledged' WHERE id = ?"
493            )
494            .bind(alert_id)
495            .fetch_one(&mut *tx)
496            .await
497            .context("Failed to update alert status to acknowledged")?;
498            
499            // Add history record
500            sqlx::query(
501                r#"INSERT INTO alert_history (
502                    alert_id, action, performed_by, performed_at, 
503                    previous_state, new_state, notes
504                ) VALUES (?, 'status_change_to_acknowledged', ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
505            )
506            .bind(alert_id)
507            .bind(user_id)
508            .bind(serde_json::to_value(&current_alert).unwrap_or(serde_json::Value::Null))
509            .bind(serde_json::to_value(&updated_alert).unwrap_or(serde_json::Value::Null))
510            .bind(notes)
511            .execute(&mut *tx)
512            .await
513            .context("Failed to create alert history record for acknowledgment")?;
514        }
515    }
516    
517    // Commit transaction
518    tx.commit().await?;
519    
520    Ok(acknowledgment)
521}
522
523/// Resolves an alert by a specific user.
524///
525/// This function updates the alert status to 'resolved', sets the resolved_at timestamp
526/// and resolved_by user ID, and creates a history record for the resolution.
527///
528/// # Arguments
529///
530/// * `pool` - Database connection pool for executing the query
531/// * `id` - ID of the alert to resolve
532/// * `user_id` - ID of the user resolving the alert
533/// * `notes` - Optional notes about the resolution
534///
535/// # Returns
536///
537/// * `Ok(Alert)` - Successfully resolved alert
538/// * `Err(anyhow::Error)` - Failed to resolve alert
539pub async fn resolve_alert(
540    pool: &Pool<MySql>,
541    id: i64,
542    user_id: i64,
543    notes: Option<&str>,
544) -> anyhow::Result<Alert> {
545    // Begin transaction
546    let mut tx = pool.begin().await?;
547    
548    // Get current alert state
549    let current_alert = sqlx::query_as::<_, Alert>("SELECT * FROM alerts WHERE id = ?")
550        .bind(id)
551        .fetch_one(&mut *tx)
552        .await
553        .context("Failed to fetch current alert state")?;
554    
555    // Update alert
556    let updated_alert = sqlx::query_as::<_, Alert>(
557        r#"UPDATE alerts 
558           SET status = 'resolved', 
559               resolved_at = CURRENT_TIMESTAMP, 
560               resolved_by = ? 
561           WHERE id = ?"#,
562    )
563    .bind(user_id)
564    .bind(id)
565    .fetch_one(&mut *tx)
566    .await
567    .context("Failed to resolve alert")?;
568    
569    // Add history record
570    sqlx::query(
571        r#"INSERT INTO alert_history (
572            alert_id, action, performed_by, performed_at, 
573            previous_state, new_state, notes
574        ) VALUES (?, 'resolved', ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
575    )
576    .bind(id)
577    .bind(user_id)
578    .bind(serde_json::to_value(&current_alert).unwrap_or(serde_json::Value::Null))
579    .bind(serde_json::to_value(&updated_alert).unwrap_or(serde_json::Value::Null))
580    .bind(notes)
581    .execute(&mut *tx)
582    .await
583    .context("Failed to create alert history record for resolution")?;
584    
585    // Commit transaction
586    tx.commit().await?;
587    
588    Ok(updated_alert)
589}
590
591/// Creates an escalation record for an alert.
592///
593/// This function adds an escalation record to indicate that an alert has been
594/// escalated to another level of attention, such as notifying administrators
595/// or external systems when an alert has not been addressed in a timely manner.
596///
597/// # Arguments
598///
599/// * `pool` - Database connection pool for executing the query
600/// * `alert_id` - ID of the alert being escalated
601/// * `escalation_level` - Level of escalation (typically 1, 2, 3, etc.)
602/// * `escalated_to` - JSON data specifying where/who the alert was escalated to
603/// * `escalation_method` - Method used for escalation (email, SMS, webhook, etc.)
604/// * `response_required_by` - Optional deadline for when a response is required
605///
606/// # Returns
607///
608/// * `Ok(AlertEscalation)` - Successfully created escalation record
609/// * `Err(anyhow::Error)` - Failed to create escalation record
610pub async fn create_alert_escalation(
611    pool: &Pool<MySql>,
612    alert_id: i64,
613    escalation_level: i64,
614    escalated_to: JsonValue,
615    escalation_method: &str,
616    response_required_by: Option<DateTime<Utc>>,
617) -> anyhow::Result<AlertEscalation> {
618    // Begin transaction
619    let mut tx = pool.begin().await?;
620    
621    // Create escalation record
622    let escalation = sqlx::query_as::<_, AlertEscalation>(
623        r#"INSERT INTO alert_escalations (
624            alert_id, escalation_level, escalated_at, 
625            escalated_to, escalation_method, response_required_by
626        ) VALUES (?, ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
627    )
628    .bind(alert_id)
629    .bind(escalation_level)
630    .bind(escalated_to)
631    .bind(escalation_method)
632    .bind(response_required_by)
633    .fetch_one(&mut *tx)
634    .await
635    .context("Failed to create alert escalation")?;
636    
637    // Add history record
638    sqlx::query(
639        r#"INSERT INTO alert_history (
640            alert_id, action, performed_at, new_state
641        ) VALUES (?, ?, CURRENT_TIMESTAMP, ?)"#,
642    )
643    .bind(alert_id)
644    .bind(format!("escalated_level_{}", escalation_level))
645    .bind(serde_json::to_value(&escalation).unwrap_or(serde_json::Value::Null))
646    .execute(&mut *tx)
647    .await
648    .context("Failed to create alert history record for escalation")?;
649    
650    // Commit transaction
651    tx.commit().await?;
652    
653    Ok(escalation)
654}
655
656/// Adds a custom history entry for an alert.
657///
658/// This function allows adding arbitrary history records for an alert,
659/// which is useful for tracking manual interventions or system actions
660/// that don't fit into predefined categories.
661///
662/// # Arguments
663///
664/// * `pool` - Database connection pool for executing the query
665/// * `alert_id` - ID of the alert
666/// * `action` - Description of the action being recorded
667/// * `performed_by` - Optional ID of the user who performed the action
668/// * `notes` - Optional notes about the action
669/// * `previous_state` - Optional JSON data representing the state before the action
670/// * `new_state` - Optional JSON data representing the state after the action
671///
672/// # Returns
673///
674/// * `Ok(AlertHistory)` - Successfully created history record
675/// * `Err(anyhow::Error)` - Failed to create history record
676pub async fn add_alert_history(
677    pool: &Pool<MySql>,
678    alert_id: i64,
679    action: &str,
680    performed_by: Option<i64>,
681    notes: Option<&str>,
682    previous_state: Option<JsonValue>,
683    new_state: Option<JsonValue>,
684) -> anyhow::Result<AlertHistory> {
685    let history = sqlx::query_as::<_, AlertHistory>(
686        r#"INSERT INTO alert_history (
687            alert_id, action, performed_by, performed_at, 
688            previous_state, new_state, notes
689        ) VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
690    )
691    .bind(alert_id)
692    .bind(action)
693    .bind(performed_by)
694    .bind(previous_state)
695    .bind(new_state)
696    .bind(notes)
697    .fetch_one(pool)
698    .await
699    .context("Failed to create alert history record")?;
700    
701    Ok(history)
702}
703
704/// Retrieves a list of recent alerts for a specific application.
705///
706/// This function fetches alerts associated with an application,
707/// typically for display on an application dashboard or status page.
708///
709/// # Arguments
710///
711/// * `pool` - Database connection pool for executing the query
712/// * `app_id` - ID of the application
713/// * `limit` - Maximum number of alerts to retrieve
714/// * `include_resolved` - Whether to include resolved alerts
715///
716/// # Returns
717///
718/// * `Ok(Vec<Alert>)` - Successfully retrieved list of alerts
719/// * `Err(anyhow::Error)` - Failed to fetch alerts
720pub async fn get_recent_app_alerts(
721    pool: &Pool<MySql>,
722    app_id: i64,
723    limit: i64,
724    include_resolved: bool,
725) -> anyhow::Result<Vec<Alert>> {
726    // Build query based on whether to include resolved alerts
727    let query = if include_resolved {
728        r#"
729        SELECT * FROM alerts
730        WHERE app_id = ?
731        ORDER BY timestamp DESC
732        LIMIT ?
733        "#
734    } else {
735        r#"
736        SELECT * FROM alerts
737        WHERE app_id = ? AND status IN ('active', 'acknowledged')
738        ORDER BY timestamp DESC
739        LIMIT ?
740        "#
741    };
742    
743    let alerts = sqlx::query_as::<_, Alert>(query)
744        .bind(app_id)
745        .bind(limit)
746        .fetch_all(pool)
747        .await
748        .context("Failed to fetch app alerts")?;
749    
750    Ok(alerts)
751}
752
753/// Retrieves a list of active alerts for a specific organization.
754///
755/// This function fetches active and acknowledged alerts across all applications
756/// and services within an organization, ordered by severity and timestamp.
757///
758/// # Arguments
759///
760/// * `pool` - Database connection pool for executing the query
761/// * `org_id` - ID of the organization
762/// * `limit` - Maximum number of alerts to retrieve
763///
764/// # Returns
765///
766/// * `Ok(Vec<Alert>)` - Successfully retrieved list of alerts
767/// * `Err(anyhow::Error)` - Failed to fetch alerts
768pub async fn get_org_active_alerts(
769    pool: &Pool<MySql>,
770    org_id: i64,
771    limit: i64,
772) -> anyhow::Result<Vec<Alert>> {
773    let alerts = sqlx::query_as::<_, Alert>(
774        r#"
775        SELECT * FROM alerts
776        WHERE org_id = ? AND status IN ('active', 'acknowledged')
777        ORDER BY 
778            CASE 
779                WHEN severity = 'critical' THEN 1
780                WHEN severity = 'warning' THEN 2
781                WHEN severity = 'info' THEN 3
782                ELSE 4
783            END,
784            timestamp DESC
785        LIMIT ?
786        "#
787    )
788    .bind(org_id)
789    .bind(limit)
790    .fetch_all(pool)
791    .await
792    .context("Failed to fetch org active alerts")?;
793    
794    Ok(alerts)
795}
796
797/// Gets statistics about alerts for an organization grouped by severity and status.
798///
799/// This function retrieves counts of alerts for an organization, grouped by
800/// different categories to provide an overview of the alert landscape.
801///
802/// # Arguments
803///
804/// * `pool` - Database connection pool for executing the query
805/// * `org_id` - ID of the organization
806/// * `days` - Number of days to look back for statistics
807///
808/// # Returns
809///
810/// * `Ok(JsonValue)` - JSON object with alert statistics
811/// * `Err(anyhow::Error)` - Failed to fetch alert statistics
812pub async fn get_alert_stats(
813    pool: &Pool<MySql>,
814    org_id: i64,
815    days: i64,
816) -> anyhow::Result<JsonValue> {
817    // Get counts by severity
818    let severity_counts = sqlx::query(
819        r#"
820        SELECT severity, COUNT(*) as count
821        FROM alerts
822        WHERE org_id = ? AND timestamp >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
823        GROUP BY severity
824        "#
825    )
826    .bind(org_id)
827    .bind(days)
828    .fetch_all(pool)
829    .await
830    .context("Failed to fetch severity counts")?;
831    
832    // Get counts by status
833    let status_counts = sqlx::query(
834        r#"
835        SELECT status, COUNT(*) as count
836        FROM alerts
837        WHERE org_id = ? AND timestamp >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
838        GROUP BY status
839        "#
840    )
841    .bind(org_id)
842    .bind(days)
843    .fetch_all(pool)
844    .await
845    .context("Failed to fetch status counts")?;
846    
847    // Get counts by service
848    let service_counts = sqlx::query(
849        r#"
850        SELECT service, COUNT(*) as count
851        FROM alerts
852        WHERE org_id = ? AND timestamp >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
853        GROUP BY service
854        "#
855    )
856    .bind(org_id)
857    .bind(days)
858    .fetch_all(pool)
859    .await
860    .context("Failed to fetch service counts")?;
861    
862    // Get daily trend data
863    let daily_trends = sqlx::query(
864        r#"
865        SELECT 
866            DATE(timestamp) as date,
867            COUNT(*) as total,
868            SUM(CASE WHEN severity = 'critical' THEN 1 ELSE 0 END) as critical,
869            SUM(CASE WHEN severity = 'warning' THEN 1 ELSE 0 END) as warning,
870            SUM(CASE WHEN severity = 'info' THEN 1 ELSE 0 END) as info
871        FROM alerts
872        WHERE org_id = ? AND timestamp >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
873        GROUP BY DATE(timestamp)
874        ORDER BY DATE(timestamp)
875        "#
876    )
877    .bind(org_id)
878    .bind(days)
879    .fetch_all(pool)
880    .await
881    .context("Failed to fetch daily trend data")?;
882    
883    // Format all results as JSON
884    let mut severity_json = serde_json::Map::new();
885    for row in severity_counts {
886        let severity: String = row.get("severity");
887        let count: i64 = row.get("count");
888        severity_json.insert(severity, serde_json::Value::Number(count.into()));
889    }
890    
891    let mut status_json = serde_json::Map::new();
892    for row in status_counts {
893        let status: String = row.get("status");
894        let count: i64 = row.get("count");
895        status_json.insert(status, serde_json::Value::Number(count.into()));
896    }
897    
898    let mut service_json = serde_json::Map::new();
899    for row in service_counts {
900        let service: String = row.get("service");
901        let count: i64 = row.get("count");
902        service_json.insert(service, serde_json::Value::Number(count.into()));
903    }
904    
905    let mut daily_json = Vec::new();
906    for row in daily_trends {
907        let date: chrono::NaiveDate = row.get("date");
908        let total: i64 = row.get("total");
909        let critical: i64 = row.get("critical");
910        let warning: i64 = row.get("warning");
911        let info: i64 = row.get("info");
912        
913        let day_data = serde_json::json!({
914            "date": date.format("%Y-%m-%d").to_string(),
915            "total": total,
916            "critical": critical,
917            "warning": warning,
918            "info": info
919        });
920        
921        daily_json.push(day_data);
922    }
923    
924    // Combine all statistics
925    let stats = serde_json::json!({
926        "by_severity": severity_json,
927        "by_status": status_json,
928        "by_service": service_json,
929        "daily_trends": daily_json,
930        "period_days": days
931    });
932    
933    Ok(stats)
934}
935
936/// Retrieves alerts that need escalation based on age and status.
937///
938/// This function identifies alerts that have been active for longer than a specified
939/// threshold period without being acknowledged or resolved, which may indicate
940/// they need to be escalated to ensure appropriate attention.
941///
942/// # Arguments
943///
944/// * `pool` - Database connection pool for executing the query
945/// * `org_id` - Optional organization ID to filter alerts
946/// * `hours_threshold` - Age in hours after which an alert should be considered for escalation
947///
948/// # Returns
949///
950/// * `Ok(Vec<Alert>)` - List of alerts needing escalation
951/// * `Err(anyhow::Error)` - Failed to fetch alerts
952pub async fn get_alerts_needing_escalation(
953    pool: &Pool<MySql>,
954    org_id: Option<i64>,
955    hours_threshold: i64,
956) -> anyhow::Result<Vec<Alert>> {
957    // Start building the query
958    let mut query_string = String::from(
959        r#"
960        SELECT * FROM alerts
961        WHERE status = 'active'
962          AND timestamp <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? HOUR)
963        "#
964    );
965    
966    // Add organization filter if provided
967    if let Some(_) = org_id {
968        query_string.push_str(" AND org_id = ?");
969    }
970    
971    // Add order by
972    query_string.push_str(
973        r#"
974        ORDER BY 
975            CASE 
976                WHEN severity = 'critical' THEN 1
977                WHEN severity = 'warning' THEN 2
978                ELSE 3
979            END,
980            timestamp ASC
981        "#
982    );
983    
984    // Build and execute query
985    let mut query = sqlx::query_as::<_, Alert>(&query_string)
986        .bind(hours_threshold);
987    
988    if let Some(id) = org_id {
989        query = query.bind(id);
990    }
991    
992    let alerts = query
993        .fetch_all(pool)
994        .await
995        .context("Failed to fetch alerts needing escalation")?;
996    
997    Ok(alerts)
998}
999
1000/// Auto-resolves alerts that have been active for longer than a specified period.
1001///
1002/// This function updates the status of old alerts to 'auto_resolved' based on criteria
1003/// such as age, severity, and current status. It's typically used for housekeeping
1004/// to prevent the accumulation of stale alerts.
1005///
1006/// # Arguments
1007///
1008/// * `pool` - Database connection pool for executing the query
1009/// * `days_threshold` - Age in days after which an alert should be auto-resolved
1010/// * `severity_levels` - Optional vector of severity levels to include (e.g., only auto-resolve 'info' alerts)
1011///
1012/// # Returns
1013///
1014/// * `Ok(i64)` - Number of alerts auto-resolved
1015/// * `Err(anyhow::Error)` - Failed to auto-resolve alerts
1016pub async fn auto_resolve_old_alerts(
1017    pool: &Pool<MySql>,
1018    days_threshold: i64,
1019    severity_levels: Option<Vec<&str>>,
1020) -> anyhow::Result<i64> {
1021    // Begin transaction
1022    let mut tx = pool.begin().await?;
1023    
1024    // Build base query
1025    let mut query_string = String::from(
1026        r#"
1027        UPDATE alerts
1028        SET status = 'auto_resolved',
1029            resolved_at = CURRENT_TIMESTAMP
1030        WHERE 
1031            status IN ('active', 'acknowledged')
1032            AND timestamp <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
1033        "#
1034    );
1035    
1036    // Add severity filter if provided
1037    if let Some(levels) = &severity_levels {
1038        if !levels.is_empty() {
1039            query_string.push_str(" AND severity IN (");
1040            query_string.push_str(&std::iter::repeat("?")
1041                .take(levels.len())
1042                .collect::<Vec<_>>()
1043                .join(", "));
1044            query_string.push_str(")");
1045        }
1046    }
1047    
1048    // Build query
1049    let mut query = sqlx::query(&query_string)
1050        .bind(days_threshold);
1051    
1052    // Bind severity levels if provided
1053    if let Some(levels) = severity_levels {
1054        for level in levels {
1055            query = query.bind(level);
1056        }
1057    }
1058    
1059    // Execute update
1060    let result = query
1061        .execute(&mut *tx)
1062        .await
1063        .context("Failed to auto-resolve old alerts")?;
1064    
1065    // Get the affected alerts to create history records
1066    let affected_alerts = sqlx::query_as::<_, Alert>(
1067        r#"
1068        SELECT * FROM alerts
1069        WHERE 
1070            status = 'auto_resolved'
1071            AND resolved_at >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 1 MINUTE)
1072        "#
1073    )
1074    .fetch_all(&mut *tx)
1075    .await
1076    .context("Failed to fetch auto-resolved alerts")?;
1077    
1078    // Create history records for each affected alert
1079    for alert in &affected_alerts {
1080        sqlx::query(
1081            r#"
1082            INSERT INTO alert_history (
1083                alert_id, action, performed_at, new_state, notes
1084            ) VALUES (?, 'auto_resolved', CURRENT_TIMESTAMP, ?, 'Alert auto-resolved due to age')
1085            "#
1086        )
1087        .bind(alert.id)
1088        .bind(serde_json::to_value(&alert).unwrap_or(serde_json::Value::Null))
1089        .execute(&mut *tx)
1090        .await
1091        .context("Failed to create history record for auto-resolved alert")?;
1092    }
1093    
1094    // Commit transaction
1095    tx.commit().await?;
1096    
1097    Ok(result.rows_affected() as i64)
1098}
1099
1100/// Retrieves alerts that match a specific search query.
1101///
1102/// This function searches for alerts containing specific text in their message,
1103/// type, or service fields. It's useful for implementing search functionality
1104/// in the alerts UI.
1105///
1106/// # Arguments
1107///
1108/// * `pool` - Database connection pool for executing the query
1109/// * `search_query` - Text to search for in alert fields
1110/// * `org_id` - Optional organization ID to filter alerts
1111/// * `page` - Zero-based page number for pagination
1112/// * `per_page` - Number of records per page
1113///
1114/// # Returns
1115///
1116/// * `Ok(Vec<Alert>)` - List of matching alerts
1117/// * `Err(anyhow::Error)` - Failed to search for alerts
1118pub async fn search_alerts(
1119    pool: &Pool<MySql>,
1120    search_query: &str,
1121    org_id: Option<i64>,
1122    page: i64,
1123    per_page: i64,
1124) -> anyhow::Result<Vec<Alert>> {
1125    // Prepare search pattern
1126    let pattern = format!("%{}%", search_query);
1127    
1128    // Start building the query
1129    let mut query_string = String::from(
1130        r#"
1131        SELECT * FROM alerts
1132        WHERE (
1133            message LIKE ? OR
1134            alert_type LIKE ? OR
1135            service LIKE ?
1136        )
1137        "#
1138    );
1139    
1140    // Add organization filter if provided
1141    if let Some(_) = org_id {
1142        query_string.push_str(" AND org_id = ?");
1143    }
1144    
1145    // Add order by and pagination
1146    query_string.push_str(
1147        r#"
1148        ORDER BY timestamp DESC
1149        LIMIT ? OFFSET ?
1150        "#
1151    );
1152    
1153    // Build and execute query
1154    let mut query = sqlx::query_as::<_, Alert>(&query_string)
1155        .bind(&pattern)
1156        .bind(&pattern)
1157        .bind(&pattern);
1158    
1159    if let Some(id) = org_id {
1160        query = query.bind(id);
1161    }
1162    
1163    query = query
1164        .bind(per_page)
1165        .bind(page * per_page);
1166    
1167    let alerts = query
1168        .fetch_all(pool)
1169        .await
1170        .context("Failed to search alerts")?;
1171    
1172    Ok(alerts)
1173}
1174
1175/// Gets the count of matching alerts for a search query.
1176///
1177/// This function counts the number of alerts that match a specific search query,
1178/// which is useful for pagination in search results.
1179///
1180/// # Arguments
1181///
1182/// * `pool` - Database connection pool for executing the query
1183/// * `search_query` - Text to search for in alert fields
1184/// * `org_id` - Optional organization ID to filter alerts
1185///
1186/// # Returns
1187///
1188/// * `Ok(i64)` - Count of matching alerts
1189/// * `Err(anyhow::Error)` - Failed to count alerts
1190pub async fn count_search_alerts(
1191    pool: &Pool<MySql>,
1192    search_query: &str,
1193    org_id: Option<i64>,
1194) -> anyhow::Result<i64> {
1195    // Prepare search pattern
1196    let pattern = format!("%{}%", search_query);
1197    
1198    // Start building the query
1199    let mut query_string = String::from(
1200        r#"
1201        SELECT COUNT(*) FROM alerts
1202        WHERE (
1203            message LIKE ? OR
1204            alert_type LIKE ? OR
1205            service LIKE ?
1206        )
1207        "#
1208    );
1209    
1210    // Add organization filter if provided
1211    if let Some(_) = org_id {
1212        query_string.push_str(" AND org_id = ?");
1213    }
1214    
1215    // Build and execute query
1216    let mut query = sqlx::query_scalar::<_, i64>(&query_string)
1217        .bind(&pattern)
1218        .bind(&pattern)
1219        .bind(&pattern);
1220    
1221    if let Some(id) = org_id {
1222        query = query.bind(id);
1223    }
1224    
1225    let count = query
1226        .fetch_one(pool)
1227        .await
1228        .context("Failed to count search alerts")?;
1229    
1230    Ok(count)
1231}
1232
1233/// Bulk updates the status of multiple alerts.
1234///
1235/// This function changes the status of multiple alerts at once based on provided criteria,
1236/// which is useful for operations like resolving all alerts for a specific service or application.
1237///
1238/// # Arguments
1239///
1240/// * `pool` - Database connection pool for executing the query
1241/// * `ids` - Optional vector of alert IDs to update
1242/// * `service` - Optional service name to filter alerts
1243/// * `app_id` - Optional application ID to filter alerts
1244/// * `new_status` - Status to set for matching alerts
1245/// * `user_id` - ID of the user performing the bulk update
1246/// * `notes` - Optional notes about the bulk update
1247///
1248/// # Returns
1249///
1250/// * `Ok(i64)` - Number of alerts updated
1251/// * `Err(anyhow::Error)` - Failed to update alerts
1252pub async fn bulk_update_alert_status(
1253    pool: &Pool<MySql>,
1254    ids: Option<Vec<i64>>,
1255    service: Option<&str>,
1256    app_id: Option<i64>,
1257    new_status: &str,
1258    user_id: i64,
1259    notes: Option<&str>,
1260) -> anyhow::Result<i64> {
1261    // Validate that at least one filter is provided
1262    if ids.is_none() && service.is_none() && app_id.is_none() {
1263        return Err(anyhow::anyhow!("At least one filter (ids, service, or app_id) must be provided"));
1264    }
1265    
1266    // Begin transaction
1267    let mut tx = pool.begin().await?;
1268    
1269    // First, get all alerts that will be affected to create history records
1270    let mut select_query_string = String::from("SELECT * FROM alerts WHERE status IN ('active', 'acknowledged')");
1271    
1272    if let Some(alert_ids) = &ids {
1273        if !alert_ids.is_empty() {
1274            select_query_string.push_str(" AND id IN (");
1275            select_query_string.push_str(&std::iter::repeat("?")
1276                .take(alert_ids.len())
1277                .collect::<Vec<_>>()
1278                .join(", "));
1279            select_query_string.push_str(")");
1280        }
1281    }
1282    
1283    if let Some(_) = service {
1284        select_query_string.push_str(" AND service = ?");
1285    }
1286    
1287    if let Some(_) = app_id {
1288        select_query_string.push_str(" AND app_id = ?");
1289    }
1290    
1291    // Build select query
1292    let mut select_query = sqlx::query_as::<_, Alert>(&select_query_string);
1293    
1294    // Bind parameters
1295    if let Some(alert_ids) = &ids {
1296        for id in alert_ids {
1297            select_query = select_query.bind(*id);
1298        }
1299    }
1300    
1301    if let Some(s) = service {
1302        select_query = select_query.bind(s);
1303    }
1304    
1305    if let Some(id) = app_id {
1306        select_query = select_query.bind(id);
1307    }
1308    
1309    // Execute select query
1310    let affected_alerts = select_query
1311        .fetch_all(&mut *tx)
1312        .await
1313        .context("Failed to fetch alerts for bulk update")?;
1314    
1315    // If no alerts match the criteria, return early
1316    if affected_alerts.is_empty() {
1317        return Ok(0);
1318    }
1319    
1320    // Now prepare the update query
1321    let mut update_query_string = String::from(
1322        "UPDATE alerts SET status = ?"
1323    );
1324    
1325    // Add resolved_at and resolved_by if applicable
1326    if new_status == "resolved" || new_status == "auto_resolved" {
1327        update_query_string.push_str(", resolved_at = CURRENT_TIMESTAMP, resolved_by = ?");
1328    }
1329    
1330    // Add WHERE clause
1331    update_query_string.push_str(" WHERE status IN ('active', 'acknowledged')");
1332    
1333    if let Some(alert_ids) = &ids {
1334        if !alert_ids.is_empty() {
1335            update_query_string.push_str(" AND id IN (");
1336            update_query_string.push_str(&std::iter::repeat("?")
1337                .take(alert_ids.len())
1338                .collect::<Vec<_>>()
1339                .join(", "));
1340            update_query_string.push_str(")");
1341        }
1342    }
1343    
1344    if let Some(_) = service {
1345        update_query_string.push_str(" AND service = ?");
1346    }
1347    
1348    if let Some(_) = app_id {
1349        update_query_string.push_str(" AND app_id = ?");
1350    }
1351    
1352    // Build update query
1353    let mut update_query = sqlx::query(&update_query_string)
1354        .bind(new_status);
1355    
1356    // Bind resolved_by if applicable
1357    if new_status == "resolved" || new_status == "auto_resolved" {
1358        update_query = update_query.bind(user_id);
1359    }
1360    
1361    // Bind filter parameters
1362    if let Some(alert_ids) = &ids {
1363        for id in alert_ids {
1364            update_query = update_query.bind(*id);
1365        }
1366    }
1367    
1368    if let Some(s) = service {
1369        update_query = update_query.bind(s);
1370    }
1371    
1372    if let Some(id) = app_id {
1373        update_query = update_query.bind(id);
1374    }
1375    
1376    // Execute update
1377    let update_result = update_query
1378        .execute(&mut *tx)
1379        .await
1380        .context("Failed to bulk update alert status")?;
1381    
1382    // Create history records for each affected alert
1383    for alert in &affected_alerts {
1384        // Create a new Alert instance with updated fields
1385        let updated_alert = Alert {
1386            id: alert.id,
1387            alert_type: alert.alert_type.clone(),
1388            severity: alert.severity.clone(),
1389            service: alert.service.clone(),
1390            message: alert.message.clone(),
1391            timestamp: alert.timestamp,
1392            status: new_status.to_string(),
1393            metadata: alert.metadata.clone(),
1394            org_id: alert.org_id,
1395            app_id: alert.app_id,
1396            instance_id: alert.instance_id,
1397            region_id: alert.region_id,
1398            node_id: alert.node_id,
1399            resolved_at: if new_status == "resolved" || new_status == "auto_resolved" {
1400                Some(chrono::Utc::now())
1401            } else {
1402                alert.resolved_at
1403            },
1404            resolved_by: if new_status == "resolved" || new_status == "auto_resolved" {
1405                Some(user_id)
1406            } else {
1407                alert.resolved_by
1408            }
1409        };
1410        
1411        // Add history record
1412        sqlx::query(
1413            r#"
1414            INSERT INTO alert_history (
1415                alert_id, action, performed_by, performed_at, 
1416                previous_state, new_state, notes
1417            ) VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?, ?, ?)
1418            "#
1419        )
1420        .bind(alert.id)
1421        .bind(format!("bulk_status_change_to_{}", new_status))
1422        .bind(user_id)
1423        .bind(serde_json::to_value(&alert).unwrap_or(serde_json::Value::Null))
1424        .bind(serde_json::to_value(&updated_alert).unwrap_or(serde_json::Value::Null))
1425        .bind(notes)
1426        .execute(&mut *tx)
1427        .await
1428        .context("Failed to create history record for bulk update")?;
1429        
1430        // If acknowledging, create acknowledgment records
1431        if new_status == "acknowledged" {
1432            sqlx::query(
1433                r#"
1434                INSERT INTO alert_acknowledgments (
1435                    alert_id, user_id, acknowledged_at, notes
1436                ) VALUES (?, ?, CURRENT_TIMESTAMP, ?)
1437                "#
1438            )
1439            .bind(alert.id)
1440            .bind(user_id)
1441            .bind(notes)
1442            .execute(&mut *tx)
1443            .await
1444            .context("Failed to create acknowledgment record for bulk update")?;
1445        }
1446    }
1447    
1448    // Commit transaction
1449    tx.commit().await?;
1450    
1451    Ok(update_result.rows_affected() as i64)
1452}