1
2
3
4
5
6
7
|
1
2
3
4
5
6
7
8
9
|
+
+
|
use std::collections::BTreeMap;
use config;
use tokio;
use rss;
use chrono::DateTime;
use regex::Regex;
|
︙ | | |
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
|
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
|
+
+
+
+
-
-
+
+
|
let destination = match real {
Some(true) => UserId::new(channel_id),
Some(false) | None => UserId::new(row.try_get("owner")?),
};
let url: &str = row.try_get("url")?;
let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
let iv_hash: Option<&str> = row.try_get("iv_hash")?;
let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, String> = BTreeMap::new();
match rss::Channel::from_url(url) {
Ok(feed) => {
self.debug(&format!("# title:{:?} ttl:{:?} hours:{:?} days:{:?}", feed.title(), feed.ttl(), feed.skip_hours(), feed.skip_days()))?;
for item in feed.items() {
let date = match item.pub_date() {
Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
}?;
let url = item.link().unwrap().to_string();
posts.insert(date.clone(), url.clone());
};
for (date, url) in posts.iter() {
match sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
.bind(&url)
.bind(id)
.fetch_one(&self.pool).await {
Ok(row) => {
let exists: bool = row.try_get("exists")?;
if ! exists {
if this_fetch == None || date > this_fetch.unwrap() {
this_fetch = Some(date);
if this_fetch == None || *date > this_fetch.unwrap() {
this_fetch = Some(*date);
}
match self.tg.send( match iv_hash {
Some(x) => SendMessage::new(destination, format!("<a href=\"https://t.me/iv?url={}&rhash={}\"> </a>{0}", url, x)),
None => SendMessage::new(destination, format!("{}", url)),
}.parse_mode(types::ParseMode::Html)).await {
Ok(_) => {
match sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
|
︙ | | |
111
112
113
114
115
116
117
118
119
120
121
122
123
124
|
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
+
|
}
},
Err(err) => {
self.debug(&err.to_string())?;
},
};
};
posts.clear();
},
Err(err) => {
self.debug(&err.to_string())?;
},
};
match sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
.bind(id)
|
︙ | | |
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
|
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
|
+
-
+
-
|
async fn autofetch(&self) -> Result<()> {
let mut delay = chrono::Duration::minutes(5);
let mut next_fetch: DateTime<chrono::Local>;
let mut now;
loop {
self.debug("cycle")?;
now = chrono::Local::now();
let mut rows = sqlx::query("select source_id, next_fetch from rsstg_order natural left join rsstg_source natural left join rsstg_channel;")
let mut rows = sqlx::query("select source_id, next_fetch from rsstg_order natural left join rsstg_source natural left join rsstg_channel where next_fetch < now();")
.fetch(&self.pool);
while let Some(row) = rows.try_next().await.unwrap() {
now = chrono::Local::now();
let source_id: i32 = row.try_get("source_id")?;
next_fetch = row.try_get("next_fetch")?;
if next_fetch < now {
match sqlx::query("update rsstg_source set last_scrape = now() + interval '1 hour' where source_id = $1;")
.bind(source_id)
.execute(&self.pool).await {
Ok(_) => {},
|
︙ | | |
| |