1use rocket::serde::json::{json, Value, Json};
2use rocket::State;
3use rocket::http::Status;
4use std::fs;
5use chrono::{DateTime, Utc};
6use serde::{Serialize, Deserialize};
7use clickhouse::Client;
8use uuid::Uuid;
9
10#[derive(Debug, Serialize, Deserialize)]
12#[serde(rename_all = "lowercase")]
13pub enum LogLevel {
14 Debug,
15 Info,
16 Warn,
17 Error,
18 Fatal,
19}
20
21#[derive(Debug, Serialize, Deserialize, clickhouse::Row)]
23pub struct LogEntry {
24 pub log_id: Option<String>, pub timestamp: DateTime<Utc>,
26 pub platform_id: String,
27 pub org_id: String,
28 pub app_id: String,
29 pub instance_id: String,
30 pub level: LogLevel,
31 pub message: String,
32 pub context: serde_json::Value, }
34
35#[derive(Debug, Serialize, Deserialize)]
37pub struct LogResponse {
38 pub log_id: String,
39 pub timestamp: DateTime<Utc>,
40 pub platform_id: String,
41 pub org_id: String,
42 pub app_id: String,
43 pub instance_id: String,
44 pub level: String, pub message: String,
46 pub context: serde_json::Value, }
48
49#[derive(Debug, Serialize)]
51pub struct Pagination {
52 pub page: i64,
53 pub per_page: i64,
54 pub total_count: i64,
55 pub total_pages: i64,
56}
57
58#[derive(Debug, Serialize, Deserialize)]
60pub struct BulkLogInsert {
61 pub logs: Vec<LogEntry>,
62}
63
64#[derive(Debug, Serialize, Deserialize)]
66struct ErrorStat {
67 platform_id: String,
68 org_id: String,
69 app_id: String,
70 level: u8,
71 count: u64,
72 event_date: chrono::NaiveDate,
73}
74
75pub async fn init_clickhouse_db(client: &Client, schema_path: &str) -> Result<(), clickhouse::error::Error> {
77 let schema_sql = match fs::read_to_string(schema_path) {
79 Ok(content) => content,
80 Err(e) => {
81 eprintln!("Failed to read schema file: {}", e);
82 return Err(clickhouse::error::Error::Custom(format!("Failed to read schema file: {}", e)));
83 }
84 };
85
86 let statements: Vec<String> = schema_sql
88 .split(';')
89 .map(|s| s.trim())
90 .filter(|s| !s.is_empty() && !s.starts_with("--"))
91 .map(|s| s.to_string())
92 .collect();
93
94 println!("Found {} SQL statements to execute", statements.len());
95
96 for (i, stmt) in statements.iter().enumerate() {
98 if stmt.trim().is_empty() {
99 continue; }
101
102 println!("Executing statement {}/{}: {} characters", i+1, statements.len(), stmt.len());
103 match client.query(stmt).execute().await {
104 Ok(_) => println!("Statement {}/{} executed successfully", i+1, statements.len()),
105 Err(e) => {
106 eprintln!("Failed to execute statement {}/{}: {:?}", i+1, statements.len(), e);
107 eprintln!("Statement content: {}", stmt);
108 return Err(e);
109 }
110 }
111 }
112
113 Ok(())
114}
115
116async fn fetch_logs_paginated(
118 client: &Client,
119 query_conditions: &str,
120 page: i64,
121 per_page: i64,
122) -> Result<(Vec<LogResponse>, i64), clickhouse::error::Error> {
123 let offset = (page - 1) * per_page;
125
126 let count_sql = format!(
128 "SELECT count() FROM omni_logs.logs WHERE {}",
129 query_conditions
130 );
131
132 let logs_sql = format!(
134 r#"
135 SELECT
136 log_id,
137 timestamp,
138 platform_id,
139 org_id,
140 app_id,
141 instance_id,
142 level,
143 message,
144 context
145 FROM omni_logs.logs
146 PREWHERE {}
147 ORDER BY timestamp DESC
148 LIMIT {} OFFSET {}
149 "#,
150 query_conditions,
151 per_page,
152 offset
153 );
154
155 let count = client
157 .query(&count_sql)
158 .fetch_one::<i64>()
159 .await?; let rows: Vec<LogEntry> = client.query(&logs_sql).fetch_all().await?;
163
164 let mut logs = Vec::with_capacity(rows.len());
166
167 for row in rows {
168 let log_id: String = row.log_id.ok_or(clickhouse::error::Error::Custom("Missing log_id".to_string()))?;
169 let timestamp: DateTime<Utc> = row.timestamp;
170 let platform_id: String = row.platform_id;
171 let org_id: String = row.org_id;
172 let app_id: String = row.app_id;
173 let instance_id: String = row.instance_id;
174
175 let level_num: u8 = row.level as u8;
177 let level = match level_num {
178 1 => "debug",
179 2 => "info",
180 3 => "warn",
181 4 => "error",
182 5 => "fatal",
183 _ => "unknown",
184 };
185
186 let message: String = row.message;
187
188 let context_str: String = row.context.to_string();
190 let context: serde_json::Value = serde_json::from_str(&context_str)
191 .unwrap_or(serde_json::Value::Null);
192
193 logs.push(LogResponse {
194 log_id,
195 timestamp,
196 platform_id,
197 org_id,
198 app_id,
199 instance_id,
200 level: level.to_string(),
201 message,
202 context,
203 });
204 }
205
206 Ok((logs, count))
207}
208
209#[get("/logs?<page>&<per_page>&<platform_id>&<org_id>&<app_id>&<instance_id>&<level>&<start_time>&<end_time>&<search>")]
211pub async fn list_logs(
212 page: Option<i64>,
213 per_page: Option<i64>,
214 platform_id: Option<String>,
215 org_id: Option<String>,
216 app_id: Option<String>,
217 instance_id: Option<String>,
218 level: Option<String>,
219 start_time: Option<String>,
220 end_time: Option<String>,
221 search: Option<String>,
222 clickhouse: &State<Client>,
223) -> Result<Json<Value>, (Status, Json<Value>)> {
224 let page = page.unwrap_or(1);
226 let per_page = per_page.unwrap_or(50);
227
228 if page < 1 || per_page < 1 || per_page > 1000 {
229 return Err((
230 Status::BadRequest,
231 Json(json!({
232 "error": "Invalid pagination parameters",
233 "message": "Page must be ≥ 1 and per_page must be between 1 and 1000"
234 }))
235 ));
236 }
237
238 let mut conditions = Vec::new();
240
241 let _using_hierarchy_filter = platform_id.is_some() || org_id.is_some() || app_id.is_some();
243
244 if let Some(pid) = platform_id {
245 conditions.push(format!("platform_id = '{}'", pid.replace('\'', "''")));
246 }
247
248 if let Some(oid) = org_id {
249 conditions.push(format!("org_id = '{}'", oid.replace('\'', "''")));
250 }
251
252 if let Some(aid) = app_id {
253 conditions.push(format!("app_id = '{}'", aid.replace('\'', "''")));
254 }
255
256 if let Some(iid) = instance_id {
257 conditions.push(format!("instance_id = '{}'", iid.replace('\'', "''")));
258 }
259
260 if let Some(lvl) = level {
261 let level_enum = match lvl.to_lowercase().as_str() {
263 "debug" => 1,
264 "info" => 2,
265 "warn" => 3,
266 "error" => 4,
267 "fatal" => 5,
268 _ => {
269 return Err((
270 Status::BadRequest,
271 Json(json!({
272 "error": "Invalid log level",
273 "message": "Level must be one of: debug, info, warn, error, fatal"
274 }))
275 ));
276 }
277 };
278 conditions.push(format!("level = {}", level_enum));
279 }
280
281 if let Some(st) = start_time {
283 conditions.push(format!("timestamp >= toDateTime64('{}', 3, 'UTC')", st));
284
285 conditions.push(format!("event_date >= toDate('{}')", st));
287 }
288
289 if let Some(et) = end_time {
290 conditions.push(format!("timestamp <= toDateTime64('{}', 3, 'UTC')", et));
291
292 conditions.push(format!("event_date <= toDate('{}')", et));
294 }
295
296 if let Some(term) = search {
298 let escaped_term = term.replace('\'', "''");
299 conditions.push(format!("message ILIKE '%{}%'", escaped_term));
300 }
301
302 let query_conditions = if conditions.is_empty() {
304 "1=1".to_string()
305 } else {
306 conditions.join(" AND ")
307 };
308
309 match fetch_logs_paginated(clickhouse, &query_conditions, page, per_page).await {
311 Ok((logs, total_count)) => {
312 let total_pages = (total_count + per_page - 1) / per_page; let response = json!({
315 "logs": logs,
316 "pagination": {
317 "page": page,
318 "per_page": per_page,
319 "total_count": total_count,
320 "total_pages": total_pages
321 }
322 });
323
324 Ok(Json(response))
325 },
326 Err(err) => Err((
327 Status::InternalServerError,
328 Json(json!({
329 "error": "Database error",
330 "message": err.to_string()
331 }))
332 ))
333 }
334}
335
336#[get("/platforms/<platform_id>/logs?<page>&<per_page>&<level>&<start_time>&<end_time>&<search>")]
338pub async fn list_platform_logs(
339 platform_id: String,
340 page: Option<i64>,
341 per_page: Option<i64>,
342 level: Option<String>,
343 start_time: Option<String>,
344 end_time: Option<String>,
345 search: Option<String>,
346 clickhouse: &State<Client>,
347) -> Result<Json<Value>, (Status, Json<Value>)> {
348 list_logs(
349 page,
350 per_page,
351 Some(platform_id),
352 None,
353 None,
354 None,
355 level,
356 start_time,
357 end_time,
358 search,
359 clickhouse,
360 ).await
361}
362
363#[get("/orgs/<org_id>/logs?<page>&<per_page>&<platform_id>&<level>&<start_time>&<end_time>&<search>")]
365pub async fn list_org_logs(
366 org_id: String,
367 page: Option<i64>,
368 per_page: Option<i64>,
369 platform_id: Option<String>,
370 level: Option<String>,
371 start_time: Option<String>,
372 end_time: Option<String>,
373 search: Option<String>,
374 clickhouse: &State<Client>,
375) -> Result<Json<Value>, (Status, Json<Value>)> {
376 list_logs(
377 page,
378 per_page,
379 platform_id,
380 Some(org_id),
381 None,
382 None,
383 level,
384 start_time,
385 end_time,
386 search,
387 clickhouse,
388 ).await
389}
390
391#[get("/apps/<app_id>/logs?<page>&<per_page>&<platform_id>&<org_id>&<level>&<start_time>&<end_time>&<search>")]
393pub async fn list_app_logs(
394 app_id: String,
395 page: Option<i64>,
396 per_page: Option<i64>,
397 platform_id: Option<String>,
398 org_id: Option<String>,
399 level: Option<String>,
400 start_time: Option<String>,
401 end_time: Option<String>,
402 search: Option<String>,
403 clickhouse: &State<Client>,
404) -> Result<Json<Value>, (Status, Json<Value>)> {
405 list_logs(
406 page,
407 per_page,
408 platform_id,
409 org_id,
410 Some(app_id),
411 None,
412 level,
413 start_time,
414 end_time,
415 search,
416 clickhouse,
417 ).await
418}
419
420#[get("/instances/<instance_id>/logs?<page>&<per_page>&<platform_id>&<org_id>&<app_id>&<level>&<start_time>&<end_time>&<search>")]
422pub async fn list_instance_logs(
423 instance_id: String,
424 page: Option<i64>,
425 per_page: Option<i64>,
426 platform_id: Option<String>,
427 org_id: Option<String>,
428 app_id: Option<String>,
429 level: Option<String>,
430 start_time: Option<String>,
431 end_time: Option<String>,
432 search: Option<String>,
433 clickhouse: &State<Client>,
434) -> Result<Json<Value>, (Status, Json<Value>)> {
435 list_logs(
436 page,
437 per_page,
438 platform_id,
439 org_id,
440 app_id,
441 Some(instance_id),
442 level,
443 start_time,
444 end_time,
445 search,
446 clickhouse,
447 ).await
448}
449
450#[post("/logs", format = "json", data = "<log_batch>")]
452pub async fn insert_logs(
453 log_batch: Json<BulkLogInsert>,
454 clickhouse: &State<Client>,
455) -> Result<Json<Value>, (Status, Json<Value>)> {
456 let logs = log_batch.into_inner().logs;
457
458 if logs.is_empty() {
459 return Ok(Json(json!({
460 "status": "success",
461 "message": "No logs to insert",
462 "count": 0
463 })));
464 }
465
466 let mut inserted_count = 0;
468
469 let _tx = clickhouse.query("BEGIN TRANSACTION").execute().await;
471
472 for mut log in logs {
473 if log.log_id.is_none() {
475 log.log_id = Some(Uuid::new_v4().to_string());
476 }
477
478 let context_str = serde_json::to_string(&log.context)
480 .unwrap_or_else(|_| "{}".to_string());
481
482 let level_num = match log.level {
484 LogLevel::Debug => 1_u8,
485 LogLevel::Info => 2_u8,
486 LogLevel::Warn => 3_u8,
487 LogLevel::Error => 4_u8,
488 LogLevel::Fatal => 5_u8,
489 };
490
491 let insert_sql = format!(
493 r#"
494 INSERT INTO omni_logs.logs
495 (log_id, timestamp, platform_id, org_id, app_id, instance_id, level, message, context)
496 VALUES ('{}', '{}', '{}', '{}', '{}', '{}', {}, '{}', '{}')
497 "#,
498 log.log_id.unwrap().replace('\'', "''"),
499 log.timestamp.format("%Y-%m-%d %H:%M:%S%.3f"),
500 log.platform_id.replace('\'', "''"),
501 log.org_id.replace('\'', "''"),
502 log.app_id.replace('\'', "''"),
503 log.instance_id.replace('\'', "''"),
504 level_num,
505 log.message.replace('\'', "''"),
506 context_str.replace('\'', "''")
507 );
508
509 if let Err(err) = clickhouse.query(&insert_sql).execute().await {
510 let _ = clickhouse.query("ROLLBACK").execute().await;
512
513 return Err((
514 Status::InternalServerError,
515 Json(json!({
516 "error": "Failed to insert log",
517 "message": err.to_string(),
518 "count": inserted_count
519 }))
520 ));
521 }
522
523 inserted_count += 1;
524 }
525
526 if let Err(err) = clickhouse.query("COMMIT").execute().await {
528 return Err((
529 Status::InternalServerError,
530 Json(json!({
531 "error": "Failed to commit transaction",
532 "message": err.to_string(),
533 "count": inserted_count
534 }))
535 ));
536 }
537
538 Ok(Json(json!({
539 "status": "success",
540 "message": "Logs inserted successfully",
541 "count": inserted_count
542 })))
543}
544
545pub fn routes() -> Vec<rocket::Route> {
547 routes![
548 list_logs,
549 list_platform_logs,
550 list_org_logs,
551 list_app_logs,
552 list_instance_logs,
553 insert_logs
554 ]
555}