File: //usr/lib/python3/dist-packages/twisted/test/__pycache__/test_stdio.cpython-38.pyc
U


W[d3�@s$dZddlmZmZddlZddlZddlZddlmZddl	m
Z
mZddlm
Z
ddlmZddlmZmZmZdd	lmZmZmZmZmZdd
lmZdZdZe��r�e
d�dkr�d
Zeej �Z!ej"�#ej$�e!d<n"e�Z!ej"�#ej$��%e�&��e!d<Gdd�dej'�Z(Gdd�dej)�Z*dS)z�
Tests for L{twisted.internet.stdio}.

@var properEnv: A copy of L{os.environ} which has L{bytes} keys/values on POSIX
    platforms and native L{str} keys/values on Windows.
�)�absolute_import�divisionN)�unittest)�filepath�log)�
requireModule)�platform)�range�
intToBytes�bytesEnviron)�error�defer�protocol�stdio�reactor)�ConnectionLostNotifyingProtocolsxyz123abc Twisted is great!Zwin32processzIOn windows, spawnProcess is not available in the absence of win32process.�
PYTHONPATHs
PYTHONPATHc@s4eZdZdZdZdd�Zdd�Zdd�Zd	d
�ZdS)�StandardIOTestProcessProtocola�
    Test helper for collecting output from a child process and notifying
    something when it exits.

    @ivar onConnection: A L{defer.Deferred} which will be called back with
    L{None} when the connection to the child process is established.

    @ivar onCompletion: A L{defer.Deferred} which will be errbacked with the
    failure associated with the child process exiting when it exits.

    @ivar onDataReceived: A L{defer.Deferred} which will be called back with
    this instance whenever C{childDataReceived} is called, or L{None} to
    suppress these callbacks.

    @ivar data: A C{dict} mapping file descriptors to strings containing all
    bytes received from the child process on each file descriptor.
    NcCst��|_t��|_i|_dS�N)r
�Deferred�onConnection�onCompletion�data��self�r�9/usr/lib/python3/dist-packages/twisted/test/test_stdio.py�__init__Bs

z&StandardIOTestProcessProtocol.__init__cCs|j�d�dSr)r�callbackrrrr�connectionMadeHsz,StandardIOTestProcessProtocol.connectionMadecCs>|j�|d�||j|<|jdk	r:|jd}|_|�|�dS)z�
        Record all bytes received from the child process in the C{data}
        dictionary.  Fire C{onDataReceived} if it is not L{None}.
        �N)r�get�onDataReceivedr)r�name�bytes�drrr�childDataReceivedLs
z/StandardIOTestProcessProtocol.childDataReceivedcCs|j�|�dSr)rr)r�reasonrrr�processEndedWsz*StandardIOTestProcessProtocol.processEnded)	�__name__�
__module__�__qualname__�__doc__r"rrr&r(rrrrr.src@s~eZdZeZdd�Zdd�Zdd�Zdd�Zd	d
�Z	dd�Z
d
d�Zdd�Zdd�Z
dd�Zdd�Zdd�Ze��rzde_dS)�StandardInputOutputTestscOs:tjdd|tjjgt|�}tj|tj|fdti|��S)a_
        Launch a child Python process and communicate with it using the
        given ProcessProtocol.

        @param proto: A L{ProcessProtocol} instance which will be connected
        to the child process.

        @param sibling: The basename of a file containing the Python program
        to run in the child process.

        @param *args: strings which will be passed to the child process on
        the command line as C{argv[2:]}.

        @param **kw: additional arguments to pass to L{reactor.spawnProcess}.

        @return: The L{IProcessTransport} provider for the spawned process.
        s-ms
