# AsyncLoop.pm


# <@LICENSE>
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to you under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at:
# 
#     http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# </@LICENSE>

=head1 NAME

Mail::SpamAssassin::AsyncLoop - scanner asynchronous event loop

=head1 DESCRIPTION

An asynchronous event loop used for long-running operations, performed "in the
background" during the Mail::SpamAssassin::check() scan operation, such as DNS
blocklist lookups.

=head1 METHODS

=over 4

=cut

package Mail::SpamAssassin::AsyncLoop;

use strict;
use warnings;
use bytes;

use Mail::SpamAssassin;
use Mail::SpamAssassin::Logger;

our @ISA = qw();

#############################################################################

# Constructor.  $main is stored as-is; the other methods reach the DNS
# resolver through $self->{main}->{resolver}.  May be invoked on the class
# name or on an existing instance (only the instance's class is reused).
sub new {
  my ($class, $main) = @_;
  $class = ref($class) || $class;

  my %state = (
    main                 => $main,
    last_count           => 0,    # pending count seen by the previous poll
    times_count_was_same => 0,    # consecutive polls with no progress
    queries_started      => 0,
    queries_completed    => 0,
    pending_lookups      => {},   # key => lookup entry
  );

  return bless(\%state, $class);
}

# ---------------------------------------------------------------------------

=item $obj = $async->start_lookup($obj)

Register the start of a long-running asynchronous lookup operation. C<$obj>
is a hash reference containing the following items:

=over 4

=item key (required)

A key string, unique to this lookup.  This is what is reported in
debug messages, used as the key for C<get_lookup()>, etc.

=item id (required)

An ID string, also unique to this lookup.  Typically, this is the DNS packet ID
as returned by DnsResolver's C<bgsend> method.  Sadly, the Net::DNS
architecture forces us to keep a separate ID string for this task instead of
reusing C<key> -- if you are not using DNS lookups through DnsResolver, it
should be OK to just reuse C<key>.

=item type (required)

A string, typically one word, used to describe the type of lookup in log
messages, such as C<DNSBL>, C<MX>, C<TXT>.

=item poll_callback (optional)

A code reference, which will be called periodically during the
background-processing period.  If you will be performing an async lookup on a
non-DNS-based service, you will need to implement this so that it checks for
new responses and calls C<set_response_packet()> or C<report_id_complete()> as
appropriate.   DNS-based lookups can leave it undefined, since
DnsResolver::poll_responses() will be called automatically anyway.

The code reference will be called with one argument, the C<$ent> object.

=item completed_callback (optional)

A code reference, which will be called when the lookup has been reported as
complete via C<set_response_packet()> or C<report_id_complete()>.

The code reference will be called with one argument, the C<$ent> object.

=back

C<$obj> is returned by this method.

=cut

# Validate and register a lookup entry under its key, bump the "started"
# counter and remember when this happened.  Dies if id, key or type is
# missing or false (checked in that order).  Returns the entry unchanged.
sub start_lookup {
  my $self = shift;
  my $ent  = shift;

  $ent->{id}   or die "oops, no id";
  $ent->{key}  or die "oops, no key";
  $ent->{type} or die "oops, no type";

  my $key = $ent->{key};
  $self->{pending_lookups}{$key} = $ent;
  $self->{queries_started} += 1;
  $self->{last_start_lookup_time} = time;

  return $ent;
}

# ---------------------------------------------------------------------------

=item $obj = $async->get_lookup($key)

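Retrieve the pending-lookup object for the given key C<$key>.

If no lookup with that key is pending (it was never started, has already been
harvested by C<complete_lookups()>, or was aborted), this returns C<undef>.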

Note that a lookup is still considered "pending" until C<complete_lookups()> is
called, even if it has been reported as complete via C<set_response_packet()>
or C<report_id_complete()>.

=cut

# Fetch the pending entry registered under $key, or undef if there is none.
sub get_lookup {
  my $self = shift;
  my $key  = shift;

  return $self->{pending_lookups}{$key};
}

# ---------------------------------------------------------------------------

=item @objs = $async->get_pending_lookups()

Retrieve the lookup objects for all pending lookups.

Note that a lookup is still considered "pending" until C<complete_lookups()> is
called, even if it has been reported as complete via C<set_response_packet()>
or C<report_id_complete()>.

=cut

# Return all pending entries, in no particular order.  Called in scalar
# context this yields the number of pending entries instead.
sub get_pending_lookups {
  my $self = shift;

  return values %{ $self->{pending_lookups} };
}

# ---------------------------------------------------------------------------

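=item $alldone = $async->complete_lookups($timeout)

Perform a poll of the pending lookups, to see if any are completed; if they
are, their C<completed_callback> is called with the entry object for that
lookup.  C<$timeout> is passed through unchanged to the resolver's
C<poll_responses()> method.

A true value (C<1>) is returned if there are no lookups remaining, if the
number of pending lookups has not changed over more than 20 consecutive calls
(the remaining lookups are then aborted), or if an error was caught while
polling; otherwise a false value is returned.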

=cut

# Poll the resolver and any per-lookup poll_callbacks, then harvest every
# pending lookup that has been reported finished via set_response_packet()
# or report_id_complete(), running its completed_callback.
#
#   $timeout - passed unchanged to $self->{main}->{resolver}->poll_responses()
#
# Returns true when nothing is left pending, when the pending count has not
# changed for more than 20 consecutive calls (the remaining lookups are then
# aborted), or when an error was trapped while polling; false otherwise.
sub complete_lookups {
  my ($self, $timeout) = @_;
  my %typecount = ();
  my $stillwaiting = 0;

  my $pending = $self->{pending_lookups};
  if (!keys %{$pending}) {
    return 1;           # nothing left to do
  }

  # Only the completion counter is per-call.  The started counter keeps
  # accumulating start_lookup() calls until it has been logged below;
  # zeroing it here meant the log line always reported "started: 0".
  $self->{queries_completed} = 0;

  # trap this loop in an eval { } block, as Net::DNS could throw
  # die()s our way; in particular, process_dnsbl_results() has
  # thrown die()s before (bug 3794).  The trailing "1" lets us detect a
  # failure even if $@ is clobbered while the stack unwinds.
  my $ok = eval {
    my $nfound = $self->{main}->{resolver}->poll_responses($timeout);
    $nfound ||= 'no';
    dbg("async: select found $nfound socks ready");

    foreach my $key (keys %{$pending}) {
      my $ent = $pending->{$key};

      # A callback run earlier in this loop may already have removed this
      # entry (e.g. via abort_remaining_lookups()); skip it rather than
      # autovivifying an empty entry and counting it under an undef type.
      next if !defined $ent;

      # call a "poll_callback" sub, if one exists
      if (defined $ent->{poll_callback}) {
        $ent->{poll_callback}->($ent);
      }

      my $type = $ent->{type};
      if (!exists $self->{finished}->{$ent->{id}}) {
        $typecount{$type}++;
        next;
      }

      # delete() both fetches the packet (undef when the lookup was
      # reported via report_id_complete()) and drops the bookkeeping entry
      $ent->{response_packet} = delete $self->{finished}->{$ent->{id}};
      if (defined $ent->{completed_callback}) {
        $ent->{completed_callback}->($ent);
      }

      $self->{queries_completed}++;
      delete $self->{pending_lookups}->{$key};
    }

    dbg("async: queries completed: ".$self->{queries_completed}.
        " started: ".$self->{queries_started});
    $self->{queries_started} = 0;

    dbg("async: queries active: ".
        join(' ', map { "$_=$typecount{$_}" } sort keys %typecount)." at ".
        localtime(time));

    # ensure we don't get stuck if a request gets lost in the ether.
    my $numkeys = scalar keys %{$self->{pending_lookups}};
    if ($numkeys == 0) {
      $stillwaiting = 0;
      # start the next batch with a fresh stall detector
      $self->{last_count} = 0;
      $self->{times_count_was_same} = 0;
    }
    elsif ($self->{last_count} == $numkeys) {
      $stillwaiting = 1;
      # avoid looping forever if we haven't got all results.
      if (++$self->{times_count_was_same} > 20) {
        dbg("async: escaping: must have lost requests");
        $self->abort_remaining_lookups();
        $self->{last_count} = 0;
        $self->{times_count_was_same} = 0;
        $stillwaiting = 0;
      }
    }
    else {
      $stillwaiting = 1;
      $self->{last_count} = $numkeys;
      $self->{times_count_was_same} = 0;
    }

    1;
  };

  if (!$ok) {
    my $err = $@ || 'unknown error';
    dbg("async: caught complete_lookups death, aborting: $err");
    $stillwaiting = 0;      # abort remaining
  }

  return (!$stillwaiting);
}

# ---------------------------------------------------------------------------

=item $async->abort_remaining_lookups()

Abort any remaining lookups.

=cut

# Drop every pending lookup, forget the last start time, discard responses
# that were reported but never harvested, reset the "lost request" stall
# detector used by complete_lookups(), and tell the resolver to abandon its
# in-flight queries.  Returns whatever the resolver's bgabort() returns.
sub abort_remaining_lookups {
  my ($self) = @_;

  my $pending = $self->{pending_lookups};
  if (keys %{$pending}) {
    dbg("async: aborting remaining lookups");
  }
  # Empty the existing hash in place rather than replacing it, so that a
  # reference to it held by a caller (complete_lookups() keeps one while
  # iterating) observes the change.
  %{$pending} = ();

  delete $self->{last_start_lookup_time};

  # Entries left in {finished} belong to lookups that no longer exist.
  # Keeping them lets the hash grow across batches and could hand a stale
  # packet to a later lookup that happens to reuse the same id.
  delete $self->{finished};

  # restart complete_lookups()'s stall detection from scratch
  $self->{last_count} = 0;
  $self->{times_count_was_same} = 0;

  $self->{main}->{resolver}->bgabort();
}

# ---------------------------------------------------------------------------

=item $async->set_response_packet($id, $pkt)

Register a "response packet" for a given query.  C<$id> is the ID for the
query, and must match the C<id> supplied in C<start_lookup()>. C<$pkt> is the
packet object for the response.

If this was called, C<$pkt> will be available in the C<completed_callback>
function as C<< $ent->{response_packet} >>.

One or the other of C<set_response_packet()> or C<report_id_complete()>
should be called, but not both.

=cut

# Record $pkt as the response for query $id; complete_lookups() will move it
# into the matching entry's {response_packet}.  Returns $pkt.
sub set_response_packet {
  my $self = shift;
  my ($id, $pkt) = @_;

  $self->{finished}{$id} = $pkt;
}

=item $async->report_id_complete($id)

Register that a query has completed, and is no longer "pending". C<$id> is the
ID for the query, and must match the C<id> supplied in C<start_lookup()>.

One or the other of C<set_response_packet()> or C<report_id_complete()>
should be called, but not both.

=cut

# Mark query $id as finished without a response packet.  The key's presence
# (with an undef value) is what complete_lookups() tests for.
sub report_id_complete {
  my $self = shift;
  my $id   = shift;

  $self->{finished}{$id} = undef;
}

# ---------------------------------------------------------------------------

=item $time = $async->get_last_start_lookup_time()

Get the time of the last call to C<start_lookup()>.  If C<start_lookup()> was
never called, or C<abort_remaining_lookups()> has been called since the last
C<start_lookup()>, C<get_last_start_lookup_time()> will return C<undef>.

=cut

# Accessor for the time() value recorded by the most recent start_lookup();
# undef if none has happened since construction or since the last abort.
sub get_last_start_lookup_time {
  my $self = shift;
  my $when = $self->{last_start_lookup_time};
  return $when;
}

# ---------------------------------------------------------------------------

# a module file must end by returning a true value to require/use
1;

=back

=cut