omni_orchestrator/schemas/v1/api/
users.rs

1use super::super::db::queries::user::{
2    create_user, get_user_by_email, record_login_attempt, create_session,
3    invalidate_session, update_user_security, update_user_pii, update_user_meta,
4    get_user_sessions, get_user_meta, get_user_pii
5};
6
7use super::super::super::auth::{AuthConfig, Claims};
8use crate::schemas::v1::db::queries;
9use super::super::db::queries::user::invalidate_all_user_sessions;
10use super::super::models::user::User;
11use chrono::{Duration, Utc};
12use jsonwebtoken::{encode, EncodingKey, Header};
13use log;
14use rand::rngs::OsRng;
15use rand::{RngCore, TryRngCore};
16use rocket::{http::Status, http::Cookie, http::CookieJar};
17use rocket::serde::json::json;
18use rocket::{get, post, put};
19use rocket::response::status::Custom;
20use rocket::State;
21use sha2::{Digest, Sha256};
22use sqlx::mysql::MySqlPool as Pool;
23use uuid::Uuid;
24
25/// Register a new user
26#[post("/auth/register", data = "<data>")]
27pub async fn handle_register(
28    pool: &State<Pool>, 
29    data: String,
30    cookies: &CookieJar<'_>,
31    auth_config: &State<AuthConfig>
32) -> Result<rocket::serde::json::Value, Custom<String>> {
33    let data = match serde_json5::from_str::<serde_json::Value>(&data) {
34        Ok(d) => d,
35        Err(_) => return Err(Custom(Status::BadRequest, String::from("Not a valid JSON object"))),
36    };
37
38    let email = match data.get("email").and_then(|e| e.as_str()) {
39        Some(e) => e,
40        None => {
41            return Err(Custom(
42                Status::BadRequest,
43                String::from("Email is required and must be a string"),
44            ))
45        }
46    };
47
48    let password = match data.get("password").and_then(|p| p.as_str()) {
49        Some(p) => p,
50        None => {
51            return Err(Custom(
52                Status::BadRequest,
53                String::from("Password is required and must be a string"),
54            ))
55        }
56    };
57
58    let name = match data.get("name").and_then(|n| n.as_str()) {
59        Some(n) => n,
60        None => {
61            return Err(Custom(
62                Status::BadRequest,
63                String::from("Name is required and must be a string"),
64            ))
65        }
66    };
67
68    // Check if user already exists
69    if let Ok(_) = get_user_by_email(pool, email).await {
70        return Err(Custom(
71            Status::Conflict,
72            String::from("User with this email already exists"),
73        ));
74    }
75
76    // Generate a random salt and hash the password
77    let mut rng = OsRng;
78    let mut salt = [0u8; 16];
79    rng.try_fill_bytes(&mut salt);
80    let salt_hex = hex::encode(salt);
81    let salted = format!("{}{}", password, salt_hex);
82    let mut hasher = Sha256::new();
83    hasher.update(salted.as_bytes());
84    let password_hash = hex::encode(hasher.finalize());
85
86    // Create the user
87    let user = match create_user(pool, email, &password_hash, &salt_hex).await {
88        Ok(user) => user,
89        Err(e) => {
90            log::error!("Error creating user: {}", e);
91            return Err(Custom(
92                Status::InternalServerError,
93                String::from("Error creating user"),
94            ));
95        }
96    };
97
98    // Create a JWT token for the new user
99    let (token, session_id) = create_auth_token_and_session(
100        pool,
101        &user,
102        auth_config,
103    ).await?;
104
105    // Set session cookie if using cookies
106    let mut cookie = Cookie::new("session_id", session_id.to_string());
107    cookie.set_path("/");
108    cookie.set_http_only(true);
109    cookie.set_same_site(rocket::http::SameSite::Strict);
110    cookies.add(cookie);
111
112    Ok(json!({
113        "token": token,
114        "user": {
115            "id": user.id,
116            "email": user.email,
117            "created_at": user.created_at,
118            "active": user.active
119        }
120    }))
121}
122
123/// Login a user
124#[post("/auth/login", data = "<data>")]
125pub async fn handle_login(
126    pool: &State<Pool>, 
127    auth_config: &State<AuthConfig>,
128    data: String,
129    cookies: &CookieJar<'_>
130) -> Result<rocket::serde::json::Value, Custom<String>> {
131    let data = match serde_json5::from_str::<serde_json::Value>(&data) {
132        Ok(d) => d,
133        Err(_) => return Err(Custom(Status::BadRequest, String::from("Not a valid JSON object"))),
134    };
135
136    let email = match data.get("email").and_then(|e| e.as_str()) {
137        Some(e) => e,
138        None => {
139            return Err(Custom(
140                Status::BadRequest,
141                String::from("Email is required and must be a string"),
142            ))
143        }
144    };
145
146    let password = match data.get("password").and_then(|p| p.as_str()) {
147        Some(p) => p,
148        None => {
149            return Err(Custom(
150                Status::BadRequest,
151                String::from("Password is required and must be a string"),
152            ))
153        }
154    };
155
156    // Get user from database
157    let user = match get_user_by_email(pool, email).await {
158        Ok(user) => {
159            // Check if account is active
160            if !user.active {
161                return Err(Custom(
162                    Status::Forbidden,
163                    String::from("Account is inactive"),
164                ));
165            }
166            
167            // Check password
168            let salted = format!("{}{}", password, user.salt);
169            let mut hasher = Sha256::new();
170            hasher.update(salted.as_bytes());
171            let hashed_password = hex::encode(hasher.finalize());
172            
173            if hashed_password != user.password {
174                // Record the failed attempt
175                let _ = record_login_attempt(pool, user.id, false).await;
176                return Err(Custom(
177                    Status::Unauthorized,
178                    String::from("Invalid credentials"),
179                ));
180            }
181            
182            // Record successful login
183            match record_login_attempt(pool, user.id, true).await {
184                Ok(updated_user) => updated_user,
185                Err(e) => {
186                    log::error!("Error recording login attempt: {}", e);
187                    user
188                }
189            }
190        },
191        Err(_) => {
192            return Err(Custom(
193                Status::Unauthorized,
194                String::from("Invalid credentials"),
195            ));
196        }
197    };
198
199    // Create JWT token and session
200    let (token, session_id) = create_auth_token_and_session(
201        pool,
202        &user,
203        auth_config,
204    ).await?;
205
206    // Set session cookie
207    let mut cookie = Cookie::new("session_id", session_id.to_string());
208    cookie.set_path("/");
209    cookie.set_http_only(true);
210    cookie.set_same_site(rocket::http::SameSite::Strict);
211    cookies.add(cookie);
212
213    Ok(json!({
214        "token": token,
215        "user": {
216            "id": user.id,
217            "email": user.email,
218            "created_at": user.created_at,
219            "active": user.active
220        }
221    }))
222}
223
224/// Get the current user's profile
225#[get("/auth/me")]
226pub async fn get_current_user(
227    user: User,
228) -> Result<rocket::serde::json::Value, Custom<String>> {
229    // Just return the basic user info
230    Ok(json!({
231        "id": user.id,
232        "email": user.email,
233        "created_at": user.created_at,
234        "updated_at": user.updated_at,
235        "active": user.active,
236        "last_login_at": user.last_login_at,
237    }))
238}
239
240/// Update user profile information
241#[put("/users/profile", data = "<data>")]
242pub async fn update_profile(
243    user: User,
244    pool: &State<Pool>,
245    data: String,
246) -> Result<rocket::serde::json::Value, Custom<String>> {
247    let data = match serde_json5::from_str::<serde_json::Value>(&data) {
248        Ok(d) => d,
249        Err(_) => return Err(Custom(Status::BadRequest, String::from("Not a valid JSON object"))),
250    };
251
252    // Extract optional profile fields
253    let first_name = data.get("first_name").and_then(|f| f.as_str());
254    let last_name = data.get("last_name").and_then(|l| l.as_str());
255    let full_name = data.get("full_name").and_then(|f| f.as_str());
256
257    // Update PII info if provided
258    if first_name.is_some() || last_name.is_some() || full_name.is_some() {
259        if let Err(e) = update_user_pii(pool, user.id, first_name, last_name, full_name).await {
260            log::error!("Error updating user PII: {}", e);
261            return Err(Custom(
262                Status::InternalServerError,
263                String::from("Error updating profile information"),
264            ));
265        }
266    }
267
268    // Extract user preferences
269    let timezone = data.get("timezone").and_then(|t| t.as_str());
270    let language = data.get("language").and_then(|l| l.as_str());
271    let theme = data.get("theme").and_then(|t| t.as_str());
272    let onboarding_completed = data.get("onboarding_completed").and_then(|o| o.as_bool());
273
274    // Update meta info if provided
275    if timezone.is_some() || language.is_some() || theme.is_some() || onboarding_completed.is_some() {
276        if let Err(e) = update_user_meta(
277            pool, 
278            user.id, 
279            timezone, 
280            language, 
281            theme, 
282            onboarding_completed
283        ).await {
284            log::error!("Error updating user preferences: {}", e);
285            return Err(Custom(
286                Status::InternalServerError,
287                String::from("Error updating profile preferences"),
288            ));
289        }
290    }
291
292    Ok(json!({
293        "message": "Profile updated successfully"
294    }))
295}
296
297/// Change user password
298#[put("/auth/change-password", data = "<data>")]
299pub async fn change_password(
300    user: User,
301    pool: &State<Pool>,
302    auth_config: &State<AuthConfig>,
303    data: String,
304) -> Result<rocket::serde::json::Value, Custom<String>> {
305    // Parse the incoming JSON data
306    let data = match serde_json5::from_str::<serde_json::Value>(&data) {
307        Ok(d) => d,
308        Err(_) => return Err(Custom(
309            Status::BadRequest, 
310            String::from("Invalid JSON request")
311        )),
312    };
313
314    // Validate current password
315    let current_password = match data.get("current_password").and_then(|p| p.as_str()) {
316        Some(p) if !p.is_empty() => p,
317        _ => return Err(Custom(
318            Status::BadRequest,
319            String::from("Current password is required")
320        )),
321    };
322
323    // Validate new password
324    let new_password = match data.get("new_password").and_then(|p| p.as_str()) {
325        Some(p) if is_password_valid(p) => p,
326        _ => return Err(Custom(
327            Status::BadRequest,
328            String::from("Invalid new password. Must be 12+ characters with mix of uppercase, lowercase, numbers, and symbols")
329        )),
330    };
331
332    // Prevent password reuse
333    if current_password == new_password {
334        return Err(Custom(
335            Status::BadRequest,
336            String::from("New password cannot be the same as current password")
337        ));
338    }
339
340    // Verify current password
341    let salted_current = format!("{}{}", current_password, user.salt);
342    let mut current_hasher = Sha256::new();
343    current_hasher.update(salted_current.as_bytes());
344    let current_hashed_password = hex::encode(current_hasher.finalize());
345    
346    log::info!("DB Salt: {}", user.salt);
347    log::info!("DB Password Hash: {}", user.password);
348    log::info!("Current Password: {}", current_password);
349    log::info!("Salted Current: {}", salted_current);
350    log::info!("Computed Hash: {}", current_hashed_password);
351
352    // Constant-time comparison to prevent timing attacks
353    if current_hashed_password != user.password {
354        return Err(Custom(
355            Status::Unauthorized,
356            String::from("Current password is incorrect")
357        ));
358    }
359
360    // Generate a new salt for the new password
361    let mut rng = OsRng;
362    let mut new_salt = [0u8; 16];
363    rng.try_fill_bytes(&mut new_salt);
364    let new_salt_hex = hex::encode(new_salt);
365    
366    // Hash the new password
367    let new_salted = format!("{}{}", new_password, new_salt_hex);
368    let mut new_hasher = Sha256::new();
369    new_hasher.update(new_salted.as_bytes());
370    let new_password_hash = hex::encode(new_hasher.finalize());
371
372    // Update password with additional security settings
373    match update_user_security(
374        pool,
375        user.id,
376        Some(&new_password_hash),
377        Some(&new_salt_hex),
378        None,
379        None,
380    ).await {
381        Ok(_) => {
382            // Invalidate all existing sessions after password change
383            match invalidate_all_user_sessions(pool, user.id).await {
384                Ok(_) => log::info!("All sessions invalidated for user {}", user.id),
385                Err(e) => log::warn!("Failed to invalidate sessions: {}", e),
386            }
387
388            Ok(json!({
389                "message": "Password changed successfully",
390                "action": "All existing sessions have been terminated"
391            }))
392        },
393        Err(e) => {
394            log::error!("Password update failed: {}", e);
395            Err(Custom(
396                Status::InternalServerError,
397                String::from("Failed to update password")
398            ))
399        }
400    }
401}
402
403/// Validate password strength
404fn is_password_valid(password: &str) -> bool {
405    // Comprehensive password strength check
406    password.len() >= 12 && 
407    password.chars().any(|c| c.is_uppercase()) &&
408    password.chars().any(|c| c.is_lowercase()) &&
409    password.chars().any(|c| c.is_numeric()) &&
410    password.chars().any(|c| !c.is_alphanumeric())
411}
412
413/// Constant-time comparison to prevent timing attacks
414fn constant_time_compare(a: &str, b: &str) -> bool {
415    if a.len() != b.len() {
416        return false;
417    }
418    
419    a.bytes().zip(b.bytes()).fold(0, |acc, (x, y)| acc | (x ^ y)) == 0
420}
421
422/// Logout the current user
423#[post("/auth/logout")]
424pub async fn logout(
425    cookies: &CookieJar<'_>,
426    _user: User, // Renamed to _user since we don't use it directly
427    pool: &State<Pool>,
428) -> Result<rocket::serde::json::Value, Custom<String>> {
429    // Get session token from cookie
430    if let Some(session_cookie) = cookies.get("session_id") {
431        // Invalidate the session
432        if let Err(e) = invalidate_session(pool, session_cookie.value()).await {
433            log::error!("Error invalidating session: {}", e);
434            return Err(Custom(
435                Status::InternalServerError,
436                String::from("Error logging out"),
437            ));
438        }
439
440        // Remove session cookie
441        cookies.remove(Cookie::named("session_id"));
442    }
443
444    Ok(json!({
445        "message": "Logged out successfully"
446    }))
447}
448
449/// Helper function to create a JWT token and session
450async fn create_auth_token_and_session(
451    pool: &State<Pool>,
452    user: &User,
453    auth_config: &State<AuthConfig>,
454) -> Result<(String, i64), Custom<String>> {
455    // Create JWT token
456    let now = Utc::now();
457    let exp = (now + Duration::hours(auth_config.token_expiry_hours)).timestamp() as usize;
458    
459    // Generate a unique session token
460    let session_token = Uuid::new_v4().to_string();
461    
462    let claims = Claims {
463        sub: user.id.to_string(),
464        exp,
465        iat: now.timestamp() as usize,
466        user_data: user.clone(),
467    };
468    
469    let token = match encode(
470        &Header::default(),
471        &claims,
472        &EncodingKey::from_secret(auth_config.jwt_secret.as_bytes()),
473    ) {
474        Ok(t) => t,
475        Err(_) => {
476            return Err(Custom(
477                Status::InternalServerError,
478                String::from("Error creating authentication token"),
479            ));
480        }
481    };
482    
483    // Create a new session
484    let ip = "unknown".to_string();
485    let ua = "unknown".to_string();
486    let expires_at = now + Duration::hours(auth_config.token_expiry_hours);
487    
488    let session_id = match create_session(
489        pool,
490        user.id,
491        &session_token,
492        None,
493        &ip,
494        &ua,
495        expires_at,
496    ).await {
497        Ok(id) => id,
498        Err(e) => {
499            log::error!("Error creating session: {}", e);
500            return Err(Custom(
501                Status::InternalServerError,
502                String::from("Error creating user session"),
503            ));
504        }
505    };
506    
507    Ok((token, session_id))
508}
509
510///list all sessions for a user
511#[get("/auth/sessions")]
512pub async fn list_user_sessions(
513    user: User,
514    pool: &State<Pool>,
515) -> Result<rocket::serde::json::Value, Custom<String>> {
516    // Fetch user sessions
517    match get_user_sessions(pool, user.id).await {
518        Ok(sessions) => Ok(json!({
519            "sessions": sessions
520        })),
521        Err(e) => {
522            log::error!("Error fetching user sessions: {}", e);
523            Err(Custom(
524                Status::InternalServerError,
525                String::from("Error fetching user sessions"),
526            ))
527        }
528    }
529}
530
531/// Invalidate a specific session
532#[delete("/auth/sessions/<session_id>")]
533pub async fn invalidate_user_session(
534    user: User,
535    session_id: String,
536    pool: &State<Pool>,
537) -> Result<rocket::serde::json::Value, Custom<String>> {
538    // Invalidate the session
539    match invalidate_session(pool, &session_id).await {
540        Ok(_) => Ok(json!({
541            "message": "Session invalidated successfully"
542        })),
543        Err(e) => {
544            log::error!("Error invalidating session: {}", e);
545            Err(Custom(
546                Status::InternalServerError,
547                String::from("Error invalidating session"),
548            ))
549        }
550    }
551}
552
553/// Get the current user's complete profile including meta and PII data
554#[get("/users/profile")]
555pub async fn get_user_profile(
556    user: User,
557    pool: &State<Pool>,
558) -> Result<rocket::serde::json::Value, Custom<String>> {
559    // Fetch user meta data
560    let user_meta = match get_user_meta(pool, user.id).await {
561        Ok(meta) => meta,
562        Err(e) => {
563            log::error!("Error fetching user meta: {}", e);
564            return Err(Custom(
565                Status::InternalServerError,
566                String::from("Error fetching user preferences"),
567            ));
568        }
569    };
570    
571    // Fetch user PII data
572    let user_pii = match get_user_pii(pool, user.id).await {
573        Ok(pii) => pii,
574        Err(e) => {
575            log::error!("Error fetching user PII: {}", e);
576            return Err(Custom(
577                Status::InternalServerError,
578                String::from("Error fetching user personal information"),
579            ));
580        }
581    };
582    
583    // Combine all data into a single response
584    Ok(json!({
585        // Basic user information
586        "id": user.id,
587        "email": user.email,
588        "email_verified": user.email_verified > 0,
589        "active": user.active,
590        "status": user.status,
591        "created_at": user.created_at,
592        "updated_at": user.updated_at,
593        "last_login_at": user.last_login_at,
594        
595        // User meta information
596        "timezone": user_meta.timezone,
597        "language": user_meta.language,
598        "theme": user_meta.theme,
599        "notification_preferences": user_meta.notification_preferences,
600        "profile_image": user_meta.profile_image,
601        "dashboard_layout": user_meta.dashboard_layout,
602        "onboarding_completed": user_meta.onboarding_completed > 0,
603        
604        // User PII information
605        "first_name": user_pii.first_name,
606        "last_name": user_pii.last_name,
607        "full_name": user_pii.full_name,
608        "identity_verified": user_pii.identity_verified > 0
609    }))
610}
611
612/// Update user profile information - handles both PII and meta updates in a single endpoint
613#[put("/users/profile", data = "<data>")]
614pub async fn update_user_profile(
615    user: User,
616    pool: &State<Pool>,
617    data: String,
618) -> Result<rocket::serde::json::Value, Custom<String>> {
619    // Parse the incoming JSON data
620    let data = match serde_json5::from_str::<serde_json::Value>(&data) {
621        Ok(d) => d,
622        Err(_) => return Err(Custom(
623            Status::BadRequest, 
624            String::from("Invalid JSON request")
625        )),
626    };
627    
628    // Extract optional profile fields (PII)
629    let first_name = data.get("first_name").and_then(|f| f.as_str());
630    let last_name = data.get("last_name").and_then(|l| l.as_str());
631    let full_name = data.get("full_name").and_then(|f| f.as_str());
632
633    // Update PII info if provided
634    if first_name.is_some() || last_name.is_some() || full_name.is_some() {
635        if let Err(e) = update_user_pii(pool, user.id, first_name, last_name, full_name).await {
636            log::error!("Error updating user PII: {}", e);
637            return Err(Custom(
638                Status::InternalServerError,
639                String::from("Error updating profile information"),
640            ));
641        }
642    }
643
644    // Extract user preferences
645    let timezone = data.get("timezone").and_then(|t| t.as_str());
646    let language = data.get("language").and_then(|l| l.as_str());
647    let theme = data.get("theme").and_then(|t| t.as_str());
648    let onboarding_completed = data.get("onboarding_completed").and_then(|o| o.as_bool());
649
650    // Update meta info if provided
651    if timezone.is_some() || language.is_some() || theme.is_some() || onboarding_completed.is_some() {
652        if let Err(e) = update_user_meta(
653            pool, 
654            user.id, 
655            timezone, 
656            language, 
657            theme, 
658            onboarding_completed
659        ).await {
660            log::error!("Error updating user preferences: {}", e);
661            return Err(Custom(
662                Status::InternalServerError,
663                String::from("Error updating profile preferences"),
664            ));
665        }
666    }
667
668    Ok(json!({
669        "message": "Profile updated successfully",
670        "updated_fields": {
671            "pii": {
672                "first_name": first_name.is_some(),
673                "last_name": last_name.is_some(),
674                "full_name": full_name.is_some(),
675            },
676            "meta": {
677                "timezone": timezone.is_some(),
678                "language": language.is_some(),
679                "theme": theme.is_some(),
680                "onboarding_completed": onboarding_completed.is_some(),
681            }
682        }
683    }))
684}
685
686/// List  all users
687#[get("/users?<page>&<per_page>")]
688pub async fn list_users(
689    page: Option<i64>,
690    per_page: Option<i64>,
691    pool: &State<Pool>,
692) -> Result<rocket::serde::json::Value, Custom<String>> {
693    match (page, per_page) {
694        (Some(page), Some(per_page)) => {
695            // Fetch paginated users and total count
696            let users = match queries::user::list_users(pool, page, per_page).await {
697                Ok(u) => u,
698                Err(e) => {
699                    log::error!("Error fetching users: {}", e);
700                    return Err(Custom(
701                        Status::InternalServerError,
702                        String::from("Error fetching users"),
703                    ));
704                }
705            };
706            let total_count = match queries::user::count_users(pool).await {
707                Ok(c) => c,
708                Err(e) => {
709                    log::error!("Error counting users: {}", e);
710                    return Err(Custom(
711                        Status::InternalServerError,
712                        String::from("Error counting users"),
713                    ));
714                }
715            };
716            let total_pages = ((total_count as f64) / (per_page as f64)).ceil() as i64;
717
718            Ok(json!({
719                "users": users,
720                "pagination": {
721                    "page": page,
722                    "per_page": per_page,
723                    "total_count": total_count,
724                    "total_pages": total_pages
725                }
726            }))
727        }
728        _ => Err(Custom(
729            Status::BadRequest,
730            String::from("Missing pagination parameters: please provide both 'page' and 'per_page'")
731        ))
732    }
733}