File: //usr/lib/python3/dist-packages/twisted/internet/test/__pycache__/test_posixbase.cpython-38.pyc
U


W[@%�@s�dZddlmZmZddlmZddlmZddlm	Z	m
Z
ddlmZdZ
zddlmZdd	lmZWnek
r�d
Z
YnXddlmZddlmZGd
d�de	�ZGdd�de�ZGdd�de�ZGdd�de	�ZGdd�de�ZGdd�de�ZdS)z>
Tests for L{twisted.internet.posixbase} and supporting code.
�)�division�absolute_import)�TestCase)�Deferred)�PosixReactorBase�_Waker)�
ServerFactoryN)�unix)�ClientProtoz)Platform does not support AF_UNIX sockets)�Port��reactorc@s4eZdZdd�Zdd�Zdd�Zdd�Zd	d
�ZdS)�TrivialReactorcCsi|_i|_t�|�dS�N)�_readers�_writersr�__init__��self�r�F/usr/lib/python3/dist-packages/twisted/internet/test/test_posixbase.pyrszTrivialReactor.__init__cCsd|j|<dS�NT�r�r�readerrrr�	addReader#szTrivialReactor.addReadercCs|j|=dSrrrrrr�removeReader'szTrivialReactor.removeReadercCsd|j|<dSr�r�r�writerrrr�	addWriter+szTrivialReactor.addWritercCs|j|=dSrrrrrr�removeWriter/szTrivialReactor.removeWriterN)�__name__�
__module__�__qualname__rrrr r!rrrrrs
rc@s0eZdZdZdd�Zdd�Zdd�Zdd	�Zd
S)�PosixReactorBaseTestsz(
    Tests for L{PosixReactorBase}.
    cCs2|�|jt�|�|j|j�|�|j|j�dSr)ZassertIsInstanceZwakerr�assertIn�_internalReadersr�rr
rrr�_checkWaker9sz!PosixReactorBaseTests._checkWakercCst�}|�|�dS)z
        When L{PosixReactorBase} is instantiated, it creates a waker and adds
        it to its internal readers set.
        N)rr)r(rrr�test_wakerIsInternalReader?sz0PosixReactorBaseTests.test_wakerIsInternalReadercCs\t�}t�}|j�|�|�|�|�|j|j�|�|�|�	||j�|�	||j�dS)z�
        Any L{IReadDescriptors} in L{PosixReactorBase._internalReaders} are
        left alone by L{PosixReactorBase._removeAll}.
        N)
r�objectr'�addr�
_removeAllrrr)r&)rr
Zextrarrr�"test_removeAllSkipsInternalReadersHs

z8PosixReactorBaseTests.test_removeAllSkipsInternalReaderscCsnt�}t�}t�}|�|�|�|�|�|j|j�}|�t|�t||g��|�	||j�|�	||j�dS)z�
        L{PosixReactorBase._removeAll} returns a list of removed
        L{IReadDescriptor} and L{IWriteDescriptor} objects.
        N)
rr+rr r-rr�assertEqual�setZassertNotIn)rr
rrZremovedrrr�'test_removeAllReturnsRemovedDescriptorsWs

�z=PosixReactorBaseTests.test_removeAllReturnsRemovedDescriptorsN)r"r#r$�__doc__r)r*r.r1rrrrr%4s
	r%c@s&eZdZdZeee�sdZdd�ZdS)�TCPPortTestsz1
    Tests for L{twisted.internet.tcp.Port}.
    zNon-posixbase reactorcCs,tdt��}d|_dd�|_|�|��t�S)z�
        L{Port.stopListening} returns a L{Deferred} which errbacks if
        L{Port.connectionLost} raises an exception.
        i90TcSsddS)N�rr)�reasonrrr�<lambda>x�z8TCPPortTests.test_connectionLostFailed.<locals>.<lambda>)rrZ	connectedZconnectionLostZ
assertFailure�
stopListening�ZeroDivisionError)r�portrrr�test_connectionLostFailedqs
z&TCPPortTests.test_connectionLostFailedN)	r"r#r$r2�
isinstancer
r�skipr;rrrrr3is
r3c@s8eZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�ZdS)
�TimeoutReportReactorz�
    A reactor which is just barely runnable and which cannot monitor any
    readers or writers, and which fires a L{Deferred} with the timeout
    passed to its C{doIteration} method as soon as that method is invoked.
    cCst�|�t�|_d|_dS)N�d)rrr�iterationTimeout�nowrrrrr�s
zTimeoutReportReactor.__init__cCsdS)z�
        Ignore the reader.  This is necessary because the waker will be
        added.  However, we won't actually monitor it for any events.
        Nrrrrrr�szTimeoutReportReactor.addReadercCsgS)z�
        There are no readers or writers, so there is nothing to remove.
        This will be called when the reactor stops, though, so it must be
        implemented.
        rrrrr�	removeAll�szTimeoutReportReactor.removeAllcCs|jS)zx
        Override the real clock with a deterministic one that can be easily
        controlled in a unit test.
        )rArrrr�seconds�szTimeoutReportReactor.secondscCs"|j}|dk	rd|_|�|�dSr)r@�callback)r�timeout�drrr�doIteration�sz TimeoutReportReactor.doIterationN)	r"r#r$r2rrrBrCrGrrrrr>}s	r>c@sPeZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�Zdd
�Z	dd�Z
dd�ZdS)�IterationTimeoutTestsz�
    Tests for the timeout argument L{PosixReactorBase.run} calls
    L{PosixReactorBase.doIteration} with in the presence of various delayed
    calls.
    cs6g}�j�|j��j��fdd�����|dS)Ncs���Sr)�stop)Zignoredrrrr6�r7z>IterationTimeoutTests._checkIterationTimeout.<locals>.<lambda>r)r@ZaddCallback�append�run�rr
rErrr�_checkIterationTimeout�s
z,IterationTimeoutTests._checkIterationTimeoutcCst�}|�|�}|�|�dS)zl
        If there are no delayed calls, C{doIteration} is called with a
        timeout of L{None}.
        N)r>rM�assertIsNonerLrrr�test_noCalls�s
z"IterationTimeoutTests.test_noCallscCs0t�}|�ddd��|�|�}|�|d�dS)z�
        If there is a delayed call, C{doIteration} is called with a timeout
        which is the difference between the current time and the time at
        which that call is to run.
        r?cSsdSrrrrrrr6�r7z8IterationTimeoutTests.test_delayedCall.<locals>.<lambda>N�r>�	callLaterrMr/rLrrr�test_delayedCall�s
z&IterationTimeoutTests.test_delayedCallcCs>t�}|�ddd��|jd7_|�|�}|�|d�dS)z�
        If a delayed call is scheduled and then some time passes, the
        timeout passed to C{doIteration} is reduced by the amount of time
        which passed.
        r?cSsdSrrrrrrr6�r7z7IterationTimeoutTests.test_timePasses.<locals>.<lambda>��KN)r>rQrArMr/rLrrr�test_timePasses�s

z%IterationTimeoutTests.test_timePassescCsPt�}|�ddd��|�ddd��|�ddd��|�|�}|�|d�dS)	z�
        If there are several delayed calls, C{doIteration} is called with a
        timeout which is the difference between the current time and the
        time at which the earlier of the two calls is to run.
        �2cSsdSrrrrrrr6�r7zAIterationTimeoutTests.test_multipleDelayedCalls.<locals>.<lambda>�
cSsdSrrrrrrr6�r7r?cSsdSrrrrrrr6�r7NrPrLrrr�test_multipleDelayedCalls�s
z/IterationTimeoutTests.test_multipleDelayedCallscCsHt�}|�ddd��}|jd7_|�d�|�|�}|�|d�dS)z�
        If a delayed call is reset, the timeout passed to C{doIteration} is
        based on the interval between the time when reset is called and the
        new delay of the call.
        rVcSsdSrrrrrrr6�r7z=IterationTimeoutTests.test_resetDelayedCall.<locals>.<lambda>rS�N)r>rQrA�resetrMr/�rr
�callrErrr�test_resetDelayedCall�s

z+IterationTimeoutTests.test_resetDelayedCallcCsHt�}|�ddd��}|jd7_|�d�|�|�}|�|d�dS)z�
        If a delayed call is re-delayed, the timeout passed to
        C{doIteration} is based on the remaining time before the call would
        have been made and the additional amount of time passed to the delay
        method.
        rVcSsdSrrrrrrr6�r7z=IterationTimeoutTests.test_delayDelayedCall.<locals>.<lambda>rW��<N)r>rQrAZdelayrMr/r[rrr�test_delayDelayedCall�s

z+IterationTimeoutTests.test_delayDelayedCallcCs6t�}|�ddd��}|��|�|�}|�|�dS)zp
        If the only delayed call is canceled, L{None} is the timeout passed
        to C{doIteration}.
        rVcSsdSrrrrrrr6r7z>IterationTimeoutTests.test_cancelDelayedCall.<locals>.<lambda>N)r>rQZcancelrMrNr[rrr�test_cancelDelayedCalls

z,IterationTimeoutTests.test_cancelDelayedCallN)r"r#r$r2rMrOrRrUrXr]r`rarrrrrH�s

rHc@s,eZdZdZedk	reZdd�Zdd�ZdS)�ConnectedDatagramPortTestsz/
    Test connected datagram UNIX sockets.
    Ncs.�fdd�}t�dt��}||_|�d�dS)z�
        L{ConnectedDatagramPort} does not call the deprecated C{loseConnection}
        in L{ConnectedDatagramPort.connectionFailed}.
        cs��d�dS)z�
            Dummy C{loseConnection} method. C{loseConnection} is deprecated and
            should not get called.
            z7loseConnection is deprecated and should not get called.N)Zfailrrrr�loseConnectionsz`ConnectedDatagramPortTests.test_connectionFailedDoesntCallLoseConnection.<locals>.loseConnectionN�goodbye)r	�ConnectedDatagramPortr
rc�connectionFailed)rrcr:rrr�-test_connectionFailedDoesntCallLoseConnectionszHConnectedDatagramPortTests.test_connectionFailedDoesntCallLoseConnectioncs@d�_�fdd�}t�dt��}||_|�d����j�dS)z�
        L{ConnectedDatagramPort} calls L{ConnectedDatagramPort.stopListening}
        instead of the deprecated C{loseConnection} in
        L{ConnectedDatagramPort.connectionFailed}.
        Fcs
d�_dS)z8
            Dummy C{stopListening} method.
            TN)�calledrrrrr83szYConnectedDatagramPortTests.test_connectionFailedCallsStopListening.<locals>.stopListeningNrd)rhr	rer
r8rfZ
assertTrue)rr8r:rrr�'test_connectionFailedCallsStopListening+s
zBConnectedDatagramPortTests.test_connectionFailedCallsStopListening)r"r#r$r2�skipSocketsr=rgrirrrrrbs
rb)r2Z
__future__rrZtwisted.trial.unittestrZtwisted.internet.deferrZtwisted.internet.posixbaserrZtwisted.internet.protocolrrjZtwisted.internetr	Ztwisted.test.test_unixr
�ImportErrorZtwisted.internet.tcprr
rr%r3r>rHrbrrrr�<module>s&
5,i
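
The IterationTimeoutTests docstrings embedded in the dump above describe how the timeout handed to doIteration follows from the pending delayed calls: None when nothing is scheduled, otherwise the gap between the current time and the earliest call, adjusted after reset, delay, or cancel. The sketch below restates that rule in plain Python. It is not Twisted's implementation: SketchClock, SketchDelayedCall, call_later, and timeout are hypothetical names chosen for illustration, and the numbers in the usage note are arbitrary, not the constants from the compiled test.

```python
class SketchDelayedCall:
    """Hypothetical stand-in for a delayed call (not Twisted's DelayedCall)."""

    def __init__(self, clock, when):
        self._clock = clock
        self.when = when          # absolute time at which the call should run
        self.cancelled = False

    def reset(self, seconds_from_now):
        # Reschedule relative to the moment reset() is called.
        self.when = self._clock.now + seconds_from_now

    def delay(self, extra_seconds):
        # Push the existing deadline further into the future.
        self.when += extra_seconds

    def cancel(self):
        self.cancelled = True


class SketchClock:
    """Hypothetical deterministic clock that only tracks pending calls."""

    def __init__(self, now=100):
        self.now = now
        self._calls = []

    def call_later(self, delay):
        call = SketchDelayedCall(self, self.now + delay)
        self._calls.append(call)
        return call

    def timeout(self):
        # None means "no pending calls, wait indefinitely".
        pending = [c.when for c in self._calls if not c.cancelled]
        if not pending:
            return None
        # Otherwise wait until the earliest pending call is due.
        return max(0, min(pending) - self.now)
```

With this sketch, a fresh clock reports None; after call_later(100) it reports 100, and advancing now by 25 reduces that to 75. Cancelling the only call brings the result back to None.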