Lines of
src/main.rs
from check-in 0191d490fe
that are changed by the sequence of edits moving toward
check-in 21d16a0993:
1: use config;
2:
3: use tokio;
4: use rss;
5: use chrono::DateTime;
6:
7: use regex::Regex;
8:
9: use tokio::stream::StreamExt;
10: use telegram_bot::*;
11:
12: use sqlx::postgres::PgPoolOptions;
13: use sqlx::Row;
14:
15: type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
16:
17: #[derive(Clone)]
18: struct Core {
19: owner: i64,
20: api_key: String,
21: owner_chat: UserId,
22: tg: telegram_bot::Api,
23: my: User,
24: pool: sqlx::Pool<sqlx::Postgres>,
25: }
26:
27: impl Core {
28: async fn new(settings: config::Config) -> Result<Core> {
29: let owner = settings.get_int("owner")?;
30: let api_key = settings.get_str("api_key")?;
31: let tg = Api::new(&api_key);
32: let core = Core {
33: owner: owner,
34: api_key: api_key.clone(),
35: my: tg.send(telegram_bot::GetMe).await?,
36: tg: tg,
37: owner_chat: UserId::new(owner),
38: pool: PgPoolOptions::new().max_connections(5).connect(&settings.get_str("pg")?).await?,
39: };
40: let clone = core.clone();
41: tokio::spawn(async move {
42: if let Err(err) = clone.autofetch().await {
43: eprintln!("connection error: {}", err);
44: }
45: });
46: Ok(core)
47: }
48:
49: fn stream(&self) -> telegram_bot::UpdatesStream {
50: self.tg.stream()
51: }
52:
53: fn debug(&self, msg: &str) -> Result<()> {
54: self.tg.spawn(SendMessage::new(self.owner_chat, msg));
55: Ok(())
56: }
57:
58: async fn check(&self, channel: &str, real: Option<bool>) -> Result<()> {
59: match sqlx::query("select source_id, channel_id, url, last_fetch, iv_hash, owner from rsstg_source natural left join rsstg_channel where username = $1")
60: .bind(channel)
61: .fetch_one(&self.pool).await {
62: Ok(row) => {
63: let id: i32 = row.try_get("source_id")?;
64: let channel_id: i64 = row.try_get("channel_id")?;
65: let destination = match real {
66: Some(true) => UserId::new(channel_id),
67: Some(false) | None => UserId::new(row.try_get("owner")?),
68: };
69: let url: &str = row.try_get("url")?;
70: let last_fetch: Option<DateTime<chrono::FixedOffset>> = row.try_get("last_fetch")?;
71: let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
72: let iv_hash: Option<&str> = row.try_get("iv_hash")?;
73: match rss::Channel::from_url(url) {
74: Ok(feed) => {
75: self.debug(&format!("# title:{:?} ttl:{:?} hours:{:?} days:{:?}", feed.title(), feed.ttl(), feed.skip_hours(), feed.skip_days()))?;
76: for item in feed.items() {
77: let date = match item.pub_date() {
78: Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
79: None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
80: }?;
81: let url = item.link().unwrap().to_string();
82: if last_fetch == None || date > last_fetch.unwrap() {
83: match sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
84: .bind(&url)
85: .bind(id)
86: .fetch_one(&self.pool).await {
87: Ok(row) => {
88: let exists: bool = row.try_get("exists")?;
89: if ! exists {
90: if this_fetch == None || date > this_fetch.unwrap() {
91: this_fetch = Some(date);
92: }
93: match self.tg.send( match iv_hash {
94: Some(x) => SendMessage::new(destination, format!("<a href=\"https://t.me/iv?url={}&rhash={}\"> </a>{0}", url, x)),
95: None => SendMessage::new(destination, format!("{}", url)),
96: }.parse_mode(types::ParseMode::Html)).await {
97: Ok(_) => {
98: match sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
99: .bind(id)
100: .bind(date)
101: .bind(url)
102: .execute(&self.pool).await {
103: Ok(_) => {},
104: Err(err) => {
105: self.debug(&err.to_string())?;
106: },
107: };
108: },
109: Err(err) => {
110: self.debug(&err.to_string())?;
111: },
112: };
113: tokio::time::delay_for(std::time::Duration::new(4, 0)).await;
114: }
115: },
116: Err(err) => {
117: self.debug(&err.to_string())?;
118: },
119: };
120: };
121: };
122: // update last_fetch
123: if this_fetch != None && (last_fetch == None || this_fetch.unwrap() > last_fetch.unwrap()) {
124: match sqlx::query("update rsstg_source set last_fetch = case when (last_fetch < $1) then $1 else last_fetch end where source_id = $2;")
125: .bind(this_fetch.unwrap())
126: .bind(id)
127: .execute(&self.pool).await {
128: Ok(_) => {},
129: Err(err) => {
130: self.debug(&err.to_string())?;
131: },
132: };
133: }
134: },
135: Err(err) => {
136: self.debug(&err.to_string())?;
137: },
138: };
139: match sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
140: .bind(id)
141: .execute(&self.pool).await {
142: Ok(_) => {},
143: Err(err) => {
144: self.debug(&err.to_string())?;
145: },
146: };
147: },
148: Err(err) => {
149: self.debug(&err.to_string())?;
150: },
151: };
152: Ok(())
153: }
154:
155: async fn clean(&self, source_id: i32) -> Result<()> {
156: for query in vec!["delete from rsstg_post where source_id = $1;", "update rsstg_source set last_fetch = NULL where source_id = $1;"] {
157: match sqlx::query(query)
158: .bind(source_id)
159: .execute(&self.pool).await {
160: Ok(_) => {},
161: Err(err) => {
162: self.debug(&err.to_string())?;
163: },
164: }
165: }
166: Ok(())
167: }
168:
169: async fn enable(&self, user: UserId, channel: &str) -> Result<()> {
170: match sqlx::query("update rsstg_source set enabled = true from rsstg_channel where rsstg_channel.channel_id = rsstg_source.channel_id and rsstg_channel.username = $1 and owner = $2")
171: .bind(channel)
172: .bind(i64::from(user))
173: .execute(&self.pool).await {
174: Ok(_) => {},
175: Err(err) => {
176: self.debug(&err.to_string())?;
177: },
178: }
179: Ok(())
180: }
181:
182: async fn disable(&self, user: UserId, channel: &str) -> Result<()> {
183: match sqlx::query("update rsstg_source set enabled = false from rsstg_channel where rsstg_channel.channel_id = rsstg_source.channel_id and rsstg_channel.username = $1 and owner = $2")
184: .bind(channel)
185: .bind(i64::from(user))
186: .execute(&self.pool).await {
187: Ok(_) => {},
188: Err(err) => {
189: self.debug(&err.to_string())?;
190: },
191: }
192: Ok(())
193: }
194:
195: async fn autofetch(&self) -> Result<()> {
196: let mut delay = chrono::Duration::minutes(5);
197: let mut next_fetch: DateTime<chrono::Local>;
198: let mut now;
199: loop {
200: let mut rows = sqlx::query("select source_id, username, next_fetch from rsstg_order natural left join rsstg_source natural left join rsstg_channel;")
201: .fetch(&self.pool);
202: while let Some(row) = rows.try_next().await.unwrap() {
203: now = chrono::Local::now();
204: let source_id: i32 = row.try_get("source_id")?;
205: next_fetch = row.try_get("next_fetch")?;
206: if next_fetch < now {
207: match sqlx::query("update rsstg_source set last_scrape = now() + interval '1 hour' where source_id = $1;")
208: .bind(source_id)
209: .execute(&self.pool).await {
210: Ok(_) => {},
211: Err(err) => {
212: self.debug(&err.to_string())?;
213: },
214: };
215: let clone = self.clone();
216: let username: String = row.try_get("username")?;
217: let username = username.clone();
218: tokio::spawn(async move {
219: if let Err(err) = clone.check(&username, Some(true)).await {
220: eprintln!("connection error: {}", err);
221: }
222: });
223: //&self.check(row.try_get("username")?, Some(true)).await?;
224: } else {
225: if next_fetch - now < delay {
226: delay = next_fetch - now;
227: }
228: }
229: };
230: tokio::time::delay_for(delay.to_std()?).await;
231: }
232: //Ok(())
233: }
234:
235: }
236:
237: #[tokio::main(basic_scheduler)]
238: async fn main() -> Result<()> {
239: let mut settings = config::Config::default();
240: settings.merge(config::File::with_name("rsstg"))?;
241:
242: let re_username = Regex::new(r"^@[a-zA-Z][a-zA-Z0-9_]+$")?;
0191d490fe 2020-11-18 243: let re_link = Regex::new(r"^https?://[a-zA-Z.0-9]+/[-_a-zA-Z.0-9/?=]+$")?;
244: let re_iv_hash = Regex::new(r"^[a-f0-9]{14}$")?;
245:
246: let core = Core::new(settings).await?;
247:
248: let mut stream = core.stream();
249:
250: while let Some(update) = stream.next().await {
251: let update = update?;
252: match update.kind {
253: UpdateKind::Message(message) => {
254: let mut reply: Vec<String> = vec![];
255: match message.kind {
256: MessageKind::Text { ref data, .. } => {
257: let mut words = data.split_whitespace();
258: let cmd = words.next().unwrap();
259: match cmd {
260:
261: // start
262:
263: "/start" => {
264: reply.push("Not in service yet. Try later.".to_string());
265: },
266:
267: // list
268:
269: "/list" => {
270: reply.push("Channels:".to_string());
271: let mut rows = sqlx::query("select username, enabled, url, iv_hash from rsstg_source left join rsstg_channel using (channel_id) where owner = $1")
272: .bind(i64::from(message.from.id))
273: .fetch(&core.pool);
274: while let Some(row) = rows.try_next().await? {
275: let username: &str = row.try_get("username")?;
276: let enabled: bool = row.try_get("enabled")?;
277: let url: &str = row.try_get("url")?;
278: let iv_hash: Option<&str> = row.try_get("iv_hash")?;
279: reply.push(format!("\n\\*️⃣ `{}` {}\n🔗 `{}`", username,
280: match enabled {
281: true => "🔄 enabled",
282: false => "⛔ disabled",
283: }, url));
284: if let Some(hash) = iv_hash {
285: reply.push(format!("IV `{}`", hash));
286: }
287: }
288: },
289:
290: // add
291:
292: "/add" => {
293: let (channel, url, iv_hash) = (words.next().unwrap(), words.next().unwrap(), words.next());
294: let ok_link = re_link.is_match(&url);
295: let ok_hash = match iv_hash {
296: Some(hash) => re_iv_hash.is_match(&hash),
297: None => true,
298: };
299: if ! ok_link {
300: reply.push("Link should be link to atom/rss feed, something like \"https://domain/path\"\\.".to_string());
301: core.debug(&format!("Url: {:?}", &url))?;
302: }
303: if ! ok_hash {
304: reply.push("IV hash should be 14 hex digits.".to_string());
305: core.debug(&format!("IV: {:?}", &iv_hash))?;
306: }
307: if ok_link && ok_hash {
308: let chan: Option<i64> = match sqlx::query("select channel_id from rsstg_channel where username = $1")
309: .bind(channel)
310: .fetch_one(&core.pool).await {
311: Ok(chan) => Some(chan.try_get("channel_id")?),
312: Err(sqlx::Error::RowNotFound) => {
313: reply.push("Sorry, I don't know about that channel. Please, add a channel with /addchan.".to_string());
314: None
315: },
316: Err(err) => {
317: reply.push("Sorry, unknown error\\.".to_string());
318: core.debug(&format!("Sorry, unknown error: {:#?}\n", err))?;
319: None
320: },
321: };
322: match chan {
323: Some(chan) => {
324: match sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner) values ($1, $2, $3, $4) on conflict (channel_id, owner) do update set url = excluded.url, iv_hash = excluded.iv_hash;")
325: .bind(chan)
326: .bind(url)
327: .bind(iv_hash)
328: .bind(i64::from(message.from.id))
329: .execute(&core.pool).await {
330: Ok(_) => reply.push("Channel added\\.".to_string()),
331: Err(sqlx::Error::Database(err)) => {
332: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
333: Some("_bt_check_unique", ) => {
334: reply.push("Duplicate key\\.".to_string());
335: },
336: Some(_) => {
337: reply.push("Database error\\.".to_string());
338: },
339: None => {
340: reply.push("No database error extracted\\.".to_string());
341: },
342: };
343: },
344: Err(err) => {
345: reply.push("Sorry, unknown error\\.".to_string());
346: core.debug(&format!("Sorry, unknown error: {:#?}\n", err))?;
347: },
348: };
349: },
350: None => {},
351: };
352: };
353: },
354:
355: // addchan
356:
357: "/addchan" => {
358: let channel = words.next().unwrap();
359: if ! re_username.is_match(&channel) {
360: reply.push("Usernames should be something like \"@\\[a\\-zA\\-Z]\\[a\\-zA\\-Z0\\-9\\_]+\", aren't they?".to_string());
361: } else {
362: let chan: Option<i64> = match sqlx::query("select channel_id from rsstg_channel where username = $1")
363: .bind(channel)
364: .fetch_one(&core.pool).await {
365: Ok(chan) => Some(chan.try_get("channel_id")?),
366: Err(sqlx::Error::RowNotFound) => None,
367: Err(err) => {
368: reply.push("Sorry, unknown error\\.".to_string());
369: core.debug(&format!("Sorry, unknown error: {:#?}\n", err))?;
370: None
371: },
372: };
373: match chan {
374: Some(chan) => {
375: let new_chan = core.tg.send(telegram_bot::GetChat::new(telegram_bot::types::ChatId::new(chan))).await?;
376: if i64::from(new_chan.id()) == chan {
377: reply.push("I already know that channel\\.".to_string());
378: } else {
379: reply.push("Hmm, channel has changed… I'll fix it later\\.".to_string());
380: };
381: },
382: None => {
383: match core.tg.send(telegram_bot::GetChatAdministrators::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await {
384: Ok(chan_adm) => {
385: let (mut me, mut user) = (false, false);
386: for admin in &chan_adm {
387: if admin.user.id == core.my.id {
388: me = true;
389: };
390: if admin.user.id == message.from.id {
391: user = true;
392: };
393: };
394: if ! me { reply.push("I need to be admin on that channel\\.".to_string()); };
395: if ! user { reply.push("You should be admin on that channel\\.".to_string()); };
396: if me && user {
397: let chan_id = core.tg.send(telegram_bot::GetChat::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await?;
398: sqlx::query("insert into rsstg_channel (channel_id, username) values ($1, $2);")
399: .bind(i64::from(chan_id.id()))
400: .bind(channel)
401: .execute(&core.pool).await?;
402: reply.push("Good, I know that channel now\\.\n".to_string());
403: };
404: },
405: Err(_) => {
406: reply.push("Sorry, I have no access to that chat\\.".to_string());
407: },
408: };
409: },
410: };
411: };
412: },
413:
414: // check
415:
416: "/check" => {
417: let channel = words.next().unwrap();
418: if ! re_username.is_match(&channel) {
419: reply.push("Usernames should be something like \"@\\[a-z]\\[a-z0-9_]+\", aren't they?".to_string());
420: } else {
421: &core.check(channel, None).await?;
422: }
423: },
424:
425: // clear
426:
427: "/clean" => {
428: if core.owner != i64::from(message.from.id) {
429: reply.push("Reserved for testing\\.".to_string());
430: } else {
431: let source_id = words.next().unwrap().parse::<i32>().unwrap_or(0);
432: &core.clean(source_id).await?;
433: }
434: },
435:
436: // enable
437:
438: "/enable" => {
439: let channel = words.next().unwrap();
440: if ! re_username.is_match(&channel) {
441: reply.push("Usernames should be something like \"@\\[a-z]\\[a-z0-9_]+\", aren't they?".to_string());
442: } else {
443: match core.enable(message.from.id, channel).await {
444: Ok(_) => {
445: reply.push("Channel enabled\\.".to_string());
446: }
447: Err(err) => {
448: core.debug(&err.to_string())?;
449: },
450: }
451: }
452: },
453:
454: // disable
455:
456: "/disable" => {
457: let channel = words.next().unwrap();
458: if ! re_username.is_match(&channel) {
459: reply.push("Usernames should be something like \"@\\[a-z]\\[a-z0-9_]+\", aren't they?".to_string());
460: } else {
461: match core.disable(message.from.id, channel).await {
462: Ok(_) => {
463: reply.push("Channel disabled\\.".to_string());
464: }
465: Err(err) => {
466: core.debug(&err.to_string())?;
467: },
468: }
469: }
470: },
471:
472: _ => {
473: },
474: };
475: },
476: _ => {
477: },
478: };
479: if reply.len() > 0 {
480: match core.tg.send(message.text_reply(reply.join("\n")).parse_mode(types::ParseMode::MarkdownV2)).await {
481: Ok(_) => {},
482: Err(err) => {
483: dbg!(reply.join("\n"));
484: println!("{}", err);
485: },
486: }
487: }
488: },
489: _ => {},
490: };
491: }
492:
493: Ok(())
494: }