Check-in [423cadd9c7]
Logged in as anonymous
Overview
Comment:0.1.8: rewrite error handling
Downloads: Tarball | ZIP archive | SQL archive
Timelines: family | ancestors | descendants | both | trunk
Files: files | file ages | folders
SHA3-256: 423cadd9c7567aad4742289bda96d6ff991acc3b6010c25c6520198113dfca6c
User & Date: arcade on 2020-11-27 07:18:32.806
Other Links: manifest | tags
Context
2020-11-27
19:29
0.1.9: add ownership checks, rewrite error handling check-in: ebe7c281a5 user: arcade tags: trunk
07:18
0.1.8: rewrite error handling check-in: 423cadd9c7 user: arcade tags: trunk
2020-11-26
21:02
0.1.7: I'm getting better at error handling + owner checks to enable/disable check-in: 4acdad1942 user: arcade tags: trunk
Changes
1
2
3

4
5
6
7
8
9
10
1
2

3
4
5
6
7
8
9
10


-
+







[package]
name = "rsstg"
version = "0.1.8"
authors = ["arcade"]
edition = "2018"

[dependencies]
chrono = "*"
config = "*"
futures = "*"
1
2
3
4
5

6

7
8
9
10
11
12

13
14
15
16

17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
1
2
3
4
5
6
7
8
9
10
11
12

13
14
15
16
17

18
19
20
21
22
23
24


25
26
27
28
29
30
31





+

+




-

+



-
+






-
-







use std::collections::BTreeMap;

use config;

use tokio;

use rss;

use chrono::DateTime;

use regex::Regex;

use tokio::stream::StreamExt;
use telegram_bot::*;
use tokio::stream::StreamExt;

use sqlx::postgres::PgPoolOptions;
use sqlx::Row;
use sqlx::Done;
use sqlx::Done; // .rows_affected()

#[macro_use]
extern crate lazy_static;

use anyhow::{anyhow, Context, Result};

//type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

#[derive(Clone)]
struct Core {
	owner: i64,
	api_key: String,
	owner_chat: UserId,
	tg: telegram_bot::Api,
	my: User,
44
45
46
47
48
49
50
51
52
53
54

55
56



57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

73
74

75
76
77
78
79



80
81

82
83
84
85
86
87
88
89
90
91
92
93
94











95
96

97
98
99
100
101
102
103
104
105
106
107
108










109
110

111
112
113
114
115
116




117
118

119
120
121
122
123
124




125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147














148
149
150

151
152

153
154

155
156
157

158
159

160
161
162
163
164



165
166

167
168
169
170
171
172
173
174
175
176
177
178
179
180

181
182

183
184
185
186
187



188
189

190
191
192
193
194
195
196
197
198
199
200
201
44
45
46
47
48
49
50

51
52
53
54


55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72

73


74





75
76
77


78













79
80
81
82
83
84
85
86
87
88
89


90












91
92
93
94
95
96
97
98
99
100


101






102
103
104
105


106






107
108
109
110























111
112
113
114
115
116
117
118
119
120
121
122
123
124



125


126


127



128


129





130
131
132


133









134
135
136
137

138


139





140
141
142


143





144
145
146
147
148
149
150







-



+
-
-
+
+
+















-
+
-
-
+
-
-
-
-
-
+
+
+
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
-
-
+
-
-
-
-
-
-
+
+
+
+
-
-
+
-
-
-
-
-
-
+
+
+
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-
-
+
-
-
+
-
-
+
-
-
-
+
-
-
+
-
-
-
-
-
+
+
+
-
-
+
-
-
-
-
-
-
-
-
-




-
+
-
-
+
-
-
-
-
-
+
+
+
-
-
+
-
-
-
-
-







