Lines of
src/sql.rs
from check-in 42b29b744b
that are changed by the sequence of edits moving toward
check-in dc7c43b010:
1: use std::{
2: borrow::Cow,
3: fmt,
4: };
5:
42b29b744b 2026-01-01 6: use async_std::sync::{
42b29b744b 2026-01-01 7: Arc,
42b29b744b 2026-01-01 8: Mutex,
42b29b744b 2026-01-01 9: };
10: use chrono::{
11: DateTime,
12: FixedOffset,
13: Local,
14: };
15: use sqlx::{
16: Postgres,
17: Row,
18: postgres::PgPoolOptions,
19: pool::PoolConnection,
20: };
21: use stacked_errors::{
22: Result,
23: StackableErr,
24: bail,
25: };
26:
27: #[derive(sqlx::FromRow, Debug)]
28: pub struct List {
29: pub source_id: i32,
30: pub channel: String,
31: pub enabled: bool,
32: pub url: String,
33: pub iv_hash: Option<String>,
34: pub url_re: Option<String>,
35: }
36:
37: impl fmt::Display for List {
38: fn fmt(&self, f: &mut fmt::Formatter<'_>) -> std::result::Result<(), fmt::Error> {
39: write!(f, "\\#feed\\_{} \\*ļøā£ `{}` {}\nš `{}`", self.source_id, self.channel,
40: match self.enabled {
41: true => "š enabled",
42: false => "ā disabled",
43: }, self.url)?;
44: if let Some(iv_hash) = &self.iv_hash {
45: write!(f, "\nIV: `{iv_hash}`")?;
46: }
47: if let Some(url_re) = &self.url_re {
48: write!(f, "\nRE: `{url_re}`")?;
49: }
50: Ok(())
51: }
52: }
53:
54: #[derive(sqlx::FromRow, Debug)]
55: pub struct Source {
56: pub channel_id: i64,
57: pub url: String,
58: pub iv_hash: Option<String>,
59: pub owner: i64,
60: pub url_re: Option<String>,
61: }
62:
63: #[derive(sqlx::FromRow)]
64: pub struct Queue {
65: pub source_id: Option<i32>,
66: pub next_fetch: Option<DateTime<Local>>,
67: pub owner: Option<i64>,
68: pub last_scrape: DateTime<Local>,
69: }
70:
71: #[derive(Clone)]
72: pub struct Db (
73: Arc<Mutex<sqlx::Pool<sqlx::Postgres>>>,
74: );
75:
76: impl Db {
77: pub fn new (pguri: &str) -> Result<Db> {
78: Ok(Db (
79: Arc::new(Mutex::new(PgPoolOptions::new()
80: .max_connections(5)
81: .acquire_timeout(std::time::Duration::new(300, 0))
82: .idle_timeout(std::time::Duration::new(60, 0))
83: .connect_lazy(pguri)
84: .stack()?)),
85: ))
86: }
87:
88: pub async fn begin(&self) -> Result<Conn> {
89: let pool = self.0.lock_arc().await;
90: let conn = Conn ( pool.acquire().await.stack()? );
91: Ok(conn)
92: }
93: }
94:
95: pub struct Conn (
96: PoolConnection<Postgres>,
97: );
98:
99: impl Conn {
100: pub async fn add_post (&mut self, source_id: i32, date: &DateTime<FixedOffset>, post_url: &str) -> Result<()> {
101: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
102: .bind(source_id)
103: .bind(date)
104: .bind(post_url)
105: .execute(&mut *self.0).await.stack()?;
106: Ok(())
107: }
108:
109: pub async fn clean <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
110: where I: Into<i64> {
111: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
112: .bind(source_id)
113: .bind(owner.into())
114: .execute(&mut *self.0).await.stack()?.rows_affected() {
115: 0 => { Ok("No data found found.".into()) },
116: x => { Ok(format!("{x} posts purged.").into()) },
117: }
118: }
119:
120: pub async fn delete <I> (&mut self, source_id: i32, owner: I) -> Result<Cow<'_, str>>
121: where I: Into<i64> {
122: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
123: .bind(source_id)
124: .bind(owner.into())
125: .execute(&mut *self.0).await.stack()?.rows_affected() {
126: 0 => { Ok("No data found found.".into()) },
127: x => { Ok(format!("{x} sources removed.").into()) },
128: }
129: }
130:
131: pub async fn disable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
132: where I: Into<i64> {
133: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
134: .bind(source_id)
135: .bind(owner.into())
136: .execute(&mut *self.0).await.stack()?.rows_affected() {
137: 1 => { Ok("Source disabled.") },
138: 0 => { Ok("Source not found.") },
139: _ => { bail!("Database error.") },
140: }
141: }
142:
143: pub async fn enable <I> (&mut self, source_id: i32, owner: I) -> Result<&str>
144: where I: Into<i64> {
145: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
146: .bind(source_id)
147: .bind(owner.into())
148: .execute(&mut *self.0).await.stack()?.rows_affected() {
149: 1 => { Ok("Source enabled.") },
150: 0 => { Ok("Source not found.") },
151: _ => { bail!("Database error.") },
152: }
153: }
154:
155: pub async fn exists <I> (&mut self, post_url: &str, id: I) -> Result<Option<bool>>
156: where I: Into<i64> {
157: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
158: .bind(post_url)
159: .bind(id.into())
160: .fetch_one(&mut *self.0).await.stack()?;
161: let exists: Option<bool> = row.try_get("exists").stack()?;
162: Ok(exists)
163: }
164:
165: pub async fn get_queue (&mut self) -> Result<Vec<Queue>> {
166: let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner, last_scrape from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
167: .fetch_all(&mut *self.0).await.stack()?;
168: Ok(block)
169: }
170:
171: pub async fn get_list <I> (&mut self, owner: I) -> Result<Vec<List>>
172: where I: Into<i64> {
173: let source: Vec<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
174: .bind(owner.into())
175: .fetch_all(&mut *self.0).await.stack()?;
176: Ok(source)
177: }
178:
179: pub async fn get_one <I> (&mut self, owner: I, id: i32) -> Result<Option<List>>
180: where I: Into<i64> {
181: let source: Option<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 and source_id = $2")
182: .bind(owner.into())
183: .bind(id)
184: .fetch_optional(&mut *self.0).await.stack()?;
185: Ok(source)
186: }
187:
188: pub async fn get_source <I> (&mut self, id: i32, owner: I) -> Result<Source>
189: where I: Into<i64> {
190: let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
191: .bind(id)
192: .bind(owner.into())
193: .fetch_one(&mut *self.0).await.stack()?;
194: Ok(source)
195: }
196:
197: pub async fn set_scrape <I> (&mut self, id: I) -> Result<()>
198: where I: Into<i64> {
199: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
200: .bind(id.into())
201: .execute(&mut *self.0).await.stack()?;
202: Ok(())
203: }
204:
205: pub async fn update <I> (&mut self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: I) -> Result<&str>
206: where I: Into<i64> {
207: match match update {
208: Some(id) => {
209: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1")
210: .bind(id)
211: },
212: None => {
213: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
214: },
215: }
216: .bind(channel_id)
217: .bind(url)
218: .bind(iv_hash)
219: .bind(owner.into())
220: .bind(channel)
221: .bind(url_re)
222: .execute(&mut *self.0).await
223: {
224: Ok(_) => Ok(match update {
225: Some(_) => "Channel updated.",
226: None => "Channel added.",
227: }),
228: Err(sqlx::Error::Database(err)) => {
229: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
230: Some("_bt_check_unique", ) => {
231: Ok("Duplicate key.")
232: },
233: Some(_) => {
234: Ok("Database error.")
235: },
236: None => {
237: Ok("No database error extracted.")
238: },
239: }
240: },
241: Err(err) => {
242: bail!("Sorry, unknown error:\n{err:#?}\n");
243: },
244: }
245: }
246: }