File: //usr/lib/python3/dist-packages/twisted/test/__pycache__/test_stringtransport.cpython-38.pyc
W[�3�@s�dZddlmZddlmZmZmZmZmZm	Z	m
Z
mZmZddl
mZddlmZddlmZmZmZmZddlmZmZGdd	�d	e�ZGd
d�de�ZGdd
�d
e�ZGdd�de�ZdS)z*
Tests for L{twisted.test.proto_helpers}.
�)�verifyObject)	�
ITransport�
IPushProducer�	IConsumer�IReactorTCP�IReactorSSL�IReactorUNIX�IAddress�IListeningPort�
IConnector)�IPv4Address)�TestCase)�StringTransport�
MemoryReactor�RaisingMemoryReactor�NonStreamingProducer)�
ClientFactory�Factoryc@s�eZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�Zdd
�Z	dd�Z
dd�Zdd�Zdd�Z
dd�Zdd�Zdd�Zdd�Zdd�Zd d!�Zd"d#�Zd$d%�Zd&d'�Zd(S))�StringTransportTestszB
    Tests for L{twisted.test.proto_helpers.StringTransport}.
    cCst�|_dS�N)r�	transport��self�r�C/usr/lib/python3/dist-packages/twisted/test/test_stringtransport.py�setUpszStringTransportTests.setUpcCs:|�tt|j��|�tt|j��|�tt|j��dS)zq
        L{StringTransport} instances provide L{ITransport}, L{IPushProducer},
        and L{IConsumer}.
        N)�
assertTruerrrrrrrrr�test_interfacessz$StringTransportTests.test_interfacescCs>t�}t�}|j�||�|�|jj|�|�|jj|�dS)zz
        L{StringTransport.registerProducer} records the arguments supplied to
        it as instance attributes.
        N)�objectr�registerProducer�assertIs�producer�	streaming�rr!r"rrr�test_registerProducer&s
z*StringTransportTests.test_registerProducercCsLt�}|j�|d�|�t|jjt�d�|�|jj|�|�|jj�dS)zy
        L{StringTransport.registerProducer} raises L{RuntimeError} if a
        producer is already registered.
        TFN)	rrr�assertRaises�RuntimeErrorr r!rr")rr!rrr�test_disallowedRegisterProducer2s�z4StringTransportTests.test_disallowedRegisterProducercCsbt�}t�}|j�|d�|j��|�|jj�|j�|d�|�|jj|�|�|jj�dS)z�
        L{StringTransport.unregisterProducer} causes the transport to forget
        about the registered producer and makes it possible to register a new
        one.
        FTN)	rrr�unregisterProducer�assertIsNoner!r rr")rZoldProducerZnewProducerrrr�test_unregisterProducer?s
z,StringTransportTests.test_unregisterProducercCs|�t|jj�dS)z�
        L{StringTransport.unregisterProducer} raises L{RuntimeError} if called
        when no producer is registered.
        N)r%r&rr(rrrr�test_invalidUnregisterProducerOsz3StringTransportTests.test_invalidUnregisterProducercCs|�|jjd�dS)zO
        L{StringTransport.producerState} is initially C{'producing'}.
        �	producingN)�assertEqualr�
producerStaterrrr�test_initialProducerStateWsz.StringTransportTests.test_initialProducerStatecCs|j��|�|jjd�dS)zy
        L{StringTransport.pauseProducing} changes the C{producerState} of the
        transport to C{'paused'}.
        ZpausedN)r�pauseProducingr-r.rrrr�test_pauseProducing^s
z(StringTransportTests.test_pauseProducingcCs(|j��|j��|�|jjd�dS)z}
        L{StringTransport.resumeProducing} changes the C{producerState} of the
        transport to C{'producing'}.
        r,N)rr0�resumeProducingr-r.rrrr�test_resumeProducinggs

z)StringTransportTests.test_resumeProducingcCs|j��|�|jjd�dS)z{
        L{StringTransport.stopProducing} changes the C{'producerState'} of the
        transport to C{'stopped'}.
        ZstoppedN)r�
stopProducingr-r.rrrr�test_stopProducingqs
z'StringTransportTests.test_stopProducingcCs|j��|�t|jj�dS)zu
        L{StringTransport.pauseProducing} raises L{RuntimeError} if the
        transport has been stopped.
        N)rr4r%r&r0rrrr� test_stoppedTransportCannotPausezs
