# spamd prefork scaling, using an Apache-based algorithm
#
# <@LICENSE>
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to you under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

package Mail::SpamAssassin::SpamdForkScaling;

use strict;
use warnings;
use bytes;
use Errno qw();

use Mail::SpamAssassin::Util;
use Mail::SpamAssassin::Logger;
use Mail::SpamAssassin::Timeout;

use vars qw {
  @PFSTATE_VARS %EXPORT_TAGS @EXPORT_OK
};

use base qw( Exporter );

@PFSTATE_VARS = qw(
  PFSTATE_ERROR PFSTATE_STARTING PFSTATE_IDLE PFSTATE_BUSY PFSTATE_KILLED
  PFORDER_ACCEPT PFSTATE_GOT_SIGCHLD
);

%EXPORT_TAGS = (
  'pfstates' => [ @PFSTATE_VARS ]
);
@EXPORT_OK = ( @PFSTATE_VARS );

# per-child states, as stored in the $self->{kids} hash (pid => state)
use constant PFSTATE_ERROR       => -1;
use constant PFSTATE_STARTING    => 0;
use constant PFSTATE_IDLE        => 1;
use constant PFSTATE_BUSY        => 2;
use constant PFSTATE_KILLED      => 3;
use constant PFSTATE_GOT_SIGCHLD => 4;

# value returned by wait_for_orders() when the parent orders an accept
use constant PFORDER_ACCEPT      => 10;

###########################################################################

# change to 1 to enable the below test instrumentation points
use constant SUPPORT_TEST_INSTRUMENTATION => 0;

# test instrumentation point: simulate random child failures in 1 in
# every N lookups
our $TEST_MODE_CAUSE_RANDOM_KID_FAILURES = 0;

# test instrumentation point: simulate child->parent and parent->child
# write failures (needing retries) once in every N syswrite()s
our $TEST_MODE_CAUSE_RANDOM_WRITE_RETRIES = 0;

# test instrumentation point: simulate ping failures (for unspecified
# reasons) once in every N pings
our $TEST_MODE_CAUSE_RANDOM_PING_FAILURES = 0;

###########################################################################

# we use the following protocol between the master and child processes to
# control when they accept/who accepts: server tells a child to accept with a
# PF_ACCEPT_ORDER, child responds with "B$pid\n" when it's busy, and "I$pid\n"
# once it's idle again.  In addition, the parent sends PF_PING_ORDER
# periodically to ping the child processes.  Very simple protocol.  Note that
# the $pid values are packed into 4 bytes so that the buffers are always of a
# known length; if you need to transfer longer data, assign a new protocol verb
# (the first char) and use the length of the following data buffer as the
# packed value.
use constant PF_ACCEPT_ORDER     => "A....\n";
use constant PF_PING_ORDER       => "P....\n";

# timeout for a sysread() on the command channel.  if we go this long
# without a message from the spamd parent or child, it's an error.
use constant TOUT_READ_MAX       => 300;

# interval between "ping" messages from the spamd parent to all children,
# used as a sanity check to ensure TOUT_READ_MAX isn't hit when things
# are functional.
use constant TOUT_PING_INTERVAL  => 150;

###########################################################################

# Constructor.  Takes an optional hash ref of settings, which is blessed
# in place and becomes the object; the child table, the overload flag and
# the last-ping timestamp are (re)initialised, and min_children defaults
# to 1 if unset or false.
sub new {
  my $class = shift;
  $class = ref($class) || $class;

  my $self = shift;
  if (!defined $self) { $self = { }; }
  bless ($self, $class);

  $self->{kids} = { };
  $self->{overloaded} = 0;
  $self->{min_children} ||= 1;
  $self->{server_last_ping} = time;

  $self;
}

###########################################################################
# Parent methods

# Register a newly-forked child; it starts out in the "starting" state.
sub add_child {
  my ($self, $pid) = @_;
  $self->set_child_state ($pid, PFSTATE_STARTING);
}

