Index: src/core.rs ================================================================== --- src/core.rs +++ src/core.rs @@ -57,19 +57,52 @@ /// Encodes special HTML entities to prevent them interfering with Telegram HTML pub fn encode (text: &str) -> Cow<'_, str> { RE_SPECIAL.replace_all(text, "\\$1") } + +// This one does nothing except making sure only one token exists for each id +pub struct Token { + running: Arc<Mutex<HashSet<i32>>>, + my_id: i32, +} + +impl Token { + fn new (running: &Arc<Mutex<HashSet<i32>>>, my_id: i32) -> Option<Token> { + let running = running.clone(); + smol::block_on(async { + let mut set = running.lock_arc().await; + if set.contains(&my_id) { + None + } else { + set.insert(my_id); + Some(Token { + running, + my_id, + }) + } + }) + } +} + +impl Drop for Token { + fn drop (&mut self) { + smol::block_on(async { + let mut set = self.running.lock_arc().await; + set.remove(&self.my_id); + }) + } +} #[derive(Clone)] pub struct Core { owner_chat: ChatPeerId, // max_delay: u16, pub tg: Client, pub me: Bot, pub db: Db, - sources: Arc<Mutex<HashSet<Arc<i32>>>>, + running: Arc<Mutex<HashSet<i32>>>, http_client: reqwest::Client, } pub struct Post { uri: String, @@ -95,11 +128,11 @@ let core = Core { tg, me, owner_chat, db: Db::new(&settings.get_string("pg").stack()?)?, - sources: Arc::new(Mutex::new(HashSet::new())), + running: Arc::new(Mutex::new(HashSet::new())), http_client, // max_delay: 60, }; let clone = core.clone(); smol::spawn(Compat::new(async move { @@ -133,116 +166,109 @@ pub async fn check (&self, id: i32, real: bool, last_scrape: Option<DateTime<FixedOffset>>) -> Result<String> { let mut posted: i32 = 0; let mut conn = self.db.begin().await.stack()?; - let id = { - let mut set = self.sources.lock_arc().await; - match set.get(&id) { - Some(id) => id.clone(), - None => { - let id = Arc::new(id); - set.insert(id.clone()); - id.clone() - }, - } - }; - let count = Arc::strong_count(&id); - if count == 2 { - let source = conn.get_source(*id, self.owner_chat).await.stack()?; - conn.set_scrape(*id).await.stack()?; - let destination = ChatPeerId::from(match 
real { - true => source.channel_id, - false => source.owner, - }); - let mut this_fetch: Option<DateTime<FixedOffset>> = None; - let mut posts: BTreeMap<DateTime<FixedOffset>, Post> = BTreeMap::new(); - - let mut builder = self.http_client.get(&source.url); - if let Some(last_scrape) = last_scrape { - builder = builder.header(LAST_MODIFIED, last_scrape.to_rfc2822()); - }; - let response = builder.send().await.stack()?; - { - let headers = response.headers(); - let expires = headers.get(EXPIRES); - let cache = headers.get(CACHE_CONTROL); - if expires.is_some() || cache.is_some() { - println!("{} {} {:?} {:?} {:?}", Local::now().to_rfc2822(), &source.url, last_scrape, expires, cache); - } - } - let status = response.status(); - let content = response.bytes().await.stack()?; - match rss::Channel::read_from(&content[..]) { - Ok(feed) => { - for item in feed.items() { - if let Some(link) = item.link() { - let date = match item.pub_date() { - Some(feed_date) => DateTime::parse_from_rfc2822(feed_date), - None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]), - }.stack()?; - let uri = link.to_string(); - let title = item.title().unwrap_or("").to_string(); - let authors = item.author().unwrap_or("").to_string(); - let summary = item.content().unwrap_or("").to_string(); - posts.insert(date, Post{ - uri, - title, - authors, - summary, - }); - } - }; - }, - Err(err) => match err { - rss::Error::InvalidStartTag => { - match atom_syndication::Feed::read_from(&content[..]) { - Ok(feed) => { - for item in feed.entries() { - let date = item.published().unwrap(); - let uri = item.links()[0].href().to_string(); - let title = item.title().to_string(); - let authors = item.authors().iter().map(|x| format!("{} <{:?}>", x.name(), x.email())).collect::<Vec<_>>().join(", "); - let summary = if let Some(sum) = item.summary() { sum.value.clone() } else { String::new() }; - posts.insert(*date, Post{ - uri, - title, - authors, - summary, - }); - }; - }, - Err(err) => { - bail!("Unsupported or mangled 
content:\n{:?}\n{err}\n{status:#?}\n", &source.url) - }, - } - }, - rss::Error::Eof => (), - _ => bail!("Unsupported or mangled content:\n{:?}\n{err}\n{status:#?}\n", &source.url) - } - }; - for (date, post) in posts.iter() { - let post_url: Cow<str> = match source.url_re { - Some(ref x) => sedregex::ReplaceCommand::new(x).stack()?.execute(&post.uri), - None => post.uri.clone().into(), - }; - if let Some(exists) = conn.exists(&post_url, *id).await.stack()? { - if ! exists { - if this_fetch.is_none() || *date > this_fetch.unwrap() { - this_fetch = Some(*date); - }; - self.send( match &source.iv_hash { - Some(hash) => format!("<a href=\"https://t.me/iv?url={post_url}&rhash={hash}\"> </a>{post_url}"), - None => format!("{post_url}"), - }, Some(destination), Some(ParseMode::Html)).await.stack()?; - conn.add_post(*id, date, &post_url).await.stack()?; - }; - }; - posted += 1; - }; - posts.clear(); - }; + let token = Token::new(&self.running, id); + if token.is_none() { + bail!("check is already running"); + } + let source = conn.get_source(id, self.owner_chat).await.stack()?; + conn.set_scrape(id).await.stack()?; + let destination = ChatPeerId::from(match real { + true => source.channel_id, + false => source.owner, + }); + let mut this_fetch: Option<DateTime<FixedOffset>> = None; + let mut posts: BTreeMap<DateTime<FixedOffset>, Post> = BTreeMap::new(); + + let mut builder = self.http_client.get(&source.url); + if let Some(last_scrape) = last_scrape { + builder = builder.header(LAST_MODIFIED, last_scrape.to_rfc2822()); + }; + let response = builder.send().await.stack()?; + { + let headers = response.headers(); + let expires = headers.get(EXPIRES); + let cache = headers.get(CACHE_CONTROL); + if expires.is_some() || cache.is_some() { + println!("{} {} {:?} {:?} {:?}", Local::now().to_rfc2822(), &source.url, last_scrape, expires, cache); + } + } + let status = response.status(); + let content = response.bytes().await.stack()?; + match rss::Channel::read_from(&content[..]) { + Ok(feed) => { + for item in feed.items() { + if let Some(link) = item.link() { + let date = match 
item.pub_date() { + Some(feed_date) => DateTime::parse_from_rfc2822(feed_date), + None => DateTime::parse_from_rfc3339(match item.dublin_core_ext() { + Some(dates) => &dates.dates()[0], + None => bail!("Feed item misses posting date."), + }), + }.stack()?; + let uri = link.to_string(); + let title = item.title().unwrap_or("").to_string(); + let authors = item.author().unwrap_or("").to_string(); + let summary = item.content().unwrap_or("").to_string(); + posts.insert(date, Post{ + uri, + title, + authors, + summary, + }); + } + }; + }, + Err(err) => match err { + rss::Error::InvalidStartTag => { + match atom_syndication::Feed::read_from(&content[..]) { + Ok(feed) => { + for item in feed.entries() { + let date = item.published().unwrap(); + let uri = item.links()[0].href().to_string(); + let title = item.title().to_string(); + let authors = item.authors().iter().map(|x| format!("{} <{:?}>", x.name(), x.email())).collect::<Vec<_>>().join(", "); + let summary = if let Some(sum) = item.summary() { sum.value.clone() } else { String::new() }; + posts.insert(*date, Post{ + uri, + title, + authors, + summary, + }); + }; + }, + Err(err) => { + bail!("Unsupported or mangled content:\n{:?}\n{err}\n{status:#?}\n", &source.url) + }, + } + }, + rss::Error::Eof => (), + _ => bail!("Unsupported or mangled content:\n{:?}\n{err}\n{status:#?}\n", &source.url) + } + }; + for (date, post) in posts.iter() { + let post_url: Cow<str> = match source.url_re { + Some(ref x) => sedregex::ReplaceCommand::new(x).stack()?.execute(&post.uri), + None => post.uri.clone().into(), + }; + if let Some(exists) = conn.exists(&post_url, id).await.stack()? { + if ! 
exists { + if this_fetch.is_none() || *date > this_fetch.unwrap() { + this_fetch = Some(*date); + }; + self.send( match &source.iv_hash { + Some(hash) => format!("<a href=\"https://t.me/iv?url={post_url}&rhash={hash}\"> </a>{post_url}"), + None => format!("{post_url}"), + }, Some(destination), Some(ParseMode::Html)).await.stack()?; + conn.add_post(id, date, &post_url).await.stack()?; + }; + }; + posted += 1; + }; + posts.clear(); Ok(format!("Posted: {posted}")) } async fn autofetch(&self) -> Result { let mut delay = chrono::Duration::minutes(1);