Changes On Branch trunk
Logged in as anonymous

Changes In Branch trunk Excluding Merge-Ins

This is equivalent to a diff from 01565c7f7e to d8c1d259a2

2026-04-23
18:44
some fixes (by CodeRabbit), merge changes from release branch leaf check-in: d8c1d259a2 user: arcade tags: trunk
10:36
fix workflow, fix and comment clippy warnings bump rustls-webpki to 0.103.13 (fixes 2 lowsec issues) leaf check-in: 01565c7f7e user: arcade tags: release, v0.5.6
2026-04-18
18:31
bump, update workflow, add update reaction code, change paging logic check-in: be0b8602d1 user: arcade tags: trunk
18:28
bump deps and workflow check-in: 2af2d3bc25 user: arcade tags: release, v0.5.5

Modified Cargo.lock from [1310a2aaa8] to [6aa5bd626f].
1454
1455
1456
1457
1458
1459
1460






1461
1462
1463
1464
1465
1466
1467
1454
1455
1456
1457
1458
1459
1460
1461
1462
1463
1464
1465
1466
1467
1468
1469
1470
1471
1472
1473







+
+
+
+
+
+







source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e99fb7a497b1e3339bc746195567ed8d3e24945ecd636e3619d20b9de9e9149"
dependencies = [
 "pkg-config",
 "vcpkg",
]

[[package]]
name = "linked-hash-map"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0717cef1bc8b636c6e1c1bbdefc09e6322da8a9321966e8928ef80d20f7f770f"

[[package]]
name = "linux-raw-sys"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53"

[[package]]
2030
2031
2032
2033
2034
2035
2036
2037

2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049

2050
2051
2052
2053


2054
2055
2056
2057
2058
2059
2060
2036
2037
2038
2039
2040
2041
2042

2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069







-
+












+




+
+







 "derive_builder",
 "never",
 "quick-xml",
]

[[package]]
name = "rsstg"
version = "0.5.6"
version = "0.6.0"
dependencies = [
 "async-compat",
 "atom_syndication",
 "chrono",
 "config",
 "futures",
 "futures-util",
 "lazy_static",
 "regex",
 "reqwest",
 "rss",
 "sedregex",
 "serde",
 "smol",
 "sqlx",
 "stacked_errors",
 "tgbot",
 "toml",
 "ttl_cache",
 "url",
]

[[package]]
name = "rustc-hash"
version = "2.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
2879
2880
2881
2882
2883
2884
2885

2886
2887
2888
2889

2890
2891
2892
2893
2894
2895
2896
2888
2889
2890
2891
2892
2893
2894
2895
2896
2897
2898
2899
2900
2901
2902
2903
2904
2905
2906
2907







+




+








[[package]]
name = "toml"
version = "1.1.2+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81f3d15e84cbcd896376e6730314d59fb5a87f31e4b038454184435cd57defee"
dependencies = [
 "indexmap",
 "serde_core",
 "serde_spanned",
 "toml_datetime",
 "toml_parser",
 "toml_writer",
 "winnow",
]

[[package]]
name = "toml_datetime"
version = "1.1.1+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
2904
2905
2906
2907
2908
2909
2910






2911
2912
2913
2914
2915
2916
2917
2915
2916
2917
2918
2919
2920
2921
2922
2923
2924
2925
2926
2927
2928
2929
2930
2931
2932
2933
2934







+
+
+
+
+
+







version = "1.1.2+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2abe9b86193656635d2411dc43050282ca48aa31c2451210f4202550afb7526"
dependencies = [
 "winnow",
]

[[package]]
name = "toml_writer"
version = "1.1.1+spec-1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "756daf9b1013ebe47a8776667b466417e2d4c5679d441c26230efd9ef78692db"

[[package]]
name = "tower"
version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4"
dependencies = [
 "futures-core",
2991
2992
2993
2994
2995
2996
2997









2998
2999
3000
3001
3002
3003
3004
3008
3009
3010
3011
3012
3013
3014
3015
3016
3017
3018
3019
3020
3021
3022
3023
3024
3025
3026
3027
3028
3029
3030







+
+
+
+
+
+
+
+
+







]

[[package]]
name = "try-lock"
version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"

[[package]]
name = "ttl_cache"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4189890526f0168710b6ee65ceaedf1460c48a14318ceec933cb26baa492096a"
dependencies = [
 "linked-hash-map",
]

[[package]]
name = "typenum"
version = "1.20.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "40ce102ab67701b8526c123c1bab5cbe42d7040ccfd0f64af1a385808d2f43de"

Modified Cargo.toml from [a175952cdd] to [12972fe474].
1
2
3
4
5





6
7
8
9
10
11
12
13
14
15
16
17
18
19

20
21
22


23
24
25
26
27
1
2



3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32


-
-
-
+
+
+
+
+














+



+
+





[package]
name = "rsstg"
version = "0.5.6"
authors = ["arcade"]
edition = "2021"
version = "0.6.0"
authors = [ "arcade@b1t.name" ]
edition = "2024"
license = "0BSD"
repository = "http://fs.b1t.name/rsstg"

[dependencies]
async-compat = "0.2.5"
atom_syndication = { version = "0.12.4", features = [ "with-serde" ] }
chrono = "0.4.38"
config = { version = "0.15", default-features = false, features = [ "toml" ] }
tgbot = "0.44"
futures = "0.3.30"
futures-util = "0.3.30"
lazy_static = "1.5.0"
regex = "1.10.6"
reqwest = { version = "0.13.1", features = [ "brotli", "socks", "deflate" ]}
rss = "2.0.9"
sedregex = "0.2.5"
serde = "1.0.228"
smol = "2.0.2"
stacked_errors = "0.7.1"
sqlx = { version = "0.8", features = [ "postgres", "runtime-tokio-rustls", "chrono", "macros" ], default-features = false }
toml = "1.1.0"
ttl_cache = "0.5.1"
url = "2.5.8"

