Overview
| Comment: | move some Telegram code to separate module |
|---|---|
| Downloads: | Tarball | ZIP archive | SQL archive |
| Timelines: | family | ancestors | descendants | both | trunk |
| Files: | files | file ages | folders |
| SHA3-256: |
9c4f09193a56f530d70a707d1e3dd330 |
| User & Date: | arcade on 2026-01-09 10:00:04.399 |
| Other Links: | manifest | tags |
Context
|
2026-01-09
| ||
| 10:41 | refactor/add comments (by CodeRabbit) check-in: fabcca1eaf user: arcade tags: trunk | |
| 10:00 | move some Telegram code to separate module check-in: 9c4f09193a user: arcade tags: trunk | |
| 09:59 | fix wrong comment check-in: 8f4dcf5e9d user: arcade tags: trunk | |
Changes
Modified Cargo.lock
from [beed9b6e19]
to [cfecf5f3d2].
| ︙ | ︙ | |||
351 352 353 354 355 356 357 | name = "bytes" version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" [[package]] name = "cc" | | | | 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 | name = "bytes" version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" [[package]] name = "cc" version = "1.2.52" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd4932aefd12402b36c60956a4fe0035421f544799057659ff86f923657aada3" dependencies = [ "find-msvc-tools", "jobserver", "libc", "shlex", ] |
| ︙ | ︙ | |||
791 792 793 794 795 796 797 | name = "fastrand" version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "find-msvc-tools" | | | | 791 792 793 794 795 796 797 798 799 800 801 802 803 804 805 806 807 | name = "fastrand" version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] name = "find-msvc-tools" version = "0.1.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f449e6c6c08c865631d4890cfacf252b3d396c9bcc83adb6623cdb02a8336c41" [[package]] name = "flate2" version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfe33edd8e85a12a67454e37f8c75e730830d83e313556ab9ebf9ee7fbeb3bfb" dependencies = [ |
| ︙ | ︙ | |||
1320 1321 1322 1323 1324 1325 1326 | dependencies = [ "icu_normalizer", "icu_properties", ] [[package]] name = "indexmap" | | | | 1320 1321 1322 1323 1324 1325 1326 1327 1328 1329 1330 1331 1332 1333 1334 1335 1336 | dependencies = [ "icu_normalizer", "icu_properties", ] [[package]] name = "indexmap" version = "2.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7714e70437a7dc3ac8eb7e6f8df75fd8eb422675fc7678aff7364301092b1017" dependencies = [ "equivalent", "hashbrown 0.16.1", ] [[package]] name = "ipnet" |
| ︙ | ︙ | |||
1403 1404 1405 1406 1407 1408 1409 | checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" dependencies = [ "spin", ] [[package]] name = "libc" | | | | 1403 1404 1405 1406 1407 1408 1409 1410 1411 1412 1413 1414 1415 1416 1417 1418 1419 | checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" dependencies = [ "spin", ] [[package]] name = "libc" version = "0.2.180" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bcc35a38544a891a5f7c865aca548a982ccb3b8650a5b06d0fd33a10283c56fc" [[package]] name = "libm" version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f9fbbcab51052fe104eb5e5d351cf728d30a5be1fe14d9be8a3b097481fb97de" |
| ︙ | ︙ | |||
3628 3629 3630 3631 3632 3633 3634 | "quote", "syn", "synstructure", ] [[package]] name = "zerocopy" | | | | | | 3628 3629 3630 3631 3632 3633 3634 3635 3636 3637 3638 3639 3640 3641 3642 3643 3644 3645 3646 3647 3648 3649 3650 3651 3652 3653 | "quote", "syn", "synstructure", ] [[package]] name = "zerocopy" version = "0.8.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "668f5168d10b9ee831de31933dc111a459c97ec93225beb307aed970d1372dfd" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" version = "0.8.33" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2c7962b26b0a8685668b671ee4b54d007a67d4eaf05fda79ac0ecf41e32270f1" dependencies = [ "proc-macro2", "quote", "syn", ] [[package]] |
| ︙ | ︙ |
Modified src/command.rs
from [f2ddddf16c]
to [f66e5b24f4].
| ︙ | ︙ | |||
20 21 22 23 24 25 26 |
// Regular expressions compiled once on first use and shared for the whole process.
lazy_static! {
// Matches a whole string of the form `@name`: an `@`, one ASCII letter, then one or more
// ASCII letters, digits or underscores. Capture group 1 is the name without the `@`.
static ref RE_USERNAME: Regex = Regex::new(r"^@([a-zA-Z][a-zA-Z0-9_]+)$").unwrap();
// Matches a whole string of exactly 14 characters drawn from `a`-`f` and `0`-`9`.
// NOTE(review): presumably the Instant View hash passed as `rhash` — confirm at the use site.
static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap();
}
pub async fn start (core: &Core, msg: &Message) -> Result<()> {
| | | | 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 |
// Regular expressions compiled once on first use and shared for the whole process.
lazy_static! {
// Matches a whole string of the form `@name`: an `@`, one ASCII letter, then one or more
// ASCII letters, digits or underscores. Capture group 1 is the name without the `@`.
static ref RE_USERNAME: Regex = Regex::new(r"^@([a-zA-Z][a-zA-Z0-9_]+)$").unwrap();
// Matches a whole string of exactly 14 characters drawn from `a`-`f` and `0`-`9`.
// NOTE(review): presumably the Instant View hash passed as `rhash` — confirm at the use site.
static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap();
}
/// Answers the `/start` command.
///
/// Sends a fixed MarkdownV2 greeting, which links to the help channel, back to the chat
/// the message came from. Any error from sending is propagated to the caller.
pub async fn start (core: &Core, msg: &Message) -> Result<()> {
    let greeting = "We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.";
    let chat = msg.chat.get_id();
    core.tg.send(greeting, Some(chat), Some(MarkdownV2)).await.stack()?;
    Ok(())
}
/// Answers the `/list` command.
///
/// Resolves the sender's user id (failing with "Ignoring unreal users." when the message
/// has none), asks `core.list` for that user's listing and sends it back to the
/// originating chat as MarkdownV2.
pub async fn list (core: &Core, msg: &Message) -> Result<()> {
    let user_id = msg.sender.get_user_id().stack_err("Ignoring unreal users.")?;
    let listing = core.list(user_id).await.stack()?;
    let chat = msg.chat.get_id();
    core.tg.send(listing, Some(chat), Some(MarkdownV2)).await.stack()?;
    Ok(())
}
pub async fn command (core: &Core, command: &str, msg: &Message, words: &[String]) -> Result<()> {
let mut conn = core.db.begin().await.stack()?;
let sender = msg.sender.get_user_id()
.stack_err("Ignoring unreal users.")?;
|
| ︙ | ︙ | |||
53 54 55 56 57 58 59 |
"/disable" => conn.disable(number, sender).await.stack()?.into(),
_ => bail!("Command {command} {words:?} not handled."),
},
}
} else {
"This command needs exacly one number.".into()
};
| | | 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 |
"/disable" => conn.disable(number, sender).await.stack()?.into(),
_ => bail!("Command {command} {words:?} not handled."),
},
}
} else {
"This command needs exacly one number.".into()
};
core.tg.send(reply, Some(msg.chat.get_id()), None).await.stack()?;
Ok(())
}
pub async fn update (core: &Core, command: &str, msg: &Message, words: &[String]) -> Result<()> {
let sender = msg.sender.get_user_id()
.stack_err("Ignoring unreal users.")?;
let mut source_id: Option<i32> = None;
|
| ︙ | ︙ | |||
128 129 130 131 132 133 134 | Some(thing) } } }, None => None, }; let chat_id = ChatUsername::from(channel.as_ref()); | | | | | | 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
Some(thing)
}
}
},
None => None,
};
let chat_id = ChatUsername::from(channel.as_ref());
let channel_id = core.tg.client.execute(GetChat::new(chat_id.clone())).await.stack_err("gettting GetChat")?.id;
let chan_adm = core.tg.client.execute(GetChatAdministrators::new(chat_id)).await
.context("Sorry, I have no access to that chat.")?;
let (mut me, mut user) = (false, false);
for admin in chan_adm {
let member_id = match admin {
ChatMember::Creator(member) => member.user.id,
ChatMember::Administrator(member) => member.user.id,
ChatMember::Left(_)
| ChatMember::Kicked(_)
| ChatMember::Member{..}
| ChatMember::Restricted(_) => continue,
};
if member_id == core.tg.me.id {
me = true;
}
if member_id == sender {
user = true;
}
};
if ! me { bail!("I need to be admin on that channel."); };
if ! user { bail!("You should be admin on that channel."); };
let mut conn = core.db.begin().await.stack()?;
core.tg.send(conn.update(source_id, channel, channel_id, url, iv_hash, url_re, sender).await.stack()?, Some(msg.chat.get_id()), None).await.stack()?;
Ok(())
}
|
Modified src/core.rs
from [170e5288e9]
to [1780cebf50].
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
use crate::{
command,
sql::Db,
};
use std::{
borrow::Cow,
collections::{
BTreeMap,
HashSet,
},
sync::Arc,
};
use async_compat::Compat;
use chrono::{
DateTime,
Local,
};
use lazy_static::lazy_static;
use regex::Regex;
| > | < < < < < < < < < | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
use crate::{
command,
sql::Db,
tg_bot::Tg,
};
use std::{
borrow::Cow,
collections::{
BTreeMap,
HashSet,
},
sync::Arc,
};
use async_compat::Compat;
use chrono::{
DateTime,
Local,
};
use lazy_static::lazy_static;
use regex::Regex;
use reqwest::header::LAST_MODIFIED;
use smol::{
Timer,
lock::Mutex,
};
use tgbot::{
handler::UpdateHandler,
types::{
ChatPeerId,
Command,
ParseMode,
Update,
UpdateType,
UserPeerId,
},
};
use stacked_errors::{
Result,
|
| ︙ | ︙ | |||
112 113 114 115 116 117 118 |
set.remove(&self.my_id);
})
}
}
#[derive(Clone)]
pub struct Core {
| < < | < | | | < < < < < | < | < < | < > | < < < < < < < < < < < < | 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 |
set.remove(&self.my_id);
})
}
}
/// Shared application state handed to every command handler and background task.
///
/// Cloning is used to hand a copy to spawned tasks; the `running` set sits behind an
/// `Arc`, so all clones observe the same set.
#[derive(Clone)]
pub struct Core {
    /// Telegram side of the bot (message sending, API client, bot identity).
    pub tg: Tg,
    /// Database handle; transactions are opened from it with `begin()`.
    pub db: Db,
    /// Ids of sources that currently have a check in progress.
    running: Arc<Mutex<HashSet<i32>>>,
    /// HTTP client used to download feeds.
    http_client: reqwest::Client,
}
/// One feed entry collected while checking a source.
///
/// Only `uri` is read in the code visible here; the underscore-prefixed fields are
/// filled in from the feed but appear to be unused for now.
pub struct Post {
    /// Link to the post as found in the feed.
    uri: String,
    /// Entry title (empty when the feed item has none).
    _title: String,
    /// Entry author(s) as a single string.
    _authors: String,
    /// Entry content or summary text.
    _summary: String,
}
impl Core {
/// Builds a `Core` from `settings` and launches the background autofetch task.
///
/// Settings involved:
/// - `pg` (string, required): connection string handed to `Db::new`,
/// - `proxy` (string, optional): when present, used as the proxy of the HTTP client,
/// - `owner`, `api_key`, `api_gateway`: Telegram settings, expected to be consumed by
///   `Tg::new`, which receives the same `settings` (they are not read in this function).
///
/// Once the instance exists, a detached task loops forever: it awaits `autofetch` and
/// sleeps for the returned duration; on failure it tries to report the error through
/// `tg.send` (logging to stderr when that fails as well) and retries after 60 seconds.
///
/// # Errors
///
/// Returns an error when the proxy URL is rejected, when `Tg::new` or `Db::new` fail,
/// when `pg` is missing, or when the HTTP client cannot be built.
pub async fn new(settings: config::Config) -> Result<Core> {
    let http_builder = match settings.get_string("proxy") {
        Ok(url) => reqwest::Client::builder().proxy(reqwest::Proxy::all(url).stack()?),
        Err(_) => reqwest::Client::builder(),
    };

    let core = Core {
        tg: Tg::new(&settings).await.stack()?,
        db: Db::new(&settings.get_string("pg").stack()?)?,
        running: Arc::new(Mutex::new(HashSet::new())),
        http_client: http_builder.build().stack()?,
    };

    // The worker owns its own clone so the caller keeps `core` for itself.
    let worker = core.clone();
    smol::spawn(Compat::new(async move {
        loop {
            let delay = match &worker.autofetch().await {
                Ok(time) => *time,
                Err(err) => {
                    let notice = format!("🛑 {err}");
                    if let Err(send_err) = worker.tg.send(notice, None, None).await {
                        eprintln!("Autofetch error: {send_err:?}");
                    };
                    // Back off for a minute before the next attempt.
                    std::time::Duration::from_secs(60)
                },
            };
            Timer::after(delay).await;
        }
    })).detach();

    Ok(core)
}
/// Fetches the feed for a source, sends any newly discovered posts to the appropriate chat, and records them in the database.
///
/// This acquires a per-source guard to prevent concurrent checks for the same `id`. If a check is already running for
/// the given `id`, the function returns an error. If `last_scrape` is provided, it is sent as the `If-Modified-Since`
/// header to the feed request. The function parses RSS or Atom feeds, sends unseen post URLs to either the source's
/// channel (when `real` is true) or the source owner (when `real` is false), and persists posted entries so they are
/// not reposted later.
|
| ︙ | ︙ | |||
214 215 216 217 218 219 220 |
///
/// `Posted: N` where `N` is the number of posts processed and sent.
pub async fn check (&self, id: i32, real: bool, last_scrape: Option<DateTime<Local>>) -> Result<String> {
let mut posted: i32 = 0;
let mut conn = self.db.begin().await.stack()?;
let _token = Token::new(&self.running, id).await.stack()?;
| | > > > > | 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
///
/// `Posted: N` where `N` is the number of posts processed and sent.
pub async fn check (&self, id: i32, real: bool, last_scrape: Option<DateTime<Local>>) -> Result<String> {
let mut posted: i32 = 0;
let mut conn = self.db.begin().await.stack()?;
let _token = Token::new(&self.running, id).await.stack()?;
let source = conn.get_source(id, self.tg.owner).await.stack()?;
conn.set_scrape(id).await.stack()?;
let destination = ChatPeerId::from(match real {
true => source.channel_id,
false => source.owner,
});
let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, Post> = BTreeMap::new();
let mut builder = self.http_client.get(&source.url);
if let Some(last_scrape) = last_scrape {
builder = builder.header(LAST_MODIFIED, last_scrape.to_rfc2822());
};
let response = builder.send().await.stack()?;
#[cfg(debug_assertions)]
{
use reqwest::header::{
CACHE_CONTROL,
EXPIRES,
};
let headers = response.headers();
let expires = headers.get(EXPIRES);
let cache = headers.get(CACHE_CONTROL);
if expires.is_some() || cache.is_some() {
println!("{} {} {:?} {:?} {:?}", Local::now().to_rfc2822(), &source.url, last_scrape, expires, cache);
}
}
|
| ︙ | ︙ | |||
257 258 259 260 261 262 263 |
} else {
&dates[0]
}
},
None => bail!("Feed item misses posting date."),
}),
}.stack()?;
| > | | | | < < < < < < | | | | | | 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 |
} else {
&dates[0]
}
},
None => bail!("Feed item misses posting date."),
}),
}.stack()?;
posts.insert(date, Post{
uri: link.to_string(),
_title: item.title().unwrap_or("").to_string(),
_authors: item.author().unwrap_or("").to_string(),
_summary: item.content().unwrap_or("").to_string(),
});
}
};
},
Err(err) => match err {
rss::Error::InvalidStartTag => {
match atom_syndication::Feed::read_from(&content[..]) {
Ok(feed) => {
for item in feed.entries() {
let date = item.published()
.stack_err("Feed item missing publishing date.")?;
let uri = {
let links = item.links();
if links.is_empty() {
bail!("Feed item missing post links.");
} else {
links[0].href().to_string()
}
};
let _authors = item.authors().iter().map(|x| format!("{} <{:?}>", x.name(), x.email())).collect::<Vec<String>>().join(", ");
let _summary = if let Some(sum) = item.summary() { sum.value.clone() } else { String::new() };
posts.insert(*date, Post{
uri,
_title: item.title().to_string(),
_authors,
_summary,
});
};
},
Err(err) => {
bail!("Unsupported or mangled content:\n{:?}\n{err}\n{status:#?}\n", &source.url)
},
}
|
| ︙ | ︙ | |||
314 315 316 317 318 319 320 |
Some(ref x) => sedregex::ReplaceCommand::new(x).stack()?.execute(&post.uri),
None => post.uri.clone().into(),
};
if ! conn.exists(&post_url, id).await.stack()? {
if this_fetch.is_none() || *date > this_fetch.unwrap() {
this_fetch = Some(*date);
};
| | | 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 |
Some(ref x) => sedregex::ReplaceCommand::new(x).stack()?.execute(&post.uri),
None => post.uri.clone().into(),
};
if ! conn.exists(&post_url, id).await.stack()? {
if this_fetch.is_none() || *date > this_fetch.unwrap() {
this_fetch = Some(*date);
};
self.tg.send( match &source.iv_hash {
Some(hash) => format!("<a href=\"https://t.me/iv?url={post_url}&rhash={hash}\"> </a>{post_url}"),
None => format!("{post_url}"),
}, Some(destination), Some(ParseMode::Html)).await.stack()?;
conn.add_post(id, date, &post_url).await.stack()?;
posted += 1;
};
};
|
| ︙ | ︙ | |||
338 339 340 341 342 343 344 |
conn.get_queue().await.stack()?
};
for row in queue {
if let Some(next_fetch) = row.next_fetch {
if next_fetch < now {
if let (Some(owner), Some(source_id), last_scrape) = (row.owner, row.source_id, row.last_scrape) {
let clone = Core {
| | | | > | < < | 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 |
conn.get_queue().await.stack()?
};
for row in queue {
if let Some(next_fetch) = row.next_fetch {
if next_fetch < now {
if let (Some(owner), Some(source_id), last_scrape) = (row.owner, row.source_id, row.last_scrape) {
let clone = Core {
tg: self.tg.with_owner(owner),
..self.clone()
};
let source = {
let mut conn = self.db.begin().await.stack()?;
match conn.get_one(owner, source_id).await {
Ok(Some(source)) => source.to_string(),
Ok(None) => "Source not found in database?".to_string(),
Err(err) => format!("Failed to fetch source data:\n{err}"),
}
};
smol::spawn(Compat::new(async move {
if let Err(err) = clone.check(source_id, true, Some(last_scrape)).await
&& let Err(err) = clone.tg.send(&format!("🛑 {source}\n{}", encode(&err.to_string())), None, Some(ParseMode::MarkdownV2)).await
{
eprintln!("Check error: {err}");
};
})).detach();
}
} else if next_fetch - now < delay {
delay = next_fetch - now;
}
}
|
| ︙ | ︙ | |||
379 380 381 382 383 384 385 |
};
Ok(reply.join("\n\n"))
}
}
impl UpdateHandler for Core {
async fn handle (&self, update: Update) {
| | | > | | | | | | | | | | | | | | | > | < | | < | 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 |
};
Ok(reply.join("\n\n"))
}
}
impl UpdateHandler for Core {
    /// Dispatches an incoming update to the matching command handler.
    ///
    /// Updates that are not messages, or messages that do not parse as a bot command,
    /// are ignored. When a handler fails (or the command is unknown) the error text is
    /// sent back to the originating chat inside a MarkdownV2 code block; if that
    /// notification fails too, the send error is dumped with `dbg!`.
    async fn handle (&self, update: Update) {
        let UpdateType::Message(incoming) = update.update_type else { return; };
        let Ok(cmd) = Command::try_from(incoming) else { return; };

        let command = cmd.get_name();
        let words = cmd.get_args();
        let msg = cmd.get_message();

        let outcome = match command {
            "/start" => command::start(self, msg).await,
            "/list" => command::list(self, msg).await,
            "/add" | "/update" => command::update(self, command, msg, words).await,
            "/check" | "/clean" | "/enable" | "/delete" | "/disable" => command::command(self, command, msg, words).await,
            any => Err(anyhow!("Unknown command: {any}")),
        };

        // Nothing more to do when the command succeeded.
        let Err(err) = outcome else { return; };

        let report = format!("\\#error\n```\n{err}\n```");
        let chat = msg.chat.get_id();
        if let Err(err2) = self.tg.send(report, Some(chat), Some(ParseMode::MarkdownV2)).await {
            dbg!(err2);
        }
    }
}
|
Modified src/main.rs
from [527f3dd3d4]
to [aa21911e27].
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
//! This is a Telegram bot that fetches RSS/Atom feeds and posts the results to
//! public channels.
#![warn(missing_docs)]
mod command;
mod core;
mod sql;
use async_compat::Compat;
use stacked_errors::{
Result,
StackableErr,
};
use tgbot::handler::LongPoll;
| > | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
//! This is a Telegram bot that fetches RSS/Atom feeds and posts the results to
//! public channels.
#![warn(missing_docs)]
mod command;
mod core;
mod sql;
mod tg_bot;
use async_compat::Compat;
use stacked_errors::{
Result,
StackableErr,
};
use tgbot::handler::LongPoll;
|
| ︙ | ︙ | |||
27 28 29 30 31 32 33 |
.set_default("api_gateway", "https://api.telegram.org").stack()?
.add_source(config::File::with_name("rsstg"))
.build()
.stack()?;
let core = core::Core::new(settings).await.stack()?;
| | | 28 29 30 31 32 33 34 35 36 37 38 |
.set_default("api_gateway", "https://api.telegram.org").stack()?
.add_source(config::File::with_name("rsstg"))
.build()
.stack()?;
let core = core::Core::new(settings).await.stack()?;
LongPoll::new(core.tg.client.clone(), core).run().await;
Ok(())
}
|
Added src/tg_bot.rs version [f1db2464ab].