Overview
Comment: | 0.1.9: add ownership checks, rewrite error handling |
---|---|
Downloads: | Tarball | ZIP archive | SQL archive |
Timelines: | family | ancestors | descendants | both | trunk |
Files: | files | file ages | folders |
SHA3-256: |
ebe7c281a5bb2e2187157f210433ca2e |
User & Date: | arcade on 2020-11-27 19:29:11.467 |
Other Links: | manifest | tags |
Context
2020-11-29
| ||
19:48 | added lock, detailed errors check-in: 39ee25f5c3 user: arcade tags: trunk | |
2020-11-27
| ||
19:29 | 0.1.9: add ownership checks, rewrite error handling check-in: ebe7c281a5 user: arcade tags: trunk | |
07:18 | 0.1.8: rewrite error handling check-in: 423cadd9c7 user: arcade tags: trunk | |
Changes
Modified Cargo.toml
from [36bab9d234]
to [d15c4051b4].
1 2 | [package] name = "rsstg" | | | 1 2 3 4 5 6 7 8 9 10 | [package] name = "rsstg" version = "0.1.9" authors = ["arcade"] edition = "2018" [dependencies] chrono = "*" config = "*" futures = "*" |
︙ | ︙ |
Modified rsstg.sql
from [06336e3838]
to [7684f91d94].
︙ | ︙ | |||
28 29 30 31 32 33 34 | FOREIGN KEY (source_id) REFERENCES rsstg_source(source_id) ); create unique index rsstg_post__url on rsstg_post(url); create index rsstg_post__hour on rsstg_post(hour); create index rsstg_post__posted_hour on rsstg_post(posted,hour); create or replace view rsstg_order as | | | 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 | FOREIGN KEY (source_id) REFERENCES rsstg_source(source_id) ); create unique index rsstg_post__url on rsstg_post(url); create index rsstg_post__hour on rsstg_post(hour); create index rsstg_post__posted_hour on rsstg_post(posted,hour); create or replace view rsstg_order as select source_id, coalesce(last_scrape + make_interval(0,0,0,0,0,(60 / (coalesce(activity, 1)/7 + 1) )::integer), now() - interval '1 minute') as next_fetch, owner from rsstg_source natural left join (select source_id, count(*) as activity from rsstg_post where hour = extract('hour' from now())::smallint and posted > now() - interval '7 days' group by source_id) as act where enabled |
︙ | ︙ |
Modified src/main.rs
from [1d0c95223a]
to [7b25067601].
︙ | ︙ | |||
65 66 67 68 69 70 71 | } fn debug(&self, msg: &str) -> Result<()> { self.tg.spawn(SendMessage::new(self.owner_chat, msg)); Ok(()) } | | > > | > | 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 | } fn debug(&self, msg: &str) -> Result<()> { self.tg.spawn(SendMessage::new(self.owner_chat, msg)); Ok(()) } async fn check<S>(&self, id: &i32, owner: S, real: bool) -> Result<()> where S: Into<i64> { let owner: i64 = owner.into(); let mut conn = self.pool.acquire().await .with_context(|| format!("š Query queue fetch conn:\n{:?}", &self.pool))?; let row = sqlx::query("select source_id, channel_id, url, iv_hash, owner from rsstg_source where source_id = $1 and owner = $2") .bind(id) .bind(owner) .fetch_one(&mut conn).await .with_context(|| format!("š Query source:\n{:?}", &self.pool))?; drop(conn); let channel_id: i64 = row.try_get("channel_id")?; let destination = match real { true => UserId::new(channel_id), false => UserId::new(row.try_get("owner")?), |
︙ | ︙ | |||
130 131 132 133 134 135 136 | sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;") .bind(id) .execute(&mut conn).await .with_context(|| format!("š Update scrape:\n{:?}", &conn))?; Ok(()) } | | > > | > | | > > | | > | > > | | > > | | > | > > > > > > > > > > > > > > > > > > > > > > > > > > | 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 | sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;") .bind(id) .execute(&mut conn).await .with_context(|| format!("š Update scrape:\n{:?}", &conn))?; Ok(()) } async fn clean<S>(&self, source_id: &i32, id: S) -> Result<String> where S: Into<i64> { let id: i64 = id.into(); let mut conn = self.pool.acquire().await .with_context(|| format!("š Clean fetch conn:\n{:?}", &self.pool))?; match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;") .bind(source_id) .bind(id) .execute(&mut conn).await .with_context(|| format!("š Clean seen posts:\n{:?}", &self.pool))? 
.rows_affected() { 0 => { Ok("No data found found\\.".to_string()) }, x => { Ok(format!("{} posts purged\\.", x)) }, } } async fn enable<S>(&self, source_id: &i32, id: S) -> Result<&str> where S: Into<i64> { let id: i64 = id.into(); let mut conn = self.pool.acquire().await .with_context(|| format!("š Enable fetch conn:\n{:?}", &self.pool))?; match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2") .bind(source_id) .bind(id) .execute(&mut conn).await .with_context(|| format!("š Enable source:\n\n{:?}", &self.pool))? .rows_affected() { 1 => { Ok("Source disabled\\.") }, 0 => { Ok("Source not found\\.") }, _ => { Err(anyhow!("Database error.")) }, } } async fn disable<S>(&self, source_id: &i32, id: S) -> Result<&str> where S: Into<i64> { let id: i64 = id.into(); let mut conn = self.pool.acquire().await .with_context(|| format!("š Disable fetch conn:\n{:?}", &self.pool))?; match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2") .bind(source_id) .bind(id) .execute(&mut conn).await .with_context(|| format!("š Disable source:\n\n{:?}", &self.pool))? 
.rows_affected() { 1 => { Ok("Source disabled\\.") }, 0 => { Ok("Source not found\\.") }, _ => { Err(anyhow!("Database error.")) }, } } async fn autofetch(&self) -> Result<()> { let mut delay = chrono::Duration::minutes(5); let mut now; loop { let mut conn = self.pool.acquire().await .with_context(|| format!("š Autofetch fetch conn:\n{:?}", &self.pool))?; now = chrono::Local::now(); let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source natural left join rsstg_channel where next_fetch < now();") .fetch_all(&mut conn).await?; for row in queue.iter() { let source_id: i32 = row.try_get("source_id")?; let owner: i64 = row.try_get("owner")?; let next_fetch: DateTime<chrono::Local> = row.try_get("next_fetch")?; if next_fetch < now { sqlx::query("update rsstg_source set last_scrape = now() + interval '1 hour' where source_id = $1;") .bind(source_id) .execute(&mut conn).await .with_context(|| format!("š Lock source:\n\n{:?}", &self.pool))?; let clone = self.clone(); tokio::spawn(async move { if let Err(err) = clone.check(&source_id, owner, true).await { if let Err(err) = clone.debug(&err.to_string()) { eprintln!("Check error: {}", err); }; }; }); } else { if next_fetch - now < delay { delay = next_fetch - now; } } }; queue.clear(); tokio::time::delay_for(delay.to_std()?).await; delay = chrono::Duration::minutes(5); } } async fn list(&self, id: telegram_bot::UserId) -> Result<Vec<String>> { let id = i64::from(id); let mut reply = vec![]; let mut conn = self.pool.acquire().await .with_context(|| format!("š List fetch conn:\n{:?}", &self.pool))?; reply.push("Channels:".to_string()); let rows = sqlx::query("select source_id, username, enabled, url, iv_hash from rsstg_source left join rsstg_channel using (channel_id) where owner = $1 order by source_id") .bind(id) .fetch_all(&mut conn).await?; for row in rows.iter() { let source_id: i32 = row.try_get("source_id")?; let username: &str = row.try_get("username")?; let 
enabled: bool = row.try_get("enabled")?; let url: &str = row.try_get("url")?; let iv_hash: Option<&str> = row.try_get("iv_hash")?; reply.push(format!("\n\\#️⃣ {} \\*️⃣ `{}` {}\nš `{}`", source_id, username, match enabled { true => "š enabled", false => "ā disabled", }, url)); if let Some(hash) = iv_hash { reply.push(format!("IV `{}`", hash)); } }; Ok(reply) } } #[tokio::main] async fn main() -> Result<()> { let mut settings = config::Config::default(); settings.merge(config::File::with_name("rsstg"))?; |
︙ | ︙ | |||
252 253 254 255 256 257 258 | "/start" => { reply.push("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.".to_string()); }, // list "/list" => { | < < < < < < < | < < < < < < < < < < < < < < < < < < < | 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 | "/start" => { reply.push("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.".to_string()); }, // list "/list" => { reply.append(&mut core.list(message.from.id).await?); }, // add "/add" | "/update" => { let mut source_id: i32 = 0; if cmd == "/update" { |
︙ | ︙ | |||
422 423 424 425 426 427 428 | "/check" => { match &words.next().unwrap().parse::<i32>() { Err(err) => { reply.push(format!("I need a number\\.\n{}", &err)); }, Ok(number) => { | | < < < < | < < < < < | > > > > | > | > | 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 | "/check" => { match &words.next().unwrap().parse::<i32>() { Err(err) => { reply.push(format!("I need a number\\.\n{}", &err)); }, Ok(number) => { core.check(&number, message.from.id, false).await .context("š Channel check failed.")?; }, }; }, // clean "/clean" => { match &words.next().unwrap().parse::<i32>() { Err(err) => { reply.push(format!("I need a number\\.\n{}", &err)); }, Ok(number) => { let result = core.clean(&number, message.from.id).await?; reply.push(result.to_string()); }, }; }, // enable "/enable" => { match &words.next().unwrap().parse::<i32>() { Err(err) => { |
︙ | ︙ |