omni_orchestrator/schemas/v1/db/queries/
storage.rs

1use crate::models::provider::Provider;
2use crate::models::storage::{
3    StorageClass,
4    StorageVolume,
5    StorageSnapshot,
6    StorageQosPolicy,    
7    StorageMigration,
8};
9use crate::models::region::Region;
10use anyhow::Context;
11use sqlx::{MySql, Pool};
12use sqlx::Row;
13
/// Optional filters applied when listing storage classes.
///
/// Every field is optional. A `None` field adds no condition to the query built by
/// `list_storage_classes`, so `StorageClassFilter::default()` matches every
/// storage class.
#[derive(Debug, Default, Clone)]
pub struct StorageClassFilter {
    /// When set, only classes whose `storage_type` column equals this value match.
    pub storage_type: Option<String>,
    /// When set, only classes whose `volume_binding_mode` column equals this value match.
    pub volume_binding_mode: Option<String>,
    /// When set, only classes whose `allow_volume_expansion` column equals this flag match.
    pub allow_volume_expansion: Option<bool>,
}
21
/// Optional filters shared by the storage volume listing and counting queries.
///
/// Each `Some` field becomes an equality condition on the column of the same name;
/// `None` fields are ignored, so the default value matches every volume.
#[derive(Debug, Clone, Default)]
pub struct StorageVolumeFilter {
    /// Match volumes whose `app_id` column equals this value.
    pub app_id: Option<i64>,
    /// Match volumes whose `storage_class_id` column equals this value.
    pub storage_class_id: Option<i64>,
    /// Match volumes whose `status` column equals this value.
    pub status: Option<String>,
    /// Match volumes whose `node_id` column equals this value.
    pub node_id: Option<i64>,
    /// Match volumes whose `persistence_level` column equals this value.
    pub persistence_level: Option<String>,
    /// Match volumes whose `write_concern` column equals this value.
    pub write_concern: Option<String>,
}
32
33/// Retrieves all storage classes with optional filtering
34pub async fn list_storage_classes(
35    pool: &Pool<MySql>,
36    filter: StorageClassFilter,
37) -> anyhow::Result<Vec<StorageClass>> {
38    let mut query_builder = sqlx::QueryBuilder::new("SELECT * FROM storage_classes WHERE 1=1");
39    
40    if let Some(storage_type) = filter.storage_type {
41        query_builder.push(" AND storage_type = ");
42        query_builder.push_bind(storage_type);
43    }
44    
45    if let Some(binding_mode) = filter.volume_binding_mode {
46        query_builder.push(" AND volume_binding_mode = ");
47        query_builder.push_bind(binding_mode);
48    }
49    
50    if let Some(allow_expansion) = filter.allow_volume_expansion {
51        query_builder.push(" AND allow_volume_expansion = ");
52        query_builder.push_bind(allow_expansion);
53    }
54    
55    let query = query_builder.build_query_as::<StorageClass>();
56    let storage_classes = query
57        .fetch_all(pool)
58        .await
59        .context("Failed to fetch storage classes")?;
60    
61    Ok(storage_classes)
62}
63
64/// Retrieves a single storage class by ID
65pub async fn get_storage_class_by_id(
66    pool: &Pool<MySql>,
67    id: i64,
68) -> anyhow::Result<Option<StorageClass>> {
69    let storage_class = sqlx::query_as::<_, StorageClass>(
70        "SELECT * FROM storage_classes WHERE id = ?"
71    )
72    .bind(id)
73    .fetch_optional(pool)
74    .await
75    .context("Failed to fetch storage class")?;
76    
77    Ok(storage_class)
78}
79
80/// Retrieves a paginated list of storage volumes with filtering
81pub async fn list_storage_volumes(
82    pool: &Pool<MySql>,
83    filter: StorageVolumeFilter,
84    page: i64,
85    per_page: i64,
86) -> anyhow::Result<Vec<StorageVolume>> {
87    let offset = page * per_page;
88    
89    let mut query_builder = sqlx::QueryBuilder::new("SELECT * FROM storage_volumes WHERE 1=1");
90    
91    if let Some(app_id) = filter.app_id {
92        query_builder.push(" AND app_id = ");
93        query_builder.push_bind(app_id);
94    }
95    
96    if let Some(storage_class_id) = filter.storage_class_id {
97        query_builder.push(" AND storage_class_id = ");
98        query_builder.push_bind(storage_class_id);
99    }
100    
101    if let Some(status) = &filter.status {
102        query_builder.push(" AND status = ");
103        query_builder.push_bind(status);
104    }
105    
106    if let Some(node_id) = filter.node_id {
107        query_builder.push(" AND node_id = ");
108        query_builder.push_bind(node_id);
109    }
110    
111    if let Some(persistence_level) = &filter.persistence_level {
112        query_builder.push(" AND persistence_level = ");
113        query_builder.push_bind(persistence_level);
114    }
115    
116    if let Some(write_concern) = &filter.write_concern {
117        query_builder.push(" AND write_concern = ");
118        query_builder.push_bind(write_concern);
119    }
120    
121    query_builder.push(" LIMIT ");
122    query_builder.push_bind(per_page);
123    query_builder.push(" OFFSET ");
124    query_builder.push_bind(offset);
125    
126    let query = query_builder.build_query_as::<StorageVolume>();
127    let storage_volumes = query
128        .fetch_all(pool)
129        .await
130        .context("Failed to fetch storage volumes")?;
131    
132    Ok(storage_volumes)
133}
134
135/// Counts storage volumes with the same filtering options
136pub async fn count_storage_volumes_with_filter(
137    pool: &Pool<MySql>,
138    filter: &StorageVolumeFilter,
139) -> anyhow::Result<i64> {
140    let mut query_builder = sqlx::QueryBuilder::new("SELECT COUNT(*) FROM storage_volumes WHERE 1=1");
141    
142    if let Some(app_id) = filter.app_id {
143        query_builder.push(" AND app_id = ");
144        query_builder.push_bind(app_id);
145    }
146    
147    if let Some(storage_class_id) = filter.storage_class_id {
148        query_builder.push(" AND storage_class_id = ");
149        query_builder.push_bind(storage_class_id);
150    }
151    
152    if let Some(status) = &filter.status {
153        query_builder.push(" AND status = ");
154        query_builder.push_bind(status);
155    }
156    
157    if let Some(node_id) = filter.node_id {
158        query_builder.push(" AND node_id = ");
159        query_builder.push_bind(node_id);
160    }
161    
162    if let Some(persistence_level) = &filter.persistence_level {
163        query_builder.push(" AND persistence_level = ");
164        query_builder.push_bind(persistence_level);
165    }
166    
167    if let Some(write_concern) = &filter.write_concern {
168        query_builder.push(" AND write_concern = ");
169        query_builder.push_bind(write_concern);
170    }
171    
172    let query = query_builder.build_query_as::<(i64,)>();
173    let (count,) = query
174        .fetch_one(pool)
175        .await
176        .context("Failed to count storage volumes")?;
177    
178    Ok(count)
179}
180
181/// Get volumes by storage class
182pub async fn get_volumes_by_storage_class(
183    pool: &Pool<MySql>,
184    storage_class_id: i64,
185    page: i64,
186    per_page: i64,
187) -> anyhow::Result<Vec<StorageVolume>> {
188    let offset = page * per_page;
189    let query = "SELECT * FROM storage_volumes WHERE storage_class_id = ? LIMIT ? OFFSET ?";
190    
191    let volumes = sqlx::query_as::<_, StorageVolume>(query)
192        .bind(storage_class_id)
193        .bind(per_page)
194        .bind(offset)
195        .fetch_all(pool)
196        .await
197        .context("Failed to fetch volumes by storage class")?;
198    
199    Ok(volumes)
200}
201
202/// Get QoS policies
203pub async fn list_storage_qos_policies(
204    pool: &Pool<MySql>,
205) -> anyhow::Result<Vec<StorageQosPolicy>> {
206    let policies = sqlx::query_as::<_, StorageQosPolicy>(
207        "SELECT * FROM storage_qos_policies"
208    )
209    .fetch_all(pool)
210    .await
211    .context("Failed to fetch storage QoS policies")?;
212    
213    Ok(policies)
214}
215
216/// Get storage with specified write concern
217pub async fn get_volumes_by_write_concern(
218    pool: &Pool<MySql>,
219    write_concern: String,
220    page: i64,
221    per_page: i64,
222) -> anyhow::Result<Vec<StorageVolume>> {
223    let offset = page * per_page;
224    let query = "SELECT * FROM storage_volumes WHERE write_concern = ? LIMIT ? OFFSET ?";
225    
226    let volumes = sqlx::query_as::<_, StorageVolume>(query)
227        .bind(write_concern)
228        .bind(per_page)
229        .bind(offset)
230        .fetch_all(pool)
231        .await
232        .context("Failed to fetch volumes by write concern")?;
233    
234    Ok(volumes)
235}
236
237/// Get volumes with specific persistence level
238pub async fn get_volumes_by_persistence_level(
239    pool: &Pool<MySql>,
240    persistence_level: String,
241    page: i64, 
242    per_page: i64,
243) -> anyhow::Result<Vec<StorageVolume>> {
244    let offset = page * per_page;
245    let query = "SELECT * FROM storage_volumes WHERE persistence_level = ? LIMIT ? OFFSET ?";
246    
247    let volumes = sqlx::query_as::<_, StorageVolume>(query)
248        .bind(persistence_level)
249        .bind(per_page)
250        .bind(offset)
251        .fetch_all(pool)
252        .await
253        .context("Failed to fetch volumes by persistence level")?;
254    
255    Ok(volumes)
256}
257
258/// Struct to represent a Region with its storage volumes
259#[derive(Debug, serde::Serialize)]
260pub struct RegionVolumes {
261    pub region: Region,
262    pub volumes: Vec<StorageVolume>
263}
264
265/// Retrieves storage volumes for a specific region grouped by region with pagination
266pub async fn get_volumes_for_region(
267    pool: &Pool<MySql>,
268    region_id: i64,
269    page: i64,
270    per_page: i64,
271) -> anyhow::Result<RegionVolumes> {
272    // First, get the region
273    let region = sqlx::query_as::<_, Region>("SELECT * FROM regions WHERE id = ?")
274        .bind(region_id)
275        .fetch_one(pool)
276        .await
277        .context("Failed to fetch region")?;
278    
279    // Calculate offset
280    let offset = page * per_page;
281    
282    // Get paginated volumes for this region
283    let volumes = sqlx::query_as::<_, StorageVolume>(
284        r#"
285        SELECT
286            v.*
287        FROM 
288            storage_volumes v
289        INNER JOIN 
290            workers w ON v.node_id = w.id
291        WHERE 
292            w.region_id = ?
293        ORDER BY 
294            v.id
295        LIMIT ? OFFSET ?
296        "#
297    )
298    .bind(region_id)
299    .bind(per_page)
300    .bind(offset)
301    .fetch_all(pool)
302    .await
303    .context("Failed to fetch volumes for region")?;
304    
305    Ok(RegionVolumes {
306        region,
307        volumes
308    })
309}
310
311/// Counts the total number of storage volumes for a specific region
312pub async fn count_volumes_for_region(
313    pool: &Pool<MySql>,
314    region_id: i64,
315) -> anyhow::Result<i64> {
316    // Get the total count of volumes for this region
317    let (total_volumes,) = sqlx::query_as::<_, (i64,)>(
318        r#"
319        SELECT
320            COUNT(v.id)
321        FROM 
322            storage_volumes v
323        INNER JOIN 
324            workers w ON v.node_id = w.id
325        INNER JOIN 
326            nodes n ON w.id = n.worker_id
327        WHERE 
328            w.region_id = ?
329        "#
330    )
331    .bind(region_id)
332    .fetch_one(pool)
333    .await
334    .context("Failed to count volumes for region")?;
335    
336    Ok(total_volumes)
337}
338
339#[derive(serde::Serialize)]
340pub struct ProviderRegionVolumes {
341    pub provider: Provider,
342    pub regions: Vec<RegionVolumes>
343}
344
345/// Retrieves storage volumes for a specific provider grouped by region with pagination
346pub async fn get_volumes_for_provider(
347    pool: &Pool<MySql>,
348    provider_id: i64,
349    page: i64,
350    per_page: i64,
351) -> anyhow::Result<ProviderRegionVolumes> {
352    // First, get the provider
353    let provider = sqlx::query_as::<_, Provider>("SELECT * FROM providers WHERE id = ?")
354        .bind(provider_id)
355        .fetch_one(pool)
356        .await
357        .context("Failed to fetch provider")?;
358    
359    // Get all regions for this provider
360    let regions = sqlx::query_as::<_, Region>(
361        "SELECT * FROM regions WHERE provider = ? ORDER BY name"
362    )
363    .bind(provider_id)
364    .fetch_all(pool)
365    .await
366    .context("Failed to fetch regions for provider")?;
367    
368    let mut region_volumes = Vec::new();
369    
370    // Calculate offset
371    let offset = page * per_page;
372    
373    // For each region, get paginated volumes
374    for region in regions {
375        // Get paginated volumes for this region
376        let volumes = sqlx::query_as::<_, StorageVolume>(
377            r#"
378            SELECT
379                v.*
380            FROM 
381                storage_volumes v
382            INNER JOIN 
383                workers w ON v.node_id = w.id
384            INNER JOIN
385                regions r ON w.region_id = r.id
386            WHERE 
387                r.provider = ?
388                AND r.id = ?
389            ORDER BY 
390                v.id
391            LIMIT ? OFFSET ?
392            "#
393        )
394        .bind(provider_id)
395        .bind(region.id)
396        .bind(per_page)
397        .bind(offset)
398        .fetch_all(pool)
399        .await
400        .context(format!("Failed to fetch volumes for region {}", region.id))?;
401        
402        // Only add regions with volumes
403        if !volumes.is_empty() {
404            region_volumes.push(RegionVolumes {
405                region,
406                volumes
407            });
408        }
409    }
410    
411    Ok(ProviderRegionVolumes {
412        provider,
413        regions: region_volumes
414    })
415}
416
417/// Counts the total number of storage volumes for a specific provider
418pub async fn count_volumes_for_provider(
419    pool: &Pool<MySql>,
420    provider_id: i64,
421) -> anyhow::Result<i64> {
422    // Get the total count of volumes for this provider
423    let (total_volumes,) = sqlx::query_as::<_, (i64,)>(
424        r#"
425        SELECT
426            COUNT(v.id)
427        FROM 
428            storage_volumes v
429        INNER JOIN 
430            workers w ON v.node_id = w.id
431        INNER JOIN 
432            regions r ON w.region_id = r.id
433        WHERE 
434            r.provider = ?
435        "#
436    )
437    .bind(provider_id)
438    .fetch_one(pool)
439    .await
440    .context("Failed to count volumes for provider")?;
441    
442    Ok(total_volumes)
443}