Lines of
src/sql.rs
from check-in bb89b6fab8
that are changed by the sequence of edits moving toward
check-in fae13a0e55:
bb89b6fab8 2025-06-15 1: use std::{
bb89b6fab8 2025-06-15 2: borrow::Cow,
bb89b6fab8 2025-06-15 3: sync::{
bb89b6fab8 2025-06-15 4: Arc,
bb89b6fab8 2025-06-15 5: Mutex,
bb89b6fab8 2025-06-15 6: },
bb89b6fab8 2025-06-15 7: };
8:
9: use anyhow::{
10: Result,
11: bail,
12: };
13: use chrono::{
14: DateTime,
15: FixedOffset,
16: Local,
17: };
18: use sqlx::{
19: Postgres,
20: Row,
21: postgres::PgPoolOptions,
22: pool::PoolConnection,
23: };
24:
25: #[derive(sqlx::FromRow, Debug)]
26: pub struct List {
27: pub source_id: i32,
28: pub channel: String,
29: pub enabled: bool,
30: pub url: String,
31: pub iv_hash: Option<String>,
32: pub url_re: Option<String>,
33: }
34:
35: #[derive(sqlx::FromRow, Debug)]
36: pub struct Source {
37: pub channel_id: i64,
38: pub url: String,
39: pub iv_hash: Option<String>,
40: pub owner: i64,
41: pub url_re: Option<String>,
42: }
43:
44: #[derive(sqlx::FromRow)]
45: pub struct Queue {
46: pub source_id: Option<i32>,
47: pub next_fetch: Option<DateTime<Local>>,
48: pub owner: Option<i64>,
49: }
50:
51: #[derive(Clone)]
52: pub struct Db {
bb89b6fab8 2025-06-15 53: pool: Arc<Mutex<Arc<sqlx::Pool<sqlx::Postgres>>>>,
54: }
55:
56: pub struct Conn{
57: conn: PoolConnection<Postgres>,
58: }
59:
60: impl Db {
61: pub fn new (pguri: &str) -> Result<Db> {
62: Ok(Db{
bb89b6fab8 2025-06-15 63: pool: Arc::new(Mutex::new(Arc::new(PgPoolOptions::new()
64: .max_connections(5)
65: .acquire_timeout(std::time::Duration::new(300, 0))
66: .idle_timeout(std::time::Duration::new(60, 0))
bb89b6fab8 2025-06-15 67: .connect_lazy(pguri)?))),
68: })
69: }
70:
71: pub async fn begin(&self) -> Result<Conn> {
bb89b6fab8 2025-06-15 72: let pool = self.pool.lock().unwrap().clone();
73: let conn = Conn::new(pool.acquire().await?).await?;
74: Ok(conn)
75: }
76: }
77:
78: impl Conn {
79: pub async fn new (conn: PoolConnection<Postgres>) -> Result<Conn> {
80: Ok(Conn{
81: conn,
82: })
83: }
84:
85: pub async fn add_post (&mut self, source_id: i32, date: &DateTime<FixedOffset>, post_url: &str) -> Result<()> {
86: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
87: .bind(source_id)
88: .bind(date)
89: .bind(post_url)
90: .execute(&mut *self.conn).await?;
91: Ok(())
92: }
93:
94: pub async fn clean <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
95: where I: Into<i64> {
96: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
97: .bind(source_id)
98: .bind(owner.into())
99: .execute(&mut *self.conn).await?.rows_affected() {
100: 0 => { Ok("No data found found.".into()) },
101: x => { Ok(format!("{x} posts purged.").into()) },
102: }
103: }
104:
105: pub async fn delete <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
106: where I: Into<i64> {
107: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
108: .bind(source_id)
109: .bind(owner.into())
110: .execute(&mut *self.conn).await?.rows_affected() {
111: 0 => { Ok("No data found found.".into()) },
112: x => { Ok(format!("{} sources removed.", x).into()) },
113: }
114: }
115:
116: pub async fn disable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
117: where I: Into<i64> {
118: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
119: .bind(source_id)
120: .bind(owner.into())
121: .execute(&mut *self.conn).await?.rows_affected() {
122: 1 => { Ok("Source disabled.") },
123: 0 => { Ok("Source not found.") },
124: _ => { bail!("Database error.") },
125: }
126: }
127:
128: pub async fn enable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
129: where I: Into<i64> {
130: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
131: .bind(source_id)
132: .bind(owner.into())
133: .execute(&mut *self.conn).await?.rows_affected() {
134: 1 => { Ok("Source enabled.") },
135: 0 => { Ok("Source not found.") },
136: _ => { bail!("Database error.") },
137: }
138: }
139:
140: pub async fn exists <I> (&mut self, post_url: &str, id: I) -> Result<Option<bool>>
141: where I: Into<i64> {
142: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
143: .bind(post_url)
144: .bind(id.into())
145: .fetch_one(&mut *self.conn).await?;
146: let exists: Option<bool> = row.try_get("exists")?;
147: Ok(exists)
148: }
149:
150: pub async fn get_queue (&mut self) -> Result<Vec<Queue>> {
151: let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
152: .fetch_all(&mut *self.conn).await?;
153: Ok(block)
154: }
155:
156: pub async fn get_list <I> (&mut self, owner: I) -> Result<Vec<List>>
157: where I: Into<i64> {
158: let source: Vec<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
159: .bind(owner.into())
160: .fetch_all(&mut *self.conn).await?;
161: Ok(source)
162: }
163:
164: pub async fn get_source <I> (&mut self, id: i32, owner: I) -> Result<Source>
165: where I: Into<i64> {
166: let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
167: .bind(id)
168: .bind(owner.into())
169: .fetch_one(&mut *self.conn).await?;
170: Ok(source)
171: }
172:
173: pub async fn set_scrape <I> (&mut self, id: I) -> Result<()>
174: where I: Into<i64> {
175: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
176: .bind(id.into())
177: .execute(&mut *self.conn).await?;
178: Ok(())
179: }
180:
181: pub async fn update <I> (&mut self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: I) -> Result<&str>
182: where I: Into<i64> {
183: match match update {
184: Some(id) => {
185: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1")
186: .bind(id)
187: },
188: None => {
189: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
190: },
191: }
192: .bind(channel_id)
193: .bind(url)
194: .bind(iv_hash)
195: .bind(owner.into())
196: .bind(channel)
197: .bind(url_re)
198: .execute(&mut *self.conn).await
199: {
200: Ok(_) => Ok(match update {
201: Some(_) => "Channel updated.",
202: None => "Channel added.",
203: }),
204: Err(sqlx::Error::Database(err)) => {
205: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
206: Some("_bt_check_unique", ) => {
207: Ok("Duplicate key.")
208: },
209: Some(_) => {
210: Ok("Database error.")
211: },
212: None => {
213: Ok("No database error extracted.")
214: },
215: }
216: },
217: Err(err) => {
218: bail!("Sorry, unknown error:\n{err:#?}\n");
219: },
220: }
221: }
222: }