Index: Cargo.lock ================================================================== --- Cargo.lock +++ Cargo.lock @@ -353,13 +353,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" [[package]] name = "cc" -version = "1.2.51" +version = "1.2.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a0aeaff4ff1a90589618835a598e545176939b97874f7abc7851caa0618f203" +checksum = "cd4932aefd12402b36c60956a4fe0035421f544799057659ff86f923657aada3" dependencies = [ "find-msvc-tools", "jobserver", "libc", "shlex", @@ -793,13 +793,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "find-msvc-tools" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645cbb3a84e60b7531617d5ae4e57f7e27308f6445f5abf653209ea76dec8dff" +checksum = "f449e6c6c08c865631d4890cfacf252b3d396c9bcc83adb6623cdb02a8336c41" [[package]] name = "flate2" version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1322,13 +1322,13 @@ "icu_properties", ] [[package]] name = "indexmap" -version = "2.12.1" +version = "2.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" +checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" dependencies = [ "equivalent", "hashbrown 0.16.1", ] @@ -1405,13 +1405,13 @@ "spin", ] [[package]] name = "libc" -version = "0.2.179" +version = "0.2.180" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5a2d376baa530d1238d133232d15e239abad80d05838b4b59354e5268af431f" +checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" [[package]] name = "libm" version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3630,22 +3630,22 @@ "synstructure", ] [[package]] name = "zerocopy" -version = "0.8.32" +version = "0.8.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fabae64378cb18147bb18bca364e63bdbe72a0ffe4adf0addfec8aa166b2c56" +checksum = "668f5168d10b9ee831de31933dc111a459c97ec93225beb307aed970d1372dfd" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.32" +version = "0.8.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9c2d862265a8bb4471d87e033e730f536e2a285cc7cb05dbce09a2a97075f90" +checksum = "2c7962b26b0a8685668b671ee4b54d007a67d4eaf05fda79ac0ecf41e32270f1" dependencies = [ "proc-macro2", "quote", "syn", ] Index: src/command.rs ================================================================== --- src/command.rs +++ src/command.rs @@ -22,20 +22,20 @@ static ref RE_USERNAME: Regex = Regex::new(r"^@([a-zA-Z][a-zA-Z0-9_]+)$").unwrap(); static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap(); } pub async fn start (core: &Core, msg: &Message) -> Result<()> { - core.send("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.", + core.tg.send("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.", Some(msg.chat.get_id()), Some(MarkdownV2)).await.stack()?; Ok(()) } pub async fn list (core: &Core, msg: &Message) -> Result<()> { let sender = msg.sender.get_user_id() .stack_err("Ignoring unreal users.")?; let reply = core.list(sender).await.stack()?; - core.send(reply, Some(msg.chat.get_id()), Some(MarkdownV2)).await.stack()?; + core.tg.send(reply, Some(msg.chat.get_id()), Some(MarkdownV2)).await.stack()?; Ok(()) } pub async fn command (core: &Core, command: &str, msg: &Message, words: &[String]) -> Result<()> { let mut conn = core.db.begin().await.stack()?; @@ -55,11 +55,11 @@ }, } } else { "This command needs exacly one number.".into() }; - core.send(reply, Some(msg.chat.get_id()), None).await.stack()?; + core.tg.send(reply, Some(msg.chat.get_id()), None).await.stack()?; Ok(()) } pub async fn update (core: &Core, command: &str, msg: &Message, words: &[String]) -> Result<()> { let sender = msg.sender.get_user_id() @@ -130,12 +130,12 @@ } }, None => None, }; let chat_id = ChatUsername::from(channel.as_ref()); - let channel_id = core.tg.execute(GetChat::new(chat_id.clone())).await.stack_err("gettting GetChat")?.id; - let chan_adm = core.tg.execute(GetChatAdministrators::new(chat_id)).await + let channel_id = core.tg.client.execute(GetChat::new(chat_id.clone())).await.stack_err("gettting GetChat")?.id; + let chan_adm = core.tg.client.execute(GetChatAdministrators::new(chat_id)).await .context("Sorry, I have no access to that chat.")?; let (mut me, mut user) = (false, false); for admin in chan_adm { let member_id = match admin { ChatMember::Creator(member) => member.user.id, @@ -143,18 +143,18 @@ ChatMember::Left(_) | ChatMember::Kicked(_) | ChatMember::Member{..} | ChatMember::Restricted(_) => continue, }; - if member_id == core.me.id { + if member_id == core.tg.me.id { me = true; } if member_id == sender { user = true; } }; if ! me { bail!("I need to be admin on that channel."); }; if ! user { bail!("You should be admin on that channel."); }; let mut conn = core.db.begin().await.stack()?; - core.send(conn.update(source_id, channel, channel_id, url, iv_hash, url_re, sender).await.stack()?, Some(msg.chat.get_id()), None).await.stack()?; + core.tg.send(conn.update(source_id, channel, channel_id, url, iv_hash, url_re, sender).await.stack()?, Some(msg.chat.get_id()), None).await.stack()?; Ok(()) } Index: src/core.rs ================================================================== --- src/core.rs +++ src/core.rs @@ -1,8 +1,9 @@ use crate::{ command, sql::Db, + tg_bot::Tg, }; use std::{ borrow::Cow, collections::{ @@ -17,30 +18,21 @@ DateTime, Local, }; use lazy_static::lazy_static; use regex::Regex; -use reqwest::header::{ - CACHE_CONTROL, - EXPIRES, - LAST_MODIFIED -}; +use reqwest::header::LAST_MODIFIED; use smol::{ Timer, lock::Mutex, }; use tgbot::{ - api::Client, handler::UpdateHandler, types::{ - Bot, ChatPeerId, Command, - GetBot, - Message, ParseMode, - SendMessage, Update, UpdateType, UserPeerId, }, }; @@ -114,24 +106,21 @@ } } #[derive(Clone)] pub struct Core { - owner_chat: ChatPeerId, - // max_delay: u16, - pub tg: Client, - pub me: Bot, + pub tg: Tg, pub db: Db, running: Arc>>, http_client: reqwest::Client, } pub struct Post { uri: String, - title: String, - authors: String, - summary: String, + _title: String, + _authors: String, + _summary: String, } impl Core { /// Create a Core instance from configuration and start its background autofetch loop. /// @@ -144,37 +133,29 @@ /// /// On success returns an initialized `Core` with Telegram and HTTP clients, database connection, /// an empty running set for per-id tokens, and a spawned background task that periodically runs /// `autofetch`. If any required setting is missing or initialization fails, an error is returned. pub async fn new(settings: config::Config) -> Result { - let owner_chat = ChatPeerId::from(settings.get_int("owner").stack()?); - let api_key = settings.get_string("api_key").stack()?; - let tg = Client::new(&api_key).stack()? - .with_host(settings.get_string("api_gateway").stack()?); - let mut client = reqwest::Client::builder(); if let Ok(proxy) = settings.get_string("proxy") { let proxy = reqwest::Proxy::all(proxy).stack()?; client = client.proxy(proxy); } - let http_client = client.build().stack()?; - let me = tg.execute(GetBot).await.stack()?; + let core = Core { - tg, - me, - owner_chat, + tg: Tg::new(&settings).await.stack()?, db: Db::new(&settings.get_string("pg").stack()?)?, running: Arc::new(Mutex::new(HashSet::new())), - http_client, - // max_delay: 60, + http_client: client.build().stack()?, }; + let clone = core.clone(); smol::spawn(Compat::new(async move { loop { let delay = match &clone.autofetch().await { Err(err) => { - if let Err(err) = clone.send(format!("🛑 {err}"), None, None).await { + if let Err(err) = clone.tg.send(format!("🛑 {err}"), None, None).await { eprintln!("Autofetch error: {err:?}"); }; std::time::Duration::from_secs(60) }, Ok(time) => *time, @@ -183,22 +164,10 @@ } })).detach(); Ok(core) } - pub async fn send (&self, msg: S, target: Option, mode: Option) -> Result - where S: Into { - let msg = msg.into(); - - let mode = mode.unwrap_or(ParseMode::Html); - let target = target.unwrap_or(self.owner_chat); - self.tg.execute( - SendMessage::new(target, msg) - .with_parse_mode(mode) - ).await.stack() - } - /// Fetches the feed for a source, sends any newly discovered posts to the appropriate chat, and records them in the database. /// /// This acquires a per-source guard to prevent concurrent checks for the same `id`. If a check is already running for /// the given `id`, the function returns an error. If `last_scrape` is provided, it is sent as the `If-Modified-Since` /// header to the feed request. The function parses RSS or Atom feeds, sends unseen post URLs to either the source's @@ -216,11 +185,11 @@ pub async fn check (&self, id: i32, real: bool, last_scrape: Option>) -> Result { let mut posted: i32 = 0; let mut conn = self.db.begin().await.stack()?; let _token = Token::new(&self.running, id).await.stack()?; - let source = conn.get_source(id, self.owner_chat).await.stack()?; + let source = conn.get_source(id, self.tg.owner).await.stack()?; conn.set_scrape(id).await.stack()?; let destination = ChatPeerId::from(match real { true => source.channel_id, false => source.owner, }); @@ -232,10 +201,14 @@ builder = builder.header(LAST_MODIFIED, last_scrape.to_rfc2822()); }; let response = builder.send().await.stack()?; #[cfg(debug_assertions)] { + use reqwest::header::{ + CACHE_CONTROL, + EXPIRES, + }; let headers = response.headers(); let expires = headers.get(EXPIRES); let cache = headers.get(CACHE_CONTROL); if expires.is_some() || cache.is_some() { println!("{} {} {:?} {:?} {:?}", Local::now().to_rfc2822(), &source.url, last_scrape, expires, cache); @@ -259,19 +232,15 @@ } }, None => bail!("Feed item misses posting date."), }), }.stack()?; - let uri = link.to_string(); - let title = item.title().unwrap_or("").to_string(); - let authors = item.author().unwrap_or("").to_string(); - let summary = item.content().unwrap_or("").to_string(); posts.insert(date, Post{ - uri, - title, - authors, - summary, + uri: link.to_string(), + _title: item.title().unwrap_or("").to_string(), + _authors: item.author().unwrap_or("").to_string(), + _summary: item.content().unwrap_or("").to_string(), }); } }; }, Err(err) => match err { @@ -287,18 +256,17 @@ bail!("Feed item missing post links."); } else { links[0].href().to_string() } }; - let title = item.title().to_string(); - let authors = item.authors().iter().map(|x| format!("{} <{:?}>", x.name(), x.email())).collect::>().join(", "); - let summary = if let Some(sum) = item.summary() { sum.value.clone() } else { String::new() }; + let _authors = item.authors().iter().map(|x| format!("{} <{:?}>", x.name(), x.email())).collect::>().join(", "); + let _summary = if let Some(sum) = item.summary() { sum.value.clone() } else { String::new() }; posts.insert(*date, Post{ uri, - title, - authors, - summary, + _title: item.title().to_string(), + _authors, + _summary, }); }; }, Err(err) => { bail!("Unsupported or mangled content:\n{:?}\n{err}\n{status:#?}\n", &source.url) @@ -316,11 +284,11 @@ }; if ! conn.exists(&post_url, id).await.stack()? { if this_fetch.is_none() || *date > this_fetch.unwrap() { this_fetch = Some(*date); }; - self.send( match &source.iv_hash { + self.tg.send( match &source.iv_hash { Some(hash) => format!(" {post_url}"), None => format!("{post_url}"), }, Some(destination), Some(ParseMode::Html)).await.stack()?; conn.add_post(id, date, &post_url).await.stack()?; posted += 1; @@ -340,11 +308,11 @@ for row in queue { if let Some(next_fetch) = row.next_fetch { if next_fetch < now { if let (Some(owner), Some(source_id), last_scrape) = (row.owner, row.source_id, row.last_scrape) { let clone = Core { - owner_chat: ChatPeerId::from(owner), + tg: self.tg.with_owner(owner), ..self.clone() }; let source = { let mut conn = self.db.begin().await.stack()?; match conn.get_one(owner, source_id).await { @@ -352,15 +320,14 @@ Ok(None) => "Source not found in database?".to_string(), Err(err) => format!("Failed to fetch source data:\n{err}"), } }; smol::spawn(Compat::new(async move { - if let Err(err) = clone.check(source_id, true, Some(last_scrape)).await { - if let Err(err) = clone.send(&format!("🛑 {source}\n{}", encode(&err.to_string())), None, Some(ParseMode::MarkdownV2)).await { - eprintln!("Check error: {err}"); - // clone.disable(&source_id, owner).await.unwrap(); - }; + if let Err(err) = clone.check(source_id, true, Some(last_scrape)).await + && let Err(err) = clone.tg.send(&format!("🛑 {source}\n{}", encode(&err.to_string())), None, Some(ParseMode::MarkdownV2)).await + { + eprintln!("Check error: {err}"); }; })).detach(); } } else if next_fetch - now < delay { delay = next_fetch - now; @@ -381,29 +348,29 @@ } } impl UpdateHandler for Core { async fn handle (&self, update: Update) { - if let UpdateType::Message(msg) = update.update_type { - if let Ok(cmd) = Command::try_from(msg) { - let msg = cmd.get_message(); - let words = cmd.get_args(); - let command = cmd.get_name(); - let res = match command { - "/check" | "/clean" | "/enable" | "/delete" | "/disable" => command::command(self, command, msg, words).await, - "/start" => command::start(self, msg).await, - "/list" => command::list(self, msg).await, - "/add" | "/update" => command::update(self, command, msg, words).await, - any => Err(anyhow!("Unknown command: {any}")), - }; - if let Err(err) = res { - if let Err(err2) = self.send(format!("\\#error\n```\n{err}\n```"), - Some(msg.chat.get_id()), - Some(ParseMode::MarkdownV2) - ).await{ - dbg!(err2); - }; - } - }; + if let UpdateType::Message(msg) = update.update_type + && let Ok(cmd) = Command::try_from(msg) + { + let msg = cmd.get_message(); + let words = cmd.get_args(); + let command = cmd.get_name(); + let res = match command { + "/check" | "/clean" | "/enable" | "/delete" | "/disable" => command::command(self, command, msg, words).await, + "/start" => command::start(self, msg).await, + "/list" => command::list(self, msg).await, + "/add" | "/update" => command::update(self, command, msg, words).await, + any => Err(anyhow!("Unknown command: {any}")), + }; + if let Err(err) = res + && let Err(err2) = self.tg.send(format!("\\#error\n```\n{err}\n```"), + Some(msg.chat.get_id()), + Some(ParseMode::MarkdownV2) + ).await + { + dbg!(err2); + } }; } } Index: src/main.rs ================================================================== --- src/main.rs +++ src/main.rs @@ -4,10 +4,11 @@ #![warn(missing_docs)] mod command; mod core; mod sql; +mod tg_bot; use async_compat::Compat; use stacked_errors::{ Result, StackableErr, @@ -29,9 +30,9 @@ .build() .stack()?; let core = core::Core::new(settings).await.stack()?; - LongPoll::new(core.tg.clone(), core).run().await; + LongPoll::new(core.tg.client.clone(), core).run().await; Ok(()) } ADDED src/tg_bot.rs Index: src/tg_bot.rs ================================================================== --- /dev/null +++ src/tg_bot.rs @@ -0,0 +1,58 @@ +use stacked_errors::{ + Result, + StackableErr, +}; +use tgbot::{ + api::Client, + types::{ + Bot, + ChatPeerId, + GetBot, + Message, + ParseMode, + SendMessage, + }, +}; + +#[derive(Clone)] +pub struct Tg { + pub me: Bot, + pub owner: ChatPeerId, + pub client: Client, +} + +impl Tg { + pub async fn new (settings: &config::Config) -> Result { + let api_key = settings.get_string("api_key").stack()?; + + let owner = ChatPeerId::from(settings.get_int("owner").stack()?); + let client = Client::new(&api_key).stack()? + .with_host(settings.get_string("api_gateway").stack()?) + .with_max_retries(0); + let me = client.execute(GetBot).await.stack()?; + Ok(Tg { + me, + owner, + client, + }) + } + + pub async fn send (&self, msg: S, target: Option, mode: Option) -> Result + where S: Into { + let msg = msg.into(); + + let mode = mode.unwrap_or(ParseMode::Html); + let target = target.unwrap_or(self.owner); + self.client.execute( + SendMessage::new(target, msg) + .with_parse_mode(mode) + ).await.stack() + } + + pub fn with_owner (&self, owner: i64) -> Tg { + Tg { + owner: ChatPeerId::from(owner), + ..self.clone() + } + } +}