1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
+
|
use anyhow::{anyhow, bail, Context, Result};
use atom_syndication;
use chrono::DateTime;
use config;
use reqwest;
use sqlx::{
postgres::PgPoolOptions,
Row,
};
use rss;
use std::{
borrow::Cow,
collections::{
BTreeMap,
HashSet,
},
sync::{Arc, Mutex},
};
use telegram_bot;
|
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
|
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
|
-
-
-
+
+
+
+
-
+
+
+
-
|
Ok(core)
}
pub fn stream(&self) -> telegram_bot::UpdatesStream {
self.tg.stream()
}
pub fn send<S>(&self, msg: S, target: Option<telegram_bot::UserId>, parse_mode: Option<telegram_bot::types::ParseMode>) -> Result<()>
where S: Into<String> {
let msg: String = msg.into();
pub fn send<'a, S>(&self, msg: S, target: Option<telegram_bot::UserId>, parse_mode: Option<telegram_bot::types::ParseMode>) -> Result<()>
where S: Into<Cow<'a, str>> {
let msg = msg.into();
let parse_mode = match parse_mode {
Some(mode) => mode,
None => telegram_bot::types::ParseMode::Html,
};
self.tg.spawn(telegram_bot::SendMessage::new(match target {
Some(user) => user,
None => self.owner_chat,
}, msg.to_owned()).parse_mode(parse_mode));
}, msg).parse_mode(parse_mode));
Ok(())
}
pub async fn check<S>(&self, id: &i32, owner: S, real: bool) -> Result<String>
where S: Into<i64> {
let owner = owner.into();
let mut posted: i32 = 0;
let owner: i64 = owner.into();
let id = {
let mut set = self.sources.lock().unwrap();
match set.get(id) {
Some(id) => id.clone(),
None => {
let id = Arc::new(*id);
set.insert(id.clone());
|
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
|
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
|
-
+
+
-
+
+
-
+
+
-
+
+
-
+
+
|
.execute(&mut conn).await
.with_context(|| format!("Update scrape:\n{:?}", &conn))?;
Ok(format!("Posted: {}", &posted))
}
pub async fn delete<S>(&self, source_id: &i32, owner: S) -> Result<String>
where S: Into<i64> {
let owner: i64 = owner.into();
let owner = owner.into();
let mut conn = self.pool.acquire().await
.with_context(|| format!("Delete fetch conn:\n{:?}", &self.pool))?;
match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
.bind(source_id)
.bind(owner)
.execute(&mut conn).await
.with_context(|| format!("Delete source rule:\n{:?}", &self.pool))?
.rows_affected() {
0 => { Ok("No data found found.".to_string()) },
x => { Ok(format!("{} sources removed.", x)) },
}
}
pub async fn clean<S>(&self, source_id: &i32, owner: S) -> Result<String>
where S: Into<i64> {
let owner: i64 = owner.into();
let owner = owner.into();
let mut conn = self.pool.acquire().await
.with_context(|| format!("Clean fetch conn:\n{:?}", &self.pool))?;
match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
.bind(source_id)
.bind(owner)
.execute(&mut conn).await
.with_context(|| format!("Clean seen posts:\n{:?}", &self.pool))?
.rows_affected() {
0 => { Ok("No data found found.".to_string()) },
x => { Ok(format!("{} posts purged.", x)) },
}
}
pub async fn enable<S>(&self, source_id: &i32, owner: S) -> Result<&str>
where S: Into<i64> {
let owner: i64 = owner.into();
let owner = owner.into();
let mut conn = self.pool.acquire().await
.with_context(|| format!("Enable fetch conn:\n{:?}", &self.pool))?;
match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
.bind(source_id)
.bind(owner)
.execute(&mut conn).await
.with_context(|| format!("Enable source:\n{:?}", &self.pool))?
.rows_affected() {
1 => { Ok("Source enabled.") },
0 => { Ok("Source not found.") },
_ => { Err(anyhow!("Database error.")) },
}
}
pub async fn disable<S>(&self, source_id: &i32, owner: S) -> Result<&str>
where S: Into<i64> {
let owner: i64 = owner.into();
let owner = owner.into();
let mut conn = self.pool.acquire().await
.with_context(|| format!("Disable fetch conn:\n{:?}", &self.pool))?;
match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
.bind(source_id)
.bind(owner)
.execute(&mut conn).await
.with_context(|| format!("Disable source:\n{:?}", &self.pool))?
.rows_affected() {
1 => { Ok("Source disabled.") },
0 => { Ok("Source not found.") },
_ => { Err(anyhow!("Database error.")) },
}
}
pub async fn update<S>(&self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: S) -> Result<String>
where S: Into<i64> {
let owner: i64 = owner.into();
let owner = owner.into();
let mut conn = self.pool.acquire().await
.with_context(|| format!("Update fetch conn:\n{:?}", &self.pool))?;
match match update {
Some(id) => {
sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1").bind(id)
},
|
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
|
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
|
+
-
+
+
-
+
+
+
+
|
delay = chrono::Duration::minutes(1);
}
}
pub async fn list<S>(&self, owner: S) -> Result<String>
where S: Into<i64> {
let owner = owner.into();
let mut reply = vec![];
let mut conn = self.pool.acquire().await
.with_context(|| format!("List fetch conn:\n{:?}", &self.pool))?;
reply.push("Channels:".to_string());
let rows = sqlx::query("select source_id, channel, enabled, url, iv_hash from rsstg_source where owner = $1 order by source_id")
let rows = sqlx::query("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
.bind(owner)
.fetch_all(&mut conn).await?;
for row in rows.iter() {
let source_id: i32 = row.try_get("source_id")?;
let username: &str = row.try_get("channel")?;
let enabled: bool = row.try_get("enabled")?;
let url: &str = row.try_get("url")?;
let iv_hash: Option<&str> = row.try_get("iv_hash")?;
let url_re: Option<&str> = row.try_get("url_re")?;
reply.push(format!("\n\\#ļøā£ {} \\*ļøā£ `{}` {}\nš `{}`", source_id, username,
match enabled {
true => "š enabled",
false => "ā disabled",
}, url));
if let Some(hash) = iv_hash {
reply.push(format!("IV `{}`", hash));
reply.push(format!("IV: `{}`", hash));
}
if let Some(re) = url_re {
reply.push(format!("RE: `{}`", re));
}
};
Ok(reply.join("\n"))
}
}
|