HEX
Server: Apache
System: Linux scp1.abinfocom.com 5.4.0-216-generic #236-Ubuntu SMP Fri Apr 11 19:53:21 UTC 2025 x86_64
User: confeduphaar (1010)
PHP: 8.1.33
Disabled: exec,passthru,shell_exec,system
Upload Files
File: //usr/lib/python3/dist-packages/twisted/internet/test/__pycache__/test_epollreactor.cpython-38.pyc
U


W[��@s�dZddlmZmZddlmZzddlmZWnek
rHdZYnXddl	m
Z
ddlmZGdd	�d	e
�ZGd
d�de�ZdS)z-
Tests for L{twisted.internet.epollreactor}.
�)�division�absolute_import)�TestCase)�_ContinuousPollingN)�Clock��ConnectionDonec@s8eZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�ZdS)
�
DescriptorzF
    Records reads and writes, as if it were a C{FileDescriptor}.
    cCs
g|_dS�N)�events��self�r�I/usr/lib/python3/dist-packages/twisted/internet/test/test_epollreactor.py�__init__szDescriptor.__init__cCsdS)N�rrrrr�filenoszDescriptor.filenocCs|j�d�dS)N�read�r�appendrrrr�doRead!szDescriptor.doReadcCs|j�d�dS)N�writerrrrr�doWrite%szDescriptor.doWritecCs|�t�|j�d�dS)N�lost)Ztraprrr)r
�reasonrrr�connectionLost)s
zDescriptor.connectionLostN)	�__name__�
__module__�__qualname__�__doc__rrrrrrrrrr	sr	c@s�eZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�Zdd
�Z	dd�Z
dd�Zdd�Zdd�Z
dd�Zdd�Zdd�Zedkr�dZdS)�ContinuousPollingTestsza
    L{_ContinuousPolling} can be used to read and write from C{FileDescriptor}
    objects.
    cCsvtt��}|�|j�t�}|�|�|��|�|�|�|j�|�	|jj
�|�|jj|j
�|�	|�|��dS)zi
        Adding a reader when there was previously no reader starts up a
        C{LoopingCall}.
        N)rr�assertIsNone�_loop�object�assertFalse�	isReading�	addReader�assertIsNotNone�
assertTrue�running�assertIs�clock�_reactor�r
�poller�readerrrr�test_addReader5s

z%ContinuousPollingTests.test_addReadercCsvtt��}|�|j�t�}|�|�|��|�|�|�|j�|�	|jj
�|�|jj|j
�|�	|�|��dS)zi
        Adding a writer when there was previously no writer starts up a
        C{LoopingCall}.
        N)rrr!r"r#r$�	isWriting�	addWriterr'r(r)r*r+r,�r
r.�writerrrr�test_addWriterEs

z%ContinuousPollingTests.test_addWritercCsVtt��}t�}|�|�|�|�|�|j�|�|j�	�g�|�
|�|��dS)z=
        Removing a reader stops the C{LoopingCall}.
        N)rrr#r&�removeReaderr!r"�assertEqualr,�getDelayedCallsr$r%r-rrr�test_removeReaderUs


z(ContinuousPollingTests.test_removeReadercCsVtt��}t�}|�|�|�|�|�|j�|�|j�	�g�|�
|�|��dS)z=
        Removing a writer stops the C{LoopingCall}.
        N)rrr#r2�removeWriterr!r"r7r,r8r$r1r3rrr�test_removeWriterbs


z(ContinuousPollingTests.test_removeWritercCs&tt��}|�t��|�t��dS)zM
        Removing unknown readers and writers silently does nothing.
        N)rrr:r#r6)r
r.rrr�test_removeUnknownos
z)ContinuousPollingTests.test_removeUnknowncCs�tt��}t�}|�|�|�|j�|�t��|�|j�|�t��|�|j�|�t��|�|�|�|j�|�|jj	�|�
t|j�
��d�dS)za
        Adding multiple readers and writers results in a single
        C{LoopingCall}.
        rN)rrr#r2r'r"r&r:r(r)r7�lenr,r8r3rrr�test_multipleReadersAndWritersxs


z5ContinuousPollingTests.test_multipleReadersAndWriterscCs�t�}t|�}t�}|�|�|�|jg�|�d�|�|jdg�|�d�|�|jddg�|�d�|�|jdddg�dS)za
        Adding a reader causes its C{doRead} to be called every 1
        milliseconds.
        g�h㈵��>rN)rrr	r&r7r�advance�r
Zreactorr.Zdescrrr�test_readerPolling�s



z)ContinuousPollingTests.test_readerPollingcCs�t�}t|�}t�}|�|�|�|jg�|�d�|�|jdg�|�d�|�|jddg�|�d�|�|jdddg�dS)zb
        Adding a writer causes its C{doWrite} to be called every 1
        milliseconds.
        ���MbP?rN)rrr	r2r7rr?r@rrr�test_writerPolling�s



z)ContinuousPollingTests.test_writerPollingcCsTt�}t|�}t�}dd�|_|�|�|�|jg�|�d�|�|jdg�dS)zu
        If a C{doRead} returns a value indicating disconnection,
        C{connectionLost} is called on it.
        cSst�Sr
rrrrr�<lambda>��zBContinuousPollingTests.test_connectionLostOnRead.<locals>.<lambda>rBrN)rrr	rr&r7rr?r@rrr�test_connectionLostOnRead�s


z0ContinuousPollingTests.test_connectionLostOnReadcCsTt�}t|�}t�}dd�|_|�|�|�|jg�|�d�|�|jdg�dS)zv
        If a C{doWrite} returns a value indicating disconnection,
        C{connectionLost} is called on it.
        cSst�Sr
rrrrrrD�rEzCContinuousPollingTests.test_connectionLostOnWrite.<locals>.<lambda>rBrN)rrr	rr2r7rr?r@rrr�test_connectionLostOnWrite�s


z1ContinuousPollingTests.test_connectionLostOnWritecCs�tt��}t�}t�}t�}|�|�|�|�|�|�|�|�|��}|�|��g�|�|��g�|�t	|�d�|�t
|�t
|||g��dS)zv
        L{_ContinuousPolling.removeAll} removes all descriptors and returns
        the readers and writers.
        �N)rrr#r&r2Z	removeAllr7�
getReaders�
getWritersr=�set)r
r.r/r4ZbothZremovedrrr�test_removeAll�s




z%ContinuousPollingTests.test_removeAllcCs.tt��}t�}|�|�|�||���dS)zb
        L{_ContinuousPolling.getReaders} returns a list of the read
        descriptors.
        N)rrr#r&�assertInrIr-rrr�test_getReaders�s

z&ContinuousPollingTests.test_getReaderscCs.tt��}t�}|�|�|�||���dS)zc
        L{_ContinuousPolling.getWriters} returns a list of the write
        descriptors.
        N)rrr#r2rMrJr3rrr�test_getWriters�s

z&ContinuousPollingTests.test_getWritersNz(epoll not supported in this environment.)rrrrr0r5r9r;r<r>rArCrFrGrLrNrOr�skiprrrrr /s 

	
r )rZ
__future__rrZtwisted.trial.unittestrZtwisted.internet.epollreactorr�ImportErrorZtwisted.internet.taskrZtwisted.internet.errorrr#r	r rrrr�<module>s