omni_orchestrator/schemas/v1/db/queries/
storage.rs1use anyhow::Context;
2use sqlx::{MySql, Pool};
3use sqlx::Row;
4
5use libomni::types::db::v1 as types;
6use types::provider::Provider;
7use types::region::Region;
8use types::storage::{
9 StorageClass,
10 StorageVolume,
11 StorageSnapshot,
12 StorageQosPolicy,
13 StorageMigration,
14};
15
/// Optional equality filters applied when listing storage classes.
///
/// Every field left as `None` is ignored by `list_storage_classes`; a
/// default-constructed filter therefore matches every row.
///
/// `Clone` is derived (matching `StorageVolumeFilter`) so a caller can keep a
/// filter around after passing a copy to a function that consumes it.
#[derive(Default, Debug, Clone)]
pub struct StorageClassFilter {
    /// When set, only rows whose `storage_type` column equals this value match.
    pub storage_type: Option<String>,
    /// When set, only rows whose `volume_binding_mode` column equals this value match.
    pub volume_binding_mode: Option<String>,
    /// When set, only rows whose `allow_volume_expansion` column equals this value match.
    pub allow_volume_expansion: Option<bool>,
}
23
/// Optional equality filters shared by the storage-volume list and count queries.
///
/// Each field that is `Some` becomes an `AND <column> = ?` condition on the
/// column of the same name; `None` fields add no condition.
#[derive(Clone, Debug, Default)]
pub struct StorageVolumeFilter {
    /// Match on the `app_id` column.
    pub app_id: Option<i64>,
    /// Match on the `storage_class_id` column.
    pub storage_class_id: Option<i64>,
    /// Match on the `status` column.
    pub status: Option<String>,
    /// Match on the `node_id` column.
    pub node_id: Option<i64>,
    /// Match on the `persistence_level` column.
    pub persistence_level: Option<String>,
    /// Match on the `write_concern` column.
    pub write_concern: Option<String>,
}
34
35pub async fn list_storage_classes(
37 pool: &Pool<MySql>,
38 filter: StorageClassFilter,
39) -> anyhow::Result<Vec<StorageClass>> {
40 let mut query_builder = sqlx::QueryBuilder::new("SELECT * FROM storage_classes WHERE 1=1");
41
42 if let Some(storage_type) = filter.storage_type {
43 query_builder.push(" AND storage_type = ");
44 query_builder.push_bind(storage_type);
45 }
46
47 if let Some(binding_mode) = filter.volume_binding_mode {
48 query_builder.push(" AND volume_binding_mode = ");
49 query_builder.push_bind(binding_mode);
50 }
51
52 if let Some(allow_expansion) = filter.allow_volume_expansion {
53 query_builder.push(" AND allow_volume_expansion = ");
54 query_builder.push_bind(allow_expansion);
55 }
56
57 let query = query_builder.build_query_as::<StorageClass>();
58 let storage_classes = query
59 .fetch_all(pool)
60 .await
61 .context("Failed to fetch storage classes")?;
62
63 Ok(storage_classes)
64}
65
66pub async fn get_storage_class_by_id(
68 pool: &Pool<MySql>,
69 id: i64,
70) -> anyhow::Result<Option<StorageClass>> {
71 let storage_class = sqlx::query_as::<_, StorageClass>(
72 "SELECT * FROM storage_classes WHERE id = ?"
73 )
74 .bind(id)
75 .fetch_optional(pool)
76 .await
77 .context("Failed to fetch storage class")?;
78
79 Ok(storage_class)
80}
81
82pub async fn list_storage_volumes(
84 pool: &Pool<MySql>,
85 filter: StorageVolumeFilter,
86 page: i64,
87 per_page: i64,
88) -> anyhow::Result<Vec<StorageVolume>> {
89 let offset = page * per_page;
90
91 let mut query_builder = sqlx::QueryBuilder::new("SELECT * FROM storage_volumes WHERE 1=1");
92
93 if let Some(app_id) = filter.app_id {
94 query_builder.push(" AND app_id = ");
95 query_builder.push_bind(app_id);
96 }
97
98 if let Some(storage_class_id) = filter.storage_class_id {
99 query_builder.push(" AND storage_class_id = ");
100 query_builder.push_bind(storage_class_id);
101 }
102
103 if let Some(status) = &filter.status {
104 query_builder.push(" AND status = ");
105 query_builder.push_bind(status);
106 }
107
108 if let Some(node_id) = filter.node_id {
109 query_builder.push(" AND node_id = ");
110 query_builder.push_bind(node_id);
111 }
112
113 if let Some(persistence_level) = &filter.persistence_level {
114 query_builder.push(" AND persistence_level = ");
115 query_builder.push_bind(persistence_level);
116 }
117
118 if let Some(write_concern) = &filter.write_concern {
119 query_builder.push(" AND write_concern = ");
120 query_builder.push_bind(write_concern);
121 }
122
123 query_builder.push(" LIMIT ");
124 query_builder.push_bind(per_page);
125 query_builder.push(" OFFSET ");
126 query_builder.push_bind(offset);
127
128 let query = query_builder.build_query_as::<StorageVolume>();
129 let storage_volumes = query
130 .fetch_all(pool)
131 .await
132 .context("Failed to fetch storage volumes")?;
133
134 Ok(storage_volumes)
135}
136
137pub async fn count_storage_volumes_with_filter(
139 pool: &Pool<MySql>,
140 filter: &StorageVolumeFilter,
141) -> anyhow::Result<i64> {
142 let mut query_builder = sqlx::QueryBuilder::new("SELECT COUNT(*) FROM storage_volumes WHERE 1=1");
143
144 if let Some(app_id) = filter.app_id {
145 query_builder.push(" AND app_id = ");
146 query_builder.push_bind(app_id);
147 }
148
149 if let Some(storage_class_id) = filter.storage_class_id {
150 query_builder.push(" AND storage_class_id = ");
151 query_builder.push_bind(storage_class_id);
152 }
153
154 if let Some(status) = &filter.status {
155 query_builder.push(" AND status = ");
156 query_builder.push_bind(status);
157 }
158
159 if let Some(node_id) = filter.node_id {
160 query_builder.push(" AND node_id = ");
161 query_builder.push_bind(node_id);
162 }
163
164 if let Some(persistence_level) = &filter.persistence_level {
165 query_builder.push(" AND persistence_level = ");
166 query_builder.push_bind(persistence_level);
167 }
168
169 if let Some(write_concern) = &filter.write_concern {
170 query_builder.push(" AND write_concern = ");
171 query_builder.push_bind(write_concern);
172 }
173
174 let query = query_builder.build_query_as::<(i64,)>();
175 let (count,) = query
176 .fetch_one(pool)
177 .await
178 .context("Failed to count storage volumes")?;
179
180 Ok(count)
181}
182
183pub async fn get_volumes_by_storage_class(
185 pool: &Pool<MySql>,
186 storage_class_id: i64,
187 page: i64,
188 per_page: i64,
189) -> anyhow::Result<Vec<StorageVolume>> {
190 let offset = page * per_page;
191 let query = "SELECT * FROM storage_volumes WHERE storage_class_id = ? LIMIT ? OFFSET ?";
192
193 let volumes = sqlx::query_as::<_, StorageVolume>(query)
194 .bind(storage_class_id)
195 .bind(per_page)
196 .bind(offset)
197 .fetch_all(pool)
198 .await
199 .context("Failed to fetch volumes by storage class")?;
200
201 Ok(volumes)
202}
203
204pub async fn list_storage_qos_policies(
206 pool: &Pool<MySql>,
207) -> anyhow::Result<Vec<StorageQosPolicy>> {
208 let policies = sqlx::query_as::<_, StorageQosPolicy>(
209 "SELECT * FROM storage_qos_policies"
210 )
211 .fetch_all(pool)
212 .await
213 .context("Failed to fetch storage QoS policies")?;
214
215 Ok(policies)
216}
217
218pub async fn get_volumes_by_write_concern(
220 pool: &Pool<MySql>,
221 write_concern: String,
222 page: i64,
223 per_page: i64,
224) -> anyhow::Result<Vec<StorageVolume>> {
225 let offset = page * per_page;
226 let query = "SELECT * FROM storage_volumes WHERE write_concern = ? LIMIT ? OFFSET ?";
227
228 let volumes = sqlx::query_as::<_, StorageVolume>(query)
229 .bind(write_concern)
230 .bind(per_page)
231 .bind(offset)
232 .fetch_all(pool)
233 .await
234 .context("Failed to fetch volumes by write concern")?;
235
236 Ok(volumes)
237}
238
239pub async fn get_volumes_by_persistence_level(
241 pool: &Pool<MySql>,
242 persistence_level: String,
243 page: i64,
244 per_page: i64,
245) -> anyhow::Result<Vec<StorageVolume>> {
246 let offset = page * per_page;
247 let query = "SELECT * FROM storage_volumes WHERE persistence_level = ? LIMIT ? OFFSET ?";
248
249 let volumes = sqlx::query_as::<_, StorageVolume>(query)
250 .bind(persistence_level)
251 .bind(per_page)
252 .bind(offset)
253 .fetch_all(pool)
254 .await
255 .context("Failed to fetch volumes by persistence level")?;
256
257 Ok(volumes)
258}
259
260#[derive(Debug, serde::Serialize)]
262pub struct RegionVolumes {
263 pub region: Region,
264 pub volumes: Vec<StorageVolume>
265}
266
267pub async fn get_volumes_for_region(
269 pool: &Pool<MySql>,
270 region_id: i64,
271 page: i64,
272 per_page: i64,
273) -> anyhow::Result<RegionVolumes> {
274 let region = sqlx::query_as::<_, Region>("SELECT * FROM regions WHERE id = ?")
276 .bind(region_id)
277 .fetch_one(pool)
278 .await
279 .context("Failed to fetch region")?;
280
281 let offset = page * per_page;
283
284 let volumes = sqlx::query_as::<_, StorageVolume>(
286 r#"
287 SELECT
288 v.*
289 FROM
290 storage_volumes v
291 INNER JOIN
292 workers w ON v.node_id = w.id
293 WHERE
294 w.region_id = ?
295 ORDER BY
296 v.id
297 LIMIT ? OFFSET ?
298 "#
299 )
300 .bind(region_id)
301 .bind(per_page)
302 .bind(offset)
303 .fetch_all(pool)
304 .await
305 .context("Failed to fetch volumes for region")?;
306
307 Ok(RegionVolumes {
308 region,
309 volumes
310 })
311}
312
313pub async fn count_volumes_for_region(
315 pool: &Pool<MySql>,
316 region_id: i64,
317) -> anyhow::Result<i64> {
318 let (total_volumes,) = sqlx::query_as::<_, (i64,)>(
320 r#"
321 SELECT
322 COUNT(v.id)
323 FROM
324 storage_volumes v
325 INNER JOIN
326 workers w ON v.node_id = w.id
327 INNER JOIN
328 nodes n ON w.id = n.worker_id
329 WHERE
330 w.region_id = ?
331 "#
332 )
333 .bind(region_id)
334 .fetch_one(pool)
335 .await
336 .context("Failed to count volumes for region")?;
337
338 Ok(total_volumes)
339}
340
341#[derive(serde::Serialize)]
342pub struct ProviderRegionVolumes {
343 pub provider: Provider,
344 pub regions: Vec<RegionVolumes>
345}
346
347pub async fn get_volumes_for_provider(
349 pool: &Pool<MySql>,
350 provider_id: i64,
351 page: i64,
352 per_page: i64,
353) -> anyhow::Result<ProviderRegionVolumes> {
354 let provider = sqlx::query_as::<_, Provider>("SELECT * FROM providers WHERE id = ?")
356 .bind(provider_id)
357 .fetch_one(pool)
358 .await
359 .context("Failed to fetch provider")?;
360
361 let regions = sqlx::query_as::<_, Region>(
363 "SELECT * FROM regions WHERE provider = ? ORDER BY name"
364 )
365 .bind(provider_id)
366 .fetch_all(pool)
367 .await
368 .context("Failed to fetch regions for provider")?;
369
370 let mut region_volumes = Vec::new();
371
372 let offset = page * per_page;
374
375 for region in regions {
377 let volumes = sqlx::query_as::<_, StorageVolume>(
379 r#"
380 SELECT
381 v.*
382 FROM
383 storage_volumes v
384 INNER JOIN
385 workers w ON v.node_id = w.id
386 INNER JOIN
387 regions r ON w.region_id = r.id
388 WHERE
389 r.provider = ?
390 AND r.id = ?
391 ORDER BY
392 v.id
393 LIMIT ? OFFSET ?
394 "#
395 )
396 .bind(provider_id)
397 .bind(region.id)
398 .bind(per_page)
399 .bind(offset)
400 .fetch_all(pool)
401 .await
402 .context(format!("Failed to fetch volumes for region {}", region.id))?;
403
404 if !volumes.is_empty() {
406 region_volumes.push(RegionVolumes {
407 region,
408 volumes
409 });
410 }
411 }
412
413 Ok(ProviderRegionVolumes {
414 provider,
415 regions: region_volumes
416 })
417}
418
419pub async fn count_volumes_for_provider(
421 pool: &Pool<MySql>,
422 provider_id: i64,
423) -> anyhow::Result<i64> {
424 let (total_volumes,) = sqlx::query_as::<_, (i64,)>(
426 r#"
427 SELECT
428 COUNT(v.id)
429 FROM
430 storage_volumes v
431 INNER JOIN
432 workers w ON v.node_id = w.id
433 INNER JOIN
434 regions r ON w.region_id = r.id
435 WHERE
436 r.provider = ?
437 "#
438 )
439 .bind(provider_id)
440 .fetch_one(pool)
441 .await
442 .context("Failed to count volumes for provider")?;
443
444 Ok(total_volumes)
445}