Diff
Logged in as anonymous

Differences From Artifact [efefd7f7a3]:

To Artifact [caeb3fb50c]:


19
20
21
22
23
24
25
26
27
28
29










30
31
32
33

34
35
36
37
38



39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57




58
59
60
61
62
63
64
65

66
67

68
69
70
71
72
73
74
75

76
77
78
79
80
81
82
83
84

85
86

87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103

104

105
106
107
108
109
110
111
19
20
21
22
23
24
25




26
27
28
29
30
31
32
33
34
35




36





37
38
39
40

41
42
43
44
45
46
47
48
49
50
51
52
53




54
55
56
57
58
59
60
61
62
63
64

65
66

67
68
69
70
71
72
73
74

75

76
77
78
79
80
81
82

83
84

85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103

104
105
106
107
108
109
110
111







-
-
-
-
+
+
+
+
+
+
+
+
+
+
-
-
-
-
+
-
-
-
-
-
+
+
+

-













-
-
-
-
+
+
+
+







-
+

-
+







-
+
-







-
+

-
+

















+
-
+







use anyhow::{
	anyhow,
	bail,
	Result,
};
use async_std::task;
use chrono::DateTime;
use frankenstein::{
	AsyncTelegramApi,
	Error as FrankError,
	ParseMode,
use tgbot::{
	api::Client,
	handler::UpdateHandler,
	types::{
		Bot,
		ChatPeerId,
		Command,
		GetBot,
		Message,
		ParseMode,
	client_reqwest::Bot,
	methods::{
		GetUpdatesParams,
		SendMessageParams
		SendMessage,
	},
	types::{
		AllowedUpdate,
		MessageEntityType,
		User,
		Update,
		UpdateType,
		UserPeerId,
	},
	updates::UpdateContent,
};
use thiserror::Error;

#[derive(Error, Debug)]
pub enum RssError {
	// #[error(transparent)]
	// Tg(#[from] TgError),
	#[error(transparent)]
	Int(#[from] TryFromIntError),
}

#[derive(Clone)]
pub struct Core {
	owner_chat: i64,
	max_delay: u16,
	pub tg: Bot,
	pub me: User,
	owner_chat: ChatPeerId,
	// max_delay: u16,
	pub tg: Client,
	pub me: Bot,
	pub db: Db,
	sources: Arc<Mutex<HashSet<Arc<i32>>>>,
	http_client: reqwest::Client,
}

impl Core {
	pub async fn new(settings: config::Config) -> Result<Core> {
		let owner_chat = settings.get_int("owner")?;
		let owner_chat = ChatPeerId::from(settings.get_int("owner")?);
		let api_key = settings.get_string("api_key")?;
		let tg = Bot::new(&api_key);
		let tg = Client::new(&api_key)?;

		let mut client = reqwest::Client::builder();
		if let Ok(proxy) = settings.get_string("proxy") {
			let proxy = reqwest::Proxy::all(proxy)?;
			client = client.proxy(proxy);
		}
		let http_client = client.build()?;
		let me = tg.get_me().await?;
		let me = tg.execute(GetBot).await?;
		let me = me.result;
		let core = Core {
			tg,
			me,
			owner_chat,
			db: Db::new(&settings.get_string("pg")?)?,
			sources: Arc::new(Mutex::new(HashSet::new())),
			http_client,
			max_delay: 60,
			// max_delay: 60,
		};
		let mut clone = core.clone();
		let clone = core.clone();
		task::spawn(async move {
			loop {
				let delay = match &clone.autofetch().await {
					Err(err) => {
						if let Err(err) = clone.send(format!("šŸ›‘ {err:?}"), None, None).await {
							eprintln!("Autofetch error: {err:?}");
						};
						std::time::Duration::from_secs(60)
					},
					Ok(time) => *time,
				};
				task::sleep(delay).await;
			}
		});
		Ok(core)
	}

	/*
	pub async fn stream(&mut self) -> Result<()> {
	pub async fn stream(&self) -> Result<()> {
		let mut offset: i64 = 0;
		let mut params = GetUpdatesParams {
			offset: None,
			limit: Some(100),
			timeout: Some(300),
			allowed_updates: Some(vec![AllowedUpdate::Message]),
		};
149
150
151
152
153
154
155

156
157

158
159
160
161
162

163

164
165
166

167
168
169

170
171
172
173
174
175
176
177
178
179
180


181
182
183
184
185
186
187
188
189
190
191
192
193
194

195
196
197
198
199
200

201
202
203

204
205
206
207
208
209
210
211

212
213

214
215
216

217
218
219
220
221
222
223
149
150
151
152
153
154
155
156
157

158
159
160
161
162
163
164

165



166



167











168
169














170
171
172
173
174
175

176
177
178

179
180
181
182
183
184
185
186

187
188

189
190
191

192
193
194
195
196
197
198
199







+

-
+





+
-
+
-
-
-
+
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+





-
+


-
+







-
+

-
+


-
+







							};
						};
					};
				};
			}
		}
	}
	*/

	pub async fn send <S>(&self, msg: S, target: Option<i64>, mode: Option<ParseMode>) -> Result<()>
	pub async fn send <S>(&self, msg: S, target: Option<ChatPeerId>, mode: Option<ParseMode>) -> Result<Message>
	where S: Into<String> {
		let msg = msg.into();

		let mode = mode.unwrap_or(ParseMode::Html);
		let target = target.unwrap_or(self.owner_chat);
		Ok(self.tg.execute(
		let send = SendMessageParams::builder()
			SendMessage::new(target, msg)
			.chat_id(target)
			.text(msg)
			.parse_mode(mode)
				.with_parse_mode(mode)
			.build();
		loop {
			match self.tg.send_message(&send).await {
		).await?)
				Ok(_) => break,
				Err(err) => match err {
					FrankError::Api(ref resp) => {
						if resp.error_code == 429 {
							let mut my_delay = self.max_delay;
							if let Some(params) = resp.parameters {
								if let Some(delay) = params.retry_after {
									if delay < my_delay {
										my_delay = delay;
									}
								}
	}

							}
							task::sleep(std::time::Duration::from_secs(my_delay.into())).await;
						} else {
							return Err(err.into());
						}
					},
					_ => return Err(err.into()),
				},
			}
		}
		Ok(())
	}

	pub async fn check (&mut self, id: &i32, owner: i64, real: bool) -> Result<String> {
	pub async fn check (&self, id: i32, real: bool) -> Result<String> {
		let mut posted: i32 = 0;
		let mut conn = self.db.begin().await?;

		let id = {
			let mut set = self.sources.lock().unwrap();
			match set.get(id) {
			match set.get(&id) {
				Some(id) => id.clone(),
				None => {
					let id = Arc::new(*id);
					let id = Arc::new(id);
					set.insert(id.clone());
					id.clone()
				},
			}
		};
		let count = Arc::strong_count(&id);
		if count == 2 {
			let source = conn.get_source(*id, owner).await?;
			let source = conn.get_source(*id, self.owner_chat).await?;
			conn.set_scrape(*id).await?;
			let destination = match real {
			let destination = ChatPeerId::from(match real {
				true => source.channel_id,
				false => source.owner,
			};
			});
			let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
			let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, String> = BTreeMap::new();

			let response = self.http_client.get(&source.url).send().await?;
			let status = response.status();
			let content = response.bytes().await?;
			match rss::Channel::read_from(&content[..]) {
272
273
274
275
276
277
278
279

280
281

282
283




284
285
286
287
288


289
290
291
292

293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308

309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327


























248
249
250
251
252
253
254

255
256
257
258


259
260
261
262
263
264
265


266
267
268
269
270

271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286

287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332







-
+


+
-
-
+
+
+
+



-
-
+
+



-
+















-
+



















+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
				posted += 1;
			};
			posts.clear();
		};
		Ok(format!("Posted: {posted}"))
	}

	async fn autofetch(&mut self) -> Result<std::time::Duration> {
	async fn autofetch(&self) -> Result<std::time::Duration> {
		let mut delay = chrono::Duration::minutes(1);
		let now = chrono::Local::now();
		let queue = {
		let mut conn = self.db.begin().await?;
		for row in conn.get_queue().await? {
			let mut conn = self.db.begin().await?;
			conn.get_queue().await?
		};
		for row in queue {
			if let Some(next_fetch) = row.next_fetch {
				if next_fetch < now {
					if let (Some(owner), Some(source_id)) = (row.owner, row.source_id) {
						let mut clone = Core {
							owner_chat: owner,
						let clone = Core {
							owner_chat: ChatPeerId::from(owner),
							..self.clone()
						};
						task::spawn(async move {
							if let Err(err) = clone.check(&source_id, owner, true).await {
							if let Err(err) = clone.check(source_id, true).await {
								if let Err(err) = clone.send(&format!("šŸ›‘ {err:?}"), None, None).await {
									eprintln!("Check error: {err:?}");
									// clone.disable(&source_id, owner).await.unwrap();
								};
							};
						});
					}
				} else if next_fetch - now < delay {
					delay = next_fetch - now;
				}
			}
		};
		Ok(delay.to_std()?)
	}

	pub async fn list (&mut self, owner: i64) -> Result<String> {
	pub async fn list (&self, owner: UserPeerId) -> Result<String> {
		let mut reply: Vec<Cow<str>> = vec![];
		reply.push("Channels:".into());
		let mut conn = self.db.begin().await?;
		for row in conn.get_list(owner).await? {
			reply.push(format!("\n\\#ļøāƒ£ {} \\*ļøāƒ£ `{}` {}\nšŸ”— `{}`", row.source_id, row.channel,
				match row.enabled {
					true  => "šŸ”„ enabled",
					false => "ā›” disabled",
				}, row.url).into());
			if let Some(hash) = &row.iv_hash {
				reply.push(format!("IV: `{hash}`").into());
			}
			if let Some(re) = &row.url_re {
				reply.push(format!("RE: `{re}`").into());
			}
		};
		Ok(reply.join("\n"))
	}
}

impl UpdateHandler for Core {
	async fn handle (&self, update: Update) {
		if let UpdateType::Message(msg) = update.update_type {
			if let Ok(cmd) = Command::try_from(msg) {
				let msg = cmd.get_message();
				let words = cmd.get_args();
				let res = match cmd.get_name() {
					"/check" | "/clean" | "/enable" | "/delete" | "/disable" => command::command(self, msg, words).await,
					"/start" => command::start(self, msg).await,
					"/list" => command::list(self, msg).await,
					"/add" | "/update" => command::update(self, msg, words).await,
					any => Err(anyhow!("Unknown command: {any}")),
				};
				if let Err(err) = res {
					if let Err(err2) = self.send(format!("\\#error\n```\n{err:?}\n```"),
						Some(msg.chat.get_id()),
						Some(ParseMode::MarkdownV2)
					).await{
						dbg!(err2);
					};
				}
			};
		};
	}
}