Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions libshpool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ anyhow = "1" # dynamic, unstructured errors
chrono = "0.4" # getting current time and formatting it
serde = "1" # config parsing, connection header formatting
serde_derive = "1" # config parsing, connection header formatting
serde_json = "1" # JSON output for list command
toml = "0.8" # config parsing
byteorder = "1" # endianness
signal-hook = "0.3" # signal handling
Expand Down
36 changes: 34 additions & 2 deletions libshpool/src/daemon/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,8 +313,19 @@ impl Server {
.map_err(|e| anyhow!("joining shell->client after child exit: {:?}", e))?
.context("within shell->client thread after child exit")?;
}
} else if let Err(err) = self.hooks.on_client_disconnect(&header.name) {
warn!("client_disconnect hook: {:?}", err);
} else {
// Client disconnected but shell is still running - set last_disconnected_at
{
let _s = span!(Level::INFO, "disconnect_lock(shells)").entered();
let shells = self.shells.lock().unwrap();
if let Some(session) = shells.get(&header.name) {
session.lifecycle_timestamps.lock().unwrap().last_disconnected_at =
Some(time::SystemTime::now());
}
}
if let Err(err) = self.hooks.on_client_disconnect(&header.name) {
warn!("client_disconnect hook: {:?}", err);
}
}

info!("finished attach streaming section");
Expand Down Expand Up @@ -366,6 +377,8 @@ impl Server {
// the channel is still open so the subshell is still running
info!("taking over existing session inner");
inner.client_stream = Some(stream.try_clone()?);
session.lifecycle_timestamps.lock().unwrap().last_connected_at =
Some(time::SystemTime::now());

if inner
.shell_to_client_join_h
Expand Down Expand Up @@ -432,6 +445,8 @@ impl Server {
matches!(motd, MotdDisplayMode::Dump),
)?;

session.lifecycle_timestamps.lock().unwrap().last_connected_at =
Some(time::SystemTime::now());
shells.insert(header.name.clone(), Box::new(session));
// fallthrough to bidi streaming
} else if let Err(err) = self.hooks.on_reattach(&header.name) {
Expand Down Expand Up @@ -526,6 +541,9 @@ impl Server {
info!("detached session({}), status = {:?}", session, status);
if let shell::ClientConnectionStatus::DetachNone = status {
not_attached_sessions.push(session);
} else {
s.lifecycle_timestamps.lock().unwrap().last_disconnected_at =
Some(time::SystemTime::now());
}
} else {
not_found_sessions.push(session);
Expand Down Expand Up @@ -607,10 +625,23 @@ impl Server {
Err(_) => SessionStatus::Attached,
};

let timestamps = v.lifecycle_timestamps.lock().unwrap();
let last_connected_at_unix_ms = timestamps
.last_connected_at
.map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
.transpose()?;

let last_disconnected_at_unix_ms = timestamps
.last_disconnected_at
.map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
.transpose()?;

Ok(Session {
name: k.to_string(),
started_at_unix_ms: v.started_at.duration_since(time::UNIX_EPOCH)?.as_millis()
as i64,
last_connected_at_unix_ms,
last_disconnected_at_unix_ms,
status,
})
})
Expand Down Expand Up @@ -957,6 +988,7 @@ impl Server {
child_pid,
child_exit_notifier,
started_at: time::SystemTime::now(),
lifecycle_timestamps: Mutex::new(shell::SessionLifecycleTimestamps::default()),
inner: Arc::new(Mutex::new(session_inner)),
})
}
Expand Down
9 changes: 9 additions & 0 deletions libshpool/src/daemon/shell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,19 @@ const SHELL_TO_CLIENT_POLL_MS: u16 = 100;
// shell->client thread.
const SHELL_TO_CLIENT_CTL_TIMEOUT: time::Duration = time::Duration::from_millis(300);

/// Timestamps tracking when sessions were last connected/disconnected.
/// Combined behind a single lock to avoid taking multiple locks.
#[derive(Debug, Default)]
pub struct SessionLifecycleTimestamps {
pub last_connected_at: Option<time::SystemTime>,
pub last_disconnected_at: Option<time::SystemTime>,
}

/// Session represent a shell session
#[derive(Debug)]
pub struct Session {
pub started_at: time::SystemTime,
pub lifecycle_timestamps: Mutex<SessionLifecycleTimestamps>,
pub child_pid: libc::pid_t,
pub child_exit_notifier: Arc<ExitNotifier>,
pub shell_to_client_ctl: Arc<Mutex<ReaderCtl>>,
Expand Down
7 changes: 5 additions & 2 deletions libshpool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,10 @@ will be used if it is present in the environment.")]

#[clap(about = "lists all the running shell sessions")]
#[non_exhaustive]
List,
List {
#[clap(short, long, help = "Output as JSON, includes extra fields")]
json: bool,
},

#[clap(about = "Dynamically change daemon log level

Expand Down Expand Up @@ -370,7 +373,7 @@ pub fn run(args: Args, hooks: Option<Box<dyn hooks::Hooks + Send + Sync>>) -> an
}
Commands::Detach { sessions } => detach::run(sessions, socket),
Commands::Kill { sessions } => kill::run(sessions, socket),
Commands::List => list::run(socket),
Commands::List { json } => list::run(socket, json),
Commands::SetLogLevel { level } => set_log_level::run(level, socket),
};

Expand Down
19 changes: 12 additions & 7 deletions libshpool/src/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@
use std::{io, path::PathBuf, time};

use anyhow::Context;
use chrono::{DateTime, Utc};
use shpool_protocol::{ConnectHeader, ListReply};

use crate::{protocol, protocol::ClientResult};

pub fn run(socket: PathBuf) -> anyhow::Result<()> {
pub fn run(socket: PathBuf, json_output: bool) -> anyhow::Result<()> {
let mut client = match protocol::Client::new(socket) {
Ok(ClientResult::JustClient(c)) => c,
Ok(ClientResult::VersionMismatch { warning, client }) => {
Expand All @@ -38,12 +39,16 @@ pub fn run(socket: PathBuf) -> anyhow::Result<()> {
client.write_connect_header(ConnectHeader::List).context("sending list connect header")?;
let reply: ListReply = client.read_reply().context("reading reply")?;

println!("NAME\tSTARTED_AT\tSTATUS");
for session in reply.sessions.iter() {
let started_at =
time::UNIX_EPOCH + time::Duration::from_millis(session.started_at_unix_ms as u64);
let started_at = chrono::DateTime::<chrono::Utc>::from(started_at);
println!("{}\t{}\t{}", session.name, started_at.to_rfc3339(), session.status);
if json_output {
println!("{}", serde_json::to_string_pretty(&reply)?);
} else {
println!("NAME\tSTARTED_AT\tSTATUS");
for session in reply.sessions.iter() {
let started_at =
time::UNIX_EPOCH + time::Duration::from_millis(session.started_at_unix_ms as u64);
let started_at = DateTime::<Utc>::from(started_at);
println!("{}\t{}\t{}", session.name, started_at.to_rfc3339(), session.status);
}
}

Ok(())
Expand Down
4 changes: 4 additions & 0 deletions shpool-protocol/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,10 @@ pub struct Session {
#[serde(default)]
pub started_at_unix_ms: i64,
#[serde(default)]
pub last_connected_at_unix_ms: Option<i64>,
#[serde(default)]
pub last_disconnected_at_unix_ms: Option<i64>,
#[serde(default)]
pub status: SessionStatus,
}

Expand Down
Loading