Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -4,29 +4,18 @@ authors = ["arcade"] edition = "2018" [dependencies] config = "*" -http = "*" -#native-tls = "*" -#postgres = { version = "*", features = ["with-serde_json-1"] } -#tokio-postgres = "*" -#postgres-native-tls = "*" -serde = { version = "1.0", features = ["derive"] } -serde_json = "*" -#serde_postgres = "*" futures = "*" telegram-bot = "*" -#tbot = "*" tokio = { version = "0.2", features = ["macros" ] } -#tokio-timer = "*" futures-util = "*" sqlx = { version = "*", features = [ "postgres", "tls", "runtime-async-std-native-tls", "chrono" ] } -#async-std = { version = "*", features = [ "attributes" ] } regex = "*" rss = { version = "*", features = [ "from_url" ] } Index: rsstg.sql ================================================================== --- rsstg.sql +++ rsstg.sql @@ -28,15 +28,17 @@ hour smallint generated always as (extract('hour' from posted)) stored, FOREIGN KEY (source_id) REFERENCES rsstg_source(source_id) ); create unique index rsstg_post__url on rsstg_post(url); create index rsstg_post__hour on rsstg_post(hour); +create index rsstg_post__posted_hour on rsstg_post(posted,hour); create or replace view rsstg_order as - select source_id, coalesce(last_scrape + make_interval(0,0,0,0,0,(60 / coalesce(activity, 1))::integer), now() - interval '1 minute') as next_fetch + select source_id, coalesce(last_scrape + make_interval(0,0,0,0,0,(420 / coalesce(activity , 1))::integer), now() - interval '1 minute') as next_fetch from rsstg_source natural left join (select source_id, count(*) as activity from rsstg_post where hour = extract('hour' from now()) + and posted > now() - interval '7 days' group by source_id) as act where enabled order by next_fetch; Index: src/main.rs ================================================================== --- src/main.rs +++ src/main.rs @@ -4,55 +4,48 @@ use rss; use chrono::DateTime; use regex::Regex; -//use tbot; -//use tbot::prelude::*; - -use futures::StreamExt; -use futures::TryStreamExt; +use tokio::stream::StreamExt; use telegram_bot::*; use sqlx::postgres::PgPoolOptions; use sqlx::Row; type Result = std::result::Result>; +#[derive(Clone)] struct Core { owner: i64, + api_key: String, owner_chat: UserId, tg: telegram_bot::Api, my: User, pool: sqlx::Pool, } impl Core { async fn new(settings: config::Config) -> Result { let owner = settings.get_int("owner")?; - let tg = Api::new(settings.get_str("api_key")?); + let api_key = settings.get_str("api_key")?; + let tg = Api::new(&api_key); let core = Core { owner: owner, + api_key: api_key.clone(), my: tg.send(telegram_bot::GetMe).await?, tg: tg, owner_chat: UserId::new(owner), pool: PgPoolOptions::new().max_connections(5).connect(&settings.get_str("pg")?).await?, }; + let clone = core.clone(); tokio::spawn(async move { - if let Err(err) = &core.autofetch().await { + if let Err(err) = clone.autofetch().await { eprintln!("connection error: {}", err); } }); - - let tg = Api::new(settings.get_str("api_key")?); - Ok(Core { - owner: owner, - my: tg.send(telegram_bot::GetMe).await?, - tg: tg, - owner_chat: UserId::new(owner), - pool: PgPoolOptions::new().max_connections(5).connect(&settings.get_str("pg")?).await?, - }) + Ok(core) } fn stream(&self) -> telegram_bot::UpdatesStream { self.tg.stream() } @@ -60,60 +53,77 @@ fn debug(&self, msg: &str) -> Result<()> { self.tg.spawn(SendMessage::new(self.owner_chat, msg)); Ok(()) } - async fn check(&self, id: i32, real: Option) -> Result<()> { - match sqlx::query("select channel_id, url, last_fetch, iv_hash from rsstg_source where source_id = $1") - .bind(id) + async fn check(&self, channel: &str, real: Option) -> Result<()> { + match sqlx::query("select source_id, channel_id, url, last_fetch, iv_hash, owner from rsstg_source natural left join rsstg_channel where username = $1") + .bind(channel) .fetch_one(&self.pool).await { Ok(row) => { + let id: i32 = row.try_get("source_id")?; let channel_id: i64 = row.try_get("channel_id")?; let destination = match real { Some(true) => UserId::new(channel_id), - Some(false) | None => self.owner_chat, + Some(false) | None => UserId::new(row.try_get("owner")?), }; let url: &str = row.try_get("url")?; let last_fetch: Option> = row.try_get("last_fetch")?; let mut this_fetch: Option> = None; let iv_hash: Option<&str> = row.try_get("iv_hash")?; match rss::Channel::from_url(url) { Ok(feed) => { self.debug(&format!("# title:{:?} ttl:{:?} hours:{:?} days:{:?}", feed.title(), feed.ttl(), feed.skip_hours(), feed.skip_days()))?; for item in feed.items() { - let date = DateTime::parse_from_rfc2822(item.pub_date().unwrap()).unwrap(); - let url = item.link().unwrap().to_string(); - if last_fetch == None || date > last_fetch.unwrap() { - if this_fetch == None || date > this_fetch.unwrap() { - this_fetch = Some(date); - } - match self.tg.send( match iv_hash { - Some(x) => SendMessage::new(destination, format!(" {0}", url, x)), - None => SendMessage::new(destination, format!("{}", url)), - }.parse_mode(types::ParseMode::Html)).await { - Ok(_) => { - match sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);") - .bind(id) - .bind(date) - .bind(url) - .execute(&self.pool).await { - Ok(_) => {}, - Err(err) => { - self.debug(&err.to_string())?; - }, - }; - }, - Err(err) => { - self.debug(&err.to_string())?; - }, - } - }; - tokio::time::delay_for(std::time::Duration::new(4, 0)).await; - }; - // update last_fetch - if this_fetch != None && (last_fetch == None || this_fetch.unwrap() > last_fetch.unwrap()) { - match sqlx::query("update rsstg_source set last_fetch = $1 where source_id = $2;") + let date = match item.pub_date() { + Some(feed_date) => DateTime::parse_from_rfc2822(feed_date), + None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]), + }?; + let url = item.link().unwrap().to_string(); + if last_fetch == None || date > last_fetch.unwrap() { + match sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") + .bind(&url) + .bind(id) + .fetch_one(&self.pool).await { + Ok(row) => { + let exists: bool = row.try_get("exists")?; + if ! exists { + if this_fetch == None || date > this_fetch.unwrap() { + this_fetch = Some(date); + } + match self.tg.send( match iv_hash { + Some(x) => SendMessage::new(destination, format!(" {0}", url, x)), + None => SendMessage::new(destination, format!("{}", url)), + }.parse_mode(types::ParseMode::Html)).await { + Ok(_) => { + match sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);") + .bind(id) + .bind(date) + .bind(url) + .execute(&self.pool).await { + Ok(_) => {}, + Err(err) => { + self.debug(&err.to_string())?; + }, + }; + }, + Err(err) => { + self.debug(&err.to_string())?; + }, + }; + tokio::time::delay_for(std::time::Duration::new(4, 0)).await; + } + }, + Err(err) => { + self.debug(&err.to_string())?; + }, + }; + }; + }; + // update last_fetch + if this_fetch != None && (last_fetch == None || this_fetch.unwrap() > last_fetch.unwrap()) { + match sqlx::query("update rsstg_source set last_fetch = case when (last_fetch < $1) then $1 else last_fetch end where source_id = $2;") .bind(this_fetch.unwrap()) .bind(id) .execute(&self.pool).await { Ok(_) => {}, Err(err) => { @@ -182,26 +192,40 @@ Ok(()) } async fn autofetch(&self) -> Result<()> { let mut delay = chrono::Duration::minutes(5); - let mut source_id; let mut next_fetch: DateTime; let mut now; loop { - let mut rows = sqlx::query("select source_id, next_fetch from rsstg_order limit 1;") + let mut rows = sqlx::query("select source_id, username, next_fetch from rsstg_order natural left join rsstg_source natural left join rsstg_channel;") .fetch(&self.pool); while let Some(row) = rows.try_next().await.unwrap() { now = chrono::Local::now(); - source_id = row.try_get("source_id")?; + let source_id: i32 = row.try_get("source_id")?; next_fetch = row.try_get("next_fetch")?; if next_fetch < now { - &self.check(source_id, Some(true)).await?; + match sqlx::query("update rsstg_source set last_scrape = now() + interval '1 hour' where source_id = $1;") + .bind(source_id) + .execute(&self.pool).await { + Ok(_) => {}, + Err(err) => { + self.debug(&err.to_string())?; + }, + }; + let clone = self.clone(); + let username: String = row.try_get("username")?; + let username = username.clone(); + tokio::spawn(async move { + if let Err(err) = clone.check(&username, Some(true)).await { + eprintln!("connection error: {}", err); + } + }); + //&self.check(row.try_get("username")?, Some(true)).await?; } else { - delay = next_fetch - now; - if delay > chrono::Duration::minutes(5) { - delay = chrono::Duration::minutes(5); + if next_fetch - now < delay { + delay = next_fetch - now; } } }; tokio::time::delay_for(delay.to_std()?).await; } @@ -208,60 +232,21 @@ //Ok(()) } } -#[tokio::main] +#[tokio::main(basic_scheduler)] async fn main() -> Result<()> { let mut settings = config::Config::default(); settings.merge(config::File::with_name("rsstg"))?; - let re_username = Regex::new(r"^@[a-z][a-z0-9_]+$")?; - let re_link = Regex::new(r"^https?://[a-z.0-9]+/[-_a-z.0-9/]+$")?; + let re_username = Regex::new(r"^@[a-zA-Z][a-zA-Z0-9_]+$")?; + let re_link = Regex::new(r"^https?://[a-zA-Z.0-9]+/[-_a-zA-Z.0-9/?=]+$")?; let re_iv_hash = Regex::new(r"^[a-f0-9]{14}$")?; - /* - tokio::spawn(async move { - if let Err(e) = connection.await { - eprintln!("connection error: {}", e); - } - }); */ - let core = Core::new(settings).await?; - /* - let mut bot = tbot::Bot::new(settings.get_str("api_key")?).event_loop(); - - bot.command("start", //"Start working.", - |context| async move { - context.send_message_in_reply("Not in service yet. Try later.").call().await.unwrap(); - }, - ); - - bot.command("list", //"List channels.", - |context| async move { - dbg!(&context.chat); - let mut res = "Channels:\n".to_owned(); - let mut rows = sqlx::query("select username, channel_id, url, iv_hash from rsstg_source left join rsstg_channel using (channel_id) where owner = $1") - .bind(context.chat.id.0) - .fetch(&pool); - while let Some(row) = rows.try_next().await.unwrap() { - let username: &str = row.try_get("username").unwrap(); - let channel_id: &str = row.try_get("channel_id").unwrap(); - let url: &str = row.try_get("url").unwrap(); - let iv_hash: &str = row.try_get("iv_hash").unwrap(); - res.push_str(&format!("`{}`: `{}` iv:`{}`\n", username, url, iv_hash)); - //match row.get(3) as str { - //Some(x) => x, - //_ => "None" - //})); - } - context.send_message_in_reply(&res).call().await.unwrap(); - }, - ); - */ - let mut stream = core.stream(); while let Some(update) = stream.next().await { let update = update?; match update.kind { @@ -288,20 +273,19 @@ .fetch(&core.pool); while let Some(row) = rows.try_next().await? { let username: &str = row.try_get("username")?; let enabled: bool = row.try_get("enabled")?; let url: &str = row.try_get("url")?; - let iv_hash: &str = row.try_get("iv_hash")?; - reply.push(format!("\n\\*ļøāƒ£ `{}` {}\nšŸ”— `{}`\nIV `{}`", username, + let iv_hash: Option<&str> = row.try_get("iv_hash")?; + reply.push(format!("\n\\*ļøāƒ£ `{}` {}\nšŸ”— `{}`", username, match enabled { true => "šŸ”„ enabled", false => "ā›” disabled", - }, url, iv_hash)); - //match row.get(3) as str { - //Some(x) => x, - //_ => "None" - //})); + }, url)); + if let Some(hash) = iv_hash { + reply.push(format!("IV `{}`", hash)); + } } }, // add @@ -311,11 +295,11 @@ let ok_hash = match iv_hash { Some(hash) => re_iv_hash.is_match(&hash), None => true, }; if ! ok_link { - reply.push("Link should be link to atom/rss feed, something like \"https://domain/path\".".to_string()); + reply.push("Link should be link to atom/rss feed, something like \"https://domain/path\"\\.".to_string()); core.debug(&format!("Url: {:?}", &url))?; } if ! ok_hash { reply.push("IV hash should be 14 hex digits.".to_string()); core.debug(&format!("IV: {:?}", &iv_hash))?; @@ -371,11 +355,11 @@ // addchan "/addchan" => { let channel = words.next().unwrap(); if ! re_username.is_match(&channel) { - reply.push("Usernames should be something like \"@\\[a-z]\\[a-z0-9_]+\", aren't they?".to_string()); + reply.push("Usernames should be something like \"@\\[a\\-zA\\-Z]\\[a\\-zA\\-Z0\\-9\\_]+\", aren't they?".to_string()); } else { let chan: Option = match sqlx::query("select channel_id from rsstg_channel where username = $1") .bind(channel) .fetch_one(&core.pool).await { Ok(chan) => Some(chan.try_get("channel_id")?), @@ -428,15 +412,15 @@ }, // check "/check" => { - if core.owner != i64::from(message.from.id) { - reply.push("Reserved for testing\\.".to_string()); + let channel = words.next().unwrap(); + if ! re_username.is_match(&channel) { + reply.push("Usernames should be something like \"@\\[a-z]\\[a-z0-9_]+\", aren't they?".to_string()); } else { - let source_id = words.next().unwrap().parse::().unwrap_or(0); - &core.check(source_id, None).await?; + &core.check(channel, None).await?; } }, // clear @@ -503,36 +487,8 @@ } }, _ => {}, }; } - /* - loop { - println!("cycle"); - for _ in botdb.query("select owner from rsstg_updates where owner is NULL limit 1;", &[])? { - for row in botdb.query("update rsstg_updates set owner = $1 where update->>'update_id' = ( select update->>'update_id' from rsstg_updates where owner is NULL limit 1 for update skip locked ) returning update;", &[owner])? { - let u :types::Update = serde_json::from_value(row.get(0))?; - println!("update: {:?}", &u); - /* - if let Some(message) = &u.message { - //if u["message"] != None { - if let (Some(entities), Some(text)) = (&message.entities, &message.text) { - //if u["message"]["entities"] { - for entry in entities { - if &entry.type_ == "bot_command" { - println!("command: {:?}", &text.chars().skip(entry.offset as usize).take(entry.length as usize).collect::()); - } - println!("entity: {:?}", &entry); - } - } - } - */ - } - } - std::process::exit(0); - } - */ - - //bot.polling().start().await.unwrap(); Ok(()) }