HEX
Server: Apache
System: Linux scp1.abinfocom.com 5.4.0-216-generic #236-Ubuntu SMP Fri Apr 11 19:53:21 UTC 2025 x86_64
User: confeduphaar (1010)
PHP: 8.1.33
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //lib/python3/dist-packages/twisted/internet/test/__pycache__/test_iocp.cpython-38.pyc
U


W[2�
@sJdZddlZddlmZddlmZddlmZmZmZm	Z	m
Z
mZddlmZddl
mZddlmZdd	lmZzPdd
lmZmZmZddlmZmZmZddlmZdd
lmZddl m!Z!Wne"k
r�dZ#YnXzeee��$�Wn,e
k
�rZ%ze&e%�Z'W5dZ%[%XYnXdZ'Gdd�dej(�Z)Gdd�dej(�Z*dS)z,
Tests for L{twisted.internet.iocpreactor}.
�N)�array)�pack)�AF_INET6�AF_INET�SOCK_STREAM�
SOL_SOCKET�error�socket)�verifyClass)�unittest)�msg)�
IPushProducer)�iocpsupport�tcp�udp)�IOCPReactor�EVENTS_PER_LOOP�
KEY_NORMAL)�IReadWriteHandle)�SO_UPDATE_ACCEPT_CONTEXT)�
FileHandlez%This test only applies to IOCPReactorc@s6eZdZdZdd�Zdd�Zdd�Zedk	r2ee_dS)	�SupportTestszn
    Tests for L{twisted.internet.iocpreactor.iocpsupport}, low-level reactor
    implementation helpers.
    c
Cs>td|f�t|t�}|�|j�|�d�|�d�t|t�}|�|j�|�d�z|�||�	�df�Wn8t
k
r�}z|�|jtj
tjf�W5d}~XYnXt|t�}|�|j�tdd�}|�dt�|��|��|d��|�tttd	|����|�||��dd
�|�	�dd
�ft�|��|��dS)a
        Create a C{SOCK_STREAM} connection to localhost using a socket with an
        address family of C{family} and assert that the result of
        L{iocpsupport.get_accept_addrs} is consistent with the result of
        C{socket.getsockname} and C{socket.getpeername}.
        zfamily = %r)�r�FN�Bsr�P�)rr	rZ
addCleanup�closeZbindZlistenZsetblockingZconnectZgetsocknamerZassertIn�errnoZEINPROGRESSZEWOULDBLOCKr�assertEqual�_iocpZaccept�filenoZ
setsockoptrrrZgetpeernameZget_accept_addrs)�selfZfamilyZ	localhost�portZclient�eZserverZbuff�r%�A/usr/lib/python3/dist-packages/twisted/internet/test/test_iocp.py�_acceptAddressTest(s8




(

�� �zSupportTests._acceptAddressTestcCs|�td�dS)a
        L{iocpsupport.get_accept_addrs} returns a three-tuple of address
        information about the socket associated with the file descriptor passed
        to it.  For a connection using IPv4:

          - the first element is C{AF_INET}
          - the second element is a two-tuple of a dotted decimal notation IPv4
            address and a port number giving the peer address of the connection
          - the third element is the same type giving the host address of the
            connection
        z	127.0.0.1N)r'r�r"r%r%r&�test_ipv4AcceptAddressHsz#SupportTests.test_ipv4AcceptAddresscCs|�td�dS)a�
        Like L{test_ipv4AcceptAddress}, but for IPv6 connections.  In this case:

          - the first element is C{AF_INET6}
          - the second element is a two-tuple of a hexadecimal IPv6 address
            literal and a port number giving the peer address of the connection
          - the third element is the same type giving the host address of the
            connection
        z::1N)r'rr(r%r%r&�test_ipv6AcceptAddressWs
z#SupportTests.test_ipv6AcceptAddressN)	�__name__�
__module__�__qualname__�__doc__r'r)r*�ipv6Skip�skipr%r%r%r&r#s rc@s,eZdZdd�Zdd�Zdd�Zdd�Zd	S)
�IOCPReactorTestscCs"t�}|��|�|�d��dS)zd
        Test reactor behavior (doIteration) when there are no pending time
        events.
        N)rZwakeUpZassertFalse�doIteration)r"�irr%r%r&�test_noPendingTimerEventshsz*IOCPReactorTests.test_noPendingTimerEventscCs(|�tttj��|�tttj��dS)zY
        Verify that IOCP socket-representing classes implement IReadWriteHandle
        N)�
assertTruer
rrZ
ConnectionrZPortr(r%r%r&�test_reactorInterfacesrsz'IOCPReactorTests.test_reactorInterfacescCs|�ttt��dS)zH
        Verify that L{Filehandle} implements L{IPushProducer}.
        N)r5r
r
rr(r%r%r&�test_fileHandleInterfaceszsz*IOCPReactorTests.test_fileHandleInterfacescCs�Gdd�d�}t�}|�}t�|j|�}ttd�D]}|j�dt|�q4|�	d�|�
|jt�|�	d�|�
|jtd�dS)z�
        Verify that we don't lose an event when more than EVENTS_PER_LOOP
        events occur in the same reactor iteration
        c@s eZdZdZdd�Zdd�ZdS)z;IOCPReactorTests.test_maxEventsPerIteration.<locals>.FakeFDrcSsdS)N�FakeFDr%r(r%r%r&�	logPrefix�szEIOCPReactorTests.test_maxEventsPerIteration.<locals>.FakeFD.logPrefixcSs|jd7_dS)Nr)�counter)r"Zrc�bytesZevtr%r%r&�cb�sz>IOCPReactorTests.test_maxEventsPerIteration.<locals>.FakeFD.cbN)r+r,r-r:r9r<r%r%r%r&r8�sr8rrN)rr ZEventr<�rangerr#Z	postEventrr2rr:)r"r8r3�fdZevent�_r%r%r&�test_maxEventsPerIteration�s

z+IOCPReactorTests.test_maxEventsPerIterationN)r+r,r-r4r6r7r@r%r%r%r&r1gs
r1)+r.rrZstructrr	rrrrrZzope.interface.verifyr
Z
twisted.trialrZtwisted.python.logrZtwisted.internet.interfacesr
Ztwisted.internet.iocpreactorrr rrZ$twisted.internet.iocpreactor.reactorrrrZ'twisted.internet.iocpreactor.interfacesrZ"twisted.internet.iocpreactor.constrZ%twisted.internet.iocpreactor.abstractr�ImportErrorr0rr$�strr/ZTestCaserr1r%r%r%r&�<module>s. 
D