Diff
Logged in as anonymous

Differences From Artifact [3f61fa0940]:

To Artifact [a6c540ecfc]:


40
41
42
43
44
45
46

47

48
49
50





51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
				.connect_timeout(std::time::Duration::new(300, 0))
				.idle_timeout(std::time::Duration::new(60, 0))
				.connect_lazy(&settings.get_str("pg")?)?,
			sources: Arc::new(Mutex::new(HashSet::new())),
		});
		let clone = core.clone();
		tokio::spawn(async move {

			if let Err(err) = &clone.autofetch().await {

				if let Err(err) = clone.send(&format!("🛑 {:?}", err), None, None) {
					eprintln!("Autofetch error: {}", err);
				};





			}
		});
		Ok(core)
	}

	/// Returns the update stream obtained from the wrapped `tg` client.
	///
	/// This is a plain delegation to `self.tg.stream()`; no state is touched here.
	pub fn stream(&self) -> telegram_bot::UpdatesStream {
		self.tg.stream()
	}

	/// Hands a text message to the `tg` client for sending.
	///
	/// * `msg` - message text; anything convertible into `Cow<str>`.
	/// * `target` - recipient; `None` falls back to `self.owner_chat`.
	/// * `parse_mode` - formatting mode; `None` falls back to `ParseMode::Html`.
	///
	/// The request is passed to `self.tg.spawn` and its outcome is not inspected
	/// here, so this method always returns `Ok(())`.
	pub fn send<'a, S>(&self, msg: S, target: Option<telegram_bot::UserId>, parse_mode: Option<telegram_bot::types::ParseMode>) -> Result<()>
	where S: Into<Cow<'a, str>> {
		let text = msg.into();
		let mode = parse_mode.unwrap_or(telegram_bot::types::ParseMode::Html);
		let recipient = target.unwrap_or(self.owner_chat);
		self.tg.spawn(telegram_bot::SendMessage::new(recipient, text).parse_mode(mode));
		Ok(())
	}

	pub async fn check<S>(&self, id: &i32, owner: S, real: bool) -> Result<Cow<'_, str>>
	where S: Into<i64> {
		let owner = owner.into();








>
|
>
|
|
|
>
>
>
>
>









|



<
<
|
<
<
<
|
|







40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70


71



72
73
74
75
76
77
78
79
80
				.connect_timeout(std::time::Duration::new(300, 0))
				.idle_timeout(std::time::Duration::new(60, 0))
				.connect_lazy(&settings.get_str("pg")?)?,
			sources: Arc::new(Mutex::new(HashSet::new())),
		});
		let clone = core.clone();
		tokio::spawn(async move {
			loop {
				let delay = match &clone.autofetch().await {
					Err(err) => {
						if let Err(err) = clone.send(format!("🛑 {:?}", err), None, None).await {
							eprintln!("Autofetch error: {}", err);
						};
						tokio::time::Duration::from_secs(60)
					},
					Ok(time) => *time,
				};
				tokio::time::sleep(delay).await;
			}
		});
		Ok(core)
	}

	/// Returns the update stream obtained from the wrapped `tg` client.
	///
	/// This is a plain delegation to `self.tg.stream()`; no state is touched here.
	pub fn stream(&self) -> telegram_bot::UpdatesStream {
		self.tg.stream()
	}

	/// Sends a text message through the `tg` client and waits for the call to finish.
	///
	/// * `msg` - message text; anything convertible into `Cow<str>`.
	/// * `target` - recipient; `None` falls back to `self.owner_chat`.
	/// * `mode` - formatting mode; `None` falls back to `ParseMode::Html`.
	///
	/// # Errors
	/// Propagates whatever error the awaited `self.tg.send` call yields.
	pub async fn send<'a, S>(&self, msg: S, target: Option<telegram_bot::UserId>, mode: Option<telegram_bot::types::ParseMode>) -> Result<()>
	where S: Into<Cow<'a, str>> {
		let text = msg.into();
		let recipient = target.unwrap_or(self.owner_chat);
		let format = mode.unwrap_or(telegram_bot::types::ParseMode::Html);
		self.tg
			.send(telegram_bot::SendMessage::new(recipient, text).parse_mode(format))
			.await?;
		Ok(())
	}

	pub async fn check<S>(&self, id: &i32, owner: S, real: bool) -> Result<Cow<'_, str>>
	where S: Into<i64> {
		let owner = owner.into();

119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
					for item in feed.items() {
						if let Some(link) = item.link() {
							let date = match item.pub_date() {
								Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
								None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
							}?;
							let url = link;
							posts.insert(date, url.into());
						}
					};
				},
				Err(err) => match err {
					rss::Error::InvalidStartTag => {
						let feed = atom_syndication::Feed::read_from(&content[..])
							.with_context(|| format!("Problem opening feed url:\n{}\n{}", &url, status))?;
						for item in feed.entries() {
							let date = item.published().unwrap();
							let url = item.links()[0].href();
							posts.insert(*date, url.into());
						};
					},
					rss::Error::Eof => (),
					_ => bail!("Unsupported or mangled content:\n{:?}\n{:#?}\n{:#?}\n", &url, err, status)
				}
			};
			for (date, url) in posts.iter() {







|










|







121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
					for item in feed.items() {
						if let Some(link) = item.link() {
							let date = match item.pub_date() {
								Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
								None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
							}?;
							let url = link;
							posts.insert(date, url.to_string());
						}
					};
				},
				Err(err) => match err {
					rss::Error::InvalidStartTag => {
						let feed = atom_syndication::Feed::read_from(&content[..])
							.with_context(|| format!("Problem opening feed url:\n{}\n{}", &url, status))?;
						for item in feed.entries() {
							let date = item.published().unwrap();
							let url = item.links()[0].href();
							posts.insert(*date, url.to_string());
						};
					},
					rss::Error::Eof => (),
					_ => bail!("Unsupported or mangled content:\n{:?}\n{:#?}\n{:#?}\n", &url, err, status)
				}
			};
			for (date, url) in posts.iter() {
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
			.rows_affected() {
			1 => { Ok("Source disabled.") },
			0 => { Ok("Source not found.") },
			_ => { Err(anyhow!("Database error.")) },
		}
	}

	pub async fn update(&self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: i64) -> Result<&str> {
	//where S: Into<i64> {
		//let owner = owner.into();

		let mut conn = self.pool.acquire().await
			.with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?;

		match match update {
				Some(id) => {
					sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1").bind(id)







|
|
|







253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
			.rows_affected() {
			1 => { Ok("Source disabled.") },
			0 => { Ok("Source not found.") },
			_ => { Err(anyhow!("Database error.")) },
		}
	}

	pub async fn update<S>(&self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: S) -> Result<&str>
	where S: Into<i64> {
		let owner = owner.into();

		let mut conn = self.pool.acquire().await
			.with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?;

		match match update {
				Some(id) => {
					sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1").bind(id)
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
			},
			Err(err) => {
				bail!("Sorry, unknown error:\n{:#?}\n", err);
			},
		}
	}

	/// Polls the fetch queue in an endless loop, spawning a `check` task for every
	/// source whose `next_fetch` time has already passed.
	///
	/// Each pass:
	/// 1. acquires a pooled connection and selects the rows whose `next_fetch`
	///    is less than one minute ahead of the database clock;
	/// 2. spawns a task per overdue row, with `owner_chat` overridden to the
	///    row's owner so error reports go to that owner;
	/// 3. sleeps for one minute, or less if a selected row becomes due sooner.
	///
	/// # Errors
	/// Never returns `Ok`; returns the first error from acquiring a connection,
	/// running the query, reading a column, or converting the delay.
	async fn autofetch(&self) -> Result<()> {
		loop {
			let mut conn = self.pool.acquire().await
				.with_context(|| format!("Autofetch fetch conn:\n{:?}", &self.pool))?;
			let now = chrono::Local::now();
			let queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
				.fetch_all(&mut conn).await?;
			// Give the connection back before sleeping: the rows are already
			// fetched, and keeping it would pin a pool slot for the whole nap.
			drop(conn);

			let mut delay = chrono::Duration::minutes(1);
			// Consuming the rows frees them before the sleep below.
			for row in queue {
				let source_id: i32 = row.try_get("source_id")?;
				let owner: i64 = row.try_get("owner")?;
				let next_fetch: DateTime<chrono::Local> = row.try_get("next_fetch")?;
				if next_fetch < now {
					let clone = Core {
						owner_chat: telegram_bot::UserId::new(owner),
						..self.clone()
					};
					tokio::spawn(async move {
						if let Err(err) = clone.check(&source_id, owner, true).await {
							if let Err(err) = clone.send(&format!("🛑 {:?}", err), None, None) {
								eprintln!("Check error: {}", err);
							};
						};
					});
				} else if next_fetch - now < delay {
					// Not due yet: wake up just in time for the earliest one.
					delay = next_fetch - now;
				}
			}
			tokio::time::sleep(delay.to_std()?).await;
		}
	}

	pub async fn list<S>(&self, owner: S) -> Result<String>
	where S: Into<i64> {
		let owner = owner.into();

		let mut reply: Vec<Cow<str>> = vec![];







|

<
<
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
<
<







298
299
300
301
302
303
304
305
306


307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333


334
335
336
337
338
339
340
			},
			Err(err) => {
				bail!("Sorry, unknown error:\n{:#?}\n", err);
			},
		}
	}

	/// Runs a single pass over the fetch queue and reports how long to wait
	/// before the next pass.
	///
	/// Selects the rows whose `next_fetch` is less than one minute ahead of the
	/// database clock. For every row that is already overdue a `check` task is
	/// spawned, with `owner_chat` overridden to the row's owner. Rows that are
	/// not due yet only shorten the returned delay.
	///
	/// # Returns
	/// One minute, or the time remaining until the earliest not-yet-due row if
	/// that is shorter.
	///
	/// # Errors
	/// Fails if a connection cannot be acquired, the query fails, a column
	/// cannot be read, or the delay cannot be converted to `std::time::Duration`.
	async fn autofetch(&self) -> Result<std::time::Duration> {
		let mut conn = self.pool.acquire().await
			.with_context(|| format!("Autofetch fetch conn:\n{:?}", &self.pool))?;
		let now = chrono::Local::now();
		let rows = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
			.fetch_all(&mut conn).await?;

		let mut delay = chrono::Duration::minutes(1);
		for row in &rows {
			let source_id: i32 = row.try_get("source_id")?;
			let owner: i64 = row.try_get("owner")?;
			let next_fetch: DateTime<chrono::Local> = row.try_get("next_fetch")?;

			// Not due yet: remember the shortest wait and move on.
			if next_fetch >= now {
				let wait = next_fetch - now;
				if wait < delay {
					delay = wait;
				}
				continue;
			}

			// Overdue: check it in the background, reporting to its owner.
			let worker = Core {
				owner_chat: telegram_bot::UserId::new(owner),
				..self.clone()
			};
			tokio::spawn(async move {
				if let Err(err) = worker.check(&source_id, owner, true).await {
					if let Err(send_err) = worker.send(&format!("🛑 {:?}", err), None, None).await {
						eprintln!("Check error: {}", send_err);
					};
				};
			});
		}
		Ok(delay.to_std()?)
	}

	pub async fn list<S>(&self, owner: S) -> Result<String>
	where S: Into<i64> {
		let owner = owner.into();

		let mut reply: Vec<Cow<str>> = vec![];