Lines of src/core.rs from check-in 45e34762e4 that are changed by the sequence of edits moving toward check-in 9d8a6738fd:
1: use anyhow::{anyhow, bail, Context, Result};
2: use async_std::task;
3: use chrono::DateTime;
4: use sqlx::{
5: postgres::PgPoolOptions,
6: Row,
7: };
8: use std::{
9: borrow::Cow,
10: collections::{
11: BTreeMap,
12: HashSet,
13: },
14: sync::{Arc, Mutex},
15: };
16:
17: #[derive(Clone)]
18: pub struct Core {
19: owner_chat: telegram_bot::UserId,
20: pub tg: telegram_bot::Api,
21: pub my: telegram_bot::User,
22: pool: sqlx::Pool<sqlx::Postgres>,
23: sources: Arc<Mutex<HashSet<Arc<i32>>>>,
24: http_client: reqwest::Client,
25: }
26:
27: impl Core {
28: pub fn new(settings: config::Config) -> Result<Arc<Core>> {
29: let owner = settings.get_int("owner")?;
30: let api_key = settings.get_string("api_key")?;
31: let tg = telegram_bot::Api::new(api_key);
32: let tg_cloned = tg.clone();
33:
45e34762e4 2023-05-28 34: let proxy = settings.get_string("proxy")?;
35: let mut client = reqwest::Client::builder();
45e34762e4 2023-05-28 36: if !proxy.is_empty() {
37: let proxy = reqwest::Proxy::all(proxy)?;
38: client = client.proxy(proxy);
39: }
40: let http_client = client.build()?;
41: let core = Arc::new(Core {
42: tg,
43: my: task::block_on(async {
44: tg_cloned.send(telegram_bot::GetMe).await
45: })?,
46: owner_chat: telegram_bot::UserId::new(owner),
47: pool: PgPoolOptions::new()
48: .max_connections(5)
49: .acquire_timeout(std::time::Duration::new(300, 0))
50: .idle_timeout(std::time::Duration::new(60, 0))
51: .connect_lazy(&settings.get_string("pg")?)?,
52: sources: Arc::new(Mutex::new(HashSet::new())),
53: http_client,
54: });
55: let clone = core.clone();
56: task::spawn(async move {
57: loop {
58: let delay = match &clone.autofetch().await {
59: Err(err) => {
60: if let Err(err) = clone.send(format!("š {:?}", err), None, None).await {
61: eprintln!("Autofetch error: {}", err);
62: };
63: std::time::Duration::from_secs(60)
64: },
65: Ok(time) => *time,
66: };
67: task::sleep(delay).await;
68: }
69: });
70: Ok(core)
71: }
72:
73: pub fn stream(&self) -> telegram_bot::UpdatesStream {
74: self.tg.stream()
75: }
76:
77: pub async fn send<'a, S>(&self, msg: S, target: Option<telegram_bot::UserId>, mode: Option<telegram_bot::types::ParseMode>) -> Result<()>
78: where S: Into<Cow<'a, str>> {
79: let mode = mode.unwrap_or(telegram_bot::types::ParseMode::Html);
80: let target = target.unwrap_or(self.owner_chat);
81: self.tg.send(telegram_bot::SendMessage::new(target, msg).parse_mode(mode)).await?;
82: Ok(())
83: }
84:
85: pub async fn check<S>(&self, id: &i32, owner: S, real: bool) -> Result<Cow<'_, str>>
86: where S: Into<i64> {
87: let owner = owner.into();
88:
89: let mut posted: i32 = 0;
90: let id = {
91: let mut set = self.sources.lock().unwrap();
92: match set.get(id) {
93: Some(id) => id.clone(),
94: None => {
95: let id = Arc::new(*id);
96: set.insert(id.clone());
97: id.clone()
98: },
99: }
100: };
101: let count = Arc::strong_count(&id);
102: if count == 2 {
103: let mut conn = self.pool.acquire().await
104: .with_context(|| format!("Query queue fetch conn:\n{:?}", &self.pool))?;
105: let row = sqlx::query("select source_id, channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
106: .bind(*id)
107: .bind(owner)
108: .fetch_one(&mut conn).await
109: .with_context(|| format!("Query source:\n{:?}", &self.pool))?;
110: drop(conn);
111:
112: let channel_id: i64 = row.try_get("channel_id")?;
113: let url: &str = row.try_get("url")?;
114: let iv_hash: Option<&str> = row.try_get("iv_hash")?;
115: let url_re = match row.try_get("url_re")? {
116: Some(x) => Some(sedregex::ReplaceCommand::new(x)?),
117: None => None,
118: };
119: let destination = match real {
120: true => telegram_bot::UserId::new(channel_id),
121: false => telegram_bot::UserId::new(row.try_get("owner")?),
122: };
123: let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
124: let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, String> = BTreeMap::new();
125:
126: let response = self.http_client.get(url).send().await?;
127: let status = response.status();
128: let content = response.bytes().await?;
129: match rss::Channel::read_from(&content[..]) {
130: Ok(feed) => {
131: for item in feed.items() {
132: if let Some(link) = item.link() {
133: let date = match item.pub_date() {
134: Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
135: None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
136: }?;
137: let url = link;
138: posts.insert(date, url.to_string());
139: }
140: };
141: },
142: Err(err) => match err {
143: rss::Error::InvalidStartTag => {
144: let feed = atom_syndication::Feed::read_from(&content[..])
145: .with_context(|| format!("Problem opening feed url:\n{}\n{}", &url, status))?;
146: for item in feed.entries() {
147: let date = item.published().unwrap();
148: let url = item.links()[0].href();
149: posts.insert(*date, url.to_string());
150: };
151: },
152: rss::Error::Eof => (),
153: _ => bail!("Unsupported or mangled content:\n{:?}\n{:#?}\n{:#?}\n", &url, err, status)
154: }
155: };
156: for (date, url) in posts.iter() {
157: let mut conn = self.pool.acquire().await
158: .with_context(|| format!("Check post fetch conn:\n{:?}", &self.pool))?;
159: let post_url: Cow<str> = match url_re {
160: Some(ref x) => x.execute(url),
161: None => url.into(),
162: };
163: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
164: .bind(&*post_url)
165: .bind(*id)
166: .fetch_one(&mut conn).await
167: .with_context(|| format!("Check post:\n{:?}", &conn))?;
168: let exists: bool = row.try_get("exists")?;
169: if ! exists {
170: if this_fetch.is_none() || *date > this_fetch.unwrap() {
171: this_fetch = Some(*date);
172: };
173: self.tg.send( match iv_hash {
174: Some(hash) => telegram_bot::SendMessage::new(destination, format!("<a href=\"https://t.me/iv?url={}&rhash={}\"> </a>{0}", &post_url, hash)),
175: None => telegram_bot::SendMessage::new(destination, format!("{}", post_url)),
176: }.parse_mode(telegram_bot::types::ParseMode::Html)).await
177: .context("Can't post message:")?;
178: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
179: .bind(*id)
180: .bind(date)
181: .bind(&*post_url)
182: .execute(&mut conn).await
183: .with_context(|| format!("Record post:\n{:?}", &conn))?;
184: drop(conn);
185: task::sleep(std::time::Duration::new(4, 0)).await;
186: };
187: posted += 1;
188: };
189: posts.clear();
190: };
191: let mut conn = self.pool.acquire().await
192: .with_context(|| format!("Update scrape fetch conn:\n{:?}", &self.pool))?;
193: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
194: .bind(*id)
195: .execute(&mut conn).await
196: .with_context(|| format!("Update scrape:\n{:?}", &conn))?;
197: Ok(format!("Posted: {}", &posted).into())
198: }
199:
200: pub async fn delete<S>(&self, source_id: &i32, owner: S) -> Result<Cow<'_, str>>
201: where S: Into<i64> {
202: let owner = owner.into();
203:
204: let mut conn = self.pool.acquire().await
205: .with_context(|| format!("Delete fetch conn:\n{:?}", &self.pool))?;
206: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
207: .bind(source_id)
208: .bind(owner)
209: .execute(&mut conn).await
210: .with_context(|| format!("Delete source rule:\n{:?}", &self.pool))?
211: .rows_affected() {
212: 0 => { Ok("No data found found.".into()) },
213: x => { Ok(format!("{} sources removed.", x).into()) },
214: }
215: }
216:
217: pub async fn clean<S>(&self, source_id: &i32, owner: S) -> Result<Cow<'_, str>>
218: where S: Into<i64> {
219: let owner = owner.into();
220:
221: let mut conn = self.pool.acquire().await
222: .with_context(|| format!("Clean fetch conn:\n{:?}", &self.pool))?;
223: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
224: .bind(source_id)
225: .bind(owner)
226: .execute(&mut conn).await
227: .with_context(|| format!("Clean seen posts:\n{:?}", &self.pool))?
228: .rows_affected() {
229: 0 => { Ok("No data found found.".into()) },
230: x => { Ok(format!("{} posts purged.", x).into()) },
231: }
232: }
233:
234: pub async fn enable<S>(&self, source_id: &i32, owner: S) -> Result<&str>
235: where S: Into<i64> {
236: let owner = owner.into();
237:
238: let mut conn = self.pool.acquire().await
239: .with_context(|| format!("Enable fetch conn:\n{:?}", &self.pool))?;
240: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
241: .bind(source_id)
242: .bind(owner)
243: .execute(&mut conn).await
244: .with_context(|| format!("Enable source:\n{:?}", &self.pool))?
245: .rows_affected() {
246: 1 => { Ok("Source enabled.") },
247: 0 => { Ok("Source not found.") },
248: _ => { Err(anyhow!("Database error.")) },
249: }
250: }
251:
252: pub async fn disable<S>(&self, source_id: &i32, owner: S) -> Result<&str>
253: where S: Into<i64> {
254: let owner = owner.into();
255:
256: let mut conn = self.pool.acquire().await
257: .with_context(|| format!("Disable fetch conn:\n{:?}", &self.pool))?;
258: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
259: .bind(source_id)
260: .bind(owner)
261: .execute(&mut conn).await
262: .with_context(|| format!("Disable source:\n{:?}", &self.pool))?
263: .rows_affected() {
264: 1 => { Ok("Source disabled.") },
265: 0 => { Ok("Source not found.") },
266: _ => { Err(anyhow!("Database error.")) },
267: }
268: }
269:
270: pub async fn update<S>(&self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: S) -> Result<&str>
271: where S: Into<i64> {
272: let owner = owner.into();
273:
274: let mut conn = self.pool.acquire().await
275: .with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?;
276:
277: match match update {
278: Some(id) => {
279: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1").bind(id)
280: },
281: None => {
282: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
283: },
284: }
285: .bind(channel_id)
286: .bind(url)
287: .bind(iv_hash)
288: .bind(owner)
289: .bind(channel)
290: .bind(url_re)
291: .execute(&mut conn).await {
292: Ok(_) => Ok(match update {
293: Some(_) => "Channel updated.",
294: None => "Channel added.",
295: }),
296: Err(sqlx::Error::Database(err)) => {
297: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
298: Some("_bt_check_unique", ) => {
299: Ok("Duplicate key.")
300: },
301: Some(_) => {
302: Ok("Database error.")
303: },
304: None => {
305: Ok("No database error extracted.")
306: },
307: }
308: },
309: Err(err) => {
310: bail!("Sorry, unknown error:\n{:#?}\n", err);
311: },
312: }
313: }
314:
315: async fn autofetch(&self) -> Result<std::time::Duration> {
316: let mut delay = chrono::Duration::minutes(1);
317: let mut conn = self.pool.acquire().await
318: .with_context(|| format!("Autofetch fetch conn:\n{:?}", &self.pool))?;
319: let now = chrono::Local::now();
320: let mut queue = sqlx::query("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
321: .fetch_all(&mut conn).await?;
322: for row in queue.iter() {
323: let source_id: i32 = row.try_get("source_id")?;
324: let owner: i64 = row.try_get("owner")?;
325: let next_fetch: DateTime<chrono::Local> = row.try_get("next_fetch")?;
326: if next_fetch < now {
327: let clone = Core {
328: owner_chat: telegram_bot::UserId::new(owner),
329: ..self.clone()
330: };
331: task::spawn(async move {
332: if let Err(err) = clone.check(&source_id, owner, true).await {
333: if let Err(err) = clone.send(&format!("š {:?}", err), None, None).await {
334: eprintln!("Check error: {}", err);
335: };
336: };
337: });
338: } else if next_fetch - now < delay {
339: delay = next_fetch - now;
340: }
341: };
342: queue.clear();
343: Ok(delay.to_std()?)
344: }
345:
346: pub async fn list<S>(&self, owner: S) -> Result<String>
347: where S: Into<i64> {
348: let owner = owner.into();
349:
350: let mut reply: Vec<Cow<str>> = vec![];
351: let mut conn = self.pool.acquire().await
352: .with_context(|| format!("List fetch conn:\n{:?}", &self.pool))?;
353: reply.push("Channels:".into());
354: let rows = sqlx::query("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
355: .bind(owner)
356: .fetch_all(&mut conn).await?;
357: for row in rows.iter() {
358: let source_id: i32 = row.try_get("source_id")?;
359: let username: &str = row.try_get("channel")?;
360: let enabled: bool = row.try_get("enabled")?;
361: let url: &str = row.try_get("url")?;
362: let iv_hash: Option<&str> = row.try_get("iv_hash")?;
363: let url_re: Option<&str> = row.try_get("url_re")?;
364: reply.push(format!("\n\\#ļøā£ {} \\*ļøā£ `{}` {}\nš `{}`", source_id, username,
365: match enabled {
366: true => "š enabled",
367: false => "ā disabled",
368: }, url).into());
369: if let Some(hash) = iv_hash {
370: reply.push(format!("IV: `{}`", hash).into());
371: }
372: if let Some(re) = url_re {
373: reply.push(format!("RE: `{}`", re).into());
374: }
375: };
376: Ok(reply.join("\n"))
377: }
378: }