# this is called by the SIGCHLD handler in spamd.  The idea is that
# main_ping_kids etc. can mark a child as probably dead ("K" state), but until
# SIGCHLD is received, the process is still around (in some form), so it
# shouldn't be removed from the list until it's confirmed dead.
#
sub child_exited {
  my ($self, $pid) = @_;

  dbg("prefork: child $pid: just exited");

  # defer removal from the list until after return from the signal
  # handler; it seems that we may be corrupting the list structure
  # by deleting the {kids} hash entry from a sig handler. (bug 5422)
  $self->set_child_state ($pid, PFSTATE_GOT_SIGCHLD);

  # note this for the select()-caller's benefit
  $self->{child_just_exited} = 1;
}

# Remove every child that child_exited() has flagged as PFSTATE_GOT_SIGCHLD
# from both the {kids} table and the backchannel, then recompute the
# lowest idle pid.  Does nothing if no child is flagged.
sub post_sigchld_cleanup {
  my ($self) = @_;

  my @pids = grep { $self->{kids}->{$_} == PFSTATE_GOT_SIGCHLD }
        keys %{$self->{kids}};
  return unless @pids;

  foreach my $pid (@pids) {
    delete $self->{kids}->{$pid};       # remove from list

    # remove the child from the backchannel list, too
    $self->{backchannel}->delete_socket_for_child($pid);
  }

  # ensure we recompute, so that we don't try to tell that child to
  # accept a request, only to find that it's died in the meantime.
  $self->compute_lowest_child_pid();
}

# this is called by SIGTERM and SIGHUP handlers, to ensure that new
# kids aren't added while the main code is killing the old ones
# and planning to exit.
#
sub set_exiting_flag {
  my ($self) = @_;
  $self->{am_exiting} = 1;
}

# Kill a child that has misbehaved: mark it PFSTATE_KILLED, send it SIGINT,
# and drop/close its backchannel socket.  $sock may be undef.
sub child_error_kill {
  my ($self, $pid, $sock) = @_;

  warn "prefork: killing failed child $pid fd=".
    ((defined $sock && defined $sock->fileno) ? $sock->fileno : "undefined");

  # close the socket and remove the child from our list
  $self->set_child_state ($pid, PFSTATE_KILLED);

  kill 'INT' => $pid
    or warn "prefork: kill of failed child $pid failed: $!\n";

  $self->{backchannel}->delete_socket_for_child($pid);
  if (defined $sock && defined $sock->fileno()) {
    $self->{backchannel}->remove_from_selector($sock);
  }

  if ($sock) {
    $sock->close;
  }

  warn "prefork: killed child $pid\n";
}

# Record a new state for child $pid and recompute the lowest idle pid.
sub set_child_state {
  my ($self, $pid, $state) = @_;

  # I keep misreading this -- so: this says, if the child is starting, or is
  # dying, or it has an entry in the {kids} hash, then allow the state to be
  # set.  otherwise the update can be ignored.
  if ($state == PFSTATE_STARTING || $state == PFSTATE_KILLED ||
      $state == PFSTATE_GOT_SIGCHLD || exists $self->{kids}->{$pid})
  {
    $self->{kids}->{$pid} = $state;
    dbg("prefork: child $pid: entering state $state");
    $self->compute_lowest_child_pid();
  }
  else {
    dbg("prefork: child $pid: ignored new state $state, already exited?");
  }
}

# Store the numerically lowest pid among the idle children in
# $self->{lowest_idle_pid}; it is left undef when no child is idle.
sub compute_lowest_child_pid {
  my ($self) = @_;

  my @pids = grep { $self->{kids}->{$_} == PFSTATE_IDLE }
        keys %{$self->{kids}};

  my $l = shift @pids;
  foreach my $p (@pids) {
    if ($l > $p) { $l = $p };
  }
  $self->{lowest_idle_pid} = $l;

  dbg("prefork: new lowest idle kid: ".
    ($self->{lowest_idle_pid} ? $self->{lowest_idle_pid} : 'none'));
}

###########################################################################

