Check-in [26339860ce]
Logged in as anonymous
Overview
Comment:rework time, rework autofetch, clippy lint
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: 26339860ced242fe99222ccbe448ca4250846b35ff0f7f4e116752cc3a22cc2c
User & Date: arcade on 2022-02-13 19:57:55.915
Other Links: manifest | tags
Context
2022-02-15
14:56
simplify a little check-in: 093ae6c75b user: arcade tags: trunk
2022-02-13
19:57
rework time, rework autofetch, clippy lint check-in: 26339860ce user: arcade tags: trunk
12:26
implify, clippy lints check-in: f988dfd28f user: arcade tags: trunk
Changes
1
2
3
4

5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

34
35
36
37
38
39
40
41
use anyhow::{bail, Context, Result};
use crate::core::Core;
use regex::Regex;
use sedregex::ReplaceCommand;


lazy_static! {
	static ref RE_USERNAME: Regex = Regex::new(r"^@[a-zA-Z][a-zA-Z0-9_]+$").unwrap();
	static ref RE_LINK: Regex = Regex::new(r"^https?://[a-zA-Z.0-9-]+/[-_a-zA-Z.0-9/?=]+$").unwrap();
	static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap();
}

pub async fn start(core: &Core, sender: telegram_bot::UserId) -> Result<()> {
	core.send("We are open. Probably. Visit [channel](https://t.me/rsstg_bot_help/3) for details.", Some(sender), None)?;
	Ok(())
}

pub async fn list(core: &Core, sender: telegram_bot::UserId) -> Result<()> {
	core.send(core.list(sender).await?, Some(sender), Some(telegram_bot::types::ParseMode::MarkdownV2))?;
	Ok(())
}

pub async fn command(core: &Core, sender: telegram_bot::UserId, command: Vec<&str>) -> Result<()> {
	core.send( match &command[1].parse::<i32>() {
		Err(err) => format!("I need a number.\n{}", &err).into(),
		Ok(number) => match command[0] {
			"/check" => core.check(number, sender, false).await
				.context("Channel check failed.")?,
			"/clean" => core.clean(number, sender).await?,
			"/enable" => core.enable(number, sender).await?.into(),
			"/delete" => core.delete(number, sender).await?,
			"/disable" => core.disable(number, sender).await?.into(),
			_ => bail!("Command {} not handled.", &command[0]),
		},

	}, Some(sender), None)?;
	Ok(())
}

pub async fn update(core: &Core, sender: telegram_bot::UserId, command: Vec<&str>) -> Result<()> {
	let mut source_id: Option<i32> = None;
	let at_least = "Requires at least 3 parameters.";
	let first_word = command[0];




>








|




|




|










>
|







1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
use anyhow::{bail, Context, Result};
use crate::core::Core;
use regex::Regex;
use sedregex::ReplaceCommand;
use std::borrow::Cow;

lazy_static! {
	static ref RE_USERNAME: Regex = Regex::new(r"^@[a-zA-Z][a-zA-Z0-9_]+$").unwrap();
	static ref RE_LINK: Regex = Regex::new(r"^https?://[a-zA-Z.0-9-]+/[-_a-zA-Z.0-9/?=]+$").unwrap();
	static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap();
}

pub async fn start(core: &Core, sender: telegram_bot::UserId) -> Result<()> {
	core.send("We are open. Probably. Visit [channel](https://t.me/rsstg_bot_help/3) for details.", Some(sender), None).await?;
	Ok(())
}

pub async fn list(core: &Core, sender: telegram_bot::UserId) -> Result<()> {
	core.send(core.list(sender).await?, Some(sender), Some(telegram_bot::types::ParseMode::MarkdownV2)).await?;
	Ok(())
}

pub async fn command(core: &Core, sender: telegram_bot::UserId, command: Vec<&str>) -> Result<()> {
	let msg: Cow<str> = match &command[1].parse::<i32>() {
		Err(err) => format!("I need a number.\n{}", &err).into(),
		Ok(number) => match command[0] {
			"/check" => core.check(number, sender, false).await
				.context("Channel check failed.")?,
			"/clean" => core.clean(number, sender).await?,
			"/enable" => core.enable(number, sender).await?.into(),
			"/delete" => core.delete(number, sender).await?,
			"/disable" => core.disable(number, sender).await?.into(),
			_ => bail!("Command {} not handled.", &command[0]),
		},
	};
	core.send(msg, Some(sender), None).await?;
	Ok(())
}

pub async fn update(core: &Core, sender: telegram_bot::UserId, command: Vec<&str>) -> Result<()> {
	let mut source_id: Option<i32> = None;
	let at_least = "Requires at least 3 parameters.";
	let first_word = command[0];
97
98
99
100
101
102
103
104
105
106
		};
		if admin.user.id == sender {
			user = true;
		};
	};
	if ! me   { bail!("I need to be admin on that channel."); };
	if ! user { bail!("You should be admin on that channel."); };
	core.send(core.update(source_id, channel, channel_id, url, iv_hash, url_re, sender.into()).await?, Some(sender), None)?;
	Ok(())
}







