Annotation For src/main.rs
Logged in as anonymous

Lines of src/main.rs from check-in cd827b4c75 that are changed by the sequence of edits moving toward check-in 9171c791eb:

cd827b4c75 2021-11-10    1: use std::collections::{BTreeMap, HashSet};
cd827b4c75 2021-11-10    2: use std::sync::{Arc, Mutex};
                         3: 
cd827b4c75 2021-11-10    4: use chrono::DateTime;
                         5: use config;
                         6: use futures::StreamExt;
cd827b4c75 2021-11-10    7: use regex::Regex;
cd827b4c75 2021-11-10    8: use reqwest;
cd827b4c75 2021-11-10    9: use sqlx::postgres::PgPoolOptions;
cd827b4c75 2021-11-10   10: use sqlx::Row;
                        11: use tokio;
                        12: 
cd827b4c75 2021-11-10   13: use rss;
cd827b4c75 2021-11-10   14: use atom_syndication;
cd827b4c75 2021-11-10   15: 
                        16: use telegram_bot::*;
cd827b4c75 2021-11-10   17: //use tokio::stream::StreamExt;
                        18: 
                        19: #[macro_use]
                        20: extern crate lazy_static;
                        21: 
cd827b4c75 2021-11-10   22: use anyhow::{anyhow, bail, Context, Result};
cd827b4c75 2021-11-10   23: 
cd827b4c75 2021-11-10   24: #[derive(Clone)]
cd827b4c75 2021-11-10   25: struct Core {
cd827b4c75 2021-11-10   26: 	owner: i64,
cd827b4c75 2021-11-10   27: 	api_key: String,
cd827b4c75 2021-11-10   28: 	owner_chat: UserId,
cd827b4c75 2021-11-10   29: 	tg: telegram_bot::Api,
cd827b4c75 2021-11-10   30: 	my: User,
cd827b4c75 2021-11-10   31: 	pool: sqlx::Pool<sqlx::Postgres>,
cd827b4c75 2021-11-10   32: 	sources: Arc<Mutex<HashSet<Arc<i32>>>>,
cd827b4c75 2021-11-10   33: }
cd827b4c75 2021-11-10   34: 
cd827b4c75 2021-11-10   35: impl Core {
cd827b4c75 2021-11-10   36: 	async fn new(settings: config::Config) -> Result<Core> {
cd827b4c75 2021-11-10   37: 		let owner = settings.get_int("owner")?;
cd827b4c75 2021-11-10   38: 		let api_key = settings.get_str("api_key")?;
cd827b4c75 2021-11-10   39: 		let tg = Api::new(&api_key);
cd827b4c75 2021-11-10   40: 		let core = Core {
cd827b4c75 2021-11-10   41: 			owner: owner,
cd827b4c75 2021-11-10   42: 			api_key: api_key.clone(),
cd827b4c75 2021-11-10   43: 			my: tg.send(telegram_bot::GetMe).await?,
cd827b4c75 2021-11-10   44: 			tg: tg,
cd827b4c75 2021-11-10   45: 			owner_chat: UserId::new(owner),
cd827b4c75 2021-11-10   46: 			pool: PgPoolOptions::new()
cd827b4c75 2021-11-10   47: 				.max_connections(5)
cd827b4c75 2021-11-10   48: 				.connect_timeout(std::time::Duration::new(300, 0))
cd827b4c75 2021-11-10   49: 				.idle_timeout(std::time::Duration::new(60, 0))
cd827b4c75 2021-11-10   50: 				.connect_lazy(&settings.get_str("pg")?)?,
cd827b4c75 2021-11-10   51: 			sources: Arc::new(Mutex::new(HashSet::new())),
cd827b4c75 2021-11-10   52: 		};
cd827b4c75 2021-11-10   53: 		let clone = core.clone();
cd827b4c75 2021-11-10   54: 		tokio::spawn(async move {
cd827b4c75 2021-11-10   55: 			if let Err(err) = &clone.autofetch().await {
cd827b4c75 2021-11-10   56: 				if let Err(err) = clone.debug(&format!("šŸ›‘ {:?}", err), None) {
cd827b4c75 2021-11-10   57: 					eprintln!("Autofetch error: {}", err);
cd827b4c75 2021-11-10   58: 				};
cd827b4c75 2021-11-10   59: 			}
cd827b4c75 2021-11-10   60: 		});
cd827b4c75 2021-11-10   61: 		Ok(core)
cd827b4c75 2021-11-10   62: 	}
cd827b4c75 2021-11-10   63: 
cd827b4c75 2021-11-10   64: 	fn stream(&self) -> telegram_bot::UpdatesStream {
cd827b4c75 2021-11-10   65: 		self.tg.stream()
cd827b4c75 2021-11-10   66: 	}
cd827b4c75 2021-11-10   67: 
cd827b4c75 2021-11-10   68: 	fn debug(&self, msg: &str, target: Option<UserId>) -> Result<()> {
cd827b4c75 2021-11-10   69: 		self.tg.spawn(SendMessage::new(match target {
cd827b4c75 2021-11-10   70: 			Some(user) => user,
cd827b4c75 2021-11-10   71: 			None => self.owner_chat,
cd827b4c75 2021-11-10   72: 		}, msg));
cd827b4c75 2021-11-10   73: 		Ok(())
cd827b4c75 2021-11-10   74: 	}
cd827b4c75 2021-11-10   75: 
cd827b4c75 2021-11-10   76: 	async fn check<S>(&self, id: i32, owner: S, real: bool) -> Result<()>
cd827b4c75 2021-11-10   77: 	where S: Into<i64> {
cd827b4c75 2021-11-10   78: 		let owner: i64 = owner.into();
cd827b4c75 2021-11-10   79: 		let id = {
cd827b4c75 2021-11-10   80: 			let mut set = self.sources.lock().unwrap();
cd827b4c75 2021-11-10   81: 			match set.get(&id) {
cd827b4c75 2021-11-10   82: 				Some(id) => id.clone(),
cd827b4c75 2021-11-10   83: 				None => {
cd827b4c75 2021-11-10   84: 					let id = Arc::new(id);
cd827b4c75 2021-11-10   85: 					set.insert(id.clone());
cd827b4c75 2021-11-10   86: 					id.clone()
cd827b4c75 2021-11-10   87: 				},
cd827b4c75 2021-11-10   88: 			}
cd827b4c75 2021-11-10   89: 		};
cd827b4c75 2021-11-10   90: 		let count = Arc::strong_count(&id);
cd827b4c75 2021-11-10   91: 		if count == 2 {
cd827b4c75 2021-11-10   92: 			let mut conn = self.pool.acquire().await
cd827b4c75 2021-11-10   93: 				.with_context(|| format!("Query queue fetch conn:\n{:?}", &self.pool))?;
cd827b4c75 2021-11-10   94: 			let row = sqlx::query("select source_id, channel_id, url, iv_hash, owner from rsstg_source where source_id = $1 and owner = $2")
cd827b4c75 2021-11-10   95: 				.bind(*id)
cd827b4c75 2021-11-10   96: 				.bind(owner)
cd827b4c75 2021-11-10   97: 				.fetch_one(&mut conn).await
cd827b4c75 2021-11-10   98: 				.with_context(|| format!("Query source:\n{:?}", &self.pool))?;
cd827b4c75 2021-11-10   99: 			drop(conn);
cd827b4c75 2021-11-10  100: 			let channel_id: i64 = row.try_get("channel_id")?;
cd827b4c75 2021-11-10  101: 			let destination = match real {
cd827b4c75 2021-11-10  102: 				true => UserId::new(channel_id),
cd827b4c75 2021-11-10  103: 				false => UserId::new(row.try_get("owner")?),
cd827b4c75 2021-11-10  104: 			};
cd827b4c75 2021-11-10  105: 			let url: &str = row.try_get("url")?;
cd827b4c75 2021-11-10  106: 			let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
cd827b4c75 2021-11-10  107: 			let iv_hash: Option<&str> = row.try_get("iv_hash")?;
cd827b4c75 2021-11-10  108: 			let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, String> = BTreeMap::new();
cd827b4c75 2021-11-10  109: 			let content = reqwest::get(url).await?.bytes().await?;
cd827b4c75 2021-11-10  110: 			//let mut content_ = surf::get(url).await.map_err(|err| anyhow!(err))?;
cd827b4c75 2021-11-10  111: 			//eprintln!("Data: {:#?}", &content_);
cd827b4c75 2021-11-10  112: 			//let content = content_.body_bytes().await.map_err(|err| anyhow!(err))?;
cd827b4c75 2021-11-10  113: 			/*
cd827b4c75 2021-11-10  114: 			let feed = rss::Channel::read_from(&content[..])
cd827b4c75 2021-11-10  115: 				.with_context(|| format!("Problem opening feed url:\n{}", &url))?;
cd827b4c75 2021-11-10  116: 			for item in feed.items() {
cd827b4c75 2021-11-10  117: 				let date = match item.pub_date() {
cd827b4c75 2021-11-10  118: 					Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
cd827b4c75 2021-11-10  119: 					None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
cd827b4c75 2021-11-10  120: 				}?;
cd827b4c75 2021-11-10  121: 				let url = item.link().unwrap().to_string();
cd827b4c75 2021-11-10  122: 				posts.insert(date.clone(), url.clone());
cd827b4c75 2021-11-10  123: 			};
cd827b4c75 2021-11-10  124: 			*/
cd827b4c75 2021-11-10  125: 			match rss::Channel::read_from(&content[..]) {
cd827b4c75 2021-11-10  126: 				Ok(feed) => {
cd827b4c75 2021-11-10  127: 					for item in feed.items() {
cd827b4c75 2021-11-10  128: 						match item.link() {
cd827b4c75 2021-11-10  129: 							Some(link) => {
cd827b4c75 2021-11-10  130: 								let date = match item.pub_date() {
cd827b4c75 2021-11-10  131: 									Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
cd827b4c75 2021-11-10  132: 									None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
cd827b4c75 2021-11-10  133: 								}?;
cd827b4c75 2021-11-10  134: 								let url = link.to_string();
cd827b4c75 2021-11-10  135: 								posts.insert(date.clone(), url.clone());
cd827b4c75 2021-11-10  136: 							},
cd827b4c75 2021-11-10  137: 							None => {}
cd827b4c75 2021-11-10  138: 						}
cd827b4c75 2021-11-10  139: 					};
cd827b4c75 2021-11-10  140: 				},
cd827b4c75 2021-11-10  141: 				Err(err) => match err {
cd827b4c75 2021-11-10  142: 					rss::Error::InvalidStartTag => {
cd827b4c75 2021-11-10  143: 						let feed = atom_syndication::Feed::read_from(&content[..])
cd827b4c75 2021-11-10  144: 							.with_context(|| format!("Problem opening feed url:\n{}", &url))?;
cd827b4c75 2021-11-10  145: 						for item in feed.entries() {
cd827b4c75 2021-11-10  146: 							let date = item.published().unwrap();
cd827b4c75 2021-11-10  147: 							let url = item.links()[0].href();
cd827b4c75 2021-11-10  148: 							posts.insert(date.clone(), url.to_string());
cd827b4c75 2021-11-10  149: 						};
cd827b4c75 2021-11-10  150: 					},
cd827b4c75 2021-11-10  151: 					rss::Error::Eof => (),
cd827b4c75 2021-11-10  152: 					_ => bail!("Unsupported or mangled content:\n{:#?}\n", err)
cd827b4c75 2021-11-10  153: 				}
cd827b4c75 2021-11-10  154: 			};
cd827b4c75 2021-11-10  155: 			for (date, url) in posts.iter() {
cd827b4c75 2021-11-10  156: 				let mut conn = self.pool.acquire().await
cd827b4c75 2021-11-10  157: 					.with_context(|| format!("Check post fetch conn:\n{:?}", &self.pool))?;
cd827b4c75 2021-11-10  158: 				let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
cd827b4c75 2021-11-10  159: 					.bind(&url)
cd827b4c75 2021-11-10  160: 					.bind(*id)
cd827b4c75 2021-11-10  161: 					.fetch_one(&mut conn).await
cd827b4c75 2021-11-10  162: 					.with_context(|| format!("Check post:\n{:?}", &conn))?;
cd827b4c75 2021-11-10  163: 				let exists: bool = row.try_get("exists")?;
cd827b4c75 2021-11-10  164: 				if ! exists {
cd827b4c75 2021-11-10  165: 					if this_fetch == None || *date > this_fetch.unwrap() {
cd827b4c75 2021-11-10  166: 						this_fetch = Some(*date);
cd827b4c75 2021-11-10  167: 					};
cd827b4c75 2021-11-10  168: 					self.tg.send( match iv_hash {
cd827b4c75 2021-11-10  169: 							Some(x) => SendMessage::new(destination, format!("<a href=\"https://t.me/iv?url={}&rhash={}\"> </a>{0}", url, x)),
cd827b4c75 2021-11-10  170: 							None => SendMessage::new(destination, format!("{}", url)),
cd827b4c75 2021-11-10  171: 						}.parse_mode(types::ParseMode::Html)).await
cd827b4c75 2021-11-10  172: 						.context("Can't post message:")?;
cd827b4c75 2021-11-10  173: 					sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
cd827b4c75 2021-11-10  174: 						.bind(*id)
cd827b4c75 2021-11-10  175: 						.bind(date)
cd827b4c75 2021-11-10  176: 						.bind(url)
cd827b4c75 2021-11-10  177: 						.execute(&mut conn).await
cd827b4c75 2021-11-10  178: 						.with_context(|| format!("Record post:\n{:?}", &conn))?;
cd827b4c75 2021-11-10  179: 					drop(conn);
cd827b4c75 2021-11-10  180: 					tokio::time::sleep(std::time::Duration::new(4, 0)).await;
cd827b4c75 2021-11-10  181: 				};
cd827b4c75 2021-11-10  182: 			};
cd827b4c75 2021-11-10  183: 			posts.clear();
cd827b4c75 2021-11-10  184: 		};
cd827b4c75 2021-11-10  185: 		let mut conn = self.pool.acquire().await
cd827b4c75 2021-11-10  186: 			.with_context(|| format!("Update scrape fetch conn:\n{:?}", &self.pool))?;
cd827b4c75 2021-11-10  187: 		sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
cd827b4c75 2021-11-10  188: 			.bind(*id)
cd827b4c75 2021-11-10  189: 			.execute(&mut conn).await
cd827b4c75 2021-11-10  190: 			.with_context(|| format!("Update scrape:\n{:?}", &conn))?;
cd827b4c75 2021-11-10  191: 		Ok(())
cd827b4c75 2021-11-10  192: 	}
cd827b4c75 2021-11-10  193: 
cd827b4c75 2021-11-10  194: 	async fn delete<S>(&self, source_id: &i32, owner: S) -> Result<String>
cd827b4c75 2021-11-10  195: 	where S: Into<i64> {
cd827b4c75 2021-11-10  196: 		let owner: i64 = owner.into();
cd827b4c75 2021-11-10  197: 		let mut conn = self.pool.acquire().await
cd827b4c75 2021-11-10  198: 			.with_context(|| format!("Delete fetch conn:\n{:?}", &self.pool))?;
cd827b4c75 2021-11-10  199: 		match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
cd827b4c75 2021-11-10  200: 			.bind(source_id)
cd827b4c75 2021-11-10  201: 			.bind(owner)
cd827b4c75 2021-11-10  202: 			.execute(&mut conn).await
cd827b4c75 2021-11-10  203: 			.with_context(|| format!("Delete source rule:\n{:?}", &self.pool))?
cd827b4c75 2021-11-10  204: 			.rows_affected() {
cd827b4c75 2021-11-10  205: 			0 => { Ok("No data found found\\.".to_string()) },
cd827b4c75 2021-11-10  206: 			x => { Ok(format!("{} sources removed\\.", x)) },
cd827b4c75 2021-11-10  207: 		}
cd827b4c75 2021-11-10  208: 	}
cd827b4c75 2021-11-10  209: 
cd827b4c75 2021-11-10  210: 	async fn clean<S>(&self, source_id: &i32, owner: S) -> Result<String>
cd827b4c75 2021-11-10  211: 	where S: Into<i64> {
cd827b4c75 2021-11-10  212: 		let owner: i64 = owner.into();
cd827b4c75 2021-11-10  213: 		let mut conn = self.pool.acquire().await
cd827b4c75 2021-11-10  214: 			.with_context(|| format!("Clean fetch conn:\n{:?}", &self.pool))?;
cd827b4c75 2021-11-10  215: 		match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
cd827b4c75 2021-11-10  216: 			.bind(source_id)
cd827b4c75 2021-11-10  217: 			.bind(owner)
cd827b4c75 2021-11-10  218: 			.execute(&mut conn).await
cd827b4c75 2021-11-10  219: 			.with_context(|| format!("Clean seen posts:\n{:?}", &self.pool))?
cd827b4c75 2021-11-10  220: 			.rows_affected() {
cd827b4c75 2021-11-10  221: 			0 => { Ok("No data found found\\.".to_string()) },
cd827b4c75 2021-11-10  222: 			x => { Ok(format!("{} posts purged\\.", x)) },
cd827b4c75 2021-11-10  223: 		}
cd827b4c75 2021-11-10  224: 	}
cd827b4c75 2021-11-10  225: 
cd827b4c75 2021-11-10  226: 	async fn enable<S>(&self, source_id: &i32, owner: S) -> Result<&str>
cd827b4c75 2021-11-10  227: 	where S: Into<i64> {
cd827b4c75 2021-11-10  228: 		let owner: i64 = owner.into();
cd827b4c75 2021-11-10  229: 		let mut conn = self.pool.acquire().await
cd827b4c75 2021-11-10  230: 			.with_context(|| format!("Enable fetch conn:\n{:?}", &self.pool))?;
cd827b4c75 2021-11-10  231: 		match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
cd827b4c75 2021-11-10  232: 			.bind(source_id)
cd827b4c75 2021-11-10  233: 			.bind(owner)
cd827b4c75 2021-11-10  234: 			.execute(&mut conn).await
cd827b4c75 2021-11-10  235: 			.with_context(|| format!("Enable source:\n{:?}", &self.pool))?
cd827b4c75 2021-11-10  236: 			.rows_affected() {
cd827b4c75 2021-11-10  237: 			1 => { Ok("Source enabled\\.") },
cd827b4c75 2021-11-10  238: 			0 => { Ok("Source not found\\.") },
cd827b4c75 2021-11-10  239: 			_ => { Err(anyhow!("Database error.")) },
cd827b4c75 2021-11-10  240: 		}
cd827b4c75 2021-11-10  241: 	}
cd827b4c75 2021-11-10  242: 
cd827b4c75 2021-11-10  243: 	async fn disable<S>(&self, source_id: &i32, owner: S) -> Result<&str>
cd827b4c75 2021-11-10  244: 	where S: Into<i64> {
cd827b4c75 2021-11-10  245: 		let owner: i64 = owner.into();
cd827b4c75 2021-11-10  246: 		let mut conn = self.pool.acquire().await
cd827b4c75 2021-11-10  247: 			.with_context(|| format!("Disable fetch conn:\n{:?}", &self.pool))?;
cd827b4c75 2021-11-10  248: 		match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
cd827b4c75 2021-11-10  249: 			.bind(source_id)
cd827b4c75 2021-11-10  250: 			.bind(owner)
cd827b4c75 2021-11-10  251: 			.execute(&mut conn).await
cd827b4c75 2021-11-10  252: 			.with_context(|| format!("Disable source:\n{:?}", &self.pool))?
cd827b4c75 2021-11-10  253: 			.rows_affected() {
cd827b4c75 2021-11-10  254: 			1 => { Ok("Source disabled\\.") },
cd827b4c75 2021-11-10  255: 			0 => { Ok("Source not found\\.") },
cd827b4c75 2021-11-10  256: 			_ => { Err(anyhow!("Database error.")) },
cd827b4c75 2021-11-10  257: 		}
cd827b4c75 2021-11-10  258: 	}
cd827b4c75 2021-11-10  259: 
cd827b4c75 2021-11-10  260: 	async fn update<S>(&self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, owner: S) -> Result<String>
cd827b4c75 2021-11-10  261: 	where S: Into<i64> {
cd827b4c75 2021-11-10  262: 		let owner: i64 = owner.into();
cd827b4c75 2021-11-10  263: 		let mut conn = self.pool.acquire().await
cd827b4c75 2021-11-10  264: 			.with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?;
cd827b4c75 2021-11-10  265: 
cd827b4c75 2021-11-10  266: 		match match update {
cd827b4c75 2021-11-10  267: 				Some(id) => {
cd827b4c75 2021-11-10  268: 					sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6 where source_id = $1").bind(id)
cd827b4c75 2021-11-10  269: 				},
cd827b4c75 2021-11-10  270: 				None => {
cd827b4c75 2021-11-10  271: 					sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel) values ($1, $2, $3, $4, $5)")
cd827b4c75 2021-11-10  272: 				},
cd827b4c75 2021-11-10  273: 			}
cd827b4c75 2021-11-10  274: 			.bind(channel_id)
cd827b4c75 2021-11-10  275: 			.bind(url)
cd827b4c75 2021-11-10  276: 			.bind(iv_hash)
cd827b4c75 2021-11-10  277: 			.bind(owner)
cd827b4c75 2021-11-10  278: 			.bind(channel)
cd827b4c75 2021-11-10  279: 			.execute(&mut conn).await {
cd827b4c75 2021-11-10  280: 			Ok(_) => return Ok(String::from(match update {
cd827b4c75 2021-11-10  281: 				Some(_) => "Channel updated\\.",
cd827b4c75 2021-11-10  282: 				None => "Channel added\\.",
cd827b4c75 2021-11-10  283: 			})),
cd827b4c75 2021-11-10  284: 			Err(sqlx::Error::Database(err)) => {
cd827b4c75 2021-11-10  285: 				match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
cd827b4c75 2021-11-10  286: 					Some("_bt_check_unique", ) => {
cd827b4c75 2021-11-10  287: 						return Ok("Duplicate key\\.".to_string())
cd827b4c75 2021-11-10  288: 					},
cd827b4c75 2021-11-10  289: 					Some(_) => {
cd827b4c75 2021-11-10  290: 						return Ok("Database error\\.".to_string())
cd827b4c75 2021-11-10  291: 					},
cd827b4c75 2021-11-10  292: 					None => {
cd827b4c75 2021-11-10  293: 						return Ok("No database error extracted\\.".to_string())
cd827b4c75 2021-11-10  294: 					},
cd827b4c75 2021-11-10  295: 				};
cd827b4c75 2021-11-10  296: 			},
cd827b4c75 2021-11-10  297: 			Err(err) => {
cd827b4c75 2021-11-10  298: 				bail!("Sorry, unknown error:\n{:#?}\n", err);
cd827b4c75 2021-11-10  299: 			},
cd827b4c75 2021-11-10  300: 		};
cd827b4c75 2021-11-10  301: 	}
cd827b4c75 2021-11-10  302: 
cd827b4c75 2021-11-10  303: 	async fn autofetch(&self) -> Result<()> {
cd827b4c75 2021-11-10  304: 		let mut delay = chrono::Duration::minutes(1);
cd827b4c75 2021-11-10  305: 		let mut now;
cd827b4c75 2021-11-10  306: 		loop {
cd827b4c75 2021-11-10  307: 			let mut conn = self.pool.acquire().await
cd827b4c75 2021-11-10  308: 				.with_context(|| format!("Autofetch fetch conn:\n{:?}", &self.pool))?;
cd827b4c75 2021-11-10  309: 			now = chrono::Local::now();
cd827b4c75 2021-11-10  310: 			let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
cd827b4c75 2021-11-10  311: 				.fetch_all(&mut conn).await?;
cd827b4c75 2021-11-10  312: 			for row in queue.iter() {
cd827b4c75 2021-11-10  313: 				let source_id: i32 = row.try_get("source_id")?;
cd827b4c75 2021-11-10  314: 				let owner: i64 = row.try_get("owner")?;
cd827b4c75 2021-11-10  315: 				let next_fetch: DateTime<chrono::Local> = row.try_get("next_fetch")?;
cd827b4c75 2021-11-10  316: 				if next_fetch < now {
cd827b4c75 2021-11-10  317: 					//let clone = self.clone();
cd827b4c75 2021-11-10  318: 					//clone.owner_chat(UserId::new(owner));
cd827b4c75 2021-11-10  319: 					let clone = Core {
cd827b4c75 2021-11-10  320: 						owner_chat: UserId::new(owner),
cd827b4c75 2021-11-10  321: 						..self.clone()
cd827b4c75 2021-11-10  322: 					};
cd827b4c75 2021-11-10  323: 					tokio::spawn(async move {
cd827b4c75 2021-11-10  324: 						if let Err(err) = clone.check(source_id, owner, true).await {
cd827b4c75 2021-11-10  325: 							if let Err(err) = clone.debug(&format!("šŸ›‘ {:?}", err), None) {
cd827b4c75 2021-11-10  326: 								eprintln!("Check error: {}", err);
cd827b4c75 2021-11-10  327: 							};
cd827b4c75 2021-11-10  328: 						};
cd827b4c75 2021-11-10  329: 					});
cd827b4c75 2021-11-10  330: 				} else {
cd827b4c75 2021-11-10  331: 					if next_fetch - now < delay {
cd827b4c75 2021-11-10  332: 						delay = next_fetch - now;
cd827b4c75 2021-11-10  333: 					}
cd827b4c75 2021-11-10  334: 				}
cd827b4c75 2021-11-10  335: 			};
cd827b4c75 2021-11-10  336: 			queue.clear();
cd827b4c75 2021-11-10  337: 			tokio::time::sleep(delay.to_std()?).await;
cd827b4c75 2021-11-10  338: 			delay = chrono::Duration::minutes(1);
cd827b4c75 2021-11-10  339: 		}
cd827b4c75 2021-11-10  340: 	}
cd827b4c75 2021-11-10  341: 
cd827b4c75 2021-11-10  342: 	async fn list<S>(&self, owner: S) -> Result<Vec<String>>
cd827b4c75 2021-11-10  343: 	where S: Into<i64> {
cd827b4c75 2021-11-10  344: 		let owner = owner.into();
cd827b4c75 2021-11-10  345: 		let mut reply = vec![];
cd827b4c75 2021-11-10  346: 		let mut conn = self.pool.acquire().await
cd827b4c75 2021-11-10  347: 			.with_context(|| format!("List fetch conn:\n{:?}", &self.pool))?;
cd827b4c75 2021-11-10  348: 		reply.push("Channels:".to_string());
cd827b4c75 2021-11-10  349: 		let rows = sqlx::query("select source_id, channel, enabled, url, iv_hash from rsstg_source where owner = $1 order by source_id")
cd827b4c75 2021-11-10  350: 			.bind(owner)
cd827b4c75 2021-11-10  351: 			.fetch_all(&mut conn).await?;
cd827b4c75 2021-11-10  352: 		for row in rows.iter() {
cd827b4c75 2021-11-10  353: 			let source_id: i32 = row.try_get("source_id")?;
cd827b4c75 2021-11-10  354: 			let username: &str = row.try_get("channel")?;
cd827b4c75 2021-11-10  355: 			let enabled: bool = row.try_get("enabled")?;
cd827b4c75 2021-11-10  356: 			let url: &str = row.try_get("url")?;
cd827b4c75 2021-11-10  357: 			let iv_hash: Option<&str> = row.try_get("iv_hash")?;
cd827b4c75 2021-11-10  358: 			reply.push(format!("\n\\#ļøāƒ£ {} \\*ļøāƒ£ `{}` {}\nšŸ”— `{}`", source_id, username,  
cd827b4c75 2021-11-10  359: 				match enabled {
cd827b4c75 2021-11-10  360: 					true  => "šŸ”„ enabled",
cd827b4c75 2021-11-10  361: 					false => "ā›” disabled",
cd827b4c75 2021-11-10  362: 				}, url));
cd827b4c75 2021-11-10  363: 			if let Some(hash) = iv_hash {
cd827b4c75 2021-11-10  364: 				reply.push(format!("IV `{}`", hash));
cd827b4c75 2021-11-10  365: 			}
cd827b4c75 2021-11-10  366: 		};
cd827b4c75 2021-11-10  367: 		Ok(reply)
cd827b4c75 2021-11-10  368: 	}
cd827b4c75 2021-11-10  369: }
                       370: 
                       371: #[tokio::main]
                       372: async fn main() -> Result<()> {
                       373: 	let mut settings = config::Config::default();
                       374: 	settings.merge(config::File::with_name("rsstg"))?;
                       375: 
cd827b4c75 2021-11-10  376: 	let core = Core::new(settings).await?;
                       377: 
                       378: 	let mut stream = core.stream();
                       379: 	stream.allowed_updates(&[AllowedUpdate::Message]);
                       380: 	let mut reply_to: Option<UserId>;
                       381: 
                       382: 	loop {
                       383: 		reply_to = None;
                       384: 		match stream.next().await {
                       385: 			Some(update) => {
                       386: 				if let Err(err) = handle(update?, &core, &mut reply_to).await {
cd827b4c75 2021-11-10  387: 					core.debug(&format!("šŸ›‘ {:?}", err), reply_to)?;
                       388: 				};
                       389: 			},
                       390: 			None => {
cd827b4c75 2021-11-10  391: 				core.debug(&format!("šŸ›‘ None error."), None)?;
                       392: 			}
                       393: 		};
                       394: 	}
                       395: 
                       396: 	//Ok(())
                       397: }
                       398: 
