HEX
Server: Apache
System: Linux scp1.abinfocom.com 5.4.0-216-generic #236-Ubuntu SMP Fri Apr 11 19:53:21 UTC 2025 x86_64
User: confeduphaar (1010)
PHP: 8.1.33
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //usr/lib/python3/dist-packages/twisted/names/test/__pycache__/test_util.cpython-38.pyc
U


W[K�@s�dZddlmZmZddlmZddlmZddlm	Z	ddl
mZddlm
Z
ddlmZdd	lmZmZee�Gd
d�de��Ze	ee�ee�Gdd
�d
e��Ze	ee�dS)z$
Utilities for Twisted.names tests.
�)�division�absolute_import)�	randrange)�implementer)�verifyClass)�IPv4Address)�succeed)�Clock)�IReactorUDP�
IUDPTransportc@sJeZdZdZdd�Zdd�Zdd�Zdd	d
�Zdd�Zd
d�Z	dd�Z
dS)�MemoryDatagramTransporta�
    This L{IUDPTransport} implementation enforces the usual connection rules
    and captures sent traffic in a list for later inspection.

    @ivar _host: The host address to which this transport is bound.
    @ivar _protocol: The protocol connected to this transport.
    @ivar _sentPackets: A C{list} of two-tuples of the datagrams passed to
        C{write} and the addresses to which they are destined.

    @ivar _connectedTo: L{None} if this transport is unconnected, otherwise an
        address to which all traffic is supposedly sent.

    @ivar _maxPacketSize: An C{int} giving the maximum length of a datagram
        which will be successfully handled by C{write}.
    cCs"||_||_g|_d|_||_dS�N)�_host�	_protocol�_sentPackets�_connectedTo�_maxPacketSize)�self�host�protocol�
maxPacketSize�r�>/usr/lib/python3/dist-packages/twisted/names/test/test_util.py�__init__'s
z MemoryDatagramTransport.__init__cCstd|j��S)z_
        Return the address which this transport is pretending to be bound
        to.
        �UDP)r)rr�rrrr�getHost/szMemoryDatagramTransport.getHostcCs |jdk	rtd��||f|_dS)z>
        Connect this transport to the given address.
        NzAlready connected)r�
ValueError)rr�portrrr�connect7s
zMemoryDatagramTransport.connectNcCsH|dkr|j}|dkrtd��t|�|jkr4td��|j�||f�dS)z*
        Send the given datagram.
        NzNeed an addresszPacket too big)rr�lenrr�append)rZdatagramZaddrrrr�write@szMemoryDatagramTransport.writecCs|j��td�S)z+
        Shut down this transport.
        N)rZstopProtocolrrrrr�
stopListeningMs
z%MemoryDatagramTransport.stopListeningcCsdS�zC
        Dummy implementation to satisfy L{IUDPTransport}.
        Nr)r�enabledrrr�setBroadcastAllowedUsz+MemoryDatagramTransport.setBroadcastAllowedcCsdSr$rrrrr�getBroadcastAllowed\sz+MemoryDatagramTransport.getBroadcastAllowed)N)�__name__�
__module__�__qualname__�__doc__rrrr"r#r&r'rrrrrs	

rc@s"eZdZdZdd�Zd	dd�ZdS)
�
MemoryReactoraO
    An L{IReactorTime} and L{IReactorUDP} provider.

    Time is controlled deterministically via the base class, L{Clock}.  UDP is
    handled in-memory by connecting protocols to instances of
    L{MemoryDatagramTransport}.

    @ivar udpPorts: A C{dict} mapping port numbers to instances of
        L{MemoryDatagramTransport}.
    cCst�|�i|_dSr
)r	r�udpPortsrrrrrss
zMemoryReactor.__init__�� cCsZ|dkr tdd�}||jkrq q||jkr2td��t||f||�}||j|<|�|�|S)zR
        Pretend to bind a UDP port and connect the given protocol to it.
        r�izAddress in use)rr-rrZmakeConnection)rrrZ	interfacerZ	transportrrr�	listenUDPxs


�

zMemoryReactor.listenUDPN)r.r/)r(r)r*r+rr1rrrrr,gs
r,N)r+Z
__future__rrZrandomrZzope.interfacerZzope.interface.verifyrZtwisted.internet.addressrZtwisted.internet.deferrZtwisted.internet.taskr	Ztwisted.internet.interfacesr
r�objectrr,rrrr�<module>sL
!