|


99
100
101
102
103
104
105
106
107
108
		};
		if admin.user.id == sender {
			user = true;
		};
	};
	if ! me   { bail!("I need to be admin on that channel."); };
	if ! user { bail!("You should be admin on that channel."); };
	core.send(core.update(source_id, channel, channel_id, url, iv_hash, url_re, sender).await?, Some(sender), None).await?;
	Ok(())
}
40
41
42
43
44
45
46

47

48
49
50





51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
				.connect_timeout(std::time::Duration::new(300, 0))
				.idle_timeout(std::time::Duration::new(60, 0))
				.connect_lazy(&settings.get_str("pg")?)?,
			sources: Arc::new(Mutex::new(HashSet::new())),
		});
		let clone = core.clone();
		tokio::spawn(async move {

			if let Err(err) = &clone.autofetch().await {

				if let Err(err) = clone.send(&format!("🛑 {:?}", err), None, None) {
					eprintln!("Autofetch error: {}", err);
				};





			}
		});
		Ok(core)
	}

	pub fn stream(&self) -> telegram_bot::UpdatesStream {
		self.tg.stream()
	}

	pub fn send<'a, S>(&self, msg: S, target: Option<telegram_bot::UserId>, parse_mode: Option<telegram_bot::types::ParseMode>) -> Result<()>
	where S: Into<Cow<'a, str>> {
		let msg = msg.into();

		let parse_mode = match parse_mode {
			Some(mode) => mode,
			None => telegram_bot::types::ParseMode::Html,
		};
		self.tg.spawn(telegram_bot::SendMessage::new(match target {
			Some(user) => user,
			None => self.owner_chat,
		}, msg).parse_mode(parse_mode));
		Ok(())
	}

	pub async fn check<S>(&self, id: &i32, owner: S, real: bool) -> Result<Cow<'_, str>>
	where S: Into<i64> {
		let owner = owner.into();








>
|
>
|
|
|
>
>
>
>
>









|



<
<
|
<
<
<
|
|







40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70


71



72
73
74
75
76
77
78
79
80
				.connect_timeout(std::time::Duration::new(300, 0))
				.idle_timeout(std::time::Duration::new(60, 0))
				.connect_lazy(&settings.get_str("pg")?)?,
			sources: Arc::new(Mutex::new(HashSet::new())),
		});
		let clone = core.clone();
		tokio::spawn(async move {
			loop {
				let delay = match &clone.autofetch().await {
					Err(err) => {
						if let Err(err) = clone.send(format!("🛑 {:?}", err), None, None).await {
							eprintln!("Autofetch error: {}", err);
						};
						tokio::time::Duration::from_secs(60)
					},
					Ok(time) => *time,
				};
				tokio::time::sleep(delay).await;
			}
		});
		Ok(core)
	}

	pub fn stream(&self) -> telegram_bot::UpdatesStream {
		self.tg.stream()
	}

	pub async fn send<'a, S>(&self, msg: S, target: Option<telegram_bot::UserId>, mode: Option<telegram_bot::types::ParseMode>) -> Result<()>
	where S: Into<Cow<'a, str>> {
		let msg = msg.into();



		let mode = mode.unwrap_or(telegram_bot::types::ParseMode::Html);



		let target = target.unwrap_or(self.owner_chat);
		self.tg.send(telegram_bot::SendMessage::new(target, msg).parse_mode(mode)).await?;
		Ok(())
	}

	pub async fn check<S>(&self, id: &i32, owner: S, real: bool) -> Result<Cow<'_, str>>
	where S: Into<i64> {
		let owner = owner.into();

119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
					for item in feed.items() {
						if let Some(link) = item.link() {
							let date = match item.pub_date() {
								Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
								None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
							}?;
							let url = link;
							posts.insert(date, url.into());
						}
					};
				},
				Err(err) => match err {
					rss::Error::InvalidStartTag => {
						let feed = atom_syndication::Feed::read_from(&content[..])
							.with_context(|| format!("Problem opening feed url:\n{}\n{}", &url, status))?;
						for item in feed.entries() {
							let date = item.published().unwrap();
							let url = item.links()[0].href();
							posts.insert(*date, url.into());
						};
					},
					rss::Error::Eof => (),
					_ => bail!("Unsupported or mangled content:\n{:?}\n{:#?}\n{:#?}\n", &url, err, status)
				}
			};
			for (date, url) in posts.iter() {







|










|







121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
					for item in feed.items() {
						if let Some(link) = item.link() {
							let date = match item.pub_date() {
								Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
								None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
							}?;
							let url = link;
							posts.insert(date, url.to_string());
						}
					};
				},
				Err(err) => match err {
					rss::Error::InvalidStartTag => {
						let feed = atom_syndication::Feed::read_from(&content[..])
							.with_context(|| format!("Problem opening feed url:\n{}\n{}", &url, status))?;
						for item in feed.entries() {
							let date = item.published().unwrap();
							let url = item.links()[0].href();
							posts.insert(*date, url.to_string());
						};
					},
					rss::Error::Eof => (),
					_ => bail!("Unsupported or mangled content:\n{:?}\n{:#?}\n{:#?}\n", &url, err, status)
				}
			};
			for (date, url) in posts.iter() {
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
			.rows_affected() {
			1 => { Ok("Source disabled.") },
			0 => { Ok("Source not found.") },
			_ => { Err(anyhow!("Database error.")) },
		}
	}

	pub async fn update(&self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: i64) -> Result<&str> {
	//where S: Into<i64> {
		//let owner = owner.into();

		let mut conn = self.pool.acquire().await
			.with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?;

		match match update {
				Some(id) => {
					sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1").bind(id)







|
|
|







253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
			.rows_affected() {
			1 => { Ok("Source disabled.") },
			0 => { Ok("Source not found.") },
			_ => { Err(anyhow!("Database error.")) },
		}
	}

	pub async fn update<S>(&self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: S) -> Result<&str>
	where S: Into<i64> {
		let owner = owner.into();

		let mut conn = self.pool.acquire().await
			.with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?;

		match match update {
				Some(id) => {
					sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1").bind(id)
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
			},
			Err(err) => {
				bail!("Sorry, unknown error:\n{:#?}\n", err);
			},
		}
	}

	async fn autofetch(&self) -> Result<()> {
		let mut delay = chrono::Duration::minutes(1);
		let mut now;
		loop {
			let mut conn = self.pool.acquire().await
				.with_context(|| format!("Autofetch fetch conn:\n{:?}", &self.pool))?;
			now = chrono::Local::now();
			let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
				.fetch_all(&mut conn).await?;
			for row in queue.iter() {
				let source_id: i32 = row.try_get("source_id")?;
				let owner: i64 = row.try_get("owner")?;
				let next_fetch: DateTime<chrono::Local> = row.try_get("next_fetch")?;
				if next_fetch < now {
					let clone = Core {
						owner_chat: telegram_bot::UserId::new(owner),
						..self.clone()
					};
					tokio::spawn(async move {
						if let Err(err) = clone.check(&source_id, owner, true).await {
							if let Err(err) = clone.send(&format!("🛑 {:?}", err), None, None) {
								eprintln!("Check error: {}", err);
							};
						};
					});
				} else if next_fetch - now < delay {
					delay = next_fetch - now;
				}
			};
			queue.clear();
			tokio::time::sleep(delay.to_std()?).await;
			delay = chrono::Duration::minutes(1);
		}
	}

	pub async fn list<S>(&self, owner: S) -> Result<String>
	where S: Into<i64> {
		let owner = owner.into();

		let mut reply: Vec<Cow<str>> = vec![];







|

<
<
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
<
<







298
299
300
301
302
303
304
305
306


307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333


334
335
336
337
338
339
340
			},
			Err(err) => {
				bail!("Sorry, unknown error:\n{:#?}\n", err);
			},
		}
	}

	async fn autofetch(&self) -> Result<std::time::Duration> {
		let mut delay = chrono::Duration::minutes(1);


		let mut conn = self.pool.acquire().await
			.with_context(|| format!("Autofetch fetch conn:\n{:?}", &self.pool))?;
		let now = chrono::Local::now();
		let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
			.fetch_all(&mut conn).await?;
		for row in queue.iter() {
			let source_id: i32 = row.try_get("source_id")?;
			let owner: i64 = row.try_get("owner")?;
			let next_fetch: DateTime<chrono::Local> = row.try_get("next_fetch")?;
			if next_fetch < now {
				let clone = Core {
					owner_chat: telegram_bot::UserId::new(owner),
					..self.clone()
				};
				tokio::spawn(async move {
					if let Err(err) = clone.check(&source_id, owner, true).await {
						if let Err(err) = clone.send(&format!("🛑 {:?}", err), None, None).await {
							eprintln!("Check error: {}", err);
						};
					};
				});
			} else if next_fetch - now < delay {
				delay = next_fetch - now;
			}
		};
		queue.clear();
		Ok(delay.to_std()?)


	}

	pub async fn list<S>(&self, owner: S) -> Result<String>
	where S: Into<i64> {
		let owner = owner.into();

		let mut reply: Vec<Cow<str>> = vec![];
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
	let mut reply_to: Option<telegram_bot::UserId>;

	loop {
		reply_to = None;
		match stream.next().await {
			Some(update) => {
				if let Err(err) = handle(update?, &core, &reply_to).await {
					core.send(&format!("🛑 {:?}", err), reply_to, None)?;
				};
			},
			None => {
				core.send("🛑 None error.".to_string(), None, None)?;
			}
		};
	}
}

