Lines of
src/core.rs
from check-in bb89b6fab8
that are changed by the sequence of edits moving toward
check-in fae13a0e55:
1: use crate::{
2: command,
3: sql::Db,
4: };
5:
6: use std::{
7: borrow::Cow,
8: collections::{
9: BTreeMap,
10: HashSet,
11: },
bb89b6fab8 2025-06-15 12: num::TryFromIntError,
13: sync::{
14: Arc,
15: Mutex
16: },
17: };
18:
19: use anyhow::{
20: anyhow,
21: bail,
22: Result,
23: };
24: use async_std::task;
25: use chrono::DateTime;
26: use tgbot::{
27: api::Client,
28: handler::UpdateHandler,
29: types::{
30: Bot,
31: ChatPeerId,
32: Command,
33: GetBot,
34: Message,
35: ParseMode,
36: SendMessage,
37: Update,
38: UpdateType,
39: UserPeerId,
40: },
41: };
bb89b6fab8 2025-06-15 42: use thiserror::Error;
bb89b6fab8 2025-06-15 43:
bb89b6fab8 2025-06-15 44: #[derive(Error, Debug)]
bb89b6fab8 2025-06-15 45: pub enum RssError {
bb89b6fab8 2025-06-15 46: // #[error(transparent)]
bb89b6fab8 2025-06-15 47: // Tg(#[from] TgError),
bb89b6fab8 2025-06-15 48: #[error(transparent)]
bb89b6fab8 2025-06-15 49: Int(#[from] TryFromIntError),
bb89b6fab8 2025-06-15 50: }
51:
52: #[derive(Clone)]
53: pub struct Core {
54: owner_chat: ChatPeerId,
55: // max_delay: u16,
56: pub tg: Client,
57: pub me: Bot,
58: pub db: Db,
59: sources: Arc<Mutex<HashSet<Arc<i32>>>>,
60: http_client: reqwest::Client,
61: }
62:
63: impl Core {
64: pub async fn new(settings: config::Config) -> Result<Core> {
65: let owner_chat = ChatPeerId::from(settings.get_int("owner")?);
66: let api_key = settings.get_string("api_key")?;
67: let tg = Client::new(&api_key)?;
68:
69: let mut client = reqwest::Client::builder();
70: if let Ok(proxy) = settings.get_string("proxy") {
71: let proxy = reqwest::Proxy::all(proxy)?;
72: client = client.proxy(proxy);
73: }
74: let http_client = client.build()?;
75: let me = tg.execute(GetBot).await?;
76: let core = Core {
77: tg,
78: me,
79: owner_chat,
80: db: Db::new(&settings.get_string("pg")?)?,
81: sources: Arc::new(Mutex::new(HashSet::new())),
82: http_client,
83: // max_delay: 60,
84: };
85: let clone = core.clone();
86: task::spawn(async move {
87: loop {
88: let delay = match &clone.autofetch().await {
89: Err(err) => {
90: if let Err(err) = clone.send(format!("š {err:?}"), None, None).await {
91: eprintln!("Autofetch error: {err:?}");
92: };
93: std::time::Duration::from_secs(60)
94: },
95: Ok(time) => *time,
96: };
97: task::sleep(delay).await;
98: }
99: });
100: Ok(core)
101: }
bb89b6fab8 2025-06-15 102:
bb89b6fab8 2025-06-15 103: /*
bb89b6fab8 2025-06-15 104: pub async fn stream(&self) -> Result<()> {
bb89b6fab8 2025-06-15 105: let mut offset: i64 = 0;
bb89b6fab8 2025-06-15 106: let mut params = GetUpdatesParams {
bb89b6fab8 2025-06-15 107: offset: None,
bb89b6fab8 2025-06-15 108: limit: Some(100),
bb89b6fab8 2025-06-15 109: timeout: Some(300),
bb89b6fab8 2025-06-15 110: allowed_updates: Some(vec![AllowedUpdate::Message]),
bb89b6fab8 2025-06-15 111: };
bb89b6fab8 2025-06-15 112: loop {
bb89b6fab8 2025-06-15 113: let updates = self.tg.get_updates(&params).await?.result;
bb89b6fab8 2025-06-15 114: if updates.is_empty() {
bb89b6fab8 2025-06-15 115: offset = 0;
bb89b6fab8 2025-06-15 116: params.offset = None;
bb89b6fab8 2025-06-15 117: continue;
bb89b6fab8 2025-06-15 118: }
bb89b6fab8 2025-06-15 119: for update in updates {
bb89b6fab8 2025-06-15 120: if i64::from(update.update_id) >= offset {
bb89b6fab8 2025-06-15 121: offset = i64::from(update.update_id) + 1;
bb89b6fab8 2025-06-15 122: params.offset = Some(offset);
bb89b6fab8 2025-06-15 123: }
bb89b6fab8 2025-06-15 124: if let UpdateContent::Message(msg) = update.content {
bb89b6fab8 2025-06-15 125: if let Some(text) = msg.text {
bb89b6fab8 2025-06-15 126: if let Some(entities) = msg.entities {
bb89b6fab8 2025-06-15 127: let chars: Vec<u16> = text.encode_utf16().collect();
bb89b6fab8 2025-06-15 128: for entity in entities {
bb89b6fab8 2025-06-15 129: if entity.type_field == MessageEntityType::BotCommand && entity.offset != 0 {
bb89b6fab8 2025-06-15 130: bail!("commands should be at message start");
bb89b6fab8 2025-06-15 131: };
bb89b6fab8 2025-06-15 132: let cmd = String::from_utf16_lossy(&chars[entity.offset as usize..entity.length as usize]);
bb89b6fab8 2025-06-15 133: let words: Vec<&str> = text.split_whitespace().collect();
bb89b6fab8 2025-06-15 134: let res = match cmd.as_ref() {
bb89b6fab8 2025-06-15 135: "/check" | "/clean" | "/enable" | "/delete" | "/disable" => command::command(self, msg.chat.id, words).await,
bb89b6fab8 2025-06-15 136: "/start" => command::start(self, msg.chat.id).await,
bb89b6fab8 2025-06-15 137: "/list" => command::list(self, msg.chat.id).await,
bb89b6fab8 2025-06-15 138: "/add" | "/update" => command::update(self, msg.chat.id, words).await,
bb89b6fab8 2025-06-15 139: any => Err(anyhow!("Unknown command: {any}")),
bb89b6fab8 2025-06-15 140: };
bb89b6fab8 2025-06-15 141: if let Err(err) = res {
bb89b6fab8 2025-06-15 142: if let Err(err2) = self.send(format!("\\#error\n```\n{err:?}\n```"),
bb89b6fab8 2025-06-15 143: Some(msg.chat.id),
bb89b6fab8 2025-06-15 144: Some(ParseMode::MarkdownV2)
bb89b6fab8 2025-06-15 145: ).await{
bb89b6fab8 2025-06-15 146: dbg!(err2);
bb89b6fab8 2025-06-15 147: };
bb89b6fab8 2025-06-15 148: }
bb89b6fab8 2025-06-15 149: };
bb89b6fab8 2025-06-15 150: };
bb89b6fab8 2025-06-15 151: };
bb89b6fab8 2025-06-15 152: };
bb89b6fab8 2025-06-15 153: }
bb89b6fab8 2025-06-15 154: }
bb89b6fab8 2025-06-15 155: }
bb89b6fab8 2025-06-15 156: */
157:
158: pub async fn send <S>(&self, msg: S, target: Option<ChatPeerId>, mode: Option<ParseMode>) -> Result<Message>
159: where S: Into<String> {
160: let msg = msg.into();
161:
162: let mode = mode.unwrap_or(ParseMode::Html);
163: let target = target.unwrap_or(self.owner_chat);
164: Ok(self.tg.execute(
165: SendMessage::new(target, msg)
166: .with_parse_mode(mode)
167: ).await?)
168: }
169:
170: pub async fn check (&self, id: i32, real: bool) -> Result<String> {
171: let mut posted: i32 = 0;
172: let mut conn = self.db.begin().await?;
173:
174: let id = {
175: let mut set = self.sources.lock().unwrap();
176: match set.get(&id) {
177: Some(id) => id.clone(),
178: None => {
179: let id = Arc::new(id);
180: set.insert(id.clone());
181: id.clone()
182: },
183: }
184: };
185: let count = Arc::strong_count(&id);
186: if count == 2 {
187: let source = conn.get_source(*id, self.owner_chat).await?;
188: conn.set_scrape(*id).await?;
189: let destination = ChatPeerId::from(match real {
190: true => source.channel_id,
191: false => source.owner,
192: });
193: let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
194: let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, String> = BTreeMap::new();
195:
196: let response = self.http_client.get(&source.url).send().await?;
197: let status = response.status();
198: let content = response.bytes().await?;
199: match rss::Channel::read_from(&content[..]) {
200: Ok(feed) => {
201: for item in feed.items() {
202: if let Some(link) = item.link() {
203: let date = match item.pub_date() {
204: Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
205: None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
206: }?;
207: let url = link;
208: posts.insert(date, url.to_string());
209: }
210: };
211: },
212: Err(err) => match err {
213: rss::Error::InvalidStartTag => {
214: match atom_syndication::Feed::read_from(&content[..]) {
215: Ok(feed) => {
216: for item in feed.entries() {
217: let date = item.published().unwrap();
218: let url = item.links()[0].href();
219: posts.insert(*date, url.to_string());
220: };
221: },
222: Err(err) => {
223: bail!("Unsupported or mangled content:\n{:?}\n{err:#?}\n{status:#?}\n", &source.url)
224: },
225: }
226: },
227: rss::Error::Eof => (),
228: _ => bail!("Unsupported or mangled content:\n{:?}\n{err:#?}\n{status:#?}\n", &source.url)
229: }
230: };
231: for (date, url) in posts.iter() {
232: let post_url: Cow<str> = match source.url_re {
233: Some(ref x) => sedregex::ReplaceCommand::new(x)?.execute(url),
234: None => url.into(),
235: };
236: if let Some(exists) = conn.exists(&post_url, *id).await? {
237: if ! exists {
238: if this_fetch.is_none() || *date > this_fetch.unwrap() {
239: this_fetch = Some(*date);
240: };
241: self.send( match &source.iv_hash {
242: Some(hash) => format!("<a href=\"https://t.me/iv?url={post_url}&rhash={hash}\"> </a>{post_url}"),
243: None => format!("{post_url}"),
244: }, Some(destination), Some(ParseMode::Html)).await?;
245: conn.add_post(*id, date, &post_url).await?;
246: };
247: };
248: posted += 1;
249: };
250: posts.clear();
251: };
252: Ok(format!("Posted: {posted}"))
253: }
254:
255: async fn autofetch(&self) -> Result<std::time::Duration> {
256: let mut delay = chrono::Duration::minutes(1);
257: let now = chrono::Local::now();
258: let queue = {
259: let mut conn = self.db.begin().await?;
260: conn.get_queue().await?
261: };
262: for row in queue {
263: if let Some(next_fetch) = row.next_fetch {
264: if next_fetch < now {
265: if let (Some(owner), Some(source_id)) = (row.owner, row.source_id) {
266: let clone = Core {
267: owner_chat: ChatPeerId::from(owner),
268: ..self.clone()
269: };
270: task::spawn(async move {
271: if let Err(err) = clone.check(source_id, true).await {
272: if let Err(err) = clone.send(&format!("š {err:?}"), None, None).await {
273: eprintln!("Check error: {err:?}");
274: // clone.disable(&source_id, owner).await.unwrap();
275: };
276: };
277: });
278: }
279: } else if next_fetch - now < delay {
280: delay = next_fetch - now;
281: }
282: }
283: };
284: Ok(delay.to_std()?)
285: }
286:
287: pub async fn list (&self, owner: UserPeerId) -> Result<String> {
288: let mut reply: Vec<Cow<str>> = vec![];
289: reply.push("Channels:".into());
290: let mut conn = self.db.begin().await?;
291: for row in conn.get_list(owner).await? {
292: reply.push(format!("\n\\#ļøā£ {} \\*ļøā£ `{}` {}\nš `{}`", row.source_id, row.channel,
293: match row.enabled {
294: true => "š enabled",
295: false => "ā disabled",
296: }, row.url).into());
297: if let Some(hash) = &row.iv_hash {
298: reply.push(format!("IV: `{hash}`").into());
299: }
300: if let Some(re) = &row.url_re {
301: reply.push(format!("RE: `{re}`").into());
302: }
303: };
304: Ok(reply.join("\n"))
305: }
306: }
307:
308: impl UpdateHandler for Core {
309: async fn handle (&self, update: Update) {
310: if let UpdateType::Message(msg) = update.update_type {
311: if let Ok(cmd) = Command::try_from(msg) {
312: let msg = cmd.get_message();
313: let words = cmd.get_args();
bb89b6fab8 2025-06-15 314: let res = match cmd.get_name() {
bb89b6fab8 2025-06-15 315: "/check" | "/clean" | "/enable" | "/delete" | "/disable" => command::command(self, msg, words).await,
316: "/start" => command::start(self, msg).await,
317: "/list" => command::list(self, msg).await,
bb89b6fab8 2025-06-15 318: "/add" | "/update" => command::update(self, msg, words).await,
319: any => Err(anyhow!("Unknown command: {any}")),
320: };
321: if let Err(err) = res {
322: if let Err(err2) = self.send(format!("\\#error\n```\n{err:?}\n```"),
323: Some(msg.chat.get_id()),
324: Some(ParseMode::MarkdownV2)
325: ).await{
326: dbg!(err2);
327: };
328: }
329: };
330: };
331: }
332: }