# <@LICENSE>
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to you under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at:
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# </@LICENSE>

=head1 NAME

Mail::SpamAssassin::AsyncLoop - scanner asynchronous event loop

=head1 DESCRIPTION

An asynchronous event loop used for long-running operations, performed "in
the background" during the Mail::SpamAssassin::check() scan operation, such
as DNS blocklist lookups.

=head1 METHODS

=over 4

=cut

package Mail::SpamAssassin::AsyncLoop;

use strict;
use warnings;
use bytes;

use Mail::SpamAssassin;
use Mail::SpamAssassin::Logger;

our @ISA = qw();

#############################################################################

# Construct an empty event loop.  Works both as a class method and as an
# object method (the class of an existing object is reused).  $main is kept
# as-is under the "main" key; the resolver is reached through
# $main->{resolver} by the polling and abort methods.
sub new {
  my $proto = shift;
  my $class = ref($proto) || $proto;
  my ($main) = @_;

  my %state = (
    main                 => $main,
    last_count           => 0,    # pending-lookup count seen on the last poll
    times_count_was_same => 0,    # consecutive polls with an unchanged count
    queries_started      => 0,
    queries_completed    => 0,
    pending_lookups      => {},   # lookup key => lookup entry hashref
  );

  return bless \%state, $class;
}
# ---------------------------------------------------------------------------

=item $obj = $async->start_lookup($obj)

Register the start of a long-running asynchronous lookup operation.  C<$obj>
is a hash reference containing the following items:

=over 4

=item key (required)

A key string, unique to this lookup.  This is what is reported in debug
messages, used as the key for C<get_lookup()>, etc.

=item id (required)

An ID string, also unique to this lookup.  Typically, this is the DNS packet
ID as returned by DnsResolver's C<bgsend> method.  Sadly, the Net::DNS
architecture forces us to keep a separate ID string for this task instead of
reusing C<key> -- if you are not using DNS lookups through DnsResolver, it
should be OK to just reuse C<key>.

=item type (required)

A string, typically one word, used to describe the type of lookup in log
messages, such as C<DNSBL>, C<MX>, C<TXT>.

=item poll_callback (optional)

A code reference, which will be called periodically during the
background-processing period.  If you will be performing an async lookup on
a non-DNS-based service, you will need to implement this so that it checks
for new responses and calls C<set_response_packet()> or
C<report_id_complete()> as appropriate.  DNS-based lookups can leave it
undefined, since DnsResolver::poll_responses() will be called automatically
anyway.

The code reference will be called with one argument, the C<$ent> object.

=item completed_callback (optional)

A code reference, which will be called when the lookup has been reported as
complete via C<set_response_packet()> or C<report_id_complete()>.

The code reference will be called with one argument, the C<$ent> object.

=back

C<$obj> is returned by this method.  The method dies if C<id>, C<key> or
C<type> is missing or false (checked in that order).

=cut

sub start_lookup {
  my ($self, $ent) = @_;

  # Refuse to track a lookup that is not fully identified.
  foreach my $field (qw(id key type)) {
    die "oops, no $field" unless $ent->{$field};
  }

  $self->{queries_started}++;
  $self->{pending_lookups}->{ $ent->{key} } = $ent;
  $self->{last_start_lookup_time} = time;

  return $ent;
}
# ---------------------------------------------------------------------------

=item $obj = $async->get_lookup($key)

Retrieve the pending-lookup object for the given key C<$key>.

If the lookup is complete, this will return C<undef>.

Note that a lookup is still considered "pending" until C<complete_lookups()>
is called, even if it has been reported as complete via
C<set_response_packet()> or C<report_id_complete()>.

=cut

sub get_lookup {
  my ($self, $key) = @_;
  return $self->{pending_lookups}->{$key};
}

# ---------------------------------------------------------------------------

=item @objs = $async->get_pending_lookups()

Retrieve the lookup objects for all pending lookups.

Note that a lookup is still considered "pending" until C<complete_lookups()>
is called, even if it has been reported as complete via
C<set_response_packet()> or C<report_id_complete()>.

=cut

sub get_pending_lookups {
  my ($self) = @_;
  return values %{$self->{pending_lookups}};
}

# ---------------------------------------------------------------------------

=item $alldone = $async->complete_lookups($timeout)

Perform a poll of the pending lookups, to see if any are completed; if they
are, their C<completed_callback> is called with the entry object for that
lookup, and the lookup is removed from the pending set.  C<$timeout> is
passed through to the resolver's C<poll_responses()> method.

If there are no lookups remaining, C<1> is returned.  C<1> is also returned
when the number of pending lookups has stayed unchanged for more than 20
consecutive polls; in that case the remaining lookups are aborted via
C<abort_remaining_lookups()>.  Otherwise a false value is returned.

If a die() is caught while polling, a true value is returned as well, but
the still-pending lookups are left in place.

=cut

