Lines of
src/sql.rs
from check-in fabcca1eaf
that are changed by the sequence of edits moving toward
check-in 9adc69d514:
1: use std::{
2: borrow::Cow,
3: fmt,
fabcca1eaf 2026-01-09 4: sync::Arc,
5: };
6:
fabcca1eaf 2026-01-09 7: use smol::lock::Mutex;
8: use chrono::{
9: DateTime,
10: FixedOffset,
11: Local,
12: };
13: use sqlx::{
14: Postgres,
15: Row,
16: postgres::PgPoolOptions,
17: pool::PoolConnection,
18: };
19: use stacked_errors::{
20: Result,
21: StackableErr,
22: bail,
23: };
24:
25: #[derive(sqlx::FromRow, Debug)]
26: pub struct List {
27: pub source_id: i32,
28: pub channel: String,
29: pub enabled: bool,
30: pub url: String,
31: pub iv_hash: Option<String>,
32: pub url_re: Option<String>,
33: }
34:
35: impl fmt::Display for List {
36: fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
37: write!(f, "\\#feed\\_{} \\*ļøā£ `{}` {}\nš `{}`", self.source_id, self.channel,
38: match self.enabled {
39: true => "š enabled",
40: false => "ā disabled",
41: }, self.url)?;
42: if let Some(iv_hash) = &self.iv_hash {
43: write!(f, "\nIV: `{iv_hash}`")?;
44: }
45: if let Some(url_re) = &self.url_re {
46: write!(f, "\nRE: `{url_re}`")?;
47: }
48: Ok(())
49: }
50: }
51:
52: #[derive(sqlx::FromRow, Debug)]
53: pub struct Source {
54: pub channel_id: i64,
55: pub url: String,
56: pub iv_hash: Option<String>,
57: pub owner: i64,
58: pub url_re: Option<String>,
59: }
60:
61: #[derive(sqlx::FromRow)]
62: pub struct Queue {
63: pub source_id: Option<i32>,
64: pub next_fetch: Option<DateTime<Local>>,
65: pub owner: Option<i64>,
66: pub last_scrape: DateTime<Local>,
67: }
68:
69: #[derive(Clone)]
70: pub struct Db (
71: Arc<Mutex<sqlx::Pool<sqlx::Postgres>>>,
72: );
73:
74: impl Db {
75: pub fn new (pguri: &str) -> Result<Db> {
76: Ok(Db (
77: Arc::new(Mutex::new(PgPoolOptions::new()
78: .max_connections(5)
79: .acquire_timeout(std::time::Duration::new(300, 0))
80: .idle_timeout(std::time::Duration::new(60, 0))
81: .connect_lazy(pguri)
82: .stack()?)),
83: ))
84: }
85:
86: pub async fn begin(&self) -> Result<Conn> {
87: let pool = self.0.lock_arc().await;
88: let conn = Conn ( pool.acquire().await.stack()? );
89: Ok(conn)
90: }
91: }
92:
93: pub struct Conn (
94: PoolConnection<Postgres>,
95: );
96:
97: impl Conn {
98: pub async fn add_post (&mut self, source_id: i32, date: &DateTime<FixedOffset>, post_url: &str) -> Result<()> {
99: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
100: .bind(source_id)
101: .bind(date)
102: .bind(post_url)
103: .execute(&mut *self.0).await.stack()?;
104: Ok(())
105: }
106:
107: pub async fn clean <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
108: where I: Into<i64> {
109: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
110: .bind(source_id)
111: .bind(owner.into())
112: .execute(&mut *self.0).await.stack()?.rows_affected() {
113: 0 => { Ok("No data found found.".into()) },
114: x => { Ok(format!("{x} posts purged.").into()) },
115: }
116: }
117:
118: pub async fn delete <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
119: where I: Into<i64> {
120: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
121: .bind(source_id)
122: .bind(owner.into())
123: .execute(&mut *self.0).await.stack()?.rows_affected() {
124: 0 => { Ok("No data found found.".into()) },
125: x => { Ok(format!("{x} sources removed.").into()) },
126: }
127: }
128:
129: pub async fn disable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
130: where I: Into<i64> {
131: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
132: .bind(source_id)
133: .bind(owner.into())
134: .execute(&mut *self.0).await.stack()?.rows_affected() {
135: 1 => { Ok("Source disabled.") },
136: 0 => { Ok("Source not found.") },
137: _ => { bail!("Database error.") },
138: }
139: }
140:
141: pub async fn enable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
142: where I: Into<i64> {
143: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
144: .bind(source_id)
145: .bind(owner.into())
146: .execute(&mut *self.0).await.stack()?.rows_affected() {
147: 1 => { Ok("Source enabled.") },
148: 0 => { Ok("Source not found.") },
149: _ => { bail!("Database error.") },
150: }
151: }
152:
153: /// Checks whether a post with the given URL exists for the specified source.
154: ///
155: /// # Parameters
156: /// - `post_url`: The URL of the post to check.
157: /// - `id`: The source identifier (converted to `i64`).
158: ///
159: /// # Returns
160: /// `true` if a post with the URL exists for the source, `false` otherwise.
161: pub async fn exists <I> (&mut self, post_url: &str, id: I) -> Result<bool>
162: where I: Into<i64> {
163: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
164: .bind(post_url)
165: .bind(id.into())
166: .fetch_one(&mut *self.0).await.stack()?;
167: row.try_get("exists")
168: .stack_err("Database error: can't check whether post exists.")
169: }
170:
171: /// Get all pending events for (now + 1 minute)
172: pub async fn get_queue (&mut self) -> Result<Vec<Queue>> {
173: let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner, last_scrape from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
174: .fetch_all(&mut *self.0).await.stack()?;
175: Ok(block)
176: }
177:
fabcca1eaf 2026-01-09 178: pub async fn get_list <I> (&mut self, owner: I) -> Result<Vec<List>>
179: where I: Into<i64> {
180: let source: Vec<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
181: .bind(owner.into())
182: .fetch_all(&mut *self.0).await.stack()?;
183: Ok(source)
184: }
185:
186: pub async fn get_one <I> (&mut self, owner: I, id: i32) -> Result<Option<List>>
187: where I: Into<i64> {
188: let source: Option<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 and source_id = $2")
189: .bind(owner.into())
190: .bind(id)
191: .fetch_optional(&mut *self.0).await.stack()?;
192: Ok(source)
193: }
194:
195: pub async fn get_source <I> (&mut self, id: i32, owner: I) -> Result<Source>
196: where I: Into<i64> {
197: let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
198: .bind(id)
199: .bind(owner.into())
200: .fetch_one(&mut *self.0).await.stack()?;
201: Ok(source)
202: }
203:
204: pub async fn set_scrape <I> (&mut self, id: I) -> Result<()>
205: where I: Into<i64> {
206: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
207: .bind(id.into())
208: .execute(&mut *self.0).await.stack()?;
209: Ok(())
210: }
211:
212: pub async fn update <I> (&mut self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: I) -> Result<&str>
213: where I: Into<i64> {
214: match match update {
215: Some(id) => {
216: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1")
217: .bind(id)
218: },
219: None => {
220: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
221: },
222: }
223: .bind(channel_id)
224: .bind(url)
225: .bind(iv_hash)
226: .bind(owner.into())
227: .bind(channel)
228: .bind(url_re)
229: .execute(&mut *self.0).await
fabcca1eaf 2026-01-09 230: {
231: Ok(_) => Ok(match update {
232: Some(_) => "Channel updated.",
233: None => "Channel added.",
234: }),
235: Err(sqlx::Error::Database(err)) => {
236: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
237: Some("_bt_check_unique", ) => {
238: Ok("Duplicate key.")
239: },
240: Some(_) => {
241: Ok("Database error.")
242: },
243: None => {
244: Ok("No database error extracted.")
245: },
246: }
247: },
248: Err(err) => {
249: bail!("Sorry, unknown error:\n{err:#?}\n");
250: },
251: }
252: }
253: }