Index: .github/workflows/rust-clippy.yml ================================================================== --- .github/workflows/rust-clippy.yml +++ .github/workflows/rust-clippy.yml @@ -1,29 +1,22 @@ -name: rust-clippy analyze - +name: rust-ci on: push -# Make sure CI fails on all warnings, including Clippy lints -env: - RUSTFLAGS: "-Dwarnings" - jobs: - rust-clippy-test: - name: Run rust-clippy analyzing + rust-ci-run: + name: Run rust-clippy analyzing and tests runs-on: ubuntu-latest permissions: contents: read - security-events: write steps: - - name: Checkout code - uses: actions/checkout@v6 - - - name: Install Rust toolchain - uses: dtolnay/rust-toolchain@stable - + # SETUP + - uses: actions/checkout@v6 + - uses: dtolnay/rust-toolchain@stable - uses: Swatinem/rust-cache@v2 + # TESTS - name: Run tests - run: cargo test --all-targets --all-features + run: cargo test --all-targets --all-features --release + # CLIPPY - name: Run rust-clippy - run: cargo clippy --all-targets --all-features + run: cargo clippy --all-targets --all-features -- -D warnings Index: Cargo.lock ================================================================== --- Cargo.lock +++ Cargo.lock @@ -353,13 +353,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" [[package]] name = "cc" -version = "1.2.51" +version = "1.2.52" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a0aeaff4ff1a90589618835a598e545176939b97874f7abc7851caa0618f203" +checksum = "cd4932aefd12402b36c60956a4fe0035421f544799057659ff86f923657aada3" dependencies = [ "find-msvc-tools", "jobserver", "libc", "shlex", @@ -793,13 +793,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "find-msvc-tools" -version = "0.1.6" +version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645cbb3a84e60b7531617d5ae4e57f7e27308f6445f5abf653209ea76dec8dff" +checksum = "f449e6c6c08c865631d4890cfacf252b3d396c9bcc83adb6623cdb02a8336c41" [[package]] name = "flate2" version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1322,13 +1322,13 @@ "icu_properties", ] [[package]] name = "indexmap" -version = "2.12.1" +version = "2.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" +checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" dependencies = [ "equivalent", "hashbrown 0.16.1", ] @@ -1405,13 +1405,13 @@ "spin", ] [[package]] name = "libc" -version = "0.2.179" +version = "0.2.180" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c5a2d376baa530d1238d133232d15e239abad80d05838b4b59354e5268af431f" +checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" [[package]] name = "libm" version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2025,14 +2025,16 @@ "lazy_static", "regex", "reqwest", "rss", "sedregex", + "serde", "smol", "sqlx", "stacked_errors", "tgbot", + "toml", "url", ] [[package]] name = "rustc-hash" @@ -2857,18 +2859,20 @@ "tokio", ] [[package]] name = "toml" -version = "0.9.10+spec-1.1.0" +version = "0.9.11+spec-1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0825052159284a1a8b4d6c0c86cbc801f2da5afd2b225fa548c72f2e74002f48" +checksum = "f3afc9a848309fe1aaffaed6e1546a7a14de1f935dc9d89d32afd9a44bab7c46" dependencies = [ + "indexmap", "serde_core", "serde_spanned", "toml_datetime", "toml_parser", + "toml_writer", "winnow", ] [[package]] name = "toml_datetime" @@ -2886,10 +2890,16 @@ checksum = "a3198b4b0a8e11f09dd03e133c0280504d0801269e9afa46362ffde1cbeebf44" dependencies = [ "winnow", ] +[[package]] +name = "toml_writer" +version = "1.0.6+spec-1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab16f14aed21ee8bfd8ec22513f7287cd4a91aa92e44edfe2c17ddd004e92607" + [[package]] name = "tower" version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" @@ -3630,22 +3640,22 @@ "synstructure", ] [[package]] name = "zerocopy" -version = "0.8.32" +version = "0.8.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fabae64378cb18147bb18bca364e63bdbe72a0ffe4adf0addfec8aa166b2c56" +checksum = "668f5168d10b9ee831de31933dc111a459c97ec93225beb307aed970d1372dfd" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.32" +version = "0.8.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c9c2d862265a8bb4471d87e033e730f536e2a285cc7cb05dbce09a2a97075f90" +checksum = "2c7962b26b0a8685668b671ee4b54d007a67d4eaf05fda79ac0ecf41e32270f1" dependencies = [ "proc-macro2", "quote", "syn", ] Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,10 +1,12 @@ [package] name = "rsstg" version = "0.5.3" -authors = ["arcade"] -edition = "2021" +authors = [ "arcade@b1t.name" ] +edition = "2024" +license = "0BSD" +repository = "http://fs.b1t.name/rsstg" [dependencies] async-compat = "0.2.5" atom_syndication = { version = "0.12.4", features = [ "with-serde" ] } chrono = "0.4.38" @@ -15,13 +17,15 @@ lazy_static = "1.5.0" regex = "1.10.6" reqwest = { version = "0.13.1", features = [ "brotli", "socks", "deflate" ]} rss = "2.0.9" sedregex = "0.2.5" +serde = "1.0.228" smol = "2.0.2" stacked_errors = "0.7.1" sqlx = { version = "0.8", features = [ "postgres", "runtime-tokio-rustls", "chrono", "macros" ], default-features = false } +toml = "0.9.11+spec-1.1.0" url = "2.5.8" [profile.release] lto = true codegen-units = 1 Index: LICENSE.0BSD ================================================================== --- LICENSE.0BSD +++ LICENSE.0BSD @@ -1,6 +1,6 @@ -Copyright (C) 2020-2023 by Volodymyr Kostyrko +Copyright (C) 2020-2026 by Volodymyr Kostyrko Permission to use, copy, modify, and/or distribute this software for any purpose with or without fee is hereby granted. THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH Index: src/command.rs ================================================================== --- src/command.rs +++ src/command.rs @@ -21,24 +21,41 @@ lazy_static! { static ref RE_USERNAME: Regex = Regex::new(r"^@([a-zA-Z][a-zA-Z0-9_]+)$").unwrap(); static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap(); } +/// Sends an informational message to the message's chat linking to the bot help channel. pub async fn start (core: &Core, msg: &Message) -> Result<()> { - core.send("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.", + core.tg.send("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.", Some(msg.chat.get_id()), Some(MarkdownV2)).await.stack()?; Ok(()) } +/// Send the sender's subscription list to the chat. +/// +/// Retrieves the message sender's user ID, obtains their subscription list from `core`, +/// and sends the resulting reply into the message chat using MarkdownV2. pub async fn list (core: &Core, msg: &Message) -> Result<()> { let sender = msg.sender.get_user_id() .stack_err("Ignoring unreal users.")?; let reply = core.list(sender).await.stack()?; - core.send(reply, Some(msg.chat.get_id()), Some(MarkdownV2)).await.stack()?; + core.tg.send(reply, Some(msg.chat.get_id()), Some(MarkdownV2)).await.stack()?; Ok(()) } +/// Handle channel-management commands that operate on a single numeric source ID. +/// +/// This validates that exactly one numeric argument is provided, performs the requested +/// operation (check, clean, enable, delete, disable) against the database or core, +/// and sends the resulting reply to the chat. +/// +/// # Parameters +/// +/// - `core`: application core containing database and Telegram clients. +/// - `command`: command string (e.g. "/check", "/clean", "/enable", "/delete", "/disable"). +/// - `msg`: incoming Telegram message that triggered the command; used to determine sender and chat. +/// - `words`: command arguments; expected to contain exactly one element that parses as a 32-bit integer. pub async fn command (core: &Core, command: &str, msg: &Message, words: &[String]) -> Result<()> { let mut conn = core.db.begin().await.stack()?; let sender = msg.sender.get_user_id() .stack_err("Ignoring unreal users.")?; let reply = if words.len() == 1 { @@ -53,16 +70,27 @@ "/disable" => conn.disable(number, sender).await.stack()?.into(), _ => bail!("Command {command} {words:?} not handled."), }, } } else { - "This command needs exacly one number.".into() + "This command needs exactly one number.".into() }; - core.send(reply, Some(msg.chat.get_id()), None).await.stack()?; + core.tg.send(reply, Some(msg.chat.get_id()), None).await.stack()?; Ok(()) } +/// Validate command arguments, check permissions and update or add a channel feed configuration in the database. +/// +/// This function parses and validates parameters supplied by a user command (either "/update ..." or "/add ..."), +/// verifies the channel username and feed URL, optionally validates an IV hash and a replacement regexp, +/// ensures both the bot and the command sender are administrators of the target channel, and performs the database update. +/// +/// # Parameters +/// +/// - `command` — the invoked command, expected to be either `"/update"` (followed by a numeric source id) or `"/add"`. +/// - `msg` — the incoming Telegram message; used to derive the command sender and target chat id for the reply. +/// - `words` — the command arguments: for `"/add"` expected `channel url [iv_hash|'-'] [url_re|'-']`; for `"/update"` the first element must be a numeric `source_id` followed by the same parameters. pub async fn update (core: &Core, command: &str, msg: &Message, words: &[String]) -> Result<()> { let sender = msg.sender.get_user_id() .stack_err("Ignoring unreal users.")?; let mut source_id: Option = None; let at_least = "Requires at least 3 parameters."; @@ -130,12 +158,12 @@ } }, None => None, }; let chat_id = ChatUsername::from(channel.as_ref()); - let channel_id = core.tg.execute(GetChat::new(chat_id.clone())).await.stack_err("gettting GetChat")?.id; - let chan_adm = core.tg.execute(GetChatAdministrators::new(chat_id)).await + let channel_id = core.tg.client.execute(GetChat::new(chat_id.clone())).await.stack_err("gettting GetChat")?.id; + let chan_adm = core.tg.client.execute(GetChatAdministrators::new(chat_id)).await .context("Sorry, I have no access to that chat.")?; let (mut me, mut user) = (false, false); for admin in chan_adm { let member_id = match admin { ChatMember::Creator(member) => member.user.id, @@ -143,18 +171,19 @@ ChatMember::Left(_) | ChatMember::Kicked(_) | ChatMember::Member{..} | ChatMember::Restricted(_) => continue, }; - if member_id == core.me.id { + if member_id == core.tg.me.id { me = true; } if member_id == sender { user = true; } }; if ! me { bail!("I need to be admin on that channel."); }; if ! user { bail!("You should be admin on that channel."); }; let mut conn = core.db.begin().await.stack()?; - core.send(conn.update(source_id, channel, channel_id, url, iv_hash, url_re, sender).await.stack()?, Some(msg.chat.get_id()), None).await.stack()?; + let update = conn.update(source_id, channel, channel_id, url, iv_hash, url_re, sender).await.stack()?; + core.tg.send(update, Some(msg.chat.get_id()), None).await.stack()?; Ok(()) } Index: src/core.rs ================================================================== --- src/core.rs +++ src/core.rs @@ -1,8 +1,9 @@ use crate::{ command, sql::Db, + tg_bot::Tg, }; use std::{ borrow::Cow, collections::{ @@ -17,30 +18,21 @@ DateTime, Local, }; use lazy_static::lazy_static; use regex::Regex; -use reqwest::header::{ - CACHE_CONTROL, - EXPIRES, - LAST_MODIFIED -}; +use reqwest::header::LAST_MODIFIED; use smol::{ Timer, lock::Mutex, }; use tgbot::{ - api::Client, handler::UpdateHandler, types::{ - Bot, ChatPeerId, Command, - GetBot, - Message, ParseMode, - SendMessage, Update, UpdateType, UserPeerId, }, }; @@ -114,67 +106,56 @@ } } #[derive(Clone)] pub struct Core { - owner_chat: ChatPeerId, - // max_delay: u16, - pub tg: Client, - pub me: Bot, + pub tg: Tg, pub db: Db, running: Arc>>, http_client: reqwest::Client, } pub struct Post { uri: String, - title: String, - authors: String, - summary: String, + _title: String, + _authors: String, + _summary: String, } impl Core { /// Create a Core instance from configuration and start its background autofetch loop. /// /// The provided `settings` must include: - /// - `owner` (integer): chat id to use as the default destination, + /// - `owner` (integer): default chat id to use as the owner/destination, /// - `api_key` (string): Telegram bot API key, /// - `api_gateway` (string): Telegram API gateway host, /// - `pg` (string): PostgreSQL connection string, /// - optional `proxy` (string): proxy URL for the HTTP client. /// /// On success returns an initialized `Core` with Telegram and HTTP clients, database connection, /// an empty running set for per-id tokens, and a spawned background task that periodically runs /// `autofetch`. If any required setting is missing or initialization fails, an error is returned. pub async fn new(settings: config::Config) -> Result { - let owner_chat = ChatPeerId::from(settings.get_int("owner").stack()?); - let api_key = settings.get_string("api_key").stack()?; - let tg = Client::new(&api_key).stack()? - .with_host(settings.get_string("api_gateway").stack()?); - let mut client = reqwest::Client::builder(); if let Ok(proxy) = settings.get_string("proxy") { let proxy = reqwest::Proxy::all(proxy).stack()?; client = client.proxy(proxy); } - let http_client = client.build().stack()?; - let me = tg.execute(GetBot).await.stack()?; + let core = Core { - tg, - me, - owner_chat, + tg: Tg::new(&settings).await.stack()?, db: Db::new(&settings.get_string("pg").stack()?)?, running: Arc::new(Mutex::new(HashSet::new())), - http_client, - // max_delay: 60, + http_client: client.build().stack()?, }; + let clone = core.clone(); smol::spawn(Compat::new(async move { loop { let delay = match &clone.autofetch().await { Err(err) => { - if let Err(err) = clone.send(format!("🛑 {err}"), None, None).await { + if let Err(err) = clone.tg.send(format!("🛑 {err}"), None, None).await { eprintln!("Autofetch error: {err:?}"); }; std::time::Duration::from_secs(60) }, Ok(time) => *time, @@ -183,22 +164,10 @@ } })).detach(); Ok(core) } - pub async fn send (&self, msg: S, target: Option, mode: Option) -> Result - where S: Into { - let msg = msg.into(); - - let mode = mode.unwrap_or(ParseMode::Html); - let target = target.unwrap_or(self.owner_chat); - self.tg.execute( - SendMessage::new(target, msg) - .with_parse_mode(mode) - ).await.stack() - } - /// Fetches the feed for a source, sends any newly discovered posts to the appropriate chat, and records them in the database. /// /// This acquires a per-source guard to prevent concurrent checks for the same `id`. If a check is already running for /// the given `id`, the function returns an error. If `last_scrape` is provided, it is sent as the `If-Modified-Since` /// header to the feed request. The function parses RSS or Atom feeds, sends unseen post URLs to either the source's @@ -216,11 +185,11 @@ pub async fn check (&self, id: i32, real: bool, last_scrape: Option>) -> Result { let mut posted: i32 = 0; let mut conn = self.db.begin().await.stack()?; let _token = Token::new(&self.running, id).await.stack()?; - let source = conn.get_source(id, self.owner_chat).await.stack()?; + let source = conn.get_source(id, self.tg.owner).await.stack()?; conn.set_scrape(id).await.stack()?; let destination = ChatPeerId::from(match real { true => source.channel_id, false => source.owner, }); @@ -232,10 +201,14 @@ builder = builder.header(LAST_MODIFIED, last_scrape.to_rfc2822()); }; let response = builder.send().await.stack()?; #[cfg(debug_assertions)] { + use reqwest::header::{ + CACHE_CONTROL, + EXPIRES, + }; let headers = response.headers(); let expires = headers.get(EXPIRES); let cache = headers.get(CACHE_CONTROL); if expires.is_some() || cache.is_some() { println!("{} {} {:?} {:?} {:?}", Local::now().to_rfc2822(), &source.url, last_scrape, expires, cache); @@ -259,19 +232,15 @@ } }, None => bail!("Feed item misses posting date."), }), }.stack()?; - let uri = link.to_string(); - let title = item.title().unwrap_or("").to_string(); - let authors = item.author().unwrap_or("").to_string(); - let summary = item.content().unwrap_or("").to_string(); posts.insert(date, Post{ - uri, - title, - authors, - summary, + uri: link.to_string(), + _title: item.title().unwrap_or("").to_string(), + _authors: item.author().unwrap_or("").to_string(), + _summary: item.content().unwrap_or("").to_string(), }); } }; }, Err(err) => match err { @@ -287,18 +256,17 @@ bail!("Feed item missing post links."); } else { links[0].href().to_string() } }; - let title = item.title().to_string(); - let authors = item.authors().iter().map(|x| format!("{} <{:?}>", x.name(), x.email())).collect::>().join(", "); - let summary = if let Some(sum) = item.summary() { sum.value.clone() } else { String::new() }; + let _authors = item.authors().iter().map(|x| format!("{} <{:?}>", x.name(), x.email())).collect::>().join(", "); + let _summary = if let Some(sum) = item.summary() { sum.value.clone() } else { String::new() }; posts.insert(*date, Post{ uri, - title, - authors, - summary, + _title: item.title().to_string(), + _authors, + _summary, }); }; }, Err(err) => { bail!("Unsupported or mangled content:\n{:?}\n{err}\n{status:#?}\n", &source.url) @@ -316,11 +284,11 @@ }; if ! conn.exists(&post_url, id).await.stack()? { if this_fetch.is_none() || *date > this_fetch.unwrap() { this_fetch = Some(*date); }; - self.send( match &source.iv_hash { + self.tg.send( match &source.iv_hash { Some(hash) => format!(" {post_url}"), None => format!("{post_url}"), }, Some(destination), Some(ParseMode::Html)).await.stack()?; conn.add_post(id, date, &post_url).await.stack()?; posted += 1; @@ -328,10 +296,15 @@ }; posts.clear(); Ok(format!("Posted: {posted}")) } + /// Determine the delay until the next scheduled fetch and spawn background checks for any overdue sources. + /// + /// This scans the database queue, spawns background tasks to run checks for sources whose `next_fetch` + /// is in the past (each task uses a Core clone with the appropriate owner), and computes the shortest + /// duration until the next `next_fetch`. async fn autofetch(&self) -> Result { let mut delay = chrono::Duration::minutes(1); let now = chrono::Local::now(); let queue = { let mut conn = self.db.begin().await.stack()?; @@ -340,11 +313,11 @@ for row in queue { if let Some(next_fetch) = row.next_fetch { if next_fetch < now { if let (Some(owner), Some(source_id), last_scrape) = (row.owner, row.source_id, row.last_scrape) { let clone = Core { - owner_chat: ChatPeerId::from(owner), + tg: self.tg.with_owner(owner), ..self.clone() }; let source = { let mut conn = self.db.begin().await.stack()?; match conn.get_one(owner, source_id).await { @@ -352,15 +325,14 @@ Ok(None) => "Source not found in database?".to_string(), Err(err) => format!("Failed to fetch source data:\n{err}"), } }; smol::spawn(Compat::new(async move { - if let Err(err) = clone.check(source_id, true, Some(last_scrape)).await { - if let Err(err) = clone.send(&format!("🛑 {source}\n{}", encode(&err.to_string())), None, Some(ParseMode::MarkdownV2)).await { - eprintln!("Check error: {err}"); - // clone.disable(&source_id, owner).await.unwrap(); - }; + if let Err(err) = clone.check(source_id, true, Some(last_scrape)).await + && let Err(err) = clone.tg.send(&format!("🛑 {source}\n{}", encode(&err.to_string())), None, Some(ParseMode::MarkdownV2)).await + { + eprintln!("Check error: {err}"); }; })).detach(); } } else if next_fetch - now < delay { delay = next_fetch - now; @@ -380,30 +352,36 @@ Ok(reply.join("\n\n")) } } impl UpdateHandler for Core { - async fn handle (&self, update: Update) { - if let UpdateType::Message(msg) = update.update_type { - if let Ok(cmd) = Command::try_from(msg) { - let msg = cmd.get_message(); - let words = cmd.get_args(); - let command = cmd.get_name(); - let res = match command { - "/check" | "/clean" | "/enable" | "/delete" | "/disable" => command::command(self, command, msg, words).await, - "/start" => command::start(self, msg).await, - "/list" => command::list(self, msg).await, - "/add" | "/update" => command::update(self, command, msg, words).await, - any => Err(anyhow!("Unknown command: {any}")), - }; - if let Err(err) = res { - if let Err(err2) = self.send(format!("\\#error\n```\n{err}\n```"), - Some(msg.chat.get_id()), - Some(ParseMode::MarkdownV2) - ).await{ - dbg!(err2); - }; - } - }; - }; + /// Dispatches an incoming Telegram update to a matching command handler and reports handler errors to the originating chat. + /// + /// This method inspects the update; if it contains a message that can be parsed as a bot command, + /// it executes the corresponding command handler. If the handler returns an error, the error text + /// is sent back to the message's chat using MarkdownV2 formatting. Unknown commands produce an error + /// which is also reported to the chat. + async fn handle (&self, update: Update) { + if let UpdateType::Message(msg) = update.update_type + && let Ok(cmd) = Command::try_from(msg) + { + let msg = cmd.get_message(); + let words = cmd.get_args(); + let command = cmd.get_name(); + let res = match command { + "/check" | "/clean" | "/enable" | "/delete" | "/disable" => command::command(self, command, msg, words).await, + "/start" => command::start(self, msg).await, + "/list" => command::list(self, msg).await, + "/add" | "/update" => command::update(self, command, msg, words).await, + any => Err(anyhow!("Unknown command: {any}")), + }; + if let Err(err) = res + && let Err(err2) = self.tg.send(format!("\\#error\n```\n{err}\n```"), + Some(msg.chat.get_id()), + Some(ParseMode::MarkdownV2) + ).await + { + dbg!(err2); + } + } // TODO: debug log for skipped updates?; } } Index: src/main.rs ================================================================== --- src/main.rs +++ src/main.rs @@ -4,10 +4,11 @@ #![warn(missing_docs)] mod command; mod core; mod sql; +mod tg_bot; use async_compat::Compat; use stacked_errors::{ Result, StackableErr, @@ -20,18 +21,22 @@ })); Ok(()) } +/// Initialises configuration and the bot core, then runs the Telegram long-poll loop. +/// +/// This function loads configuration (with a default API gateway), constructs the application +/// core, and starts the long-polling loop that handles incoming Telegram updates. async fn async_main () -> Result<()> { let settings = config::Config::builder() .set_default("api_gateway", "https://api.telegram.org").stack()? .add_source(config::File::with_name("rsstg")) .build() .stack()?; let core = core::Core::new(settings).await.stack()?; - LongPoll::new(core.tg.clone(), core).run().await; + LongPoll::new(core.tg.client.clone(), core).run().await; Ok(()) } Index: src/sql.rs ================================================================== --- src/sql.rs +++ src/sql.rs @@ -148,21 +148,26 @@ 0 => { Ok("Source not found.") }, _ => { bail!("Database error.") }, } } + /// Checks whether a post with the given URL exists for the specified source. + /// + /// # Parameters + /// - `post_url`: The URL of the post to check. + /// - `id`: The source identifier (converted to `i64`). + /// + /// # Returns + /// `true` if a post with the URL exists for the source, `false` otherwise. pub async fn exists (&mut self, post_url: &str, id: I) -> Result where I: Into { let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") .bind(post_url) .bind(id.into()) .fetch_one(&mut *self.0).await.stack()?; - if let Some(exists) = row.try_get("exists").stack()? { - Ok(exists) - } else { - bail!("Database error: can't check whether source exists."); - } + row.try_get("exists") + .stack_err("Database error: can't check whether post exists.") } /// Get all pending events for (now + 1 minute) pub async fn get_queue (&mut self) -> Result> { let block: Vec = sqlx::query_as("select source_id, next_fetch, owner, last_scrape from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';") ADDED src/tg_bot.rs Index: src/tg_bot.rs ================================================================== --- /dev/null +++ src/tg_bot.rs @@ -0,0 +1,99 @@ +use serde::{ + Deserialize, + Serialize, +}; +use stacked_errors::{ + Result, + StackableErr, +}; +use tgbot::{ + api::Client, + types::{ + Bot, + ChatPeerId, + GetBot, + InlineKeyboardButton, + InlineKeyboardMarkup, + Message, + ParseMode, + SendMessage, + }, +}; + +#[derive(Serialize, Deserialize, Debug)] +enum Callback { + // List all feeds (version, name to show) + List(u8, String), +} + +fn get_kb (cb: &Callback) -> Result { + let mark = InlineKeyboardMarkup::from(vec![vec![ + InlineKeyboardButton::for_callback_data("1", + toml::to_string(&Callback::List(0,"xxx".to_owned())).stack()?), + ]]); + Ok(mark) +} + +#[derive(Clone)] +pub struct Tg { + pub me: Bot, + pub owner: ChatPeerId, + pub client: Client, +} + +impl Tg { + /// Construct a new `Tg` instance from configuration. + /// + /// The `settings` must provide the following keys: + /// - `"api_key"` (string), + /// - `"owner"` (integer chat id), + /// - `"api_gateway"` (string). + /// + /// The function initialises the client, configures the gateway and fetches the bot identity + /// before returning the constructed `Tg`. + pub async fn new (settings: &config::Config) -> Result { + let api_key = settings.get_string("api_key").stack()?; + + let owner = ChatPeerId::from(settings.get_int("owner").stack()?); + let client = Client::new(&api_key).stack()? + .with_host(settings.get_string("api_gateway").stack()?) + .with_max_retries(0); + let me = client.execute(GetBot).await.stack()?; + Ok(Tg { + me, + owner, + client, + }) + } + + /// Send a text message to a chat, using an optional target and parse mode. + /// + /// # Returns + /// The sent `Message` on success. + pub async fn send (&self, msg: S, target: Option, mode: Option) -> Result + where S: Into { + let msg = msg.into(); + + let mode = mode.unwrap_or(ParseMode::Html); + let target = target.unwrap_or(self.owner); + self.client.execute( + SendMessage::new(target, msg) + .with_parse_mode(mode) + ).await.stack() + } + + /// Create a copy of this `Tg` with the owner replaced by the given chat ID. + /// + /// # Parameters + /// - `owner`: The Telegram chat identifier to set as the new owner (expressed as an `i64`). + /// + /// # Returns + /// A new `Tg` instance identical to the original except its `owner` field is set to the provided chat ID. + pub fn with_owner (&self, owner: O) -> Tg + where O: Into { + Tg { + owner: ChatPeerId::from(owner.into()), + ..self.clone() + } + } +}