Lines of src/sql.rs from check-in e3f7eeb26a that are changed by the sequence of edits moving toward check-in bb89b6fab8:
e3f7eeb26a 2025-04-26 1: use std::borrow::Cow; 2: 3: use anyhow::{ 4: Result, 5: bail, 6: }; 7: use chrono::{ 8: DateTime, 9: FixedOffset, 10: Local, 11: }; 12: use sqlx::{ e3f7eeb26a 2025-04-26 13: Pool, 14: Postgres, 15: Row, 16: postgres::PgPoolOptions, 17: pool::PoolConnection, 18: }; 19: 20: #[derive(sqlx::FromRow, Debug)] 21: pub struct List { 22: pub source_id: i32, 23: pub channel: String, 24: pub enabled: bool, 25: pub url: String, 26: pub iv_hash: Option<String>, 27: pub url_re: Option<String>, 28: } 29: 30: #[derive(sqlx::FromRow, Debug)] 31: pub struct Source { 32: pub channel_id: i64, 33: pub url: String, 34: pub iv_hash: Option<String>, 35: pub owner: i64, 36: pub url_re: Option<String>, 37: } 38: 39: #[derive(sqlx::FromRow)] 40: pub struct Queue { 41: pub source_id: Option<i32>, 42: pub next_fetch: Option<DateTime<Local>>, 43: pub owner: Option<i64>, 44: } 45: 46: #[derive(Clone)] 47: pub struct Db { e3f7eeb26a 2025-04-26 48: pool: sqlx::Pool<sqlx::Postgres>, 49: } 50: 51: pub struct Conn{ 52: conn: PoolConnection<Postgres>, 53: } 54: 55: impl Db { 56: pub fn new (pguri: &str) -> Result<Db> { 57: Ok(Db{ e3f7eeb26a 2025-04-26 58: pool: PgPoolOptions::new() 59: .max_connections(5) 60: .acquire_timeout(std::time::Duration::new(300, 0)) 61: .idle_timeout(std::time::Duration::new(60, 0)) e3f7eeb26a 2025-04-26 62: .connect_lazy(pguri)?, 63: }) 64: } 65: e3f7eeb26a 2025-04-26 66: pub async fn begin(&mut self) -> Result<Conn> { e3f7eeb26a 2025-04-26 67: Conn::new(&mut self.pool).await 68: } 69: } 70: 71: impl Conn { e3f7eeb26a 2025-04-26 72: pub async fn new (pool: &mut Pool<Postgres>) -> Result<Conn> { e3f7eeb26a 2025-04-26 73: let conn = pool.acquire().await?; 74: Ok(Conn{ 75: conn, 76: }) 77: } 78: e3f7eeb26a 2025-04-26 79: pub async fn add_post (&mut self, id: i32, date: &DateTime<FixedOffset>, post_url: &str) -> Result<()> { 80: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);") e3f7eeb26a 2025-04-26 81: .bind(id) 82: .bind(date) 83: .bind(post_url) 84: .execute(&mut *self.conn).await?; 85: Ok(()) 86: } 87: e3f7eeb26a 2025-04-26 88: pub async fn clean (&mut self, source_id: i32, owner: i64) -> Result<Cow<'_, str>> { 89: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;") 90: .bind(source_id) e3f7eeb26a 2025-04-26 91: .bind(owner) 92: .execute(&mut *self.conn).await?.rows_affected() { 93: 0 => { Ok("No data found found.".into()) }, 94: x => { Ok(format!("{x} posts purged.").into()) }, 95: } 96: } 97: e3f7eeb26a 2025-04-26 98: pub async fn delete (&mut self, source_id: i32, owner: i64) -> Result<Cow<'_, str>> { 99: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;") 100: .bind(source_id) e3f7eeb26a 2025-04-26 101: .bind(owner) 102: .execute(&mut *self.conn).await?.rows_affected() { 103: 0 => { Ok("No data found found.".into()) }, 104: x => { Ok(format!("{} sources removed.", x).into()) }, 105: } 106: } 107: e3f7eeb26a 2025-04-26 108: pub async fn disable (&mut self, source_id: i32, owner: i64) -> Result<&str> { 109: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2") 110: .bind(source_id) e3f7eeb26a 2025-04-26 111: .bind(owner) 112: .execute(&mut *self.conn).await?.rows_affected() { 113: 1 => { Ok("Source disabled.") }, 114: 0 => { Ok("Source not found.") }, 115: _ => { bail!("Database error.") }, 116: } 117: } 118: e3f7eeb26a 2025-04-26 119: pub async fn enable (&mut self, source_id: i32, owner: i64) -> Result<&str> { 120: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2") 121: .bind(source_id) e3f7eeb26a 2025-04-26 122: .bind(owner) 123: .execute(&mut *self.conn).await?.rows_affected() { 124: 1 => { Ok("Source enabled.") }, 125: 0 => { Ok("Source not found.") }, 126: _ => { bail!("Database error.") }, 127: } 128: } 129: e3f7eeb26a 2025-04-26 130: pub async fn exists (&mut self, post_url: &str, id: i32) -> Result<Option<bool>> { 131: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") 132: .bind(post_url) e3f7eeb26a 2025-04-26 133: .bind(id) 134: .fetch_one(&mut *self.conn).await?; 135: let exists: Option<bool> = row.try_get("exists")?; 136: Ok(exists) 137: } 138: 139: pub async fn get_queue (&mut self) -> Result<Vec<Queue>> { 140: let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';") 141: .fetch_all(&mut *self.conn).await?; 142: Ok(block) 143: } 144: e3f7eeb26a 2025-04-26 145: pub async fn get_list (&mut self, owner: i64) -> Result<Vec<List>> { 146: let source: Vec<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id") e3f7eeb26a 2025-04-26 147: .bind(owner) 148: .fetch_all(&mut *self.conn).await?; 149: Ok(source) 150: } 151: e3f7eeb26a 2025-04-26 152: pub async fn get_source (&mut self, id: i32, owner: i64) -> Result<Source> { 153: let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2") 154: .bind(id) e3f7eeb26a 2025-04-26 155: .bind(owner) 156: .fetch_one(&mut *self.conn).await?; 157: Ok(source) 158: } 159: e3f7eeb26a 2025-04-26 160: pub async fn set_scrape (&mut self, id: i32) -> Result<()> { 161: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;") e3f7eeb26a 2025-04-26 162: .bind(id) 163: .execute(&mut *self.conn).await?; 164: Ok(()) 165: } 166: e3f7eeb26a 2025-04-26 167: pub async fn update (&mut self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: i64) -> Result<&str> { 168: match match update { 169: Some(id) => { 170: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1") 171: .bind(id) 172: }, 173: None => { 174: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)") 175: }, 176: } 177: .bind(channel_id) 178: .bind(url) 179: .bind(iv_hash) e3f7eeb26a 2025-04-26 180: .bind(owner) 181: .bind(channel) 182: .bind(url_re) 183: .execute(&mut *self.conn).await 184: { 185: Ok(_) => Ok(match update { 186: Some(_) => "Channel updated.", 187: None => "Channel added.", 188: }), 189: Err(sqlx::Error::Database(err)) => { 190: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() { 191: Some("_bt_check_unique", ) => { 192: Ok("Duplicate key.") 193: }, 194: Some(_) => { 195: Ok("Database error.") 196: }, 197: None => { 198: Ok("No database error extracted.") 199: }, 200: } 201: }, 202: Err(err) => { 203: bail!("Sorry, unknown error:\n{err:#?}\n"); 204: }, 205: } 206: } 207: }