Check-in [fabcca1eaf]
Logged in as anonymous
Overview
Comment:refactor/add comments (by CodeRabbit)
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: fabcca1eafc9428eb800012313eadbdfd08533a35ee12e9ed6313585569fb74c
User & Date: arcade on 2026-01-09 10:41:04.921
Other Links: manifest | tags
Context
2026-01-10
12:16
sample Callback, comment fixes Leaf check-in: 13265e7697 user: arcade tags: trunk
2026-01-09
10:41
refactor/add comments (by CodeRabbit) check-in: fabcca1eaf user: arcade tags: trunk
10:00
move some Telegram code to separate module check-in: 9c4f09193a user: arcade tags: trunk
Changes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
[package]
name = "rsstg"
version = "0.5.3"
authors = [ "arcade@b1t.name" ]
edition = "2024"
license = "0BSD"
license-file = "LICENSE.0BSD"
repository = "http://fs.b1t.name/rsstg"

[dependencies]
async-compat = "0.2.5"
atom_syndication = { version = "0.12.4", features = [ "with-serde" ] }
chrono = "0.4.38"
config = { version = "0.15", default-features = false, features = [ "toml" ] }






<







1
2
3
4
5
6

7
8
9
10
11
12
13
[package]
name = "rsstg"
version = "0.5.3"
authors = [ "arcade@b1t.name" ]
edition = "2024"
license = "0BSD"

repository = "http://fs.b1t.name/rsstg"

[dependencies]
async-compat = "0.2.5"
atom_syndication = { version = "0.12.4", features = [ "with-serde" ] }
chrono = "0.4.38"
config = { version = "0.15", default-features = false, features = [ "toml" ] }
19
20
21
22
23
24
25

26
27
28
29
30
31




32
33
34
35
36
37
38
39












40
41
42
43
44
45
46
use url::Url;

lazy_static! {
	static ref RE_USERNAME: Regex = Regex::new(r"^@([a-zA-Z][a-zA-Z0-9_]+)$").unwrap();
	static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap();
}


pub async fn start (core: &Core, msg: &Message) -> Result<()> {
	core.tg.send("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.",
		Some(msg.chat.get_id()), Some(MarkdownV2)).await.stack()?;
	Ok(())
}





pub async fn list (core: &Core, msg: &Message) -> Result<()> {
	let sender = msg.sender.get_user_id()
		.stack_err("Ignoring unreal users.")?;
	let reply = core.list(sender).await.stack()?;
	core.tg.send(reply, Some(msg.chat.get_id()), Some(MarkdownV2)).await.stack()?;
	Ok(())
}













