omni_orchestrator/schemas/v1/db/queries/
storage.rs

1use anyhow::Context;
2use sqlx::{MySql, Pool};
3use sqlx::Row;
4
5use libomni::types::db::v1 as types;
6use types::provider::Provider;
7use types::region::Region;
8use types::storage::{
9    StorageClass,
10    StorageVolume,
11    StorageSnapshot,
12    StorageQosPolicy,    
13    StorageMigration,
14};
15
16/// Storage class query filters
17#[derive(Default, Debug)]
18pub struct StorageClassFilter {
19    pub storage_type: Option<String>,
20    pub volume_binding_mode: Option<String>,
21    pub allow_volume_expansion: Option<bool>,
22}
23
24/// Storage volume query filters
25#[derive(Default, Debug, Clone)]
26pub struct StorageVolumeFilter {
27    pub app_id: Option<i64>,
28    pub storage_class_id: Option<i64>,
29    pub status: Option<String>,
30    pub node_id: Option<i64>,
31    pub persistence_level: Option<String>,
32    pub write_concern: Option<String>,
33}
34
35/// Retrieves all storage classes with optional filtering
36pub async fn list_storage_classes(
37    pool: &Pool<MySql>,
38    filter: StorageClassFilter,
39) -> anyhow::Result<Vec<StorageClass>> {
40    let mut query_builder = sqlx::QueryBuilder::new("SELECT * FROM storage_classes WHERE 1=1");
41    
42    if let Some(storage_type) = filter.storage_type {
43        query_builder.push(" AND storage_type = ");
44        query_builder.push_bind(storage_type);
45    }
46    
47    if let Some(binding_mode) = filter.volume_binding_mode {
48        query_builder.push(" AND volume_binding_mode = ");
49        query_builder.push_bind(binding_mode);
50    }
51    
52    if let Some(allow_expansion) = filter.allow_volume_expansion {
53        query_builder.push(" AND allow_volume_expansion = ");
54        query_builder.push_bind(allow_expansion);
55    }
56    
57    let query = query_builder.build_query_as::<StorageClass>();
58    let storage_classes = query
59        .fetch_all(pool)
60        .await
61        .context("Failed to fetch storage classes")?;
62    
63    Ok(storage_classes)
64}
65
66/// Retrieves a single storage class by ID
67pub async fn get_storage_class_by_id(
68    pool: &Pool<MySql>,
69    id: i64,
70) -> anyhow::Result<Option<StorageClass>> {
71    let storage_class = sqlx::query_as::<_, StorageClass>(
72        "SELECT * FROM storage_classes WHERE id = ?"
73    )
74    .bind(id)
75    .fetch_optional(pool)
76    .await
77    .context("Failed to fetch storage class")?;
78    
79    Ok(storage_class)
80}
81
82/// Retrieves a paginated list of storage volumes with filtering
83pub async fn list_storage_volumes(
84    pool: &Pool<MySql>,
85    filter: StorageVolumeFilter,
86    page: i64,
87    per_page: i64,
88) -> anyhow::Result<Vec<StorageVolume>> {
89    let offset = page * per_page;
90    
91    let mut query_builder = sqlx::QueryBuilder::new("SELECT * FROM storage_volumes WHERE 1=1");
92    
93    if let Some(app_id) = filter.app_id {
94        query_builder.push(" AND app_id = ");
95        query_builder.push_bind(app_id);
96    }
97    
98    if let Some(storage_class_id) = filter.storage_class_id {
99        query_builder.push(" AND storage_class_id = ");
100        query_builder.push_bind(storage_class_id);
101    }
102    
103    if let Some(status) = &filter.status {
104        query_builder.push(" AND status = ");
105        query_builder.push_bind(status);
106    }
107    
108    if let Some(node_id) = filter.node_id {
109        query_builder.push(" AND node_id = ");
110        query_builder.push_bind(node_id);
111    }
112    
113    if let Some(persistence_level) = &filter.persistence_level {
114        query_builder.push(" AND persistence_level = ");
115        query_builder.push_bind(persistence_level);
116    }
117    
118    if let Some(write_concern) = &filter.write_concern {
119        query_builder.push(" AND write_concern = ");
120        query_builder.push_bind(write_concern);
121    }
122    
123    query_builder.push(" LIMIT ");
124    query_builder.push_bind(per_page);
125    query_builder.push(" OFFSET ");
126    query_builder.push_bind(offset);
127    
128    let query = query_builder.build_query_as::<StorageVolume>();
129    let storage_volumes = query
130        .fetch_all(pool)
131        .await
132        .context("Failed to fetch storage volumes")?;
133    
134    Ok(storage_volumes)
135}
136
137/// Counts storage volumes with the same filtering options
138pub async fn count_storage_volumes_with_filter(
139    pool: &Pool<MySql>,
140    filter: &StorageVolumeFilter,
141) -> anyhow::Result<i64> {
142    let mut query_builder = sqlx::QueryBuilder::new("SELECT COUNT(*) FROM storage_volumes WHERE 1=1");
143    
144    if let Some(app_id) = filter.app_id {
145        query_builder.push(" AND app_id = ");
146        query_builder.push_bind(app_id);
147    }
148    
149    if let Some(storage_class_id) = filter.storage_class_id {
150        query_builder.push(" AND storage_class_id = ");
151        query_builder.push_bind(storage_class_id);
152    }
153    
154    if let Some(status) = &filter.status {
155        query_builder.push(" AND status = ");
156        query_builder.push_bind(status);
157    }
158    
159    if let Some(node_id) = filter.node_id {
160        query_builder.push(" AND node_id = ");
161        query_builder.push_bind(node_id);
162    }
163    
164    if let Some(persistence_level) = &filter.persistence_level {
165        query_builder.push(" AND persistence_level = ");
166        query_builder.push_bind(persistence_level);
167    }
168    
169    if let Some(write_concern) = &filter.write_concern {
170        query_builder.push(" AND write_concern = ");
171        query_builder.push_bind(write_concern);
172    }
173    
174    let query = query_builder.build_query_as::<(i64,)>();
175    let (count,) = query
176        .fetch_one(pool)
177        .await
178        .context("Failed to count storage volumes")?;
179    
180    Ok(count)
181}
182
183/// Get volumes by storage class
184pub async fn get_volumes_by_storage_class(
185    pool: &Pool<MySql>,
186    storage_class_id: i64,
187    page: i64,
188    per_page: i64,
189) -> anyhow::Result<Vec<StorageVolume>> {
190    let offset = page * per_page;
191    let query = "SELECT * FROM storage_volumes WHERE storage_class_id = ? LIMIT ? OFFSET ?";
192    
193    let volumes = sqlx::query_as::<_, StorageVolume>(query)
194        .bind(storage_class_id)
195        .bind(per_page)
196        .bind(offset)
197        .fetch_all(pool)
198        .await
199        .context("Failed to fetch volumes by storage class")?;
200    
201    Ok(volumes)
202}
203
204/// Get QoS policies
205pub async fn list_storage_qos_policies(
206    pool: &Pool<MySql>,
207) -> anyhow::Result<Vec<StorageQosPolicy>> {
208    let policies = sqlx::query_as::<_, StorageQosPolicy>(
209        "SELECT * FROM storage_qos_policies"
210    )
211    .fetch_all(pool)
212    .await
213    .context("Failed to fetch storage QoS policies")?;
214    
215    Ok(policies)
216}
217
218/// Get storage with specified write concern
219pub async fn get_volumes_by_write_concern(
220    pool: &Pool<MySql>,
221    write_concern: String,
222    page: i64,
223    per_page: i64,
224) -> anyhow::Result<Vec<StorageVolume>> {
225    let offset = page * per_page;
226    let query = "SELECT * FROM storage_volumes WHERE write_concern = ? LIMIT ? OFFSET ?";
227    
228    let volumes = sqlx::query_as::<_, StorageVolume>(query)
229        .bind(write_concern)
230        .bind(per_page)
231        .bind(offset)
232        .fetch_all(pool)
233        .await
234        .context("Failed to fetch volumes by write concern")?;
235    
236    Ok(volumes)
237}
238
239/// Get volumes with specific persistence level
240pub async fn get_volumes_by_persistence_level(
241    pool: &Pool<MySql>,
242    persistence_level: String,
243    page: i64, 
244    per_page: i64,
245) -> anyhow::Result<Vec<StorageVolume>> {
246    let offset = page * per_page;
247    let query = "SELECT * FROM storage_volumes WHERE persistence_level = ? LIMIT ? OFFSET ?";
248    
249    let volumes = sqlx::query_as::<_, StorageVolume>(query)
250        .bind(persistence_level)
251        .bind(per_page)
252        .bind(offset)
253        .fetch_all(pool)
254        .await
255        .context("Failed to fetch volumes by persistence level")?;
256    
257    Ok(volumes)
258}
259
260/// Struct to represent a Region with its storage volumes
261#[derive(Debug, serde::Serialize)]
262pub struct RegionVolumes {
263    pub region: Region,
264    pub volumes: Vec<StorageVolume>
265}
266
267/// Retrieves storage volumes for a specific region grouped by region with pagination
268pub async fn get_volumes_for_region(
269    pool: &Pool<MySql>,
270    region_id: i64,
271    page: i64,
272    per_page: i64,
273) -> anyhow::Result<RegionVolumes> {
274    // First, get the region
275    let region = sqlx::query_as::<_, Region>("SELECT * FROM regions WHERE id = ?")
276        .bind(region_id)
277        .fetch_one(pool)
278        .await
279        .context("Failed to fetch region")?;
280    
281    // Calculate offset
282    let offset = page * per_page;
283    
284    // Get paginated volumes for this region
285    let volumes = sqlx::query_as::<_, StorageVolume>(
286        r#"
287        SELECT
288            v.*
289        FROM 
290            storage_volumes v
291        INNER JOIN 
292            workers w ON v.node_id = w.id
293        WHERE 
294            w.region_id = ?
295        ORDER BY 
296            v.id
297        LIMIT ? OFFSET ?
298        "#
299    )
300    .bind(region_id)
301    .bind(per_page)
302    .bind(offset)
303    .fetch_all(pool)
304    .await
305    .context("Failed to fetch volumes for region")?;
306    
307    Ok(RegionVolumes {
308        region,
309        volumes
310    })
311}
312
313/// Counts the total number of storage volumes for a specific region
314pub async fn count_volumes_for_region(
315    pool: &Pool<MySql>,
316    region_id: i64,
317) -> anyhow::Result<i64> {
318    // Get the total count of volumes for this region
319    let (total_volumes,) = sqlx::query_as::<_, (i64,)>(
320        r#"
321        SELECT
322            COUNT(v.id)
323        FROM 
324            storage_volumes v
325        INNER JOIN 
326            workers w ON v.node_id = w.id
327        INNER JOIN 
328            nodes n ON w.id = n.worker_id
329        WHERE 
330            w.region_id = ?
331        "#
332    )
333    .bind(region_id)
334    .fetch_one(pool)
335    .await
336    .context("Failed to count volumes for region")?;
337    
338    Ok(total_volumes)
339}
340
341#[derive(serde::Serialize)]
342pub struct ProviderRegionVolumes {
343    pub provider: Provider,
344    pub regions: Vec<RegionVolumes>
345}
346
347/// Retrieves storage volumes for a specific provider grouped by region with pagination
348pub async fn get_volumes_for_provider(
349    pool: &Pool<MySql>,
350    provider_id: i64,
351    page: i64,
352    per_page: i64,
353) -> anyhow::Result<ProviderRegionVolumes> {
354    // First, get the provider
355    let provider = sqlx::query_as::<_, Provider>("SELECT * FROM providers WHERE id = ?")
356        .bind(provider_id)
357        .fetch_one(pool)
358        .await
359        .context("Failed to fetch provider")?;
360    
361    // Get all regions for this provider
362    let regions = sqlx::query_as::<_, Region>(
363        "SELECT * FROM regions WHERE provider = ? ORDER BY name"
364    )
365    .bind(provider_id)
366    .fetch_all(pool)
367    .await
368    .context("Failed to fetch regions for provider")?;
369    
370    let mut region_volumes = Vec::new();
371    
372    // Calculate offset
373    let offset = page * per_page;
374    
375    // For each region, get paginated volumes
376    for region in regions {
377        // Get paginated volumes for this region
378        let volumes = sqlx::query_as::<_, StorageVolume>(
379            r#"
380            SELECT
381                v.*
382            FROM 
383                storage_volumes v
384            INNER JOIN 
385                workers w ON v.node_id = w.id
386            INNER JOIN
387                regions r ON w.region_id = r.id
388            WHERE 
389                r.provider = ?
390                AND r.id = ?
391            ORDER BY 
392                v.id
393            LIMIT ? OFFSET ?
394            "#
395        )
396        .bind(provider_id)
397        .bind(region.id)
398        .bind(per_page)
399        .bind(offset)
400        .fetch_all(pool)
401        .await
402        .context(format!("Failed to fetch volumes for region {}", region.id))?;
403        
404        // Only add regions with volumes
405        if !volumes.is_empty() {
406            region_volumes.push(RegionVolumes {
407                region,
408                volumes
409            });
410        }
411    }
412    
413    Ok(ProviderRegionVolumes {
414        provider,
415        regions: region_volumes
416    })
417}
418
419/// Counts the total number of storage volumes for a specific provider
420pub async fn count_volumes_for_provider(
421    pool: &Pool<MySql>,
422    provider_id: i64,
423) -> anyhow::Result<i64> {
424    // Get the total count of volumes for this provider
425    let (total_volumes,) = sqlx::query_as::<_, (i64,)>(
426        r#"
427        SELECT
428            COUNT(v.id)
429        FROM 
430            storage_volumes v
431        INNER JOIN 
432            workers w ON v.node_id = w.id
433        INNER JOIN 
434            regions r ON w.region_id = r.id
435        WHERE 
436            r.provider = ?
437        "#
438    )
439    .bind(provider_id)
440    .fetch_one(pool)
441    .await
442    .context("Failed to count volumes for provider")?;
443    
444    Ok(total_volumes)
445}