Diff
Logged in as anonymous

Differences From Artifact [8391fce4ec]:

To Artifact [c856c2cf77]:


1
2
3
4
5
6

7
8

9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

21
22
23
24
25
26
27






+


+










-







use std::collections::{BTreeMap, HashSet};
use std::sync::{Arc, Mutex};

use config;

use tokio;
use reqwest;

use rss;
use atom_syndication;

use chrono::DateTime;

use regex::Regex;

use telegram_bot::*;
use tokio::stream::StreamExt;

use sqlx::postgres::PgPoolOptions;
use sqlx::Row;
//use sqlx::Done; // .rows_affected()

#[macro_use]
extern crate lazy_static;

use anyhow::{anyhow, bail, Context, Result};

#[derive(Clone)]
104
105
106
107
108
109
110





111

112
113
114
115
116
117
118
119


























120
121
122
123
124
125
126
105
106
107
108
109
110
111
112
113
114
115
116

117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158







+
+
+
+
+
-
+








+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+







				true => UserId::new(channel_id),
				false => UserId::new(row.try_get("owner")?),
			};
			let url: &str = row.try_get("url")?;
			let mut this_fetch: Option<DateTime<chrono::FixedOffset>> = None;
			let iv_hash: Option<&str> = row.try_get("iv_hash")?;
			let mut posts: BTreeMap<DateTime<chrono::FixedOffset>, String> = BTreeMap::new();
			let content = reqwest::get(url).await?.bytes().await?;
			//let mut content_ = surf::get(url).await.map_err(|err| anyhow!(err))?;
			//eprintln!("Data: {:#?}", &content_);
			//let content = content_.body_bytes().await.map_err(|err| anyhow!(err))?;
			/*
			let feed = rss::Channel::from_url(url)
			let feed = rss::Channel::read_from(&content[..])
				.with_context(|| format!("Problem opening feed url:\n{}", &url))?;
			for item in feed.items() {
				let date = match item.pub_date() {
					Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
					None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
				}?;
				let url = item.link().unwrap().to_string();
				posts.insert(date.clone(), url.clone());
			};
			*/
			match rss::Channel::read_from(&content[..]) {
				Ok(feed) => {
					for item in feed.items() {
						let date = match item.pub_date() {
							Some(feed_date) => DateTime::parse_from_rfc2822(feed_date),
							None => DateTime::parse_from_rfc3339(&item.dublin_core_ext().unwrap().dates()[0]),
						}?;
						let url = item.link().unwrap().to_string();
						posts.insert(date.clone(), url.clone());
					};
				},
				Err(err) => match err {
					rss::Error::InvalidStartTag => {
						let feed = atom_syndication::Feed::read_from(&content[..])
							.with_context(|| format!("Problem opening feed url:\n{}", &url))?;
						for item in feed.entries() {
							let date = item.published().unwrap();
							let url = item.links()[0].href();
							posts.insert(date.clone(), url.to_string());
						};
					},
					rss::Error::Eof => (),
					_ => bail!("Unsupported or mangled content:\n{:#?}\n", err)
				}
			};
			for (date, url) in posts.iter() {
				let mut conn = self.pool.acquire().await
					.with_context(|| format!("Check post fetch conn:\n{:?}", &self.pool))?;
				let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
					.bind(&url)
					.bind(*id)