[profile.release]
lto = true
codegen-units = 1
Modified LICENSE.0BSD from [25cd2d58c1] to [a725e46479].
1

2
3
4
5
6
7
8

1
2
3
4
5
6
7
8
-
+







Copyright (C) 2020-2023 by Volodymyr Kostyrko <arcade@b1t.name>
Copyright (C) 2020-2026 by Volodymyr Kostyrko <arcade@b1t.name>

Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted.

THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
Modified src/command.rs from [f2ddddf16c] to [86e99002d1].
1








2
3
4
5
6
7
8
9
10
11


12
13
14
15

16
17
18
19
20
21
22
23
24
25

26

27
28



29
30
31




32
33
34
35



36







37
38
39












40
41
42
43
44
45
46
47
48
49
50
51

52




53
54
55
56
57
58

59
60

61
62
63












64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26

27
28
29
30
31
32
33
34
35
36
37


38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89

90
91
92
93
94
95
96
97
98

99
100

101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136











137
138
139
140
141
142
143
-
+
+
+
+
+
+
+
+










+
+




+

-








+

+
-
-
+
+
+



+
+
+
+




+
+
+
-
+
+
+
+
+
+
+



+
+
+
+
+
+
+
+
+
+
+
+












+
-
+
+
+
+





-
+

-
+



+
+
+
+
+
+
+
+
+
+
+
+




















-
-
-
-
-
-
-
-
-
-
-







use crate::core::Core;
use crate::{
	core::Core,
	tg_bot::{
		Callback,
		MyMessage,
		get_kb,
	},
};

use lazy_static::lazy_static;
use regex::Regex;
use sedregex::ReplaceCommand;
use stacked_errors::{
	Result,
	StackableErr,
	bail,
};
use tgbot::types::{
	CallbackQuery,
	Chat,
	ChatMember,
	ChatUsername,
	GetChat,
	GetChatAdministrators,
	MaybeInaccessibleMessage,
	Message,
	ParseMode::MarkdownV2,
};
use url::Url;

lazy_static! {
	static ref RE_USERNAME: Regex = Regex::new(r"^@([a-zA-Z][a-zA-Z0-9_]+)$").unwrap();
	static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap();
}

/// Sends an informational message to the message's chat linking to the bot help channel.
pub async fn start (core: &Core, msg: &Message) -> Result<()> {
	core.tg.send(MyMessage::html_to(
	core.send("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.",
		Some(msg.chat.get_id()), Some(MarkdownV2)).await.stack()?;
		"We are open. Probably. Visit <a href=\"https://t.me/rsstg_bot_help/3\">channel</a>) for details.",
		msg.chat.get_id()
	)).await.stack()?;
	Ok(())
}

/// Send the sender's subscription list to the chat.
///
/// Retrieves the message sender's user ID, obtains their subscription list from `core`,
/// and sends the resulting reply into the message chat using HTML
pub async fn list (core: &Core, msg: &Message) -> Result<()> {
	let sender = msg.sender.get_user_id()
		.stack_err("Ignoring unreal users.")?;
	let reply = core.list(sender).await.stack()?;
	core.tg.send(MyMessage::html_to(reply, msg.chat.get_id())).await.stack()?;
	Ok(())
}
	core.send(reply, Some(msg.chat.get_id()), Some(MarkdownV2)).await.stack()?;

pub async fn test (core: &Core, msg: &Message) -> Result<()> {
	let sender: i64 = msg.sender.get_user_id()
		.stack_err("Ignoring unreal users.")?.into();
	let feeds = core.get_feeds(sender).await.stack()?;
	let kb = get_kb(&Callback::menu(), &feeds).await.stack()?;
	core.tg.send(MyMessage::html_to_kb("Main menu:", msg.chat.get_id(), kb)).await.stack()?;
	Ok(())
}

/// Handle channel-management commands that operate on a single numeric source ID.
///
/// This validates that exactly one numeric argument is provided, performs the requested
/// operation (check, clean, enable, delete, disable) against the database or core,
/// and sends the resulting reply to the chat.
///
/// # Parameters
///
/// - `core`: application core containing database and Telegram clients.
/// - `command`: command string (e.g. "/check", "/clean", "/enable", "/delete", "/disable").
/// - `msg`: incoming Telegram message that triggered the command; used to determine sender and chat.
/// - `words`: command arguments; expected to contain exactly one element that parses as a 32-bit integer.
pub async fn command (core: &Core, command: &str, msg: &Message, words: &[String]) -> Result<()> {
	let mut conn = core.db.begin().await.stack()?;
	let sender = msg.sender.get_user_id()
		.stack_err("Ignoring unreal users.")?;
	let reply = if words.len() == 1 {
		match words[0].parse::<i32>() {
			Err(err) => format!("I need a number.\n{}", &err).into(),
			Ok(number) => match command {
				"/check" => core.check(number, false, None).await
					.context("Channel check failed.")?.into(),
				"/clean" => conn.clean(number, sender).await.stack()?,
				"/enable" => conn.enable(number, sender).await.stack()?.into(),
				"/delete" => {
				"/delete" => conn.delete(number, sender).await.stack()?,
					let res = conn.delete(number, sender).await.stack()?;
					core.rm_feed(sender.into(), &number).await.stack()?;
					res
				}
				"/disable" => conn.disable(number, sender).await.stack()?.into(),
				_ => bail!("Command {command} {words:?} not handled."),
			},
		}
	} else {
		"This command needs exacly one number.".into()
		"This command needs exactly one number.".into()
	};
	core.send(reply, Some(msg.chat.get_id()), None).await.stack()?;
	core.tg.send(MyMessage::html_to(reply, msg.chat.get_id())).await.stack()?;
	Ok(())
}

