0340541002 2025-04-24 arcade: use std::borrow::Cow;
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: use anyhow::{
0340541002 2025-04-24 arcade: Result,
0340541002 2025-04-24 arcade: bail,
0340541002 2025-04-24 arcade: };
0340541002 2025-04-24 arcade: use chrono::{
0340541002 2025-04-24 arcade: DateTime,
0340541002 2025-04-24 arcade: FixedOffset,
0340541002 2025-04-24 arcade: Local,
0340541002 2025-04-24 arcade: };
0340541002 2025-04-24 arcade: use sqlx::{
0340541002 2025-04-24 arcade: Pool,
0340541002 2025-04-24 arcade: Postgres,
0340541002 2025-04-24 arcade: Row,
0340541002 2025-04-24 arcade: postgres::PgPoolOptions,
0340541002 2025-04-24 arcade: pool::PoolConnection,
0340541002 2025-04-24 arcade: };
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: #[derive(sqlx::FromRow, Debug)]
0340541002 2025-04-24 arcade: pub struct List {
0340541002 2025-04-24 arcade: pub source_id: i64,
0340541002 2025-04-24 arcade: pub channel: String,
0340541002 2025-04-24 arcade: pub enabled: bool,
0340541002 2025-04-24 arcade: pub url: String,
0340541002 2025-04-24 arcade: pub iv_hash: Option<String>,
0340541002 2025-04-24 arcade: pub url_re: Option<String>,
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: #[derive(sqlx::FromRow, Debug)]
0340541002 2025-04-24 arcade: pub struct Source {
0340541002 2025-04-24 arcade: pub channel_id: i64,
0340541002 2025-04-24 arcade: pub url: String,
0340541002 2025-04-24 arcade: pub iv_hash: Option<String>,
0340541002 2025-04-24 arcade: pub owner: i64,
0340541002 2025-04-24 arcade: pub url_re: Option<String>,
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: #[derive(sqlx::FromRow)]
0340541002 2025-04-24 arcade: pub struct Queue {
0340541002 2025-04-24 arcade: pub source_id: Option<i32>,
0340541002 2025-04-24 arcade: pub next_fetch: Option<DateTime<Local>>,
0340541002 2025-04-24 arcade: pub owner: Option<i64>,
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: #[derive(Clone)]
0340541002 2025-04-24 arcade: pub struct Db {
0340541002 2025-04-24 arcade: pool: sqlx::Pool<sqlx::Postgres>,
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: pub struct Conn{
0340541002 2025-04-24 arcade: conn: PoolConnection<Postgres>,
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: impl Db {
0340541002 2025-04-24 arcade: pub fn new (pguri: &str) -> Result<Db> {
0340541002 2025-04-24 arcade: Ok(Db{
0340541002 2025-04-24 arcade: pool: PgPoolOptions::new()
0340541002 2025-04-24 arcade: .max_connections(5)
0340541002 2025-04-24 arcade: .acquire_timeout(std::time::Duration::new(300, 0))
0340541002 2025-04-24 arcade: .idle_timeout(std::time::Duration::new(60, 0))
0340541002 2025-04-24 arcade: .connect_lazy(pguri)?,
0340541002 2025-04-24 arcade: })
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: pub async fn begin(&mut self) -> Result<Conn> {
0340541002 2025-04-24 arcade: Conn::new(&mut self.pool).await
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: impl Conn {
0340541002 2025-04-24 arcade: pub async fn new (pool: &mut Pool<Postgres>) -> Result<Conn> {
0340541002 2025-04-24 arcade: let conn = pool.acquire().await?;
0340541002 2025-04-24 arcade: Ok(Conn{
0340541002 2025-04-24 arcade: conn,
0340541002 2025-04-24 arcade: })
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: pub async fn add_post (&mut self, id: i32, date: &DateTime<FixedOffset>, post_url: &str) -> Result<()> {
0340541002 2025-04-24 arcade: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
0340541002 2025-04-24 arcade: .bind(id)
0340541002 2025-04-24 arcade: .bind(date)
0340541002 2025-04-24 arcade: .bind(post_url)
0340541002 2025-04-24 arcade: .execute(&mut *self.conn).await?;
0340541002 2025-04-24 arcade: Ok(())
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: pub async fn clean (&mut self, source_id: i32, owner: i64) -> Result<Cow<'_, str>> {
0340541002 2025-04-24 arcade: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
0340541002 2025-04-24 arcade: .bind(source_id)
0340541002 2025-04-24 arcade: .bind(owner)
0340541002 2025-04-24 arcade: .execute(&mut *self.conn).await?.rows_affected() {
0340541002 2025-04-24 arcade: 0 => { Ok("No data found found.".into()) },
0340541002 2025-04-24 arcade: x => { Ok(format!("{x} posts purged.").into()) },
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: pub async fn delete (&mut self, source_id: i32, owner: i64) -> Result<Cow<'_, str>> {
0340541002 2025-04-24 arcade: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
0340541002 2025-04-24 arcade: .bind(source_id)
0340541002 2025-04-24 arcade: .bind(owner)
0340541002 2025-04-24 arcade: .execute(&mut *self.conn).await?.rows_affected() {
0340541002 2025-04-24 arcade: 0 => { Ok("No data found found.".into()) },
0340541002 2025-04-24 arcade: x => { Ok(format!("{} sources removed.", x).into()) },
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: pub async fn disable (&mut self, source_id: i32, owner: i64) -> Result<&str> {
0340541002 2025-04-24 arcade: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
0340541002 2025-04-24 arcade: .bind(source_id)
0340541002 2025-04-24 arcade: .bind(owner)
0340541002 2025-04-24 arcade: .execute(&mut *self.conn).await?.rows_affected() {
0340541002 2025-04-24 arcade: 1 => { Ok("Source disabled.") },
0340541002 2025-04-24 arcade: 0 => { Ok("Source not found.") },
0340541002 2025-04-24 arcade: _ => { bail!("Database error.") },
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: pub async fn enable (&mut self, source_id: i32, owner: i64) -> Result<&str> {
0340541002 2025-04-24 arcade: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
0340541002 2025-04-24 arcade: .bind(source_id)
0340541002 2025-04-24 arcade: .bind(owner)
0340541002 2025-04-24 arcade: .execute(&mut *self.conn).await?.rows_affected() {
0340541002 2025-04-24 arcade: 1 => { Ok("Source enabled.") },
0340541002 2025-04-24 arcade: 0 => { Ok("Source not found.") },
0340541002 2025-04-24 arcade: _ => { bail!("Database error.") },
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: pub async fn exists (&mut self, post_url: &str, id: i32) -> Result<Option<bool>> {
0340541002 2025-04-24 arcade: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
0340541002 2025-04-24 arcade: .bind(post_url)
0340541002 2025-04-24 arcade: .bind(id)
0340541002 2025-04-24 arcade: .fetch_one(&mut *self.conn).await?;
0340541002 2025-04-24 arcade: let exists: Option<bool> = row.try_get("exists")?;
0340541002 2025-04-24 arcade: Ok(exists)
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: pub async fn get_queue (&mut self) -> Result<Vec<Queue>> {
0340541002 2025-04-24 arcade: let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
0340541002 2025-04-24 arcade: .fetch_all(&mut *self.conn).await?;
0340541002 2025-04-24 arcade: Ok(block)
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: pub async fn get_list (&mut self, owner: i64) -> Result<Vec<List>> {
0340541002 2025-04-24 arcade: let source: Vec<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
0340541002 2025-04-24 arcade: .bind(owner)
0340541002 2025-04-24 arcade: .fetch_all(&mut *self.conn).await?;
0340541002 2025-04-24 arcade: Ok(source)
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: pub async fn get_source (&mut self, id: i32, owner: i64) -> Result<Source> {
0340541002 2025-04-24 arcade: let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
0340541002 2025-04-24 arcade: .bind(id)
0340541002 2025-04-24 arcade: .bind(owner)
0340541002 2025-04-24 arcade: .fetch_one(&mut *self.conn).await?;
0340541002 2025-04-24 arcade: Ok(source)
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: pub async fn set_scrape (&mut self, id: i32) -> Result<()> {
0340541002 2025-04-24 arcade: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
0340541002 2025-04-24 arcade: .bind(id)
0340541002 2025-04-24 arcade: .execute(&mut *self.conn).await?;
0340541002 2025-04-24 arcade: Ok(())
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade:
0340541002 2025-04-24 arcade: pub async fn update (&mut self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: i64) -> Result<&str> {
0340541002 2025-04-24 arcade: match match update {
0340541002 2025-04-24 arcade: Some(id) => {
0340541002 2025-04-24 arcade: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1")
0340541002 2025-04-24 arcade: .bind(id)
0340541002 2025-04-24 arcade: },
0340541002 2025-04-24 arcade: None => {
0340541002 2025-04-24 arcade: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
0340541002 2025-04-24 arcade: },
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade: .bind(channel_id)
0340541002 2025-04-24 arcade: .bind(url)
0340541002 2025-04-24 arcade: .bind(iv_hash)
0340541002 2025-04-24 arcade: .bind(owner)
0340541002 2025-04-24 arcade: .bind(channel)
0340541002 2025-04-24 arcade: .bind(url_re)
0340541002 2025-04-24 arcade: .execute(&mut *self.conn).await
0340541002 2025-04-24 arcade: {
0340541002 2025-04-24 arcade: Ok(_) => Ok(match update {
0340541002 2025-04-24 arcade: Some(_) => "Channel updated.",
0340541002 2025-04-24 arcade: None => "Channel added.",
0340541002 2025-04-24 arcade: }),
0340541002 2025-04-24 arcade: Err(sqlx::Error::Database(err)) => {
0340541002 2025-04-24 arcade: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
0340541002 2025-04-24 arcade: Some("_bt_check_unique", ) => {
0340541002 2025-04-24 arcade: Ok("Duplicate key.")
0340541002 2025-04-24 arcade: },
0340541002 2025-04-24 arcade: Some(_) => {
0340541002 2025-04-24 arcade: Ok("Database error.")
0340541002 2025-04-24 arcade: },
0340541002 2025-04-24 arcade: None => {
0340541002 2025-04-24 arcade: Ok("No database error extracted.")
0340541002 2025-04-24 arcade: },
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade: },
0340541002 2025-04-24 arcade: Err(err) => {
0340541002 2025-04-24 arcade: bail!("Sorry, unknown error:\n{err:#?}\n");
0340541002 2025-04-24 arcade: },
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade: }
0340541002 2025-04-24 arcade: }