Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,8 @@ byteorder = "1.5.0"
windows-service = "0.8.0"

[target.'cfg(unix)'.dependencies]
nix = { version = "0.31", default-features = false, features = ["socket", "signal", "fs"] }
libc.workspace = true
nix = { version = "0.31", default-features = false, features = ["socket", "signal", "fs", "resource"] }

[target.'cfg(target_os = "linux")'.dependencies]
netlink-packet-utils = "0.5.2"
Expand Down
40 changes: 40 additions & 0 deletions lib/file-source/src/file_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ where
pub remove_after: Option<Duration>,
pub emitter: E,
pub rotate_wait: Duration,
pub max_open_files: Option<usize>,
}

/// `FileServer` as Source
Expand Down Expand Up @@ -185,6 +186,21 @@ where
for (_file_id, watcher) in &mut fp_map {
watcher.set_file_findable(false); // assume not findable until found
}

// Pre-build eviction candidates sorted by last_read_success (oldest first).
// This avoids O(n) scans per eviction when many new files appear at once.
let eviction_candidates: Vec<FileFingerprint> = if self.max_open_files.is_some() {
let mut candidates: Vec<_> = fp_map
.iter()
.map(|(&fid, w)| (w.last_read_success(), fid))
.collect();
candidates.sort_unstable();
candidates.into_iter().map(|(_, fid)| fid).collect()
} else {
Vec::new()
};
let mut eviction_idx = 0;

for path in self.paths_provider.paths().into_iter() {
if let Some(file_id) = self
.fingerprinter
Expand Down Expand Up @@ -230,6 +246,30 @@ where
}
} else {
// untracked file fingerprint
// Check max_open_files limit before adding new file
if let Some(max) = self.max_open_files
&& fp_map.len() >= max
{
while eviction_idx < eviction_candidates.len() {
let evict_id = eviction_candidates[eviction_idx];
eviction_idx += 1;
// Skip candidates already removed from fp_map
if let Some(watcher) = fp_map.swap_remove(&evict_id) {
info!(
message = "Evicting least recently read file due to max_open_files limit.",
evicted_path = ?watcher.path,
new_path = ?path,
max_open_files = max,
);
self.emitter.emit_file_unwatched(
&watcher.path,
watcher.reached_eof(),
);
checkpoints.set_dead(evict_id);
break;
}
}
}
self.watch_new_file(path, file_id, &mut fp_map, &checkpoints, false)
.await;
self.emitter.emit_files_open(fp_map.len());
Expand Down
163 changes: 163 additions & 0 deletions src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,173 @@ impl RootOpts {
}
}

#[cfg(unix)]
raise_file_descriptor_limit();

crate::metrics::init_global().expect("metrics initialization failed");
}
}

/// Raise the soft file descriptor limit (RLIMIT_NOFILE) as high as the OS allows.
///
/// Many systems default the soft limit to 1024 (Linux) or 256 (macOS), which is too low
/// for Vector when it monitors large numbers of log files. Raising it prevents
/// "Too many open files (os error 24)" errors without requiring manual sysadmin intervention.
///
/// On Linux, the soft limit is raised to the hard limit (typically 65536+).
/// On macOS, the hard limit can be RLIM_INFINITY, so we first try the hard limit,
/// then fall back to the kernel-enforced `kern.maxfilesperproc` (typically 10240).
#[cfg(unix)]
fn raise_file_descriptor_limit() {
use nix::sys::resource::{Resource, getrlimit, setrlimit};
use tracing::{info, warn};

let (soft, hard) = match getrlimit(Resource::RLIMIT_NOFILE) {
Ok(limits) => limits,
Err(err) => {
warn!(message = "Failed to get file descriptor limit.", %err);
return;
}
};

if soft >= hard {
return; // Already at maximum
}

// Try setting soft limit to hard limit (works on Linux, may fail on macOS)
if setrlimit(Resource::RLIMIT_NOFILE, hard, hard).is_ok() {
info!(
message = "Raised file descriptor limit.",
from = soft,
to = hard,
);
return;
}

// On macOS, the hard limit can be RLIM_INFINITY which setrlimit rejects.
// Fall back to the kernel-enforced kern.maxfilesperproc.
#[cfg(target_os = "macos")]
{
if let Some(maxfiles) = macos_maxfilesperproc()
&& maxfiles > soft
&& setrlimit(Resource::RLIMIT_NOFILE, maxfiles, hard).is_ok()
{
info!(
message = "Raised file descriptor limit.",
from = soft,
to = maxfiles,
);
return;
}
}

warn!(
message = "Failed to raise file descriptor limit.",
current = soft,
attempted = hard,
);
}

