lua-counter example

Annotation For offload/src/main.rs
anonymous

Annotation For offload/src/main.rs

Origin for each line in offload/src/main.rs from check-in f721596292:

b33171b487 2018-06-01    1: extern crate config;
b33171b487 2018-06-01    2: extern crate postgres;
b33171b487 2018-06-01    3: extern crate redis;
b33171b487 2018-06-01    4: extern crate regex;
b33171b487 2018-06-01    5: 
b33171b487 2018-06-01    6: use postgres::{Connection, TlsMode};
b33171b487 2018-06-01    7: use redis::Commands;
b33171b487 2018-06-01    8: use regex::Regex;
b33171b487 2018-06-01    9: use std::collections::HashMap;
b33171b487 2018-06-01   10: use std::time::Duration;
b33171b487 2018-06-01   11: use std::thread;
b33171b487 2018-06-01   12: 
b33171b487 2018-06-01   13: fn main() {
                                // Moves per-schema counter hashes from Redis into Postgres in an endless loop:
                                // each pass re-reads the "offload" config, reconnects to both stores, feeds every
                                // pending hash entry to `<schema>.merge_counter(...)`, then sleeps for the
                                // configured "delay". Every failure goes through `expect`, i.e. it panics.
b33171b487 2018-06-01   14:     loop {
                                    // The configuration is loaded again on every pass.
b33171b487 2018-06-01   15:         let mut settings = config::Config::default();
b33171b487 2018-06-01   16:         settings.merge(config::File::with_name("offload")).expect("Can't read configuration file");
b33171b487 2018-06-01   17: 
                                    // A new Redis client and connection are opened on every pass; the address is hard-coded.
b33171b487 2018-06-01   18:         let reddb = redis::Client::open("redis://127.0.0.1/").expect("Can't connect to the database");
b33171b487 2018-06-01   19:         let red = reddb.get_connection().expect("Can't initialize new connection");
b33171b487 2018-06-01   20: 
                                    // Hash field names must consist of three captured parts separated by '|' or '_'.
                                    // The parts are later bound as $1, $2 and $3 (the third is cast to inet in the SQL).
                                    // NOTE(review): the pattern is constant, yet it is compiled again on every pass.
                                    // NOTE(review): the second class contains '_' (also a separator) and a `0-9-_` run
                                    // that is easy to misread — confirm it matches exactly the intended characters.
f721596292 2018-10-28   21:         let re = Regex::new("^([0-9A-Z]+)[|_]([a-zA-Z./0-9-_#]+)[|_]([0-9.]+)$").expect("Can't parse regexp");
                                    // Postgres connection string comes from the "pg" config key; TLS is disabled (TlsMode::None).
b33171b487 2018-06-01   22:         let conn = Connection::connect(settings.get_str("pg").expect("Postgres connection absent in config"), TlsMode::None).expect("Can't connect to postgres");
b33171b487 2018-06-01   23: 
                                    // Iterator over the "schemas" config array; each entry must convert to a string.
b33171b487 2018-06-01   24:         let schemas = settings.get_array("schemas").expect("Schema list not found in config").into_iter().map(|value| config::Value::into_str(value).expect("We require string here"));
b33171b487 2018-06-01   25:         for schema in schemas {
                                        // `data_key` is the hash this program picks up; `cache_key` is the name it is
                                        // renamed to before being read and finally deleted.
b33171b487 2018-06-01   26:             let data_key = schema.to_owned() + "_counter_pending";
b33171b487 2018-06-01   27:             let cache_key = schema.to_owned() + "_counter_pending_now";
b33171b487 2018-06-01   28: 
                                        // Decide what to flush for this schema:
                                        //  * `cache_key` already exists -> use it as is (presumably left over from a pass
                                        //    that did not finish — TODO confirm);
                                        //  * else `data_key` exists -> rename it to `cache_key` (only if the target is
                                        //    absent); the boolean result of the rename is discarded;
                                        //  * neither exists -> nothing to do, go to the next schema.
b33171b487 2018-06-01   29:             if red.exists(&cache_key).expect("Can't query cache existance") {
b33171b487 2018-06-01   30:             } else if red.exists(&data_key).expect("Can't query data existance") {
b33171b487 2018-06-01   31:                 let _ : bool = red.rename_nx(&data_key, &cache_key).expect("Can't readd unflushed hash");
b33171b487 2018-06-01   32:             } else {
b33171b487 2018-06-01   33:                 continue;
b33171b487 2018-06-01   34:             }
b33171b487 2018-06-01   35: 
                                        // Read the whole hash: field name -> counter, with counters converted to i16
                                        // (the same width as the `$4::smallint` parameter below).
                                        // NOTE(review): presumably a counter outside the i16 range makes this conversion
                                        // fail and the `expect` panic — confirm against the redis crate.
b33171b487 2018-06-01   36:             let stats : HashMap<String, i16> = red.hgetall(&cache_key).expect("Can't query for stored stat keys");
b33171b487 2018-06-01   37: 
b33171b487 2018-06-01   38:             //println!("# {:?}", stats);
b33171b487 2018-06-01   39: 
                                        // All entries of one schema are written inside a single transaction.
                                        // The schema name is spliced into the SQL text with `format!`; the four values
                                        // are passed as bound parameters.
                                        // NOTE(review): the schema name comes from the config file and is not quoted or
                                        // validated here — acceptable only if the config is trusted.
b33171b487 2018-06-01   40:             let trans = conn.transaction().expect("Can't start transaction");
b33171b487 2018-06-01   41:             let stmt = trans.prepare(&format!("select {}.merge_counter($1::text, $2::text, ($3::text)::inet, $4::smallint);", &schema)).expect("Can't prepare statement");
b33171b487 2018-06-01   42:             for (client, count) in stats {
b33171b487 2018-06-01   43:                 //println!("# {:?}: {:?}", &client, &count);
                                            // A field name that does not match the pattern panics here, before the commit
                                            // and before `cache_key` is deleted.
                                            // NOTE(review): the `format!` argument is evaluated on every iteration, even
                                            // when the match succeeds.
f721596292 2018-10-28   44:                 let cap = re.captures(&client).expect(&format!("Client match failed: {}", &client));
b33171b487 2018-06-01   45:                 //let addr = IpAddr::V4(cap[3].parse().expect("Can't parse IP"));
b33171b487 2018-06-01   46:                 //println!("insert into x values({:?}, {:?}, {:?}, {:?});", &cap[1], &cap[2], &cap[3], &count);
                                            // The three captured groups are bound as owned Strings, followed by the counter.
b33171b487 2018-06-01   47:                 stmt.execute(&[&cap[1].to_string(), &cap[2].to_string(), &cap[3].to_string(), &count]).expect("Can't execute prepared");
b33171b487 2018-06-01   48:                 //trans.query(&format!("select {}.merge_counter($1::text, $2::text, $3::inet, $4::smallint);", &schema), &[&cap[1].to_string(), &cap[2].to_string(), &cap[3].to_string(), &count]).expect("Can't prepare statement");
b33171b487 2018-06-01   49:             }
                                        // Commit first, delete the Redis key second.
                                        // NOTE(review): if the process stops between these two calls, `cache_key` still
                                        // exists and would be picked up by the first branch above on a later pass, so the
                                        // same counters would be merged again — confirm whether that is acceptable.
b33171b487 2018-06-01   50:             trans.commit().expect("Can't commit transaction");
b33171b487 2018-06-01   51:             red.del(cache_key).expect("Can't remove stale key")
b33171b487 2018-06-01   52:         }
                                    // Pause for the "delay" config value, used as whole seconds (nanoseconds part is 0).
b33171b487 2018-06-01   53:         thread::sleep(Duration::new(settings.get("delay").expect("Delay specification absent in config"), 0));
b33171b487 2018-06-01   54:     }
b33171b487 2018-06-01   55: }