Lines of
src/core.rs
from check-in 9d8a6738fd
that are changed by the sequence of edits moving toward
check-in 9910c2209c:
1: use anyhow::{anyhow, bail, Context, Result};
2: use async_std::task;
3: use chrono::DateTime;
9d8a6738fd 2023-07-30 4: use sqlx::{
9d8a6738fd 2023-07-30 5: postgres::PgPoolOptions,
9d8a6738fd 2023-07-30 6: Row,
9d8a6738fd 2023-07-30 7: };
8: use std::{
9: borrow::Cow,
10: collections::{
11: BTreeMap,
12: HashSet,
13: },
14: sync::{Arc, Mutex},
15: };
16:
17: #[derive(Clone)]
18: pub struct Core {
19: owner_chat: telegram_bot::UserId,
20: pub tg: telegram_bot::Api,
21: pub my: telegram_bot::User,
22: pool: sqlx::Pool<sqlx::Postgres>,
23: sources: Arc<Mutex<HashSet<Arc<i32>>>>,
24: http_client: reqwest::Client,
25: }
26:
27: impl Core {
28: pub fn new(settings: config::Config) -> Result<Arc<Core>> {
29: let owner = settings.get_int("owner")?;
30: let api_key = settings.get_string("api_key")?;
31: let tg = telegram_bot::Api::new(api_key);
32: let tg_cloned = tg.clone();
33:
34: let mut client = reqwest::Client::builder();
35: if let Ok(proxy) = settings.get_string("proxy") {
36: let proxy = reqwest::Proxy::all(proxy)?;
37: client = client.proxy(proxy);
38: }
39: let http_client = client.build()?;
40: let core = Arc::new(Core {
41: tg,
42: my: task::block_on(async {
43: tg_cloned.send(telegram_bot::GetMe).await
44: })?,
45: owner_chat: telegram_bot::UserId::new(owner),
46: pool: PgPoolOptions::new()
47: .max_connections(5)
48: .acquire_timeout(std::time::Duration::new(300, 0))
49: .idle_timeout(std::time::Duration::new(60, 0))
50: .connect_lazy(&settings.get_string("pg")?)?,
51: sources: Arc::new(Mutex::new(HashSet::new())),
52: http_client,
53: });
54: let clone = core.clone();
55: task::spawn(async move {
56: loop {
57: let delay = match &clone.autofetch().await {
58: Err(err) => {
59: if let Err(err) = clone.send(format!("š {:?}", err), None, None).await {
60: eprintln!("Autofetch error: {}", err);
61: };
62: std::time::Duration::from_secs(60)
63: },
64: Ok(time) => *time,
65: };
66: task::sleep(delay).await;
67: }
68: });
69: Ok(core)
70: }
71:
72: pub fn stream(&self) -> telegram_bot::UpdatesStream {
73: self.tg.stream()
74: }
75:
76: pub async fn send<'a, S>(&self, msg: S, target: Option<telegram_bot::UserId>, mode: Option<telegram_bot::types::ParseMode>) -> Result<()>
77: where S: Into<Cow<'a, str>> {
78: let mode = mode.unwrap_or(telegram_bot::types::ParseMode::Html);
79: let target = target.unwrap_or(self.owner_chat);
80: self.tg.send(telegram_bot::SendMessage::new(target, msg).parse_mode(mode)).await?;
81: Ok(())
82: }
83:
84: pub async fn check<S>(&self, id: &i32, owner: S, real: bool) -> Result<Cow<'_, str>>
85: where S: Into<i64> {
86: let owner = owner.into();
87:
88: let mut posted: i32 = 0;
89: let id = {
90: let mut set = self.sources.lock().unwrap();
91: match set.get(id) {
92: Some(id) => id.clone(),
93: None => {
94: let id = Arc::new(*id);
95: set.insert(id.clone());
96: id.clone()
97: },
98: }
99: };
100: let count = Arc::strong_count(&id);
101: if count == 2 {
9d8a6738fd 2023-07-30 102: let mut conn = self.pool.acquire().await
9d8a6738fd 2023-07-30 103: .with_context(|| format!("Query queue fetch conn:\n{:?}", &self.pool))?;
9d8a6738fd 2023-07-30 104: let row = sqlx::query("select source_id, channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
9d8a6738fd 2023-07-30 105: .bind(*id)
9d8a6738fd 2023-07-30 106: .bind(owner)
9d8a6738fd 2023-07-30 107: .fetch_one(&mut conn).await
9d8a6738fd 2023-07-30 108: .with_context(|| format!("Query source:\n{:?}", &self.pool))?;
9d8a6738fd 2023-07-30 109: drop(conn);
9d8a6738fd 2023-07-30 110:
9d8a6738fd 2023-07-30 111: let channel_id: i64 = row.try_get("channel_id")?;
9d8a6738fd 2023-07-30 112: let url: &str = row.try_get("url")?;
9d8a6738fd 2023-07-30 113: let iv_hash: Option<&str> = row.try_get("iv_hash")?;
9d8a6738fd 2023-07-30 114: let url_re = match row.try_get("url_re")? {
9d8a6738fd 2023-07-30 115: Some(x) => Some(sedregex::ReplaceCommand::new(x)?),
9d8a6738fd 2023-07-30 116: None => None,
9d8a6738fd 2023-07-30 117: };
118: let destination = match real {
9d8a6738fd 2023-07-30 119: true => telegram_bot::UserId::new(channel_id),
9d8a6738fd 2023-07-30 120: false => telegram_bot::UserId::new(row.try_get("owner")?),
121: };
122: let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
123: let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, String> = BTreeMap::new();
124:
9d8a6738fd 2023-07-30 125: let response = self.http_client.get(url).send().await?;
126: let status = response.status();
127: let content = response.bytes().await?;
128: match rss::Channel::read_from(&content[..]) {
129: Ok(feed) => {
130: for item in feed.items() {
131: if let Some(link) = item.link() {
132: let date = match item.pub_date() {
133: Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
134: None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
135: }?;
136: let url = link;
137: posts.insert(date, url.to_string());
138: }
139: };
140: },
141: Err(err) => match err {
142: rss::Error::InvalidStartTag => {
143: let feed = atom_syndication::Feed::read_from(&content[..])
9d8a6738fd 2023-07-30 144: .with_context(|| format!("Problem opening feed url:\n{}\n{}", &url, status))?;
145: for item in feed.entries() {
146: let date = item.published().unwrap();
147: let url = item.links()[0].href();
148: posts.insert(*date, url.to_string());
149: };
150: },
151: rss::Error::Eof => (),
9d8a6738fd 2023-07-30 152: _ => bail!("Unsupported or mangled content:\n{:?}\n{:#?}\n{:#?}\n", &url, err, status)
153: }
154: };
155: for (date, url) in posts.iter() {
9d8a6738fd 2023-07-30 156: let mut conn = self.pool.acquire().await
9d8a6738fd 2023-07-30 157: .with_context(|| format!("Check post fetch conn:\n{:?}", &self.pool))?;
9d8a6738fd 2023-07-30 158: let post_url: Cow<str> = match url_re {
9d8a6738fd 2023-07-30 159: Some(ref x) => x.execute(url),
160: None => url.into(),
161: };
9d8a6738fd 2023-07-30 162: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
9d8a6738fd 2023-07-30 163: .bind(&*post_url)
9d8a6738fd 2023-07-30 164: .bind(*id)
9d8a6738fd 2023-07-30 165: .fetch_one(&mut conn).await
9d8a6738fd 2023-07-30 166: .with_context(|| format!("Check post:\n{:?}", &conn))?;
9d8a6738fd 2023-07-30 167: let exists: bool = row.try_get("exists")?;
9d8a6738fd 2023-07-30 168: if ! exists {
9d8a6738fd 2023-07-30 169: if this_fetch.is_none() || *date > this_fetch.unwrap() {
9d8a6738fd 2023-07-30 170: this_fetch = Some(*date);
9d8a6738fd 2023-07-30 171: };
9d8a6738fd 2023-07-30 172: self.tg.send( match iv_hash {
9d8a6738fd 2023-07-30 173: Some(hash) => telegram_bot::SendMessage::new(destination, format!("<a href=\"https://t.me/iv?url={}&rhash={}\"> </a>{0}", &post_url, hash)),
9d8a6738fd 2023-07-30 174: None => telegram_bot::SendMessage::new(destination, format!("{}", post_url)),
9d8a6738fd 2023-07-30 175: }.parse_mode(telegram_bot::types::ParseMode::Html)).await
9d8a6738fd 2023-07-30 176: .context("Can't post message:")?;
9d8a6738fd 2023-07-30 177: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
9d8a6738fd 2023-07-30 178: .bind(*id)
9d8a6738fd 2023-07-30 179: .bind(date)
9d8a6738fd 2023-07-30 180: .bind(&*post_url)
9d8a6738fd 2023-07-30 181: .execute(&mut conn).await
9d8a6738fd 2023-07-30 182: .with_context(|| format!("Record post:\n{:?}", &conn))?;
9d8a6738fd 2023-07-30 183: drop(conn);
9d8a6738fd 2023-07-30 184: task::sleep(std::time::Duration::new(4, 0)).await;
185: };
186: posted += 1;
187: };
188: posts.clear();
189: };
9d8a6738fd 2023-07-30 190: let mut conn = self.pool.acquire().await
9d8a6738fd 2023-07-30 191: .with_context(|| format!("Update scrape fetch conn:\n{:?}", &self.pool))?;
9d8a6738fd 2023-07-30 192: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
9d8a6738fd 2023-07-30 193: .bind(*id)
9d8a6738fd 2023-07-30 194: .execute(&mut conn).await
9d8a6738fd 2023-07-30 195: .with_context(|| format!("Update scrape:\n{:?}", &conn))?;
196: Ok(format!("Posted: {}", &posted).into())
197: }
198:
199: pub async fn delete<S>(&self, source_id: &i32, owner: S) -> Result<Cow<'_, str>>
200: where S: Into<i64> {
201: let owner = owner.into();
202:
9d8a6738fd 2023-07-30 203: let mut conn = self.pool.acquire().await
9d8a6738fd 2023-07-30 204: .with_context(|| format!("Delete fetch conn:\n{:?}", &self.pool))?;
9d8a6738fd 2023-07-30 205: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
9d8a6738fd 2023-07-30 206: .bind(source_id)
9d8a6738fd 2023-07-30 207: .bind(owner)
9d8a6738fd 2023-07-30 208: .execute(&mut conn).await
9d8a6738fd 2023-07-30 209: .with_context(|| format!("Delete source rule:\n{:?}", &self.pool))?
9d8a6738fd 2023-07-30 210: .rows_affected() {
211: 0 => { Ok("No data found found.".into()) },
212: x => { Ok(format!("{} sources removed.", x).into()) },
213: }
214: }
215:
216: pub async fn clean<S>(&self, source_id: &i32, owner: S) -> Result<Cow<'_, str>>
217: where S: Into<i64> {
218: let owner = owner.into();
219:
9d8a6738fd 2023-07-30 220: let mut conn = self.pool.acquire().await
9d8a6738fd 2023-07-30 221: .with_context(|| format!("Clean fetch conn:\n{:?}", &self.pool))?;
9d8a6738fd 2023-07-30 222: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
9d8a6738fd 2023-07-30 223: .bind(source_id)
9d8a6738fd 2023-07-30 224: .bind(owner)
9d8a6738fd 2023-07-30 225: .execute(&mut conn).await
9d8a6738fd 2023-07-30 226: .with_context(|| format!("Clean seen posts:\n{:?}", &self.pool))?
9d8a6738fd 2023-07-30 227: .rows_affected() {
228: 0 => { Ok("No data found found.".into()) },
229: x => { Ok(format!("{} posts purged.", x).into()) },
230: }
231: }
232:
233: pub async fn enable<S>(&self, source_id: &i32, owner: S) -> Result<&str>
234: where S: Into<i64> {
235: let owner = owner.into();
236:
9d8a6738fd 2023-07-30 237: let mut conn = self.pool.acquire().await
9d8a6738fd 2023-07-30 238: .with_context(|| format!("Enable fetch conn:\n{:?}", &self.pool))?;
9d8a6738fd 2023-07-30 239: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
9d8a6738fd 2023-07-30 240: .bind(source_id)
9d8a6738fd 2023-07-30 241: .bind(owner)
9d8a6738fd 2023-07-30 242: .execute(&mut conn).await
9d8a6738fd 2023-07-30 243: .with_context(|| format!("Enable source:\n{:?}", &self.pool))?
9d8a6738fd 2023-07-30 244: .rows_affected() {
245: 1 => { Ok("Source enabled.") },
246: 0 => { Ok("Source not found.") },
247: _ => { Err(anyhow!("Database error.")) },
248: }
249: }
250:
251: pub async fn disable<S>(&self, source_id: &i32, owner: S) -> Result<&str>
252: where S: Into<i64> {
253: let owner = owner.into();
254:
9d8a6738fd 2023-07-30 255: let mut conn = self.pool.acquire().await
9d8a6738fd 2023-07-30 256: .with_context(|| format!("Disable fetch conn:\n{:?}", &self.pool))?;
9d8a6738fd 2023-07-30 257: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
9d8a6738fd 2023-07-30 258: .bind(source_id)
9d8a6738fd 2023-07-30 259: .bind(owner)
9d8a6738fd 2023-07-30 260: .execute(&mut conn).await
9d8a6738fd 2023-07-30 261: .with_context(|| format!("Disable source:\n{:?}", &self.pool))?
9d8a6738fd 2023-07-30 262: .rows_affected() {
263: 1 => { Ok("Source disabled.") },
264: 0 => { Ok("Source not found.") },
265: _ => { Err(anyhow!("Database error.")) },
266: }
267: }
268:
269: pub async fn update<S>(&self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: S) -> Result<&str>
270: where S: Into<i64> {
271: let owner = owner.into();
272:
9d8a6738fd 2023-07-30 273: let mut conn = self.pool.acquire().await
9d8a6738fd 2023-07-30 274: .with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?;
9d8a6738fd 2023-07-30 275:
276: match match update {
277: Some(id) => {
9d8a6738fd 2023-07-30 278: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1").bind(id)
279: },
280: None => {
9d8a6738fd 2023-07-30 281: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
282: },
9d8a6738fd 2023-07-30 283: }
9d8a6738fd 2023-07-30 284: .bind(channel_id)
9d8a6738fd 2023-07-30 285: .bind(url)
9d8a6738fd 2023-07-30 286: .bind(iv_hash)
9d8a6738fd 2023-07-30 287: .bind(owner)
9d8a6738fd 2023-07-30 288: .bind(channel)
9d8a6738fd 2023-07-30 289: .bind(url_re)
9d8a6738fd 2023-07-30 290: .execute(&mut conn).await {
291: Ok(_) => Ok(match update {
292: Some(_) => "Channel updated.",
293: None => "Channel added.",
294: }),
295: Err(sqlx::Error::Database(err)) => {
296: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
297: Some("_bt_check_unique", ) => {
298: Ok("Duplicate key.")
299: },
300: Some(_) => {
301: Ok("Database error.")
302: },
303: None => {
304: Ok("No database error extracted.")
305: },
306: }
307: },
308: Err(err) => {
309: bail!("Sorry, unknown error:\n{:#?}\n", err);
310: },
311: }
312: }
313:
314: async fn autofetch(&self) -> Result<std::time::Duration> {
315: let mut delay = chrono::Duration::minutes(1);
9d8a6738fd 2023-07-30 316: let mut conn = self.pool.acquire().await
9d8a6738fd 2023-07-30 317: .with_context(|| format!("Autofetch fetch conn:\n{:?}", &self.pool))?;
9d8a6738fd 2023-07-30 318: let now = chrono::Local::now();
9d8a6738fd 2023-07-30 319: let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
9d8a6738fd 2023-07-30 320: .fetch_all(&mut conn).await?;
9d8a6738fd 2023-07-30 321: for row in queue.iter() {
9d8a6738fd 2023-07-30 322: let source_id: i32 = row.try_get("source_id")?;
9d8a6738fd 2023-07-30 323: let owner: i64 = row.try_get("owner")?;
9d8a6738fd 2023-07-30 324: let next_fetch: DateTime<chrono::Local> = row.try_get("next_fetch")?;
9d8a6738fd 2023-07-30 325: if next_fetch < now {
9d8a6738fd 2023-07-30 326: let clone = Core {
9d8a6738fd 2023-07-30 327: owner_chat: telegram_bot::UserId::new(owner),
9d8a6738fd 2023-07-30 328: ..self.clone()
9d8a6738fd 2023-07-30 329: };
9d8a6738fd 2023-07-30 330: task::spawn(async move {
9d8a6738fd 2023-07-30 331: if let Err(err) = clone.check(&source_id, owner, true).await {
9d8a6738fd 2023-07-30 332: if let Err(err) = clone.send(&format!("š {:?}", err), None, None).await {
9d8a6738fd 2023-07-30 333: eprintln!("Check error: {}", err);
9d8a6738fd 2023-07-30 334: };
9d8a6738fd 2023-07-30 335: };
9d8a6738fd 2023-07-30 336: });
9d8a6738fd 2023-07-30 337: } else if next_fetch - now < delay {
9d8a6738fd 2023-07-30 338: delay = next_fetch - now;
339: }
340: };
341: queue.clear();
342: Ok(delay.to_std()?)
343: }
344:
345: pub async fn list<S>(&self, owner: S) -> Result<String>
346: where S: Into<i64> {
347: let owner = owner.into();
348:
349: let mut reply: Vec<Cow<str>> = vec![];
9d8a6738fd 2023-07-30 350: let mut conn = self.pool.acquire().await
9d8a6738fd 2023-07-30 351: .with_context(|| format!("List fetch conn:\n{:?}", &self.pool))?;
352: reply.push("Channels:".into());
9d8a6738fd 2023-07-30 353: let rows = sqlx::query("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
9d8a6738fd 2023-07-30 354: .bind(owner)
9d8a6738fd 2023-07-30 355: .fetch_all(&mut conn).await?;
356: for row in rows.iter() {
9d8a6738fd 2023-07-30 357: let source_id: i32 = row.try_get("source_id")?;
9d8a6738fd 2023-07-30 358: let username: &str = row.try_get("channel")?;
9d8a6738fd 2023-07-30 359: let enabled: bool = row.try_get("enabled")?;
9d8a6738fd 2023-07-30 360: let url: &str = row.try_get("url")?;
9d8a6738fd 2023-07-30 361: let iv_hash: Option<&str> = row.try_get("iv_hash")?;
9d8a6738fd 2023-07-30 362: let url_re: Option<&str> = row.try_get("url_re")?;
9d8a6738fd 2023-07-30 363: reply.push(format!("\n\\#ļøā£ {} \\*ļøā£ `{}` {}\nš `{}`", source_id, username,
9d8a6738fd 2023-07-30 364: match enabled {
365: true => "š enabled",
366: false => "ā disabled",
9d8a6738fd 2023-07-30 367: }, url).into());
9d8a6738fd 2023-07-30 368: if let Some(hash) = iv_hash {
369: reply.push(format!("IV: `{}`", hash).into());
370: }
9d8a6738fd 2023-07-30 371: if let Some(re) = url_re {
372: reply.push(format!("RE: `{}`", re).into());
373: }
374: };
375: Ok(reply.join("\n"))
376: }
377: }