/// Validate command arguments, check permissions and update or add a channel feed configuration in the database.
///
/// This function parses and validates parameters supplied by a user command (either "/update <id> ..." or "/add ..."),
/// verifies the channel username and feed URL, optionally validates an IV hash and a replacement regexp,
/// ensures both the bot and the command sender are administrators of the target channel, and performs the database update.
///
/// # Parameters
///
/// - `command` — the invoked command, expected to be either `"/update"` (followed by a numeric source id) or `"/add"`.
/// - `msg` — the incoming Telegram message; used to derive the command sender and target chat id for the reply.
/// - `words` — the command arguments: for `"/add"` expected `channel url [iv_hash|'-'] [url_re|'-']`; for `"/update"`
///   the first element must be a numeric `source_id` followed by the same parameters.
pub async fn update (core: &Core, command: &str, msg: &Message, words: &[String]) -> Result<()> {
	let sender = msg.sender.get_user_id()
		.stack_err("Ignoring unreal users.")?;
	let mut source_id: Option<i32> = None;
	let at_least = "Requires at least 3 parameters.";
	let mut i_words = words.iter();
	match command {
		"/update" => {
			let next_word = i_words.next().context(at_least)?;
			source_id = Some(next_word.parse::<i32>()
				.context(format!("I need a number, but got {next_word}."))?);
		},
		"/add" => {},
		_ => bail!("Passing {command} is not possible here."),
	};
	let (channel, url, iv_hash, url_re) = (
		i_words.next().context(at_least)?,
		i_words.next().context(at_least)?,
		i_words.next(),
		i_words.next());
	/*
	let channel = match RE_USERNAME.captures(channel) {
		Some(caps) => match caps.get(1) {
			Some(data) => data.as_str(),
			None => bail!("No string found in channel name"),
		},
		None => {
			bail!("Usernames should be something like \"@\\[a\\-zA\\-Z]\\[a\\-zA\\-Z0\\-9\\_]+\", aren't they?\nNot {channel:?}");
		},
	};
	*/
	if ! RE_USERNAME.is_match(channel) {
		bail!("Usernames should be something like \"@\\[a\\-zA\\-Z]\\[a\\-zA\\-Z0\\-9\\_]+\", aren't they?\nNot {channel:?}");
	};
	{
		let parsed_url = Url::parse(url)
			.stack_err("Expecting a valid link to ATOM/RSS feed.")?;
		match parsed_url.scheme() {
128
129
130
131
132
133
134
135
136


137
138
139
140
141
142
143
144
145
146
147
148

149
150
151
152
153
154
155
156
157
158









159
160

































170
171
172
173
174
175
176


177
178
179
180
181
182
183
184
185
186
187
188
189

190
191
192
193
194
195
196
197
198
199

200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243







-
-
+
+











-
+









-
+
+
+
+
+
+
+
+
+


+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
					Some(thing)
				}
			}
		},
		None => None,
	};
	let chat_id = ChatUsername::from(channel.as_ref());
	let channel_id = core.tg.execute(GetChat::new(chat_id.clone())).await.stack_err("gettting GetChat")?.id;
	let chan_adm = core.tg.execute(GetChatAdministrators::new(chat_id)).await
	let channel_id = core.tg.client.execute(GetChat::new(chat_id.clone())).await.stack_err("getting GetChat")?.id;
	let chan_adm = core.tg.client.execute(GetChatAdministrators::new(chat_id)).await
		.context("Sorry, I have no access to that chat.")?;
	let (mut me, mut user) = (false, false);
	for admin in chan_adm {
		let member_id = match admin {
			ChatMember::Creator(member) => member.user.id,
			ChatMember::Administrator(member) => member.user.id,
			ChatMember::Left(_)
			| ChatMember::Kicked(_)
			| ChatMember::Member{..}
			| ChatMember::Restricted(_) => continue,
		};
		if member_id == core.me.id {
		if member_id == core.tg.me.id {
			me = true;
		}
		if member_id == sender {
			user = true;
		}
	};
	if ! me   { bail!("I need to be admin on that channel."); };
	if ! user { bail!("You should be admin on that channel."); };
	let mut conn = core.db.begin().await.stack()?;
	core.send(conn.update(source_id, channel, channel_id, url, iv_hash, url_re, sender).await.stack()?, Some(msg.chat.get_id()), None).await.stack()?;
	let update = conn.update(source_id, channel, channel_id, url, iv_hash, url_re, sender).await.stack()?;
	core.tg.send(MyMessage::html_to(update, msg.chat.get_id())).await.stack()?;
	if command == "/add" {
		if let Some(new_record) = conn.get_one_name(sender, channel).await.stack()? {
			core.add_feed(sender.into(), new_record.source_id, new_record.channel).await.stack()?;
		} else {
			bail!("Failed to read data on freshly inserted source.");
		}
	};
	Ok(())
}

pub async fn answer_cb (core: &Core, query: &CallbackQuery, cb: &str) -> Result<()> {
	let cb: Callback = toml::from_str(cb).stack()?;
	let sender = &query.from;
	//let mut conn = core.db.begin().await.stack()?;
	let text = "Sample".to_owned();
	if let Some(msg) = &query.message {
		match msg {
			MaybeInaccessibleMessage::Message(message) => {
				if let Some(owner) = message.sender.get_user()
					&& sender == owner
				{
					let feeds = core.get_feeds(owner.id.into()).await.stack()?;
					core.tg.update_message(message.chat.get_id().into(), message.id, text, &feeds, cb).await?;
				} else {
					core.tg.send(MyMessage::html(format!("Can't identify request sender:<br><pre>{:?}</pre>", message))).await.stack()?;
				}
			},
			MaybeInaccessibleMessage::InaccessibleMessage(message) => {
				let sender: i64 = sender.id.into();
				if let Chat::Private(priv_chat) = &message.chat
					&& priv_chat.id == sender
				{
					let feeds = core.get_feeds(priv_chat.id.into()).await.stack()?;
					core.tg.update_message(message.chat.get_id().into(), message.message_id, text, &feeds, cb).await?;
				} else {
					core.tg.send(MyMessage::html(format!("Can't identify request sender:<br><pre>{:?}</pre>", message))).await.stack()?;
				}
			},
		};
	};
	Ok(())
}
Modified src/core.rs from [7b880bf97c] to [f29c7ca645].
1

