omni_orchestrator/schemas/v1/api/
users.rs

1use super::super::db::queries::user::{
2    create_user, get_user_by_email, record_login_attempt, create_session,
3    invalidate_session, update_user_security, update_user_pii, update_user_meta,
4    get_user_sessions, get_user_meta, get_user_pii
5};
6
7use crate::schemas::v1::db::queries;
8use super::super::db::queries::user::invalidate_all_user_sessions;
9use chrono::{Duration, Utc};
10use jsonwebtoken::{encode, EncodingKey, Header};
11use log;
12use rand::rngs::OsRng;
13use rand::{RngCore, TryRngCore};
14use rocket::{http::Status, http::Cookie, http::CookieJar};
15use rocket::serde::json::json;
16use rocket::{get, post, put};
17use rocket::response::status::Custom;
18use rocket::State;
19use sha2::{Digest, Sha256};
20use sqlx::mysql::MySqlPool as Pool;
21use uuid::Uuid;
22
23use libomni::types::db::v1 as types;
24use libomni::types::db::auth::{AuthConfig, Claims};
25use types::user::User;
26
27/// Register a new user
28#[post("/auth/register", data = "<data>")]
29pub async fn handle_register(
30    pool: &State<Pool>, 
31    data: String,
32    cookies: &CookieJar<'_>,
33    auth_config: &State<AuthConfig>
34) -> Result<rocket::serde::json::Value, Custom<String>> {
35    let data = match serde_json5::from_str::<serde_json::Value>(&data) {
36        Ok(d) => d,
37        Err(_) => return Err(Custom(Status::BadRequest, String::from("Not a valid JSON object"))),
38    };
39
40    let email = match data.get("email").and_then(|e| e.as_str()) {
41        Some(e) => e,
42        None => {
43            return Err(Custom(
44                Status::BadRequest,
45                String::from("Email is required and must be a string"),
46            ))
47        }
48    };
49
50    let password = match data.get("password").and_then(|p| p.as_str()) {
51        Some(p) => p,
52        None => {
53            return Err(Custom(
54                Status::BadRequest,
55                String::from("Password is required and must be a string"),
56            ))
57        }
58    };
59
60    let name = match data.get("name").and_then(|n| n.as_str()) {
61        Some(n) => n,
62        None => {
63            return Err(Custom(
64                Status::BadRequest,
65                String::from("Name is required and must be a string"),
66            ))
67        }
68    };
69
70    // Check if user already exists
71    if let Ok(_) = get_user_by_email(pool, email).await {
72        return Err(Custom(
73            Status::Conflict,
74            String::from("User with this email already exists"),
75        ));
76    }
77
78    // Generate a random salt and hash the password
79    let mut rng = OsRng;
80    let mut salt = [0u8; 16];
81    rng.try_fill_bytes(&mut salt);
82    let salt_hex = hex::encode(salt);
83    let salted = format!("{}{}", password, salt_hex);
84    let mut hasher = Sha256::new();
85    hasher.update(salted.as_bytes());
86    let password_hash = hex::encode(hasher.finalize());
87
88    // Create the user
89    let user = match create_user(pool, email, &password_hash, &salt_hex).await {
90        Ok(user) => user,
91        Err(e) => {
92            log::error!("Error creating user: {}", e);
93            return Err(Custom(
94                Status::InternalServerError,
95                String::from("Error creating user"),
96            ));
97        }
98    };
99
100    // Create a JWT token for the new user
101    let (token, session_id) = create_auth_token_and_session(
102        pool,
103        &user,
104        auth_config,
105    ).await?;
106
107    // Set session cookie if using cookies
108    let mut cookie = Cookie::new("session_id", session_id.to_string());
109    cookie.set_path("/");
110    cookie.set_http_only(true);
111    cookie.set_same_site(rocket::http::SameSite::Strict);
112    cookies.add(cookie);
113
114    Ok(json!({
115        "token": token,
116        "user": {
117            "id": user.id,
118            "email": user.email,
119            "created_at": user.created_at,
120            "active": user.active
121        }
122    }))
123}
124
125/// Login a user
126#[post("/auth/login", data = "<data>")]
127pub async fn handle_login(
128    pool: &State<Pool>, 
129    auth_config: &State<AuthConfig>,
130    data: String,
131    cookies: &CookieJar<'_>
132) -> Result<rocket::serde::json::Value, Custom<String>> {
133    let data = match serde_json5::from_str::<serde_json::Value>(&data) {
134        Ok(d) => d,
135        Err(_) => return Err(Custom(Status::BadRequest, String::from("Not a valid JSON object"))),
136    };
137
138    let email = match data.get("email").and_then(|e| e.as_str()) {
139        Some(e) => e,
140        None => {
141            return Err(Custom(
142                Status::BadRequest,
143                String::from("Email is required and must be a string"),
144            ))
145        }
146    };
147
148    let password = match data.get("password").and_then(|p| p.as_str()) {
149        Some(p) => p,
150        None => {
151            return Err(Custom(
152                Status::BadRequest,
153                String::from("Password is required and must be a string"),
154            ))
155        }
156    };
157
158    // Get user from database
159    let user = match get_user_by_email(pool, email).await {
160        Ok(user) => {
161            // Check if account is active
162            if !user.active {
163                return Err(Custom(
164                    Status::Forbidden,
165                    String::from("Account is inactive"),
166                ));
167            }
168            
169            // Check password
170            let salted = format!("{}{}", password, user.salt);
171            let mut hasher = Sha256::new();
172            hasher.update(salted.as_bytes());
173            let hashed_password = hex::encode(hasher.finalize());
174            
175            if hashed_password != user.password {
176                // Record the failed attempt
177                let _ = record_login_attempt(pool, user.id, false).await;
178                return Err(Custom(
179                    Status::Unauthorized,
180                    String::from("Invalid credentials"),
181                ));
182            }
183            
184            // Record successful login
185            match record_login_attempt(pool, user.id, true).await {
186                Ok(updated_user) => updated_user,
187                Err(e) => {
188                    log::error!("Error recording login attempt: {}", e);
189                    user
190                }
191            }
192        },
193        Err(_) => {
194            return Err(Custom(
195                Status::Unauthorized,
196                String::from("Invalid credentials"),
197            ));
198        }
199    };
200
201    // Create JWT token and session
202    let (token, session_id) = create_auth_token_and_session(
203        pool,
204        &user,
205        auth_config,
206    ).await?;
207
208    // Set session cookie
209    let mut cookie = Cookie::new("session_id", session_id.to_string());
210    cookie.set_path("/");
211    cookie.set_http_only(true);
212    cookie.set_same_site(rocket::http::SameSite::Strict);
213    cookies.add(cookie);
214
215    Ok(json!({
216        "token": token,
217        "user": {
218            "id": user.id,
219            "email": user.email,
220            "created_at": user.created_at,
221            "active": user.active
222        }
223    }))
224}
225
226/// Get the current user's profile
227#[get("/auth/me")]
228pub async fn get_current_user(
229    user: User,
230) -> Result<rocket::serde::json::Value, Custom<String>> {
231    // Just return the basic user info
232    Ok(json!({
233        "id": user.id,
234        "email": user.email,
235        "created_at": user.created_at,
236        "updated_at": user.updated_at,
237        "active": user.active,
238        "last_login_at": user.last_login_at,
239    }))
240}
241
242/// Update user profile information
243#[put("/users/profile", data = "<data>")]
244pub async fn update_profile(
245    user: User,
246    pool: &State<Pool>,
247    data: String,
248) -> Result<rocket::serde::json::Value, Custom<String>> {
249    let data = match serde_json5::from_str::<serde_json::Value>(&data) {
250        Ok(d) => d,
251        Err(_) => return Err(Custom(Status::BadRequest, String::from("Not a valid JSON object"))),
252    };
253
254    // Extract optional profile fields
255    let first_name = data.get("first_name").and_then(|f| f.as_str());
256    let last_name = data.get("last_name").and_then(|l| l.as_str());
257    let full_name = data.get("full_name").and_then(|f| f.as_str());
258
259    // Update PII info if provided
260    if first_name.is_some() || last_name.is_some() || full_name.is_some() {
261        if let Err(e) = update_user_pii(pool, user.id, first_name, last_name, full_name).await {
262            log::error!("Error updating user PII: {}", e);
263            return Err(Custom(
264                Status::InternalServerError,
265                String::from("Error updating profile information"),
266            ));
267        }
268    }
269
270    // Extract user preferences
271    let timezone = data.get("timezone").and_then(|t| t.as_str());
272    let language = data.get("language").and_then(|l| l.as_str());
273    let theme = data.get("theme").and_then(|t| t.as_str());
274    let onboarding_completed = data.get("onboarding_completed").and_then(|o| o.as_bool());
275
276    // Update meta info if provided
277    if timezone.is_some() || language.is_some() || theme.is_some() || onboarding_completed.is_some() {
278        if let Err(e) = update_user_meta(
279            pool, 
280            user.id, 
281            timezone, 
282            language, 
283            theme, 
284            onboarding_completed
285        ).await {
286            log::error!("Error updating user preferences: {}", e);
287            return Err(Custom(
288                Status::InternalServerError,
289                String::from("Error updating profile preferences"),
290            ));
291        }
292    }
293
294    Ok(json!({
295        "message": "Profile updated successfully"
296    }))
297}
298
299/// Change user password
300#[put("/auth/change-password", data = "<data>")]
301pub async fn change_password(
302    user: User,
303    pool: &State<Pool>,
304    auth_config: &State<AuthConfig>,
305    data: String,
306) -> Result<rocket::serde::json::Value, Custom<String>> {
307    // Parse the incoming JSON data
308    let data = match serde_json5::from_str::<serde_json::Value>(&data) {
309        Ok(d) => d,
310        Err(_) => return Err(Custom(
311            Status::BadRequest, 
312            String::from("Invalid JSON request")
313        )),
314    };
315
316    // Validate current password
317    let current_password = match data.get("current_password").and_then(|p| p.as_str()) {
318        Some(p) if !p.is_empty() => p,
319        _ => return Err(Custom(
320            Status::BadRequest,
321            String::from("Current password is required")
322        )),
323    };
324
325    // Validate new password
326    let new_password = match data.get("new_password").and_then(|p| p.as_str()) {
327        Some(p) if is_password_valid(p) => p,
328        _ => return Err(Custom(
329            Status::BadRequest,
330            String::from("Invalid new password. Must be 12+ characters with mix of uppercase, lowercase, numbers, and symbols")
331        )),
332    };
333
334    // Prevent password reuse
335    if current_password == new_password {
336        return Err(Custom(
337            Status::BadRequest,
338            String::from("New password cannot be the same as current password")
339        ));
340    }
341
342    // Verify current password
343    let salted_current = format!("{}{}", current_password, user.salt);
344    let mut current_hasher = Sha256::new();
345    current_hasher.update(salted_current.as_bytes());
346    let current_hashed_password = hex::encode(current_hasher.finalize());
347    
348    log::info!("DB Salt: {}", user.salt);
349    log::info!("DB Password Hash: {}", user.password);
350    log::info!("Current Password: {}", current_password);
351    log::info!("Salted Current: {}", salted_current);
352    log::info!("Computed Hash: {}", current_hashed_password);
353
354    // Constant-time comparison to prevent timing attacks
355    if current_hashed_password != user.password {
356        return Err(Custom(
357            Status::Unauthorized,
358            String::from("Current password is incorrect")
359        ));
360    }
361
362    // Generate a new salt for the new password
363    let mut rng = OsRng;
364    let mut new_salt = [0u8; 16];
365    rng.try_fill_bytes(&mut new_salt);
366    let new_salt_hex = hex::encode(new_salt);
367    
368    // Hash the new password
369    let new_salted = format!("{}{}", new_password, new_salt_hex);
370    let mut new_hasher = Sha256::new();
371    new_hasher.update(new_salted.as_bytes());
372    let new_password_hash = hex::encode(new_hasher.finalize());
373
374    // Update password with additional security settings
375    match update_user_security(
376        pool,
377        user.id,
378        Some(&new_password_hash),
379        Some(&new_salt_hex),
380        None,
381        None,
382    ).await {
383        Ok(_) => {
384            // Invalidate all existing sessions after password change
385            match invalidate_all_user_sessions(pool, user.id).await {
386                Ok(_) => log::info!("All sessions invalidated for user {}", user.id),
387                Err(e) => log::warn!("Failed to invalidate sessions: {}", e),
388            }
389
390            Ok(json!({
391                "message": "Password changed successfully",
392                "action": "All existing sessions have been terminated"
393            }))
394        },
395        Err(e) => {
396            log::error!("Password update failed: {}", e);
397            Err(Custom(
398                Status::InternalServerError,
399                String::from("Failed to update password")
400            ))
401        }
402    }
403}
404
405/// Validate password strength
406fn is_password_valid(password: &str) -> bool {
407    // Comprehensive password strength check
408    password.len() >= 12 && 
409    password.chars().any(|c| c.is_uppercase()) &&
410    password.chars().any(|c| c.is_lowercase()) &&
411    password.chars().any(|c| c.is_numeric()) &&
412    password.chars().any(|c| !c.is_alphanumeric())
413}
414
415/// Constant-time comparison to prevent timing attacks
416fn constant_time_compare(a: &str, b: &str) -> bool {
417    if a.len() != b.len() {
418        return false;
419    }
420    
421    a.bytes().zip(b.bytes()).fold(0, |acc, (x, y)| acc | (x ^ y)) == 0
422}
423
424/// Logout the current user
425#[post("/auth/logout")]
426pub async fn logout(
427    cookies: &CookieJar<'_>,
428    _user: User, // Renamed to _user since we don't use it directly
429    pool: &State<Pool>,
430) -> Result<rocket::serde::json::Value, Custom<String>> {
431    // Get session token from cookie
432    if let Some(session_cookie) = cookies.get("session_id") {
433        // Invalidate the session
434        if let Err(e) = invalidate_session(pool, session_cookie.value()).await {
435            log::error!("Error invalidating session: {}", e);
436            return Err(Custom(
437                Status::InternalServerError,
438                String::from("Error logging out"),
439            ));
440        }
441
442        // Remove session cookie
443        cookies.remove(Cookie::named("session_id"));
444    }
445
446    Ok(json!({
447        "message": "Logged out successfully"
448    }))
449}
450
451/// Helper function to create a JWT token and session
452async fn create_auth_token_and_session(
453    pool: &State<Pool>,
454    user: &User,
455    auth_config: &State<AuthConfig>,
456) -> Result<(String, i64), Custom<String>> {
457    // Create JWT token
458    let now = Utc::now();
459    let exp = (now + Duration::hours(auth_config.token_expiry_hours)).timestamp() as usize;
460    
461    // Generate a unique session token
462    let session_token = Uuid::new_v4().to_string();
463    
464    let claims = Claims {
465        sub: user.id.to_string(),
466        exp,
467        iat: now.timestamp() as usize,
468        user_data: user.clone(),
469    };
470    
471    let token = match encode(
472        &Header::default(),
473        &claims,
474        &EncodingKey::from_secret(auth_config.jwt_secret.as_bytes()),
475    ) {
476        Ok(t) => t,
477        Err(_) => {
478            return Err(Custom(
479                Status::InternalServerError,
480                String::from("Error creating authentication token"),
481            ));
482        }
483    };
484    
485    // Create a new session
486    let ip = "unknown".to_string();
487    let ua = "unknown".to_string();
488    let expires_at = now + Duration::hours(auth_config.token_expiry_hours);
489    
490    let session_id = match create_session(
491        pool,
492        user.id,
493        &session_token,
494        None,
495        &ip,
496        &ua,
497        expires_at,
498    ).await {
499        Ok(id) => id,
500        Err(e) => {
501            log::error!("Error creating session: {}", e);
502            return Err(Custom(
503                Status::InternalServerError,
504                String::from("Error creating user session"),
505            ));
506        }
507    };
508    
509    Ok((token, session_id))
510}
511
512///list all sessions for a user
513#[get("/auth/sessions")]
514pub async fn list_user_sessions(
515    user: User,
516    pool: &State<Pool>,
517) -> Result<rocket::serde::json::Value, Custom<String>> {
518    // Fetch user sessions
519    match get_user_sessions(pool, user.id).await {
520        Ok(sessions) => Ok(json!({
521            "sessions": sessions
522        })),
523        Err(e) => {
524            log::error!("Error fetching user sessions: {}", e);
525            Err(Custom(
526                Status::InternalServerError,
527                String::from("Error fetching user sessions"),
528            ))
529        }
530    }
531}
532
533/// Invalidate a specific session
534#[delete("/auth/sessions/<session_id>")]
535pub async fn invalidate_user_session(
536    user: User,
537    session_id: String,
538    pool: &State<Pool>,
539) -> Result<rocket::serde::json::Value, Custom<String>> {
540    // Invalidate the session
541    match invalidate_session(pool, &session_id).await {
542        Ok(_) => Ok(json!({
543            "message": "Session invalidated successfully"
544        })),
545        Err(e) => {
546            log::error!("Error invalidating session: {}", e);
547            Err(Custom(
548                Status::InternalServerError,
549                String::from("Error invalidating session"),
550            ))
551        }
552    }
553}
554
555/// Get the current user's complete profile including meta and PII data
556#[get("/users/profile")]
557pub async fn get_user_profile(
558    user: User,
559    pool: &State<Pool>,
560) -> Result<rocket::serde::json::Value, Custom<String>> {
561    // Fetch user meta data
562    let user_meta = match get_user_meta(pool, user.id).await {
563        Ok(meta) => meta,
564        Err(e) => {
565            log::error!("Error fetching user meta: {}", e);
566            return Err(Custom(
567                Status::InternalServerError,
568                String::from("Error fetching user preferences"),
569            ));
570        }
571    };
572    
573    // Fetch user PII data
574    let user_pii = match get_user_pii(pool, user.id).await {
575        Ok(pii) => pii,
576        Err(e) => {
577            log::error!("Error fetching user PII: {}", e);
578            return Err(Custom(
579                Status::InternalServerError,
580                String::from("Error fetching user personal information"),
581            ));
582        }
583    };
584    
585    // Combine all data into a single response
586    Ok(json!({
587        // Basic user information
588        "id": user.id,
589        "email": user.email,
590        "email_verified": user.email_verified > 0,
591        "active": user.active,
592        "status": user.status,
593        "created_at": user.created_at,
594        "updated_at": user.updated_at,
595        "last_login_at": user.last_login_at,
596        
597        // User meta information
598        "timezone": user_meta.timezone,
599        "language": user_meta.language,
600        "theme": user_meta.theme,
601        "notification_preferences": user_meta.notification_preferences,
602        "profile_image": user_meta.profile_image,
603        "dashboard_layout": user_meta.dashboard_layout,
604        "onboarding_completed": user_meta.onboarding_completed > 0,
605        
606        // User PII information
607        "first_name": user_pii.first_name,
608        "last_name": user_pii.last_name,
609        "full_name": user_pii.full_name,
610        "identity_verified": user_pii.identity_verified > 0
611    }))
612}
613
614/// Update user profile information - handles both PII and meta updates in a single endpoint
615#[put("/users/profile", data = "<data>")]
616pub async fn update_user_profile(
617    user: User,
618    pool: &State<Pool>,
619    data: String,
620) -> Result<rocket::serde::json::Value, Custom<String>> {
621    // Parse the incoming JSON data
622    let data = match serde_json5::from_str::<serde_json::Value>(&data) {
623        Ok(d) => d,
624        Err(_) => return Err(Custom(
625            Status::BadRequest, 
626            String::from("Invalid JSON request")
627        )),
628    };
629    
630    // Extract optional profile fields (PII)
631    let first_name = data.get("first_name").and_then(|f| f.as_str());
632    let last_name = data.get("last_name").and_then(|l| l.as_str());
633    let full_name = data.get("full_name").and_then(|f| f.as_str());
634
635    // Update PII info if provided
636    if first_name.is_some() || last_name.is_some() || full_name.is_some() {
637        if let Err(e) = update_user_pii(pool, user.id, first_name, last_name, full_name).await {
638            log::error!("Error updating user PII: {}", e);
639            return Err(Custom(
640                Status::InternalServerError,
641                String::from("Error updating profile information"),
642            ));
643        }
644    }
645
646    // Extract user preferences
647    let timezone = data.get("timezone").and_then(|t| t.as_str());
648    let language = data.get("language").and_then(|l| l.as_str());
649    let theme = data.get("theme").and_then(|t| t.as_str());
650    let onboarding_completed = data.get("onboarding_completed").and_then(|o| o.as_bool());
651
652    // Update meta info if provided
653    if timezone.is_some() || language.is_some() || theme.is_some() || onboarding_completed.is_some() {
654        if let Err(e) = update_user_meta(
655            pool, 
656            user.id, 
657            timezone, 
658            language, 
659            theme, 
660            onboarding_completed
661        ).await {
662            log::error!("Error updating user preferences: {}", e);
663            return Err(Custom(
664                Status::InternalServerError,
665                String::from("Error updating profile preferences"),
666            ));
667        }
668    }
669
670    Ok(json!({
671        "message": "Profile updated successfully",
672        "updated_fields": {
673            "pii": {
674                "first_name": first_name.is_some(),
675                "last_name": last_name.is_some(),
676                "full_name": full_name.is_some(),
677            },
678            "meta": {
679                "timezone": timezone.is_some(),
680                "language": language.is_some(),
681                "theme": theme.is_some(),
682                "onboarding_completed": onboarding_completed.is_some(),
683            }
684        }
685    }))
686}
687
688/// List  all users
689#[get("/users?<page>&<per_page>")]
690pub async fn list_users(
691    page: Option<i64>,
692    per_page: Option<i64>,
693    pool: &State<Pool>,
694) -> Result<rocket::serde::json::Value, Custom<String>> {
695    match (page, per_page) {
696        (Some(page), Some(per_page)) => {
697            // Fetch paginated users and total count
698            let users = match queries::user::list_users(pool, page, per_page).await {
699                Ok(u) => u,
700                Err(e) => {
701                    log::error!("Error fetching users: {}", e);
702                    return Err(Custom(
703                        Status::InternalServerError,
704                        String::from("Error fetching users"),
705                    ));
706                }
707            };
708            let total_count = match queries::user::count_users(pool).await {
709                Ok(c) => c,
710                Err(e) => {
711                    log::error!("Error counting users: {}", e);
712                    return Err(Custom(
713                        Status::InternalServerError,
714                        String::from("Error counting users"),
715                    ));
716                }
717            };
718            let total_pages = ((total_count as f64) / (per_page as f64)).ceil() as i64;
719
720            Ok(json!({
721                "users": users,
722                "pagination": {
723                    "page": page,
724                    "per_page": per_page,
725                    "total_count": total_count,
726                    "total_pages": total_pages
727                }
728            }))
729        }
730        _ => Err(Custom(
731            Status::BadRequest,
732            String::from("Missing pagination parameters: please provide both 'page' and 'per_page'")
733        ))
734    }
735}