cd827b4c75 2021-11-10  399: async fn handle(update: telegram_bot::Update, core: &Core, mut _reply_to: &Option<UserId>) -> Result<()> {
cd827b4c75 2021-11-10  400: 	lazy_static! {
cd827b4c75 2021-11-10  401: 		static ref RE_USERNAME: Regex = Regex::new(r"^@[a-zA-Z][a-zA-Z0-9_]+$").unwrap();
cd827b4c75 2021-11-10  402: 		static ref RE_LINK: Regex = Regex::new(r"^https?://[a-zA-Z.0-9-]+/[-_a-zA-Z.0-9/?=]+$").unwrap();
cd827b4c75 2021-11-10  403: 		static ref RE_IV_HASH: Regex = Regex::new(r"^[a-f0-9]{14}$").unwrap();
cd827b4c75 2021-11-10  404: 	}
cd827b4c75 2021-11-10  405: 
                       406: 	match update.kind {
                       407: 		UpdateKind::Message(message) => {
cd827b4c75 2021-11-10  408: 			let mut reply: Vec<String> = vec![];
                       409: 			match message.kind {
                       410: 				MessageKind::Text { ref data, .. } => {
cd827b4c75 2021-11-10  411: 					let mut words = data.split_whitespace();
cd827b4c75 2021-11-10  412: 					let cmd = words.next().unwrap();
cd827b4c75 2021-11-10  413: 					match cmd {
cd827b4c75 2021-11-10  414: 
cd827b4c75 2021-11-10  415: // start
cd827b4c75 2021-11-10  416: 
cd827b4c75 2021-11-10  417: 						"/start" => {
cd827b4c75 2021-11-10  418: 							reply.push("We are open\\. Probably\\. Visit [channel](https://t.me/rsstg_bot_help/3) for details\\.".to_string());
cd827b4c75 2021-11-10  419: 						},
cd827b4c75 2021-11-10  420: 
cd827b4c75 2021-11-10  421: // list
cd827b4c75 2021-11-10  422: 
cd827b4c75 2021-11-10  423: 						"/list" => {
cd827b4c75 2021-11-10  424: 							reply.append(&mut core.list(message.from.id).await?);
cd827b4c75 2021-11-10  425: 						},
cd827b4c75 2021-11-10  426: 
cd827b4c75 2021-11-10  427: // add
cd827b4c75 2021-11-10  428: 
cd827b4c75 2021-11-10  429: 						"/add" | "/update" => {
cd827b4c75 2021-11-10  430: 							_reply_to = &Some(message.from.id);
cd827b4c75 2021-11-10  431: 							let mut source_id: Option<i32> = None;
cd827b4c75 2021-11-10  432: 							let at_least = "Requires at least 3 parameters.";
cd827b4c75 2021-11-10  433: 							if cmd == "/update" {
cd827b4c75 2021-11-10  434: 								let first_word = words.next()
cd827b4c75 2021-11-10  435: 									.context(at_least)?;
cd827b4c75 2021-11-10  436: 								source_id = Some(first_word.parse::<i32>()
cd827b4c75 2021-11-10  437: 									.with_context(|| format!("I need a number, but got {}.", first_word))?);
cd827b4c75 2021-11-10  438: 							}
cd827b4c75 2021-11-10  439: 							let (channel, url, iv_hash) = (
cd827b4c75 2021-11-10  440: 								words.next().context(at_least)?,
cd827b4c75 2021-11-10  441: 								words.next().context(at_least)?,
cd827b4c75 2021-11-10  442: 								words.next());
cd827b4c75 2021-11-10  443: 							if ! RE_USERNAME.is_match(&channel) {
cd827b4c75 2021-11-10  444: 								reply.push("Usernames should be something like \"@\\[a\\-zA\\-Z]\\[a\\-zA\\-Z0\\-9\\_]+\", aren't they?".to_string());
cd827b4c75 2021-11-10  445: 								bail!("Wrong username {:?}.", &channel);
cd827b4c75 2021-11-10  446: 							}
cd827b4c75 2021-11-10  447: 							if ! RE_LINK.is_match(&url) {
cd827b4c75 2021-11-10  448: 								reply.push("Link should be link to atom/rss feed, something like \"https://domain/path\"\\.".to_string());
cd827b4c75 2021-11-10  449: 								bail!("Url: {:?}", &url);
cd827b4c75 2021-11-10  450: 							}
cd827b4c75 2021-11-10  451: 							if let Some(hash) = iv_hash {
cd827b4c75 2021-11-10  452: 								if ! RE_IV_HASH.is_match(&hash) {
cd827b4c75 2021-11-10  453: 									reply.push("IV hash should be 14 hex digits.".to_string());
cd827b4c75 2021-11-10  454: 									bail!("IV: {:?}", &iv_hash);
cd827b4c75 2021-11-10  455: 								};
cd827b4c75 2021-11-10  456: 							};
cd827b4c75 2021-11-10  457: 							let channel_id = i64::from(core.tg.send(telegram_bot::GetChat::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await?.id());
cd827b4c75 2021-11-10  458: 							let chan_adm = core.tg.send(telegram_bot::GetChatAdministrators::new(telegram_bot::types::ChatRef::ChannelUsername(channel.to_string()))).await
cd827b4c75 2021-11-10  459: 								.context("Sorry, I have no access to that chat\\.")?;
cd827b4c75 2021-11-10  460: 							let (mut me, mut user) = (false, false);
cd827b4c75 2021-11-10  461: 							for admin in chan_adm {
cd827b4c75 2021-11-10  462: 								if admin.user.id == core.my.id {
cd827b4c75 2021-11-10  463: 									me = true;
cd827b4c75 2021-11-10  464: 								};
cd827b4c75 2021-11-10  465: 								if admin.user.id == message.from.id {
cd827b4c75 2021-11-10  466: 									user = true;
cd827b4c75 2021-11-10  467: 								};
cd827b4c75 2021-11-10  468: 							};
cd827b4c75 2021-11-10  469: 							if ! me   { bail!("I need to be admin on that channel\\."); };
cd827b4c75 2021-11-10  470: 							if ! user { bail!("You should be admin on that channel\\."); };
cd827b4c75 2021-11-10  471: 							reply.push(core.update(source_id, channel, channel_id, url, iv_hash, message.from.id).await?);
cd827b4c75 2021-11-10  472: 						},
cd827b4c75 2021-11-10  473: 
cd827b4c75 2021-11-10  474: // check
cd827b4c75 2021-11-10  475: 
cd827b4c75 2021-11-10  476: 						"/check" => {
cd827b4c75 2021-11-10  477: 							match &words.next().unwrap().parse::<i32>() {
cd827b4c75 2021-11-10  478: 								Err(err) => {
cd827b4c75 2021-11-10  479: 									reply.push(format!("I need a number\\.\n{}", &err));
cd827b4c75 2021-11-10  480: 								},
cd827b4c75 2021-11-10  481: 								Ok(number) => {
cd827b4c75 2021-11-10  482: 									core.check(*number, message.from.id, false).await
cd827b4c75 2021-11-10  483: 										.context("Channel check failed.")?;
cd827b4c75 2021-11-10  484: 								},
cd827b4c75 2021-11-10  485: 							};
cd827b4c75 2021-11-10  486: 						},
cd827b4c75 2021-11-10  487: 
cd827b4c75 2021-11-10  488: // clean
cd827b4c75 2021-11-10  489: 
cd827b4c75 2021-11-10  490: 						"/clean" => {
cd827b4c75 2021-11-10  491: 							match &words.next().unwrap().parse::<i32>() {
cd827b4c75 2021-11-10  492: 								Err(err) => {
cd827b4c75 2021-11-10  493: 									reply.push(format!("I need a number\\.\n{}", &err));
cd827b4c75 2021-11-10  494: 								},
cd827b4c75 2021-11-10  495: 								Ok(number) => {
cd827b4c75 2021-11-10  496: 									let result = core.clean(&number, message.from.id).await?;
cd827b4c75 2021-11-10  497: 									reply.push(result.to_string());
cd827b4c75 2021-11-10  498: 								},
cd827b4c75 2021-11-10  499: 							};
cd827b4c75 2021-11-10  500: 						},
cd827b4c75 2021-11-10  501: 
cd827b4c75 2021-11-10  502: // enable
cd827b4c75 2021-11-10  503: 
cd827b4c75 2021-11-10  504: 						"/enable" => {
cd827b4c75 2021-11-10  505: 							match &words.next().unwrap().parse::<i32>() {
cd827b4c75 2021-11-10  506: 								Err(err) => {
cd827b4c75 2021-11-10  507: 									reply.push(format!("I need a number\\.\n{}", &err));
cd827b4c75 2021-11-10  508: 								},
cd827b4c75 2021-11-10  509: 								Ok(number) => {
cd827b4c75 2021-11-10  510: 									let result = core.enable(&number, message.from.id).await?;
cd827b4c75 2021-11-10  511: 									reply.push(result.to_string());
cd827b4c75 2021-11-10  512: 								},
cd827b4c75 2021-11-10  513: 							};
cd827b4c75 2021-11-10  514: 						},
cd827b4c75 2021-11-10  515: 
cd827b4c75 2021-11-10  516: // delete
cd827b4c75 2021-11-10  517: 
cd827b4c75 2021-11-10  518: 						"/delete" => {
cd827b4c75 2021-11-10  519: 							match &words.next().unwrap().parse::<i32>() {
cd827b4c75 2021-11-10  520: 								Err(err) => {
cd827b4c75 2021-11-10  521: 									reply.push(format!("I need a number\\.\n{}", &err));
cd827b4c75 2021-11-10  522: 								},
cd827b4c75 2021-11-10  523: 								Ok(number) => {
cd827b4c75 2021-11-10  524: 									let result = core.delete(&number, message.from.id).await?;
cd827b4c75 2021-11-10  525: 									reply.push(result.to_string());
cd827b4c75 2021-11-10  526: 								},
cd827b4c75 2021-11-10  527: 							};
cd827b4c75 2021-11-10  528: 						},
cd827b4c75 2021-11-10  529: 
cd827b4c75 2021-11-10  530: // disable
cd827b4c75 2021-11-10  531: 
cd827b4c75 2021-11-10  532: 						"/disable" => {
cd827b4c75 2021-11-10  533: 							match &words.next().unwrap().parse::<i32>() {
cd827b4c75 2021-11-10  534: 								Err(err) => {
cd827b4c75 2021-11-10  535: 									reply.push(format!("I need a number\\.\n{}", &err));
cd827b4c75 2021-11-10  536: 								},
cd827b4c75 2021-11-10  537: 								Ok(number) => {
cd827b4c75 2021-11-10  538: 									let result = core.disable(&number, message.from.id).await?;
cd827b4c75 2021-11-10  539: 									reply.push(result.to_string());
cd827b4c75 2021-11-10  540: 								},
cd827b4c75 2021-11-10  541: 							};
cd827b4c75 2021-11-10  542: 						},
cd827b4c75 2021-11-10  543: 
                       544: 						_ => {
                       545: 						},
                       546: 					};
                       547: 				},
                       548: 				_ => {
                       549: 				},
cd827b4c75 2021-11-10  550: 			};
cd827b4c75 2021-11-10  551: 
cd827b4c75 2021-11-10  552: 			if reply.len() > 0 {
cd827b4c75 2021-11-10  553: 				if let Err(err) = core.tg.send(message.text_reply(reply.join("\n")).parse_mode(types::ParseMode::MarkdownV2)).await {
cd827b4c75 2021-11-10  554: 					dbg!(reply.join("\n"));
cd827b4c75 2021-11-10  555: 					println!("{}", err);
cd827b4c75 2021-11-10  556: 				};
                       557: 			};
                       558: 		},
                       559: 		_ => {},
                       560: 	};
                       561: 
                       562: 	Ok(())
                       563: }