Index: src/command.rs ================================================================== --- src/command.rs +++ src/command.rs @@ -1,28 +1,29 @@ use anyhow::{bail, Context, Result}; use crate::core::Core; use regex::Regex; use sedregex::ReplaceCommand; +use std::borrow::Cow; lazy_static! { static ref RE_USERNAME: Regex = Regex::new(r"^@[a-zA-Z][a-zA-Z0-9_]+$").unwrap(); static ref RE_LINK: Regex = Regex::new(r"^https?://[a-zA-Z.0-9-]+/[-_a-zA-Z.0-9/?=]+$").unwrap(); static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap(); } pub async fn start(core: &Core, sender: telegram_bot::UserId) -> Result<()> { - core.send("We are open. Probably. Visit [channel](https://t.me/rsstg_bot_help/3) for details.", Some(sender), None)?; + core.send("We are open. Probably. Visit [channel](https://t.me/rsstg_bot_help/3) for details.", Some(sender), None).await?; Ok(()) } pub async fn list(core: &Core, sender: telegram_bot::UserId) -> Result<()> { - core.send(core.list(sender).await?, Some(sender), Some(telegram_bot::types::ParseMode::MarkdownV2))?; + core.send(core.list(sender).await?, Some(sender), Some(telegram_bot::types::ParseMode::MarkdownV2)).await?; Ok(()) } pub async fn command(core: &Core, sender: telegram_bot::UserId, command: Vec<&str>) -> Result<()> { - core.send( match &command[1].parse::() { + let msg: Cow = match &command[1].parse::() { Err(err) => format!("I need a number.\n{}", &err).into(), Ok(number) => match command[0] { "/check" => core.check(number, sender, false).await .context("Channel check failed.")?, "/clean" => core.clean(number, sender).await?, @@ -29,11 +30,12 @@ "/enable" => core.enable(number, sender).await?.into(), "/delete" => core.delete(number, sender).await?, "/disable" => core.disable(number, sender).await?.into(), _ => bail!("Command {} not handled.", &command[0]), }, - }, Some(sender), None)?; + }; + core.send(msg, Some(sender), None).await?; Ok(()) } pub async fn update(core: &Core, sender: telegram_bot::UserId, command: Vec<&str>) -> Result<()> { let mut source_id: Option = None; @@ -99,8 +101,8 @@ user = true; }; }; if ! me { bail!("I need to be admin on that channel."); }; if ! user { bail!("You should be admin on that channel."); }; - core.send(core.update(source_id, channel, channel_id, url, iv_hash, url_re, sender.into()).await?, Some(sender), None)?; + core.send(core.update(source_id, channel, channel_id, url, iv_hash, url_re, sender).await?, Some(sender), None).await?; Ok(()) } Index: src/core.rs ================================================================== --- src/core.rs +++ src/core.rs @@ -42,35 +42,37 @@ .connect_lazy(&settings.get_str("pg")?)?, sources: Arc::new(Mutex::new(HashSet::new())), }); let clone = core.clone(); tokio::spawn(async move { - if let Err(err) = &clone.autofetch().await { - if let Err(err) = clone.send(&format!("🛑 {:?}", err), None, None) { - eprintln!("Autofetch error: {}", err); + loop { + let delay = match &clone.autofetch().await { + Err(err) => { + if let Err(err) = clone.send(format!("🛑 {:?}", err), None, None).await { + eprintln!("Autofetch error: {}", err); + }; + tokio::time::Duration::from_secs(60) + }, + Ok(time) => *time, }; + tokio::time::sleep(delay).await; } }); Ok(core) } pub fn stream(&self) -> telegram_bot::UpdatesStream { self.tg.stream() } - pub fn send<'a, S>(&self, msg: S, target: Option, parse_mode: Option) -> Result<()> + pub async fn send<'a, S>(&self, msg: S, target: Option, mode: Option) -> Result<()> where S: Into> { let msg = msg.into(); - let parse_mode = match parse_mode { - Some(mode) => mode, - None => telegram_bot::types::ParseMode::Html, - }; - self.tg.spawn(telegram_bot::SendMessage::new(match target { - Some(user) => user, - None => self.owner_chat, - }, msg).parse_mode(parse_mode)); + let mode = mode.unwrap_or(telegram_bot::types::ParseMode::Html); + let target = target.unwrap_or(self.owner_chat); + self.tg.send(telegram_bot::SendMessage::new(target, msg).parse_mode(mode)).await?; Ok(()) } pub async fn check(&self, id: &i32, owner: S, real: bool) -> Result> where S: Into { @@ -121,11 +123,11 @@ let date = match item.pub_date() { Some(feed_date) => DateTime::parse_from_rfc2822(feed_date), None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]), }?; let url = link; - posts.insert(date, url.into()); + posts.insert(date, url.to_string()); } }; }, Err(err) => match err { rss::Error::InvalidStartTag => { @@ -132,11 +134,11 @@ let feed = atom_syndication::Feed::read_from(&content[..]) .with_context(|| format!("Problem opening feed url:\n{}\n{}", &url, status))?; for item in feed.entries() { let date = item.published().unwrap(); let url = item.links()[0].href(); - posts.insert(*date, url.into()); + posts.insert(*date, url.to_string()); }; }, rss::Error::Eof => (), _ => bail!("Unsupported or mangled content:\n{:?}\n{:#?}\n{:#?}\n", &url, err, status) } @@ -253,13 +255,13 @@ 0 => { Ok("Source not found.") }, _ => { Err(anyhow!("Database error.")) }, } } - pub async fn update(&self, update: Option, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: i64) -> Result<&str> { - //where S: Into { - //let owner = owner.into(); + pub async fn update(&self, update: Option, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: S) -> Result<&str> + where S: Into { + let owner = owner.into(); let mut conn = self.pool.acquire().await .with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?; match match update { @@ -298,43 +300,39 @@ bail!("Sorry, unknown error:\n{:#?}\n", err); }, } } - async fn autofetch(&self) -> Result<()> { - let mut delay = chrono::Duration::minutes(1); - let mut now; - loop { - let mut conn = self.pool.acquire().await - .with_context(|| format!("Autofetch fetch conn:\n{:?}", &self.pool))?; - now = chrono::Local::now(); - let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';") - .fetch_all(&mut conn).await?; - for row in queue.iter() { - let source_id: i32 = row.try_get("source_id")?; - let owner: i64 = row.try_get("owner")?; - let next_fetch: DateTime = row.try_get("next_fetch")?; - if next_fetch < now { - let clone = Core { - owner_chat: telegram_bot::UserId::new(owner), - ..self.clone() - }; - tokio::spawn(async move { - if let Err(err) = clone.check(&source_id, owner, true).await { - if let Err(err) = clone.send(&format!("🛑 {:?}", err), None, None) { - eprintln!("Check error: {}", err); - }; - }; - }); - } else if next_fetch - now < delay { - delay = next_fetch - now; - } - }; - queue.clear(); - tokio::time::sleep(delay.to_std()?).await; - delay = chrono::Duration::minutes(1); - } + async fn autofetch(&self) -> Result { + let mut delay = chrono::Duration::minutes(1); + let mut conn = self.pool.acquire().await + .with_context(|| format!("Autofetch fetch conn:\n{:?}", &self.pool))?; + let now = chrono::Local::now(); + let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';") + .fetch_all(&mut conn).await?; + for row in queue.iter() { + let source_id: i32 = row.try_get("source_id")?; + let owner: i64 = row.try_get("owner")?; + let next_fetch: DateTime = row.try_get("next_fetch")?; + if next_fetch < now { + let clone = Core { + owner_chat: telegram_bot::UserId::new(owner), + ..self.clone() + }; + tokio::spawn(async move { + if let Err(err) = clone.check(&source_id, owner, true).await { + if let Err(err) = clone.send(&format!("🛑 {:?}", err), None, None).await { + eprintln!("Check error: {}", err); + }; + }; + }); + } else if next_fetch - now < delay { + delay = next_fetch - now; + } + }; + queue.clear(); + Ok(delay.to_std()?) } pub async fn list(&self, owner: S) -> Result where S: Into { let owner = owner.into(); Index: src/main.rs ================================================================== --- src/main.rs +++ src/main.rs @@ -22,15 +22,15 @@ loop { reply_to = None; match stream.next().await { Some(update) => { if let Err(err) = handle(update?, &core, &reply_to).await { - core.send(&format!("🛑 {:?}", err), reply_to, None)?; + core.send(&format!("🛑 {:?}", err), reply_to, None).await?; }; }, None => { - core.send("🛑 None error.".to_string(), None, None)?; + core.send("🛑 None error.", None, None).await?; } }; } } @@ -44,13 +44,13 @@ "/start" => command::start(core, sender).await, "/list" => command::list(core, sender).await, "/add" | "/update" => command::update(core, sender, words).await, _ => Ok(()), } { - Err(err) => core.send(&format!("🛑 {:?}", err), Some(sender), None)?, + Err(err) => core.send(format!("🛑 {:?}", err), Some(sender), None).await?, Ok(()) => {}, }; }; }; Ok(()) }