			tg: tg,
			owner_chat: UserId::new(owner),
			pool: PgPoolOptions::new()
				.max_connections(5)
				.connect_timeout(std::time::Duration::new(300, 0))
				.idle_timeout(std::time::Duration::new(60, 0))
				.connect_lazy(&settings.get_str("pg")?)?,
				//.connect(&settings.get_str("pg")?).await?,
		};
		let clone = core.clone();
		tokio::spawn(async move {
			if let Err(err) = &clone.autofetch().await {
			if let Err(err) = clone.autofetch().await {
				eprintln!("connection error: {}", err);
				if let Err(err) = clone.debug(&err.to_string()) {
					eprintln!("Autofetch error: {}", err);
				};
			}
		});
		Ok(core)
	}

	/// Returns the update stream produced by the Telegram API handle stored
	/// in `self.tg`.
	fn stream(&self) -> telegram_bot::UpdatesStream {
		let api = &self.tg;
		api.stream()
	}

	/// Hands a text message addressed to `self.owner_chat` to the Telegram
	/// API via `spawn`, without awaiting any result.
	///
	/// Always returns `Ok(())`; the `Result` return type lets callers use `?`.
	fn debug(&self, msg: &str) -> Result<()> {
		let notice = SendMessage::new(self.owner_chat, msg);
		self.tg.spawn(notice);
		Ok(())
	}

	async fn check(&self, id: &i32, real: bool) -> Result<()> {
		match self.pool.acquire().await {
		let mut conn = self.pool.acquire().await
			Err(err) => {
				self.debug(&format!("πŸ›‘ Query queue fetch conn:\n{}\n{:?}", &err, &self.pool))?;
			.with_context(|| format!("πŸ›‘ Query queue fetch conn:\n{:?}", &self.pool))?;
			},
			Ok(mut conn) => {
				match sqlx::query("select source_id, channel_id, url, iv_hash, owner from rsstg_source where source_id = $1")
					.bind(id)
					.fetch_one(&mut conn).await {
		let row = sqlx::query("select source_id, channel_id, url, iv_hash, owner from rsstg_source where source_id = $1")
			.bind(id)
			.fetch_one(&mut conn).await
					Err(err) => {
						self.debug(&format!("πŸ›‘ Query queue:\n{}\n{:?}", &err, &conn))?;
			.with_context(|| format!("πŸ›‘ Query source:\n{:?}", &self.pool))?;
					},
					Ok(row) => {
						drop(conn);
						let channel_id: i64 = row.try_get("channel_id")?;
						let destination = match real {
							true => UserId::new(channel_id),
							false => UserId::new(row.try_get("owner")?),
						};
						let url: &str = row.try_get("url")?;
						let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
						let iv_hash: Option<&str> = row.try_get("iv_hash")?;
						let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, String> = BTreeMap::new();
						match rss::Channel::from_url(url) {
		drop(conn);
		let channel_id: i64 = row.try_get("channel_id")?;
		let destination = match real {
			true => UserId::new(channel_id),
			false => UserId::new(row.try_get("owner")?),
		};
		let url: &str = row.try_get("url")?;
		let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
		let iv_hash: Option<&str> = row.try_get("iv_hash")?;
		let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, String> = BTreeMap::new();
		let feed = rss::Channel::from_url(url)
							Err(err) => {
								self.debug(&format!("πŸ›‘ Problem opening feed url:\n{}\n{}", &url, &err))?;
			.with_context(|| format!("πŸ›‘ Problem opening feed url:\n{}", &url))?;
							},
							Ok(feed) => {
								for item in feed.items() {
									let date = match item.pub_date() {
										Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
										None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
									}?;
									let url = item.link().unwrap().to_string();
									posts.insert(date.clone(), url.clone());
								};
								for (date, url) in posts.iter() {
									match self.pool.acquire().await {
		for item in feed.items() {
			let date = match item.pub_date() {
				Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
				None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
			}?;
			let url = item.link().unwrap().to_string();
			posts.insert(date.clone(), url.clone());
		};
		for (date, url) in posts.iter() {
			let mut conn = self.pool.acquire().await
										Err(err) => {
											self.debug(&format!("πŸ›‘ Check post fetch conn:\n{}\n{:?}", &err, &self.pool))?;
				.with_context(|| format!("πŸ›‘ Check post fetch conn:\n{:?}", &self.pool))?;
										},
										Ok(mut conn) => {
											match sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
												.bind(&url)
												.bind(id)
												.fetch_one(&mut conn).await {
			let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
				.bind(&url)
				.bind(id)
				.fetch_one(&mut conn).await
												Err(err) => {
													self.debug(&format!("πŸ›‘ Check post:\n{}\n{:?}", &err, &conn))?;
				.with_context(|| format!("πŸ›‘ Check post:\n{:?}", &conn))?;
												},
												Ok(row) => {
													let exists: bool = row.try_get("exists")?;
													if ! exists {
														if this_fetch == None || *date > this_fetch.unwrap() {
															this_fetch = Some(*date);
			let exists: bool = row.try_get("exists")?;
			if ! exists {
				if this_fetch == None || *date > this_fetch.unwrap() {
					this_fetch = Some(*date);
														}
														match self.tg.send( match iv_hash {
																Some(x) => SendMessage::new(destination, format!("<a href=\"https://t.me/iv?url={}&rhash={}\"> </a>{0}", url, x)),
																None => SendMessage::new(destination, format!("{}", url)),
															}.parse_mode(types::ParseMode::Html)).await {
															Err(err) => {
																self.debug(&format!("πŸ›‘ Can't post message:\n{}", &err))?;
															},
															Ok(_) => {
																match sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
																	.bind(id)
																	.bind(date)
																	.bind(url)
																	.execute(&mut conn).await {
																		Ok(_) => {},
																		Err(err) => {
																			self.debug(&format!("πŸ›‘Rrecord post:\n{}\n{:?}", &err, &conn))?;
																		},
																};
															},
														};
														drop(conn);
														tokio::time::delay_for(std::time::Duration::new(4, 0)).await;
				};
				self.tg.send( match iv_hash {
						Some(x) => SendMessage::new(destination, format!("<a href=\"https://t.me/iv?url={}&rhash={}\"> </a>{0}", url, x)),
						None => SendMessage::new(destination, format!("{}", url)),
					}.parse_mode(types::ParseMode::Html)).await
					.context("πŸ›‘ Can't post message:")?;
				sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
					.bind(id)
					.bind(date)
					.bind(url)
					.execute(&mut conn).await
					.with_context(|| format!("πŸ›‘Record post:\n{:?}", &conn))?;
				drop(conn);
				tokio::time::delay_for(std::time::Duration::new(4, 0)).await;
													}
												},
											};
			};
										}
									};
		};
								};
								posts.clear();
		posts.clear();
							},
						};
						match self.pool.acquire().await {
		let mut conn = self.pool.acquire().await
							Err(err) => {
								self.debug(&format!("πŸ›‘ Update scrape fetch conn:\n{}\n{:?}", &err, &self.pool))?;
			.with_context(|| format!("πŸ›‘ Update scrape fetch conn:\n{:?}", &self.pool))?;
							},
							Ok(mut conn) => {
								match sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
									.bind(id)
									.execute(&mut conn).await {
		sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
			.bind(id)
			.execute(&mut conn).await
									Err(err) => {
										self.debug(&format!("πŸ›‘ Update scrape:\n{}\n{:?}", &err, &conn))?;
			.with_context(|| format!("πŸ›‘ Update scrape:\n{:?}", &conn))?;
									},
									Ok(_) => {},
								};
							},
						};
					},
				};
			},
		};
		Ok(())
	}

	/// Deletes every row of `rsstg_post` that belongs to `source_id`.
	///
	/// # Errors
	///
	/// Returns the underlying error, with a context message attached, when a
	/// pool connection cannot be acquired or the `delete` statement fails.
	async fn clean(&self, source_id: i32) -> Result<()> {
		let mut conn = self.pool.acquire().await
			.with_context(|| format!("πŸ›‘ Clean fetch conn:\n{:?}", &self.pool))?;
		sqlx::query("delete from rsstg_post where source_id = $1;")
			.bind(source_id)
			.execute(&mut conn).await
			.with_context(|| format!("πŸ›‘ Clean seen posts:\n{:?}", &self.pool))?;
		Ok(())
	}

	async fn enable(&self, source_id: &i32, id: telegram_bot::UserId) -> Result<&str> {
		let mut conn = self.pool.acquire().await
			.with_context(|| format!("πŸ›‘ Enable fetch conn:\n{:?}", &self.pool))?;
		match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
225
226
227
228
229
230
231
232

233
234

235
236
237
238
239
240
241
242
243
244
245
246










247
248
249

250
251
252
253
254
255
256
257
258
259
260
261
262
263
264















265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284

285
286
287

288
289
290
291
292
293
294
295
174
175
176
177
178
179
180

181


182












183
184
185
186
187
188
189
190
191
192



193















194
195
196
197
198
199
200
201
202
203
204
205
206
207
208


209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225

226



227

228
229
230
231
232
233
234







-
+
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
-
-
-
+
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
-
-

















-
+
-
-
-
+
-







		}
	}

	async fn autofetch(&self) -> Result<()> {
		let mut delay = chrono::Duration::minutes(5);
		let mut now;
		loop {
			match self.pool.acquire().await {
			let mut conn = self.pool.acquire().await
				Err(err) => {
					self.debug(&format!("πŸ›‘ Autofetch fetch conn:\n{}\n{:?}", &err, &self.pool))?;
				.with_context(|| format!("πŸ›‘ Autofetch fetch conn:\n{:?}", &self.pool))?;
				},
				Ok(mut conn) => {
					now = chrono::Local::now();
					let mut queue = sqlx::query("select source_id, next_fetch from rsstg_order natural left join rsstg_source natural left join rsstg_channel where next_fetch < now();")
						.fetch_all(&mut conn).await?;
					for row in queue.iter() {
						let source_id: i32 = row.try_get("source_id")?;
						let next_fetch: DateTime<chrono::Local> = row.try_get("next_fetch")?;
						if next_fetch < now {
							match sqlx::query("update rsstg_source set last_scrape = now() + interval '1 hour' where source_id = $1;")
								.bind(source_id)
								.execute(&mut conn).await {
			now = chrono::Local::now();
			let mut queue = sqlx::query("select source_id, next_fetch from rsstg_order natural left join rsstg_source natural left join rsstg_channel where next_fetch < now();")
				.fetch_all(&mut conn).await?;
			for row in queue.iter() {
				let source_id: i32 = row.try_get("source_id")?;
				let next_fetch: DateTime<chrono::Local> = row.try_get("next_fetch")?;
				if next_fetch < now {
					sqlx::query("update rsstg_source set last_scrape = now() + interval '1 hour' where source_id = $1;")
						.bind(source_id)
						.execute(&mut conn).await
								Ok(_) => {},
								Err(err) => {
									self.debug(&err.to_string())?;
						.with_context(|| format!("πŸ›‘ Lock source:\n\n{:?}", &self.pool))?;
								},
							};
							let clone = self.clone();
							tokio::spawn(async move {
								if let Err(err) = clone.check(&source_id.clone(), true).await {
									eprintln!("connection error: {}", err);
								}
							});
						} else {
							if next_fetch - now < delay {
								delay = next_fetch - now;
							}
						}
					};
					queue.clear();
					let clone = self.clone();
					tokio::spawn(async move {
						if let Err(err) = clone.check(&source_id.clone(), true).await {
							if let Err(err) = clone.debug(&err.to_string()) {
								eprintln!("Check error: {}", err);
							};
						};
					});
				} else {
					if next_fetch - now < delay {
						delay = next_fetch - now;
					}
				}
			};
			queue.clear();
				},
			};
			tokio::time::delay_for(delay.to_std()?).await;
			delay = chrono::Duration::minutes(5);
		}
	}

}

