Annotation For src/sql.rs
Logged in as anonymous

Lines of src/sql.rs from check-in 0340541002 that are changed by the sequence of edits moving toward check-in e3f7eeb26a:

                         1: use std::borrow::Cow;
                         2: 
                         3: use anyhow::{
                         4: 	Result,
                         5: 	bail,
                         6: };
                         7: use chrono::{
                         8: 	DateTime,
                         9: 	FixedOffset,
                        10: 	Local,
                        11: };
                        12: use sqlx::{
                        13: 	Pool,
                        14: 	Postgres,
                        15: 	Row,
                        16: 	postgres::PgPoolOptions,
                        17: 	pool::PoolConnection,
                        18: };
                        19: 
                        20: #[derive(sqlx::FromRow, Debug)]
                        21: pub struct List {
0340541002 2025-04-24   22: 	pub source_id: i64,
                        23: 	pub channel: String,
                        24: 	pub enabled: bool,
                        25: 	pub url: String,
                        26: 	pub iv_hash: Option<String>,
                        27: 	pub url_re: Option<String>,
                        28: }
                        29: 
                        30: #[derive(sqlx::FromRow, Debug)]
                        31: pub struct Source {
                        32: 	pub channel_id: i64,
                        33: 	pub url: String,
                        34: 	pub iv_hash: Option<String>,
                        35: 	pub owner: i64,
                        36: 	pub url_re: Option<String>,
                        37: }
                        38: 
                        39: #[derive(sqlx::FromRow)]
                        40: pub struct Queue {
                        41: 	pub source_id: Option<i32>,
                        42: 	pub next_fetch: Option<DateTime<Local>>,
                        43: 	pub owner: Option<i64>,
                        44: }
                        45: 
                        46: #[derive(Clone)]
                        47: pub struct Db {
                        48: 	pool: sqlx::Pool<sqlx::Postgres>,
                        49: }
                        50: 
                        51: pub struct Conn{
                        52: 	conn: PoolConnection<Postgres>,
                        53: }
                        54: 
                        55: impl Db {
                        56: 	pub fn new (pguri: &str) -> Result<Db> {
                        57: 		Ok(Db{
                        58: 			pool: PgPoolOptions::new()
                        59: 				.max_connections(5)
                        60: 				.acquire_timeout(std::time::Duration::new(300, 0))
                        61: 				.idle_timeout(std::time::Duration::new(60, 0))
                        62: 				.connect_lazy(pguri)?,
                        63: 		})
                        64: 	}
                        65: 
                        66: 	pub async fn begin(&mut self) -> Result<Conn> {
                        67: 		Conn::new(&mut self.pool).await
                        68: 	}
                        69: }
                        70: 
                        71: impl Conn {
                        72: 	pub async fn new (pool: &mut Pool<Postgres>) -> Result<Conn> {
                        73: 		let conn = pool.acquire().await?;
                        74: 		Ok(Conn{
                        75: 			conn,
                        76: 		})
                        77: 	}
                        78: 
                        79: 	pub async fn add_post (&mut self, id: i32, date: &DateTime<FixedOffset>, post_url: &str) -> Result<()> {
                        80: 		sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
                        81: 			.bind(id)
                        82: 			.bind(date)
                        83: 			.bind(post_url)
                        84: 			.execute(&mut *self.conn).await?;
                        85: 		Ok(())
                        86: 	}
                        87: 
                        88: 	pub async fn clean (&mut self, source_id: i32, owner: i64) -> Result<Cow<'_, str>> {
                        89: 		match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
                        90: 			.bind(source_id)
                        91: 			.bind(owner)
                        92: 			.execute(&mut *self.conn).await?.rows_affected() {
                        93: 			0 => { Ok("No data found found.".into()) },
                        94: 			x => { Ok(format!("{x} posts purged.").into()) },
                        95: 		}
                        96: 	}
                        97: 
                        98: 	pub async fn delete (&mut self, source_id: i32, owner: i64) -> Result<Cow<'_, str>> {
                        99: 		match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
                       100: 			.bind(source_id)
                       101: 			.bind(owner)
                       102: 			.execute(&mut *self.conn).await?.rows_affected() {
                       103: 			0 => { Ok("No data found found.".into()) },
                       104: 			x => { Ok(format!("{} sources removed.", x).into()) },
                       105: 		}
                       106: 	}
                       107: 
                       108: 	pub async fn disable (&mut self, source_id: i32, owner: i64) -> Result<&str> {
                       109: 		match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
                       110: 			.bind(source_id)
                       111: 			.bind(owner)
                       112: 			.execute(&mut *self.conn).await?.rows_affected() {
                       113: 			1 => { Ok("Source disabled.") },
                       114: 			0 => { Ok("Source not found.") },
                       115: 			_ => { bail!("Database error.") },
                       116: 		}
                       117: 	}
                       118: 
                       119: 	pub async fn enable (&mut self, source_id: i32, owner: i64) -> Result<&str> {
                       120: 		match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
                       121: 			.bind(source_id)
                       122: 			.bind(owner)
                       123: 			.execute(&mut *self.conn).await?.rows_affected() {
                       124: 			1 => { Ok("Source enabled.") },
                       125: 			0 => { Ok("Source not found.") },
                       126: 			_ => { bail!("Database error.") },
                       127: 		}
                       128: 	}
                       129: 
                       130: 	pub async fn exists (&mut self, post_url: &str, id: i32) -> Result<Option<bool>> {
                       131: 		let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
                       132: 			.bind(post_url)
                       133: 			.bind(id)
                       134: 			.fetch_one(&mut *self.conn).await?;
                       135: 		let exists: Option<bool> = row.try_get("exists")?;
                       136: 		Ok(exists)
                       137: 	}
                       138: 
                       139: 	pub async fn get_queue (&mut self) -> Result<Vec<Queue>> {
                       140: 		let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
                       141: 			.fetch_all(&mut *self.conn).await?;
                       142: 		Ok(block)
                       143: 	}
                       144: 
                       145: 	pub async fn get_list (&mut self, owner: i64) -> Result<Vec<List>> {
                       146: 		let source: Vec<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
                       147: 			.bind(owner)
                       148: 			.fetch_all(&mut *self.conn).await?;
                       149: 		Ok(source)
                       150: 	}
                       151: 
                       152: 	pub async fn get_source (&mut self, id: i32, owner: i64) -> Result<Source> {
                       153: 		let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
                       154: 			.bind(id)
                       155: 			.bind(owner)
                       156: 			.fetch_one(&mut *self.conn).await?;
                       157: 		Ok(source)
                       158: 	}
                       159: 
                       160: 	pub async fn set_scrape (&mut self, id: i32) -> Result<()> {
                       161: 		sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
                       162: 			.bind(id)
                       163: 			.execute(&mut *self.conn).await?;
                       164: 		Ok(())
                       165: 	}
                       166: 
                       167: 	pub async fn update (&mut self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: i64) -> Result<&str> {
                       168: 		match match update {
                       169: 				Some(id) => {
                       170: 					sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1")
                       171: 						.bind(id)
                       172: 				},
                       173: 				None => {
                       174: 					sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
                       175: 				},
                       176: 			}
                       177: 				.bind(channel_id)
                       178: 				.bind(url)
                       179: 				.bind(iv_hash)
                       180: 				.bind(owner)
                       181: 				.bind(channel)
                       182: 				.bind(url_re)
                       183: 				.execute(&mut *self.conn).await
                       184: 			{
                       185: 			Ok(_) => Ok(match update {
                       186: 				Some(_) => "Channel updated.",
                       187: 				None => "Channel added.",
                       188: 			}),
                       189: 			Err(sqlx::Error::Database(err)) => {
                       190: 				match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
                       191: 					Some("_bt_check_unique", ) => {
                       192: 						Ok("Duplicate key.")
                       193: 					},
                       194: 					Some(_) => {
                       195: 						Ok("Database error.")
                       196: 					},
                       197: 					None => {
                       198: 						Ok("No database error extracted.")
                       199: 					},
                       200: 				}
                       201: 			},
                       202: 			Err(err) => {
                       203: 				bail!("Sorry, unknown error:\n{err:#?}\n");
                       204: 			},
                       205: 		}
                       206: 	}
                       207: }