predicates.py   [plain text]


# -*- coding: Latin-1 -*-

"""Predicates for the mini-language used by rresource.py"""

import struct
import socket
import time

def atoi(s):
    if s == '255.255.255.255':
        return -1
    return struct.unpack('!i', socket.inet_aton(s))[0]


class Address:
    def __eq__(self, address):
        return _Address(address, operator.eq)
    def __ne__(self, address):
        return _Address(address, operator.ne)


class _Address:
    subjugates = None

    def __init__(self, a, f):
        parts = a.split('/', 1)
        if len(parts) == 1:
            parts.append('255.255.255.255')
        self.host, self.mask = parts
        self.mask = atoi(self.mask)
        self.host = atoi(self.host) & self.mask
        self.f = f
    
    def check(self, resource, request):
        if self.f(atoi(request.client[1]) & self.mask, self.host):
            if self.subjugates is None:
                return True
            for s in self.subjugates:
                r = s.check(resource, request)
                if r is not None:
                    return r
        return None

    def __call__(self, *args):
        self.subjugates = args

class Tautology:
    def check(self, resource, request):
        if self.subjugates is not None:
            for s in self.subjugates:
                r = s.check(resource, request)
                if r is not None:
                    return r
            else:
                return None
        else:
            return True

    def __call__(self, *args):
        self.subjugates = args

class Contradiction:
    def check(self, resource, request):
        return False

    def __call__(self, *args):
        pass

import operator
tmpl = "def __%s__(self, m): return __DateThing(self.attr, m, operator.%s)"
class DateThing:
    def __init__(self, attr):
        self.attr = attr
    for fn in 'eq', 'ne', 'lt', 'gt', 'le', 'ge':
        exec tmpl % (fn, fn)

class _DateThing(Tautology):
    def __init__(self, a, m, f):
        self.a = a
        self.m = m
        self.f = f

    def check(self, resource, request):
        if self.f(getattr(time.localtime(), self.a), self.m):
            return Tautology.check(resource, request)
        return false

Month = DateThing('tm_mon')
MonthDay = DateThing('tm_mday')

tmpl = 'def __%s__(self, n): return _UniqueHosts(n, operator.%s)'
class UniqueHosts:
    for fn in 'eq', 'ne', 'lt', 'gt', 'le', 'ge':
        exec tmpl % (fn, fn)

class _UniqueHosts(Tautology):
    def __init__(self, n, op):
        self.n = n
        self.op = op
    
    def check(self, resource, request):
        i = 0
        d = {}
        for conn in request.channel.factory.connections:
            if conn.transport.getPeer()[1] not in d:
                d[conn.transport.getPeer()[1]] = None
                i += 1
        if self.op(i, self.n):
            return Tautology.check(self, resource, request)
        return False

tmpl = 'def __%s__(self, c): return _Connections(c, operator.%s)'
class Connections:
    for fn in 'eq', 'ne', 'lt', 'gt', 'le', 'ge':
        exec tmpl % (fn, fn)

class _Connections(Tautology):
    def __init__(self, c, op):
        self.c = c
        self.op = op
    
    def check(self, resource, request):
        if self.op(len(request.channel.factory.connections), self.c):
            return Tautology.check(self, resource, request)
        return False

Tautology = Tautology()
Contradiction = Contradiction()
Address = Address()
Connections = Connections()
UniqueHosts = UniqueHosts()