1use crate::models::alert::{
2 Alert, AlertWithAcknowledgments, AlertAcknowledgment,
3 AlertEscalation, AlertHistory, AlertWithRelatedData
4};
5use anyhow::Context;
6use serde::Serialize;
7use sqlx::{MySql, Pool, Row};
8use chrono::{DateTime, Utc};
9use serde_json::Value as JsonValue;
10
11pub async fn list_alerts(
37 pool: &Pool<MySql>,
38 page: i64,
39 per_page: i64,
40 status: Option<&str>,
41 severity: Option<&str>,
42 org_id: Option<i64>,
43 app_id: Option<i64>,
44 service: Option<&str>,
45 from_date: Option<DateTime<Utc>>,
46 to_date: Option<DateTime<Utc>>,
47) -> anyhow::Result<Vec<Alert>> {
48 println!("Attempting to fetch alerts from database with filtering...");
49
50 let mut query_string = String::from(
52 "SELECT * FROM alerts WHERE 1=1"
53 );
54
55 if let Some(s) = status {
57 query_string.push_str(" AND status = ?");
58 }
59 if let Some(s) = severity {
60 query_string.push_str(" AND severity = ?");
61 }
62 if let Some(_) = org_id {
63 query_string.push_str(" AND org_id = ?");
64 }
65 if let Some(_) = app_id {
66 query_string.push_str(" AND app_id = ?");
67 }
68 if let Some(s) = service {
69 query_string.push_str(" AND service = ?");
70 }
71 if let Some(_) = from_date {
72 query_string.push_str(" AND timestamp >= ?");
73 }
74 if let Some(_) = to_date {
75 query_string.push_str(" AND timestamp <= ?");
76 }
77
78 query_string.push_str(" ORDER BY timestamp DESC LIMIT ? OFFSET ?");
80
81 let mut query = sqlx::query_as::<_, Alert>(&query_string);
83
84 if let Some(s) = status {
86 query = query.bind(s);
87 }
88 if let Some(s) = severity {
89 query = query.bind(s);
90 }
91 if let Some(id) = org_id {
92 query = query.bind(id);
93 }
94 if let Some(id) = app_id {
95 query = query.bind(id);
96 }
97 if let Some(s) = service {
98 query = query.bind(s);
99 }
100 if let Some(date) = from_date {
101 query = query.bind(date);
102 }
103 if let Some(date) = to_date {
104 query = query.bind(date);
105 }
106
107 query = query.bind(per_page).bind(page * per_page);
109
110 let result = query.fetch_all(pool).await;
112
113 match result {
114 Ok(alerts) => {
115 println!("Successfully fetched {} alerts", alerts.len());
116 Ok(alerts)
117 }
118 Err(e) => {
119 eprintln!("Error fetching alerts: {:#?}", e);
120 Err(anyhow::Error::new(e).context("Failed to fetch alerts"))
121 }
122 }
123}
124
125pub async fn get_alert_with_related_data(
140 pool: &Pool<MySql>,
141 id: i64,
142) -> anyhow::Result<AlertWithRelatedData> {
143 let alert = sqlx::query_as::<_, Alert>("SELECT * FROM alerts WHERE id = ?")
145 .bind(id)
146 .fetch_one(pool)
147 .await
148 .context("Failed to fetch alert")?;
149
150 let acknowledgments = sqlx::query_as::<_, AlertAcknowledgment>(
152 "SELECT * FROM alert_acknowledgments WHERE alert_id = ? ORDER BY acknowledged_at DESC"
153 )
154 .bind(id)
155 .fetch_all(pool)
156 .await
157 .context("Failed to fetch alert acknowledgments")?;
158
159 let escalations = sqlx::query_as::<_, AlertEscalation>(
161 "SELECT * FROM alert_escalations WHERE alert_id = ? ORDER BY escalated_at DESC"
162 )
163 .bind(id)
164 .fetch_all(pool)
165 .await
166 .context("Failed to fetch alert escalations")?;
167
168 let history = sqlx::query_as::<_, AlertHistory>(
170 "SELECT * FROM alert_history WHERE alert_id = ? ORDER BY performed_at DESC"
171 )
172 .bind(id)
173 .fetch_all(pool)
174 .await
175 .context("Failed to fetch alert history")?;
176
177 Ok(AlertWithRelatedData {
178 alert,
179 acknowledgments,
180 escalations,
181 history,
182 })
183}
184
185pub async fn count_alerts(
203 pool: &Pool<MySql>,
204 status: Option<&str>,
205 severity: Option<&str>,
206 org_id: Option<i64>,
207 app_id: Option<i64>,
208) -> anyhow::Result<i64> {
209 let mut query_string = String::from("SELECT COUNT(*) FROM alerts WHERE 1=1");
211
212 if let Some(_) = status {
214 query_string.push_str(" AND status = ?");
215 }
216 if let Some(_) = severity {
217 query_string.push_str(" AND severity = ?");
218 }
219 if let Some(_) = org_id {
220 query_string.push_str(" AND org_id = ?");
221 }
222 if let Some(_) = app_id {
223 query_string.push_str(" AND app_id = ?");
224 }
225
226 let mut query = sqlx::query_scalar::<_, i64>(&query_string);
228
229 if let Some(s) = status {
231 query = query.bind(s);
232 }
233 if let Some(s) = severity {
234 query = query.bind(s);
235 }
236 if let Some(id) = org_id {
237 query = query.bind(id);
238 }
239 if let Some(id) = app_id {
240 query = query.bind(id);
241 }
242
243 let count = query
245 .fetch_one(pool)
246 .await
247 .context("Failed to count alerts")?;
248
249 Ok(count)
250}
251
252pub async fn create_alert(
276 pool: &Pool<MySql>,
277 alert_type: &str,
278 severity: &str,
279 service: &str,
280 message: &str,
281 metadata: Option<JsonValue>,
282 org_id: Option<i64>,
283 app_id: Option<i64>,
284 instance_id: Option<i64>,
285 region_id: Option<i64>,
286 node_id: Option<i64>,
287) -> anyhow::Result<Alert> {
288 let mut tx = pool.begin().await?;
290
291 let alert = sqlx::query_as::<_, Alert>(
293 r#"INSERT INTO alerts (
294 alert_type, severity, service, message, timestamp, status,
295 metadata, org_id, app_id, instance_id, region_id, node_id
296 ) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, 'active', ?, ?, ?, ?, ?, ?)"#,
297 )
298 .bind(alert_type)
299 .bind(severity)
300 .bind(service)
301 .bind(message)
302 .bind(metadata)
303 .bind(org_id)
304 .bind(app_id)
305 .bind(instance_id)
306 .bind(region_id)
307 .bind(node_id)
308 .fetch_one(&mut *tx)
309 .await
310 .context("Failed to create alert")?;
311
312 sqlx::query(
314 r#"INSERT INTO alert_history (
315 alert_id, action, performed_at, previous_state, new_state
316 ) VALUES (?, 'created', CURRENT_TIMESTAMP, NULL, ?)"#,
317 )
318 .bind(alert.id)
319 .bind(serde_json::to_value(&alert).unwrap_or(serde_json::Value::Null))
320 .execute(&mut *tx)
321 .await
322 .context("Failed to create alert history record")?;
323
324 tx.commit().await?;
326
327 Ok(alert)
329}
330
331pub async fn update_alert_status(
350 pool: &Pool<MySql>,
351 id: i64,
352 new_status: &str,
353 user_id: Option<i64>,
354 notes: Option<&str>,
355) -> anyhow::Result<Alert> {
356 let mut tx = pool.begin().await?;
358
359 let current_alert = sqlx::query_as::<_, Alert>("SELECT * FROM alerts WHERE id = ?")
361 .bind(id)
362 .fetch_one(&mut *tx)
363 .await
364 .context("Failed to fetch current alert state")?;
365
366 let (query, resolved_at, resolved_by) = if new_status == "resolved" || new_status == "auto_resolved" {
368 (
369 "UPDATE alerts SET status = ?, resolved_at = CURRENT_TIMESTAMP, resolved_by = ? WHERE id = ?",
370 Some(chrono::Utc::now()),
371 user_id,
372 )
373 } else {
374 (
375 "UPDATE alerts SET status = ? WHERE id = ?",
376 None,
377 None,
378 )
379 };
380
381 let updated_alert = if new_status == "resolved" || new_status == "auto_resolved" {
383 sqlx::query_as::<_, Alert>(query)
384 .bind(new_status)
385 .bind(user_id)
386 .bind(id)
387 .fetch_one(&mut *tx)
388 .await
389 .context("Failed to update alert status")?
390 } else {
391 sqlx::query_as::<_, Alert>(query)
392 .bind(new_status)
393 .bind(id)
394 .fetch_one(&mut *tx)
395 .await
396 .context("Failed to update alert status")?
397 };
398
399 sqlx::query(
401 r#"INSERT INTO alert_history (
402 alert_id, action, performed_by, performed_at,
403 previous_state, new_state, notes
404 ) VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
405 )
406 .bind(id)
407 .bind(format!("status_change_to_{}", new_status))
408 .bind(user_id)
409 .bind(serde_json::to_value(¤t_alert).unwrap_or(serde_json::Value::Null))
410 .bind(serde_json::to_value(&updated_alert).unwrap_or(serde_json::Value::Null))
411 .bind(notes)
412 .execute(&mut *tx)
413 .await
414 .context("Failed to create alert history record for status update")?;
415
416 if new_status == "acknowledged" && user_id.is_some() {
418 sqlx::query(
419 r#"INSERT INTO alert_acknowledgments (
420 alert_id, user_id, acknowledged_at, notes
421 ) VALUES (?, ?, CURRENT_TIMESTAMP, ?)"#,
422 )
423 .bind(id)
424 .bind(user_id.unwrap())
425 .bind(notes)
426 .execute(&mut *tx)
427 .await
428 .context("Failed to create alert acknowledgment record")?;
429 }
430
431 tx.commit().await?;
433
434 Ok(updated_alert)
435}
436
437pub async fn acknowledge_alert(
455 pool: &Pool<MySql>,
456 alert_id: i64,
457 user_id: i64,
458 notes: Option<&str>,
459 update_status: bool,
460) -> anyhow::Result<AlertAcknowledgment> {
461 let mut tx = pool.begin().await?;
463
464 let acknowledgment = sqlx::query_as::<_, AlertAcknowledgment>(
466 r#"INSERT INTO alert_acknowledgments (
467 alert_id, user_id, acknowledged_at, notes
468 ) VALUES (?, ?, CURRENT_TIMESTAMP, ?)"#,
469 )
470 .bind(alert_id)
471 .bind(user_id)
472 .bind(notes)
473 .fetch_one(&mut *tx)
474 .await
475 .context("Failed to create alert acknowledgment")?;
476
477 if update_status {
479 let current_alert = sqlx::query_as::<_, Alert>("SELECT * FROM alerts WHERE id = ?")
481 .bind(alert_id)
482 .fetch_one(&mut *tx)
483 .await
484 .context("Failed to fetch current alert state")?;
485
486 if current_alert.status == "active" {
488 let updated_alert = sqlx::query_as::<_, Alert>(
490 "UPDATE alerts SET status = 'acknowledged' WHERE id = ?"
491 )
492 .bind(alert_id)
493 .fetch_one(&mut *tx)
494 .await
495 .context("Failed to update alert status to acknowledged")?;
496
497 sqlx::query(
499 r#"INSERT INTO alert_history (
500 alert_id, action, performed_by, performed_at,
501 previous_state, new_state, notes
502 ) VALUES (?, 'status_change_to_acknowledged', ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
503 )
504 .bind(alert_id)
505 .bind(user_id)
506 .bind(serde_json::to_value(¤t_alert).unwrap_or(serde_json::Value::Null))
507 .bind(serde_json::to_value(&updated_alert).unwrap_or(serde_json::Value::Null))
508 .bind(notes)
509 .execute(&mut *tx)
510 .await
511 .context("Failed to create alert history record for acknowledgment")?;
512 }
513 }
514
515 tx.commit().await?;
517
518 Ok(acknowledgment)
519}
520
521pub async fn resolve_alert(
538 pool: &Pool<MySql>,
539 id: i64,
540 user_id: i64,
541 notes: Option<&str>,
542) -> anyhow::Result<Alert> {
543 let mut tx = pool.begin().await?;
545
546 let current_alert = sqlx::query_as::<_, Alert>("SELECT * FROM alerts WHERE id = ?")
548 .bind(id)
549 .fetch_one(&mut *tx)
550 .await
551 .context("Failed to fetch current alert state")?;
552
553 let updated_alert = sqlx::query_as::<_, Alert>(
555 r#"UPDATE alerts
556 SET status = 'resolved',
557 resolved_at = CURRENT_TIMESTAMP,
558 resolved_by = ?
559 WHERE id = ?"#,
560 )
561 .bind(user_id)
562 .bind(id)
563 .fetch_one(&mut *tx)
564 .await
565 .context("Failed to resolve alert")?;
566
567 sqlx::query(
569 r#"INSERT INTO alert_history (
570 alert_id, action, performed_by, performed_at,
571 previous_state, new_state, notes
572 ) VALUES (?, 'resolved', ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
573 )
574 .bind(id)
575 .bind(user_id)
576 .bind(serde_json::to_value(¤t_alert).unwrap_or(serde_json::Value::Null))
577 .bind(serde_json::to_value(&updated_alert).unwrap_or(serde_json::Value::Null))
578 .bind(notes)
579 .execute(&mut *tx)
580 .await
581 .context("Failed to create alert history record for resolution")?;
582
583 tx.commit().await?;
585
586 Ok(updated_alert)
587}
588
589pub async fn create_alert_escalation(
609 pool: &Pool<MySql>,
610 alert_id: i64,
611 escalation_level: i64,
612 escalated_to: JsonValue,
613 escalation_method: &str,
614 response_required_by: Option<DateTime<Utc>>,
615) -> anyhow::Result<AlertEscalation> {
616 let mut tx = pool.begin().await?;
618
619 let escalation = sqlx::query_as::<_, AlertEscalation>(
621 r#"INSERT INTO alert_escalations (
622 alert_id, escalation_level, escalated_at,
623 escalated_to, escalation_method, response_required_by
624 ) VALUES (?, ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
625 )
626 .bind(alert_id)
627 .bind(escalation_level)
628 .bind(escalated_to)
629 .bind(escalation_method)
630 .bind(response_required_by)
631 .fetch_one(&mut *tx)
632 .await
633 .context("Failed to create alert escalation")?;
634
635 sqlx::query(
637 r#"INSERT INTO alert_history (
638 alert_id, action, performed_at, new_state
639 ) VALUES (?, ?, CURRENT_TIMESTAMP, ?)"#,
640 )
641 .bind(alert_id)
642 .bind(format!("escalated_level_{}", escalation_level))
643 .bind(serde_json::to_value(&escalation).unwrap_or(serde_json::Value::Null))
644 .execute(&mut *tx)
645 .await
646 .context("Failed to create alert history record for escalation")?;
647
648 tx.commit().await?;
650
651 Ok(escalation)
652}
653
654pub async fn add_alert_history(
675 pool: &Pool<MySql>,
676 alert_id: i64,
677 action: &str,
678 performed_by: Option<i64>,
679 notes: Option<&str>,
680 previous_state: Option<JsonValue>,
681 new_state: Option<JsonValue>,
682) -> anyhow::Result<AlertHistory> {
683 let history = sqlx::query_as::<_, AlertHistory>(
684 r#"INSERT INTO alert_history (
685 alert_id, action, performed_by, performed_at,
686 previous_state, new_state, notes
687 ) VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
688 )
689 .bind(alert_id)
690 .bind(action)
691 .bind(performed_by)
692 .bind(previous_state)
693 .bind(new_state)
694 .bind(notes)
695 .fetch_one(pool)
696 .await
697 .context("Failed to create alert history record")?;
698
699 Ok(history)
700}
701
702pub async fn get_recent_app_alerts(
719 pool: &Pool<MySql>,
720 app_id: i64,
721 limit: i64,
722 include_resolved: bool,
723) -> anyhow::Result<Vec<Alert>> {
724 let query = if include_resolved {
726 r#"
727 SELECT * FROM alerts
728 WHERE app_id = ?
729 ORDER BY timestamp DESC
730 LIMIT ?
731 "#
732 } else {
733 r#"
734 SELECT * FROM alerts
735 WHERE app_id = ? AND status IN ('active', 'acknowledged')
736 ORDER BY timestamp DESC
737 LIMIT ?
738 "#
739 };
740
741 let alerts = sqlx::query_as::<_, Alert>(query)
742 .bind(app_id)
743 .bind(limit)
744 .fetch_all(pool)
745 .await
746 .context("Failed to fetch app alerts")?;
747
748 Ok(alerts)
749}
750
751pub async fn get_org_active_alerts(
767 pool: &Pool<MySql>,
768 org_id: i64,
769 limit: i64,
770) -> anyhow::Result<Vec<Alert>> {
771 let alerts = sqlx::query_as::<_, Alert>(
772 r#"
773 SELECT * FROM alerts
774 WHERE org_id = ? AND status IN ('active', 'acknowledged')
775 ORDER BY
776 CASE
777 WHEN severity = 'critical' THEN 1
778 WHEN severity = 'warning' THEN 2
779 WHEN severity = 'info' THEN 3
780 ELSE 4
781 END,
782 timestamp DESC
783 LIMIT ?
784 "#
785 )
786 .bind(org_id)
787 .bind(limit)
788 .fetch_all(pool)
789 .await
790 .context("Failed to fetch org active alerts")?;
791
792 Ok(alerts)
793}
794
795pub async fn get_alert_stats(
811 pool: &Pool<MySql>,
812 org_id: i64,
813 days: i64,
814) -> anyhow::Result<JsonValue> {
815 let severity_counts = sqlx::query(
817 r#"
818 SELECT severity, COUNT(*) as count
819 FROM alerts
820 WHERE org_id = ? AND timestamp >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
821 GROUP BY severity
822 "#
823 )
824 .bind(org_id)
825 .bind(days)
826 .fetch_all(pool)
827 .await
828 .context("Failed to fetch severity counts")?;
829
830 let status_counts = sqlx::query(
832 r#"
833 SELECT status, COUNT(*) as count
834 FROM alerts
835 WHERE org_id = ? AND timestamp >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
836 GROUP BY status
837 "#
838 )
839 .bind(org_id)
840 .bind(days)
841 .fetch_all(pool)
842 .await
843 .context("Failed to fetch status counts")?;
844
845 let service_counts = sqlx::query(
847 r#"
848 SELECT service, COUNT(*) as count
849 FROM alerts
850 WHERE org_id = ? AND timestamp >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
851 GROUP BY service
852 "#
853 )
854 .bind(org_id)
855 .bind(days)
856 .fetch_all(pool)
857 .await
858 .context("Failed to fetch service counts")?;
859
860 let daily_trends = sqlx::query(
862 r#"
863 SELECT
864 DATE(timestamp) as date,
865 COUNT(*) as total,
866 SUM(CASE WHEN severity = 'critical' THEN 1 ELSE 0 END) as critical,
867 SUM(CASE WHEN severity = 'warning' THEN 1 ELSE 0 END) as warning,
868 SUM(CASE WHEN severity = 'info' THEN 1 ELSE 0 END) as info
869 FROM alerts
870 WHERE org_id = ? AND timestamp >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
871 GROUP BY DATE(timestamp)
872 ORDER BY DATE(timestamp)
873 "#
874 )
875 .bind(org_id)
876 .bind(days)
877 .fetch_all(pool)
878 .await
879 .context("Failed to fetch daily trend data")?;
880
881 let mut severity_json = serde_json::Map::new();
883 for row in severity_counts {
884 let severity: String = row.get("severity");
885 let count: i64 = row.get("count");
886 severity_json.insert(severity, serde_json::Value::Number(count.into()));
887 }
888
889 let mut status_json = serde_json::Map::new();
890 for row in status_counts {
891 let status: String = row.get("status");
892 let count: i64 = row.get("count");
893 status_json.insert(status, serde_json::Value::Number(count.into()));
894 }
895
896 let mut service_json = serde_json::Map::new();
897 for row in service_counts {
898 let service: String = row.get("service");
899 let count: i64 = row.get("count");
900 service_json.insert(service, serde_json::Value::Number(count.into()));
901 }
902
903 let mut daily_json = Vec::new();
904 for row in daily_trends {
905 let date: chrono::NaiveDate = row.get("date");
906 let total: i64 = row.get("total");
907 let critical: i64 = row.get("critical");
908 let warning: i64 = row.get("warning");
909 let info: i64 = row.get("info");
910
911 let day_data = serde_json::json!({
912 "date": date.format("%Y-%m-%d").to_string(),
913 "total": total,
914 "critical": critical,
915 "warning": warning,
916 "info": info
917 });
918
919 daily_json.push(day_data);
920 }
921
922 let stats = serde_json::json!({
924 "by_severity": severity_json,
925 "by_status": status_json,
926 "by_service": service_json,
927 "daily_trends": daily_json,
928 "period_days": days
929 });
930
931 Ok(stats)
932}
933
934pub async fn get_alerts_needing_escalation(
951 pool: &Pool<MySql>,
952 org_id: Option<i64>,
953 hours_threshold: i64,
954) -> anyhow::Result<Vec<Alert>> {
955 let mut query_string = String::from(
957 r#"
958 SELECT * FROM alerts
959 WHERE status = 'active'
960 AND timestamp <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? HOUR)
961 "#
962 );
963
964 if let Some(_) = org_id {
966 query_string.push_str(" AND org_id = ?");
967 }
968
969 query_string.push_str(
971 r#"
972 ORDER BY
973 CASE
974 WHEN severity = 'critical' THEN 1
975 WHEN severity = 'warning' THEN 2
976 ELSE 3
977 END,
978 timestamp ASC
979 "#
980 );
981
982 let mut query = sqlx::query_as::<_, Alert>(&query_string)
984 .bind(hours_threshold);
985
986 if let Some(id) = org_id {
987 query = query.bind(id);
988 }
989
990 let alerts = query
991 .fetch_all(pool)
992 .await
993 .context("Failed to fetch alerts needing escalation")?;
994
995 Ok(alerts)
996}
997
998pub async fn auto_resolve_old_alerts(
1015 pool: &Pool<MySql>,
1016 days_threshold: i64,
1017 severity_levels: Option<Vec<&str>>,
1018) -> anyhow::Result<i64> {
1019 let mut tx = pool.begin().await?;
1021
1022 let mut query_string = String::from(
1024 r#"
1025 UPDATE alerts
1026 SET status = 'auto_resolved',
1027 resolved_at = CURRENT_TIMESTAMP
1028 WHERE
1029 status IN ('active', 'acknowledged')
1030 AND timestamp <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
1031 "#
1032 );
1033
1034 if let Some(levels) = &severity_levels {
1036 if !levels.is_empty() {
1037 query_string.push_str(" AND severity IN (");
1038 query_string.push_str(&std::iter::repeat("?")
1039 .take(levels.len())
1040 .collect::<Vec<_>>()
1041 .join(", "));
1042 query_string.push_str(")");
1043 }
1044 }
1045
1046 let mut query = sqlx::query(&query_string)
1048 .bind(days_threshold);
1049
1050 if let Some(levels) = severity_levels {
1052 for level in levels {
1053 query = query.bind(level);
1054 }
1055 }
1056
1057 let result = query
1059 .execute(&mut *tx)
1060 .await
1061 .context("Failed to auto-resolve old alerts")?;
1062
1063 let affected_alerts = sqlx::query_as::<_, Alert>(
1065 r#"
1066 SELECT * FROM alerts
1067 WHERE
1068 status = 'auto_resolved'
1069 AND resolved_at >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 1 MINUTE)
1070 "#
1071 )
1072 .fetch_all(&mut *tx)
1073 .await
1074 .context("Failed to fetch auto-resolved alerts")?;
1075
1076 for alert in &affected_alerts {
1078 sqlx::query(
1079 r#"
1080 INSERT INTO alert_history (
1081 alert_id, action, performed_at, new_state, notes
1082 ) VALUES (?, 'auto_resolved', CURRENT_TIMESTAMP, ?, 'Alert auto-resolved due to age')
1083 "#
1084 )
1085 .bind(alert.id)
1086 .bind(serde_json::to_value(&alert).unwrap_or(serde_json::Value::Null))
1087 .execute(&mut *tx)
1088 .await
1089 .context("Failed to create history record for auto-resolved alert")?;
1090 }
1091
1092 tx.commit().await?;
1094
1095 Ok(result.rows_affected() as i64)
1096}
1097
1098pub async fn search_alerts(
1117 pool: &Pool<MySql>,
1118 search_query: &str,
1119 org_id: Option<i64>,
1120 page: i64,
1121 per_page: i64,
1122) -> anyhow::Result<Vec<Alert>> {
1123 let pattern = format!("%{}%", search_query);
1125
1126 let mut query_string = String::from(
1128 r#"
1129 SELECT * FROM alerts
1130 WHERE (
1131 message LIKE ? OR
1132 alert_type LIKE ? OR
1133 service LIKE ?
1134 )
1135 "#
1136 );
1137
1138 if let Some(_) = org_id {
1140 query_string.push_str(" AND org_id = ?");
1141 }
1142
1143 query_string.push_str(
1145 r#"
1146 ORDER BY timestamp DESC
1147 LIMIT ? OFFSET ?
1148 "#
1149 );
1150
1151 let mut query = sqlx::query_as::<_, Alert>(&query_string)
1153 .bind(&pattern)
1154 .bind(&pattern)
1155 .bind(&pattern);
1156
1157 if let Some(id) = org_id {
1158 query = query.bind(id);
1159 }
1160
1161 query = query
1162 .bind(per_page)
1163 .bind(page * per_page);
1164
1165 let alerts = query
1166 .fetch_all(pool)
1167 .await
1168 .context("Failed to search alerts")?;
1169
1170 Ok(alerts)
1171}
1172
1173pub async fn count_search_alerts(
1189 pool: &Pool<MySql>,
1190 search_query: &str,
1191 org_id: Option<i64>,
1192) -> anyhow::Result<i64> {
1193 let pattern = format!("%{}%", search_query);
1195
1196 let mut query_string = String::from(
1198 r#"
1199 SELECT COUNT(*) FROM alerts
1200 WHERE (
1201 message LIKE ? OR
1202 alert_type LIKE ? OR
1203 service LIKE ?
1204 )
1205 "#
1206 );
1207
1208 if let Some(_) = org_id {
1210 query_string.push_str(" AND org_id = ?");
1211 }
1212
1213 let mut query = sqlx::query_scalar::<_, i64>(&query_string)
1215 .bind(&pattern)
1216 .bind(&pattern)
1217 .bind(&pattern);
1218
1219 if let Some(id) = org_id {
1220 query = query.bind(id);
1221 }
1222
1223 let count = query
1224 .fetch_one(pool)
1225 .await
1226 .context("Failed to count search alerts")?;
1227
1228 Ok(count)
1229}
1230
1231pub async fn bulk_update_alert_status(
1251 pool: &Pool<MySql>,
1252 ids: Option<Vec<i64>>,
1253 service: Option<&str>,
1254 app_id: Option<i64>,
1255 new_status: &str,
1256 user_id: i64,
1257 notes: Option<&str>,
1258) -> anyhow::Result<i64> {
1259 if ids.is_none() && service.is_none() && app_id.is_none() {
1261 return Err(anyhow::anyhow!("At least one filter (ids, service, or app_id) must be provided"));
1262 }
1263
1264 let mut tx = pool.begin().await?;
1266
1267 let mut select_query_string = String::from("SELECT * FROM alerts WHERE status IN ('active', 'acknowledged')");
1269
1270 if let Some(alert_ids) = &ids {
1271 if !alert_ids.is_empty() {
1272 select_query_string.push_str(" AND id IN (");
1273 select_query_string.push_str(&std::iter::repeat("?")
1274 .take(alert_ids.len())
1275 .collect::<Vec<_>>()
1276 .join(", "));
1277 select_query_string.push_str(")");
1278 }
1279 }
1280
1281 if let Some(_) = service {
1282 select_query_string.push_str(" AND service = ?");
1283 }
1284
1285 if let Some(_) = app_id {
1286 select_query_string.push_str(" AND app_id = ?");
1287 }
1288
1289 let mut select_query = sqlx::query_as::<_, Alert>(&select_query_string);
1291
1292 if let Some(alert_ids) = &ids {
1294 for id in alert_ids {
1295 select_query = select_query.bind(*id);
1296 }
1297 }
1298
1299 if let Some(s) = service {
1300 select_query = select_query.bind(s);
1301 }
1302
1303 if let Some(id) = app_id {
1304 select_query = select_query.bind(id);
1305 }
1306
1307 let affected_alerts = select_query
1309 .fetch_all(&mut *tx)
1310 .await
1311 .context("Failed to fetch alerts for bulk update")?;
1312
1313 if affected_alerts.is_empty() {
1315 return Ok(0);
1316 }
1317
1318 let mut update_query_string = String::from(
1320 "UPDATE alerts SET status = ?"
1321 );
1322
1323 if new_status == "resolved" || new_status == "auto_resolved" {
1325 update_query_string.push_str(", resolved_at = CURRENT_TIMESTAMP, resolved_by = ?");
1326 }
1327
1328 update_query_string.push_str(" WHERE status IN ('active', 'acknowledged')");
1330
1331 if let Some(alert_ids) = &ids {
1332 if !alert_ids.is_empty() {
1333 update_query_string.push_str(" AND id IN (");
1334 update_query_string.push_str(&std::iter::repeat("?")
1335 .take(alert_ids.len())
1336 .collect::<Vec<_>>()
1337 .join(", "));
1338 update_query_string.push_str(")");
1339 }
1340 }
1341
1342 if let Some(_) = service {
1343 update_query_string.push_str(" AND service = ?");
1344 }
1345
1346 if let Some(_) = app_id {
1347 update_query_string.push_str(" AND app_id = ?");
1348 }
1349
1350 let mut update_query = sqlx::query(&update_query_string)
1352 .bind(new_status);
1353
1354 if new_status == "resolved" || new_status == "auto_resolved" {
1356 update_query = update_query.bind(user_id);
1357 }
1358
1359 if let Some(alert_ids) = &ids {
1361 for id in alert_ids {
1362 update_query = update_query.bind(*id);
1363 }
1364 }
1365
1366 if let Some(s) = service {
1367 update_query = update_query.bind(s);
1368 }
1369
1370 if let Some(id) = app_id {
1371 update_query = update_query.bind(id);
1372 }
1373
1374 let update_result = update_query
1376 .execute(&mut *tx)
1377 .await
1378 .context("Failed to bulk update alert status")?;
1379
1380 for alert in &affected_alerts {
1382 let updated_alert = Alert {
1384 id: alert.id,
1385 alert_type: alert.alert_type.clone(),
1386 severity: alert.severity.clone(),
1387 service: alert.service.clone(),
1388 message: alert.message.clone(),
1389 timestamp: alert.timestamp,
1390 status: new_status.to_string(),
1391 metadata: alert.metadata.clone(),
1392 org_id: alert.org_id,
1393 app_id: alert.app_id,
1394 instance_id: alert.instance_id,
1395 region_id: alert.region_id,
1396 node_id: alert.node_id,
1397 resolved_at: if new_status == "resolved" || new_status == "auto_resolved" {
1398 Some(chrono::Utc::now())
1399 } else {
1400 alert.resolved_at
1401 },
1402 resolved_by: if new_status == "resolved" || new_status == "auto_resolved" {
1403 Some(user_id)
1404 } else {
1405 alert.resolved_by
1406 }
1407 };
1408
1409 sqlx::query(
1411 r#"
1412 INSERT INTO alert_history (
1413 alert_id, action, performed_by, performed_at,
1414 previous_state, new_state, notes
1415 ) VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?, ?, ?)
1416 "#
1417 )
1418 .bind(alert.id)
1419 .bind(format!("bulk_status_change_to_{}", new_status))
1420 .bind(user_id)
1421 .bind(serde_json::to_value(&alert).unwrap_or(serde_json::Value::Null))
1422 .bind(serde_json::to_value(&updated_alert).unwrap_or(serde_json::Value::Null))
1423 .bind(notes)
1424 .execute(&mut *tx)
1425 .await
1426 .context("Failed to create history record for bulk update")?;
1427
1428 if new_status == "acknowledged" {
1430 sqlx::query(
1431 r#"
1432 INSERT INTO alert_acknowledgments (
1433 alert_id, user_id, acknowledged_at, notes
1434 ) VALUES (?, ?, CURRENT_TIMESTAMP, ?)
1435 "#
1436 )
1437 .bind(alert.id)
1438 .bind(user_id)
1439 .bind(notes)
1440 .execute(&mut *tx)
1441 .await
1442 .context("Failed to create acknowledgment record for bulk update")?;
1443 }
1444 }
1445
1446 tx.commit().await?;
1448
1449 Ok(update_result.rows_affected() as i64)
1450}