Index: Cargo.toml ================================================================== --- Cargo.toml +++ Cargo.toml @@ -1,8 +1,8 @@ [package] name = "rsstg" -version = "0.1.5" +version = "0.1.6" authors = ["arcade"] edition = "2018" [dependencies] config = "*" Index: src/main.rs ================================================================== --- src/main.rs +++ src/main.rs @@ -35,11 +35,16 @@ owner: owner, api_key: api_key.clone(), my: tg.send(telegram_bot::GetMe).await?, tg: tg, owner_chat: UserId::new(owner), - pool: PgPoolOptions::new().max_connections(5).connect(&settings.get_str("pg")?).await?, + pool: PgPoolOptions::new() + .max_connections(5) + .connect_timeout(std::time::Duration::new(300, 0)) + .idle_timeout(std::time::Duration::new(60, 0)) + .connect_lazy(&settings.get_str("pg")?)?, + //.connect(&settings.get_str("pg")?).await?, }; let clone = core.clone(); tokio::spawn(async move { if let Err(err) = clone.autofetch().await { eprintln!("connection error: {}", err); @@ -55,167 +60,217 @@ fn debug(&self, msg: &str) -> Result<()> { self.tg.spawn(SendMessage::new(self.owner_chat, msg)); Ok(()) } - async fn check(&self, id: &i32, real: Option) -> Result<()> { - match sqlx::query("select source_id, channel_id, url, iv_hash, owner from rsstg_source where source_id = $1") - .bind(id) - .fetch_one(&self.pool).await { - Ok(row) => { - let channel_id: i64 = row.try_get("channel_id")?; - let destination = match real { - Some(true) => UserId::new(channel_id), - Some(false) | None => UserId::new(row.try_get("owner")?), - }; - let url: &str = row.try_get("url")?; - let mut this_fetch: Option> = None; - let iv_hash: Option<&str> = row.try_get("iv_hash")?; - let mut posts: BTreeMap, String> = BTreeMap::new(); - match rss::Channel::from_url(url) { - Ok(feed) => { - for item in feed.items() { - let date = match item.pub_date() { - Some(feed_date) => DateTime::parse_from_rfc2822(feed_date), - None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]), - }?; - let url = item.link().unwrap().to_string(); - posts.insert(date.clone(), url.clone()); - }; - for (date, url) in posts.iter() { - match sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") - .bind(&url) - .bind(id) - .fetch_one(&self.pool).await { - Ok(row) => { - let exists: bool = row.try_get("exists")?; - if ! exists { - if this_fetch == None || *date > this_fetch.unwrap() { - this_fetch = Some(*date); - } - match self.tg.send( match iv_hash { - Some(x) => SendMessage::new(destination, format!(" {0}", url, x)), - None => SendMessage::new(destination, format!("{}", url)), - }.parse_mode(types::ParseMode::Html)).await { - Ok(_) => { - match sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);") - .bind(id) - .bind(date) - .bind(url) - .execute(&self.pool).await { - Ok(_) => {}, - Err(err) => { - self.debug(&err.to_string())?; - }, - }; - }, - Err(err) => { - self.debug(&err.to_string())?; - }, - }; - tokio::time::delay_for(std::time::Duration::new(4, 0)).await; - } - }, - Err(err) => { - self.debug(&err.to_string())?; - }, - }; - }; - posts.clear(); - }, - Err(err) => { - self.debug(&format!("Problem opening feed url:\n{}\n{}", &url, &err.to_string()))?; - }, - }; - match sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;") - .bind(id) - .execute(&self.pool).await { - Ok(_) => {}, - Err(err) => { - self.debug(&err.to_string())?; - }, - }; - }, - Err(err) => { - self.debug(&err.to_string())?; + async fn check(&self, id: &i32, real: bool) -> Result<()> { + match self.pool.acquire().await { + Err(err) => { + self.debug(&format!("πŸ›‘ Query queue fetch conn:\n{}\n{:?}", &err, &self.pool))?; + }, + Ok(mut conn) => { + match sqlx::query("select source_id, channel_id, url, iv_hash, owner from rsstg_source where source_id = $1") + .bind(id) + .fetch_one(&mut conn).await { + Err(err) => { + self.debug(&format!("πŸ›‘ Query queue:\n{}\n{:?}", &err, &conn))?; + }, + Ok(row) => { + drop(conn); + let channel_id: i64 = row.try_get("channel_id")?; + let destination = match real { + true => UserId::new(channel_id), + false => UserId::new(row.try_get("owner")?), + }; + let url: &str = row.try_get("url")?; + let mut this_fetch: Option> = None; + let iv_hash: Option<&str> = row.try_get("iv_hash")?; + let mut posts: BTreeMap, String> = BTreeMap::new(); + match rss::Channel::from_url(url) { + Err(err) => { + self.debug(&format!("πŸ›‘ Problem opening feed url:\n{}\n{}", &url, &err))?; + }, + Ok(feed) => { + for item in feed.items() { + let date = match item.pub_date() { + Some(feed_date) => DateTime::parse_from_rfc2822(feed_date), + None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]), + }?; + let url = item.link().unwrap().to_string(); + posts.insert(date.clone(), url.clone()); + }; + for (date, url) in posts.iter() { + match self.pool.acquire().await { + Err(err) => { + self.debug(&format!("πŸ›‘ Check post fetch conn:\n{}\n{:?}", &err, &self.pool))?; + }, + Ok(mut conn) => { + match sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;") + .bind(&url) + .bind(id) + .fetch_one(&mut conn).await { + Err(err) => { + self.debug(&format!("πŸ›‘ Check post:\n{}\n{:?}", &err, &conn))?; + }, + Ok(row) => { + let exists: bool = row.try_get("exists")?; + if ! exists { + if this_fetch == None || *date > this_fetch.unwrap() { + this_fetch = Some(*date); + } + match self.tg.send( match iv_hash { + Some(x) => SendMessage::new(destination, format!(" {0}", url, x)), + None => SendMessage::new(destination, format!("{}", url)), + }.parse_mode(types::ParseMode::Html)).await { + Err(err) => { + self.debug(&format!("πŸ›‘ Can't post message:\n{}", &err))?; + }, + Ok(_) => { + match sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);") + .bind(id) + .bind(date) + .bind(url) + .execute(&mut conn).await { + Ok(_) => {}, + Err(err) => { + self.debug(&format!("πŸ›‘Rrecord post:\n{}\n{:?}", &err, &conn))?; + }, + }; + }, + }; + drop(conn); + tokio::time::delay_for(std::time::Duration::new(4, 0)).await; + } + }, + }; + } + }; + }; + posts.clear(); + }, + }; + match self.pool.acquire().await { + Err(err) => { + self.debug(&format!("πŸ›‘ Update scrape fetch conn:\n{}\n{:?}", &err, &self.pool))?; + }, + Ok(mut conn) => { + match sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;") + .bind(id) + .execute(&mut conn).await { + Err(err) => { + self.debug(&format!("πŸ›‘ Update scrape:\n{}\n{:?}", &err, &conn))?; + }, + Ok(_) => {}, + }; + }, + }; + }, + }; }, }; Ok(()) } async fn clean(&self, source_id: i32) -> Result<()> { - match sqlx::query("delete from rsstg_post where source_id = $1;") - .bind(source_id) - .execute(&self.pool).await { - Ok(_) => {}, + match self.pool.acquire().await { Err(err) => { - self.debug(&err.to_string())?; + self.debug(&format!("πŸ›‘ Clean fetch conn:\n{}\n{:?}", &err, &self.pool))?; + }, + Ok(mut conn) => { + match sqlx::query("delete from rsstg_post where source_id = $1;") + .bind(source_id) + .execute(&mut conn).await { + Err(err) => { + self.debug(&format!("πŸ›‘ Clean seen posts:\n{}\n{:?}", &err, &self.pool))?; + }, + Ok(_) => {}, + }; }, }; Ok(()) } async fn enable(&self, source_id: &i32) -> Result<()> { - match sqlx::query("update rsstg_source set enabled = true where source_id = $1") - .bind(source_id) - .execute(&self.pool).await { - Ok(_) => {}, + match self.pool.acquire().await { Err(err) => { - self.debug(&err.to_string())?; + self.debug(&format!("πŸ›‘ Enable fetch conn:\n{}\n{:?}", &err, &self.pool))?; + }, + Ok(mut conn) => { + match sqlx::query("update rsstg_source set enabled = true where source_id = $1") + .bind(source_id) + .execute(&mut conn).await { + Err(err) => { + self.debug(&format!("πŸ›‘ Enable source:\n{}\n{:?}", &err, &self.pool))?; + }, + Ok(_) => {}, + } }, - } + }; Ok(()) } async fn disable(&self, source_id: &i32) -> Result<()> { - match sqlx::query("update rsstg_source set enabled = false where source_id = $1") - .bind(source_id) - .execute(&self.pool).await { - Ok(_) => {}, + match self.pool.acquire().await { Err(err) => { - self.debug(&err.to_string())?; + self.debug(&format!("πŸ›‘ Disable fetch conn:\n{}\n{:?}", &err, &self.pool))?; + }, + Ok(mut conn) => { + match sqlx::query("update rsstg_source set enabled = false where source_id = $1") + .bind(source_id) + .execute(&mut conn).await { + Err(err) => { + self.debug(&format!("πŸ›‘ Disable source:\n{}\n{:?}", &err, &self.pool))?; + }, + Ok(_) => {}, + } }, - } + }; Ok(()) } async fn autofetch(&self) -> Result<()> { let mut delay = chrono::Duration::minutes(5); - let mut next_fetch: DateTime; let mut now; loop { - now = chrono::Local::now(); - let mut rows = sqlx::query("select source_id, next_fetch from rsstg_order natural left join rsstg_source natural left join rsstg_channel where next_fetch < now();") - .fetch(&self.pool); - while let Some(row) = rows.try_next().await.unwrap() { - let source_id: i32 = row.try_get("source_id")?; - next_fetch = row.try_get("next_fetch")?; - if next_fetch < now { - match sqlx::query("update rsstg_source set last_scrape = now() + interval '1 hour' where source_id = $1;") - .bind(source_id) - .execute(&self.pool).await { - Ok(_) => {}, - Err(err) => { - self.debug(&err.to_string())?; - }, - }; - let clone = self.clone(); - tokio::spawn(async move { - if let Err(err) = clone.check(&source_id.clone(), Some(true)).await { - eprintln!("connection error: {}", err); - } - }); - } else { - if next_fetch - now < delay { - delay = next_fetch - now; - } - } + match self.pool.acquire().await { + Err(err) => { + self.debug(&format!("πŸ›‘ Autofetch fetch conn:\n{}\n{:?}", &err, &self.pool))?; + }, + Ok(mut conn) => { + now = chrono::Local::now(); + let mut queue = sqlx::query("select source_id, next_fetch from rsstg_order natural left join rsstg_source natural left join rsstg_channel where next_fetch < now();") + .fetch_all(&mut conn).await?; + for row in queue.iter() { + let source_id: i32 = row.try_get("source_id")?; + let next_fetch: DateTime = row.try_get("next_fetch")?; + if next_fetch < now { + match sqlx::query("update rsstg_source set last_scrape = now() + interval '1 hour' where source_id = $1;") + .bind(source_id) + .execute(&mut conn).await { + Ok(_) => {}, + Err(err) => { + self.debug(&err.to_string())?; + }, + }; + let clone = self.clone(); + tokio::spawn(async move { + if let Err(err) = clone.check(&source_id.clone(), true).await { + eprintln!("connection error: {}", err); + } + }); + } else { + if next_fetch - now < delay { + delay = next_fetch - now; + } + } + }; + queue.clear(); + }, }; tokio::time::delay_for(delay.to_std()?).await; delay = chrono::Duration::minutes(5); } - //Ok(()) } } #[tokio::main] @@ -231,10 +286,13 @@ let mut stream = core.stream(); while let Some(update) = stream.next().await { match update { + Err(err) => { + core.debug(&err.to_string())?; + }, Ok(update) => { match update.kind { UpdateKind::Message(message) => { let mut reply: Vec = vec![]; match message.kind { @@ -250,29 +308,37 @@ }, // list "/list" => { - reply.push("Channels:".to_string()); - let mut rows = sqlx::query("select source_id, username, enabled, url, iv_hash from rsstg_source left join rsstg_channel using (channel_id) where owner = $1 order by source_id") - .bind(i64::from(message.from.id)) - .fetch(&core.pool); - while let Some(row) = rows.try_next().await? { - let source_id: i32 = row.try_get("source_id")?; - let username: &str = row.try_get("username")?; - let enabled: bool = row.try_get("enabled")?; - let url: &str = row.try_get("url")?; - let iv_hash: Option<&str> = row.try_get("iv_hash")?; - reply.push(format!("\n\\#️⃣ {} \\*️⃣ `{}` {}\nπŸ”— `{}`", source_id, username, - match enabled { - true => "πŸ”„ enabled", - false => "β›” disabled", - }, url)); - if let Some(hash) = iv_hash { - reply.push(format!("IV `{}`", hash)); - } - } + match core.pool.acquire().await { + Err(err) => { + core.debug(&format!("πŸ›‘ Disable fetch conn:\n{}\n{:?}", &err, &core.pool))?; + }, + Ok(mut conn) => { + reply.push("Channels:".to_string()); + let rows = sqlx::query("select source_id, username, enabled, url, iv_hash from rsstg_source left join rsstg_channel using (channel_id) where owner = $1 order by source_id") + .bind(i64::from(message.from.id)) + .fetch_all(&mut conn).await?; + for row in rows.iter() { + //while let Some(row) = rows.try_next().await? { + let source_id: i32 = row.try_get("source_id")?; + let username: &str = row.try_get("username")?; + let enabled: bool = row.try_get("enabled")?; + let url: &str = row.try_get("url")?; + let iv_hash: Option<&str> = row.try_get("iv_hash")?; + reply.push(format!("\n\\#️⃣ {} \\*️⃣ `{}` {}\nπŸ”— `{}`", source_id, username, + match enabled { + true => "πŸ”„ enabled", + false => "β›” disabled", + }, url)); + if let Some(hash) = iv_hash { + reply.push(format!("IV `{}`", hash)); + } + } + }, + }; }, // add "/add" | "/update" => { @@ -303,11 +369,11 @@ reply.push("Sorry, I don't know about that channel. Please, add a channel with /addchan.".to_string()); None }, Err(err) => { reply.push("Sorry, unknown error\\.".to_string()); - core.debug(&format!("Sorry, unknown error: {:#?}\n", err))?; + core.debug(&format!("Sorry, unknown error:\n{:#?}\n", err))?; None }, }; if let Some(chan) = chan { match if cmd == "/update" { @@ -334,11 +400,11 @@ }, }; }, Err(err) => { reply.push("Sorry, unknown error\\.".to_string()); - core.debug(&format!("Sorry, unknown error: {:#?}\n", err))?; + core.debug(&format!("Sorry, unknown error:\n{:#?}\n", err))?; }, }; }; }; }, @@ -355,11 +421,11 @@ .fetch_one(&core.pool).await { Ok(chan) => Some(chan.try_get("channel_id")?), Err(sqlx::Error::RowNotFound) => None, Err(err) => { reply.push("Sorry, unknown error\\.".to_string()); - core.debug(&format!("Sorry, unknown error: {:#?}\n", err))?; + core.debug(&format!("Sorry, unknown error:\n{:#?}\n", err))?; None }, }; match chan { Some(chan) => { @@ -403,11 +469,25 @@ }, // check "/check" => { - &core.check(&words.next().unwrap().parse::()?, None).await?; + match &words.next().unwrap().parse::() { + Err(err) => { + reply.push(format!("I need a number\\.\n{}", &err)); + }, + Ok(number) => { + match &core.check(number, false).await { + Ok(_) => { + reply.push("Channel enabled\\.".to_string()); + } + Err(err) => { + core.debug(&format!("πŸ›‘ Channel check failed:\n{}", &err))?; + }, + }; + }, + }; }, // clean "/clean" => { @@ -420,29 +500,43 @@ }, // enable "/enable" => { - match core.enable(&words.next().unwrap().parse::()?).await { - Ok(_) => { - reply.push("Channel enabled\\.".to_string()); - } + match &words.next().unwrap().parse::() { Err(err) => { - core.debug(&err.to_string())?; + reply.push(format!("I need a number\\.\n{}", &err)); + }, + Ok(number) => { + match core.enable(&number).await { + Ok(_) => { + reply.push("Channel enabled\\.".to_string()); + } + Err(err) => { + core.debug(&err.to_string())?; + }, + }; }, }; }, // disable "/disable" => { - match core.disable(&words.next().unwrap().parse::()?).await { - Ok(_) => { - reply.push("Channel disabled\\.".to_string()); - } + match &words.next().unwrap().parse::() { Err(err) => { - core.debug(&err.to_string())?; + reply.push(format!("I need a number\\.\n{}", &err)); + }, + Ok(number) => { + match core.disable(&number).await { + Ok(_) => { + reply.push("Channel disabled\\.".to_string()); + } + Err(err) => { + core.debug(&err.to_string())?; + }, + }; }, }; }, _ => { @@ -464,13 +558,10 @@ }, _ => {}, }; }, - Err(err) => { - core.debug(&err.to_string())?; - }, }; } Ok(()) }