1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
|
-
-
-
-
-
-
-
+
+
-
+
-
-
-
+
+
+
-
+
-
+
|
use anyhow::{anyhow, bail, Context, Result};
use atom_syndication;
use chrono::DateTime;
use config;
use reqwest;
use sqlx::{
postgres::PgPoolOptions,
Row,
};
use rss;
use std::{
borrow::Cow,
collections::{
BTreeMap,
HashSet,
},
sync::{Arc, Mutex},
};
use telegram_bot;
#[derive(Clone)]
pub struct Core {
owner: i64,
api_key: String,
//owner: i64,
//api_key: String,
owner_chat: telegram_bot::UserId,
pub tg: telegram_bot::Api,
pub my: telegram_bot::User,
pool: sqlx::Pool<sqlx::Postgres>,
sources: Arc<Mutex<HashSet<Arc<i32>>>>,
}
impl Core {
pub async fn new(settings: config::Config) -> Result<Core> {
pub async fn new(settings: config::Config) -> Result<Arc<Core>> {
let owner = settings.get_int("owner")?;
let api_key = settings.get_str("api_key")?;
let tg = telegram_bot::Api::new(&api_key);
let core = Core {
owner: owner,
api_key: api_key.clone(),
let core = Arc::new(Core {
//owner,
//api_key: api_key.clone(),
my: tg.send(telegram_bot::GetMe).await?,
tg: tg,
tg,
owner_chat: telegram_bot::UserId::new(owner),
pool: PgPoolOptions::new()
.max_connections(5)
.connect_timeout(std::time::Duration::new(300, 0))
.idle_timeout(std::time::Duration::new(60, 0))
.connect_lazy(&settings.get_str("pg")?)?,
sources: Arc::new(Mutex::new(HashSet::new())),
};
});
let clone = core.clone();
tokio::spawn(async move {
if let Err(err) = &clone.autofetch().await {
if let Err(err) = clone.send(&format!("🛑 {:?}", err), None, None) {
eprintln!("Autofetch error: {}", err);
};
}
|
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
|
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
-
+
-
-
-
-
-
-
-
+
+
+
+
+
+
-
-
-
+
-
+
|
let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, String> = BTreeMap::new();
let response = reqwest::get(url).await?;
let status = response.status();
let content = response.bytes().await?;
match rss::Channel::read_from(&content[..]) {
Ok(feed) => {
for item in feed.items() {
match item.link() {
if let Some(link) = item.link() {
Some(link) => {
let date = match item.pub_date() {
Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
}?;
let url = link;
posts.insert(date.clone(), url.into());
let date = match item.pub_date() {
Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
}?;
let url = link;
posts.insert(date, url.into());
},
None => {}
}
};
},
Err(err) => match err {
rss::Error::InvalidStartTag => {
let feed = atom_syndication::Feed::read_from(&content[..])
.with_context(|| format!("Problem opening feed url:\n{}\n{}", &url, status))?;
for item in feed.entries() {
let date = item.published().unwrap();
let url = item.links()[0].href();
posts.insert(date.clone(), url.into());
posts.insert(*date, url.into());
};
},
rss::Error::Eof => (),
_ => bail!("Unsupported or mangled content:\n{:?}\n{:#?}\n{:#?}\n", &url, err, status)
}
};
for (date, url) in posts.iter() {
let mut conn = self.pool.acquire().await
.with_context(|| format!("Check post fetch conn:\n{:?}", &self.pool))?;
let post_url: Cow<str> = match url_re {
Some(ref x) => x.execute(url).into(),
Some(ref x) => x.execute(url),
None => url.into(),
};
let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
.bind(&*post_url)
.bind(*id)
.fetch_one(&mut conn).await
.with_context(|| format!("Check post:\n{:?}", &conn))?;
|
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
|
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
|
-
-
-
+
+
+
-
+
-
+
-
+
-
+
-
+
-
+
|
.rows_affected() {
1 => { Ok("Source disabled.") },
0 => { Ok("Source not found.") },
_ => { Err(anyhow!("Database error.")) },
}
}
pub async fn update<S>(&self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: S) -> Result<&str>
where S: Into<i64> {
let owner = owner.into();
pub async fn update(&self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: i64) -> Result<&str> {
//where S: Into<i64> {
//let owner = owner.into();
let mut conn = self.pool.acquire().await
.with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?;
match match update {
Some(id) => {
sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1").bind(id)
},
None => {
sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
},
}
.bind(channel_id)
.bind(url)
.bind(iv_hash)
.bind(owner)
.bind(channel)
.bind(url_re)
.execute(&mut conn).await {
Ok(_) => return Ok(match update {
Ok(_) => Ok(match update {
Some(_) => "Channel updated.",
None => "Channel added.",
}),
Err(sqlx::Error::Database(err)) => {
match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
Some("_bt_check_unique", ) => {
return Ok("Duplicate key.")
Ok("Duplicate key.")
},
Some(_) => {
return Ok("Database error.")
Ok("Database error.")
},
None => {
return Ok("No database error extracted.")
Ok("No database error extracted.")
},
};
}
},
Err(err) => {
bail!("Sorry, unknown error:\n{:#?}\n", err);
},
};
}
}
async fn autofetch(&self) -> Result<()> {
let mut delay = chrono::Duration::minutes(1);
let mut now;
loop {
let mut conn = self.pool.acquire().await
|
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
|
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
|
-
-
-
+
+
-
|
tokio::spawn(async move {
if let Err(err) = clone.check(&source_id, owner, true).await {
if let Err(err) = clone.send(&format!("🛑 {:?}", err), None, None) {
eprintln!("Check error: {}", err);
};
};
});
} else {
if next_fetch - now < delay {
delay = next_fetch - now;
} else if next_fetch - now < delay {
delay = next_fetch - now;
}
}
};
queue.clear();
tokio::time::sleep(delay.to_std()?).await;
delay = chrono::Duration::minutes(1);
}
}
|