b4af85e31b 2025-06-29 1: use std::{
b4af85e31b 2025-06-29 2: borrow::Cow,
b4af85e31b 2025-06-29 3: fmt,
dc7c43b010 2026-01-01 4: sync::Arc,
b4af85e31b 2025-06-29 5: };
b4af85e31b 2025-06-29 6:
dc7c43b010 2026-01-01 7: use smol::lock::Mutex;
0340541002 2025-04-24 8: use chrono::{
0340541002 2025-04-24 9: DateTime,
0340541002 2025-04-24 10: FixedOffset,
0340541002 2025-04-24 11: Local,
0340541002 2025-04-24 12: };
0340541002 2025-04-24 13: use sqlx::{
0340541002 2025-04-24 14: Postgres,
0340541002 2025-04-24 15: Row,
0340541002 2025-04-24 16: postgres::PgPoolOptions,
0340541002 2025-04-24 17: pool::PoolConnection,
44575a91d3 2025-07-09 18: };
44575a91d3 2025-07-09 19: use stacked_errors::{
44575a91d3 2025-07-09 20: Result,
44575a91d3 2025-07-09 21: StackableErr,
44575a91d3 2025-07-09 22: bail,
b4af85e31b 2025-06-29 23: };
0340541002 2025-04-24 24:
0340541002 2025-04-24 25: #[derive(sqlx::FromRow, Debug)]
0340541002 2025-04-24 26: pub struct List {
e3f7eeb26a 2025-04-26 27: pub source_id: i32,
0340541002 2025-04-24 28: pub channel: String,
0340541002 2025-04-24 29: pub enabled: bool,
0340541002 2025-04-24 30: pub url: String,
0340541002 2025-04-24 31: pub iv_hash: Option<String>,
0340541002 2025-04-24 32: pub url_re: Option<String>,
b4af85e31b 2025-06-29 33: }
b4af85e31b 2025-06-29 34:
b4af85e31b 2025-06-29 35: impl fmt::Display for List {
44575a91d3 2025-07-09 36: fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
42b29b744b 2026-01-01 37: write!(f, "\\#feed\\_{} \\*ļøā£ `{}` {}\nš `{}`", self.source_id, self.channel,
b4af85e31b 2025-06-29 38: match self.enabled {
b4af85e31b 2025-06-29 39: true => "š enabled",
b4af85e31b 2025-06-29 40: false => "ā disabled",
b4af85e31b 2025-06-29 41: }, self.url)?;
b4af85e31b 2025-06-29 42: if let Some(iv_hash) = &self.iv_hash {
b4af85e31b 2025-06-29 43: write!(f, "\nIV: `{iv_hash}`")?;
b4af85e31b 2025-06-29 44: }
b4af85e31b 2025-06-29 45: if let Some(url_re) = &self.url_re {
b4af85e31b 2025-06-29 46: write!(f, "\nRE: `{url_re}`")?;
b4af85e31b 2025-06-29 47: }
b4af85e31b 2025-06-29 48: Ok(())
b4af85e31b 2025-06-29 49: }
0340541002 2025-04-24 50: }
0340541002 2025-04-24 51:
0340541002 2025-04-24 52: #[derive(sqlx::FromRow, Debug)]
0340541002 2025-04-24 53: pub struct Source {
0340541002 2025-04-24 54: pub channel_id: i64,
0340541002 2025-04-24 55: pub url: String,
0340541002 2025-04-24 56: pub iv_hash: Option<String>,
0340541002 2025-04-24 57: pub owner: i64,
0340541002 2025-04-24 58: pub url_re: Option<String>,
0340541002 2025-04-24 59: }
0340541002 2025-04-24 60:
0340541002 2025-04-24 61: #[derive(sqlx::FromRow)]
0340541002 2025-04-24 62: pub struct Queue {
0340541002 2025-04-24 63: pub source_id: Option<i32>,
0340541002 2025-04-24 64: pub next_fetch: Option<DateTime<Local>>,
0340541002 2025-04-24 65: pub owner: Option<i64>,
acb0a4ac54 2025-09-28 66: pub last_scrape: DateTime<Local>,
bb89b6fab8 2025-06-15 67: }
bb89b6fab8 2025-06-15 68:
bb89b6fab8 2025-06-15 69: #[derive(Clone)]
c6d3e97290 2025-07-01 70: pub struct Db (
c6d3e97290 2025-07-01 71: Arc<Mutex<sqlx::Pool<sqlx::Postgres>>>,
c6d3e97290 2025-07-01 72: );
bb89b6fab8 2025-06-15 73:
bb89b6fab8 2025-06-15 74: impl Db {
bb89b6fab8 2025-06-15 75: pub fn new (pguri: &str) -> Result<Db> {
c6d3e97290 2025-07-01 76: Ok(Db (
c6d3e97290 2025-07-01 77: Arc::new(Mutex::new(PgPoolOptions::new()
bb89b6fab8 2025-06-15 78: .max_connections(5)
bb89b6fab8 2025-06-15 79: .acquire_timeout(std::time::Duration::new(300, 0))
bb89b6fab8 2025-06-15 80: .idle_timeout(std::time::Duration::new(60, 0))
44575a91d3 2025-07-09 81: .connect_lazy(pguri)
44575a91d3 2025-07-09 82: .stack()?)),
c6d3e97290 2025-07-01 83: ))
bb89b6fab8 2025-06-15 84: }
bb89b6fab8 2025-06-15 85:
bb89b6fab8 2025-06-15 86: pub async fn begin(&self) -> Result<Conn> {
c6d3e97290 2025-07-01 87: let pool = self.0.lock_arc().await;
44575a91d3 2025-07-09 88: let conn = Conn ( pool.acquire().await.stack()? );
bb89b6fab8 2025-06-15 89: Ok(conn)
bb89b6fab8 2025-06-15 90: }
bb89b6fab8 2025-06-15 91: }
bb89b6fab8 2025-06-15 92:
c6d3e97290 2025-07-01 93: pub struct Conn (
c6d3e97290 2025-07-01 94: PoolConnection<Postgres>,
c6d3e97290 2025-07-01 95: );
bb89b6fab8 2025-06-15 96:
c6d3e97290 2025-07-01 97: impl Conn {
bb89b6fab8 2025-06-15 98: pub async fn add_post (&mut self, source_id: i32, date: &DateTime<FixedOffset>, post_url: &str) -> Result<()> {
bb89b6fab8 2025-06-15 99: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
bb89b6fab8 2025-06-15 100: .bind(source_id)
bb89b6fab8 2025-06-15 101: .bind(date)
bb89b6fab8 2025-06-15 102: .bind(post_url)
44575a91d3 2025-07-09 103: .execute(&mut *self.0).await.stack()?;
bb89b6fab8 2025-06-15 104: Ok(())
bb89b6fab8 2025-06-15 105: }
bb89b6fab8 2025-06-15 106:
bb89b6fab8 2025-06-15 107: pub async fn clean <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
bb89b6fab8 2025-06-15 108: where I: Into<i64> {
bb89b6fab8 2025-06-15 109: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
bb89b6fab8 2025-06-15 110: .bind(source_id)
bb89b6fab8 2025-06-15 111: .bind(owner.into())
44575a91d3 2025-07-09 112: .execute(&mut *self.0).await.stack()?.rows_affected() {
0340541002 2025-04-24 113: 0 => { Ok("No data found found.".into()) },
0340541002 2025-04-24 114: x => { Ok(format!("{x} posts purged.").into()) },
0340541002 2025-04-24 115: }
0340541002 2025-04-24 116: }
0340541002 2025-04-24 117:
bb89b6fab8 2025-06-15 118: pub async fn delete <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
bb89b6fab8 2025-06-15 119: where I: Into<i64> {
0340541002 2025-04-24 120: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
0340541002 2025-04-24 121: .bind(source_id)
bb89b6fab8 2025-06-15 122: .bind(owner.into())
44575a91d3 2025-07-09 123: .execute(&mut *self.0).await.stack()?.rows_affected() {
0340541002 2025-04-24 124: 0 => { Ok("No data found found.".into()) },
44575a91d3 2025-07-09 125: x => { Ok(format!("{x} sources removed.").into()) },
0340541002 2025-04-24 126: }
0340541002 2025-04-24 127: }
0340541002 2025-04-24 128:
bb89b6fab8 2025-06-15 129: pub async fn disable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
bb89b6fab8 2025-06-15 130: where I: Into<i64> {
0340541002 2025-04-24 131: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
0340541002 2025-04-24 132: .bind(source_id)
bb89b6fab8 2025-06-15 133: .bind(owner.into())
44575a91d3 2025-07-09 134: .execute(&mut *self.0).await.stack()?.rows_affected() {
0340541002 2025-04-24 135: 1 => { Ok("Source disabled.") },
0340541002 2025-04-24 136: 0 => { Ok("Source not found.") },
0340541002 2025-04-24 137: _ => { bail!("Database error.") },
0340541002 2025-04-24 138: }
0340541002 2025-04-24 139: }
0340541002 2025-04-24 140:
bb89b6fab8 2025-06-15 141: pub async fn enable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
bb89b6fab8 2025-06-15 142: where I: Into<i64> {
0340541002 2025-04-24 143: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
0340541002 2025-04-24 144: .bind(source_id)
bb89b6fab8 2025-06-15 145: .bind(owner.into())
44575a91d3 2025-07-09 146: .execute(&mut *self.0).await.stack()?.rows_affected() {
0340541002 2025-04-24 147: 1 => { Ok("Source enabled.") },
0340541002 2025-04-24 148: 0 => { Ok("Source not found.") },
0340541002 2025-04-24 149: _ => { bail!("Database error.") },
0340541002 2025-04-24 150: }
0340541002 2025-04-24 151: }
0340541002 2025-04-24 152:
dc2089ff6a 2026-01-07 153: pub async fn exists <I> (&mut self, post_url: &str, id: I) -> Result<bool>
bb89b6fab8 2025-06-15 154: where I: Into<i64> {
0340541002 2025-04-24 155: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
0340541002 2025-04-24 156: .bind(post_url)
bb89b6fab8 2025-06-15 157: .bind(id.into())
44575a91d3 2025-07-09 158: .fetch_one(&mut *self.0).await.stack()?;
dc2089ff6a 2026-01-07 159: if let Some(exists) = row.try_get("exists").stack()? {
dc2089ff6a 2026-01-07 160: Ok(exists)
dc2089ff6a 2026-01-07 161: } else {
dc2089ff6a 2026-01-07 162: bail!("Database error: can't check whether source exists.");
dc2089ff6a 2026-01-07 163: }
0340541002 2025-04-24 164: }
0340541002 2025-04-24 165:
dc2089ff6a 2026-01-07 166: /// Get all pending events for (now + 1 minute)
0340541002 2025-04-24 167: pub async fn get_queue (&mut self) -> Result<Vec<Queue>> {
acb0a4ac54 2025-09-28 168: let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner, last_scrape from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
44575a91d3 2025-07-09 169: .fetch_all(&mut *self.0).await.stack()?;
0340541002 2025-04-24 170: Ok(block)
0340541002 2025-04-24 171: }
0340541002 2025-04-24 172:
bb89b6fab8 2025-06-15 173: pub async fn get_list <I> (&mut self, owner: I) -> Result<Vec<List>>
bb89b6fab8 2025-06-15 174: where I: Into<i64> {
bb89b6fab8 2025-06-15 175: let source: Vec<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
bb89b6fab8 2025-06-15 176: .bind(owner.into())
44575a91d3 2025-07-09 177: .fetch_all(&mut *self.0).await.stack()?;
b4af85e31b 2025-06-29 178: Ok(source)
b4af85e31b 2025-06-29 179: }
b4af85e31b 2025-06-29 180:
b4af85e31b 2025-06-29 181: pub async fn get_one <I> (&mut self, owner: I, id: i32) -> Result<Option<List>>
b4af85e31b 2025-06-29 182: where I: Into<i64> {
b4af85e31b 2025-06-29 183: let source: Option<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 and source_id = $2")
b4af85e31b 2025-06-29 184: .bind(owner.into())
b4af85e31b 2025-06-29 185: .bind(id)
44575a91d3 2025-07-09 186: .fetch_optional(&mut *self.0).await.stack()?;
bb89b6fab8 2025-06-15 187: Ok(source)
bb89b6fab8 2025-06-15 188: }
bb89b6fab8 2025-06-15 189:
bb89b6fab8 2025-06-15 190: pub async fn get_source <I> (&mut self, id: i32, owner: I) -> Result<Source>
bb89b6fab8 2025-06-15 191: where I: Into<i64> {
bb89b6fab8 2025-06-15 192: let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
bb89b6fab8 2025-06-15 193: .bind(id)
bb89b6fab8 2025-06-15 194: .bind(owner.into())
44575a91d3 2025-07-09 195: .fetch_one(&mut *self.0).await.stack()?;
bb89b6fab8 2025-06-15 196: Ok(source)
bb89b6fab8 2025-06-15 197: }
bb89b6fab8 2025-06-15 198:
bb89b6fab8 2025-06-15 199: pub async fn set_scrape <I> (&mut self, id: I) -> Result<()>
bb89b6fab8 2025-06-15 200: where I: Into<i64> {
bb89b6fab8 2025-06-15 201: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
bb89b6fab8 2025-06-15 202: .bind(id.into())
44575a91d3 2025-07-09 203: .execute(&mut *self.0).await.stack()?;
bb89b6fab8 2025-06-15 204: Ok(())
bb89b6fab8 2025-06-15 205: }
bb89b6fab8 2025-06-15 206:
bb89b6fab8 2025-06-15 207: pub async fn update <I> (&mut self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: I) -> Result<&str>
bb89b6fab8 2025-06-15 208: where I: Into<i64> {
0340541002 2025-04-24 209: match match update {
0340541002 2025-04-24 210: Some(id) => {
0340541002 2025-04-24 211: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1")
0340541002 2025-04-24 212: .bind(id)
0340541002 2025-04-24 213: },
0340541002 2025-04-24 214: None => {
0340541002 2025-04-24 215: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
0340541002 2025-04-24 216: },
0340541002 2025-04-24 217: }
0340541002 2025-04-24 218: .bind(channel_id)
0340541002 2025-04-24 219: .bind(url)
0340541002 2025-04-24 220: .bind(iv_hash)
bb89b6fab8 2025-06-15 221: .bind(owner.into())
0340541002 2025-04-24 222: .bind(channel)
0340541002 2025-04-24 223: .bind(url_re)
c6d3e97290 2025-07-01 224: .execute(&mut *self.0).await
0340541002 2025-04-24 225: {
0340541002 2025-04-24 226: Ok(_) => Ok(match update {
0340541002 2025-04-24 227: Some(_) => "Channel updated.",
0340541002 2025-04-24 228: None => "Channel added.",
0340541002 2025-04-24 229: }),
0340541002 2025-04-24 230: Err(sqlx::Error::Database(err)) => {
0340541002 2025-04-24 231: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
0340541002 2025-04-24 232: Some("_bt_check_unique", ) => {
0340541002 2025-04-24 233: Ok("Duplicate key.")
0340541002 2025-04-24 234: },
0340541002 2025-04-24 235: Some(_) => {
0340541002 2025-04-24 236: Ok("Database error.")
0340541002 2025-04-24 237: },
0340541002 2025-04-24 238: None => {
0340541002 2025-04-24 239: Ok("No database error extracted.")
0340541002 2025-04-24 240: },
0340541002 2025-04-24 241: }
0340541002 2025-04-24 242: },
0340541002 2025-04-24 243: Err(err) => {
0340541002 2025-04-24 244: bail!("Sorry, unknown error:\n{err:#?}\n");
0340541002 2025-04-24 245: },
0340541002 2025-04-24 246: }
0340541002 2025-04-24 247: }
0340541002 2025-04-24 248: }