async fn handle(update: telegram_bot::Update, core: &core::Core, mut _reply_to: &Option<telegram_bot::UserId>) -> Result<()> {
	if let telegram_bot::UpdateKind::Message(message) = update.kind {
		if let telegram_bot::MessageKind::Text { ref data, .. } = message.kind {
			let sender = message.from.id;
			let words: Vec<&str> = data.split_whitespace().collect();
			match match words[0] {
				"/check" | "/clean" | "/enable" | "/delete" | "/disable" => command::command(core, sender, words).await,
				"/start" => command::start(core, sender).await,
				"/list" => command::list(core, sender).await,
				"/add" | "/update" => command::update(core, sender, words).await,
				_ => Ok(()),
			} {
				Err(err) => core.send(&format!("🛑 {:?}", err), Some(sender), None)?,
				Ok(()) => {},
			};
		};
	};

	Ok(())
}







|



|

















|







20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
	let mut reply_to: Option<telegram_bot::UserId>;

	loop {
		reply_to = None;
		match stream.next().await {
			Some(update) => {
				if let Err(err) = handle(update?, &core, &reply_to).await {
					core.send(&format!("🛑 {:?}", err), reply_to, None).await?;
				};
			},
			None => {
				core.send("🛑 None error.", None, None).await?;
			}
		};
	}
}

async fn handle(update: telegram_bot::Update, core: &core::Core, mut _reply_to: &Option<telegram_bot::UserId>) -> Result<()> {
	if let telegram_bot::UpdateKind::Message(message) = update.kind {
		if let telegram_bot::MessageKind::Text { ref data, .. } = message.kind {
			let sender = message.from.id;
			let words: Vec<&str> = data.split_whitespace().collect();
			match match words[0] {
				"/check" | "/clean" | "/enable" | "/delete" | "/disable" => command::command(core, sender, words).await,
				"/start" => command::start(core, sender).await,
				"/list" => command::list(core, sender).await,
				"/add" | "/update" => command::update(core, sender, words).await,
				_ => Ok(()),
			} {
				Err(err) => core.send(format!("🛑 {:?}", err), Some(sender), None).await?,
				Ok(()) => {},
			};
		};
	};

	Ok(())
}