diff --git a/lib/gat/admin_test.go b/lib/gat/admin_test.go new file mode 100644 index 0000000000000000000000000000000000000000..605458c89440374a6f10f11ce4f075a7642f69fe --- /dev/null +++ b/lib/gat/admin_test.go @@ -0,0 +1,3 @@ +package gat + +// TODO: no tests in original package. we should write our own diff --git a/lib/gat/client_test.go b/lib/gat/client_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b28f2da2fc9d5abbaab4090d5770795074d3527a --- /dev/null +++ b/lib/gat/client_test.go @@ -0,0 +1,3 @@ +package gat + +// TODO: write client tests, original had none diff --git a/lib/gat/messages_test.go b/lib/gat/messages_test.go new file mode 100644 index 0000000000000000000000000000000000000000..4fab80648e4ee2d06fda36b5f11731f80ad2f32d --- /dev/null +++ b/lib/gat/messages_test.go @@ -0,0 +1,3 @@ +package gat + +// TODO: once we decide on what messages, write the relevant test diff --git a/lib/gat/pool_test.go b/lib/gat/pool_test.go new file mode 100644 index 0000000000000000000000000000000000000000..b9a0eb6a5102fa7a4c396cd3d98cfd9292c821ad --- /dev/null +++ b/lib/gat/pool_test.go @@ -0,0 +1,3 @@ +package gat + +// TODO: no tests, we need to write our own diff --git a/lib/gat/stats.go b/lib/gat/stats.go new file mode 100644 index 0000000000000000000000000000000000000000..8daa678ff0ca4ff6a5ced48725b560e6ebd35c33 --- /dev/null +++ b/lib/gat/stats.go @@ -0,0 +1,570 @@ +package gat + +//TODO: metrics +// let's do this last. we can use the go package for prometheus, it's way better than anything we could do +// this would be a good project to teach Prometheus basics to a junior +// +//use arc_swap::ArcSwap; +///// Statistics and reporting. 
+//use log::{error, info, trace}; +//use once_cell::sync::Lazy; +//use parking_lot::Mutex; +//use std::collections::HashMap; +//use tokio::sync::mpsc::error::TrySendError; +//use tokio::sync::mpsc::{channel, Receiver, Sender}; +// +//use crate::pool::get_number_of_addresses; +// +//pub static REPORTER: Lazy<ArcSwap<Reporter>> = +// Lazy::new(|| ArcSwap::from_pointee(Reporter::default())); +// +///// Latest stats updated every second; used in SHOW STATS and other admin commands. +//static LATEST_STATS: Lazy<Mutex<HashMap<usize, HashMap<String, i64>>>> = +// Lazy::new(|| Mutex::new(HashMap::new())); +// +///// Statistics period used for average calculations. +///// 15 seconds. +//static STAT_PERIOD: u64 = 15000; +// +///// The names for the events reported +///// to the statistics collector. +//#[derive(Debug, Clone, Copy)] +//enum EventName { +// CheckoutTime, +// Query, +// Transaction, +// DataSent, +// DataReceived, +// ClientWaiting, +// ClientActive, +// ClientIdle, +// ClientDisconnecting, +// ServerActive, +// ServerIdle, +// ServerTested, +// ServerLogin, +// ServerDisconnecting, +// UpdateStats, +// UpdateAverages, +//} +// +///// Event data sent to the collector +///// from clients and servers. +//#[derive(Debug, Clone)] +//pub struct Event { +// /// The name of the event being reported. +// name: EventName, +// +// /// The value being reported. Meaning differs based on event name. +// value: i64, +// +// /// The client or server connection reporting the event. +// process_id: i32, +// +// /// The server the client is connected to. +// address_id: usize, +//} +// +///// The statistics reporter. An instance is given +///// to each possible source of statistics, +///// e.g. clients, servers, connection pool. 
+//#[derive(Clone, Debug)] +//pub struct Reporter { +// tx: Sender<Event>, +//} +// +//impl Default for Reporter { +// fn default() -> Reporter { +// let (tx, _rx) = channel(5); +// Reporter { tx } +// } +//} +// +//impl Reporter { +// /// Create a new Reporter instance. +// pub fn new(tx: Sender<Event>) -> Reporter { +// Reporter { tx: tx } +// } +// +// /// Send statistics to the task keeping track of stats. +// fn send(&self, event: Event) { +// let name = event.name; +// let result = self.tx.try_send(event); +// +// match result { +// Ok(_) => trace!( +// "{:?} event reported successfully, capacity: {}", +// name, +// self.tx.capacity() +// ), +// +// Err(err) => match err { +// TrySendError::Full { .. } => error!("{:?} event dropped, buffer full", name), +// TrySendError::Closed { .. } => error!("{:?} event dropped, channel closed", name), +// }, +// }; +// } +// +// /// Report a query executed by a client against +// /// a server identified by the `address_id`. +// pub fn query(&self, process_id: i32, address_id: usize) { +// let event = Event { +// name: EventName::Query, +// value: 1, +// process_id: process_id, +// address_id: address_id, +// }; +// +// self.send(event); +// } +// +// /// Report a transaction executed by a client against +// /// a server identified by the `address_id`. +// pub fn transaction(&self, process_id: i32, address_id: usize) { +// let event = Event { +// name: EventName::Transaction, +// value: 1, +// process_id: process_id, +// address_id: address_id, +// }; +// +// self.send(event) +// } +// +// /// Report data sent to a server identified by `address_id`. +// /// The `amount` is measured in bytes. +// pub fn data_sent(&self, amount: usize, process_id: i32, address_id: usize) { +// let event = Event { +// name: EventName::DataSent, +// value: amount as i64, +// process_id: process_id, +// address_id: address_id, +// }; +// +// self.send(event) +// } +// +// /// Report data received from a server identified by `address_id`. 
+// /// The `amount` is measured in bytes. +// pub fn data_received(&self, amount: usize, process_id: i32, address_id: usize) { +// let event = Event { +// name: EventName::DataReceived, +// value: amount as i64, +// process_id: process_id, +// address_id: address_id, +// }; +// +// self.send(event) +// } +// +// /// Time spent waiting to get a healthy connection from the pool +// /// for a server identified by `address_id`. +// /// Measured in milliseconds. +// pub fn checkout_time(&self, ms: u128, process_id: i32, address_id: usize) { +// let event = Event { +// name: EventName::CheckoutTime, +// value: ms as i64, +// process_id: process_id, +// address_id: address_id, +// }; +// +// self.send(event) +// } +// +// /// Reports a client identified by `process_id` waiting for a connection +// /// to a server identified by `address_id`. +// pub fn client_waiting(&self, process_id: i32, address_id: usize) { +// let event = Event { +// name: EventName::ClientWaiting, +// value: 1, +// process_id: process_id, +// address_id: address_id, +// }; +// +// self.send(event) +// } +// +// /// Reports a client identified by `process_id` is done waiting for a connection +// /// to a server identified by `address_id` and is about to query the server. +// pub fn client_active(&self, process_id: i32, address_id: usize) { +// let event = Event { +// name: EventName::ClientActive, +// value: 1, +// process_id: process_id, +// address_id: address_id, +// }; +// +// self.send(event) +// } +// +// /// Reports a client identified by `process_id` is done querying the server +// /// identified by `address_id` and is no longer active. +// pub fn client_idle(&self, process_id: i32, address_id: usize) { +// let event = Event { +// name: EventName::ClientIdle, +// value: 1, +// process_id: process_id, +// address_id: address_id, +// }; +// +// self.send(event) +// } +// +// /// Reports a client identified by `process_id` is disconecting from the pooler. 
+// /// The last server it was connected to is identified by `address_id`. +// pub fn client_disconnecting(&self, process_id: i32, address_id: usize) { +// let event = Event { +// name: EventName::ClientDisconnecting, +// value: 1, +// process_id: process_id, +// address_id: address_id, +// }; +// +// self.send(event) +// } +// +// /// Reports a server connection identified by `process_id` for +// /// a configured server identified by `address_id` is actively used +// /// by a client. +// pub fn server_active(&self, process_id: i32, address_id: usize) { +// let event = Event { +// name: EventName::ServerActive, +// value: 1, +// process_id: process_id, +// address_id: address_id, +// }; +// +// self.send(event) +// } +// +// /// Reports a server connection identified by `process_id` for +// /// a configured server identified by `address_id` is no longer +// /// actively used by a client and is now idle. +// pub fn server_idle(&self, process_id: i32, address_id: usize) { +// let event = Event { +// name: EventName::ServerIdle, +// value: 1, +// process_id: process_id, +// address_id: address_id, +// }; +// +// self.send(event) +// } +// +// /// Reports a server connection identified by `process_id` for +// /// a configured server identified by `address_id` is attempting +// /// to login. +// pub fn server_login(&self, process_id: i32, address_id: usize) { +// let event = Event { +// name: EventName::ServerLogin, +// value: 1, +// process_id: process_id, +// address_id: address_id, +// }; +// +// self.send(event) +// } +// +// /// Reports a server connection identified by `process_id` for +// /// a configured server identified by `address_id` is being +// /// tested before being given to a client. 
+// pub fn server_tested(&self, process_id: i32, address_id: usize) { +// let event = Event { +// name: EventName::ServerTested, +// value: 1, +// process_id: process_id, +// address_id: address_id, +// }; +// +// self.send(event) +// } +// +// /// Reports a server connection identified by `process_id` is disconecting from the pooler. +// /// The configured server it was connected to is identified by `address_id`. +// pub fn server_disconnecting(&self, process_id: i32, address_id: usize) { +// let event = Event { +// name: EventName::ServerDisconnecting, +// value: 1, +// process_id: process_id, +// address_id: address_id, +// }; +// +// self.send(event) +// } +//} +// +///// The statistics collector which is receiving statistics +///// from clients, servers, and the connection pool. There is +///// only one collector (kind of like a singleton). +///// The collector can trigger events on its own, e.g. +///// it updates aggregates every second and averages every +///// 15 seconds. +//pub struct Collector { +// rx: Receiver<Event>, +// tx: Sender<Event>, +//} +// +//impl Collector { +// /// Create a new collector instance. There should only be one instance +// /// at a time. This is ensured by mpsc which allows only one receiver. +// pub fn new(rx: Receiver<Event>, tx: Sender<Event>) -> Collector { +// Collector { rx, tx } +// } +// +// /// The statistics collection handler. It will collect statistics +// /// for `address_id`s starting at 0 up to `addresses`. 
+// pub async fn collect(&mut self) { +// info!("Events reporter started"); +// +// let stats_template = HashMap::from([ +// ("total_query_count", 0), +// ("total_query_time", 0), +// ("total_received", 0), +// ("total_sent", 0), +// ("total_xact_count", 0), +// ("total_xact_time", 0), +// ("total_wait_time", 0), +// ("avg_query_count", 0), +// ("avg_query_time", 0), +// ("avg_recv", 0), +// ("avg_sent", 0), +// ("avg_xact_count", 0), +// ("avg_xact_time", 0), +// ("avg_wait_time", 0), +// ("maxwait_us", 0), +// ("maxwait", 0), +// ("cl_waiting", 0), +// ("cl_active", 0), +// ("cl_idle", 0), +// ("sv_idle", 0), +// ("sv_active", 0), +// ("sv_login", 0), +// ("sv_tested", 0), +// ]); +// +// let mut stats = HashMap::new(); +// +// // Stats saved after each iteration of the flush event. Used in calculation +// // of averages in the last flush period. +// let mut old_stats: HashMap<usize, HashMap<String, i64>> = HashMap::new(); +// +// // Track which state the client and server are at any given time. +// let mut client_server_states: HashMap<usize, HashMap<i32, EventName>> = HashMap::new(); +// +// // Flush stats to StatsD and calculate averages every 15 seconds. 
+// let tx = self.tx.clone(); +// tokio::task::spawn(async move { +// let mut interval = +// tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD / 15)); +// loop { +// interval.tick().await; +// let address_count = get_number_of_addresses(); +// for address_id in 0..address_count { +// let _ = tx.try_send(Event { +// name: EventName::UpdateStats, +// value: 0, +// process_id: -1, +// address_id: address_id, +// }); +// } +// } +// }); +// +// let tx = self.tx.clone(); +// tokio::task::spawn(async move { +// let mut interval = +// tokio::time::interval(tokio::time::Duration::from_millis(STAT_PERIOD)); +// loop { +// interval.tick().await; +// let address_count = get_number_of_addresses(); +// for address_id in 0..address_count { +// let _ = tx.try_send(Event { +// name: EventName::UpdateAverages, +// value: 0, +// process_id: -1, +// address_id: address_id, +// }); +// } +// } +// }); +// +// // The collector loop +// loop { +// let stat = match self.rx.recv().await { +// Some(stat) => stat, +// None => { +// info!("Events collector is shutting down"); +// return; +// } +// }; +// +// let stats = stats +// .entry(stat.address_id) +// .or_insert(stats_template.clone()); +// let client_server_states = client_server_states +// .entry(stat.address_id) +// .or_insert(HashMap::new()); +// let old_stats = old_stats.entry(stat.address_id).or_insert(HashMap::new()); +// +// // Some are counters, some are gauges... 
+// match stat.name { +// EventName::Query => { +// let counter = stats.entry("total_query_count").or_insert(0); +// *counter += stat.value; +// } +// +// EventName::Transaction => { +// let counter = stats.entry("total_xact_count").or_insert(0); +// *counter += stat.value; +// } +// +// EventName::DataSent => { +// let counter = stats.entry("total_sent").or_insert(0); +// *counter += stat.value; +// } +// +// EventName::DataReceived => { +// let counter = stats.entry("total_received").or_insert(0); +// *counter += stat.value; +// } +// +// EventName::CheckoutTime => { +// let counter = stats.entry("total_wait_time").or_insert(0); +// *counter += stat.value; +// +// let counter = stats.entry("maxwait_us").or_insert(0); +// let mic_part = stat.value % 1_000_000; +// +// // Report max time here +// if mic_part > *counter { +// *counter = mic_part; +// } +// +// let counter = stats.entry("maxwait").or_insert(0); +// let seconds = *counter / 1_000_000; +// +// if seconds > *counter { +// *counter = seconds; +// } +// } +// +// EventName::ClientActive +// | EventName::ClientWaiting +// | EventName::ClientIdle +// | EventName::ServerActive +// | EventName::ServerIdle +// | EventName::ServerTested +// | EventName::ServerLogin => { +// client_server_states.insert(stat.process_id, stat.name); +// } +// +// EventName::ClientDisconnecting | EventName::ServerDisconnecting => { +// client_server_states.remove(&stat.process_id); +// } +// +// EventName::UpdateStats => { +// // Calculate connection states +// for (_, state) in client_server_states.iter() { +// match state { +// EventName::ClientActive => { +// let counter = stats.entry("cl_active").or_insert(0); +// *counter += 1; +// } +// +// EventName::ClientWaiting => { +// let counter = stats.entry("cl_waiting").or_insert(0); +// *counter += 1; +// } +// +// EventName::ServerIdle => { +// let counter = stats.entry("sv_idle").or_insert(0); +// *counter += 1; +// } +// +// EventName::ServerActive => { +// let counter = 
stats.entry("sv_active").or_insert(0); +// *counter += 1; +// } +// +// EventName::ServerTested => { +// let counter = stats.entry("sv_tested").or_insert(0); +// *counter += 1; +// } +// +// EventName::ServerLogin => { +// let counter = stats.entry("sv_login").or_insert(0); +// *counter += 1; +// } +// +// EventName::ClientIdle => { +// let counter = stats.entry("cl_idle").or_insert(0); +// *counter += 1; +// } +// +// _ => unreachable!(), +// }; +// } +// +// // Update latest stats used in SHOW STATS +// let mut guard = LATEST_STATS.lock(); +// for (key, value) in stats.iter() { +// let entry = guard.entry(stat.address_id).or_insert(HashMap::new()); +// entry.insert(key.to_string(), value.clone()); +// } +// +// // These are re-calculated every iteration of the loop, so we don't want to add values +// // from the last iteration. +// for stat in &[ +// "cl_active", +// "cl_waiting", +// "cl_idle", +// "sv_idle", +// "sv_active", +// "sv_tested", +// "sv_login", +// "maxwait", +// "maxwait_us", +// ] { +// stats.insert(stat, 0); +// } +// } +// +// EventName::UpdateAverages => { +// // Calculate averages +// for stat in &[ +// "avg_query_count", +// "avg_query_time", +// "avg_recv", +// "avg_sent", +// "avg_xact_time", +// "avg_xact_count", +// "avg_wait_time", +// ] { +// let total_name = match stat { +// &"avg_recv" => "total_received".to_string(), // Because PgBouncer is saving bytes +// _ => stat.replace("avg_", "total_"), +// }; +// +// let old_value = old_stats.entry(total_name.clone()).or_insert(0); +// let new_value = stats.get(total_name.as_str()).unwrap_or(&0).to_owned(); +// let avg = (new_value - *old_value) / (STAT_PERIOD as i64 / 1_000); // Avg / second +// +// stats.insert(stat, avg); +// *old_value = new_value; +// } +// } +// }; +// } +// } +//} +// +///// Get a snapshot of statistics. Updated once a second +///// by the `Collector`. 
+//pub fn get_stats() -> HashMap<usize, HashMap<String, i64>> { +// LATEST_STATS.lock().clone() +//} +// +///// Get the statistics reporter used to update stats across the pools/clients. +//pub fn get_reporter() -> Reporter { +// (*(*REPORTER.load())).clone() +//}