bb89b6fab8 2025-06-15 1: use std::{
bb89b6fab8 2025-06-15 2: borrow::Cow,
bb89b6fab8 2025-06-15 3: sync::{
bb89b6fab8 2025-06-15 4: Arc,
bb89b6fab8 2025-06-15 5: Mutex,
bb89b6fab8 2025-06-15 6: },
bb89b6fab8 2025-06-15 7: };
0340541002 2025-04-24 8:
0340541002 2025-04-24 9: use anyhow::{
0340541002 2025-04-24 10: Result,
0340541002 2025-04-24 11: bail,
0340541002 2025-04-24 12: };
0340541002 2025-04-24 13: use chrono::{
0340541002 2025-04-24 14: DateTime,
0340541002 2025-04-24 15: FixedOffset,
0340541002 2025-04-24 16: Local,
0340541002 2025-04-24 17: };
0340541002 2025-04-24 18: use sqlx::{
0340541002 2025-04-24 19: Postgres,
0340541002 2025-04-24 20: Row,
0340541002 2025-04-24 21: postgres::PgPoolOptions,
0340541002 2025-04-24 22: pool::PoolConnection,
0340541002 2025-04-24 23: };
0340541002 2025-04-24 24:
0340541002 2025-04-24 25: #[derive(sqlx::FromRow, Debug)]
0340541002 2025-04-24 26: pub struct List {
e3f7eeb26a 2025-04-26 27: pub source_id: i32,
0340541002 2025-04-24 28: pub channel: String,
0340541002 2025-04-24 29: pub enabled: bool,
0340541002 2025-04-24 30: pub url: String,
0340541002 2025-04-24 31: pub iv_hash: Option<String>,
0340541002 2025-04-24 32: pub url_re: Option<String>,
0340541002 2025-04-24 33: }
0340541002 2025-04-24 34:
0340541002 2025-04-24 35: #[derive(sqlx::FromRow, Debug)]
0340541002 2025-04-24 36: pub struct Source {
0340541002 2025-04-24 37: pub channel_id: i64,
0340541002 2025-04-24 38: pub url: String,
0340541002 2025-04-24 39: pub iv_hash: Option<String>,
0340541002 2025-04-24 40: pub owner: i64,
0340541002 2025-04-24 41: pub url_re: Option<String>,
0340541002 2025-04-24 42: }
0340541002 2025-04-24 43:
0340541002 2025-04-24 44: #[derive(sqlx::FromRow)]
0340541002 2025-04-24 45: pub struct Queue {
0340541002 2025-04-24 46: pub source_id: Option<i32>,
0340541002 2025-04-24 47: pub next_fetch: Option<DateTime<Local>>,
0340541002 2025-04-24 48: pub owner: Option<i64>,
0340541002 2025-04-24 49: }
0340541002 2025-04-24 50:
0340541002 2025-04-24 51: #[derive(Clone)]
0340541002 2025-04-24 52: pub struct Db {
bb89b6fab8 2025-06-15 53: pool: Arc<Mutex<Arc<sqlx::Pool<sqlx::Postgres>>>>,
0340541002 2025-04-24 54: }
0340541002 2025-04-24 55:
0340541002 2025-04-24 56: pub struct Conn{
0340541002 2025-04-24 57: conn: PoolConnection<Postgres>,
0340541002 2025-04-24 58: }
0340541002 2025-04-24 59:
0340541002 2025-04-24 60: impl Db {
0340541002 2025-04-24 61: pub fn new (pguri: &str) -> Result<Db> {
0340541002 2025-04-24 62: Ok(Db{
bb89b6fab8 2025-06-15 63: pool: Arc::new(Mutex::new(Arc::new(PgPoolOptions::new()
0340541002 2025-04-24 64: .max_connections(5)
0340541002 2025-04-24 65: .acquire_timeout(std::time::Duration::new(300, 0))
0340541002 2025-04-24 66: .idle_timeout(std::time::Duration::new(60, 0))
bb89b6fab8 2025-06-15 67: .connect_lazy(pguri)?))),
0340541002 2025-04-24 68: })
0340541002 2025-04-24 69: }
0340541002 2025-04-24 70:
bb89b6fab8 2025-06-15 71: pub async fn begin(&self) -> Result<Conn> {
bb89b6fab8 2025-06-15 72: let pool = self.pool.lock().unwrap().clone();
bb89b6fab8 2025-06-15 73: let conn = Conn::new(pool.acquire().await?).await?;
bb89b6fab8 2025-06-15 74: Ok(conn)
0340541002 2025-04-24 75: }
0340541002 2025-04-24 76: }
0340541002 2025-04-24 77:
0340541002 2025-04-24 78: impl Conn {
bb89b6fab8 2025-06-15 79: pub async fn new (conn: PoolConnection<Postgres>) -> Result<Conn> {
0340541002 2025-04-24 80: Ok(Conn{
0340541002 2025-04-24 81: conn,
0340541002 2025-04-24 82: })
0340541002 2025-04-24 83: }
0340541002 2025-04-24 84:
bb89b6fab8 2025-06-15 85: pub async fn add_post (&mut self, source_id: i32, date: &DateTime<FixedOffset>, post_url: &str) -> Result<()> {
0340541002 2025-04-24 86: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
bb89b6fab8 2025-06-15 87: .bind(source_id)
0340541002 2025-04-24 88: .bind(date)
0340541002 2025-04-24 89: .bind(post_url)
0340541002 2025-04-24 90: .execute(&mut *self.conn).await?;
0340541002 2025-04-24 91: Ok(())
0340541002 2025-04-24 92: }
0340541002 2025-04-24 93:
bb89b6fab8 2025-06-15 94: pub async fn clean <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
bb89b6fab8 2025-06-15 95: where I: Into<i64> {
0340541002 2025-04-24 96: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
0340541002 2025-04-24 97: .bind(source_id)
bb89b6fab8 2025-06-15 98: .bind(owner.into())
0340541002 2025-04-24 99: .execute(&mut *self.conn).await?.rows_affected() {
0340541002 2025-04-24 100: 0 => { Ok("No data found found.".into()) },
0340541002 2025-04-24 101: x => { Ok(format!("{x} posts purged.").into()) },
0340541002 2025-04-24 102: }
0340541002 2025-04-24 103: }
0340541002 2025-04-24 104:
bb89b6fab8 2025-06-15 105: pub async fn delete <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
bb89b6fab8 2025-06-15 106: where I: Into<i64> {
0340541002 2025-04-24 107: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
0340541002 2025-04-24 108: .bind(source_id)
bb89b6fab8 2025-06-15 109: .bind(owner.into())
0340541002 2025-04-24 110: .execute(&mut *self.conn).await?.rows_affected() {
0340541002 2025-04-24 111: 0 => { Ok("No data found found.".into()) },
0340541002 2025-04-24 112: x => { Ok(format!("{} sources removed.", x).into()) },
0340541002 2025-04-24 113: }
0340541002 2025-04-24 114: }
0340541002 2025-04-24 115:
bb89b6fab8 2025-06-15 116: pub async fn disable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
bb89b6fab8 2025-06-15 117: where I: Into<i64> {
0340541002 2025-04-24 118: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
0340541002 2025-04-24 119: .bind(source_id)
bb89b6fab8 2025-06-15 120: .bind(owner.into())
0340541002 2025-04-24 121: .execute(&mut *self.conn).await?.rows_affected() {
0340541002 2025-04-24 122: 1 => { Ok("Source disabled.") },
0340541002 2025-04-24 123: 0 => { Ok("Source not found.") },
0340541002 2025-04-24 124: _ => { bail!("Database error.") },
0340541002 2025-04-24 125: }
0340541002 2025-04-24 126: }
0340541002 2025-04-24 127:
bb89b6fab8 2025-06-15 128: pub async fn enable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
bb89b6fab8 2025-06-15 129: where I: Into<i64> {
0340541002 2025-04-24 130: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
0340541002 2025-04-24 131: .bind(source_id)
bb89b6fab8 2025-06-15 132: .bind(owner.into())
0340541002 2025-04-24 133: .execute(&mut *self.conn).await?.rows_affected() {
0340541002 2025-04-24 134: 1 => { Ok("Source enabled.") },
0340541002 2025-04-24 135: 0 => { Ok("Source not found.") },
0340541002 2025-04-24 136: _ => { bail!("Database error.") },
0340541002 2025-04-24 137: }
0340541002 2025-04-24 138: }
0340541002 2025-04-24 139:
bb89b6fab8 2025-06-15 140: pub async fn exists <I> (&mut self, post_url: &str, id: I) -> Result<Option<bool>>
bb89b6fab8 2025-06-15 141: where I: Into<i64> {
0340541002 2025-04-24 142: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
0340541002 2025-04-24 143: .bind(post_url)
bb89b6fab8 2025-06-15 144: .bind(id.into())
0340541002 2025-04-24 145: .fetch_one(&mut *self.conn).await?;
0340541002 2025-04-24 146: let exists: Option<bool> = row.try_get("exists")?;
0340541002 2025-04-24 147: Ok(exists)
0340541002 2025-04-24 148: }
0340541002 2025-04-24 149:
0340541002 2025-04-24 150: pub async fn get_queue (&mut self) -> Result<Vec<Queue>> {
0340541002 2025-04-24 151: let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
0340541002 2025-04-24 152: .fetch_all(&mut *self.conn).await?;
0340541002 2025-04-24 153: Ok(block)
0340541002 2025-04-24 154: }
0340541002 2025-04-24 155:
bb89b6fab8 2025-06-15 156: pub async fn get_list <I> (&mut self, owner: I) -> Result<Vec<List>>
bb89b6fab8 2025-06-15 157: where I: Into<i64> {
0340541002 2025-04-24 158: let source: Vec<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
bb89b6fab8 2025-06-15 159: .bind(owner.into())
0340541002 2025-04-24 160: .fetch_all(&mut *self.conn).await?;
0340541002 2025-04-24 161: Ok(source)
0340541002 2025-04-24 162: }
0340541002 2025-04-24 163:
bb89b6fab8 2025-06-15 164: pub async fn get_source <I> (&mut self, id: i32, owner: I) -> Result<Source>
bb89b6fab8 2025-06-15 165: where I: Into<i64> {
0340541002 2025-04-24 166: let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
0340541002 2025-04-24 167: .bind(id)
bb89b6fab8 2025-06-15 168: .bind(owner.into())
0340541002 2025-04-24 169: .fetch_one(&mut *self.conn).await?;
0340541002 2025-04-24 170: Ok(source)
0340541002 2025-04-24 171: }
0340541002 2025-04-24 172:
bb89b6fab8 2025-06-15 173: pub async fn set_scrape <I> (&mut self, id: I) -> Result<()>
bb89b6fab8 2025-06-15 174: where I: Into<i64> {
0340541002 2025-04-24 175: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
bb89b6fab8 2025-06-15 176: .bind(id.into())
0340541002 2025-04-24 177: .execute(&mut *self.conn).await?;
0340541002 2025-04-24 178: Ok(())
0340541002 2025-04-24 179: }
0340541002 2025-04-24 180:
bb89b6fab8 2025-06-15 181: pub async fn update <I> (&mut self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: I) -> Result<&str>
bb89b6fab8 2025-06-15 182: where I: Into<i64> {
0340541002 2025-04-24 183: match match update {
0340541002 2025-04-24 184: Some(id) => {
0340541002 2025-04-24 185: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1")
0340541002 2025-04-24 186: .bind(id)
0340541002 2025-04-24 187: },
0340541002 2025-04-24 188: None => {
0340541002 2025-04-24 189: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
0340541002 2025-04-24 190: },
0340541002 2025-04-24 191: }
0340541002 2025-04-24 192: .bind(channel_id)
0340541002 2025-04-24 193: .bind(url)
0340541002 2025-04-24 194: .bind(iv_hash)
bb89b6fab8 2025-06-15 195: .bind(owner.into())
0340541002 2025-04-24 196: .bind(channel)
0340541002 2025-04-24 197: .bind(url_re)
0340541002 2025-04-24 198: .execute(&mut *self.conn).await
0340541002 2025-04-24 199: {
0340541002 2025-04-24 200: Ok(_) => Ok(match update {
0340541002 2025-04-24 201: Some(_) => "Channel updated.",
0340541002 2025-04-24 202: None => "Channel added.",
0340541002 2025-04-24 203: }),
0340541002 2025-04-24 204: Err(sqlx::Error::Database(err)) => {
0340541002 2025-04-24 205: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
0340541002 2025-04-24 206: Some("_bt_check_unique", ) => {
0340541002 2025-04-24 207: Ok("Duplicate key.")
0340541002 2025-04-24 208: },
0340541002 2025-04-24 209: Some(_) => {
0340541002 2025-04-24 210: Ok("Database error.")
0340541002 2025-04-24 211: },
0340541002 2025-04-24 212: None => {
0340541002 2025-04-24 213: Ok("No database error extracted.")
0340541002 2025-04-24 214: },
0340541002 2025-04-24 215: }
0340541002 2025-04-24 216: },
0340541002 2025-04-24 217: Err(err) => {
0340541002 2025-04-24 218: bail!("Sorry, unknown error:\n{err:#?}\n");
0340541002 2025-04-24 219: },
0340541002 2025-04-24 220: }
0340541002 2025-04-24 221: }
0340541002 2025-04-24 222: }