2

3






4
5
6
7
8
9
10
11
12

13
14
15
16
17
18
19
20
21
22

23
24
25
26
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52












53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

20
21
22
23
24
25
26
27
28
29

30





31



















32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54








55
56
57
58
59
60
61

+

+

+
+
+
+
+
+








-
+









-
+
-
-
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-






+
+
+
+
+
+
+
+
+
+
+
+





-
-
-
-
-
-
-
-







use crate::{
	Arc,
	command,
	Mutex,
	sql::Db,
	tg_bot::{
		Callback,
		MyMessage,
		Tg,
		validate,
	},
};

use std::{
	borrow::Cow,
	collections::{
		BTreeMap,
		HashSet,
	},
	sync::Arc,
	time::Duration,
};

use async_compat::Compat;
use chrono::{
	DateTime,
	Local,
};
use lazy_static::lazy_static;
use regex::Regex;
use reqwest::header::{
use reqwest::header::LAST_MODIFIED;
	CACHE_CONTROL,
	EXPIRES,
	LAST_MODIFIED
};
use smol::{
use smol::Timer;
	Timer,
	lock::Mutex,
};
use tgbot::{
	api::Client,
	handler::UpdateHandler,
	types::{
		Bot,
		ChatPeerId,
		Command,
		GetBot,
		Message,
		ParseMode,
		SendMessage,
		Update,
		UpdateType,
		UserPeerId,
	},
};
use stacked_errors::{
	Result,
	StackableErr,
	anyhow,
	bail,
};
use tgbot::{
	handler::UpdateHandler,
	types::{
		CallbackQuery,
		ChatPeerId,
		Command,
		Update,
		UpdateType,
		UserPeerId,
	},
};
use ttl_cache::TtlCache;

lazy_static!{
	pub static ref RE_SPECIAL: Regex = Regex::new(r"([\-_*\[\]()~`>#+|{}\.!])").unwrap();
}

/// Escape characters that are special in Telegram MarkdownV2 by prefixing them with a backslash.
///
/// This ensures the returned string can be used as MarkdownV2-formatted Telegram message content
/// without special characters being interpreted as MarkdownV2 markup.
pub fn encode (text: &str) -> Cow<'_, str> {
	RE_SPECIAL.replace_all(text, "\\$1")
}

// This one does nothing except making sure only one token exists for each id
pub struct Token {
	running: Arc<Mutex<HashSet<i32>>>,
	my_id: i32,
}

impl Token {
110
111
112
113
114
115
116



117
118
119
120
121

122
123

124
125
126
127
128
129
130
131
132
133
134



135
136
137
138
139
140
141

142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161

162
163
164

165
166
167

168
169

170
171

172
173
174
175
176
177

178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
99
100
101
102
103
104
105
106
107
108
109
110



111

112
113
114
115
116
117
118
119
120
121



122
123
124
125
126
127
128
129
130

131
132
133
134
135
136
137
138
139
140





141
142
143
144
145

146

147

148


149
150
151

152

153
154
155
156
157
158
159

160
161
162
163
164
165
166
167
168
169
170
171
172












173
174
175
176
177
178
179







+
+
+


-
-
-
+
-

+








-
-
-
+
+
+






-
+









-
-
-
-
-





-
+
-

-
+
-
-

+

-
+
-

+





-
+












-
-
-
-
-
-
-
-
-
-
-
-







		smol::block_on(async {
			let mut set = self.running.lock_arc().await;
			set.remove(&self.my_id);
		})
	}
}

pub type FeedList = BTreeMap<i32, String>;
type UserCache = TtlCache<i64, Arc<Mutex<FeedList>>>;

#[derive(Clone)]
pub struct Core {
	owner_chat: ChatPeerId,
	// max_delay: u16,
	pub tg: Client,
	pub tg: Tg,
	pub me: Bot,
	pub db: Db,
	pub feeds: Arc<Mutex<UserCache>>,
	running: Arc<Mutex<HashSet<i32>>>,
	http_client: reqwest::Client,
}

// XXX Right now that part is unfinished and I guess I need to finish menu first
#[allow(unused)]
pub struct Post {
	uri: String,
	title: String,
	authors: String,
	summary: String,
	_title: String,
	_authors: String,
	_summary: String,
}

