Lines of
src/main.rs
from check-in 5609487b23
that are changed by the sequence of edits moving toward
check-in fcf57ccb36:
1: use std::collections::{BTreeMap, HashSet};
2: use std::sync::{Arc, Mutex};
3:
4: use config;
5:
6: use tokio;
7:
8: use rss;
9:
10: use chrono::DateTime;
11:
12: use regex::Regex;
13:
14: use telegram_bot::*;
15: use tokio::stream::StreamExt;
16:
17: use sqlx::postgres::PgPoolOptions;
18: use sqlx::Row;
19: //use sqlx::Done; // .rows_affected()
20:
21: #[macro_use]
22: extern crate lazy_static;
23:
24: use anyhow::{anyhow, bail, Context, Result};
25:
26: #[derive(Clone)]
27: struct Core {
28: owner: i64,
29: api_key: String,
30: owner_chat: UserId,
31: tg: telegram_bot::Api,
32: my: User,
33: pool: sqlx::Pool<sqlx::Postgres>,
34: sources: Arc<Mutex<HashSet<Arc<i32>>>>,
35: }
36:
37: impl Core {
38: async fn new(settings: config::Config) -> Result<Core> {
39: let owner = settings.get_int("owner")?;
40: let api_key = settings.get_str("api_key")?;
41: let tg = Api::new(&api_key);
42: let core = Core {
43: owner: owner,
44: api_key: api_key.clone(),
45: my: tg.send(telegram_bot::GetMe).await?,
46: tg: tg,
47: owner_chat: UserId::new(owner),
48: pool: PgPoolOptions::new()
49: .max_connections(5)
50: .connect_timeout(std::time::Duration::new(300, 0))
51: .idle_timeout(std::time::Duration::new(60, 0))
52: .connect_lazy(&settings.get_str("pg")?)?,
53: sources: Arc::new(Mutex::new(HashSet::new())),
54: };
55: let clone = core.clone();
56: tokio::spawn(async move {
57: if let Err(err) = &clone.autofetch().await {
58: if let Err(err) = clone.debug(&format!("š {:?}", err), None) {
59: eprintln!("Autofetch error: {}", err);
60: };
61: }
62: });
63: Ok(core)
64: }
65:
66: fn stream(&self) -> telegram_bot::UpdatesStream {
67: self.tg.stream()
68: }
69:
70: fn debug(&self, msg: &str, target: Option<UserId>) -> Result<()> {
71: self.tg.spawn(SendMessage::new(match target {
72: Some(user) => user,
73: None => self.owner_chat,
74: }, msg));
75: Ok(())
76: }
77:
78: async fn check<S>(&self, id: i32, owner: S, real: bool) -> Result<()>
79: where S: Into<i64> {
80: let owner: i64 = owner.into();
81: let id = {
82: let mut set = self.sources.lock().unwrap();
83: match set.get(&id) {
84: Some(id) => id.clone(),
85: None => {
86: let id = Arc::new(id);
87: set.insert(id.clone());
88: id.clone()
89: },
90: }
91: };
92: let count = Arc::strong_count(&id);
93: if count == 2 {
94: let mut conn = self.pool.acquire().await
95: .with_context(|| format!("Query queue fetch conn:\n{:?}", &self.pool))?;
96: let row = sqlx::query("select source_id, channel_id, url, iv_hash, owner from rsstg_source where source_id = $1 and owner = $2")
97: .bind(*id)
98: .bind(owner)
99: .fetch_one(&mut conn).await
100: .with_context(|| format!("Query source:\n{:?}", &self.pool))?;
101: drop(conn);
102: let channel_id: i64 = row.try_get("channel_id")?;
103: let destination = match real {
104: true => UserId::new(channel_id),
105: false => UserId::new(row.try_get("owner")?),
106: };
107: let url: &str = row.try_get("url")?;
108: let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
109: let iv_hash: Option<&str> = row.try_get("iv_hash")?;
110: let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, String> = BTreeMap::new();
111: let feed = rss::Channel::from_url(url)
112: .with_context(|| format!("Problem opening feed url:\n{}", &url))?;
113: for item in feed.items() {
114: let date = match item.pub_date() {
115: Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
116: None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
117: }?;
118: let url = item.link().unwrap().to_string();
119: posts.insert(date.clone(), url.clone());
120: };
121: for (date, url) in posts.iter() {
122: let mut conn = self.pool.acquire().await
123: .with_context(|| format!("Check post fetch conn:\n{:?}", &self.pool))?;
124: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
125: .bind(&url)
126: .bind(*id)
127: .fetch_one(&mut conn).await
128: .with_context(|| format!("Check post:\n{:?}", &conn))?;
129: let exists: bool = row.try_get("exists")?;
130: if ! exists {
131: if this_fetch == None || *date > this_fetch.unwrap() {
132: this_fetch = Some(*date);
133: };
134: self.tg.send( match iv_hash {
135: Some(x) => SendMessage::new(destination, format!("<a href=\"https://t.me/iv?url={}&rhash={}\"> </a>{0}", url, x)),
136: None => SendMessage::new(destination, format!("{}", url)),
137: }.parse_mode(types::ParseMode::Html)).await
138: .context("Can't post message:")?;
139: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
140: .bind(*id)
141: .bind(date)
142: .bind(url)
143: .execute(&mut conn).await
144: .with_context(|| format!("Record post:\n{:?}", &conn))?;
145: drop(conn);
146: tokio::time::delay_for(std::time::Duration::new(4, 0)).await;
147: };
148: };
149: posts.clear();
150: };
151: let mut conn = self.pool.acquire().await
152: .with_context(|| format!("Update scrape fetch conn:\n{:?}", &self.pool))?;
153: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
154: .bind(*id)
155: .execute(&mut conn).await
156: .with_context(|| format!("Update scrape:\n{:?}", &conn))?;
157: Ok(())
158: }
159:
160: async fn delete<S>(&self, source_id: &i32, owner: S) -> Result<String>
161: where S: Into<i64> {
162: let owner: i64 = owner.into();
163: let mut conn = self.pool.acquire().await
164: .with_context(|| format!("Delete fetch conn:\n{:?}", &self.pool))?;
165: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
166: .bind(source_id)
167: .bind(owner)
168: .execute(&mut conn).await
169: .with_context(|| format!("Delete source rule:\n{:?}", &self.pool))?
170: .rows_affected() {
171: 0 => { Ok("No data found found\\.".to_string()) },
172: x => { Ok(format!("{} sources removed\\.", x)) },
173: }
174: }
175:
176: async fn clean<S>(&self, source_id: &i32, owner: S) -> Result<String>
177: where S: Into<i64> {
178: let owner: i64 = owner.into();
179: let mut conn = self.pool.acquire().await
180: .with_context(|| format!("Clean fetch conn:\n{:?}", &self.pool))?;
181: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
182: .bind(source_id)
183: .bind(owner)
184: .execute(&mut conn).await
185: .with_context(|| format!("Clean seen posts:\n{:?}", &self.pool))?
186: .rows_affected() {
187: 0 => { Ok("No data found found\\.".to_string()) },
188: x => { Ok(format!("{} posts purged\\.", x)) },
189: }
190: }
191:
192: async fn enable<S>(&self, source_id: &i32, owner: S) -> Result<&str>
193: where S: Into<i64> {
194: let owner: i64 = owner.into();
195: let mut conn = self.pool.acquire().await
196: .with_context(|| format!("Enable fetch conn:\n{:?}", &self.pool))?;
197: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
198: .bind(source_id)
199: .bind(owner)
200: .execute(&mut conn).await
201: .with_context(|| format!("Enable source:\n{:?}", &self.pool))?
202: .rows_affected() {
203: 1 => { Ok("Source enabled\\.") },
204: 0 => { Ok("Source not found\\.") },
205: _ => { Err(anyhow!("Database error.")) },
206: }
207: }
208:
209: async fn disable<S>(&self, source_id: &i32, owner: S) -> Result<&str>
210: where S: Into<i64> {
211: let owner: i64 = owner.into();
212: let mut conn = self.pool.acquire().await
213: .with_context(|| format!("Disable fetch conn:\n{:?}", &self.pool))?;
214: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
215: .bind(source_id)
216: .bind(owner)
217: .execute(&mut conn).await
218: .with_context(|| format!("Disable source:\n{:?}", &self.pool))?
219: .rows_affected() {
220: 1 => { Ok("Source disabled\\.") },
221: 0 => { Ok("Source not found\\.") },
222: _ => { Err(anyhow!("Database error.")) },
223: }
224: }
225:
226: async fn update<S>(&self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, owner: S) -> Result<String>
227: where S: Into<i64> {
228: let owner: i64 = owner.into();
229: let mut conn = self.pool.acquire().await
230: .with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?;
231:
232: match match update {
233: Some(id) => {
234: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6 where source_id = $1").bind(id)
235: },
236: None => {
237: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel) values ($1, $2, $3, $4, $5)")
238: },
239: }
240: .bind(channel_id)
241: .bind(url)
242: .bind(iv_hash)
243: .bind(owner)
244: .bind(channel)
245: .execute(&mut conn).await {
246: Ok(_) => return Ok(String::from("Channel added\\.")),
247: Err(sqlx::Error::Database(err)) => {
248: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
249: Some("_bt_check_unique", ) => {
250: return Ok("Duplicate key\\.".to_string())
251: },
252: Some(_) => {
253: return Ok("Database error\\.".to_string())
254: },
255: None => {
256: return Ok("No database error extracted\\.".to_string())
257: },
258: };
259: },
260: Err(err) => {
261: bail!("Sorry, unknown error:\n{:#?}\n", err);
262: },
263: };
264: }
265:
266: async fn autofetch(&self) -> Result<()> {
5609487b23 2021-06-28 267: let mut delay = chrono::Duration::minutes(5);
268: let mut now;
269: loop {
270: let mut conn = self.pool.acquire().await
271: .with_context(|| format!("Autofetch fetch conn:\n{:?}", &self.pool))?;
272: now = chrono::Local::now();
5609487b23 2021-06-28 273: let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '5 minutes';")
274: .fetch_all(&mut conn).await?;
275: for row in queue.iter() {
276: let source_id: i32 = row.try_get("source_id")?;
277: let owner: i64 = row.try_get("owner")?;
278: let next_fetch: DateTime<chrono::Local> = row.try_get("next_fetch")?;
279: if next_fetch < now {
280: //let clone = self.clone();
281: //clone.owner_chat(UserId::new(owner));
282: let clone = Core {
283: owner_chat: UserId::new(owner),
284: ..self.clone()
285: };
286: tokio::spawn(async move {
287: if let Err(err) = clone.check(source_id, owner, true).await {
288: if let Err(err) = clone.debug(&format!("š {:?}", err), None) {
289: eprintln!("Check error: {}", err);
290: };
291: };
292: });
293: } else {
294: if next_fetch - now < delay {
295: delay = next_fetch - now;
296: }
297: }
298: };
299: queue.clear();
300: tokio::time::delay_for(delay.to_std()?).await;
5609487b23 2021-06-28 301: delay = chrono::Duration::minutes(5);
302: }
303: }
304:
305: async fn list<S>(&self, owner: S) -> Result<Vec<String>>
306: where S: Into<i64> {
307: let owner = owner.into();
308: let mut reply = vec![];
309: let mut conn = self.pool.acquire().await
310: .with_context(|| format!("List fetch conn:\n{:?}", &self.pool))?;
311: reply.push("Channels:".to_string());
312: let rows = sqlx::query("select source_id, channel, enabled, url, iv_hash from rsstg_source where owner = $1 order by source_id")
313: .bind(owner)
314: .fetch_all(&mut conn).await?;
315: for row in rows.iter() {
316: let source_id: i32 = row.try_get("source_id")?;
317: let username: &str = row.try_get("channel")?;
318: let enabled: bool = row.try_get("enabled")?;
319: let url: &str = row.try_get("url")?;
320: let iv_hash: Option<&str> = row.try_get("iv_hash")?;
321: reply.push(format!("\n\\#ļøā£ {} \\*ļøā£ `{}` {}\nš `{}`", source_id, username,
322: match enabled {
323: true => "š enabled",
324: false => "ā disabled",
325: }, url));
326: if let Some(hash) = iv_hash {
327: reply.push(format!("IV `{}`", hash));
328: }
329: };
330: Ok(reply)
331: }
332: }
333:
334: #[tokio::main]
335: async fn main() -> Result<()> {
336: let mut settings = config::Config::default();
337: settings.merge(config::File::with_name("rsstg"))?;
338:
339: let core = Core::new(settings).await?;
340:
341: let mut stream = core.stream();
342: let mut reply_to: Option<UserId>;
343:
344: loop {
345: reply_to = None;
346: match stream.next().await {
347: Some(update) => {
348: if let Err(err) = handle(update?, &core, &mut reply_to).await {
349: core.debug(&format!("š {:?}", err), reply_to)?;
350: };
351: },
352: None => {
353: core.debug(&format!("š None error."), None)?;
354: }
355: };
356: }
357:
358: //Ok(())
359: }
360:
361: async fn handle(update: telegram_bot::Update, core: &Core, mut _reply_to: &Option<UserId>) -> Result<()> {
362: lazy_static! {
363: static ref RE_USERNAME: Regex = Regex::new(r"^@[a-zA-Z][a-zA-Z0-9_]+$").unwrap();
364: static ref RE_LINK: Regex = Regex::new(r"^https?://[a-zA-Z.0-9-]+/[-_a-zA-Z.0-9/?=]+$").unwrap();
365: static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap();
366: }
367:
368: match update.kind {
369: UpdateKind::Message(message) => {
370: let mut reply: Vec<String> = vec![];
371: match message.kind {
372: MessageKind::Text { ref data, .. } => {
373: let mut words = data.split_whitespace();
374: let cmd = words.next().unwrap();
375: match cmd {
376:
377: // start
378:
379: "/start" => {
380: reply.push("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.".to_string());
381: },
382:
383: // list
384:
385: "/list" => {
386: reply.append(&mut core.list(message.from.id).await?);
387: },
388:
389: // add
390:
391: "/add" | "/update" => {
392: _reply_to = &Some(message.from.id);
393: let mut source_id: Option<i32> = None;
394: let at_least = "Requires at least 3 parameters.";
395: if cmd == "/update" {
396: let first_word = words.next()
397: .context(at_least)?;
398: source_id = Some(first_word.parse::<i32>()
399: .with_context(|| format!("I need a number, but got {}.", first_word))?);
400: }
401: let (channel, url, iv_hash) = (
402: words.next().context(at_least)?,
403: words.next().context(at_least)?,
404: words.next());
405: if ! RE_USERNAME.is_match(&channel) {
406: reply.push("Usernames should be something like \"@\\[a\\-zA\\-Z]\\[a\\-zA\\-Z0\\-9\\_]+\", aren't they?".to_string());
407: bail!("Wrong username {:?}.", &channel);
408: }
409: if ! RE_LINK.is_match(&url) {
410: reply.push("Link should be link to atom/rss feed, something like \"https://domain/path\"\\.".to_string());
411: bail!("Url: {:?}", &url);
412: }
413: if let Some(hash) = iv_hash {
414: if ! RE_IV_HASH.is_match(&hash) {
415: reply.push("IV hash should be 14 hex digits.".to_string());
416: bail!("IV: {:?}", &iv_hash);
417: };
418: };
419: let channel_id = i64::from(core.tg.send(telegram_bot::GetChat::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await?.id());
420: let chan_adm = core.tg.send(telegram_bot::GetChatAdministrators::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await
421: .context("Sorry, I have no access to that chat\\.")?;
422: let (mut me, mut user) = (false, false);
423: for admin in chan_adm {
424: if admin.user.id == core.my.id {
425: me = true;
426: };
427: if admin.user.id == message.from.id {
428: user = true;
429: };
430: };
431: if ! me { bail!("I need to be admin on that channel\\."); };
432: if ! user { bail!("You should be admin on that channel\\."); };
433: reply.push(core.update(source_id, channel, channel_id, url, iv_hash, message.from.id).await?);
434: },
435:
436: // check
437:
438: "/check" => {
439: match &words.next().unwrap().parse::<i32>() {
440: Err(err) => {
441: reply.push(format!("I need a number\\.\n{}", &err));
442: },
443: Ok(number) => {
444: core.check(*number, message.from.id, false).await
445: .context("Channel check failed.")?;
446: },
447: };
448: },
449:
450: // clean
451:
452: "/clean" => {
453: match &words.next().unwrap().parse::<i32>() {
454: Err(err) => {
455: reply.push(format!("I need a number\\.\n{}", &err));
456: },
457: Ok(number) => {
458: let result = core.clean(&number, message.from.id).await?;
459: reply.push(result.to_string());
460: },
461: };
462: },
463:
464: // enable
465:
466: "/enable" => {
467: match &words.next().unwrap().parse::<i32>() {
468: Err(err) => {
469: reply.push(format!("I need a number\\.\n{}", &err));
470: },
471: Ok(number) => {
472: let result = core.enable(&number, message.from.id).await?;
473: reply.push(result.to_string());
474: },
475: };
476: },
477:
478: // delete
479:
480: "/delete" => {
481: match &words.next().unwrap().parse::<i32>() {
482: Err(err) => {
483: reply.push(format!("I need a number\\.\n{}", &err));
484: },
485: Ok(number) => {
486: let result = core.delete(&number, message.from.id).await?;
487: reply.push(result.to_string());
488: },
489: };
490: },
491:
492: // disable
493:
494: "/disable" => {
495: match &words.next().unwrap().parse::<i32>() {
496: Err(err) => {
497: reply.push(format!("I need a number\\.\n{}", &err));
498: },
499: Ok(number) => {
500: let result = core.disable(&number, message.from.id).await?;
501: reply.push(result.to_string());
502: },
503: };
504: },
505:
506: _ => {
507: },
508: };
509: },
510: _ => {
511: },
512: };
513:
514: if reply.len() > 0 {
515: if let Err(err) = core.tg.send(message.text_reply(reply.join("\n")).parse_mode(types::ParseMode::MarkdownV2)).await {
516: dbg!(reply.join("\n"));
517: println!("{}", err);
518: };
519: };
520: },
521: _ => {},
522: };
523:
524: Ok(())
525: }