1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
|
+
-
-
-
+
+
-
-
-
+
+
+
-
+
-
+
-
+
|
use anyhow::{anyhow, bail, Context, Result};
use async_std::task;
use chrono::DateTime;
use sqlx::{
postgres::PgPoolOptions,
Row,
};
use std::{
borrow::Cow,
collections::{
BTreeMap,
HashSet,
},
sync::{Arc, Mutex},
};
#[derive(Clone)]
pub struct Core {
//owner: i64,
//api_key: String,
owner_chat: telegram_bot::UserId,
pub tg: telegram_bot::Api,
pub my: telegram_bot::User,
pool: sqlx::Pool<sqlx::Postgres>,
sources: Arc<Mutex<HashSet<Arc<i32>>>>,
}
impl Core {
pub async fn new(settings: config::Config) -> Result<Arc<Core>> {
pub fn new(settings: config::Config) -> Result<Arc<Core>> {
let owner = settings.get_int("owner")?;
let api_key = settings.get_string("api_key")?;
let tg = telegram_bot::Api::new(&api_key);
let tg_cloned = tg.clone();
let core = Arc::new(Core {
//owner,
//api_key: api_key.clone(),
my: tg.send(telegram_bot::GetMe).await?,
tg,
my: task::block_on(async {
tg_cloned.send(telegram_bot::GetMe).await
})?,
owner_chat: telegram_bot::UserId::new(owner),
pool: PgPoolOptions::new()
.max_connections(5)
.connect_timeout(std::time::Duration::new(300, 0))
.idle_timeout(std::time::Duration::new(60, 0))
.connect_lazy(&settings.get_string("pg")?)?,
sources: Arc::new(Mutex::new(HashSet::new())),
});
let clone = core.clone();
tokio::spawn(async move {
task::spawn(async move {
loop {
let delay = match &clone.autofetch().await {
Err(err) => {
if let Err(err) = clone.send(format!("🛑 {:?}", err), None, None).await {
eprintln!("Autofetch error: {}", err);
};
tokio::time::Duration::from_secs(60)
std::time::Duration::from_secs(60)
},
Ok(time) => *time,
};
tokio::time::sleep(delay).await;
task::sleep(delay).await;
}
});
Ok(core)
}
pub fn stream(&self) -> telegram_bot::UpdatesStream {
self.tg.stream()
|
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
|
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
|
-
+
|
sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
.bind(*id)
.bind(date)
.bind(&*post_url)
.execute(&mut conn).await
.with_context(|| format!("Record post:\n{:?}", &conn))?;
drop(conn);
tokio::time::sleep(std::time::Duration::new(4, 0)).await;
task::sleep(std::time::Duration::new(4, 0)).await;
};
posted += 1;
};
posts.clear();
};
let mut conn = self.pool.acquire().await
.with_context(|| format!("Update scrape fetch conn:\n{:?}", &self.pool))?;
|
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
|
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
|
-
+
|
let owner: i64 = row.try_get("owner")?;
let next_fetch: DateTime<chrono::Local> = row.try_get("next_fetch")?;
if next_fetch < now {
let clone = Core {
owner_chat: telegram_bot::UserId::new(owner),
..self.clone()
};
tokio::spawn(async move {
task::spawn(async move {
if let Err(err) = clone.check(&source_id, owner, true).await {
if let Err(err) = clone.send(&format!("🛑 {:?}", err), None, None).await {
eprintln!("Check error: {}", err);
};
};
});
} else if next_fetch - now < delay {
|