impl Core {
	/// Create a Core instance from configuration and start its background autofetch loop.
	///
	/// The provided `settings` must include:
	/// - `owner` (integer): chat id to use as the default destination,
	/// - `owner` (integer): default chat id to use as the owner/destination,
	/// - `api_key` (string): Telegram bot API key,
	/// - `api_gateway` (string): Telegram API gateway host,
	/// - `pg` (string): PostgreSQL connection string,
	/// - optional `proxy` (string): proxy URL for the HTTP client.
	///
	/// On success returns an initialized `Core` with Telegram and HTTP clients, database connection,
	/// an empty running set for per-id tokens, and a spawned background task that periodically runs
	/// `autofetch`. If any required setting is missing or initialization fails, an error is returned.
	pub async fn new(settings: config::Config) -> Result<Core> {
		let owner_chat = ChatPeerId::from(settings.get_int("owner").stack()?);
		let api_key = settings.get_string("api_key").stack()?;
		let tg = Client::new(&api_key).stack()?
			.with_host(settings.get_string("api_gateway").stack()?);

		let mut client = reqwest::Client::builder();
		if let Ok(proxy) = settings.get_string("proxy") {
			let proxy = reqwest::Proxy::all(proxy).stack()?;
			client = client.proxy(proxy);
		}
		let http_client = client.build().stack()?;

		let me = tg.execute(GetBot).await.stack()?;
		let core = Core {
			tg,
			tg: Tg::new(&settings).await.stack()?,
			me,
			owner_chat,
			db: Db::new(&settings.get_string("pg").stack()?)?,
			feeds: Arc::new(Mutex::new(TtlCache::new(10000))),
			running: Arc::new(Mutex::new(HashSet::new())),
			http_client,
			http_client: client.build().stack()?,
			// max_delay: 60,
		};

		let clone = core.clone();
		smol::spawn(Compat::new(async move {
			loop {
				let delay = match &clone.autofetch().await {
					Err(err) => {
						if let Err(err) = clone.send(format!("🛑 {err}"), None, None).await {
						if let Err(err) = clone.tg.send(MyMessage::html(format!("🛑 {err}"))).await {
							eprintln!("Autofetch error: {err:?}");
						};
						std::time::Duration::from_secs(60)
					},
					Ok(time) => *time,
				};
				Timer::after(delay).await;
			}
		})).detach();
		Ok(core)
	}

	pub async fn send <S>(&self, msg: S, target: Option<ChatPeerId>, mode: Option<ParseMode>) -> Result<Message>
	where S: Into<String> {
		let msg = msg.into();

		let mode = mode.unwrap_or(ParseMode::Html);
		let target = target.unwrap_or(self.owner_chat);
		self.tg.execute(
			SendMessage::new(target, msg)
				.with_parse_mode(mode)
		).await.stack()
	}

	/// Fetches the feed for a source, sends any newly discovered posts to the appropriate chat, and records them in the database.
	///
	/// This acquires a per-source guard to prevent concurrent checks for the same `id`. If a check is already running for
	/// the given `id`, the function returns an error. If `last_scrape` is provided, it is sent as the `If-Modified-Since`
	/// header to the feed request. The function parses RSS or Atom feeds, sends unseen post URLs to either the source's
	/// channel (when `real` is true) or the source owner (when `real` is false), and persists posted entries so they are
	/// not reposted later.
216
217
218
219
220
221
222
223

224
225
226
227
228
229
230
231
232
233
234
235
236
237
238




239
240
241
242
243
244
245
187
188
189
190
191
192
193

194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220







-
+















+
+
+
+







	///
	/// `Posted: N` where `N` is the number of posts processed and sent.
	pub async fn check (&self, id: i32, real: bool, last_scrape: Option<DateTime<Local>>) -> Result<String> {
		let mut posted: i32 = 0;
		let mut conn = self.db.begin().await.stack()?;

		let _token = Token::new(&self.running, id).await.stack()?;
		let source = conn.get_source(id, self.owner_chat).await.stack()?;
		let source = conn.get_source(id, self.tg.owner).await.stack()?;
		conn.set_scrape(id).await.stack()?;
		let destination = ChatPeerId::from(match real {
			true => source.channel_id,
			false => source.owner,
		});
		let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
		let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, Post> = BTreeMap::new();

		let mut builder = self.http_client.get(&source.url);
		if let Some(last_scrape) = last_scrape {
			builder = builder.header(LAST_MODIFIED, last_scrape.to_rfc2822());
		};
		let response = builder.send().await.stack()?;
		#[cfg(debug_assertions)]
		{
			use reqwest::header::{
				CACHE_CONTROL,
				EXPIRES,
			};
			let headers = response.headers();
			let expires = headers.get(EXPIRES);
			let cache = headers.get(CACHE_CONTROL);
			if expires.is_some() || cache.is_some() {
				println!("{} {} {:?} {:?} {:?}", Local::now().to_rfc2822(), &source.url, last_scrape, expires, cache);
			}
		}
259
260
261
262
263
264
265

266
267
268
269




270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296


297
298
299
300
301



302
303
304
305
306
307
308
234
235
236
237
238
239
240
241




242
243
244
245





246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264



265
266
267
268



269
270
271
272
273
274
275
276
277
278







+
-
-
-
-
+
+
+
+
-
-
-
-
-



















-
-
-
+
+


-
-
-
+
+
+







									} else {
										&dates[0]
									}
								},
								None => bail!("Feed item misses posting date."),
							}),
						}.stack()?;
						posts.insert(date, Post{
						let uri = link.to_string();
						let title = item.title().unwrap_or("").to_string();
						let authors = item.author().unwrap_or("").to_string();
						let summary = item.content().unwrap_or("").to_string();
							uri: link.to_string(),
							_title: item.title().unwrap_or("").to_string(),
							_authors: item.author().unwrap_or("").to_string(),
							_summary: item.content().unwrap_or("").to_string(),
						posts.insert(date, Post{
							uri,
							title,
							authors,
							summary,
						});
					}
				};
			},
			Err(err) => match err {
				rss::Error::InvalidStartTag => {
					match atom_syndication::Feed::read_from(&content[..]) {
						Ok(feed) => {
							for item in feed.entries() {
								let date = item.published()
									.stack_err("Feed item missing publishing date.")?;
								let uri = {
									let links = item.links();
									if links.is_empty() {
										bail!("Feed item missing post links.");
									} else {
										links[0].href().to_string()
									}
								};
								let title = item.title().to_string();
								let authors = item.authors().iter().map(|x| format!("{} <{:?}>", x.name(), x.email())).collect::<Vec<String>>().join(", ");
								let summary = if let Some(sum) = item.summary() { sum.value.clone() } else { String::new() };
								let _authors = item.authors().iter().map(|x| format!("{} <{:?}>", x.name(), x.email())).collect::<Vec<String>>().join(", ");
								let _summary = if let Some(sum) = item.summary() { sum.value.clone() } else { String::new() };
								posts.insert(*date, Post{
									uri,
									title,
									authors,
									summary,
									_title: item.title().to_string(),
									_authors,
									_summary,
								});
							};
						},
						Err(err) => {
							bail!("Unsupported or mangled content:\n{:?}\n{err}\n{status:#?}\n", &source.url)
						},
					}
316
317
318
319
320
321
322
323

324
325
326

327
328
329
330
331
332
333
334





335
336
337
338
339
340
341
342
343
344
345
346
347

348
349
350
351
352
353
354
355
356
357
358
359
360
361




362
363
364
365
366
367
368
369
370
371
372
373
374

375
376
377
378
379
380
381
382
383































































384
385
386





387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402





















403
404
405
406
407
408
409

























410
411
286
287
288
289
290
291
292

293
294
295

296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321

322
323
324
325
326
327
328
329
330
331
332
333



334
335
336
337


338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
















430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450







451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477







-
+


-
+








+
+
+
+
+












-
+











-
-
-
+
+
+
+
-
-











+









+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+



+
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+


				Some(ref x) => sedregex::ReplaceCommand::new(x).stack()?.execute(&post.uri),
				None => post.uri.clone().into(),
			};
			if ! conn.exists(&post_url, id).await.stack()? {
				if this_fetch.is_none() || *date > this_fetch.unwrap() {
					this_fetch = Some(*date);
				};
				self.send( match &source.iv_hash {
				self.tg.send(MyMessage::html_to(match &source.iv_hash {
					Some(hash) => format!("<a href=\"https://t.me/iv?url={post_url}&rhash={hash}\"> </a>{post_url}"),
					None => format!("{post_url}"),
				}, Some(destination), Some(ParseMode::Html)).await.stack()?;
				}, destination)).await.stack()?;
				conn.add_post(id, date, &post_url).await.stack()?;
				posted += 1;
			};
		};
		posts.clear();
		Ok(format!("Posted: {posted}"))
	}

	/// Determine the delay until the next scheduled fetch and spawn background checks for any overdue sources.
	///
	/// This scans the database queue, spawns background tasks to run checks for sources whose `next_fetch`
	/// is in the past (each task uses a Core clone with the appropriate owner), and computes the shortest
	/// duration until the next `next_fetch`.
	async fn autofetch(&self) -> Result<std::time::Duration> {
		let mut delay = chrono::Duration::minutes(1);
		let now = chrono::Local::now();
		let queue = {
			let mut conn = self.db.begin().await.stack()?;
			conn.get_queue().await.stack()?
		};
		for row in queue {
			if let Some(next_fetch) = row.next_fetch {
				if next_fetch < now {
					if let (Some(owner), Some(source_id), last_scrape) = (row.owner, row.source_id, row.last_scrape) {
						let clone = Core {
							owner_chat: ChatPeerId::from(owner),
							tg: self.tg.with_owner(owner),
							..self.clone()
						};
						let source = {
							let mut conn = self.db.begin().await.stack()?;
							match conn.get_one(owner, source_id).await {
								Ok(Some(source)) => source.to_string(),
								Ok(None) => "Source not found in database?".to_string(),
								Err(err) => format!("Failed to fetch source data:\n{err}"),
							}
						};
						smol::spawn(Compat::new(async move {
							if let Err(err) = clone.check(source_id, true, Some(last_scrape)).await {
								if let Err(err) = clone.send(&format!("🛑 {source}\n{}", encode(&err.to_string())), None, Some(ParseMode::MarkdownV2)).await {
									eprintln!("Check error: {err}");
							if let Err(err) = clone.check(source_id, true, Some(last_scrape)).await
								&& let Err(err) = clone.tg.send(MyMessage::html(format!("🛑 {source}\n<pre>{}</pre>", &err.to_string()))).await
							{
								eprintln!("Check error: {err}");
									// clone.disable(&source_id, owner).await.unwrap();
								};
							};
						})).detach();
					}
				} else if next_fetch - now < delay {
					delay = next_fetch - now;
				}
			}
		};
		delay.to_std().stack()
	}

	/// Displays full list of managed channels for specified user
	pub async fn list (&self, owner: UserPeerId) -> Result<String> {
		let mut reply: Vec<String> = vec![];
		reply.push("Channels:".into());
		let mut conn = self.db.begin().await.stack()?;
		for row in conn.get_list(owner).await.stack()? {
			reply.push(row.to_string());
		};
		Ok(reply.join("\n\n"))
	}

	/// Returns current cached list of feed for requested user, or loads data from database
	pub async fn get_feeds (&self, owner: i64) -> Result<Arc<Mutex<FeedList>>> {
		let mut feeds = self.feeds.lock_arc().await;
		Ok(match feeds.get(&owner) {
			None => {
				let mut conn = self.db.begin().await.stack()?;
				let feed_list = conn.get_feeds(owner).await.stack()?;
				let mut map = BTreeMap::new();
				for feed in feed_list {
					map.insert(feed.source_id, feed.channel);
				};
				let res = Arc::new(Mutex::new(map));
				feeds.insert(owner, res.clone(), Duration::from_secs(60 * 60 * 3));
				res
			},
			Some(res) => res.clone(),
		})
	}

	/// Adds feed to cached list
	pub async fn add_feed (&self, owner: i64, source_id: i32, channel: String) -> Result<()> {
		let mut inserted = true;
		{
			let mut feeds = self.feeds.lock_arc().await;
			if let Some(feed) = feeds.get_mut(&owner) {
				let mut feed = feed.lock_arc().await;
				feed.insert(source_id, channel);
			} else {
				inserted = false;
			}
		}
		// in case insert failed - we miss the entry we needed to expand, reload everything from
		// database
		if !inserted {
			self.get_feeds(owner).await.stack()?;
		}
		Ok(())
	}

	/// Removes feed from cached list
	pub async fn rm_feed (&self, owner: i64, source_id: &i32) -> Result<()> {
		let mut dropped = false;
		{
			let mut feeds = self.feeds.lock_arc().await;
			if let Some(feed) = feeds.get_mut(&owner) {
				let mut feed = feed.lock_arc().await;
				feed.remove(source_id);
				dropped = true;
			}
		}
		// in case we failed to found feed we need to remove - just reload everything from database
		if !dropped {
			self.get_feeds(owner).await.stack()?;
		}
		Ok(())
	}

	pub async fn cb (&self, query: &CallbackQuery, cb: &str) -> Result<()> {
		let cb: Callback = toml::from_str(cb).stack()?;
		todo!();
		Ok(())
	}
}