pub async fn command (core: &Core, command: &str, msg: &Message, words: &[String]) -> Result<()> {
	let mut conn = core.db.begin().await.stack()?;
	let sender = msg.sender.get_user_id()
		.stack_err("Ignoring unreal users.")?;
	let reply = if words.len() == 1 {
		match words[0].parse::<i32>() {
			Err(err) => format!("I need a number.\n{}", &err).into(),







>






>
>
>
>








>
>
>
>
>
>
>
>
>
>
>
>







19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
use url::Url;

lazy_static! {
	static ref RE_USERNAME: Regex = Regex::new(r"^@([a-zA-Z][a-zA-Z0-9_]+)$").unwrap();
	static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap();
}

/// Sends an informational message to the message's chat linking to the bot help channel.
pub async fn start (core: &Core, msg: &Message) -> Result<()> {
	core.tg.send("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.",
		Some(msg.chat.get_id()), Some(MarkdownV2)).await.stack()?;
	Ok(())
}

/// Send the sender's subscription list to the chat.
///
/// Retrieves the message sender's user ID, obtains their subscription list from `core`,
/// and sends the resulting reply into the message chat using MarkdownV2.
pub async fn list (core: &Core, msg: &Message) -> Result<()> {
	let sender = msg.sender.get_user_id()
		.stack_err("Ignoring unreal users.")?;
	let reply = core.list(sender).await.stack()?;
	core.tg.send(reply, Some(msg.chat.get_id()), Some(MarkdownV2)).await.stack()?;
	Ok(())
}

/// Handle channel-management commands that operate on a single numeric source ID.
///
/// This validates that exactly one numeric argument is provided, performs the requested
/// operation (check, clean, enable, delete, disable) against the database or core,
/// and sends the resulting reply to the chat.
///
/// # Parameters
///
/// - `core`: application core containing database and Telegram clients.
/// - `command`: command string (e.g. "/check", "/clean", "/enable", "/delete", "/disable").
/// - `msg`: incoming Telegram message that triggered the command; used to determine sender and chat.
/// - `words`: command arguments; expected to contain exactly one element that parses as a 32-bit integer.
pub async fn command (core: &Core, command: &str, msg: &Message, words: &[String]) -> Result<()> {
	let mut conn = core.db.begin().await.stack()?;
	let sender = msg.sender.get_user_id()
		.stack_err("Ignoring unreal users.")?;
	let reply = if words.len() == 1 {
		match words[0].parse::<i32>() {
			Err(err) => format!("I need a number.\n{}", &err).into(),
57
58
59
60
61
62
63











64
65
66
67
68
69
70
	} else {
		"This command needs exacly one number.".into()
	};
	core.tg.send(reply, Some(msg.chat.get_id()), None).await.stack()?;
	Ok(())
}












pub async fn update (core: &Core, command: &str, msg: &Message, words: &[String]) -> Result<()> {
	let sender = msg.sender.get_user_id()
		.stack_err("Ignoring unreal users.")?;
	let mut source_id: Option<i32> = None;
	let at_least = "Requires at least 3 parameters.";
	let mut i_words = words.iter();
	match command {







>
>
>
>
>
>
>
>
>
>
>







74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
	} else {
		"This command needs exacly one number.".into()
	};
	core.tg.send(reply, Some(msg.chat.get_id()), None).await.stack()?;
	Ok(())
}

/// Validate command arguments, check permissions and update or add a channel feed configuration in the database.
///
/// This function parses and validates parameters supplied by a user command (either "/update <id> ..." or "/add ..."),
/// verifies the channel username and feed URL, optionally validates an IV hash and a replacement regexp,
/// ensures both the bot and the command sender are administrators of the target channel, and performs the database update.
///
/// # Parameters
///
/// - `command` — the invoked command, expected to be either `"/update"` (followed by a numeric source id) or `"/add"`.
/// - `msg` — the incoming Telegram message; used to derive the command sender and target chat id for the reply.
/// - `words` — the command arguments: for `"/add"` expected `channel url [iv_hash|'-'] [url_re|'-']`; for `"/update"` the first element must be a numeric `source_id` followed by the same parameters.
pub async fn update (core: &Core, command: &str, msg: &Message, words: &[String]) -> Result<()> {
	let sender = msg.sender.get_user_id()
		.stack_err("Ignoring unreal users.")?;
	let mut source_id: Option<i32> = None;
	let at_least = "Requires at least 3 parameters.";
	let mut i_words = words.iter();
	match command {
151
152
153
154
155
156
157
158

159
160
		if member_id == sender {
			user = true;
		}
	};
	if ! me   { bail!("I need to be admin on that channel."); };
	if ! user { bail!("You should be admin on that channel."); };
	let mut conn = core.db.begin().await.stack()?;
	core.tg.send(conn.update(source_id, channel, channel_id, url, iv_hash, url_re, sender).await.stack()?, Some(msg.chat.get_id()), None).await.stack()?;

	Ok(())
}







|
>


179
180
181
182
183
184
185
186
187
188
189
		if member_id == sender {
			user = true;
		}
	};
	if ! me   { bail!("I need to be admin on that channel."); };
	if ! user { bail!("You should be admin on that channel."); };
	let mut conn = core.db.begin().await.stack()?;
	let update = conn.update(source_id, channel, channel_id, url, iv_hash, url_re, sender).await.stack()?;
	core.tg.send(update, Some(msg.chat.get_id()), None).await.stack()?;
	Ok(())
}
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
	_summary: String,
}

impl Core {
	/// Create a Core instance from configuration and start its background autofetch loop.
	///
	/// The provided `settings` must include:
	/// - `owner` (integer): chat id to use as the default destination,
	/// - `api_key` (string): Telegram bot API key,
	/// - `api_gateway` (string): Telegram API gateway host,
	/// - `pg` (string): PostgreSQL connection string,
	/// - optional `proxy` (string): proxy URL for the HTTP client.
	///
	/// On success returns an initialized `Core` with Telegram and HTTP clients, database connection,
	/// an empty running set for per-id tokens, and a spawned background task that periodically runs







|







121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
	_summary: String,
}

impl Core {
	/// Create a Core instance from configuration and start its background autofetch loop.
	///
	/// The provided `settings` must include:
	/// - `owner` (integer): default chat id to use as the owner/destination,
	/// - `api_key` (string): Telegram bot API key,
	/// - `api_gateway` (string): Telegram API gateway host,
	/// - `pg` (string): PostgreSQL connection string,
	/// - optional `proxy` (string): proxy URL for the HTTP client.
	///
	/// On success returns an initialized `Core` with Telegram and HTTP clients, database connection,
	/// an empty running set for per-id tokens, and a spawned background task that periodically runs
294
295
296
297
298
299
300





301
302
303
304
305
306
307
				posted += 1;
			};
		};
		posts.clear();
		Ok(format!("Posted: {posted}"))
	}






	async fn autofetch(&self) -> Result<std::time::Duration> {
		let mut delay = chrono::Duration::minutes(1);
		let now = chrono::Local::now();
		let queue = {
			let mut conn = self.db.begin().await.stack()?;
			conn.get_queue().await.stack()?
		};







>
>
>
>
>







294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
				posted += 1;
			};
		};
		posts.clear();
		Ok(format!("Posted: {posted}"))
	}

	/// Determine the delay until the next scheduled fetch and spawn background checks for any overdue sources.
	///
	/// This scans the database queue, spawns background tasks to run checks for sources whose `next_fetch`
	/// is in the past (each task uses a Core clone with the appropriate owner), and computes the shortest
	/// duration until the next `next_fetch`.
	async fn autofetch(&self) -> Result<std::time::Duration> {
		let mut delay = chrono::Duration::minutes(1);
		let now = chrono::Local::now();
		let queue = {
			let mut conn = self.db.begin().await.stack()?;
			conn.get_queue().await.stack()?
		};
345
346
347
348
349
350
351






352
353
354
355
356
357
358
			reply.push(row.to_string());
		};
		Ok(reply.join("\n\n"))
	}
}

