Index: Cargo.lock ================================================================== --- Cargo.lock +++ Cargo.lock @@ -996,13 +996,13 @@ "wasm-bindgen", ] [[package]] name = "h2" -version = "0.4.10" +version = "0.4.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9421a676d1b147b16b82c9225157dc629087ef8ec4d5e2960f9437a90dac0a5" +checksum = "17da50a276f1e01e0ba6c029e47b7100754904ee8a278f886546e98575380785" dependencies = [ "atomic-waker", "bytes", "fnv", "futures-core", @@ -2003,13 +2003,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" [[package]] name = "reqwest" -version = "0.12.20" +version = "0.12.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eabf4c97d9130e2bf606614eb937e86edac8292eaa6f422f995d7e8de1eb1813" +checksum = "4c8cea6b35bcceb099f30173754403d2eba0a5dc18cea3630fccd88251909288" dependencies = [ "async-compression", "base64", "bytes", "encoding_rs", @@ -2098,11 +2098,11 @@ "quick-xml", ] [[package]] name = "rsstg" -version = "0.4.3" +version = "0.4.4" dependencies = [ "anyhow", "async-std", "atom_syndication", "chrono", @@ -2304,24 +2304,24 @@ "serde", ] [[package]] name = "serde_with" -version = "3.13.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bf65a400f8f66fb7b0552869ad70157166676db75ed8181f8104ea91cf9d0b42" +checksum = "f2c45cd61fefa9db6f254525d46e392b852e0e61d9a1fd36e5bd183450a556d5" dependencies = [ "serde", "serde_derive", "serde_with_macros", ] [[package]] name = "serde_with_macros" -version = "3.13.0" +version = "3.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81679d9ed988d5e9a5e6531dc3f2c28efbd639cbd1dfb628df08edea6004da77" +checksum = "de90945e6565ce0d9a25098082ed4ee4002e047cb59892c318d66821e14bb30f" dependencies = [ "darling", "proc-macro2", "quote", "syn 2.0.104", Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "rsstg" -version = "0.4.3" +version = "0.4.4" authors = ["arcade"] edition = "2021" [dependencies] anyhow = "1.0.86" Index: src/core.rs ================================================================== --- src/core.rs +++ src/core.rs @@ -7,22 +7,24 @@ borrow::Cow, collections::{ BTreeMap, HashSet, }, - sync::{ - Arc, - Mutex - }, }; use anyhow::{ anyhow, bail, Result, }; -use async_std::task; +use async_std::{ + task, + sync::{ + Arc, + Mutex + }, +}; use chrono::DateTime; use lazy_static::lazy_static; use regex::Regex; use tgbot::{ api::Client, @@ -116,11 +118,11 @@ pub async fn check (&self, id: i32, real: bool) -> Result { let mut posted: i32 = 0; let mut conn = self.db.begin().await?; let id = { - let mut set = self.sources.lock().unwrap(); + let mut set = self.sources.lock_arc().await; match set.get(&id) { Some(id) => id.clone(), None => { let id = Arc::new(id); set.insert(id.clone()); @@ -221,11 +223,11 @@ Err(err) => format!("Failed to fetch source data:\n{err}"), } }; task::spawn(async move { if let Err(err) = clone.check(source_id, true).await { - if let Err(err) = clone.send(&format!("{source}\nšŸ›‘ {}", encode(&err.to_string())), None, Some(ParseMode::MarkdownV2)).await { + if let Err(err) = clone.send(&format!("{source}\n\nšŸ›‘ {}", encode(&err.to_string())), None, Some(ParseMode::MarkdownV2)).await { eprintln!("Check error: {err:?}"); // clone.disable(&source_id, owner).await.unwrap(); }; }; }); Index: src/sql.rs ================================================================== --- src/sql.rs +++ src/sql.rs @@ -33,11 +33,11 @@ pub url_re: Option, } impl fmt::Display for List { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { - write!(f, "\\#ļøāƒ£ {} \\*ļøāƒ£ `{}` {}\nšŸ”— `{}`", self.source_id, self.channel, + write!(f, "#{} \\*ļøāƒ£ `{}` {}\nšŸ”— `{}`", self.source_id, self.channel, match self.enabled { true => "šŸ”„ enabled", false => "ā›” disabled", }, self.url)?; if let Some(iv_hash) = &self.iv_hash { @@ -65,58 +65,52 @@ pub next_fetch: Option>, pub owner: Option, } #[derive(Clone)] -pub struct Db { - pool: Arc>>, -} - -pub struct Conn{ - conn: PoolConnection, -} +pub struct Db ( + Arc>>, +); impl Db { pub fn new (pguri: &str) -> Result { - Ok(Db{ - pool: Arc::new(Mutex::new(PgPoolOptions::new() + Ok(Db ( + Arc::new(Mutex::new(PgPoolOptions::new() .max_connections(5) .acquire_timeout(std::time::Duration::new(300, 0)) .idle_timeout(std::time::Duration::new(60, 0)) .connect_lazy(pguri)?)), - }) + )) } pub async fn begin(&self) -> Result { - let pool = self.pool.lock_arc().await; - let conn = Conn::new(pool.acquire().await?).await?; + let pool = self.0.lock_arc().await; + let conn = Conn ( pool.acquire().await? ); Ok(conn) } } -impl Conn { - pub async fn new (conn: PoolConnection) -> Result { - Ok(Conn{ - conn, - }) - } +pub struct Conn ( + PoolConnection, +); +impl Conn { pub async fn add_post (&mut self, source_id: i32, date: &DateTime, post_url: &str) -> Result<()> { sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);") .bind(source_id) .bind(date) .bind(post_url) - .execute(&mut *self.conn).await?; + .execute(&mut *self.0).await?; Ok(()) } pub async fn clean (&mut self, source_id: i32, owner: I) -> Result> where I: Into { match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;") .bind(source_id) .bind(owner.into()) - .execute(&mut *self.conn).await?.rows_affected() { + .execute(&mut *self.0).await?.rows_affected() { 0 => { Ok("No data found found.".into()) }, x => { Ok(format!("{x} posts purged.").into()) }, } } @@ -123,11 +117,11 @@ pub async fn delete (&mut self, source_id: i32, owner: I) -> Result> where I: Into { match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;") .bind(source_id) .bind(owner.into()) - .execute(&mut *self.conn).await?.rows_affected() { + .execute(&mut *self.0).await?.rows_affected() { 0 => { Ok("No data found found.".into()) }, x => { Ok(format!("{} sources removed.", x).into()) }, } } @@ -134,11 +128,11 @@ pub async fn disable (&mut self, source_id: i32, owner: I) -> Result<&str> where I: Into { match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2") .bind(source_id) .bind(owner.into()) - .execute(&mut *self.conn).await?.rows_affected() { + .execute(&mut *self.0).await?.rows_affected() { 1 => { Ok("Source disabled.") }, 0 => { Ok("Source not found.") }, _ => { bail!("Database error.") }, } } @@ -146,11 +140,11 @@ pub async fn enable (&mut self, source_id: i32, owner: I) -> Result<&str> where I: Into { match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2") .bind(source_id) .bind(owner.into()) - .execute(&mut *self.conn).await?.rows_affected() { + .execute(&mut *self.0).await?.rows_affected() { 1 => { Ok("Source enabled.") }, 0 => { Ok("Source not found.") }, _ => { bail!("Database error.") }, } } @@ -158,52 +152,52 @@ pub async fn exists (&mut self, post_url: &str, id: I) -> Result> where I: Into { let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") .bind(post_url) .bind(id.into()) - .fetch_one(&mut *self.conn).await?; + .fetch_one(&mut *self.0).await?; let exists: Option = row.try_get("exists")?; Ok(exists) } pub async fn get_queue (&mut self) -> Result> { let block: Vec = sqlx::query_as("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';") - .fetch_all(&mut *self.conn).await?; + .fetch_all(&mut *self.0).await?; Ok(block) } pub async fn get_list (&mut self, owner: I) -> Result> where I: Into { let source: Vec = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id") .bind(owner.into()) - .fetch_all(&mut *self.conn).await?; + .fetch_all(&mut *self.0).await?; Ok(source) } pub async fn get_one (&mut self, owner: I, id: i32) -> Result> where I: Into { let source: Option = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 and source_id = $2") .bind(owner.into()) .bind(id) - .fetch_optional(&mut *self.conn).await?; + .fetch_optional(&mut *self.0).await?; Ok(source) } pub async fn get_source (&mut self, id: i32, owner: I) -> Result where I: Into { let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2") .bind(id) .bind(owner.into()) - .fetch_one(&mut *self.conn).await?; + .fetch_one(&mut *self.0).await?; Ok(source) } pub async fn set_scrape (&mut self, id: I) -> Result<()> where I: Into { sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;") .bind(id.into()) - .execute(&mut *self.conn).await?; + .execute(&mut *self.0).await?; Ok(()) } pub async fn update (&mut self, update: Option, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: I) -> Result<&str> where I: Into { @@ -220,11 +214,11 @@ .bind(url) .bind(iv_hash) .bind(owner.into()) .bind(channel) .bind(url_re) - .execute(&mut *self.conn).await + .execute(&mut *self.0).await { Ok(_) => Ok(match update { Some(_) => "Channel updated.", None => "Channel added.", }),