twisted.test.�env)�sys�
executabler�	__class__r*�listZspawnProcess�	properEnv)r�protoZsibling�args�kwrrr�
_spawnProcess`s �����z&StandardInputOutputTests._spawnProcesscs$�fdd�}�fdd�}|�||�S)Ncs��d|f�dS)Nz'Process terminated with non-Failure: %r)Zfail)�resultrrr�cb~sz4StandardInputOutputTests._requireFailure.<locals>.cbcs�|�Srr)�err)rrr�eb�sz4StandardInputOutputTests._requireFailure.<locals>.eb)ZaddCallbacks)rr%rr9r;r)rrr�_requireFailure}sz(StandardInputOutputTests._requireFailurecsL����t�d��t���j}���d�����fdd�}��||�S)z�
        Verify that a protocol connected to L{StandardIO} can disconnect
        itself using C{transport.loseConnection}.
        �Child process logging to sstdio_test_loseconnc	sPt�d��"}|D]}t�d|���qW5QRX��d�j�|�tj�dS)N�rzChild logged: �)	�openr�msg�rstripZfailIfInr�trapr�ProcessDone)r'�f�line��errorLogFile�prrrr(�s
zBStandardInputOutputTests.test_loseConnection.<locals>.processEnded)�mktemprrArrr7r<�rr%r(rrGr�test_loseConnection�sz,StandardInputOutputTests.test_loseConnectioncsf|��}t�d|�t��t���_�fdd�}�j�|�dd�}|��j|�}|�	�d|�|S)z�
        When stdin is closed and the protocol connected to it implements
        L{IHalfCloseableProtocol}, the protocol's C{readConnectionLost} method
        is called.
        r=cs�j}�j��|Sr)rZ	transportZ
closeStdin)Zignoredr%�rIrr�cbBytes�s
zAStandardInputOutputTests.test_readConnectionLost.<locals>.cbBytescSs|�tj�dSr)rCrrD�r'rrrr(�szFStandardInputOutputTests.test_readConnectionLost.<locals>.processEndedsstdio_test_halfclose)
rJrrArr
rr"�addCallbackr<r7)rrHrNr(r%rrMr�test_readConnectionLost�s
�z0StandardInputOutputTests.test_readConnectionLostc
sjt��z�j�dtdd�Wn0tk
rL}zt�t|���W5d}~XYnX��fdd�}���j|�S)z�
        Verify that a write made directly to stdout using L{os.write}
        after StandardIO has finished is reliably received by the
        process reading that stdout.
        sstdio_test_lastwriteT)ZusePTYNcs0���jd�t�d�jf�|�tj�dS)z�
            Asserts that the parent received the bytes written by the child
            immediately after the child starts.
            r?z4Received %r from child, did not find expected bytes.N)�
assertTruer�endswith�UNIQUE_LAST_WRITE_STRINGrCrrDrO�rIrrrr(�s��zEStandardInputOutputTests.test_lastWriteReceived.<locals>.processEnded)	rr7rT�
ValueErrorrZSkipTest�strr<r)r�er(rrUr�test_lastWriteReceived�s�
 
z/StandardInputOutputTests.test_lastWriteReceivedcs2t���j}���d���fdd�}��||�S)z�
        Verify that the transport of a protocol connected to L{StandardIO}
        has C{getHost} and C{getPeer} methods.
        sstdio_test_hostpeercs6�jd��\}}��|���|�|�tj�dS)Nr?)r�
splitlinesrRrCrrD)r'ZhostZpeerrUrrr(�s

z?StandardInputOutputTests.test_hostAndPeer.<locals>.processEnded�rrr7r<rKrrUr�test_hostAndPeer�s
z)StandardInputOutputTests.test_hostAndPeercs2t���j}���d���fdd�}��||�S)z�
        Verify that the C{write} method of the transport of a protocol
        connected to L{StandardIO} sends bytes to standard out.
        sstdio_test_writecs"���jdd�|�tj�dS�Nr?sok!��assertEqualrrCrrDrOrUrrr(�sz9StandardInputOutputTests.test_write.<locals>.processEndedr[rKrrUr�
test_write�s
z#StandardInputOutputTests.test_writecs2t���j}���d���fdd�}��||�S)z�
        Verify that the C{writeSequence} method of the transport of a
        protocol connected to L{StandardIO} sends bytes to standard out.
        sstdio_test_writeseqcs"���jdd�|�tj�dSr]r^rOrUrrr(szAStandardInputOutputTests.test_writeSequence.<locals>.processEndedr[rKrrUr�test_writeSequence�s
z+StandardInputOutputTests.test_writeSequencec	CsB|��}t|d��&}td�D]}|�t|�d�qW5QRX|S)N�wbi�
)rJr@r	�writer
)r�junkPathZjunkFile�irrr�	_junkPath
s
z"StandardInputOutputTests._junkPathcsdt���j}g�ttd�������fdd�����d���j�������fdd�}��||�S)z�
        Verify that the transport of a protocol connected to L{StandardIO}
        is a working L{IProducer} provider.
        �dcs:�r6��t����d����d�t�d�d�dS)Nrc���g{�G�z�?)�appendr
�poprdr�	callLater)Zign)r�proc�toWrite�writtenrrr sz>StandardInputOutputTests.test_producer.<locals>.connectionMadesstdio_test_producercs>���jdd�������dt��f�|�tj�dS)Nr?r z*Connection lost with %d writes left to go.)r_r�joinZassertFalse�lenrCrrDrO)rIrrnrorrr(*s�z<StandardInputOutputTests.test_producer.<locals>.processEnded)rrr2r	r7rrPr<rKr)rrIrmrrnror�
test_producersz&StandardInputOutputTests.test_producercs>t���j}�������d�����fdd�}��||�S)z�
        Verify that the transport of a protocol connected to L{StandardIO}
        is a working L{IConsumer} provider.
        sstdio_test_consumerc	s<t�d��}���jd|���W5QRX|�tj�dS)N�rbr?)r@r_r�readrCrrD)r'rE�rerIrrrr(?s z<StandardInputOutputTests.test_consumer.<locals>.processEnded)rrrgr7r<rKrrur�
test_consumer3sz&StandardInputOutputTests.test_consumercs�t��}t|�}t��������d��_}��|j	�t
|��d�}t�
�s�t��\}}��tj	|���tj	|�||d<tj|f|��d�t�������fdd��t�d������fdd	�}|�|�|S)
aE
        If L{StandardIO} is created with a file descriptor which refers to a
        normal file (ie, a file from the filesystem), L{StandardIO.write}
        writes bytes to that file.  In particular, it does not immediately
        consider the file closed or call its protocol's C{connectionLost}
        method.
        rb)�stdout�stdin�csB�D],}|�kr���dS��t|��q2qt�d��dS)Nr)ZloseConnectionrdr
rrl)�value)�
connection�count�howMany�spinrrr~eszAStandardInputOutputTests.test_normalFileStandardOut.<locals>.spinrc	s8��t���d������d�ttt�����dS)Nr?r )r_�nextZ
getContentrp�mapr
r	rO)r|r}�pathrrr�cbLostqs
�zCStandardInputOutputTests.test_normalFileStandardOut.<locals>.cbLost)r
rrrZFilePathrJr@�normalZ
addCleanup�close�dict�filenor�	isWindows�os�piperZ
StandardIO�	itertoolsr|rrlrP)rZ
onConnLostr4r��kwargsr>�wr�r)r{r|r}r�rr~r�test_normalFileStandardOutFs&
z3StandardInputOutputTests.test_normalFileStandardOutzpStandardIO does not accept stdout as an argument to Windows.  Testing redirection to a file is therefore harder.N)r)r*r+�skipWindowsNopywin32�skipr7r<rLrQrYr\r`rargrrrvr�rr�rrrrr-\s '3�r-)+r,Z
__future__rrr�r/r�Z
twisted.trialrZtwisted.pythonrrZtwisted.python.reflectrZtwisted.python.runtimerZtwisted.python.compatr	r
rZtwisted.internetrr
rrrZtwisted.test.test_tcprrTr�r�r��environr3�pathseprpr��encode�getfilesystemencodingZProcessProtocolrZTestCaser-rrrr�<module>s0
�.