Lines of
src/sql.rs
from check-in 9adc69d514
that are changed by the sequence of edits moving toward
check-in 374eadef45:
1: use crate::{
2: Arc,
3: Mutex,
4: };
5:
6: use std::{
7: borrow::Cow,
8: fmt,
9: };
10:
11: use chrono::{
12: DateTime,
13: FixedOffset,
14: Local,
15: };
16: use sqlx::{
17: Postgres,
18: Row,
19: postgres::PgPoolOptions,
20: pool::PoolConnection,
21: };
22: use stacked_errors::{
23: Result,
24: StackableErr,
25: bail,
26: };
27:
/// One feed row as presented in user-facing listings (see `Display` impl).
#[derive(sqlx::FromRow, Debug)]
pub struct List {
    /// Numeric feed identifier (primary key of rsstg_source).
    pub source_id: i32,
    /// Channel name the feed posts to — presumably a Telegram channel; verify against callers.
    pub channel: String,
    /// Whether the feed is currently being scraped.
    pub enabled: bool,
    /// Feed URL.
    pub url: String,
    /// Optional hash — NOTE(review): looks like a Telegram Instant View template hash; confirm.
    pub iv_hash: Option<String>,
    /// Optional URL regular expression — semantics not visible here; TODO confirm with callers.
    pub url_re: Option<String>,
}
37:
impl fmt::Display for List {
    /// Renders a feed as a multi-line, Markdown-escaped summary:
    /// header line with id, channel and enabled/disabled marker plus the URL,
    /// then optional `IV:` and `RE:` lines only when those fields are set.
    /// NOTE(review): the literal bytes in these format strings appear
    /// mojibake-encoded (likely emoji read in the wrong encoding) — they are
    /// preserved byte-for-byte here; confirm intended glyphs against the repo.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
        write!(f, "\\#feed\\_{} \\*ļøā£ `{}` {}\nš `{}`", self.source_id, self.channel,
            match self.enabled {
                true => "š enabled",
                false => "ā disabled",
            }, self.url)?;
        // Optional lines are appended only when present.
        if let Some(iv_hash) = &self.iv_hash {
            write!(f, "\nIV: `{iv_hash}`")?;
        }
        if let Some(url_re) = &self.url_re {
            write!(f, "\nRE: `{url_re}`")?;
        }
        Ok(())
    }
}
54:
/// One feed, used for caching and menu navigation
#[derive(sqlx::FromRow, Debug)]
pub struct Feed {
    /// Numeric feed identifier.
    pub source_id: i32,
    /// Channel name associated with the feed.
    pub channel: String,
}
61:
/// Scraper-facing view of a feed source (fetched by `Conn::get_source`).
#[derive(sqlx::FromRow, Debug)]
pub struct Source {
    /// Destination channel id — presumably a Telegram chat id; confirm with callers.
    pub channel_id: i64,
    /// Feed URL to fetch.
    pub url: String,
    /// Optional hash — NOTE(review): likely Instant View template hash; confirm.
    pub iv_hash: Option<String>,
    /// Owning user's id.
    pub owner: i64,
    /// Optional URL regular expression — semantics not visible here; TODO confirm.
    pub url_re: Option<String>,
}
70:
/// One pending scrape event row (see `Conn::get_queue`); fields are nullable
/// because the query left-joins rsstg_order with rsstg_source.
#[derive(sqlx::FromRow)]
pub struct Queue {
    /// Feed to scrape; None when the joined source row is missing.
    pub source_id: Option<i32>,
    /// When the next fetch is due.
    pub next_fetch: Option<DateTime<Local>>,
    /// Owning user's id; None when the joined source row is missing.
    pub owner: Option<i64>,
    /// Time of the last completed scrape.
    pub last_scrape: DateTime<Local>,
}
78:
/// Cloneable handle to the shared Postgres connection pool.
///
/// NOTE(review): sqlx pools are internally reference-counted and cloneable;
/// the Arc<Mutex<…>> wrapper serializes `acquire()` calls and may be
/// unnecessary — confirm before simplifying, as the field type is part of
/// this struct's layout used by `Db::new`/`Db::begin`.
#[derive(Clone)]
pub struct Db (
    Arc<Mutex<sqlx::Pool<sqlx::Postgres>>>,
);
83:
84: impl Db {
85: pub fn new (pguri: &str) -> Result<Db> {
86: Ok(Db (
87: Arc::new(Mutex::new(PgPoolOptions::new()
88: .max_connections(5)
89: .acquire_timeout(std::time::Duration::new(300, 0))
90: .idle_timeout(std::time::Duration::new(60, 0))
91: .connect_lazy(pguri)
92: .stack()?)),
93: ))
94: }
95:
96: pub async fn begin(&self) -> Result<Conn> {
97: let pool = self.0.lock_arc().await;
98: let conn = Conn ( pool.acquire().await.stack()? );
99: Ok(conn)
100: }
101: }
102:
/// One checked-out pooled Postgres connection; all queries run through this.
pub struct Conn (
    PoolConnection<Postgres>,
);
106:
107: impl Conn {
108: pub async fn add_post (&mut self, source_id: i32, date: &DateTime<FixedOffset>, post_url: &str) -> Result<()> {
109: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
110: .bind(source_id)
111: .bind(date)
112: .bind(post_url)
113: .execute(&mut *self.0).await.stack()?;
114: Ok(())
115: }
116:
117: pub async fn clean <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
118: where I: Into<i64> {
119: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
120: .bind(source_id)
121: .bind(owner.into())
122: .execute(&mut *self.0).await.stack()?.rows_affected() {
123: 0 => { Ok("No data found found.".into()) },
124: x => { Ok(format!("{x} posts purged.").into()) },
125: }
126: }
127:
128: pub async fn delete <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
129: where I: Into<i64> {
130: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
131: .bind(source_id)
132: .bind(owner.into())
133: .execute(&mut *self.0).await.stack()?.rows_affected() {
134: 0 => { Ok("No data found found.".into()) },
135: x => { Ok(format!("{x} sources removed.").into()) },
136: }
137: }
138:
139: pub async fn disable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
140: where I: Into<i64> {
141: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
142: .bind(source_id)
143: .bind(owner.into())
144: .execute(&mut *self.0).await.stack()?.rows_affected() {
145: 1 => { Ok("Source disabled.") },
146: 0 => { Ok("Source not found.") },
147: _ => { bail!("Database error.") },
148: }
149: }
150:
151: pub async fn enable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
152: where I: Into<i64> {
153: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
154: .bind(source_id)
155: .bind(owner.into())
156: .execute(&mut *self.0).await.stack()?.rows_affected() {
157: 1 => { Ok("Source enabled.") },
158: 0 => { Ok("Source not found.") },
159: _ => { bail!("Database error.") },
160: }
161: }
162:
163: /// Checks whether a post with the given URL exists for the specified source.
164: ///
165: /// # Parameters
166: /// - `post_url`: The URL of the post to check.
167: /// - `id`: The source identifier (converted to `i64`).
168: ///
169: /// # Returns
170: /// `true` if a post with the URL exists for the source, `false` otherwise.
171: pub async fn exists <I> (&mut self, post_url: &str, id: I) -> Result<bool>
172: where I: Into<i64> {
173: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
174: .bind(post_url)
175: .bind(id.into())
176: .fetch_one(&mut *self.0).await.stack()?;
177: row.try_get("exists")
178: .stack_err("Database error: can't check whether post exists.")
179: }
180:
181: pub async fn get_feeds <I>(&mut self, owner: I) -> Result<Vec<Feed>>
182: where I: Into<i64> {
183: let block: Vec<Feed> = sqlx::query_as("select source_id, channel from rsstg_source where owner = $1 order by source_id")
184: .bind(owner.into())
185: .fetch_all(&mut *self.0).await.stack()?;
186: Ok(block)
187: }
188:
189: /// Get all pending events for (now + 1 minute)
190: pub async fn get_queue (&mut self) -> Result<Vec<Queue>> {
191: let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner, last_scrape from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
192: .fetch_all(&mut *self.0).await.stack()?;
193: Ok(block)
194: }
195:
196: pub async fn get_list <I>(&mut self, owner: I) -> Result<Vec<List>>
197: where I: Into<i64> {
198: let source: Vec<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
199: .bind(owner.into())
200: .fetch_all(&mut *self.0).await.stack()?;
201: Ok(source)
202: }
203:
204: pub async fn get_one <I> (&mut self, owner: I, id: i32) -> Result<Option<List>>
205: where I: Into<i64> {
206: let source: Option<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 and source_id = $2")
207: .bind(owner.into())
208: .bind(id)
209: .fetch_optional(&mut *self.0).await.stack()?;
210: Ok(source)
211: }
212:
213: pub async fn get_one_name <I> (&mut self, owner: I, name: &str) -> Result<Option<List>>
214: where I: Into<i64> {
215: let source: Option<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 and channel = $2")
216: .bind(owner.into())
217: .bind(name)
218: .fetch_optional(&mut *self.0).await.stack()?;
219: Ok(source)
220: }
221:
222: pub async fn get_source <I> (&mut self, id: i32, owner: I) -> Result<Source>
223: where I: Into<i64> {
224: let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
225: .bind(id)
226: .bind(owner.into())
227: .fetch_one(&mut *self.0).await.stack()?;
228: Ok(source)
229: }
230:
231: pub async fn set_scrape <I> (&mut self, id: I) -> Result<()>
232: where I: Into<i64> {
233: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
234: .bind(id.into())
235: .execute(&mut *self.0).await.stack()?;
236: Ok(())
237: }
238:
/// Inserts a new source (`update == None`) or updates an existing one
/// (`update == Some(source_id)`), then maps the database outcome to a
/// user-facing status message.
///
/// # Parameters
/// - `update`: existing source id to update, or `None` to insert.
/// - `channel`, `channel_id`, `url`, `iv_hash`, `url_re`, `owner`: column values.
///
/// # Returns
/// A status message; only a non-database sqlx error becomes an `Err`.
pub async fn update <I> (&mut self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: I) -> Result<&str>
where I: Into<i64> {
    // Outer match inspects the execute() result; the inner match picks the
    // statement. Bind order is significant: the update branch binds the id
    // first ($1), so the shared binds below become $2..$7; in the insert
    // branch the same shared binds are $1..$6, matching the column list.
    match match update {
        Some(id) => {
            sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1")
                .bind(id)
        },
        None => {
            sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
        },
    }
    .bind(channel_id)
    .bind(url)
    .bind(iv_hash)
    .bind(owner.into())
    .bind(channel)
    .bind(url_re)
    .execute(&mut *self.0).await
    {
        Ok(_) => Ok(match update {
            Some(_) => "Channel updated.",
            None => "Channel added.",
        }),
        Err(sqlx::Error::Database(err)) => {
            // Inspect the Postgres error routine to classify the failure;
            // "_bt_check_unique" is the btree unique-constraint check.
            match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
                Some("_bt_check_unique", ) => {
                    Ok("Duplicate key.")
                },
                Some(_) => {
                    Ok("Database error.")
                },
                None => {
                    Ok("No database error extracted.")
                },
            }
        },
        Err(err) => {
            // Anything that is not a database-reported error is unexpected.
            bail!("Sorry, unknown error:\n{err:#?}\n");
        },
    }
}
280: }