Overview
| Comment: | change how locking works - add separate Token struct |
|---|---|
| Downloads: | Tarball | ZIP archive | SQL archive |
| Timelines: | family | ancestors | descendants | both | trunk |
| Files: | files | file ages | folders |
| SHA3-256: | d7fbc271ace6202ea3ba569b03c61bc1 |
| User & Date: | arcade on 2026-01-06 08:54:34.412 |
| Other Links: | manifest | tags |
Context
|
2026-01-06
| ||
| 09:32 | add README check-in: e4165e1e85 user: arcade tags: trunk | |
| 08:54 | change how locking works - add separate Token struct check-in: d7fbc271ac user: arcade tags: trunk | |
| 08:53 | bump check-in: 7690cb4cf8 user: arcade tags: trunk | |
Changes
Modified src/core.rs
from [159dd8fde0]
to [a596b3f681].
| ︙ | ︙ | |||
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
pub static ref RE_SPECIAL: Regex = Regex::new(r"([\-_*\[\]()~`>#+|{}\.!])").unwrap();
}
/// Escapes characters that are special in Telegram message markup by
/// prefixing each with a backslash so they render literally.
/// NOTE(review): the escaped character set and `\`-prefixing match Telegram
/// MarkdownV2 escaping rather than HTML entity encoding — confirm the
/// intended parse mode (original comment said HTML).
pub fn encode (text: &str) -> Cow<'_, str> {
    // `replace_all` returns a borrowed Cow when nothing matches, so the
    // common no-special-chars case allocates nothing.
    RE_SPECIAL.replace_all(text, "\\$1")
}
#[derive(Clone)]
pub struct Core {
owner_chat: ChatPeerId,
// max_delay: u16,
pub tg: Client,
pub me: Bot,
pub db: Db,
| > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > > | | 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
pub static ref RE_SPECIAL: Regex = Regex::new(r"([\-_*\[\]()~`>#+|{}\.!])").unwrap();
}
/// Escapes characters that are special in Telegram message markup by
/// prefixing each with a backslash so they render literally.
/// NOTE(review): the escaped character set and `\`-prefixing match Telegram
/// MarkdownV2 escaping rather than HTML entity encoding — confirm the
/// intended parse mode (original comment said HTML).
pub fn encode (text: &str) -> Cow<'_, str> {
    // `replace_all` returns a borrowed Cow when nothing matches, so the
    // common no-special-chars case allocates nothing.
    RE_SPECIAL.replace_all(text, "\\$1")
}
/// Guard ensuring at most one concurrent operation per source id:
/// constructing it (via `Token::new`) registers the id in the shared set,
/// and `Drop` removes it again. The struct itself carries no behavior —
/// it only exists so the registration is released automatically.
pub struct Token {
    // Shared registry of ids currently being processed.
    running: Arc<Mutex<HashSet<i32>>>,
    // The id this token holds a slot for.
    my_id: i32,
}
impl Token {
    /// Attempts to acquire the per-id slot.
    ///
    /// Returns `None` when `my_id` is already registered (an operation for
    /// this id is in flight); otherwise registers it and returns a guard
    /// whose `Drop` impl releases the slot.
    ///
    /// Blocks the current thread on the async lock; the critical section is
    /// a single `HashSet` operation, so the wait is expected to be short.
    fn new (running: &Arc<Mutex<HashSet<i32>>>, my_id: i32) -> Option<Token> {
        let running = running.clone();
        smol::block_on(async {
            let mut set = running.lock_arc().await;
            // `HashSet::insert` returns false when the value was already
            // present — one lookup instead of the previous
            // `contains` + `insert` pair.
            if set.insert(my_id) {
                Some(Token {
                    running,
                    my_id,
                })
            } else {
                None
            }
        })
    }
}
impl Drop for Token {
    /// Releases the slot acquired in `Token::new` by removing `my_id`
    /// from the shared set.
    fn drop (&mut self) {
        // Blocks the current thread on the async lock; the critical section
        // is a single HashSet removal, so this should be brief.
        smol::block_on(async {
            let mut set = self.running.lock_arc().await;
            set.remove(&self.my_id);
        })
    }
}
#[derive(Clone)]
pub struct Core {
    // Chat id of the bot owner; used to resolve sources (`get_source`) and
    // as the destination in test mode (`check` with `real == false`).
    owner_chat: ChatPeerId,
    // max_delay: u16,
    pub tg: Client,
    pub me: Bot,
    pub db: Db,
    // Set of source ids with an operation currently in flight; guarded via
    // the `Token` RAII struct above. Cloned cheaply with `Core` (Arc).
    running: Arc<Mutex<HashSet<i32>>>,
    // Shared HTTP client for feed fetching.
    http_client: reqwest::Client,
}
pub struct Post {
uri: String,
title: String,
authors: String,
|
| ︙ | ︙ | |||
93 94 95 96 97 98 99 |
let http_client = client.build().stack()?;
let me = tg.execute(GetBot).await.stack()?;
let core = Core {
tg,
me,
owner_chat,
db: Db::new(&settings.get_string("pg").stack()?)?,
| | | 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
let http_client = client.build().stack()?;
let me = tg.execute(GetBot).await.stack()?;
let core = Core {
tg,
me,
owner_chat,
db: Db::new(&settings.get_string("pg").stack()?)?,
running: Arc::new(Mutex::new(HashSet::new())),
http_client,
// max_delay: 60,
};
let clone = core.clone();
smol::spawn(Compat::new(async move {
loop {
let delay = match &clone.autofetch().await {
|
| ︙ | ︙ | |||
131 132 133 134 135 136 137 |
).await.stack()
}
pub async fn check (&self, id: i32, real: bool, last_scrape: Option<DateTime<Local>>) -> Result<String> {
let mut posted: i32 = 0;
let mut conn = self.db.begin().await.stack()?;
| < < < < < | < | < > | < < < | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | > > > | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | | < | 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 |
).await.stack()
}
/// Fetches the feed for source `id`, records and posts entries not yet in
/// the database, and returns a human-readable summary string.
///
/// * `real` — when `true`, posts go to the source's channel; when `false`,
///   they go to the source's owner chat (test mode).
/// * `last_scrape` — if set, sent to the server as a date header.
///   NOTE(review): the value is placed in `LAST_MODIFIED`; conditional GETs
///   normally use `If-Modified-Since` — confirm the intended header.
///
/// Bails early when another check for the same id is already running
/// (guarded by `Token`).
pub async fn check (&self, id: i32, real: bool, last_scrape: Option<DateTime<Local>>) -> Result<String> {
    let mut posted: i32 = 0;
    let mut conn = self.db.begin().await.stack()?;
    // Acquire the per-id guard; `token` must stay alive for the whole
    // function so its Drop releases the slot on every exit path.
    let token = Token::new(&self.running, id);
    if token.is_none() {
        bail!("check is already running");
    }
    let source = conn.get_source(id, self.owner_chat).await.stack()?;
    conn.set_scrape(id).await.stack()?;
    // Test mode redirects output to the source owner's chat.
    let destination = ChatPeerId::from(match real {
        true => source.channel_id,
        false => source.owner,
    });
    // Latest entry date seen this run.
    // NOTE(review): `this_fetch` is written in the posting loop below but
    // never read afterwards — looks like dead code; confirm.
    let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
    // Entries keyed by publication date, so iteration below is oldest-first.
    let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, Post> = BTreeMap::new();
    let mut builder = self.http_client.get(&source.url);
    if let Some(last_scrape) = last_scrape {
        builder = builder.header(LAST_MODIFIED, last_scrape.to_rfc2822());
    };
    let response = builder.send().await.stack()?;
    {
        // Log servers that advertise caching hints (presumably to tune
        // future fetch delays — verify against autofetch).
        let headers = response.headers();
        let expires = headers.get(EXPIRES);
        let cache = headers.get(CACHE_CONTROL);
        if expires.is_some() || cache.is_some() {
            println!("{} {} {:?} {:?} {:?}", Local::now().to_rfc2822(), &source.url, last_scrape, expires, cache);
        }
    }
    let status = response.status();
    let content = response.bytes().await.stack()?;
    // Try RSS first; on `InvalidStartTag` fall back to Atom; treat EOF
    // (empty body) as "no entries".
    match rss::Channel::read_from(&content[..]) {
        Ok(feed) => {
            for item in feed.items() {
                if let Some(link) = item.link() {
                    // Prefer RFC 2822 `pubDate`; otherwise use the first
                    // Dublin Core date, parsed as RFC 3339.
                    let date = match item.pub_date() {
                        Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
                        None => DateTime::parse_from_rfc3339(match item.dublin_core_ext() {
                            Some(dates) => &dates.dates()[0],
                            None => bail!("Feed item misses posting date."),
                        }),
                    }.stack()?;
                    let uri = link.to_string();
                    let title = item.title().unwrap_or("").to_string();
                    let authors = item.author().unwrap_or("").to_string();
                    let summary = item.content().unwrap_or("").to_string();
                    posts.insert(date, Post{
                        uri,
                        title,
                        authors,
                        summary,
                    });
                }
            };
        },
        Err(err) => match err {
            rss::Error::InvalidStartTag => {
                // Not RSS — attempt Atom.
                match atom_syndication::Feed::read_from(&content[..]) {
                    Ok(feed) => {
                        for item in feed.entries() {
                            // NOTE(review): `published()` is unwrapped — an
                            // Atom entry without `published` panics here;
                            // confirm that is acceptable.
                            let date = item.published().unwrap();
                            let uri = item.links()[0].href().to_string();
                            let title = item.title().to_string();
                            let authors = item.authors().iter().map(|x| format!("{} <{:?}>", x.name(), x.email())).collect::<Vec<String>>().join(", ");
                            let summary = if let Some(sum) = item.summary() { sum.value.clone() } else { String::new() };
                            posts.insert(*date, Post{
                                uri,
                                title,
                                authors,
                                summary,
                            });
                        };
                    },
                    Err(err) => {
                        bail!("Unsupported or mangled content:\n{:?}\n{err}\n{status:#?}\n", &source.url)
                    },
                }
            },
            // Empty body — nothing to parse, not an error.
            rss::Error::Eof => (),
            _ => bail!("Unsupported or mangled content:\n{:?}\n{err}\n{status:#?}\n", &source.url)
        }
    };
    for (date, post) in posts.iter() {
        // Optionally rewrite the entry URL through the source's
        // sed-style replace command.
        let post_url: Cow<str> = match source.url_re {
            Some(ref x) => sedregex::ReplaceCommand::new(x).stack()?.execute(&post.uri),
            None => post.uri.clone().into(),
        };
        if let Some(exists) = conn.exists(&post_url, id).await.stack()? {
            if ! exists {
                if this_fetch.is_none() || *date > this_fetch.unwrap() {
                    this_fetch = Some(*date);
                };
                // Wrap in a Telegram Instant View link when the source has
                // an IV hash; the leading <a> holds the preview.
                self.send( match &source.iv_hash {
                    Some(hash) => format!("<a href=\"https://t.me/iv?url={post_url}&rhash={hash}\"> </a>{post_url}"),
                    None => format!("{post_url}"),
                }, Some(destination), Some(ParseMode::Html)).await.stack()?;
                conn.add_post(id, date, &post_url).await.stack()?;
            };
        };
        // NOTE(review): incremented for every entry iterated, not only
        // newly sent ones — "Posted: N" counts processed entries; confirm
        // the intended meaning.
        posted += 1;
    };
    posts.clear();
    // NOTE(review): no explicit commit of `conn` is visible in this view —
    // verify the transaction is committed (e.g. on drop or elsewhere).
    Ok(format!("Posted: {posted}"))
}
async fn autofetch(&self) -> Result<std::time::Duration> {
let mut delay = chrono::Duration::minutes(1);
let now = chrono::Local::now();
let queue = {
|
| ︙ | ︙ |