| ︙ | | |
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
-
+
+
+
+
+
+
+
+
+
|
use async_std::{
task,
sync::{
Arc,
Mutex
},
};
use chrono::DateTime;
use chrono::{
DateTime,
Local,
};
use lazy_static::lazy_static;
use regex::Regex;
use reqwest::header::{
CACHE_CONTROL,
EXPIRES,
LAST_MODIFIED
};
use tgbot::{
api::Client,
handler::UpdateHandler,
types::{
Bot,
ChatPeerId,
Command,
|
| ︙ | | |
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
|
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
|
-
+
|
let target = target.unwrap_or(self.owner_chat);
self.tg.execute(
SendMessage::new(target, msg)
.with_parse_mode(mode)
).await.stack()
}
pub async fn check (&self, id: i32, real: bool) -> Result<String> {
pub async fn check (&self, id: i32, real: bool, last_scrape: Option<DateTime<Local>>) -> Result<String> {
let mut posted: i32 = 0;
let mut conn = self.db.begin().await.stack()?;
let id = {
let mut set = self.sources.lock_arc().await;
match set.get(&id) {
Some(id) => id.clone(),
|
| ︙ | | |
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
|
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
|
-
+
+
+
+
+
+
+
+
+
+
+
+
+
|
let destination = ChatPeerId::from(match real {
true => source.channel_id,
false => source.owner,
});
let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, String> = BTreeMap::new();
let response = self.http_client.get(&source.url).send().await.stack()?;
let mut builder = self.http_client.get(&source.url);
if let Some(last_scrape) = last_scrape {
builder = builder.header(LAST_MODIFIED, last_scrape.to_rfc2822());
};
let response = builder.send().await.stack()?;
{
let headers = response.headers();
let expires = headers.get(EXPIRES);
let cache = headers.get(CACHE_CONTROL);
if expires.is_some() || cache.is_some() {
println!("{} {} {:?} {:?} {:?}", Local::now().to_rfc2822(), &source.url, last_scrape, expires, cache);
}
}
let status = response.status();
let content = response.bytes().await.stack()?;
match rss::Channel::read_from(&content[..]) {
Ok(feed) => {
for item in feed.items() {
if let Some(link) = item.link() {
let date = match item.pub_date() {
|
| ︙ | | |
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
|
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
|
-
+
-
+
|
let queue = {
let mut conn = self.db.begin().await.stack()?;
conn.get_queue().await.stack()?
};
for row in queue {
if let Some(next_fetch) = row.next_fetch {
if next_fetch < now {
if let (Some(owner), Some(source_id)) = (row.owner, row.source_id) {
if let (Some(owner), Some(source_id), last_scrape) = (row.owner, row.source_id, row.last_scrape) {
let clone = Core {
owner_chat: ChatPeerId::from(owner),
..self.clone()
};
let source = {
let mut conn = self.db.begin().await.stack()?;
match conn.get_one(owner, source_id).await {
Ok(Some(source)) => source.to_string(),
Ok(None) => "Source not found in database.stack()?".to_string(),
Err(err) => format!("Failed to fetch source data:\n{err}"),
}
};
task::spawn(async move {
if let Err(err) = clone.check(source_id, true).await {
if let Err(err) = clone.check(source_id, true, Some(last_scrape)).await {
if let Err(err) = clone.send(&format!("{source}\n\nš {}", encode(&err.to_string())), None, Some(ParseMode::MarkdownV2)).await {
eprintln!("Check error: {err:?}");
// clone.disable(&source_id, owner).await.unwrap();
};
};
});
}
|
| ︙ | | |