260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
|
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
|
-
+
-
+
|
Err(err) => {
bail!("Sorry, unknown error:\n{:#?}\n", err);
},
};
}
async fn autofetch(&self) -> Result<()> {
let mut delay = chrono::Duration::minutes(5);
let mut delay = chrono::Duration::minutes(1);
let mut now;
loop {
let mut conn = self.pool.acquire().await
.with_context(|| format!("Autofetch fetch conn:\n{:?}", &self.pool))?;
now = chrono::Local::now();
let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '5 minutes';")
let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
.fetch_all(&mut conn).await?;
for row in queue.iter() {
let source_id: i32 = row.try_get("source_id")?;
let owner: i64 = row.try_get("owner")?;
let next_fetch: DateTime<chrono::Local> = row.try_get("next_fetch")?;
if next_fetch < now {
//let clone = self.clone();
|
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
|
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
|
-
+
|
if next_fetch - now < delay {
delay = next_fetch - now;
}
}
};
queue.clear();
tokio::time::delay_for(delay.to_std()?).await;
delay = chrono::Duration::minutes(5);
delay = chrono::Duration::minutes(1);
}
}
async fn list<S>(&self, owner: S) -> Result<Vec<String>>
where S: Into<i64> {
let owner = owner.into();
let mut reply = vec![];
|
335
336
337
338
339
340
341
342
343
344
345
346
347
348
|
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
|
+
|
async fn main() -> Result<()> {
let mut settings = config::Config::default();
settings.merge(config::File::with_name("rsstg"))?;
let core = Core::new(settings).await?;
let mut stream = core.stream();
stream.allowed_updates(&[AllowedUpdate::Message]);
let mut reply_to: Option<UserId>;
loop {
reply_to = None;
match stream.next().await {
Some(update) => {
if let Err(err) = handle(update?, &core, &mut reply_to).await {
|