Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "rsstg" -version = "0.1.2" +version = "0.1.3" authors = ["arcade"] edition = "2018" [dependencies] config = "*" Index: rsstg.sql ================================================================== --- rsstg.sql +++ rsstg.sql @@ -3,20 +3,19 @@ create unique index rsstg_updates__id on rsstg_updates(update->>'update_id'); -- create table rsstg_users (id integer); create table rsstg_channel ( channel_id bigint primary key, - username text); + username text not null); create unique index rsstg_channel__username on rsstg_channel(username); create table rsstg_source ( source_id serial, channel_id integer not null, url text not null, - last_fetch timestamptz, - last_scrape timestamptz default now(), - enabled boolean default false, + last_scrape timestamptz not null default now(), + enabled boolean not null default false, iv_hash text, owner bigint not null); create unique index rsstg_source__source_id on rsstg_source(source_id); create unique index rsstg_source__channel_id__owner on rsstg_source(channel_id, owner); create index rsstg_source__owner on rsstg_source(owner); @@ -23,11 +22,11 @@ create table rsstg_post ( source_id integer not null, date int not null, url text not null, - hour smallint generated always as (extract('hour' from posted)) stored, + hour smallint not null generated always as (extract('hour' from posted)) stored, FOREIGN KEY (source_id) REFERENCES rsstg_source(source_id) ); create unique index rsstg_post__url on rsstg_post(url); create index rsstg_post__hour on rsstg_post(hour); create index rsstg_post__posted_hour on rsstg_post(posted,hour); Index: src/main.rs ================================================================== --- src/main.rs +++ src/main.rs @@ -53,23 +53,21 @@ fn debug(&self, msg: &str) -> Result<()> { self.tg.spawn(SendMessage::new(self.owner_chat, msg)); Ok(()) } - async fn check(&self, 
channel: &str, real: Option) -> Result<()> { - match sqlx::query("select source_id, channel_id, url, last_fetch, iv_hash, owner from rsstg_source natural left join rsstg_channel where username = $1") - .bind(channel) + async fn check(&self, id: &i32, real: Option) -> Result<()> { + match sqlx::query("select source_id, channel_id, url, iv_hash, owner from rsstg_source where source_id = $1") + .bind(id) .fetch_one(&self.pool).await { Ok(row) => { - let id: i32 = row.try_get("source_id")?; let channel_id: i64 = row.try_get("channel_id")?; let destination = match real { Some(true) => UserId::new(channel_id), Some(false) | None => UserId::new(row.try_get("owner")?), }; let url: &str = row.try_get("url")?; - let last_fetch: Option> = row.try_get("last_fetch")?; let mut this_fetch: Option> = None; let iv_hash: Option<&str> = row.try_get("iv_hash")?; match rss::Channel::from_url(url) { Ok(feed) => { self.debug(&format!("# title:{:?} ttl:{:?} hours:{:?} days:{:?}", feed.title(), feed.ttl(), feed.skip_hours(), feed.skip_days()))?; @@ -77,62 +75,48 @@ let date = match item.pub_date() { Some(feed_date) => DateTime::parse_from_rfc2822(feed_date), None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]), }?; let url = item.link().unwrap().to_string(); - if last_fetch == None || date > last_fetch.unwrap() { - match sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") - .bind(&url) - .bind(id) - .fetch_one(&self.pool).await { - Ok(row) => { - let exists: bool = row.try_get("exists")?; - if ! 
exists { - if this_fetch == None || date > this_fetch.unwrap() { - this_fetch = Some(date); - } - match self.tg.send( match iv_hash { - Some(x) => SendMessage::new(destination, format!(" {0}", url, x)), - None => SendMessage::new(destination, format!("{}", url)), - }.parse_mode(types::ParseMode::Html)).await { - Ok(_) => { - match sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);") - .bind(id) - .bind(date) - .bind(url) - .execute(&self.pool).await { - Ok(_) => {}, - Err(err) => { - self.debug(&err.to_string())?; - }, - }; - }, - Err(err) => { - self.debug(&err.to_string())?; - }, - }; - tokio::time::delay_for(std::time::Duration::new(4, 0)).await; - } - }, - Err(err) => { - self.debug(&err.to_string())?; - }, - }; - }; - }; - // update last_fetch - if this_fetch != None && (last_fetch == None || this_fetch.unwrap() > last_fetch.unwrap()) { - match sqlx::query("update rsstg_source set last_fetch = case when (last_fetch < $1) then $1 else last_fetch end where source_id = $2;") - .bind(this_fetch.unwrap()) - .bind(id) - .execute(&self.pool).await { - Ok(_) => {}, - Err(err) => { - self.debug(&err.to_string())?; - }, - }; - } + match sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") + .bind(&url) + .bind(id) + .fetch_one(&self.pool).await { + Ok(row) => { + let exists: bool = row.try_get("exists")?; + if ! 
exists { + if this_fetch == None || date > this_fetch.unwrap() { + this_fetch = Some(date); + } + match self.tg.send( match iv_hash { + Some(x) => SendMessage::new(destination, format!(" {0}", url, x)), + None => SendMessage::new(destination, format!("{}", url)), + }.parse_mode(types::ParseMode::Html)).await { + Ok(_) => { + match sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);") + .bind(id) + .bind(date) + .bind(url) + .execute(&self.pool).await { + Ok(_) => {}, + Err(err) => { + self.debug(&err.to_string())?; + }, + }; + }, + Err(err) => { + self.debug(&err.to_string())?; + }, + }; + tokio::time::delay_for(std::time::Duration::new(4, 0)).await; + } + }, + Err(err) => { + self.debug(&err.to_string())?; + }, + }; + }; }, Err(err) => { self.debug(&err.to_string())?; }, }; @@ -151,40 +135,36 @@ }; Ok(()) } async fn clean(&self, source_id: i32) -> Result<()> { - for query in vec!["delete from rsstg_post where source_id = $1;", "update rsstg_source set last_fetch = NULL where source_id = $1;"] { - match sqlx::query(query) - .bind(source_id) - .execute(&self.pool).await { - Ok(_) => {}, - Err(err) => { - self.debug(&err.to_string())?; - }, - } - } + match sqlx::query("delete from rsstg_post where source_id = $1;") + .bind(source_id) + .execute(&self.pool).await { + Ok(_) => {}, + Err(err) => { + self.debug(&err.to_string())?; + }, + }; Ok(()) } - async fn enable(&self, user: UserId, channel: &str) -> Result<()> { - match sqlx::query("update rsstg_source set enabled = true from rsstg_channel where rsstg_channel.channel_id = rsstg_source.channel_id and rsstg_channel.username = $1 and owner = $2") - .bind(channel) - .bind(i64::from(user)) + async fn enable(&self, source_id: &i32) -> Result<()> { + match sqlx::query("update rsstg_source set enabled = true where source_id = $1") + .bind(source_id) .execute(&self.pool).await { Ok(_) => {}, Err(err) => { self.debug(&err.to_string())?; }, } Ok(()) } - async fn disable(&self, user: UserId, 
channel: &str) -> Result<()> { - match sqlx::query("update rsstg_source set enabled = false from rsstg_channel where rsstg_channel.channel_id = rsstg_source.channel_id and rsstg_channel.username = $1 and owner = $2") - .bind(channel) - .bind(i64::from(user)) + async fn disable(&self, source_id: &i32) -> Result<()> { + match sqlx::query("update rsstg_source set enabled = false where source_id = $1") + .bind(source_id) .execute(&self.pool).await { Ok(_) => {}, Err(err) => { self.debug(&err.to_string())?; }, @@ -195,11 +175,12 @@ async fn autofetch(&self) -> Result<()> { let mut delay = chrono::Duration::minutes(5); let mut next_fetch: DateTime; let mut now; loop { - let mut rows = sqlx::query("select source_id, username, next_fetch from rsstg_order natural left join rsstg_source natural left join rsstg_channel;") + self.debug("cycle")?; + let mut rows = sqlx::query("select source_id, next_fetch from rsstg_order natural left join rsstg_source natural left join rsstg_channel;") .fetch(&self.pool); while let Some(row) = rows.try_next().await.unwrap() { now = chrono::Local::now(); let source_id: i32 = row.try_get("source_id")?; next_fetch = row.try_get("next_fetch")?; @@ -211,25 +192,23 @@ Err(err) => { self.debug(&err.to_string())?; }, }; let clone = self.clone(); - let username: String = row.try_get("username")?; - let username = username.clone(); tokio::spawn(async move { - if let Err(err) = clone.check(&username, Some(true)).await { + if let Err(err) = clone.check(&source_id.clone(), Some(true)).await { eprintln!("connection error: {}", err); } }); - //&self.check(row.try_get("username")?, Some(true)).await?; } else { if next_fetch - now < delay { delay = next_fetch - now; } } }; tokio::time::delay_for(delay.to_std()?).await; + delay = chrono::Duration::minutes(5); } //Ok(()) } } @@ -259,26 +238,27 @@ match cmd { // start "/start" => { - reply.push("Not in service yet. Try later.".to_string()); + reply.push("Not in service yet\\. 
Try later\\.".to_string()); }, // list "/list" => { reply.push("Channels:".to_string()); - let mut rows = sqlx::query("select username, enabled, url, iv_hash from rsstg_source left join rsstg_channel using (channel_id) where owner = $1") + let mut rows = sqlx::query("select source_id, username, enabled, url, iv_hash from rsstg_source left join rsstg_channel using (channel_id) where owner = $1 order by source_id") .bind(i64::from(message.from.id)) .fetch(&core.pool); while let Some(row) = rows.try_next().await? { + let source_id: i32 = row.try_get("source_id")?; let username: &str = row.try_get("username")?; let enabled: bool = row.try_get("enabled")?; let url: &str = row.try_get("url")?; let iv_hash: Option<&str> = row.try_get("iv_hash")?; - reply.push(format!("\n\\*️⃣ `{}` {}\n🔗 `{}`", username, + reply.push(format!("\n\\#️⃣ {} \\*️⃣ `{}` {}\n🔗 `{}`", source_id, username, match enabled { true => "🔄 enabled", false => "⛔ disabled", }, url)); if let Some(hash) = iv_hash { @@ -287,11 +267,15 @@ } }, // add - "/add" => { + "/add" | "/update" => { + let mut source_id: i32 = 0; + if cmd == "/update" { + source_id = words.next().unwrap().parse::()?; + } let (channel, url, iv_hash) = (words.next().unwrap(), words.next().unwrap(), words.next()); let ok_link = re_link.is_match(&url); let ok_hash = match iv_hash { Some(hash) => re_iv_hash.is_match(&hash), None => true, @@ -317,39 +301,40 @@ reply.push("Sorry, unknown error\\.".to_string()); core.debug(&format!("Sorry, unknown error: {:#?}\n", err))?; None }, }; - match chan { - Some(chan) => { - match sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner) values ($1, $2, $3, $4) on conflict (channel_id, owner) do update set url = excluded.url, iv_hash = excluded.iv_hash;") - .bind(chan) - .bind(url) - .bind(iv_hash) - .bind(i64::from(message.from.id)) - .execute(&core.pool).await { - Ok(_) => reply.push("Channel added\\.".to_string()), - Err(sqlx::Error::Database(err)) => { - match 
err.downcast::().routine() { - Some("_bt_check_unique", ) => { - reply.push("Duplicate key\\.".to_string()); - }, - Some(_) => { - reply.push("Database error\\.".to_string()); - }, - None => { - reply.push("No database error extracted\\.".to_string()); - }, - }; - }, - Err(err) => { - reply.push("Sorry, unknown error\\.".to_string()); - core.debug(&format!("Sorry, unknown error: {:#?}\n", err))?; - }, - }; - }, - None => {}, + if let Some(chan) = chan { + match if cmd == "/update" { + sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5 where source_id = $1").bind(source_id) + } else { + sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner) values ($1, $2, $3, $4)") + } + .bind(chan) + .bind(url) + .bind(iv_hash) + .bind(i64::from(message.from.id)) + .execute(&core.pool).await { + Ok(_) => reply.push("Channel added\\.".to_string()), + Err(sqlx::Error::Database(err)) => { + match err.downcast::().routine() { + Some("_bt_check_unique", ) => { + reply.push("Duplicate key\\.".to_string()); + }, + Some(_) => { + reply.push("Database error\\.".to_string()); + }, + None => { + reply.push("No database error extracted\\.".to_string()); + }, + }; + }, + Err(err) => { + reply.push("Sorry, unknown error\\.".to_string()); + core.debug(&format!("Sorry, unknown error: {:#?}\n", err))?; + }, + }; }; }; }, // addchan @@ -412,16 +397,11 @@ }, // check "/check" => { - let channel = words.next().unwrap(); - if ! re_username.is_match(&channel) { - reply.push("Usernames should be something like \"@\\[a-z]\\[a-z0-9_]+\", aren't they?".to_string()); - } else { - &core.check(channel, None).await?; - } + &core.check(&words.next().unwrap().parse::()?, None).await?; }, // clear "/clean" => { @@ -434,41 +414,31 @@ }, // enable "/enable" => { - let channel = words.next().unwrap(); - if ! 
re_username.is_match(&channel) { - reply.push("Usernames should be something like \"@\\[a-z]\\[a-z0-9_]+\", aren't they?".to_string()); - } else { - match core.enable(message.from.id, channel).await { - Ok(_) => { - reply.push("Channel enabled\\.".to_string()); - } - Err(err) => { - core.debug(&err.to_string())?; - }, - } - } + match core.enable(&words.next().unwrap().parse::()?).await { + Ok(_) => { + reply.push("Channel enabled\\.".to_string()); + } + Err(err) => { + core.debug(&err.to_string())?; + }, + }; }, // disable "/disable" => { - let channel = words.next().unwrap(); - if ! re_username.is_match(&channel) { - reply.push("Usernames should be something like \"@\\[a-z]\\[a-z0-9_]+\", aren't they?".to_string()); - } else { - match core.disable(message.from.id, channel).await { - Ok(_) => { - reply.push("Channel disabled\\.".to_string()); - } - Err(err) => { - core.debug(&err.to_string())?; - }, - } - } + match core.disable(&words.next().unwrap().parse::()?).await { + Ok(_) => { + reply.push("Channel disabled\\.".to_string()); + } + Err(err) => { + core.debug(&err.to_string())?; + }, + }; }, _ => { }, };