Lines of src/sql.rs from check-in bb89b6fab8 that are changed by the sequence of edits moving toward check-in fae13a0e55:
bb89b6fab8 2025-06-15 1: use std::{ bb89b6fab8 2025-06-15 2: borrow::Cow, bb89b6fab8 2025-06-15 3: sync::{ bb89b6fab8 2025-06-15 4: Arc, bb89b6fab8 2025-06-15 5: Mutex, bb89b6fab8 2025-06-15 6: }, bb89b6fab8 2025-06-15 7: }; 8: 9: use anyhow::{ 10: Result, 11: bail, 12: }; 13: use chrono::{ 14: DateTime, 15: FixedOffset, 16: Local, 17: }; 18: use sqlx::{ 19: Postgres, 20: Row, 21: postgres::PgPoolOptions, 22: pool::PoolConnection, 23: }; 24: 25: #[derive(sqlx::FromRow, Debug)] 26: pub struct List { 27: pub source_id: i32, 28: pub channel: String, 29: pub enabled: bool, 30: pub url: String, 31: pub iv_hash: Option<String>, 32: pub url_re: Option<String>, 33: } 34: 35: #[derive(sqlx::FromRow, Debug)] 36: pub struct Source { 37: pub channel_id: i64, 38: pub url: String, 39: pub iv_hash: Option<String>, 40: pub owner: i64, 41: pub url_re: Option<String>, 42: } 43: 44: #[derive(sqlx::FromRow)] 45: pub struct Queue { 46: pub source_id: Option<i32>, 47: pub next_fetch: Option<DateTime<Local>>, 48: pub owner: Option<i64>, 49: } 50: 51: #[derive(Clone)] 52: pub struct Db { bb89b6fab8 2025-06-15 53: pool: Arc<Mutex<Arc<sqlx::Pool<sqlx::Postgres>>>>, 54: } 55: 56: pub struct Conn{ 57: conn: PoolConnection<Postgres>, 58: } 59: 60: impl Db { 61: pub fn new (pguri: &str) -> Result<Db> { 62: Ok(Db{ bb89b6fab8 2025-06-15 63: pool: Arc::new(Mutex::new(Arc::new(PgPoolOptions::new() 64: .max_connections(5) 65: .acquire_timeout(std::time::Duration::new(300, 0)) 66: .idle_timeout(std::time::Duration::new(60, 0)) bb89b6fab8 2025-06-15 67: .connect_lazy(pguri)?))), 68: }) 69: } 70: 71: pub async fn begin(&self) -> Result<Conn> { bb89b6fab8 2025-06-15 72: let pool = self.pool.lock().unwrap().clone(); 73: let conn = Conn::new(pool.acquire().await?).await?; 74: Ok(conn) 75: } 76: } 77: 78: impl Conn { 79: pub async fn new (conn: PoolConnection<Postgres>) -> Result<Conn> { 80: Ok(Conn{ 81: conn, 82: }) 83: } 84: 85: pub async fn add_post (&mut self, source_id: i32, date: &DateTime<FixedOffset>, post_url: &str) -> Result<()> { 86: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);") 87: .bind(source_id) 88: .bind(date) 89: .bind(post_url) 90: .execute(&mut *self.conn).await?; 91: Ok(()) 92: } 93: 94: pub async fn clean <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>> 95: where I: Into<i64> { 96: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;") 97: .bind(source_id) 98: .bind(owner.into()) 99: .execute(&mut *self.conn).await?.rows_affected() { 100: 0 => { Ok("No data found found.".into()) }, 101: x => { Ok(format!("{x} posts purged.").into()) }, 102: } 103: } 104: 105: pub async fn delete <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>> 106: where I: Into<i64> { 107: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;") 108: .bind(source_id) 109: .bind(owner.into()) 110: .execute(&mut *self.conn).await?.rows_affected() { 111: 0 => { Ok("No data found found.".into()) }, 112: x => { Ok(format!("{} sources removed.", x).into()) }, 113: } 114: } 115: 116: pub async fn disable <I> (&mut self, source_id: i32, owner: I) -> Result<&str> 117: where I: Into<i64> { 118: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2") 119: .bind(source_id) 120: .bind(owner.into()) 121: .execute(&mut *self.conn).await?.rows_affected() { 122: 1 => { Ok("Source disabled.") }, 123: 0 => { Ok("Source not found.") }, 124: _ => { bail!("Database error.") }, 125: } 126: } 127: 128: pub async fn enable <I> (&mut self, source_id: i32, owner: I) -> Result<&str> 129: where I: Into<i64> { 130: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2") 131: .bind(source_id) 132: .bind(owner.into()) 133: .execute(&mut *self.conn).await?.rows_affected() { 134: 1 => { Ok("Source enabled.") }, 135: 0 => { Ok("Source not found.") }, 136: _ => { bail!("Database error.") }, 137: } 138: } 139: 140: pub async fn exists <I> (&mut self, post_url: &str, id: I) -> Result<Option<bool>> 141: where I: Into<i64> { 142: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") 143: .bind(post_url) 144: .bind(id.into()) 145: .fetch_one(&mut *self.conn).await?; 146: let exists: Option<bool> = row.try_get("exists")?; 147: Ok(exists) 148: } 149: 150: pub async fn get_queue (&mut self) -> Result<Vec<Queue>> { 151: let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';") 152: .fetch_all(&mut *self.conn).await?; 153: Ok(block) 154: } 155: 156: pub async fn get_list <I> (&mut self, owner: I) -> Result<Vec<List>> 157: where I: Into<i64> { 158: let source: Vec<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id") 159: .bind(owner.into()) 160: .fetch_all(&mut *self.conn).await?; 161: Ok(source) 162: } 163: 164: pub async fn get_source <I> (&mut self, id: i32, owner: I) -> Result<Source> 165: where I: Into<i64> { 166: let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2") 167: .bind(id) 168: .bind(owner.into()) 169: .fetch_one(&mut *self.conn).await?; 170: Ok(source) 171: } 172: 173: pub async fn set_scrape <I> (&mut self, id: I) -> Result<()> 174: where I: Into<i64> { 175: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;") 176: .bind(id.into()) 177: .execute(&mut *self.conn).await?; 178: Ok(()) 179: } 180: 181: pub async fn update <I> (&mut self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: I) -> Result<&str> 182: where I: Into<i64> { 183: match match update { 184: Some(id) => { 185: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1") 186: .bind(id) 187: }, 188: None => { 189: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)") 190: }, 191: } 192: .bind(channel_id) 193: .bind(url) 194: .bind(iv_hash) 195: .bind(owner.into()) 196: .bind(channel) 197: .bind(url_re) 198: .execute(&mut *self.conn).await 199: { 200: Ok(_) => Ok(match update { 201: Some(_) => "Channel updated.", 202: None => "Channel added.", 203: }), 204: Err(sqlx::Error::Database(err)) => { 205: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() { 206: Some("_bt_check_unique", ) => { 207: Ok("Duplicate key.") 208: }, 209: Some(_) => { 210: Ok("Database error.") 211: }, 212: None => { 213: Ok("No database error extracted.") 214: }, 215: } 216: }, 217: Err(err) => { 218: bail!("Sorry, unknown error:\n{err:#?}\n"); 219: }, 220: } 221: } 222: }