# Remember the listening server filehandles, and their file numbers, that
# main_server_poll() watches.  Undefined entries are skipped.
sub set_server_fh {
  my ($self, @fhs) = @_;

  $self->{server_fh} = [];
  $self->{server_fileno} = [];

  foreach my $fh (@fhs) {
    next unless defined $fh;

    push @{$self->{server_fh}}, $fh;
    push @{$self->{server_fileno}}, $fh->fileno();
  }
}

# One iteration of the parent's main loop: select() on the server sockets
# and the child backchannel sockets for up to $tout seconds, then either
# hand a new connection to an idle child, process status reports from
# children, or (when nothing happened) ping the children if a ping is due.
sub main_server_poll {
  my ($self, $tout) = @_;

  my $rin = ${$self->{backchannel}->{selector}};
  if ($self->{overloaded}) {
    # don't select on the server fh -- we already KNOW that's ready,
    # since we're overloaded
    $self->vec_all(\$rin, $self->{server_fileno}, 0);
  }

  # clean up any fresh zombies before we select()
  $self->post_sigchld_cleanup();

  my ($rout, $eout, $nfound, $timeleft, $selerr);

  # use alarm to back up select()'s built-in alarm, to debug Theo's bug.
  # not that I can remember what Theo's bug was, but hey ;)    A good
  # 60 seconds extra on the alarm() should make that quite rare...

  my $timer = Mail::SpamAssassin::Timeout->new({ secs => ($tout*2) + 60 });

  $timer->run(sub {

    # right before select() syscall, but after alarm(), eval scope, etc.
    $self->{child_just_exited} = 0;
    ($nfound, $timeleft) = select($rout=$rin, undef, $eout=$rin, $tout);
    $selerr = $!;

  });

  # in case any kids exited during select()
  $self->post_sigchld_cleanup();

  # bug 4696: under load, the process can go for such a long time without
  # being context-switched in, that when it does return the alarm() fires
  # before the select() timeout does.  Treat this as a select() timeout
  if ($timer->timed_out) {
    dbg("prefork: select timed out (via alarm)");
    $nfound = 0;
    $timeleft = 0;
  }

  # errors; handle undef *or* -1 returned.  do this before "errors on
  # the handle" below, since an error condition is signalled both via
  # a -1 return and a $eout bit.
  if (!defined $nfound || $nfound < 0)
  {
    if (exists &Errno::EINTR && $selerr == &Errno::EINTR)
    {
      # this happens if the process is signalled during the select(),
      # for example if someone sends SIGHUP to reload the configuration.
      # just return immediately
      dbg("prefork: select returned err $selerr, probably signalled");
      return;
    }

    # if a child exits during that select() call, it generates a spurious
    # error, like this:
    #
    # Jan 29 12:53:17 dogma spamd[18518]: prefork: child states: BI
    # Jan 29 12:53:17 dogma spamd[18518]: spamd: handled cleanup of child pid 13101 due to SIGCHLD
    # Jan 29 12:53:17 dogma spamd[18518]: prefork: select returned -1! recovering:
    #
    # avoid by setting a boolean in the child_exited() callback and checking
    # it here.  log $! just in case, though.
    #
    # $nfound may be undef in this branch, so test definedness before the
    # numeric comparison to avoid an "uninitialized value" warning.
    if ($self->{child_just_exited} && defined $nfound && $nfound == -1) {
      dbg("prefork: select returned -1 due to child exiting, ignored ($selerr)");
      return;
    }

    warn "prefork: select returned ".
            (defined $nfound ? $nfound : "undef").
            "! recovering: $selerr\n";

    sleep 1;        # avoid overload
    return;
  }

  # errors on the handle?
  # return them immediately, they may be from a SIGHUP restart signal
  if ($self->vec_all(\$eout, $self->{server_fileno})) {
    warn "prefork: select returned error on server filehandle: $selerr $!\n";
    return;
  }

  # any action?
  if (!$nfound) {
    # none.  periodically ping the children though just to ensure
    # they're still alive and can hear us

    my $now = time;
    if ($now - $self->{server_last_ping} > TOUT_PING_INTERVAL) {
      $self->main_ping_kids($now);
    }
    return;
  }

  # were the kids ready, or did we get signal?
  if ($self->vec_all(\$rout, $self->{server_fileno})) {
    # dbg("prefork: server fh ready");
    # the server socket: new connection from a client
    if (!$self->order_idle_child_to_accept()) {
      # dbg("prefork: no idle kids, noting overloaded");
      # there are no idle kids!  we're overloaded, mark that
      $self->{overloaded} = 1;
    }
    return;
  }

  # otherwise it's a status report from a child.
  foreach my $fh ($self->{backchannel}->select_vec_to_fh_list($rout))
  {
    # just read one line.  if there's more lines, we'll get them
    # when we re-enter the can_read() select call above...
    if ($self->read_one_message_from_child_socket($fh) == PFSTATE_IDLE)
    {
      dbg("prefork: child reports idle");
      if ($self->{overloaded}) {
        # if we were overloaded, then now that this kid is idle,
        # we can use it to handle the waiting connection.  zero
        # the overloaded flag, anyway; if there's >1 waiting
        # conn, they'll show up next time we do the select.

        dbg("prefork: overloaded, immediately telling kid to accept");
        if (!$self->order_idle_child_to_accept()) {
          # this can happen if something is buggy in the child, and
          # it has to be killed, resulting in no idle kids left
          warn "prefork: lost idle kids, so still overloaded";
          $self->{overloaded} = 1;
        }
        else {
          dbg("prefork: no longer overloaded");
          $self->{overloaded} = 0;
        }
      }
    }
  }

  # now that we've ordered some kids to accept any new connections,
  # increase/decrease the pool as necessary
  $self->adapt_num_children();
}