/// Query the macOS kernel limit on per-process open files.
#[cfg(target_os = "macos")]
fn macos_maxfilesperproc() -> Option<libc::rlim_t> {
let mut maxfiles: libc::c_int = 0;
let mut len = std::mem::size_of::<libc::c_int>() as libc::size_t;
// Safety: sysctlbyname with a valid null-terminated name and correctly sized output buffer.
// No safe wrapper exists for this macOS-specific call.
let ret = unsafe {
libc::sysctlbyname(
c"kern.maxfilesperproc".as_ptr(),
&mut maxfiles as *mut libc::c_int as *mut libc::c_void,
&mut len,
std::ptr::null_mut(),
0,
)
};
if ret == 0 && maxfiles > 0 {
Some(maxfiles as libc::rlim_t)
} else {
None
}
}

#[cfg(test)]
mod tests {
#[test]
#[cfg(unix)]
fn test_raise_file_descriptor_limit() {
use nix::sys::resource::{Resource, getrlimit, setrlimit};

// Save original limits
let (original_soft, hard) = getrlimit(Resource::RLIMIT_NOFILE).unwrap();

// Lower the soft limit to simulate a constrained environment
let lowered = std::cmp::min(original_soft, 256);
if lowered < hard {
setrlimit(Resource::RLIMIT_NOFILE, lowered, hard).unwrap();

// Verify it was lowered
let (soft_before, _) = getrlimit(Resource::RLIMIT_NOFILE).unwrap();
assert_eq!(soft_before, lowered);

// Call the function under test
super::raise_file_descriptor_limit();

// Verify the soft limit was raised above the lowered value
let (soft_after, _) = getrlimit(Resource::RLIMIT_NOFILE).unwrap();
assert!(
soft_after > lowered,
"Expected soft limit to be raised above {lowered}, got {soft_after}"
);

// Restore original limits
setrlimit(Resource::RLIMIT_NOFILE, original_soft, hard).unwrap();
}
}

#[test]
#[cfg(unix)]
fn test_raise_file_descriptor_limit_already_at_max() {
use nix::sys::resource::{Resource, getrlimit, setrlimit};

// Save original limits
let (original_soft, hard) = getrlimit(Resource::RLIMIT_NOFILE).unwrap();

// Set soft = hard so there's nothing to raise
if setrlimit(Resource::RLIMIT_NOFILE, hard, hard).is_err() {
#[cfg(target_os = "macos")]
if let Some(maxfiles) = super::macos_maxfilesperproc() {
let _ = setrlimit(Resource::RLIMIT_NOFILE, maxfiles, hard);
}
}

let (soft_before, _) = getrlimit(Resource::RLIMIT_NOFILE).unwrap();

// Call the function — should be a no-op
super::raise_file_descriptor_limit();

let (soft_after, _) = getrlimit(Resource::RLIMIT_NOFILE).unwrap();
assert_eq!(soft_before, soft_after);

// Restore original limits
setrlimit(Resource::RLIMIT_NOFILE, original_soft, hard).unwrap();
}

#[test]
#[cfg(target_os = "macos")]
fn test_macos_maxfilesperproc_returns_positive() {
let result = super::macos_maxfilesperproc();
assert!(
result.is_some(),
"macos_maxfilesperproc() should return Some on macOS"
);
assert!(
result.unwrap() > 0,
"kern.maxfilesperproc should be positive"
);
}
}

#[derive(Parser, Debug)]
#[command(rename_all = "kebab-case")]
pub enum SubCommand {
Expand Down
Loading