impl UpdateHandler for Core {
	/// Dispatches an incoming Telegram update to a matching command handler and reports handler errors to the originating chat.
	///
	/// This method inspects the update; if it contains a message that can be parsed as a bot command,
	/// it executes the corresponding command handler. If the handler returns an error, the error text
	/// is sent back to the message's chat. Unknown commands produce an error which is also reported to the chat.
	async fn handle (&self, update: Update) {
		if let UpdateType::Message(msg) = update.update_type {
			if let Ok(cmd) = Command::try_from(*msg) {
				let msg = cmd.get_message();
				let words = cmd.get_args();
				let command = cmd.get_name();
				let res = match command {
					"/check" | "/clean" | "/enable" | "/delete" | "/disable" => command::command(self, command, msg, words).await,
					"/start" => command::start(self, msg).await,
					"/list" => command::list(self, msg).await,
					"/add" | "/update" => command::update(self, command, msg, words).await,
					any => Err(anyhow!("Unknown command: {any}")),
				};
				if let Err(err) = res {
					if let Err(err2) = self.send(format!("\\#error\n```\n{err}\n```"),
						Some(msg.chat.get_id()),
	async fn handle (&self, update: Update) -> () {
		match update.update_type {
			UpdateType::Message(msg) => {
				if let Ok(cmd) = Command::try_from(*msg) {
					let msg = cmd.get_message();
					let words = cmd.get_args();
					let command = cmd.get_name();
					let res = match command {
						"/check" | "/clean" | "/enable" | "/delete" | "/disable" => command::command(self, command, msg, words).await,
						"/start" => command::start(self, msg).await,
						"/list" => command::list(self, msg).await,
						"/test" => command::test(self, msg).await,
						"/add" | "/update" => command::update(self, command, msg, words).await,
						any => Err(anyhow!("Unknown command: {any}")),
					};
					if let Err(err) = res  {
						match validate(&err.to_string()) {
							Ok(text) => {
								if let Err(err2) = self.tg.send(MyMessage::html_to(
									format!("#error<pre>{}</pre>", text),
									msg.chat.get_id(),
						Some(ParseMode::MarkdownV2)
					).await{
						dbg!(err2);
					};
				}
			};
		};
								)).await {
									dbg!(err2);
								}
							},
							Err(err2) => {
								dbg!(err2);
							},
						}
					}
				} else {
					// not a command
				}
			},
			UpdateType::CallbackQuery(query) => {
				if let Some(ref cb) = query.data
					&& let Err(err) = self.cb(&query, cb).await
					&& let Err(err) = self.tg.answer_cb(query.id, err.to_string()).await
				{
					println!("{err:?}");
				}
			},
			_ => {
				println!("Unhandled UpdateKind:\n{update:?}")
			},
		}
	}
}
Modified src/main.rs from [527f3dd3d4] to [97fdb38ade].
1
2
3
4
5
6
7
8

9


10

11
12
13
14
15
16
17
18
19
20
21
22
23
24




25
26
27
28
29
30
31
32
33
34

35
36
37
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41

42
43
44
45








+

+
+

+














+
+
+
+









-
+



//! This is telegram bot to fetch RSS/ATOM feeds and post results on public
//! channels

#![warn(missing_docs)]

mod command;
mod core;
mod sql;
mod tg_bot;

use std::sync::Arc;

use async_compat::Compat;
use smol::lock::Mutex;
use stacked_errors::{
	Result,
	StackableErr,
};
use tgbot::handler::LongPoll;

fn main () -> Result<()> {
	smol::block_on(Compat::new(async {
		async_main().await.unwrap();
	}));

	Ok(())
}

/// Initialises configuration and the bot core, then runs the Telegram long-poll loop.
///
/// This function loads configuration (with a default API gateway), constructs the application
/// core, and starts the long-polling loop that handles incoming Telegram updates.
async fn async_main () -> Result<()> {
	let settings = config::Config::builder()
		.set_default("api_gateway", "https://api.telegram.org").stack()?
		.add_source(config::File::with_name("rsstg"))
		.build()
		.stack()?;

	let core = core::Core::new(settings).await.stack()?;

	LongPoll::new(core.tg.clone(), core).run().await;
	LongPoll::new(core.tg.client.clone(), core).run().await;

	Ok(())
}
Modified src/sql.rs from [33552cf8df] to [91f3b5a681].





1
2
3
4
5
6
7
8
9
10
11
12
13
14
1
2
3
4
5
6
7
8

9
10

11
12
13
14
15
16
17
+
+
+
+
+



-


-







use crate::{
	Arc,
	Mutex,
};

use std::{
	borrow::Cow,
	fmt,
	sync::Arc,
};

use smol::lock::Mutex;
use chrono::{
	DateTime,
	FixedOffset,
	Local,
};
use sqlx::{
	Postgres,
30
31
32
33
34
35
36
37

38
39
40
41
42
43

44
45
46

47
48
49
50







51
52
53
54
55
56
57
33
34
35
36
37
38
39

40
41
42
43
44
45

46
47
48

49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67







-
+





-
+


-
+




+
+
+
+
+
+
+







	pub url: String,
	pub iv_hash: Option<String>,
	pub url_re: Option<String>,
}

impl fmt::Display for List {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
		write!(f, "\\#feed\\_{} \\*️⃣ `{}` {}\n🔗 `{}`", self.source_id, self.channel,
		write!(f, "#feed_{} *️⃣ <code>{}</code> {}\n🔗 <code>{}</code>", self.source_id, self.channel,
			match self.enabled {
				true  => "🔄 enabled",
				false => "⛔ disabled",
			}, self.url)?;
		if let Some(iv_hash) = &self.iv_hash {
			write!(f, "\nIV: `{iv_hash}`")?;
			write!(f, "\nIV: <code>{iv_hash}</code>")?;
		}
		if let Some(url_re) = &self.url_re {
			write!(f, "\nRE: `{url_re}`")?;
			write!(f, "\nRE: <code>{url_re}</code>")?;
		}
		Ok(())
	}
}

/// One feed, used for caching and menu navigation
#[derive(sqlx::FromRow, Debug)]
pub struct Feed {
	pub source_id: i32,
	pub channel: String,
}

#[derive(sqlx::FromRow, Debug)]
pub struct Source {
	pub channel_id: i64,
	pub url: String,
	pub iv_hash: Option<String>,
	pub owner: i64,
146
147
148
149
150
151
152








153
154
155
156
157
158
159

160
161
162
163









164
165
166
167
168
169
170
171
172
173

174
175
176
177
178
179
180
181
182
183
184
185
186
187
188









189
190
191
192
193
194
195
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176

177




178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195

196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227







+
+
+
+
+
+
+
+






-
+
-
-
-
-
+
+
+
+
+
+
+
+
+









-
+















+
+
+
+
+
+
+
+
+







