Lines of
src/main.rs
from check-in 075be7e40e
that are changed by the sequence of edits moving toward
check-in 6ac5737a72:
1: use config;
2:
3: use tokio;
4: use rss;
5: use chrono::DateTime;
6:
7: use regex::Regex;
8:
9: use tokio::stream::StreamExt;
10: use telegram_bot::*;
11:
12: use sqlx::postgres::PgPoolOptions;
13: use sqlx::Row;
14:
15: type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
16:
17: #[derive(Clone)]
18: struct Core {
19: owner: i64,
20: api_key: String,
21: owner_chat: UserId,
22: tg: telegram_bot::Api,
23: my: User,
24: pool: sqlx::Pool<sqlx::Postgres>,
25: }
26:
27: impl Core {
28: async fn new(settings: config::Config) -> Result<Core> {
29: let owner = settings.get_int("owner")?;
30: let api_key = settings.get_str("api_key")?;
31: let tg = Api::new(&api_key);
32: let core = Core {
33: owner: owner,
34: api_key: api_key.clone(),
35: my: tg.send(telegram_bot::GetMe).await?,
36: tg: tg,
37: owner_chat: UserId::new(owner),
38: pool: PgPoolOptions::new().max_connections(5).connect(&settings.get_str("pg")?).await?,
39: };
40: let clone = core.clone();
41: tokio::spawn(async move {
42: if let Err(err) = clone.autofetch().await {
43: eprintln!("connection error: {}", err);
44: }
45: });
46: Ok(core)
47: }
48:
49: fn stream(&self) -> telegram_bot::UpdatesStream {
50: self.tg.stream()
51: }
52:
53: fn debug(&self, msg: &str) -> Result<()> {
54: self.tg.spawn(SendMessage::new(self.owner_chat, msg));
55: Ok(())
56: }
57:
58: async fn check(&self, id: &i32, real: Option<bool>) -> Result<()> {
59: match sqlx::query("select source_id, channel_id, url, iv_hash, owner from rsstg_source where source_id = $1")
60: .bind(id)
61: .fetch_one(&self.pool).await {
62: Ok(row) => {
63: let channel_id: i64 = row.try_get("channel_id")?;
64: let destination = match real {
65: Some(true) => UserId::new(channel_id),
66: Some(false) | None => UserId::new(row.try_get("owner")?),
67: };
68: let url: &str = row.try_get("url")?;
69: let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
70: let iv_hash: Option<&str> = row.try_get("iv_hash")?;
71: match rss::Channel::from_url(url) {
72: Ok(feed) => {
73: self.debug(&format!("# title:{:?} ttl:{:?} hours:{:?} days:{:?}", feed.title(), feed.ttl(), feed.skip_hours(), feed.skip_days()))?;
74: for item in feed.items() {
75: let date = match item.pub_date() {
76: Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
77: None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
78: }?;
79: let url = item.link().unwrap().to_string();
80: match sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
81: .bind(&url)
82: .bind(id)
83: .fetch_one(&self.pool).await {
84: Ok(row) => {
85: let exists: bool = row.try_get("exists")?;
86: if ! exists {
075be7e40e 2020-11-18 87: if this_fetch == None || date > this_fetch.unwrap() {
075be7e40e 2020-11-18 88: this_fetch = Some(date);
89: }
90: match self.tg.send( match iv_hash {
91: Some(x) => SendMessage::new(destination, format!("<a href=\"https://t.me/iv?url={}&rhash={}\"> </a>{0}", url, x)),
92: None => SendMessage::new(destination, format!("{}", url)),
93: }.parse_mode(types::ParseMode::Html)).await {
94: Ok(_) => {
95: match sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
96: .bind(id)
97: .bind(date)
98: .bind(url)
99: .execute(&self.pool).await {
100: Ok(_) => {},
101: Err(err) => {
102: self.debug(&err.to_string())?;
103: },
104: };
105: },
106: Err(err) => {
107: self.debug(&err.to_string())?;
108: },
109: };
110: tokio::time::delay_for(std::time::Duration::new(4, 0)).await;
111: }
112: },
113: Err(err) => {
114: self.debug(&err.to_string())?;
115: },
116: };
117: };
118: },
119: Err(err) => {
120: self.debug(&err.to_string())?;
121: },
122: };
123: match sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
124: .bind(id)
125: .execute(&self.pool).await {
126: Ok(_) => {},
127: Err(err) => {
128: self.debug(&err.to_string())?;
129: },
130: };
131: },
132: Err(err) => {
133: self.debug(&err.to_string())?;
134: },
135: };
136: Ok(())
137: }
138:
139: async fn clean(&self, source_id: i32) -> Result<()> {
140: match sqlx::query("delete from rsstg_post where source_id = $1;")
141: .bind(source_id)
142: .execute(&self.pool).await {
143: Ok(_) => {},
144: Err(err) => {
145: self.debug(&err.to_string())?;
146: },
147: };
148: Ok(())
149: }
150:
151: async fn enable(&self, source_id: &i32) -> Result<()> {
152: match sqlx::query("update rsstg_source set enabled = true where source_id = $1")
153: .bind(source_id)
154: .execute(&self.pool).await {
155: Ok(_) => {},
156: Err(err) => {
157: self.debug(&err.to_string())?;
158: },
159: }
160: Ok(())
161: }
162:
163: async fn disable(&self, source_id: &i32) -> Result<()> {
164: match sqlx::query("update rsstg_source set enabled = false where source_id = $1")
165: .bind(source_id)
166: .execute(&self.pool).await {
167: Ok(_) => {},
168: Err(err) => {
169: self.debug(&err.to_string())?;
170: },
171: }
172: Ok(())
173: }
174:
175: async fn autofetch(&self) -> Result<()> {
176: let mut delay = chrono::Duration::minutes(5);
177: let mut next_fetch: DateTime<chrono::Local>;
178: let mut now;
179: loop {
180: self.debug("cycle")?;
075be7e40e 2020-11-18 181: let mut rows = sqlx::query("select source_id, next_fetch from rsstg_order natural left join rsstg_source natural left join rsstg_channel;")
182: .fetch(&self.pool);
183: while let Some(row) = rows.try_next().await.unwrap() {
075be7e40e 2020-11-18 184: now = chrono::Local::now();
185: let source_id: i32 = row.try_get("source_id")?;
186: next_fetch = row.try_get("next_fetch")?;
187: if next_fetch < now {
188: match sqlx::query("update rsstg_source set last_scrape = now() + interval '1 hour' where source_id = $1;")
189: .bind(source_id)
190: .execute(&self.pool).await {
191: Ok(_) => {},
192: Err(err) => {
193: self.debug(&err.to_string())?;
194: },
195: };
196: let clone = self.clone();
197: tokio::spawn(async move {
198: if let Err(err) = clone.check(&source_id.clone(), Some(true)).await {
199: eprintln!("connection error: {}", err);
200: }
201: });
202: } else {
203: if next_fetch - now < delay {
204: delay = next_fetch - now;
205: }
206: }
207: };
208: tokio::time::delay_for(delay.to_std()?).await;
209: delay = chrono::Duration::minutes(5);
210: }
211: //Ok(())
212: }
213:
214: }
215:
216: #[tokio::main(basic_scheduler)]
217: async fn main() -> Result<()> {
218: let mut settings = config::Config::default();
219: settings.merge(config::File::with_name("rsstg"))?;
220:
221: let re_username = Regex::new(r"^@[a-zA-Z][a-zA-Z0-9_]+$")?;
222: let re_link = Regex::new(r"^https?://[a-zA-Z.0-9-]+/[-_a-zA-Z.0-9/?=]+$")?;
223: let re_iv_hash = Regex::new(r"^[a-f0-9]{14}$")?;
224:
225: let core = Core::new(settings).await?;
226:
227: let mut stream = core.stream();
228:
229: while let Some(update) = stream.next().await {
230: let update = update?;
231: match update.kind {
232: UpdateKind::Message(message) => {
233: let mut reply: Vec<String> = vec![];
234: match message.kind {
235: MessageKind::Text { ref data, .. } => {
236: let mut words = data.split_whitespace();
237: let cmd = words.next().unwrap();
238: match cmd {
239:
240: // start
241:
242: "/start" => {
243: reply.push("Not in service yet\\. Try later\\.".to_string());
244: },
245:
246: // list
247:
248: "/list" => {
249: reply.push("Channels:".to_string());
250: let mut rows = sqlx::query("select source_id, username, enabled, url, iv_hash from rsstg_source left join rsstg_channel using (channel_id) where owner = $1 order by source_id")
251: .bind(i64::from(message.from.id))
252: .fetch(&core.pool);
253: while let Some(row) = rows.try_next().await? {
254: let source_id: i32 = row.try_get("source_id")?;
255: let username: &str = row.try_get("username")?;
256: let enabled: bool = row.try_get("enabled")?;
257: let url: &str = row.try_get("url")?;
258: let iv_hash: Option<&str> = row.try_get("iv_hash")?;
259: reply.push(format!("\n\\#️⃣ {} \\*️⃣ `{}` {}\n🔗 `{}`", source_id, username,
260: match enabled {
261: true => "🔄 enabled",
262: false => "⛔ disabled",
263: }, url));
264: if let Some(hash) = iv_hash {
265: reply.push(format!("IV `{}`", hash));
266: }
267: }
268: },
269:
270: // add
271:
272: "/add" | "/update" => {
273: let mut source_id: i32 = 0;
274: if cmd == "/update" {
275: source_id = words.next().unwrap().parse::<i32>()?;
276: }
277: let (channel, url, iv_hash) = (words.next().unwrap(), words.next().unwrap(), words.next());
278: let ok_link = re_link.is_match(&url);
279: let ok_hash = match iv_hash {
280: Some(hash) => re_iv_hash.is_match(&hash),
281: None => true,
282: };
283: if ! ok_link {
284: reply.push("Link should be link to atom/rss feed, something like \"https://domain/path\"\\.".to_string());
285: core.debug(&format!("Url: {:?}", &url))?;
286: }
287: if ! ok_hash {
288: reply.push("IV hash should be 14 hex digits.".to_string());
289: core.debug(&format!("IV: {:?}", &iv_hash))?;
290: }
291: if ok_link && ok_hash {
292: let chan: Option<i64> = match sqlx::query("select channel_id from rsstg_channel where username = $1")
293: .bind(channel)
294: .fetch_one(&core.pool).await {
295: Ok(chan) => Some(chan.try_get("channel_id")?),
296: Err(sqlx::Error::RowNotFound) => {
297: reply.push("Sorry, I don't know about that channel. Please, add a channel with /addchan.".to_string());
298: None
299: },
300: Err(err) => {
301: reply.push("Sorry, unknown error\\.".to_string());
302: core.debug(&format!("Sorry, unknown error: {:#?}\n", err))?;
303: None
304: },
305: };
306: if let Some(chan) = chan {
307: match if cmd == "/update" {
308: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $4 where source_id = $1").bind(source_id)
309: } else {
310: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner) values ($1, $2, $3, $4)")
311: }
312: .bind(chan)
313: .bind(url)
314: .bind(iv_hash)
315: .bind(i64::from(message.from.id))
316: .execute(&core.pool).await {
317: Ok(_) => reply.push("Channel added\\.".to_string()),
318: Err(sqlx::Error::Database(err)) => {
319: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
320: Some("_bt_check_unique", ) => {
321: reply.push("Duplicate key\\.".to_string());
322: },
323: Some(_) => {
324: reply.push("Database error\\.".to_string());
325: },
326: None => {
327: reply.push("No database error extracted\\.".to_string());
328: },
329: };
330: },
331: Err(err) => {
332: reply.push("Sorry, unknown error\\.".to_string());
333: core.debug(&format!("Sorry, unknown error: {:#?}\n", err))?;
334: },
335: };
336: };
337: };
338: },
339:
340: // addchan
341:
342: "/addchan" => {
343: let channel = words.next().unwrap();
344: if ! re_username.is_match(&channel) {
345: reply.push("Usernames should be something like \"@\\[a\\-zA\\-Z]\\[a\\-zA\\-Z0\\-9\\_]+\", aren't they?".to_string());
346: } else {
347: let chan: Option<i64> = match sqlx::query("select channel_id from rsstg_channel where username = $1")
348: .bind(channel)
349: .fetch_one(&core.pool).await {
350: Ok(chan) => Some(chan.try_get("channel_id")?),
351: Err(sqlx::Error::RowNotFound) => None,
352: Err(err) => {
353: reply.push("Sorry, unknown error\\.".to_string());
354: core.debug(&format!("Sorry, unknown error: {:#?}\n", err))?;
355: None
356: },
357: };
358: match chan {
359: Some(chan) => {
360: let new_chan = core.tg.send(telegram_bot::GetChat::new(telegram_bot::types::ChatId::new(chan))).await?;
361: if i64::from(new_chan.id()) == chan {
362: reply.push("I already know that channel\\.".to_string());
363: } else {
364: reply.push("Hmm, channel has changed… I'll fix it later\\.".to_string());
365: };
366: },
367: None => {
368: match core.tg.send(telegram_bot::GetChatAdministrators::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await {
369: Ok(chan_adm) => {
370: let (mut me, mut user) = (false, false);
371: for admin in &chan_adm {
372: if admin.user.id == core.my.id {
373: me = true;
374: };
375: if admin.user.id == message.from.id {
376: user = true;
377: };
378: };
379: if ! me { reply.push("I need to be admin on that channel\\.".to_string()); };
380: if ! user { reply.push("You should be admin on that channel\\.".to_string()); };
381: if me && user {
382: let chan_id = core.tg.send(telegram_bot::GetChat::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await?;
383: sqlx::query("insert into rsstg_channel (channel_id, username) values ($1, $2);")
384: .bind(i64::from(chan_id.id()))
385: .bind(channel)
386: .execute(&core.pool).await?;
387: reply.push("Good, I know that channel now\\.\n".to_string());
388: };
389: },
390: Err(_) => {
391: reply.push("Sorry, I have no access to that chat\\.".to_string());
392: },
393: };
394: },
395: };
396: };
397: },
398:
399: // check
400:
401: "/check" => {
402: &core.check(&words.next().unwrap().parse::<i32>()?, None).await?;
403: },
404:
405: // clear
406:
407: "/clean" => {
408: if core.owner != i64::from(message.from.id) {
409: reply.push("Reserved for testing\\.".to_string());
410: } else {
411: let source_id = words.next().unwrap().parse::<i32>().unwrap_or(0);
412: &core.clean(source_id).await?;
413: }
414: },
415:
416: // enable
417:
418: "/enable" => {
419: match core.enable(&words.next().unwrap().parse::<i32>()?).await {
420: Ok(_) => {
421: reply.push("Channel enabled\\.".to_string());
422: }
423: Err(err) => {
424: core.debug(&err.to_string())?;
425: },
426: };
427: },
428:
429: // disable
430:
431: "/disable" => {
432: match core.disable(&words.next().unwrap().parse::<i32>()?).await {
433: Ok(_) => {
434: reply.push("Channel disabled\\.".to_string());
435: }
436: Err(err) => {
437: core.debug(&err.to_string())?;
438: },
439: };
440: },
441:
442: _ => {
443: },
444: };
445: },
446: _ => {
447: },
448: };
449: if reply.len() > 0 {
450: match core.tg.send(message.text_reply(reply.join("\n")).parse_mode(types::ParseMode::MarkdownV2)).await {
451: Ok(_) => {},
452: Err(err) => {
453: dbg!(reply.join("\n"));
454: println!("{}", err);
455: },
456: }
457: }
458: },
459: _ => {},
460: };
461: }
462:
463: Ok(())
464: }