# Send a PF_PING_ORDER to every child on the backchannel, and kill any
# child that cannot be pinged.  $now is recorded as the last ping time.
sub main_ping_kids {
  my ($self, $now) = @_;

  $self->{server_last_ping} = $now;

  keys %{$self->{backchannel}->{kids}};  # reset each() iterator
  my ($sock, $kid);
  while (($kid, $sock) = each %{$self->{backchannel}->{kids}}) {
    # if the file handle is still defined ping the child
    # bug 4852: if not, we've run into a race condition with the child's
    # SIGCHLD handler... try killing again just in case something else happened

    if (SUPPORT_TEST_INSTRUMENTATION && $TEST_MODE_CAUSE_RANDOM_PING_FAILURES &&
            rand $TEST_MODE_CAUSE_RANDOM_PING_FAILURES < 1)
    {
      warn "prefork: TEST_MODE_CAUSE_RANDOM_PING_FAILURES simulating ping failure";
    }
    elsif (defined $sock && defined $sock->fileno) {
      $self->syswrite_with_retry($sock, PF_PING_ORDER, $kid, 3) and next;

      warn "prefork: write of ping failed to $kid fd=".$sock->fileno.": ".$!;
    }
    else {
      warn "prefork: cannot ping $kid, file handle not defined, child likely ".
          "to still be processing SIGCHLD handler after killing itself\n";
    }

    # note: this is safe according to the note in perldoc -f each; 'it is
    # always safe to delete the item most recently returned by each()'
    $self->child_error_kill($kid, $sock);
  }
}

