File: //usr/lib/python3/dist-packages/twisted/test/__pycache__/test_factories.cpython-38.pyc
U


W[�@shdZddlmZmZddlZddlmZddlmZddl	m
Z
mZGdd�de�Z
Gd	d
�d
e�ZdS)z&
Test code for basic Factory classes.
�)�division�absolute_importN)�TestCase)�Clock)�ReconnectingClientFactory�Protocolc@s eZdZdZdd�Zdd�ZdS)�
FakeConnectorzP
    A fake connector class, to be used to mock connections failed or lost.
    cCsdS�N���selfr
r
�=/usr/lib/python3/dist-packages/twisted/test/test_factories.py�stopConnectingszFakeConnector.stopConnectingcCsdSr	r
rr
r
r
�connectszFakeConnector.connectN)�__name__�
__module__�__qualname__�__doc__rrr
r
r
r
rsrc@s@eZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�Zdd
�Z	dS)�ReconnectingFactoryTestsz1
    Tests for L{ReconnectingClientFactory}.
    cCsLGdd�dt�}t�}t|_|�d�|��|�|�d�|�|j�dS)z�
        If a L{ReconnectingClientFactory} has C{stopTrying} called while it is
        connected, it does not subsequently attempt to reconnect if the
        connection is later lost.
        c@seZdZdd�Zdd�ZdS)zQReconnectingFactoryTests.test_stopTryingWhenConnected.<locals>.NoConnectConnectorcSstd��dS)Nz%Shouldn't be called, we're connected.��RuntimeErrorrr
r
r
r,sz`ReconnectingFactoryTests.test_stopTryingWhenConnected.<locals>.NoConnectConnector.stopConnectingcSstd��dS)NzShouldn't be reconnecting.rrr
r
r
r.szYReconnectingFactoryTests.test_stopTryingWhenConnected.<locals>.NoConnectConnector.connectN)rrrrrr
r
r
r
�NoConnectConnector+srN)	�objectrrZprotocolZ
buildProtocol�
stopTrying�clientConnectionLost�assertFalse�continueTrying)rr�cr
r
r
�test_stopTryingWhenConnected%s
z5ReconnectingFactoryTests.test_stopTryingWhenConnectedcsTG�fdd�dt�}t��t��_|��_���|��jj�|��j���dS)z
        Calling stopTrying on a L{ReconnectingClientFactory} doesn't attempt a
        retry on any active connector.
        cs$eZdZdZ�fdd�Zdd�ZdS)z[ReconnectingFactoryTests.test_stopTryingDoesNotReconnect.<locals>.FactoryAwareFakeConnectorFcs��|d�dS)z�
                Behave as though an ongoing connection attempt has now
                failed, and notify the factory of this.
                N)�clientConnectionFailedr��fr
r
rCszjReconnectingFactoryTests.test_stopTryingDoesNotReconnect.<locals>.FactoryAwareFakeConnector.stopConnectingcSs
d|_dS)z|
                Record an attempt to reconnect, since this is what we
                are trying to avoid.
                TN)�attemptedRetryrr
r
r
rJszcReconnectingFactoryTests.test_stopTryingDoesNotReconnect.<locals>.FactoryAwareFakeConnector.connectN)rrrr"rrr
r r
r
�FactoryAwareFakeConnector@sr#N)	rrr�clock�	connectorrrr"ZgetDelayedCalls)rr#r
r r
�test_stopTryingDoesNotReconnect;sz8ReconnectingFactoryTests.test_stopTryingDoesNotReconnectcCs*t�}t�t�|��}|�|j|j�dS)z�
        A L{ReconnectingClientFactory} which hasn't been used for anything
        can be pickled and unpickled and end up with the same state.
        N)r�pickle�loads�dumps�assertEqual�__dict__)r�original�
reconstitutedr
r
r
�test_serializeUnused^sz-ReconnectingFactoryTests.test_serializeUnusedcCs2t�}t�}||_t�t�|��}|�|j�dS)z�
        The clock attribute of L{ReconnectingClientFactory} is not serialized,
        and the restored value sets it to the default value, the reactor.
        N)rrr$r'r(r)�assertIsNone)rr$r,r-r
r
r
�test_serializeWithClockhs
z0ReconnectingFactoryTests.test_serializeWithClockcCszt�}|�t�d�|�|j�t�|�}t�|�}|�|j	�|�|j
�|�|jd�|�|j
|j�|�|j�dS)z�
        A L{ReconnectingClientFactory} which is unpickled does not have an
        L{IConnector} and has its reconnecting timing parameters reset to their
        initial values.
        Nr)rrrZ
addCleanuprr'r)r(r/r%Z_callIDr*ZretriesZdelayZinitialDelayZ
assertTruer)r�factoryZ
serializedZunserializedr
r
r
�$test_deserializationResetsParametersts

z=ReconnectingFactoryTests.test_deserializationResetsParameterscCs6t�}t�}||_|�t�d�|�t|j�d�dS)z�
        The clock used by L{ReconnectingClientFactory} can be parametrized, so
        that one can cleanly test reconnections.
        N�)rrr$rrr*�lenZcalls)rr$r1r
r
r
�test_parametrizedClock�s
z/ReconnectingFactoryTests.test_parametrizedClockN)
rrrrrr&r.r0r2r5r
r
r
r
r s#
r)rZ
__future__rrr'Ztwisted.trial.unittestrZtwisted.internet.taskrZtwisted.internet.protocolrrrrrr
r
r
r
�<module>s