Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "rsstg" -version = "0.1.7" +version = "0.1.8" authors = ["arcade"] edition = "2018" [dependencies] chrono = "*" Index: src/main.rs ================================================================== --- src/main.rs +++ src/main.rs @@ -1,29 +1,29 @@ use std::collections::BTreeMap; use config; use tokio; + use rss; + use chrono::DateTime; use regex::Regex; -use tokio::stream::StreamExt; use telegram_bot::*; +use tokio::stream::StreamExt; use sqlx::postgres::PgPoolOptions; use sqlx::Row; -use sqlx::Done; +use sqlx::Done; // .rows_affected() #[macro_use] extern crate lazy_static; use anyhow::{anyhow, Context, Result}; -//type Result = std::result::Result>; - #[derive(Clone)] struct Core { owner: i64, api_key: String, owner_chat: UserId, @@ -46,16 +46,17 @@ pool: PgPoolOptions::new() .max_connections(5) .connect_timeout(std::time::Duration::new(300, 0)) .idle_timeout(std::time::Duration::new(60, 0)) .connect_lazy(&settings.get_str("pg")?)?, - //.connect(&settings.get_str("pg")?).await?, }; let clone = core.clone(); tokio::spawn(async move { - if let Err(err) = clone.autofetch().await { - eprintln!("connection error: {}", err); + if let Err(err) = &clone.autofetch().await { + if let Err(err) = clone.debug(&err.to_string()) { + eprintln!("Autofetch error: {}", err); + }; } }); Ok(core) } @@ -67,133 +68,81 @@ self.tg.spawn(SendMessage::new(self.owner_chat, msg)); Ok(()) } async fn check(&self, id: &i32, real: bool) -> Result<()> { - match self.pool.acquire().await { - Err(err) => { - self.debug(&format!("πŸ›‘ Query queue fetch conn:\n{}\n{:?}", &err, &self.pool))?; - }, - Ok(mut conn) => { - match sqlx::query("select source_id, channel_id, url, iv_hash, owner from rsstg_source where source_id = $1") - .bind(id) - .fetch_one(&mut conn).await { - Err(err) => { - self.debug(&format!("πŸ›‘ Query queue:\n{}\n{:?}", &err, &conn))?; - }, - Ok(row) => { - drop(conn); - let channel_id: i64 = row.try_get("channel_id")?; - let destination = match real { - true => UserId::new(channel_id), - false => UserId::new(row.try_get("owner")?), - }; - let url: &str = row.try_get("url")?; - let mut this_fetch: Option> = None; - let iv_hash: Option<&str> = row.try_get("iv_hash")?; - let mut posts: BTreeMap, String> = BTreeMap::new(); - match rss::Channel::from_url(url) { - Err(err) => { - self.debug(&format!("πŸ›‘ Problem opening feed url:\n{}\n{}", &url, &err))?; - }, - Ok(feed) => { - for item in feed.items() { - let date = match item.pub_date() { - Some(feed_date) => DateTime::parse_from_rfc2822(feed_date), - None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]), - }?; - let url = item.link().unwrap().to_string(); - posts.insert(date.clone(), url.clone()); - }; - for (date, url) in posts.iter() { - match self.pool.acquire().await { - Err(err) => { - self.debug(&format!("πŸ›‘ Check post fetch conn:\n{}\n{:?}", &err, &self.pool))?; - }, - Ok(mut conn) => { - match sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") - .bind(&url) - .bind(id) - .fetch_one(&mut conn).await { - Err(err) => { - self.debug(&format!("πŸ›‘ Check post:\n{}\n{:?}", &err, &conn))?; - }, - Ok(row) => { - let exists: bool = row.try_get("exists")?; - if ! exists { - if this_fetch == None || *date > this_fetch.unwrap() { - this_fetch = Some(*date); - } - match self.tg.send( match iv_hash { - Some(x) => SendMessage::new(destination, format!(" {0}", url, x)), - None => SendMessage::new(destination, format!("{}", url)), - }.parse_mode(types::ParseMode::Html)).await { - Err(err) => { - self.debug(&format!("πŸ›‘ Can't post message:\n{}", &err))?; - }, - Ok(_) => { - match sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);") - .bind(id) - .bind(date) - .bind(url) - .execute(&mut conn).await { - Ok(_) => {}, - Err(err) => { - self.debug(&format!("πŸ›‘Rrecord post:\n{}\n{:?}", &err, &conn))?; - }, - }; - }, - }; - drop(conn); - tokio::time::delay_for(std::time::Duration::new(4, 0)).await; - } - }, - }; - } - }; - }; - posts.clear(); - }, - }; - match self.pool.acquire().await { - Err(err) => { - self.debug(&format!("πŸ›‘ Update scrape fetch conn:\n{}\n{:?}", &err, &self.pool))?; - }, - Ok(mut conn) => { - match sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;") - .bind(id) - .execute(&mut conn).await { - Err(err) => { - self.debug(&format!("πŸ›‘ Update scrape:\n{}\n{:?}", &err, &conn))?; - }, - Ok(_) => {}, - }; - }, - }; - }, - }; - }, - }; + let mut conn = self.pool.acquire().await + .with_context(|| format!("πŸ›‘ Query queue fetch conn:\n{:?}", &self.pool))?; + let row = sqlx::query("select source_id, channel_id, url, iv_hash, owner from rsstg_source where source_id = $1") + .bind(id) + .fetch_one(&mut conn).await + .with_context(|| format!("πŸ›‘ Query source:\n{:?}", &self.pool))?; + drop(conn); + let channel_id: i64 = row.try_get("channel_id")?; + let destination = match real { + true => UserId::new(channel_id), + false => UserId::new(row.try_get("owner")?), + }; + let url: &str = row.try_get("url")?; + let mut this_fetch: Option> = None; + let iv_hash: Option<&str> = row.try_get("iv_hash")?; + let mut posts: BTreeMap, String> = BTreeMap::new(); + let feed = rss::Channel::from_url(url) + .with_context(|| format!("πŸ›‘ Problem opening feed url:\n{}", &url))?; + for item in feed.items() { + let date = match item.pub_date() { + Some(feed_date) => DateTime::parse_from_rfc2822(feed_date), + None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]), + }?; + let url = item.link().unwrap().to_string(); + posts.insert(date.clone(), url.clone()); + }; + for (date, url) in posts.iter() { + let mut conn = self.pool.acquire().await + .with_context(|| format!("πŸ›‘ Check post fetch conn:\n{:?}", &self.pool))?; + let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") + .bind(&url) + .bind(id) + .fetch_one(&mut conn).await + .with_context(|| format!("πŸ›‘ Check post:\n{:?}", &conn))?; + let exists: bool = row.try_get("exists")?; + if ! exists { + if this_fetch == None || *date > this_fetch.unwrap() { + this_fetch = Some(*date); + }; + self.tg.send( match iv_hash { + Some(x) => SendMessage::new(destination, format!(" {0}", url, x)), + None => SendMessage::new(destination, format!("{}", url)), + }.parse_mode(types::ParseMode::Html)).await + .context("πŸ›‘ Can't post message:")?; + sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);") + .bind(id) + .bind(date) + .bind(url) + .execute(&mut conn).await + .with_context(|| format!("πŸ›‘Record post:\n{:?}", &conn))?; + drop(conn); + tokio::time::delay_for(std::time::Duration::new(4, 0)).await; + }; + }; + posts.clear(); + let mut conn = self.pool.acquire().await + .with_context(|| format!("πŸ›‘ Update scrape fetch conn:\n{:?}", &self.pool))?; + sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;") + .bind(id) + .execute(&mut conn).await + .with_context(|| format!("πŸ›‘ Update scrape:\n{:?}", &conn))?; Ok(()) } async fn clean(&self, source_id: i32) -> Result<()> { - match self.pool.acquire().await { - Err(err) => { - self.debug(&format!("πŸ›‘ Clean fetch conn:\n{}\n{:?}", &err, &self.pool))?; - }, - Ok(mut conn) => { - match sqlx::query("delete from rsstg_post where source_id = $1;") - .bind(source_id) - .execute(&mut conn).await { - Err(err) => { - self.debug(&format!("πŸ›‘ Clean seen posts:\n{}\n{:?}", &err, &self.pool))?; - }, - Ok(_) => {}, - }; - }, - }; + let mut conn = self.pool.acquire().await + .with_context(|| format!("πŸ›‘ Clean fetch conn:\n{:?}", &self.pool))?; + sqlx::query("delete from rsstg_post where source_id = $1;") + .bind(source_id) + .execute(&mut conn).await + .with_context(|| format!("πŸ›‘ Clean seen posts:\n{:?}", &self.pool))?; Ok(()) } async fn enable(&self, source_id: &i32, id: telegram_bot::UserId) -> Result<&str> { let mut conn = self.pool.acquire().await @@ -227,45 +176,38 @@ async fn autofetch(&self) -> Result<()> { let mut delay = chrono::Duration::minutes(5); let mut now; loop { - match self.pool.acquire().await { - Err(err) => { - self.debug(&format!("πŸ›‘ Autofetch fetch conn:\n{}\n{:?}", &err, &self.pool))?; - }, - Ok(mut conn) => { - now = chrono::Local::now(); - let mut queue = sqlx::query("select source_id, next_fetch from rsstg_order natural left join rsstg_source natural left join rsstg_channel where next_fetch < now();") - .fetch_all(&mut conn).await?; - for row in queue.iter() { - let source_id: i32 = row.try_get("source_id")?; - let next_fetch: DateTime = row.try_get("next_fetch")?; - if next_fetch < now { - match sqlx::query("update rsstg_source set last_scrape = now() + interval '1 hour' where source_id = $1;") - .bind(source_id) - .execute(&mut conn).await { - Ok(_) => {}, - Err(err) => { - self.debug(&err.to_string())?; - }, - }; - let clone = self.clone(); - tokio::spawn(async move { - if let Err(err) = clone.check(&source_id.clone(), true).await { - eprintln!("connection error: {}", err); - } - }); - } else { - if next_fetch - now < delay { - delay = next_fetch - now; - } - } - }; - queue.clear(); - }, - }; + let mut conn = self.pool.acquire().await + .with_context(|| format!("πŸ›‘ Autofetch fetch conn:\n{:?}", &self.pool))?; + now = chrono::Local::now(); + let mut queue = sqlx::query("select source_id, next_fetch from rsstg_order natural left join rsstg_source natural left join rsstg_channel where next_fetch < now();") + .fetch_all(&mut conn).await?; + for row in queue.iter() { + let source_id: i32 = row.try_get("source_id")?; + let next_fetch: DateTime = row.try_get("next_fetch")?; + if next_fetch < now { + sqlx::query("update rsstg_source set last_scrape = now() + interval '1 hour' where source_id = $1;") + .bind(source_id) + .execute(&mut conn).await + .with_context(|| format!("πŸ›‘ Lock source:\n\n{:?}", &self.pool))?; + let clone = self.clone(); + tokio::spawn(async move { + if let Err(err) = clone.check(&source_id.clone(), true).await { + if let Err(err) = clone.debug(&err.to_string()) { + eprintln!("Check error: {}", err); + }; + }; + }); + } else { + if next_fetch - now < delay { + delay = next_fetch - now; + } + } + }; + queue.clear(); tokio::time::delay_for(delay.to_std()?).await; delay = chrono::Duration::minutes(5); } } @@ -279,15 +221,12 @@ let core = Core::new(settings).await?; let mut stream = core.stream(); while let Some(update) = stream.next().await { - match handle(update?, &core).await { - Ok(_) => {}, - Err(err) => { - core.debug(&err.to_string())?; - } + if let Err(err) = handle(update?, &core).await { + core.debug(&err.to_string())?; }; } Ok(()) } @@ -545,19 +484,16 @@ _ => { }, }; if reply.len() > 0 { - match core.tg.send(message.text_reply(reply.join("\n")).parse_mode(types::ParseMode::MarkdownV2)).await { - Ok(_) => {}, - Err(err) => { - dbg!(reply.join("\n")); - println!("{}", err); - }, - } - } + if let Err(err) = core.tg.send(message.text_reply(reply.join("\n")).parse_mode(types::ParseMode::MarkdownV2)).await { + dbg!(reply.join("\n")); + println!("{}", err); + }; + }; }, _ => {}, }; Ok(()) }