Lines of
src/sql.rs
from check-in 0340541002
that are changed by the sequence of edits moving toward
check-in e3f7eeb26a:
1: use std::borrow::Cow;
2:
3: use anyhow::{
4: Result,
5: bail,
6: };
7: use chrono::{
8: DateTime,
9: FixedOffset,
10: Local,
11: };
12: use sqlx::{
13: Pool,
14: Postgres,
15: Row,
16: postgres::PgPoolOptions,
17: pool::PoolConnection,
18: };
19:
20: #[derive(sqlx::FromRow, Debug)]
21: pub struct List {
0340541002 2025-04-24 22: pub source_id: i64,
23: pub channel: String,
24: pub enabled: bool,
25: pub url: String,
26: pub iv_hash: Option<String>,
27: pub url_re: Option<String>,
28: }
29:
30: #[derive(sqlx::FromRow, Debug)]
31: pub struct Source {
32: pub channel_id: i64,
33: pub url: String,
34: pub iv_hash: Option<String>,
35: pub owner: i64,
36: pub url_re: Option<String>,
37: }
38:
39: #[derive(sqlx::FromRow)]
40: pub struct Queue {
41: pub source_id: Option<i32>,
42: pub next_fetch: Option<DateTime<Local>>,
43: pub owner: Option<i64>,
44: }
45:
46: #[derive(Clone)]
47: pub struct Db {
48: pool: sqlx::Pool<sqlx::Postgres>,
49: }
50:
51: pub struct Conn{
52: conn: PoolConnection<Postgres>,
53: }
54:
55: impl Db {
56: pub fn new (pguri: &str) -> Result<Db> {
57: Ok(Db{
58: pool: PgPoolOptions::new()
59: .max_connections(5)
60: .acquire_timeout(std::time::Duration::new(300, 0))
61: .idle_timeout(std::time::Duration::new(60, 0))
62: .connect_lazy(pguri)?,
63: })
64: }
65:
66: pub async fn begin(&mut self) -> Result<Conn> {
67: Conn::new(&mut self.pool).await
68: }
69: }
70:
71: impl Conn {
72: pub async fn new (pool: &mut Pool<Postgres>) -> Result<Conn> {
73: let conn = pool.acquire().await?;
74: Ok(Conn{
75: conn,
76: })
77: }
78:
79: pub async fn add_post (&mut self, id: i32, date: &DateTime<FixedOffset>, post_url: &str) -> Result<()> {
80: sqlx::query("insert into rsstg_post (source_id, posted, url) values ($1, $2, $3);")
81: .bind(id)
82: .bind(date)
83: .bind(post_url)
84: .execute(&mut *self.conn).await?;
85: Ok(())
86: }
87:
88: pub async fn clean (&mut self, source_id: i32, owner: i64) -> Result<Cow<'_, str>> {
89: match sqlx::query("delete from rsstg_post p using rsstg_source s where p.source_id = $1 and owner = $2 and p.source_id = s.source_id;")
90: .bind(source_id)
91: .bind(owner)
92: .execute(&mut *self.conn).await?.rows_affected() {
93: 0 => { Ok("No data found found.".into()) },
94: x => { Ok(format!("{x} posts purged.").into()) },
95: }
96: }
97:
98: pub async fn delete (&mut self, source_id: i32, owner: i64) -> Result<Cow<'_, str>> {
99: match sqlx::query("delete from rsstg_source where source_id = $1 and owner = $2;")
100: .bind(source_id)
101: .bind(owner)
102: .execute(&mut *self.conn).await?.rows_affected() {
103: 0 => { Ok("No data found found.".into()) },
104: x => { Ok(format!("{} sources removed.", x).into()) },
105: }
106: }
107:
108: pub async fn disable (&mut self, source_id: i32, owner: i64) -> Result<&str> {
109: match sqlx::query("update rsstg_source set enabled = false where source_id = $1 and owner = $2")
110: .bind(source_id)
111: .bind(owner)
112: .execute(&mut *self.conn).await?.rows_affected() {
113: 1 => { Ok("Source disabled.") },
114: 0 => { Ok("Source not found.") },
115: _ => { bail!("Database error.") },
116: }
117: }
118:
119: pub async fn enable (&mut self, source_id: i32, owner: i64) -> Result<&str> {
120: match sqlx::query("update rsstg_source set enabled = true where source_id = $1 and owner = $2")
121: .bind(source_id)
122: .bind(owner)
123: .execute(&mut *self.conn).await?.rows_affected() {
124: 1 => { Ok("Source enabled.") },
125: 0 => { Ok("Source not found.") },
126: _ => { bail!("Database error.") },
127: }
128: }
129:
130: pub async fn exists (&mut self, post_url: &str, id: i32) -> Result<Option<bool>> {
131: let row = sqlx::query("select exists(select true from rsstg_post where url = $1 and source_id = $2) as exists;")
132: .bind(post_url)
133: .bind(id)
134: .fetch_one(&mut *self.conn).await?;
135: let exists: Option<bool> = row.try_get("exists")?;
136: Ok(exists)
137: }
138:
139: pub async fn get_queue (&mut self) -> Result<Vec<Queue>> {
140: let block: Vec<Queue> = sqlx::query_as("select source_id, next_fetch, owner from rsstg_order natural left join rsstg_source where next_fetch < now() + interval '1 minute';")
141: .fetch_all(&mut *self.conn).await?;
142: Ok(block)
143: }
144:
145: pub async fn get_list (&mut self, owner: i64) -> Result<Vec<List>> {
146: let source: Vec<List> = sqlx::query_as("select source_id, channel, enabled, url, iv_hash, url_re from rsstg_source where owner = $1 order by source_id")
147: .bind(owner)
148: .fetch_all(&mut *self.conn).await?;
149: Ok(source)
150: }
151:
152: pub async fn get_source (&mut self, id: i32, owner: i64) -> Result<Source> {
153: let source: Source = sqlx::query_as("select channel_id, url, iv_hash, owner, url_re from rsstg_source where source_id = $1 and owner = $2")
154: .bind(id)
155: .bind(owner)
156: .fetch_one(&mut *self.conn).await?;
157: Ok(source)
158: }
159:
160: pub async fn set_scrape (&mut self, id: i32) -> Result<()> {
161: sqlx::query("update rsstg_source set last_scrape = now() where source_id = $1;")
162: .bind(id)
163: .execute(&mut *self.conn).await?;
164: Ok(())
165: }
166:
167: pub async fn update (&mut self, update: Option<i32>, channel: &str, channel_id: i64, url: &str, iv_hash: Option<&str>, url_re: Option<&str>, owner: i64) -> Result<&str> {
168: match match update {
169: Some(id) => {
170: sqlx::query("update rsstg_source set channel_id = $2, url = $3, iv_hash = $4, owner = $5, channel = $6, url_re = $7 where source_id = $1")
171: .bind(id)
172: },
173: None => {
174: sqlx::query("insert into rsstg_source (channel_id, url, iv_hash, owner, channel, url_re) values ($1, $2, $3, $4, $5, $6)")
175: },
176: }
177: .bind(channel_id)
178: .bind(url)
179: .bind(iv_hash)
180: .bind(owner)
181: .bind(channel)
182: .bind(url_re)
183: .execute(&mut *self.conn).await
184: {
185: Ok(_) => Ok(match update {
186: Some(_) => "Channel updated.",
187: None => "Channel added.",
188: }),
189: Err(sqlx::Error::Database(err)) => {
190: match err.downcast::<sqlx::postgres::PgDatabaseError>().routine() {
191: Some("_bt_check_unique", ) => {
192: Ok("Duplicate key.")
193: },
194: Some(_) => {
195: Ok("Database error.")
196: },
197: None => {
198: Ok("No database error extracted.")
199: },
200: }
201: },
202: Err(err) => {
203: bail!("Sorry, unknown error:\n{err:#?}\n");
204: },
205: }
206: }
207: }