			.execute(&mut *self.0).await.stack()?.rows_affected() {
			1 => { Ok("Source enabled.") },
			0 => { Ok("Source not found.") },
			_ => { bail!("Database error.") },
		}
	}

	/// Checks whether a post with the given URL exists for the specified source.
	///
	/// # Parameters
	/// - `post_url`: The URL of the post to check.
	/// - `id`: The source identifier (converted to `i64`).
	///
	/// # Returns
	/// `true` if a post with the URL exists for the source, `false` otherwise.
	pub async fn exists <I> (&mut self, post_url: &str, id: I) -> Result<bool>
	where I: Into<i64> {
		let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
			.bind(post_url)
			.bind(id.into())
			.fetch_one(&mut *self.0).await.stack()?;
		if let Some(exists) = row.try_get("exists").stack()? {
		row.try_get("exists")
			Ok(exists)
		} else {
			bail!("Database error: can't check whether source exists.");
		}
			.stack_err("Database error: can't check whether post exists.")
	}

	pub async fn get_feeds <I>(&mut self, owner: I) -> Result<Vec<Feed>>
	where I: Into<i64> {
		let block: Vec<Feed> = sqlx::query_as("select source_id, channel from rsstg_source where owner = $1 order by source_id")
			.bind(owner.into())
			.fetch_all(&mut *self.0).await.stack()?;
		Ok(block)
	}

