Overview
Comment: | rework time, rework autofetch, clippy lint |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA3-256: |
26339860ced242fe99222ccbe448ca42 |
User & Date: | arcade on 2022-02-13 19:57:55.915 |
Other Links: | manifest | tags |
Context
2022-02-15
| ||
14:56 | simplify a little check-in: 093ae6c75b user: arcade tags: trunk | |
2022-02-13
| ||
19:57 | rework time, rework autofetch, clippy lint check-in: 26339860ce user: arcade tags: trunk | |
12:26 | simplify, clippy lints check-in: f988dfd28f user: arcade tags: trunk | |
Changes
Modified src/command.rs
from [a4cde7014f]
to [aefdc11026].
1 2 3 4 5 6 7 8 9 10 11 12 | use anyhow::{bail, Context, Result}; use crate::core::Core; use regex::Regex; use sedregex::ReplaceCommand; lazy_static! { static ref RE_USERNAME: Regex = Regex::new(r"^@[a-zA-Z][a-zA-Z0-9_]+$").unwrap(); static ref RE_LINK: Regex = Regex::new(r"^https?://[a-zA-Z.0-9-]+/[-_a-zA-Z.0-9/?=]+$").unwrap(); static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap(); } pub async fn start(core: &Core, sender: telegram_bot::UserId) -> Result<()> { | > | | | > | | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 | use anyhow::{bail, Context, Result}; use crate::core::Core; use regex::Regex; use sedregex::ReplaceCommand; use std::borrow::Cow; lazy_static! { static ref RE_USERNAME: Regex = Regex::new(r"^@[a-zA-Z][a-zA-Z0-9_]+$").unwrap(); static ref RE_LINK: Regex = Regex::new(r"^https?://[a-zA-Z.0-9-]+/[-_a-zA-Z.0-9/?=]+$").unwrap(); static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap(); } pub async fn start(core: &Core, sender: telegram_bot::UserId) -> Result<()> { core.send("We are open. Probably. 
Visit [channel](https://t.me/rsstg_bot_help/3) for details.", Some(sender), None).await?; Ok(()) } pub async fn list(core: &Core, sender: telegram_bot::UserId) -> Result<()> { core.send(core.list(sender).await?, Some(sender), Some(telegram_bot::types::ParseMode::MarkdownV2)).await?; Ok(()) } pub async fn command(core: &Core, sender: telegram_bot::UserId, command: Vec<&str>) -> Result<()> { let msg: Cow<str> = match &command[1].parse::<i32>() { Err(err) => format!("I need a number.\n{}", &err).into(), Ok(number) => match command[0] { "/check" => core.check(number, sender, false).await .context("Channel check failed.")?, "/clean" => core.clean(number, sender).await?, "/enable" => core.enable(number, sender).await?.into(), "/delete" => core.delete(number, sender).await?, "/disable" => core.disable(number, sender).await?.into(), _ => bail!("Command {} not handled.", &command[0]), }, }; core.send(msg, Some(sender), None).await?; Ok(()) } pub async fn update(core: &Core, sender: telegram_bot::UserId, command: Vec<&str>) -> Result<()> { let mut source_id: Option<i32> = None; let at_least = "Requires at least 3 parameters."; let first_word = command[0]; |
︙ | ︙ | |||
97 98 99 100 101 102 103 | }; if admin.user.id == sender { user = true; }; }; if ! me { bail!("I need to be admin on that channel."); }; if ! user { bail!("You should be admin on that channel."); }; | | | 99 100 101 102 103 104 105 106 107 108 | }; if admin.user.id == sender { user = true; }; }; if ! me { bail!("I need to be admin on that channel."); }; if ! user { bail!("You should be admin on that channel."); }; core.send(core.update(source_id, channel, channel_id, url, iv_hash, url_re, sender).await?, Some(sender), None).await?; Ok(()) } |
Modified src/core.rs
from [3f61fa0940]
to [a6c540ecfc].
︙ | ︙ | |||
40 41 42 43 44 45 46 | .connect_timeout(std::time::Duration::new(300, 0)) .idle_timeout(std::time::Duration::new(60, 0)) .connect_lazy(&settings.get_str("pg")?)?, sources: Arc::new(Mutex::new(HashSet::new())), }); let clone = core.clone(); tokio::spawn(async move { | > | > | | | > > > > > | < < | < < < | | | 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 | .connect_timeout(std::time::Duration::new(300, 0)) .idle_timeout(std::time::Duration::new(60, 0)) .connect_lazy(&settings.get_str("pg")?)?, sources: Arc::new(Mutex::new(HashSet::new())), }); let clone = core.clone(); tokio::spawn(async move { loop { let delay = match &clone.autofetch().await { Err(err) => { if let Err(err) = clone.send(format!("🛑 {:?}", err), None, None).await { eprintln!("Autofetch error: {}", err); }; tokio::time::Duration::from_secs(60) }, Ok(time) => *time, }; tokio::time::sleep(delay).await; } }); Ok(core) } pub fn stream(&self) -> telegram_bot::UpdatesStream { self.tg.stream() } pub async fn send<'a, S>(&self, msg: S, target: Option<telegram_bot::UserId>, mode: Option<telegram_bot::types::ParseMode>) -> Result<()> where S: Into<Cow<'a, str>> { let msg = msg.into(); let mode = mode.unwrap_or(telegram_bot::types::ParseMode::Html); let target = target.unwrap_or(self.owner_chat); self.tg.send(telegram_bot::SendMessage::new(target, msg).parse_mode(mode)).await?; Ok(()) } pub async fn check<S>(&self, id: &i32, owner: S, real: bool) -> Result<Cow<'_, str>> where S: Into<i64> { let owner = owner.into(); |
︙ | ︙ | |||
119 120 121 122 123 124 125 | for item in feed.items() { if let Some(link) = item.link() { let date = match item.pub_date() { Some(feed_date) => DateTime::parse_from_rfc2822(feed_date), None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]), }?; let url = link; | | | | 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 | for item in feed.items() { if let Some(link) = item.link() { let date = match item.pub_date() { Some(feed_date) => DateTime::parse_from_rfc2822(feed_date), None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]), }?; let url = link; posts.insert(date, url.to_string()); } }; }, Err(err) => match err { rss::Error::InvalidStartTag => { let feed = atom_syndication::Feed::read_from(&content[..]) .with_context(|| format!("Problem opening feed url:\n{}\n{}", &url, status))?; for item in feed.entries() { let date = item.published().unwrap(); let url = item.links()[0].href(); posts.insert(*date, url.to_string()); }; }, rss::Error::Eof => (), _ => bail!("Unsupported or mangled content:\n{:?}\n{:#?}\n{:#?}\n", &url, err, status) } }; for (date, url) in posts.iter() { |
︙ | ︙ | |||
251 252 253 254 255 256 257 | .rows_affected() { 1 => { Ok("Source disabled.") }, 0 => { Ok("Source not found.") }, _ => { Err(anyhow!("Database error.")) }, } } | | | | | 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 | .rows_affected() { 1 => { Ok("Source disabled.") }, 0 => { Ok("Source not found.") }, _ => { Err(anyhow!("Database error.")) }, } } pub async fn update<S>(&self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: S) -> Result<&str> where S: Into<i64> { let owner = owner.into(); let mut conn = self.pool.acquire().await .with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?; match match update { Some(id) => { sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1").bind(id) |
︙ | ︙ | |||
296 297 298 299 300 301 302 | }, Err(err) => { bail!("Sorry, unknown error:\n{:#?}\n", err); }, } } | | < < | | | | | | | | | | | | | | | | | | | | | | | | | | | < < | 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 | }, Err(err) => { bail!("Sorry, unknown error:\n{:#?}\n", err); }, } } async fn autofetch(&self) -> Result<std::time::Duration> { let mut delay = chrono::Duration::minutes(1); let mut conn = self.pool.acquire().await .with_context(|| format!("Autofetch fetch conn:\n{:?}", &self.pool))?; let now = chrono::Local::now(); let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';") .fetch_all(&mut conn).await?; for row in queue.iter() { let source_id: i32 = row.try_get("source_id")?; let owner: i64 = row.try_get("owner")?; let next_fetch: DateTime<chrono::Local> = row.try_get("next_fetch")?; if next_fetch < now { let clone = Core { owner_chat: telegram_bot::UserId::new(owner), ..self.clone() }; tokio::spawn(async move { if let Err(err) = clone.check(&source_id, owner, true).await { if let Err(err) = clone.send(&format!("🛑 {:?}", err), None, None).await { eprintln!("Check error: {}", err); }; }; }); } else if next_fetch - now < delay { delay = next_fetch - now; } }; queue.clear(); Ok(delay.to_std()?) } pub async fn list<S>(&self, owner: S) -> Result<String> where S: Into<i64> { let owner = owner.into(); let mut reply: Vec<Cow<str>> = vec![]; |
︙ | ︙ |
Modified src/main.rs
from [9c3b6abd62]
to [ff220cfc17].
︙ | ︙ | |||
20 21 22 23 24 25 26 | let mut reply_to: Option<telegram_bot::UserId>; loop { reply_to = None; match stream.next().await { Some(update) => { if let Err(err) = handle(update?, &core, &reply_to).await { | | | | | 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 | let mut reply_to: Option<telegram_bot::UserId>; loop { reply_to = None; match stream.next().await { Some(update) => { if let Err(err) = handle(update?, &core, &reply_to).await { core.send(&format!("🛑 {:?}", err), reply_to, None).await?; }; }, None => { core.send("🛑 None error.", None, None).await?; } }; } } async fn handle(update: telegram_bot::Update, core: &core::Core, mut _reply_to: &Option<telegram_bot::UserId>) -> Result<()> { if let telegram_bot::UpdateKind::Message(message) = update.kind { if let telegram_bot::MessageKind::Text { ref data, .. } = message.kind { let sender = message.from.id; let words: Vec<&str> = data.split_whitespace().collect(); match match words[0] { "/check" | "/clean" | "/enable" | "/delete" | "/disable" => command::command(core, sender, words).await, "/start" => command::start(core, sender).await, "/list" => command::list(core, sender).await, "/add" | "/update" => command::update(core, sender, words).await, _ => Ok(()), } { Err(err) => core.send(format!("🛑 {:?}", err), Some(sender), None).await?, Ok(()) => {}, }; }; }; Ok(()) } |