Lines of
src/sql.rs
from check-in fae13a0e55
that are changed by the sequence of edits moving toward
check-in b4af85e31b:
fae13a0e55 2025-06-28 1: use std::borrow::Cow;
2:
3: use anyhow::{
4: Result,
5: bail,
6: };
7: use async_std::sync::{
8: Arc,
9: Mutex,
10: };
11: use chrono::{
12: DateTime,
13: FixedOffset,
14: Local,
15: };
16: use sqlx::{
17: Postgres,
18: Row,
19: postgres::PgPoolOptions,
20: pool::PoolConnection,
21: };
22:
23: #[derive(sqlx::FromRow, Debug)]
24: pub struct List {
25: pub source_id: i32,
26: pub channel: String,
27: pub enabled: bool,
28: pub url: String,
29: pub iv_hash: Option<String>,
30: pub url_re: Option<String>,
31: }
32:
33: #[derive(sqlx::FromRow, Debug)]
34: pub struct Source {
35: pub channel_id: i64,
36: pub url: String,
37: pub iv_hash: Option<String>,
38: pub owner: i64,
39: pub url_re: Option<String>,
40: }
41:
42: #[derive(sqlx::FromRow)]
43: pub struct Queue {
44: pub source_id: Option<i32>,
45: pub next_fetch: Option<DateTime<Local>>,
46: pub owner: Option<i64>,
47: }
48:
49: #[derive(Clone)]
50: pub struct Db {
51: pool: Arc<Mutex<sqlx::Pool<sqlx::Postgres>>>,
52: }
53:
54: pub struct Conn{
55: conn: PoolConnection<Postgres>,
56: }
57:
58: impl Db {
59: pub fn new (pguri: &str) -> Result<Db> {
60: Ok(Db{
61: pool: Arc::new(Mutex::new(PgPoolOptions::new()
62: .max_connections(5)
63: .acquire_timeout(std::time::Duration::new(300, 0))
64: .idle_timeout(std::time::Duration::new(60, 0))
65: .connect_lazy(pguri)?)),
66: })
67: }
68:
69: pub async fn begin(&self) -> Result<Conn> {
70: let pool = self.pool.lock_arc().await;
71: let conn = Conn::new(pool.acquire().await?).await?;
72: Ok(conn)
73: }
74: }
75:
76: impl Conn {
77: pub async fn new (conn: PoolConnection<Postgres>) -> Result<Conn> {
78: Ok(Conn{
79: conn,
80: })
81: }
82:
83: pub async fn add_post (&mut self, source_id: i32, date: &DateTime<FixedOffset>, post_url: &str) -> Result<()> {
84: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
85: .bind(source_id)
86: .bind(date)
87: .bind(post_url)
88: .execute(&mut *self.conn).await?;
89: Ok(())
90: }
91:
92: pub async fn clean <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
93: where I: Into<i64> {
94: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
95: .bind(source_id)
96: .bind(owner.into())
97: .execute(&mut *self.conn).await?.rows_affected() {
98: 0 => { Ok("No data found found.".into()) },
99: x => { Ok(format!("{x} posts purged.").into()) },
100: }
101: }
102:
103: pub async fn delete <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
104: where I: Into<i64> {
105: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
106: .bind(source_id)
107: .bind(owner.into())
108: .execute(&mut *self.conn).await?.rows_affected() {
109: 0 => { Ok("No data found found.".into()) },
110: x => { Ok(format!("{} sources removed.", x).into()) },
111: }
112: }
113:
114: pub async fn disable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
115: where I: Into<i64> {
116: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
117: .bind(source_id)
118: .bind(owner.into())
119: .execute(&mut *self.conn).await?.rows_affected() {
120: 1 => { Ok("Source disabled.") },
121: 0 => { Ok("Source not found.") },
122: _ => { bail!("Database error.") },
123: }
124: }
125:
126: pub async fn enable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
127: where I: Into<i64> {
128: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
129: .bind(source_id)
130: .bind(owner.into())
131: .execute(&mut *self.conn).await?.rows_affected() {
132: 1 => { Ok("Source enabled.") },
133: 0 => { Ok("Source not found.") },
134: _ => { bail!("Database error.") },
135: }
136: }
137:
138: pub async fn exists <I> (&mut self, post_url: &str, id: I) -> Result<Option<bool>>
139: where I: Into<i64> {
140: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
141: .bind(post_url)
142: .bind(id.into())
143: .fetch_one(&mut *self.conn).await?;
144: let exists: Option<bool> = row.try_get("exists")?;
145: Ok(exists)
146: }
147:
148: pub async fn get_queue (&mut self) -> Result<Vec<Queue>> {
149: let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
150: .fetch_all(&mut *self.conn).await?;
151: Ok(block)
152: }
153:
154: pub async fn get_list <I> (&mut self, owner: I) -> Result<Vec<List>>
155: where I: Into<i64> {
156: let source: Vec<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
157: .bind(owner.into())
158: .fetch_all(&mut *self.conn).await?;
159: Ok(source)
160: }
161:
162: pub async fn get_source <I> (&mut self, id: i32, owner: I) -> Result<Source>
163: where I: Into<i64> {
164: let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
165: .bind(id)
166: .bind(owner.into())
167: .fetch_one(&mut *self.conn).await?;
168: Ok(source)
169: }
170:
171: pub async fn set_scrape <I> (&mut self, id: I) -> Result<()>
172: where I: Into<i64> {
173: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
174: .bind(id.into())
175: .execute(&mut *self.conn).await?;
176: Ok(())
177: }
178:
179: pub async fn update <I> (&mut self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: I) -> Result<&str>
180: where I: Into<i64> {
181: match match update {
182: Some(id) => {
183: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1")
184: .bind(id)
185: },
186: None => {
187: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
188: },
189: }
190: .bind(channel_id)
191: .bind(url)
192: .bind(iv_hash)
193: .bind(owner.into())
194: .bind(channel)
195: .bind(url_re)
196: .execute(&mut *self.conn).await
197: {
198: Ok(_) => Ok(match update {
199: Some(_) => "Channel updated.",
200: None => "Channel added.",
201: }),
202: Err(sqlx::Error::Database(err)) => {
203: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
204: Some("_bt_check_unique", ) => {
205: Ok("Duplicate key.")
206: },
207: Some(_) => {
208: Ok("Database error.")
209: },
210: None => {
211: Ok("No database error extracted.")
212: },
213: }
214: },
215: Err(err) => {
216: bail!("Sorry, unknown error:\n{err:#?}\n");
217: },
218: }
219: }
220: }