omni_orchestrator/schemas/v1/db/queries/
storage.rs1use crate::models::provider::Provider;
2use crate::models::storage::{
3 StorageClass,
4 StorageVolume,
5 StorageSnapshot,
6 StorageQosPolicy,
7 StorageMigration,
8};
9use crate::models::region::Region;
10use anyhow::Context;
11use sqlx::{MySql, Pool};
12use sqlx::Row;
13
/// Optional equality filters used when listing storage classes.
///
/// Every field defaults to `None`. A `None` field contributes no condition to
/// the query, so `StorageClassFilter::default()` matches every row.
///
/// `Clone` is derived so the filter can be reused after being passed by value,
/// matching `StorageVolumeFilter`.
#[derive(Default, Debug, Clone)]
pub struct StorageClassFilter {
    /// When set, only rows whose `storage_type` column equals this value match.
    pub storage_type: Option<String>,
    /// When set, only rows whose `volume_binding_mode` column equals this value match.
    pub volume_binding_mode: Option<String>,
    /// When set, only rows whose `allow_volume_expansion` column equals this value match.
    pub allow_volume_expansion: Option<bool>,
}
21
/// Optional equality filters used when listing or counting storage volumes.
///
/// Each `Some` field narrows the result to rows whose same-named column equals
/// the given value; `None` fields are ignored. The default value has every
/// field unset and therefore matches all rows.
#[derive(Clone, Debug, Default)]
pub struct StorageVolumeFilter {
    /// Match on the `app_id` column.
    pub app_id: Option<i64>,
    /// Match on the `storage_class_id` column.
    pub storage_class_id: Option<i64>,
    /// Match on the `status` column.
    pub status: Option<String>,
    /// Match on the `node_id` column.
    pub node_id: Option<i64>,
    /// Match on the `persistence_level` column.
    pub persistence_level: Option<String>,
    /// Match on the `write_concern` column.
    pub write_concern: Option<String>,
}
32
33pub async fn list_storage_classes(
35 pool: &Pool<MySql>,
36 filter: StorageClassFilter,
37) -> anyhow::Result<Vec<StorageClass>> {
38 let mut query_builder = sqlx::QueryBuilder::new("SELECT * FROM storage_classes WHERE 1=1");
39
40 if let Some(storage_type) = filter.storage_type {
41 query_builder.push(" AND storage_type = ");
42 query_builder.push_bind(storage_type);
43 }
44
45 if let Some(binding_mode) = filter.volume_binding_mode {
46 query_builder.push(" AND volume_binding_mode = ");
47 query_builder.push_bind(binding_mode);
48 }
49
50 if let Some(allow_expansion) = filter.allow_volume_expansion {
51 query_builder.push(" AND allow_volume_expansion = ");
52 query_builder.push_bind(allow_expansion);
53 }
54
55 let query = query_builder.build_query_as::<StorageClass>();
56 let storage_classes = query
57 .fetch_all(pool)
58 .await
59 .context("Failed to fetch storage classes")?;
60
61 Ok(storage_classes)
62}
63
64pub async fn get_storage_class_by_id(
66 pool: &Pool<MySql>,
67 id: i64,
68) -> anyhow::Result<Option<StorageClass>> {
69 let storage_class = sqlx::query_as::<_, StorageClass>(
70 "SELECT * FROM storage_classes WHERE id = ?"
71 )
72 .bind(id)
73 .fetch_optional(pool)
74 .await
75 .context("Failed to fetch storage class")?;
76
77 Ok(storage_class)
78}
79
80pub async fn list_storage_volumes(
82 pool: &Pool<MySql>,
83 filter: StorageVolumeFilter,
84 page: i64,
85 per_page: i64,
86) -> anyhow::Result<Vec<StorageVolume>> {
87 let offset = page * per_page;
88
89 let mut query_builder = sqlx::QueryBuilder::new("SELECT * FROM storage_volumes WHERE 1=1");
90
91 if let Some(app_id) = filter.app_id {
92 query_builder.push(" AND app_id = ");
93 query_builder.push_bind(app_id);
94 }
95
96 if let Some(storage_class_id) = filter.storage_class_id {
97 query_builder.push(" AND storage_class_id = ");
98 query_builder.push_bind(storage_class_id);
99 }
100
101 if let Some(status) = &filter.status {
102 query_builder.push(" AND status = ");
103 query_builder.push_bind(status);
104 }
105
106 if let Some(node_id) = filter.node_id {
107 query_builder.push(" AND node_id = ");
108 query_builder.push_bind(node_id);
109 }
110
111 if let Some(persistence_level) = &filter.persistence_level {
112 query_builder.push(" AND persistence_level = ");
113 query_builder.push_bind(persistence_level);
114 }
115
116 if let Some(write_concern) = &filter.write_concern {
117 query_builder.push(" AND write_concern = ");
118 query_builder.push_bind(write_concern);
119 }
120
121 query_builder.push(" LIMIT ");
122 query_builder.push_bind(per_page);
123 query_builder.push(" OFFSET ");
124 query_builder.push_bind(offset);
125
126 let query = query_builder.build_query_as::<StorageVolume>();
127 let storage_volumes = query
128 .fetch_all(pool)
129 .await
130 .context("Failed to fetch storage volumes")?;
131
132 Ok(storage_volumes)
133}
134
135pub async fn count_storage_volumes_with_filter(
137 pool: &Pool<MySql>,
138 filter: &StorageVolumeFilter,
139) -> anyhow::Result<i64> {
140 let mut query_builder = sqlx::QueryBuilder::new("SELECT COUNT(*) FROM storage_volumes WHERE 1=1");
141
142 if let Some(app_id) = filter.app_id {
143 query_builder.push(" AND app_id = ");
144 query_builder.push_bind(app_id);
145 }
146
147 if let Some(storage_class_id) = filter.storage_class_id {
148 query_builder.push(" AND storage_class_id = ");
149 query_builder.push_bind(storage_class_id);
150 }
151
152 if let Some(status) = &filter.status {
153 query_builder.push(" AND status = ");
154 query_builder.push_bind(status);
155 }
156
157 if let Some(node_id) = filter.node_id {
158 query_builder.push(" AND node_id = ");
159 query_builder.push_bind(node_id);
160 }
161
162 if let Some(persistence_level) = &filter.persistence_level {
163 query_builder.push(" AND persistence_level = ");
164 query_builder.push_bind(persistence_level);
165 }
166
167 if let Some(write_concern) = &filter.write_concern {
168 query_builder.push(" AND write_concern = ");
169 query_builder.push_bind(write_concern);
170 }
171
172 let query = query_builder.build_query_as::<(i64,)>();
173 let (count,) = query
174 .fetch_one(pool)
175 .await
176 .context("Failed to count storage volumes")?;
177
178 Ok(count)
179}
180
181pub async fn get_volumes_by_storage_class(
183 pool: &Pool<MySql>,
184 storage_class_id: i64,
185 page: i64,
186 per_page: i64,
187) -> anyhow::Result<Vec<StorageVolume>> {
188 let offset = page * per_page;
189 let query = "SELECT * FROM storage_volumes WHERE storage_class_id = ? LIMIT ? OFFSET ?";
190
191 let volumes = sqlx::query_as::<_, StorageVolume>(query)
192 .bind(storage_class_id)
193 .bind(per_page)
194 .bind(offset)
195 .fetch_all(pool)
196 .await
197 .context("Failed to fetch volumes by storage class")?;
198
199 Ok(volumes)
200}
201
202pub async fn list_storage_qos_policies(
204 pool: &Pool<MySql>,
205) -> anyhow::Result<Vec<StorageQosPolicy>> {
206 let policies = sqlx::query_as::<_, StorageQosPolicy>(
207 "SELECT * FROM storage_qos_policies"
208 )
209 .fetch_all(pool)
210 .await
211 .context("Failed to fetch storage QoS policies")?;
212
213 Ok(policies)
214}
215
216pub async fn get_volumes_by_write_concern(
218 pool: &Pool<MySql>,
219 write_concern: String,
220 page: i64,
221 per_page: i64,
222) -> anyhow::Result<Vec<StorageVolume>> {
223 let offset = page * per_page;
224 let query = "SELECT * FROM storage_volumes WHERE write_concern = ? LIMIT ? OFFSET ?";
225
226 let volumes = sqlx::query_as::<_, StorageVolume>(query)
227 .bind(write_concern)
228 .bind(per_page)
229 .bind(offset)
230 .fetch_all(pool)
231 .await
232 .context("Failed to fetch volumes by write concern")?;
233
234 Ok(volumes)
235}
236
237pub async fn get_volumes_by_persistence_level(
239 pool: &Pool<MySql>,
240 persistence_level: String,
241 page: i64,
242 per_page: i64,
243) -> anyhow::Result<Vec<StorageVolume>> {
244 let offset = page * per_page;
245 let query = "SELECT * FROM storage_volumes WHERE persistence_level = ? LIMIT ? OFFSET ?";
246
247 let volumes = sqlx::query_as::<_, StorageVolume>(query)
248 .bind(persistence_level)
249 .bind(per_page)
250 .bind(offset)
251 .fetch_all(pool)
252 .await
253 .context("Failed to fetch volumes by persistence level")?;
254
255 Ok(volumes)
256}
257
258#[derive(Debug, serde::Serialize)]
260pub struct RegionVolumes {
261 pub region: Region,
262 pub volumes: Vec<StorageVolume>
263}
264
265pub async fn get_volumes_for_region(
267 pool: &Pool<MySql>,
268 region_id: i64,
269 page: i64,
270 per_page: i64,
271) -> anyhow::Result<RegionVolumes> {
272 let region = sqlx::query_as::<_, Region>("SELECT * FROM regions WHERE id = ?")
274 .bind(region_id)
275 .fetch_one(pool)
276 .await
277 .context("Failed to fetch region")?;
278
279 let offset = page * per_page;
281
282 let volumes = sqlx::query_as::<_, StorageVolume>(
284 r#"
285 SELECT
286 v.*
287 FROM
288 storage_volumes v
289 INNER JOIN
290 workers w ON v.node_id = w.id
291 WHERE
292 w.region_id = ?
293 ORDER BY
294 v.id
295 LIMIT ? OFFSET ?
296 "#
297 )
298 .bind(region_id)
299 .bind(per_page)
300 .bind(offset)
301 .fetch_all(pool)
302 .await
303 .context("Failed to fetch volumes for region")?;
304
305 Ok(RegionVolumes {
306 region,
307 volumes
308 })
309}
310
311pub async fn count_volumes_for_region(
313 pool: &Pool<MySql>,
314 region_id: i64,
315) -> anyhow::Result<i64> {
316 let (total_volumes,) = sqlx::query_as::<_, (i64,)>(
318 r#"
319 SELECT
320 COUNT(v.id)
321 FROM
322 storage_volumes v
323 INNER JOIN
324 workers w ON v.node_id = w.id
325 INNER JOIN
326 nodes n ON w.id = n.worker_id
327 WHERE
328 w.region_id = ?
329 "#
330 )
331 .bind(region_id)
332 .fetch_one(pool)
333 .await
334 .context("Failed to count volumes for region")?;
335
336 Ok(total_volumes)
337}
338
339#[derive(serde::Serialize)]
340pub struct ProviderRegionVolumes {
341 pub provider: Provider,
342 pub regions: Vec<RegionVolumes>
343}
344
345pub async fn get_volumes_for_provider(
347 pool: &Pool<MySql>,
348 provider_id: i64,
349 page: i64,
350 per_page: i64,
351) -> anyhow::Result<ProviderRegionVolumes> {
352 let provider = sqlx::query_as::<_, Provider>("SELECT * FROM providers WHERE id = ?")
354 .bind(provider_id)
355 .fetch_one(pool)
356 .await
357 .context("Failed to fetch provider")?;
358
359 let regions = sqlx::query_as::<_, Region>(
361 "SELECT * FROM regions WHERE provider = ? ORDER BY name"
362 )
363 .bind(provider_id)
364 .fetch_all(pool)
365 .await
366 .context("Failed to fetch regions for provider")?;
367
368 let mut region_volumes = Vec::new();
369
370 let offset = page * per_page;
372
373 for region in regions {
375 let volumes = sqlx::query_as::<_, StorageVolume>(
377 r#"
378 SELECT
379 v.*
380 FROM
381 storage_volumes v
382 INNER JOIN
383 workers w ON v.node_id = w.id
384 INNER JOIN
385 regions r ON w.region_id = r.id
386 WHERE
387 r.provider = ?
388 AND r.id = ?
389 ORDER BY
390 v.id
391 LIMIT ? OFFSET ?
392 "#
393 )
394 .bind(provider_id)
395 .bind(region.id)
396 .bind(per_page)
397 .bind(offset)
398 .fetch_all(pool)
399 .await
400 .context(format!("Failed to fetch volumes for region {}", region.id))?;
401
402 if !volumes.is_empty() {
404 region_volumes.push(RegionVolumes {
405 region,
406 volumes
407 });
408 }
409 }
410
411 Ok(ProviderRegionVolumes {
412 provider,
413 regions: region_volumes
414 })
415}
416
417pub async fn count_volumes_for_provider(
419 pool: &Pool<MySql>,
420 provider_id: i64,
421) -> anyhow::Result<i64> {
422 let (total_volumes,) = sqlx::query_as::<_, (i64,)>(
424 r#"
425 SELECT
426 COUNT(v.id)
427 FROM
428 storage_volumes v
429 INNER JOIN
430 workers w ON v.node_id = w.id
431 INNER JOIN
432 regions r ON w.region_id = r.id
433 WHERE
434 r.provider = ?
435 "#
436 )
437 .bind(provider_id)
438 .fetch_one(pool)
439 .await
440 .context("Failed to count volumes for provider")?;
441
442 Ok(total_volumes)
443}