summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorYves Fischer <yvesf-git@xapek.org>2018-11-26 11:43:49 +0100
committerYves Fischer <yvesf-git@xapek.org>2018-11-26 11:43:49 +0100
commit20242a8d3cc2e9a70812f34fcc50c170a654f6c6 (patch)
treed925d0ee2e20be756233092b622ed3a1ca31e94d
parent3b89dc69da0f88cf8e2290523fa50656ac2ebb5d (diff)
downloadnginx-auth-totp-20242a8d3cc2e9a70812f34fcc50c170a654f6c6.tar.gz
nginx-auth-totp-20242a8d3cc2e9a70812f34fcc50c170a654f6c6.zip
Improve readability
- Move tokio runtime to main - Use structopt instead of getopt for option parsing
-rw-r--r--Cargo.toml7
-rw-r--r--src/http_server.rs101
-rw-r--r--src/main.rs96
3 files changed, 97 insertions, 107 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 76b5cd2..65e5f74 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -4,9 +4,7 @@ version = "0.1.0"
authors = ["Yves Fischer <yvesf+git@xapek.org>"]
[dependencies]
-ascii = "0.7"
time = "0.1"
-getopts = "0.2"
simple_logger = "0.4"
log = "0.3"
oath = "0.10.*"
@@ -16,9 +14,12 @@ random = "0.12.*"
tokio = "0.1.*"
tokio-threadpool = "0.1.*"
tokio-executor = "0.1.*"
+tokio-signal = "0.2.*"
+futures = "0.1.*"
http = "0.1.*"
bytes = "0.4.*"
httparse = "1.3.*"
thread_local = "0.3.*"
cookie = "0.11.*"
-url = "1.7.*" \ No newline at end of file
+url = "1.7.*"
+structopt = "0.2.*" \ No newline at end of file
diff --git a/src/http_server.rs b/src/http_server.rs
index 826163c..0e60e72 100644
--- a/src/http_server.rs
+++ b/src/http_server.rs
@@ -3,92 +3,75 @@ use std::io;
use std::net::SocketAddr;
use std::sync::Arc;
use std::boxed::Box;
-use bytes::Bytes;
+use bytes::Bytes;
+use bytes::BytesMut;
use tokio;
use tokio::net::TcpListener;
use tokio::prelude::*;
use tokio::codec::{Encoder, Decoder};
-use tokio_threadpool::Builder;
-use tokio_executor::enter;
-use bytes::BytesMut;
use http::header::HeaderValue;
use http::{Request, Response};
use thread_local::ThreadLocal;
-use system;
-
pub trait HttpHandler<T> {
fn respond(&self, state: &T, req: Request<Bytes>) -> Response<String>;
}
pub fn serve<
T: 'static + Send + Clone + HttpHandler<X>,
- X: Send + Clone + 'static
->(addr: SocketAddr, state: X, handler: T) {
+ X: Send + Clone + 'static,
+>(addr: SocketAddr, state: X, handler: T)
+ -> impl Future<Item=(), Error=()> + Send
+{
let listener = TcpListener::bind(&addr).expect("failed to bind");
info!("Listening on: {}", addr);
let tl_handler: Arc<ThreadLocal<T>> = Arc::new(ThreadLocal::new());
- let tl_state: Arc<ThreadLocal<X>>= Arc::new(ThreadLocal::new());
+ let tl_state: Arc<ThreadLocal<X>> = Arc::new(ThreadLocal::new());
+
+ listener.incoming()
+ .map_err(|e| error!("failed to accept socket; error = {:?}", e))
+ .for_each(move |socket| {
+ let peer_addr = match socket.peer_addr() {
+ Ok(addr) => format!("{}", addr),
+ Err(_) => "<error>".to_string(),
+ };
- let program =
- listener.incoming()
- .map_err(|e| error!("failed to accept socket; error = {:?}", e))
- .for_each(move |socket| {
- let peer_addr = match socket.peer_addr() {
- Ok(addr) => format!("{}", addr),
- Err(_) => "<error>".to_string(),
- };
+ let (tx, rx) =
+ HttpFrame.framed(socket).split();
- let (tx, rx) =
- HttpFrame.framed(socket).split();
+ let tl_handler = tl_handler.clone();
+ let handler = handler.clone();
- let tl_handler = tl_handler.clone();
- let handler = handler.clone();
+ let tl_state = tl_state.clone();
+ let state = state.clone();
- let tl_state = tl_state.clone();
+ let rx_task = rx.and_then(move |req| {
let state = state.clone();
-
- let rx_task = rx.and_then(move |req| {
- let state = state.clone();
- let state = tl_state.get_or(||{
- debug!("Clone state");
- Box::new(state.clone())
- });
- let handler = tl_handler.get_or(|| {
- debug!("Clone handler");
- Box::new(handler.clone())
- });
- info!("{:?} {} {} {:?}", peer_addr, req.method(), req.uri(), req.version());
- let response = handler.respond(&state, req);
- Box::new(future::ok(response))
+ let state = tl_state.get_or(|| {
+ debug!("Clone state");
+ Box::new(state.clone())
});
- let tx_task = tx.send_all(rx_task)
- .then(|res| {
- if let Err(e) = res {
- error!("failed to process connection; error = {:?}", e);
- }
- Ok(())
- });
-
- // Spawn the task that handles the connection.
- tokio::spawn(tx_task);
- Ok(())
+ let handler = tl_handler.get_or(|| {
+ debug!("Clone handler");
+ Box::new(handler.clone())
+ });
+ info!("{:?} {} {} {:?}", peer_addr, req.method(), req.uri(), req.version());
+ let response = handler.respond(&state, req);
+ Box::new(future::ok(response))
});
+ let tx_task = tx.send_all(rx_task)
+ .then(|res| {
+ if let Err(e) = res {
+ error!("failed to process connection; error = {:?}", e);
+ }
+ Ok(())
+ });
-
- let mut builder = Builder::new();
- let runtime = builder
- .name_prefix("httpd-")
- .after_start(|| {
- debug!("Start new worker");
- system::initialize_rng_from_time();
+ // Spawn the task that handles the connection.
+ tokio::spawn(tx_task);
+ Ok(())
})
- .build();
- runtime.spawn(program);
- enter().expect("nested tokio::run")
- .block_on(runtime.shutdown_on_idle())
- .unwrap();
}
///
diff --git a/src/main.rs b/src/main.rs
index bb3c57e..2d5218f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,23 +1,18 @@
#![feature(test)]
-#![feature(convert_id)]
-#![feature(proc_macro_hygiene)]
-#![feature(try_from)]
#![feature(duration_as_u128)]
-#![feature(libc)]
-use std::env;
use std::sync::Arc;
use std::thread;
use std::sync::atomic;
use std::net::SocketAddr;
-extern crate ascii;
-extern crate getopts;
#[macro_use]
extern crate log;
extern crate tokio;
extern crate tokio_threadpool;
extern crate tokio_executor;
+extern crate tokio_signal;
+extern crate futures;
extern crate time;
extern crate simple_logger;
extern crate oath;
@@ -32,10 +27,14 @@ extern crate bytes;
extern crate thread_local;
extern crate cookie;
extern crate url;
+extern crate structopt;
-use getopts::Options;
+use structopt::StructOpt;
use log::LogLevel::{Debug, Warn};
use time::Duration;
+use futures::{Future, Stream};
+use tokio_threadpool::Builder;
+use tokio_executor::enter;
mod auth_handler;
mod cookie_store;
@@ -44,8 +43,6 @@ mod router;
mod system;
mod totp;
-extern crate libc;
-
use cookie_store::CookieStore;
#[derive(Clone)]
@@ -54,53 +51,62 @@ pub struct ApplicationState {
cookie_max_age: Duration,
}
-fn print_usage(program: &str, opts: &Options) {
- let brief = format!("Usage: {} [options]", program);
- print!("{}", opts.usage(&brief));
+#[derive(Debug, StructOpt)]
+#[structopt(name = "nginx-auth-totp")]
+struct Opt {
+ #[structopt(short = "l", long = "port", default_value = "127.0.0.1:8080")]
+ addr: SocketAddr,
+ #[structopt(short = "d", long = "debug")]
+ debug: bool,
}
fn main() {
- let args: Vec<String> = env::args().collect();
- let program = args[0].clone();
- let mut opts = Options::new();
- opts.optopt("l", "port", "Listen address", "LISTEN-ADDR");
- opts.optflag("d", "debug", "Use loglevel Debug instead of Warn");
- opts.optflag("h", "help", "print this help menu");
- let matches = opts.parse(&args[1..]).unwrap_or_else(|f| panic!(f.to_string()));
-
- if matches.opt_present("h") {
- print_usage(&program, &opts);
- return;
- }
-
- simple_logger::init_with_level(if matches.opt_present("d") { Debug } else { Warn })
+ let opt = Opt::from_args();
+ simple_logger::init_with_level(if opt.debug { Debug } else { Warn })
.unwrap_or_else(|_| panic!("Failed to initialize logger"));
-
- let addr = matches.opt_str("l").unwrap_or_else(||"127.0.0.1:8080".to_string());
- let addr = addr.parse::<SocketAddr>()
- .unwrap_or_else(|_| panic!("Failed to parse LISTEN-ADDRESS"));
-
-
- // concurrent eventual consistent hashmap with <cookie-id, timeout>
let state = ApplicationState { cookie_store: CookieStore::new(), cookie_max_age: Duration::days(1) };
let server_shutdown_condvar = Arc::new(atomic::AtomicBool::new(false));
- let cookie_clean_thread_condvar = server_shutdown_condvar.clone();
- let cookie_clean_state = state.clone();
- let cookie_clean_thread = thread::spawn(move || {
- while !cookie_clean_thread_condvar.load(atomic::Ordering::Relaxed) {
- thread::sleep(std::time::Duration::from_secs(60));
- debug!("Clean cookie cache");
- cookie_clean_state.cookie_store.clean_outdated_cookies();
- }
- });
+ let cookie_clean_thread = {
+ let server_shutdown_condvar = server_shutdown_condvar.clone();
+ let state = state.clone();
+ thread::spawn(move || {
+ thread::park_timeout(std::time::Duration::from_secs(10));
+ while !server_shutdown_condvar.load(atomic::Ordering::Relaxed) {
+ info!("Clean cookie cache");
+ state.cookie_store.clean_outdated_cookies();
+ thread::park_timeout(std::time::Duration::from_secs(60));
+ }
+ })
+ };
let auth_handler = auth_handler::AuthHandler::make();
- http_server::serve(addr, state, auth_handler);
+ let runtime = Builder::new()
+ .name_prefix("httpd-")
+ .after_start(|| {
+ debug!("Start new worker: {}", thread::current().name().unwrap_or("-"));
+ system::initialize_rng_from_time();
+ })
+ .build();
+
+ let program = http_server::serve(opt.addr, state, auth_handler);
+ runtime.spawn(program);
+
+ let ctrl_c_block = tokio_signal::ctrl_c()
+ .flatten_stream().take(1).for_each(|()| {
+ info!("ctrl-c received");
+ Ok(())
+ });
+
+ enter().expect("nested tokio::run")
+ .block_on(ctrl_c_block)
+ .unwrap();
+ runtime.shutdown();
+ info!("Waiting for cookie cleanup thread to stop");
server_shutdown_condvar.store(true, atomic::Ordering::Relaxed);
- debug!("Waiting for cleanup thread to shutdown");
+ cookie_clean_thread.thread().unpark();
cookie_clean_thread.join().unwrap();
}