Lines of
src/main.rs
from check-in f322efafd9
that are changed by the sequence of edits moving toward
check-in 5148f929f3:
f322efafd9 2020-11-30 1: use std::collections::BTreeMap;
2:
3: use config;
4:
5: use tokio;
6:
7: use rss;
8:
9: use chrono::DateTime;
10:
11: use regex::Regex;
12:
13: use telegram_bot::*;
14: use tokio::stream::StreamExt;
15:
16: use sqlx::postgres::PgPoolOptions;
17: use sqlx::Row;
18: use sqlx::Done; // .rows_affected()
19:
20: #[macro_use]
21: extern crate lazy_static;
22:
23: use anyhow::{anyhow, bail, Context, Result};
24:
25: #[derive(Clone)]
26: struct Core {
27: owner: i64,
28: api_key: String,
29: owner_chat: UserId,
30: tg: telegram_bot::Api,
31: my: User,
32: pool: sqlx::Pool<sqlx::Postgres>,
33: }
34:
35: impl Core {
36: async fn new(settings: config::Config) -> Result<Core> {
37: let owner = settings.get_int("owner")?;
38: let api_key = settings.get_str("api_key")?;
39: let tg = Api::new(&api_key);
40: let core = Core {
41: owner: owner,
42: api_key: api_key.clone(),
43: my: tg.send(telegram_bot::GetMe).await?,
44: tg: tg,
45: owner_chat: UserId::new(owner),
46: pool: PgPoolOptions::new()
47: .max_connections(5)
48: .connect_timeout(std::time::Duration::new(300, 0))
49: .idle_timeout(std::time::Duration::new(60, 0))
50: .connect_lazy(&settings.get_str("pg")?)?,
51: };
52: let clone = core.clone();
53: tokio::spawn(async move {
54: if let Err(err) = &clone.autofetch().await {
55: if let Err(err) = clone.debug(&format!("š {:?}", err)) {
56: eprintln!("Autofetch error: {}", err);
57: };
58: }
59: });
60: Ok(core)
61: }
62:
63: fn stream(&self) -> telegram_bot::UpdatesStream {
64: self.tg.stream()
65: }
66:
67: fn debug(&self, msg: &str) -> Result<()> {
68: self.tg.spawn(SendMessage::new(self.owner_chat, msg));
69: Ok(())
70: }
71:
f322efafd9 2020-11-30 72: async fn check<S>(&self, id: &i32, owner: S, real: bool) -> Result<()>
73: where S: Into<i64> {
74: let owner: i64 = owner.into();
f322efafd9 2020-11-30 75: let mut conn = self.pool.acquire().await
f322efafd9 2020-11-30 76: .with_context(|| format!("Query queue fetch conn:\n{:?}", &self.pool))?;
f322efafd9 2020-11-30 77: let row = sqlx::query("select source_id, channel_id, url, iv_hash, owner from rsstg_source where source_id = $1 and owner = $2")
f322efafd9 2020-11-30 78: .bind(id)
f322efafd9 2020-11-30 79: .bind(owner)
f322efafd9 2020-11-30 80: .fetch_one(&mut conn).await
f322efafd9 2020-11-30 81: .with_context(|| format!("Query source:\n{:?}", &self.pool))?;
f322efafd9 2020-11-30 82: drop(conn);
f322efafd9 2020-11-30 83: let channel_id: i64 = row.try_get("channel_id")?;
f322efafd9 2020-11-30 84: let destination = match real {
f322efafd9 2020-11-30 85: true => UserId::new(channel_id),
f322efafd9 2020-11-30 86: false => UserId::new(row.try_get("owner")?),
f322efafd9 2020-11-30 87: };
f322efafd9 2020-11-30 88: let url: &str = row.try_get("url")?;
f322efafd9 2020-11-30 89: let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
f322efafd9 2020-11-30 90: let iv_hash: Option<&str> = row.try_get("iv_hash")?;
f322efafd9 2020-11-30 91: let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, String> = BTreeMap::new();
f322efafd9 2020-11-30 92: let feed = rss::Channel::from_url(url)
f322efafd9 2020-11-30 93: .with_context(|| format!("Problem opening feed url:\n{}", &url))?;
f322efafd9 2020-11-30 94: for item in feed.items() {
f322efafd9 2020-11-30 95: let date = match item.pub_date() {
f322efafd9 2020-11-30 96: Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
f322efafd9 2020-11-30 97: None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
f322efafd9 2020-11-30 98: }?;
f322efafd9 2020-11-30 99: let url = item.link().unwrap().to_string();
f322efafd9 2020-11-30 100: posts.insert(date.clone(), url.clone());
f322efafd9 2020-11-30 101: };
f322efafd9 2020-11-30 102: for (date, url) in posts.iter() {
f322efafd9 2020-11-30 103: let mut conn = self.pool.acquire().await
f322efafd9 2020-11-30 104: .with_context(|| format!("Check post fetch conn:\n{:?}", &self.pool))?;
f322efafd9 2020-11-30 105: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
f322efafd9 2020-11-30 106: .bind(&url)
f322efafd9 2020-11-30 107: .bind(id)
f322efafd9 2020-11-30 108: .fetch_one(&mut conn).await
f322efafd9 2020-11-30 109: .with_context(|| format!("Check post:\n{:?}", &conn))?;
f322efafd9 2020-11-30 110: let exists: bool = row.try_get("exists")?;
f322efafd9 2020-11-30 111: if ! exists {
f322efafd9 2020-11-30 112: if this_fetch == None || *date > this_fetch.unwrap() {
f322efafd9 2020-11-30 113: this_fetch = Some(*date);
f322efafd9 2020-11-30 114: };
f322efafd9 2020-11-30 115: self.tg.send( match iv_hash {
f322efafd9 2020-11-30 116: Some(x) => SendMessage::new(destination, format!("<a href=\"https://t.me/iv?url={}&rhash={}\"> </a>{0}", url, x)),
f322efafd9 2020-11-30 117: None => SendMessage::new(destination, format!("{}", url)),
f322efafd9 2020-11-30 118: }.parse_mode(types::ParseMode::Html)).await
f322efafd9 2020-11-30 119: .context("Can't post message:")?;
f322efafd9 2020-11-30 120: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
f322efafd9 2020-11-30 121: .bind(id)
f322efafd9 2020-11-30 122: .bind(date)
f322efafd9 2020-11-30 123: .bind(url)
f322efafd9 2020-11-30 124: .execute(&mut conn).await
f322efafd9 2020-11-30 125: .with_context(|| format!("Record post:\n{:?}", &conn))?;
f322efafd9 2020-11-30 126: drop(conn);
f322efafd9 2020-11-30 127: tokio::time::delay_for(std::time::Duration::new(4, 0)).await;
f322efafd9 2020-11-30 128: };
f322efafd9 2020-11-30 129: };
f322efafd9 2020-11-30 130: posts.clear();
131: let mut conn = self.pool.acquire().await
132: .with_context(|| format!("Update scrape fetch conn:\n{:?}", &self.pool))?;
133: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
f322efafd9 2020-11-30 134: .bind(id)
135: .execute(&mut conn).await
136: .with_context(|| format!("Update scrape:\n{:?}", &conn))?;
137: Ok(())
138: }
139:
140: async fn delete<S>(&self, source_id: &i32, owner: S) -> Result<String>
141: where S: Into<i64> {
142: let owner: i64 = owner.into();
143: let mut conn = self.pool.acquire().await
144: .with_context(|| format!("Delete fetch conn:\n{:?}", &self.pool))?;
145: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
146: .bind(source_id)
147: .bind(owner)
148: .execute(&mut conn).await
149: .with_context(|| format!("Delete source rule:\n{:?}", &self.pool))?
150: .rows_affected() {
151: 0 => { Ok("No data found found\\.".to_string()) },
152: x => { Ok(format!("{} sources removed\\.", x)) },
153: }
154: }
155:
156: async fn clean<S>(&self, source_id: &i32, owner: S) -> Result<String>
157: where S: Into<i64> {
158: let owner: i64 = owner.into();
159: let mut conn = self.pool.acquire().await
160: .with_context(|| format!("Clean fetch conn:\n{:?}", &self.pool))?;
161: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
162: .bind(source_id)
163: .bind(owner)
164: .execute(&mut conn).await
165: .with_context(|| format!("Clean seen posts:\n{:?}", &self.pool))?
166: .rows_affected() {
167: 0 => { Ok("No data found found\\.".to_string()) },
168: x => { Ok(format!("{} posts purged\\.", x)) },
169: }
170: }
171:
172: async fn enable<S>(&self, source_id: &i32, owner: S) -> Result<&str>
173: where S: Into<i64> {
174: let owner: i64 = owner.into();
175: let mut conn = self.pool.acquire().await
176: .with_context(|| format!("Enable fetch conn:\n{:?}", &self.pool))?;
177: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
178: .bind(source_id)
179: .bind(owner)
180: .execute(&mut conn).await
181: .with_context(|| format!("Enable source:\n{:?}", &self.pool))?
182: .rows_affected() {
183: 1 => { Ok("Source disabled\\.") },
184: 0 => { Ok("Source not found\\.") },
185: _ => { Err(anyhow!("Database error.")) },
186: }
187: }
188:
189: async fn disable<S>(&self, source_id: &i32, owner: S) -> Result<&str>
190: where S: Into<i64> {
191: let owner: i64 = owner.into();
192: let mut conn = self.pool.acquire().await
193: .with_context(|| format!("Disable fetch conn:\n{:?}", &self.pool))?;
194: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
195: .bind(source_id)
196: .bind(owner)
197: .execute(&mut conn).await
198: .with_context(|| format!("Disable source:\n{:?}", &self.pool))?
199: .rows_affected() {
200: 1 => { Ok("Source disabled\\.") },
201: 0 => { Ok("Source not found\\.") },
202: _ => { Err(anyhow!("Database error.")) },
203: }
204: }
205:
206: async fn update<S>(&self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, owner: S) -> Result<String>
207: where S: Into<i64> {
208: let owner: i64 = owner.into();
209: let mut conn = self.pool.acquire().await
210: .with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?;
211:
212: match match update {
213: Some(id) => {
214: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6 where source_id = $1").bind(id)
215: },
216: None => {
217: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel) values ($1, $2, $3, $4, $5)")
218: },
219: }
220: .bind(channel_id)
221: .bind(url)
222: .bind(iv_hash)
223: .bind(owner)
224: .bind(channel)
225: .execute(&mut conn).await {
226: Ok(_) => return Ok(String::from("Channel added\\.")),
227: Err(sqlx::Error::Database(err)) => {
228: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
229: Some("_bt_check_unique", ) => {
230: return Ok("Duplicate key\\.".to_string())
231: },
232: Some(_) => {
233: return Ok("Database error\\.".to_string())
234: },
235: None => {
236: return Ok("No database error extracted\\.".to_string())
237: },
238: };
239: },
240: Err(err) => {
241: bail!("Sorry, unknown error:\n{:#?}\n", err);
242: },
243: };
244: }
245:
246: async fn autofetch(&self) -> Result<()> {
247: let mut delay = chrono::Duration::minutes(5);
248: let mut now;
249: loop {
250: let mut conn = self.pool.acquire().await
251: .with_context(|| format!("Autofetch fetch conn:\n{:?}", &self.pool))?;
252: now = chrono::Local::now();
f322efafd9 2020-11-30 253: let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now();")
254: .fetch_all(&mut conn).await?;
255: for row in queue.iter() {
256: let source_id: i32 = row.try_get("source_id")?;
257: let owner: i64 = row.try_get("owner")?;
258: let next_fetch: DateTime<chrono::Local> = row.try_get("next_fetch")?;
259: if next_fetch < now {
f322efafd9 2020-11-30 260: sqlx::query("update rsstg_source set last_scrape = now() + interval '1 hour' where source_id = $1;")
f322efafd9 2020-11-30 261: .bind(source_id)
f322efafd9 2020-11-30 262: .execute(&mut conn).await
f322efafd9 2020-11-30 263: .with_context(|| format!(" Lock source:\n\n{:?}", &self.pool))?;
f322efafd9 2020-11-30 264: let clone = self.clone();
265: tokio::spawn(async move {
f322efafd9 2020-11-30 266: if let Err(err) = clone.check(&source_id, owner, true).await {
267: if let Err(err) = clone.debug(&format!("š {:?}", err)) {
268: eprintln!("Check error: {}", err);
269: };
270: };
271: });
272: } else {
273: if next_fetch - now < delay {
274: delay = next_fetch - now;
275: }
276: }
277: };
278: queue.clear();
279: tokio::time::delay_for(delay.to_std()?).await;
280: delay = chrono::Duration::minutes(5);
281: }
282: }
283:
284: async fn list<S>(&self, owner: S) -> Result<Vec<String>>
285: where S: Into<i64> {
286: let owner = owner.into();
287: let mut reply = vec![];
288: let mut conn = self.pool.acquire().await
289: .with_context(|| format!("List fetch conn:\n{:?}", &self.pool))?;
290: reply.push("Channels:".to_string());
291: let rows = sqlx::query("select source_id, channel, enabled, url, iv_hash from rsstg_source where owner = $1 order by source_id")
292: .bind(owner)
293: .fetch_all(&mut conn).await?;
294: for row in rows.iter() {
295: let source_id: i32 = row.try_get("source_id")?;
296: let username: &str = row.try_get("channel")?;
297: let enabled: bool = row.try_get("enabled")?;
298: let url: &str = row.try_get("url")?;
299: let iv_hash: Option<&str> = row.try_get("iv_hash")?;
300: reply.push(format!("\n\\#ļøā£ {} \\*ļøā£ `{}` {}\nš `{}`", source_id, username,
301: match enabled {
302: true => "š enabled",
303: false => "ā disabled",
304: }, url));
305: if let Some(hash) = iv_hash {
306: reply.push(format!("IV `{}`", hash));
307: }
308: };
309: Ok(reply)
310: }
311: }
312:
313: #[tokio::main]
314: async fn main() -> Result<()> {
315: let mut settings = config::Config::default();
316: settings.merge(config::File::with_name("rsstg"))?;
317:
318: let core = Core::new(settings).await?;
319:
320: let mut stream = core.stream();
321:
322: while let Some(update) = stream.next().await {
323: if let Err(err) = handle(update?, &core).await {
324: core.debug(&format!("š {:?}", err))?;
325: };
326: }
327:
328: Ok(())
329: }
330:
331: async fn handle(update: telegram_bot::Update, core: &Core) -> Result<()> {
332: lazy_static! {
333: static ref RE_USERNAME: Regex = Regex::new(r"^@[a-zA-Z][a-zA-Z0-9_]+$").unwrap();
334: static ref RE_LINK: Regex = Regex::new(r"^https?://[a-zA-Z.0-9-]+/[-_a-zA-Z.0-9/?=]+$").unwrap();
335: static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap();
336: }
337:
338: match update.kind {
339: UpdateKind::Message(message) => {
340: let mut reply: Vec<String> = vec![];
341: match message.kind {
342: MessageKind::Text { ref data, .. } => {
343: let mut words = data.split_whitespace();
344: let cmd = words.next().unwrap();
345: match cmd {
346:
347: // start
348:
349: "/start" => {
350: reply.push("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.".to_string());
351: },
352:
353: // list
354:
355: "/list" => {
356: reply.append(&mut core.list(message.from.id).await?);
357: },
358:
359: // add
360:
361: "/add" | "/update" => {
362: let mut source_id: Option<i32> = None;
363: let at_least = "Requires at least 3 parameters.";
364: if cmd == "/update" {
365: let first_word = words.next()
366: .context(at_least)?;
367: source_id = Some(first_word.parse::<i32>()
368: .with_context(|| format!("I need a number, but got {}.", first_word))?);
369: }
370: let (channel, url, iv_hash) = (
371: words.next().context(at_least)?,
372: words.next().context(at_least)?,
373: words.next());
374: if ! RE_USERNAME.is_match(&channel) {
375: reply.push("Usernames should be something like \"@\\[a\\-zA\\-Z]\\[a\\-zA\\-Z0\\-9\\_]+\", aren't they?".to_string());
376: bail!("Wrong username {:?}.", &channel);
377: }
378: if ! RE_LINK.is_match(&url) {
379: reply.push("Link should be link to atom/rss feed, something like \"https://domain/path\"\\.".to_string());
380: bail!("Url: {:?}", &url);
381: }
382: if let Some(hash) = iv_hash {
383: if ! RE_IV_HASH.is_match(&hash) {
384: reply.push("IV hash should be 14 hex digits.".to_string());
385: bail!("IV: {:?}", &iv_hash);
386: };
387: };
388: let channel_id = i64::from(core.tg.send(telegram_bot::GetChat::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await?.id());
389: let chan_adm = core.tg.send(telegram_bot::GetChatAdministrators::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await
390: .context("Sorry, I have no access to that chat\\.")?;
391: let (mut me, mut user) = (false, false);
392: for admin in chan_adm {
393: if admin.user.id == core.my.id {
394: me = true;
395: };
396: if admin.user.id == message.from.id {
397: user = true;
398: };
399: };
400: if ! me { bail!("I need to be admin on that channel\\."); };
401: if ! user { bail!("You should be admin on that channel\\."); };
402: reply.push(core.update(source_id, channel, channel_id, url, iv_hash, message.from.id).await?);
403: },
404:
405: // check
406:
407: "/check" => {
408: match &words.next().unwrap().parse::<i32>() {
409: Err(err) => {
410: reply.push(format!("I need a number\\.\n{}", &err));
411: },
412: Ok(number) => {
f322efafd9 2020-11-30 413: core.check(&number, message.from.id, false).await
414: .context("Channel check failed.")?;
415: },
416: };
417: },
418:
419: // clean
420:
421: "/clean" => {
422: match &words.next().unwrap().parse::<i32>() {
423: Err(err) => {
424: reply.push(format!("I need a number\\.\n{}", &err));
425: },
426: Ok(number) => {
427: let result = core.clean(&number, message.from.id).await?;
428: reply.push(result.to_string());
429: },
430: };
431: },
432:
433: // enable
434:
435: "/enable" => {
436: match &words.next().unwrap().parse::<i32>() {
437: Err(err) => {
438: reply.push(format!("I need a number\\.\n{}", &err));
439: },
440: Ok(number) => {
441: let result = core.enable(&number, message.from.id).await?;
442: reply.push(result.to_string());
443: },
444: };
445: },
446:
447: // delete
448:
449: "/delete" => {
450: match &words.next().unwrap().parse::<i32>() {
451: Err(err) => {
452: reply.push(format!("I need a number\\.\n{}", &err));
453: },
454: Ok(number) => {
455: let result = core.delete(&number, message.from.id).await?;
456: reply.push(result.to_string());
457: },
458: };
459: },
460:
461: // disable
462:
463: "/disable" => {
464: match &words.next().unwrap().parse::<i32>() {
465: Err(err) => {
466: reply.push(format!("I need a number\\.\n{}", &err));
467: },
468: Ok(number) => {
469: let result = core.disable(&number, message.from.id).await?;
470: reply.push(result.to_string());
471: },
472: };
473: },
474:
475: _ => {
476: },
477: };
478: },
479: _ => {
480: },
481: };
482:
483: if reply.len() > 0 {
484: if let Err(err) = core.tg.send(message.text_reply(reply.join("\n")).parse_mode(types::ParseMode::MarkdownV2)).await {
485: dbg!(reply.join("\n"));
486: println!("{}", err);
487: };
488: };
489: },
490: _ => {},
491: };
492:
493: Ok(())
494: }