Index: Cargo.lock ================================================================== --- Cargo.lock +++ Cargo.lock @@ -231,11 +231,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc6dde6e4ed435a4c1ee4e73592f5ba9da2151af10076cc04858746af9352d09" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] name = "atoi" version = "1.0.0" @@ -402,13 +402,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" [[package]] name = "cc" -version = "1.0.79" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" +checksum = "6c6b2562119bf28c3439f7f02db99faf0aa1a8cdfe5772a2ee155d32227239f0" +dependencies = [ + "libc", +] [[package]] name = "cfg-if" version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1037,11 +1040,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] name = "futures-rustls" version = "0.22.2" @@ -1544,13 +1547,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ef53942eb7bf7ff43a617b3e2c1c4a5ecf5944a7c1bc12d7ee39bbb15e5c1519" [[package]] name = "linux-raw-sys" -version = "0.4.3" +version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "09fc20d2ca12cb9f044c93e3bd6d32d523e6e2ec3db4f7b2939cd99026ecd3f0" +checksum = "57bcfdad1b858c2db7c38303a6d2ad4dfaf5eb53dfeb0910128b2c26d6158503" [[package]] name = "lock_api" version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1800,11 +1803,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] name = "openssl-probe" version = "0.1.5" @@ -1928,11 +1931,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec2e072ecce94ec471b13398d5402c188e76ac03cf74dd1a975161b23a3f6d9c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] name = "pin-project-lite" version = "0.2.10" @@ -2420,18 +2423,18 @@ "windows-sys", ] [[package]] name = "rustix" -version = "0.38.4" +version = "0.38.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a962918ea88d644592894bc6dc55acc6c0956488adcebbfb6e273506b7fd6e5" +checksum = "1ee020b1716f0a80e2ace9b03441a749e402e86712f15f16fe8a8f75afac732f" dependencies = [ "bitflags 2.3.3", "errno", "libc", - "linux-raw-sys 0.4.3", + "linux-raw-sys 0.4.5", "windows-sys", ] [[package]] name = "rustls" @@ -2532,13 +2535,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.178" +version = "1.0.181" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60363bdd39a7be0266a520dab25fdc9241d2f987b08a01e01f0ec6d06a981348" +checksum = "6d3e73c93c3240c0bda063c239298e633114c69a888c3e37ca8bb33f343e9890" dependencies = [ "serde_derive", ] [[package]] @@ -2551,17 +2554,17 @@ "serde", ] [[package]] name = "serde_derive" -version = "1.0.178" +version = "1.0.181" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f28482318d6641454cb273da158647922d1be6b5a2fcc6165cd89ebdd7ed576b" +checksum = "be02f6cb0cd3a5ec20bbcfbcbd749f57daddb1a0882dc2e46a6c236c90b977ed" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] name = "serde_json" version = "1.0.104" @@ -2829,13 +2832,13 @@ "unicode-ident", ] [[package]] name = "syn" -version = "2.0.27" +version = "2.0.28" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b60f673f44a8255b9c8c657daf66a596d435f2da81a555b06dc644d080ba45e0" +checksum = "04361975b3f5e348b2189d8dc55bc942f278b2d482a6a0365de5bdd62d351567" dependencies = [ "proc-macro2", "quote", "unicode-ident", ] @@ -2887,11 +2890,11 @@ checksum = "5486094ee78b2e5038a6382ed7645bc084dc2ec433426ca4c3cb61e2007b8998" dependencies = [ "cfg-if 1.0.0", "fastrand 2.0.0", "redox_syscall 0.3.5", - "rustix 0.38.4", + "rustix 0.38.6", "windows-sys", ] [[package]] name = "thiserror" @@ -2908,11 +2911,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "090198534930841fab3a5d1bb637cde49e339654e606195f8d9c76eeb081dc96" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] name = "time" version = "0.1.45" @@ -3158,11 +3161,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", ] [[package]] name = "tracing-core" version = "0.1.31" @@ -3369,11 +3372,11 @@ "bumpalo", "log", "once_cell", "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", "wasm-bindgen-shared", ] [[package]] name = "wasm-bindgen-futures" @@ -3403,11 +3406,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.27", + "syn 2.0.28", "wasm-bindgen-backend", "wasm-bindgen-shared", ] [[package]] Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "rsstg" -version = "0.2.18" +version = "0.2.19" authors = ["arcade"] edition = "2021" [dependencies] anyhow = "*" Index: src/core.rs ================================================================== --- src/core.rs +++ src/core.rs @@ -1,12 +1,9 @@ use anyhow::{anyhow, bail, Context, Result}; use async_std::task; use chrono::DateTime; -use sqlx::{ - postgres::PgPoolOptions, - Row, -}; +use sqlx::postgres::PgPoolOptions; use std::{ borrow::Cow, collections::{ BTreeMap, HashSet, @@ -97,34 +94,20 @@ }, } }; let count = Arc::strong_count(&id); if count == 2 { - let mut conn = self.pool.acquire().await - .with_context(|| format!("Query queue fetch conn:\n{:?}", &self.pool))?; - let row = sqlx::query("select source_id, channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2") - .bind(*id) - .bind(owner) - .fetch_one(&mut conn).await - .with_context(|| format!("Query source:\n{:?}", &self.pool))?; - drop(conn); - - let channel_id: i64 = row.try_get("channel_id")?; - let url: &str = row.try_get("url")?; - let iv_hash: Option<&str> = row.try_get("iv_hash")?; - let url_re = match row.try_get("url_re")? { - Some(x) => Some(sedregex::ReplaceCommand::new(x)?), - None => None, - }; + let source = sqlx::query!("select source_id, channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2", + *id, owner).fetch_one(&mut self.pool.acquire().await?).await?; let destination = match real { - true => telegram_bot::UserId::new(channel_id), - false => telegram_bot::UserId::new(row.try_get("owner")?), + true => telegram_bot::UserId::new(source.channel_id), + false => telegram_bot::UserId::new(source.owner), }; let mut this_fetch: Option> = None; let mut posts: BTreeMap, String> = BTreeMap::new(); - let response = self.http_client.get(url).send().await?; + let response = self.http_client.get(&source.url).send().await?; let status = response.status(); let content = response.bytes().await?; match rss::Channel::read_from(&content[..]) { Ok(feed) => { for item in feed.items() { @@ -139,111 +122,79 @@ }; }, Err(err) => match err { rss::Error::InvalidStartTag => { let feed = atom_syndication::Feed::read_from(&content[..]) - .with_context(|| format!("Problem opening feed url:\n{}\n{}", &url, status))?; + .with_context(|| format!("Problem opening feed url:\n{}\n{}", &source.url, status))?; for item in feed.entries() { let date = item.published().unwrap(); let url = item.links()[0].href(); posts.insert(*date, url.to_string()); }; }, rss::Error::Eof => (), - _ => bail!("Unsupported or mangled content:\n{:?}\n{:#?}\n{:#?}\n", &url, err, status) + _ => bail!("Unsupported or mangled content:\n{:?}\n{:#?}\n{:#?}\n", &source.url, err, status) } }; for (date, url) in posts.iter() { - let mut conn = self.pool.acquire().await - .with_context(|| format!("Check post fetch conn:\n{:?}", &self.pool))?; - let post_url: Cow = match url_re { - Some(ref x) => x.execute(url), + let post_url: Cow = match source.url_re { + Some(ref x) => sedregex::ReplaceCommand::new(x)?.execute(&source.url), None => url.into(), }; - let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") - .bind(&*post_url) - .bind(*id) - .fetch_one(&mut conn).await - .with_context(|| format!("Check post:\n{:?}", &conn))?; - let exists: bool = row.try_get("exists")?; - if ! exists { - if this_fetch.is_none() || *date > this_fetch.unwrap() { - this_fetch = Some(*date); + if let Some(exists) = sqlx::query!("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;", + &post_url, *id).fetch_one(&mut self.pool.acquire().await?).await?.exists { + if ! exists { + if this_fetch.is_none() || *date > this_fetch.unwrap() { + this_fetch = Some(*date); + }; + self.tg.send( match &source.iv_hash { + Some(hash) => telegram_bot::SendMessage::new(destination, format!(" {0}", &post_url, hash)), + None => telegram_bot::SendMessage::new(destination, format!("{}", post_url)), + }.parse_mode(telegram_bot::types::ParseMode::Html)).await + .context("Can't post message:")?; + sqlx::query!("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);", + *id, date, &post_url).execute(&mut self.pool.acquire().await?).await?; + task::sleep(std::time::Duration::new(4, 0)).await; }; - self.tg.send( match iv_hash { - Some(hash) => telegram_bot::SendMessage::new(destination, format!(" {0}", &post_url, hash)), - None => telegram_bot::SendMessage::new(destination, format!("{}", post_url)), - }.parse_mode(telegram_bot::types::ParseMode::Html)).await - .context("Can't post message:")?; - sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);") - .bind(*id) - .bind(date) - .bind(&*post_url) - .execute(&mut conn).await - .with_context(|| format!("Record post:\n{:?}", &conn))?; - drop(conn); - task::sleep(std::time::Duration::new(4, 0)).await; }; posted += 1; }; posts.clear(); }; - let mut conn = self.pool.acquire().await - .with_context(|| format!("Update scrape fetch conn:\n{:?}", &self.pool))?; - sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;") - .bind(*id) - .execute(&mut conn).await - .with_context(|| format!("Update scrape:\n{:?}", &conn))?; + sqlx::query!("update rsstg_source set last_scrape = now() where source_id = $1;", + *id).execute(&mut self.pool.acquire().await?).await?; Ok(format!("Posted: {}", &posted).into()) } pub async fn delete(&self, source_id: &i32, owner: S) -> Result> where S: Into { let owner = owner.into(); - let mut conn = self.pool.acquire().await - .with_context(|| format!("Delete fetch conn:\n{:?}", &self.pool))?; - match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;") - .bind(source_id) - .bind(owner) - .execute(&mut conn).await - .with_context(|| format!("Delete source rule:\n{:?}", &self.pool))? - .rows_affected() { + match sqlx::query!("delete from rsstg_source where source_id = $1 and owner = $2;", + source_id, owner).execute(&mut self.pool.acquire().await?).await?.rows_affected() { 0 => { Ok("No data found found.".into()) }, x => { Ok(format!("{} sources removed.", x).into()) }, } } pub async fn clean(&self, source_id: &i32, owner: S) -> Result> where S: Into { let owner = owner.into(); - let mut conn = self.pool.acquire().await - .with_context(|| format!("Clean fetch conn:\n{:?}", &self.pool))?; - match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;") - .bind(source_id) - .bind(owner) - .execute(&mut conn).await - .with_context(|| format!("Clean seen posts:\n{:?}", &self.pool))? - .rows_affected() { + match sqlx::query!("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;", + source_id, owner).execute(&mut self.pool.acquire().await?).await?.rows_affected() { 0 => { Ok("No data found found.".into()) }, x => { Ok(format!("{} posts purged.", x).into()) }, } } pub async fn enable(&self, source_id: &i32, owner: S) -> Result<&str> where S: Into { let owner = owner.into(); - let mut conn = self.pool.acquire().await - .with_context(|| format!("Enable fetch conn:\n{:?}", &self.pool))?; - match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2") - .bind(source_id) - .bind(owner) - .execute(&mut conn).await - .with_context(|| format!("Enable source:\n{:?}", &self.pool))? - .rows_affected() { + match sqlx::query!("update rsstg_source set enabled = true where source_id = $1 and owner = $2", + source_id, owner).execute(&mut self.pool.acquire().await?).await?.rows_affected() { 1 => { Ok("Source enabled.") }, 0 => { Ok("Source not found.") }, _ => { Err(anyhow!("Database error.")) }, } } @@ -250,18 +201,12 @@ pub async fn disable(&self, source_id: &i32, owner: S) -> Result<&str> where S: Into { let owner = owner.into(); - let mut conn = self.pool.acquire().await - .with_context(|| format!("Disable fetch conn:\n{:?}", &self.pool))?; - match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2") - .bind(source_id) - .bind(owner) - .execute(&mut conn).await - .with_context(|| format!("Disable source:\n{:?}", &self.pool))? - .rows_affected() { + match sqlx::query!("update rsstg_source set enabled = false where source_id = $1 and owner = $2", + source_id, owner).execute(&mut self.pool.acquire().await?).await?.rows_affected() { 1 => { Ok("Source disabled.") }, 0 => { Ok("Source not found.") }, _ => { Err(anyhow!("Database error.")) }, } } @@ -268,28 +213,20 @@ pub async fn update(&self, update: Option, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: S) -> Result<&str> where S: Into { let owner = owner.into(); - let mut conn = self.pool.acquire().await - .with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?; - match match update { Some(id) => { - sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1").bind(id) + sqlx::query!("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1", + id, channel_id, url, iv_hash, owner, channel, url_re).execute(&mut self.pool.acquire().await?).await }, None => { - sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)") + sqlx::query!("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)", + channel_id, url, iv_hash, owner, channel, url_re).execute(&mut self.pool.acquire().await?).await }, - } - .bind(channel_id) - .bind(url) - .bind(iv_hash) - .bind(owner) - .bind(channel) - .bind(url_re) - .execute(&mut conn).await { + } { Ok(_) => Ok(match update { Some(_) => "Channel updated.", None => "Channel added.", }), Err(sqlx::Error::Database(err)) => { @@ -311,33 +248,32 @@ } } async fn autofetch(&self) -> Result { let mut delay = chrono::Duration::minutes(1); - let mut conn = self.pool.acquire().await - .with_context(|| format!("Autofetch fetch conn:\n{:?}", &self.pool))?; let now = chrono::Local::now(); - let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';") - .fetch_all(&mut conn).await?; + let mut queue = sqlx::query!(r#"select source_id, next_fetch as "next_fetch: DateTime", owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';"#) + .fetch_all(&mut self.pool.acquire().await?).await?; for row in queue.iter() { - let source_id: i32 = row.try_get("source_id")?; - let owner: i64 = row.try_get("owner")?; - let next_fetch: DateTime = row.try_get("next_fetch")?; - if next_fetch < now { - let clone = Core { - owner_chat: telegram_bot::UserId::new(owner), - ..self.clone() - }; - task::spawn(async move { - if let Err(err) = clone.check(&source_id, owner, true).await { - if let Err(err) = clone.send(&format!("šŸ›‘ {:?}", err), None, None).await { - eprintln!("Check error: {}", err); - }; - }; - }); - } else if next_fetch - now < delay { - delay = next_fetch - now; + if let Some(next_fetch) = row.next_fetch { + if next_fetch < now { + if let (Some(owner), Some(source_id)) = (row.owner, row.source_id) { + let clone = Core { + owner_chat: telegram_bot::UserId::new(owner), + ..self.clone() + }; + task::spawn(async move { + if let Err(err) = clone.check(&source_id, owner, true).await { + if let Err(err) = clone.send(&format!("šŸ›‘ {:?}", err), None, None).await { + eprintln!("Check error: {}", err); + }; + }; + }); + } + } else if next_fetch - now < delay { + delay = next_fetch - now; + } } }; queue.clear(); Ok(delay.to_std()?) } @@ -345,33 +281,24 @@ pub async fn list(&self, owner: S) -> Result where S: Into { let owner = owner.into(); let mut reply: Vec> = vec![]; - let mut conn = self.pool.acquire().await - .with_context(|| format!("List fetch conn:\n{:?}", &self.pool))?; reply.push("Channels:".into()); - let rows = sqlx::query("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id") - .bind(owner) - .fetch_all(&mut conn).await?; + let rows = sqlx::query!("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id", + owner).fetch_all(&mut self.pool.acquire().await?).await?; for row in rows.iter() { - let source_id: i32 = row.try_get("source_id")?; - let username: &str = row.try_get("channel")?; - let enabled: bool = row.try_get("enabled")?; - let url: &str = row.try_get("url")?; - let iv_hash: Option<&str> = row.try_get("iv_hash")?; - let url_re: Option<&str> = row.try_get("url_re")?; - reply.push(format!("\n\\#ļøāƒ£ {} \\*ļøāƒ£ `{}` {}\nšŸ”— `{}`", source_id, username, - match enabled { + reply.push(format!("\n\\#ļøāƒ£ {} \\*ļøāƒ£ `{}` {}\nšŸ”— `{}`", row.source_id, row.channel, + match row.enabled { true => "šŸ”„ enabled", false => "ā›” disabled", - }, url).into()); - if let Some(hash) = iv_hash { + }, row.url).into()); + if let Some(hash) = &row.iv_hash { reply.push(format!("IV: `{}`", hash).into()); } - if let Some(re) = url_re { + if let Some(re) = &row.url_re { reply.push(format!("RE: `{}`", re).into()); } }; Ok(reply.join("\n")) } }