# SpamdForkScaling.pm [plain text]
package Mail::SpamAssassin::SpamdForkScaling;
use strict;
use warnings;
use bytes;
use Errno qw();
use Mail::SpamAssassin::Util;
use Mail::SpamAssassin::Logger;
use Mail::SpamAssassin::Timeout;
# Package globals: the list of exportable constant names plus the Exporter
# tables built from it.
use vars qw {
@PFSTATE_VARS %EXPORT_TAGS @EXPORT_OK
};
use base qw( Exporter );
# Constants offered for import, either individually or via the ':pfstates' tag.
@PFSTATE_VARS = qw(
PFSTATE_ERROR PFSTATE_STARTING PFSTATE_IDLE PFSTATE_BUSY PFSTATE_KILLED
PFORDER_ACCEPT PFSTATE_GOT_SIGCHLD
);
%EXPORT_TAGS = (
'pfstates' => [ @PFSTATE_VARS ]
);
@EXPORT_OK = ( @PFSTATE_VARS );
# Child states, stored per pid in $self->{kids}.
# PFSTATE_ERROR is returned by read_one_message_from_child_socket() when the
# read from a child's socket yields nothing.
use constant PFSTATE_ERROR => -1;
# State recorded by add_child() for a freshly registered child.
use constant PFSTATE_STARTING => 0;
# States recorded when a child sends an "I" (idle) or "B" (busy) message.
use constant PFSTATE_IDLE => 1;
use constant PFSTATE_BUSY => 2;
# State recorded just before the parent sends SIGINT to a child.
use constant PFSTATE_KILLED => 3;
# State recorded by child_exited(); such entries are removed by
# post_sigchld_cleanup().
use constant PFSTATE_GOT_SIGCHLD => 4;
# Value returned by wait_for_orders() when the parent's accept order arrives.
use constant PFORDER_ACCEPT => 10;
# Compile-time switch for the fault-injection code paths below; 0 disables them.
use constant SUPPORT_TEST_INSTRUMENTATION => 0;
# Fault-injection knobs: when instrumentation is enabled and a knob holds a
# true value N, the corresponding failure is simulated whenever rand(N) < 1.
our $TEST_MODE_CAUSE_RANDOM_KID_FAILURES = 0;
our $TEST_MODE_CAUSE_RANDOM_WRITE_RETRIES = 0;
our $TEST_MODE_CAUSE_RANDOM_PING_FAILURES = 0;
# Fixed-size (6 byte) orders written by the parent to a child's socket; the
# readers in this file always request 6 bytes per message.
use constant PF_ACCEPT_ORDER => "A....\n";
use constant PF_PING_ORDER => "P....\n";
# Timeout, in seconds, passed to sysread_with_timeout() for backchannel reads.
use constant TOUT_READ_MAX => 300;
# main_server_poll() pings the children when it wakes with nothing to do and
# more than this many seconds have elapsed since the last ping.
use constant TOUT_PING_INTERVAL => 150;
# Constructor.  May be invoked on a class name or on an existing instance.
# The optional second argument is a hash reference of settings; it is blessed
# in place and becomes the object itself (an empty hash is used when omitted).
sub new {
  my ($proto, $self) = @_;
  my $class = ref($proto) || $proto;

  $self = { } unless defined $self;
  bless $self, $class;

  # Fresh bookkeeping for every instance.  A caller-supplied min_children is
  # kept unless it is false, in which case it falls back to 1.
  $self->{kids}             = { };
  $self->{overloaded}       = 0;
  $self->{min_children}   ||= 1;
  $self->{server_last_ping} = time;

  return $self;
}
# Begin tracking a child process: record $pid in the PFSTATE_STARTING state.
sub add_child {
  my $self = shift;
  my $pid  = shift;
  return $self->set_child_state($pid, PFSTATE_STARTING);
}
# Note that child $pid has exited: mark it PFSTATE_GOT_SIGCHLD and raise the
# child_just_exited flag, which main_server_poll() consults when select()
# fails.
sub child_exited {
  my $self  = shift;
  my ($pid) = @_;

  dbg("prefork: child $pid: just exited");
  $self->set_child_state($pid, PFSTATE_GOT_SIGCHLD);

  $self->{child_just_exited} = 1;
}
# Forget every child currently marked PFSTATE_GOT_SIGCHLD: drop it from the
# kids table, ask the backchannel to delete its socket, and then recompute the
# lowest idle pid.  Does nothing when no child is in that state.
sub post_sigchld_cleanup {
  my $self = shift;

  # Collect first, delete afterwards, so the table is not altered mid-scan.
  my @exited;
  foreach my $pid (keys %{$self->{kids}}) {
    push @exited, $pid if $self->{kids}->{$pid} == PFSTATE_GOT_SIGCHLD;
  }
  return unless @exited;

  for my $pid (@exited) {
    delete $self->{kids}->{$pid};
    $self->{backchannel}->delete_socket_for_child($pid);
  }

  $self->compute_lowest_child_pid();
}
# Mark this object as shutting down; adapt_num_children() becomes a no-op once
# the flag is set.
sub set_exiting_flag {
  my $self = shift;
  $self->{am_exiting} = 1;
}
# Get rid of a misbehaving child.
#
#   $pid   pid of the child to kill
#   $sock  the child's backchannel socket; may be undef or already closed
#
# Marks the child PFSTATE_KILLED, sends it SIGINT (warning if that fails),
# asks the backchannel to delete its socket, removes the socket from the
# selector when it still has a fileno, and closes it.
sub child_error_kill {
  my ($self, $pid, $sock) = @_;

  my $fd_desc = "undefined";
  if (defined $sock && defined $sock->fileno) {
    $fd_desc = $sock->fileno;
  }
  warn "prefork: killing failed child $pid fd=".$fd_desc;

  # Record the state before signalling the child.
  $self->set_child_state ($pid, PFSTATE_KILLED);

  unless (kill('INT', $pid)) {
    warn "prefork: kill of failed child $pid failed: $!\n";
  }

  $self->{backchannel}->delete_socket_for_child($pid);

  # The fileno is looked up again here rather than reused from above, as the
  # socket may no longer be open by this point.
  if (defined $sock && defined $sock->fileno()) {
    $self->{backchannel}->remove_from_selector($sock);
  }
  $sock->close if $sock;

  warn "prefork: killed child $pid\n";
}
# Record a new state for child $pid and refresh the lowest-idle-pid cache.
#
# STARTING, KILLED and GOT_SIGCHLD are always recorded, creating the entry if
# needed.  Any other state is only recorded for a pid that is already being
# tracked; otherwise it is ignored with a debug message.
sub set_child_state {
  my ($self, $pid, $state) = @_;

  my $always_recorded = ($state == PFSTATE_STARTING
                      || $state == PFSTATE_KILLED
                      || $state == PFSTATE_GOT_SIGCHLD);

  if (!$always_recorded && !exists $self->{kids}->{$pid}) {
    return dbg("prefork: child $pid: ignored new state $state, already exited?");
  }

  $self->{kids}->{$pid} = $state;
  dbg("prefork: child $pid: entering state $state");
  return $self->compute_lowest_child_pid();
}
# Recompute $self->{lowest_idle_pid}: the numerically smallest pid whose state
# is PFSTATE_IDLE, or undef when no child is idle.
sub compute_lowest_child_pid {
  my $self = shift;

  my $lowest;
  foreach my $pid (keys %{$self->{kids}}) {
    next unless $self->{kids}->{$pid} == PFSTATE_IDLE;
    $lowest = $pid if !defined $lowest || $pid < $lowest;
  }

  $self->{lowest_idle_pid} = $lowest;
  dbg("prefork: new lowest idle kid: ".($lowest ? $lowest : 'none'));
}
# Remember the server (listening) filehandles.  Undefined entries are skipped;
# for the rest, the handle goes into $self->{server_fh} and its fileno into
# the parallel list $self->{server_fileno}.  Any previous lists are replaced.
sub set_server_fh {
  my ($self, @fhs) = @_;

  my @usable = grep { defined $_ } @fhs;

  $self->{server_fh}     = [ @usable ];
  $self->{server_fileno} = [ map { $_->fileno() } @usable ];
  return;
}
# Run one iteration of the parent's poll loop.
#
#   $tout  timeout, in seconds, handed to select()
#
# Waits in select() on a copy of the backchannel's selector bit vector and
# then reacts to the outcome:
#   - select() failed: log it and return (sleeping 1 second first when the
#     failure is not explained by a signal or an exiting child);
#   - an exception is flagged on a server filehandle: warn and return;
#   - nothing is ready: ping the children if more than TOUT_PING_INTERVAL
#     seconds have passed since the last ping, then return;
#   - a server filehandle is readable: order an idle child to accept, and
#     enter the "overloaded" state if none could be ordered, then return;
#   - otherwise: read one status message from every ready child socket and
#     finish by calling adapt_num_children().
sub main_server_poll {
my ($self, $tout) = @_;
# Work on a copy of the selector so the edits below do not alter the original.
my $rin = ${$self->{backchannel}->{selector}};
# While overloaded, clear the server filenos from the copy so that select()
# is not woken by them until an idle child is available again.
if ($self->{overloaded}) {
$self->vec_all(\$rin, $self->{server_fileno}, 0);
}
# Drop children that were flagged as exited before we block.
$self->post_sigchld_cleanup();
my ($rout, $eout, $nfound, $timeleft, $selerr);
# Safety net around select(): a separate timer with a generous limit
# (presumably alarm-based, going by the debug message below).
my $timer = Mail::SpamAssassin::Timeout->new({ secs => ($tout*2) + 60 });
$timer->run(sub {
# Reset the flag so that, afterwards, it only reflects exits noticed while
# select() was running.
$self->{child_just_exited} = 0;
($nfound, $timeleft) = select($rout=$rin, undef, $eout=$rin, $tout);
# Save errno immediately, before anything else can overwrite it.
$selerr = $!;
});
# Drop children that were flagged as exited while we were blocked.
$self->post_sigchld_cleanup();
# A timer expiry is treated exactly like an ordinary select() timeout.
if ($timer->timed_out) {
dbg("prefork: select timed out (via alarm)");
$nfound = 0;
$timeleft = 0;
}
if (!defined $nfound || $nfound < 0)
{
# Interrupted by a signal: not an error, just try again on the next call.
if (exists &Errno::EINTR && $selerr == &Errno::EINTR)
{
dbg("prefork: select returned err $selerr, probably signalled");
return;
}
# A child exited during the select(); the failure is attributed to that.
if ($self->{child_just_exited} && $nfound == -1) {
dbg("prefork: select returned -1 due to child exiting, ignored ($selerr)");
return;
}
warn "prefork: select returned ".
(defined $nfound ? $nfound : "undef").
"! recovering: $selerr\n";
# Pause briefly before returning to the caller after an unexplained failure.
sleep 1; return;
}
# Exceptional condition reported on one of the server filehandles.
if ($self->vec_all(\$eout, $self->{server_fileno})) {
warn "prefork: select returned error on server filehandle: $selerr $!\n";
return;
}
# Timeout with nothing ready: use the idle moment to ping the children if
# the last ping is old enough.
if (!$nfound) {
my $now = time;
if ($now - $self->{server_last_ping} > TOUT_PING_INTERVAL) {
$self->main_ping_kids($now);
}
return;
}
# A server filehandle is readable: hand the work to an idle child, or
# remember that we are overloaded when none is available.
if ($self->vec_all(\$rout, $self->{server_fileno})) {
if (!$self->order_idle_child_to_accept()) {
$self->{overloaded} = 1;
}
return;
}
# Otherwise the readable descriptors are child sockets: process one status
# message from each.
foreach my $fh ($self->{backchannel}->select_vec_to_fh_list($rout))
{
if ($self->read_one_message_from_child_socket($fh) == PFSTATE_IDLE)
{
dbg("prefork: child reports idle");
# A child just became idle while work was being held back: give it that
# work straight away.
if ($self->{overloaded}) {
dbg("prefork: overloaded, immediately telling kid to accept");
if (!$self->order_idle_child_to_accept()) {
warn "prefork: lost idle kids, so still overloaded";
$self->{overloaded} = 1;
}
else {
dbg("prefork: no longer overloaded");
$self->{overloaded} = 0;
}
}
}
}
# Child states may have changed; grow or shrink the pool accordingly.
$self->adapt_num_children();
}
# Send a ping order to every child known to the backchannel and record $now
# as the time of the last ping.  A child whose socket is missing or closed, or
# to which the ping cannot be written, is killed via child_error_kill().
sub main_ping_kids {
  my ($self, $now) = @_;

  $self->{server_last_ping} = $now;

  # keys() in void context resets the hash's each() iterator before the scan.
  keys %{$self->{backchannel}->{kids}};

  while (my ($kid, $sock) = each %{$self->{backchannel}->{kids}}) {
    if (SUPPORT_TEST_INSTRUMENTATION && $TEST_MODE_CAUSE_RANDOM_PING_FAILURES &&
        rand($TEST_MODE_CAUSE_RANDOM_PING_FAILURES) < 1)
    {
      warn "prefork: TEST_MODE_CAUSE_RANDOM_PING_FAILURES simulating ping failure";
    }
    elsif (!defined $sock || !defined $sock->fileno) {
      warn "prefork: cannot ping $kid, file handle not defined, child likely ".
          "to still be processing SIGCHLD handler after killing itself\n";
    }
    else {
      # Only retry a few times; a healthy child is left alone.
      next if $self->syswrite_with_retry($sock, PF_PING_ORDER, $kid, 3);
      warn "prefork: write of ping failed to $kid fd=".$sock->fileno.": ".$!;
    }

    # Every path that reaches this point is a failure for this child.
    $self->child_error_kill($kid, $sock);
  }
}
# Read one 6-byte status message from a child's backchannel socket and record
# the state it reports.
#
#   $sock  the child's socket
#
# A message is a one-letter tag ("I" for idle, "B" for busy) followed by the
# child's pid packed as a 32-bit big-endian integer and a newline.
#
# Returns PFSTATE_IDLE or PFSTATE_BUSY for a valid message.  Returns
# PFSTATE_ERROR when nothing could be read (the socket is then removed from
# the selector and closed) or when the message is too short to contain a pid.
# Dies on a message with an unknown tag.
sub read_one_message_from_child_socket {
  my ($self, $sock) = @_;

  my $line;
  my $nbytes = $self->sysread_with_timeout($sock, \$line, 6, TOUT_READ_MAX);

  if (!defined $nbytes || $nbytes == 0) {
    dbg("prefork: child closed connection");

    # Only tear the socket down if it is still open.
    my $fno = $sock->fileno;
    if (defined $fno) {
      $self->{backchannel}->remove_from_selector($sock);
      $sock->close();
    }
    return PFSTATE_ERROR;
  }

  if ($nbytes < 6) {
    warn("prefork: child gave short message: len=$nbytes bytes=".
        join(" ", unpack "C*", $line));
  }

  chomp $line;

  my %state_for = (
    'I' => PFSTATE_IDLE,
    'B' => PFSTATE_BUSY,
  );
  my $tag   = substr($line, 0, 1);
  my $state = $state_for{$tag};

  if (!defined $state) {
    die "prefork: unknown message from child: '$line'";
  }

  # unpack() yields nothing when fewer than 4 bytes follow the tag; do not
  # record or report a state for a pid we could not decode.
  my $pid = unpack("N1", substr($line, 1));
  if (!defined $pid) {
    warn "prefork: child message '$tag' is truncated, cannot decode pid\n";
    return PFSTATE_ERROR;
  }

  $self->set_child_state ($pid, $state);
  return $state;
}
# Tell an idle child to accept the next connection.
#
# Repeatedly picks the current lowest idle pid, writes the accept order to its
# socket and waits for the child to confirm.  A child with no socket, or one
# that cannot be written to, is killed and the next candidate is tried.
#
# Returns the true value from wait_for_child_to_accept() on success, or undef
# when no idle child is left.
sub order_idle_child_to_accept {
  my ($self) = @_;

  while (defined(my $kid = $self->{lowest_idle_pid})) {
    my $sock = $self->{backchannel}->get_socket_for_child($kid);

    if (SUPPORT_TEST_INSTRUMENTATION && $TEST_MODE_CAUSE_RANDOM_KID_FAILURES
        && rand($TEST_MODE_CAUSE_RANDOM_KID_FAILURES) < 1)
    {
      $sock = undef;
      warn "prefork: TEST_MODE_CAUSE_RANDOM_KID_FAILURES simulating no socket for kid $kid";
    }

    if (!$sock) {
      warn "prefork: oops! no socket for child $kid, killing";
      $self->child_error_kill($kid, $sock);
      next;     # killing the child changes the lowest idle pid
    }

    if (!$self->syswrite_with_retry($sock, PF_ACCEPT_ORDER, $kid)) {
      warn "prefork: killing rogue child $kid, failed to write on fd ".$sock->fileno.": $!\n";
      $self->child_error_kill($kid, $sock);
      next;
    }

    dbg("prefork: ordered $kid to accept");

    # A false result means this child did not take the order; try again with
    # whichever child is now the lowest idle one.
    my $ret = $self->wait_for_child_to_accept($kid, $sock);
    return $ret if $ret;
  }

  dbg("prefork: no spare children to accept, waiting for one to complete");
  return undef;
}
# After an accept order was sent to child $kid, read its reply from $sock.
#
# Returns 1 when the child reports busy (it took the order).  Returns undef
# when the read fails, or when the child reports any other state; in the
# latter case the child is killed, the pool size is re-evaluated and the
# parent sleeps for one second before returning.
sub wait_for_child_to_accept {
  my ($self, $kid, $sock) = @_;

  my $state = $self->read_one_message_from_child_socket($sock);

  return 1     if $state == PFSTATE_BUSY;
  return undef if $state == PFSTATE_ERROR;

  warn "prefork: ordered child $kid to accept, but they reported state '$state', killing rogue";
  $self->child_error_kill($kid, $sock);
  $self->adapt_num_children();
  sleep 1;

  return undef;
}
# If the waiting_for_idle_child flag is set, send the accept order to child
# $kid and clear the flag.  Dies when the order cannot be written.
sub child_now_ready_to_accept {
  my ($self, $kid) = @_;

  return unless $self->{waiting_for_idle_child};

  my $sock = $self->{backchannel}->get_socket_for_child($kid);
  unless ($self->syswrite_with_retry($sock, PF_ACCEPT_ORDER, $kid)) {
    die "prefork: $kid claimed it was ready, but write failed on fd ".
        $sock->fileno.": ".$!;
  }

  $self->{waiting_for_idle_child} = 0;
}
# Store the pid that the update_child_status_*() methods pack into the status
# messages they send.
sub set_my_pid {
  my $self = shift;
  $self->{pid} = shift;
}
# Report "idle" to the parent: the letter "I", this process's pid packed as a
# 32-bit big-endian integer, and a newline.
sub update_child_status_idle {
  my $self = shift;
  my $message = join('', "I", pack("N", $self->{pid}), "\n");
  $self->report_backchannel_socket($message);
}
# Report "busy" to the parent: the letter "B", this process's pid packed as a
# 32-bit big-endian integer, and a newline.
sub update_child_status_busy {
  my $self = shift;
  my $message = join('', "B", pack("N", $self->{pid}), "\n");
  $self->report_backchannel_socket($message);
}
# Write $str to the parent's end of the backchannel, retrying as needed.
# Dies if the write ultimately fails; otherwise returns the result of
# syswrite_with_retry().
sub report_backchannel_socket {
  my ($self, $str) = @_;

  my $parent_sock = $self->{backchannel}->get_parent_socket();
  my $written = $self->syswrite_with_retry($parent_sock, $str, 'parent');

  die "syswrite() to parent failed: $!" unless $written;
  return $written;
}
# Child side: block until the parent sends an accept order.
#
# Reads 6-byte orders from the parent socket.  Ping orders ("P...") are logged
# and skipped; an accept order ("A...") makes the method return
# PFORDER_ACCEPT.  The process exits when the parent has closed the socket,
# and dies on an empty read that is not end-of-file or on an unknown order.
sub wait_for_orders {
  my ($self) = @_;

  my $sock = $self->{backchannel}->get_parent_socket();

  ORDER:
  while (1) {
    my $line;
    my $nbytes = $self->sysread_with_timeout($sock, \$line, 6, TOUT_READ_MAX);

    unless (defined $nbytes && $nbytes != 0) {
      if ($sock->eof()) {
        dbg("prefork: parent closed, exiting");
        exit;
      }
      die "prefork: empty order from parent";
    }

    if ($nbytes < 6) {
      warn("prefork: parent gave short message: len=$nbytes bytes=".
          join(" ", unpack "C*", $line));
    }

    chomp $line;
    my $tag = substr($line, 0, 1);

    if ($tag eq "P") {
      dbg("prefork: periodic ping from spamd parent");
      next ORDER;
    }

    return PFORDER_ACCEPT if $tag eq "A";

    die "prefork: unknown order from parent: '$line'";
  }
}
# Read exactly $toread bytes from $sock into $$lineref, coping with partial
# reads and with "would block" errors on a non-blocking socket.
#
#   $sock     object providing sysread() and fileno()
#   $lineref  reference to the scalar that receives the data (reset to '')
#   $toread   number of bytes wanted
#   $timeout  overall limit, in seconds, on waiting for the socket to become
#             readable
#
# Returns the number of bytes read: $toread on success, or fewer (possibly 0)
# if end-of-file is reached first.  Returns undef on a read error other than
# EAGAIN/EWOULDBLOCK, or when the deadline passes.
sub sysread_with_timeout {
  my ($self, $sock, $lineref, $toread, $timeout) = @_;

  $$lineref = '';
  my $readsofar = 0;
  my $deadline;       # fixed at the first "would block", bounds the total wait
  my $buf;

  while (1) {
    my $nbytes = $sock->sysread($buf, $toread);

    if (!defined $nbytes) {
      # Anything other than "try again later" is a hard failure.
      unless ((exists &Errno::EAGAIN && $! == &Errno::EAGAIN)
          || (exists &Errno::EWOULDBLOCK && $! == &Errno::EWOULDBLOCK))
      {
        return undef;
      }

      my $now = time();
      my $tout = $timeout;
      if (!defined $deadline) {
        $deadline = $now + $timeout;
      }
      elsif ($now > $deadline) {
        warn "prefork: sysread(".$sock->fileno.") failed after $timeout secs";
        return undef;
      }
      else {
        # Wait only for what is left of the budget, but at least one second.
        $tout = $deadline - $now;
        $tout = 1 if ($tout <= 0);
      }

      dbg("prefork: sysread(".$sock->fileno.") not ready, wait max $tout secs");
      my $rin = '';
      vec($rin, $sock->fileno, 1) = 1;
      select($rin, undef, undef, $tout);
      next;
    }

    # End-of-file: hand back whatever has been collected so far.
    return $readsofar if $nbytes == 0;

    if ($nbytes != $toread) {
      warn "prefork: partial read of $nbytes, toread=".$toread.
          " sofar=".$readsofar." fd=".$sock->fileno.", recovering";
    }

    $readsofar += $nbytes;
    $$lineref .= $buf;
    return $readsofar if $nbytes == $toread;

    # Short read: go round again for the remainder.
    $toread -= $nbytes;
  }
}
# Write all of $buf to $sock, retrying after "would block" errors and after
# partial writes.
#
#   $sock        object providing syswrite() and fileno()
#   $buf         data to send
#   $targetname  label for the recipient, used in log messages only
#   $numretries  maximum number of attempts; a false value means 10
#
# Every attempt after the first is preceded by a warning and a select() that
# waits up to one second for the socket to become writable.  Returns the
# total number of bytes written, or undef on a hard write error or once the
# attempts are used up.
sub syswrite_with_retry {
  my ($self, $sock, $buf, $targetname, $numretries) = @_;
  $numretries ||= 10;

  my $written = 0;
  my $try = 0;

  while (1) {
    $try++;
    if ($try > 1) {
      warn "prefork: syswrite(".$sock->fileno.") to $targetname failed on try $try";
      if ($try > $numretries) {
        warn "prefork: giving up";
        return undef;
      }

      # Give the socket up to a second to become writable again.
      my $rout = '';
      vec($rout, $sock->fileno, 1) = 1;
      select(undef, $rout, undef, 1);
    }

    my $nbytes;
    if (SUPPORT_TEST_INSTRUMENTATION && $TEST_MODE_CAUSE_RANDOM_WRITE_RETRIES &&
        rand($TEST_MODE_CAUSE_RANDOM_WRITE_RETRIES) < 1)
    {
      warn "prefork: TEST_MODE_CAUSE_RANDOM_WRITE_RETRIES simulating write failure";
      $nbytes = undef; $! = &Errno::EAGAIN;
    }
    else {
      $nbytes = $sock->syswrite($buf);
    }

    if (!defined $nbytes) {
      # Only "try again later" errors are retried.
      unless ((exists &Errno::EAGAIN && $! == &Errno::EAGAIN)
          || (exists &Errno::EWOULDBLOCK && $! == &Errno::EWOULDBLOCK))
      {
        return undef;
      }
      warn "prefork: retrying syswrite(): $!";
      next;
    }

    # Drop what was sent; finish when nothing is left.
    $written += $nbytes;
    $buf = substr($buf, $nbytes);
    return $written if $buf eq '';

    warn "prefork: partial write of $nbytes to ".
        $targetname.", towrite=".length($buf).
        " sofar=".$written." fd=".$sock->fileno.", recovering";
  }
}
# Log a one-letter-per-child summary of the pool (in ascending pid order) and
# grow or shrink the pool to keep the number of idle children between
# min_idle and max_idle, within the min_children/max_children limits.
# Does nothing once the exiting flag has been set.
sub adapt_num_children {
  my ($self) = @_;

  return if $self->{am_exiting};

  my $kids = $self->{kids};

  # Letter shown in the summary for each known state; anything else is "?".
  my %letter_for = (
    PFSTATE_IDLE()        => 'I',
    PFSTATE_BUSY()        => 'B',
    PFSTATE_KILLED()      => 'K',
    PFSTATE_GOT_SIGCHLD() => 'Z',
    PFSTATE_ERROR()       => 'E',
    PFSTATE_STARTING()    => 'S',
  );

  my @pids = sort { $a <=> $b } keys %{$kids};
  my $num_servers = scalar @pids;
  my $num_idle = 0;
  my $statestr = '';

  foreach my $pid (@pids) {
    my $state = $kids->{$pid};
    next unless defined $state;

    $num_idle++ if $state == PFSTATE_IDLE;

    my $letter = $letter_for{$state + 0};
    $statestr .= defined $letter ? $letter : '?';
  }

  info("prefork: child states: ".$statestr."\n");

  if ($num_idle < $self->{min_idle}) {
    if ($num_servers >= $self->{max_children}) {
      info("prefork: server reached --max-children setting, consider raising it\n");
    }
    else {
      $self->need_to_add_server($num_idle);
    }
  }
  elsif ($num_idle > $self->{max_idle} && $num_servers > $self->{min_children}) {
    $self->need_to_del_server($num_idle);
  }
}
# Grow the pool by one child because too few are idle.
#
#   $num_idle  current number of idle children (used for logging only)
#
# The actual work is delegated to main::spawn().  The counter behind
# cur_children_ref is deliberately not modified here; presumably
# main::spawn() does its own accounting -- TODO confirm against the caller.
sub need_to_add_server {
  my ($self, $num_idle) = @_;

  dbg("prefork: adjust: increasing, not enough idle children ($num_idle < $self->{min_idle})");
  main::spawn();
}
# Shrink the pool by one child because too many are idle.
#
#   $num_idle  current number of idle children (used for logging only)
#
# Picks the idle child with the highest pid, marks it PFSTATE_KILLED and sends
# it SIGINT, warning if the signal cannot be delivered.  Dies if no child is
# idle.  The counter behind cur_children_ref is not modified here.
sub need_to_del_server {
  my ($self, $num_idle) = @_;

  # Find the highest-numbered idle pid.
  my $pid;
  foreach my $k (keys %{$self->{kids}}) {
    next unless $self->{kids}->{$k} == PFSTATE_IDLE;
    $pid = $k if !defined $pid || $k > $pid;
  }

  if (!defined $pid) {
    die "prefork: oops! no idle kids in need_to_del_server?";
  }

  # Record the state before signalling the child.
  $self->set_child_state ($pid, PFSTATE_KILLED);
  kill('INT', $pid)
    or warn "prefork: kill of idle child $pid failed: $!\n";

  dbg("prefork: adjust: decreasing, too many idle children ($num_idle > $self->{max_idle}), killed $pid");
}
# Set or test a group of bits in a select()-style bit vector.
#
#   $bitsref  reference to the bit vector
#   $fhs      array reference of bit positions (filenos); undefined entries
#             are skipped
#   $value    optional; when defined, every listed bit is set to this value
#
# With $value, the bits are written and 0 is returned.  Without it, nothing is
# modified and the return value is the bitwise OR of the listed bits, i.e. 1
# if any of them is set and 0 otherwise.
sub vec_all {
  my ($self, $bitsref, $fhs, $value) = @_;

  my @positions = grep { defined $_ } @{$fhs};

  if (defined $value) {
    vec($$bitsref, $_, 1) = $value for @positions;
    return 0;
  }

  my $any_set = 0;
  $any_set |= vec($$bitsref, $_, 1) for @positions;
  return $any_set;
}
1;
__END__