Index: Cargo.lock ================================================================== --- Cargo.lock +++ Cargo.lock @@ -101,13 +101,13 @@ "pin-project-lite", ] [[package]] name = "async-compression" -version = "0.4.22" +version = "0.4.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "59a194f9d963d8099596278594b3107448656ba73831c9d8c783e613ce86da64" +checksum = "b37fc50485c4f3f736a4fb14199f6d5f5ba008d7f28fe710306c92780f004c07" dependencies = [ "brotli", "flate2", "futures-core", "memchr", @@ -346,23 +346,23 @@ "piper", ] [[package]] name = "bon" -version = "3.6.1" +version = "3.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94054366e2ff97b455acdd4fdb03913f717febc57b7bbd1741b2c3b87efae030" +checksum = "ced38439e7a86a4761f7f7d5ded5ff009135939ecb464a24452eaa4c1696af7d" dependencies = [ "bon-macros", "rustversion", ] [[package]] name = "bon-macros" -version = "3.6.1" +version = "3.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "542a990e676ce0a0a895ae54b2d94afd012434f2228a85b186c6bc1a7056cdc6" +checksum = "0ce61d2d3844c6b8d31b2353d9f66cf5e632b3e9549583fe3cac2f4f6136725e" dependencies = [ "darling", "ident_case", "prettyplease", "proc-macro2", @@ -371,24 +371,24 @@ "syn 2.0.100", ] [[package]] name = "brotli" -version = "7.0.0" +version = "8.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc97b8f16f944bba54f0433f07e30be199b6dc2bd25937444bbad560bcea29bd" +checksum = "cf19e729cdbd51af9a397fb9ef8ac8378007b797f8273cfbfdf45dcaa316167b" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", "brotli-decompressor", ] [[package]] name = "brotli-decompressor" -version = "4.0.3" +version = "5.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a334ef7c9e23abf0ce748e8cd309037da93e606ad52eb372e4ce327a0dcfbdfd" +checksum = "874bb8112abecc98cbd6d81ea4fa7e94fb9449648c93cc89aa40c81c24d7de03" dependencies = [ "alloc-no-stdlib", "alloc-stdlib", ] @@ -962,13 +962,13 @@ "version_check", ] [[package]] name = "getrandom" -version = "0.2.15" +version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" dependencies = [ "cfg-if", "js-sys", "libc", "wasi 0.11.0+wasi-snapshot-preview1", @@ -1456,13 +1456,13 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d750af042f7ef4f724306de029d18836c26c1765a54a6a3f094cbd23a7267ffa" [[package]] name = "libm" -version = "0.2.11" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8355be11b20d696c8f18f6cc018c4e372165b1fa8126cef092399c9951984ffa" +checksum = "c9627da5196e5d8ed0b0495e61e518847578da83483c37288316d9b2e03a7f72" [[package]] name = "libsqlite3-sys" version = "0.30.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1907,13 +1907,13 @@ "web-time", ] [[package]] name = "quinn-proto" -version = "0.11.10" +version = "0.11.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b820744eb4dc9b57a3398183639c511b5a26d2ed702cedd3febaa1393caa22cc" +checksum = "bcbafbbdbb0f638fe3f35f3c56739f77a8a1d070cb25603226c83339b391472b" dependencies = [ "bytes", "getrandom 0.3.2", "rand 0.9.1", "ring", @@ -2001,11 +2001,11 @@ name = "rand_core" version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom 0.2.15", + "getrandom 0.2.16", ] [[package]] name = "rand_core" version = "0.9.3" @@ -2113,11 +2113,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a4689e6c2294d81e88dc6261c768b63bc4fcdb852be6d1352498b114f61383b7" dependencies = [ "cc", "cfg-if", - "getrandom 0.2.15", + "getrandom 0.2.16", "libc", "untrusted", "windows-sys 0.52.0", ] @@ -2153,11 +2153,11 @@ "quick-xml", ] [[package]] name = "rsstg" -version = "0.3.0" +version = "0.3.1" dependencies = [ "anyhow", "async-std", "atom_syndication", "chrono", @@ -2911,13 +2911,13 @@ "tokio", ] [[package]] name = "tokio-util" -version = "0.7.14" +version = "0.7.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b9590b93e6fcc1739458317cccd391ad3955e2bde8913edf6f95f9e65a8f034" +checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" dependencies = [ "bytes", "futures-core", "futures-sink", "pin-project-lite", Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "rsstg" -version = "0.3.0" +version = "0.3.1" authors = ["arcade"] edition = "2021" [dependencies] anyhow = "1.0.86" Index: rsstg.sql ================================================================== --- rsstg.sql +++ rsstg.sql @@ -8,11 +8,12 @@ channel_id integer not null, url text not null, last_scrape not null timestamptz default now(), enabled boolean not null default true, iv_hash text, - owner bigint not null); + owner bigint not null, + url_re text); create unique index rsstg_source__source_id on rsstg_source(source_id); create unique index rsstg_source__channel_id__owner on rsstg_source(channel_id, owner); create index rsstg_source__owner on rsstg_source(owner); create table rsstg_post ( Index: src/command.rs ================================================================== --- src/command.rs +++ src/command.rs @@ -33,26 +33,28 @@ core.send("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.", Some(chat_id), Some(ParseMode::MarkdownV2)).await?; Ok(()) } -pub async fn list(core: &Core, sender: i64) -> Result<()> { - core.send(core.list(sender).await?, Some(sender), Some(ParseMode::MarkdownV2)).await?; +pub async fn list(core: &mut Core, sender: i64) -> Result<()> { + let msg = core.list(sender).await?; + core.send(msg, Some(sender), Some(ParseMode::MarkdownV2)).await?; Ok(()) } -pub async fn command(core: &Core, sender: i64, command: Vec<&str>) -> Result<()> { +pub async fn command(core: &mut Core, sender: i64, command: Vec<&str>) -> Result<()> { + let mut conn = core.db.begin().await?; if command.len() >= 2 { let msg: Cow = match &command[1].parse::() { Err(err) => format!("I need a number.\n{}", &err).into(), Ok(number) => match command[0] { "/check" => core.check(number, sender, false).await - .context("Channel check failed.")?, - "/clean" => core.clean(number, sender).await?, - "/enable" => core.enable(number, sender).await?.into(), - "/delete" => core.delete(number, sender).await?, - "/disable" => core.disable(number, sender).await?.into(), + .context("Channel check failed.")?.into(), + "/clean" => conn.clean(*number, sender).await?, + "/enable" => conn.enable(*number, sender).await?.into(), + "/delete" => conn.delete(*number, sender).await?, + "/disable" => conn.disable(*number, sender).await?.into(), _ => bail!("Command {} not handled.", &command[0]), }, }; core.send(msg, Some(sender), None).await?; } else { @@ -59,11 +61,11 @@ core.send("This command needs a number.", Some(sender), None).await?; } Ok(()) } -pub async fn update(core: &Core, sender: i64, command: Vec<&str>) -> Result<()> { +pub async fn update(core: &mut Core, sender: i64, command: Vec<&str>) -> Result<()> { let mut source_id: Option = None; let at_least = "Requires at least 3 parameters."; let mut i_command = command.iter(); let first_word = i_command.next().context(at_least)?; match *first_word { @@ -133,8 +135,9 @@ user = true; }; }; if ! me { bail!("I need to be admin on that channel."); }; if ! user { bail!("You should be admin on that channel."); }; - core.send(core.update(source_id, channel, channel_id, url, iv_hash, url_re, sender).await?, Some(sender), None).await?; + let mut conn = core.db.begin().await?; + core.send(conn.update(source_id, channel, channel_id, url, iv_hash, url_re, sender).await?, Some(sender), None).await?; Ok(()) } Index: src/core.rs ================================================================== --- src/core.rs +++ src/core.rs @@ -1,6 +1,9 @@ -use crate::command; +use crate::{ + command, + sql::Db, +}; use std::{ borrow::Cow, collections::{ BTreeMap, @@ -12,11 +15,10 @@ Mutex }, }; use anyhow::{ - anyhow, bail, Result, }; use async_std::task; use chrono::DateTime; @@ -33,11 +35,10 @@ }, updates::UpdateContent, AsyncTelegramApi, ParseMode, }; -use sqlx::postgres::PgPoolOptions; use thiserror::Error; #[derive(Error, Debug)] pub enum RssError { // #[error(transparent)] @@ -49,17 +50,17 @@ #[derive(Clone)] pub struct Core { owner_chat: i64, pub tg: Bot, pub me: User, - pool: sqlx::Pool, + pub db: Db, sources: Arc>>>, http_client: reqwest::Client, } impl Core { - pub async fn new(settings: config::Config) -> Result> { + pub async fn new(settings: config::Config) -> Result { let owner_chat = settings.get_int("owner")?; let api_key = settings.get_string("api_key")?; let tg = Bot::new(&api_key); let mut client = reqwest::Client::builder(); @@ -68,23 +69,19 @@ client = client.proxy(proxy); } let http_client = client.build()?; let me = tg.get_me().await?; let me = me.result; - let core = Arc::new(Core { + let core = Core { tg, me, owner_chat, - pool: PgPoolOptions::new() - .max_connections(5) - .acquire_timeout(std::time::Duration::new(300, 0)) - .idle_timeout(std::time::Duration::new(60, 0)) - .connect_lazy(&settings.get_string("pg")?)?, + db: Db::new(&settings.get_string("pg")?)?, sources: Arc::new(Mutex::new(HashSet::new())), http_client, - }); - let clone = core.clone(); + }; + let mut clone = core.clone(); task::spawn(async move { loop { let delay = match &clone.autofetch().await { Err(err) => { if let Err(err) = clone.send(format!("šŸ›‘ {err:?}"), None, None).await { @@ -98,11 +95,11 @@ } }); Ok(core) } - pub async fn stream(&self) -> Result<()> { + pub async fn stream(&mut self) -> Result<()> { let mut offset: i64 = 0; let mut params = GetUpdatesParams { offset: None, limit: Some(100), timeout: Some(300), @@ -163,13 +160,13 @@ .build(); self.tg.send_message(&send).await?; Ok(()) } - pub async fn check (&self, id: &i32, owner: i64, real: bool) -> Result> { + pub async fn check (&mut self, id: &i32, owner: i64, real: bool) -> Result { let mut posted: i32 = 0; - let mut conn = self.pool.acquire().await?; + let mut conn = self.db.begin().await?; let id = { let mut set = self.sources.lock().unwrap(); match set.get(id) { Some(id) => id.clone(), @@ -180,12 +177,11 @@ }, } }; let count = Arc::strong_count(&id); if count == 2 { - let source = sqlx::query!("select source_id, channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2", - *id, owner).fetch_one(&mut *conn).await?; + let source = conn.get_source(*id, owner).await?; let destination = match real { true => source.channel_id, false => source.owner, }; let mut this_fetch: Option> = None; @@ -229,113 +225,39 @@ for (date, url) in posts.iter() { let post_url: Cow = match source.url_re { Some(ref x) => sedregex::ReplaceCommand::new(x)?.execute(url), None => url.into(), }; - if let Some(exists) = sqlx::query!("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;", - &post_url, *id).fetch_one(&mut *conn).await?.exists { + if let Some(exists) = conn.exists(&post_url, *id).await? { if ! exists { if this_fetch.is_none() || *date > this_fetch.unwrap() { this_fetch = Some(*date); }; self.send( match &source.iv_hash { Some(hash) => format!(" {post_url}"), None => format!("{post_url}"), }, Some(destination), Some(ParseMode::Html)).await?; - sqlx::query!("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);", - *id, date, &post_url).execute(&mut *conn).await?; + conn.add_post(*id, date, &post_url).await?; }; }; posted += 1; }; posts.clear(); }; - sqlx::query!("update rsstg_source set last_scrape = now() where source_id = $1;", - *id).execute(&mut *conn).await?; - Ok(format!("Posted: {posted}").into()) - } - - pub async fn delete (&self, source_id: &i32, owner: i64) -> Result> { - match sqlx::query!("delete from rsstg_source where source_id = $1 and owner = $2;", - source_id, owner).execute(&mut *self.pool.acquire().await?).await?.rows_affected() { - 0 => { Ok("No data found found.".into()) }, - x => { Ok(format!("{} sources removed.", x).into()) }, - } - } - - pub async fn clean (&self, source_id: &i32, owner: i64) -> Result> { - match sqlx::query!("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;", - source_id, owner).execute(&mut *self.pool.acquire().await?).await?.rows_affected() { - 0 => { Ok("No data found found.".into()) }, - x => { Ok(format!("{x} posts purged.").into()) }, - } - } - - pub async fn enable (&self, source_id: &i32, owner: i64) -> Result<&str> { - match sqlx::query!("update rsstg_source set enabled = true where source_id = $1 and owner = $2", - source_id, owner).execute(&mut *self.pool.acquire().await?).await?.rows_affected() { - 1 => { Ok("Source enabled.") }, - 0 => { Ok("Source not found.") }, - _ => { Err(anyhow!("Database error.")) }, - } - } - - pub async fn disable (&self, source_id: &i32, owner: i64) -> Result<&str> { - match sqlx::query!("update rsstg_source set enabled = false where source_id = $1 and owner = $2", - source_id, owner).execute(&mut *self.pool.acquire().await?).await?.rows_affected() { - 1 => { Ok("Source disabled.") }, - 0 => { Ok("Source not found.") }, - _ => { Err(anyhow!("Database error.")) }, - } - } - - pub async fn update (&self, update: Option, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: i64) -> Result<&str> { - let mut conn = self.pool.acquire().await?; - - match match update { - Some(id) => { - sqlx::query!("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1", - id, channel_id, url, iv_hash, owner, channel, url_re).execute(&mut *conn).await - }, - None => { - sqlx::query!("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)", - channel_id, url, iv_hash, owner, channel, url_re).execute(&mut *conn).await - }, - } { - Ok(_) => Ok(match update { - Some(_) => "Channel updated.", - None => "Channel added.", - }), - Err(sqlx::Error::Database(err)) => { - match err.downcast::().routine() { - Some("_bt_check_unique", ) => { - Ok("Duplicate key.") - }, - Some(_) => { - Ok("Database error.") - }, - None => { - Ok("No database error extracted.") - }, - } - }, - Err(err) => { - bail!("Sorry, unknown error:\n{err:#?}\n"); - }, - } - } - - async fn autofetch(&self) -> Result { - let mut delay = chrono::Duration::minutes(1); - let now = chrono::Local::now(); - let mut queue = sqlx::query!(r#"select source_id, next_fetch as "next_fetch: DateTime", owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';"#) - .fetch_all(&mut *self.pool.acquire().await?).await?; - for row in queue.iter() { + conn.set_scrape(*id).await?; + Ok(format!("Posted: {posted}")) + } + + async fn autofetch(&mut self) -> Result { + let mut delay = chrono::Duration::minutes(1); + let now = chrono::Local::now(); + let mut conn = self.db.begin().await?; + for row in conn.get_queue().await? { if let Some(next_fetch) = row.next_fetch { if next_fetch < now { if let (Some(owner), Some(source_id)) = (row.owner, row.source_id) { - let clone = Core { + let mut clone = Core { owner_chat: owner, ..self.clone() }; task::spawn(async move { if let Err(err) = clone.check(&source_id, owner, true).await { @@ -349,20 +271,18 @@ } else if next_fetch - now < delay { delay = next_fetch - now; } } }; - queue.clear(); Ok(delay.to_std()?) } - pub async fn list (&self, owner: i64) -> Result { + pub async fn list (&mut self, owner: i64) -> Result { let mut reply: Vec> = vec![]; reply.push("Channels:".into()); - let rows = sqlx::query!("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id", - owner).fetch_all(&mut *self.pool.acquire().await?).await?; - for row in rows.iter() { + let mut conn = self.db.begin().await?; + for row in conn.get_list(owner).await? { reply.push(format!("\n\\#ļøāƒ£ {} \\*ļøāƒ£ `{}` {}\nšŸ”— `{}`", row.source_id, row.channel, match row.enabled { true => "šŸ”„ enabled", false => "ā›” disabled", }, row.url).into()); Index: src/main.rs ================================================================== --- src/main.rs +++ src/main.rs @@ -3,20 +3,21 @@ #![warn(missing_docs)] mod command; mod core; +mod sql; use anyhow::Result; #[async_std::main] async fn main() -> Result<()> { let settings = config::Config::builder() .add_source(config::File::with_name("rsstg")) .build()?; - let core = core::Core::new(settings).await?; + let mut core = core::Core::new(settings).await?; core.stream().await?; Ok(()) } ADDED src/sql.rs Index: src/sql.rs ================================================================== --- /dev/null +++ src/sql.rs @@ -0,0 +1,207 @@ +use std::borrow::Cow; + +use anyhow::{ + Result, + bail, +}; +use chrono::{ + DateTime, + FixedOffset, + Local, +}; +use sqlx::{ + Pool, + Postgres, + Row, + postgres::PgPoolOptions, + pool::PoolConnection, +}; + +#[derive(sqlx::FromRow, Debug)] +pub struct List { + pub source_id: i64, + pub channel: String, + pub enabled: bool, + pub url: String, + pub iv_hash: Option, + pub url_re: Option, +} + +#[derive(sqlx::FromRow, Debug)] +pub struct Source { + pub channel_id: i64, + pub url: String, + pub iv_hash: Option, + pub owner: i64, + pub url_re: Option, +} + +#[derive(sqlx::FromRow)] +pub struct Queue { + pub source_id: Option, + pub next_fetch: Option>, + pub owner: Option, +} + +#[derive(Clone)] +pub struct Db { + pool: sqlx::Pool, +} + +pub struct Conn{ + conn: PoolConnection, +} + +impl Db { + pub fn new (pguri: &str) -> Result { + Ok(Db{ + pool: PgPoolOptions::new() + .max_connections(5) + .acquire_timeout(std::time::Duration::new(300, 0)) + .idle_timeout(std::time::Duration::new(60, 0)) + .connect_lazy(pguri)?, + }) + } + + pub async fn begin(&mut self) -> Result { + Conn::new(&mut self.pool).await + } +} + +impl Conn { + pub async fn new (pool: &mut Pool) -> Result { + let conn = pool.acquire().await?; + Ok(Conn{ + conn, + }) + } + + pub async fn add_post (&mut self, id: i32, date: &DateTime, post_url: &str) -> Result<()> { + sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);") + .bind(id) + .bind(date) + .bind(post_url) + .execute(&mut *self.conn).await?; + Ok(()) + } + + pub async fn clean (&mut self, source_id: i32, owner: i64) -> Result> { + match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;") + .bind(source_id) + .bind(owner) + .execute(&mut *self.conn).await?.rows_affected() { + 0 => { Ok("No data found found.".into()) }, + x => { Ok(format!("{x} posts purged.").into()) }, + } + } + + pub async fn delete (&mut self, source_id: i32, owner: i64) -> Result> { + match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;") + .bind(source_id) + .bind(owner) + .execute(&mut *self.conn).await?.rows_affected() { + 0 => { Ok("No data found found.".into()) }, + x => { Ok(format!("{} sources removed.", x).into()) }, + } + } + + pub async fn disable (&mut self, source_id: i32, owner: i64) -> Result<&str> { + match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2") + .bind(source_id) + .bind(owner) + .execute(&mut *self.conn).await?.rows_affected() { + 1 => { Ok("Source disabled.") }, + 0 => { Ok("Source not found.") }, + _ => { bail!("Database error.") }, + } + } + + pub async fn enable (&mut self, source_id: i32, owner: i64) -> Result<&str> { + match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2") + .bind(source_id) + .bind(owner) + .execute(&mut *self.conn).await?.rows_affected() { + 1 => { Ok("Source enabled.") }, + 0 => { Ok("Source not found.") }, + _ => { bail!("Database error.") }, + } + } + + pub async fn exists (&mut self, post_url: &str, id: i32) -> Result> { + let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") + .bind(post_url) + .bind(id) + .fetch_one(&mut *self.conn).await?; + let exists: Option = row.try_get("exists")?; + Ok(exists) + } + + pub async fn get_queue (&mut self) -> Result> { + let block: Vec = sqlx::query_as("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';") + .fetch_all(&mut *self.conn).await?; + Ok(block) + } + + pub async fn get_list (&mut self, owner: i64) -> Result> { + let source: Vec = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id") + .bind(owner) + .fetch_all(&mut *self.conn).await?; + Ok(source) + } + + pub async fn get_source (&mut self, id: i32, owner: i64) -> Result { + let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2") + .bind(id) + .bind(owner) + .fetch_one(&mut *self.conn).await?; + Ok(source) + } + + pub async fn set_scrape (&mut self, id: i32) -> Result<()> { + sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;") + .bind(id) + .execute(&mut *self.conn).await?; + Ok(()) + } + + pub async fn update (&mut self, update: Option, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: i64) -> Result<&str> { + match match update { + Some(id) => { + sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1") + .bind(id) + }, + None => { + sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)") + }, + } + .bind(channel_id) + .bind(url) + .bind(iv_hash) + .bind(owner) + .bind(channel) + .bind(url_re) + .execute(&mut *self.conn).await + { + Ok(_) => Ok(match update { + Some(_) => "Channel updated.", + None => "Channel added.", + }), + Err(sqlx::Error::Database(err)) => { + match err.downcast::().routine() { + Some("_bt_check_unique", ) => { + Ok("Duplicate key.") + }, + Some(_) => { + Ok("Database error.") + }, + None => { + Ok("No database error extracted.") + }, + } + }, + Err(err) => { + bail!("Sorry, unknown error:\n{err:#?}\n"); + }, + } + } +}