Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,26 +1,24 @@ [package] name = "rsstg" -version = "0.1.6" +version = "0.1.7" authors = ["arcade"] edition = "2018" [dependencies] +chrono = "*" config = "*" - futures = "*" +futures-util = "*" +regex = "*" +rss = { version = "*", features = [ "from_url" ] } +sqlx = { version = "*", features = [ "postgres", "tls", "runtime-async-std-native-tls", "chrono" ] } telegram-bot = "*" tokio = { version = "0.2", features = ["macros" ] } -futures-util = "*" - -sqlx = { version = "*", features = [ "postgres", "tls", "runtime-async-std-native-tls", "chrono" ] } - -regex = "*" - -rss = { version = "*", features = [ "from_url" ] } - -chrono = "*" +lazy_static = "*" + +anyhow = "*" [profile.release] lto = true codegen-units = 1 Index: src/main.rs ================================================================== --- src/main.rs +++ src/main.rs @@ -11,12 +11,18 @@ use tokio::stream::StreamExt; use telegram_bot::*; use sqlx::postgres::PgPoolOptions; use sqlx::Row; +use sqlx::Done; + +#[macro_use] +extern crate lazy_static; + +use anyhow::{anyhow, Context, Result}; -type Result = std::result::Result>; +//type Result = std::result::Result>; #[derive(Clone)] struct Core { owner: i64, api_key: String, @@ -187,46 +193,38 @@ }, }; Ok(()) } - async fn enable(&self, source_id: &i32) -> Result<()> { - match self.pool.acquire().await { - Err(err) => { - self.debug(&format!("πŸ›‘ Enable fetch conn:\n{}\n{:?}", &err, &self.pool))?; - }, - Ok(mut conn) => { - match sqlx::query("update rsstg_source set enabled = true where source_id = $1") - .bind(source_id) - .execute(&mut conn).await { - Err(err) => { - self.debug(&format!("πŸ›‘ Enable source:\n{}\n{:?}", &err, &self.pool))?; - }, - Ok(_) => {}, - } - }, - }; - Ok(()) - } - - async fn disable(&self, source_id: &i32) -> Result<()> { - match self.pool.acquire().await { - Err(err) => { - self.debug(&format!("πŸ›‘ Disable fetch conn:\n{}\n{:?}", &err, &self.pool))?; - }, - Ok(mut conn) => { - match sqlx::query("update rsstg_source set enabled = false where source_id = $1") - .bind(source_id) - .execute(&mut conn).await { - Err(err) => { - self.debug(&format!("πŸ›‘ Disable source:\n{}\n{:?}", &err, &self.pool))?; - }, - Ok(_) => {}, - } - }, - }; - Ok(()) + async fn enable(&self, source_id: &i32, id: telegram_bot::UserId) -> Result<&str> { + let mut conn = self.pool.acquire().await + .with_context(|| format!("πŸ›‘ Enable fetch conn:\n{:?}", &self.pool))?; + match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2") + .bind(source_id) + .bind(i64::from(id)) + .execute(&mut conn).await + .with_context(|| format!("πŸ›‘ Enable source:\n\n{:?}", &self.pool))? + .rows_affected() { + 1 => { Ok("Source disabled\\.") }, + 0 => { Ok("Source not found\\.") }, + _ => { Err(anyhow!("Database error.")) }, + } + } + + async fn disable(&self, source_id: &i32, id: telegram_bot::UserId) -> Result<&str> { + let mut conn = self.pool.acquire().await + .with_context(|| format!("πŸ›‘ Disable fetch conn:\n{:?}", &self.pool))?; + match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2") + .bind(source_id) + .bind(i64::from(id)) + .execute(&mut conn).await + .with_context(|| format!("πŸ›‘ Disable source:\n\n{:?}", &self.pool))? + .rows_affected() { + 1 => { Ok("Source disabled\\.") }, + 0 => { Ok("Source not found\\.") }, + _ => { Err(anyhow!("Database error.")) }, + } } async fn autofetch(&self) -> Result<()> { let mut delay = chrono::Duration::minutes(5); let mut now; @@ -276,292 +274,290 @@ #[tokio::main] async fn main() -> Result<()> { let mut settings = config::Config::default(); settings.merge(config::File::with_name("rsstg"))?; - let re_username = Regex::new(r"^@[a-zA-Z][a-zA-Z0-9_]+$")?; - let re_link = Regex::new(r"^https?://[a-zA-Z.0-9-]+/[-_a-zA-Z.0-9/?=]+$")?; - let re_iv_hash = Regex::new(r"^[a-f0-9]{14}$")?; - let core = Core::new(settings).await?; let mut stream = core.stream(); while let Some(update) = stream.next().await { - match update { - Err(err) => { - core.debug(&err.to_string())?; - }, - Ok(update) => { - match update.kind { - UpdateKind::Message(message) => { - let mut reply: Vec = vec![]; - match message.kind { - MessageKind::Text { ref data, .. } => { - let mut words = data.split_whitespace(); - let cmd = words.next().unwrap(); - match cmd { - - // start - - "/start" => { - reply.push("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.".to_string()); - }, - - // list - - "/list" => { - match core.pool.acquire().await { - Err(err) => { - core.debug(&format!("πŸ›‘ Disable fetch conn:\n{}\n{:?}", &err, &core.pool))?; - }, - Ok(mut conn) => { - reply.push("Channels:".to_string()); - let rows = sqlx::query("select source_id, username, enabled, url, iv_hash from rsstg_source left join rsstg_channel using (channel_id) where owner = $1 order by source_id") - .bind(i64::from(message.from.id)) - .fetch_all(&mut conn).await?; - for row in rows.iter() { - //while let Some(row) = rows.try_next().await? { - let source_id: i32 = row.try_get("source_id")?; - let username: &str = row.try_get("username")?; - let enabled: bool = row.try_get("enabled")?; - let url: &str = row.try_get("url")?; - let iv_hash: Option<&str> = row.try_get("iv_hash")?; - reply.push(format!("\n\\#️⃣ {} \\*️⃣ `{}` {}\nπŸ”— `{}`", source_id, username, - match enabled { - true => "πŸ”„ enabled", - false => "β›” disabled", - }, url)); - if let Some(hash) = iv_hash { - reply.push(format!("IV `{}`", hash)); - } - } - }, - }; - }, - - // add - - "/add" | "/update" => { - let mut source_id: i32 = 0; - if cmd == "/update" { - source_id = words.next().unwrap().parse::()?; - } - let (channel, url, iv_hash) = (words.next().unwrap(), words.next().unwrap(), words.next()); - let ok_link = re_link.is_match(&url); - let ok_hash = match iv_hash { - Some(hash) => re_iv_hash.is_match(&hash), - None => true, - }; - if ! ok_link { - reply.push("Link should be link to atom/rss feed, something like \"https://domain/path\"\\.".to_string()); - core.debug(&format!("Url: {:?}", &url))?; - } - if ! ok_hash { - reply.push("IV hash should be 14 hex digits.".to_string()); - core.debug(&format!("IV: {:?}", &iv_hash))?; - } - if ok_link && ok_hash { - let chan: Option = match sqlx::query("select channel_id from rsstg_channel where username = $1") - .bind(channel) - .fetch_one(&core.pool).await { - Ok(chan) => Some(chan.try_get("channel_id")?), - Err(sqlx::Error::RowNotFound) => { - reply.push("Sorry, I don't know about that channel. Please, add a channel with /addchan.".to_string()); - None - }, - Err(err) => { - reply.push("Sorry, unknown error\\.".to_string()); - core.debug(&format!("Sorry, unknown error:\n{:#?}\n", err))?; - None - }, - }; - if let Some(chan) = chan { - match if cmd == "/update" { - sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $4 where source_id = $1").bind(source_id) - } else { - sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner) values ($1, $2, $3, $4)") - } - .bind(chan) - .bind(url) - .bind(iv_hash) - .bind(i64::from(message.from.id)) - .execute(&core.pool).await { - Ok(_) => reply.push("Channel added\\.".to_string()), - Err(sqlx::Error::Database(err)) => { - match err.downcast::().routine() { - Some("_bt_check_unique", ) => { - reply.push("Duplicate key\\.".to_string()); - }, - Some(_) => { - reply.push("Database error\\.".to_string()); - }, - None => { - reply.push("No database error extracted\\.".to_string()); - }, - }; - }, - Err(err) => { - reply.push("Sorry, unknown error\\.".to_string()); - core.debug(&format!("Sorry, unknown error:\n{:#?}\n", err))?; - }, - }; - }; - }; - }, - - // addchan - - "/addchan" => { - let channel = words.next().unwrap(); - if ! re_username.is_match(&channel) { - reply.push("Usernames should be something like \"@\\[a\\-zA\\-Z]\\[a\\-zA\\-Z0\\-9\\_]+\", aren't they?".to_string()); - } else { - let chan: Option = match sqlx::query("select channel_id from rsstg_channel where username = $1") - .bind(channel) - .fetch_one(&core.pool).await { - Ok(chan) => Some(chan.try_get("channel_id")?), - Err(sqlx::Error::RowNotFound) => None, - Err(err) => { - reply.push("Sorry, unknown error\\.".to_string()); - core.debug(&format!("Sorry, unknown error:\n{:#?}\n", err))?; - None - }, - }; - match chan { - Some(chan) => { - let new_chan = core.tg.send(telegram_bot::GetChat::new(telegram_bot::types::ChatId::new(chan))).await?; - if i64::from(new_chan.id()) == chan { - reply.push("I already know that channel\\.".to_string()); - } else { - reply.push("Hmm, channel has changed… I'll fix it later\\.".to_string()); - }; - }, - None => { - match core.tg.send(telegram_bot::GetChatAdministrators::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await { - Ok(chan_adm) => { - let (mut me, mut user) = (false, false); - for admin in &chan_adm { - if admin.user.id == core.my.id { - me = true; - }; - if admin.user.id == message.from.id { - user = true; - }; - }; - if ! me { reply.push("I need to be admin on that channel\\.".to_string()); }; - if ! user { reply.push("You should be admin on that channel\\.".to_string()); }; - if me && user { - let chan_id = core.tg.send(telegram_bot::GetChat::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await?; - sqlx::query("insert into rsstg_channel (channel_id, username) values ($1, $2);") - .bind(i64::from(chan_id.id())) - .bind(channel) - .execute(&core.pool).await?; - reply.push("Good, I know that channel now\\.\n".to_string()); - }; - }, - Err(_) => { - reply.push("Sorry, I have no access to that chat\\.".to_string()); - }, - }; - }, - }; - }; - }, - - // check - - "/check" => { - match &words.next().unwrap().parse::() { - Err(err) => { - reply.push(format!("I need a number\\.\n{}", &err)); - }, - Ok(number) => { - match &core.check(number, false).await { - Ok(_) => { - reply.push("Channel enabled\\.".to_string()); - } - Err(err) => { - core.debug(&format!("πŸ›‘ Channel check failed:\n{}", &err))?; - }, - }; - }, - }; - }, - - // clean - - "/clean" => { - if core.owner != i64::from(message.from.id) { - reply.push("Reserved for testing\\.".to_string()); - } else { - let source_id = words.next().unwrap().parse::().unwrap_or(0); - &core.clean(source_id).await?; - } - }, - - // enable - - "/enable" => { - match &words.next().unwrap().parse::() { - Err(err) => { - reply.push(format!("I need a number\\.\n{}", &err)); - }, - Ok(number) => { - match core.enable(&number).await { - Ok(_) => { - reply.push("Channel enabled\\.".to_string()); - } - Err(err) => { - core.debug(&err.to_string())?; - }, - }; - }, - }; - }, - - // disable - - "/disable" => { - match &words.next().unwrap().parse::() { - Err(err) => { - reply.push(format!("I need a number\\.\n{}", &err)); - }, - Ok(number) => { - match core.disable(&number).await { - Ok(_) => { - reply.push("Channel disabled\\.".to_string()); - } - Err(err) => { - core.debug(&err.to_string())?; - }, - }; - }, - }; - }, - - _ => { - }, - }; - }, - _ => { - }, - }; - if reply.len() > 0 { - match core.tg.send(message.text_reply(reply.join("\n")).parse_mode(types::ParseMode::MarkdownV2)).await { - Ok(_) => {}, - Err(err) => { - dbg!(reply.join("\n")); - println!("{}", err); - }, - } - } - }, - _ => {}, - }; - }, - - }; - } + match handle(update?, &core).await { + Ok(_) => {}, + Err(err) => { + core.debug(&err.to_string())?; + } + }; + } + + Ok(()) +} + +async fn handle(update: telegram_bot::Update, core: &Core) -> Result<()> { + lazy_static! { + static ref RE_USERNAME: Regex = Regex::new(r"^@[a-zA-Z][a-zA-Z0-9_]+$").unwrap(); + static ref RE_LINK: Regex = Regex::new(r"^https?://[a-zA-Z.0-9-]+/[-_a-zA-Z.0-9/?=]+$").unwrap(); + static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap(); + } + + match update.kind { + UpdateKind::Message(message) => { + let mut reply: Vec = vec![]; + match message.kind { + MessageKind::Text { ref data, .. } => { + let mut words = data.split_whitespace(); + let cmd = words.next().unwrap(); + match cmd { + +// start + + "/start" => { + reply.push("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.".to_string()); + }, + +// list + + "/list" => { + match core.pool.acquire().await { + Err(err) => { + core.debug(&format!("πŸ›‘ Disable fetch conn:\n{}\n{:?}", &err, &core.pool))?; + }, + Ok(mut conn) => { + reply.push("Channels:".to_string()); + let rows = sqlx::query("select source_id, username, enabled, url, iv_hash from rsstg_source left join rsstg_channel using (channel_id) where owner = $1 order by source_id") + .bind(i64::from(message.from.id)) + .fetch_all(&mut conn).await?; + for row in rows.iter() { + //while let Some(row) = rows.try_next().await? { + let source_id: i32 = row.try_get("source_id")?; + let username: &str = row.try_get("username")?; + let enabled: bool = row.try_get("enabled")?; + let url: &str = row.try_get("url")?; + let iv_hash: Option<&str> = row.try_get("iv_hash")?; + reply.push(format!("\n\\#️⃣ {} \\*️⃣ `{}` {}\nπŸ”— `{}`", source_id, username, + match enabled { + true => "πŸ”„ enabled", + false => "β›” disabled", + }, url)); + if let Some(hash) = iv_hash { + reply.push(format!("IV `{}`", hash)); + } + } + }, + }; + }, + +// add + + "/add" | "/update" => { + let mut source_id: i32 = 0; + if cmd == "/update" { + source_id = words.next().unwrap().parse::()?; + } + let (channel, url, iv_hash) = (words.next().unwrap(), words.next().unwrap(), words.next()); + let ok_link = RE_LINK.is_match(&url); + let ok_hash = match iv_hash { + Some(hash) => RE_IV_HASH.is_match(&hash), + None => true, + }; + if ! ok_link { + reply.push("Link should be link to atom/rss feed, something like \"https://domain/path\"\\.".to_string()); + core.debug(&format!("Url: {:?}", &url))?; + } + if ! ok_hash { + reply.push("IV hash should be 14 hex digits.".to_string()); + core.debug(&format!("IV: {:?}", &iv_hash))?; + } + if ok_link && ok_hash { + let chan: Option = match sqlx::query("select channel_id from rsstg_channel where username = $1") + .bind(channel) + .fetch_one(&core.pool).await { + Ok(chan) => Some(chan.try_get("channel_id")?), + Err(sqlx::Error::RowNotFound) => { + let chan_id = i64::from(core.tg.send(telegram_bot::GetChat::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await?.id()); + sqlx::query("insert into rsstg_channel (channel_id, username) values ($1, $2);") + .bind(chan_id) + .bind(channel) + .execute(&core.pool).await?; + Some(chan_id) + }, + Err(err) => { + reply.push("Sorry, unknown error\\.".to_string()); + core.debug(&format!("Sorry, unknown error:\n{:#?}\n", err))?; + None + }, + }; + if let Some(chan) = chan { + match if cmd == "/update" { + sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $4 where source_id = $1").bind(source_id) + } else { + sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner) values ($1, $2, $3, $4)") + } + .bind(chan) + .bind(url) + .bind(iv_hash) + .bind(i64::from(message.from.id)) + .execute(&core.pool).await { + Ok(_) => reply.push("Channel added\\.".to_string()), + Err(sqlx::Error::Database(err)) => { + match err.downcast::().routine() { + Some("_bt_check_unique", ) => { + reply.push("Duplicate key\\.".to_string()); + }, + Some(_) => { + reply.push("Database error\\.".to_string()); + }, + None => { + reply.push("No database error extracted\\.".to_string()); + }, + }; + }, + Err(err) => { + reply.push("Sorry, unknown error\\.".to_string()); + core.debug(&format!("Sorry, unknown error:\n{:#?}\n", err))?; + }, + }; + }; + }; + }, + +// addchan + + "/addchan" => { + let channel = words.next().unwrap(); + if ! RE_USERNAME.is_match(&channel) { + reply.push("Usernames should be something like \"@\\[a\\-zA\\-Z]\\[a\\-zA\\-Z0\\-9\\_]+\", aren't they?".to_string()); + } else { + let chan: Option = match sqlx::query("select channel_id from rsstg_channel where username = $1") + .bind(channel) + .fetch_one(&core.pool).await { + Ok(chan) => Some(chan.try_get("channel_id")?), + Err(sqlx::Error::RowNotFound) => None, + Err(err) => { + reply.push("Sorry, unknown error\\.".to_string()); + core.debug(&format!("Sorry, unknown error:\n{:#?}", err))?; + None + }, + }; + match chan { + Some(chan) => { + let new_chan = core.tg.send(telegram_bot::GetChat::new(telegram_bot::types::ChatId::new(chan))).await?; + if i64::from(new_chan.id()) == chan { + reply.push("I already know that channel\\.".to_string()); + } else { + reply.push("Hmm, channel has changed… I'll fix it later\\.".to_string()); + }; + }, + None => { + match core.tg.send(telegram_bot::GetChatAdministrators::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await { + Ok(chan_adm) => { + let (mut me, mut user) = (false, false); + for admin in &chan_adm { + if admin.user.id == core.my.id { + me = true; + }; + if admin.user.id == message.from.id { + user = true; + }; + }; + if ! me { reply.push("I need to be admin on that channel\\.".to_string()); }; + if ! user { reply.push("You should be admin on that channel\\.".to_string()); }; + if me && user { + let chan_id = core.tg.send(telegram_bot::GetChat::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await?; + sqlx::query("insert into rsstg_channel (channel_id, username) values ($1, $2);") + .bind(i64::from(chan_id.id())) + .bind(channel) + .execute(&core.pool).await?; + reply.push("Good, I know that channel now\\.\n".to_string()); + }; + }, + Err(_) => { + reply.push("Sorry, I have no access to that chat\\.".to_string()); + }, + }; + }, + }; + }; + }, + +// check + + "/check" => { + match &words.next().unwrap().parse::() { + Err(err) => { + reply.push(format!("I need a number\\.\n{}", &err)); + }, + Ok(number) => { + match &core.check(number, false).await { + Ok(_) => { + reply.push("Channel enabled\\.".to_string()); + } + Err(err) => { + core.debug(&format!("πŸ›‘ Channel check failed:\n{}", &err))?; + }, + }; + }, + }; + }, + +// clean + + "/clean" => { + if core.owner != i64::from(message.from.id) { + reply.push("Reserved for testing\\.".to_string()); + } else { + let source_id = words.next().unwrap().parse::().unwrap_or(0); + &core.clean(source_id).await?; + } + }, + +// enable + + "/enable" => { + match &words.next().unwrap().parse::() { + Err(err) => { + reply.push(format!("I need a number\\.\n{}", &err)); + }, + Ok(number) => { + let result = core.enable(&number, message.from.id).await?; + reply.push(result.to_string()); + }, + }; + }, + +// disable + + "/disable" => { + match &words.next().unwrap().parse::() { + Err(err) => { + reply.push(format!("I need a number\\.\n{}", &err)); + }, + Ok(number) => { + let result = core.disable(&number, message.from.id).await?; + reply.push(result.to_string()); + }, + }; + }, + + _ => { + }, + }; + }, + _ => { + }, + }; + + if reply.len() > 0 { + match core.tg.send(message.text_reply(reply.join("\n")).parse_mode(types::ParseMode::MarkdownV2)).await { + Ok(_) => {}, + Err(err) => { + dbg!(reply.join("\n")); + println!("{}", err); + }, + } + } + }, + _ => {}, + }; Ok(()) }