ADDED .github/dependabot.yml Index: .github/dependabot.yml ================================================================== --- /dev/null +++ .github/dependabot.yml @@ -0,0 +1,10 @@ +# Set update schedule for GitHub Actions + +version: 2 +updates: + + - package-ecosystem: "github-actions" + directory: "/" + schedule: + # Check for updates to GitHub Actions every week + interval: "weekly" ADDED .github/workflows/rust-clippy.yml Index: .github/workflows/rust-clippy.yml ================================================================== --- /dev/null +++ .github/workflows/rust-clippy.yml @@ -0,0 +1,29 @@ +name: rust-clippy analyze + +on: push + +# Make sure CI fails on all warnings, including Clippy lints +env: + RUSTFLAGS: "-Dwarnings" + +jobs: + rust-clippy-test: + name: Run rust-clippy analyzing + runs-on: ubuntu-latest + permissions: + contents: read + security-events: write + steps: + - name: Checkout code + uses: actions/checkout@v6 + + - name: Install Rust toolchain + uses: dtolnay/rust-toolchain@stable + + - uses: Swatinem/rust-cache@v2 + + - name: Run tests + run: cargo test --all-targets --all-features + + - name: Run rust-clippy + run: cargo clippy --all-targets --all-features Index: Cargo.lock ================================================================== --- Cargo.lock +++ Cargo.lock @@ -277,13 +277,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "base64ct" -version = "1.8.1" +version = "1.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e050f626429857a27ddccb31e0aca21356bfa709c04041aefddac081a8f068a" +checksum = "7d809780667f4410e7c41b07f52439b94d2bdf8528eeedc287fa38d3b7f95d82" [[package]] name = "bitflags" version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -997,13 +997,13 @@ "wasm-bindgen", ] [[package]] name = "h2" -version = "0.4.12" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f3c0b69cfcb4e1b9f1bf2f53f95f766e4661169728ec61cd3fe5a0166f2d1386" +checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54" dependencies = [ "atomic-waker", "bytes", "fnv", "futures-core", @@ -1160,11 +1160,10 @@ "rustls", "rustls-pki-types", "tokio", "tokio-rustls", "tower-service", - "webpki-roots 1.0.4", ] [[package]] name = "hyper-util" version = "0.1.19" @@ -1406,13 +1405,13 @@ "spin", ] [[package]] name = "libc" -version = "0.2.178" +version = "0.2.179" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37c93d8daa9d8a012fd8ab92f088405fb202ea0b6ab73ee2482ae66af4f42091" +checksum = "c5a2d376baa530d1238d133232d15e239abad80d05838b4b59354e5268af431f" [[package]] name = "libm" version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1727,13 +1726,13 @@ "zerocopy", ] [[package]] name = "proc-macro2" -version = "1.0.104" +version = "1.0.105" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9695f8df41bb4f3d222c95a67532365f569318332d03d5f3f67f37b20e6ebdf0" +checksum = "535d180e0ecab6268a3e718bb9fd44db66bbbc256257165fc699dadf70d16fe7" dependencies = [ "unicode-ident", ] [[package]] @@ -1802,13 +1801,13 @@ "windows-sys 0.60.2", ] [[package]] name = "quote" -version = "1.0.42" +version = "1.0.43" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a338cc41d27e6cc6dce6cefc13a0729dfbb81c262b1f519331575dd80ef3067f" +checksum = "dc74d9a594b72ae6656596548f56f667211f8a97b3d4c3d467150794690dc40a" dependencies = [ "proc-macro2", ] [[package]] @@ -1923,35 +1922,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7a2d987857b319362043e95f5353c0535c1f58eec5336fdfcf626430af7def58" [[package]] name = "reqwest" -version = "0.12.28" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" +checksum = "04e9018c9d814e5f30cc16a0f03271aeab3571e609612d9fe78c1aa8d11c2f62" dependencies = [ "base64", "bytes", + "encoding_rs", "futures-core", "futures-util", + "h2", "http", "http-body", "http-body-util", "hyper", "hyper-rustls", "hyper-util", "js-sys", "log", + "mime", "mime_guess", "percent-encoding", "pin-project-lite", "quinn", "rustls", "rustls-pki-types", + "rustls-platform-verifier", "serde", "serde_json", - "serde_urlencoded", "sync_wrapper", "tokio", "tokio-rustls", "tokio-util", "tower", @@ -1960,49 +1962,10 @@ "url", "wasm-bindgen", "wasm-bindgen-futures", "wasm-streams", "web-sys", - "webpki-roots 1.0.4", -] - -[[package]] -name = "reqwest" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04e9018c9d814e5f30cc16a0f03271aeab3571e609612d9fe78c1aa8d11c2f62" -dependencies = [ - "base64", - "bytes", - "encoding_rs", - "futures-core", - "h2", - "http", - "http-body", - "http-body-util", - "hyper", - "hyper-rustls", - "hyper-util", - "js-sys", - "log", - "mime", - "percent-encoding", - "pin-project-lite", - "quinn", - "rustls", - "rustls-pki-types", - "rustls-platform-verifier", - "sync_wrapper", - "tokio", - "tokio-rustls", - "tower", - "tower-http", - "tower-service", - "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", ] [[package]] name = "ring" version = "0.17.14" @@ -2017,13 +1980,13 @@ "windows-sys 0.52.0", ] [[package]] name = "rsa" -version = "0.9.9" +version = "0.9.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "40a0376c50d0358279d9d643e4bf7b7be212f1f4ff1da9070a7b54d22ef75c88" +checksum = "b8573f03f5883dcaebdfcf4725caa1ecb9c15b2ef50c43a07b816e06799bb12d" dependencies = [ "const-oid", "digest", "num-bigint-dig", "num-integer", @@ -2049,27 +2012,28 @@ "quick-xml", ] [[package]] name = "rsstg" -version = "0.5.2" +version = "0.5.3" dependencies = [ "async-compat", "atom_syndication", "chrono", "config", "futures", "futures-util", "lazy_static", "regex", - "reqwest 0.13.1", + "reqwest", "rss", "sedregex", "smol", "sqlx", "stacked_errors", "tgbot", + "url", ] [[package]] name = "rustc-hash" version = "2.1.1" @@ -2098,13 +2062,13 @@ "windows-sys 0.61.2", ] [[package]] name = "rustls" -version = "0.23.35" +version = "0.23.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f" +checksum = "c665f33d38cea657d9614f766881e4d510e0eda4239891eea56b4cadcf01801b" dependencies = [ "aws-lc-rs", "once_cell", "ring", "rustls-pki-types", @@ -2278,13 +2242,13 @@ "syn", ] [[package]] name = "serde_json" -version = "1.0.148" +version = "1.0.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3084b546a1dd6289475996f182a22aba973866ea8e8b02c51d9f46b1336a22da" +checksum = "83fc039473c5595ace860d8c4fafa220ff474b3fc6bfdb4293327f1a37e94d86" dependencies = [ "itoa", "memchr", "serde", "serde_core", @@ -2700,13 +2664,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.112" +version = "2.0.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "21f182278bf2d2bcb3c88b1b08a37df029d71ce3d3ae26168e3c653b213b99d4" +checksum = "d4d107df263a3013ef9b1879b0df87d706ff80f65a86ea879bd9c31f9b307c2a" dependencies = [ "proc-macro2", "quote", "unicode-ident", ] @@ -2752,22 +2716,22 @@ "libc", ] [[package]] name = "tgbot" -version = "0.40.0" +version = "0.41.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c07cc857c7553da139c88dc53bdacc7eb834d0a2685d789de4c96754dbe17b10" +checksum = "0212214ba5db8a369e2853900248792d2b19737dda88ecabbb2e31ef85a1d752" dependencies = [ "async-stream", "bytes", "derive_more", "futures-util", "log", "mime", "mime_guess", - "reqwest 0.12.28", + "reqwest", "serde", "serde_json", "serde_with", "shellwords", "tokio", @@ -2845,13 +2809,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20" [[package]] name = "tokio" -version = "1.48.0" +version = "1.49.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ff360e02eab121e0bc37a2d3b4d4dc622e6eda3a8e5253d5435ecf5bd4c68408" +checksum = "72a2903cd7736441aac9df9d7688bd0ce48edccaadf181c3b90be801e81d3d86" dependencies = [ "bytes", "libc", "mio", "pin-project-lite", @@ -2869,24 +2833,24 @@ "tokio", ] [[package]] name = "tokio-stream" -version = "0.1.17" +version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eca58d7bba4a75707817a2c44174253f9236b2d5fbd055602e9d5c07c139a047" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" dependencies = [ "futures-core", "pin-project-lite", "tokio", ] [[package]] name = "tokio-util" -version = "0.7.17" +version = "0.7.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" +checksum = "9ae9cec805b01e8fc3fd2fe289f89149a9b66dd16786abd8b19cfa7b48cb0098" dependencies = [ "bytes", "futures-core", "futures-sink", "pin-project-lite", @@ -3018,13 +2982,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" [[package]] name = "unicase" -version = "2.8.1" +version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75b844d17643ee918803943289730bec8aac480150456169e647ed0b576ba539" +checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" [[package]] name = "unicode-bidi" version = "0.3.18" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3057,13 +3021,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "url" -version = "2.5.7" +version = "2.5.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "08bc136a29a3d1758e07a9cca267be308aeebf5cfd5a10f3f67ab2097683ef5b" +checksum = "ff67a8a4397373c3ef660812acab3268222035010ab8680ec4215f38ba3d0eed" dependencies = [ "form_urlencoded", "idna", "percent-encoding", "serde", @@ -3218,13 +3182,13 @@ "wasm-bindgen", ] [[package]] name = "webpki-root-certs" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee3e3b5f5e80bc89f30ce8d0343bf4e5f12341c51f3e26cbeecbc7c85443e85b" +checksum = "36a29fc0408b113f68cf32637857ab740edfafdf460c326cd2afaa2d84cc05dc" dependencies = [ "rustls-pki-types", ] [[package]] @@ -3231,18 +3195,18 @@ name = "webpki-roots" version = "0.26.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "521bc38abb08001b01866da9f51eb7c5d647a19260e00054a8c7fd5f9e57f7a9" dependencies = [ - "webpki-roots 1.0.4", + "webpki-roots 1.0.5", ] [[package]] name = "webpki-roots" -version = "1.0.4" +version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2878ef029c47c6e8cf779119f20fcf52bde7ad42a731b2a304bc221df17571e" +checksum = "12bed680863276c63889429bfd6cab3b99943659923822de1c8a39c49e4d722c" dependencies = [ "rustls-pki-types", ] [[package]] @@ -3666,22 +3630,22 @@ "synstructure", ] [[package]] name = "zerocopy" -version = "0.8.31" +version = "0.8.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd74ec98b9250adb3ca554bdde269adf631549f51d8a8f8f0a10b50f1cb298c3" +checksum = "1fabae64378cb18147bb18bca364e63bdbe72a0ffe4adf0addfec8aa166b2c56" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.31" +version = "0.8.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d8a8d209fdf45cf5138cbb5a506f6b52522a25afccc534d1475dad8e31105c6a" +checksum = "c9c2d862265a8bb4471d87e033e730f536e2a285cc7cb05dbce09a2a97075f90" dependencies = [ "proc-macro2", "quote", "syn", ] @@ -3746,8 +3710,8 @@ "syn", ] [[package]] name = "zmij" -version = "1.0.6" +version = "1.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aac060176f7020d62c3bcc1cdbcec619d54f48b07ad1963a3f80ce7a0c17755f" +checksum = "2fc5a66a20078bf1251bde995aa2fdcc4b800c70b5d92dd2c62abc5c60f679f8" Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,17 +1,17 @@ [package] name = "rsstg" -version = "0.5.2" +version = "0.5.3" authors = ["arcade"] edition = "2021" [dependencies] async-compat = "0.2.5" atom_syndication = { version = "0.12.4", features = [ "with-serde" ] } chrono = "0.4.38" config = { version = "0.15", default-features = false, features = [ "toml" ] } -tgbot = "0.40" +tgbot = "0.41" futures = "0.3.30" futures-util = "0.3.30" lazy_static = "1.5.0" regex = "1.10.6" reqwest = { version = "0.13.1", features = [ "brotli", "socks", "deflate" ]} @@ -18,9 +18,10 @@ rss = "2.0.9" sedregex = "0.2.5" smol = "2.0.2" stacked_errors = "0.7.1" sqlx = { version = "0.8", features = [ "postgres", "runtime-tokio-rustls", "chrono", "macros" ], default-features = false } +url = "2.5.8" [profile.release] lto = true codegen-units = 1 ADDED README.md Index: README.md ================================================================== --- /dev/null +++ README.md @@ -0,0 +1,12 @@ + +Project served me well for a long time to convert my feeds into readable and +searchable list of channels. Bot is public and can be used by everyone, not +only you. It does scrape scheduling taking into account feed activity -- +inactive feeds are polled once an hour, while active ones can be polled faster +based on amount of items posted around same time for last week. Supports +ATOM/RSS feeds, custom IV for links, link editing (to use shorter links if +website uses full links in RSS feeds). + +This is GitHub public mirror, original repo can be found at: + +http://fs.b1t.name/rsstg/ Index: rsstg.sql ================================================================== --- rsstg.sql +++ rsstg.sql @@ -20,11 +20,11 @@ source_id integer not null, posted timestamptz not null, url text not null, hour smallint not null generated always as (extract('hour' from posted at time zone 'utc')) stored, hxm smallint not null generated always as (hxm(posted)) stored, - FOREIGN KEY (source_id) REFERENCES rsstg_source(source_id) on delete cascade, + FOREIGN KEY (source_id) REFERENCES rsstg_source(source_id) on delete cascade ); create unique index rsstg_post__url on rsstg_post(url); create index rsstg_post__hour on rsstg_post(hour); create index rsstg_post__posted_hour on rsstg_post(posted,hour); create index rsstg_post__hxm on rsstg_post(hxm); Index: src/command.rs ================================================================== --- src/command.rs +++ src/command.rs @@ -14,14 +14,14 @@ GetChat, GetChatAdministrators, Message, ParseMode::MarkdownV2, }; +use url::Url; lazy_static! { static ref RE_USERNAME: Regex = Regex::new(r"^@([a-zA-Z][a-zA-Z0-9_]+)$").unwrap(); - static ref RE_LINK: Regex = Regex::new(r"^https?://[a-zA-Z.0-9-]+/[-_a-zA-Z.:;0-9/?=]+$").unwrap(); static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap(); } pub async fn start (core: &Core, msg: &Message) -> Result<()> { core.send("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.", @@ -93,12 +93,19 @@ }; */ if ! RE_USERNAME.is_match(channel) { bail!("Usernames should be something like \"@\\[a\\-zA\\-Z]\\[a\\-zA\\-Z0\\-9\\_]+\", aren't they?\nNot {channel:?}"); }; - if ! RE_LINK.is_match(url) { - bail!("Link should be a link to atom/rss feed, something like \"https://domain/path\".\nNot {url:?}"); + { + let parsed_url = Url::parse(url) + .stack_err("Expecting a valid link to ATOM/RSS feed.")?; + match parsed_url.scheme() { + "http" | "https" => {}, + scheme => { + bail!("Unsupported URL scheme: {scheme}"); + }, + }; } let iv_hash = match iv_hash { Some(hash) => { match hash.as_ref() { "-" => None, Index: src/core.rs ================================================================== --- src/core.rs +++ src/core.rs @@ -53,23 +53,77 @@ lazy_static!{ pub static ref RE_SPECIAL: Regex = Regex::new(r"([\-_*\[\]()~`>#+|{}\.!])").unwrap(); } -/// Encodes special HTML entities to prevent them interfering with Telegram HTML +/// Escape characters that are special in Telegram MarkdownV2 by prefixing them with a backslash. +/// +/// This ensures the returned string can be used as MarkdownV2-formatted Telegram message content +/// without special characters being interpreted as MarkdownV2 markup. pub fn encode (text: &str) -> Cow<'_, str> { RE_SPECIAL.replace_all(text, "\\$1") } + +// This one does nothing except making sure only one token exists for each id +pub struct Token { + running: Arc>>, + my_id: i32, +} + +impl Token { + /// Attempts to acquire a per-id token by inserting `my_id` into the shared `running` set. + /// + /// If the id was not already present, the function inserts it and returns `Some(Token)`. + /// When the returned `Token` is dropped, the id will be removed from the `running` set, + /// allowing subsequent acquisitions for the same id. + /// + /// # Parameters + /// + /// - `running`: Shared set tracking active ids. + /// - `my_id`: Identifier to acquire a token for. + /// + /// # Returns + /// + /// `Ok(Token)` if the id was successfully acquired, `Error` if a token for the id is already active. + async fn new (running: &Arc>>, my_id: i32) -> Result { + let running = running.clone(); + let mut set = running.lock_arc().await; + if set.contains(&my_id) { + bail!("Token already taken"); + } else { + set.insert(my_id); + Ok(Token { + running, + my_id, + }) + } + } +} + +impl Drop for Token { + /// Releases this token's claim on the shared running-set when the token is dropped. + /// + /// The token's identifier is removed from the shared `running` set so that future + /// operations for the same id may proceed. + /// + /// TODO: is using block_on inside block_on safe? Currently tested and working fine. + fn drop (&mut self) { + smol::block_on(async { + let mut set = self.running.lock_arc().await; + set.remove(&self.my_id); + }) + } +} #[derive(Clone)] pub struct Core { owner_chat: ChatPeerId, // max_delay: u16, pub tg: Client, pub me: Bot, pub db: Db, - sources: Arc>>>, + running: Arc>>, http_client: reqwest::Client, } pub struct Post { uri: String, @@ -77,10 +131,22 @@ authors: String, summary: String, } impl Core { + /// Create a Core instance from configuration and start its background autofetch loop. + /// + /// The provided `settings` must include: + /// - `owner` (integer): chat id to use as the default destination, + /// - `api_key` (string): Telegram bot API key, + /// - `api_gateway` (string): Telegram API gateway host, + /// - `pg` (string): PostgreSQL connection string, + /// - optional `proxy` (string): proxy URL for the HTTP client. + /// + /// On success returns an initialized `Core` with Telegram and HTTP clients, database connection, + /// an empty running set for per-id tokens, and a spawned background task that periodically runs + /// `autofetch`. If any required setting is missing or initialization fails, an error is returned. pub async fn new(settings: config::Config) -> Result { let owner_chat = ChatPeerId::from(settings.get_int("owner").stack()?); let api_key = settings.get_string("api_key").stack()?; let tg = Client::new(&api_key).stack()? .with_host(settings.get_string("api_gateway").stack()?); @@ -95,11 +161,11 @@ let core = Core { tg, me, owner_chat, db: Db::new(&settings.get_string("pg").stack()?)?, - sources: Arc::new(Mutex::new(HashSet::new())), + running: Arc::new(Mutex::new(HashSet::new())), http_client, // max_delay: 60, }; let clone = core.clone(); smol::spawn(Compat::new(async move { @@ -129,120 +195,140 @@ SendMessage::new(target, msg) .with_parse_mode(mode) ).await.stack() } + /// Fetches the feed for a source, sends any newly discovered posts to the appropriate chat, and records them in the database. + /// + /// This acquires a per-source guard to prevent concurrent checks for the same `id`. If a check is already running for + /// the given `id`, the function returns an error. If `last_scrape` is provided, it is sent as the `If-Modified-Since` + /// header to the feed request. The function parses RSS or Atom feeds, sends unseen post URLs to either the source's + /// channel (when `real` is true) or the source owner (when `real` is false), and persists posted entries so they are + /// not reposted later. + /// + /// Parameters: + /// - `id`: Identifier of the source to check. + /// - `real`: When `true`, send posts to the source's channel; when `false`, send to the source owner. + /// - `last_scrape`: Optional timestamp used to set the `If-Modified-Since` header for the HTTP request. + /// + /// # Returns + /// + /// `Posted: N` where `N` is the number of posts processed and sent. pub async fn check (&self, id: i32, real: bool, last_scrape: Option>) -> Result { let mut posted: i32 = 0; let mut conn = self.db.begin().await.stack()?; - let id = { - let mut set = self.sources.lock_arc().await; - match set.get(&id) { - Some(id) => id.clone(), - None => { - let id = Arc::new(id); - set.insert(id.clone()); - id.clone() - }, - } - }; - let count = Arc::strong_count(&id); - if count == 2 { - let source = conn.get_source(*id, self.owner_chat).await.stack()?; - conn.set_scrape(*id).await.stack()?; - let destination = ChatPeerId::from(match real { - true => source.channel_id, - false => source.owner, - }); - let mut this_fetch: Option> = None; - let mut posts: BTreeMap, Post> = BTreeMap::new(); - - let mut builder = self.http_client.get(&source.url); - if let Some(last_scrape) = last_scrape { - builder = builder.header(LAST_MODIFIED, last_scrape.to_rfc2822()); - }; - let response = builder.send().await.stack()?; - { - let headers = response.headers(); - let expires = headers.get(EXPIRES); - let cache = headers.get(CACHE_CONTROL); - if expires.is_some() || cache.is_some() { - println!("{} {} {:?} {:?} {:?}", Local::now().to_rfc2822(), &source.url, last_scrape, expires, cache); - } - } - let status = response.status(); - let content = response.bytes().await.stack()?; - match rss::Channel::read_from(&content[..]) { - Ok(feed) => { - for item in feed.items() { - if let Some(link) = item.link() { - let date = match item.pub_date() { - Some(feed_date) => DateTime::parse_from_rfc2822(feed_date), - None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]), - }.stack()?; - let uri = link.to_string(); - let title = item.title().unwrap_or("").to_string(); - let authors = item.author().unwrap_or("").to_string(); - let summary = item.content().unwrap_or("").to_string(); - posts.insert(date, Post{ - uri, - title, - authors, - summary, - }); - } - }; - }, - Err(err) => match err { - rss::Error::InvalidStartTag => { - match atom_syndication::Feed::read_from(&content[..]) { - Ok(feed) => { - for item in feed.entries() { - let date = item.published().unwrap(); - let uri = item.links()[0].href().to_string(); - let title = item.title().to_string(); - let authors = item.authors().iter().map(|x| format!("{} <{:?}>", x.name(), x.email())).collect::>().join(", "); - let summary = if let Some(sum) = item.summary() { sum.value.clone() } else { String::new() }; - posts.insert(*date, Post{ - uri, - title, - authors, - summary, - }); - }; - }, - Err(err) => { - bail!("Unsupported or mangled content:\n{:?}\n{err}\n{status:#?}\n", &source.url) - }, - } - }, - rss::Error::Eof => (), - _ => bail!("Unsupported or mangled content:\n{:?}\n{err}\n{status:#?}\n", &source.url) - } - }; - for (date, post) in posts.iter() { - let post_url: Cow = match source.url_re { - Some(ref x) => sedregex::ReplaceCommand::new(x).stack()?.execute(&post.uri), - None => post.uri.clone().into(), - }; - if let Some(exists) = conn.exists(&post_url, *id).await.stack()? { - if ! exists { - if this_fetch.is_none() || *date > this_fetch.unwrap() { - this_fetch = Some(*date); - }; - self.send( match &source.iv_hash { - Some(hash) => format!(" {post_url}"), - None => format!("{post_url}"), - }, Some(destination), Some(ParseMode::Html)).await.stack()?; - conn.add_post(*id, date, &post_url).await.stack()?; - }; - }; + let _token = Token::new(&self.running, id).await.stack()?; + let source = conn.get_source(id, self.owner_chat).await.stack()?; + conn.set_scrape(id).await.stack()?; + let destination = ChatPeerId::from(match real { + true => source.channel_id, + false => source.owner, + }); + let mut this_fetch: Option> = None; + let mut posts: BTreeMap, Post> = BTreeMap::new(); + + let mut builder = self.http_client.get(&source.url); + if let Some(last_scrape) = last_scrape { + builder = builder.header(LAST_MODIFIED, last_scrape.to_rfc2822()); + }; + let response = builder.send().await.stack()?; + #[cfg(debug_assertions)] + { + let headers = response.headers(); + let expires = headers.get(EXPIRES); + let cache = headers.get(CACHE_CONTROL); + if expires.is_some() || cache.is_some() { + println!("{} {} {:?} {:?} {:?}", Local::now().to_rfc2822(), &source.url, last_scrape, expires, cache); + } + } + let status = response.status(); + let content = response.bytes().await.stack()?; + match rss::Channel::read_from(&content[..]) { + Ok(feed) => { + for item in feed.items() { + if let Some(link) = item.link() { + let date = match item.pub_date() { + Some(feed_date) => DateTime::parse_from_rfc2822(feed_date), + None => DateTime::parse_from_rfc3339(match item.dublin_core_ext() { + Some(ext) => { + let dates = ext.dates(); + if dates.is_empty() { + bail!("Feed item has Dublin Core extension but no dates.") + } else { + &dates[0] + } + }, + None => bail!("Feed item misses posting date."), + }), + }.stack()?; + let uri = link.to_string(); + let title = item.title().unwrap_or("").to_string(); + let authors = item.author().unwrap_or("").to_string(); + let summary = item.content().unwrap_or("").to_string(); + posts.insert(date, Post{ + uri, + title, + authors, + summary, + }); + } + }; + }, + Err(err) => match err { + rss::Error::InvalidStartTag => { + match atom_syndication::Feed::read_from(&content[..]) { + Ok(feed) => { + for item in feed.entries() { + let date = item.published() + .stack_err("Feed item missing publishing date.")?; + let uri = { + let links = item.links(); + if links.is_empty() { + bail!("Feed item missing post links."); + } else { + links[0].href().to_string() + } + }; + let title = item.title().to_string(); + let authors = item.authors().iter().map(|x| format!("{} <{:?}>", x.name(), x.email())).collect::>().join(", "); + let summary = if let Some(sum) = item.summary() { sum.value.clone() } else { String::new() }; + posts.insert(*date, Post{ + uri, + title, + authors, + summary, + }); + }; + }, + Err(err) => { + bail!("Unsupported or mangled content:\n{:?}\n{err}\n{status:#?}\n", &source.url) + }, + } + }, + rss::Error::Eof => (), + _ => bail!("Unsupported or mangled content:\n{:?}\n{err}\n{status:#?}\n", &source.url) + } + }; + for (date, post) in posts.iter() { + let post_url: Cow = match source.url_re { + Some(ref x) => sedregex::ReplaceCommand::new(x).stack()?.execute(&post.uri), + None => post.uri.clone().into(), + }; + if ! conn.exists(&post_url, id).await.stack()? { + if this_fetch.is_none() || *date > this_fetch.unwrap() { + this_fetch = Some(*date); + }; + self.send( match &source.iv_hash { + Some(hash) => format!(" {post_url}"), + None => format!("{post_url}"), + }, Some(destination), Some(ParseMode::Html)).await.stack()?; + conn.add_post(id, date, &post_url).await.stack()?; posted += 1; }; - posts.clear(); }; + posts.clear(); Ok(format!("Posted: {posted}")) } async fn autofetch(&self) -> Result { let mut delay = chrono::Duration::minutes(1); @@ -261,17 +347,17 @@ }; let source = { let mut conn = self.db.begin().await.stack()?; match conn.get_one(owner, source_id).await { Ok(Some(source)) => source.to_string(), - Ok(None) => "Source not found in database.stack()?".to_string(), + Ok(None) => "Source not found in database?".to_string(), Err(err) => format!("Failed to fetch source data:\n{err}"), } }; smol::spawn(Compat::new(async move { if let Err(err) = clone.check(source_id, true, Some(last_scrape)).await { - if let Err(err) = clone.send(&format!("{source}\n\nšŸ›‘ {}", encode(&err.to_string())), None, Some(ParseMode::MarkdownV2)).await { + if let Err(err) = clone.send(&format!("šŸ›‘ {source}\n{}", encode(&err.to_string())), None, Some(ParseMode::MarkdownV2)).await { eprintln!("Check error: {err}"); // clone.disable(&source_id, owner).await.unwrap(); }; }; })).detach(); Index: src/main.rs ================================================================== --- src/main.rs +++ src/main.rs @@ -13,11 +13,11 @@ StackableErr, }; use tgbot::handler::LongPoll; fn main () -> Result<()> { - smol::future::block_on(Compat::new(async { + smol::block_on(Compat::new(async { async_main().await.unwrap(); })); Ok(()) } Index: src/sql.rs ================================================================== --- src/sql.rs +++ src/sql.rs @@ -148,20 +148,24 @@ 0 => { Ok("Source not found.") }, _ => { bail!("Database error.") }, } } - pub async fn exists (&mut self, post_url: &str, id: I) -> Result> + pub async fn exists (&mut self, post_url: &str, id: I) -> Result where I: Into { let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") .bind(post_url) .bind(id.into()) .fetch_one(&mut *self.0).await.stack()?; - let exists: Option = row.try_get("exists").stack()?; - Ok(exists) + if let Some(exists) = row.try_get("exists").stack()? { + Ok(exists) + } else { + bail!("Database error: can't check whether source exists."); + } } + /// Get all pending events for (now + 1 minute) pub async fn get_queue (&mut self) -> Result> { let block: Vec = sqlx::query_as("select source_id, next_fetch, owner, last_scrape from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';") .fetch_all(&mut *self.0).await.stack()?; Ok(block) }