omni_orchestrator/schemas/v1/db/queries/notification.rs
1use anyhow::Context;
2use serde::Serialize;
3use sqlx::{MySql, Pool};
4
5use libomni::types::db::v1 as types;
6use types::notification::{
7 Notification, NotificationWithCount, UserNotification, RoleNotification,
8 NotificationAcknowledgment, UserNotificationWithRoleNotifications
9};
10
11// =================== User Notifications ===================
12
13/// Retrieves a paginated list of notifications for a specific user.
14///
15/// This function fetches a subset of notifications for a given user based on pagination
16/// parameters, ordering them by creation date in descending order (newest first).
17/// This helps manage large notification lists by retrieving only a specific "page" of results.
18///
19/// # Arguments
20///
21/// * `pool` - Database connection pool for executing the query
22/// * `user_id` - ID of the user whose notifications to retrieve
23/// * `page` - Zero-based page number (e.g., 0 for first page, 1 for second page)
24/// * `per_page` - Number of records to fetch per page
25/// * `include_read` - Whether to include notifications that have been marked as read
26///
27/// # Returns
28///
29/// * `Ok(Vec<UserNotification>)` - Successfully retrieved list of notifications
30/// * `Err(anyhow::Error)` - Failed to fetch notifications, with context
31pub async fn list_user_notifications(
32 pool: &Pool<MySql>,
33 user_id: i64,
34 page: i64,
35 per_page: i64,
36 include_read: bool,
37) -> anyhow::Result<Vec<UserNotification>> {
38 println!("Attempting to fetch user notifications from database...");
39
40 // Build query based on whether to include read notifications
41 let query = if include_read {
42 r#"
43 SELECT * FROM user_notifications
44 WHERE user_id = ?
45 ORDER BY created_at DESC
46 LIMIT ? OFFSET ?
47 "#
48 } else {
49 r#"
50 SELECT * FROM user_notifications
51 WHERE user_id = ? AND read_status = FALSE
52 ORDER BY created_at DESC
53 LIMIT ? OFFSET ?
54 "#
55 };
56
57 let result = sqlx::query_as::<_, UserNotification>(query)
58 .bind(user_id)
59 .bind(per_page)
60 .bind(page * per_page)
61 .fetch_all(pool)
62 .await;
63
64 match result {
65 Ok(notifications) => {
66 println!(
67 "Successfully fetched {} user notifications",
68 notifications.len()
69 );
70 Ok(notifications)
71 }
72 Err(e) => {
73 eprintln!("Error fetching user notifications: {:#?}", e);
74 Err(anyhow::Error::new(e).context("Failed to fetch user notifications"))
75 }
76 }
77}
78
79/// Counts unread notifications for a user.
80///
81/// This function retrieves the count of unread notifications for a specific user,
82/// which is useful for displaying notification badges or alerts.
83///
84/// # Arguments
85///
86/// * `pool` - Database connection pool for executing the query
87/// * `user_id` - ID of the user whose unread notifications to count
88///
89/// # Returns
90///
91/// * `Ok(i64)` - Successfully retrieved count of unread notifications
92/// * `Err(anyhow::Error)` - Failed to count notifications
93pub async fn count_unread_user_notifications(
94 pool: &Pool<MySql>,
95 user_id: i64,
96) -> anyhow::Result<i64> {
97 let count = sqlx::query_scalar::<_, i64>(
98 "SELECT COUNT(*) FROM user_notifications WHERE user_id = ? AND read_status = FALSE",
99 )
100 .bind(user_id)
101 .fetch_one(pool)
102 .await
103 .context("Failed to count unread user notifications")?;
104
105 Ok(count)
106}
107
108/// Retrieves a specific user notification by its unique identifier.
109///
110/// This function fetches a single user notification record matching the provided ID.
111/// It's typically used for retrieving detailed information about a specific notification.
112///
113/// # Arguments
114///
115/// * `pool` - Database connection pool for executing the query
116/// * `id` - Unique identifier of the notification to retrieve
117///
118/// # Returns
119///
120/// * `Ok(UserNotification)` - Successfully retrieved notification
121/// * `Err(anyhow::Error)` - Failed to fetch notification (including if not found)
122pub async fn get_user_notification_by_id(
123 pool: &Pool<MySql>,
124 id: i64,
125) -> anyhow::Result<UserNotification> {
126 let notification = sqlx::query_as::<_, UserNotification>(
127 "SELECT * FROM user_notifications WHERE id = ?",
128 )
129 .bind(id)
130 .fetch_one(pool)
131 .await
132 .context("Failed to fetch user notification")?;
133
134 Ok(notification)
135}
136
137/// Creates a new notification for a user.
138///
139/// This function inserts a new user notification record with the provided parameters.
140/// It handles both required fields and optional fields.
141///
142/// # Arguments
143///
144/// * `pool` - Database connection pool for executing the query
145/// * `user_id` - ID of the user to notify
146/// * `message` - The notification message text
147/// * `notification_type` - Type of notification (info, warning, error, success)
148/// * `org_id` - Optional organization ID related to the notification
149/// * `app_id` - Optional application ID related to the notification
150/// * `importance` - Optional importance level (default is "normal")
151/// * `action_url` - Optional URL for a related action
152/// * `action_label` - Optional label for the action button
153/// * `expires_at` - Optional expiration date for the notification
154///
155/// # Returns
156///
157/// * `Ok(UserNotification)` - Successfully created notification, including database-assigned fields
158/// * `Err(anyhow::Error)` - Failed to create notification
159pub async fn create_user_notification(
160 pool: &Pool<MySql>,
161 user_id: i64,
162 message: &str,
163 notification_type: &str,
164 org_id: Option<i64>,
165 app_id: Option<i64>,
166 importance: Option<&str>,
167 action_url: Option<&str>,
168 action_label: Option<&str>,
169 expires_at: Option<chrono::DateTime<chrono::Utc>>,
170) -> anyhow::Result<UserNotification> {
171 // Begin transaction
172 let mut tx = pool.begin().await?;
173
174 let notification = sqlx::query_as::<_, UserNotification>(
175 r#"INSERT INTO user_notifications (
176 user_id, org_id, app_id, notification_type, message,
177 importance, action_url, action_label, created_at, expires_at
178 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, ?)"#,
179 )
180 .bind(user_id)
181 .bind(org_id)
182 .bind(app_id)
183 .bind(notification_type)
184 .bind(message)
185 .bind(importance.unwrap_or("normal"))
186 .bind(action_url)
187 .bind(action_label)
188 .bind(expires_at)
189 .fetch_one(&mut *tx)
190 .await
191 .context("Failed to create user notification")?;
192
193 // Commit transaction
194 tx.commit().await?;
195
196 // Return newly created notification
197 Ok(notification)
198}
199
200/// Marks a user notification as read.
201///
202/// This function updates the read_status of a user notification to indicate
203/// that the user has viewed or acknowledged it.
204///
205/// # Arguments
206///
207/// * `pool` - Database connection pool for executing the query
208/// * `id` - Unique identifier of the notification to mark as read
209///
210/// # Returns
211///
212/// * `Ok(UserNotification)` - Successfully updated notification
213/// * `Err(anyhow::Error)` - Failed to update notification
214pub async fn mark_user_notification_as_read(
215 pool: &Pool<MySql>,
216 id: i64,
217) -> anyhow::Result<UserNotification> {
218 let mut tx = pool.begin().await?;
219
220 let notification = sqlx::query_as::<_, UserNotification>(
221 "UPDATE user_notifications SET read_status = TRUE WHERE id = ?",
222 )
223 .bind(id)
224 .fetch_one(&mut *tx)
225 .await
226 .context("Failed to mark user notification as read")?;
227
228 tx.commit().await?;
229 Ok(notification)
230}
231
232/// Marks all notifications for a user as read.
233///
234/// This function updates the read_status of all notifications for a specific user
235/// to indicate that the user has viewed or acknowledged them.
236///
237/// # Arguments
238///
239/// * `pool` - Database connection pool for executing the query
240/// * `user_id` - ID of the user whose notifications to mark as read
241///
242/// # Returns
243///
244/// * `Ok(())` - Successfully updated notifications
245/// * `Err(anyhow::Error)` - Failed to update notifications
246pub async fn mark_all_user_notifications_as_read(
247 pool: &Pool<MySql>,
248 user_id: i64,
249) -> anyhow::Result<()> {
250 let mut tx = pool.begin().await?;
251
252 sqlx::query(
253 "UPDATE user_notifications SET read_status = TRUE WHERE user_id = ? AND read_status = FALSE",
254 )
255 .bind(user_id)
256 .execute(&mut *tx)
257 .await
258 .context("Failed to mark all user notifications as read")?;
259
260 tx.commit().await?;
261 Ok(())
262}
263
264/// Deletes a user notification.
265///
266/// This function permanently removes a user notification record with the specified ID.
267/// The operation is performed within a transaction to ensure data consistency.
268///
269/// # Arguments
270///
271/// * `pool` - Database connection pool for executing the query
272/// * `id` - Unique identifier of the notification to delete
273///
274/// # Returns
275///
276/// * `Ok(())` - Successfully deleted the notification
277/// * `Err(anyhow::Error)` - Failed to delete the notification
278pub async fn delete_user_notification(pool: &Pool<MySql>, id: i64) -> anyhow::Result<()> {
279 let mut tx = pool.begin().await?;
280
281 sqlx::query("DELETE FROM user_notifications WHERE id = ?")
282 .bind(id)
283 .execute(&mut *tx)
284 .await
285 .context("Failed to delete user notification")?;
286
287 tx.commit().await?;
288 Ok(())
289}
290
291/// Deletes all read notifications for a user.
292///
293/// This function permanently removes all notifications that have been marked as read
294/// for a specific user. This can be used as a "clear all" feature to help users
295/// maintain a clean notification list.
296///
297/// # Arguments
298///
299/// * `pool` - Database connection pool for executing the query
300/// * `user_id` - ID of the user whose read notifications should be deleted
301///
302/// # Returns
303///
304/// * `Ok(i64)` - Number of notifications deleted
305/// * `Err(anyhow::Error)` - Failed to delete notifications
306pub async fn delete_read_user_notifications(
307 pool: &Pool<MySql>,
308 user_id: i64,
309) -> anyhow::Result<i64> {
310 let mut tx = pool.begin().await?;
311
312 let result = sqlx::query("DELETE FROM user_notifications WHERE user_id = ? AND read_status = TRUE")
313 .bind(user_id)
314 .execute(&mut *tx)
315 .await
316 .context("Failed to delete read user notifications")?;
317
318 tx.commit().await?;
319 Ok(result.rows_affected() as i64)
320}
321
322// =================== Role Notifications ===================
323
324/// Retrieves a paginated list of role notifications.
325///
326/// This function fetches a subset of role notifications based on pagination parameters,
327/// ordering them by creation date in descending order (newest first).
328///
329/// # Arguments
330///
331/// * `pool` - Database connection pool for executing the query
332/// * `role_id` - ID of the role whose notifications to retrieve
333/// * `page` - Zero-based page number
334/// * `per_page` - Number of records to fetch per page
335///
336/// # Returns
337///
338/// * `Ok(Vec<RoleNotification>)` - Successfully retrieved list of notifications
339/// * `Err(anyhow::Error)` - Failed to fetch notifications
340pub async fn list_role_notifications(
341 pool: &Pool<MySql>,
342 role_id: i64,
343 page: i64,
344 per_page: i64,
345) -> anyhow::Result<Vec<RoleNotification>> {
346 let notifications = sqlx::query_as::<_, RoleNotification>(
347 r#"
348 SELECT * FROM role_notifications
349 WHERE role_id = ?
350 ORDER BY created_at DESC
351 LIMIT ? OFFSET ?
352 "#,
353 )
354 .bind(role_id)
355 .bind(per_page)
356 .bind(page * per_page)
357 .fetch_all(pool)
358 .await
359 .context("Failed to fetch role notifications")?;
360
361 Ok(notifications)
362}
363
364/// Creates a new notification for a role.
365///
366/// This function inserts a new role notification record that will be visible
367/// to all users with the specified role.
368///
369/// # Arguments
370///
371/// * `pool` - Database connection pool for executing the query
372/// * `role_id` - ID of the role to notify
373/// * `message` - The notification message text
374/// * `notification_type` - Type of notification (info, warning, error, success)
375/// * `org_id` - Optional organization ID related to the notification
376/// * `app_id` - Optional application ID related to the notification
377/// * `importance` - Optional importance level (default is "normal")
378/// * `action_url` - Optional URL for a related action
379/// * `action_label` - Optional label for the action button
380/// * `expires_at` - Optional expiration date for the notification
381///
382/// # Returns
383///
384/// * `Ok(RoleNotification)` - Successfully created notification
385/// * `Err(anyhow::Error)` - Failed to create notification
386pub async fn create_role_notification(
387 pool: &Pool<MySql>,
388 role_id: i64,
389 message: &str,
390 notification_type: &str,
391 org_id: Option<i64>,
392 app_id: Option<i64>,
393 importance: Option<&str>,
394 action_url: Option<&str>,
395 action_label: Option<&str>,
396 expires_at: Option<chrono::DateTime<chrono::Utc>>,
397) -> anyhow::Result<RoleNotification> {
398 let mut tx = pool.begin().await?;
399
400 let notification = sqlx::query_as::<_, RoleNotification>(
401 r#"INSERT INTO role_notifications (
402 role_id, org_id, app_id, notification_type, message,
403 importance, action_url, action_label, created_at, expires_at
404 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, ?)"#,
405 )
406 .bind(role_id)
407 .bind(org_id)
408 .bind(app_id)
409 .bind(notification_type)
410 .bind(message)
411 .bind(importance.unwrap_or("normal"))
412 .bind(action_url)
413 .bind(action_label)
414 .bind(expires_at)
415 .fetch_one(&mut *tx)
416 .await
417 .context("Failed to create role notification")?;
418
419 tx.commit().await?;
420 Ok(notification)
421}
422
423/// Retrieves a specific role notification by its unique identifier.
424///
425/// # Arguments
426///
427/// * `pool` - Database connection pool for executing the query
428/// * `id` - Unique identifier of the role notification to retrieve
429///
430/// # Returns
431///
432/// * `Ok(RoleNotification)` - Successfully retrieved notification
433/// * `Err(anyhow::Error)` - Failed to fetch notification
434pub async fn get_role_notification_by_id(
435 pool: &Pool<MySql>,
436 id: i64,
437) -> anyhow::Result<RoleNotification> {
438 let notification = sqlx::query_as::<_, RoleNotification>(
439 "SELECT * FROM role_notifications WHERE id = ?",
440 )
441 .bind(id)
442 .fetch_one(pool)
443 .await
444 .context("Failed to fetch role notification")?;
445
446 Ok(notification)
447}
448
449/// Deletes a role notification.
450///
451/// This function permanently removes a role notification record with the specified ID.
452///
453/// # Arguments
454///
455/// * `pool` - Database connection pool for executing the query
456/// * `id` - Unique identifier of the notification to delete
457///
458/// # Returns
459///
460/// * `Ok(())` - Successfully deleted the notification
461/// * `Err(anyhow::Error)` - Failed to delete the notification
462pub async fn delete_role_notification(pool: &Pool<MySql>, id: i64) -> anyhow::Result<()> {
463 let mut tx = pool.begin().await?;
464
465 sqlx::query("DELETE FROM role_notifications WHERE id = ?")
466 .bind(id)
467 .execute(&mut *tx)
468 .await
469 .context("Failed to delete role notification")?;
470
471 tx.commit().await?;
472 Ok(())
473}
474
475// =================== Notification Acknowledgments ===================
476
477/// Creates a notification acknowledgment for a user.
478///
479/// This function records that a user has acknowledged a notification,
480/// which is useful for role-based notifications that need individual tracking.
481///
482/// # Arguments
483///
484/// * `pool` - Database connection pool for executing the query
485/// * `user_id` - ID of the user acknowledging the notification
486/// * `notification_id` - Optional ID of a user notification being acknowledged
487/// * `role_notification_id` - Optional ID of a role notification being acknowledged
488///
489/// # Returns
490///
491/// * `Ok(NotificationAcknowledgment)` - Successfully created acknowledgment
492/// * `Err(anyhow::Error)` - Failed to create acknowledgment
493pub async fn create_notification_acknowledgment(
494 pool: &Pool<MySql>,
495 user_id: i64,
496 notification_id: Option<i64>,
497 role_notification_id: Option<i64>,
498) -> anyhow::Result<NotificationAcknowledgment> {
499 // Validate that exactly one notification type is provided
500 if (notification_id.is_some() && role_notification_id.is_some()) ||
501 (notification_id.is_none() && role_notification_id.is_none()) {
502 return Err(anyhow::anyhow!("Either notification_id OR role_notification_id must be provided, not both or neither"));
503 }
504
505 let mut tx = pool.begin().await?;
506
507 let acknowledgment = sqlx::query_as::<_, NotificationAcknowledgment>(
508 r#"INSERT INTO notification_acknowledgments (
509 user_id, notification_id, role_notification_id, acknowledged_at
510 ) VALUES (?, ?, ?, CURRENT_TIMESTAMP)"#,
511 )
512 .bind(user_id)
513 .bind(notification_id)
514 .bind(role_notification_id)
515 .fetch_one(&mut *tx)
516 .await
517 .context("Failed to create notification acknowledgment")?;
518
519 tx.commit().await?;
520 Ok(acknowledgment)
521}
522
523/// Checks if a user has acknowledged a specific role notification.
524///
525/// # Arguments
526///
527/// * `pool` - Database connection pool for executing the query
528/// * `user_id` - ID of the user to check
529/// * `role_notification_id` - ID of the role notification to check
530///
531/// # Returns
532///
533/// * `Ok(bool)` - True if acknowledged, false otherwise
534/// * `Err(anyhow::Error)` - Failed to check acknowledgment status
535pub async fn has_acknowledged_role_notification(
536 pool: &Pool<MySql>,
537 user_id: i64,
538 role_notification_id: i64,
539) -> anyhow::Result<bool> {
540 let count = sqlx::query_scalar::<_, i64>(
541 r#"
542 SELECT COUNT(*) FROM notification_acknowledgments
543 WHERE user_id = ? AND role_notification_id = ?
544 "#,
545 )
546 .bind(user_id)
547 .bind(role_notification_id)
548 .fetch_one(pool)
549 .await
550 .context("Failed to check notification acknowledgment status")?;
551
552 Ok(count > 0)
553}
554
555/// Retrieves all role notifications for a user with acknowledgment status.
556///
557/// This function fetches role notifications for all roles a user has,
558/// along with information about whether the user has acknowledged each notification.
559///
560/// # Arguments
561///
562/// * `pool` - Database connection pool for executing the query
563/// * `user_id` - ID of the user
564/// * `page` - Zero-based page number
565/// * `per_page` - Number of records to fetch per page
566///
567/// # Returns
568///
569/// * `Ok(Vec<RoleNotificationWithAcknowledgment>)` - Successfully retrieved notifications with acknowledgment status
570/// * `Err(anyhow::Error)` - Failed to fetch notifications
571pub async fn get_user_role_notifications(
572 pool: &Pool<MySql>,
573 user_id: i64,
574 page: i64,
575 per_page: i64,
576) -> anyhow::Result<Vec<UserNotificationWithRoleNotifications>> {
577 // First, get user notifications
578 let user_notifications = list_user_notifications(pool, user_id, page, per_page, true).await?;
579
580 // Then get role notifications for the user's roles
581 let role_notifications = sqlx::query_as::<_, RoleNotification>(
582 r#"
583 SELECT rn.* FROM role_notifications rn
584 JOIN user_roles ur ON rn.role_id = ur.role_id
585 WHERE ur.user_id = ?
586 ORDER BY rn.created_at DESC
587 LIMIT ? OFFSET ?
588 "#,
589 )
590 .bind(user_id)
591 .bind(per_page)
592 .bind(page * per_page)
593 .fetch_all(pool)
594 .await
595 .context("Failed to fetch role notifications for user")?;
596
597 // Get acknowledgments for these role notifications
598 let acknowledgments = sqlx::query_as::<_, NotificationAcknowledgment>(
599 r#"
600 SELECT * FROM notification_acknowledgments
601 WHERE user_id = ? AND role_notification_id IN (
602 SELECT rn.id FROM role_notifications rn
603 JOIN user_roles ur ON rn.role_id = ur.role_id
604 WHERE ur.user_id = ?
605 )
606 "#,
607 )
608 .bind(user_id)
609 .bind(user_id)
610 .fetch_all(pool)
611 .await
612 .context("Failed to fetch notification acknowledgments")?;
613
614 // Combine into a single result
615 let result = UserNotificationWithRoleNotifications {
616 user_notifications,
617 role_notifications,
618 acknowledgments,
619 };
620
621 Ok(vec![result])
622}
623
624/// Gets notifications for a user from all sources with unread count.
625///
626/// This function provides a comprehensive view of all notifications relevant to a user,
627/// including both direct user notifications and role-based notifications applicable
628/// to the user's roles. It also includes a count of unread notifications.
629///
630/// # Arguments
631///
632/// * `pool` - Database connection pool for executing the query
633/// * `user_id` - ID of the user
634/// * `page` - Zero-based page number
635/// * `per_page` - Number of records to fetch per page
636///
637/// # Returns
638///
639/// * `Ok(NotificationWithCount)` - Successfully retrieved notifications with count
640/// * `Err(anyhow::Error)` - Failed to fetch notifications
641pub async fn get_all_user_notifications_with_count(
642 pool: &Pool<MySql>,
643 user_id: i64,
644 page: i64,
645 per_page: i64,
646) -> anyhow::Result<NotificationWithCount> {
647 // Get user notifications
648 let user_notifications = list_user_notifications(pool, user_id, page, per_page, true).await?;
649
650 // Get role notifications
651 let role_notifications = sqlx::query_as::<_, RoleNotification>(
652 r#"
653 SELECT rn.* FROM role_notifications rn
654 JOIN user_roles ur ON rn.role_id = ur.role_id
655 WHERE ur.user_id = ?
656 ORDER BY rn.created_at DESC
657 LIMIT ? OFFSET ?
658 "#,
659 )
660 .bind(user_id)
661 .bind(per_page)
662 .bind(page * per_page)
663 .fetch_all(pool)
664 .await
665 .context("Failed to fetch role notifications")?;
666
667 // Count unread user notifications
668 let unread_count = count_unread_user_notifications(pool, user_id).await?;
669
670 // Get acknowledgments for role notifications
671 let acknowledgments = sqlx::query_as::<_, NotificationAcknowledgment>(
672 r#"
673 SELECT * FROM notification_acknowledgments
674 WHERE user_id = ? AND role_notification_id IN (
675 SELECT rn.id FROM role_notifications rn
676 JOIN user_roles ur ON rn.role_id = ur.role_id
677 WHERE ur.user_id = ?
678 )
679 "#,
680 )
681 .bind(user_id)
682 .bind(user_id)
683 .fetch_all(pool)
684 .await
685 .context("Failed to fetch acknowledgments")?;
686
687 // Calculate unacknowledged role notifications
688 let acknowledged_role_notification_ids: Vec<i64> = acknowledgments
689 .iter()
690 .filter_map(|ack| ack.role_notification_id)
691 .collect();
692
693 let unacknowledged_role_count = role_notifications
694 .iter()
695 .filter(|rn| !acknowledged_role_notification_ids.contains(&rn.id))
696 .count() as i64;
697
698 // Combine results
699 let result = NotificationWithCount {
700 user_notifications,
701 role_notifications,
702 acknowledgments,
703 unread_user_count: unread_count,
704 unacknowledged_role_count,
705 total_unread_count: unread_count + unacknowledged_role_count,
706 };
707
708 Ok(result)
709}