1use anyhow::Context;
2use serde::Serialize;
3use sqlx::{MySql, Pool, Row};
4use chrono::{DateTime, Utc};
5use serde_json::Value as JsonValue;
6
7use libomni::types::db::v1 as types;
8use types::alert::{
9 Alert, AlertWithAcknowledgments, AlertAcknowledgment,
10 AlertEscalation, AlertHistory, AlertWithRelatedData
11};
12
13pub async fn list_alerts(
39 pool: &Pool<MySql>,
40 page: i64,
41 per_page: i64,
42 status: Option<&str>,
43 severity: Option<&str>,
44 org_id: Option<i64>,
45 app_id: Option<i64>,
46 service: Option<&str>,
47 from_date: Option<DateTime<Utc>>,
48 to_date: Option<DateTime<Utc>>,
49) -> anyhow::Result<Vec<Alert>> {
50 println!("Attempting to fetch alerts from database with filtering...");
51
52 let mut query_string = String::from(
54 "SELECT * FROM alerts WHERE 1=1"
55 );
56
57 if let Some(s) = status {
59 query_string.push_str(" AND status = ?");
60 }
61 if let Some(s) = severity {
62 query_string.push_str(" AND severity = ?");
63 }
64 if let Some(_) = org_id {
65 query_string.push_str(" AND org_id = ?");
66 }
67 if let Some(_) = app_id {
68 query_string.push_str(" AND app_id = ?");
69 }
70 if let Some(s) = service {
71 query_string.push_str(" AND service = ?");
72 }
73 if let Some(_) = from_date {
74 query_string.push_str(" AND timestamp >= ?");
75 }
76 if let Some(_) = to_date {
77 query_string.push_str(" AND timestamp <= ?");
78 }
79
80 query_string.push_str(" ORDER BY timestamp DESC LIMIT ? OFFSET ?");
82
83 let mut query = sqlx::query_as::<_, Alert>(&query_string);
85
86 if let Some(s) = status {
88 query = query.bind(s);
89 }
90 if let Some(s) = severity {
91 query = query.bind(s);
92 }
93 if let Some(id) = org_id {
94 query = query.bind(id);
95 }
96 if let Some(id) = app_id {
97 query = query.bind(id);
98 }
99 if let Some(s) = service {
100 query = query.bind(s);
101 }
102 if let Some(date) = from_date {
103 query = query.bind(date);
104 }
105 if let Some(date) = to_date {
106 query = query.bind(date);
107 }
108
109 query = query.bind(per_page).bind(page * per_page);
111
112 let result = query.fetch_all(pool).await;
114
115 match result {
116 Ok(alerts) => {
117 println!("Successfully fetched {} alerts", alerts.len());
118 Ok(alerts)
119 }
120 Err(e) => {
121 eprintln!("Error fetching alerts: {:#?}", e);
122 Err(anyhow::Error::new(e).context("Failed to fetch alerts"))
123 }
124 }
125}
126
127pub async fn get_alert_with_related_data(
142 pool: &Pool<MySql>,
143 id: i64,
144) -> anyhow::Result<AlertWithRelatedData> {
145 let alert = sqlx::query_as::<_, Alert>("SELECT * FROM alerts WHERE id = ?")
147 .bind(id)
148 .fetch_one(pool)
149 .await
150 .context("Failed to fetch alert")?;
151
152 let acknowledgments = sqlx::query_as::<_, AlertAcknowledgment>(
154 "SELECT * FROM alert_acknowledgments WHERE alert_id = ? ORDER BY acknowledged_at DESC"
155 )
156 .bind(id)
157 .fetch_all(pool)
158 .await
159 .context("Failed to fetch alert acknowledgments")?;
160
161 let escalations = sqlx::query_as::<_, AlertEscalation>(
163 "SELECT * FROM alert_escalations WHERE alert_id = ? ORDER BY escalated_at DESC"
164 )
165 .bind(id)
166 .fetch_all(pool)
167 .await
168 .context("Failed to fetch alert escalations")?;
169
170 let history = sqlx::query_as::<_, AlertHistory>(
172 "SELECT * FROM alert_history WHERE alert_id = ? ORDER BY performed_at DESC"
173 )
174 .bind(id)
175 .fetch_all(pool)
176 .await
177 .context("Failed to fetch alert history")?;
178
179 Ok(AlertWithRelatedData {
180 alert,
181 acknowledgments,
182 escalations,
183 history,
184 })
185}
186
187pub async fn count_alerts(
205 pool: &Pool<MySql>,
206 status: Option<&str>,
207 severity: Option<&str>,
208 org_id: Option<i64>,
209 app_id: Option<i64>,
210) -> anyhow::Result<i64> {
211 let mut query_string = String::from("SELECT COUNT(*) FROM alerts WHERE 1=1");
213
214 if let Some(_) = status {
216 query_string.push_str(" AND status = ?");
217 }
218 if let Some(_) = severity {
219 query_string.push_str(" AND severity = ?");
220 }
221 if let Some(_) = org_id {
222 query_string.push_str(" AND org_id = ?");
223 }
224 if let Some(_) = app_id {
225 query_string.push_str(" AND app_id = ?");
226 }
227
228 let mut query = sqlx::query_scalar::<_, i64>(&query_string);
230
231 if let Some(s) = status {
233 query = query.bind(s);
234 }
235 if let Some(s) = severity {
236 query = query.bind(s);
237 }
238 if let Some(id) = org_id {
239 query = query.bind(id);
240 }
241 if let Some(id) = app_id {
242 query = query.bind(id);
243 }
244
245 let count = query
247 .fetch_one(pool)
248 .await
249 .context("Failed to count alerts")?;
250
251 Ok(count)
252}
253
254pub async fn create_alert(
278 pool: &Pool<MySql>,
279 alert_type: &str,
280 severity: &str,
281 service: &str,
282 message: &str,
283 metadata: Option<JsonValue>,
284 org_id: Option<i64>,
285 app_id: Option<i64>,
286 instance_id: Option<i64>,
287 region_id: Option<i64>,
288 node_id: Option<i64>,
289) -> anyhow::Result<Alert> {
290 let mut tx = pool.begin().await?;
292
293 let alert = sqlx::query_as::<_, Alert>(
295 r#"INSERT INTO alerts (
296 alert_type, severity, service, message, timestamp, status,
297 metadata, org_id, app_id, instance_id, region_id, node_id
298 ) VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP, 'active', ?, ?, ?, ?, ?, ?)"#,
299 )
300 .bind(alert_type)
301 .bind(severity)
302 .bind(service)
303 .bind(message)
304 .bind(metadata)
305 .bind(org_id)
306 .bind(app_id)
307 .bind(instance_id)
308 .bind(region_id)
309 .bind(node_id)
310 .fetch_one(&mut *tx)
311 .await
312 .context("Failed to create alert")?;
313
314 sqlx::query(
316 r#"INSERT INTO alert_history (
317 alert_id, action, performed_at, previous_state, new_state
318 ) VALUES (?, 'created', CURRENT_TIMESTAMP, NULL, ?)"#,
319 )
320 .bind(alert.id)
321 .bind(serde_json::to_value(&alert).unwrap_or(serde_json::Value::Null))
322 .execute(&mut *tx)
323 .await
324 .context("Failed to create alert history record")?;
325
326 tx.commit().await?;
328
329 Ok(alert)
331}
332
333pub async fn update_alert_status(
352 pool: &Pool<MySql>,
353 id: i64,
354 new_status: &str,
355 user_id: Option<i64>,
356 notes: Option<&str>,
357) -> anyhow::Result<Alert> {
358 let mut tx = pool.begin().await?;
360
361 let current_alert = sqlx::query_as::<_, Alert>("SELECT * FROM alerts WHERE id = ?")
363 .bind(id)
364 .fetch_one(&mut *tx)
365 .await
366 .context("Failed to fetch current alert state")?;
367
368 let (query, resolved_at, resolved_by) = if new_status == "resolved" || new_status == "auto_resolved" {
370 (
371 "UPDATE alerts SET status = ?, resolved_at = CURRENT_TIMESTAMP, resolved_by = ? WHERE id = ?",
372 Some(chrono::Utc::now()),
373 user_id,
374 )
375 } else {
376 (
377 "UPDATE alerts SET status = ? WHERE id = ?",
378 None,
379 None,
380 )
381 };
382
383 let updated_alert = if new_status == "resolved" || new_status == "auto_resolved" {
385 sqlx::query_as::<_, Alert>(query)
386 .bind(new_status)
387 .bind(user_id)
388 .bind(id)
389 .fetch_one(&mut *tx)
390 .await
391 .context("Failed to update alert status")?
392 } else {
393 sqlx::query_as::<_, Alert>(query)
394 .bind(new_status)
395 .bind(id)
396 .fetch_one(&mut *tx)
397 .await
398 .context("Failed to update alert status")?
399 };
400
401 sqlx::query(
403 r#"INSERT INTO alert_history (
404 alert_id, action, performed_by, performed_at,
405 previous_state, new_state, notes
406 ) VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
407 )
408 .bind(id)
409 .bind(format!("status_change_to_{}", new_status))
410 .bind(user_id)
411 .bind(serde_json::to_value(¤t_alert).unwrap_or(serde_json::Value::Null))
412 .bind(serde_json::to_value(&updated_alert).unwrap_or(serde_json::Value::Null))
413 .bind(notes)
414 .execute(&mut *tx)
415 .await
416 .context("Failed to create alert history record for status update")?;
417
418 if new_status == "acknowledged" && user_id.is_some() {
420 sqlx::query(
421 r#"INSERT INTO alert_acknowledgments (
422 alert_id, user_id, acknowledged_at, notes
423 ) VALUES (?, ?, CURRENT_TIMESTAMP, ?)"#,
424 )
425 .bind(id)
426 .bind(user_id.unwrap())
427 .bind(notes)
428 .execute(&mut *tx)
429 .await
430 .context("Failed to create alert acknowledgment record")?;
431 }
432
433 tx.commit().await?;
435
436 Ok(updated_alert)
437}
438
439pub async fn acknowledge_alert(
457 pool: &Pool<MySql>,
458 alert_id: i64,
459 user_id: i64,
460 notes: Option<&str>,
461 update_status: bool,
462) -> anyhow::Result<AlertAcknowledgment> {
463 let mut tx = pool.begin().await?;
465
466 let acknowledgment = sqlx::query_as::<_, AlertAcknowledgment>(
468 r#"INSERT INTO alert_acknowledgments (
469 alert_id, user_id, acknowledged_at, notes
470 ) VALUES (?, ?, CURRENT_TIMESTAMP, ?)"#,
471 )
472 .bind(alert_id)
473 .bind(user_id)
474 .bind(notes)
475 .fetch_one(&mut *tx)
476 .await
477 .context("Failed to create alert acknowledgment")?;
478
479 if update_status {
481 let current_alert = sqlx::query_as::<_, Alert>("SELECT * FROM alerts WHERE id = ?")
483 .bind(alert_id)
484 .fetch_one(&mut *tx)
485 .await
486 .context("Failed to fetch current alert state")?;
487
488 if current_alert.status == "active" {
490 let updated_alert = sqlx::query_as::<_, Alert>(
492 "UPDATE alerts SET status = 'acknowledged' WHERE id = ?"
493 )
494 .bind(alert_id)
495 .fetch_one(&mut *tx)
496 .await
497 .context("Failed to update alert status to acknowledged")?;
498
499 sqlx::query(
501 r#"INSERT INTO alert_history (
502 alert_id, action, performed_by, performed_at,
503 previous_state, new_state, notes
504 ) VALUES (?, 'status_change_to_acknowledged', ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
505 )
506 .bind(alert_id)
507 .bind(user_id)
508 .bind(serde_json::to_value(¤t_alert).unwrap_or(serde_json::Value::Null))
509 .bind(serde_json::to_value(&updated_alert).unwrap_or(serde_json::Value::Null))
510 .bind(notes)
511 .execute(&mut *tx)
512 .await
513 .context("Failed to create alert history record for acknowledgment")?;
514 }
515 }
516
517 tx.commit().await?;
519
520 Ok(acknowledgment)
521}
522
523pub async fn resolve_alert(
540 pool: &Pool<MySql>,
541 id: i64,
542 user_id: i64,
543 notes: Option<&str>,
544) -> anyhow::Result<Alert> {
545 let mut tx = pool.begin().await?;
547
548 let current_alert = sqlx::query_as::<_, Alert>("SELECT * FROM alerts WHERE id = ?")
550 .bind(id)
551 .fetch_one(&mut *tx)
552 .await
553 .context("Failed to fetch current alert state")?;
554
555 let updated_alert = sqlx::query_as::<_, Alert>(
557 r#"UPDATE alerts
558 SET status = 'resolved',
559 resolved_at = CURRENT_TIMESTAMP,
560 resolved_by = ?
561 WHERE id = ?"#,
562 )
563 .bind(user_id)
564 .bind(id)
565 .fetch_one(&mut *tx)
566 .await
567 .context("Failed to resolve alert")?;
568
569 sqlx::query(
571 r#"INSERT INTO alert_history (
572 alert_id, action, performed_by, performed_at,
573 previous_state, new_state, notes
574 ) VALUES (?, 'resolved', ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
575 )
576 .bind(id)
577 .bind(user_id)
578 .bind(serde_json::to_value(¤t_alert).unwrap_or(serde_json::Value::Null))
579 .bind(serde_json::to_value(&updated_alert).unwrap_or(serde_json::Value::Null))
580 .bind(notes)
581 .execute(&mut *tx)
582 .await
583 .context("Failed to create alert history record for resolution")?;
584
585 tx.commit().await?;
587
588 Ok(updated_alert)
589}
590
591pub async fn create_alert_escalation(
611 pool: &Pool<MySql>,
612 alert_id: i64,
613 escalation_level: i64,
614 escalated_to: JsonValue,
615 escalation_method: &str,
616 response_required_by: Option<DateTime<Utc>>,
617) -> anyhow::Result<AlertEscalation> {
618 let mut tx = pool.begin().await?;
620
621 let escalation = sqlx::query_as::<_, AlertEscalation>(
623 r#"INSERT INTO alert_escalations (
624 alert_id, escalation_level, escalated_at,
625 escalated_to, escalation_method, response_required_by
626 ) VALUES (?, ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
627 )
628 .bind(alert_id)
629 .bind(escalation_level)
630 .bind(escalated_to)
631 .bind(escalation_method)
632 .bind(response_required_by)
633 .fetch_one(&mut *tx)
634 .await
635 .context("Failed to create alert escalation")?;
636
637 sqlx::query(
639 r#"INSERT INTO alert_history (
640 alert_id, action, performed_at, new_state
641 ) VALUES (?, ?, CURRENT_TIMESTAMP, ?)"#,
642 )
643 .bind(alert_id)
644 .bind(format!("escalated_level_{}", escalation_level))
645 .bind(serde_json::to_value(&escalation).unwrap_or(serde_json::Value::Null))
646 .execute(&mut *tx)
647 .await
648 .context("Failed to create alert history record for escalation")?;
649
650 tx.commit().await?;
652
653 Ok(escalation)
654}
655
656pub async fn add_alert_history(
677 pool: &Pool<MySql>,
678 alert_id: i64,
679 action: &str,
680 performed_by: Option<i64>,
681 notes: Option<&str>,
682 previous_state: Option<JsonValue>,
683 new_state: Option<JsonValue>,
684) -> anyhow::Result<AlertHistory> {
685 let history = sqlx::query_as::<_, AlertHistory>(
686 r#"INSERT INTO alert_history (
687 alert_id, action, performed_by, performed_at,
688 previous_state, new_state, notes
689 ) VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?, ?, ?)"#,
690 )
691 .bind(alert_id)
692 .bind(action)
693 .bind(performed_by)
694 .bind(previous_state)
695 .bind(new_state)
696 .bind(notes)
697 .fetch_one(pool)
698 .await
699 .context("Failed to create alert history record")?;
700
701 Ok(history)
702}
703
704pub async fn get_recent_app_alerts(
721 pool: &Pool<MySql>,
722 app_id: i64,
723 limit: i64,
724 include_resolved: bool,
725) -> anyhow::Result<Vec<Alert>> {
726 let query = if include_resolved {
728 r#"
729 SELECT * FROM alerts
730 WHERE app_id = ?
731 ORDER BY timestamp DESC
732 LIMIT ?
733 "#
734 } else {
735 r#"
736 SELECT * FROM alerts
737 WHERE app_id = ? AND status IN ('active', 'acknowledged')
738 ORDER BY timestamp DESC
739 LIMIT ?
740 "#
741 };
742
743 let alerts = sqlx::query_as::<_, Alert>(query)
744 .bind(app_id)
745 .bind(limit)
746 .fetch_all(pool)
747 .await
748 .context("Failed to fetch app alerts")?;
749
750 Ok(alerts)
751}
752
753pub async fn get_org_active_alerts(
769 pool: &Pool<MySql>,
770 org_id: i64,
771 limit: i64,
772) -> anyhow::Result<Vec<Alert>> {
773 let alerts = sqlx::query_as::<_, Alert>(
774 r#"
775 SELECT * FROM alerts
776 WHERE org_id = ? AND status IN ('active', 'acknowledged')
777 ORDER BY
778 CASE
779 WHEN severity = 'critical' THEN 1
780 WHEN severity = 'warning' THEN 2
781 WHEN severity = 'info' THEN 3
782 ELSE 4
783 END,
784 timestamp DESC
785 LIMIT ?
786 "#
787 )
788 .bind(org_id)
789 .bind(limit)
790 .fetch_all(pool)
791 .await
792 .context("Failed to fetch org active alerts")?;
793
794 Ok(alerts)
795}
796
797pub async fn get_alert_stats(
813 pool: &Pool<MySql>,
814 org_id: i64,
815 days: i64,
816) -> anyhow::Result<JsonValue> {
817 let severity_counts = sqlx::query(
819 r#"
820 SELECT severity, COUNT(*) as count
821 FROM alerts
822 WHERE org_id = ? AND timestamp >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
823 GROUP BY severity
824 "#
825 )
826 .bind(org_id)
827 .bind(days)
828 .fetch_all(pool)
829 .await
830 .context("Failed to fetch severity counts")?;
831
832 let status_counts = sqlx::query(
834 r#"
835 SELECT status, COUNT(*) as count
836 FROM alerts
837 WHERE org_id = ? AND timestamp >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
838 GROUP BY status
839 "#
840 )
841 .bind(org_id)
842 .bind(days)
843 .fetch_all(pool)
844 .await
845 .context("Failed to fetch status counts")?;
846
847 let service_counts = sqlx::query(
849 r#"
850 SELECT service, COUNT(*) as count
851 FROM alerts
852 WHERE org_id = ? AND timestamp >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
853 GROUP BY service
854 "#
855 )
856 .bind(org_id)
857 .bind(days)
858 .fetch_all(pool)
859 .await
860 .context("Failed to fetch service counts")?;
861
862 let daily_trends = sqlx::query(
864 r#"
865 SELECT
866 DATE(timestamp) as date,
867 COUNT(*) as total,
868 SUM(CASE WHEN severity = 'critical' THEN 1 ELSE 0 END) as critical,
869 SUM(CASE WHEN severity = 'warning' THEN 1 ELSE 0 END) as warning,
870 SUM(CASE WHEN severity = 'info' THEN 1 ELSE 0 END) as info
871 FROM alerts
872 WHERE org_id = ? AND timestamp >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
873 GROUP BY DATE(timestamp)
874 ORDER BY DATE(timestamp)
875 "#
876 )
877 .bind(org_id)
878 .bind(days)
879 .fetch_all(pool)
880 .await
881 .context("Failed to fetch daily trend data")?;
882
883 let mut severity_json = serde_json::Map::new();
885 for row in severity_counts {
886 let severity: String = row.get("severity");
887 let count: i64 = row.get("count");
888 severity_json.insert(severity, serde_json::Value::Number(count.into()));
889 }
890
891 let mut status_json = serde_json::Map::new();
892 for row in status_counts {
893 let status: String = row.get("status");
894 let count: i64 = row.get("count");
895 status_json.insert(status, serde_json::Value::Number(count.into()));
896 }
897
898 let mut service_json = serde_json::Map::new();
899 for row in service_counts {
900 let service: String = row.get("service");
901 let count: i64 = row.get("count");
902 service_json.insert(service, serde_json::Value::Number(count.into()));
903 }
904
905 let mut daily_json = Vec::new();
906 for row in daily_trends {
907 let date: chrono::NaiveDate = row.get("date");
908 let total: i64 = row.get("total");
909 let critical: i64 = row.get("critical");
910 let warning: i64 = row.get("warning");
911 let info: i64 = row.get("info");
912
913 let day_data = serde_json::json!({
914 "date": date.format("%Y-%m-%d").to_string(),
915 "total": total,
916 "critical": critical,
917 "warning": warning,
918 "info": info
919 });
920
921 daily_json.push(day_data);
922 }
923
924 let stats = serde_json::json!({
926 "by_severity": severity_json,
927 "by_status": status_json,
928 "by_service": service_json,
929 "daily_trends": daily_json,
930 "period_days": days
931 });
932
933 Ok(stats)
934}
935
936pub async fn get_alerts_needing_escalation(
953 pool: &Pool<MySql>,
954 org_id: Option<i64>,
955 hours_threshold: i64,
956) -> anyhow::Result<Vec<Alert>> {
957 let mut query_string = String::from(
959 r#"
960 SELECT * FROM alerts
961 WHERE status = 'active'
962 AND timestamp <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? HOUR)
963 "#
964 );
965
966 if let Some(_) = org_id {
968 query_string.push_str(" AND org_id = ?");
969 }
970
971 query_string.push_str(
973 r#"
974 ORDER BY
975 CASE
976 WHEN severity = 'critical' THEN 1
977 WHEN severity = 'warning' THEN 2
978 ELSE 3
979 END,
980 timestamp ASC
981 "#
982 );
983
984 let mut query = sqlx::query_as::<_, Alert>(&query_string)
986 .bind(hours_threshold);
987
988 if let Some(id) = org_id {
989 query = query.bind(id);
990 }
991
992 let alerts = query
993 .fetch_all(pool)
994 .await
995 .context("Failed to fetch alerts needing escalation")?;
996
997 Ok(alerts)
998}
999
1000pub async fn auto_resolve_old_alerts(
1017 pool: &Pool<MySql>,
1018 days_threshold: i64,
1019 severity_levels: Option<Vec<&str>>,
1020) -> anyhow::Result<i64> {
1021 let mut tx = pool.begin().await?;
1023
1024 let mut query_string = String::from(
1026 r#"
1027 UPDATE alerts
1028 SET status = 'auto_resolved',
1029 resolved_at = CURRENT_TIMESTAMP
1030 WHERE
1031 status IN ('active', 'acknowledged')
1032 AND timestamp <= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL ? DAY)
1033 "#
1034 );
1035
1036 if let Some(levels) = &severity_levels {
1038 if !levels.is_empty() {
1039 query_string.push_str(" AND severity IN (");
1040 query_string.push_str(&std::iter::repeat("?")
1041 .take(levels.len())
1042 .collect::<Vec<_>>()
1043 .join(", "));
1044 query_string.push_str(")");
1045 }
1046 }
1047
1048 let mut query = sqlx::query(&query_string)
1050 .bind(days_threshold);
1051
1052 if let Some(levels) = severity_levels {
1054 for level in levels {
1055 query = query.bind(level);
1056 }
1057 }
1058
1059 let result = query
1061 .execute(&mut *tx)
1062 .await
1063 .context("Failed to auto-resolve old alerts")?;
1064
1065 let affected_alerts = sqlx::query_as::<_, Alert>(
1067 r#"
1068 SELECT * FROM alerts
1069 WHERE
1070 status = 'auto_resolved'
1071 AND resolved_at >= DATE_SUB(CURRENT_TIMESTAMP, INTERVAL 1 MINUTE)
1072 "#
1073 )
1074 .fetch_all(&mut *tx)
1075 .await
1076 .context("Failed to fetch auto-resolved alerts")?;
1077
1078 for alert in &affected_alerts {
1080 sqlx::query(
1081 r#"
1082 INSERT INTO alert_history (
1083 alert_id, action, performed_at, new_state, notes
1084 ) VALUES (?, 'auto_resolved', CURRENT_TIMESTAMP, ?, 'Alert auto-resolved due to age')
1085 "#
1086 )
1087 .bind(alert.id)
1088 .bind(serde_json::to_value(&alert).unwrap_or(serde_json::Value::Null))
1089 .execute(&mut *tx)
1090 .await
1091 .context("Failed to create history record for auto-resolved alert")?;
1092 }
1093
1094 tx.commit().await?;
1096
1097 Ok(result.rows_affected() as i64)
1098}
1099
1100pub async fn search_alerts(
1119 pool: &Pool<MySql>,
1120 search_query: &str,
1121 org_id: Option<i64>,
1122 page: i64,
1123 per_page: i64,
1124) -> anyhow::Result<Vec<Alert>> {
1125 let pattern = format!("%{}%", search_query);
1127
1128 let mut query_string = String::from(
1130 r#"
1131 SELECT * FROM alerts
1132 WHERE (
1133 message LIKE ? OR
1134 alert_type LIKE ? OR
1135 service LIKE ?
1136 )
1137 "#
1138 );
1139
1140 if let Some(_) = org_id {
1142 query_string.push_str(" AND org_id = ?");
1143 }
1144
1145 query_string.push_str(
1147 r#"
1148 ORDER BY timestamp DESC
1149 LIMIT ? OFFSET ?
1150 "#
1151 );
1152
1153 let mut query = sqlx::query_as::<_, Alert>(&query_string)
1155 .bind(&pattern)
1156 .bind(&pattern)
1157 .bind(&pattern);
1158
1159 if let Some(id) = org_id {
1160 query = query.bind(id);
1161 }
1162
1163 query = query
1164 .bind(per_page)
1165 .bind(page * per_page);
1166
1167 let alerts = query
1168 .fetch_all(pool)
1169 .await
1170 .context("Failed to search alerts")?;
1171
1172 Ok(alerts)
1173}
1174
1175pub async fn count_search_alerts(
1191 pool: &Pool<MySql>,
1192 search_query: &str,
1193 org_id: Option<i64>,
1194) -> anyhow::Result<i64> {
1195 let pattern = format!("%{}%", search_query);
1197
1198 let mut query_string = String::from(
1200 r#"
1201 SELECT COUNT(*) FROM alerts
1202 WHERE (
1203 message LIKE ? OR
1204 alert_type LIKE ? OR
1205 service LIKE ?
1206 )
1207 "#
1208 );
1209
1210 if let Some(_) = org_id {
1212 query_string.push_str(" AND org_id = ?");
1213 }
1214
1215 let mut query = sqlx::query_scalar::<_, i64>(&query_string)
1217 .bind(&pattern)
1218 .bind(&pattern)
1219 .bind(&pattern);
1220
1221 if let Some(id) = org_id {
1222 query = query.bind(id);
1223 }
1224
1225 let count = query
1226 .fetch_one(pool)
1227 .await
1228 .context("Failed to count search alerts")?;
1229
1230 Ok(count)
1231}
1232
1233pub async fn bulk_update_alert_status(
1253 pool: &Pool<MySql>,
1254 ids: Option<Vec<i64>>,
1255 service: Option<&str>,
1256 app_id: Option<i64>,
1257 new_status: &str,
1258 user_id: i64,
1259 notes: Option<&str>,
1260) -> anyhow::Result<i64> {
1261 if ids.is_none() && service.is_none() && app_id.is_none() {
1263 return Err(anyhow::anyhow!("At least one filter (ids, service, or app_id) must be provided"));
1264 }
1265
1266 let mut tx = pool.begin().await?;
1268
1269 let mut select_query_string = String::from("SELECT * FROM alerts WHERE status IN ('active', 'acknowledged')");
1271
1272 if let Some(alert_ids) = &ids {
1273 if !alert_ids.is_empty() {
1274 select_query_string.push_str(" AND id IN (");
1275 select_query_string.push_str(&std::iter::repeat("?")
1276 .take(alert_ids.len())
1277 .collect::<Vec<_>>()
1278 .join(", "));
1279 select_query_string.push_str(")");
1280 }
1281 }
1282
1283 if let Some(_) = service {
1284 select_query_string.push_str(" AND service = ?");
1285 }
1286
1287 if let Some(_) = app_id {
1288 select_query_string.push_str(" AND app_id = ?");
1289 }
1290
1291 let mut select_query = sqlx::query_as::<_, Alert>(&select_query_string);
1293
1294 if let Some(alert_ids) = &ids {
1296 for id in alert_ids {
1297 select_query = select_query.bind(*id);
1298 }
1299 }
1300
1301 if let Some(s) = service {
1302 select_query = select_query.bind(s);
1303 }
1304
1305 if let Some(id) = app_id {
1306 select_query = select_query.bind(id);
1307 }
1308
1309 let affected_alerts = select_query
1311 .fetch_all(&mut *tx)
1312 .await
1313 .context("Failed to fetch alerts for bulk update")?;
1314
1315 if affected_alerts.is_empty() {
1317 return Ok(0);
1318 }
1319
1320 let mut update_query_string = String::from(
1322 "UPDATE alerts SET status = ?"
1323 );
1324
1325 if new_status == "resolved" || new_status == "auto_resolved" {
1327 update_query_string.push_str(", resolved_at = CURRENT_TIMESTAMP, resolved_by = ?");
1328 }
1329
1330 update_query_string.push_str(" WHERE status IN ('active', 'acknowledged')");
1332
1333 if let Some(alert_ids) = &ids {
1334 if !alert_ids.is_empty() {
1335 update_query_string.push_str(" AND id IN (");
1336 update_query_string.push_str(&std::iter::repeat("?")
1337 .take(alert_ids.len())
1338 .collect::<Vec<_>>()
1339 .join(", "));
1340 update_query_string.push_str(")");
1341 }
1342 }
1343
1344 if let Some(_) = service {
1345 update_query_string.push_str(" AND service = ?");
1346 }
1347
1348 if let Some(_) = app_id {
1349 update_query_string.push_str(" AND app_id = ?");
1350 }
1351
1352 let mut update_query = sqlx::query(&update_query_string)
1354 .bind(new_status);
1355
1356 if new_status == "resolved" || new_status == "auto_resolved" {
1358 update_query = update_query.bind(user_id);
1359 }
1360
1361 if let Some(alert_ids) = &ids {
1363 for id in alert_ids {
1364 update_query = update_query.bind(*id);
1365 }
1366 }
1367
1368 if let Some(s) = service {
1369 update_query = update_query.bind(s);
1370 }
1371
1372 if let Some(id) = app_id {
1373 update_query = update_query.bind(id);
1374 }
1375
1376 let update_result = update_query
1378 .execute(&mut *tx)
1379 .await
1380 .context("Failed to bulk update alert status")?;
1381
1382 for alert in &affected_alerts {
1384 let updated_alert = Alert {
1386 id: alert.id,
1387 alert_type: alert.alert_type.clone(),
1388 severity: alert.severity.clone(),
1389 service: alert.service.clone(),
1390 message: alert.message.clone(),
1391 timestamp: alert.timestamp,
1392 status: new_status.to_string(),
1393 metadata: alert.metadata.clone(),
1394 org_id: alert.org_id,
1395 app_id: alert.app_id,
1396 instance_id: alert.instance_id,
1397 region_id: alert.region_id,
1398 node_id: alert.node_id,
1399 resolved_at: if new_status == "resolved" || new_status == "auto_resolved" {
1400 Some(chrono::Utc::now())
1401 } else {
1402 alert.resolved_at
1403 },
1404 resolved_by: if new_status == "resolved" || new_status == "auto_resolved" {
1405 Some(user_id)
1406 } else {
1407 alert.resolved_by
1408 }
1409 };
1410
1411 sqlx::query(
1413 r#"
1414 INSERT INTO alert_history (
1415 alert_id, action, performed_by, performed_at,
1416 previous_state, new_state, notes
1417 ) VALUES (?, ?, ?, CURRENT_TIMESTAMP, ?, ?, ?)
1418 "#
1419 )
1420 .bind(alert.id)
1421 .bind(format!("bulk_status_change_to_{}", new_status))
1422 .bind(user_id)
1423 .bind(serde_json::to_value(&alert).unwrap_or(serde_json::Value::Null))
1424 .bind(serde_json::to_value(&updated_alert).unwrap_or(serde_json::Value::Null))
1425 .bind(notes)
1426 .execute(&mut *tx)
1427 .await
1428 .context("Failed to create history record for bulk update")?;
1429
1430 if new_status == "acknowledged" {
1432 sqlx::query(
1433 r#"
1434 INSERT INTO alert_acknowledgments (
1435 alert_id, user_id, acknowledged_at, notes
1436 ) VALUES (?, ?, CURRENT_TIMESTAMP, ?)
1437 "#
1438 )
1439 .bind(alert.id)
1440 .bind(user_id)
1441 .bind(notes)
1442 .execute(&mut *tx)
1443 .await
1444 .context("Failed to create acknowledgment record for bulk update")?;
1445 }
1446 }
1447
1448 tx.commit().await?;
1450
1451 Ok(update_result.rows_affected() as i64)
1452}