Lines of
src/main.rs
from check-in fcf57ccb36
that are changed by the sequence of edits moving toward
check-in d52a6ff5c8:
1: use std::collections::{BTreeMap, HashSet};
2: use std::sync::{Arc, Mutex};
3:
4: use config;
5:
6: use tokio;
7:
8: use rss;
9:
10: use chrono::DateTime;
11:
12: use regex::Regex;
13:
14: use telegram_bot::*;
15: use tokio::stream::StreamExt;
16:
17: use sqlx::postgres::PgPoolOptions;
18: use sqlx::Row;
fcf57ccb36 2021-07-25 19: //use sqlx::Done; // .rows_affected()
20:
21: #[macro_use]
22: extern crate lazy_static;
23:
24: use anyhow::{anyhow, bail, Context, Result};
25:
/// Shared state of the bot: Telegram API handle, database pool and
/// bookkeeping used by the feed checker.
///
/// The struct is cloned for every spawned task; `sources` sits behind an
/// `Arc`, so all clones refer to the same set.
#[derive(Clone)]
struct Core {
    /// Numeric owner id read from the `owner` setting.
    owner: i64,
    /// Telegram API key read from the `api_key` setting.
    api_key: String,
    /// Default recipient of `debug` messages when no explicit target is given.
    owner_chat: UserId,
    /// Telegram Bot API handle.
    tg: telegram_bot::Api,
    /// Answer to the `GetMe` request made in `new` (the bot's own account).
    my: User,
    /// PostgreSQL connection pool.
    pool: sqlx::Pool<sqlx::Postgres>,
    /// Source ids seen by `check`; the strong count of each entry is used
    /// there to decide whether the feed should be fetched.
    sources: Arc<Mutex<HashSet<Arc<i32>>>>,
}
36:
37: impl Core {
38: async fn new(settings: config::Config) -> Result<Core> {
39: let owner = settings.get_int("owner")?;
40: let api_key = settings.get_str("api_key")?;
41: let tg = Api::new(&api_key);
42: let core = Core {
43: owner: owner,
44: api_key: api_key.clone(),
45: my: tg.send(telegram_bot::GetMe).await?,
46: tg: tg,
47: owner_chat: UserId::new(owner),
48: pool: PgPoolOptions::new()
49: .max_connections(5)
50: .connect_timeout(std::time::Duration::new(300, 0))
51: .idle_timeout(std::time::Duration::new(60, 0))
52: .connect_lazy(&settings.get_str("pg")?)?,
53: sources: Arc::new(Mutex::new(HashSet::new())),
54: };
55: let clone = core.clone();
56: tokio::spawn(async move {
57: if let Err(err) = &clone.autofetch().await {
58: if let Err(err) = clone.debug(&format!("š {:?}", err), None) {
59: eprintln!("Autofetch error: {}", err);
60: };
61: }
62: });
63: Ok(core)
64: }
65:
66: fn stream(&self) -> telegram_bot::UpdatesStream {
67: self.tg.stream()
68: }
69:
70: fn debug(&self, msg: &str, target: Option<UserId>) -> Result<()> {
71: self.tg.spawn(SendMessage::new(match target {
72: Some(user) => user,
73: None => self.owner_chat,
74: }, msg));
75: Ok(())
76: }
77:
78: async fn check<S>(&self, id: i32, owner: S, real: bool) -> Result<()>
79: where S: Into<i64> {
80: let owner: i64 = owner.into();
81: let id = {
82: let mut set = self.sources.lock().unwrap();
83: match set.get(&id) {
84: Some(id) => id.clone(),
85: None => {
86: let id = Arc::new(id);
87: set.insert(id.clone());
88: id.clone()
89: },
90: }
91: };
92: let count = Arc::strong_count(&id);
93: if count == 2 {
94: let mut conn = self.pool.acquire().await
95: .with_context(|| format!("Query queue fetch conn:\n{:?}", &self.pool))?;
96: let row = sqlx::query("select source_id, channel_id, url, iv_hash, owner from rsstg_source where source_id = $1 and owner = $2")
97: .bind(*id)
98: .bind(owner)
99: .fetch_one(&mut conn).await
100: .with_context(|| format!("Query source:\n{:?}", &self.pool))?;
101: drop(conn);
102: let channel_id: i64 = row.try_get("channel_id")?;
103: let destination = match real {
104: true => UserId::new(channel_id),
105: false => UserId::new(row.try_get("owner")?),
106: };
107: let url: &str = row.try_get("url")?;
108: let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
109: let iv_hash: Option<&str> = row.try_get("iv_hash")?;
110: let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, String> = BTreeMap::new();
fcf57ccb36 2021-07-25 111: let feed = rss::Channel::from_url(url)
112: .with_context(|| format!("Problem opening feed url:\n{}", &url))?;
113: for item in feed.items() {
114: let date = match item.pub_date() {
115: Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
116: None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
117: }?;
118: let url = item.link().unwrap().to_string();
119: posts.insert(date.clone(), url.clone());
120: };
121: for (date, url) in posts.iter() {
122: let mut conn = self.pool.acquire().await
123: .with_context(|| format!("Check post fetch conn:\n{:?}", &self.pool))?;
124: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
125: .bind(&url)
126: .bind(*id)
127: .fetch_one(&mut conn).await
128: .with_context(|| format!("Check post:\n{:?}", &conn))?;
129: let exists: bool = row.try_get("exists")?;
130: if ! exists {
131: if this_fetch == None || *date > this_fetch.unwrap() {
132: this_fetch = Some(*date);
133: };
134: self.tg.send( match iv_hash {
135: Some(x) => SendMessage::new(destination, format!("<a href=\"https://t.me/iv?url={}&rhash={}\"> </a>{0}", url, x)),
136: None => SendMessage::new(destination, format!("{}", url)),
137: }.parse_mode(types::ParseMode::Html)).await
138: .context("Can't post message:")?;
139: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
140: .bind(*id)
141: .bind(date)
142: .bind(url)
143: .execute(&mut conn).await
144: .with_context(|| format!("Record post:\n{:?}", &conn))?;
145: drop(conn);
146: tokio::time::delay_for(std::time::Duration::new(4, 0)).await;
147: };
148: };
149: posts.clear();
150: };
151: let mut conn = self.pool.acquire().await
152: .with_context(|| format!("Update scrape fetch conn:\n{:?}", &self.pool))?;
153: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
154: .bind(*id)
155: .execute(&mut conn).await
156: .with_context(|| format!("Update scrape:\n{:?}", &conn))?;
157: Ok(())
158: }
159:
160: async fn delete<S>(&self, source_id: &i32, owner: S) -> Result<String>
161: where S: Into<i64> {
162: let owner: i64 = owner.into();
163: let mut conn = self.pool.acquire().await
164: .with_context(|| format!("Delete fetch conn:\n{:?}", &self.pool))?;
165: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
166: .bind(source_id)
167: .bind(owner)
168: .execute(&mut conn).await
169: .with_context(|| format!("Delete source rule:\n{:?}", &self.pool))?
170: .rows_affected() {
171: 0 => { Ok("No data found found\\.".to_string()) },
172: x => { Ok(format!("{} sources removed\\.", x)) },
173: }
174: }
175:
176: async fn clean<S>(&self, source_id: &i32, owner: S) -> Result<String>
177: where S: Into<i64> {
178: let owner: i64 = owner.into();
179: let mut conn = self.pool.acquire().await
180: .with_context(|| format!("Clean fetch conn:\n{:?}", &self.pool))?;
181: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
182: .bind(source_id)
183: .bind(owner)
184: .execute(&mut conn).await
185: .with_context(|| format!("Clean seen posts:\n{:?}", &self.pool))?
186: .rows_affected() {
187: 0 => { Ok("No data found found\\.".to_string()) },
188: x => { Ok(format!("{} posts purged\\.", x)) },
189: }
190: }
191:
192: async fn enable<S>(&self, source_id: &i32, owner: S) -> Result<&str>
193: where S: Into<i64> {
194: let owner: i64 = owner.into();
195: let mut conn = self.pool.acquire().await
196: .with_context(|| format!("Enable fetch conn:\n{:?}", &self.pool))?;
197: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
198: .bind(source_id)
199: .bind(owner)
200: .execute(&mut conn).await
201: .with_context(|| format!("Enable source:\n{:?}", &self.pool))?
202: .rows_affected() {
203: 1 => { Ok("Source enabled\\.") },
204: 0 => { Ok("Source not found\\.") },
205: _ => { Err(anyhow!("Database error.")) },
206: }
207: }
208:
209: async fn disable<S>(&self, source_id: &i32, owner: S) -> Result<&str>
210: where S: Into<i64> {
211: let owner: i64 = owner.into();
212: let mut conn = self.pool.acquire().await
213: .with_context(|| format!("Disable fetch conn:\n{:?}", &self.pool))?;
214: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
215: .bind(source_id)
216: .bind(owner)
217: .execute(&mut conn).await
218: .with_context(|| format!("Disable source:\n{:?}", &self.pool))?
219: .rows_affected() {
220: 1 => { Ok("Source disabled\\.") },
221: 0 => { Ok("Source not found\\.") },
222: _ => { Err(anyhow!("Database error.")) },
223: }
224: }
225:
226: async fn update<S>(&self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, owner: S) -> Result<String>
227: where S: Into<i64> {
228: let owner: i64 = owner.into();
229: let mut conn = self.pool.acquire().await
230: .with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?;
231:
232: match match update {
233: Some(id) => {
234: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6 where source_id = $1").bind(id)
235: },
236: None => {
237: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel) values ($1, $2, $3, $4, $5)")
238: },
239: }
240: .bind(channel_id)
241: .bind(url)
242: .bind(iv_hash)
243: .bind(owner)
244: .bind(channel)
245: .execute(&mut conn).await {
246: Ok(_) => return Ok(String::from("Channel added\\.")),
247: Err(sqlx::Error::Database(err)) => {
248: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
249: Some("_bt_check_unique", ) => {
250: return Ok("Duplicate key\\.".to_string())
251: },
252: Some(_) => {
253: return Ok("Database error\\.".to_string())
254: },
255: None => {
256: return Ok("No database error extracted\\.".to_string())
257: },
258: };
259: },
260: Err(err) => {
261: bail!("Sorry, unknown error:\n{:#?}\n", err);
262: },
263: };
264: }
265:
266: async fn autofetch(&self) -> Result<()> {
267: let mut delay = chrono::Duration::minutes(1);
268: let mut now;
269: loop {
270: let mut conn = self.pool.acquire().await
271: .with_context(|| format!("Autofetch fetch conn:\n{:?}", &self.pool))?;
272: now = chrono::Local::now();
273: let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
274: .fetch_all(&mut conn).await?;
275: for row in queue.iter() {
276: let source_id: i32 = row.try_get("source_id")?;
277: let owner: i64 = row.try_get("owner")?;
278: let next_fetch: DateTime<chrono::Local> = row.try_get("next_fetch")?;
279: if next_fetch < now {
280: //let clone = self.clone();
281: //clone.owner_chat(UserId::new(owner));
282: let clone = Core {
283: owner_chat: UserId::new(owner),
284: ..self.clone()
285: };
286: tokio::spawn(async move {
287: if let Err(err) = clone.check(source_id, owner, true).await {
288: if let Err(err) = clone.debug(&format!("š {:?}", err), None) {
289: eprintln!("Check error: {}", err);
290: };
291: };
292: });
293: } else {
294: if next_fetch - now < delay {
295: delay = next_fetch - now;
296: }
297: }
298: };
299: queue.clear();
300: tokio::time::delay_for(delay.to_std()?).await;
301: delay = chrono::Duration::minutes(1);
302: }
303: }
304:
305: async fn list<S>(&self, owner: S) -> Result<Vec<String>>
306: where S: Into<i64> {
307: let owner = owner.into();
308: let mut reply = vec![];
309: let mut conn = self.pool.acquire().await
310: .with_context(|| format!("List fetch conn:\n{:?}", &self.pool))?;
311: reply.push("Channels:".to_string());
312: let rows = sqlx::query("select source_id, channel, enabled, url, iv_hash from rsstg_source where owner = $1 order by source_id")
313: .bind(owner)
314: .fetch_all(&mut conn).await?;
315: for row in rows.iter() {
316: let source_id: i32 = row.try_get("source_id")?;
317: let username: &str = row.try_get("channel")?;
318: let enabled: bool = row.try_get("enabled")?;
319: let url: &str = row.try_get("url")?;
320: let iv_hash: Option<&str> = row.try_get("iv_hash")?;
321: reply.push(format!("\n\\#ļøā£ {} \\*ļøā£ `{}` {}\nš `{}`", source_id, username,
322: match enabled {
323: true => "š enabled",
324: false => "ā disabled",
325: }, url));
326: if let Some(hash) = iv_hash {
327: reply.push(format!("IV `{}`", hash));
328: }
329: };
330: Ok(reply)
331: }
332: }
333:
334: #[tokio::main]
335: async fn main() -> Result<()> {
336: let mut settings = config::Config::default();
337: settings.merge(config::File::with_name("rsstg"))?;
338:
339: let core = Core::new(settings).await?;
340:
341: let mut stream = core.stream();
342: stream.allowed_updates(&[AllowedUpdate::Message]);
343: let mut reply_to: Option<UserId>;
344:
345: loop {
346: reply_to = None;
347: match stream.next().await {
348: Some(update) => {
349: if let Err(err) = handle(update?, &core, &mut reply_to).await {
350: core.debug(&format!("š {:?}", err), reply_to)?;
351: };
352: },
353: None => {
354: core.debug(&format!("š None error."), None)?;
355: }
356: };
357: }
358:
359: //Ok(())
360: }
361:
362: async fn handle(update: telegram_bot::Update, core: &Core, mut _reply_to: &Option<UserId>) -> Result<()> {
363: lazy_static! {
364: static ref RE_USERNAME: Regex = Regex::new(r"^@[a-zA-Z][a-zA-Z0-9_]+$").unwrap();
365: static ref RE_LINK: Regex = Regex::new(r"^https?://[a-zA-Z.0-9-]+/[-_a-zA-Z.0-9/?=]+$").unwrap();
366: static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap();
367: }
368:
369: match update.kind {
370: UpdateKind::Message(message) => {
371: let mut reply: Vec<String> = vec![];
372: match message.kind {
373: MessageKind::Text { ref data, .. } => {
374: let mut words = data.split_whitespace();
375: let cmd = words.next().unwrap();
376: match cmd {
377:
378: // start
379:
380: "/start" => {
381: reply.push("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.".to_string());
382: },
383:
384: // list
385:
386: "/list" => {
387: reply.append(&mut core.list(message.from.id).await?);
388: },
389:
390: // add
391:
392: "/add" | "/update" => {
393: _reply_to = &Some(message.from.id);
394: let mut source_id: Option<i32> = None;
395: let at_least = "Requires at least 3 parameters.";
396: if cmd == "/update" {
397: let first_word = words.next()
398: .context(at_least)?;
399: source_id = Some(first_word.parse::<i32>()
400: .with_context(|| format!("I need a number, but got {}.", first_word))?);
401: }
402: let (channel, url, iv_hash) = (
403: words.next().context(at_least)?,
404: words.next().context(at_least)?,
405: words.next());
406: if ! RE_USERNAME.is_match(&channel) {
407: reply.push("Usernames should be something like \"@\\[a\\-zA\\-Z]\\[a\\-zA\\-Z0\\-9\\_]+\", aren't they?".to_string());
408: bail!("Wrong username {:?}.", &channel);
409: }
410: if ! RE_LINK.is_match(&url) {
411: reply.push("Link should be link to atom/rss feed, something like \"https://domain/path\"\\.".to_string());
412: bail!("Url: {:?}", &url);
413: }
414: if let Some(hash) = iv_hash {
415: if ! RE_IV_HASH.is_match(&hash) {
416: reply.push("IV hash should be 14 hex digits.".to_string());
417: bail!("IV: {:?}", &iv_hash);
418: };
419: };
420: let channel_id = i64::from(core.tg.send(telegram_bot::GetChat::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await?.id());
421: let chan_adm = core.tg.send(telegram_bot::GetChatAdministrators::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await
422: .context("Sorry, I have no access to that chat\\.")?;
423: let (mut me, mut user) = (false, false);
424: for admin in chan_adm {
425: if admin.user.id == core.my.id {
426: me = true;
427: };
428: if admin.user.id == message.from.id {
429: user = true;
430: };
431: };
432: if ! me { bail!("I need to be admin on that channel\\."); };
433: if ! user { bail!("You should be admin on that channel\\."); };
434: reply.push(core.update(source_id, channel, channel_id, url, iv_hash, message.from.id).await?);
435: },
436:
437: // check
438:
439: "/check" => {
440: match &words.next().unwrap().parse::<i32>() {
441: Err(err) => {
442: reply.push(format!("I need a number\\.\n{}", &err));
443: },
444: Ok(number) => {
445: core.check(*number, message.from.id, false).await
446: .context("Channel check failed.")?;
447: },
448: };
449: },
450:
451: // clean
452:
453: "/clean" => {
454: match &words.next().unwrap().parse::<i32>() {
455: Err(err) => {
456: reply.push(format!("I need a number\\.\n{}", &err));
457: },
458: Ok(number) => {
459: let result = core.clean(&number, message.from.id).await?;
460: reply.push(result.to_string());
461: },
462: };
463: },
464:
465: // enable
466:
467: "/enable" => {
468: match &words.next().unwrap().parse::<i32>() {
469: Err(err) => {
470: reply.push(format!("I need a number\\.\n{}", &err));
471: },
472: Ok(number) => {
473: let result = core.enable(&number, message.from.id).await?;
474: reply.push(result.to_string());
475: },
476: };
477: },
478:
479: // delete
480:
481: "/delete" => {
482: match &words.next().unwrap().parse::<i32>() {
483: Err(err) => {
484: reply.push(format!("I need a number\\.\n{}", &err));
485: },
486: Ok(number) => {
487: let result = core.delete(&number, message.from.id).await?;
488: reply.push(result.to_string());
489: },
490: };
491: },
492:
493: // disable
494:
495: "/disable" => {
496: match &words.next().unwrap().parse::<i32>() {
497: Err(err) => {
498: reply.push(format!("I need a number\\.\n{}", &err));
499: },
500: Ok(number) => {
501: let result = core.disable(&number, message.from.id).await?;
502: reply.push(result.to_string());
503: },
504: };
505: },
506:
507: _ => {
508: },
509: };
510: },
511: _ => {
512: },
513: };
514:
515: if reply.len() > 0 {
516: if let Err(err) = core.tg.send(message.text_reply(reply.join("\n")).parse_mode(types::ParseMode::MarkdownV2)).await {
517: dbg!(reply.join("\n"));
518: println!("{}", err);
519: };
520: };
521: },
522: _ => {},
523: };
524:
525: Ok(())
526: }