17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
|
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
|
+
+
+
+
+
+
+
+
+
+
|
#[derive(Clone)]
pub struct Core {
owner_chat: telegram_bot::UserId,
pub tg: telegram_bot::Api,
pub my: telegram_bot::User,
pool: sqlx::Pool<sqlx::Postgres>,
sources: Arc<Mutex<HashSet<Arc<i32>>>>,
http_client: reqwest::Client,
}
impl Core {
pub fn new(settings: config::Config) -> Result<Arc<Core>> {
let owner = settings.get_int("owner")?;
let api_key = settings.get_string("api_key")?;
let tg = telegram_bot::Api::new(api_key);
let tg_cloned = tg.clone();
let proxy = settings.get_string("proxy")?;
let mut client = reqwest::Client::builder();
if !proxy.is_empty() {
let proxy = reqwest::Proxy::all(proxy)?;
client = client.proxy(proxy);
}
let http_client = client.build()?;
let core = Arc::new(Core {
tg,
my: task::block_on(async {
tg_cloned.send(telegram_bot::GetMe).await
})?,
owner_chat: telegram_bot::UserId::new(owner),
pool: PgPoolOptions::new()
.max_connections(5)
.acquire_timeout(std::time::Duration::new(300, 0))
.idle_timeout(std::time::Duration::new(60, 0))
.connect_lazy(&settings.get_string("pg")?)?,
sources: Arc::new(Mutex::new(HashSet::new())),
http_client,
});
let clone = core.clone();
task::spawn(async move {
loop {
let delay = match &clone.autofetch().await {
Err(err) => {
if let Err(err) = clone.send(format!("🛑 {:?}", err), None, None).await {
|
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
|
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
|
+
-
+
+
|
.with_context(|| format!("Query queue fetch conn:\n{:?}", &self.pool))?;
let row = sqlx::query("select source_id, channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
.bind(*id)
.bind(owner)
.fetch_one(&mut conn).await
.with_context(|| format!("Query source:\n{:?}", &self.pool))?;
drop(conn);
let channel_id: i64 = row.try_get("channel_id")?;
let url: &str = row.try_get("url")?;
let iv_hash: Option<&str> = row.try_get("iv_hash")?;
let url_re = match row.try_get("url_re")? {
Some(x) => Some(sedregex::ReplaceCommand::new(x)?),
None => None,
};
let destination = match real {
true => telegram_bot::UserId::new(channel_id),
false => telegram_bot::UserId::new(row.try_get("owner")?),
};
let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, String> = BTreeMap::new();
let response = reqwest::get(url).await?;
let response = self.http_client.get(url).send().await?;
let status = response.status();
let content = response.bytes().await?;
match rss::Channel::read_from(&content[..]) {
Ok(feed) => {
for item in feed.items() {
if let Some(link) = item.link() {
let date = match item.pub_date() {
|