Diff
Logged in as anonymous

Differences From Artifact [87190faf6b]:

To Artifact [3f61fa0940]:


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24


25
26
27
28
29
30
31
32
33

34
35
36
37
38
39



40
41

42
43
44
45
46
47
48
49

50
51
52
53
54
55
56
1

2


3
4
5
6

7
8
9
10
11
12
13
14

15
16
17


18
19
20
21
22
23
24
25
26
27

28
29
30
31



32
33
34
35

36
37
38
39
40
41
42
43

44
45
46
47
48
49
50
51

-

-
-




-








-



-
-
+
+








-
+



-
-
-
+
+
+

-
+







-
+







use anyhow::{anyhow, bail, Context, Result};
use atom_syndication;
use chrono::DateTime;
use config;
use reqwest;
use sqlx::{
	postgres::PgPoolOptions,
	Row,
};
use rss;
use std::{
	borrow::Cow,
	collections::{
		BTreeMap,
		HashSet,
	},
	sync::{Arc, Mutex},
};
use telegram_bot;

#[derive(Clone)]
pub struct Core {
	owner: i64,
	api_key: String,
	//owner: i64,
	//api_key: String,
	owner_chat: telegram_bot::UserId,
	pub tg: telegram_bot::Api,
	pub my: telegram_bot::User,
	pool: sqlx::Pool<sqlx::Postgres>,
	sources: Arc<Mutex<HashSet<Arc<i32>>>>,
}