# Read one 6-byte status message from a child's backchannel socket and
# update that child's state accordingly.  Returns PFSTATE_IDLE or
# PFSTATE_BUSY as reported, or PFSTATE_ERROR if the read failed or the
# child closed the connection (in which case the socket is removed from
# the selector and closed).  Dies on an unrecognised message.
sub read_one_message_from_child_socket {
  my ($self, $sock) = @_;

  # "I  b1 b2 b3 b4 \n " or "B  b1 b2 b3 b4 \n "
  my $line;
  my $nbytes = $self->sysread_with_timeout($sock, \$line, 6, TOUT_READ_MAX);

  if (!defined $nbytes || $nbytes == 0) {
    dbg("prefork: child closed connection");

    # stop it being select'd
    my $fno = $sock->fileno;
    if (defined $fno) {
      $self->{backchannel}->remove_from_selector($sock);
      $sock->close();
    }

    return PFSTATE_ERROR;
  }
  if ($nbytes < 6) {
    warn("prefork: child gave short message: len=$nbytes bytes=".
         join(" ", unpack "C*", $line));
  }

  chomp $line;
  if ($line =~ s/^I//) {
    my $pid = unpack("N1", $line);
    $self->set_child_state ($pid, PFSTATE_IDLE);
    return PFSTATE_IDLE;
  }
  elsif ($line =~ s/^B//) {
    my $pid = unpack("N1", $line);
    $self->set_child_state ($pid, PFSTATE_BUSY);
    return PFSTATE_BUSY;
  }
  else {
    die "prefork: unknown message from child: '$line'";
  }
}

###########################################################################

# Tell the idle child with the lowest pid to accept the pending connection
# and wait for it to confirm.  If that child turns out to be unusable it is
# killed and the next idle child is tried.  Returns true on success, or
# undef when there is no idle child left.
sub order_idle_child_to_accept {
  my ($self) = @_;

  my $kid = $self->{lowest_idle_pid};
  if (defined $kid)
  {
    my $sock = $self->{backchannel}->get_socket_for_child($kid);

    if (SUPPORT_TEST_INSTRUMENTATION && $TEST_MODE_CAUSE_RANDOM_KID_FAILURES) {
      if (rand $TEST_MODE_CAUSE_RANDOM_KID_FAILURES < 1) {
        $sock = undef;
        warn "prefork: TEST_MODE_CAUSE_RANDOM_KID_FAILURES simulating no socket for kid $kid";
      }
    }

    if (!$sock) {
      # this should not happen, but if it does, trap it here
      # before we attempt to call a method on an undef object
      warn "prefork: oops! no socket for child $kid, killing";
      $self->child_error_kill($kid, $sock);

      # retry with another child
      return $self->order_idle_child_to_accept();
    }

    if (!$self->syswrite_with_retry($sock, PF_ACCEPT_ORDER, $kid))
    {
      # failure to write to the child; bad news.  call it dead
      warn "prefork: killing rogue child $kid, failed to write on fd ".$sock->fileno.": $!\n";
      $self->child_error_kill($kid, $sock);

      # retry with another child
      return $self->order_idle_child_to_accept();
    }

    dbg("prefork: ordered $kid to accept");

    # now wait for it to say it's done that
    my $ret = $self->wait_for_child_to_accept($kid, $sock);
    if ($ret) {
      return $ret;
    }
    else {
      # retry with another child
      return $self->order_idle_child_to_accept();
    }
  }
  else {
    dbg("prefork: no spare children to accept, waiting for one to complete");
    return undef;
  }
}

# Read the reply of a child that has just been ordered to accept.
# Returns 1 if it reported busy (i.e. it accepted), undef otherwise.  A
# child that answers with anything other than "busy" or a read error is
# killed as a rogue.
sub wait_for_child_to_accept {
  my ($self, $kid, $sock) = @_;

  # a single message is read: every outcome below returns to the caller
  my $state = $self->read_one_message_from_child_socket($sock);

  if ($state == PFSTATE_BUSY) {
    return 1;     # 1 == success
  }
  if ($state == PFSTATE_ERROR) {
    return undef;
  }

  warn "prefork: ordered child $kid to accept, but they reported state '$state', killing rogue";
  $self->child_error_kill($kid, $sock);
  $self->adapt_num_children();
  sleep 1;

  return undef;
}

# If $self->{waiting_for_idle_child} is set, order child $kid to accept
# and clear the flag.  Dies if the order cannot be written.
sub child_now_ready_to_accept {
  my ($self, $kid) = @_;
  if ($self->{waiting_for_idle_child}) {
    my $sock = $self->{backchannel}->get_socket_for_child($kid);
    $self->syswrite_with_retry($sock, PF_ACCEPT_ORDER, $kid)
        or die "prefork: $kid claimed it was ready, but write failed on fd ".
                            $sock->fileno.": ".$!;
    $self->{waiting_for_idle_child} = 0;
  }
}

