Lines of
src/core.rs
from check-in 79c91a5357
that are changed by the sequence of edits moving toward
check-in 1c444d34ff:
1: use anyhow::{anyhow, bail, Context, Result};
2: use async_std::task;
3: use chrono::DateTime;
79c91a5357 2024-08-02 4: use lazy_static::lazy_static;
79c91a5357 2024-08-02 5: use regex::Regex;
6: use sqlx::postgres::PgPoolOptions;
7: use std::{
8: borrow::Cow,
9: collections::{
10: BTreeMap,
11: HashSet,
12: },
79c91a5357 2024-08-02 13: sync::{Arc, Mutex},
14: };
15:
79c91a5357 2024-08-02 16: lazy_static! {
79c91a5357 2024-08-02 17: static ref RE_DELAY: Regex = Regex::new(r"^Too Many Requests: retry after ([0-9]+)(,.*)?$").unwrap();
18: }
19:
20: #[derive(Clone)]
21: pub struct Core {
22: owner_chat: telegram_bot::UserId,
23: pub tg: telegram_bot::Api,
24: pub my: telegram_bot::User,
25: pool: sqlx::Pool<sqlx::Postgres>,
26: sources: Arc<Mutex<HashSet<Arc<i32>>>>,
27: http_client: reqwest::Client,
28: }
29:
30: impl Core {
31: pub fn new(settings: config::Config) -> Result<Arc<Core>> {
32: let owner = settings.get_int("owner")?;
33: let api_key = settings.get_string("api_key")?;
34: let tg = telegram_bot::Api::new(api_key);
35: let tg_cloned = tg.clone();
36:
37: let mut client = reqwest::Client::builder();
38: if let Ok(proxy) = settings.get_string("proxy") {
39: let proxy = reqwest::Proxy::all(proxy)?;
40: client = client.proxy(proxy);
41: }
42: let http_client = client.build()?;
43: let core = Arc::new(Core {
44: tg,
45: my: task::block_on(async {
46: tg_cloned.send(telegram_bot::GetMe).await
47: })?,
48: owner_chat: telegram_bot::UserId::new(owner),
49: pool: PgPoolOptions::new()
50: .max_connections(5)
51: .acquire_timeout(std::time::Duration::new(300, 0))
52: .idle_timeout(std::time::Duration::new(60, 0))
53: .connect_lazy(&settings.get_string("pg")?)?,
54: sources: Arc::new(Mutex::new(HashSet::new())),
55: http_client,
56: });
57: let clone = core.clone();
58: task::spawn(async move {
59: loop {
60: let delay = match &clone.autofetch().await {
61: Err(err) => {
62: if let Err(err) = clone.send(format!("š {:?}", err), None, None).await {
63: eprintln!("Autofetch error: {}", err);
64: };
65: std::time::Duration::from_secs(60)
66: },
67: Ok(time) => *time,
68: };
69: task::sleep(delay).await;
70: }
71: });
72: Ok(core)
73: }
74:
75: pub fn stream(&self) -> telegram_bot::UpdatesStream {
76: self.tg.stream()
77: }
78:
79: pub async fn send<'a, S>(&self, msg: S, target: Option<telegram_bot::UserId>, mode: Option<telegram_bot::types::ParseMode>) -> Result<()>
80: where S: Into<Cow<'a, str>> {
81: let mode = mode.unwrap_or(telegram_bot::types::ParseMode::Html);
82: let target = target.unwrap_or(self.owner_chat);
83: self.request(telegram_bot::SendMessage::new(target, msg).parse_mode(mode)).await?;
84: Ok(())
85: }
86:
79c91a5357 2024-08-02 87: pub async fn request<Req: telegram_bot::Request> (&self, req: Req) -> Result<<Req::Response as telegram_bot::ResponseType>::Type> {
88: loop {
89: let res = self.tg.send(&req).await;
90: match res {
91: Ok(_) => return Ok(res?),
92: Err(err) => {
79c91a5357 2024-08-02 93: dbg!(&err);
79c91a5357 2024-08-02 94: if let Some(caps) = RE_DELAY.captures(err.to_string().as_ref()) {
79c91a5357 2024-08-02 95: if let Some(delay) = caps.get(1) {
79c91a5357 2024-08-02 96: let delay = delay.as_str().parse::<u64>()?;
79c91a5357 2024-08-02 97: println!("Throttled, waiting {} senconds.", delay);
79c91a5357 2024-08-02 98: task::sleep(std::time::Duration::from_secs(delay)).await;
79c91a5357 2024-08-02 99: } else {
79c91a5357 2024-08-02 100: bail!("Can't read throttling message.");
79c91a5357 2024-08-02 101: }
79c91a5357 2024-08-02 102: } else {
79c91a5357 2024-08-02 103: return Err(err.into());
104: }
105: },
106: };
107: }
108: }
109:
110: pub async fn check<S>(&self, id: &i32, owner: S, real: bool) -> Result<Cow<'_, str>>
111: where S: Into<i64> {
112: let owner = owner.into();
113:
114: let mut posted: i32 = 0;
115: let id = {
116: let mut set = self.sources.lock().unwrap();
117: match set.get(id) {
118: Some(id) => id.clone(),
119: None => {
120: let id = Arc::new(*id);
121: set.insert(id.clone());
122: id.clone()
123: },
124: }
125: };
126: let count = Arc::strong_count(&id);
127: if count == 2 {
128: let source = sqlx::query!("select source_id, channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2",
129: *id, owner).fetch_one(&mut self.pool.acquire().await?).await?;
130: let destination = match real {
131: true => telegram_bot::UserId::new(source.channel_id),
132: false => telegram_bot::UserId::new(source.owner),
133: };
134: let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
135: let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, String> = BTreeMap::new();
136:
137: let response = self.http_client.get(&source.url).send().await?;
138: let status = response.status();
139: let content = response.bytes().await?;
140: match rss::Channel::read_from(&content[..]) {
141: Ok(feed) => {
142: for item in feed.items() {
143: if let Some(link) = item.link() {
144: let date = match item.pub_date() {
145: Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
146: None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
147: }?;
148: let url = link;
149: posts.insert(date, url.to_string());
150: }
151: };
152: },
153: Err(err) => match err {
154: rss::Error::InvalidStartTag => {
155: let feed = atom_syndication::Feed::read_from(&content[..])
156: .with_context(|| format!("Problem opening feed url:\n{}\n{}", &source.url, status))?;
157: for item in feed.entries() {
158: let date = item.published().unwrap();
159: let url = item.links()[0].href();
160: posts.insert(*date, url.to_string());
161: };
162: },
163: rss::Error::Eof => (),
164: _ => bail!("Unsupported or mangled content:\n{:?}\n{:#?}\n{:#?}\n", &source.url, err, status)
165: }
166: };
167: for (date, url) in posts.iter() {
168: let post_url: Cow<str> = match source.url_re {
169: Some(ref x) => sedregex::ReplaceCommand::new(x)?.execute(url),
170: None => url.into(),
171: };
172: if let Some(exists) = sqlx::query!("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;",
173: &post_url, *id).fetch_one(&mut self.pool.acquire().await?).await?.exists {
174: if ! exists {
175: if this_fetch.is_none() || *date > this_fetch.unwrap() {
176: this_fetch = Some(*date);
177: };
178: self.request( match &source.iv_hash {
179: Some(hash) => telegram_bot::SendMessage::new(destination, format!("<a href=\"https://t.me/iv?url={}&rhash={}\"> </a>{0}", &post_url, hash)),
180: None => telegram_bot::SendMessage::new(destination, format!("{}", post_url)),
181: }.parse_mode(telegram_bot::types::ParseMode::Html)).await
182: .context("Can't post message:")?;
183: sqlx::query!("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);",
184: *id, date, &post_url).execute(&mut self.pool.acquire().await?).await?;
185: };
186: };
187: posted += 1;
188: };
189: posts.clear();
190: };
191: sqlx::query!("update rsstg_source set last_scrape = now() where source_id = $1;",
192: *id).execute(&mut self.pool.acquire().await?).await?;
193: Ok(format!("Posted: {}", &posted).into())
194: }
195:
196: pub async fn delete<S>(&self, source_id: &i32, owner: S) -> Result<Cow<'_, str>>
197: where S: Into<i64> {
198: let owner = owner.into();
199:
200: match sqlx::query!("delete from rsstg_source where source_id = $1 and owner = $2;",
201: source_id, owner).execute(&mut self.pool.acquire().await?).await?.rows_affected() {
202: 0 => { Ok("No data found found.".into()) },
203: x => { Ok(format!("{} sources removed.", x).into()) },
204: }
205: }
206:
207: pub async fn clean<S>(&self, source_id: &i32, owner: S) -> Result<Cow<'_, str>>
208: where S: Into<i64> {
209: let owner = owner.into();
210:
211: match sqlx::query!("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;",
212: source_id, owner).execute(&mut self.pool.acquire().await?).await?.rows_affected() {
213: 0 => { Ok("No data found found.".into()) },
214: x => { Ok(format!("{} posts purged.", x).into()) },
215: }
216: }
217:
218: pub async fn enable<S>(&self, source_id: &i32, owner: S) -> Result<&str>
219: where S: Into<i64> {
220: let owner = owner.into();
221:
222: match sqlx::query!("update rsstg_source set enabled = true where source_id = $1 and owner = $2",
223: source_id, owner).execute(&mut self.pool.acquire().await?).await?.rows_affected() {
224: 1 => { Ok("Source enabled.") },
225: 0 => { Ok("Source not found.") },
226: _ => { Err(anyhow!("Database error.")) },
227: }
228: }
229:
230: pub async fn disable<S>(&self, source_id: &i32, owner: S) -> Result<&str>
231: where S: Into<i64> {
232: let owner = owner.into();
233:
234: match sqlx::query!("update rsstg_source set enabled = false where source_id = $1 and owner = $2",
235: source_id, owner).execute(&mut self.pool.acquire().await?).await?.rows_affected() {
236: 1 => { Ok("Source disabled.") },
237: 0 => { Ok("Source not found.") },
238: _ => { Err(anyhow!("Database error.")) },
239: }
240: }
241:
242: pub async fn update<S>(&self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: S) -> Result<&str>
243: where S: Into<i64> {
244: let owner = owner.into();
245:
246: match match update {
247: Some(id) => {
248: sqlx::query!("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1",
249: id, channel_id, url, iv_hash, owner, channel, url_re).execute(&mut self.pool.acquire().await?).await
250: },
251: None => {
252: sqlx::query!("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)",
253: channel_id, url, iv_hash, owner, channel, url_re).execute(&mut self.pool.acquire().await?).await
254: },
255: } {
256: Ok(_) => Ok(match update {
257: Some(_) => "Channel updated.",
258: None => "Channel added.",
259: }),
260: Err(sqlx::Error::Database(err)) => {
261: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
262: Some("_bt_check_unique", ) => {
263: Ok("Duplicate key.")
264: },
265: Some(_) => {
266: Ok("Database error.")
267: },
268: None => {
269: Ok("No database error extracted.")
270: },
271: }
272: },
273: Err(err) => {
274: bail!("Sorry, unknown error:\n{:#?}\n", err);
275: },
276: }
277: }
278:
279: async fn autofetch(&self) -> Result<std::time::Duration> {
280: let mut delay = chrono::Duration::minutes(1);
281: let now = chrono::Local::now();
282: let mut queue = sqlx::query!(r#"select source_id, next_fetch as "next_fetch: DateTime<chrono::Local>", owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';"#)
283: .fetch_all(&mut self.pool.acquire().await?).await?;
284: for row in queue.iter() {
285: if let Some(next_fetch) = row.next_fetch {
286: if next_fetch < now {
287: if let (Some(owner), Some(source_id)) = (row.owner, row.source_id) {
288: let clone = Core {
289: owner_chat: telegram_bot::UserId::new(owner),
290: ..self.clone()
291: };
292: task::spawn(async move {
293: if let Err(err) = clone.check(&source_id, owner, true).await {
294: if let Err(err) = clone.send(&format!("š {:?}", err), None, None).await {
295: dbg!("Check error: {}", err);
296: // clone.disable(&source_id, owner).await.unwrap();
297: };
298: };
299: });
300: }
301: } else if next_fetch - now < delay {
302: delay = next_fetch - now;
303: }
304: }
305: };
306: queue.clear();
307: Ok(delay.to_std()?)
308: }
309:
310: pub async fn list<S>(&self, owner: S) -> Result<String>
311: where S: Into<i64> {
312: let owner = owner.into();
313:
314: let mut reply: Vec<Cow<str>> = vec![];
315: reply.push("Channels:".into());
316: let rows = sqlx::query!("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id",
317: owner).fetch_all(&mut *self.pool.acquire().await?).await?;
318: for row in rows.iter() {
319: reply.push(format!("\n\\#ļøā£ {} \\*ļøā£ `{}` {}\nš `{}`", row.source_id, row.channel,
320: match row.enabled {
321: true => "š enabled",
322: false => "ā disabled",
323: }, row.url).into());
324: if let Some(hash) = &row.iv_hash {
325: reply.push(format!("IV: `{}`", hash).into());
326: }
327: if let Some(re) = &row.url_re {
328: reply.push(format!("RE: `{}`", re).into());
329: }
330: };
331: Ok(reply.join("\n"))
332: }
333: }