Index: Cargo.lock ================================================================== --- Cargo.lock +++ Cargo.lock @@ -1456,10 +1456,16 @@ dependencies = [ "pkg-config", "vcpkg", ] +[[package]] +name = "linked-hash-map" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f" + [[package]] name = "linux-raw-sys" version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" @@ -2032,11 +2038,11 @@ "quick-xml", ] [[package]] name = "rsstg" -version = "0.5.6" +version = "0.6.0" dependencies = [ "async-compat", "atom_syndication", "chrono", "config", @@ -2045,14 +2051,17 @@ "lazy_static", "regex", "reqwest", "rss", "sedregex", + "serde", "smol", "sqlx", "stacked_errors", "tgbot", + "toml", + "ttl_cache", "url", ] [[package]] name = "rustc-hash" @@ -2881,14 +2890,16 @@ name = "toml" version = "1.1.2+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "81f3d15e84cbcd896376e6730314d59fb5a87f31e4b038454184435cd57defee" dependencies = [ + "indexmap", "serde_core", "serde_spanned", "toml_datetime", "toml_parser", + "toml_writer", "winnow", ] [[package]] name = "toml_datetime" @@ -2906,10 +2917,16 @@ checksum = "a2abe9b86193656635d2411dc43050282ca48aa31c2451210f4202550afb7526" dependencies = [ "winnow", ] +[[package]] +name = "toml_writer" +version = "1.1.1+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "756daf9b1013ebe47a8776667b466417e2d4c5679d441c26230efd9ef78692db" + [[package]] name = "tower" version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" @@ -2993,10 +3010,19 @@ [[package]] name = "try-lock" version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" + +[[package]] +name = "ttl_cache" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4189890526f0168710b6ee65ceaedf1460c48a14318ceec933cb26baa492096a" +dependencies = [ + "linked-hash-map", +] [[package]] name = "typenum" version = "1.20.0" source = "registry+https://github.com/rust-lang/crates.io-index" Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,10 +1,12 @@ [package] name = "rsstg" -version = "0.5.6" -authors = ["arcade"] -edition = "2021" +version = "0.6.0" +authors = [ "arcade@b1t.name" ] +edition = "2024" +license = "0BSD" +repository = "http://fs.b1t.name/rsstg" [dependencies] async-compat = "0.2.5" atom_syndication = { version = "0.12.4", features = [ "with-serde" ] } chrono = "0.4.38" @@ -15,13 +17,16 @@ lazy_static = "1.5.0" regex = "1.10.6" reqwest = { version = "0.13.1", features = [ "brotli", "socks", "deflate" ]} rss = "2.0.9" sedregex = "0.2.5" +serde = "1.0.228" smol = "2.0.2" stacked_errors = "0.7.1" sqlx = { version = "0.8", features = [ "postgres", "runtime-tokio-rustls", "chrono", "macros" ], default-features = false } +toml = "1.1.0" +ttl_cache = "0.5.1" url = "2.5.8" [profile.release] lto = true codegen-units = 1 Index: LICENSE.0BSD ================================================================== --- LICENSE.0BSD +++ LICENSE.0BSD @@ -1,6 +1,6 @@ -Copyright (C) 2020-2023 by Volodymyr Kostyrko +Copyright (C) 2020-2026 by Volodymyr Kostyrko Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted. THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH Index: src/command.rs ================================================================== --- src/command.rs +++ src/command.rs @@ -1,6 +1,13 @@ -use crate::core::Core; +use crate::{ + core::Core, + tg_bot::{ + Callback, + MyMessage, + get_kb, + }, +}; use lazy_static::lazy_static; use regex::Regex; use sedregex::ReplaceCommand; use stacked_errors::{ @@ -7,38 +14,68 @@ Result, StackableErr, bail, }; use tgbot::types::{ + CallbackQuery, + Chat, ChatMember, ChatUsername, GetChat, GetChatAdministrators, + MaybeInaccessibleMessage, Message, - ParseMode::MarkdownV2, }; use url::Url; lazy_static! { static ref RE_USERNAME: Regex = Regex::new(r"^@([a-zA-Z][a-zA-Z0-9_]+)$").unwrap(); static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap(); } +/// Sends an informational message to the message's chat linking to the bot help channel. pub async fn start (core: &Core, msg: &Message) -> Result<()> { - core.send("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.", - Some(msg.chat.get_id()), Some(MarkdownV2)).await.stack()?; + core.tg.send(MyMessage::html_to( + "We are open. Probably. Visit channel) for details.", + msg.chat.get_id() + )).await.stack()?; Ok(()) } +/// Send the sender's subscription list to the chat. +/// +/// Retrieves the message sender's user ID, obtains their subscription list from `core`, +/// and sends the resulting reply into the message chat using HTML pub async fn list (core: &Core, msg: &Message) -> Result<()> { let sender = msg.sender.get_user_id() .stack_err("Ignoring unreal users.")?; let reply = core.list(sender).await.stack()?; - core.send(reply, Some(msg.chat.get_id()), Some(MarkdownV2)).await.stack()?; + core.tg.send(MyMessage::html_to(reply, msg.chat.get_id())).await.stack()?; + Ok(()) +} + +pub async fn test (core: &Core, msg: &Message) -> Result<()> { + let sender: i64 = msg.sender.get_user_id() + .stack_err("Ignoring unreal users.")?.into(); + let feeds = core.get_feeds(sender).await.stack()?; + let kb = get_kb(&Callback::menu(), &feeds).await.stack()?; + core.tg.send(MyMessage::html_to_kb("Main menu:", msg.chat.get_id(), kb)).await.stack()?; Ok(()) } +/// Handle channel-management commands that operate on a single numeric source ID. +/// +/// This validates that exactly one numeric argument is provided, performs the requested +/// operation (check, clean, enable, delete, disable) against the database or core, +/// and sends the resulting reply to the chat. +/// +/// # Parameters +/// +/// - `core`: application core containing database and Telegram clients. +/// - `command`: command string (e.g. "/check", "/clean", "/enable", "/delete", "/disable"). +/// - `msg`: incoming Telegram message that triggered the command; used to determine sender and chat. +/// - `words`: command arguments; expected to contain exactly one element that parses as a 32-bit integer. pub async fn command (core: &Core, command: &str, msg: &Message, words: &[String]) -> Result<()> { let mut conn = core.db.begin().await.stack()?; let sender = msg.sender.get_user_id() .stack_err("Ignoring unreal users.")?; let reply = if words.len() == 1 { @@ -47,22 +84,38 @@ Ok(number) => match command { "/check" => core.check(number, false, None).await .context("Channel check failed.")?.into(), "/clean" => conn.clean(number, sender).await.stack()?, "/enable" => conn.enable(number, sender).await.stack()?.into(), - "/delete" => conn.delete(number, sender).await.stack()?, + "/delete" => { + let res = conn.delete(number, sender).await.stack()?; + core.rm_feed(sender.into(), &number).await.stack()?; + res + } "/disable" => conn.disable(number, sender).await.stack()?.into(), _ => bail!("Command {command} {words:?} not handled."), }, } } else { - "This command needs exacly one number.".into() + "This command needs exactly one number.".into() }; - core.send(reply, Some(msg.chat.get_id()), None).await.stack()?; + core.tg.send(MyMessage::html_to(reply, msg.chat.get_id())).await.stack()?; Ok(()) } +/// Validate command arguments, check permissions and update or add a channel feed configuration in the database. +/// +/// This function parses and validates parameters supplied by a user command (either "/update ..." or "/add ..."), +/// verifies the channel username and feed URL, optionally validates an IV hash and a replacement regexp, +/// ensures both the bot and the command sender are administrators of the target channel, and performs the database update. +/// +/// # Parameters +/// +/// - `command` — the invoked command, expected to be either `"/update"` (followed by a numeric source id) or `"/add"`. +/// - `msg` — the incoming Telegram message; used to derive the command sender and target chat id for the reply. +/// - `words` — the command arguments: for `"/add"` expected `channel url [iv_hash|'-'] [url_re|'-']`; for `"/update"` +/// the first element must be a numeric `source_id` followed by the same parameters. pub async fn update (core: &Core, command: &str, msg: &Message, words: &[String]) -> Result<()> { let sender = msg.sender.get_user_id() .stack_err("Ignoring unreal users.")?; let mut source_id: Option = None; let at_least = "Requires at least 3 parameters."; @@ -79,21 +132,10 @@ let (channel, url, iv_hash, url_re) = ( i_words.next().context(at_least)?, i_words.next().context(at_least)?, i_words.next(), i_words.next()); - /* - let channel = match RE_USERNAME.captures(channel) { - Some(caps) => match caps.get(1) { - Some(data) => data.as_str(), - None => bail!("No string found in channel name"), - }, - None => { - bail!("Usernames should be something like \"@\\[a\\-zA\\-Z]\\[a\\-zA\\-Z0\\-9\\_]+\", aren't they?\nNot {channel:?}"); - }, - }; - */ if ! RE_USERNAME.is_match(channel) { bail!("Usernames should be something like \"@\\[a\\-zA\\-Z]\\[a\\-zA\\-Z0\\-9\\_]+\", aren't they?\nNot {channel:?}"); }; { let parsed_url = Url::parse(url) @@ -130,12 +172,12 @@ } }, None => None, }; let chat_id = ChatUsername::from(channel.as_ref()); - let channel_id = core.tg.execute(GetChat::new(chat_id.clone())).await.stack_err("gettting GetChat")?.id; - let chan_adm = core.tg.execute(GetChatAdministrators::new(chat_id)).await + let channel_id = core.tg.client.execute(GetChat::new(chat_id.clone())).await.stack_err("getting GetChat")?.id; + let chan_adm = core.tg.client.execute(GetChatAdministrators::new(chat_id)).await .context("Sorry, I have no access to that chat.")?; let (mut me, mut user) = (false, false); for admin in chan_adm { let member_id = match admin { ChatMember::Creator(member) => member.user.id, @@ -143,18 +185,59 @@ ChatMember::Left(_) | ChatMember::Kicked(_) | ChatMember::Member{..} | ChatMember::Restricted(_) => continue, }; - if member_id == core.me.id { + if member_id == core.tg.me.id { me = true; } if member_id == sender { user = true; } }; if ! me { bail!("I need to be admin on that channel."); }; if ! user { bail!("You should be admin on that channel."); }; let mut conn = core.db.begin().await.stack()?; - core.send(conn.update(source_id, channel, channel_id, url, iv_hash, url_re, sender).await.stack()?, Some(msg.chat.get_id()), None).await.stack()?; + let update = conn.update(source_id, channel, channel_id, url, iv_hash, url_re, sender).await.stack()?; + core.tg.send(MyMessage::html_to(update, msg.chat.get_id())).await.stack()?; + if command == "/add" { + if let Some(new_record) = conn.get_one_name(sender, channel).await.stack()? { + core.add_feed(sender.into(), new_record.source_id, new_record.channel).await.stack()?; + } else { + bail!("Failed to read data on freshly inserted source."); + } + }; + Ok(()) +} + +pub async fn answer_cb (core: &Core, query: &CallbackQuery, cb: &str) -> Result<()> { + let cb: Callback = toml::from_str(cb).stack()?; + let sender = &query.from; + //let mut conn = core.db.begin().await.stack()?; + let text = "Sample".to_owned(); + if let Some(msg) = &query.message { + match msg { + MaybeInaccessibleMessage::Message(message) => { + if let Some(owner) = message.sender.get_user() + && sender == owner + { + let feeds = core.get_feeds(owner.id.into()).await.stack()?; + core.tg.update_message(message.chat.get_id().into(), message.id, text, &feeds, cb).await?; + } else { + core.tg.send(MyMessage::html(format!("Can't identify request sender:
{:?}
", message))).await.stack()?; + } + }, + MaybeInaccessibleMessage::InaccessibleMessage(message) => { + let sender: i64 = sender.id.into(); + if let Chat::Private(priv_chat) = &message.chat + && priv_chat.id == sender + { + let feeds = core.get_feeds(priv_chat.id.into()).await.stack()?; + core.tg.update_message(message.chat.get_id().into(), message.message_id, text, &feeds, cb).await?; + } else { + core.tg.send(MyMessage::html(format!("Can't identify request sender:
{:?}
", message))).await.stack()?; + } + }, + }; + }; Ok(()) } Index: src/core.rs ================================================================== --- src/core.rs +++ src/core.rs @@ -1,70 +1,59 @@ use crate::{ + Arc, command, + Mutex, sql::Db, + tg_bot::{ + Callback, + MyMessage, + Tg, + validate, + }, }; use std::{ borrow::Cow, collections::{ BTreeMap, HashSet, }, - sync::Arc, + time::Duration, }; use async_compat::Compat; use chrono::{ DateTime, Local, }; use lazy_static::lazy_static; use regex::Regex; -use reqwest::header::{ - CACHE_CONTROL, - EXPIRES, - LAST_MODIFIED -}; -use smol::{ - Timer, - lock::Mutex, -}; -use tgbot::{ - api::Client, - handler::UpdateHandler, - types::{ - Bot, - ChatPeerId, - Command, - GetBot, - Message, - ParseMode, - SendMessage, - Update, - UpdateType, - UserPeerId, - }, -}; +use reqwest::header::LAST_MODIFIED; +use smol::Timer; use stacked_errors::{ Result, StackableErr, anyhow, bail, }; +use tgbot::{ + handler::UpdateHandler, + types::{ + CallbackQuery, + ChatPeerId, + Command, + Update, + UpdateType, + UserPeerId, + }, +}; +use ttl_cache::TtlCache; lazy_static!{ pub static ref RE_SPECIAL: Regex = Regex::new(r"([\-_*\[\]()~`>#+|{}\.!])").unwrap(); } -/// Escape characters that are special in Telegram MarkdownV2 by prefixing them with a backslash. -/// -/// This ensures the returned string can be used as MarkdownV2-formatted Telegram message content -/// without special characters being interpreted as MarkdownV2 markup. -pub fn encode (text: &str) -> Cow<'_, str> { - RE_SPECIAL.replace_all(text, "\\$1") -} - // This one does nothing except making sure only one token exists for each id pub struct Token { running: Arc>>, my_id: i32, } @@ -112,71 +101,65 @@ set.remove(&self.my_id); }) } } +pub type FeedList = BTreeMap; +type UserCache = TtlCache>>; + #[derive(Clone)] pub struct Core { - owner_chat: ChatPeerId, - // max_delay: u16, - pub tg: Client, - pub me: Bot, + pub tg: Tg, pub db: Db, + pub feeds: Arc>, running: Arc>>, http_client: reqwest::Client, } // XXX Right now that part is unfinished and I guess I need to finish menu first #[allow(unused)] pub struct Post { uri: String, - title: String, - authors: String, - summary: String, + _title: String, + _authors: String, + _summary: String, } impl Core { /// Create a Core instance from configuration and start its background autofetch loop. /// /// The provided `settings` must include: - /// - `owner` (integer): chat id to use as the default destination, + /// - `owner` (integer): default chat id to use as the owner/destination, /// - `api_key` (string): Telegram bot API key, /// - `api_gateway` (string): Telegram API gateway host, /// - `pg` (string): PostgreSQL connection string, /// - optional `proxy` (string): proxy URL for the HTTP client. /// /// On success returns an initialized `Core` with Telegram and HTTP clients, database connection, /// an empty running set for per-id tokens, and a spawned background task that periodically runs /// `autofetch`. If any required setting is missing or initialization fails, an error is returned. pub async fn new(settings: config::Config) -> Result { - let owner_chat = ChatPeerId::from(settings.get_int("owner").stack()?); - let api_key = settings.get_string("api_key").stack()?; - let tg = Client::new(&api_key).stack()? - .with_host(settings.get_string("api_gateway").stack()?); - let mut client = reqwest::Client::builder(); if let Ok(proxy) = settings.get_string("proxy") { let proxy = reqwest::Proxy::all(proxy).stack()?; client = client.proxy(proxy); } - let http_client = client.build().stack()?; - let me = tg.execute(GetBot).await.stack()?; + let core = Core { - tg, - me, - owner_chat, + tg: Tg::new(&settings).await.stack()?, db: Db::new(&settings.get_string("pg").stack()?)?, + feeds: Arc::new(Mutex::new(TtlCache::new(10000))), running: Arc::new(Mutex::new(HashSet::new())), - http_client, - // max_delay: 60, + http_client: client.build().stack()?, }; + let clone = core.clone(); smol::spawn(Compat::new(async move { loop { let delay = match &clone.autofetch().await { Err(err) => { - if let Err(err) = clone.send(format!("šŸ›‘ {err}"), None, None).await { + if let Err(err) = clone.tg.send(MyMessage::html(format!("šŸ›‘ {err}"))).await { eprintln!("Autofetch error: {err:?}"); }; std::time::Duration::from_secs(60) }, Ok(time) => *time, @@ -185,22 +168,10 @@ } })).detach(); Ok(core) } - pub async fn send (&self, msg: S, target: Option, mode: Option) -> Result - where S: Into { - let msg = msg.into(); - - let mode = mode.unwrap_or(ParseMode::Html); - let target = target.unwrap_or(self.owner_chat); - self.tg.execute( - SendMessage::new(target, msg) - .with_parse_mode(mode) - ).await.stack() - } - /// Fetches the feed for a source, sends any newly discovered posts to the appropriate chat, and records them in the database. /// /// This acquires a per-source guard to prevent concurrent checks for the same `id`. If a check is already running for /// the given `id`, the function returns an error. If `last_scrape` is provided, it is sent as the `If-Modified-Since` /// header to the feed request. The function parses RSS or Atom feeds, sends unseen post URLs to either the source's @@ -218,11 +189,11 @@ pub async fn check (&self, id: i32, real: bool, last_scrape: Option>) -> Result { let mut posted: i32 = 0; let mut conn = self.db.begin().await.stack()?; let _token = Token::new(&self.running, id).await.stack()?; - let source = conn.get_source(id, self.owner_chat).await.stack()?; + let source = conn.get_source(id, self.tg.owner).await.stack()?; conn.set_scrape(id).await.stack()?; let destination = ChatPeerId::from(match real { true => source.channel_id, false => source.owner, }); @@ -234,10 +205,14 @@ builder = builder.header(LAST_MODIFIED, last_scrape.to_rfc2822()); }; let response = builder.send().await.stack()?; #[cfg(debug_assertions)] { + use reqwest::header::{ + CACHE_CONTROL, + EXPIRES, + }; let headers = response.headers(); let expires = headers.get(EXPIRES); let cache = headers.get(CACHE_CONTROL); if expires.is_some() || cache.is_some() { println!("{} {} {:?} {:?} {:?}", Local::now().to_rfc2822(), &source.url, last_scrape, expires, cache); @@ -261,19 +236,15 @@ } }, None => bail!("Feed item misses posting date."), }), }.stack()?; - let uri = link.to_string(); - let title = item.title().unwrap_or("").to_string(); - let authors = item.author().unwrap_or("").to_string(); - let summary = item.content().unwrap_or("").to_string(); posts.insert(date, Post{ - uri, - title, - authors, - summary, + uri: link.to_string(), + _title: item.title().unwrap_or("").to_string(), + _authors: item.author().unwrap_or("").to_string(), + _summary: item.content().unwrap_or("").to_string(), }); } }; }, Err(err) => match err { @@ -289,18 +260,17 @@ bail!("Feed item missing post links."); } else { links[0].href().to_string() } }; - let title = item.title().to_string(); - let authors = item.authors().iter().map(|x| format!("{} <{:?}>", x.name(), x.email())).collect::>().join(", "); - let summary = if let Some(sum) = item.summary() { sum.value.clone() } else { String::new() }; + let _authors = item.authors().iter().map(|x| format!("{} <{:?}>", x.name(), x.email())).collect::>().join(", "); + let _summary = if let Some(sum) = item.summary() { sum.value.clone() } else { String::new() }; posts.insert(*date, Post{ uri, - title, - authors, - summary, + _title: item.title().to_string(), + _authors, + _summary, }); }; }, Err(err) => { bail!("Unsupported or mangled content:\n{:?}\n{err}\n{status:#?}\n", &source.url) @@ -318,22 +288,27 @@ }; if ! conn.exists(&post_url, id).await.stack()? { if this_fetch.is_none() || *date > this_fetch.unwrap() { this_fetch = Some(*date); }; - self.send( match &source.iv_hash { + self.tg.send(MyMessage::html_to(match &source.iv_hash { Some(hash) => format!(" {post_url}"), None => format!("{post_url}"), - }, Some(destination), Some(ParseMode::Html)).await.stack()?; + }, destination)).await.stack()?; conn.add_post(id, date, &post_url).await.stack()?; posted += 1; }; }; posts.clear(); Ok(format!("Posted: {posted}")) } + /// Determine the delay until the next scheduled fetch and spawn background checks for any overdue sources. + /// + /// This scans the database queue, spawns background tasks to run checks for sources whose `next_fetch` + /// is in the past (each task uses a Core clone with the appropriate owner), and computes the shortest + /// duration until the next `next_fetch`. async fn autofetch(&self) -> Result { let mut delay = chrono::Duration::minutes(1); let now = chrono::Local::now(); let queue = { let mut conn = self.db.begin().await.stack()?; @@ -342,11 +317,11 @@ for row in queue { if let Some(next_fetch) = row.next_fetch { if next_fetch < now { if let (Some(owner), Some(source_id), last_scrape) = (row.owner, row.source_id, row.last_scrape) { let clone = Core { - owner_chat: ChatPeerId::from(owner), + tg: self.tg.with_owner(owner), ..self.clone() }; let source = { let mut conn = self.db.begin().await.stack()?; match conn.get_one(owner, source_id).await { @@ -354,15 +329,14 @@ Ok(None) => "Source not found in database?".to_string(), Err(err) => format!("Failed to fetch source data:\n{err}"), } }; smol::spawn(Compat::new(async move { - if let Err(err) = clone.check(source_id, true, Some(last_scrape)).await { - if let Err(err) = clone.send(&format!("šŸ›‘ {source}\n{}", encode(&err.to_string())), None, Some(ParseMode::MarkdownV2)).await { - eprintln!("Check error: {err}"); - // clone.disable(&source_id, owner).await.unwrap(); - }; + if let Err(err) = clone.check(source_id, true, Some(last_scrape)).await + && let Err(err) = clone.tg.send(MyMessage::html(format!("šŸ›‘ {source}\n
{}
", &err.to_string()))).await + { + eprintln!("Check error: {err}"); }; })).detach(); } } else if next_fetch - now < delay { delay = next_fetch - now; @@ -370,42 +344,134 @@ } }; delay.to_std().stack() } + /// Displays full list of managed channels for specified user pub async fn list (&self, owner: UserPeerId) -> Result { let mut reply: Vec = vec![]; reply.push("Channels:".into()); let mut conn = self.db.begin().await.stack()?; for row in conn.get_list(owner).await.stack()? { reply.push(row.to_string()); }; Ok(reply.join("\n\n")) } + + /// Returns current cached list of feed for requested user, or loads data from database + pub async fn get_feeds (&self, owner: i64) -> Result>> { + let mut feeds = self.feeds.lock_arc().await; + Ok(match feeds.get(&owner) { + None => { + let mut conn = self.db.begin().await.stack()?; + let feed_list = conn.get_feeds(owner).await.stack()?; + let mut map = BTreeMap::new(); + for feed in feed_list { + map.insert(feed.source_id, feed.channel); + }; + let res = Arc::new(Mutex::new(map)); + feeds.insert(owner, res.clone(), Duration::from_secs(60 * 60 * 3)); + res + }, + Some(res) => res.clone(), + }) + } + + /// Adds feed to cached list + pub async fn add_feed (&self, owner: i64, source_id: i32, channel: String) -> Result<()> { + let mut inserted = true; + { + let mut feeds = self.feeds.lock_arc().await; + if let Some(feed) = feeds.get_mut(&owner) { + let mut feed = feed.lock_arc().await; + feed.insert(source_id, channel); + } else { + inserted = false; + } + } + // in case insert failed - we miss the entry we needed to expand, reload everything from + // database + if !inserted { + self.get_feeds(owner).await.stack()?; + } + Ok(()) + } + + /// Removes feed from cached list + pub async fn rm_feed (&self, owner: i64, source_id: &i32) -> Result<()> { + let mut dropped = false; + { + let mut feeds = self.feeds.lock_arc().await; + if let Some(feed) = feeds.get_mut(&owner) { + let mut feed = feed.lock_arc().await; + feed.remove(source_id); + dropped = true; + } + } + // in case we failed to found feed we need to remove - just reload everything from database + if !dropped { + self.get_feeds(owner).await.stack()?; + } + Ok(()) + } + + pub async fn cb (&self, query: &CallbackQuery, cb: &str) -> Result<()> { + let cb: Callback = toml::from_str(cb).stack()?; + todo!(); + Ok(()) + } } impl UpdateHandler for Core { - async fn handle (&self, update: Update) { - if let UpdateType::Message(msg) = update.update_type { - if let Ok(cmd) = Command::try_from(*msg) { - let msg = cmd.get_message(); - let words = cmd.get_args(); - let command = cmd.get_name(); - let res = match command { - "/check" | "/clean" | "/enable" | "/delete" | "/disable" => command::command(self, command, msg, words).await, - "/start" => command::start(self, msg).await, - "/list" => command::list(self, msg).await, - "/add" | "/update" => command::update(self, command, msg, words).await, - any => Err(anyhow!("Unknown command: {any}")), - }; - if let Err(err) = res { - if let Err(err2) = self.send(format!("\\#error\n```\n{err}\n```"), - Some(msg.chat.get_id()), - Some(ParseMode::MarkdownV2) - ).await{ - dbg!(err2); - }; - } - }; - }; + /// Dispatches an incoming Telegram update to a matching command handler and reports handler errors to the originating chat. + /// + /// This method inspects the update; if it contains a message that can be parsed as a bot command, + /// it executes the corresponding command handler. If the handler returns an error, the error text + /// is sent back to the message's chat. Unknown commands produce an error which is also reported to the chat. + async fn handle (&self, update: Update) -> () { + match update.update_type { + UpdateType::Message(msg) => { + if let Ok(cmd) = Command::try_from(*msg) { + let msg = cmd.get_message(); + let words = cmd.get_args(); + let command = cmd.get_name(); + let res = match command { + "/check" | "/clean" | "/enable" | "/delete" | "/disable" => command::command(self, command, msg, words).await, + "/start" => command::start(self, msg).await, + "/list" => command::list(self, msg).await, + "/test" => command::test(self, msg).await, + "/add" | "/update" => command::update(self, command, msg, words).await, + any => Err(anyhow!("Unknown command: {any}")), + }; + if let Err(err) = res { + match validate(&err.to_string()) { + Ok(text) => { + if let Err(err2) = self.tg.send(MyMessage::html_to( + format!("#error
{}
", text), + msg.chat.get_id(), + )).await { + dbg!(err2); + } + }, + Err(err2) => { + dbg!(err2); + }, + } + } + } else { + // not a command + } + }, + UpdateType::CallbackQuery(query) => { + if let Some(ref cb) = query.data + && let Err(err) = self.cb(&query, cb).await + && let Err(err) = self.tg.answer_cb(query.id, err.to_string()).await + { + println!("{err:?}"); + } + }, + _ => { + println!("Unhandled UpdateKind:\n{update:?}") + }, + } } } Index: src/main.rs ================================================================== --- src/main.rs +++ src/main.rs @@ -4,12 +4,16 @@ #![warn(missing_docs)] mod command; mod core; mod sql; +mod tg_bot; + +use std::sync::Arc; use async_compat::Compat; +use smol::lock::Mutex; use stacked_errors::{ Result, StackableErr, }; use tgbot::handler::LongPoll; @@ -20,18 +24,22 @@ })); Ok(()) } +/// Initialises configuration and the bot core, then runs the Telegram long-poll loop. +/// +/// This function loads configuration (with a default API gateway), constructs the application +/// core, and starts the long-polling loop that handles incoming Telegram updates. async fn async_main () -> Result<()> { let settings = config::Config::builder() .set_default("api_gateway", "https://api.telegram.org").stack()? .add_source(config::File::with_name("rsstg")) .build() .stack()?; let core = core::Core::new(settings).await.stack()?; - LongPoll::new(core.tg.clone(), core).run().await; + LongPoll::new(core.tg.client.clone(), core).run().await; Ok(()) } Index: src/sql.rs ================================================================== --- src/sql.rs +++ src/sql.rs @@ -1,12 +1,15 @@ +use crate::{ + Arc, + Mutex, +}; + use std::{ borrow::Cow, fmt, - sync::Arc, }; -use smol::lock::Mutex; use chrono::{ DateTime, FixedOffset, Local, }; @@ -32,24 +35,31 @@ pub url_re: Option, } impl fmt::Display for List { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> { - write!(f, "\\#feed\\_{} \\*ļøāƒ£ `{}` {}\nšŸ”— `{}`", self.source_id, self.channel, + write!(f, "#feed_{} *ļøāƒ£ {} {}\nšŸ”— {}", self.source_id, self.channel, match self.enabled { true => "šŸ”„ enabled", false => "ā›” disabled", }, self.url)?; if let Some(iv_hash) = &self.iv_hash { - write!(f, "\nIV: `{iv_hash}`")?; + write!(f, "\nIV: {iv_hash}")?; } if let Some(url_re) = &self.url_re { - write!(f, "\nRE: `{url_re}`")?; + write!(f, "\nRE: {url_re}")?; } Ok(()) } } + +/// One feed, used for caching and menu navigation +#[derive(sqlx::FromRow, Debug)] +pub struct Feed { + pub source_id: i32, + pub channel: String, +} #[derive(sqlx::FromRow, Debug)] pub struct Source { pub channel_id: i64, pub url: String, @@ -148,31 +158,44 @@ 0 => { Ok("Source not found.") }, _ => { bail!("Database error.") }, } } + /// Checks whether a post with the given URL exists for the specified source. + /// + /// # Parameters + /// - `post_url`: The URL of the post to check. + /// - `id`: The source identifier (converted to `i64`). + /// + /// # Returns + /// `true` if a post with the URL exists for the source, `false` otherwise. pub async fn exists (&mut self, post_url: &str, id: I) -> Result where I: Into { let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") .bind(post_url) .bind(id.into()) .fetch_one(&mut *self.0).await.stack()?; - if let Some(exists) = row.try_get("exists").stack()? { - Ok(exists) - } else { - bail!("Database error: can't check whether source exists."); - } + row.try_get("exists") + .stack_err("Database error: can't check whether post exists.") + } + + pub async fn get_feeds (&mut self, owner: I) -> Result> + where I: Into { + let block: Vec = sqlx::query_as("select source_id, channel from rsstg_source where owner = $1 order by source_id") + .bind(owner.into()) + .fetch_all(&mut *self.0).await.stack()?; + Ok(block) } /// Get all pending events for (now + 1 minute) pub async fn get_queue (&mut self) -> Result> { let block: Vec = sqlx::query_as("select source_id, next_fetch, owner, last_scrape from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';") .fetch_all(&mut *self.0).await.stack()?; Ok(block) } - pub async fn get_list (&mut self, owner: I) -> Result> + pub async fn get_list (&mut self, owner: I) -> Result> where I: Into { let source: Vec = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id") .bind(owner.into()) .fetch_all(&mut *self.0).await.stack()?; Ok(source) @@ -184,10 +207,19 @@ .bind(owner.into()) .bind(id) .fetch_optional(&mut *self.0).await.stack()?; Ok(source) } + + pub async fn get_one_name (&mut self, owner: I, name: &str) -> Result> + where I: Into { + let source: Option = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 and channel = $2") + .bind(owner.into()) + .bind(name) + .fetch_optional(&mut *self.0).await.stack()?; + Ok(source) + } pub async fn get_source (&mut self, id: i32, owner: I) -> Result where I: Into { let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2") .bind(id) @@ -223,11 +255,11 @@ .bind(iv_hash) .bind(owner.into()) .bind(channel) .bind(url_re) .execute(&mut *self.0).await - { + { Ok(_) => Ok(match update { Some(_) => "Channel updated.", None => "Channel added.", }), Err(sqlx::Error::Database(err)) => { ADDED src/tg_bot.rs Index: src/tg_bot.rs ================================================================== --- /dev/null +++ src/tg_bot.rs @@ -0,0 +1,323 @@ +use crate::{ + Arc, + Mutex, + core::FeedList, +}; + +use std::{ + borrow::Cow, + fmt, + time::Duration, +}; + +use lazy_static::lazy_static; +use serde::{ + Deserialize, + Serialize, +}; +use regex::Regex; +use smol::Timer; +use stacked_errors::{ + bail, + Result, + StackableErr, +}; +use tgbot::{ + api::{ + Client, + ExecuteError + }, + types::{ + AnswerCallbackQuery, + Bot, + ChatPeerId, + EditMessageResult, + EditMessageText, + GetBot, + InlineKeyboardButton, + InlineKeyboardMarkup, + Message, + ParseMode, + SendMessage, + }, +}; + +const CB_VERSION: u8 = 0; + +lazy_static! { + pub static ref RE_CLOSING: Regex = Regex::new(r"").unwrap(); +} + +// validate input as being postable in preformatted block, all html tags are fine, except tags that +// break the block - and , we don't need to escape anything else, as telegram manages +// that +pub fn validate (text: &str) -> Result<&str> { + if RE_CLOSING.is_match(text) { + bail!("Telegram closing tag found."); + } else { + Ok(text) + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub enum Callback { + // Edit one feed (version, name) + Edit(u8, String), + // List all feeds (version, name to show, page number) + List(u8, String, usize), + // Show root menu (version) + Menu(u8), +} + +impl Callback { + pub fn edit (text: S) -> Callback + where S: Into { + Callback::Edit(CB_VERSION, text.into()) + } + + pub fn list (text: S, page: usize) -> Callback + where S: Into { + Callback::List(CB_VERSION, text.into(), page) + } + + pub fn menu () -> Callback { + Callback::Menu(CB_VERSION) + } + + fn version (&self) -> u8 { + match self { + Callback::Edit(version, .. ) => *version, + Callback::List(version, .. ) => *version, + Callback::Menu(version) => *version, + } + } +} + +impl fmt::Display for Callback { + fn fmt (&self, f: &mut fmt::Formatter) -> fmt::Result { + f.write_str(&toml::to_string(self).map_err(|_| fmt::Error)?) + } +} + +/// Produce new Keyboard Markup from current Callback +pub async fn get_kb (cb: &Callback, feeds: &Arc>) -> Result { + if cb.version() != CB_VERSION { + bail!("Wrong callback version."); + } + let mark = match cb { + Callback::Edit(_, _name) => { // XXX edit missing + let kb: Vec> = vec![]; + InlineKeyboardMarkup::from(kb) + }, + Callback::List(_, name, page) => { + let mut kb = vec![]; + let feeds = feeds.lock_arc().await; + let long = feeds.len() > 6; + let (start, end) = if long { + (page * 5, 5 + page * 5) + } else { + (0, 6) + }; + let mut i = 0; + if name.is_empty() { + let feed_iter = feeds.iter().skip(start); + for (id, name) in feed_iter { + kb.push(vec![ + InlineKeyboardButton::for_callback_data( + format!("{}. {}", id, name), + Callback::edit(name).to_string()), + ]); + i += 1; + if i == end { break } + } + } else { + let mut found = false; + let mut first_page = None; + for (id, feed_name) in feeds.iter() { + if name == feed_name { + found = true; + } + i += 1; + kb.push(vec![ + InlineKeyboardButton::for_callback_data( + format!("{}. {}", id, feed_name), + Callback::list("xxx", *page).to_string()), // XXX edit + ]); + if i == end { + // page complete, if found we got the right page, if not - reset and + // continue. + if found { + break + } else { + if first_page.is_none() { + first_page = Some(kb); + } + kb = vec![]; + i = 0 + } + } + } + if !found { + // name not found, showing first page + kb = first_page.unwrap_or_default(); + } + } + if long { + kb.push(vec![ + InlineKeyboardButton::for_callback_data("<<", + Callback::list("", if *page == 0 { *page } else { page - 1 } ).to_string()), + InlineKeyboardButton::for_callback_data(">>", + Callback::list("", page.saturating_add(1)).to_string()), + ]); + } + InlineKeyboardMarkup::from(kb) + }, + Callback::Menu(_) => { + let kb = vec![ + vec![ + InlineKeyboardButton::for_callback_data( + "Add new channel", + Callback::menu().to_string()), // new XXX + ], + vec![ + InlineKeyboardButton::for_callback_data( + "List channels", + Callback::list("", 0).to_string()), + ], + ]; + InlineKeyboardMarkup::from(kb) + }, + }; + Ok(mark) +} + +pub enum MyMessage <'a> { + Html { text: Cow<'a, str> }, + HtmlTo { text: Cow<'a, str>, to: ChatPeerId }, + HtmlToKb { text: Cow<'a, str>, to: ChatPeerId, kb: InlineKeyboardMarkup }, +} + +impl MyMessage <'_> { + pub fn html <'a, S> (text: S) -> MyMessage<'a> + where S: Into> { + let text = text.into(); + MyMessage::Html { text } + } + + pub fn html_to <'a, S> (text: S, to: ChatPeerId) -> MyMessage<'a> + where S: Into> { + let text = text.into(); + MyMessage::HtmlTo { text, to } + } + + pub fn html_to_kb <'a, S> (text: S, to: ChatPeerId, kb: InlineKeyboardMarkup) -> MyMessage<'a> + where S: Into> { + let text = text.into(); + MyMessage::HtmlToKb { text, to, kb } + } + + fn req (&self, tg: &Tg) -> SendMessage { + match self { + MyMessage::Html { text } => + SendMessage::new(tg.owner, text.as_ref()) + .with_parse_mode(ParseMode::Html), + MyMessage::HtmlTo { text, to } => + SendMessage::new(*to, text.as_ref()) + .with_parse_mode(ParseMode::Html), + MyMessage::HtmlToKb { text, to, kb } => + SendMessage::new(*to, text.as_ref()) + .with_parse_mode(ParseMode::Html) + .with_reply_markup(kb.clone()), + } + } +} + +#[derive(Clone)] +pub struct Tg { + pub me: Bot, + pub owner: ChatPeerId, + pub client: Client, +} + +impl Tg { + /// Construct a new `Tg` instance from configuration. + /// + /// The `settings` must provide the following keys: + /// - `"api_key"` (string), + /// - `"owner"` (integer chat id), + /// - `"api_gateway"` (string). + /// + /// The function initialises the client, configures the gateway and fetches the bot identity + /// before returning the constructed `Tg`. + pub async fn new (settings: &config::Config) -> Result { + let api_key = settings.get_string("api_key").stack()?; + + let owner = ChatPeerId::from(settings.get_int("owner").stack()?); + // We don't use retries, as in async environment this will just get us stuck for extra + // amount of time on simple requests. Just bail, show error and ack it in the code. In + // other case we might got stuck with multiple open transactions in database. + let client = Client::new(&api_key).stack()? + .with_host(settings.get_string("api_gateway").stack()?) + .with_max_retries(0); + let me = client.execute(GetBot).await.stack()?; + Ok(Tg { + me, + owner, + client, + }) + } + + /// Send a text message to a chat, using an optional target and parse mode. + /// + /// # Returns + /// The sent `Message` on success. + pub async fn send (&self, msg: MyMessage<'_>) -> Result { + self.client.execute(msg.req(self)).await.stack() + } + + pub async fn answer_cb (&self, id: String, text: String) -> Result { + self.client.execute( + AnswerCallbackQuery::new(id) + .with_text(text) + ).await.stack() + } + + /// Create a copy of this `Tg` with the owner replaced by the given chat ID. + /// + /// # Parameters + /// - `owner`: The Telegram chat identifier to set as the new owner (expressed as an `i64`). + /// + /// # Returns + /// A new `Tg` instance identical to the original except its `owner` field is set to the provided chat ID. + pub fn with_owner (&self, owner: O) -> Tg + where O: Into { + Tg { + owner: ChatPeerId::from(owner.into()), + ..self.clone() + } + } + + // XXX Can loop indefinitely if API calls results retry_after, add max retries? + pub async fn update_message (&self, chat_id: i64, message_id: i64, text: String, feeds: &Arc>, cb: Callback) -> Result { + loop { + let req = EditMessageText::for_chat_message(chat_id, message_id, &text) + .with_reply_markup(get_kb(&cb, feeds).await.stack()?); + let res = self.client.execute(req).await; + match res { + Ok(res) => return Ok(res), + Err(ref err) => { + if let ExecuteError::Response(resp) = err + && let Some(delay) = resp.retry_after() + { + if delay > 60 { + return res.context("Delay too big (>60), not waiting."); + } + Timer::after(Duration::from_secs(delay)).await; + } else { + return res.context("Can't update message"); + } + }, + }; + } + } +}