###########################################################################
# Child methods

sub set_my_pid {
  my ($self, $pid) = @_;
  $self->{pid} = $pid;  # save calling $$ all the time
}

# Report to the parent that this child is idle.
sub update_child_status_idle {
  my ($self) = @_;
  # "I  b1 b2 b3 b4 \n "
  $self->report_backchannel_socket("I".pack("N",$self->{pid})."\n");
}

# Report to the parent that this child is busy.
sub update_child_status_busy {
  my ($self) = @_;
  # "B  b1 b2 b3 b4 \n "
  $self->report_backchannel_socket("B".pack("N",$self->{pid})."\n");
}

# Write a status message to the parent over the backchannel; dies if the
# write fails.
sub report_backchannel_socket {
  my ($self, $str) = @_;
  my $sock = $self->{backchannel}->get_parent_socket();
  $self->syswrite_with_retry($sock, $str, 'parent')
        or die "syswrite() to parent failed: $!";
}

# Block until the parent sends an accept order, silently absorbing pings.
# Returns PFORDER_ACCEPT.  Exits the process if the parent has closed the
# backchannel, and dies on an empty or unrecognised order.
sub wait_for_orders {
  my ($self) = @_;

  my $sock = $self->{backchannel}->get_parent_socket();
  while (1) {
    # "A  .  .  .  .  \n "
    my $line;
    my $nbytes = $self->sysread_with_timeout($sock, \$line, 6, TOUT_READ_MAX);
    if (!defined $nbytes || $nbytes == 0) {
      if ($sock->eof()) {
        dbg("prefork: parent closed, exiting");
        exit;
      }
      die "prefork: empty order from parent";
    }
    if ($nbytes < 6) {
      warn("prefork: parent gave short message: len=$nbytes bytes=".
            join(" ", unpack "C*", $line));
    }

    chomp $line;
    if (index ($line, "P") == 0) {  # string starts with "P" = ping
      dbg("prefork: periodic ping from spamd parent");
      next;
    }
    if (index ($line, "A") == 0) {  # string starts with "A" = accept
      return PFORDER_ACCEPT;
    }
    else {
      die "prefork: unknown order from parent: '$line'";
    }
  }
}

###########################################################################

# Read exactly $toread bytes from $sock into $$lineref, waiting up to
# roughly $timeout seconds in total if the socket would block.  Returns the
# number of bytes read (possibly fewer than requested, or 0, on EOF), or
# undef on a hard error or timeout.
sub sysread_with_timeout {
  my ($self, $sock, $lineref, $toread, $timeout) = @_;

  $$lineref = '';   # clear the output buffer
  my $readsofar = 0;
  my $deadline;     # we only set this if the first read fails
  my $buf;

retry_read:
  my $nbytes = $sock->sysread($buf, $toread);

  if (!defined $nbytes) {
    unless ((exists &Errno::EAGAIN && $! == &Errno::EAGAIN)
        || (exists &Errno::EWOULDBLOCK && $! == &Errno::EWOULDBLOCK))
    {
      # an error that wasn't non-blocking I/O-related.  that's serious
      return undef;
    }

    # ok, we didn't get it first time.  we'll have to start using
    # select() and timeouts (which is slower).  Don't warn just yet,
    # as it's quite acceptable in our design to have to "block" on
    # sysread()s here.

    my $now = time();
    my $tout = $timeout;
    if (!defined $deadline) {
      # set this.  it'll be close enough ;)
      $deadline = $now + $timeout;
    }
    elsif ($now > $deadline) {
      # timed out!  report failure
      warn "prefork: sysread(".$sock->fileno.") failed after $timeout secs";
      return undef;
    }
    else {
      $tout = $deadline - $now;     # the remaining timeout
      $tout = 1 if ($tout <= 0);    # ensure it's > 0
    }

    dbg("prefork: sysread(".$sock->fileno.") not ready, wait max $tout secs");
    my $rin = '';
    vec($rin, $sock->fileno, 1) = 1;
    select($rin, undef, undef, $tout);
    goto retry_read;

  }
  elsif ($nbytes == 0) {        # EOF
    return $readsofar;          # may be a partial read, or 0 for EOF

  }
  elsif ($nbytes == $toread) {  # a complete read, nice.
    $readsofar += $nbytes;
    $$lineref .= $buf;
    return $readsofar;

  }
  else {
    # we want to know about this.  this is not supposed to happen!
    warn "prefork: partial read of $nbytes, toread=".$toread.
            " sofar=".$readsofar." fd=".$sock->fileno.", recovering";
    $readsofar += $nbytes;
    $$lineref .= $buf;
    $toread -= $nbytes;
    goto retry_read;
  }

  die "assert: should not get here";
}

