omni_orchestrator/schemas/v1/api/logging/mod.rs
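//! Logging API backed by ClickHouse.
//!
//! Provides paginated, filterable log queries at platform, organization,
//! application, and instance scope, plus a bulk ingestion endpoint.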

use rocket::serde::json::{json, Json, Value};
use rocket::{get, post, routes, State};
use rocket::http::Status;
use std::fs;
use chrono::{DateTime, Utc};
use serde::{Serialize, Deserialize};
use clickhouse::Client;
use uuid::Uuid;

// Log levels; the explicit discriminants match the numeric values stored in
// ClickHouse (1 = debug ... 5 = fatal), so `as u8` casts line up with the
// mappings used in the queries below
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
#[repr(u8)]
pub enum LogLevel {
    Debug = 1,
    Info = 2,
    Warn = 3,
    Error = 4,
    Fatal = 5,
}

// Log entry model, used for JSON ingestion and for rows fetched from ClickHouse
#[derive(Debug, Serialize, Deserialize, clickhouse::Row)]
pub struct LogEntry {
    pub log_id: Option<String>,        // Optional; generated server-side if missing
    pub timestamp: DateTime<Utc>,
    pub platform_id: String,
    pub org_id: String,
    pub app_id: String,
    pub instance_id: String,
    pub level: LogLevel,
    pub message: String,
    pub context: serde_json::Value,    // Structured JSON context
}

// Log entry shape returned by the API
#[derive(Debug, Serialize, Deserialize)]
pub struct LogResponse {
    pub log_id: String,
    pub timestamp: DateTime<Utc>,
    pub platform_id: String,
    pub org_id: String,
    pub app_id: String,
    pub instance_id: String,
    pub level: String,                 // Rendered as a string in API responses
    pub message: String,
    pub context: serde_json::Value,    // Structured JSON context
}

// Pagination metadata
#[derive(Debug, Serialize)]
pub struct Pagination {
    pub page: i64,
    pub per_page: i64,
    pub total_count: i64,
    pub total_pages: i64,
}

// Request body for bulk log insertion
#[derive(Debug, Serialize, Deserialize)]
pub struct BulkLogInsert {
    pub logs: Vec<LogEntry>,
}

// Row shape for error-stat aggregation results (not yet queried in this module)
#[allow(dead_code)]
#[derive(Debug, Serialize, Deserialize)]
struct ErrorStat {
    platform_id: String,
    org_id: String,
    app_id: String,
    level: u8,
    count: u64,
    event_date: chrono::NaiveDate,
}

// ClickHouse DB initialization: read a schema file and execute its statements one by one
pub async fn init_clickhouse_db(client: &Client, schema_path: &str) -> Result<(), clickhouse::error::Error> {
    // Read the SQL schema file
    let schema_sql = match fs::read_to_string(schema_path) {
        Ok(content) => content,
        Err(e) => {
            eprintln!("Failed to read schema file: {}", e);
            return Err(clickhouse::error::Error::Custom(format!("Failed to read schema file: {}", e)));
        }
    };

    // Naive statement splitting: assumes no semicolons occur inside string
    // literals. Line comments are stripped first, so a statement that begins
    // with "--" is not discarded along with its comment.
    let statements: Vec<String> = schema_sql
        .split(';')
        .map(|stmt| {
            stmt.lines()
                .filter(|line| !line.trim_start().starts_with("--"))
                .collect::<Vec<_>>()
                .join("\n")
                .trim()
                .to_string()
        })
        .filter(|stmt| !stmt.is_empty())
        .collect();

    println!("Found {} SQL statements to execute", statements.len());

    // Execute each statement separately; ClickHouse does not accept multi-statement queries
    for (i, stmt) in statements.iter().enumerate() {
        println!("Executing statement {}/{}: {} characters", i + 1, statements.len(), stmt.len());
        match client.query(stmt).execute().await {
            Ok(_) => println!("Statement {}/{} executed successfully", i + 1, statements.len()),
            Err(e) => {
                eprintln!("Failed to execute statement {}/{}: {:?}", i + 1, statements.len(), e);
                eprintln!("Statement content: {}", stmt);
                return Err(e);
            }
        }
    }

    Ok(())
}
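
// A hedged sketch of the table shape this module's queries assume. The real
// schema ships in the SQL file passed to init_clickhouse_db; the column types
// below are inferred from the queries in this file, not confirmed by it.
#[allow(dead_code)]
const EXAMPLE_SCHEMA_SQL: &str = r#"
CREATE DATABASE IF NOT EXISTS omni_logs;

CREATE TABLE IF NOT EXISTS omni_logs.logs (
    log_id      String,
    timestamp   DateTime64(3, 'UTC'),
    event_date  Date DEFAULT toDate(timestamp),
    platform_id String,
    org_id      String,
    app_id      String,
    instance_id String,
    level       UInt8,
    message     String,
    context     String
) ENGINE = MergeTree
PARTITION BY toYYYYMM(event_date)
ORDER BY (platform_id, org_id, app_id, timestamp)
"#;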

// Fetch logs with pagination: a count() query plus a PREWHERE-filtered page query
async fn fetch_logs_paginated(
    client: &Client,
    query_conditions: &str,
    page: i64,
    per_page: i64,
) -> Result<(Vec<LogResponse>, i64), clickhouse::error::Error> {
    // Calculate the row offset for the requested page
    let offset = (page - 1) * per_page;

    // Count total matching logs
    let count_sql = format!(
        "SELECT count() FROM omni_logs.logs WHERE {}",
        query_conditions
    );

    // PREWHERE reads the filter columns first and skips non-matching rows
    // before touching the remaining columns
    let logs_sql = format!(
        r#"
        SELECT
            log_id,
            timestamp,
            platform_id,
            org_id,
            app_id,
            instance_id,
            level,
            message,
            context
        FROM omni_logs.logs
        PREWHERE {}
        ORDER BY timestamp DESC
        LIMIT {} OFFSET {}
        "#,
        query_conditions,
        per_page,
        offset
    );

    // count() returns a UInt64, so fetch it as u64 and widen afterwards
    let count = client.query(&count_sql).fetch_one::<u64>().await? as i64;

    // Execute the page query
    let rows: Vec<LogEntry> = client.query(&logs_sql).fetch_all().await?;

    // Convert internal rows into API response objects
    let mut logs = Vec::with_capacity(rows.len());

    for row in rows {
        let log_id = row
            .log_id
            .ok_or_else(|| clickhouse::error::Error::Custom("Missing log_id".to_string()))?;

        // Map the numeric level back to its API string form
        let level = match row.level as u8 {
            1 => "debug",
            2 => "info",
            3 => "warn",
            4 => "error",
            5 => "fatal",
            _ => "unknown",
        };

        logs.push(LogResponse {
            log_id,
            timestamp: row.timestamp,
            platform_id: row.platform_id,
            org_id: row.org_id,
            app_id: row.app_id,
            instance_id: row.instance_id,
            level: level.to_string(),
            message: row.message,
            // context is already structured JSON; no round-trip through a string
            context: row.context,
        });
    }

    Ok((logs, count))
}
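
// Quick usage sketch (hypothetical values; the condition string must already
// be SQL-escaped, as list_logs does before delegating here):
//
//     let (logs, total) = fetch_logs_paginated(&client, "level = 4", 1, 50).await?;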

// Escape a value for embedding in a single-quoted ClickHouse string literal.
// Backslashes are escaped as well as quotes, so user input ending in a
// backslash cannot break out of the literal.
fn escape_sql(value: &str) -> String {
    value.replace('\\', "\\\\").replace('\'', "\\'")
}

// Main logs endpoint with filtering and pagination
#[get("/logs?<page>&<per_page>&<platform_id>&<org_id>&<app_id>&<instance_id>&<level>&<start_time>&<end_time>&<search>")]
pub async fn list_logs(
    page: Option<i64>,
    per_page: Option<i64>,
    platform_id: Option<String>,
    org_id: Option<String>,
    app_id: Option<String>,
    instance_id: Option<String>,
    level: Option<String>,
    start_time: Option<String>,
    end_time: Option<String>,
    search: Option<String>,
    clickhouse: &State<Client>,
) -> Result<Json<Value>, (Status, Json<Value>)> {
    // Default pagination values
    let page = page.unwrap_or(1);
    let per_page = per_page.unwrap_or(50);

    if page < 1 || per_page < 1 || per_page > 1000 {
        return Err((
            Status::BadRequest,
            Json(json!({
                "error": "Invalid pagination parameters",
                "message": "page must be >= 1 and per_page must be between 1 and 1000"
            }))
        ));
    }

    // Build the WHERE conditions from whichever filters were supplied
    let mut conditions = Vec::new();

    if let Some(pid) = platform_id {
        conditions.push(format!("platform_id = '{}'", escape_sql(&pid)));
    }

    if let Some(oid) = org_id {
        conditions.push(format!("org_id = '{}'", escape_sql(&oid)));
    }

    if let Some(aid) = app_id {
        conditions.push(format!("app_id = '{}'", escape_sql(&aid)));
    }

    if let Some(iid) = instance_id {
        conditions.push(format!("instance_id = '{}'", escape_sql(&iid)));
    }

    if let Some(lvl) = level {
        // Convert the string level to its numeric form
        let level_enum = match lvl.to_lowercase().as_str() {
            "debug" => 1,
            "info" => 2,
            "warn" => 3,
            "error" => 4,
            "fatal" => 5,
            _ => {
                return Err((
                    Status::BadRequest,
                    Json(json!({
                        "error": "Invalid log level",
                        "message": "Level must be one of: debug, info, warn, error, fatal"
                    }))
                ));
            }
        };
        conditions.push(format!("level = {}", level_enum));
    }

    // Constrain both timestamp and event_date so ClickHouse can prune partitions
    if let Some(st) = start_time {
        let st = escape_sql(&st);
        conditions.push(format!("timestamp >= toDateTime64('{}', 3, 'UTC')", st));
        conditions.push(format!("event_date >= toDate('{}')", st));
    }

    if let Some(et) = end_time {
        let et = escape_sql(&et);
        conditions.push(format!("timestamp <= toDateTime64('{}', 3, 'UTC')", et));
        conditions.push(format!("event_date <= toDate('{}')", et));
    }

    // Case-insensitive substring search on message. Note this scans unless the
    // table carries a skip index (e.g. tokenbf_v1) on the message column.
    if let Some(term) = search {
        conditions.push(format!("message ILIKE '%{}%'", escape_sql(&term)));
    }

    // Fall back to a tautology so the WHERE/PREWHERE clause is never empty
    let query_conditions = if conditions.is_empty() {
        "1=1".to_string()
    } else {
        conditions.join(" AND ")
    };

    // Fetch logs with pagination
    match fetch_logs_paginated(clickhouse, &query_conditions, page, per_page).await {
        Ok((logs, total_count)) => {
            let total_pages = (total_count + per_page - 1) / per_page; // ceiling division

            let response = json!({
                "logs": logs,
                "pagination": {
                    "page": page,
                    "per_page": per_page,
                    "total_count": total_count,
                    "total_pages": total_pages
                }
            });

            Ok(Json(response))
        },
        Err(err) => Err((
            Status::InternalServerError,
            Json(json!({
                "error": "Database error",
                "message": err.to_string()
            }))
        ))
    }
}
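
// Example request against this endpoint (the "/api/v1" prefix is an assumed
// mount point, not something this module fixes):
//
//     GET /api/v1/logs?page=1&per_page=50&level=error&search=timeout
//
// The response pairs the page of logs with pagination metadata:
//
//     { "logs": [...], "pagination": { "page": 1, "per_page": 50,
//       "total_count": 1234, "total_pages": 25 } }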

// Platform routes: reuse list_logs with the platform_id prefilled
#[get("/platforms/<platform_id>/logs?<page>&<per_page>&<level>&<start_time>&<end_time>&<search>")]
pub async fn list_platform_logs(
    platform_id: String,
    page: Option<i64>,
    per_page: Option<i64>,
    level: Option<String>,
    start_time: Option<String>,
    end_time: Option<String>,
    search: Option<String>,
    clickhouse: &State<Client>,
) -> Result<Json<Value>, (Status, Json<Value>)> {
    list_logs(
        page,
        per_page,
        Some(platform_id),
        None,
        None,
        None,
        level,
        start_time,
        end_time,
        search,
        clickhouse,
    ).await
}

// Organization routes: reuse list_logs with the org_id prefilled
#[get("/orgs/<org_id>/logs?<page>&<per_page>&<platform_id>&<level>&<start_time>&<end_time>&<search>")]
pub async fn list_org_logs(
    org_id: String,
    page: Option<i64>,
    per_page: Option<i64>,
    platform_id: Option<String>,
    level: Option<String>,
    start_time: Option<String>,
    end_time: Option<String>,
    search: Option<String>,
    clickhouse: &State<Client>,
) -> Result<Json<Value>, (Status, Json<Value>)> {
    list_logs(
        page,
        per_page,
        platform_id,
        Some(org_id),
        None,
        None,
        level,
        start_time,
        end_time,
        search,
        clickhouse,
    ).await
}

// App routes: reuse list_logs with the app_id prefilled
#[get("/apps/<app_id>/logs?<page>&<per_page>&<platform_id>&<org_id>&<level>&<start_time>&<end_time>&<search>")]
pub async fn list_app_logs(
    app_id: String,
    page: Option<i64>,
    per_page: Option<i64>,
    platform_id: Option<String>,
    org_id: Option<String>,
    level: Option<String>,
    start_time: Option<String>,
    end_time: Option<String>,
    search: Option<String>,
    clickhouse: &State<Client>,
) -> Result<Json<Value>, (Status, Json<Value>)> {
    list_logs(
        page,
        per_page,
        platform_id,
        org_id,
        Some(app_id),
        None,
        level,
        start_time,
        end_time,
        search,
        clickhouse,
    ).await
}

// Instance routes: reuse list_logs with the instance_id prefilled
#[get("/instances/<instance_id>/logs?<page>&<per_page>&<platform_id>&<org_id>&<app_id>&<level>&<start_time>&<end_time>&<search>")]
pub async fn list_instance_logs(
    instance_id: String,
    page: Option<i64>,
    per_page: Option<i64>,
    platform_id: Option<String>,
    org_id: Option<String>,
    app_id: Option<String>,
    level: Option<String>,
    start_time: Option<String>,
    end_time: Option<String>,
    search: Option<String>,
    clickhouse: &State<Client>,
) -> Result<Json<Value>, (Status, Json<Value>)> {
    list_logs(
        page,
        per_page,
        platform_id,
        org_id,
        app_id,
        Some(instance_id),
        level,
        start_time,
        end_time,
        search,
        clickhouse,
    ).await
}

// Bulk log insertion. Rows are inserted one statement at a time because the
// JSON-shaped LogEntry does not map cleanly onto the clickhouse crate's
// batched, Row-based inserter.
#[post("/logs", format = "json", data = "<log_batch>")]
pub async fn insert_logs(
    log_batch: Json<BulkLogInsert>,
    clickhouse: &State<Client>,
) -> Result<Json<Value>, (Status, Json<Value>)> {
    let logs = log_batch.into_inner().logs;

    if logs.is_empty() {
        return Ok(Json(json!({
            "status": "success",
            "message": "No logs to insert",
            "count": 0
        })));
    }

    let mut inserted_count = 0;

    // ClickHouse transactions are experimental; try to open one and fall back
    // to plain per-row inserts when the server rejects BEGIN
    let in_transaction = clickhouse
        .query("BEGIN TRANSACTION")
        .execute()
        .await
        .is_ok();

    for log in logs {
        // Generate a UUID when the caller did not supply one
        let log_id = log.log_id.unwrap_or_else(|| Uuid::new_v4().to_string());

        // Serialize the structured context into a string column value
        let context_str = serde_json::to_string(&log.context)
            .unwrap_or_else(|_| "{}".to_string());

        // The enum discriminants already match the numeric levels in ClickHouse
        let level_num = log.level as u8;

        // Build the single-row INSERT; values are escaped into string
        // literals rather than bound as query parameters
        let insert_sql = format!(
            r#"
            INSERT INTO omni_logs.logs
            (log_id, timestamp, platform_id, org_id, app_id, instance_id, level, message, context)
            VALUES ('{}', '{}', '{}', '{}', '{}', '{}', {}, '{}', '{}')
            "#,
            escape_sql(&log_id),
            log.timestamp.format("%Y-%m-%d %H:%M:%S%.3f"),
            escape_sql(&log.platform_id),
            escape_sql(&log.org_id),
            escape_sql(&log.app_id),
            escape_sql(&log.instance_id),
            level_num,
            escape_sql(&log.message),
            escape_sql(&context_str)
        );

        if let Err(err) = clickhouse.query(&insert_sql).execute().await {
            // Roll back if a transaction was opened, then report the failure
            if in_transaction {
                let _ = clickhouse.query("ROLLBACK").execute().await;
            }

            return Err((
                Status::InternalServerError,
                Json(json!({
                    "error": "Failed to insert log",
                    "message": err.to_string(),
                    "count": inserted_count
                }))
            ));
        }

        inserted_count += 1;
    }

    // Commit only if a transaction was actually opened
    if in_transaction {
        if let Err(err) = clickhouse.query("COMMIT").execute().await {
            return Err((
                Status::InternalServerError,
                Json(json!({
                    "error": "Failed to commit transaction",
                    "message": err.to_string(),
                    "count": inserted_count
                }))
            ));
        }
    }

    Ok(Json(json!({
        "status": "success",
        "message": "Logs inserted successfully",
        "count": inserted_count
    })))
}
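
// Example request body for POST /logs (illustrative values; log_id may be
// omitted and is generated server-side):
//
//     {
//       "logs": [{
//         "timestamp": "2024-01-01T00:00:00Z",
//         "platform_id": "platform-1",
//         "org_id": "org-1",
//         "app_id": "app-1",
//         "instance_id": "i-abc123",
//         "level": "info",
//         "message": "service started",
//         "context": { "region": "us-east-1" }
//       }]
//     }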

// Register all routes exposed by this module
pub fn routes() -> Vec<rocket::Route> {
    routes![
        list_logs,
        list_platform_logs,
        list_org_logs,
        list_app_logs,
        list_instance_logs,
        insert_logs
    ]
}
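
// A minimal wiring sketch (not part of the original module): how a Rocket
// launch path might initialize the ClickHouse client, apply the schema, and
// mount these routes. The URL, database name, and schema path are placeholder
// assumptions.
#[allow(dead_code)]
async fn example_launch() -> Result<rocket::Rocket<rocket::Build>, clickhouse::error::Error> {
    let client = Client::default()
        .with_url("http://localhost:8123")
        .with_database("omni_logs");

    // Apply the schema before serving any requests
    init_clickhouse_db(&client, "schemas/clickhouse/logs.sql").await?;

    // Hand the client to Rocket as managed state and mount the API
    Ok(rocket::build().manage(client).mount("/api/v1", routes()))
}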