Diff
Logged in as anonymous

Differences From Artifact [b3dd317a5f]:

To Artifact [5906a79fca]:


1

2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28

29
30
31

32
33
34
35
36



37
38
39
40
41
42
43
44
45
46

47
48
49
50
51
52
53

54
55
56
57

58
59
60
61
62
63
64
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18


19
20
21
22
23
24
25
26

27
28
29
30
31
32



33
34
35
36
37
38
39
40
41
42
43
44
45

46
47
48
49
50
51
52

53
54
55
56

57
58
59
60
61
62
63
64

+
















-
-








-
+



+

-
-
-

+
+
+









-
+






-
+



-
+







use anyhow::{anyhow, bail, Context, Result};
use async_std::task;
use chrono::DateTime;
use sqlx::{
	postgres::PgPoolOptions,
	Row,
};
use std::{
	borrow::Cow,
	collections::{
		BTreeMap,
		HashSet,
	},
	sync::{Arc, Mutex},
};

#[derive(Clone)]
pub struct Core {
	//owner: i64,
	//api_key: String,
	owner_chat: telegram_bot::UserId,
	pub tg: telegram_bot::Api,
	pub my: telegram_bot::User,
	pool: sqlx::Pool<sqlx::Postgres>,
	sources: Arc<Mutex<HashSet<Arc<i32>>>>,
}

impl Core {
	pub async fn new(settings: config::Config) -> Result<Arc<Core>> {
	pub fn new(settings: config::Config) -> Result<Arc<Core>> {
		let owner = settings.get_int("owner")?;
		let api_key = settings.get_string("api_key")?;
		let tg = telegram_bot::Api::new(&api_key);
		let tg_cloned = tg.clone();
		let core = Arc::new(Core {
			//owner,
			//api_key: api_key.clone(),
			my: tg.send(telegram_bot::GetMe).await?,
			tg,
			my: task::block_on(async {
				tg_cloned.send(telegram_bot::GetMe).await
			})?,
			owner_chat: telegram_bot::UserId::new(owner),
			pool: PgPoolOptions::new()
				.max_connections(5)
				.connect_timeout(std::time::Duration::new(300, 0))
				.idle_timeout(std::time::Duration::new(60, 0))
				.connect_lazy(&settings.get_string("pg")?)?,
			sources: Arc::new(Mutex::new(HashSet::new())),
		});
		let clone = core.clone();
		tokio::spawn(async move {
		task::spawn(async move {
			loop {
				let delay = match &clone.autofetch().await {
					Err(err) => {
						if let Err(err) = clone.send(format!("🛑 {:?}", err), None, None).await {
							eprintln!("Autofetch error: {}", err);
						};
						tokio::time::Duration::from_secs(60)
						std::time::Duration::from_secs(60)
					},
					Ok(time) => *time,
				};
				tokio::time::sleep(delay).await;
				task::sleep(delay).await;
			}
		});
		Ok(core)
	}

	pub fn stream(&self) -> telegram_bot::UpdatesStream {
		self.tg.stream()
166
167
168
169
170
171
172
173

174
175
176
177
178
179
180
166
167
168
169
170
171
172

173
174
175
176
177
178
179
180







-
+







					sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
						.bind(*id)
						.bind(date)
						.bind(&*post_url)
						.execute(&mut conn).await
						.with_context(|| format!("Record post:\n{:?}", &conn))?;
					drop(conn);
					tokio::time::sleep(std::time::Duration::new(4, 0)).await;
					task::sleep(std::time::Duration::new(4, 0)).await;
				};
				posted += 1;
			};
			posts.clear();
		};
		let mut conn = self.pool.acquire().await
			.with_context(|| format!("Update scrape fetch conn:\n{:?}", &self.pool))?;
312
313
314
315
316
317
318
319

320
321
322
323
324
325
326
312
313
314
315
316
317
318

319
320
321
322
323
324
325
326







-
+







			let owner: i64 = row.try_get("owner")?;
			let next_fetch: DateTime<chrono::Local> = row.try_get("next_fetch")?;
			if next_fetch < now {
				let clone = Core {
					owner_chat: telegram_bot::UserId::new(owner),
					..self.clone()
				};
				tokio::spawn(async move {
				task::spawn(async move {
					if let Err(err) = clone.check(&source_id, owner, true).await {
						if let Err(err) = clone.send(&format!("🛑 {:?}", err), None, None).await {
							eprintln!("Check error: {}", err);
						};
					};
				});
			} else if next_fetch - now < delay {