# Write all of $buf to $sock, retrying on EAGAIN/EWOULDBLOCK and on partial
# writes, for up to $numretries attempts (default 10).  $targetname is used
# only in log messages.  Returns the number of bytes written, or undef on a
# hard error or when the retries are exhausted.
sub syswrite_with_retry {
  my ($self, $sock, $buf, $targetname, $numretries) = @_;
  $numretries ||= 10;       # default 10 retries

  my $written = 0;
  my $try = 0;

retry_write:

  $try++;
  if ($try > 1) {
    warn "prefork: syswrite(".$sock->fileno.") to $targetname failed on try $try";
    if ($try > $numretries) {
      warn "prefork: giving up";
      return undef;
    }
    else {
      # give it 1 second to recover
      my $rout = '';
      vec($rout, $sock->fileno, 1) = 1;
      select(undef, $rout, undef, 1);
    }
  }

  my $nbytes;
  if (SUPPORT_TEST_INSTRUMENTATION && $TEST_MODE_CAUSE_RANDOM_WRITE_RETRIES &&
        rand $TEST_MODE_CAUSE_RANDOM_WRITE_RETRIES < 1)
  {
    warn "prefork: TEST_MODE_CAUSE_RANDOM_WRITE_RETRIES simulating write failure";
    $nbytes = undef;
    $! = &Errno::EAGAIN;
  }
  else {
    $nbytes = $sock->syswrite($buf);
  }

  if (!defined $nbytes) {
    unless ((exists &Errno::EAGAIN && $! == &Errno::EAGAIN)
        || (exists &Errno::EWOULDBLOCK && $! == &Errno::EWOULDBLOCK))
    {
      # an error that wasn't non-blocking I/O-related.  that's serious
      return undef;
    }

    warn "prefork: retrying syswrite(): $!";
    goto retry_write;
  }
  else {
    $written += $nbytes;
    $buf = substr($buf, $nbytes);

    if ($buf eq '') {
      return $written;      # it's complete, we can return
    }
    else {
      warn "prefork: partial write of $nbytes to ".
            $targetname.", towrite=".length($buf).
            " sofar=".$written." fd=".$sock->fileno.", recovering";
      goto retry_write;
    }
  }

  die "assert: should not get here";
}

###########################################################################
# Master server code again

