HEX
Server: Apache
System: Linux scp1.abinfocom.com 5.4.0-216-generic #236-Ubuntu SMP Fri Apr 11 19:53:21 UTC 2025 x86_64
User: confeduphaar (1010)
PHP: 8.1.33
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //lib/python3/dist-packages/twisted/internet/test/__pycache__/test_udp_internals.cpython-38.pyc
U


W[-�@s�dZddlmZmZddlZddlmZddlmZddl	m
Z
ddlmZedkrbdd	l
mZndd
l
mZGdd�de�ZGd
d�de�ZGdd�dej�ZdS)zK
Tests for the internal implementation details of L{twisted.internet.udp}.
�)�division�absolute_importN)�unittest)�DatagramProtocol)�udp)�platformTypeZwin32)�WSAEWOULDBLOCK)�EWOULDBLOCKc@s(eZdZdZdd�Zdd�Zdd�ZdS)	�StringUDPSocketa
    A fake UDP socket object, which returns a fixed sequence of strings and/or
    socket errors.  Useful for testing.

    @ivar retvals: A C{list} containing either strings or C{socket.error}s.

    @ivar connectedAddr: The address the socket is connected to.
    cCs||_d|_dS�N)�retvals�
connectedAddr)�selfr�r�J/usr/lib/python3/dist-packages/twisted/internet/test/test_udp_internals.py�__init__"szStringUDPSocket.__init__cCs
||_dSr)r
)r�addrrrr�connect'szStringUDPSocket.connectcCs$|j�d�}t|tj�r|�|dfS)zH
        Return (or raise) the next value from C{self.retvals}.
        rN)r�pop�
isinstance�socket�error)r�sizeZretrrr�recvfrom+szStringUDPSocket.recvfromN)�__name__�
__module__�__qualname__�__doc__rrrrrrrr
s	r
c@s eZdZdZdd�Zdd�ZdS)�	KeepReadsz%
    Accumulate reads in a list.
    cCs
g|_dSr)�reads)rrrrr;szKeepReads.__init__cCs|j�|�dSr)r�append)r�datarrrr�datagramReceived?szKeepReads.datagramReceivedN)rrrrrr"rrrrr6src@s0eZdZdZdd�Zdd�Zdd�Zdd	�Zd
S)�ErrorsTestsz/
    Error handling tests for C{udp.Port}.
    cCs�tj�d�|�tjjd�t�}t�d|�}tddt�	d�dt�	d�g�|_|�
�|�|jddg�|�
�|�|jdddg�dS)z�
        Socket reads with some good data followed by a socket error which can
        be ignored causes reading to stop, and no log messages to be logged.
        i��Nsresults123s456)
rZ_sockErrReadIgnorer �
addCleanup�remover�Portr
rr�doRead�assertEqualr�rZprotocolZportrrr�test_socketReadNormalIs��z!ErrorsTests.test_socketReadNormalcCs�tj�d�|�tjjd�t�}dd�|_t�d|�}tdt	�
d�dt	�
t�g�|_	|��|�
|jdg�|��|�
|jddg�dS)z�
        If the socket is unconnected, socket reads with an immediate
        connection refusal are ignored, and reading stops. The protocol's
        C{connectionRefused} method is not called.
        ��cSsddS)N�rrrrrr�<lambda>m�z5ErrorsTests.test_readImmediateError.<locals>.<lambda>N�a�b)r�_sockErrReadRefuser r$r%r�connectionRefusedr&r
rrr	r'r(rr)rrr�test_readImmediateError`s
�z#ErrorsTests.test_readImmediateErrorcs�tj�d�|�tjjd�t�}g��fdd�|_t�d|�}tdt	�
d�dt	�
t�g�|_	|�dd�|�
�|�|jdg�|��d	g�|�
�|�|jddg�|��d	g�dS)
z�
        If the socket connected, socket reads with an immediate
        connection refusal are ignored, and reading stops. The protocol's
        C{connectionRefused} method is called.
        r+cs
��d�S)NT)r r�Zrefusedrrr-�r.z>ErrorsTests.test_connectedReadImmediateError.<locals>.<lambda>Nr/r0z	127.0.0.1i'T)rr1r r$r%rr2r&r
rrr	rr'r(rr)rr4r� test_connectedReadImmediateError|s �z,ErrorsTests.test_connectedReadImmediateErrorcCsJt�}t�d|�}tdt�d�g�|_|�tj|j�|�|j	dg�dS)zG
        Socket reads with an unknown socket error are raised.
        Nsgoodi���)
rrr&r
rrZassertRaisesr'r(rr)rrr�test_readUnknownError�s
z!ErrorsTests.test_readUnknownErrorN)rrrrr*r3r5r6rrrrr#Ds
r#)rZ
__future__rrrZ
twisted.trialrZtwisted.internet.protocolrZtwisted.internetrZtwisted.python.runtimer�errnorr	�objectr
rZSynchronousTestCaser#rrrr�<module>s