sub complete_lookups {
  my ($self, $timeout) = @_;
  my %typecount = ();
  my $stillwaiting = 0;

  my $pending = $self->{pending_lookups};
  if (scalar keys %{$pending} <= 0) {
    return 1;           # nothing left to do
  }

  # NOTE: both counters are reset on every call, so the totals logged below
  # describe this poll only, not the whole scan.
  $self->{queries_started} = 0;
  $self->{queries_completed} = 0;

  # trap this loop in an eval { } block, as Net::DNS could throw
  # die()s our way; in particular, process_dnsbl_results() has
  # thrown die()s before (bug 3794).
  eval {
    my $nfound = $self->{main}->{resolver}->poll_responses($timeout);
    $nfound ||= 'no';
    dbg ("async: select found $nfound socks ready");

    # iterate over a snapshot of the keys: entries are deleted as they
    # complete, and callbacks may remove other entries too
    foreach my $key (keys %{$pending}) {
      my $ent = $pending->{$key};
      next unless defined $ent;   # removed by an earlier callback

      # call a "poll_callback" sub, if one exists
      if (defined $ent->{poll_callback}) {
        $ent->{poll_callback}->($ent);
      }

      my $type = $ent->{type};
      # completion is signalled by the id's presence in {finished}; the
      # stored value may legitimately be undef (report_id_complete)
      if (!exists ($self->{finished}->{$ent->{id}})) {
        $typecount{$type}++;
        next;
      }

      $ent->{response_packet} = delete $self->{finished}->{$ent->{id}};
      if (defined $ent->{completed_callback}) {
        $ent->{completed_callback}->($ent);
      }

      $self->{queries_completed}++;
      delete $self->{pending_lookups}->{$key};
    }

    dbg("async: queries completed: ".$self->{queries_completed}.
        " started: ".$self->{queries_started});

    dbg("async: queries active: ".
        join (' ', map { "$_=$typecount{$_}" } sort keys %typecount)." at ".
        localtime(time));

    # ensure we don't get stuck if a request gets lost in the ether.
    my $numkeys = scalar keys %{$self->{pending_lookups}};
    if ($numkeys == 0) {
      # everything answered: clear the stall-detection state so it cannot
      # carry over into the next batch of lookups
      $self->{last_count} = 0;
      $self->{times_count_was_same} = 0;
    }
    elsif ($self->{last_count} == $numkeys) {
      $stillwaiting = 1;
      # avoid looping forever if we haven't got all results.
      $self->{times_count_was_same}++;
      if ($self->{times_count_was_same} > 20) {
        dbg("async: escaping: must have lost requests");
        $self->abort_remaining_lookups();
        $stillwaiting = 0;
      }
    }
    else {
      # progress was made (or this is a new batch): restart the stall count
      $stillwaiting = 1;
      $self->{last_count} = $numkeys;
      $self->{times_count_was_same} = 0;
    }
  };

  if ($@) {
    # NOTE(review): despite the wording, the pending lookups are not aborted
    # here; presumably the caller calls abort_remaining_lookups() -- confirm.
    dbg("async: caught complete_lookups death, aborting: $@");
    $stillwaiting = 0;      # abort remaining
  }

  return (!$stillwaiting);
}

# ---------------------------------------------------------------------------

=item $async->abort_remaining_lookups()

Abort any remaining lookups: forget every pending lookup, clear the
last-start time and the stall-detection state used by
C<complete_lookups()>, and tell the resolver to abort via its C<bgabort()>
method.

=cut

sub abort_remaining_lookups {
  my ($self) = @_;

  my $pending = $self->{pending_lookups};
  if (%{$pending}) {
    dbg("async: aborting remaining lookups");
    %{$pending} = ();   # empty in place; the hash reference stays the same
  }

  delete $self->{last_start_lookup_time};

  # lookups started after an abort must get a fresh stall count; otherwise a
  # leftover times_count_was_same > 20 would abort the next batch at once
  $self->{last_count} = 0;
  $self->{times_count_was_same} = 0;

  $self->{main}->{resolver}->bgabort();
}

# ---------------------------------------------------------------------------

=item $async->set_response_packet($id, $pkt)

Register a "response packet" for a given query.  C<$id> is the ID for the
query, and must match the C<id> supplied in C<start_lookup()>.  C<$pkt> is
the packet object for the response.

If this was called, C<$pkt> will be available in the C<completed_callback>
function as C<< $ent->{response_packet} >>.

One or the other of C<set_response_packet()> or C<report_id_complete()>
should be called, but not both.

=cut

sub set_response_packet {
  my ($self, $id, $pkt) = @_;
  $self->{finished}->{$id} = $pkt;
}
=item $async->report_id_complete($id)

Register that a query has completed, and is no longer "pending".  C<$id> is
the ID for the query, and must match the C<id> supplied in C<start_lookup()>.

One or the other of C<set_response_packet()> or C<report_id_complete()>
should be called, but not both.

=cut

sub report_id_complete {
  my $self = shift;
  my $id   = shift;

  # Record the id with an undef packet: completion is detected by the key's
  # existence in {finished}, so no response packet is required.
  $self->{finished}->{$id} = undef;
}

# ---------------------------------------------------------------------------

=item $time = $async->get_last_start_lookup_time()

Get the time of the last call to C<start_lookup()>.  If C<start_lookup()>
was never called or C<abort_remaining_lookups()> has been called,
C<get_last_start_lookup_time()> will return undef.

=cut

sub get_last_start_lookup_time {
  my $self = shift;
  my $when = $self->{last_start_lookup_time};
  return $when;
}

# ---------------------------------------------------------------------------

1;

=back

=cut