# this is pretty much the algorithm from perform_idle_server_maintenance() in
# Apache's "prefork" MPM.  However: we don't do exponential server spawning,
# since our servers are a lot more heavyweight than theirs is.
# Log the state of every known child and then grow or shrink the pool by
# at most one child, depending on how the number of idle children compares
# with the min_idle / max_idle settings.  Does nothing once the exiting
# flag ({am_exiting}) has been set.
sub adapt_num_children {
  my ($self) = @_;

  # don't start up new kids while main is working at killing the old ones
  return if $self->{am_exiting};

  my $kids = $self->{kids};
  my $statestr = '';
  my $num_idle = 0;
  my @pids = sort { $a <=> $b } keys %{$kids};
  my $num_servers = scalar @pids;

  foreach my $pid (@pids) {
    my $k = $kids->{$pid};

    # note: possible race condition here.  if an entry disappears from the
    # hash between the keys() call above and this point, $k will be undef.
    # This is harmless, but ugly, since it would produce a 'Use of
    # uninitialized value in numeric eq (==)' warning at the
    # "== PFSTATE_IDLE" line below -- so skip such entries.
    next unless defined $k;

    if ($k == PFSTATE_IDLE) {
      $statestr .= 'I';
      $num_idle++;
    }
    elsif ($k == PFSTATE_BUSY) {
      $statestr .= 'B';
    }
    elsif ($k == PFSTATE_KILLED) {
      $statestr .= 'K';
    }
    elsif ($k == PFSTATE_GOT_SIGCHLD) {
      $statestr .= 'Z';
    }
    elsif ($k == PFSTATE_ERROR) {
      $statestr .= 'E';
    }
    elsif ($k == PFSTATE_STARTING) {
      $statestr .= 'S';
    }
    else {
      $statestr .= '?';
    }
  }
  info("prefork: child states: ".$statestr."\n");

  # just kill off/add one at a time, to avoid swamping stuff and
  # reacting too quickly; Apache emulation
  if ($num_idle < $self->{min_idle}) {
    if ($num_servers < $self->{max_children}) {
      $self->need_to_add_server($num_idle);
    }
    else {
      info("prefork: server reached --max-children setting, consider raising it\n");
    }
  }
  elsif ($num_idle > $self->{max_idle} && $num_servers > $self->{min_children}) {
    $self->need_to_del_server($num_idle);
  }
}

# Ask the main program to spawn one more child.  $num_idle is only used
# for the debug message.
#
# NOTE(review): the counter behind $self->{cur_children_ref} is not touched
# here; presumably main::spawn() maintains it -- confirm against spamd.
sub need_to_add_server {
  my ($self, $num_idle) = @_;

  dbg("prefork: adjust: increasing, not enough idle children ($num_idle < $self->{min_idle})");
  main::spawn();
  # servers will be started once main_server_poll() returns
}

# Kill one idle child: the one with the highest pid.  $num_idle is only
# used for the debug message.  Dies if no idle child can be found.
#
# NOTE(review): the counter behind $self->{cur_children_ref} is not touched
# here; presumably the SIGCHLD handling in spamd maintains it -- confirm.
sub need_to_del_server {
  my ($self, $num_idle) = @_;

  my $pid;
  foreach my $k (keys %{$self->{kids}}) {
    my $v = $self->{kids}->{$k};

    # an entry may have no state; skip it rather than compare undef
    # numerically (same guard as in adapt_num_children)
    next unless defined $v;

    if ($v == PFSTATE_IDLE)
    {
      # kill the highest; Apache emulation, exploits linux scheduler
      # behaviour (and is predictable)
      if (!defined $pid || $k > $pid) {
        $pid = $k;
      }
    }
  }

  if (!defined $pid) {
    # this should be impossible.  assert it
    die "prefork: oops! no idle kids in need_to_del_server?";
  }

  # warning: race condition if these two lines are the other way around.
  # see bug 3983, comment 37 for details
  $self->set_child_state ($pid, PFSTATE_KILLED);
  kill 'INT' => $pid;

  dbg("prefork: adjust: decreasing, too many idle children ($num_idle > $self->{max_idle}), killed $pid");
}

# Set or query a group of bits in a select()-style bit vector.
#
#   $bitsref  reference to the bit-vector string
#   $fhs      array ref of bit positions (file numbers); undef entries
#             are skipped
#   $value    if defined, every listed bit is set to this value (0 or 1);
#             if omitted, nothing is modified
#
# Returns 1 if $value was omitted and at least one of the listed bits is
# set; returns 0 otherwise (including whenever $value is given).
sub vec_all {
  my ($self, $bitsref, $fhs, $value) = @_;

  my $ret = 0;
  foreach my $fileno (@{$fhs}) {
    next unless defined $fileno;

    if (defined $value) {
      vec($$bitsref, $fileno, 1) = $value;
    }
    else {
      $ret |= vec($$bitsref, $fileno, 1);
    }
  }
  return $ret;
}

1;

__END__