z5StringTransportTests.test_stoppedTransportCannotPausecCs|j��|�t|jj�dS)zv
        L{StringTransport.resumeProducing} raises L{RuntimeError} if the
        transport has been stopped.
        N)rr4r%r&r2rrrr�!test_stoppedTransportCannotResume�s
z6StringTransportTests.test_stoppedTransportCannotResumecCs|j��|�t|jj�dS)zz
        L{StringTransport.pauseProducing} raises L{RuntimeError} if the
        transport is being disconnected.
        N)r�loseConnectionr%r&r0rrrr�&test_disconnectingTransportCannotPause�s
z;StringTransportTests.test_disconnectingTransportCannotPausecCs|j��|�t|jj�dS)z{
        L{StringTransport.resumeProducing} raises L{RuntimeError} if the
        transport is being disconnected.
        N)rr8r%r&r2rrrr�'test_disconnectingTransportCannotResume�s
z<StringTransportTests.test_disconnectingTransportCannotResumecCs*|�|jj�|j��|�|jj�dS)zv
        L{StringTransport.loseConnection} toggles the C{disconnecting} instance
        variable to C{True}.
        N)�assertFalserZ
disconnectingr8rrrrr�$test_loseConnectionSetsDisconnecting�s
z9StringTransportTests.test_loseConnectionSetsDisconnectingcCst�}|�t|���|�dS)z�
        If a host address is passed to L{StringTransport.__init__}, that
        value is returned from L{StringTransport.getHost}.
        N)rr r�getHost�r�addressrrr�test_specifiedHostAddress�sz.StringTransportTests.test_specifiedHostAddresscCs t�}|�t|d���|�dS)z�
        If a peer address is passed to L{StringTransport.__init__}, that
        value is returned from L{StringTransport.getPeer}.
        )ZpeerAddressN)rr r�getPeerr>rrr�test_specifiedPeerAddress�s
�z.StringTransportTests.test_specifiedPeerAddresscCst���}|�|t�dS)z�
        If no host address is passed to L{StringTransport.__init__}, an
        L{IPv4Address} is returned from L{StringTransport.getHost}.
        N)rr=�assertIsInstancerr>rrr�test_defaultHostAddress�s
z,StringTransportTests.test_defaultHostAddresscCst���}|�|t�dS)z�
        If no peer address is passed to L{StringTransport.__init__}, an
        L{IPv4Address} is returned from L{StringTransport.getPeer}.
        N)rrArCrr>rrr�test_defaultPeerAddress�s
z,StringTransportTests.test_defaultPeerAddressN)�__name__�
__module__�__qualname__�__doc__rrr$r'r*r+r/r1r3r5r6r7r9r:r<r@rBrDrErrrrrs(

	
					
	
	rc@s@eZdZdZdd�Zdd�Zdd�Zdd	�Zd
d�Zdd
�Z	dS)�ReactorTestszA
    Tests for L{MemoryReactor} and L{RaisingMemoryReactor}.
    cCs(t�}tt|�tt|�tt|�dS)zt
        L{MemoryReactor} provides all of the attributes described by the
        interfaces it advertises.
        N)rrrrr)r�
memoryReactorrrr�test_memoryReactorProvides�s

z'ReactorTests.test_memoryReactorProvidescCs(t�}tt|�tt|�tt|�dS)z{
        L{RaisingMemoryReactor} provides all of the attributes described by the
        interfaces it advertises.
        N)rrrrr)rZraisingReactorrrr�test_raisingReactorProvides�s

z(ReactorTests.test_raisingReactorProvidescCs�t�}|�ddt��|�ddt�d�fD]<}tt|�|��}tt|�|�|j	d�|�|j
d�q(|�dt��}tt|�|��}tt|�|�|jd�dS)a
        L{MemoryReactor.connectTCP}, L{MemoryReactor.connectSSL}, and
        L{MemoryReactor.connectUNIX} will return an L{IConnector} whose
        C{getDestination} method returns an L{IAddress} with attributes which
        reflect the values passed.
        ztest.example.comi� Ns