impl UpdateHandler for Core {






	async fn handle (&self, update: Update) {
		if let UpdateType::Message(msg) = update.update_type 
			&& let Ok(cmd) = Command::try_from(msg)
		{
			let msg = cmd.get_message();
			let words = cmd.get_args();
			let command = cmd.get_name();







>
>
>
>
>
>







350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
			reply.push(row.to_string());
		};
		Ok(reply.join("\n\n"))
	}
}

impl UpdateHandler for Core {
	/// Dispatches an incoming Telegram update to a matching command handler and reports handler errors to the originating chat.
	///
	/// This method inspects the update; if it contains a message that can be parsed as a bot command,
	/// it executes the corresponding command handler. If the handler returns an error, the error text
	/// is sent back to the message's chat using MarkdownV2 formatting. Unknown commands produce an erro
	/// which is also reported to the chat.
	async fn handle (&self, update: Update) {
		if let UpdateType::Message(msg) = update.update_type 
			&& let Ok(cmd) = Command::try_from(msg)
		{
			let msg = cmd.get_message();
			let words = cmd.get_args();
			let command = cmd.get_name();
367
368
369
370
371
372
373
374
375
376
				&& let Err(err2) = self.tg.send(format!("\\#error\n```\n{err}\n```"),
					Some(msg.chat.get_id()),
					Some(ParseMode::MarkdownV2)
				).await
			{
				dbg!(err2);
			}
		};
	}
}







|


378
379
380
381
382
383
384
385
386
387
				&& let Err(err2) = self.tg.send(format!("\\#error\n```\n{err}\n```"),
					Some(msg.chat.get_id()),
					Some(ParseMode::MarkdownV2)
				).await
			{
				dbg!(err2);
			}
		} // TODO: debug log for skipped updates?;
	}
}
19
20
21
22
23
24
25




26
27
28
29
30
31
32
	smol::block_on(Compat::new(async {
		async_main().await.unwrap();
	}));

	Ok(())
}





async fn async_main () -> Result<()> {
	let settings = config::Config::builder()
		.set_default("api_gateway", "https://api.telegram.org").stack()?
		.add_source(config::File::with_name("rsstg"))
		.build()
		.stack()?;








>
>
>
>







19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
	smol::block_on(Compat::new(async {
		async_main().await.unwrap();
	}));

	Ok(())
}

