Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "rsstg" -version = "0.1.3" +version = "0.1.4" authors = ["arcade"] edition = "2018" [dependencies] config = "*" Index: src/main.rs ================================================================== --- src/main.rs +++ src/main.rs @@ -1,5 +1,7 @@ +use std::collections::BTreeMap; + use config; use tokio; use rss; use chrono::DateTime; @@ -66,28 +68,32 @@ Some(false) | None => UserId::new(row.try_get("owner")?), }; let url: &str = row.try_get("url")?; let mut this_fetch: Option> = None; let iv_hash: Option<&str> = row.try_get("iv_hash")?; + let mut posts: BTreeMap, String> = BTreeMap::new(); match rss::Channel::from_url(url) { Ok(feed) => { self.debug(&format!("# title:{:?} ttl:{:?} hours:{:?} days:{:?}", feed.title(), feed.ttl(), feed.skip_hours(), feed.skip_days()))?; for item in feed.items() { let date = match item.pub_date() { Some(feed_date) => DateTime::parse_from_rfc2822(feed_date), None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]), }?; let url = item.link().unwrap().to_string(); + posts.insert(date.clone(), url.clone()); + }; + for (date, url) in posts.iter() { match sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") .bind(&url) .bind(id) .fetch_one(&self.pool).await { Ok(row) => { let exists: bool = row.try_get("exists")?; if ! exists { - if this_fetch == None || date > this_fetch.unwrap() { - this_fetch = Some(date); + if this_fetch == None || *date > this_fetch.unwrap() { + this_fetch = Some(*date); } match self.tg.send( match iv_hash { Some(x) => SendMessage::new(destination, format!(" {0}", url, x)), None => SendMessage::new(destination, format!("{}", url)), }.parse_mode(types::ParseMode::Html)).await { @@ -113,10 +119,11 @@ Err(err) => { self.debug(&err.to_string())?; }, }; }; + posts.clear(); }, Err(err) => { self.debug(&err.to_string())?; }, }; @@ -176,14 +183,14 @@ let mut delay = chrono::Duration::minutes(5); let mut next_fetch: DateTime; let mut now; loop { self.debug("cycle")?; - let mut rows = sqlx::query("select source_id, next_fetch from rsstg_order natural left join rsstg_source natural left join rsstg_channel;") + now = chrono::Local::now(); + let mut rows = sqlx::query("select source_id, next_fetch from rsstg_order natural left join rsstg_source natural left join rsstg_channel where next_fetch < now();") .fetch(&self.pool); while let Some(row) = rows.try_next().await.unwrap() { - now = chrono::Local::now(); let source_id: i32 = row.try_get("source_id")?; next_fetch = row.try_get("next_fetch")?; if next_fetch < now { match sqlx::query("update rsstg_source set last_scrape = now() + interval '1 hour' where source_id = $1;") .bind(source_id)