/fake/path)
rZ
connectTCPrZ
connectSSLrrZgetDestinationr	r-�host�portZconnectUNIX�name)rrKZ	connectorr?rrr�test_connectDestination�s.���



z$ReactorTests.test_connectDestinationcCs�t�}|�dt��|�dt�d�fD]<}tt|�|��}tt|�|�|j	d�|�|j
d�q$|�dt��}tt|�|��}tt|�|�|jd�dS)a�
        L{MemoryReactor.listenTCP}, L{MemoryReactor.listenSSL} and
        L{MemoryReactor.listenUNIX} will return an L{IListeningPort} whose
        C{getHost} method returns an L{IAddress}; C{listenTCP} and C{listenSSL}
        will have a default host of C{'0.0.0.0'}, and a port that reflects the
        value passed, and C{listenUNIX} will have a name that reflects the path
        passed.
        i2 Nz0.0.0.0s/path/to/socket)
rZ	listenTCPrZ	listenSSLrr
r=r	r-rNrOZ
listenUNIXrP)rrKrOr?rrr�test_listenDefaultHosts	�



z#ReactorTests.test_listenDefaultHostcCsPt�}t�}|�|�|�|�|�|��|g�|�|�|�|��g�dS)z>
        Adding, removing, and listing readers works.
        N)rrZ	addReaderr-Z
getReadersZremoveReader)r�reader�reactorrrr�test_readerss


zReactorTests.test_readerscCsPt�}t�}|�|�|�|�|�|��|g�|�|�|�|��g�dS)z>
        Adding, removing, and listing writers works.
        N)rrZ	addWriterr-Z
getWritersZremoveWriter)r�writerrTrrr�test_writers+s


zReactorTests.test_writersN)
rFrGrHrIrLrMrQrRrUrWrrrrrJ�srJc@s0eZdZdZdd�Zdd�Zdd�Zdd	�Zd
S)�TestConsumerzP
    A very basic test consumer for use with the NonStreamingProducerTests.
    cCsg|_d|_d|_dSr)�writesr!�producerStreamingrrrr�__init__AszTestConsumer.__init__cCs||_||_dS)z�
        Registers a single producer with this consumer. Just keeps track of it.

        @param producer: The producer to register.
        @param streaming: Whether the producer is a streaming one or not.
        N�r!rZr#rrrrGszTestConsumer.registerProducercCsd|_d|_dS)zC
        Forget the producer we had previously registered.
        Nr\rrrrr(RszTestConsumer.unregisterProducercCs|j�|�dS)zz
        Some data was written to the consumer: stores it for later use.

        @param data: The data to write.
        N)rY�append)r�datarrr�writeZszTestConsumer.writeN)rFrGrHrIr[rr(r_rrrrrX=s
rXc@s eZdZdZdd�Zdd�ZdS)�NonStreamingProducerTestszF
    Tests for the L{NonStreamingProducer} to validate behaviour.
    c
Cs�t�}t|�}|�|d�|�|j|�|�|j|�|�|j�td�D]}|�	�qJddddddd	d
ddg
}|�
|j�|�
|j�|�
|j�|�|j|�|�
t|j	�d
S)z�
        When the L{NonStreamingProducer} has resumeProducing called 10 times,
        it writes the counter each time and then fails.
        F�
�0�1�2�3�4�5�6�7�8�9N)rXrrr r!�consumerr;rZ�ranger2r)r-rYr%r&)rrlr!�_ZexpectedWritesrrr�test_producesOnly10Timeshs0
�z2NonStreamingProducerTests.test_producesOnly10TimescCs4t�}t|�}|�|d�|��|�t|j�dS)zb
        When the L{NonStreamingProducer} is paused, it raises a
        L{RuntimeError}.
        FN)rXrrr2r%r&r0)rrlr!rrr�test_cannotPauseProduction�s
z4NonStreamingProducerTests.test_cannotPauseProductionN)rFrGrHrIrorprrrrr`dsr`N)rIZzope.interface.verifyrZtwisted.internet.interfacesrrrrrrr	r
rZtwisted.internet.addressrZtwisted.trial.unittestr
Ztwisted.test.proto_helpersrrrrZtwisted.internet.protocolrrrrJrrXr`rrrr�<module>s,;o'