omni_orchestrator/schemas/v1/db/queries/
notification.rs

1use anyhow::Context;
2use serde::Serialize;
3use sqlx::{MySql, Pool};
4
5use libomni::types::db::v1 as types;
6use types::notification::{
7    Notification, NotificationWithCount, UserNotification, RoleNotification, 
8    NotificationAcknowledgment, UserNotificationWithRoleNotifications
9};
10
11// =================== User Notifications ===================
12
13/// Retrieves a paginated list of notifications for a specific user.
14///
15/// This function fetches a subset of notifications for a given user based on pagination
16/// parameters, ordering them by creation date in descending order (newest first).
17/// This helps manage large notification lists by retrieving only a specific "page" of results.
18///
19/// # Arguments
20///
21/// * `pool` - Database connection pool for executing the query
22/// * `user_id` - ID of the user whose notifications to retrieve
23/// * `page` - Zero-based page number (e.g., 0 for first page, 1 for second page)
24/// * `per_page` - Number of records to fetch per page
25/// * `include_read` - Whether to include notifications that have been marked as read
26///
27/// # Returns
28///
29/// * `Ok(Vec<UserNotification>)` - Successfully retrieved list of notifications
30/// * `Err(anyhow::Error)` - Failed to fetch notifications, with context
31pub async fn list_user_notifications(
32    pool: &Pool<MySql>,
33    user_id: i64,
34    page: i64,
35    per_page: i64,
36    include_read: bool,
37) -> anyhow::Result<Vec<UserNotification>> {
38    println!("Attempting to fetch user notifications from database...");
39
40    // Build query based on whether to include read notifications
41    let query = if include_read {
42        r#"
43        SELECT * FROM user_notifications
44        WHERE user_id = ?
45        ORDER BY created_at DESC
46        LIMIT ? OFFSET ?
47        "#
48    } else {
49        r#"
50        SELECT * FROM user_notifications
51        WHERE user_id = ? AND read_status = FALSE
52        ORDER BY created_at DESC
53        LIMIT ? OFFSET ?
54        "#
55    };
56
57    let result = sqlx::query_as::<_, UserNotification>(query)
58        .bind(user_id)
59        .bind(per_page)
60        .bind(page * per_page)
61        .fetch_all(pool)
62        .await;
63
64    match result {
65        Ok(notifications) => {
66            println!(
67                "Successfully fetched {} user notifications",
68                notifications.len()
69            );
70            Ok(notifications)
71        }
72        Err(e) => {
73            eprintln!("Error fetching user notifications: {:#?}", e);
74            Err(anyhow::Error::new(e).context("Failed to fetch user notifications"))
75        }
76    }
77}
78
79/// Counts unread notifications for a user.
80///
81/// This function retrieves the count of unread notifications for a specific user,
82/// which is useful for displaying notification badges or alerts.
83///
84/// # Arguments
85///
86/// * `pool` - Database connection pool for executing the query
87/// * `user_id` - ID of the user whose unread notifications to count
88///
89/// # Returns
90///
91/// * `Ok(i64)` - Successfully retrieved count of unread notifications
92/// * `Err(anyhow::Error)` - Failed to count notifications
93pub async fn count_unread_user_notifications(
94    pool: &Pool<MySql>,
95    user_id: i64,
96) -> anyhow::Result<i64> {
97    let count = sqlx::query_scalar::<_, i64>(
98        "SELECT COUNT(*) FROM user_notifications WHERE user_id = ? AND read_status = FALSE",
99    )
100    .bind(user_id)
101    .fetch_one(pool)
102    .await
103    .context("Failed to count unread user notifications")?;
104
105    Ok(count)
106}
107
108/// Retrieves a specific user notification by its unique identifier.
109///
110/// This function fetches a single user notification record matching the provided ID.
111/// It's typically used for retrieving detailed information about a specific notification.
112///
113/// # Arguments
114///
115/// * `pool` - Database connection pool for executing the query
116/// * `id` - Unique identifier of the notification to retrieve
117///
118/// # Returns
119///
120/// * `Ok(UserNotification)` - Successfully retrieved notification
121/// * `Err(anyhow::Error)` - Failed to fetch notification (including if not found)
122pub async fn get_user_notification_by_id(
123    pool: &Pool<MySql>,
124    id: i64,
125) -> anyhow::Result<UserNotification> {
126    let notification = sqlx::query_as::<_, UserNotification>(
127        "SELECT * FROM user_notifications WHERE id = ?",
128    )
129    .bind(id)
130    .fetch_one(pool)
131    .await
132    .context("Failed to fetch user notification")?;
133
134    Ok(notification)
135}
136
137/// Creates a new notification for a user.
138///
139/// This function inserts a new user notification record with the provided parameters.
140/// It handles both required fields and optional fields.
141///
142/// # Arguments
143///
144/// * `pool` - Database connection pool for executing the query
145/// * `user_id` - ID of the user to notify
146/// * `message` - The notification message text
147/// * `notification_type` - Type of notification (info, warning, error, success)
148/// * `org_id` - Optional organization ID related to the notification
149/// * `app_id` - Optional application ID related to the notification
150/// * `importance` - Optional importance level (default is "normal")
151/// * `action_url` - Optional URL for a related action
152/// * `action_label` - Optional label for the action button
153/// * `expires_at` - Optional expiration date for the notification
154///
155/// # Returns
156///
157/// * `Ok(UserNotification)` - Successfully created notification, including database-assigned fields
158/// * `Err(anyhow::Error)` - Failed to create notification
159pub async fn create_user_notification(
160    pool: &Pool<MySql>,
161    user_id: i64,
162    message: &str,
163    notification_type: &str,
164    org_id: Option<i64>,
165    app_id: Option<i64>,
166    importance: Option<&str>,
167    action_url: Option<&str>,
168    action_label: Option<&str>,
169    expires_at: Option<chrono::DateTime<chrono::Utc>>,
170) -> anyhow::Result<UserNotification> {
171    // Begin transaction
172    let mut tx = pool.begin().await?;
173
174    let notification = sqlx::query_as::<_, UserNotification>(
175        r#"INSERT INTO user_notifications (
176            user_id, org_id, app_id, notification_type, message, 
177            importance, action_url, action_label, created_at, expires_at
178        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, ?)"#,
179    )
180    .bind(user_id)
181    .bind(org_id)
182    .bind(app_id)
183    .bind(notification_type)
184    .bind(message)
185    .bind(importance.unwrap_or("normal"))
186    .bind(action_url)
187    .bind(action_label)
188    .bind(expires_at)
189    .fetch_one(&mut *tx)
190    .await
191    .context("Failed to create user notification")?;
192
193    // Commit transaction
194    tx.commit().await?;
195
196    // Return newly created notification
197    Ok(notification)
198}
199
200/// Marks a user notification as read.
201///
202/// This function updates the read_status of a user notification to indicate
203/// that the user has viewed or acknowledged it.
204///
205/// # Arguments
206///
207/// * `pool` - Database connection pool for executing the query
208/// * `id` - Unique identifier of the notification to mark as read
209///
210/// # Returns
211///
212/// * `Ok(UserNotification)` - Successfully updated notification
213/// * `Err(anyhow::Error)` - Failed to update notification
214pub async fn mark_user_notification_as_read(
215    pool: &Pool<MySql>,
216    id: i64,
217) -> anyhow::Result<UserNotification> {
218    let mut tx = pool.begin().await?;
219
220    let notification = sqlx::query_as::<_, UserNotification>(
221        "UPDATE user_notifications SET read_status = TRUE WHERE id = ?",
222    )
223    .bind(id)
224    .fetch_one(&mut *tx)
225    .await
226    .context("Failed to mark user notification as read")?;
227
228    tx.commit().await?;
229    Ok(notification)
230}
231
232/// Marks all notifications for a user as read.
233///
234/// This function updates the read_status of all notifications for a specific user
235/// to indicate that the user has viewed or acknowledged them.
236///
237/// # Arguments
238///
239/// * `pool` - Database connection pool for executing the query
240/// * `user_id` - ID of the user whose notifications to mark as read
241///
242/// # Returns
243///
244/// * `Ok(())` - Successfully updated notifications
245/// * `Err(anyhow::Error)` - Failed to update notifications
246pub async fn mark_all_user_notifications_as_read(
247    pool: &Pool<MySql>,
248    user_id: i64,
249) -> anyhow::Result<()> {
250    let mut tx = pool.begin().await?;
251
252    sqlx::query(
253        "UPDATE user_notifications SET read_status = TRUE WHERE user_id = ? AND read_status = FALSE",
254    )
255    .bind(user_id)
256    .execute(&mut *tx)
257    .await
258    .context("Failed to mark all user notifications as read")?;
259
260    tx.commit().await?;
261    Ok(())
262}
263
264/// Deletes a user notification.
265///
266/// This function permanently removes a user notification record with the specified ID.
267/// The operation is performed within a transaction to ensure data consistency.
268///
269/// # Arguments
270///
271/// * `pool` - Database connection pool for executing the query
272/// * `id` - Unique identifier of the notification to delete
273///
274/// # Returns
275///
276/// * `Ok(())` - Successfully deleted the notification
277/// * `Err(anyhow::Error)` - Failed to delete the notification
278pub async fn delete_user_notification(pool: &Pool<MySql>, id: i64) -> anyhow::Result<()> {
279    let mut tx = pool.begin().await?;
280
281    sqlx::query("DELETE FROM user_notifications WHERE id = ?")
282        .bind(id)
283        .execute(&mut *tx)
284        .await
285        .context("Failed to delete user notification")?;
286
287    tx.commit().await?;
288    Ok(())
289}
290
291/// Deletes all read notifications for a user.
292///
293/// This function permanently removes all notifications that have been marked as read
294/// for a specific user. This can be used as a "clear all" feature to help users
295/// maintain a clean notification list.
296///
297/// # Arguments
298///
299/// * `pool` - Database connection pool for executing the query
300/// * `user_id` - ID of the user whose read notifications should be deleted
301///
302/// # Returns
303///
304/// * `Ok(i64)` - Number of notifications deleted
305/// * `Err(anyhow::Error)` - Failed to delete notifications
306pub async fn delete_read_user_notifications(
307    pool: &Pool<MySql>,
308    user_id: i64,
309) -> anyhow::Result<i64> {
310    let mut tx = pool.begin().await?;
311
312    let result = sqlx::query("DELETE FROM user_notifications WHERE user_id = ? AND read_status = TRUE")
313        .bind(user_id)
314        .execute(&mut *tx)
315        .await
316        .context("Failed to delete read user notifications")?;
317
318    tx.commit().await?;
319    Ok(result.rows_affected() as i64)
320}
321
322// =================== Role Notifications ===================
323
324/// Retrieves a paginated list of role notifications.
325///
326/// This function fetches a subset of role notifications based on pagination parameters,
327/// ordering them by creation date in descending order (newest first).
328///
329/// # Arguments
330///
331/// * `pool` - Database connection pool for executing the query
332/// * `role_id` - ID of the role whose notifications to retrieve
333/// * `page` - Zero-based page number
334/// * `per_page` - Number of records to fetch per page
335///
336/// # Returns
337///
338/// * `Ok(Vec<RoleNotification>)` - Successfully retrieved list of notifications
339/// * `Err(anyhow::Error)` - Failed to fetch notifications
340pub async fn list_role_notifications(
341    pool: &Pool<MySql>,
342    role_id: i64,
343    page: i64,
344    per_page: i64,
345) -> anyhow::Result<Vec<RoleNotification>> {
346    let notifications = sqlx::query_as::<_, RoleNotification>(
347        r#"
348        SELECT * FROM role_notifications
349        WHERE role_id = ?
350        ORDER BY created_at DESC
351        LIMIT ? OFFSET ?
352        "#,
353    )
354    .bind(role_id)
355    .bind(per_page)
356    .bind(page * per_page)
357    .fetch_all(pool)
358    .await
359    .context("Failed to fetch role notifications")?;
360
361    Ok(notifications)
362}
363
364/// Creates a new notification for a role.
365///
366/// This function inserts a new role notification record that will be visible
367/// to all users with the specified role.
368///
369/// # Arguments
370///
371/// * `pool` - Database connection pool for executing the query
372/// * `role_id` - ID of the role to notify
373/// * `message` - The notification message text
374/// * `notification_type` - Type of notification (info, warning, error, success)
375/// * `org_id` - Optional organization ID related to the notification
376/// * `app_id` - Optional application ID related to the notification
377/// * `importance` - Optional importance level (default is "normal")
378/// * `action_url` - Optional URL for a related action
379/// * `action_label` - Optional label for the action button
380/// * `expires_at` - Optional expiration date for the notification
381///
382/// # Returns
383///
384/// * `Ok(RoleNotification)` - Successfully created notification
385/// * `Err(anyhow::Error)` - Failed to create notification
386pub async fn create_role_notification(
387    pool: &Pool<MySql>,
388    role_id: i64,
389    message: &str,
390    notification_type: &str,
391    org_id: Option<i64>,
392    app_id: Option<i64>,
393    importance: Option<&str>,
394    action_url: Option<&str>,
395    action_label: Option<&str>,
396    expires_at: Option<chrono::DateTime<chrono::Utc>>,
397) -> anyhow::Result<RoleNotification> {
398    let mut tx = pool.begin().await?;
399
400    let notification = sqlx::query_as::<_, RoleNotification>(
401        r#"INSERT INTO role_notifications (
402            role_id, org_id, app_id, notification_type, message, 
403            importance, action_url, action_label, created_at, expires_at
404        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP, ?)"#,
405    )
406    .bind(role_id)
407    .bind(org_id)
408    .bind(app_id)
409    .bind(notification_type)
410    .bind(message)
411    .bind(importance.unwrap_or("normal"))
412    .bind(action_url)
413    .bind(action_label)
414    .bind(expires_at)
415    .fetch_one(&mut *tx)
416    .await
417    .context("Failed to create role notification")?;
418
419    tx.commit().await?;
420    Ok(notification)
421}
422
423/// Retrieves a specific role notification by its unique identifier.
424///
425/// # Arguments
426///
427/// * `pool` - Database connection pool for executing the query
428/// * `id` - Unique identifier of the role notification to retrieve
429///
430/// # Returns
431///
432/// * `Ok(RoleNotification)` - Successfully retrieved notification
433/// * `Err(anyhow::Error)` - Failed to fetch notification
434pub async fn get_role_notification_by_id(
435    pool: &Pool<MySql>,
436    id: i64,
437) -> anyhow::Result<RoleNotification> {
438    let notification = sqlx::query_as::<_, RoleNotification>(
439        "SELECT * FROM role_notifications WHERE id = ?",
440    )
441    .bind(id)
442    .fetch_one(pool)
443    .await
444    .context("Failed to fetch role notification")?;
445
446    Ok(notification)
447}
448
449/// Deletes a role notification.
450///
451/// This function permanently removes a role notification record with the specified ID.
452///
453/// # Arguments
454///
455/// * `pool` - Database connection pool for executing the query
456/// * `id` - Unique identifier of the notification to delete
457///
458/// # Returns
459///
460/// * `Ok(())` - Successfully deleted the notification
461/// * `Err(anyhow::Error)` - Failed to delete the notification
462pub async fn delete_role_notification(pool: &Pool<MySql>, id: i64) -> anyhow::Result<()> {
463    let mut tx = pool.begin().await?;
464
465    sqlx::query("DELETE FROM role_notifications WHERE id = ?")
466        .bind(id)
467        .execute(&mut *tx)
468        .await
469        .context("Failed to delete role notification")?;
470
471    tx.commit().await?;
472    Ok(())
473}
474
475// =================== Notification Acknowledgments ===================
476
477/// Creates a notification acknowledgment for a user.
478///
479/// This function records that a user has acknowledged a notification,
480/// which is useful for role-based notifications that need individual tracking.
481///
482/// # Arguments
483///
484/// * `pool` - Database connection pool for executing the query
485/// * `user_id` - ID of the user acknowledging the notification
486/// * `notification_id` - Optional ID of a user notification being acknowledged
487/// * `role_notification_id` - Optional ID of a role notification being acknowledged
488///
489/// # Returns
490///
491/// * `Ok(NotificationAcknowledgment)` - Successfully created acknowledgment
492/// * `Err(anyhow::Error)` - Failed to create acknowledgment
493pub async fn create_notification_acknowledgment(
494    pool: &Pool<MySql>,
495    user_id: i64,
496    notification_id: Option<i64>,
497    role_notification_id: Option<i64>,
498) -> anyhow::Result<NotificationAcknowledgment> {
499    // Validate that exactly one notification type is provided
500    if (notification_id.is_some() && role_notification_id.is_some()) || 
501       (notification_id.is_none() && role_notification_id.is_none()) {
502        return Err(anyhow::anyhow!("Either notification_id OR role_notification_id must be provided, not both or neither"));
503    }
504
505    let mut tx = pool.begin().await?;
506
507    let acknowledgment = sqlx::query_as::<_, NotificationAcknowledgment>(
508        r#"INSERT INTO notification_acknowledgments (
509            user_id, notification_id, role_notification_id, acknowledged_at
510        ) VALUES (?, ?, ?, CURRENT_TIMESTAMP)"#,
511    )
512    .bind(user_id)
513    .bind(notification_id)
514    .bind(role_notification_id)
515    .fetch_one(&mut *tx)
516    .await
517    .context("Failed to create notification acknowledgment")?;
518
519    tx.commit().await?;
520    Ok(acknowledgment)
521}
522
523/// Checks if a user has acknowledged a specific role notification.
524///
525/// # Arguments
526///
527/// * `pool` - Database connection pool for executing the query
528/// * `user_id` - ID of the user to check
529/// * `role_notification_id` - ID of the role notification to check
530///
531/// # Returns
532///
533/// * `Ok(bool)` - True if acknowledged, false otherwise
534/// * `Err(anyhow::Error)` - Failed to check acknowledgment status
535pub async fn has_acknowledged_role_notification(
536    pool: &Pool<MySql>,
537    user_id: i64,
538    role_notification_id: i64,
539) -> anyhow::Result<bool> {
540    let count = sqlx::query_scalar::<_, i64>(
541        r#"
542        SELECT COUNT(*) FROM notification_acknowledgments
543        WHERE user_id = ? AND role_notification_id = ?
544        "#,
545    )
546    .bind(user_id)
547    .bind(role_notification_id)
548    .fetch_one(pool)
549    .await
550    .context("Failed to check notification acknowledgment status")?;
551
552    Ok(count > 0)
553}
554
555/// Retrieves all role notifications for a user with acknowledgment status.
556///
557/// This function fetches role notifications for all roles a user has,
558/// along with information about whether the user has acknowledged each notification.
559///
560/// # Arguments
561///
562/// * `pool` - Database connection pool for executing the query
563/// * `user_id` - ID of the user
564/// * `page` - Zero-based page number
565/// * `per_page` - Number of records to fetch per page
566///
567/// # Returns
568///
569/// * `Ok(Vec<RoleNotificationWithAcknowledgment>)` - Successfully retrieved notifications with acknowledgment status
570/// * `Err(anyhow::Error)` - Failed to fetch notifications
571pub async fn get_user_role_notifications(
572    pool: &Pool<MySql>,
573    user_id: i64,
574    page: i64,
575    per_page: i64,
576) -> anyhow::Result<Vec<UserNotificationWithRoleNotifications>> {
577    // First, get user notifications
578    let user_notifications = list_user_notifications(pool, user_id, page, per_page, true).await?;
579    
580    // Then get role notifications for the user's roles
581    let role_notifications = sqlx::query_as::<_, RoleNotification>(
582        r#"
583        SELECT rn.* FROM role_notifications rn
584        JOIN user_roles ur ON rn.role_id = ur.role_id
585        WHERE ur.user_id = ?
586        ORDER BY rn.created_at DESC
587        LIMIT ? OFFSET ?
588        "#,
589    )
590    .bind(user_id)
591    .bind(per_page)
592    .bind(page * per_page)
593    .fetch_all(pool)
594    .await
595    .context("Failed to fetch role notifications for user")?;
596    
597    // Get acknowledgments for these role notifications
598    let acknowledgments = sqlx::query_as::<_, NotificationAcknowledgment>(
599        r#"
600        SELECT * FROM notification_acknowledgments
601        WHERE user_id = ? AND role_notification_id IN (
602            SELECT rn.id FROM role_notifications rn
603            JOIN user_roles ur ON rn.role_id = ur.role_id
604            WHERE ur.user_id = ?
605        )
606        "#,
607    )
608    .bind(user_id)
609    .bind(user_id)
610    .fetch_all(pool)
611    .await
612    .context("Failed to fetch notification acknowledgments")?;
613    
614    // Combine into a single result
615    let result = UserNotificationWithRoleNotifications {
616        user_notifications,
617        role_notifications,
618        acknowledgments,
619    };
620    
621    Ok(vec![result])
622}
623
624/// Gets notifications for a user from all sources with unread count.
625///
626/// This function provides a comprehensive view of all notifications relevant to a user,
627/// including both direct user notifications and role-based notifications applicable
628/// to the user's roles. It also includes a count of unread notifications.
629///
630/// # Arguments
631///
632/// * `pool` - Database connection pool for executing the query
633/// * `user_id` - ID of the user
634/// * `page` - Zero-based page number
635/// * `per_page` - Number of records to fetch per page
636///
637/// # Returns
638///
639/// * `Ok(NotificationWithCount)` - Successfully retrieved notifications with count
640/// * `Err(anyhow::Error)` - Failed to fetch notifications
641pub async fn get_all_user_notifications_with_count(
642    pool: &Pool<MySql>,
643    user_id: i64,
644    page: i64,
645    per_page: i64,
646) -> anyhow::Result<NotificationWithCount> {
647    // Get user notifications
648    let user_notifications = list_user_notifications(pool, user_id, page, per_page, true).await?;
649    
650    // Get role notifications
651    let role_notifications = sqlx::query_as::<_, RoleNotification>(
652        r#"
653        SELECT rn.* FROM role_notifications rn
654        JOIN user_roles ur ON rn.role_id = ur.role_id
655        WHERE ur.user_id = ?
656        ORDER BY rn.created_at DESC
657        LIMIT ? OFFSET ?
658        "#,
659    )
660    .bind(user_id)
661    .bind(per_page)
662    .bind(page * per_page)
663    .fetch_all(pool)
664    .await
665    .context("Failed to fetch role notifications")?;
666    
667    // Count unread user notifications
668    let unread_count = count_unread_user_notifications(pool, user_id).await?;
669    
670    // Get acknowledgments for role notifications
671    let acknowledgments = sqlx::query_as::<_, NotificationAcknowledgment>(
672        r#"
673        SELECT * FROM notification_acknowledgments
674        WHERE user_id = ? AND role_notification_id IN (
675            SELECT rn.id FROM role_notifications rn
676            JOIN user_roles ur ON rn.role_id = ur.role_id
677            WHERE ur.user_id = ?
678        )
679        "#,
680    )
681    .bind(user_id)
682    .bind(user_id)
683    .fetch_all(pool)
684    .await
685    .context("Failed to fetch acknowledgments")?;
686    
687    // Calculate unacknowledged role notifications
688    let acknowledged_role_notification_ids: Vec<i64> = acknowledgments
689        .iter()
690        .filter_map(|ack| ack.role_notification_id)
691        .collect();
692    
693    let unacknowledged_role_count = role_notifications
694        .iter()
695        .filter(|rn| !acknowledged_role_notification_ids.contains(&rn.id))
696        .count() as i64;
697    
698    // Combine results
699    let result = NotificationWithCount {
700        user_notifications,
701        role_notifications,
702        acknowledgments,
703        unread_user_count: unread_count,
704        unacknowledged_role_count,
705        total_unread_count: unread_count + unacknowledged_role_count,
706    };
707    
708    Ok(result)
709}