/// Initialises configuration and the bot core, then runs the Telegram long-poll loop.
///
/// This function loads configuration (with a default API gateway), constructs the application
/// core, and starts the long-polling loop that handles incoming Telegram updates.
async fn async_main () -> Result<()> {
	let settings = config::Config::builder()
		.set_default("api_gateway", "https://api.telegram.org").stack()?
		.add_source(config::File::with_name("rsstg"))
		.build()
		.stack()?;

146
147
148
149
150
151
152








153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
			.execute(&mut *self.0).await.stack()?.rows_affected() {
			1 => { Ok("Source enabled.") },
			0 => { Ok("Source not found.") },
			_ => { bail!("Database error.") },
		}
	}









	pub async fn exists <I> (&mut self, post_url: &str, id: I) -> Result<bool>
	where I: Into<i64> {
		let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
			.bind(post_url)
			.bind(id.into())
			.fetch_one(&mut *self.0).await.stack()?;
		if let Some(exists) = row.try_get("exists").stack()? {
			Ok(exists)
		} else {
			bail!("Database error: can't check whether post exists.");
		}
	}

	/// Get all pending events for (now + 1 minute)
	pub async fn get_queue (&mut self) -> Result<Vec<Queue>> {
		let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner, last_scrape from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
			.fetch_all(&mut *self.0).await.stack()?;
		Ok(block)







>
>
>
>
>
>
>
>






|
<
<
|
<







146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167


168

169
170
171
172
173
174
175
			.execute(&mut *self.0).await.stack()?.rows_affected() {
			1 => { Ok("Source enabled.") },
			0 => { Ok("Source not found.") },
			_ => { bail!("Database error.") },
		}
	}

	/// Checks whether a post with the given URL exists for the specified source.
	///
	/// # Parameters
	/// - `post_url`: The URL of the post to check.
	/// - `id`: The source identifier (converted to `i64`).
	///
	/// # Returns
	/// `true` if a post with the URL exists for the source, `false` otherwise.
	pub async fn exists <I> (&mut self, post_url: &str, id: I) -> Result<bool>
	where I: Into<i64> {
		let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
			.bind(post_url)
			.bind(id.into())
			.fetch_one(&mut *self.0).await.stack()?;
		row.try_get("exists")


			.stack_err("Database error: can't check whether post exists.")

	}

	/// Get all pending events for (now + 1 minute)
	pub async fn get_queue (&mut self) -> Result<Vec<Queue>> {
		let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner, last_scrape from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
			.fetch_all(&mut *self.0).await.stack()?;
		Ok(block)
18
19
20
21
22
23
24









25
26
27
28
29
30
31
32
33
34
35
36
37
38
39




40
41
42
43
44
45
46
47
48
49
50
51







52

53
54
55
56
57
58
pub struct Tg {
	pub me: Bot,
	pub owner: ChatPeerId,
	pub client: Client,
}

impl Tg {









	pub async fn new (settings: &config::Config) -> Result<Tg> {
		let api_key = settings.get_string("api_key").stack()?;

		let owner = ChatPeerId::from(settings.get_int("owner").stack()?);
		let client = Client::new(&api_key).stack()?
			.with_host(settings.get_string("api_gateway").stack()?)
			.with_max_retries(0);
		let me = client.execute(GetBot).await.stack()?;
		Ok(Tg {
			me,
			owner,
			client,
		})
	}





	pub async fn send <S>(&self, msg: S, target: Option<ChatPeerId>, mode: Option<ParseMode>) -> Result<Message>
	where S: Into<String> {
		let msg = msg.into();

		let mode = mode.unwrap_or(ParseMode::Html);
		let target = target.unwrap_or(self.owner);
		self.client.execute(
			SendMessage::new(target, msg)
				.with_parse_mode(mode)
		).await.stack()
	}








	pub fn with_owner (&self, owner: i64) -> Tg {

		Tg {
			owner: ChatPeerId::from(owner),
			..self.clone()
		}
	}
}







>
>
>
>
>
>
>
>
>















>
>
>
>












>
>
>
>
>
>
>
|
>

|




18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
pub struct Tg {
	pub me: Bot,
	pub owner: ChatPeerId,
	pub client: Client,
}

impl Tg {
	/// Construct a new `Tg` instance from configuration.
	///
	/// The `settings` must provide the following keys:
	///  - `"api_key"` (string),
	///  - `"owner"` (integer chat id),
	///  - `"api_gateway"` (string).
	///
	/// The function initialises the client, configures the gateway and fetches the bot identity
	/// before returning the constructed `Tg`.
	pub async fn new (settings: &config::Config) -> Result<Tg> {
		let api_key = settings.get_string("api_key").stack()?;

		let owner = ChatPeerId::from(settings.get_int("owner").stack()?);
		let client = Client::new(&api_key).stack()?
			.with_host(settings.get_string("api_gateway").stack()?)
			.with_max_retries(0);
		let me = client.execute(GetBot).await.stack()?;
		Ok(Tg {
			me,
			owner,
			client,
		})
	}

	/// Send a text message to a chat, using an optional target and parse mode.
	///
	/// # Returns
	/// The sent `Message` on success.
	pub async fn send <S>(&self, msg: S, target: Option<ChatPeerId>, mode: Option<ParseMode>) -> Result<Message>
	where S: Into<String> {
		let msg = msg.into();

		let mode = mode.unwrap_or(ParseMode::Html);
		let target = target.unwrap_or(self.owner);
		self.client.execute(
			SendMessage::new(target, msg)
				.with_parse_mode(mode)
		).await.stack()
	}

	/// Create a copy of this `Tg` with the owner replaced by the given chat ID.
	///
	/// # Parameters
	/// - `owner`: The Telegram chat identifier to set as the new owner (expressed as an `i64`).
	///
	/// # Returns
	/// A new `Tg` instance identical to the original except its `owner` field is set to the provided chat ID.
	pub fn with_owner <O>(&self, owner: O) -> Tg
	where O: Into<i64> {
		Tg {
			owner: ChatPeerId::from(owner.into()),
			..self.clone()
		}
	}
}