impl Core {
	pub async fn new(settings: config::Config) -> Result<Core> {
	pub async fn new(settings: config::Config) -> Result<Arc<Core>> {
		let owner = settings.get_int("owner")?;
		let api_key = settings.get_str("api_key")?;
		let tg = telegram_bot::Api::new(&api_key);
		let core = Core {
			owner: owner,
			api_key: api_key.clone(),
		let core = Arc::new(Core {
			//owner,
			//api_key: api_key.clone(),
			my: tg.send(telegram_bot::GetMe).await?,
			tg: tg,
			tg,
			owner_chat: telegram_bot::UserId::new(owner),
			pool: PgPoolOptions::new()
				.max_connections(5)
				.connect_timeout(std::time::Duration::new(300, 0))
				.idle_timeout(std::time::Duration::new(60, 0))
				.connect_lazy(&settings.get_str("pg")?)?,
			sources: Arc::new(Mutex::new(HashSet::new())),
		};
		});
		let clone = core.clone();
		tokio::spawn(async move {
			if let Err(err) = &clone.autofetch().await {
				if let Err(err) = clone.send(&format!("🛑 {:?}", err), None, None) {
					eprintln!("Autofetch error: {}", err);
				};
			}
118
119
120
121
122
123
124
125

126
127
128
129
130
131
132






133
134
135
136
137
138
139
140
141
142
143
144
145

146
147
148
149
150
151
152
153
154
155
156

157
158
159
160
161
162
163
113
114
115
116
117
118
119

120







121
122
123
124
125
126


127
128
129
130
131
132
133
134
135
136

137
138
139
140
141
142
143
144
145
146
147

148
149
150
151
152
153
154
155







-
+
-
-
-
-
-
-
-
+
+
+
+
+
+
-
-










-
+










-
+







			let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, String> = BTreeMap::new();
			let response = reqwest::get(url).await?;
			let status = response.status();
			let content = response.bytes().await?;
			match rss::Channel::read_from(&content[..]) {
				Ok(feed) => {
					for item in feed.items() {
						match item.link() {
						if let Some(link) = item.link() {
							Some(link) => {
								let date = match item.pub_date() {
									Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
									None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
								}?;
								let url = link;
								posts.insert(date.clone(), url.into());
							let date = match item.pub_date() {
								Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
								None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
							}?;
							let url = link;
							posts.insert(date, url.into());
							},
							None => {}
						}
					};
				},
				Err(err) => match err {
					rss::Error::InvalidStartTag => {
						let feed = atom_syndication::Feed::read_from(&content[..])
							.with_context(|| format!("Problem opening feed url:\n{}\n{}", &url, status))?;
						for item in feed.entries() {
							let date = item.published().unwrap();
							let url = item.links()[0].href();
							posts.insert(date.clone(), url.into());
							posts.insert(*date, url.into());
						};
					},
					rss::Error::Eof => (),
					_ => bail!("Unsupported or mangled content:\n{:?}\n{:#?}\n{:#?}\n", &url, err, status)
				}
			};
			for (date, url) in posts.iter() {
				let mut conn = self.pool.acquire().await
					.with_context(|| format!("Check post fetch conn:\n{:?}", &self.pool))?;
				let post_url: Cow<str> = match url_re {
					Some(ref x) => x.execute(url).into(),
					Some(ref x) => x.execute(url),
					None => url.into(),
				};
				let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
					.bind(&*post_url)
					.bind(*id)
					.fetch_one(&mut conn).await
					.with_context(|| format!("Check post:\n{:?}", &conn))?;
259
260
261
262
263
264
265
266
267
268



269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288

289
290
291
292
293
294
295

296
297
298

299
300
301

302
303

304
305
306
307
308

309
310
311
312
313
314
315
251
252
253
254
255
256
257



258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279

280
281
282
283
284
285
286

287
288
289

290
291
292

293
294

295
296
297
298
299

300
301
302
303
304
305
306
307







-
-
-
+
+
+



















-
+






-
+


-
+


-
+

-
+




-
+







			.rows_affected() {
			1 => { Ok("Source disabled.") },
			0 => { Ok("Source not found.") },
			_ => { Err(anyhow!("Database error.")) },
		}
	}

	pub async fn update<S>(&self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: S) -> Result<&str>
	where S: Into<i64> {
		let owner = owner.into();
	pub async fn update(&self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: i64) -> Result<&str> {
	//where S: Into<i64> {
		//let owner = owner.into();

		let mut conn = self.pool.acquire().await
			.with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?;

		match match update {
				Some(id) => {
					sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1").bind(id)
				},
				None => {
					sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
				},
			}
			.bind(channel_id)
			.bind(url)
			.bind(iv_hash)
			.bind(owner)
			.bind(channel)
			.bind(url_re)
			.execute(&mut conn).await {
			Ok(_) => return Ok(match update {
			Ok(_) => Ok(match update {
				Some(_) => "Channel updated.",
				None => "Channel added.",
			}),
			Err(sqlx::Error::Database(err)) => {
				match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
					Some("_bt_check_unique", ) => {
						return Ok("Duplicate key.")
						Ok("Duplicate key.")
					},
					Some(_) => {
						return Ok("Database error.")
						Ok("Database error.")
					},
					None => {
						return Ok("No database error extracted.")
						Ok("No database error extracted.")
					},
				};
				}
			},
			Err(err) => {
				bail!("Sorry, unknown error:\n{:#?}\n", err);
			},
		};
		}
	}

	async fn autofetch(&self) -> Result<()> {
		let mut delay = chrono::Duration::minutes(1);
		let mut now;
		loop {
			let mut conn = self.pool.acquire().await
329
330
331
332
333
334
335
336
337
338


339
340
341
342
343
344
345
346
321
322
323
324
325
326
327



328
329

330
331
332
333
334
335
336







-
-
-
+
+
-







					tokio::spawn(async move {
						if let Err(err) = clone.check(&source_id, owner, true).await {
							if let Err(err) = clone.send(&format!("🛑 {:?}", err), None, None) {
								eprintln!("Check error: {}", err);
							};
						};
					});
				} else {
					if next_fetch - now < delay {
						delay = next_fetch - now;
				} else if next_fetch - now < delay {
					delay = next_fetch - now;
					}
				}
			};
			queue.clear();
			tokio::time::sleep(delay.to_std()?).await;
			delay = chrono::Duration::minutes(1);
		}
	}