	/// Get all pending events for (now + 1 minute)
	pub async fn get_queue (&mut self) -> Result<Vec<Queue>> {
		let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner, last_scrape from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
			.fetch_all(&mut *self.0).await.stack()?;
		Ok(block)
	}

	pub async fn get_list <I> (&mut self, owner: I) -> Result<Vec<List>>
	pub async fn get_list <I>(&mut self, owner: I) -> Result<Vec<List>>
	where I: Into<i64> {
		let source: Vec<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
			.bind(owner.into())
			.fetch_all(&mut *self.0).await.stack()?;
		Ok(source)
	}

	pub async fn get_one <I> (&mut self, owner: I, id: i32) -> Result<Option<List>>
	where I: Into<i64> {
		let source: Option<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 and source_id = $2")
			.bind(owner.into())
			.bind(id)
			.fetch_optional(&mut *self.0).await.stack()?;
		Ok(source)
	}

	pub async fn get_one_name <I> (&mut self, owner: I, name: &str) -> Result<Option<List>>
	where I: Into<i64> {
		let source: Option<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 and channel = $2")
			.bind(owner.into())
			.bind(name)
			.fetch_optional(&mut *self.0).await.stack()?;
		Ok(source)
	}

	pub async fn get_source <I> (&mut self, id: i32, owner: I) -> Result<Source>
	where I: Into<i64> {
		let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
			.bind(id)
			.bind(owner.into())
			.fetch_one(&mut *self.0).await.stack()?;
221
222
223
224
225
226
227
228

229
230
231
232
233
234
235
253
254
255
256
257
258
259

260
261
262
263
264
265
266
267







-
+







				.bind(channel_id)
				.bind(url)
				.bind(iv_hash)
				.bind(owner.into())
				.bind(channel)
				.bind(url_re)
				.execute(&mut *self.0).await
			{
		{
			Ok(_) => Ok(match update {
				Some(_) => "Channel updated.",
				None => "Channel added.",
			}),
			Err(sqlx::Error::Database(err)) => {
				match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
					Some("_bt_check_unique", ) => {
Added src/tg_bot.rs version [186b21c68f].