Index: counter.body.conf ================================================================== --- counter.body.conf +++ counter.body.conf @@ -2,21 +2,16 @@ internal; postgres_query $echo_request_body; postgres_pass counter; } -location = /sleep { - internal; - echo_sleep 1; -} - location = /counter { default_type application/json; userid on; userid_name uid; userid_expires 365d; userid_p3p 'policyref="/w3c/p3p.xml", CP="NOI CUR ADM PSA OUR STP STA"'; - content_by_lua_file '/path/to/counter.lua'; + content_by_lua_file '/home/arcade/work/admin/nginx/counter.content.lua'; + #log_by_lua_file '/home/arcade/work/admin/nginx/counter.log.lua'; expires epoch; - keepalive_timeout 0; } ADDED counter.content.lua Index: counter.content.lua ================================================================== --- /dev/null +++ counter.content.lua @@ -0,0 +1,7 @@ +-- module counter - counts some junk + +local counter = require 'counter' + +counter.connect() +counter.update() +counter.exit() ADDED counter.log.lua Index: counter.log.lua ================================================================== --- /dev/null +++ counter.log.lua @@ -0,0 +1,5 @@ +-- module counter - counts some junk + +local counter = require 'counter' + +counter.log() Index: counter.lua ================================================================== --- counter.lua +++ counter.lua @@ -1,162 +1,138 @@ -- module counter - counts some junk -local today = os.date('*t') -local timestamp = os.time{year = today['year'], month = today['month'], day = today['day']} -local referer = '-'; - -if ngx.var.http_referer ~= nil then - _, _, referer = string.find(ngx.var.http_referer, '^https?://([%w%.]+)') -end - -local key = 'stat_counter_' .. timestamp .. '_' .. 
referer - --- connect to redis -local redis = require 'resty.redis' -local red = redis:new() -local ok, err = red:connect('127.0.0.1', 6379) -if not ok then - ngx.log(ngx.ERR, 'redis connection failed: ', err) - ngx.exit(500) -end - -local cjson = require 'cjson' - --- decodes values from array into dict -function decode_bulk(reply) +local _counter = {} + +local redis = require 'resty.redis' +local cjson = require 'cjson' + +local closed = false + +function _counter.exit() + if not closed then + local ok, err = _counter.red:set_keepalive(60000, 10) + if not ok then + ngx.log(ngx.ERR, "redis keepalive: " .. err) + end + closed = true + end +end + +function _counter.decode_bulk(reply) + -- decodes values from array into dict local data = {} for j=1, #reply, 2 do data[reply[j] ] = reply[j+1] end - return(data) -end - -local data, empty -empty, err = red:hsetnx(key, 'today', 0) -if not empty then - ngx.log(ngx.ERR, 'redis query failed: ', err) - ngx.exit(500) -end - -if empty == 0 then - data, err = red:hgetall(key) - if not data then - ngx.log(ngx.ERR, 'redis query failed: ', err) - ngx.exit(500) - end -end - -if empty == 1 or data.whole == nil then - -- postgres fallback - result = ngx.location.capture('/postgres', { - method = ngx.HTTP_PUT, - body = "select * from get_stats('" .. referer .. "') as (today int, lastday int, week bigint, whole bigint);" - }) - if result.status ~= 200 or not result.body then - ngx.log(ngx.ERR, 'postgres access failed') - ngx.exit(500) - else - local unrds = require "rds.parser" - local res, err = unrds.parse(result.body) - if res == nil then - ngx.log(ngx.ERR, 'failed to obtain data: ' .. 
err) - ngx.exit(500) - else - data = res.resultset[1] - local req = {key} - - for name, value in pairs(data) do - if value == unrds.null then - value = 0 - end - if name ~= 'today' then - table.insert(req, name) - table.insert(req, value) - end - end - - red:init_pipeline(3) - red:hmset{req} - red:expire(key, 129600) - red:hincrby(key, 'today', data.today) - res, err = red:commit_pipeline() - if not res then - ngx.log(ngx.ERR, 'redis pipeline failed: ', err) - ngx.exit(500) - end - end - end -end - -data.today = data.today + 1 -ngx.say(cjson.encode(data)) -ngx.eof() - -res, err = red:hincrby(key, 'today', 1) -local uid = '' - -if ngx.var.uid_got ~= nil and string.find(ngx.var.uid_got, 'uid=') == 1 then - uid = string.sub(ngx.var.uid_got, 5) -elseif ngx.var.uid_set ~= nil and string.find(ngx.var.uid_set, 'uid=') == 1 then - uid = string.sub(ngx.var.uid_set, 5) -end - -local hit_key = uid .. '_' .. referer .. '_' .. ngx.var.remote_addr - -red:init_pipeline(5) -red:multi() -red:hincrby('stat_counter_pending', hit_key, 1) -red:expire('stat_counter_pending', 60) -red:renamenx('stat_counter_pending', 'stat_counter_pending_tmp') -red:exec() -renamed, err = red:commit_pipeline() -if not renamed then - ngx.log(ngx.ERR, 'redis multi failed: ', err) - ngx.exit(500) -end - -local continue = false -if tonumber(renamed[5][3]) == 1 then - continue = true -end - -while continue do - ngx.location.capture('/sleep') - - local data - data, err = red:hgetall('stat_counter_pending_tmp') - if not data then - ngx.log(ngx.ERR, 'redis request failed: ', err) - ngx.exit(500) - end - - for name, value in pairs(decode_bulk(data)) do - local fields = {} - name:gsub("[^_]+", function(c) fields[#fields + 1] = c end) - fields[#fields + 1] = value - local result = ngx.location.capture('/postgres', { - method = ngx.HTTP_PUT, - -- prepare select? XXX - body = "select merge_counter('" .. fields[1] .. "'::text, '" .. fields[2] .. "'::text, '" .. fields[3] .. "'::inet, '" .. fields[4] .. 
"'::smallint);" - }) - if result.status ~= 200 or not result.body then - ngx.log(ngx.ERR, 'postgres access failed') - ngx.exit(500) - end - end - - red:init_pipeline(5) - red:multi() - red:del('stat_counter_pending_tmp') - red:exists('stat_counter_pending') - red:renamenx('stat_counter_pending', 'stat_counter_pending_tmp') - red:exec() - renamed, err = red:commit_pipeline() - if not renamed then - ngx.log(ngx.ERR, 'redis multi failed: ', err) - ngx.exit(500) - end - - if tonumber(renamed[5][1] == 0) or tonumber(renamed[5][2]) == 0 then - continue = false - end -end + return data +end + +function _counter.connect() + -- connect to redis + _counter.red = redis:new() + local ok, err = _counter.red:connect('127.0.0.1', 6379) + if not ok then + ngx.log(ngx.ERR, 'redis connection failed: ', err) + ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR) + end + closed = false +end + +function _counter.check_schema() + if ngx.var.site_schema == nil then + ngx.log(ngx.ERR, "No 'site_schema' specified"); + ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR) + end +end + +function _counter.update() + _counter.check_schema() + local today = os.date('*t') + local timestamp = os.time{year = today['year'], month = today['month'], day = today['day']} + local referer = '-'; + + if ngx.var.http_referer ~= nil then + _, _, referer = string.find(ngx.var.http_referer, '^https?://([^?&]+)') + --ngx.log(ngx.ERR, "Referer: " .. referer) + end + + local key = ngx.var.site_schema .. '_counter_' .. timestamp .. '_' .. referer + -- ngx.log(ngx.ERR, "Using key: " .. key) + + local res, err + _counter.red:init_pipeline(4) + _counter.red:multi() + _counter.red:hincrby(key, 'today', 1) + _counter.red:hgetall(key) + _counter.red:exec() + res, err = _counter.red:commit_pipeline() + if not res then + ngx.log(ngx.ERR, 'redis pipeline failed: ', err) + ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR) + end + + local data = _counter.red:array_to_hash(res[4][2]) + -- ngx.log(ngx.ERR, "Got data: " .. 
cjson.encode(data)) + + if tonumber(data.today) == 1 then + ngx.log(ngx.ERR, "Reading postgres") + -- postgres fallback + result = ngx.location.capture('/postgres', { + method = ngx.HTTP_PUT, + body = "select * from " .. ngx.var.site_schema .. ".get_stats('" .. referer .. "') as (today int, lastday int, week bigint, whole bigint);" + }) + if result.status ~= 200 or not result.body then + ngx.log(ngx.ERR, 'postgres access failed') + ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR) + else + local unrds = require "rds.parser" + local res, err = unrds.parse(result.body) + + if res == nil then + ngx.log(ngx.ERR, 'failed to obtain data: ' .. err) + ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR) + else + data = res.resultset[1] + data.today = data.today + 1 + ngx.log(ngx.ERR, "Got data: " .. cjson.encode(data)) + + _counter.red:init_pipeline(4) + _counter.red:multi() + _counter.red:hmset(key, data) + _counter.red:expire(key, 129600) + _counter.red:exec() + res, err = _counter.red:commit_pipeline() + if not res then + ngx.log(ngx.ERR, 'redis pipeline failed: ', err) + ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR) + end + end + end + end + + ngx.say(cjson.encode(data)) + ngx.eof() + + local uid = '' + + if ngx.var.uid_got ~= nil and string.find(ngx.var.uid_got, 'uid=') == 1 then + uid = string.sub(ngx.var.uid_got, 5) + elseif ngx.var.uid_set ~= nil and string.find(ngx.var.uid_set, 'uid=') == 1 then + uid = string.sub(ngx.var.uid_set, 5) + end + + local hit_key = uid .. '_' .. referer .. '_' .. ngx.var.remote_addr + local key_pending = ngx.var.site_schema .. 
'_counter_pending' + + _counter.red:init_pipeline(4) + _counter.red:multi() + _counter.red:hincrby(key_pending, hit_key, 1) + _counter.red:expire(key_pending, 604800) + _counter.red:exec() + res, err = _counter.red:commit_pipeline() + if not res then + ngx.log(ngx.ERR, 'redis transaction failed: ', err) + ngx.exit(ngx.HTTP_INTERNAL_SERVER_ERROR) + end +end + +return _counter Index: counter.sql ================================================================== --- counter.sql +++ counter.sql @@ -1,33 +1,30 @@ -SET search_path = auto, pg_catalog; -BEGIN; - -CREATE TABLE counter ( +CREATE TABLE if not exists counter ( uid text NOT NULL, referer text DEFAULT '-' NOT NULL, ip inet NOT NULL, day date DEFAULT now() NOT NULL, count integer DEFAULT 0 NOT NULL ); -CREATE TABLE counter_sum ( +CREATE TABLE if not exists counter_sum ( referer text DEFAULT '-'::text NOT NULL, day date DEFAULT now() NOT NULL, count integer DEFAULT 0 NOT NULL ); CREATE or replace FUNCTION get_stats(target text) RETURNS record LANGUAGE sql AS $_$ select - coalesce((select count from counter_sum where day = current_date and referer = $1), 0) as today, - coalesce((select count from counter_sum where day = (current_date - interval '1 day')::date and referer = $1), 0) as lastday, - coalesce((select sum(count) from counter_sum + coalesce((select count from limbo.counter_sum where day = current_date and referer = $1), 0) as today, + coalesce((select count from limbo.counter_sum where day = (current_date - interval '1 day')::date and referer = $1), 0) as lastday, + coalesce((select sum(count) from limbo.counter_sum where date_trunc('week', current_date) = date_trunc('week', day) and day <> current_date and referer = $1 group by referer), 0) as week, - coalesce((select sum(count) from counter_sum where day <> current_date and referer = $1 group by referer), 0) as whole + coalesce((select sum(count) from limbo.counter_sum where day <> current_date and referer = $1 group by referer), 0) as whole $_$; -CREATE 
FUNCTION counter_sum__change_trigger() RETURNS trigger LANGUAGE plpgsql AS $$ +CREATE or replace FUNCTION counter_sum__change_trigger() RETURNS trigger LANGUAGE plpgsql AS $$ declare delta_count smallint; begin if (TG_OP = 'UPDATE') then if (old.referer <> new.referer or old.day <> new.day) then @@ -38,40 +35,40 @@ delta_count = new.count; elsif (TG_OP = 'DELETE') then raise exception 'Deleting counters prohibited'; end if; <<insert_update>> loop - update counter_sum set count = count + delta_count + update limbo.counter_sum set count = count + delta_count where referer = new.referer and day = new.day; exit insert_update when found; begin - insert into counter_sum (referer, day, count) values (new.referer, new.day, delta_count); + insert into limbo.counter_sum (referer, day, count) values (new.referer, new.day, delta_count); exit insert_update; exception when unique_violation then end; end loop insert_update; return new; end; $$; -CREATE FUNCTION merge_counter(merge_uid text, merge_referer text, merge_ip inet, merge_count smallint) RETURNS boolean LANGUAGE plpgsql AS $$ +CREATE or replace FUNCTION merge_counter(merge_uid text, merge_referer text, merge_ip inet, merge_count smallint) RETURNS boolean LANGUAGE plpgsql AS $$ BEGIN <<insert_update>> LOOP - UPDATE counter SET count = count + merge_count + UPDATE limbo.counter SET count = count + merge_count WHERE uid = merge_uid and ip = merge_ip and referer = merge_referer and day = current_date; exit insert_update when found; BEGIN - INSERT INTO counter (uid, referer, ip, count) VALUES (merge_uid, merge_referer, merge_ip, merge_count); + INSERT INTO limbo.counter (uid, referer, ip, count) VALUES (merge_uid, merge_referer, merge_ip, merge_count); EXIT insert_update; EXCEPTION WHEN unique_violation THEN END; END LOOP insert_update; return true; END; $$; -CREATE INDEX counter__date ON counter USING btree (day); -CREATE UNIQUE INDEX counter__uid_nreferer_ip_date ON counter (uid, ip, day) WHERE (referer IS NULL); -CREATE UNIQUE INDEX 
counter__uid_referer_ip_date ON counter (uid, referer, ip, day); -create unique index counter_sum__referer_day ON counter_sum (referer, day); -CREATE TRIGGER counter__change AFTER INSERT OR DELETE OR UPDATE ON counter FOR EACH ROW EXECUTE PROCEDURE counter_sum__change_trigger(); +CREATE INDEX if not exists counter__date ON counter USING btree (day); +CREATE UNIQUE INDEX if not exists counter__uid_nreferer_ip_date ON counter (uid, ip, day) WHERE (referer IS NULL); +CREATE UNIQUE INDEX if not exists counter__uid_referer_ip_date ON counter (uid, referer, ip, day); +create unique index if not exists counter_sum__referer_day ON counter_sum (referer, day); -end; +drop trigger if exists counter__change on counter; +CREATE TRIGGER counter__change AFTER INSERT OR DELETE OR UPDATE ON counter FOR EACH ROW EXECUTE PROCEDURE counter_sum__change_trigger(); DELETED counter.upstream.conf Index: counter.upstream.conf ================================================================== --- counter.upstream.conf +++ /dev/null @@ -1,4 +0,0 @@ -upstream counter { - postgres_server 127.0.0.1 dbname=test user=test password=test; - postgres_keepalive max=4 mode=multi overflow=ignore; -} ADDED counter.upstream.conf.sample Index: counter.upstream.conf.sample ================================================================== --- /dev/null +++ counter.upstream.conf.sample @@ -0,0 +1,4 @@ +upstream counter { + postgres_server 127.0.0.1 dbname=db user=user password=pass; + postgres_keepalive max=4 mode=multi overflow=ignore; +} ADDED offload/Cargo.toml Index: offload/Cargo.toml ================================================================== --- /dev/null +++ offload/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "offload" +version = "0.1.0" +authors = ["arcade"] + +[dependencies] +config = "0.8.0" +postgres = "0.15" +#postgres-inet = "*" +redis = "0.8.0" +regex = "0.2" ADDED offload/offload.toml.sample Index: offload/offload.toml.sample 
================================================================== --- /dev/null +++ offload/offload.toml.sample @@ -0,0 +1,3 @@ +pg = 'postgres://test:pass@%2Ftmp' # urlencoded +schemas = [ "limbo" ] # schemas to watch +delay = 300 # delay of 5 minutes ADDED offload/src/main.rs Index: offload/src/main.rs ================================================================== --- /dev/null +++ offload/src/main.rs @@ -0,0 +1,55 @@ +extern crate config; +extern crate postgres; +extern crate redis; +extern crate regex; + +use postgres::{Connection, TlsMode}; +use redis::Commands; +use regex::Regex; +use std::collections::HashMap; +use std::time::Duration; +use std::thread; + +fn main() { + loop { + let mut settings = config::Config::default(); + settings.merge(config::File::with_name("offload")).expect("Can't read configuration file"); + + let reddb = redis::Client::open("redis://127.0.0.1/").expect("Can't connect to the database"); + let red = reddb.get_connection().expect("Can't initialize new connection"); + + let re = Regex::new("^([0-9A-Z]+)_([a-z./0-9-]+)_([0-9.]+)$").expect("Can't parse regexp"); + let conn = Connection::connect(settings.get_str("pg").expect("Postgres connection absent in config"), TlsMode::None).expect("Can't connect to postgres"); + + let schemas = settings.get_array("schemas").expect("Schema list not found in config").into_iter().map(|value| config::Value::into_str(value).expect("We require string here")); + for schema in schemas { + let data_key = schema.to_owned() + "_counter_pending"; + let cache_key = schema.to_owned() + "_counter_pending_now"; + + if red.exists(&cache_key).expect("Can't query cache existance") { + } else if red.exists(&data_key).expect("Can't query data existance") { + let _ : bool = red.rename_nx(&data_key, &cache_key).expect("Can't readd unflushed hash"); + } else { + continue; + } + + let stats : HashMap<String, i16> = red.hgetall(&cache_key).expect("Can't query for stored stat keys"); + + //println!("# {:?}", stats); + + let trans = 
conn.transaction().expect("Can't start transaction"); + let stmt = trans.prepare(&format!("select {}.merge_counter($1::text, $2::text, ($3::text)::inet, $4::smallint);", &schema)).expect("Can't prepare statement"); + for (client, count) in stats { + //println!("# {:?}: {:?}", &client, &count); + let cap = re.captures(&client).expect("Client match failed"); + //let addr = IpAddr::V4(cap[3].parse().expect("Can't parse IP")); + //println!("insert into x values({:?}, {:?}, {:?}, {:?});", &cap[1], &cap[2], &cap[3], &count); + stmt.execute(&[&cap[1].to_string(), &cap[2].to_string(), &cap[3].to_string(), &count]).expect("Can't execute prepared"); + //trans.query(&format!("select {}.merge_counter($1::text, $2::text, $3::inet, $4::smallint);", &schema), &[&cap[1].to_string(), &cap[2].to_string(), &cap[3].to_string(), &count]).expect("Can't prepare statement"); + } + trans.commit().expect("Can't commit transaction"); + red.del(cache_key).expect("Can't remove stale key") + } + thread::sleep(Duration::new(settings.get("delay").expect("Delay specification absent in config"), 0)); + } +}