Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "rsstg" -version = "0.1.8" +version = "0.1.9" authors = ["arcade"] edition = "2018" [dependencies] chrono = "*" Index: rsstg.sql ================================================================== --- rsstg.sql +++ rsstg.sql @@ -30,11 +30,11 @@ create unique index rsstg_post__url on rsstg_post(url); create index rsstg_post__hour on rsstg_post(hour); create index rsstg_post__posted_hour on rsstg_post(posted,hour); create or replace view rsstg_order as - select source_id, coalesce(last_scrape + make_interval(0,0,0,0,0,(60 / (coalesce(activity, 1)/7 + 1) )::integer), now() - interval '1 minute') as next_fetch + select source_id, coalesce(last_scrape + make_interval(0,0,0,0,0,(60 / (coalesce(activity, 1)/7 + 1) )::integer), now() - interval '1 minute') as next_fetch, owner from rsstg_source natural left join (select source_id, count(*) as activity from rsstg_post where hour = extract('hour' from now())::smallint and posted > now() - interval '7 days' Index: src/main.rs ================================================================== --- src/main.rs +++ src/main.rs @@ -67,15 +67,18 @@ fn debug(&self, msg: &str) -> Result<()> { self.tg.spawn(SendMessage::new(self.owner_chat, msg)); Ok(()) } - async fn check(&self, id: &i32, real: bool) -> Result<()> { + async fn check(&self, id: &i32, owner: S, real: bool) -> Result<()> + where S: Into { + let owner: i64 = owner.into(); let mut conn = self.pool.acquire().await .with_context(|| format!("šŸ›‘ Query queue fetch conn:\n{:?}", &self.pool))?; - let row = sqlx::query("select source_id, channel_id, url, iv_hash, owner from rsstg_source where source_id = $1") + let row = sqlx::query("select source_id, channel_id, url, iv_hash, owner from rsstg_source where source_id = $1 and owner = $2") .bind(id) + .bind(owner) .fetch_one(&mut conn).await .with_context(|| format!("šŸ›‘ Query source:\n{:?}", &self.pool))?; drop(conn); let channel_id: i64 = row.try_get("channel_id")?; let destination = match real { @@ -132,41 +135,51 @@ .execute(&mut conn).await .with_context(|| format!("šŸ›‘ Update scrape:\n{:?}", &conn))?; Ok(()) } - async fn clean(&self, source_id: i32) -> Result<()> { + async fn clean(&self, source_id: &i32, id: S) -> Result + where S: Into { + let id: i64 = id.into(); let mut conn = self.pool.acquire().await .with_context(|| format!("šŸ›‘ Clean fetch conn:\n{:?}", &self.pool))?; - sqlx::query("delete from rsstg_post where source_id = $1;") + match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;") .bind(source_id) + .bind(id) .execute(&mut conn).await - .with_context(|| format!("šŸ›‘ Clean seen posts:\n{:?}", &self.pool))?; - Ok(()) + .with_context(|| format!("šŸ›‘ Clean seen posts:\n{:?}", &self.pool))? + .rows_affected() { + 0 => { Ok("No data found found\\.".to_string()) }, + x => { Ok(format!("{} posts purged\\.", x)) }, + } } - async fn enable(&self, source_id: &i32, id: telegram_bot::UserId) -> Result<&str> { + async fn enable(&self, source_id: &i32, id: S) -> Result<&str> + where S: Into { + let id: i64 = id.into(); let mut conn = self.pool.acquire().await .with_context(|| format!("šŸ›‘ Enable fetch conn:\n{:?}", &self.pool))?; match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2") .bind(source_id) - .bind(i64::from(id)) + .bind(id) .execute(&mut conn).await .with_context(|| format!("šŸ›‘ Enable source:\n\n{:?}", &self.pool))? .rows_affected() { 1 => { Ok("Source disabled\\.") }, 0 => { Ok("Source not found\\.") }, _ => { Err(anyhow!("Database error.")) }, } } - async fn disable(&self, source_id: &i32, id: telegram_bot::UserId) -> Result<&str> { + async fn disable(&self, source_id: &i32, id: S) -> Result<&str> + where S: Into { + let id: i64 = id.into(); let mut conn = self.pool.acquire().await .with_context(|| format!("šŸ›‘ Disable fetch conn:\n{:?}", &self.pool))?; match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2") .bind(source_id) - .bind(i64::from(id)) + .bind(id) .execute(&mut conn).await .with_context(|| format!("šŸ›‘ Disable source:\n\n{:?}", &self.pool))? .rows_affected() { 1 => { Ok("Source disabled\\.") }, 0 => { Ok("Source not found\\.") }, @@ -179,23 +192,24 @@ let mut now; loop { let mut conn = self.pool.acquire().await .with_context(|| format!("šŸ›‘ Autofetch fetch conn:\n{:?}", &self.pool))?; now = chrono::Local::now(); - let mut queue = sqlx::query("select source_id, next_fetch from rsstg_order natural left join rsstg_source natural left join rsstg_channel where next_fetch < now();") + let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source natural left join rsstg_channel where next_fetch < now();") .fetch_all(&mut conn).await?; for row in queue.iter() { let source_id: i32 = row.try_get("source_id")?; + let owner: i64 = row.try_get("owner")?; let next_fetch: DateTime = row.try_get("next_fetch")?; if next_fetch < now { sqlx::query("update rsstg_source set last_scrape = now() + interval '1 hour' where source_id = $1;") .bind(source_id) .execute(&mut conn).await .with_context(|| format!("šŸ›‘ Lock source:\n\n{:?}", &self.pool))?; let clone = self.clone(); tokio::spawn(async move { - if let Err(err) = clone.check(&source_id.clone(), true).await { + if let Err(err) = clone.check(&source_id, owner, true).await { if let Err(err) = clone.debug(&err.to_string()) { eprintln!("Check error: {}", err); }; }; }); @@ -209,10 +223,36 @@ tokio::time::delay_for(delay.to_std()?).await; delay = chrono::Duration::minutes(5); } } + async fn list(&self, id: telegram_bot::UserId) -> Result> { + let id = i64::from(id); + let mut reply = vec![]; + let mut conn = self.pool.acquire().await + .with_context(|| format!("šŸ›‘ List fetch conn:\n{:?}", &self.pool))?; + reply.push("Channels:".to_string()); + let rows = sqlx::query("select source_id, username, enabled, url, iv_hash from rsstg_source left join rsstg_channel using (channel_id) where owner = $1 order by source_id") + .bind(id) + .fetch_all(&mut conn).await?; + for row in rows.iter() { + let source_id: i32 = row.try_get("source_id")?; + let username: &str = row.try_get("username")?; + let enabled: bool = row.try_get("enabled")?; + let url: &str = row.try_get("url")?; + let iv_hash: Option<&str> = row.try_get("iv_hash")?; + reply.push(format!("\n\\#ļøāƒ£ {} \\*ļøāƒ£ `{}` {}\nšŸ”— `{}`", source_id, username, + match enabled { + true => "šŸ”„ enabled", + false => "ā›” disabled", + }, url)); + if let Some(hash) = iv_hash { + reply.push(format!("IV `{}`", hash)); + } + }; + Ok(reply) + } } #[tokio::main] async fn main() -> Result<()> { let mut settings = config::Config::default(); @@ -254,37 +294,11 @@ }, // list "/list" => { - match core.pool.acquire().await { - Err(err) => { - core.debug(&format!("šŸ›‘ Disable fetch conn:\n{}\n{:?}", &err, &core.pool))?; - }, - Ok(mut conn) => { - reply.push("Channels:".to_string()); - let rows = sqlx::query("select source_id, username, enabled, url, iv_hash from rsstg_source left join rsstg_channel using (channel_id) where owner = $1 order by source_id") - .bind(i64::from(message.from.id)) - .fetch_all(&mut conn).await?; - for row in rows.iter() { - //while let Some(row) = rows.try_next().await? { - let source_id: i32 = row.try_get("source_id")?; - let username: &str = row.try_get("username")?; - let enabled: bool = row.try_get("enabled")?; - let url: &str = row.try_get("url")?; - let iv_hash: Option<&str> = row.try_get("iv_hash")?; - reply.push(format!("\n\\#ļøāƒ£ {} \\*ļøāƒ£ `{}` {}\nšŸ”— `{}`", source_id, username, - match enabled { - true => "šŸ”„ enabled", - false => "ā›” disabled", - }, url)); - if let Some(hash) = iv_hash { - reply.push(format!("IV `{}`", hash)); - } - } - }, - }; + reply.append(&mut core.list(message.from.id).await?); }, // add "/add" | "/update" => { @@ -424,31 +438,28 @@ match &words.next().unwrap().parse::() { Err(err) => { reply.push(format!("I need a number\\.\n{}", &err)); }, Ok(number) => { - match &core.check(number, false).await { - Ok(_) => { - reply.push("Channel enabled\\.".to_string()); - } - Err(err) => { - core.debug(&format!("šŸ›‘ Channel check failed:\n{}", &err))?; - }, - }; + core.check(&number, message.from.id, false).await + .context("šŸ›‘ Channel check failed.")?; }, }; }, // clean "/clean" => { - if core.owner != i64::from(message.from.id) { - reply.push("Reserved for testing\\.".to_string()); - } else { - let source_id = words.next().unwrap().parse::().unwrap_or(0); - &core.clean(source_id).await?; - } + match &words.next().unwrap().parse::() { + Err(err) => { + reply.push(format!("I need a number\\.\n{}", &err)); + }, + Ok(number) => { + let result = core.clean(&number, message.from.id).await?; + reply.push(result.to_string()); + }, + }; }, // enable "/enable" => {