#[tokio::main]
async fn main() -> Result<()> {
	let mut settings = config::Config::default();
	settings.merge(config::File::with_name("rsstg"))?;

	let core = Core::new(settings).await?;

	let mut stream = core.stream();

	while let Some(update) = stream.next().await {
		match handle(update?, &core).await {
		if let Err(err) = handle(update?, &core).await {
			Ok(_) => {},
			Err(err) => {
				core.debug(&err.to_string())?;
			core.debug(&err.to_string())?;
			}
		};
	}

	Ok(())
}

async fn handle(update: telegram_bot::Update, core: &Core) -> Result<()> {
543
544
545
546
547
548
549
550

551
552
553
554


555
556
557


558
559
560
561
562
563
482
483
484
485
486
487
488

489




490
491



492
493
494
495
496
497
498
499







-
+
-
-
-
-
+
+
-
-
-
+
+






					};
				},
				_ => {
				},
			};

			if reply.len() > 0 {
				match core.tg.send(message.text_reply(reply.join("\n")).parse_mode(types::ParseMode::MarkdownV2)).await {
				if let Err(err) = core.tg.send(message.text_reply(reply.join("\n")).parse_mode(types::ParseMode::MarkdownV2)).await {
					Ok(_) => {},
					Err(err) => {
						dbg!(reply.join("\n"));
						println!("{}", err);
					dbg!(reply.join("\n"));
					println!("{}", err);
					},
				}
			}
				};
			};
		},
		_ => {},
	};

	Ok(())
}