Annotation For src/sql.rs
Logged in as anonymous

Lines of src/sql.rs from check-in 9adc69d514 that are changed by the sequence of edits moving toward check-in 374eadef45:

                         1: use crate::{
                         2: 	Arc,
                         3: 	Mutex,
                         4: };
                         5: 
                         6: use std::{
                         7: 	borrow::Cow,
                         8: 	fmt,
                         9: };
                        10: 
                        11: use chrono::{
                        12: 	DateTime,
                        13: 	FixedOffset,
                        14: 	Local,
                        15: };
                        16: use sqlx::{
                        17: 	Postgres,
                        18: 	Row,
                        19: 	postgres::PgPoolOptions,
                        20: 	pool::PoolConnection,
                        21: };
                        22: use stacked_errors::{
                        23: 	Result,
                        24: 	StackableErr,
                        25: 	bail,
                        26: };
                        27: 
                        28: #[derive(sqlx::FromRow, Debug)]
                        29: pub struct List {
                        30: 	pub source_id: i32,
                        31: 	pub channel: String,
                        32: 	pub enabled: bool,
                        33: 	pub url: String,
                        34: 	pub iv_hash: Option<String>,
                        35: 	pub url_re: Option<String>,
                        36: }
                        37: 
                        38: impl fmt::Display for List {
                        39: 	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
                        40: 		write!(f, "\\#feed\\_{} \\*ļøāƒ£ `{}` {}\nšŸ”— `{}`", self.source_id, self.channel,
                        41: 			match self.enabled {
                        42: 				true  => "šŸ”„ enabled",
                        43: 				false => "ā›” disabled",
                        44: 			}, self.url)?;
                        45: 		if let Some(iv_hash) = &self.iv_hash {
                        46: 			write!(f, "\nIV: `{iv_hash}`")?;
                        47: 		}
                        48: 		if let Some(url_re) = &self.url_re {
                        49: 			write!(f, "\nRE: `{url_re}`")?;
                        50: 		}
                        51: 		Ok(())
                        52: 	}
                        53: }
                        54: 
                        55: /// One feed, used for caching and menu navigation
                        56: #[derive(sqlx::FromRow, Debug)]
                        57: pub struct Feed {
                        58: 	pub source_id: i32,
                        59: 	pub channel: String,
                        60: }
                        61: 
                        62: #[derive(sqlx::FromRow, Debug)]
                        63: pub struct Source {
                        64: 	pub channel_id: i64,
                        65: 	pub url: String,
                        66: 	pub iv_hash: Option<String>,
                        67: 	pub owner: i64,
                        68: 	pub url_re: Option<String>,
                        69: }
                        70: 
                        71: #[derive(sqlx::FromRow)]
                        72: pub struct Queue {
                        73: 	pub source_id: Option<i32>,
                        74: 	pub next_fetch: Option<DateTime<Local>>,
                        75: 	pub owner: Option<i64>,
                        76: 	pub last_scrape: DateTime<Local>,
                        77: }
                        78: 
                        79: #[derive(Clone)]
                        80: pub struct Db (
                        81: 	Arc<Mutex<sqlx::Pool<sqlx::Postgres>>>,
                        82: );
                        83: 
                        84: impl Db {
                        85: 	pub fn new (pguri: &str) -> Result<Db> {
                        86: 		Ok(Db (
                        87: 			Arc::new(Mutex::new(PgPoolOptions::new()
                        88: 				.max_connections(5)
                        89: 				.acquire_timeout(std::time::Duration::new(300, 0))
                        90: 				.idle_timeout(std::time::Duration::new(60, 0))
                        91: 				.connect_lazy(pguri)
                        92: 				.stack()?)),
                        93: 		))
                        94: 	}
                        95: 
                        96: 	pub async fn begin(&self) -> Result<Conn> {
                        97: 		let pool = self.0.lock_arc().await;
                        98: 		let conn = Conn ( pool.acquire().await.stack()? );
                        99: 		Ok(conn)
                       100: 	}
                       101: }
                       102: 
                       103: pub struct Conn (
                       104: 	PoolConnection<Postgres>,
                       105: );
                       106: 
                       107: impl Conn {
                       108: 	pub async fn add_post (&mut self, source_id: i32, date: &DateTime<FixedOffset>, post_url: &str) -> Result<()> {
                       109: 		sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
                       110: 			.bind(source_id)
                       111: 			.bind(date)
                       112: 			.bind(post_url)
                       113: 			.execute(&mut *self.0).await.stack()?;
                       114: 		Ok(())
                       115: 	}
                       116: 
                       117: 	pub async fn clean <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
                       118: 	where I: Into<i64> {
                       119: 		match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
                       120: 			.bind(source_id)
                       121: 			.bind(owner.into())
                       122: 			.execute(&mut *self.0).await.stack()?.rows_affected() {
                       123: 			0 => { Ok("No data found found.".into()) },
                       124: 			x => { Ok(format!("{x} posts purged.").into()) },
                       125: 		}
                       126: 	}
                       127: 
                       128: 	pub async fn delete <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
                       129: 	where I: Into<i64> {
                       130: 		match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
                       131: 			.bind(source_id)
                       132: 			.bind(owner.into())
                       133: 			.execute(&mut *self.0).await.stack()?.rows_affected() {
                       134: 			0 => { Ok("No data found found.".into()) },
                       135: 			x => { Ok(format!("{x} sources removed.").into()) },
                       136: 		}
                       137: 	}
                       138: 
                       139: 	pub async fn disable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
                       140: 	where I: Into<i64> {
                       141: 		match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
                       142: 			.bind(source_id)
                       143: 			.bind(owner.into())
                       144: 			.execute(&mut *self.0).await.stack()?.rows_affected() {
                       145: 			1 => { Ok("Source disabled.") },
                       146: 			0 => { Ok("Source not found.") },
                       147: 			_ => { bail!("Database error.") },
                       148: 		}
                       149: 	}
                       150: 
                       151: 	pub async fn enable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
                       152: 	where I: Into<i64> {
                       153: 		match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
                       154: 			.bind(source_id)
                       155: 			.bind(owner.into())
                       156: 			.execute(&mut *self.0).await.stack()?.rows_affected() {
                       157: 			1 => { Ok("Source enabled.") },
                       158: 			0 => { Ok("Source not found.") },
                       159: 			_ => { bail!("Database error.") },
                       160: 		}
                       161: 	}
                       162: 
                       163: 	/// Checks whether a post with the given URL exists for the specified source.
                       164: 	///
                       165: 	/// # Parameters
                       166: 	/// - `post_url`: The URL of the post to check.
                       167: 	/// - `id`: The source identifier (converted to `i64`).
                       168: 	///
                       169: 	/// # Returns
                       170: 	/// `true` if a post with the URL exists for the source, `false` otherwise.
                       171: 	pub async fn exists <I> (&mut self, post_url: &str, id: I) -> Result<bool>
                       172: 	where I: Into<i64> {
                       173: 		let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
                       174: 			.bind(post_url)
                       175: 			.bind(id.into())
                       176: 			.fetch_one(&mut *self.0).await.stack()?;
                       177: 		row.try_get("exists")
                       178: 			.stack_err("Database error: can't check whether post exists.")
                       179: 	}
                       180: 
                       181: 	pub async fn get_feeds <I>(&mut self, owner: I) -> Result<Vec<Feed>>
                       182: 	where I: Into<i64> {
                       183: 		let block: Vec<Feed> = sqlx::query_as("select source_id, channel from rsstg_source where owner = $1 order by source_id")
                       184: 			.bind(owner.into())
                       185: 			.fetch_all(&mut *self.0).await.stack()?;
                       186: 		Ok(block)
                       187: 	}
                       188: 
                       189: 	/// Get all pending events for (now + 1 minute)
                       190: 	pub async fn get_queue (&mut self) -> Result<Vec<Queue>> {
                       191: 		let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner, last_scrape from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
                       192: 			.fetch_all(&mut *self.0).await.stack()?;
                       193: 		Ok(block)
                       194: 	}
                       195: 
                       196: 	pub async fn get_list <I>(&mut self, owner: I) -> Result<Vec<List>>
                       197: 	where I: Into<i64> {
                       198: 		let source: Vec<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
                       199: 			.bind(owner.into())
                       200: 			.fetch_all(&mut *self.0).await.stack()?;
                       201: 		Ok(source)
                       202: 	}
                       203: 
                       204: 	pub async fn get_one <I> (&mut self, owner: I, id: i32) -> Result<Option<List>>
                       205: 	where I: Into<i64> {
                       206: 		let source: Option<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 and source_id = $2")
                       207: 			.bind(owner.into())
                       208: 			.bind(id)
                       209: 			.fetch_optional(&mut *self.0).await.stack()?;
                       210: 		Ok(source)
                       211: 	}
                       212: 
                       213: 	pub async fn get_one_name <I> (&mut self, owner: I, name: &str) -> Result<Option<List>>
                       214: 	where I: Into<i64> {
                       215: 		let source: Option<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 and channel = $2")
                       216: 			.bind(owner.into())
                       217: 			.bind(name)
                       218: 			.fetch_optional(&mut *self.0).await.stack()?;
                       219: 		Ok(source)
                       220: 	}
                       221: 
                       222: 	pub async fn get_source <I> (&mut self, id: i32, owner: I) -> Result<Source>
                       223: 	where I: Into<i64> {
                       224: 		let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
                       225: 			.bind(id)
                       226: 			.bind(owner.into())
                       227: 			.fetch_one(&mut *self.0).await.stack()?;
                       228: 		Ok(source)
                       229: 	}
                       230: 
                       231: 	pub async fn set_scrape <I> (&mut self, id: I) -> Result<()>
                       232: 	where I: Into<i64> {
                       233: 		sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
                       234: 			.bind(id.into())
                       235: 			.execute(&mut *self.0).await.stack()?;
                       236: 		Ok(())
                       237: 	}
                       238: 
                       239: 	pub async fn update <I> (&mut self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: I) -> Result<&str>
                       240: 	where I: Into<i64> {
                       241: 		match match update {
                       242: 				Some(id) => {
                       243: 					sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1")
                       244: 						.bind(id)
                       245: 				},
                       246: 				None => {
                       247: 					sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
                       248: 				},
                       249: 			}
                       250: 				.bind(channel_id)
                       251: 				.bind(url)
                       252: 				.bind(iv_hash)
                       253: 				.bind(owner.into())
                       254: 				.bind(channel)
                       255: 				.bind(url_re)
                       256: 				.execute(&mut *self.0).await
                       257: 		{
                       258: 			Ok(_) => Ok(match update {
                       259: 				Some(_) => "Channel updated.",
                       260: 				None => "Channel added.",
                       261: 			}),
                       262: 			Err(sqlx::Error::Database(err)) => {
                       263: 				match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
                       264: 					Some("_bt_check_unique", ) => {
                       265: 						Ok("Duplicate key.")
                       266: 					},
                       267: 					Some(_) => {
                       268: 						Ok("Database error.")
                       269: 					},
                       270: 					None => {
                       271: 						Ok("No database error extracted.")
                       272: 					},
                       273: 				}
                       274: 			},
                       275: 			Err(err) => {
                       276: 				bail!("Sorry, unknown error:\n{err:#?}\n");
                       277: 			},
                       278: 		}
                       279: 	}
                       280: }