b4af85e31b 2025-06-29 1: use std::{
b4af85e31b 2025-06-29 2: borrow::Cow,
b4af85e31b 2025-06-29 3: fmt,
b4af85e31b 2025-06-29 4: };
0340541002 2025-04-24 5:
0340541002 2025-04-24 6: use anyhow::{
0340541002 2025-04-24 7: Result,
0340541002 2025-04-24 8: bail,
fae13a0e55 2025-06-28 9: };
fae13a0e55 2025-06-28 10: use async_std::sync::{
fae13a0e55 2025-06-28 11: Arc,
fae13a0e55 2025-06-28 12: Mutex,
0340541002 2025-04-24 13: };
0340541002 2025-04-24 14: use chrono::{
0340541002 2025-04-24 15: DateTime,
0340541002 2025-04-24 16: FixedOffset,
0340541002 2025-04-24 17: Local,
0340541002 2025-04-24 18: };
0340541002 2025-04-24 19: use sqlx::{
0340541002 2025-04-24 20: Postgres,
0340541002 2025-04-24 21: Row,
0340541002 2025-04-24 22: postgres::PgPoolOptions,
0340541002 2025-04-24 23: pool::PoolConnection,
0340541002 2025-04-24 24: };
0340541002 2025-04-24 25:
0340541002 2025-04-24 26: #[derive(sqlx::FromRow, Debug)]
0340541002 2025-04-24 27: pub struct List {
e3f7eeb26a 2025-04-26 28: pub source_id: i32,
0340541002 2025-04-24 29: pub channel: String,
0340541002 2025-04-24 30: pub enabled: bool,
0340541002 2025-04-24 31: pub url: String,
0340541002 2025-04-24 32: pub iv_hash: Option<String>,
0340541002 2025-04-24 33: pub url_re: Option<String>,
b4af85e31b 2025-06-29 34: }
b4af85e31b 2025-06-29 35:
b4af85e31b 2025-06-29 36: impl fmt::Display for List {
b4af85e31b 2025-06-29 37: fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
b4af85e31b 2025-06-29 38: write!(f, "\\#ļøā£ {} \\*ļøā£ `{}` {}\nš `{}`", self.source_id, self.channel,
b4af85e31b 2025-06-29 39: match self.enabled {
b4af85e31b 2025-06-29 40: true => "š enabled",
b4af85e31b 2025-06-29 41: false => "ā disabled",
b4af85e31b 2025-06-29 42: }, self.url)?;
b4af85e31b 2025-06-29 43: if let Some(iv_hash) = &self.iv_hash {
b4af85e31b 2025-06-29 44: write!(f, "\nIV: `{iv_hash}`")?;
b4af85e31b 2025-06-29 45: }
b4af85e31b 2025-06-29 46: if let Some(url_re) = &self.url_re {
b4af85e31b 2025-06-29 47: write!(f, "\nRE: `{url_re}`")?;
b4af85e31b 2025-06-29 48: }
b4af85e31b 2025-06-29 49: Ok(())
b4af85e31b 2025-06-29 50: }
fae13a0e55 2025-06-28 51: }
fae13a0e55 2025-06-28 52:
0340541002 2025-04-24 53: #[derive(sqlx::FromRow, Debug)]
0340541002 2025-04-24 54: pub struct Source {
0340541002 2025-04-24 55: pub channel_id: i64,
0340541002 2025-04-24 56: pub url: String,
0340541002 2025-04-24 57: pub iv_hash: Option<String>,
0340541002 2025-04-24 58: pub owner: i64,
0340541002 2025-04-24 59: pub url_re: Option<String>,
0340541002 2025-04-24 60: }
0340541002 2025-04-24 61:
0340541002 2025-04-24 62: #[derive(sqlx::FromRow)]
0340541002 2025-04-24 63: pub struct Queue {
0340541002 2025-04-24 64: pub source_id: Option<i32>,
0340541002 2025-04-24 65: pub next_fetch: Option<DateTime<Local>>,
0340541002 2025-04-24 66: pub owner: Option<i64>,
0340541002 2025-04-24 67: }
0340541002 2025-04-24 68:
0340541002 2025-04-24 69: #[derive(Clone)]
0340541002 2025-04-24 70: pub struct Db {
fae13a0e55 2025-06-28 71: pool: Arc<Mutex<sqlx::Pool<sqlx::Postgres>>>,
0340541002 2025-04-24 72: }
0340541002 2025-04-24 73:
0340541002 2025-04-24 74: pub struct Conn{
0340541002 2025-04-24 75: conn: PoolConnection<Postgres>,
0340541002 2025-04-24 76: }
0340541002 2025-04-24 77:
0340541002 2025-04-24 78: impl Db {
0340541002 2025-04-24 79: pub fn new (pguri: &str) -> Result<Db> {
0340541002 2025-04-24 80: Ok(Db{
fae13a0e55 2025-06-28 81: pool: Arc::new(Mutex::new(PgPoolOptions::new()
0340541002 2025-04-24 82: .max_connections(5)
0340541002 2025-04-24 83: .acquire_timeout(std::time::Duration::new(300, 0))
0340541002 2025-04-24 84: .idle_timeout(std::time::Duration::new(60, 0))
fae13a0e55 2025-06-28 85: .connect_lazy(pguri)?)),
0340541002 2025-04-24 86: })
0340541002 2025-04-24 87: }
0340541002 2025-04-24 88:
bb89b6fab8 2025-06-15 89: pub async fn begin(&self) -> Result<Conn> {
fae13a0e55 2025-06-28 90: let pool = self.pool.lock_arc().await;
bb89b6fab8 2025-06-15 91: let conn = Conn::new(pool.acquire().await?).await?;
bb89b6fab8 2025-06-15 92: Ok(conn)
0340541002 2025-04-24 93: }
0340541002 2025-04-24 94: }
0340541002 2025-04-24 95:
0340541002 2025-04-24 96: impl Conn {
bb89b6fab8 2025-06-15 97: pub async fn new (conn: PoolConnection<Postgres>) -> Result<Conn> {
0340541002 2025-04-24 98: Ok(Conn{
0340541002 2025-04-24 99: conn,
0340541002 2025-04-24 100: })
0340541002 2025-04-24 101: }
0340541002 2025-04-24 102:
bb89b6fab8 2025-06-15 103: pub async fn add_post (&mut self, source_id: i32, date: &DateTime<FixedOffset>, post_url: &str) -> Result<()> {
0340541002 2025-04-24 104: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
bb89b6fab8 2025-06-15 105: .bind(source_id)
0340541002 2025-04-24 106: .bind(date)
0340541002 2025-04-24 107: .bind(post_url)
0340541002 2025-04-24 108: .execute(&mut *self.conn).await?;
0340541002 2025-04-24 109: Ok(())
0340541002 2025-04-24 110: }
0340541002 2025-04-24 111:
bb89b6fab8 2025-06-15 112: pub async fn clean <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
bb89b6fab8 2025-06-15 113: where I: Into<i64> {
0340541002 2025-04-24 114: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
0340541002 2025-04-24 115: .bind(source_id)
bb89b6fab8 2025-06-15 116: .bind(owner.into())
0340541002 2025-04-24 117: .execute(&mut *self.conn).await?.rows_affected() {
0340541002 2025-04-24 118: 0 => { Ok("No data found found.".into()) },
0340541002 2025-04-24 119: x => { Ok(format!("{x} posts purged.").into()) },
0340541002 2025-04-24 120: }
0340541002 2025-04-24 121: }
0340541002 2025-04-24 122:
bb89b6fab8 2025-06-15 123: pub async fn delete <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
bb89b6fab8 2025-06-15 124: where I: Into<i64> {
0340541002 2025-04-24 125: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
0340541002 2025-04-24 126: .bind(source_id)
bb89b6fab8 2025-06-15 127: .bind(owner.into())
0340541002 2025-04-24 128: .execute(&mut *self.conn).await?.rows_affected() {
0340541002 2025-04-24 129: 0 => { Ok("No data found found.".into()) },
0340541002 2025-04-24 130: x => { Ok(format!("{} sources removed.", x).into()) },
0340541002 2025-04-24 131: }
0340541002 2025-04-24 132: }
0340541002 2025-04-24 133:
bb89b6fab8 2025-06-15 134: pub async fn disable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
bb89b6fab8 2025-06-15 135: where I: Into<i64> {
0340541002 2025-04-24 136: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
0340541002 2025-04-24 137: .bind(source_id)
bb89b6fab8 2025-06-15 138: .bind(owner.into())
0340541002 2025-04-24 139: .execute(&mut *self.conn).await?.rows_affected() {
0340541002 2025-04-24 140: 1 => { Ok("Source disabled.") },
0340541002 2025-04-24 141: 0 => { Ok("Source not found.") },
0340541002 2025-04-24 142: _ => { bail!("Database error.") },
0340541002 2025-04-24 143: }
0340541002 2025-04-24 144: }
0340541002 2025-04-24 145:
bb89b6fab8 2025-06-15 146: pub async fn enable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
bb89b6fab8 2025-06-15 147: where I: Into<i64> {
0340541002 2025-04-24 148: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
0340541002 2025-04-24 149: .bind(source_id)
bb89b6fab8 2025-06-15 150: .bind(owner.into())
0340541002 2025-04-24 151: .execute(&mut *self.conn).await?.rows_affected() {
0340541002 2025-04-24 152: 1 => { Ok("Source enabled.") },
0340541002 2025-04-24 153: 0 => { Ok("Source not found.") },
0340541002 2025-04-24 154: _ => { bail!("Database error.") },
0340541002 2025-04-24 155: }
0340541002 2025-04-24 156: }
0340541002 2025-04-24 157:
bb89b6fab8 2025-06-15 158: pub async fn exists <I> (&mut self, post_url: &str, id: I) -> Result<Option<bool>>
bb89b6fab8 2025-06-15 159: where I: Into<i64> {
0340541002 2025-04-24 160: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
0340541002 2025-04-24 161: .bind(post_url)
bb89b6fab8 2025-06-15 162: .bind(id.into())
0340541002 2025-04-24 163: .fetch_one(&mut *self.conn).await?;
0340541002 2025-04-24 164: let exists: Option<bool> = row.try_get("exists")?;
0340541002 2025-04-24 165: Ok(exists)
0340541002 2025-04-24 166: }
0340541002 2025-04-24 167:
0340541002 2025-04-24 168: pub async fn get_queue (&mut self) -> Result<Vec<Queue>> {
0340541002 2025-04-24 169: let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
0340541002 2025-04-24 170: .fetch_all(&mut *self.conn).await?;
0340541002 2025-04-24 171: Ok(block)
0340541002 2025-04-24 172: }
0340541002 2025-04-24 173:
bb89b6fab8 2025-06-15 174: pub async fn get_list <I> (&mut self, owner: I) -> Result<Vec<List>>
bb89b6fab8 2025-06-15 175: where I: Into<i64> {
0340541002 2025-04-24 176: let source: Vec<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
bb89b6fab8 2025-06-15 177: .bind(owner.into())
0340541002 2025-04-24 178: .fetch_all(&mut *self.conn).await?;
b4af85e31b 2025-06-29 179: Ok(source)
b4af85e31b 2025-06-29 180: }
b4af85e31b 2025-06-29 181:
b4af85e31b 2025-06-29 182: pub async fn get_one <I> (&mut self, owner: I, id: i32) -> Result<Option<List>>
b4af85e31b 2025-06-29 183: where I: Into<i64> {
b4af85e31b 2025-06-29 184: let source: Option<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 and source_id = $2")
b4af85e31b 2025-06-29 185: .bind(owner.into())
b4af85e31b 2025-06-29 186: .bind(id)
b4af85e31b 2025-06-29 187: .fetch_optional(&mut *self.conn).await?;
bb89b6fab8 2025-06-15 188: Ok(source)
bb89b6fab8 2025-06-15 189: }
bb89b6fab8 2025-06-15 190:
bb89b6fab8 2025-06-15 191: pub async fn get_source <I> (&mut self, id: i32, owner: I) -> Result<Source>
bb89b6fab8 2025-06-15 192: where I: Into<i64> {
0340541002 2025-04-24 193: let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
0340541002 2025-04-24 194: .bind(id)
bb89b6fab8 2025-06-15 195: .bind(owner.into())
0340541002 2025-04-24 196: .fetch_one(&mut *self.conn).await?;
0340541002 2025-04-24 197: Ok(source)
0340541002 2025-04-24 198: }
0340541002 2025-04-24 199:
bb89b6fab8 2025-06-15 200: pub async fn set_scrape <I> (&mut self, id: I) -> Result<()>
bb89b6fab8 2025-06-15 201: where I: Into<i64> {
0340541002 2025-04-24 202: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
bb89b6fab8 2025-06-15 203: .bind(id.into())
0340541002 2025-04-24 204: .execute(&mut *self.conn).await?;
0340541002 2025-04-24 205: Ok(())
0340541002 2025-04-24 206: }
0340541002 2025-04-24 207:
bb89b6fab8 2025-06-15 208: pub async fn update <I> (&mut self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: I) -> Result<&str>
bb89b6fab8 2025-06-15 209: where I: Into<i64> {
0340541002 2025-04-24 210: match match update {
0340541002 2025-04-24 211: Some(id) => {
0340541002 2025-04-24 212: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1")
0340541002 2025-04-24 213: .bind(id)
0340541002 2025-04-24 214: },
0340541002 2025-04-24 215: None => {
0340541002 2025-04-24 216: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
0340541002 2025-04-24 217: },
0340541002 2025-04-24 218: }
0340541002 2025-04-24 219: .bind(channel_id)
0340541002 2025-04-24 220: .bind(url)
0340541002 2025-04-24 221: .bind(iv_hash)
bb89b6fab8 2025-06-15 222: .bind(owner.into())
0340541002 2025-04-24 223: .bind(channel)
0340541002 2025-04-24 224: .bind(url_re)
0340541002 2025-04-24 225: .execute(&mut *self.conn).await
0340541002 2025-04-24 226: {
0340541002 2025-04-24 227: Ok(_) => Ok(match update {
0340541002 2025-04-24 228: Some(_) => "Channel updated.",
0340541002 2025-04-24 229: None => "Channel added.",
0340541002 2025-04-24 230: }),
0340541002 2025-04-24 231: Err(sqlx::Error::Database(err)) => {
0340541002 2025-04-24 232: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
0340541002 2025-04-24 233: Some("_bt_check_unique", ) => {
0340541002 2025-04-24 234: Ok("Duplicate key.")
0340541002 2025-04-24 235: },
0340541002 2025-04-24 236: Some(_) => {
0340541002 2025-04-24 237: Ok("Database error.")
0340541002 2025-04-24 238: },
0340541002 2025-04-24 239: None => {
0340541002 2025-04-24 240: Ok("No database error extracted.")
0340541002 2025-04-24 241: },
0340541002 2025-04-24 242: }
0340541002 2025-04-24 243: },
0340541002 2025-04-24 244: Err(err) => {
0340541002 2025-04-24 245: bail!("Sorry, unknown error:\n{err:#?}\n");
0340541002 2025-04-24 246: },
0340541002 2025-04-24 247: }
0340541002 2025-04-24 248: }
0340541002 2025-04-24 249: }