Index: Cargo.lock ================================================================== --- Cargo.lock +++ Cargo.lock @@ -208,13 +208,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" [[package]] name = "cc" -version = "1.0.71" +version = "1.0.72" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd" +checksum = "22a9137b95ea06864e018375b72adfb7db6e6f68cfc8df5a04d00288050485ee" [[package]] name = "cfg-if" version = "0.1.10" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1208,13 +1208,13 @@ "static_assertions", ] [[package]] name = "libc" -version = "0.2.106" +version = "0.2.107" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a60553f9a9e039a333b4e9b20573b9e9b9c0bb3a11e201ccc48ef4283456d673" +checksum = "fbe5e23404da5b4f555ef85ebed98fb4083e55a00c317800bc2a50ede9f3d219" [[package]] name = "linked-hash-map" version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2046,11 +2046,11 @@ "reqwest 0.9.24", ] [[package]] name = "rsstg" -version = "0.1.19" +version = "0.2.0" dependencies = [ "anyhow", "atom_syndication", "chrono", "config", @@ -2058,10 +2058,11 @@ "futures-util", "lazy_static", "regex", "reqwest 0.11.6", "rss", + "sedregex", "sqlx", "telegram-bot", "tokio 1.13.0", ] @@ -2128,10 +2129,19 @@ checksum = "a9dd14d83160b528b7bfd66439110573efcfbe281b17fc2ca9f39f550d619c7e" dependencies = [ "core-foundation-sys", "libc", ] + +[[package]] +name = "sedregex" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19411e23596093f03bbd11dc45603b6329bb4bfec77b9fd13e2b9fc9b02efe3e" +dependencies = [ + "regex", +] [[package]] name = "semver" version = "0.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2194,13 +2204,13 @@ "syn", ] [[package]] name = "serde_json" -version = "1.0.69" +version = "1.0.70" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e466864e431129c7e0d3476b92f20458e5879919a0596c6472738d9fa2d342f8" +checksum = "e277c495ac6cd1a01a58d0a0c574568b4d1ddf14f59965c6a58b8d96400b54f3" dependencies = [ "itoa", "ryu", "serde 1.0.130", ] @@ -2526,13 +2536,13 @@ "winapi 0.3.9", ] [[package]] name = "tinyvec" -version = "1.5.0" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f83b2a3d4d9091d0abd7eba4dc2710b1718583bd4d8992e2190720ea38f391f7" +checksum = "2c1c1d5a42b6245520c249549ec267180beaffcc0615401ac8e31853d4b6d8d2" dependencies = [ "tinyvec_macros", ] [[package]] @@ -3010,13 +3020,13 @@ "wasm-bindgen", ] [[package]] name = "whoami" -version = "1.1.5" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "483a59fee1a93fec90eb08bc2eb4315ef10f4ebc478b3a5fadc969819cb66117" +checksum = "c33ac5ee236a4efbf2c98967e12c6cc0c51d93a744159a52957ba206ae6ef5f7" dependencies = [ "wasm-bindgen", "web-sys", ] Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "rsstg" -version = "0.1.19" +version = "0.2.0" authors = ["arcade"] edition = "2018" [dependencies] anyhow = "*" @@ -12,15 +12,15 @@ futures = "*" futures-util = "*" lazy_static = "*" regex = "*" reqwest = { version = "*", features = [ "brotli", "gzip", "deflate" ]} -#reqwest = "~0.10" rss = { version = "*", features = [ "from_url" ] } +sedregex = "*" sqlx = { version = "*", features = [ "postgres", "tls", "runtime-tokio-native-tls", "chrono" ] } #telegram-bot = "*" telegram-bot = { git = "https://github.com/telegram-rs/telegram-bot" } tokio = { version = "1.2", features = [ "macros", "net", "rt-multi-thread", "time" ] } [profile.release] lto = true codegen-units = 1 ADDED src/command.rs Index: src/command.rs ================================================================== --- /dev/null +++ src/command.rs @@ -0,0 +1,96 @@ +use anyhow::{bail, Context, Result}; +use crate::core::Core; +use regex::Regex; +use sedregex::ReplaceCommand; +use telegram_bot; + +lazy_static! { + static ref RE_USERNAME: Regex = Regex::new(r"^@[a-zA-Z][a-zA-Z0-9_]+$").unwrap(); + static ref RE_LINK: Regex = Regex::new(r"^https?://[a-zA-Z.0-9-]+/[-_a-zA-Z.0-9/?=]+$").unwrap(); + static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap(); +} + +pub async fn start(core: &Core, sender: telegram_bot::UserId) -> Result<()> { + core.send("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.", Some(sender))?; + Ok(()) +} + +pub async fn list(core: &Core, sender: telegram_bot::UserId) -> Result<()> { + core.send(core.list(sender).await?.join("\n"), Some(sender))?; + Ok(()) +} + +pub async fn command(core: &Core, sender: telegram_bot::UserId, command: Vec<&str>) -> Result<()> { + core.send( match &command[1].parse::() { + Err(err) => format!("I need a number\\.\n{}", &err), + Ok(number) => match command[0] { + "/check" => core.check(&number, sender, false).await + .context("Channel check failed.")?, + "/clean" => core.clean(&number, sender).await?, + "/enable" => core.enable(&number, sender).await? + .to_string(), + "/delete" => core.delete(&number, sender).await?, + "/disable" => core.disable(&number, sender).await? + .to_string(), + _ => bail!("Command {} not handled.", &command[0]), + }, + }, Some(sender))?; + Ok(()) +} + +pub async fn update(core: &Core, sender: telegram_bot::UserId, command: Vec<&str>) -> Result<()> { + let mut source_id: Option = None; + let at_least = "Requires at least 3 parameters."; + let first_word = command[0]; + let command = match first_word { + "/update" => { + source_id = Some(command[1].parse::() + .context(format!("I need a number, but got {}.", command[1]))?); + &command[2..] + }, + "/add" => &command[1..], + _ => bail!("Passing {} is not possible here.", command[1]), + }; + let mut i_command = command.into_iter(); + let (channel, url, iv_hash, url_re) = ( + i_command.next().context(at_least)?, + i_command.next().context(at_least)?, + i_command.next(), + i_command.next()); + if ! RE_USERNAME.is_match(&channel) { + core.send(format!("Usernames should be something like \"@\\[a\\-zA\\-Z]\\[a\\-zA\\-Z0\\-9\\_]+\", aren't they?\nNot {:?}", &channel), Some(sender))?; + return Ok(()) + } + if ! RE_LINK.is_match(&url) { + core.send(format!("Link should be a link to atom/rss feed, something like \"https://domain/path\".\nNot {:?}", &url), Some(sender))?; + return Ok(()) + } + let iv_hash = match iv_hash { + Some(hash) => { + if ! RE_IV_HASH.is_match(hash) { + core.send(format!("IV hash should be 14 hex digits.\nNot {:?}", hash), Some(sender))?; + return Ok(()) + }; + Some(*hash) + } + None => None, + }; + if let Some(rex) = url_re { + let _url_rex = ReplaceCommand::new(rex).context("Regexp parsing error:")?; + }; + let channel_id = i64::from(core.tg.send(telegram_bot::GetChat::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await?.id()); + let chan_adm = core.tg.send(telegram_bot::GetChatAdministrators::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await + .context("Sorry, I have no access to that chat\\.")?; + let (mut me, mut user) = (false, false); + for admin in chan_adm { + if admin.user.id == core.my.id { + me = true; + }; + if admin.user.id == sender { + user = true; + }; + }; + if ! me { bail!("I need to be admin on that channel\\."); }; + if ! user { bail!("You should be admin on that channel\\."); }; + core.send(core.update(source_id, channel, channel_id, url, iv_hash, None, sender).await?, Some(sender)) +} ADDED src/core.rs Index: src/core.rs ================================================================== --- /dev/null +++ src/core.rs @@ -0,0 +1,369 @@ +use anyhow::{anyhow, bail, Context, Result}; +use atom_syndication; +use chrono::DateTime; +use config; +use regex::Regex; +use reqwest; +use sqlx::{ + postgres::PgPoolOptions, + Row, +}; +use rss; +use std::{ + collections::{ + BTreeMap, + HashSet, + }, + sync::{Arc, Mutex}, +}; +use telegram_bot; + +#[derive(Clone)] +pub struct Core { + owner: i64, + api_key: String, + owner_chat: telegram_bot::UserId, + pub tg: telegram_bot::Api, + pub my: telegram_bot::User, + pool: sqlx::Pool, + sources: Arc>>>, +} + +impl Core { + pub async fn new(settings: config::Config) -> Result { + let owner = settings.get_int("owner")?; + let api_key = settings.get_str("api_key")?; + let tg = telegram_bot::Api::new(&api_key); + let core = Core { + owner: owner, + api_key: api_key.clone(), + my: tg.send(telegram_bot::GetMe).await?, + tg: tg, + owner_chat: telegram_bot::UserId::new(owner), + pool: PgPoolOptions::new() + .max_connections(5) + .connect_timeout(std::time::Duration::new(300, 0)) + .idle_timeout(std::time::Duration::new(60, 0)) + .connect_lazy(&settings.get_str("pg")?)?, + sources: Arc::new(Mutex::new(HashSet::new())), + }; + let clone = core.clone(); + tokio::spawn(async move { + if let Err(err) = &clone.autofetch().await { + if let Err(err) = clone.send(&format!("šŸ›‘ {:?}", err), None) { + eprintln!("Autofetch error: {}", err); + }; + } + }); + Ok(core) + } + + pub fn stream(&self) -> telegram_bot::UpdatesStream { + self.tg.stream() + } + + pub fn send(&self, msg: S, target: Option) -> Result<()> + where S: Into { + let msg: String = msg.into(); + self.tg.spawn(telegram_bot::SendMessage::new(match target { + Some(user) => user, + None => self.owner_chat, + }, msg.to_owned())); + Ok(()) + } + + pub async fn check(&self, id: &i32, owner: S, real: bool) -> Result + where S: Into { + let mut posted: i32 = 0; + let owner: i64 = owner.into(); + let id = { + let mut set = self.sources.lock().unwrap(); + match set.get(id) { + Some(id) => id.clone(), + None => { + let id = Arc::new(*id); + set.insert(id.clone()); + id.clone() + }, + } + }; + let count = Arc::strong_count(&id); + if count == 2 { + let mut conn = self.pool.acquire().await + .with_context(|| format!("Query queue fetch conn:\n{:?}", &self.pool))?; + let row = sqlx::query("select source_id, channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2") + .bind(*id) + .bind(owner) + .fetch_one(&mut conn).await + .with_context(|| format!("Query source:\n{:?}", &self.pool))?; + drop(conn); + let channel_id: i64 = row.try_get("channel_id")?; + let url: &str = row.try_get("url")?; + let iv_hash: Option<&str> = row.try_get("iv_hash")?; + let url_re: Option = match row.try_get("url_re")? { + Some(x) => Some(Regex::new(x)?), + None => None, + }; + let destination = match real { + true => telegram_bot::UserId::new(channel_id), + false => telegram_bot::UserId::new(row.try_get("owner")?), + }; + let mut this_fetch: Option> = None; + let mut posts: BTreeMap, String> = BTreeMap::new(); + let content = reqwest::get(url).await?.bytes().await?; + match rss::Channel::read_from(&content[..]) { + Ok(feed) => { + for item in feed.items() { + match item.link() { + Some(link) => { + let date = match item.pub_date() { + Some(feed_date) => DateTime::parse_from_rfc2822(feed_date), + None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]), + }?; + let url = link.to_string(); + posts.insert(date.clone(), url.clone()); + }, + None => {} + } + }; + }, + Err(err) => match err { + rss::Error::InvalidStartTag => { + let feed = atom_syndication::Feed::read_from(&content[..]) + .with_context(|| format!("Problem opening feed url:\n{}", &url))?; + for item in feed.entries() { + let date = item.published().unwrap(); + let url = item.links()[0].href(); + posts.insert(date.clone(), url.to_string()); + }; + }, + rss::Error::Eof => (), + _ => bail!("Unsupported or mangled content:\n{:?}\n{:#?}\n", &url, err) + } + }; + for (date, url) in posts.iter() { + let mut conn = self.pool.acquire().await + .with_context(|| format!("Check post fetch conn:\n{:?}", &self.pool))?; + let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") + .bind(&url) + .bind(*id) + .fetch_one(&mut conn).await + .with_context(|| format!("Check post:\n{:?}", &conn))?; + let exists: bool = row.try_get("exists")?; + if ! exists { + if this_fetch == None || *date > this_fetch.unwrap() { + this_fetch = Some(*date); + }; + self.tg.send( match iv_hash { + Some(x) => telegram_bot::SendMessage::new(destination, format!(" {0}", match &url_re { + Some(x) => match x.captures(&url) { + Some(x) => { + bail!("Regex hit, result:\n{:#?}", &x[0]); + &x[0] + }, + None => &url, + }, + None => &url, + }, x)), + None => telegram_bot::SendMessage::new(destination, format!("{}", url)), + }.parse_mode(telegram_bot::types::ParseMode::Html)).await + .context("Can't post message:")?; + sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);") + .bind(*id) + .bind(date) + .bind(url) + .execute(&mut conn).await + .with_context(|| format!("Record post:\n{:?}", &conn))?; + drop(conn); + tokio::time::sleep(std::time::Duration::new(4, 0)).await; + }; + posted += 1; + }; + posts.clear(); + }; + let mut conn = self.pool.acquire().await + .with_context(|| format!("Update scrape fetch conn:\n{:?}", &self.pool))?; + sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;") + .bind(*id) + .execute(&mut conn).await + .with_context(|| format!("Update scrape:\n{:?}", &conn))?; + Ok(format!("Posted: {}", &posted)) + } + + pub async fn delete(&self, source_id: &i32, owner: S) -> Result + where S: Into { + let owner: i64 = owner.into(); + let mut conn = self.pool.acquire().await + .with_context(|| format!("Delete fetch conn:\n{:?}", &self.pool))?; + match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;") + .bind(source_id) + .bind(owner) + .execute(&mut conn).await + .with_context(|| format!("Delete source rule:\n{:?}", &self.pool))? + .rows_affected() { + 0 => { Ok("No data found found\\.".to_string()) }, + x => { Ok(format!("{} sources removed\\.", x)) }, + } + } + + pub async fn clean(&self, source_id: &i32, owner: S) -> Result + where S: Into { + let owner: i64 = owner.into(); + let mut conn = self.pool.acquire().await + .with_context(|| format!("Clean fetch conn:\n{:?}", &self.pool))?; + match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;") + .bind(source_id) + .bind(owner) + .execute(&mut conn).await + .with_context(|| format!("Clean seen posts:\n{:?}", &self.pool))? + .rows_affected() { + 0 => { Ok("No data found found\\.".to_string()) }, + x => { Ok(format!("{} posts purged\\.", x)) }, + } + } + + pub async fn enable(&self, source_id: &i32, owner: S) -> Result<&str> + where S: Into { + let owner: i64 = owner.into(); + let mut conn = self.pool.acquire().await + .with_context(|| format!("Enable fetch conn:\n{:?}", &self.pool))?; + match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2") + .bind(source_id) + .bind(owner) + .execute(&mut conn).await + .with_context(|| format!("Enable source:\n{:?}", &self.pool))? + .rows_affected() { + 1 => { Ok("Source enabled\\.") }, + 0 => { Ok("Source not found\\.") }, + _ => { Err(anyhow!("Database error.")) }, + } + } + + pub async fn disable(&self, source_id: &i32, owner: S) -> Result<&str> + where S: Into { + let owner: i64 = owner.into(); + let mut conn = self.pool.acquire().await + .with_context(|| format!("Disable fetch conn:\n{:?}", &self.pool))?; + match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2") + .bind(source_id) + .bind(owner) + .execute(&mut conn).await + .with_context(|| format!("Disable source:\n{:?}", &self.pool))? + .rows_affected() { + 1 => { Ok("Source disabled\\.") }, + 0 => { Ok("Source not found\\.") }, + _ => { Err(anyhow!("Database error.")) }, + } + } + + pub async fn update(&self, update: Option, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: S) -> Result + where S: Into { + let owner: i64 = owner.into(); + let mut conn = self.pool.acquire().await + .with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?; + + match match update { + Some(id) => { + sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6 where source_id = $1").bind(id) + }, + None => { + sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)") + }, + } + .bind(channel_id) + .bind(url) + .bind(iv_hash) + .bind(owner) + .bind(channel) + .bind(url_re) + .execute(&mut conn).await { + Ok(_) => return Ok(String::from(match update { + Some(_) => "Channel updated\\.", + None => "Channel added\\.", + })), + Err(sqlx::Error::Database(err)) => { + match err.downcast::().routine() { + Some("_bt_check_unique", ) => { + return Ok("Duplicate key\\.".to_string()) + }, + Some(_) => { + return Ok("Database error\\.".to_string()) + }, + None => { + return Ok("No database error extracted\\.".to_string()) + }, + }; + }, + Err(err) => { + bail!("Sorry, unknown error:\n{:#?}\n", err); + }, + }; + } + + async fn autofetch(&self) -> Result<()> { + let mut delay = chrono::Duration::minutes(1); + let mut now; + loop { + let mut conn = self.pool.acquire().await + .with_context(|| format!("Autofetch fetch conn:\n{:?}", &self.pool))?; + now = chrono::Local::now(); + let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';") + .fetch_all(&mut conn).await?; + for row in queue.iter() { + let source_id: i32 = row.try_get("source_id")?; + let owner: i64 = row.try_get("owner")?; + let next_fetch: DateTime = row.try_get("next_fetch")?; + if next_fetch < now { + //let clone = self.clone(); + //clone.owner_chat(UserId::new(owner)); + let clone = Core { + owner_chat: telegram_bot::UserId::new(owner), + ..self.clone() + }; + tokio::spawn(async move { + if let Err(err) = clone.check(&source_id, owner, true).await { + if let Err(err) = clone.send(&format!("šŸ›‘ {:?}", err), None) { + eprintln!("Check error: {}", err); + }; + }; + }); + } else { + if next_fetch - now < delay { + delay = next_fetch - now; + } + } + }; + queue.clear(); + tokio::time::sleep(delay.to_std()?).await; + delay = chrono::Duration::minutes(1); + } + } + + pub async fn list(&self, owner: S) -> Result> + where S: Into { + let owner = owner.into(); + let mut reply = vec![]; + let mut conn = self.pool.acquire().await + .with_context(|| format!("List fetch conn:\n{:?}", &self.pool))?; + reply.push("Channels:".to_string()); + let rows = sqlx::query("select source_id, channel, enabled, url, iv_hash from rsstg_source where owner = $1 order by source_id") + .bind(owner) + .fetch_all(&mut conn).await?; + for row in rows.iter() { + let source_id: i32 = row.try_get("source_id")?; + let username: &str = row.try_get("channel")?; + let enabled: bool = row.try_get("enabled")?; + let url: &str = row.try_get("url")?; + let iv_hash: Option<&str> = row.try_get("iv_hash")?; + reply.push(format!("\n\\#ļøāƒ£ {} \\*ļøāƒ£ `{}` {}\nšŸ”— `{}`", source_id, username, + match enabled { + true => "šŸ”„ enabled", + false => "ā›” disabled", + }, url)); + if let Some(hash) = iv_hash { + reply.push(format!("IV `{}`", hash)); + } + }; + Ok(reply) + } +} Index: src/main.rs ================================================================== --- src/main.rs +++ src/main.rs @@ -1,381 +1,25 @@ -use std::collections::{BTreeMap, HashSet}; -use std::sync::{Arc, Mutex}; +mod command; +mod core; -use chrono::DateTime; use config; use futures::StreamExt; -use regex::Regex; -use reqwest; -use sqlx::postgres::PgPoolOptions; -use sqlx::Row; use tokio; -use rss; -use atom_syndication; - use telegram_bot::*; -//use tokio::stream::StreamExt; #[macro_use] extern crate lazy_static; -use anyhow::{anyhow, bail, Context, Result}; - -#[derive(Clone)] -struct Core { - owner: i64, - api_key: String, - owner_chat: UserId, - tg: telegram_bot::Api, - my: User, - pool: sqlx::Pool, - sources: Arc>>>, -} - -impl Core { - async fn new(settings: config::Config) -> Result { - let owner = settings.get_int("owner")?; - let api_key = settings.get_str("api_key")?; - let tg = Api::new(&api_key); - let core = Core { - owner: owner, - api_key: api_key.clone(), - my: tg.send(telegram_bot::GetMe).await?, - tg: tg, - owner_chat: UserId::new(owner), - pool: PgPoolOptions::new() - .max_connections(5) - .connect_timeout(std::time::Duration::new(300, 0)) - .idle_timeout(std::time::Duration::new(60, 0)) - .connect_lazy(&settings.get_str("pg")?)?, - sources: Arc::new(Mutex::new(HashSet::new())), - }; - let clone = core.clone(); - tokio::spawn(async move { - if let Err(err) = &clone.autofetch().await { - if let Err(err) = clone.debug(&format!("šŸ›‘ {:?}", err), None) { - eprintln!("Autofetch error: {}", err); - }; - } - }); - Ok(core) - } - - fn stream(&self) -> telegram_bot::UpdatesStream { - self.tg.stream() - } - - fn debug(&self, msg: &str, target: Option) -> Result<()> { - self.tg.spawn(SendMessage::new(match target { - Some(user) => user, - None => self.owner_chat, - }, msg)); - Ok(()) - } - - async fn check(&self, id: i32, owner: S, real: bool) -> Result<()> - where S: Into { - let owner: i64 = owner.into(); - let id = { - let mut set = self.sources.lock().unwrap(); - match set.get(&id) { - Some(id) => id.clone(), - None => { - let id = Arc::new(id); - set.insert(id.clone()); - id.clone() - }, - } - }; - let count = Arc::strong_count(&id); - if count == 2 { - let mut conn = self.pool.acquire().await - .with_context(|| format!("Query queue fetch conn:\n{:?}", &self.pool))?; - let row = sqlx::query("select source_id, channel_id, url, iv_hash, owner from rsstg_source where source_id = $1 and owner = $2") - .bind(*id) - .bind(owner) - .fetch_one(&mut conn).await - .with_context(|| format!("Query source:\n{:?}", &self.pool))?; - drop(conn); - let channel_id: i64 = row.try_get("channel_id")?; - let destination = match real { - true => UserId::new(channel_id), - false => UserId::new(row.try_get("owner")?), - }; - let url: &str = row.try_get("url")?; - let mut this_fetch: Option> = None; - let iv_hash: Option<&str> = row.try_get("iv_hash")?; - let mut posts: BTreeMap, String> = BTreeMap::new(); - let content = reqwest::get(url).await?.bytes().await?; - //let mut content_ = surf::get(url).await.map_err(|err| anyhow!(err))?; - //eprintln!("Data: {:#?}", &content_); - //let content = content_.body_bytes().await.map_err(|err| anyhow!(err))?; - /* - let feed = rss::Channel::read_from(&content[..]) - .with_context(|| format!("Problem opening feed url:\n{}", &url))?; - for item in feed.items() { - let date = match item.pub_date() { - Some(feed_date) => DateTime::parse_from_rfc2822(feed_date), - None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]), - }?; - let url = item.link().unwrap().to_string(); - posts.insert(date.clone(), url.clone()); - }; - */ - match rss::Channel::read_from(&content[..]) { - Ok(feed) => { - for item in feed.items() { - match item.link() { - Some(link) => { - let date = match item.pub_date() { - Some(feed_date) => DateTime::parse_from_rfc2822(feed_date), - None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]), - }?; - let url = link.to_string(); - posts.insert(date.clone(), url.clone()); - }, - None => {} - } - }; - }, - Err(err) => match err { - rss::Error::InvalidStartTag => { - let feed = atom_syndication::Feed::read_from(&content[..]) - .with_context(|| format!("Problem opening feed url:\n{}", &url))?; - for item in feed.entries() { - let date = item.published().unwrap(); - let url = item.links()[0].href(); - posts.insert(date.clone(), url.to_string()); - }; - }, - rss::Error::Eof => (), - _ => bail!("Unsupported or mangled content:\n{:#?}\n", err) - } - }; - for (date, url) in posts.iter() { - let mut conn = self.pool.acquire().await - .with_context(|| format!("Check post fetch conn:\n{:?}", &self.pool))?; - let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") - .bind(&url) - .bind(*id) - .fetch_one(&mut conn).await - .with_context(|| format!("Check post:\n{:?}", &conn))?; - let exists: bool = row.try_get("exists")?; - if ! exists { - if this_fetch == None || *date > this_fetch.unwrap() { - this_fetch = Some(*date); - }; - self.tg.send( match iv_hash { - Some(x) => SendMessage::new(destination, format!(" {0}", url, x)), - None => SendMessage::new(destination, format!("{}", url)), - }.parse_mode(types::ParseMode::Html)).await - .context("Can't post message:")?; - sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);") - .bind(*id) - .bind(date) - .bind(url) - .execute(&mut conn).await - .with_context(|| format!("Record post:\n{:?}", &conn))?; - drop(conn); - tokio::time::sleep(std::time::Duration::new(4, 0)).await; - }; - }; - posts.clear(); - }; - let mut conn = self.pool.acquire().await - .with_context(|| format!("Update scrape fetch conn:\n{:?}", &self.pool))?; - sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;") - .bind(*id) - .execute(&mut conn).await - .with_context(|| format!("Update scrape:\n{:?}", &conn))?; - Ok(()) - } - - async fn delete(&self, source_id: &i32, owner: S) -> Result - where S: Into { - let owner: i64 = owner.into(); - let mut conn = self.pool.acquire().await - .with_context(|| format!("Delete fetch conn:\n{:?}", &self.pool))?; - match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;") - .bind(source_id) - .bind(owner) - .execute(&mut conn).await - .with_context(|| format!("Delete source rule:\n{:?}", &self.pool))? - .rows_affected() { - 0 => { Ok("No data found found\\.".to_string()) }, - x => { Ok(format!("{} sources removed\\.", x)) }, - } - } - - async fn clean(&self, source_id: &i32, owner: S) -> Result - where S: Into { - let owner: i64 = owner.into(); - let mut conn = self.pool.acquire().await - .with_context(|| format!("Clean fetch conn:\n{:?}", &self.pool))?; - match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;") - .bind(source_id) - .bind(owner) - .execute(&mut conn).await - .with_context(|| format!("Clean seen posts:\n{:?}", &self.pool))? - .rows_affected() { - 0 => { Ok("No data found found\\.".to_string()) }, - x => { Ok(format!("{} posts purged\\.", x)) }, - } - } - - async fn enable(&self, source_id: &i32, owner: S) -> Result<&str> - where S: Into { - let owner: i64 = owner.into(); - let mut conn = self.pool.acquire().await - .with_context(|| format!("Enable fetch conn:\n{:?}", &self.pool))?; - match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2") - .bind(source_id) - .bind(owner) - .execute(&mut conn).await - .with_context(|| format!("Enable source:\n{:?}", &self.pool))? - .rows_affected() { - 1 => { Ok("Source enabled\\.") }, - 0 => { Ok("Source not found\\.") }, - _ => { Err(anyhow!("Database error.")) }, - } - } - - async fn disable(&self, source_id: &i32, owner: S) -> Result<&str> - where S: Into { - let owner: i64 = owner.into(); - let mut conn = self.pool.acquire().await - .with_context(|| format!("Disable fetch conn:\n{:?}", &self.pool))?; - match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2") - .bind(source_id) - .bind(owner) - .execute(&mut conn).await - .with_context(|| format!("Disable source:\n{:?}", &self.pool))? - .rows_affected() { - 1 => { Ok("Source disabled\\.") }, - 0 => { Ok("Source not found\\.") }, - _ => { Err(anyhow!("Database error.")) }, - } - } - - async fn update(&self, update: Option, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, owner: S) -> Result - where S: Into { - let owner: i64 = owner.into(); - let mut conn = self.pool.acquire().await - .with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?; - - match match update { - Some(id) => { - sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6 where source_id = $1").bind(id) - }, - None => { - sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel) values ($1, $2, $3, $4, $5)") - }, - } - .bind(channel_id) - .bind(url) - .bind(iv_hash) - .bind(owner) - .bind(channel) - .execute(&mut conn).await { - Ok(_) => return Ok(String::from(match update { - Some(_) => "Channel updated\\.", - None => "Channel added\\.", - })), - Err(sqlx::Error::Database(err)) => { - match err.downcast::().routine() { - Some("_bt_check_unique", ) => { - return Ok("Duplicate key\\.".to_string()) - }, - Some(_) => { - return Ok("Database error\\.".to_string()) - }, - None => { - return Ok("No database error extracted\\.".to_string()) - }, - }; - }, - Err(err) => { - bail!("Sorry, unknown error:\n{:#?}\n", err); - }, - }; - } - - async fn autofetch(&self) -> Result<()> { - let mut delay = chrono::Duration::minutes(1); - let mut now; - loop { - let mut conn = self.pool.acquire().await - .with_context(|| format!("Autofetch fetch conn:\n{:?}", &self.pool))?; - now = chrono::Local::now(); - let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';") - .fetch_all(&mut conn).await?; - for row in queue.iter() { - let source_id: i32 = row.try_get("source_id")?; - let owner: i64 = row.try_get("owner")?; - let next_fetch: DateTime = row.try_get("next_fetch")?; - if next_fetch < now { - //let clone = self.clone(); - //clone.owner_chat(UserId::new(owner)); - let clone = Core { - owner_chat: UserId::new(owner), - ..self.clone() - }; - tokio::spawn(async move { - if let Err(err) = clone.check(source_id, owner, true).await { - if let Err(err) = clone.debug(&format!("šŸ›‘ {:?}", err), None) { - eprintln!("Check error: {}", err); - }; - }; - }); - } else { - if next_fetch - now < delay { - delay = next_fetch - now; - } - } - }; - queue.clear(); - tokio::time::sleep(delay.to_std()?).await; - delay = chrono::Duration::minutes(1); - } - } - - async fn list(&self, owner: S) -> Result> - where S: Into { - let owner = owner.into(); - let mut reply = vec![]; - let mut conn = self.pool.acquire().await - .with_context(|| format!("List fetch conn:\n{:?}", &self.pool))?; - reply.push("Channels:".to_string()); - let rows = sqlx::query("select source_id, channel, enabled, url, iv_hash from rsstg_source where owner = $1 order by source_id") - .bind(owner) - .fetch_all(&mut conn).await?; - for row in rows.iter() { - let source_id: i32 = row.try_get("source_id")?; - let username: &str = row.try_get("channel")?; - let enabled: bool = row.try_get("enabled")?; - let url: &str = row.try_get("url")?; - let iv_hash: Option<&str> = row.try_get("iv_hash")?; - reply.push(format!("\n\\#ļøāƒ£ {} \\*ļøāƒ£ `{}` {}\nšŸ”— `{}`", source_id, username, - match enabled { - true => "šŸ”„ enabled", - false => "ā›” disabled", - }, url)); - if let Some(hash) = iv_hash { - reply.push(format!("IV `{}`", hash)); - } - }; - Ok(reply) - } -} +use anyhow::Result; #[tokio::main] async fn main() -> Result<()> { let mut settings = config::Config::default(); settings.merge(config::File::with_name("rsstg"))?; - let core = Core::new(settings).await?; + let core = core::Core::new(settings).await?; let mut stream = core.stream(); stream.allowed_updates(&[AllowedUpdate::Message]); let mut reply_to: Option; @@ -382,182 +26,42 @@ loop { reply_to = None; match stream.next().await { Some(update) => { if let Err(err) = handle(update?, &core, &mut reply_to).await { - core.debug(&format!("šŸ›‘ {:?}", err), reply_to)?; + core.send(&format!("šŸ›‘ {:?}", err), reply_to)?; }; }, None => { - core.debug(&format!("šŸ›‘ None error."), None)?; + core.send(&format!("šŸ›‘ None error."), None)?; } }; } //Ok(()) } -async fn handle(update: telegram_bot::Update, core: &Core, mut _reply_to: &Option) -> Result<()> { - lazy_static! { - static ref RE_USERNAME: Regex = Regex::new(r"^@[a-zA-Z][a-zA-Z0-9_]+$").unwrap(); - static ref RE_LINK: Regex = Regex::new(r"^https?://[a-zA-Z.0-9-]+/[-_a-zA-Z.0-9/?=]+$").unwrap(); - static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap(); - } - +async fn handle(update: telegram_bot::Update, core: &core::Core, mut _reply_to: &Option) -> Result<()> { match update.kind { UpdateKind::Message(message) => { - let mut reply: Vec = vec![]; match message.kind { MessageKind::Text { ref data, .. } => { - let mut words = data.split_whitespace(); - let cmd = words.next().unwrap(); - match cmd { - -// start - - "/start" => { - reply.push("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.".to_string()); - }, - -// list - - "/list" => { - reply.append(&mut core.list(message.from.id).await?); - }, - -// add - - "/add" | "/update" => { - _reply_to = &Some(message.from.id); - let mut source_id: Option = None; - let at_least = "Requires at least 3 parameters."; - if cmd == "/update" { - let first_word = words.next() - .context(at_least)?; - source_id = Some(first_word.parse::() - .with_context(|| format!("I need a number, but got {}.", first_word))?); - } - let (channel, url, iv_hash) = ( - words.next().context(at_least)?, - words.next().context(at_least)?, - words.next()); - if ! RE_USERNAME.is_match(&channel) { - reply.push("Usernames should be something like \"@\\[a\\-zA\\-Z]\\[a\\-zA\\-Z0\\-9\\_]+\", aren't they?".to_string()); - bail!("Wrong username {:?}.", &channel); - } - if ! RE_LINK.is_match(&url) { - reply.push("Link should be link to atom/rss feed, something like \"https://domain/path\"\\.".to_string()); - bail!("Url: {:?}", &url); - } - if let Some(hash) = iv_hash { - if ! RE_IV_HASH.is_match(&hash) { - reply.push("IV hash should be 14 hex digits.".to_string()); - bail!("IV: {:?}", &iv_hash); - }; - }; - let channel_id = i64::from(core.tg.send(telegram_bot::GetChat::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await?.id()); - let chan_adm = core.tg.send(telegram_bot::GetChatAdministrators::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await - .context("Sorry, I have no access to that chat\\.")?; - let (mut me, mut user) = (false, false); - for admin in chan_adm { - if admin.user.id == core.my.id { - me = true; - }; - if admin.user.id == message.from.id { - user = true; - }; - }; - if ! me { bail!("I need to be admin on that channel\\."); }; - if ! user { bail!("You should be admin on that channel\\."); }; - reply.push(core.update(source_id, channel, channel_id, url, iv_hash, message.from.id).await?); - }, - -// check - - "/check" => { - match &words.next().unwrap().parse::() { - Err(err) => { - reply.push(format!("I need a number\\.\n{}", &err)); - }, - Ok(number) => { - core.check(*number, message.from.id, false).await - .context("Channel check failed.")?; - }, - }; - }, - -// clean - - "/clean" => { - match &words.next().unwrap().parse::() { - Err(err) => { - reply.push(format!("I need a number\\.\n{}", &err)); - }, - Ok(number) => { - let result = core.clean(&number, message.from.id).await?; - reply.push(result.to_string()); - }, - }; - }, - -// enable - - "/enable" => { - match &words.next().unwrap().parse::() { - Err(err) => { - reply.push(format!("I need a number\\.\n{}", &err)); - }, - Ok(number) => { - let result = core.enable(&number, message.from.id).await?; - reply.push(result.to_string()); - }, - }; - }, - -// delete - - "/delete" => { - match &words.next().unwrap().parse::() { - Err(err) => { - reply.push(format!("I need a number\\.\n{}", &err)); - }, - Ok(number) => { - let result = core.delete(&number, message.from.id).await?; - reply.push(result.to_string()); - }, - }; - }, - -// disable - - "/disable" => { - match &words.next().unwrap().parse::() { - Err(err) => { - reply.push(format!("I need a number\\.\n{}", &err)); - }, - Ok(number) => { - let result = core.disable(&number, message.from.id).await?; - reply.push(result.to_string()); - }, - }; - }, - + let sender = message.from.id; + let words: Vec<&str> = data.split_whitespace().collect(); + match words[0] { + "/check" | "/clean" | "/enable" | "/delete" | "/disable" => command::command(core, sender, words).await?, + "/start" => command::start(core, sender).await?, + "/list" => command::list(core, sender).await?, + "/add" | "/update" => command::update(core, sender, words).await?, _ => { }, }; }, _ => { }, }; - - if reply.len() > 0 { - if let Err(err) = core.tg.send(message.text_reply(reply.join("\n")).parse_mode(types::ParseMode::MarkdownV2)).await { - dbg!(reply.join("\n")); - println!("{}", err); - }; - }; }, _ => {}, }; Ok(()) }